lib/zen_monitor/local.ex

defmodule ZenMonitor.Local do
  @moduledoc """
  ZenMonitor.Local

  Most of the actual logic of monitoring and fan-out is handled by `ZenMonitor.Local.Connector`,
  see that module for more information.

  `ZenMonitor.Local` is responsible for monitoring the subscribing local processes and cleaning up
  monitors if they crash.
  """
  use GenStage
  use Instruments.CustomFunctions, prefix: "zen_monitor.local"
  alias ZenMonitor.Local.{Connector, Tables}

  @typedoc """
  Effective compatibility of a remote node, this is the value returned by `compatibility/1`
  """
  @type compatibility :: :compatible | :incompatible

  @typedoc """
  Represents a future down dispatch for a given pid to be delivered by
  `ZenMonitor.Local.Dispatcher`

  Each dispatch is a two-element tuple pairing a pid with a `:DOWN` message whose reason is
  wrapped in a `{:zen_monitor, reason}` tuple.  Lists of these are accepted by `enqueue/1`.
  """
  @type down_dispatch :: {pid, {:DOWN, reference, :process, pid, {:zen_monitor, any}}}

  # Name of the ETS table created in `init/1`; `monitor/1` reads it directly to check whether the
  # calling process already has a reciprocal monitor.
  @subscribers_table Module.concat(__MODULE__, "Subscribers")
  # Default returned by `hibernation_threshold/0` when the application environment has no
  # {:zen_monitor, :hibernation_threshold} setting.
  @hibernation_threshold 1_000

  defmodule State do
    @moduledoc """
    Maintains the internal state for ZenMonitor.Local

     - `subscribers` is the ETS table that tracks local subscribers to prevent multiple monitors.
        The table is created as a named table, so this field holds the table name (an atom)
        rather than an opaque table identifier.  It is `nil` until set by `init/1`
     - `batch` is the queue of messages awaiting delivery to ZenMonitor.Local.Dispatcher
     - `length` is the current length of the batch queue (calculating queue length is an O(n)
        operation, it is simple to track it as elements are added / removed)
     - `queue_emptied` is the number of times the queue has been emptied.  Once this number
        reaches the hibernation threshold (see `ZenMonitor.Local.hibernation_threshold/0`) the
        process will hibernate and the counter starts over from zero
    """

    @type t :: %__MODULE__{
            subscribers: :ets.tab() | nil,
            length: non_neg_integer,
            queue_emptied: non_neg_integer,
            batch: :queue.queue()
          }
    defstruct [
      :subscribers,
      length: 0,
      queue_emptied: 0,
      batch: :queue.new()
    ]
  end

  ## Delegates

  # Exposes the Connector's `compatibility/1` under the name `compatibility_for_node/1`
  defdelegate compatibility_for_node(remote), to: Connector, as: :compatibility

  ## Client

  @doc """
  Starts the `ZenMonitor.Local` process, registered under the module's own name

  The options argument is accepted for convenience but is ignored.
  """
  def start_link(_opts \\ []), do: GenStage.start_link(__MODULE__, [], name: __MODULE__)

  @doc """
  Begin monitoring the given process

  Has the same semantics as `Process.monitor/1`, DOWN messages will be delivered
  at a pace controlled by the :zen_monitor, :demand_interval and
  :zen_monitor, :demand_amount environment variables

  Returns the monitor reference, which can later be handed to `demonitor/2`.
  """
  @spec monitor(target :: ZenMonitor.destination()) :: reference
  def monitor(target) do
    increment("monitor")
    subscriber = self()
    ref = make_ref()

    # Record which target this {subscriber, reference} pair points at
    :ets.insert(Tables.references(), {{subscriber, ref}, target})

    # Hand the monitor over to the Connector, which establishes it asynchronously
    Connector.monitor(target, ref, subscriber)

    # Only ask for a reciprocal monitor when this subscriber is not already being tracked
    if not :ets.member(@subscribers_table, subscriber) do
      GenStage.cast(__MODULE__, {:monitor_subscriber, subscriber})
    end

    ref
  end

  @doc """
  Stop monitoring a process by monitor reference

  Has the same semantics as `Process.demonitor/2` (although you can pass the `:info` option, it
  has no effect and is not honored, `:flush` is honored)
  To demonitor a process you should pass in the reference returned from
  `ZenMonitor.Local.monitor/1` for the given process

  Always returns `true`, whether or not the reference was known.
  """
  @spec demonitor(ref :: reference, options :: [:flush | :info]) :: true
  def demonitor(ref, options \\ []) when is_reference(ref) do
    increment("demonitor")
    me = self()

    # First consume the reference
    case :ets.take(Tables.references(), {me, ref}) do
      [] ->
        # Unknown reference, maybe it's been dispatched, consume any :DOWN messages in the inbox
        # if :flush is provided.  Dispatch atomically consumes the reference, which is why we only
        # need to scan the inbox if we don't find a reference.
        if :flush in options do
          receive do
            {:DOWN, ^ref, _, _, _} -> nil
          after
            # Never block the caller, only drop a message that has already arrived
            0 -> nil
          end
        end

      [{{^me, ^ref}, pid}] ->
        # Instruct the Connector to demonitor the monitor
        Connector.demonitor(pid, ref)
    end

    true
  end

  @doc """
  Check the compatibility of the remote node that owns the provided destination

  This is a simple convenience function that looks up the node for the destination and then
  calls `ZenMonitor.Local.compatibility_for_node/1`
  """
  @spec compatibility(target :: ZenMonitor.destination()) :: compatibility
  def compatibility(target) do
    remote = ZenMonitor.find_node(target)
    compatibility_for_node(remote)
  end

  @doc """
  Asynchronously enqueue a list of down dispatches for delivery by the Dispatcher

  An empty list is a no-op: no cast is sent and `:ok` is returned immediately.
  """
  @spec enqueue(messages :: [down_dispatch]) :: :ok
  def enqueue(messages) do
    case messages do
      [] -> :ok
      _ -> GenStage.cast(__MODULE__, {:enqueue, messages})
    end
  end

  @doc """
  Synchronously asks the ZenMonitor.Local process for the length of its internal batch
  """
  @spec batch_length() :: integer()
  def batch_length, do: GenStage.call(__MODULE__, :batch_length)

  @doc """
  Gets the hibernation threshold from the Application Environment

  Every time the demand empties the queue a counter is incremented.  When this counter reaches
  the hibernation threshold the ZenMonitor.Local process will be sent into hibernation. See
  ZenMonitor.Local's @hibernation_threshold for the default value

  This can be controlled at boot and runtime with the {:zen_monitor, :hibernation_threshold}
  setting, see ZenMonitor.Local.hibernation_threshold/1 for runtime convenience functionality.
  """
  @spec hibernation_threshold() :: integer
  def hibernation_threshold do
    case Application.fetch_env(:zen_monitor, :hibernation_threshold) do
      {:ok, configured} -> configured
      :error -> @hibernation_threshold
    end
  end

  @doc """
  Puts the hibernation threshold into the Application Environment

  Convenience wrapper that overwrites the {:zen_monitor, :hibernation_threshold} setting at
  runtime.
  """
  @spec hibernation_threshold(value :: integer) :: :ok
  def hibernation_threshold(value),
    do: Application.put_env(:zen_monitor, :hibernation_threshold, value)

  ## Server

  # Sets the process up as a GenStage producer.  The subscribers table is a named, protected
  # set: only this process writes to it, other processes read it by name.
  def init(_opts) do
    Process.flag(:message_queue_data, :off_heap)

    table_options = [:protected, :named_table, :set, read_concurrency: true]
    state = %State{subscribers: :ets.new(@subscribers_table, table_options)}

    {:producer, state}
  end

  @doc """
  Handles demand from `ZenMonitor.Local.Dispatcher`

  ZenMonitor.Local keeps a queue of pending messages destined for local processes, the actual
  dispatch of which is throttled by ZenMonitor.Local.Dispatcher.  When the Dispatcher asks for
  more messages, this handler takes up to the requested amount from the batch queue: if the
  demand covers everything that is pending the whole queue is emitted, otherwise only the first
  `demand` messages are emitted and the rest stay queued.
  """
  def handle_demand(demand, %State{length: pending} = state) when pending <= demand do
    empty_queue(state)
  end

  def handle_demand(demand, %State{} = state) do
    chunk_queue(demand, state)
  end

  # Handle a local subscriber going down
  # When a process establishes a remote monitor, ZenMonitor.Local establishes a reciprocal monitor,
  # see monitor/1 and handle_cast({:monitor_subscriber, ...}) for more information.
  # If the subscriber crashes, all of the ETS records maintained by ZenMonitor.Local and the various
  # ZenMonitor.Local.Connectors are no longer needed and will be cleaned up by this handler.
  def handle_info(
        {:DOWN, _ref, :process, subscriber, _reason},
        %State{subscribers: subscribers} = state
      ) do
    for [ref, remote_pid] <- :ets.match(Tables.references(), {{subscriber, :"$1"}, :"$2"}) do
      # Remove the reference
      :ets.delete(Tables.references(), {subscriber, ref})

      # Instruct the Connector to demonitor
      Connector.demonitor(remote_pid, ref)
    end

    # Remove the subscriber from the subscribers table
    :ets.delete(subscribers, subscriber)

    {:noreply, [], state}
  end

  # Handle any other message
  # Defining handle_info/2 above replaces the default clause provided by `use GenStage`, so
  # without this catch-all a stray message would raise a FunctionClauseError and take the
  # producer down along with its pending batch.  Unexpected messages are logged and dropped.
  def handle_info(message, state) do
    :error_logger.warning_msg(
      ~c"~p received unexpected message in handle_info/2: ~p~n",
      [__MODULE__, message]
    )

    {:noreply, [], state}
  end

  # Handles reciprocal subscriber monitoring
  # Whenever a process establishes a remote monitor, ZenMonitor.Local monitors that subscriber in
  # return so that cleanup can happen if the subscriber goes down.
  # The subscribers ETS table guarantees a single active reciprocal monitor per subscriber:
  # a monitor is only established when the subscriber was not already present in the table.
  def handle_cast({:monitor_subscriber, pid}, %State{subscribers: table} = state) do
    case :ets.insert_new(table, {pid}) do
      true -> Process.monitor(pid)
      false -> :ok
    end

    {:noreply, [], state}
  end

  # Handles enqueuing messages for eventual dispatch
  # Down dispatches generated by ZenMonitor.Local.Connector arrive here and are appended, in
  # order, to the batch queue, where they wait until ZenMonitor.Local.Dispatcher demands them.
  # The tracked length is bumped by the number of messages added.
  def handle_cast({:enqueue, messages}, %State{batch: batch, length: length} = state) do
    {new_batch, added} =
      Enum.reduce(messages, {batch, 0}, fn message, {queue, count} ->
        {:queue.in(message, queue), count + 1}
      end)

    increment("enqueue", added)

    {:noreply, [], %State{state | batch: new_batch, length: length + added}}
  end

  # Handles batch length checks
  # Replies with the tracked length of the batch, emits no events and leaves the state untouched
  def handle_call(:batch_length, _from, %State{} = state) do
    {:reply, state.length, [], state}
  end

  ## Private

  # Emits every queued message and resets the batch.  Each call counts as one "emptying"; once
  # the count reaches the hibernation threshold the counter is reset and the process is asked
  # to hibernate.
  @spec empty_queue(state :: State.t()) ::
          {:noreply, [down_dispatch], State.t()}
          | {:noreply, [down_dispatch], State.t(), :hibernate}
  defp empty_queue(%State{} = state) do
    events = :queue.to_list(state.batch)
    emptied = state.queue_emptied + 1
    drained = %State{state | batch: :queue.new(), length: 0}

    if emptied < hibernation_threshold() do
      {:noreply, events, %State{drained | queue_emptied: emptied}}
    else
      {:noreply, events, %State{drained | queue_emptied: 0}, :hibernate}
    end
  end

  # Emits the first `size` queued messages and keeps the remainder in the batch.  The caller is
  # expected to pass a `size` smaller than the current batch length.
  @spec chunk_queue(size :: integer(), state :: State.t()) ::
          {:noreply, [down_dispatch], State.t()}
  defp chunk_queue(size, %State{} = state) do
    {taken, remaining} = :queue.split(size, state.batch)
    new_state = %State{state | batch: remaining, length: state.length - size}

    {:noreply, :queue.to_list(taken), new_state}
  end
end