defmodule OpentelemetryOban do
  @moduledoc """
  OpentelemetryOban uses [telemetry](https://hexdocs.pm/telemetry/) handlers to
  create `OpenTelemetry` spans from Oban events.

  Supported events include job start/stop and also when an exception is raised.

  The `insert/2` and `insert!/2` wrappers in this module additionally wrap the
  insert in a producer span and merge the output of the text-map propagator into
  the job's `meta` before it is handed to Oban.

  ## Usage

  In your application start:

      def start(_type, _args) do
        OpenTelemetry.register_application_tracer(:my_project)
        OpentelemetryOban.setup()

        # ...
      end
  """

  alias Ecto.Changeset
  alias OpenTelemetry.Span

  require OpenTelemetry.Tracer

  @tracer_id :opentelemetry_oban

  @doc """
  Initializes and configures telemetry handlers.

  Registers a tracer for `:opentelemetry_oban` using that application's `:vsn`
  key and attaches `OpentelemetryOban.JobHandler`.

  Raises `MatchError` if the `:vsn` lookup does not return `{:ok, vsn}`.
  """
  @spec setup() :: :ok
  def setup() do
    {:ok, otel_tracer_vsn} = :application.get_key(@tracer_id, :vsn)
    OpenTelemetry.register_tracer(@tracer_id, otel_tracer_vsn)

    OpentelemetryOban.JobHandler.attach()

    :ok
  end

  @doc """
  Inserts a job through `Oban.insert/2` inside a producer span.

  The span is named `"<worker> send"` and carries the `messaging.*` attributes
  derived from the changeset. Before the insert, the propagator output is merged
  into the changeset's `:meta`. When Oban returns `{:ok, job}`, the job's id,
  priority and max attempts are added to the span as attributes.

  `name` is the Oban instance name and defaults to `Oban`. The return value of
  `Oban.insert/2` is passed through unchanged.
  """
  def insert(name \\ Oban, %Changeset{} = changeset) do
    attributes = attributes_before_insert(changeset)

    OpenTelemetry.Tracer.with_span producer_span_name(changeset),
      attributes: attributes,
      kind: :producer do
      changeset = add_tracing_information_to_meta(changeset)

      # Anything other than `{:ok, job}` falls through `with` and is returned as-is.
      with {:ok, job} <- Oban.insert(name, changeset) do
        OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
        {:ok, job}
      end
    end
  end

  @doc """
  Delegates to `Oban.insert/4` for use inside a multi.

  This variant is a plain pass-through: it does not start a span and does not
  touch the job's `:meta`.
  """
  def insert(name \\ Oban, multi, multi_name, changeset_or_fun) do
    Oban.insert(name, multi, multi_name, changeset_or_fun)
  end

  @doc """
  Inserts a job through `Oban.insert!/2` inside a producer span.

  Behaves like `insert/2`, but returns the job directly. If `Oban.insert!/2`
  raises, the exception is recorded on the current span, the span status is set
  to `:error`, and the exception is re-raised with its original stacktrace.
  """
  def insert!(name \\ Oban, %Changeset{} = changeset) do
    attributes = attributes_before_insert(changeset)

    OpenTelemetry.Tracer.with_span producer_span_name(changeset),
      attributes: attributes,
      kind: :producer do
      changeset = add_tracing_information_to_meta(changeset)

      try do
        job = Oban.insert!(name, changeset)
        OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
        job
      rescue
        exception ->
          # Annotate the span, then let the caller see the original failure.
          ctx = OpenTelemetry.Tracer.current_span_ctx()
          Span.record_exception(ctx, exception, __STACKTRACE__)
          Span.set_status(ctx, OpenTelemetry.status(:error, ""))
          reraise exception, __STACKTRACE__
      end
    end
  end

  @doc """
  Delegates to `Oban.insert_all/2`.

  This is a plain pass-through: it does not start a span and does not touch the
  jobs' `:meta`. `name` defaults to `Oban`.
  """
  def insert_all(name \\ Oban, changesets_or_wrapper) do
    Oban.insert_all(name, changesets_or_wrapper)
  end

  @doc """
  Delegates to `Oban.insert_all/4` for use inside a multi.

  This is a plain pass-through: it does not start a span and does not touch the
  jobs' `:meta`. `name` defaults to `Oban`, like every other wrapper in this
  module.
  """
  # The default must be `Oban`, not `__MODULE__`: inside this module `__MODULE__`
  # is `OpentelemetryOban`, which is not the name of an Oban instance.
  def insert_all(name \\ Oban, multi, multi_name, changesets_or_wrapper) do
    Oban.insert_all(name, multi, multi_name, changesets_or_wrapper)
  end

  # Builds the producer span name, "<worker> send", falling back to "unknown"
  # when the changeset has no worker.
  defp producer_span_name(changeset) do
    worker = Changeset.get_field(changeset, :worker, "unknown")
    "#{worker} send"
  end

  # Merges the key/value pairs produced by the text-map propagator into the
  # changeset's existing `:meta` and stores the result back on the changeset.
  defp add_tracing_information_to_meta(changeset) do
    meta = Changeset.get_field(changeset, :meta, %{})

    new_meta =
      []
      |> :otel_propagator.text_map_inject()
      |> Enum.into(meta)

    Changeset.change(changeset, %{meta: new_meta})
  end

  # Span attributes that can be read from the changeset before the job exists.
  defp attributes_before_insert(changeset) do
    queue = Changeset.get_field(changeset, :queue, "unknown")
    worker = Changeset.get_field(changeset, :worker, "unknown")

    [
      "messaging.system": "oban",
      "messaging.destination": queue,
      "messaging.destination_kind": "queue",
      "messaging.oban.worker": worker
    ]
  end

  # Span attributes that are only available on the job returned by Oban.
  defp attributes_after_insert(job) do
    [
      "messaging.oban.job_id": job.id,
      "messaging.oban.priority": job.priority,
      "messaging.oban.max_attempts": job.max_attempts
    ]
  end
end