# lib/broadway.ex

defmodule NewRelicBroadway.Telemetry.Broadway do
  @moduledoc """
  Provides `Broadway` instrumentation via `telemetry`.

  Broadway pipelines are auto-discovered and instrumented.

  We automatically gather:

  * Transaction metrics and events
  * Transaction Traces

  ----

  To prevent reporting an individual transaction:

  ```elixir
  NewRelic.ignore_transaction()
  ```

  ----

  Inside a Transaction, the agent will track work across processes that are spawned and linked.
  You can signal to the agent not to track work done inside a spawned process, which will
  exclude it from the current Transaction.

  To exclude a process from the Transaction:

  ```elixir
  Task.async(fn ->
    NewRelic.exclude_from_transaction()
    Work.wont_be_tracked()
  end)
  ```
  """

  use GenServer

  alias NewRelic.Transaction

  @doc false
  # Starts the instrumentation server, registered locally under this module's
  # name. The argument is ignored; the telemetry handler id is fixed.
  def start_link(_) do
    GenServer.start_link(
      __MODULE__,
      %{handler_id: {:new_relic, :broadway}},
      name: __MODULE__
    )
  end

  # Telemetry event names for the start, stop and exception of a Broadway
  # processor message.
  @broadway_start [:broadway, :processor, :message, :start]
  @broadway_stop [:broadway, :processor, :message, :stop]
  @broadway_exception [:broadway, :processor, :message, :exception]

  # Every event the handler is attached to in `init/1`.
  @broadway_events [@broadway_start, @broadway_stop, @broadway_exception]

  @doc false
  # GenServer callback: attaches `handle_event/4` to every Broadway event under
  # `config.handler_id`, handing `config` to the handler as its config term.
  # The result of `:telemetry.attach_many/4` is not inspected.
  def init(config) do
    handler = &__MODULE__.handle_event/4
    :telemetry.attach_many(config.handler_id, @broadway_events, handler, config)

    # Trap exits so `terminate/2` gets to run (and detach the handler) when the
    # server is shut down by its parent.
    Process.flag(:trap_exit, true)

    {:ok, config}
  end

  @doc false
  # GenServer callback: removes the telemetry handler registered under the
  # state's `handler_id`.
  def terminate(_reason, %{handler_id: id}), do: :telemetry.detach(id)

  @doc false
  # Telemetry handler for the Broadway processor message events.
  #
  # Start event: opens an `:other` transaction, then records start attributes.
  def handle_event(@broadway_start, %{system_time: started_at}, meta, _config) do
    Transaction.Reporter.start_transaction(:other)
    add_start_attrs(meta, started_at)
  end

  # Stop event: records stop attributes, then closes the transaction.
  def handle_event(@broadway_stop, %{duration: elapsed}, _meta, _config) do
    add_stop_attrs(elapsed)
    Transaction.Reporter.stop_transaction(:other)
  end

  # Exception event: records stop attributes, marks the transaction as failed
  # with the kind/reason/stack taken from the metadata, then closes it.
  def handle_event(@broadway_exception, %{duration: elapsed}, %{kind: kind} = meta, _config) do
    add_stop_attrs(elapsed)

    {reason, stack} = reason_and_stack(meta)
    failure = %{kind: kind, reason: reason, stack: stack}

    Transaction.Reporter.fail(failure)
    Transaction.Reporter.stop_transaction(:other)
  end

  # Any other event, or one whose measurements/metadata lack the expected keys.
  def handle_event(_event, _measurements, _meta, _config), do: :ignore

  # Adds start-time attributes to the current transaction, built from the
  # telemetry metadata (`meta`) and the `system_time` measurement.
  #
  # Reads `meta.topology_name`, `meta.index` and `meta.processor_key`; a missing
  # key raises `KeyError`.
  defp add_start_attrs(meta, system_time) do
    # Broadway pipelines may be registered under a `{:via, module, term}` name.
    # Interpolating a tuple raises `Protocol.UndefinedError`, and telemetry
    # detaches a handler that raises, so the name is rendered defensively.
    name = "Broadway/#{topology_label(meta.topology_name)}/perform"

    NewRelic.add_attributes(
      pid: inspect(self()),
      system_time: system_time,
      index: meta.index,
      name: name,
      other_transaction_name: name,
      # NOTE(review): passed through unchanged; confirm how the agent serializes
      # a non-atom topology name.
      topology_name: meta.topology_name,
      processor_key: meta.processor_key
    )
  end

  # Renders a topology name for use inside the transaction name. Atoms and
  # binaries keep the exact text string interpolation produced before; any
  # other term is rendered with `inspect/1` instead of raising.
  defp topology_label(name) when is_atom(name) or is_binary(name), do: to_string(name)
  defp topology_label(name), do: inspect(name)

  @kb 1024
  # Adds end-of-message attributes to the current transaction: the given
  # `duration`, plus the calling process's `:memory` figure divided by 1024 and
  # its reduction count, both read via `Process.info/2`.
  defp add_stop_attrs(duration) do
    # Items come back in the order they were requested.
    [memory: memory, reductions: reductions] =
      Process.info(self(), [:memory, :reductions])

    NewRelic.add_attributes(
      duration: duration,
      memory_kb: memory / @kb,
      reductions: reductions
    )
  end

  # Extracts `{reason, stacktrace}` from the metadata of an exception event.
  #
  # Clauses are tried in order:
  #
  #   1. exception struct reason with a `:stacktrace` entry -> both as-is
  #   2. `{{reason, stack}, _}` reason -> the inner reason and stack
  #   3. `{reason, _}` reason -> the reason with an empty stack
  #   4. any other reason with a list `:stacktrace` -> both as-is
  #   5. anything else -> logged at debug level, `{:unexpected_exception, []}`
  defp reason_and_stack(%{reason: %{__exception__: true} = reason, stacktrace: stack}) do
    {reason, stack}
  end

  defp reason_and_stack(%{reason: {{reason, stack}, _init_call}}) do
    {reason, stack}
  end

  defp reason_and_stack(%{reason: {reason, _init_call}}) do
    {reason, []}
  end

  # Plain throw/exit/error values (e.g. `throw(:oops)`) are not exception
  # structs or tuples, yet the metadata still carries the reason and stacktrace.
  # Report them instead of collapsing to `:unexpected_exception`.
  defp reason_and_stack(%{reason: reason, stacktrace: stack}) when is_list(stack) do
    {reason, stack}
  end

  defp reason_and_stack(unexpected_exception) do
    NewRelic.log(:debug, "unexpected_exception: #{inspect(unexpected_exception)}")
    {:unexpected_exception, []}
  end
end