documentation/how-to/performance-optimization.md

# How to Optimize Reactor Performance

## Problem
You need to improve the performance of your reactor workflows, optimize concurrency, handle large-scale processing efficiently, and scale your system to handle high throughput with minimal resource usage.

## Solution Overview
This guide covers systematic performance optimization techniques for Reactor workflows, including concurrency tuning, resource management, memory optimization, and monitoring strategies. You'll learn to identify bottlenecks and apply targeted optimizations.

## Prerequisites
- Understanding of Reactor basics (inputs, steps, arguments)
- Familiarity with Elixir concurrency concepts (processes, tasks, supervisors)
- Experience with async workflows and map steps
- Basic knowledge of system performance concepts

## Understanding Performance Characteristics

### Reactor's Performance Model

Reactor is designed for high-throughput, concurrent execution with these characteristics:

**Strengths:**
- **Automatic parallelisation**: Independent steps run concurrently by default
- **Resource pooling**: Shared concurrency pools across reactor instances
- **Lazy evaluation**: Only processes what's needed when it's needed
- **Efficient batching**: Map steps process collections in configurable chunks

**Bottlenecks to watch for:**
- **Dependency chains**: Sequential dependencies limit parallelisation
- **Resource contention**: Too many concurrent operations overwhelming external systems
- **Memory usage**: Large intermediate results consuming memory
- **CPU vs I/O mixing**: Synchronous CPU work blocking async I/O operations

## Concurrency Optimization

### 1. Configuring Basic Concurrency Limits

Control how many steps can run concurrently within a single reactor:

```elixir
defmodule DataProcessor do
  use Reactor

  input :user_ids

  # Multiple independent steps that can run in parallel
  step :fetch_profiles do
    argument :user_ids, input(:user_ids)
    run fn %{user_ids: ids}, _context ->
      profiles = fetch_user_profiles(ids)
      {:ok, profiles}
    end
  end

  step :fetch_preferences do
    argument :user_ids, input(:user_ids)
    run fn %{user_ids: ids}, _context ->
      preferences = fetch_user_preferences(ids)
      {:ok, preferences}
    end
  end

  step :fetch_activity do
    argument :user_ids, input(:user_ids)
    run fn %{user_ids: ids}, _context ->
      activity = fetch_user_activity(ids)
      {:ok, activity}
    end
  end

  # This step waits for all the above to complete
  step :combine_data do
    argument :profiles, result(:fetch_profiles)
    argument :preferences, result(:fetch_preferences)
    argument :activity, result(:fetch_activity)
    
    run fn args, _context ->
      combined = combine_user_data(args)
      {:ok, combined}
    end
  end

  return :combine_data
end

# Control concurrency when running the reactor
{:ok, result} = Reactor.run(
  DataProcessor,
  %{user_ids: [1, 2, 3, 4, 5]},
  %{},
  max_concurrency: 10  # At most 10 steps running concurrently
)
```

### 2. Tuning Concurrency Settings

Optimal concurrency settings depend on your specific workload and system characteristics. Start with these guidelines, then experiment and measure:

```elixir
# Starting points for different workload types:

# For I/O-bound workloads (API calls, database queries)
max_concurrency: System.schedulers_online() * 4  # 4x CPU cores

# For CPU-bound workloads  
max_concurrency: System.schedulers_online()      # 1x CPU cores

# For mixed workloads
max_concurrency: System.schedulers_online() * 2  # 2x CPU cores

# For external service limits (rate limiting)
max_concurrency: 10  # Based on service constraints
```

**How to find your optimal settings:**

1. **Start with the suggested baseline** for your workload type
2. **Monitor system resources** - CPU usage, memory, network connections
3. **Test with different values** - try 50%, 150%, 200% of your baseline
4. **Measure end-to-end performance** - both throughput and latency
5. **Watch for external bottlenecks** - database connection limits, API rate limits
6. **Consider system stability** - avoid settings that cause resource exhaustion

