# lib/council_ex.ex

defmodule CouncilEx do
  @moduledoc """
  Multi-model LLM council workflows for Elixir.

  ## Usage

      defmodule MyCouncil do
        use CouncilEx

        member :optimist, MyApp.Members.Optimist, provider: :open_router, model: "openai/gpt-4o-mini"
        member :skeptic,  MyApp.Members.Skeptic,  provider: :open_router, model: "anthropic/claude-3.5-sonnet"

        round :independent_analysis

        chair MyApp.Members.Synthesizer, id: :chair, provider: :open_router, model: "openai/gpt-4o"
      end

      {:ok, result} = CouncilEx.run(MyCouncil, %{question: "go or wait?"})
  """

  alias CouncilEx.{PubSub, Result}
  alias CouncilEx.RunServer
  alias CouncilEx.Runner.RecorderProcess

  # Maps the short round names accepted by the `round/2` DSL macro to the
  # modules that implement them. Exposed through `__builtin_rounds__/0`, which
  # `__resolve_round__/3` and `__resolve_round_fn_ref__/3` consult when a
  # council declares a round by name instead of by module.
  @builtin_rounds %{
    independent_analysis: CouncilEx.Rounds.IndependentAnalysis,
    synthesis: CouncilEx.Rounds.Synthesis,
    peer_review: CouncilEx.Rounds.PeerReview,
    anonymized_peer_review: CouncilEx.Rounds.AnonymizedPeerReview,
    critique: CouncilEx.Rounds.Critique,
    ranking: CouncilEx.Rounds.Ranking,
    vote: CouncilEx.Rounds.Vote,
    iterate: CouncilEx.Rounds.Iterate,
    pairwise_elimination: CouncilEx.Rounds.PairwiseElimination
  }

  @doc """
  Launches a council run in the background and returns `{:ok, pid}` without
  waiting for the run to finish. Pass the pid to
  `CouncilEx.RunServer.run_id/1` to obtain the run id.

  The council is validated before the run is started; a council that fails
  validation is rejected with `{:error, {:invalid_council, errors}}`.

  ## Options

    * `:verbose` — `true | :debug | false` (default `false`). When set, a
      `CouncilEx.Verbose` tracer is attached to the run's topic and prints
      a timeline to stdout. `true` prints timeline, duration and tokens;
      `:debug` additionally dumps (truncated) response bodies. The tracer
      detaches on its own once a terminal event arrives.
    * `:verbose_io` — IO device the tracer writes to (default `:stdio`).
    * `:subscribe` — `true | false` (default `false`). With `true`, the
      calling process is subscribed to the run's PubSub topic BEFORE the
      RunServer is started, so no event can be missed. Without it the
      caller has to call `CouncilEx.PubSub.subscribe/1` AFTER `start/3`
      returns, which races against the RunServer's continue callback under
      load. Recommended whenever the full event timeline is needed.
    * `:registry` — name of the `Registry` the run is registered under.
      Defaults to the bundled `CouncilEx.Runner.Registry`. Override it
      only for tenant-level isolation (so two tenants cannot collide on
      `run_id`). The caller MUST start that registry beforehand:
      `{Registry, keys: :unique, name: MyApp.RunReg}`. The lookup APIs
      (`await/2`, `cancel/2`, `terminate_run/2`, `pid_for/2`,
      `list_active_runs/1`) accept the same `:registry` option and must
      be given the same name. Most applications should leave this alone —
      `run_id` is already a strong unique key.
    * `:relay_topics` — a topic string or a list of topics. Every PubSub
      event broadcast for this run is published to each of them as well.
      Useful for cross-run aggregation (one app-wide topic that sees every
      event of every council run, without an aggregator process).
    * `:run_id` — explicit run id (binary). When omitted the runtime
      generates one. Supply it when an external system has already given
      the run an identity (e.g. an Oban job that wants a stable id across
      retries, or a tenant with a domain-specific naming scheme). The
      caller is responsible for uniqueness.
    * `:recorder` — `{module, args}` tuple where `module` implements
      `CouncilEx.Recorder`. The runtime spawns a recorder process
      *before* the `RunServer` starts (so it is subscribed to the run
      topic in time for `:run_started`) and then dispatches lifecycle
      events to `module.handle_event/2`. The recorder exits after
      `module.handle_finalize/2` has been called with the terminal
      outcome. A crashing recorder does not take the run down.
  """
  @spec start(module() | CouncilEx.DynamicCouncil.t(), term(), keyword()) ::
          {:ok, pid()} | {:error, term()}
  def start(council, input, opts \\ []) do
    do_start(council, input, opts, :unlinked)
  end

  @doc """
  Same as `start/3`, except that the run server is linked to the calling
  process (`GenServer.start_link/3` semantics).

  If the caller dies the run dies with it, and if the runner dies the
  caller is taken down too (unless it traps exits). Reach for this when the
  caller owns the run's lifecycle — Oban workers, tests, or any setting
  where a leaked background run would keep burning tokens.

  Returns `{:ok, pid}`, or `{:error, {:invalid_council, errors}}` when the
  council fails validation. Call `CouncilEx.RunServer.run_id/1` on the pid
  when the auto-generated run id is needed (for example to subscribe from
  another process or to persist a durable handle).
  """
  @spec start_link(module() | CouncilEx.DynamicCouncil.t(), term(), keyword()) ::
          {:ok, pid()} | {:error, term()}
  def start_link(council, input, opts \\ []) do
    do_start(council, input, opts, :linked)
  end

  # Shared implementation of start/3 and start_link/3. Validates the council
  # and, only when it is valid, hands its spec to spawn_run_server/5. `mode`
  # is :linked or :unlinked. Dynamic councils are identified by the string
  # "dynamic:" <> id, module councils by the module atom itself.
  defp do_start(%CouncilEx.DynamicCouncil{} = council, input, opts, mode) do
    council
    |> CouncilEx.DynamicCouncil.validate()
    |> case do
      {:error, errors} ->
        {:error, {:invalid_council, errors}}

      :ok ->
        dynamic_spec = CouncilEx.DynamicCouncil.to_spec(council)
        spawn_run_server("dynamic:" <> council.id, dynamic_spec, input, opts, mode)
    end
  end

  defp do_start(council_module, input, opts, mode) when is_atom(council_module) do
    council_module
    |> validate()
    |> case do
      {:error, errors} ->
        {:error, {:invalid_council, errors}}

      :ok ->
        module_spec = council_module.__council__()
        spawn_run_server(council_module, module_spec, input, opts, mode)
    end
  end

  # Starts the RunServer for an already-validated council.
  #
  # `council_id` is the module atom or the "dynamic:<id>" string, `spec` the
  # council spec, `mode` either :linked or :unlinked. Returns whatever
  # RunServer.start/1 or RunServer.start_link/1 returns ({:ok, pid} or an
  # {:error, _} tuple).
  #
  # Order matters here: recorder, caller subscription and verbose tracer are
  # all set up BEFORE the RunServer is started so none of them can miss the
  # first events of the run.
  defp spawn_run_server(council_id, spec, input, opts, mode) do
    # A nil/false :run_id falls through to a generated id.
    run_id = Keyword.get(opts, :run_id) || generate_run_id()
    verbose = Keyword.get(opts, :verbose, false)
    subscribe? = Keyword.get(opts, :subscribe, false)
    registry = Keyword.get(opts, :registry, CouncilEx.RunServer.default_registry())

    # An explicit :recorder option (even `nil`) wins over the configured
    # default; the default is only consulted when the key is absent.
    recorder =
      case Keyword.fetch(opts, :recorder) do
        {:ok, value} -> value
        :error -> CouncilEx.Config.recorder()
      end

    # Capture the caller chain here (in the calling process) so that the RunServer
    # and its spawned tasks can walk back to find Mock scripts registered by test procs.
    callers = [self() | Process.get(:"$callers", [])]

    server_opts = [
      run_id: run_id,
      council: council_id,
      spec: spec,
      input: input,
      user_opts: opts,
      registry: registry,
      callers: callers
    ]

    # Spawn the recorder BEFORE the RunServer so it subscribes to the
    # run topic in time to receive :run_started and every subsequent
    # event without race.
    recorder_pid = maybe_start_recorder(recorder, run_id)

    # Subscribe the caller BEFORE starting the RunServer so that no events
    # broadcast from init/1 or the immediately-following continue callback
    # are missed. This eliminates a documented race where the caller would
    # otherwise subscribe after start/3 returns — by which point the
    # RunServer's handle_continue may have already broadcast :run_started
    # and beyond.
    if subscribe?, do: PubSub.subscribe(topic_for(run_id))

    # Attach the verbose tracer BEFORE the RunServer starts so it does not
    # miss the :run_started broadcast emitted from RunServer.init/1.
    if verbose, do: maybe_attach_verbose(run_id, verbose, opts)

    spawn_result =
      case mode do
        :linked -> RunServer.start_link(server_opts)
        :unlinked -> RunServer.start(server_opts)
      end

    case spawn_result do
      {:ok, pid} ->
        # Tell the recorder which RunServer to monitor so a crash without
        # a terminal broadcast still triggers handle_finalize/2.
        if recorder_pid, do: RecorderProcess.set_run_pid(recorder_pid, pid)

        {:ok, pid}

      {:error, _} = err ->
        # If the RunServer failed to start, tear the recorder down too —
        # otherwise it sits subscribed forever waiting for events that
        # will never arrive.
        # NOTE(review): the caller subscription and the verbose tracer set up
        # above are not undone on this path — confirm that is intended.
        if recorder_pid && Process.alive?(recorder_pid) do
          Process.exit(recorder_pid, :shutdown)
        end

        err
    end
  end

  # Starts the recorder process for `run_id` when a `{module, args}` recorder
  # is configured. Returns the recorder pid, or nil when no recorder is
  # configured or the recorder could not be started.
  defp maybe_start_recorder(nil, _run_id), do: nil

  defp maybe_start_recorder({module, args}, run_id) when is_atom(module) do
    [run_id: run_id, module: module, args: args]
    |> RecorderProcess.start()
    |> case do
      {:ok, recorder_pid} ->
        recorder_pid

      {:error, reason} ->
        # Best effort only: a recorder that refuses to start must not block
        # the run, so the failure is reported through telemetry and the run
        # proceeds without a recorder.
        measurements = %{system_time: System.system_time()}
        metadata = %{run_id: run_id, module: module, reason: reason}
        :telemetry.execute([:council_ex, :recorder, :start_failed], measurements, metadata)

        nil
    end
  end

  # Attaches a CouncilEx.Verbose tracer to `run_id`. `:debug` selects debug
  # mode; every other value is passed on as `true`. The value returned by
  # attach/2 is stored in the caller's process dictionary under
  # {:__council_verbose_pid__, run_id} so the synchronous run/3 path can wait
  # for the tracer to flush its output once await/2 has returned.
  defp maybe_attach_verbose(run_id, verbose, opts) do
    tracer_mode =
      case verbose do
        :debug -> :debug
        _ -> true
      end

    device = Keyword.get(opts, :verbose_io, :stdio)
    tracer = CouncilEx.Verbose.attach(run_id, mode: tracer_mode, io: device)

    Process.put({:__council_verbose_pid__, run_id}, tracer)
    :ok
  end

  @doc "Subscribes the calling process to the event topic of run `run_id`."
  @spec subscribe(String.t()) :: :ok
  def subscribe(run_id) when is_binary(run_id) do
    run_id
    |> topic_for()
    |> PubSub.subscribe()
  end

  @doc "Removes the calling process's subscription to the event topic of run `run_id`."
  @spec unsubscribe(String.t()) :: :ok
  def unsubscribe(run_id) when is_binary(run_id) do
    run_id
    |> topic_for()
    |> PubSub.unsubscribe()
  end

  @doc """
  Blocks until the run reaches a terminal state and returns its Result.

  The run may be given as a runner pid (its run id is read from the
  `RunServer`) or as a `run_id` binary.

  ## Options

    * `:await_timeout` — how long to wait, in milliseconds or `:infinity`
      (default `:infinity`).
    * `:registry` — registry the run was registered under; defaults to
      the bundled one. Pass it for caller-owned registries.

  ## Returns

    * `{:ok, result}` — the run completed.
    * `{:error, result}` — the run failed.
    * `{:error, :unknown_run}` — the registry has no entry for the run
      (never started, or already finished and deregistered).
    * `{:error, :timeout}` — no terminal event arrived in time.
  """
  @spec await(pid() | String.t(), keyword()) ::
          {:ok, Result.t()} | {:error, Result.t()} | {:error, :unknown_run | :timeout}
  def await(pid_or_run_id, opts \\ [])

  def await(pid, opts) when is_pid(pid) do
    pid
    |> CouncilEx.RunServer.run_id()
    |> await(opts)
  end

  def await(run_id, opts) when is_binary(run_id) do
    timeout = Keyword.get(opts, :await_timeout, :infinity)
    registry = Keyword.get(opts, :registry, CouncilEx.RunServer.default_registry())
    topic = topic_for(run_id)

    # Subscribe before the registry lookup so a terminal event broadcast in
    # between still lands in this process's mailbox.
    PubSub.subscribe(topic)

    case Registry.lookup(registry, run_id) do
      [{_runner, _value}] ->
        # The subscription taken above is left in place on this path.
        do_await(run_id, timeout, registry)

      [] ->
        # No registry entry: the run either never started or has already
        # finished and deregistered. Undo the subscription and give up.
        PubSub.unsubscribe(topic)
        {:error, :unknown_run}
    end
  end

  # Core wait loop of await/2. Three steps:
  #   1. take a terminal event that is already in the mailbox;
  #   2. otherwise ask the RunServer directly (covers a result produced
  #      before this process subscribed);
  #   3. if the server says to keep waiting, block for a terminal event up to
  #      `timeout`, returning {:error, :timeout} when none arrives.
  defp do_await(run_id, timeout, registry) do
    case take_terminal_event(run_id, 0) do
      :no_terminal_event ->
        case fetch_with_fallback(run_id, timeout, registry) do
          :keep_waiting ->
            case take_terminal_event(run_id, normalize_timeout(timeout)) do
              :no_terminal_event -> {:error, :timeout}
              outcome -> outcome
            end

          fetched ->
            fetched
        end

      outcome ->
        outcome
    end
  end

  # Selectively receives the terminal event of `run_id`, waiting at most
  # `wait` (milliseconds or :infinity). Returns {:ok, result} for a completed
  # run, {:error, result} for a failed one, :no_terminal_event on timeout.
  defp take_terminal_event(run_id, wait) do
    receive do
      {:run_completed, ^run_id, %Result{} = result} -> {:ok, result}
      {:run_failed, ^run_id, _errors, %Result{} = result} -> {:error, result}
    after
      wait -> :no_terminal_event
    end
  end

  # Asks the RunServer for its result directly. An exit of the call with
  # :noproc or :timeout is translated into :keep_waiting; any other exit
  # propagates. Everything fetch_result/3 returns is passed through as is.
  defp fetch_with_fallback(run_id, timeout, registry) do
    call_timeout = normalize_timeout_for_call(timeout)
    RunServer.fetch_result(run_id, call_timeout, registry: registry)
  catch
    :exit, {:noproc, _} -> :keep_waiting
    :exit, {:timeout, _} -> :keep_waiting
  end

  # Timeout for the mailbox wait: :infinity and integers pass through
  # unchanged; anything else raises FunctionClauseError.
  defp normalize_timeout(t) when t == :infinity or is_integer(t), do: t

  # Timeout for the direct call to the RunServer: integers pass through,
  # :infinity is capped at 60_000 ms.
  defp normalize_timeout_for_call(t) when is_integer(t), do: t
  defp normalize_timeout_for_call(:infinity), do: 60_000

  @doc """
  Asks a running council to stop cooperatively: the runner finalizes as
  `:cancelled` once the in-flight round has drained.

  The run may be given as a runner pid (preferred — a direct
  `GenServer.cast`) or as a `run_id` binary, which is resolved through the
  registry (pass `:registry` for a caller-owned one).

  Returns:
    * `:ok` — the cancel signal was sent.
    * `{:error, :unknown_run}` — the registry has no entry (run_id form only).
    * `{:error, :runner_dead}` — the registry holds a stale entry whose
      pid is gone, OR the supplied pid is dead. The caller can fall
      through to `terminate_run/2` or treat the run as already over.

  This is a *cooperative* signal. For an immediate kill (e.g. the run is
  wedged inside a non-responsive provider call) use `terminate_run/2`.
  """
  @spec cancel(pid() | String.t(), keyword()) ::
          :ok | {:error, :unknown_run | :runner_dead}
  def cancel(pid_or_run_id, opts \\ [])

  def cancel(pid, _opts) when is_pid(pid) do
    case Process.alive?(pid) do
      true -> GenServer.cast(pid, :cancel)
      false -> {:error, :runner_dead}
    end
  end

  def cancel(run_id, opts) when is_binary(run_id) do
    opts
    |> Keyword.get(:registry, CouncilEx.RunServer.default_registry())
    |> Registry.lookup(run_id)
    |> case do
      [{runner, _value}] -> cancel(runner, opts)
      [] -> {:error, :unknown_run}
    end
  end

  @doc """
  Resolves `run_id` to its runner pid by delegating to
  `CouncilEx.RunServer.pid_for/2`.

  Returns:
    * `{:ok, pid()}` — the runner is alive.
    * `{:error, :unknown_run}` — no registry entry exists.
    * `{:error, :runner_dead}` — a stale entry points at a dead pid.

  Handy for calling `Process.monitor/1` after the fact, or for checking
  that a run is still alive before issuing a long blocking call.
  """
  @spec pid_for(String.t(), keyword()) ::
          {:ok, pid()} | {:error, :unknown_run | :runner_dead}
  def pid_for(run_id, opts \\ []) when is_binary(run_id) do
    CouncilEx.RunServer.pid_for(run_id, opts)
  end

  @doc """
  Forcefully stops a run by sending its process a `:shutdown` exit signal
  via `Process.exit/2`.

  Accepts a runner pid or a `run_id` binary. A `run_id` is resolved through
  the registry (pass `:registry` for a caller-owned one); when it has no
  entry, `{:error, :unknown_run}` is returned. The pid form always returns
  `:ok`, including when the process is already dead.

  Unlike `cancel/1`, this is non-cooperative: the run process is killed
  immediately and no `:run_completed` / `:run_failed` event is emitted.
  Use it when:

    * the runner is wedged and `cancel/1` won't be drained, or
    * the host application is shutting down a tenant / session and
      needs runs gone synchronously.

  Subscribers should still receive a `:DOWN` message if they monitored
  the runner pid (returned from `start/3` / `start_link/3`).
  """
  @spec terminate_run(pid() | String.t(), keyword()) :: :ok | {:error, :unknown_run}
  def terminate_run(pid_or_run_id, opts \\ [])

  def terminate_run(pid, _opts) when is_pid(pid) do
    _ = Process.alive?(pid) and Process.exit(pid, :shutdown)
    :ok
  end

  def terminate_run(run_id, opts) when is_binary(run_id) do
    opts
    |> Keyword.get(:registry, CouncilEx.RunServer.default_registry())
    |> Registry.lookup(run_id)
    |> case do
      [] -> {:error, :unknown_run}
      [{runner, _value}] -> terminate_run(runner, opts)
    end
  end

  @doc """
  Checks a council without running it. Returns `:ok` or
  `{:error, [%{path, code, message}]}`.

  Both council forms are accepted. A `%CouncilEx.DynamicCouncil{}` is
  handed to `CouncilEx.DynamicCouncil.validate/1`. For a module-form
  council (atom) the compiled spec is walked and every member (and the
  chair) must carry `:provider` and `:model` in its resolved opts, with the
  provider registered under `config :council_ex, :providers`.

  `start/3` runs this validator first and answers
  `{:error, {:invalid_council, errs}}` on failure. Call it yourself when
  structured errors should surface before dispatch (e.g. from a builder
  UI, before charging tokens or spawning processes).

  Sub-council shim members (`provider: :__sub_council__`) are skipped;
  their inner councils are validated recursively when the sub-council
  ref is itself a `%DynamicCouncil{}`.
  """
  @spec validate(module() | CouncilEx.DynamicCouncil.t()) ::
          :ok | {:error, [map()]}
  def validate(%CouncilEx.DynamicCouncil{} = c) do
    CouncilEx.DynamicCouncil.validate(c)
  end

  def validate(council_module) when is_atom(council_module) do
    # A spec that cannot be built is returned as-is: build_module_spec/1
    # already produces the {:error, [error_map]} shape.
    with {:ok, spec} <- build_module_spec(council_module) do
      # Checks prepend to the accumulator, so it is reversed before being
      # returned.
      problems =
        []
        |> validate_module_members(spec)
        |> validate_module_chair(spec)

      if problems == [], do: :ok, else: {:error, Enum.reverse(problems)}
    end
  end

  # Obtains the spec of a module-form council without letting a broken
  # module crash the validator. Returns {:ok, spec}, or {:error, [error_map]}
  # when the module cannot be loaded, does not export __council__/0, or
  # raises while building its spec.
  defp build_module_spec(mod) do
    cond do
      not Code.ensure_loaded?(mod) ->
        module_spec_error(
          :not_a_council,
          "module #{inspect(mod)} not loaded; cannot validate as a council"
        )

      not function_exported?(mod, :__council__, 0) ->
        module_spec_error(
          :not_a_council,
          "#{inspect(mod)} does not export __council__/0; missing `use CouncilEx`?"
        )

      true ->
        try do
          {:ok, mod.__council__()}
        rescue
          exception ->
            module_spec_error(
              :spec_build_failed,
              "council module #{inspect(mod)} failed to build its spec: " <>
                Exception.message(exception)
            )
        end
    end
  end

  # Wraps one council-level (empty path) validation error in the error tuple.
  defp module_spec_error(code, message) do
    {:error, [%{path: [], code: code, message: message}]}
  end

  # Runs the per-member option checks over every member of the spec. Errors
  # are prepended to `errs`; each member is addressed by the path
  # ["members", index].
  defp validate_module_members(errs, %CouncilEx.Spec{members: members}) do
    indexed_members = Enum.with_index(members)

    Enum.reduce(indexed_members, errs, fn {{member_id, _member_mod, member_opts}, position},
                                          collected ->
      validate_module_member_opts(collected, member_id, member_opts, ["members", position])
    end)
  end

  # Applies the member option checks to the chair under the path ["chair"].
  # A council without a chair contributes no errors.
  defp validate_module_chair(errs, %CouncilEx.Spec{chair: {chair_id, _chair_mod, chair_opts}}) do
    validate_module_member_opts(errs, chair_id, chair_opts, ["chair"])
  end

  defp validate_module_chair(errs, %CouncilEx.Spec{chair: nil}), do: errs

  # Checks one member's resolved opts: :provider and :model must be present
  # and the provider must be registered. Sub-council shim members (provider
  # :__sub_council__) are exempt from all three checks.
  defp validate_module_member_opts(errs, id, opts, path) do
    case Keyword.get(opts, :provider) do
      :__sub_council__ ->
        errs

      _any_other_provider ->
        errs
        |> check_required_member_key(opts, :provider, id, path, :missing_provider)
        |> check_required_member_key(opts, :model, id, path, :missing_model)
        |> check_member_provider_registered(opts, id, path)
    end
  end

  # Prepends a `code` error to `errs` when `key` is absent from the member's
  # opts. Only presence is checked: a key explicitly set to nil counts as
  # present.
  defp check_required_member_key(errs, opts, key, id, path, code) do
    case Keyword.fetch(opts, key) do
      {:ok, _value} ->
        errs

      :error ->
        missing = %{
          path: path,
          code: code,
          message:
            "member #{inspect(id)} has no :#{key} after profile resolution; " <>
              "set it via :profile, council :default_profile, or inline opts"
        }

        [missing | errs]
    end
  end

  # Verifies that the member's :provider resolves through
  # CouncilEx.Provider.resolve/1. A missing/nil provider and the
  # :__sub_council__ sentinel are ignored here; a non-atom provider is
  # reported as :invalid_provider, an unresolvable atom as :unknown_provider.
  # Errors are prepended to `errs` under path ++ ["provider"].
  defp check_member_provider_registered(errs, opts, id, path) do
    provider = Keyword.get(opts, :provider)

    cond do
      provider in [nil, :__sub_council__] ->
        errs

      is_atom(provider) ->
        case CouncilEx.Provider.resolve(provider) do
          {:error, :unknown_provider} ->
            unknown = %{
              path: path ++ ["provider"],
              code: :unknown_provider,
              message:
                "member #{inspect(id)} references unregistered provider #{inspect(provider)}; " <>
                  "register under `config :council_ex, :providers`"
            }

            [unknown | errs]

          {:ok, _resolved} ->
            errs
        end

      true ->
        invalid = %{
          path: path ++ ["provider"],
          code: :invalid_provider,
          message: "member #{inspect(id)} :provider must be an atom, got #{inspect(provider)}"
        }

        [invalid | errs]
    end
  end

  @doc """
  Runs a council and blocks until it has finished.

  `council` may be a council module (compile-time DSL), a
  `%CouncilEx.DynamicCouncil{}` (data-only) or a `%CouncilEx.AutoCouncil{}`
  router. For dynamic councils, `Result.council` carries
  `"dynamic:" <> council.id`.

  An `AutoCouncil` is resolved to a concrete council first; when the
  resolution does not succeed its return value is passed back unchanged.
  Otherwise the resolution metadata is stored under `result.metadata.auto`.
  """
  @spec run(
          module() | CouncilEx.DynamicCouncil.t() | CouncilEx.AutoCouncil.t(),
          term(),
          keyword()
        ) ::
          {:ok, Result.t()} | {:error, Result.t() | term()}
  def run(council, input, opts \\ [])

  def run(%CouncilEx.AutoCouncil{} = auto, input, opts) do
    prompt = extract_prompt(input)

    case CouncilEx.AutoCouncil.resolve(auto, prompt) do
      {:ok, decision, meta} ->
        runnable = decision_to_runnable(decision)
        outcome = do_sync_run(runnable, input, opts)
        attach_auto_meta(outcome, meta)

      unresolved ->
        unresolved
    end
  end

  def run(%CouncilEx.DynamicCouncil{} = council, input, opts),
    do: do_sync_run(council, input, opts)

  def run(council_module, input, opts) when is_atom(council_module),
    do: do_sync_run(council_module, input, opts)

  @doc """
  Shortcut that sends `input` through the configured default `AutoCouncil`
  and runs the chosen council synchronously.

  The default configuration is read from `:council_ex, :auto`:

      config :council_ex, :auto,
        strategy:    :rules,
        catalog:     {:registry, :council},
        on_no_match: :error

  Per-call overrides are merged on top of it:

      CouncilEx.auto(%{question: "..."}, strategy: :cascade, options: [chain: [:rules]])

  One keyword list may carry both `AutoCouncil` struct opts and run opts.
  Struct opts (`:strategy`, `:catalog`, `:on_no_match`, `:name`,
  `:options`, `:provider_check`) build the router; every other key is
  forwarded to `run/3` — including `:verbose`, `:verbose_io`,
  `:await_timeout`:

      CouncilEx.auto(%{question: "audit my SEO"}, verbose: true)

  Returns `{:error, :no_auto_config}` when neither the application
  configuration nor the overrides contain any option.

  See `CouncilEx.AutoCouncil` for the full options list. Inspect
  `result.metadata.auto` to see which council was picked.
  """
  @spec auto(term(), keyword()) :: {:ok, Result.t()} | {:error, Result.t() | term()}
  def auto(input, overrides \\ []) do
    configured = Application.get_env(:council_ex, :auto, [])
    merged = Keyword.merge(configured, overrides)

    if merged == [] do
      {:error, :no_auto_config}
    else
      {struct_opts, run_opts} = split_auto_opts(merged)

      struct_opts
      |> CouncilEx.AutoCouncil.new()
      |> run(input, run_opts)
    end
  end

  # Keys that configure the AutoCouncil struct itself; every other key in the
  # merged options is treated as a run option.
  @auto_struct_keys [:strategy, :catalog, :on_no_match, :name, :options, :provider_check]

  # Splits merged options into {auto_council_struct_opts, run_opts}.
  defp split_auto_opts(opts) do
    Keyword.split(opts, @auto_struct_keys)
  end

  # Unwraps an AutoCouncil decision into something do_sync_run/3 can start:
  # the module of a :static decision, or the dynamic council carried by a
  # :dynamic or :built decision.
  defp decision_to_runnable({:static, mod}), do: mod

  defp decision_to_runnable({kind, %CouncilEx.DynamicCouncil{} = dynamic})
       when kind in [:dynamic, :built],
       do: dynamic

  # Stores the AutoCouncil resolution metadata on the Result of a finished
  # run, for both {:ok, result} and {:error, result}. Outcomes that do not
  # carry a Result are returned untouched.
  defp attach_auto_meta({tag, %Result{} = result}, meta) when tag in [:ok, :error] do
    {tag, put_auto_meta(result, meta)}
  end

  defp attach_auto_meta(other, _meta), do: other

  # Puts `meta` under the :auto key of the result's metadata map, starting
  # from an empty map when the metadata is nil.
  defp put_auto_meta(%Result{metadata: existing} = result, meta) do
    metadata = Map.put(existing || %{}, :auto, meta)
    %Result{result | metadata: metadata}
  end

  # Derives the prompt text handed to AutoCouncil.resolve/2 from the run
  # input: a binary is used as is, a map contributes its binary :question
  # (checked first) or :prompt, and anything else is rendered with inspect/1.
  defp extract_prompt(input) do
    case input do
      text when is_binary(text) -> text
      %{question: question} when is_binary(question) -> question
      %{prompt: prompt} when is_binary(prompt) -> prompt
      anything_else -> inspect(anything_else)
    end
  end

  # Synchronous run: starts the council linked to the caller, waits for its
  # terminal result and then gives an attached verbose tracer the chance to
  # flush. A failed start is returned unchanged.
  #
  # Both :await_timeout and :registry are forwarded to await/2. The run is
  # registered under the registry named in `opts`, so await/2 must look it up
  # in that same registry; without forwarding it, a run started with a
  # caller-owned registry would be reported as {:error, :unknown_run}.
  defp do_sync_run(council, input, opts) do
    case start_link(council, input, opts) do
      {:ok, pid} ->
        run_id = RunServer.run_id(pid)

        # await/2 applies its own defaults (:infinity timeout, bundled
        # registry) for whichever of these keys the caller did not pass.
        await_opts = Keyword.take(opts, [:await_timeout, :registry])

        result = await(run_id, await_opts)
        wait_for_verbose_tracer(run_id)
        result

      err ->
        err
    end
  end

  # A tracer attached through the :verbose option is remembered in the
  # process dictionary under {:__council_verbose_pid__, run_id}. Once await/2
  # has returned, remove that entry and wait up to one second for the tracer
  # to exit, so callers (tests in particular) can close their IO sink without
  # racing its final writes. Always returns :ok.
  defp wait_for_verbose_tracer(run_id) do
    tracer = Process.delete({:__council_verbose_pid__, run_id})

    if is_pid(tracer) do
      monitor = Process.monitor(tracer)

      receive do
        {:DOWN, ^monitor, :process, ^tracer, _reason} -> :ok
      after
        1_000 ->
          # Tracer still running: stop monitoring and drop a :DOWN message
          # that may already have been delivered.
          Process.demonitor(monitor, [:flush])
          :ok
      end
    else
      :ok
    end
  end

  @typedoc """
  Stable curated summary of an active (or recently completed) run, as
  returned by `list_active_runs/1`.
  """
  # NOTE(review): `council` is typed as `module()`, but dynamic councils are
  # started with the string id "dynamic:" <> council.id — confirm whether
  # summaries of dynamic runs carry that string and widen the type if so.
  @type run_summary :: %{
          run_id: binary(),
          council: module(),
          status: :pending | :running | :completed | :failed | :cancelled,
          current_round: %{name: atom(), idx: non_neg_integer()} | nil,
          rounds_completed: non_neg_integer(),
          started_at: DateTime.t()
        }

  @doc """
  Returns curated summaries of the runs that are currently registered —
  active ones as well as recently completed ones that have not been reaped
  yet.

  Every registered run is queried concurrently with a one-second budget.
  A run that does not answer in time, or whose query does not yield
  `{:ok, summary}`, is left out of the result.

  Options:
    * `:registry` — registry to scan; defaults to the bundled
      `CouncilEx.Runner.Registry`. Pass your own when using a
      caller-owned `Registry` (e.g. for tenant isolation, where each
      tenant has its own registry).
  """
  @spec list_active_runs(keyword()) :: [run_summary()]
  def list_active_runs(opts \\ []) do
    registry = Keyword.get(opts, :registry, CouncilEx.RunServer.default_registry())

    # Match spec selecting only the key (the run id) of every registry entry.
    run_ids = Registry.select(registry, [{{:"$1", :_, :_}, [], [:"$1"]}])

    answers =
      Task.async_stream(
        run_ids,
        fn run_id -> RunServer.state(run_id, registry: registry) end,
        timeout: 1_000,
        on_timeout: :kill_task
      )

    # Keep successful answers only; timeouts and error replies do not match
    # the generator pattern and are skipped.
    for {:ok, {:ok, summary}} <- answers, do: summary
  end

  # PubSub topic on which the events of `run_id` are broadcast.
  defp topic_for(run_id) do
    "council_ex:run:" <> to_string(run_id)
  end

  # Builds a fresh run id: "run-" followed by 12 cryptographically strong
  # random bytes in unpadded URL-safe Base64.
  defp generate_run_id do
    suffix =
      12
      |> :crypto.strong_rand_bytes()
      |> Base.url_encode64(padding: false)

    "run-" <> suffix
  end

  @doc false
  # Returns the map of built-in round names to their implementing modules.
  def __builtin_rounds__ do
    @builtin_rounds
  end

  # Sets up a module as a council: imports the DSL macros, registers the
  # module attributes the DSL writes to, and installs the
  # `__before_compile__/1` hook that generates `__council__/0`.
  defmacro __using__(_opts) do
    quote do
      # Kernel.round/1 is excluded so the DSL's round/1 can be imported
      # below without clashing with it.
      import Kernel, except: [round: 1]

      import CouncilEx,
        only: [
          member: 2,
          member: 3,
          round: 1,
          round: 2,
          chair: 2,
          router: 1,
          default_profile: 1
        ]

      # Members and rounds accumulate one entry per declaration; chair,
      # router and default profile hold a single value.
      Module.register_attribute(__MODULE__, :__council_members__, accumulate: true)
      Module.register_attribute(__MODULE__, :__council_rounds__, accumulate: true)
      Module.register_attribute(__MODULE__, :__council_chair__, accumulate: false)
      Module.register_attribute(__MODULE__, :__council_router__, accumulate: false)
      Module.register_attribute(__MODULE__, :__council_default_profile__, accumulate: false)

      @before_compile CouncilEx
    end
  end

  # Compile hook installed by `use CouncilEx`. Injects two functions into the
  # council module: `__council__/0`, which assembles the `%CouncilEx.Spec{}`
  # from the attributes written by the DSL macros, and `__providers__/0`.
  defmacro __before_compile__(_env) do
    quote do
      def __council__ do
        default_profile = @__council_default_profile__

        # The members attribute is reversed to restore declaration order;
        # each member's opts are resolved against the council's default
        # profile.
        members =
          @__council_members__
          |> Enum.reverse()
          |> Enum.map(fn {id, mod, opts} ->
            {id, mod, CouncilEx.__resolve_member_opts__(id, opts, default_profile)}
          end)

        # Rounds whose opts contain functions were stored as a reference to a
        # generated zero-arity function ({:opts_fn, name}); call it here to
        # obtain the real opts.
        declared_rounds =
          @__council_rounds__
          |> Enum.reverse()
          |> Enum.map(fn
            {mod, {:opts_fn, fun_name}} -> {mod, apply(__MODULE__, fun_name, [])}
            {mod, opts} -> {mod, opts}
          end)

        chair =
          case @__council_chair__ do
            nil ->
              nil

            {chair_id, chair_mod, chair_opts} ->
              {chair_id, chair_mod,
               CouncilEx.__resolve_member_opts__(chair_id, chair_opts, default_profile)}
          end

        router = @__council_router__

        # Appends a synthesis round when a chair is declared but no chair
        # round was listed (see CouncilEx.__finalize_rounds__/2).
        rounds = CouncilEx.__finalize_rounds__(declared_rounds, chair)

        %CouncilEx.Spec{
          council: __MODULE__,
          members: members,
          rounds: rounds,
          chair: chair,
          router: router,
          opts: []
        }
      end

      @doc """
      MapSet of providers this council references via its members + chair.

      Sentinels (`:__sub_council__`) and `nil` are filtered out. Sub-council
      members surface their inner providers — see
      `CouncilEx.AutoCouncil.Providers` for the recursion details.
      """
      def __providers__ do
        CouncilEx.AutoCouncil.Providers.from_spec(__council__())
      end
    end
  end

  @doc """
  Declare the default profile for members in this council.

  The value is stored in the council module and handed to
  `__resolve_member_opts__/3` for every member and for the chair when the
  spec is built. Declaring it more than once keeps the last value.
  """
  defmacro default_profile(profile_module) do
    quote do
      Module.put_attribute(
        __MODULE__,
        :__council_default_profile__,
        unquote(profile_module)
      )
    end
  end

  @doc """
  Declare a member in module form: `member :id, MemberModule, opts`.

  `id` must be a literal atom. The `{id, module, opts}` triple is added to
  the council's member list.
  """
  defmacro member(id, module, opts) when is_atom(id) do
    quote do
      @__council_members__ {unquote(id), unquote(module), unquote(opts)}
    end
  end

  @doc """
  Declare a member with an inline block.

  The block is evaluated with the `CouncilEx.Council.InlineMember.Block`
  helpers imported. The collected entries are turned into a generated
  member module via `CouncilEx.Council.InlineMember.build_module/3`, and
  that module is registered as the member `id`.
  """
  defmacro member(id, do: block) when is_atom(id) do
    # Per-member scratch attribute that collects what the block declares.
    attr_name = :"__inline_member_spec_#{id}__"

    quote do
      Module.register_attribute(__MODULE__, unquote(attr_name), accumulate: true)

      import CouncilEx.Council.InlineMember.Block,
        only:
          unquote(
            Enum.map(CouncilEx.Council.InlineMember.__block_keys__(), &{&1, 1}) ++
              [transform_input: 1, validate: 1]
          )

      # Record the scratch attribute in the process dictionary while the
      # block is evaluated.
      # NOTE(review): presumably read by the Block helpers to find the
      # attribute to write to — confirm in InlineMember.Block.
      Process.put({:__inline_member__, __MODULE__}, unquote(attr_name))

      unquote(block)

      Process.delete({:__inline_member__, __MODULE__})

      # The attribute is reversed to restore the order in which the block
      # declared its entries, then removed again.
      spec = Module.get_attribute(__MODULE__, unquote(attr_name)) |> Enum.reverse()
      Module.delete_attribute(__MODULE__, unquote(attr_name))

      member_module =
        CouncilEx.Council.InlineMember.build_module(__MODULE__, unquote(id), spec)

      member_opts = CouncilEx.Council.InlineMember.member_opts(spec)
      @__council_members__ {unquote(id), member_module, member_opts}
    end
  end

  # Module-form with no inline opts: `member :id, MyMember`. Equivalent to
  # `member(id, module, [])`. Useful when runtime opts come from a profile.
  # The second argument is the module AST (alias or atom), not a list; list
  # arguments are handled by the keyword-form clause of member/2.
  defmacro member(id, module) when is_atom(id) and not is_list(module) do
    quote do
      @__council_members__ {unquote(id), unquote(module), []}
    end
  end

  # Sub-council member: `member :id, council: SubCouncil, input_mapper: fun, opts: [...]`.
  # See moduledoc for `CouncilEx.Members.SubCouncilAdapter`.
  #
  # The keyword list is inspected at macro-expansion time:
  #   * a `:do` key (mixed with other keys) is rejected with an error;
  #   * a `:council` key builds a shim member through SubCouncilAdapter.build/3
  #     and registers it with the sentinel opts
  #     `[provider: :__sub_council__, model: "n/a"]`;
  #   * any other keyword list is rejected with an error.
  defmacro member(id, opts)
           when is_atom(id) and is_list(opts) do
    cond do
      Keyword.has_key?(opts, :do) ->
        raise "member/2 inline form must use `do: block` syntax — got opts: #{inspect(opts)}"

      Keyword.has_key?(opts, :council) ->
        sub_council = Keyword.fetch!(opts, :council)
        # Everything except :council is passed through to the adapter.
        sub_member_opts = Keyword.drop(opts, [:council])

        quote do
          shim =
            CouncilEx.Members.SubCouncilAdapter.build(
              unquote(id),
              unquote(sub_council),
              unquote(sub_member_opts)
            )

          @__council_members__ {unquote(id), shim, [provider: :__sub_council__, model: "n/a"]}
        end

      true ->
        raise "member/2 keyword form requires `:council` key — got: #{inspect(opts)}"
    end
  end

  @doc """
  Declare a round by name (built-in) or by module.

  `opts` may contain function literals; such opts are compiled into a
  generated function of the council module instead of being stored in a
  module attribute.
  """
  defmacro round(name_or_module, opts \\ []) do
    if CouncilEx.__opts_have_funs__?(opts) do
      # opts contain functions — can't be escaped into module attributes.
      # Generate a zero-arity function in the council module whose body
      # contains the opts AST (including closures). Use caller line for uniqueness.
      # Store {:opts_fn, fun_name} in @__council_rounds__; __council__/0 resolves it.
      # NOTE(review): the name is derived from the caller's file and line only,
      # so two such `round` declarations on the same line would generate the
      # same function name — confirm this cannot happen in practice.
      line = __CALLER__.line
      file_hash = :erlang.phash2(__CALLER__.file)
      fun_name = :"__round_opts_#{file_hash}_#{line}__"

      quote do
        @doc false
        def unquote(fun_name)(), do: unquote(opts)

        CouncilEx.__resolve_round_fn_ref__(
          __MODULE__,
          unquote(name_or_module),
          unquote(fun_name)
        )
      end
    else
      # Plain opts: resolve the round and store it together with its opts.
      quote do
        CouncilEx.__resolve_round__(__MODULE__, unquote(name_or_module), unquote(opts))
      end
    end
  end

  @doc """
  Declare an adaptive router. Accepts a module implementing
  `CouncilEx.Router` or a 2-arity anonymous fn `fn input, ctx -> [member_id] end`.

  The value is stored in the council module and ends up in the `:router`
  field of the council spec. Declaring it more than once keeps the last
  value.
  """
  defmacro router(spec) do
    quote do
      Module.put_attribute(__MODULE__, :__council_router__, unquote(spec))
    end
  end

  @doc """
  Declare the chair (final synthesizer).

  `opts` may carry an `:id` for the chair (default `:chair`). The id is
  removed from the opts before the `{id, module, opts}` triple is stored
  in the council module.
  """
  defmacro chair(module, opts) do
    quote do
      # Bind the opts expression once. Injecting `unquote(opts)` at every use
      # would evaluate the caller's expression several times.
      declared_opts = unquote(opts)

      chair_id = Keyword.get(declared_opts, :id, :chair)
      chair_opts = Keyword.delete(declared_opts, :id)

      Module.put_attribute(
        __MODULE__,
        :__council_chair__,
        {chair_id, unquote(module), chair_opts}
      )
    end
  end

  @doc false
  # Called from the `round/2` macro for opts without function literals:
  # resolves the round and appends {round_module, opts} to the council's
  # rounds attribute.
  def __resolve_round__(module, name_or_module, opts) when is_atom(name_or_module) do
    entry = {resolve_round_module!(name_or_module), opts}
    Module.put_attribute(module, :__council_rounds__, entry)
  end

  @doc false
  # Called from the `round/2` macro for opts that contain function literals:
  # resolves the round and appends {round_module, {:opts_fn, fun_name}},
  # where `fun_name` is the generated function that returns the opts.
  def __resolve_round_fn_ref__(module, name_or_module, fun_name) when is_atom(name_or_module) do
    entry = {resolve_round_module!(name_or_module), {:opts_fn, fun_name}}
    Module.put_attribute(module, :__council_rounds__, entry)
  end

  # Turns a round reference into the implementing module. Built-in names are
  # looked up first; otherwise the atom must be a loadable module exporting
  # prepare_input/3. Raises ArgumentError for anything else.
  defp resolve_round_module!(name_or_module) do
    builtins = __builtin_rounds__()

    case Map.fetch(builtins, name_or_module) do
      {:ok, builtin_module} ->
        builtin_module

      :error ->
        if Code.ensure_loaded?(name_or_module) and
             function_exported?(name_or_module, :prepare_input, 3) do
          name_or_module
        else
          raise ArgumentError,
                "Unknown round: #{inspect(name_or_module)}. Built-ins: #{inspect(Map.keys(builtins))}"
        end
    end
  end

  @doc false
  # Reports whether the round opts AST contains a function literal
  # (`fn ... end` or an `&` capture). Called at macro expansion time, so
  # `opts` is AST, not a value.
  #
  # The search descends into literal containers — lists, tuples and maps — so
  # a function nested inside an option value (e.g. `hooks: [before: fn ... end]`)
  # is found as well. Such opts cannot be escaped into a module attribute any
  # more than a top-level function can. Calls and other non-literal nodes are
  # not descended into. Opts that are not a literal list (a variable, a call)
  # always yield false.
  def __opts_have_funs__?(opts) when is_list(opts) do
    Enum.any?(opts, &fun_literal_in_ast?/1)
  end

  def __opts_have_funs__?(_), do: false

  # Function literals themselves.
  defp fun_literal_in_ast?({:fn, _meta, _clauses}), do: true
  defp fun_literal_in_ast?({:&, _meta, _args}), do: true

  # Tuples of three or more elements and maps are represented as `{:{}, ...}`
  # and `{:%{}, ...}` nodes in the AST.
  defp fun_literal_in_ast?({:{}, _meta, elems}) when is_list(elems),
    do: Enum.any?(elems, &fun_literal_in_ast?/1)

  defp fun_literal_in_ast?({:%{}, _meta, pairs}) when is_list(pairs),
    do: Enum.any?(pairs, &fun_literal_in_ast?/1)

  # Two-element tuples (including keyword pairs) and lists are literals in
  # the AST.
  defp fun_literal_in_ast?({left, right}),
    do: fun_literal_in_ast?(left) or fun_literal_in_ast?(right)

  defp fun_literal_in_ast?(list) when is_list(list),
    do: Enum.any?(list, &fun_literal_in_ast?/1)

  defp fun_literal_in_ast?(_other), do: false

  @doc false
  # Produces a member's runtime opts. Regular members get their inline opts
  # resolved through CouncilEx.Profile.resolve/2 together with the council's
  # default profile. Sub-council shim members (provider :__sub_council__)
  # skip profile resolution and keep their opts unchanged.
  def __resolve_member_opts__(_member_id, opts, council_default)
      when is_list(opts) do
    case Keyword.get(opts, :provider) do
      :__sub_council__ -> opts
      _regular_provider -> CouncilEx.Profile.resolve(opts, council_default)
    end
  end

  @doc false
  # Completes the declared rounds of a council. Without a chair the rounds
  # are returned unchanged. With a chair, a default Synthesis round (empty
  # opts) is appended unless a chair round (Synthesis or WeightedSynthesis)
  # was already declared.
  def __finalize_rounds__(rounds, nil), do: rounds

  def __finalize_rounds__(rounds, _chair) do
    chair_round_declared? =
      Enum.any?(rounds, fn {round_mod, _round_opts} ->
        round_mod in [CouncilEx.Rounds.Synthesis, CouncilEx.Rounds.WeightedSynthesis]
      end)

    case chair_round_declared? do
      true -> rounds
      false -> rounds ++ [{CouncilEx.Rounds.Synthesis, []}]
    end
  end
end