lib/amqp/gen/connection_manager.ex

defmodule Amqpx.Gen.ConnectionManager do
  @moduledoc """
  Handle connection for all consumers and producer
  """
  require Logger
  use GenServer
  alias Amqpx.Connection

  defstruct [
    :connection_params,
    backoff: 5_000,
    connection: nil
  ]

  @type state() :: %__MODULE__{}

  def start_link(%{connection_params: connection_params} = opts) do
    GenServer.start_link(__MODULE__, opts, name: Keyword.get(connection_params, :name, Amqpx.Gen.ConnectionManager))
  end

  def init(opts) do
    state = struct(__MODULE__, opts)
    Process.send(self(), :setup, [])
    {:ok, state}
  end

  def handle_call(:get_connection, _from, %{connection: connection} = state) do
    {:reply, connection, state}
  end

  def handle_info(:setup, %{backoff: backoff, connection_params: connection_params} = state) do
    case connect(connection_params) do
      {:ok, connection} ->
        state = %{state | connection: connection}
        {:noreply, state}

      error ->
        Logger.error("Unable to connect to Broker! Retrying with #{backoff}ms backoff",
          error: inspect(error)
        )

        Process.send_after(self(), {:shutdown, error}, backoff, [])
        {:noreply, state}
    end
  end

  def handle_info({:shutdown, reason}, state) do
    {:stop, reason, state}
  end

  def handle_info({:DOWN, _, :process, _pid, reason}, state) do
    Logger.error("Monitored connection process crashed: #{inspect(reason)}")
    state = %{state | connection: nil}
    {:stop, :connection_exited, state}
  end

  def handle_info({:EXIT, _pid, :normal}, state), do: {:stop, :EXIT, state}

  def handle_info(message, state) do
    Logger.warn("Unknown message received #{inspect(message)}")
    {:noreply, state}
  end

  def terminate(_, %__MODULE__{connection: nil}), do: nil

  def terminate(_, %__MODULE__{connection: connection}) do
    if Process.alive?(connection.pid) do
      Process.exit(connection.pid, :kill)
    end
  end

  @spec connect(map) :: {:ok, Connection.t()} | {:error, any}
  defp connect(connection_params) do
    with {:ok, connection} <- Connection.open(connection_params) do
      Process.monitor(connection.pid)

      {:ok, connection}
    end
  catch
    _, reason ->
      {:error, reason}
  end
end