lib/timescaledb/reporter.ex

defmodule Membrane.Telemetry.TimescaleDB.Reporter do
  @moduledoc """
  A worker responsible for persisting events/measurements to the TimescaleDB database.

  Receives measurements via `send_measurement/4` and, based on the event name, eventually persists them to the TimescaleDB database.
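
  ## Usage

  A minimal sketch of starting a reporter (illustrative only: the exact `:metrics`
  list depends on which telemetry events you want to forward, and when an `:id` is
  given the reporter's `Registry` (see `registry/0`) must already be running, e.g.
  under the application's supervisor):

      metrics = [
        [:membrane, :metric, :value],
        [:membrane, :link, :new]
      ]

      {:ok, reporter} =
        Membrane.Telemetry.TimescaleDB.Reporter.start_link(metrics: metrics, id: MyReporter)

  ## Configuration

  Flushing behaviour can be tuned through the application environment; the values
  below are the defaults (`:flush_timeout` is in milliseconds):

      config :membrane_timescaledb_reporter,
        flush_timeout: 5_000,
        flush_threshold: 1_000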
  """

  use GenServer

  require Logger

  alias Membrane.Telemetry.TimescaleDB.Model
  alias Membrane.Telemetry.TimescaleDB.TelemetryHandler

  @log_prefix "[#{__MODULE__}]"

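  @doc """
  Returns the name of the `Registry` under which reporters started with the `:id`
  option are registered.

  A lookup sketch (assuming a reporter was started with `id: MyReporter` and the
  registry is running):

      Registry.lookup(Membrane.Telemetry.TimescaleDB.Reporter.registry(), MyReporter)
  """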
  @spec registry() :: atom()
  def registry() do
    __MODULE__.Registry
  end

  @doc """
  Starts the reporter process without linking it to the caller. Accepts the same options as `start_link/1`.
  """
  @spec start(keyword()) :: GenServer.on_start()
  def start(opts) do
    do_start(:start, opts)
  end

  @doc """
  Starts the reporter process linked to the caller.

  Requires the `:metrics` option and either an `:id` (used to register the process
  in the reporter's `Registry`) or an explicit `:name`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    do_start(:start_link, opts)
  end

  defp do_start(method, opts) do
    metrics =
      opts[:metrics] ||
        raise ArgumentError, "the `:metrics` option is required by #{inspect(__MODULE__)}"

    id =
      opts[:id] || Keyword.get(opts, :name) ||
        raise ArgumentError, "either the `:id` or `:name` option is required by #{inspect(__MODULE__)}"

    apply(GenServer, method, [
      __MODULE__,
      [metrics: metrics, caller: self()],
      # allow a custom name to be passed for testing purposes
      [name: Keyword.get(opts, :name) || {:via, Registry, {registry(), id}}]
    ])
  end

  @doc """
  Sends a measurement to the reporter which, based on the event name, will eventually persist it to the database.

  Logs a warning on an invalid or unsupported event name or measurement format.

  ## Supported events
    * `[:membrane, :metric, :value]` - caches measurements up to the configured `:flush_threshold` and flushes them to the database via `Membrane.Telemetry.TimescaleDB.Model.add_all_measurements/1`.
    * `[:membrane, :link, :new]` - instantly passes the measurement to `Membrane.Telemetry.TimescaleDB.Model.add_link/1`.
    * `[:membrane, :pipeline | :bin | :element, :init | :terminate]` - instantly persists information about a bin or element being initialized or terminated via `Membrane.Telemetry.TimescaleDB.Model.add_element_event/1`; pipeline events are currently ignored.
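
  ## Example

  An illustrative call reporting a single metric value (the field values below are
  made up):

      Membrane.Telemetry.TimescaleDB.Reporter.send_measurement(
        reporter,
        [:membrane, :metric, :value],
        %{component_path: "pipeline@<0.123.0>/:sink", metric: "buffer_size", value: 42}
      )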
  """
  @spec send_measurement(GenServer.server(), list(atom()), map(), map()) :: :ok
  def send_measurement(reporter, event_name, measurement, metadata \\ %{})

  def send_measurement(
        reporter,
        [:membrane, :metric, :value] = event_name,
        %{component_path: path, metric: metric, value: value} = measurement,
        _metadata
      )
      when is_binary(path) and is_binary(metric) and is_integer(value) do
    measurement =
      measurement
      |> Map.merge(%{
        component_path: extend_with_os_pid(path),
        time: NaiveDateTime.utc_now()
      })

    GenServer.cast(
      reporter,
      {:measurement, event_name, measurement}
    )
  end

  def send_measurement(
        reporter,
        [:membrane, :link, :new],
        %{parent_path: parent_path, from: from, to: to, pad_from: pad_from, pad_to: pad_to} =
          link,
        _metadata
      )
      when is_binary(parent_path) and is_binary(from) and is_binary(to) and is_binary(pad_from) and
             is_binary(pad_to) do
    link =
      link
      |> Map.merge(%{
        parent_path: extend_with_os_pid(parent_path),
        time: NaiveDateTime.utc_now()
      })

    GenServer.cast(
      reporter,
      {:link, link}
    )
  end

  def send_measurement(
        reporter,
        [:membrane, element_type, event_type],
        %{path: _path} = measurement,
        metadata
      )
      when element_type in [:pipeline, :bin, :element] and event_type in [:init, :terminate] do
    GenServer.cast(
      reporter,
      {:lifecycle_event, element_type,
       Map.put(measurement, :terminated, event_type == :terminate), metadata}
    )
  end

  def send_measurement(_reporter, event_name, measurement, _metadata) do
    Logger.warn(
      "#{@log_prefix} Either event name: #{inspect(event_name)} or measurement format: #{inspect(measurement)} is not supported"
    )
  end

  @doc """
  Flushes cached measurements to the database.
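
  A usage sketch (assuming `reporter` is a running reporter pid or via-tuple):

      Membrane.Telemetry.TimescaleDB.Reporter.flush(reporter)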
  """
  @spec flush(GenServer.server()) :: :ok
  def flush(reporter) do
    GenServer.cast(reporter, :flush)
  end

  @doc """
  Resets cached measurements.
  """
  @spec reset(GenServer.server()) :: :ok
  def reset(reporter) do
    GenServer.cast(reporter, :reset)
  end

  @doc """
  Returns cached measurements.
  """
  @spec get_cached_measurements(GenServer.server()) :: list(map())
  def get_cached_measurements(reporter) do
    GenServer.call(reporter, :get_cached_measurements)
  end

  @doc """
  Returns the metrics the reporter was started with.
  """
  @spec get_metrics(GenServer.server()) :: map()
  def get_metrics(reporter) do
    GenServer.call(reporter, :metrics)
  end

  @impl true
  def init(opts) do
    metrics = Keyword.fetch!(opts, :metrics)
    # trap exits so that terminate/2 runs on shutdown and the telemetry handler
    # gets unregistered
    Process.flag(:trap_exit, true)
    TelemetryHandler.register_metrics(metrics)

    flush_timeout = Application.get_env(:membrane_timescaledb_reporter, :flush_timeout, 5000)
    flush_threshold = Application.get_env(:membrane_timescaledb_reporter, :flush_threshold, 1000)

    Process.send_after(self(), :force_flush, flush_timeout)

    {:ok,
     %{
       # NOTE: registered paths are kept in a plain map; with multiple workers an ETS
       # table could be used instead, but each worker would end up with a consistent
       # map anyway and map lookups are faster than ETS
       registered_paths: %{},
       measurements: [],
       flush_timeout: flush_timeout,
       flush_threshold: flush_threshold,
       metrics: metrics
     }}
  end

  @impl true
  def handle_cast(
        {:measurement, event_name, measurement},
        state
      ) do
    cache? = event_name == [:membrane, :metric, :value]
    state = process_measurement({measurement, cache?}, state)
    {:noreply, state}
  end

  def handle_cast({:link, link}, state) do
    case Model.add_link(link) do
      {:ok, _} ->
        Logger.debug("#{@log_prefix} Added new link")

      {:error, reason} ->
        Logger.error("#{@log_prefix} Error while adding new link: #{inspect(reason)}")
    end

    {:noreply, state}
  end

  # persist bin and element lifecycle events; pipeline events are ignored by the clause below
  def handle_cast({:lifecycle_event, type, measurement, metadata}, state)
      when type in [:bin, :element] do
    case Model.add_element_event(
           measurement
           |> Map.put(:time, NaiveDateTime.utc_now())
           |> Map.put(:metadata, sanitize_metadata(metadata))
           |> Map.update!(:path, &extend_with_os_pid/1)
         ) do
      {:ok, _} ->
        Logger.debug("#{@log_prefix} Added #{type} event")

      {:error, reason} ->
        Logger.error("#{@log_prefix} Error while adding #{type} event: #{inspect(reason)}")
    end

    {:noreply, state}
  end

  # ignore pipeline lifecycle events
  def handle_cast({:lifecycle_event, _type, _measurement, _metadata}, state) do
    {:noreply, state}
  end

  def handle_cast(:flush, %{measurements: measurements} = state) do
    {:noreply, flush_measurements(measurements, state)}
  end

  def handle_cast(:reset, state) do
    {:noreply, %{state | measurements: []}}
  end

  @impl true
  def handle_call(:get_cached_measurements, _from, %{measurements: measurements} = state) do
    {:reply, measurements, state}
  end

  def handle_call(:metrics, _from, %{metrics: metrics} = state) do
    {:reply, metrics, state}
  end

  @impl true
  def handle_info(:force_flush, %{flush_timeout: flush_timeout} = state) do
    Logger.debug("#{@log_prefix} Reached flush timeout: #{flush_timeout}, flushing...")
    flush(self())
    Process.send_after(self(), :force_flush, flush_timeout)
    {:noreply, state}
  end

  @impl true
  def terminate(reason, _state) do
    Logger.debug(
      "#{__MODULE__}.terminate/2 called with reason #{inspect(reason)}, unregistering handler"
    )

    TelemetryHandler.unregister_handler()
  end

  # embeds the OS pid of the running VM into the "pipeline@" prefix of the path
  defp extend_with_os_pid(path) do
    String.replace_prefix(path, "pipeline@", "pipeline@#{System.pid()}@")
  end

  defp process_measurement(
         {measurement, cache?},
         %{measurements: measurements, flush_threshold: flush_threshold} = state
       )
       when cache? == true do
    measurements = [measurement | measurements]

    if length(measurements) >= flush_threshold do
      flush_measurements(measurements, state)
    else
      %{state | measurements: measurements}
    end
  end

  defp process_measurement({measurement, cache?}, state) when cache? == false do
    case Model.add_measurement(measurement) do
      {:ok, _} ->
        Logger.debug("#{@log_prefix} Added new measurement")

      {:error, reason} ->
        Logger.error("#{@log_prefix} Error while adding new measurement: #{inspect(reason)}")
    end

    state
  end

  # recursively converts keyword lists into maps so that nested metadata has a uniform,
  # map-based shape
  defp sanitize_metadata(metadata) when is_map(metadata) do
    metadata
    |> Enum.map(fn {key, value} -> {key, sanitize_metadata(value)} end)
    |> Map.new()
  end

  defp sanitize_metadata(metadata) when is_list(metadata) do
    if Keyword.keyword?(metadata) do
      metadata
      |> Enum.map(fn {key, value} -> {key, sanitize_metadata(value)} end)
      |> Map.new()
    else
      Enum.map(metadata, &sanitize_metadata/1)
    end
  end

  defp sanitize_metadata(metadata), do: metadata

  defp flush_measurements([], state) do
    state
  end

  defp flush_measurements(measurements, state) do
    # split measurements into those whose component path already has a registered id
    # and those whose path still needs to be inserted together with the measurement
    accumulator =
      Enum.reduce(measurements, {[], [], []}, fn %{component_path: path} = measurement,
                                                 {with_paths, without_paths, paths_to_insert} ->
        path_id = Map.get(state.registered_paths, path)
        measurement = Map.put(measurement, :component_path_id, path_id)

        case path_id do
          nil ->
            {with_paths, [measurement | without_paths], [path | paths_to_insert]}

          _path_id ->
            {[Map.delete(measurement, :component_path) | with_paths], without_paths,
             paths_to_insert}
        end
      end)

    {:ok, inserted, inserted_paths} = Model.add_all_measurements(accumulator)

    Logger.debug("#{@log_prefix} Flushed #{inserted} measurements")

    registered_paths =
      if inserted_paths != %{} do
        Map.merge(state.registered_paths, inserted_paths)
      else
        state.registered_paths
      end

    %{state | measurements: [], registered_paths: registered_paths}
  end
end