# lib/datadog/data_streams/integrations/kafka.ex

defmodule Datadog.DataStreams.Integrations.Kafka do
  @moduledoc """
  Functions for integrating Kafka tracing with DataStreams.

  ## Usage

  Because Elixir does not include a `context` grab bag to pass
  around, we use the OpenTelemetry context to store the current
  DataStreams pathway. If you are not using OpenTelemetry, or
  have a special fan-in or fan-out situation, you can use
  `trace_produce_with_pathway/2` and `trace_consume_with_pathway/3`,
  which take the pathway as an explicit argument and never touch
  the stored context.

  If you have a basic one-message-in, one-message-out situation,
  _and_ you have OpenTelemetry already covering your application,
  you can use the `trace_produce/1` and `trace_consume/2` functions.
  Both read the pathway from `Datadog.DataStreams.Context` and store
  the updated pathway back into it, so a consume followed by a
  produce continues the same pathway.

      require OpenTelemetry.Tracer, as: Tracer

      alias Datadog.DataStreams.Integrations.Kafka, as: DataStreamsKafka

      @doc "Handles a message from Kafka. Receives a message map with partition, topic, and headers."
      @spec handle_message(map()) :: :ok
      def handle_message(message) do
        # NOTE: This does not add the recommended Kafka span attributes.
        Tracer.with_span "\#{message.topic} process" do
          DataStreamsKafka.trace_consume(message, "my_consumer_group")

          # Do work

          Tracer.with_span "\#{new_message.topic} produce" do
            new_message
            |> DataStreamsKafka.trace_produce()
            |> send_to_kafka()
          end
        end
      end

  """

  require OpenTelemetry.Tracer, as: Tracer

  alias Datadog.DataStreams.{Context, Pathway, Propagator, Tags}

  @otel_attribute "pathway.hash"

  @typedoc """
  A general map that contains the `:topic`, `:partition`, and `:headers`
  atom keys. This matches the format of `Elsa.elsa_message` by default
  (and will work out of the box), though you will need to add the topic
  and partition yourself if you are using standard `:brod` (or `kpro`).

  Only `:headers` is strictly required by this module; `:topic` and
  `:partition` are copied into the edge tags when present.
  """
  @type message :: map()

  @doc """
  Traces a Kafka message being produced. Uses the pathway in the
  current `Datadog.DataStreams.Context` and stores the resulting
  pathway back into that context. Returns a new message with
  the pathway encoded in the header values.

  Raises `KeyError` if `message` has no `:headers` key.
  """
  @spec trace_produce(msg) :: msg when msg: message()
  def trace_produce(message) do
    {new_message, new_pathway} = trace_produce_with_pathway(Context.get(), message)

    # Persist the advanced pathway. Without this, later checkpoints made through
    # the context-based API would keep starting from the stale pathway.
    Context.set(new_pathway)

    new_message
  end

  @doc """
  Traces a Kafka message being produced. Returns the new message with the
  pathway encoded in the header values, as well as the new pathway.

  `pathway` is the pathway to continue from; per the spec it may be `nil`, in
  which case it is handed to `Pathway.set_checkpoint/2` as-is. This function
  does not read or write `Datadog.DataStreams.Context`.

  The work runs inside a `"dsm.trace_kafka_produce"` span, which is tagged with
  the new pathway hash under the `"pathway.hash"` attribute.

  Raises `KeyError` if `message` has no `:headers` key.
  """
  @spec trace_produce_with_pathway(Pathway.t() | nil, msg) :: {msg, Pathway.t()}
        when msg: message()
  def trace_produce_with_pathway(pathway, message) do
    Tracer.with_span "dsm.trace_kafka_produce" do
      new_pathway = Pathway.set_checkpoint(pathway, produce_edge_tags(message))

      Tracer.set_attribute(@otel_attribute, to_string(new_pathway.hash))

      # The update syntax (rather than Map.put/3) keeps struct messages intact
      # and fails loudly when the :headers key is missing.
      {%{message | headers: Propagator.encode_header(message.headers, new_pathway)}, new_pathway}
    end
  end

  @doc """
  Traces a Kafka message being consumed. Requires the current Kafka
  consumer group. Uses the pathway in the current
  `Datadog.DataStreams.Context` and stores the resulting pathway back
  into that context. Returns the new pathway.

  Raises `KeyError` if `message` has no `:headers` key.
  """
  @spec trace_consume(message(), String.t()) :: Pathway.t()
  def trace_consume(message, consumer_group) do
    new_pathway = trace_consume_with_pathway(Context.get(), message, consumer_group)

    # Persist the checkpointed pathway so a following trace_produce/1 in the
    # same context continues this pathway instead of starting a fresh one.
    Context.set(new_pathway)

    new_pathway
  end

  @doc """
  Traces a Kafka message being consumed. Requires the current Kafka
  consumer group.

  When the message headers carry an encoded pathway, that decoded pathway is
  the one that gets checkpointed and the `pathway` argument is ignored. The
  `pathway` argument (which may be `nil` per the spec) is only used when no
  pathway can be decoded from the headers. This function does not read or
  write `Datadog.DataStreams.Context`.

  The work runs inside a `"dsm.trace_kafka_consume"` span, which is tagged with
  the new pathway hash under the `"pathway.hash"` attribute.

  Do not pass the resulting pathway from this function to another call
  of `trace_consume_with_pathway`, as it will modify the pathway incorrectly.

  Raises `KeyError` if `message` has no `:headers` key.
  """
  @spec trace_consume_with_pathway(Pathway.t() | nil, message(), String.t()) :: Pathway.t()
  def trace_consume_with_pathway(pathway, message, consumer_group) do
    Tracer.with_span "dsm.trace_kafka_consume" do
      edge_tags = consume_edge_tags(message, consumer_group)

      # A pathway propagated through the message headers wins over the
      # caller-supplied one; the latter is only a fallback.
      parent_pathway =
        case Propagator.decode_header(message.headers) do
          nil -> pathway
          decoded_pathway -> decoded_pathway
        end

      new_pathway = Pathway.set_checkpoint(parent_pathway, edge_tags)

      Tracer.set_attribute(@otel_attribute, to_string(new_pathway.hash))
      new_pathway
    end
  end

  # Edge tags for an outgoing (produced) message.
  @spec produce_edge_tags(message()) :: Tags.input()
  defp produce_edge_tags(message) do
    kafka_edge_tags(message, %{direction: "out"})
  end

  # Edge tags for an incoming (consumed) message, including the consumer group.
  @spec consume_edge_tags(message(), String.t()) :: Tags.input()
  defp consume_edge_tags(message, consumer_group) do
    kafka_edge_tags(message, %{direction: "in", group: consumer_group})
  end

  # Builds the tags shared by both directions: the message's :topic and
  # :partition (when present) plus `type: "kafka"`, then merges in the
  # direction-specific tags.
  @spec kafka_edge_tags(message(), map()) :: Tags.input()
  defp kafka_edge_tags(message, direction_tags) do
    message
    |> Map.take([:topic, :partition])
    |> Map.put(:type, "kafka")
    |> Map.merge(direction_tags)
  end
end