# SparkEx
[](https://github.com/lukaszsamson/spark_ex/actions)
Native Elixir client for [Apache Spark](https://spark.apache.org/) via the
[Spark Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html) protocol.
SparkEx communicates with Spark over gRPC using the official Spark Connect
protobuf contract, giving Elixir and Livebook first-class access to
distributed Spark SQL, DataFrames, streaming, and the catalog — without
a JVM in your application.
Targets **Spark 3.5 - 4.1**.
## Features
- **Lazy DataFrame API** — select, filter, join, group, window, set ops,
reshape, sampling, and more
- **590+ Spark SQL functions** — auto-generated from a declarative
registry, covering math, string, date/time, array, map, conditional, window,
and aggregate families
- **Reader/Writer** — Parquet, CSV, JSON, ORC, text, JDBC, plus generic
`format`/`load`/`save` and V2 DataSource writes
- **Structured Streaming** — `DataStreamReader`, `DataStreamWriter`,
`StreamingQuery`, and `StreamingQueryManager`
- **Catalog API** — browse and manage databases, tables, functions,
columns, and cache
- **Explorer integration** — materialize Spark results as
`Explorer.DataFrame` with automatic type mapping
- **Livebook / Kino** — DataFrames render as interactive tables with
schema, preview, and explain tabs
- **NA/Stat sub-APIs** — `fillna`, `dropna`, `describe`, `corr`, `cov`,
`crosstab`, `approx_quantile`, and more
- **MERGE INTO** — upsert builder with match/not-matched clauses and
schema evolution
- **UDF/UDTF registration** — register Java UDFs and Python UDTFs
- **Session lifecycle** — clone, release, interrupt by tag/operation,
reattachable execution with automatic mid-stream recovery
- **Observability** — Telemetry events for every RPC, retry, reattach,
batch, and progress update
## Installation
Add `spark_ex` to your list of dependencies in `mix.exs`:
```elixir
def deps do
[
{:spark_ex, "~> 0.1.0"},
# Optional — for Arrow decoding and to_explorer/2
{:explorer, "~> 0.10"},
# Optional — for Livebook rendering
{:kino, "~> 0.14"}
]
end
```
## Getting started
### Connect to Spark
```elixir
{:ok, session} = SparkEx.connect(url: "sc://localhost:15002")
{:ok, "4.1.1"} = SparkEx.spark_version(session)
```
### Create and query DataFrames
```elixir
import SparkEx.Functions
alias SparkEx.{DataFrame, Column}
df =
SparkEx.sql(session, """
SELECT * FROM VALUES
(1, 'Alice', 100), (2, 'Bob', 200), (3, 'Carol', 150)
AS t(id, name, salary)
""")
df
|> DataFrame.filter(Column.gt(col("salary"), lit(120)))
|> DataFrame.select([col("name"), col("salary")])
|> DataFrame.order_by([desc("salary")])
|> DataFrame.collect()
# => {:ok, [%{"name" => "Bob", "salary" => 200}, %{"name" => "Carol", "salary" => 150}]}
```
### Joins and aggregates
```elixir
employees = SparkEx.sql(session, """
SELECT * FROM VALUES (1, 'eng'), (2, 'hr'), (3, 'eng') AS t(id, dept)
""")
departments = SparkEx.sql(session, """
SELECT * FROM VALUES ('eng', 'Engineering'), ('hr', 'People') AS t(dept, name)
""")
employees
|> DataFrame.join(departments, ["dept"], :inner)
|> DataFrame.group_by(["name"])
|> SparkEx.GroupedData.agg([
Column.alias_(count(col("id")), "headcount")
])
|> DataFrame.collect()
# => {:ok, [%{"name" => "Engineering", "headcount" => 2}, %{"name" => "People", "headcount" => 1}]}
```
### Window functions
```elixir
alias SparkEx.{Window, WindowSpec}
df = SparkEx.sql(session, """
SELECT * FROM VALUES
('eng', 'Alice', 100), ('eng', 'Bob', 120), ('hr', 'Carol', 90)
AS t(dept, name, salary)
""")
w =
Window.partition_by(["dept"])
|> WindowSpec.order_by([col("salary") |> Column.desc()])
df
|> DataFrame.with_column("rank", row_number() |> Column.over(w))
|> DataFrame.collect()
```
### Read and write data
```elixir
alias SparkEx.{Reader, Writer}
# Read
df = Reader.parquet(session, "/data/events.parquet")
df = Reader.csv(session, "/data/users.csv", schema: "name STRING, age INT")
df = Reader.json(session, "/data/logs.json")
df = Reader.table(session, "my_catalog.my_table")
# Write
df |> DataFrame.write() |> Writer.parquet("/output/events.parquet")
df |> DataFrame.write() |> Writer.mode("overwrite") |> Writer.save_as_table("results")
```
### Create DataFrames from local data
```elixir
# From a list of maps
{:ok, df} = SparkEx.create_dataframe(session, [
%{"name" => "Alice", "age" => 30},
%{"name" => "Bob", "age" => 25}
])
# From an Explorer.DataFrame
explorer_df = Explorer.DataFrame.new(%{x: [1, 2, 3], y: ["a", "b", "c"]})
{:ok, df} = SparkEx.create_dataframe(session, explorer_df)
```
### Explorer integration
```elixir
# Materialize as Explorer.DataFrame (bounded by default: 10k rows / 64 MB)
{:ok, explorer_df} = DataFrame.to_explorer(df, max_rows: 1_000)
# Unbounded (use with care)
{:ok, explorer_df} = DataFrame.to_explorer(df, unsafe: true, max_rows: :infinity)
```
### Structured streaming
```elixir
alias SparkEx.{StreamReader, StreamWriter, StreamingQuery}
# Start a streaming query
{:ok, query} =
StreamReader.rate(session, rows_per_second: 10)
|> DataFrame.write_stream()
|> StreamWriter.format("memory")
|> StreamWriter.output_mode("append")
|> StreamWriter.query_name("my_stream")
|> StreamWriter.option("checkpointLocation", "/tmp/checkpoint")
|> StreamWriter.start()
# Monitor
{:ok, true} = StreamingQuery.is_active?(query)
{:ok, status} = StreamingQuery.status(query)
# Stop
:ok = StreamingQuery.stop(query)
```
### Catalog
```elixir
alias SparkEx.Catalog
{:ok, dbs} = Catalog.list_databases(session)
{:ok, tables} = Catalog.list_tables(session)
{:ok, cols} = Catalog.list_columns(session, "my_table")
:ok = Catalog.cache_table(session, "frequently_used")
{:ok, true} = Catalog.is_cached?(session, "frequently_used")
```
### Temporary views
```elixir
df = SparkEx.sql(session, "SELECT 1 AS id, 'hello' AS msg")
:ok = DataFrame.create_or_replace_temp_view(df, "greetings")
SparkEx.sql(session, "SELECT * FROM greetings") |> DataFrame.collect()
# => {:ok, [%{"id" => 1, "msg" => "hello"}]}
```
### NA handling and statistics
```elixir
df = SparkEx.sql(session, """
SELECT * FROM VALUES (1, 10.0), (2, null), (null, 30.0) AS t(id, score)
""")
# Fill nulls
DataFrame.fillna(df, %{"id" => 0, "score" => 0.0})
# Drop rows with nulls
DataFrame.dropna(df)
# Descriptive statistics
DataFrame.describe(df) |> DataFrame.collect()
```
### MERGE INTO
```elixir
alias SparkEx.{MergeIntoWriter, Column}
source = SparkEx.sql(session, "SELECT 2 AS id, 'Bobby' AS name")
DataFrame.merge_into(source, "target_table")
|> MergeIntoWriter.on(Column.eq(col("source.id"), col("target.id")))
|> MergeIntoWriter.when_matched_update_all()
|> MergeIntoWriter.when_not_matched_insert_all()
|> MergeIntoWriter.merge()
```
### Session management
```elixir
# Tag operations for selective interruption
df = SparkEx.sql(session, "SELECT * FROM big_table") |> DataFrame.tag("etl-job")
# Interrupt from another process
SparkEx.interrupt_tag(session, "etl-job")
SparkEx.interrupt_all(session)
# Clone a session (shares server state, independent client)
{:ok, session2} = SparkEx.clone_session(session)
# Release server resources
SparkEx.Session.release(session)
```
### Livebook
In Livebook, `SparkEx.DataFrame` structs render automatically with tabs for
Schema, Preview, Explain, and Raw via the `Kino.Render` protocol.
```elixir
# Explicit rendering helpers
SparkEx.Livebook.preview(df, num_rows: 50)
SparkEx.Livebook.explain(df, mode: :extended)
SparkEx.Livebook.schema(df)
```
See [`notebooks/spark_ex_demo.livemd`](notebooks/spark_ex_demo.livemd) for a
full interactive walkthrough.
### Telemetry
SparkEx emits [`:telemetry`](https://hex.pm/packages/telemetry) events you can
attach to for logging, metrics, or tracing:
| Event | Description |
| ---------------------------------- | ------------------------------- |
| `[:spark_ex, :rpc, :start\|stop]` | Every gRPC call |
| `[:spark_ex, :retry, :attempt]` | Transient-error retry |
| `[:spark_ex, :reattach, :attempt]` | Mid-stream reattach |
| `[:spark_ex, :result, :batch]` | Each Arrow batch received |
| `[:spark_ex, :result, :progress]` | Query progress update |
## Prerequisites
| Requirement | Notes |
| ------------------- | --------------------------------------------------------------------- |
| Elixir >= 1.15 | |
| Java 17 or 21 | For running the Spark Connect server |
| Spark >= 3.5 with spark-connect | For running the Spark Connect server |
| Explorer (optional) | Arrow IPC decoding and `to_explorer/2` |
| Kino (optional) | Livebook rendering |
`protoc` + `protoc-gen-elixir` are only needed if you regenerate the protobuf
stubs (see [Proto regeneration](#proto-regeneration) below).
## Running tests
### Unit tests
```bash
mix test
```
### Integration tests
Integration tests require a running Spark Connect server and are excluded by
default.
**One-command runner** (starts server, runs tests, tears down):
```bash
./test/run_integration.sh
```
**Manual setup:**
1. Download Spark:
```bash
curl -L -o /tmp/spark.tgz \
'https://dlcdn.apache.org/spark/spark-4.1.1/spark-4.1.1-bin-hadoop3-connect.tgz'
tar -xzf /tmp/spark.tgz -C test/
```
2. Start the server:
```bash
export JAVA_HOME="/opt/homebrew/opt/openjdk@21/libexec/openjdk.jdk/Contents/Home"
export SPARK_HOME="test/spark-4.1.1-bin-hadoop3-connect"
bash "$SPARK_HOME/sbin/start-connect-server.sh"
```
3. Run:
```bash
mix test --include integration
```
4. Or point at a remote server:
```bash
SPARK_REMOTE="sc://my-spark-host:15002" mix test --include integration
```
## Proto regeneration
The vendored `.proto` files in `priv/proto/spark/connect/` and the generated
Elixir modules in `lib/spark_ex/proto/` are checked in. You do **not** need
`protoc` for normal development.
To update protos from a new Spark release:
```bash
# Copy protos from Spark source
git clone --depth 1 --branch v4.1.1 https://github.com/apache/spark.git /tmp/spark
cp /tmp/spark/sql/connect/common/src/main/protobuf/spark/connect/*.proto \
priv/proto/spark/connect/
# Regenerate Elixir stubs
mix escript.install hex protobuf # one-time
mix spark_ex.gen_proto
mix format
```
## Project layout
```
lib/
spark_ex.ex # Public API entry point
spark_ex/
session.ex # Session GenServer
data_frame.ex # Lazy DataFrame API
data_frame/na.ex # Null-value handling
data_frame/stat.ex # Statistical operations
column.ex # Expression wrapper
functions.ex # 590+ auto-generated Spark SQL functions
grouped_data.ex # GroupedData (group_by + agg + pivot)
window.ex # Window convenience constructors
window_spec.ex # WindowSpec (partition, order, frame)
reader.ex # Batch readers (parquet, csv, json, etc.)
writer.ex # Batch writer (V1)
writer_v2.ex # V2 DataSource writer
stream_reader.ex # Streaming source reader
stream_writer.ex # Streaming sink writer
streaming_query.ex # StreamingQuery controls
streaming_query_manager.ex # Manage active streaming queries
catalog.ex # Catalog API
merge_into_writer.ex # MERGE INTO builder
udf_registration.ex # UDF/UDTF registration
livebook.ex # Livebook/Kino helpers
connect/
channel.ex # sc:// URI parser + gRPC channel
client.ex # Low-level gRPC calls (telemetry + retry)
plan_encoder.ex # DataFrame ops -> protobuf encoding
command_encoder.ex # Commands -> protobuf encoding
result_decoder.ex # Arrow IPC decoding
type_mapper.ex # Spark DataType <-> Explorer dtype
proto/spark/connect/*.pb.ex # Generated protobuf modules
priv/proto/spark/connect/ # Vendored Spark Connect protos (v4.1.1)
notebooks/spark_ex_demo.livemd # Interactive Livebook demo
test/unit/ # Unit tests (~1090 tests)
test/integration/ # Integration tests (~625 tests)
```
## Acknowledgements
SparkEx builds on the [Spark Connect](https://spark.apache.org/docs/latest/spark-connect-overview.html)
protocol introduced in Apache Spark 3.4 and stabilized in Spark 4.x.
## License
Copyright (c) 2026 Łukasz Samson
Licensed under the Apache License, Version 2.0. See [LICENSE](LICENSE) for details.