Skip to main content

lib/squidie.ex

# credo:disable-for-this-file ExSlop.Check.Readability.DocFalseOnPublicFunction
defmodule Squidie do
  @moduledoc """
  Public entrypoint for the Squidie runtime.

  The API exposed here stays focused on declarative workflow operations. Host
  applications start, inspect, and later control runs through this surface
  without needing to work directly with persistence internals.
  """

  alias Squidie.Config
  alias Squidie.ReadModel.Inspection
  alias Squidie.ReadModel.Listing
  alias Squidie.ReadModel.Timeline
  alias Squidie.Runs.DynamicWorkPreview
  alias Squidie.Runs.GraphInspection
  alias Squidie.Runtime.DispatchAgent
  alias Squidie.Runtime.DispatchProtocol
  alias Squidie.Runtime.Journal.Cancellation
  alias Squidie.Runtime.Journal.ChildStarter
  alias Squidie.Runtime.Journal.DynamicWork
  alias Squidie.Runtime.Journal.EntryBuilder
  alias Squidie.Runtime.Journal.Executor
  alias Squidie.Runtime.Journal.Options
  alias Squidie.Runtime.Journal.Replay
  alias Squidie.Runtime.Journal.SignalInterpreter
  alias Squidie.Runtime.Journal.Starter
  alias Squidie.Runtime.Journal.WorkflowDefinitionLoader
  alias Squidie.Runtime.ScheduleIdentity
  alias Squidie.Runtime.Signal
  alias Squidie.Runtime.WorkflowAgent
  alias Squidie.Workflow.ActionRegistry
  alias Squidie.Workflow.SpecPreview

  # Identifier lists for the read-model and runtime selections.
  # NOTE(review): their consumers are outside this excerpt; the public functions
  # visible here only ever match on `:read_model` and `:journal`.
  @read_models [:read_model]
  @runtimes [:journal]
  # NOTE(review): presumably a retry budget for dispatch scheduling; the code
  # that reads it is not visible here - confirm against its call site.
  @dispatch_schedule_retries 25
  # Option-key lists grouped by API family. The child-start and dynamic-work
  # lists act as allow-lists in `public_child_start_options/1` and
  # `public_dynamic_work_options/1`; the remaining lists are presumably consumed
  # by option helpers defined further down the file.
  @projection_snapshot_options [:queue, :now]
  @projection_list_options [:queue, :now]
  @journal_start_options [:runtime, :journal_storage, :queue, :now, :run_id]
  @journal_spec_start_options [
    :runtime,
    :journal_storage,
    :queue,
    :now,
    :run_id,
    :action_registry,
    :guardrail_registry
  ]
  @journal_child_start_options [
    :runtime,
    :journal_storage,
    :queue,
    :now,
    :child_key,
    :metadata
  ]
  @journal_control_options [:runtime, :journal_storage, :queue, :now]
  # Same keys as the list returned by `public_execute_option_keys/0`.
  @journal_execute_options [
    :runtime,
    :journal_storage,
    :queue,
    :owner_id,
    :lease_for,
    :heartbeat_interval_ms,
    :now,
    :action_registry,
    :guardrail_registry
  ]
  @journal_dynamic_work_options [
    :runtime,
    :read_model,
    :journal_storage,
    :queue,
    :now,
    :repo,
    :action_registry
  ]

  @typedoc """
  Structured validation errors returned by the public read-model APIs.

  Every error is an `{:invalid_option, {key, value}}` tuple. The inner tuple
  names what was rejected: the option list as a whole (`:opts`), a single
  option key (`:option`), or one of the named settings (`:read_model`,
  `:journal_storage`, `:run_id`).
  """
  @type read_option_error ::
          {:invalid_option,
           {:opts, term()}
           | {:option, term()}
           | {:read_model, term()}
           | {:journal_storage, nil}
           | {:run_id, term()}}

  @typedoc """
  Structured validation errors returned by the public start APIs.

  Every error is an `{:invalid_option, {key, value}}` tuple. The inner tuple
  names the rejected start setting (`:runtime`, `:journal_storage`, `:queue`,
  `:now`, `:run_id`, `:schedule_idempotency_key`) or the option list as a whole
  (`:opts`).
  """
  @type start_option_error ::
          {:invalid_option,
           {:opts, term()}
           | {:runtime, term()}
           | {:journal_storage, nil}
           | {:queue, term()}
           | {:now, term()}
           | {:run_id, term()}
           | {:schedule_idempotency_key, term()}}

  @doc """
  Resolves the Squidie configuration from the application environment.

  `overrides` is an optional keyword list of runtime overrides. The call is
  forwarded unchanged to `Squidie.Config.load/1`.
  """
  @spec config(keyword()) :: {:ok, Config.t()} | {:error, Config.config_error()}
  def config(overrides \\ []), do: Config.load(overrides)

  @doc """
  Resolves the Squidie configuration, raising when required keys are missing.

  `overrides` is an optional keyword list of runtime overrides. The call is
  forwarded unchanged to `Squidie.Config.load!/1`.
  """
  @spec config!(keyword()) :: Config.t()
  def config!(overrides \\ []), do: Config.load!(overrides)

  @doc """
  Starts a new run of `workflow` through its default trigger.

  Equivalent to `start(workflow, payload, [])`.

  Manual starts read journal runtime defaults from `config :squidie`. Unless a
  `journal_storage:` override is supplied, Squidie infers Ecto-backed journal
  storage from the configured `:repo`. Host apps therefore either set
  `config :squidie, repo: MyApp.Repo` globally or pass an explicit
  `journal_storage:` override from their runtime boundary.
  """
  @spec start(module(), map()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, start_option_error()}
          | {:error, Starter.start_error()}
          | {:error, {:dispatch_failed, term()}}
  def start(workflow, payload) when is_map(payload) do
    start(workflow, payload, [])
  end

  @doc """
  Starts a new workflow run from a three-argument call.

  Two call shapes are accepted:

    * `start(workflow, payload, overrides)` - default trigger, with runtime
      overrides passed as a list.
    * `start(workflow, trigger_name, payload)` - named trigger, without runtime
      overrides.

  When the last argument is a list but `payload` is not a map, the call returns
  `{:error, {:invalid_payload, :expected_map}}`.
  """
  @spec start(module(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, {:invalid_option, atom()}}
          | {:error, start_option_error()}
          | {:error, Starter.start_error()}
          | {:error, {:dispatch_failed, term()}}
  def start(workflow, payload, overrides) when is_map(payload) and is_list(overrides) do
    with :ok <- reject_public_start_options(overrides),
         {:ok, :journal = engine} <- runtime(overrides) do
      start_default_run_with_runtime(engine, workflow, payload, overrides)
    end
  end

  def start(_workflow, _payload, overrides) when is_list(overrides),
    do: {:error, {:invalid_payload, :expected_map}}

  @spec start(module(), atom(), map()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, start_option_error()}
          | {:error, Starter.start_error()}
          | {:error, {:dispatch_failed, term()}}
  def start(workflow, trigger_name, payload)
      when is_atom(trigger_name) and is_map(payload) do
    start(workflow, trigger_name, payload, [])
  end

  @doc """
  Starts a run through the named trigger, applying runtime overrides.

  Overrides exist for host-app test and integration boundaries. Runtime context
  injection is deliberately not part of this public function, so scheduled
  starts and other internal callers keep their idempotency metadata isolated.

  The Jido journal runtime is the default and infers Ecto-backed journal storage
  from the configured repo. Supply `journal_storage:` only when a host, test, or
  integration boundary needs a non-default storage adapter. Journal execution
  supports normal action steps, immediate built-in `:log` steps, built-in
  `:wait` steps in transition and dependency workflows, and manual `:pause` or
  `:approval` boundaries.

  Because manual starts resolve the default storage through `config :squidie`,
  host apps keep `:repo` configured globally unless they pass an explicit
  `journal_storage:` override.
  """
  @spec start(module(), atom(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, {:invalid_option, atom()}}
          | {:error, start_option_error()}
          | {:error, Starter.start_error()}
          | {:error, {:dispatch_failed, term()}}
  def start(workflow, trigger_name, payload, overrides)
      when is_atom(trigger_name) and is_map(payload) and is_list(overrides) do
    with :ok <- reject_public_start_options(overrides),
         {:ok, :journal = engine} <- runtime(overrides) do
      start_triggered_run_with_runtime(engine, workflow, trigger_name, payload, overrides)
    end
  end

  @doc """
  Starts a runtime-authored workflow spec through its default trigger.

  Validate runtime-authored specs against a host-owned action registry before
  activating them. Supplying `:action_registry` lets Squidie resolve stable
  action keys to approved executable modules at the start boundary.

  When `overrides` is a list but `payload` is not a map, the call returns
  `{:error, {:invalid_payload, :expected_map}}`.
  """
  @spec start_spec(Squidie.Workflow.Spec.t() | map(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, start_option_error()}
          | {:error, Starter.start_error()}
          | {:error, {:dispatch_failed, term()}}
  def start_spec(spec, payload, overrides \\ [])

  def start_spec(spec, payload, overrides) when is_map(payload) and is_list(overrides) do
    with :ok <- reject_public_start_options(overrides),
         {:ok, :journal = engine} <- runtime(overrides) do
      # `nil` stands in for the trigger name on the default-trigger path.
      start_spec_run_with_runtime(engine, spec, nil, payload, overrides)
    end
  end

  def start_spec(_spec, _payload, overrides) when is_list(overrides),
    do: {:error, {:invalid_payload, :expected_map}}

  @doc """
  Starts a runtime-authored workflow spec through a named trigger.

  Argument validation, when `overrides` is a list:

    * a `trigger_name` that is not an atom returns
      `{:error, {:invalid_trigger, trigger_name}}`;
    * an atom `trigger_name` with a non-map `payload` returns
      `{:error, {:invalid_payload, :expected_map}}`.
  """
  @spec start_spec(Squidie.Workflow.Spec.t() | map(), atom(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, start_option_error()}
          | {:error, Starter.start_error()}
          | {:error, {:dispatch_failed, term()}}
          | {:error, {:invalid_trigger, term()}}
          | {:error, {:invalid_payload, :expected_map}}
  def start_spec(spec, trigger_name, payload, overrides)
      when is_atom(trigger_name) and is_map(payload) and is_list(overrides) do
    with :ok <- reject_public_start_options(overrides),
         {:ok, :journal} <- runtime(overrides) do
      start_spec_run_with_runtime(:journal, spec, trigger_name, payload, overrides)
    end
  end

  # Only a non-atom trigger is reported as an invalid trigger. Previously a
  # valid atom trigger paired with a bad payload was also blamed on the trigger.
  def start_spec(_spec, trigger_name, _payload, overrides)
      when not is_atom(trigger_name) and is_list(overrides) do
    {:error, {:invalid_trigger, trigger_name}}
  end

  # Reaching this clause with a list of overrides means the trigger is an atom,
  # so the payload is the argument that failed validation.
  def start_spec(_spec, _trigger_name, _payload, overrides) when is_list(overrides) do
    {:error, {:invalid_payload, :expected_map}}
  end

  @doc """
  Dry-runs a runtime-authored workflow spec against a sample payload.

  The preview resolves runtime-authored action keys through the host-owned
  action registry and calls only actions that explicitly opt into dry-run
  behavior. No run is created, no journal state is appended, no dispatch
  attempts are scheduled, and durable action `run/2` callbacks are not called.

  When `overrides` is a list but `payload` is not a map, the call returns
  `{:error, {:invalid_payload, :expected_map}}`.
  """
  @spec preview_spec(Squidie.Workflow.Spec.t() | map(), map(), keyword()) ::
          {:ok, Squidie.Runs.SpecPreview.t()} | {:error, term()}
  def preview_spec(spec, payload, overrides \\ [])

  def preview_spec(spec, payload, overrides) when is_map(payload) and is_list(overrides),
    # `nil` stands in for the trigger name on the default-trigger path.
    do: SpecPreview.preview(spec, nil, payload, overrides)

  def preview_spec(_spec, _payload, overrides) when is_list(overrides),
    do: {:error, {:invalid_payload, :expected_map}}

  @doc """
  Previews a runtime-authored workflow spec through a named trigger.

  Argument validation, when `overrides` is a list:

    * a `trigger_name` that is not an atom returns
      `{:error, {:invalid_trigger, trigger_name}}`;
    * an atom `trigger_name` with a non-map `payload` returns
      `{:error, {:invalid_payload, :expected_map}}`.
  """
  @spec preview_spec(Squidie.Workflow.Spec.t() | map(), atom(), map(), keyword()) ::
          {:ok, Squidie.Runs.SpecPreview.t()} | {:error, term()}
  def preview_spec(spec, trigger_name, payload, overrides)
      when is_atom(trigger_name) and is_map(payload) and is_list(overrides) do
    SpecPreview.preview(spec, trigger_name, payload, overrides)
  end

  # Only a non-atom trigger is reported as an invalid trigger. Previously a
  # valid atom trigger paired with a bad payload was also blamed on the trigger.
  def preview_spec(_spec, trigger_name, _payload, overrides)
      when not is_atom(trigger_name) and is_list(overrides) do
    {:error, {:invalid_trigger, trigger_name}}
  end

  # Reaching this clause with a list of overrides means the trigger is an atom,
  # so the payload is the argument that failed validation.
  def preview_spec(_spec, _trigger_name, _payload, overrides) when is_list(overrides) do
    {:error, {:invalid_payload, :expected_map}}
  end

  # Start entry point that also accepts an initial runtime context map.
  # Hidden from generated docs via `@doc false`; the public `start/4` docs note
  # that context injection is kept out of the public API.
  @doc false
  @spec start_run_with_initial_context(module(), atom(), map(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:ok, {:duplicate_schedule_start, Squidie.ReadModel.Inspection.Snapshot.t()}}
          | {:error, Config.config_error()}
          | {:error, start_option_error()}
          | {:error, Starter.start_error()}
          | {:error, {:dispatch_failed, term()}}
  def start_run_with_initial_context(workflow, trigger_name, payload, initial_context, overrides)
      when is_atom(trigger_name) and is_map(payload) and is_map(initial_context) and
             is_list(overrides) do
    with :ok <- reject_public_start_options(overrides),
         {:ok, :journal = engine} <- runtime(overrides) do
      start_initial_context_run_with_runtime(
        engine,
        workflow,
        trigger_name,
        payload,
        initial_context,
        overrides
      )
    end
  end

  @doc """
  Starts a child workflow run from a native step context, using the child
  workflow's default trigger.

  Child starts are deterministic for the parent run, parent step, child
  workflow, child trigger, and required `:child_key`. Repeating a call with the
  same key returns the existing child run and does not duplicate parent lineage.

  Argument validation:

    * a non-list `overrides` returns `{:error, {:invalid_option, {:opts, :invalid}}}`;
    * a list `overrides` with a non-map `payload` returns
      `{:error, {:invalid_payload, :expected_map}}`.
  """
  @spec start_child_run(Squidie.Step.Context.t(), module(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, {:invalid_option, atom() | term()}}
          | {:error, ChildStarter.start_error()}
  def start_child_run(parent_context, child_workflow, payload, overrides \\ [])

  def start_child_run(parent_context, child_workflow, payload, overrides)
      when is_map(payload) and is_list(overrides) do
    with :ok <- public_child_start_options(overrides),
         {:ok, :journal} <- runtime(overrides),
         {:ok, definition} <- Squidie.Workflow.Definition.load(child_workflow),
         child_trigger = Squidie.Workflow.Definition.default_trigger(definition) do
      child_opts = journal_child_start_options(overrides)

      ChildStarter.start_child_run(
        parent_context,
        child_workflow,
        child_trigger,
        payload,
        child_opts
      )
    end
  end

  def start_child_run(_parent_context, _child_workflow, _payload, overrides)
      when is_list(overrides),
      do: {:error, {:invalid_payload, :expected_map}}

  def start_child_run(_parent_context, _child_workflow, _payload, _overrides),
    do: {:error, {:invalid_option, {:opts, :invalid}}}

  # Child start through an explicitly named child trigger. Clause order sets the
  # validation precedence: non-atom trigger (with an otherwise valid payload and
  # option list), then non-map payload, then non-list overrides.
  @spec start_child_run(Squidie.Step.Context.t(), module(), atom(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error()}
          | {:error, {:invalid_option, atom() | term()}}
          | {:error, {:invalid_trigger, :expected_atom}}
          | {:error, ChildStarter.start_error()}
  def start_child_run(parent_context, child_workflow, child_trigger, payload, overrides)
      when is_atom(child_trigger) and is_map(payload) and is_list(overrides) do
    with :ok <- public_child_start_options(overrides),
         {:ok, :journal} <- runtime(overrides) do
      child_opts = journal_child_start_options(overrides)

      ChildStarter.start_child_run(
        parent_context,
        child_workflow,
        child_trigger,
        payload,
        child_opts
      )
    end
  end

  def start_child_run(_parent_context, _child_workflow, child_trigger, payload, overrides)
      when not is_atom(child_trigger) and is_map(payload) and is_list(overrides),
      do: {:error, {:invalid_trigger, :expected_atom}}

  def start_child_run(_parent_context, _child_workflow, _child_trigger, _payload, overrides)
      when is_list(overrides),
      do: {:error, {:invalid_payload, :expected_map}}

  def start_child_run(_parent_context, _child_workflow, _child_trigger, _payload, _overrides),
    do: {:error, {:invalid_option, {:opts, :invalid}}}

  @doc """
  Looks up a single workflow run by its id.

  Host configuration selects the read model unless `overrides` says otherwise.
  The read model rebuilds a projection-backed snapshot from durable journal
  entries. Any non-matching read-model selection result is returned unchanged.
  """
  @spec inspect_run(String.t(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | read_option_error()
             | Config.config_error()
             | Inspection.snapshot_error()}
  def inspect_run(run_id, overrides \\ []) do
    case read_model(overrides) do
      {:ok, :read_model} -> inspect_projected_run(run_id, overrides)
      other -> other
    end
  end

  @doc """
  Looks up a single workflow run and returns it as graph-oriented inspection
  data.

  `inspect_run/2` remains the factual run snapshot; the graph projection derives
  its nodes and edges from that same durable state. Host configuration selects
  the read model unless `overrides` says otherwise.
  """
  @spec inspect_run_graph(String.t(), keyword()) ::
          {:ok, GraphInspection.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | read_option_error()
             | Config.config_error()
             | Inspection.snapshot_error()}
  def inspect_run_graph(run_id, overrides \\ []) do
    with {:ok, :read_model = source} <- read_model(overrides),
         {:ok, inspection} <- inspect_graph_source(run_id, source, overrides) do
      graph = graph_inspection(inspection, source, overrides)
      {:ok, graph}
    end
  end

  @doc """
  Looks up a single workflow run and returns it as chronological operator
  timeline events.

  The timeline is derived from the same durable read-model snapshot that
  `inspect_run/2` returns. Events carry stable ordering fields and
  redaction-safe details, so clients do not have to parse raw journal entries.
  """
  @spec inspect_run_timeline(String.t(), keyword()) ::
          {:ok, Timeline.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | read_option_error()
             | Config.config_error()
             | Inspection.snapshot_error()}
  def inspect_run_timeline(run_id, overrides \\ []) do
    case read_model(overrides) do
      {:ok, :read_model} ->
        case inspect_projected_run(run_id, overrides) do
          {:ok, %Inspection.Snapshot{} = snapshot} -> Inspection.timeline(snapshot)
          failure -> failure
        end

      unavailable ->
        unavailable
    end
  end

  @doc """
  Validates bounded dynamic work and returns the graph it would produce.

  This is the read-only companion to `record_dynamic_work/3`. It applies the
  same validation and normalization rules, but does not append a journal fact.
  Use it for UI previews, visual editor validation, and host-side dry runs
  before recording dynamic work durably. Pass `:action_registry` to require
  every dynamic node action key to be host-allowlisted before previewing.

  Returns `{:ok, preview}` on success. The first validation step that fails has
  its result returned unchanged. A non-list `overrides` returns
  `{:error, {:invalid_option, {:opts, :invalid}}}`.
  """
  @spec preview_dynamic_work(String.t(), map() | keyword(), keyword()) ::
          {:ok, DynamicWorkPreview.t()}
          | {:error,
             :not_found
             | Config.config_error()
             | read_option_error()
             | DynamicWork.dynamic_work_error()
             | term()}
  def preview_dynamic_work(run_id, attrs, overrides \\ [])

  def preview_dynamic_work(run_id, attrs, overrides) when is_list(overrides) do
    # The clause order below fixes error precedence: option keys are checked
    # first, then runtime and read-model selection, then the run id and clock,
    # and only then is the run snapshot loaded.
    with :ok <- public_dynamic_work_options(overrides),
         {:ok, :journal} <- runtime(overrides),
         {:ok, :read_model} <- read_model(overrides),
         {:ok, run_id} <- Options.thread_part(run_id, :run_id),
         {:ok, now} <- dynamic_work_time(overrides),
         {:ok, %Inspection.Snapshot{} = snapshot} <- inspect_projected_run(run_id, overrides),
         {:ok, context} <- dynamic_work_context(snapshot, overrides),
         {:ok, preview} <- DynamicWork.preview(run_id, attrs, now, context) do
      {:ok, dynamic_work_preview(snapshot, preview, overrides)}
    end
  end

  # Any non-list `overrides` is rejected before validation starts.
  def preview_dynamic_work(_run_id, _attrs, _overrides) do
    {:error, {:invalid_option, {:opts, :invalid}}}
  end

  @doc """
  Records bounded dynamic work for one workflow run.

  This API records inspection-only dynamic structure. It does not make dynamic
  nodes executable, schedule dispatch attempts, or alter terminal-state
  decisions. Use it when host code or a runtime step has already made a bounded
  fanout or planning decision and dashboards need durable, validated metadata
  about that decision.

  The attribute payload must include:

    * `:dynamic_key` - a stable non-empty string for this dynamic work group.
    * `:origin` - a map with `:runnable_key`, `:step`, and positive `:attempt`
      matching a planned runnable in the active run.
    * `:nodes` - one or more dynamic node maps with unique non-empty `:id`
      values that do not collide with declared workflow steps or previously
      recorded dynamic nodes.

  Optional `:edges`, `:metadata`, `:reason`, and `:status` values are normalized
  and redacted like other journal metadata. Pass `:action_registry` to require
  every dynamic node action key to be host-allowlisted before recording. Exact
  duplicate records are idempotent while the run remains active. Terminal runs,
  stale workflow definitions, invalid origins, node collisions, unknown options,
  and write conflicts return structured `{:error, reason}` tuples.

  A non-list `overrides` returns `{:error, {:invalid_option, {:opts, :invalid}}}`.
  """
  @spec record_dynamic_work(String.t(), map() | keyword(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | Config.config_error()
             | read_option_error()
             | DynamicWork.dynamic_work_error()
             | term()}
  def record_dynamic_work(run_id, attrs, overrides \\ [])

  def record_dynamic_work(run_id, attrs, overrides) when is_list(overrides) do
    # The clause order below fixes error precedence: option keys, runtime,
    # read model and journal storage are resolved before the run id and clock
    # are validated, and the snapshot is loaded only after all of those succeed.
    with :ok <- public_dynamic_work_options(overrides),
         {:ok, :journal} <- runtime(overrides),
         {:ok, :read_model} <- read_model(overrides),
         {:ok, storage} <- journal_storage(overrides),
         {:ok, run_id} <- Options.thread_part(run_id, :run_id),
         {:ok, now} <- dynamic_work_time(overrides),
         {:ok, %Inspection.Snapshot{} = snapshot} <- inspect_projected_run(run_id, overrides),
         {:ok, context} <- dynamic_work_context(snapshot, overrides),
         {:ok, intent} <-
           DynamicWork.new_entry(run_id, attrs, now, context) do
      record_dynamic_work_intent(storage, run_id, overrides, snapshot, intent)
    end
  end

  # Any non-list `overrides` is rejected before validation starts.
  def record_dynamic_work(_run_id, _attrs, _overrides) do
    {:error, {:invalid_option, {:opts, :invalid}}}
  end

  @doc """
  Records bounded dynamic work and schedules each dynamic node as executable work.

  This is the executable counterpart to `record_dynamic_work/3`. The dynamic
  work fact and the planned runnable intents are appended to the run thread in
  one optimistic write, then missing dispatch attempts are scheduled from that
  durable plan. Pass `:action_registry`; every dynamic node must include an
  approved `:action` key and may include an `:input` map for the scheduled
  attempt.

  A non-list `overrides` returns `{:error, {:invalid_option, {:opts, :invalid}}}`.
  """
  @spec schedule_dynamic_work(String.t(), map() | keyword(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | Config.config_error()
             | read_option_error()
             | DynamicWork.dynamic_work_error()
             | term()}
  def schedule_dynamic_work(run_id, attrs, overrides \\ [])

  def schedule_dynamic_work(run_id, attrs, overrides) when is_list(overrides) do
    # The clause order below fixes error precedence: options, runtime, read
    # model, storage, queue and action registry are resolved before the run id
    # and clock are validated and the snapshot is loaded.
    with :ok <- public_dynamic_work_options(overrides),
         {:ok, :journal} <- runtime(overrides),
         {:ok, :read_model} <- read_model(overrides),
         {:ok, storage} <- journal_storage(overrides),
         {:ok, queue} <- dynamic_work_queue(overrides),
         {:ok, registry} <- dynamic_work_action_registry(overrides),
         {:ok, run_id} <- Options.thread_part(run_id, :run_id),
         {:ok, now} <- dynamic_work_time(overrides),
         {:ok, %Inspection.Snapshot{} = snapshot} <- inspect_projected_run(run_id, overrides),
         {:ok, context} <- dynamic_work_context(snapshot, overrides),
         {:ok, intent} <-
           DynamicWork.new_entry(
             run_id,
             # NOTE(review): presumably sets `:scheduled` only when the caller
             # supplied no status - confirm in `put_new_dynamic_work_status/2`.
             put_new_dynamic_work_status(attrs, :scheduled),
             now,
             context
           ),
         :ok <- dynamic_work_origin_applied(intent, snapshot),
         {:ok, runnables} <- dynamic_work_runnables(run_id, intent, queue, now, registry) do
      schedule_dynamic_work_intent(storage, run_id, overrides, snapshot, intent, runnables, now)
    end
  end

  # Any non-list `overrides` is rejected before validation starts.
  def schedule_dynamic_work(_run_id, _attrs, _overrides) do
    {:error, {:invalid_option, {:opts, :invalid}}}
  end

  @doc """
  Describes why a workflow run is in its current runtime state.

  The result is structured diagnostic data aimed at host apps, CLIs, and
  dashboards. Reach for `inspect_run/2` when the factual run snapshot is needed,
  and for `explain_run/2` when an operator-facing surface needs the reason,
  evidence, and valid next actions for the run's current state.

  Host configuration selects the read model unless `overrides` says otherwise.
  The read model derives diagnostics from durable journal projections.
  """
  @spec explain_run(String.t(), keyword()) ::
          {:ok, Squidie.ReadModel.Explanation.Diagnostic.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | read_option_error()
             | Config.config_error()
             | Squidie.ReadModel.Explanation.explanation_error()}
  def explain_run(run_id, overrides \\ []) do
    case read_model(overrides) do
      {:ok, :read_model} -> explain_projected_run(run_id, overrides)
      other -> other
    end
  end

  @doc """
  Runs the next visible workflow attempt on the selected runtime.

  With the default journal runtime, one visible Jido journal-backed attempt is
  claimed, its declared step is run, and durable attempt completion or failure
  facts are appended. Supply `journal_storage:` only to override the inferred
  Ecto storage boundary.

  A list of overrides has its option keys validated before execution. Any other
  value is handed to the executor unchanged.
  """
  @spec execute_next(keyword()) :: Executor.execute_result()
  def execute_next(overrides \\ [])

  def execute_next(overrides) when is_list(overrides) do
    with :ok <- public_execute_options(overrides),
         {:ok, :journal} <- runtime(overrides) do
      overrides
      |> journal_execute_options()
      |> Executor.execute_next()
    end
  end

  def execute_next(overrides) do
    Executor.execute_next(overrides)
  end

  # Validates the option keys accepted by `execute_next/1`.
  #
  # Returns `:ok` when every key of a keyword list is listed by
  # `public_execute_option_keys/0`, and
  # `{:error, {:invalid_option, {:option, key}}}` for the first key that is not.
  # Input that is not a keyword list also returns `:ok` here, which leaves it to
  # the later steps of `execute_next/1`.
  defp public_execute_options(opts) do
    if Keyword.keyword?(opts) do
      allowed = public_execute_option_keys()

      # Collect the rejected keys as a list rather than testing an
      # `Enum.find/2` result for truthiness: a falsy key such as `nil:` or
      # `false:` would be found yet still treated as "nothing unsupported".
      case Enum.reject(Keyword.keys(opts), &(&1 in allowed)) do
        [] -> :ok
        [unsupported | _rest] -> {:error, {:invalid_option, {:option, unsupported}}}
      end
    else
      :ok
    end
  end

  # Option keys accepted by `execute_next/1`.
  #
  # Reads the shared `@journal_execute_options` attribute instead of repeating
  # the same nine keys, so the validator and the attribute cannot drift apart.
  # This matches how the child-start and dynamic-work validators use their
  # attributes.
  defp public_execute_option_keys, do: @journal_execute_options

  # Validates the override keys accepted by the public `start_child_run`
  # functions.
  #
  # Returns `:ok` when `opts` is a keyword list whose keys all appear in
  # `@journal_child_start_options`. Otherwise returns
  # `{:error, {:invalid_option, {:opts, :invalid}}}` for a non-keyword list, or
  # `{:error, {:invalid_option, {:option, key}}}` for the first unsupported key.
  defp public_child_start_options(opts) do
    if Keyword.keyword?(opts) do
      # Collect the rejected keys as a list rather than testing an
      # `Enum.find/2` result for truthiness: a falsy key such as `nil:` or
      # `false:` would be found yet still treated as "nothing unsupported".
      case Enum.reject(Keyword.keys(opts), &(&1 in @journal_child_start_options)) do
        [] -> :ok
        [unsupported | _rest] -> {:error, {:invalid_option, {:option, unsupported}}}
      end
    else
      {:error, {:invalid_option, {:opts, :invalid}}}
    end
  end

  # Validates the override keys accepted by the dynamic-work functions
  # (`preview_dynamic_work/3`, `record_dynamic_work/3`, `schedule_dynamic_work/3`).
  #
  # Returns `:ok` when `opts` is a keyword list whose keys all appear in
  # `@journal_dynamic_work_options`. Otherwise returns
  # `{:error, {:invalid_option, {:opts, :invalid}}}` for a non-keyword list, or
  # `{:error, {:invalid_option, {:option, key}}}` for the first unsupported key.
  defp public_dynamic_work_options(opts) do
    if Keyword.keyword?(opts) do
      # Collect the rejected keys as a list rather than testing an
      # `Enum.find/2` result for truthiness: a falsy key such as `nil:` or
      # `false:` would be found yet still treated as "nothing unsupported".
      case Enum.reject(Keyword.keys(opts), &(&1 in @journal_dynamic_work_options)) do
        [] -> :ok
        [unsupported | _rest] -> {:error, {:invalid_option, {:option, unsupported}}}
      end
    else
      {:error, {:invalid_option, {:opts, :invalid}}}
    end
  end

  @doc """
  Returns workflow runs, optionally narrowed by `filters`.

  Journal-backed runtime calls produce redacted listing summaries. When a caller
  needs detailed runtime state for one run, use `inspect_run/2` or
  `inspect_run_graph/2` with that run's id.
  """
  @spec list_runs([Listing.list_filter()], keyword()) ::
          {:ok, [Listing.Summary.t()]}
          | {:error, Config.config_error() | Listing.list_error()}
  def list_runs(filters \\ [], overrides \\ []) do
    case runtime(overrides) do
      {:ok, :journal = engine} -> list_runs_with_runtime(engine, filters, overrides)
      other -> other
    end
  end

  @doc """
  Asks the runtime to cancel an eligible workflow run.

  Any non-matching runtime selection result is returned unchanged.
  """
  @spec cancel(Ecto.UUID.t(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | Cancellation.cancel_error()}
  def cancel(run_id, overrides \\ []) do
    case runtime(overrides) do
      {:ok, :journal = engine} -> cancel_run_with_runtime(engine, run_id, overrides)
      other -> other
    end
  end

  @doc """
  Applies a Squidie-native runtime command signal.

  Ordinary application calls should go through the named helpers such as
  `start/3`, `cancel/2`, `resume/3`, `approve/3`, `reject/3`, and `replay/2`.
  `apply_signal/2` is for an agent, router, webhook, scheduler, or Jido interop
  boundary that has already normalized a request into a
  `Squidie.Runtime.Signal` envelope.

  The envelope API applies the same runtime commands for starts, cron starts,
  replays, cancellation, and manual decisions while preserving signal metadata,
  occurrence time, and idempotency keys in the journal command history.

  Argument validation:

    * a signal envelope with non-list `overrides` returns
      `{:error, {:invalid_option, {:opts, :invalid}}}`;
    * anything that is not a signal envelope returns `{:error, :invalid_signal}`.
  """
  @spec apply_signal(Signal.t(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error, Config.config_error() | term()}
  def apply_signal(signal, overrides \\ [])

  def apply_signal(%Signal{} = signal, overrides) when is_list(overrides) do
    case runtime(overrides) do
      {:ok, :journal} -> SignalInterpreter.apply(signal, journal_control_options(overrides))
      other -> other
    end
  end

  def apply_signal(%Signal{}, _overrides) do
    {:error, {:invalid_option, {:opts, :invalid}}}
  end

  def apply_signal(_signal, _overrides) do
    {:error, :invalid_signal}
  end

  @doc """
  Resumes a run that was intentionally paused for manual intervention.

  Equivalent to `resume(run_id, %{}, [])`.
  """
  @spec resume(Ecto.UUID.t()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | term()}
  def resume(run_id) do
    resume(run_id, %{}, [])
  end

  @doc """
  Resumes a paused run, given either configuration overrides or manual action
  attributes.

  A list second argument is treated as overrides and paired with empty
  attributes. A map second argument is treated as attributes and paired with
  empty overrides.
  """
  @spec resume(Ecto.UUID.t(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | term()}
  def resume(run_id, overrides) when is_list(overrides) do
    resume(run_id, %{}, overrides)
  end

  @spec resume(Ecto.UUID.t(), map()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | term()}
  def resume(run_id, attrs) when is_map(attrs) do
    resume(run_id, attrs, [])
  end

  @doc """
  Resumes a paused run, given both manual action attributes and configuration
  overrides.

  An `{:error, {:invalid_signal, reason}}` from building the control signal is
  translated through `public_signal_error/1`. Every other `{:error, _}` result
  from the preparation steps is returned unchanged.
  """
  @spec resume(Ecto.UUID.t(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | term()}
  def resume(run_id, attrs, overrides) when is_map(attrs) and is_list(overrides) do
    with {:ok, :journal} <- runtime(overrides),
         {:ok, signal} <- control_signal(:resume_run, run_id, attrs, overrides) do
      SignalInterpreter.apply(signal, journal_control_options(overrides))
    else
      {:error, failure} ->
        case failure do
          {:invalid_signal, reason} -> {:error, public_signal_error(reason)}
          _other -> {:error, failure}
        end
    end
  end

  @doc """
  Approves a paused approval step, resuming the run along its success path.

  An `{:error, {:invalid_signal, reason}}` from building the control signal is
  translated through `public_signal_error/1`. Every other `{:error, _}` result
  from the preparation steps is returned unchanged.
  """
  @spec approve(Ecto.UUID.t(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | term()}
  def approve(run_id, attrs, overrides \\ []) when is_map(attrs) and is_list(overrides) do
    with {:ok, :journal} <- runtime(overrides),
         {:ok, signal} <- control_signal(:approve_run, run_id, attrs, overrides) do
      SignalInterpreter.apply(signal, journal_control_options(overrides))
    else
      {:error, failure} ->
        case failure do
          {:invalid_signal, reason} -> {:error, public_signal_error(reason)}
          _other -> {:error, failure}
        end
    end
  end

  @doc """
  Rejects a paused approval step, resuming the run along its rejection path.

  An `{:error, {:invalid_signal, reason}}` from building the control signal is
  translated through `public_signal_error/1`. Every other `{:error, _}` result
  from the preparation steps is returned unchanged.
  """
  @spec reject(Ecto.UUID.t(), map(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | term()}
  def reject(run_id, attrs, overrides \\ []) when is_map(attrs) and is_list(overrides) do
    with {:ok, :journal} <- runtime(overrides),
         {:ok, signal} <- control_signal(:reject_run, run_id, attrs, overrides) do
      SignalInterpreter.apply(signal, journal_control_options(overrides))
    else
      {:error, failure} ->
        case failure do
          {:invalid_signal, reason} -> {:error, public_signal_error(reason)}
          _other -> {:error, failure}
        end
    end
  end

  @doc """
  Creates a new run from an earlier run and links it back to the original.

  By default a replay is blocked once the source run has completed an
  irreversible or non-compensatable step. Pass `allow_irreversible: true` only
  after an operator has reviewed the side effect and accepted re-execution.

  `:allow_irreversible` is split off as the replay option; all remaining
  overrides are treated as configuration overrides. Recognised error reasons are
  returned unchanged, and every other `{:error, reason}` is wrapped as
  `{:error, {:dispatch_failed, reason}}`.
  """
  @spec replay(Ecto.UUID.t(), keyword()) ::
          {:ok, Squidie.ReadModel.Inspection.Snapshot.t()}
          | {:error,
             :not_found
             | :invalid_run_id
             | Config.config_error()
             | Replay.replay_error()}
          | {:error, {:dispatch_failed, term()}}
  def replay(run_id, overrides \\ []) do
    {replay_opts, config_overrides} = Keyword.split(overrides, [:allow_irreversible])

    with {:ok, :journal = engine} <- runtime(config_overrides),
         {:ok, _run} = started <-
           replay_run_with_runtime(engine, run_id, replay_opts, config_overrides) do
      started
    else
      {:error, reason} = error when reason in [:not_found, :invalid_run_id] ->
        error

      # Tagged two-element reasons that callers receive as-is.
      {:error, {tag, _details}} = error
      when tag in [
             :unsafe_replay,
             :invalid_run,
             :invalid_option,
             :incompatible_workflow_definition,
             :invalid_replay_source
           ] ->
        error

      {:error, {:incompatible_workflow_definition, _operation, _metadata}} = error ->
        error

      # Everything else, exception structs included, is reported as a dispatch
      # failure.
      {:error, reason} ->
        {:error, {:dispatch_failed, reason}}
    end
  end

  # Journal-runtime replay: hands the run id and replay options to `Replay.replay/3`
  # together with the journal control options derived from the config overrides.
  defp replay_run_with_runtime(:journal, run_id, replay_opts, config_overrides) do
    control_opts = journal_control_options(config_overrides)
    Replay.replay(run_id, replay_opts, control_opts)
  end

  # Starts a journal run without a trigger name (`nil` is passed in its place).
  defp start_default_run_with_runtime(:journal, workflow, payload, overrides) do
    start_opts = journal_start_options(overrides)
    Starter.start_run(workflow, nil, payload, start_opts)
  end

  # Starts a journal run for the given trigger name.
  defp start_triggered_run_with_runtime(:journal, workflow, trigger_name, payload, overrides) do
    start_opts = journal_start_options(overrides)
    Starter.start_run(workflow, trigger_name, payload, start_opts)
  end

  # Starts a journal run from a workflow spec, using the spec-start option subset.
  defp start_spec_run_with_runtime(:journal, spec, trigger_name, payload, overrides) do
    spec_opts = journal_spec_start_options(overrides)
    Starter.start_spec_run(spec, trigger_name, payload, spec_opts)
  end

  # Starts a journal run seeded with `initial_context`. Anything other than
  # `{:ok, opts}` from the option builder is returned to the caller untouched.
  defp start_initial_context_run_with_runtime(
         :journal,
         workflow,
         trigger_name,
         payload,
         initial_context,
         overrides
       ) do
    case journal_initial_context_start_options(
           workflow,
           trigger_name,
           initial_context,
           overrides
         ) do
      {:ok, start_opts} -> Starter.start_run(workflow, trigger_name, payload, start_opts)
      other -> other
    end
  end

  # Journal options restricted to the keys in `@journal_child_start_options`.
  defp journal_child_start_options(overrides),
    do: configured_journal_options(overrides, @journal_child_start_options)

  # Lists runs from the resolved journal storage; a storage resolution result
  # other than `{:ok, storage}` is returned unchanged.
  defp list_runs_with_runtime(:journal, filters, overrides) do
    case journal_storage(overrides) do
      {:ok, storage} -> Listing.list(storage, filters, journal_list_options(overrides))
      other -> other
    end
  end

  # Builds a `:cancel_run` control signal and applies it to the journal runtime.
  # Signal validation failures are mapped through `public_signal_error/1`.
  defp cancel_run_with_runtime(:journal, run_id, overrides) do
    :cancel_run
    |> control_signal(run_id, overrides)
    |> case do
      {:error, {:invalid_signal, detail}} ->
        {:error, public_signal_error(detail)}

      {:error, _other} = failure ->
        failure

      {:ok, cancellation} ->
        SignalInterpreter.apply(cancellation, journal_control_options(overrides))
    end
  end

  # Cancel signals carry no attrs: validate the signal options, build the signal
  # and normalise the builder's result. Option errors are returned unchanged.
  defp control_signal(:cancel_run, run_id, overrides) do
    case control_signal_options(overrides) do
      {:ok, signal_opts} ->
        normalize_control_signal_result(Signal.cancel_run(run_id, signal_opts))

      other ->
        other
    end
  end

  # Builds the control signal named by `type` by invoking the `Signal` function of
  # that name with `run_id`, `attrs` and the validated signal options.
  defp control_signal(type, run_id, attrs, overrides) do
    case control_signal_options(overrides) do
      {:ok, signal_opts} ->
        built = apply(Signal, type, [run_id, attrs, signal_opts])
        normalize_control_signal_result(built)

      other ->
        other
    end
  end

  # Accumulates the signal options in three validated steps: occurrence time,
  # metadata, then idempotency key. The first failing step's error is returned.
  defp control_signal_options(overrides) do
    with {:ok, timed} <- control_signal_occurred_at(overrides),
         {:ok, annotated} <- control_signal_metadata(overrides, timed) do
      control_signal_idempotency_key(overrides, annotated)
    end
  end

  # Seeds the signal options from `:now`: a `DateTime` becomes `occurred_at`,
  # an absent key yields no options, anything else is an invalid option.
  defp control_signal_occurred_at(overrides) do
    overrides
    |> Keyword.fetch(:now)
    |> case do
      :error -> {:ok, []}
      {:ok, %DateTime{} = timestamp} -> {:ok, [occurred_at: timestamp]}
      {:ok, _other} -> {:error, {:invalid_option, {:now, :invalid}}}
    end
  end

  # Adds `:metadata` to the signal options when it is a map; leaves the options
  # untouched when the key is absent and rejects any non-map value.
  defp control_signal_metadata(overrides, opts) do
    overrides
    |> Keyword.fetch(:metadata)
    |> case do
      :error -> {:ok, opts}
      {:ok, meta} when is_map(meta) -> {:ok, Keyword.put(opts, :metadata, meta)}
      {:ok, _other} -> {:error, {:invalid_option, {:metadata, :invalid}}}
    end
  end

  # Adds `:idempotency_key` to the signal options when it is a non-empty binary;
  # an absent key leaves the options as they are, any other value is rejected.
  defp control_signal_idempotency_key(overrides, opts) do
    overrides
    |> Keyword.fetch(:idempotency_key)
    |> case do
      :error ->
        {:ok, opts}

      {:ok, key} when is_binary(key) and key != "" ->
        {:ok, Keyword.put(opts, :idempotency_key, key)}

      {:ok, _other} ->
        {:error, {:invalid_option, {:idempotency_key, :invalid}}}
    end
  end

  # Normalises the signal builder's result so every failure is tagged
  # `{:invalid_signal, reason}`; successes and already-tagged errors pass through.
  defp normalize_control_signal_result({:ok, %Signal{}} = built) do
    built
  end

  defp normalize_control_signal_result({:error, {:invalid_signal, _reason}} = tagged) do
    tagged
  end

  defp normalize_control_signal_result({:error, untagged}) do
    {:error, {:invalid_signal, untagged}}
  end

  # Translates internal signal validation reasons into the public error terms;
  # reasons without a dedicated translation stay tagged as `:invalid_signal`.
  defp public_signal_error(reason) do
    case reason do
      {:run_id, :invalid} -> :invalid_run_id
      {:occurred_at, :expected_datetime} -> {:invalid_option, {:now, :invalid}}
      other -> {:invalid_signal, other}
    end
  end

  # Start options for a run seeded with `initial_context`. When the context
  # carries a schedule idempotency key, the run id is derived from the workflow,
  # trigger and key, and `:duplicate_schedule_start` is set to `true`.
  defp journal_initial_context_start_options(workflow, trigger_name, initial_context, overrides) do
    base_opts = Keyword.put(journal_start_options(overrides), :initial_context, initial_context)

    case schedule_idempotency_key(initial_context) do
      {:ok, nil} ->
        {:ok, base_opts}

      {:ok, key} ->
        derived_run_id = schedule_run_id(workflow, trigger_name, key)

        scheduled_opts =
          base_opts
          |> Keyword.put(:run_id, derived_run_id)
          |> Keyword.put(:duplicate_schedule_start, true)

        {:ok, scheduled_opts}

      other ->
        other
    end
  end

  # Reads the `:idempotency_key` value out of the schedule context embedded in
  # `context` and validates its shape.
  defp schedule_idempotency_key(context) when is_map(context) do
    schedule = Squidie.Runtime.ScheduleContext.get(context)
    candidate = Squidie.Runtime.ScheduleContext.value(schedule, :idempotency_key)
    validate_schedule_idempotency_key(candidate)
  end

  # A schedule idempotency key is optional: `nil` and any binary are accepted,
  # every other term is reported as an invalid option.
  defp validate_schedule_idempotency_key(key) do
    if is_nil(key) or is_binary(key) do
      {:ok, key}
    else
      {:error, {:invalid_option, {:schedule_idempotency_key, :invalid}}}
    end
  end

  # Derives the run id for a scheduled start from the serialized workflow, the
  # serialized trigger and the idempotency key. The `{:ok, _}` match is
  # assertive: any other result from `ScheduleIdentity.run_id/3` raises.
  defp schedule_run_id(workflow, trigger_name, idempotency_key) do
    {:ok, derived} =
      ScheduleIdentity.run_id(
        Squidie.Workflow.Definition.serialize_workflow(workflow),
        Squidie.Workflow.Definition.serialize_trigger(trigger_name),
        idempotency_key
      )

    derived
  end

  # `:context` and `:initial_context` may not be supplied through the public start
  # options. The first one present (checked in that order) is reported.
  defp reject_public_start_options(overrides) do
    case Enum.find([:context, :initial_context], &Keyword.has_key?(overrides, &1)) do
      nil -> :ok
      forbidden -> {:error, {:invalid_option, forbidden}}
    end
  end

  # Resolves the timestamp used for dynamic work.
  #
  # Returns `{:ok, now}` when `:now` is a `DateTime`, `{:ok, DateTime.utc_now()}`
  # when the option is absent, and `{:error, {:invalid_option, {:now, :invalid}}}`
  # for any other supplied value (including an explicit `nil`).
  #
  # The clock is only read when `:now` is missing; `Keyword.get/3` would evaluate
  # the default eagerly on every call.
  defp dynamic_work_time(overrides) do
    case Keyword.fetch(overrides, :now) do
      {:ok, %DateTime{} = now} -> {:ok, now}
      {:ok, _invalid} -> {:error, {:invalid_option, {:now, :invalid}}}
      :error -> {:ok, DateTime.utc_now()}
    end
  end

  # Resolves the queue for dynamic work from the projected snapshot options,
  # falling back to "default", and validates it through `Options.queue/1`.
  defp dynamic_work_queue(overrides) do
    snapshot_opts = projected_snapshot_options(overrides)
    Options.queue(Keyword.get(snapshot_opts, :queue, "default"))
  end

  # The action registry is mandatory for dynamic work. `Keyword.fetch/2` already
  # returns `{:ok, registry}` on a hit, so only the miss needs translating.
  defp dynamic_work_action_registry(overrides) do
    with :error <- Keyword.fetch(overrides, :action_registry) do
      {:error, {:invalid_dynamic_work, {:action_registry, :required}}}
    end
  end

  # Sets `:status` on dynamic work attrs unless one is already present. Maps and
  # keyword lists are supported; any other term is returned unchanged.
  defp put_new_dynamic_work_status(attrs, status) do
    cond do
      is_map(attrs) -> Map.put_new(attrs, :status, status)
      is_list(attrs) and Keyword.keyword?(attrs) -> Keyword.put_new(attrs, :status, status)
      true -> attrs
    end
  end

  # Records a dynamic work entry on the run's journal.
  #
  # A `:duplicate` entry appends nothing and returns the snapshot it was given.
  defp record_dynamic_work_intent(
         _storage,
         _run_id,
         _overrides,
         %Inspection.Snapshot{} = snapshot,
         :duplicate
       ) do
    {:ok, snapshot}
  end

  # Otherwise the entry is appended with `expected_rev` taken from the snapshot's
  # run thread revision; a `:conflict` is handed to the conflict resolver.
  defp record_dynamic_work_intent(
         storage,
         run_id,
         overrides,
         %Inspection.Snapshot{} = snapshot,
         entry
       ) do
    expected_rev = snapshot.thread_revisions.run

    storage
    |> Squidie.Runtime.Journal.append_entries([entry], expected_rev: expected_rev)
    |> case do
      {:ok, _thread} -> inspect_projected_run(run_id, overrides)
      {:error, :conflict} -> resolve_dynamic_work_conflict(run_id, overrides, entry)
      {:error, _reason} = failure -> failure
    end
  end

  # Records a dynamic work entry and schedules its runnables.
  #
  # Duplicate intent: nothing is appended; only the dispatches that are still
  # pending for the run get scheduled.
  defp schedule_dynamic_work_intent(
         storage,
         run_id,
         overrides,
         %Inspection.Snapshot{},
         :duplicate,
         _runnables,
         %DateTime{} = now
       ) do
    schedule_pending_dynamic_dispatches(storage, run_id, overrides, now)
  end

  # New intent. The steps run in order and stop at the first failure:
  #   1. build the "runnables planned" entry for `runnables`
  #   2. resolve the queue and rebuild the dispatch agent for it
  #   3. make sure the run is queued (the agent returned here replaces the rebuilt one)
  #   4. append the intent entry and the planned entry together, guarded by the
  #      snapshot's run thread revision
  #   5. rebuild the workflow agent and schedule its pending dispatches
  # On success the freshly projected run is returned.
  defp schedule_dynamic_work_intent(
         storage,
         run_id,
         overrides,
         %Inspection.Snapshot{} = snapshot,
         entry,
         runnables,
         %DateTime{} = now
       ) do
    with {:ok, planned_entry} <- dynamic_work_runnables_planned_entry(run_id, runnables, now),
         {:ok, queue} <- dynamic_work_queue(overrides),
         {:ok, dispatch_agent} <- DispatchAgent.rebuild(storage, queue),
         {:ok, %{agent: dispatch_agent}} <-
           ensure_dynamic_work_run_queued(storage, dispatch_agent, run_id, now),
         {:ok, _thread} <-
           Squidie.Runtime.Journal.append_entries(storage, [entry, planned_entry],
             expected_rev: snapshot.thread_revisions.run
           ),
         {:ok, workflow_agent} <- WorkflowAgent.rebuild(storage, run_id),
         {:ok, _scheduled} <-
           WorkflowAgent.schedule_pending_dispatches(storage, workflow_agent, dispatch_agent,
             now: now
           ) do
      inspect_projected_run(run_id, overrides)
    else
      # A `:conflict` from ANY step above (not only the append) lands here and is
      # re-evaluated against the current projection of the run.
      {:error, :conflict} ->
        resolve_scheduled_dynamic_work_conflict(storage, run_id, overrides, entry, now)

      {:error, _reason} = error ->
        error
    end
  end

  # Re-evaluates a conflicted dynamic work entry against the current projection.
  # If the entry now counts as a duplicate, only the pending dispatches are
  # scheduled. If it would still be a new entry, the conflict is reported.
  # Validation errors (a terminal run included) are returned as they are.
  defp resolve_scheduled_dynamic_work_conflict(
         storage,
         run_id,
         overrides,
         entry,
         %DateTime{} = now
       ) do
    with {:ok, %Inspection.Snapshot{} = current} <- inspect_projected_run(run_id, overrides),
         {:ok, context} <- dynamic_work_context(current, overrides) do
      run_id
      |> DynamicWork.new_entry(entry.data, entry.occurred_at, context)
      |> case do
        {:ok, :duplicate} ->
          schedule_pending_dynamic_dispatches(storage, run_id, overrides, now)

        {:ok, _still_new} ->
          {:error, :conflict}

        {:error, _reason} = failure ->
          failure
      end
    end
  end

  # Schedules whatever dispatches are still pending for the run: rebuilds the
  # workflow and dispatch agents, makes sure the run is queued, schedules the
  # pending dispatches and returns the freshly projected run.
  defp schedule_pending_dynamic_dispatches(storage, run_id, overrides, %DateTime{} = now) do
    with {:ok, workflow} <- WorkflowAgent.rebuild(storage, run_id),
         {:ok, queue} <- dynamic_work_queue(overrides),
         {:ok, rebuilt_dispatch} <- DispatchAgent.rebuild(storage, queue),
         {:ok, %{agent: queued_dispatch}} <-
           ensure_dynamic_work_run_queued(storage, rebuilt_dispatch, run_id, now),
         {:ok, _scheduled} <-
           WorkflowAgent.schedule_pending_dispatches(storage, workflow, queued_dispatch,
             now: now
           ) do
      inspect_projected_run(run_id, overrides)
    end
  end

  # Entry point: starts the queueing loop with the `@dispatch_schedule_retries` budget.
  defp ensure_dynamic_work_run_queued(storage, dispatch_agent, run_id, %DateTime{} = now) do
    budget = @dispatch_schedule_retries
    ensure_dynamic_work_run_queued(storage, dispatch_agent, run_id, now, budget)
  end

  # Retry loop behind `ensure_dynamic_work_run_queued/4`.
  #
  # Budget exhausted: give up and report the conflict to the caller.
  defp ensure_dynamic_work_run_queued(_storage, _dispatch_agent, _run_id, %DateTime{}, 0) do
    {:error, :conflict}
  end

  # Tries to queue the run on the dispatch agent. A `:conflict` triggers a rebuild
  # of the dispatch agent from storage (using the agent's own queue) followed by
  # another attempt with one fewer retry; any other error is returned as-is.
  defp ensure_dynamic_work_run_queued(storage, dispatch_agent, run_id, %DateTime{} = now, retries) do
    case DispatchAgent.ensure_run_queued(storage, dispatch_agent, run_id, now: now) do
      {:ok, _queued} = ok ->
        ok

      {:error, :conflict} ->
        # A failure to validate the queue or rebuild the agent ends the loop and is
        # returned unchanged (no `else` on this `with`).
        with {:ok, queue} <- Options.queue(dispatch_agent.state.queue),
             {:ok, dispatch_agent} <- DispatchAgent.rebuild(storage, queue) do
          ensure_dynamic_work_run_queued(storage, dispatch_agent, run_id, now, retries - 1)
        end

      {:error, _reason} = error ->
        error
    end
  end

  # Builds the journal entry announcing the planned runnables for the run.
  defp dynamic_work_runnables_planned_entry(run_id, runnables, %DateTime{} = now),
    do: EntryBuilder.runnables_planned(run_id, runnables, now)

  # Builds one runnable per node of a dynamic work entry, preserving node order.
  # A duplicate entry plans nothing; the first node that fails aborts the build
  # and its error is returned.
  defp dynamic_work_runnables(_run_id, :duplicate, _queue, _now, _registry), do: {:ok, []}

  defp dynamic_work_runnables(run_id, %DispatchProtocol.Entry{} = entry, queue, now, registry) do
    collected =
      Enum.reduce_while(entry.data.nodes, {:ok, []}, fn node, {:ok, acc} ->
        case dynamic_work_runnable(run_id, entry.data, node, queue, now, registry) do
          {:ok, built} -> {:cont, {:ok, [built | acc]}}
          {:error, _reason} = failure -> {:halt, failure}
        end
      end)

    # Runnables were prepended while folding, so restore node order on success.
    with {:ok, reversed} <- collected do
      {:ok, Enum.reverse(reversed)}
    end
  end

  # Builds the first-attempt runnable for a single dynamic work node.
  #
  # The node's action is resolved through the registry (module and options) and
  # the node input is validated against it. Any `{:error, reason}` from those
  # steps is reported as `{:invalid_dynamic_work, {:nodes, {:action, reason}}}`.
  defp dynamic_work_runnable(run_id, dynamic_work, node, queue, %DateTime{} = now, registry) do
    action = Map.get(node, :action)
    input = Map.get(node, :input, %{})

    with {:ok, module} <- ActionRegistry.resolve_action(action, registry),
         {:ok, action_opts} <- ActionRegistry.resolve_action_opts(action, registry),
         :ok <- validate_dynamic_action_input(module, input, action_opts) do
      step = Map.fetch!(node, :id)
      # The key encodes run, node and attempt number; it doubles as the idempotency key.
      key = Enum.join([run_id, step, 1], ":")

      work = %{
        dynamic_key: dynamic_work.dynamic_key,
        action: action,
        module: module,
        action_opts: persisted_action_opts(module, action_opts),
        retry: Map.get(node, :retry),
        origin: dynamic_work.origin
      }

      runnable = %{
        run_id: run_id,
        runnable_key: key,
        idempotency_key: key,
        attempt_number: 1,
        queue: queue,
        step: step,
        input: input,
        visible_at: now,
        recovery: dynamic_work_recovery(action),
        dynamic?: true,
        dynamic_work: work
      }

      {:ok, runnable}
    else
      {:error, reason} ->
        {:error, {:invalid_dynamic_work, {:nodes, {:action, reason}}}}
    end
  end

  # Validates node input through the action module's optional
  # `validate_action_input/2`. Modules that do not export it accept any input; a
  # `nil` input is validated as an empty map.
  defp validate_dynamic_action_input(module, input, action_opts) do
    exports = module.__info__(:functions)

    if Enum.member?(exports, {:validate_action_input, 2}) do
      case module.validate_action_input(input || %{}, action_opts) do
        :ok ->
          :ok

        {:error, failure} ->
          {:error, {:action_input, Map.get(failure, :validation_errors, %{})}}
      end
    else
      :ok
    end
  end

  # Returns the action options to persist on a dynamic runnable.
  #
  # When `module` exports `persisted_action_opts/1`, that callback decides what is
  # persisted; otherwise the options are persisted as given. A non-atom module or
  # non-list options yield `[]`.
  #
  # `function_exported?/3` does not load modules, so the module is loaded first.
  # Without that, a module that has not been loaded yet would look as if it lacked
  # the callback and its options would be persisted unfiltered.
  defp persisted_action_opts(module, action_opts)
       when is_atom(module) and is_list(action_opts) do
    if Code.ensure_loaded?(module) and function_exported?(module, :persisted_action_opts, 1) do
      module.persisted_action_opts(action_opts)
    else
      action_opts
    end
  end

  defp persisted_action_opts(_module, _action_opts), do: []

  # Recovery descriptor attached to every dynamic runnable: all fields are fixed
  # except `:action`, which records the node's action.
  defp dynamic_work_recovery(action) do
    fixed = %{
      dynamic?: true,
      irreversible?: true,
      compensatable?: false,
      recovery: :manual_intervention,
      replay: :manual_review_required
    }

    Map.put(fixed, :action, action)
  end

  # Dynamic work may only originate from a runnable that the snapshot lists among
  # its applied runnable keys. Duplicate entries are not re-checked.
  defp dynamic_work_origin_applied(:duplicate, _snapshot), do: :ok

  defp dynamic_work_origin_applied(%DispatchProtocol.Entry{} = entry, snapshot) do
    origin_key = entry.data.origin.runnable_key

    case Enum.member?(snapshot.applied_runnable_keys, origin_key) do
      true -> :ok
      false -> {:error, {:invalid_dynamic_work, {:origin, :unapplied_runnable}}}
    end
  end

  # Builds the validation context for a dynamic work entry from a snapshot.
  #
  # Terminal runs skip loading the workflow definition (`definition: nil`).
  defp dynamic_work_context(%Inspection.Snapshot{terminal?: true} = snapshot, overrides) do
    context = %{
      terminal?: snapshot.terminal?,
      planned_runnables: snapshot.planned_runnables,
      dynamic_work: snapshot.dynamic_work,
      definition: nil
    }

    maybe_put_dynamic_work_action_registry({:ok, context}, overrides)
  end

  # Non-terminal runs need their workflow definition; a failure to load it is
  # returned unchanged.
  defp dynamic_work_context(%Inspection.Snapshot{} = snapshot, overrides) do
    case dynamic_work_definition(snapshot, overrides) do
      {:ok, definition} ->
        context = %{
          terminal?: snapshot.terminal?,
          planned_runnables: snapshot.planned_runnables,
          dynamic_work: snapshot.dynamic_work,
          definition: definition
        }

        maybe_put_dynamic_work_action_registry({:ok, context}, overrides)

      other ->
        other
    end
  end

  # Copies the `:action_registry` override into the context when one was given;
  # otherwise the context is returned as it came in.
  defp maybe_put_dynamic_work_action_registry({:ok, context}, overrides) do
    overrides
    |> Keyword.fetch(:action_registry)
    |> case do
      :error -> {:ok, context}
      {:ok, registry} -> {:ok, Map.put(context, :action_registry, registry)}
    end
  end

  # Builds a preview of the run graph as it would look with the previewed dynamic
  # work. A duplicate is already part of the snapshot, so the snapshot is
  # inspected unchanged; otherwise the previewed item is merged in first.
  defp dynamic_work_preview(%Inspection.Snapshot{} = snapshot, preview, overrides)
       when is_map(preview) do
    previewed_work = Map.fetch!(preview, :dynamic_work)
    duplicate? = Map.fetch!(preview, :duplicate?)

    graph_source =
      case duplicate? do
        falsy when falsy in [nil, false] ->
          merged = preview_dynamic_work_items(snapshot.dynamic_work, previewed_work)
          %Inspection.Snapshot{snapshot | dynamic_work: merged}

        _truthy ->
          snapshot
      end

    graph = graph_inspection(graph_source, :read_model, overrides)
    DynamicWorkPreview.new(snapshot.run_id, previewed_work, duplicate?, graph)
  end

  # Adds the previewed item to the existing dynamic work and orders the result by
  # `{dynamic_key, recorded_at}` using Erlang term ordering.
  # NOTE(review): if `recorded_at` holds `DateTime` structs, term ordering is not
  # chronological; confirm the field's type against the read model.
  defp preview_dynamic_work_items(dynamic_work, preview) when is_list(dynamic_work) do
    sort_key = fn item -> {Map.get(item, :dynamic_key), Map.get(item, :recorded_at)} end

    [preview | dynamic_work]
    |> Enum.sort_by(sort_key)
  end

  # Re-evaluates a conflicted dynamic work entry against the current projection.
  # A now-duplicate entry resolves to the current snapshot and a terminal run is
  # reported as such. Every other outcome, including other validation errors, is
  # reported as `:conflict`.
  defp resolve_dynamic_work_conflict(run_id, overrides, entry) do
    with {:ok, %Inspection.Snapshot{} = current} <- inspect_projected_run(run_id, overrides),
         {:ok, context} <- dynamic_work_context(current, overrides) do
      run_id
      |> DynamicWork.new_entry(entry.data, entry.occurred_at, context)
      |> case do
        {:ok, :duplicate} ->
          {:ok, current}

        {:error, {:invalid_dynamic_work, {:run, :terminal}}} = terminal ->
          terminal

        {:error, _reason} ->
          {:error, :conflict}

        {:ok, _still_new} ->
          {:error, :conflict}
      end
    end
  end

  # Projects the run's snapshot from journal storage. Only binary run ids are
  # accepted; a storage resolution failure is returned unchanged.
  defp inspect_projected_run(run_id, overrides) when is_binary(run_id) do
    case journal_storage(overrides) do
      {:ok, storage} ->
        Inspection.snapshot(storage, run_id, projected_snapshot_options(overrides))

      other ->
        other
    end
  end

  defp inspect_projected_run(_run_id, _overrides),
    do: {:error, {:invalid_option, {:run_id, :invalid}}}

  # The `:read_model` graph source is the projected run snapshot.
  defp inspect_graph_source(run_id, :read_model, overrides),
    do: inspect_projected_run(run_id, overrides)

  # Builds the graph inspection for a snapshot with options derived from the
  # snapshot, the read model source and the caller's overrides.
  defp graph_inspection(%Inspection.Snapshot{} = snapshot, read_model, overrides) do
    inspection_opts = graph_inspection_options(snapshot, read_model, overrides)
    GraphInspection.from_snapshot(snapshot, inspection_opts)
  end

  # Options for `GraphInspection.from_snapshot/2`: details are included only when
  # `:include_history` is set, and the definition may be `nil` if it cannot be loaded.
  defp graph_inspection_options(%Inspection.Snapshot{} = snapshot, read_model, overrides) do
    include_details = Keyword.get(overrides, :include_history, false)
    definition = graph_definition(snapshot, overrides)

    [source: read_model, include_details: include_details, definition: definition]
  end

  # Best-effort load of the workflow definition for graph inspection: any error,
  # or a snapshot without a binary run id and workflow, results in `nil`.
  defp graph_definition(%Inspection.Snapshot{run_id: run_id, workflow: workflow}, overrides)
       when is_binary(run_id) and is_binary(workflow) do
    with {:ok, storage} <- journal_storage(overrides),
         {:ok, _module, loaded} <- WorkflowDefinitionLoader.load(storage, run_id, workflow) do
      loaded
    else
      {:error, _ignored} -> nil
    end
  end

  defp graph_definition(%Inspection.Snapshot{}, _overrides) do
    nil
  end

  # Loads the workflow definition needed to validate dynamic work. Unlike the
  # graph variant, failures are reported, tagged as
  # `{:invalid_dynamic_work, {:definition, reason}}`.
  defp dynamic_work_definition(
         %Inspection.Snapshot{run_id: run_id, workflow: workflow},
         overrides
       )
       when is_binary(run_id) and is_binary(workflow) do
    with {:ok, storage} <- journal_storage(overrides),
         {:ok, _module, loaded} <- WorkflowDefinitionLoader.load(storage, run_id, workflow) do
      {:ok, loaded}
    else
      {:error, cause} ->
        {:error, {:invalid_dynamic_work, {:definition, cause}}}
    end
  end

  # Snapshots without a binary run id and workflow have no definition to load.
  defp dynamic_work_definition(%Inspection.Snapshot{}, _overrides),
    do: {:error, {:invalid_dynamic_work, {:definition, :missing}}}

  # Produces the run explanation from journal storage; a storage resolution
  # failure is returned unchanged.
  defp explain_projected_run(run_id, overrides) do
    case journal_storage(overrides) do
      {:ok, storage} ->
        snapshot_opts = projected_snapshot_options(overrides)
        Squidie.ReadModel.Explanation.explain(storage, run_id, snapshot_opts)

      other ->
        other
    end
  end

  # Resolves the read model to use. The overrides must be a keyword list;
  # anything else is reported as `{:invalid_option, {:opts, :invalid}}`.
  defp read_model(overrides) when is_list(overrides) do
    case validate_keyword_options(overrides) do
      :ok -> configured_read_model(overrides)
      invalid -> invalid
    end
  end

  defp read_model(_overrides) do
    {:error, {:invalid_option, {:opts, :invalid}}}
  end

  # Resolves the runtime to use. The overrides must be a keyword list; anything
  # else is reported as `{:invalid_option, {:opts, :invalid}}`.
  defp runtime(overrides) when is_list(overrides) do
    case validate_keyword_options(overrides) do
      :ok -> configured_runtime(overrides)
      invalid -> invalid
    end
  end

  defp runtime(_overrides) do
    {:error, {:invalid_option, {:opts, :invalid}}}
  end

  # Accepts only proper keyword lists as option overrides.
  defp validate_keyword_options(overrides) do
    case Keyword.keyword?(overrides) do
      true -> :ok
      false -> {:error, {:invalid_option, {:opts, :invalid}}}
    end
  end

  # An explicit `:read_model` override wins when it is one of `@read_models`;
  # an unsupported value is rejected and a missing one falls back to the config.
  defp configured_read_model(overrides) do
    overrides
    |> Keyword.fetch(:read_model)
    |> case do
      :error -> load_configured_read_model(overrides)
      {:ok, chosen} when chosen in @read_models -> {:ok, chosen}
      {:ok, _unsupported} -> {:error, {:invalid_option, {:read_model, :invalid}}}
    end
  end

  # Reads the read model from the loaded configuration; load errors pass through.
  defp load_configured_read_model(overrides) do
    overrides
    |> config_routing_overrides()
    |> Config.load()
    |> case do
      {:ok, %Config{read_model: configured}} -> {:ok, configured}
      {:error, _reason} = failure -> failure
    end
  end

  # An explicit `:runtime` override wins when it is one of `@runtimes`; an
  # unsupported value is rejected and a missing one falls back to the config.
  defp configured_runtime(overrides) do
    overrides
    |> Keyword.fetch(:runtime)
    |> case do
      :error -> load_configured_runtime(overrides)
      {:ok, chosen} when chosen in @runtimes -> {:ok, chosen}
      {:ok, _unsupported} -> {:error, {:invalid_option, {:runtime, :invalid}}}
    end
  end

  # Reads the runtime from the loaded configuration; load errors pass through.
  defp load_configured_runtime(overrides) do
    overrides
    |> config_routing_overrides()
    |> Config.load()
    |> case do
      {:ok, %Config{runtime: configured}} -> {:ok, configured}
      {:error, _reason} = failure -> failure
    end
  end

  # Resolves the journal storage through `Options.storage/1`. An explicit
  # `:journal_storage` override is used as given. Otherwise the configured storage
  # is used, and a config that is missing only `:journal_storage` resolves
  # `Options.storage(nil)`.
  defp journal_storage(overrides) do
    case Keyword.fetch(overrides, :journal_storage) do
      :error ->
        overrides
        |> config_routing_overrides()
        |> Config.load()
        |> case do
          {:ok, %Config{} = config} -> Options.storage(config.journal_storage)
          {:error, {:missing_config, [:journal_storage]}} -> Options.storage(nil)
          {:error, _reason} = failure -> failure
        end

      {:ok, explicit} ->
        Options.storage(explicit)
    end
  end

  # Journal options restricted to the keys in `@projection_snapshot_options`.
  defp projected_snapshot_options(overrides),
    do: configured_journal_options(overrides, @projection_snapshot_options)

  # Journal options restricted to the keys in `@journal_start_options`.
  defp journal_start_options(overrides),
    do: configured_journal_options(overrides, @journal_start_options)

  # Journal options restricted to the keys in `@journal_spec_start_options`.
  defp journal_spec_start_options(overrides),
    do: configured_journal_options(overrides, @journal_spec_start_options)

  # Journal options restricted to the keys in `@journal_control_options`.
  defp journal_control_options(overrides),
    do: configured_journal_options(overrides, @journal_control_options)

  # Journal options restricted to the keys in `@journal_execute_options`.
  defp journal_execute_options(overrides),
    do: configured_journal_options(overrides, @journal_execute_options)

  # Journal options restricted to the keys in `@projection_list_options`.
  defp journal_list_options(overrides),
    do: configured_journal_options(overrides, @projection_list_options)

  # Builds the option list handed to journal components, limited to `keys`.
  #
  # With a loadable config, the config supplies defaults for `:runtime`,
  # `:journal_storage` and `:queue`, and caller overrides take precedence. Without
  # one, only the caller's overrides (limited to `keys`) are used.
  defp configured_journal_options(overrides, keys) do
    case load_config_for_journal_options(overrides) do
      {:error, _reason} ->
        Keyword.take(overrides, keys)

      {:ok, %Config{} = config} ->
        config_defaults = [
          runtime: :journal,
          journal_storage: config.journal_storage,
          queue: config.queue
        ]

        permitted = Keyword.take(overrides, keys)
        Keyword.take(Keyword.merge(config_defaults, permitted), keys)
    end
  end

  # Loads the config for option building. If the load fails, it is retried once
  # with the `:journal_storage` override removed; the retry's result is returned
  # whatever it is.
  defp load_config_for_journal_options(overrides) do
    first_attempt = Config.load(config_routing_overrides(overrides))

    case first_attempt do
      {:ok, %Config{} = config} ->
        {:ok, config}

      {:error, _reason} ->
        without_storage = Keyword.delete(overrides, :journal_storage)
        Config.load(config_routing_overrides(without_storage))
    end
  end

  # Narrows the overrides to the keys that are forwarded to `Config.load/1`.
  defp config_routing_overrides(overrides),
    do: Keyword.take(overrides, ~w(repo runtime read_model journal_storage)a)
end