ADVANCED_FEATURES.md

# Advanced Pipeline Features Guide

**๐ŸŽฏ Library Status: Complete Implementation Ready** - This guide covers the 6 critical advanced features that transform pipeline_ex from an 8.5/10 library into a complete 10/10 AI engineering platform.

## Overview

This document covers the advanced features implemented to address the critical gaps identified in the missing pieces analysis. These features enable intelligent, self-correcting workflows with enterprise-grade capabilities.

## ๐Ÿ”„ 1. Enhanced Loop Constructs

### For Loops
Execute steps iteratively over data collections with full variable scoping and error handling.

```yaml
- name: "process_files"
  type: "for_loop"
  iterator: "file"
  data_source: "previous_response:file_list"
  steps:
    - name: "analyze_file"
      type: "claude"
      prompt: "Analyze file: {{loop.file.name}}"
      
- name: "nested_processing"
  type: "for_loop"
  iterator: "category"
  data_source: "categories"
  parallel: true
  max_parallel: 3
  steps:
    - name: "process_category_files"
      type: "for_loop"
      iterator: "file"
      data_source: "{{loop.category.files}}"
      steps:
        - name: "analyze_file"
          type: "claude"
          prompt: "Analyze {{loop.file.path}} in {{loop.parent.category.name}}"
```

### While Loops
Continue execution until conditions are met with safety limits and early termination.

```yaml
- name: "fix_until_passing"
  type: "while_loop"
  condition: "test_results.status != 'passed'"
  max_iterations: 5
  steps:
    - name: "run_tests"
      type: "claude"
      prompt: "Run tests and analyze failures"
    - name: "fix_issues"
      type: "claude"
      prompt: "Fix the failing tests based on: {{previous_response}}"
```

### Loop Features
- **Variable Scoping**: `{{loop.variable}}` access with nested loop support (`{{loop.parent.variable}}`)
- **Parallel Execution**: Configure `parallel: true` with `max_parallel` limits
- **Safety Limits**: `max_iterations` prevents infinite loops
- **Error Handling**: `break_on_error` and graceful degradation
- **Performance**: Memory-efficient streaming for large datasets

## ๐Ÿง  2. Complex Conditional Logic

### Boolean Expressions
Advanced condition evaluation with AND/OR/NOT logic and comparison operators.

```yaml
- name: "conditional_step"
  type: "claude"
  condition:
    and:
      - "analysis.score > 7"
      - or:
        - "analysis.status == 'passed'"
        - "analysis.warnings.length < 3"
      - not: "analysis.errors.length > 0"
  prompt: "Proceed with implementation..."
```

### Comparison Operations
Support for mathematical and string comparison operations.

```yaml
- name: "complex_decision"
  type: "claude"
  condition:
    and:
      - "analysis.score * analysis.confidence > 0.8"
      - "any(analysis.issues, 'severity == \"high\"') == false"
      - "length(analysis.recommendations) between 3 and 10"
      - "analysis.timestamp > now() - days(7)"
      - "analysis.file_path matches '.*\\.ex$'"
```

### Switch/Case Branching
Route execution based on values with default fallbacks.

```yaml
- name: "route_by_status"
  type: "switch"
  expression: "analysis.status"
  cases:
    "passed": 
      - name: "deploy_step"
        type: "claude"
        prompt: "Deploy the application..."
    "failed":
      - name: "fix_step"
        type: "claude"
        prompt: "Fix the identified issues..."
    "warning":
      - name: "review_step"
        type: "claude"
        prompt: "Review warnings and decide..."
  default:
    - name: "unknown_status"
      type: "claude"
      prompt: "Handle unknown status..."
```

### Condition Features
- **Operators**: `>`, `<`, `==`, `!=`, `contains`, `matches`, `between`
- **Functions**: `length()`, `any()`, `all()`, `count()`, `sum()`, `average()`
- **Pattern Matching**: Regular expressions and string patterns
- **Date/Time**: Relative time comparisons and arithmetic
- **Mathematical**: Complex expressions with variables

## ๐Ÿ“ 3. Advanced File Operations

### Core File Operations
Comprehensive file manipulation within pipeline workspaces.

