# Streaming Sketches
This guide explains how to use ExDataSketch with Elixir streams and the
`Collectable` protocol for idiomatic, memory-efficient sketch construction.
## Why Sketches Fit Streaming Architectures
Probabilistic sketches are natural stream consumers:
- **Bounded memory**: Sketches use a fixed amount of memory regardless of input
size. An HLL with `p: 14` uses 16 KiB whether it processes 1,000 or
1,000,000,000 items.
- **Single-pass**: Sketches ingest each item exactly once. No buffering or
multi-pass scans are needed.
- **Mergeable**: Most sketches support `merge/2`, which is associative and
commutative. This means partial results from parallel or partitioned
workers can be combined in any order.
- **No random access**: Sketches never need to revisit earlier items. This
makes them ideal for lazy streams and pipelines.
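
The 16 KiB figure can be checked with a little arithmetic. This is a rough sketch that assumes one byte per register; the storage layout is not stated above, so treat that as an assumption rather than a description of the library's internals.

```elixir
# Assumption: each HLL register occupies exactly one byte.
p = 14
registers = Integer.pow(2, p)   # 2^14 registers
kib = div(registers, 1024)      # bytes -> KiB under the one-byte assumption

IO.puts("p=#{p}: #{registers} registers, #{kib} KiB")
# prints "p=14: 16384 registers, 16 KiB"
```

The input size never appears in the calculation, which is the point: memory depends only on `p`.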
## ExDataSketch.Stream
The `ExDataSketch.Stream` module provides terminal stream consumers that
build sketches from lazy enumerables. Each function consumes the stream and
returns a completed sketch struct.
### Building Sketches from Streams
```elixir
# HLL cardinality estimation from a lazy stream
cardinality =
  1..100_000
  |> Stream.map(&to_string/1)
  |> ExDataSketch.Stream.hll(p: 14)
  |> ExDataSketch.HLL.estimate()

# CMS frequency estimation
# (parse_request_path/1 is a user-defined helper, not shown here)
freq =
  File.stream!("access.log")
  |> Stream.map(&parse_request_path/1)
  |> ExDataSketch.Stream.cms(width: 2048, depth: 5)
  |> ExDataSketch.CMS.estimate("/api/users")
```
### Available Stream Functions
| Function | Sketch |
|----------|--------|
| `hll/2` | HyperLogLog |
| `cms/2` | Count-Min Sketch |
| `theta/2` | Theta Sketch |
| `kll/2` | KLL Quantile Sketch |
| `ddsketch/2` | DDSketch |
| `req/2` | REQ Sketch |
| `ull/2` | UltraLogLog |
| `frequent_items/2` | FrequentItems (SpaceSaving) |
| `misra_gries/2` | Misra-Gries |
| `bloom/2` | Bloom Filter |
| `quotient/2` | Quotient Filter |
| `cqf/2` | Counting Quotient Filter |
| `iblt/2` | Invertible Bloom Lookup Table |

All stream functions delegate to the corresponding `from_enumerable/2`
function. No ingestion logic is duplicated.
### reduce_into/3
`reduce_into/3` accepts either a sketch module atom or an existing sketch
struct:
```elixir
# Create a new sketch from a module
sketch = ExDataSketch.Stream.reduce_into(items, ExDataSketch.HLL, p: 14)
# Accumulate into an existing sketch
existing = ExDataSketch.HLL.new(p: 14)
sketch = ExDataSketch.Stream.reduce_into(more_items, existing)
```
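
One way a function can accept either a module atom or an existing struct is to dispatch on the shape of the argument. The sketch below is a toy illustration of that idea, not ExDataSketch's implementation: `ToyCounter`, `ToyStream`, and the `:step` option are hypothetical names invented for this example.

```elixir
defmodule ToyCounter do
  # Hypothetical stand-in for a sketch module; not part of ExDataSketch.
  defstruct count: 0, step: 1

  def new(opts \\ []), do: %__MODULE__{step: Keyword.get(opts, :step, 1)}
  def update(%__MODULE__{} = c, _item), do: %{c | count: c.count + c.step}
end

defmodule ToyStream do
  def reduce_into(enum, target, opts \\ [])

  # Module atom: build a fresh accumulator from opts, then reduce into it.
  def reduce_into(enum, module, opts) when is_atom(module) do
    reduce_into(enum, module.new(opts), [])
  end

  # Existing struct: reuse it as the starting accumulator.
  def reduce_into(enum, %module{} = acc, _opts) do
    Enum.reduce(enum, acc, fn item, acc -> module.update(acc, item) end)
  end
end

fresh = ToyStream.reduce_into(1..3, ToyCounter, step: 2)
more = ToyStream.reduce_into(4..5, fresh)
IO.inspect({fresh.count, more.count})
# prints {6, 10}
```

The struct clause ignores its options because the existing accumulator already carries its configuration.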
### reduce_partitioned/3
For large streams, `reduce_partitioned/3` splits work into chunks, builds a
sketch per chunk, and merges all partial results:
```elixir
sketch =
  large_stream
  |> ExDataSketch.Stream.reduce_partitioned(ExDataSketch.HLL, partitions: 8, p: 14)
```
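The default partition count is `System.schedulers_online()`. Because `merge/2`
is associative and commutative, the merged result does not depend on how the
input was split or on the order in which partial sketches are combined, so the
partition count does not change the accuracy guarantees of mergeable sketches.
It only affects throughput and the memory held by intermediate partial sketches.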
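
The chunk, build, and merge steps can be sketched with the standard library alone. This is an illustration under stated assumptions, not the library's code: `ToyPartitioned` is a hypothetical module, a `MapSet` stands in for a mergeable sketch (with `MapSet.union/2` playing the role of `merge/2`), and running chunks through `Task.async_stream/3` is this example's choice, since the text above does not say whether chunks are processed concurrently.

```elixir
defmodule ToyPartitioned do
  # Illustration only: MapSet stands in for a mergeable sketch.
  def reduce(enum, chunk_size) do
    enum
    |> Stream.chunk_every(chunk_size)
    # Build one partial "sketch" per chunk; completion order is irrelevant.
    |> Task.async_stream(&MapSet.new/1, ordered: false)
    # Merge partial results as they arrive.
    |> Enum.reduce(MapSet.new(), fn {:ok, partial}, acc ->
      MapSet.union(acc, partial)
    end)
  end
end

data = Stream.map(1..10_000, &rem(&1, 1_000))
single = ToyPartitioned.reduce(data, 10_000)
split = ToyPartitioned.reduce(data, 1_250)
IO.inspect({MapSet.size(single), MapSet.equal?(single, split)})
# prints {1000, true}
```

One chunk or eight chunks give the same merged result, because the merge step does not care how the input was divided.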
## Collectable Protocol
All sketch types that support `merge/2` implement the `Collectable` protocol,
enabling `Enum.into/2` and `Enum.into/3`:
```elixir
# Build an HLL from a range
sketch = Enum.into(1..1000, ExDataSketch.HLL.new(p: 14))

# Build a CMS from a stream
sketch =
  some_stream
  |> Enum.into(ExDataSketch.CMS.new(width: 2048, depth: 5))
```
### Collectable Semantics
`Collectable.into/1` returns `{sketch, collector_fn}` where `collector_fn`
handles:
- `{:cont, item}` -- inserts the item via the sketch's `update/2` or `put/2`
- `:done` -- returns the completed sketch
- `:halt` -- discards the sketch and returns `:ok`
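
The three collector messages above map directly onto a `Collectable` implementation. The following is a minimal, self-contained sketch of that shape: `ToySet` is a hypothetical struct invented for this example (backed by a `MapSet`), and it is not how any ExDataSketch type is defined.

```elixir
defmodule ToySet do
  # Hypothetical stand-in for a sketch struct; not part of ExDataSketch.
  defstruct items: MapSet.new()

  def new, do: %__MODULE__{}
  def put(%__MODULE__{items: items} = set, item), do: %{set | items: MapSet.put(items, item)}
end

defimpl Collectable, for: ToySet do
  def into(set) do
    collector = fn
      acc, {:cont, item} -> ToySet.put(acc, item)  # insert one item
      acc, :done -> acc                            # return the finished struct
      _acc, :halt -> :ok                           # discard on early termination
    end

    {set, collector}
  end
end

result = Enum.into([1, 2, 2, 3], ToySet.new())
IO.inspect(MapSet.size(result.items))
# prints 3
```

Because the collector receives one `{:cont, item}` message per element, this path is inherently item-at-a-time.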
### Collectable vs from_enumerable
For performance-sensitive code, prefer `from_enumerable/2` or `update_many/2`
because they batch items internally. `Collectable` processes items one at a
time, which is correct but may be slower for very large collections.
| Pattern | Performance | Use case |
|---------|------------|----------|
| `from_enumerable/2` | Best (batched) | Building from a known collection |
| `update_many/2` | Best (batched) | Adding a batch to an existing sketch |
| `Enum.into/2` (Collectable) | Good (one at a time) | Pipeline integration, `for` comprehensions |
| `ExDataSketch.Stream.hll/2` | Same as `from_enumerable` | Lazy stream consumption |
| `reducer/1` + `Enum.reduce` | Good (one at a time) | Custom reduce chains |
### Supported Collectable Sketches
Every sketch that supports `merge/2` implements `Collectable`:
- HLL, CMS, Theta, KLL, DDSketch, REQ, ULL
- FrequentItems, MisraGries
- Bloom, Quotient, CQF, IBLT

Sketches that do not implement `Collectable`:
- **XorFilter** -- static construction requires all items up-front; not
mergeable.
- **Cuckoo** -- bounded capacity means `put/2` can return `{:error, :full}`;
`Collectable` has no error signalling mechanism.

Bloom filters do implement `Collectable`, with one caveat: they are sized for
an intended capacity. `put/2` never returns an error, but items added beyond
that capacity increase the false-positive rate.
## Merge and Partition Awareness
Sketch merge operations are **associative** and **commutative**. This means:
```elixir
alias ExDataSketch.HLL

# Given HLL sketches a, b, and c, these produce equivalent results
HLL.merge(HLL.merge(a, b), c) == HLL.merge(a, HLL.merge(b, c)) # associative
HLL.merge(a, b) == HLL.merge(b, a) # commutative: same cardinality
```
This property is what makes partition-local reduction safe. You can build
partial sketches on different workers, partitions, or time windows and merge
them later without worrying about order.
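
Both properties can be checked on a toy register-based structure. The sketch below is an illustration only: `ToyMaxSketch` is a hypothetical module that keeps a per-bucket maximum of hashed values, loosely in the spirit of HLL registers, and it is not ExDataSketch code. Its merge is an element-wise `max`, which is associative and commutative by construction.

```elixir
defmodule ToyMaxSketch do
  # Toy stand-in: a fixed-size tuple of registers, each holding the
  # largest hash value seen for its bucket. Not ExDataSketch code.
  @buckets 16

  def new, do: Tuple.duplicate(0, @buckets)

  def update(regs, item) do
    bucket = :erlang.phash2(item, @buckets)
    value = :erlang.phash2({:v, item}, 1_000_000)
    put_elem(regs, bucket, max(elem(regs, bucket), value))
  end

  def from_enumerable(enum), do: Enum.reduce(enum, new(), &update(&2, &1))

  # Element-wise max over the two register tuples.
  def merge(a, b) do
    Enum.zip_with(Tuple.to_list(a), Tuple.to_list(b), &max/2) |> List.to_tuple()
  end
end

a = ToyMaxSketch.from_enumerable(1..100)
b = ToyMaxSketch.from_enumerable(101..200)
c = ToyMaxSketch.from_enumerable(201..300)

left = ToyMaxSketch.merge(ToyMaxSketch.merge(a, b), c)
right = ToyMaxSketch.merge(a, ToyMaxSketch.merge(b, c))
whole = ToyMaxSketch.from_enumerable(1..300)

IO.inspect({left == right, ToyMaxSketch.merge(a, b) == ToyMaxSketch.merge(b, a), left == whole})
# prints {true, true, true}
```

The last comparison shows why partition-local reduction is safe for this kind of structure: merging three partial results gives the same registers as a single pass over all the data.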
### Partition-Aware Reduction
```elixir
# Build partial HLLs per partition, then merge.
# partition_data/1 is a hypothetical helper that returns one partition's items.
partial_sketches =
  Enum.map(0..3, fn partition ->
    partition
    |> partition_data()
    |> ExDataSketch.HLL.from_enumerable(p: 14)
  end)

final = ExDataSketch.HLL.merge_many(partial_sketches)
```
This pattern is exactly what `reduce_partitioned/3` automates:
```elixir
final = ExDataSketch.Stream.reduce_partitioned(data, ExDataSketch.HLL, partitions: 4, p: 14)
```
## Elixir Stream Reduction
Elixir's `Stream` module produces lazy enumerables. When you pipe a stream
into `ExDataSketch.Stream.hll/2`, the stream is consumed once and the sketch
accumulates each element. No intermediate list is created.
```elixir
# Lazy: never holds all items in memory
sketch =
  File.stream!("large_file.csv")
  |> Stream.map(&parse_line/1)
  |> Stream.filter(&valid?/1)
  |> ExDataSketch.Stream.hll(p: 14)
```
This works because `from_enumerable/2` uses `update_many/2` internally, which
chunks input and processes each chunk without materializing the entire stream.
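
The chunked, single-pass consumption described above can be imitated with the standard library. This is a sketch under assumptions: a running sum stands in for the sketch, and `Stream.chunk_every/2` with a chunk size of 10,000 stands in for whatever batching `update_many/2` performs internally, which is not specified here.

```elixir
# Illustration only: a running sum plays the role of the sketch accumulator.
lazy =
  1..1_000_000
  |> Stream.map(&(&1 * 2))
  |> Stream.filter(&(rem(&1, 3) == 0))

total =
  lazy
  |> Stream.chunk_every(10_000)  # at most one chunk is materialized at a time
  |> Enum.reduce(0, fn chunk, acc -> acc + Enum.sum(chunk) end)

IO.inspect(total == Enum.sum(lazy))
# prints true
```

Only the current chunk and the accumulator are held in memory, so the peak footprint is set by the chunk size rather than the length of the stream.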