guides/integrations.md

# Integration Guide

ExDataSketch sketches integrate naturally with the Elixir ecosystem through
pure functions — no adapters or special dependencies required.

Every sketch module (`HLL`, `CMS`, `Theta`) exposes four convenience functions:

| Function             | Purpose                                      |
|----------------------|----------------------------------------------|
| `from_enumerable/2`  | Build a sketch from any `Enumerable`         |
| `merge_many/1`       | Merge a collection of sketches               |
| `reducer/1`          | Returns a 2-arity function for reduce chains |
| `merger/1`           | Returns a 2-arity function for merging       |

## Enum and Stream

### Building a sketch from a collection

```elixir
# One-liner with from_enumerable
sketch = HLL.from_enumerable(user_ids, p: 14)
HLL.estimate(sketch)

# Equivalent long form
sketch = HLL.new(p: 14) |> HLL.update_many(user_ids)
```

### Chunked streaming updates

For large datasets that don't fit in memory, use `Stream.chunk_every/2`
with `update_many/2`:

```elixir
File.stream!("events.csv")
|> Stream.map(&parse_user_id/1)
|> Stream.chunk_every(10_000)
|> Enum.reduce(HLL.new(p: 14), fn chunk, sketch ->
  HLL.update_many(sketch, chunk)
end)
|> HLL.estimate()
```

### Using reducer/1

The `reducer/1` function returns a function compatible with `Enum.reduce/3`:

```elixir
reducer_fn = HLL.reducer()
sketch = Enum.reduce(user_ids, HLL.new(), reducer_fn)
```

## Flow

