lib/inngest/function.ex

defmodule Inngest.Function do
  @moduledoc """
  Module to be used within user code to setup an Inngest function.
  Making it servable and invokable.
  """
  alias Inngest.Config
  alias Inngest.Function.{Step, Trigger}

  @doc """
  Returns the function's human-readable ID, such as "sign-up-flow"
  """
  @callback slug() :: String.t()

  @doc """
  Returns the function name
  """
  @callback name() :: String.t()

  @doc """
  Returns the event name or schedule that triggers the function
  """
  @callback trigger() :: Trigger.t()

  @reserved [:run, :step, :sleep]

  defmacro __using__(opts) do
    quote location: :keep do
      unless Inngest.Function.__register__(__MODULE__, unquote(opts)) do
        # placeholder
      end

      alias Inngest.Client
      alias Inngest.Function.{Trigger, Step}
      # import Inngest.Function, only: [step: 2, step: 3]
      import Inngest.Function
      @before_compile unquote(__MODULE__)

      @behaviour Inngest.Function

      @opts unquote(opts)

      # TOOD: Use app name as prefix
      @fn_slug if Keyword.get(@opts, :id),
                 do: Keyword.get(@opts, :id),
                 else:
                   Keyword.get(@opts, :name)
                   |> String.replace(~r/[\.\/\s]+/, "-")
                   |> String.downcase()

      @impl true
      def slug(), do: @fn_slug

      @impl true
      def name(), do: Keyword.get(@opts, :name)

      @impl true
      def trigger(), do: @opts |> Map.new() |> trigger()
      defp trigger(%{event: event} = _opts), do: %Trigger{event: event}
      defp trigger(%{cron: cron} = _opts), do: %Trigger{cron: cron}

      def step(path),
        do: %{
          step: %Step{
            id: :step,
            name: "step",
            runtime: %Step.RunTime{
              url: "#{Config.app_host() <> path}?fnId=#{slug()}&step=step"
            },
            retries: %Step.Retry{}
          }
        }

      def steps(), do: __handler__().steps

      def serve(path) do
        %{
          id: slug(),
          name: name(),
          triggers: [trigger()],
          steps: step(path),
          mod: __MODULE__
        }
      end

      def send(events) do
        # NOTE: keep this for now so we can add things like tracing in the future
        Client.send(events, [])
      end
    end
  end

  def __register__(module, _opts) do
    registered? = Module.has_attribute?(module, :inngest_fn_steps)

    unless registered? do
      accumulate_attributes = [
        :inngest_fn_steps
      ]

      Enum.each(
        accumulate_attributes,
        &Module.register_attribute(module, &1, accumulate: true, persist: true)
      )
    end

    registered?
  end

  @doc """
  Defines a normal execution block with a `message` that is non-deterministic.

  Meaning whenever Inngest asks the SDK to execute, the code block wrapped
  within `run` will always run, no pun intended.

  Hence making it non deterministic, since each execution can yield a different
  result.

  This is best for things that do not need idempotency. The result here will be
  passed on to the next execution unit.

  #### Arguments

  It accepts an optional `map` that includes

  - `event`
  - `data`

  #### Expected output types
      @spec :ok | {:ok, map()} | {:error, map()}

  where the data is a `map` accumulated with outputs from previous executions.

  ## Examples

      run "non deterministic code block", %{event: event, data: data} do
        # do
        # something
        # here

        {:ok, %{result: result}}
      end
  """
  defmacro run(message, var \\ quote(do: _), contents) do
    unless is_tuple(var) do
      IO.warn(
        "step context is always a map. The pattern " <>
          "#{inspect(Macro.to_string(var))} will never match",
        Macro.Env.stacktrace(__CALLER__)
      )
    end

    contents =
      case contents do
        [do: block] ->
          quote do
            unquote(block)
          end

        _ ->
          quote do
            try(unquote(contents))
          end
      end

    var = Macro.escape(var)
    contents = Macro.escape(contents, unquote: true)

    %{module: mod, file: file, line: line} = __CALLER__

    quote bind_quoted: [
            var: var,
            contents: contents,
            message: message,
            mod: mod,
            file: file,
            line: line
          ] do
      slug = Inngest.Function.register_step(mod, file, line, :exec_run, message, [])
      def unquote(slug)(unquote(var)), do: unquote(contents)
    end
  end

  @doc """
  Defines a deterministic execution block with a `message`.

  This is exactly the same as `Inngest.Function.run/3`, except the code within
  the `step` blocks are always guaranteed to be executed once.

  Subsequent calls to the SDK will not execute and uses the previously executed
  result.

  If the code block returns an error or raised an exception, it will be retried.

  #### Arguments

  It accepts an optional `map` that includes

  - `event`
  - `data`

  #### Expected output types
      @spec :ok | {:ok, map()} | {:error, map()}

  where the data is a `map` accumulated with outputs from previous executions.

  ## Examples
      step "idempotent code block", %{event: event, data: data} do
        # do
        # something
        # here

        {:ok, %{result: result}}
      end
  """
  defmacro step(message, var \\ quote(do: _), contents) do
    unless is_tuple(var) do
      IO.warn(
        "step context is always a map. The pattern " <>
          "#{inspect(Macro.to_string(var))} will never match",
        Macro.Env.stacktrace(__CALLER__)
      )
    end

    contents =
      case contents do
        [do: block] ->
          quote do
            unquote(block)
          end

        _ ->
          quote do
            try(unquote(contents))
          end
      end

    var = Macro.escape(var)
    contents = Macro.escape(contents, unquote: true)

    %{module: mod, file: file, line: line} = __CALLER__

    quote bind_quoted: [
            var: var,
            contents: contents,
            message: message,
            mod: mod,
            file: file,
            line: line
          ] do
      slug = Inngest.Function.register_step(mod, file, line, :step_run, message)

      def unquote(slug)(unquote(var)), do: unquote(contents)
    end
  end

  @doc """
  Pauses the function execution until the specified DateTime.

  Expected valid datetime string formats are:
  - `RFC3389`
  - `RFC1123`
  - `RFC882`
  - `UNIX`
  - `ANSIC`
  - `ISOdate`

  ## Examples
      sleep "sleep until 2023-10-25", %{event: event, data: data} do
        # do something to caculate time

        # return the specified time that it should sleep until
        "2023-07-18T07:31:00Z"
      end
  """
  defmacro sleep(message, var \\ quote(do: _), contents) do
    unless is_tuple(var) do
      IO.warn(
        "step context is always a map. The pattern " <>
          "#{inspect(Macro.to_string(var))} will never match",
        Macro.Env.stacktrace(__CALLER__)
      )
    end

    contents =
      case contents do
        [do: block] ->
          quote do
            unquote(block)
          end

        _ ->
          quote do
            try(unquote(contents))
          end
      end

    var = Macro.escape(var)
    contents = Macro.escape(contents, unquote: true)

    %{module: mod, file: file, line: line} = __CALLER__

    quote bind_quoted: [
            var: var,
            contents: contents,
            message: message,
            mod: mod,
            file: file,
            line: line
          ] do
      slug = Inngest.Function.register_step(mod, file, line, :step_sleep, message, execute: true)

      def unquote(slug)(unquote(var)), do: unquote(contents)
    end
  end

  @doc """
  Set a duration to pause the execution of your function.

  Valid durations are combination of
  - `s` - second
  - `m` - minute
  - `h` - hour
  - `d` - day

  ## Examples
      sleep "2s"
      sleep "1d"
      sleep "5m"
      sleep "1h30m"
  """
  defmacro sleep(duration) do
    %{module: mod, file: file, line: line} = __CALLER__

    # Add differentiator for sleeps with potentially similar duration
    idx = Module.get_attribute(mod, :inngest_sleep_idx, 0)
    Module.put_attribute(mod, :inngest_sleep_idx, idx + 1)

    quote bind_quoted: [duration: duration, mod: mod, file: file, line: line, idx: idx] do
      slug = Inngest.Function.register_step(mod, file, line, :step_sleep, duration, idx: idx)
      def unquote(slug)(), do: nil
    end
  end

  @doc """
  Pause function execution until a particular event is received before continuing.

  It returns the accepted event object or `nil` if the event is not received within
  the timeout.

  The event name will be used as the key for storing the returned event for subsequent
  execution units.

  ## Examples

      wait_for_event "auth/signup.email.confirmed", %{event: event, data: data} do
        match = "user.id"
        [timeout: "1d", if: "event.\#{match} == async.\#{match}"]
      end

      # or in a shorter version
      wait_for_event "auth/signup.email.confirmed", do: [timeout: "1d", match: "user.id"]
  """
  defmacro wait_for_event(event_name, var \\ quote(do: _), contents) do
    unless is_tuple(var) do
      IO.warn(
        "step context is always a map. The pattern " <>
          "#{inspect(Macro.to_string(var))} will never match",
        Macro.Env.stacktrace(__CALLER__)
      )
    end

    contents =
      case contents do
        [do: block] ->
          quote do
            unquote(block)
          end

        _ ->
          quote do
            try(unquote(contents))
          end
      end

    var = Macro.escape(var)
    contents = Macro.escape(contents, unquote: true)

    %{module: mod, file: file, line: line} = __CALLER__

    quote bind_quoted: [
            var: var,
            contents: contents,
            event_name: event_name,
            mod: mod,
            file: file,
            line: line
          ] do
      slug = Inngest.Function.register_step(mod, file, line, :step_wait_for_event, event_name)

      def unquote(slug)(unquote(var)), do: unquote(contents)
    end
  end

  def register_step(mod, file, line, step_type, name, tags \\ []) do
    unless Module.has_attribute?(mod, :inngest_fn_steps) do
      raise "cannot define #{step_type}. Please make sure you have invoked " <>
              "\"use Inngest.Function\" in the current module"
    end

    opts =
      if step_type == :step_wait_for_event,
        do: tags |> normalize_tags(),
        else: %{}

    slug =
      case Keyword.get(tags, :idx) do
        nil -> validate_step_name("#{step_type} #{name}")
        idx -> validate_step_name("#{step_type} #{name} #{idx}")
      end

    if Module.defines?(mod, {slug, 1}) do
      raise ~s("#{slug}" is already defined in #{inspect(mod)})
    end

    tags =
      tags
      |> normalize_tags()
      |> validate_tags()
      |> Map.merge(%{
        file: file,
        line: line
      })

    fn_slug = Module.get_attribute(mod, :fn_slug)

    step = %Step{
      id: slug,
      name: name,
      step_type: step_type,
      opts: opts,
      tags: tags,
      mod: mod,
      runtime: %Step.RunTime{
        url: "#{Config.app_host()}/api/inngest?fnId=#{fn_slug}&step=#{slug}"
      },
      retries: %Step.Retry{}
    }

    Module.put_attribute(mod, :inngest_fn_steps, step)

    slug
  end

  defmacro __before_compile__(env) do
    steps =
      env.module
      |> Module.get_attribute(:inngest_fn_steps)
      |> Enum.reverse()
      |> Macro.escape()

    quote do
      def __handler__ do
        %Inngest.Function.Handler{
          file: __ENV__.file,
          mod: __MODULE__,
          steps: unquote(steps)
        }
      end
    end
  end

  def validate_datetime(datetime) when is_binary(datetime) do
    with {:error, _} <- Timex.parse(datetime, "{RFC3339}"),
         {:error, _} <- Timex.parse(datetime, "{YYYY}-{MM}-{DD}T{h24}:{mm}:{ss}"),
         {:error, _} <- Timex.parse(datetime, "{RFC1123}"),
         {:error, _} <- Timex.parse(datetime, "{RFC822}"),
         {:error, _} <- Timex.parse(datetime, "{RFC822z}"),
         # "Monday, 02-Jan-06 15:04:05 MST"
         {:error, _} <- Timex.parse(datetime, "{WDfull}, {D}-{Mshort}-{YY} {ISOtime} {Zname}"),
         # "Mon Jan 02 15:04:05 -0700 2006"
         {:error, _} <- Timex.parse(datetime, "{WDshort} {Mshort} {DD} {ISOtime} {Z} {YYYY}"),
         {:error, _} <- Timex.parse(datetime, "{UNIX}"),
         {:error, _} <- Timex.parse(datetime, "{ANSIC}"),
         # "Jan _2 15:04:05"
         # "Jan _2 15:04:05.000"
         {:error, _} <- Timex.parse(datetime, "{Mshort} {_D} {ISOtime}"),
         # {:error, _} <- Timex.parse(datetime, "{Mshort} {_D} {ISOtime}"),
         {:error, _} <- Timex.parse(datetime, "{ISOdate}") do
      {:error, "Unknown format for DateTime"}
    else
      {:ok, _val} ->
        {:ok, datetime}

      _ ->
        {:error, "Unknown result"}
    end
  end

  # TODO: Allow parsing DateTime, Date
  # def validate_datetime(%DateTime{} = datetime) do
  # end

  def validate_datetime(_), do: {:error, "Expect valid DateTime formatted input"}

  defp normalize_tags(tags) do
    tags
    |> Enum.reverse()
    |> Enum.reduce(%{}, fn
      {key, value}, acc -> Map.put(acc, key, value)
      tag, acc when is_atom(tag) -> Map.put(acc, tag, true)
      tag, acc when is_list(tag) -> Enum.into(tag, acc)
    end)
  end

  defp validate_tags(tags) do
    for tag <- @reserved, Map.has_key?(tags, tag) do
      raise "cannot set tag #{inspect(tag)} because it is reserved by Inngest.Function"
    end

    unless is_atom(tags[:step_type]) do
      raise("value for tag \":step_type\" must be an atom")
    end

    tags
  end

  defp validate_step_name(name) do
    try do
      name
      |> String.replace(~r/(\s|\:)+/, "_")
      |> String.downcase()
      |> String.to_atom()
    rescue
      SystemLimitError ->
        # credo:disable-for-next-line
        raise SystemLimitError, """
        the computed name of a step (which includes its type, \
        block if present, and the step name itself) must be shorter than 255 characters, \
        got: #{inspect(name)}
        """
    end
  end
end