usage-rules/flows.md

# Traffic Flows Analysis Guide

## Overview

The `PcapFileEx.Flows` module provides a unified API to analyze PCAP files and identify traffic flows by protocol (HTTP/1, HTTP/2, UDP).

## Quick Start

```elixir
# Analyze a PCAP file
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng")

# Access flows by protocol
IO.puts("HTTP/1 flows: #{length(result.http1)}")
IO.puts("HTTP/2 flows: #{length(result.http2)}")
IO.puts("UDP flows: #{length(result.udp)}")
```

## Key Concepts

### AnalysisResult

The main result structure containing all flows:

```elixir
%PcapFileEx.Flows.AnalysisResult{
  flows: %{FlowKey.t() => flow_ref()},   # O(1) lookup map
  http1: [HTTP1.Flow.t()],               # Sorted by first exchange timestamp
  http2: [HTTP2.Flow.t()],               # Sorted by first stream timestamp
  udp: [UDP.Flow.t()],                   # Sorted by first datagram timestamp
  timeline: [TimelineEvent.t()],         # Unified timeline
  stats: Stats.t()                       # Aggregate statistics
}
```

### FlowKey

Stable identity for O(1) flow lookups:

```elixir
key = PcapFileEx.FlowKey.new(:http2, client_endpoint, server_endpoint)
flow = PcapFileEx.Flows.AnalysisResult.get_flow(result, key)
```

### Flow

Base flow identity with display and authoritative fields:

```elixir
%PcapFileEx.Flow{
  protocol: :http2,
  from: "web-client",           # Display: hostname (no port)
  server: "api-gateway:8080",   # Display: host:port
  client: "web-client:54321",   # Display: host:port
  server_endpoint: %Endpoint{}, # Authoritative
  client_endpoint: %Endpoint{}  # Authoritative
}
```

### TimelineEvent

For unified playback across protocols:

```elixir
Enum.each(result.timeline, fn event ->
  data = PcapFileEx.Flows.AnalysisResult.get_event(result, event)

  case data do
    %HTTP1.Exchange{} -> handle_http1(data)
    %HTTP2.Stream{} -> handle_http2(data)
    %UDP.Datagram{} -> handle_udp(data)
  end
end)
```

## Protocol-Specific Flows

### HTTP/1 Flows

```elixir
Enum.each(result.http1, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  Enum.each(flow.exchanges, fn exchange ->
    IO.puts("  #{exchange.request.method} #{exchange.request.path}")

    if exchange.complete do
      IO.puts("    -> #{exchange.response.status} (#{exchange.response_delay_ms}ms)")
    end
  end)
end)
```

### HTTP/2 Flows

HTTP/2 uses "streams" to match HTTP/2 spec terminology:

```elixir
Enum.each(result.http2, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  # Complete streams
  Enum.each(flow.streams, fn stream ->
    ex = stream.exchange
    IO.puts("  #{ex.request.method} #{ex.request.path} -> #{ex.response.status}")
    IO.puts("    Response delay: #{stream.response_delay_ms}ms")
  end)

  # Incomplete streams (RST_STREAM, GOAWAY, truncated)
  Enum.each(flow.incomplete, fn inc ->
    IO.puts("  Incomplete stream #{inc.stream_id}: #{inc.reason}")
  end)
end)
```

### UDP Flows

UDP flows are grouped by server (destination) only:

```elixir
Enum.each(result.udp, fn flow ->
  # UDP flows have from: :any since sources can vary
  IO.puts("UDP to #{flow.flow.server}: #{length(flow.datagrams)} datagrams")

  Enum.each(flow.datagrams, fn dg ->
    IO.puts("  #{dg.from} -> #{dg.to}: #{dg.size} bytes @ +#{dg.relative_offset_ms}ms")
  end)
end)
```

## Playback Timing

### HTTP Response Delay

```elixir
# HTTP/1
exchange.response_delay_ms  # Time from request to response

# HTTP/2
stream.response_delay_ms    # Time from request start to response completion

# Example playback
def playback_http1(exchange) do
  send_request(exchange.request)
  Process.sleep(exchange.response_delay_ms)
  send_response(exchange.response)
end
```

### UDP Relative Offset

```elixir
# First datagram in flow has relative_offset_ms = 0
datagram.relative_offset_ms  # Offset from flow start

# Example playback
def playback_udp(flow) do
  start_time = System.monotonic_time(:millisecond)

  Enum.each(flow.datagrams, fn dg ->
    elapsed = System.monotonic_time(:millisecond) - start_time
    remaining = dg.relative_offset_ms - elapsed
    if remaining > 0, do: Process.sleep(remaining)

    send_udp(dg.to, dg.payload)
  end)
end
```

## Hosts Mapping

Resolve IP addresses to human-readable hostnames:

```elixir
hosts = %{
  "192.168.1.10" => "api-gateway",
  "192.168.1.20" => "metrics-collector",
  "192.168.1.30" => "web-client"
}

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Now flows show friendly names
result.http2
|> Enum.map(fn f -> {f.flow.from, f.flow.server} end)
# => [{"web-client", "api-gateway:8080"}, ...]
```

## Protocol Detection

TCP flows are classified by content inspection:

- **HTTP/2**: Connection preface `"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"`
- **HTTP/1**: Request methods (`GET `, `POST `, etc.) or `HTTP/` response

```elixir
alias PcapFileEx.Flows.ProtocolDetector

ProtocolDetector.detect("GET / HTTP/1.1\r\n")  # => :http1
ProtocolDetector.detect("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")  # => :http2
ProtocolDetector.detect(<<0, 1, 2, 3>>)  # => :unknown
```

## Options

```elixir
PcapFileEx.Flows.analyze("capture.pcapng",
  hosts_map: %{...},      # IP to hostname mapping
  decode_content: true,   # Decode HTTP bodies (default: true)
  tcp_port: 8080,         # Filter TCP to specific port
  udp_port: 5005          # Filter UDP to specific port
)
```

## Common Patterns

### Filter by Client

```elixir
result.http2
|> Enum.filter(fn f -> f.flow.from == "web-client" end)
|> Enum.flat_map(& &1.streams)
```

### Get All Requests

```elixir
all_requests =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.map(& &1.request)

http2_requests =
  result.http2
  |> Enum.flat_map(& &1.streams)
  |> Enum.map(& &1.exchange.request)
```

### Find Errors

```elixir
# HTTP errors
errors =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.filter(fn ex -> ex.complete and ex.response.status >= 400 end)

# Incomplete HTTP/2 streams
incomplete =
  result.http2
  |> Enum.flat_map(& &1.incomplete)
```

### Calculate Statistics

```elixir
# Total bytes across all flows
total_bytes =
  result.http1
  |> Enum.map(& &1.stats.byte_count)
  |> Enum.sum()

# Duration of a flow
flow = hd(result.http2)
IO.puts("Duration: #{flow.stats.duration_ms}ms")
```

## Data Structures

### HTTP1.Exchange

```elixir
%HTTP1.Exchange{
  flow_seq: 0,                   # Index within flow's exchange list
  request: %{
    method: "GET",
    path: "/api/users",
    version: "1.1",
    headers: %{"host" => "api.example.com"},
    body: "",
    decoded_body: nil,
    timestamp: %Timestamp{}
  },
  response: %{
    status: 200,
    reason: "OK",
    version: "1.1",
    headers: %{"content-type" => "application/json"},
    body: "{...}",
    decoded_body: {:json, %{...}},
    timestamp: %Timestamp{}
  },
  start_timestamp: %Timestamp{},
  end_timestamp: %Timestamp{},
  response_delay_ms: 150,
  complete: true
}
```

### HTTP2.Stream

```elixir
%HTTP2.Stream{
  flow_seq: 0,                   # Index within flow's stream list
  exchange: %HTTP2.Exchange{},   # Full HTTP/2 exchange
  start_timestamp: %Timestamp{}, # Converted from DateTime
  response_delay_ms: 75          # Exchange duration (see Known Limitations)
}
```

### UDP.Datagram

```elixir
%UDP.Datagram{
  flow_seq: 0,                   # Index within flow's datagram list
  from: %Endpoint{},
  to: %Endpoint{},
  payload: <<...>>,
  timestamp: %Timestamp{},
  relative_offset_ms: 0,         # Offset from flow start
  size: 1024
}
```

## Best Practices

1. **Use `FlowKey` for lookups** - O(1) access instead of iterating

2. **Check `complete` for HTTP** - Incomplete exchanges have `nil` response

3. **Use `streams` for HTTP/2** - Matches HTTP/2 spec terminology

4. **Use timeline for playback** - Maintains chronological order across protocols

5. **Apply hosts_map early** - Makes logs and debugging more readable

6. **Understand `flow_seq` vs `seq_num`** - `flow_seq` is the index within a flow's event list; `seq_num` is only in TimelineEvent for timeline position

## Known Limitations

### HTTP/1 Timestamp Coarseness

HTTP/1 request/response timestamps use the first TCP segment timestamp for each direction. This means:

- Multiple pipelined requests share the same `start_timestamp`
- `response_delay_ms` may not reflect true per-request latency for pipelined traffic

**Workaround**: For precise timing, analyze flows with single request/response exchanges.

### HTTP/2 response_delay_ms

`HTTP2.Stream.response_delay_ms` is the full exchange duration (request start → response complete), not time-to-first-byte (TTFB). For large response bodies, this over-estimates actual response latency.

**Workaround**: For TTFB approximations, consider using the underlying `exchange.start_timestamp` and `exchange.end_timestamp` along with response body size.

### FlowKey Host Independence

FlowKey lookups ignore the `host` field in endpoints. This means you can look up flows using keys built with or without `hosts_map` applied - both will find the same flow.