```yaml
- name: "copy_config"
  type: "file_ops"
  operation: "copy"
  source: "templates/config.yaml"
  destination: "config/app.yaml"

- name: "validate_files"
  type: "file_ops"
  operation: "validate"
  files:
    - path: "lib/my_app.ex"
      must_exist: true
      min_size: 100
    - path: "test/"
      must_be_dir: true

- name: "convert_data"
  type: "file_ops"
  operation: "convert"
  source: "data.csv"
  destination: "data.json"
  format: "csv_to_json"
```

### Supported Operations
- **copy**: Duplicate files with path resolution
- **move**: Relocate files atomically
- **delete**: Remove files and directories safely
- **validate**: Check existence, size, permissions
- **list**: Directory scanning with filters
- **convert**: Format transformations (CSVโ†”JSONโ†”YAMLโ†”XML)

### File Operation Features
- **Atomic Operations**: Rollback on failure
- **Permission Checking**: Validate access before operations
- **Large File Streaming**: Memory-efficient processing
- **Workspace Relative**: Safe path resolution within workspaces
- **Binary Support**: Handle images, PDFs, and other binary formats

## ๐Ÿ”„ 4. Structured Data Transformation

### Schema Validation
Enforce structured output formats with comprehensive validation.

```yaml
- name: "analyze_code"
  type: "claude"
  output_schema:
    type: "object"
    required: ["analysis", "recommendations", "score"]
    properties:
      analysis:
        type: "string"
        min_length: 50
      score:
        type: "number"
        minimum: 0
        maximum: 10
      recommendations:
        type: "array"
        items:
          type: "object"
          properties:
            priority: {type: "string", enum: ["high", "medium", "low"]}
            action: {type: "string"}
```

### Data Transformation
Manipulate structured data between pipeline steps.

```yaml
- name: "process_results"
  type: "data_transform"
  input_source: "previous_response:analysis"
  operations:
    - operation: "filter"
      field: "recommendations"
      condition: "priority == 'high'"
    - operation: "aggregate"
      field: "scores"
      function: "average"
    - operation: "join"
      left_field: "files"
      right_source: "previous_response:file_metadata"
      join_key: "filename"
  output_field: "processed_data"
```

### Query Language
JSONPath-like syntax for complex data extraction.

```yaml
- name: "extract_high_priority"
  type: "data_transform"
  operations:
    - operation: "query"
      expression: "$.analysis[?(@.score > 7)].recommendations"
    - operation: "transform"
      expression: "$.files[*].{name: filename, size: filesize}"
```

### Data Features
- **JSON Schema**: Complete validation with clear error messages
- **Transformations**: filter, map, aggregate, join, group_by, sort
- **Query Engine**: JSONPath expressions with functions
- **Type Safety**: Automatic type conversion and validation
- **Chaining**: Multiple operations in sequence

## ๐Ÿ—‚๏ธ 5. Codebase Intelligence System

### Automatic Discovery
Intelligent project structure analysis and context awareness.

```yaml
- name: "analyze_project"
  type: "claude"
  codebase_context: true
  prompt: |
    Analyze this {{codebase.project_type}} project.
    Main files: {{codebase.structure.main_files}}
    Dependencies: {{codebase.dependencies}}
    Recent changes: {{codebase.git_info.recent_commits}}
```

### Codebase Queries
Search and analyze code relationships intelligently.

```yaml
- name: "find_related_files"
  type: "codebase_query"
  queries:
    main_modules:
      find_files:
        - type: "main"
        - pattern: "lib/**/*.ex"
        - exclude_tests: true
    test_files:
      find_files:
        - related_to: "{{previous_response:target_file}}"
        - type: "test"
    dependencies:
      find_dependencies:
        - for_file: "lib/user.ex"
        - include_transitive: false
```

### Code Analysis
AST parsing and semantic understanding for multiple languages.

```yaml
- name: "analyze_code_structure"
  type: "codebase_query"
  queries:
    functions:
      find_functions:
        - in_file: "lib/user.ex"
        - public_only: true
    dependencies:
      find_dependents:
        - of_file: "lib/user.ex"
        - include_tests: true
```

### Codebase Features
- **Project Detection**: Elixir, Node.js, Python, Go, Rust support
- **File Relationships**: Dependency mapping and impact analysis
- **Git Integration**: Commit history, branch status, change detection
- **Semantic Search**: Find functions, classes, imports across codebases
- **Test Mapping**: Automatic test-to-code relationship discovery

