defmodule CMDCRAGArcana.Pipeline do
  @moduledoc """
  Enterprise governance entry point for Arcana pipelines.

  This module executes a registered `PipelinePreset`; it does not execute steps that
  an agent assembled dynamically. Callers must provide a `preset_id`, a collection
  scope, and ACL context in the options used to build `CMDCRAGArcana.Config`.
  """

  alias CMDCRAGArcana.{Config, SearchResult}
  alias CMDCRAGArcana.Pipeline.{Preset, Registry, RunSummary}

  @doc """
  Answers `question` by running the pipeline preset selected by `opts`.

  The flow is: build a `Config` from `opts`, resolve the preset with
  `Registry.resolve/1`, run `Registry.preflight/2` (its `collections` replace the
  configured scope), then call `config.pipeline_runner.run/3` inside a
  `[:cmdc_rag_arcana, :pipeline_answer]` telemetry span.

  The runner result is normalized into a `SearchResult` whose metadata carries a run
  summary (under both `:pipeline` and `:pipeline_run_summary`). If the result lacks
  required citations or its grounding score is missing/below the preset minimum, the
  preset's `fail_mode` decides the outcome:

    * `:block` - `{:error, {:pipeline_blocked, violations, summary}}`
    * `:search_only` - `{:ok, result}` with `answer` cleared to `nil`, marked degraded
    * `:answer_with_warning` - `{:ok, result}` with the violations as warnings
    * `:needs_review` - `{:ok, result}` flagged as requiring review

  Non-`{:ok, _}` values from resolution/preflight are returned as-is; a runner
  `{:error, reason}` is returned unchanged and any other runner shape becomes
  `{:error, {:invalid_pipeline_runner_result, value}}`. Any raised exception is
  returned as `{:error, {:pipeline_exception, message}}`.
  """
  @spec answer(String.t(), keyword()) :: {:ok, SearchResult.t()} | {:error, term()}
  def answer(question, opts) when is_binary(question) and is_list(opts) do
    config = Config.new!(opts)

    with {:ok, preset} <- Registry.resolve(config),
         {:ok, preflight} <- Registry.preflight(preset, config) do
      # Run against the collection scope returned by preflight (presumably the
      # ACL-filtered subset), not the scope the caller requested.
      config = %{config | collections: preflight.collections}

      :telemetry.span(
        [:cmdc_rag_arcana, :pipeline_answer],
        start_metadata(question, preset, config),
        fn ->
          result =
            config.pipeline_runner.run(question, preset, config)
            |> normalize_runner_result(question, preset, config)
            |> apply_fail_policy(preset, config)

          {result, stop_metadata(result, preset, config)}
        end
      )
    end
  rescue
    # Boundary rescue: exceptions from Config.new!/1, the registry, or the runner
    # (which :telemetry.span/3 re-raises after emitting its :exception event) are
    # turned into an error tuple for the caller.
    e -> {:error, {:pipeline_exception, Exception.message(e)}}
  end

  @doc """
  Returns the description of preset `preset_id`, used by Phoenix AgentOps for release
  approval and diffs.

  `opts` are passed to `Config.new!/1`, which may raise on invalid options; unlike
  `answer/2`, exceptions are not rescued here.
  """
  @spec describe_pipeline_preset(String.t(), keyword()) :: {:ok, map()} | {:error, term()}
  def describe_pipeline_preset(preset_id, opts \\ []) when is_binary(preset_id) do
    opts
    |> Config.new!()
    |> Registry.describe(preset_id)
  end

  @doc """
  Resolves preset `preset_id` and runs its preflight checks without calling Arcana.

  `preset_id` is put into `opts` (replacing any existing `:preset_id`) before the
  `Config` is built. Returns the result of `Registry.preflight/2`, or the
  non-`{:ok, _}` value returned by `Registry.resolve/1`. `Config.new!/1` may raise on
  invalid options; exceptions are not rescued here.
  """
  @spec preflight(String.t(), keyword()) :: {:ok, map()} | {:error, term()}
  def preflight(preset_id, opts \\ []) when is_binary(preset_id) do
    config = Config.new!(Keyword.put(opts, :preset_id, preset_id))

    with {:ok, preset} <- Registry.resolve(config) do
      Registry.preflight(preset, config)
    end
  end

  # Converts the runner's return value into `{:ok, %SearchResult{}}` with a run
  # summary attached, or `{:error, reason}`.
  defp normalize_runner_result({:error, reason}, _question, _preset, _config),
    do: {:error, reason}

  defp normalize_runner_result({:ok, %SearchResult{} = result}, _question, preset, config) do
    {:ok, put_summary(result, preset, config, %{})}
  end

  # A plain map may use atom or string keys; `context` takes precedence over `chunks`.
  # from_answer/4 receives "" for a missing answer, but the raw (possibly nil) answer
  # is what ends up on the result.
  defp normalize_runner_result({:ok, raw}, question, preset, config) when is_map(raw) do
    context = List.wrap(field(raw, :context) || field(raw, :chunks) || [])
    answer = field(raw, :answer)

    result =
      question
      |> SearchResult.from_answer(answer || "", context, config)
      |> Map.put(:answer, answer)
      |> put_summary(preset, config, raw)

    {:ok, result}
  end

  defp normalize_runner_result(other, _question, _preset, _config) do
    {:error, {:invalid_pipeline_runner_result, other}}
  end

  # Applies the preset's fail_mode when the result violates its citation or grounding
  # requirements. Errors pass through untouched.
  defp apply_fail_policy({:error, _reason} = error, _preset, _config), do: error

  defp apply_fail_policy({:ok, %SearchResult{} = result}, %Preset{} = preset, _config) do
    case violations(result, preset) do
      [] ->
        {:ok, result}

      violations ->
        summary = pipeline_summary(result)

        # No catch-all: an unrecognized fail_mode raises CaseClauseError, which
        # answer/2 rescues into {:error, {:pipeline_exception, _}} (fails closed).
        case preset.fail_mode do
          :block ->
            {:error, {:pipeline_blocked, violations, %{summary | blocked?: true}}}

          :search_only ->
            {:ok,
             result
             |> Map.put(:answer, nil)
             |> put_pipeline_summary(%{summary | degraded?: true, warnings: violations})}

          :answer_with_warning ->
            {:ok, put_pipeline_summary(result, %{summary | warnings: violations})}

          :needs_review ->
            {:ok,
             put_pipeline_summary(result, %{summary | review_required?: true, warnings: violations})}
        end
    end
  end

  # Collects policy violations in check order (citations first, then grounding).
  defp violations(%SearchResult{} = result, %Preset{} = preset) do
    []
    |> maybe_add_missing_citations(result, preset)
    |> maybe_add_grounding_violation(result, preset)
    |> Enum.reverse()
  end

  defp maybe_add_missing_citations(violations, %SearchResult{citations: []}, %Preset{
         require_citations?: true
       }) do
    [:missing_citations | violations]
  end

  defp maybe_add_missing_citations(violations, _result, _preset), do: violations

  defp maybe_add_grounding_violation(violations, _result, %Preset{min_grounding_score: nil}) do
    violations
  end

  defp maybe_add_grounding_violation(violations, result, %Preset{min_grounding_score: min_score}) do
    score = metadata_grounding_score(result.metadata)

    cond do
      # Covers nil as well as non-numeric values: under Erlang term order every
      # non-number compares greater than any number, so e.g. a string score would
      # otherwise always pass the threshold. Treat it as missing so the gate fails closed.
      not is_number(score) ->
        [{:missing_grounding_score, min_score} | violations]

      score < min_score ->
        [{:grounding_below_threshold, score, min_score} | violations]

      true ->
        violations
    end
  end

  # Attaches normalized grounding and a fresh RunSummary to `result`. `raw` is the
  # runner's plain-map output, or `%{}` when the runner returned a %SearchResult{}.
  defp put_summary(%SearchResult{} = result, %Preset{} = preset, %Config{} = config, raw) do
    grounding = field(raw, :grounding)

    # When `raw` carries no grounding score (e.g. the runner returned a %SearchResult{}
    # directly), keep the score already present in the result's metadata.
    grounding_score = grounding_score(grounding) || metadata_grounding_score(result.metadata)
    step_metadata = field(raw, :step_summary) || field(raw, :steps) || %{}

    summary = %RunSummary{
      preset_id: preset.id,
      steps: Enum.map(preset.steps, & &1.name),
      collections: field(raw, :collections) || config.collections,
      fail_mode: preset.fail_mode,
      grounding_score: grounding_score,
      citation_count: length(result.citations),
      result_count: length(result.results),
      metadata: %{steps: step_metadata}
    }

    result
    |> put_metadata(:grounding, normalize_grounding(grounding))
    |> put_pipeline_summary(summary)
  end

  # Stores the summary under both keys; pipeline_summary/1 reads back
  # :pipeline_run_summary.
  defp put_pipeline_summary(%SearchResult{} = result, %RunSummary{} = summary) do
    result
    |> put_metadata(:pipeline_run_summary, RunSummary.to_map(summary))
    |> put_metadata(:pipeline, RunSummary.to_map(summary))
  end

  # Rebuilds a RunSummary from the map stored by put_pipeline_summary/2 (atom or
  # string keys) so fail-policy flags can be set on it.
  defp pipeline_summary(%SearchResult{} = result) do
    summary_map = metadata_value(result.metadata, :pipeline_run_summary, %{})

    %RunSummary{
      preset_id: to_string(metadata_value(summary_map, :preset_id, nil)),
      steps: metadata_value(summary_map, :steps, []),
      collections: metadata_value(summary_map, :collections, []),
      fail_mode: metadata_value(summary_map, :fail_mode, nil),
      grounding_score: metadata_value(summary_map, :grounding_score, nil),
      citation_count: metadata_value(summary_map, :citation_count, 0),
      result_count: metadata_value(summary_map, :result_count, 0),
      metadata: metadata_value(summary_map, :metadata, %{})
    }
  end

  # Looks up `key` as an atom, then as a string; nil/false values fall back to `default`.
  defp metadata_value(map, key, default) when is_map(map) and is_atom(key) do
    Map.get(map, key) || Map.get(map, Atom.to_string(key)) || default
  end

  # nil values are skipped so existing metadata (e.g. grounding attached by a runner
  # that returned a %SearchResult{}) is not overwritten with nil.
  defp put_metadata(%SearchResult{} = result, _key, nil), do: result

  defp put_metadata(%SearchResult{metadata: metadata} = result, key, value) do
    %{result | metadata: Map.put(metadata, key, value)}
  end

  # Reduces raw grounding output to its score and span counts.
  defp normalize_grounding(nil), do: nil

  defp normalize_grounding(%{} = grounding) do
    %{
      score: field(grounding, :score),
      hallucinated_span_count: length(List.wrap(field(grounding, :hallucinated_spans))),
      faithful_span_count: length(List.wrap(field(grounding, :faithful_spans)))
    }
  end

  defp normalize_grounding(_grounding), do: nil

  defp grounding_score(nil), do: nil
  defp grounding_score(%{} = grounding), do: field(grounding, :score)
  defp grounding_score(_grounding), do: nil

  # Reads the grounding score from result metadata, accepting atom or string keys.
  defp metadata_grounding_score(metadata) when is_map(metadata) do
    get_in(metadata, [:grounding, :score]) || get_in(metadata, ["grounding", "score"])
  end

  defp start_metadata(question, %Preset{} = preset, %Config{} = config) do
    %{
      query: question,
      preset_id: preset.id,
      tenant_id: config.tenant_id,
      user_id: config.user_id,
      collections: config.collections,
      risk_level: config.risk_level,
      use_case: config.use_case
    }
  end

  defp stop_metadata({:ok, %SearchResult{} = result}, %Preset{} = preset, %Config{} = config) do
    %{
      preset_id: preset.id,
      tenant_id: config.tenant_id,
      collections: config.collections,
      result_count: length(result.results),
      citation_count: length(result.citations),
      grounding_score: metadata_grounding_score(result.metadata)
    }
  end

  defp stop_metadata({:error, reason}, %Preset{} = preset, %Config{} = config) do
    %{
      preset_id: preset.id,
      tenant_id: config.tenant_id,
      collections: config.collections,
      error: inspect(reason)
    }
  end

  # Fetches `key` from a map that may use atom or string keys.
  defp field(map, key) when is_map(map) and is_atom(key) do
    Map.get(map, key) || Map.get(map, Atom.to_string(key))
  end
end