# lib/llm.ex

defmodule LLM do
  @moduledoc """
  Lightweight Elixir client for LLM APIs.

  Supports 4 adapters covering 99% of providers:
  - **OpenAI Chat Completions** — `/v1/chat/completions` (also works with Ollama, Groq, OpenRouter, DeepSeek, xAI, etc.)
  - **OpenAI Responses** — `/v1/responses`
  - **Anthropic Messages** — `/v1/messages`
  - **Gemini** — `/v1beta/models/{model}:generateContent`

  ## Quick Start

      # Simple text generation
      {:ok, response} = LLM.generate("What is Elixir?",
        provider: :openai,
        model: "gpt-4"
      )
      response.message.content  #=> "Elixir is a..."

      # Streaming
      {:ok, response} = LLM.stream("Tell me a story",
        provider: :anthropic,
        model: "claude-sonnet-4-5-20250514",
        on_chunk: &IO.write/1
      )

      # Using provider modules
      {:ok, response} = LLM.generate("Hello",
        provider: LLM.Provider.OpenAI,
        model: "gpt-4"
      )

      # With explicit API key
      {:ok, response} = LLM.generate("Hello",
        provider: {LLM.Provider.OpenAI, api_key: "sk-..."},
        model: "gpt-4"
      )

      # Custom provider (runtime)
      {:ok, response} = LLM.generate("Hello",
        provider: %{
          adapter: LLM.Adapter.Anthropic,
          base_url: "https://my-proxy.com",
          api_key: "sk-ant-..."
        },
        model: "claude-sonnet-4-5-20250514"
      )

      # With tools
      {:ok, response} = LLM.generate("Read mix.exs",
        provider: :openai,
        model: "gpt-4",
        tools: [MyApp.ReadFileTool]
      )

  ## Configuration

      # config/config.exs
      config :llm, :providers,
        openai: [api_key: "sk-..."],
        anthropic: [api_key: "sk-ant-..."]

      # Or at runtime (stored in the calling process's dictionary, so only that process sees it)
      LLM.put_key(:openai, "sk-...")

  ## Provider

  A provider can be:
  - An atom preset (`:openai`, `:anthropic`, `:gemini`, `:openrouter`, `:openai_responses`)
  - A provider module (`LLM.Provider.OpenAI`, `LLM.Provider.Anthropic`, etc.)
  - A tuple `{module, opts}` for runtime API key (`{LLM.Provider.OpenAI, api_key: "sk-..."}`)
  - A map with `:adapter`, `:base_url`, and optionally `:api_key`
  """

  @typedoc """
  Anything accepted as the `:provider` option: a preset atom, a provider module,
  a `{module, opts}` tuple (for example to pass an API key at runtime), or a
  config map with `:adapter`, `:base_url` and optionally `:api_key`.
  """
  @type provider :: atom() | module() | {module(), keyword()} | map()

  @typedoc """
  Options understood by `stream/3`.

  `:on_chunk` and `:on_message` may also be given through the separate callbacks
  argument of `stream/3`.
  """
  @type stream_option ::
          {:provider, provider()}
          | {:model, String.t()}
          | {:max_tokens, non_neg_integer()}
          | {:temperature, float()}
          | {:thinking, atom() | map()}
          | {:tools, [module() | LLM.Tool.t() | {module(), map()}]}
          | {:auto_tools, boolean()}
          | {:max_rounds, non_neg_integer()}
          | {:on_chunk, (term() -> any())}
          | {:on_message, (LLM.Message.t() -> any())}
          | {:system, String.t()}
          | {:messages, [LLM.Message.t()]}
          | {:schema, map()}

  @typedoc """
  Options understood by `generate/3`.

  `:tool_timeout` is the per-tool-call timeout (in milliseconds, or `:infinity`)
  used while tool calls are executed automatically; it defaults to `30_000`.
  """
  @type generate_option ::
          {:provider, provider()}
          | {:model, String.t()}
          | {:max_tokens, non_neg_integer()}
          | {:temperature, float()}
          | {:thinking, atom() | map()}
          | {:tools, [module() | LLM.Tool.t() | {module(), map()}]}
          | {:auto_tools, boolean()}
          | {:max_rounds, non_neg_integer()}
          | {:tool_timeout, timeout()}
          | {:on_message, (LLM.Message.t() -> any())}
          | {:system, String.t()}
          | {:messages, [LLM.Message.t()]}
          | {:schema, map()}

  @doc """
  Streams a completion for `prompt` and returns the final, fully collected response.

  `prompt` is either a string or an `LLM.Context`. The result is `{:ok, response}`
  on success and `{:error, reason}` on failure.

  ## Options

  `opts` is forwarded to `LLM.Stream.start/2`. These keys are additionally read here:

    * `:auto_tools` — handed to `LLM.Stream.collect/2`, defaults to `true`
    * `:max_rounds` — handed to `LLM.Stream.collect/2`, defaults to `10`
    * `:schema` — when present, the collected response is post-processed so that
      `response.parsed` carries the decoded structured output (or `nil`)

  ## Callbacks

    * `:on_chunk` — called for each chunk, `fn chunk -> ... end`
    * `:on_message` — called once per completed `LLM.Message`,
      `fn message -> ... end`

  Callbacks may live in `opts` or be passed as a separate third keyword list;
  when both are given, the third argument wins:

      # Callbacks in opts
      {:ok, response} = LLM.stream("Tell me a story",
        provider: :openai,
        model: "gpt-4",
        on_chunk: &IO.write/1
      )

      # Callbacks as separate third argument
      {:ok, response} = LLM.stream("Tell me a story",
        [provider: :openai, model: "gpt-4"],
        on_chunk: &IO.write/1
      )

  For manual stream control use `LLM.Stream.start/2` and
  `LLM.Stream.collect/2` directly.
  """
  @spec stream(String.t() | LLM.Context.t(), keyword(), keyword()) ::
          {:ok, LLM.Response.t()} | {:error, term()}
  def stream(prompt, opts \\ [], callbacks \\ []) do
    context = to_context(prompt, opts)
    collect_opts = collect_options(opts, callbacks)

    with {:ok, stream} <- LLM.Stream.start(context, opts),
         {:ok, response} <- LLM.Stream.collect(stream, collect_opts) do
      {:ok, maybe_parse_schema(response, opts)}
    end
  end

  # Builds the keyword list given to `LLM.Stream.collect/2`. A callback found in
  # `callbacks` shadows the one of the same name in `opts`.
  defp collect_options(opts, callbacks) do
    pick_callback = fn name -> callbacks[name] || opts[name] end

    [
      auto_tools: Keyword.get(opts, :auto_tools, true),
      max_rounds: Keyword.get(opts, :max_rounds, 10),
      on_chunk: pick_callback.(:on_chunk),
      on_message: pick_callback.(:on_message)
    ]
  end

  @doc """
  Same as `stream/3`, but returns the bare response and raises a `RuntimeError`
  when streaming fails.
  """
  @spec stream!(String.t() | LLM.Context.t(), keyword(), keyword()) :: LLM.Response.t()
  def stream!(prompt, opts \\ [], callbacks \\ []) do
    outcome = stream(prompt, opts, callbacks)

    case outcome do
      {:error, reason} -> raise RuntimeError, "LLM streaming failed: #{inspect(reason)}"
      {:ok, response} -> response
    end
  end

  @doc """
  Generates a completion without streaming and returns the final response.

  A regular HTTP request is sent and the provider's complete response is decoded.
  When the response contains tool calls, they are executed and followed up with
  further regular HTTP requests.

  ## Callbacks

    * `:on_message` — called once per completed `LLM.Message`,
      `fn message -> ... end`

  The callback may live in `opts` or be passed as a separate third keyword list;
  when both are given, the third argument wins:

      # on_message in opts
      {:ok, response} = LLM.generate("What is Elixir?",
        provider: :openai,
        model: "gpt-4",
        on_message: fn msg -> IO.inspect(msg.role) end
      )

      # on_message as separate third argument
      {:ok, response} = LLM.generate("What is Elixir?",
        [provider: :openai, model: "gpt-4"],
        on_message: fn msg -> IO.inspect(msg.role) end
      )

  ## Structured output

  When `schema:` is set, the model is asked to return JSON matching the given
  schema. On success, `response.parsed` contains the decoded map and the last
  assistant message in `response.messages` also carries `:parsed`. Tool
  auto-execution is disabled for that request (`:auto_tools` is forced to
  `false`) since the response is structured data, not a tool call round-trip.

      {:ok, response} = LLM.generate("Extract the name and age.",
        provider: :openai,
        model: "gpt-4o",
        schema: %{
          "name" => "person",
          "schema" => %{
            "type" => "object",
            "properties" => %{
              "name" => %{"type" => "string"},
              "age" => %{"type" => "integer"}
            },
            "required" => ["name", "age"]
          }
        }
      )
      response.parsed  #=> %{"name" => "Alice", "age" => 30}

  Pass a bare JSON Schema map to use `"output"` as the default name:

      schema: %{"type" => "object", "properties" => %{...}}

  The schema is passed through to the provider unchanged — it must be valid for the
  target provider's structured-output feature (schema requirements vary by provider).
  """
  @spec generate(String.t() | LLM.Context.t(), [generate_option], keyword()) ::
          {:ok, LLM.Response.t()} | {:error, term()}
  def generate(prompt, opts \\ [], callbacks \\ []) do
    opts =
      opts
      |> disable_auto_tools_for_schema()
      |> merge_on_message(callbacks)

    generated =
      prompt
      |> to_context(opts)
      |> do_generate(opts, 0, %LLM.Usage{})

    with {:ok, response} <- generated do
      {:ok, maybe_parse_schema(response, opts)}
    end
  end

  # A structured-output request must not trigger the automatic tool loop.
  defp disable_auto_tools_for_schema(opts) do
    if opts[:schema], do: Keyword.put(opts, :auto_tools, false), else: opts
  end

  # Stores the effective `:on_message` callback in `opts`; the one from
  # `callbacks` is preferred over the one already present in `opts`.
  defp merge_on_message(opts, callbacks) do
    effective = callbacks[:on_message] || opts[:on_message]

    if effective, do: Keyword.put(opts, :on_message, effective), else: opts
  end

  @doc """
  Same as `generate/3`, but returns the bare response and raises a `RuntimeError`
  when generation fails.
  """
  @spec generate!(String.t() | LLM.Context.t(), [generate_option], keyword()) :: LLM.Response.t()
  def generate!(prompt, opts \\ [], callbacks \\ []) do
    outcome = generate(prompt, opts, callbacks)

    case outcome do
      {:error, reason} -> raise RuntimeError, "LLM generation failed: #{inspect(reason)}"
      {:ok, response} -> response
    end
  end

  @doc """
  Returns the provider presets reported by `LLM.Provider.Resolver.list_providers/0`.
  """
  @spec providers() :: [atom()]
  def providers do
    LLM.Provider.Resolver.list_providers()
  end

  @doc """
  List available models from a provider.

  Returns `{:ok, models}` on success, where `models` is a list of model info maps.
  Returns `{:error, reason}` on failure, and `{:error, :not_supported}` when the
  provider's adapter does not export `list_models/1`.

  ## Options

    * `:provider` - provider preset atom, module, or config map (defaults to `:openai`)

  ## Examples

      {:ok, models} = LLM.models(provider: :openai)
      {:ok, models} = LLM.models(provider: :anthropic)

  """
  @spec models(keyword()) :: {:ok, [LLM.Adapter.model_info()]} | {:error, term()}
  def models(opts \\ []) do
    provider = LLM.Provider.Resolver.resolve(opts[:provider] || :openai)
    adapter = provider.adapter

    # Probe for the optional callback instead of rescuing UndefinedFunctionError:
    # a rescue would also hide undefined-function bugs raised *inside* an adapter's
    # own list_models/1 and misreport them as :not_supported.
    if Code.ensure_loaded?(adapter) and function_exported?(adapter, :list_models, 1) do
      adapter.list_models(provider)
    else
      {:error, :not_supported}
    end
  end

  @doc """
  Stores an API key for `provider_name` at runtime.

  The key is written to the process dictionary of the calling process, so it is
  only visible to lookups made from that same process. Always returns `:ok`.
  """
  @spec put_key(atom(), String.t()) :: :ok
  def put_key(provider_name, api_key) do
    storage_key = {__MODULE__, :provider_key, provider_name}
    _previous = Process.put(storage_key, api_key)

    :ok
  end

  @doc """
  Looks up the API key for `provider_name`.

  The calling process's dictionary is consulted first; when it holds no key, the
  `:providers` entry of the `:llm` application environment is used (either a
  keyword list or a map, keyed by provider name, each holding an `:api_key`).
  Returns `nil` when no key is found.
  """
  @spec get_key(atom()) :: String.t() | nil
  def get_key(provider_name) do
    with nil <- Process.get({__MODULE__, :provider_key, provider_name}) do
      :llm
      |> Application.get_env(:providers, %{})
      |> configured_key(provider_name)
    end
  end

  # Reads `:api_key` for the provider out of the application config; any config
  # that is neither a list nor a map yields `nil`.
  defp configured_key(config, provider_name) when is_list(config) or is_map(config),
    do: config[provider_name][:api_key]

  defp configured_key(_config, _provider_name), do: nil

  # --- Private ---

  # Turns the caller's prompt into an `LLM.Context`.
  #
  # A string prompt becomes a fresh context: `opts[:system]` as the system prompt,
  # `opts[:messages]` (if any) followed by the prompt as a new message, and the
  # normalized `opts[:tools]`.
  defp to_context(prompt, opts) when is_binary(prompt) do
    normalized = normalize_tools(opts[:tools] || [])
    history = opts[:messages] || []
    prompt_message = LLM.Message.new(prompt)

    %LLM.Context{
      system: opts[:system],
      messages: history ++ [prompt_message],
      tools: normalized,
      provider_state: %{}
    }
  end

  # An existing context is kept as is, except that the normalized `opts[:tools]`
  # are placed in front of the tools it already carries.
  defp to_context(%LLM.Context{tools: existing_tools} = ctx, opts) do
    extra_tools = normalize_tools(opts[:tools] || [])

    %{ctx | tools: extra_tools ++ existing_tools}
  end

  # Runs every tool specification through `LLM.Tool.normalize/1`.
  defp normalize_tools(tools) do
    for tool <- tools, do: LLM.Tool.normalize(tool)
  end

  # Non-streaming request loop.
  #
  # Sends one request per round. While the response asks for tools (and
  # `:auto_tools` is on, the default), the tool calls are executed, their results
  # appended to the context, and another round is started — at most `:max_rounds`
  # times (default 10). Usage of every intermediate round is summed into
  # `accumulated_usage` and folded into the final response.
  #
  # `stop_reason` is forced to `:max_rounds` only when the round limit actually cut
  # the loop short, i.e. the last response still contains tool calls that will not
  # be executed. A response that finished on its own keeps the provider's stop
  # reason, even if it arrived on the last permitted round.
  #
  # Returns `{:ok, response}` or the `{:error, reason}` of the failing request.
  defp do_generate(context, opts, rounds, accumulated_usage) do
    with {:ok, response} <- request_once(context, opts) do
      pending_tool_calls? = Keyword.get(opts, :auto_tools, true) and has_tool_calls?(response)
      round_limit_hit? = rounds >= Keyword.get(opts, :max_rounds, 10)

      cond do
        not pending_tool_calls? ->
          {:ok,
           finalize_generate_response(
             response,
             context,
             nil,
             accumulated_usage,
             opts[:on_message]
           )}

        round_limit_hit? ->
          {:ok,
           finalize_generate_response(
             response,
             context,
             :max_rounds,
             accumulated_usage,
             opts[:on_message]
           )}

        true ->
          {:ok, next_context} = execute_tool_calls(response.message, context, opts)
          usage = LLM.Usage.add(accumulated_usage, response.usage || %LLM.Usage{})
          do_generate(next_context, opts, rounds + 1, usage)
      end
    end
  end

  # Performs a single non-streaming POST for `context` and decodes the reply.
  #
  # Raises `ArgumentError` when `opts[:model]` is missing. Returns whatever the
  # adapter's `decode_response/1` returns for a 200 reply, and an `{:error, _}`
  # tuple for undecodable bodies, non-200 statuses and transport failures.
  defp request_once(context, opts) do
    provider = LLM.Provider.Resolver.resolve(opts[:provider] || :openai)
    adapter = provider.adapter
    model = opts[:model] || raise ArgumentError, "model is required"

    # `model:` and `stream: false` are prepended so they shadow same-named keys in opts.
    request_opts = [model: model, stream: false] ++ opts
    payload = adapter.build_request(context, request_opts)
    url = LLM.Stream.build_url(provider.base_url, non_stream_path(adapter, model))
    headers = LLM.Stream.build_headers(provider, request_opts)

    http_result =
      [url: url, method: :post, headers: headers, json: payload]
      |> Req.new()
      |> LLM.HTTPClient.request()

    case http_result do
      {:ok, %Req.Response{status: 200, body: body}} ->
        decode_success_body(adapter, body)

      {:ok, %Req.Response{status: status, body: body} = response} ->
        {:error, %{status: status, body: LLM.Stream.normalize_error_body(body, response)}}

      {:error, _} = failure ->
        failure
    end
  end

  # Decodes the body of a 200 reply: maps go straight to the adapter, binaries are
  # JSON-decoded first, anything else is reported as unexpected.
  defp decode_success_body(adapter, body) when is_map(body),
    do: adapter.decode_response(body)

  defp decode_success_body(adapter, body) when is_binary(body) do
    with {:ok, decoded} <- Jason.decode(body),
         {:ok, response} <- adapter.decode_response(decoded) do
      {:ok, response}
    else
      {:error, %Jason.DecodeError{} = error} -> {:error, {:invalid_json, error}}
      {:error, _} = err -> err
    end
  end

  defp decode_success_body(_adapter, body),
    do: {:error, {:unexpected_response_body, body}}

  # Resolves the request path for a non-streaming call: the adapter's
  # `non_stream_path/0` when it exports one, otherwise its `stream_path/0`,
  # run through `LLM.Stream.build_path/2` together with the model.
  defp non_stream_path(adapter, model) do
    # function_exported?/3 does not load modules, so make sure the adapter is loaded.
    Code.ensure_loaded(adapter)

    adapter
    |> function_exported?(:non_stream_path, 0)
    |> case do
      true -> adapter.non_stream_path()
      false -> adapter.stream_path()
    end
    |> LLM.Stream.build_path(model)
  end

  # True when the response message carries a non-empty list of tool calls.
  defp has_tool_calls?(%LLM.Response{message: %LLM.Message{tools: tools}}) do
    match?([_ | _], tools)
  end

  # Executes every tool call of `assistant_message` concurrently and returns
  # `{:ok, context}` with the assistant message and one `:tool` result message per
  # call appended, in call order.
  #
  # Failures never abort the round; they become the content of the result message:
  #   * unknown tool, `{:error, _}` result, raised exception, throw or exit inside
  #     the tool -> an "Error: ..." string
  #   * a call exceeding `opts[:tool_timeout]` (default 30_000 ms) -> its task is
  #     killed and the content is "Error: :timeout"
  #
  # Every result message keeps the `id` and `name` of the call it answers, also on
  # failure. When `opts[:on_message]` is set it is invoked with the assistant
  # message and then with each result message.
  defp execute_tool_calls(%LLM.Message{} = assistant_message, context, opts) do
    on_message = opts[:on_message]
    tools = Enum.map(context.tools, &LLM.Tool.normalize/1)
    calls = assistant_message.tools

    outcomes =
      calls
      |> Task.async_stream(&run_tool_call(&1, tools, context.messages),
        ordered: true,
        timeout: Keyword.get(opts, :tool_timeout, 30_000),
        # The default (`:exit`) would exit the caller on timeout; `:kill_task`
        # yields `{:exit, :timeout}` for that call instead.
        on_timeout: :kill_task
      )
      |> Enum.to_list()

    # `ordered: true` guarantees outcomes line up with `calls`, so each result can
    # be paired with its originating call even when the task did not finish.
    tool_result_messages =
      calls
      |> Enum.zip(outcomes)
      |> Enum.map(fn {call, outcome} ->
        %LLM.Message{
          role: :tool,
          tool_call_id: call.id,
          name: call.name,
          content: tool_result_content(outcome)
        }
      end)

    if on_message do
      on_message.(assistant_message)
      Enum.each(tool_result_messages, on_message)
    end

    {:ok, %{context | messages: context.messages ++ [assistant_message] ++ tool_result_messages}}
  end

  # Runs a single tool call and returns the textual content of its result message.
  defp run_tool_call(call, tools, messages) do
    case Enum.find(tools, fn tool -> tool.name == call.name end) do
      nil ->
        "Error: Unknown tool #{inspect(call.name)}"

      tool ->
        try do
          case tool.execute.(call.args, %{messages: messages}) do
            {:ok, result} -> to_string(result)
            {:error, err} -> "Error: #{err}"
          end
        rescue
          e -> "Error: #{Exception.message(e)}"
        catch
          # The task is linked to the caller: an uncaught throw/exit in tool code
          # would take the caller down instead of producing a tool result.
          kind, reason -> "Error: #{inspect({kind, reason})}"
        end
    end
  end

  # Maps one `Task.async_stream/3` outcome to result-message content.
  defp tool_result_content({:ok, content}), do: content
  defp tool_result_content({:exit, reason}), do: "Error: #{inspect(reason)}"

  # Produces the response handed back to the caller of the generate loop:
  #   * the message gets the response usage when it has none of its own
  #   * `messages` is the context history plus that message
  #   * `usage` is the accumulated usage of earlier rounds plus this response's
  #   * `stop_reason` is `forced_stop_reason` when given, else the response's own
  # `on_message`, when set, is called with the final message.
  defp finalize_generate_response(
         response,
         context,
         forced_stop_reason,
         accumulated_usage,
         on_message
       ) do
    turn_usage = response.usage || %LLM.Usage{}
    combined_usage = LLM.Usage.add(accumulated_usage, turn_usage)
    final_message = Map.update!(response.message, :usage, &(&1 || turn_usage))

    on_message && on_message.(final_message)

    %{
      response
      | message: final_message,
        messages: context.messages ++ [final_message],
        usage: combined_usage,
        stop_reason: forced_stop_reason || response.stop_reason
    }
  end

  # Attaches the parsed structured output to the response when `opts[:schema]`
  # is present (anything but `nil`); otherwise returns the response untouched.
  defp maybe_parse_schema(response, opts) do
    if is_nil(opts[:schema]) do
      response
    else
      put_parsed_on_response_and_message(response)
    end
  end

  # Stores the result of `parse_schema/1` in `:parsed` on the response, on its
  # message, and on the last entry of its message history.
  defp put_parsed_on_response_and_message(response) do
    parsed = parse_schema(response)
    parsed_message = %{response.message | parsed: parsed}

    %{
      response
      | parsed: parsed,
        message: parsed_message,
        messages: update_last_message(response.messages, parsed_message)
    }
  end

  # Swaps the last element of `messages` for `message`; `nil` and `[]` are
  # returned unchanged.
  defp update_last_message(messages, message) do
    case messages do
      nil -> nil
      [] -> []
      _non_empty -> List.update_at(messages, -1, fn _last -> message end)
    end
  end

  # Extracts the structured-output payload from a response; `nil` when there is
  # nothing that decodes to a map.
  #
  # Anthropic tool-forcing: the result is in the __structured_output__ tool call args.
  # The call is located by pattern match, which works for structs and plain maps
  # alike and simply skips entries without a `:name` (bracket access would raise
  # on structs, dot access on maps lacking the key). Without such a call, the
  # message content is tried as JSON.
  defp parse_schema(%LLM.Response{message: %LLM.Message{tools: tools, content: content}})
       when is_list(tools) do
    case Enum.find(tools, &match?(%{name: "__structured_output__"}, &1)) do
      %{args: args} when is_map(args) -> args
      nil -> parse_json_content(content)
      _call_without_map_args -> nil
    end
  end

  # OpenAI / Gemini: the result is JSON in message content.
  defp parse_schema(%LLM.Response{message: %LLM.Message{content: content}}),
    do: parse_json_content(content)

  defp parse_schema(_), do: nil

  # Decodes binary content as JSON and returns it only when it is a JSON object.
  defp parse_json_content(content) when is_binary(content) do
    case Jason.decode(content) do
      {:ok, parsed} when is_map(parsed) -> parsed
      _ -> nil
    end
  end

  defp parse_json_content(_content), do: nil
end