# Streaming Responses in Ragex
This document explains the streaming response functionality.
## Overview
Ragex now supports streaming responses from all four AI providers:
- **OpenAI**: GPT-4, GPT-4-turbo, GPT-3.5-turbo (SSE format)
- **Anthropic**: Claude 3 Opus/Sonnet/Haiku (SSE format with event types)
- **DeepSeek**: deepseek-chat, deepseek-reasoner (SSE format, OpenAI-compatible)
- **Ollama**: Local LLMs (NDJSON format)
Streaming provides real-time response generation, allowing for:
- Progressive UI updates as content arrives
- Lower perceived latency for long responses
- Better user experience for interactive applications
- Token usage tracking in real-time
## Architecture
### Provider Level (lib/ragex/ai/provider/)
Each provider implements the `stream_generate/3` callback defined in `Ragex.AI.Behaviour`:
```elixir
@callback stream_generate(prompt :: String.t(), context :: map() | nil, opts) ::
            {:ok, Enumerable.t(chunk())} | {:error, term()}
```
**Chunk format:**
```elixir
%{
  content: String.t(),  # Incremental content
  done: boolean(),      # true for final chunk
  metadata: map()       # Provider info, usage stats (on final chunk)
}
```
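For orientation, a final chunk might look like the following; metadata keys other than `usage` are illustrative and depend on the provider:
```elixir
%{
  content: "",
  done: true,
  metadata: %{
    provider: :openai,    # illustrative
    model: "gpt-4-turbo", # illustrative
    usage: %{prompt_tokens: 10, completion_tokens: 5}
  }
}
```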
**Implementation pattern** (see the sketch after this list):
1. Initiate HTTP streaming request with `Req.post(..., into: fn {:data, data}, {req, resp} -> ...)`
2. Use `Task.async` to handle streaming in separate process
3. Use `Stream.resource` to create Elixir stream from HTTP chunks
4. Parse SSE/NDJSON events and extract content deltas
5. Track token usage and include in final chunk metadata
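A condensed, hedged sketch of that pattern, not the actual provider code; `parse_events/1` is a placeholder for the SSE/NDJSON parsing step:
```elixir
defmodule StreamingSketch do
  # start_fun: run the HTTP request in a Task and forward raw chunks to the caller
  # next_fun:  turn received chunks into stream elements, with a receive timeout
  # after_fun: shut the Task down if the consumer halts early
  def stream_generate(url, body, headers) do
    parent = self()

    stream =
      Stream.resource(
        fn ->
          Task.async(fn ->
            Req.post(url,
              json: body,
              headers: headers,
              into: fn {:data, data}, {req, resp} ->
                send(parent, {:chunk, data})
                {:cont, {req, resp}}
              end
            )

            send(parent, :stream_done)
          end)
        end,
        fn task ->
          receive do
            {:chunk, data} -> {parse_events(data), task}
            :stream_done -> {:halt, task}
          after
            # A real implementation emits {:error, :timeout} before halting
            30_000 -> {:halt, task}
          end
        end,
        fn task -> Task.shutdown(task, :brutal_kill) end
      )

    {:ok, stream}
  end

  # Placeholder: a real implementation parses SSE/NDJSON events here and emits
  # the final %{done: true} chunk with usage metadata.
  defp parse_events(data), do: [%{content: data, done: false, metadata: %{}}]
end
```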
**Error handling:**
- HTTP errors: `{:error, {:api_error, status, body}}`
- Network errors: `{:error, {:http_error, reason}}`
- Timeouts: 30-second receive timeout per provider
### Pipeline Level (lib/ragex/rag/pipeline.ex)
Three new streaming functions:
```elixir
# Query with streaming
Pipeline.stream_query(user_query, opts)
# Explain with streaming
Pipeline.stream_explain(target, aspect, opts)
# Suggest with streaming
Pipeline.stream_suggest(target, focus, opts)
```
**Features:**
- Automatic usage tracking (records tokens on final chunk)
- Source attribution (added to final chunk metadata)
- Rate limiting (checked before starting stream)
- Retrieval context injection (same as non-streaming)
**Options** (usage example after this list):
- `:stream_metadata` - Include sources in every chunk (default: false)
- All standard RAG options (`:limit`, `:threshold`, `:provider`, etc.)
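A hedged usage sketch combining these options; the values (including the `:provider` atom format) are illustrative:
```elixir
{:ok, stream} =
  Ragex.RAG.Pipeline.stream_query(
    "How does auth work?",
    limit: 5,
    threshold: 0.7,
    provider: :openai,
    stream_metadata: true
  )
```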
### MCP Tools Level (lib/ragex/mcp/handlers/tools.ex)
Three new MCP tools:
```text
rag_query_stream - Streaming version of rag_query
rag_explain_stream - Streaming version of rag_explain
rag_suggest_stream - Streaming version of rag_suggest
```
## Usage Examples
### Direct Provider Usage
```elixir
alias Ragex.AI.Provider.OpenAI
# Start streaming
{:ok, stream} = OpenAI.stream_generate(
  "Explain this code",
  %{context: "def foo, do: :bar"},
  temperature: 0.7
)

# Consume stream
Enum.each(stream, fn
  %{done: false, content: chunk} ->
    IO.write(chunk)  # Print as it arrives

  %{done: true, metadata: meta} ->
    IO.puts("\n\nUsage: #{inspect(meta.usage)}")

  {:error, reason} ->
    IO.puts("Error: #{inspect(reason)}")
end)
```
### RAG Pipeline Usage
```elixir
alias Ragex.RAG.Pipeline
# Stream a query
{:ok, stream} = Pipeline.stream_query("How does auth work?", limit: 5)
# Consume the stream once, then separate content chunks from the final chunk
# (re-enumerating the stream would restart the underlying request)
chunks = Enum.to_list(stream)

content =
  chunks
  |> Enum.reject(& &1.done)
  |> Enum.map_join(& &1.content)

final_chunk = Enum.find(chunks, & &1.done)
IO.puts("Response: #{content}")
IO.puts("Sources: #{length(final_chunk.metadata.sources)}")
```
### MCP Tool Usage
Via MCP client:
```json
{
"jsonrpc": "2.0",
"method": "tools/call",
"params": {
"name": "rag_query_stream",
"arguments": {
"query": "Explain the authentication flow",
"limit": 5,
"provider": "openai",
"show_chunks": true
}
}
}
```
Response:
```json
{
"status": "success",
"query": "Explain the authentication flow",
"response": "The authentication flow consists of...",
"sources_count": 3,
"model_used": "gpt-4-turbo",
"streaming": true,
"chunks_count": 12,
"chunks": [...] // Only if show_chunks: true
}
```
## Protocol Details
### OpenAI SSE Format
```text
data: {"choices":[{"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"choices":[{"delta":{"content":" world"},"finish_reason":null}]}
data: {"choices":[{"delta":{},"finish_reason":"stop"}],"usage":{"prompt_tokens":10,"completion_tokens":5}}
data: [DONE]
```
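A hedged sketch of decoding these lines, not the provider's actual parser; assumes the `Jason` library for JSON decoding:
```elixir
defmodule OpenAISSESketch do
  # "data: [DONE]" marks the end of the stream
  def parse_line("data: [DONE]"), do: :done

  def parse_line("data: " <> json) do
    case Jason.decode(json) do
      # Incremental content delta
      {:ok, %{"choices" => [%{"delta" => %{"content" => content}} | _]}} ->
        {:content, content}

      # Final event carrying token usage (requires stream_options include_usage)
      {:ok, %{"usage" => usage}} when is_map(usage) ->
        {:usage, usage}

      # Invalid or uninteresting JSON: skip and keep streaming
      _ ->
        :skip
    end
  end

  def parse_line(_other), do: :skip
end
```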
### Anthropic SSE Format
```text
event: message_start
data: {"type":"message_start","message":{"usage":{"input_tokens":10}}}
event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}
event: content_block_delta
data: {"type":"content_block_delta","delta":{"type":"text_delta","text":" world"}}
event: message_delta
data: {"type":"message_delta","usage":{"output_tokens":5}}
event: message_stop
data: {"type":"message_stop"}
```
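Because usage is split across `message_start` and `message_delta` (see Limitations below), a parser has to merge the two. A hedged sketch of the event handling, assuming each `data:` payload has already been JSON-decoded:
```elixir
defmodule AnthropicSSESketch do
  # Accumulator shape (assumed): %{content: "", usage: %{}, done: false}
  def handle_event(%{"type" => "message_start", "message" => %{"usage" => usage}}, acc),
    do: %{acc | usage: Map.merge(acc.usage, usage)}

  def handle_event(%{"type" => "content_block_delta", "delta" => %{"text" => text}}, acc),
    do: %{acc | content: acc.content <> text}

  def handle_event(%{"type" => "message_delta", "usage" => usage}, acc),
    do: %{acc | usage: Map.merge(acc.usage, usage)}

  def handle_event(%{"type" => "message_stop"}, acc),
    do: %{acc | done: true}

  # Anything else (e.g. ping events, unknown types) is ignored
  def handle_event(_event, acc), do: acc
end
```
After `message_stop`, the accumulated usage map holds both `input_tokens` and `output_tokens`.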
### Ollama NDJSON Format
```json
{"model":"codellama","response":"Hello","done":false}
{"model":"codellama","response":" world","done":false}
{"model":"codellama","response":"","done":true}
```
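NDJSON needs no `data:` prefix handling: each line is a standalone JSON object, and the `done` field marks the final chunk. A minimal sketch, again assuming `Jason` and not the provider's actual code:
```elixir
defmodule OllamaNDJSONSketch do
  def parse_chunk(data) do
    data
    |> String.split("\n", trim: true)
    |> Enum.flat_map(&parse_line/1)
  end

  defp parse_line(line) do
    case Jason.decode(line) do
      {:ok, %{"response" => content, "done" => done}} ->
        [%{content: content, done: done, metadata: %{}}]

      # Invalid JSON (e.g. a partial line): skip and continue the stream
      _ ->
        []
    end
  end
end
```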
## Performance Characteristics
**Latency:**
- First chunk: ~200-500ms (same as non-streaming)
- Subsequent chunks: ~50-100ms intervals
- Total time: Same as non-streaming (no overhead)
**Token Usage:**
- Tracked identically to non-streaming
- Reported in final chunk metadata
- Recorded via Usage module for cost tracking
**Memory:**
- Constant memory per stream (buffering only incomplete events)
- No accumulation until explicitly collected
**Cancellation:**
- Streams can be stopped early by halting enumeration (see the sketch after this list)
- Task cleanup via Stream.resource cleanup function
- 30-second receive timeout prevents hanging
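A hedged sketch of early cancellation; halting enumeration triggers the `Stream.resource` cleanup, which shuts down the underlying request task:
```elixir
{:ok, stream} = Ragex.RAG.Pipeline.stream_query("How does auth work?")

first_chunks =
  stream
  |> Stream.take_while(fn %{done: done} -> not done end)
  |> Enum.take(3)

# Enumeration stops after three content chunks; the rest of the response is never fetched.
```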
## Error Scenarios
| Error | When | Handling |
|-------|------|----------|
| API Error (4xx/5xx) | HTTP status != 200 | `{:error, {:api_error, status, body}}` |
| Network Error | Connection lost | `{:error, {:http_error, reason}}` |
| Timeout | No data for 30s | `{:error, :timeout}` in stream |
| Rate Limit | Before request | `{:error, {:rate_limited, reason}}` |
| Invalid JSON | SSE/NDJSON parse | Skip chunk, continue stream |
## Configuration
No additional configuration required. Streaming uses the same provider settings as non-streaming:
```elixir
config :ragex, :ai_providers,
openai: [
endpoint: "https://api.openai.com/v1",
model: "gpt-4-turbo",
options: [
temperature: 0.7,
max_tokens: 2048
]
]
```
## Future Enhancements
1. **Full MCP Streaming Protocol**
- Emit JSON-RPC notifications for each chunk
- Cancellation support via MCP protocol
- Progress indicators
2. **Advanced Features**
- Stream caching (cache reconstructed from chunks)
- Concurrent multi-provider streaming (race/merge strategies)
- Stream transformations (filtering, augmentation)
3. **Performance Optimizations**
- Adaptive buffering based on chunk size
- Connection pooling for multiple streams
- Predictive prefetching
## Limitations
**Protocol:**
- OpenAI: Requires `stream_options: %{include_usage: true}` for token counts (see the request-body sketch after this list)
- Anthropic: Usage split across message_start and message_delta events
- Ollama: Token counts are estimated (not provided by API)
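For reference, a sketch of the OpenAI request-body fields involved; the exact body construction lives in the provider module, and the model/prompt values are illustrative:
```elixir
body = %{
  model: "gpt-4-turbo",
  messages: [%{role: "user", content: "Explain this code"}],
  stream: true,
  stream_options: %{include_usage: true}  # needed for usage in the final chunk
}
```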
## Troubleshooting
**Stream hangs or times out:**
- Check network connectivity
- Verify API key is valid
- Increase the timeout if needed (adjust the `after` clause of the provider's `receive` block)
**Chunks arrive slowly:**
- Normal behavior (depends on model response time)
- Larger prompts take longer to process
- Use faster models (gpt-3.5-turbo vs gpt-4)
**Missing final chunk:**
- Check for errors in stream
- Ensure stream is fully consumed
- Look for `:stream_done` or `:stream_error` messages
**Token counts are zero:**
- OpenAI: Ensure API supports stream_options
- Anthropic: Check for message_delta event
- Ollama: Counts are estimated, may be rough
## See Also
- `lib/ragex/ai/behaviour.ex` - Streaming callback definition
- `lib/ragex/ai/provider/*` - Provider implementations
- `lib/ragex/rag/pipeline.ex` - Pipeline streaming functions
- `lib/ragex/mcp/handlers/tools.ex` - MCP streaming tools