core/sup_tree_common/metrics_uploader.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule AntikytheraCore.MetricsUploader do
  @moduledoc """
  A `GenServer` which buffers metrics data and periodically flushes the aggregated results to stable storage.
  """

  use GenServer
  alias Antikythera.{Time, Context}
  alias Antikythera.ExecutorPool.Id, as: EPoolId
  alias Antikythera.Metrics.DataList
  alias AntikytheraCore.Metrics.{Buffer, Results, AggregateStrategy}
  alias AntikytheraEal.MetricsStorage, as: Storage
  require AntikytheraCore.Logger, as: L

  @flush_interval_base 60_000
  @flush_interval_rand_max 5_000

  def start_link([otp_app_name, name_to_register]) do
    GenServer.start_link(__MODULE__, otp_app_name, name: name_to_register)
  end

  @impl true
  def init(otp_app_name) do
    arrange_next_data_flushing()
    {:ok, %{otp_app_name: otp_app_name, buffer: Buffer.new(), results_to_resend: Results.new()}}
  end

  @impl true
  def handle_cast({now, metrics_data_list, epool_id}, %{buffer: buffer} = state) do
    new_buffer = Buffer.add(buffer, now, metrics_data_list, epool_id)
    {:noreply, %{state | buffer: new_buffer}}
  end

  @impl true
  def handle_info(:flush_data, state) do
    arrange_next_data_flushing()
    {:noreply, flush_data(state, Time.now())}
  end

  def handle_info(msg, state) do
    # On rare occasions MetricsUploader receives a message by :ssl module due to timeout in hackney, and it shouldn't trigger alert.
    L.info("received an unexpected message: #{inspect(msg)}")
    {:noreply, state}
  end

  defp arrange_next_data_flushing() do
    # Add slight randomness to prevent many MetricsUploader processes from sending metrics at the same time
    flush_interval = @flush_interval_base + :rand.uniform(@flush_interval_rand_max)
    Process.send_after(self(), :flush_data, flush_interval)
  end

  @impl true
  def terminate(_reason, state) do
    # use future time to flush all existing metrics data
    one_minute_after = Time.shift_minutes(Time.now(), 1)
    flush_data(state, one_minute_after)
  end

  defp flush_data(
         %{otp_app_name: otp_app_name, buffer: buffer, results_to_resend: results_to_resend} =
           state,
         now
       ) do
    {new_buffer, past_data} = Buffer.partition_ongoing_and_past(buffer, now)
    past_results = Results.compute_results(past_data)
    results_to_send = Results.merge(results_to_resend, past_results)
    new_results_to_resend = Storage.save(otp_app_name, results_to_send)

    if !Enum.empty?(new_results_to_resend) do
      L.info("app=#{otp_app_name} incomplete upload, will retry afterward")
    end

    %{state | buffer: new_buffer, results_to_resend: new_results_to_resend}
  end

  #
  # Public API
  #
  defun submit_with_time(
          worker_name :: GenServer.server(),
          now :: v[Time.t()],
          data_list :: v[DataList.t()],
          epool_id :: Buffer.epool_id()
        ) :: :ok do
    data_list2 =
      Enum.map(data_list, fn {n, s, v} -> {n, AggregateStrategy.name_to_module(s), v} end)

    GenServer.cast(worker_name, {now, data_list2, epool_id})
  end

  defun submit(
          worker_name :: GenServer.server(),
          data_list :: v[DataList.t()],
          epool_id :: Buffer.epool_id()
        ) :: :ok do
    submit_with_time(worker_name, Time.now(), data_list, epool_id)
  end

  defun submit_custom_metrics(
          worker_name :: GenServer.server(),
          data_list0 :: v[DataList.t()],
          context :: v[nil | Context.t()]
        ) :: :ok do
    data_list1 = add_prefix_to_labels(data_list0)

    case context do
      %Context{executor_pool_id: epool_id} when is_tuple(epool_id) ->
        submit(worker_name, data_list1, epool_id)

      _no_context_or_no_epool ->
        submit(worker_name, data_list1, EPoolId.nopool())
    end
  end

  defunp add_prefix_to_labels(data_list :: v[DataList.t()]) :: DataList.t() do
    Enum.map(data_list, fn {label, strategy, value} ->
      {"custom_#{label}", strategy, value}
    end)
  end
end