defmodule CouncilEx do
@moduledoc """
Multi-model LLM council workflows for Elixir.

## Usage

    defmodule MyCouncil do
      use CouncilEx

      member :optimist, MyApp.Members.Optimist, provider: :open_router, model: "openai/gpt-4o-mini"
      member :skeptic, MyApp.Members.Skeptic, provider: :open_router, model: "anthropic/claude-3.5-sonnet"

      round :independent_analysis

      chair MyApp.Members.Synthesizer, id: :chair, provider: :open_router, model: "openai/gpt-4o"
    end

    {:ok, result} = CouncilEx.run(MyCouncil, %{question: "go or wait?"})
"""
alias CouncilEx.{PubSub, Result}
alias CouncilEx.RunServer
alias CouncilEx.Runner.RecorderProcess
# Short round names accepted by the `round` DSL macro, mapped to the module
# that implements each one. Exposed through `__builtin_rounds__/0` and
# consulted by `__resolve_round__/3` and `__resolve_round_fn_ref__/3`.
@builtin_rounds %{
independent_analysis: CouncilEx.Rounds.IndependentAnalysis,
synthesis: CouncilEx.Rounds.Synthesis,
peer_review: CouncilEx.Rounds.PeerReview,
anonymized_peer_review: CouncilEx.Rounds.AnonymizedPeerReview,
critique: CouncilEx.Rounds.Critique,
ranking: CouncilEx.Rounds.Ranking,
vote: CouncilEx.Rounds.Vote,
iterate: CouncilEx.Rounds.Iterate,
pairwise_elimination: CouncilEx.Rounds.PairwiseElimination
}
@doc """
Launches a council run in the background and returns `{:ok, pid}` right
away. The run id can be read back from the pid with
`CouncilEx.RunServer.run_id/1`.

The council is validated first; an invalid council yields
`{:error, {:invalid_council, errors}}` and nothing is started.

## Options

  * `:verbose` — `true | :debug | false` (default `false`). When set, a
    `CouncilEx.Verbose` tracer is attached to the run's topic and prints a
    timeline to stdout. `true` prints timeline, duration and tokens;
    `:debug` additionally dumps (truncated) response bodies. The tracer
    detaches itself on the terminal event.
  * `:verbose_io` — IO device the tracer writes to (default `:stdio`).
  * `:subscribe` — `true | false` (default `false`). With `true` the calling
    process is subscribed to the run's PubSub topic BEFORE the RunServer is
    started, so no event can be missed. Subscribing manually after `start/3`
    returns races against the RunServer's continue callback under load, so
    prefer this option whenever the full event timeline matters.
  * `:registry` — name of the `Registry` the run is registered under.
    Defaults to the bundled `CouncilEx.Runner.Registry`. Override only for
    tenant-level isolation (so two tenants cannot collide on `run_id`). The
    caller MUST start that registry first:
    `{Registry, keys: :unique, name: MyApp.RunReg}`. The lookup APIs
    (`await/2`, `cancel/2`, `terminate_run/2`, `pid_for/2`,
    `list_active_runs/1`) accept the same `:registry` option and must be
    given the same name. Most applications should leave this alone —
    `run_id` is already a strong unique key.
  * `:relay_topics` — a topic string or a list of topics. Every PubSub event
    broadcast for this run is also published to each of them. Useful for
    cross-run aggregation (one app-wide topic receiving every event of every
    council run, without an aggregator process).
  * `:run_id` — explicit run id (binary). Generated by the runtime when
    omitted. Supply it when an external system already assigned an identity
    to the run (e.g. an Oban job that wants the same id across retries, or a
    tenant with a domain-specific naming scheme). Uniqueness is then the
    caller's responsibility.
  * `:recorder` — `{module, args}` where `module` implements
    `CouncilEx.Recorder`. The recorder process is spawned *before* the
    `RunServer` (so it is subscribed in time for `:run_started`) and receives
    lifecycle events through `module.handle_event/2`. It exits after
    `module.handle_finalize/2` has been called with the terminal outcome. A
    recorder crash does not kill the run.
"""
@spec start(module() | CouncilEx.DynamicCouncil.t(), term(), keyword()) ::
        {:ok, pid()} | {:error, term()}
def start(council, input, opts \\ []) do
  do_start(council, input, opts, :unlinked)
end
@doc """
Same as `start/3`, except that the run server is linked to the calling
process (`GenServer.start_link/3` semantics).

If the caller dies the run dies with it, and if the runner dies the caller
is taken down too (unless it traps exits). Use this when the caller owns the
run's lifecycle — Oban workers, tests, or any context where a leaked
background run would burn tokens.

Returns `{:ok, pid}`. Call `CouncilEx.RunServer.run_id/1` on the pid when the
auto-generated run id is needed (e.g. to subscribe from another process or to
persist it as a durable handle).
"""
@spec start_link(module() | CouncilEx.DynamicCouncil.t(), term(), keyword()) ::
        {:ok, pid()} | {:error, term()}
def start_link(council, input, opts \\ []) do
  do_start(council, input, opts, :linked)
end
# Validates the council and, when it is valid, hands its spec to
# `spawn_run_server/5`. Validation failures come back wrapped as
# `{:error, {:invalid_council, errors}}`.
#
# Dynamic councils are identified as `"dynamic:" <> id`; module councils are
# identified by the module itself.
defp do_start(%CouncilEx.DynamicCouncil{} = dynamic, input, opts, mode) do
  dynamic
  |> CouncilEx.DynamicCouncil.validate()
  |> case do
    {:error, errors} ->
      {:error, {:invalid_council, errors}}

    :ok ->
      dynamic_spec = CouncilEx.DynamicCouncil.to_spec(dynamic)
      spawn_run_server("dynamic:" <> dynamic.id, dynamic_spec, input, opts, mode)
  end
end

defp do_start(council_module, input, opts, mode) when is_atom(council_module) do
  council_module
  |> validate()
  |> case do
    {:error, errors} ->
      {:error, {:invalid_council, errors}}

    :ok ->
      module_spec = council_module.__council__()
      spawn_run_server(council_module, module_spec, input, opts, mode)
  end
