defmodule ScoriaWeb.OperatorSurface do
@moduledoc """
Read model shared by the operator dashboard pages (Live Ops, Approvals,
Connectors, Incidents).
Extracted from `ScoriaWeb.OrchestratorLive` so the slimmed Live Ops page and
the routed `/connectors` / `/incidents` pages query the same projections
instead of duplicating the fleet and SRE read logic. Functions are pure reads:
they take a tenant/trace/run scope and return plain maps ready for rendering.
"""
import Ecto.Query, warn: false
alias Decimal, as: D
alias Scoria.Connectors
alias Scoria.Connectors.Connector
alias Scoria.Eval.OnlineScoreCandidate
alias Scoria.Repo
alias Scoria.Runtime
alias Scoria.Workflows
alias Scoria.SRE.{
AlertEvent,
AuditOutboxEvent,
BreakerTrip,
BudgetReservation,
Incident,
IncidentEvent,
NotificationDelivery
}
# ── Fleet (runtimes + connectors) ──────────────────────────────────────────
@doc """
Lists runtime posture rows for `tenant_id`, annotated with live presence.

Loads at most 10 `Runtime.Instance` rows for the tenant, most recently seen
first. A row's `:status` is `"online"` when the instance id is one of the
keys `ScoriaWeb.Presence.list/1` returns for the `mcp:runtimes:<tenant_id>`
topic, and `"offline"` otherwise. `:semantic` is whatever
`runtime_drawer_semantic/1` returns for the instance's current run, so each
row with a current run triggers one run-detail lookup.
"""
def load_runtimes(tenant_id) do
  online_ids =
    "mcp:runtimes:#{tenant_id}"
    |> ScoriaWeb.Presence.list()
    |> Map.keys()

  recent_instances =
    Repo.all(
      from(instance in Runtime.Instance,
        where: [tenant_id: ^tenant_id],
        order_by: [desc: instance.last_seen_at],
        limit: 10
      )
    )

  for instance <- recent_instances do
    presence = if Enum.member?(online_ids, instance.id), do: "online", else: "offline"

    %{
      id: instance.id,
      status: presence,
      host_session_id: instance.host_session_id,
      transport_kind: instance.transport_kind,
      terminal_offline_reason: instance.terminal_offline_reason,
      current_run_id: instance.current_run_id,
      last_seen_at: instance.last_seen_at,
      semantic: runtime_drawer_semantic(instance.current_run_id)
    }
  end
end
@doc """
Builds the semantic-evidence summary shown in a runtime's drawer.

Returns `nil` when `run_id` is `nil`, when the run's semantic evidence has no
`:summary` entries, or when loading the run detail raises for any reason
(the lookup is best-effort). Otherwise returns a map of lookup/fallback
fields taken from the summary, the actor and origin-run link taken from the
provenance, and a `/workflows/<run_id>` link.
"""
def runtime_drawer_semantic(nil), do: nil

def runtime_drawer_semantic(run_id) do
  evidence = Runtime.get_run_detail!(run_id).semantic_evidence
  summary = evidence[:summary] || %{}
  provenance = evidence[:provenance] || %{}

  case map_size(summary) do
    0 ->
      nil

    _populated ->
      %{
        lookup_status: summary[:lookup_status],
        fallback_outcome: summary[:fallback_outcome],
        lane_key: summary[:lane_key],
        scope_kind: summary[:scope_kind],
        scope_reason: summary[:scope_reason],
        # Prefer the lookup reason; fall back to the eligibility reason.
        reason_code: summary[:lookup_reason_code] || summary[:eligibility_reason_code],
        actor_id: provenance[:actor_id],
        workflow_href: "/workflows/#{run_id}",
        origin_run_href: provenance[:origin_run_href]
      }
  end
rescue
  _error -> nil
