Skip to main content

lib/bloccs/web/telemetry/handler.ex

defmodule Bloccs.Web.Telemetry.Handler do
  @moduledoc """
  Attaches to the `[:bloccs, …]` telemetry events and forwards normalized data to
  the `Bloccs.Web.Telemetry.Collector`. Runs in the emitting (Broadway) process,
  so it does the minimum — normalize and cast — and never blocks the pipeline.

  It feeds two views:

    * **Metrics** — per-node rolling windows from `:start` / `:stop` / …
    * **Flow** — one event per message that crosses an edge. A node's `[:bloccs,
      :emit]` events (the edges it fired) are buffered in the process dictionary
      and flushed on `:stop` / `:exception`, paired with the node's outcome and
      latency. The handler runs in the node's process and a Broadway processor
      handles one message at a time, so the per-process buffer is race-free.
  """

  alias Bloccs.Web.Telemetry.Collector

  @buf :bloccs_flow_buf

  @events [
    [:bloccs, :node, :start],
    [:bloccs, :node, :stop],
    [:bloccs, :node, :exception],
    [:bloccs, :node, :retry],
    [:bloccs, :node, :skipped],
    [:bloccs, :node, :dropped],
    [:bloccs, :node, :dispatch_error],
    [:bloccs, :emit]
  ]

  @doc "Attach this handler, forwarding to `collector`. Idempotent per id."
  @spec attach(atom(), pid() | atom()) :: :ok | {:error, :already_exists}
  def attach(id, collector) do
    :telemetry.attach_many(id, @events, &__MODULE__.handle/4, %{collector: collector})
  end

  @spec detach(atom()) :: :ok | {:error, :not_found}
  def detach(id), do: :telemetry.detach(id)

  @doc false
  # An emit is one edge firing: stash it (with the opt-in payload snapshot) in the
  # current message's buffer.
  def handle([:bloccs, :emit], _measurements, metadata, _config) do
    case Process.get(@buf) do
      %{emits: emits} = buf ->
        emit = %{
          port: metadata[:from_port],
          targets: metadata[:targets] || [],
          payload: metadata[:payload],
          msg_id: metadata[:msg_id],
          parents: metadata[:parents] || [],
          trace_id: metadata[:trace_id]
        }

        Process.put(@buf, %{buf | emits: [emit | emits]})

      _ ->
        :ok
    end

    :ok
  end

  def handle([:bloccs, :node, kind], measurements, metadata, %{collector: collector}) do
    with network when not is_nil(network) <- metadata[:network],
         node when not is_nil(node) <- metadata[:node] do
      Collector.record(collector, network, normalize(kind, measurements, metadata))
      flow(kind, measurements, metadata, collector, network)
    else
      _ -> :ok
    end
  end

  # ---- metrics normalization ----

  defp normalize(:start, _measurements, meta), do: {:start, meta.node}

  defp normalize(:stop, measurements, meta) do
    {:stop, meta.node, duration_ms(measurements), meta[:outcome] || :ok}
  end

  defp normalize(:exception, _measurements, meta), do: {:exception, meta.node}
  defp normalize(kind, _measurements, meta), do: {:event, meta.node, kind}

  # ---- flow correlation ----

  defp flow(:start, _measurements, meta, collector, network) do
    # Aggregate nodes (batch/join) emit but never send `:stop`, so their buffered
    # emits would never flush. A node's pipeline process only ever runs that one
    # node, so any buffer still open when the next message starts belongs to the
    # previous (aggregate) message — flush it now (outcome :ok, no per-message
    # latency, since an aggregate has none). Transforms delete their buffer on
    # `:stop`, so there is nothing to flush here for them.
    flush_orphan(collector, network)
    Process.put(@buf, %{node: meta.node, emits: []})
    :ok
  end

  defp flow(:stop, measurements, meta, collector, network) do
    flush(collector, network, meta.node, meta[:outcome] || :ok, duration_ms(measurements), nil)
  end

  defp flow(:exception, measurements, meta, collector, network) do
    flush(collector, network, meta.node, :failed, duration_ms(measurements), meta[:reason])
  end

  defp flow(kind, _measurements, meta, collector, network)
       when kind in [:dropped, :skipped, :retry, :dispatch_error] do
    # No emit, so no descendant lineage — but the input message's id (when the
    # node processes one message) anchors this terminal event in its journey.
    Collector.record_flow(collector, network, %{
      node: meta.node,
      out_port: nil,
      to: nil,
      outcome: kind,
      duration_ms: nil,
      reason: nil,
      payload: nil,
      msg_id: meta[:msg_id],
      parents: [],
      trace_id: nil
    })
  end

  # Flush an aggregate node's emits left buffered (no `:stop` arrives for them).
  defp flush_orphan(collector, network) do
    case Process.get(@buf) do
      %{node: node, emits: [_ | _] = emits} ->
        Process.delete(@buf)

        for emit <- Enum.reverse(emits), target <- targets_or_nil(emit.targets) do
          Collector.record_flow(collector, network, %{
            node: node,
            out_port: emit.port,
            to: target,
            outcome: :ok,
            duration_ms: nil,
            reason: nil,
            payload: emit.payload,
            msg_id: emit.msg_id,
            parents: emit.parents,
            trace_id: emit.trace_id
          })
        end

      _ ->
        :ok
    end
  end

  defp flush(collector, network, node, outcome, duration, reason) do
    emits =
      case Process.get(@buf) do
        %{node: ^node, emits: emits} -> Enum.reverse(emits)
        _ -> []
      end

    Process.delete(@buf)

    for emit <- normalize_emits(emits), target <- targets_or_nil(emit.targets) do
      Collector.record_flow(collector, network, %{
        node: node,
        out_port: emit.port,
        to: target,
        outcome: outcome,
        duration_ms: duration,
        reason: reason,
        payload: emit.payload,
        msg_id: emit.msg_id,
        parents: emit.parents,
        trace_id: emit.trace_id
      })
    end

    :ok
  end

  # A node that emitted nothing (failed/dropped before emit, or terminal) still
  # produces one row so the failure is visible.
  defp normalize_emits([]),
    do: [%{port: nil, targets: [], payload: nil, msg_id: nil, parents: [], trace_id: nil}]

  defp normalize_emits(emits), do: emits

  defp targets_or_nil([]), do: [nil]
  defp targets_or_nil(targets), do: targets

  defp duration_ms(%{duration: native}) do
    System.convert_time_unit(native, :native, :microsecond) / 1000
  end

  defp duration_ms(_), do: nil
end