README.md

# rabbit_mq_ex

A better RabbitMQ client.

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `rabbit_mq_ex` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:rabbit_mq_ex, "~> 0.1.0"}
  ]
end
```

## Usage

### Producers

In order to publish messages onto an exchange, let's first create a simple Producer.

```elixir
defmodule Bookings.Producers.AirlineRequestProducer do
  alias MQ.Producer

  use Producer, exchange: "airline_request"

  @valid_airline_codes ~w(ba qr)a

  def place_booking(airline_code, %{date_time: _, flight_number: _} = params, opts)
      when airline_code in @valid_airline_codes and is_list(opts) do
    airline = airline(airline_code)
    payload = payload(params)
    opts = opts |> Keyword.put(:routing_key, "#{airline}.place_booking")

    publish(payload, opts)
  end

  def cancel_booking(airline_code, %{booking_id: _} = params, opts)
      when airline_code in @valid_airline_codes and is_list(opts) do
    airline = airline(airline_code)
    payload = payload(params)
    opts = opts |> Keyword.put(:routing_key, "#{airline}.cancel_booking")

    publish(payload, opts)
  end

  defp payload(%{date_time: _, flight_number: _} = params),
    do: params |> Map.take([:date_time, :flight_number]) |> Jason.encode!()

  defp payload(%{booking_id: _} = params),
    do: params |> Map.take([:booking_id]) |> Jason.encode!()

  defp airline(:ba), do: "british_airways"
  defp airline(:qr), do: "qatar_airways"
end

```

In this specific example, we will publish messages onto the `airline_request` exchange, which we are just about to configure and declare in the section below.

### Topology

To set up the exchange and the associated bindings, we will create a `Topology` module that all our services will use to interact with RabbitMQ.

```elixir
defmodule Bookings.Topology do
  alias MQ.Topology

  @exchanges ~w(airline_request)

  @behaviour Topology

  def gen do
    @exchanges |> Enum.map(&exchange/1)
  end

  defp exchange("airline_request" = exchange) do
    {exchange,
     type: :topic,
     durable: true,
     routing_keys: [
       {"*.place_booking",
        queue: "#{exchange}_queue/*.place_booking/bookings_app",
        durable: true,
        dlq: "#{exchange}_dead_letter_queue"},
       {"*.cancel_booking",
        queue: "#{exchange}_queue/*.cancel_booking/bookings_app",
        durable: true,
        dlq: "#{exchange}_dead_letter_queue"}
     ]}
  end
end

