# Pipelines
The Pipeline system provides composable RAG workflows with parallel execution, caching, and error handling.
## Overview
Pipelines consist of:
- **Steps** - Individual processing units
- **Context** - Shared state between steps
- **Executor** - Runs steps with caching/retry/telemetry
## Creating a Pipeline
```elixir
alias Rag.Pipeline
alias Rag.Pipeline.Step
pipeline = Pipeline.new(:rag_pipeline, description: "Complete RAG workflow")
```
### Adding Steps
```elixir
pipeline = Pipeline.add_step(pipeline,
name: :embed_query,
module: Steps,
function: :embed_query,
args: [model: "gemini"],
timeout: 10_000,
on_error: {:retry, 2},
cache: true
)
```
### Step Options
| Option | Default | Description |
|--------|---------|-------------|
| `name` | required | Step identifier (atom) |
| `module` | required | Module containing function |
| `function` | required | Function to call |
| `args` | `[]` | Arguments passed to function |
| `inputs` | `nil` | Dependencies on previous steps |
| `parallel` | `false` | Run concurrently |
| `on_error` | `:halt` | Error handling strategy |
| `cache` | `false` | Cache results with ETS |
| `timeout` | `nil` | Timeout in milliseconds |
## Step Functions
Every step function must have this signature:
```elixir
def step_name(input, context, opts) do
# input: Result from previous step(s)
# context: Pipeline.Context struct
# opts: Keyword list from step's :args
# Return one of:
{:ok, result}
{:ok, result, updated_context}
{:error, reason}
end
```
### Example Step
```elixir
defmodule MySteps do
alias Rag.Pipeline.Context
def embed_query(query, context, opts) do
router = opts[:router]
case Router.execute(router, :embeddings, [query], []) do
{:ok, [embedding], _} ->
updated_context = %{context | query_embedding: embedding}
{:ok, embedding, updated_context}
{:error, reason} ->
{:error, reason}
end
end
end
```
## Context
The context holds state throughout pipeline execution:
```elixir
alias Rag.Pipeline.Context
# Create context
context = Context.new("What is Elixir?")
# Context structure
%Context{
input: any(), # Original input
query: String.t() | nil,
query_embedding: [float()] | nil,
retrieval_results: list(),
reranked_results: list(),
context_text: String.t() | nil,
response: String.t() | nil,
metadata: %{step_results: %{}},
errors: []
}
```
### Context API
```elixir
# Store step result
context = Context.put_step_result(context, :embed, embedding)
# Get step result
embedding = Context.get_step_result(context, :embed)
# Store metadata
context = Context.put_metadata(context, :user_id, 123)
# Add error (for :continue strategy)
context = Context.add_error(context, {:step_failed, :rerank})
```
## Input Dependencies
### No Dependencies (Default)
Step receives output from previous step:
```elixir
Pipeline.add_step(pipeline, name: :step2, ...)
# step2 receives output from step1
```
### Single Dependency
```elixir
Pipeline.add_step(pipeline,
name: :generate,
inputs: [:retrieve],
...
)
# generate receives the :retrieve step's result
```
### Multiple Dependencies
```elixir
Pipeline.add_step(pipeline,
name: :combine,
inputs: [:semantic_search, :fulltext_search],
...
)
# combine receives: %{semantic_search: [...], fulltext_search: [...]}
```
## Error Handling
### `:halt` (Default)
Stop pipeline immediately on error:
```elixir
Pipeline.add_step(pipeline,
name: :critical,
on_error: :halt # Pipeline stops if this fails
)
```
### `:continue`
Log error but continue execution:
```elixir
Pipeline.add_step(pipeline,
name: :optional_rerank,
on_error: :continue # Skip if fails, continue pipeline
)
```
### `{:retry, n}`
Retry up to n times:
```elixir
Pipeline.add_step(pipeline,
name: :api_call,
on_error: {:retry, 3} # Retry up to 3 times
)
```
## Parallel Execution
Independent steps can run concurrently:
```elixir
Pipeline.new(:hybrid_search)
|> Pipeline.add_step(name: :embed, ...)
|> Pipeline.add_step(
name: :semantic_search,
inputs: [:embed],
parallel: true # Runs in parallel
)
|> Pipeline.add_step(
name: :fulltext_search,
inputs: [:embed],
parallel: true # Runs in parallel
)
|> Pipeline.add_step(
name: :combine,
inputs: [:semantic_search, :fulltext_search] # Waits for both
)
```
## Caching
Cache expensive operations with ETS:
```elixir
Pipeline.add_step(pipeline,
name: :embed,
cache: true # Results cached in ETS
)
```
**Benefits:**
- Same input skips execution, uses cache
- Persists across pipeline runs
- Useful for embeddings, expensive computations
**Performance:**
```
First run: 3500ms (embedding computed)
Second run: 2100ms (embedding cached)
Speedup: ~40%
```
## Timeouts
Prevent hanging on slow steps:
```elixir
Pipeline.add_step(pipeline,
name: :llm_generate,
timeout: 30_000 # 30 second timeout
)
```
## Telemetry
Pipeline emits telemetry events:
```elixir
# Start event
[:rag, :pipeline, :step, :start]
# Metadata: %{pipeline: :name, step: :step_name, attempt: 0}
# Stop event
[:rag, :pipeline, :step, :stop]
# Measurements: %{duration: microseconds}
# Exception event
[:rag, :pipeline, :step, :exception]
# Metadata: %{pipeline: :name, step: :step_name, error: reason}
```
### Attaching Handlers
```elixir
:telemetry.attach(
"pipeline-logger",
[:rag, :pipeline, :step, :stop],
fn _event, measurements, metadata, _config ->
IO.puts("Step #{metadata.step} completed in #{measurements.duration}μs")
end,
nil
)
```
## Complete Example
```elixir
defmodule MyApp.RAGPipeline do
alias Rag.Pipeline
alias Rag.Pipeline.Context
def build(router, retriever) do
Pipeline.new(:rag_pipeline, description: "Complete RAG")
|> Pipeline.add_step(
name: :embed_query,
module: __MODULE__,
function: :embed_query,
args: [router: router],
timeout: 10_000,
on_error: {:retry, 2},
cache: true
)
|> Pipeline.add_step(
name: :retrieve,
module: __MODULE__,
function: :retrieve,
args: [retriever: retriever],
inputs: [:embed_query],
timeout: 5_000
)
|> Pipeline.add_step(
name: :rerank,
module: __MODULE__,
function: :rerank,
args: [router: router],
inputs: [:retrieve],
on_error: :continue # Optional step
)
|> Pipeline.add_step(
name: :generate,
module: __MODULE__,
function: :generate,
args: [router: router],
inputs: [:rerank],
timeout: 30_000
)
end
def embed_query(query, context, opts) do
router = opts[:router]
case Router.execute(router, :embeddings, [query], []) do
{:ok, [embedding], _} ->
{:ok, embedding, %{context | query: query, query_embedding: embedding}}
{:error, reason} ->
{:error, reason}
end
end
def retrieve(embedding, context, opts) do
retriever = opts[:retriever]
case Retriever.retrieve(retriever, {embedding, context.query}, limit: 10) do
{:ok, results} ->
{:ok, results, %{context | retrieval_results: results}}
{:error, reason} ->
{:error, reason}
end
end
def rerank(results, context, opts) do
router = opts[:router]
reranker = Rag.Reranker.LLM.new(router: router)
case Rag.Reranker.rerank(reranker, context.query, results, top_k: 5) do
{:ok, reranked} ->
{:ok, reranked, %{context | reranked_results: reranked}}
{:error, _} ->
# Return original results if reranking fails
{:ok, results}
end
end
def generate(results, context, opts) do
router = opts[:router]
context_text = Enum.map(results, & &1.content) |> Enum.join("\n\n")
prompt = """
Answer based on context:
#{context_text}
Question: #{context.query}
"""
case Router.execute(router, :text, prompt, []) do
{:ok, response, _} ->
{:ok, response, %{context | response: response, context_text: context_text}}
{:error, reason} ->
{:error, reason}
end
end
end
# Usage
{:ok, router} = Router.new(providers: [:gemini])
retriever = %Rag.Retriever.Hybrid{repo: Repo}
pipeline = MyApp.RAGPipeline.build(router, retriever)
context = Context.new("What is GenServer?")
case Pipeline.execute(pipeline, context.input) do
{:ok, response, final_context} ->
IO.puts("Answer: #{response}")
IO.puts("Used #{length(final_context.retrieval_results)} documents")
{:error, reason} ->
IO.puts("Pipeline failed: #{inspect(reason)}")
end
```
## Configuration Best Practices
### Timeouts
| Step Type | Suggested Timeout |
|-----------|-------------------|
| Embedding | 10-15 seconds |
| Database query | 5 seconds |
| LLM generation | 30-60 seconds |
| Overall pipeline | Sum + buffer |
### Caching
Enable for:
- Embeddings (deterministic)
- Expensive computations
- Repeated queries
Disable for:
- User-specific results
- Time-sensitive data
### Error Handling
| Step Type | Strategy |
|-----------|----------|
| Critical (embedding, retrieval) | `:halt` or `{:retry, 2}` |
| Enhancement (reranking) | `:continue` |
| External APIs | `{:retry, 3}` |
## Next Steps
- [Retrievers](retrievers.md) - Retrieval strategies for pipelines
- [Rerankers](rerankers.md) - Improve retrieval quality
- [Agent Framework](agent_framework.md) - Integrate agents in pipelines