end
# Shared tail of `start/3` and `start_link/3`: resolves the run options,
# attaches the optional observers (recorder, caller subscription, verbose
# tracer) and only then starts the `RunServer`.
#
# `council_id` is the council module, or `"dynamic:" <> id` for dynamic
# councils. `mode` must be `:linked` or `:unlinked` and selects
# `RunServer.start_link/1` vs `RunServer.start/1`. Returns `{:ok, pid}`, or
# the `{:error, _}` tuple from the RunServer start call unchanged.
defp spawn_run_server(council_id, spec, input, opts, mode) do
# A missing (or explicitly nil) :run_id falls back to a generated id.
run_id = Keyword.get(opts, :run_id) || generate_run_id()
verbose = Keyword.get(opts, :verbose, false)
subscribe? = Keyword.get(opts, :subscribe, false)
registry = Keyword.get(opts, :registry, CouncilEx.RunServer.default_registry())
# Keyword.fetch/2 (not get/3) so that an explicit `recorder: nil` disables the
# recorder instead of falling back to the configured default.
recorder =
case Keyword.fetch(opts, :recorder) do
{:ok, value} -> value
:error -> CouncilEx.Config.recorder()
end
# Capture the caller chain here (in the calling process) so that the RunServer
# and its spawned tasks can walk back to find Mock scripts registered by test procs.
callers = [self() | Process.get(:"$callers", [])]
# The full caller opts travel along as :user_opts next to the resolved values.
server_opts = [
run_id: run_id,
council: council_id,
spec: spec,
input: input,
user_opts: opts,
registry: registry,
callers: callers
]
# Spawn the recorder BEFORE the RunServer so it subscribes to the
# run topic in time to receive :run_started and every subsequent
# event without race.
recorder_pid = maybe_start_recorder(recorder, run_id)
# Subscribe the caller BEFORE starting the RunServer so that no events
# broadcast from init/1 or the immediately-following continue callback
# are missed. This eliminates a documented race where the caller would
# otherwise subscribe after start/3 returns — by which point the
# RunServer's handle_continue may have already broadcast :run_started
# and beyond.
if subscribe?, do: PubSub.subscribe(topic_for(run_id))
# Attach the verbose tracer BEFORE the RunServer starts so it does not
# miss the :run_started broadcast emitted from RunServer.init/1.
if verbose, do: maybe_attach_verbose(run_id, verbose, opts)
spawn_result =
case mode do
:linked -> RunServer.start_link(server_opts)
:unlinked -> RunServer.start(server_opts)
end
case spawn_result do
{:ok, pid} ->
# Tell the recorder which RunServer to monitor so a crash without
# a terminal broadcast still triggers handle_finalize/2.
if recorder_pid, do: RecorderProcess.set_run_pid(recorder_pid, pid)
{:ok, pid}
{:error, _} = err ->
# If the RunServer failed to start, tear the recorder down too —
# otherwise it sits subscribed forever waiting for events that
# will never arrive.
# NOTE(review): only the recorder is cleaned up on this path. A caller
# subscription made via `subscribe: true` and a tracer attached via
# `:verbose` (plus its process-dictionary entry) are left in place —
# confirm whether they should be torn down here as well.
if recorder_pid && Process.alive?(recorder_pid) do
Process.exit(recorder_pid, :shutdown)
end
err
end
end
# Starts the recorder process for `run_id`, or returns `nil` when no recorder
# is configured. Start-up is best-effort: a recorder that refuses to init must
# not block the run, so a failure is reported through telemetry and `nil` is
# returned instead of crashing.
defp maybe_start_recorder(nil, _run_id), do: nil

defp maybe_start_recorder({module, args}, run_id) when is_atom(module) do
  start_opts = [run_id: run_id, module: module, args: args]

  case RecorderProcess.start(start_opts) do
    {:error, reason} ->
      :telemetry.execute(
        [:council_ex, :recorder, :start_failed],
        %{system_time: System.system_time()},
        %{run_id: run_id, module: module, reason: reason}
      )

      nil

    {:ok, recorder_pid} ->
      recorder_pid
  end
end
# Attaches a `CouncilEx.Verbose` tracer for `run_id`. `:debug` is passed
# through as the tracer mode; every other value is normalised to `true`.
# Always returns `:ok`.
defp maybe_attach_verbose(run_id, verbose, opts) do
  tracer_mode =
    case verbose do
      :debug -> :debug
      _ -> true
    end

  device = Keyword.get(opts, :verbose_io, :stdio)
  tracer = CouncilEx.Verbose.attach(run_id, mode: tracer_mode, io: device)

  # Remember the tracer in the process dictionary: the synchronous run/3 path
  # waits for it to flush its final writes once await/2 has returned, which
  # avoids IO races.
  Process.put({:__council_verbose_pid__, run_id}, tracer)
  :ok
end
@doc "Subscribes the calling process to the events of run `run_id`."
@spec subscribe(String.t()) :: :ok
def subscribe(run_id) when is_binary(run_id),
  do: run_id |> topic_for() |> PubSub.subscribe()
@doc "Removes the calling process's subscription to the events of run `run_id`."
@spec unsubscribe(String.t()) :: :ok
def unsubscribe(run_id) when is_binary(run_id),
  do: run_id |> topic_for() |> PubSub.unsubscribe()
@doc """
Block until the run finishes and return its `Result`.

Accepts a runner pid (resolved to its run id through
`CouncilEx.RunServer.run_id/1`) or a `run_id` binary (looked up in the
registry; pass `:registry` for caller-owned registries).

## Options

  * `:await_timeout` — milliseconds to wait, or `:infinity` (default).
  * `:registry` — registry the run was started under; defaults to
    `CouncilEx.RunServer.default_registry/0`.

## Returns

  * `{:ok, result}` — the run completed.
  * `{:error, result}` — the run failed; `result` is the `Result` struct.
  * `{:error, :unknown_run}` — no registry entry for the run id (never
    started, or already finished and deregistered).
  * `{:error, :timeout}` — no terminal event arrived in time.

The calling process is subscribed to the run topic for the duration of the
wait and unsubscribed again once a terminal result has been obtained. After
a timeout the subscription is left in place, so events keep arriving for a
later retry.
"""
@spec await(pid() | String.t(), keyword()) ::
        {:ok, Result.t()} | {:error, Result.t()} | {:error, :unknown_run | :timeout}