## ๐Ÿ’พ 6. State Management & Variables

### Variable Assignment
Persistent state management across pipeline execution.

```yaml
- name: "initialize_state"
  type: "set_variable"
  variables:
    attempt_count: 0
    error_threshold: 3
    processed_files: []

- name: "increment_counter"
  type: "set_variable"
  variables:
    attempt_count: "{{state.attempt_count + 1}}"
```

### Variable Interpolation
Use variables throughout pipeline configurations.

```yaml
- name: "conditional_step"
  type: "claude"
  condition: "state.attempt_count < state.error_threshold"
  prompt: "Attempt #{{state.attempt_count}}: Process data"
```

### State Persistence
Maintain state across pipeline runs and checkpoint recovery.

```yaml
- name: "save_progress"
  type: "checkpoint"
  state:
    completed_files: "{{state.processed_files}}"
    last_successful_step: "{{current_step}}"
```

### State Features
- **Scoping**: Global, loop, and session variable scopes
- **Persistence**: Automatic checkpoint integration
- **Type Safety**: Variable validation and type checking
- **Interpolation**: Template variables in any configuration field
- **Mutation**: Safe state updates with rollback support

## ๐Ÿš€ Performance & Streaming

### Large Dataset Processing
Memory-efficient handling of large files and data collections.

```yaml
- name: "process_large_dataset"
  type: "file_ops"
  operation: "stream_process"
  source: "large_data.csv"
  chunk_size: 1000
  processor:
    type: "claude"
    prompt: "Process data chunk: {{chunk}}"
```

### Parallel Execution
Concurrent processing with resource management.

```yaml
- name: "parallel_analysis"
  type: "for_loop"
  iterator: "file"
  data_source: "file_list"
  parallel: true
  max_parallel: 5
  memory_limit: "500MB"
  steps:
    - name: "analyze_file"
      type: "claude"
      prompt: "Analyze {{loop.file}}"
```

### Performance Features
- **Streaming I/O**: Process files without loading into memory
- **Lazy Evaluation**: Compute results only when needed
- **Resource Limits**: Memory and execution time constraints
- **Performance Monitoring**: Built-in metrics and bottleneck detection
- **Optimization**: Automatic query optimization and caching

## ๐Ÿ› ๏ธ Integration Examples

### Complete Workflow Example
A real-world example combining all advanced features:

```yaml
workflow:
  name: "advanced_code_analysis"
  description: "Complete codebase analysis with intelligent processing"
  
  steps:
    - name: "discover_project"
      type: "codebase_query"
      codebase_context: true
      queries:
        project_info:
          get_project_type: true
          get_dependencies: true
          get_git_status: true
    
    - name: "initialize_analysis_state"
      type: "set_variable"
      variables:
        total_files: 0
        processed_files: []
        issues_found: []
        analysis_score: 0
    
    - name: "find_source_files"
      type: "codebase_query"
      queries:
        source_files:
          find_files:
            - type: "source"
            - exclude_tests: true
            - modified_since: "{{state.last_analysis_date}}"
    
    - name: "analyze_files"
      type: "for_loop"
      iterator: "file"
      data_source: "previous_response:source_files"
      parallel: true
      max_parallel: 3
      steps:
        - name: "analyze_single_file"
          type: "claude"
          output_schema:
            type: "object"
            required: ["file_path", "issues", "score"]
            properties:
              file_path: {type: "string"}
              issues:
                type: "array"
                items:
                  type: "object"
                  properties:
                    severity: {type: "string", enum: ["low", "medium", "high"]}
                    message: {type: "string"}
              score: {type: "number", minimum: 0, maximum: 10}
          prompt: |
            Analyze this {{codebase.project_type}} file: {{loop.file.path}}
            
            File content:
            ```
            {{file:{{loop.file.path}}}}
            ```
            
            Consider:
            - Code quality and style
            - Potential bugs or issues
            - Performance concerns
            - Security vulnerabilities
        
        - name: "update_analysis_state"
          type: "set_variable"
          variables:
            processed_files: "{{state.processed_files + [loop.file.path]}}"
            issues_found: "{{state.issues_found + previous_response.issues}}"
    
    - name: "filter_high_priority_issues"
      type: "data_transform"
      input_source: "state.issues_found"
      operations:
        - operation: "filter"
          condition: "severity == 'high'"
        - operation: "group_by"
          field: "file_path"
        - operation: "sort"
          field: "severity"
          order: "desc"
    
    - name: "generate_fixes"
      type: "while_loop"
      condition: "length(filtered_issues) > 0 and state.fix_attempts < 3"
      max_iterations: 3
      steps:
        - name: "attempt_fix"
          type: "claude"
          condition: 
            and:
              - "length(previous_response:filtered_issues) > 0"
              - "state.fix_attempts < 3"
          prompt: |
            Fix these high-priority issues:
            {{previous_response:filtered_issues}}
            
            Generate specific fix recommendations for each issue.
        
        - name: "increment_fix_attempts"
          type: "set_variable"
          variables:
            fix_attempts: "{{state.fix_attempts + 1}}"
    
    - name: "save_analysis_report"
      type: "data_transform"
      operations:
        - operation: "aggregate"
          input_source: "state"
          output_format: "analysis_report"
      
    - name: "export_results"
      type: "file_ops"
      operation: "convert"
      source: "analysis_report"
      destination: "analysis_report.json"
      format: "object_to_json"

    - name: "checkpoint_final_state"
      type: "checkpoint"
      state:
        analysis_complete: true
        total_issues: "{{length(state.issues_found)}}"
        high_priority_issues: "{{length(filtered_issues)}}"
        completion_time: "{{now()}}"
```