**Signs you need to adjust:**

- **Too low**: CPU cores are idle, external services aren't being fully utilised
- **Too high**: High memory usage, connection pool exhaustion, degraded response times
- **External limits**: Rate limiting errors, connection timeouts, service overload responses

### 3. Shared Concurrency Pools

Share concurrency pools across reactor instances to prevent resource competition:

```elixir
# Create a shared pool
{:ok, pool_key} = Reactor.Executor.ConcurrencyTracker.allocate_pool(100)

# Use the pool across multiple reactor runs
opts = [concurrency_key: pool_key, max_concurrency: 100]

# All these reactors share the same 100-task limit
Task.async(fn -> Reactor.run(DataProcessor, inputs1, %{}, opts) end)
Task.async(fn -> Reactor.run(DataProcessor, inputs2, %{}, opts) end) 
Task.async(fn -> Reactor.run(DataProcessor, inputs3, %{}, opts) end)
```

## Map Step Performance Optimization

### 1. Batch Size Tuning

Configure batch sizes based on data characteristics and system capacity:

```elixir
defmodule BatchProcessingReactor do
  use Reactor

  input :records
  input :processing_type

  # For small, fast operations - larger batches
  map :process_lightweight_data do
    argument :source, input(:records)
    batch_size 1000  # Process 1000 items at once
    allow_async? true
    
    step :validate do
      argument :record, element(:source)
      run fn %{record: record}, _context ->
        # Fast validation logic
        {:ok, validate_record(record)}
      end
    end
    
    return :validate
  end

  # For expensive operations - smaller batches  
  map :process_heavy_computation do
    argument :source, input(:records)
    batch_size 10    # Process only 10 items at once
    # Tune reactor-level max_concurrency for CPU-bound work
    
    step :complex_calculation do
      argument :record, element(:source)
      run fn %{record: record}, _context ->
        # CPU-intensive processing
        {:ok, expensive_computation(record)}
      end
    end
    
    return :complex_calculation
  end

  return template("Processing complete: {{ processed }} records") do
    assign :processed, result(:process_lightweight_data)
  end
end
```

### 2. Understanding Map Step Memory Usage

**Critical memory considerations** when using map steps:

Map steps have complex memory usage patterns that are a function of both **batch size** and **mapped result size**. Understanding this is crucial for processing large datasets efficiently.

#### Memory Usage Components

**1. Input Record Storage:**
Each batch of input records is converted into individual steps and added to the reactor state. These steps contain the input record as a `value` argument, contributing directly to memory usage until the step runs.

**2. Intermediate Results Storage:**
A new map step is emitted which depends on the results of all batch steps. This means **all batch step results are stored in Reactor's intermediate results** until the map step completes.

**3. Final Result Storage:**
The overall result of the map step (collection of all batch results) is likely to be depended upon by other steps, so remains in intermediate results storage.

#### Memory Usage Formula

```elixir
Total Memory ≈ (Batch Size × Input Record Size) + 
               (Batch Size × Output Result Size) + 
               (Final Collection Size)
```

#### Batch Size Guidelines by Data Characteristics

```elixir
# For small input records, small output results
batch_size 1000  # Safe - total memory stays manageable

# For large input records OR large output results  
batch_size 10    # Conservative - limits memory multiplication

# For very large transformations (e.g., image processing)
batch_size 1     # Process one at a time to avoid memory spikes

# Consider your total dataset size:
# 1M records × 100 batch size = 100K steps in memory at once
# 1M records × 10 batch size = 100K steps total, 10K at once
```

#### Example: Memory-Aware Batch Configuration

