Skip to main content

lib/bloccs/web/telemetry/metrics.ex

defmodule Bloccs.Web.Telemetry.Metrics do
  @moduledoc """
  The pure functional core of the metrics collector: folds normalized bloccs
  telemetry into per-node rolling windows and renders a snapshot. No processes,
  no clock of its own — `now` (monotonic ms) is always passed in, so the whole
  thing is deterministically testable with synthetic events.

  Normalized events (produced by `Bloccs.Web.Telemetry.Handler`):

    * `{:start, node}` — a message entered the node
    * `{:stop, node, duration_ms, :ok | :failed}` — it finished
    * `{:exception, node}` — it raised
    * `{:event, node, kind}` — retry / skipped / dropped / dispatch_error

  A node's `state` (`:idle | :running | :ok | :failed`) drives the topology
  glyph; the windowed stats drive the metrics panel.
  """

  @window_ms 10_000
  @max_samples 500

  @type node_state :: %{
          completed: non_neg_integer(),
          errors: non_neg_integer(),
          events: non_neg_integer(),
          samples: [{integer(), number()}],
          state: :idle | :running | :ok | :failed,
          last_at: integer() | nil
        }
  @type t :: %{optional(atom()) => node_state()}

  @type event ::
          {:start, atom()}
          | {:stop, atom(), number(), :ok | :failed}
          | {:exception, atom()}
          | {:event, atom(), atom()}

  @type node_view :: %{
          state: atom(),
          completed: non_neg_integer(),
          errors: non_neg_integer(),
          error_rate: float(),
          throughput: float(),
          p50: number() | nil,
          p95: number() | nil
        }

  @spec new() :: t()
  def new, do: %{}

  @spec apply(t(), event(), integer()) :: t()
  def apply(nodes, {:start, node}, now) do
    update(nodes, node, now, &%{&1 | state: :running})
  end

  def apply(nodes, {:stop, node, duration_ms, outcome}, now) do
    update(nodes, node, now, fn m ->
      %{
        m
        | completed: m.completed + 1,
          errors: m.errors + if(outcome == :failed, do: 1, else: 0),
          state: if(outcome == :failed, do: :failed, else: :ok),
          samples: add_sample(m.samples, {now, duration_ms})
      }
    end)
  end

  def apply(nodes, {:exception, node}, now) do
    update(nodes, node, now, fn m ->
      %{m | completed: m.completed + 1, errors: m.errors + 1, state: :failed}
    end)
  end

  def apply(nodes, {:event, node, _kind}, now) do
    update(nodes, node, now, &%{&1 | events: &1.events + 1})
  end

  def apply(nodes, _unknown, _now), do: nodes

  @doc "Render a `{node => view}` snapshot, pruning samples older than the window."
  @spec snapshot(t(), integer()) :: %{nodes: %{atom() => node_view()}, updated_at: integer()}
  def snapshot(nodes, now) do
    views =
      Map.new(nodes, fn {id, m} ->
        recent = prune(m.samples, now)
        durations = Enum.map(recent, &elem(&1, 1))

        {id,
         %{
           state: decayed_state(m, now),
           completed: m.completed,
           errors: m.errors,
           error_rate: ratio(m.errors, m.completed),
           throughput: length(recent) / (@window_ms / 1000),
           p50: percentile(durations, 0.5),
           p95: percentile(durations, 0.95),
           series: series(recent, now)
         }}
      end)

    %{nodes: views, updated_at: now}
  end

  # ---- internals ----

  defp blank, do: %{completed: 0, errors: 0, events: 0, samples: [], state: :idle, last_at: nil}

  defp update(nodes, node, now, fun) do
    m = Map.get(nodes, node, blank())
    Map.put(nodes, node, fun.(%{m | last_at: now}))
  end

  defp add_sample(samples, sample) do
    [sample | samples] |> Enum.take(@max_samples)
  end

  defp prune(samples, now) do
    Enum.filter(samples, fn {t, _} -> now - t <= @window_ms end)
  end

  # Per-second completion counts across the window (oldest → newest), for a tiny
  # throughput sparkline on the metrics panel.
  @buckets div(@window_ms, 1000)
  defp series(samples, now) do
    start = now - @window_ms

    counts =
      Enum.reduce(samples, %{}, fn {t, _}, acc ->
        sec = div(t - start, 1000)
        if sec >= 0 and sec < @buckets, do: Map.update(acc, sec, 1, &(&1 + 1)), else: acc
      end)

    for s <- 0..(@buckets - 1), do: Map.get(counts, s, 0)
  end

  # A node that finished a while ago settles back to :idle so the graph doesn't
  # stay lit forever; a failure stays visible for the window.
  defp decayed_state(%{state: :failed} = m, now) do
    if stale?(m, now), do: :idle, else: :failed
  end

  defp decayed_state(%{state: :ok} = m, now) do
    if stale?(m, now), do: :idle, else: :ok
  end

  defp decayed_state(%{state: state}, _now), do: state

  defp stale?(%{last_at: nil}, _now), do: true
  defp stale?(%{last_at: at}, now), do: now - at > @window_ms

  defp ratio(_n, 0), do: 0.0
  defp ratio(n, d), do: n / d

  defp percentile([], _q), do: nil

  defp percentile(values, q) do
    sorted = Enum.sort(values)
    idx = max(0, round(q * (length(sorted) - 1)))
    Enum.at(sorted, idx)
  end
end