Skip to main content

lib/bloccs/web/panels/messages.ex

defmodule Bloccs.Web.Panels.Messages do
  @moduledoc """
  Panel 5 — packages moving through the network. A live throughput chart over the
  per-second `Bloccs.Web.Telemetry.Flow` buckets, plus a scrolling feed of recent
  message events: each edge a message crossed (`from.port → to.port`), its
  outcome, the emitting node's latency, and — when `Bloccs.Inspect` capture is
  enabled (bloccs 0.3+) — a bounded, redacted snapshot of the payload.

  Click a row to inspect it: the full payload plus the hop it took, highlighted on
  a mini-topology. Filterable by node and outcome.
  """

  use Bloccs.Web, :html

  import Bloccs.Web.Components.{Chart, Graph}

  alias Bloccs.Web.Format

  attr :network, :any, required: true
  attr :base_path, :string, required: true
  attr :flow, :map, default: %{events: [], series: [], rate: 0}
  attr :filters, :map, default: %{node: nil, outcome: nil}
  attr :selected, :any, default: nil
  # The selected message's journey snapshot (lineage hops, oldest-first), taken at
  # selection time by the live view so it persists as the feed scrolls. Prev/Next
  # walks THIS list, so navigation follows the message and the position is stable.
  attr :journey, :list, default: []
  attr :paused, :boolean, default: false

  def render(assigns) do
    events = filtered(assigns.flow.events, assigns.filters)
    hop_idx = assigns.selected && Enum.find_index(assigns.journey, &same?(&1, assigns.selected))

    assigns =
      assigns
      |> assign(:events, events)
      |> assign(:any_payload, Enum.any?(events, & &1[:payload]))
      |> assign(:hop_idx, hop_idx)
      |> assign(:hop_count, length(assigns.journey))

    ~H"""
    <section class="bloccs-messages">
      <header class="bloccs-panel__header">
        <h1>Messages</h1>
        <div class="bloccs-msg-live">
          <span :if={not @paused} class="bloccs-muted">
            <span class="bloccs-live"></span> live · {@flow.rate}/s
          </span>
          <span :if={@paused} class="bloccs-paused">⏸ paused</span>
          <button type="button" class="bloccs-btn bloccs-btn--sm" phx-click="toggle_pause">
            {if @paused, do: "▶ Resume", else: "⏸ Pause"}
          </button>
        </div>
      </header>

      <.throughput series={@flow.series} />

      <div class="bloccs-msg-filters">
        <form phx-change="flow_filter">
          <select name="node" class="bloccs-select">
            <option value="" selected={@filters.node in [nil, ""]}>all nodes</option>
            <option :for={n <- node_ids(@network)} value={n} selected={to_string(n) == @filters.node}>
              {n}
            </option>
          </select>
          <select name="outcome" class="bloccs-select">
            <option value="" selected={@filters.outcome in [nil, ""]}>all outcomes</option>
            <option :for={o <- ~w(ok failed dropped)} value={o} selected={o == @filters.outcome}>
              {o}
            </option>
          </select>
        </form>
        <span class="bloccs-muted">{length(@events)} shown</span>
      </div>

      <table class="bloccs-table bloccs-feed">
        <thead>
          <tr>
            <th>Time</th>
            <th>From → To</th>
            <th>Payload</th>
            <th>Outcome</th>
            <th class="bloccs-num">Latency</th>
          </tr>
        </thead>
        <tbody>
          <%= for {e, idx} <- Enum.with_index(@events) do %>
            <tr
              class={["bloccs-row", "bloccs-feed__row", same?(e, @selected) && "is-selected"]}
              phx-click="inspect_msg"
              phx-value-idx={idx}
            >
              <td class="bloccs-feed__time">{time(e.at)}</td>
              <td class="bloccs-feed__edge">{edge(e)}</td>
              <td class="bloccs-feed__payload" title={payload_full(e)}>{payload(e)}</td>
              <td><.status_pill state={pill(e.outcome)} label={Atom.to_string(e.outcome)} /></td>
              <td class="bloccs-num">{Format.latency(e.duration_ms)}</td>
            </tr>
          <% end %>
        </tbody>
      </table>

      <p :if={@events == []} class="bloccs-empty">
        <strong>No messages yet.</strong>
        <span class="bloccs-muted">Send traffic through the network and it appears here live.</span>
      </p>

      <p :if={@events != [] and not @any_payload} class="bloccs-muted bloccs-hint">
        Payload contents are hidden. Enable capture with
        <code>config :bloccs, :inspect, enabled: true</code>
        (bloccs 0.3+).
      </p>

      <%!-- message inspector: a right-side drawer, so the live feed never pushes it --%>
      <div :if={@selected} class="bloccs-drawer-scrim" phx-click="close_msg" />
      <aside :if={@selected} class="bloccs-drawer" phx-window-keydown="msg_key">
        <header class="bloccs-drawer__head">
          <div class="bloccs-drawer__heading">
            <div class="bloccs-drawer__title">{edge(@selected)}</div>
            <div class="bloccs-drawer__sub">
              <.status_pill
                state={pill(@selected.outcome)}
                label={Atom.to_string(@selected.outcome)}
              />
              <span class="bloccs-muted">{Format.latency(@selected.duration_ms)}</span>
            </div>
          </div>
          <button type="button" class="bloccs-drawer__x" phx-click="close_msg" title="Close (Esc)">
            ×
          </button>
        </header>

        <div class="bloccs-drawer__body">
          <div class="bloccs-drawer__section">
            <h3>
              Journey <span :if={@journey != []} class="bloccs-muted">· {length(@journey)} hops</span>
            </h3>
            <div class="bloccs-drawer__graph">
              <.graph
                network={@network}
                states={journey_states(@journey)}
                active_edges={journey_edges(@journey)}
                selected={@selected.node}
                labels={false}
              />
            </div>
            <ol class="bloccs-journey">
              <li
                :for={hop <- @journey}
                class={["bloccs-journey__hop", same?(hop, @selected) && "is-selected"]}
                phx-click="inspect_hop"
                phx-value-msgid={hop.msg_id}
                phx-value-to={hop_token(hop)}
              >
                <span class="bloccs-journey__time">{time(hop.at)}</span>
                <span class="bloccs-journey__edge">{edge(hop)}</span>
                <.status_pill state={pill(hop.outcome)} label={Atom.to_string(hop.outcome)} />
                <span class="bloccs-journey__lat bloccs-num">{Format.latency(hop.duration_ms)}</span>
              </li>
            </ol>
            <p :if={@journey in [nil, []]} class="bloccs-muted">
              No lineage recorded for this message (it may have aged out of the feed).
            </p>
          </div>

          <div class="bloccs-drawer__section">
            <h3>Selected hop</h3>
            <div class="bloccs-kv">
              <span class="bloccs-muted">from</span><code>{from_label(@selected)}</code>
            </div>
            <div class="bloccs-kv">
              <span class="bloccs-muted">to</span><code>{to_label(@selected)}</code>
            </div>
            <div class="bloccs-kv">
              <span class="bloccs-muted">outcome</span><code>{@selected.outcome}</code>
            </div>
            <div class="bloccs-kv">
              <span class="bloccs-muted">latency</span>
              <code>{Format.latency(@selected.duration_ms)}</code>
            </div>
            <div class="bloccs-kv">
              <span class="bloccs-muted">at</span><code>{time(@selected.at)}</code>
            </div>
          </div>

          <div class="bloccs-drawer__section">
            <h3>Payload</h3>
            <pre class="bloccs-detail__payload">{payload_full(@selected) || "(payload capture disabled — config :bloccs, :inspect, enabled: true)"}</pre>
          </div>
        </div>

        <footer class="bloccs-drawer__nav">
          <span class="bloccs-drawer__pos">{hop_pos(@hop_idx, @hop_count)}</span>
          <button
            type="button"
            class="bloccs-btn bloccs-drawer__navbtn"
            phx-click="msg_nav"
            phx-value-dir="prev"
            disabled={@hop_idx in [nil, 0]}
          >
            ← Prev hop
          </button>
          <button
            type="button"
            class="bloccs-btn bloccs-drawer__navbtn"
            phx-click="msg_nav"
            phx-value-dir="next"
            disabled={@hop_idx == nil or @hop_idx >= @hop_count - 1}
          >
            Next hop →
          </button>
        </footer>
      </aside>
    </section>
    """
  end

  @doc "Filter flow events by node and outcome (used by the panel and the live view)."
  def filtered(events, %{node: node, outcome: outcome}) do
    events
    |> reject_blank(:node, node, fn e, v -> to_string(e.node) == v end)
    |> reject_blank(:outcome, outcome, fn e, v -> outcome_class(e.outcome) == v end)
  end

  def filtered(events, _), do: events

  @doc "Find the journey hop with this `msg_id` (string) and edge `to` token. Used by the live view."
  def find_hop(events, msgid, to_token) do
    Enum.find(events, fn e -> to_string(e[:msg_id]) == msgid and hop_token(e) == to_token end)
  end

  @doc """
  Whether a flow event is the currently-selected one. Identified by its lineage
  `msg_id` plus the edge `to` (one emit can fan to several edges → same msg_id,
  distinct rows) and `at` (a node may re-emit the same id is not expected, but
  `at` keeps it exact). Stable across live feed updates.
  """
  def same?(_e, nil), do: false

  def same?(e, s),
    do: e[:msg_id] == s[:msg_id] and e.to == s.to and e.at == s.at and e.node == s.node

  defp reject_blank(events, _key, v, _match) when v in [nil, ""], do: events
  defp reject_blank(events, _key, v, match), do: Enum.filter(events, &match.(&1, v))

  defp outcome_class(:ok), do: "ok"
  defp outcome_class(o) when o in [:dropped, :skipped], do: "dropped"
  defp outcome_class(_), do: "failed"

  defp node_ids(%{nodes: nodes}), do: nodes |> Enum.map(& &1.id) |> Enum.sort()

  defp payload(%{payload: p}) when is_binary(p), do: strip_map(p)
  defp payload(_), do: "—"

  defp payload_full(%{payload: p}) when is_binary(p), do: p
  defp payload_full(_), do: nil

  defp strip_map("%{" <> rest = full) do
    if String.ends_with?(rest, "}"), do: binary_part(rest, 0, byte_size(rest) - 1), else: full
  end

  defp strip_map(other), do: other

  # Highlight every node + edge the message touched across its whole journey.
  defp journey_states(journey) when is_list(journey) do
    Enum.reduce(journey, %{}, fn e, acc ->
      acc = Map.put(acc, e.node, :running)

      case e.to do
        {tn, _tp} -> Map.put(acc, tn, :running)
        _ -> acc
      end
    end)
  end

  defp journey_states(_), do: %{}

  defp journey_edges(journey) when is_list(journey) do
    for %{to: {tn, _tp}, node: n} <- journey, into: MapSet.new(), do: {n, tn}
  end

  defp journey_edges(_), do: MapSet.new()

  # A stable token identifying a hop's edge, for `inspect_hop` clicks.
  defp hop_token(%{to: {tn, tp}}), do: "#{tn}.#{tp}"
  defp hop_token(_), do: ""

  defp hop_pos(nil, _count), do: "—"
  defp hop_pos(idx, count), do: "hop #{idx + 1} of #{count}"

  defp edge(%{out_port: nil, node: node}), do: "#{node}"
  defp edge(%{node: node, out_port: port, to: nil}), do: "#{node}.#{port} → ·"
  defp edge(%{node: node, out_port: port, to: {tn, tp}}), do: "#{node}.#{port}#{tn}.#{tp}"

  defp from_label(%{node: n, out_port: nil}), do: "#{n}"
  defp from_label(%{node: n, out_port: p}), do: "#{n}.#{p}"

  defp to_label(%{to: {tn, tp}}), do: "#{tn}.#{tp}"
  defp to_label(_), do: "·"

  defp pill(:ok), do: :ok
  defp pill(o) when o in [:dropped, :skipped], do: :idle
  defp pill(_), do: :failed

  defp time(ms) do
    ms
    |> DateTime.from_unix!(:millisecond)
    |> Calendar.strftime("%H:%M:%S")
    |> Kernel.<>("." <> (ms |> rem(1000) |> Integer.to_string() |> String.pad_leading(3, "0")))
  end
end