defmodule Scoria.Eval.OnlineScoring do
@moduledoc false
import Ecto.Query, warn: false
alias Ecto.Multi
alias Scoria.Eval
alias Scoria.Eval.OnlineScoreCandidate
alias Scoria.Eval.EvalRun
alias Scoria.Eval.EvalSpec
alias Scoria.Eval.EvalCampaignTarget
alias Scoria.Repo
alias Scoria.Repo.Trace
@doc """
Registers a sampled trace as an online score candidate and, unless an
already-enqueued candidate exists for it, enqueues an eval campaign.

`attrs` may use atom or string keys. `:trace_id`, `:tenant_id`,
`:workflow_run_id`, `:workflow_step_id`, `:dedupe_key` and `:scorer` are
required; an `ArgumentError` is raised when one of them is missing.

When `opts[:notify]` is a pid, that process is sent
`{:online_scoring_result, result}` before the result is returned.
"""
def enqueue_sampled_trace(attrs, opts \\ []) when is_map(attrs) do
  payload = normalize_payload(attrs)
  outcome = dispatch_candidate(get_or_create_candidate(payload), payload)

  notify(opts, outcome)
  outcome
end

# An existing candidate that already points at a campaign is reused as-is.
defp dispatch_candidate(
       {:ok, %OnlineScoreCandidate{campaign_id: campaign_id} = candidate, :existing},
       _payload
     )
     when not is_nil(campaign_id) do
  {:ok, %{candidate: candidate, reused?: true, enqueued?: false}}
end

# Freshly inserted candidates, and existing ones without a campaign, get enqueued.
defp dispatch_candidate({:ok, %OnlineScoreCandidate{} = candidate, _source}, payload) do
  enqueue_candidate(candidate, payload)
end

defp dispatch_candidate({:error, _reason} = failure, _payload), do: failure
@doc """
Scores the online score candidate referenced by a campaign target.

The steps run in order: load the candidate named in the target metadata,
load its trace, build the deterministic scores, then either finalize the
run directly or hand over to the judge runner, and finally copy the score
summary onto the candidate.

Returns `{:ok, result}` where `result` is the scoring result with the
updated candidate stored under `:candidate`. If any step returns something
other than its expected `{:ok, ...}` shape, that value is returned unchanged.
"""
def execute_candidate(%EvalRun{} = eval_run, %{
      target: %EvalCampaignTarget{} = target,
      eval_spec: %EvalSpec{} = eval_spec,
      dataset: dataset
    }) do
  with {:ok, pending} <- fetch_candidate(target),
       {:ok, trace} <- fetch_trace(pending.trace_id),
       {:ok, base_scores, terminal?} <- deterministic_scores(pending, trace, dataset),
       {:ok, outcome} <-
         maybe_run_judge(eval_run, eval_spec, dataset, target, base_scores, terminal?),
       {:ok, synced} <- sync_candidate(pending.id, outcome.scores) do
    {:ok, Map.put(outcome, :candidate, synced)}
  end
end
# Creates and enqueues an eval campaign for `candidate`, then records the
# campaign id and the first eval run id on the candidate row.
#
# Returns `{:ok, map}` with `:candidate`, `:campaign`, `:eval_runs`,
# `reused?: false` and `enqueued?: true` on success. A failure from the
# campaign creation or the candidate update is returned unchanged.
defp enqueue_candidate(%OnlineScoreCandidate{} = candidate, payload) do
  campaign_attrs = campaign_attrs(candidate, payload)

  with {:ok, enqueue_result} <- Eval.create_and_enqueue_campaign(campaign_attrs),
       [eval_run | _] = eval_runs <- enqueue_result.eval_runs,
       {:ok, candidate} <-
         update_candidate_lineage(candidate, enqueue_result.campaign.id, eval_run.id) do
    {:ok,
     %{
       candidate: candidate,
       campaign: enqueue_result.campaign,
       eval_runs: eval_runs,
       reused?: false,
       enqueued?: true
     }}
  else
    # Without this clause an empty eval run list would leak out of the `with`
    # as a bare `[]`, which callers cannot tell apart from a tagged result.
    [] -> {:error, {:invalid_campaign_contract, :missing_eval_run}}
    # Every other non-matching value (e.g. `{:error, reason}`) passes through as before.
    other -> other
  end
end
# Inserts a new candidate for `payload` inside a transaction.
#
# Returns `{:ok, candidate, :inserted}` on success. When the insert fails with
# a unique-constraint error on `:dedupe_key`, the already stored candidate is
# loaded and returned as `{:ok, candidate, :existing}`. Any other changeset
# failure is returned as `{:error, changeset}`.
defp get_or_create_candidate(payload) do
  changeset =
    OnlineScoreCandidate.changeset(%OnlineScoreCandidate{}, candidate_attrs(payload))

  insert_step = Multi.insert(Multi.new(), :candidate, changeset)

  case Repo.transaction(insert_step) do
    {:ok, %{candidate: inserted}} ->
      {:ok, inserted, :inserted}

    {:error, :candidate, failed_changeset, _changes} ->
      # The lookup runs after the failed transaction has finished.
      if duplicate_candidate?(failed_changeset) do
        {:ok, fetch_existing_candidate!(payload), :existing}
      else
        {:error, failed_changeset}
      end
  end
