lib/retry_publisher.ex

defmodule HareMq.RetryPublisher do
  require Logger
  alias HareMq.Configuration

  @moduledoc """
  Module providing functions for republishing messages with retry handling.

  This module includes functions for determining retry count, republishing to delay queues, and handling retries.
  """

  @doc """
  Determine the retry count from message headers.

  ## Parameters

  - `headers`: Message headers.

  ## Examples

      retry_count = HareMq.RetryPublisher.retry_count(headers)
  """
  def retry_count(:undefined), do: 0
  def retry_count([]), do: 0
  def retry_count([{"retry_count", :long, count} | _tail]), do: count
  def retry_count([_ | tail]), do: retry_count(tail)

  @doc """
  Republish a message to either a delay or dead letter queue based on the retry count.

  ## Parameters

  - `payload`: The message payload.
  - `configuration`: A `%Configuration{}` struct containing queue configuration.
  - `metadata`: Message metadata.

  ## Examples

      HareMq.RetryPublisher.republish(payload, configuration, metadata)
  """
  def republish(payload, %Configuration{} = configuration, %{headers: headers}) do
    retry_count = retry_count(headers)

    retry_options = [
      persistent: true,
      headers: [retry_count: retry_count + 1]
    ]

    if(retry_count < configuration.retry_limit) do
      Logger.debug("Sending message to a delay queue")

      AMQP.Basic.publish(
        configuration.channel,
        "",
        configuration.delay_queue_name,
        payload,
        retry_options
      )
    else
      Logger.debug("Sending message to a dead messages queue")

      AMQP.Basic.publish(
        configuration.channel,
        "",
        configuration.dead_queue_name,
        payload,
        retry_options
      )
    end
  end

  @moduledoc """
  Module providing functions for republishing dead messages with retry handling.

  This module includes functions for republishing messages from a dead letter queue to the original exchange.
  """

  @doc """
  Republish a specified number of dead messages from the dead letter queue.

  ## Parameters

  - `configuration`: A `%Configuration{}` struct containing queue configuration.
  - `count`: The number of dead messages to republish.

  ## Examples

      HareMq.RetryPublisher.republish_dead_messages(configuration, 5)
  """
  def republish_dead_messages(%Configuration{} = configuration, count) do
    0..(count - 1)
    |> Enum.each(fn _ ->
      case AMQP.Basic.get(configuration.channel, configuration.dead_queue_name) do
        {:empty, _} ->
          Logger.debug("Queue is empty")

        {:ok, message, %{delivery_tag: tag}} ->
          :ok =
            AMQP.Basic.publish(
              configuration.channel,
              configuration.exchange,
              configuration.routing_key,
              message,
              persistent: true
            )

          :ok = AMQP.Basic.ack(configuration.channel, tag)
      end
    end)
  end
end