# aion_flow
Typed Gleam SDK for authoring durable Aion workflows. Use it to define workflow entry points, declare typed activities, receive signals, expose read-only queries, use deterministic timers/time/randomness, and test workflow code in Gleam.
Workflow code should be deterministic: do not read wall clocks or ambient entropy directly. Use `aion/workflow.now`, `aion/workflow.random`, `aion/workflow.random_int`, and timer primitives so replay sees the same command stream.
## Use in this repository
The package is named `aion_flow` and targets Erlang/BEAM. Repository examples use it as a local path dependency rather than relying on a published package:
```toml
[dependencies]
aion_flow = { path = "../../gleam/aion_flow" }
```
Public modules:
- `aion/workflow` — workflow definitions, activity dispatch, deterministic time/randomness, timers, and child workflows.
- `aion/activity` — typed activity invocation values plus retry, timeout, and heartbeat configuration.
- `aion/signal` — typed signal references and in-engine send/receive helpers.
- `aion/query` — typed read-only query handlers and dispatch helpers.
- `aion/codec` — typed payload codecs for values crossing engine boundaries.
- `aion/duration`, `aion/error`, `aion/child`, and `aion/testing` — supporting durations, errors, child handles, and the pure Gleam harness.
## Define a workflow
A workflow definition names an entry function and carries codecs for input, output, and workflow errors:
```gleam
import aion/workflow

pub fn definition() {
  workflow.define(
    "hello_world",
    request_codec(),
    greeting_codec(),
    workflow_error_codec(),
    run,
  )
}

pub fn run(input: Request) -> Result(Greeting, String) {
  // Durable workflow logic goes here.
  Ok(Greeting(message: "Hello, " <> input.name <> "!"))
}
```
`workflow.define(name, input_codec, output_codec, error_codec, entry_fn)` returns a typed `WorkflowDefinition(input, output, workflow_error)` that package tooling and tests can inspect with `workflow.name`, `workflow.input_codec`, `workflow.output_codec`, `workflow.error_codec`, and `workflow.entry_fn`.
## Declare and call activities
Activities are typed values. `activity.new` stores the activity name, typed input, codecs, and local runner shape; `workflow.run` records the dispatch and returns `Result(output, error.ActivityError)`.
```gleam
import aion/activity
import aion/error
import aion/workflow

fn greet(name: String) -> activity.Activity(String, String) {
  activity.new(
    "greet",
    name,
    string_codec(),
    string_codec(),
    fn(value) { Ok("Hello, " <> value <> "!") },
  )
}

pub fn run(input: Request) -> Result(String, String) {
  case workflow.run(greet(input.name)) {
    Ok(message) -> Ok(message)
    Error(error.Retryable(message:, details: _)) -> Error(message)
    Error(error.Terminal(message:, details: _)) -> Error(message)
    Error(_) -> Error("activity failed")
  }
}
```
An activity created with `activity.new` has no hidden retry, timeout, or heartbeat defaults. Add policies explicitly with `activity.retry`, `activity.timeout`, and `activity.heartbeat` when a workflow needs them.
For homogeneous fan-out, use `workflow.all`, `workflow.race`, or `workflow.map` over activity values.
## Codecs
All workflow, activity, signal, and query payloads cross engine boundaries through `aion/codec.Codec(a)`:
```gleam
import aion/codec
import gleam/dynamic/decode
import gleam/json

fn string_codec() -> codec.Codec(String) {
  codec.json_codec(json.string, decode.string)
}
```
For custom records, provide a JSON encoder and a `gleam/dynamic/decode.Decoder` for the same shape.
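
As a rough sketch of that pattern, a record with a single `message` field (like the `Greeting` built in the workflow example) might get a codec along these lines. The type definition and the helper names `greeting_to_json`, `greeting_decoder`, and `greeting_codec` are assumptions for illustration; only `codec.json_codec(encoder, decoder)` is taken from this README.

```gleam
import aion/codec
import gleam/dynamic/decode
import gleam/json

// Hypothetical record shape, assumed for this sketch.
pub type Greeting {
  Greeting(message: String)
}

// Encode the record as a JSON object with one key per field.
pub fn greeting_to_json(greeting: Greeting) -> json.Json {
  json.object([#("message", json.string(greeting.message))])
}

// Decode the same shape back; the field name must match the encoder.
pub fn greeting_decoder() -> decode.Decoder(Greeting) {
  use message <- decode.field("message", decode.string)
  decode.success(Greeting(message: message))
}

// Pair the encoder and decoder into a codec, as `string_codec` does.
pub fn greeting_codec() -> codec.Codec(Greeting) {
  codec.json_codec(greeting_to_json, greeting_decoder())
}
```

Keeping the encoder and decoder as separate functions makes it easy to round-trip a value in a unit test before the codec is wired into a workflow definition.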
## Signals
Signals are named, typed messages delivered to a running workflow. Construct a `SignalRef(payload)` once and receive it from workflow code:
```gleam
import aion/signal
import aion/workflow

fn approval_signal() -> signal.SignalRef(Bool) {
  signal.new("approval", bool_codec())
}

pub fn wait_for_approval() -> Result(Bool, String) {
  case workflow.receive(approval_signal()) {
    Ok(approved) -> Ok(approved)
    Error(_) -> Error("approval signal failed")
  }
}
```
`signal.send(workflow_id, reference, payload)` is available for callers already inside the engine/Gleam-client boundary. Network-facing callers should use the client SDKs.
## Timers, deterministic time, and timeouts
Use workflow primitives instead of wall-clock functions:
```gleam
import aion/duration
import aion/error
import aion/workflow
import gleam/result

pub fn pause_then_read_time() -> Result(workflow.Timestamp, error.EngineError) {
  use _ <- result.try(workflow.sleep(duration.minutes(5)))
  workflow.now()
}
```
The timer API also includes `workflow.start_timer`, `workflow.cancel_timer`, `workflow.timer_id`, and `workflow.with_timeout`.
## Queries
Queries are read-only and record no workflow events. A handler returns a typed value through a codec; by convention it must not dispatch activities or mutate workflow state.
```gleam
import aion/error
import aion/query

pub fn register_state_query(
  current_status: fn() -> String,
) -> Result(Nil, error.QueryError) {
  query.handler("state", string_codec(), current_status)
}
```
`query.dispatch(name, value_codec)` is provided for callers inside the engine boundary and for the Gleam test harness.
## Minimal example
```gleam
import aion/activity
import aion/codec
import aion/duration
import aion/signal
import aion/workflow
import gleam/dynamic/decode
import gleam/json
import gleam/result

// Public because it appears in the types of the public `run` and
// `definition` functions; a private type there is a compile error.
pub type Request {
  Request(name: String)
}

fn string_codec() -> codec.Codec(String) {
  codec.json_codec(json.string, decode.string)
}

fn request_codec() -> codec.Codec(Request) {
  codec.json_codec(request_to_json, request_decoder())
}

fn request_to_json(request: Request) -> json.Json {
  json.object([#("name", json.string(request.name))])
}

fn request_decoder() -> decode.Decoder(Request) {
  use name <- decode.field("name", decode.string)
  decode.success(Request(name: name))
}

fn greet_activity(name: String) -> activity.Activity(String, String) {
  activity.new("greet", name, string_codec(), string_codec(), fn(name) {
    Ok("Hello, " <> name <> "!")
  })
}

fn bool_codec() -> codec.Codec(Bool) {
  codec.json_codec(json.bool, decode.bool)
}

fn approval_signal() -> signal.SignalRef(Bool) {
  signal.new("approval", bool_codec())
}

pub fn definition() {
  workflow.define(
    "hello_world",
    request_codec(),
    string_codec(),
    string_codec(),
    run,
  )
}

pub fn run(input: Request) -> Result(String, String) {
  // Dispatch the activity and map any activity error to a workflow error.
  use greeting <- result.try(
    case workflow.run(greet_activity(input.name)) {
      Ok(value) -> Ok(value)
      Error(_) -> Error("activity failed")
    },
  )
  // Durable one-second pause.
  use _ <- result.try(
    case workflow.sleep(duration.seconds(1)) {
      Ok(value) -> Ok(value)
      Error(_) -> Error("timer failed")
    },
  )
  // Wait for the approval signal before completing.
  use approved <- result.try(
    case workflow.receive(approval_signal()) {
      Ok(value) -> Ok(value)
      Error(_) -> Error("signal failed")
    },
  )
  case approved {
    True -> Ok(greeting)
    False -> Error("not approved")
  }
}
```
For a complete end-to-end sample that builds a Gleam workflow, packages it as `.aion`, starts `aion-server`, runs a Python worker, and starts a workflow over HTTP, see [`../../examples/hello-world/README.md`](../../examples/hello-world/README.md).