## ๐Ÿงช Testing & Validation

All advanced features support comprehensive testing:

### Mock Mode Testing
```bash
# Test all advanced features with mocks
mix pipeline.run examples/advanced_features_example.yaml

# Test specific feature sets
mix pipeline.run examples/loops_example.yaml
mix pipeline.run examples/conditions_example.yaml
mix pipeline.run examples/file_ops_example.yaml
mix pipeline.run examples/data_transform_example.yaml
mix pipeline.run examples/codebase_query_example.yaml
```

### Live Mode Testing
```bash
# Test with real AI providers
mix pipeline.run.live examples/advanced_features_example.yaml

# Performance testing
mix pipeline.benchmark examples/large_dataset_example.yaml
```

### Integration Testing
```bash
# Full integration test suite
mix test test/integration/advanced_features_test.exs

# Performance benchmarks
mix test test/performance/advanced_features_performance_test.exs
```

## ๐Ÿ“š Migration Guide

### From Basic to Advanced
Existing pipelines remain fully compatible. To use advanced features:

1. **Add Loop Processing**:
   ```yaml
   # Before: Single step
   - name: "analyze"
     type: "claude"
     prompt: "Analyze file1.ex"
   
   # After: Loop over multiple files
   - name: "analyze_all"
     type: "for_loop"
     iterator: "file"
     data_source: "file_list"
     steps:
       - name: "analyze"
         type: "claude"
         prompt: "Analyze {{loop.file}}"
   ```

2. **Add Conditional Logic**:
   ```yaml
   # Before: Always runs
   - name: "deploy"
     type: "claude"
     prompt: "Deploy application"
   
   # After: Conditional execution
   - name: "deploy"
     type: "claude"
     condition: "tests.status == 'passed' and analysis.score > 8"
     prompt: "Deploy application"
   ```

3. **Add Schema Validation**:
   ```yaml
   # Before: Unstructured output
   - name: "analyze"
     type: "claude"
     prompt: "Analyze code"
   
   # After: Structured output
   - name: "analyze"
     type: "claude"
     output_schema:
       type: "object"
       required: ["score", "issues"]
     prompt: "Analyze code and return JSON"
   ```

## ๐ŸŽฏ Best Practices

### Performance
- Use `parallel: true` for independent loop iterations
- Set appropriate `max_parallel` limits (typically 3-5)
- Use streaming for files >100MB
- Set memory limits for long-running processes

### Error Handling
- Always set `max_iterations` on while loops
- Use `break_on_error: false` for non-critical operations
- Implement fallback strategies with conditions
- Add checkpoints for long-running workflows