[Flow](https://hex.pm/packages/flow) provides parallel data processing.
Sketches are ideal Flow accumulators because merge is associative and
commutative.

### Partitioned cardinality counting

```elixir
alias ExDataSketch.HLL

File.stream!("events.csv")
|> Flow.from_enumerable()
|> Flow.partition()
|> Flow.reduce(fn -> HLL.new(p: 14) end, HLL.reducer())
|> Flow.departition(
  fn -> HLL.new(p: 14) end,
  HLL.merger(),
  & &1
)
|> Enum.to_list()
|> hd()
|> HLL.estimate()
```

### Parallel frequency counting

```elixir
alias ExDataSketch.CMS

File.stream!("queries.log")
|> Flow.from_enumerable()
|> Flow.partition()
|> Flow.reduce(fn -> CMS.new() end, CMS.reducer())
|> Flow.departition(
  fn -> CMS.new() end,
  CMS.merger(),
  & &1
)
|> Enum.to_list()
|> hd()
|> CMS.estimate("popular_query")
```

## Broadway

[Broadway](https://hex.pm/packages/broadway) processes data from message
queues. Sketches fit naturally in the batch processing pipeline.

### Per-batch sketch with GenServer aggregator

```elixir
defmodule MyPipeline do
  use Broadway

  alias ExDataSketch.HLL

  @impl true
  def handle_batch(_batcher, messages, _batch_info, _context) do
    items = Enum.map(messages, fn msg -> msg.data.user_id end)
    sketch = HLL.from_enumerable(items, p: 14)

    # Send partial sketch to an aggregator GenServer
    SketchAggregator.merge(sketch)

    messages
  end
end

defmodule SketchAggregator do
  use GenServer

  alias ExDataSketch.HLL

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  def merge(sketch), do: GenServer.cast(__MODULE__, {:merge, sketch})
  def estimate, do: GenServer.call(__MODULE__, :estimate)

  @impl true
  def init(_opts), do: {:ok, HLL.new(p: 14)}

  @impl true
  def handle_cast({:merge, sketch}, state) do
    {:noreply, HLL.merge(state, sketch)}
  end

  @impl true
  def handle_call(:estimate, _from, state) do
    {:reply, HLL.estimate(state), state}
  end
end
```

## Explorer

[Explorer](https://hex.pm/packages/explorer) provides DataFrames for Elixir.
Convert a Series to a list to feed into a sketch:

```elixir
alias ExDataSketch.HLL

df = Explorer.DataFrame.from_csv!("users.csv")

df["user_id"]
|> Explorer.Series.to_list()
|> HLL.from_enumerable(p: 14)
|> HLL.estimate()
```

For frequency estimation:

```elixir
alias ExDataSketch.CMS

df["search_query"]
|> Explorer.Series.to_list()
|> CMS.from_enumerable()
|> CMS.estimate("popular_query")
```

## Nx

[Nx](https://hex.pm/packages/nx) provides numerical computing. Sketch
operations work on individual values, not tensors, so convert to a flat list
first:

```elixir
alias ExDataSketch.HLL

tensor = Nx.tensor([1, 2, 3, 2, 1])

tensor
|> Nx.to_flat_list()
|> HLL.from_enumerable()
|> HLL.estimate()
```

> **Note:** Sketches operate on discrete items, not continuous numerical data.
> Use Nx for numerical operations and sketches for approximate counting.

## ExArrow (Apache Arrow IPC / Flight / ADBC)

[ExArrow](https://hex.pm/packages/ex_arrow) provides Apache Arrow support
for the BEAM: IPC stream and file reading, Arrow Flight clients, and ADBC
database connections.

### Arrow IPC stream -- chunked sketch aggregation

Arrow IPC streams deliver data as a sequence of RecordBatches. Build a
sketch per batch, then merge:

```elixir
alias ExDataSketch.HLL

{:ok, stream} = ExArrow.IPC.Reader.from_file("/data/events.arrows")
{:ok, _schema} = ExArrow.Stream.schema(stream)

stream
|> Stream.unfold(fn s ->
  case ExArrow.Stream.next(s) do
    nil -> nil
    {:error, _} -> nil
    batch -> {batch, s}
  end
end)
|> Stream.map(fn batch ->
  batch
  |> ExArrow.RecordBatch.column("user_id")
  |> ExArrow.Array.to_list()
  |> HLL.from_enumerable(p: 14)
end)
|> Enum.to_list()
|> HLL.merge_many()
|> HLL.estimate()
```

### Arrow IPC file -- random-access batch processing

IPC files support random access to individual batches, useful for parallel
sketch construction:

```elixir
alias ExDataSketch.{Bloom, FrequentItems}

{:ok, file} = ExArrow.IPC.File.from_file("/data/users.arrow")
n = ExArrow.IPC.File.batch_count(file)

# Build a Bloom filter of known user IDs across all batches
bloom =
  0..(n - 1)
  |> Task.async_stream(fn i ->
    {:ok, batch} = ExArrow.IPC.File.get_batch(file, i)
    ids = batch |> ExArrow.RecordBatch.column("user_id") |> ExArrow.Array.to_list()
    Bloom.from_enumerable(ids, capacity: 1_000_000)
  end)
  |> Enum.map(fn {:ok, sketch} -> sketch end)
  |> Bloom.merge_many()

# Build a FrequentItems sketch of search queries
top_queries =
  0..(n - 1)
  |> Enum.reduce(FrequentItems.new(k: 64), fn i, sketch ->
    {:ok, batch} = ExArrow.IPC.File.get_batch(file, i)
    queries = batch |> ExArrow.RecordBatch.column("query") |> ExArrow.Array.to_list()
    FrequentItems.update_many(sketch, queries)
  end)
  |> FrequentItems.top_k(limit: 20)
```

### ADBC -- query databases with sketch aggregation

Use ADBC with DuckDB to query Parquet files or databases and feed results
into sketches:

```elixir
alias ExDataSketch.CMS

{:ok, result} =
  Adbc.Connection.query(MyApp.Conn,
    "SELECT search_query FROM read_parquet('/data/queries/*.parquet')")

result
|> Adbc.Result.to_map()
|> Map.fetch!("search_query")
|> CMS.from_enumerable()
|> CMS.estimate("popular_query")
```

## ExZarr (Zarr v2/v3 N-dimensional arrays)

[ExZarr](https://hex.pm/packages/ex_zarr) provides chunked, compressed
N-dimensional arrays with multiple storage backends (filesystem, S3, GCS,
memory). Its `chunk_stream/1` returns a lazy enumerable -- ideal for
building sketches without loading the full array into memory.

### Chunk streaming -- memory-efficient sketch construction

```elixir
alias ExDataSketch.{HLL, KLL}

{:ok, array} = ExZarr.open(path: "/data/sensor_readings", storage: :filesystem)

# Count distinct sensor values using chunk streaming
# ExZarr returns raw binaries -- decode based on dtype
hll =
  ExZarr.Array.chunk_stream(array)
  |> Enum.reduce(HLL.new(p: 14), fn {_idx, bin}, sketch ->
    values = for <<v::float-64-little <- bin>>, do: v
    HLL.update_many(sketch, values)
  end)

IO.puts("Distinct values: #{HLL.estimate(hll)}")

# Compute quantiles over the same chunked data
kll =
  ExZarr.Array.chunk_stream(array)
  |> Enum.reduce(KLL.new(k: 200), fn {_idx, bin}, sketch ->
    values = for <<v::float-64-little <- bin>>, do: v
    KLL.update_many(sketch, values)
  end)

IO.puts("Median: #{KLL.quantile(kll, 0.5)}")
IO.puts("P99: #{KLL.quantile(kll, 0.99)}")
```

### Parallel chunk processing

For large arrays, process chunks in parallel and merge:

```elixir
alias ExDataSketch.DDSketch

{:ok, array} = ExZarr.open(path: "/data/latencies", storage: :s3,
  bucket: "metrics", prefix: "2026/03")

DDSketch =
  ExZarr.Array.chunk_stream(array, parallel: true)
  |> Task.async_stream(fn {_idx, bin} ->
    values = for <<v::float-64-little <- bin>>, do: v
    DDSketch.from_enumerable(values, alpha: 0.01)
  end, max_concurrency: System.schedulers_online())
  |> Enum.map(fn {:ok, sketch} -> sketch end)
  |> DDSketch.merge_many()

IO.puts("P50: #{DDSketch.quantile(sketch, 0.5)}")
IO.puts("P99: #{DDSketch.quantile(sketch, 0.99)}")
```

### Zarr + Nx bridge

When working with ExZarr's Nx integration, convert tensors to flat lists
for sketch consumption:

```elixir
alias ExDataSketch.HLL

{:ok, array} = ExZarr.open(path: "/data/experiment", storage: :filesystem)
{:ok, tensor} = ExZarr.Nx.to_tensor(array)

tensor
|> Nx.to_flat_list()
|> HLL.from_enumerable(p: 14)
|> HLL.estimate()
```

### Multi-array group analysis

Zarr groups organize related arrays hierarchically. Sketch each array
and compare:

```elixir
alias ExDataSketch.{HLL, CMS}

{:ok, group} = ExZarr.Group.open("/", storage: :filesystem, path: "/data/experiments")
array_names = ExZarr.Group.list_arrays(group)

# Build an HLL per array to compare cardinalities
sketches =
  for name <- array_names, into: %{} do
    {:ok, arr} = ExZarr.Group.get_array(group, name)
    {:ok, bin} = ExZarr.Array.to_binary(arr)
    values = for <<v::float-64-little <- bin>>, do: v
    {name, HLL.from_enumerable(values, p: 14)}
  end

for {name, sketch} <- sketches do
  IO.puts("#{name}: ~#{round(HLL.estimate(sketch))} distinct values")
end
```

## General chunked data pattern

The chunk-iterate-merge pattern works with any data source that provides
batched or chunked iteration:

```elixir
alias ExDataSketch.HLL

chunks
|> Enum.map(fn chunk ->
  chunk
  |> to_list()
  |> HLL.from_enumerable(p: 14)
end)
|> HLL.merge_many()
|> HLL.estimate()
```

This applies to `ex_arrow`, `ex_zarr`, custom Parquet readers, Kafka
consumer batches, or any other chunked data source.