def await(pid_or_run_id, opts \\ [])

def await(pid, opts) when is_pid(pid) do
  await(CouncilEx.RunServer.run_id(pid), opts)
end

def await(run_id, opts) when is_binary(run_id) do
  timeout = Keyword.get(opts, :await_timeout, :infinity)
  registry = Keyword.get(opts, :registry, CouncilEx.RunServer.default_registry())
  topic = topic_for(run_id)

  # Subscribe before the lookup so a terminal event broadcast between the
  # lookup and the wait still lands in our mailbox.
  PubSub.subscribe(topic)

  case Registry.lookup(registry, run_id) do
    [] ->
      # Either never started or already finished and deregistered; there is
      # nothing left to wait for.
      PubSub.unsubscribe(topic)
      {:error, :unknown_run}

    [{_pid, _}] ->
      outcome = do_await(run_id, timeout, registry)

      # Once a terminal Result is in hand the subscription made above has no
      # further use; dropping it keeps long-lived callers from accumulating
      # one subscription per awaited run. It is deliberately kept on timeout.
      # NOTE(review): assumes no event the caller cares about is broadcast
      # after the terminal one — confirm against RunServer.
      case outcome do
        {_tag, %Result{}} -> PubSub.unsubscribe(topic)
        _ -> :ok
      end

      outcome
  end
end
# Waits for the terminal event of `run_id`.
#
# 1. If a terminal event is already in the mailbox, return it.
# 2. Otherwise ask the RunServer directly via `fetch_with_fallback/3` (covers
#    the case where the event was broadcast before we subscribed).
# 3. If that yields `:keep_waiting`, block on the mailbox for whatever is left
#    of the caller's timeout budget.
#
# The budget is tracked against a monotonic start time so that the fallback
# call and the final receive together never exceed `timeout`; previously each
# of them was given the full timeout, allowing a wait of up to twice the
# requested duration.
defp do_await(run_id, timeout, registry) do
  started_at = System.monotonic_time(:millisecond)

  receive do
    {:run_completed, ^run_id, %Result{} = r} -> {:ok, r}
    {:run_failed, ^run_id, _errors, %Result{} = r} -> {:error, r}
  after
    0 ->
      # No event in mailbox. Try fetch_result for race fallback.
      case fetch_with_fallback(run_id, timeout, registry) do
        :keep_waiting ->
          remaining = remaining_timeout(normalize_timeout(timeout), started_at)

          receive do
            {:run_completed, ^run_id, %Result{} = r} -> {:ok, r}
            {:run_failed, ^run_id, _errors, %Result{} = r} -> {:error, r}
          after
            remaining -> {:error, :timeout}
          end

        other ->
          other
      end
  end
end

# Milliseconds left of `timeout` since `started_at` (monotonic ms), clamped
# at 0 so the final receive still checks the mailbox once. `:infinity` stays
# `:infinity`.
defp remaining_timeout(:infinity, _started_at), do: :infinity

defp remaining_timeout(timeout, started_at) when is_integer(timeout) do
  elapsed = System.monotonic_time(:millisecond) - started_at
  max(timeout - elapsed, 0)
end
# Asks the RunServer for the result directly. A call that exits because the
# process is gone (`:noproc`) or because it timed out is translated into
# `:keep_waiting`; any other exit propagates to the caller.
defp fetch_with_fallback(run_id, timeout, registry) do
  call_timeout = normalize_timeout_for_call(timeout)
  RunServer.fetch_result(run_id, call_timeout, registry: registry)
catch
  :exit, {reason, _detail} when reason in [:noproc, :timeout] -> :keep_waiting
end
# Accepts the two supported timeout shapes (an integer or `:infinity`) and
# returns the value unchanged; anything else raises FunctionClauseError.
defp normalize_timeout(timeout) when is_integer(timeout), do: timeout
defp normalize_timeout(:infinity = timeout), do: timeout
# Timeout used for the direct RunServer call: integers pass through, while
# `:infinity` is capped at 60 seconds.
defp normalize_timeout_for_call(timeout) when is_integer(timeout), do: timeout
defp normalize_timeout_for_call(:infinity), do: 60_000
@doc """
Asks a running council to cancel cooperatively (the runner finalizes as
`:cancelled` once the in-flight round has drained).

Takes either a runner pid (preferred — a direct `GenServer.cast`) or a
`run_id` binary, which is resolved to a pid through the registry.

## Returns

  * `:ok` — the cancel signal was delivered.
  * `{:error, :unknown_run}` — the registry has no entry (run id form only).
  * `{:error, :runner_dead}` — the registry holds a stale entry whose pid is
    gone, OR the supplied pid is dead. The caller can fall through to
    `terminate_run/2` or treat the run as already over.

This is a *cooperative* signal. For an immediate kill (e.g. the run is wedged
inside a non-responsive provider call) use `terminate_run/2`.
"""
@spec cancel(pid() | String.t(), keyword()) ::
        :ok | {:error, :unknown_run | :runner_dead}
def cancel(pid_or_run_id, opts \\ [])

def cancel(pid, _opts) when is_pid(pid) do
  case Process.alive?(pid) do
    true -> GenServer.cast(pid, :cancel)
    false -> {:error, :runner_dead}
  end
end

def cancel(run_id, opts) when is_binary(run_id) do
  opts
  |> Keyword.get(:registry, CouncilEx.RunServer.default_registry())
  |> Registry.lookup(run_id)
  |> case do
    [{runner, _value}] -> cancel(runner, opts)
    [] -> {:error, :unknown_run}
  end
end
@doc """
Resolves `run_id` to its runner pid by delegating to
`CouncilEx.RunServer.pid_for/2`.

## Returns

  * `{:ok, pid()}` — the runner is alive.
  * `{:error, :unknown_run}` — there is no registry entry.
  * `{:error, :runner_dead}` — a stale entry points at a dead pid.

Handy for a late `Process.monitor/1`, or for checking that a run is still
alive before issuing a long blocking call.
"""
@spec pid_for(String.t(), keyword()) ::
        {:ok, pid()} | {:error, :unknown_run | :runner_dead}
