# S2 Elixir SDK
Elixir client for the [S2](https://s2.dev) durable stream API. The data plane (append, read, check tail) speaks Protobuf over HTTP/2 via Mint; the control plane (basins, streams, access tokens, metrics) speaks JSON via Req.
## Installation
```elixir
def deps do
  [
    {:s2_client, "~> 1.0"}
  ]
end
```
## Example: Chat App
A chat app where each room gets its own stream. Messages are Ecto embedded schemas, durably ordered, and listeners tail the stream in real time.
```elixir
alias MyApp.Chat
alias MyApp.Chat.Message
# Create rooms (each becomes its own S2 stream)
Chat.create_room("general")
Chat.create_room("random")
# Append typed messages
Chat.append("general", Message.new(user: "alice", text: "hey everyone!"))
Chat.append("general", Message.new(user: "bob", text: "hi alice!"))
Chat.append("random", Message.new(user: "alice", text: "anyone here?"))
# Listen to a room — tails the stream, calling your function for each message
{:ok, listener} =
  Chat.listen("general", fn %Message{} = msg ->
    IO.puts("[#{msg.ts}] #{msg.user}: #{msg.text}")
  end)
# [2026-02-25T14:30:00Z] alice: hey everyone!
# [2026-02-25T14:30:01Z] bob: hi alice!
# ... stays open, prints new messages as they arrive
# Stop listening when done
Chat.stop_listener(listener)
# Listen for only new messages (skip history)
Chat.listen("general", fn msg -> IO.inspect(msg) end, from: :tail)
```
Here's the implementation. `S2.Store` manages connections, serialization, and session lifecycle — like `Ecto.Repo` for streams. It handles chunking, framing, and deduplication automatically (see [Patterns](#patterns)), so you just work with your own types.
```elixir
# config/config.exs
config :my_app, MyApp.S2,
  base_url: "https://aws.s2.dev",
  token: System.get_env("S2_TOKEN"),
  max_retries: :infinity, # reconnection attempts before giving up (default: :infinity)
  base_delay: 500, # base delay in ms for exponential backoff (default: 500)
  max_queue_size: 1000 # pending appends per stream before {:error, :overloaded} (default: 1000)
```
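The `max_retries` and `base_delay` options describe retry with exponential backoff. The following is a rough, stdlib-only sketch of that idea, not the SDK's actual implementation: the `BackoffSketch` module, the doubling formula, and the 30-second cap are all assumptions made for illustration.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper, not part of the SDK.

  # Delay in ms before retry number `attempt` (0-based):
  # base_delay * 2^attempt, capped at max_delay (the cap is an assumption).
  def delay(attempt, base_delay \\ 500, max_delay \\ 30_000) do
    # Clamp the exponent so the intermediate integer stays small.
    min(base_delay * Integer.pow(2, min(attempt, 16)), max_delay)
  end

  # Calls `fun` until it returns {:ok, _} or the retries are used up.
  def retry(fun, opts \\ []) do
    max_retries = Keyword.get(opts, :max_retries, :infinity)
    base_delay = Keyword.get(opts, :base_delay, 500)
    do_retry(fun, 0, max_retries, base_delay)
  end

  defp do_retry(fun, attempt, max_retries, base_delay) do
    case fun.() do
      {:ok, _} = ok ->
        ok

      {:error, _} = error when max_retries != :infinity and attempt >= max_retries ->
        error

      {:error, _} ->
        Process.sleep(delay(attempt, base_delay))
        do_retry(fun, attempt + 1, max_retries, base_delay)
    end
  end
end
```

With the defaults, the delays in this sketch grow as 500 ms, 1 s, 2 s, 4 s and so on until they reach the cap. A production version would typically add random jitter so that many workers do not reconnect in lockstep.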
```elixir
# lib/my_app/s2.ex
defmodule MyApp.S2 do
  use S2.Store,
    otp_app: :my_app,
    basin: "my-app"
end
```
```elixir
# lib/my_app/chat/message.ex
defmodule MyApp.Chat.Message do
  use Ecto.Schema
  import Ecto.Changeset

  @derive Jason.Encoder
  @primary_key false
  embedded_schema do
    field :user, :string
    field :text, :string
    field :ts, :utc_datetime
  end

  def new(attrs) do
    %__MODULE__{ts: DateTime.utc_now()}
    |> changeset(Map.new(attrs))
    |> apply_action!(:new)
  end

  def changeset(message \\ %__MODULE__{}, attrs) do
    message
    |> cast(attrs, [:user, :text, :ts])
    |> validate_required([:user, :text])
  end

  def serializer do
    %{
      serialize: &Jason.encode!/1,
      deserialize: fn json ->
        attrs = Jason.decode!(json)
        %__MODULE__{} |> cast(attrs, [:user, :text, :ts]) |> apply_action!(:load)
      end
    }
  end
end
```
```elixir
# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.S2
  ]

  Supervisor.start_link(children, strategy: :one_for_one)
end
```
```elixir
# lib/my_app/chat.ex
defmodule MyApp.Chat do
  use MyApp.S2, serializer: MyApp.Chat.Message.serializer()

  def create_room(room), do: create_stream("chat/#{room}")
end
```
## Control Plane
All control plane functions take an opts keyword list with `server: client` (and `basin: "name"` where required).
First, create a client:
```elixir
config = S2.Config.new(base_url: "https://aws.s2.dev", token: "my-token")
client = S2.Client.new(config)
```
### Basins
```elixir
{:ok, basins} = S2.Basins.list_basins(server: client)
{:ok, basin} = S2.Basins.create_basin(%S2.CreateBasinRequest{basin: "my-basin"}, server: client)
{:ok, config} = S2.Basins.get_basin_config("my-basin", server: client)
:ok = S2.Basins.delete_basin("my-basin", server: client)
```
### Streams
```elixir
{:ok, streams} = S2.Streams.list_streams(server: client, basin: "my-basin")
{:ok, stream} = S2.Streams.create_stream(%S2.CreateStreamRequest{stream: "my-stream"}, server: client, basin: "my-basin")
{:ok, config} = S2.Streams.get_stream_config("my-stream", server: client, basin: "my-basin")
:ok = S2.Streams.delete_stream("my-stream", server: client, basin: "my-basin")
```
### Access Tokens
Issue scoped, expiring tokens for clients. This is useful for giving a browser read-only access to a specific basin or stream (e.g. for real-time updates over SSE or a WebSocket bridge).
```elixir
# Issue a read-only token scoped to a single basin
{:ok, resp} =
  S2.AccessTokens.issue_access_token(
    %S2.AccessTokenInfo{
      expires_at: DateTime.add(DateTime.utc_now(), 3600, :second),
      scope: %S2.AccessTokenScope{
        basins: %{"my-basin" => %{}},
        op_groups: %S2.PermittedOperationGroups{
          stream: %S2.ReadWritePermissions{read: true, write: false}
        }
      }
    },
    server: client
  )
# resp.access_token is the bearer token string — send it to the client
# The client can then connect to the S2 data plane directly to read streams
{:ok, tokens} = S2.AccessTokens.list_access_tokens(server: client)
:ok = S2.AccessTokens.revoke_access_token("token-id", server: client)
```
You can also scope tokens to specific operations via the `ops` field (e.g. `["read", "check-tail"]`) or to specific streams via the `streams` field.
### Metrics
```elixir
{:ok, metrics} = S2.Metrics.account_metrics(server: client)
{:ok, metrics} = S2.Metrics.basin_metrics("my-basin", server: client)
{:ok, metrics} = S2.Metrics.stream_metrics("my-basin", "my-stream", server: client)
```
## Data Plane
Data plane operations use S2S-framed protobuf over a persistent Mint HTTP/2 connection. Each call returns an updated `conn`; pass it to the next call to reuse the connection.
### Single Request
```elixir
{:ok, conn} = S2.S2S.Connection.open("https://aws.s2.dev")
# Append records
input = %S2.V1.AppendInput{records: [%S2.V1.AppendRecord{body: "hello"}]}
{:ok, ack, conn} = S2.S2S.Append.call(conn, "my-basin", "my-stream", input)
# Read records
{:ok, batch, conn} = S2.S2S.Read.call(conn, "my-basin", "my-stream", seq_num: 0)
# Check tail position
{:ok, position, conn} = S2.S2S.CheckTail.call(conn, "my-basin", "my-stream")
```
### Streaming Append
```elixir
{:ok, session} = S2.S2S.AppendSession.open(conn, "my-basin", "my-stream")
input = %S2.V1.AppendInput{records: [%S2.V1.AppendRecord{body: "event-1"}]}
{:ok, ack, session} = S2.S2S.AppendSession.append(session, input)
{:ok, session} = S2.S2S.AppendSession.close(session)
```
### Streaming Read
```elixir
{:ok, session} = S2.S2S.ReadSession.open(conn, "my-basin", "my-stream", seq_num: 0)
{:ok, batch, session} = S2.S2S.ReadSession.next_batch(session)
# batch.records contains the records
{:ok, session} = S2.S2S.ReadSession.close(session)
```
Read options: `:seq_num`, `:count`, `:wait`, `:until`, `:clamp`, `:tail_offset`.
### Process Affinity
Streaming sessions are **not safe to share across processes**. The underlying Mint connection delivers socket messages to the mailbox of the process that owns it, so create and use each session within the same process.
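One common way to respect this constraint is to give each session a dedicated owner process and have other processes talk to that owner. The sketch below is illustrative only: `SessionOwner`, `open_fun`, and `step_fun` are hypothetical stand-ins (for example, for opening a `ReadSession` and calling `next_batch/1` on it), and it assumes the session functions do their own receiving inside the call.

```elixir
defmodule SessionOwner do
  # Hypothetical wrapper, not part of the SDK.
  use GenServer

  # open_fun: zero-arity function returning {:ok, session} | {:error, reason}
  # step_fun: fn session -> {:ok, result, session} | {:error, reason}
  def start_link(open_fun, step_fun) do
    GenServer.start_link(__MODULE__, {open_fun, step_fun})
  end

  # Any process may call this; the session itself never leaves the owner.
  def next(pid), do: GenServer.call(pid, :next)

  @impl true
  def init({open_fun, step_fun}) do
    # Open the session inside the owner, so this process is the one
    # whose mailbox receives the socket messages.
    case open_fun.() do
      {:ok, session} -> {:ok, %{session: session, step: step_fun}}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_call(:next, _from, %{session: session, step: step} = state) do
    case step.(session) do
      {:ok, result, session} -> {:reply, {:ok, result}, %{state | session: session}}
      {:error, reason} -> {:reply, {:error, reason}, state}
    end
  end
end
```

Callers only ever hold the owner's pid, so the session struct is created, used, and updated in a single process.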
## Patterns
`S2.Store` handles everything in this section automatically. You only need it if you're using the data plane directly.
Under the hood, every `append` and `listen` call runs through a pipeline that handles chunking, framing, deduplication, and serialization, mirroring the [TypeScript SDK patterns](https://github.com/s2-streamstore/s2-sdk-typescript/tree/main/packages/patterns).
| Step | Write side | Read side |
|------|-----------|-----------|
| 1 | Serialize term to binary | Filter duplicate records |
| 2 | Chunk binary into sub-1 MiB pieces | Reassemble chunks into complete message |
| 3 | Frame chunks with reassembly headers | Deserialize binary back to term |
| 4 | Stamp with writer ID + dedupe sequence | |
To run the same pipeline against the data plane yourself, use the patterns modules:
```elixir
alias S2.Patterns.Serialization
serializer = %{serialize: &Jason.encode!/1, deserialize: &Jason.decode!/1}
# Writing — serialize, chunk, frame, and stamp for dedup
writer = Serialization.writer()
{input, writer} = Serialization.prepare(writer, %{"event" => "signup"}, serializer)
{:ok, ack, conn} = S2.S2S.Append.call(conn, "my-basin", "my-stream", input)
# Reading — dedup, reassemble, and deserialize
reader = Serialization.reader()
{:ok, batch, conn} = S2.S2S.Read.call(conn, "my-basin", "my-stream", seq_num: 0)
{messages, reader} = Serialization.decode(reader, batch.records, serializer)
```
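To make the chunk, frame, and reassemble steps from the table concrete, here is a minimal stdlib-only sketch. It does not reproduce the SDK's wire format: the `ChunkingSketch` module, the record shape, and the `"msg"`/`"index"`/`"total"` header names are assumptions chosen for illustration, and it assumes the chunks of one message arrive contiguously and in order.

```elixir
defmodule ChunkingSketch do
  # Hypothetical module; header names and record shape are illustrative.

  # Split a binary into pieces of at most `size` bytes.
  def chunk(bin, size) when is_integer(size) and size > 0 and byte_size(bin) <= size do
    [bin]
  end

  def chunk(bin, size) when is_integer(size) and size > 0 do
    <<head::binary-size(size), rest::binary>> = bin
    [head | chunk(rest, size)]
  end

  # Attach reassembly headers: message id, chunk index, total chunk count.
  def frame(chunks, message_id) do
    total = length(chunks)

    chunks
    |> Enum.with_index()
    |> Enum.map(fn {body, index} ->
      %{headers: %{"msg" => message_id, "index" => index, "total" => total}, body: body}
    end)
  end

  # Concatenate chunk bodies and emit a message when its last chunk arrives.
  def reassemble(records) do
    {messages, _pending} =
      Enum.reduce(records, {[], []}, fn record, {done, pending} ->
        pending = [record.body | pending]

        if record.headers["index"] == record.headers["total"] - 1 do
          message = pending |> Enum.reverse() |> IO.iodata_to_binary()
          {[message | done], []}
        else
          {done, pending}
        end
      end)

    Enum.reverse(messages)
  end
end
```

For example, `ChunkingSketch.chunk("hello world", 4)` yields three pieces, and framing then reassembling them returns the original binary.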
## Architecture
### How `S2.Store` works
A call such as `MyApp.S2.append("chat/general", message)` is routed through this supervision tree:
```
MyApp.S2 (Supervisor)
├── Registry — maps stream names to worker pids
├── DynamicSupervisor — owns stream workers
│ ├── StreamWorker("chat/general") — own connection + open AppendSession
│ ├── StreamWorker("chat/random") — own connection + open AppendSession
│ └── ...started lazily on first append
├── ControlPlane — shared JSON client for create/delete stream
└── listener Tasks — each spawned with own connection + ReadSession
```
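The Registry plus DynamicSupervisor arrangement in the tree can be approximated with the Elixir standard library alone. The sketch below shows only the lazy "look up or start a worker per stream" pattern; `LazyWorkers` and its `Worker` are hypothetical, and the worker just counts appends instead of holding a connection.

```elixir
defmodule LazyWorkers do
  # Hypothetical stand-in for the store's supervision tree.

  defmodule Worker do
    use GenServer

    def start_link({registry, stream}) do
      # Register under the stream name so later lookups find this pid.
      GenServer.start_link(__MODULE__, stream, name: {:via, Registry, {registry, stream}})
    end

    @impl true
    def init(stream), do: {:ok, %{stream: stream, appended: 0}}

    @impl true
    def handle_call({:append, _message}, _from, state) do
      state = %{state | appended: state.appended + 1}
      {:reply, {:ok, state.appended}, state}
    end
  end

  @registry LazyWorkers.Registry
  @supervisor LazyWorkers.WorkerSupervisor

  def start_link do
    children = [
      {Registry, keys: :unique, name: @registry},
      {DynamicSupervisor, name: @supervisor, strategy: :one_for_one}
    ]

    Supervisor.start_link(children, strategy: :one_for_all)
  end

  def append(stream, message) do
    GenServer.call(worker_for(stream), {:append, message})
  end

  # Reuse the registered worker, or start one on first use.
  def worker_for(stream) do
    case Registry.lookup(@registry, stream) do
      [{pid, _value}] -> pid
      [] -> start_worker(stream)
    end
  end

  defp start_worker(stream) do
    case DynamicSupervisor.start_child(@supervisor, {Worker, {@registry, stream}}) do
      {:ok, pid} -> pid
      # Another caller won the race; use the worker it started.
      {:error, {:already_started, pid}} -> pid
    end
  end
end
```

Because the Registry uses unique keys, two concurrent first appends to the same stream still end up with a single worker.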
- **One process per stream.** Each stream gets its own `StreamWorker` GenServer with a dedicated Mint HTTP/2 connection and a persistent `AppendSession`. Appends to different streams run in parallel.
- **Workers start lazily.** The first `append("chat/general", ...)` starts a worker for that stream. Subsequent appends reuse the open session — no handshake overhead.
- **Listeners are independent.** Each `listen` call spawns a Task with its own connection and `ReadSession`, tailing the stream and calling your callback as messages arrive. This is required because Mint delivers TCP data to the owning process's mailbox.
- **Control plane is shared.** `create_stream` and `delete_stream` go through a single `ControlPlane` GenServer using the JSON/Req client. These are infrequent operations that don't need per-stream isolation.
- **Automatic reconnection with backoff.** If a TCP connection drops, both appends and listeners reconnect transparently with exponential backoff (configurable via `max_retries` and `base_delay`). Append workers detect the failure on the next send, open a new connection and session, and retry — the caller just gets back `{:ok, ack}`. Listeners detect the drop when the read times out, then reconnect from the last successfully read sequence number so no messages are lost. The dedupe writer preserves its ID and sequence across reconnects, so if a message was written but the ack was lost, readers filter the duplicate automatically.
- **Backpressure.** Each stream worker monitors its mailbox depth. If pending appends exceed `max_queue_size`, the worker returns `{:error, :overloaded}` immediately instead of buffering without bound.
- **Telemetry.** All operations emit `:telemetry` events under the `[:s2, :store, ...]` prefix — append start/stop/exception, reconnect attempts, and listener connections. Attach your own handlers for metrics, logging, or alerting.
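The backpressure rule above can be illustrated with a mailbox-depth check. This is a sketch of the general technique rather than the SDK's code: `BackpressureSketch` is hypothetical, and checking the queue length on the caller's side before sending is an assumption about where such a check could live.

```elixir
defmodule BackpressureSketch do
  # Hypothetical worker, not part of the SDK.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Reject immediately if the worker's mailbox is already too deep.
  def append(pid, message, max_queue_size \\ 1000) do
    case Process.info(pid, :message_queue_len) do
      {:message_queue_len, len} when len >= max_queue_size -> {:error, :overloaded}
      {:message_queue_len, _len} -> GenServer.call(pid, {:append, message})
      # Process.info/2 returns nil when the process is no longer alive.
      nil -> {:error, :noproc}
    end
  end

  @impl true
  def init(_opts), do: {:ok, 0}

  @impl true
  def handle_call({:append, _message}, _from, count) do
    {:reply, {:ok, count + 1}, count + 1}
  end

  @impl true
  def handle_info(_other, count), do: {:noreply, count}
end
```

The check is best-effort: the queue can grow between the check and the call, so the limit bounds memory approximately rather than exactly.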
### Protocol layers
| Layer | Transport | Encoding | Library |
|-------|-----------|----------|---------|
| `S2.Store` | Managed | Managed | — |
| Control plane (basins, streams, tokens, metrics) | HTTP/1.1 or 2 | JSON | Req |
| Data plane (append, read, check tail) | HTTP/2 | S2S-framed Protobuf | Mint |
`S2.Store` is the recommended way to use the SDK. The control and data plane modules below it are available if you need lower-level access.
## Guarantees
### From S2 (the server)
- **Durability.** Appended records are persisted and survive restarts.
- **Ordering.** Records within a stream are totally ordered by sequence number.
- **At-least-once delivery.** Readers see every record at least once.
### From this SDK
- **Automatic reconnection.** If a TCP connection drops, appends reconnect and retry transparently with exponential backoff. Listeners reconnect from the last read sequence number — no messages are lost.
- **No duplicates.** Each writer stamps records with a unique ID and monotonic sequence. If a message was written but the ack was lost, the retry produces a duplicate on the wire, but readers filter it out automatically.
- **Large message support.** Messages over 1 MiB are chunked on write and reassembled on read. You don't need to think about record size limits.
- **Stream isolation.** Each stream gets its own process and connection. A slow or failed stream doesn't block others.
- **Backpressure.** Stream workers reject appends with `{:error, :overloaded}` when the mailbox exceeds `max_queue_size`, preventing unbounded memory growth.
- **Supervised workers.** Stream workers are managed by a DynamicSupervisor. If a worker crashes, the supervisor restarts it and the next append picks up where it left off.
- **Validate on write, not on read.** Serializers cast fields on read without validation, so schema changes don't break deserialization of old messages.
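The "no duplicates" guarantee rests on writer IDs and monotonic sequence numbers. As a minimal sketch of that idea (the `DedupeSketch` module and the `writer_id`/`seq` field names are assumptions, not the SDK's record format), a reader can remember the highest sequence seen per writer and drop anything at or below it:

```elixir
defmodule DedupeSketch do
  # Hypothetical module; field names are illustrative.

  def new_writer(id), do: %{id: id, seq: 0}

  # Stamp a body with the writer's ID and next sequence number.
  def stamp(writer, body) do
    record = %{writer_id: writer.id, seq: writer.seq, body: body}
    {record, %{writer | seq: writer.seq + 1}}
  end

  # Reader state: map of writer_id => highest sequence seen so far.
  def new_reader, do: %{}

  # Keep only records whose sequence is newer than the last one seen
  # from the same writer.
  def filter(reader, records) do
    {kept, reader} =
      Enum.reduce(records, {[], reader}, fn record, {kept, seen} ->
        last = Map.get(seen, record.writer_id, -1)

        if record.seq > last do
          {[record | kept], Map.put(seen, record.writer_id, record.seq)}
        else
          {kept, seen}
        end
      end)

    {Enum.reverse(kept), reader}
  end
end
```

A retried append reuses the same `writer_id` and `seq`, so if both the original and the retry land in the stream, the second copy is filtered out on read.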
## Testing
Test coverage is 98.8%, including network fault tests run through Toxiproxy. The remaining uncovered lines are exhaustive pattern match arms that can't be triggered; see `test/test_helper.exs` for details.
Coverage threshold is set to 95% — CI fails if coverage drops below that.
## License
MIT