lib/ex_rabbitmq_admin/queue.ex

defmodule ExRabbitMQAdmin.Queue do
  @moduledoc """
  This module contains functions for interacting with RabbitMQ queues.
  """
  import ExRabbitMQAdmin.Options,
    only: [
      pagination_definition: 0,
      put_queue_definition: 0,
      delete_queue_definition: 0,
      receive_messages_definition: 0,
      format_error: 1
    ]

  @api_namespace "/api/queues"

  @doc """
  List all queues running on the RabbitMQ cluster.

  ### Params

    * `client` - Tesla client used to perform the request.
    #{NimbleOptions.docs(pagination_definition())}
  """
  @spec list_queues(client :: Tesla.Client.t(), opts :: Keyword.t()) :: {:ok, Tesla.Env.t()}
  def list_queues(client, opts \\ []) do
    case NimbleOptions.validate(opts, pagination_definition()) do
      {:ok, params} ->
        client
        |> ExRabbitMQAdmin.add_query_middleware(params)
        |> Tesla.get(@api_namespace)

      {:error, error} ->
        raise ArgumentError, format_error(error)
    end
  end

  @doc """
  List all queues in a given virtual host running on the RabbitMQ cluster.

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - Name of the virtual host.
    #{NimbleOptions.docs(pagination_definition())}
  """
  @spec list_vhost_queues(client :: Tesla.Client.t(), vhost :: String.t(), opts :: Keyword.t()) ::
          {:ok, Tesla.Env.t()}
  def list_vhost_queues(client, vhost, opts \\ []) do
    case NimbleOptions.validate(opts, pagination_definition()) do
      {:ok, params} ->
        client
        |> ExRabbitMQAdmin.add_query_middleware(params)
        |> Tesla.get("#{@api_namespace}/#{vhost}")

      {:error, error} ->
        raise ArgumentError, format_error(error)
    end
  end

  @doc """
  List all bindings for a queue under a virtual host.

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - type: `string`, Virtual host for the queue.
    * `queue` - type: `string`, Name of the queue to get bindings for.
  """
  @spec list_queue_bindings(
          client :: Tesla.Client.t(),
          vhost :: String.t(),
          queue :: String.t()
        ) :: {:ok, Tesla.Env.t()}
  def list_queue_bindings(client, vhost, queue),
    do: Tesla.get(client, "#{@api_namespace}/#{vhost}/#{queue}/bindings")

  @doc """
  Get an individual queue under a virtual host by name.

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - type: `string`, Virtual host for the queue.
    * `queue` - type: `string`, Name of the queue to get.
  """
  @spec get_queue(client :: Tesla.Client.t(), vhost :: String.t(), queue :: String.t()) ::
          {:ok, Tesla.Env.t()}
  def get_queue(client, vhost, queue),
    do: Tesla.get(client, "#{@api_namespace}/#{vhost}/#{queue}")

  @doc """
  Create a new queue under a virtual host.

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - type: `string`, Virtual host for the queue.
    * `queue` - type: `string`, Name of the queue to get.
    #{NimbleOptions.docs(put_queue_definition())}
  """
  @spec put_queue(
          client :: Tesla.Client.t(),
          vhost :: String.t(),
          queue :: String.t(),
          opts :: Keyword.t()
        ) :: {:ok, Tesla.Env.t()}
  def put_queue(client, vhost, queue, opts \\ []) do
    case NimbleOptions.validate(opts, put_queue_definition()) do
      {:ok, params} ->
        client
        |> Tesla.put("#{@api_namespace}/#{vhost}/#{queue}", Enum.into(params, %{}))

      {:error, error} ->
        raise ArgumentError, format_error(error)
    end
  end

  @doc """
  Delete an existing queue under a virtual host.

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - type: `string`, Virtual host for the queue.
    * `queue` - type: `string`, Name of the queue to delete.
    #{NimbleOptions.docs(delete_queue_definition())}
  """
  @spec delete_queue(
          client :: Tesla.Client.t(),
          vhost :: String.t(),
          queue :: String.t(),
          params :: Keyword.t()
        ) :: {:ok, Tesla.Env.t()}
  def delete_queue(client, vhost, queue, opts \\ []) do
    case NimbleOptions.validate(opts, delete_queue_definition()) do
      {:ok, params} ->
        params =
          Enum.reduce(params, [], fn
            {:if_empty, true}, acc -> Keyword.put(acc, :"if-empty", true)
            {:if_unused, true}, acc -> Keyword.put(acc, :"if-unused", true)
            _, acc -> acc
          end)

        client
        |> ExRabbitMQAdmin.add_query_middleware(params)
        |> Tesla.delete("#{@api_namespace}/#{vhost}/#{queue}")

      {:error, error} ->
        raise ArgumentError, format_error(error)
    end
  end

  @doc """
  Purge all messages on a queue under a virtual host.

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - type: `string`, Virtual host for the queue.
    * `queue` - type: `string`, Name of the queue to purge messages from.
  """
  @spec purge_queue(
          client :: Tesla.Client.t(),
          vhost :: String.t(),
          queue :: String.t()
        ) :: {:ok, Tesla.Env.t()}
  def purge_queue(client, vhost, queue),
    do: Tesla.delete(client, "#{@api_namespace}/#{vhost}/#{queue}/contents")

  @doc """
  Perform an action on a queue.

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - type: `string`, Virtual host for the queue.
    * `queue` - type: `string`, Name of the queue to perform actions for.
    * `action` - type: `atom`, Action to perform. Currently only supports `:sync` and `:cancel_sync`
  """
  @spec perform_queue_action(
          client :: Tesla.Client.t(),
          vhost :: String.t(),
          queue :: String.t(),
          action :: :sync | :cancel_sync
        ) :: {:ok, Tesla.Env.t()}
  def perform_queue_action(client, vhost, queue, :sync) do
    Tesla.post(client, "#{@api_namespace}/#{vhost}/#{queue}/actions", %{"action" => "sync"})
  end

  def perform_queue_action(client, vhost, queue, :cancel_sync) do
    Tesla.post(client, "#{@api_namespace}/#{vhost}/#{queue}/actions", %{
      "action" => "cancel_sync"
    })
  end

  @doc """
  Receive messages from a queue under a virtual host.
  Please not that this is *not* an optimal way to consume messages
  from a queue. Consider using a library like [AMQP](https://hexdocs.pm/amqp/readme.html).

  ### Params

    * `client` - Tesla client used to perform the request.
    * `vhost` - type: `string`, Virtual host for the queue.
    * `queue` - type: `string`, Name of the queue to receive messages from.
    #{NimbleOptions.docs(receive_messages_definition())}
  """
  @spec receive_queue_messages(
          client :: Tesla.Client.t(),
          vhost :: String.t(),
          queue :: String.t(),
          opts :: Keyword.t()
        ) :: {:ok, Tesla.Env.t()}
  def receive_queue_messages(client, vhost, queue, opts \\ []) do
    case NimbleOptions.validate(opts, receive_messages_definition()) do
      {:ok, params} ->
        client
        |> Tesla.post("#{@api_namespace}/#{vhost}/#{queue}/get", Enum.into(params, %{}))

      {:error, error} ->
        raise ArgumentError, format_error(error)
    end
  end
end