# Snakepit

<div align="center">
  <img src="assets/snakepit-logo.svg" alt="Snakepit Logo" width="200" height="200">
</div>

> A high-performance, generalized process pooler and session manager for external language integrations in Elixir

[![Hex Version](https://img.shields.io/hexpm/v/snakepit.svg)](https://hex.pm/packages/snakepit)
[![Hex Docs](https://img.shields.io/badge/hex-docs-lightgreen.svg)](https://hexdocs.pm/snakepit)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Elixir](https://img.shields.io/badge/Elixir-1.18%2B-purple.svg)](https://elixir-lang.org)

## Features

- **High-performance process pooling** with concurrent worker initialization
- **Session affinity** for stateful operations across requests
- **gRPC streaming** for real-time progress updates and large data transfers
- **Bidirectional tool bridge** allowing Python to call Elixir functions and vice versa
- **Production-ready process management** with automatic orphan cleanup
- **Hardware detection** for ML accelerators (CUDA, MPS, ROCm)
- **Fault tolerance** with circuit breakers, retry policies, and crash barriers
- **Comprehensive telemetry** with OpenTelemetry support
- **Dual worker profiles** (process isolation or threaded parallelism)
- **Zero-copy data interop** via DLPack and Arrow

## Installation

Add `snakepit` to your dependencies in `mix.exs`:

```elixir
def deps do
  [
    {:snakepit, "~> 0.8.3"}
  ]
end
```

Then run:

```bash
mix deps.get
mix snakepit.setup    # Install Python dependencies and generate gRPC stubs
mix snakepit.doctor   # Verify environment is correctly configured
```

## Quick Start

```elixir
# Execute a command on any available worker
{:ok, result} = Snakepit.execute("ping", %{})

# Execute with session affinity (same worker for related requests)
{:ok, result} = Snakepit.execute_in_session("session_123", "process_data", %{input: data})

# Stream results for long-running operations
Snakepit.execute_stream("batch_process", %{items: items}, fn chunk ->
  IO.puts("Progress: #{chunk["progress"]}%")
end)
```

## Configuration

### Simple Configuration

```elixir
# config/config.exs
config :snakepit,
  pooling_enabled: true,
  adapter_module: Snakepit.Adapters.GRPCPython,
  adapter_args: ["--adapter", "your_adapter_module"],
  pool_size: 10,
  log_level: :error
```

### Multi-Pool Configuration (v0.6+)

```elixir
config :snakepit,
  pools: [
    %{
      name: :default,
      worker_profile: :process,
      pool_size: 10,
      adapter_module: Snakepit.Adapters.GRPCPython,
      adapter_args: ["--adapter", "my_app.adapters.MainAdapter"]
    },
    %{
      name: :compute,
      worker_profile: :thread,
      pool_size: 4,
      threads_per_worker: 8,
      adapter_args: ["--adapter", "my_app.adapters.ComputeAdapter"]
    }
  ]
```

### Logging Configuration

Snakepit is quiet by default and logs only errors:

```elixir
config :snakepit, log_level: :error          # Default - errors only
config :snakepit, log_level: :info           # Include info messages
config :snakepit, log_level: :debug          # Verbose debugging
config :snakepit, log_level: :none           # Complete silence

# Filter to specific categories
config :snakepit, log_level: :debug, log_categories: [:grpc, :pool]
```

## Core API

### Basic Execution

```elixir
# Simple command execution
{:ok, result} = Snakepit.execute("command_name", %{param: "value"})

# With timeout
{:ok, result} = Snakepit.execute("slow_command", %{}, timeout: 30_000)

# Target specific pool
{:ok, result} = Snakepit.execute("ml_inference", %{}, pool: :compute)
```

### Session Affinity

Sessions route related requests to the same worker, enabling stateful operations:

```elixir
session_id = "user_#{user.id}"

# First call establishes worker affinity
{:ok, _} = Snakepit.execute_in_session(session_id, "load_model", %{model: "gpt-4"})

# Subsequent calls go to the same worker
{:ok, result} = Snakepit.execute_in_session(session_id, "generate", %{prompt: "Hello"})
{:ok, result} = Snakepit.execute_in_session(session_id, "generate", %{prompt: "Continue"})
```
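
The routing idea behind session affinity can be illustrated with a small, self-contained Python sketch. It is not Snakepit's implementation: `StickyRouter` and its methods are hypothetical names, and round-robin assignment of new sessions is an assumption made for the example.

```python
import itertools


class StickyRouter:
    """Illustrative only: pins each session to one worker (not Snakepit's code)."""

    def __init__(self, worker_ids):
        if not worker_ids:
            raise ValueError("at least one worker is required")
        self._workers = list(worker_ids)
        self._round_robin = itertools.cycle(self._workers)
        self._affinity = {}  # session_id -> worker_id

    def route(self, session_id):
        # The first call for a session picks the next worker; later calls reuse it.
        if session_id not in self._affinity:
            self._affinity[session_id] = next(self._round_robin)
        return self._affinity[session_id]

    def release(self, session_id):
        # Forget the mapping, e.g. when the session ends or its worker dies.
        self._affinity.pop(session_id, None)


router = StickyRouter(["worker_1", "worker_2", "worker_3"])
first = router.route("user_42")
assert router.route("user_42") == first  # same worker on every call
```

Because the mapping lives with the router, any state a worker builds up for a session (such as a loaded model) stays reachable for as long as the mapping exists.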

### Streaming Operations

```elixir
# Stream with callback for progress updates
Snakepit.execute_stream("train_model", %{epochs: 100}, fn chunk ->
  case chunk do
    %{"type" => "progress", "epoch" => n, "loss" => loss} ->
      IO.puts("Epoch #{n}: loss=#{loss}")
    %{"type" => "complete", "model_path" => path} ->
      IO.puts("Training complete: #{path}")
    _other ->
      # Ignore chunk shapes this handler does not know about
      :ok
  end
end)

# Stream with session affinity
Snakepit.execute_in_session_stream(session_id, "process_batch", %{}, fn chunk ->
  handle_chunk(chunk)
end)
```

### Pool Statistics

```elixir
# Get pool statistics
stats = Snakepit.get_stats()
# => %{requests: 1523, errors: 2, queued: 0, queue_timeouts: 0}

# List worker IDs
workers = Snakepit.list_workers()
# => ["worker_1", "worker_2", "worker_3"]

# Wait for pool to be ready (useful in tests)
:ok = Snakepit.Pool.await_ready(:default, 10_000)
```

## Worker Profiles

### Process Profile (Default)

One Python process per worker. Provides full process isolation and works with every supported Python version (3.9+).

```elixir
%{
  name: :default,
  worker_profile: :process,
  pool_size: 10,
  startup_batch_size: 4,      # Spawn 4 workers at a time
  startup_batch_delay_ms: 500 # Wait between batches
}
```

Best for: I/O-bound workloads, maximum isolation, Python < 3.13.
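
The effect of `startup_batch_size` and `startup_batch_delay_ms` can be pictured with a short Python sketch. This is an illustration under assumptions, not Snakepit's startup code: `start_in_batches` and `start_worker` are hypothetical names, and workers inside a batch are started one after another here for brevity, whereas a real pool would likely start them concurrently.

```python
import time


def start_in_batches(worker_ids, start_worker, batch_size=4, batch_delay_s=0.5,
                     sleep=time.sleep):
    """Start workers batch by batch, pausing between batches (illustrative sketch)."""
    started = []
    for offset in range(0, len(worker_ids), batch_size):
        batch = worker_ids[offset:offset + batch_size]
        started.extend(start_worker(worker_id) for worker_id in batch)
        if offset + batch_size < len(worker_ids):
            sleep(batch_delay_s)  # skip the pause after the last batch
    return started


# Ten workers in batches of four: batches of 4, 4, and 2 with two pauses.
ids = [f"worker_{n}" for n in range(1, 11)]
print(len(start_in_batches(ids, lambda wid: wid, batch_delay_s=0.0)))  # 10
```

Spreading startup this way limits how many interpreter launches compete for CPU and disk at the same moment.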

### Thread Profile (Python 3.13+)

Multiple threads within each Python process. Threads share memory and run in parallel on free-threaded Python builds.

```elixir
%{
  name: :compute,
  worker_profile: :thread,
  pool_size: 4,
  threads_per_worker: 8,
  thread_safety_checks: true
}
```

Best for: CPU-bound ML workloads, large shared models, Python 3.13+ with free-threading.

## Hardware Detection

Snakepit detects available ML accelerators so callers can select a device with an explicit fallback order:

```elixir
# Detect all hardware
info = Snakepit.Hardware.detect()
# => %{accelerator: :cuda, cpu: %{cores: 8, ...}, cuda: %{devices: [...]}}

# Check capabilities
caps = Snakepit.Hardware.capabilities()
# => %{cuda: true, mps: false, rocm: false, avx2: true}

# Select device with fallback chain
{:ok, device} = Snakepit.Hardware.select_with_fallback([:cuda, :mps, :cpu])
# => {:ok, {:cuda, 0}}

# Generate hardware identity for lock files
identity = Snakepit.Hardware.identity()
File.write!("hardware.lock", Jason.encode!(identity))
```

Supported accelerators: CPU (with AVX/AVX2/AVX-512 detection), NVIDIA CUDA, Apple MPS, AMD ROCm.
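
A fallback chain amounts to "return the first preferred device that is present". The Python sketch below shows that rule against a capabilities mapping shaped like the one above. The function is a hypothetical stand-in rather than the Elixir `select_with_fallback`, and treating the CPU as always available is an assumption.

```python
def select_with_fallback(preferences, capabilities):
    """Return the first preferred device that is available (illustrative sketch)."""
    for device in preferences:
        # Assumption: the CPU is always usable, accelerators only when detected.
        if device == "cpu" or capabilities.get(device, False):
            return ("ok", device)
    return ("error", "no_device")


caps = {"cuda": False, "mps": True, "rocm": False}
print(select_with_fallback(["cuda", "mps", "cpu"], caps))  # ('ok', 'mps')
```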

## Fault Tolerance

### Circuit Breaker

Prevents cascading failures by temporarily blocking requests after repeated failures:

```elixir
{:ok, cb} = Snakepit.CircuitBreaker.start_link(
  failure_threshold: 5,
  reset_timeout_ms: 30_000
)

# Execute through circuit breaker
case Snakepit.CircuitBreaker.call(cb, fn -> external_api_call() end) do
  {:ok, result} -> handle_result(result)
  {:error, :circuit_open} -> {:error, :service_unavailable}
  {:error, reason} -> {:error, reason}
end

# Check state
Snakepit.CircuitBreaker.state(cb)  # => :closed | :open | :half_open
```
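
The three states can be made concrete with a minimal state machine. The Python sketch below is illustrative and is not Snakepit's implementation: the class and method names are chosen to mirror the example above, the timeout is expressed in seconds, and reopening the circuit on a failed trial call is an assumption about typical circuit-breaker behavior.

```python
import time


class CircuitBreaker:
    """Minimal closed/open/half-open state machine (illustrative sketch)."""

    def __init__(self, failure_threshold=5, reset_timeout_s=30.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout_s = reset_timeout_s
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    def state(self):
        if self._opened_at is None:
            return "closed"
        if self._clock() - self._opened_at >= self.reset_timeout_s:
            return "half_open"  # timeout elapsed: allow a trial call
        return "open"

    def call(self, fn):
        if self.state() == "open":
            return ("error", "circuit_open")  # fail fast without calling fn
        try:
            result = fn()
        except Exception as exc:
            self._failures += 1
            # A failed trial call, or too many failures, (re)opens the circuit.
            if self._opened_at is not None or self._failures >= self.failure_threshold:
                self._opened_at = self._clock()
            return ("error", exc)
        # Any success closes the circuit and resets the failure count.
        self._failures = 0
        self._opened_at = None
        return ("ok", result)
```

While the circuit is open, callers get an immediate `circuit_open` error instead of waiting on a dependency that is already failing.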

### Retry Policies

```elixir
policy = Snakepit.RetryPolicy.new(
  max_attempts: 4,
  backoff_ms: [100, 200, 400, 800],
  jitter: true,
  retriable_errors: [:timeout, :unavailable]
)

# Or pass retry options directly to the Executor
Snakepit.Executor.execute_with_retry(
  fn -> flaky_operation() end,
  max_attempts: 3,
  backoff_ms: [100, 200, 400]
)
```
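
How a delay list and jitter might combine is sketched below in Python. This is not Snakepit's retry logic: the `retry` helper is hypothetical, reusing the last delay when the list runs out is an assumption, and so is the jitter formula (a random factor between 0.5 and 1.5).

```python
import random
import time


def retry(fn, max_attempts=3, backoff_ms=(100, 200, 400), jitter=True,
          sleep=time.sleep, rng=random.random):
    """Call fn until it succeeds or max_attempts is reached (illustrative sketch)."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error = None
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception as exc:
            last_error = exc
            if attempt == max_attempts - 1:
                break  # no sleep after the final attempt
            # Assumption: reuse the last delay when the list is shorter than needed.
            delay_ms = backoff_ms[min(attempt, len(backoff_ms) - 1)]
            if jitter:
                # Assumption: scale the delay by a random factor in [0.5, 1.5).
                delay_ms *= 0.5 + rng()
            sleep(delay_ms / 1000.0)
    raise last_error
```

Jitter keeps many clients that failed at the same moment from retrying in lockstep.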

### Health Monitoring

```elixir
{:ok, hm} = Snakepit.HealthMonitor.start_link(
  pool: :default,
  max_crashes: 5,
  crash_window_ms: 60_000
)

Snakepit.HealthMonitor.healthy?(hm)  # => true | false
Snakepit.HealthMonitor.stats(hm)     # => %{total_crashes: 2, ...}
```
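
The `max_crashes` and `crash_window_ms` options suggest a sliding-window check. A minimal Python sketch of that idea follows; `CrashWindow` is a hypothetical name, the window is expressed in seconds, and treating "reached `max_crashes` inside the window" as unhealthy is an assumption about the threshold semantics.

```python
import time
from collections import deque


class CrashWindow:
    """Counts crashes inside a sliding time window (illustrative sketch)."""

    def __init__(self, max_crashes=5, crash_window_s=60.0, clock=time.monotonic):
        self.max_crashes = max_crashes
        self.crash_window_s = crash_window_s
        self._clock = clock
        self._crashes = deque()  # timestamps of recent crashes, oldest first
        self.total_crashes = 0

    def record_crash(self):
        self._crashes.append(self._clock())
        self.total_crashes += 1

    def healthy(self):
        # Drop crashes that have aged out of the window, then compare.
        cutoff = self._clock() - self.crash_window_s
        while self._crashes and self._crashes[0] <= cutoff:
            self._crashes.popleft()
        # Assumption: reaching max_crashes within the window counts as unhealthy.
        return len(self._crashes) < self.max_crashes
```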

### Executor Helpers

```elixir
# With timeout
Snakepit.Executor.execute_with_timeout(fn -> slow_op() end, timeout_ms: 5_000)

# With retry
Snakepit.Executor.execute_with_retry(fn -> flaky_op() end, max_attempts: 3)

# Combined retry + circuit breaker
Snakepit.Executor.execute_with_protection(circuit_breaker, fn ->
  risky_operation()
end, max_attempts: 3)

# Batch execution
Snakepit.Executor.execute_batch(
  [fn -> op1() end, fn -> op2() end, fn -> op3() end],
  max_concurrency: 2
)
```

## Python Adapters

### Creating an Adapter

```python
# my_adapter.py
from snakepit_bridge import BaseAdapter, tool

class MyAdapter(BaseAdapter):
    def __init__(self):
        super().__init__()
        self.model = None

    async def initialize(self):
        """Called once when worker starts."""
        self.model = load_model()

    async def cleanup(self):
        """Called when worker shuts down."""
        self.model = None

    @tool(description="Run inference on input data")
    def predict(self, input_data: dict) -> dict:
        result = self.model.predict(input_data["text"])
        return {"prediction": result, "confidence": 0.95}

    @tool(description="Process with progress updates", supports_streaming=True)
    def batch_process(self, items: list):
        for i, item in enumerate(items):
            result = self.process_item(item)
            yield {"progress": (i + 1) / len(items) * 100, "result": result}
```

### Thread-Safe Adapters (Python 3.13+)

```python
from snakepit_bridge import ThreadSafeAdapter, tool, thread_safe_method

class ThreadedAdapter(ThreadSafeAdapter):
    __thread_safe__ = True

    def __init__(self):
        super().__init__()
        self.shared_model = None  # Read-only after init
        self.config = {}          # Shared mutable state, guarded by acquire_lock()

    async def initialize(self):
        self.shared_model = load_model()  # Safe: happens before requests

    @tool
    @thread_safe_method
    def predict(self, text: str) -> dict:
        # Thread-local cache
        cache = self.get_thread_local("cache", default={})
        if text in cache:
            return cache[text]

        result = self.shared_model.predict(text)
        cache[text] = result
        self.set_thread_local("cache", cache)
        return result

    @tool
    @thread_safe_method
    def update_config(self, config: dict):
        # Protect shared mutable state
        with self.acquire_lock():
            self.config.update(config)
```

### Session Context and Elixir Tool Calls

```python
@tool(description="Process using Elixir tools")
def hybrid_process(self, data: dict) -> dict:
    # Access session information
    session_id = self.session_context.session_id

    # Call a registered Elixir tool
    validation = self.session_context.call_elixir_tool(
        "validate_data",
        {"data": data, "schema": "user"}
    )

    if validation["valid"]:
        return self.process(data)
    else:
        return {"error": validation["errors"]}
```

## Bidirectional Tool Bridge

### Registering Elixir Tools

```elixir
# Register an Elixir function callable from Python
Snakepit.Bridge.ToolRegistry.register_elixir_tool(
  session_id,
  "calculate_hash",
  fn params ->
    hash = :crypto.hash(:sha256, params["data"]) |> Base.encode16()
    %{hash: hash, algorithm: "sha256"}
  end,
  %{
    description: "Calculate SHA256 hash of data",
    exposed_to_python: true
  }
)
```

### Calling from Python

```python
# Python adapter can call registered Elixir tools
result = self.session_context.call_elixir_tool("calculate_hash", {"data": "hello"})
# => {"hash": "2CF24DBA5FB0A30E...", "algorithm": "sha256"}

# Or use the proxy
hash_tool = self.session_context.elixir_tools["calculate_hash"]
result = hash_tool(data="hello")
```
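
The digest shown in the comment can be checked independently with Python's standard library. Uppercase hex is used because that is the default output of Elixir's `Base.encode16/1`.

```python
import hashlib

# SHA-256 of the UTF-8 bytes of "hello", upper-cased to match Base.encode16/1.
digest = hashlib.sha256("hello".encode("utf-8")).hexdigest().upper()
print(digest[:16])  # 2CF24DBA5FB0A30E
```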

## Telemetry & Observability

### Attaching Handlers

```elixir
# Logger.info/1 is a macro, so Logger must be required first
require Logger

# Worker lifecycle events
:telemetry.attach("worker-spawned", [:snakepit, :pool, :worker, :spawned], fn
  _event, %{duration: duration}, %{worker_id: id}, _config ->
    Logger.info("Worker #{id} spawned in #{duration}ms")
end, nil)

# Python execution events
:telemetry.attach("python-call", [:snakepit, :grpc_worker, :execute, :stop], fn
  _event, %{duration_ms: ms}, %{command: cmd}, _config ->
    Logger.info("#{cmd} completed in #{ms}ms")
end, nil)
```

### Python Telemetry API

```python
from snakepit_bridge import telemetry

# Emit custom events
telemetry.emit(
    "model.inference",
    measurements={"latency_ms": 45, "tokens": 128},
    metadata={"model": "gpt-4", "batch_size": 1}
)

# Automatic timing with spans
with telemetry.span("data_processing", {"stage": "preprocessing"}):
    processed = preprocess(data)  # Automatically timed
```

### OpenTelemetry Integration

```elixir
config :snakepit, :opentelemetry, %{enabled: true}
```

## Process Management

### Automatic Cleanup

Snakepit automatically tracks and cleans up Python processes:

- Each BEAM run gets a unique run ID stored in the process registry
- On application restart, orphaned processes from previous runs are killed
- Graceful shutdown sends SIGTERM, followed by SIGKILL if needed
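
The SIGTERM-then-SIGKILL escalation in the last bullet follows a common pattern, sketched here in Python with the standard `subprocess` module. This illustrates the pattern only and is not Snakepit's shutdown code: `stop_process` and the grace period are hypothetical.

```python
import subprocess
import sys


def stop_process(proc, grace_s=2.0):
    """Ask a child to exit with SIGTERM, then SIGKILL it if it does not (sketch)."""
    proc.terminate()  # sends SIGTERM on POSIX
    try:
        return proc.wait(timeout=grace_s)
    except subprocess.TimeoutExpired:
        proc.kill()  # sends SIGKILL on POSIX
        return proc.wait()


if __name__ == "__main__":
    # A stand-in for a worker: a Python child that would otherwise sleep for a minute.
    child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
    print("exit code:", stop_process(child))
```

Waiting for the grace period gives a worker the chance to run its own cleanup before it is killed.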

### Manual Cleanup

```elixir
# Force cleanup of all worker processes
Snakepit.cleanup()
```

### Script Mode

For Mix tasks and scripts, use `run_as_script/2` for automatic lifecycle management:

```elixir
Snakepit.run_as_script(fn ->
  {:ok, result} = Snakepit.execute("process_data", %{})
  IO.inspect(result)
end, halt: true)
```

## Mix Tasks

| Task | Description |
|------|-------------|
| `mix snakepit.setup` | Install Python dependencies and generate gRPC stubs |
| `mix snakepit.doctor` | Verify environment is correctly configured |
| `mix snakepit.status` | Show pool status and worker information |
| `mix snakepit.gen.adapter NAME` | Generate adapter scaffolding |

## Examples

The `examples/` directory contains working demonstrations:

| Example | Description |
|---------|-------------|
| `grpc_basic.exs` | Basic execute, ping, echo, add operations |
| `grpc_sessions.exs` | Session affinity and isolation |
| `grpc_streaming.exs` | Pool scaling and concurrent execution |
| `hardware_detection.exs` | Hardware detection for ML workloads |
| `crash_recovery.exs` | Circuit breaker, retry, health monitoring |
| `bidirectional_tools_demo.exs` | Cross-language tool calls |
| `telemetry_basic.exs` | Telemetry event handling |
| `threaded_profile_demo.exs` | Python 3.13+ thread profile |

Run examples:

```bash
mix run examples/grpc_basic.exs
mix run examples/hardware_detection.exs
```

## Documentation

| Guide | Description |
|-------|-------------|
| [Getting Started](guides/getting-started.md) | Installation and first steps |
| [Configuration](guides/configuration.md) | All configuration options |
| [Worker Profiles](guides/worker-profiles.md) | Process vs thread profiles |
| [Hardware Detection](guides/hardware-detection.md) | ML accelerator detection |
| [Fault Tolerance](guides/fault-tolerance.md) | Circuit breaker, retry, health |
| [Streaming](guides/streaming.md) | gRPC streaming operations |
| [Python Adapters](guides/python-adapters.md) | Writing Python adapters |
| [Observability](guides/observability.md) | Telemetry and logging |
| [Production](guides/production.md) | Deployment and troubleshooting |

## Requirements

- Elixir 1.18+
- Erlang/OTP 27+
- Python 3.9+ (3.13+ for thread profile)
- gRPC Python packages (`grpcio`, `grpcio-tools`)

## License

MIT License - see [LICENSE](LICENSE) for details.