# Streaming
Streaming is the primitive in ALLM. The non-streaming entry points
(`generate/3`, `step/3`, `chat/3`) are reducers over a stream — every
provider adapter implements both shapes, but the streaming path is the
canonical one. This guide covers when to stream, what events to expect,
and how to control the firehose.
## When to stream
Reach for `stream_generate/3` or `stream/3` when you want incremental
output — typing-effect UIs, progressive tool-call dispatch, latency
hiding for long completions. The non-streaming variants give you a
single `%Response{}` (or `%ChatResult{}`) at the end; the streaming
variants give you a lazy `Enumerable` of `t:ALLM.Event.t/0` values you fold
over yourself.
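Here is a minimal sketch of folding the events yourself. It uses a hand-built list in place of a real ALLM stream; any `Enumerable` folds the same way. The event values are invented for illustration, and `:cumulative` is assumed to hold the text received so far.

```elixir
# A hand-built list standing in for the lazy stream ALLM returns.
# Tags and payload keys follow the event table in this guide; values are made up.
events = [
  {:request_started, %{request: :req, engine_summary: %{}}},
  {:text_delta, %{delta: "Hello", cumulative: "Hello"}},
  {:text_delta, %{delta: ", world", cumulative: "Hello, world"}},
  {:message_completed, %{response: :resp}}
]

# Fold the events yourself: keep only text deltas and concatenate them.
text =
  Enum.reduce(events, "", fn
    {:text_delta, %{delta: chunk}}, acc -> acc <> chunk
    _other, acc -> acc
  end)

IO.puts(text)
```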
## The simplest stream
    iex> engine = ALLM.Engine.new(
    ...>   adapter: ALLM.Providers.Fake,
    ...>   adapter_opts: [
    ...>     stream_script: [[
    ...>       {:text_delta, "Hello"},
    ...>       {:text_delta, ", "},
    ...>       {:text_delta, "world"},
    ...>       {:finish, :stop}
    ...>     ]]
    ...>   ]
    ...> )
    iex> req = ALLM.request([ALLM.user("hi")])
    iex> {:ok, stream} = ALLM.stream_generate(engine, req)
    iex> Enum.any?(Enum.to_list(stream), &match?({:message_completed, _}, &1))
    true

`stream_generate/3` returns `{:ok, stream}`. The stream is lazy; it
doesn't dispatch to the provider until you start consuming. Common
consumption patterns:
```elixir
# Print every text delta as it arrives.
{:ok, stream} = ALLM.stream_generate(engine, req)

stream
|> Stream.each(fn
  {:text_delta, %{delta: chunk}} -> IO.write(chunk)
  _ -> :ok
end)
|> Stream.run()
```
```elixir
# Collect the full response synchronously (equivalent to generate/3).
{:ok, stream} = ALLM.stream_generate(engine, req)
{:ok, response} = ALLM.StreamCollector.collect(stream)
```
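The laziness can be observed with plain `Stream.resource/3`, which the cancellation section notes ALLM's streams are built on. This sketch does not call ALLM at all: the `:dispatched` message is a hypothetical stand-in for the provider request that a real adapter would send from its start function.

```elixir
parent = self()

# Stand-in for an adapter stream. The start function plays the role of
# dispatching the request; here it only sends a message to this process.
stream =
  Stream.resource(
    fn ->
      send(parent, :dispatched)
      ["Hello", ", ", "world"]
    end,
    fn
      [] -> {:halt, []}
      [chunk | rest] -> {[{:text_delta, %{delta: chunk}}], rest}
    end,
    fn _acc -> :ok end
  )

# Building the stream did nothing yet: no :dispatched message has arrived.
dispatched_before? =
  receive do
    :dispatched -> true
  after
    0 -> false
  end

# Consuming the stream runs the start function, then emits the events.
events = Enum.to_list(stream)

dispatched_after? =
  receive do
    :dispatched -> true
  after
    0 -> false
  end
```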
## The event union
Every event is a tagged tuple with a payload map. The set of tags is closed:

| Tag | When it fires | Payload keys |
|---|---|---|
| `{:request_started, _}` | Before the first byte is sent to the provider | `:request`, `:engine_summary` |
| `{:text_delta, _}` | Each chunk of assistant text | `:delta`, `:cumulative` |
| `{:tool_call_delta, _}` | Each chunk of a tool-call argument blob | `:index`, `:delta`, `:cumulative_args` |
| `{:tool_call, _}` | A complete tool call has been assembled | `:tool_call` |
| `{:tool_result, _}` | A tool executed and returned a result | `:tool_call_id`, `:result`, `:status` |
| `{:message_completed, _}` | The assistant message finished | `:response` |
| `{:step_completed, _}` | One round-trip completed (once per iteration of the `chat/3` / `stream/3` loop) | `:step_result`, `:mode` |
| `{:halted, _}` | The chat loop halted (manual mode, ask-user, etc.) | `:reason`, `:metadata` |
| `{:raw_chunk, _}` | Provider-native chunk passthrough (only when `:include_raw_chunks` is on) | `:chunk` |
| `{:error, _}` | Mid-stream error from the provider | `:error` |

The payload key lists are **not exhaustive**: new keys may be added to a payload map without that counting as a breaking change. Match on the leading tag, and bind only the payload keys you need (map patterns match partially, so extra keys are ignored).
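A sketch of that advice, written as a hypothetical `EventPrinter` module (not part of ALLM): each clause matches the tag plus only the payload keys it reads, and a final catch-all absorbs every other tag in the union.

```elixir
defmodule EventPrinter do
  # Match on the leading tag and only the payload keys you use.
  # Map patterns match partially, so keys added to a payload later
  # do not break these clauses.
  def describe({:text_delta, %{delta: chunk}}), do: {:text, chunk}
  def describe({:tool_call, %{tool_call: call}}), do: {:tool, call}
  def describe({:message_completed, %{response: response}}), do: {:done, response}
  def describe({:error, error}), do: {:failed, error}

  # Catch-all for every other tag (:raw_chunk, :halted, ...).
  def describe(_other), do: :ignored
end
```

A payload carrying an extra, unknown key still matches: `EventPrinter.describe({:text_delta, %{delta: "hi", cumulative: "hi", added_later: true}})` returns `{:text, "hi"}`.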
## Filter opts
Most consumers don't need every event. `stream_generate/3` and
`stream/3` accept filter options:
* `:emit_text_deltas` (default `true`) — set to `false` to drop
`:text_delta` events.
* `:emit_tool_deltas` (default `true`) — set to `false` to drop
`:tool_call_delta` events; you'll still receive the assembled
`:tool_call` event.
* `:include_raw_chunks` (default `false`) — set to `true` to receive
`:raw_chunk` events with provider-native chunks (useful for
passthrough proxies).
* `:on_event` — a 1-arity function called on every event before the
consumer sees it. Useful for telemetry instrumentation that doesn't
need to mutate the stream.
```elixir
{:ok, stream} =
  ALLM.stream_generate(engine, req,
    emit_tool_deltas: false,
    on_event: fn event ->
      :telemetry.execute([:my_app, :llm, :event], %{}, %{event: event})
    end
  )
```
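To make the options concrete, here is a hypothetical `MyFilters.filter/2` that applies options shaped like ALLM's to any `Enumerable` of events. It is not ALLM's implementation. In particular, this sketch calls `on_event` only for events that survive the filters; whether ALLM also calls it for dropped events is not stated here.

```elixir
defmodule MyFilters do
  # Hypothetical helper, not part of ALLM: applies filter options shaped
  # like ALLM's to any Enumerable of event tuples, lazily.
  def filter(events, opts \\ []) do
    text? = Keyword.get(opts, :emit_text_deltas, true)
    tool? = Keyword.get(opts, :emit_tool_deltas, true)
    raw? = Keyword.get(opts, :include_raw_chunks, false)
    on_event = Keyword.get(opts, :on_event, fn _event -> :ok end)

    events
    # Drop the event kinds whose option is switched off.
    |> Stream.reject(fn
      {:text_delta, _} -> not text?
      {:tool_call_delta, _} -> not tool?
      {:raw_chunk, _} -> not raw?
      _other -> false
    end)
    # Observe each surviving event; Stream.each/2 passes it through unchanged.
    |> Stream.each(on_event)
  end
end
```

Because the result is a lazy stream, nothing is filtered and `on_event` is not called until the result is enumerated.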
## stream/3 — the multi-turn streaming loop
`ALLM.stream/3` is `chat/3` plus streaming. It runs the auto-loop —
calling tools as they're requested, feeding results back in, looping
until the model stops asking — and emits events the entire way. You'll
see a `:step_completed` event per loop iteration, and a final
`:message_completed` when the loop exits.
    iex> engine = ALLM.Engine.new(
    ...>   adapter: ALLM.Providers.Fake,
    ...>   adapter_opts: [
    ...>     stream_script: [[
    ...>       {:text_delta, "done"},
    ...>       {:finish, :stop}
    ...>     ]]
    ...>   ]
    ...> )
    iex> {:ok, stream} = ALLM.stream(engine, [ALLM.user("hi")])
    iex> events = Enum.to_list(stream)
    iex> Enum.any?(events, &match?({:message_completed, _}, &1))
    true

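If you want to inspect the loop iteration by iteration, one option is to split the event list at each `:step_completed`. The `StepGrouper` module below is a hypothetical sketch (not part of ALLM), and it makes no assumption about where `:message_completed` falls relative to the last `:step_completed`: anything after the last step ends up in a separate tail.

```elixir
defmodule StepGrouper do
  # Hypothetical helper: split stream/3 events into per-iteration groups,
  # closing a group at each :step_completed event.
  def group(events) do
    {steps, current} =
      Enum.reduce(events, {[], []}, fn
        {:step_completed, _} = event, {steps, current} ->
          {[Enum.reverse([event | current]) | steps], []}

        event, {steps, current} ->
          {steps, [event | current]}
      end)

    # Whatever follows the last :step_completed is returned as a tail.
    {Enum.reverse(steps), Enum.reverse(current)}
  end
end
```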
## Cancellation and cleanup
The stream returned by `stream_generate/3` / `stream/3` is built on
`Stream.resource/3`. When the consumer halts early (with `Enum.take/2`,
by returning `{:halt, acc}` from `Enum.reduce_while/3`, or because an
exception is raised while it is enumerating), the resource's `after_fun`
runs and tears down the underlying HTTP connection. You don't need to
call an explicit cancel function. The `after_fun` runs in the consuming
process, though, so it cannot run if that process is killed outright.
If you want to halt the stream after a fixed number of text deltas:
```elixir
{:ok, stream} = ALLM.stream_generate(engine, req)
stream
|> Stream.filter(&match?({:text_delta, _}, &1))
|> Enum.take(10)
```
Taking 10 elements halts the stream; the underlying HTTP connection is
closed automatically.
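The cleanup behaviour comes from `Stream.resource/3` itself, so it can be checked without ALLM. In this sketch an endless stand-in stream sends `:cleaned_up` from its `after_fun`, which is the point where, per the text above, ALLM closes its HTTP connection. Both early-halt styles trigger it.

```elixir
parent = self()

# Stand-in for an adapter stream: an endless source of text deltas whose
# after_fun reports cleanup instead of closing a real connection.
make_stream = fn ->
  Stream.resource(
    fn -> 0 end,
    fn n -> {[{:text_delta, %{delta: "chunk #{n}"}}], n + 1} end,
    fn _n -> send(parent, :cleaned_up) end
  )
end

# Early halt via Enum.take/2.
taken = make_stream.() |> Enum.take(3)

# Early halt via Enum.reduce_while/3 after counting four events.
count =
  Enum.reduce_while(make_stream.(), 0, fn
    _event, 4 -> {:halt, 4}
    _event, acc -> {:cont, acc + 1}
  end)

# One :cleaned_up message per halted stream.
cleanups =
  for _ <- 1..2 do
    receive do
      :cleaned_up -> :ok
    after
      100 -> :missing
    end
  end
```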
## Mid-stream errors
A mid-stream `{:error, struct}` event surfaces as
`{:ok, %Response{finish_reason: :error, metadata: %{error: struct}}}`
from the non-streaming variants — the call-site tuple stays `{:ok, _}`.
For streaming consumers, you see the `{:error, _}` event in the stream
itself. The stream then ends; subsequent enumeration yields no further
events.
```elixir
{:ok, stream} = ALLM.stream_generate(engine, req)

Enum.each(stream, fn
  {:error, %ALLM.Error.AdapterError{reason: :rate_limited}} ->
    IO.puts("backing off")

  {:text_delta, %{delta: chunk}} ->
    IO.write(chunk)

  _ ->
    :ok
end)
```
If you're using `StreamCollector.collect/1`, the collector folds the
mid-stream error into the response's metadata (`response.metadata.error`)
and returns `{:ok, response}` with `finish_reason: :error`. Pre-flight
errors (validation failures, a missing adapter) surface as `{:error, _}`
from the `stream_generate/3` call itself, before the stream is built.
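To illustrate that fold, here is a hypothetical `MiniCollector` that uses plain maps in place of `%Response{}`. It is a simplified sketch, not `StreamCollector`'s implementation. It accumulates text deltas, turns an `{:error, _}` event into `finish_reason: :error` with the error stored under `metadata.error`, and still returns `{:ok, _}`. It does not model the normal (non-error) finish reason, which it leaves as `nil`.

```elixir
defmodule MiniCollector do
  # Hypothetical, simplified stand-in for StreamCollector.collect/1.
  def collect(events) do
    initial = %{text: "", finish_reason: nil, metadata: %{}}

    result =
      Enum.reduce_while(events, initial, fn
        {:text_delta, %{delta: chunk}}, acc ->
          {:cont, %{acc | text: acc.text <> chunk}}

        {:error, error}, acc ->
          # Fold the error into the result and stop; the tuple stays {:ok, _}.
          {:halt,
           %{acc | finish_reason: :error, metadata: Map.put(acc.metadata, :error, error)}}

        _other, acc ->
          {:cont, acc}
      end)

    {:ok, result}
  end
end
```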
## Where to next
* `tools.md` — streaming + tool calls.
* `errors_and_retries.md` — retry policy for transient errors.
* `examples/02_streaming_text.exs` — runnable smoke test against any
provider.