# Coney

[![Hex.pm Version](https://img.shields.io/hexpm/v/coney)](https://hex.pm/packages/coney)
![Build Status](https://github.com/coingaming/coney/actions/workflows/ci.yaml/badge.svg)

Consumer server for RabbitMQ with message publishing functionality.

## Table of Contents

- [Coney](#Coney)
  - [Table of Contents](#Table-of-Contents)
  - [Installation](#Installation)
  - [Setup a consumer server](#Setup-a-consumer-server)
  - [Configure consumers](#Configure-consumers)
    - [Rescuing exceptions](#Rescuing-exceptions)
      - [error_happened/3](#error_happened3)
      - [error_happened/4](#error_happened4)
    - [.process/2 and .error_happened return format](#process2-and-error_happened-return-format)
    - [Reply description](#Reply-description)
    - [The default exchange](#The-default-exchange)
  - [Publish message](#Publish-message)
  - [Publish message asynchronously](#Publish-message-asynchronously)
  - [Checking connections](#Checking-connections)
  - [Contributing](#Contributing)
    - [Running test suite locally](#Running-test-suite-locally)
  - [Architecture](#Architecture)
  - [License](#License)

## Installation

Add Coney as a dependency in your `mix.exs` file.

```elixir
def deps do
  [{:coney, "~> 3.0"}]
end
```

After you are done, run `mix deps.get` in your shell to fetch Coney.

## Setup a consumer server

Default config:

```elixir
# config/config.exs
config :coney,
  auto_start: true,
  settings: %{
    url: "amqp://guest:guest@localhost", # or ["amqp://guest:guest@localhost", "amqp://guest:guest@other_host"]
    timeout: 1000
  }
```

If you need to create exchanges or queues before starting the consumer, you can define your RabbitMQ topology as follows:
```elixir
  config :coney,
    topology: %{
      exchanges: [{:topic, "my_exchange", durable: true}],
      queues: %{
        "my_queue" => %{
          options: [
            durable: true,
            arguments: [
              {"x-dead-letter-exchange", :longstr, "dlx_exchange"},
              {"x-message-ttl", :signedint, 60000}
            ]
          ],
          bindings: [
            [exchange: "my_exchange", options: [routing_key: "my_queue"]]
          ]
        }
      }
    }
```
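The topology above points `my_queue` at `dlx_exchange` but does not declare that exchange. As a sketch, assuming dead-lettered messages should be collected in a hypothetical `my_queue.dead` queue, the same format could declare both sides. The queue name and the `#` catch-all binding are illustrative assumptions, not something Coney requires.

```elixir
# config/config.exs
config :coney,
  topology: %{
    exchanges: [
      {:topic, "my_exchange", durable: true},
      # Exchange that receives messages dead-lettered from "my_queue"
      {:topic, "dlx_exchange", durable: true}
    ],
    queues: %{
      "my_queue" => %{
        options: [
          durable: true,
          arguments: [
            {"x-dead-letter-exchange", :longstr, "dlx_exchange"},
            {"x-message-ttl", :signedint, 60000}
          ]
        ],
        bindings: [
          [exchange: "my_exchange", options: [routing_key: "my_queue"]]
        ]
      },
      # Hypothetical queue name; "#" matches every routing key on a topic exchange
      "my_queue.dead" => %{
        options: [durable: true],
        bindings: [
          [exchange: "dlx_exchange", options: [routing_key: "#"]]
        ]
      }
    }
  }
```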

You can also create a configuration module (for example, if you want to retrieve settings from Consul or another source):

```elixir
# config/config.exs
config :coney,
  auto_start: true,
  settings: RabbitConfig,
  topology: RabbitConfig
```

```elixir
defmodule RabbitConfig do
  def settings do
    %{
      url: "amqp://guest:guest@localhost",
      timeout: 1000
    }
  end

  def topology do
    %{
      exchanges: [{:topic, "my_exchange", durable: true}],
      queues: %{
        "my_queue" => %{
          options: [
            durable: true,
            arguments: [
              {"x-dead-letter-exchange", :longstr, "exchange"},
              {"x-message-ttl", :signedint, 60000}
            ]
          ],
          bindings: [
            [exchange: "my_exchange", options: [routing_key: "my_queue"]]
          ]
        }
      }
    }
  end
end
```
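Because `settings/0` is an ordinary function, it can read values from an external source. The following is a minimal sketch that reads them from environment variables. The module name and the `RABBITMQ_URL` and `RABBITMQ_TIMEOUT` variable names are assumptions made for illustration; a Consul lookup would take the place of the `System.get_env/2` calls.

```elixir
defmodule MyApplication.EnvRabbitConfig do
  # Fallback used when RABBITMQ_URL (hypothetical variable name) is not set
  @default_url "amqp://guest:guest@localhost"

  def settings do
    %{
      url: System.get_env("RABBITMQ_URL", @default_url),
      # Environment variables are strings, so convert the timeout to an integer
      timeout: "RABBITMQ_TIMEOUT" |> System.get_env("1000") |> String.to_integer()
    }
  end

  def topology do
    %{
      exchanges: [{:topic, "my_exchange", durable: true}],
      queues: %{
        "my_queue" => %{
          options: [durable: true],
          bindings: [
            [exchange: "my_exchange", options: [routing_key: "my_queue"]]
          ]
        }
      }
    }
  end
end
```

To use it, set both `settings:` and `topology:` to `MyApplication.EnvRabbitConfig` in `config/config.exs`, as shown for `RabbitConfig` above.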

If you don't want Coney to start automatically and prefer to control its startup yourself, set `auto_start` to `false` and add the Coney supervisor to your own supervision tree:


```elixir
# config/config.exs
config :coney, auto_start: false
```

```elixir
defmodule YourApplication do
  use Application

  def start(_type, _args) do
    Supervisor.start_link([Coney.ApplicationSupervisor], [strategy: :one_for_one])
  end
end
```

If you want to disable Coney altogether (useful in a test configuration), set `enabled: false`:
```elixir
# config/config.exs
config :coney, enabled: false, settings: %{}, topology: %{}
```

## Configure consumers

```elixir
# config/queues.exs

config :coney,
  workers: [
    MyApplication.MyConsumer
  ]

# Alternatively, define the connection mapping here and omit `connection/0`
# in the consumer module:
config :coney,
  workers: [
    %{
      connection: %{
        prefetch_count: 10,
        queue: "my_queue"
      },
      worker: MyApplication.MyConsumer
    }
  ]
```

```elixir
# web/consumers/my_consumer.ex

defmodule MyApplication.MyConsumer do
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      queue: "my_queue",
      consumer_tag: "MyApp - MyConsumer" # optional
    }
  end

  def parse(payload, _meta) do
    String.to_integer(payload)
  end

  def process(number, _meta) do
    if number <= 10 do
      :ok
    else
      :reject
    end
  end

  # Be careful here: if `error_happened` itself raises an exception,
  # the message will not be handled properly and may be left unacked in the queue.
  def error_happened(_exception, payload, _meta) do
    IO.puts("Exception raised with #{payload}")
    :redeliver
  end
end
```

### Rescuing exceptions

If an exception is raised while `parse` or `process` is running, Coney rejects the message by default. To handle exceptions differently, implement either the `error_happened/3` or the `error_happened/4` callback. Be careful: if `error_happened` itself raises an exception, the message will not be handled properly and may be left unacked in the queue.

#### error_happened/3

This callback receives the `exception`, the original `payload`, and `meta` as parameters. The return format is the same as for the [process callback](#process2-and-error_happened-return-format).

#### error_happened/4

This callback receives the `exception`, the `stacktrace`, the original `payload`, and `meta` as parameters. The return format is the same as for the [process callback](#process2-and-error_happened-return-format).
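As an illustration, a consumer could use `error_happened/4` to log the failure together with its stacktrace before rejecting the message. This is a sketch, not Coney's prescribed pattern: the module name is hypothetical, and it assumes the `stacktrace` argument has the shape of Elixir's `__STACKTRACE__`, which is what `Exception.format/3` expects.

```elixir
defmodule MyApplication.LoggingConsumer do
  @behaviour Coney.Consumer
  require Logger

  def connection do
    %{prefetch_count: 10, queue: "my_queue"}
  end

  # Raises ArgumentError for non-numeric payloads, which triggers error_happened/4
  def parse(payload, _meta), do: String.to_integer(payload)

  def process(_number, _meta), do: :ok

  # Logs the formatted exception and stacktrace, then rejects the message.
  # Keep this function simple so that it cannot raise itself.
  def error_happened(exception, stacktrace, payload, _meta) do
    Logger.error(
      "Failed to handle #{inspect(payload)}: " <>
        Exception.format(:error, exception, stacktrace)
    )

    :reject
  end
end
```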

### .process/2 and .error_happened return format

1. `:ok` - ack message.
1. `:reject` - reject message.
1. `:redeliver` - return message to the queue.
1. `{:reply, binary}` - publish `binary` as a response to the reply exchange.
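The first three return values can be sketched in a single `process/2` implementation. The module name, the queue name, and the `:payments_ready` application setting are hypothetical and exist only to give each branch a reason to fire.

```elixir
defmodule MyApplication.PaymentConsumer do
  @behaviour Coney.Consumer

  def connection do
    %{prefetch_count: 10, queue: "payments"}
  end

  def parse(payload, _meta), do: String.to_integer(payload)

  def process(amount, _meta) do
    cond do
      # Invalid data will never succeed, so reject it
      amount <= 0 -> :reject
      # A dependency is temporarily unavailable, so return the message to the queue
      not ready?() -> :redeliver
      # Handled successfully, so ack
      true -> :ok
    end
  end

  # Hypothetical readiness flag; replace with a real dependency check
  defp ready? do
    Application.get_env(:my_application, :payments_ready, true)
  end
end
```

Returning `:redeliver` unconditionally would requeue the same message over and over, so it is best reserved for conditions that are expected to clear.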

### Reply description

To use `{:reply, binary}`, add a response exchange to `connection`:

```elixir
# web/consumers/my_consumer.ex

def connection do
  %{
    # ...
    respond_to: "response_exchange"
  }
end
```

The response will be published to the `"response_exchange"` exchange.
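Put together, a replying consumer might look like the sketch below. The module and queue names are hypothetical, and the upper-casing is a stand-in for real work.

```elixir
defmodule MyApplication.EchoConsumer do
  @behaviour Coney.Consumer

  def connection do
    %{
      prefetch_count: 10,
      queue: "echo_requests",
      # Exchange that receives the binary returned in {:reply, binary}
      respond_to: "response_exchange"
    }
  end

  def parse(payload, _meta), do: payload

  # The second tuple element must be a binary
  def process(payload, _meta) do
    {:reply, String.upcase(payload)}
  end
end
```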

### The default exchange

To use the default exchange, declare the exchange as `:default`:

```elixir
%{
  exchanges: [:default]
}
```

The following format is also acceptable:

```elixir
%{
  exchanges: [{:direct, ""}]
}
```

Alternatively, omit it from the `exchanges` settings and set up the queue in the consumer's settings:

```elixir
%{
  prefetch_count: 10,
  queue: "my_queue"
}
```

## Publish message

```elixir
Coney.publish("exchange", "message")

# or

Coney.publish("exchange", "routing_key", "message")
```
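Since the message is passed as a binary, it can help to keep encoding next to the publish call. The wrapper below is a sketch with a hypothetical module name. It reuses the `my_exchange` exchange and `my_queue` routing key from the topology example and encodes integers the way `MyApplication.MyConsumer.parse/2` expects to decode them.

```elixir
defmodule MyApplication.NumberPublisher do
  @exchange "my_exchange"
  @routing_key "my_queue"

  # Turns an integer into the string payload that the consumer parses
  def encode(number) when is_integer(number), do: Integer.to_string(number)

  # Publishes the encoded number; requires a running Coney connection
  def publish(number) do
    Coney.publish(@exchange, @routing_key, encode(number))
  end
end
```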

## Publish message asynchronously

```elixir
Coney.publish_async("exchange", "message")

# or

Coney.publish_async("exchange", "routing_key", "message")
```

## Checking connections

You can use `Coney.status/0` if you need to get information about RabbitMQ connections:

```
iex> Coney.status()
[{#PID<0.972.0>, :connected}]
```

The result is a list of tuples, where the first element of each tuple is the pid of a running connection server and the second element describes the connection status.

Connection status can be:

- `:pending` - when Coney has just started
- `:connected` - when the RabbitMQ connection has been established and all consumers have been started
- `:disconnected` - when Coney has lost its connection to RabbitMQ
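The status list lends itself to a simple health check. The helper below is a sketch with a hypothetical module name. It treats an empty list as unhealthy, which is a design choice rather than documented Coney behaviour.

```elixir
defmodule MyApplication.RabbitHealth do
  # `statuses` has the shape returned by Coney.status/0: [{pid, status}].
  # The default argument is evaluated on each call, so it reflects the current state.
  def healthy?(statuses \\ Coney.status()) do
    statuses != [] and
      Enum.all?(statuses, fn {_pid, status} -> status == :connected end)
  end
end
```

Passing the list explicitly, for example `MyApplication.RabbitHealth.healthy?([{self(), :pending}])`, makes the function usable in tests without a broker.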

## Contributing

Bug reports and pull requests are welcome on GitHub at https://github.com/coingaming/coney.

### Running test suite locally
1. Start the RabbitMQ instance via `docker compose up`.
2. Run `mix test`.

## Architecture
```mermaid
  graph TD;
      A[ApplicationSupervisor - Supervisor] --> B[ConsumerSupervisor - Supervisor];
      A --> C[ConnectionServer - GenServer];
      B -- supervises many --> D[ConsumerServer - GenServer];
      D -- monitors --> E[ConsumerExecutor];
      E -- sends messages to --> C;
      D -- opens AMQP conns via --> C;
```

## License

The library is available as open source under the terms of the [MIT License](http://opensource.org/licenses/MIT).