def pid_for(run_id, opts \\ []) when is_binary(run_id) do
  CouncilEx.RunServer.pid_for(run_id, opts)
end
@doc """
Forcefully terminates a run with `Process.exit/2`.

Given a `run_id`, the runner pid is looked up in the registry and sent
`:shutdown`; `{:error, :unknown_run}` is returned when the run id has no
registry entry. Given a pid, the signal is sent directly (if the process is
alive) and the result is always `:ok`.

Unlike `cancel/1`, this is non-cooperative: the run process is killed
immediately and no `:run_completed` / `:run_failed` event is emitted.

Use it when:

  * the runner is wedged and `cancel/1` won't be drained, or
  * the host application is shutting down a tenant / session and needs runs
    gone synchronously.

Processes that monitored the runner pid (as returned from `start/3` /
`start_link/3`) should still receive a `:DOWN` message.
"""
@spec terminate_run(pid() | String.t(), keyword()) :: :ok | {:error, :unknown_run}
def terminate_run(pid_or_run_id, opts \\ [])

def terminate_run(pid, _opts) when is_pid(pid) do
  Process.alive?(pid) && Process.exit(pid, :shutdown)
  :ok
end

def terminate_run(run_id, opts) when is_binary(run_id) do
  opts
  |> Keyword.get(:registry, CouncilEx.RunServer.default_registry())
  |> Registry.lookup(run_id)
  |> case do
    [] -> {:error, :unknown_run}
    [{runner, _value}] -> terminate_run(runner, opts)
  end
end
@doc """
Checks a council without running it. Returns `:ok` or
`{:error, [%{path, code, message}]}`.

Both council forms are accepted: a module (atom) or a
`%CouncilEx.DynamicCouncil{}`. Dynamic councils are handed to
`CouncilEx.DynamicCouncil.validate/1`. For module councils the compiled spec
is walked to verify that every member's resolved opts contain `:provider` and
`:model`, and that every referenced provider is registered under
`config :council_ex, :providers`.

`start/3` runs this validator first and returns
`{:error, {:invalid_council, errs}}` on failure. Call it yourself when
structured errors should surface before dispatch (e.g. from a builder UI,
before charging tokens or spawning processes).

Sub-council shim members (`provider: :__sub_council__`) are skipped; their
inner councils are validated recursively when the sub-council ref is itself a
`%DynamicCouncil{}`.
"""
@spec validate(module() | CouncilEx.DynamicCouncil.t()) ::
        :ok | {:error, [map()]}
def validate(%CouncilEx.DynamicCouncil{} = dynamic) do
  CouncilEx.DynamicCouncil.validate(dynamic)
end

def validate(council_module) when is_atom(council_module) do
  with {:ok, spec} <- build_module_spec(council_module) do
    # Checkers prepend, so the list is newest-first until reversed.
    problems =
      []
      |> validate_module_members(spec)
      |> validate_module_chair(spec)

    if problems == [], do: :ok, else: {:error, Enum.reverse(problems)}
  end
end
# Obtains the compiled spec of a module-form council.
#
# Returns `{:ok, spec}`, or `{:error, [error_map]}` when the module cannot be
# loaded, does not export `__council__/0`, or raises while building its spec.
defp build_module_spec(mod) do
  if Code.ensure_loaded?(mod) do
    build_loaded_module_spec(mod)
  else
    spec_error(
      :not_a_council,
      "module #{inspect(mod)} not loaded; cannot validate as a council"
    )
  end
end

# The module is loaded; make sure it is a council before calling into it.
defp build_loaded_module_spec(mod) do
  if function_exported?(mod, :__council__, 0) do
    call_council_builder(mod)
  else
    spec_error(
      :not_a_council,
      "#{inspect(mod)} does not export __council__/0; missing `use CouncilEx`?"
    )
  end
end

# Calls `__council__/0`, converting any exception it raises into a
# `:spec_build_failed` error entry.
defp call_council_builder(mod) do
  {:ok, mod.__council__()}
rescue
  exception ->
    spec_error(
      :spec_build_failed,
      "council module #{inspect(mod)} failed to build its spec: " <>
        Exception.message(exception)
    )
end

# Single root-level (`path: []`) validation error in the shape validate/1 returns.
defp spec_error(code, message) do
  {:error, [%{path: [], code: code, message: message}]}
end
# Runs the per-member opts checks over every `{id, module, opts}` entry of the
# spec, threading the error accumulator through. Each member is addressed by
# the path `["members", index]`.
defp validate_module_members(errs, %CouncilEx.Spec{members: members}) do
  indexed_members = Enum.with_index(members)

  List.foldl(indexed_members, errs, fn {{member_id, _module, member_opts}, position},
                                       collected ->
    validate_module_member_opts(collected, member_id, member_opts, ["members", position])
  end)
end
# Applies the member opts checks to the chair under the path `["chair"]`.
# A spec without a chair contributes no errors.
defp validate_module_chair(errs, %CouncilEx.Spec{chair: nil}) do
  errs
end

defp validate_module_chair(errs, %CouncilEx.Spec{chair: {chair_id, _module, chair_opts}}) do
  validate_module_member_opts(errs, chair_id, chair_opts, ["chair"])
end
# Checks one member's resolved opts: `:provider` and `:model` must be present
# and the provider must be registered. Sub-council shim members (provider
# `:__sub_council__`) are exempt from all three checks.
defp validate_module_member_opts(errs, id, opts, path) do
  case Keyword.get(opts, :provider) do
    :__sub_council__ ->
      errs

    _regular_provider ->
      after_provider =
        check_required_member_key(errs, opts, :provider, id, path, :missing_provider)

      after_model =
        check_required_member_key(after_provider, opts, :model, id, path, :missing_model)

      check_member_provider_registered(after_model, opts, id, path)
  end
