lib/amqp/direct_consumer.ex

defmodule AMQP.DirectConsumer do
  @moduledoc """
  `AMQP.DirectConsumer` is an example custom consumer. It's argument is a pid
  of a process which the channel is meant to forward the messages to.

  ## Usage

      iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, self()})

  This will forward all the messages from the channel to the calling process.

  This is an Elixir reimplementation of `:amqp_direct_consumer`. (https://github.com/rabbitmq/rabbitmq-erlang-client/blob/master/src/amqp_direct_consumer.erl)

  For more information see:
  https://www.rabbitmq.com/erlang-client-user-guide.html#consumers-imlementation

  ## Caveat

  ### You are recommended to use the default consumer

  AMQP 2.0 comes with an improved version of the default consumer(`AMQP.SelectiveConsumer`).
  There is not an obvious reason using DirectConsumer after the version 2.0.
  We highly recommend you to use the default consumer which lets you decouple
  the consumer concern from the channel.
  If you still want to keep using `DirectConsumer`, keep reading the caveat below.

  ### Close the channel explicitly

  By default, the DirectConsumer detects the user consumer down then...

  * DirectConsumer returns :error in handle_info which results the process to exit.
  * The channel supervisor will detect the DirectConsumer shutdown but the
    restart will be failing as the user consumer is still down.
  * It ends up the channel process to shut down.

  However this is an abnormal shutdown and can cause an unintended race condition.

  To avoid it, make sure to close the channel explicitly and shut down your
  consumer with a `:normal` signal.

  ### Ignore the user consumer shutdown

  DirectConsumer follows the Erlang version and ignores only `:normal` signals
  for the user consumer exit.

  You might want to also ignore `:shutdown` signals as it can also be called
  before the channel is closed.

  You can set the additional reasons to ignore with the following options:

      iex> opts = [ignore_consumer_down: [:normal, :shutdown]]
      iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), opts}})

  You can also ignore the user consumer down completely by setting `true` to the value:

      iex> opts = [ignore_consumer_down: true]
      iex> AMQP.Channel.open(conn, {AMQP.DirectConsumer, {self(), opts}})

  """
  import AMQP.Core
  import AMQP.ConsumerHelper
  @behaviour :amqp_gen_consumer
  @default_options [
    ignore_consumer_down: [:normal]
  ]

  #########################################################
  ### amqp_gen_consumer callbacks
  #########################################################

  @impl true
  def init({pid, options}) do
    _ref = Process.monitor(pid)

    options = %{
      ignore_consumer_down: options[:ignore_consumer_down]
    }

    {:ok, %{consumer: pid, options: options}}
  end

  def init(pid), do: init({pid, @default_options})

  @impl true
  def handle_consume(basic_consume(), _pid, state) do
    # silently discard
    {:ok, state}
  end

  @impl true
  def handle_consume_ok(method, _args, state) do
    send(state.consumer, compose_message(method))

    {:ok, state}
  end

  @impl true
  def handle_cancel(method, state) do
    send(state.consumer, compose_message(method))

    {:ok, state}
  end

  @impl true
  def handle_cancel_ok(method, _args, state) do
    send(state.consumer, compose_message(method))

    {:ok, state}
  end

  @impl true
  def handle_server_cancel(method, state) do
    send(state.consumer, compose_message(method))

    {:ok, state}
  end

  @impl true
  def handle_deliver(_method, _msg, %{consumer: nil} = _state) do
    {:error, :no_consumer, nil}
  end

  def handle_deliver(method, message, state) do
    send(state.consumer, compose_message(method, message))

    {:ok, state}
  end

  @impl true
  def handle_deliver(_, _args, _ctx, %{consumer: nil} = _state) do
    {:error, :no_consumer, nil}
  end

  def handle_deliver(basic_deliver(), _args, _ctx, _state) do
    # There's no support for direct connection
    # This callback implementation should be added with library support
    {:error, :undefined}
  end

  @impl true
  def handle_info(
        {:DOWN, _mref, :process, consumer, reason},
        %{consumer: consumer, options: options} = state
      ) do
    if ignore_consumer_down?(reason, options) do
      # Ignores the user consumer DOWN and sets the pid.
      {:ok, Map.put(state, :consumer, nil)}
    else
      # Exit with the same reason.
      {:error, {:consumer_died, reason}, state}
    end
  end

  def handle_info(down = {:DOWN, _, _, _, _}, state) do
    if state.consumer do
      send(state.consumer, down)
    end

    {:ok, state}
  end

  def handle_info({basic_return() = method, message}, state) do
    send(state.consumer, compose_message(method, message))

    {:ok, state}
  end

  def handle_info(basic_ack() = method, state) do
    send(state.consumer, compose_message(method))

    {:ok, state}
  end

  def handle_info(basic_nack() = method, state) do
    send(state.consumer, compose_message(method))

    {:ok, state}
  end

  @impl true
  def handle_call(
        {:register_return_handler, chan, consumer},
        _from,
        %{consumer: consumer} = state
      ) do
    :amqp_channel.register_return_handler(chan.pid, self())

    {:reply, :ok, state}
  end

  def handle_call(
        {:register_confirm_handler, chan, consumer},
        _from,
        %{consumer: consumer} = state
      ) do
    :amqp_channel.register_confirm_handler(chan.pid, self())

    {:reply, :ok, state}
  end

  def handle_call(_req, _from, state) do
    {:reply, {:error, :undefined}, state}
  end

  @impl true
  def terminate(_reason, state), do: state

  defp ignore_consumer_down?(_reason, %{ignore_consumer_down: true}), do: true

  defp ignore_consumer_down?(reason, %{ignore_consumer_down: reasons}) when is_list(reasons) do
    reason in reasons
  end

  defp ignore_consumer_down?(reason, %{ignore_consumer_down: reason}), do: true
  defp ignore_consumer_down?(_, _), do: false
end