defmodule StepFlow.Amqp.CommonConsumer do
@moduledoc """
Definition of a Common Consumer of RabbitMQ queue.
To implement a consumer,
```elixir
defmodule MyModule do
use StepFlow.Amqp.CommonConsumer, %{
queue: "name_of_the_rabbit_mq_queue",
exchange "name_of_exchange",
consumer: &MyModule.consume/4
}
def consume(channel, tag, redelivered, payload) do
...
Basic.ack(channel, tag)
end
end
```
"""
require Logger
alias StepFlow.Amqp.CommonConsumer
alias StepFlow.Amqp.Helpers
@default_channel_status_interval 10_000
@doc false
defmacro __using__(opts) do
quote do
use GenServer
use AMQP
alias StepFlow.Amqp.CommonEmitter
alias StepFlow.Amqp.Helpers
def child_spec(_) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
type: :worker
}
end
@doc false
def start_link do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end
@impl true
@doc false
def init(:ok) do
result = rabbitmq_connect()
Process.send_after(self(), :check, CommonConsumer.get_channel_check_interval())
result
end
def disconnect do
GenServer.call(__MODULE__, :disconnect)
end
@impl true
def handle_call(:disconnect, _from, state) do
state = CommonConsumer.rabbitmq_disconnect(state)
{:reply, :ok, state}
end
@impl true
# Confirmation sent by broker after registering this process as consumer
def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, state) do
{:noreply, state}
end
@impl true
# Sent by broker when consumer is unexpectedly cancelled (such as after queue deletion)
def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, state) do
{:stop, :normal, state}
end
@impl true
# Confirmation sent by broker to consumer process after a Basic.cancel
def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, state) do
{:noreply, state}
end
@impl true
def handle_info(
{:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered} = headers},
state
) do
queue = unquote(opts).queue
Logger.info("#{__MODULE__}: receive message on queue: #{queue}")
max_retry_to_timeout =
StepFlow.Configuration.get_var_value(StepFlow.Amqp, :max_retry_to_timeout, 10)
Logger.debug("#{__MODULE__} #{inspect(headers)}")
max_retry_reached =
with headers when headers != :undefined <- Map.get(headers, :headers),
{"x-death", :array, death} <- List.keyfind(headers, "x-death", 0),
{:table, table} <- List.first(death),
{"count", :long, count} <- List.keyfind(table, "count", 0) do
count > max_retry_to_timeout
else
_ -> false
end
if max_retry_reached do
Logger.info("#{__MODULE__}: timeout message sent to queue: #{queue}_timeout")
CommonEmitter.publish(queue <> "_timeout", payload, [], "job_response")
AMQP.Basic.ack(state.channel, tag)
else
# Check if payload is json
case payload |> Jason.decode() do
{:ok, data} ->
unquote(opts).consumer.(state.channel, tag, redelivered, data)
{:error, _} ->
Logger.error("#{__MODULE__}: Payload is not a json: #{payload}")
AMQP.Basic.reject(state.channel, tag, requeue: false)
end
end
{:noreply, state}
end
@impl true
def handle_info(:check, state) do
Logger.debug(
"[#{__MODULE__}] Check AMQP channel status. Current channel: #{inspect(state.channel)}"
)
{:ok, state} =
if Process.alive?(state.channel.pid) do
{:ok, state}
else
state = CommonConsumer.rabbitmq_disconnect(state)
rabbitmq_connect()
end
Process.send_after(self(), :check, CommonConsumer.get_channel_check_interval())
{:noreply, state}
end
@impl true
def handle_info({:DOWN, _, :process, pid, reason}, state) do
{:ok, state} =
if state.auto_restart do
rabbitmq_connect()
else
{:ok, state}
end
{:noreply, state}
end
defp rabbitmq_connect do
url = Helpers.get_amqp_connection_url()
options = Helpers.get_amqp_connection_options()
queue = unquote(opts).queue
exchange_name = unquote(opts).exchange
prefetch_count = Map.get(unquote(opts), :prefetch_count)
CommonConsumer.rabbitmq_connect(url, options, queue, exchange_name, prefetch_count)
end
@impl true
def terminate(_reason, state) do
CommonConsumer.rabbitmq_disconnect(state)
end
end
end
def rabbitmq_connect(url, options, queue, exchange, prefetch_count) do
case AMQP.Connection.open(url, options) do
{:ok, connection} ->
init_amqp_connection(connection, queue, exchange, prefetch_count)
{:error, message} ->
Logger.error("#{__MODULE__}: unable to connect to: #{url}, reason: #{inspect(message)}")
# Reconnection loop
:timer.sleep(10_000)
rabbitmq_connect(url, options, queue, exchange, prefetch_count)
end
end
def init_amqp_connection(connection, queue, exchange_name, prefetch_count \\ nil) do
Process.monitor(connection.pid)
{:ok, channel} = AMQP.Channel.open(connection)
Logger.info("[#{__MODULE__}] init_amqp_connection: opened channel=#{inspect(channel)}")
if prefetch_count != nil do
:ok = AMQP.Basic.qos(channel, prefetch_count: prefetch_count)
end
CommonConsumer.create_queues(channel, queue)
Logger.info("#{__MODULE__}: bind #{queue}")
AMQP.Queue.bind(channel, queue, exchange_name, routing_key: queue)
Logger.info("#{__MODULE__}: connected to queue #{queue}")
{:ok, _consumer_tag} = AMQP.Basic.consume(channel, queue)
{:ok, %{channel: channel, connection: connection, auto_restart: true}}
end
def rabbitmq_disconnect(state) do
Logger.info("#{__MODULE__}: Closing AMQP connection...")
AMQP.Connection.close(state.connection)
state
|> Map.replace(:auto_restart, false)
end
def create_queues(channel, queue) do
queue_type =
case Helpers.get_amqp_server_configuration() do
"cluster" -> "quorum"
_ -> "classic"
end
AMQP.Queue.declare(channel, "job_response_not_found",
durable: true,
arguments: [{"x-queue-type", :longstr, queue_type}]
)
AMQP.Queue.declare(channel, queue <> "_timeout",
durable: true,
arguments: [{"x-queue-type", :longstr, queue_type}]
)
AMQP.Exchange.topic(channel, "job_response",
durable: true,
arguments: [{"alternate-exchange", :longstr, "job_response_not_found"}]
)
AMQP.Exchange.topic(channel, "worker_response",
durable: true,
arguments: [{"alternate-exchange", :longstr, "worker_response_not_found"}]
)
AMQP.Queue.declare(channel, "direct_messaging_not_found",
durable: true,
arguments: [{"x-queue-type", :longstr, queue_type}]
)
AMQP.Queue.declare(channel, queue <> "_timeout",
durable: true,
arguments: [{"x-queue-type", :longstr, queue_type}]
)
AMQP.Exchange.declare(channel, "direct_messaging", :headers,
durable: true,
arguments: [{"alternate-exchange", :longstr, "direct_messaging_not_found"}]
)
AMQP.Exchange.fanout(channel, "job_response_delayed",
durable: true,
arguments: [{"x-queue-type", :longstr, queue_type}]
)
{:ok, _job_response_delayed_queue} =
AMQP.Queue.declare(channel, "job_response_delayed",
durable: true,
arguments: [
{"x-message-ttl", :short, 5000},
{"x-dead-letter-exchange", :longstr, ""},
{"x-queue-type", :longstr, queue_type}
]
)
AMQP.Queue.bind(channel, "job_response_delayed", "job_response_delayed", routing_key: "*")
AMQP.Queue.declare(channel, queue,
durable: true,
arguments: [
{"x-dead-letter-exchange", :longstr, "job_response_delayed"},
{"x-dead-letter-routing-key", :longstr, queue},
{"x-queue-type", :longstr, queue_type}
]
)
end
def get_channel_check_interval do
Application.get_env(:step_flow, StepFlow.Workers,
amqp_channel_status_interval: @default_channel_status_interval
)
|> Keyword.get(:amqp_channel_status_interval, @default_channel_status_interval)
end
end