# lib/telemetry_broadcaster.ex

defmodule ALF.TelemetryBroadcaster do
  @moduledoc """
  Relays ALF component telemetry events to a function on another node.

  On start the server attaches a `:telemetry` handler for the
  `[:alf, :component, :start | :stop | :exception]` events. The handler casts
  every event to this process, which forwards it with `:rpc.call/4` to the
  `{node, module, function}` registered via `register_remote_function/4`. The
  target function is called with three arguments: the event name, the
  measurements and the metadata. Events are dropped while nothing is registered.

  ## Throttling

  When the registration carries a positive numeric `:interval` (milliseconds),
  forwarding is sampled per pipeline module (`metadata.component.pipeline_module`):

    * a `:start` event of the component named `:producer` is forwarded only if
      no producer start was forwarded for that pipeline yet, or more than
      `interval` milliseconds have elapsed since the last forwarded one; the
      `metadata.ip.ref` of the forwarded event is remembered;
    * any other event is forwarded only if its `metadata.ip.ref` equals the
      remembered one for its pipeline.

  Without a positive interval every event is forwarded.
  """

  use GenServer

  defmodule Handler do
    @moduledoc false

    # Telemetry handler callback: hands the event over to the broadcaster
    # process (taken from the handler config) with a non-blocking cast.
    def handle_event([:alf, :component, type], measurements, metadata, %{pid: pid}) do
      GenServer.cast(pid, {:handle_event, [:alf, :component, type], measurements, metadata})
    end
  end

  defstruct remote_function: nil,
            pid: nil,
            memo: %{}

  @typedoc "A registered target: `{node, module, function, opts}`."
  @type registration :: {atom(), atom(), atom(), keyword()}

  @type t :: %__MODULE__{
          remote_function: registration() | nil,
          pid: pid() | nil,
          memo: map()
        }

  @allowed_opts [:interval]
  @handler_id "telemetry-broadcaster"
  @events [
    [:alf, :component, :start],
    [:alf, :component, :stop],
    [:alf, :component, :exception]
  ]

  # ---------------------------------------------------------------------------
  # Client API
  # ---------------------------------------------------------------------------

  @doc """
  Starts the broadcaster, registered locally under the module name.

  Only the empty list is accepted as argument.
  """
  @spec start_link([]) :: GenServer.on_start()
  def start_link([]) do
    GenServer.start_link(__MODULE__, %__MODULE__{}, name: __MODULE__)
  end

  @doc """
  Registers the function that receives the telemetry events.

  `name` is the node passed to `:rpc.call/4`; `module` and `function` identify
  the function called there. The only supported option is `:interval`
  (milliseconds), which enables throttling as described in the module docs.

  Returns `{name, module, function, opts}`. Raises a `RuntimeError` if `opts`
  contains a key other than `:interval`.
  """
  @spec register_remote_function(atom(), atom(), atom(), keyword()) :: registration()
  def register_remote_function(name, module, function, opts \\ [interval: nil])
      when is_atom(name) and is_atom(module) and is_atom(function) and is_list(opts) do
    # `Enum.reject/2` instead of `--`: a repeated allowed key must not be
    # reported as wrong, and matching on `[]` (rather than a truthiness test)
    # also catches the keys `false` and `nil`.
    case Enum.reject(Keyword.keys(opts), &(&1 in @allowed_opts)) do
      [] ->
        :ok

      wrong_options ->
        raise "Wrong options for TelemetryBroadcaster: #{inspect(wrong_options)}. " <>
                "Available options are #{inspect(@allowed_opts)}"
    end

    GenServer.call(__MODULE__, {:register_remote_function, {name, module, function, opts}})
  end

  @doc """
  Returns the current registration, or `nil` if nothing has been registered.
  """
  @spec remote_function() :: registration() | nil
  def remote_function(), do: GenServer.call(__MODULE__, :remote_function)

  # ---------------------------------------------------------------------------
  # Server callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def init(%__MODULE__{} = state) do
    state = %{state | pid: self()}

    # Attaching is deferred so that `start_link/1` returns before it happens.
    {:ok, state, {:continue, :attach_telemetry}}
  end

  @impl true
  def handle_continue(:attach_telemetry, %__MODULE__{} = state) do
    do_attach_telemetry(state.pid)
    {:noreply, state}
  end

  @impl true
  def handle_call(:remote_function, _from, state), do: {:reply, state.remote_function, state}

  def handle_call({:register_remote_function, {name, module, function, opts}}, _from, state) do
    state = %{state | remote_function: {name, module, function, opts}}
    {:reply, {name, module, function, opts}, state}
  end

  @impl true
  # Nothing registered yet: drop the event.
  def handle_cast({:handle_event, [:alf, :component, _], _, _}, %{remote_function: nil} = state) do
    {:noreply, state}
  end

  def handle_cast({:handle_event, [:alf, :component, type], measurements, metadata}, state) do
    {name, module, function, opts} = state.remote_function
    target = {name, module, function}
    event = {type, measurements, metadata}

    case opts[:interval] do
      # `is_number/1` keeps a non-numeric interval (which would compare as
      # "greater than 0" under term ordering) from enabling throttling.
      interval when is_number(interval) and interval > 0 ->
        do_handle_cast_with_interval(event, target, interval, state)

      _ ->
        broadcast(target, event)
        {:noreply, state}
    end
  end

  # ---------------------------------------------------------------------------
  # Internals
  # ---------------------------------------------------------------------------

  # Throttled forwarding; `state.memo` maps a pipeline module to
  # `{monotonic_ms_of_last_forwarded_producer_start, ip_ref_of_that_event}`.
  defp do_handle_cast_with_interval({type, _measurements, metadata} = event, target, interval, state) do
    {component_name, pipeline, ip_ref} = fetch_data_from_meta(metadata)
    {timestamp, producer_ip_ref} = Map.get(state.memo, pipeline, {false, false})

    cond do
      component_name == :producer and type == :start ->
        # Monotonic time never jumps backwards; a wall-clock time of day would
        # wrap at midnight and stall forwarding until the clock catches up.
        now = System.monotonic_time(:millisecond)
        throttled? = is_integer(timestamp) and now - timestamp <= interval

        if throttled? do
          {:noreply, state}
        else
          broadcast(target, event)
          {:noreply, %{state | memo: Map.put(state.memo, pipeline, {now, ip_ref})}}
        end

      # Follow-up event of the IP whose producer start was forwarded.
      ip_ref == producer_ip_ref ->
        broadcast(target, event)
        {:noreply, state}

      true ->
        {:noreply, state}
    end
  end

  # Calls the registered function synchronously; its result (including any
  # `{:badrpc, reason}`) is ignored by the callers.
  defp broadcast({name, module, function}, {type, measurements, metadata}) do
    :rpc.call(name, module, function, [[:alf, :component, type], measurements, metadata])
  end

  # Extracts `{component name, pipeline module, ip ref}`; missing entries are `nil`.
  defp fetch_data_from_meta(metadata) do
    {
      get_in(metadata, [:component, :name]),
      get_in(metadata, [:component, :pipeline_module]),
      get_in(metadata, [:ip, :ref])
    }
  end

  defp do_attach_telemetry(pid) do
    # Telemetry handlers are not removed when the attaching process dies, so
    # after a restart the id is still taken (and bound to the dead pid).
    # Detach first; `{:error, :not_found}` on a fresh start is expected.
    _ = :telemetry.detach(@handler_id)

    :ok = :telemetry.attach_many(@handler_id, @events, &Handler.handle_event/4, %{pid: pid})
  end
end