usage-rules/merging.md

# Multi-File PCAP Merge Guide

PcapFileEx provides chronological merging of multiple PCAP/PCAPNG files. This guide explains when and how to use the merge functionality effectively.

## Overview

The `PcapFileEx.Merge` module merges packets from multiple capture files into a single chronologically-ordered stream using nanosecond-precision timestamps.

### Key Features

| Feature | Description | Benefit |
|---------|-------------|---------|
| **Nanosecond precision** | Uses `Timestamp.compare/2` for accurate ordering | Preserves microsecond/nanosecond timestamps |
| **Memory efficient** | O(N files) memory via min-heap | Handles unlimited file sizes |
| **Mixed formats** | PCAP + PCAPNG in same merge | No conversion needed |
| **Interface remapping** | Global interface ID assignment | Prevents PCAPNG interface collisions |
| **Source annotation** | Track packet origins | Debug and provenance tracking |
| **Clock validation** | Detect timestamp drift | Identify clock sync issues |
| **Flexible error handling** | `:halt`, `:skip`, `:collect` modes | Control error behavior |

## Decision Tree: When to Use Merge

```
Need to combine multiple capture files?
├─ YES: Are files from synchronized clocks?
│   ├─ YES: Are files < 100MB each?
│   │   ├─ YES: Use Merge.stream/2 (simple)
│   │   └─ NO: Use Merge.stream/2 with lazy processing
│   └─ NO: Use Merge.validate_clocks/1 first
│       ├─ Drift < 10s: Use merge (with warning)
│       └─ Drift > 10s: Fix clock sync, then merge
└─ NO: Use standard PcapFileEx.stream/1

Need to track packet origins?
├─ YES: Use annotate_source: true option
└─ NO: Use default options

Files might have corrupt packets?
├─ YES: Use on_error: :collect or :skip
└─ NO: Use default on_error: :halt
```

## Basic Usage

### Simple Merge

```elixir
# Merge multiple files chronologically
{:ok, stream} = PcapFileEx.Merge.stream([
  "server1.pcap",
  "server2.pcap",
  "server3.pcap"
])

# Process as normal stream
packets = Enum.to_list(stream)

# Or use bang variant (raises on error)
stream = PcapFileEx.Merge.stream!([
  "server1.pcap",
  "server2.pcap"
])
```

### Count Total Packets

```elixir
# Fast packet count without loading packets
count = PcapFileEx.Merge.count([
  "capture1.pcap",
  "capture2.pcap",
  "capture3.pcap"
])

IO.puts("Total packets: #{count}")
```

### Validate Clock Synchronization

```elixir
files = ["server1.pcap", "server2.pcap", "server3.pcap"]

case PcapFileEx.Merge.validate_clocks(files) do
  {:ok, stats} ->
    IO.puts("Clock drift: #{stats.max_drift_ms}ms - acceptable")
    {:ok, stream} = PcapFileEx.Merge.stream(files)
    # Process stream...

  {:error, :excessive_drift, stats} ->
    IO.puts("WARNING: Clock drift #{stats.max_drift_ms}ms exceeds threshold")
    IO.puts("Files:")
    Enum.each(stats.files, fn file_stats ->
      IO.puts("  #{file_stats.path}: #{file_stats.count} packets")
      IO.puts("    First: #{file_stats.first_timestamp}")
      IO.puts("    Last:  #{file_stats.last_timestamp}")
    end)
    # Decide whether to proceed with merge or fix clock sync
end
```

## Advanced Features

### Source Annotation

Track which file each packet came from:

```elixir
{:ok, stream} = PcapFileEx.Merge.stream(
  ["server1.pcap", "server2.pcap"],
  annotate_source: true
)

# Each item is now {packet, metadata}
stream
|> Enum.take(10)
|> Enum.each(fn {packet, metadata} ->
  IO.puts("Packet from #{metadata.source_file}")
  IO.puts("  File index: #{metadata.file_index}")
  IO.puts("  Packet index: #{metadata.packet_index}")
  IO.puts("  Timestamp: #{packet.timestamp_precise}")

  # PCAPNG files include interface ID info
  if Map.has_key?(metadata, :original_interface_id) do
    IO.puts("  Original interface: #{metadata.original_interface_id}")
    IO.puts("  Remapped interface: #{metadata.remapped_interface_id}")
  end
end)
```

