lib/step_flow/amqp/common_consumer.ex

defmodule StepFlow.Amqp.CommonConsumer do
  @moduledoc """
  Definition of a Common Consumer of RabbitMQ queue.

  To implement a consumer,

  ```elixir
  defmodule MyModule do
    use StepFlow.Amqp.CommonConsumer, %{
      queue: "name_of_the_rabbit_mq_queue",
      exchange "name_of_exchange",
      consumer: &MyModule.consume/4
    }

    def consume(channel, tag, redelivered, payload) do
      ...
      Basic.ack(channel, tag)
    end
  end
  ```

  """

  require Logger

  alias StepFlow.Amqp.CommonConsumer
  alias StepFlow.Amqp.Helpers

  @default_channel_status_interval 10_000

  @doc false
  defmacro __using__(opts) do
    quote do
      use GenServer
      use AMQP

      alias StepFlow.Amqp.CommonEmitter
      alias StepFlow.Amqp.Helpers

      def child_spec(_) do
        %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, []},
          type: :worker
        }
      end

      @doc false
      def start_link do
        GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
      end

      @impl true
      @doc false
      def init(:ok) do
        result = rabbitmq_connect()

        Process.send_after(self(), :check, CommonConsumer.get_channel_check_interval())

        result
      end

      def disconnect do
        GenServer.call(__MODULE__, :disconnect)
      end

      @impl true
      def handle_call(:disconnect, _from, state) do
        state = CommonConsumer.rabbitmq_disconnect(state)

        {:reply, :ok, state}
      end

      @impl true
      # Confirmation sent by broker after registering this process as consumer
      def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, state) do
        {:noreply, state}
      end

      @impl true
      # Sent by broker when consumer is unexpectedly cancelled (such as after queue deletion)
      def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, state) do
        {:stop, :normal, state}
      end

      @impl true
      # Confirmation sent by broker to consumer process after a Basic.cancel
      def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, state) do
        {:noreply, state}
      end

      @impl true
      def handle_info(
            {:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered} = headers},
            state
          ) do
        queue = unquote(opts).queue

        Logger.info("#{__MODULE__}: receive message on queue: #{queue}")

        max_retry_to_timeout =
          StepFlow.Configuration.get_var_value(StepFlow.Amqp, :max_retry_to_timeout, 10)

        Logger.debug("#{__MODULE__} #{inspect(headers)}")

        max_retry_reached =
          with headers when headers != :undefined <- Map.get(headers, :headers),
               {"x-death", :array, death} <- List.keyfind(headers, "x-death", 0),
               {:table, table} <- List.first(death),
               {"count", :long, count} <- List.keyfind(table, "count", 0) do
            count > max_retry_to_timeout
          else
            _ -> false
          end

        if max_retry_reached do
          Logger.info("#{__MODULE__}: timeout message sent to queue: #{queue}_timeout")
          CommonEmitter.publish(queue <> "_timeout", payload, [], "job_response")
          AMQP.Basic.ack(state.channel, tag)
        else
          # Check if payload is json
          case payload |> Jason.decode() do
            {:ok, data} ->
              unquote(opts).consumer.(state.channel, tag, redelivered, data)

            {:error, _} ->
              Logger.error("#{__MODULE__}: Payload is not a json: #{payload}")

              AMQP.Basic.reject(state.channel, tag, requeue: false)
          end
        end

        {:noreply, state}
      end

      @impl true
      def handle_info(:check, state) do
        Logger.debug(
          "[#{__MODULE__}] Check AMQP channel status. Current channel: #{inspect(state.channel)}"
        )

        {:ok, state} =
          if Process.alive?(state.channel.pid) do
            {:ok, state}
          else
            state = CommonConsumer.rabbitmq_disconnect(state)
            rabbitmq_connect()
          end

        Process.send_after(self(), :check, CommonConsumer.get_channel_check_interval())

        {:noreply, state}
      end

      @impl true
      def handle_info({:DOWN, _, :process, pid, reason}, state) do
        {:ok, state} =
          if state.auto_restart do
            rabbitmq_connect()
          else
            {:ok, state}
          end

        {:noreply, state}
      end

      defp rabbitmq_connect do
        url = Helpers.get_amqp_connection_url()
        options = Helpers.get_amqp_connection_options()

        queue = unquote(opts).queue
        exchange_name = unquote(opts).exchange
        prefetch_count = Map.get(unquote(opts), :prefetch_count)

        CommonConsumer.rabbitmq_connect(url, options, queue, exchange_name, prefetch_count)
      end

      @impl true
      def terminate(_reason, state) do
        CommonConsumer.rabbitmq_disconnect(state)
      end
    end
  end

  def rabbitmq_connect(url, options, queue, exchange, prefetch_count) do
    case AMQP.Connection.open(url, options) do
      {:ok, connection} ->
        init_amqp_connection(connection, queue, exchange, prefetch_count)

      {:error, message} ->
        Logger.error("#{__MODULE__}: unable to connect to: #{url}, reason: #{inspect(message)}")

        # Reconnection loop
        :timer.sleep(10_000)
        rabbitmq_connect(url, options, queue, exchange, prefetch_count)
    end
  end

  def init_amqp_connection(connection, queue, exchange_name, prefetch_count \\ nil) do
    Process.monitor(connection.pid)

    {:ok, channel} = AMQP.Channel.open(connection)
    Logger.info("[#{__MODULE__}] init_amqp_connection: opened channel=#{inspect(channel)}")

    if prefetch_count != nil do
      :ok = AMQP.Basic.qos(channel, prefetch_count: prefetch_count)
    end

    CommonConsumer.create_queues(channel, queue)

    Logger.info("#{__MODULE__}: bind #{queue}")
    AMQP.Queue.bind(channel, queue, exchange_name, routing_key: queue)

    Logger.info("#{__MODULE__}: connected to queue #{queue}")

    {:ok, _consumer_tag} = AMQP.Basic.consume(channel, queue)
    {:ok, %{channel: channel, connection: connection, auto_restart: true}}
  end

  def rabbitmq_disconnect(state) do
    Logger.info("#{__MODULE__}: Closing AMQP connection...")
    AMQP.Connection.close(state.connection)

    state
    |> Map.replace(:auto_restart, false)
  end

  def create_queues(channel, queue) do
    queue_type =
      case Helpers.get_amqp_server_configuration() do
        "cluster" -> "quorum"
        _ -> "classic"
      end

    AMQP.Queue.declare(channel, "job_response_not_found",
      durable: true,
      arguments: [{"x-queue-type", :longstr, queue_type}]
    )

    AMQP.Queue.declare(channel, queue <> "_timeout",
      durable: true,
      arguments: [{"x-queue-type", :longstr, queue_type}]
    )

    AMQP.Exchange.topic(channel, "job_response",
      durable: true,
      arguments: [{"alternate-exchange", :longstr, "job_response_not_found"}]
    )

    AMQP.Exchange.topic(channel, "worker_response",
      durable: true,
      arguments: [{"alternate-exchange", :longstr, "worker_response_not_found"}]
    )

    AMQP.Queue.declare(channel, "direct_messaging_not_found",
      durable: true,
      arguments: [{"x-queue-type", :longstr, queue_type}]
    )

    AMQP.Queue.declare(channel, queue <> "_timeout",
      durable: true,
      arguments: [{"x-queue-type", :longstr, queue_type}]
    )

    AMQP.Exchange.declare(channel, "direct_messaging", :headers,
      durable: true,
      arguments: [{"alternate-exchange", :longstr, "direct_messaging_not_found"}]
    )

    AMQP.Exchange.fanout(channel, "job_response_delayed",
      durable: true,
      arguments: [{"x-queue-type", :longstr, queue_type}]
    )

    {:ok, _job_response_delayed_queue} =
      AMQP.Queue.declare(channel, "job_response_delayed",
        durable: true,
        arguments: [
          {"x-message-ttl", :short, 5000},
          {"x-dead-letter-exchange", :longstr, ""},
          {"x-queue-type", :longstr, queue_type}
        ]
      )

    AMQP.Queue.bind(channel, "job_response_delayed", "job_response_delayed", routing_key: "*")

    AMQP.Queue.declare(channel, queue,
      durable: true,
      arguments: [
        {"x-dead-letter-exchange", :longstr, "job_response_delayed"},
        {"x-dead-letter-routing-key", :longstr, queue},
        {"x-queue-type", :longstr, queue_type}
      ]
    )
  end

  def get_channel_check_interval do
    Application.get_env(:step_flow, StepFlow.Workers,
      amqp_channel_status_interval: @default_channel_status_interval
    )
    |> Keyword.get(:amqp_channel_status_interval, @default_channel_status_interval)
  end
end