lib/consumer.ex

defmodule HareMq.Consumer do
  defmodule Behaviour do
    @callback consume(map() | binary()) :: :ok | {:ok, any()} | :error | {:error, any()}
  end

  use AMQP

  @moduledoc """
  GenServer module implementing a RabbitMQ consumer.

  This module provides a behavior for RabbitMQ message consumption, including connecting to RabbitMQ, declaring queues, and handling incoming messages.
  """

  defmacro __using__(options) do
    quote location: :keep, generated: true do
      require Logger
      use GenServer

      @reconnect_interval Application.compile_env(:hare_mq, :configuration)[
                            :reconnect_interval_in_ms
                          ] || 10_000
      @opts unquote(options)
      @behaviour HareMq.Consumer.Behaviour
      @before_compile unquote(__MODULE__)

      if(is_nil(@opts[:queue_name])) do
        raise "queue_name can not be empty"
      end

      @config [
        queue_name: @opts[:queue_name],
        routing_key: @opts[:routing_key] || @opts[:queue_name],
        exchange: @opts[:exchange],
        prefetch_count: @opts[:prefetch_count] || 1
      ]

      def start_link(opts \\ []) do
        GenServer.start_link(__MODULE__, opts, name: __MODULE__)
      end

      def init(_) do
        Process.flag(:trap_exit, true)

        send(self(), :connect)
        {:ok, %HareMq.Configuration{}}
      end

      def declare_queues(channel, config) do
        :ok = HareMq.Exchange.declare(channel: config.channel, name: config.exchange)
        {:ok, _} = HareMq.Queue.declare_queue(config)
        {:ok, _} = HareMq.Queue.declare_delay_queue(config)
        {:ok, _} = HareMq.Queue.declare_dead_queue(config)
        :ok = HareMq.Queue.bind(config)
      end

      def handle_call(:get_channel, _, state) do
        {:reply, state, state}
      end

      def handle_call(:get_channel, _, state) do
        {:reply, state, state}
      end

      def handle_info(:connect, state) do
        case HareMq.Connection.get_connection() do
          {:ok, conn} ->
            # Get notifications when the connection goes down
            Process.monitor(conn.pid)

            case Channel.open(conn) do
              {:ok, chan} ->
                Process.monitor(chan.pid)

                config =
                  HareMq.Configuration.get_queue_configuration(
                    channel: chan,
                    name: @config[:queue_name],
                    exchange: @config[:exchange],
                    routing_key: @config[:routing_key]
                  )

                Basic.qos(chan, prefetch_count: @config[:prefetch_count])

                declare_queues(chan, config)

                {:ok, _consumer_tag} = Basic.consume(chan, @config[:queue_name])

                {:noreply, config}

              _ ->
                Logger.error("Faile to open channel!")
                Process.send_after(self(), :send, @reconnect_interval)
                {:noreply, state}
            end

          {:error, _} ->
            Logger.error("Failed to connect. Reconnecting later...")
            # Retry later
            Process.send_after(self(), :connect, @reconnect_interval)
            {:noreply, state}
        end
      end
    end
  end

  defmacro __before_compile__(_env) do
    quote location: :keep, generated: true do
      @doc """
      Callback for processing incoming messages.

      Implement this callback in your consumer module to define how to handle incoming messages.

      ## Examples

          defmodule MyConsumer do
            use HareMq.Consumer, queue_name: "my_queue", routing_key: "my_routing_key"

            def consume(message) do
              IO.puts("Received message: \#{inspect(message)}")
              :ok
            end
          end
      """
      def consume(message) do
        raise "Implement me"
      end

      defoverridable(consume: 1)

      def get_channel do
        case GenServer.call(__MODULE__, :get_channel) do
          nil -> {:error, :not_connected}
          state -> {:ok, state}
        end
      end

      def handle_info({:basic_consume_ok, %{consumer_tag: _consumer_tag}}, state) do
        {:noreply, state}
      end

      def handle_info({:basic_cancel, %{consumer_tag: _consumer_tag}}, state) do
        Logger.warn(
          "Sent by the broker when the consumer is unexpectedly cancelled (such as after a queue deletion)"
        )

        {:stop, :normal, state}
      end

      def handle_info({:basic_cancel_ok, %{consumer_tag: _consumer_tag}}, state) do
        {:noreply, state}
      end

      def handle_info(
            {:basic_deliver, payload, %{delivery_tag: tag, redelivered: redelivered} = metadata},
            state
          ) do
        try do
          message =
            case Jason.decode(payload) do
              {:ok, encoded} -> encoded
              {:error, _} -> payload
            end

          consume(message)
          |> process_result(payload, state, tag, metadata)
        rescue
          reason ->
            Logger.error(inspect(reason))
            retry(payload, state, tag, metadata)
        end

        {:noreply, state}
      end

      defp process_result(result, payload, state, tag, metadata) do
        case result do
          :ok -> Basic.ack(state.channel, tag)
          {:ok, _} -> Basic.ack(state.channel, tag)
          :error -> retry(payload, state, tag, metadata)
          {:error, _} -> retry(payload, state, tag, metadata)
        end
      end

      defp retry(payload, state, tag, metadata) do
        Basic.nack(state.channel, tag, multiple: false, requeue: false)

        Task.start(fn -> HareMq.RetryPublisher.republish(payload, state, metadata) end)
      end

      def republish_dead_messages(count \\ 1) when is_number(count) do
        case get_channel() do
          {:error, :not_connected} ->
            {:error, :not_connected}

          {:ok, configures} ->
            HareMq.RetryPublisher.republish_dead_messages(configures, count)
        end
      end

      def terminate(_reason, state) do
        Logger.error("worker #{__MODULE__} was terminated with state #{inspect(state)}")
        close_chan(state)
      end

      def handle_info({:DOWN, _, :process, _pid, reason}, state) do
        Logger.error("worker #{__MODULE__} was DOWN")
        close_chan(state)
        {:stop, {:connection_lost, reason}, nil}
      end

      def handle_info({:EXIT, _pid, reason}, state) do
        close_chan(state)
        {:noreply, state}
      end

      defp close_chan(state) do
        try do
          AMQP.Channel.close(state.channel)
        rescue
          _ -> :ok
        end
      end
    end
  end
end