Skip to main content

lib/bloccs/web/telemetry/flow.ex

defmodule Bloccs.Web.Telemetry.Flow do
  @moduledoc """
  The pure core behind the Messages panel: a bounded record of messages moving
  through a network. Each event is one edge traversal (a `[:bloccs, :emit]`
  correlated with the emitting node's outcome and latency) or a failure/drop.

  Keeps a ring of the most recent events for the activity feed and per-second
  buckets for the throughput chart. Like the rest of the collector core it owns
  no clock — `now` (ms) is passed in — so it is deterministically testable.
  """

  @max_recent 100
  @bucket_window 60
  @keep_seconds 90

  # Zeroed per-second counters, one per outcome class produced by `classify/1`.
  @empty_bucket %{ok: 0, failed: 0, dropped: 0, other: 0}

  @type endpoint :: {atom(), atom()}
  @type outcome :: :ok | :failed | :dropped | :skipped | :retry | :dispatch_error

  @typedoc """
  A normalized flow event (before timestamping). `msg_id` / `parents` /
  `trace_id` are the emitted message's `Bloccs.Lineage` (bloccs 0.5+): `msg_id`
  is this message, `parents` the input id(s) that caused it (many on a fan-in),
  `trace_id` the root correlation. They let one message be tracked across hops.
  """
  @type event :: %{
          node: atom(),
          out_port: atom() | nil,
          to: endpoint() | nil,
          outcome: outcome(),
          duration_ms: number() | nil,
          reason: term() | nil,
          payload: String.t() | nil,
          msg_id: pos_integer() | nil,
          parents: [pos_integer()],
          trace_id: pos_integer() | nil
        }

  @type t :: %{recent: [map()], buckets: %{integer() => map()}}

  @doc """
  Returns an empty flow record: no recent events and no per-second buckets.
  """
  @spec new() :: t()
  def new, do: %{recent: [], buckets: %{}}

  @doc """
  Records `event`, observed at `now` (milliseconds).

  The event is stamped with `:at => now` and prepended to the recent list. That
  list is then capped at #{@max_recent} entries, newest first.

  The bucket for second `div(now, 1000)` gets one count added to the event's
  outcome class:

    * `:ok` counts as ok
    * `:failed` and `:dispatch_error` count as failed
    * `:dropped` and `:skipped` count as dropped
    * anything else counts as other

  Finally, buckets more than #{@keep_seconds} seconds older than that second are
  pruned.
  """
  @spec record(t(), event(), integer()) :: t()
  def record(state, event, now) do
    sec = div(now, 1000)
    class = classify(event.outcome)

    bucket =
      state.buckets
      |> Map.get(sec, @empty_bucket)
      |> Map.update!(class, &(&1 + 1))

    recent = Enum.take([Map.put(event, :at, now) | state.recent], @max_recent)
    buckets = state.buckets |> Map.put(sec, bucket) |> prune(sec)

    %{state | recent: recent, buckets: buckets}
  end

  @doc """
  A snapshot for the panel: the recent events (newest first), a per-second
  series for the chart, and the current rate (events in the last full second).

  The series covers the #{@bucket_window} seconds ending at `div(now, 1000)`,
  oldest first. Seconds with no bucket are reported as zeros.
  """
  @spec snapshot(t(), integer()) :: %{events: [map()], series: [map()], rate: non_neg_integer()}
  def snapshot(state, now) do
    cur = div(now, 1000)

    series =
      for s <- (cur - @bucket_window + 1)..cur do
        b = Map.get(state.buckets, s, @empty_bucket)
        %{ok: b.ok, failed: b.failed, dropped: b.dropped, other: b.other, total: total(b)}
      end

    %{
      events: state.recent,
      series: series,
      rate: state.buckets |> Map.get(cur - 1, %{}) |> total()
    }
  end

  @doc """
  The lineage **journey** of `msg_id`: every recorded event in its connected
  causal component — ancestors (via `parents`) and descendants (events that list
  it as a parent), transitively — ordered oldest-first. Branches and merges to
  match the topology, so a fan-in (batch/join) journey includes all the inputs
  that were combined. Bounded by what is still in the recent ring.

  Events without a `msg_id` are never part of a journey. Returns `[]` for a
  `nil` id.
  """
  @spec journey([map()], pos_integer() | nil) :: [map()]
  def journey(_events, nil), do: []

  def journey(events, msg_id) do
    ids = connected_ids(events, msg_id)

    events
    |> Enum.filter(&(&1[:msg_id] in ids))
    # `msg_id` is monotonic with actual emit creation, so it orders hops causally
    # regardless of when each was recorded (an aggregate node's emit is flushed
    # late, so its `at` lags). `ids` never contains nil, so every kept event has
    # an id; `at` only breaks ties between events that share one.
    |> Enum.sort_by(&{&1.msg_id, &1.at})
  end

  # ---- internals ----

  # The set of msg_ids reachable from `start` along parent/child lineage edges.
  # Events without a msg_id add no edges. nil parents are ignored: otherwise nil
  # would become a shared node that links every message with a missing lineage
  # into one bogus journey.
  defp connected_ids(events, start) do
    {up, down} =
      Enum.reduce(events, {%{}, %{}}, fn e, {up, down} ->
        case e[:msg_id] do
          nil ->
            {up, down}

          mid ->
            parents = (e[:parents] || []) |> Enum.reject(&is_nil/1) |> MapSet.new()

            # child -> its parents
            up = Map.update(up, mid, parents, &MapSet.union(&1, parents))

            # parent -> its children
            down =
              Enum.reduce(parents, down, fn p, acc ->
                Map.update(acc, p, MapSet.new([mid]), &MapSet.put(&1, mid))
              end)

            {up, down}
        end
      end)

    reach([start], MapSet.new([start]), up, down)
  end

  # Graph walk over the lineage edges; returns every node seen. Only the
  # reachable set matters (not visit order), so newly found nodes are prepended
  # to the worklist. That keeps the total work linear instead of appending with
  # `++` on every step.
  defp reach([], seen, _up, _down), do: seen

  defp reach([n | rest], seen, up, down) do
    neighbors = MapSet.union(Map.get(up, n, MapSet.new()), Map.get(down, n, MapSet.new()))
    fresh = MapSet.difference(neighbors, seen)
    reach(MapSet.to_list(fresh) ++ rest, MapSet.union(seen, fresh), up, down)
  end

  defp classify(:ok), do: :ok
  defp classify(:failed), do: :failed
  defp classify(:dispatch_error), do: :failed
  defp classify(:dropped), do: :dropped
  defp classify(:skipped), do: :dropped
  defp classify(_), do: :other

  # Sum of the four class counters. Missing keys count as 0, because `snapshot/2`
  # passes `%{}` when the previous second has no bucket.
  defp total(b),
    do:
      Map.get(b, :ok, 0) + Map.get(b, :failed, 0) + Map.get(b, :dropped, 0) +
        Map.get(b, :other, 0)

  # Drops buckets more than @keep_seconds older than second `cur`.
  defp prune(buckets, cur), do: Map.reject(buckets, fn {s, _} -> s < cur - @keep_seconds end)
end