```elixir
# For small records and lightweight transformations
defmodule LightweightProcessor do
  use Reactor

  input :records

  map :process_records do
    source input(:records)
    batch_size 1000  # Safe for small records (~1KB each)
    
    step :transform_record do
      argument :record, element(:process_records)
      run fn %{record: record}, _context ->
        # Lightweight transformation
        {:ok, simple_transform(record)}
      end
    end
    
    return :transform_record
  end

  return :process_records
end

# For large records or heavy transformations  
defmodule HeavyProcessor do
  use Reactor

  input :records

  map :process_records do
    source input(:records)
    batch_size 10   # Conservative for large records (>10KB each)
    
    step :transform_record do
      argument :record, element(:process_records)
      run fn %{record: record}, _context ->
        # Heavy transformation that produces large results
        {:ok, complex_transform(record)}
      end
    end
    
    return :transform_record
  end

  return :process_records
end

# For very large records (images, files, etc.)
defmodule SingleItemProcessor do
  use Reactor

  input :large_items

  map :process_items do
    source input(:large_items)
    batch_size 1    # Process one large item at a time
    
    step :transform_item do
      argument :item, element(:process_items)
      run fn %{item: item}, _context ->
        # Memory-intensive processing (e.g., image manipulation)
        {:ok, process_large_item(item)}
      end
    end
    
    return :transform_item
  end

  return :process_items
end
```

### 3. Memory-Efficient Streaming

Process large datasets without loading everything into memory:

```elixir
defmodule StreamingProcessor do
  use Reactor

  input :file_path
  input :output_path

  # Stream file processing without loading entire file
  map :process_file_stream do
    argument :source, input(:file_path)
    batch_size 100
    strict_ordering? false  # Improves performance when order doesn't matter
    
    step :transform_line do
      argument :line, element(:source)
      run fn %{line: line}, _context ->
        transformed = transform_data(line)
        {:ok, transformed}
      end
    end
    
    return :transform_line
  end

  # Write results in batches  
  step :write_output do
    argument :processed_data, result(:process_file_stream)
    argument :output_path, input(:output_path)
    
    run fn %{processed_data: data, output_path: path}, _context ->
      File.stream!(path, [], :line)
      |> Stream.chunk_every(1000)
      |> Stream.each(&write_batch(&1))
      |> Stream.run()
      
      {:ok, :written}
    end
  end

  return :write_output
end

```

## CPU vs I/O Optimization

### 1. Optimizing CPU and I/O Operations

Balance concurrency based on operation type - I/O operations can handle high concurrency, while CPU operations should use more conservative limits:

```elixir
defmodule OptimalWorkloadReactor do
  use Reactor

  input :user_ids

  # I/O operations - keep async (default)
  step :fetch_profiles do
    argument :user_ids, input(:user_ids)
    run fn %{user_ids: ids}, _context ->
      profiles = fetch_user_profiles(ids)
      {:ok, profiles}
    end
  end

  step :fetch_preferences do  
    argument :user_ids, input(:user_ids)
    run fn %{user_ids: ids}, _context ->
      preferences = fetch_user_preferences(ids)
      {:ok, preferences}
    end
  end

  # CPU operations - keep async but use lower concurrency limits
  step :calculate_recommendations do
    argument :profiles, result(:fetch_profiles)
    argument :preferences, result(:fetch_preferences)
    
    run fn %{profiles: profiles, preferences: prefs}, _context ->
      recommendations = calculate_complex_recommendations(profiles, prefs)
      {:ok, recommendations}
    end
  end

  # I/O operation - back to async
  step :save_recommendations do
    argument :recommendations, result(:calculate_recommendations)
    run fn %{recommendations: recs}, _context ->
      save_to_database(recs)
      {:ok, :saved}
    end
  end

  return :save_recommendations
end

# Run with different concurrency settings based on workload
# For this mixed I/O + CPU reactor, use moderate concurrency
{:ok, result} = Reactor.run(
  OptimalWorkloadReactor,
  %{user_ids: [1, 2, 3, 4, 5]},
  %{},
  max_concurrency: System.schedulers_online() * 2  # Balanced for mixed workload
)
```

