lib/amqp/gen/consumer.ex

defmodule Amqpx.Gen.Consumer do
  @moduledoc """
  Generic implementation of amqp consumer
  """
  require Logger
  use GenServer
  import Amqpx.Core
  alias Amqpx.{Basic, Channel}

  # State held by each consumer process; built in `init/1` from the options map.
  defstruct [
    # Channel opened while handling `:setup`. It is nil before that and is reset to nil
    # when the monitored channel process goes down.
    :channel,
    # Module whose `setup/1`, `handle_message/3` and (optionally)
    # `handle_message_rejection/2` functions are invoked by this process.
    :handler_module,
    # Value returned by the handler's `setup/1`, then replaced by whatever each
    # successful `handle_message/3` call returns.
    :handler_state,
    # Passed to `Basic.qos/2` as `:prefetch_count` right after the channel is opened.
    prefetch_count: 50,
    # Argument given to `:timer.sleep/1` (milliseconds) before stopping when no
    # connection is available, and before rejecting a message whose handler raised.
    backoff: 5_000,
    # GenServer that answers the `:get_connection` call made while handling `:setup`.
    connection_name: Amqpx.Gen.ConnectionManager,
    # A failed message is rejected with `requeue: true` only when this flag is set and
    # the delivery was not already a redelivery.
    requeue_on_reject: true
  ]

  @type state() :: %__MODULE__{}

  # Called with the freshly opened channel; the `{:ok, value}` result becomes the
  # initial handler state. Any other result fails the match in the `:setup` handler.
  @callback setup(Channel.t()) :: {:ok, map()} | {:error, any()}
  # Called as `handle_message(payload, meta, handler_state)`. The message is acked only
  # after an `{:ok, new_handler_state}` result; any other result fails a match, which is
  # rescued and handled like a raised exception (delayed reject).
  @callback handle_message(any(), map(), map()) :: {:ok, map()} | {:error, any()}
  # Optional hook, called with the payload and the rescued exception when a failed
  # message is about to be rejected without being requeued.
  @callback handle_message_rejection(message :: any(), error :: any()) :: :ok | {:error, any()}
  @optional_callbacks handle_message_rejection: 2

  # Keys that `start_link/1` strips from its options map and forwards to
  # `GenServer.start_link/3` instead of `init/1`.
  @gen_server_opts [:name, :timeout, :debug, :spawn_opt, :hibernate_after]

  @doc """
  Starts a consumer process linked to the caller.

  `opts` is a map. The GenServer-level keys (`:name`, `:timeout`, `:debug`,
  `:spawn_opt`, `:hibernate_after`) are forwarded to `GenServer.start_link/3`;
  every remaining key is handed to `init/1`, where it is used to build the
  consumer state struct.

  Returns whatever `GenServer.start_link/3` returns.
  """
  @spec start_link(opts :: map()) :: GenServer.on_start()
  def start_link(opts) do
    {gen_server_opts, opts} = Map.split(opts, @gen_server_opts)

    GenServer.start_link(__MODULE__, opts, Map.to_list(gen_server_opts))
  end

  # GenServer callback. Enables exit trapping, builds the state struct from `opts`
  # and posts `:setup` to itself so the broker setup happens in `handle_info/2`
  # rather than inside `init/1`.
  def init(opts) do
    Process.flag(:trap_exit, true)

    initial_state = struct(__MODULE__, opts)
    send(self(), :setup)

    {:ok, initial_state}
  end

  # Deferred initialisation, triggered by the `:setup` message sent from `init/1`.
  #
  # Asks the connection manager for a connection. When none is available the process
  # sleeps for `backoff` and stops with `:not_ready`. Otherwise it opens and monitors
  # a channel, applies the prefetch limit and lets the handler module run its own
  # `setup/1` on that channel.
  def handle_info(
        :setup,
        %{
          backoff: retry_delay,
          prefetch_count: prefetch,
          handler_module: handler,
          connection_name: manager
        } = state
      ) do
    connection = GenServer.call(manager, :get_connection)

    if is_nil(connection) do
      :timer.sleep(retry_delay)
      {:stop, :not_ready, state}
    else
      {:ok, chan} = Channel.open(connection, self())
      # The matching :DOWN message is handled by a later handle_info/2 clause.
      Process.monitor(chan.pid)

      Basic.qos(chan, prefetch_count: prefetch)

      {:ok, initial_handler_state} = handler.setup(chan)

      {:noreply, %{state | channel: chan, handler_state: initial_handler_state}}
    end
  end

  # Broker confirmation that this process is now registered as a consumer: nothing to do.
  def handle_info({:"basic.consume_ok", _tag}, state), do: {:noreply, state}

  # The broker cancelled the consumer on its own initiative: stop the process.
  def handle_info({:server_cancel, {:"basic.cancel", _tag, _nowait}}, state),
    do: {:stop, :basic_cancel, state}

  # Echo of a cancel issued from our side (Basic.cancel): hibernate until the next message.
  def handle_info({:"basic.cancel", _tag, _nowait}, state),
    do: {:noreply, state, :hibernate}

  # Broker confirmation of a Basic.cancel: the consumer is gone, so stop.
  def handle_info({:"basic.cancel_ok", _tag}, state),
    do: {:stop, :basic_cancel_ok, state}

  # A delivery: a `basic_deliver` record paired with the `amqp_msg` record that
  # carries the message properties and the payload. The record fields are flattened
  # into a single metadata map and passed, together with the payload, to
  # `handle_message/3`, whose result becomes the new state.
  def handle_info(
        {basic_deliver(
           consumer_tag: consumer_tag,
           delivery_tag: delivery_tag,
           redelivered: redelivered,
           exchange: exchange,
           routing_key: routing_key
         ),
         amqp_msg(
           props:
             p_basic(
               content_type: content_type,
               content_encoding: content_encoding,
               headers: headers,
               delivery_mode: delivery_mode,
               priority: priority,
               correlation_id: correlation_id,
               reply_to: reply_to,
               expiration: expiration,
               message_id: message_id,
               timestamp: timestamp,
               type: type,
               user_id: user_id,
               app_id: app_id,
               cluster_id: cluster_id
             ),
           payload: payload
         )},
        state
      ) do
    # Fields taken from the delivery record.
    delivery_info = %{
      consumer_tag: consumer_tag,
      delivery_tag: delivery_tag,
      redelivered: redelivered,
      exchange: exchange,
      routing_key: routing_key
    }

    # Fields taken from the message properties; `delivery_mode` is exposed as the
    # boolean `:persistent` (true when the mode equals 2).
    properties = %{
      content_type: content_type,
      content_encoding: content_encoding,
      headers: headers,
      persistent: delivery_mode == 2,
      priority: priority,
      correlation_id: correlation_id,
      reply_to: reply_to,
      expiration: expiration,
      message_id: message_id,
      timestamp: timestamp,
      type: type,
      user_id: user_id,
      app_id: app_id,
      cluster_id: cluster_id
    }

    # The two key sets are disjoint, so the merge is a plain union.
    next_state = handle_message(payload, Map.merge(delivery_info, properties), state)

    {:noreply, next_state}
  end

  # Ignores a `{basic_consume record, pid}` message: no field of the record is used
  # and the state is returned untouched.
  # NOTE(review): presumably the consume request echoed back by the client library —
  # confirm against the AMQP client documentation.
  def handle_info(
        {basic_consume(
           ticket: _,
           queue: _,
           consumer_tag: _,
           no_local: _,
           no_ack: _,
           exclusive: _,
           nowait: _,
           arguments: _
         ), _sender},
        state
      ) do
    {:noreply, state}
  end

  # A `{ref, {:ok, port, pid}}` message: logged at debug level and otherwise ignored.
  def handle_info({_reply_ref, {:ok, _socket_port, _owner}}, state) do
    Logger.debug("Amqpx socket probably restarted by underlying library")

    {:noreply, state}
  end

  # Error handling

  # A `{ref, {:error, :no_socket, pid}}` message: logged at debug level and otherwise
  # ignored.
  def handle_info({_reply_ref, {:error, :no_socket, _owner}}, state) do
    Logger.debug("Amqpx socket not found")

    {:noreply, state}
  end

  # A monitored process went down (the channel is monitored in the `:setup` handler).
  # The channel is cleared from the state, so `terminate/2` will not try to close it,
  # and the process stops with `:channel_exited`.
  def handle_info({:DOWN, _monitor_ref, :process, _down_pid, reason}, state) do
    Logger.info("Monitored channel process crashed: #{inspect(reason)}. Restarting...")

    {:stop, :channel_exited, %{state | channel: nil}}
  end

  # Exits are trapped (see `init/1`), so a normal exit arrives here as a message;
  # it needs no action.
  def handle_info({:EXIT, _from, :normal}, state) do
    {:noreply, state}
  end

  # Catch-all: anything not matched by an earlier clause is logged and dropped.
  # This clause must stay last among the `handle_info/2` clauses.
  def handle_info(unexpected, state) do
    Logger.warn("Unknown message received #{inspect(unexpected)}")

    {:noreply, state}
  end

  # GenServer callback run when the process stops.
  #
  # With no channel in the state there is nothing to clean up. Otherwise the channel
  # is closed only for orderly stop reasons (`:normal`, `:shutdown`,
  # `{:shutdown, term}`); for any other reason it is left alone.
  def terminate(_reason, %__MODULE__{channel: nil}), do: nil

  def terminate(:normal, %__MODULE__{channel: channel}), do: close_channel(channel)

  def terminate(:shutdown, %__MODULE__{channel: channel}), do: close_channel(channel)

  def terminate({:shutdown, _}, %__MODULE__{channel: channel}), do: close_channel(channel)

  def terminate(_reason, %__MODULE__{}), do: :ok

  # Private functions

  # Closes the channel when its process is still alive, then waits for the :DOWN
  # message of that process (the monitor is installed in the `:setup` handler).
  # Returns :ok after the :DOWN message, or nil when the process was already dead.
  # NOTE(review): the receive has no `after` clause, so this waits indefinitely if
  # the :DOWN message never arrives.
  defp close_channel(%{pid: channel_pid} = channel) do
    case Process.alive?(channel_pid) do
      true ->
        Channel.close(channel)

        receive do
          {:DOWN, _monitor_ref, :process, ^channel_pid, _} -> :ok
        end

      false ->
        nil
    end
  end

  # Runs the handler on one delivery and returns the next consumer state.
  #
  # Success path: the handler must return `{:ok, new_handler_state}`; the delivery is
  # then acked and the new handler state is stored.
  #
  # Failure path: any exception raised in the success path (including the failed match
  # on a non-`{:ok, _}` handler result) is logged, and the delivery is rejected from a
  # separate task after `backoff` milliseconds, so the consumer process itself is not
  # blocked while waiting. The state is returned unchanged.
  #
  # The delivery is requeued only when `requeue_on_reject` is set and this was not
  # already a redelivery. When it is NOT going to be requeued and the handler module
  # exports the optional `handle_message_rejection/2`, that callback is invoked first.
  defp handle_message(
         message,
         %{delivery_tag: tag, redelivered: redelivered} = meta,
         %__MODULE__{
           channel: channel,
           handler_module: handler_module,
           handler_state: handler_state,
           backoff: backoff,
           requeue_on_reject: requeue_on_reject
         } = state
       ) do
    {:ok, handler_state} = handler_module.handle_message(message, meta, handler_state)
    Basic.ack(channel, tag)
    %{state | handler_state: handler_state}
  rescue
    e in _ ->
      Logger.error(Exception.format(:error, e, __STACKTRACE__))

      requeue = requeue_on_reject && !redelivered

      # Only `channel` is captured by the closure, so the whole consumer state is not
      # copied into the task process.
      Task.start(fn ->
        :timer.sleep(backoff)

        try do
          if !requeue && function_exported?(handler_module, :handle_message_rejection, 2) do
            handler_module.handle_message_rejection(message, e)
          end
        after
          # Always reject, even when the rejection callback raises; otherwise the
          # delivery would stay unacknowledged and keep occupying a prefetch slot.
          Basic.reject(channel, tag, requeue: requeue)
        end
      end)

      state
  end
end