lib/prom_ex/plugins/oban.ex

if Code.ensure_loaded?(Oban) do
  defmodule PromEx.Plugins.Oban do
    @moduledoc """
    This plugin captures metrics emitted by Oban. Specifically, it captures metrics from job events, producer events,
    and also from internal polling jobs to monitor queue sizes.

    This plugin supports the following options:
    - `oban_supervisors`: This is an OPTIONAL option and it allows you to specify what Oban instances should have their events
      tracked. By default the only Oban instance that will have its events tracked is the default `Oban` instance. As a result, by
      default this option has a value of `[Oban]`. If you would like to track other named Oban instances, or perhaps your default
      and only Oban instance has a different name, you can pass in your own list of Oban instances (e.g. `[Oban, Oban.PrivateJobs]`).

    - `metric_prefix`: This option is OPTIONAL and is used to override the default metric prefix of
      `[otp_app, :prom_ex, :oban]`. If this changes you will also want to set `oban_metric_prefix`
      in your `dashboard_assigns` to the snakecase version of your prefix, the default
      `oban_metric_prefix` is `{otp_app}_prom_ex_oban`.

    - `poll_rate`: This option is OPTIONAL and is the rate at which poll metrics are refreshed (default is 5 seconds).

    - `duration_unit`: This is an OPTIONAL option and is a `Telemetry.Metrics.time_unit()`. It can be one of:
      `:second | :millisecond | :microsecond | :nanosecond`. It is `:millisecond` by default.

    This plugin exposes the following metric groups:
    - `:oban_init_event_metrics`
    - `:oban_job_event_metrics`
    - `:oban_producer_event_metrics`
    - `:oban_circuit_breaker_event_metrics`
    - `:oban_queue_poll_metrics`

    To use this plugin in your application, add the following to your PromEx module:
    ```
    defmodule WebApp.PromEx do
      use PromEx, otp_app: :web_app

      @impl true
      def plugins do
        [
          ...
          {PromEx.Plugins.Oban, oban_supervisors: [Oban, Oban.AnotherSupervisor], poll_rate: 10_000}
        ]
      end

      @impl true
      def dashboards do
        [
          ...
          {:prom_ex, "oban.json"}
        ]
      end
    end
    ```
    """

    use PromEx.Plugin

    alias PromEx.Utils

    import Ecto.Query, only: [group_by: 3, select: 3]

    # Oban events
    # Telemetry event names that the metric definitions in this module subscribe to via
    # their `:event_name` option.
    @init_event [:oban, :supervisor, :init]
    @job_complete_event [:oban, :job, :stop]
    @job_exception_event [:oban, :job, :exception]
    @producer_complete_event [:oban, :producer, :stop]
    @producer_exception_event [:oban, :producer, :exception]
    @circuit_breaker_trip_event [:oban, :circuit, :trip]
    @circuit_breaker_open_event [:oban, :circuit, :open]

    # PromEx Oban proxy events
    # Emitted by `handle_proxy_init_event/4` once per configured queue whenever the
    # supervisor init event fires, carrying that queue's concurrency limit as `:limit`.
    @init_event_queue_limit_proxy [:prom_ex, :oban, :queue, :limit, :proxy]

    # Builds every event-based metric group exposed by this plugin.
    #
    # Options read from `opts`: `:otp_app` (required), `:metric_prefix`, `:duration_unit`
    # and `:oban_supervisors`.
    @impl true
    def event_metrics(opts) do
      app = Keyword.fetch!(opts, :otp_app)
      default_prefix = PromEx.metric_prefix(app, :oban)
      prefix = Keyword.get(opts, :metric_prefix, default_prefix)
      time_unit = Keyword.get(opts, :duration_unit, :millisecond)

      # Only events that belong to one of the configured Oban instances are kept
      keep_fun =
        opts
        |> get_oban_supervisors()
        |> keep_oban_instance_metrics()

      # The proxy handler must be attached so that the per-queue limit events get emitted
      set_up_init_proxy_event(prefix)

      [
        oban_supervisor_init_event_metrics(prefix, keep_fun, time_unit),
        oban_job_event_metrics(prefix, keep_fun, time_unit),
        oban_producer_event_metrics(prefix, keep_fun, time_unit),
        oban_circuit_breaker_event_metrics(prefix, keep_fun)
      ]
    end

    # Builds the polling metric group that reports how many jobs sit in each queue/state.
    #
    # Options read from `opts`: `:otp_app` (required), `:metric_prefix`, `:poll_rate`
    # (defaults to 5_000) and `:oban_supervisors`.
    @impl true
    def polling_metrics(opts) do
      app = Keyword.fetch!(opts, :otp_app)
      prefix = Keyword.get(opts, :metric_prefix, PromEx.metric_prefix(app, :oban))
      refresh_interval = Keyword.get(opts, :poll_rate, 5_000)

      # MFA handed to the polling group; it receives the set of Oban instances to inspect
      poller_mfa = {__MODULE__, :execute_queue_metrics, [get_oban_supervisors(opts)]}

      # Queue length details
      queue_length_metric =
        last_value(
          prefix ++ [:queue, :length, :count],
          event_name: [:prom_ex, :plugin, :oban, :queue, :length, :count],
          description: "The total number jobs that are in the queue in the designated state",
          measurement: :count,
          tags: [:name, :queue, :state]
        )

      Polling.build(:oban_queue_poll_metrics, refresh_interval, poller_mfa, [queue_length_metric])
    end

    # Polling callback: for every configured Oban instance that is currently registered
    # under a pid, look up its config and emit the queue length events. Instances that
    # are not running are skipped.
    @doc false
    def execute_queue_metrics(oban_supervisors) do
      Enum.each(oban_supervisors, fn supervisor ->
        if is_pid(Oban.Registry.whereis(supervisor)) do
          handle_oban_queue_polling_metrics(supervisor, Oban.Registry.config(supervisor))
        else
          :skip
        end
      end)
    end

    # Builds the :oban_circuit_breaker_event_metrics group, which counts circuit breaker
    # trip and open events per Oban instance and per circuit breaker.
    #
    # * `metric_prefix` - list of atoms that every metric name is prefixed with
    # * `keep_function_filter` - 1-arity function receiving the event metadata and
    #   returning whether the event should be recorded
    #
    # This function is public but internal to the plugin, so it is hidden from the
    # generated documentation like the other public internals of this module.
    @doc false
    def oban_circuit_breaker_event_metrics(metric_prefix, keep_function_filter) do
      Event.build(
        :oban_circuit_breaker_event_metrics,
        [
          counter(
            metric_prefix ++ [:circuit, :trip, :total],
            event_name: @circuit_breaker_trip_event,
            description: "The number of circuit breaker events that have occurred",
            tag_values: &circuit_breaker_trip_tag_values/1,
            tags: [:name, :circuit_breaker],
            keep: keep_function_filter
          ),
          counter(
            metric_prefix ++ [:circuit, :open, :total],
            event_name: @circuit_breaker_open_event,
            description: "The number of circuit open events that have occurred.",
            # Both events are tagged through the same metadata-to-tags function
            tag_values: &circuit_breaker_trip_tag_values/1,
            tags: [:name, :circuit_breaker],
            keep: keep_function_filter
          )
        ]
      )
    end

    # Telemetry handler for the supervisor init event. It re-emits one proxy event per
    # configured queue so that each queue's concurrency limit can be tracked on its own.
    # A queue without a `:limit` option is reported with a limit of 0.
    @doc false
    def handle_proxy_init_event(_event_name, _event_measurement, event_metadata, _config) do
      conf = event_metadata.conf

      Enum.each(conf.queues, fn {queue_name, queue_settings} ->
        measurements = %{limit: Keyword.get(queue_settings, :limit, 0)}
        proxy_metadata = %{queue: queue_name, name: conf.name}

        :telemetry.execute(@init_event_queue_limit_proxy, measurements, proxy_metadata)
      end)
    end

    # Derives the tag values for circuit breaker events. The Oban config is accepted
    # under either the `:config` or the `:conf` metadata key (`:config` wins when both
    # are present).
    defp circuit_breaker_trip_tag_values(%{name: breaker, config: oban_conf}),
      do: build_circuit_breaker_tags(oban_conf, breaker)

    defp circuit_breaker_trip_tag_values(%{name: breaker, conf: oban_conf}),
      do: build_circuit_breaker_tags(oban_conf, breaker)

    # Shared tag map construction for both metadata shapes
    defp build_circuit_breaker_tags(oban_conf, breaker) do
      instance = normalize_module_name(oban_conf.name)

      %{name: instance, circuit_breaker: normalize_module_name(breaker)}
    end

    # Builds the :oban_job_event_metrics group: duration, queue time and attempt
    # distributions for the job stop event and for the job exception event.
    defp oban_job_event_metrics(metric_prefix, keep_function_filter, duration_unit) do
      unit_suffix = Utils.make_plural_atom(duration_unit)
      time_conversion = {:native, duration_unit}
      duration_histogram = [buckets: [10, 100, 500, 1_000, 5_000, 20_000]]
      attempt_histogram = [buckets: [1, 5, 10]]
      base_tags = [:name, :queue, :state, :worker]
      exception_tags = [:name, :queue, :state, :worker, :kind, :error]

      # The attempt number is read from the event metadata, not from the measurements
      attempt_from_metadata = fn _measurement, %{attempt: attempt} -> attempt end

      completed_job_metrics = [
        distribution(
          metric_prefix ++ [:job, :processing, :duration, unit_suffix],
          event_name: @job_complete_event,
          measurement: :duration,
          description: "The amount of time it takes to processes an Oban job.",
          reporter_options: duration_histogram,
          tag_values: &job_complete_tag_values/1,
          tags: base_tags,
          unit: time_conversion,
          keep: keep_function_filter
        ),
        distribution(
          metric_prefix ++ [:job, :queue, :time, unit_suffix],
          event_name: @job_complete_event,
          measurement: :queue_time,
          description: "The amount of time that the Oban job was waiting in queue for processing.",
          reporter_options: duration_histogram,
          tag_values: &job_complete_tag_values/1,
          tags: base_tags,
          unit: time_conversion,
          keep: keep_function_filter
        ),
        distribution(
          metric_prefix ++ [:job, :complete, :attempts],
          event_name: @job_complete_event,
          measurement: attempt_from_metadata,
          description: "The number of times a job was attempted prior to completing.",
          reporter_options: attempt_histogram,
          tag_values: &job_complete_tag_values/1,
          tags: base_tags,
          keep: keep_function_filter
        )
      ]

      failed_job_metrics = [
        distribution(
          metric_prefix ++ [:job, :exception, :duration, unit_suffix],
          event_name: @job_exception_event,
          measurement: :duration,
          description: "The amount of time it took to process a job the encountered an exception.",
          reporter_options: duration_histogram,
          tag_values: &job_exception_tag_values/1,
          tags: exception_tags,
          unit: time_conversion,
          keep: keep_function_filter
        ),
        distribution(
          metric_prefix ++ [:job, :exception, :queue, :time, unit_suffix],
          event_name: @job_exception_event,
          measurement: :queue_time,
          description:
            "The amount of time that the Oban job was waiting in queue for processing prior to resulting in an exception.",
          reporter_options: duration_histogram,
          tag_values: &job_exception_tag_values/1,
          tags: exception_tags,
          unit: time_conversion,
          keep: keep_function_filter
        ),
        # The attempts distribution deliberately keeps only the base tags
        distribution(
          metric_prefix ++ [:job, :exception, :attempts],
          event_name: @job_exception_event,
          measurement: attempt_from_metadata,
          description: "The number of times a job was attempted prior to throwing an exception.",
          reporter_options: attempt_histogram,
          tag_values: &job_exception_tag_values/1,
          tags: base_tags,
          keep: keep_function_filter
        )
      ]

      Event.build(:oban_job_event_metrics, completed_job_metrics ++ failed_job_metrics)
    end

    # Builds the :oban_producer_event_metrics group: dispatch duration, number of jobs
    # dispatched and duration of producer runs that ended in an exception.
    defp oban_producer_event_metrics(metric_prefix, keep_function_filter, duration_unit) do
      unit_suffix = Utils.make_plural_atom(duration_unit)
      time_conversion = {:native, duration_unit}
      duration_histogram = [buckets: [10, 100, 500, 1_000, 5_000, 10_000]]
      producer_tags = [:queue, :name]

      # The dispatched job count is carried in the event metadata
      dispatched_from_metadata = fn _measurement, %{dispatched_count: count} -> count end

      dispatch_duration =
        distribution(
          metric_prefix ++ [:producer, :duration, unit_suffix],
          event_name: @producer_complete_event,
          measurement: :duration,
          description: "How long it took to dispatch the job.",
          reporter_options: duration_histogram,
          unit: time_conversion,
          tag_values: &producer_tag_values/1,
          tags: producer_tags,
          keep: keep_function_filter
        )

      dispatched_count =
        distribution(
          metric_prefix ++ [:producer, :dispatched, :count],
          event_name: @producer_complete_event,
          measurement: dispatched_from_metadata,
          description: "The number of jobs that were dispatched.",
          reporter_options: [buckets: [5, 10, 50, 100]],
          tag_values: &producer_tag_values/1,
          tags: producer_tags,
          keep: keep_function_filter
        )

      exception_duration =
        distribution(
          metric_prefix ++ [:producer, :exception, :duration, unit_suffix],
          event_name: @producer_exception_event,
          measurement: :duration,
          description: "How long it took for the producer to raise an exception.",
          reporter_options: duration_histogram,
          unit: time_conversion,
          tag_values: &producer_tag_values/1,
          tags: producer_tags,
          keep: keep_function_filter
        )

      Event.build(
        :oban_producer_event_metrics,
        [dispatch_duration, dispatched_count, exception_duration]
      )
    end

    # Derives the tag values for a job stop event. The Oban config is read from the
    # `:config` metadata key when present, otherwise from `:conf`.
    defp job_complete_tag_values(metadata) do
      oban_conf =
        case metadata do
          %{config: value} -> value
          %{conf: value} -> value
        end

      instance = normalize_module_name(oban_conf.name)

      %{
        name: instance,
        queue: metadata.job.queue,
        state: metadata.state,
        worker: metadata.worker
      }
    end

    # Derives the tag values for a job exception event: the same tags as a completed job
    # plus the exception `:kind` and an `:error` label. The label is the struct name of
    # the error when the error is a struct, and "Undefined" for anything else.
    defp job_exception_tag_values(metadata) do
      error_label =
        case metadata.error do
          %struct_name{} -> normalize_module_name(struct_name)
          _not_a_struct -> "Undefined"
        end

      metadata
      |> job_complete_tag_values()
      |> Map.merge(%{kind: metadata.kind, error: error_label})
    end

    # Derives the tag values for producer events: the queue and the Oban instance name.
    defp producer_tag_values(metadata) do
      queue = metadata.queue
      instance = normalize_module_name(metadata.conf.name)

      %{name: instance, queue: queue}
    end

    # Builds the :oban_init_event_metrics group, which exposes the configuration of each
    # Oban supervisor at the time it was initialized.
    #
    # * `metric_prefix` - list of atoms that every metric name is prefixed with
    # * `keep_function_filter` - 1-arity function receiving the event metadata and
    #   returning whether the event should be recorded
    # * `duration_unit` - time unit used for the grace period and cooldown metrics, both
    #   in their names and in their reported values
    defp oban_supervisor_init_event_metrics(metric_prefix, keep_function_filter, duration_unit) do
      duration_unit_plural = Utils.make_plural_atom(duration_unit)

      # Oban expresses `shutdown_grace_period` and `dispatch_cooldown` in milliseconds.
      # The metric names carry the configured duration unit, so the values have to be
      # converted as well; otherwise a `..._seconds` metric would report milliseconds.
      # With the default `:millisecond` unit this conversion is a no-op.
      config_time_conversion = {:millisecond, duration_unit}

      Event.build(
        :oban_init_event_metrics,
        [
          last_value(
            metric_prefix ++ [:init, :status, :info],
            event_name: @init_event,
            description: "Information regarding the initialized oban supervisor.",
            # Info-style metric: the value is always 1, the payload lives in the tags
            measurement: fn _measurements -> 1 end,
            tags: [:name, :node, :plugins, :prefix, :queues, :repo],
            tag_values: &oban_init_tag_values/1,
            keep: keep_function_filter
          ),
          last_value(
            metric_prefix ++ [:init, :shutdown, :grace, :period, duration_unit_plural],
            event_name: @init_event,
            description: "The Oban supervisor's shutdown grace period value.",
            measurement: fn _measurements, %{conf: config} ->
              config.shutdown_grace_period
            end,
            unit: config_time_conversion,
            tags: [:name],
            tag_values: &oban_init_tag_values/1,
            keep: keep_function_filter
          ),
          last_value(
            metric_prefix ++ [:init, :dispatch, :cooldown, duration_unit_plural],
            event_name: @init_event,
            description: "The Oban supervisor's dispatch cooldown value.",
            measurement: fn _measurements, %{conf: config} ->
              config.dispatch_cooldown
            end,
            unit: config_time_conversion,
            tags: [:name],
            tag_values: &oban_init_tag_values/1,
            keep: keep_function_filter
          ),
          # Fed by the proxy event that `handle_proxy_init_event/4` emits once per queue
          last_value(
            metric_prefix ++ [:init, :queue, :concurrency, :limit],
            event_name: @init_event_queue_limit_proxy,
            description: "The concurrency limits of each of the Oban queue.",
            measurement: :limit,
            tags: [:name, :queue],
            tag_values: &oban_init_queues_tag_values/1,
            keep: keep_function_filter
          )
        ]
      )
    end

    # Counts the jobs of one Oban instance grouped by queue and state, and emits one
    # queue length event per {queue, state} pair returned by the query.
    defp handle_oban_queue_polling_metrics(oban_supervisor, config) do
      instance = normalize_module_name(oban_supervisor)

      counts_query =
        Oban.Job
        |> group_by([job], [job.queue, job.state])
        |> select([job], {job.queue, job.state, count(job.id)})

      rows = Oban.Repo.all(config, counts_query)

      Enum.each(rows, fn {queue, state, total} ->
        :telemetry.execute(
          [:prom_ex, :plugin, :oban, :queue, :length, :count],
          %{count: total},
          %{name: instance, queue: queue, state: state}
        )
      end)
    end

    # Reads the `:oban_supervisors` option (defaulting to `[Oban]`) and returns it as a
    # MapSet. Raises when the option is not a list.
    defp get_oban_supervisors(opts) do
      supervisors = Keyword.get(opts, :oban_supervisors, [Oban])

      if is_list(supervisors) do
        MapSet.new(supervisors)
      else
        raise "Invalid :oban_supervisors option value."
      end
    end

    # Returns a 1-arity predicate over event metadata. It answers whether the Oban
    # instance name found in the metadata (under `conf.name` first, then a top-level
    # `:name`) is one of the tracked supervisors. Metadata with neither is rejected.
    defp keep_oban_instance_metrics(oban_supervisors) do
      fn metadata ->
        case metadata do
          %{conf: %{name: instance}} -> MapSet.member?(oban_supervisors, instance)
          %{name: instance} -> MapSet.member?(oban_supervisors, instance)
          _no_instance_name -> false
        end
      end
    end

    # Derives the tag values for the supervisor init event from the Oban config found
    # under the `:conf` metadata key. Plugins and queues are flattened into
    # comma-separated strings.
    defp oban_init_tag_values(%{conf: config}) do
      plugin_names = Enum.map_join(config.plugins, ", ", &normalize_module_name/1)

      queue_names =
        Enum.map_join(config.queues, ", ", fn {queue, _queue_opts} -> Atom.to_string(queue) end)

      %{
        name: normalize_module_name(config.name),
        node: config.node,
        plugins: plugin_names,
        prefix: config.prefix,
        queues: queue_names,
        repo: normalize_module_name(config.repo)
      }
    end

    # Derives the tag values for the per-queue limit proxy event.
    defp oban_init_queues_tag_values(%{name: oban_name, queue: queue_name}),
      do: %{name: normalize_module_name(oban_name), queue: queue_name}

    # Attaches `handle_proxy_init_event/4` to the supervisor init event. The handler id
    # includes the metric prefix.
    defp set_up_init_proxy_event(prefix) do
      handler_id = [:prom_ex, :oban, :proxy] ++ prefix

      :telemetry.attach(handler_id, @init_event, &__MODULE__.handle_proxy_init_event/4, %{})
    end

    # Turns a module reference into a string suitable for a tag value:
    #   * a `{module, options}` tuple is normalized by its first element
    #   * an atom is stringified with any leading "Elixir." removed
    #   * any other term is returned unchanged
    defp normalize_module_name(name) do
      case name do
        {module, _options} ->
          normalize_module_name(module)

        atom when is_atom(atom) ->
          String.trim_leading(Atom.to_string(atom), "Elixir.")

        other ->
          other
      end
    end
  end
else
  defmodule PromEx.Plugins.Oban do
    @moduledoc false

    # Fallback definition compiled when the Oban module could not be loaded. Using the
    # plugin in that situation is reported through `PromEx.Plugin.no_dep_raise/2`.
    use PromEx.Plugin

    @impl true
    def event_metrics(_plugin_opts), do: PromEx.Plugin.no_dep_raise(__MODULE__, "Oban")
  end
end