lib/lapin/connection.ex

defmodule Lapin.Connection do
  @moduledoc """
  RabbitMQ connection handler

  This module handles the RabbitMQ connection. It also provides a behaviour for
  worker module implementation. The worker module should use the `Lapin.Connection`
  behaviour and implement the callbacks it needs.

  When using the `Lapin.Connection` behaviour a `publish/4` function is injected in
  the worker module as a shortcut to the `Lapin.Connection.publish/5` function
  which removes the need for passing in the connection and is publicly callable
  to publish messages on the connection configured for the implementing module.
  """

  use Connection

  require Logger

  alias AMQP.Channel
  alias Lapin.{Consumer, Exchange, Message, Producer, Queue}
  alias Lapin.Message.Payload

  @typedoc """
  Connection configuration

  The following keys are supported:
    - module: module using the `Lapin.Connection` behaviour (mandatory)
    - uri: AMQP URI (String.t | URI.t); values parsed from the URI take
      precedence over the corresponding explicit keys below
    - host: broker hostname (string | charlist), *default: 'localhost'*
    - port: broker port (string | integer), *default: 5672*
    - virtual_host: broker vhost (string), *default: "/"*
    - username: username (string)
    - password: password (string)
    - auth_mechanisms: broker auth_mechanisms ([:amqplain | :external | :plain]), *default: amqp_client default*
    - ssl_options: ssl options ([:ssl.ssl_option]), *default: none*
    - exchanges: exchanges to declare, as a keyword list of `name: options`, *default: []*
    - queues: queues to declare, as a keyword list of `name: options`, *default: []*
    - producers: producers to configure ([Producer.config]), *default: []*
    - consumers: consumers to configure ([Consumer.config]), *default: []*
  """
  # NOTE(review): the type below only spells out `consumers` and `producers`;
  # the other keys listed in the typedoc are accepted at runtime but not typed.
  @type config :: [consumers: [Consumer.config()], producers: [Producer.config()]]

  @typedoc "Connection"
  @type t :: GenServer.server()

  @typedoc "Callback result"
  @type on_callback :: :ok | {:error, message :: String.t()}

  @typedoc "Reason for message rejection"
  @type reason :: term

  @typedoc "`handle_deliver/2` callback result"
  @type on_deliver :: :ok | {:reject, reason} | term

  # NOTE(review): the callbacks below are specced with `Channel.t()` as their
  # first argument, but this module invokes them with the `Consumer` or
  # `Producer` struct it looked up — confirm which type is intended.

  @doc """
  Called when receiving a `basic.cancel` from the broker.
  """
  @callback handle_cancel(Channel.t()) :: on_callback

  @doc """
  Called when receiving a `basic.cancel_ok` from the broker.
  """
  @callback handle_cancel_ok(Channel.t()) :: on_callback

  @doc """
  Called when receiving a `basic.consume_ok` from the broker.

  This signals successful registration as a consumer.
  """
  @callback handle_consume_ok(Channel.t()) :: on_callback

  @doc """
  Called when receiving a `basic.deliver` from the broker.

  Return values from this callback determine message acknowledgement:
    - `:ok`: Message was processed by the consumer and should be removed from queue
    - `{:reject, reason}`: Message was not processed and should be rejected

  Any other return value requeues the message to prevent data loss.
  A crash in the callback code will however reject the message to prevent loops
  if the message was already delivered before.

  The `reason` term can be used by the application
  to signal the reason of rejection and is logged when the message is rejected.
  """
  @callback handle_deliver(Channel.t(), Message.t()) :: on_deliver

  @doc """
  Called when completing a `basic.publish` with the broker.

  Message transmission to the broker is successful when this callback is called.
  """
  @callback handle_publish(Channel.t(), Message.t()) :: on_callback

  @doc """
  Called when receiving a `basic.return` from the broker.

  This signals an undeliverable returned message from the broker.
  """
  @callback handle_return(Channel.t(), Message.t()) :: on_callback

  @doc """
  Called before `handle_deliver/2` to get the payload type.

  Should return a data type instance to decode the payload into.
  A `Lapin.Message.Payload` implementation must be provided for this type. The
  default implementation leaves the payload unaltered.
  """
  @callback payload_for(Channel.t(), Message.t()) :: Payload.t()

  # Injects the `Lapin.Connection` behaviour into the using module together
  # with default implementations for every callback and a `publish/4` shortcut.
  defmacro __using__(_) do
    quote do
      alias Lapin.{Consumer, Message}

      @behaviour Lapin.Connection

      # Default callback implementations: every handler is a no-op returning
      # `:ok`, and `payload_for/2` returns an empty binary.
      def handle_cancel(_consumer), do: :ok
      def handle_cancel_ok(_consumer), do: :ok
      def handle_consume_ok(_consumer), do: :ok
      def handle_deliver(_consumer, _message), do: :ok
      def handle_publish(_consumer, _message), do: :ok
      def handle_return(_consumer, _message), do: :ok
      def payload_for(_consumer, _message), do: <<>>

      # Lets the using module override any of the behaviour callbacks above.
      defoverridable Lapin.Connection

      # Shortcut to `Lapin.Connection.publish/5` that passes the using module as
      # the connection; presumably this requires the connection process to be
      # registered under the module's name — confirm against the start options.
      def publish(exchange, routing_key, message, options \\ []) do
        Lapin.Connection.publish(__MODULE__, exchange, routing_key, message, options)
      end
    end
  end

  # Delay handed to the `{:backoff, ...}` return in `connect/2`; also used as
  # the default `connection_timeout` (presumably milliseconds — confirm).
  @backoff 1_000
  # Defaults merged underneath the user configuration before opening the connection.
  @connection_default_params [connection_timeout: @backoff]
  # Fallbacks applied when `:host` / `:port` are missing or `nil`.
  @default_rabbitmq_host 'localhost'
  @default_rabbitmq_port 5672

  @doc """
  Starts a `Lapin.Connection` with the specified configuration

  The configuration is normalized before the process is started. When it is
  invalid (for example the mandatory `:module` key is missing) no process is
  started and `{:error, message}` is returned instead of raising.
  """
  @spec start_link(config, options :: GenServer.options()) :: GenServer.on_start()
  def start_link(configuration, options \\ []) do
    # A failed cleanup falls through the `with` and is returned to the caller
    # as-is, which fits the `{:error, term}` shape of `GenServer.on_start()`.
    with {:ok, configuration} <- cleanup_configuration(configuration) do
      Connection.start_link(__MODULE__, configuration, options)
    end
  end

  # `Connection` callback: traps exits and asks for an immediate connection
  # attempt, starting from a state with no connection, producers or consumers.
  def init(configuration) do
    Process.flag(:trap_exit, true)

    initial_state = %{
      configuration: configuration,
      consumers: [],
      producers: [],
      connection: nil,
      module: nil
    }

    {:connect, :init, initial_state}
  end

  @doc """
  Closes the connection by stopping the connection process
  """
  @spec close(connection :: t) :: on_callback()
  def close(connection) do
    GenServer.stop(connection)
  end

  # Closes the broker connection on shutdown, if one was ever established.
  def terminate(_reason, %{connection: connection}) do
    case connection do
      nil -> :ok
      established -> AMQP.Connection.close(established)
    end
  end

  @doc """
  Publishes a message to the specified exchange with the given routing_key

  The request is sent to the connection process as a synchronous call and its
  reply is returned.
  """
  @spec publish(
          connection :: t(),
          String.t(),
          String.t(),
          Payload.t(),
          options :: Keyword.t()
        ) :: on_callback
  def publish(connection, exchange, routing_key, payload, options \\ []) do
    request = {:publish, exchange, routing_key, payload, options}
    Connection.call(connection, request)
  end

  # Publish request received while there is no broker connection: reply with
  # an error without touching the state.
  def handle_call(
        {:publish, _exchange, _routing_key, _payload, _options},
        _from,
        %{connection: nil} = state
      ) do
    {:reply, {:error, :not_connected}, state}
  end

  # Publish request on an established connection. Looks up the producer for
  # the exchange, encodes the payload, publishes it and replies with the
  # result of the worker module's `handle_publish/2` callback, or with
  # `{:error, reason}` when any step fails.
  def handle_call(
        {:publish, exchange, routing_key, payload, options},
        _from,
        %{producers: producers, module: module} = state
      ) do
    with {:ok, %Producer{pattern: pattern} = producer} <- Producer.get(producers, exchange),
         mandatory <- pattern.mandatory(producer),
         persistent <- pattern.persistent(producer),
         # Caller supplied options win over the producer pattern defaults.
         options <- Keyword.merge([mandatory: mandatory, persistent: persistent], options),
         # Content type is taken from the payload before it is encoded.
         meta <- %{content_type: Payload.content_type(payload)},
         {:ok, payload} <- Payload.encode(payload),
         :ok <- Producer.publish(producer, exchange, routing_key, payload, options) do
      message = %Message{meta: Enum.into(options, meta), payload: payload}

      # `Producer.confirm/1` is only consulted when the pattern asks for confirms.
      if not pattern.confirm(producer) or Producer.confirm(producer) do
        Logger.debug(fn -> "Published #{inspect(message)} on #{inspect(producer)}" end)
        {:reply, module.handle_publish(producer, message), state}
      else
        error = "Error publishing #{inspect(message)}"
        Logger.debug(fn -> error end)
        {:reply, {:error, error}, state}
      end
    else
      {:error, error} ->
        Logger.debug(fn -> "Error sending message: #{inspect(error)}" end)
        {:reply, {:error, error}, state}
    end
  end

  # `basic.cancel` from the broker: notify the worker module when the consumer
  # is known locally, then stop the connection process.
  def handle_info(
        {:basic_cancel, %{consumer_tag: tag}},
        %{consumers: consumers, module: module} = state
      ) do
    consumers
    |> Consumer.get(tag)
    |> case do
      {:ok, cancelled} ->
        Logger.debug(fn -> "Broker cancelled consumer for #{inspect(cancelled)}" end)
        module.handle_cancel(cancelled)

      {:error, :not_found} ->
        Logger.warn("Broker cancelled consumer_tag '#{tag}' for locally unknown consumer")
    end

    {:stop, :normal, state}
  end

  # `basic.cancel_ok` from the broker: hand the confirmation to the worker
  # module and log the outcome; the state is never changed.
  def handle_info(
        {:basic_cancel_ok, %{consumer_tag: tag}},
        %{consumers: consumers, module: module} = state
      ) do
    lookup = Consumer.get(consumers, tag)

    with {:ok, cancelled} <- lookup,
         :ok <- module.handle_cancel_ok(cancelled) do
      Logger.debug(fn -> "Broker confirmed cancelling consumer for #{inspect(cancelled)}" end)
    else
      {:error, :not_found} ->
        Logger.debug(fn ->
          "Broker confirmed cancelling consumer for locally unknown tag '#{tag}'"
        end)

      failure ->
        Logger.error("Error handling broker cancel for '#{tag}': #{inspect(failure)}")
    end

    {:noreply, state}
  end

  # `basic.consume_ok` from the broker: hand the registration to the worker
  # module and log the outcome; the state is never changed.
  def handle_info(
        {:basic_consume_ok, %{consumer_tag: tag}},
        %{consumers: consumers, module: module} = state
      ) do
    lookup = Consumer.get(consumers, tag)

    with {:ok, registered} <- lookup,
         :ok <- module.handle_consume_ok(registered) do
      Logger.debug(fn -> "Broker registered consumer for #{inspect(registered)}" end)
    else
      {:error, :not_found} ->
        Logger.warn("Broker registered consumer_tag '#{tag}' for locally unknown consumer")

      failure ->
        Logger.error("Error handling broker register for '#{tag}': #{inspect(failure)}")
    end

    {:noreply, state}
  end

  # `basic.return` from the broker: wrap the returned payload in a `Message`
  # and pass it to the worker module through the producer of its exchange.
  def handle_info(
        {:basic_return, payload, %{exchange: exchange} = meta},
        %{producers: producers, module: module} = state
      ) do
    returned = %Message{meta: meta, payload: payload}
    lookup = Producer.get(producers, exchange)

    with {:ok, producer} <- lookup,
         :ok <- module.handle_return(producer, returned) do
      Logger.debug(fn -> "Broker returned message #{inspect(returned)}" end)
    else
      {:error, :not_found} ->
        Logger.warn("Broker returned message #{inspect(returned)} for locally unknown channel")

      failure ->
        Logger.debug(fn -> "Error handling returned message: #{inspect(failure)}" end)
    end

    {:noreply, state}
  end

  # A monitored process went down (the broker connection is monitored in
  # `connect/2`): log it and stop this process normally.
  def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
    Logger.warn("Connection down, restarting...")

    {:stop, :normal, state}
  end

  # `basic.deliver` from the broker: wrap the delivery in a `Message` and
  # process it in a separate spawned process when its consumer is known.
  def handle_info(
        {:basic_deliver, payload, %{consumer_tag: tag} = meta},
        %{consumers: consumers, module: module} = state
      ) do
    delivered = %Message{meta: meta, payload: payload}

    consumers
    |> Consumer.get(tag)
    |> case do
      {:ok, consumer} ->
        spawn(fn -> consume(module, consumer, delivered) end)

      {:error, :not_found} ->
        Logger.error("Error processing message #{inspect(delivered)}, no local consumer")
    end

    {:noreply, state}
  end

  # Processes one delivered message on behalf of `module`.
  #
  # The payload is decoded into the type returned by `module.payload_for/2`,
  # the decoded message is passed to `module.handle_deliver/2`, and the
  # callback result decides what happens next:
  #   - `:ok`: the message is acknowledged when the consumer pattern asks for it
  #   - `{:reject, reason}`: the message is rejected with `false` as third argument
  #   - anything else, or a raised exception: the message is rejected with
  #     `not redelivered` as third argument
  # NOTE(review): the third argument of `Consumer.reject_message/3` is
  # presumably the requeue flag — confirm against `Lapin.Consumer`.
  defp consume(
         module,
         %Consumer{pattern: pattern} = consumer,
         %Message{
           meta: %{delivery_tag: delivery_tag, redelivered: redelivered} = meta,
           payload: payload
         } = message
       ) do
    with ack <- pattern.ack(consumer),
         payload_for <- module.payload_for(consumer, message),
         content_type <- Payload.content_type(payload_for),
         # The content type of the decode target is recorded in the message meta.
         meta <- Map.put(meta, :content_type, content_type),
         {:ok, payload} <- Payload.decode_into(payload_for, payload),
         message <- %Message{message | meta: meta, payload: payload},
         :ok <- module.handle_deliver(consumer, message) do
      Logger.debug(fn -> "Consuming message #{delivery_tag}" end)
      consume_ack(ack, consumer, delivery_tag)
    else
      # Explicit rejection requested by the worker module.
      {:reject, reason} ->
        case Consumer.reject_message(consumer, delivery_tag, false) do
          :ok ->
            Logger.error("Rejected message #{delivery_tag}: #{inspect(reason)}")
            :ok

          {:error, reason} ->
            Logger.debug("Failed rejecting message #{delivery_tag}: #{inspect(reason)}")
        end

      # Any other result, including a payload decoding failure.
      reason ->
        case Consumer.reject_message(consumer, delivery_tag, not redelivered) do
          :ok ->
            Logger.error("Rejected message #{delivery_tag}: #{inspect(reason)}")
            :ok

          {:error, reason} ->
            Logger.debug("Failed rejecting message #{delivery_tag}: #{inspect(reason)}")
        end
    end
  rescue
    # An exception raised anywhere above is handled like an unexpected result,
    # and the formatted exception with its stacktrace is logged.
    exception ->
      case Consumer.reject_message(consumer, delivery_tag, not redelivered) do
        :ok ->
          Logger.error(
            "Rejected message #{delivery_tag}: #{Exception.format(:error, exception, __STACKTRACE__)}"
          )

          :ok

        {:error, reason} ->
          Logger.debug("Failed rejecting message #{delivery_tag}: #{inspect(reason)}")
      end
  end

  # Acknowledges a consumed message when the consumer requires ACKs. Returns
  # `:ok` on success, or whatever `Consumer.ack_message/2` returned otherwise.
  defp consume_ack(true, consumer, delivery_tag) do
    with :ok <- Consumer.ack_message(consumer, delivery_tag) do
      Logger.debug("Consumed message #{delivery_tag}, ACK sent")
      :ok
    else
      failure ->
        Logger.debug("ACK failed for message #{delivery_tag}")
        failure
    end
  end

  # No ACK is required for this consumer: only log the consumption.
  defp consume_ack(false, _consumer, delivery_tag) do
    Logger.debug(fn -> "Consumed message #{delivery_tag}, ACK not required" end)
    :ok
  end

  # `Connection` callback: opens the broker connection and sets up exchanges,
  # queues, producers and consumers from the configuration.
  #
  # Returns `{:ok, state}` with the connection, producers, consumers and worker
  # module stored in the state, or `{:backoff, @backoff, state}` with the
  # unchanged state when any step returns `{:error, reason}`.
  def connect(_info, %{configuration: configuration} = state) do
    module = Keyword.get(configuration, :module)
    configuration = Keyword.merge(@connection_default_params, configuration)

    with {:ok, connection} <- AMQP.Connection.open(configuration),
         {:ok, producers, consumers} <- setup_connection(connection, configuration) do
      # A `:DOWN` message for the connection stops this process (see `handle_info/2`).
      Process.monitor(connection.pid)

      {:ok,
       %{
         state
         | module: module,
           producers: producers,
           consumers: consumers,
           connection: connection
       }}
    else
      {:error, error} ->
        Logger.error(fn ->
          "Connection error: #{inspect(error)} for #{module}, backing off for #{@backoff}"
        end)

        {:backoff, @backoff, state}
    end
  end

  # Declares and binds exchanges and queues on a temporary channel, then
  # creates the producers and consumers. When any step fails the freshly
  # opened connection is closed before the failure is returned, so that
  # backoff retries do not pile up open connections on the broker.
  defp setup_connection(connection, configuration) do
    with {:ok, config_channel} <- Channel.open(connection),
         {:ok, exchanges} <- declare_exchanges(configuration, config_channel),
         {:ok, queues} <- declare_queues(configuration, config_channel),
         :ok <- bind_exchanges(exchanges, config_channel),
         :ok <- bind_queues(queues, config_channel),
         {:ok, producers} <- create_producers(configuration, connection),
         {:ok, consumers} <- create_consumers(configuration, connection),
         :ok <- Channel.close(config_channel) do
      {:ok, producers, consumers}
    else
      failure ->
        # Best effort: the original failure is what gets reported upstream.
        AMQP.Connection.close(connection)
        failure
    end
  end

  # Builds an `Exchange` for every `name: options` entry under the `:exchanges`
  # configuration key and declares each of them on `channel`.
  #
  # Returns `{:ok, exchanges}`, or the first `{:error, reason}` returned by
  # `Exchange.declare/2`.
  defp declare_exchanges(configuration, channel) do
    exchanges =
      configuration
      |> Keyword.get(:exchanges, [])
      |> Enum.map(fn {name, options} ->
        name
        |> Atom.to_string()
        |> Exchange.new(options)
      end)

    with :ok <- each_until_error(exchanges, &Exchange.declare(&1, channel)) do
      {:ok, exchanges}
    end
  end

  # Binds every exchange on `channel`. Returns `:ok`, or the first
  # `{:error, reason}` returned by `Exchange.bind/2`.
  defp bind_exchanges(exchanges, channel) do
    each_until_error(exchanges, &Exchange.bind(&1, channel))
  end

  # Builds a `Queue` for every `name: options` entry under the `:queues`
  # configuration key and declares each of them on `channel`.
  #
  # Returns `{:ok, queues}`, or the first `{:error, reason}` returned by
  # `Queue.declare/2`.
  defp declare_queues(configuration, channel) do
    queues =
      configuration
      |> Keyword.get(:queues, [])
      |> Enum.map(fn {name, options} ->
        name
        |> Atom.to_string()
        |> Queue.new(options)
      end)

    with :ok <- each_until_error(queues, &Queue.declare(&1, channel)) do
      {:ok, queues}
    end
  end

  # Binds every queue on `channel`. Returns `:ok`, or the first
  # `{:error, reason}` returned by `Queue.bind/2`.
  defp bind_queues(queues, channel) do
    each_until_error(queues, &Queue.bind(&1, channel))
  end

  # Applies `fun` to each item in order and stops at the first `{:error, _}`
  # result, which is returned. `Enum.each/2` always returns `:ok`, so using it
  # here would silently discard failures. Any result that is not an
  # `{:error, _}` tuple counts as success.
  defp each_until_error(items, fun) do
    Enum.reduce_while(items, :ok, fn item, :ok ->
      case fun.(item) do
        {:error, _reason} = error -> {:halt, error}
        _success -> {:cont, :ok}
      end
    end)
  end

  # Creates one producer on `connection` per entry under the `:producers`
  # configuration key (none when the key is absent).
  defp create_producers(configuration, connection) do
    producer_configs = Keyword.get(configuration, :producers, [])

    {:ok, Enum.map(producer_configs, fn config -> Producer.create(connection, config) end)}
  end

  # Creates one consumer on `connection` per entry under the `:consumers`
  # configuration key (none when the key is absent).
  defp create_consumers(configuration, connection) do
    consumer_configs = Keyword.get(configuration, :consumers, [])

    {:ok, Enum.map(consumer_configs, fn config -> Consumer.create(connection, config) end)}
  end

  # Validates and normalizes the user supplied configuration.
  #
  # The `:uri` key is removed and the values parsed from it are merged over
  # the explicit keys; `:host`, `:port` and `:virtual_host` are normalized and
  # defaulted; `:auth_mechanisms` is mapped when it is a list and dropped
  # otherwise.
  #
  # Returns `{:ok, configuration}`, or `{:error, message}` when a mandatory
  # key is missing.
  defp cleanup_configuration(configuration) do
    case check_mandatory_params(configuration, [:module]) do
      :ok ->
        # `Keyword.pop/2` really removes the key; returning `{value, :pop}` from
        # a `Keyword.get_and_update/3` callback would store the atom `:pop`
        # under `:uri` instead.
        {uri, configuration} = Keyword.pop(configuration, :uri)

        configuration =
          configuration
          |> Keyword.merge(map_uri(uri))
          |> normalize_param(:host, &map_host/1)
          |> normalize_param(:port, &map_port/1)
          |> normalize_param(:virtual_host, &map_vhost/1)
          |> normalize_auth_mechanisms()

        {:ok, configuration}

      {:error, :missing_params, missing_params} ->
        params = Enum.join(missing_params, ", ")

        # NOTE(review): the whole configuration is inspected into the log, which
        # can include the password — consider redacting it.
        error =
          "Error creating connection #{inspect(configuration)}: missing mandatory params: #{params}"

        Logger.error(error)
        {:error, error}
    end
  end

  # Replaces the value under `key` with `mapper.(value)`; a missing key is
  # passed to `mapper` as `nil`, so the key is always present afterwards.
  defp normalize_param(configuration, key, mapper) do
    {_previous, configuration} =
      Keyword.get_and_update(configuration, key, fn value -> {value, mapper.(value)} end)

    configuration
  end

  # Maps a list of auth mechanisms through `map_auth_mechanism/1`; any other
  # value (including a missing key) removes the key.
  defp normalize_auth_mechanisms(configuration) do
    {_previous, configuration} =
      Keyword.get_and_update(configuration, :auth_mechanisms, fn
        mechanisms when is_list(mechanisms) ->
          {mechanisms, Enum.map(mechanisms, &map_auth_mechanism/1)}

        _ ->
          :pop
      end)

    configuration
  end

  # Converts an AMQP URI (binary or `URI` struct) into connection parameters;
  # `nil` yields no parameters.
  defp map_uri(nil), do: []

  defp map_uri(uri) when is_binary(uri), do: map_uri(URI.parse(uri))

  defp map_uri(%URI{} = uri) do
    fields = Map.from_struct(uri)

    uri_to_list(Enum.to_list(fields))
  end

  # Turns the keyword list of URI fields into connection parameters: the path
  # becomes the virtual host, the userinfo becomes username and password,
  # fields with no connection meaning are dropped and `nil` values removed.
  defp uri_to_list(uri) when is_list(uri) do
    {path, uri} = Keyword.pop(uri, :path)
    {userinfo, uri} = Keyword.pop(uri, :userinfo)
    [username, password] = map_userinfo(userinfo)

    uri
    |> Keyword.drop([:authority, :query, :fragment, :scheme])
    |> Keyword.put(:virtual_host, map_vhost(path))
    |> Keyword.put(:username, username)
    |> Keyword.put(:password, password)
    |> Enum.reject(fn {_key, value} -> is_nil(value) end)
  end

  # Splits `"user:password"` on the first colon into `[username, password]`;
  # the password is `nil` when there is no colon, and both are `nil` when the
  # userinfo is not a binary.
  defp map_userinfo(userinfo) when is_binary(userinfo) do
    case String.split(userinfo, ":", parts: 2) do
      [username, password] -> [username, password]
      [username] -> [username, nil]
    end
  end

  defp map_userinfo(_), do: [nil, nil]

  # Derives the virtual host from a path by stripping leading slashes; a
  # missing or empty result maps to the "/" vhost.
  defp map_vhost(nil), do: "/"

  defp map_vhost(path) do
    stripped = String.replace_leading(path, "/", "")

    if stripped == "", do: "/", else: stripped
  end

  # Maps the known auth mechanism atoms to the matching
  # `:amqp_auth_mechanisms` function; any other value is returned unchanged.
  defp map_auth_mechanism(auth_mechanism) do
    case auth_mechanism do
      :amqplain -> &:amqp_auth_mechanisms.amqplain/3
      :external -> &:amqp_auth_mechanisms.external/3
      :plain -> &:amqp_auth_mechanisms.plain/3
      other -> other
    end
  end

  # Normalizes the broker host: `nil` becomes the default host, a binary is
  # converted to a charlist and anything else is passed through.
  defp map_host(host) do
    cond do
      is_nil(host) -> @default_rabbitmq_host
      is_binary(host) -> String.to_charlist(host)
      true -> host
    end
  end

  # Normalizes the broker port: `nil` becomes the default port, a binary is
  # parsed as an integer and anything else is passed through.
  defp map_port(port) do
    cond do
      is_nil(port) -> @default_rabbitmq_port
      is_binary(port) -> String.to_integer(port)
      true -> port
    end
  end

  # Checks that every key in `params` is present in `configuration`. Returns
  # `:ok`, or `{:error, :missing_params, missing}` listing the absent keys in
  # the order they were requested.
  defp check_mandatory_params(configuration, params) do
    case Enum.reject(params, fn param -> Keyword.has_key?(configuration, param) end) do
      [] -> :ok
      missing_params -> {:error, :missing_params, missing_params}
    end
  end
end