[](https://hex.pm/packages/off_broadway_mqtt_connector)
[](https://coveralls.io/github/kbredemeier/off_broadway_mqtt?branch=master)
[](https://circleci.com/gh/kbredemeier/off_broadway_mqtt)
[](https://github.com/kbredemeier/off_broadway_mqtt/blob/master/LICENSE)
# OffBroadway.MQTT
A MQTT connector for [Broadway](https://github.com/plataformatec/broadway).
## Installation
Add `off_broadway_mqtt_connector` to the list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:off_broadway_mqtt, "~> 0.1.0", hex: "off_broadway_mqtt_connector"}
]
end
```
Notice that the package has a different name than the application!
## Usage
Add it as a producer to your `Broadway`:
```elixir
defmodule MyApp.NincompoopFilter do
use OffBroadway.MQTT
defmodule Nincompoop do
defexception ack: :ignore, message: nil
def message(e) do
"message is probably coming from a nincompoop: " <> e.message
end
end
def start_link(config, topic) do
Broadway.start_link(__MODULE__,
name: __MODULE__,
producers: [
default: [
module: {Producer, [config, subscription: {topic, 0}]},
stages: 1
]
],
processors: [default: [stages: 1]],
batchers: [
default: [stages: 1, batch_size: 10]
]
)
end
@impl true
def handle_message(_processor_name, message, _context) do
message
|> Message.update_data(&process_data/1)
rescue
e ->
Message.failed(message, e)
end
defp process_data(%OffBroadway.MQTT.Data{acc: msg} = data) do
msg
|> String.downcase()
|> String.contains?("great again")
|> case do
true -> raise Nincompoop, "contains \"great again\""
false -> data
end
end
@impl true
def handle_batch(_, messages, _batch_info, _context) do
# ...
messages
end
end
```
Start it by passing it at least a `t:OffBroadway.MQTT.Config.t/0` and the
`subscription` option with a topic to subscribe to and the desired QOS. For
further options refer to the `OffBroadway.MQTT.Producer` docs.
Default values for the configuration can be given via the mix config:
```elixir
use Mix.Config
config :off_broadway_mqtt,
client_id_prefix: "sensor_data_processor",
server_opts: [
host: "vernemq",
port: 8883,
transport: :ssl
],
handler: MyApp.BetterHandler
```
Then build a config and start your broadway:
```elixir
# Builds a configuration with all configured default values
config = OffBroadway.MQTT.Config.new_from_app_config()
# Builds a configuration from the defaults and overrides values
config =
OffBroadway.MQTT.Config.new(
client_id_prefix: "myapp",
server_opts: [
host: "mosquitto",
# port is converted into a `integer` if it is not already one
port: "1883",
transport: :tcp,
username: "admin",
password: "admin"
]
)
# Start broadway
MyApp.NincompoopFilter.start(config, "test_topic")
```
Keep in mind that any option with nil or the empty string will be removed from
the server options to prevent issues when configuring the application from
environment variables.
## Telemetry events
Telemetry events are disabled by default. To enable them the following must be
configured at compile time:
```elixir
use Mix.Config
config :off_broadway_mqtt,
telemetry_enabled: true,
```
A prefix can be configured that is used to prefix any telemetry event.
```elixir
use Mix.Config
config :off_broadway_mqtt,
telemetry_prefix: :my_broadway,
```
The prefix can also be passed at runtime with the
`t:OffBroadway.MQTT.Config.t/0` to the producer.
The following events are emitted:
- `my_broadway.client.connection.up.count`
- `my_broadway.client.connection.down.count`
- `my_broadway.client.subscription.up.count`
- `my_broadway.client.subscription.down.count`
- `my_broadway.client.messages.count`
- `my_broadway.queue.in.count`
- `my_broadway.queue.in.size`
- `my_broadway.queue.out.count`
- `my_broadway.queue.out.size`
- `my_broadway.acknowledger.success.count`
- `my_broadway.acknowledger.failed.count`
- `my_broadway.acknowledger.ignored.count`
- `my_broadway.acknowledger.requeued.count`
## Development
For development you probably need a running MQTT server in your develoment
environment. The provided `docker-compose.yml` starts a `vernemq` container for you.
Then run the following commands:
```
mix deps.get
mix test
```
The documentation can be generated with `mix docs`. Code coverage report can be
generated with `mix coveralls.html`.
## Attribution
This library heavily depends on Martin Gausby's [tortoise](https://github.com/gausby/tortoise)
for connecting to the MQTT broker.
## License
Copyright 2019 Kristopher Bredemeier
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
[http://www.apache.org/licenses/LICENSE-2.0](http://www.apache.org/licenses/LICENSE-2.0)
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.