lib/telemetry_metrics/reporter.ex

defmodule Membrane.TelemetryMetrics.Reporter do
  @moduledoc """
  Attaches handlers to :telemetry events based on the received list of metrics definitions.
  The attached handlers store metrics values in ETS tables.
  These values can be gotten by calling `scrape/2` function or also reset by calling `scrape_and_cleanup/2`.

  Currently supported types of metrics are:
   - `Telemetry.Metrics.Counter`
   - `Telemetry.Metrics.Sum`
   - `Telemetry.Metrics.LastValue`

  Currently supported fields of metrics definitions are: `:name`, `:event_name`, `measurement`.
  Fields `:keep`, `:reporter_options`, `tag_values`, `tags`, `:unit` and functionalities related to them are not supported yet.
  Metrics values are grouped by `label`.
  """

  use GenServer

  @type reporter() :: pid() | atom()
  @type report() :: map()

  @spec start_link([Telemetry.Metrics.t()], GenServer.options()) :: GenServer.on_start()
  def start_link(init_arg, options \\ []) do
    options = Keyword.put(options, :trap_exit, true)
    GenServer.start_link(__MODULE__, init_arg, options)
  end

  @spec scrape(reporter(), non_neg_integer()) :: report()
  def scrape(reporter, timeout \\ 5000) do
    GenServer.call(reporter, :scrape, timeout)
  end

  @spec scrape_and_cleanup(reporter(), non_neg_integer()) :: report()
  def scrape_and_cleanup(reporter, timeout \\ 5000) do
    GenServer.call(reporter, :scrape_and_cleanup, timeout)
  end

  @spec stop(reporter()) :: :ok
  def stop(reporter) do
    GenServer.stop(reporter)
  end

  @spec child_spec(Keyword.t()) :: Supervisor.child_spec()
  def child_spec(arg) do
    {metrics, process_opts} = Keyword.pop(arg, :metrics, [])

    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [metrics, process_opts]}
    }
  end

  @impl true
  def init(metrics) do
    metrics_data =
      metrics
      |> Enum.map(fn metric ->
        ets_table = create_ets_table()
        handler_ids = attach_handlers(metric, ets_table)

        %{
          metric: metric,
          name: Enum.join(metric.name, ".") |> String.to_atom(),
          ets_table: ets_table,
          handler_ids: handler_ids
        }
      end)

    {:ok, %{metrics_data: metrics_data}}
  end

  @impl true
  def handle_call(:scrape, _from, state) do
    report =
      Enum.map(state.metrics_data, fn metric_data ->
        {metric_data.name, get_metric_report(metric_data.ets_table)}
      end)
      |> Enum.map(&move_metric_down_in_report/1)
      |> merge_metrics_reports()

    {:reply, report, state}
  end

  @impl true
  def handle_call(:scrape_and_cleanup, _from, state) do
    report =
      Enum.map(state.metrics_data, fn metric_data ->
        {metric_data.name, get_metric_report_and_do_clanup(metric_data.ets_table)}
      end)
      |> Enum.map(&move_metric_down_in_report/1)
      |> merge_metrics_reports()

    {:reply, report, state}
  end

  @impl true
  def terminate(_reason, state) do
    Enum.each(state.metrics_data, fn metric_data ->
      %{
        handler_ids: handler_ids,
        ets_table: ets_table
      } = metric_data

      Enum.each(handler_ids, &:telemetry.detach/1)
      :ets.delete(ets_table)
    end)
  end

  defp create_ets_table(),
    do: :ets.new(:metric_table, [:public, :set, {:write_concurrency, true}])

  defp attach_handlers(metric, ets_table) do
    case metric do
      %Telemetry.Metrics.Counter{} -> __MODULE__.Counter
      %Telemetry.Metrics.LastValue{} -> __MODULE__.LastValue
      %Telemetry.Metrics.Sum{} -> __MODULE__.Sum
    end
    |> apply(:attach, [metric, ets_table])
  end

  defp get_metric_report(ets_table) do
    :ets.tab2list(ets_table)
    |> aggregate_report()
  end

  defp get_metric_report_and_do_clanup(ets_table) do
    :ets.tab2list(ets_table)
    |> Enum.flat_map(fn {key, _val} -> :ets.take(ets_table, key) end)
    |> aggregate_report()
  end

  defp aggregate_report(content) do
    {aggregated_content, content_to_aggregate} =
      Enum.split_with(content, fn {key, _val} -> key == [] end)

    content_to_aggregate
    |> Enum.group_by(
      # key fun
      fn {[head | _tail], _val} -> head end,
      # value fun
      fn {[_head | tail], val} -> {tail, val} end
    )
    |> Enum.map(fn {key, subcontent} -> {key, aggregate_report(subcontent)} end)
    |> Enum.concat(aggregated_content)
  end

  defp move_metric_down_in_report({metric_name, report}) do
    Enum.map(report, fn
      {[], val} -> {metric_name, val}
      {key, sub_report} -> {key, move_metric_down_in_report({metric_name, sub_report})}
    end)
  end

  defp to_deep_map(report) when is_list(report) do
    Map.new(report, fn {key, val} -> {key, to_deep_map(val)} end)
  end

  defp to_deep_map(report), do: report

  defp merge_metrics_reports(reports) do
    Enum.map(reports, &to_deep_map/1)
    |> Enum.reduce(%{}, &merge_metrics_reports/2)
  end

  defp merge_metrics_reports(report1, report2) do
    Map.merge(
      report1,
      report2,
      fn _key, val1, val2 -> merge_metrics_reports(val1, val2) end
    )
  end
end