# OffBroadwayWebSocket
An Elixir library providing a **Broadway** producer for resilient WebSocket connections built on **gun**. It supports unified `gun_opts`, idle-timeout detection (based on ping/pong and data frames), demand-based dispatch, and custom retry strategies.
---
## Installation
Add to your `mix.exs`:
```elixir
def deps do
  [
    {:off_broadway_websocket, "~> 1.0.0"}
  ]
end
```
Fetch & compile:
```bash
mix deps.get
mix deps.compile
```
---
## Quickstart
```elixir
defmodule MyApp.Broadway do
  use Broadway

  require Logger

  alias Broadway.Message
  alias Broadway.NoopAcknowledger

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          # Your WebSocket endpoint:
          url: "wss://example.com:443",
          path: "/stream/updates",
          # Idle timeout (ms) for no ping/data before reconnect:
          ws_timeout: 15_000,
          # How long to wait (ms) for Gun to come up:
          await_timeout: 8_000,
          # Retry configuration; must include at least :retries_left and :delay:
          ws_retry_opts: %{
            max_retries: 5,
            retries_left: 5,
            delay: 1_000,          # initial backoff (ms)
            max_delay: 30_000,     # cap for backoff (ms)
            backoff_factor: 2,     # exponential factor
            jitter_fraction: 0.1   # ±10% random jitter
          },
          # User-defined backoff function (not part of this library):
          ws_retry_fun: &MyApp.Backoff.exponential_backoff_with_jitter/1,
          # Gun options (TCP/TLS, HTTP, WS):
          gun_opts: %{
            connect_timeout: 5_000,  # TCP/TLS handshake timeout
            protocols: [:http],      # application protocols
            transport: :tls,         # :tcp or :tls
            tls_opts: [
              verify: :verify_peer,
              cacertfile: CAStore.file_path(),
              depth: 10,
              reuse_sessions: false,
              verify_fun: {
                &:ssl_verify_hostname.verify_fun/3,
                [check_hostname: String.to_charlist("example.com")]
              }
            ],
            ws_opts: %{
              keepalive: 10_000,     # send ping if silent
              silence_pings: false
            },
            http_opts: %{
              version: :"HTTP/1.1"
            }
          },
          # Prefix for telemetry events:
          telemetry_id: :custom_telemetry,
          # Optional headers (values must be binaries or iodata; these are placeholders):
          headers: [
            {"X-ABC-APIKEY", "api-key"},
            {"X-ABC-PAYLOAD", "payload"},
            {"X-ABC-SIGNATURE", "signature"}
          ]
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [min_demand: 0, max_demand: 100, concurrency: 8]
      ],
      context: []
    )
  end

  @impl true
  def handle_message(_stage, %Message{data: raw} = msg, _ctx) do
    case Jason.decode(raw) do
      {:ok, data} ->
        Logger.debug(fn -> "Data: #{inspect(data)}" end)
        msg

      {:error, err} ->
        Logger.error("Decode error: #{inspect(err)}")
        Message.failed(msg, err)
    end
  end

  # Wraps each raw WebSocket frame in a Broadway message.
  def transform(event, _opts) do
    %Broadway.Message{
      data: event,
      acknowledger: NoopAcknowledger.init()
    }
  end
end
```
---
## Configuration Options
When configuring `OffBroadwayWebSocket.Producer`, you may pass:
- **`:url`** (_string_, required) — WebSocket base URL.
- **`:path`** (_string_, required) — Upgrade path and querystring.
- **`:ws_timeout`** (_ms_, optional) — Idle timeout for no ping/data.
- **`:await_timeout`** (_ms_, optional) — Timeout for `:gun.await_up/2`.
- **`:headers`** (_list_, optional) — HTTP headers for WS upgrade.
- **`:min_demand`** / **`:max_demand`** (_integer_) — Broadway backpressure.
- **`:telemetry_id`** (_atom_) — Prefix for telemetry events.
- **`:gun_opts`** (_map_) — All options forwarded to `:gun.open/3` and friends.
- **`:ws_retry_opts`** (_map_) — Your initial retry state; must include:
  - `:retries_left`, `:delay` (ms).
  - Extra keys (e.g. `:backoff_factor`, `:jitter_fraction`) are carried through.
- **`:ws_retry_fun`** (_function_) — A `(retry_opts() -> retry_opts())` function.
  After each failed connection attempt, the returned map's `:delay` is used for the next attempt, and the returned map becomes the input to the next call. After a successful
  reconnection, the retry state is reset to the initial `:ws_retry_opts`.
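
The Quickstart references `MyApp.Backoff.exponential_backoff_with_jitter/1`, which is user code rather than part of this library. A minimal sketch of such a function, following the `(retry_opts() -> retry_opts())` contract above, might look like this. It assumes the keys used in the Quickstart (`:max_delay`, `:backoff_factor`, `:jitter_fraction`), and it assumes the function itself is responsible for decrementing `:retries_left`; check the library source before relying on that.

```elixir
defmodule MyApp.Backoff do
  @moduledoc """
  Hypothetical helper module; not shipped with off_broadway_websocket.
  """

  # Takes the current retry state and returns the next one.
  def exponential_backoff_with_jitter(%{delay: delay} = opts) do
    factor = Map.get(opts, :backoff_factor, 2)
    max_delay = Map.get(opts, :max_delay, 30_000)
    jitter = Map.get(opts, :jitter_fraction, 0.0)

    # Grow the delay exponentially, capped at :max_delay.
    base = min(delay * factor, max_delay)

    # Random offset drawn uniformly from [-jitter * base, +jitter * base).
    offset = base * jitter * (2 * :rand.uniform() - 1)

    next_delay =
      (base + offset)
      |> round()
      |> max(0)
      |> min(max_delay)

    opts
    |> Map.put(:delay, next_delay)
    # Assumption: the retry function decrements :retries_left itself.
    |> Map.update(:retries_left, 0, &max(&1 - 1, 0))
  end
end
```

Because the jitter is applied to the stored `:delay`, it compounds slightly across attempts; keeping a separate un-jittered key in the map would avoid that, since extra keys are carried through.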
---
## Default Configuration
Out of the box, `OffBroadwayWebSocket.Producer` uses these defaults:
| Option | Default | Description |
|--------------------|------------------------------------|---------------------------------------------|
| `:url` | **—** | WebSocket URL (required) |
| `:path` | **—** | WebSocket path (required) |
| `:ws_timeout` | `nil` | Idle timeout (ms) for ping/data |
| `:await_timeout` | `10_000` | `gun.await_up/2` timeout (ms) |
| `:headers` | `[]` | Upgrade HTTP headers |
| `:min_demand` | `10` | Broadway `min_demand` |
| `:max_demand` | `100` | Broadway `max_demand` |
| `:telemetry_id` | `:websocket_producer` | Prefix for telemetry events |
| `:gun_opts` | `%{}` | Direct options to `:gun.open/3`, etc. |
| `:ws_retry_opts`   | see _Default Backoff Function_     | Initial retry state                         |
| `:ws_retry_fun` | `&OffBroadwayWebSocket.State.default_ws_retry_fun/1` | Backoff function contract |
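
Since only `:url` and `:path` are required, a producer entry that relies on the defaults in the table can be quite short. The following is a sketch only: the endpoint values are placeholders, and `MyApp.Broadway.transform/2` refers to the transformer from the Quickstart.

```elixir
# Minimal producer options; every other option falls back to the
# defaults listed in the table above.
producer_opts = [
  module: {
    OffBroadwayWebSocket.Producer,
    url: "wss://example.com:443",
    path: "/stream/updates"
  },
  transformer: {MyApp.Broadway, :transform, []},
  concurrency: 1
]
```

This list would be passed as the `:producer` option to `Broadway.start_link/2`.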
### Default Backoff Function
By default, a constant backoff function is used with the config shown below:
```elixir
%{
  max_retries: 5,    # total retry attempts
  retries_left: 5,   # decremented on each failure
  delay: 10_000      # constant delay in ms between retries
}
```
---
## Telemetry Events
Fired under `[:<telemetry_id>, :connection, <event>]`:
| Event | Measurements | Metadata | Description |
|-----------------|-----------------|-------------------|-----------------------------------|
| `:success` | `%{count: 1}` | `%{url: String}` | Handshake completed |
| `:failure` | `%{count: 1}` | `%{reason: term}` | Connect or upgrade failed |
| `:disconnected` | `%{count: 1}` | `%{reason: term}` | Underlying TCP connection dropped |
| `:timeout` | `%{count: 1}` | `%{}` | Idle ping/data timeout |
| `:status`       | `%{value: 0 \| 1}` | `%{}`          | `0`=down, `1`=up                  |
Attach as usual:
```elixir
:telemetry.attach(
  "log-connection-success",
  [:websocket_producer, :connection, :success],
  fn event_name, measurements, metadata, _config ->
    IO.inspect({event_name, measurements, metadata}, label: "Telemetry Event")
  end,
  nil
)
```
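
To observe all five connection events with one handler, `:telemetry.attach_many/4` can be used instead. The module below is a hypothetical helper, not part of this library; its name and the handler id are illustrative. It uses a module function capture rather than an anonymous function, which is what the `telemetry` library recommends for handlers.

```elixir
defmodule MyApp.ConnectionTelemetry do
  @moduledoc """
  Hypothetical helper that logs every connection event listed in the table above.
  """
  require Logger

  @events [:success, :failure, :disconnected, :timeout, :status]

  # Builds the full event names for a given telemetry prefix.
  def events(telemetry_id \\ :websocket_producer) do
    for event <- @events, do: [telemetry_id, :connection, event]
  end

  # Attaches a single handler to all connection events.
  def attach(telemetry_id \\ :websocket_producer) do
    :telemetry.attach_many(
      "#{telemetry_id}-connection-logger",
      events(telemetry_id),
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Logs the event name together with its measurements and metadata.
  def handle_event([_id, :connection, event], measurements, metadata, _config) do
    Logger.info("connection #{event}: #{inspect(measurements)} #{inspect(metadata)}")
  end
end
```

Calling `MyApp.ConnectionTelemetry.attach(:custom_telemetry)` before starting the pipeline would match the `telemetry_id` used in the Quickstart.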
---
## Running Tests
```bash
mix test
```
---
## Dialyzer
```bash
mix dialyzer --plt
mix dialyzer
```
---
## Contributing
PRs and issues welcome! Please follow Elixir conventions and include tests.
---
## License
Apache License 2.0 © 2025