### Error Handling Modes

#### Halt Mode (Default)

```elixir
# Stop on first error - safest behavior
{:ok, stream} = PcapFileEx.Merge.stream(
  files,
  on_error: :halt  # default
)

# Stream will halt if any packet fails to parse
packets = Enum.to_list(stream)
```

#### Skip Mode

```elixir
# Skip corrupt packets, emit skip markers
{:ok, stream} = PcapFileEx.Merge.stream(
  files,
  on_error: :skip
)

stream
|> Enum.each(fn
  %PcapFileEx.Packet{} = packet ->
    # Normal packet
    process_packet(packet)

  {:skipped_packet, %{count: count, last_error: error}} ->
    # Corrupt packet skipped
    Logger.warning("Skipped #{count} packet(s): #{error.reason}")
end)
```

#### Collect Mode

```elixir
# Wrap all items in result tuples
{:ok, stream} = PcapFileEx.Merge.stream(
  files,
  on_error: :collect
)

{packets, errors} = Enum.reduce(stream, {[], []}, fn
  {:ok, packet}, {pkts, errs} ->
    {[packet | pkts], errs}

  {:error, metadata}, {pkts, errs} ->
    {pkts, [metadata | errs]}
end)

IO.puts("Successfully parsed: #{length(packets)} packets")
IO.puts("Errors: #{length(errors)}")

Enum.each(errors, fn error ->
  IO.puts("Error in #{error.source_file} at packet #{error.packet_index}")
  IO.puts("  Reason: #{error.reason}")
end)
```

#### Collect + Annotation (Nested Tuples)

```elixir
# Combine error collection with source tracking
{:ok, stream} = PcapFileEx.Merge.stream(
  files,
  annotate_source: true,
  on_error: :collect
)

# Items are now {:ok, {packet, metadata}} or {:error, metadata}
stream
|> Enum.take(100)
|> Enum.each(fn
  {:ok, {packet, metadata}} ->
    IO.puts("Packet from #{metadata.source_file}: #{byte_size(packet.data)} bytes")

  {:error, metadata} ->
    Logger.error("Failed to parse #{metadata.source_file} packet #{metadata.packet_index}")
end)
```

## PCAPNG Interface Remapping

When merging multiple PCAPNG files, interface IDs are automatically remapped to prevent collisions:

```elixir
# file1.pcapng has interfaces 0, 1
# file2.pcapng has interfaces 0, 1
# Merged stream has global interfaces 0, 1, 2, 3

{:ok, stream} = PcapFileEx.Merge.stream(
  ["file1.pcapng", "file2.pcapng"],
  annotate_source: true
)

stream
|> Enum.take(5)
|> Enum.each(fn {packet, metadata} ->
  # packet.interface_id is the global remapped ID
  # metadata.original_interface_id is the file-local ID
  # metadata.remapped_interface_id == packet.interface_id (always)

  IO.puts("From #{metadata.source_file}")
  IO.puts("  Original interface: #{metadata.original_interface_id}")
  IO.puts("  Global interface: #{metadata.remapped_interface_id}")

  # Invariant always holds: packet.interface_id == packet.interface.id
  if packet.interface do
    IO.puts("  Interface name: #{packet.interface.name}")
  end
end)
```

## Clock Synchronization Best Practices

### Why Clock Sync Matters

Without synchronized clocks, packets will be merged in **wrong order**:

```
Server A clock: 2025-11-09 10:00:00.000 (fast by 5 seconds)
Server B clock: 2025-11-09 09:59:55.000 (accurate)

Actual timeline:
  09:59:55.000 - Server B: HTTP request
  09:59:55.100 - Server A: HTTP response  (but timestamp says 10:00:00.100!)

Merged timeline WITHOUT sync:
  09:59:55.000 - Server B: HTTP request
  10:00:00.100 - Server A: HTTP response   <-- WRONG ORDER! (5 seconds late)

Result: HTTP response appears 5 seconds after request (impossible!)
```

### Recommended: chronyd (NTP)

See README.md section "Clock Synchronization for Multi-File Merge" for:
- Installation instructions (Linux/macOS)
- Configuration examples
- Verification commands
- Troubleshooting drift issues

### Pre-Merge Validation

