Skip to main content

lib/cmdc_rag_arcana/pipeline.ex

defmodule CMDCRAGArcana.Pipeline do
  @moduledoc """
  Enterprise governance entry point for the Arcana Pipeline.

  This module executes a registered `PipelinePreset` instead of steps assembled
  dynamically by an Agent. Callers must provide a `preset_id`, a collection scope,
  and ACL context in the options.

  After the preset runner returns, the result is normalized into a
  `CMDCRAGArcana.SearchResult`, annotated with a pipeline run summary (stored under
  both `metadata[:pipeline_run_summary]` and `metadata[:pipeline]`), and checked
  against the preset's fail policy (required citations, minimum grounding score).
  """

  alias CMDCRAGArcana.{Config, SearchResult}
  alias CMDCRAGArcana.Pipeline.{Preset, Registry, RunSummary}

  @doc """
  Runs an Arcana Pipeline answer for `question` according to the configured preset.

  Builds a `Config` from `opts`, resolves and preflights the preset through
  `Registry`, then invokes `config.pipeline_runner.run/3` inside a
  `[:cmdc_rag_arcana, :pipeline_answer]` telemetry span. The runner result is
  normalized and then passed through the preset's `fail_mode`.

  Returns `{:ok, %SearchResult{}}` (possibly with the answer removed or warnings
  attached, depending on `fail_mode`) or `{:error, reason}`. Exceptions raised
  anywhere in the flow are returned as `{:error, {:pipeline_exception, message}}`.
  """
  @spec answer(String.t(), keyword()) :: {:ok, SearchResult.t()} | {:error, term()}
  def answer(question, opts) when is_binary(question) and is_list(opts) do
    config = Config.new!(opts)

    with {:ok, preset} <- Registry.resolve(config),
         {:ok, preflight} <- Registry.preflight(preset, config) do
      # The rest of the run uses the collection list returned by preflight rather
      # than the one originally passed in `opts`.
      config = %{config | collections: preflight.collections}

      :telemetry.span(
        [:cmdc_rag_arcana, :pipeline_answer],
        start_metadata(question, preset, config),
        fn ->
          result =
            config.pipeline_runner.run(question, preset, config)
            |> normalize_runner_result(question, preset, config)
            |> apply_fail_policy(preset, config)

          {result, stop_metadata(result, preset, config)}
        end
      )
    end
  rescue
    e -> {:error, {:pipeline_exception, Exception.message(e)}}
  end

  @doc """
  Returns the description of preset `preset_id`.

  Phoenix AgentOps uses it for release approval and diffing. Delegates to
  `Registry.describe/2` with a `Config` built from `opts`.
  """
  @spec describe_pipeline_preset(String.t(), keyword()) :: {:ok, map()} | {:error, term()}
  def describe_pipeline_preset(preset_id, opts \\ []) when is_binary(preset_id) do
    opts
    |> Config.new!()
    |> Registry.describe(preset_id)
  end

  @doc """
  Runs the preflight for preset `preset_id` without calling Arcana.

  Resolves the preset with `preset_id` merged into `opts`, then returns the result
  of `Registry.preflight/2`.
  """
  @spec preflight(String.t(), keyword()) :: {:ok, map()} | {:error, term()}
  def preflight(preset_id, opts \\ []) when is_binary(preset_id) do
    config = Config.new!(Keyword.put(opts, :preset_id, preset_id))

    with {:ok, preset} <- Registry.resolve(config) do
      Registry.preflight(preset, config)
    end
  end

  # Converts whatever the pipeline runner returned into `{:ok, %SearchResult{}}` or
  # `{:error, reason}`. Raw maps may use atom or string keys.
  defp normalize_runner_result({:error, reason}, _question, _preset, _config),
    do: {:error, reason}

  defp normalize_runner_result({:ok, %SearchResult{} = result}, _question, preset, config) do
    # No raw payload: the summary is built from the result itself.
    {:ok, put_summary(result, preset, config, %{})}
  end

  defp normalize_runner_result({:ok, raw}, question, preset, config) when is_map(raw) do
    context = List.wrap(field(raw, :context) || field(raw, :chunks) || [])
    answer = field(raw, :answer)

    result =
      question
      |> SearchResult.from_answer(answer || "", context, config)
      # Keep the raw answer as-is (nil when the runner produced none), even though
      # from_answer/4 was given "" as a fallback.
      |> Map.put(:answer, answer)
      |> put_summary(preset, config, raw)

    {:ok, result}
  end

  defp normalize_runner_result(other, _question, _preset, _config) do
    {:error, {:invalid_pipeline_runner_result, other}}
  end

  # Applies the preset's fail_mode when the result violates the preset's policy.
  # The four supported modes are matched exhaustively. An unknown mode raises
  # CaseClauseError, which answer/2 turns into a :pipeline_exception error.
  defp apply_fail_policy({:error, _reason} = error, _preset, _config), do: error

  defp apply_fail_policy({:ok, %SearchResult{} = result}, %Preset{} = preset, _config) do
    case violations(result, preset) do
      [] ->
        {:ok, result}

      violations ->
        summary = pipeline_summary(result)

        case preset.fail_mode do
          :block ->
            {:error, {:pipeline_blocked, violations, %{summary | blocked?: true}}}

          :search_only ->
            {:ok,
             result
             |> Map.put(:answer, nil)
             |> put_pipeline_summary(%{summary | degraded?: true, warnings: violations})}

          :answer_with_warning ->
            {:ok, put_pipeline_summary(result, %{summary | warnings: violations})}

          :needs_review ->
            {:ok,
             put_pipeline_summary(result, %{
               summary
               | review_required?: true,
                 warnings: violations
             })}
        end
    end
  end

  # Collects policy violations in check order: citations first, then grounding.
  defp violations(%SearchResult{} = result, %Preset{} = preset) do
    []
    |> maybe_add_missing_citations(result, preset)
    |> maybe_add_grounding_violation(result, preset)
    |> Enum.reverse()
  end

  defp maybe_add_missing_citations(violations, %SearchResult{citations: []}, %Preset{
         require_citations?: true
       }) do
    [:missing_citations | violations]
  end

  defp maybe_add_missing_citations(violations, _result, _preset), do: violations

  defp maybe_add_grounding_violation(violations, _result, %Preset{min_grounding_score: nil}) do
    violations
  end

  defp maybe_add_grounding_violation(violations, result, %Preset{min_grounding_score: min_score}) do
    case result_grounding_score(result) do
      score when is_number(score) and score < min_score ->
        [{:grounding_below_threshold, score, min_score} | violations]

      score when is_number(score) ->
        violations

      # Fail closed: a non-numeric score would otherwise be compared by term
      # ordering (binaries/atoms always sort above numbers) and silently pass.
      _missing_or_invalid ->
        [{:missing_grounding_score, min_score} | violations]
    end
  end

  # Attaches normalized grounding and a RunSummary to the result's metadata.
  # `raw` is the runner's raw map (or %{} when the runner returned a SearchResult).
  defp put_summary(%SearchResult{} = result, %Preset{} = preset, %Config{} = config, raw) do
    raw_grounding = field(raw, :grounding)
    step_metadata = field(raw, :step_summary) || field(raw, :steps) || %{}

    # Only overwrite grounding when the raw payload carries one. A runner that
    # returns a ready-made %SearchResult{} passes an empty `raw`. Overwriting its
    # grounding with nil would make every `min_grounding_score` preset report a
    # missing score.
    result =
      if is_nil(raw_grounding) do
        %{result | metadata: Map.put_new(result.metadata, :grounding, nil)}
      else
        put_metadata(result, :grounding, normalize_grounding(raw_grounding))
      end

    summary = %RunSummary{
      preset_id: preset.id,
      steps: Enum.map(preset.steps, & &1.name),
      collections: field(raw, :collections) || config.collections,
      fail_mode: preset.fail_mode,
      grounding_score: result_grounding_score(result),
      citation_count: length(result.citations),
      result_count: length(result.results),
      metadata: %{steps: step_metadata}
    }

    put_pipeline_summary(result, summary)
  end

  # Stores the summary under both :pipeline_run_summary and :pipeline.
  defp put_pipeline_summary(%SearchResult{} = result, %RunSummary{} = summary) do
    summary_map = RunSummary.to_map(summary)

    result
    |> put_metadata(:pipeline_run_summary, summary_map)
    |> put_metadata(:pipeline, summary_map)
  end

  # Rebuilds a RunSummary struct from the map stored by put_pipeline_summary/2,
  # accepting atom or string keys.
  defp pipeline_summary(%SearchResult{} = result) do
    summary_map = metadata_value(result.metadata, :pipeline_run_summary, %{})

    %RunSummary{
      preset_id: to_string(metadata_value(summary_map, :preset_id, nil)),
      steps: metadata_value(summary_map, :steps, []),
      collections: metadata_value(summary_map, :collections, []),
      fail_mode: metadata_value(summary_map, :fail_mode, nil),
      grounding_score: metadata_value(summary_map, :grounding_score, nil),
      citation_count: metadata_value(summary_map, :citation_count, 0),
      result_count: metadata_value(summary_map, :result_count, 0),
      metadata: metadata_value(summary_map, :metadata, %{})
    }
  end

  # Reads `key` as an atom, then as a string; falls back to `default` when both
  # are nil/false.
  defp metadata_value(map, key, default) when is_map(map) and is_atom(key) do
    Map.get(map, key) || Map.get(map, Atom.to_string(key)) || default
  end

  defp put_metadata(%SearchResult{metadata: metadata} = result, key, value) do
    %{result | metadata: Map.put(metadata, key, value)}
  end

  # Reduces a raw grounding map to its score plus span counts; non-maps become nil.
  defp normalize_grounding(nil), do: nil

  defp normalize_grounding(%{} = grounding) do
    %{
      score: field(grounding, :score),
      hallucinated_span_count: length(List.wrap(field(grounding, :hallucinated_spans))),
      faithful_span_count: length(List.wrap(field(grounding, :faithful_spans)))
    }
  end

  defp normalize_grounding(_grounding), do: nil

  defp grounding_score(nil), do: nil
  defp grounding_score(%{} = grounding), do: field(grounding, :score)
  defp grounding_score(_grounding), do: nil

  # Single source of truth for the grounding score stored on a result. It tolerates
  # atom/string keys and non-map grounding values, where plain get_in/2 could raise.
  defp result_grounding_score(%SearchResult{metadata: metadata}) do
    metadata
    |> metadata_value(:grounding, nil)
    |> grounding_score()
  end

  defp start_metadata(question, %Preset{} = preset, %Config{} = config) do
    %{
      query: question,
      preset_id: preset.id,
      tenant_id: config.tenant_id,
      user_id: config.user_id,
      collections: config.collections,
      risk_level: config.risk_level,
      use_case: config.use_case
    }
  end

  defp stop_metadata({:ok, %SearchResult{} = result}, %Preset{} = preset, %Config{} = config) do
    %{
      preset_id: preset.id,
      tenant_id: config.tenant_id,
      collections: config.collections,
      result_count: length(result.results),
      citation_count: length(result.citations),
      grounding_score: result_grounding_score(result)
    }
  end

  defp stop_metadata({:error, reason}, %Preset{} = preset, %Config{} = config) do
    %{
      preset_id: preset.id,
      tenant_id: config.tenant_id,
      collections: config.collections,
      error: inspect(reason)
    }
  end

  # Fetches `key` from a map that may use atom or string keys.
  defp field(map, key) when is_map(map) and is_atom(key) do
    Map.get(map, key) || Map.get(map, Atom.to_string(key))
  end
end