end
# Prepends an error with the given `code` to `errs` when `key` is absent from
# the member's opts; returns `errs` untouched when the key is present (even
# if its value is nil).
defp check_required_member_key(errs, opts, key, id, path, code) do
  case Keyword.fetch(opts, key) do
    {:ok, _value} ->
      errs

    :error ->
      missing = %{
        path: path,
        code: code,
        message:
          "member #{inspect(id)} has no :#{key} after profile resolution; " <>
            "set it via :profile, council :default_profile, or inline opts"
      }

      [missing | errs]
  end
end
# Verifies that the member's `:provider` names a registered provider.
#
#   * no provider / the sub-council sentinel — nothing to check here
#   * an atom — must resolve through `CouncilEx.Provider.resolve/1`
#   * anything else — rejected as `:invalid_provider`
#
# Errors are prepended to `errs` with the path extended by `"provider"`.
defp check_member_provider_registered(errs, opts, id, path) do
  provider = Keyword.get(opts, :provider)

  cond do
    provider in [nil, :__sub_council__] ->
      errs

    is_atom(provider) ->
      check_provider_resolves(errs, provider, id, path)

    true ->
      invalid = %{
        path: path ++ ["provider"],
        code: :invalid_provider,
        message:
          "member #{inspect(id)} :provider must be an atom, got " <>
            inspect(provider)
      }

      [invalid | errs]
  end
end

# Atom provider: accept it when it resolves, report `:unknown_provider` otherwise.
defp check_provider_resolves(errs, provider, id, path) do
  case CouncilEx.Provider.resolve(provider) do
    {:ok, _resolved} ->
      errs

    {:error, :unknown_provider} ->
      unknown = %{
        path: path ++ ["provider"],
        code: :unknown_provider,
        message:
          "member #{inspect(id)} references unregistered provider " <>
            "#{inspect(provider)}; register under " <>
            "`config :council_ex, :providers`"
      }

      [unknown | errs]
  end
end
@doc """
Runs a council and blocks until its result is available.

The council may be a module (compile-time DSL), a
`%CouncilEx.DynamicCouncil{}` (data-only) or a `%CouncilEx.AutoCouncil{}`
router, which is first resolved to one of the former. For dynamic councils
`Result.council` carries `"dynamic:" <> council.id`.
"""
@spec run(
        module() | CouncilEx.DynamicCouncil.t() | CouncilEx.AutoCouncil.t(),
        term(),
        keyword()
      ) ::
        {:ok, Result.t()} | {:error, Result.t() | term()}
def run(council, input, opts \\ [])

def run(%CouncilEx.AutoCouncil{} = auto, input, opts) do
  # Anything other than a successful resolution is returned to the caller as-is.
  case CouncilEx.AutoCouncil.resolve(auto, extract_prompt(input)) do
    {:ok, decision, meta} ->
      runnable = decision_to_runnable(decision)

      runnable
      |> do_sync_run(input, opts)
      |> attach_auto_meta(meta)

    unresolved ->
      unresolved
  end
end

def run(%CouncilEx.DynamicCouncil{} = council, input, opts),
  do: do_sync_run(council, input, opts)

def run(council_module, input, opts) when is_atom(council_module),
  do: do_sync_run(council_module, input, opts)
@doc """
Shortcut that routes `input` through the configured default `AutoCouncil`
and runs the chosen council synchronously.

The default configuration is read from `:council_ex, :auto`:

    config :council_ex, :auto,
      strategy: :rules,
      catalog: {:registry, :council},
      on_no_match: :error

Per-call overrides are merged on top of it:

    CouncilEx.auto(%{question: "..."}, strategy: :cascade, options: [chain: [:rules]])

`AutoCouncil` struct opts and run opts may be mixed in the same keyword list.
Struct opts (`:strategy`, `:catalog`, `:on_no_match`, `:name`, `:options`,
`:provider_check`) build the router; everything else is forwarded to `run/3`
— including `:verbose`, `:verbose_io`, `:await_timeout`:

    CouncilEx.auto(%{question: "audit my SEO"}, verbose: true)

Returns `{:error, :no_auto_config}` when neither the application config nor
the overrides contain any option. See `CouncilEx.AutoCouncil` for the full
options list, and inspect `result.metadata.auto` to see which council was
picked.
"""
@spec auto(term(), keyword()) :: {:ok, Result.t()} | {:error, Result.t() | term()}
def auto(input, overrides \\ []) do
  merged =
    :council_ex
    |> Application.get_env(:auto, [])
    |> Keyword.merge(overrides)

  if merged == [] do
    {:error, :no_auto_config}
  else
    {router_opts, run_opts} = split_auto_opts(merged)

    router_opts
    |> CouncilEx.AutoCouncil.new()
    |> run(input, run_opts)
  end
end
# Options that configure the `AutoCouncil` router itself; every other option
# given to `auto/2` is treated as a run option.
@auto_struct_keys ~w(strategy catalog on_no_match name options provider_check)a

# Splits merged options into `{router_opts, run_opts}`.
defp split_auto_opts(opts) do
  Keyword.split(opts, @auto_struct_keys)
end
# Unwraps a routing decision into something `do_sync_run/3` accepts: the
# council module for `:static`, the dynamic council struct for `:dynamic` and
# `:built`.
defp decision_to_runnable({:static, council_module}), do: council_module

defp decision_to_runnable({kind, %CouncilEx.DynamicCouncil{} = dynamic})
     when kind in [:dynamic, :built],
     do: dynamic
# Stores the routing metadata on the `Result` of an `:ok` / `:error` outcome.
# Outcomes that do not carry a `Result` are passed through unchanged.
defp attach_auto_meta({tag, %Result{} = result}, meta) when tag in [:ok, :error] do
  {tag, put_auto_meta(result, meta)}
end

defp attach_auto_meta(outcome, _meta) do
  outcome
end
# Writes `meta` under the `:auto` key of the result's metadata, starting from
# an empty map when the result has no metadata yet.
defp put_auto_meta(%Result{} = result, meta) do
  existing = result.metadata || %{}
  %Result{result | metadata: Map.put(existing, :auto, meta)}
end
# Derives the text the router matches on: the input itself when it is a
# string, else a binary `:question`, else a binary `:prompt`, and as a last
# resort the inspected input.
defp extract_prompt(input) do
  case input do
    text when is_binary(text) -> text
    %{question: question} when is_binary(question) -> question
    %{prompt: prompt} when is_binary(prompt) -> prompt
    anything_else -> inspect(anything_else)
  end
