
# BeexQueue

[![CI](https://github.com/nathanpotter/beexqueue/actions/workflows/ci.yml/badge.svg)](https://github.com/nathanpotter/beexqueue/actions/workflows/ci.yml)
[![Hex.pm](https://img.shields.io/hexpm/v/beexqueue.svg)](https://hex.pm/packages/beexqueue)
[![License](https://img.shields.io/github/license/nathanpotter/beexqueue.svg)](https://github.com/nathanpotter/beexqueue/blob/main/LICENSE)

**A Redis-backed job queue library for Elixir**

BeexQueue is an Elixir port of the Node.js bee-queue library. It uses the same Redis key structure, JSON job serialization, and atomic Redis operations as bee-queue, so Elixir and Node.js processes can produce and consume jobs on the same queues.

## Key Features

- **Job Queue Management**: Create, enqueue, and process jobs with custom handlers
- **Retry Support**: Configurable retry counts with multiple backoff strategies
- **Delayed Jobs**: Schedule jobs to run at future times using Redis sorted sets
- **Stall Detection**: Automatic detection and re-enqueueing of stalled jobs
- **Cross-Language Compatibility**: Full interoperability with Node.js bee-queue
- **Concurrent Processing**: Support for multiple workers and concurrency control
- **Backoff Strategies**: Immediate, fixed, and exponential backoff support
- **Event Publishing**: Optional Redis pub/sub event publishing
- **Job Progress Tracking**: Built-in progress reporting and status management
- **Comprehensive Testing**: Unit, integration, and cross-language interop tests

## Requirements

- Elixir ~> 1.15
- Erlang/OTP 26 or later
- Redis (default: `redis://localhost:6379`)

## Installation

Add `beexqueue` to your list of dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:beexqueue, "~> 0.1.0"}
  ]
end
```

Then run:

```bash
mix deps.get
```

## Quick Start

```elixir
# Create a queue (returns an {:ok, queue} tuple)
{:ok, queue} = BeexQueue.new("my-queue")

# Create and enqueue a job
{:ok, job} = BeexQueue.create_job(queue, %{task: "process_data", data: [1, 2, 3]})
{:ok, _saved_job} = BeexQueue.save(job)

# Process jobs
BeexQueue.process(queue, fn(job) ->
  IO.inspect(job.data)
  :ok
end)
```

## Configuration

### Basic Redis Configuration

Configure Redis connection in your `config/config.exs`:

```elixir
config :beexqueue, redis_url: "redis://localhost:6379"
```

### Advanced Queue Configuration

BeexQueue supports extensive configuration options:

```elixir
config :beexqueue,
  redis_url: "redis://localhost:6379",
  default_queue_settings: %{
    # Redis connection settings
    redis: %{host: "localhost", port: 6379},

    # Queue behavior
    prefix: "bq",                           # Redis key prefix
    is_worker: true,                        # Whether to process jobs
    stall_interval: 5000,                   # Stall check interval (ms)
    near_term_window: 1_200_000,            # Delayed job window (ms)

    # Delayed job processing
    activate_delayed_jobs: false,           # Enable delayed job activation
    delayed_debounce: 1000,                 # Debounce delay for delayed jobs

    # Event publishing
    send_events: true,                      # Publish job events to Redis

    # Job storage
    store_jobs: true,                       # Store job instances in memory
    remove_on_success: false,               # Remove jobs from Redis on success
    remove_on_failure: false,               # Remove jobs from Redis on failure

    # Performance tuning
    redis_scan_count: 100,                  # Batch size for Redis SCAN
    initial_redis_failure_retry_delay: 1000, # Initial retry delay for Redis failures

    # Connection management
    auto_connect: true,                     # Auto-connect to Redis on creation
    ensure_scripts: true                    # Cache Lua scripts in Redis
  }
```

### Queue-Specific Configuration

Override settings when creating individual queues:

```elixir
# Custom Redis connection
{:ok, queue} = BeexQueue.new("custom_queue",
  redis: %{host: "redis.example.com", port: 6380, password: "secret"},
  stall_interval: 10000,
  send_events: false
)

# High-performance queue for time-sensitive jobs
{:ok, fast_queue} = BeexQueue.new("fast_queue",
  stall_interval: 1000,
  near_term_window: 300_000,
  activate_delayed_jobs: true,
  redis_scan_count: 500
)

# Memory-efficient queue for large volumes
{:ok, efficient_queue} = BeexQueue.new("efficient_queue",
  store_jobs: false,
  remove_on_success: true,
  remove_on_failure: true
)
```
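
One way to picture how per-queue options relate to `default_queue_settings` is a map merge in which the per-queue values win. The sketch below only illustrates that mental model; `SettingsSketch` and `merge/2` are hypothetical names and not part of BeexQueue, and the library's actual merge logic may differ.

```elixir
defmodule SettingsSketch do
  # Hypothetical helper, not BeexQueue's API:
  # per-queue options (a keyword list) override the defaults (a map).
  def merge(defaults, overrides) when is_map(defaults) and is_list(overrides) do
    Map.merge(defaults, Map.new(overrides))
  end
end

defaults = %{stall_interval: 5000, send_events: true, store_jobs: true}

# stall_interval and send_events are overridden, store_jobs keeps its default
settings = SettingsSketch.merge(defaults, stall_interval: 10_000, send_events: false)
IO.inspect(settings)
```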

## Redis Key Structure

BeexQueue uses the same Redis key structure as bee-queue for interoperability:

- `bq:<queue_name>:id`: Auto-incrementing job ID counter
- `bq:<queue_name>:jobs`: Hash mapping job ID to JSON serialized job data
- `bq:<queue_name>:waiting`: List of pending job IDs
- `bq:<queue_name>:active`: List of active job IDs
- `bq:<queue_name>:succeeded`: Set of completed job IDs
- `bq:<queue_name>:failed`: Set of failed job IDs
- `bq:<queue_name>:delayed`: Sorted set for delayed jobs (score = Unix timestamp in ms)
- `bq:<queue_name>:stalling:<id>`: Expiring keys for stall detection

## Usage Examples

### Basic Job Processing

```elixir
# Create a queue
{:ok, queue} = BeexQueue.new("email-queue")

# Create and save a job
{:ok, job} = BeexQueue.create_job(queue, %{to: "user@example.com", subject: "Hello"})
{:ok, saved_job} = BeexQueue.save(job)

# Process jobs with a handler function
BeexQueue.process(queue, fn(job) ->
  # Your job logic here
  IO.puts("Sending email to #{job.data["to"]}")
  send_email(job.data)
  :ok  # Return :ok for success
end)
```

### Advanced Job Options

```elixir
# Job with retries and timeout
{:ok, job} = BeexQueue.create_job(queue,
  %{task: "process_payment", amount: 100},
  %{
    retries: 3,
    timeout: 30000,  # 30 seconds
    backoff: %{strategy: "exponential", delay: 1000}
  }
)
BeexQueue.save(job)
```

### Backoff Strategies

```elixir
# Immediate retry (no delay)
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "immediate"}}
)

# Fixed delay between retries
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "fixed", delay: 5000}}
)

# Exponential backoff (delay doubles each retry)
{:ok, job} = BeexQueue.create_job(queue, data,
  %{retries: 3, backoff: %{strategy: "exponential", delay: 1000}}
)
```
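
To make the three strategies concrete, the following sketch computes the wait before each retry. It assumes retries are counted from 1 and that the exponential strategy doubles the configured `delay` on every further retry, as the comment above describes; `BackoffSketch` is a hypothetical module, not BeexQueue's implementation.

```elixir
defmodule BackoffSketch do
  # Hypothetical helper. `attempt` is 1-based: the first retry is attempt 1.
  def delay_ms(%{strategy: "immediate"}, _attempt), do: 0

  def delay_ms(%{strategy: "fixed", delay: delay}, _attempt), do: delay

  def delay_ms(%{strategy: "exponential", delay: delay}, attempt) when attempt >= 1 do
    # delay, 2 * delay, 4 * delay, ...
    delay * Integer.pow(2, attempt - 1)
  end
end

delays =
  for attempt <- 1..3 do
    BackoffSketch.delay_ms(%{strategy: "exponential", delay: 1000}, attempt)
  end

IO.inspect(delays)
# => [1000, 2000, 4000]
```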

### Delayed Jobs

```elixir
# Schedule job to run in 1 hour
delay_ms = 60 * 60 * 1000
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)

# Schedule job for specific time
future_time = DateTime.utc_now() |> DateTime.add(2, :hour)
delay_ms = DateTime.diff(future_time, DateTime.utc_now(), :millisecond)
{:ok, job} = BeexQueue.create_job(queue, job_data, %{delay: delay_ms})
BeexQueue.save(job)
```
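
Delayed jobs live in a sorted set whose score is a Unix timestamp in milliseconds (see Redis Key Structure). As a rough illustration of how a relative delay maps to such a score, the sketch below uses a hypothetical `DelaySketch` module; it is not BeexQueue's internal code.

```elixir
defmodule DelaySketch do
  # Hypothetical helper: the score is the Unix time (ms) at which the job becomes due.
  def score(%DateTime{} = now, delay_ms) when is_integer(delay_ms) and delay_ms >= 0 do
    DateTime.to_unix(now, :millisecond) + delay_ms
  end
end

# A fixed "now" keeps the example reproducible
now = ~U[2024-01-01 00:00:00Z]

# One hour later, expressed as a sorted-set score
DelaySketch.score(now, 60 * 60 * 1000)
# => 1704070800000
```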

### Job Progress Tracking

```elixir
BeexQueue.process(queue, fn(job) ->
  # Report progress during long-running jobs
  BeexQueue.Job.progress(job, 25)
  do_step_1()

  BeexQueue.Job.progress(job, 50)
  do_step_2()

  BeexQueue.Job.progress(job, 100)
  :ok
end)
```

### Multiple Workers and Concurrency

```elixir
# Process with 5 concurrent workers
BeexQueue.process(queue, handler_function, concurrency: 5)

# High-throughput processing
{:ok, high_volume_queue} = BeexQueue.new("high-volume",
  stall_interval: 1000,
  redis_scan_count: 200
)
BeexQueue.process(high_volume_queue, handler, concurrency: 10)
```

### Error Handling and Job Failures

```elixir
BeexQueue.process(queue, fn(job) ->
  case process_job(job.data) do
    {:ok, result} ->
      {:ok, result}

    {:error, :temporary_failure} ->
      # Job will be retried based on retry settings
      {:error, "Temporary failure"}

    {:error, :permanent_failure} ->
      # Job will fail immediately
      {:error, "Permanent failure"}
  end
end)
```
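
Both error branches above return the same `{:error, reason}` shape, so whether a job is retried or marked as failed presumably depends on how many retries it has left. The decision table below is a sketch under that assumption; `OutcomeSketch` and its state names are hypothetical and do not describe BeexQueue's internals.

```elixir
defmodule OutcomeSketch do
  # Hypothetical decision table: handler result + retries left -> next job state.
  def next_state(:ok, _retries_left), do: :succeeded
  def next_state({:ok, _result}, _retries_left), do: :succeeded
  def next_state({:error, _reason}, retries_left) when retries_left > 0, do: :retry
  def next_state({:error, _reason}, _retries_left), do: :failed
end

OutcomeSketch.next_state({:error, "Temporary failure"}, 2)
# => :retry

OutcomeSketch.next_state({:error, "Temporary failure"}, 0)
# => :failed
```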

### Job Status Monitoring

```elixir
# Get all waiting jobs
{:ok, waiting_jobs} = BeexQueue.get_jobs(queue, "waiting")

# Get all active jobs
{:ok, active_jobs} = BeexQueue.get_jobs(queue, "active")

# Get specific job by ID
{:ok, job} = BeexQueue.get_job(queue, "job_123")

# Get queue statistics
stats = BeexQueue.stats(queue)
IO.inspect(stats)  # %{waiting: 5, active: 2, succeeded: 100, failed: 3}
```
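
The same four counters can also be read straight from Redis, since `waiting` and `active` are lists and `succeeded` and `failed` are sets (see Redis Key Structure). The sketch below builds the commands and shapes the replies; `StatsSketch` is a hypothetical helper, and it is an assumption that `BeexQueue.stats/1` works along these lines.

```elixir
defmodule StatsSketch do
  # Hypothetical helper: Redis commands for the four counters, in a fixed order.
  def commands(queue_name, prefix \\ "bq") do
    [
      ["LLEN", "#{prefix}:#{queue_name}:waiting"],
      ["LLEN", "#{prefix}:#{queue_name}:active"],
      ["SCARD", "#{prefix}:#{queue_name}:succeeded"],
      ["SCARD", "#{prefix}:#{queue_name}:failed"]
    ]
  end

  # Pairs the replies, in the same order, with their names.
  def to_stats([waiting, active, succeeded, failed]) do
    %{waiting: waiting, active: active, succeeded: succeeded, failed: failed}
  end
end

# With the redix dependency and a running Redis, the commands can be pipelined:
#   {:ok, conn} = Redix.start_link("redis://localhost:6379")
#   {:ok, replies} = Redix.pipeline(conn, StatsSketch.commands("my-queue"))
#   StatsSketch.to_stats(replies)

# Shaping a list of example replies:
stats = StatsSketch.to_stats([5, 2, 100, 3])
IO.inspect(stats)
```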

## Architecture

BeexQueue follows Elixir best practices with:

- **Supervision**: Uses `Task.Supervisor` for workers and periodic tasks
- **State Management**: Queue state managed through structs (no global state)
- **Error Handling**: Returns `{:ok, value}` or `{:error, reason}` tuples
- **Logging**: Uses Elixir's Logger for debugging and monitoring
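
For readers unfamiliar with `Task.Supervisor`, the following standalone sketch shows the general pattern of running handlers under a supervisor with bounded concurrency. It uses only the Elixir standard library; the handler and the job IDs are stand-ins, and how BeexQueue wires its own supervisor is not shown here.

```elixir
# Start an anonymous Task.Supervisor (BeexQueue's own supervision tree may differ)
{:ok, sup} = Task.Supervisor.start_link()

# Stand-in for real job logic: returns an {:ok, value} tuple
handler = fn job_id -> {:ok, job_id * 2} end

# Run at most 2 handlers at a time; a crashing task does not take down the caller
results =
  sup
  |> Task.Supervisor.async_stream_nolink(1..5, handler, max_concurrency: 2, timeout: 5_000)
  |> Enum.to_list()

IO.inspect(results)
```

Each element of `results` is `{:ok, handler_result}` for a task that finished, or `{:exit, reason}` for one that crashed or timed out, which fits the `{:ok, value}` / `{:error, reason}` convention listed above.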

## Development

### Prerequisites

Ensure you have the following installed (matching our CI environment):

- **Elixir 1.18.3** (CI uses 1.18.x)
- **Erlang OTP 27.2** (CI uses 27.x)
- **Node.js 18.20.4** (CI uses 18.x for interop tests)
- **Docker** (for Redis and other services)

We recommend using [asdf](https://asdf-vm.com/) for version management. A `.tool-versions` file is included in the repository.

```bash
# Install asdf if you haven't already
git clone https://github.com/asdf-vm/asdf.git ~/.asdf

# Install required versions
asdf install
```

### Quick Setup (CI-Compatible Environment)

For the fastest setup that exactly matches CI:

```bash
# Clone and enter the repository
git clone https://github.com/nathanpotter/beexqueue.git
cd beexqueue

# Run the automated setup script (includes all CI checks)
./bin/setup
```

This script will:

- ✅ Verify you have the correct tool versions
- ✅ Install all Elixir and Node.js dependencies
- ✅ Start Redis using Docker Compose
- ✅ Run all CI checks (format, credo, compile, dialyzer)
- ✅ Run the complete test suite

### Manual Setup

If you prefer to set up manually:

1. **Clone the repository**

   ```bash
   git clone https://github.com/nathanpotter/beexqueue.git
   cd beexqueue
   ```

2. **Install Elixir dependencies**

   ```bash
   mix local.rebar --force
   mix local.hex --force
   mix deps.get
   ```

3. **Install Node.js dependencies** (for interop tests)

   ```bash
   cd test/interop
   npm install
   cd ../..
   ```

4. **Start Redis**

   ```bash
   # Using Docker Compose (recommended - matches CI)
   docker compose up -d redis

   # Verify Redis is running
   docker exec beexqueue-redis-1 redis-cli ping
   ```

5. **Verify your setup**

   ```bash
   # Compile the project
   mix compile --warnings-as-errors

   # Run formatting check
   mix format --check-formatted

   # Run linting
   mix credo --strict

   # Run type checking
   mix dialyzer

   # Run tests
   mix test
   ```

### Development Workflow

Use the development helper script for common tasks:

```bash
# Run all tests
./bin/dev test

# Run specific test types
./bin/dev unit          # Unit tests only
./bin/dev integration   # Integration tests only
./bin/dev interop       # Interop tests only

# Code quality
./bin/dev format        # Format code
./bin/dev lint          # Run Credo
./bin/dev dialyzer      # Run Dialyzer

# CI simulation
./bin/dev ci            # Run full CI check locally

# Redis management
./bin/dev redis:start   # Start Redis
./bin/dev redis:stop    # Stop Redis

# Cleanup
./bin/dev clean         # Clean build artifacts
```

### Running CI Checks Locally

To ensure your changes pass CI before pushing:

```bash
# Run the exact same checks as CI
./bin/dev ci

# Or run individual checks
mix format --check-formatted
mix credo
mix compile --warnings-as-errors
mix dialyzer
mix test
```

### Redis Development

Monitor Redis keys during development:

```bash
# List all BeexQueue keys
redis-cli KEYS "bq:*"

# Monitor Redis commands in real-time
redis-cli MONITOR

# Inspect the data of job 1 (jobs are fields of the "bq:<queue_name>:jobs" hash)
redis-cli HGET "bq:my-queue:jobs" 1
```

### Testing

#### Running Tests

```bash
# Run all tests
mix test

# Run specific test files
mix test test/beexqueue/job_test.exs
mix test test/beexqueue/integration_test.exs

# Run only doctests (examples in @doc attributes); they also run as part of `mix test`
mix test --only doctest

# Run tests with detailed per-test output
mix test --trace
```

#### Test Types

**Unit Tests** (`test/beexqueue/`):

- Pure function testing with mocked Redis
- Job serialization/deserialization
- Configuration validation
- Backoff strategy calculations

**Integration Tests** (`test/beexqueue/integration_test.exs`):

- Full Redis integration
- Job lifecycle testing
- Queue operations
- Concurrency testing

**Interop Tests** (`test/beexqueue/interop_test.exs`):

- Node.js ↔ Elixir job compatibility
- Cross-language data preservation
- Bidirectional job processing
- JSON serialization compatibility

#### Interoperability Testing

BeexQueue includes comprehensive tests for Node.js bee-queue compatibility:

```bash
# Set up Node.js dependencies for interop tests
mix setup_interop

# Run interop tests
mix test --only interop

# Run interop tests with detailed per-test output
mix test test/beexqueue/interop_test.exs --trace
```

**Interop Test Scenarios:**

- **Node.js Producer → Elixir Consumer**: Jobs created by bee-queue processed by BeexQueue
- **Elixir Producer → Node.js Consumer**: Jobs created by BeexQueue processed by bee-queue
- **Bidirectional Flow**: Both systems working together concurrently
- **Data Integrity**: Complex data structures, nested objects, arrays
- **Error Handling**: Retry logic and failure scenarios

**Prerequisites for Interop Tests:**

- Node.js 12+
- Redis running on localhost:6379
- Internet connection (for npm install)

See `test/interop/README.md` for detailed interop test documentation.

### Code Quality

```bash
# Format code
mix format

# Check formatting (CI)
mix format --check-formatted

# Run linter
mix credo

# Run linter in strict mode
mix credo --strict

# Type checking
mix dialyzer

# Generate documentation
mix docs
```

### Redis Monitoring

Monitor Redis keys during development:

```bash
# List all BeexQueue keys
redis-cli KEYS "bq:*"

# Inspect jobs hash
redis-cli HGETALL "bq:my-queue:jobs"

# Check queue lengths
redis-cli LLEN "bq:my-queue:waiting"
redis-cli LLEN "bq:my-queue:active"
redis-cli SCARD "bq:my-queue:succeeded"
redis-cli SCARD "bq:my-queue:failed"
```

## Dependencies

### Runtime Dependencies

- [`redix ~> 1.5`](https://hex.pm/packages/redix): Redis client for Elixir
- [`jason ~> 1.4`](https://hex.pm/packages/jason): JSON serialization for job data

### Development and Testing Dependencies

- [`credo ~> 1.7`](https://hex.pm/packages/credo): Code quality linter and style checker
- [`dialyxir ~> 1.4`](https://hex.pm/packages/dialyxir): Static type analysis with Dialyzer
- [`ex_doc ~> 0.34`](https://hex.pm/packages/ex_doc): Documentation generator
- [`mox ~> 1.1`](https://hex.pm/packages/mox): Mocking framework for unit tests

## Compatibility

BeexQueue is designed to be fully compatible with Node.js bee-queue:

- Same Redis key structure and operations
- Compatible JSON serialization of job data
- Matching atomic operations (BRPOPLPUSH, MULTI/EXEC)
- Cross-language job interoperability
- Shared retry logic and backoff strategies
- Identical stall detection mechanisms

## Interoperability with Node.js bee-queue

BeexQueue provides seamless interoperability with the original Node.js bee-queue library, allowing you to:

- **Migrate gradually** between Node.js and Elixir systems
- **Share job queues** between different parts of your application
- **Process jobs** created by either system with either worker
- **Maintain data integrity** across language boundaries

### Interop Features

- **Bidirectional Job Processing**: Jobs created in Node.js can be processed by Elixir workers and vice versa
- **Data Structure Preservation**: Complex nested objects, arrays, and primitive types are maintained
- **JSON Compatibility**: Uses identical JSON serialization format
- **Redis Key Compatibility**: Same key structure and atomic operations
- **Job Options Compatibility**: Retry counts, timeouts, and backoff strategies work identically

### Setting Up Interop Tests

```bash
# Install Node.js dependencies for bee-queue
mix setup_interop

# Verify setup
cd test/interop && npm list
```

### Running Interop Tests

```bash
# Run all interop tests
mix test --only interop

# Run with detailed per-test output
mix test test/beexqueue/interop_test.exs --trace
```

### Interop Test Scenarios

The interop tests cover comprehensive compatibility scenarios:

#### Node.js Producer → Elixir Consumer

```javascript
// Node.js producer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");

// `await` needs an async context in a CommonJS module
(async () => {
  const job = queue.createJob({ message: "from Node.js" });
  await job.save();
})();
```

```elixir
# Elixir consumer
{:ok, queue} = BeexQueue.new("test-queue")
BeexQueue.process(queue, fn(job) ->
  IO.puts("Processing: #{job.data["message"]}")
  :ok
end)
```

#### Elixir Producer → Node.js Consumer

```elixir
# Elixir producer
{:ok, queue} = BeexQueue.new("test-queue")
{:ok, job} = BeexQueue.create_job(queue, %{message: "from Elixir"})
BeexQueue.save(job)
```

```javascript
// Node.js consumer
const Queue = require("bee-queue");
const queue = new Queue("test-queue");
queue.process((job, done) => {
  console.log(`Processing: ${job.data.message}`);
  done();
});
```

#### Data Structure Compatibility

Both systems preserve complex data structures:

```javascript
// Node.js
job.data = {
  nested: { array: [1, 2, 3], boolean: true, null: null },
  timestamp: new Date().toISOString(),
};
```

```elixir
# Elixir
job_data = %{
  "nested" => %{
    "array" => [1, 2, 3],
    "boolean" => true,
    "null" => nil
  },
  "timestamp" => DateTime.utc_now() |> DateTime.to_iso8601()
}
```
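
One practical consequence of the JSON round trip is that atom keys come back as string keys, which is why handlers in this README read `job.data["to"]` rather than `job.data.to`. The sketch below demonstrates this with Jason alone; it assumes Jason (listed under Runtime Dependencies) is available, for example inside the project or via `Mix.install([{:jason, "~> 1.4"}])`.

```elixir
# Data as an Elixir producer might write it, with atom keys
data = %{task: "process_data", nested: %{array: [1, 2, 3], null: nil}}

# Encode to JSON and decode again, as happens when a job passes through Redis
decoded = data |> Jason.encode!() |> Jason.decode!()

decoded["task"]
# => "process_data"

decoded["nested"]["array"]
# => [1, 2, 3]

# The atom key no longer exists after the round trip
Map.get(decoded, :task)
# => nil
```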

### Production Interop Setup

For production interoperability:

1. **Use the same Redis instance** for both Node.js and Elixir applications
2. **Configure identical queue names** across systems
3. **Set compatible job options** (retries, timeouts, backoff)
4. **Monitor both systems** for job processing statistics
5. **Test data serialization** thoroughly before deployment

### Troubleshooting Interop Issues

- **Version Compatibility**: Ensure bee-queue versions are compatible
- **Redis Connection**: Verify both systems connect to the same Redis instance
- **Data Types**: Check that data types are serializable in both languages
- **Job Options**: Validate that retry and backoff configurations match
- **Network Latency**: Account for network delays in distributed setups

See `test/interop/README.md` for detailed documentation and troubleshooting guides.

## Contributing

We welcome contributions! Please follow these guidelines:

### Development Workflow

1. **Fork the repository** and create a feature branch
2. **Follow coding standards**:

   - Use `mix format` to format your code
   - Run `mix credo` to check code quality
   - Add tests for new features
   - Ensure compatibility with Node.js bee-queue

3. **Testing**:

   - Write comprehensive tests
   - Aim for high test coverage
   - Test both success and error scenarios

4. **Documentation**:

   - Add `@doc` and `@moduledoc` for public APIs
   - Update README if needed
   - Include examples in documentation

5. **Commits**:
   - Use conventional commit format
   - Write clear, descriptive commit messages

### Pull Request Process

1. **Create a Pull Request** with a clear description
2. **Ensure CI passes** - all tests, linting, and formatting checks
3. **Code Review** - address any feedback from maintainers
4. **Merge** - once approved, your PR will be merged

### Code Quality Tools

Before submitting a PR, run these commands:

```bash
# Format code
mix format

# Run linter
mix credo

# Run type checker
mix dialyzer

# Run tests with coverage
mix test --cover
```

### Issues

- **Bug reports**: Use the issue template with detailed reproduction steps
- **Feature requests**: Describe the use case and proposed API
- **Questions**: Check existing issues and discussions first

## License

This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.

## Support

- **Documentation**: [HexDocs](https://hexdocs.pm/beexqueue)
- **Issues**: [GitHub Issues](https://github.com/nathanpotter/beexqueue/issues)
- **Discussions**: [GitHub Discussions](https://github.com/nathanpotter/beexqueue/discussions)