# lib/zen_monitor/proxy.ex

defmodule ZenMonitor.Proxy do
  @moduledoc """
  ZenMonitor.Proxy monitors local processes and proxies their down messages to interested
  ZenMonitor.Locals on remote nodes for fanout.

  The proxy is a single, locally registered GenServer.  It keeps two pieces of bookkeeping:

    * a private ETS table (`State.monitors`) with one row per target it is currently monitoring,
      so that each target is monitored at most once regardless of how many subscribers there are
    * the table returned by `ZenMonitor.Proxy.Tables.subscribers/0`, holding one
      `{{target, subscriber}}` row per active subscription

  When a monitored target goes down, the (truncated) exit reason is enqueued with the
  `ZenMonitor.Proxy.Batcher` of every subscriber that was interested in that target.
  """
  use GenServer

  alias ZenMonitor.Truncator
  alias ZenMonitor.Proxy.{Batcher, Tables}

  @typedoc """
  Defines the valid operations that can be processed
  """
  @type operation :: :subscribe | :unsubscribe

  @typedoc """
  An instruction is a valid operation upon a given destination
  """
  @type instruction :: {operation, ZenMonitor.destination()}

  @typedoc """
  A string of instructions with the same operation can be collapsed into a partition for more
  efficient processing.
  """
  @type partition :: {operation, [ZenMonitor.destination()]}

  defmodule State do
    @moduledoc """
    Maintains the internal state for ZenMonitor.Proxy

    `monitors` is an ETS table with all the pids that the Proxy is currently monitoring
    """
    @type t :: %__MODULE__{
            monitors: :ets.tid()
          }
    defstruct [
      :monitors
    ]
  end

  ## Client

  @doc """
  Starts the proxy and registers it locally under the module name.

  `args` is accepted for supervisor compatibility and is otherwise ignored.
  """
  @spec start_link(args :: term()) :: GenServer.on_start()
  def start_link(args) do
    GenServer.start_link(__MODULE__, args, name: __MODULE__)
  end

  @doc """
  Ping is a diagnostic function to check that the proxy is running.

  It is mainly used by ZenMonitor.Local.Connectors to check if ZenMonitor.Proxy is available
  and running on a remote node
  """
  @spec ping() :: :pong
  def ping() do
    GenServer.call(__MODULE__, :ping)
  end

  ## Server

  @impl GenServer
  def init(_args) do
    # Keep queued messages out of the process heap
    Process.flag(:message_queue_data, :off_heap)
    {:ok, %State{monitors: :ets.new(:monitors, [:private, :set])}}
  end

  @impl GenServer
  def handle_call(:ping, _from, %State{} = state) do
    {:reply, :pong, state}
  end

  @impl GenServer
  def handle_cast({:subscribe, subscriber, targets}, %State{} = state) do
    process_operation(:subscribe, subscriber, targets, state)
    {:noreply, state}
  end

  def handle_cast({:process, subscriber, instructions}, %State{} = state) do
    # Create the most efficient instruction partitions
    for {operation, targets} <- partition_instructions(instructions) do
      process_operation(operation, subscriber, targets, state)
    end

    {:noreply, state}
  end

  @impl GenServer
  def handle_info({:DOWN, _, :process, pid, reason}, %State{monitors: monitors} = state) do
    # Reasons can include stack traces and other dangerous items, truncate them.
    truncated_reason = Truncator.truncate(reason)

    # Enqueue the death certificates with the interested subscriber's batchers
    for [subscriber] <- :ets.match(Tables.subscribers(), {{pid, :"$1"}}) do
      # Delete the subscription
      :ets.delete(Tables.subscribers(), {pid, subscriber})

      # Enqueue the death certificate with the Batcher
      subscriber
      |> Batcher.get()
      |> Batcher.enqueue(pid, truncated_reason)
    end

    # Clear the monitor
    :ets.delete(monitors, pid)

    {:noreply, state}
  end

  def handle_info(message, %State{} = state) do
    # Anything can be sent to a registered process (late replies, stray sends, non-process DOWN
    # messages).  Without this clause such a message raises a FunctionClauseError, taking the
    # private monitors table and every monitor held by the proxy down with it.  Log and carry on
    # instead.  `~P` with an explicit depth keeps arbitrarily large terms from flooding the log.
    :logger.warning("ZenMonitor.Proxy received unexpected message: ~P", [message, 32])

    {:noreply, state}
  end

  ## Private

  # Applies a single operation for `subscriber` to every target in `targets`.
  #
  #   * `:subscribe` records the subscriptions and starts monitoring any target that is not
  #     already monitored
  #   * `:unsubscribe` only removes the subscriptions; the monitor on the target is left in place
  @spec process_operation(
          operation,
          subscriber :: pid(),
          targets :: [ZenMonitor.destination()],
          State.t()
        ) :: :ok
  defp process_operation(:subscribe, subscriber, targets, %State{monitors: monitors}) do
    # Record that the subscriber is interested in the targets
    :ets.insert(Tables.subscribers(), Enum.map(targets, &{{&1, subscriber}}))

    # Record and monitor each of the pids, filtering out already monitored pids.  insert_new
    # returns false when the target already has a row, which skips the Process.monitor call.
    for target <- targets,
        :ets.insert_new(monitors, {target}) do
      Process.monitor(target)
    end

    :ok
  end

  defp process_operation(:unsubscribe, subscriber, targets, _state) do
    # Remove the subscriptions from the subscribers table
    for target <- targets do
      :ets.delete(Tables.subscribers(), {target, subscriber})
    end

    :ok
  end

  # Collapses consecutive instructions that share an operation into partitions, preserving the
  # relative order of the partitions themselves.
  @spec partition_instructions([instruction]) :: [partition]
  defp partition_instructions(instructions) do
    do_partition_instructions(instructions, [])
  end

  @spec do_partition_instructions([instruction], [partition]) :: [partition]
  defp do_partition_instructions([], acc) do
    # There are no more instructions to process, the accumulator now has all the partitions, but
    # in reverse order, reverse and return it
    Enum.reverse(acc)
  end

  defp do_partition_instructions([{op, target} | rest], acc) do
    # Inspect the first instruction in the instruction list, collect all the targets with that
    # operation into a new partition.
    {partition, remaining} = do_collect_targets(op, rest, [target])

    # Recursively process any remaining instructions after prepending in the new partition into
    # the accumulator
    do_partition_instructions(remaining, [{op, partition} | acc])
  end

  @spec do_collect_targets(operation, [instruction], [ZenMonitor.destination()]) ::
          {[ZenMonitor.destination()], [instruction]}
  defp do_collect_targets(_op, [], acc) do
    # There are no more instructions to process, return the accumulator.  Note that since
    # instructions of the same operation are commutative there is no need to reverse the
    # accumulator even though the targets are in reverse order
    {acc, []}
  end

  defp do_collect_targets(op, [{op, target} | rest], acc) do
    # The next instruction matches the current operation, prepend the target into the accumulator
    # and recursively process the rest of the instructions
    do_collect_targets(op, rest, [target | acc])
  end

  defp do_collect_targets(_op, [{_other, _} | _rest] = remainder, acc) do
    # The next instruction does not match the current operations.  Similar to when there are no
    # more instructions to process, the accumulator is returned as-is.  The remaining instructions
    # (including the current instruction that didn't match) are returned for further processing.
    {acc, remainder}
  end
end