
# OffBroadwayWebSocket
An Elixir library that provides a **Broadway** producer for WebSocket connections, built on the **gun** library. It supports ping/pong timeout monitoring and demand-based message dispatching in an Off-Broadway setup.
## Features
- Manages WebSocket connections.
- Monitors WebSocket connections with ping/pong messages and triggers timeouts.
- Integrates seamlessly with **Broadway** for demand-driven message processing.
## Installation
Add `off_broadway_websocket` to your list of dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:off_broadway_websocket, "~> 0.2.2"}
  ]
end
```
Run the following to fetch and compile the dependency:
```sh
mix deps.get
mix deps.compile
```
## Usage
### Basic Setup with **Broadway**
In your project, define a **Broadway** module that uses `OffBroadwayWebSocket.Producer` as its producer:
```elixir
defmodule MyApp.Broadway do
  use Broadway

  require Logger

  alias Broadway.Message

  def start_link(_args) do
    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {
          OffBroadwayWebSocket.Producer,
          url: "wss://example.com:443",
          path: "/path_to_ws_endpoint",
          ws_timeout: 15_000,
          ws_opts: %{keepalive: 10_000, silence_pings: false},
          http_opts: %{version: :"HTTP/1.1"},
          telemetry_id: :custom_telemetry_id,
          headers: [ # Optional headers
            {"X-ABC-APIKEY", "api-key"},
            {"X-ABC-PAYLOAD", %{}},
            {"X-ABC-SIGNATURE", "signature"}
          ]
        },
        transformer: {__MODULE__, :transform, []},
        concurrency: 1
      ],
      processors: [
        default: [min_demand: 0, max_demand: 100, concurrency: 8]
      ]
    )
  end

  # handle_message/3 must return the (possibly updated) message.
  @impl true
  def handle_message(_processor, %Message{data: raw_message} = message, _context) do
    case Jason.decode(raw_message) do
      {:ok, %{"type" => "heartbeat"}} ->
        Logger.debug("Heartbeat message received.")
        message

      {:ok, data} ->
        Logger.info("Data received: #{inspect(data)}")
        message

      {:error, error} ->
        Logger.error("Failed to decode message: #{inspect(error)}")
        Message.failed(message, error)
    end
  end

  # Wraps each raw WebSocket event in a Broadway message.
  def transform(event, _opts) do
    %Message{
      data: event,
      acknowledger: {__MODULE__, :ack_id, :ack_data}
    }
  end

  # No-op acknowledger: WebSocket frames need no explicit acknowledgement.
  def ack(:ack_id, _successful, _failed) do
    :ok
  end
end
```
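
Once the module is defined, it is typically started under a supervisor. The following is a minimal sketch, assuming an application module named `MyApp.Application` and a supervisor named `MyApp.Supervisor` (both names are illustrative and not part of this library). It relies on the `child_spec/1` that `use Broadway` defines for `MyApp.Broadway`.

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # The argument is passed to MyApp.Broadway.start_link/1, which ignores it.
      {MyApp.Broadway, []}
    ]

    # MyApp.Supervisor is a hypothetical name chosen for this example.
    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

With a `:one_for_one` strategy, the supervisor restarts the pipeline on its own if it terminates.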
### Configuration Options
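- **url**: The WebSocket server URL, including scheme and port, for example `wss://example.com:443`.
- **path**: The WebSocket endpoint path, for example `/path_to_ws_endpoint`.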
- **ws_timeout**: Time in milliseconds to wait for a pong response before assuming the connection is lost.
- **ws_opts**: WebSocket-specific options passed to the **gun 2.1** library, such as `keepalive` and `silence_pings`.
- **http_opts**: HTTP-specific options also compatible with **gun 2.1**, including version or custom headers.
- **headers**: Optional headers to use when upgrading to WebSocket.
- **telemetry_id**: Optional custom identifier for telemetry events. Defaults to `:websocket_producer`.
The complete list of options accepted by `http_opts` and `ws_opts` is available in the [gun 2.1 manual](https://ninenines.eu/docs/en/gun/2.1/manual/gun/).
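
To illustrate what `ws_timeout` means in practice, here is a conceptual sketch of a pong watchdog. It is not the library's implementation: the `PongWatchdog` module, its functions, and its message shapes are all hypothetical. The process stops with reason `:pong_timeout` if no pong is reported within the configured window, and each reported pong re-arms the timer.

```elixir
defmodule PongWatchdog do
  # Hypothetical module for illustration only; not part of OffBroadwayWebSocket.
  use GenServer

  def start_link(timeout_ms), do: GenServer.start_link(__MODULE__, timeout_ms)

  # Call this whenever a pong frame arrives.
  def pong(pid), do: GenServer.cast(pid, :pong)

  @impl true
  def init(timeout_ms), do: {:ok, arm(%{timeout_ms: timeout_ms, token: nil})}

  @impl true
  def handle_cast(:pong, state), do: {:noreply, arm(state)}

  # Only the most recently armed timer may stop the process.
  @impl true
  def handle_info({:pong_timeout, token}, %{token: token} = state) do
    {:stop, :pong_timeout, state}
  end

  # Timers armed before the latest pong carry an old token and are ignored.
  def handle_info({:pong_timeout, _stale}, state), do: {:noreply, state}

  defp arm(state) do
    token = make_ref()
    Process.send_after(self(), {:pong_timeout, token}, state.timeout_ms)
    %{state | token: token}
  end
end
```

Tagging each timer with a fresh reference avoids having to cancel the previous timer, since an expired timer from before the last pong no longer matches the stored token.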
## Telemetry Events
**OffBroadwayWebSocket** emits telemetry events for key WebSocket operations. These events can be used for monitoring and integration with tools like Prometheus, Datadog, or other observability platforms.
### Event Table
| **Event Name** | **Measurements** | **Metadata** | **Description** |
| --- | --- | --- | --- |
| `[:websocket_producer, :connection, :success]` | `count: 1` | `url: String` | Emitted when a connection is successfully established. |
| `[:websocket_producer, :connection, :failure]` | `count: 1` | `reason: term()` | Emitted when a connection attempt fails. |
| `[:websocket_producer, :connection, :disconnected]` | `count: 1` | `reason: term()` | Emitted when the WebSocket connection is disconnected. |
| `[:websocket_producer, :connection, :timeout]` | `count: 1` | (none) | Emitted when a ping/pong timeout occurs. |
| `[:websocket_producer, :connection, :status]` | `value: [0,1]` | (none) | Emitted to indicate the current WebSocket connection status (`0` = down, `1` = up). |
### Example Usage
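You can attach custom handlers to these telemetry events for logging or monitoring purposes. The example below logs successful connections; the handler ID (`"websocket-connection-logger"`) is an arbitrary unique name chosen for illustration:
```elixir
:telemetry.attach(
  "websocket-connection-logger",
  [:websocket_producer, :connection, :success],
  fn event_name, measurements, metadata, _config ->
    IO.inspect({event_name, measurements, metadata}, label: "Telemetry Event")
  end,
  nil
)
```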
This allows you to customize behavior or integrate the events into observability tools.
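
To handle every event in the table from one place, a single module can be attached with `:telemetry.attach_many/4`. The following is a sketch only: the module name `MyApp.WebSocketTelemetry` and the handler ID are hypothetical, and it assumes the default `:websocket_producer` identifier and that the `:telemetry` package is available in your project.

```elixir
defmodule MyApp.WebSocketTelemetry do
  # Hypothetical helper module; not part of OffBroadwayWebSocket.
  require Logger

  # Event names as listed in the event table, with the default identifier.
  @events [
    [:websocket_producer, :connection, :success],
    [:websocket_producer, :connection, :failure],
    [:websocket_producer, :connection, :disconnected],
    [:websocket_producer, :connection, :timeout],
    [:websocket_producer, :connection, :status]
  ]

  # Returns :ok, or {:error, :already_exists} if the handler ID is taken.
  def attach do
    :telemetry.attach_many("my-app-websocket-telemetry", @events, &__MODULE__.handle_event/4, nil)
  end

  # The status event reports 1 when the connection is up and 0 when it is down.
  def handle_event([_, :connection, :status], %{value: value}, _metadata, _config) do
    status = if value == 1, do: "up", else: "down"
    Logger.info("WebSocket connection is #{status}")
  end

  # All other connection events are logged with their measurements and metadata.
  def handle_event([_, :connection, kind], measurements, metadata, _config) do
    Logger.info("WebSocket #{kind}: #{inspect(measurements)} #{inspect(metadata)}")
  end
end
```

Calling `MyApp.WebSocketTelemetry.attach()` once at application start is enough. Passing a captured module function rather than an anonymous function is the form the `:telemetry` documentation recommends for handlers.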
### Running Tests
To run tests:
```sh
mix test
```
Ensure your WebSocket endpoint is reachable and configured properly for end-to-end tests.
### Running Dialyzer
For static analysis with Dialyzer, make sure PLTs are built:
```sh
mix dialyzer --plt
mix dialyzer
```
## Contributing
Feel free to open issues or submit PRs to enhance the functionality of **OffBroadwayWebSocket**. Contributions are welcome!
## License
This project is licensed under the Apache License, Version 2.0.