# Glyn ✨
[](https://hex.pm/packages/glyn)
[](https://hexdocs.pm/glyn/)
**Type-safe PubSub and Registry for Gleam actors with distributed clustering support.**
Built on the Erlang [syn](https://github.com/ostinelli/syn) library.
Glyn provides two complementary systems for actor communication:
- **PubSub**: Broadcast events to multiple subscribers
- **Registry**: Direct command routing to named processes
## Installation
```sh
gleam add glyn
```
## Quick Example: PubSub
```gleam
import glyn
import glyn/pubsub
import gleam/erlang/process
// Define your event type
pub type OrderEvent {
OrderCreated(id: String, amount: Int)
OrderShipped(id: String, tracking: String)
OrderCancelled(id: String, reason: String)
}
// Create a MessageType for compile-time safety
pub const order_event_type: glyn.MessageType(OrderEvent) = glyn.MessageType("OrderEvent_v1")
pub fn main() {
// Create PubSub system
let pubsub = pubsub.new(scope: "orders", message_type: order_event_type)
// Subscribe to events
let subscription = pubsub.subscribe(pubsub, "processing", process.self())
// Publish events (returns number of subscribers reached)
let reached = pubsub.publish(pubsub, "processing", OrderCreated("ORDER123", 99))
// reached == 1
}
```
## Quick Example: Registry
```gleam
import glyn
import glyn/registry
import gleam/erlang/process.{type Subject}
// Define your command type
pub type UserCommand {
GetProfile(reply_with: Subject(String))
UpdateEmail(email: String, reply_with: Subject(Bool))
}
// Create a MessageType for compile-time safety
pub const user_command_type: glyn.MessageType(UserCommand) = glyn.MessageType("UserCommand_v1")
pub fn main() {
// Create Registry system
let registry = registry.new(scope: "users", message_type: user_command_type)
// Register a process under a name
let user_subject = process.new_subject()
let assert Ok(_registration) = registry.register(registry, "user_123", user_subject, "active")
// Send commands to registered processes
let reply_subject = process.new_subject()
let assert Ok(_) = registry.send(registry, "user_123", GetProfile(reply_subject))
// Or use call for request/reply pattern
let assert Ok(profile) = registry.call(registry, "user_123", waiting: 1000, sending: GetProfile)
}
```
## Integration Pattern: Actor with Both PubSub and Registry
The real power of Glyn comes from combining both systems in a single Gleam actor:
```gleam
import glyn
import glyn/pubsub
import glyn/registry
import gleam/erlang/process.{type Subject}
import gleam/otp/actor
// Separate command and event types for clean separation of concerns
pub type Command {
ProcessOrder(id: String, reply_with: Subject(Bool))
GetStatus(reply_with: Subject(String))
}
pub type Event {
OrderCreated(id: String, amount: Int)
PaymentProcessed(id: String)
SystemAlert(message: String)
}
// Actor message type that composes both
pub type ProcessorMessage {
CommandMessage(Command)
PubSubMessage(Event)
}
// MessageTypes for each system
pub const command_type: glyn.MessageType(Command) = glyn.MessageType("Command_v1")
pub const event_type: glyn.MessageType(Event) = glyn.MessageType("Event_v1")
pub fn start_order_processor() {
// Create both systems
let event_pubsub = pubsub.new(scope: "events", message_type: event_type)
let command_registry = registry.new(scope: "commands", message_type: command_type)
actor.new_with_initialiser(5000, fn(subject) {
// Subscribe to events
let event_subscription = pubsub.subscribe(event_pubsub, "orders", process.self())
// Create command subject and register it
let command_subject = process.new_subject()
let assert Ok(_) = registry.register(command_registry, "order_processor", command_subject, "v1.0")
// Compose messages using selector
let selector =
process.new_selector()
|> process.select(subject)
|> process.select_map(command_subject, CommandMessage)
|> process.select_map(event_subscription.subject, PubSubMessage)
// Return initialized actor
actor.initialised(ProcessorState(status: "ready", processed: 0))
|> actor.selecting(selector)
|> actor.returning(subject)
|> Ok
})
|> actor.on_message(handle_processor_message)
|> actor.start()
}
fn handle_processor_message(state, message) {
case message {
CommandMessage(ProcessOrder(id, reply_with)) -> {
// Handle direct command
process.send(reply_with, True)
actor.continue(ProcessorState(..state, processed: state.processed + 1))
}
CommandMessage(GetStatus(reply_with)) -> {
// Return status
process.send(reply_with, state.status)
actor.continue(state)
}
PubSubMessage(OrderCreated(id, amount)) -> {
// React to broadcasted event
actor.continue(ProcessorState(..state, status: "Processing order " <> id))
}
PubSubMessage(SystemAlert(message)) -> {
// Handle system-wide alerts
actor.continue(ProcessorState(..state, status: "Alert: " <> message))
}
}
}
// Usage:
pub fn example_usage() {
let assert Ok(processor) = start_order_processor()
// Send commands via Registry
let assert Ok(status) = registry.call(command_registry, "order_processor", waiting: 1000, sending: GetStatus)
let assert Ok(_) = registry.send(command_registry, "order_processor", ProcessOrder("ORDER456", reply_subject))
// Broadcast events via PubSub
let reached = pubsub.publish(event_pubsub, "orders", OrderCreated("ORDER789", 150))
}
```
## Type Safety with MessageType
Glyn enforces type safety at both compile-time and runtime using `MessageType`:
```gleam
// Define MessageTypes with version strings for evolution
pub const chat_type: glyn.MessageType(ChatMessage) = glyn.MessageType("ChatMessage_v1")
pub const metric_type: glyn.MessageType(MetricEvent) = glyn.MessageType("MetricEvent_v1")
// Different MessageTypes are completely isolated
let chat_pubsub = pubsub.new(scope: "app", message_type: chat_type)
let metric_pubsub = pubsub.new(scope: "app", message_type: metric_type)
let chat_subscription = pubsub.subscribe(chat_pubsub, "general", process.self())
// This publishes to metric subscribers only - won't reach chat subscribers
let reached = pubsub.publish(metric_pubsub, "general", CounterIncrement("requests", 1))
// reached == 0 (no chat subscribers receive metric events)
```
## Distributed Clustering
Both PubSub and Registry work across Erlang distributed clusters:
```gleam
// Node A - Start subscriber and register service
let pubsub = pubsub.new(scope: "global_events", message_type: order_event_type)
let registry = registry.new(scope: "global_services", message_type: command_type)
let subscription = pubsub.subscribe(pubsub, "orders", process.self())
let assert Ok(_) = registry.register(registry, "order_service", command_subject, "v1.0")
// Node B - Publish event and call service (same scope and MessageType)
let pubsub = pubsub.new(scope: "global_events", message_type: order_event_type)
let registry = registry.new(scope: "global_services", message_type: command_type)
// Event published on Node B reaches subscriber on Node A
let reached = pubsub.publish(pubsub, "orders", OrderCreated("GLOBAL123", 200))
// Command sent to service registered on Node A
let assert Ok(result) = registry.call(registry, "order_service", waiting: 5000, sending: ProcessOrder("GLOBAL123"))
```
## API Reference
### PubSub Functions
```gleam
import glyn/pubsub
// Create PubSub system
pubsub.new(scope: String, message_type: MessageType(message)) -> PubSub(message)
// Subscribe to events
pubsub.subscribe(pubsub: PubSub(message), group: String, pid: Pid) -> Subscription(message, String)
// Publish events (returns subscriber count)
pubsub.publish(pubsub: PubSub(message), group: String, message: message) -> Int
// Get current subscribers
pubsub.subscribers(pubsub: PubSub(message), group: String) -> List(Pid)
// Unsubscribe
pubsub.unsubscribe(subscription: Subscription(message, group)) -> Nil
```
### Registry Functions
```gleam
import glyn/registry
// Create Registry system
registry.new(scope: String, message_type: MessageType(message)) -> Registry(message, metadata)
// Register process under name
registry.register(registry: Registry(message, metadata), name: String, subject: Subject(message), metadata: metadata) -> Result(Registration, String)
// Look up registered process
registry.lookup(registry: Registry(message, metadata), name: String) -> Result(#(Subject(message), metadata), String)
// Send message (fire and forget)
registry.send(registry: Registry(message, metadata), name: String, message: message) -> Result(Nil, String)
// Call and wait for reply
registry.call(registry: Registry(message, metadata), name: String, waiting: Int, sending: fn(Subject(reply)) -> message) -> Result(reply, String)
// Unregister process
registry.unregister(registration: Registration(message, metadata)) -> Result(Nil, String)
```
## Development
```sh
gleam test # Run tests
gleam docs build # Build documentation
```
## License
This project is licensed under the MIT License.