end
# Synchronous run: starts a linked run server, awaits the terminal result and
# then gives an attached verbose tracer a moment to flush.
#
# Both `:await_timeout` and `:registry` are forwarded to `await/2`. The run is
# registered under the caller's `:registry`, so awaiting it in the default
# registry (as happened when only `:await_timeout` was forwarded) could not
# find it and returned `{:error, :unknown_run}`.
#
# A failed start is returned unchanged.
defp do_sync_run(council, input, opts) do
  case start_link(council, input, opts) do
    {:ok, pid} ->
      run_id = RunServer.run_id(pid)
      result = await(run_id, Keyword.take(opts, [:await_timeout, :registry]))
      wait_for_verbose_tracer(run_id)
      result

    err ->
      err
  end
end
# A tracer attached through the `verbose:` option leaves its pid in the
# process dictionary under a key derived from the run id. Once await/2 has
# returned, take that entry out and give the tracer up to one second to write
# its last output and exit, so callers (tests in particular) can close their
# IO sink without racing it. Always returns `:ok`.
defp wait_for_verbose_tracer(run_id) do
  tracer = Process.delete({:__council_verbose_pid__, run_id})

  if is_pid(tracer) do
    await_tracer_exit(tracer)
  else
    :ok
  end
end

# Blocks until `tracer` is down or one second has passed, whichever is first.
defp await_tracer_exit(tracer) do
  monitor = Process.monitor(tracer)

  receive do
    {:DOWN, ^monitor, :process, ^tracer, _reason} ->
      :ok
  after
    1_000 ->
      Process.demonitor(monitor, [:flush])
      :ok
  end
end
@typedoc """
Stable curated summary of an active (or recently completed) run, as
returned by `list_active_runs/1`.
"""
# NOTE(review): `start/3` hands dynamic councils to the run server as the
# string `"dynamic:" <> id`, so `:council` may not always be a module —
# confirm against `CouncilEx.RunServer.state/2`.
@type run_summary :: %{
run_id: binary(),
council: module(),
status: :pending | :running | :completed | :failed | :cancelled,
current_round: %{name: atom(), idx: non_neg_integer()} | nil,
rounds_completed: non_neg_integer(),
started_at: DateTime.t()
}
@doc """
Returns curated summaries of the runs currently present in the registry
(active, or completed but not yet reaped). Runs that do not answer within
one second, or that do not return `{:ok, summary}`, are left out.

## Options

  * `:registry` — registry to scan; defaults to the bundled
    `CouncilEx.Runner.Registry`. Pass your own when using a caller-owned
    `Registry` (e.g. for tenant isolation, where each tenant has its own
    registry).
"""
@spec list_active_runs(keyword()) :: [run_summary()]
def list_active_runs(opts \\ []) do
  registry = Keyword.get(opts, :registry, CouncilEx.RunServer.default_registry())

  # Match spec selecting just the key (the run id) of every registry entry.
  every_key = [{{:"$1", :_, :_}, [], [:"$1"]}]
  fetch_state = fn run_id -> RunServer.state(run_id, registry: registry) end

  registry
  |> Registry.select(every_key)
  |> Task.async_stream(fetch_state, timeout: 1_000, on_timeout: :kill_task)
  |> Enum.flat_map(fn
    {:ok, {:ok, summary}} -> [summary]
    _skipped -> []
  end)
end
# PubSub topic on which the events of run `run_id` are exchanged.
defp topic_for(run_id) do
  "council_ex:run:" <> to_string(run_id)
end
# Builds a fresh run id: the prefix "run-" followed by 12 cryptographically
# strong random bytes in unpadded URL-safe Base64.
defp generate_run_id do
  token =
    12
    |> :crypto.strong_rand_bytes()
    |> Base.url_encode64(padding: false)

  "run-" <> token
end
@doc false
# Map of built-in round names to their implementing modules.
def __builtin_rounds__ do
  @builtin_rounds
end
# Injected by `use CouncilEx`: imports the council DSL macros, registers the
# module attributes those macros write into, and schedules
# `__before_compile__/1`, which defines `__council__/0` from them.
defmacro __using__(_opts) do
quote do
# Kernel.round/1 is excluded so the DSL's `round/1` can be called unqualified
# without clashing with it.
import Kernel, except: [round: 1]
import CouncilEx,
only: [
member: 2,
member: 3,
round: 1,
round: 2,
chair: 2,
router: 1,
default_profile: 1
]
# Members and rounds collect one entry per declaration; chair, router and
# default profile are single-valued attributes.
Module.register_attribute(__MODULE__, :__council_members__, accumulate: true)
Module.register_attribute(__MODULE__, :__council_rounds__, accumulate: true)
Module.register_attribute(__MODULE__, :__council_chair__, accumulate: false)
Module.register_attribute(__MODULE__, :__council_router__, accumulate: false)
Module.register_attribute(__MODULE__, :__council_default_profile__, accumulate: false)
@before_compile CouncilEx
end
end
# Runs just before the council module is compiled and defines two functions
# on it:
#
#   * `__council__/0` — assembles the `%CouncilEx.Spec{}` from the attributes
#     filled in by the DSL macros.
#   * `__providers__/0` — the providers referenced by that spec.
defmacro __before_compile__(_env) do
quote do
def __council__ do
default_profile = @__council_default_profile__
# Entries are reversed to restore declaration order, then each member's opts
# are resolved against the council's default profile.
members =
@__council_members__
|> Enum.reverse()
|> Enum.map(fn {id, mod, opts} ->
{id, mod, CouncilEx.__resolve_member_opts__(id, opts, default_profile)}
end)
# Rounds declared with function-valued opts were stored as a reference to a
# generated zero-arity function (see the `round` macro); call it here to
# obtain the real opts.
declared_rounds =
@__council_rounds__
|> Enum.reverse()
|> Enum.map(fn
{mod, {:opts_fn, fun_name}} -> {mod, apply(__MODULE__, fun_name, [])}
{mod, opts} -> {mod, opts}
end)
# The chair's opts go through the same profile resolution as member opts.
chair =
case @__council_chair__ do
nil ->
nil
{chair_id, chair_mod, chair_opts} ->
{chair_id, chair_mod,
CouncilEx.__resolve_member_opts__(chair_id, chair_opts, default_profile)}
end
# NOTE(review): the attribute value is injected into this function body, so
# it has to be escapable — confirm that an anonymous-fn router (as the
# `router` macro's doc allows) actually compiles.
router = @__council_router__
# Gives `__finalize_rounds__/2` the chance to append a synthesis round when
# a chair is declared.
rounds = CouncilEx.__finalize_rounds__(declared_rounds, chair)
%CouncilEx.Spec{
council: __MODULE__,
members: members,
rounds: rounds,
chair: chair,
router: router,
opts: []
}
end
@doc """
MapSet of providers this council references via its members + chair.
Sentinels (`:__sub_council__`) and `nil` are filtered out. Sub-council
members surface their inner providers — see
`CouncilEx.AutoCouncil.Providers` for the recursion details.
"""
def __providers__ do
CouncilEx.AutoCouncil.Providers.from_spec(__council__())
end
end
end
@doc "Sets the profile that members of this council fall back to."
defmacro default_profile(profile_module) do
  quote bind_quoted: [profile: profile_module] do
    Module.put_attribute(__MODULE__, :__council_default_profile__, profile)
  end