end
@doc """
Returns the connector fleet rows for `tenant_id`.

Delegates to `Connectors.list_connector_fleet/1` with `%{tenant_id: tenant_id}`
when that function is available, and returns `[]` otherwise.
"""
def connector_fleet(tenant_id) do
  # `function_exported?/3` does not load the module, so on a node that loads
  # code lazily it reports `false` until something else has touched
  # `Connectors`. Ensure the module is loaded before probing it; otherwise the
  # fleet would silently render as empty.
  if Code.ensure_loaded?(Connectors) and
       function_exported?(Connectors, :list_connector_fleet, 1) do
    Connectors.list_connector_fleet(%{tenant_id: tenant_id})
  else
    []
  end
end
@doc """
Returns the drawer detail for `connector_id`.

Delegates to `Connectors.get_connector_drawer/1` when that function is
available, and returns `nil` otherwise.
"""
def connector_drawer(connector_id) do
  # `function_exported?/3` does not load the module, so on a node that loads
  # code lazily it reports `false` until something else has touched
  # `Connectors`. Ensure the module is loaded before probing it; otherwise the
  # drawer would silently come back empty.
  if Code.ensure_loaded?(Connectors) and
       function_exported?(Connectors, :get_connector_drawer, 1) do
    Connectors.get_connector_drawer(connector_id)
  else
    nil
  end
end
# ── Summary counts (Live Ops at-a-glance strips) ───────────────────────────
@doc """
Counts the pending remote approvals for `tenant_id`.

Uses the length of `Workflows.list_pending_remote_approvals/1`. Returns `0`
if listing raises, so the summary strip still renders.
"""
def pending_approval_count(tenant_id) do
  pending_approvals = Workflows.list_pending_remote_approvals(%{tenant_id: tenant_id})
  length(pending_approvals)
rescue
  _error -> 0
end
@doc """
Assembles the at-a-glance counts for a tenant's status home.

Returns a map with `:approvals`, `:incidents`, `:connectors` and `:reviews`
sections. Falls back to `empty_status_home_summary/0` if anything raises.
"""
def status_home_summary(tenant_id) do
  connector_counts = connector_health_summary(tenant_id)
  incident_counts = incidents_summary(tenant_id)
  approvals_pending = pending_approval_count(tenant_id)
  reviews_pending = pending_review_count(tenant_id)

  %{
    approvals: %{pending: approvals_pending},
    incidents: incident_counts,
    connectors: connector_counts,
    reviews: %{pending: reviews_pending}
  }
rescue
  _error -> empty_status_home_summary()
end
@doc """
Returns the all-zero status summary used when the real counts cannot be read.

The shape mirrors `status_home_summary/1`.
"""
def empty_status_home_summary do
  nothing_pending = %{pending: 0}
  no_incidents = %{open: 0, review: 0, page: 0}
  no_connectors = %{total: 0, degraded: 0}

  %{
    approvals: nothing_pending,
    incidents: no_incidents,
    connectors: no_connectors,
    reviews: nothing_pending
  }
end
# Counts a tenant's connectors and how many of them look degraded.
#
# A connector counts as degraded when its `health_state` is not one of
# "healthy" / "ok" / "unknown", or when its `status` is "degraded" or "error".
# Both counts are aggregated in the database. Returns zeros if either query
# raises.
defp connector_health_summary(tenant_id) do
  tenant_connectors =
    from(connector in Connector, where: connector.tenant_id == ^tenant_id)

  degraded_connectors =
    from(connector in tenant_connectors,
      where:
        connector.health_state not in ["healthy", "ok", "unknown"] or
          connector.status in ["degraded", "error"]
    )

  total = Repo.aggregate(tenant_connectors, :count)
  degraded = Repo.aggregate(degraded_connectors, :count)

  %{total: total, degraded: degraded}
rescue
  _error -> %{total: 0, degraded: 0}