### Data Management
- Validate schemas early in pipelines
- Use data transformations to normalize between steps
- Keep state variables minimal and focused
- Clean up large temporary data regularly

### Codebase Intelligence
- Enable `codebase_context: true` for code analysis steps
- Cache codebase discovery results for multiple runs
- Use specific queries rather than broad scans
- Combine with file operations for intelligent refactoring

## ๐Ÿ”— Related Documentation

- [README.md](README.md) - Complete library usage guide
- [PIPELINE_CONFIG_GUIDE.md](PIPELINE_CONFIG_GUIDE.md) - Configuration reference
- [USE_CASES.md](USE_CASES.md) - Working examples for various features
- [TESTING_ARCHITECTURE.md](TESTING_ARCHITECTURE.md) - Testing approaches

## ๐Ÿš€ 6. Async Streaming

Enable real-time message streaming for all Claude-based steps, displaying complete messages as they arrive from ClaudeCodeSDK for better user experience and resource management.

### Why Async Streaming?

- **Real-time feedback**: See Claude's complete messages as they arrive (message-by-message streaming)
- **Memory efficiency**: Stream large outputs without loading everything into memory
- **Early interruption**: Stop long-running operations if they go off track
- **Better user experience**: Progressive display of assistant responses, tool uses, and results

### Basic Configuration

```yaml
- name: "streaming_analysis"
  type: "claude"
  claude_options:
    # Enable async streaming
    async_streaming: true
    stream_handler: "console"  # Handler type
    stream_buffer_size: 100    # Buffer size for batching
    
    # Regular options work as normal
    max_turns: 20
    allowed_tools: ["Write", "Edit", "Read"]
```

### Stream Handlers

The implementation provides 6 specialized handlers for different streaming needs:

#### 1. Console Handler (`console`)
Fancy formatted output with styled headers and statistics:

```yaml
stream_handler: "console"
stream_handler_opts:
  show_header: true          # Display styled header
  show_stats: true           # Show completion statistics
  show_tool_use: true        # Display tool invocations
  show_timestamps: false     # Add timestamps to messages
  use_colors: true           # Colorized output
```

Output example:
```
โ•ญโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฎ
โ”‚      Claude Streaming Response          โ”‚
โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ

Assistant message content here...

โ•ญโ”€โ”€โ”€ Stream Statistics โ”€โ”€โ”€โ•ฎ
โ”‚ Messages: 3             โ”‚
โ”‚ Tokens:   0             โ”‚
โ”‚ Duration: 3.5s          โ”‚
โ”‚ Avg/msg:  1.2s          โ”‚
โ•ฐโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ•ฏ
```

#### 2. Simple Handler (`simple`)
Clean line-by-line output with optional timestamps:

```yaml
stream_handler: "simple"
stream_handler_opts:
  show_timestamps: true      # Prepend timestamps
```

Output example:
```
[06:54:52] ASSISTANT: I'll perform these file operations...
[06:54:53] TOOL USE: Write
[06:54:53] TOOL RESULT: File created successfully...
[06:54:56] ASSISTANT: โœ… Step 1 completed...

โœ“ Stream completed: 5 messages in 23356ms
```

#### 3. Debug Handler (`debug`)
Complete message debugging showing all message types:

```yaml
stream_handler: "debug"
```

Shows system messages, assistant responses, tool uses, tool results, and metadata.

#### 4. File Handler (`file`)
Stream messages to file with rotation support:

```yaml
stream_handler: "file"
stream_handler_opts:
  file_path: "./outputs/stream.log"
  append: true
  format: "jsonl"            # json lines format
```

#### 5. Buffer Handler (`buffer`)
Collect messages in memory for later processing:

```yaml
stream_handler: "buffer"
stream_handler_opts:
  max_size: 1000            # Maximum buffer size
```

#### 6. Callback Handler (`callback`)
Custom message processing with your own handler:

```yaml
stream_handler: "callback"
stream_handler_opts:
  module: "MyApp.StreamProcessor"
  function: "handle_message"
```

### Advanced Streaming Features

#### Session Streaming
Maintain conversation continuity with streaming:

```yaml
- name: "streaming_session"
  type: "claude_session"
  session_config:
    session_name: "interactive_coding"
    persist: true
  claude_options:
    async_streaming: true
    stream_handler: "file"
    stream_file_path: "./sessions/{{session_id}}_stream.jsonl"
```

