defmodule Amqpx.Gen.Producer do
@moduledoc """
Generic implementation of amqp producer
"""
require Logger
use GenServer
alias Amqpx.{Basic, Channel, Confirm, Helper}
@type state() :: %__MODULE__{}
defstruct [
:channel,
:publisher_confirms,
publish_timeout: 1_000,
backoff: 5_000,
exchanges: [],
connection_name: Amqpx.Gen.ConnectionManager
]
# Public API
def start_link(opts) do
name = Map.get(opts, :name, __MODULE__)
GenServer.start_link(__MODULE__, opts, name: name)
end
def init(opts) do
state = struct(__MODULE__, opts)
Process.send(self(), :setup, [])
{:ok, state}
end
@spec publish(
exchange_name :: String.t(),
routing_key :: String.t(),
payload :: String.t(),
options :: Keyword.t()
) ::
:ok | :error
def publish(exchange_name, routing_key, payload, options \\ []) do
publish_by(__MODULE__, exchange_name, routing_key, payload, options)
end
@spec publish_by(
producer_name :: GenServer.name(),
exchange_name :: String.t(),
routing_key :: String.t(),
payload :: String.t(),
options :: Keyword.t()
) ::
:ok | :error
def publish_by(producer_name, exchange_name, routing_key, payload, options \\ []) do
case GenServer.call(producer_name, {:publish, {exchange_name, routing_key, payload, options}}) do
:ok ->
:ok
reason ->
Logger.error("Error during publish: #{inspect(reason)}")
:error
end
end
# Callbacks
def handle_info(
:setup,
%{
backoff: backoff,
publisher_confirms: publisher_confirms,
exchanges: exchanges,
connection_name: connection_name
} = state
) do
case GenServer.call(connection_name, :get_connection) do
nil ->
:timer.sleep(backoff)
{:stop, :not_ready, state}
connection ->
{:ok, channel} = Channel.open(connection, self())
Process.monitor(channel.pid)
state = %{state | channel: channel}
declare_exchanges(exchanges, channel)
if publisher_confirms do
Confirm.select(channel)
end
{:noreply, state}
end
end
def handle_info({_ref, {:ok, _port, _pid}}, state) do
Logger.debug("Amqpx socket probably restarted by underlying library")
{:noreply, state}
end
# Error handling
def handle_info({_ref, {:error, :no_socket, _pid}}, state) do
Logger.debug("Amqpx socket not found")
{:noreply, state}
end
def handle_info({:DOWN, _, :process, _pid, reason}, state) do
Logger.info("Monitored channel process crashed: #{inspect(reason)}. Restarting...")
{:stop, :channel_exited, state}
end
def handle_info(message, state) do
Logger.warn("Unknown message received #{inspect(message)}")
{:noreply, state}
end
def terminate(_, %__MODULE__{channel: nil}), do: nil
def terminate(_, %__MODULE__{channel: channel}) do
if Process.alive?(channel.pid) do
Channel.close(channel)
end
end
def handle_call(_msg, _from, %{channel: nil}) do
{:reply, {:error, :not_connected}}
end
def handle_call(
{:publish, {exchange, routing_key, payload, options}},
_from,
%{
channel: channel,
publisher_confirms: publisher_confirms,
publish_timeout: publish_timeout
} = state
) do
with :ok <-
Basic.publish(
channel,
exchange,
routing_key,
payload,
Keyword.merge([persistent: true], options)
),
{:confirm, true} <-
{:confirm, confirm_delivery(publisher_confirms, publish_timeout, channel)} do
{:reply, :ok, state}
else
{:error, reason} ->
Logger.error("cannot publish message to broker: #{inspect(reason)}")
{:stop, reason, {:error, reason}, state}
{:confirm, :timeout} ->
Logger.error("cannot publish message to broker: publisher timeout")
{:stop, "publisher timeout", {:error, :timeout}, state}
{:confirm, false} ->
Logger.error("cannot publish message to broker: broker nack")
{:stop, "publisher error", {:error, :nack}, state}
end
end
# Private functions
@spec confirm_delivery(boolean(), integer(), Channel.t()) :: boolean() | :timeout
defp confirm_delivery(false, _, _), do: true
defp confirm_delivery(true, timeout, channel) do
Confirm.wait_for_confirms(channel, timeout)
end
defp declare_exchanges(exchanges, channel) do
Enum.each(exchanges, &Helper.setup_exchange(channel, &1))
end
end