lib/publisher.ex

defmodule HareMq.Publisher do
  defmodule Behaviour do
    @callback publish_message(map() | binary()) :: :ok | {:error, reason :: :blocked | :closing}
  end

  @moduledoc """
  GenServer module implementing a RabbitMQ message publisher.

  This module provides a behavior for publishing messages to RabbitMQ, including connecting to RabbitMQ, declaring exchanges, and sending messages.
  """

  defmacro __using__(options) do
    quote location: :keep, generated: true do
      require Logger
      @opts unquote(options)
      @reconnect_interval Application.compile_env(:hare_mq, :configuration)[
                            :reconnect_interval_in_ms
                          ] || 10_000
      @behaviour HareMq.Publisher.Behaviour
      @before_compile unquote(__MODULE__)

      if(is_nil(@opts[:routing_key])) do
        raise "routing_key can not be empty"
      end

      @config [
        routing_key: @opts[:routing_key],
        exchange: @opts[:exchange]
      ]

      def child_spec(opts) do
        default = %{
          id: __MODULE__,
          start: {__MODULE__, :start_link, [opts]},
          restart: :permanent,
          type: :worker
        }

        Supervisor.child_spec(default, [])
      end

      def start_link(opts \\ []) do
        GenServer.start_link(__MODULE__, opts, name: __MODULE__)
      end

      def init(_) do
        Process.flag(:trap_exit, true)
        send(self(), :connect)
        {:ok, nil}
      end

      def get_channel do
        case GenServer.call(__MODULE__, :get_channel) do
          nil -> {:error, :not_connected}
          %AMQP.Channel{} = conn -> {:ok, conn}
        end
      end

      def handle_call(:get_channel, _, %AMQP.Channel{} = conn) do
        {:reply, conn, conn}
      end

      def handle_call(:get_channel, _, state) do
        {:reply, state, state}
      end

      def handle_info(:connect, state) do
        case HareMq.Connection.get_connection() do
          {:ok, conn} ->
            # Get notifications when the connection goes down
            Process.monitor(conn.pid)

            case AMQP.Channel.open(conn) do
              {:ok, chan} ->
                Process.monitor(chan.pid)
                {:noreply, chan}

              _ ->
                Logger.error("Faile to open channel!")
                Process.send_after(self(), :send, @reconnect_interval)
                {:noreply, state}
            end

          {:error, _} ->
            Logger.error("Failed to connect. Reconnecting later...")
            # Retry later
            Process.send_after(self(), :connect, @reconnect_interval)
            {:noreply, state}
        end
      end
    end
  end

  defmacro __before_compile__(_env) do
    quote location: :keep, generated: true do
      def publish_message(message) when is_map(message) do
        case get_channel() do
          {:error, :not_connected} ->
            {:error, :not_connected}

          {:ok, channel} ->
            encoded_message = Jason.encode!(message)

            AMQP.Basic.publish(
              channel,
              @config[:exchange],
              @config[:routing_key],
              encoded_message,
              persistent: true
            )
        end
      end

      @doc """
      Func for publishing messages.

      ## Examples

          defmodule MyPublisher do
            use HareMq.Publisher, exchange: "my_exchange", routing_key: "my_routing_key"

            def publish() do
              message = %{key: "value"}
              HareMq.Publisher.publish_message(message)
            end
          end
      """
      def publish_message(message) when is_binary(message) do
        case get_channel() do
          {:error, :not_connected} ->
            {:error, :not_connected}

          {:ok, channel} ->
            AMQP.Basic.publish(
              channel,
              @config[:exchange],
              @config[:routing_key],
              message,
              persistent: true
            )
        end
      end
    end
  end
end