Skip to main content

lib/dust/callback_worker.ex

defmodule Dust.CallbackWorker do
  @moduledoc """
  A GenServer that owns one subscription's callback and runs it for each event.

  Events arrive as casts (see `dispatch/2`) and are handled one at a time, in
  mailbox order, by invoking the stored callback with the event.

  Back-pressure is not enforced inside this module. The worker only stores
  `max_queue_size` and `on_resync` in its state; the SyncEngine is expected to
  read the mailbox depth (see `queue_len/1`, which wraps
  `Process.info(pid, :message_queue_len)`) before dispatching. When the depth
  exceeds `max_queue_size`, the subscription is dropped and the `on_resync`
  callback fires with `%{error: :resync_required}`.

  ## Options

    * `:callback` - required; one-argument function invoked with each event.
    * `:ref` - required; stored in the worker state as given.
    * `:max_queue_size` - optional; defaults to `1000`.
    * `:on_resync` - optional; defaults to `nil`.
  """

  use GenServer

  defstruct [:callback, :ref, :max_queue_size, :on_resync]

  @type t :: %__MODULE__{
          callback: (term() -> term()),
          ref: term(),
          max_queue_size: term(),
          on_resync: term()
        }

  # Client API

  @doc "Start a worker linked to the caller."
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @doc "Start a worker that is not linked to the caller."
  @spec start(keyword()) :: GenServer.on_start()
  def start(opts), do: GenServer.start(__MODULE__, opts)

  @doc """
  Hand an event to the worker without waiting for the callback to run.

  Intended to be called by the SyncEngine dispatcher.
  """
  @spec dispatch(GenServer.server(), term()) :: :ok
  def dispatch(pid, event), do: GenServer.cast(pid, {:event, event})

  @doc """
  Return how many messages are waiting in the worker's mailbox.

  Returns `0` when `Process.info/2` has no information for `pid`, which is the
  case once the process is no longer alive.
  """
  @spec queue_len(pid()) :: non_neg_integer()
  def queue_len(pid) do
    pid
    |> Process.info(:message_queue_len)
    |> pending_count()
  end

  # Server callbacks

  @impl true
  def init(opts) do
    # The required options are fetched first, in this order, so a missing
    # `:callback` is reported before a missing `:ref`.
    callback = Keyword.fetch!(opts, :callback)
    ref = Keyword.fetch!(opts, :ref)
    limit = Keyword.get(opts, :max_queue_size, 1000)
    resync = Keyword.get(opts, :on_resync)

    {:ok,
     %__MODULE__{
       callback: callback,
       ref: ref,
       max_queue_size: limit,
       on_resync: resync
     }}
  end

  @impl true
  def handle_cast({:event, event}, state) do
    # The callback's return value is ignored; if it raises, the worker crashes.
    notify = state.callback
    notify.(event)

    {:noreply, state}
  end

  # Helpers

  # Translates the result of `Process.info(pid, :message_queue_len)` into a count.
  defp pending_count({:message_queue_len, count}), do: count
  defp pending_count(nil), do: 0
end