Skip to main content

lib/squidie/runtime/run_index_projection.ex

defmodule Squidie.Runtime.RunIndexProjection do
  @moduledoc """
  Rebuildable projection over a workflow's run-index journal.

  Run-index entries are lookup facts, not execution state. They let the
  Jido-native runtime rebuild "which runs exist for this workflow?" from the
  journal boundary without scanning storage adapter internals.

  Duplicate entries for the same run are idempotent when they carry the same
  workflow and timestamp. Conflicting or malformed persisted entries are kept as
  anomalies so callers can surface index drift without losing the valid portion
  of the read model.
  """

  alias Squidie.Runtime.DispatchProtocol.Entry
  alias Squidie.Runtime.RunProjection

  @type anomaly :: %{
          required(:reason) => atom(),
          required(:entry_type) => atom(),
          optional(:run_id) => String.t(),
          optional(:workflow) => String.t(),
          optional(:queue) => String.t()
        }

  @type run_summary :: %{
          required(:run_id) => String.t(),
          required(:workflow) => String.t(),
          required(:indexed_at) => DateTime.t(),
          required(:queue) => String.t()
        }

  @type t :: %__MODULE__{
          workflow: String.t() | nil,
          runs: %{optional(String.t()) => run_summary()},
          anomalies: [anomaly()]
        }

  defstruct workflow: nil,
            runs: %{},
            anomalies: []

  @doc """
  Returns a new empty run-index projection.
  """
  @spec new(String.t() | nil) :: t()
  def new(workflow \\ nil), do: %__MODULE__{workflow: workflow}

  @doc """
  Rebuilds a run-index projection from durable journal entries.
  """
  @spec rebuild([Entry.t()]) :: t()
  def rebuild(entries) when is_list(entries) do
    replay(new(), entries)
  end

  @doc """
  Replays additional run-index entries into an existing projection.
  """
  @spec replay(t(), [Entry.t()]) :: t()
  def replay(%__MODULE__{} = projection, entries) when is_list(entries) do
    Enum.reduce(entries, projection, &apply_entry/2)
  end

  @doc """
  Returns the workflow this index projection describes.
  """
  @spec workflow(t()) :: String.t() | nil
  def workflow(%__MODULE__{workflow: workflow}), do: workflow

  @doc """
  Returns indexed run summaries ordered by index timestamp and run id.
  """
  @spec runs(t()) :: [run_summary()]
  def runs(%__MODULE__{runs: runs}) do
    RunProjection.sorted_runs(runs)
  end

  @doc """
  Returns indexed run ids in the same deterministic order as `runs/1`.
  """
  @spec run_ids(t()) :: [String.t()]
  def run_ids(%__MODULE__{} = projection) do
    RunProjection.run_ids(projection.runs)
  end

  @doc """
  Returns malformed or conflicting index facts discovered during replay.
  """
  @spec anomalies(t()) :: [anomaly()]
  def anomalies(%__MODULE__{anomalies: anomalies}), do: RunProjection.anomalies(anomalies)

  defp apply_entry(%Entry{type: :run_indexed, data: data} = entry, %__MODULE__{} = projection) do
    if valid_index_data?(data) do
      index_run(projection, entry, data)
    else
      add_anomaly(projection, entry, :malformed_entry)
    end
  end

  defp apply_entry(%Entry{}, %__MODULE__{} = projection), do: projection

  defp valid_index_data?(data) when is_map(data) do
    is_binary(Map.get(data, :run_id)) and is_binary(Map.get(data, :workflow)) and
      is_binary(Map.get(data, :queue))
  end

  defp valid_index_data?(_data), do: false

  defp index_run(%__MODULE__{workflow: nil} = projection, entry, data) do
    index_run(%__MODULE__{projection | workflow: data.workflow}, entry, data)
  end

  defp index_run(%__MODULE__{workflow: workflow} = projection, entry, data)
       when data.workflow != workflow do
    add_anomaly(projection, entry, :conflicting_workflow)
  end

  defp index_run(%__MODULE__{runs: runs} = projection, entry, data) do
    summary = RunProjection.summary(data.run_id, data.workflow, data.queue, entry.occurred_at)

    case Map.fetch(runs, data.run_id) do
      {:ok, ^summary} ->
        projection

      {:ok, _existing_summary} ->
        add_anomaly(projection, entry, :conflicting_run_index)

      :error ->
        %__MODULE__{projection | runs: Map.put(runs, data.run_id, summary)}
    end
  end

  defp add_anomaly(%__MODULE__{} = projection, %Entry{} = entry, reason) do
    %__MODULE__{
      projection
      | anomalies: [RunProjection.anomaly(entry, reason) | projection.anomalies]
    }
  end
end