defmodule Rabbit.Connection do
@moduledoc """
A RabbitMQ connection pool process.

Connections form the basis of any application that is working with RabbitMQ. A
connection module is needed by all the other modules included with Rabbit. They
wrap around the standard `AMQP.Connection` and provide the following
benefits:

* Durability during connection failures through use of exponential backoff.
* Increased throughput via connection pooling.
* Subscriptions that assist connection status monitoring.
* Easy runtime setup through an `c:init/2` callback.

## Example

    # Connection module
    defmodule MyConnection do
      use Rabbit.Connection

      def start_link(opts \\\\ []) do
        Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
      end

      # Callbacks

      @impl Rabbit.Connection
      def init(_type, opts) do
        # Perform any runtime configuration
        {:ok, opts}
      end
    end

    # Start the connection
    MyConnection.start_link()

    # Subscribe to the connection
    Rabbit.Connection.subscribe(MyConnection)

    receive do
      {:connected, connection} -> "hello"
    end

    # Stop the connection
    Rabbit.Connection.stop(MyConnection)

    receive do
      {:disconnected, reason} -> "bye"
    end
"""
alias Rabbit.Connection
@typedoc """
A reference to a connection pool (or to a single pooled connection process).

Anything accepted as a `t:GenServer.server/0` is valid: a pid, a locally
registered name, `{name, node}`, `{:global, term}` or `{:via, module, term}`.
"""
@type t :: GenServer.server()

@typedoc """
A single option accepted by `start_link/3` and returned from `c:init/2`.
"""
@type option ::
        {:uri, String.t()}
        | {:pool_size, non_neg_integer()}
        | {:max_overflow, non_neg_integer()}
        | {:strategy, :lifo | :fifo}
        | {:name, String.t()}
        | {:username, String.t()}
        | {:password, String.t()}
        | {:virtual_host, String.t()}
        | {:host, String.t()}
        | {:port, integer()}
        | {:channel_max, integer()}
        | {:frame_max, integer()}
        | {:heartbeat, integer()}
        | {:connection_timeout, integer()}
        | {:ssl_options, atom() | Keyword.t()}
        | {:client_properties, list()}
        | {:socket_options, Keyword.t()}
        | {:retry_backoff, non_neg_integer()}
        | {:retry_max_delay, non_neg_integer()}

@typedoc """
A list of `t:option/0` values.
"""
@type options :: [option()]
@doc """
A callback executed by each component of the connection pool.

Two versions of the callback must be created: one for the pool, and one
for the connections. The first argument differentiates the callback.

    # Initialize the pool
    def init(:connection_pool, opts) do
      {:ok, opts}
    end

    # Initialize a single connection
    def init(:connection, opts) do
      {:ok, opts}
    end

Returning `{:ok, opts}` - where `opts` is a keyword list of `t:option/0` - will
cause `start_link/3` to return `{:ok, pid}` and the process to enter its loop.

Returning `:ignore` will cause `start_link/3` to return `:ignore` and the process
will exit normally without entering the loop.
"""
@callback init(:connection_pool | :connection, options()) :: {:ok, options()} | :ignore
################################
# Public API
################################
@doc """
Starts a connection pool process.

The work is delegated to `Rabbit.Connection.Pool.start_link/3`, receiving
`module`, `opts` and `server_opts` unchanged.

## Options

  * `:uri` - The connection URI. This takes priority over other connection attributes.
  * `:pool_size` - The number of processes to create for connections - defaults to `1`.
    Each process consumes a RabbitMQ connection.
  * `:max_overflow` - Maximum number of temporary workers created when the pool
    is empty - defaults to `0`.
  * `:strategy` - Determines whether checked in workers should be placed first
    or last in the line of available workers - defaults to `:fifo`.
  * `:name` - A name that will be displayed in the management UI.
  * `:username` - The name of a user registered with the broker - defaults to `"guest"`.
  * `:password` - The password of user - defaults to `"guest"`.
  * `:virtual_host` - The name of a virtual host in the broker - defaults to `"/"`.
  * `:host` - The hostname of the broker - defaults to `"localhost"`.
  * `:port` - The port the broker is listening on - defaults to `5672`.
  * `:channel_max` - The channel_max handshake parameter - defaults to `0`.
  * `:frame_max` - The frame_max handshake parameter - defaults to `0`.
  * `:heartbeat` - The heartbeat interval in seconds - defaults to `10`.
  * `:connection_timeout` - The connection timeout in milliseconds - defaults to `50000`.
  * `:retry_backoff` - The amount of time in milliseconds to add between connection retry
    attempts - defaults to `1_000`.
  * `:retry_max_delay` - The max amount of time in milliseconds to be used between
    connection attempts - defaults to `5_000`.
  * `:ssl_options` - Enable SSL by setting the location to cert files - defaults to `:none`.
  * `:client_properties` - A list of extra client properties to be sent to the server -
    defaults to `[]`.
  * `:socket_options` - Extra socket options. These are appended to the default options.
    See http://www.erlang.org/doc/man/inet.html#setopts-2 and
    http://www.erlang.org/doc/man/gen_tcp.html#connect-4
    for descriptions of the available options.

## Server Options

You can also provide server options - which are simply the same ones available
for `t:GenServer.options/0`.
"""
@spec start_link(module(), options(), GenServer.options()) :: GenServer.on_start()
def start_link(module, opts \\ [], server_opts \\ []) do
  Rabbit.Connection.Pool.start_link(module, opts, server_opts)
