lib/prom_ex/plugins/broadway.ex

if Code.ensure_loaded?(Broadway) do
  defmodule PromEx.Plugins.Broadway do
    @moduledoc """
    This plugin captures metrics emitted by Broadway.

    This plugin exposes the following metric groups:
    - `:broadway_init_event_metrics`
    - `:broadway_message_event_metrics`
    - `:broadway_batch_event_metrics`

    To use this plugin in your application, add the following to your PromEx module:
    ```
    defmodule WebApp.PromEx do
      use PromEx, otp_app: :web_app

      @impl true
      def plugins do
        [
          ...
          PromEx.Plugins.Broadway
        ]
      end

      @impl true
      def dashboards do
        [
          ...
          {:prom_ex, "broadway.json"}
        ]
      end
    end
    ```

    The per-message metrics use the module of each message's acknowledger as their `name` label.
    To correctly capture per-message metrics and the error rate of your pipeline, add a transformer
    that sets your pipeline module as the acknowledger. Broadway will then call `ack/3` on that
    module, so it must also implement the `Broadway.Acknowledger` behaviour:
    ```
    defmodule WebApp.MyPipeline do
      use Broadway

      alias Broadway.Message

      def start_link(_opts) do
        Broadway.start_link(__MODULE__,
          name: __MODULE__,
          producer: [
            ...
            transformer: {__MODULE__, :transform, []}
          ]
        )
      end

      def transform(event, _opts) do
        %Message{
          data: event,
          acknowledger: {__MODULE__, :ack_id, :ack_data}
        }
      end

      def ack(:ack_id, _successful, _failed) do
        # Called by Broadway with the successfully processed and the failed messages
        :ok
      end
    end
    ```
    """

    use PromEx.Plugin

    require Logger

    alias Broadway.{BatchInfo, Message, Options}
    alias PromEx.Utils

    @millisecond_duration_buckets [10, 100, 500, 1_000, 10_000, 30_000, 60_000]
    @message_batch_size_buckets [1, 5, 10, 20, 50, 100]

    # Events emitted by Broadway itself
    @init_topology_event [:broadway, :topology, :init]
    @message_stop_event [:broadway, :processor, :message, :stop]
    @message_exception_event [:broadway, :processor, :message, :exception]
    @batch_stop_event [:broadway, :batch_processor, :stop]

    # Events re-emitted by this plugin (see `proxy_broadway_init_event/4`), one per processor/batcher
    @init_topology_processors_proxy_event [:prom_ex, :broadway, :proxy, :processor, :init]
    @init_topology_batchers_proxy_event [:prom_ex, :broadway, :proxy, :batcher, :init]

    @impl true
    def event_metrics(opts) do
      otp_app = Keyword.fetch!(opts, :otp_app)
      metric_prefix = Keyword.get(opts, :metric_prefix, PromEx.metric_prefix(otp_app, :broadway))

      # Telemetry metrics will emit warnings if multiple handlers with the same names are defined.
      # As a result, this plugin supports gathering metrics on multiple processors and batches, but needs
      # to proxy them as not to create multiple definitions of the same metrics. The final data point will
      # have a label for the module associated with the event though so you'll be able to separate one
      # measurement from another.
      set_up_telemetry_proxies(@init_topology_event, otp_app)

      # Event metrics definitions
      [
        topology_init_events(metric_prefix),
        handle_message_events(metric_prefix),
        handle_batch_events(metric_prefix)
      ]
    end

    # Metrics describing the configuration a Broadway topology was started with. The supervisor-level
    # values come from the raw init event; the per-processor and per-batcher values come from the
    # proxy events emitted by `proxy_broadway_init_event/4`.
    defp topology_init_events(metric_prefix) do
      Event.build(
        :broadway_init_event_metrics,
        [
          last_value(
            metric_prefix ++ [:init, :status, :info],
            event_name: @init_topology_event,
            measurement: fn _measurements -> 1 end,
            description: "The topology configuration data that was provided to Broadway.",
            tags: [:name],
            tag_values: &extract_init_tag_values/1
          ),
          last_value(
            metric_prefix ++ [:init, :hibernate_after, :default, :milliseconds],
            event_name: @init_topology_event,
            description: "The Broadway supervisor's hibernate after default value.",
            measurement: extract_default_config_measurement(:hibernate_after),
            tags: [:name],
            tag_values: &extract_init_tag_values/1
          ),
          last_value(
            metric_prefix ++ [:init, :resubscribe_interval, :default, :milliseconds],
            event_name: @init_topology_event,
            description: "The Broadway supervisor's resubscribe interval default value.",
            measurement: extract_default_config_measurement(:resubscribe_interval),
            tags: [:name],
            tag_values: &extract_init_tag_values/1
          ),
          last_value(
            metric_prefix ++ [:init, :max, :duration, :default, :milliseconds],
            event_name: @init_topology_event,
            description: "The Broadway supervisor's max seconds default value (in milliseconds).",
            measurement: extract_default_config_measurement(:max_seconds),
            tags: [:name],
            tag_values: &extract_init_tag_values/1,
            unit: {:second, :millisecond}
          ),
          last_value(
            metric_prefix ++ [:init, :max_restarts, :default, :value],
            event_name: @init_topology_event,
            description: "The Broadway supervisor's max restarts default value.",
            measurement: extract_default_config_measurement(:max_restarts),
            tags: [:name],
            tag_values: &extract_init_tag_values/1
          ),
          last_value(
            metric_prefix ++ [:init, :shutdown, :default, :milliseconds],
            event_name: @init_topology_event,
            description: "The Broadway supervisor's shutdown default value.",
            measurement: extract_default_config_measurement(:shutdown),
            tags: [:name],
            tag_values: &extract_init_tag_values/1
          ),
          last_value(
            metric_prefix ++ [:init, :processor, :hibernate_after, :milliseconds],
            event_name: @init_topology_processors_proxy_event,
            description: "The Broadway processors hibernate after value.",
            measurement: fn _measurements, %{hibernate_after: hibernate_after} -> hibernate_after end,
            tags: [:name, :processor]
          ),
          last_value(
            metric_prefix ++ [:init, :processor, :max_demand, :value],
            event_name: @init_topology_processors_proxy_event,
            description: "The Broadway processors max demand value.",
            measurement: fn _measurements, %{max_demand: max_demand} -> max_demand end,
            tags: [:name, :processor]
          ),
          last_value(
            metric_prefix ++ [:init, :processor, :concurrency, :value],
            event_name: @init_topology_processors_proxy_event,
            description: "The Broadway processors concurrency value.",
            measurement: fn _measurements, %{concurrency: concurrency} -> concurrency end,
            tags: [:name, :processor]
          ),
          last_value(
            metric_prefix ++ [:init, :batcher, :hibernate_after, :milliseconds],
            event_name: @init_topology_batchers_proxy_event,
            description: "The Broadway batchers hibernate after value.",
            measurement: fn _measurements, %{hibernate_after: hibernate_after} -> hibernate_after end,
            tags: [:name, :batcher]
          ),
          last_value(
            metric_prefix ++ [:init, :batcher, :concurrency, :value],
            event_name: @init_topology_batchers_proxy_event,
            description: "The Broadway batchers concurrency value.",
            measurement: fn _measurements, %{concurrency: concurrency} -> concurrency end,
            tags: [:name, :batcher]
          ),
          last_value(
            metric_prefix ++ [:init, :batcher, :batch_size, :value],
            event_name: @init_topology_batchers_proxy_event,
            description: "The Broadway batchers batch size value.",
            measurement: fn _measurements, %{batch_size: batch_size} -> batch_size end,
            tags: [:name, :batcher]
          ),
          last_value(
            metric_prefix ++ [:init, :batcher, :batch_timeout, :milliseconds],
            event_name: @init_topology_batchers_proxy_event,
            description: "The Broadway batchers timeout value.",
            measurement: fn _measurements, %{batch_timeout: batch_timeout} -> batch_timeout end,
            tags: [:name, :batcher]
          )
        ]
      )
    end

    @doc false
    # Telemetry handler for the Broadway topology init event. It re-emits one proxy event per
    # configured processor and one per configured batcher. Each proxy event's metadata is that
    # stage's options plus `:processor`/`:batcher` (the stage key) and `:name` (the normalized
    # topology name).
    @spec proxy_broadway_init_event(
            :telemetry.event_name(),
            :telemetry.event_measurements(),
            :telemetry.event_metadata(),
            :telemetry.handler_config()
          ) :: :ok
    def proxy_broadway_init_event(_event_name, _measurements, %{config: config}, _config) do
      # Invoking Broadway module
      broadway_module =
        config
        |> Keyword.fetch!(:name)
        |> Utils.normalize_module_name()

      # Extract all of the processors and proxy for each processor
      config
      |> Keyword.get(:processors, [])
      |> Enum.each(fn {processor, processor_options} ->
        metadata =
          processor_options
          |> Map.new()
          |> Map.put(:processor, processor)
          |> Map.put(:name, broadway_module)

        :telemetry.execute(@init_topology_processors_proxy_event, %{}, metadata)
      end)

      # Extract all of the batchers and proxy for each batcher
      config
      |> Keyword.get(:batchers, [])
      |> Enum.each(fn {batcher, batcher_options} ->
        metadata =
          batcher_options
          |> Map.new()
          |> Map.put(:batcher, batcher)
          |> Map.put(:name, broadway_module)

        :telemetry.execute(@init_topology_batchers_proxy_event, %{}, metadata)
      end)
    end

    # Attaches `proxy_broadway_init_event/4` to the Broadway topology init event. The handler ID is
    # scoped to the OTP app, so attaching again for the same app reports `{:error, :already_exists}`.
    # That case is treated as success, so the setup is idempotent.
    defp set_up_telemetry_proxies(init_topology_event, otp_app) do
      handler_id = [:prom_ex, :broadway, :proxy, otp_app]
      handler = &__MODULE__.proxy_broadway_init_event/4

      case :telemetry.attach(handler_id, init_topology_event, handler, %{}) do
        :ok -> :ok
        {:error, :already_exists} -> :ok
      end
    end

    # Builds a measurement function that reads `field` from the init event's config after it has
    # been validated against Broadway's option definition, so defaulted options are present.
    defp extract_default_config_measurement(field) do
      fn _measurements, %{config: config} ->
        config
        |> NimbleOptions.validate!(Options.definition())
        |> Map.new()
        |> Map.get(field)
      end
    end

    defp handle_message_events(metric_prefix) do
      Event.build(
        :broadway_message_event_metrics,
        [
          distribution(
            metric_prefix ++ [:process, :message, :duration, :milliseconds],
            event_name: @message_stop_event,
            measurement: :duration,
            description: "The time it takes Broadway to process a message.",
            reporter_options: [
              buckets: @millisecond_duration_buckets
            ],
            tags: [:processor_key, :name],
            tag_values: &extract_message_tag_values/1,
            unit: {:native, :millisecond}
          ),
          distribution(
            metric_prefix ++ [:process, :message, :exception, :duration, :milliseconds],
            event_name: @message_exception_event,
            measurement: :duration,
            description: "The time it takes Broadway to process a message that results in an error.",
            reporter_options: [
              buckets: @millisecond_duration_buckets
            ],
            tags: [:processor_key, :name, :kind, :reason],
            tag_values: &extract_exception_tag_values/1,
            unit: {:native, :millisecond}
          )
        ]
      )
    end

    defp handle_batch_events(metric_prefix) do
      Event.build(
        :broadway_batch_event_metrics,
        [
          distribution(
            metric_prefix ++ [:process, :batch, :duration, :milliseconds],
            event_name: @batch_stop_event,
            measurement: :duration,
            description: "The time it takes Broadway to process a batch of messages.",
            reporter_options: [
              buckets: @millisecond_duration_buckets
            ],
            tags: [:batcher, :name],
            tag_values: &extract_batcher_tag_values/1,
            unit: {:native, :millisecond}
          ),
          distribution(
            metric_prefix ++ [:process, :batch, :failure, :size],
            event_name: @batch_stop_event,
            measurement: fn _measurements, metadata ->
              length(metadata.failed_messages)
            end,
            description: "How many of the messages in the batch failed to process.",
            reporter_options: [
              buckets: @message_batch_size_buckets
            ],
            tags: [:batcher, :name],
            tag_values: &extract_batcher_tag_values/1
          ),
          distribution(
            metric_prefix ++ [:process, :batch, :success, :size],
            event_name: @batch_stop_event,
            measurement: fn _measurements, metadata ->
              length(metadata.successful_messages)
            end,
            description: "How many of the messages in the batch were successfully processed.",
            reporter_options: [
              buckets: @message_batch_size_buckets
            ],
            tags: [:batcher, :name],
            tag_values: &extract_batcher_tag_values/1
          )
        ]
      )
    end

    # Tags batch metrics with the topology name and the batcher key. Only `:batcher` and `:name` are
    # declared as tags on the batch metrics, so no other keys are computed here.
    defp extract_batcher_tag_values(%{batch_info: %BatchInfo{batcher: batcher}, topology_name: name}) do
      %{
        name: Utils.normalize_module_name(name),
        batcher: batcher
      }
    end

    # Tags init metrics with the normalized topology name. The required `:name` option is read
    # directly from the event config, just as `proxy_broadway_init_event/4` does. This avoids
    # re-validating the whole config once for every metric that uses these tags.
    defp extract_init_tag_values(%{config: config}) do
      %{name: config |> Keyword.fetch!(:name) |> Utils.normalize_module_name()}
    end

    # Tags per-message metrics with the processor key and the acknowledger module. The acknowledger
    # module is the pipeline module when the transformer from the moduledoc is used.
    defp extract_message_tag_values(%{
           processor_key: processor_key,
           message: %Message{acknowledger: {acknowledger, _, _}}
         }) do
      %{
        processor_key: processor_key,
        name: Utils.normalize_module_name(acknowledger)
      }
    end

    # Same tags as `extract_message_tag_values/1`, plus the exception kind and a normalized reason.
    defp extract_exception_tag_values(%{
           processor_key: processor_key,
           kind: kind,
           reason: reason,
           stacktrace: stacktrace,
           message: %Message{acknowledger: {acknowledger, _, _}}
         }) do
      reason = Utils.normalize_exception(kind, reason, stacktrace)

      %{
        processor_key: processor_key,
        kind: kind,
        reason: reason,
        name: Utils.normalize_module_name(acknowledger)
      }
    end
  end
else
  defmodule PromEx.Plugins.Broadway do
    @moduledoc false

    use PromEx.Plugin

    # Stub compiled only when `Code.ensure_loaded?(Broadway)` is false. Requesting this plugin's
    # event metrics delegates to `PromEx.Plugin.no_dep_raise/2`, which reports the missing
    # "Broadway" dependency.
    @impl true
    def event_metrics(_opts), do: PromEx.Plugin.no_dep_raise(__MODULE__, "Broadway")
  end
end