<p align="center">
<img src="assets/flowstone.svg" alt="FlowStone Logo" width="220" height="248">
</p>
# FlowStone
[CI](https://github.com/nshkrdotcom/flowstone/actions/workflows/ci.yml) · [Hex.pm](https://hex.pm/packages/flowstone) · [Docs](https://hexdocs.pm/flowstone) · [License](https://github.com/nshkrdotcom/flowstone/blob/main/LICENSE)
**Asset-first data orchestration for the BEAM.**
FlowStone is an orchestration library for Elixir that treats *assets* (named data artifacts) as the primary abstraction. It's designed for building reliable, auditable data pipelines where persistence, lineage tracking, and operational visibility matter.
## When to Use FlowStone
FlowStone is the right choice when you need:
| Use Case | Why FlowStone |
|----------|---------------|
| **ETL/ELT pipelines** | Partition-aware materialization, dependency tracking |
| **Data warehouse orchestration** | Scheduled jobs, backfill support, lineage queries |
| **Audit-sensitive workflows** | Comprehensive audit logging, approval gates |
| **Durable job execution** | Oban-backed persistence, jobs survive restarts |
| **Multi-format data storage** | Pluggable I/O managers (PostgreSQL, S3, Parquet) |
### Ideal Workloads
- Daily/hourly batch processing with date-partitioned data
- Data transformation pipelines with explicit dependencies
- Workflows requiring human approval checkpoints
- Systems where "what produced this data?" matters (lineage)
- Long-running jobs that must not be lost on node failure
## When to Use Something Else
FlowStone is **not** the best fit for:
| Use Case | Better Alternative |
|----------|-------------------|
| **Real-time distributed compute** | [Handoff](https://github.com/polvalente/handoff) - native BEAM distribution, resource-aware scheduling |
| **High-throughput stream processing** | [Broadway](https://github.com/dashbitco/broadway), [GenStage](https://github.com/elixir-lang/gen_stage) |
| **Simple background jobs** | [Oban](https://github.com/sorentwo/oban) directly |
| **Ad-hoc parallel computation** | [Task.async_stream](https://hexdocs.pm/elixir/Task.html), [Flow](https://github.com/dashbitco/flow) |
| **ML model training/inference** | [Nx](https://github.com/elixir-nx/nx) + distributed compute layer |
### FlowStone vs Handoff
| Aspect | FlowStone | Handoff |
|--------|-----------|---------|
| **Primary abstraction** | Assets (data artifacts) | Functions (computation) |
| **Execution model** | Database-mediated (Oban) | Native BEAM RPC |
| **Durability** | Jobs persist to PostgreSQL | Ephemeral (ETS) |
| **Best for** | Data pipelines, ETL, audit trails | Distributed compute, GPU scheduling |
| **Dependencies** | Ecto, Oban, PostgreSQL | Zero production deps |
**Rule of thumb:** If your data needs to survive a restart and you care about lineage, use FlowStone. If you need fast ephemeral computation across nodes with resource awareness, use Handoff.
See `docs/20251215/flowstone-vs-handoff-comparison.md` for detailed analysis.
## Status
FlowStone is in **alpha**. Core execution, persistence primitives, and safety hardening are implemented, but some higher-level platform features (e.g., a bundled UI, fully resumable approvals, richer DSL surface) are still evolving.
## Installation
Add to your `mix.exs`:
```elixir
def deps do
  [
    {:flowstone, "~> 0.3.0"}
  ]
end
```
## Quick Start
Define a pipeline:
```elixir
defmodule MyApp.Pipeline do
  use FlowStone.Pipeline

  # load_from_source/0, clean/1 and generate_report/1 stand in for your own code
  asset :raw do
    execute fn _context, _deps -> {:ok, load_from_source()} end
  end

  asset :cleaned do
    depends_on [:raw]
    execute fn _context, %{raw: raw} -> {:ok, clean(raw)} end
  end

  asset :report do
    depends_on [:cleaned]
    execute fn _context, %{cleaned: data} -> {:ok, generate_report(data)} end
  end
end
```
Register and materialize:
```elixir
# Start required services
{:ok, _} = FlowStone.Registry.start_link(name: MyRegistry)
{:ok, _} = FlowStone.IO.Memory.start_link(name: MyMemory)

# Register pipeline assets
FlowStone.register(MyApp.Pipeline, registry: MyRegistry)

# Configure I/O (in-memory for dev/test)
io = [config: %{agent: MyMemory}]

# Materialize a single asset for a partition
FlowStone.materialize(:report,
  partition: ~D[2025-01-15],
  registry: MyRegistry,
  io: io
)

# Or materialize the asset together with all of its upstream dependencies
FlowStone.materialize_all(:report,
  partition: ~D[2025-01-15],
  registry: MyRegistry,
  io: io
)
```
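`materialize_all/2` builds `:report` together with everything upstream of it, so dependencies must run before the assets that consume them. That ordering can be sketched as a depth-first topological sort over the `depends_on` declarations. `DepOrder` and its plain-map graph are hypothetical illustrations, not FlowStone's internals, and the sketch does no cycle detection. For example, `DepOrder.plan(%{raw: [], cleaned: [:raw], report: [:cleaned]}, :report)` returns `[:raw, :cleaned, :report]`.

```elixir
defmodule DepOrder do
  @moduledoc """
  Hypothetical sketch: order an asset and its transitive dependencies so that
  every dependency appears before the assets that consume it.
  """

  # graph maps an asset name to the list of assets it depends on
  @spec plan(map(), atom()) :: [atom()]
  def plan(graph, target) do
    {order, _visited} = visit(graph, target, {[], MapSet.new()})
    Enum.reverse(order)
  end

  defp visit(graph, asset, {order, visited}) do
    if MapSet.member?(visited, asset) do
      {order, visited}
    else
      visited = MapSet.put(visited, asset)

      # Visit every dependency first (depth-first)
      {order, visited} =
        graph
        |> Map.get(asset, [])
        |> Enum.reduce({order, visited}, fn dep, acc -> visit(graph, dep, acc) end)

      # Prepend once all dependencies are placed; plan/2 reverses the list
      {[asset | order], visited}
    end
  end
end
```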
Backfill across partitions:
```elixir
FlowStone.backfill(:report,
  partitions: Date.range(~D[2025-01-01], ~D[2025-01-31]),
  registry: MyRegistry,
  io: io,
  max_parallel: 4
)
```
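`max_parallel: 4` bounds how many partitions are processed at once. The same bounding idea can be expressed with the standard library's `Task.async_stream/3` and its `:max_concurrency` option. `BackfillSketch` and the `work` function are hypothetical; FlowStone's own backfill may schedule partitions differently (for example, as Oban jobs).

```elixir
defmodule BackfillSketch do
  @moduledoc "Hypothetical sketch of bounded-parallel backfill over partitions."

  # Runs `work` once per partition with at most `max_parallel` tasks in flight.
  # Returns {partition, result} tuples in the same order as the input.
  def run(partitions, work, max_parallel) do
    partitions
    |> Task.async_stream(fn p -> {p, work.(p)} end,
      max_concurrency: max_parallel,
      ordered: true,
      timeout: :infinity
    )
    |> Enum.map(fn {:ok, pair} -> pair end)
  end
end
```

Calling `BackfillSketch.run(Date.range(~D[2025-01-01], ~D[2025-01-31]), &do_work/1, 4)` (with `do_work/1` being your own function) processes the 31 January partitions four at a time.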
## Core Concepts
### Assets
Assets are named data artifacts. Each asset declares:
- **Dependencies** - other assets it consumes
- **Execute function** - computation that produces the asset's value
- **Partition** - temporal or dimensional key (dates, tuples, custom)
```elixir
asset :daily_sales do
  depends_on [:raw_transactions, :product_catalog]

  execute fn _context, deps ->
    sales = compute_sales(deps.raw_transactions, deps.product_catalog)
    {:ok, sales}
  end
end
```
### Partitions
FlowStone supports partition-aware execution for time-series and dimensional data:
```elixir
# Date partitions
FlowStone.materialize(:report, partition: ~D[2025-01-15])
# DateTime partitions
FlowStone.materialize(:hourly_stats, partition: ~U[2025-01-15 14:00:00Z])
# Tuple partitions (multi-dimensional)
FlowStone.materialize(:regional_report, partition: {~D[2025-01-15], "us-east"})
```
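The lineage results shown later in this README report partitions as strings such as `"2025-01-15"`, which suggests partitions are stored in a serialized form. One plausible encoding for the three partition shapes above is sketched below; `PartitionKey.encode/1` and the `|` separator for tuples are assumptions, not FlowStone's documented format.

```elixir
defmodule PartitionKey do
  @moduledoc "Hypothetical partition-to-string encoding (not FlowStone's format)."

  # Dates and datetimes use ISO 8601
  def encode(%Date{} = d), do: Date.to_iso8601(d)
  def encode(%DateTime{} = dt), do: DateTime.to_iso8601(dt)

  # Tuples are encoded element by element and joined with "|"
  def encode(tuple) when is_tuple(tuple) do
    tuple
    |> Tuple.to_list()
    |> Enum.map_join("|", &encode/1)
  end

  # Plain strings (e.g. region names) pass through unchanged
  def encode(value) when is_binary(value), do: value
end
```

With this scheme, `{~D[2025-01-15], "us-east"}` becomes `"2025-01-15|us-east"`.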
### I/O Managers
Pluggable storage backends for asset data:
| Manager | Use Case |
|---------|----------|
| `FlowStone.IO.Memory` | Development, testing |
| `FlowStone.IO.Postgres` | Structured data, small-medium payloads |
| `FlowStone.IO.S3` | Large files, data lake integration |
| `FlowStone.IO.Parquet` | Columnar analytics data |
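
Pluggable backends typically share a small contract for storing and loading a value by asset and partition. The behaviour below is a hypothetical sketch of such a contract, with an `Agent`-backed implementation similar in spirit to `FlowStone.IO.Memory` (it reuses the `%{agent: ...}` config shape from the Quick Start). The `MyIO` module, its callback names, and their signatures are assumptions, not FlowStone's actual I/O manager API.

```elixir
defmodule MyIO do
  @moduledoc "Hypothetical I/O manager contract (not FlowStone's real behaviour)."

  @callback store(asset :: atom(), partition :: term(), value :: term(), config :: map()) ::
              :ok | {:error, term()}
  @callback load(asset :: atom(), partition :: term(), config :: map()) ::
              {:ok, term()} | {:error, :not_found}
end

defmodule MyIO.Agent do
  @moduledoc "In-memory implementation keyed by {asset, partition}."
  @behaviour MyIO

  def start_link(opts \\ []), do: Agent.start_link(fn -> %{} end, opts)

  @impl true
  def store(asset, partition, value, %{agent: agent}) do
    Agent.update(agent, &Map.put(&1, {asset, partition}, value))
  end

  @impl true
  def load(asset, partition, %{agent: agent}) do
    case Agent.get(agent, &Map.fetch(&1, {asset, partition})) do
      {:ok, value} -> {:ok, value}
      :error -> {:error, :not_found}
    end
  end
end
```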
### Execution Model
- **Oban running:** `materialize/2` enqueues a durable job and returns `{:ok, %Oban.Job{}}`
- **Oban not running:** `materialize/2` executes synchronously and returns `:ok` or `{:error, reason}`
This allows the same code to work in dev (synchronous) and production (queued).
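
Because `materialize/2` returns different shapes depending on whether Oban is running, callers that must work in both modes can match on all of them. `MaterializeResult.describe/1` is a hypothetical helper, and its status atoms are invented for illustration; it matches `{:ok, job}` generically rather than on `%Oban.Job{}` so the sketch compiles without Oban.

```elixir
defmodule MaterializeResult do
  @moduledoc "Hypothetical helper normalizing materialize/2 return shapes."

  # Oban running: a job was enqueued and will run later
  def describe({:ok, job}), do: {:enqueued, job}

  # Oban not running: the asset was materialized synchronously
  def describe(:ok), do: :done

  # Synchronous failure
  def describe({:error, reason}), do: {:failed, reason}
end
```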
## Features
### Lineage Tracking
Query which inputs an asset consumed and which assets consume it:
```elixir
# What did :report consume?
FlowStone.Lineage.upstream(:report, ~D[2025-01-15])
# => [%{asset: :cleaned, partition: "2025-01-15"}]
# What depends on :raw?
FlowStone.Lineage.downstream(:raw, ~D[2025-01-15])
# => [%{asset: :cleaned, partition: "2025-01-15"}]
```
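In the example above, each call returns the directly connected asset. To estimate everything affected by a change to `:raw`, the direct edges can be followed transitively. The sketch below does this over a plain dependency map; `Impact.downstream_closure/2` is hypothetical and works on declared `depends_on` edges rather than on FlowStone's recorded lineage.

```elixir
defmodule Impact do
  @moduledoc "Hypothetical transitive downstream lookup over depends_on edges."

  # graph: asset => list of assets it depends on
  def downstream_closure(graph, asset) do
    # Invert the edges: dependency => assets that consume it
    consumers =
      Enum.reduce(graph, %{}, fn {consumer, deps}, acc ->
        Enum.reduce(deps, acc, fn dep, acc2 ->
          Map.update(acc2, dep, [consumer], &[consumer | &1])
        end)
      end)

    walk(consumers, [asset], MapSet.new())
  end

  defp walk(_consumers, [], seen), do: seen

  # Visit unseen consumers of the current asset, then continue with the queue
  defp walk(consumers, [current | rest], seen) do
    next = consumers |> Map.get(current, []) |> Enum.reject(&MapSet.member?(seen, &1))
    walk(consumers, next ++ rest, Enum.into(next, seen))
  end
end
```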
### Approval Gates
Pause execution for human review:
```elixir
asset :high_value_trade do
  execute fn _context, deps ->
    trade = compute_trade(deps)

    if trade.value > 1_000_000 do
      {:wait_for_approval, message: "Large trade requires sign-off", context: trade}
    else
      {:ok, trade}
    end
  end
end
```
### Scheduling
Cron-based scheduling for recurring materializations:
```elixir
FlowStone.schedule(:daily_report,
  # 6 AM daily
  cron: "0 6 * * *",
  partition_fn: fn -> Date.utc_today() end
)
```
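The cron expression `"0 6 * * *"` fires at minute 0 of hour 6 every day. Computing the next firing time from a given moment takes only the standard library; the `NextRun` module below is a hypothetical illustration that assumes `now` is in UTC, whereas FlowStone's scheduler may apply its own timezone handling.

```elixir
defmodule NextRun do
  @moduledoc "Hypothetical: next firing time of a daily `M H * * *` cron, in UTC."

  def daily(%DateTime{} = now, hour, minute) do
    date = DateTime.to_date(now)
    time = Time.new!(hour, minute, 0)
    {:ok, today} = DateTime.new(date, time, "Etc/UTC")

    # If today's slot is strictly in the future, it is the next run;
    # otherwise (already past, or exactly now) use tomorrow's slot
    if DateTime.compare(today, now) == :gt do
      today
    else
      {:ok, tomorrow} = DateTime.new(Date.add(date, 1), time, "Etc/UTC")
      tomorrow
    end
  end
end
```

For instance, `NextRun.daily(~U[2025-01-15 05:59:00Z], 6, 0)` yields 06:00 UTC on 2025-01-15, while a call at exactly 06:00 rolls over to the next day.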
### Scatter (Dynamic Fan-Out)
Fan an asset out over keys discovered at runtime, running one instance per key within FlowStone's asset-centric model:
```elixir
asset :scraped_article do
  depends_on [:source_urls]

  # Scatter function: transforms dependency data into scatter keys
  scatter fn %{source_urls: urls} ->
    Enum.map(urls, &%{url: &1})
  end

  scatter_options do
    max_concurrent 50
    rate_limit {10, :second}
    failure_threshold 0.02
  end

  execute fn ctx, _deps ->
    Scraper.fetch(ctx.scatter_key.url)
  end
end
```
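A scatter runs one instance per key and tolerates a bounded fraction of failures. Assuming `failure_threshold 0.02` means the scatter fails once more than 2% of its instances fail (the exact semantics are not spelled out here), the check can be sketched with `Task.async_stream/3`, whose `:max_concurrency` plays the role of `max_concurrent`. `ScatterSketch` is hypothetical, expects each instance to return `{:ok, value}` or `{:error, reason}`, and omits rate limiting and durability.

```elixir
defmodule ScatterSketch do
  @moduledoc "Hypothetical fan-out with a failure-ratio check."

  def run(keys, fun, max_concurrent, failure_threshold) do
    # One task per scatter key, at most max_concurrent at a time
    results =
      keys
      |> Task.async_stream(fun, max_concurrency: max_concurrent, timeout: :infinity)
      |> Enum.map(fn {:ok, result} -> result end)

    total = length(results)
    failed = Enum.count(results, &match?({:error, _}, &1))
    ratio = if total == 0, do: 0.0, else: failed / total

    if ratio > failure_threshold do
      {:error, {:too_many_failures, failed, total}}
    else
      successes = for {:ok, value} <- results, do: value
      {:ok, successes}
    end
  end
end
```

With 100 keys, two failures (exactly 2%) still succeed, while three failures exceed the threshold.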
### Signal Gate (Durable External Suspension)
Suspend an asset until an external signal (webhook, callback) arrives, without consuming resources while it waits:
```elixir
asset :embedded_documents do
  execute fn _ctx, deps ->
    task_id = ECS.start_task(deps.data)
    {:signal_gate, token: task_id, timeout: :timer.hours(1)}
  end

  on_signal fn _ctx, payload ->
    {:ok, payload.result}
  end

  on_timeout fn _ctx ->
    {:error, :timeout}
  end
end
```
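The `execute` functions in this README return several shapes: `{:ok, value}`, `{:wait_for_approval, opts}`, `{:signal_gate, opts}`, and `{:error, reason}`. A runner has to dispatch on them; the hypothetical `ExecuteOutcome.classify/1` below shows one way to do that with pattern matching. It is not FlowStone's executor, and the status atoms are invented for illustration.

```elixir
defmodule ExecuteOutcome do
  @moduledoc "Hypothetical dispatcher over execute-function return values."

  def classify({:ok, value}), do: {:materialized, value}

  # Approval gate: surface the message for a human reviewer
  def classify({:wait_for_approval, opts}) when is_list(opts) do
    {:pending_approval, Keyword.fetch!(opts, :message)}
  end

  # Signal gate: token identifies the external task; timeout is in milliseconds
  def classify({:signal_gate, opts}) when is_list(opts) do
    {:suspended, Keyword.fetch!(opts, :token), Keyword.get(opts, :timeout, :infinity)}
  end

  def classify({:error, reason}), do: {:failed, reason}
end
```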
### Rate Limiting
Distributed rate limiting for APIs and external services:
```elixir
# Check rate limit
case FlowStone.RateLimiter.check("api:openai", {60, :minute}) do
  :ok -> call_api()
  {:wait, ms} -> {:snooze, div(ms, 1000) + 1}
end

# Semaphore-based concurrency control
FlowStone.RateLimiter.with_slot("expensive:operation", 10, fn ->
  expensive_operation()
end)
```
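`check/2` with `{60, :minute}` allows 60 calls per minute and otherwise tells the caller how many milliseconds to wait. A pure, single-node fixed-window version of that contract is sketched below. `WindowLimiter` is hypothetical: it keeps its state in a map passed in and out (with the clock injected for testability) instead of shared distributed state, and FlowStone's actual algorithm may differ.

```elixir
defmodule WindowLimiter do
  @moduledoc "Hypothetical fixed-window limiter returning :ok or {:wait, ms}."

  @unit_ms %{second: 1_000, minute: 60_000, hour: 3_600_000}

  # state: %{key => {window_start_ms, count}}; returns {result, new_state}
  def check(state, key, {limit, unit}, now_ms) do
    window_ms = Map.fetch!(@unit_ms, unit)
    window_start = now_ms - rem(now_ms, window_ms)

    case Map.get(state, key) do
      # Current window is full: wait until it ends
      {^window_start, count} when count >= limit ->
        {{:wait, window_start + window_ms - now_ms}, state}

      # Current window has room: count this call
      {^window_start, count} ->
        {:ok, Map.put(state, key, {window_start, count + 1})}

      # No entry, or an entry from an older window: start a new window
      _ ->
        {:ok, Map.put(state, key, {window_start, 1})}
    end
  end
end
```

A `{:wait, 500}` result turns into `{:snooze, 1}` under the `div(ms, 1000) + 1` rounding used above.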
### Telemetry
FlowStone emits telemetry events for observability:
- `[:flowstone, :materialization, :start | :stop | :exception]`
- `[:flowstone, :scatter, :start | :complete | :failed | :instance_complete | :instance_fail]`
- `[:flowstone, :signal_gate, :create | :signal | :timeout]`
- `[:flowstone, :rate_limit, :check | :wait | :slot_acquired | :slot_released]`
## Documentation
- **Design overview:** `docs/design/OVERVIEW.md`
- **Architecture decisions:** `docs/adr/README.md`
- **Comparison with Handoff:** `docs/20251215/flowstone-vs-handoff-comparison.md`
- **Rebuild analysis:** `docs/20251215/rebuild-analysis.md`
## Contributing
Contributions are welcome. Please read the ADRs first to understand current decisions and constraints.
## License
MIT