Skip to main content

lib/tidefall/queue.ex

defmodule Tidefall.Queue do
  @moduledoc """
  ETS-based queue buffer for high-throughput data processing.

  `Tidefall.Queue` buffers arbitrary data in insertion order and
  periodically processes it using a configurable processor function.
  The queue is **drain-only**: there is no `pop`/`dequeue`. Producers
  `push/3` items, and the engine drains the buffer to the processor on
  each processing tick.
  It implements partitioning to reduce lock contention during high-throughput
  writes, and uses double-buffering to ensure zero-downtime processing.

  ## Data Flow

  ```asciidoc
  push(buffer, items)
         |
         v
  +-------------------+
  | Partition Routing |
  | phash2(item, N)   |
  +-------------------+
     |         |         |
     v         v         v
  +-------+ +-------+ +-------+     ETS :ordered_set
  | P 0   | | P 1   | | P N-1 |     Key: {sort_key, ref}
  +-------+ +-------+ +-------+     Val: item
     |         |         |
     v         v         v
  +--------------------------------------+
  | processor(batch)                     |
  | batch = [val1, val2, ...]            |
  +--------------------------------------+
  ```

  Items are routed to partitions via `phash2`, stored in
  `:ordered_set` ETS tables keyed by `{sort_key, ref}`, and
  periodically flushed to the processor in batches. `sort_key`
  is `System.monotonic_time()` by default — giving insertion-time
  order — or the term produced by the `:sort_key` runtime option;
  `ref` keeps every key unique so no item is ever overwritten.
  Ordering is **per partition** (see `:sort_key` under runtime options).

  ## Start options

  #{Tidefall.Buffer.Options.start_options_docs()}

  ## Runtime options

  #{Tidefall.Queue.Options.runtime_options_docs()}

  ## Examples

  ### Standalone Usage

      # Start a queue buffer with a custom processor
      iex> {:ok, _sup_pid} =
      ...>   Tidefall.Queue.start_link(
      ...>     name: :my_buffer,
      ...>     processor: fn batch -> IO.inspect(batch) end
      ...>   )

      # Push a single item into the buffer
      iex> Tidefall.Queue.push(:my_buffer, "item1")
      :ok

      # Push a batch of items
      iex> Tidefall.Queue.push(:my_buffer, ["item2", "item3"])
      :ok

      # Check buffer size
      iex> Tidefall.Queue.size(:my_buffer)
      3

      # Stop the buffer gracefully (processes remaining items)
      iex> Tidefall.Queue.stop(:my_buffer)
      :ok

  ### Adding to a Supervision Tree

      children = [
        {Tidefall.Queue,
         name: :my_buffer,
         processor: &MyApp.EventProcessor.process_batch/1}
      ]

      Supervisor.start_link(children, strategy: :one_for_one)

  ## Defining a buffer module

  For the recommended **module-based** pattern — `use Tidefall.Queue`,
  where the module name becomes the default instance and start options
  layer across compile-time `use` opts, the application environment, and
  explicit opts — see the
  [Module-based buffers](`m:Tidefall#module-module-based-buffers-recommended`)
  section of `Tidefall`.

  ## Processor

  The processor function receives a list of values
  (the items pushed to the buffer):

      fn batch ->
        # batch is [value1, value2, ...]
        Enum.each(batch, fn value -> process(value) end)
      end

  See [The processor](`m:Tidefall#module-the-processor`) for when it runs,
  batching, failure isolation, and shutdown-drain behavior.

  """

  @behaviour Tidefall.Buffer

  import Record, only: [defrecordp: 2]

  alias Tidefall.Buffer
  alias Tidefall.Buffer.{Definition, Partition}
  alias Tidefall.Queue.Options

  # Queue-specific key record.
  #
  # `sort_key` is the primary ordering term — `System.monotonic_time()`
  # by default (insertion order), or whatever the `:sort_key` runtime
  # option resolves to. `ref` is always retained as the uniqueness
  # tiebreaker: since `make_ref()` is unique, distinct items never
  # collide in the `:ordered_set`, so nothing is overwritten even when
  # two items share the same `sort_key`.
  defrecordp(:key, sort_key: nil, ref: nil)

  # Entry record stored in ETS. Queue only needs key/value; the
  # match spec returns just the value to the processor.
  defrecordp(:entry, key: nil, value: nil)

  @typedoc "Any term that will be buffered and processed"
  @type item() :: any()

  @typedoc "Proxy type for a buffer"
  @type buffer() :: Tidefall.Buffer.buffer()

  ## Definition module

  @doc false
  defmacro __using__(opts) do
    # Public operations delegated to the definition module. Each entry is
    # `{name, leading_params, min_optional, max_optional}` — `leading_params`
    # counts the required non-buffer/non-opts params, the optional window
    # drives the distinct nameless arities. See `Tidefall.Buffer.Definition`.
    ops = [
      {:push, 1, 0, 1},
      {:size, 0, 0, 0},
      {:update_options, 1, 0, 0},
      {:stop, 0, 0, 2}
    ]

    Definition.define(__MODULE__, ops, opts)
  end

  ## API

  @doc """
  Starts a new queue buffer.

  ## Options

  See [start options](`m:Tidefall.Queue#module-start-options`).

  ## Examples

      Tidefall.Queue.start_link(
        name: :my_queue_buffer,
        processor: &MyApp.Sink.process/1
      )

  """
  @spec start_link(keyword()) :: Supervisor.on_start()
  def start_link(opts \\ []) do
    opts
    |> Keyword.put(:module, __MODULE__)
    |> Buffer.start_link()
  end

  @doc """
  Stops a queue buffer gracefully.

  ## Examples

      Tidefall.Queue.stop(:my_queue_buffer)

  """
  @spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok
  defdelegate stop(buffer, reason \\ :normal, timeout \\ :infinity), to: Buffer

  @doc """
  Returns the queue buffer child spec.
  """
  @spec child_spec(keyword()) :: Supervisor.child_spec()
  def child_spec(opts) do
    %{
      id: opts[:name] || __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc """
  Pushes an item or a batch of items into the buffer.

  ## Parameters

    * `buffer` - The buffer name (atom).
    * `item_or_batch` - A single item or a list of items to push.
    * `opts` - Optional runtime options.

  ## Options

  See [runtime options](`m:Tidefall.Queue#module-runtime-options`).

  ## Examples

      # Simple push with default routing
      push(:my_buffer, "item1")
      push(:my_buffer, ["item2", "item3"])

      # Custom partition routing using function
      push(:my_buffer, user_event, partition_key: &(&1.user_id))

      # Custom partition routing using MFA tuple (item prepended to args)
      push(:my_buffer, event, partition_key: {MyApp.Router, :get_partition, []})

      # Custom partition routing with fixed key (all items to same partition)
      push(:my_buffer, log_entry, partition_key: :logs)

      # Custom ordering: drain by a value-derived sort key (per partition)
      push(:my_buffer, event, sort_key: & &1.priority)

  """
  @spec push(buffer(), item() | [item()], keyword()) :: :ok
  def push(buffer, item_or_batch, opts \\ [])

  def push(buffer, batch, opts) when is_list(batch) do
    opts = Options.validate_runtime_options!(opts)
    partition_key = Keyword.fetch!(opts, :partition_key)
    sort_key = Keyword.get(opts, :sort_key)

    batch
    |> Enum.group_by(&Buffer.get_partition(buffer, partition_key, &1))
    |> Enum.each(fn {partition, items} ->
      partition
      |> Partition.current_table()
      |> :ets.insert(Enum.map(items, &new_entry(build_key(sort_key, &1), &1)))
    end)
  end

  def push(buffer, item, opts) do
    push(buffer, [item], opts)
  end

  @doc """
  Returns the queue size (total number of items across all partitions).

  ## Examples

      size(:my_buffer)

  """
  @spec size(buffer()) :: non_neg_integer()
  defdelegate size(buffer), to: Buffer, as: :buffer_size

  @doc """
  Updates the options for the queue buffer.

  ## Options

  Updatable options: `:processing_interval`, `:processing_timeout`,
  `:processing_batch_size`, `:drain_threshold`, `:drain_check_interval`. See
  [start options](`m:Tidefall.Queue#module-start-options`) for each option's
  semantics.

  ## Examples

      # Update the processing interval to 100ms
      update_options(:my_buffer, processing_interval: 100)

  """
  @spec update_options(buffer(), keyword()) :: :ok
  defdelegate update_options(buffer, opts), to: Buffer

  ## Callbacks

  @impl Tidefall.Buffer
  def ets_table_opts do
    [
      :ordered_set,
      :public,
      keypos: entry(:key) + 1,
      write_concurrency: true,
      decentralized_counters: true
    ]
  end

  @impl Tidefall.Buffer
  def ets_match_spec do
    [
      {
        entry(key: :"$1", value: :"$2"),
        [true],
        [:"$2"]
      }
    ]
  end

  ## Private functions

  # Iniline common instructions
  @compile inline: [new_entry: 2]

  # Default (no `:sort_key`): order by insertion time.
  defp build_key(nil, _item) do
    key(sort_key: System.monotonic_time(), ref: make_ref())
  end

  # Arity-1: derive the sort term from the item.
  defp build_key(fun, item) when is_function(fun, 1) do
    key(sort_key: fun.(item), ref: make_ref())
  end

  # Arity-0: generate the sort term at push time.
  defp build_key(fun, _item) when is_function(fun, 0) do
    key(sort_key: fun.(), ref: make_ref())
  end

  defp new_entry(key, value), do: entry(key: key, value: value)
end