# Carotte 🥕
[](https://hex.pm/packages/carotte)
[](https://hexdocs.pm/carotte/)
A type-safe RabbitMQ client for Gleam that provides a clean, idiomatic interface for message queue operations on the Erlang VM.
## Features
- 🔒 **Type-safe API** - Leverage Gleam's type system for safe message handling
- 🚀 **High Performance** - Built on top of the battle-tested `amqp_client` Erlang library
- 🎯 **Idiomatic Gleam** - Clean, functional API that feels natural in Gleam
- 📦 **Complete Feature Set** - Support for exchanges, queues, publishing, consuming, and more
- ⚡ **Async Operations** - Non-blocking operations with `_async` variants
- 🔄 **Flexible Message Acknowledgment** - Manual acknowledgment support for reliable message processing
## Installation
```sh
gleam add carotte
```
## Quick Start
```gleam
import carotte
import carotte/channel
import carotte/exchange
import carotte/queue
import carotte/publisher
import gleam/io
pub fn main() {
// Connect to RabbitMQ
let assert Ok(client) =
carotte.default_client()
|> carotte.with_host("localhost")
|> carotte.with_port(5672)
|> carotte.start()
// Open a channel
let assert Ok(ch) = channel.open_channel(client)
// Declare an exchange
let assert Ok(_) =
exchange.new("my_exchange")
|> exchange.with_type(exchange.Direct)
|> exchange.declare(ch)
// Declare a queue
let assert Ok(_) =
queue.new("my_queue")
|> queue.as_durable()
|> queue.declare(ch)
// Bind queue to exchange
let assert Ok(_) =
queue.bind(
channel: ch,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "my_routing_key",
)
// Publish a message
let assert Ok(_) =
publisher.publish(
channel: ch,
exchange: "my_exchange",
routing_key: "my_routing_key",
payload: "Hello, RabbitMQ!",
options: [],
)
// Subscribe to messages
let assert Ok(consumer_tag) =
queue.subscribe(
channel: ch,
queue: "my_queue",
callback: fn(msg, deliver) {
io.println("Received: " <> msg.payload)
// Messages are auto-acknowledged by default
},
)
// Clean up
let assert Ok(_) = carotte.close(client)
}
```
## Core Concepts
### Connection Management
Create and configure a RabbitMQ connection:
```gleam
let client =
carotte.default_client()
|> carotte.with_username("admin")
|> carotte.with_password("secret")
|> carotte.with_host("rabbitmq.example.com")
|> carotte.with_virtual_host("/production")
|> carotte.with_heartbeat(30)
|> carotte.start()
```
### Exchanges
Carotte supports all RabbitMQ exchange types:
```gleam
// Create a topic exchange
exchange.new("logs")
|> exchange.with_type(exchange.Topic)
|> exchange.as_durable()
|> exchange.declare(channel)
// Available exchange types:
// - Direct: Route based on exact routing key match
// - Topic: Route based on routing key patterns
// - Fanout: Route to all bound queues
// - Headers: Route based on message headers
```
### Queues
Declare and configure queues:
```gleam
queue.new("task_queue")
|> queue.as_durable() // Survive broker restart
|> queue.as_exclusive() // Only one consumer allowed
|> queue.with_auto_delete() // Delete when last consumer disconnects
|> queue.declare(channel)
```
### Publishing Messages
Publish messages with various options:
```gleam
publisher.publish(
channel: ch,
exchange: "notifications",
routing_key: "user.signup",
payload: json.to_string(user_data),
options: [
publisher.Persistent(True),
publisher.ContentType("application/json"),
publisher.Headers(
publisher.headers_from_list([
#("user_id", publisher.StringHeader("123")),
#("retry_count", publisher.IntHeader(0)),
])
),
publisher.Expiration("60000"), // Message expires in 60 seconds
]
)
```
### Consuming Messages
Subscribe to queues and handle messages:
```gleam
queue.subscribe(
channel: ch,
queue: "work_queue",
callback: fn(payload, deliver) {
// Process the message
io.println("Processing: " <> payload.payload)
// Access delivery metadata
io.println("Exchange: " <> deliver.exchange)
io.println("Routing key: " <> deliver.routing_key)
// Message is automatically acknowledged on success
}
)
```
### Manual Acknowledgment
For more control over message acknowledgment:
```gleam
// Acknowledge a single message
let assert Ok(_) = queue.ack_single(ch, deliver.delivery_tag)
// Acknowledge multiple messages
let assert Ok(_) = queue.ack(ch, deliver.delivery_tag, True)
```
## Error Handling
Carotte provides detailed error types for robust error handling:
### Error Types
```gleam
pub type CarotteError {
// Connection errors
Blocked // Connection blocked by broker
Closed // Connection closed
ConnectionRefused(String) // Connection refused by server
ConnectionTimeout(String) // Connection or operation timed out
// Authentication/Authorization
AuthFailure(String) // Authentication failed
AccessRefused(String) // Access denied to resource
NotAllowed(String) // Operation not allowed
// Resource errors
ProcessNotFound // Process/connection not found
AlreadyRegistered(String) // Process name already registered
NotFound(String) // Resource not found (exchange, queue, etc.)
ResourceLocked(String) // Resource is locked (exclusive queue, etc.)
// Protocol errors
ChannelClosed(String) // Channel was closed
FrameError(String) // AMQP frame error
UnexpectedFrame(String) // Unexpected frame received
CommandInvalid(String) // Invalid AMQP command
// Operational errors
PreconditionFailed(String) // Precondition not met (e.g., queue in use)
NoRoute(String) // No route found for message
InvalidPath(String) // Invalid resource path
NotImplemented(String) // Feature not implemented
InternalError(String) // Internal server error
// Fallback
UnknownError(String) // Unknown/unmapped error
}
```
Handle errors appropriately:
```gleam
case carotte.start(client_config) {
Ok(client) -> {
// Connection successful
process_messages(client)
}
Error(AuthFailure(msg)) -> {
io.println("Authentication failed: " <> msg)
// Handle auth error
}
Error(ConnectionTimeout(msg)) -> {
io.println("Connection timeout: " <> msg)
// Retry connection
}
Error(other) -> {
io.println("Connection error: " <> string.inspect(other))
// Handle other errors
}
}
```
## Advanced Features
### Asynchronous Operations
Most operations have async variants for non-blocking execution:
```gleam
// Async queue declaration
queue.declare_async(my_queue, channel)
// Async exchange deletion
exchange.delete_async(channel: ch, exchange: "old_exchange", unused: True)
// Async queue binding
queue.bind_async(
channel: ch,
queue: "my_queue",
exchange: "my_exchange",
routing_key: "key"
)
```
### Queue Management
Perform administrative operations on queues:
```gleam
// Get queue status
let assert Ok(status) = queue.status(channel: ch, queue: "my_queue")
io.println("Messages: " <> int.to_string(status.message_count))
io.println("Consumers: " <> int.to_string(status.consumer_count))
// Purge all messages from a queue
let assert Ok(message_count) = queue.purge(channel: ch, queue: "my_queue")
// Delete a queue
let assert Ok(_) = queue.delete(
channel: ch,
queue: "my_queue",
if_unused: True, // Only delete if no consumers
if_empty: True // Only delete if empty
)
```
### Exchange Bindings
Create complex routing topologies:
```gleam
// Bind exchange to exchange
exchange.bind(
channel: ch,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
// Unbind when no longer needed
exchange.unbind(
channel: ch,
source: "raw_logs",
destination: "processed_logs",
routing_key: "*.error"
)
```
## Examples
### Work Queue Pattern
Distribute time-consuming tasks among multiple workers:
```gleam
// Producer
pub fn send_task(channel, task_data) {
publisher.publish(
channel: channel,
exchange: "",
routing_key: "task_queue",
payload: task_data,
options: [publisher.Persistent(True)]
)
}
// Worker
pub fn start_worker(channel) {
let assert Ok(queue) =
queue.new("task_queue")
|> queue.as_durable()
|> queue.declare(channel)
queue.subscribe(
channel: channel,
queue: "task_queue",
callback: fn(payload, _meta) {
// Simulate work
process.sleep(1000)
io.println("Task completed: " <> payload.payload)
}
)
}
```
### Publish/Subscribe Pattern
Send messages to multiple consumers:
```gleam
// Publisher
pub fn broadcast_event(channel, event) {
publisher.publish(
channel: channel,
exchange: "events",
routing_key: "", // Fanout ignores routing key
payload: event,
options: []
)
}
// Subscriber
pub fn subscribe_to_events(channel, handler) {
// Create fanout exchange
let assert Ok(_) =
exchange.new("events")
|> exchange.with_type(exchange.Fanout)
|> exchange.declare(channel)
// Create exclusive queue for this subscriber
let assert Ok(q) =
queue.new("") // Server-generated name
|> queue.as_exclusive()
|> queue.declare(channel)
// Bind to exchange
let assert Ok(_) =
queue.bind(
channel: channel,
queue: q.name,
exchange: "events",
routing_key: ""
)
// Subscribe
queue.subscribe(
channel: channel,
queue: q.name,
callback: handler
)
}
```
### Topic-Based Routing
Route messages based on patterns:
```gleam
// Setup topic exchange
let assert Ok(_) =
exchange.new("logs")
|> exchange.with_type(exchange.Topic)
|> exchange.declare(channel)
// Subscribe to error logs from any service
queue.bind(
channel: channel,
queue: "error_logs",
exchange: "logs",
routing_key: "*.error"
)
// Subscribe to all logs from auth service
queue.bind(
channel: channel,
queue: "auth_logs",
exchange: "logs",
routing_key: "auth.*"
)
// Publish logs
publisher.publish(
channel: channel,
exchange: "logs",
routing_key: "auth.error", // Will go to both queues
payload: "Authentication failed",
options: []
)
```
## Requirements
- Gleam 1.0 or later
- Erlang/OTP 26 or later
- RabbitMQ 3.x or later
## Development
```bash
# Run tests
gleam test
# Build documentation
gleam docs build
# Format code
gleam format
```
## Contributing
Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
## License
This project is licensed under the MIT License - see the LICENSE file for details.
## Acknowledgments
- Built on top of the robust [amqp_client](https://github.com/rabbitmq/rabbitmq-server/tree/main/deps/amqp_client) Erlang library
- Inspired by RabbitMQ clients in other languages
- Thanks to the Gleam community for their support and feedback
## Support
- 📚 [Documentation](https://hexdocs.pm/carotte)
- 🐛 [Issue Tracker](https://github.com/renatillas/carotte/issues)
- 💬 [Discussions](https://github.com/renatillas/carotte/discussions)