end
@doc """
Summarises a tenant's fleet: runtime and connector totals.

`:runtimes_online` counts the rows from `load_runtimes/1` whose status is
`"online"`. `:connectors_degraded` counts the rows from `connector_fleet/1`
whose `:health_state` is anything other than `"healthy"`, `"ok"` or `nil`.
Returns all zeros if anything raises.
"""
def fleet_summary(tenant_id) do
  runtimes = load_runtimes(tenant_id)
  connectors = connector_fleet(tenant_id)

  online_runtimes = Enum.filter(runtimes, fn runtime -> runtime.status == "online" end)

  degraded_connectors =
    Enum.reject(connectors, fn connector ->
      Map.get(connector, :health_state) in ["healthy", "ok", nil]
    end)

  %{
    runtimes_total: length(runtimes),
    runtimes_online: length(online_runtimes),
    connectors_total: length(connectors),
    connectors_degraded: length(degraded_connectors)
  }
rescue
  _error ->
    %{runtimes_total: 0, runtimes_online: 0, connectors_total: 0, connectors_degraded: 0}
end
@doc """
Counts a tenant's open incidents for the summary strip.

Returns `%{open: n, review: n, page: n}`: `:open` is every incident with
status `"open"`, and `:review` / `:page` are the open incidents in that
routing class.

The counts are aggregated in the database so they cover every incident of
the tenant. Deriving them from `list_tenant_incidents/1` would undercount,
because that list is capped at 50 rows. Returns all zeros if a query raises,
so the strip still renders.
"""
def incidents_summary(tenant_id) do
  open_incidents =
    from(incident in Incident,
      where: incident.tenant_id == ^tenant_id and incident.status == "open"
    )

  %{
    open: Repo.aggregate(open_incidents, :count),
    review: count_open_incidents_routed(open_incidents, "review"),
    page: count_open_incidents_routed(open_incidents, "page")
  }
rescue
  _error -> %{open: 0, review: 0, page: 0}
end

# Counts the rows of `open_incidents` whose routing class is `routing_class`.
defp count_open_incidents_routed(open_incidents, routing_class) do
  open_incidents
  |> where([incident], incident.routing_class == ^routing_class)
  |> Repo.aggregate(:count)
end
# Counts the tenant's online score candidates whose review status is
# "pending". Returns 0 if the query raises.
defp pending_review_count(tenant_id) do
  pending_candidates =
    from(candidate in OnlineScoreCandidate,
      where: candidate.tenant_id == ^tenant_id,
      where: candidate.review_status == "pending"
    )

  Repo.aggregate(pending_candidates, :count)
rescue
  _error -> 0
end
# ── Incidents (tenant rollup) ──────────────────────────────────────────────
@doc """
Lists a tenant's incidents for the /incidents triage list.

Returns at most 50 incidents, most recently seen first (ordered by
`last_seen_at` descending). Returns `[]` if the query raises.
"""
def list_tenant_incidents(tenant_id) do
  Repo.all(
    from(incident in Incident,
      where: [tenant_id: ^tenant_id],
      order_by: [desc: incident.last_seen_at],
      limit: 50
    )
  )
rescue
  _error -> []
end
# ── Per-trace incident/budget projections ──────────────────────────────────
@doc """
Builds the compact budget projection for a trace/run.

Returns a map with the trace and run ids, the budget status label and
actuals from the latest reservation, and three booleans:

  * `:breaker_open?` - the latest breaker trip is in state `"open"`
  * `:review_open?` - an incident routed to `"review"` has status `"open"`
  * `:page_open?` - an incident routed to `"page"` has status `"open"`

If loading any evidence raises, returns a placeholder projection with all
three flags `false`.
"""
def load_budget_projection(trace_id, run_id) do
  incidents = list_incidents(trace_id, run_id)
  budget = latest_budget(trace_id, run_id)
  breaker = latest_breaker(trace_id, run_id)

  # Only incidents that are still open may raise the review/page flags, the
  # same rule the incident projection and trace badges apply.
  open_in_class? = fn routing_class ->
    Enum.any?(incidents, fn incident ->
      incident.routing_class == routing_class and incident.status == "open"
    end)
  end

  %{
    trace_id: trace_id,
    run_id: run_id,
    status_label: budget_status_label(budget),
    actuals: budget_actuals(budget),
    # `match?/2` keeps this a strict boolean even when no breaker row exists,
    # matching the `false` the fallback below reports.
    breaker_open?: match?(%{state: "open"}, breaker),
    review_open?: open_in_class?.("review"),
    page_open?: open_in_class?.("page")
  }