#### Parallel Streaming
Multiple concurrent streams with different handlers:

```yaml
- name: "parallel_streams"
  type: "parallel_claude"
  parallel_tasks:
    - id: "console_task"
      claude_options:
        async_streaming: true
        stream_handler: "console"
    - id: "file_task"
      claude_options:
        async_streaming: true
        stream_handler: "file"
    - id: "buffer_task"
      claude_options:
        async_streaming: true
        stream_handler: "buffer"
```

#### Robust Streaming
Error recovery with streaming continuity:

```yaml
- name: "robust_stream"
  type: "claude_robust"
  retry_config:
    max_retries: 3
    retry_conditions: ["stream_interrupted"]
  claude_options:
    async_streaming: true
    stream_handler: "console"
```

### Performance Benefits

1. **Time to First Message**: See output as soon as first message arrives
2. **Memory Usage**: Constant memory through streaming instead of buffering
3. **Progressive Display**: View assistant responses, tool uses, and results in real-time
4. **Message Metrics**: Track message count, duration, and timing statistics

### Integration Examples

#### Custom Stream Processor

```elixir
defmodule MyApp.StreamProcessor do
  @behaviour Pipeline.Streaming.AsyncHandler
  
  def init(opts) do
    {:ok, %{message_count: 0, start_time: System.monotonic_time()}}
  end
  
  def handle_message(%{type: :text, data: data}, state) do
    # Process text messages
    IO.write(data.content)
    {:ok, %{state | message_count: state.message_count + 1}}
  end
  
  def handle_message(%{type: :tool_use}, state) do
    # Buffer tool use messages
    {:buffer, state}
  end
  
  def handle_batch(messages, state) do
    # Process buffered messages
    Enum.each(messages, &process_tool_use/1)
    {:ok, state}
  end
end
```

#### Phoenix LiveView Integration

```elixir
defmodule MyAppWeb.StreamingLive do
  use Phoenix.LiveView
  
  def handle_event("start_analysis", params, socket) do
    # Start streaming pipeline
    task = Task.async(fn ->
      Pipeline.run("streaming_analysis.yaml",
        stream_callback: &send(self(), {:stream_update, &1})
      )
    end)
    
    {:noreply, assign(socket, task: task, messages: [])}
  end
  
  def handle_info({:stream_update, message}, socket) do
    # Update UI with streaming messages
    {:noreply, update(socket, :messages, &[message | &1])}
  end
end
```

### Testing Streaming

```elixir
# Test with mock streams
test "handles streaming responses" do
  AsyncMocks.create_mock_stream("my_step", :realistic, %{
    message_count: 50,
    include_tool_use: true
  })
  
  assert {:ok, results} = Pipeline.run("streaming_pipeline.yaml")
  assert results["my_step"]["streaming_enabled"] == true
end

# Test performance
test "streaming improves time to first output" do
  # Compare streaming vs non-streaming
  {time_streaming, _} = :timer.tc(fn ->
    Pipeline.run("pipeline.yaml", async_streaming: true)
  end)
  
  {time_normal, _} = :timer.tc(fn ->
    Pipeline.run("pipeline.yaml", async_streaming: false)
  end)
  
  # Streaming should show output faster
  assert time_streaming < time_normal
end
```

### Implementation Notes

- **Message-based streaming**: ClaudeCodeSDK uses `--output-format stream-json` to stream complete messages
- **Not character streaming**: Each message arrives as a complete unit, not character-by-character
- **Message types**: Stream includes system init, assistant messages, tool uses, tool results, and completion status
- **Escaped newlines**: The handlers properly convert `\n` to actual line breaks in message content

### Examples

See complete streaming examples:
- `examples/clean_streaming_numbers.yaml` - Simple number output example
- `examples/streaming_file_operations.yaml` - Multi-message file operations
- `examples/STREAMING_GUIDE.md` - Complete streaming implementation guide
- `test/integration/async_streaming_test.exs` - Comprehensive tests

---

**Next Steps**: See [TESTING_ARCHITECTURE.md](TESTING_ARCHITECTURE.md) for comprehensive examples and testing approaches for each advanced feature.