# SparkEx Livebook Demo
## Setup
Install SparkEx from the local path with its optional dependencies (Explorer for Arrow
decoding, Kino for interactive rendering).

**Prerequisites:** A Spark Connect server must be running on `localhost:15002`.
Start one with:
```bash
export JAVA_HOME="/opt/homebrew/opt/openjdk@21/libexec/openjdk.jdk/Contents/Home"
export SPARK_HOME="test/spark-4.1.1-bin-hadoop3-connect"
bash "$SPARK_HOME/sbin/start-connect-server.sh"
```
```elixir
Mix.install([
  {:spark_ex, path: Path.expand("..", __DIR__)},
  {:explorer, "~> 0.10"},
  {:kino, "~> 0.14"}
])
```
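Before connecting, it can help to confirm that something is listening on the Spark Connect port. The following is a minimal sketch using only Erlang's `:gen_tcp`. The `ConnectCheck` module and `check_port/3` are hypothetical names introduced for illustration and are not part of SparkEx. The check only proves that a TCP listener exists, not that it speaks the Spark Connect protocol.

```elixir
defmodule ConnectCheck do
  @moduledoc "Hypothetical helper (not part of SparkEx): probes a TCP port."

  # Returns :ok when a TCP connection can be opened within `timeout_ms`,
  # and {:error, reason} otherwise (for example :econnrefused or :timeout).
  def check_port(host, port, timeout_ms \\ 1_000) do
    opts = [:binary, active: false]

    case :gen_tcp.connect(String.to_charlist(host), port, opts, timeout_ms) do
      {:ok, socket} ->
        # We only wanted to know that the port accepts connections.
        :gen_tcp.close(socket)
        :ok

      {:error, reason} ->
        {:error, reason}
    end
  end
end

case ConnectCheck.check_port("localhost", 15002) do
  :ok ->
    IO.puts("A server is listening on localhost:15002")

  {:error, reason} ->
    IO.puts("Nothing reachable on localhost:15002 (#{inspect(reason)})")
end
```

If the check reports an error, start the Spark Connect server as described above before running the next cell.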
## Connect to Spark
```elixir
{:ok, session} = SparkEx.connect(url: "sc://localhost:15002")
{:ok, version} = SparkEx.spark_version(session)
IO.puts("Connected to Spark #{version}")
```
## Kino.Render — automatic DataFrame rendering
Returning a `SparkEx.DataFrame` as the last expression of a cell renders it with interactive tabs:
**Schema**, **Preview** (sortable/pageable `Kino.DataTable`), **Explain**, and **Raw**.
The Preview tab calls `to_explorer/2` with a bounded LIMIT (100 rows by default),
so it is safe to run on large datasets without risking an out-of-memory (OOM) error.
### SQL DataFrame
```elixir
SparkEx.sql(session, """
SELECT * FROM VALUES
(1, 'Alice', 'Engineering', 95000),
(2, 'Bob', 'Engineering', 105000),
(3, 'Carol', 'Marketing', 88000),
(4, 'Dave', 'Marketing', 92000),
(5, 'Eve', 'Sales', 78000),
(6, 'Frank', 'Sales', 82000),
(7, 'Grace', 'Engineering', 115000),
(8, 'Hank', 'Marketing', 97000)
AS employees(id, name, department, salary)
""")
```
### Range DataFrame (large — preview is bounded)
This creates a DataFrame with 1,000,000 rows, but preview only fetches 100.
```elixir
SparkEx.range(session, 1_000_000)
```
## Transforms with Kino.Render
Transforms are lazy — they build up a plan without executing. The plan only runs
when the cell evaluates the DataFrame and `Kino.Render` kicks in.
Click the **Explain** tab to see the full query plan.
```elixir
import SparkEx.Functions
alias SparkEx.{DataFrame, Column}
SparkEx.sql(session, """
SELECT * FROM VALUES
(1, 'Alice', 'Engineering', 95000),
(2, 'Bob', 'Engineering', 105000),
(3, 'Carol', 'Marketing', 88000),
(4, 'Dave', 'Marketing', 92000),
(5, 'Eve', 'Sales', 78000),
(6, 'Frank', 'Sales', 82000),
(7, 'Grace', 'Engineering', 115000),
(8, 'Hank', 'Marketing', 97000)
AS employees(id, name, department, salary)
""")
|> DataFrame.filter(col("salary") |> Column.gt(lit(85000)))
|> DataFrame.with_column("bonus", col("salary") |> Column.multiply(lit(0.1)))
|> DataFrame.order_by([col("salary") |> Column.desc()])
```
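Elixir's own `Stream` module offers a rough local analogy for this laziness. The sketch below mirrors the filter, bonus, and ordering steps on an in-memory list; it is not how SparkEx builds plans, and `local_rows` is a hand-copied subset of the columns used above. Nothing is computed until an `Enum` function consumes the stream, much as the Spark plan runs only when the DataFrame is rendered.

```elixir
# Hand-copied name/salary pairs from the employees query above.
local_rows = [
  %{name: "Alice", salary: 95_000},
  %{name: "Bob", salary: 105_000},
  %{name: "Carol", salary: 88_000},
  %{name: "Dave", salary: 92_000},
  %{name: "Eve", salary: 78_000},
  %{name: "Frank", salary: 82_000},
  %{name: "Grace", salary: 115_000},
  %{name: "Hank", salary: 97_000}
]

# Building the pipeline does no work yet: `plan` is only a lazy description.
plan =
  local_rows
  |> Stream.filter(&(&1.salary > 85_000))
  |> Stream.map(&Map.put(&1, :bonus, &1.salary * 0.1))

# Enum.sort_by/3 consumes the stream, which is the point where work happens.
top = Enum.sort_by(plan, & &1.salary, :desc)

Enum.map(top, & &1.name)
# => ["Grace", "Bob", "Hank", "Alice", "Dave", "Carol"]
```

The six names are also the rows the Spark pipeline above should show in its Preview tab, in the same order.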
## SparkEx.Livebook helpers
The helper functions return Kino terms directly. They do **not** call `Kino.render/1`
internally — you control when and how they are rendered.
### Preview with options
`preview/2` returns a `Kino.DataTable` with sorting enabled. Pass `:num_rows`
to control how many rows are fetched (bounded query, safe for large tables).
```elixir
employees =
  SparkEx.sql(session, """
  SELECT * FROM VALUES
    (1, 'Alice', 'Engineering', 95000),
    (2, 'Bob', 'Engineering', 105000),
    (3, 'Carol', 'Marketing', 88000),
    (4, 'Dave', 'Marketing', 92000),
    (5, 'Eve', 'Sales', 78000),
    (6, 'Frank', 'Sales', 82000),
    (7, 'Grace', 'Engineering', 115000),
    (8, 'Hank', 'Marketing', 97000)
  AS employees(id, name, department, salary)
  """)

SparkEx.Livebook.preview(employees, num_rows: 50, name: "Employees")
```
### Schema
```elixir
SparkEx.Livebook.schema(employees)
```
### Explain
```elixir
SparkEx.Livebook.explain(employees, mode: :extended)
```
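Because the helpers return plain Kino terms, they can be composed into a custom layout. The sketch below combines the three helpers shown above in one tabbed view. It assumes the `employees` DataFrame from the earlier cell and a live session, and uses only functions that already appear in this notebook (`Kino.Layout.tabs/1` is used again in the set operations section).

```elixir
# Each helper returns a Kino term; nothing is rendered until the
# layout is returned as the result of the cell.
Kino.Layout.tabs([
  {"Preview", SparkEx.Livebook.preview(employees, num_rows: 20, name: "Employees")},
  {"Schema", SparkEx.Livebook.schema(employees)},
  {"Explain", SparkEx.Livebook.explain(employees, mode: :extended)}
])
```

The tab labels and the `num_rows: 20` value are arbitrary choices for illustration.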
## Joins and aggregates
```elixir
departments =
  SparkEx.sql(session, """
  SELECT * FROM VALUES
    ('Engineering', 'Building 1'),
    ('Marketing', 'Building 2'),
    ('Sales', 'Building 3')
  AS departments(department, building)
  """)

result =
  DataFrame.join(employees, departments, ["department"], :inner)
  |> DataFrame.group_by(["department", "building"])
  |> SparkEx.GroupedData.agg([
    Column.alias_(count(col("id")), "headcount"),
    Column.alias_(sum(col("salary")), "total_salary"),
    Column.alias_(avg(col("salary")), "avg_salary")
  ])
  |> DataFrame.order_by([col("department")])

result
```
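To sanity-check the Spark result, the same join and aggregation can be reproduced locally with `Enum` on the eight literal rows. This is a cross-check sketch in plain Elixir, not SparkEx code; `rows` and `buildings` are hand-copied from the two SQL literals above.

```elixir
# {id, name, department, salary}, copied from the employees query.
rows = [
  {1, "Alice", "Engineering", 95_000},
  {2, "Bob", "Engineering", 105_000},
  {3, "Carol", "Marketing", 88_000},
  {4, "Dave", "Marketing", 92_000},
  {5, "Eve", "Sales", 78_000},
  {6, "Frank", "Sales", 82_000},
  {7, "Grace", "Engineering", 115_000},
  {8, "Hank", "Marketing", 97_000}
]

# department => building, copied from the departments query.
buildings = %{
  "Engineering" => "Building 1",
  "Marketing" => "Building 2",
  "Sales" => "Building 3"
}

summary =
  rows
  |> Enum.group_by(fn {_id, _name, dept, _salary} -> dept end)
  |> Enum.map(fn {dept, members} ->
    salaries = Enum.map(members, fn {_id, _name, _dept, salary} -> salary end)
    headcount = Enum.count(salaries)
    total = Enum.sum(salaries)

    %{
      department: dept,
      # Inner-join semantics: fetch!/2 fails loudly on an unknown department.
      building: Map.fetch!(buildings, dept),
      headcount: headcount,
      total_salary: total,
      avg_salary: total / headcount
    }
  end)
  |> Enum.sort_by(& &1.department)
```

The Spark output should agree on headcount and total salary per department: 3 and 315,000 for Engineering, 3 and 277,000 for Marketing, and 2 and 160,000 for Sales.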
## to_explorer/2 — bounded materialization
`to_explorer/2` returns an `Explorer.DataFrame`. By default, it injects a LIMIT
into the Spark plan to prevent unbounded collection and OOM.
```elixir
{:ok, explorer_df} = DataFrame.to_explorer(result)
explorer_df
```
### Demonstrating OOM protection
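`to_explorer/2` does not collect a dataset larger than `:max_rows` unbounded. The cell below handles both outcomes it can produce: a result bounded by the injected LIMIT, or an `{:error, %SparkEx.Error.LimitExceeded{}}` tuple (returned, not raised):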
```elixir
large = SparkEx.range(session, 100_000)

case DataFrame.to_explorer(large, max_rows: 1_000) do
  {:ok, df} ->
    IO.puts("Got #{Explorer.DataFrame.n_rows(df)} rows (bounded by LIMIT injection)")
    df

  {:error, %SparkEx.Error.LimitExceeded{} = e} ->
    IO.puts("Protected from OOM: #{Exception.message(e)}")
end
```
### Unsafe mode (opt-in)
Pass `unsafe: true` to skip LIMIT injection. Use only when you know the result fits in memory.
```elixir
small = SparkEx.range(session, 25)
{:ok, explorer_df} = DataFrame.to_explorer(small, unsafe: true)
IO.puts("Collected #{Explorer.DataFrame.n_rows(explorer_df)} rows (no LIMIT)")
explorer_df
```
## Reading files from disk
```elixir
spark_home = Path.expand("../test/spark-4.1.1-bin-hadoop3-connect", __DIR__)
resources = Path.join(spark_home, "examples/src/main/resources")
SparkEx.Reader.json(session, Path.join(resources, "employees.json"))
|> DataFrame.order_by([col("salary") |> Column.desc()])
```
```elixir
SparkEx.Reader.csv(
  session,
  Path.join(resources, "people.csv"),
  options: %{"delimiter" => ";", "header" => "true"},
  schema: "name STRING, age INT, job STRING"
)
```
## Set operations
```elixir
top_earners =
  SparkEx.sql(session, """
  SELECT * FROM VALUES ('Alice'), ('Bob'), ('Grace') AS t(name)
  """)

engineers =
  SparkEx.sql(session, """
  SELECT * FROM VALUES ('Alice'), ('Bob'), ('Grace') AS t(name)
  """)

marketers =
  SparkEx.sql(session, """
  SELECT * FROM VALUES ('Carol'), ('Dave'), ('Hank') AS t(name)
  """)

Kino.Layout.tabs([
  {"Union", SparkEx.Livebook.preview(DataFrame.union(engineers, marketers), name: "Union")},
  {"Intersect",
   SparkEx.Livebook.preview(DataFrame.intersect(top_earners, engineers), name: "Intersect")},
  {"Except",
   SparkEx.Livebook.preview(DataFrame.except(top_earners, marketers), name: "Except")}
])
```
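The expected contents of the three tabs can be worked out locally with `MapSet`. This sketch uses plain Elixir sets as a stand-in and assumes SparkEx follows Spark's set semantics. One difference to keep in mind: in Spark, a DataFrame `union` keeps duplicate rows, whereas `MapSet.union/2` removes them. The inputs here are disjoint, so both give the same six names.

```elixir
# Names copied from the three SQL literals above.
top_earner_names = MapSet.new(["Alice", "Bob", "Grace"])
engineer_names = MapSet.new(["Alice", "Bob", "Grace"])
marketer_names = MapSet.new(["Carol", "Dave", "Hank"])

# Sorted lists make the results easy to compare with the tabs by eye.
expected = %{
  union: MapSet.union(engineer_names, marketer_names) |> Enum.sort(),
  intersect: MapSet.intersection(top_earner_names, engineer_names) |> Enum.sort(),
  except: MapSet.difference(top_earner_names, marketer_names) |> Enum.sort()
}

expected.union
# => ["Alice", "Bob", "Carol", "Dave", "Grace", "Hank"]
```

Since `top_earners` and `engineers` hold the same three names and share none with `marketers`, both the Intersect and Except tabs should list Alice, Bob, and Grace.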
## Cleanup
```elixir
SparkEx.Session.stop(session)
:ok
```