rescue
  _error ->
    %{
      trace_id: trace_id,
      run_id: run_id,
      status_label: "budget state unavailable",
      actuals: "No reservation evidence available yet.",
      breaker_open?: false,
      review_open?: false,
      page_open?: false
    }
end
@doc """
Builds the full incident projection for a trace/run.

Loads the incidents, their alert and incident events, notification
deliveries, audit rows, and the latest budget reservation and breaker trip,
then returns a map with:

  * `:health_rollup` - budget, breaker and relay signals plus open
    review/page incident counts
  * `:budget` and `:breaker` - detail panels, with placeholder strings when
    no row exists
  * `:incidents`, `:audit_rows`, `:deliveries` - render-ready rows

Falls back to `empty_incident_projection/2` if anything raises.
"""
def load_incident_projection(trace_id, run_id) do
  incidents = list_incidents(trace_id, run_id)
  alert_events = list_alert_events(incidents)
  incident_events = list_incident_events(incidents)
  deliveries = list_deliveries(trace_id, run_id)
  audit_rows = list_audit_rows(trace_id, run_id)
  budget = latest_budget(trace_id, run_id)
  breaker = latest_breaker(trace_id, run_id)

  %{
    trace_id: trace_id,
    run_id: run_id,
    health_rollup: projection_health_rollup(incidents, budget, breaker, audit_rows, deliveries),
    budget: projection_budget_panel(budget),
    breaker: projection_breaker_panel(breaker),
    incidents: build_incident_rows(incidents, alert_events, incident_events),
    audit_rows: build_audit_rows(audit_rows),
    deliveries: build_delivery_rows(deliveries)
  }
rescue
  _error ->
    empty_incident_projection(trace_id, run_id)
end

# Health strip: one signal/detail pair per subsystem plus open incident counts.
defp projection_health_rollup(incidents, budget, breaker, audit_rows, deliveries) do
  count_open = fn routing_class ->
    Enum.count(incidents, fn incident ->
      incident.routing_class == routing_class and incident.status == "open"
    end)
  end

  %{
    budget_signal: budget_signal(budget),
    budget_detail: budget_actuals(budget),
    breaker_signal: breaker_signal(breaker),
    breaker_detail: breaker_detail(breaker),
    review_count: count_open.("review"),
    page_count: count_open.("page"),
    relay_signal: relay_signal(audit_rows, deliveries),
    relay_detail: relay_detail(audit_rows, deliveries)
  }
end

# Budget panel when no reservation row exists: placeholder strings.
defp projection_budget_panel(nil) do
  %{
    status: budget_status(nil),
    status_label: budget_signal(nil),
    actuals: budget_actuals(nil),
    reason_code: "budget evidence unavailable",
    policy_key: "n/a",
    provider_ref: "provider n/a",
    tool_ref: "tool n/a"
  }
end

# Budget panel for the latest reservation; blank refs fall back to "n/a" text.
defp projection_budget_panel(budget) do
  %{
    status: budget_status(budget),
    status_label: budget_signal(budget),
    actuals: budget_actuals(budget),
    reason_code: budget.reason_code,
    policy_key: budget.policy_key,
    provider_ref: budget.provider_ref || "provider n/a",
    tool_ref: budget.tool_ref || "tool n/a"
  }
end