end
# Stores the campaign and eval run ids on the candidate.
# Returns the result of `Repo.update/1`.
defp update_candidate_lineage(candidate, campaign_id, eval_run_id) do
  lineage = %{campaign_id: campaign_id, eval_run_id: eval_run_id}
  changeset = OnlineScoreCandidate.changeset(candidate, lineage)

  Repo.update(changeset)
end
# Loads the newest candidate for the payload's tenant and dedupe key whose
# review status is "pending" or "in_review". Raises (via `Repo.one!/1`) when
# no such row exists.
defp fetch_existing_candidate!(payload) do
  tenant_id = payload.tenant_id
  dedupe_key = payload.dedupe_key

  OnlineScoreCandidate
  |> where([c], c.tenant_id == ^tenant_id and c.dedupe_key == ^dedupe_key)
  |> where([c], c.review_status in ["pending", "in_review"])
  |> order_by([c], desc: c.inserted_at, desc: c.id)
  |> limit(1)
  |> Repo.one!()
end
# Builds the attributes for a single-target eval campaign scoring `candidate`.
#
# The scorer map must contain "eval_spec_id", "provider" and "model"
# (`Map.fetch!/2` raises `KeyError` otherwise); "queue" defaults to "evals"
# and "priority" to 1. The target metadata is the campaign metadata plus the
# dedupe key, scorer, evidence refs and promotion snapshot.
defp campaign_attrs(candidate, payload) do
  scorer = payload.scorer

  lineage_metadata = %{
    "source" => "online_scoring",
    "candidate_id" => candidate.id,
    "trace_id" => payload.trace_id,
    "workflow_run_id" => payload.workflow_run_id,
    "workflow_step_id" => payload.workflow_step_id,
    "sampling_metadata" => payload.sampling_metadata
  }

  target_metadata =
    Map.merge(lineage_metadata, %{
      "dedupe_key" => payload.dedupe_key,
      "scorer" => scorer,
      "evidence_refs" => payload.evidence_refs,
      "promotion_snapshot" => payload.promotion_snapshot
    })

  %{
    tenant_id: payload.tenant_id,
    eval_spec_id: Map.fetch!(scorer, "eval_spec_id"),
    metadata: lineage_metadata,
    targets: [
      %{
        tenant_id: payload.tenant_id,
        provider: Map.fetch!(scorer, "provider"),
        model: Map.fetch!(scorer, "model"),
        queue: Map.get(scorer, "queue", "evals"),
        priority: Map.get(scorer, "priority", 1),
        metadata: target_metadata
      }
    ]
  }
end
# Builds the insert attributes for a new candidate: the payload's identifiers
# and maps, an initial status of "queued" / review status of "pending", the
# scorer stored under metadata, and the current UTC time as `sampled_at`.
defp candidate_attrs(payload) do
  sampled_at = DateTime.truncate(DateTime.utc_now(), :microsecond)

  %{
    status: "queued",
    review_status: "pending",
    sampled_at: sampled_at,
    metadata: %{"scorer" => payload.scorer},
    tenant_id: payload.tenant_id,
    trace_id: payload.trace_id,
    workflow_run_id: payload.workflow_run_id,
    workflow_step_id: payload.workflow_step_id,
    dedupe_key: payload.dedupe_key,
    sampling_metadata: payload.sampling_metadata,
    evidence_refs: payload.evidence_refs,
    promotion_snapshot: payload.promotion_snapshot
  }
end
# True when the changeset carries a `:dedupe_key` error whose metadata marks it
# as a unique-constraint violation.
defp duplicate_candidate?(changeset) do
  Enum.any?(changeset.errors, &unique_dedupe_error?/1)
end

defp unique_dedupe_error?({:dedupe_key, {_message, details}}),
  do: details[:constraint] == :unique

defp unique_dedupe_error?(_other_error), do: false
# Sends `{:online_scoring_result, result}` to the pid given under `:notify`.
# Does nothing (returns `:ok`) when the option is absent or not a pid.
defp notify(opts, result) do
  recipient = Keyword.get(opts, :notify)

  if is_pid(recipient) do
    send(recipient, {:online_scoring_result, result})
  else
    :ok
  end