```elixir
files = ["server1.pcap", "server2.pcap", "server3.pcap"]

case PcapFileEx.Merge.validate_clocks(files) do
  {:ok, %{max_drift_ms: drift}} when drift < 1000 ->
    # < 1 second drift is excellent
    IO.puts("✅ Clocks well synchronized (#{drift}ms drift)")
    {:ok, stream} = PcapFileEx.Merge.stream(files)

  {:ok, %{max_drift_ms: drift}} when drift < 10_000 ->
    # 1-10 seconds drift is acceptable for most use cases
    IO.puts("⚠️  Moderate clock drift (#{drift}ms) - proceed with caution")
    {:ok, stream} = PcapFileEx.Merge.stream(files)

  {:error, :excessive_drift, %{max_drift_ms: drift}} ->
    # > 10 seconds drift is problematic
    IO.puts("❌ Excessive clock drift (#{drift}ms) - fix NTP sync before merging")
    :error
end
```

## Performance Characteristics

### Memory Usage

```elixir
# Merge is memory-efficient: O(N files) not O(M packets)
# Only one packet per file is buffered in memory

# Small files (3 files × 10MB each)
{:ok, stream} = PcapFileEx.Merge.stream([
  "small1.pcap",  # 10MB
  "small2.pcap",  # 10MB
  "small3.pcap"   # 10MB
])
# Memory: ~3 packets buffered (few KB)
# Total: 30MB of files, but only KB of memory used

# Large files (3 files × 1GB each)
{:ok, stream} = PcapFileEx.Merge.stream([
  "large1.pcap",  # 1GB
  "large2.pcap",  # 1GB
  "large3.pcap"   # 1GB
])
# Memory: Still ~3 packets buffered (few KB)
# Total: 3GB of files, but only KB of memory used
```

### Time Complexity

- **Merge algorithm**: O(M log N) where M = total packets, N = number of files
- **Heap operations**: O(log N) per packet (push/pop from min-heap)
- **Timestamp comparison**: O(1) using `Timestamp.compare/2`

### Lazy Evaluation

```elixir
# Stream is lazy - packets only read when consumed
{:ok, stream} = PcapFileEx.Merge.stream([
  "file1.pcap",
  "file2.pcap",
  "file3.pcap"
])

# No packets loaded yet! Files opened but not read.

# Take first 10 packets - only reads enough to produce 10 results
first_10 = Enum.take(stream, 10)

# Find first HTTP packet - stops as soon as found
first_http = Enum.find(stream, fn packet ->
  :http in packet.protocols
end)
```

## Common Patterns

### Distributed Network Capture

```elixir
# Capture from multiple network taps, merge chronologically
firewall_capture = "firewall.pcap"
web_server_capture = "webserver.pcap"
db_server_capture = "database.pcap"

{:ok, stream} = PcapFileEx.Merge.stream(
  [firewall_capture, web_server_capture, db_server_capture],
  annotate_source: true
)

# Trace HTTP request through all systems
stream
|> Stream.filter(fn {packet, _meta} -> :http in packet.protocols end)
|> Stream.take(100)
|> Enum.each(fn {packet, metadata} ->
  location = Path.basename(metadata.source_file, ".pcap")
  IO.puts("[#{location}] HTTP packet at #{packet.timestamp}")
end)
```

### Time-Range Analysis

```elixir
# Merge and filter to specific time window
files = ["capture1.pcap", "capture2.pcap", "capture3.pcap"]

{:ok, stream} = PcapFileEx.Merge.stream(files)

# Find packets in specific 10-second window
start_time = ~U[2025-11-09 10:30:00Z]
end_time = ~U[2025-11-09 10:30:10Z]

packets_in_window =
  stream
  |> Stream.filter(fn packet ->
    DateTime.compare(packet.timestamp, start_time) in [:gt, :eq] and
    DateTime.compare(packet.timestamp, end_time) in [:lt, :eq]
  end)
  |> Enum.to_list()

IO.puts("Found #{length(packets_in_window)} packets in time window")
```

### Multi-Site Correlation

