# lib/gen_durable/fsm.ex

defmodule GenDurable.FSM do
  @moduledoc """
  Behaviour for a durable FSM — or, in its degenerate one-step form, a durable job.

  A module defines **either** `step/2` (a state machine) **or** `perform/1`/`perform/2`
  (a one-shot job), never both.

  ## Job form (one step)

  Define `perform/1` or `perform/2` and you get a durable job — no step names, no
  outcome tuples:

      defmodule Cleanup do
        use GenDurable.FSM

        @impl true
        def perform(args, _ctx) do
          File.rm_rf!(args["path"])
          :ok
        end
      end

      GenDurable.insert(Cleanup, args: %{"path" => "/tmp/x"})

  `perform` receives the instance args (the plain map passed as `:args`/`:state`,
  or the typed struct if a `State` schema is declared) and, in the `/2` form, the
  `t:GenDurable.Context.t/0`. It returns:

    * `:ok` / `{:ok, result_map}` — the job is `done`.
    * `{:error, reason}` — retried with `backoff/1` until `:max_attempts`, then `failed`.
    * `{:cancel, reason}` — `failed` immediately, no retry.
    * a raised exception — treated as `{:error, exception}` (routed through `handle/2`).

  `:max_attempts` (default `20`) and `backoff/1` (default a capped exponential)
  tune retries. Override `backoff/1` to change the schedule.

  ## FSM form (many steps)

      defmodule Checkout do
        use GenDurable.FSM, version: 1, queue: "checkout"

        defmodule State do
          use GenDurable.State

          embedded_schema do
            field :n, :integer, default: 0
          end
        end

        @impl true
        def step("start", %{state: s}), do: {:await, "payment_confirmed", "ship", %{s | n: s.n + 1}}
        def step("ship", ctx), do: {:done, %{"shipped" => true, "paid" => hd(ctx.awaited).payload}}

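        @impl true
        def handle(reason, ctx) do
          if ctx.attempt < 5, do: {:retry, ctx.state, backoff(ctx.attempt)}, else: {:stop, reason}
        end

        # `backoff/1` is generated only for the job form, so a machine defines its own.
        @impl true
        def backoff(attempt), do: min(1_000 * Integer.pow(2, attempt), 300_000)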
      end

  ## `use` options

    * `:name`    — FSM name stored in the `fsm` column (default: `inspect(module)`).
    * `:version` — `fsm_version` (default `1`). Old versions coexist as separate
      registered modules; see `GenDurable.Registry`.
    * `:queue`   — default queue for instances (default `"default"`).
    * `:state`   — the `GenDurable.State` embedded-schema module. Optional: if a
      nested `State` schema module is defined inside the FSM (as above) it is picked
      up by convention, so you rarely pass this. Omit both for plain-map state.
    * `:initial` — initial step for `GenDurable.insert/2` (default `"start"`).
    * `:max_attempts` — job retry cap (default `20`). Job form only.

  For the FSM form, `handle/2` defaults to `{:stop, reason}` and is overridable;
  `backoff/1` is not generated, so define it yourself if your `handle/2` calls it.
  For the job form, both `handle/2` (retry-with-backoff) and `backoff/1` are
  generated and overridable; overriding `handle/2` drops to the FSM outcome contract.
  """

  alias GenDurable.{Context, Outcome}

  # Return contract of `perform/1` and `perform/2`; `__outcome__/4` below maps each
  # shape onto a step outcome.
  @type result :: :ok | {:ok, map()} | {:error, term()} | {:cancel, term()}

  # FSM form: invoked with the current step name and the context.
  @callback step(step :: String.t(), ctx :: Context.t()) :: Outcome.t() | term()
  # Job form: the one-shot work, without or with the context.
  @callback perform(args :: term()) :: result()
  @callback perform(args :: term(), ctx :: Context.t()) :: result()
  # Failure hook; a default is generated when the module does not define one.
  @callback handle(reason :: term(), ctx :: Context.t()) :: Outcome.t() | term()
  # Retry delay for a given attempt; a default is generated for the job form only.
  @callback backoff(attempt :: non_neg_integer()) :: non_neg_integer()

  # Every callback is optional here; `__before_compile__/1` enforces that a module
  # defines step/2 or perform/1|2 (and not both).
  @optional_callbacks step: 2, perform: 1, perform: 2, handle: 2, backoff: 1

  # Expanded by `use GenDurable.FSM, opts`. Declares the behaviour, registers the
  # `__before_compile__/1` hook, stores the `use` options in `@gd_opts`, and defines
  # the `__gd_*__` accessors that read those options with their defaults.
  defmacro __using__(opts) do
    quote do
      @behaviour GenDurable.FSM
      # Shape detection (step/2 vs perform/1|2) is deferred to the before-compile hook.
      @before_compile GenDurable.FSM
      # Set before the accessors below, which read it; also read back by the hook.
      @gd_opts unquote(opts)

      # `:name` falls back to the inspected module name.
      def __gd_name__, do: Keyword.get(@gd_opts, :name, inspect(__MODULE__))
      # `:version` falls back to 1.
      def __gd_version__, do: Keyword.get(@gd_opts, :version, 1)
      # `:queue` falls back to "default".
      def __gd_queue__, do: Keyword.get(@gd_opts, :queue, "default")
      # `:initial` falls back to "start".
      def __gd_initial__, do: Keyword.get(@gd_opts, :initial, "start")
    end
  end

  # Compile-time hook registered by `__using__/1`. It looks at which callbacks the
  # using module defined and emits the remaining glue:
  #
  #   * step/2 only       -> `__gd_state__/0` plus a default `handle/2`
  #   * perform/1|2 only  -> `__gd_state__/0`, a bridging `step/2`, and default
  #                          `backoff/1` and `handle/2`
  #   * both, or neither  -> a `CompileError`
  #
  # Defaults are skipped for functions the module already defines. The generated
  # functions either return a constant or forward to the runtime helpers at the
  # bottom of this module.
  @doc false
  defmacro __before_compile__(env) do
    module = env.module
    use_opts = Module.get_attribute(module, :gd_opts)

    machine? = Module.defines?(module, {:step, 2})

    job? =
      Module.defines?(module, {:perform, 2}) or Module.defines?(module, {:perform, 1})

    case {machine?, job?} do
      {true, true} ->
        shape_error!(
          env,
          "#{inspect(module)} defines both step/2 and perform/_; use exactly one " <>
            "(step/2 for a machine, perform/1|2 for a one-shot job)"
        )

      {true, false} ->
        state_fn = state_def(use_opts, module)
        handle_fn = unless_defined(module, {:handle, 2}, fsm_handle())

        quote do
          unquote(state_fn)
          unquote(handle_fn)
        end

      {false, true} ->
        state_fn = state_def(use_opts, module)
        step_fn = job_step(module, use_opts)
        backoff_fn = unless_defined(module, {:backoff, 1}, job_backoff())
        handle_fn = unless_defined(module, {:handle, 2}, job_handle(use_opts))

        quote do
          unquote(state_fn)
          unquote(step_fn)
          unquote(backoff_fn)
          unquote(handle_fn)
        end

      {false, false} ->
        shape_error!(
          env,
          "#{inspect(module)} must define step/2 (a machine) or perform/1|2 (a one-shot job)"
        )
    end
  end

  # Raises a `CompileError` located at the using module's file and line.
  defp shape_error!(env, description) do
    raise CompileError, file: env.file, line: env.line, description: description
  end

  # --- compile-time codegen helpers ------------------------------------------

  # Emits `__gd_state__/0`, which returns the state schema module or `nil`.
  # Precedence: a truthy `:state` option first, otherwise a nested `<mod>.State`
  # module that is loadable and exports `__schema__/1`, otherwise `nil`.
  defp state_def(opts, mod) do
    schema = Keyword.get(opts, :state) || nested_schema_or_nil(Module.concat(mod, "State"))

    quote do
      def __gd_state__ do
        unquote(schema)
      end
    end
  end

  # Returns `candidate` when it is a loadable module exporting `__schema__/1`,
  # and `nil` otherwise.
  defp nested_schema_or_nil(candidate) do
    if Code.ensure_loaded?(candidate) and function_exported?(candidate, :__schema__, 1) do
      candidate
    else
      nil
    end
  end

  # Returns `ast` only when `mod` does not already define `fa` (a `{name, arity}`
  # tuple), so a user-supplied definition always wins over the generated default.
  # Returns `nil` when the function is already defined.
  #
  # Written with `if not` rather than `unless`, which is soft-deprecated in recent
  # Elixir releases.
  defp unless_defined(mod, fa, ast) do
    if not Module.defines?(mod, fa), do: ast
  end

  # Quoted default `handle/2` for the FSM form: stop with the reason it was given,
  # ignoring the context.
  defp fsm_handle do
    quote do
      @impl true
      def handle(reason, _ctx) do
        {:stop, reason}
      end
    end
  end

  # Quoted `step/2` for the job form. Whatever the step name, it calls the module's
  # `perform` (the /2 variant when defined, else /1) with `ctx.state` and hands the
  # result to `__outcome__/4` along with the attempt cap (`:max_attempts`, default
  # 20) and the module's `backoff/1`.
  defp job_step(mod, opts) do
    attempt_cap = Keyword.get(opts, :max_attempts, 20)
    takes_ctx? = Module.defines?(mod, {:perform, 2})

    perform_call =
      if takes_ctx? do
        quote(do: perform(ctx.state, ctx))
      else
        quote(do: perform(ctx.state))
      end

    quote do
      @impl true
      def step(_step, ctx) do
        GenDurable.FSM.__outcome__(
          unquote(perform_call),
          ctx,
          unquote(attempt_cap),
          &backoff/1
        )
      end
    end
  end

  # Quoted default `handle/2` for the job form: forwards to `__retry__/4` with the
  # attempt cap (`:max_attempts`, default 20) and the module's `backoff/1`.
  defp job_handle(opts) do
    attempt_cap = Keyword.get(opts, :max_attempts, 20)

    quote do
      @impl true
      def handle(reason, ctx),
        do: GenDurable.FSM.__retry__(reason, ctx, unquote(attempt_cap), &backoff/1)
    end
  end

  # Quoted default `backoff/1` for the job form: forwards to `__backoff__/1`.
  defp job_backoff do
    quote do
      @impl true
      def backoff(attempt) do
        GenDurable.FSM.__backoff__(attempt)
      end
    end
  end

  # --- runtime bridge (called from generated job code) -----------------------

  # Translates a `perform` return value into a step outcome:
  #
  #   * `:ok`                      -> `{:done, %{}}`
  #   * `{:ok, map}` (map only)    -> `{:done, map}`
  #   * `{:error, reason}`         -> whatever `__retry__/4` decides
  #   * `{:cancel, reason}`        -> `{:stop, reason}`
  #   * anything else              -> `{:stop, message}` describing the bad value
  @doc false
  def __outcome__(result, ctx, max_attempts, backoff_fun)

  def __outcome__(:ok, _ctx, _max_attempts, _backoff_fun), do: {:done, %{}}

  def __outcome__({:ok, map}, _ctx, _max_attempts, _backoff_fun) when is_map(map),
    do: {:done, map}

  def __outcome__({:error, reason}, ctx, max_attempts, backoff_fun),
    do: __retry__(reason, ctx, max_attempts, backoff_fun)

  def __outcome__({:cancel, reason}, _ctx, _max_attempts, _backoff_fun),
    do: {:stop, reason}

  def __outcome__(other, _ctx, _max_attempts, _backoff_fun),
    do: {:stop, "perform returned an invalid value: #{inspect(other)}"}

  # Retry-or-stop decision. While `ctx.attempt + 1` is still below `max_attempts`
  # the result is `{:retry, ctx.state, delay}`, where `delay` is
  # `backoff_fun.(ctx.attempt)`; otherwise it is `{:stop, reason}`.
  @doc false
  def __retry__(reason, ctx, max_attempts, backoff_fun) do
    attempts_used = ctx.attempt + 1

    cond do
      attempts_used < max_attempts ->
        {:retry, ctx.state, backoff_fun.(ctx.attempt)}

      true ->
        {:stop, reason}
    end
  end

  # Default backoff schedule: 1000 doubled once per attempt, capped at 300_000.
  @doc false
  def __backoff__(attempt) do
    uncapped = 1000 * Integer.pow(2, attempt)
    min(uncapped, 300_000)
  end
end