### 2. CPU-bound Map Operations

Handle CPU-intensive map operations efficiently:

```elixir
defmodule CPUIntensiveProcessor do
  use Reactor

  input :datasets

  # CPU-bound processing with controlled concurrency
  map :process_datasets do
    argument :source, input(:datasets)
    batch_size 5        # Small batches for CPU work
    # Tune reactor-level max_concurrency for CPU-bound work
    
    step :complex_analysis do
      argument :dataset, element(:source)
      run fn %{dataset: data}, _context ->
        # CPU-intensive mathematical analysis
        result = perform_statistical_analysis(data)
        {:ok, result}
      end
    end
    
    return :complex_analysis
  end

  return :process_datasets
end
```

## Memory Management

### 1. Controlling Intermediate Results

Minimize memory usage by avoiding unnecessary intermediate result storage. Remember that undoable steps will have their results stored in the reactor's undo stack, so non-undoable steps save memory:

```elixir
defmodule MemoryEfficientReactor do
  use Reactor

  input :large_dataset

  # This step's result won't be stored since no other steps depend on it
  # Also, it's not undoable so won't be kept in the undo stack
  step :validate_data do
    argument :data, input(:large_dataset)
    run fn %{data: data}, _context ->
      # Large intermediate result that we don't want to keep
      validated = validate_large_dataset(data) 
      
      # Return only what's needed for next steps
      summary = %{
        total_records: length(validated),
        valid_records: count_valid(validated),
        errors: extract_errors(validated)
      }
      
      {:ok, summary}
    end
    # No undo/4 callback defined = not undoable = no result stored in undo stack
  end

  # Use summary instead of full dataset
  step :generate_report do
    argument :summary, result(:validate_data)
    run fn %{summary: summary}, _context ->
      report = create_summary_report(summary)
      {:ok, report}
    end
  end

  return :generate_report
end
```


## Performance Monitoring

### 1. Adding Telemetry for Performance Tracking

Monitor reactor performance with built-in telemetry:

```elixir
defmodule MonitoredReactor do
  use Reactor
  
  middleware Reactor.Middleware.Telemetry

  input :data

  step :slow_operation do
    argument :data, input(:data)
    run fn %{data: data}, _context ->
      # Slow operation that we want to monitor
      result = expensive_operation(data)
      {:ok, result}
    end
  end

  return :slow_operation
end

# Set up telemetry handlers
:telemetry.attach_many(
  "reactor-performance",
  [
    [:reactor, :run, :start],
    [:reactor, :run, :stop],
    [:reactor, :step, :run, :start], 
    [:reactor, :step, :run, :stop]
  ],
  fn event, measurements, metadata, _config ->
    case event do
      [:reactor, :run, :stop] ->
        duration_ms = measurements.duration |> System.convert_time_unit(:native, :millisecond)
        Logger.info("Reactor completed in #{duration_ms}ms")
        
      [:reactor, :step, :run, :stop] ->
        duration_ms = measurements.duration |> System.convert_time_unit(:native, :millisecond)
        step_name = metadata.step.name
        Logger.debug("Step #{step_name} completed in #{duration_ms}ms")
        
      _ -> :ok
    end
  end,
  nil
)
```


## Performance Benchmarking

### 1. Using Benchee for Reactor Performance Testing

