defmodule Tidefall.Buffer do
@moduledoc """
Buffer operations and behaviour.
This module is the public interface for buffer-level concerns:
* Starting and stopping buffers
* Total size across partitions
* Mutating runtime options
* Locating the partition for a routing key
* The behaviour contract every buffer implementation must satisfy
End-user code typically goes through the buffer-type module
(`Tidefall.Queue`, `Tidefall.HashMap`), which delegates the shared
operations here.
## Start options
#{Tidefall.Buffer.Options.start_options_docs()}
## Runtime options
The following runtime options are shared by `Tidefall.Queue` and
`Tidefall.HashMap`:
#{Tidefall.Buffer.Options.runtime_options_docs()}
Each buffer type may accept additional runtime options of its own —
see `Tidefall.Queue` (e.g. `:sort_key`) and `Tidefall.HashMap`
(e.g. `:key_hasher`) for their full option docs.
"""
alias Tidefall.Buffer.{Options, Partition}
@typedoc "Buffer name"
@type buffer() :: atom()
## Callbacks
@doc """
Returns the list of options passed verbatim to `:ets.new/2` when
the partition creates one of its two backing tables.
The list must include the ETS table type
(`:set` / `:ordered_set` / `:bag` / `:duplicate_bag`), the
`:keypos`, and any concurrency / access knobs the impl wants.
The partition does not augment or rewrite this list; what the
impl returns is exactly what `:ets.new/2` gets.
"""
@callback ets_table_opts() :: [atom() | {atom(), any()}]
@doc """
Returns the match spec used by the processing task when it drains
the swapped table via `:ets.select/3`. The spec determines the
shape of each element handed to the processor.
"""
@callback ets_match_spec() :: :ets.match_spec()
## API
@doc """
Starts a new buffer.
> #### Prefer implementation-specific functions {: .tip}
>
> It is recommended to use `Tidefall.Queue.start_link/1` or
> `Tidefall.HashMap.start_link/1` instead, as they automatically
> set the `:module` option for you.
## Examples
iex> Tidefall.Buffer.start_link(
...> module: Tidefall.Queue,
...> name: :my_buffer
...> )
{:ok, #PID<0.123.0>}
> Notice that the `:module` option must be set to `Tidefall.Queue` or
> `Tidefall.HashMap`.
"""
@spec start_link(keyword()) :: Supervisor.on_start()
defdelegate start_link(opts), to: Tidefall.Buffer.Supervisor
@doc """
Stops a buffer gracefully.
## Examples
iex> Tidefall.Buffer.stop(:my_buffer)
:ok
"""
@spec stop(buffer() | pid(), reason :: any(), timeout()) :: :ok
def stop(buffer, reason, timeout)
def stop(buffer, reason, timeout) when is_atom(buffer) do
[buffer, Supervisor]
|> Module.concat()
|> Supervisor.stop(reason, timeout)
end
def stop(buffer, reason, timeout) when is_pid(buffer) do
Supervisor.stop(buffer, reason, timeout)
end
@doc """
Returns the buffer size (total number of messages across all partitions).
Exposed as `size/1` on `Tidefall.Queue` and `Tidefall.HashMap` — most
code calls those rather than `buffer_size/1` directly.
## Examples
iex> Tidefall.Buffer.buffer_size(:my_buffer)
10
"""
@spec buffer_size(buffer()) :: non_neg_integer()
def buffer_size(buffer) do
buffer
|> lookup()
|> Enum.map(&Partition.buffer_size(elem(&1, 1)))
|> Enum.sum()
end
@doc """
Updates the options for the buffer.
## Examples
iex> Tidefall.Buffer.update_options(:my_buffer, processing_interval: 1000)
:ok
> Notice that the options are updated for all partitions of the buffer.
"""
@spec update_options(buffer(), keyword()) :: :ok
def update_options(buffer, opts) do
# `validate_update_options!` type-checks AND injects defaults for omitted
# keys; keep only the keys the caller actually supplied so a partial update
# changes just those options instead of resetting the rest to defaults.
changes =
opts
|> Options.validate_update_options!()
|> Keyword.take(Keyword.keys(opts))
buffer
|> lookup()
|> Enum.each(&Partition.update_options(elem(&1, 0), changes))
end
## Shared routing helpers (used by Queue, HashMap, etc.)
@doc """
Returns the partition based on the given arguments.
"""
@spec get_partition(buffer(), any(), any()) :: atom()
def get_partition(buffer, partition_key, object) do
key = partition_key(partition_key, object)
case lookup(buffer) do
[] ->
raise "no partitions available for buffer #{inspect(buffer)}. " <>
"The buffer is not running, possibly because it is not " <>
"started or does not exist"
partitions ->
partitions
|> Enum.at(:erlang.phash2(key, length(partitions)))
|> elem(1)
end
end
## Private functions
@compile inline: [lookup: 1]
defp lookup(buffer) do
Registry.lookup(Tidefall.Registry, buffer)
end
# Compute the partition key
defp partition_key(partition_key, object)
# The partition key is not provided, use the message hash as the key
defp partition_key(nil, object) do
:erlang.phash2(object)
end
# The partition key is a function, apply it to the message
defp partition_key(partition_key, object) when is_function(partition_key, 1) do
partition_key.(object)
end
# The partition key is an MFA tuple, apply it (the message is prepended to the args)
defp partition_key({m, f, a}, object) when is_atom(m) and is_atom(f) and is_list(a) do
apply(m, f, [object | a])
end
# The partition key is a static value, return it
defp partition_key(partition_key, _object) do
partition_key
end
end