Skip to main content

lib/skill_kit/agent/sub_loop.ex

defmodule SkillKit.Agent.SubLoop do
  @moduledoc false

  # Private primitive: runs an in-process LLM loop with a scoped system
  # prompt, a resolved tool list, and event forwarding to the parent's
  # caller tagged with a synthetic sub-agent name.
  #
  # Used by:
  #   * `SkillKit.Agent.SkillActivation` — nested skill calls driven by
  #     the LLM's `activate_skill` tool use
  #   * `SkillKit.send_event/3` (future) — webhook deliveries and other
  #     event triggers that want isolated processing with a scoped tool
  #     set and no pollution of the parent's message history
  #
  # Callers prepare the scope (sub_tools, initial_messages, system_append,
  # sub_name, error_prefix) and this module runs the LLM + tool-dispatch
  # loop until a final assistant message with no tool calls. Returns the
  # final text (or an error string prefixed with `error_prefix`).

  alias SkillKit.Agent.Server
  alias SkillKit.Agent.SkillActivation
  alias SkillKit.Agent.StreamAccumulator
  alias SkillKit.Event.Delta
  alias SkillKit.Event.Done
  alias SkillKit.Event.ToolCallComplete
  alias SkillKit.Event.ToolCallStart
  alias SkillKit.Event.Usage
  alias SkillKit.LLM
  alias SkillKit.ToolExecution
  alias SkillKit.Types.AssistantMessage
  alias SkillKit.Types.ToolCall
  alias SkillKit.Types.ToolResult

  @type sub_tool :: {module(), map(), SkillKit.Tool.t()}

  @type config :: %{
          required(:system_append) => String.t(),
          required(:initial_messages) => [term()],
          required(:sub_tools) => [sub_tool()],
          required(:sub_name) => String.t(),
          required(:error_prefix) => String.t(),
          optional(:model) => String.t() | nil
        }

  @spec run(Server.t(), config()) :: String.t()
  def run(%Server{} = parent_state, config) do
    system = parent_state.agent.system_prompt <> "\n\n" <> config.system_append
    tool_defs = Enum.map(config.sub_tools, fn {_m, _c, def} -> def end)
    loop(parent_state, config, system, config.initial_messages, tool_defs)
  end

  # -- main loop -----------------------------------------------------------

  defp loop(parent_state, config, system, messages, tool_defs) do
    dispatch_stream(
      LLM.stream(messages,
        model: Map.get(config, :model, parent_state.agent.model),
        system: system,
        tools: tool_defs
      ),
      parent_state,
      config,
      system,
      messages,
      tool_defs
    )
  end

  defp dispatch_stream({:ok, stream}, parent_state, config, system, messages, tool_defs) do
    response = consume_stream(stream, parent_state.agent, config.sub_name)
    handle_response(response, parent_state, config, system, messages, tool_defs)
  end

  defp dispatch_stream({:error, reason}, _parent, config, _sys, _msgs, _td) do
    "#{config.error_prefix}: #{inspect(reason)}"
  end

  defp handle_response(%AssistantMessage{tool_calls: []} = response, _p, _c, _sys, _m, _td) do
    response.content || ""
  end

  defp handle_response(
         %AssistantMessage{tool_calls: tool_calls} = response,
         parent_state,
         config,
         system,
         messages,
         tool_defs
       ) do
    messages = messages ++ [response]
    results = Enum.map(tool_calls, &execute_tool_call(&1, parent_state, config))
    messages = messages ++ results
    loop(parent_state, config, system, messages, tool_defs)
  end

  # -- stream consumption --------------------------------------------------

  defp consume_stream(stream, agent, sub_name) do
    acc =
      Enum.reduce(stream, StreamAccumulator.new(), &process_event(&1, &2, agent, sub_name))

    StreamAccumulator.finalize(acc)
  end

  defp process_event(%Delta{text: text}, acc, agent, sub_name) do
    notify_caller(agent, %Delta{text: text, agent: sub_name})
    %{acc | text: acc.text <> text}
  end

  defp process_event(%ToolCallStart{} = event, acc, agent, sub_name) do
    notify_caller(agent, %{event | agent: sub_name})
    acc
  end

  defp process_event(%ToolCallComplete{} = event, acc, agent, sub_name) do
    notify_caller(agent, %{event | agent: sub_name})
    tool_call = %ToolCall{id: event.id, name: event.name, input: event.input}
    %{acc | tool_calls: acc.tool_calls ++ [tool_call]}
  end

  defp process_event(%Usage{} = usage, acc, agent, sub_name) do
    notify_caller(agent, %{usage | agent: sub_name})
    %{acc | usage: StreamAccumulator.merge_usage(acc.usage, usage)}
  end

  defp process_event(%Done{}, acc, _agent, _sub_name), do: acc
  defp process_event(_other, acc, _agent, _sub_name), do: acc

  # -- tool routing & execution -------------------------------------------

  defp execute_tool_call(
         %ToolCall{id: id, name: name, input: input},
         parent_state,
         config
       ) do
    result = run_tool(find_tool(config.sub_tools, name), id, name, input, parent_state)
    notify_caller(parent_state.agent, %{result | agent: config.sub_name})
    result
  end

  defp find_tool(sub_tools, name) do
    Enum.find(sub_tools, fn {_m, _c, def} -> def.name == name end)
  end

  defp run_tool(nil, id, name, _input, _parent_state) do
    %ToolResult{tool_call_id: id, content: "Unknown tool: #{name}", is_error: true}
  end

  defp run_tool({SkillActivation, _ctx, _def}, id, _name, input, parent_state) do
    SkillActivation.dispatch(parent_state, id, input)
  end

  defp run_tool({module, context, _def}, id, _name, input, _parent_state) do
    exec = %ToolExecution{tool: module, input: input, context: context}
    to_result(ToolExecution.execute(exec), id)
  end

  defp to_result({:ok, execution}, id) do
    %ToolResult{tool_call_id: id, content: extract_output(execution.result)}
  end

  defp to_result({:error, execution}, id) do
    %ToolResult{tool_call_id: id, content: extract_error(execution), is_error: true}
  end

  defp to_result({:pending, _execution}, id) do
    %ToolResult{
      tool_call_id: id,
      content: "Tool suspension is not supported in sub-loops",
      is_error: true
    }
  end

  defp extract_output({:ok, output}), do: ensure_non_empty(output)
  defp extract_output(output) when is_binary(output), do: ensure_non_empty(output)
  defp extract_output(other), do: inspect(other)

  defp extract_error(execution) do
    case execution.result do
      {output, _code} -> ensure_non_empty(output)
      reason when is_binary(reason) -> ensure_non_empty(reason)
      _ -> "Execution failed"
    end
  end

  defp ensure_non_empty(""), do: "(no output)"
  defp ensure_non_empty(nil), do: "(no output)"
  defp ensure_non_empty(str) when is_binary(str), do: str

  # -- notifications -------------------------------------------------------

  defp notify_caller(%{caller: nil}, _event), do: :ok
  defp notify_caller(%{caller: pid}, event), do: send(pid, event)
end