Skip to main content

lib/anthropic/client.ex

defmodule Anthropic.Client do
  @moduledoc """
  Struct holding Anthropic API connection configuration.

  Used by `Anthropic.stream/3` to build HTTP requests. Struct-based
  so tests can pass config directly without global state.
  """

  @default_endpoint "https://api.anthropic.com"

  @api_version "2023-06-01"

  # Maximum number of times a 429 response is retried before giving up.
  @max_retries 3

  # Wait applied when a 429 response carries no usable `retry-after` header.
  @default_retry_after_ms 1_000

  @type t :: %__MODULE__{
          api_key: String.t(),
          endpoint: String.t()
        }

  # Keep the API key out of logs, crash reports and IEx output.
  @derive {Inspect, except: [:api_key]}
  @enforce_keys [:api_key]
  defstruct [
    :api_key,
    endpoint: @default_endpoint
  ]

  @doc """
  Creates a new client from a keyword list. `:api_key` is required.

  `:endpoint` defaults to `#{@default_endpoint}`. Raises `ArgumentError`
  when `:api_key` is missing and `KeyError` when an unknown key is given.
  """
  @spec new(keyword()) :: t()
  def new(opts) do
    struct!(__MODULE__, opts)
  end

  @doc """
  Sends a streaming request to the Anthropic Messages API.

  Returns `{:ok, Enumerable.t()}` on a 200 response. The enumerable is lazy:
  every SSE `data: ` line is JSON-decoded and passed through
  `Anthropic.Event.parse/1`; results equal to `:skip` are dropped and
  non-data lines are ignored. Malformed JSON raises while the stream is
  being consumed.

  Only the `:model`, `:max_tokens`, `:system`, `:tools`, `:temperature` and
  `:top_p` keys of `opts` are forwarded in the request body; other keys are
  ignored.

  A 429 response is retried up to #{@max_retries} times, sleeping in the calling
  process for the duration given by the `retry-after` header (1 second when
  the header is absent or unusable). Any other non-200 response yields
  `{:error, {status, body}}`; transport failures yield `{:error, reason}`.
  """
  @spec stream(t(), list(map()), keyword()) :: {:ok, Enumerable.t()} | {:error, term()}
  def stream(%__MODULE__{} = client, messages, opts \\ []) do
    # The body is identical across retries, so build it once up front.
    body = build_request_body(messages, opts)
    do_stream(client, body, 0)
  end

  # Performs one attempt and recurses on 429 until `@max_retries` is reached.
  defp do_stream(client, body, retry_count) do
    case post_streaming(client, body) do
      {:ok, %{status: 200} = resp} ->
        {:ok, sse_stream(resp.body)}

      {:ok, %{status: 429} = resp} when retry_count < @max_retries ->
        retry_after = parse_retry_after(resp)

        # With `into: :self` the body chunks are delivered as messages to this
        # process. Drain them so abandoned attempts do not pile up in the
        # mailbox.
        _ = collect_async_body(resp.body)

        Anthropic.Telemetry.event(
          [:rate_limited],
          %{retry_after: retry_after, attempt: retry_count + 1},
          %{endpoint: client.endpoint}
        )

        Process.sleep(retry_after)
        do_stream(client, body, retry_count + 1)

      {:ok, resp} ->
        {:error, {resp.status, collect_async_body(resp.body)}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Returns the retry delay in milliseconds from the `retry-after` header.
  #
  # The header is server-controlled, so anything other than a non-negative
  # integer number of seconds (missing header, HTTP-date form, garbage) falls
  # back to the default instead of raising.
  defp parse_retry_after(%{headers: headers}) do
    with [value | _] <- Map.get(headers, "retry-after"),
         {seconds, ""} when seconds >= 0 <- Integer.parse(String.trim(value)) do
      seconds * 1000
    else
      _ -> @default_retry_after_ms
    end
  end

  # Builds the JSON request body from the whitelisted options plus the
  # messages, always enabling streaming.
  defp build_request_body(messages, opts) do
    opts
    |> Keyword.take([:model, :max_tokens, :system, :tools, :temperature, :top_p])
    |> Map.new()
    |> Map.put(:messages, messages)
    |> Map.put(:stream, true)
  end

  # Issues the POST with the response body streamed to the calling process.
  defp post_streaming(client, body) do
    Req.post(
      url: client.endpoint <> "/v1/messages",
      headers: [
        {"x-api-key", client.api_key},
        {"anthropic-version", @api_version},
        {"content-type", "application/json"}
      ],
      json: body,
      into: :self,
      receive_timeout: 300_000
    )
  end

  # Reads an async body to completion and returns it as one binary; any other
  # body is returned unchanged.
  defp collect_async_body(async) when is_struct(async, Req.Response.Async) do
    async
    |> Enum.to_list()
    |> IO.iodata_to_binary()
  end

  defp collect_async_body(body), do: body

  # Lazily turns the chunked response body into a stream of parsed events.
  #
  # Transport chunks do not line up with SSE lines, so the trailing partial
  # line of each chunk is carried over and prepended to the next chunk. Any
  # remainder is parsed once the body ends.
  defp sse_stream(async_body) do
    Stream.transform(
      async_body,
      fn -> "" end,
      fn chunk, buffer ->
        {complete_lines, [partial]} =
          (buffer <> chunk)
          |> String.split("\n")
          |> Enum.split(-1)

        {parse_sse_lines(complete_lines), partial}
      end,
      fn buffer -> {parse_sse_lines([buffer]), ""} end,
      fn _buffer -> :ok end
    )
  end

  # Decodes every `data: ` line and drops events that parse to `:skip`.
  defp parse_sse_lines(lines) do
    Enum.flat_map(lines, fn
      "data: " <> json ->
        case json |> Jason.decode!() |> Anthropic.Event.parse() do
          :skip -> []
          event -> [event]
        end

      _other ->
        []
    end)
  end
end