end
# Converts caller-supplied attrs (atom or string keys) into the internal
# payload map with atom keys.
#
# The identifier fields and the scorer are required and raise `ArgumentError`
# when missing; the evidence refs, promotion snapshot and sampling metadata
# default to `%{}`. All map-valued fields are normalized to string keys.
defp normalize_payload(attrs) do
  # Validation order matters for which missing attribute is reported first:
  # identifiers in this order, then the scorer.
  identifiers =
    Map.new(
      [:trace_id, :tenant_id, :workflow_run_id, :workflow_step_id, :dedupe_key],
      fn key -> {key, fetch_attr!(attrs, key)} end
    )

  scorer = normalize_map(fetch_attr!(attrs, :scorer))

  optional_maps =
    Map.new(
      [:evidence_refs, :promotion_snapshot, :sampling_metadata],
      fn key -> {key, normalize_map(fetch_attr(attrs, key, %{}))} end
    )

  identifiers
  |> Map.put(:scorer, scorer)
  |> Map.merge(optional_maps)
end
# Like `fetch_attr/3` but raises `ArgumentError` when no value is found.
defp fetch_attr!(attrs, key) do
  fetch_attr(attrs, key) ||
    raise(ArgumentError, "missing required online scoring attribute #{key}")
end

# Looks `key` up under its atom form first, then under its string form, and
# falls back to `default`. A stored `nil` or `false` counts as missing.
defp fetch_attr(attrs, key, default \\ nil) when is_map(attrs) do
  case Map.get(attrs, key) do
    absent when absent in [nil, false] ->
      Map.get(attrs, Atom.to_string(key)) || default

    found ->
      found
  end
end
# Returns a copy of `map` with every key converted to a string via
# `to_string/1`, recursing into nested maps and lists. `nil` becomes `%{}`.
defp normalize_map(nil), do: %{}

defp normalize_map(map) when is_map(map) do
  Map.new(map, fn {key, value} ->
    {to_string(key), normalize_value(value)}
  end)
end

# Structs satisfy `is_map/1` but most are not enumerable, so a nested
# DateTime/Date/Decimal would crash in `Map.new/2`; treat them as leaf values.
defp normalize_value(value) when is_map(value) and not is_struct(value),
  do: normalize_map(value)

defp normalize_value(value) when is_list(value), do: Enum.map(value, &normalize_value/1)
defp normalize_value(value), do: value
# Loads the candidate referenced by the target's "candidate_id" metadata.
#
# Returns `{:ok, candidate}`, or
# `{:error, {:invalid_campaign_contract, :missing_online_score_candidate}}`
# when the metadata has no candidate id or no row exists for it.
defp fetch_candidate(%EvalCampaignTarget{} = target) do
  # `Repo.get/2` returns nil for a missing row, so no exception has to be
  # rescued to produce the error tuple.
  with candidate_id when not is_nil(candidate_id) <- candidate_id(target),
       %OnlineScoreCandidate{} = candidate <- Repo.get(OnlineScoreCandidate, candidate_id) do
    {:ok, candidate}
  else
    nil -> {:error, {:invalid_campaign_contract, :missing_online_score_candidate}}
  end
end
# Loads the trace with the given id.
# Returns `{:ok, trace}` or
# `{:error, {:invalid_campaign_contract, :missing_online_score_trace}}`.
defp fetch_trace(trace_id) do
  Trace
  |> Repo.get(trace_id)
  |> case do
    nil -> {:error, {:invalid_campaign_contract, :missing_online_score_trace}}
    %Trace{} = found -> {:ok, found}
  end
end
# Builds one deterministic score attribute map per dataset item, ordered by
# dataset item id.
#
# The candidate's "sample_reason" sampling metadata (default
# "production_sample") decides the outcome: "policy_trigger" yields a failed
# score of 0.0, anything else a passed score of 1.0.
#
# Returns `{:ok, score_attrs, policy_triggered?}`.
defp deterministic_scores(candidate, %Trace{} = trace, dataset) do
  sample_reason =
    Map.get(candidate.sampling_metadata || %{}, "sample_reason", "production_sample")

  policy_triggered? = sample_reason == "policy_trigger"
  trace_env = trace.attributes |> normalize_map() |> Map.get("env", "unknown")

  {score_status, score_value, explanation} =
    if policy_triggered? do
      {"failed", 0.0, "Policy trigger requires operator review"}
    else
      {"passed", 1.0, "Deterministic online checks passed for #{trace_env} trace"}
    end

  # Everything except the dataset item id is identical for all items, so it is
  # built once here instead of once per item.
  shared_attrs = %{
    scorer_kind: "deterministic_rule",
    scorer_version: "policy-rules@2026.05.23",
    status: score_status,
    score: score_value,
    explanation: explanation,
    rubric_version: "online-feedback-v1",
    evidence_refs: evidence_refs(candidate, trace, sample_reason),
    metadata: %{
      "candidate_id" => candidate.id,
      "sample_reason" => sample_reason,
      "trace_env" => trace_env,
      "latency_ms" => 0,
      "cost_usd" => "0.0"
    }
  }

  score_attrs =
    dataset.id
    |> Eval.list_dataset_items()
    |> Enum.sort_by(& &1.id)
    |> Enum.map(&Map.put(shared_attrs, :dataset_item_id, &1.id))

  {:ok, score_attrs, policy_triggered?}