# Breaker panel when no trip row exists: reported as a closed, local breaker.
defp projection_breaker_panel(nil) do
  %{
    breaker_key: "breaker evidence unavailable",
    state: "closed",
    state_label: "closed",
    reason_code: "no breaker trip recorded",
    integration_kind: "local"
  }
end

# Breaker panel for the latest trip; the state doubles as its label.
defp projection_breaker_panel(breaker) do
  %{
    breaker_key: breaker.breaker_key,
    state: breaker.state,
    state_label: breaker.state,
    reason_code: breaker.reason_code,
    integration_kind: breaker.integration_kind
  }
end
@doc """
Returns the placeholder incident projection for a trace/run.

Has the same shape as `load_incident_projection/2`, with "no evidence"
strings, zero counts and empty row lists.
"""
def empty_incident_projection(trace_id, run_id) do
  quiet_rollup = %{
    budget_signal: "No budget evidence",
    budget_detail: "Reservation actuals will appear after a run records them.",
    breaker_signal: "Breaker clear",
    breaker_detail: "No breaker trips recorded for this trace.",
    review_count: 0,
    page_count: 0,
    relay_signal: "Relay quiet",
    relay_detail: "No audit or delivery rows recorded for this trace yet."
  }

  missing_budget = %{
    status: "ok",
    status_label: "No budget evidence",
    actuals: "Reservation actuals unavailable.",
    reason_code: "budget evidence unavailable",
    policy_key: "n/a",
    provider_ref: "provider n/a",
    tool_ref: "tool n/a"
  }

  closed_breaker = %{
    breaker_key: "breaker evidence unavailable",
    state: "closed",
    state_label: "closed",
    reason_code: "no breaker trip recorded",
    integration_kind: "local"
  }

  %{
    trace_id: trace_id,
    run_id: run_id,
    health_rollup: quiet_rollup,
    budget: missing_budget,
    breaker: closed_breaker,
    incidents: [],
    audit_rows: [],
    deliveries: []
  }
end
@doc """
Returns the compact badge assigns merged onto a trace row.

`:budget_state` is the budget signal for the latest reservation,
`:breaker_state` is the latest breaker trip's state (`nil` when there is
none), and `:review_incident` / `:page_incident` tell whether an open
incident exists in that routing class. Unlike the projections, this function
does not rescue query failures.
"""
def compact_trace_badges(trace_id, run_id) do
  incidents = list_incidents(trace_id, run_id)
  budget = latest_budget(trace_id, run_id)
  breaker = latest_breaker(trace_id, run_id)

  open_in_class? = fn routing_class ->
    Enum.any?(incidents, fn incident ->
      incident.routing_class == routing_class and incident.status == "open"
    end)
  end

  %{
    budget_state: budget_signal(budget),
    breaker_state: if(breaker, do: breaker.state, else: breaker),
    review_incident: open_in_class?.("review"),
    page_incident: open_in_class?.("page")
  }
end
# ── Queries ────────────────────────────────────────────────────────────────
@doc """
Lists the incidents recorded for a trace (or its workflow run), oldest first.
"""
def list_incidents(trace_id, run_id) do
  Repo.all(
    from(incident in Incident,
      where: ^evidence_filter(trace_id, run_id),
      order_by: [asc: incident.inserted_at]
    )
  )
end
@doc """
Lists the alert events belonging to `incidents`, oldest first.

Returns `[]` without querying when `incidents` is empty.
"""
def list_alert_events([]), do: []

def list_alert_events(incidents) do
  incident_ids = for incident <- incidents, do: incident.id

  Repo.all(
    from(event in AlertEvent,
      where: event.incident_id in ^incident_ids,
      order_by: [asc: event.inserted_at]
    )
  )
end
@doc """
Lists the incident events belonging to `incidents`, oldest first.

Returns `[]` without querying when `incidents` is empty.
"""
def list_incident_events([]), do: []