end
@doc "Declares a member backed by an existing module, with inline opts."
defmacro member(id, module, opts) when is_atom(id) do
  # Build the `{id, module, opts}` entry once, then splice it into the
  # accumulating members attribute of the calling module.
  entry = quote(do: {unquote(id), unquote(module), unquote(opts)})

  quote do
    @__council_members__ unquote(entry)
  end
end
@doc "Declare a member with an inline block."
defmacro member(id, do: block) when is_atom(id) do
# Per-member scratch attribute that collects the entries declared inside the block.
attr_name = :"__inline_member_spec_#{id}__"
quote do
Module.register_attribute(__MODULE__, unquote(attr_name), accumulate: true)
# Bring the inline-member DSL into scope: one 1-arity function per block key,
# plus `transform_input/1` and `validate/1`.
import CouncilEx.Council.InlineMember.Block,
only:
unquote(
Enum.map(CouncilEx.Council.InlineMember.__block_keys__(), &{&1, 1}) ++
[transform_input: 1, validate: 1]
)
# Publish the scratch attribute name in the (compile-time) process dictionary
# while the block is evaluated — presumably read by the Block DSL to know
# where to accumulate; verify against CouncilEx.Council.InlineMember.Block.
Process.put({:__inline_member__, __MODULE__}, unquote(attr_name))
unquote(block)
Process.delete({:__inline_member__, __MODULE__})
# Collected entries are reversed, then the scratch attribute is removed.
spec = Module.get_attribute(__MODULE__, unquote(attr_name)) |> Enum.reverse()
Module.delete_attribute(__MODULE__, unquote(attr_name))
# Turn the collected spec into a member module and its opts, and register the
# result exactly like a module-form member.
member_module =
CouncilEx.Council.InlineMember.build_module(__MODULE__, unquote(id), spec)
member_opts = CouncilEx.Council.InlineMember.member_opts(spec)
@__council_members__ {unquote(id), member_module, member_opts}
end
end
# Module form without inline opts — `member :id, MyMember` — which behaves
# like `member(id, module, [])`. Handy when the runtime opts are supplied by a
# profile. The guard keeps keyword lists out: here the second argument is the
# module AST (an alias or an atom), never a list.
defmacro member(id, module) when is_atom(id) and not is_list(module) do
  entry = quote(do: {unquote(id), unquote(module), []})

  quote do
    @__council_members__ unquote(entry)
  end
end
# Sub-council member: `member :id, council: SubCouncil, input_mapper: fun, opts: [...]`.
# See moduledoc for `CouncilEx.Members.SubCouncilAdapter`.
#
# `opts` is the keyword list AST of the call; it is inspected at expansion
# time, so both `raise` branches abort compilation of the council module.
defmacro member(id, opts)
when is_atom(id) and is_list(opts) do
cond do
# `[do: block]` alone is handled by the inline-block clause; this branch
# catches `:do` mixed with other keys.
Keyword.has_key?(opts, :do) ->
raise "member/2 inline form must use `do: block` syntax — got opts: #{inspect(opts)}"
Keyword.has_key?(opts, :council) ->
sub_council = Keyword.fetch!(opts, :council)
# Everything except :council is forwarded to the adapter.
sub_member_opts = Keyword.drop(opts, [:council])
quote do
shim =
CouncilEx.Members.SubCouncilAdapter.build(
unquote(id),
unquote(sub_council),
unquote(sub_member_opts)
)
# The sentinel provider marks the entry as a sub-council shim, which makes
# validation and profile resolution skip it.
@__council_members__ {unquote(id), shim, [provider: :__sub_council__, model: "n/a"]}
end
true ->
raise "member/2 keyword form requires `:council` key — got: #{inspect(opts)}"
end
end
@doc "Adds a round to the council, given a built-in round name or a round module."
defmacro round(name_or_module, opts \\ []) do
  case CouncilEx.__opts_have_funs__?(opts) do
    false ->
      # Plain opts can live in the rounds attribute as they are.
      quote do
        CouncilEx.__resolve_round__(__MODULE__, unquote(name_or_module), unquote(opts))
      end

    true ->
      # Function-valued opts cannot be escaped into a module attribute.
      # Instead, define a zero-arity function on the council module whose body
      # is the opts AST (closures included) and record only a reference to it:
      # `{:opts_fn, fun_name}` goes into @__council_rounds__ and is resolved by
      # __council__/0. The name is derived from the caller's file and line to
      # keep it unique.
      %{file: caller_file, line: caller_line} = __CALLER__
      fun_name = :"__round_opts_#{:erlang.phash2(caller_file)}_#{caller_line}__"

      quote do
        @doc false
        def unquote(fun_name)(), do: unquote(opts)

        CouncilEx.__resolve_round_fn_ref__(
          __MODULE__,
          unquote(name_or_module),
          unquote(fun_name)
        )
      end
  end
