# claude-sdk-gleam

A Gleam SDK for the Anthropic Messages API with a built-in agentic tool-use loop, leveraging BEAM concurrency for parallel tool execution and fault isolation.

[![Package Version](https://img.shields.io/hexpm/v/claude)](https://hex.pm/packages/claude)
[![Hex Docs](https://img.shields.io/badge/hex-docs-ffaff3)](https://hexdocs.pm/claude/)

## What this is

`claude` provides a typed Gleam interface to the Anthropic Claude API. Beyond simple message sending, it includes a full agent loop that automatically handles multi-turn tool-use conversations: send a prompt, let the model call tools, execute those tools concurrently on the BEAM, feed results back, and repeat until the model produces a final response.

Key features:

- Type-safe API for the Anthropic Messages endpoint
- Automatic agent loop with configurable iteration limits
- Concurrent tool execution -- each tool call runs in its own BEAM process
- Event streaming via callbacks or OTP actor message passing
- SSE streaming support for the Messages API
- Extended thinking support
- Builder-pattern configuration

## Requirements

- **Gleam** >= 1.0
- **Erlang/OTP** >= 27 (required by `gleam_json` v3)
- An **Anthropic API key** (set as `ANTHROPIC_API_KEY` or passed directly)

## Installation

Add `claude` to your Gleam project:

```sh
gleam add claude
```

## Quick start

```gleam
import claude
import gleam/io

const weather_schema = "{
  \"type\": \"object\",
  \"properties\": {
    \"location\": {
      \"type\": \"string\",
      \"description\": \"The city and state, e.g. San Francisco, CA\"
    }
  },
  \"required\": [\"location\"]
}"

pub fn main() {
  // Create a client from the ANTHROPIC_API_KEY environment variable
  let assert Ok(client) = claude.from_env()

  // Define a tool
  let weather_tool =
    claude.tool("get_weather", "Get the current weather for a location", weather_schema)

  // Handle tool calls
  let handler = fn(name, _input) {
    case name {
      "get_weather" ->
        Ok("{\"temperature\": 72, \"condition\": \"sunny\"}")
      _ -> Error("Unknown tool: " <> name)
    }
  }

  // Run the agent loop
  case claude.run(client, "What's the weather in San Francisco?", [weather_tool], handler) {
    Ok(result) -> io.println(claude.result_text(result))
    Error(_err) -> io.println("Agent error")
  }
}
```

## Core concepts

### Client configuration

Create a client with an API key directly or from the environment:

```gleam
// Direct API key
let client = claude.new("sk-ant-...")

// From ANTHROPIC_API_KEY environment variable
let assert Ok(client) = claude.from_env()
```

The client defaults to:
- Base URL: `https://api.anthropic.com`
- Model: `claude-sonnet-4-5-20250929`
- Max tokens: `4096`

Override defaults with the builder functions in `claude/client`, each of which takes and returns a `client.Config`:

```gleam
import claude/client

let client =
  claude.new("sk-ant-...")
  |> client.with_model("claude-opus-4-5-20251101")
  |> client.with_max_tokens(8192)
  |> client.with_base_url("https://my-proxy.example.com")
```

### Defining tools

Tools are defined with a name, description, and a JSON Schema string for the input parameters:

```gleam
let tool = claude.tool(
  "get_weather",
  "Get the current weather for a location",
  "{\"type\": \"object\", \"properties\": {\"location\": {\"type\": \"string\"}}, \"required\": [\"location\"]}",
)
```

The `input_schema` is a raw JSON string that conforms to JSON Schema. The model uses this schema to understand what arguments to provide.
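
Hand-escaping JSON inside a Gleam string is easy to get wrong. As a minimal sketch, the same schema can be built with `gleam_json` and serialised to a string; the function name `weather_schema` is only illustrative and is not part of this SDK:

```gleam
import gleam/json

// Hypothetical helper: builds the `get_weather` input schema as a JSON string.
pub fn weather_schema() -> String {
  json.object([
    #("type", json.string("object")),
    #(
      "properties",
      json.object([
        #(
          "location",
          json.object([
            #("type", json.string("string")),
            #(
              "description",
              json.string("The city and state, e.g. San Francisco, CA"),
            ),
          ]),
        ),
      ]),
    ),
    #("required", json.array(["location"], of: json.string)),
  ])
  |> json.to_string
}
```

The resulting string can be passed as the third argument to `claude.tool`.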

### Tool handlers

A tool handler is a function with the signature:

```gleam
fn(String, String) -> Result(String, String)
```

The first argument is the tool name, the second is the JSON string of input arguments. Return `Ok(json_string)` on success or `Error(error_message)` on failure. The agent loop dispatches all tool calls through a single handler function:

```gleam
let handler = fn(name, input) {
  case name {
    "get_weather" -> Ok("{\"temperature\": 72}")
    "calculator" -> handle_calculator(input)
    _ -> Error("Unknown tool: " <> name)
  }
}
```
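
Because the input arrives as a JSON string, a handler usually needs to decode it before doing any work. The sketch below assumes `gleam_json` v3 together with `gleam/dynamic/decode` from the standard library; the hard-coded temperature is a placeholder, not real data:

```gleam
import gleam/dynamic/decode
import gleam/json

// Hypothetical handler: decodes the `location` argument before answering.
pub fn handler(name: String, input: String) -> Result(String, String) {
  case name {
    "get_weather" -> {
      // Expect an object with a string field called "location"
      let decoder = {
        use location <- decode.field("location", decode.string)
        decode.success(location)
      }
      case json.parse(input, decoder) {
        Ok(location) ->
          json.object([
            #("location", json.string(location)),
            #("temperature", json.int(72)),
          ])
          |> json.to_string
          |> Ok
        Error(_) -> Error("Invalid input for get_weather: " <> input)
      }
    }
    _ -> Error("Unknown tool: " <> name)
  }
}
```

Returning `Error` for malformed input passes the problem back to the model as a tool error instead of crashing the handler process.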

### Running the agent loop

The primary entry point is `claude.run`:

```gleam
pub fn run(
  client: client.Config,
  prompt: String,
  tools: List(tool.Tool),
  handler: ToolHandler,
) -> Result(AgentResult, AgentError)
```

`AgentResult` contains:
- `final_message` -- the model's last `Message` response
- `messages` -- the full conversation history as `List(MessageParam)`
- `iterations` -- how many API round-trips were made
- `total_input_tokens` / `total_output_tokens` -- cumulative token usage

`AgentError` is one of:
- `ApiCallFailed(ApiError)` -- an API request failed
- `MaxIterationsReached(messages, iterations)` -- the iteration limit was hit
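
A caller will typically match on both outcomes. The following sketch assumes that `AgentResult`, `AgentError`, and the two error constructors are exported from `claude/agent`, and that the counters are `Int` fields; adjust the import if they live elsewhere:

```gleam
import claude/agent.{
  type AgentError, type AgentResult, ApiCallFailed, MaxIterationsReached,
}
import gleam/int

// Assumption: these types and constructors are exported from `claude/agent`.
pub fn describe(outcome: Result(AgentResult, AgentError)) -> String {
  case outcome {
    Ok(result) ->
      "Finished after "
      <> int.to_string(result.iterations)
      <> " round-trips, "
      <> int.to_string(result.total_input_tokens + result.total_output_tokens)
      <> " tokens in total"
    Error(ApiCallFailed(_api_error)) -> "Stopped: an API request failed"
    Error(MaxIterationsReached(_messages, iterations)) ->
      "Stopped: iteration limit reached after "
      <> int.to_string(iterations)
      <> " round-trips"
  }
}
```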

### What happens under the hood

1. The prompt is sent to the Messages API
2. The SDK checks the response's `stop_reason`
3. If `stop_reason` is `tool_use`, it extracts all `ToolUse` content blocks
4. Each tool call is executed concurrently in its own BEAM process
5. Tool results are collected (with per-tool timeout) and assembled into a `tool_result` message
6. The results are appended to the conversation history and sent back to the API
7. Steps 2--6 repeat until the model stops with `end_turn`, `max_tokens`, or another non-tool-use reason, or until `max_iterations` is reached
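
The steps above can be modelled with a small recursive function. This is a simplified illustration, not the SDK's implementation: `Reply`, `LoopError`, and `run_loop` are hypothetical stand-ins, the history is a plain list of strings, and tools run sequentially rather than concurrently.

```gleam
import gleam/list

// Hypothetical stand-in for a model response.
pub type Reply {
  // The model asks for tools to be run: #(tool_name, json_input)
  ToolUse(calls: List(#(String, String)))
  // The model produced its final answer
  EndTurn(text: String)
}

pub type LoopError {
  MaxIterationsReached(iterations: Int)
}

pub fn run_loop(
  history: List(String),
  call_model: fn(List(String)) -> Reply,
  handler: fn(String, String) -> Result(String, String),
  iteration: Int,
  max_iterations: Int,
) -> Result(String, LoopError) {
  case iteration >= max_iterations {
    True -> Error(MaxIterationsReached(iteration))
    False ->
      case call_model(history) {
        // Any non-tool-use stop ends the loop
        EndTurn(text) -> Ok(text)
        ToolUse(calls) -> {
          // Run each requested tool and turn failures into error text
          let results =
            list.map(calls, fn(call) {
              case handler(call.0, call.1) {
                Ok(output) -> output
                Error(message) -> "error: " <> message
              }
            })
          // Append the results to the history and ask the model again
          run_loop(
            list.append(history, results),
            call_model,
            handler,
            iteration + 1,
            max_iterations,
          )
        }
      }
  }
}
```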

## Advanced usage

### Custom AgentConfig with builder pattern

For full control, build an `AgentConfig` and use `claude.run_with_config`:

```gleam
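import claude
import claude/agent/config
import claude/types/tool
import gleam/io

let assert Ok(client) = claude.from_env()

// `schema` and `handler` are defined as in the earlier sections
let tools = [claude.tool("get_weather", "Get weather", schema)]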

let cfg =
  config.new(client: client, tools: tools, tool_handler: handler)
  |> config.with_system("You are a helpful weather assistant.")
  |> config.with_model("claude-opus-4-5-20251101")
  |> config.with_max_tokens(2048)
  |> config.with_max_iterations(5)
  |> config.with_thinking(10_000)
  |> config.with_tool_timeout(60_000)
  |> config.with_tool_choice(tool.Auto(disable_parallel: False))

case claude.run_with_config(cfg, "How's the weather in Tokyo?") {
  Ok(result) -> io.println(claude.result_text(result))
  Error(_) -> io.println("Error")
}
```

Available config builder functions:

| Function | Default | Description |
|---|---|---|
| `with_system(String)` | `None` | System prompt |
| `with_model(String)` | Client default | Model ID |
| `with_max_tokens(Int)` | Client default | Max output tokens per API call |
| `with_max_iterations(Int)` | `10` | Maximum agent loop iterations |
| `with_thinking(Int)` | `None` | Extended thinking token budget |
| `with_tool_timeout(Int)` | `30_000` | Per-tool execution timeout in ms |
| `with_tool_choice(ToolChoice)` | `None` (API default) | Tool selection strategy |

### Continuing conversations with `agent.run_with_messages`

To continue an existing conversation or provide pre-built message history:

```gleam
import claude
import claude/agent
import claude/agent/config
import claude/types/content
import claude/types/message
import gleam/io

let cfg = config.new(client: client, tools: tools, tool_handler: handler)

// Build a message history manually
let messages = [
  message.new_user("My name is Alice."),
  message.new_assistant_blocks([content.TextParam(text: "Hello Alice!")]),
  message.new_user("What's my name?"),
]

case agent.run_with_messages(cfg, messages) {
  Ok(result) -> io.println(claude.result_text(result))
  Error(_) -> io.println("Error")
}
```
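
A common use is a follow-up question that builds on an earlier run. The sketch below assumes that `AgentResult.messages` already includes the assistant's final reply, so appending a new user message yields a valid history; `ask_twice` is a hypothetical helper name:

```gleam
import claude/agent
import claude/agent/config.{type AgentConfig}
import claude/types/message
import gleam/list
import gleam/result

// Hypothetical helper: runs one prompt, then a follow-up on the same history.
pub fn ask_twice(cfg: AgentConfig, first: String, second: String) {
  // Stop early if the first run fails
  use first_result <- result.try(agent.run(cfg, first))
  // Assumption: `messages` ends with the assistant's last reply
  let history =
    list.append(first_result.messages, [message.new_user(second)])
  agent.run_with_messages(cfg, history)
}
```

The second `AgentResult` reports only the iterations and token usage of the follow-up run, assuming the counters are not carried over between calls.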

### Concurrent tool execution

When the model returns multiple tool calls in a single response, the SDK executes them concurrently. Each tool call runs in its own BEAM process via `tool_runner.execute_concurrent`. Results are collected with a per-tool timeout (default 30 seconds, configurable with `config.with_tool_timeout`). If a tool exceeds the timeout, the SDK records `Error("Tool execution timed out")` as that tool's result; the other results are unaffected.

This happens automatically -- no additional configuration is needed. The concurrency is a natural fit for the BEAM: each tool runs in a lightweight process with its own heap and fault isolation.
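
To illustrate the pattern (not the actual `tool_runner` code), the sketch below spawns one unlinked process per call and collects replies in order. It assumes `gleam_erlang` 1.0 or later, where `process.spawn_unlinked` is available; `run_all` is a hypothetical name.

```gleam
import gleam/erlang/process
import gleam/list

// Hypothetical sketch of process-per-tool execution with a receive timeout.
pub fn run_all(
  calls: List(#(String, String)),
  handler: fn(String, String) -> Result(String, String),
  timeout_ms: Int,
) -> List(Result(String, String)) {
  calls
  // Start one process per call, each replying on its own subject
  |> list.map(fn(call) {
    let reply = process.new_subject()
    let _pid =
      process.spawn_unlinked(fn() {
        process.send(reply, handler(call.0, call.1))
      })
    reply
  })
  // Collect the replies in the original order
  |> list.map(fn(reply) {
    case process.receive(reply, timeout_ms) {
      Ok(result) -> result
      // Nothing arrived in time: the tool is slow or its process crashed
      Error(Nil) -> Error("Tool execution timed out")
    }
  })
}
```

Because the processes are unlinked, a crashing handler never sends a reply and simply shows up as a timeout. In this sketch the waits happen one after another, so a later tool effectively gets more than `timeout_ms` if earlier tools were slow to reply.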

### Event streaming with `actor.run_with_events` and `actor.start`

The `claude/agent/actor` module provides two ways to observe agent progress in real time.

**Synchronous callback-based events:**

```gleam
import claude/agent/actor
import gleam/io

actor.run_with_events(cfg, "What's 25 * 4?", fn(event) {
  case event {
    actor.Started(id) -> io.println("Started: " <> id)
    actor.AssistantResponse(_msg) -> io.println("Got response")
    actor.ToolExecuting(name, _id) -> io.println("Executing: " <> name)
    actor.ToolCompleted(id, _result) -> io.println("Tool done: " <> id)
    actor.Done(_result) -> io.println("Agent finished")
    actor.Failed(_error) -> io.println("Agent failed")
  }
})
```

**Asynchronous OTP actor pattern:**

The `actor.start` function spawns the agent in a separate BEAM process and sends events to a `Subject` that the caller can receive from:

```gleam
import claude
import claude/agent/actor
import gleam/erlang/process
import gleam/io

let events = process.new_subject()
let _pid = actor.start(cfg, "Hello", events)

// Receive the next event sent to `events`. The first event is normally
// `Started`, so call this repeatedly until `Done` or `Failed` arrives.
case process.receive(events, 30_000) {
  Ok(actor.Done(result)) -> io.println(claude.result_text(result))
  Ok(actor.Failed(_err)) -> io.println("Failed")
  _ -> io.println("Timeout or other event")
}
```
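
A single `process.receive` returns only one event, so waiting for the final result usually means looping. A minimal sketch follows, assuming the event type is exported as `actor.AgentEvent`; `wait_for_result` is a hypothetical helper:

```gleam
import claude
import claude/agent/actor
import gleam/erlang/process
import gleam/io

// Hypothetical helper: drains events until the agent finishes or goes quiet.
pub fn wait_for_result(events: process.Subject(actor.AgentEvent)) -> Nil {
  case process.receive(events, 30_000) {
    Ok(actor.Done(result)) -> io.println(claude.result_text(result))
    Ok(actor.Failed(_error)) -> io.println("Agent failed")
    Ok(actor.ToolExecuting(name, _id)) -> {
      io.println("Executing: " <> name)
      wait_for_result(events)
    }
    // Started, AssistantResponse, ToolCompleted: keep waiting
    Ok(_other) -> wait_for_result(events)
    Error(Nil) -> io.println("No event within 30 seconds")
  }
}
```

Call it from the same process that created the subject, for example `wait_for_result(events)` right after `actor.start`.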

The `AgentEvent` type covers the full lifecycle:

| Event | Description |
|---|---|
| `Started(session_id)` | Agent loop began |
| `AssistantResponse(message)` | Model returned a response |
| `ToolExecuting(tool_name, tool_id)` | A tool call is about to run |
| `ToolCompleted(tool_id, result)` | A tool call finished |
| `Done(result)` | Agent completed successfully |
| `Failed(error)` | Agent encountered an error |

### Direct Messages API access

For one-shot messages without the agent loop:

```gleam
// Simple helper
case claude.message(client, "What is 2 + 2?") {
  Ok(msg) -> io.println(claude.text_content(msg))
  Error(_) -> io.println("API error")
}
```

For full control over the Messages API call:

```gleam
import claude
import claude/messages
import claude/types/message
import gleam/io
import gleam/option.{None, Some}

case messages.create(
  config: client,
  model: Some("claude-opus-4-5-20251101"),
  max_tokens: Some(1024),
  messages: [message.new_user("Explain monads.")],
  tools: [],
  system: Some("You are a Haskell expert."),
  tool_choice: None,
  thinking: None,
) {
  // msg is a Message
  Ok(msg) -> io.println(claude.text_content(msg))
  // the error value is an ApiError
  Error(_api_error) -> io.println("API error")
}
```

### Streaming SSE events

The SDK supports streaming responses via `messages.create_stream`. Note that `gleam_httpc` buffers the full response, so events are not delivered incrementally: the SDK parses them from the complete SSE payload once the request finishes and returns them as a list:

```gleam
import claude/messages
import claude/streaming.{ContentBlockDelta, MessageStop, TextDelta}
import claude/types/message
import gleam/io
import gleam/list
import gleam/option.{None}

case messages.create_stream(
  config: client,
  model: None,
  max_tokens: None,
  messages: [message.new_user("Tell me a story.")],
  tools: [],
  system: None,
  tool_choice: None,
  thinking: None,
) {
  Ok(events) ->
    list.each(events, fn(event) {
      case event {
        ContentBlockDelta(index: _, delta: TextDelta(text)) ->
          io.print(text)
        MessageStop -> io.println("\n[done]")
        _ -> Nil
      }
    })
  Error(_) -> io.println("Stream error")
}
```

`StreamEvent` variants: `MessageStart`, `ContentBlockStart`, `ContentBlockDelta`, `ContentBlockStop`, `MessageDelta`, `MessageStop`, `Ping`, `UnknownEvent`.
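
Since the events arrive as a list, the full response text can be rebuilt with a fold. This is a small sketch assuming the type is exported as `streaming.StreamEvent` and that `TextDelta` carries a `String`, as in the example above; `collect_text` is a hypothetical helper:

```gleam
import claude/streaming.{type StreamEvent, ContentBlockDelta, TextDelta}
import gleam/list

// Hypothetical helper: concatenates all text deltas in arrival order.
pub fn collect_text(events: List(StreamEvent)) -> String {
  list.fold(events, "", fn(acc, event) {
    case event {
      ContentBlockDelta(index: _, delta: TextDelta(text)) -> acc <> text
      // Ignore every non-text event
      _ -> acc
    }
  })
}
```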

## API reference

### `claude` (top-level)

| Function | Description |
|---|---|
| `new(api_key) -> Config` | Create client with API key |
| `from_env() -> Result(Config, EnvError)` | Create client from `ANTHROPIC_API_KEY` |
| `tool(name, description, input_schema) -> Tool` | Define a tool |
| `run(client, prompt, tools, handler) -> Result(AgentResult, AgentError)` | Run the agent loop |
| `run_with_config(config, prompt) -> Result(AgentResult, AgentError)` | Run with custom config |
| `message(client, prompt) -> Result(Message, ApiError)` | Send a single message |
| `text_content(message) -> String` | Extract text from a Message |
| `result_text(result) -> String` | Extract text from an AgentResult |
| `version() -> String` | SDK version string |

### `claude/client`

| Function | Description |
|---|---|
| `new(api_key) -> Config` | Create client config |
| `with_model(config, model) -> Config` | Set default model |
| `with_base_url(config, url) -> Config` | Set API base URL |
| `with_max_tokens(config, n) -> Config` | Set default max tokens |

### `claude/agent`

| Function | Description |
|---|---|
| `run(config, prompt) -> Result(AgentResult, AgentError)` | Run agent loop |
| `run_with_messages(config, messages) -> Result(AgentResult, AgentError)` | Run with existing history |
| `extract_tool_calls(content) -> List(ContentBlock)` | Filter tool-use blocks |
| `build_tool_results_message(results) -> MessageParam` | Build tool results message |

### `claude/agent/config`

| Function | Description |
|---|---|
| `new(client, tools, tool_handler) -> AgentConfig` | Create agent config |
| `with_system(config, system) -> AgentConfig` | Set system prompt |
| `with_model(config, model) -> AgentConfig` | Set model |
| `with_max_tokens(config, n) -> AgentConfig` | Set max tokens |
| `with_max_iterations(config, n) -> AgentConfig` | Set iteration limit |
| `with_thinking(config, budget) -> AgentConfig` | Enable extended thinking |
| `with_tool_timeout(config, ms) -> AgentConfig` | Set tool timeout |
| `with_tool_choice(config, choice) -> AgentConfig` | Set tool choice strategy |

### `claude/agent/actor`

| Function | Description |
|---|---|
| `run_with_events(config, prompt, on_event) -> Result(AgentResult, AgentError)` | Synchronous with event callbacks |
| `start(config, prompt, caller) -> Pid` | Async agent in a new BEAM process |

### `claude/agent/tool_runner`

| Function | Description |
|---|---|
| `execute_concurrent(tool_calls, handler, timeout_ms) -> List(ToolResult)` | Run tools concurrently |

### `claude/messages`

| Function | Description |
|---|---|
| `create(...) -> Result(Message, ApiError)` | Send a Messages API request |
| `create_simple(config, message) -> Result(Message, ApiError)` | Simple one-shot message |
| `create_stream(...) -> Result(List(StreamEvent), ApiError)` | Streaming Messages API request |
| `build_request(...) -> Request(String)` | Build HTTP request without sending |
| `build_stream_request(...) -> Request(String)` | Build streaming HTTP request |

### `claude/streaming`

| Function | Description |
|---|---|
| `parse_sse(text) -> List(StreamEvent)` | Parse SSE payload into events |
| `parse_event(event_type, data) -> StreamEvent` | Parse a single SSE event |

### `claude/types/tool`

| Type | Description |
|---|---|
| `Tool(name, description, input_schema)` | Tool definition |
| `ToolChoice` | `Auto`, `Any`, `SpecificTool(name)`, `NoTools` -- each with `disable_parallel` flag |

### `claude/types/message`

| Type | Description |
|---|---|
| `Message` | Full API response message with content, usage, stop_reason |
| `MessageParam` | Message sent to the API (role + content) |
| `StopReason` | `EndTurn`, `ToolUseStop`, `MaxTokens`, `StopSequence`, `PauseTurn`, `Refusal` |
| `Usage` | Token counts (input, output, cache creation, cache read) |

### `claude/types/content`

| Type | Description |
|---|---|
| `ContentBlock` | Response content: `Text`, `Thinking`, `ToolUse`, `ServerToolUse`, `WebSearchResult` |
| `ContentBlockParam` | Request content: `TextParam`, `ImageParam`, `DocumentParam`, `ToolUseParam`, `ToolResultParam` |

### `claude/types/error`

| Type | Description |
|---|---|
| `ApiError` | `AuthenticationError`, `RateLimitError`, `BadRequestError`, `NotFoundError`, `ServerError`, `ConnectionError`, `TimeoutError`, `UnknownError` |

## Architecture

```
claude.gleam                    -- Top-level public API (facade)
claude/
  client.gleam                  -- Client config (API key, model, base URL)
  messages.gleam                -- HTTP layer: build and send Messages API requests
  streaming.gleam               -- SSE parser for streaming responses
  agent.gleam                   -- Core agent loop (recursive, synchronous)
  agent/
    config.gleam                -- AgentConfig type and builder functions
    tool_runner.gleam           -- Concurrent tool execution via BEAM processes
    actor.gleam                 -- Event-emitting agent loop + async OTP actor
  types/
    content.gleam               -- ContentBlock and ContentBlockParam types
    message.gleam               -- Message, MessageParam, Role, StopReason, Usage
    tool.gleam                  -- Tool and ToolChoice types
    error.gleam                 -- ApiError type and HTTP status mapping
  json/
    encode.gleam                -- JSON encoding for API request bodies
    decode.gleam                -- JSON decoding for API response bodies
```

The architecture takes advantage of the BEAM in three ways:

1. **Concurrent tool execution** -- When the model returns N tool calls, `tool_runner` spawns N lightweight BEAM processes that run in parallel. Results are collected with per-tool timeouts using `process.receive`. For I/O-bound tools, the wall-clock time of a batch approaches that of the slowest call rather than the sum of all calls.

2. **Fault isolation** -- Each tool runs in its own process with its own heap. If a tool handler crashes or times out, it does not affect the agent loop or other tool executions. The agent simply receives a timeout error for that specific tool.

3. **Actor-based event streaming** -- `actor.start` spawns the entire agent loop in a new BEAM process and communicates results via `Subject` message passing, fitting naturally into OTP supervision trees and concurrent application architectures.

## Development

Build the project:

```sh
gleam build
```

Run tests:

```sh
gleam test
```

Format code:

```sh
gleam format
```

Run an example (add the example file to `src/` first):

```sh
ANTHROPIC_API_KEY=sk-ant-... gleam run -m weather_agent
```

## License

MIT