lib/datadog/data_streams/aggregator.ex

defmodule Datadog.DataStreams.Aggregator do
  @moduledoc """
  A `GenServer` instance responsible for aggregating many points of data
  into 10 second buckets and sending them to the Datadog agent. It holds
  many structs in memory, arranged like this:

  ```mermaid
  graph TD
      aggregator[Datadog.DataStreams.Aggregator]
      aggregator --> bucket[Datadog.DataStreams.Aggregator.Bucket]
      bucket --> group[Datadog.DataStreams.Aggregator.Group]
  ```

  When adding data, the calling code creates a new
  `Datadog.DataStreams.Aggregator.Point`, which contains all of the needed
  data, and then calls `#{__MODULE__}.add/1` to add that point to the
  aggregator, where the aggregator will find (or create) a bucket that matches
  the 10 second window for the point. It will then find (or create) a group in
  that bucket based on the point's `hash`. Once the group is found, the
  `pathway_latency` and `edge_latency` `Datadog.Sketch` will be updated with
  the new latency.
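
  For illustration, adding a point might look something like this (the field
  values here are placeholders, and `Aggregator.Point` may define more fields
  than the ones shown):

      point = %Datadog.DataStreams.Aggregator.Point{
        hash: 17_210_443_572_488_294_574,
        timestamp: :erlang.system_time(:nanosecond),
        pathway_latency: 10_000_000,
        edge_latency: 5_000_000
      }

      :ok = Datadog.DataStreams.Aggregator.add(point)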

  Every 10 seconds the aggregator converts all non-active buckets (those
  outside the current 10 second window) to a `Datadog.DataStreams.Payload`,
  encodes it, and sends it to the Datadog agent. If sending the payload
  fails, the old buckets are still removed from memory, but the
  `datadog.datastreams.aggregator.flush_errors.count` telemetry metric is
  incremented.
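
  If you want to observe these flushes, you can attach to the telemetry
  events emitted by this module. A minimal sketch (the handler id is your
  own choice):

      :telemetry.attach(
        "log-datastreams-flush-errors",
        [:datadog, :datastreams, :aggregator, :flush_errors],
        fn _event, measurements, _metadata, _config ->
          IO.puts("data streams flush errors: " <> Integer.to_string(measurements.count))
        end,
        nil
      )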
  """

  use GenServer

  alias Datadog.DataStreams.{Aggregator, Config, Payload, Transport}

  require Logger

  @send_interval 10_000

  @doc """
  Starts a new `#{__MODULE__}` instance. It takes no options because it
  uses the global `Datadog.DataStreams.Config` module. It is started
  automatically by `Datadog.DataStreams.Application` and should not need
  to be started manually.
  """
  @spec start_link(Keyword.t()) :: GenServer.on_start()
  def start_link(_opts) do
    opts = [enabled?: Config.agent_enabled?()]
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc """
  Adds new metrics to the aggregator.

  Note that this function still returns `:ok` when the aggregator is
  disabled; the cast is simply discarded.

  ## Examples

      iex> :ok = Aggregator.add(%Aggregator.Point{})

  """
  @spec add(Aggregator.Point.t()) :: :ok
  def add(%Aggregator.Point{} = point) do
    :telemetry.execute([:datadog, :datastreams, :aggregator, :payloads_in], %{count: 1})
    GenServer.cast(__MODULE__, {:add, point})
  end

  @doc """
  Sends all stored data to the Datadog agent.

  ## Examples

      iex> :ok = Aggregator.flush()
  """
  @spec flush() :: :ok
  def flush() do
    Process.send(__MODULE__, :send, [])
  end

  @doc false
  def init([{:enabled?, false} | _rest]), do: :ignore

  def init(_opts) do
    Process.flag(:trap_exit, true)

    {:ok,
     %{
       send_timer: Process.send_after(self(), :send, @send_interval),
       ts_type_current_buckets: %{},
       ts_type_origin_buckets: %{}
     }}
  end

  @doc false
  def handle_cast({:add, %Aggregator.Point{} = point}, state) do
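    # Each point is tracked twice: in a bucket keyed by the point's own
    # timestamp ("current") and in a bucket keyed by its origin timestamp
    # ("origin", computed below).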
    new_ts_type_current_buckets =
      Aggregator.Bucket.upsert(state.ts_type_current_buckets, point.timestamp, fn bucket ->
        new_groups =
          Aggregator.Group.upsert(bucket.groups, point, fn group ->
            Aggregator.Group.add(group, point)
          end)

        %{bucket | groups: new_groups}
      end)

    # The origin timestamp is when this payload first entered the pipeline:
    # the point's own timestamp minus the accumulated pathway latency.
    origin_timestamp = point.timestamp - point.pathway_latency

    new_ts_type_origin_buckets =
      Aggregator.Bucket.upsert(state.ts_type_origin_buckets, origin_timestamp, fn bucket ->
        new_groups =
          Aggregator.Group.upsert(bucket.groups, point, fn group ->
            Aggregator.Group.add(group, point)
          end)

        %{bucket | groups: new_groups}
      end)

    {:noreply,
     %{
       state
       | ts_type_current_buckets: new_ts_type_current_buckets,
         ts_type_origin_buckets: new_ts_type_origin_buckets
     }}
  end

  @doc false
  def handle_info(:send, state) do
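    # Cancel the scheduled timer in case this flush was triggered manually
    # via `flush/0`, so we do not immediately send twice.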
    Process.cancel_timer(state.send_timer)

    now = :erlang.system_time(:nanosecond)

    # Partition each bucket map into buckets still inside their 10 second
    # window (kept in state) and past buckets (flushed below).
    {active_ts_type_current_buckets, past_ts_type_current_buckets} =
      split_with(state.ts_type_current_buckets, fn {_k, v} ->
        Aggregator.Bucket.current?(v, now)
      end)

    {active_ts_type_origin_buckets, past_ts_type_origin_buckets} =
      split_with(state.ts_type_origin_buckets, fn {_k, v} ->
        Aggregator.Bucket.current?(v, now)
      end)

    payload =
      Payload.new()
      |> Payload.add_buckets(past_ts_type_current_buckets, :current)
      |> Payload.add_buckets(past_ts_type_origin_buckets, :origin)

    unless Payload.stats_count(payload) == 0 do
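      # Encode and send from a task so the aggregator keeps accepting points
      # while the request is in flight. The task's reply arrives back in this
      # process as a `{ref, result}` message, handled by the `handle_info/2`
      # clauses below.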
      Task.async(fn ->
        with {:ok, encoded_payload} <- Payload.encode(payload),
             :ok <- Transport.send_pipeline_stats(encoded_payload) do
          {:ok, Payload.stats_count(payload)}
        else
          {:error, reason} -> {:error, reason}
          something -> {:error, something}
        end
      end)
    end

    {:noreply,
     %{
       state
       | send_timer: Process.send_after(self(), :send, @send_interval),
         ts_type_current_buckets: active_ts_type_current_buckets,
         ts_type_origin_buckets: active_ts_type_origin_buckets
     }}
  end

  @doc false
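  # Handles a successful reply from the async flush task started in
  # `handle_info(:send, state)`.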
  def handle_info({task_ref, {:ok, count}}, state) when is_reference(task_ref) do
    Logger.debug("Successfully sent metrics to Datadog")
    :telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_payloads], %{count: 1})
    :telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_buckets], %{count: count})
    {:noreply, state}
  end

  @doc false
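  # Handles an error reply from the async flush task.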
  def handle_info({task_ref, {:error, error}}, state) when is_reference(task_ref) do
    Logger.error("Error sending metrics to Datadog", error: error)
    :telemetry.execute([:datadog, :datastreams, :aggregator, :flush_errors], %{count: 1})
    {:noreply, state}
  end

  @doc false
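  # Ignores any other message, including the `:DOWN` and `:EXIT` messages
  # from the completed flush task (this process traps exits in `init/1`).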
  def handle_info(_, state), do: {:noreply, state}

  @doc false
  # Note: matching on `%{}` would match *any* map, so `map_size/1` guards
  # are needed to detect a truly empty state.
  def terminate(_reason, %{ts_type_current_buckets: current, ts_type_origin_buckets: origin})
      when map_size(current) == 0 and map_size(origin) == 0 do
    Logger.debug("Stopping #{__MODULE__} with an empty state")
  end

  @doc false
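  # Makes a final, best-effort attempt to flush all remaining buckets to the
  # Datadog agent before shutting down.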
  def terminate(_reason, state) do
    payload =
      Payload.new()
      |> Payload.add_buckets(state.ts_type_current_buckets, :current)
      |> Payload.add_buckets(state.ts_type_origin_buckets, :origin)

    with {:ok, encoded_payload} <- Payload.encode(payload),
         :ok <- Transport.send_pipeline_stats(encoded_payload) do
      Logger.debug("Successfully sent metrics to Datadog before termination")
      :telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_payloads], %{count: 1})

      :telemetry.execute([:datadog, :datastreams, :aggregator, :flushed_buckets], %{
        count: Payload.stats_count(payload)
      })
    else
      error ->
        Logger.error("Error sending metrics to Datadog before termination", error: error)
        :telemetry.execute([:datadog, :datastreams, :aggregator, :flush_errors], %{count: 1})
    end
  rescue
    error ->
      Logger.error("Error attempting to sending metrics to Datadog before termination",
        error: error
      )

      :telemetry.execute([:datadog, :datastreams, :aggregator, :flush_errors], %{count: 1})
  end

  # Splits the `map` into two maps according to the given function `fun`.
  # Vendored from Elixir 1.15's `Map.split_with/2` to support older Elixir
  # versions.
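  #
  # For example:
  #
  #     split_with(%{a: 1, b: 2}, fn {_k, v} -> v > 1 end)
  #     #=> {%{b: 2}, %{a: 1}}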
  defp split_with(map, fun) when is_map(map) and is_function(fun, 1) do
    iter = :maps.iterator(map)
    next = :maps.next(iter)

    do_split_with(next, [], [], fun)
  end

  defp do_split_with(:none, while_true, while_false, _fun) do
    {:maps.from_list(while_true), :maps.from_list(while_false)}
  end

  defp do_split_with({key, value, iter}, while_true, while_false, fun) do
    if fun.({key, value}) do
      do_split_with(:maps.next(iter), [{key, value} | while_true], while_false, fun)
    else
      do_split_with(:maps.next(iter), while_true, [{key, value} | while_false], fun)
    end
  end
end