end
@doc """
Stops the connection pool.

Every worker reported by the pool is first asked to disconnect; the pool
itself is stopped afterwards.
"""
@spec stop(Rabbit.Connection.t()) :: :ok
def stop(connection_pool) do
  pool_children = GenServer.call(connection_pool, :get_all_workers)

  Enum.each(pool_children, fn
    # Each disconnect must be acknowledged with `:ok`, otherwise we crash.
    {_id, worker, _type, _modules} -> :ok = GenServer.call(worker, :disconnect)
    # Entries of any other shape are skipped.
    _other -> :ok
  end)

  :poolboy.stop(connection_pool)
end
@doc """
Fetches a raw `AMQP.Connection` struct from the pool.

A worker is checked out of the pool, asked for its connection and checked
back in.

`timeout` (milliseconds or `:infinity`, defaults to `5_000`) is applied both
to waiting for a free worker and to the call made to that worker.
"""
@spec fetch(Rabbit.Connection.t(), timeout()) ::
        {:ok, AMQP.Connection.t()} | {:error, :not_connected}
def fetch(connection_pool, timeout \\ 5_000) do
  # Hand `timeout` to the checkout too: `:poolboy.transaction/2` would wait
  # poolboy's built-in default for a worker regardless of the caller's value.
  :poolboy.transaction(connection_pool, &GenServer.call(&1, :fetch, timeout), timeout)
end
@doc """
Checks whether a connection is alive within the pool.

A worker is checked out of the pool, queried and checked back in.

`timeout` (milliseconds or `:infinity`, defaults to `5_000`) is applied both
to waiting for a free worker and to the call made to that worker.
"""
@spec alive?(Rabbit.Connection.t(), timeout()) :: boolean()
def alive?(connection_pool, timeout \\ 5_000) do
  # Hand `timeout` to the checkout too: `:poolboy.transaction/2` would wait
  # poolboy's built-in default for a worker regardless of the caller's value.
  :poolboy.transaction(connection_pool, &GenServer.call(&1, :alive?, timeout), timeout)
end
@doc """
Runs the given function inside a transaction.

A worker is checked out of the pool and passed to `fun`, which must
therefore accept a connection pid. The value returned by `fun` is the
result of the transaction.
"""
@spec transaction(Rabbit.Connection.t(), (Rabbit.Connection.t() -> any())) :: any()
def transaction(connection_pool, fun) do
  :poolboy.transaction(connection_pool, fn worker -> fun.(worker) end)
end
@doc """
Subscribes a process to a connection in the pool.

When `subscriber` is `nil` (the default) the calling process is subscribed.

`timeout` (milliseconds or `:infinity`, defaults to `5_000`) is applied both
to waiting for a free worker and to the call made to that worker.

A subscribed process can receive the following messages:

`{:connected, connection}` - where connection is an `AMQP.Connection` struct.

During the subscription process, if the connection is alive, this message will
immediately be sent. If the connection goes down, and manages to reconnect, this
message will be sent.

`{:disconnected, reason}` - where reason can be any value.

If the connection goes down, all subscribing processes are sent this message.
The connection process will then go through an exponential backoff period until
connection is achieved again.
"""
@spec subscribe(Rabbit.Connection.t(), pid() | nil, timeout()) :: :ok
def subscribe(connection_pool, subscriber \\ nil, timeout \\ 5_000) do
  subscriber = subscriber || self()

  # Hand `timeout` to the checkout too: `:poolboy.transaction/2` would wait
  # poolboy's built-in default for a worker regardless of the caller's value.
  :poolboy.transaction(
    connection_pool,
    &GenServer.call(&1, {:subscribe, subscriber}, timeout),
    timeout
  )
end
# Injected by `use Rabbit.Connection, opts`.
#
# Declares the `Rabbit.Connection` behaviour in the calling module and defines
# an overridable `child_spec/1` whose defaults (`id` and `start`) are merged
# with `opts` through `Supervisor.child_spec/2`.
defmacro __using__(opts) do
  # `bind_quoted` evaluates `opts` in the caller's module body, so the
  # `unquote(Macro.escape(opts))` fragment below embeds the option *values*.
  # Without it the macro would escape the AST of `opts`, and any non-literal
  # option (e.g. `restart: @restart` or `shutdown: 2 * 1_000`) would reach
  # `Supervisor.child_spec/2` as a quoted expression instead of its value.
  quote location: :keep, bind_quoted: [opts: opts] do
    @behaviour Rabbit.Connection

    # Only attach the default doc when the caller has not set a pending @doc.
    if Module.get_attribute(__MODULE__, :doc) == nil do
      @doc """
      Returns a specification to start this connection under a supervisor.
      See `Supervisor`.
      """
    end

    def child_spec(args) do
      default = %{
        id: __MODULE__,
        start: {__MODULE__, :start_link, [args]}
      }

      Supervisor.child_spec(default, unquote(Macro.escape(opts)))
    end

    defoverridable(child_spec: 1)
  end
end
end