Skip to main content

lib/gen_durable.ex

defmodule GenDurable do
  @moduledoc """
  Postgres-backed durable execution: an FSM whose state is committed to Postgres
  before each step proceeds, so instances survive process and node death and resume
  where they left off.

  An FSM is a database row, not a process — there is no GenServer per instance; each
  step runs as an ephemeral task. The runtime backbone (scheduler, reaper, GC) is a
  small set of GenServers that pick runnable rows and dispatch them.

  Start the engine in a host supervision tree:

      {GenDurable, repo: MyApp.Repo, queues: [default: 10, checkout: 5]}

  FSMs are resolved from the row (the `fsm` column defaults to the module name);
  pass `:fsms` only for a custom `:name` or to keep an old `:version` running.

  Then enqueue instances and deliver signals — address a signal by the returned
  internal id, or by the `:correlation_key` you set at insert:

      {:ok, _id} =
        GenDurable.insert(Checkout, state: %{order: 42}, correlation_key: "order:42")

      :ok = GenDurable.signal("order:42", "payment_confirmed", %{amount: 100}, dedup_key: "evt-7")

  ## Telemetry

  Attach to these `:telemetry` events (`[:gen_durable, …]`):

    * `[:gen_durable, :step, :stop]` — a step finished. Measurements `%{duration}`
      (native units); metadata `%{id, fsm, step, kind}` where `kind` is the outcome
      (`:next` · `:retry` · `:await` · `:schedule_childs` · `:done` · `:stop`).
    * `[:gen_durable, :pick, :stop]` — a picker batch ran. Measurements
      `%{count, demand}`; metadata `%{queue, worker}`. Watch `count` vs `demand` to
      see how full picks are.
    * `[:gen_durable, :scheduler, :saturation]` — per-poll gauge. Measurements
      `%{in_flight, buffer, concurrency, prefetch}`; metadata `%{queue}`. The signal
      for tuning the feeder knobs.
    * `[:gen_durable, :scheduler, :drain]` — graceful shutdown of a queue.
      Measurements `%{released, in_flight}`; metadata `%{queue}`.
    * `[:gen_durable, :concurrency, :contended]` — a concurrency_key advisory lock
      was contended (a row was handed back). Measurements `%{count}`; metadata
      `%{id, fsm, concurrency_key}`.
    * `[:gen_durable, :reaper, :reaped]` — expired leases reclaimed. Measurements
      `%{count}`; metadata `%{ids}`.
    * `[:gen_durable, :gc, :swept]` — terminal rows deleted by a GC sweep.
      Measurements `%{count}`; metadata `%{}`.
    * `[:gen_durable, :rate_limit, :throttled]` — a rate-limit bucket granted fewer rows
      than wanted in a pick (spec §12). Measurements `%{wanted, granted}`; metadata
      `%{key, queue}`. The signal that a limit is biting.
    * `[:gen_durable, :rate_limit, :unknown]` — a step named a `rate_limit` whose name has
      no configured policy (the row would stall). Measurements `%{count}`; metadata
      `%{key, name, fsm, step}`.

  See `gen_durable_spec.md` (normative) and `gen_durable_plan.md` (roadmap).
  """

  alias GenDurable.{Queries, State}

  # --- engine lifecycle ------------------------------------------------------

  defdelegate start_link(opts), to: GenDurable.Supervisor

  def child_spec(opts) do
    %{
      id: Keyword.get(opts, :name, GenDurable.Supervisor),
      start: {GenDurable.Supervisor, :start_link, [opts]},
      type: :supervisor
    }
  end

  # --- public API ------------------------------------------------------------

  @doc """
  Enqueue one FSM instance. Options: `:state` (alias `:args`, the job-form name),
  `:step` (default the FSM's `:initial`), `:queue`, `:priority`, `:concurrency_key`,
  `:correlation_key`, `:correlation_scope`, and scheduling — `:eligible_at` (a `DateTime`),
  or the sugar `:schedule_at` (a `DateTime`) / `:schedule_in` (milliseconds from now).
  Returns `{:ok, id}` or `{:error, :duplicate}`.

  `:correlation_key` is the instance's business identity — both the key you can later
  `signal/4` by (instead of the internal id) and a uniqueness guard. `:correlation_scope`
  is the list of statuses in which the key is "occupied": uniqueness is enforced and the
  signal address resolves only while the instance sits in one of them. It defaults to the
  non-terminal statuses (unique among live instances; freed on termination); pass `[]` to
  disable uniqueness, or include `:done`/`:failed` to keep the key reserved after the
  instance ends. A duplicate within the occupied scope is rejected as `{:error, :duplicate}`.
  With no `:correlation_key` the instance is neither addressable nor deduplicated.
  """
  def insert(fsm_module, opts \\ []) do
    repo = opts[:repo] || config().repo
    Queries.insert(repo, build_params(fsm_module, opts))
  end

  @doc """
  Batch-enqueue instances in a single statement (dedup via the partial unique
  index). `entries` is a list of per-instance option keyword lists. Returns the
  list of inserted ids (duplicates dropped).
  """
  def insert_all(fsm_module, entries, opts \\ []) do
    repo = opts[:repo] || config().repo
    Queries.insert_all(repo, Enum.map(entries, &build_params(fsm_module, &1)))
  end

  @doc """
  Deliver a durable signal to an instance (spec §5). Wakes the instance only on
  a name match. `:dedup_key` (default `nil`) makes redelivery idempotent.

  `target` is either the internal id (an integer) or a `:correlation_key` (a string) set
  at insert. Addressing by `correlation_key` resolves to the single instance currently
  occupying it (per its `:correlation_scope`); if none exists it returns `{:error, :no_target}`
  (a freed/terminal key can no longer be woken, and a signal is not held for an instance
  that does not exist yet). Returns `:ok` otherwise.
  """
  def signal(target, name, payload \\ %{}, opts \\ []) do
    repo = opts[:repo] || config().repo

    Queries.deliver_signal(
      repo,
      target,
      to_string(name),
      Jason.encode!(payload),
      opts[:dedup_key]
    )
  end

  # --- helpers ---------------------------------------------------------------

  @doc false
  def build_params(fsm_module, opts) do
    state_module = fsm_module.__gd_state__()

    %{
      fsm: fsm_module.__gd_name__(),
      fsm_version: fsm_module.__gd_version__(),
      step: opts[:step] || fsm_module.__gd_initial__(),
      state_json: State.cast(state_module, opts[:state] || opts[:args] || %{}),
      queue: opts[:queue] || fsm_module.__gd_queue__(),
      priority: opts[:priority] || 0,
      concurrency_key: opts[:concurrency_key],
      correlation_key: opts[:correlation_key],
      correlation_scope: correlation_scope(opts),
      rate_limit: rate_limit_key(opts[:rate_limit]),
      weight: opts[:weight] || 1,
      eligible_at: resolve_eligible_at(opts)
    }
  end

  # Resolve an insert-time `:rate_limit` key (spec §12): nil | name | {name, partition}.
  defp rate_limit_key(nil), do: nil
  defp rate_limit_key(name) when is_binary(name) or is_atom(name), do: to_string(name)
  defp rate_limit_key({name, partition}), do: "#{name}:#{partition}"

  # The statuses in which a correlation_key is "occupied": the engine enforces uniqueness
  # over them and resolves the signal address within them. Supplied directly as
  # :correlation_scope; defaults to the non-terminal statuses (unique among live instances,
  # freed on termination). Pass [] for a key with no uniqueness, or include :done/:failed
  # to keep it reserved after the instance ends.
  @live_statuses ~w(runnable executing awaiting_signal awaiting_children)

  defp correlation_scope(opts),
    do: Enum.map(opts[:correlation_scope] || @live_statuses, &to_string/1)

  # Scheduling sugar. Precedence: explicit :eligible_at, then :schedule_at
  # (a DateTime), then :schedule_in (milliseconds from now). nil ⇒ now() in SQL.
  defp resolve_eligible_at(opts) do
    cond do
      opts[:eligible_at] -> opts[:eligible_at]
      opts[:schedule_at] -> opts[:schedule_at]
      opts[:schedule_in] -> DateTime.add(DateTime.utc_now(), opts[:schedule_in], :millisecond)
      true -> nil
    end
  end

  defp config, do: :persistent_term.get({GenDurable, :config})
end