docs/guides/streaming.md

# Streaming

This guide covers Tinkex's streaming capabilities for working with Server-Sent Events (SSE) from event-stream endpoints.

## Overview

Tinkex provides low-level streaming support through the SSE (Server-Sent Events) decoder, which can parse incremental chunks of event data from HTTP streams. The primary use case is consuming real-time event streams from compatible endpoints.

The streaming implementation consists of two main components:

- `Tinkex.Streaming.SSEDecoder` - A stateful decoder that parses SSE-formatted data
- `Tinkex.API.StreamResponse` - A wrapper struct containing the enumerable stream and metadata

## Server-Sent Events Format

Server-Sent Events follow a simple text-based format where each event is separated by double newlines (`\n\n`, `\r\n\r\n`, or `\r\r`). Each event consists of one or more fields:

```
event: custom_event_name
data: {"key": "value"}
id: event-123
retry: 5000

```

**Field types:**
- `event` - Event type identifier (optional)
- `data` - Event payload, can span multiple lines
- `id` - Event ID for tracking (optional)
- `retry` - Reconnection delay in milliseconds (optional)
- Lines starting with `:` are comments and ignored

## Using the SSEDecoder

The `SSEDecoder` module provides a stateful decoder that can be fed incremental binary chunks. This is useful when processing streaming HTTP responses where data arrives in fragments.

### Basic usage

```elixir
alias Tinkex.Streaming.{SSEDecoder, ServerSentEvent}

# Create a new decoder
decoder = SSEDecoder.new()

# Feed binary chunks as they arrive
chunk1 = "data: {\"message\": \"hello\"}\n\n"
{events1, decoder} = SSEDecoder.feed(decoder, chunk1)

# First event is parsed
[%ServerSentEvent{data: data}] = events1
IO.inspect(data)  # "{\"message\": \"hello\"}"

# Continue feeding more chunks
chunk2 = "event: update\ndata: {\"count\": 42}\n\n"
{events2, decoder} = SSEDecoder.feed(decoder, chunk2)
```

### The ServerSentEvent struct

Each parsed event is represented as a `ServerSentEvent` struct:

```elixir
%ServerSentEvent{
  event: "custom",           # Event type (or nil for unnamed events)
  data: "{\"value\": 123}",  # Event payload as string
  id: "evt-001",             # Event ID (or nil)
  retry: 5000                # Retry delay in ms (or nil)
}
```

### Decoding JSON data

Use `ServerSentEvent.json/1` to attempt JSON decoding of the event data:

```elixir
event = %ServerSentEvent{data: "{\"result\": 42}"}

decoded = ServerSentEvent.json(event)
# Returns: %{"result" => 42}

# If JSON parsing fails, returns the raw string
event = %ServerSentEvent{data: "plain text"}
ServerSentEvent.json(event)
# Returns: "plain text"
```

## Handling Partial Chunks

The decoder maintains an internal buffer to handle partial events that span multiple chunks:

```elixir
decoder = SSEDecoder.new()

# First chunk contains incomplete event
{[], decoder} = SSEDecoder.feed(decoder, "data: {\"par")

# Second chunk completes the event
{events, decoder} = SSEDecoder.feed(decoder, "tial\"}\n\n")

[event] = events
event.data  # "{\"partial\"}"
```

The decoder automatically:
- Buffers incomplete events
- Handles various line ending styles (`\n`, `\r\n`, `\r`)
- Supports double-newline separators in all formats
- Processes multiple events in a single chunk

## Streaming with the API Client

The `Tinkex.API` module provides `stream_get/2` for consuming SSE endpoints directly:

```elixir
{:ok, stream_response} = Tinkex.API.stream_get("/api/v1/events", config: config)

# Access stream metadata
stream_response.status       # 200
stream_response.method       # :get
stream_response.url          # Full URL
stream_response.headers      # Response headers map

# Process events from the stream
stream_response.stream
|> Enum.each(fn event ->
  IO.inspect(event, label: "Received event")
end)
```

### Custom event parsing

By default, `stream_get/2` parses event data as JSON. You can customize this with the `:event_parser` option:

