
# Chatx

An Erlang-first realtime chat engine for the BEAM.

The hot path runs in Erlang only (validation, middleware, routing, fanout, queueing). Elixir is a thin API and adapter layer for easy integration.

Chatx is not just a behaviour scaffold. It already provides the full core chat engine out of the box (rooms, fanout, queueing, middleware execution, rate limiting, backpressure, and presence). You only add adapter modules when you want to customize integrations such as persistence, notifications, moderation, or auth.

![alt text](image.png)

---

## Step-by-Step Integration Guide

This guide walks through adding chatx to an existing Elixir/Phoenix application, step by step.

### Step 1 — Add the dependency

```elixir
# mix.exs
defp deps do
  [
    {:chatx, "~> 0.1.0"}
  ]
end
```

```bash
mix deps.get
```

---

### Step 2 — Create your Chat module

Add `use Chatx` in a module of your application. This generates `join/2`, `leave/2`, `subscribe/1`, and `send_message/3` delegates.

This module is an API wrapper for your app boundary. The chat runtime logic still lives inside chatx itself.

```elixir
# lib/my_app/chat.ex
defmodule MyApp.Chat do
  use Chatx
end
```

---

### Step 3 — Configure adapters

chatx ships with no-op defaults for all adapters. Override only what you need.

```elixir
# config/config.exs
config :chatx,
  persistence: MyApp.Chat.Persistence,    # saves messages (e.g. to DB)
  notifier:    MyApp.Chat.Notifier,       # push notifications, emails, etc.
  moderation:  MyApp.Chat.Moderation,     # content moderation
  auth:        MyApp.Chat.Auth            # authorize join/send per user+room
```

> If you don't need a particular adapter, omit it — chatx uses safe no-ops automatically.
>
> In other words: you can start with zero custom adapter modules and still have a working chat system. Implement adapters only for app-specific behavior (for example writing to your DB or calling external services).

---

### Step 4 — (Optional) Implement adapters

Each adapter is a small module that implements a `@behaviour`:

```elixir
# Persist messages to your database
defmodule MyApp.Chat.Persistence do
  @behaviour Chatx.Adapter.Persistence

  def save_message(%{room_id: room_id, user_id: user_id, content: content} = _msg) do
    # e.g. Repo.insert(%Message{room_id: room_id, user_id: user_id, body: content})
    :ok
  end
end

# Block certain content before it reaches other users
defmodule MyApp.Chat.Moderation do
  @behaviour Chatx.Adapter.Moderation

  def check(content) do
    if String.contains?(content, "spam") do
      {:error, :blocked}
    else
      :ok
    end
  end
end

# Only allow authenticated users to join or send
defmodule MyApp.Chat.Auth do
  @behaviour Chatx.Adapter.Auth

  def authorize(user_id, _room_id) do
    if MyApp.Accounts.user_exists?(user_id), do: :ok, else: {:error, :unauthorized}
  end
end
```

---

### Step 5 — Join, subscribe, and send

Any process (Phoenix Channel, LiveView, GenServer, etc.) can interact with a room:

```elixir
# Join the room and register this PID as a participant
:ok = MyApp.Chat.join("room:general", current_user.id)

# Subscribe this PID to incoming messages
:ok = MyApp.Chat.subscribe("room:general")

# Send a message — runs through the full Erlang middleware pipeline
:ok = MyApp.Chat.send_message("room:general", current_user.id, "Hello everyone!")

# Leave the room
:ok = MyApp.Chat.leave("room:general", current_user.id)
```

Incoming messages arrive in the subscribing process mailbox as:

```elixir
{:chat_message, %{
  id:          123456789,
  room_id:     "room:general",
  user_id:     "user-42",
  content:     "Hello everyone!",
  inserted_at: 1712345678000   # milliseconds
}}
```
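
Outside a Channel or LiveView, a plain `receive` is enough to consume these tuples. The sketch below is self-contained: it sends itself a message in the shape shown above instead of calling chatx, and `ChatInbox`, `format/1`, and `drain/2` are hypothetical names used only for illustration.

```elixir
defmodule ChatInbox do
  @moduledoc "Hypothetical helper, not part of chatx: drains chat messages from the mailbox."

  # Render one message map (shape as documented above) as a display line.
  def format(%{room_id: room_id, user_id: user_id, content: content}) do
    "[#{room_id}] #{user_id}: #{content}"
  end

  # Collect every {:chat_message, msg} in the calling process's mailbox,
  # oldest first. Returns once nothing new arrives within `timeout_ms`.
  def drain(timeout_ms \\ 0, acc \\ []) do
    receive do
      {:chat_message, msg} -> drain(timeout_ms, [format(msg) | acc])
    after
      timeout_ms -> Enum.reverse(acc)
    end
  end
end

# Simulate what a subscribed process would find in its mailbox.
send(
  self(),
  {:chat_message,
   %{
     id: 123_456_789,
     room_id: "room:general",
     user_id: "user-42",
     content: "Hello everyone!",
     inserted_at: 1_712_345_678_000
   }}
)

IO.inspect(ChatInbox.drain())
```

In a long-lived GenServer the same pattern match would normally live in `handle_info/2` rather than in a blocking `receive`.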

---

### Step 6 — Wire into a Phoenix Channel

```elixir
# lib/my_app_web/channels/room_channel.ex
defmodule MyAppWeb.RoomChannel do
  use Phoenix.Channel
  alias MyApp.Chat

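  def join("room:" <> room_id, _params, socket) do
    # The auth adapter can reject a join, so reply with an error instead of crashing
    with :ok <- Chat.join(room_id, socket.assigns.user_id),
         :ok <- Chat.subscribe(room_id) do
      {:ok, assign(socket, :room_id, room_id)}
    else
      {:error, reason} -> {:error, %{reason: inspect(reason)}}
    end
  end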

  def handle_in("new_message", %{"content" => content}, socket) do
    case Chat.send_message(socket.assigns.room_id, socket.assigns.user_id, content) do
      :ok              -> {:reply, :ok, socket}
      {:error, reason} -> {:reply, {:error, %{reason: inspect(reason)}}, socket}
    end
  end

  # Relay fanout messages to the client over WebSocket
  def handle_info({:chat_message, msg}, socket) do
    push(socket, "new_message", msg)
    {:noreply, socket}
  end
end
```

Add the channel to your socket:

```elixir
# lib/my_app_web/user_socket.ex
channel "room:*", MyAppWeb.RoomChannel
```

---

### Step 7 — Wire into a Phoenix LiveView

```elixir
# lib/my_app_web/live/chat_live.ex
defmodule MyAppWeb.ChatLive do
  use MyAppWeb, :live_view
  alias MyApp.Chat

  def mount(%{"room_id" => room_id}, session, socket) do
    user_id = session["user_id"]

    if connected?(socket) do
      :ok = Chat.join(room_id, user_id)
      # Subscribe so that {:chat_message, msg} reaches handle_info/2 below
      :ok = Chat.subscribe(room_id)
    end

    {:ok, socket |> assign(room_id: room_id, user_id: user_id) |> stream(:messages, [])}
  end

  def handle_event("send_message", %{"content" => content}, socket) do
    Chat.send_message(socket.assigns.room_id, socket.assigns.user_id, content)
    {:noreply, assign(socket, message_input: "")}
  end

  # chatx pushes {:chat_message, msg} to every subscribed PID — including this LV
  def handle_info({:chat_message, msg}, socket) do
    {:noreply, stream_insert(socket, :messages, msg)}
  end
end
```

---

### Step 8 — (Optional) Tune behaviour

```elixir
# config/config.exs
config :chatx,
  # Middleware execution order (all run in Erlang)
  middlewares: [:auth, :rate_limit, :moderation],

  # Safety limits
  message_size_limit: 4096,       # bytes
  room_queue_limit:   2048,       # messages buffered per room before backpressure

  # Token-bucket rate limiter per {user_id, room_id}
  rate_limit: [messages_per_sec: 10, burst: 20],

  # Async delivery worker
  delivery_batch_size:    100,
  delivery_tick_ms:       100,
  delivery_max_retries:   5,
  delivery_retry_base_ms: 100,

  # Multi-node room placement: :local | :pg | :horde
  cluster_strategy: :local
```
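
To make the `rate_limit` options concrete, here is a minimal token-bucket sketch in plain Elixir. It is not chatx's Erlang implementation (which keeps bucket state in ETS); `TokenBucketSketch` and its functions are hypothetical, and it assumes that `burst` is the bucket capacity and `messages_per_sec` the refill rate.

```elixir
defmodule TokenBucketSketch do
  @moduledoc "Illustrative token bucket; not chatx's Erlang implementation."

  # A full bucket holding `burst` tokens, refilled at `rate` tokens per second.
  def new(rate, burst, now_ms) do
    %{rate: rate, burst: burst, tokens: burst * 1.0, last_ms: now_ms}
  end

  # Try to spend one token at time `now_ms`.
  # Returns {:ok, bucket} or {:error, :rate_limited, bucket}.
  def take(bucket, now_ms) do
    elapsed_ms = max(now_ms - bucket.last_ms, 0)
    refilled = min(bucket.burst * 1.0, bucket.tokens + elapsed_ms * bucket.rate / 1000)
    bucket = %{bucket | tokens: refilled, last_ms: max(now_ms, bucket.last_ms)}

    if refilled >= 1.0 do
      {:ok, %{bucket | tokens: refilled - 1.0}}
    else
      {:error, :rate_limited, bucket}
    end
  end
end

# With messages_per_sec: 10 and burst: 20, 25 sends in the same millisecond
# spend the 20 burst tokens and reject the remaining 5.
{accepted, _bucket} =
  Enum.reduce(1..25, {0, TokenBucketSketch.new(10, 20, 0)}, fn _, {count, bucket} ->
    case TokenBucketSketch.take(bucket, 0) do
      {:ok, bucket} -> {count + 1, bucket}
      {:error, :rate_limited, bucket} -> {count, bucket}
    end
  end)

IO.inspect(accepted)
```

Under these assumptions a user can send a short burst of 20 messages, after which sends are accepted at roughly one per 100 ms.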

---

## Erlang-First Architecture

- One room = one process
- ETS for presence, room index, token bucket state
- Async delivery worker with retry and exponential backoff
- Middleware chain in Erlang with configurable order
- Cluster-aware room placement with deterministic node selection
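
The configurable middleware order can be pictured as a fold over a list of names that stops at the first rejection. This is only a sketch in Elixir: the real chain runs in Erlang, and `MiddlewareSketch`, the `registry` map, and the `{name, reason}` error shape are assumptions made for illustration.

```elixir
defmodule MiddlewareSketch do
  @moduledoc "Illustrative only; chatx runs its real pipeline in Erlang."

  # Run the named middlewares in order and stop at the first {:error, reason}.
  # `registry` maps a middleware name to a 1-arity function returning
  # :ok | {:error, reason}.
  def run(message, order, registry) do
    Enum.reduce_while(order, :ok, fn name, :ok ->
      case Map.fetch!(registry, name).(message) do
        :ok -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, {name, reason}}}
      end
    end)
  end
end

registry = %{
  auth: fn %{user_id: user_id} -> if user_id, do: :ok, else: {:error, :unauthorized} end,
  rate_limit: fn _msg -> :ok end,
  moderation: fn %{content: content} ->
    if String.contains?(content, "spam"), do: {:error, :blocked}, else: :ok
  end
}

order = [:auth, :rate_limit, :moderation]

IO.inspect(MiddlewareSketch.run(%{user_id: "user-42", content: "hi"}, order, registry))
IO.inspect(MiddlewareSketch.run(%{user_id: "user-42", content: "buy spam"}, order, registry))
```

Putting cheap checks such as auth and rate limiting before moderation means a rejected message never reaches the more expensive steps.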

## Advanced Config

```elixir
config :chatx,
  # adapters
  persistence: MyApp.ChatPersistence,
  notifier: MyApp.ChatNotifier,
  moderation: MyApp.ChatModeration,
  auth: MyApp.ChatAuth,

  # middleware order (Erlang pipeline)
  middlewares: [:auth, :rate_limit, :moderation],

  # security + flow control
  message_size_limit: 4096,
  room_queue_limit: 2048,
  rate_limit: [messages_per_sec: 10, burst: 20],

  # async delivery
  delivery_batch_size: 100,
  delivery_tick_ms: 100,
  delivery_max_retries: 5,
  delivery_retry_base_ms: 100,

  # multi-node strategy: :local | :pg | :horde
  cluster_strategy: :pg
```
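
As a rough guide to how `delivery_max_retries` and `delivery_retry_base_ms` interact, the sketch below computes a retry schedule assuming a plain doubling backoff with no jitter. The exact formula chatx uses is not documented here, so treat `BackoffSketch` as a hypothetical illustration rather than the library's behaviour.

```elixir
defmodule BackoffSketch do
  @moduledoc "Assumed doubling backoff; the formula chatx uses may differ."

  # Delay before retry number `attempt` (1-based): base, 2 * base, 4 * base, ...
  def delay_ms(attempt, base_ms) when attempt >= 1 do
    base_ms * Integer.pow(2, attempt - 1)
  end

  # Full retry schedule for the configured limits.
  def schedule(max_retries, base_ms) do
    Enum.map(1..max_retries//1, &delay_ms(&1, base_ms))
  end
end

# delivery_max_retries: 5, delivery_retry_base_ms: 100
IO.inspect(BackoffSketch.schedule(5, 100))
# => [100, 200, 400, 800, 1600]
```

Under this assumption a failing delivery is abandoned after about 3.1 seconds of cumulative waiting.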

## Adapter Contracts

```elixir
defmodule MyApp.ChatPersistence do
  @behaviour Chatx.Adapter.Persistence
  def save_message(_message), do: :ok
end

defmodule MyApp.ChatNotifier do
  @behaviour Chatx.Adapter.Notifier
  def notify(_user_id, _message), do: :ok
end

defmodule MyApp.ChatModeration do
  @behaviour Chatx.Adapter.Moderation
  def check(content), do: if(String.contains?(content, "blocked"), do: {:error, :blocked}, else: :ok)
end

defmodule MyApp.ChatAuth do
  @behaviour Chatx.Adapter.Auth
  def authorize(_user_id, _room_id), do: :ok
end
```
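
Each callback above returns `:ok` or `{:error, reason}`. One way a caller could contain a misbehaving adapter is to wrap the call and normalise anything else into an error tuple. The wrapper below is a sketch of that idea only: `SafeAdapterSketch` and the `:invalid_return` and `:adapter_raised` tags are hypothetical, not chatx's actual boundary code.

```elixir
defmodule SafeAdapterSketch do
  @moduledoc "Illustrative wrapper; not chatx's actual boundary code."

  # Call `fun` and normalise the outcome to :ok | {:error, reason}.
  def call(fun) when is_function(fun, 0) do
    case fun.() do
      :ok -> :ok
      {:error, reason} -> {:error, reason}
      other -> {:error, {:invalid_return, other}}
    end
  rescue
    exception -> {:error, {:adapter_raised, exception}}
  catch
    kind, value -> {:error, {kind, value}}
  end
end

IO.inspect(SafeAdapterSketch.call(fn -> :ok end))
IO.inspect(SafeAdapterSketch.call(fn -> {:ok, :unexpected} end))
IO.inspect(SafeAdapterSketch.call(fn -> raise "db down" end))
```

With a wrapper like this, a crash in `save_message/1` becomes an error value the room process can log or retry instead of an exit that takes the room down.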

## Multi-Node Behavior

- `:local`: single node, local room ownership
- `:pg`: deterministic owner node for a room + pg membership lookup
- `:horde`: currently falls back to the `:pg` path unless a Horde registry is available

Room placement is deterministic (`chatx_placement`), so every node that sees the same cluster membership resolves a given room to the same owner node.
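
Deterministic placement can be illustrated by hashing the room ID over a sorted node list. The algorithm inside `chatx_placement` is not described in this README, so the sketch below is an assumption that only demonstrates the property: given the same set of nodes, every caller picks the same owner. `PlacementSketch` is a hypothetical name.

```elixir
defmodule PlacementSketch do
  @moduledoc "Assumed hash-over-sorted-nodes placement; not the chatx_placement code."

  # Pick an owner for `room_id` from `nodes`. Sorting first makes the result
  # independent of the order in which each node discovered its peers.
  def owner(room_id, nodes) when nodes != [] do
    sorted = Enum.sort(nodes)
    Enum.at(sorted, :erlang.phash2(room_id, length(sorted)))
  end
end

nodes = [:"a@host", :"b@host", :"c@host"]

# Same answer regardless of list order.
IO.inspect(
  PlacementSketch.owner("room:general", nodes) ==
    PlacementSketch.owner("room:general", Enum.reverse(nodes))
)
```

A simple modulo hash like this moves many rooms when a node joins or leaves; schemes such as consistent or rendezvous hashing reduce that churn.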

## Security and Cost Controls

- Strict binary/type/UTF-8 validation in Erlang before fanout
- Hard payload size limit
- Token-bucket rate limiting per user and room
- Room-level backpressure (`room_queue_limit`)
- Adapter boundary hardening (exceptions and invalid return values are contained)
- `message_queue_data: off_heap` in busy processes to reduce GC pressure
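
The first two checks in this list can be sketched in a few lines of Elixir. chatx performs its validation in Erlang, so `ValidationSketch` and its error atoms are hypothetical; the default limit mirrors `message_size_limit: 4096` from the config examples.

```elixir
defmodule ValidationSketch do
  @moduledoc "Illustrative type, size, and UTF-8 checks; error atoms are made up."

  # Returns :ok or {:error, reason}. The type and size checks run first
  # because they are cheaper than scanning the payload for valid UTF-8.
  def validate(content, size_limit \\ 4096)

  def validate(content, _limit) when not is_binary(content), do: {:error, :not_binary}

  def validate(content, limit) when byte_size(content) > limit, do: {:error, :too_large}

  def validate(content, _limit) do
    if String.valid?(content), do: :ok, else: {:error, :invalid_utf8}
  end
end

IO.inspect(ValidationSketch.validate("Hello everyone!"))
IO.inspect(ValidationSketch.validate(<<0xFF, 0xFE>>))
IO.inspect(ValidationSketch.validate(String.duplicate("a", 5000)))
IO.inspect(ValidationSketch.validate(:not_a_string))
```

Note that the limit is in bytes, not characters, so multi-byte UTF-8 text reaches it sooner than ASCII.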

## Telemetry

- `[:chatx, :message_sent]`
- `[:chatx, :room_join]`
- `[:chatx, :room_leave]`
- `[:chatx, :broadcast]`
- `[:chatx, :queue_size]`
- `[:chatx, :queue_overflow]`
- `[:chatx, :rate_limit_hit]`
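
These events can be consumed with a standard `:telemetry` handler. The sketch below defines one in the 4-arity shape that `:telemetry` expects and logs whatever it receives. The measurement and metadata keys chatx emits are not listed in this README, so the handler deliberately makes no assumptions about them; `ChatMetricsSketch` and the `"chat-metrics"` handler ID are hypothetical.

```elixir
defmodule ChatMetricsSketch do
  @moduledoc "Illustrative telemetry handler that logs every chatx event."
  require Logger

  # Event names as listed above.
  def events do
    [
      [:chatx, :message_sent],
      [:chatx, :room_join],
      [:chatx, :room_leave],
      [:chatx, :broadcast],
      [:chatx, :queue_size],
      [:chatx, :queue_overflow],
      [:chatx, :rate_limit_hit]
    ]
  end

  # Build a log line without assuming any particular measurement or metadata keys.
  def format(event, measurements, metadata) do
    "#{Enum.join(event, ".")} measurements=#{inspect(measurements)} metadata=#{inspect(metadata)}"
  end

  # Handler with the (event, measurements, metadata, config) arity used by :telemetry.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end
end

# In your application's start/2, with the :telemetry dependency available:
#
#   :telemetry.attach_many(
#     "chat-metrics",
#     ChatMetricsSketch.events(),
#     &ChatMetricsSketch.handle_event/4,
#     nil
#   )
```

Alerting on `[:chatx, :queue_overflow]` and `[:chatx, :rate_limit_hit]` is a reasonable starting point, since both indicate that messages are being shed.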

## Phoenix Demo Project

A fully wired Phoenix demo app lives in [`examples/chatx_phx_demo/`](examples/chatx_phx_demo/).
It includes a LiveView chat UI, a Phoenix Channel, and a path dependency on this package.

```bash
cd examples/chatx_phx_demo
mix deps.get
mix phx.server
# open http://localhost:4000 in two tabs and chat
```

![alt text](image-1.png)

![alt text](image-2.png)