def list_incident_events(incidents) do
  incident_ids = for incident <- incidents, do: incident.id

  Repo.all(
    from(event in IncidentEvent,
      where: event.incident_id in ^incident_ids,
      order_by: [asc: event.inserted_at]
    )
  )
end
@doc """
Lists the notification deliveries recorded for a trace (or its workflow
run), oldest first.
"""
def list_deliveries(trace_id, run_id) do
  Repo.all(
    from(delivery in NotificationDelivery,
      where: ^evidence_filter(trace_id, run_id),
      order_by: [asc: delivery.inserted_at]
    )
  )
end
@doc """
Lists the audit outbox events recorded for a trace (or its workflow run),
oldest first.
"""
def list_audit_rows(trace_id, run_id) do
  Repo.all(
    from(audit in AuditOutboxEvent,
      where: ^evidence_filter(trace_id, run_id),
      order_by: [asc: audit.inserted_at]
    )
  )
end
@doc """
Returns the most recently inserted budget reservation for a trace (or its
workflow run), or `nil` when none exists.
"""
def latest_budget(trace_id, run_id) do
  Repo.one(
    from(reservation in BudgetReservation,
      where: ^evidence_filter(trace_id, run_id),
      order_by: [desc: reservation.inserted_at],
      limit: 1
    )
  )
end
@doc """
Returns the most recently inserted breaker trip for a trace (or its workflow
run), or `nil` when none exists.
"""
def latest_breaker(trace_id, run_id) do
  Repo.one(
    from(trip in BreakerTrip,
      where: ^evidence_filter(trace_id, run_id),
      order_by: [desc: trip.inserted_at],
      limit: 1
    )
  )
end
# Builds the dynamic `where` condition shared by the evidence queries.
#
# With a non-empty binary `run_id`, a row matches on either its trace id or
# its `workflow_run_id`. Otherwise only the trace id is compared.
defp evidence_filter(trace_id, run_id) when is_binary(run_id) and run_id != "" do
  dynamic([row], row.trace_id == ^trace_id or field(row, :workflow_run_id) == ^run_id)
end

defp evidence_filter(trace_id, _run_id) do
  dynamic([row], row.trace_id == ^trace_id)
end
# ── Row builders / formatting ──────────────────────────────────────────────
@doc """
Turns incidents into render-ready rows.

Each incident is paired with the first alert event and the first incident
event (in list order) that reference its id. From those the row derives:

  * `:approval_id` - first non-blank approval id found in the alert event's
    evidence refs, the incident event's evidence refs, or the incident
    event's metadata (`nil` when none is present)
  * `:reason_code` - the incident metadata's reason code, else the alert
    event's, else `"incident"`
  * `:scorer_version` / `:baseline_version` - first non-blank value from the
    alert event, the incident's evidence summary, or the incident event's
    metadata, else `"n/a"`

plus the incident's own key, summary, routing, severity and trace/run ids.
"""
def build_incident_rows(incidents, alert_events, incident_events) do
  # Index each event list once instead of scanning it for every incident,
  # which made the work grow with incidents x events.
  alerts_by_incident = first_event_by_incident(alert_events)
  events_by_incident = first_event_by_incident(incident_events)

  Enum.map(incidents, fn incident ->
    alert_event = Map.get(alerts_by_incident, incident.id)
    incident_event = Map.get(events_by_incident, incident.id)
    evidence_summary = incident.evidence_summary || %{}
    event_metadata = (incident_event && incident_event.metadata) || %{}

    approval_id =
      first_present([
        alert_event && get_in(alert_event.evidence_refs || %{}, ["approval_id"]),
        incident_event && get_in(incident_event.evidence_refs || %{}, ["approval_id"]),
        get_in(event_metadata, ["approval_id"])
      ])

    %{
      incident_key: incident.incident_key,
      summary: incident.summary,
      reason_code:
        get_in(incident.metadata || %{}, ["reason_code"]) ||
          (alert_event && alert_event.reason_code) || "incident",
      routing_class: incident.routing_class,
      routing_label: routing_label(incident.routing_class),
      severity: incident.severity,
      severity_label: severity_label(incident.severity),
      trace_id: incident.trace_id,
      run_id: incident.workflow_run_id,
      approval_id: approval_id,
      scorer_version:
        first_present([
          alert_event && alert_event.scorer_version_ref,
          get_in(evidence_summary, ["scorer_version"]),
          get_in(event_metadata, ["scorer_version"])
        ]) || "n/a",
      baseline_version:
        first_present([
          alert_event && alert_event.baseline_version_ref,
          get_in(evidence_summary, ["baseline_version"]),
          get_in(event_metadata, ["baseline_version"])
        ]) || "n/a"
    }
  end)
