# lib/continuum.ex

defmodule Continuum do
  @moduledoc """
  OTP-native durable execution engine for Elixir.

  Continuum lets you write a multi-step business process as straight-line
  Elixir code. The process survives crashes, node restarts, and partitions:
  the engine journals each effect to Postgres and replays the workflow's
  history through the same orchestration code on resume.

  See `Continuum.Workflow` for the workflow DSL and `Continuum.Activity` for
  activities (the only place side effects are allowed inside a workflow).

  ## Public API

    * `children/1` — Postgres runtime child specs for host supervision trees
    * `start/3` — start a new workflow run
    * `signal/3` — deliver an external signal to a running workflow
    * `cancel/2` — cancel a running workflow
    * `await/2` — block until a workflow completes (test/synchronous use)
    * `query/1` and `get_run/2` — inspect durable runs
    * `set_attributes/3` — externally update run search attributes
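    * `now/0`, `today/0`, `uuid4/0`, `random/0`, `side_effect/1` — deterministic
      primitives (macros) callable from workflow code
    * `patched?/1` — journaled patch marker for compatible workflow changes
    * `unwrap/1` — recover an activity's raw return value from a compensation handle
    * `in_workflow?/0` — whether the caller is executing inside a workflow process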
  """

  alias Continuum.Runtime.{Context, Effect}

  @type run_id :: binary()
  @type workflow_module :: module()
  @type input :: term()

  @doc """
  Returns runtime child specs for a named, non-default Continuum instance.

  The result is a flat list, so concatenate it with the host application's
  other children rather than nesting it inside them:

      children =
        [MyApp.Repo] ++
          Continuum.children(name: :billing_continuum, repo: MyApp.Repo)

  The default `Continuum` instance is owned by `Continuum.Application` and
  `Continuum.children()` returns `[]` to avoid duplicate process names.

  A named instance given a `:repo` uses the Postgres journal for every run
  started, signalled, cancelled, or awaited through it; pass `:journal` to
  override. The default instance follows `config :continuum, :journal`.

  ## Instance options

  These are forwarded to `Continuum.Runtime.Instance.new/1`; the resulting
  instance is registered via `Continuum.Runtime.Instance.register/1` as part of
  this call:

    * `:name` — instance name (defaults to `Continuum`, which yields `[]`)
    * `:repo`, `:journal`, `:workflow_modules`
    * `:activity_executor` — defaults to `:builtin`

  ## Child options

  Each runtime child reads its own start options from one of `:heartbeater`,
  `:run_supervisor`, `:activity_supervisor`, `:recovery`, `:dispatcher`,
  `:activity_dispatcher`, `:snapshotter`, `:timer_wheel`, `:signal_router`, and
  `:version_registry`. Every key defaults to `[]`; `true` is treated like `[]`
  and `false` omits that child from the returned list.

  The activity worker supervisor is only included when the registered
  instance's `:activity_executor` is `:builtin`. The instance's
  `Phoenix.PubSub` and `Registry` children are always included.
  """
  @spec children(keyword()) :: [Supervisor.child_spec()]
  def children(opts \\ []) do
    name = Keyword.get(opts, :name, Continuum)

    if name == Continuum do
      []
    else
      instance =
        [
          name: name,
          repo: opts[:repo],
          journal: opts[:journal],
          activity_executor: Keyword.get(opts, :activity_executor, :builtin),
          workflow_modules: opts[:workflow_modules]
        ]
        |> Continuum.Runtime.Instance.new()
        |> Continuum.Runtime.Instance.register()

      # One optional runtime child: its options live under `key` (default `[]`),
      # and `child/3` returns nil when the caller passed `false`.
      optional = fn module, key -> child(module, Keyword.get(opts, key, []), instance) end

      # The list order is the supervisor start order; keep it stable.
      [
        Supervisor.child_spec({Phoenix.PubSub, name: instance.pubsub},
          id: {Phoenix.PubSub, instance.name}
        ),
        Supervisor.child_spec({Registry, keys: :unique, name: instance.registry},
          id: {Registry, instance.name}
        ),
        optional.(Continuum.Runtime.Lease.Heartbeater, :heartbeater),
        optional.(Continuum.Runtime.RunSupervisor, :run_supervisor),
        activity_supervisor_child(opts, instance),
        optional.(Continuum.Runtime.Recovery, :recovery),
        optional.(Continuum.Runtime.Dispatcher, :dispatcher),
        optional.(Continuum.Runtime.ActivityWorker.Dispatcher, :activity_dispatcher),
        optional.(Continuum.Runtime.Snapshotter, :snapshotter),
        optional.(Continuum.Runtime.TimerWheel, :timer_wheel),
        optional.(Continuum.Runtime.SignalRouter, :signal_router),
        optional.(Continuum.VersionRegistry, :version_registry)
      ]
      |> Enum.reject(&is_nil/1)
    end
  end

  @doc """
  Starts a new run of `workflow_module` with `input`.

  Delegates to `Continuum.Runtime.Engine.start_run/3`.

  ## Options

  Options include:

    * `:instance` — selects a named Continuum instance
    * `:namespace` — soft tenant scoping of list/query paths
    * `:trace_context` — an opaque W3C traceparent binary that is persisted so
      observability integrations can link resumed run attempts
    * `:attributes` — JSON-encodable search metadata stored on the run row
  """
  @spec start(workflow_module(), input(), keyword()) :: {:ok, run_id()} | {:error, term()}
  def start(workflow_module, input, opts \\ []),
    do: Continuum.Runtime.Engine.start_run(workflow_module, input, opts)

  @doc """
  Delivers the signal `name`, carrying `payload`, to the running workflow
  identified by `run_id`.

  Equivalent to calling `signal/4` with an empty option list.
  """
  @spec signal(run_id(), atom(), term()) :: :ok | {:error, term()}
  def signal(run_id, name, payload), do: signal(run_id, name, payload, [])

  @doc """
  Delivers a signal to a running workflow through a specific Continuum
  instance, chosen with the `:instance` option.

  Delegates to `Continuum.Runtime.SignalRouter.deliver/4`.
  """
  @spec signal(run_id(), atom(), term(), keyword()) :: :ok | {:error, term()}
  def signal(run_id, name, payload, opts),
    do: Continuum.Runtime.SignalRouter.deliver(run_id, name, payload, opts)

  @doc """
  Cancels a running workflow.

  Cancelling the root of a `continue_as_new` chain cancels the live tip.

  Where the cancel lands depends on who owns the run:

    * If the run's engine is alive on another reachable node, the cancel is
      forwarded to that engine.
    * If the owner holds a live lease but no engine can be reached (partition,
      overload), the request is recorded durably and honored by the owning
      engine on its next lease heartbeat. In that case the call returns
      `{:error, :owned_elsewhere}`, telling the caller that cancellation is
      pending rather than complete.

  Delegates to `Continuum.Runtime.Engine.cancel/2`.
  """
  @spec cancel(run_id(), keyword()) :: :ok | {:error, term()}
  def cancel(run_id, opts \\ []), do: Continuum.Runtime.Engine.cancel(run_id, opts)

  @doc """
  Blocks the caller until the run completes. Intended for tests and other
  synchronous use only.

  `timeout` defaults to `5_000` and `opts` to `[]`; both are passed straight to
  `Continuum.Runtime.Engine.await/3`.
  """
  @spec await(run_id(), timeout(), keyword()) :: {:ok, map()} | {:error, term()}
  def await(run_id, timeout \\ 5_000, opts \\ []),
    do: Continuum.Runtime.Engine.await(run_id, timeout, opts)

  @doc """
  Lists durable runs matching a closed, structured query spec.

  The supported `:where`, `:order_by`, and pagination options are described in
  `Continuum.Query`. Querying requires a Postgres-backed Continuum instance.
  """
  @spec query(keyword()) :: {:ok, map()} | {:error, term()}
  def query(opts \\ []), do: Continuum.Query.list(opts)

  @doc """
  Lists durable runs belonging to a named Continuum instance.

  `instance` is stored under the `:instance` key of `opts` (replacing any value
  already there) before the query is handed to `Continuum.Query.list/1`.
  """
  @spec query(atom() | Continuum.Runtime.Instance.t(), keyword()) ::
          {:ok, map()} | {:error, term()}
  def query(instance, opts) do
    opts
    |> Keyword.put(:instance, instance)
    |> Continuum.Query.list()
  end

  @doc """
  Fetches a single durable run by its id.

  Delegates to `Continuum.Query.get_run/2`, passing `opts` through unchanged.
  """
  @spec get_run(run_id(), keyword()) :: {:ok, map()} | {:error, :not_found | term()}
  def get_run(run_id, opts \\ []), do: Continuum.Query.get_run(run_id, opts)

  @doc """
  Merges JSON-encodable search attributes into a durable run row.

  These attributes are external metadata: they are not journaled, and workflow
  code cannot read them during replay.

  Delegates to `Continuum.Query.set_attributes/3`.
  """
  @spec set_attributes(run_id(), map(), keyword()) :: :ok | {:error, term()}
  def set_attributes(run_id, attributes, opts \\ []),
    do: Continuum.Query.set_attributes(run_id, attributes, opts)

  # ---------------------------------------------------------------------------
  # Deterministic primitives — callable only from inside a workflow process.
  # Each consults the journal first; on first execution journals the value;
  # on replay returns the journaled value.
  # ---------------------------------------------------------------------------

  @doc """
  The current wall-clock time, journaled and replayed deterministically.

  Expands to a `Continuum.Runtime.Effect.run/2` call whose command carries the
  caller's call site and `&DateTime.utc_now/0` as the value producer.
  """
  defmacro now do
    escaped_command = __CALLER__ |> command_base(:now) |> Macro.escape()

    quote do
      Continuum.Runtime.Effect.run(
        {:side_effect, :now},
        {:command, unquote(escaped_command), &DateTime.utc_now/0}
      )
    end
  end

  @doc """
  The current UTC date, journaled and replayed deterministically.

  Expands to a `Continuum.Runtime.Effect.run/2` call whose command carries the
  caller's call site and `&Date.utc_today/0` as the value producer.
  """
  defmacro today do
    escaped_command = __CALLER__ |> command_base(:today) |> Macro.escape()

    quote do
      Continuum.Runtime.Effect.run(
        {:side_effect, :today},
        {:command, unquote(escaped_command), &Date.utc_today/0}
      )
    end
  end

  @doc """
  A v4 UUID, journaled and replayed deterministically.

  Expands to a `Continuum.Runtime.Effect.run/2` call whose command carries the
  caller's call site and `&Continuum.__generate_uuid4__/0` as the value
  producer.
  """
  defmacro uuid4 do
    escaped_command = __CALLER__ |> command_base(:uuid4) |> Macro.escape()

    quote do
      Continuum.Runtime.Effect.run(
        {:side_effect, :uuid4},
        {:command, unquote(escaped_command), &Continuum.__generate_uuid4__/0}
      )
    end
  end

  @doc """
  A pseudo-random float, journaled and replayed deterministically.

  The value is produced by `:rand.uniform_real/0`, so it lies strictly between
  `0.0` and `1.0`: unlike `:rand.uniform/0`, exactly `0.0` is never returned.

  Expands to a `Continuum.Runtime.Effect.run/2` call whose command carries the
  caller's call site and `&:rand.uniform_real/0` as the value producer.
  """
  defmacro random do
    escaped_command = __CALLER__ |> command_base(:random) |> Macro.escape()

    quote do
      Continuum.Runtime.Effect.run(
        {:side_effect, :random},
        {:command, unquote(escaped_command), &:rand.uniform_real/0}
      )
    end
  end

  @doc """
  General-purpose escape hatch: runs an impure read once and journals the
  result so that replays see the same value.

  `fun` is invoked on first execution only. Its return value is journaled and
  handed back on every later replay, so it must be serializable with
  `:erlang.term_to_binary/1` — pids, refs, ports, and similar local-only terms
  are rejected.

  This is a macro so that Continuum can capture the source call site and derive
  a stable command identity from it. Modules that `use Continuum.Workflow`
  already require `Continuum`; any other module must `require Continuum` before
  calling it.

  > #### Helper-module caveat {: .warning}
  >
  > The command identity includes the module and line of the call site. In a
  > *workflow* module this is safe: any edit changes the version hash, and
  > in-flight runs keep resuming through the old version's entrypoint. A
  > `Continuum.Pure` helper module has no such protection — if an edit moves a
  > `side_effect` call to a different line, its command identity changes and
  > in-flight runs replaying through it raise `Continuum.ReplayDriftError` on
  > the next deploy. Prefer keeping `side_effect` calls in the workflow module
  > itself.
  """
  defmacro side_effect(fun) do
    escaped_command = __CALLER__ |> command_base(:user) |> Macro.escape()

    quote do
      Continuum.__side_effect__(unquote(fun), unquote(escaped_command))
    end
  end

  # Runtime half of `side_effect/1`: wraps the zero-arity `fun` and the call-site
  # command in a `{:command, ...}` tuple and hands it to `Effect.run/2`.
  @doc false
  @spec __side_effect__((-> term()), term()) :: term()
  def __side_effect__(fun, command_base) when is_function(fun, 0),
    do: Effect.run({:side_effect, :user}, {:command, command_base, fun})

  @doc """
  Recovers an activity's raw return value from a compensation handle.

  An `activity/2` call that uses `compensate:` reports success as
  `{:ok, %Continuum.ActivityRef{}}` instead of a bare term. `unwrap/1` strips
  that wrapper again:

    * `unwrap(%Continuum.ActivityRef{raw_result: raw})` → `raw`
    * `unwrap({:ok, %Continuum.ActivityRef{} = ref})` → `ref.raw_result`
    * `unwrap(other)` → `other` (activities without `compensate:` are unchanged)
  """
  @doc since: "0.3.0"
  @spec unwrap(term()) :: term()
  def unwrap(value) do
    case value do
      %Continuum.ActivityRef{raw_result: raw} -> raw
      {:ok, %Continuum.ActivityRef{raw_result: raw}} -> raw
      other -> other
    end
  end

  @doc """
  Reports whether the caller is currently executing inside a workflow process.

  Handy for helper modules that need to branch on context. The answer comes
  from `Continuum.Runtime.Context.active?/0`.
  """
  @spec in_workflow?() :: boolean()
  def in_workflow?, do: Context.active?()

  @doc """
  Journaled patch marker for in-place, backward-compatible workflow changes.

      def run(input) do
        if Continuum.patched?(:add_fraud_check_v2) do
          activity FraudCheck.v2(input)
        else
          activity FraudCheck.v1(input)
        end
      end

  Inside a workflow, the first call to `patched?(name)` at a given source line
  journals a `patched` event with `value: true` and returns `true`. That value
  is replayed on resume, so a run never switches branch mid-flight. A run that
  is replaying history recorded *before* the patch line existed gets `false`
  without consuming any event, which keeps in-flight runs on the old path.

  Outside a workflow process (test setup, ordinary code) the result is `false`.

  Like `now/0` and `uuid4/0`, this is a macro so it can capture `__CALLER__` for
  a stable command identity; modules that call it must `require Continuum`
  (`use Continuum.Workflow` does this for you).
  """
  @doc since: "0.3.0"
  defmacro patched?(patch_name) do
    call_site = command_base(__CALLER__, :patched)

    # Record the patch site on the calling module at expansion time, before the
    # runtime call is emitted.
    register_patch_site(__CALLER__, patch_name, call_site)

    escaped_call_site = Macro.escape(call_site)

    quote do
      Continuum.__patched__(unquote(patch_name), unquote(escaped_call_site))
    end
  end

  # Runtime half of `patched?/1`: only consults the journal (via `Effect.run/2`)
  # when a workflow context is active; otherwise the answer is always `false`.
  @doc false
  def __patched__(patch_name, command_base) do
    case Context.active?() do
      inactive when inactive in [false, nil] -> false
      _active -> Effect.run({:patched, patch_name}, {:command, command_base})
    end
  end

  # ---------------------------------------------------------------------------

  # Builds the child spec for one runtime child of `instance`.
  #
  # `opts` is the caller-supplied setting for that child: `false` disables it
  # (nil is returned so the caller can filter it out), `true` means "no extra
  # options", and anything else is wrapped into a list and used as the start
  # options. The instance is always injected under `:instance`, and the spec id
  # is namespaced by the instance name.
  defp child(_module, false, _instance), do: nil

  defp child(module, opts, instance) do
    start_opts =
      case opts do
        true -> []
        other -> List.wrap(other)
      end

    Supervisor.child_spec(
      {module, Keyword.put(start_opts, :instance, instance)},
      id: {module, instance.name}
    )
  end

  # Child spec for the activity worker supervisor, or nil when the instance does
  # not use the `:builtin` activity executor. The `:activity_supervisor` option
  # follows the same `false` / `true` / options convention as `child/3`.
  defp activity_supervisor_child(opts, instance) do
    case instance do
      %{activity_executor: :builtin} ->
        supervisor_opts = Keyword.get(opts, :activity_supervisor, [])
        child(Continuum.Runtime.ActivityWorker.Supervisor, supervisor_opts, instance)

      _other ->
        nil
    end
  end

  # Generates a random (version 4) UUID string from 16 bytes of
  # `:crypto.strong_rand_bytes/1`, formatted as lowercase hex in the usual
  # 8-4-4-4-12 grouping.
  @doc false
  def __generate_uuid4__ do
    <<head::48, _version::4, mid::12, _variant::2, tail::62>> = :crypto.strong_rand_bytes(16)

    # Overwrite the version nibble with 4 and the two variant bits with 0b10,
    # then hex-encode the 16 bytes and slice the 32 digits into groups.
    <<g1::binary-size(8), g2::binary-size(4), g3::binary-size(4), g4::binary-size(4),
      g5::binary-size(12)>> =
      Base.encode16(<<head::48, 4::4, mid::12, 2::2, tail::62>>, case: :lower)

    Enum.join([g1, g2, g3, g4, g5], "-")
  end

  # Derives the command identity for a primitive expanded at `env`: the calling
  # module, function, and line, plus a digest of `kind`. The leading tag is
  # `:side_effect` for every kind.
  defp command_base(env, kind) do
    kind_digest = hash_term(kind)

    {:side_effect, env.module, env.function, env.line, kind_digest}
  end

  # Records a `patched?/1` call site on the module being compiled, under the
  # `:continuum_patch_sites` attribute. Nothing is recorded when the macro is
  # expanded outside a module.
  # NOTE(review): presumably the attribute is declared as accumulating
  # elsewhere (e.g. by `Continuum.Workflow`) — confirm.
  defp register_patch_site(%Macro.Env{module: nil}, _patch_name, _command), do: :ok

  defp register_patch_site(env, patch_name, command) do
    site = %{line: env.line, file: env.file, command_id: command, name: patch_name}

    Module.put_attribute(env.module, :continuum_patch_sites, site)
  rescue
    # Deliberately best-effort: failing to record the site must never break
    # compilation of the calling module.
    _ -> :ok
  end

  # Lowercase hex SHA-256 digest of a term's deterministic external encoding.
  defp hash_term(term) do
    encoded = :erlang.term_to_binary(term, [:deterministic])
    digest = :crypto.hash(:sha256, encoded)

    Base.encode16(digest, case: :lower)
  end
end