Skip to main content

lib/council_ex/runner/recorder_process.ex

defmodule CouncilEx.Runner.RecorderProcess do
  @moduledoc """
  GenServer that hosts a `CouncilEx.Recorder` callback module for a
  single run.

  Started by `CouncilEx.start/3` *before* the `RunServer` (so the
  subscribe to the run topic lands in time for `:run_started`).
  Spawned unsupervised (`GenServer.start/3` semantics). Subscribes to
  the run topic in `init/1`, dispatches each broadcast event to the
  user's `handle_event/2`, and calls `handle_finalize/2` exactly once
  on terminal outcome — either via the `:run_completed` /
  `:run_failed` PubSub events, or via `Process.monitor/1` if the
  `RunServer` dies without broadcasting.

  Internal — do not use directly. Configure via the `:recorder`
  option on `CouncilEx.start/3`.
  """

  use GenServer, restart: :temporary

  alias CouncilEx.PubSub

  defstruct [:run_id, :module, :user_state, :run_pid, :run_monitor, :finalized]

  @type t :: %__MODULE__{
          run_id: String.t(),
          module: module(),
          user_state: term(),
          run_pid: pid() | nil,
          run_monitor: reference() | nil,
          finalized: boolean()
        }

  @doc """
  Start a recorder process for `run_id` with the given user `module`
  and `args`.

  This call subscribes to the run topic synchronously and invokes
  `module.init(run_id, args)` before returning, guaranteeing no events
  are missed between recorder start and `RunServer` start.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts)
  end

  @doc "Unsupervised + unlinked spawn (mirrors `GenServer.start/3`)."
  @spec start(keyword()) :: GenServer.on_start()
  def start(opts) do
    GenServer.start(__MODULE__, opts)
  end

  @doc """
  Inform the recorder of the `RunServer` pid so it can monitor for
  crashes that don't produce a terminal broadcast.

  Sent from `CouncilEx.start/3` after `DynamicSupervisor.start_child/2`
  succeeds.
  """
  @spec set_run_pid(pid(), pid()) :: :ok
  def set_run_pid(recorder_pid, run_pid) when is_pid(recorder_pid) and is_pid(run_pid) do
    GenServer.cast(recorder_pid, {:set_run_pid, run_pid})
  end

  @impl GenServer
  def init(opts) do
    run_id = Keyword.fetch!(opts, :run_id)
    module = Keyword.fetch!(opts, :module)
    args = Keyword.get(opts, :args, %{})

    :ok = PubSub.subscribe(topic(run_id))

    case module.init(run_id, args) do
      {:ok, user_state} ->
        {:ok,
         %__MODULE__{
           run_id: run_id,
           module: module,
           user_state: user_state,
           finalized: false
         }}

      {:stop, reason} ->
        :ok = PubSub.unsubscribe(topic(run_id))
        {:stop, reason}
    end
  end

  @impl GenServer
  def handle_cast({:set_run_pid, run_pid}, %__MODULE__{finalized: false} = state) do
    ref = Process.monitor(run_pid)
    {:noreply, %{state | run_pid: run_pid, run_monitor: ref}}
  end

  def handle_cast({:set_run_pid, _run_pid}, state), do: {:noreply, state}

  @impl GenServer
  def handle_info({:run_completed, run_id, result}, %__MODULE__{run_id: run_id} = state) do
    finalize({:ok, result}, state)
  end

  def handle_info(
        {:run_failed, run_id, errors, result},
        %__MODULE__{run_id: run_id} = state
      ) do
    outcome = classify_failure(errors, result)
    finalize(outcome, state)
  end

  def handle_info(
        {:DOWN, ref, :process, _pid, reason},
        %__MODULE__{run_monitor: ref, finalized: false} = state
      ) do
    finalize({:error, {:runserver_crashed, reason}}, state)
  end

  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state), do: {:noreply, state}

  def handle_info(event, %__MODULE__{run_id: run_id, finalized: false} = state)
      when is_tuple(event) and elem(event, 1) == run_id do
    {:ok, user_state} = state.module.handle_event(event, state.user_state)
    {:noreply, %{state | user_state: user_state}}
  end

  def handle_info(_other, state), do: {:noreply, state}

  defp finalize(_outcome, %__MODULE__{finalized: true} = state), do: {:stop, :normal, state}

  defp finalize(outcome, %__MODULE__{} = state) do
    :ok = state.module.handle_finalize(outcome, state.user_state)

    if state.run_monitor, do: Process.demonitor(state.run_monitor, [:flush])
    :ok = PubSub.unsubscribe(topic(state.run_id))

    {:stop, :normal, %{state | finalized: true}}
  end

  defp classify_failure(errors, _result) when is_list(errors) do
    if Enum.any?(errors, &cancelled?/1) do
      :cancelled
    else
      {:error, errors}
    end
  end

  defp classify_failure(other, _result), do: {:error, other}

  defp cancelled?(%{kind: :cancelled}), do: true
  defp cancelled?(_), do: false

  defp topic(run_id), do: "council_ex:run:#{run_id}"
end