README.md

# Snakepit 🐍

<div align="center">
  <img src="assets/snakepit-logo.svg" alt="Snakepit Logo" width="200" height="200">
</div>

> A high-performance, generalized process pooler and session manager for external language integrations in Elixir

[![CI](https://github.com/nshkrdotcom/snakepit/actions/workflows/ci.yml/badge.svg)](https://github.com/nshkrdotcom/snakepit/actions/workflows/ci.yml)
[![Hex Version](https://img.shields.io/hexpm/v/snakepit.svg)](https://hex.pm/packages/snakepit)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)

## 🚀 What is Snakepit?

Snakepit is a battle-tested Elixir library that provides a robust pooling system for managing external processes (Python, Node.js, Ruby, R, etc.). Born from the need for reliable ML/AI integrations, it offers:

- **Lightning-fast concurrent initialization** - 1000x faster than sequential approaches
- **Session-based execution** with automatic worker affinity
- **Multiple communication protocols** - stdin/stdout, MessagePack, and gRPC streaming
- **Native streaming support** - Real-time progress updates and progressive results (gRPC)
- **Adapter pattern** for any external language/runtime
- **Built on OTP primitives** - DynamicSupervisor, Registry, GenServer
- **Production-ready** with telemetry, health checks, and graceful shutdowns

## 🆕 What's New in v0.3.0

### 🌊 **Native gRPC Streaming**
- **Real-time progress updates** for long-running operations
- **HTTP/2 multiplexing** for concurrent requests
- **Cancellable operations** with graceful stream termination
- **Built-in health checks** and rich error handling
- **Drop-in replacement** for existing JSON/MessagePack APIs

### ðŸ“Ķ **Enhanced MessagePack Support**
- **55x faster binary transfers** compared to JSON with base64
- **Automatic protocol negotiation** with graceful fallbacks
- **Native binary data handling** perfect for ML models and images
- **18-36% smaller message sizes** for improved performance

### 🐍 **Python Bridge V2 Architecture**
- **Production-ready packaging** with pip install support
- **Enhanced error handling** and robust shutdown management
- **Console script integration** for deployment flexibility
- **Type checking support** with proper py.typed markers

### 🔄 **Bridge Migration & Compatibility**
- **Deprecated V1 Python bridge** in favor of V2 architecture
- **Updated demo implementations** using latest best practices
- **Comprehensive documentation** for all bridge implementations
- **Backward compatibility** maintained for existing integrations

## 📋 Table of Contents

- [What's New in v0.3.0](#-whats-new-in-v030)
- [Quick Start](#-quick-start)
- [Installation](#-installation)
- [Core Concepts](#-core-concepts)
- [Configuration](#-configuration)
- [Usage Examples](#-usage-examples)
- [gRPC Streaming](#-grpc-streaming)
- [Python Bridges](#-python-bridges)
- [Built-in Adapters](#-built-in-adapters)
- [Creating Custom Adapters](#-creating-custom-adapters)
- [Session Management](#-session-management)
- [Monitoring & Telemetry](#-monitoring--telemetry)
- [Architecture Deep Dive](#-architecture-deep-dive)
- [Performance](#-performance)
- [Troubleshooting](#-troubleshooting)
- [Contributing](#-contributing)

## 🏃 Quick Start

```elixir
# In your mix.exs
def deps do
  [
    {:snakepit, "~> 0.3.0"}
  ]
end

# Configure with MessagePack for high performance
Application.put_env(:snakepit, :pooling_enabled, true)
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)
Application.put_env(:snakepit, :wire_protocol, :auto)  # Auto-negotiates best protocol
Application.put_env(:snakepit, :pool_config, %{pool_size: 4})

{:ok, _} = Application.ensure_all_started(:snakepit)

# Execute commands with automatic protocol optimization
{:ok, result} = Snakepit.execute("ping", %{test: true})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})

# Session-based execution (maintains state)
{:ok, result} = Snakepit.execute_in_session("user_123", "echo", %{message: "hello"})

# Binary data works natively with MessagePack (55x faster than JSON)
{:ok, result} = Snakepit.execute("echo", %{binary: :crypto.strong_rand_bytes(1024)})
```

## ðŸ“Ķ Installation

### Hex Package

```elixir
def deps do
  [
    {:snakepit, "~> 0.3.0"}
  ]
end
```

### GitHub (Latest)

```elixir
def deps do
  [
    {:snakepit, github: "nshkrdotcom/snakepit"}
  ]
end
```

### Requirements

- Elixir 1.18+
- Erlang/OTP 27+
- External runtime (Python 3.8+, Node.js 16+, etc.) depending on adapter

## ðŸŽŊ Core Concepts

### 1. **Adapters**
Adapters define how Snakepit communicates with external processes. They specify:
- The runtime executable (python3, node, ruby, etc.)
- The bridge script to execute
- Supported commands and validation
- Request/response transformations

### 2. **Workers**
Each worker is a GenServer that:
- Owns one external process via Erlang Port
- Handles request/response communication
- Manages health checks and metrics
- Auto-restarts on crashes

### 3. **Pool**
The pool manager:
- Starts workers concurrently on initialization
- Routes requests to available workers
- Handles queueing when all workers are busy
- Supports session affinity for stateful operations

### 4. **Sessions**
Sessions provide:
- State persistence across requests
- Worker affinity (same session prefers same worker)
- TTL-based expiration
- Centralized storage in ETS

## ⚙ïļ Configuration

### Basic Configuration

```elixir
# config/config.exs
config :snakepit,
  pooling_enabled: true,
  adapter_module: Snakepit.Adapters.GenericPythonV2,
  wire_protocol: :auto,  # :json, :msgpack, or :auto (recommended)
  pool_config: %{
    pool_size: 8  # Default: System.schedulers_online() * 2
  }
```

### Wire Protocol Options

Snakepit supports both JSON and MessagePack wire protocols for optimal performance:

```elixir
# Default (no configuration) - uses JSON protocol for backwards compatibility
# Existing applications continue working without any changes

# Auto-negotiation (recommended) - selects best available protocol
config :snakepit, wire_protocol: :auto

# Force JSON protocol
config :snakepit, wire_protocol: :json

# Force MessagePack protocol (requires msgpack Python package)
config :snakepit, wire_protocol: :msgpack
```

**⚠ïļ No Breaking Changes**: Existing applications work unchanged. If you don't specify `wire_protocol`, Snakepit defaults to JSON for full backwards compatibility. MessagePack is an opt-in performance enhancement.

### Advanced Configuration

```elixir
config :snakepit,
  # Pool settings
  pooling_enabled: true,
  pool_config: %{
    pool_size: 16
  },
  
  # Adapter
  adapter_module: MyApp.CustomAdapter,
  
  # Timeouts (milliseconds)
  pool_startup_timeout: 10_000,      # Max time for worker initialization
  pool_queue_timeout: 5_000,         # Max time in request queue
  worker_init_timeout: 20_000,       # Max time for worker to respond to init
  worker_health_check_interval: 30_000,  # Health check frequency
  worker_shutdown_grace_period: 2_000,   # Grace period for shutdown
  
  # Cleanup settings
  cleanup_retry_interval: 100,       # Retry interval for cleanup
  cleanup_max_retries: 10,          # Max cleanup retries
  
  # Queue management
  pool_max_queue_size: 1000         # Max queued requests before rejection
```

### Runtime Configuration

```elixir
# Override configuration at runtime
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericJavaScript)
Application.stop(:snakepit)
Application.start(:snakepit)
```

## 📖 Usage Examples

All examples are available in the `examples/` directory. Run them directly:

```bash
# Basic examples
elixir examples/non_session_demo_json.exs
elixir examples/session_based_demo_json.exs

# MessagePack examples (high performance)
elixir examples/non_session_demo_msgpack.exs
elixir examples/session_based_demo_msgpack.exs

# V2 Bridge examples (production ready)
elixir examples/v2/non_session_demo.exs
elixir examples/v2/session_based_demo.exs

# gRPC examples (streaming support)
elixir examples/grpc_non_streaming_demo.exs
elixir examples/grpc_streaming_demo.exs

# JavaScript examples
elixir examples/javascript_stateless_demo_json.exs
elixir examples/javascript_session_demo_msgpack.exs

# Enhanced Python Bridge examples
elixir examples/enhanced/basic_usage.exs
elixir examples/enhanced/dspy_integration.exs
elixir examples/enhanced/data_science_workflow.exs
```

### Basic Stateless Execution

```elixir
# Simple computation
{:ok, %{"result" => 8}} = Snakepit.execute("compute", %{
  operation: "add",
  a: 5,
  b: 3
})

# With timeout
{:ok, result} = Snakepit.execute("long_running_task", %{data: "..."}, timeout: 60_000)

# Error handling
case Snakepit.execute("risky_operation", %{}) do
  {:ok, result} -> handle_success(result)
  {:error, :worker_timeout} -> handle_timeout()
  {:error, :pool_saturated} -> handle_overload()
  {:error, reason} -> handle_error(reason)
end
```

### Session-Based Execution

```elixir
# Create a session and maintain state
session_id = "user_#{user.id}"

# First request - initializes session
{:ok, _} = Snakepit.execute_in_session(session_id, "initialize", %{
  user_id: user.id,
  preferences: user.preferences
})

# Subsequent requests use same worker when possible
{:ok, recommendations} = Snakepit.execute_in_session(session_id, "get_recommendations", %{
  category: "books"
})

# Session data persists across requests
{:ok, history} = Snakepit.execute_in_session(session_id, "get_history", %{})
```

### ML/AI Workflow Example

```elixir
# Using SessionHelpers for ML program management
alias Snakepit.SessionHelpers

# Create an ML program/model
{:ok, response} = SessionHelpers.execute_program_command(
  "ml_session_123",
  "create_program",
  %{
    signature: "question -> answer",
    model: "gpt-3.5-turbo",
    temperature: 0.7
  }
)

program_id = response["program_id"]

# Execute the program multiple times
{:ok, result} = SessionHelpers.execute_program_command(
  "ml_session_123", 
  "execute_program",
  %{
    program_id: program_id,
    input: %{question: "What is the capital of France?"}
  }
)
```

### High-Performance Binary Processing with MessagePack

```elixir
# Configure MessagePack for binary-intensive workloads
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)
Application.put_env(:snakepit, :wire_protocol, :auto)

# Process binary data natively (no base64 encoding)
image_data = File.read!("large_image.jpg")  # 5MB image
model_weights = :crypto.strong_rand_bytes(1024 * 1024)  # 1MB binary

{:ok, result} = Snakepit.execute("process_image", %{
  image: image_data,           # Transferred 55x faster than JSON
  weights: model_weights,      # Native binary support
  config: %{model: "resnet50", threshold: 0.8}
})

# Compare protocols programmatically
Application.put_env(:snakepit, :wire_protocol, :json)
{time_json, _} = :timer.tc(fn -> 
  Snakepit.execute("echo", %{data: large_binary}) 
end)

Application.put_env(:snakepit, :wire_protocol, :msgpack)  
{time_msgpack, _} = :timer.tc(fn -> 
  Snakepit.execute("echo", %{data: large_binary}) 
end)

IO.puts("JSON: #{time_json}Ξs, MessagePack: #{time_msgpack}Ξs")
IO.puts("Speedup: #{Float.round(time_json / time_msgpack, 1)}x")
```

### Parallel Processing

```elixir
# Process multiple items in parallel across the pool
items = ["item1", "item2", "item3", "item4", "item5"]

tasks = Enum.map(items, fn item ->
  Task.async(fn ->
    Snakepit.execute("process_item", %{item: item})
  end)
end)

results = Task.await_many(tasks, 30_000)
```

## 🌊 gRPC Communication

Snakepit supports modern gRPC-based communication for advanced streaming capabilities, real-time progress updates, and superior performance.

### 🚀 **Upgrading from JSON/MessagePack to gRPC**

**Already have Snakepit working with JSON?** Great! Here's how to upgrade to gRPC for streaming and better performance:

#### Current Setup (JSON/MessagePack):
```elixir
# Your existing configuration
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)
# V2 adapter with auto-negotiation (recommended)

# Your existing API calls work exactly the same
{:ok, result} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})
```

#### Upgrade to gRPC (3 Steps):
```bash
# Step 1: Install gRPC dependencies
make install-grpc

# Step 2: Generate protocol buffer code
make proto-python

# Step 3: Test the upgrade
elixir examples/grpc_non_streaming_demo.exs
```

#### New Configuration (gRPC):
```elixir
# Replace your adapter configuration with this:
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{
  base_port: 50051,
  port_range: 100
})

# ✅ ALL your existing API calls work EXACTLY the same
{:ok, result} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})

# 🆕 PLUS you get new streaming capabilities
Snakepit.execute_stream("batch_inference", %{
  batch_items: ["image1.jpg", "image2.jpg", "image3.jpg"]
}, fn chunk ->
  IO.puts("Processed: #{chunk["item"]} - #{chunk["confidence"]}")
end)
```

### 📋 **gRPC vs Traditional Protocols**

| Feature | JSON/stdin | MessagePack | gRPC Non-Streaming | gRPC Streaming |
|---------|------------|-------------|-------------------|----------------|
| **Backward Compatible** | ✅ | ✅ | ✅ **Same API** | ✅ **Same API + New** |
| **Streaming** | ❌ | ❌ | ❌ | ✅ **Real-time** |
| **HTTP/2 Multiplexing** | ❌ | ❌ | ✅ | ✅ |
| **Progress Updates** | ❌ | ❌ | ❌ | ✅ **Live Updates** |
| **Health Checks** | Manual | Manual | ✅ Built-in | ✅ Built-in |
| **Error Handling** | Custom | Custom | ✅ Rich Status | ✅ Rich Status |

### ðŸŽŊ **Two gRPC Modes Explained**

#### **Mode 1: gRPC Non-Streaming** 
**Use this for:** Drop-in replacement of JSON/MessagePack with better performance
```elixir
# IDENTICAL API to your existing JSON/MessagePack code
{:ok, result} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "multiply", a: 10, b: 5})
{:ok, result} = Snakepit.execute("info", %{})

# Session support works exactly the same
{:ok, result} = Snakepit.execute_in_session("user_123", "echo", %{message: "hello"})
```

**When to use:**
- ✅ You want better performance without changing your code
- ✅ Your operations complete quickly (< 30 seconds)
- ✅ You don't need progress updates
- ✅ Drop-in replacement for existing JSON/MessagePack

#### **Mode 2: gRPC Streaming** 
**Use this for:** Long-running operations with real-time progress updates
```elixir
# NEW streaming API - get results as they complete
Snakepit.execute_stream("batch_inference", %{
  batch_items: ["img1.jpg", "img2.jpg", "img3.jpg"]
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("✅ All done!")
  else
    IO.puts("🧠 Processed: #{chunk["item"]} - #{chunk["confidence"]}")
  end
end)

# Session-based streaming also available
Snakepit.execute_in_session_stream("session_123", "process_large_dataset", %{
  file_path: "/data/huge_file.csv"
}, fn chunk ->
  IO.puts("📊 Progress: #{chunk["progress_percent"]}%")
end)
```

**When to use:**
- ✅ Long-running operations (ML training, data processing)
- ✅ You want real-time progress updates
- ✅ Processing large datasets or batches
- ✅ Better user experience with live feedback

### 🛠ïļ **Setup Instructions**

#### Install gRPC Dependencies
```bash
# Install gRPC dependencies
make install-grpc

# Generate protocol buffer code
make proto-python

# Verify with non-streaming demo (same as your existing API)
elixir examples/grpc_non_streaming_demo.exs

# Try new streaming capabilities
elixir examples/grpc_streaming_demo.exs
```

### 📝 **Complete Examples**

#### **Non-Streaming Examples (Standard API)**
```elixir
# Configure gRPC (replaces your JSON/MessagePack config)
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{base_port: 50051, port_range: 100})

# All your existing code works unchanged
{:ok, result} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})
{:ok, result} = Snakepit.execute("info", %{})

# Sessions work exactly the same
{:ok, result} = Snakepit.execute_in_session("session_123", "echo", %{message: "hello"})

# Try it: elixir examples/grpc_non_streaming_demo.exs
```

#### **Streaming Examples (New Capability)**

**ML Batch Inference with Real-time Progress:**
```elixir
# Process multiple items, get results as each completes
Snakepit.execute_stream("batch_inference", %{
  model_path: "/models/resnet50.pkl",
  batch_items: ["img1.jpg", "img2.jpg", "img3.jpg"]
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("✅ All #{chunk["total_processed"]} items complete!")
  else
    IO.puts("🧠 #{chunk["item"]}: #{chunk["prediction"]} (#{chunk["confidence"]})")
  end
end)
```

**Large Dataset Processing with Progress:**
```elixir
# Process huge datasets, see progress in real-time
Snakepit.execute_stream("process_large_dataset", %{
  file_path: "/data/huge_dataset.csv",
  chunk_size: 5000
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("🎉 Processing complete: #{chunk["final_stats"]}")
  else
    progress = chunk["progress_percent"]
    IO.puts("📊 Progress: #{progress}% (#{chunk["processed_rows"]}/#{chunk["total_rows"]})")
  end
end)
```

**Session-based Streaming:**
```elixir
# Streaming with session state
session_id = "ml_training_#{user_id}"

Snakepit.execute_in_session_stream(session_id, "distributed_training", %{
  model_config: training_config,
  dataset_path: "/data/training_set"
}, fn chunk ->
  if chunk["is_final"] do
    model_path = chunk["final_model_path"]
    IO.puts("ðŸŽŊ Training complete! Model saved: #{model_path}")
  else
    epoch = chunk["epoch"]
    loss = chunk["train_loss"]
    acc = chunk["val_acc"]
    IO.puts("📈 Epoch #{epoch}: loss=#{loss}, acc=#{acc}")
  end
end)

# Try it: elixir examples/grpc_streaming_demo.exs
```

### 🚀 **Performance & Benefits**

#### **Why Upgrade to gRPC?**

**gRPC Non-Streaming vs JSON/MessagePack:**
- ✅ **Better performance**: HTTP/2 multiplexing, protocol buffers
- ✅ **Built-in health checks**: Automatic worker monitoring
- ✅ **Rich error handling**: Detailed gRPC status codes
- ✅ **Zero code changes**: Drop-in replacement

**gRPC Streaming vs Traditional (All Protocols):**
- ✅ **Progressive results**: Get updates as work completes
- ✅ **Constant memory**: Process unlimited data without memory growth
- ✅ **Real-time feedback**: Users see progress immediately
- ✅ **Cancellable operations**: Stop long-running tasks mid-stream
- ✅ **Better UX**: No more "is it still working?" uncertainty

#### **Performance Comparison:**
```
Traditional (blocking):  Submit → Wait 10 minutes → Get all results
gRPC Non-streaming:     Submit → Get result faster (better protocol)
gRPC Streaming:         Submit → Get result 1 → Get result 2 → ...

Memory usage:           Fixed vs Grows with result size vs Constant
User experience:        "Wait..." vs "Wait..." vs Real-time updates
Cancellation:           Kill process vs Kill process vs Graceful stream close
```

### 📋 **Quick Decision Guide**

**Choose your mode based on your needs:**

| **Your Situation** | **Recommended Mode** | **Why** |
|-------------------|---------------------|---------|
| Just getting started | JSON/MessagePack | Simplest setup |
| Want better performance, same API | **gRPC Non-Streaming** | Drop-in upgrade |
| Need progress updates | **gRPC Streaming** | Real-time feedback |
| Long-running ML tasks | **gRPC Streaming** | See progress, cancel if needed |
| Quick operations (< 30s) | gRPC Non-Streaming | No streaming overhead |
| Large dataset processing | **gRPC Streaming** | Memory efficient |

**Migration path:**
1. **Start**: JSON → **Upgrade**: gRPC Non-Streaming → **Add**: Streaming for long tasks

### gRPC Dependencies

**Elixir:**
```elixir
# mix.exs
def deps do
  [
    {:grpc, "~> 0.8"},
    {:protobuf, "~> 0.12"},
    # ... other deps
  ]
end
```

**Python:**
```bash
# Install with gRPC support
pip install 'snakepit-bridge[grpc]'

# Or manually
pip install grpcio protobuf grpcio-tools
```

### Available Streaming Commands

| Command | Description | Use Case |
|---------|-------------|----------|
| `ping_stream` | Heartbeat stream | Testing, monitoring |
| `batch_inference` | ML model inference | Computer vision, NLP |
| `process_large_dataset` | Data processing | ETL, analytics |
| `tail_and_analyze` | Log analysis | Real-time monitoring |
| `distributed_training` | ML training | Neural networks |

For comprehensive gRPC documentation, see **[README_GRPC.md](README_GRPC.md)**.

For additional streaming examples, see `docs/specs/grpc_streaming_examples.md`.

## 🐍 Python Bridges

For detailed documentation on all Python bridge implementations (V1, V2, Enhanced, gRPC), see **[README_BRIDGES.md](README_BRIDGES.md)**.

## 🔌 Built-in Adapters

### gRPC Python Adapter (Next Generation)

```elixir
# Configure with gRPC for streaming and advanced features
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{base_port: 50051, port_range: 100})

# Install dependencies
# pip install 'snakepit-bridge[grpc]'
# make proto-python

# New streaming capabilities
{:ok, _} = Snakepit.execute_stream("batch_inference", %{
  batch_items: ["img1.jpg", "img2.jpg", "img3.jpg"]
}, fn chunk ->
  IO.puts("Processed: #{chunk["item"]} - #{chunk["confidence"]}")
end)
```

#### gRPC Features
- ✅ **Native streaming** - Progressive results and real-time updates
- ✅ **HTTP/2 multiplexing** - Multiple concurrent requests per connection
- ✅ **Built-in health checks** - Automatic worker health monitoring
- ✅ **Rich error handling** - gRPC status codes with detailed context
- ✅ **Protocol buffers** - Efficient binary serialization
- ✅ **Cancellable operations** - Stop long-running tasks gracefully

#### Installation & Usage

```bash
# Install gRPC dependencies
make install-grpc

# Generate protocol buffer code  
make proto-python

# Test with streaming demo
elixir examples/grpc_streaming_demo.exs

# Test with non-streaming demo
elixir examples/grpc_non_streaming_demo.exs
```

```elixir
# Use auto-negotiation with fallback
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)

# Streaming examples
elixir examples/grpc_streaming_demo.exs

# Non-streaming examples  
elixir examples/grpc_non_streaming_demo.exs
```

### MessagePack Python Adapter (High Performance)

```elixir
# Configure with MessagePack for maximum performance
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)
Application.put_env(:snakepit, :wire_protocol, :auto)

# Install Python dependencies
# pip install msgpack

# Performance benefits over JSON:
# - 1.3-2.3x faster encoding/decoding for regular data
# - 55x faster for binary data (no base64 encoding)
# - 18-36% smaller message sizes
# - Native binary data handling perfect for ML workloads

# Examples with binary data support
{:ok, _} = Snakepit.execute("echo", %{
  text: "Hello MessagePack!",
  binary_data: :crypto.strong_rand_bytes(1024),  # Works natively!
  numbers: [1, 2, 3, 4, 5]
})

# Protocol negotiation happens automatically
# Falls back to JSON if MessagePack unavailable
```

#### MessagePack Features
- ✅ **55x faster binary transfers** - No base64 encoding overhead
- ✅ **Automatic protocol negotiation** - Falls back to JSON gracefully  
- ✅ **Native binary support** - Perfect for ML models, images, numpy arrays
- ✅ **Backward compatible** - Works with existing JSON bridges
- ✅ **Smaller payloads** - 18-36% reduction in message size

#### Installation & Usage

```bash
# Install MessagePack in your Python environment
pip install msgpack

# Or with conda
conda install msgpack
```

```elixir
# Use auto-negotiation (recommended)
Application.put_env(:snakepit, :wire_protocol, :auto)
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)

# Examples
elixir examples/non_session_demo_msgpack.exs
elixir examples/session_based_demo_msgpack.exs
```

### Python Adapter V2 (JSON Protocol)

```elixir
# Configure with robust V2 adapter
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)

# Install the bridge package (recommended for production)
# cd priv/python && pip install -e .

# Available commands
{:ok, _} = Snakepit.execute("ping", %{})
{:ok, _} = Snakepit.execute("echo", %{message: "hello"})
{:ok, _} = Snakepit.execute("compute", %{operation: "multiply", a: 10, b: 5})
{:ok, _} = Snakepit.execute("info", %{})

# Check if package is installed
Snakepit.Adapters.GenericPythonV2.package_installed?()  # true/false

# Get installation instructions
Snakepit.Adapters.GenericPythonV2.installation_instructions()
```

#### V2 Features
- ✅ **Proper package structure** - No fragile sys.path manipulation
- ✅ **Production deployment** - pip install support with console scripts
- ✅ **Dual mode operation** - Automatic fallback to development scripts
- ✅ **Enhanced error handling** - Robust shutdown and broken pipe management
- ✅ **Type checking support** - Includes py.typed marker

#### Installation Options

```bash
# Option 1: Development install (recommended)
cd priv/python && pip install -e .

# Option 2: Regular install
cd priv/python && pip install .

# Option 3: Development mode (no installation)
# Uses V2 scripts with automatic package detection
```

#### Console Scripts (after installation)

```bash
# Test installed bridges
snakepit-generic-bridge --help
snakepit-custom-bridge --help

# Run in pool-worker mode
snakepit-generic-bridge  # Production mode
python3 priv/python/generic_bridge_v2.py  # Development mode
```

### JavaScript/Node.js Adapter

```elixir
# Configure
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericJavaScript)

# Additional commands
{:ok, _} = Snakepit.execute("random", %{type: "uniform", min: 0, max: 100})
{:ok, _} = Snakepit.execute("compute", %{operation: "sqrt", a: 16})
```

## 🛠ïļ Creating Custom Adapters

### Python Bridge V2 Pattern (Recommended)

```python
# my_custom_bridge.py
from snakepit_bridge import BaseCommandHandler, ProtocolHandler
from snakepit_bridge.core import setup_graceful_shutdown, setup_broken_pipe_suppression

class MyCustomHandler(BaseCommandHandler):
    def _register_commands(self):
        self.register_command("my_command", self.handle_my_command)
        self.register_command("process_data", self.handle_process_data)
    
    def handle_my_command(self, args):
        return {"result": "processed", "input": args}
    
    def handle_process_data(self, args):
        data = args.get("data", "")
        return {"processed": data.upper(), "length": len(data)}

def main():
    setup_broken_pipe_suppression()
    
    command_handler = MyCustomHandler()
    protocol_handler = ProtocolHandler(command_handler)
    setup_graceful_shutdown(protocol_handler)
    
    protocol_handler.run()

if __name__ == "__main__":
    main()
```

#### Key Benefits of V2 Approach
- ✅ **No sys.path manipulation** - proper package imports
- ✅ **Location independent** - works from any directory  
- ✅ **Production ready** - can be packaged and installed
- ✅ **Enhanced error handling** - robust shutdown and signal management
- ✅ **Type checking** - full IDE support with proper imports

### Elixir Adapter Implementation

```elixir
defmodule MyApp.RubyAdapter do
  @behaviour Snakepit.Adapter
  
  @impl true
  def executable_path do
    System.find_executable("ruby")
  end
  
  @impl true
  def script_path do
    Path.join(:code.priv_dir(:my_app), "ruby/bridge.rb")
  end
  
  @impl true
  def script_args do
    ["--mode", "pool-worker"]
  end
  
  @impl true
  def supported_commands do
    ["ping", "process_data", "generate_report"]
  end
  
  @impl true
  def validate_command("process_data", args) do
    if Map.has_key?(args, :data) do
      :ok
    else
      {:error, "Missing required field: data"}
    end
  end
  
  def validate_command("ping", _args), do: :ok
  def validate_command(cmd, _args), do: {:error, "Unsupported command: #{cmd}"}
  
  # Optional callbacks
  @impl true
  def prepare_args("process_data", args) do
    # Transform args before sending
    Map.update(args, :data, "", &String.trim/1)
  end
  
  @impl true
  def process_response("generate_report", %{"report" => report} = response) do
    # Post-process the response
    {:ok, Map.put(response, "processed_at", DateTime.utc_now())}
  end
  
  @impl true
  def command_timeout("generate_report", _args), do: 120_000  # 2 minutes
  def command_timeout(_command, _args), do: 30_000  # Default 30 seconds
end
```

### External Bridge Script (Ruby Example)

```ruby
#!/usr/bin/env ruby
# priv/ruby/bridge.rb

require 'json'

class BridgeHandler
  def initialize
    @commands = {
      'ping' => method(:handle_ping),
      'process_data' => method(:handle_process_data),
      'generate_report' => method(:handle_generate_report)
    }
  end
  
  def run
    STDERR.puts "Ruby bridge started"
    
    loop do
      # Read 4-byte length header
      length_bytes = STDIN.read(4)
      break unless length_bytes
      
      # Unpack length (big-endian)
      length = length_bytes.unpack('N')[0]
      
      # Read JSON payload
      json_data = STDIN.read(length)
      request = JSON.parse(json_data)
      
      # Process command
      response = process_command(request)
      
      # Send response
      json_response = JSON.generate(response)
      length_header = [json_response.bytesize].pack('N')
      
      STDOUT.write(length_header)
      STDOUT.write(json_response)
      STDOUT.flush
    end
  end
  
  private
  
  def process_command(request)
    command = request['command']
    args = request['args'] || {}
    
    handler = @commands[command]
    if handler
      result = handler.call(args)
      {
        'id' => request['id'],
        'success' => true,
        'result' => result,
        'timestamp' => Time.now.iso8601
      }
    else
      {
        'id' => request['id'],
        'success' => false,
        'error' => "Unknown command: #{command}",
        'timestamp' => Time.now.iso8601
      }
    end
  rescue => e
    {
      'id' => request['id'],
      'success' => false,
      'error' => e.message,
      'timestamp' => Time.now.iso8601
    }
  end
  
  def handle_ping(args)
    { 'status' => 'ok', 'message' => 'pong' }
  end
  
  def handle_process_data(args)
    data = args['data'] || ''
    { 'processed' => data.upcase, 'length' => data.length }
  end
  
  def handle_generate_report(args)
    # Simulate report generation
    sleep(1)
    { 
      'report' => {
        'title' => args['title'] || 'Report',
        'generated_at' => Time.now.iso8601,
        'data' => args['data'] || {}
      }
    }
  end
end

# Handle signals gracefully
Signal.trap('TERM') { exit(0) }
Signal.trap('INT') { exit(0) }

# Run the bridge
BridgeHandler.new.run
```

## 🗃ïļ Session Management

### Session Store API

```elixir
alias Snakepit.Bridge.SessionStore

# Create a session
{:ok, session} = SessionStore.create_session("session_123", ttl: 7200)

# Store data in session
:ok = SessionStore.store_program("session_123", "prog_1", %{
  model: "gpt-4",
  temperature: 0.8
})

# Retrieve session data
{:ok, session} = SessionStore.get_session("session_123")
{:ok, program} = SessionStore.get_program("session_123", "prog_1")

# Update session
{:ok, updated} = SessionStore.update_session("session_123", fn session ->
  Map.put(session, :last_activity, DateTime.utc_now())
end)

# Check if session exists
true = SessionStore.session_exists?("session_123")

# List all sessions
session_ids = SessionStore.list_sessions()

# Manual cleanup
SessionStore.delete_session("session_123")

# Get session statistics
stats = SessionStore.get_stats()
```

### Global Program Storage

```elixir
# Store programs accessible by any worker
:ok = SessionStore.store_global_program("template_1", %{
  type: "qa_template",
  prompt: "Answer the following question: {question}"
})

# Retrieve from any worker
{:ok, template} = SessionStore.get_global_program("template_1")
```

## 📊 Monitoring & Telemetry

### Available Events

```elixir
# Worker request completed
[:snakepit, :worker, :request]
# Measurements: %{duration: milliseconds}
# Metadata: %{result: :ok | :error}

# Worker initialized
[:snakepit, :worker, :initialized]
# Measurements: %{initialization_time: seconds}
# Metadata: %{worker_id: string}
```

### Setting Up Monitoring

```elixir
# In your application startup
:telemetry.attach_many(
  "snakepit-metrics",
  [
    [:snakepit, :worker, :request],
    [:snakepit, :worker, :initialized]
  ],
  &MyApp.Metrics.handle_event/4,
  %{}
)

defmodule MyApp.Metrics do
  require Logger
  
  def handle_event([:snakepit, :worker, :request], measurements, metadata, _config) do
    # Log slow requests
    if measurements.duration > 5000 do
      Logger.warning("Slow request: #{measurements.duration}ms")
    end
    
    # Send to StatsD/Prometheus/DataDog
    MyApp.Metrics.Client.histogram(
      "snakepit.request.duration",
      measurements.duration,
      tags: ["result:#{metadata.result}"]
    )
  end
  
  def handle_event([:snakepit, :worker, :initialized], measurements, metadata, _config) do
    Logger.info("Worker #{metadata.worker_id} started in #{measurements.initialization_time}s")
  end
end
```

### Pool Statistics

```elixir
stats = Snakepit.get_stats()
# Returns:
# %{
#   workers: 8,          # Total workers
#   available: 6,        # Available workers
#   busy: 2,            # Busy workers
#   requests: 1534,     # Total requests
#   queued: 0,          # Currently queued
#   errors: 12,         # Total errors
#   queue_timeouts: 3,  # Queue timeout count
#   pool_saturated: 0   # Saturation rejections
# }
```

## 🏗ïļ Architecture Deep Dive

### Component Overview


```
┌───────────────────────────────────────────────────────┐
│                    Snakepit Application               │
├───────────────────────────────────────────────────────â”Ī
│                                                       │
│  ┌─────────────┐  ┌──────────────┐  ┌───────────────┐ │
│  │    Pool     │  │ SessionStore │  │  Registries   │ │
│  │  Manager    │  │   (ETS)      │  │  (Worker/Proc)│ │
│  └──────┮──────┘  └──────────────┘  └───────────────┘ │
│         │                                             │
│  ┌──────▾────────────────────────────────────────────┐│
│  │            WorkerSupervisor (Dynamic)             ││
│  └──────┮────────────────────────────────────────────┘│
│         │                                             │
│  ┌──────▾──────┐  ┌──────────────┐  ┌──────────────┐  │
│  │   Worker    │  │   Worker     │  │   Worker     │  │
│  │  Starter    │  │  Starter     │  │  Starter     │  │
│  │(Supervisor) │  │(Supervisor)  │  │(Supervisor)  │  │
│  └──────┮──────┘  └───────┮──────┘  └───────┮──────┘  │
│         │                 │                 │         │
│  ┌──────▾──────┐  ┌───────▾──────┐  ┌───────▾──────┐  │
│  │   Worker    │  │   Worker     │  │   Worker     │  │
│  │ (GenServer) │  │ (GenServer)  │  │ (GenServer)  │  │
│  └──────┮──────┘  └───────┮──────┘  └───────┮──────┘  │
│         │                 │                 │         │
└─────────┾─────────────────┾─────────────────┾─────────┘
          │                 │                 │
    ┌─────▾──────┐    ┌─────▾──────┐    ┌─────▾──────┐
    │  External  │    │  External  │    │  External  │
    │  Process   │    │  Process   │    │  Process   │
    │  (Python)  │    │  (Node.js) │    │   (Ruby)   │
    └────────────┘    └────────────┘    └────────────┘
```

### Key Design Decisions

1. **Concurrent Initialization**: Workers start in parallel using `Task.async_stream`
2. **Permanent Wrapper Pattern**: Worker.Starter supervises Workers for auto-restart
3. **Centralized State**: All session data in ETS, workers are stateless
4. **Registry-Based**: O(1) worker lookups and reverse PID lookups
5. **Port Communication**: Binary protocol with 4-byte length headers

### Process Lifecycle

1. **Startup**:
   - Pool manager starts
   - Concurrently spawns N workers via WorkerSupervisor
   - Each worker starts its external process
   - Workers send init ping and register when ready

2. **Request Flow**:
   - Client calls `Snakepit.execute/3`
   - Pool finds available worker (with session affinity if applicable)
   - Worker sends request to external process
   - External process responds
   - Worker returns result to client

3. **Crash Recovery**:
   - Worker crashes → Worker.Starter restarts it automatically
   - External process dies → Worker detects and crashes → restart
   - Pool crashes → Supervisor restarts entire pool

4. **Shutdown**:
   - Pool manager sends shutdown to all workers
   - Workers close ports gracefully (SIGTERM)
   - ApplicationCleanup ensures no orphaned processes (SIGKILL)

## ⚡ Performance

### Wire Protocol Benchmarks

```
Configuration: 16 workers, Python adapter
Hardware: 8-core CPU, 32GB RAM

Wire Protocol Performance:
JSON vs MessagePack Comparison

Regular Data (1KB payload):
- JSON encoding: 45Ξs
- MessagePack encoding: 19Ξs (2.3x faster)

- JSON decoding: 38Ξs  
- MessagePack decoding: 24Ξs (1.6x faster)

Binary Data (1MB payload):
- JSON (base64): 2.1ms encoding + 55% size overhead
- MessagePack: 0.038ms encoding (55x faster, no overhead)

Message Sizes:
- JSON: 1,340 bytes (with base64 binary)
- MessagePack: 1,024 bytes (24% smaller)

Startup Time:
- Sequential: 16 seconds (1s per worker)
- Concurrent: 1.2 seconds (13x faster)

Throughput (MessagePack):
- Simple computation: 65,000 req/s (vs 50,000 JSON)
- Binary ML inference: 8,000 req/s (vs 1,000 JSON)  
- Session operations: 58,000 req/s (vs 45,000 JSON)

Latency (p99, MessagePack):
- Simple computation: < 1.5ms (vs 2ms JSON)
- Binary ML inference: < 12ms (vs 100ms JSON)
- Session operations: < 0.8ms (vs 1ms JSON)
```

### Optimization Tips

1. **Pool Size**: Start with `System.schedulers_online() * 2`
2. **Queue Size**: Monitor `pool_saturated` errors and adjust
3. **Timeouts**: Set appropriate timeouts per command type
4. **Session TTL**: Balance memory usage vs cache hits
5. **Health Checks**: Increase interval for stable workloads

## 🔧 Troubleshooting

### Common Issues

#### Workers Not Starting

```elixir
# Check adapter configuration
adapter = Application.get_env(:snakepit, :adapter_module)
adapter.executable_path()  # Should return valid path
File.exists?(adapter.script_path())  # Should return true

# Check logs for errors
Logger.configure(level: :debug)
```

#### Port Exits

```elixir
# Enable port tracing
:erlang.trace(Process.whereis(Snakepit.Pool.Worker), true, [:receive, :send])

# Check external process logs
# Python: Add logging to bridge script
# Node.js: Check stderr output
```

#### Memory Leaks

```elixir
# Monitor ETS usage
:ets.info(:snakepit_sessions, :memory)

# Check for orphaned processes
Snakepit.Pool.ProcessRegistry.get_stats()

# Force cleanup
Snakepit.Bridge.SessionStore.cleanup_expired_sessions()
```

### Debug Mode

```elixir
# Enable debug logging
Logger.configure(level: :debug)

# Trace specific worker
:sys.trace(Snakepit.Pool.Registry.via_tuple("worker_1"), true)

# Get internal state
:sys.get_state(Snakepit.Pool)
```

## ðŸĪ Contributing

We welcome contributions! Please see our [Contributing Guide](https://github.com/nshkrdotcom/snakepit/blob/main/CONTRIBUTING.md) for details.

### Development Setup

```bash
# Clone the repo
git clone https://github.com/nshkrdotcom/snakepit.git
cd snakepit

# Install dependencies
mix deps.get

# Run tests
mix test

# Run example scripts
elixir examples/v2/session_based_demo.exs
elixir examples/javascript_session_demo_json.exs

# Check code quality
mix format --check-formatted
mix dialyzer
```

### Running Tests

```bash
# All tests
mix test

# With coverage
mix test --cover

# Specific test
mix test test/snakepit_test.exs:42
```

## 📝 License

Snakepit is released under the MIT License. See the [LICENSE](https://github.com/nshkrdotcom/snakepit/blob/main/LICENSE) file for details.

## 🙏 Acknowledgments

- Inspired by the need for reliable ML/AI integrations in Elixir
- Built on battle-tested OTP principles
- Special thanks to the Elixir community

## 📊 Development Status

**v0.3.0 (Current Release)**
- ✅ **gRPC streaming bridge** implementation complete
- ✅ **MessagePack protocol** support with automatic negotiation
- ✅ **Python Bridge V2** architecture with production packaging
- ✅ **Comprehensive documentation** and examples
- ✅ **Performance benchmarks** and optimization
- ✅ **End-to-end testing** across all protocols

**Roadmap**
- 🔄 Enhanced streaming operations and cancellation
- 🔄 Additional language adapters (Ruby, R, Go)
- 🔄 Advanced telemetry and monitoring features
- 🔄 Distributed worker pools

## 📚 Resources

- [Hex Package](https://hex.pm/packages/snakepit)
- [API Documentation](https://hexdocs.pm/snakepit)
- [GitHub Repository](https://github.com/nshkrdotcom/snakepit)
- [Example Projects](https://github.com/nshkrdotcom/snakepit/tree/main/examples)
- [gRPC Bridge Documentation](README_GRPC.md)
- [Python Bridge Documentation](README_BRIDGES.md)

---

Made with âĪïļ by [NSHkr](https://github.com/nshkrdotcom)