Skip to main content

lib/bloccs/web/telemetry/collector.ex

defmodule Bloccs.Web.Telemetry.Collector do
  @moduledoc """
  The single sink for bloccs telemetry. Attaches `Bloccs.Web.Telemetry.Handler`
  to the `[:bloccs, …]` stream and folds it into two per-network views:

    * **metrics** — `Bloccs.Web.Telemetry.Metrics` rolling windows (Metrics panel)
    * **flow** — `Bloccs.Web.Telemetry.Flow`, recent edge traversals + per-second
      throughput buckets (Messages panel)

  On a 1-second tick it broadcasts a snapshot of each over `Phoenix.PubSub`
  (`topic/1` for metrics, `flow_topic/1` for flow). Panels subscribe for live
  updates and call `snapshot/2` / `flow_snapshot/2` on mount for first paint.

  > A plain `GenServer` is the right tool here: this is a stateful telemetry sink
  > with a periodic tick, not a dataflow stage. The "no raw GenServers" rule
  > governs the `bloccs` library core (where Broadway/GenStage apply), not this
  > observability process.
  """

  use GenServer

  alias Bloccs.Web.Telemetry.{Flow, Handler, Metrics}

  @pubsub Bloccs.Web.PubSub
  @tick_ms 1_000
  @handler_id :bloccs_web_collector

  # Returned for a network the collector has not seen yet, and as the
  # best-effort fallback when the collector process cannot be reached.
  @empty_frame %{nodes: %{}, updated_at: nil}
  @empty_flow %{events: [], series: [], rate: 0}

  @type frame :: %{nodes: map(), updated_at: integer() | nil}
  @type flow_frame :: %{events: list(), series: list(), rate: non_neg_integer()}

  # ---- public API ----

  @doc """
  Starts the collector.

  Accepts `:name` to register under a different name (defaults to this module).
  Pass the same name as `server` to `snapshot/2` / `flow_snapshot/2`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(opts, :name, __MODULE__))
  end

  @doc "PubSub topic carrying a network's metric frames."
  @spec topic(atom()) :: String.t()
  def topic(network), do: "bloccs:metrics:#{network}"

  @doc "PubSub topic carrying a network's flow frames."
  @spec flow_topic(atom()) :: String.t()
  def flow_topic(network), do: "bloccs:flow:#{network}"

  @doc "Cast a normalized metrics event in (called by the telemetry handler)."
  @spec record(GenServer.server(), atom(), Metrics.event()) :: :ok
  def record(collector, network, event), do: GenServer.cast(collector, {:record, network, event})

  @doc "Cast a flow event in (called by the telemetry handler)."
  @spec record_flow(GenServer.server(), atom(), Flow.event()) :: :ok
  def record_flow(collector, network, event),
    do: GenServer.cast(collector, {:flow, network, event})

  @doc """
  First-paint metrics snapshot for a network.

  Returns an empty frame when the network has not been seen yet, or when the
  collector `server` (default: the instance registered under this module) is
  not reachable, so a panel can still render instead of crashing on mount.
  """
  @spec snapshot(atom(), GenServer.server()) :: frame()
  def snapshot(network, server \\ __MODULE__) do
    GenServer.call(server, {:snapshot, network})
  catch
    # Deliberate best-effort: a missing or busy collector yields an empty frame.
    :exit, _ -> @empty_frame
  end

  @doc """
  First-paint flow snapshot for a network.

  Same fallback semantics as `snapshot/2`: an empty flow frame when the network
  is unseen or the collector `server` is unreachable.
  """
  @spec flow_snapshot(atom(), GenServer.server()) :: flow_frame()
  def flow_snapshot(network, server \\ __MODULE__) do
    GenServer.call(server, {:flow_snapshot, network})
  catch
    :exit, _ -> @empty_flow
  end

  # ---- GenServer ----

  @impl true
  def init(_opts) do
    # terminate/2 only runs on a supervisor shutdown when exits are trapped;
    # without this, Handler.detach/1 is skipped and the handler registered
    # under @handler_id may outlive this process.
    Process.flag(:trap_exit, true)
    Handler.attach(@handler_id, self())
    schedule_tick()
    {:ok, %{metrics: %{}, flow: %{}}}
  end

  # Metrics are fed and snapshotted with monotonic time; flow is fed and
  # snapshotted with wall-clock (system) time. Each view only ever sees its own
  # clock, so the two sources are never mixed.
  @impl true
  def handle_cast({:record, network, event}, state) do
    now = System.monotonic_time(:millisecond)
    metrics = state.metrics |> Map.get(network, Metrics.new()) |> Metrics.apply(event, now)
    {:noreply, put_in(state.metrics[network], metrics)}
  end

  def handle_cast({:flow, network, event}, state) do
    now = System.system_time(:millisecond)
    flow = state.flow |> Map.get(network, Flow.new()) |> Flow.record(event, now)
    {:noreply, put_in(state.flow[network], flow)}
  end

  @impl true
  def handle_call({:snapshot, network}, _from, state) do
    frame =
      case Map.fetch(state.metrics, network) do
        {:ok, metrics} -> Metrics.snapshot(metrics, System.monotonic_time(:millisecond))
        :error -> @empty_frame
      end

    {:reply, frame, state}
  end

  def handle_call({:flow_snapshot, network}, _from, state) do
    frame =
      case Map.fetch(state.flow, network) do
        {:ok, flow} -> Flow.snapshot(flow, System.system_time(:millisecond))
        :error -> @empty_flow
      end

    {:reply, frame, state}
  end

  @impl true
  def handle_info(:tick, state) do
    broadcast_metrics(state.metrics)
    broadcast_flow(state.flow)
    schedule_tick()
    {:noreply, state}
  end

  # With exits trapped, linked-process exits (and any other stray message) land
  # here; ignore them rather than crash on a missing clause.
  def handle_info(_message, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, _state) do
    Handler.detach(@handler_id)
    :ok
  end

  # Broadcasts one metrics frame per known network, all stamped with a single
  # monotonic clock read.
  defp broadcast_metrics(metrics_by_network) do
    now = System.monotonic_time(:millisecond)

    for {network, metrics} <- metrics_by_network do
      frame = Metrics.snapshot(metrics, now)
      Phoenix.PubSub.broadcast(@pubsub, topic(network), {:bloccs_frame, network, frame})
    end

    :ok
  end

  # Broadcasts one flow frame per known network, snapshotting the flow value
  # already in hand with a single wall-clock read.
  defp broadcast_flow(flow_by_network) do
    now = System.system_time(:millisecond)

    for {network, flow} <- flow_by_network do
      frame = Flow.snapshot(flow, now)
      Phoenix.PubSub.broadcast(@pubsub, flow_topic(network), {:bloccs_flow, network, frame})
    end

    :ok
  end

  defp schedule_tick, do: Process.send_after(self(), :tick, @tick_ms)
end