Use [Benchee](https://hex.pm/packages/benchee) to create professional performance benchmarks with detailed reports:

```elixir
# Add to mix.exs dependencies
{:benchee, "~> 1.0", only: :dev}

defmodule ReactorBenchmarks do
  def run_concurrency_benchmark do
    data = generate_test_data(1_000)
    
    Benchee.run(
      %{
        "concurrency_1" => fn ->
          Reactor.run(ProcessingReactor, %{data: data}, %{}, max_concurrency: 1)
        end,
        "concurrency_5" => fn ->
          Reactor.run(ProcessingReactor, %{data: data}, %{}, max_concurrency: 5)
        end,
        "concurrency_10" => fn ->
          Reactor.run(ProcessingReactor, %{data: data}, %{}, max_concurrency: 10)
        end,
        "concurrency_20" => fn ->
          Reactor.run(ProcessingReactor, %{data: data}, %{}, max_concurrency: 20)
        end
      },
      warmup: 2,
      time: 5,
      memory_time: 2,
      formatters: [
        Benchee.Formatters.Console,
        {Benchee.Formatters.HTML, file: "benchmarks/concurrency_results.html"}
      ]
    )
  end

  def run_batch_size_benchmark do
    data = generate_test_data(1_000)
    
    # Use inputs to test different batch sizes
    Benchee.run(
      %{
        "map_processing" => fn {batch_size, data} ->
          reactor = create_map_reactor(batch_size)
          Reactor.run(reactor, %{data: data})
        end
      },
      inputs: %{
        "batch_10" => {10, data},
        "batch_50" => {50, data}, 
        "batch_100" => {100, data},
        "batch_500" => {500, data}
      },
      warmup: 1,
      time: 3,
      memory_time: 1
    )
  end

  def run_reactor_comparison_benchmark do
    data = generate_test_data(500)
    
    Benchee.run(
      %{
        "sync_reactor" => fn ->
          Reactor.run(SyncReactor, %{data: data}, %{}, async?: false)
        end,
        "async_reactor" => fn ->
          Reactor.run(AsyncReactor, %{data: data}, %{}, max_concurrency: 10)
        end,
        "optimized_reactor" => fn ->
          Reactor.run(OptimizedReactor, %{data: data}, %{}, max_concurrency: 5)
        end
      },
      warmup: 2,
      time: 5,
      memory_time: 2,
      formatters: [
        Benchee.Formatters.Console,
        {Benchee.Formatters.HTML, file: "benchmarks/reactor_comparison.html"},
        {Benchee.Formatters.JSON, file: "benchmarks/reactor_comparison.json"}
      ]
    )
  end
end

# Example output from Benchee:
#
# Name                    ips        average  deviation         median         99th %
# concurrency_10      12.34 K       81.05 μs    ±15.23%       76.00 μs      145.67 μs
# concurrency_5        8.91 K      112.23 μs    ±18.45%      108.00 μs      189.34 μs
# concurrency_1        3.45 K      289.78 μs    ±12.67%      285.00 μs      387.23 μs
#
# Comparison:
# concurrency_10      12.34 K
# concurrency_5        8.91 K - 1.38x slower +31.18 μs
# concurrency_1        3.45 K - 3.58x slower +208.73 μs
```


## Common Performance Anti-patterns

### 1. Avoid: Blocking Async Steps

When a step blocks, identify the root cause and model dependencies properly:

```elixir
# ❌ BAD: Blocking because of missing dependency
step :process_data do
  run fn args, _context ->
    # Blocking while waiting for external resource
    wait_for_resource_to_be_ready()  # This wastes concurrency slots
    process_with_resource(args)
  end
end

# ✅ GOOD: Model the dependency explicitly with separate steps
step :prepare_resource do
  run fn _args, _context ->
    # This step ensures the resource is ready
    setup_resource()
    {:ok, :resource_ready}
  end
end

step :process_data do
  argument :resource_ready, result(:prepare_resource)
  argument :data, input(:data)
  run fn %{data: data}, _context ->
    # No blocking - resource dependency ensures it's ready
    process_with_resource(data)
  end
end

# ✅ BETTER: Use recursive step for external resources
step :wait_for_external_service do
  run fn args, context ->
    case check_external_service() do
      {:ok, result} -> 
        {:ok, result}
      {:error, :not_ready} ->
        # Emit the current step again - Reactor will retry when it can
        current_step = context.current_step
        {:ok, nil, [current_step]}
    end
  end
end
```

### 2. Avoid: Excessive Dependencies

Design workflows to maximise parallelisation:

```elixir
# ❌ BAD: Artificial sequential dependencies
step :step1, do: run(fn -> {:ok, data1} end)
step :step2 do
  argument :data1, result(:step1)  # Unnecessary dependency
  run fn %{data1: _} -> {:ok, data2} end  # Doesn't actually use data1
end
step :step3 do
  argument :data2, result(:step2)  # Creates sequential chain
  run fn %{data2: _} -> {:ok, data3} end
end

# ✅ GOOD: Independent steps run in parallel
step :step1, do: run(fn -> {:ok, data1} end)
step :step2, do: run(fn -> {:ok, data2} end)  # No dependency on step1
step :step3, do: run(fn -> {:ok, data3} end)  # No dependency on step2

# Combine results only when needed
step :combine_results do
  argument :data1, result(:step1)
  argument :data2, result(:step2)
  argument :data3, result(:step3)
  run fn args -> {:ok, combine(args)} end
end
```

## Performance Troubleshooting

### Common issues and solutions

**Problem**: Reactor runs slower than expected  
**Diagnosis**: Check for:
- Reactor concurrency limits that don't match your workload
- Overly conservative concurrency limits
- Sequential dependencies that prevent parallelisation

**Problem**: High memory usage  
**Diagnosis**: Check for:
- Large intermediate results being stored unnecessarily
- Map steps with batch sizes that are too large
- Streams not being processed lazily

**Problem**: External service rate limiting errors  
**Solution**: Use compensation with exponential backoff and reduce `max_concurrency`:

```elixir
step :api_call do
  run fn args, _context ->
    call_external_api(args)
  end
  
  compensate fn _args, context ->
    # Use current_try for exponential backoff
    retry_count = context.current_try
    delay_ms = :math.pow(2, retry_count) * 1000 |> round()
    
    Process.sleep(delay_ms)
    :retry
  end
end

# Also reduce concurrency to respect service limits
{:ok, result} = Reactor.run(
  APIReactor,
  inputs,
  %{},
  max_concurrency: 5  # Lower concurrency for rate-limited APIs
)
```

**Problem**: Inconsistent performance  
**Solution**: Ensure proper resource isolation and monitoring:

```elixir
# Use dedicated concurrency pools for different workload types
fast_pool = Reactor.Executor.ConcurrencyTracker.allocate_pool(100)
slow_pool = Reactor.Executor.ConcurrencyTracker.allocate_pool(10)

# Fast operations
Reactor.run(FastReactor, data, %{}, concurrency_key: fast_pool)

# Slow operations  
Reactor.run(SlowReactor, data, %{}, concurrency_key: slow_pool)
```

## Summary

Optimizing Reactor performance requires understanding your workload characteristics and applying appropriate strategies:

**For I/O-bound workloads:**
- Use higher concurrency limits (4x CPU cores)
- Keep steps async by default
- Use shared concurrency pools to prevent resource competition

**For CPU-bound workloads:**
- Tune reactor concurrency limits to match CPU cores
- Keep steps async but control resource usage via reactor max_concurrency
- Use smaller batch sizes in map operations

**For mixed workloads:**
- Separate I/O and CPU operations
- Use moderate concurrency limits (2x CPU cores)
- Monitor performance with telemetry

**For memory efficiency:**
- Process data in streams when possible
- Avoid storing large intermediate results
- Use appropriate batch sizes for your data characteristics

## Related Guides

- [Building Async Workflows](../tutorials/03-async-workflows.md) - Understanding Reactor's concurrency model
- [Data Processing Pipelines](data-pipelines.md) - Efficient batch processing patterns
- [Testing Strategies](testing-strategies.md) - Performance testing approaches
- [Debugging Workflows](debugging-workflows.md) - Performance monitoring and profiling