Skip to main content

lib/exagent.ex

defmodule ExAgent do
  @external_resource "README.md"

  @moduledoc "README.md"
             |> File.read!()
             |> String.split("<!-- MDOC -->")
             |> Enum.at(1)

  alias ExAgent.{Model, ModelSettings, ModelRequestParameters, RunContext, Tool, UsageLimits}
  alias ExAgent.Message.{Part, Request, Response, Usage}

  defstruct model: nil,
            instructions: [],
            output_type: :text,
            tools: [],
            settings: %ModelSettings{},
            output_retries: 1,
            tool_timeout: 30_000,
            max_steps: 50,
            usage_limits: nil,
            capabilities: [],
            name: nil

  @type output_type :: :text | module()
  @type t :: %__MODULE__{
          model: Model.model(),
          instructions: [Part.System.t()],
          output_type: output_type(),
          tools: [Tool.t()],
          settings: ModelSettings.t(),
          output_retries: non_neg_integer(),
          tool_timeout: pos_integer(),
          max_steps: pos_integer(),
          usage_limits: ExAgent.UsageLimits.t() | nil,
          capabilities: [module() | struct()],
          name: String.t() | nil
        }

  @type result :: %{
          output: term(),
          messages: [ExAgent.Message.t()],
          new_messages: [ExAgent.Message.t()],
          usage: Usage.t(),
          run_step: non_neg_integer(),
          model: Model.model()
        }

  @output_tool_name "final_result"

  # -------------------------------------------------------------------------
  # Construction
  # -------------------------------------------------------------------------
  @doc "Build an agent from options. Use `:output` for the output spec."
  @spec new(keyword()) :: t()
  def new(opts) when is_list(opts) do
    model =
      case Keyword.fetch!(opts, :model) do
        %_{} = m -> m
        spec -> resolve_model!(spec)
      end

    %__MODULE__{
      model: model,
      instructions: to_instructions(Keyword.get(opts, :instructions)),
      output_type: Keyword.get(opts, :output, Keyword.get(opts, :output_type, :text)),
      tools: Keyword.get(opts, :tools, []),
      settings: ModelSettings.new(Keyword.get(opts, :model_settings, [])),
      output_retries: Keyword.get(opts, :output_retries, 1),
      tool_timeout: Keyword.get(opts, :tool_timeout, 30_000),
      max_steps: Keyword.get(opts, :max_steps, 50),
      usage_limits: Keyword.get(opts, :usage_limits),
      capabilities: Keyword.get(opts, :capabilities, []),
      name: Keyword.get(opts, :name)
    }
  end

  defp resolve_model!(spec) do
    case Model.resolve(spec) do
      {:ok, model} -> model
      {:error, reason} -> raise ArgumentError, inspect(reason)
    end
  end

  defp to_instructions(nil), do: []

  defp to_instructions(instructions) when is_binary(instructions),
    do: [%Part.System{content: instructions}]

  defp to_instructions(list) when is_list(list),
    do: Enum.map(list, &%Part.System{content: &1})

  # When output is an Ecto schema module, the model is forced to call an output
  # tool whose args are the schema; we validate those args (retry on failure).
  # For `:text` there's no output tool and free-text responses are allowed.
  defp output_config(%__MODULE__{output_type: :text}), do: {:text, [], true}

  defp output_config(%__MODULE__{output_type: mod}) when is_atom(mod) do
    tool = %Tool{
      name: @output_tool_name,
      description: "Return the final answer as structured data.",
      parameters_json_schema: ExAgent.OutputSchema.json_schema(mod),
      kind: :output,
      takes_ctx: false,
      call: nil
    }

    {:tool, [tool], false}
  end

  # -------------------------------------------------------------------------
  # Running
  # -------------------------------------------------------------------------
  @doc """
  Run the agent against `prompt`, returning `{:ok, result}` or `{:error, _}`.

  Options:
    * `:deps`              — dependency value threaded into `RunContext`.
    * `:message_history`   — prior `Message.t()` list to continue from.
    * `:model_settings`    — per-run settings overrides.
  """
  @spec run(t(), String.t(), keyword()) :: {:ok, result()} | {:error, term()}
  def run(agent, prompt, opts \\ []) do
    start = System.monotonic_time()

    ExAgent.Telemetry.execute([:run, :start], %{system_time: start}, %{
      agent: agent.name,
      prompt: prompt
    })

    state = init_state(agent, prompt, opts)

    emit(state, :run_started, %{prompt: prompt})
    result = drive(state)
    duration = System.monotonic_time() - start

    case result do
      {:ok, %{usage: usage, run_step: steps} = res} ->
        emit(state, :run_finished, %{
          output: res.output,
          usage: usage,
          steps: steps
        })

        ExAgent.Telemetry.execute([:run, :stop], %{duration: duration}, %{
          agent: agent.name,
          usage: usage,
          steps: steps
        })

      {:error, reason} ->
        emit(state, :run_failed, %{reason: reason})

        ExAgent.Telemetry.execute([:run, :exception], %{duration: duration}, %{
          agent: agent.name,
          reason: reason
        })
    end

    result
  end

  @doc "Run synchronously and return the output value directly, raising on error."
  @spec run!(t(), String.t(), keyword()) :: term()
  def run!(agent, prompt, opts \\ []) do
    case run(agent, prompt, opts) do
      {:ok, %{output: out}} -> out
      {:error, reason} -> raise ExAgent.UnexpectedModelBehavior, inspect(reason)
    end
  end

  @doc ~S"""
  Run the agent, returning a **lazy stream** of events as the model generates.

  This is the streaming variant. It yields:

    * `{:delta, binary}` — incremental output text (one per model text chunk),
    * `{:result, map}` — the final result once the stream completes.

  It is text-focused: best suited to chat / response-streaming UIs. Tool calls
  emitted mid-stream are not re-executed in this path (use `run/3` for full
  agentic tool loops). The same per-run options as `run/3` apply.
  """
  @spec run_stream(t(), String.t(), keyword()) :: Enumerable.t()
  def run_stream(agent, prompt, opts \\ []) do
    state = init_state(agent, prompt, opts)
    stream = Model.request_stream(state.model, state.messages, state.settings, state.params)

    Stream.transform(stream, %{run: state, text: <<>>}, fn
      {:text_delta, chunk}, acc ->
        {[{:delta, chunk}], %{acc | text: acc.text <> chunk}}

      {:response, %Response{} = resp}, acc ->
        {[stream_result(acc, resp)], acc}

      {:error, reason}, acc ->
        {[{:error, reason}, {:result, %{error: reason}}], acc}
    end)
  end

  defp stream_result(acc, %Response{} = resp) do
    text = acc.text
    output = if text == "", do: Response.text(resp), else: text
    messages = acc.run.messages ++ [resp]
    usage = merge_usage(acc.run.usage, resp.usage)

    {:result, %{output: output, messages: messages, usage: usage}}
  end

  # ----- per-run state -----------------------------------------------------
  defmodule Run do
    @moduledoc false
    defstruct agent: nil,
              model: nil,
              messages: [],
              # index into `messages` where this run's new messages begin
              first_new_message_index: 0,
              usage: %ExAgent.Message.Usage{input_tokens: 0, output_tokens: 0},
              deps: nil,
              settings: nil,
              params: nil,
              prompt: nil,
              output_retries_used: 0,
              # per-tool-name failure counters (drives the per-tool retry budget)
              tool_retries: %{},
              tool_timeout: 30_000,
              usage_limits: nil,
              capabilities: [],
              # transient override of messages sent to the model (set by capabilities)
              request_messages: nil,
              run_step: 0,
              max_steps: 50,
              # optional event sink: (ExAgent.RunEvent.t() -> any()). No-op by
              # default so one-shot callers that don't pass :on_event pay nothing.
              on_event: nil,
              run_id: nil,
              # number of tool calls executed so far this run (drives tool_calls_limit)
              tool_calls: 0,
              # optional (Usage.t() -> cents()) for max_budget_cents enforcement
              cost_estimator: nil,
              # optional ExAgent.Permissions.t() for per-tool admission control
              permissions: nil,
              # optional (tool_call -> :approve | :deny) for :ask permissions
              approve: nil
  end

  defp init_state(agent, prompt, opts) do
    history = Keyword.get(opts, :message_history, [])

    settings =
      ModelSettings.merge(
        agent.settings,
        ModelSettings.new(Keyword.get(opts, :model_settings, []))
      )

    {output_mode, output_tools, allow_text} = output_config(agent)

    params = %ModelRequestParameters{
      function_tools: agent.tools,
      output_tools: output_tools,
      output_mode: output_mode,
      allow_text_output: allow_text,
      instructions: agent.instructions
    }

    # On the first turn of a conversation we prepend the agent's system
    # instructions into the canonical history (so they survive serialization and
    # are seen by every subsequent run). When *continuing* an existing
    # conversation (non-empty history that already carries the instructions),
    # we only append the new user prompt — no duplicated system messages.
    prepend_instructions? = history == [] and Keyword.get(opts, :prepend_instructions, true)

    user = %Part.User{content: prompt, timestamp: DateTime.utc_now()}

    first_parts = if prepend_instructions?, do: agent.instructions ++ [user], else: [user]

    first_request = %Request{
      parts: first_parts,
      timestamp: DateTime.utc_now()
    }

    %Run{
      agent: agent,
      model: agent.model,
      messages: history ++ [first_request],
      first_new_message_index: length(history),
      deps: Keyword.get(opts, :deps),
      settings: settings,
      params: params,
      prompt: prompt,
      tool_timeout: agent.tool_timeout,
      max_steps: agent.max_steps,
      usage_limits: agent.usage_limits,
      capabilities: agent.capabilities,
      on_event: Keyword.get(opts, :on_event),
      run_id: Keyword.get(opts, :run_id),
      cost_estimator: Keyword.get(opts, :estimate_cost),
      permissions: Keyword.get(opts, :permissions),
      approve: Keyword.get(opts, :approve)
    }
  end

  # ----- the loop ----------------------------------------------------------
  # Emit a loop event to the optional :on_event sink. No-op when unset, so the
  # pure one-shot path is unaffected. `state` is read for run_id/step context.
  defp emit(%Run{on_event: nil}, _type, _data), do: :ok

  defp emit(%Run{on_event: fun} = state, type, data) when is_function(fun, 1) do
    event = ExAgent.RunEvent.new(type, run_id: state.run_id, step: state.run_step, data: data)
    fun.(event)
  rescue
    # A misbehaving event sink must never break a run.
    _ -> :ok
  end

  defp drive(%Run{run_step: step, max_steps: max} = _state) when step >= max,
    do: {:error, {:max_steps_exceeded, max}}

  defp drive(%Run{} = state) do
    %Run{model: model, settings: settings, params: params, capabilities: caps} = state

    case check_usage_limits(state) do
      :ok ->
        state = %{state | run_step: state.run_step + 1}
        emit(state, :run_step_started, %{step: state.run_step})
        # capabilities may set a transient request_messages override
        state = ExAgent.Capabilities.before_model_request(caps, state)
        request_messages = state.request_messages || state.messages

        case Model.request(model, request_messages, settings, params) do
          {:ok, %Response{} = response, model} ->
            state = %{state | model: model, request_messages: nil}
            state = %{state | messages: append(state.messages, response)}
            state = %{state | usage: merge_usage(state.usage, response.usage)}
            state = ExAgent.Capabilities.after_model_request(caps, state)
            handle_response(response, state)

          {:error, reason} ->
            {:error, {:model_request_failed, reason}}
        end

      {:error, _} = e ->
        e
    end
  end

  defp check_usage_limits(%Run{usage_limits: nil}), do: :ok

  defp check_usage_limits(%Run{
         usage_limits: limits,
         usage: usage,
         run_step: step,
         cost_estimator: estimator
       }) do
    cost = if is_function(estimator, 1), do: estimator.(usage), else: 0
    UsageLimits.check_before_request(limits, usage, step, cost)
  end

  # call_tools_node: decide what to do with the model's response.
  defp handle_response(%Response{} = response, %Run{} = state) do
    tool_calls = Response.tool_calls(response)
    text = Response.text(response)

    cond do
      tool_calls != [] ->
        handle_tool_calls(tool_calls, state)

      response.finish_reason == :length ->
        {:error, {:max_tokens_exceeded, response.model_name}}

      response.finish_reason == :content_filter ->
        {:error, {:content_filter, response.model_name}}

      state.params.allow_text_output and text != "" ->
        finalize_text(text, state)

      true ->
        retry_or_fail(state, actionable_hint(state))
    end
  end

  defp actionable_hint(%Run{params: %{output_tools: [_ | _]}}),
    do: "Please call the #{@output_tool_name} tool to return your answer."

  defp actionable_hint(%Run{}), do: "Please respond."

  # ----- tools -------------------------------------------------------------
  defp handle_tool_calls(tool_calls, %Run{} = state) do
    output_names = MapSet.new(state.params.output_tools, & &1.name)

    case Enum.split_with(tool_calls, &MapSet.member?(output_names, &1.tool_name)) do
      # The model occasionally emits MORE than one output call in one response.
      # Only the first is processed; the rest must still get ToolReturns so the
      # assistant→tool_result pairing stays 1:1 and the history replays cleanly
      # on the next provider request (OpenAI/Anthropic reject mismatched pairs).
      {[output_call | extra_output_calls], siblings} ->
        handle_output_call(output_call, extra_output_calls ++ siblings, state)

      {[], fn_calls} ->
        with :ok <- check_tool_calls_limit(state, length(fn_calls)),
             {:ok, parts, new_retries} <- execute_function_tools(fn_calls, state) do
          # Merge any token usage contributed by tools (e.g. a delegated
          # sub-agent run) into the run's accumulated usage.
          state =
            Enum.reduce(parts, state, fn
              %Part.ToolReturn{usage: %Usage{} = u}, st ->
                %{st | usage: merge_usage(st.usage, u)}

              _part, st ->
                st
            end)

          state = %{
            state
            | tool_calls: state.tool_calls + length(fn_calls),
              tool_retries: new_retries,
              messages:
                append(state.messages, %Request{parts: parts, timestamp: DateTime.utc_now()})
          }

          drive(state)
        end
    end
  end

  defp check_tool_calls_limit(%Run{usage_limits: nil}, _incoming), do: :ok

  defp check_tool_calls_limit(%Run{usage_limits: limits, tool_calls: executed}, incoming),
    do: UsageLimits.check_tool_calls(limits, executed, incoming)

  # The model returned the structured output: validate it, then finalize. A
  # `ToolReturn` is appended for the output call (plus stubs for any sibling
  # function calls) so the message history stays replayable on the next turn.
  defp handle_output_call(
         %Part.ToolCall{tool_call_id: id} = call,
         siblings,
         %Run{agent: %{output_type: mod}} = state
       )
       when is_atom(mod) and mod != :text do
    with {:ok, args} <- decode_args(call),
         {:ok, data} <- ExAgent.OutputSchema.validate(mod, args) do
      parts = [
        %Part.ToolReturn{tool_name: @output_tool_name, content: "ok", tool_call_id: id}
        | Enum.map(siblings, &stub_return/1)
      ]

      state = %{
        state
        | messages: append(state.messages, %Request{parts: parts, timestamp: DateTime.utc_now()})
      }

      {:ok, result(data, state)}
    else
      {:error, errors} -> retry_or_fail(state, errors, call, Enum.map(siblings, &stub_return/1))
    end
  end

  defp stub_return(%Part.ToolCall{tool_name: name, tool_call_id: id}),
    do: %Part.ToolReturn{
      tool_name: name,
      content: "Tool not executed - a final result was already processed.",
      tool_call_id: id || name
    }

  # Execute a batch of function tools **in parallel** via Task.async_stream
  # (ordered, with per-tool timeout + kill). The actual tool bodies run
  # concurrently (big latency win for I/O tools); the per-tool retry budget is
  # applied sequentially afterwards (pure, cheap). Returns
  # {:ok, parts, retries} or {:error, reason} (terminates the run when a tool
  # blows its budget).
  defp execute_function_tools(calls, %Run{} = state) do
    base_ctx = build_context(state)
    base_retries = state.tool_retries

    results =
      Task.async_stream(
        calls,
        # run_tool_raw is wrapped so a raise/exit inside the task (e.g. a
        # malformed tool_call args value, or a buggy capability hook) is
        # converted into {:error, _} instead of propagating a linked EXIT that
        # would kill the caller. A linked task that raises would otherwise
        # terminate the agent process and violate run/3's {:ok,_}|{:error,_}
        # contract. Task deaths (timeout via on_timeout: :kill_task) are still
        # surfaced as {:exit, _} by the stream itself.
        fn call ->
          try do
            run_tool_raw(call, base_ctx, state)
          catch
            :exit, reason -> {:error, {:task_exit, reason}}
            kind, reason -> {:error, {:task_crash, kind, reason}}
          end
        end,
        ordered: true,
        timeout: state.tool_timeout,
        on_timeout: :kill_task
      )
      |> Enum.zip(calls)

    Enum.reduce_while(results, {:ok, [], base_retries}, fn
      # Success: a tool that succeeds resets its consecutive-failure counter.
      {{:ok, {:ok, part}}, _call}, {:ok, parts, retries} ->
        {:cont, {:ok, parts ++ [part], Map.delete(retries, part.tool_name)}}

      {{:ok, {:error, reason}}, call}, {:ok, parts, retries} ->
        apply_failure(call, reason, retries, parts, state)

      # Any task death (timeout OR a raise/exit inside the task) is a failure.
      {{:exit, reason}, call}, {:ok, parts, retries} ->
        apply_failure(call, {:exit, reason}, retries, parts, state)
    end)
  end

  defp apply_failure(call, reason, retries, parts, state) do
    case decide_retry(call, reason, retries, state) do
      {:ok, part, new_retries} -> {:cont, {:ok, parts ++ [part], new_retries}}
      {:error, _} = e -> {:halt, e}
    end
  end

  # Run a single tool's body to completion (or raise). Used inside a Task.
  defp run_tool_raw(%Part.ToolCall{tool_name: name} = call, ctx, state) do
    start = System.monotonic_time()
    caps = state.capabilities
    call = ExAgent.Capabilities.before_tool_execute(caps, ctx, call)
    tool = find_tool(state, name)

    # Enrich ctx once so it carries tool_name/retry/max_retries into both the
    # tool body AND after_tool_execute (which otherwise gets the base ctx).
    ctx =
      put_tool_info(
        ctx,
        name,
        call.tool_call_id,
        Map.get(state.tool_retries, name, 0),
        max_retries(tool)
      )

    emit(state, :tool_call_started, %{
      tool_name: name,
      tool_call_id: call.tool_call_id,
      args: call.args
    })

    res =
      cond do
        permitted?(state, name, call) == :deny ->
          # Admission control refused this tool — tell the model, don't execute.
          {:ok,
           %Part.ToolReturn{
             tool_name: name,
             content: "Tool #{inspect(name)} is not permitted.",
             tool_call_id: call.tool_call_id
           }}

        true ->
          case tool do
            nil ->
              {:error, "Unknown tool #{inspect(name)}"}

            tool ->
              with {:ok, args} <- decode_args(call) do
                case invoke(tool, ctx, args) do
                  {:ok, value, %Usage{} = contributed} ->
                    {:ok,
                     %Part.ToolReturn{
                       tool_name: tool.name,
                       content: value,
                       tool_call_id: call.tool_call_id,
                       usage: contributed
                     }}

                  {:ok, value} ->
                    {:ok,
                     %Part.ToolReturn{
                       tool_name: tool.name,
                       content: value,
                       tool_call_id: call.tool_call_id
                     }}

                  {:error, _} = e ->
                    e
                end
              end
          end
      end

    res = ExAgent.Capabilities.after_tool_execute(caps, ctx, call, res)

    duration_ms = System.monotonic_time() - start

    emit(state, :tool_call_finished, %{
      tool_name: name,
      tool_call_id: call.tool_call_id,
      success: match?({:ok, _}, res),
      duration_ms: duration_ms
    })

    ExAgent.Telemetry.execute(
      [:tool, :stop],
      %{duration: duration_ms},
      %{tool_name: name, agent: state.agent.name, success: match?({:ok, _}, res)}
    )

    res
  end

  # Apply the per-tool retry budget to a failure (raw reason or :timeout).
  defp decide_retry(%Part.ToolCall{tool_name: name, tool_call_id: id}, reason, retries, state) do
    tool = find_tool(state, name)
    max = if tool, do: tool.max_retries, else: 0
    used_after = Map.get(retries, name, 0) + 1

    if used_after > max do
      {:error, {:unexpected_model_behavior, {:tool_retries_exhausted, name, reason}}}
    else
      {:ok, retry_part(name, id, reason), Map.put(retries, name, used_after)}
    end
  end

  defp find_tool(%Run{agent: agent}, name), do: Enum.find(agent.tools, &(&1.name == name))

  # Per-tool admission control. No permissions configured => everything allowed.
  defp permitted?(%Run{permissions: nil}, _name, _call), do: :allow

  defp permitted?(%Run{permissions: perms, approve: approve}, name, call) do
    perms
    |> ExAgent.Permissions.decide(name)
    |> ExAgent.Permissions.resolve(call, approve)
  end

  defp max_retries(nil), do: 0
  defp max_retries(%Tool{max_retries: m}), do: m

  defp decode_args(%Part.ToolCall{} = call) do
    case Part.ToolCall.args_as_map(call) do
      :empty -> {:ok, %{}}
      {:ok, map} when is_map(map) -> {:ok, map}
      {:error, _} = e -> e
    end
  end

  defp invoke(tool, ctx, args) do
    res =
      try do
        if tool.takes_ctx, do: tool.call.(ctx, args), else: tool.call.(args)
      rescue
        e in ExAgent.ModelRetry -> {:error, e.message}
        e -> {:error, Exception.message(e)}
      end

    case res do
      {:ok, _value, %Usage{}} = ok -> ok
      {:ok, _} = ok -> ok
      {:error, _} = e -> e
      value -> {:ok, value}
    end
  end

  defp retry_part(tool_name, tool_call_id, reason) do
    %Part.Retry{content: reason_msg(reason), tool_name: tool_name, tool_call_id: tool_call_id}
  end

  defp reason_msg(reason) when is_binary(reason), do: reason
  defp reason_msg(reasons) when is_list(reasons), do: Jason.encode!(%{"errors" => reasons})
  defp reason_msg(reason), do: inspect(reason)

  # ----- finalising --------------------------------------------------------
  defp finalize_text(text, %Run{} = state) do
    {:ok, result(text, state)}
  end

  defp retry_or_fail(%Run{} = state, error) do
    retry_or_fail(state, error, %Part.Retry{content: reason_msg(error)}, [])
  end

  defp retry_or_fail(
         %Run{} = state,
         error,
         %Part.ToolCall{tool_name: tool_name, tool_call_id: tool_call_id},
         extra_parts
       ) do
    retry = %Part.Retry{
      content: reason_msg(error),
      tool_name: tool_name,
      tool_call_id: tool_call_id
    }

    retry_or_fail(state, error, retry, extra_parts)
  end

  defp retry_or_fail(
         %Run{output_retries_used: used, agent: %{output_retries: max}},
         error,
         _retry,
         _extra_parts
       )
       when used >= max do
    {:error, {:unexpected_model_behavior, {:output_retries_exhausted, error}}}
  end

  defp retry_or_fail(%Run{} = state, _error, %Part.Retry{} = retry, extra_parts) do
    state = %{state | output_retries_used: state.output_retries_used + 1}

    state = %{
      state
      | messages:
          append(state.messages, %Request{
            parts: [retry | extra_parts],
            timestamp: DateTime.utc_now()
          })
    }

    drive(state)
  end

  # ----- helpers -----------------------------------------------------------
  defp build_context(%Run{
         deps: deps,
         model: model,
         messages: messages,
         usage: usage,
         prompt: prompt
       }) do
    %RunContext{
      deps: deps,
      model: model,
      prompt: prompt,
      messages: messages,
      usage: usage
    }
  end

  defp put_tool_info(%RunContext{} = ctx, tool_name, tool_call_id, retry, max_retries) do
    %{
      ctx
      | tool_name: tool_name,
        tool_call_id: tool_call_id,
        retry: retry,
        max_retries: max_retries
    }
  end

  defp merge_usage(%Usage{} = acc, nil), do: acc

  defp merge_usage(%Usage{} = acc, %Usage{} = resp) do
    %Usage{
      input_tokens: acc.input_tokens + (resp.input_tokens || 0),
      output_tokens: acc.output_tokens + (resp.output_tokens || 0),
      details: sum_details(acc.details, resp.details)
    }
  end

  defp sum_details(a, b) do
    # Sum per-key, but only for numeric values — some providers nest maps in the
    # usage details (e.g. prompt_tokens_details), and adding those would crash.
    # Non-numeric values keep the latest (b's) value.
    Map.merge(a, b, fn _k, x, y ->
      cond do
        is_number(x) and is_number(y) -> x + y
        is_number(y) -> y
        is_number(x) -> x
        true -> y
      end
    end)
  end

  defp append(messages, message), do: messages ++ [message]

  defp result(output, %Run{} = state) do
    new = Enum.drop(state.messages, state.first_new_message_index)

    %{
      output: output,
      messages: state.messages,
      new_messages: new,
      usage: state.usage,
      run_step: state.run_step,
      # The (possibly updated) model struct, so stateful models (e.g. the
      # script-driven Test) can be threaded across runs by the runtime layer.
      model: state.model
    }
  end
end