# lib/lapin/consumer.ex

defmodule Lapin.Consumer do
  @moduledoc """
  Extensible behaviour to define consumer configuration.

  Lapin provides a number of submodules which implement the patterns found in
  the [RabbitMQ Tutorials](http://www.rabbitmq.com/getstarted.html).

  ```
  defmodule ExampleApp.SomeConsumer do
    use Lapin.Consumer

    [... callbacks implementation ...]
  end
  ```
  """

  require Logger

  alias AMQP.{Basic, Channel, Connection}
  alias Lapin.Queue

  @typedoc "Consumer Tag"
  @type consumer_tag :: String.t()

  @typedoc """
  Consumer configuration

  The following keys are supported:
    - pattern: consumer pattern (module using the `Lapin.Consumer` behaviour,
      *default: `Lapin.Consumer.Config`*)

  If using the `Lapin.Consumer.Config` default implementation, the following keys are also supported:
    - queue: queue to consume from (`String.t()`, *required*)
    - ack: consumer ack (`boolean()`, *default: false*)
    - prefetch_count: consumer prefetch count (`integer()`, *default: 1*)
  """
  @type config :: Keyword.t()

  @typedoc "Consumer Prefetch"
  @type prefetch_count :: integer

  @doc """
  Returns whether consumer acknowledgements are enabled for `consumer`.

  When the consumer starts consuming, the negation of this value is passed to
  `AMQP.Basic.consume/4` as the `no_ack` option.
  """
  @callback ack(consumer :: t()) :: boolean

  @doc """
  Returns the message prefetch count for `consumer`.

  `create/2` applies this value to the consumer's channel with `AMQP.Basic.qos/2`;
  a `nil` value leaves the channel's prefetch setting untouched.
  """
  @callback prefetch_count(consumer :: t()) :: prefetch_count()

  @doc """
  Returns the queue `consumer` should consume from.

  `create/2` subscribes to this queue on the consumer's channel; when `nil` is
  returned no subscription is started.
  """
  @callback queue(consumer :: t()) :: Queue.t()

  @doc """
  Sets up the calling module as a `Lapin.Consumer` implementation.

  Aliases `Lapin.Consumer`, declares the behaviour and defines default callbacks
  that read their values from the consumer's `config` keyword list:

    - `ack/1`: the `:ack` key, `false` when absent
    - `prefetch_count/1`: the `:prefetch_count` key, `1` when absent
    - `queue/1`: the `:queue` key, raising when absent

  Every callback of the behaviour is marked overridable.
  """
  defmacro __using__([]) do
    quote do
      alias Lapin.Consumer

      @behaviour Consumer

      def ack(%Consumer{} = consumer) do
        Keyword.get(consumer.config, :ack, false)
      end

      def prefetch_count(%Consumer{} = consumer) do
        Keyword.get(consumer.config, :prefetch_count, 1)
      end

      def queue(%Consumer{} = consumer) do
        Keyword.fetch!(consumer.config, :queue)
      end

      # Passing the behaviour module makes all of its callbacks overridable.
      defoverridable Consumer
    end
  end

  @typedoc """
  Lapin consumer

  Fields:
    - channel: channel opened for this consumer by `create/2`
    - consumer_tag: tag returned when consumption is started
    - pattern: module implementing the `Lapin.Consumer` callbacks
    - config: configuration the consumer was created from
    - queue: queue the consumer consumes from

  Every field defaults to `nil` until `create/2` fills it in.
  """
  @type t :: %__MODULE__{
          channel: Channel.t(),
          consumer_tag: consumer_tag(),
          pattern: atom(),
          config: config(),
          queue: String.t()
        }
  defstruct [:channel, :consumer_tag, :pattern, :config, :queue]

  @doc """
  Creates a consumer from configuration.

  Opens a channel on `connection`, asks the pattern module (the `:pattern` key of
  `config`, `Lapin.Consumer.Config` when absent) for the queue and the prefetch
  count, applies the prefetch count to the channel and starts consuming from the
  queue.

  On success the returned consumer has `channel`, `consumer_tag` and `queue` set.
  When any step returns `{:error, reason}` the failure is logged and a consumer
  carrying only `config` and `pattern` is returned.
  """
  @spec create(Connection.t(), config) :: t
  def create(connection, config) do
    pattern = Keyword.get(config, :pattern, Lapin.Consumer.Config)
    consumer = %__MODULE__{config: config, pattern: pattern}

    with {:ok, channel} <- Channel.open(connection),
         consumer = %{consumer | channel: channel},
         queue = pattern.queue(consumer),
         :ok <- set_prefetch_count(consumer, pattern.prefetch_count(consumer)),
         {:ok, consumer_tag} <- consume(consumer, queue) do
      %{consumer | consumer_tag: consumer_tag, queue: queue}
    else
      {:error, error} ->
        # `config` is a keyword list: interpolating it directly goes through
        # `String.Chars` and raises, so it has to be rendered with `inspect/1`.
        Logger.error("Error creating consumer from config #{inspect(config)}: #{inspect(error)}")

        # NOTE(review): `consumer` here is the binding made before the `with`, so
        # a channel that was opened before a later step failed is neither closed
        # nor returned to the caller. Confirm whether it should be closed here.
        consumer
    end
  end

  @doc """
  Finds the consumer with the given `consumer_tag` in `consumers`.

  Returns `{:ok, consumer}` for the first consumer whose tag matches, or
  `{:error, :not_found}` when none does.
  """
  @spec get([t], consumer_tag) :: {:ok, t} | {:error, :not_found}
  def get(consumers, consumer_tag) do
    consumers
    |> Enum.find(fn consumer -> consumer.consumer_tag == consumer_tag end)
    |> case do
      nil -> {:error, :not_found}
      consumer -> {:ok, consumer}
    end
  end

  @doc """
  Rejects the message identified by `delivery_tag` on the consumer's channel.

  `requeue` is forwarded to `AMQP.Basic.reject/3` as the `:requeue` option. A
  successful rejection is logged at debug level; any other result is logged at
  error level and returned unchanged.
  """
  @spec reject_message(t, integer, boolean()) :: :ok | {:error, term}
  def reject_message(%{channel: channel}, delivery_tag, requeue) do
    with :ok <- Basic.reject(channel, delivery_tag, requeue: requeue) do
      Logger.debug("#{if requeue, do: "Requeued", else: "Rejected"} message #{delivery_tag}")
    else
      failure ->
        Logger.error(
          "Error #{if requeue, do: "requeueing", else: "rejecting"} message #{delivery_tag}: #{inspect(failure)}"
        )

        failure
    end
  end

  @doc """
  Acknowledges consumption of the message identified by `delivery_tag`.

  Delegates to `AMQP.Basic.ack/2` on the consumer's channel and returns its result.
  """
  @spec ack_message(t, integer) :: :ok | {:error, term}
  def ack_message(%{channel: channel}, delivery_tag), do: Basic.ack(channel, delivery_tag)

  # Starts consuming `queue` on the consumer's channel and returns the result of
  # `Basic.consume/4`. Without a queue nothing is subscribed and the consumer
  # tag is reported as nil.
  defp consume(_consumer, nil), do: {:ok, nil}

  defp consume(%{channel: channel, pattern: pattern} = consumer, queue) do
    # `no_ack` is the negation of the pattern's `ack/1` setting.
    no_ack = not pattern.ack(consumer)

    # A nil consumer pid is passed through to `Basic.consume/4` as-is.
    Basic.consume(channel, queue, nil, no_ack: no_ack)
  end

  # Applies `prefetch_count` to the consumer's channel through `Basic.qos/2`.
  # A nil prefetch count means there is nothing to configure.
  defp set_prefetch_count(_consumer, nil), do: :ok

  defp set_prefetch_count(%{channel: channel}, prefetch_count),
    do: Basic.qos(channel, prefetch_count: prefetch_count)
end