defmodule CouncilEx.Runner.RecorderProcess do
@moduledoc """
GenServer that hosts a `CouncilEx.Recorder` callback module for a
single run.
Started by `CouncilEx.start/3` *before* the `RunServer` (so the
subscribe to the run topic lands in time for `:run_started`).
Spawned unsupervised (`GenServer.start/3` semantics). Subscribes to
the run topic in `init/1`, dispatches each broadcast event to the
user's `handle_event/2`, and calls `handle_finalize/2` exactly once
on terminal outcome — either via the `:run_completed` /
`:run_failed` PubSub events, or via `Process.monitor/1` if the
`RunServer` dies without broadcasting.
Internal — do not use directly. Configure via the `:recorder`
option on `CouncilEx.start/3`.
"""
use GenServer, restart: :temporary
alias CouncilEx.PubSub
defstruct [:run_id, :module, :user_state, :run_pid, :run_monitor, :finalized]
@type t :: %__MODULE__{
run_id: String.t(),
module: module(),
user_state: term(),
run_pid: pid() | nil,
run_monitor: reference() | nil,
finalized: boolean()
}
@doc """
Start a recorder process for `run_id` with the given user `module`
and `args`.
This call subscribes to the run topic synchronously and invokes
`module.init(run_id, args)` before returning, guaranteeing no events
are missed between recorder start and `RunServer` start.
"""
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
@doc "Unsupervised + unlinked spawn (mirrors `GenServer.start/3`)."
@spec start(keyword()) :: GenServer.on_start()
def start(opts) do
GenServer.start(__MODULE__, opts)
end
@doc """
Inform the recorder of the `RunServer` pid so it can monitor for
crashes that don't produce a terminal broadcast.
Sent from `CouncilEx.start/3` after `DynamicSupervisor.start_child/2`
succeeds.
"""
@spec set_run_pid(pid(), pid()) :: :ok
def set_run_pid(recorder_pid, run_pid) when is_pid(recorder_pid) and is_pid(run_pid) do
GenServer.cast(recorder_pid, {:set_run_pid, run_pid})
end
@impl GenServer
def init(opts) do
run_id = Keyword.fetch!(opts, :run_id)
module = Keyword.fetch!(opts, :module)
args = Keyword.get(opts, :args, %{})
:ok = PubSub.subscribe(topic(run_id))
case module.init(run_id, args) do
{:ok, user_state} ->
{:ok,
%__MODULE__{
run_id: run_id,
module: module,
user_state: user_state,
finalized: false
}}
{:stop, reason} ->
:ok = PubSub.unsubscribe(topic(run_id))
{:stop, reason}
end
end
@impl GenServer
def handle_cast({:set_run_pid, run_pid}, %__MODULE__{finalized: false} = state) do
ref = Process.monitor(run_pid)
{:noreply, %{state | run_pid: run_pid, run_monitor: ref}}
end
def handle_cast({:set_run_pid, _run_pid}, state), do: {:noreply, state}
@impl GenServer
def handle_info({:run_completed, run_id, result}, %__MODULE__{run_id: run_id} = state) do
finalize({:ok, result}, state)
end
def handle_info(
{:run_failed, run_id, errors, result},
%__MODULE__{run_id: run_id} = state
) do
outcome = classify_failure(errors, result)
finalize(outcome, state)
end
def handle_info(
{:DOWN, ref, :process, _pid, reason},
%__MODULE__{run_monitor: ref, finalized: false} = state
) do
finalize({:error, {:runserver_crashed, reason}}, state)
end
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state), do: {:noreply, state}
def handle_info(event, %__MODULE__{run_id: run_id, finalized: false} = state)
when is_tuple(event) and elem(event, 1) == run_id do
{:ok, user_state} = state.module.handle_event(event, state.user_state)
{:noreply, %{state | user_state: user_state}}
end
def handle_info(_other, state), do: {:noreply, state}
defp finalize(_outcome, %__MODULE__{finalized: true} = state), do: {:stop, :normal, state}
defp finalize(outcome, %__MODULE__{} = state) do
:ok = state.module.handle_finalize(outcome, state.user_state)
if state.run_monitor, do: Process.demonitor(state.run_monitor, [:flush])
:ok = PubSub.unsubscribe(topic(state.run_id))
{:stop, :normal, %{state | finalized: true}}
end
defp classify_failure(errors, _result) when is_list(errors) do
if Enum.any?(errors, &cancelled?/1) do
:cancelled
else
{:error, errors}
end
end
defp classify_failure(other, _result), do: {:error, other}
defp cancelled?(%{kind: :cancelled}), do: true
defp cancelled?(_), do: false
defp topic(run_id), do: "council_ex:run:#{run_id}"
end