# AGENTS.md
Guidance for AI coding assistants integrating **MqttX** into a project.
Read this before suggesting code that uses this library — it captures the
mental model and the mistakes agents most often make.
> Modifying MqttX itself? See `CONTRIBUTING.md` for repo layout, test commands,
> and deferred work.
## What MqttX is
A single Hex package (`{:mqttx, "~> 0.10"}`) that ships **three independent
pieces** — choose only what you need:
| Piece | Module | Use when |
|-------|--------|----------|
| **Wire codec** | `MqttX.Packet.Codec` | You have your own transport and just need encode/decode |
| **Client** | `MqttX.Client` | Your app connects to an MQTT broker (AWS IoT, EMQX, HiveMQ, Mosquitto, …) |
| **Broker** | `MqttX.Server` | You are *running* an MQTT broker (e.g. an IoT backend that owns its devices) |
Most apps want only the **client**. Build a broker only when you need to own
the message routing — for talking to a third-party broker, the client is
sufficient on its own.
## Picking a transport
The codec has no dependencies. Client transports are built in; only the server
transports need optional packages:
| Transport | Add to deps |
|-----------|-------------|
| TCP / TLS client (`tcp` / `ssl`) | nothing extra |
| WebSocket client (`ws` / `wss`) | nothing extra (RFC 6455 client is built-in) |
| TCP server | `{:thousand_island, "~> 1.4"}` (preferred) or `{:ranch, "~> 2.2"}` |
| WebSocket server | `{:bandit, "~> 1.6"}` and `{:websock_adapter, "~> 0.5"}` |
If `MqttX.Transport.ThousandIsland` (or `Ranch`, or `Bandit`) fails at server
startup with an undefined-module / undefined-function error, the
corresponding optional dep is missing from `mix.exs` — that's the single most
common setup mistake.
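For reference, the `deps/0` of an app that runs a TCP broker on ThousandIsland might look like the fragment below. It only restates the table above; swap in `{:ranch, "~> 2.2"}` if you prefer Ranch.

```elixir
# mix.exs (fragment)
defp deps do
  [
    {:mqttx, "~> 0.10"},
    # For a WebSocket server, also add:
    # {:bandit, "~> 1.6"},
    # {:websock_adapter, "~> 0.5"},
    # Optional dep that MqttX.Transport.ThousandIsland needs at startup:
    {:thousand_island, "~> 1.4"}
  ]
end
```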
## Mental model — client side
```
your code ──MqttX.Client.subscribe──▶ broker
your code ──MqttX.Client.publish────▶ broker
                                        │
MqttX.Client ◀─PUBLISH──────────────────┘
      │
      ▼
handler_module.handle_mqtt_event(:message, {topic, payload, packet}, state)
```
- The client is a **GenServer**. You don't poll it — it pushes events to a
handler module.
- `MqttX.Client.connect/1` blocks until CONNACK arrives, so once it returns
`{:ok, pid}` the session is live and you can immediately subscribe/publish.
- `subscribe/3` is synchronous and **waits for SUBACK** before returning
`{:ok, granted_qos_list}`. `publish/4` returns `:ok` as soon as the packet
is written to the socket (it does not wait for PUBACK at QoS 1/2 — those
acks are tracked in the background and surfaced via the handler module).
- If the connection has dropped (and not yet reconnected), `subscribe`,
`publish`, and `unsubscribe` return `{:error, :not_connected}` immediately —
they do not queue.
- The handler module implements **`handle_mqtt_event/3`**, which returns the
  (possibly updated) handler state and receives:
  - `(:connected, %{properties: props, session_present: bool}, state)`: after a successful CONNACK
  - `(:disconnected, reason, state)`: `reason` is `:closed`, `:pingresp_timeout`, `{:error, posix}`, or `{:server_disconnect, code, props}`
  - `(:message, {topic, payload, full_packet}, state)`: for each PUBLISH

  `topic` arrives as a **list of segments** (`["sensors", "room1", "temp"]`),
  not the original string; use `Enum.join(topic, "/")` if you need to round-trip.
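Because `topic` arrives pre-split, a handler can pattern-match on segments instead of parsing strings. A minimal sketch: the module name `MyApp.SensorHandler` and the room-keyed state map are hypothetical; only the `handle_mqtt_event/3` shapes come from the list above.

```elixir
defmodule MyApp.SensorHandler do
  # Hypothetical handler; callback shapes follow the events listed above.
  require Logger

  def handle_mqtt_event(:connected, _info, state), do: state
  def handle_mqtt_event(:disconnected, _reason, state), do: state

  # Matches "sensors/<room>/temp" directly on the segment list and
  # stores the latest payload per room in the handler state.
  def handle_mqtt_event(:message, {["sensors", room, "temp"], payload, _packet}, state) do
    Map.put(state, room, payload)
  end

  # Any other topic: rebuild the string form only for logging.
  def handle_mqtt_event(:message, {topic, _payload, _packet}, state) do
    Logger.debug("ignoring message on #{Enum.join(topic, "/")}")
    state
  end
end
```

Since the callbacks are plain functions, they can be unit-tested by calling them directly, without a broker.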
## Mental model — broker side
`use MqttX.Server` gives your module a behaviour with one callback per MQTT
packet type, plus `handle_info/2` for messages from the rest of your app:
```
device   ──CONNECT───▶ handle_connect(client_id, creds, info, state)
device   ──SUBSCRIBE─▶ handle_subscribe(topics, state) → grant per-topic QoS
device   ──PUBLISH───▶ handle_publish(topic, payload, opts, state)
device   ──DISCONNECT▶ handle_disconnect(reason, state)
your app ──send(broker_pid, msg)─▶ handle_info(msg, state)
                                   └─▶ {:publish, topic, payload, state} (fan out to device)
```
Servers are *per-connection* state machines — `state` is one device's state.
For app-wide state (subscriber registry, message bus), use Phoenix.PubSub or
`:pg` from inside the callbacks.
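If you don't use Phoenix, OTP's `:pg` (OTP 23+) can act as the app-wide registry. A minimal sketch, assuming (as the Phoenix.PubSub bridge example relies on) that callbacks such as `handle_connect/4` run inside the connection process, so `self()` there is that device's connection. `MyApp.DeviceRegistry` and its functions are hypothetical.

```elixir
defmodule MyApp.DeviceRegistry do
  # Hypothetical registry on OTP's :pg; the scope name is this module.
  @scope __MODULE__

  # Lets you list MyApp.DeviceRegistry in a supervision tree.
  def child_spec(_opts) do
    %{id: __MODULE__, start: {:pg, :start_link, [@scope]}}
  end

  # Call from handle_connect/4: joins the calling (connection) process
  # to a group keyed by client_id.
  def register(client_id), do: :pg.join(@scope, {:device, client_id}, self())

  # Sends msg to every connection registered under client_id, where it
  # arrives in handle_info/2. Returns the number of recipients.
  def send_to(client_id, msg) do
    pids = :pg.get_members(@scope, {:device, client_id})
    Enum.each(pids, &send(&1, msg))
    length(pids)
  end
end
```

`:pg` removes a process from its groups automatically when it exits, so disconnected devices drop out without explicit cleanup.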
## Idiomatic patterns
### Receive messages on the client
```elixir
defmodule MyApp.MqttHandler do
  require Logger

  def handle_mqtt_event(:connected, _info, state), do: state
  def handle_mqtt_event(:disconnected, _reason, state), do: state

  def handle_mqtt_event(:message, {topic, payload, _packet}, state) do
    Logger.info("got #{payload} on #{Enum.join(topic, "/")}")
    state
  end
end

{:ok, c} =
  MqttX.Client.connect(
    host: "broker.example.com",
    client_id: "my-app-#{node()}",
    handler: MyApp.MqttHandler,
    handler_state: %{}
  )

{:ok, _granted} = MqttX.Client.subscribe(c, "sensors/#", qos: 1)
```
### Bridge MQTT broker ↔ Phoenix.PubSub (fan-out)
```elixir
defmodule MyBroker do
  use MqttX.Server

  def init(_), do: %{}

  def handle_connect(client_id, _creds, _info, state) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, "downlink:#{client_id}")
    {:ok, Map.put(state, :client_id, client_id)}
  end

  def handle_publish(topic, payload, _opts, state) do
    Phoenix.PubSub.broadcast(MyApp.PubSub, "uplink", {state.client_id, topic, payload})
    {:ok, state}
  end

  def handle_info({:downlink, topic, payload}, state) do
    {:publish, topic, payload, %{qos: 1, retain: false}, state}
  end

  def handle_subscribe(topics, s), do: {:ok, Enum.map(topics, & &1.qos), s}
  def handle_disconnect(_r, _s), do: :ok
end

# elsewhere in your app:
Phoenix.PubSub.broadcast(
  MyApp.PubSub,
  "downlink:device-123",
  {:downlink, "device-123/cmd", "reboot"}
)
```
### MQTT 5 persistent sessions (resume QoS 1/2 across reconnects)
```elixir
MqttX.Client.connect(
  host: "broker.example.com",
  client_id: "stable-id-not-uuid",             # MUST be stable across reconnects
  protocol_version: 5,                         # required for properties
  clean_session: false,
  connect_properties: %{session_expiry_interval: 3600},
  session_store: MqttX.Session.ETSStore        # client-side persistence
)
```
### Custom auth (reject CONNECT)
```elixir
def handle_connect(_client_id, %{username: u, password: p}, _info, state) do
  case MyApp.Auth.verify(u, p) do
    {:ok, _} -> {:ok, state}
    :error -> {:error, 0x86, state}  # 0x86 = Bad User Name or Password
  end
end
```
Reason codes worth knowing: `0x80` Unspecified, `0x86` Bad credentials,
`0x87` Not authorized, `0x95` Packet too large, `0x97` Quota exceeded.
Full list in MQTT 5.0 §2.4.
## Common mistakes (do not do these)
- **Wildcards in PUBLISH.** `+` and `#` are valid only in subscription
  filters; publishing to a topic that contains them is a Protocol Error and
  the broker will disconnect. Validate any topic built from user input with
  `MqttX.Topic.validate_publish/1`.
- **Using `handle_publish/4` on the client.** That's a *server* callback. The
client receives PUBLISHes via `handle_mqtt_event(:message, …)`. They are
*not* the same callback — agents confuse this constantly.
- **`clean_session: false` without `:session_store`.** The flag tells the
*broker* to keep state. For the *client* to remember in-flight QoS 1/2
across reconnects, also pass `:session_store`.
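- **Random `client_id` per connect.** The broker keys session state
  (subscriptions, including shared ones, plus queued and in-flight QoS 1/2
  messages) by `client_id`. A fresh `UUID.uuid4()` on every connect means no
  session is ever resumed, and with `clean_session: false` each connect leaves
  another orphaned session on the broker until the broker discards it.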
- **Picking QoS 2 by default.** QoS 2 is a 4-packet handshake (PUBLISH →
  PUBREC → PUBREL → PUBCOMP); use it only when *duplicate delivery would
  cause real harm* (e.g. financial transactions). For telemetry use QoS 0; for
  commands use QoS 1 and make the command handler idempotent, since QoS 1 can
  deliver the same message more than once.
- **Expecting `#` to match `$SYS/...`.** Per MQTT §4.7.2, `$`-prefixed topics
require explicit subscription. `subscribe(c, "#")` does **not** receive
`$SYS/broker/uptime`.
- **Treating `MqttX.Server.Router` as a public pubsub.** It is the broker's
internal subscription index. To send messages between processes, use
Phoenix.PubSub or `:pg`, then bridge via the broker callback.
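- **Setting `keepalive` at or above the proxy's idle timeout.** Cloud proxies
  and load balancers (Fly.io, AWS NLB, Azure Front Door) drop TCP connections
  that stay silent longer than a fixed idle timeout, which can be as short as
  ~60s. The client only sends PINGREQ after `keepalive` seconds without
  traffic, so a longer interval lets the proxy kill the connection first.
  Check your provider's timeout and stay well below it (≤ 30s is a safe choice
  for cellular IoT), or set `server_keep_alive: 30` in `transport_opts` to
  enforce it server-side for MQTT 5 clients.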
- **Assuming retained = "all past messages".** Retain stores the **last**
message per topic only — it's a "current state" mechanism, not a history.
- **Ignoring CONNACK reason codes.** `MqttX.Client.connect/1` returns
  `{:ok, pid}` only on success; on broker rejection it returns
  `{:error, {:connack_error, reason_code, %{server_reference: ref_or_nil}}}`
  (e.g. `0x84` Unsupported Protocol Version, `0x86` Bad User Name or Password,
  or `0x9C` Use another server, where `ref_or_nil` may name the server to
  redirect to).
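A minimal sketch of acting on those return values instead of matching only `{:ok, pid}`. `MyApp.MqttConnect.classify/1` is hypothetical: it assumes `ref_or_nil` is a binary or `nil` and that non-CONNACK failures come back as `{:error, reason}`. Treating `0x84`/`0x86`/`0x87` as non-retryable is a policy choice, not something MqttX prescribes.

```elixir
defmodule MyApp.MqttConnect do
  # Hypothetical helper: classifies the result of MqttX.Client.connect/1.
  # It takes the result tuple, so it can be tested without a broker.
  def classify({:ok, pid}) when is_pid(pid), do: {:connected, pid}

  # 0x9C Use another server: redirect if the broker named a target.
  def classify({:error, {:connack_error, 0x9C, %{server_reference: ref}}}) when is_binary(ref),
    do: {:redirect, ref}

  # Unsupported version, bad credentials, not authorized: retrying the
  # same CONNECT will not help.
  def classify({:error, {:connack_error, code, _props}}) when code in [0x84, 0x86, 0x87],
    do: {:fatal, code}

  # Any other rejection or transport error (assumed shape): back off and retry.
  def classify({:error, {:connack_error, code, _props}}), do: {:retry_later, code}
  def classify({:error, reason}), do: {:retry_later, reason}
end
```

Typical use: `opts |> MqttX.Client.connect() |> MyApp.MqttConnect.classify()`, then branch on the tag.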
## Decision helpers
- **Topic structure:** prefer hierarchy that matches your subscription
patterns. `tenant/{id}/device/{id}/telemetry/{metric}` lets `tenant/+/device/+/telemetry/#`
fan out cleanly. Avoid encoding multiple dimensions into one segment.
- **Payload format:** Protobuf for cellular IoT (5-10× smaller than JSON);
JSON for backend interop where payload size doesn't dominate.
- **`max_inflight` on the client:** default 100. Raise only if your broker
advertises a high `receive_maximum` *and* you're seeing throughput limits;
otherwise increasing it just delays backpressure.
- **Shared subscriptions** (`$share/group/topic`): use to load-balance
consumers, not to broadcast. Each message goes to exactly one subscriber
in the group. Subscribing to the same topic from N clients without `$share`
delivers N copies.
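The wildcard rules above, plus the `$`-topic exclusion from *Common mistakes*, fit in a few lines. This `MyApp.TopicFilter` module is a hypothetical illustration of MQTT §4.7 matching, not MqttX's own matcher; it assumes the filter is already valid and that a `$share/` filter always includes a group and a topic filter.

```elixir
defmodule MyApp.TopicFilter do
  # Hypothetical illustration of MQTT topic-filter matching.

  # Splits "$share/<group>/<filter>" into {group, filter}; plain filters get nil.
  def parse("$share/" <> rest) do
    [group, filter] = String.split(rest, "/", parts: 2)
    {group, filter}
  end

  def parse(filter), do: {nil, filter}

  def matches?(filter, topic) do
    {_group, filter} = parse(filter)
    f = String.split(filter, "/")
    t = String.split(topic, "/")

    # §4.7.2: a filter starting with a wildcard never matches a "$" topic.
    if String.starts_with?(topic, "$") and hd(f) in ["+", "#"] do
      false
    else
      do_match(f, t)
    end
  end

  # "#" matches the remaining levels, including none (the parent level).
  defp do_match(["#"], _rest), do: true
  # "+" matches exactly one level, which may be empty.
  defp do_match(["+" | f], [_ | t]), do: do_match(f, t)
  defp do_match([seg | f], [seg | t]), do: do_match(f, t)
  defp do_match([], []), do: true
  defp do_match(_, _), do: false
end
```

For example, `MyApp.TopicFilter.matches?("tenant/+/device/+/telemetry/#", "tenant/acme/device/d1/telemetry/temp")` is `true`, while `matches?("#", "$SYS/broker/uptime")` is `false`.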
## Where to find authoritative answers
- **Public API:** [hexdocs.pm/mqttx](https://hexdocs.pm/mqttx) — `@doc`
strings on every public function
- **Worked examples:** `README.md` ("Common Patterns") and the integration
tests at `test/mqttx/integration_test.exs` and
`test/mqttx/interop_emqx_test.exs`
- **Recent behavior changes:** `CHANGELOG.md` — the `[0.10.0]` entry
documents the v0.10.0 spec sweep, which tightened many edge cases that older
examples on the internet may not reflect
- **MQTT spec:** OASIS [3.1.1](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/) /
[5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/) — section references in
this codebase (e.g. `§3.3.1.2`) point here