lib/lapin/producer.ex

defmodule Lapin.Producer do
  @moduledoc """
  Extensible behaviour to define producer configuration.

  Lapin provides a number of submodules which implement the patterns found in
  the [RabbitMQ Tutorials](http://www.rabbitmq.com/getstarted.html).

  ```
  defmodule ExampleApp.SomePatter do
    use Lapin.Producer

    [... callbacks implementation ...]
  end
  ```
  """

  require Logger

  alias AMQP.{Basic, Channel, Confirm, Connection}
  alias Lapin.{Exchange, Message}

  @doc """
  Request publisher confirms (RabbitMQ only)
  """
  @callback confirm(producer :: t()) :: boolean()

  @doc """
  Declare exchange
  """
  @callback exchange(producer :: t()) :: Exchange.t()

  @doc """
  Request message persistence when publishing
  """
  @callback persistent(producer :: t()) :: boolean()

  @doc """
  Request message mandatory routing when publishing
  """
  @callback mandatory(producer :: t()) :: boolean()

  defmacro __using__([]) do
    quote do
      alias Lapin.Producer

      @behaviour Producer

      def confirm(%Producer{config: config}), do: Keyword.get(config, :confirm, false)
      def exchange(%Producer{config: config}), do: Keyword.fetch!(config, :exchange)
      def mandatory(%Producer{config: config}), do: Keyword.get(config, :mandatory, false)
      def persistent(%Producer{config: config}), do: Keyword.get(config, :persistent, false)

      defoverridable Producer
    end
  end

  @typedoc """
  Producer configuration

  The following keys are supported:
    - pattern: producer pattern (module using the `Lapin.Producer` behaviour)

  If using the `Lapin.Producer.Config` default implementation, the following keys are also supported:
    - exchange: exchange used for publish (`String.t`, *required*)
    - confirm: expect RabbitMQ publish confirms (`boolean()`, *default: false*)
    - mandatory: messages published as mandatory by default (`boolean()`, *default: false*)
    - persistent: messages published as persistent by default (`boolean()`, *default: false*)
  """
  @type config :: Keyword.t()

  @typedoc "Lapin Producer"
  @type t :: %__MODULE__{
          channel: Channel.t(),
          pattern: atom,
          config: config,
          exchange: String.t()
        }
  defstruct channel: nil,
            pattern: nil,
            config: nil,
            exchange: nil

  @doc """
  Creates a producer from configuration
  """
  @spec create(Connection.t(), config) :: t
  def create(connection, config) do
    pattern = Keyword.get(config, :pattern, Lapin.Producer.Config)
    producer = %__MODULE__{config: config, pattern: pattern}

    with {:ok, channel} <- Channel.open(connection),
         producer <- %{producer | channel: channel},
         exchange <- pattern.exchange(producer),
         :ok <- set_confirm(producer, pattern.confirm(producer)) do
      %{producer | exchange: exchange}
    else
      {:error, error} ->
        Logger.error("Error creating producer from config #{config}: #{inspect(error)}")
        producer
    end
  end

  @doc """
  Find consumer by consumer_tag
  """
  @spec get([t], String.t()) :: {:ok, t} | {:error, :not_found}
  def get(producers, exchange) do
    case Enum.find(producers, &(&1.exchange == exchange)) do
      nil -> {:error, :not_found}
      producer -> {:ok, producer}
    end
  end

  @doc """
  Publish message
  """
  @spec publish(t, Exchange.name(), Exchange.routing_key(), Message.payload(), Keyword.t()) ::
          :ok | {:error, term}
  def publish(%{channel: channel}, exchange, routing_key, payload, options) do
    Basic.publish(channel, exchange, routing_key, payload, options)
  end

  @doc """
  Wait for publish confirmation
  """
  @spec confirm(t) :: boolean()
  def confirm(%{channel: channel}) do
    case Confirm.wait_for_confirms(channel) do
      true -> true
      _ -> false
    end
  end

  defp set_confirm(_producer, false = _confirm), do: :ok

  defp set_confirm(%{channel: channel}, true = _confirm) do
    with :ok <- Confirm.select(channel),
         :ok <- Basic.return(channel, self()) do
      :ok
    else
      error ->
        error
    end
  end
end