```elixir
# Return raw ServerSentEvent structs
{:ok, response} =
  Tinkex.API.stream_get("/events",
    config: config,
    event_parser: :raw
  )

response.stream
|> Enum.each(fn %ServerSentEvent{} = event ->
  IO.puts("Event type: #{event.event}")
  IO.puts("Data: #{event.data}")
end)

# Use custom parser function
parser = fn event ->
  # Custom transformation logic
  event.data
  |> String.upcase()
end

{:ok, response} =
  Tinkex.API.stream_get("/events",
    config: config,
    event_parser: parser
  )
```

### Example: Processing a stream

```elixir
alias Tinkex.API

# Configure the client
config = Tinkex.Config.new(
  api_key: System.fetch_env!("TINKER_API_KEY"),
  timeout: 30_000  # Longer timeout for streaming
)

# Connect to an event stream
{:ok, stream_resp} = API.stream_get("/api/v1/notifications", config: config)

# Process events as they arrive
stream_resp.stream
|> Stream.filter(fn event ->
  event["type"] == "notification"
end)
|> Stream.map(fn event ->
  %{
    timestamp: DateTime.utc_now(),
    message: event["message"]
  }
end)
|> Enum.take(10)  # Take first 10 events
```

## Error Handling

Streaming operations can fail at multiple points. Handle errors appropriately:

```elixir
case API.stream_get("/events", config: config) do
  {:ok, stream_resp} ->
    try do
      stream_resp.stream
      |> Enum.each(&process_event/1)
    rescue
      e in RuntimeError ->
        Logger.error("Stream processing failed: #{Exception.message(e)}")
    end

  {:error, %Tinkex.Error{} = error} ->
    Logger.error("Failed to connect to stream: #{error.message}")
    # Check error.type for specific error categories:
    # :api_connection, :api_status, :validation
end
```

### Connection errors

Common errors when establishing streams:

- `:api_connection` - Network/transport errors, failed DNS, timeouts
- `:api_status` - HTTP error status codes (4xx, 5xx)
- `:validation` - Invalid response format

### Processing errors

Errors during stream consumption typically surface as exceptions when enumerating:

```elixir
{:ok, stream_resp} = API.stream_get("/events", config: config)

# Wrap enumeration in error handling
result =
  try do
    count =
      stream_resp.stream
      |> Enum.count()

    {:ok, count}
  rescue
    e -> {:error, e}
  end

case result do
  {:ok, count} -> IO.puts("Processed #{count} events")
  {:error, e} -> Logger.error("Stream error: #{inspect(e)}")
end
```

## Use Cases

### Real-time notifications

```elixir
# Monitor a notification stream
def monitor_notifications(config) do
  {:ok, stream} = API.stream_get("/notifications", config: config)

  stream.stream
  |> Stream.each(fn notification ->
    send_alert(notification["severity"], notification["message"])
  end)
  |> Stream.run()
end
```

### Event aggregation

```elixir
# Collect events over a time window
def collect_metrics(config, duration_ms) do
  {:ok, stream} = API.stream_get("/metrics", config: config)

  task = Task.async(fn ->
    stream.stream
    |> Enum.take_while(fn _ ->
      # Could implement time-based cutoff here
      true
    end)
    |> Enum.to_list()
  end)

  Task.await(task, duration_ms)
end
```

### Progressive data loading

```elixir
# Load large datasets progressively
def stream_dataset(config, dataset_id) do
  path = "/datasets/#{dataset_id}/stream"
  {:ok, stream} = API.stream_get(path, config: config)

  stream.stream
  |> Stream.chunk_every(100)  # Process in batches
  |> Stream.each(&process_batch/1)
  |> Stream.run()
end
```

## Current Limitations

The streaming implementation in Tinkex is intentionally minimal and focused on SSE parsing:

1. **No built-in reconnection** - Reconnection logic must be implemented by the caller
2. **No automatic retry** - Unlike regular API calls, streaming endpoints don't auto-retry
3. **Buffered delivery** - Currently `stream_get/2` buffers the full response before parsing
4. **Limited endpoint support** - Check API documentation to confirm which endpoints support streaming

For production streaming applications requiring reconnection, heartbeat monitoring, or true incremental processing, consider wrapping the SSE decoder in a GenServer or supervision tree that implements these features.

## What to read next

- API overview: `docs/guides/api_reference.md`
- Error handling and categories: `docs/guides/troubleshooting.md`
- Configuration and timeouts: `docs/guides/getting_started.md`