# distribute
<p align="center">
<img src="assets/img/distribute.png" alt="distribute logo" width="256" height="256" />
</p>
[Package on Hex](https://hex.pm/packages/distribute) · [Documentation on HexDocs](https://hexdocs.pm/distribute/)
**Distribute** brings the full power of Erlang's distributed computing to Gleam, with a **type-safe** and **gleam_otp-integrated** API.
While Gleam runs on the BEAM, accessing distributed primitives (like connecting nodes, global registration, or RPC) traditionally required dealing with untyped atoms and unsafe Erlang terms. **Distribute v2.0** solves this by providing:
- ✅ **Type-safe messaging** using binary codecs (`Encoder(a)`, `Decoder(a)`) and `gleam/erlang/process.Subject(BitArray)`
- ✅ **Full gleam_otp integration**: works with actors and selectors
- ✅ **Explicit error handling**: all operations return typed `Result` values
- ✅ **Composable codecs**: built-in support for primitives, Option, Result, tuples, and custom types
- ✅ **Production-ready**: comprehensive error handling, with legacy untyped APIs deprecated in favour of the typed ones
> **Note:** Use the `_typed` variants of all functions (e.g., `send_global_typed`, `broadcast_typed`) for full type safety. Legacy untyped functions are deprecated and will be removed in v3.0.
## Features
### Core Distributed Primitives
- **Node Management** — Start distributed nodes, connect to peers, and list connected nodes with proper error handling.
- **Global Registry** — Register and lookup processes across the cluster using a type-safe wrapper around `:global`.
- **Cross-node Messaging** — Send messages to remote processes or globally registered names transparently.
- **Process Groups** — Join/leave groups and broadcast messages to all members (wrapper around `:pg`).
- **Remote Monitoring** — Monitor processes and nodes for failure detection across the network.
- **RPC** — Perform Remote Procedure Calls to any Erlang/Gleam module with timeout control.
### Type-Safe API (v2.0)
- **Binary Codec System** — Encoder/Decoder types for compile-time safe serialization
- **Envelope Protocol** — Tag + version validation for protocol mismatch detection (see `distribute/codec.wrap_envelope` and `distribute/codec.unwrap_envelope`)
- **Typed Messaging** — `send_typed`, `call_typed`, `broadcast_typed` with explicit errors
- **Receiver Helpers** — Convenient `receive_typed` integration with gleam/erlang/process
- **gleam_otp Compatible** — Use standard `Subject(BitArray)` from gleam/erlang/process
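To make the envelope idea above concrete, here is a minimal, self-contained sketch of tag and version validation on a binary payload. It is not the implementation behind `wrap_envelope` and `unwrap_envelope`. The wire layout (16-bit tag length, tag bytes, one version byte, payload), the function names `wrap` and `unwrap`, and the `EnvelopeError` type are all assumptions made for illustration.

```gleam
import gleam/bit_array

/// Reasons unwrapping can fail (illustrative; not the library's error type).
pub type EnvelopeError {
  Malformed
  TagMismatch(expected: String, got: String)
  VersionMismatch(expected: Int, got: Int)
}

/// Prefix a payload with a length-prefixed tag and a one-byte version.
pub fn wrap(tag: String, version: Int, payload: BitArray) -> BitArray {
  let tag_bytes = bit_array.from_string(tag)
  let tag_len = bit_array.byte_size(tag_bytes)
  <<tag_len:size(16), tag_bytes:bits, version:size(8), payload:bits>>
}

/// Check the tag and version, returning the inner payload on success.
pub fn unwrap(
  data: BitArray,
  expected_tag: String,
  expected_version: Int,
) -> Result(BitArray, EnvelopeError) {
  case data {
    <<
      tag_len:size(16),
      tag_bytes:bytes-size(tag_len),
      version:size(8),
      payload:bytes
    >> ->
      case bit_array.to_string(tag_bytes) {
        Ok(tag) if tag != expected_tag ->
          Error(TagMismatch(expected_tag, tag))
        Ok(_) if version != expected_version ->
          Error(VersionMismatch(expected_version, version))
        Ok(_) -> Ok(payload)
        // The tag bytes were not valid UTF-8
        Error(_) -> Error(Malformed)
      }
    // Too short to contain a full header
    _ -> Error(Malformed)
  }
}
```

Checking the tag before decoding the payload lets a receiver reject a message meant for a different protocol, or a newer version of the same one, with a specific error instead of a confusing decode failure.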
### Advanced Features
- **SWIM-like Membership** — A built-in background service for failure detection, gossip, and cluster membership.
- **Raft-lite Election** — A lightweight leader election implementation for simple coordination needs.
## Installation
```toml
# gleam.toml
[dependencies]
gleam_stdlib = ">= 0.43.0"
gleam_erlang = ">= 0.5.0"
gleam_otp = ">= 0.1.0"
distribute = "~> 2.0"
```
Alternatively, add the package from the command line:
```sh
gleam add distribute
```
## Documentation
- **Main Documentation**: https://hexdocs.pm/distribute/
- **Examples**:
  - [General Examples](https://hexdocs.pm/distribute/examples.html)
  - [Two Nodes Setup](https://hexdocs.pm/distribute/examples-two-nodes.html)
  - [Two Nodes App](https://hexdocs.pm/distribute/examples-two-nodes-app.html)
## Quick Start (Type-Safe API)
### 1. Start a Distributed Node
```gleam
import distribute/node_builder
import gleam/io

pub fn main() {
  // Start this node with builder pattern
  let assert Ok(_) =
    node_builder.new()
    |> node_builder.with_name("app@127.0.0.1")
    |> node_builder.with_cookie("secret")
    |> node_builder.start()

  io.println("Node started!")
}
```
### 2. Connect to Another Node
```gleam
import distribute/cluster
import gleam/io
import gleam/string

pub fn connect_peer() {
  case cluster.connect("other@127.0.0.1") {
    Ok(_) -> io.println("Connected to peer!")
    Error(err) -> io.println("Connection failed: " <> string.inspect(err))
  }
}
```
### 3. Register and Lookup Processes Globally (Type-Safe)
```gleam
import distribute/codec
import distribute/global
import distribute/messaging
import distribute/receiver
import distribute/registry

pub fn register_and_lookup() {
  // Create a type-safe global subject with encoder/decoder
  let encoder = codec.string_encoder()
  let decoder = codec.string_decoder()
  let service = global.new(encoder, decoder)

  // Register it globally
  let _ = registry.register_typed("my_service", global.subject(service))

  // From another node/process: look up the service type-safely
  let assert Ok(remote_service) =
    registry.whereis_global("my_service", encoder, decoder)

  // Send a typed message through GlobalSubject
  let _ = global.send(remote_service, "Hello, world!")

  // Or use messaging API directly
  let _ = messaging.send_global_typed("my_service", "Hello, world!", encoder)

  // Receive typed messages
  let assert Ok(_msg) =
    receiver.receive_typed(global.subject(service), decoder, 1000)
}
```
### 4. Process Groups (Type-Safe)
```gleam
import distribute/codec
import distribute/codec/builder
import distribute/global
import distribute/groups

pub type Task {
  Task(name: String, id: Int)
}

pub fn use_groups() {
  // Build a codec for Task
  let #(task_encoder, task_decoder) =
    builder.custom2(
      codec.string_encoder(),
      codec.int_encoder(),
      codec.string_decoder(),
      codec.int_decoder(),
      Task,
      fn(t: Task) { #(t.name, t.id) },
    )

  // Create a global subject for group communication
  let worker = global.new(task_encoder, task_decoder)

  // Join a group named "workers"
  let _ = groups.join_typed("workers", global.subject(worker))

  // Send a typed message to this subject; to reach every group member,
  // use the `broadcast_typed` function instead
  let _ = global.send(worker, Task("process_data", 42))

  // Get members list
  let members = groups.members_typed("workers")
}
```
### 5. SWIM Membership Service
```gleam
import distribute/cluster/membership

pub fn use_membership() {
  // Start the background membership service (probe every 500ms)
  membership.start_service(500)

  // Get a list of alive nodes
  let alive = membership.alive()

  // Get nodes with full status details
  let nodes = membership.members_with_status()
  // Returns: [#("node@host", Alive, 0), ...]

  // Get current leader (lexicographically largest alive node)
  let leader = membership.current_leader()

  // Stop the service when done
  membership.stop_service()
}
```
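The "lexicographically largest alive node" rule mentioned in the example can be sketched in a few lines of plain Gleam. This is an illustration of the rule only, assuming node names are plain strings. `pick_leader` is a hypothetical helper, not part of the library, and the actual `current_leader` implementation may differ.

```gleam
import gleam/list
import gleam/order
import gleam/string

/// Pick the lexicographically largest node name.
/// Returns Error(Nil) when the list of alive nodes is empty.
pub fn pick_leader(alive: List(String)) -> Result(String, Nil) {
  list.reduce(alive, fn(best, candidate) {
    case string.compare(candidate, best) {
      order.Gt -> candidate
      _ -> best
    }
  })
}
```

Because the rule is deterministic, any two nodes that agree on the set of alive members will also agree on the leader without exchanging further messages. Nodes whose membership views have not yet converged may temporarily disagree.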
### 6. Leader Election (Raft-lite)
```gleam
import distribute/election/raft_lite
import gleam/io

pub fn elect_leader() {
  case raft_lite.elect() {
    raft_lite.Leader(name) -> io.println("Current Leader: " <> name)
    raft_lite.NoLeader -> io.println("No leader available yet")
  }
}
```
### 7. Remote Procedure Calls
```gleam
import distribute/remote_call
import gleam/io
import gleam/string

pub fn call_remote() {
  // Call erlang:node() on a remote node
  case remote_call.call("other@host", "erlang", "node", [], []) {
    Ok(result) -> io.println("Remote node name: " <> string.inspect(result))
    Error(remote_call.RpcBadRpc(reason)) -> io.println("RPC failed: " <> reason)
    Error(_) -> io.println("RPC error")
  }
}
```
## Module Reference
| Module | Description |
|--------|-------------|
| `distribute/cluster` | Node management: start, connect, ping, list nodes |
| `distribute/registry` | Global process registration with type-safe subjects |
| `distribute/messaging` | Type-safe cross-node messaging with codecs |
| `distribute/groups` | Process groups with type-safe broadcast |
| `distribute/monitor` | Monitor processes and nodes |
| `distribute/remote_call` | Type-safe RPC to remote nodes |
| `distribute/codec` | Binary encoding/decoding for primitives and composite types |
| `distribute/codec/builder` | Helpers for building custom type codecs |
| `distribute/codec/tagged` | Tag and version validation for protocol safety |
| `distribute/global` | Type-safe global subjects with integrated codecs |
| `distribute/receiver` | Type-safe message receiving with codecs |
| `distribute/sugar` | Convenience helpers for common patterns |
| `distribute/actor` | Simple actor wrappers |
| `distribute/cluster/membership` | SWIM-like membership with gossip and failure detection |
| `distribute/cluster/gossip` | Gossip protocol for membership state propagation |
| `distribute/cluster/health` | Health checks for nodes and cluster |
| `distribute/election/raft_lite` | Lightweight leader election with term-based voting |
| `distribute/node_builder` | Builder for configuring and starting a distributed node (name, cookie) |
| `distribute/settings` | Runtime settings for atom creation and correlation ID generation |
| `distribute/log` | Structured logging with metadata and correlation IDs |
> **Note**: Legacy untyped functions (`send_global`, `broadcast`, `call`) are deprecated and will be removed in v3.0. Use the `_typed` variants with codecs for full type safety.
## Running Tests
```sh
gleam test
```
## Integration Tests
You can run a multi-node SWIM integration test using the provided script:
```sh
./examples/two_nodes/swim_integration.sh
```
This script starts 3 local Erlang nodes, runs the membership service, and verifies gossip convergence.
> **Note:** The legacy wrapper `examples/two_nodes/run_swim_integration.sh` has been removed — run `examples/two_nodes/swim_integration.sh` directly.
> **Note**: Scripts under `examples/` are **manual integration demos**. They rely on local node naming (`-sname` + `hostname`) and are not intended to be a CI hard requirement.
## Architecture
```
src/
├── distribute.gleam # Top-level module
├── distribute/
│ ├── cluster.gleam # Node management
│ ├── registry.gleam # Global registry
│ ├── messaging.gleam # Cross-node messaging
│ ├── groups.gleam # Process groups
│ ├── monitor.gleam # Process/node monitoring
│ ├── remote_call.gleam # RPC
│ ├── cluster/
│ │ ├── membership.gleam # SWIM-like membership
│ │ ├── gossip.gleam # Gossip protocol
│ │ └── health.gleam # Health checks
│ └── election/
│ └── raft_lite.gleam # Leader election
└── *_ffi.erl # Erlang FFI files (in src root)
```
## Design Philosophy
1. **No magic** — Everything is explicit.
2. **Zero custom runtime** — Only wraps standard BEAM features.
3. **Small but complete APIs** — Just what you need for clustering.
4. **Type-safe** — Gleam's type system prevents common errors.
5. **Compatible with gleam/otp** — Works alongside standard OTP patterns.
## Safety
- **Atom creation**: Some module APIs (FFI) accept string inputs that are converted to Erlang atoms. Creating atoms dynamically from arbitrary input can exhaust the BEAM atom table (which is not garbage-collected). Use caution when passing untrusted input to functions that convert strings to atoms. Where possible, use existing atoms or map inputs to a constrained set of known atoms.
The library makes a best-effort attempt to avoid creating new atoms where possible (attempting `binary_to_existing_atom` first), but code paths that require atoms (e.g., `global:register_name`, `pg:join`) may still create atoms in some cases. Prefer using stable, pre-defined names or registering via a controlled mapping to avoid growing the atom table.
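One way to follow the advice about a constrained set of names is to parse untrusted input into a closed custom type before it reaches any registration call. The sketch below is plain Gleam and does not use the library. The `ServiceName` type, its variants, and the name strings are hypothetical examples.

```gleam
/// The closed set of names this application is willing to register.
pub type ServiceName {
  Billing
  Inventory
}

/// Accept only known names; anything else is rejected before it can
/// reach a function that might convert it to an atom.
pub fn parse_service_name(input: String) -> Result(ServiceName, Nil) {
  case input {
    "billing" -> Ok(Billing)
    "inventory" -> Ok(Inventory)
    _ -> Error(Nil)
  }
}

/// Map each variant to the fixed string used for registration.
pub fn registered_name(name: ServiceName) -> String {
  case name {
    Billing -> "billing_service"
    Inventory -> "inventory_service"
  }
}
```

With this in place, the number of distinct names (and therefore atoms) is bounded by the number of variants, no matter what input arrives over the network.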
## Settings
This library exposes a small `settings` API which controls behaviour that impacts safety and logging.
- `settings.set_allow_atom_creation(allow: Bool)` (default: `False`).
  - If `False`, FFI calls will attempt `binary_to_existing_atom` and *will not* create a new atom if it doesn't exist, returning an error instead. This prevents uncontrolled atom creation from untrusted inputs.
  - If `True`, the library may create atoms when necessary for compatibility (e.g., RPC and monitor fallback).
- `settings.set_use_crypto_ids(use_crypto: Bool)` (default: `False`).
  - If `True`, `log.generate_correlation_id()` will prefer crypto-derived IDs for better randomness/uniqueness. If `False`, a monotonic/time-based fallback is used.
Example:
```gleam
import distribute/settings

pub fn main() {
  // Secure default: don't allow uncontrolled atom creation
  settings.set_allow_atom_creation(False)

  // Prefer crypto-based correlation ids
  settings.set_use_crypto_ids(True)
}
```
## Logging & Correlation IDs
This library provides a structured logging helper `log` with metadata and correlation ids.
- Set logging backend:
  - `log.set_backend("console")` (default)
  - `log.set_backend("erlang_logger")`
- Generate correlation ids:
  - `let id = log.generate_correlation_id()`
- Use `log.info_with_correlation`, `log.error_with_correlation`, etc. to attach `id` to logs and `metadata`.
Example:
```gleam
import distribute/log
import distribute/settings

pub fn main() {
  settings.set_use_crypto_ids(True)
  log.set_backend("erlang_logger")

  let id = log.generate_correlation_id()
  log.info_with_correlation("Starting operation", [#("user", "alice")], id)
}
```
## RPC Timeouts & Error Handling
Remote calls use a default timeout of 5000ms. Use `remote_call.call_with_timeout` to customise the timeout for long-running calls.
Example with a custom timeout:
```gleam
import distribute/remote_call
import gleam/io
import gleam/string

pub fn call_remote_with_timeout() {
  // Call erlang:node() on a remote node with a 10s timeout
  case
    remote_call.call_with_timeout("other@host", "erlang", "node", [], [], 10_000)
  {
    Ok(result) -> io.println("Remote node name: " <> string.inspect(result))
    Error(remote_call.RpcBadRpc(reason)) -> io.println("RPC failed: " <> reason)
    Error(_) -> io.println("RPC error")
  }
}
```
## Examples
- Examples overview: [examples/README.md](examples/README.md)
- Multi-node shell scripts live in `examples/two_nodes/`.
- A tiny example project wrapper lives in `examples/two_nodes_app/`.
## Contributing
Contributions welcome! Please open an issue or PR on GitHub.
## License
This project is licensed under the MIT License - see the [LICENSE](LICENSE) file for details.