# Traffic Flows Analysis Guide

## Overview

The `PcapFileEx.Flows` module provides a unified API to analyze PCAP files and identify traffic flows by protocol (HTTP/1, HTTP/2, UDP).

## Quick Start

```elixir
# Analyze a PCAP file
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng")

# Access flows by protocol
IO.puts("HTTP/1 flows: #{length(result.http1)}")
IO.puts("HTTP/2 flows: #{length(result.http2)}")
IO.puts("UDP flows: #{length(result.udp)}")
```

## Key Concepts

### AnalysisResult

The main result structure containing all flows:

```elixir
%PcapFileEx.Flows.AnalysisResult{
  flows: %{FlowKey.t() => flow_ref()},   # O(1) lookup map
  http1: [HTTP1.Flow.t()],               # Sorted by first exchange timestamp
  http2: [HTTP2.Flow.t()],               # Sorted by first stream timestamp
  udp: [UDP.Flow.t()],                   # Sorted by first datagram timestamp
  timeline: [TimelineEvent.t()],         # Unified timeline
  stats: Stats.t(),                      # Aggregate statistics
  summary: Summary.t()                   # Pre-aggregated traffic for topology
}
```

### FlowKey

Stable identity for O(1) flow lookups:

```elixir
key = PcapFileEx.FlowKey.new(:http2, client_endpoint, server_endpoint)
flow = PcapFileEx.Flows.AnalysisResult.get_flow(result, key)
```

### Flow

Base flow identity with display and authoritative fields:

```elixir
%PcapFileEx.Flow{
  protocol: :http2,
  from: "web-client",           # Display: hostname (no port)
  server: "api-gateway:8080",   # Display: host:port
  client: "web-client:54321",   # Display: host:port
  server_endpoint: %Endpoint{}, # Authoritative
  client_endpoint: %Endpoint{}  # Authoritative
}
```

### TimelineEvent

For unified playback across protocols:

```elixir
Enum.each(result.timeline, fn event ->
  data = PcapFileEx.Flows.AnalysisResult.get_event(result, event)

  case data do
    %HTTP1.Exchange{} -> handle_http1(data)
    %HTTP2.Stream{} -> handle_http2(data)
    %UDP.Datagram{} -> handle_udp(data)
  end
end)
```
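To make the idea of a unified timeline concrete, here is a minimal sketch of merging per-protocol event lists into one chronologically ordered list. `TimelineSketch` is a hypothetical module, timestamps are plain integers rather than the library's `Timestamp` struct, and nothing here claims to mirror how PcapFileEx builds `result.timeline` internally.

```elixir
defmodule TimelineSketch do
  # Hypothetical helper, not part of PcapFileEx.
  # Assumption: each event is a plain map with an integer :timestamp.
  def build(events_by_protocol) do
    events_by_protocol
    |> Enum.flat_map(fn {protocol, events} ->
      # Tag every event with the protocol it came from.
      Enum.map(events, &Map.put(&1, :protocol, protocol))
    end)
    |> Enum.sort_by(& &1.timestamp)
    |> Enum.with_index()
    # Record the position in the merged timeline as :seq_num.
    |> Enum.map(fn {event, index} -> Map.put(event, :seq_num, index) end)
  end
end

TimelineSketch.build(%{
  http1: [%{timestamp: 30}],
  udp: [%{timestamp: 10}, %{timestamp: 40}],
  http2: [%{timestamp: 20}]
})
|> Enum.map(&{&1.seq_num, &1.protocol})
# => [{0, :udp}, {1, :http2}, {2, :http1}, {3, :udp}]
```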

## Protocol-Specific Flows

### HTTP/1 Flows

```elixir
Enum.each(result.http1, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  Enum.each(flow.exchanges, fn exchange ->
    IO.puts("  #{exchange.request.method} #{exchange.request.path}")

    if exchange.complete do
      IO.puts("    -> #{exchange.response.status} (#{exchange.response_delay_ms}ms)")
    end
  end)
end)
```

### HTTP/2 Flows

HTTP/2 flows expose `streams` rather than `exchanges`, matching HTTP/2 spec terminology:

```elixir
Enum.each(result.http2, fn flow ->
  IO.puts("Flow from #{flow.flow.from} to #{flow.flow.server}")

  # Complete streams
  Enum.each(flow.streams, fn stream ->
    ex = stream.exchange
    IO.puts("  #{ex.request.method} #{ex.request.path} -> #{ex.response.status}")
    IO.puts("    Response delay: #{stream.response_delay_ms}ms")
  end)

  # Incomplete streams (RST_STREAM, GOAWAY, truncated)
  Enum.each(flow.incomplete, fn inc ->
    IO.puts("  Incomplete stream #{inc.stream_id}: #{inc.reason}")
  end)
end)
```

### UDP Flows

UDP flows are grouped by server (destination) only:

```elixir
Enum.each(result.udp, fn flow ->
  # UDP flows have from: :any since sources can vary
  IO.puts("UDP to #{flow.flow.server}: #{length(flow.datagrams)} datagrams")

  Enum.each(flow.datagrams, fn dg ->
    IO.puts("  #{dg.from} -> #{dg.to}: #{dg.size} bytes @ +#{dg.relative_offset_ms}ms")
  end)
end)
```
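Grouping by destination only means that datagrams from different sources land in the same flow when they target the same server. The sketch below illustrates that grouping on plain maps; `UdpGroupingSketch` and the string-valued `:from`/`:to` fields are assumptions made for the example, not the library's data model.

```elixir
defmodule UdpGroupingSketch do
  # Hypothetical helper, not part of PcapFileEx.
  # Assumption: datagrams are plain maps with :from and :to strings.
  def group(datagrams) do
    # Key by destination; keep the list of sources seen for each one.
    Enum.group_by(datagrams, & &1.to, & &1.from)
  end
end

UdpGroupingSketch.group([
  %{from: "10.0.0.5:40001", to: "192.168.1.20:5005"},
  %{from: "10.0.0.6:40002", to: "192.168.1.20:5005"},
  %{from: "10.0.0.5:40003", to: "192.168.1.30:6000"}
])
# => %{
#   "192.168.1.20:5005" => ["10.0.0.5:40001", "10.0.0.6:40002"],
#   "192.168.1.30:6000" => ["10.0.0.5:40003"]
# }
```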

## Playback Timing

### HTTP Response Delay

```elixir
# HTTP/1
exchange.response_delay_ms  # Time from request to response

# HTTP/2
stream.response_delay_ms    # Time from request start to response completion

# Example playback (send_request/1 and send_response/1 are application-defined placeholders)
def playback_http1(exchange) do
  send_request(exchange.request)
  Process.sleep(exchange.response_delay_ms)
  send_response(exchange.response)
end
```

### UDP Relative Offset

```elixir
# First datagram in flow has relative_offset_ms = 0
datagram.relative_offset_ms  # Offset from flow start

# Example playback (send_udp/2 is an application-defined placeholder)
def playback_udp(flow) do
  start_time = System.monotonic_time(:millisecond)

  Enum.each(flow.datagrams, fn dg ->
    elapsed = System.monotonic_time(:millisecond) - start_time
    remaining = dg.relative_offset_ms - elapsed
    if remaining > 0, do: Process.sleep(remaining)

    send_udp(dg.to, dg.payload)
  end)
end
```
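The scheduling arithmetic can be checked without sockets or sleeping. The sketch below turns a list of `relative_offset_ms` values into the gaps between consecutive sends; `PlaybackSchedule` is a hypothetical helper, and it assumes the offsets are non-decreasing integers.

```elixir
defmodule PlaybackSchedule do
  # Hypothetical helper, not part of PcapFileEx.
  # Converts offsets from flow start into gaps between consecutive sends.
  # The first gap equals the first offset (0 for the first datagram in a flow).
  def gaps(offsets_ms) do
    {gaps, _last_offset} =
      Enum.map_reduce(offsets_ms, 0, fn offset, previous ->
        # Clamp at 0 so an out-of-order offset never yields a negative sleep.
        {max(offset - previous, 0), offset}
      end)

    gaps
  end
end

PlaybackSchedule.gaps([0, 10, 25, 25])
# => [0, 10, 15, 0]
```

For real playback the monotonic-clock approach in `playback_udp/1` is the safer choice, because sleeping for fixed gaps accumulates drift from the time spent sending.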

## Hosts Mapping

Resolve IP addresses to human-readable hostnames:

```elixir
hosts = %{
  "192.168.1.10" => "api-gateway",
  "192.168.1.20" => "metrics-collector",
  "192.168.1.30" => "web-client"
}

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Now flows show friendly names
result.http2
|> Enum.map(fn f -> {f.flow.from, f.flow.server} end)
# => [{"web-client", "api-gateway:8080"}, ...]
```
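Conceptually, the mapping is a lookup keyed by IP string. The sketch below shows one way such a display string could be derived; `HostsDisplaySketch` is hypothetical, and the fallback to the raw IP for unmapped addresses is an assumption rather than documented library behavior.

```elixir
defmodule HostsDisplaySketch do
  # Hypothetical helper, not part of PcapFileEx.
  # Assumption: addresses missing from the map are shown as the raw IP.
  def display(ip, port, hosts_map) do
    host = Map.get(hosts_map, ip, ip)
    "#{host}:#{port}"
  end
end

hosts = %{"192.168.1.10" => "api-gateway"}

HostsDisplaySketch.display("192.168.1.10", 8080, hosts)
# => "api-gateway:8080"
HostsDisplaySketch.display("10.0.0.9", 54321, hosts)
# => "10.0.0.9:54321"
```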

## Traffic Summary

The `summary` field provides pre-aggregated traffic data for network topology visualization:

```elixir
%Summary{
  udp: [%UDPService{}, ...],    # UDP destinations with per-client stats
  http1: [%HTTPService{}, ...], # HTTP/1 servers with per-client stats
  http2: [%HTTPService{}, ...]  # HTTP/2 servers with per-client stats
}
```

### Use Cases

- **Network diagrams** - Show services and connected clients
- **Traffic aggregation** - Total bytes/requests per service
- **Client analysis** - Which clients connect to which services

### Accessing Summary

```elixir
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Services sorted by traffic volume (bytes desc)
result.summary.http2
|> Enum.each(fn service ->
  IO.puts("#{service.server_host || service.server}")
  IO.puts("  Total: #{service.total_requests} requests, #{service.total_response_bytes} bytes")
  IO.puts("  Methods: #{inspect(service.methods)}")
  IO.puts("  Status codes: #{inspect(service.status_codes)}")

  Enum.each(service.clients, fn client ->
    IO.puts("  - #{client.client_host || client.client}: #{client.request_count} requests")
  end)
end)

# UDP summary
result.summary.udp
|> Enum.each(fn service ->
  IO.puts("UDP #{service.server_host || service.server}:")
  IO.puts("  Total: #{service.total_packets} packets, #{service.total_bytes} bytes")

  Enum.each(service.clients, fn client ->
    IO.puts("  - #{client.client_host || client.client}: #{client.packet_count} packets")
  end)
end)
```

### Summary Data Structures

#### HTTPService

```elixir
%Summary.HTTPService{
  protocol: :http1 | :http2,
  server: "192.168.1.10:8080",       # IP:port string
  server_host: "api-gateway",        # Hostname (from hosts_map)
  clients: [%HTTPClientStats{}, ...],
  total_requests: 150,
  total_responses: 148,
  total_request_bytes: 45000,
  total_response_bytes: 1200000,
  methods: %{"GET" => 100, "POST" => 50},
  status_codes: %{200 => 140, 404 => 5, 500 => 3},
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}
```

#### HTTPClientStats

```elixir
%Summary.HTTPClientStats{
  client: "192.168.1.30",           # Client IP (no port - ephemeral)
  client_host: "web-client",        # Hostname (from hosts_map)
  connection_count: 3,              # TCP connections
  stream_count: 45,                 # HTTP/2 streams (nil for HTTP/1)
  request_count: 45,
  response_count: 44,
  request_bytes: 12000,
  response_bytes: 350000,
  methods: %{"GET" => 40, "POST" => 5},
  status_codes: %{200 => 42, 404 => 2},
  avg_response_time_ms: 75,
  min_response_time_ms: 12,
  max_response_time_ms: 450,
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}
```

#### UDPService

```elixir
%Summary.UDPService{
  server: "192.168.1.20:5005",      # IP:port string
  server_host: "metrics-collector", # Hostname (from hosts_map)
  clients: [%UDPClientStats{}, ...],
  total_packets: 5000,
  total_bytes: 2500000,
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}
```

#### UDPClientStats

```elixir
%Summary.UDPClientStats{
  client: "10.0.0.5",               # Client IP (no port)
  client_host: "sensor-node",       # Hostname (from hosts_map)
  packet_count: 1200,
  total_bytes: 600000,
  avg_size: 500,
  min_size: 64,
  max_size: 1400,
  first_timestamp: %Timestamp{},
  last_timestamp: %Timestamp{}
}
```
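The service-level totals and the per-client stats are related by aggregation. The sketch below shows that relationship on plain maps that reuse the field names above; `SummaryTotalsSketch` is hypothetical, and the assumption that service totals are sums over clients is inferred from the field names, not confirmed by the library.

```elixir
defmodule SummaryTotalsSketch do
  # Hypothetical helper operating on plain maps, not on PcapFileEx structs.
  def totals(clients) do
    initial = %{requests: 0, response_bytes: 0, methods: %{}}

    Enum.reduce(clients, initial, fn client, acc ->
      %{
        requests: acc.requests + client.request_count,
        response_bytes: acc.response_bytes + client.response_bytes,
        # Merge per-method counters, adding counts for methods seen on both sides.
        methods: Map.merge(acc.methods, client.methods, fn _method, a, b -> a + b end)
      }
    end)
  end
end

clients = [
  %{request_count: 45, response_bytes: 350_000, methods: %{"GET" => 40, "POST" => 5}},
  %{request_count: 5, response_bytes: 1_000, methods: %{"GET" => 5}}
]

SummaryTotalsSketch.totals(clients)
# => %{methods: %{"GET" => 45, "POST" => 5}, requests: 50, response_bytes: 351000}
```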

### Rendering Summary

Use `Summary.Render` to generate markdown tables or Mermaid flowcharts:

```elixir
alias PcapFileEx.Flows.Summary.Render

{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", hosts_map: hosts)

# Markdown tables
markdown = Render.to_markdown(result.summary)
IO.puts(markdown)

# Mermaid flowchart
mermaid = Render.to_mermaid(result.summary)
IO.puts(mermaid)
```

#### to_markdown/2 Options

```elixir
Render.to_markdown(summary,
  title: true,           # Add "## HTTP Traffic" / "## UDP Traffic" headers
  humanize_bytes: false, # When true, format as "1.5 MB" instead of "1500000"
  protocol: :all         # :all, :http1, :http2, or :udp
)
```

#### to_mermaid/2 Options

```elixir
Render.to_mermaid(summary,
  style: :host,          # :host (default) or :service
  direction: :lr,        # :lr (left-right), :tb (top-bottom), :rl, :bt
  group_by: :protocol    # :protocol (subgraphs per protocol) or :none; applies to style: :service only
)
```

**Styles:**
- `:host` (default) - Unified host nodes, protocol/port on edges. Hosts that act as both client AND server appear as a single node.
- `:service` - Each service (host:port) is a separate node, grouped by protocol, with clients in a separate `Clients` subgraph.

#### Example Mermaid Output (style: :host - default)

```mermaid
flowchart LR
  web_client[web-client]
  api_gateway[api-gateway]
  metrics[metrics]
  web_client -->|"HTTP/2 :8080 (45 req)"| api_gateway
  web_client -->|"UDP :5005 (100 pkts)"| metrics
```

The default host-centric view:
- Uses unified node IDs (hosts that are both client and server appear once)
- No subgraphs - roles are clear from arrow direction
- Protocol and port information shown on edges

#### Example Mermaid Output (style: :service)

```elixir
Render.to_mermaid(summary, style: :service)
```

```mermaid
flowchart LR
  subgraph Clients
    c_web_client[web-client]
    c_sensor_1[sensor-1]
  end
  subgraph HTTP/2
    shttp2_0[api-gateway:8080]
  end
  subgraph UDP
    sudp_0[metrics:5005]
  end
  c_web_client -->|"45 req"| shttp2_0
  c_sensor_1 -->|"1200 pkts"| sudp_0
```

The service-centric view groups nodes by protocol with separate client/server nodes.

## Protocol Detection

TCP flows are classified by content inspection:

- **HTTP/2**: Connection preface `"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"`
- **HTTP/1**: Request methods (`GET `, `POST `, etc.) or `HTTP/` response

```elixir
alias PcapFileEx.Flows.ProtocolDetector

ProtocolDetector.detect("GET / HTTP/1.1\r\n")  # => :http1
ProtocolDetector.detect("PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n")  # => :http2
ProtocolDetector.detect(<<0, 1, 2, 3>>)  # => :unknown
```
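Prefix-based classification of this kind maps naturally onto binary pattern matching. The sketch below is a standalone illustration: `DetectSketch` is a hypothetical module, its method list is an assumption, and the real `ProtocolDetector` may check more cases or handle truncated captures differently.

```elixir
defmodule DetectSketch do
  # Hypothetical module, not PcapFileEx.Flows.ProtocolDetector.
  @h2_preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
  # Assumed method list for the sketch.
  @methods ~w(GET POST PUT DELETE HEAD OPTIONS PATCH)

  # HTTP/2 client connection preface.
  def detect(@h2_preface <> _rest), do: :http2
  # HTTP/1 response status line.
  def detect("HTTP/" <> _rest), do: :http1

  # HTTP/1 request line: a known method followed by a space.
  def detect(data) when is_binary(data) do
    if Enum.any?(@methods, &String.starts_with?(data, &1 <> " ")) do
      :http1
    else
      :unknown
    end
  end
end

DetectSketch.detect("HTTP/1.1 200 OK\r\n")
# => :http1
DetectSketch.detect(<<0, 1, 2, 3>>)
# => :unknown
```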

## Options

```elixir
PcapFileEx.Flows.analyze("capture.pcapng",
  hosts_map: %{...},      # IP to hostname mapping
  decode_content: true,   # Decode HTTP bodies (default: true)
  decoders: [...],        # Custom decoder specs (see flows-decoders.md)
  keep_binary: false,     # Preserve original binary after decoding (default: false)
  unwrap_custom: true,    # Return custom decoder results directly (default: true)
  tcp_port: 8080,         # Filter TCP to specific port
  udp_port: 5005          # Filter UDP to specific port
)
```

### unwrap_custom Option

Controls how custom decoder results are returned:

- `unwrap_custom: true` (default) - Decoder results returned directly
- `unwrap_custom: false` - Results wrapped in `{:custom, ...}` tuples

```elixir
# Default behavior: decoder results are unwrapped
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng", decoders: [my_decoder])
datagram.payload  # => {:my_telemetry, %{sensor: 1, temp: 23.5}}

# With unwrap_custom: false: wrapped in {:custom, ...}
{:ok, result} = PcapFileEx.Flows.analyze("capture.pcapng",
  decoders: [my_decoder],
  unwrap_custom: false
)
datagram.payload  # => {:custom, {:my_telemetry, %{sensor: 1, temp: 23.5}}}
```

See `flows-decoders.md` for full custom decoder documentation.
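Code that must handle results produced with either setting can normalize payloads first. This is a minimal sketch with a hypothetical `PayloadSketch` module; it assumes the wrapper is exactly a two-element `{:custom, value}` tuple as shown above.

```elixir
defmodule PayloadSketch do
  # Hypothetical helper, not part of PcapFileEx.
  # Strips the {:custom, ...} wrapper if present; any other value passes through.
  def unwrap({:custom, value}), do: value
  def unwrap(other), do: other
end

PayloadSketch.unwrap({:custom, {:my_telemetry, %{sensor: 1}}})
# => {:my_telemetry, %{sensor: 1}}
PayloadSketch.unwrap({:my_telemetry, %{sensor: 1}})
# => {:my_telemetry, %{sensor: 1}}
PayloadSketch.unwrap(<<1, 2, 3>>)
# => <<1, 2, 3>>
```

One caveat of this approach: a decoder whose own result is a `{:custom, value}` tuple would be indistinguishable from a wrapped result.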

## Common Patterns

### Filter by Client

```elixir
result.http2
|> Enum.filter(fn f -> f.flow.from == "web-client" end)
|> Enum.flat_map(& &1.streams)
```

### Get All Requests

```elixir
all_requests =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.map(& &1.request)

http2_requests =
  result.http2
  |> Enum.flat_map(& &1.streams)
  |> Enum.map(& &1.exchange.request)
```

### Find Errors

```elixir
# HTTP errors
errors =
  result.http1
  |> Enum.flat_map(& &1.exchanges)
  |> Enum.filter(fn ex -> ex.complete and ex.response.status >= 400 end)

# Incomplete HTTP/2 streams
incomplete =
  result.http2
  |> Enum.flat_map(& &1.incomplete)
```
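A natural follow-up is to bucket responses by status class. The sketch below works on plain maps shaped like the exchanges above; `StatusClassSketch` is a hypothetical helper, and it skips incomplete exchanges because their `response` is `nil`.

```elixir
defmodule StatusClassSketch do
  # Hypothetical helper, not part of PcapFileEx.
  # Expects maps with :complete and, when complete, response.status.
  def count_by_class(exchanges) do
    exchanges
    |> Enum.filter(& &1.complete)
    |> Enum.frequencies_by(fn ex -> "#{div(ex.response.status, 100)}xx" end)
  end
end

StatusClassSketch.count_by_class([
  %{complete: true, response: %{status: 200}},
  %{complete: true, response: %{status: 404}},
  %{complete: true, response: %{status: 500}},
  %{complete: true, response: %{status: 503}},
  %{complete: false, response: nil}
])
# => %{"2xx" => 1, "4xx" => 1, "5xx" => 2}
```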

### Calculate Statistics

```elixir
# Total bytes across all HTTP/1 flows
total_bytes =
  result.http1
  |> Enum.map(& &1.stats.byte_count)
  |> Enum.sum()

# Duration of the first HTTP/2 flow (hd/1 raises if the list is empty)
flow = hd(result.http2)
IO.puts("Duration: #{flow.stats.duration_ms}ms")
```
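Latency summaries can be derived the same way from `response_delay_ms`. The sketch below uses a hypothetical `DelayStatsSketch` module on plain maps; skipping `nil` delays is an assumption about how incomplete items are represented.

```elixir
defmodule DelayStatsSketch do
  # Hypothetical helper, not part of PcapFileEx.
  # Works on any list of maps that have a :response_delay_ms key.
  def summarize(items) do
    delays =
      items
      |> Enum.map(& &1.response_delay_ms)
      # Assumption: items without a measured delay carry nil.
      |> Enum.reject(&is_nil/1)

    case delays do
      [] ->
        nil

      _ ->
        %{
          min: Enum.min(delays),
          max: Enum.max(delays),
          avg: Enum.sum(delays) / length(delays)
        }
    end
  end
end

DelayStatsSketch.summarize([
  %{response_delay_ms: 12},
  %{response_delay_ms: 450},
  %{response_delay_ms: nil},
  %{response_delay_ms: 75}
])
# => %{avg: 179.0, max: 450, min: 12}
```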

## Data Structures

### HTTP1.Exchange

```elixir
%HTTP1.Exchange{
  flow_seq: 0,                   # Index within flow's exchange list
  request: %{
    method: "GET",
    path: "/api/users",
    version: "1.1",
    headers: %{"host" => "api.example.com"},
    body: "",
    decoded_body: nil,
    timestamp: %Timestamp{}
  },
  response: %{
    status: 200,
    reason: "OK",
    version: "1.1",
    headers: %{"content-type" => "application/json"},
    body: "{...}",
    decoded_body: {:json, %{...}},
    timestamp: %Timestamp{}
  },
  start_timestamp: %Timestamp{},
  end_timestamp: %Timestamp{},
  response_delay_ms: 150,
  complete: true
}
```

### HTTP2.Stream

```elixir
%HTTP2.Stream{
  flow_seq: 0,                   # Index within flow's stream list
  exchange: %HTTP2.Exchange{},   # Full HTTP/2 exchange
  start_timestamp: %Timestamp{}, # Converted from DateTime
  response_delay_ms: 75          # Exchange duration (see Known Limitations)
}
```

### UDP.Datagram

```elixir
%UDP.Datagram{
  flow_seq: 0,                   # Index within flow's datagram list
  from: %Endpoint{},
  to: %Endpoint{},
  payload: <<...>>,
  timestamp: %Timestamp{},
  relative_offset_ms: 0,         # Offset from flow start
  size: 1024
}
```

## Best Practices

1. **Use `FlowKey` for lookups** - O(1) access instead of iterating

2. **Check `complete` for HTTP/1** - Incomplete exchanges have a `nil` response

3. **Use `streams` for HTTP/2** - Matches HTTP/2 spec terminology

4. **Use timeline for playback** - Maintains chronological order across protocols

5. **Apply hosts_map early** - Makes logs and debugging more readable

6. **Understand `flow_seq` vs `seq_num`** - `flow_seq` is the index within a flow's event list; `seq_num` is only in TimelineEvent for timeline position

## Known Limitations

### HTTP/1 Timestamp Coarseness

HTTP/1 request/response timestamps use the first TCP segment timestamp for each direction. This means:

- Multiple pipelined requests share the same `start_timestamp`
- `response_delay_ms` may not reflect true per-request latency for pipelined traffic

**Workaround**: For precise timing, analyze flows with single request/response exchanges.
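Selecting such flows is a one-line filter. The sketch below demonstrates it on plain maps with an `:exchanges` list; `SingleExchangeSketch` is a hypothetical helper, and the same filter should apply to `result.http1` as long as each flow exposes `exchanges` as shown earlier.

```elixir
defmodule SingleExchangeSketch do
  # Hypothetical helper, not part of PcapFileEx.
  # Keeps only flows whose :exchanges list has exactly one element.
  def flows(http1_flows) do
    Enum.filter(http1_flows, &match?([_only], &1.exchanges))
  end
end

SingleExchangeSketch.flows([
  %{id: :a, exchanges: [:ex1]},
  %{id: :b, exchanges: [:ex1, :ex2]},
  %{id: :c, exchanges: []}
])
# => [%{id: :a, exchanges: [:ex1]}]
```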

### HTTP/2 response_delay_ms

`HTTP2.Stream.response_delay_ms` is the full exchange duration (request start → response complete), not time-to-first-byte (TTFB). For large response bodies, this over-estimates actual response latency.

**Workaround**: For TTFB approximations, consider using the underlying `exchange.start_timestamp` and `exchange.end_timestamp` along with response body size.

### FlowKey Host Independence

FlowKey lookups ignore the `host` field in endpoints. A key built with `hosts_map` applied and a key built without it both find the same flow.
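The effect can be illustrated with a stand-in key that drops the host before comparison. `KeySketch` is hypothetical and does not reproduce `PcapFileEx.FlowKey`; the `:ip` and `:port` field names on the endpoint maps are assumptions for the example.

```elixir
defmodule KeySketch do
  # Hypothetical stand-in for a flow key, not PcapFileEx.FlowKey.
  # Assumption: endpoints are maps with :ip, :port and an optional :host.
  def new(protocol, client, server) do
    {protocol, strip(client), strip(server)}
  end

  # Keep only the fields that identify the endpoint on the wire.
  defp strip(endpoint), do: {endpoint.ip, endpoint.port}
end

with_host =
  KeySketch.new(
    :http2,
    %{ip: "192.168.1.30", port: 54321, host: "web-client"},
    %{ip: "192.168.1.10", port: 8080, host: "api-gateway"}
  )

without_host =
  KeySketch.new(
    :http2,
    %{ip: "192.168.1.30", port: 54321},
    %{ip: "192.168.1.10", port: 8080}
  )

flows = %{with_host => :some_flow}
Map.get(flows, without_host)
# => :some_flow
```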