# claude-sdk-gleam
A Gleam SDK for the Anthropic Messages API with a built-in agentic tool-use loop, leveraging BEAM concurrency for parallel tool execution and fault isolation.
[Package on Hex](https://hex.pm/packages/claude) | [Documentation on HexDocs](https://hexdocs.pm/claude/)
## What this is
The `claude` package provides a typed Gleam interface to the Anthropic Claude API. Beyond simple message sending, it includes a full agent loop that automatically handles multi-turn tool-use conversations: send a prompt, let the model call tools, execute those tools concurrently on the BEAM, feed the results back, and repeat until the model produces a final response.
Key features:
- Type-safe API for the Anthropic Messages endpoint
- Automatic agent loop with configurable iteration limits
- Concurrent tool execution -- each tool call runs in its own BEAM process
- Event streaming via callbacks or OTP actor message passing
- SSE streaming support for the Messages API
- Extended thinking support
- Builder-pattern configuration
## Requirements
- **Gleam** >= 1.0
- **Erlang/OTP** >= 27 (required by `gleam_json` v3)
- An **Anthropic API key** (set as `ANTHROPIC_API_KEY` or passed directly)
## Installation
Add `claude` to your Gleam project:
```sh
gleam add claude
```
## Quick start
```gleam
import claude
import gleam/io

const weather_schema = "{
  \"type\": \"object\",
  \"properties\": {
    \"location\": {
      \"type\": \"string\",
      \"description\": \"The city and state, e.g. San Francisco, CA\"
    }
  },
  \"required\": [\"location\"]
}"

pub fn main() {
  // Create a client from the ANTHROPIC_API_KEY environment variable
  let assert Ok(client) = claude.from_env()

  // Define a tool
  let weather_tool =
    claude.tool("get_weather", "Get the current weather for a location", weather_schema)

  // Handle tool calls
  let handler = fn(name, _input) {
    case name {
      "get_weather" ->
        Ok("{\"temperature\": 72, \"condition\": \"sunny\"}")
      _ -> Error("Unknown tool: " <> name)
    }
  }

  // Run the agent loop
  case claude.run(client, "What's the weather in San Francisco?", [weather_tool], handler) {
    Ok(result) -> io.println(claude.result_text(result))
    Error(_err) -> io.println("Agent error")
  }
}
```
## Core concepts
### Client configuration
Create a client with an API key directly or from the environment:
```gleam
// Direct API key
let client = claude.new("sk-ant-...")
// From ANTHROPIC_API_KEY environment variable
let assert Ok(client) = claude.from_env()
```
The client defaults to:
- Base URL: `https://api.anthropic.com`
- Model: `claude-sonnet-4-5-20250929`
- Max tokens: `4096`
Override defaults with the builder functions on `client.Config`:
```gleam
import claude
import claude/client

let client =
  claude.new("sk-ant-...")
  |> client.with_model("claude-opus-4-5-20251101")
  |> client.with_max_tokens(8192)
  |> client.with_base_url("https://my-proxy.example.com")
```
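Builder functions of this kind are usually thin wrappers around Gleam's record update syntax. The sketch below illustrates the pattern with a stand-in `Config` record; its fields are assumptions derived from the defaults listed above, not the SDK's actual definition.

```gleam
// Stand-in record for illustration only; the SDK's real `client.Config`
// may have different fields.
pub type Config {
  Config(api_key: String, base_url: String, model: String, max_tokens: Int)
}

// Start from the documented defaults.
pub fn new(api_key: String) -> Config {
  Config(
    api_key: api_key,
    base_url: "https://api.anthropic.com",
    model: "claude-sonnet-4-5-20250929",
    max_tokens: 4096,
  )
}

// Each builder returns a copy with a single field replaced, which is
// what makes the functions chainable with `|>`.
pub fn with_model(config: Config, model: String) -> Config {
  Config(..config, model: model)
}

pub fn with_max_tokens(config: Config, max_tokens: Int) -> Config {
  Config(..config, max_tokens: max_tokens)
}
```

Because every builder takes the config as its first argument and returns a new value, the order of the calls in a pipeline does not matter unless two of them set the same field.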
### Defining tools
Tools are defined with a name, description, and a JSON Schema string for the input parameters:
```gleam
let tool = claude.tool(
  "get_weather",
  "Get the current weather for a location",
  "{\"type\": \"object\", \"properties\": {\"location\": {\"type\": \"string\"}}, \"required\": [\"location\"]}",
)
```
The `input_schema` is a raw JSON string that conforms to JSON Schema. The model uses this schema to understand what arguments to provide.
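Hand-escaped JSON strings are easy to get wrong. One alternative, sketched here on the assumption that `gleam_json` is available in your project, is to build the schema with `gleam/json` and serialise it. The `weather_schema` function name is hypothetical.

```gleam
import gleam/json

// Builds the same schema as the literal string above and serialises it,
// so quoting and escaping are handled by the JSON encoder.
pub fn weather_schema() -> String {
  json.object([
    #("type", json.string("object")),
    #(
      "properties",
      json.object([
        #("location", json.object([#("type", json.string("string"))])),
      ]),
    ),
    #("required", json.array(["location"], of: json.string)),
  ])
  |> json.to_string
}
```

The resulting string can be passed as the third argument to `claude.tool`.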
### Tool handlers
A tool handler is a function with the signature:
```gleam
fn(String, String) -> Result(String, String)
```
The first argument is the tool name, the second is the JSON string of input arguments. Return `Ok(json_string)` on success or `Error(error_message)` on failure. The agent loop dispatches all tool calls through a single handler function:
```gleam
let handler = fn(name, input) {
  case name {
    "get_weather" -> Ok("{\"temperature\": 72}")
    "calculator" -> handle_calculator(input)
    _ -> Error("Unknown tool: " <> name)
  }
}
```
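Since the input arrives as a JSON string, a real handler usually decodes it before doing any work. The following is a minimal sketch using `gleam/json` and `gleam/dynamic/decode` (as shipped with `gleam_json` v3); the `weather_handler` name and the hard-coded temperature are placeholders.

```gleam
import gleam/dynamic/decode
import gleam/json

pub fn weather_handler(name: String, input: String) -> Result(String, String) {
  case name {
    "get_weather" -> {
      // Pull the string at the "location" key out of the input object.
      let decoder = decode.at(["location"], decode.string)
      case json.parse(from: input, using: decoder) {
        Ok(location) ->
          // Placeholder result; a real tool would look the weather up here.
          json.object([
            #("location", json.string(location)),
            #("temperature", json.int(72)),
          ])
          |> json.to_string
          |> Ok
        Error(_) -> Error("get_weather: expected an object with a string `location`")
      }
    }
    _ -> Error("Unknown tool: " <> name)
  }
}
```

Returning `Error` for malformed input, rather than crashing, lets the error message be reported back to the model as the tool result.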
### Running the agent loop
The primary entry point is `claude.run`:
```gleam
pub fn run(
  client: client.Config,
  prompt: String,
  tools: List(tool.Tool),
  handler: ToolHandler,
) -> Result(AgentResult, AgentError)
```
`AgentResult` contains:
- `final_message` -- the model's last `Message` response
- `messages` -- the full conversation history as `List(MessageParam)`
- `iterations` -- how many API round-trips were made
- `total_input_tokens` / `total_output_tokens` -- cumulative token usage
`AgentError` is one of:
- `ApiCallFailed(ApiError)` -- an API request failed
- `MaxIterationsReached(messages, iterations)` -- the iteration limit was hit
### What happens under the hood
1. The prompt is sent to the Messages API
2. The SDK checks the response's `stop_reason`
3. If `stop_reason` is `tool_use`, it extracts all `ToolUse` content blocks
4. Each tool call is executed concurrently in its own BEAM process
5. Tool results are collected (with per-tool timeout) and assembled into a `tool_result` message
6. The results are appended to the conversation history and sent back to the API
7. Steps 2--6 repeat until the model stops with `end_turn`, `max_tokens`, or another non-tool-use reason, or until `max_iterations` is reached
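The steps above can be sketched as a small recursive function. This is not the SDK's implementation: the `Reply` and `LoopError` types, the `send` callback standing in for the API call, and the use of plain strings for history are all simplifications made for illustration, and tools run sequentially here rather than concurrently.

```gleam
import gleam/list

// Simplified stand-in for an API response.
pub type Reply {
  // The model requested tools: a list of #(tool_name, json_input)
  ToolUse(calls: List(#(String, String)))
  // The model produced its final text
  EndTurn(text: String)
}

pub type LoopError {
  MaxIterationsReached(iterations: Int)
}

// Returns the final text together with the number of round-trips made.
pub fn loop(
  history: List(String),
  send: fn(List(String)) -> Reply,
  handler: fn(String, String) -> Result(String, String),
  iteration: Int,
  max_iterations: Int,
) -> Result(#(String, Int), LoopError) {
  case iteration >= max_iterations {
    True -> Error(MaxIterationsReached(iteration))
    False ->
      case send(history) {
        EndTurn(text) -> Ok(#(text, iteration + 1))
        ToolUse(calls) -> {
          // Run each requested tool and turn failures into error text
          // so the model can see what went wrong.
          let results =
            list.map(calls, fn(call) {
              case handler(call.0, call.1) {
                Ok(output) -> output
                Error(message) -> "error: " <> message
              }
            })
          // Append the results to the history and go around again.
          let history = list.append(history, results)
          loop(history, send, handler, iteration + 1, max_iterations)
        }
      }
  }
}
```

The iteration check at the top is what bounds the loop when a model keeps requesting tools.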
## Advanced usage
### Custom AgentConfig with builder pattern
For full control, build an `AgentConfig` and use `claude.run_with_config`:
```gleam
import claude
import claude/agent/config
import claude/types/tool
import gleam/io

// `schema` and `handler` are assumed to be defined as in the sections above
let assert Ok(client) = claude.from_env()
let tools = [claude.tool("get_weather", "Get weather", schema)]
let cfg =
  config.new(client: client, tools: tools, tool_handler: handler)
  |> config.with_system("You are a helpful weather assistant.")
  |> config.with_model("claude-opus-4-5-20251101")
  |> config.with_max_tokens(2048)
  |> config.with_max_iterations(5)
  |> config.with_thinking(10_000)
  |> config.with_tool_timeout(60_000)
  |> config.with_tool_choice(tool.Auto(disable_parallel: False))

case claude.run_with_config(cfg, "How's the weather in Tokyo?") {
  Ok(result) -> io.println(claude.result_text(result))
  Error(_) -> io.println("Error")
}
```
Available config builder functions:
| Function | Default | Description |
|---|---|---|
| `with_system(String)` | `None` | System prompt |
| `with_model(String)` | Client default | Model ID |
| `with_max_tokens(Int)` | Client default | Max output tokens per API call |
| `with_max_iterations(Int)` | `10` | Maximum agent loop iterations |
| `with_thinking(Int)` | `None` | Extended thinking token budget |
| `with_tool_timeout(Int)` | `30_000` | Per-tool execution timeout in ms |
| `with_tool_choice(ToolChoice)` | `None` (API default) | Tool selection strategy |
### Continuing conversations with `agent.run_with_messages`
To continue an existing conversation or provide pre-built message history:
```gleam
import claude
import claude/agent
import claude/agent/config
import claude/types/content
import claude/types/message
import gleam/io

// `client`, `tools`, and `handler` are assumed to be defined as above
let cfg = config.new(client: client, tools: tools, tool_handler: handler)

// Build a message history manually
let messages = [
  message.new_user("My name is Alice."),
  message.new_assistant_blocks([content.TextParam(text: "Hello Alice!")]),
  message.new_user("What's my name?"),
]

case agent.run_with_messages(cfg, messages) {
  Ok(result) -> io.println(claude.result_text(result))
  Error(_) -> io.println("Error")
}
```
### Concurrent tool execution
When the model returns multiple tool calls in a single response, the SDK executes them concurrently. Each tool call runs in its own BEAM process via `tool_runner.execute_concurrent`. Results are collected with a per-tool timeout (default 30 seconds). If a tool exceeds the timeout, it returns `Error("Tool execution timed out")` for that tool while other results are unaffected.
This happens automatically -- no additional configuration is needed. The concurrency is a natural fit for the BEAM: each tool runs in a lightweight process with its own heap and fault isolation.
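The general technique can be sketched with `gleam/erlang/process`. This is an illustration, not the code of `tool_runner.execute_concurrent`: the `run_all` name is hypothetical, and it assumes `gleam_erlang` 1.0 or later, where `process.spawn_unlinked` is available (earlier releases used `process.start`).

```gleam
import gleam/erlang/process
import gleam/list

pub fn run_all(
  calls: List(#(String, String)),
  handler: fn(String, String) -> Result(String, String),
  timeout_ms: Int,
) -> List(Result(String, String)) {
  // Start every call before waiting on any of them, so they all run
  // at the same time. Each call gets its own reply subject, which keeps
  // results matched to the call that produced them.
  let pending =
    list.map(calls, fn(call) {
      let reply = process.new_subject()
      // Unlinked: a crash in the handler does not take the caller down.
      let _pid =
        process.spawn_unlinked(fn() {
          process.send(reply, handler(call.0, call.1))
        })
      reply
    })

  // Collect in call order. A call that crashed or is still running
  // when its wait expires is reported as a timeout.
  list.map(pending, fn(reply) {
    case process.receive(reply, timeout_ms) {
      Ok(result) -> result
      Error(Nil) -> Error("Tool execution timed out")
    }
  })
}
```

Note that in this sketch each wait is applied in turn, so the total time spent collecting can exceed a single timeout when several calls hang; a stricter version would track one shared deadline.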
### Event streaming with `actor.run_with_events` and `actor.start`
The `claude/agent/actor` module provides two ways to observe agent progress in real time.
**Synchronous callback-based events:**
```gleam
import claude/agent/actor
import gleam/io

actor.run_with_events(cfg, "What's 25 * 4?", fn(event) {
  case event {
    actor.Started(id) -> io.println("Started: " <> id)
    actor.AssistantResponse(_msg) -> io.println("Got response")
    actor.ToolExecuting(name, _id) -> io.println("Executing: " <> name)
    actor.ToolCompleted(id, _result) -> io.println("Tool done: " <> id)
    actor.Done(_result) -> io.println("Agent finished")
    actor.Failed(_error) -> io.println("Agent failed")
  }
})
```
**Asynchronous OTP actor pattern:**
The `actor.start` function spawns the agent in a separate BEAM process and sends events to a `Subject` that the caller can receive from:
```gleam
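import claude
import claude/agent/actor
import gleam/erlang/process
import gleam/io

// Keep receiving until a terminal event arrives. A single receive would
// only see the first event, which is normally `Started`.
fn await_done(events) {
  case process.receive(events, 30_000) {
    Ok(actor.Done(result)) -> io.println(claude.result_text(result))
    Ok(actor.Failed(_err)) -> io.println("Failed")
    Ok(_other) -> await_done(events)
    Error(Nil) -> io.println("Timed out waiting for an event")
  }
}

// The subject belongs to the calling process; the agent sends to it
let events = process.new_subject()
let _pid = actor.start(cfg, "Hello", events)
await_done(events)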
```
The `AgentEvent` type covers the full lifecycle:
| Event | Description |
|---|---|
| `Started(session_id)` | Agent loop began |
| `AssistantResponse(message)` | Model returned a response |
| `ToolExecuting(tool_name, tool_id)` | A tool call is about to run |
| `ToolCompleted(tool_id, result)` | A tool call finished |
| `Done(result)` | Agent completed successfully |
| `Failed(error)` | Agent encountered an error |
### Direct Messages API access
For one-shot messages without the agent loop:
```gleam
// Simple helper
case claude.message(client, "What is 2 + 2?") {
  Ok(msg) -> io.println(claude.text_content(msg))
  Error(_) -> io.println("API error")
}
```
For full control over the Messages API call:
```gleam
import claude
import claude/messages
import claude/types/message
import gleam/io
import gleam/option.{None, Some}

case messages.create(
  config: client,
  model: Some("claude-opus-4-5-20251101"),
  max_tokens: Some(1024),
  messages: [message.new_user("Explain monads.")],
  tools: [],
  system: Some("You are a Haskell expert."),
  tool_choice: None,
  thinking: None,
) {
  // msg is a Message
  Ok(msg) -> io.println(claude.text_content(msg))
  // the error value is an ApiError
  Error(_api_error) -> io.println("API error")
}
```
### Streaming SSE events
The SDK supports streaming responses via `messages.create_stream`. Because `gleam_httpc` buffers the full response, events are not delivered incrementally: the SDK parses the complete SSE payload once the request finishes and returns the events as a list:
```gleam
import claude/messages
import claude/streaming.{ContentBlockDelta, MessageStop, TextDelta}
import claude/types/message
import gleam/io
import gleam/list
import gleam/option.{None}

case messages.create_stream(
  config: client,
  model: None,
  max_tokens: None,
  messages: [message.new_user("Tell me a story.")],
  tools: [],
  system: None,
  tool_choice: None,
  thinking: None,
) {
  Ok(events) ->
    list.each(events, fn(event) {
      case event {
        ContentBlockDelta(index: _, delta: TextDelta(text)) ->
          io.print(text)
        MessageStop -> io.println("\n[done]")
        _ -> Nil
      }
    })
  Error(_) -> io.println("Stream error")
}
```
`StreamEvent` variants: `MessageStart`, `ContentBlockStart`, `ContentBlockDelta`, `ContentBlockStop`, `MessageDelta`, `MessageStop`, `Ping`, `UnknownEvent`.
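To make the buffered approach concrete, here is a rough sketch of splitting a complete SSE payload into `#(event, data)` pairs before any JSON decoding. It is not the SDK's `parse_sse`; the `split_events` name is hypothetical, and the sketch assumes `\n` line endings and a single `data:` line per event.

```gleam
import gleam/list
import gleam/string

pub fn split_events(payload: String) -> List(#(String, String)) {
  payload
  // SSE events are separated by a blank line.
  |> string.split("\n\n")
  |> list.filter_map(fn(block) {
    // Turn "name: value" lines into #(name, value) pairs, skipping
    // anything that does not have that shape.
    let fields =
      block
      |> string.split("\n")
      |> list.filter_map(fn(line) { string.split_once(line, ": ") })

    // Keep only blocks that carry both an event name and a data line.
    case list.key_find(fields, "event"), list.key_find(fields, "data") {
      Ok(event), Ok(data) -> Ok(#(event, data))
      _, _ -> Error(Nil)
    }
  })
}
```

Each pair could then be mapped to a typed event based on the event name, with unrecognised names falling into a catch-all variant such as `UnknownEvent`.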
## API reference
### `claude` (top-level)
| Function | Description |
|---|---|
| `new(api_key) -> Config` | Create client with API key |
| `from_env() -> Result(Config, EnvError)` | Create client from `ANTHROPIC_API_KEY` |
| `tool(name, description, input_schema) -> Tool` | Define a tool |
| `run(client, prompt, tools, handler) -> Result(AgentResult, AgentError)` | Run the agent loop |
| `run_with_config(config, prompt) -> Result(AgentResult, AgentError)` | Run with custom config |
| `message(client, prompt) -> Result(Message, ApiError)` | Send a single message |
| `text_content(message) -> String` | Extract text from a Message |
| `result_text(result) -> String` | Extract text from an AgentResult |
| `version() -> String` | SDK version string |
### `claude/client`
| Function | Description |
|---|---|
| `new(api_key) -> Config` | Create client config |
| `with_model(config, model) -> Config` | Set default model |
| `with_base_url(config, url) -> Config` | Set API base URL |
| `with_max_tokens(config, n) -> Config` | Set default max tokens |
### `claude/agent`
| Function | Description |
|---|---|
| `run(config, prompt) -> Result(AgentResult, AgentError)` | Run agent loop |
| `run_with_messages(config, messages) -> Result(AgentResult, AgentError)` | Run with existing history |
| `extract_tool_calls(content) -> List(ContentBlock)` | Filter tool-use blocks |
| `build_tool_results_message(results) -> MessageParam` | Build tool results message |
### `claude/agent/config`
| Function | Description |
|---|---|
| `new(client, tools, tool_handler) -> AgentConfig` | Create agent config |
| `with_system(config, system) -> AgentConfig` | Set system prompt |
| `with_model(config, model) -> AgentConfig` | Set model |
| `with_max_tokens(config, n) -> AgentConfig` | Set max tokens |
| `with_max_iterations(config, n) -> AgentConfig` | Set iteration limit |
| `with_thinking(config, budget) -> AgentConfig` | Enable extended thinking |
| `with_tool_timeout(config, ms) -> AgentConfig` | Set tool timeout |
| `with_tool_choice(config, choice) -> AgentConfig` | Set tool choice strategy |
### `claude/agent/actor`
| Function | Description |
|---|---|
| `run_with_events(config, prompt, on_event) -> Result(AgentResult, AgentError)` | Synchronous with event callbacks |
| `start(config, prompt, caller) -> Pid` | Async agent in a new BEAM process |
### `claude/agent/tool_runner`
| Function | Description |
|---|---|
| `execute_concurrent(tool_calls, handler, timeout_ms) -> List(ToolResult)` | Run tools concurrently |
### `claude/messages`
| Function | Description |
|---|---|
| `create(...) -> Result(Message, ApiError)` | Send a Messages API request |
| `create_simple(config, message) -> Result(Message, ApiError)` | Simple one-shot message |
| `create_stream(...) -> Result(List(StreamEvent), ApiError)` | Streaming Messages API request |
| `build_request(...) -> Request(String)` | Build HTTP request without sending |
| `build_stream_request(...) -> Request(String)` | Build streaming HTTP request |
### `claude/streaming`
| Function | Description |
|---|---|
| `parse_sse(text) -> List(StreamEvent)` | Parse SSE payload into events |
| `parse_event(event_type, data) -> StreamEvent` | Parse a single SSE event |
### `claude/types/tool`
| Type | Description |
|---|---|
| `Tool(name, description, input_schema)` | Tool definition |
| `ToolChoice` | `Auto`, `Any`, `SpecificTool(name)`, `NoTools` -- each with `disable_parallel` flag |
### `claude/types/message`
| Type | Description |
|---|---|
| `Message` | Full API response message with content, usage, stop_reason |
| `MessageParam` | Message sent to the API (role + content) |
| `StopReason` | `EndTurn`, `ToolUseStop`, `MaxTokens`, `StopSequence`, `PauseTurn`, `Refusal` |
| `Usage` | Token counts (input, output, cache creation, cache read) |
### `claude/types/content`
| Type | Description |
|---|---|
| `ContentBlock` | Response content: `Text`, `Thinking`, `ToolUse`, `ServerToolUse`, `WebSearchResult` |
| `ContentBlockParam` | Request content: `TextParam`, `ImageParam`, `DocumentParam`, `ToolUseParam`, `ToolResultParam` |
### `claude/types/error`
| Type | Description |
|---|---|
| `ApiError` | `AuthenticationError`, `RateLimitError`, `BadRequestError`, `NotFoundError`, `ServerError`, `ConnectionError`, `TimeoutError`, `UnknownError` |
## Architecture
```
claude.gleam            -- Top-level public API (facade)
claude/
  client.gleam          -- Client config (API key, model, base URL)
  messages.gleam        -- HTTP layer: build and send Messages API requests
  streaming.gleam       -- SSE parser for streaming responses
  agent.gleam           -- Core agent loop (recursive, synchronous)
  agent/
    config.gleam        -- AgentConfig type and builder functions
    tool_runner.gleam   -- Concurrent tool execution via BEAM processes
    actor.gleam         -- Event-emitting agent loop + async OTP actor
  types/
    content.gleam       -- ContentBlock and ContentBlockParam types
    message.gleam       -- Message, MessageParam, Role, StopReason, Usage
    tool.gleam          -- Tool and ToolChoice types
    error.gleam         -- ApiError type and HTTP status mapping
  json/
    encode.gleam        -- JSON encoding for API request bodies
    decode.gleam        -- JSON decoding for API response bodies
```
The architecture takes advantage of the BEAM in three ways:
1. **Concurrent tool execution** -- When the model returns N tool calls, `tool_runner` spawns N lightweight BEAM processes that run in parallel. Results are collected with per-tool timeouts using `process.receive`. For I/O-bound tools, the wall-clock time of a batch then approaches that of the slowest call rather than the sum of all calls.
2. **Fault isolation** -- Each tool runs in its own process with its own heap. If a tool handler crashes or times out, it does not affect the agent loop or other tool executions. The agent simply receives a timeout error for that specific tool.
3. **Actor-based event streaming** -- `actor.start` spawns the entire agent loop in a new BEAM process and communicates results via `Subject` message passing, fitting naturally into OTP supervision trees and concurrent application architectures.
## Development
Build the project:
```sh
gleam build
```
Run tests:
```sh
gleam test
```
Format code:
```sh
gleam format
```
Run an example (add the example file to `src/` first):
```sh
ANTHROPIC_API_KEY=sk-ant-... gleam run -m weather_agent
```
## License
MIT