end
@doc """
Sets the council's adaptive router: either a module implementing
`CouncilEx.Router`, or a 2-arity anonymous function of the form
`fn input, ctx -> [member_id] end`.
"""
defmacro router(spec) do
  quote bind_quoted: [router_spec: spec] do
    Module.put_attribute(__MODULE__, :__council_router__, router_spec)
  end
end
@doc """
Declare the chair (final synthesizer).

`opts` may carry an `:id` for the chair (default `:chair`); the remaining
options are stored as the chair's member opts.
"""
defmacro chair(module, opts) do
  quote do
    # The opts expression is injected exactly once. It used to be injected
    # twice (once to read :id, once to drop it), so an expression with side
    # effects or a costly computation ran twice in the council module.
    {chair_id, chair_opts} = Keyword.pop(unquote(opts), :id, :chair)

    Module.put_attribute(
      __MODULE__,
      :__council_chair__,
      {chair_id, unquote(module), chair_opts}
    )
  end
end
@doc false
# Resolves a round declaration to its implementing module and appends
# `{round_module, opts}` to the council module's rounds attribute. Built-in
# names win; otherwise the atom must be a loadable module exporting
# `prepare_input/3`. Raises ArgumentError for anything else.
def __resolve_round__(module, name_or_module, opts) when is_atom(name_or_module) do
  builtins = __builtin_rounds__()

  round_mod =
    case Map.fetch(builtins, name_or_module) do
      {:ok, builtin_mod} ->
        builtin_mod

      :error ->
        if Code.ensure_loaded?(name_or_module) and
             function_exported?(name_or_module, :prepare_input, 3) do
          name_or_module
        else
          raise ArgumentError,
                "Unknown round: #{inspect(name_or_module)}. Built-ins: #{inspect(Map.keys(builtins))}"
        end
    end

  Module.put_attribute(module, :__council_rounds__, {round_mod, opts})
end
@doc false
# Variant of `__resolve_round__/3` for rounds whose opts are produced by a
# generated function: the round module is resolved the same way, but the
# stored entry is `{round_module, {:opts_fn, fun_name}}`.
def __resolve_round_fn_ref__(module, name_or_module, fun_name) when is_atom(name_or_module) do
  builtins = __builtin_rounds__()

  round_mod =
    case Map.fetch(builtins, name_or_module) do
      {:ok, builtin_mod} ->
        builtin_mod

      :error ->
        if Code.ensure_loaded?(name_or_module) and
             function_exported?(name_or_module, :prepare_input, 3) do
          name_or_module
        else
          raise ArgumentError,
                "Unknown round: #{inspect(name_or_module)}. Built-ins: #{inspect(Map.keys(builtins))}"
        end
    end

  Module.put_attribute(module, :__council_rounds__, {round_mod, {:opts_fn, fun_name}})
end
@doc false
# Detects whether the opts AST contains function literals (`fn ... end` or
# `&` captures). Called at macro expansion time, so `opts` is AST, not a
# value; anything that is not a literal list yields `false`.
#
# A value that is itself a `fn` or a capture is always reported. In addition,
# literal containers (lists, tuples, maps) are searched for function literals
# that cannot be escaped into a module attribute: such opts could never take
# the attribute path, so reporting them lets `round/2` route them through the
# generated opts function instead of failing at compile time. Nested remote
# captures (`&Mod.fun/arity`) are escapable and are deliberately not
# reported. Function-call arguments are not searched, because the call's
# result — not the function literal — is what ends up in the opts.
def __opts_have_funs__?(opts) when is_list(opts) do
  Enum.any?(opts, fn
    {_key, {:fn, _, _}} -> true
    {_key, {:&, _, _}} -> true
    {_key, value} -> nested_fun_literal?(value)
    _ -> false
  end)
end

def __opts_have_funs__?(_), do: false

# True when the AST is, or literally contains, an unescapable function literal.
defp nested_fun_literal?({:fn, _, _}), do: true

# Remote capture `&Mod.fun/arity`.
defp nested_fun_literal?({:&, _, [{:/, _, [{{:., _, [_, _]}, _, []}, arity]}]})
     when is_integer(arity),
     do: false

defp nested_fun_literal?({:&, _, _}), do: true

defp nested_fun_literal?(items) when is_list(items),
  do: Enum.any?(items, &nested_fun_literal?/1)

# Two-element tuples (and keyword pairs) are represented as themselves in AST.
defp nested_fun_literal?({left, right}),
  do: nested_fun_literal?(left) or nested_fun_literal?(right)

defp nested_fun_literal?({:{}, _, items}) when is_list(items),
  do: Enum.any?(items, &nested_fun_literal?/1)

defp nested_fun_literal?({:%{}, _, pairs}) when is_list(pairs),
  do: Enum.any?(pairs, &nested_fun_literal?/1)

defp nested_fun_literal?(_), do: false
@doc false
# Computes a member's runtime opts by merging the selected profile with the
# opts written inline on the member declaration. Sub-council shim members
# (whose provider is hardcoded to `:__sub_council__`) skip profile resolution
# and keep their opts as declared.
def __resolve_member_opts__(_member_id, opts, council_default) when is_list(opts) do
  case Keyword.get(opts, :provider) do
    :__sub_council__ -> opts
    _regular_provider -> CouncilEx.Profile.resolve(opts, council_default)
  end
end
@doc false
# Makes sure a council with a chair ends up with a round for the chair to run
# in: unless a `Synthesis` or `WeightedSynthesis` round was declared, a
# default `Synthesis` round is appended. Councils without a chair keep their
# rounds as declared.
def __finalize_rounds__(rounds, nil), do: rounds

def __finalize_rounds__(rounds, _chair) do
  chair_round? = fn {round_mod, _opts} ->
    round_mod == CouncilEx.Rounds.Synthesis or
      round_mod == CouncilEx.Rounds.WeightedSynthesis
  end

  case Enum.any?(rounds, chair_round?) do
    true -> rounds
    false -> rounds ++ [{CouncilEx.Rounds.Synthesis, []}]
  end
end
end