defmodule AMQP.Application do
  @moduledoc """
  Provides access to configured connections and channels.

  When the application starts, every connection and channel configured under
  the `:amqp` application environment is started as a child of a
  `:one_for_one` supervisor registered as `AMQP.Application`.

  See `get_connection/1` and `get_channel/1` for the configuration format.
  """

  use Application
  require Logger

  @impl true
  def start(_type, _args) do
    # Connections are listed before channels so they are started first.
    children = load_connections() ++ load_channels()

    opts = [
      strategy: :one_for_one,
      name: __MODULE__,
      # The restart budget grows with the number of configured children:
      # twice the child count within a one second window.
      max_restarts: length(children) * 2,
      max_seconds: 1
    ]

    Supervisor.start_link(children, opts)
  end

  # Child specs for everything configured under `:connection`/`:connections`.
  defp load_connections do
    load_children(:connection, :connections, AMQP.Application.Connection)
  end

  # Child specs for everything configured under `:channel`/`:channels`.
  defp load_channels do
    load_children(:channel, :channels, AMQP.Application.Channel)
  end

  # Builds one child spec per entry configured for `server`.
  #
  # `plural_key` holds a keyword list of `name: opts` entries. When
  # `single_key` is set, its value is appended to that list under the name
  # `:default`. Each entry's options get `proc_name: name` appended and the
  # child id comes from `server.get_server_name/1`.
  defp load_children(single_key, plural_key, server) do
    single = Application.get_env(:amqp, single_key)
    named = Application.get_env(:amqp, plural_key, [])
    entries = if single, do: named ++ [default: single], else: named

    Enum.map(entries, fn {name, opts} ->
      arg = opts ++ [proc_name: name]
      Supervisor.child_spec({server, arg}, id: server.get_server_name(name))
    end)
  end

  @doc """
  Provides an easy way to access an AMQP connection.

  The connection will be monitored by AMQP's GenServer and it will
  automatically try to reconnect when the connection is gone.

  ## Usage

  When you want to have a single connection in your app:

      config :amqp, connection: [
        url: "amqp://guest:guest@localhost:5672"
      ]

  You can also use any options available on `AMQP.Connection.open/2`:

      config :amqp, connection: [
        host: "localhost",
        port: 5672,
        username: "guest",
        password: "guest"
      ]

  Then the connection will be opened at the start of the application and you
  can access it via this function.

      iex> {:ok, conn} = AMQP.Application.get_connection()

  By default, it tries to connect to your local RabbitMQ. You can simply pass
  the empty keyword list too:

      config :amqp, connection: [] # == [url: "amqp://0.0.0.0"]

  You can set up multiple connections with the `:connections` key:

      config :amqp, connections: [
        business_report: [
          url: "amqp://host1"
        ],
        analytics: [
          url: "amqp://host2"
        ]
      ]

  Then you can access each connection with its name.

      iex> {:ok, conn1} = AMQP.Application.get_connection(:business_report)
      iex> {:ok, conn2} = AMQP.Application.get_connection(:analytics)

  The default name is `:default`, so these two configurations are equivalent:

      config :amqp, connection: []
      config :amqp, connections: [default: []]

  ## Configuration options

    * `:retry_interval` - The retry interval in milliseconds when the
      connection fails to open (default `5000`)
    * `:url` - AMQP URI for the connection

  See also `AMQP.Connection.open/2` for all available options.
  """
  @spec get_connection(binary | atom) :: {:ok, AMQP.Connection.t()} | {:error, any}
  def get_connection(name \\ :default) do
    AMQP.Application.Connection.get_connection(name)
  end

  @doc """
  Provides an easy way to access an AMQP channel.

  AMQP.Application provides a wrapper on top of `AMQP.Channel`.

  The channel will be monitored by AMQP's GenServer and it will automatically
  try to reopen when the channel is gone.

  ## Usage

  When you want to have a single channel in your app:

      config :amqp,
        connection: [url: "amqp://guest:guest@localhost:5672"],
        channel: []

  Then the channel will be opened at the start of the application and you can
  access it via this function.

      iex> {:ok, chan} = AMQP.Application.get_channel()

  You can also set up multiple channels with the `:channels` key:

      config :amqp,
        connections: [
          business_report: [url: "amqp://host1"],
          analytics: [url: "amqp://host2"]
        ],
        channels: [
          business_report: [connection: :business_report],
          analytics: [connection: :analytics]
        ]

  Then you can access each channel with its name.

      iex> {:ok, chan1} = AMQP.Application.get_channel(:business_report)
      iex> {:ok, chan2} = AMQP.Application.get_channel(:analytics)

  You can also have multiple channels for a single connection.

      config :amqp,
        connection: [],
        channels: [
          consumer: [],
          producer: []
        ]

  ## Configuration options

    * `:connection` - The connection name configured with `connection` or
      `connections` (default `:default`)
    * `:retry_interval` - The retry interval in milliseconds when the channel
      fails to open (default `5000`)

  ## Caveat

  Although AMQP will reopen the named channel automatically when it is closed
  for some reason, your application still needs to monitor the channel for a
  consumer process.

  Be aware that the reopened channel doesn't automatically recover the
  subscription of your consumer.

  Here is a sample GenServer module that monitors the channel and
  re-subscribes when the channel goes down.

      defmodule AppConsumer do
        use GenServer

        @channel :default
        @queue "myqueue"

        # ...

        def handle_info(:subscribe, state) do
          case subscribe() do
            {:ok, chan} ->
              {:noreply, Map.put(state, :channel, chan)}

            _ ->
              {:noreply, state}
          end
        end

        def handle_info({:DOWN, _, :process, pid, _reason}, %{channel: %{pid: pid}} = state) do
          send(self(), :subscribe)
          {:noreply, Map.put(state, :channel, nil)}
        end

        defp subscribe() do
          case AMQP.Application.get_channel(@channel) do
            {:ok, chan} ->
              Process.monitor(chan.pid)
              {:ok, _consumer_tag} = AMQP.Basic.consume(chan, @queue)
              {:ok, chan}

            _error ->
              Process.send_after(self(), :subscribe, 1000)
              {:error, :retrying}
          end
        end
      end
  """
  @spec get_channel(binary | atom) :: {:ok, AMQP.Channel.t()} | {:error, any}
  def get_channel(name \\ :default) do
    AMQP.Application.Channel.get_channel(name)
  end
end