# Changelog
## [0.3.0](https://github.com/intility/off_broadway_emqtt/compare/v0.2.1...v0.3.0) (2026-04-20)
_Breaking changes and reliability rework._
This release removes the ETS/disk-log buffer layer and replaces it with proper
MQTT protocol-level backpressure. The result is a simpler, more reliable producer
that correctly participates in QoS 1/2 guarantees.
### Breaking changes
- **ETS buffer removed.** The options `buffer_size`, `buffer_overflow_strategy`,
`buffer_durability`, and `buffer_log_dir` no longer exist. Remove them from
your configuration. Backpressure is now handled entirely by `max_inflight` and
the MQTT protocol.
- **`clean_start` defaults to `false`** (was `true`). With the old default, the
broker discarded the session on every reconnect, silently losing all unACKed
QoS 1/2 messages. The new default preserves the session so the broker redelivers
unACKed messages after a producer restart. If you explicitly want a fresh session
on each connect, set `config: [clean_start: true]`.
- **`topics` is now required.** Previously it defaulted to `[]`, starting a
producer that subscribed to nothing. Omitting `topics` now raises at startup.
- **`MessageHandler` behaviour simplified.** The callbacks `handle_connect/1`,
`handle_disconnect/1`, and `handle_pubrel/1` have been removed. Only
`handle_message/3` remains. Custom handlers implementing the removed callbacks
must delete them.
- **`concurrency > 1` requires `shared_group`.** Starting a pipeline with multiple
producer instances and no `shared_group` now raises at startup. Without shared
subscriptions every producer receives every message, causing duplicates.
- **Client ID suffix `_N` is now always appended.** Each producer instance connects
with `{clientid}_0`, `{clientid}_1`, etc. A pipeline that previously connected as
`my-client` now connects as `my-client_0`. If you have a persistent broker session
keyed by the exact client ID, update the `clientid` in your config to match the new
suffix (e.g. `clientid: "my-client_0"`) or accept that the session will be treated
as new on first connect.
- **emqtt bumped to `~> 1.14`**, cowlib to `~> 2.13.0`.
### Bug fixes
- **QoS 2 acknowledgement fixed.** The `pubcomp` step was incorrectly calling
`:emqtt.pubrec` instead of `:emqtt.pubcomp`, breaking the QoS 2 handshake
entirely. QoS 2 exactly-once delivery now works correctly.
### New features
- **Protocol-level backpressure via `max_inflight`.** The broker stops delivering
new messages once `max_inflight` unACKed QoS 1/2 messages are outstanding.
Default is 100. For MQTT v5, `Receive-Maximum` is automatically set in the
CONNECT properties so the broker enforces the limit server-side.
- **`shared_group` option** for distributing messages across multiple producer
instances using MQTT shared subscriptions (`$share/{group}/{topic}`).
- **New `config` options:** `reconnect`, `reconnect_timeout`, `low_mem`.
- **`connection: :down` telemetry event** emitted when the MQTT connection is lost,
with `%{client_id: string, producer_index: integer, reason: term}` metadata.
- **Clearer startup errors.** A `Logger.error` message including host and port is
emitted when the producer fails to connect to the broker.
- **`persistent_term` cleanup.** The ack-options entry written at pipeline startup
is now erased when the pipeline stops, preventing accumulation in long-running
applications that start and stop pipelines dynamically.
### Migration from v0.2.x
#### Remove buffer options
The ETS/disk-log buffer has been removed. Delete these options from your producer config:
```elixir
# Remove all of these:
buffer_size: 10_000,
buffer_overflow_strategy: :drop_head,
buffer_durability: :durable,
buffer_log_dir: System.tmp_dir!(),
```
Backpressure is now handled by `max_inflight`.
#### Review clean_start
`clean_start` now defaults to `false` (was `true`). This is the safer default: the broker
redelivers unACKed QoS 1/2 messages after a restart instead of discarding them.
If your pipeline was relying on the broker discarding the session on reconnect, add
`config: [clean_start: true]` explicitly. Be aware this means unACKed messages are lost
on every restart.
#### Remove MessageHandler callbacks
If you implemented a custom `MessageHandler`, delete any `handle_connect/1`, `handle_disconnect/1`,
or `handle_pubrel/1` callbacks. Only `handle_message/3` is part of the behaviour.
#### Add shared_group for concurrency > 1
If you were running with `concurrency > 1`, you must now add `shared_group`:
```elixir
# Before (would cause duplicate messages):
producer: [
module: {OffBroadway.EMQTT.Producer, topics: [{"my/topic", 1}], config: [...]},
concurrency: 3
]
# After:
producer: [
module: {OffBroadway.EMQTT.Producer,
topics: [{"my/topic", 1}],
shared_group: "my-pipeline",
config: [...]
},
concurrency: 3
]
```
#### Update dependencies
```elixir
{:off_broadway_emqtt, "~> 0.3.0"}
```
---
## [0.2.1](https://github.com/intility/off_broadway_emqtt/compare/v0.2.0...v0.2.1) (2025-06-05)
- If `:clean_start` option is `true`, truncate the buffer log file and skip replay when the producer starts.
- Properly disconnect from the MQTT broker on terminate.
## [0.2.0](https://github.com/intility/off_broadway_emqtt/compare/v0.1.0...v0.2.0) (2025-06-03)
- Add support for wrapping the ETS buffer cache with [:disk_log](https://www.erlang.org/docs/17/man/disk_log) to persist cached messages for producer.
- Introduced new option `buffer_durability` which can be either `:durable` or `:transient`. When `:durable`,
messages will be persisted to disk to ensure messages are not lost if the producer crashes. Defaults to
`:transient` (in-memory buffer only).
- New option `buffer_log_dir` can be either a string, or a zero-arity function that returns the directory to
store buffer logs.
- Added new telemetry events for `:durable` buffer operations.
## 0.1.1 (unreleased)
_Never tagged. Changes rolled into later releases._
- Emitting `off_broadway_emqtt.receive_message.ack` reads message topic from message receipt instead of from the message body.
This ensures that topic is included in telemetry events even if the message has been altered during dispatch.
- Move `emqtt.start_link/1` and `emqtt.connect/1` to a `handle_continue/2` callback to prevent blocking `GenServer.init/1`.
- Convert `host` and `server_name_indication` to charlist when validating options.
- Return new state from `handle_continue` on connection error.
- Publish the `payload` field as message data, and the rest as metadata.
## 0.1.0 (2024-09-24)
_Initial release._
The initial release supports connecting to an MQTT broker using [emqtt](https://github.com/emqx/emqtt),
and consume messages using a Broadway pipeline.
**Supported features**
- [x] Support most `emqtt` configurable options as producer config options.
- [x] Specify buffer size and overflow strategy for the `ets` table buffer.
- [x] `OffBroadway.EMQTT.MessageHandler` behaviour to support overriding default implementation.
- [x] Telemetry events for observability.