Skip to main content

lib/agent_sea/agent.ex

defmodule AgentSea.Agent do
  @moduledoc """
  An agent is a `GenServer` that owns its conversation state and runs the
  agentic loop: call the provider → if it requests tools, run them concurrently
  and feed the results back → repeat until the model answers or `max_iterations`
  is hit.

  Dependencies (provider, tools) are resolved from the `Config` struct, not
  injected via a constructor. A tool that raises is isolated by a supervised
  Task and folded into an `{:error, _}` result fed back to the model — the agent
  process never dies for a tool fault.
  """

  use GenServer

  alias AgentSea.{Response, ToolCall}

  defmodule Config do
    @moduledoc "Static configuration for an `AgentSea.Agent`."

    @enforce_keys [:name, :model, :provider]
    defstruct [
      :name,
      :description,
      :model,
      # {provider_module, provider_opts}
      :provider,
      :system_prompt,
      :role,
      tools: [],
      temperature: nil,
      max_tokens: nil,
      max_iterations: 10,
      # Optional guardrail hooks: a 1-arity fun applied to the user input before
      # the loop / to the final answer before returning. Each returns
      # `{:ok, content}` (possibly rewritten) or `{:block, reason}`. This is
      # exactly `AgentSea.Guardrails.run/2` partially applied.
      input_guard: nil,
      output_guard: nil,
      # Claude-specific, passed through to the provider:
      thinking: nil,
      effort: nil
    ]

    @type guard :: (String.t() -> {:ok, String.t()} | {:block, term()})

    @type t :: %__MODULE__{
            name: atom() | String.t(),
            description: String.t() | nil,
            model: String.t(),
            provider: {module(), keyword()},
            system_prompt: String.t() | nil,
            role: AgentSea.Role.t() | nil,
            tools: [module()],
            temperature: float() | nil,
            max_tokens: pos_integer() | nil,
            max_iterations: pos_integer(),
            input_guard: guard() | nil,
            output_guard: guard() | nil,
            thinking: term(),
            effort: atom() | nil
          }
  end

  @tool_timeout 30_000

  # --- Client API ---

  @doc "Start an agent process from a `Config`."
  def start_link(%Config{} = config, opts \\ []) do
    GenServer.start_link(__MODULE__, config, opts)
  end

  @doc "Run the agentic loop for `input`. Blocks the caller (not the scheduler)."
  def run(agent, input, ctx \\ %{}) when is_binary(input) do
    GenServer.call(agent, {:run, input, ctx}, :infinity)
  end

  @doc """
  Produce a bid for a task, based on the agent's role/capabilities and model
  price tier. Pure (no provider call) — used by the auction delegation strategy.
  The `task` may be any map/struct exposing `:id`, `:description` and
  `:required_capabilities`.
  """
  def bid(agent, task), do: GenServer.call(agent, {:bid, task})

  @doc "Return the accumulated conversation history (excludes the system prompt)."
  def history(agent), do: GenServer.call(agent, :history)

  @doc "Clear the conversation history."
  def reset(agent), do: GenServer.call(agent, :reset)

  # --- Server ---

  @impl GenServer
  def init(%Config{} = config) do
    {:ok, %{config: config, history: []}}
  end

  @impl GenServer
  def handle_call({:run, input, ctx}, _from, %{config: config, history: history} = state) do
    with {:ok, guarded_input} <- guard(config.input_guard, input, :input),
         messages =
           system_messages(config) ++ history ++ [%{role: :user, content: guarded_input}],
         {:ok, response, final_messages} <- run_loop(messages, config, ctx),
         {:ok, content} <- guard(config.output_guard, response.content, :output) do
      new_history = Enum.reject(final_messages, &(&1.role == :system))
      {:reply, {:ok, %{response | content: content}}, %{state | history: new_history}}
    else
      {:error, _reason} = error -> {:reply, error, state}
    end
  end

  def handle_call({:bid, task}, _from, %{config: config} = state) do
    {:reply, {:ok, compute_bid(config, task)}, state}
  end

  def handle_call(:history, _from, state), do: {:reply, state.history, state}
  def handle_call(:reset, _from, state), do: {:reply, :ok, %{state | history: []}}

  # --- Guardrail hooks ---

  defp guard(nil, content, _stage), do: {:ok, content}

  defp guard(fun, content, stage) when is_function(fun, 1) do
    case fun.(content) do
      {:ok, guarded} -> {:ok, guarded}
      {:block, reason} -> {:error, {:guardrail, stage, reason}}
    end
  end

  # --- The agentic loop ---

  defp run_loop(messages, config, ctx) do
    meta = %{name: config.name, model: config.model}

    :telemetry.span([:agentsea, :agent, :run], meta, fn ->
      result = loop(messages, config, ctx, 0)
      {result, Map.merge(meta, run_stop_metadata(result))}
    end)
  end

  defp loop(messages, %Config{max_iterations: max}, _ctx, iteration)
       when iteration >= max do
    {:error, {:max_iterations, messages}}
  end

  defp loop(messages, %Config{provider: {provider_mod, popts}} = config, ctx, iteration) do
    meta = %{provider: provider_mod, model: config.model, name: config.name, iteration: iteration}

    completion =
      :telemetry.span([:agentsea, :provider, :complete], meta, fn ->
        r = provider_mod.complete(messages, build_opts(config, popts))
        {r, Map.merge(meta, provider_stop_metadata(r))}
      end)

    case completion do
      {:ok, %Response{tool_calls: tool_calls} = response}
      when tool_calls == [] or tool_calls == nil ->
        {:ok, response, messages ++ [assistant_message(response)]}

      {:ok, %Response{tool_calls: tool_calls} = response} ->
        tool_messages = run_tools(tool_calls, config, ctx)
        next = messages ++ [assistant_message(response)] ++ tool_messages
        loop(next, config, ctx, iteration + 1)

      {:error, _reason} = error ->
        error
    end
  end

  # Run every requested tool concurrently under a supervised Task, isolating
  # crashes and enforcing a timeout. Results are returned in request order.
  defp run_tools(tool_calls, config, ctx) do
    AgentSea.ToolTaskSup
    |> Task.Supervisor.async_stream_nolink(
      tool_calls,
      fn call -> run_one_tool(call, config, ctx) end,
      timeout: @tool_timeout,
      on_timeout: :kill_task,
      max_concurrency: max(length(tool_calls), 1)
    )
    |> Enum.zip(tool_calls)
    |> Enum.map(fn
      {{:ok, result}, call} -> tool_result_message(call, result)
      {{:exit, reason}, call} -> tool_result_message(call, {:error, {:tool_crashed, reason}})
    end)
  end

  defp run_one_tool(
         %ToolCall{name: name, arguments: args},
         %Config{tools: tools, name: agent_name},
         ctx
       ) do
    meta = %{tool: name, agent: agent_name}

    :telemetry.span([:agentsea, :tool, :run], meta, fn ->
      result =
        case find_tool(tools, name) do
          nil -> {:error, {:unknown_tool, name}}
          tool -> invoke_tool(tool, args, ctx)
        end

      {result, Map.put(meta, :outcome, elem(result, 0))}
    end)
  end

  defp find_tool(tools, name), do: Enum.find(tools, &(tool_name(&1) == name))

  # Tools may be AgentSea.Tool modules or runtime AgentSea.Tool.Spec values.
  defp tool_name(%AgentSea.Tool.Spec{name: name}), do: name
  defp tool_name(module) when is_atom(module), do: module.name()

  defp invoke_tool(%AgentSea.Tool.Spec{run: run}, args, ctx), do: run.(args, ctx)
  defp invoke_tool(module, args, ctx) when is_atom(module), do: module.run(args, ctx)

  defp tool_description(%AgentSea.Tool.Spec{description: description}), do: description
  defp tool_description(module) when is_atom(module), do: module.description()

  defp tool_schema(%AgentSea.Tool.Spec{schema: schema}), do: schema
  defp tool_schema(module) when is_atom(module), do: module.schema()

  # --- Message helpers ---

  defp system_messages(%Config{} = config) do
    case effective_system_prompt(config) do
      nil -> []
      prompt -> [%{role: :system, content: prompt}]
    end
  end

  # An explicit system_prompt wins; otherwise fall back to the role's prompt.
  defp effective_system_prompt(%Config{system_prompt: sp}) when is_binary(sp), do: sp

  defp effective_system_prompt(%Config{role: %AgentSea.Role{system_prompt: sp}})
       when is_binary(sp),
       do: sp

  defp effective_system_prompt(_), do: nil

  defp assistant_message(%Response{content: content, tool_calls: tool_calls}) do
    %{role: :assistant, content: content, tool_calls: tool_calls || []}
  end

  defp tool_result_message(%ToolCall{} = call, {:ok, value}) do
    %{role: :tool, tool_call_id: call.id, name: call.name, content: stringify(value)}
  end

  defp tool_result_message(%ToolCall{} = call, {:error, reason}) do
    %{role: :tool, tool_call_id: call.id, name: call.name, content: "Error: #{inspect(reason)}"}
  end

  defp stringify(value) when is_binary(value), do: value
  defp stringify(value), do: inspect(value)

  # --- Provider options ---

  defp build_opts(%Config{} = config, popts) do
    computed =
      [model: config.model]
      |> put_opt(:system_prompt, config.system_prompt)
      |> put_opt(:temperature, config.temperature)
      |> put_opt(:max_tokens, config.max_tokens)
      |> put_opt(:thinking, config.thinking)
      |> put_opt(:effort, config.effort)
      |> put_opt(:tools, tool_specs(config.tools))

    Keyword.merge(popts, computed)
  end

  defp tool_specs([]), do: nil

  defp tool_specs(tools) do
    Enum.map(tools, fn tool ->
      %{name: tool_name(tool), description: tool_description(tool), schema: tool_schema(tool)}
    end)
  end

  defp put_opt(opts, _key, nil), do: opts
  defp put_opt(opts, _key, []), do: opts
  defp put_opt(opts, key, value), do: Keyword.put(opts, key, value)

  # --- Bidding (auction delegation) ---

  defp compute_bid(%Config{} = config, task) do
    required = Map.get(task, :required_capabilities, [])
    match = AgentSea.Capability.match(role_capabilities(config), required)

    # Reduce confidence when the agent is missing required capabilities.
    confidence = if match.can_execute, do: match.score, else: match.score * 0.5
    estimated_time = estimate_time(task)

    %AgentSea.Bid{
      agent_name: config.name,
      task_id: Map.get(task, :id),
      confidence: confidence,
      estimated_time: estimated_time,
      estimated_cost: AgentSea.ModelPricing.weight(config.model) * estimated_time,
      capabilities: match.matched,
      reasoning:
        "matched #{length(match.matched)} capabilit(ies); #{length(match.missing)} missing"
    }
  end

  defp role_capabilities(%Config{role: %AgentSea.Role{capabilities: caps}}), do: caps
  defp role_capabilities(_), do: []

  # Crude effort estimate (ms) from task description length.
  defp estimate_time(task) do
    description = Map.get(task, :description, "") || ""
    1000 + div(String.length(description), 100) * 500
  end

  # --- Telemetry stop metadata ---

  defp run_stop_metadata({:ok, %Response{} = response, _messages}),
    do: %{outcome: :ok, stop_reason: response.stop_reason}

  defp run_stop_metadata({:error, reason}), do: %{outcome: :error, reason: reason}

  defp provider_stop_metadata({:ok, %Response{} = response}) do
    %{
      outcome: :ok,
      stop_reason: response.stop_reason,
      input_tokens: response.usage.input_tokens,
      output_tokens: response.usage.output_tokens
    }
  end

  defp provider_stop_metadata({:error, reason}), do: %{outcome: :error, reason: reason}
end