defmodule Dust.CallbackRegistry do
  @moduledoc """
  ETS-backed registry of callback subscriptions.

  Every subscription is stored as one row in a `:bag` table keyed on the store:

      {store, compiled_glob, pattern, worker_pid, ref, max_queue_size, on_resync, mode}

  The table is created with `new/0` and passed explicitly to every other
  function. It is not a named table, so several independent registries can
  coexist.
  """

  @default_max_queue_size 1000
  @modes [:all, :committed, :optimistic]

  @type mode :: :all | :committed | :optimistic
  @type on_resync :: (map() -> term()) | nil

  @doc """
  Creates a new, empty registry table and returns its ETS identifier.

  The table is a `:public` `:bag`, so any process may read or write it and a
  single store may hold many subscriptions.
  """
  @spec new() :: :ets.tab()
  def new do
    :ets.new(:dust_callbacks, [:bag, :public])
  end

  @doc """
  Register a callback for a store/pattern with options.

  Returns the reference that identifies the new subscription; pass it to
  `unregister/2` or `lookup/2`.

  ## Options

    * `:max_queue_size` - maximum pending events before the subscription is
      dropped (default: #{@default_max_queue_size})
    * `:on_resync` - callback invoked with `%{error: :resync_required, ref: ref}`
      when the subscription is dropped due to backpressure
    * `:mode` - which events to deliver to this subscription:
      * `:all` (default) - optimistic for own writes, committed for others'
        (the historical SDK behaviour: exactly one event per write).
      * `:committed` - only committed events, including the server echo of
        your own writes (so the callback always sees a `store_seq`).
      * `:optimistic` - only the optimistic local events; never the
        committed echoes.

  Each registration spawns a `Dust.CallbackWorker` process that receives events
  asynchronously. The dispatcher checks the worker's mailbox length before
  sending; if it exceeds `max_queue_size`, the subscription is unregistered and
  `on_resync` fires.

  Raises `ArgumentError` when `:mode` is not one of the allowed values, and
  `MatchError` when the pattern cannot be compiled or the worker cannot be
  started.
  """
  @spec register(:ets.tab(), term(), term(), (term() -> term()), keyword()) :: reference()
  def register(table, store, pattern, callback, opts \\ []) when is_function(callback, 1) do
    ref = make_ref()
    {:ok, compiled} = Dust.Protocol.Glob.compile(pattern)

    max_queue_size = Keyword.get(opts, :max_queue_size, @default_max_queue_size)
    on_resync = Keyword.get(opts, :on_resync)
    mode = Keyword.get(opts, :mode, :all)

    # Reject unknown modes before a worker is spawned so that a bad option
    # never leaves an orphaned process behind.
    if mode not in @modes do
      raise ArgumentError,
            "invalid :mode #{inspect(mode)} (allowed: :all, :committed, :optimistic)"
    end

    {:ok, worker_pid} =
      Dust.CallbackWorker.start(
        callback: callback,
        ref: ref,
        max_queue_size: max_queue_size,
        on_resync: on_resync
      )

    :ets.insert(
      table,
      {store, compiled, pattern, worker_pid, ref, max_queue_size, on_resync, mode}
    )

    ref
  end

  @doc """
  Removes the subscription identified by `ref` and kills its worker process.

  Always returns `:ok`, including when no subscription with that ref exists or
  when its worker has already exited.
  """
  @spec unregister(:ets.tab(), reference()) :: :ok
  def unregister(table, ref) do
    pattern = ref_pattern(ref)

    # Capture the rows first: the worker pid is only recorded in the table.
    rows = :ets.match_object(table, pattern)

    # Remove the rows before stopping the workers so that a concurrent
    # `match/3` never hands out a subscription whose worker is already dead.
    :ets.match_delete(table, pattern)

    # `Process.exit/2` is a no-op for a pid that is no longer alive, so no
    # `Process.alive?/1` pre-check is needed (it would be racy anyway and it
    # raises for non-local pids).
    Enum.each(rows, fn {_store, _compiled, _pattern, worker_pid, _ref, _max, _resync, _mode} ->
      Process.exit(worker_pid, :kill)
    end)

    :ok
  end

  @doc """
  Look up a registered subscription by ref. Returns
  `{worker_pid, max_queue_size, on_resync, mode}` or `nil` if no subscription
  with that ref exists.
  """
  @spec lookup(:ets.tab(), reference()) :: {pid(), term(), on_resync(), mode()} | nil
  def lookup(table, ref) do
    case :ets.match_object(table, ref_pattern(ref)) do
      [{_store, _compiled, _pattern, worker_pid, ^ref, max_queue_size, on_resync, mode}] ->
        {worker_pid, max_queue_size, on_resync, mode}

      [] ->
        nil
    end
  end

  @doc """
  Find all matching subscriptions for a store/path. Returns a list of
  `{worker_pid, ref, max_queue_size, on_resync, mode}` tuples.

  Returns `[]` when `path` cannot be parsed by
  `Dust.Protocol.Path.parse_rendered/1`.
  """
  @spec match(:ets.tab(), term(), term()) :: [{pid(), reference(), term(), on_resync(), mode()}]
  def match(table, store, path) do
    case Dust.Protocol.Path.parse_rendered(path) do
      {:ok, path_segments} ->
        # Single pass over the store's rows: keep those whose compiled glob
        # matches and project them to the dispatcher-facing tuple.
        for {_store, compiled, _pattern, pid, ref, max, on_resync, mode} <-
              :ets.lookup(table, store),
            Dust.Protocol.Glob.match?(compiled, path_segments) do
          {pid, ref, max, on_resync, mode}
        end

      _ ->
        []
    end
  end

  # Match pattern selecting every row that belongs to the given subscription ref.
  defp ref_pattern(ref), do: {:_, :_, :_, :_, ref, :_, :_, :_}
end