# Arrea

Async process orchestrator and telemetry for Elixir.

Arrea is an OTP-based library that provides parallel process execution, worker management, circuit breaker protection, command validation, and built-in telemetry for monitoring your Elixir applications.

## Quick Start

Add Arrea to your `mix.exs`:

```elixir
def deps do
  [
    {:arrea, "~> 0.1.0"}
  ]
end
```

Arrea starts its supervision tree automatically when added as a dependency. No manual setup required.

### Execute a single command

```elixir
# Shell command
{:ok, result} = Arrea.execute("echo hello")

# Or a function
{:ok, result} = Arrea.execute(fn -> :work end)
```

### Run commands in parallel

```elixir
{:ok, result} = Arrea.run(
  [
    fn -> Process.sleep(100); 1 end,
    fn -> Process.sleep(100); 2 end,
    fn -> Process.sleep(100); 3 end
  ],
  workers: 2
)
```

### Subscribe to events

```elixir
:ok = Arrea.subscribe()

receive do
  {:leader_event, %{type: :finished, worker_id: id}} ->
    IO.puts("Worker #{id} finished")
  {:leader_event, event} ->
    IO.inspect(event, label: "Event")
end

:ok = Arrea.unsubscribe()
```

### Get stats

```elixir
{:ok, stats} = Arrea.stats()
# => %{
#      total_workers: 10,
#      active_workers: 3,
#      completed_tasks: 42,
#      failed_tasks: 2
#    }
```

## Features

- **Parallel execution** — Run commands and functions concurrently with configurable worker pools via `Arrea.run/2`
- **Synchronous execution** — Execute single commands with `Arrea.execute/2`, including shell integration with real timeout cancellation
- **Circuit breaker** — Protect external calls with automatic open/close/half-open state transitions to prevent cascading failures
- **Command validation** — Built-in validation rules blocking dangerous commands (`rm -rf`, `sudo`, `mkfs`, fork bombs, injection patterns)
- **Telemetry** — Rich event system with worker lifecycle, task progress, system metrics, and circuit breaker state tracking
- **Error policies** — Configurable error handling: retry, stop, continue, or custom handlers with retry counts and delays
- **Worker monitoring** — Subscribe to real-time events: worker start, completion, failure, and progress updates
- **Batch execution** — Submit command batches with worker limits and per-worker timeouts
- **ASDF/mise integration** — Runtime version management via `asdf` or `mise` with support for `--asdf-<runtime>` CLI flags and `mise exec` wrapping
- **Custom shell** — Configurable shell per-command (`--shell`), via config (`Arrea.Config.set(:shell, ...)`), or auto-detected from `$SHELL` with automatic config file sourcing
- **Structured results** — `Arrea.Result` and `Arrea.Error` structs for consistent return types

## CLI

Arrea includes a command-line interface built with [Alaja](https://github.com/lorenzo-sf/alaja):

```bash
# Build the escript
mix escript.build

# Run locally
./arrea run --command "echo hello"

# Install to ~/bin
mix install
```

### `arrea run`

Execute shell commands in parallel with progress tracking.

```bash
# Single command
arrea run --command "echo hello"

# Multiple commands (parallel)
arrea run --command "echo a" --command "echo b"

# With worker limit
arrea run --command "sleep 1" --command "sleep 2" --parallel 2

# Custom timeout (ms)
arrea run --command "sleep 10" --timeout 5000

# Quiet mode (suppress progress)
arrea run --command "echo done" --quiet

# Custom shell (single quotes stop the calling shell from expanding $0 first)
arrea run --command 'echo $0' --shell zsh

# With ASDF version
arrea run --command "mix test" --asdf-elixir 1.18.0

# With mise version
arrea run --command "node -v" --mise-node 20.0.0
```

### `arrea config`

Manage Arrea engine configuration at runtime.

```bash
# Show all config
arrea config --show

# Get a value
arrea config get max_workers

# Set a value
arrea config set max_workers 200
arrea config set default_policy stop
arrea config set asdf_enabled true
arrea config set log_level debug
```

### `arrea action`

Execute Arrea commands from JSON input (stdin, file, or inline).

```bash
# From stdin
echo '{"command":"run","args":["--command","echo hello"]}' | arrea action

# From file
arrea action --file ./pipeline.json

# Inline JSON
arrea action --data '{"command":"run","args":["--command","echo hi","--quiet"]}'

# Batch actions
arrea action --data '{
  "actions": [
    {"command": "run", "args": ["--command", "echo first"], "order": 0},
    {"command": "run", "args": ["--command", "echo second", "--quiet"], "order": 1}
  ]
}'
```

## Architecture

```
┌─────────────────────────────────────────────────────┐
│                    Arrea (Facade)                    │
└───────────────────────┬─────────────────────────────┘
                        │
┌───────────────────────▼─────────────────────────────┐
│              Arrea.Leader (GenServer)                │
│     Coordinates execution, manages workers,          │
│     broadcasts {:leader_event, event} to subscribers │
└───────────────────────┬─────────────────────────────┘
                        │
┌───────────────────────▼─────────────────────────────┐
│       Arrea.WorkerSupervisor (DynamicSupervisor)     │
│              Spawns ephemeral workers                │
└───────────────────────┬─────────────────────────────┘
                        │
┌───────────────────────▼─────────────────────────────┐
│            Arrea.Worker (GenServer)                  │
│     Executes individual tasks, handles policies,     │
│     reports progress via Leader                      │
└─────────────────────────────────────────────────────┘

  Arrea.Monitor (GenServer)   — Tracks worker lifecycle, provides stats
  Arrea.CircuitBreaker        — Fault tolerance for external dependencies
```

All processes run under `Arrea.Supervisor` with the `:rest_for_one` strategy, using two Registries (`Arrea.Registry` for workers, `Arrea.CircuitBreaker.Registry` for circuit breakers). With `:rest_for_one`, when a child terminates, the supervisor restarts that child and every child started after it; children started earlier keep running, which limits disruption to active batches.
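
To make the restart behaviour concrete, here is a minimal standard-library sketch of a `:rest_for_one` supervisor. The module name `RestForOneSketch` and the three placeholder `Agent` children are assumptions for illustration only; they stand in for Arrea's real children and do not reflect its actual child order.

```elixir
defmodule RestForOneSketch do
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # Start order matters: when a child crashes, that child and every
    # child listed after it are restarted; earlier children are untouched.
    children = [
      placeholder(:sketch_registry),
      placeholder(:sketch_leader),
      placeholder(:sketch_monitor)
    ]

    Supervisor.init(children, strategy: :rest_for_one)
  end

  # Hypothetical child: a named Agent that holds no meaningful state.
  defp placeholder(name) do
    %{id: name, start: {Agent, :start_link, [fn -> nil end, [name: name]]}}
  end
end
```

Killing `:sketch_leader` in this sketch restarts `:sketch_leader` and `:sketch_monitor`, while `:sketch_registry` keeps its original pid.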

## API

### `Arrea.execute/2`

Execute a single command (binary shell command or zero-arity function).

```elixir
@spec execute(binary() | (-> term()), keyword()) ::
        {:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}
```

Options:

- `:timeout` — Timeout in ms (default: `30_000`). **Real timeout**: cancels execution if exceeded.
- `:retry` — Whether to retry on failure
- `:shell` — Shell to use — highest priority, overrides config and `$SHELL`
- `:shell_config` — Path to shell config file to source (auto-detected by default)
- `:asdf_<runtime>` — Pin runtime version via asdf/mise (e.g. `asdf_elixir: "1.18.0"`)
- `:mise_<runtime>` — Use `mise exec` wrapping (e.g. `mise_node: "20.0.0"`)
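
As an illustration of what a timeout that cancels work can look like for function commands, the sketch below uses the standard `Task.yield/2` and `Task.shutdown/2` idiom. `TimeoutSketch` is a hypothetical module, and this is not a description of how Arrea implements `:timeout` internally.

```elixir
defmodule TimeoutSketch do
  @doc """
  Runs `fun` in a separate process and stops it if it has not
  returned within `timeout` milliseconds.
  """
  def run(fun, timeout \\ 30_000) when is_function(fun, 0) do
    # The task is linked to the caller, so a crash inside `fun` also
    # crashes the caller unless the caller traps exits.
    task = Task.async(fun)

    # yield/2 returns nil on timeout; shutdown/2 then kills the task and
    # returns a late reply if one arrived in the meantime.
    case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
      {:ok, value} -> {:ok, value}
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end
```

For example, `TimeoutSketch.run(fn -> Process.sleep(5_000) end, 50)` gives up after roughly 50 ms and the sleeping process is killed rather than left running.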

### `Arrea.run/2`

Execute multiple commands in parallel.

```elixir
@spec run([binary() | (-> term())], keyword()) ::
        {:ok, Arrea.Result.t()} | {:error, Arrea.Error.t()}
```

Options:

- `:workers` — Max parallel workers (default: `max_workers()`)
- `:timeout` — Total timeout in ms
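
The effect of a worker limit can be pictured with `Task.async_stream/3` from the standard library. This is only a conceptual sketch: `BoundedRunSketch` and its `:task_timeout` option are hypothetical names, the timeout here applies per task rather than to the whole batch, and the return shape differs from `Arrea.run/2`.

```elixir
defmodule BoundedRunSketch do
  @doc """
  Runs zero-arity functions with at most `:workers` running at once.
  Results come back in input order.
  """
  def run(funs, opts \\ []) when is_list(funs) do
    workers = Keyword.get(opts, :workers, System.schedulers_online())
    task_timeout = Keyword.get(opts, :task_timeout, 30_000)

    funs
    |> Task.async_stream(fn fun -> fun.() end,
      max_concurrency: workers,
      timeout: task_timeout,
      # Kill a task that exceeds its timeout instead of crashing the caller.
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, value} -> {:ok, value}
      {:exit, reason} -> {:error, reason}
    end)
  end
end
```

With `workers: 2` and three functions, the third function starts only once one of the first two has finished.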

### `Arrea.subscribe/0` / `Arrea.unsubscribe/0`

Subscribe (or unsubscribe) the calling process to Leader events.

Messages received are `{:leader_event, event}` where `event` is a map with at least a `:type` key:

| Type               | Extra keys                               |
| ------------------ | ---------------------------------------- |
| `:worker_started`  | `worker_id`                              |
| `:progress`        | `worker_id`, `percent`, `task_index`, `total` |
| `:finished`        | `worker_id`                              |
| `:error`           | `worker_id`, `reason`                    |
| `:result`          | `worker_id`, `data`                      |

```elixir
@spec subscribe() :: :ok
@spec unsubscribe() :: :ok
```
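
A subscriber usually needs to handle more than one message. The sketch below collects `{:leader_event, event}` messages in a loop until a given number of workers have reported `:finished` or `:error`. `EventLoopSketch` is a hypothetical helper, and it assumes each worker emits exactly one of those two terminal events, which this README does not guarantee.

```elixir
defmodule EventLoopSketch do
  @doc """
  Collects leader events from the caller's mailbox until `expected`
  workers have finished or failed, or no event arrives for
  `idle_timeout` milliseconds.
  """
  def collect(expected, idle_timeout \\ 5_000) when expected >= 0 do
    loop(expected, idle_timeout, [])
  end

  defp loop(0, _idle_timeout, acc), do: {:ok, Enum.reverse(acc)}

  defp loop(remaining, idle_timeout, acc) do
    receive do
      # Terminal events count towards the expected total.
      {:leader_event, %{type: type} = event} when type in [:finished, :error] ->
        loop(remaining - 1, idle_timeout, [event | acc])

      # Everything else (started, progress, result) is just recorded.
      {:leader_event, event} ->
        loop(remaining, idle_timeout, [event | acc])
    after
      idle_timeout -> {:timeout, Enum.reverse(acc)}
    end
  end
end
```

In practice this would be called between `Arrea.subscribe()` and `Arrea.unsubscribe()`, from the process that subscribed.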

### `Arrea.stats/0`

Get current engine statistics (provided by `Arrea.Monitor`).

```elixir
@spec stats() :: {:ok, map()} | {:error, :monitor_unavailable}
```

### `Arrea.max_workers/0`

Get the configured max workers.

```elixir
@spec max_workers() :: non_neg_integer()
```

## Configuration

### Priority (lowest to highest)

For use **as a library**:

1. `@default` in `Arrea.Config` — compile-time baseline
2. `config :arrea, :engine, [...]` in the consuming project's `config.exs` — overrides defaults
3. `Arrea.Config.set/2` at runtime — overrides static config for the current session
4. Opts passed directly to functions — highest priority, per-call

For use **as a CLI binary**:

1. `@default` baseline
2. Application env (from `config.exs` if applicable)
3. `arrea config set KEY VALUE` — session-level, persists while the binary process is running
4. CLI args — highest priority, per-invocation only
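
The precedence order above amounts to merging layers from lowest to highest priority. Here is a minimal sketch of that idea, assuming a hypothetical `LayeredConfigSketch` module and a shortened set of defaults taken from the configuration table; it is not the code in `Arrea.Config`.

```elixir
defmodule LayeredConfigSketch do
  @doc """
  Resolves the effective config. Later layers override earlier ones:
  defaults < application env < runtime overrides < per-call opts.
  Each layer may be a keyword list or a map with atom keys.
  """
  def resolve(app_env, runtime, opts) do
    defaults()
    |> Keyword.merge(to_keyword(app_env))
    |> Keyword.merge(to_keyword(runtime))
    |> Keyword.merge(to_keyword(opts))
  end

  # Illustrative subset of the documented defaults.
  defp defaults, do: [max_workers: 100, default_policy: :retry, log_level: :info]

  defp to_keyword(layer) when is_map(layer), do: Map.to_list(layer)
  defp to_keyword(layer) when is_list(layer), do: layer
end
```

For instance, `LayeredConfigSketch.resolve(%{max_workers: 50}, [log_level: :debug], max_workers: 4)` resolves `max_workers` to `4` because the per-call option wins, while `default_policy` stays at its default.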

### `config.exs` example

Accepts both keyword list and map:


```elixir
config :arrea, :engine,
  max_workers: 100,
  max_commands_per_batch: 500,
  default_policy: :retry,
  max_retries: 3,
  retry_delay: 1_000,
  restart_limit: 3,
  circuit_breaker_threshold: 5,
  circuit_breaker_timeout: 60_000,
  validation_rules: [:no_rm_rf, :no_sudo, :no_dd, :no_mkfs, :no_fork_bomb],
  telemetry_enabled: true,
  log_level: :info
```

| Key                         | Type    | Default  | Description                                     |
| --------------------------- | ------- | -------- | ----------------------------------------------- |
| `max_workers`               | integer | `100`    | Maximum parallel workers                        |
| `max_commands_per_batch`    | integer | `500`    | Max commands per batch                          |
| `default_policy`            | atom    | `:retry` | Default error policy for workers                |
| `max_retries`               | integer | `3`      | Max retry attempts                              |
| `retry_delay`               | integer | `1_000`  | Delay between retries (ms)                      |
| `restart_limit`             | integer | `3`      | Worker restart limit                            |
| `circuit_breaker_threshold` | integer | `5`      | Failures before circuit opens                   |
| `circuit_breaker_timeout`   | integer | `60_000` | Time before half-open attempt (ms)             |
| `validation_rules`          | list    | see below | Blocked command patterns                     |
| `asdf_enabled`              | boolean | `true`   | Enable ASDF version management                 |
| `telemetry_enabled`         | boolean | `true`   | Enable telemetry                               |
| `log_level`                 | atom    | `:info`  | Log verbosity                                  |
| `shell`                     | string  | `nil`    | Default shell (e.g. `"/bin/zsh"`)             |

**Validation rules** (default):
- `:no_rm_rf` — blocks `rm -rf`
- `:no_sudo` — blocks `sudo`
- `:no_dd` — blocks `dd`
- `:no_mkfs` — blocks `mkfs`
- `:no_fork_bomb` — blocks fork bombs

### Runtime config

```elixir
Arrea.Config.get(:max_workers)      # => 100
Arrea.Config.set(:max_workers, 50)  # persists for the current VM session
Arrea.Config.all()                  # => full effective config map
```

## Telemetry Events

Arrea emits the following `:telemetry` events:

### Worker events

| Event                           | Measurements | Metadata              |
| ------------------------------- | ------------ | --------------------- |
| `[:arrea, :worker, :started]`   | —            | `worker_id`           |
| `[:arrea, :worker, :completed]` | `duration`   | `worker_id`           |
| `[:arrea, :worker, :error]`     | —            | `worker_id`, `reason` |
| `[:arrea, :worker, :message]`   | —            | `worker_id`           |

### Task events

| Event                         | Measurements | Metadata              |
| ----------------------------- | ------------ | --------------------- |
| `[:arrea, :task, :started]`   | —            | —                     |
| `[:arrea, :task, :completed]` | `duration`   | —                     |
| `[:arrea, :task, :error]`     | —            | `worker_id`, `reason` |

### Engine events

| Event                                 | Measurements | Metadata             |
| ------------------------------------- | ------------ | -------------------- |
| `[:arrea, :engine, :execute, :start]` | —            | `command`            |
| `[:arrea, :engine, :execute, :stop]`  | `duration`   | `command`, `success` |
| `[:arrea, :engine, :execute, :error]` | `duration`   | `command`, `reason`  |
| `[:arrea, :engine, :run, :start]`     | —            | `count`, `workers`   |
| `[:arrea, :engine, :run, :stop]`      | —            | `batch_id`           |

### Circuit breaker events

| Event                                 | Measurements | Metadata                      |
| ------------------------------------- | ------------ | ----------------------------- |
| `[:arrea, :circuit_breaker, :open]`   | —            | `breaker_id`                  |
| `[:arrea, :circuit_breaker, :closed]` | —            | `breaker_id`                  |
| `[:arrea, :circuit_breaker, :trip]`   | —            | `breaker_id`, `failure_count` |
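
The closed, open, and half-open transitions behind these events can be sketched as a small pure state machine. `BreakerSketch` is a hypothetical module, the caller passes the current time in milliseconds explicitly, and the default `threshold` and `timeout` mirror `circuit_breaker_threshold` and `circuit_breaker_timeout`. It is an illustration of the pattern, not `Arrea.CircuitBreaker` itself.

```elixir
defmodule BreakerSketch do
  defstruct state: :closed, failures: 0, opened_at: nil, threshold: 5, timeout: 60_000

  def new(opts \\ []), do: struct!(__MODULE__, opts)

  @doc "Decides whether a call may proceed at time `now` (ms)."
  def check(%__MODULE__{state: :open} = breaker, now) do
    if now - breaker.opened_at >= breaker.timeout do
      # Timeout elapsed: let one trial call through.
      {:ok, %{breaker | state: :half_open}}
    else
      {:blocked, breaker}
    end
  end

  def check(%__MODULE__{} = breaker, _now), do: {:ok, breaker}

  @doc "A success closes the circuit and resets the failure count."
  def record_success(%__MODULE__{} = breaker) do
    %{breaker | state: :closed, failures: 0, opened_at: nil}
  end

  @doc "A failure in half-open state, or reaching the threshold, opens the circuit."
  def record_failure(%__MODULE__{state: :half_open} = breaker, now), do: trip(breaker, now)

  def record_failure(%__MODULE__{} = breaker, now) do
    breaker = %{breaker | failures: breaker.failures + 1}
    if breaker.failures >= breaker.threshold, do: trip(breaker, now), else: breaker
  end

  defp trip(breaker, now), do: %{breaker | state: :open, opened_at: now}
end
```

Keeping the time as an argument makes the transitions easy to test without sleeping.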

### Communication events

| Event                                         | Measurements | Metadata |
| --------------------------------------------- | ------------ | -------- |
| `[:arrea, :communication, :message_sent]`     | —            | —        |
| `[:arrea, :communication, :message_received]` | —            | —        |
| `[:arrea, :communication, :error]`            | —            | —        |
| `[:arrea, :communication, :retry]`            | —            | —        |

### UI events (CLI / alaja components)

| Event                     | Measurements | Metadata  |
| ------------------------- | ------------ | --------- |
| `[:arrea, :ui, :render]`  | —            | —         |
| `[:arrea, :ui, :keypress]`| —            | —         |
| `[:arrea, :ui, :focus_change]` | —       | —         |

### Validation / Execution / System events

| Event                              | Measurements | Metadata |
| ---------------------------------- | ------------ | -------- |
| `[:arrea, :validation, :passed]`   | —            | —        |
| `[:arrea, :validation, :failed]`   | —            | —        |
| `[:arrea, :execution, :started]`   | —            | —        |
| `[:arrea, :execution, :completed]` | —            | —        |
| `[:arrea, :execution, :failed]`    | —            | —        |
| `[:arrea, :system, :started]`      | —            | —        |
| `[:arrea, :system, :stopped]`      | —            | —        |

### Attaching a custom handler

```elixir
:telemetry.attach(
  "my-handler",
  [:arrea, :worker, :completed],
  fn _event, measurements, metadata, _config ->
    IO.puts("Worker #{metadata.worker_id} finished in #{measurements.duration}ms")
  end,
  nil
)
```
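
The `:telemetry` library recommends passing handlers as captured module functions (`&Module.function/4`) rather than anonymous functions, which it flags as slower. A module-based variant might look like the sketch below. `WorkerTelemetrySketch` is a hypothetical name, and the duration is printed without a unit because this README does not state which unit Arrea reports.

```elixir
defmodule WorkerTelemetrySketch do
  require Logger

  @doc "Handler with the 4-arity shape that :telemetry expects."
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  @doc "Builds a log line for the worker events listed in this README."
  def describe([:arrea, :worker, :completed], measurements, metadata) do
    "worker #{inspect(metadata[:worker_id])} completed, " <>
      "duration=#{inspect(measurements[:duration])}"
  end

  def describe([:arrea, :worker, :error], _measurements, metadata) do
    "worker #{inspect(metadata[:worker_id])} failed: #{inspect(metadata[:reason])}"
  end

  # Fallback so an unexpected event never crashes the handler.
  def describe(event, _measurements, _metadata) do
    "unhandled event #{inspect(event)}"
  end
end
```

To use it, pass `&WorkerTelemetrySketch.handle_event/4` as the handler to `:telemetry.attach/4`, or to `:telemetry.attach_many/4` with a list of event names.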

### Built-in metrics and debug

```elixir
# Setup built-in ETS metrics (worker/task/circuit breaker counters)
Arrea.Telemetry.setup()

# Get current metrics snapshot
Arrea.Telemetry.get_current()

# Attach debug handler for development
Arrea.Telemetry.attach()

# Measure a function with telemetry
Arrea.Telemetry.measure(fn -> do_work() end, metadata: %{tag: "batch-1"})
```

## Policies

Arrea provides configurable error policies for workers:

```elixir
# Default policy (retry 3 times with 1s delay)
policy = Arrea.Policies.default()

# Strict policy (stop on first error)
policy = Arrea.Policies.strict()

# Tolerant policy (retry up to 10 times with 2s delay)
policy = Arrea.Policies.tolerant(max_retries: 10, retry_delay: 2000)

# Custom handler
policy = Arrea.Policies.custom(fn _error, retry_count, _context ->
  if retry_count < 5, do: :retry, else: :stop
end)
```

Workers without an explicit policy fall back to `Arrea.Config.get(:default_policy)`, which defaults to `:retry`.

Policy maps support the following fields:

| Field         | Type                                                | Default  | Description                |
| ------------- | --------------------------------------------------- | -------- | -------------------------- |
| `on_error`    | `:retry \| :stop \| :continue \| function`          | `:retry` | Action on task error       |
| `on_warning`  | `:log \| :notify \| :continue \| :promote_to_error` | `:log`   | Action on warning          |
| `on_timeout`  | `:retry \| :stop \| :continue`                      | `:retry` | Action on timeout          |
| `max_retries` | integer                                             | `3`      | Maximum retry attempts     |
| `retry_delay` | integer                                             | `1_000`  | Delay between retries (ms) |
| `timeout`     | integer                                             | `30_000` | Per-task timeout (ms)      |
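
To show how the `on_error`, `max_retries`, and `retry_delay` fields could interact, here is a minimal retry loop. `RetrySketch` is a hypothetical module, the task is assumed to return `{:ok, value}` or `{:error, reason}`, and the custom handler receives an empty map as its context. Arrea's workers may apply policies differently.

```elixir
defmodule RetrySketch do
  @doc """
  Runs `fun` and applies the policy when it returns `{:error, reason}`.
  """
  def run(fun, policy \\ %{}) when is_function(fun, 0) do
    defaults = %{on_error: :retry, max_retries: 3, retry_delay: 1_000}
    attempt(fun, Map.merge(defaults, policy), 0)
  end

  defp attempt(fun, policy, retry_count) do
    max_retries = policy.max_retries

    case fun.() do
      {:ok, value} ->
        {:ok, value}

      {:error, reason} ->
        case decide(policy.on_error, reason, retry_count) do
          :retry when retry_count < max_retries ->
            Process.sleep(policy.retry_delay)
            attempt(fun, policy, retry_count + 1)

          :continue ->
            # Report the failure but let the caller carry on.
            {:continued, reason}

          _stop_or_retries_exhausted ->
            {:error, reason}
        end
    end
  end

  # A 3-arity function is treated as a custom handler; atoms pass through.
  defp decide(handler, reason, retry_count) when is_function(handler, 3) do
    handler.(reason, retry_count, %{})
  end

  defp decide(action, _reason, _retry_count), do: action
end
```

Note that even a custom handler that always answers `:retry` is capped by `max_retries` in this sketch, so a misbehaving handler cannot loop forever.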

## Command Validation

Arrea validates all shell commands before execution, blocking dangerous patterns:

```elixir
iex> Arrea.Validation.Validator.validate_command("echo hello")
{:ok, "echo hello"}

iex> Arrea.Validation.Validator.validate_command("rm -rf /")
{:error, {:dangerous_command, "rm -rf"}}

iex> Arrea.Validation.Validator.validate_command("$(whoami)")
{:error, :possible_injection}
```
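
A rule-based check of this kind can be sketched with a few regular expressions. `ValidatorSketch` and its patterns are assumptions made for illustration; only the return shapes are taken from the examples above. Simple pattern matching like this is easy to bypass (for example `rm -r -f`), so it should be read as a picture of the idea rather than Arrea's actual rule set.

```elixir
defmodule ValidatorSketch do
  @doc """
  Returns `{:ok, command}` when no rule matches, otherwise an error
  tuple naming the first rule that did.
  """
  def validate_command(command) when is_binary(command) do
    dangerous =
      Enum.find_value(dangerous_patterns(), fn {label, pattern} ->
        if Regex.match?(pattern, command), do: label
      end)

    cond do
      dangerous -> {:error, {:dangerous_command, dangerous}}
      Regex.match?(injection_pattern(), command) -> {:error, :possible_injection}
      true -> {:ok, command}
    end
  end

  # Hypothetical rules: {label reported to the caller, pattern}.
  defp dangerous_patterns do
    [
      {"rm -rf", ~r/\brm\s+-rf\b/},
      {"sudo", ~r/\bsudo\b/},
      {"mkfs", ~r/\bmkfs\b/}
    ]
  end

  # Command substitution via $(...) or backticks.
  defp injection_pattern, do: ~r/\$\(|`/
end
```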

## Inter-worker Messaging

Workers can send messages to each other:

```elixir
# Structured message
Arrea.Worker.send_message(:worker_1, %{type: :ping})

# Route a message to another worker
Arrea.Worker.send_message(:worker_1, {:send_to_worker, :worker_2, %{type: :data, value: 42}})
```
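
The routing form `{:send_to_worker, target, payload}` can be pictured with a small `GenServer`. `WorkerSketch`, its `inbox/1` helper, and the registered names below are hypothetical, and the real `Arrea.Worker` may look up and deliver messages differently.

```elixir
defmodule WorkerSketch do
  use GenServer

  def start_link(name), do: GenServer.start_link(__MODULE__, [], name: name)

  @doc "Sends a message to a worker without waiting for a reply."
  def send_message(worker, message), do: GenServer.cast(worker, {:message, message})

  @doc "Returns the messages a worker has received, oldest first."
  def inbox(worker), do: GenServer.call(worker, :inbox)

  @impl true
  def init(inbox), do: {:ok, inbox}

  @impl true
  def handle_cast({:message, {:send_to_worker, target, payload}}, inbox) do
    # Routing request: forward the payload instead of storing it here.
    send_message(target, payload)
    {:noreply, inbox}
  end

  def handle_cast({:message, message}, inbox) do
    {:noreply, [message | inbox]}
  end

  @impl true
  def handle_call(:inbox, _from, inbox) do
    {:reply, Enum.reverse(inbox), inbox}
  end
end
```

In this sketch the first worker acts purely as a relay for routed messages: it keeps structured messages addressed to itself and forwards the rest to the named target.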

## Dependencies

- **alaja** — Internal UI/CLI utility library (powers the Arrea CLI)
- **jason** — JSON encoding/decoding
- **telemetry** — Event emission and handling
- **telemetry_metrics** — Metric definitions
- **telemetry_poller** — Periodic metric collection

## Installation

Add `arrea` to your `mix.exs` dependencies:

```elixir
def deps do
  [
    {:arrea, "~> 0.1.0"}
  ]
end
```

Then run:

```bash
mix deps.get
```

## License

MIT License. See the [source repository](https://github.com/lorenzo-sf/arrea) for details.