
# Futures and Async Operations

This guide covers asynchronous request handling in Tinkex using the `Future` module. Learn how to start polling operations, await results, handle queue states, and implement custom observers for production workloads.

## Overview

Tinkex follows an async-by-default design: long-running operations (sampling, training, checkpoint creation) return immediately with a request ID, and the client then polls the server until the request completes. The `Future` module provides the client-side polling abstraction that:

- Returns `Task.t()` so you can integrate with your concurrency model
- Implements exponential backoff with configurable timeouts
- Emits telemetry events for queue state transitions **and** future errors/timeouts
- Supports custom observers via the `QueueStateObserver` behaviour

This approach decouples request initiation from result retrieval, enabling concurrent operations without blocking your application.

## How Futures Work

When you call a Tinkex API that creates a long-running request (e.g., `SamplingClient.sample/4` or `TrainingClient.forward_backward/3`), the server returns a request ID immediately. The client then polls `/api/v1/retrieve_future` until the request completes, fails, or times out.

**Polling flow for a server-side future:**

1. Client calls `Future.poll/2` with request ID and config
2. `Future.poll/2` returns a `Task.t()` that repeatedly calls `retrieve_future`
3. Server responds with one of:
   - `%FutureCompletedResponse{}` → poll returns `{:ok, result}`
   - `%FuturePendingResponse{}` → sleep with exponential backoff, retry
   - `%FutureFailedResponse{}` → categorize error, retry or fail
   - `%TryAgainResponse{}` → emit queue state telemetry, sleep, retry
   - HTTP 410 (expired promise) → treated as retryable; recreate the request
4. Client awaits the task to get the final result
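
The flow above can be modeled in a few lines of plain Elixir. The sketch below is a simplified stand-in, not Tinkex's implementation: `PollLoopSketch`, the tagged tuples returned by `fetch_fun`, and the `:max_attempts` option are all hypothetical, and an attempt cap takes the place of the real polling deadline.

```elixir
defmodule PollLoopSketch do
  # Hypothetical stand-ins for the server responses listed above:
  #   {:completed, result} | :pending | {:try_again, queue_state}
  #   | {:failed, :user | :server, message}

  # Starts the loop in a task and returns the task immediately.
  def poll(fetch_fun, opts \\ []) do
    sleep_fun = Keyword.get(opts, :sleep_fun, &Process.sleep/1)
    max_attempts = Keyword.get(opts, :max_attempts, 10)
    Task.async(fn -> loop(fetch_fun, sleep_fun, 0, max_attempts) end)
  end

  defp loop(_fetch_fun, _sleep_fun, attempt, max_attempts) when attempt >= max_attempts do
    {:error, :timeout}
  end

  defp loop(fetch_fun, sleep_fun, attempt, max_attempts) do
    case fetch_fun.() do
      {:completed, result} ->
        {:ok, result}

      {:failed, :user, message} ->
        # Permanent failure: stop polling.
        {:error, {:user, message}}

      _pending_or_retryable ->
        # :pending, {:try_again, _} and server failures back off and retry.
        sleep_fun.(min(Integer.pow(2, attempt) * 1_000, 30_000))
        loop(fetch_fun, sleep_fun, attempt + 1, max_attempts)
    end
  end
end

# Script three responses and poll without real sleeping.
{:ok, script} =
  Agent.start_link(fn ->
    [:pending, {:try_again, :paused_capacity}, {:completed, %{tokens: [1, 2, 3]}}]
  end)

fetch_fun = fn -> Agent.get_and_update(script, fn [next | rest] -> {next, rest} end) end
task = PollLoopSketch.poll(fetch_fun, sleep_fun: fn _ms -> :ok end)
{:ok, %{tokens: [1, 2, 3]}} = Task.await(task)
```

Passing the fetch and sleep functions in as arguments keeps the loop testable without a server, which mirrors the role of the `:sleep_fun` option described later in this guide.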

**Backoff strategy:**

- Initial backoff: 1 second
- Max backoff: 30 seconds
- Formula: `min(2^iteration * 1000ms, 30000ms)`
- `TryAgainResponse` may provide `retry_after_ms` to override
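
As a worked example of the formula, the following sketch computes the delay for the first few iterations. `BackoffSketch` is a hypothetical module, and letting `retry_after_ms` replace the computed delay outright (without capping it) is an assumption about how the override behaves.

```elixir
defmodule BackoffSketch do
  @initial_ms 1_000
  @max_ms 30_000

  # Delay before the next poll. `iteration` starts at 0.
  def delay_ms(iteration, retry_after_ms \\ nil)

  # Assumption: a server-provided hint takes precedence over the formula.
  def delay_ms(_iteration, retry_after_ms)
      when is_integer(retry_after_ms) and retry_after_ms >= 0 do
    retry_after_ms
  end

  def delay_ms(iteration, _no_hint) when is_integer(iteration) and iteration >= 0 do
    min(Integer.pow(2, iteration) * @initial_ms, @max_ms)
  end
end

Enum.map(0..6, &BackoffSketch.delay_ms/1)
# => [1000, 2000, 4000, 8000, 16000, 30000, 30000]
```

The cap is reached at iteration 5, since `2^5 * 1000ms = 32000ms` exceeds the 30-second maximum.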

## Starting a Poll Operation

Use `Future.poll/2` to begin polling a server-side future. It returns a `Task.t()` immediately, allowing you to await the result later or run multiple polls concurrently.

```elixir
alias Tinkex.Future

# Poll a request by ID
task = Future.poll("req-abc123", config: config)

# Poll from a response map
response = %{"request_id" => "req-abc123", "status" => "pending"}
task = Future.poll(response, config: config)

# With custom timeouts
task = Future.poll("req-abc123",
  config: config,
  timeout: 60_000,        # overall polling deadline (60s)
  http_timeout: 5_000     # per-request HTTP timeout (5s)
)
```

**Options:**

- `:config` (required) — `Tinkex.Config.t()` with API credentials
- `:timeout` — polling deadline in milliseconds (default: `:infinity`)
- `:http_timeout` — per-request timeout (default: `config.timeout`)
- `:queue_state_observer` — module implementing `QueueStateObserver` behaviour
- `:telemetry_metadata` — additional metadata for telemetry events
- `:sleep_fun` — custom sleep function for testing (default: `&Process.sleep/1`)

## Awaiting a Single Future

Use `Future.await/2` to block until a polling task completes. This wraps `Task.await/2` but converts exits and timeouts into `{:error, %Tinkex.Error{type: :api_timeout}}` tuples instead of exiting the caller.

```elixir
# Basic await
task = Future.poll("req-abc123", config: config)
case Future.await(task, 30_000) do
  {:ok, result} ->
    IO.inspect(result, label: "success")

  {:error, %Tinkex.Error{type: :api_timeout} = error} ->
    IO.puts("Timed out: #{error.message}")

  {:error, %Tinkex.Error{} = error} ->
    IO.puts("Failed: #{error.message}")
end
```

**Timeout semantics:**

- `Future.poll/2`'s `:timeout` — controls how long the polling loop runs
- `Future.await/2`'s timeout — controls how long the caller waits on the task
- These are independent: set both if you want strict deadlines

```elixir
# Poll for max 60s, but caller will wait max 70s
task = Future.poll("req-abc123", config: config, timeout: 60_000)
Future.await(task, 70_000)
```

## Awaiting Multiple Futures

Use `Future.await_many/2` to await multiple polling tasks in parallel. Results are returned in input order, with each entry being `{:ok, result}` or `{:error, %Tinkex.Error{}}`.

```elixir
# Start multiple polls concurrently
tasks = [
  Future.poll("req-1", config: config),
  Future.poll("req-2", config: config),
  Future.poll("req-3", config: config)
]

# Await all (blocks until all complete or time out)
results = Future.await_many(tasks, 30_000)

# Process results in order
Enum.zip(["req-1", "req-2", "req-3"], results)
|> Enum.each(fn {id, result} ->
  case result do
    {:ok, data} -> IO.puts("#{id} succeeded: #{inspect(data)}")
    {:error, error} -> IO.puts("#{id} failed: #{error.message}")
  end
end)
```

**Key behavior:**

- All tasks are awaited independently (no short-circuit on first failure)
- Order is preserved: the n-th result corresponds to the n-th task
- Exits or timeouts are converted to `{:error, %Tinkex.Error{type: :api_timeout}}`
- Non-raising: you always get a list of results, never an exception
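
These properties can be reproduced with the standard library alone. The sketch below uses `Task.yield_many/2` and `Task.shutdown/2`; `AwaitManySketch` and its plain `:timeout` error are hypothetical, and the single shared time window is an assumption rather than a description of how `Future.await_many/2` is implemented.

```elixir
defmodule AwaitManySketch do
  # Waits up to `timeout` ms for all tasks, then returns one entry per task
  # in input order. Tasks that have not replied are shut down.
  def await_many(tasks, timeout) do
    tasks
    |> Task.yield_many(timeout)
    |> Enum.map(fn {task, outcome} ->
      case outcome || Task.shutdown(task, :brutal_kill) do
        {:ok, result} -> result
        {:exit, reason} -> {:error, {:exit, reason}}
        nil -> {:error, :timeout}
      end
    end)
  end
end

tasks = [
  Task.async(fn -> {:ok, 1} end),
  Task.async(fn ->
    Process.sleep(5_000)
    {:ok, 2}
  end),
  Task.async(fn -> {:ok, 3} end)
]

AwaitManySketch.await_many(tasks, 100)
# => [{:ok, 1}, {:error, :timeout}, {:ok, 3}]
```

The slow task does not prevent the other two results from being returned, and its slot in the list holds the error.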

**Practical example: concurrent client creation**

```elixir
# Create multiple sampling clients in parallel
checkpoint_paths = [
  "tinker://run-1/weights/0010",
  "tinker://run-2/weights/0020",
  "tinker://run-3/weights/0030"
]

{:ok, service} = Tinkex.ServiceClient.start_link(config: config)

tasks =
  Enum.map(checkpoint_paths, fn path ->
    Tinkex.SamplingClient.create_async(service, model_path: path)
  end)

results = Task.await_many(tasks, 60_000)

clients =
  results
  |> Enum.zip(checkpoint_paths)
  |> Enum.map(fn {result, path} ->
    case result do
      {:ok, pid} ->
        IO.puts("✓ #{path} -> #{inspect(pid)}")
        pid

      {:error, error} ->
        IO.puts("✗ #{path} failed: #{error.message}")
        nil
    end
  end)
  |> Enum.reject(&is_nil/1)
```

**Training client async creation:**

```elixir
# Create training clients asynchronously
{:ok, service} = Tinkex.ServiceClient.start_link(config: config)

# Async LoRA training client
task = Tinkex.ServiceClient.create_lora_training_client_async(
  service,
  "meta-llama/Llama-3.1-8B",
  rank: 32
)
{:ok, training_client} = Task.await(task, 30_000)

# Async training client from checkpoint
task = Tinkex.ServiceClient.create_training_client_from_state_async(
  service,
  "tinker://run-123/weights/0001"
)
{:ok, restored_client} = Task.await(task, 60_000)

# Async training client from checkpoint (weights + optimizer)
task = Tinkex.ServiceClient.create_training_client_from_state_with_optimizer_async(
  service,
  "tinker://run-123/weights/0001"
)
{:ok, restored_with_opt} = Task.await(task, 60_000)
```

See `examples/async_client_creation.exs` for a complete runnable example.

## Queue State Telemetry

The polling loop emits `[:tinkex, :queue, :state_change]` telemetry events whenever the server sends a `TryAgainResponse` with a queue state transition. Metadata always includes `%{queue_state: atom, request_id: binary}`.

**Queue states:**

- `:active` — request is actively processing
- `:paused_rate_limit` — server is rate-limiting the request
- `:paused_capacity` — server lacks capacity (GPU slots full)
- `:unknown` — unrecognized state from server
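
One way to picture the `:unknown` fallback is a parser that maps wire values to atoms and never fails on unrecognized input. This is a sketch only: `QueueStateSketch` is hypothetical, and the string values are assumed to mirror the atom names, which this guide does not confirm.

```elixir
defmodule QueueStateSketch do
  # Assumed wire values; anything unrecognized maps to :unknown
  # instead of raising, so new server states do not break polling.
  def parse("active"), do: :active
  def parse("paused_rate_limit"), do: :paused_rate_limit
  def parse("paused_capacity"), do: :paused_capacity
  def parse(_other), do: :unknown
end

Enum.map(["active", "paused_capacity", "draining", nil], &QueueStateSketch.parse/1)
# => [:active, :paused_capacity, :unknown, :unknown]
```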

**Example telemetry handler:**

```elixir
:telemetry.attach(
  "tinkex-queue-logger",
  [:tinkex, :queue, :state_change],
  fn _event, _measurements, metadata, _config ->
    IO.puts("Queue state: #{metadata.queue_state} (request: #{metadata.request_id})")
  end,
  nil
)

# Now poll a request
task = Future.poll("req-abc123", config: config)
Future.await(task, 30_000)

# Telemetry handler will print state changes during polling
```

**Custom metadata:**

Add your own metadata via `:telemetry_metadata` to correlate events with your application context:

```elixir
task = Future.poll("req-abc123",
  config: config,
  telemetry_metadata: %{
    user_id: "user-123",
    experiment_id: "exp-456"
  }
)
```

Telemetry events will include both your custom metadata and `request_id`.

## QueueStateObserver Behaviour

For more control than telemetry, implement the `Tinkex.QueueStateObserver` behaviour to receive direct callbacks on queue state transitions. This is useful for backpressure tracking or adaptive request scheduling.

**Behaviour definition:**

```elixir
@callback on_queue_state_change(QueueState.t()) :: any()
```

**Example observer:**

```elixir
defmodule MyApp.QueueObserver do
  @behaviour Tinkex.QueueStateObserver
  require Logger

  @impl true
  def on_queue_state_change(queue_state) do
    case queue_state do
      :paused_rate_limit ->
        Logger.warning("Rate limited, backing off new requests")
        MyApp.RequestScheduler.pause()

      :paused_capacity ->
        Logger.warning("Capacity exhausted, waiting for GPU slots")
        MyApp.RequestScheduler.pause()

      :active ->
        Logger.info("Queue active again, resuming requests")
        MyApp.RequestScheduler.resume()

      _ ->
        :ok
    end
  end
end
```

**Using the observer:**

```elixir
task = Future.poll("req-abc123",
  config: config,
  queue_state_observer: MyApp.QueueObserver
)

Future.await(task, 30_000)
```

The observer receives callbacks alongside telemetry events. If the callback crashes, a warning is logged but polling continues.
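
A minimal sketch of that isolation, assuming the observer is invoked inside a `rescue` wrapper. `SafeNotifySketch` and `CrashingObserver` are hypothetical names, and this version only covers raised exceptions, not throws or exits.

```elixir
defmodule SafeNotifySketch do
  require Logger

  # No observer configured: nothing to do.
  def notify(nil, _queue_state), do: :ok

  # Invoke the callback, logging instead of propagating any exception.
  def notify(observer, queue_state) when is_atom(observer) do
    observer.on_queue_state_change(queue_state)
    :ok
  rescue
    exception ->
      Logger.warning(
        "observer #{inspect(observer)} crashed: #{Exception.message(exception)}"
      )

      :ok
  end
end

defmodule CrashingObserver do
  def on_queue_state_change(_queue_state), do: raise("boom")
end

# The caller still gets :ok, so a polling loop could carry on.
:ok = SafeNotifySketch.notify(CrashingObserver, :paused_capacity)
```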

**Internal usage:**

`SamplingClient` and `TrainingClient` can accept a `:queue_state_observer` option and forward it to `Future.poll/2`. This allows downstream applications to react to queue state changes without modifying client code.

## Timeout Handling

Tinkex provides three levels of timeout control for fine-grained deadline management:

### Poll Timeout

The `:timeout` option in `Future.poll/2` controls the overall polling deadline — how long the polling loop will run before giving up. When exceeded, the task returns `{:error, %Tinkex.Error{type: :api_timeout}}`.

```elixir
# Poll for max 60 seconds
task = Future.poll("req-abc123", config: config, timeout: 60_000)

# If polling exceeds 60s, you get an error
case Future.await(task, :infinity) do
  {:error, %Tinkex.Error{type: :api_timeout, message: msg}} ->
    IO.puts("Poll timeout: #{msg}")

  other ->
    # Success or a different error: pass it through
    other
end
```

**Default:** `:infinity` (poll forever)
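
An overall deadline like this is commonly tracked with a monotonic clock, so that wall-clock adjustments cannot shorten or extend it. The following is a sketch under that assumption, with hypothetical names; it is not taken from the Tinkex source.

```elixir
defmodule DeadlineSketch do
  # Converts a relative timeout into an absolute monotonic deadline.
  def from_timeout(:infinity), do: :infinity

  def from_timeout(timeout_ms) when is_integer(timeout_ms) and timeout_ms >= 0 do
    System.monotonic_time(:millisecond) + timeout_ms
  end

  def expired?(:infinity), do: false
  def expired?(deadline), do: System.monotonic_time(:millisecond) >= deadline

  # Clamps a planned sleep so it never runs past the deadline.
  def clamp_sleep(sleep_ms, :infinity), do: sleep_ms

  def clamp_sleep(sleep_ms, deadline) do
    remaining = max(deadline - System.monotonic_time(:millisecond), 0)
    min(sleep_ms, remaining)
  end
end

deadline = DeadlineSketch.from_timeout(60_000)
DeadlineSketch.expired?(deadline)
# => false
```

Clamping each backoff sleep to the remaining time means a 30-second sleep cannot push a 60-second poll well past its deadline.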

### Await Timeout

The timeout argument to `Future.await/2` controls how long the caller is willing to wait on the task process. This is independent from the polling timeout and useful for request prioritization.

```elixir
task = Future.poll("req-abc123", config: config)

# Caller waits max 10 seconds
case Future.await(task, 10_000) do
  {:error, %Tinkex.Error{type: :api_timeout}} ->
    IO.puts("Caller gave up after 10s")

  other ->
    # Success or a different error: pass it through
    other
end
```

When the await timeout is exceeded, the task is killed with `:brutal_kill` and the caller receives `{:error, %Tinkex.Error{type: :api_timeout}}`.

**Default:** `:infinity` (wait forever)
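
The yield-then-shutdown idiom from the standard library gives the same shape of behavior. This sketch is an illustration with hypothetical names (`AwaitSketch`, a plain `:timeout` error), not the actual body of `Future.await/2`.

```elixir
defmodule AwaitSketch do
  # Waits up to `timeout` ms. If no reply arrives, the task is killed
  # and an error tuple is returned instead of exiting the caller.
  def await(%Task{} = task, timeout) do
    case Task.yield(task, timeout) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> result
      # Only seen for unlinked tasks or when the caller traps exits.
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end

slow =
  Task.async(fn ->
    Process.sleep(5_000)
    {:ok, :late}
  end)

AwaitSketch.await(slow, 50)
# => {:error, :timeout}
```

`Task.shutdown/2` unlinks the task before stopping it, which is why the caller survives the kill.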

### HTTP Timeout

The `:http_timeout` option controls the timeout for each individual HTTP request to `retrieve_future`. Defaults to `config.timeout` (typically 60 seconds).

```elixir
task = Future.poll("req-abc123",
  config: config,
  http_timeout: 5_000  # Each HTTP call times out after 5s
)
```

### Combining Timeouts

For production workloads, set all three to enforce strict SLAs:

```elixir
task = Future.poll("req-abc123",
  config: config,
  timeout: 120_000,      # Poll for max 2 minutes
  http_timeout: 10_000   # Each HTTP request times out after 10s
)

case Future.await(task, 150_000) do  # Caller waits max 2.5 minutes
  {:ok, result} -> handle_success(result)
  {:error, error} -> handle_error(error)
end
```

## Error Handling

`Future.poll/2` categorizes errors and applies appropriate retry logic:

### User Errors (fail immediately)

When the server returns `%FutureFailedResponse{}` with `category: "user"`, the polling loop fails immediately without retrying. This indicates a permanent error like invalid input.

```elixir
task = Future.poll("req-bad-input", config: config)

case Future.await(task, 30_000) do
  {:error, %Tinkex.Error{type: :request_failed, category: :user, message: msg}} ->
    IO.puts("User error: #{msg}")
    # Don't retry: fix the input and resubmit

  other ->
    # Success or a different error: pass it through
    other
end
```

### Server Errors (retry until timeout)

When `category` is `"server"` or `"provider"`, the polling loop retries with exponential backoff until the poll timeout is exceeded. The last error encountered is returned.

```elixir
task = Future.poll("req-flaky", config: config, timeout: 30_000)

case Future.await(task, :infinity) do
  {:error, %Tinkex.Error{type: :request_failed, category: :server, message: msg}} ->
    IO.puts("Server error after retries: #{msg}")
    # Consider exponential backoff before resubmitting

  other ->
    # Success or a different error: pass it through
    other
end
```

### Network Errors

HTTP-level errors (connection refused, DNS failure, etc.) are returned as `{:error, %Tinkex.Error{type: :network}}` and do not automatically retry. Handle these at the call site.
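
A call-site retry can be as small as the wrapper below. It is a sketch under stated assumptions: `NetworkRetrySketch` is hypothetical, the fixed 500 ms pause and the attempt count are arbitrary, and the pattern `%{type: :network}` matches plain maps as well as any struct with that field.

```elixir
defmodule NetworkRetrySketch do
  # Calls `fun` up to `attempts` times, retrying only on network errors.
  def with_retries(fun, attempts \\ 3, sleep_fun \\ &Process.sleep/1)

  def with_retries(fun, attempts, sleep_fun) when attempts > 0 do
    case fun.() do
      {:error, %{type: :network}} when attempts > 1 ->
        sleep_fun.(500)
        with_retries(fun, attempts - 1, sleep_fun)

      other ->
        # Success, a non-network error, or the final failed attempt.
        other
    end
  end
end

# A stand-in operation that fails twice, then succeeds.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky = fn ->
  case Agent.get_and_update(counter, fn n -> {n, n + 1} end) do
    n when n < 2 -> {:error, %{type: :network, message: "connection refused"}}
    _ -> {:ok, :done}
  end
end

NetworkRetrySketch.with_retries(flaky, 3, fn _ms -> :ok end)
# => {:ok, :done}
```

In real use, `fun` would start the poll and await it, for example `fn -> Future.await(Future.poll(id, config: config), 30_000) end`.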

### Task Exits

If the polling task crashes or exits unexpectedly, `Future.await/2` converts the exit into `{:error, %Tinkex.Error{type: :api_timeout}}` with the exit reason in the data field.

```elixir
case Future.await(task, 30_000) do
  {:error, %Tinkex.Error{type: :api_timeout, data: %{exit_reason: reason}}} ->
    IO.puts("Task crashed: #{Exception.format_exit(reason)}")

  other ->
    # Success or a different error: pass it through
    other
end
```

## Best Practices

### 1. Always Use Futures for Concurrent Operations

Don't block on each request sequentially — start multiple polls and await them in parallel:

```elixir
# Bad: sequential awaits
results =
  Enum.map(request_ids, fn id ->
    task = Future.poll(id, config: config)
    Future.await(task, 30_000)
  end)

# Good: parallel awaits
tasks = Enum.map(request_ids, &Future.poll(&1, config: config))
results = Future.await_many(tasks, 30_000)
```

### 2. Set Explicit Timeouts in Production

Default `:infinity` timeouts are fine for development but can cause resource leaks in production. Always set explicit deadlines:

```elixir
task = Future.poll(request_id,
  config: config,
  timeout: Application.get_env(:my_app, :poll_timeout, 120_000),
  http_timeout: Application.get_env(:my_app, :http_timeout, 10_000)
)

Future.await(task, Application.get_env(:my_app, :await_timeout, 150_000))
```

### 3. Monitor Queue States for Capacity Planning

Use telemetry or custom observers to track `:paused_capacity` events. Frequent capacity pauses indicate you need more GPU slots or should reduce request rate:

```elixir
:telemetry.attach(
  "capacity-alerter",
  [:tinkex, :queue, :state_change],
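  fn
    _event, _measurements, %{queue_state: :paused_capacity}, _config ->
      MyApp.Metrics.increment("tinkex.capacity_pause")
      # Alert if pauses exceed threshold

    # Ignore other states; :telemetry detaches a handler that raises
    _event, _measurements, _metadata, _config ->
      :ok
  end,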
  nil
)
```

### 4. Handle Both Success and Failure

Always pattern match on both `{:ok, result}` and `{:error, error}` — network failures and server errors are common in distributed systems:

```elixir
case Future.await(task, 30_000) do
  {:ok, result} ->
    handle_success(result)

  {:error, %Tinkex.Error{type: :request_failed, category: :user} = error} ->
    handle_user_error(error)

  {:error, %Tinkex.Error{} = error} ->
    handle_transient_error(error)
end
```

### 5. Use Task Supervision for Long-Lived Polls

If you need to poll for minutes or hours, supervise the task to ensure it doesn't leak on process crashes:

```elixir
{:ok, task_supervisor} = Task.Supervisor.start_link()

task =
  Task.Supervisor.async_nolink(task_supervisor, fn ->
    poll_task = Future.poll(request_id, config: config, timeout: 3_600_000)
    Future.await(poll_task, :infinity)
  end)

# The task runs under the supervisor and is not linked to the caller:
# a crash in the poll will not take the caller down, and the supervisor stops the task on shutdown
```

### 6. Test with Custom Sleep Functions

Inject a no-op sleep function in tests to avoid actual delays:

```elixir
# In test
sleep_fun = fn _ms -> :ok end
task = Future.poll(request_id, config: config, sleep_fun: sleep_fun)

# Polling completes instantly without sleeping
```
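
A recording sleep function goes one step further and lets a test assert on the delays that were requested. The sketch below uses only the standard library; `RecordingSleep` is a hypothetical helper, and the `Enum.each/2` call stands in for a polling loop that would call the function between attempts.

```elixir
defmodule RecordingSleep do
  # Returns a sleep function that reports each requested delay
  # to `owner` instead of sleeping.
  def new(owner \\ self()) do
    fn ms ->
      send(owner, {:slept, ms})
      :ok
    end
  end

  # Collects all recorded delays from the mailbox, oldest first.
  def drain(acc \\ []) do
    receive do
      {:slept, ms} -> drain([ms | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end

sleep_fun = RecordingSleep.new()

# Stand-in for a polling loop backing off three times.
Enum.each([1_000, 2_000, 4_000], sleep_fun)

RecordingSleep.drain()
# => [1000, 2000, 4000]
```

Because the function is created with the test process as owner, it still reports to the test when it is called from inside the polling task.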

## Complete Example

Here's a full example demonstrating futures, async operations, queue state monitoring, and error handling:

```elixir
defmodule MyApp.AsyncSampling do
  alias Tinkex.{ServiceClient, SamplingClient, Future, Config}
  require Logger

  defmodule QueueMonitor do
    @behaviour Tinkex.QueueStateObserver

    @impl true
    def on_queue_state_change(queue_state) do
      Logger.metadata(tinker_queue_state: queue_state)

      case queue_state do
        :paused_rate_limit ->
          Logger.warning("Rate limited — consider reducing request rate")
        :paused_capacity ->
          Logger.warning("Capacity exhausted — GPU slots full")
        :active ->
          Logger.info("Queue active")
        _ ->
          :ok
      end
    end
  end

  def run_concurrent_samples(prompts, opts \\ []) do
    config = Config.new(api_key: System.fetch_env!("TINKER_API_KEY"))

    {:ok, service} = ServiceClient.start_link(config: config)
    {:ok, sampler} =
      ServiceClient.create_sampling_client(service,
        base_model: "meta-llama/Llama-3.1-8B"
      )

    # Attach telemetry
    :telemetry.attach(
      "queue-logger",
      [:tinkex, :queue, :state_change],
      &log_queue_event/4,
      nil
    )

    try do
      # Start all samples concurrently
      tasks =
        Enum.map(prompts, fn prompt ->
          {:ok, model_input} =
            Tinkex.Types.ModelInput.from_text(prompt,
              model_name: "meta-llama/Llama-3.1-8B"
            )

          params = %Tinkex.Types.SamplingParams{max_tokens: 64}

          # Returns Task.t() immediately
          {:ok, task} =
            SamplingClient.sample(sampler, model_input, params,
              num_samples: 1,
              queue_state_observer: QueueMonitor,
              timeout: Keyword.get(opts, :timeout, 120_000),
              await_timeout: Keyword.get(opts, :await_timeout, 150_000)
            )

          task
        end)

      # Await all in parallel
      results = Task.await_many(tasks, Keyword.get(opts, :await_timeout, 150_000))

      # Process results
      Enum.zip(prompts, results)
      |> Enum.map(fn {prompt, result} ->
        case result do
          {:ok, response} ->
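            # `tokens` holds token IDs, so inspect them rather than interpolating directly
            tokens = hd(response.sequences).tokens
            Logger.info("Prompt: #{prompt}\nResponse tokens: #{inspect(tokens)}")
            {:ok, tokens}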

          {:error, error} ->
            Logger.error("Failed: #{prompt}\nError: #{error.message}")
            {:error, error}
        end
      end)
    after
      :telemetry.detach("queue-logger")
      GenServer.stop(sampler)
      GenServer.stop(service)
    end
  end

  defp log_queue_event(_event, _measurements, metadata, _config) do
    Logger.info("Queue state changed",
      request_id: metadata.request_id,
      queue_state: metadata.queue_state
    )
  end
end

# Run it
prompts = [
  "Explain async programming in Elixir",
  "What are the benefits of OTP?",
  "How does Task.async work?"
]

MyApp.AsyncSampling.run_concurrent_samples(prompts,
  timeout: 60_000,
  await_timeout: 90_000
)
```

## What to Read Next

- API overview: `docs/guides/api_reference.md`
- Training loop patterns: `docs/guides/training_loop.md`
- Troubleshooting timeout issues: `docs/guides/troubleshooting.md`
- Getting started with the SDK: `docs/guides/getting_started.md`