end
# Terminal case (last argument `true`): the deterministic scores are final.
# They replace the run's scores and the run is completed without calling the
# judge. Returns `{:ok, %{eval_run: run, scores: scores}}`, or the first
# non-matching result from the Eval calls.
defp maybe_run_judge(eval_run, _eval_spec, _dataset, _target, base_scores, true) do
  with {:ok, scored_run, persisted_scores} <- Eval.replace_eval_scores(eval_run, base_scores),
       completion = %{
         status: "completed",
         duration_ms: 0,
         threshold_verdict: threshold_verdict(persisted_scores)
       },
       {:ok, finished_run} <- Eval.complete_eval_run(scored_run, completion) do
    {:ok, %{eval_run: finished_run, scores: persisted_scores}}
  end
end

# Non-terminal case (last argument `false`): delegates to the judge runner,
# passing the deterministic scores as `base_score_attrs`. The target's
# provider and model are used for both the run and the judge.
defp maybe_run_judge(eval_run, eval_spec, dataset, target, base_scores, false) do
  judge_opts = %{
    eval_spec: eval_spec,
    dataset: dataset,
    provider: target.provider,
    model: target.model,
    judge_provider: target.provider,
    judge_model: target.model,
    base_score_attrs: base_scores
  }

  Scoria.Eval.JudgeRunner.run_existing(eval_run, judge_opts)
end
# Reloads the candidate and writes the score summary onto it, stamping
# `reviewed_at` with the current UTC time. Raises (via `Repo.get!/2`) when the
# candidate no longer exists; otherwise returns the result of `Repo.update/1`.
defp sync_candidate(candidate_id, scores) do
  stored = Repo.get!(OnlineScoreCandidate, candidate_id)

  # The summary map holds exactly the candidate fields to update (status,
  # review status and the score/scorer details); only the timestamp is added.
  updates =
    scores
    |> summarize_scores()
    |> Map.put(:reviewed_at, DateTime.truncate(DateTime.utc_now(), :microsecond))

  stored
  |> OnlineScoreCandidate.changeset(updates)
  |> Repo.update()
end
# Condenses a list of scores into the candidate's summary fields.
#
# If any score has status "failed", the candidate status is "needs_review"
# and the details come from the first failed score. Otherwise the status is
# "promotion_candidate" and the details come from the last score (all `nil`
# for an empty list). `:score` is always the mean over all scores, and the
# review status is always "pending".
defp summarize_scores(scores) do
  mean = average_score(Enum.map(scores, & &1.score))

  {status, basis} =
    case Enum.find(scores, &(&1.status == "failed")) do
      nil -> {"promotion_candidate", List.last(scores)}
      failed -> {"needs_review", failed}
    end

  %{
    status: status,
    review_status: "pending",
    score: mean,
    score_status: basis && basis.status,
    score_explanation: basis && basis.explanation,
    scorer_kind: basis && basis.scorer_kind,
    scorer_version: basis && basis.scorer_version,
    judge_model: basis && basis.judge_model,
    rubric_version: basis && basis.rubric_version
  }
end

# Arithmetic mean of a list of numbers; `nil` for an empty list.
defp average_score(values) do
  case values do
    [] -> nil
    [_ | _] -> Enum.sum(values) / length(values)
  end
end
# "passed" only when every score has status "passed" (including the empty
# list); "failed" as soon as one score has any other status.
defp threshold_verdict(scores) do
  if Enum.any?(scores, &(&1.status != "passed")) do
    "failed"
  else
    "passed"
  end
end
# Reads the "candidate_id" entry from the target's metadata, accepting atom or
# string keys. Returns `nil` when the metadata is `nil` or has no such entry.
defp candidate_id(%EvalCampaignTarget{} = target) do
  metadata = normalize_map(target.metadata)
  Map.get(metadata, "candidate_id")
end
# Returns the candidate's evidence refs (string-keyed) extended with the
# candidate, trace and workflow identifiers plus the sample reason. The
# lineage entries win over same-named keys in the stored evidence refs.
defp evidence_refs(candidate, trace, sample_reason) do
  lineage = %{
    "candidate_id" => candidate.id,
    "trace_id" => trace.id,
    "workflow_run_id" => candidate.workflow_run_id,
    "workflow_step_id" => candidate.workflow_step_id,
    "sample_reason" => sample_reason
  }

  stored_refs = normalize_map(candidate.evidence_refs)
  Map.merge(stored_refs, lineage)
end
end