defmodule Tidefall do
@moduledoc """
ETS-backed buffer for Elixir — accumulate writes, drain in periodic batches.
Tidefall accumulates data in partitioned ETS tables and drains it
to a user-supplied processor function on a fixed interval. It is
inspired by the
[OpenTelemetry Batch Processor][otel_batch_processor], generalised
as a reusable library.
Concrete buffer types:
* `Tidefall.Queue` — Insertion-ordered buffer (`:ordered_set` ETS).
* `Tidefall.HashMap` — Coalescing key-value buffer (`:set` ETS;
last-write-wins, optional version-aware conflict resolution).
See `Tidefall.Buffer` for the buffer-operations API (start, stop,
size, options, partition routing) and the behaviour callbacks
every implementation must satisfy.
[otel_batch_processor]: https://github.com/open-telemetry/opentelemetry-erlang/blob/main/apps/opentelemetry/src/otel_batch_processor.erl
## On this page
* [Quick start](#module-quick-start) — start a buffer in a few lines
* [The processor](#module-the-processor) — when it runs, batching, failure, drain
* [Choosing a buffer type](#module-choosing-a-buffer-type) — Queue vs. HashMap
* [Module-based buffers](#module-module-based-buffers-recommended) — the recommended pattern
* [Direct usage](#module-direct-usage-quick-dynamic) — quick or fully dynamic instances
* [Configuration](#module-configuration) — config file and supervision tree
* [Testing](#module-testing) — testing code that writes to a buffer
* [Architecture](#module-architecture) — supervision tree and partitions
* [Application configuration](#module-application-configuration) — `:tidefall` app env
* [Telemetry](#module-telemetry) — emitted events
## Quick start
Start a buffer with a processor, push some items, and let the engine
drain them on the next tick:
# Start a queue buffer with a processor
iex> {:ok, _pid} =
...> Tidefall.Queue.start_link(
...> name: :my_queue,
...> processor: fn batch -> IO.inspect(batch, label: "batch") end
...> )
# Push items — single or in bulk
iex> Tidefall.Queue.push(:my_queue, "event-1")
:ok
iex> Tidefall.Queue.push(:my_queue, ["event-2", "event-3"])
:ok
The buffer is **drain-only** — producers write, the engine drains; there
is no read-back queue.
## The processor
The processor is a function of arity 1 (`fn batch -> ... end` /
`&Mod.fun/1`) or an MFA, invoked on each processing tick with the
accumulated batch. What to know before relying on it:
* **Timing** — a tick runs every `:processing_interval` milliseconds
(default `5_000`).
* **Batch size** — the processor is called with up to
`:processing_batch_size` items (default `10`), so a tick holding more
items than that invokes the processor **multiple times**. Raise
`:processing_batch_size`, or set it to `:table` to receive the whole
buffer in a single call.
* **Return value** — discarded. The processor runs for its side effects
(export, persist, forward).
* **Failure isolation** — it runs in a task **unlinked** from the
partition, so a processor that raises or exceeds `:processing_timeout`
does not crash the buffer. That batch is dropped (its table was
already swapped out) and the
`[:tidefall, :partition, :processing, :exception]` and
`[:tidefall, :partition, :processing_failed]` telemetry events fire.
* **Shutdown drain** — on graceful `stop/3`, each partition runs a final
tick so buffered items are processed before the buffer terminates.
The batch **shape** differs by buffer type — a list of values for
`Tidefall.Queue`, a list of `t:Tidefall.HashMap.Entry.t/0` structs for
`Tidefall.HashMap`. See each module for details.
## Choosing a buffer type
Both buffer types share the same lifecycle, partitioning, and processor
contract; they differ in **what survives to the next tick**.
* **`Tidefall.Queue`** — every pushed item is buffered in insertion
order and delivered to the processor. Reach for it when items are
independent and all of them matter: event/log/metric forwarding,
span export, batch writes to a sink.
* **`Tidefall.HashMap`** — writes key into an entity, and same-key
writes coalesce so only the latest value per key survives to the
next tick. Reach for it when you care about the current state of a
key, not every write: state snapshots, change deduplication,
counters. Use `Tidefall.HashMap.put_newer/4` when conflict
resolution must respect an explicit version (newer version wins).
## Module-based buffers (recommended)
The recommended way to use a buffer is to define a dedicated module with
`use Tidefall.Queue` or `use Tidefall.HashMap`. The module name becomes
the default instance name, and start options are layered from
compile-time `use` opts, the application environment, and explicit
`start_link`/child-spec opts (in that order of increasing precedence):
defmodule MyApp.EventQueue do
use Tidefall.Queue, otp_app: :my_app
end
defmodule MyApp.StateMap do
use Tidefall.HashMap, otp_app: :my_app
end
Add them to your supervision tree and call the generated functions on the
default instance (named after the module). Each buffer still needs a
`:processor` — supply it in the child spec, the `use` opts, or the
application environment (see [Configuration](#module-configuration)):
children = [
{MyApp.EventQueue, processor: &MyApp.Sink.export/1},
{MyApp.StateMap, processor: &MyApp.Sink.export/1}
]
:ok = MyApp.EventQueue.push(event)
:ok = MyApp.EventQueue.push(event, partition_key: 1)
:ok = MyApp.StateMap.put(key, value)
:ok = MyApp.StateMap.put_newer(key, value, version: v)
The generated functions come in two forms:
* **Default instance** — nameless variants operating on the buffer
named after the module, e.g. `MyApp.StateMap.put(key, value)`.
* **Explicit instance** — one full-arity variant that takes the instance
name first, with every argument explicit (including the trailing
options), e.g. `MyApp.StateMap.put(:tenant_a, key, value, [])`.
Use the explicit form to address a **dynamically started instance** of
the same definition:
{:ok, _} = MyApp.StateMap.start_link(name: :tenant_a)
:ok = MyApp.StateMap.put(:tenant_a, key, value, [])
> #### Named instances require the full arity {: .warning}
>
> A named-instance call must pass **every** argument, including the
> trailing options. An intermediate-arity call that puts the instance
> name first — e.g. `MyApp.StateMap.get(:tenant_a, key)` or
> `MyApp.EventQueue.stop(:tenant_a)` — matches a *nameless* arity and
> silently operates on the **default** instance (binding `:tenant_a` as
> the key or reason), with no error. Always use the full-arity form
> shown above to address a named instance.
## Direct usage (quick / dynamic)
For quick experiments or fully dynamic instances, the buffer types can
be used directly with a runtime `:name` — no definition module required:
{:ok, _pid} =
Tidefall.Queue.start_link(
name: :my_queue,
processor: &MyApp.Sink.process/1
)
:ok = Tidefall.Queue.push(:my_queue, event)
## Configuration
Start options can be supplied wherever a buffer is started. Two common
setups:
**Via the supervision tree** — pass options inline in the child spec.
This works for both definition modules and direct usage:
children = [
# definition module (options layered over its `use`/app-env opts)
{MyApp.EventQueue, processing_interval: 1_000},
# direct usage (runtime name)
{Tidefall.HashMap,
name: :state_map,
processor: &MyApp.StateProcessor.process_batch/1,
partitions: 4}
]
Supervisor.start_link(children, strategy: :one_for_one)
**Via the application environment** — for definition modules with an
`:otp_app`, options can live in `config/runtime.exs` and are read at
start time:
# config/runtime.exs
import Config
config :my_app, MyApp.StateMap,
processor: &MyApp.Sink.process/1,
partitions: 4
> #### `:otp_app` is required for the config-file layer {: .warning}
>
> The application-environment layer is only consulted when the
> definition module was declared with `use Tidefall.Queue,
> otp_app: :my_app`. Without `:otp_app`, starting the buffer raises —
> the env layer is not silently skipped. Direct usage (runtime `:name`)
> does not read the application environment at all; pass its options
> explicitly.
See `Tidefall.Queue` and `Tidefall.HashMap` for the full list of start
and runtime options.
## Testing
Buffers process asynchronously on a timer, so tests should not depend on
wall-clock timing:
* Start the buffer with a short `:processing_interval` and a processor
that sends to the test process, then `assert_receive`:
processor: fn batch -> send(self(), {:batch, batch}) end
* Or push items and call `stop/3` — the shutdown drain processes
buffered items synchronously before it returns.
Start buffers per test (for example with `start_supervised!/1`) so state
does not leak between tests.
## Architecture
```asciidoc
[Tidefall.Supervisor] (application root)
|
+-----------------+-----------------+
| | |
[Tidefall.Metadata] [Tidefall.Registry] per-buffer trees:
v
[Tidefall.Buffer.Supervisor]
|
+-----------+----------------------+
| |
[Task.Supervisor] [Tidefall.Buffer.Partition.Supervisor]
|
+------------------+----------------------+
| | |
[Partition 0] [Partition 1] ... [Partition N-1]
| | |
[ETS tid] [ETS tid] [ETS tid]
```
Each buffer write routes to a partition via `:erlang.phash2/2`, and each
partition double-buffers its ETS table so processing swaps in a fresh
table with zero downtime. Per-type data flow lives in the
`Tidefall.Queue` and `Tidefall.HashMap` docs.
Two independent knobs control partitioning: the per-buffer `:partitions`
start option (how many partitions a single buffer spreads its writes
across; default `System.schedulers_online()`) and the app-level
`:registry_partitions` (see
[Application configuration](#module-application-configuration)), which
sizes the shared registry every buffer consults on each write. Raise
`:partitions` when one buffer is write-bound; raise `:registry_partitions`
when many buffers or very high aggregate write throughput make the shared
registry the bottleneck.
## Application configuration
These options are read from the `:tidefall` application environment at
startup (set them in `config/config.exs` or `config/runtime.exs`). They
configure the library as a whole — distinct from the per-buffer start
options above:
* `:registry_partitions` (positive integer, default
`System.schedulers_online()`) — number of internal ETS
partitions for `Tidefall.Registry`, the shared registry
used by all buffers to locate their partitions. Every
buffer write performs one registry lookup, so contention
here scales with overall write throughput across the
whole app. Higher values reduce that contention at the
cost of more ETS tables. The default matches the
schedulers-online heuristic used elsewhere in OTP.
Example:
# config/runtime.exs
import Config
config :tidefall, registry_partitions: 16
## Telemetry
`Tidefall` emits the following telemetry events.
* `[:tidefall, :partition, :start]` - Dispatched when a partition
is started.
* Measurement: `%{system_time: integer}`
* Metadata: `%{buffer: atom, partition: atom}`
* `[:tidefall, :partition, :stop]` - Dispatched when a partition
terminates (gracefully or abnormally).
* Measurement: `%{duration: native_time}`
* Metadata: `%{buffer: atom, partition: atom, reason: term}`
* `[:tidefall, :partition, :processing, :start]` - Dispatched
when a partition begins processing a batch of messages.
* Measurement: `%{system_time: integer, monotonic_time: integer}`
* Metadata: `%{buffer: atom, partition: atom}`
* `[:tidefall, :partition, :processing, :stop]` - Dispatched
when a partition completes processing a batch of messages.
* Measurement: `%{duration: native_time, monotonic_time: integer, size: non_neg_integer}`
* Metadata: `%{buffer: atom, partition: atom}`
* `[:tidefall, :partition, :processing, :exception]` - Dispatched
when an exception occurs during processing.
* Measurement: `%{duration: native_time, monotonic_time: integer}`
* Metadata:
```
%{
buffer: atom,
partition: atom,
kind: atom,
reason: term,
stacktrace: list
}
```
* `[:tidefall, :partition, :processing_failed]` - Dispatched
when a processing task encounters an error and fails.
* Measurement: `%{system_time: integer}`
* Metadata: `%{buffer: atom, partition: atom, reason: any}`
"""
end