README.md

# PartitionedBuffer

An ETS-based partitioned buffer library for high-throughput data processing in
Elixir.

`PartitionedBuffer` is a generic, reusable buffering system inspired by the
[OpenTelemetry Batch Processor](https://github.com/open-telemetry/opentelemetry-erlang/blob/main/apps/opentelemetry/src/otel_batch_processor.erl).
It efficiently buffers arbitrary data and periodically processes it using a
configurable processor function at regular intervals.

It ships with two concrete buffer implementations:

- **`PartitionedBuffer.Queue`** — Ordered queue buffer (insertion-time ordered,
  backed by `:ordered_set` ETS tables).
- **`PartitionedBuffer.Map`** — Key-value map buffer (last-write-wins semantics,
  backed by `:set` ETS tables).

Both use partitioning to reduce lock contention and double-buffering for
zero-downtime processing.

## Installation

Add `:partitioned_buffer` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:partitioned_buffer, "~> 0.2"}
  ]
end
```

## Usage

### Queue

`PartitionedBuffer.Queue` buffers items in insertion order and processes them
in batches:

```elixir
# Start a queue buffer
{:ok, _pid} =
  PartitionedBuffer.Queue.start_link(
    name: :my_queue,
    processor: fn batch -> IO.inspect(batch) end
  )

# Push items into the buffer
:ok = PartitionedBuffer.Queue.push(:my_queue, "message1")
:ok = PartitionedBuffer.Queue.push(:my_queue, ["message2", "message3"])

# Check buffer size
PartitionedBuffer.Queue.size(:my_queue)
```

### Map

`PartitionedBuffer.Map` buffers key-value entries with last-write-wins
semantics. Entries with the same key overwrite previous values:

```elixir
# Start a map buffer
# The processor receives a list of {key, value, version, updates} tuples
{:ok, _pid} =
  PartitionedBuffer.Map.start_link(
    name: :my_map,
    processor: fn batch -> IO.inspect(batch) end
  )

# Put entries into the buffer
:ok = PartitionedBuffer.Map.put(:my_map, :key1, "value1")
:ok = PartitionedBuffer.Map.put_all(:my_map, %{key2: "value2", key3: "value3"})

# Read and delete entries
"value1" = PartitionedBuffer.Map.get(:my_map, :key1)
:ok = PartitionedBuffer.Map.delete(:my_map, :key1)

# Check buffer size
PartitionedBuffer.Map.size(:my_map)
```

#### Versioned Updates

For scenarios requiring "newer version wins" semantics (e.g., event sourcing,
state synchronization), use `put_newer/5` and `put_all_newer/3`:

```elixir
# Only updates if the version is greater than the existing one
:ok = PartitionedBuffer.Map.put_newer(:my_map, :key1, "v1", 100)
:ok = PartitionedBuffer.Map.put_newer(:my_map, :key1, "v2", 200)  # overwrites
:ok = PartitionedBuffer.Map.put_newer(:my_map, :key1, "v3", 50)   # ignored (50 < 200)

"v2" = PartitionedBuffer.Map.get(:my_map, :key1)

# Batch versioned updates
entries = [
  {:user_1, %{name: "Alice"}, 100},
  {:user_2, %{name: "Bob"}, 200}
]
:ok = PartitionedBuffer.Map.put_all_newer(:my_map, entries)
```

### Adding to a Supervision Tree

In most applications, you'll want to add a buffer as a child in your
application's supervision tree:

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # Queue buffer
      {PartitionedBuffer.Queue,
       name: :event_queue,
       processor: &MyApp.EventProcessor.process_batch/1,
       processing_interval_ms: 1000,
       partitions: 4},

      # Map buffer
      {PartitionedBuffer.Map,
       name: :state_map,
       processor: &MyApp.StateProcessor.process_batch/1}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

The buffer will be automatically started with your application and will process
any remaining items during graceful shutdown.

### Configuration Options

```elixir
{:ok, _pid} = PartitionedBuffer.Queue.start_link(
  name: :my_buffer,
  partitions: 4,                        # Number of partitions (default: schedulers_online)
  processing_interval_ms: 1000,         # Process every second (default: 5000)
  processing_batch_size: 100,           # Batch size for processing (default: 10)
  processing_timeout_ms: 5000,          # Timeout for processing tasks (default: 60000)
  processor: &MyApp.Exporter.export/1
)
```

See the `PartitionedBuffer` module documentation for the full list of start and
runtime options.