# LangEx
**LangGraph for Elixir.** A graph-based agent orchestration library for building stateful, multi-step LLM workflows with nodes, edges, conditional routing, state reducers, human-in-the-loop interrupts, and checkpointing (Redis / Postgres). Inspired by [LangGraph](https://www.langchain.com/langgraph), built on BEAM primitives.
## Features
- **StateGraph builder** - declarative graph construction with `add_node`, `add_edge`, `add_conditional_edges`, `add_sequence`
- **Pregel execution engine** - super-step processing with parallel node execution via `Task.Supervisor`
- **State reducers** - per-key merge functions (append lists, sum values, or custom logic)
- **Command routing** - combine state updates and control flow in a single return value
- **Checkpointing** - persist execution state to Redis (default) or PostgreSQL for pause/resume
- **Interrupts** - human-in-the-loop: pause graph execution, wait for external input, resume
- **Streaming** - lazy `Stream` of execution events (node start/end, step boundaries, done)
- **Runtime context** - inject dependencies into nodes without baking them into closures
- **Subgraphs** - use a compiled graph as a node inside a parent graph
- **Send fan-out** - dynamic map-reduce patterns with `%Send{}` from conditional edges
- **Managed values** - `remaining_steps` automatically injected and tracked per super-step
- **ChatModels registry** - auto-resolve model strings (`"gpt-4o"`, `"claude-sonnet-4-20250514"`) to provider modules
- **LLM adapters** - built-in OpenAI and Anthropic, extensible via `LangEx.LLM` behaviour
- **MessagesState** - pre-built schema with `messages` key and `add_messages` reducer
> **Want to try it hands-on?** The [Support Triage Router](examples/support_triage/) example wires up a full graph with conditional routing, Gemini LLM calls, human-in-the-loop interrupts, and Postgres checkpointing.
## Installation
Add `lang_ex` to your dependencies in `mix.exs`:
```elixir
def deps do
  [
    {:lang_ex, "~> 0.1.0"},
    # Optional: for Redis checkpointer (connection starts automatically when present)
    {:redix, "~> 1.5"},
    # Optional: for PostgreSQL checkpointer (requires Ecto migration, see below)
    {:postgrex, "~> 0.19"},
    {:ecto_sql, "~> 3.12"}
  ]
end
```
The core library depends only on `req` and `jason`; every checkpointer
dependency is optional. Add only the ones you need:
| Checkpointer | Required deps |
|---|---|
| `LangEx.Checkpointer.Redis` | `redix` |
| `LangEx.Checkpointer.Postgres` | `postgrex` + `ecto_sql` |
| None (in-memory only) | — |
When `redix` is present, a named Redix connection (`LangEx.Redix`) starts
automatically under `LangEx.Supervisor`. Without it, the connection is simply
skipped.
## Quick Start
```elixir
alias LangEx.Graph
alias LangEx.Message

graph =
  Graph.new(messages: {[], &Message.add_messages/2}, intent: nil)
  |> Graph.add_node(:classify, fn state ->
    content = List.last(state.messages).content
    intent = if String.contains?(content, "weather"), do: "weather", else: "greeting"
    %{intent: intent}
  end)
  |> Graph.add_node(:weather, fn _state -> %{messages: [Message.ai("It's sunny today!")]} end)
  |> Graph.add_node(:greet, fn _state -> %{messages: [Message.ai("Hello there!")]} end)
  |> Graph.add_edge(:__start__, :classify)
  |> Graph.add_conditional_edges(:classify, &Map.get(&1, :intent), %{
    "weather" => :weather,
    "greeting" => :greet
  })
  |> Graph.add_edge(:weather, :__end__)
  |> Graph.add_edge(:greet, :__end__)
  |> Graph.compile()

{:ok, result} = LangEx.invoke(graph, %{messages: [Message.human("What's the weather?")]})
# => %{intent: "weather", messages: [%Message.Human{...}, %Message.AI{content: "It's sunny today!"}]}
```
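The `{default, reducer}` tuple passed to `Graph.new/1` above pairs a state key with a merge function. The following is a minimal plain-Elixir sketch of that idea, not LangEx's actual implementation: `ReducerSketch` and `apply_update/3` are hypothetical names, and list concatenation stands in for `Message.add_messages/2`.

```elixir
defmodule ReducerSketch do
  # Hypothetical helper, not part of LangEx: merges a node's partial update
  # into the current state, using a per-key reducer when one is defined.
  def apply_update(state, update, reducers) do
    Enum.reduce(update, state, fn {key, new_value}, acc ->
      case Map.fetch(reducers, key) do
        # The key has a reducer: combine the old and new values.
        {:ok, reducer} -> Map.update(acc, key, new_value, &reducer.(&1, new_value))
        # No reducer: the last write wins.
        :error -> Map.put(acc, key, new_value)
      end
    end)
  end
end

# List concatenation stands in for an append-style reducer.
reducers = %{messages: &Kernel.++/2}
state = %{messages: ["hi"], intent: nil}

new_state =
  ReducerSketch.apply_update(state, %{messages: ["hello"], intent: "greeting"}, reducers)

IO.inspect(new_state)
```

Keys without a reducer (here `intent`) are simply overwritten, which matches how the Quick Start's `:classify` node sets `intent`.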
## Configuration
API keys are resolved in order: explicit opts > Application config > environment variables.
```elixir
# Option 1: Environment variables (recommended for production)
# OPENAI_API_KEY=sk-...
# ANTHROPIC_API_KEY=sk-ant-...
# Option 2: Application config
config :lang_ex, :openai, api_key: "sk-..."
config :lang_ex, :anthropic, api_key: "sk-ant-..."
# Option 3: Explicit opts per call
LangEx.ChatModel.node(model: "gpt-4o", api_key: "sk-...")
```
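The resolution order (explicit opts, then Application config, then environment variables) can be sketched in plain Elixir. This is an illustration under assumptions, not the library's code: `KeyResolverSketch`, its `resolve/3` function, and the `:key_resolver_sketch` application name are all hypothetical.

```elixir
defmodule KeyResolverSketch do
  # Hypothetical application name used only for this illustration.
  @app :key_resolver_sketch

  # Returns the first key found: explicit opts, then app config, then env var.
  # Returns nil when none of the three sources provides a key.
  def resolve(opts, config_key, env_var) do
    Keyword.get(opts, :api_key) ||
      Keyword.get(Application.get_env(@app, config_key, []), :api_key) ||
      System.get_env(env_var)
  end
end

# An explicit option always wins over the other two sources.
KeyResolverSketch.resolve([api_key: "sk-explicit"], :openai, "OPENAI_API_KEY")
# => "sk-explicit"
```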
### Custom Providers
Register custom providers via application config and runtime registration:
```elixir
# config/config.exs
config :lang_ex, :providers,
  groq: %{env_key: "GROQ_API_KEY", default_model: "llama-3.3-70b"}
# At runtime
LangEx.ChatModels.register_provider(:groq, MyApp.LLM.Groq)
LangEx.ChatModels.register_prefix("llama-", :groq)
```
## ChatModels Registry
Model strings are auto-resolved to provider modules:
```elixir
# Auto-resolved from model string prefix
Graph.add_node(graph, :llm, LangEx.ChatModel.node(model: "gpt-4o"))
Graph.add_node(graph, :llm, LangEx.ChatModel.node(model: "claude-sonnet-4-20250514"))
# Explicit provider
Graph.add_node(graph, :llm, LangEx.ChatModel.node(provider: LangEx.LLM.OpenAI, model: "gpt-4o"))
# Programmatic resolution
{LangEx.LLM.OpenAI, opts} = LangEx.ChatModels.init_chat_model("gpt-4o", temperature: 0.3)
```
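Prefix-based resolution can be pictured as a lookup over registered prefixes. The sketch below is an assumption about the general technique, not the registry's real internals: `PrefixRegistrySketch`, its prefix table, and the provider atoms are illustrative only.

```elixir
defmodule PrefixRegistrySketch do
  # Hypothetical prefix table; LangEx's real registry may store this differently.
  @prefixes [{"gpt-", :openai}, {"claude-", :anthropic}]

  # Returns the provider whose prefix matches the start of the model string.
  def resolve(model, prefixes \\ @prefixes) do
    case Enum.find(prefixes, fn {prefix, _provider} -> String.starts_with?(model, prefix) end) do
      {_prefix, provider} -> {:ok, provider}
      nil -> {:error, {:unknown_model, model}}
    end
  end
end

PrefixRegistrySketch.resolve("gpt-4o")
# => {:ok, :openai}
```

Registering a new prefix, as `register_prefix("llama-", :groq)` does in the section above, would correspond to adding one more entry to that table.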
## Checkpointing
Checkpointing persists graph execution state after each super-step, enabling pause/resume, fault recovery, and time-travel debugging. Each checkpoint captures the full state, pending next nodes, step counter, and any pending interrupts.
A checkpointer is **required** for interrupts (human-in-the-loop) since state must survive the pause between invocations.
Pass a checkpointer module when compiling a graph and a `thread_id` at invocation time:
```elixir
graph = Graph.new(...) |> ... |> Graph.compile(checkpointer: LangEx.Checkpointer.Redis)
{:ok, result} = LangEx.invoke(graph, input, config: [thread_id: "my-thread"])
```
Both built-in adapters implement the `LangEx.Checkpointer` behaviour:
```elixir
@callback save(config(), Checkpoint.t()) :: :ok | {:error, term()}
@callback load(config()) :: {:ok, Checkpoint.t()} | :none
@callback list(config(), keyword()) :: [Checkpoint.t()]
```
### Redis
Requires the optional `redix` dependency. When `redix` is included, a named
Redix connection starts automatically under `LangEx.Supervisor`.
```elixir
graph =
  Graph.new(value: 0)
  |> Graph.add_node(:inc, fn state -> %{value: state.value + 1} end)
  |> Graph.add_edge(:__start__, :inc)
  |> Graph.add_edge(:inc, :__end__)
  |> Graph.compile(checkpointer: LangEx.Checkpointer.Redis)

{:ok, result} = LangEx.invoke(graph, %{value: 0}, config: [thread_id: "my-thread"])
```
**Key layout:** Checkpoints are stored as JSON under `lang_ex:cp:{thread_id}:{checkpoint_id}`. A sorted set `lang_ex:thread:{thread_id}` indexes checkpoint IDs by timestamp for ordered retrieval.
**TTL support:** Expire old checkpoints automatically by passing a TTL (in seconds) in the config:
```elixir
config = [thread_id: "t1", ttl: 3600]
```
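The key layout and TTL option above could translate into Redis commands roughly as follows. This is a sketch under assumptions: `RedisKeySketch` and `save_commands/5` are hypothetical, and the exact commands the adapter issues are not documented here. Only the two key formats come from this README.

```elixir
defmodule RedisKeySketch do
  # Key formats taken from the layout described above.
  def checkpoint_key(thread_id, checkpoint_id), do: "lang_ex:cp:#{thread_id}:#{checkpoint_id}"
  def thread_index_key(thread_id), do: "lang_ex:thread:#{thread_id}"

  # Hypothetical: builds the command lists a Redis client could send.
  # SET stores the JSON (with EX when a TTL is given); ZADD indexes the
  # checkpoint ID in the per-thread sorted set, scored by timestamp.
  def save_commands(thread_id, checkpoint_id, json, timestamp, ttl \\ nil) do
    key = checkpoint_key(thread_id, checkpoint_id)
    set = if ttl, do: ["SET", key, json, "EX", to_string(ttl)], else: ["SET", key, json]
    [set, ["ZADD", thread_index_key(thread_id), to_string(timestamp), checkpoint_id]]
  end
end

RedisKeySketch.checkpoint_key("t1", "cp9")
# => "lang_ex:cp:t1:cp9"
```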
**Custom Redix connection:** Override the default connection name with the `:conn` config key:
```elixir
config = [thread_id: "t1", conn: MyApp.Redix]
```
**Redis URL configuration:**
```elixir
# config/config.exs
config :lang_ex, redis_url: "redis://localhost:6379"
```
### PostgreSQL
Requires the optional `postgrex` and `ecto_sql` dependencies. The adapter
stores checkpoints in a `lang_ex_checkpoints` table with JSONB columns for
state and metadata.
**1. Generate and run the migration** (Oban-style versioned migrations):
```bash
mix ecto.gen.migration add_lang_ex
```
```elixir
defmodule MyApp.Repo.Migrations.AddLangEx do
  use Ecto.Migration

  def up, do: LangEx.Migration.up()
  def down, do: LangEx.Migration.down()
end
```
```bash
mix ecto.migrate
```
**2. Use the Postgres checkpointer:**
```elixir
graph = Graph.new(...) |> ... |> Graph.compile(checkpointer: LangEx.Checkpointer.Postgres)
{:ok, result} = LangEx.invoke(graph, input, config: [repo: MyApp.Repo, thread_id: "t1"])
```
**Schema prefix support:** Isolate LangEx tables in a separate PostgreSQL schema:
```elixir
# In migration
def up, do: LangEx.Migration.up(prefix: "private")
def down, do: LangEx.Migration.down(prefix: "private")
```
**Versioned upgrades:** When upgrading LangEx, generate a new migration targeting the next version:
```elixir
defmodule MyApp.Repo.Migrations.UpgradeLangExToV2 do
  use Ecto.Migration

  def up, do: LangEx.Migration.up(version: 2)
  def down, do: LangEx.Migration.down(version: 2)
end
```
### Choosing an Adapter
| | Redis | PostgreSQL |
|---|---|---|
| **Setup** | Add `redix` dep (auto-starts) | Add `postgrex` + `ecto_sql` deps + migration |
| **Best for** | Fast iteration, ephemeral workflows | Durable state, transactional guarantees |
| **Dependencies** | `redix` (optional) | `postgrex` + `ecto_sql` (optional) |
| **TTL / expiry** | Built-in via config | Manage manually or with DB policies |
| **Schema isolation** | Key prefix (`lang_ex:`) | PostgreSQL schema prefix |
## Interrupts (Human-in-the-Loop)
Interrupts let you pause graph execution at any node, surface a payload to the caller, and resume later with a human-provided value. This is the core mechanism for human-in-the-loop workflows like approvals, reviews, and manual overrides.
### How It Works
1. A node calls `LangEx.Interrupt.interrupt(payload)` during execution.
2. The Pregel engine catches the interrupt, saves a checkpoint with `pending_interrupts`, and returns `{:interrupt, payload, state}` to the caller.
3. The caller presents the payload to a human (UI, Slack, email, etc.).
4. When the human responds, the caller resumes the graph by invoking it with `%LangEx.Types.Command{resume: value}` and the same `thread_id`.
5. On resume, the checkpointer loads the saved state, `interrupt/1` returns the resume value instead of throwing, and execution continues from where it left off.
> **Checkpointer required.** Interrupts depend on checkpointing to persist state across the pause. Always compile with a checkpointer when using interrupts.
### Basic Example
```elixir
graph =
  Graph.new(value: 0, approved: false)
  |> Graph.add_node(:check, fn state ->
    approval = LangEx.Interrupt.interrupt("Approve value #{state.value}?")
    %{approved: approval}
  end)
  |> Graph.add_node(:finalize, fn state -> %{value: state.value * 10} end)
  |> Graph.add_edge(:__start__, :check)
  |> Graph.add_edge(:check, :finalize)
  |> Graph.add_edge(:finalize, :__end__)
  |> Graph.compile(checkpointer: LangEx.Checkpointer.Redis)

# First invocation pauses at the interrupt
{:interrupt, "Approve value 42?", _state} =
  LangEx.invoke(graph, %{value: 42}, config: [thread_id: "approval-1"])

# Resume with the human's decision
{:ok, result} =
  LangEx.invoke(graph, %LangEx.Types.Command{resume: true}, config: [thread_id: "approval-1"])
# => %{value: 420, approved: true}
```
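The pause/resume mechanics described under How It Works can be sketched with `throw`/`catch` in plain Elixir. This is a simplified illustration, not LangEx's engine: `InterruptSketch` is hypothetical, it keeps the resume value in the process dictionary instead of a checkpointer, and it assumes the node function is re-run from the top on resume.

```elixir
defmodule InterruptSketch do
  @resume_key :interrupt_sketch_resume

  # Returns the resume value when one is present; otherwise throws to pause.
  def interrupt(payload) do
    case Process.get(@resume_key, :none) do
      {:resume, value} -> value
      :none -> throw({:interrupt, payload})
    end
  end

  # Runs one node. `resume` is :none on the first call, {:resume, value} later.
  def run(node_fun, state, resume \\ :none) do
    Process.put(@resume_key, resume)

    try do
      {:ok, Map.merge(state, node_fun.(state))}
    catch
      # The node paused: surface the payload and the untouched state.
      {:interrupt, payload} -> {:interrupt, payload, state}
    after
      Process.delete(@resume_key)
    end
  end
end

node = fn state -> %{approved: InterruptSketch.interrupt("Approve value #{state.value}?")} end

paused = InterruptSketch.run(node, %{value: 42})
# => {:interrupt, "Approve value 42?", %{value: 42}}

resumed = InterruptSketch.run(node, %{value: 42}, {:resume, true})
# => {:ok, %{value: 42, approved: true}}
```

In the real library the state between the two calls comes from the checkpointer, keyed by `thread_id`, rather than being passed in again by the caller.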
### Interrupts with Postgres (Durable Pause/Resume)
For workflows where the pause may last hours or days (e.g. manager approval), use the Postgres checkpointer so state survives application restarts:
```elixir
graph =
  Graph.new(ticket: nil, approved: false)
  |> Graph.add_node(:draft, fn state ->
    %{ticket: "Escalation: #{state.ticket}"}
  end)
  |> Graph.add_node(:approve, fn state ->
    decision = LangEx.Interrupt.interrupt(state.ticket)
    %{approved: decision}
  end)
  |> Graph.add_node(:finalize, fn state -> state end)
  |> Graph.add_edge(:__start__, :draft)
  |> Graph.add_edge(:draft, :approve)
  |> Graph.add_edge(:approve, :finalize)
  |> Graph.add_edge(:finalize, :__end__)
  |> Graph.compile(checkpointer: LangEx.Checkpointer.Postgres)

# ticket_id is supplied by your application
config = [repo: MyApp.Repo, thread_id: "escalation-#{ticket_id}"]

# Pauses at :approve, state is saved to Postgres
{:interrupt, ticket_text, _state} = LangEx.invoke(graph, %{ticket: "Server down"}, config: config)

# Hours later, after human review, state is loaded from Postgres
{:ok, result} = LangEx.invoke(graph, %LangEx.Types.Command{resume: true}, config: config)
```
### Conditional Interrupts
Not every path through the graph needs to interrupt. Use normal control flow to decide whether to pause:
```elixir
Graph.add_node(graph, :maybe_approve, fn state ->
  if state.needs_approval do
    approved = LangEx.Interrupt.interrupt("Please review: #{state.summary}")
    %{approved: approved}
  else
    %{approved: true}
  end
end)
```
Paths that don't hit `interrupt/1` complete in a single invocation as usual.
## Streaming
Get a lazy stream of execution events:
```elixir
graph
|> LangEx.stream(%{value: 0})
|> Enum.each(fn
  {:node_start, name} -> IO.puts("Starting #{name}...")
  {:node_end, name, _update} -> IO.puts("Finished #{name}")
  {:step_end, step, state} -> IO.inspect(state, label: "Step #{step}")
  {:done, {:ok, result}} -> IO.inspect(result, label: "Final")
  _ -> :ok
end)
```
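A lazy event stream of this shape can be built with `Stream.resource/3`. The sketch below is an assumption about the general technique, not how `LangEx.stream/2` is implemented: `StreamSketch` is hypothetical, it runs nodes strictly in sequence, and it omits `:step_end` events.

```elixir
defmodule StreamSketch do
  # Hypothetical: runs a list of {name, fun} nodes in order, emitting events lazily.
  # A node only executes when the consumer asks for more events. Its
  # :node_start and :node_end events are emitted together once it has run.
  def events(nodes, state) do
    Stream.resource(
      fn -> {nodes, state} end,
      fn
        :finished ->
          {:halt, :finished}

        {[], state} ->
          {[{:done, {:ok, state}}], :finished}

        {[{name, fun} | rest], state} ->
          update = fun.(state)
          {[{:node_start, name}, {:node_end, name, update}], {rest, Map.merge(state, update)}}
      end,
      fn _acc -> :ok end
    )
  end
end

nodes = [inc: fn state -> %{value: state.value + 1} end]
events = StreamSketch.events(nodes, %{value: 0}) |> Enum.to_list()
# => [{:node_start, :inc}, {:node_end, :inc, %{value: 1}}, {:done, {:ok, %{value: 1}}}]
```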
## Runtime Context
Inject dependencies into nodes without closures:
```elixir
graph =
  Graph.new(greeting: "")
  |> Graph.add_node(:greet, fn _state, context ->
    %{greeting: "Hello from #{context.provider}!"}
  end)
  |> Graph.add_edge(:__start__, :greet)
  |> Graph.add_edge(:greet, :__end__)
  |> Graph.compile()

{:ok, result} = LangEx.invoke(graph, %{}, context: %{provider: "OpenAI"})
```
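One way an engine could support both one-argument and two-argument node functions is to dispatch on arity. The sketch below illustrates that technique only; `ContextSketch.call_node/3` is a hypothetical name, and LangEx may pass the context differently.

```elixir
defmodule ContextSketch do
  # Arity-1 nodes receive only the state; the context is ignored.
  def call_node(fun, state, _context) when is_function(fun, 1), do: fun.(state)

  # Arity-2 nodes receive the state and the runtime context.
  def call_node(fun, state, context) when is_function(fun, 2), do: fun.(state, context)
end

greet = fn _state, context -> %{greeting: "Hello from #{context.provider}!"} end

ContextSketch.call_node(greet, %{}, %{provider: "OpenAI"})
# => %{greeting: "Hello from OpenAI!"}
```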
## Subgraphs
Use a compiled graph as a node:
```elixir
inner =
  Graph.new(value: 0)
  |> Graph.add_node(:double, fn state -> %{value: state.value * 2} end)
  |> Graph.add_edge(:__start__, :double)
  |> Graph.add_edge(:double, :__end__)
  |> Graph.compile()

outer =
  Graph.new(value: 0, label: "")
  |> Graph.add_node(:sub, inner)
  |> Graph.add_node(:tag, fn _state -> %{label: "done"} end)
  |> Graph.add_edge(:__start__, :sub)
  |> Graph.add_edge(:sub, :tag)
  |> Graph.add_edge(:tag, :__end__)
  |> Graph.compile()

{:ok, %{value: 14, label: "done"}} = LangEx.invoke(outer, %{value: 7})
```
## Docker Compose (Development)
Start Redis and PostgreSQL for local development:
```bash
cd lang_ex
docker-compose up -d
```
```yaml
# docker-compose.yml
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: lang_ex
      POSTGRES_PASSWORD: lang_ex
      POSTGRES_DB: lang_ex_dev
    ports:
      - "5432:5432"
```
## Extending LangEx
### Custom LLM Provider
Implement the `LangEx.LLM` behaviour and register:
```elixir
defmodule MyApp.LLM.Groq do
  @behaviour LangEx.LLM

  @impl true
  def chat(messages, opts) do
    # Your API call here
    {:ok, LangEx.Message.ai("response")}
  end
end

# Register at application startup
LangEx.ChatModels.register_provider(:groq, MyApp.LLM.Groq)
LangEx.ChatModels.register_prefix("llama-", :groq)
```
### Custom Checkpointer
Implement the `LangEx.Checkpointer` behaviour:
```elixir
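defmodule MyApp.Checkpointer.S3 do
  @behaviour LangEx.Checkpointer

  @impl true
  # Placeholder: persist the checkpoint, then return :ok or {:error, reason}
  def save(_config, _checkpoint), do: :ok

  @impl true
  # Placeholder: return {:ok, checkpoint} or :none
  def load(_config), do: :none

  @impl true
  # Placeholder: return the list of stored checkpoints
  def list(_config, _opts \\ []), do: []
end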
```
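For a concrete picture of the three callbacks, here is a minimal in-memory store backed by an `Agent`. It is a standalone sketch under assumptions: `InMemoryCheckpointerSketch` is hypothetical, it omits `@behaviour LangEx.Checkpointer` so it runs without the library, it treats checkpoints as opaque terms, and it assumes `load/1` should return the most recently saved checkpoint.

```elixir
defmodule InMemoryCheckpointerSketch do
  use Agent

  # State is a map of thread_id => list of checkpoints, newest first.
  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end, name: __MODULE__)

  # Mirrors save/2: prepend the checkpoint to the thread's history.
  def save(config, checkpoint) do
    thread_id = Keyword.fetch!(config, :thread_id)

    Agent.update(__MODULE__, fn store ->
      Map.update(store, thread_id, [checkpoint], &[checkpoint | &1])
    end)
  end

  # Mirrors load/1: the newest checkpoint, or :none for an unknown thread.
  def load(config) do
    case list(config) do
      [latest | _] -> {:ok, latest}
      [] -> :none
    end
  end

  # Mirrors list/2: every checkpoint for the thread, newest first.
  def list(config, _opts \\ []) do
    thread_id = Keyword.fetch!(config, :thread_id)
    Agent.get(__MODULE__, &Map.get(&1, thread_id, []))
  end
end
```

Start it with `InMemoryCheckpointerSketch.start_link()` before calling `save/2`. State lives only in the Agent process, so it is lost on restart and is not suitable for interrupts that must survive one.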
## License
MIT