lib/opentelemetry_oban/job_handler.ex

defmodule OpentelemetryOban.JobHandler do
  alias OpenTelemetry.Span

  @tracer_id :opentelemetry_oban

  def attach(opts \\ []) do
    attach_job_start_handler(opts)
    attach_job_stop_handler(opts)
    attach_job_exception_handler(opts)
  end

  defp attach_job_start_handler(opts) do
    :telemetry.attach(
      "#{__MODULE__}.job_start",
      [:oban, :job, :start],
      &__MODULE__.handle_job_start/4,
      opts
    )
  end

  defp attach_job_stop_handler(opts) do
    :telemetry.attach(
      "#{__MODULE__}.job_stop",
      [:oban, :job, :stop],
      &__MODULE__.handle_job_stop/4,
      opts
    )
  end

  defp attach_job_exception_handler(opts) do
    :telemetry.attach(
      "#{__MODULE__}.job_exception",
      [:oban, :job, :exception],
      &__MODULE__.handle_job_exception/4,
      opts
    )
  end

  def handle_job_start(_event, _measurements, metadata, sampler: sampler) do
    %{
      job: %{
        id: id,
        queue: queue,
        worker: worker,
        priority: priority,
        inserted_at: inserted_at,
        scheduled_at: scheduled_at,
        attempt: attempt,
        max_attempts: max_attempts,
        meta: job_meta
      }
    } = metadata

    :otel_propagator_text_map.extract(Map.to_list(job_meta))
    parent = OpenTelemetry.Tracer.current_span_ctx()
    links = if parent == :undefined, do: [], else: [OpenTelemetry.link(parent)]
    OpenTelemetry.Tracer.set_current_span(:undefined)

    attributes = [
      "messaging.system": "oban",
      "messaging.destination": queue,
      "messaging.destination_kind": "queue",
      "messaging.operation": "process",
      "messaging.oban.job_id": id,
      "messaging.oban.worker": worker,
      "messaging.oban.priority": priority,
      "messaging.oban.attempt": attempt,
      "messaging.oban.max_attempts": max_attempts,
      "messaging.oban.inserted_at": DateTime.to_iso8601(inserted_at),
      "messaging.oban.scheduled_at": DateTime.to_iso8601(scheduled_at)
    ]

    span_name = "#{worker} process"

    OpentelemetryTelemetry.start_telemetry_span(@tracer_id, span_name, metadata, %{
      kind: :consumer,
      links: links,
      sampler: sampler
    })
    |> Span.set_attributes(attributes)
  end

  def handle_job_stop(_event, _measurements, metadata, _config) do
    OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
  end

  def handle_job_exception(
        _event,
        _measurements,
        %{stacktrace: stacktrace, error: error} = metadata,
        _config
      ) do
    ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)

    # Record exception and mark the span as errored
    Span.record_exception(ctx, error, stacktrace)
    Span.set_status(ctx, OpenTelemetry.status(:error, ""))

    OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
  end
end