defmodule Jackalope do
@moduledoc "README.md"
|> File.read!()
|> String.split("<!-- MDOC !-->")
|> Enum.fetch!(1)
use Supervisor
require Logger
@default_mqtt_server {
Tortoise311.Transport.Tcp,
host: "localhost", port: 1883
}
@default_max_work_list_size 100
@default_work_list_module Jackalope.TransientWorkList
@doc """
Start a Jackalope session
This will start a supervised group of processes; part of the group
will keep track of the topic filter subscription state, and hold a
list of yet to be published messages, as well as the requested
subscription changes; the other part of the process tree will keep
the MQTT connection specific parts, making sure we got a
connection. See the main documentation on the `Jackalope` module for
more information on the process architecture.
`Jackalope.start_link/1` takes a keyword list containing option
values, that configure the instance, as an argument. The available
options and their defaults are:
- `client_id` (default: "jackalope"), string that will be used as
the client_id of the MQTT connection; see `t Tortoise311.client_id`
for more information on valid client ids. Notice that the
client_id needs to be unique on the server, so two clients may not
have the same client_id.
- `user_name` (optional) specifies the MQTT Username.
- `password` (optional) specifies the MQTT Password. Google
cloud IOT requires a JWT for this field.
- `initial_topics` (optional) specifies a list of topic_filters
Jackalope should connect to when a connection has been
established. Notice that this list is also used should a reconnect
happen later in the life-cycle. Note that Jackalope does not support
dynamic subscriptions or unsubscribing. This is the only mechanism
for subscribing.
- `handler` (default: `Jackalope.Handler.Logger`) specifies the
module implementing the callbacks (implementing
`Jackalope.Handler` behaviour) to use. This module reacts to
the events Jackalope communicates about the connection
life-cycle, including receiving a message on a
subscribed topic filter. Read the documentation for
`Jackalope.Handler` for more information on the events and
callbacks.
- `server` (default: #{inspect(@default_mqtt_server)}) specifies the
connection type, and its options, to use when connecting to the
MQTT server. The default specification will attempt to connect to
a broker running on localhost:1883, on an insecure
connection. This value should only be used for testing and
development.
Server options for use with AWS IoT:
[
verify: :verify_peer,
host: mqtt_host(), # must return the full name, *without wild cards*, for e.g. "abcdefghijklmo-ats.iot.us-east-1.amazonaws.com"
port: mqtt_port(), # must return the correct port, e.g. 443
alpn_advertised_protocols: ["x-amzn-mqtt-ca"],
server_name_indication: to_charlist(mqtt_host()),
cert: cert, # the device's X509 certificate in DER format
key: key, # the device's private key in DER format
cacerts: [signer_cert] ++ aws_ca_certs(), # the device's signer cert, plus AWS IoT CA certs in DER format to be returned by aws_ca_certs()
versions: [:"tlsv1.2"],
customize_hostname_check: [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]
]
- `work_list_mod` names the module implementing the Jackalope WorkList protocol that will be used to manage
the publish commands sent to Tortoise by the Jackalope Session.
The module must also implement the function `@spec new(function(), function(), non_neg_integer(), Keyword.t()) :: any()`.
See Jackalope.TransientWorkList (the default) for examples.
- `max_work_list_size` (default: #{@default_max_work_list_size}) specifies the maximum
number of unexpired work orders Jackalope will retain in its work list
(the commands yet to be sent to the MQTT server). When the maximum is
reached, the oldest work order is dropped before adding a new work order.
- `last_will` (default: nil) specifies the last will message that
should get published on the MQTT broker if the connection is
closed or dropped unexpectedly. If we want to specify a last will
topic we should define a keyword list containing the following:
- `topic` (Required) the topic to post the last will message to;
this should be specified as a string and it should be a valid
MQTT topic; consult `t Tortoise311.topic` for more info on valid
MQTT topics.
- `payload` (default: nil) the payload of the last will message;
- `qos` (default: 0) either 0 or 1, denoting the quality of
service the last will message should get published with; note
that QoS=2 is not supported by AWS IoT.
- `backoff` (default: [min_interval: 1_000, max_interval: 30_000])
gives the bounds of an exponential backoff algorithm used when retrying
from failed connections.
"""
@spec start_link(keyword()) :: Supervisor.on_start()
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts)
end
@impl Supervisor
def init(opts) do
client_id = Keyword.get(opts, :client_id, "jackalope")
jackalope_handler = Keyword.get(opts, :handler, Jackalope.Handler.Logger)
max_work_list_size = Keyword.get(opts, :max_work_list_size, @default_max_work_list_size)
work_list_mod = Keyword.get(opts, :work_list_mod, @default_work_list_module)
children = [
{Jackalope.Session,
[
handler: jackalope_handler,
max_work_list_size: max_work_list_size,
work_list_mod: work_list_mod
]},
{Jackalope.Supervisor,
[
handler: jackalope_handler,
client_id: client_id,
connection_options: connection_options(opts),
last_will: Keyword.get(opts, :last_will),
work_list_mod: work_list_mod
]}
]
# Supervision strategy is rest for one, as a crash in Jackalope
# would result in inconsistent state in Jackalope; we would not be
# able to know about the subscription state; so we teardown the
# tortoise311 if Jackalope crash. Should the Jackalope.Supervisor
# crash, Jackalope should resubscribe to the topic filters it
# currently know about, so that should be okay.
Supervisor.init(children, strategy: :rest_for_one)
end
@doc """
Request the MQTT client to reconnect to the broker
This can be useful on devices that has multiple network
interfaces.
"""
@spec reconnect() :: :ok
defdelegate reconnect(), to: Jackalope.Session
@doc """
Publish a message to the MQTT broker
The `payload` will get published on `topic`. `Jackalope` will keep
the message in a queue until we got a connection, at which point it
will dispatch the publish. This of course present us with a problem:
what if we place a publish request to "unlock the front door" while
the client is offline? We don't want to receive a message that the
front door has been unlocked two hours later when the MQTT client
reconnect; To solve that problem we have a `ttl` option we can
specify on the publish.
```elixir
Jackalope.publish("doors/front_door", %{action: "unlock"}, qos: 1, ttl: 5_000)
```
The available publish options are:
- `qos` (default `1`) sets the quality of service of the message
delivery; Notice that only quality of service 0 an 1 are
supported by AWS IoT.
- `retain` (default `false`) sets whether the broker should retain the message.
Note that AWS IoT does not support this feature.
- `ttl` (default `3_600_000`) sets how long (in msecs) publishing the message will be
retried until it has expired.
"""
@spec publish(String.t(), any, options) ::
:ok | {:error, :invalid_qos}
when options: [
{:qos, 0..2} | {:retain, boolean} | {:ttl, non_neg_integer}
]
defdelegate publish(topic, payload, opts \\ []), to: Jackalope.Session
@doc """
Get the current MQTT connection status
"""
@spec connection_status() :: :offline | :online
defdelegate connection_status(), to: Jackalope.Session
# TODO Get rid of this stuff
defp connection_options(opts) do
server =
Keyword.get(opts, :server, @default_mqtt_server)
|> do_configure_server()
# Default backoff options is 1 sec to 30 secs, doubling each time.
backoff_opts = Keyword.get(opts, :backoff) || [min_interval: 1_000, max_interval: 30_000]
Logger.info("[Jackalope] Connecting with backoff options #{inspect(backoff_opts)}")
initial_topics = Keyword.get(opts, :initial_topics)
subscriptions =
for topic_filter <- List.wrap(initial_topics),
do: {topic_filter, 1}
[
server: server,
backoff: backoff_opts,
subscriptions: subscriptions
]
|> maybe_add_user_name_password(opts)
end
defp maybe_add_user_name_password(connection_options, user_opts) do
user_name = Keyword.get(user_opts, :user_name)
password = Keyword.get(user_opts, :password)
if Enum.any?([user_name, password], &is_nil/1) do
connection_options
else
connection_options
|> Keyword.put(:user_name, user_name)
|> Keyword.put(:password, password)
end
end
# Pass normal Tortoise311 transports through as is; assume that the
# configuration is correct!
defp do_configure_server({Tortoise311.Transport.Tcp, _opts} = keep), do: keep
defp do_configure_server({Tortoise311.Transport.SSL, _opts} = keep), do: keep
# Attempt to create setup a connection that works with AWS IoT
defp do_configure_server(aws_iot_opts) when is_list(aws_iot_opts) do
# TODO improve the user experience when working with AWS IoT and
# then remove this raise
raise ArgumentError, "Please specify a Tortoise311 transport for the server"
end
end