README_GRPC.md

# Snakepit gRPC Streaming Guide

## Overview

This guide covers Snakepit's gRPC streaming implementation - a modern replacement for stdin/stdout communication that enables real-time progress updates, progressive results, and superior performance for ML and data processing workflows.

## Table of Contents

- [Quick Start](#quick-start)
- [Installation](#installation)
- [Core Concepts](#core-concepts)
- [API Reference](#api-reference)
- [Streaming Examples](#streaming-examples)
- [Performance](#performance)
- [Migration Guide](#migration-guide)
- [Troubleshooting](#troubleshooting)

## Quick Start

### 1. Install Dependencies

```bash
# Install gRPC dependencies
make install-grpc

# Generate protocol buffer code
make proto-python

# Verify installation
python -c "import grpc, snakepit_bridge.grpc.snakepit_pb2; print('✅ gRPC ready')"
```

### 2. Basic Configuration

```elixir
# Configure Snakepit with gRPC adapter
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{
  base_port: 50051,
  port_range: 100  # Uses ports 50051-50151
})

{:ok, _} = Application.ensure_all_started(:snakepit)
```

### 3. Run Your First Stream

```elixir
# Traditional (still works)
{:ok, result} = Snakepit.execute("ping", %{})

# NEW: Streaming execution
Snakepit.execute_stream("ping_stream", %{count: 5}, fn chunk ->
  IO.puts("Received: #{chunk["message"]}")
end)
```

## Installation

### Prerequisites

- **Elixir**: 1.18+ with OTP 27+
- **Python**: 3.8+ 
- **System**: `protoc` compiler (for development)

### Elixir Dependencies

```elixir
# mix.exs
def deps do
  [
    {:grpc, "~> 0.8"},
    {:protobuf, "~> 0.12"},
    # ... existing deps
  ]
end
```

### Python Dependencies

```bash
# Option 1: Install with gRPC support
cd priv/python && pip install -e ".[grpc]"

# Option 2: Install manually
pip install grpcio>=1.50.0 protobuf>=4.0.0 grpcio-tools>=1.50.0

# Option 3: All features
pip install -e ".[all]"  # Includes gRPC + MessagePack
```

### Development Setup

```bash
# Complete development environment
make dev-setup

# Or step by step:
make install-grpc      # Install Python gRPC deps
make proto-python      # Generate protobuf code
make test             # Verify everything works
```

## Core Concepts

### Communication Protocols Comparison

| Protocol | Streaming | Multiplexing | Binary Data | Setup Complexity |
|----------|-----------|--------------|-------------|------------------|
| **stdin/stdout** | ❌ | ❌ | Base64 | Simple |
| **MessagePack** | ❌ | ❌ | ✅ Native | Simple |
| **gRPC** | ✅ Native | ✅ HTTP/2 | ✅ Native | Moderate |

### Protocol Selection

```elixir
# Snakepit automatically chooses the best available protocol:

# 1. If gRPC adapter configured and available → gRPC
# 2. If MessagePack adapter configured → MessagePack  
# 3. Fallback → JSON over stdin/stdout

# Force specific protocol:
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)        # gRPC
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonMsgpack)  # MessagePack
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)     # JSON
```

### Worker Architecture

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Elixir App    │    │  gRPC Worker    │    │ Python Process  │
│                 │◄──►│   (GenServer)   │◄──►│  (gRPC Server)  │
│ execute_stream  │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         │              HTTP/2 Connection               │
         │                       │                       │
    Callback Handler      Connection Mgmt          Stream Handlers
```

### Session Management

```elixir
# Sessions work seamlessly with gRPC
session_id = "ml_training_#{user_id}"

# Session-based execution (maintains state)
{:ok, result} = Snakepit.execute_in_session(session_id, "initialize_model", model_config)

# Session-based streaming
Snakepit.execute_in_session_stream(session_id, "train_model", training_data, fn chunk ->
  IO.puts("Epoch #{chunk["epoch"]}: loss=#{chunk["loss"]}")
end)
```

## API Reference

### Basic Execution

```elixir
# Snakepit.execute/3 - Same as always
{:ok, result} = Snakepit.execute(command, args, timeout \\ 30_000)

# Examples
{:ok, _} = Snakepit.execute("ping", %{})
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})
```

### Streaming Execution

```elixir
# Snakepit.execute_stream/4 - NEW!
:ok = Snakepit.execute_stream(command, args, callback_fn, timeout \\ 300_000)

# Callback function receives each chunk
callback_fn = fn chunk ->
  if chunk["is_final"] do
    IO.puts("Stream complete!")
  else
    IO.puts("Progress: #{chunk["progress"]}%")
  end
end
```

### Session-based APIs

```elixir
# Session execution (existing)
{:ok, result} = Snakepit.execute_in_session(session_id, command, args, timeout \\ 30_000)

# Session streaming (NEW!)
:ok = Snakepit.execute_in_session_stream(session_id, command, args, callback_fn, timeout \\ 300_000)
```

### Error Handling

```elixir
case Snakepit.execute_stream("long_process", %{data: large_data}, callback) do
  :ok ->
    IO.puts("Stream completed successfully")
    
  {:error, :worker_timeout} ->
    IO.puts("Operation timed out")
    
  {:error, :grpc_unavailable} ->
    IO.puts("gRPC not available, check setup")
    
  {:error, reason} ->
    IO.puts("Stream failed: #{inspect(reason)}")
end
```

## Streaming Examples

### 1. ML Batch Inference

Stream inference results as each item completes:

```elixir
batch_items = ["image_001.jpg", "image_002.jpg", "image_003.jpg"]

Snakepit.execute_stream("batch_inference", %{
  model_path: "/models/resnet50.pkl",
  batch_items: batch_items
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("🎉 Batch inference complete!")
  else
    item = chunk["item"]
    prediction = chunk["prediction"] 
    confidence = chunk["confidence"]
    IO.puts("🧠 #{item}: #{prediction} (#{confidence}% confidence)")
  end
end)

# Output:
# 🧠 image_001.jpg: cat (94% confidence)
# 🧠 image_002.jpg: dog (87% confidence)  
# 🧠 image_003.jpg: bird (91% confidence)
# 🎉 Batch inference complete!
```

### 2. Large Dataset Processing

Process huge datasets with real-time progress:

```elixir
Snakepit.execute_stream("process_large_dataset", %{
  file_path: "/data/sales_data_10gb.csv",
  chunk_size: 10_000,
  operations: ["clean", "transform", "aggregate"]
}, fn chunk ->
  if chunk["is_final"] do
    stats = chunk["final_stats"]
    IO.puts("✅ Processing complete!")
    IO.puts("📊 Final stats: #{inspect(stats)}")
  else
    progress = chunk["progress_percent"]
    processed = chunk["processed_rows"]
    total = chunk["total_rows"]
    memory_mb = chunk["memory_usage_mb"]
    
    IO.puts("📊 Progress: #{progress}% (#{processed}/#{total} rows) - Memory: #{memory_mb}MB")
  end
end)

# Output:
# 📊 Progress: 10.0% (100000/1000000 rows) - Memory: 45MB
# 📊 Progress: 20.0% (200000/1000000 rows) - Memory: 47MB
# 📊 Progress: 30.0% (300000/1000000 rows) - Memory: 46MB
# ...
# ✅ Processing complete!
# 📊 Final stats: %{errors: 12, processed: 988000, skipped: 12}
```

### 3. Real-time Log Analysis

Analyze logs in real-time as new entries arrive:

```elixir
Snakepit.execute_stream("tail_and_analyze", %{
  log_path: "/var/log/app.log",
  patterns: ["ERROR", "FATAL", "OutOfMemoryError"],
  context_lines: 3
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("📊 Log analysis session ended")
  else
    severity = chunk["severity"]
    timestamp = chunk["timestamp"]
    line = chunk["log_line"]
    pattern = chunk["pattern_matched"]
    
    emoji = case severity do
      "FATAL" -> "💀"
      "ERROR" -> "🚨" 
      "WARN" -> "⚠️"
      _ -> "ℹ️"
    end
    
    IO.puts("#{emoji} [#{timestamp}] #{pattern}: #{String.slice(line, 0, 80)}...")
  end
end)

# Output:
# 🚨 [2025-01-20T15:30:45] ERROR: Database connection failed: timeout after 30s...
# ⚠️ [2025-01-20T15:30:50] WARN: High memory usage detected: 85% of heap used...
# 💀 [2025-01-20T15:31:00] FATAL: OutOfMemoryError: Java heap space exceeded...
```

### 4. Distributed Training Monitoring

Monitor ML training progress across multiple nodes:

```elixir
Snakepit.execute_stream("distributed_training", %{
  model_config: %{
    architecture: "transformer",
    layers: 24,
    hidden_size: 1024
  },
  training_config: %{
    epochs: 100,
    batch_size: 32,
    learning_rate: 0.001
  },
  dataset_path: "/data/training_set"
}, fn chunk ->
  if chunk["is_final"] do
    model_path = chunk["final_model_path"] 
    best_acc = chunk["best_val_accuracy"]
    IO.puts("🎯 Training complete! Best accuracy: #{best_acc}")
    IO.puts("💾 Model saved: #{model_path}")
  else
    epoch = chunk["epoch"]
    train_loss = chunk["train_loss"]
    val_loss = chunk["val_loss"] 
    train_acc = chunk["train_acc"]
    val_acc = chunk["val_acc"]
    lr = chunk["learning_rate"]
    
    IO.puts("📈 Epoch #{epoch}/100:")
    IO.puts("   Train: loss=#{train_loss}, acc=#{train_acc}")
    IO.puts("   Val:   loss=#{val_loss}, acc=#{val_acc}")
    IO.puts("   LR: #{lr}")
  end
end)

# Output:
# 📈 Epoch 1/100:
#    Train: loss=2.45, acc=0.12
#    Val:   loss=2.38, acc=0.15
#    LR: 0.001
# 📈 Epoch 2/100:
#    Train: loss=2.12, acc=0.23
#    Val:   loss=2.05, acc=0.28
#    LR: 0.001
# ...
# 🎯 Training complete! Best accuracy: 0.94
# 💾 Model saved: /models/transformer_best_20250120.pkl
```

### 5. Financial Data Pipeline

Process real-time financial data with technical analysis:

```elixir
Snakepit.execute_stream("realtime_stock_analysis", %{
  symbols: ["AAPL", "GOOGL", "MSFT", "TSLA"],
  indicators: ["RSI", "MACD", "SMA_20", "SMA_50"],
  alert_thresholds: %{
    rsi_oversold: 30,
    rsi_overbought: 70,
    volume_spike: 2.0
  }
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("📊 Market session ended")
  else
    symbol = chunk["symbol"]
    price = chunk["price"]
    volume = chunk["volume"] 
    rsi = chunk["rsi"]
    signal = chunk["trading_signal"]
    
    case signal do
      "BUY" -> IO.puts("🟢 #{symbol}: $#{price} - BUY signal (RSI: #{rsi})")
      "SELL" -> IO.puts("🔴 #{symbol}: $#{price} - SELL signal (RSI: #{rsi})")
      _ -> IO.puts("⚪ #{symbol}: $#{price} - HOLD (RSI: #{rsi})")
    end
  end
end)

# Output:
# ⚪ AAPL: $175.50 - HOLD (RSI: 45.2)
# 🟢 GOOGL: $142.80 - BUY signal (RSI: 28.5)
# ⚪ MSFT: $380.20 - HOLD (RSI: 52.1) 
# 🔴 TSLA: $210.15 - SELL signal (RSI: 73.8)
```

## Performance

### Streaming vs Traditional Comparison

| Metric | stdin/stdout | MessagePack | gRPC Streaming |
|--------|-------------|-------------|----------------|
| **Latency (first result)** | 5-10s | 3-7s | **0.1-0.5s** |
| **Memory usage** | Grows with result | Grows with result | **Constant** |
| **Progress visibility** | None | None | **Real-time** |
| **Cancellation** | Kill process | Kill process | **Graceful** |
| **Error granularity** | End only | End only | **Per chunk** |
| **User experience** | "Is it working?" | "Is it working?" | **Live updates** |

### Binary Serialization Performance

gRPC mode includes automatic binary serialization for large data:

| Data Size | JSON Encoding | Binary Encoding | Speedup |
|-----------|--------------|-----------------|---------|
| 1KB | 0.1ms | 0.1ms | 1x (uses JSON) |
| 10KB | 2ms | 2ms | 1x (threshold) |
| 100KB | 25ms | 3ms | **8x faster** |
| 1MB | 300ms | 30ms | **10x faster** |
| 10MB | 3500ms | 250ms | **14x faster** |

The 10KB threshold ensures optimal performance:
- Small data remains human-readable (JSON)
- Large data gets maximum performance (binary)

### Benchmarks

```bash
# Large dataset processing (1GB CSV file)
Traditional: Submit → Wait 10 minutes → Get result or timeout
gRPC Stream: Submit → Progress every 30s → Complete with stats

# ML batch inference (1000 images)  
Traditional: Submit → Wait 5 minutes → Get all predictions
gRPC Stream: Submit → Get prediction 1 → Get prediction 2 → ...

# Real-time monitoring
Traditional: Not possible
gRPC Stream: Live tail of logs/metrics with analysis
```

### Memory Usage Patterns

```elixir
# Traditional: Memory grows with result size
results = []  # Starts empty
# ... processing 1GB of data ...
results = [huge_dataset_results]  # Now using 1GB+ memory

# gRPC Streaming: Constant memory usage
Snakepit.execute_stream("process_1gb_dataset", %{}, fn chunk ->
  process_chunk(chunk)  # Handle immediately
  # chunk gets garbage collected
end)
# Memory usage stays constant regardless of total data size
```

## Migration Guide

### From stdin/stdout to gRPC

**Step 1: Install gRPC**
```bash
make install-grpc
make proto-python
```

**Step 2: Update Configuration**
```elixir
# Before
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GenericPythonV2)

# After  
Application.put_env(:snakepit, :adapter_module, Snakepit.Adapters.GRPCPython)
Application.put_env(:snakepit, :grpc_config, %{base_port: 50051, port_range: 100})
```

**Step 3: Keep Existing Code**
```elixir
# This still works exactly the same
{:ok, result} = Snakepit.execute("compute", %{operation: "add", a: 5, b: 3})
```

**Step 4: Add Streaming Where Beneficial**
```elixir
# Replace long-running operations with streaming versions
# Before: 
{:ok, results} = Snakepit.execute("batch_process", large_dataset)  # Blocks for minutes

# After:
Snakepit.execute_stream("batch_process", large_dataset, fn chunk ->
  update_progress_bar(chunk["progress"])
  save_partial_result(chunk["data"])
end)
```

### From MessagePack to gRPC

```elixir
# MessagePack config (keep as fallback)
Application.put_env(:snakepit, :adapters, [
  Snakepit.Adapters.GRPCPython,        # Try gRPC first
  Snakepit.Adapters.GenericPythonMsgpack  # Fallback to MessagePack
])
```

### Gradual Migration Strategy

1. **Phase 1**: Install gRPC alongside existing adapter
2. **Phase 2**: Test gRPC with non-critical workloads  
3. **Phase 3**: Move long-running operations to streaming
4. **Phase 4**: Default to gRPC, keep MessagePack as fallback
5. **Phase 5**: Full gRPC adoption

## Creating Streaming Commands in Python

This section shows how to implement custom streaming commands in your Python adapters.

### Basic Streaming Command

```python
from snakepit_bridge.base_adapter import BaseAdapter, tool

class MyAdapter(BaseAdapter):
    @tool(description="Stream progress updates", supports_streaming=True)
    def progress_stream(self, count: int = 10, delay: float = 0.5):
        """
        Stream progress updates to the client.

        For streaming tools, return a generator that yields dictionaries.
        Each yielded dict becomes a chunk sent to the Elixir callback.
        """
        import time

        for i in range(count):
            # Yield progress chunk
            yield {
                "progress": i + 1,
                "total": count,
                "percent": ((i + 1) / count) * 100,
                "message": f"Processing item {i + 1} of {count}",
                "is_final": False
            }
            time.sleep(delay)

        # Final chunk
        yield {
            "is_final": True,
            "total_processed": count,
            "message": "Stream complete"
        }
```

### ML Batch Inference Streaming

```python
from snakepit_bridge.base_adapter import BaseAdapter, tool
import time

class MLAdapter(BaseAdapter):
    def __init__(self):
        super().__init__()
        # Load your model here
        self.model = self._load_model()

    @tool(description="Batch inference with streaming results", supports_streaming=True)
    def batch_inference(self, items: list, model_path: str = None):
        """
        Process items in a batch, streaming each result as it completes.
        """
        total = len(items)

        for idx, item in enumerate(items):
            # Perform inference
            prediction, confidence = self._predict(item)

            # Yield result for this item
            yield {
                "item": item,
                "index": idx,
                "prediction": prediction,
                "confidence": confidence,
                "progress": idx + 1,
                "total": total,
                "is_final": False
            }

        # Final summary
        yield {
            "is_final": True,
            "total_processed": total,
            "message": f"Processed {total} items successfully"
        }

    def _predict(self, item):
        # Your ML inference logic here
        # This is a placeholder
        time.sleep(0.1)
        return "predicted_class", 0.95

    def _load_model(self):
        # Load your model
        return None
```

### Large Dataset Processing

```python
from snakepit_bridge.base_adapter import BaseAdapter, tool
import pandas as pd

class DataAdapter(BaseAdapter):
    @tool(description="Process large datasets with progress", supports_streaming=True)
    def process_large_dataset(self, file_path: str, chunk_size: int = 10000):
        """
        Process a large CSV file in chunks, streaming progress.
        """
        # Get total rows (for progress calculation)
        total_rows = sum(1 for _ in open(file_path)) - 1  # -1 for header

        processed = 0
        errors = 0

        # Process in chunks
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            # Process this chunk
            try:
                results = self._process_chunk(chunk)
                processed += len(chunk)

                # Yield progress
                yield {
                    "processed_rows": processed,
                    "total_rows": total_rows,
                    "progress_percent": (processed / total_rows) * 100,
                    "memory_usage_mb": chunk.memory_usage(deep=True).sum() / 1024 / 1024,
                    "is_final": False
                }
            except Exception as e:
                errors += 1
                yield {
                    "error": str(e),
                    "chunk_start": processed,
                    "is_final": False
                }

        # Final statistics
        yield {
            "is_final": True,
            "final_stats": {
                "total_rows": total_rows,
                "processed": processed,
                "errors": errors,
                "skipped": total_rows - processed
            }
        }

    def _process_chunk(self, chunk):
        # Your processing logic
        return chunk.apply(lambda row: row, axis=1)
```

### Real-time Log Analysis

```python
from snakepit_bridge.base_adapter import BaseAdapter, tool
import re
import time

class LogAdapter(BaseAdapter):
    @tool(description="Tail and analyze logs in real-time", supports_streaming=True)
    def tail_and_analyze(self, log_path: str, patterns: list, context_lines: int = 3):
        """
        Stream log analysis results in real-time.
        """
        pattern_regexes = [re.compile(p) for p in patterns]

        # Simulate tailing (in production, use actual file watching)
        with open(log_path, 'r') as f:
            for line_num, line in enumerate(f):
                for pattern_idx, regex in enumerate(pattern_regexes):
                    if regex.search(line):
                        # Found a match
                        severity = self._determine_severity(line)

                        yield {
                            "line_number": line_num,
                            "log_line": line.strip(),
                            "pattern_matched": patterns[pattern_idx],
                            "severity": severity,
                            "timestamp": self._extract_timestamp(line),
                            "is_final": False
                        }

                # Simulate real-time (remove for actual file watching)
                time.sleep(0.01)

        yield {
            "is_final": True,
            "message": "Log analysis complete"
        }

    def _determine_severity(self, line):
        if "FATAL" in line or "CRITICAL" in line:
            return "FATAL"
        elif "ERROR" in line:
            return "ERROR"
        elif "WARN" in line:
            return "WARN"
        return "INFO"

    def _extract_timestamp(self, line):
        # Extract timestamp from log line
        # This is a placeholder
        return "2025-01-20T15:30:45"
```

### ML Training with Epoch Updates

```python
from snakepit_bridge.base_adapter import BaseAdapter, tool
import time

class TrainingAdapter(BaseAdapter):
    @tool(description="Train model with streaming epoch updates", supports_streaming=True)
    def distributed_training(self, model_config: dict, training_config: dict, dataset_path: str):
        """
        Train a model and stream updates for each epoch.
        """
        epochs = training_config.get("epochs", 100)
        best_val_acc = 0.0

        for epoch in range(1, epochs + 1):
            # Simulate training epoch
            train_loss, train_acc = self._train_epoch(epoch)
            val_loss, val_acc = self._validate_epoch(epoch)

            # Track best accuracy
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                model_path = f"/models/model_epoch_{epoch}.pkl"
                self._save_checkpoint(model_path)

            # Yield epoch results
            yield {
                "epoch": epoch,
                "total_epochs": epochs,
                "train_loss": train_loss,
                "train_acc": train_acc,
                "val_loss": val_loss,
                "val_acc": val_acc,
                "learning_rate": self._get_current_lr(epoch),
                "is_final": False
            }

        # Final result
        yield {
            "is_final": True,
            "final_model_path": model_path,
            "best_val_accuracy": best_val_acc,
            "total_epochs": epochs,
            "message": "Training complete"
        }

    def _train_epoch(self, epoch):
        # Training logic (placeholder)
        time.sleep(0.1)
        return 2.5 - (epoch * 0.02), 0.1 + (epoch * 0.008)

    def _validate_epoch(self, epoch):
        # Validation logic (placeholder)
        time.sleep(0.05)
        return 2.3 - (epoch * 0.018), 0.15 + (epoch * 0.007)

    def _get_current_lr(self, epoch):
        # Learning rate schedule
        return 0.001 * (0.95 ** epoch)

    def _save_checkpoint(self, path):
        # Save model checkpoint
        pass
```

### Key Patterns for Streaming Commands

1. **Use Generator Functions**: Return a generator (using `yield`) instead of returning a value directly
2. **Mark with supports_streaming**: Add `supports_streaming=True` to the `@tool` decorator
3. **Include is_final**: Always include `"is_final": False` in intermediate chunks and `"is_final": True` in the final chunk
4. **Yield Dictionaries**: Each yielded value should be a dictionary that will be sent to the Elixir callback
5. **Progress Information**: Include progress indicators (`progress`, `total`, `percent`, etc.) to help users track completion
6. **Error Handling**: Yield error information as chunks rather than raising exceptions

### Calling Streaming Commands from Elixir

```elixir
# Call your custom streaming command
Snakepit.execute_stream("progress_stream", %{count: 10, delay: 0.5}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("✅ #{chunk["message"]}")
  else
    IO.puts("📊 Progress: #{chunk["percent"]}% - #{chunk["message"]}")
  end
end)

# With session affinity
Snakepit.execute_in_session_stream("ml_session", "distributed_training", %{
  model_config: %{layers: 12},
  training_config: %{epochs: 100},
  dataset_path: "/data/train.csv"
}, fn chunk ->
  if chunk["is_final"] do
    IO.puts("🎯 Training complete! Model: #{chunk["final_model_path"]}")
  else
    IO.puts("📈 Epoch #{chunk["epoch"]}: loss=#{chunk["train_loss"]}, acc=#{chunk["train_acc"]}")
  end
end)
```

## Troubleshooting

### Common Issues

#### 1. "Generated gRPC code not found"

```bash
# Solution: Generate protobuf code
make proto-python

# Verify:
ls priv/python/snakepit_bridge/grpc/
# Should show: snakepit_pb2.py, snakepit_pb2_grpc.py
```

#### 2. "gRPC dependencies not available"

```bash
# Check Python dependencies
python -c "import grpc; print('gRPC:', grpc.__version__)"
python -c "import google.protobuf; print('Protobuf OK')"

# Install if missing
pip install 'snakepit-bridge[grpc]'
```

#### 3. "Port already in use"

```elixir
# Solution: Configure different port range
Application.put_env(:snakepit, :grpc_config, %{
  base_port: 50100,  # Try different base port
  port_range: 50     # Smaller range
})
```

#### 4. Worker initialization timeout

```bash
# Check Python process
ps aux | grep grpc_bridge

# Check Python errors
python priv/python/grpc_bridge.py --help

# Increase timeout
Application.put_env(:snakepit, :worker_init_timeout, 30_000)
```

#### 5. Streaming callback errors

```elixir
# Bad: Callback crashes on unexpected data
callback = fn chunk ->
  result = chunk["result"]  # May be nil
  process_result(result)    # Crashes if nil
end

# Good: Defensive callback
callback = fn chunk ->
  case chunk do
    %{"is_final" => true} ->
      IO.puts("Stream complete")
    %{"result" => result} when not is_nil(result) ->
      process_result(result)
    %{"error" => error} ->
      IO.puts("Stream error: #{error}")
    other ->
      IO.puts("Unexpected chunk: #{inspect(other)}")
  end
end
```

### Debug Mode

```elixir
# Enable debug logging
Logger.configure(level: :debug)

# Trace gRPC worker
:sys.trace(worker_pid, true)

# Check gRPC connection health
Snakepit.GRPCWorker.get_health(worker)
Snakepit.GRPCWorker.get_info(worker)
```

### Performance Tuning

```elixir
# Adjust gRPC settings
Application.put_env(:snakepit, :grpc_config, %{
  base_port: 50051,
  port_range: 100,
  # Increase concurrent streams
  max_concurrent_streams: 10,
  # Tune timeouts
  connection_timeout: 10_000,
  stream_timeout: 300_000
})

# Pool optimization
Application.put_env(:snakepit, :pool_config, %{
  pool_size: 16,  # More workers for concurrent streams
  worker_init_timeout: 30_000
})
```

### Fallback Strategy

```elixir
# Graceful degradation if gRPC unavailable
defmodule MyApp.SnakepitClient do
  def execute_with_fallback(command, args, opts \\ []) do
    case Snakepit.execute(command, args, opts) do
      {:ok, result} -> 
        {:ok, result}
      {:error, :grpc_unavailable} ->
        # Fallback to MessagePack
        with_adapter(Snakepit.Adapters.GenericPythonMsgpack, fn ->
          Snakepit.execute(command, args, opts)
        end)
      {:error, reason} ->
        {:error, reason}
    end
  end
  
  defp with_adapter(adapter, fun) do
    old_adapter = Application.get_env(:snakepit, :adapter_module)
    Application.put_env(:snakepit, :adapter_module, adapter)
    
    try do
      fun.()
    after
      Application.put_env(:snakepit, :adapter_module, old_adapter)
    end
  end
end
```

---