defmodule OpentelemetryOban do
@moduledoc """
OpentelemetryOban uses [telemetry](https://hexdocs.pm/telemetry/) handlers to
create `OpenTelemetry` spans from Oban events.
Supported events include job start/stop and also when an exception is raised.
## Usage
In your application start:
def start(_type, _args) do
OpentelemetryOban.setup()
# ...
end
"""
alias Ecto.Changeset
alias OpenTelemetry.Span
require OpenTelemetry.Tracer
@tracer_id :opentelemetry_oban
@doc """
Initializes and configures telemetry handlers.
## Sampling
By default only jobs are sampled. If you wish to sample plugins as well then use:
OpentelemetryOban.setup(trace: [:jobs, :plugins])
It is also possible to provide your own sampler:
OpentelemetryOban.setup(
trace: [
jobs: :otel_sampler.new(:always_on),
plugins: :otel_sampler.new(:always_on)
]
)
"""
@spec setup() :: :ok
def setup(opts \\ []) do
{:ok, otel_tracer_vsn} = :application.get_key(@tracer_id, :vsn)
OpenTelemetry.register_tracer(@tracer_id, otel_tracer_vsn)
always_on = :otel_sampler.new(:always_on)
always_off = :otel_sampler.new(:always_off)
OpentelemetryOban.JobHandler.attach(handler_opts(:jobs, opts, always_on))
OpentelemetryOban.PluginHandler.attach(handler_opts(:plugins, opts, always_off))
:ok
end
def insert(name \\ Oban, %Changeset{} = changeset) do
attributes = attributes_before_insert(changeset)
worker = Changeset.get_field(changeset, :worker, "unknown")
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
changeset = add_tracing_information_to_meta(changeset)
case Oban.insert(name, changeset) do
{:ok, job} ->
OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
{:ok, job}
other ->
other
end
end
end
def insert(name \\ Oban, multi, multi_name, changeset_or_fun) do
Oban.insert(name, multi, multi_name, changeset_or_fun)
end
def insert!(name \\ Oban, %Changeset{} = changeset) do
attributes = attributes_before_insert(changeset)
worker = Changeset.get_field(changeset, :worker, "unknown")
OpenTelemetry.Tracer.with_span "#{worker} send", attributes: attributes, kind: :producer do
changeset = add_tracing_information_to_meta(changeset)
try do
job = Oban.insert!(name, changeset)
OpenTelemetry.Tracer.set_attributes(attributes_after_insert(job))
job
rescue
exception ->
ctx = OpenTelemetry.Tracer.current_span_ctx()
Span.record_exception(ctx, exception, __STACKTRACE__)
Span.set_status(ctx, OpenTelemetry.status(:error, ""))
reraise exception, __STACKTRACE__
end
end
end
def insert_all(name \\ Oban, changesets_or_wrapper)
def insert_all(name, %{changesets: changesets}) when is_list(changesets) do
insert_all(name, changesets)
end
def insert_all(name, changesets) when is_list(changesets) do
# changesets in insert_all can include different workers and different
# queues. This means we cannot provide much information here, but we can
# still record the insert and propagate the context information.
OpenTelemetry.Tracer.with_span "Oban bulk insert", kind: :producer do
changesets = Enum.map(changesets, &add_tracing_information_to_meta/1)
Oban.insert_all(name, changesets)
end
end
def insert_all(name \\ __MODULE__, multi, multi_name, changesets_or_wrapper) do
Oban.insert_all(name, multi, multi_name, changesets_or_wrapper)
end
defp add_tracing_information_to_meta(changeset) do
meta = Changeset.get_field(changeset, :meta, %{})
new_meta =
[]
|> :otel_propagator_text_map.inject()
|> Enum.into(meta)
Changeset.change(changeset, %{meta: new_meta})
end
defp attributes_before_insert(changeset) do
queue = Changeset.get_field(changeset, :queue, "unknown")
worker = Changeset.get_field(changeset, :worker, "unknown")
[
"messaging.system": "oban",
"messaging.destination": queue,
"messaging.destination_kind": "queue",
"messaging.oban.worker": worker
]
end
defp attributes_after_insert(job) do
[
"messaging.oban.job_id": job.id,
"messaging.oban.priority": job.priority,
"messaging.oban.max_attempts": job.max_attempts
]
end
defp handler_opts(name, opts, default_sampler) do
trace = Keyword.get(opts, :trace, [])
sampler =
if Enum.member?(trace, name) do
# If just the handler name is specified then use always on sampler
:otel_sampler.new(:always_on)
else
# Use provided sampler or default to the default sampler
Keyword.get(trace, name, default_sampler)
end
[sampler: sampler]
end
end