```elixir
# Correlate events across multiple geographic locations
sites = [
  {"us-east", "captures/us-east.pcap"},
  {"us-west", "captures/us-west.pcap"},
  {"eu-west", "captures/eu-west.pcap"},
  {"ap-south", "captures/ap-south.pcap"}
]

file_paths = Enum.map(sites, fn {_site, path} -> path end)

{:ok, stream} = PcapFileEx.Merge.stream(
  file_paths,
  annotate_source: true
)

# Track HTTP requests across sites
stream
|> Stream.filter(fn {packet, _meta} -> :http in packet.protocols end)
|> Enum.each(fn {packet, metadata} ->
  {site, _} = Enum.find(sites, fn {_, path} ->
    path == metadata.source_file
  end)

  IO.puts("[#{site}] #{packet.timestamp} - HTTP #{byte_size(packet.data)} bytes")
end)
```

### Error Recovery

```elixir
# Merge with automatic error recovery
files = ["potentially_corrupt1.pcap", "potentially_corrupt2.pcap"]

{:ok, stream} = PcapFileEx.Merge.stream(
  files,
  annotate_source: true,
  on_error: :skip
)

{success_count, skip_count} = Enum.reduce(stream, {0, 0}, fn
  %PcapFileEx.Packet{}, {success, skip} ->
    {success + 1, skip}

  {:skipped_packet, meta}, {success, skip} ->
    Logger.warning("Skipped #{meta.count} packet(s) in #{meta.last_error.source_file}")
    {success, skip + meta.count}
end)

IO.puts("Successfully parsed: #{success_count}")
IO.puts("Skipped corrupt packets: #{skip_count}")
```

## Troubleshooting

### Problem: Files Not Merging in Expected Order

**Symptom:** Packets appear out of order despite using merge

**Cause:** Clock drift between capture systems

**Solution:**
```elixir
# Validate clocks first
{:error, :excessive_drift, stats} = PcapFileEx.Merge.validate_clocks(files)

IO.puts("Clock drift: #{stats.max_drift_ms}ms")
Enum.each(stats.files, fn file_stats ->
  IO.puts("#{file_stats.path}:")
  IO.puts("  First: #{file_stats.first_timestamp}")
  IO.puts("  Last: #{file_stats.last_timestamp}")
end)

# Fix: Synchronize clocks with chronyd/NTP before capturing
```

### Problem: "file not found" Error

**Symptom:** `{:error, {:file_not_found, path}}`

**Solution:**
```elixir
# Validate files exist before merging
files = ["file1.pcap", "file2.pcap", "file3.pcap"]

existing_files = Enum.filter(files, &File.exists?/1)
missing_files = files -- existing_files

if missing_files != [] do
  IO.puts("Missing files: #{inspect(missing_files)}")
else
  {:ok, stream} = PcapFileEx.Merge.stream(existing_files)
end
```

### Problem: Memory Usage Growing

**Symptom:** Memory increases during merge

**Cause:** Using `Enum.to_list/1` instead of streaming

**Solution:**
```elixir
# ❌ Bad: Loads all packets into memory
{:ok, stream} = PcapFileEx.Merge.stream(files)
all_packets = Enum.to_list(stream)  # Memory grows!

# ✅ Good: Process lazily
{:ok, stream} = PcapFileEx.Merge.stream(files)
stream
|> Stream.filter(fn packet -> :http in packet.protocols end)
|> Stream.take(100)
|> Enum.each(&process_packet/1)  # Constant memory
```

### Problem: Interface IDs Don't Match

**Symptom:** For PCAPNG merges, `packet.interface_id != packet.interface.id`

**This should never happen!** If you see this, it's a bug in PcapFileEx. Please report with:
```elixir
# Include this debugging info in bug report
{:ok, stream} = PcapFileEx.Merge.stream(files, annotate_source: true)

stream
|> Enum.take(10)
|> Enum.each(fn {packet, metadata} ->
  if packet.interface && packet.interface_id != packet.interface.id do
    IO.puts("BUG: Interface ID mismatch!")
    IO.puts("  Source: #{metadata.source_file}")
    IO.puts("  packet.interface_id: #{packet.interface_id}")
    IO.puts("  packet.interface.id: #{packet.interface.id}")
    IO.puts("  Original ID: #{metadata.original_interface_id}")
    IO.puts("  Remapped ID: #{metadata.remapped_interface_id}")
  end
end)
```

## Related Documentation

- [Usage Rules](../usage-rules.md) - Main usage guide with decision trees
- [Performance Guide](performance.md) - Performance optimization strategies
- [Format Guide](formats.md) - PCAP vs PCAPNG differences
- [Examples](examples.md) - Complete working examples
- README.md - Clock synchronization setup instructions