# BeexQueue
[CI](https://github.com/nathanpotter/beexqueue/actions/workflows/ci.yml)
[Hex.pm](https://hex.pm/packages/beexqueue)
[License](https://github.com/nathanpotter/beexqueue/blob/main/LICENSE)
**A Redis-backed job queue library for Elixir**
BeexQueue is an Elixir port of the Node.js bee-queue library, designed to provide seamless interoperability with existing Node.js bee-queue systems. It uses Redis as the backend with the same key structure, JSON serialization, and atomic operations for full compatibility.
## Key Features
- **Job Queue Management**: Create, enqueue, and process jobs with custom handlers
- **Retry Support**: Configurable retry counts with multiple backoff strategies
- **Delayed Jobs**: Schedule jobs to run at future times using Redis sorted sets
- **Stall Detection**: Automatic detection and re-enqueueing of stalled jobs
- **Cross-Language Compatibility**: Full interoperability with Node.js bee-queue
- **Concurrent Processing**: Support for multiple workers and concurrency control
- **Backoff Strategies**: Immediate, fixed, and exponential backoff support
- **Event Publishing**: Optional Redis pub/sub event publishing
- **Job Progress Tracking**: Built-in progress reporting and status management
- **Comprehensive Testing**: Unit, integration, and cross-language interop tests
## Requirements
- Elixir ~> 1.15
- Erlang ~> 26.0
- Redis (default: `redis://localhost:6379`)
## Installation
Add `beexqueue` to your list of dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:beexqueue, "~> 0.1.0"}
  ]
end
```
Then run:
```bash
mix deps.get
```
## Quick Start
```elixir
# Create a queue
{:ok, queue} = BeexQueue.new("my-queue")
# Create and enqueue a job
{:ok, job} = BeexQueue.create_job(queue, %{task: "process_data", data: [1, 2, 3]})
{:ok, _saved_job} = BeexQueue.save(job)
# Process jobs
BeexQueue.process(queue, fn job ->
  IO.inspect(job.data)
  :ok
end)
```
## Configuration
### Basic Redis Configuration
Configure Redis connection in your `config/config.exs`:
```elixir
config :beexqueue, redis_url: "redis://localhost:6379"
```
### Advanced Queue Configuration
BeexQueue supports extensive configuration options:
```elixir
config :beexqueue,
  redis_url: "redis://localhost:6379",
  default_queue_settings: %{
    # Redis connection settings
    redis: %{host: "localhost", port: 6379},
    # Queue behavior
    prefix: "bq",                             # Redis key prefix
    is_worker: true,                          # Whether to process jobs
    stall_interval: 5000,                     # Stall check interval (ms)
    near_term_window: 1_200_000,              # Delayed job window (ms)
    # Delayed job processing
    activate_delayed_jobs: false,             # Enable delayed job activation
    delayed_debounce: 1000,                   # Debounce delay for delayed jobs (ms)
    # Event publishing
    send_events: true,                        # Publish job events to Redis
    # Job storage
    store_jobs: true,                         # Store job instances in memory
    remove_on_success: false,                 # Remove jobs from Redis on success
    remove_on_failure: false,                 # Remove jobs from Redis on failure
    # Performance tuning
    redis_scan_count: 100,                    # Batch size for Redis SCAN
    initial_redis_failure_retry_delay: 1000,  # Initial retry delay for Redis failures (ms)
    # Connection management
    auto_connect: true,                       # Auto-connect to Redis on creation
    ensure_scripts: true                      # Cache Lua scripts in Redis
  }
```
### Queue-Specific Configuration
Override settings when creating individual queues:
```elixir
# Custom Redis connection
{:ok, queue} =
  BeexQueue.new("custom_queue",
    redis: %{host: "redis.example.com", port: 6380, password: "secret"},
    stall_interval: 10_000,
    send_events: false
  )

# High-performance queue for time-sensitive jobs
{:ok, fast_queue} =
  BeexQueue.new("fast_queue",
    stall_interval: 1000,
    near_term_window: 300_000,
    activate_delayed_jobs: true,
    redis_scan_count: 500
  )

# Memory-efficient queue for large volumes
{:ok, efficient_queue} =
  BeexQueue.new("efficient_queue",
    store_jobs: false,
    remove_on_success: true,
    remove_on_failure: true
  )
```
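One way to picture how per-queue options relate to `default_queue_settings` is a plain map merge in which the options passed for a single queue win. The sketch below is an assumption about the merge semantics, not BeexQueue's actual implementation; `SettingsSketch` and `resolve/2` are hypothetical names, and the defaults repeat only a few keys from the configuration shown earlier.

```elixir
defmodule SettingsSketch do
  # Hypothetical defaults, copied from a subset of the documented settings.
  @defaults %{
    prefix: "bq",
    stall_interval: 5000,
    send_events: true,
    store_jobs: true
  }

  # Per-queue options (a keyword list) override the defaults key by key.
  def resolve(overrides, defaults \\ @defaults) when is_list(overrides) do
    Map.merge(defaults, Map.new(overrides))
  end
end

settings = SettingsSketch.resolve(stall_interval: 10_000, send_events: false)
IO.inspect(settings)
```

Keys that are not overridden, such as `prefix` and `store_jobs` here, keep their default values.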
## Redis Key Structure
BeexQueue uses the same Redis key structure as bee-queue for interoperability:
- `bq:<queue_name>:id`: Auto-incrementing job ID counter
- `bq:<queue_name>:jobs`: Hash mapping job ID to JSON serialized job data
- `bq:<queue_name>:waiting`: List of pending job IDs
- `bq:<queue_name>:active`: List of active job IDs
- `bq:<queue_name>:succeeded`: Set of completed job IDs
- `bq:<queue_name>:failed`: Set of failed job IDs
- `bq:<queue_name>:delayed`: Sorted set for delayed jobs (score = Unix timestamp in ms)
- `bq:<queue_name>:stalling:<id>`: Expiring keys for stall detection
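As an illustration of the naming scheme above, the following sketch assembles key names from a prefix, a queue name, and a suffix. `KeySketch` is a hypothetical helper written for this README, not part of the BeexQueue API; it assumes the default `bq` prefix.

```elixir
defmodule KeySketch do
  # Hypothetical helper; "bq" is the default key prefix.
  def key(queue_name, suffix, prefix \\ "bq") do
    Enum.join([prefix, queue_name, suffix], ":")
  end
end

IO.puts(KeySketch.key("email-queue", "waiting"))
# prints bq:email-queue:waiting
IO.puts(KeySketch.key("email-queue", "stalling:42"))
# prints bq:email-queue:stalling:42
```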
## Usage Examples
### Basic Job Processing
```elixir
# Create a queue
{:ok, queue} = BeexQueue.new("email-queue")
# Create and save a job
{:ok, job} = BeexQueue.create_job(queue, %{to: "user@example.com", subject: "Hello"})
{:ok, saved_job} = BeexQueue.save(job)
# Process jobs with a handler function
BeexQueue.process(queue, fn job ->
  # Your job logic here
  IO.puts("Sending email to #{job.data["to"]}")
  send_email(job.data)
  :ok  # Return :ok for success
end)
```
### Advanced Job Options
```elixir
# Job with retries and timeout
{:ok, job} =
  BeexQueue.create_job(
    queue,
    %{task: "process_payment", amount: 100},
    %{
      retries: 3,
      timeout: 30_000,  # 30 seconds
      backoff: %{strategy: "exponential", delay: 1000}
    }
  )

BeexQueue.save(job)
```
### Backoff Strategies
```elixir
# Immediate retry (no delay)
{:ok, job} =
  BeexQueue.create_job(queue, data, %{retries: 3, backoff: %{strategy: "immediate"}})

# Fixed delay between retries
{:ok, job} =
  BeexQueue.create_job(queue, data, %{retries: 3, backoff: %{strategy: "fixed", delay: 5000}})

# Exponential backoff (delay doubles each retry)
{:ok, job} =
  BeexQueue.create_job(queue, data, %{retries: 3, backoff: %{strategy: "exponential", delay: 1000}})
```
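To make the three strategies concrete, the sketch below computes the wait before each retry. It is an illustration only: `BackoffSketch` is a hypothetical module, and the exponential formula (`delay * 2^(attempt - 1)`) is an assumption derived from the "delay doubles each retry" description rather than from the library source.

```elixir
defmodule BackoffSketch do
  # `attempt` is 1 for the first retry, 2 for the second, and so on.
  def delay_ms(%{strategy: "immediate"}, _attempt), do: 0
  def delay_ms(%{strategy: "fixed", delay: delay}, _attempt), do: delay

  def delay_ms(%{strategy: "exponential", delay: delay}, attempt) when attempt >= 1 do
    # Assumed formula: the base delay doubles on every retry.
    delay * Integer.pow(2, attempt - 1)
  end
end

backoff = %{strategy: "exponential", delay: 1000}
IO.inspect(Enum.map(1..3, &BackoffSketch.delay_ms(backoff, &1)))
# prints [1000, 2000, 4000]
```

Under these assumptions, a job with `retries: 3` and a 1000 ms base delay waits 7 seconds in total across its retries.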
### Delayed Jobs
```elixir
# Schedule job to run in 1 hour
delay_ms = 60 * 60 * 1000
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)
# Schedule job for specific time
future_time = DateTime.utc_now() |> DateTime.add(2, :hour)
delay_ms = DateTime.diff(future_time, DateTime.utc_now(), :millisecond)
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)
```
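The delay arithmetic can be isolated in a small pure function, which avoids calling `DateTime.utc_now/0` twice and makes the calculation easy to test. The sketch below uses hypothetical names (`DelaySketch`, `delay_ms/2`, `score/1`); the `score/1` function only illustrates the documented sorted-set score (Unix timestamp in milliseconds) and is not taken from the library.

```elixir
defmodule DelaySketch do
  # Milliseconds from `now` until `run_at`; times in the past clamp to 0.
  def delay_ms(%DateTime{} = run_at, %DateTime{} = now) do
    max(DateTime.diff(run_at, now, :millisecond), 0)
  end

  # Score for the delayed sorted set: Unix timestamp in milliseconds.
  def score(%DateTime{} = run_at), do: DateTime.to_unix(run_at, :millisecond)
end

now = ~U[2024-01-01 00:00:00Z]
run_at = ~U[2024-01-01 02:00:00Z]
IO.inspect(DelaySketch.delay_ms(run_at, now))
# prints 7200000
```

The resulting value can be passed as the `delay` option when creating the job.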
### Job Progress Tracking
```elixir
BeexQueue.process(queue, fn job ->
  # Report progress during long-running jobs
  BeexQueue.Job.progress(job, 25)
  do_step_1()
  BeexQueue.Job.progress(job, 50)
  do_step_2()
  BeexQueue.Job.progress(job, 100)
  :ok
end)
```
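When a job consists of many steps, the percentages can be derived from the step count instead of being hard-coded. The following is a minimal sketch with hypothetical names (`ProgressSketch`, `run/2`); the `report` callback stands in for whatever progress function is used, and progress is reported after each step completes.

```elixir
defmodule ProgressSketch do
  # Runs each step in order and calls `report` with the percentage completed so far.
  def run(steps, report) when is_list(steps) and is_function(report, 1) do
    total = length(steps)

    steps
    |> Enum.with_index(1)
    |> Enum.each(fn {step, index} ->
      step.()
      report.(div(index * 100, total))
    end)
  end
end

steps = [fn -> :step_1 end, fn -> :step_2 end, fn -> :step_3 end, fn -> :step_4 end]
ProgressSketch.run(steps, fn percent -> IO.puts("progress: #{percent}%") end)
```

Inside a job handler, the callback could be `&BeexQueue.Job.progress(job, &1)`.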
### Multiple Workers and Concurrency
```elixir
# Process with 5 concurrent workers
BeexQueue.process(queue, handler_function, concurrency: 5)
# High-throughput processing
{:ok, high_volume_queue} =
  BeexQueue.new("high-volume",
    stall_interval: 1000,
    redis_scan_count: 200
  )
BeexQueue.process(high_volume_queue, handler, concurrency: 10)
```
### Error Handling and Job Failures
```elixir
BeexQueue.process(queue, fn job ->
  case process_job(job.data) do
    {:ok, result} ->
      {:ok, result}

    {:error, :temporary_failure} ->
      # Job will be retried based on retry settings
      {:error, "Temporary failure"}

    {:error, :permanent_failure} ->
      # Job will fail immediately
      {:error, "Permanent failure"}
  end
end)
```
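How a handler result could translate into a job outcome can be sketched as a small decision function. This is a hypothetical model for illustration (`OutcomeSketch` and `next_state/2` are not BeexQueue functions), and it assumes that any `{:error, reason}` result is retried while retries remain.

```elixir
defmodule OutcomeSketch do
  # Maps a handler result and the number of retries left to the job's next state.
  def next_state(:ok, _retries_left), do: :succeeded
  def next_state({:ok, _result}, _retries_left), do: :succeeded
  def next_state({:error, _reason}, retries_left) when retries_left > 0, do: :retrying
  def next_state({:error, _reason}, _retries_left), do: :failed
end

IO.inspect(OutcomeSketch.next_state({:error, "Temporary failure"}, 2))
# prints :retrying
IO.inspect(OutcomeSketch.next_state({:error, "Permanent failure"}, 0))
# prints :failed
```

Under this model, a job fails without retrying only when it was created with no retries or has exhausted them.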
### Job Status Monitoring
```elixir
# Get all waiting jobs
{:ok, waiting_jobs} = BeexQueue.get_jobs(queue, "waiting")
# Get all active jobs
{:ok, active_jobs} = BeexQueue.get_jobs(queue, "active")
# Get specific job by ID
{:ok, job} = BeexQueue.get_job(queue, "job_123")
# Get queue statistics
stats = BeexQueue.stats(queue)
IO.inspect(stats) # %{waiting: 5, active: 2, succeeded: 100, failed: 3}
```
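The counters in such a statistics map correspond to the lengths of the lists and sets described under Redis Key Structure. The sketch below builds the Redis commands that would read them directly; `StatsSketch` is a hypothetical module, and whether `BeexQueue.stats/1` issues these exact commands internally is an assumption.

```elixir
defmodule StatsSketch do
  # One command per counter, following the documented key structure.
  def commands(queue_name, prefix \\ "bq") do
    base = "#{prefix}:#{queue_name}"

    [
      ["LLEN", "#{base}:waiting"],
      ["LLEN", "#{base}:active"],
      ["SCARD", "#{base}:succeeded"],
      ["SCARD", "#{base}:failed"]
    ]
  end

  # Pairs the replies (in command order) with their names.
  def to_map([waiting, active, succeeded, failed]) do
    %{waiting: waiting, active: active, succeeded: succeeded, failed: failed}
  end
end

IO.inspect(StatsSketch.commands("my-queue"))
IO.inspect(StatsSketch.to_map([5, 2, 100, 3]))
```

With an open Redix connection `conn`, the commands can be sent in one round trip via `Redix.pipeline(conn, StatsSketch.commands("my-queue"))`, and the reply list passed to `to_map/1`.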
## Architecture
BeexQueue follows Elixir best practices with:
- **Supervision**: Uses `Task.Supervisor` for workers and periodic tasks
- **State Management**: Queue state managed through structs (no global state)
- **Error Handling**: Returns `{:ok, value}` or `{:error, reason}` tuples
- **Logging**: Uses Elixir's Logger for debugging and monitoring
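
To illustrate the supervision point, the sketch below runs a stand-in handler over a few jobs under a `Task.Supervisor` with bounded concurrency. It uses only the Elixir standard library; the handler, the job maps, and the concurrency value are placeholders, and this is not a description of how BeexQueue schedules its workers internally.

```elixir
# Start an anonymous Task.Supervisor; in an application it would normally
# live in the supervision tree instead.
{:ok, supervisor} = Task.Supervisor.start_link()

# Stand-in handler and jobs; real handlers would receive job structs.
handler = fn job -> {:ok, job.data * 2} end
jobs = for n <- 1..5, do: %{id: n, data: n}

results =
  supervisor
  |> Task.Supervisor.async_stream_nolink(jobs, handler, max_concurrency: 2, timeout: 5_000)
  |> Enum.map(fn
    # The task finished; pass the handler's own result through.
    {:ok, handler_result} -> handler_result
    # The task crashed; the caller survives because the task is not linked.
    {:exit, reason} -> {:error, reason}
  end)

IO.inspect(results)
# prints [ok: 2, ok: 4, ok: 6, ok: 8, ok: 10]
```

Because the tasks are not linked to the caller, a crashing handler surfaces as an `{:error, reason}` entry instead of taking the calling process down.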
## Development
### Prerequisites
Ensure you have the following installed (matching our CI environment):
- **Elixir 1.18.3** (CI uses 1.18.x)
- **Erlang OTP 27.2** (CI uses 27.x)
- **Node.js 18.20.4** (CI uses 18.x for interop tests)
- **Docker** (for Redis and other services)
We recommend using [asdf](https://asdf-vm.com/) for version management. A `.tool-versions` file is included in the repository.
```bash
# Install asdf if you haven't already
git clone https://github.com/asdf-vm/asdf.git ~/.asdf
# Install required versions
asdf install
```
### Quick Setup (CI-Compatible Environment)
For the fastest setup that exactly matches CI:
```bash
# Clone and enter the repository
git clone https://github.com/nathanpotter/beexqueue.git
cd beexqueue
# Run the automated setup script (includes all CI checks)
./bin/setup
```
This script will:
- ✅ Verify you have the correct tool versions
- ✅ Install all Elixir and Node.js dependencies
- ✅ Start Redis using Docker Compose
- ✅ Run all CI checks (format, credo, compile, dialyzer)
- ✅ Run the complete test suite
### Manual Setup
If you prefer to set up manually:
1. **Clone the repository**
```bash
git clone https://github.com/nathanpotter/beexqueue.git
cd beexqueue
```
2. **Install Elixir dependencies**
```bash
mix local.rebar --force
mix local.hex --force
mix deps.get
```
3. **Install Node.js dependencies** (for interop tests)
```bash
cd test/interop
npm install
cd ../..
```
4. **Start Redis**
```bash
# Using Docker Compose (recommended - matches CI)
docker compose up -d redis
# Verify Redis is running
docker compose exec redis redis-cli ping
```
5. **Verify your setup**
```bash
# Compile the project
mix compile --warnings-as-errors
# Run formatting check
mix format --check-formatted
# Run linting
mix credo --strict
# Run type checking
mix dialyzer
# Run tests
mix test
```
### Development Workflow
Use the development helper script for common tasks:
```bash
# Run all tests
./bin/dev test
# Run specific test types
./bin/dev unit # Unit tests only
./bin/dev integration # Integration tests only
./bin/dev interop # Interop tests only
# Code quality
./bin/dev format # Format code
./bin/dev lint # Run Credo
./bin/dev dialyzer # Run Dialyzer
# CI simulation
./bin/dev ci # Run full CI check locally
# Redis management
./bin/dev redis:start # Start Redis
./bin/dev redis:stop # Stop Redis
# Cleanup
./bin/dev clean # Clean build artifacts
```
### Running CI Checks Locally
To ensure your changes pass CI before pushing:
```bash
# Run the exact same checks as CI
./bin/dev ci
# Or run individual checks
mix format --check-formatted
mix credo
mix compile --warnings-as-errors
mix dialyzer
mix test
```
### Redis Development
Monitor Redis keys during development:
```bash
# List all BeexQueue keys
redis-cli KEYS "bq:*"
# Monitor Redis commands in real-time
redis-cli MONITOR
# Inspect the JSON stored for job 1 (all jobs live in one hash keyed by job ID)
redis-cli HGET "bq:my-queue:jobs" 1
```
### Testing
#### Running Tests
```bash
# Run all tests
mix test
# Run specific test files
mix test test/beexqueue/job_test.exs
mix test test/beexqueue/integration_test.exs
# Doctests (examples in @doc attributes) run as part of `mix test`
# for every module that a test file registers with `doctest`
# Run tests with detailed, per-test output
mix test --trace
```
#### Test Types
**Unit Tests** (`test/beexqueue/`):
- Pure function testing with mocked Redis
- Job serialization/deserialization
- Configuration validation
- Backoff strategy calculations
**Integration Tests** (`test/beexqueue/integration_test.exs`):
- Full Redis integration
- Job lifecycle testing
- Queue operations
- Concurrency testing
**Interop Tests** (`test/beexqueue/interop_test.exs`):
- Node.js ↔ Elixir job compatibility
- Cross-language data preservation
- Bidirectional job processing
- JSON serialization compatibility
#### Interoperability Testing
BeexQueue includes comprehensive tests for Node.js bee-queue compatibility:
```bash
# Set up Node.js dependencies for interop tests
mix setup_interop
# Run interop tests
mix test --only interop
# Run interop tests with detailed, per-test output
mix test test/beexqueue/interop_test.exs --trace
```
**Interop Test Scenarios:**
- **Node.js Producer → Elixir Consumer**: Jobs created by bee-queue processed by BeexQueue
- **Elixir Producer → Node.js Consumer**: Jobs created by BeexQueue processed by bee-queue
- **Bidirectional Flow**: Both systems working together concurrently
- **Data Integrity**: Complex data structures, nested objects, arrays
- **Error Handling**: Retry logic and failure scenarios
**Prerequisites for Interop Tests:**
- Node.js 12+
- Redis running on localhost:6379
- Internet connection (for npm install)
See `test/interop/README.md` for detailed interop test documentation.
### Code Quality
```bash
# Format code
mix format
# Check formatting (CI)
mix format --check-formatted
# Run linter
mix credo
# Run linter in strict mode
mix credo --strict
# Type checking
mix dialyzer
# Generate documentation
mix docs
```
### Redis Monitoring
Monitor Redis keys during development:
```bash
# List all BeexQueue keys
redis-cli KEYS "bq:*"
# Inspect jobs hash
redis-cli HGETALL "bq:my-queue:jobs"
# Check queue lengths
redis-cli LLEN "bq:my-queue:waiting"
redis-cli LLEN "bq:my-queue:active"
redis-cli SCARD "bq:my-queue:succeeded"
redis-cli SCARD "bq:my-queue:failed"
```
## Dependencies
### Runtime Dependencies
- [`redix ~> 1.5`](https://hex.pm/packages/redix): Redis client for Elixir
- [`jason ~> 1.4`](https://hex.pm/packages/jason): JSON serialization for job data
### Development and Testing Dependencies
- [`credo ~> 1.7`](https://hex.pm/packages/credo): Code quality linter and style checker
- [`dialyxir ~> 1.4`](https://hex.pm/packages/dialyxir): Static type analysis with Dialyzer
- [`ex_doc ~> 0.34`](https://hex.pm/packages/ex_doc): Documentation generator
- [`mox ~> 1.1`](https://hex.pm/packages/mox): Mocking framework for unit tests
## Compatibility
BeexQueue is designed to be fully compatible with Node.js bee-queue:
- Same Redis key structure and operations
- Compatible JSON serialization of job data
- Matching atomic operations (BRPOPLPUSH, MULTI/EXEC)
- Cross-language job interoperability
- Shared retry logic and backoff strategies
- Identical stall detection mechanisms
## Interoperability with Node.js bee-queue
BeexQueue provides seamless interoperability with the original Node.js bee-queue library, allowing you to:
- **Migrate gradually** between Node.js and Elixir systems
- **Share job queues** between different parts of your application
- **Process jobs** created by either system with either worker
- **Maintain data integrity** across language boundaries
### Interop Features
- **Bidirectional Job Processing**: Jobs created in Node.js can be processed by Elixir workers and vice versa
- **Data Structure Preservation**: Complex nested objects, arrays, and primitive types are maintained
- **JSON Compatibility**: Uses identical JSON serialization format
- **Redis Key Compatibility**: Same key structure and atomic operations
- **Job Options Compatibility**: Retry counts, timeouts, and backoff strategies work identically
### Setting Up Interop Tests
```bash
# Install Node.js dependencies for bee-queue
mix setup_interop
# Verify setup
cd test/interop && npm list
```
### Running Interop Tests
```bash
# Run all interop tests
mix test --only interop
# Run with detailed, per-test output
mix test test/beexqueue/interop_test.exs --trace
```
### Interop Test Scenarios
The interop tests cover comprehensive compatibility scenarios:
#### Node.js Producer → Elixir Consumer
```javascript
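// Node.js producer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");
// `await` is not available at the top level of a CommonJS module,
// so the save call is wrapped in an async function.
async function main() {
  const job = queue.createJob({ message: "from Node.js" });
  await job.save();
}
main().catch(console.error);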
```
```elixir
# Elixir consumer
{:ok, queue} = BeexQueue.new("test-queue")
BeexQueue.process(queue, fn job ->
  IO.puts("Processing: #{job.data["message"]}")
  :ok
end)
```
#### Elixir Producer → Node.js Consumer
```elixir
# Elixir producer
{:ok, queue} = BeexQueue.new("test-queue")
{:ok, job} = BeexQueue.create_job(queue, %{message: "from Elixir"})
BeexQueue.save(job)
```
```javascript
// Node.js consumer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");
queue.process((job, done) => {
  console.log(`Processing: ${job.data.message}`);
  done();
});
```
#### Data Structure Compatibility
Both systems preserve complex data structures:
```javascript
// Node.js
job.data = {
  nested: { array: [1, 2, 3], boolean: true, null: null },
  timestamp: new Date().toISOString(),
};
```
```elixir
# Elixir
job_data = %{
  "nested" => %{
    "array" => [1, 2, 3],
    "boolean" => true,
    "null" => nil
  },
  "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601()
}
```
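One practical consequence of JSON serialization is that map keys come back as strings, which is why handlers in this README read `job.data["to"]` even when the job was created with atom keys. The sketch below mimics that key conversion with a hypothetical `KeyNormalizer` module so the effect can be seen without Redis; it is not part of BeexQueue and leaves all non-map, non-list values untouched.

```elixir
defmodule KeyNormalizer do
  # Recursively converts map keys to strings, mirroring what a consumer sees
  # after the data has been encoded to JSON and decoded again.
  def stringify(map) when is_map(map) and not is_struct(map) do
    Map.new(map, fn {key, value} -> {to_string(key), stringify(value)} end)
  end

  def stringify(list) when is_list(list), do: Enum.map(list, &stringify/1)
  def stringify(other), do: other
end

data = %{to: "user@example.com", meta: %{tags: ["a", "b"], urgent: true}}
normalized = KeyNormalizer.stringify(data)
IO.inspect(normalized["meta"]["tags"])
# prints ["a", "b"]
```

Writing job data with string keys from the start, as in the Elixir example above, keeps the producer and consumer views identical.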
### Production Interop Setup
For production interoperability:
1. **Use the same Redis instance** for both Node.js and Elixir applications
2. **Configure identical queue names** across systems
3. **Set compatible job options** (retries, timeouts, backoff)
4. **Monitor both systems** for job processing statistics
5. **Test data serialization** thoroughly before deployment
### Troubleshooting Interop Issues
- **Version Compatibility**: Ensure bee-queue versions are compatible
- **Redis Connection**: Verify both systems connect to the same Redis instance
- **Data Types**: Check that data types are serializable in both languages
- **Job Options**: Validate that retry and backoff configurations match
- **Network Latency**: Account for network delays in distributed setups
See `test/interop/README.md` for detailed documentation and troubleshooting guides.
## Contributing
We welcome contributions! Please follow these guidelines:
### Development Workflow
1. **Fork the repository** and create a feature branch
2. **Follow coding standards**:
   - Use `mix format` to format your code
   - Run `mix credo` to check code quality
   - Add tests for new features
   - Ensure compatibility with Node.js bee-queue
3. **Testing**:
   - Write comprehensive tests
   - Aim for high test coverage
   - Test both success and error scenarios
4. **Documentation**:
   - Add `@doc` and `@moduledoc` for public APIs
   - Update README if needed
   - Include examples in documentation
5. **Commits**:
   - Use conventional commit format
   - Write clear, descriptive commit messages
### Pull Request Process
1. **Create a Pull Request** with a clear description
2. **Ensure CI passes** - all tests, linting, and formatting checks
3. **Code Review** - address any feedback from maintainers
4. **Merge** - once approved, your PR will be merged
### Code Quality Tools
Before submitting a PR, run these commands:
```bash
# Format code
mix format
# Run linter
mix credo
# Run type checker
mix dialyzer
# Run tests with coverage
mix test --cover
```
### Issues
- **Bug reports**: Use the issue template with detailed reproduction steps
- **Feature requests**: Describe the use case and proposed API
- **Questions**: Check existing issues and discussions first
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.
## Support
- **Documentation**: [HexDocs](https://hexdocs.pm/beexqueue)
- **Issues**: [GitHub Issues](https://github.com/nathanpotter/beexqueue/issues)
- **Discussions**: [GitHub Discussions](https://github.com/nathanpotter/beexqueue/discussions)