end

# Maps each incident id to the first event in `events` that references it.
defp first_event_by_incident(events) do
  Enum.reduce(events, %{}, fn event, index ->
    Map.put_new(index, event.incident_id, event)
  end)
end
@doc """
Turns audit outbox events into render-ready rows.

A missing actor is shown as `"system"`, and a missing `"approval_id"` in the
redacted refs is shown as `"n/a"`.
"""
def build_audit_rows(audit_rows) do
  for audit <- audit_rows do
    redacted_refs = audit.redacted_refs || %{}

    %{
      event_type: audit.event_type,
      sink_status: audit.sink_status,
      actor_ref: audit.actor_ref || "system",
      approval_id: get_in(redacted_refs, ["approval_id"]) || "n/a"
    }
  end
end
@doc """
Turns notification deliveries into render-ready rows.

Copies the sink, status, routing key, attempt count and last error, adds the
outcome from `delivery_outcome/1`, and reads `"transport_mode"` and
`"transport_sink"` from the delivery metadata (`nil` when absent).
"""
def build_delivery_rows(deliveries) do
  for delivery <- deliveries do
    metadata = delivery.metadata || %{}

    %{
      sink_kind: delivery.sink_kind,
      delivery_status: delivery.delivery_status,
      routing_key: delivery.routing_key,
      attempt_count: delivery.attempt_count,
      last_error: delivery.last_error,
      delivery_outcome: delivery_outcome(delivery),
      transport_mode: get_in(metadata, ["transport_mode"]),
      transport_sink: get_in(metadata, ["transport_sink"])
    }
  end
end
@doc """
Returns the outcome label for a delivery.

Uses the `"delivery_outcome"` recorded in the delivery metadata when present.
Otherwise a `"failed"` delivery status yields `"failed"` and any other
status yields `"delivered"`.
"""
def delivery_outcome(delivery) do
  case get_in(delivery.metadata || %{}, ["delivery_outcome"]) do
    missing when missing in [nil, false] ->
      if delivery.delivery_status == "failed", do: "failed", else: "delivered"

    recorded_outcome ->
      recorded_outcome
  end
end
@doc """
Classifies a budget reservation as `"trip"`, `"warn"` or `"ok"`.

The class comes from the reservation's `reason_code`. A missing reservation
(`nil`) and any unrecognised reason code are `"ok"`.
"""
def budget_status(nil), do: "ok"

def budget_status(budget) do
  case budget.reason_code do
    code when code in ["budget_trip", "trip_threshold_exceeded"] -> "trip"
    code when code in ["budget_warn", "warn_threshold_exceeded"] -> "warn"
    _other -> "ok"
  end
end
@doc """
Returns the display label for a budget reservation's status.

`"Budget trip"` or `"Budget warn"` for those statuses from
`budget_status/1`, and `"Budget steady"` for anything else.
"""
def budget_signal(budget) do
  budget
  |> budget_status()
  |> budget_signal_label()
end

