# lib/dust/callback_registry.ex

defmodule Dust.CallbackRegistry do
  @moduledoc """
  ETS-backed registry of event subscriptions.

  Every subscription is stored as one row of a `:bag` table, keyed by store:

      {store, compiled_pattern, pattern, worker_pid, ref, max_queue_size, on_resync, mode}

  The `ref` (created with `make_ref/0` at registration time) identifies a single
  subscription and is what `unregister/2` and `lookup/2` search by.
  """

  @default_max_queue_size 1000
  @modes [:all, :committed, :optimistic]

  @type mode :: :all | :committed | :optimistic

  @doc """
  Creates the ETS table that backs the registry and returns its identifier.

  The table is a `:public` `:bag`. It is not a named table, so the returned
  identifier must be passed to every other function in this module.
  """
  @spec new() :: :ets.tab()
  def new do
    :ets.new(:dust_callbacks, [:bag, :public])
  end

  @doc """
  Register a callback for a store/pattern with options.

  Options:
    - `:max_queue_size` - maximum pending events before the subscription is
      dropped (default: 1000)
    - `:on_resync` - callback invoked with `%{error: :resync_required, ref: ref}`
      when the subscription is dropped due to backpressure
    - `:mode` - which events to deliver to this subscription:
        - `:all` (default) - optimistic for own writes, committed for others'
          (the historical SDK behaviour: exactly one event per write).
        - `:committed` - only committed events, including the server echo of
          your own writes (so the callback always sees a `store_seq`).
        - `:optimistic` - only the optimistic local events; never the
          committed echoes.

  Each registration spawns a `Dust.CallbackWorker` process that receives events
  asynchronously. The dispatcher checks the worker's mailbox length before
  sending; if it exceeds `max_queue_size`, the subscription is unregistered and
  `on_resync` fires.

  Returns the reference identifying the new subscription.

  Raises `ArgumentError` when `:mode` is not one of the allowed values. This is
  checked first, before the pattern is compiled or a worker is started. Raises
  `MatchError` when the pattern does not compile to `{:ok, compiled}` or the
  worker does not start with `{:ok, pid}`.
  """
  @spec register(:ets.tab(), term(), term(), (term() -> any()), keyword()) :: reference()
  def register(table, store, pattern, callback, opts \\ []) when is_function(callback, 1) do
    mode = Keyword.get(opts, :mode, :all)

    # Reject bad options up front so an invalid call does no work at all.
    if mode not in @modes do
      raise ArgumentError,
            "invalid :mode #{inspect(mode)} (allowed: :all, :committed, :optimistic)"
    end

    {:ok, compiled} = Dust.Protocol.Glob.compile(pattern)
    max_queue_size = Keyword.get(opts, :max_queue_size, @default_max_queue_size)
    on_resync = Keyword.get(opts, :on_resync)
    ref = make_ref()

    {:ok, worker_pid} =
      Dust.CallbackWorker.start(
        callback: callback,
        ref: ref,
        max_queue_size: max_queue_size,
        on_resync: on_resync
      )

    :ets.insert(
      table,
      {store, compiled, pattern, worker_pid, ref, max_queue_size, on_resync, mode}
    )

    ref
  end

  @doc """
  Removes the subscription identified by `ref` and kills its worker process.

  Always returns `:ok`, including when no subscription with that ref exists.
  """
  @spec unregister(:ets.tab(), reference()) :: :ok
  def unregister(table, ref) do
    row_pattern = row_pattern(ref)

    # Read the rows before deleting them so the worker pids are still known.
    entries = :ets.match_object(table, row_pattern)

    # Delete before killing: once the row is gone, `match/3` and `lookup/2`
    # can no longer return a pid that this call is about to kill.
    :ets.match_delete(table, row_pattern)

    # `Process.exit/2` is a no-op for an already-dead process, so no
    # `Process.alive?/1` pre-check is needed.
    Enum.each(entries, fn {_, _, _, worker_pid, _, _, _, _} ->
      Process.exit(worker_pid, :kill)
    end)

    :ok
  end

  @doc """
  Look up a registered subscription by ref. Returns
  `{worker_pid, max_queue_size, on_resync, mode}` or `nil` if no subscription
  with that ref exists.
  """
  @spec lookup(:ets.tab(), reference()) :: {pid(), term(), term(), mode()} | nil
  def lookup(table, ref) do
    case :ets.match_object(table, row_pattern(ref)) do
      [{_store, _compiled, _pattern, worker_pid, ^ref, max_queue_size, on_resync, mode}] ->
        {worker_pid, max_queue_size, on_resync, mode}

      [] ->
        nil
    end
  end

  @doc """
  Find all matching subscriptions for a store/path. Returns a list of
  `{worker_pid, ref, max_queue_size, on_resync, mode}` tuples.

  Returns `[]` when `path` cannot be parsed by
  `Dust.Protocol.Path.parse_rendered/1`.
  """
  @spec match(:ets.tab(), term(), term()) :: [{pid(), reference(), term(), term(), mode()}]
  def match(table, store, path) do
    case Dust.Protocol.Path.parse_rendered(path) do
      {:ok, path_segments} ->
        table
        |> :ets.lookup(store)
        |> Enum.filter(fn {_store, compiled, _pattern, _pid, _ref, _max, _resync, _mode} ->
          Dust.Protocol.Glob.match?(compiled, path_segments)
        end)
        |> Enum.map(fn {_store, _compiled, _pattern, pid, ref, max, on_resync, mode} ->
          {pid, ref, max, on_resync, mode}
        end)

      _ ->
        []
    end
  end

  # ETS match pattern selecting every row whose subscription ref is `ref`.
  defp row_pattern(ref), do: {:_, :_, :_, :_, ref, :_, :_, :_}
end