# Carotte 🥕
[](https://hex.pm/packages/carotte)
[](https://hexdocs.pm/carotte/)
A type-safe RabbitMQ client for Gleam that provides a clean, idiomatic interface for message queue operations on the Erlang VM.
## Features
- **Type-safe API** - Leverage Gleam's type system for safe message handling
- **High Performance** - Built on top of the battle-tested `amqp_client` Erlang library
- **Idiomatic Gleam** - Clean, functional API with everything in a single module
- **Complete Feature Set** - Support for exchanges, queues, publishing, consuming, and more
- **Supervised Consumers** - OTP-based consumer supervision with automatic restarts
- **Connection Helpers** - Built-in reconnection support and connection monitoring
- **Async Operations** - Non-blocking operations with `_async` variants
- **Flexible Message Acknowledgment** - Manual acknowledgment support for reliable message processing
- **Full Headers Support** - Send and receive message headers with type-safe accessors
- **Operation-Specific Error Types** - Granular error types for precise error handling
## Installation
```sh
gleam add carotte
```
## Quick Start
```gleam
import carotte
import gleam/erlang/process
import gleam/io
pub fn main() {
// Connect to RabbitMQ
let assert Ok(client) =
carotte.ClientConfig(
..carotte.default_client(),
host: "localhost",
port: 5672,
)
|> carotte.start()
// Open a channel
let assert Ok(ch) = carotte.open_channel(client)
// Declare an exchange
let assert Ok(_) =
carotte.Exchange(..carotte.exchange("my_exchange"), exchange_type: carotte.Direct)
|> carotte.declare_exchange(ch)
// Declare a durable queue
let assert Ok(_) =
carotte.QueueConfig(..carotte.queue("my_queue"), durable: True)
|> carotte.declare_queue(ch)
// Bind queue to exchange
let assert Ok(_) =
carotte.bind_queue(
channel: ch,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "my_routing_key",
)
// Publish a message (payload is BitArray)
let assert Ok(_) =
carotte.publish(
channel: ch,
exchange: "my_exchange",
routing_key: "my_routing_key",
payload: <<"Hello, RabbitMQ!">>,
options: [],
)
// Start a consumer supervisor
let consumers = process.new_name("consumers")
let assert Ok(consumer) = carotte.start_consumer(consumers)
// Subscribe to messages (supervised) - returns consumer_tag string
let assert Ok(consumer_tag) =
carotte.subscribe(
consumer,
channel: ch,
queue: "my_queue",
callback: fn(msg, _deliver) {
// msg.payload is BitArray - convert to string if needed
let assert Ok(text) = bit_array.to_string(msg.payload)
io.println("Received: " <> text)
// Messages are auto-acknowledged by default
},
)
// Clean up
let assert Ok(_) = carotte.unsubscribe(channel: ch, consumer_tag:)
let assert Ok(_) = carotte.close(client)
}
```
## Core Concepts
### Connection Management
Create and configure a RabbitMQ connection:
```gleam
import gleam/time/duration
let assert Ok(client) =
carotte.ClientConfig(
..carotte.default_client(),
username: "admin",
password: "secret",
host: "rabbitmq.example.com",
virtual_host: "/production",
heartbeat: duration.seconds(30),
connection_timeout: duration.seconds(60),
)
|> carotte.start()
// Check connection status
assert carotte.is_connected(client) == True
// Reconnect if needed
case carotte.is_connected(client) {
True -> client
False -> {
let assert Ok(new_client) = carotte.reconnect(client)
new_client
}
}
```
### Exchanges
Carotte supports all RabbitMQ exchange types:
```gleam
// Create a durable topic exchange
carotte.Exchange(
..carotte.exchange("logs"),
exchange_type: carotte.Topic,
durable: True,
)
|> carotte.declare_exchange(channel)
// Available exchange types:
// - Direct: Route based on exact routing key match
// - Topic: Route based on routing key patterns
// - Fanout: Route to all bound queues
// - Headers: Route based on message headers
```
### Queues
Declare and configure queues using record update syntax:
```gleam
carotte.QueueConfig(
..carotte.queue("task_queue"),
durable: True, // Survive broker restart
exclusive: True, // Only one consumer allowed
auto_delete: True, // Delete when last consumer disconnects
)
|> carotte.declare_queue(channel)
```
### Publishing Messages
Publish messages with various options. The payload is a `BitArray`, which allows sending any binary data:
```gleam
import gleam/bit_array
import gleam/time/duration
// For text/JSON, convert string to BitArray
let json_payload = bit_array.from_string(json.to_string(user_data))
carotte.publish(
channel: ch,
exchange: "notifications",
routing_key: "user.signup",
payload: json_payload,
options: [
carotte.Persistent(True),
carotte.ContentType("application/json"),
carotte.MessageHeaders(
carotte.headers_from_list([
#("user_id", carotte.StringHeader("123")),
#("retry_count", carotte.IntHeader(0)),
])
),
carotte.Expiration(duration.seconds(60)), // Message expires in 60 seconds
]
)
```
### Supervised Consumers
Carotte integrates with gleam_otp for proper OTP supervision of consumers. The recommended approach is to add the consumer supervisor to your application's supervision tree using `consumer_supervised`:
```gleam
import gleam/erlang/process
import gleam/otp/static_supervisor
pub fn start_app() {
// Create a name for the consumer supervisor at program startup
let consumers_name = process.new_name("consumers")
// Create the child specification
let consumer_spec = carotte.consumer_supervised(consumers_name)
// Add to your application's supervision tree
let assert Ok(_) =
static_supervisor.new(static_supervisor.OneForOne)
|> static_supervisor.add(consumer_spec)
|> static_supervisor.start()
// Later, get the consumer reference to subscribe
let consumer = carotte.named_consumer(consumers_name)
// Subscribe to queues (consumers are supervised) - returns consumer_tag
let assert Ok(consumer_tag) =
carotte.subscribe(
consumer,
channel: ch,
queue: "work_queue",
callback: fn(payload, deliver) {
// payload.payload is BitArray - convert to string for text messages
let assert Ok(text) = bit_array.to_string(payload.payload)
io.println("Processing: " <> text)
io.println("Exchange: " <> deliver.exchange)
io.println("Routing key: " <> deliver.routing_key)
// If callback crashes, consumer will be restarted by supervisor
}
)
}
```
**Standalone mode** (for simpler use cases without a supervision tree):
```gleam
// Start supervisor directly (linked to calling process)
let consumers = process.new_name("consumers")
let assert Ok(consumer) = carotte.start_consumer(consumers)
let assert Ok(consumer_tag) = carotte.subscribe(consumer, channel: ch, queue: "my_queue", callback: handler)
```
### Manual Acknowledgment
For more control over message acknowledgment:
```gleam
let assert Ok(consumer_tag) =
carotte.subscribe_with_options(
consumer,
channel: ch,
queue: "work_queue",
callback: fn(msg, deliver) {
// Process the message
case process_message(msg) {
Ok(_) -> {
// Acknowledge on success
let assert Ok(_) = carotte.ack_single(ch, deliver.delivery_tag)
}
Error(_) -> {
// Don't ack - message will be redelivered
}
}
Nil
},
options: [carotte.AutoAck(False)],
)
// Acknowledge multiple messages at once
let assert Ok(_) = carotte.ack(ch, deliver.delivery_tag, True)
```
### Message Headers
Carotte supports reading and writing message headers. Headers can contain various types of values:
```gleam
// Available header types
carotte.BoolHeader(True)
carotte.IntHeader(42)
carotte.FloatHeader(3.14)
carotte.StringHeader("hello")
carotte.ListHeader([carotte.IntHeader(1), carotte.IntHeader(2)])
```
**Sending headers:**
```gleam
carotte.publish(
channel: ch,
exchange: "my_exchange",
routing_key: "my_key",
payload: <<"Hello!">>,
options: [
carotte.MessageHeaders(
carotte.headers_from_list([
#("user_id", carotte.StringHeader("123")),
#("priority", carotte.IntHeader(1)),
])
),
],
)
```
**Reading headers from received messages:**
```gleam
carotte.subscribe(
consumer,
channel: ch,
queue: "my_queue",
callback: fn(payload, _deliver) {
// Convert headers to a list of name-value pairs
let headers = carotte.headers_to_list(payload.headers)
// Find a specific header
let user_id = list.find(headers, fn(h) { h.0 == "user_id" })
case user_id {
Ok(#(_, carotte.StringHeader(id))) -> io.println("User: " <> id)
_ -> io.println("No user_id header found")
}
},
)
```
## Error Handling
Carotte provides operation-specific error types for precise error handling. Each operation category has its own error type, making it easy to handle errors appropriately.
### Error Types
| Error Type | Used By | Variants |
|------------|---------|----------|
| `ConnectionError` | `start`, `close`, `reconnect` | `ConnectionBlocked`, `ConnectionClosed`, `ConnectionAuthFailure`, `ConnectionRefused`, `ConnectionTimeout`, `NotConnected`, `AlreadyConnected`, `ReconnectionFailed`, `ConnectionUnknownError` |
| `ChannelError` | `open_channel` | `ChannelClosed`, `ChannelProcessNotFound`, `ChannelConnectionClosed`, `ChannelUnknownError` |
| `ExchangeError` | `declare_exchange`, `delete_exchange`, `bind_exchange`, `unbind_exchange` | `ExchangeNotFound`, `ExchangeAccessRefused`, `ExchangePreconditionFailed`, `ExchangeChannelClosed`, `ExchangeUnknownError` |
| `QueueError` | `declare_queue`, `delete_queue`, `bind_queue`, `unbind_queue`, `purge_queue`, `queue_status` | `QueueNotFound`, `QueueAccessRefused`, `QueuePreconditionFailed`, `QueueResourceLocked`, `QueueChannelClosed`, `QueueUnknownError` |
| `PublishError` | `publish` | `PublishNoRoute`, `PublishChannelClosed`, `PublishUnknownError` |
| `ConsumeError` | `subscribe`, `unsubscribe`, `ack` | `ConsumeInitTimeout`, `ConsumeInitFailed`, `ConsumeProcessNotFound`, `ConsumeChannelClosed`, `ConsumeUnknownError` |
### Handling Errors
```gleam
// Connection errors
case carotte.start(client_config) {
Ok(client) -> process_messages(client)
Error(carotte.ConnectionAuthFailure(msg)) -> {
io.println("Authentication failed: " <> msg)
}
Error(carotte.ConnectionTimeout(msg)) -> {
io.println("Connection timeout: " <> msg)
}
Error(other) -> {
io.println("Connection error: " <> carotte.describe_connection_error(other))
}
}
// Queue errors
case carotte.declare_queue(my_queue, channel) {
Ok(queue) -> use_queue(queue)
Error(carotte.QueueAccessRefused(msg)) -> {
io.println("Access refused: " <> msg)
}
Error(carotte.QueuePreconditionFailed(msg)) -> {
io.println("Queue configuration mismatch: " <> msg)
}
Error(other) -> {
io.println("Queue error: " <> carotte.describe_queue_error(other))
}
}
// Publish errors
case carotte.publish(channel:, exchange:, routing_key:, payload:, options: [carotte.Mandatory(True)]) {
Ok(_) -> io.println("Message published")
Error(carotte.PublishNoRoute(msg)) -> {
io.println("No route for message: " <> msg)
}
Error(other) -> {
io.println("Publish error: " <> carotte.describe_publish_error(other))
}
}
```
### Error Description Functions
Each error type has a corresponding `describe_*_error` function that converts the error to a human-readable string:
```gleam
carotte.describe_connection_error(err) // ConnectionError -> String
carotte.describe_channel_error(err) // ChannelError -> String
carotte.describe_exchange_error(err) // ExchangeError -> String
carotte.describe_queue_error(err) // QueueError -> String
carotte.describe_publish_error(err) // PublishError -> String
carotte.describe_consume_error(err) // ConsumeError -> String
```
## Advanced Features
### Asynchronous Operations
Most operations have async variants for non-blocking execution:
```gleam
// Async queue declaration
carotte.declare_queue_async(my_queue, channel)
// Async exchange deletion
carotte.delete_exchange_async(channel:, exchange: "old_exchange", if_unused: True)
// Async queue binding
carotte.bind_queue_async(
channel:,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "key"
)
```
### Queue Management
Perform administrative operations on queues:
```gleam
// Get queue status
let assert Ok(status) = carotte.queue_status(channel:, queue: "my_queue")
io.println("Messages: " <> int.to_string(status.message_count))
io.println("Consumers: " <> int.to_string(status.consumer_count))
// Purge all messages from a queue
let assert Ok(message_count) = carotte.purge_queue(channel:, queue: "my_queue")
// Delete a queue
let assert Ok(_) = carotte.delete_queue(
channel:,
queue: "my_queue",
if_unused: True, // Only delete if no consumers
if_empty: True // Only delete if empty
)
```
### Exchange Bindings
Create complex routing topologies:
```gleam
// Bind exchange to exchange
carotte.bind_exchange(
channel:,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
// Unbind when no longer needed
carotte.unbind_exchange(
channel:,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
```
## Development
```bash
# Run tests (requires local RabbitMQ on localhost:5672)
gleam test
# Build documentation
gleam docs build
# Format code
gleam format
```
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Acknowledgments
- Built on top of the robust [amqp_client](https://github.com/rabbitmq/rabbitmq-server/tree/main/deps/amqp_client) Erlang library
- Inspired by RabbitMQ clients in other languages
- Thanks to the Gleam community for their support and feedback
## Support
- [Documentation](https://hexdocs.pm/carotte)
- [Issue Tracker](https://github.com/renatillas/carotte/issues)
- [Discussions](https://github.com/renatillas/carotte/discussions)