Skip to main content

lib/scoria/eval/review_queue.ex

defmodule Scoria.Eval.ReviewQueue do
  @moduledoc false

  import Ecto.Query, warn: false

  alias Scoria.Eval
  alias Scoria.Eval.DatasetPromotion
  alias Scoria.Eval.OnlineScoreCandidate
  alias Scoria.Observe.Approval
  alias Scoria.Repo
  alias Scoria.Workflows.Run

  @default_filters %{
    review_status: "pending"
  }

  def list_candidates(filters \\ %{}) do
    filters = normalize_filters(filters)

    OnlineScoreCandidate
    |> apply_filters(filters)
    |> order_by([candidate], asc: candidate.inserted_at, asc: candidate.id)
    |> Repo.all()
    |> Enum.map(&project_summary/1)
    |> sort_rows()
  end

  def summary(filters \\ %{}) do
    rows = list_candidates(filters)

    %{
      total_flagged: length(rows),
      low_quality_count: Enum.count(rows, &(&1.severity == "low_quality")),
      policy_triggered_count: Enum.count(rows, &(&1.severity == "policy_triggered")),
      promotion_candidate_count: Enum.count(rows, &(&1.status == "promotion_candidate"))
    }
  end

  def get_candidate(nil), do: nil

  def get_candidate(candidate_id) do
    case Repo.get(OnlineScoreCandidate, candidate_id) do
      nil -> nil
      candidate -> project_detail(candidate)
    end
  end

  def dismiss_candidate(candidate_id) do
    candidate = Repo.get!(OnlineScoreCandidate, candidate_id)

    candidate
    |> OnlineScoreCandidate.changeset(%{
      status: "dismissed",
      review_status: "dismissed",
      reviewed_at: now()
    })
    |> Repo.update()
    |> case do
      {:ok, updated} -> {:ok, project_detail(updated)}
      error -> error
    end
  end

  def promote_candidate(candidate_id, attrs) when is_map(attrs) do
    candidate = Repo.get!(OnlineScoreCandidate, candidate_id)
    detail = project_detail(candidate)
    dataset_id = attrs["dataset_id"] || attrs[:dataset_id]
    notes = attrs["notes"] || attrs[:notes] || ""
    expected_output = attrs["expected_output"] || attrs[:expected_output] || %{}

    promotion_attrs =
      DatasetPromotion.build_promotion_attrs(
        detail.promotion_context,
        dataset_id,
        notes,
        expected_output
      )

    with {:ok, item} <- Eval.promote_workflow_source(promotion_attrs),
         {:ok, updated} <- mark_promoted(candidate, item) do
      {:ok, project_detail(updated)}
    end
  end

  def request_baseline_approval(candidate_id, attrs) when is_map(attrs) do
    candidate = Repo.get!(OnlineScoreCandidate, candidate_id)
    detail = project_detail(candidate)
    dataset_id = attrs["dataset_id"] || attrs[:dataset_id]
    notes = attrs["notes"] || attrs[:notes] || ""
    expected_output = attrs["expected_output"] || attrs[:expected_output] || %{}

    request_attrs =
      DatasetPromotion.build_promotion_attrs(
        detail.promotion_context,
        dataset_id,
        notes,
        expected_output
      )

    with {:ok, approval} <- Scoria.Workflows.request_baseline_promotion(request_attrs),
         {:ok, updated} <- mark_approval_requested(candidate, approval) do
      {:ok, project_detail(updated)}
    end
  end

  defp apply_filters(query, filters) do
    Enum.reduce(filters, query, fn
      {_key, nil}, query ->
        query

      {:review_status, nil}, query ->
        where(query, [candidate], candidate.review_status not in ["dismissed", "promoted"])

      {:review_status, review_status}, query ->
        where(query, [candidate], candidate.review_status == ^review_status)

      {:severity, severity}, query ->
        severity_statuses = severity_statuses(severity)

        where(query, [candidate], candidate.status in ^severity_statuses)

      {:scorer_kind, scorer_kind}, query ->
        where(query, [candidate], candidate.scorer_kind == ^scorer_kind)

      {:promotion_state, promotion_state}, query ->
        where(query, [candidate], candidate.status == ^promotion_state)

      {_key, _value}, query ->
        query
    end)
  end

  defp normalize_filters(filters) when is_map(filters) do
    filters
    |> Enum.reduce(@default_filters, fn
      {key, value}, acc when is_binary(key) -> Map.put(acc, String.to_atom(key), blank_to_nil(value))
      {key, value}, acc -> Map.put(acc, key, blank_to_nil(value))
    end)
  rescue
    ArgumentError -> @default_filters
  end

  defp normalize_filters(_filters), do: @default_filters

  defp project_summary(candidate) do
    run = Repo.get(Run, candidate.workflow_run_id)
    severity = severity(candidate)
    sample_reason = map_value(candidate.sampling_metadata, "sample_reason")

    %{
      id: candidate.id,
      status: candidate.status,
      review_status: candidate.review_status,
      severity: severity,
      score: candidate.score,
      score_status: candidate.score_status,
      rationale: candidate.score_explanation,
      scorer_kind: candidate.scorer_kind,
      scorer_version: candidate.scorer_version,
      sample_reason: sample_reason,
      trace_id: candidate.trace_id,
      workflow_run_id: candidate.workflow_run_id,
      workflow_step_id: candidate.workflow_step_id,
      runtime_id: run && run.session_id,
      workflow_path: "/scoria/workflows/#{candidate.workflow_run_id}?review_candidate_id=#{candidate.id}",
      runtime_path: runtime_path(run && run.session_id, candidate.id),
      inserted_at: candidate.inserted_at,
      promotion_state: candidate.status,
      dataset_ref: promoted_dataset_ref(candidate)
    }
  end

  defp project_detail(candidate) do
    run = Repo.get(Run, candidate.workflow_run_id)
    approval = find_approval(candidate)

    %{
      id: candidate.id,
      status: candidate.status,
      review_status: candidate.review_status,
      severity: severity(candidate),
      score: candidate.score,
      score_status: candidate.score_status,
      rationale: candidate.score_explanation,
      scorer_kind: candidate.scorer_kind,
      scorer_version: candidate.scorer_version,
      judge_model: candidate.judge_model,
      rubric_version: candidate.rubric_version,
      trace_id: candidate.trace_id,
      workflow_run_id: candidate.workflow_run_id,
      workflow_step_id: candidate.workflow_step_id,
      runtime_id: run && run.session_id,
      workflow_path: "/scoria/workflows/#{candidate.workflow_run_id}?review_candidate_id=#{candidate.id}",
      runtime_path: runtime_path(run && run.session_id, candidate.id),
      sampling_provenance: candidate.sampling_metadata || %{},
      evidence_refs: candidate.evidence_refs || %{},
      promotion_snapshot: candidate.promotion_snapshot || %{},
      promotion_context: build_promotion_context(candidate, run),
      dataset_ref: promoted_dataset_ref(candidate),
      approval_lineage: approval && %{
        id: approval.id,
        status: approval.status,
        tool_name: approval.tool_name
      },
      inserted_at: candidate.inserted_at
    }
  end

  defp build_promotion_context(candidate, run) do
    source_variant = map_value(candidate.promotion_snapshot, "source_variant") || "original"
    replay_reason_code = map_value(candidate.promotion_snapshot, "replay_reason_code")
    replay_disposition = if(run && run.execution_mode == "replay", do: "blocked", else: nil)

    %{
      workflow_run_id: candidate.workflow_run_id,
      workflow_step_id: candidate.workflow_step_id,
      source_variant: source_variant,
      provenance: %{
        workflow_run_id: candidate.workflow_run_id,
        workflow_step_id: candidate.workflow_step_id,
        source_variant: source_variant,
        execution_mode: run && run.execution_mode || "live",
        source_run_id: run && run.source_run_id,
        source_checkpoint_id: run && run.source_checkpoint_id,
        replay_disposition: replay_disposition,
        replay_reason_code: replay_reason_code
      },
      checkpoint_output: %{
        "projected_context" => %{"review_candidate_id" => candidate.id},
        "recorded_outcome" => map_value(candidate.promotion_snapshot, "recorded_outcome")
      },
      safety: %{
        "replay_scope" => if(run && run.execution_mode == "replay", do: "replay_live", else: nil),
        "replay_disposition" => replay_disposition,
        "replay_reason_code" => replay_reason_code
      },
      promotion_snapshot: candidate.promotion_snapshot || %{},
      notes: "",
      expected_output: %{}
    }
  end

  defp sort_rows(rows) do
    Enum.sort_by(rows, fn row ->
      {severity_rank(row.severity), DateTime.to_unix(row.inserted_at, :microsecond), row.id}
    end)
  end

  defp severity(%OnlineScoreCandidate{} = candidate) do
    cond do
      map_value(candidate.sampling_metadata, "sample_reason") == "policy_trigger" -> "policy_triggered"
      candidate.status == "promotion_candidate" -> "promotion_candidate"
      candidate.score_status == "failed" or candidate.status == "needs_review" -> "low_quality"
      true -> "needs_review"
    end
  end

  defp severity_rank("policy_triggered"), do: 0
  defp severity_rank("low_quality"), do: 1
  defp severity_rank("promotion_candidate"), do: 2
  defp severity_rank(_severity), do: 3

  defp severity_statuses("policy_triggered"), do: ["needs_review"]
  defp severity_statuses("low_quality"), do: ["needs_review"]
  defp severity_statuses("promotion_candidate"), do: ["promotion_candidate"]
  defp severity_statuses(_severity), do: ["queued", "needs_review", "promotion_candidate", "approval_requested"]

  defp promoted_dataset_ref(candidate) do
    metadata = candidate.metadata || %{}

    case map_value(metadata, "promoted_dataset") do
      %{} = dataset -> dataset
      _ -> nil
    end
  end

  defp mark_promoted(candidate, item) do
    dataset = Eval.get_dataset!(item.dataset_id)
    metadata = (candidate.metadata || %{}) |> normalize_map()

    candidate
    |> OnlineScoreCandidate.changeset(%{
      status: "promoted",
      review_status: "promoted",
      promoted_at: now(),
      reviewed_at: now(),
      metadata:
        Map.put(metadata, "promoted_dataset", %{
          "dataset_id" => dataset.id,
          "dataset_name" => dataset.name,
          "dataset_version" => dataset.version,
          "dataset_item_id" => item.id
        })
    })
    |> Repo.update()
  end

  defp mark_approval_requested(candidate, approval) do
    metadata = (candidate.metadata || %{}) |> normalize_map()

    candidate
    |> OnlineScoreCandidate.changeset(%{
      status: "approval_requested",
      review_status: "in_review",
      reviewed_at: now(),
      metadata: Map.put(metadata, "approval_id", approval.id)
    })
    |> Repo.update()
  end

  defp find_approval(candidate) do
    Repo.one(
      from(approval in Approval,
        where:
          approval.workflow_run_id == ^candidate.workflow_run_id and
            approval.step_id == ^candidate.workflow_step_id and
            approval.tool_name == "dataset_baseline_promotion",
        order_by: [desc: approval.inserted_at, desc: approval.id],
        limit: 1
      )
    )
  end

  defp runtime_path(nil, candidate_id), do: "/scoria?review_candidate_id=#{candidate_id}"
  defp runtime_path(runtime_id, candidate_id), do: "/scoria?runtime=#{runtime_id}&review_candidate_id=#{candidate_id}"

  defp blank_to_nil(""), do: nil
  defp blank_to_nil(value), do: value

  defp map_value(map, key) when is_map(map) do
    Map.get(map, key, Map.get(map, String.to_atom(key), nil))
  rescue
    ArgumentError -> Map.get(map, key)
  end

  defp map_value(_map, _key), do: nil

  defp normalize_map(nil), do: %{}

  defp normalize_map(value) when is_map(value) do
    Map.new(value, fn {key, nested} ->
      {to_string(key), if(is_map(nested), do: normalize_map(nested), else: nested)}
    end)
  end

  defp normalize_map(_value), do: %{}

  defp now, do: DateTime.utc_now() |> DateTime.truncate(:microsecond)
end