# Chatx
Erlang-first realtime chat engine for BEAM
The hot path runs in Erlang only (validation, middleware, routing, fanout, queueing). Elixir is a thin API and adapter layer for easy integration.
Chatx is not just a behaviour scaffold. It already provides the full core chat engine out of the box (rooms, fanout, queueing, middleware execution, rate limiting, backpressure, and presence). You only add adapter modules when you want to customize integrations such as persistence, notifications, moderation, or auth.

---
## Step-by-Step Integration Guide
This guide walks through adding chatx to an existing Elixir/Phoenix application from scratch.
### Step 1 — Add the dependency
```elixir
# mix.exs
defp deps do
[
{:chatx, "~> 0.1.0"}
]
end
```
```bash
mix deps.get
```
---
### Step 2 — Create your Chat module
Add `use Chatx` in a module of your application. This generates `join/2`, `leave/2`, `subscribe/1`, and `send_message/3` delegates.
This module is an API wrapper for your app boundary. The chat runtime logic still lives inside chatx itself.
```elixir
# lib/my_app/chat.ex
defmodule MyApp.Chat do
use Chatx
end
```
---
### Step 3 — Configure adapters
chatx ships with no-op defaults for all adapters. Override only what you need.
```elixir
# config/config.exs
config :chatx,
persistence: MyApp.Chat.Persistence, # saves messages (e.g. to DB)
notifier: MyApp.Chat.Notifier, # push notifications, emails, etc.
moderation: MyApp.Chat.Moderation, # content moderation
auth: MyApp.Chat.Auth # authorize join/send per user+room
```
> If you don't need a particular adapter, omit it — chatx uses safe no-ops automatically.
>
> In other words: you can start with zero custom adapter modules and still have a working chat system. Implement adapters only for app-specific behavior (for example writing to your DB or calling external services).
---
### Step 4 — (Optional) Implement adapters
Each adapter is a small module that implements a `@behaviour`:
```elixir
# Persist messages to your database
defmodule MyApp.Chat.Persistence do
@behaviour Chatx.Adapter.Persistence
def save_message(%{room_id: room_id, user_id: user_id, content: content} = _msg) do
# e.g. Repo.insert(%Message{room_id: room_id, user_id: user_id, body: content})
:ok
end
end
# Block certain content before it reaches other users
defmodule MyApp.Chat.Moderation do
@behaviour Chatx.Adapter.Moderation
def check(content) do
if String.contains?(content, "spam") do
{:error, :blocked}
else
:ok
end
end
end
# Only allow authenticated users to join or send
defmodule MyApp.Chat.Auth do
@behaviour Chatx.Adapter.Auth
def authorize(user_id, _room_id) do
if MyApp.Accounts.user_exists?(user_id), do: :ok, else: {:error, :unauthorized}
end
end
```
---
### Step 5 — Join, subscribe, and send
Every process (Phoenix Channel, LiveView, GenServer, etc.) can interact with a room:
```elixir
# Join the room and register this PID as a participant
:ok = MyApp.Chat.join("room:general", current_user.id)
# Subscribe this PID to incoming messages
:ok = MyApp.Chat.subscribe("room:general")
# Send a message — runs through the full Erlang middleware pipeline
:ok = MyApp.Chat.send_message("room:general", current_user.id, "Hello everyone!")
# Leave the room
:ok = MyApp.Chat.leave("room:general", current_user.id)
```
Incoming messages arrive in the subscribing process mailbox as:
```elixir
{:chat_message, %{
id: 123456789,
room_id: "room:general",
user_id: "user-42",
content: "Hello everyone!",
inserted_at: 1712345678000 # milliseconds
}}
```
---
### Step 6 — Wire into a Phoenix Channel
```elixir
# lib/my_app_web/channels/room_channel.ex
defmodule MyAppWeb.RoomChannel do
use Phoenix.Channel
alias MyApp.Chat
def join("room:" <> room_id, _params, socket) do
:ok = Chat.join(room_id, socket.assigns.user_id)
:ok = Chat.subscribe(room_id)
{:ok, assign(socket, :room_id, room_id)}
end
def handle_in("new_message", %{"content" => content}, socket) do
case Chat.send_message(socket.assigns.room_id, socket.assigns.user_id, content) do
:ok -> {:reply, :ok, socket}
{:error, reason} -> {:reply, {:error, %{reason: inspect(reason)}}, socket}
end
end
# Relay fanout messages to the client over WebSocket
def handle_info({:chat_message, msg}, socket) do
push(socket, "new_message", msg)
{:noreply, socket}
end
end
```
Add the channel to your socket:
```elixir
# lib/my_app_web/user_socket.ex
channel "room:*", MyAppWeb.RoomChannel
```
---
### Step 7 — Wire into a Phoenix LiveView
```elixir
# lib/my_app_web/live/chat_live.ex
defmodule MyAppWeb.ChatLive do
use MyAppWeb, :live_view
alias MyApp.Chat
def mount(%{"room_id" => room_id}, session, socket) do
user_id = session["user_id"]
if connected?(socket) do
:ok = Chat.join(room_id, user_id)
end
{:ok, assign(socket, room_id: room_id, user_id: user_id) |> stream(:messages, [])}
end
def handle_event("send_message", %{"content" => content}, socket) do
Chat.send_message(socket.assigns.room_id, socket.assigns.user_id, content)
{:noreply, assign(socket, message_input: "")}
end
# chatx pushes {:chat_message, msg} to every subscribed PID — including this LV
def handle_info({:chat_message, msg}, socket) do
{:noreply, stream_insert(socket, :messages, msg)}
end
end
```
---
### Step 8 — (Optional) Tune behaviour
```elixir
# config/config.exs
config :chatx,
# Middleware execution order (all run in Erlang)
middlewares: [:auth, :rate_limit, :moderation],
# Safety limits
message_size_limit: 4096, # bytes
room_queue_limit: 2048, # messages buffered per room before backpressure
# Token-bucket rate limiter per {user_id, room_id}
rate_limit: [messages_per_sec: 10, burst: 20],
# Async delivery worker
delivery_batch_size: 100,
delivery_tick_ms: 100,
delivery_max_retries: 5,
delivery_retry_base_ms: 100,
# Multi-node room placement: :local | :pg | :horde
cluster_strategy: :local
```
---
## Erlang-First Architecture
- One room = one process
- ETS for presence, room index, token bucket state
- Async delivery worker with retry and exponential backoff
- Middleware chain in Erlang with configurable order
- Cluster-aware room placement with deterministic node selection
## Advanced Config
```elixir
config :chatx,
# adapters
persistence: MyApp.ChatPersistence,
notifier: MyApp.ChatNotifier,
moderation: MyApp.ChatModeration,
auth: MyApp.ChatAuth,
# middleware order (Erlang pipeline)
middlewares: [:auth, :rate_limit, :moderation],
# security + flow control
message_size_limit: 4096,
room_queue_limit: 2048,
rate_limit: [messages_per_sec: 10, burst: 20],
# async delivery
delivery_batch_size: 100,
delivery_tick_ms: 100,
delivery_max_retries: 5,
delivery_retry_base_ms: 100,
# multi-node strategy: :local | :pg | :horde
cluster_strategy: :pg
```
## Adapter Contracts
```elixir
defmodule MyApp.ChatPersistence do
@behaviour Chatx.Adapter.Persistence
def save_message(message), do: :ok
end
defmodule MyApp.ChatNotifier do
@behaviour Chatx.Adapter.Notifier
def notify(_user_id, _message), do: :ok
end
defmodule MyApp.ChatModeration do
@behaviour Chatx.Adapter.Moderation
def check(content), do: if(String.contains?(content, "blocked"), do: {:error, :blocked}, else: :ok)
end
defmodule MyApp.ChatAuth do
@behaviour Chatx.Adapter.Auth
def authorize(_user_id, _room_id), do: :ok
end
```
## Multi-Node Behavior
- `:local`: single node, local room ownership
- `:pg`: deterministic owner node for a room + pg membership lookup
- `:horde`: currently compatible fallback through pg path unless Horde registry is available
Room placement is deterministic (`chatx_placement`) so the same room lands on the same owner node across the cluster view.
## Security and Cost Controls
- Strict binary/type/UTF-8 validation in Erlang before fanout
- Hard payload size limit
- Token-bucket rate limiting per user and room
- Room-level backpressure (`room_queue_limit`)
- Adapter boundary hardening (exceptions and invalid return values are contained)
- `message_queue_data: off_heap` in busy processes to reduce GC pressure
## Telemetry
- `[:chatx, :message_sent]`
- `[:chatx, :room_join]`
- `[:chatx, :room_leave]`
- `[:chatx, :broadcast]`
- `[:chatx, :queue_size]`
- `[:chatx, :queue_overflow]`
- `[:chatx, :rate_limit_hit]`
## Phoenix Demo Project
A fully wired Phoenix demo app lives in [`examples/chatx_phx_demo/`](examples/chatx_phx_demo/)
It includes a LiveView chat UI, a Phoenix Channel, and a path dependency on this package
```bash
cd examples/chatx_phx_demo
mix deps.get
mix phx.server
# open http://localhost:4000 in two tabs and chat
```

