lib/spandex_datadog/api_server.ex

defmodule SpandexDatadog.ApiServer do
  @moduledoc """
  Implements a worker for sending spans to Datadog as a GenServer, in order to send traces
  asynchronously.

  Traces are buffered in the server state and flushed once `:batch_size` traces have been
  collected. A flush runs in a separate process: while fewer than `:sync_threshold` flushes are
  in flight it is fire-and-forget, otherwise the server waits for the flush to finish before
  replying, which applies backpressure to callers.
  """

  use GenServer
  require Logger

  alias Spandex.{
    Span,
    Trace
  }

  defmodule State do
    @moduledoc false

    @type t :: %State{}

    defstruct [
      :asynchronous_send?,
      :http,
      :url,
      :host,
      :port,
      :verbose?,
      :waiting_traces,
      :batch_size,
      :sync_threshold,
      :agent_pid,
      :container_id
    ]
  end

  # Same as HTTPoison.headers
  @type headers :: [{atom, binary}] | [{binary, binary}] | %{binary => binary} | any

  @headers [{"Content-Type", "application/msgpack"}]

  @default_opts [
    host: "localhost",
    http: HTTPoison,
    port: 8126,
    verbose?: false,
    batch_size: 10,
    sync_threshold: 20,
    api_adapter: SpandexDatadog.ApiServer
  ]

  @doc """
  Starts the ApiServer with given options.

  ## Options

  * `:http` - The HTTP module to use for sending spans to the agent. Defaults to `HTTPoison`.
  * `:host` - The host the agent can be reached at. Defaults to `"localhost"`.
  * `:port` - The port to use when sending traces to the agent. Defaults to `8126`.
  * `:verbose?` - Only to be used for debugging: All finished traces will be logged. Defaults to `false`
  * `:batch_size` - The number of traces that should be sent in a single batch. Defaults to `10`.
  * `:sync_threshold` - The maximum number of processes that may be sending traces at any one time. This adds backpressure. Defaults to `20`.
  """
  @spec start_link(opts :: Keyword.t()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    opts = Keyword.merge(@default_opts, opts)

    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl GenServer
  @spec init(opts :: Keyword.t()) :: {:ok, State.t()}
  def init(opts) do
    # Counts the fire-and-forget flush tasks currently in flight (see below_sync_threshold?/1).
    {:ok, agent_pid} = Agent.start_link(fn -> 0 end)

    state = %State{
      asynchronous_send?: true,
      host: opts[:host],
      port: opts[:port],
      verbose?: opts[:verbose?],
      http: opts[:http],
      waiting_traces: [],
      batch_size: opts[:batch_size],
      sync_threshold: opts[:sync_threshold],
      agent_pid: agent_pid,
      container_id: get_container_id()
    }

    {:ok, state}
  end

  @cgroup_uuid "[0-9a-f]{8}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{4}[-_][0-9a-f]{12}"
  @cgroup_ctnr "[0-9a-f]{64}"
  @cgroup_task "[0-9a-f]{32}-\\d+"
  # Only the pattern *source* is built at compile time. It is compiled in get_container_id/0,
  # which runs once per init/1, instead of embedding a precompiled regex in the .beam file
  # (precompiled regexes are tied to the OTP/PCRE build they were compiled with).
  @cgroup_pattern ".*(#{@cgroup_uuid}|#{@cgroup_ctnr}|#{@cgroup_task})(?:\\.scope)?$"

  # Extracts the container id from /proc/self/cgroup. Returns nil when the file cannot be read
  # or no line matches one of the container id formats above.
  @spec get_container_id() :: String.t() | nil
  defp get_container_id do
    cgroup_regex = Regex.compile!(@cgroup_pattern, "m")

    with {:ok, file_binary} <- File.read("/proc/self/cgroup"),
         [_, container_id] <- Regex.run(cgroup_regex, file_binary) do
      container_id
    else
      _ -> nil
    end
  end

  @doc """
  Send spans asynchronously to DataDog.

  The trace is handed to the server with `GenServer.call/3` and buffered until a batch is full.
  The call is wrapped in a `[:spandex_datadog, :send_trace]` `:telemetry` span.

  ## Options

  * `:timeout` - Timeout in milliseconds for the call to the server. Defaults to `30_000`.
  """
  @spec send_trace(Trace.t(), Keyword.t()) :: :ok
  def send_trace(%Trace{} = trace, opts \\ []) do
    :telemetry.span([:spandex_datadog, :send_trace], %{trace: trace}, fn ->
      timeout = Keyword.get(opts, :timeout, 30_000)
      result = GenServer.call(__MODULE__, {:send_trace, trace}, timeout)
      {result, %{trace: trace}}
    end)
  end

  @deprecated "Please use send_trace/2 instead"
  @doc false
  @spec send_spans([Span.t()], Keyword.t()) :: :ok
  def send_spans(spans, opts \\ []) when is_list(spans) do
    timeout = Keyword.get(opts, :timeout, 30_000)
    trace = %Trace{spans: spans}
    GenServer.call(__MODULE__, {:send_trace, trace}, timeout)
  end

  @impl GenServer
  def handle_call({:send_trace, trace}, _from, state) do
    state =
      state
      |> enqueue_trace(trace)
      |> maybe_flush_traces()

    {:reply, :ok, state}
  end

  @doc false
  # Encodes `traces` as msgpack and PUTs them to the agent via the configured `http` module.
  # The agent's response is only logged (when `verbose?`); this always returns `:ok`.
  @spec send_and_log([Trace.t()], State.t()) :: :ok
  def send_and_log(traces, %{container_id: container_id, verbose?: verbose?} = state) do
    # Header values are sent as binaries, matching the `headers` type above.
    count_header = [{"X-Datadog-Trace-Count", Integer.to_string(length(traces))}]
    container_header = if container_id, do: [{"Datadog-Container-ID", container_id}], else: []
    headers = @headers ++ count_header ++ container_header

    response =
      traces
      |> Enum.map(&format_trace/1)
      |> encode()
      |> push(headers, state)

    if verbose? do
      Logger.debug(fn -> "Trace response: #{inspect(response)}" end)
    end

    :ok
  end

  @deprecated "Please use format/3 instead"
  @doc false
  @spec format(Trace.t() | Span.t()) :: [map()] | map()
  def format(%Trace{} = trace), do: format_trace(trace)
  def format(%Span{} = span), do: format(span, 1, [])

  @doc """
  Converts a span into the map that is msgpack-encoded and sent to the agent.

  `priority` is reported as the `_sampling_priority_v1` metric. `baggage` is currently unused.
  Spans without a `completion_time` are measured up to `SpandexDatadog.Adapter.now/0`, and
  `resource` falls back to the span name.
  """
  @spec format(Span.t(), integer(), Keyword.t()) :: map()
  def format(%Span{} = span, priority, _baggage) do
    %{
      trace_id: span.trace_id,
      span_id: span.id,
      name: span.name,
      start: span.start,
      duration: (span.completion_time || SpandexDatadog.Adapter.now()) - span.start,
      parent_id: span.parent_id,
      error: error(span.error),
      resource: span.resource || span.name,
      service: span.service,
      type: span.type,
      meta: meta(span),
      metrics:
        metrics(span, %{
          _sampling_priority_v1: priority,
          "_dd.rule_psr": 1.0,
          "_dd.limit_psr": 1.0
        })
    }
  end

  # Private Helpers

  # Formats every span of a trace with the trace's priority and baggage.
  # Used internally so the send path does not depend on the deprecated format/1.
  @spec format_trace(Trace.t()) :: [map()]
  defp format_trace(%Trace{spans: spans, priority: priority, baggage: baggage}) do
    Enum.map(spans, &format(&1, priority, baggage))
  end

  # Prepends the trace to the buffer (O(1)); batch order is therefore newest-first.
  defp enqueue_trace(state, trace) do
    if state.verbose? do
      Logger.info(fn -> "Adding trace to stack with #{Enum.count(trace.spans)} spans" end)
    end

    %State{state | waiting_traces: [trace | state.waiting_traces]}
  end

  # Keeps buffering until `batch_size` traces are waiting.
  defp maybe_flush_traces(%{waiting_traces: traces, batch_size: size} = state)
       when length(traces) < size do
    state
  end

  # Sends the whole buffer and empties it.
  defp maybe_flush_traces(state) do
    %{
      asynchronous_send?: async?,
      verbose?: verbose?,
      waiting_traces: traces
    } = state

    if verbose? do
      span_count = Enum.reduce(traces, 0, fn trace, acc -> acc + length(trace.spans) end)
      Logger.info(fn -> "Sending #{length(traces)} traces, #{span_count} spans." end)
      Logger.debug(fn -> "Trace: #{inspect(traces)}" end)
    end

    if async? do
      if below_sync_threshold?(state) do
        # Fire-and-forget. The in-flight counter is released even if the send raises.
        Task.start(fn ->
          try do
            send_and_log(traces, state)
          after
            Agent.update(state.agent_pid, fn count -> count - 1 end)
          end
        end)
      else
        # We get benefits from running in a separate process (like better GC)
        # So we async/await here to mimic the behaviour above but still apply backpressure
        # NOTE(review): Task.await/1 uses its default 5_000 ms timeout, so a push slower than
        # that exits this server.
        task = Task.async(fn -> send_and_log(traces, state) end)
        Task.await(task)
      end
    else
      send_and_log(traces, state)
    end

    %State{state | waiting_traces: []}
  end

  # Atomically reserves an in-flight slot. Returns true and increments the counter while
  # fewer than `sync_threshold` sends are running; otherwise returns false and leaves it as is.
  defp below_sync_threshold?(state) do
    Agent.get_and_update(state.agent_pid, fn count ->
      if count < state.sync_threshold do
        {true, count + 1}
      else
        {false, count}
      end
    end)
  end

  # Builds the string-keyed `meta` map for a span, dropping nil values.
  @spec meta(Span.t()) :: map
  defp meta(span) do
    %{}
    |> add_datadog_meta(span)
    |> add_error_data(span)
    |> add_http_data(span)
    |> add_sql_data(span)
    |> add_tags(span)
    |> Enum.reject(fn {_k, v} -> is_nil(v) end)
    |> Enum.into(%{})
  end

  @spec add_datadog_meta(map, Span.t()) :: map
  defp add_datadog_meta(meta, %Span{env: nil}), do: meta

  defp add_datadog_meta(meta, %Span{env: env}) do
    Map.put(meta, :env, env)
  end

  @spec add_error_data(map, Span.t()) :: map
  defp add_error_data(meta, %{error: nil}), do: meta

  defp add_error_data(meta, %{error: error}) do
    meta
    |> add_error_type(error[:exception])
    |> add_error_message(error[:exception])
    |> add_error_stacktrace(error[:stacktrace])
  end

  @spec add_error_type(map, Exception.t() | nil) :: map
  defp add_error_type(meta, %struct{}), do: Map.put(meta, "error.type", struct)
  defp add_error_type(meta, _), do: meta

  @spec add_error_message(map, Exception.t() | nil) :: map
  defp add_error_message(meta, nil), do: meta

  defp add_error_message(meta, exception),
    do: Map.put(meta, "error.msg", Exception.message(exception))

  @spec add_error_stacktrace(map, list | nil) :: map
  defp add_error_stacktrace(meta, nil), do: meta

  defp add_error_stacktrace(meta, stacktrace),
    do: Map.put(meta, "error.stack", Exception.format_stacktrace(stacktrace))

  @spec add_http_data(map, Span.t()) :: map
  defp add_http_data(meta, %{http: nil}), do: meta

  defp add_http_data(meta, %{http: http}) do
    status_code =
      if http[:status_code] do
        to_string(http[:status_code])
      end

    meta
    |> Map.put("http.url", http[:url])
    |> Map.put("http.status_code", status_code)
    |> Map.put("http.method", http[:method])
  end

  @spec add_sql_data(map, Span.t()) :: map
  defp add_sql_data(meta, %{sql_query: nil}), do: meta

  defp add_sql_data(meta, %{sql_query: sql}) do
    meta
    |> Map.put("sql.query", sql[:query])
    |> Map.put("sql.rows", sql[:rows])
    |> Map.put("sql.db", sql[:db])
  end

  # Merges user tags into meta. `:analytics_event` is excluded here because it is reported
  # as a metric instead (see add_metrics/2).
  @spec add_tags(map, Span.t()) :: map
  defp add_tags(meta, %{tags: nil}), do: meta

  defp add_tags(meta, %{tags: tags}) do
    tags = tags |> Keyword.delete(:analytics_event)

    Map.merge(
      meta,
      tags
      |> Enum.map(fn {k, v} -> {k, term_to_string(v)} end)
      |> Enum.into(%{})
    )
  end

  # Builds the `metrics` map from `initial_value` plus span-derived metrics, dropping nil values.
  @spec metrics(Span.t(), map) :: map
  defp metrics(span, %{} = initial_value) do
    initial_value
    |> add_metrics(span)
    |> Enum.reject(fn {_k, v} -> is_nil(v) end)
    |> Enum.into(%{})
  end

  # A non-nil `:analytics_event` tag sets the "_dd1.sr.eausr" metric to 1.
  @spec add_metrics(map, Span.t()) :: map
  defp add_metrics(metrics, %{tags: nil}), do: metrics

  defp add_metrics(metrics, %{tags: tags}) do
    if Keyword.get(tags, :analytics_event) != nil do
      Map.put(metrics, "_dd1.sr.eausr", 1)
    else
      metrics
    end
  end

  # 1 when any value in the span's error keyword list is non-nil, else 0.
  @spec error(nil | Keyword.t()) :: integer
  defp error(nil), do: 0

  defp error(keyword) do
    if Enum.any?(keyword, fn {_, v} -> not is_nil(v) end) do
      1
    else
      0
    end
  end

  # Strips nil values, then msgpack-encodes with Msgpax's default options.
  # The payload must NOT be passed as Msgpax's second (options) argument.
  @spec encode(data :: term) :: iodata | no_return
  defp encode(data) do
    data
    |> deep_remove_nils()
    |> Msgpax.pack!()
  end

  @spec push(body :: iodata(), headers, State.t()) :: any()
  defp push(body, headers, %State{http: http, host: host, port: port}),
    do: http.put("#{host}:#{port}/v0.3/traces", body, headers)

  # Recursively drops nil-valued entries from maps and keyword lists; other lists are walked
  # element by element, and any other term is returned unchanged.
  @spec deep_remove_nils(term) :: term
  defp deep_remove_nils(term) when is_map(term) do
    term
    |> Enum.reject(fn {_k, v} -> is_nil(v) end)
    |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end)
    |> Enum.into(%{})
  end

  defp deep_remove_nils(term) when is_list(term) do
    if Keyword.keyword?(term) do
      term
      |> Enum.reject(fn {_k, v} -> is_nil(v) end)
      |> Enum.map(fn {k, v} -> {k, deep_remove_nils(v)} end)
    else
      Enum.map(term, &deep_remove_nils/1)
    end
  end

  defp deep_remove_nils(term), do: term

  # Binaries and atoms (including nil) pass through unchanged, so nil tag values are still
  # dropped by meta/1. Anything else is rendered with inspect/1.
  defp term_to_string(term) when is_binary(term), do: term
  defp term_to_string(term) when is_atom(term), do: term
  defp term_to_string(term), do: inspect(term)
end