<p align="center">
<img src="assets/slither.svg" alt="Slither" width="200"/>
</p>
<p align="center">
<strong>Bounded, observable BEAM<->Python pipelines with shared state on ETS</strong>
</p>
<p align="center">
<a href="https://hex.pm/packages/slither"><img src="https://img.shields.io/hexpm/v/slither.svg?style=flat-square&color=8b1e3f" alt="Hex.pm"/></a>
<a href="https://hexdocs.pm/slither"><img src="https://img.shields.io/badge/docs-hexdocs-4cb8a4.svg?style=flat-square" alt="HexDocs"/></a>
<a href="https://opensource.org/licenses/MIT"><img src="https://img.shields.io/badge/license-MIT-c9a227.svg?style=flat-square" alt="License"/></a>
</p>
Slither is a low-level concurrency substrate for BEAM + Python systems, built on top of SnakeBridge and Snakepit.
It gives you three primitives:
- `Store`: ETS-backed shared state (lock-free reads, serialized writes), exposed to Python via views.
- `Dispatch`: batched fan-out to Python workers with bounded in-flight work (`max_in_flight`).
- `Pipe`: a DSL to compose BEAM stages, Python stages, and routing in one supervised flow.
## Why use Slither
Use Slither when you want Python to keep doing compute, while BEAM owns concurrency, state, and coordination.
Common wins:
- no unbounded queue growth (backpressure)
- no shared-thread state races in Python workers
- run-scoped session affinity for Python state
- consistent telemetry across runs, stages, and batches
## Installation
```elixir
defp deps do
[
{:slither, "~> 0.1.0"}
]
end
```
Then:
```bash
mix deps.get
mix compile
```
## Quick Start
### 1) Configure a Python worker pool
```elixir
# config/runtime.exs
import Config
SnakeBridge.ConfigHelper.configure_snakepit!(pool_size: 2)
```
### 2) Define a pipeline
```elixir
defmodule MyApp.ScorePipe do
use Slither.Pipe
pipe :score do
stage :prepare, :beam,
handler: fn item, _ctx ->
%{"text" => item.payload}
end
stage :predict, :python,
executor: Slither.Dispatch.Executors.SnakeBridge,
module: "my_model",
function: "predict_batch",
pool: :default,
batch_size: 32,
max_in_flight: 4
stage :route, :router,
routes: [
{fn item -> item.payload["score"] >= 0.8 end, :accept},
{fn item -> item.payload["score"] >= 0.4 end, :review}
]
output :accept
output :review
output :default
on_error :predict, :skip
on_error :*, :halt
end
end
```
### 3) Run it
```elixir
{:ok, outputs} = Slither.run_pipe(MyApp.ScorePipe, ["first", "second", "third"])
accepted_payloads = Enum.map(outputs.accept, & &1.payload)
review_payloads = Enum.map(outputs.review, & &1.payload)
```
## Run the built-in demos
```bash
mix slither.example
mix slither.example text_analysis
mix slither.example batch_stats
mix slither.example data_etl
mix slither.example ml_scoring
mix slither.example image_pipeline
mix slither.example --all
mix slither.example --no-baseline
```
`ml_scoring` and `image_pipeline` auto-install Python deps via Snakepit/uv.
## Core APIs
### Pipe (orchestration)
- `Slither.Pipe.Runner.run/3`: run a pipe and return output buckets.
- `Slither.Pipe.Runner.stream/3`: stream outputs lazily.
- `Slither.run_pipe/3`: convenience wrapper.
### Dispatch (batched Python calls)
```elixir
items = Slither.Item.wrap_many([1, 2, 3, 4])
{:ok, results} =
Slither.dispatch(items,
executor: Slither.Dispatch.Executors.SnakeBridge,
module: "my_model",
function: "predict_batch",
pool: :default,
batch_size: 64,
max_in_flight: 8,
ordering: :preserve,
on_error: :halt
)
```
For large workloads, use `Slither.Dispatch.stream/2`.
### Store (shared state on ETS)
```elixir
defmodule MyApp.FeatureStore do
@behaviour Slither.Store
@impl true
def tables do
[
%{name: :features, type: :set, read_concurrency: true}
]
end
@impl true
def views do
[
%{
name: :lookup_feature,
mode: :scalar,
scope: :session,
handler: fn %{"key" => key}, _ctx ->
case Slither.Store.Server.get(__MODULE__, :features, key) do
nil -> %{"error" => "not_found"}
value -> %{"value" => value}
end
end,
timeout_ms: 5_000
}
]
end
@impl true
def load(tables) do
:ets.insert(tables[:features], {"user:42", %{tier: "gold"}})
:ok
end
end
```
Start store processes by listing modules in config:
```elixir
config :slither,
stores: [MyApp.FeatureStore]
```
Read/write API:
```elixir
Slither.Store.Server.get(MyApp.FeatureStore, :features, "user:42")
Slither.Store.Server.put(MyApp.FeatureStore, :features, "user:99", %{tier: "silver"})
```
## Configuration knobs
```elixir
config :slither,
stores: [],
dispatch: [
default_batch_size: 64,
default_max_in_flight: 8,
default_ordering: :preserve,
default_on_error: :halt
],
bridge: [
default_scope: :session
]
```
## Telemetry events
Slither emits under `[:slither, ...]`.
- dispatch: `[:slither, :dispatch, :batch, :start|:stop|:exception]`
- pipe run: `[:slither, :pipe, :run, :start|:stop|:exception]`
- pipe stage: `[:slither, :pipe, :stage, :start|:stop|:exception]`
- store: `[:slither, :store, :write]`, `[:slither, :store, :reload, :start|:stop]`
- bridge view callbacks: `[:slither, :bridge, :view, :start|:stop]`
## Troubleshooting
- Python module not found:
run examples through `mix slither.example ...` so `PYTHONPATH` is set.
- SnakeBridge/Snakepit calls failing:
verify runtime pool config in `config/runtime.exs`.
- Optional package import errors:
run `mix slither.example ml_scoring` or `mix slither.example image_pipeline` to auto-install deps.
## Guides
- `guides/getting-started.md`
- `guides/store.md`
- `guides/dispatch.md`
- `guides/pipe.md`
- `guides/examples.md`
- `guides/operations.md`
## License
MIT