# Snakepit Telemetry

Snakepit provides a comprehensive distributed telemetry system that enables observability across your Elixir cluster and Python workers. All telemetry flows through Elixir's standard `:telemetry` library, providing a unified interface for monitoring, metrics, and tracing.

## Features

- **Distributed by Design** - All events include node metadata for cluster-wide visibility
- **Python-to-Elixir Event Folding** - Python worker metrics appear as Elixir `:telemetry` events
- **Bidirectional gRPC Stream** - Real-time event streaming with runtime control
- **Atom Safety** - Curated event catalog prevents atom table exhaustion
- **Runtime Control** - Adjust sampling rates, filtering, and toggle telemetry without restarting workers
- **Zero External Dependencies** - Core system uses only stdlib + `:telemetry`
- **High Performance** - <10μs overhead per event, <1% CPU impact

## Quick Start

### Attaching Event Handlers

```elixir
# In your application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    # Attach telemetry handlers
    :telemetry.attach(
      "my-app-python-monitor",
      [:snakepit, :python, :call, :stop],
      &MyApp.Telemetry.handle_python_call/4,
      nil
    )

    children = [
      # ... your supervision tree
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end

defmodule MyApp.Telemetry do
  require Logger

  def handle_python_call(_event, measurements, metadata, _config) do
    duration_ms = System.convert_time_unit(measurements.duration, :native, :millisecond)

    Logger.info("Python call completed",
      command: metadata.command,
      duration_ms: duration_ms,
      worker_id: metadata.worker_id,
      node: metadata.node
    )
  end
end
```

### Emitting from Python

```python
from snakepit_bridge import telemetry

def my_tool(ctx, data):
    # Use span for automatic timing
    with telemetry.span("tool.execution", {"tool": "my_tool"}):
        result = expensive_operation(data)

        # Emit custom metrics
        telemetry.emit(
            "tool.result_size",
            {"bytes": len(result)},
            {"tool": "my_tool"}
        )

        return result
```

## Event Catalog

Snakepit emits events across three layers:

### Layer 1: Infrastructure Events (Elixir)

**Pool Management:**
- `[:snakepit, :pool, :initialized]` - Pool initialization complete
- `[:snakepit, :pool, :status]` - Periodic pool status snapshot
- `[:snakepit, :pool, :queue, :enqueued]` - Request queued (no workers available)
- `[:snakepit, :pool, :queue, :dequeued]` - Request dequeued (worker available)
- `[:snakepit, :pool, :queue, :timeout]` - Request timed out in queue

**Worker Lifecycle:**
- `[:snakepit, :pool, :worker, :spawn_started]` - Worker spawn initiated
- `[:snakepit, :pool, :worker, :spawned]` - Worker ready and connected
- `[:snakepit, :pool, :worker, :spawn_failed]` - Worker failed to start
- `[:snakepit, :pool, :worker, :terminated]` - Worker terminated
- `[:snakepit, :pool, :worker, :restarted]` - Worker restarted by supervisor

**Session Management:**
- `[:snakepit, :session, :created]` - New session created
- `[:snakepit, :session, :destroyed]` - Session destroyed
- `[:snakepit, :session, :affinity, :assigned]` - Session assigned to worker
- `[:snakepit, :session, :affinity, :broken]` - Session affinity broken
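
Session affinity breaks are often worth surfacing, since they usually mean a worker died while it still owned session state. A minimal sketch of a handler for that event; the metadata fields shown (`session_id`, `worker_id`, `reason`) are illustrative and not confirmed by the catalog above:

```elixir
:telemetry.attach(
  "session-affinity-monitor",
  [:snakepit, :session, :affinity, :broken],
  fn _event, _measurements, metadata, _config ->
    # Inspect the metadata you actually receive; these keys are assumptions.
    Logger.warning("Session affinity broken",
      session_id: metadata[:session_id],
      worker_id: metadata[:worker_id],
      reason: metadata[:reason]
    )
  end,
  nil
)
```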

### Layer 2: Python Execution Events (Folded from Python)

**Call Lifecycle:**
- `[:snakepit, :python, :call, :start]` - Python command started
- `[:snakepit, :python, :call, :stop]` - Python command completed successfully
- `[:snakepit, :python, :call, :exception]` - Python command raised exception

**Tool Execution:**
- `[:snakepit, :python, :tool, :execution, :start]` - Tool execution started
- `[:snakepit, :python, :tool, :execution, :stop]` - Tool execution completed
- `[:snakepit, :python, :tool, :execution, :exception]` - Tool execution failed
- `[:snakepit, :python, :tool, :result_size]` - Tool result size metric

**Resource Metrics:**
- `[:snakepit, :python, :memory, :sampled]` - Python process memory usage
- `[:snakepit, :python, :cpu, :sampled]` - Python process CPU usage
- `[:snakepit, :python, :gc, :completed]` - Python garbage collection completed
- `[:snakepit, :python, :error, :occurred]` - Python error detected
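
If you export metrics with `Telemetry.Metrics` (see the Prometheus section below), the resource samples map naturally onto gauges. A minimal sketch, assuming the sampled events carry `memory_bytes` and `cpu_percent` measurements (the exact measurement keys are not specified here):

```elixir
import Telemetry.Metrics

[
  # Latest reported memory usage per worker
  last_value("snakepit.python.memory.sampled.memory_bytes",
    unit: :byte,
    tags: [:node, :worker_id]
  ),
  # Latest reported CPU usage per worker
  last_value("snakepit.python.cpu.sampled.cpu_percent",
    tags: [:node, :worker_id]
  )
]
```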

### Layer 3: gRPC Bridge Events (Elixir)

**Call Events:**
- `[:snakepit, :grpc, :call, :start]` - gRPC call initiated
- `[:snakepit, :grpc, :call, :stop]` - gRPC call completed
- `[:snakepit, :grpc, :call, :exception]` - gRPC call failed

**Stream Events:**
- `[:snakepit, :grpc, :stream, :opened]` - Streaming RPC opened
- `[:snakepit, :grpc, :stream, :message]` - Stream message sent/received
- `[:snakepit, :grpc, :stream, :closed]` - Stream closed

**Connection Events:**
- `[:snakepit, :grpc, :connection, :established]` - gRPC channel connected
- `[:snakepit, :grpc, :connection, :lost]` - gRPC connection lost
- `[:snakepit, :grpc, :connection, :reconnected]` - gRPC reconnected after failure
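
Connection events are a cheap way to spot flapping channels between the pool and its workers. A minimal sketch that logs both sides of a drop/reconnect cycle; it assumes the metadata includes a `worker_id`:

```elixir
:telemetry.attach_many(
  "grpc-connection-monitor",
  [
    [:snakepit, :grpc, :connection, :lost],
    [:snakepit, :grpc, :connection, :reconnected]
  ],
  fn
    [:snakepit, :grpc, :connection, :lost], _measurements, metadata, _config ->
      Logger.warning("gRPC connection lost", worker_id: metadata[:worker_id])

    [:snakepit, :grpc, :connection, :reconnected], _measurements, metadata, _config ->
      Logger.info("gRPC connection re-established", worker_id: metadata[:worker_id])
  end,
  nil
)
```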

## Usage Patterns

### Monitoring Python Call Performance

```elixir
:telemetry.attach(
  "python-perf-monitor",
  [:snakepit, :python, :call, :stop],
  fn _event, %{duration: duration}, metadata, _ ->
    duration_ms = System.convert_time_unit(duration, :native, :millisecond)

    if duration_ms > 1000 do
      Logger.warning("Slow Python call detected",
        command: metadata.command,
        duration_ms: duration_ms,
        worker_id: metadata.worker_id
      )
    end
  end,
  nil
)
```

### Tracking Worker Health

```elixir
:telemetry.attach(
  "worker-health-monitor",
  [:snakepit, :pool, :worker, :restarted],
  fn _event, %{restart_count: count}, metadata, _ ->
    if count > 5 do
      Logger.error("Worker restarting frequently",
        worker_id: metadata.worker_id,
        restart_count: count,
        reason: metadata.reason
      )

      # Alert ops team
      MyApp.Alerts.send_alert(:worker_flapping, metadata)
    end
  end,
  nil
)
```

### Monitoring Queue Depth

```elixir
:telemetry.attach(
  "queue-depth-monitor",
  [:snakepit, :pool, :status],
  fn _event, %{queue_depth: depth}, metadata, _ ->
    if depth > 50 do
      Logger.error("High queue depth detected",
        pool: metadata.pool_name,
        queue_depth: depth,
        available_workers: metadata.available_workers
      )

      # Trigger autoscaling
      MyApp.Autoscaler.scale_up(metadata.pool_name)
    end
  end,
  nil
)
```

### Distributed Tracing with Correlation IDs

```elixir
:telemetry.attach_many(
  "distributed-tracer",
  [
    [:snakepit, :pool, :queue, :enqueued],
    [:snakepit, :python, :call, :start],
    [:snakepit, :python, :call, :stop]
  ],
  fn event, measurements, %{correlation_id: id} = metadata, _ ->
    # All events for the same request share the same correlation_id
    MyApp.Tracing.record_span(id, event, measurements, metadata)
  end,
  nil
)
```
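
`MyApp.Tracing` above is a placeholder for your tracing backend. A purely illustrative ETS-backed sketch that collects every event sharing a correlation ID so a request can be reconstructed afterwards:

```elixir
defmodule MyApp.Tracing do
  @table :myapp_trace_spans

  # Call once at application start (e.g. from MyApp.Application.start/2)
  def init do
    :ets.new(@table, [:bag, :public, :named_table])
  end

  # Append one telemetry event under its correlation ID
  def record_span(correlation_id, event, measurements, metadata) do
    :ets.insert(@table, {correlation_id, {System.system_time(), event, measurements, metadata}})
  end

  # Return all recorded events for a request, oldest first
  def spans(correlation_id) do
    @table
    |> :ets.lookup(correlation_id)
    |> Enum.map(fn {_id, entry} -> entry end)
    |> Enum.sort_by(&elem(&1, 0))
  end
end
```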

## Integration with Metrics Systems

### Prometheus

```elixir
# mix.exs
def deps do
  [
    {:snakepit, "~> 0.7"},
    {:telemetry_metrics_prometheus, "~> 1.1"}
  ]
end

# lib/myapp/telemetry.ex
defmodule MyApp.Telemetry do
  use Supervisor
  import Telemetry.Metrics

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  def init(_arg) do
    children = [
      {TelemetryMetricsPrometheus, metrics: metrics()}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end

  defp metrics do
    [
      # Pool metrics
      last_value("snakepit.pool.status.queue_depth",
        tags: [:node, :pool_name]
      ),
      last_value("snakepit.pool.status.available_workers",
        tags: [:node, :pool_name]
      ),

      # Python call metrics (the Prometheus reporter supports distribution, not summary)
      distribution("snakepit.python.call.stop.duration",
        unit: {:native, :millisecond},
        reporter_options: [buckets: [10, 50, 100, 250, 500, 1_000, 2_500, 5_000]],
        tags: [:node, :pool_name, :command]
      ),
      counter("snakepit.python.call.exception.count",
        tags: [:node, :error_type]
      ),

      # Worker lifecycle
      counter("snakepit.pool.worker.spawned.count",
        tags: [:node, :pool_name]
      )
    ]
  end
end
```

### StatsD

```elixir
# mix.exs
{:telemetry_metrics_statsd, "~> 0.7"}

# In your telemetry module
children = [
  {TelemetryMetricsStatsd,
   metrics: metrics(),
   host: "statsd.local",
   port: 8125}
]
```

### OpenTelemetry

```elixir
# mix.exs
{:opentelemetry_telemetry, "~> 1.0"}

# Attach a handler module of your own that converts these events into OTel
# spans (for example via OpentelemetryTelemetry.start_telemetry_span/4 and
# OpentelemetryTelemetry.end_telemetry_span/2); see the sketch below.
:telemetry.attach_many(
  "otel-tracer",
  [
    [:snakepit, :python, :call, :start],
    [:snakepit, :python, :call, :stop],
    [:snakepit, :python, :call, :exception]
  ],
  &MyApp.OtelHandler.handle_event/4,
  %{span_name: "snakepit.python.call"}
)
```
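
A minimal sketch of the hypothetical `MyApp.OtelHandler` referenced above. It assumes `opentelemetry_telemetry` can correlate the `:start` and `:stop`/`:exception` events through their metadata; verify that against your Snakepit and OpenTelemetry versions before relying on it:

```elixir
defmodule MyApp.OtelHandler do
  require OpenTelemetry.Tracer, as: Tracer

  @tracer_id __MODULE__

  def handle_event([:snakepit, :python, :call, :start], _measurements, metadata, config) do
    # Open a span when the Python call starts
    OpentelemetryTelemetry.start_telemetry_span(@tracer_id, config.span_name, metadata, %{})
  end

  def handle_event([:snakepit, :python, :call, :stop], _measurements, metadata, _config) do
    # Close the span opened for this call
    OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
    OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
  end

  def handle_event([:snakepit, :python, :call, :exception], _measurements, metadata, _config) do
    # Mark the span as errored, then close it
    OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, metadata)
    Tracer.set_status(OpenTelemetry.status(:error, "python call failed"))
    OpentelemetryTelemetry.end_telemetry_span(@tracer_id, metadata)
  end
end
```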

## Runtime Control

### Adjusting Sampling Rates

```elixir
# Reduce to 10% sampling for high-frequency events
Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.1)

# Apply to specific event patterns
Snakepit.Telemetry.GrpcStream.update_sampling(
  "worker_1",
  0.1,
  ["python.call.*"]
)
```

### Toggling Telemetry

```elixir
# Disable telemetry for a specific worker
Snakepit.Telemetry.GrpcStream.toggle("worker_1", false)

# Re-enable
Snakepit.Telemetry.GrpcStream.toggle("worker_1", true)
```

### Event Filtering

```elixir
# Only allow specific events
Snakepit.Telemetry.GrpcStream.update_filter("worker_1",
  allow: ["python.call.*", "python.tool.*"]
)

# Block specific events
Snakepit.Telemetry.GrpcStream.update_filter("worker_1",
  deny: ["python.memory.sampled"]
)
```

## Python API

### Basic Event Emission

```python
import time

from snakepit_bridge import telemetry

# Emit a simple event
telemetry.emit(
    "tool.execution.start",
    {"system_time": time.time_ns()},
    {"tool": "my_tool", "operation": "predict"},
    correlation_id="abc-123"
)
```

### Span Context Manager

```python
from snakepit_bridge import telemetry

def my_tool(ctx, data):
    # Automatically emits start/stop/exception events
    with telemetry.span("tool.execution", {"tool": "my_tool"}):
        result = process_data(data)

        # Emit additional metrics within the span
        telemetry.emit(
            "tool.result_size",
            {"bytes": len(result)},
            {"tool": "my_tool"}
        )

        return result
```

### Correlation ID Propagation

```python
from snakepit_bridge import telemetry

def my_tool(ctx, data):
    # Get correlation ID from context
    correlation_id = telemetry.get_correlation_id()

    # All events within this tool will share the same correlation_id
    with telemetry.span("tool.execution", {"tool": "my_tool"}, correlation_id):
        result = do_work(data)
        return result
```

## Event Structure

All telemetry events follow the standard `:telemetry` structure:

```elixir
:telemetry.execute(
  event_name :: [atom()],           # [:snakepit, :component, :resource, :action]
  measurements :: %{atom() => number()},
  metadata :: %{atom() => term()}
)
```

### Measurements

Numeric data about the event:

```elixir
%{
  duration: 1_234_567,              # Native time units
  system_time: 1_698_234_567_890,  # System.system_time()
  queue_depth: 5,
  memory_bytes: 1_048_576
}
```

### Metadata

Contextual information about the event:

```elixir
%{
  node: :node1@localhost,          # Always included
  pool_name: :default,
  worker_id: "worker_1",
  command: "predict",
  correlation_id: "abc-123",       # For distributed tracing
  result: :success
}
```

## Performance Considerations

### Event Frequency

**High Frequency** (consider sampling):
- `[:snakepit, :python, :call, :*]` - Every command execution
- `[:snakepit, :grpc, :call, :*]` - Every gRPC call

**Medium Frequency**:
- `[:snakepit, :pool, :worker, :*]` - Worker lifecycle
- `[:snakepit, :session, :*]` - Session lifecycle

**Low Frequency**:
- `[:snakepit, :pool, :status]` - Periodic (every 5-60s)
- `[:snakepit, :python, :memory, :sampled]` - Periodic

### Sampling Strategy

```elixir
# For high-frequency events, use sampling
config :snakepit, :telemetry,
  sampling_rate: 0.1  # 10% of events

# Or sample per-worker at runtime
Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.1)
```
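
Independently of these controls, a handler can also sample on the receiving side before doing any expensive work. A minimal sketch that processes roughly 10% of the events it sees (`MyApp.Metrics.record/3` is a placeholder for whatever your handler actually does):

```elixir
:telemetry.attach(
  "sampled-python-call-monitor",
  [:snakepit, :python, :call, :stop],
  fn _event, measurements, metadata, _config ->
    # Keep ~10% of events; drop the rest cheaply
    if :rand.uniform() < 0.1 do
      MyApp.Metrics.record(:python_call, measurements, metadata)
    end
  end,
  nil
)
```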

### Performance Impact

- **Event emission**: ~1-5 μs
- **gRPC serialization**: ~1-2 μs
- **Validation**: ~2-3 μs
- **Total overhead**: <10 μs per event
- **CPU impact**: <1% at 100% sampling, <0.1% at 10% sampling

## Troubleshooting

### No Events Received

```elixir
# Check if telemetry is enabled
Application.get_env(:snakepit, :telemetry, [])

# List attached handlers
:telemetry.list_handlers([:snakepit, :python, :call, :stop])

# Test emission manually
:telemetry.execute(
  [:snakepit, :python, :call, :stop],
  %{duration: 1000},
  %{command: "test"}
)
```

### Handler Crashes

`:telemetry` detaches a handler the first time it raises, so one bad event can silently stop your monitoring. Wrap handler bodies so failures are logged instead:

```elixir
# Rescue errors so the handler keeps running and is never detached
def handle_event(event, measurements, metadata, _config) do
  try do
    actual_handler(event, measurements, metadata)
  rescue
    e ->
      Logger.error("Telemetry handler crashed: #{inspect(e)}")
  end
end
```

### High Memory Usage

Check Python telemetry queue:

```python
# In Python worker
backend = telemetry.get_backend()
if hasattr(backend, 'stream'):
    dropped = backend.stream.dropped_count
    if dropped > 0:
        logger.warning(f"Telemetry dropped {dropped} events")
```

### Performance Issues

```elixir
# Reduce sampling rate
Snakepit.Telemetry.GrpcStream.update_sampling("worker_1", 0.1)

# Or disable entirely for specific workers
Snakepit.Telemetry.GrpcStream.toggle("worker_1", false)
```

## Architecture

The telemetry system consists of three main components:

1. **Event Catalog** (`Snakepit.Telemetry.Naming`) - Validates event names and prevents atom table exhaustion
2. **Metadata Safety** (`Snakepit.Telemetry.SafeMetadata`) - Sanitizes metadata from Python
3. **gRPC Stream Manager** (`Snakepit.Telemetry.GrpcStream`) - Manages bidirectional streams with Python workers

Events flow: Python → gRPC Stream → Validation → `:telemetry.execute()` → Your Handlers
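
To make the folding concrete: the catalog above maps the Python-side `telemetry.emit("tool.result_size", ...)` call from the Quick Start to the Elixir event `[:snakepit, :python, :tool, :result_size]`. Illustratively, a handler attached to that event receives something shaped like this (field names follow the Event Structure section, not an exact wire format):

```elixir
event_name   = [:snakepit, :python, :tool, :result_size]
measurements = %{bytes: 2_048}
metadata     = %{
  node: :node1@localhost,
  worker_id: "worker_1",
  tool: "my_tool",
  correlation_id: "abc-123"
}
```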

## Additional Resources

- **Detailed Design**: See `docs/20251028/telemetry/00_ARCHITECTURE.md`
- **Event Catalog**: See `docs/20251028/telemetry/01_EVENT_CATALOG.md`
- **Python Integration**: See `docs/20251028/telemetry/02_PYTHON_INTEGRATION.md`
- **Client Guide**: See `docs/20251028/telemetry/03_CLIENT_GUIDE.md`
- **gRPC Stream Details**: See `docs/20251028/telemetry/04_GRPC_STREAM.md`

## License

Same as Snakepit - see LICENSE file.