```

We will use this to ensure our RabbitMQ setup is consistent across services and all exchanges, queues and bindings are correctly configured before we start our services.

As shown in the example above, we will declare 3 queues:

1) `airline_request_queue/*.place_booking/bookings_app`; used to Consume and process messages associated with _placing_ a booking with a specific airline
2) `airline_request_queue/*.cancel_booking/bookings_app`; used to Consume and process messages associated with _cancelling_ a booking with a specific airline
3) `airline_request_dead_letter_queue`; messages that cannot be delivered or processed will end up here

Please note that the strategy for naming queues is largely dependent on your use case. In the above example, we base it on the following:

`#{exchange_name}_queue/#{routing_key}/#{consuming_app_name}`

### Application configuration

Now that we have our topology defined, let's configure the `:rabbit_mq_ex` application environment to make use of it.

```elixir
use Mix.Config

config :rabbit_mq_ex, :config,
  amqp_url: "amqp://guest:guest@localhost:5672",
  topology: Bookings.Topology

```

This configuration will be used as follows:

* `:amqp_url` by the `MQ.ConnectionManager` module to connect to the broker
* `:topology` by the `mix rabbit.init` script to set up the exchanges, queues, and bindings

### Consumers and message processing

To consume and process messages from the queues above, we will need to create message processors.

```elixir
defmodule Bookings.MessageProcessors.PlaceBookingMessageProcessor do
  require Logger

  @date_format "{WDfull}, {0D} {Mfull} {YYYY}"

  def process_message(payload, _meta) do
    with {:ok, %{"date_time" => date_time_iso, "flight_number" => flight_number}} <-
           Jason.decode(payload),
         {:ok, date_time, _} <- DateTime.from_iso8601(date_time_iso),
         {:ok, formatted_date} <- Timex.format(date_time, @date_format) do
      Logger.info("Attempting to book #{flight_number} for #{formatted_date}.")
      :ok
    end
  end
end

```

```elixir
defmodule Bookings.MessageProcessors.CancelBookingMessageProcessor do
  require Logger

  def process_message(payload, _meta) do
    with {:ok, %{"booking_id" => booking_id}} <- Jason.decode(payload) do
      Logger.info("Attempting to cancel booking #{booking_id}.")
      :ok
    end
  end
end

```

### Putting it all together

Before we put our producers and consumers to work, we need to make sure that the topology is reflected on the RabbitMQ broker we will use with our application. To do this, we will run

```bash
mix rabbit.init

```

You should see the following in the console:

```bash
14:40:34.717 [debug] Declared airline_request_queue/*.place_booking/bookings_app queue: %{args: [{"x-dead-letter-exchange", :longstr, ""}, {"x-dead-letter-routing-key", :longstr, "airline_request_dead_letter_queue"}], durable: true, exchange: "airline_request", exclusive: false, queue: "airline_request_queue/*.place_booking/bookings_app", routing_key: "*.place_booking"}

14:40:34.721 [debug] Declared airline_request_queue/*.cancel_booking/bookings_app queue: %{args: [{"x-dead-letter-exchange", :longstr, ""}, {"x-dead-letter-routing-key", :longstr, "airline_request_dead_letter_queue"}], durable: true, exchange: "airline_request", exclusive: false, queue: "airline_request_queue/*.cancel_booking/bookings_app", routing_key: "*.cancel_booking"}

```

Now, let's create our Application.

```elixir
defmodule Bookings.Application do
  alias MQ.Supervisor, as: MQSupervisor

  alias Bookings.Producers.AirlineRequestProducer

  alias Bookings.MessageProcessors.{
    PlaceBookingMessageProcessor,
    CancelBookingMessageProcessor
  }

  use Application

  def start(_type, _args) do
    opts = [
      consumers: [
        {PlaceBookingMessageProcessor,
         queue: "airline_request_queue/*.place_booking/bookings_app"},
        {CancelBookingMessageProcessor,
         queue: "airline_request_queue/*.cancel_booking/bookings_app"}
      ],
      producers: [
        AirlineRequestProducer
      ]
    ]

    children = [
      {MQSupervisor, opts}
      # ... add more children here
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

```

In `mix.exs`:

```elixir
  def application do
    [
      mod: {Bookings.Application, []}
    ]
  end
```

Now, let's verify our producers and consumers work as expected. Run `iex -S mix`, then:

To place a booking:

```elixir
iex(1)> Bookings.Producers.AirlineRequestProducer.place_booking(:qr, %{date_time: DateTime.utc_now() |> DateTime.to_iso8601(), flight_number: "QR007"}, [])
:ok
iex(2)>

[info] Attempting to book QR007 for Friday, 01 November 2019.
```

To cancel a booking:

```elixir
iex(1)> Bookings.Producers.AirlineRequestProducer.cancel_booking(:qr, %{booking_id: "baf4dfde-50b1-4d55-9c76-44eae1159325"}, [])
:ok
iex(2)>

[info] Attempting to cancel booking baf4dfde-50b1-4d55-9c76-44eae1159325.
```

## Testing

In `config/test.exs`:

```elixir
use Mix.Config

config :rabbit_mq_ex, :config,
  amqp_url: "amqp://guest:guest@localhost:5672",
  topology: Bookings.Topology

```

Then in `test/test_helper.exs`:

```elixir
:ok = MQTest.Support.TestConsumerRegistry.init()
ExUnit.start()
```

### Testing producers

To test our airline request producer, we will try to assert the following:

1) `place_booking/3` supports `qatar_airways` and `british_airways` and sets up the corresponding routing keys, e.g. `qatar_airways.place_booking` and `british_airways.place_booking` upon publish
2) `place_booking/3` adds default metadata to the request, expressly `correlation_id` and `timestamp`
3) `place_booking/3` is capable of setting any metadata (apart from `routing_key` which is configured in its implementation and cannot be overriden)

Later we will, of course, try to assert the same with `cancel_booking/3`.

```elixir
defmodule BookingsTest.Producers.AirlineRequestProducer do
  alias MQ.ConnectionManager
  alias MQTest.Support.{RabbitCase, ExclusiveQueue, TestConsumer}
  alias Bookings.Producers.AirlineRequestProducer

  use RabbitCase

  setup_all do
    assert {:ok, _pid} = start_supervised(AirlineRequestProducer.child_spec())

    # Make sure our tests receive all messages published to the `airline_request`
    # exchange, regardless of the `routing_key` configured (hence `#`).
    assert {:ok, airline_request_queue} =
             ExclusiveQueue.declare(exchange: "airline_request", routing_key: "#")

    # Start the `TestConsumer` module, which consumes messages from a given queue
    # and sends them to a process associated with a test that's being executed.
    #
    # See `TestConsumer.register_reply_to(self())` in the `setup` section below.
    assert {:ok, _pid} = start_supervised(TestConsumer.child_spec(queue: airline_request_queue))

    :ok
  end

  setup do
    # Each test process will register its pid (`self()`) so that we can receive
    # corresponding payloads and metadata published via the `Producer`(s).
    assert {:ok, reply_to} = TestConsumer.register_reply_to(self())

    # Each registration generates a unique identifier which will be used
    # in the `TestConsumer`'s message processor module to look up the pid
    # of the currently running test and send the payload and the metadata
    # to that process.
    publish_opts = [reply_to: reply_to]

    [publish_opts: publish_opts]
  end

  describe "Bookings.Producers.AirlineRequestProducer" do
    test "place_booking/3 only accepts `qatar_airways` and `british_airways` booking requests", %{
      publish_opts: publish_opts
    } do
      payload = %{
        date_time: DateTime.utc_now() |> DateTime.to_iso8601(),
        flight_number: Nanoid.generate_non_secure()
      }

      assert :ok = AirlineRequestProducer.place_booking(:ba, payload, publish_opts)
      assert :ok = AirlineRequestProducer.place_booking(:qr, payload, publish_opts)

      assert_receive({:json, %{}, %{routing_key: "british_airways.place_booking"}}, 250)
      assert_receive({:json, %{}, %{routing_key: "qatar_airways.place_booking"}}, 250)
    end

    test "place_booking/3 produces a message with default metadata", %{publish_opts: publish_opts} do
      date_time = DateTime.utc_now() |> DateTime.to_iso8601()
      flight_number = "QR007"
      payload = %{date_time: date_time, flight_number: flight_number}

      assert :ok = AirlineRequestProducer.place_booking(:qr, payload, publish_opts)

      assert_receive(
        {:json, %{"date_time" => ^date_time, "flight_number" => ^flight_number},
         %{routing_key: "qatar_airways.place_booking"} = meta},
        250
      )

      assert {:ok, _details} = UUID.info(meta.correlation_id)
      refute meta.timestamp == :undefined

      refute_receive 100
    end

    test "place_booking/3 produces a message with custom metadata, but does not override `routing_key`", %{publish_opts: publish_opts} do
      date_time = DateTime.utc_now() |> DateTime.to_iso8601()
      flight_number = "QR007"
      payload = %{date_time: date_time, flight_number: flight_number}

      correlation_id = UUID.uuid4()
      timestamp = DateTime.utc_now() |> DateTime.to_unix(:second)

      publish_opts =
        publish_opts
        |> Keyword.merge(
          app_id: "bookings_app",
          correlation_id: correlation_id,
          headers: [{"authorization", "Bearer abc.123"}],
          routing_key: "unsupported_airline.unsupported_action",
          timestamp: timestamp
        )

      assert :ok = AirlineRequestProducer.place_booking(:qr, payload, publish_opts)

      assert_receive(
        {:json, %{"date_time" => ^date_time, "flight_number" => ^flight_number},
         %{routing_key: "qatar_airways.place_booking"} = meta},
        250
      )

      assert meta.app_id == "bookings_app"
      assert meta.correlation_id == correlation_id
      assert meta.headers == [{"authorization", :longstr, "Bearer abc.123"}]
      assert meta.timestamp == timestamp
    end

    # ... implement the same for `cancel_booking/3` below
  end
end

```

### Documentation

Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/rabbit_mq_ex](https://hexdocs.pm/rabbit_mq_ex).