defmodule Bloccs.Web.DashboardLive do
@moduledoc """
The single LiveView behind every dashboard panel. `live_action` selects the
panel (`:networks`, `:topology`, `:metrics`, `:coverage`) — the oban_web
pattern — so navigation never remounts.
It loads the read-only `Bloccs.Introspect` data each panel needs in
`handle_params`, resolves the Pro-gating context `on_mount`, and dispatches
rendering to the per-panel function components in `Bloccs.Web.Panels.*`. Live
telemetry (P4) is layered on top via PubSub.
"""
use Bloccs.Web, :live_view
alias Bloccs.{Introspect, Trace}
alias Bloccs.Web.{Access, Coverage, Paths}
alias Bloccs.Web.Panels
alias Bloccs.Web.Telemetry.{Collector, Flow}
@pubsub Bloccs.Web.PubSub
@doc """
`on_mount` hook installed by `bloccs_dashboard/2`: stash the configured
resolver, the resolved user/access/features, and the mount base path on the
socket before any panel renders.
"""
def on_mount({:resolver, resolver}, _params, session, socket) do
user = call_resolver(resolver, :resolve_user, [session], nil)
access = call_resolver(resolver, :resolve_access, [user], :all)
features = call_resolver(resolver, :resolve_features, [user], :all)
socket =
socket
|> Phoenix.Component.assign(:bloccs_resolver, resolver)
|> Phoenix.Component.assign(:bloccs_user, user)
|> Phoenix.Component.assign(:bloccs_access, access)
|> Phoenix.Component.assign(:bloccs_features, features)
|> Phoenix.Component.assign(:base_path, session["base_path"] || "/bloccs")
case access do
{:forbidden, reason} -> {:halt, redirect_forbidden(socket, reason)}
_ -> {:cont, socket}
end
end
@impl true
def mount(_params, _session, socket) do
{:ok,
socket
|> assign(
page_title: "bloccs",
network: nil,
networks: [],
node_states: %{},
frame: %{nodes: %{}, updated_at: nil},
metrics_topic: nil,
flow: %{events: [], series: [], rate: 0},
flow_topic: nil,
flow_filters: %{node: nil, outcome: nil},
coverage: nil,
recording: nil,
net_stats: %{},
overview_ids: [],
selected_msg: nil,
selected_journey: [],
inspect_node: nil,
flow_paused: false
)
# `.bloccs-trace` has no registered MIME type, so accept :any and validate
# the contents on load instead of by extension.
|> allow_upload(:trace, accept: :any, max_entries: 1)}
end
@impl true
def handle_info({:bloccs_frame, network, frame}, socket) do
{:noreply, socket |> put_frame(frame) |> update_overview(network, frame: frame)}
end
def handle_info({:bloccs_flow, network, flow}, socket) do
# Always fold into the overview stats; freeze the live feed when paused so the
# user can inspect without rows scrolling away.
socket = update_overview(socket, network, flow: flow)
socket = if socket.assigns.flow_paused, do: socket, else: assign(socket, :flow, flow)
{:noreply, socket}
end
@impl true
def handle_event("flow_filter", params, socket) do
filters = %{node: blank(params["node"]), outcome: blank(params["outcome"])}
{:noreply, assign(socket, :flow_filters, filters)}
end
def handle_event("toggle_pause", _params, socket) do
{:noreply, assign(socket, :flow_paused, not socket.assigns.flow_paused)}
end
def handle_event("close_msg", _params, socket) do
{:noreply, deselect(socket)}
end
def handle_event("msg_nav", %{"dir" => dir}, socket) do
{:noreply, nav_msg(socket, dir)}
end
def handle_event("msg_key", %{"key" => "ArrowUp"}, socket),
do: {:noreply, nav_msg(socket, "prev")}
def handle_event("msg_key", %{"key" => "ArrowDown"}, socket),
do: {:noreply, nav_msg(socket, "next")}
def handle_event("msg_key", %{"key" => "Escape"}, socket),
do: {:noreply, deselect(socket)}
def handle_event("msg_key", _params, socket), do: {:noreply, socket}
def handle_event("inspect_msg", %{"idx" => idx}, socket) do
events = Panels.Messages.filtered(socket.assigns.flow.events, socket.assigns.flow_filters)
event = Enum.at(events, String.to_integer(idx))
if event && Panels.Messages.same?(event, socket.assigns.selected_msg) do
{:noreply, deselect(socket)}
else
# Snapshot the message's journey at selection time, so the open drawer
# persists and navigates stably even as the live feed scrolls it out of
# the recent ring.
journey = if event, do: Flow.journey(events, event[:msg_id]), else: []
{:noreply, socket |> assign(:selected_msg, event) |> assign(:selected_journey, journey)}
end
end
# Re-center the drawer on a specific hop of the open message's journey snapshot.
def handle_event("inspect_hop", %{"msgid" => msgid, "to" => to}, socket) do
case Panels.Messages.find_hop(socket.assigns.selected_journey, msgid, to) do
nil -> {:noreply, socket}
hop -> {:noreply, assign(socket, :selected_msg, hop)}
end
end
@impl true
def handle_event("coverage_record", _params, %{assigns: %{network: net}} = socket)
when not is_nil(net) do
recording = Trace.record(net.id)
{:noreply, assign(socket, :recording, recording)}
end
def handle_event("coverage_stop", _params, %{assigns: %{recording: rec}} = socket)
when not is_nil(rec) do
events = Trace.stop(rec)
{:noreply, socket |> assign(:recording, nil) |> put_coverage(events, :recording)}
end
def handle_event("coverage_validate", _params, socket), do: {:noreply, socket}
def handle_event("coverage_load", _params, socket) do
case consume_uploaded_entries(socket, :trace, fn %{path: path}, _entry ->
{:ok, Trace.load(path)}
end) do
[{:ok, events}] -> {:noreply, put_coverage(socket, events, :trace)}
_ -> {:noreply, socket}
end
end
def handle_event(_event, _params, socket), do: {:noreply, socket}
defp deselect(socket),
do: socket |> assign(:selected_msg, nil) |> assign(:selected_journey, [])
# Step to the previous/next hop within the selected message's journey snapshot,
# so navigation follows the message along its path through the network. The
# snapshot is fixed at selection, so it never drifts or empties as the feed moves.
defp nav_msg(%{assigns: %{selected_msg: nil}} = socket, _dir), do: socket
defp nav_msg(socket, dir) do
journey = socket.assigns.selected_journey
with i when is_integer(i) <-
Enum.find_index(journey, &Panels.Messages.same?(&1, socket.assigns.selected_msg)),
j when j >= 0 <- if(dir == "prev", do: i - 1, else: i + 1),
hop when is_map(hop) <- Enum.at(journey, j) do
assign(socket, :selected_msg, hop)
else
_ -> socket
end
end
@impl true
def terminate(_reason, socket) do
if rec = socket.assigns[:recording], do: Trace.stop(rec)
:ok
end
@impl true
def handle_params(params, _uri, socket) do
{:noreply,
socket
|> assign(:now, System.monotonic_time(:millisecond))
|> assign(:page_title, page_title(socket.assigns.live_action, params))
|> assign(:flow_filters, %{node: blank(params["node"]), outcome: blank(params["outcome"])})
|> assign(:selected_msg, nil)
|> assign(:selected_journey, [])
|> assign(:inspect_node, blank(params["node"]))
|> assign(:flow_paused, false)
|> load_panel(socket.assigns.live_action, params)}
end
@impl true
def render(assigns) do
~H"""
<div class="bloccs-dashboard" data-access={inspect(@bloccs_access)}>
<.panel_nav
active={@live_action}
base_path={@base_path}
network={@network}
features={@bloccs_features}
/>
<main class="bloccs-panel">
<.panel_body {assigns} />
</main>
</div>
"""
end
# ---- data loading per panel ----
defp load_panel(socket, :networks, _params) do
networks = Introspect.list_networks()
# In-flight request/response load per network (Bloccs.call/4 · cast/4),
# a snapshot at panel load — see `Bloccs.Collector.stats/0`.
inflight = Bloccs.Collector.stats().by_network
socket
|> assign(:networks, networks)
|> assign(:net_stats, Map.new(networks, &{&1.id, net_stat(&1.id, inflight)}))
|> subscribe_overview(networks)
end
defp load_panel(socket, action, params)
when action in [:topology, :messages, :metrics, :coverage] do
case fetch_network(params["network"]) do
{:ok, network} ->
socket
|> assign(:network, network)
|> maybe_subscribe(action, network)
:error ->
assign(socket, :network, nil)
end
end
# ---- networks overview (live cards) ----
# Subscribe to every network's flow + metric topics so the overview cards stay
# live. Re-entrant: unsubscribe the previous set before subscribing the new one.
defp subscribe_overview(socket, networks) do
if connected?(socket) do
ids = Enum.map(networks, & &1.id)
Enum.each(socket.assigns.overview_ids -- ids, &unsubscribe_overview/1)
Enum.each(ids -- socket.assigns.overview_ids, &subscribe_one_overview/1)
assign(socket, :overview_ids, ids)
else
socket
end
end
defp subscribe_one_overview(id) do
Phoenix.PubSub.subscribe(@pubsub, Collector.flow_topic(id))
Phoenix.PubSub.subscribe(@pubsub, Collector.topic(id))
end
defp unsubscribe_overview(id) do
Phoenix.PubSub.unsubscribe(@pubsub, Collector.flow_topic(id))
Phoenix.PubSub.unsubscribe(@pubsub, Collector.topic(id))
end
defp net_stat(id, inflight) do
frame = Collector.snapshot(id)
flow = Collector.flow_snapshot(id)
%{
rate: flow.rate,
series: series_totals(flow.series),
errors: total_errors(frame),
in_flight: Map.get(inflight, id, 0)
}
end
defp series_totals(series), do: Enum.map(series, &Map.get(&1, :total, 0))
defp total_errors(%{nodes: nodes}),
do: nodes |> Map.values() |> Enum.map(&Map.get(&1, :errors, 0)) |> Enum.sum()
defp total_errors(_), do: 0
# Fold a live frame into the overview stats for that network (no-op if the
# network isn't on the currently-loaded overview).
defp update_overview(socket, network, change) do
stats = socket.assigns.net_stats
case Map.fetch(stats, network) do
{:ok, cur} ->
updated =
case change do
[flow: flow] -> %{cur | rate: flow.rate, series: series_totals(flow.series)}
[frame: frame] -> %{cur | errors: total_errors(frame)}
end
assign(socket, :net_stats, Map.put(stats, network, updated))
:error ->
socket
end
end
# The live topology reads both metric frames (node state + throughput) and the
# flow snapshot (which edges are active), so it subscribes to both.
defp maybe_subscribe(socket, :topology, network),
do: socket |> subscribe_metrics(network) |> subscribe_flow(network)
defp maybe_subscribe(socket, :metrics, network), do: subscribe_metrics(socket, network)
defp maybe_subscribe(socket, :messages, network), do: subscribe_flow(socket, network)
defp maybe_subscribe(socket, _action, _network), do: socket
# Subscribe to the network's flow frames and prime the first paint.
defp subscribe_flow(socket, network) do
if connected?(socket) do
topic = Collector.flow_topic(network.id)
if socket.assigns[:flow_topic] != topic do
if old = socket.assigns[:flow_topic], do: Phoenix.PubSub.unsubscribe(@pubsub, old)
Phoenix.PubSub.subscribe(@pubsub, topic)
end
socket
|> assign(:flow_topic, topic)
|> assign(:flow, Collector.flow_snapshot(network.id))
else
socket
end
end
# Subscribe to the network's metric frames (idempotently) and prime the first
# paint from the collector snapshot. No-op until the socket is connected.
defp subscribe_metrics(socket, network) do
if connected?(socket) do
topic = Collector.topic(network.id)
if socket.assigns[:metrics_topic] != topic do
if old = socket.assigns[:metrics_topic], do: Phoenix.PubSub.unsubscribe(@pubsub, old)
Phoenix.PubSub.subscribe(@pubsub, topic)
end
socket
|> assign(:metrics_topic, topic)
|> put_frame(Collector.snapshot(network.id))
else
socket
end
end
defp put_frame(socket, frame) do
socket
|> assign(:frame, frame)
|> assign(:node_states, node_states(frame))
end
defp node_states(%{nodes: nodes}), do: Map.new(nodes, fn {id, v} -> {id, v.state} end)
defp node_states(_), do: %{}
defp blank(v) when v in [nil, ""], do: nil
defp blank(v), do: v
# Build the coverage report from trace events and stash it (plus a re-encoded
# .bloccs-trace for the gated export).
defp put_coverage(socket, events, source) do
network = socket.assigns.network
report = Coverage.report(network, Trace.reached(events))
assign(socket, :coverage, %{
report: report,
source: source,
json: encode_trace(events, network.id)
})
end
defp encode_trace(events, network_id) do
path =
Path.join(
System.tmp_dir!(),
"bloccs-#{network_id}-#{System.unique_integer([:positive])}.bloccs-trace"
)
case Trace.dump(events, network_id, path) do
:ok ->
json = File.read!(path)
_ = File.rm(path)
json
_ ->
nil
end
end
defp fetch_network(nil), do: :error
defp fetch_network(id) when is_binary(id) do
# A network that was started (so its atom exists) but has since stopped
# returns `{:error, :not_found}` — collapse it, and an unknown atom, to the
# one `:error` the caller renders as the not-found panel.
case Introspect.network(String.to_existing_atom(id)) do
{:ok, network} -> {:ok, network}
{:error, :not_found} -> :error
end
rescue
ArgumentError -> :error
end
# ---- rendering dispatch ----
defp panel_body(%{live_action: :networks} = assigns) do
~H"""
<Panels.Networks.render
networks={@networks}
base_path={@base_path}
now={@now}
stats={@net_stats}
/>
"""
end
defp panel_body(%{network: nil} = assigns) do
~H"""
<section class="bloccs-empty">
<p><strong>Network not found.</strong></p>
<p class="bloccs-muted">
It may have stopped. <.link navigate={Paths.networks(@base_path)} class="bloccs-link">
Back to networks</.link>.
</p>
</section>
"""
end
defp panel_body(%{live_action: :topology} = assigns) do
~H"""
<Panels.Topology.render
network={@network}
base_path={@base_path}
states={@node_states}
frame={@frame}
flow={@flow}
selected={@inspect_node}
/>
"""
end
defp panel_body(%{live_action: :messages} = assigns) do
~H"""
<Panels.Messages.render
network={@network}
base_path={@base_path}
flow={@flow}
filters={@flow_filters}
selected={@selected_msg}
journey={@selected_journey}
paused={@flow_paused}
/>
"""
end
defp panel_body(%{live_action: :metrics} = assigns) do
~H"""
<Panels.Metrics.render network={@network} base_path={@base_path} frame={@frame} />
"""
end
defp panel_body(%{live_action: :coverage} = assigns) do
~H"""
<Panels.Coverage.render
network={@network}
base_path={@base_path}
features={@bloccs_features}
coverage={@coverage}
recording={@recording != nil}
upload={@uploads.trace}
/>
"""
end
# ---- chrome ----
attr :active, :atom, required: true
attr :base_path, :string, required: true
attr :network, :any, default: nil
attr :features, :any, required: true
defp panel_nav(assigns) do
~H"""
<nav class="bloccs-nav">
<.link navigate={Paths.networks(@base_path)} class="bloccs-brand" aria-label="BloccsWeb">
<img src={"#{@base_path}/assets/mark.svg"} alt="" class="bloccs-mark" />
<span class="bloccs-wordmark"><span class="bloccs-wm-thin">Bloccs</span>Web</span>
</.link>
<.nav_link
active={@active}
action={:networks}
href={Paths.networks(@base_path)}
label="Networks"
/>
<%= if @network do %>
<span class="bloccs-nav__sep">/</span>
<span class="bloccs-nav__network">{@network.id}</span>
<.nav_link
active={@active}
action={:topology}
href={Paths.topology(@base_path, @network.id)}
label="Topology"
/>
<.nav_link
:if={Access.enabled?(:messages, @features)}
active={@active}
action={:messages}
href={Paths.messages(@base_path, @network.id)}
label="Messages"
/>
<.nav_link
:if={Access.enabled?(:metrics, @features)}
active={@active}
action={:metrics}
href={Paths.metrics(@base_path, @network.id)}
label="Metrics"
/>
<.nav_link
:if={Access.enabled?(:coverage, @features)}
active={@active}
action={:coverage}
href={Paths.coverage(@base_path, @network.id)}
label="Coverage"
/>
<% end %>
</nav>
"""
end
attr :active, :atom, required: true
attr :action, :atom, required: true
attr :href, :string, required: true
attr :label, :string, required: true
defp nav_link(assigns) do
~H"""
<.link
navigate={@href}
class={["bloccs-nav__link", @active == @action && "bloccs-nav__link--active"]}
>
{@label}
</.link>
"""
end
defp page_title(:networks, _), do: "bloccs · networks"
defp page_title(action, %{"network" => net}), do: "bloccs · #{net} · #{action}"
defp page_title(action, _), do: "bloccs · #{action}"
# ---- resolver plumbing ----
defp call_resolver(resolver, fun, args, default) do
if function_exported?(resolver, fun, length(args)) do
apply(resolver, fun, args)
else
default
end
end
defp redirect_forbidden(socket, _reason) do
Phoenix.LiveView.redirect(socket, to: "/")
end
end