lib/commanded/event_store.ex

defmodule OpentelemetryCommanded.EventStore do
  @moduledoc false

  require OpenTelemetry.Tracer

  import OpentelemetryCommanded.Util

  alias OpenTelemetry.Span

  @tracer_id __MODULE__

  def setup do
    ~w(
      ack_event
      adapter
      append_to_stream
      delete_snapshot
      delete_subscription
      read_snapshot
      record_snapshot
      stream_forward
      stream_forward
      stream_forward
      subscribe
      subscribe_to
      subscribe_to
      unsubscribe
    )a
    |> Enum.each(fn event ->
      :telemetry.attach(
        {__MODULE__, :start},
        [:commanded, :event_store, event, :start],
        &__MODULE__.handle_start/4,
        []
      )

      :telemetry.attach(
        {__MODULE__, :stop},
        [:commanded, :event_store, event, :stop],
        &__MODULE__.handle_stop/4,
        []
      )

      :telemetry.attach(
        {__MODULE__, :exception},
        [:commanded, :event_store, event, :exception],
        &__MODULE__.handle_exception/4,
        []
      )
    end)
  end

  def handle_start([_, _, action, _type], _measurements, meta, _) do
    event = meta.event

    safe_context_propagation(event.metadata["trace_ctx"])

    attributes = [
      "commanded.application": meta.application,
      "commanded.causation_id": event.causation_id,
      "commanded.correlation_id": event.correlation_id,
      "commanded.event": event.event_type,
      "commanded.event_id": event.event_id,
      "commanded.event_number": event.event_number,
      "commanded.stream_id": event.stream_id,
      "commanded.stream_version": event.stream_version
    ]

    OpentelemetryTelemetry.start_telemetry_span(
      @tracer_id,
      "commanded.event_store.#{action}",
      meta,
      %{
        kind: :internal,
        attributes: attributes
      }
    )
  end

  def handle_stop(_event, _measurements, meta, _) do
    ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)

    if error = meta[:error] do
      Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error)))
    end

    OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
  end

  def handle_exception(
        _event,
        _measurements,
        %{kind: kind, reason: reason, stacktrace: stacktrace} = meta,
        _config
      ) do
    ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta)

    # try to normalize all errors to Elixir exceptions
    exception = Exception.normalize(kind, reason, stacktrace)

    # record exception and mark the span as errored
    Span.record_exception(ctx, exception, stacktrace)
    Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason)))

    OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta)
  end
end