# Maps a budget status string to its display label.
defp budget_signal_label("trip"), do: "Budget trip"
defp budget_signal_label("warn"), do: "Budget warn"
defp budget_signal_label(_status), do: "Budget steady"
@doc "Alias for `budget_signal/1`: the display label for a budget reservation."
def budget_status_label(budget) do
  budget_signal(budget)
end
@doc """
Formats a reservation's units as `"<actual> actual / <estimated> reserved"`.

Returns `"Reservation actuals unavailable."` when there is no reservation.
"""
def budget_actuals(nil), do: "Reservation actuals unavailable."

def budget_actuals(budget) do
  reserved_units = decimal_to_string(budget.estimated_units)
  consumed_units = decimal_to_string(budget.actual_units)

  consumed_units <> " actual / " <> reserved_units <> " reserved"
end
@doc """
Returns the display label for a breaker trip.

`"Breaker open"` for state `"open"`, `"Breaker probing"` for state
`"half_open"`, and `"Breaker clear"` for anything else, including `nil`.
"""
def breaker_signal(breaker) do
  case breaker do
    %{state: "open"} -> "Breaker open"
    %{state: "half_open"} -> "Breaker probing"
    _clear -> "Breaker clear"
  end
end
@doc """
Describes a breaker trip as `"<reason_code> on <integration_kind>"`.

Returns a "no trips recorded" sentence when there is no trip.
"""
def breaker_detail(nil), do: "No breaker trips recorded for this trace."

def breaker_detail(breaker) do
  reason = breaker.reason_code
  integration = breaker.integration_kind

  "#{reason} on #{integration}"
end
@doc """
Returns the relay health label for a trace's audit rows and deliveries.

Checked in order: any failed delivery is `"Relay degraded"`; any audit row
with a pending sink is `"Relay pending"`; any rows at all is
`"Relay healthy"`; no rows is `"Relay quiet"`.
"""
def relay_signal(audit_rows, deliveries) do
  if Enum.any?(deliveries, fn delivery -> delivery.delivery_status == "failed" end) do
    "Relay degraded"
  else
    relay_signal_without_failures(audit_rows, deliveries)
  end
end

# Relay label once failed deliveries have been ruled out.
defp relay_signal_without_failures(audit_rows, deliveries) do
  cond do
    Enum.any?(audit_rows, fn audit -> audit.sink_status == "pending" end) -> "Relay pending"
    audit_rows == [] and deliveries == [] -> "Relay quiet"
    true -> "Relay healthy"
  end
end
@doc "Summarises how many audit rows and delivery outcomes a trace has."
def relay_detail(audit_rows, deliveries) do
  audit_count = length(audit_rows)
  delivery_count = length(deliveries)

  "#{audit_count} audit row(s), #{delivery_count} delivery outcome(s)"
end
@doc """
Returns the display label for an incident routing class.

`"page"` is a page incident; every other value is labelled as a review
incident.
"""
def routing_label(routing_class) do
  case routing_class do
    "page" -> "Page incident"
    _other -> "Review incident"
  end
end
@doc """
Returns the display label for an incident severity.

`"critical"` and `"warning"` get their own labels; every other value is
labelled as info severity.
"""
def severity_label(severity) do
  case severity do
    "critical" -> "Critical severity"
    "warning" -> "Warning severity"
    _other -> "Info severity"
  end
end
@doc """
Returns the first element of `values` that is neither `nil` nor `""`.

Returns `nil` when every element is blank or the list is empty.
"""
def first_present(values) do
  Enum.find(values, fn value -> value != nil and value != "" end)
end
@doc """
Renders a unit amount as a string.

`nil` becomes `"0"`, a `Decimal` is rendered with `Decimal.to_string/2` in
`:normal` format, and any other value goes through `to_string/1`.
"""
def decimal_to_string(value) do
  case value do
    nil -> "0"
    %D{} = decimal -> D.to_string(decimal, :normal)
    other -> to_string(other)
  end
end
end