defmodule LangChain.Chains.LLMChain do
@moduledoc """
Define an LLMChain. This is the heart of the LangChain library.
The chain manages the conversation messages, the registered tools (and an
internal map of tools by name), streamed delta tracking, the messages exchanged
during a run, the last received message, and verbose logging. Messages and tool
results support multi-modal ContentParts, enabling richer responses (text,
images, files, thinking, etc.), and ToolResult content can be a list of
ContentParts. The chain also supports a configurable `async_tool_timeout` and
fallback chat models.
## Callbacks
Callbacks are fired as specific events occur in the chain as it is running.
The set of events is defined in `LangChain.Chains.ChainCallbacks`.
To be notified of an event you care about, register a callback handler with
the chain. Multiple callback handlers can be assigned. The callback handler
assigned to the `LLMChain` is not provided to an LLM chat model. For callbacks
on a chat model, set them there.
### Registering a callback handler
A handler is a map whose keys are the names of the callbacks to fire; each key
is assigned a function. Refer to the documentation for each callback, as their
arguments vary.
If we want to be notified when an LLM Assistant chat response message has been
processed and it is complete, this is how we could receive that event in our
running LiveView:

    live_view_pid = self()

    handler = %{
      on_message_processed: fn _chain, message ->
        send(live_view_pid, {:new_assistant_response, message})
      end
    }

    LLMChain.new!(%{...})
    |> LLMChain.add_callback(handler)
    |> LLMChain.run()

In the LiveView, a `handle_info` function executes with the received message.
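
Because several handler maps can be registered, an event is delivered to every
handler that has a key for it, and handlers without that key are skipped. The
sketch below illustrates that dispatch pattern on plain maps. `HandlerSketch`
and its `fire/3` are hypothetical stand-ins, not the library's own
`LangChain.Callbacks.fire/3`, and `:fake_chain` stands in for a real chain.

```elixir
defmodule HandlerSketch do
  # Hypothetical helper: fire one event across every registered handler map.
  # A handler map only needs keys for the events it cares about.
  def fire(handlers, event, args) do
    Enum.each(handlers, fn handler ->
      case Map.fetch(handler, event) do
        {:ok, fun} when is_function(fun, length(args)) -> apply(fun, args)
        _ -> :ok
      end
    end)
  end
end

parent = self()

handlers = [
  %{on_message_processed: fn _chain, msg -> send(parent, {:first, msg}) end},
  %{on_message_processed: fn _chain, msg -> send(parent, {:second, msg}) end},
  # No :on_message_processed key, so this handler is skipped for that event.
  %{on_error: fn _chain, _err -> send(parent, :error) end}
]

HandlerSketch.fire(handlers, :on_message_processed, [:fake_chain, "hello"])
```

Both processed-message handlers receive the event; the third handler does not.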
## Fallbacks
When running a chain, the `:with_fallbacks` option can be used to provide a
list of fallback chat models to try when a failure is encountered.
When working with language models, you may often encounter issues from the
underlying APIs, whether these be rate limiting, downtime, or something else.
Therefore, as you move your LLM applications into production, it becomes
increasingly important to safeguard against these. That's what fallbacks are
designed to provide.
A **fallback** is an alternative plan that may be used in an emergency.
A `before_fallback` function can be provided to alter or return a different
chain to use with the fallback LLM model. This is important because the prompts
needed often differ for a fallback LLM. For example, if your OpenAI completion
fails, a different prompt may be needed when retrying it with an Anthropic
fallback.
### Fallback for LLM API Errors
This is perhaps the most common use case for fallbacks. A request to an LLM
API can fail for a variety of reasons - the API could be down, you could have
hit rate limits, any number of things. Therefore, using fallbacks can help
protect against these types of failures.
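
Conceptually, a fallback run tries each model in order, moves on only when the
failure is worth retrying (this module asks the chat model's
`retry_on_fallback?/1` for that decision), and reports `all_fallbacks_failed`
once the list is exhausted. The library-independent sketch below shows that
control flow using zero-arity functions in place of chat models; `FallbackSketch`
and the error atoms are hypothetical.

```elixir
defmodule FallbackSketch do
  # Each "model" is a zero-arity function returning {:ok, result} or {:error, reason}.
  # `retry?` decides whether an error should move on to the next model.
  def run([], _retry?), do: {:error, :all_fallbacks_failed}

  def run([model | rest], retry?) do
    case model.() do
      {:ok, result} ->
        {:ok, result}

      {:error, reason} = error ->
        if retry?.(reason), do: run(rest, retry?), else: error
    end
  end
end

primary = fn -> {:error, :rate_limited} end
fallback = fn -> {:ok, "The sky is blue because of Rayleigh scattering."} end
retry? = fn reason -> reason in [:rate_limited, :overloaded] end

FallbackSketch.run([primary, fallback], retry?)
```

A non-retryable error (for example a hypothetical `:bad_request`) is returned
immediately instead of trying the remaining models.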
## Fallback Examples
A simple fallback that tries a different LLM chat model:

    fallback_llm = ChatAnthropic.new!(%{stream: false})

    {:ok, updated_chain} =
      %{llm: ChatOpenAI.new!(%{stream: false})}
      |> LLMChain.new!()
      |> LLMChain.add_message(Message.new_system!("OpenAI system prompt"))
      |> LLMChain.add_message(Message.new_user!("Why is the sky blue?"))
      |> LLMChain.run(with_fallbacks: [fallback_llm])

Note the `with_fallbacks: [fallback_llm]` option when running the chain.

The next example uses the `:before_fallback` option to provide a function that
can modify or return an alternate chain when used with a certain LLM. It also
uses the utility function `LangChain.Utils.replace_system_message!/2` to swap
out the system message when falling back to a different LLM.

    fallback_llm = ChatAnthropic.new!(%{stream: false})

    {:ok, updated_chain} =
      %{llm: ChatOpenAI.new!(%{stream: false})}
      |> LLMChain.new!()
      |> LLMChain.add_message(Message.new_system!("OpenAI system prompt"))
      |> LLMChain.add_message(Message.new_user!("Why is the sky blue?"))
      |> LLMChain.run(
        with_fallbacks: [fallback_llm],
        before_fallback: fn chain ->
          case chain.llm do
            %ChatAnthropic{} ->
              # replace the system message
              %LLMChain{
                chain
                | messages:
                    Utils.replace_system_message!(
                      chain.messages,
                      Message.new_system!("Anthropic system prompt")
                    )
              }

            _open_ai ->
              chain
          end
        end
      )

See `LangChain.Chains.LLMChain.run/2` for more details.
## Run Until Tool Used
The `run_until_tool_used/3` function makes it easy to instruct an LLM to use a
set of tools and then call a specific tool to present the results. This is
particularly useful for complex workflows where you want the LLM to perform
multiple operations and then finalize with a specific action.
This works well for receiving a final structured output after multiple tools
are used.
When the specified tool is successfully called, the chain stops processing and
returns the result. This prevents unnecessary additional LLM calls and
provides a clear termination point for your workflow.

    {:ok, %LLMChain{} = updated_chain, %ToolResult{} = tool_result} =
      %{llm: ChatOpenAI.new!(%{stream: false})}
      |> LLMChain.new!()
      # `report_results` is assumed to be the Function named "final_summary"
      |> LLMChain.add_tools([special_search, report_results])
      |> LLMChain.add_message(Message.new_system!())
      |> LLMChain.add_message(Message.new_user!("..."))
      |> LLMChain.run_until_tool_used("final_summary")

The function returns a tuple with three elements:
- `:ok` - Indicating success
- The updated chain with all messages and tool calls
- The specific tool result that matched the requested tool name
### Using Multiple Tool Names
You can also provide a list of tool names to stop when any one of them is called:

    {:ok, %LLMChain{} = updated_chain, %ToolResult{} = tool_result} =
      %{llm: ChatOpenAI.new!(%{stream: false})}
      |> LLMChain.new!()
      |> LLMChain.add_tools([search_tool, summary_tool, report_tool])
      |> LLMChain.add_message(Message.new_system!())
      |> LLMChain.add_message(Message.new_user!("..."))
      |> LLMChain.run_until_tool_used(["summary_tool", "report_tool"])

This variant is useful when you have multiple tools that could serve as valid
endpoints for your workflow, and you want the LLM to choose the most appropriate
one based on the context.
To prevent runaway function calls, a default `max_runs` value of 25 is set.
You can adjust this as needed:

    # Allow up to 50 runs before returning an "exceeded_max_runs" error
    LLMChain.run_until_tool_used(chain, "final_summary", max_runs: 50)

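The stopping rule can be pictured as a loop that checks, after each round, whether
any of the requested tool names was called, and gives up once `max_runs` is
exceeded. This is a simplified sketch under the assumption that one "run" equals
one LLM round trip plus tool execution; `UntilToolSketch` and the `step`
function are hypothetical and do not call the library.

```elixir
defmodule UntilToolSketch do
  # `step` is a hypothetical function standing in for one LLM round trip plus
  # tool execution; it returns the names of the tools called in that round.
  def run(step, stop_names, max_runs), do: loop(step, stop_names, max_runs, 1)

  defp loop(_step, _stop_names, max_runs, run) when run > max_runs,
    do: {:error, :exceeded_max_runs}

  defp loop(step, stop_names, max_runs, run) do
    called = step.(run)

    case Enum.find(called, &(&1 in stop_names)) do
      nil -> loop(step, stop_names, max_runs, run + 1)
      name -> {:ok, run, name}
    end
  end
end

# Simulated LLM: searches on runs 1 and 2, then calls "final_summary" on run 3.
step = fn
  run when run < 3 -> ["special_search"]
  _run -> ["special_search", "final_summary"]
end

UntilToolSketch.run(step, ["final_summary"], 25)
```

With `max_runs` set to 2, the same simulated LLM never reaches the stopping
tool and the sketch returns `{:error, :exceeded_max_runs}`.
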
The function also supports fallbacks, allowing you to gracefully handle LLM
failures:

    LLMChain.run_until_tool_used(chain, "final_summary",
      max_runs: 10,
      with_fallbacks: [fallback_llm],
      before_fallback: fn chain ->
        # Modify chain before using fallback LLM
        chain
      end
    )

See `LangChain.Chains.LLMChain.run_until_tool_used/3` for more details.
## Async Tool Timeout
When tools are defined with `async: true`, they execute in parallel using Elixir's
`Task.async/1`. The `async_tool_timeout` setting controls how long to wait for
these parallel tasks to complete.
**Important**: This timeout only applies to tools with `async: true`. Synchronous
tools (the default) run inline and are not subject to this timeout.
### Default Behavior
The default is `:infinity`, meaning async tools can run indefinitely. This is
appropriate for human-interactive agents where the user can manually stop
execution if needed.
For automated or unattended agents, consider setting a finite timeout.
### Configuration Levels
Timeout can be configured at three levels (highest precedence first):
1. **Chain-level** - Set when creating an LLMChain:

       LLMChain.new!(%{
         llm: model,
         async_tool_timeout: 10 * 60 * 1000 # 10 minutes
       })

2. **Application-level** - Set in `config/runtime.exs`:

       config :langchain, async_tool_timeout: 5 * 60 * 1000 # 5 minutes

3. **Library default** - `:infinity` (no timeout)
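
The precedence above can be read as "first non-nil value wins". The sketch below
resolves it on plain values; `TimeoutSketch` and the `:my_app` application name
are hypothetical, and the chain-level value is assumed to be `nil` when it was
not set.

```elixir
defmodule TimeoutSketch do
  # Hypothetical resolver mirroring the documented precedence:
  # chain-level value, then application config, then :infinity.
  def resolve(chain_timeout, app) do
    chain_timeout || Application.get_env(app, :async_tool_timeout, :infinity)
  end
end

# With nothing configured, the library default applies.
TimeoutSketch.resolve(nil, :my_app)
```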
### When to Use Async Tools
Mark a tool as `async: true` when:
- The operation may take significant time (web requests, file processing)
- Multiple such operations can run in parallel safely
- The tool has no side effects that depend on ordering

For example:

    Function.new!(%{
      name: "web_search",
      async: true, # Enables parallel execution
      function: fn args, ctx -> ... end
    })

### Timeout Values
- `:infinity` - No timeout (wait forever)
- Integer - Milliseconds (e.g., `300_000` for 5 minutes)
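
To see what the timeout bounds, the sketch below starts several slow operations
with `Task.async/1` and waits for all of them with `Task.await_many/2`, which
takes the same kind of timeout value (milliseconds or `:infinity`). It is a
standalone illustration; `AsyncToolSketch` and the tool functions are
hypothetical and this is not the chain's internal code.

```elixir
defmodule AsyncToolSketch do
  # Starts each hypothetical tool function in its own Task, then waits for all
  # results (returned in the original order) for at most `timeout`.
  def run_all(tool_funs, timeout) do
    tool_funs
    |> Enum.map(&Task.async/1)
    |> Task.await_many(timeout)
  end
end

tools = [
  fn ->
    Process.sleep(50)
    {:web_search, :done}
  end,
  fn ->
    Process.sleep(50)
    {:file_scan, :done}
  end
]

AsyncToolSketch.run_all(tools, 1_000)
```

Both tasks sleep concurrently, so the call returns after roughly 50 ms rather
than 100 ms. If the timeout elapses first, `Task.await_many/2` exits the calling
process instead of returning.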
"""
use Ecto.Schema
import Ecto.Changeset
require Logger
alias LangChain.Callbacks
alias LangChain.Chains.ChainCallbacks
alias LangChain.PromptTemplate
alias __MODULE__
alias LangChain.Message
alias LangChain.Message.ContentPart
alias LangChain.Message.ToolCall
alias LangChain.Message.ToolResult
alias LangChain.MessageDelta
alias LangChain.Function
alias LangChain.TokenUsage
alias LangChain.LangChainError
alias LangChain.Utils
alias LangChain.NativeTool
alias LangChain.Chains.LLMChain.Modes
@primary_key false
embedded_schema do
field :llm, :any, virtual: true
field :verbose, :boolean, default: false
# verbosely log each delta message.
field :verbose_deltas, :boolean, default: false
field :tools, {:array, :any}, default: [], virtual: true
# set and managed privately through tools
field :_tool_map, :map, default: %{}, virtual: true
# List of `Message` structs for creating the conversation with the LLM.
field :messages, {:array, :any}, default: [], virtual: true
# Custom context data made available to tools when executed.
# Could include information like account ID, user data, etc.
field :custom_context, :any, virtual: true
# A set of message pre-processors to execute on received messages.
field :message_processors, {:array, :any}, default: [], virtual: true
# The maximum consecutive LLM response failures permitted before failing the
# process.
field :max_retry_count, :integer, default: 3
# Internal failure count tracker. Is reset on a successful assistant
# response.
field :current_failure_count, :integer, default: 0
# Track the current merged `%MessageDelta{}` struct received when streamed.
# Set to `nil` when there is no current delta being tracked. This happens
# when the final delta is received that completes the message. At that point,
# the delta is converted to a message and the delta is set to nil.
field :delta, :any, virtual: true
# Track the last `%Message{}` received in the chain.
field :last_message, :any, virtual: true
# Internally managed. The list of exchanged messages during a `run` function
# execution. A single run can result in a number of newly created messages.
# It generates an Assistant message with one or more ToolCalls, the message
# with tool results where some of them may have failed, requiring the LLM to
# try again. This list tracks the full set of exchanged messages during a
# single run.
field :exchanged_messages, {:array, :any}, default: [], virtual: true
# Track if the state of the chain expects a response from the LLM. This
# happens after sending a user message, when a tool_call is received, or
# when we've provided a tool response and the LLM needs to respond.
field :needs_response, :boolean, default: false
# The timeout for async tool execution. An async Task execution is used when
# running a tool that has `async: true` set. Accepts an integer (milliseconds)
# or :infinity. Defaults to :infinity for human-interactive use cases.
# Configure via Application.get_env(:langchain, :async_tool_timeout).
field :async_tool_timeout, :any, virtual: true
# A list of maps for callback handlers
field :callbacks, {:array, :map}, default: []
end
# default to infinity for human-interactive use cases
@default_task_await_timeout :infinity
# Get the async tool timeout from application config or use library default
defp default_async_tool_timeout do
Application.get_env(:langchain, :async_tool_timeout, @default_task_await_timeout)
end
@type t :: %LLMChain{}
@typedoc """
The expected return types for a Message processor function. When successful,
it returns a `:cont` with a Message to use as a replacement. When it
fails, a `:halt` is returned along with an updated `LLMChain.t()` and a new
user message to be returned to the LLM reporting the error.
"""
@type processor_return :: {:cont, Message.t()} | {:halt, t(), Message.t()}
@typedoc """
A message processor is an arity 2 function that takes an
`LangChain.Chains.LLMChain` and a `LangChain.Message`. It is used to
"pre-process" the received message from the LLM. Processors can be chained
together to perform a sequence of transformations.
The return of the processor is a tuple with a keyword and a message. The
keyword is either `:cont` or `:halt`. If `:cont` is returned, the
message is used as the next message in the chain. If `:halt` is returned, the
halting message is returned to the LLM as an error and no further processors
will handle the message.
An example of this is the `LangChain.MessageProcessors.JsonProcessor` which
parses the message content as JSON and returns the parsed data as a map. If
the content is not valid JSON, the processor returns a halting message with an
error message for the LLM to respond to.
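
The sketch below shows how a list of processors with this return contract can
be applied in order with `Enum.reduce_while/3`: `{:cont, message}` hands the
(possibly replaced) message to the next processor, while `{:halt, chain, message}`
stops immediately. `ProcessorSketch` and both processors are hypothetical, and
plain maps stand in for `Message` structs.

```elixir
defmodule ProcessorSketch do
  # Applies processors in order. {:cont, msg} passes the (possibly replaced)
  # message on; {:halt, chain, error_msg} stops the pipeline immediately.
  def process(chain, message, processors) do
    Enum.reduce_while(processors, {:cont, message}, fn processor, {:cont, msg} ->
      case processor.(chain, msg) do
        {:cont, new_msg} -> {:cont, {:cont, new_msg}}
        {:halt, _chain, _error_msg} = halt -> {:halt, halt}
      end
    end)
  end
end

# Hypothetical processors working on plain maps instead of Message structs.
trim = fn _chain, msg -> {:cont, %{msg | content: String.trim(msg.content)}} end

parse_integer = fn chain, msg ->
  case Integer.parse(msg.content) do
    {n, ""} -> {:cont, %{msg | content: n}}
    _ -> {:halt, chain, %{role: :user, content: "ERROR: expected an integer"}}
  end
end

ProcessorSketch.process(%{}, %{role: :assistant, content: " 42 "}, [trim, parse_integer])
```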
"""
@type message_processor :: (t(), Message.t() -> processor_return())
@create_fields [
:llm,
:tools,
:custom_context,
:max_retry_count,
:callbacks,
:verbose,
:verbose_deltas,
:async_tool_timeout
]
@required_fields [:llm]
@doc """
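Start a new LLMChain configuration. The `messages` key is not accepted by
`new/1`; add messages after creating the chain.

    {:ok, chain} =
      LLMChain.new(%{
        llm: ChatOpenAI.new!(%{model: "gpt-3.5-turbo", stream: true})
      })

    chain = LLMChain.add_message(chain, Message.new_system!("You are a helpful assistant."))
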
"""
@spec new(attrs :: map()) :: {:ok, t} | {:error, Ecto.Changeset.t()}
def new(attrs \\ %{}) do
%LLMChain{}
|> cast(attrs, @create_fields)
|> common_validation()
|> apply_action(:insert)
end
@doc """
Start a new LLMChain configuration and return it or raise an error if invalid.

    chain =
      %{llm: ChatOpenAI.new!(%{model: "gpt-3.5-turbo", stream: true})}
      |> LLMChain.new!()
      |> LLMChain.add_message(Message.new_system!("You are a helpful assistant."))

"""
@spec new!(attrs :: map()) :: t() | no_return()
def new!(attrs \\ %{}) do
case new(attrs) do
{:ok, chain} ->
chain
{:error, changeset} ->
raise LangChainError, changeset
end
end
def common_validation(changeset) do
changeset
|> validate_required(@required_fields)
|> Utils.validate_llm_is_struct()
|> build_tools_map_from_tools()
end
@doc false
def build_tools_map_from_tools(changeset) do
tools = get_field(changeset, :tools, [])
# get a list of all the tools indexed into a map by name
fun_map =
Enum.reduce(tools, %{}, fn f, acc ->
Map.put(acc, f.name, f)
end)
put_change(changeset, :_tool_map, fun_map)
end
@doc """
Add one or more tools to an LLMChain. The tools are appended to any tools
already registered.
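
Internally, `build_tools_map_from_tools/1` indexes all tools into a map keyed by
`name`, folding over the list with `Map.put/3`. One consequence: adding a tool
whose name is already registered keeps both entries in `tools`, but the lookup
map points at the most recently added one. The sketch below reproduces that
indexing with hypothetical tool records (plain maps standing in for `Function`
structs).

```elixir
# Hypothetical tool records (plain maps standing in for Function structs).
tools = [
  %{name: "web_search", version: 1},
  %{name: "report", version: 1},
  %{name: "web_search", version: 2}
]

# Index by name; when names collide, the later tool wins.
tool_map = Map.new(tools, fn tool -> {tool.name, tool} end)
```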
"""
@spec add_tools(t(), NativeTool.t() | Function.t() | [Function.t()]) :: t() | no_return()
def add_tools(%LLMChain{tools: existing} = chain, tools) do
updated = existing ++ List.wrap(tools)
chain
|> change()
|> cast(%{tools: updated}, [:tools])
|> build_tools_map_from_tools()
|> apply_action!(:update)
end
@doc """
Register a set of processors to be applied to received assistant messages.
"""
@spec message_processors(t(), [message_processor()]) :: t()
def message_processors(%LLMChain{} = chain, processors) do
%LLMChain{chain | message_processors: processors}
end
@doc """
Execute a single LLM call step.
This is the core primitive that modes use to call the LLM. It:
1. Sends the chain's messages and tools to the LLM
2. Processes the LLM's response (message or streaming deltas)
3. Adds the response to the chain's messages
4. Sets `needs_response` based on whether tool calls are pending
Returns `{:ok, updated_chain}` or `{:error, chain, reason}`.
This function does NOT execute tool calls — use `execute_tool_calls/1` for that.
## Usage in Custom Modes

    def run(chain, opts) do
      case LLMChain.execute_step(chain) do
        {:ok, updated_chain} ->
          updated_chain = LLMChain.execute_tool_calls(updated_chain)

          if updated_chain.needs_response,
            do: run(updated_chain, opts),
            else: {:ok, updated_chain}

        {:error, _chain, _reason} = error ->
          error
      end
    end

"""
@spec execute_step(t()) :: {:ok, t()} | {:error, t(), LangChainError.t()}
def execute_step(%LLMChain{} = chain) do
do_run(chain)
end
@doc """
Run the chain on the LLM using messages and any registered functions. This
formats the request for a chat model where messages are passed to the API.
When successful, it returns `{:ok, updated_chain}`.
## Options
- `:mode` - Defaults to running the chain once, stopping after receiving a
response from the LLM. Supports `:until_success`, `:while_needs_response`,
`:step`, or a module implementing the `LangChain.Chains.LLMChain.Mode`
behaviour.
- `mode: :until_success` - (for non-interactive processing done by the LLM
where it may repeatedly fail and need to re-try) Repeatedly evaluates a
received message through any message processors, returning any errors to the
LLM until it either succeeds or exceeds the `max_retry_count`. This includes
evaluating received `ToolCall`s until they succeed. If an LLM makes 3
ToolCalls in a single message and 2 succeed while 1 fails, the success
responses are returned to the LLM with the failure response of the remaining
`ToolCall`, giving the LLM an opportunity to resend the failed `ToolCall`,
and only the failed `ToolCall` until it succeeds or exceeds the
`max_retry_count`. In essence, once we have a successful response from the
LLM, we don't return any more to it and don't want any further responses.
- `mode: :while_needs_response` - (for interactive chats that make
`ToolCalls`) Repeatedly evaluates functions and submits to the LLM so long
as we still expect to get a response. Best fit for conversational LLMs where
a `ToolResult` is used by the LLM to continue. After all `ToolCall` messages
are evaluated, the `ToolResult` messages are returned to the LLM giving it
an opportunity to use the `ToolResult` information in an assistant response
message. In essence, this mode always gives the LLM the last word.
- `mode: :step` - (for step-by-step execution control) Executes one step of
the chain: makes an LLM call, processes the message, executes any tool
calls, and then stops. This allows the caller to inspect messages and
modify the chain between steps before deciding whether to continue by
calling `run` again. Perfect for scenarios where you need to examine
each message, update context, or modify the chain state before proceeding.
- `mode: MyCustomMode` - Pass a module implementing the
`LangChain.Chains.LLMChain.Mode` behaviour to use a custom execution loop.
The module's `run/2` callback receives the chain and the full opts keyword
list.
- `should_continue?` - (for automated stepped execution with conditional
stopping) Must be used with `mode: :step`. This option accepts a function
that receives the updated chain after each step and returns a boolean
indicating whether to continue. This internally handles the loop logic,
making stepped execution more streamlined for scenarios where you need
to inspect the chain state to determine when to stop (e.g., max iterations,
completion conditions, error thresholds). The function signature is
`(LLMChain.t() -> boolean())`.
- `with_fallbacks: [...]` - Provide a list of chat models to use as a fallback
when one fails. This helps a production system remain operational when an
API limit is reached, an LLM service is overloaded or down, or something
else new and exciting goes wrong.
When all fallbacks fail, a `%LangChainError{type: "all_fallbacks_failed"}`
is returned in the error response.
- `before_fallback: fn chain -> modified_chain end` - A `before_fallback`
function is called before the LLM call is made. **NOTE: When provided, it
also fires for the first attempt.** This allows a chain to be modified or
replaced before running against the configured LLM. This is helpful, for
example, when a different system prompt is needed for Anthropic vs OpenAI.
## Mode Examples
**Use Case**: A chat with an LLM where functions are available to the LLM:

    LLMChain.run(chain, mode: :while_needs_response)

This will execute any LLM called functions, returning the result to the LLM,
and giving it a chance to respond to the results.
**Use Case**: An application that exposes a function to the LLM, but we want
to stop once the function is successfully executed. When errors are
encountered, the LLM should be given error feedback and allowed to try again.

    LLMChain.run(chain, mode: :until_success)

**Use Case**: Automated stepped execution with a continuation function.
When you want step-by-step control but prefer the loop to be handled
internally based on a condition function.

    should_continue_fn = fn updated_chain ->
      # Continue while we need a response and haven't hit max iterations
      updated_chain.needs_response && Enum.count(updated_chain.exchanged_messages) < 10
    end

    {:ok, final_chain} = LLMChain.run(chain, mode: :step, should_continue?: should_continue_fn)

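The loop that `should_continue?` drives can be pictured as: run one step, hand
the updated state to the continuation function, and stop as soon as it returns
`false`. The sketch below models that with a plain map and a counter;
`StepLoopSketch` and its `step` function are hypothetical stand-ins for
`mode: :step` runs.

```elixir
defmodule StepLoopSketch do
  # `step` stands in for one `mode: :step` run; `continue?` sees the state
  # after each step, like the documented `should_continue?` option.
  def run(state, step, continue?) do
    next = step.(state)
    if continue?.(next), do: run(next, step, continue?), else: {:ok, next}
  end
end

step = fn state -> %{state | steps: state.steps + 1} end
continue? = fn state -> state.steps < 3 end

StepLoopSketch.run(%{steps: 0}, step, continue?)
```

At least one step always runs, because the continuation function is only
consulted after a step completes.
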
**Use Case**: Step-by-step execution where you need control of the loop.
Use this when you want to inspect the result of each step and decide whether
to continue. This is useful for debugging, or for stopping when you receive a
guardrail signal or some other specific condition.

    {:ok, updated_chain} = LLMChain.run(chain, mode: :step)

    # Inspect the result, check tool calls, etc.
    if should_continue?(updated_chain) do
      # Optionally modify the chain before continuing
      modified_chain =
        updated_chain
        |> LLMChain.update_custom_context(%{iteration_count: get_iteration_count() + 1})
        |> LLMChain.add_message(Message.new_user!("Continue with the next step"))

      {:ok, final_chain} = LLMChain.run(modified_chain, mode: :step)
    end

**Use Case**: Custom execution mode:

    LLMChain.run(chain, mode: MyApp.Modes.Custom)

"""
@spec run(t(), Keyword.t()) ::
{:ok, t()}
| {:ok, t(), term()}
| {:pause, t()}
| {:error, t(), LangChainError.t()}
def run(chain, opts \\ [])
def run(%LLMChain{} = chain, opts) do
try do
raise_on_obsolete_run_opts(opts)
raise_when_no_messages(chain)
initial_run_logging(chain)
# clear the set of exchanged messages.
chain = clear_exchanged_messages(chain)
# determine which function to run based on the mode.
function_to_run =
case Keyword.get(opts, :mode, nil) do
nil ->
&do_run/1
:while_needs_response ->
fn chain ->
Modes.WhileNeedsResponse.run(chain, opts)
end
:until_success ->
fn chain ->
Modes.UntilSuccess.run(chain, opts)
end
:step ->
fn chain ->
Modes.Step.run(chain, opts)
end
# When "mode" is a module, execute the run/2 function on it.
module when is_atom(module) ->
fn chain -> module.run(chain, opts) end
end
# Add telemetry for chain execution
metadata = %{
chain_type: "llm_chain",
mode: Keyword.get(opts, :mode, "default"),
message_count: length(chain.messages),
tool_count: length(chain.tools)
}
LangChain.Telemetry.span([:langchain, :chain, :execute], metadata, fn ->
# Run the chain and return the success or error results. Fallbacks use a
# separate code path (which tries the chain's own LLM first) because failing
# after all attempted fallbacks returns a different error
# ("all_fallbacks_failed").
if Keyword.has_key?(opts, :with_fallbacks) do
# run function and using fallbacks as needed.
with_fallbacks(chain, opts, function_to_run)
else
# run it directly right now and return the success or error
function_to_run.(chain)
end
end)
rescue
err in LangChainError ->
{:error, chain, err}
end
end
defp initial_run_logging(%LLMChain{verbose: false} = _chain), do: :ok
defp initial_run_logging(%LLMChain{verbose: true} = chain) do
# log the LLM, messages, and tools used for this run
IO.inspect(chain.llm, label: "LLM")
IO.inspect(chain.messages, label: "MESSAGES")
IO.inspect(chain.tools, label: "TOOLS")
:ok
end
defp with_fallbacks(%LLMChain{} = chain, opts, run_fn) do
# Sources of inspiration:
# - https://python.langchain.com/v0.1/docs/guides/productionization/fallbacks/
# - https://python.langchain.com/docs/how_to/fallbacks/
llm_list = Keyword.fetch!(opts, :with_fallbacks)
before_fallback_fn = Keyword.get(opts, :before_fallback, nil)
# try the chain where we go through the full list of LLMs to try. Add the
# current LLM as the first so all are processed the same way.
try_chain_with_llm(chain, [chain.llm | llm_list], before_fallback_fn, run_fn)
end
# nothing left to try
defp try_chain_with_llm(%LLMChain{} = chain, [], _before_fallback_fn, _run_fn) do
{:error, chain,
LangChainError.exception(
type: "all_fallbacks_failed",
message: "Failed all attempts to generate response"
)}
end
defp try_chain_with_llm(%LLMChain{} = chain, [llm | tail], before_fallback_fn, run_fn) do
%llm_module{} = llm
use_chain = %LLMChain{chain | llm: llm}
use_chain =
if before_fallback_fn do
# use the returned chain from the before_fallback function.
before_fallback_fn.(use_chain)
else
use_chain
end
try do
case run_fn.(use_chain) do
{:ok, result} ->
{:ok, result}
{:error, _error_chain, reason} = error ->
# Check with the chat model if this error should be retried on a
# fallback model or not.
if llm_module.retry_on_fallback?(reason) do
# run attempt received an error. Try again with the next LLM
Logger.warning("LLM call failed, using next fallback. Reason: #{inspect(reason)}")
try_chain_with_llm(use_chain, tail, before_fallback_fn, run_fn)
else
# error should not be retried. Return the error.
error
end
end
rescue
err ->
# Log the error and stack trace, then try again.
Logger.warning(fn ->
"Rescued from exception during with_fallback processing. Error: #{inspect(err)}\nStack trace:\n#{Exception.format(:error, err, __STACKTRACE__)}"
end)
try_chain_with_llm(use_chain, tail, before_fallback_fn, run_fn)
end
end
@doc """
Run the chain until a specific tool call is made. This makes it easy for an
LLM to make multiple tool calls and call a specific tool to return a result,
signaling the end of the operation.
This function accepts either a single tool name as a string, or a list of tool
names. When provided with a list, the chain stops when any one of the specified
tools is called.
## Examples
With a single tool name:

    {:ok, %LLMChain{} = updated_chain, %ToolResult{} = tool_result} =
      chain
      |> LLMChain.run_until_tool_used("final_summary")

With multiple tool names:

    {:ok, %LLMChain{} = updated_chain, %ToolResult{} = tool_result} =
      chain
      |> LLMChain.run_until_tool_used(["summary_tool", "report_tool"])

## Options
- `max_runs`: The maximum number of times to run the chain. To prevent runaway
calls, it defaults to 25. When exceeded, a `%LangChainError{type: "exceeded_max_runs"}`
is returned in the error response.
- `with_fallbacks: [...]` - Provide a list of chat models to use as a fallback
when one fails. This helps a production system remain operational when an
API limit is reached, an LLM service is overloaded or down, or something
else new and exciting goes wrong.
When all fallbacks fail, a `%LangChainError{type: "all_fallbacks_failed"}`
is returned in the error response.
- `before_fallback: fn chain -> modified_chain end` - A `before_fallback`
function is called before the LLM call is made. **NOTE: When provided, it
also fires for the first attempt.** This allows a chain to be modified or
replaced before running against the configured LLM. This is helpful, for
example, when a different system prompt is needed for Anthropic vs OpenAI.
"""
@spec run_until_tool_used(t(), [String.t()] | String.t(), Keyword.t()) ::
{:ok, t(), ToolResult.t()} | {:error, t(), LangChainError.t()}
def run_until_tool_used(chain, tool_name, opts \\ [])
def run_until_tool_used(%LLMChain{} = chain, tool_name, opts) when is_binary(tool_name) do
run_until_tool_used(chain, [tool_name], opts)
end
def run_until_tool_used(%LLMChain{} = chain, tool_names, opts) do
try do
raise_when_no_messages(chain)
initial_run_logging(chain)
chain = clear_exchanged_messages(chain)
mode_opts =
opts
|> Keyword.put(:tool_names, tool_names)
|> Keyword.put_new(:max_runs, 25)
metadata = %{
chain_type: "llm_chain",
mode: "run_until_tool_used",
message_count: length(chain.messages),
tool_count: length(chain.tools)
}
LangChain.Telemetry.span([:langchain, :chain, :execute], metadata, fn ->
Modes.UntilToolUsed.run(chain, mode_opts)
end)
rescue
err in LangChainError ->
{:error, chain, err}
end
end
# internal reusable function for running the chain
@spec do_run(t()) :: {:ok, t()} | {:error, t(), LangChainError.t()}
defp do_run(%LLMChain{current_failure_count: current_count, max_retry_count: max} = chain)
when current_count >= max do
Callbacks.fire(chain.callbacks, :on_retries_exceeded, [chain])
{:error, chain,
LangChainError.exception(
type: "exceeded_failure_count",
message: "Exceeded max failure count"
)}
end
defp do_run(%LLMChain{} = chain) do
# submit to LLM. The "llm" is a struct. Match to get the name of the module
# then execute the `.call` function on that module.
%module{} = chain.llm
# wrap and link the model's callbacks.
use_llm = Utils.rewrap_callbacks_for_model(chain.llm, chain.callbacks, chain)
# filter out any empty lists in the list of messages.
message_response =
case module.call(use_llm, chain.messages, chain.tools) do
{:ok, messages} when is_list(messages) ->
{:ok, Enum.reject(messages, &(&1 == []))}
non_list_or_error ->
non_list_or_error
end
# handle and output response
case message_response do
{:ok, [%Message{} = message]} ->
if chain.verbose, do: IO.inspect(message, label: "SINGLE MESSAGE RESPONSE")
{:ok, process_message(chain, message)}
{:ok, [%Message{} = message | _others] = messages} ->
if chain.verbose, do: IO.inspect(messages, label: "MULTIPLE MESSAGE RESPONSE")
# Multiple "choices" were returned from the LLM by request. Only the first
# message is processed and added to the chain.
{:ok, process_message(chain, message)}
{:ok, %Message{} = message} ->
if chain.verbose,
do: IO.inspect(message, label: "SINGLE MESSAGE RESPONSE NO WRAPPED ARRAY")
{:ok, process_message(chain, message)}
{:ok, [%MessageDelta{} | _] = deltas} ->
if chain.verbose_deltas, do: IO.inspect(deltas, label: "DELTA MESSAGE LIST RESPONSE")
updated_chain = apply_deltas(chain, deltas)
if chain.verbose,
do: IO.inspect(updated_chain.last_message, label: "COMBINED DELTA MESSAGE RESPONSE")
{:ok, updated_chain}
{:ok, [[%MessageDelta{} | _] | _] = deltas} ->
if chain.verbose_deltas, do: IO.inspect(deltas, label: "DELTA MESSAGE LIST RESPONSE")
updated_chain = apply_deltas(chain, deltas)
if chain.verbose,
do: IO.inspect(updated_chain.last_message, label: "COMBINED DELTA MESSAGE RESPONSE")
{:ok, updated_chain}
{:error, %LangChainError{} = reason} ->
if chain.verbose, do: IO.inspect(reason, label: "ERROR")
{:error, chain, reason}
{:error, string_reason} when is_binary(string_reason) ->
if chain.verbose, do: IO.inspect(string_reason, label: "ERROR")
{:error, chain, LangChainError.exception(message: string_reason)}
{:ok, []} ->
# Empty response — all choices were filtered out (e.g., thinking model
# streaming where all chunks produce empty parsed results). Treat as
# an error rather than crashing with CaseClauseError.
Logger.warning("LLM returned an empty response (no messages or deltas)")
{:error, chain,
LangChainError.exception(
type: "empty_response",
message:
"LLM returned an empty response with no messages. " <>
"This can happen with thinking/reasoning models during streaming."
)}
{:ok, unexpected} ->
Logger.warning("Unexpected LLM response format: #{inspect(unexpected)}")
{:error, chain,
LangChainError.exception(
type: "unexpected_response",
message: "Unexpected response format from LLM: #{inspect(unexpected)}"
)}
end
end
@doc """
Update the LLMChain's `custom_context` map. Passing in a `context_update` map
will by default merge the map into the existing `custom_context`.
Use the `:as` option to:
- `:merge` - Merge update changes in. Default.
- `:replace` - Replace the context with the `context_update`.
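
The difference between the two options is the difference between `Map.merge/2`
and plain replacement, with a `nil` context always replaced. The sketch below
mirrors those semantics on plain maps; `ContextSketch` is hypothetical and does
not take or return an `LLMChain`.

```elixir
defmodule ContextSketch do
  # Hypothetical helper reproducing the documented semantics on plain maps.
  # A nil context cannot be merged, so the update simply becomes the context.
  def update(nil, context_update, _as), do: context_update
  def update(context, context_update, :merge), do: Map.merge(context, context_update)
  def update(_context, context_update, :replace), do: context_update
end

context = %{account_id: 123, locale: "en"}

# :merge (the default): keys in the update win, untouched keys are kept.
ContextSketch.update(context, %{locale: "de"}, :merge)

# :replace: the update becomes the entire context.
ContextSketch.update(context, %{locale: "de"}, :replace)
```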
"""
@spec update_custom_context(t(), context_update :: %{atom() => any()}, opts :: Keyword.t()) ::
t() | no_return()
def update_custom_context(chain, context_update, opts \\ [])
def update_custom_context(
%LLMChain{custom_context: %{} = context} = chain,
%{} = context_update,
opts
) do
new_context =
case Keyword.get(opts, :as) || :merge do
:merge ->
Map.merge(context, context_update)
:replace ->
context_update
other ->
raise LangChain.LangChainError,
"Invalid update_custom_context :as option of #{inspect(other)}"
end
%LLMChain{chain | custom_context: new_context}
end
def update_custom_context(
%LLMChain{custom_context: nil} = chain,
%{} = context_update,
_opts
) do
# can't merge a map with `nil`. Replace it.
%LLMChain{chain | custom_context: context_update}
end
@doc """
Apply a list of deltas to the chain. Once the merged delta reaches a final
status (`:complete` or `:length`), it is converted to a `Message`, the chain's
`delta` is cleared, and the message is processed: it runs through any
`message_processors`, is added to the chain (updating `last_message` and
`messages`), and the registered callbacks fire.
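## Examples
A minimal sketch: the hand-built deltas below stand in for the chunks a chat
model streams during a run; the content values are illustrative:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.MessageDelta
chain = LLMChain.new!(%{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})})
deltas = [
  MessageDelta.new!(%{role: :assistant, content: "Hello", status: :incomplete}),
  MessageDelta.new!(%{content: " world", status: :complete})
]
chain = LLMChain.apply_deltas(chain, deltas)
# the completed delta became a message and the in-progress delta was cleared
chain.delta
#=> nil
chain.last_message.role
#=> :assistant
```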
"""
@spec apply_deltas(t(), list()) :: t()
def apply_deltas(%LLMChain{} = chain, deltas) when is_list(deltas) do
chain
|> merge_deltas(deltas)
|> delta_to_message_when_complete()
end
@doc """
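Merge a list of deltas into the chain. Nested lists are flattened first. Unlike
`apply_deltas/2`, a completed delta is not converted into a message.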
"""
@spec merge_deltas(t(), list()) :: t()
def merge_deltas(%LLMChain{} = chain, deltas) do
deltas
|> List.flatten()
|> Enum.reduce(chain, fn d, acc -> merge_delta(acc, d) end)
end
@doc """
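Merge a received MessageDelta struct into the chain's current delta. The
LLMChain tracks the current merged MessageDelta state. It also accepts:
- a `TokenUsage` received after the final delta, which is merged into the
delta's metadata under `:usage`.
- an `{:error, %LangChainError{}}` received during streaming, which cancels the
in-progress delta with a `:cancelled` status and stores the error in the
resulting message's metadata under `:streaming_error`. If there is no
in-progress delta, the chain is returned unchanged.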
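## Examples
A minimal sketch of the error path, assuming a partial assistant reply is in
progress when the provider reports an overload (the text and error message are
illustrative):
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.LangChainError
alias LangChain.MessageDelta
chain =
  %{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})}
  |> LLMChain.new!()
  |> LLMChain.merge_delta(
    MessageDelta.new!(%{role: :assistant, content: "Partial answ", status: :incomplete})
  )
error = LangChainError.exception(type: "overloaded", message: "Overloaded")
chain = LLMChain.merge_delta(chain, {:error, error})
# the partial reply is kept as a cancelled message that carries the error
chain.delta
#=> nil
chain.last_message.status
#=> :cancelled
chain.last_message.metadata.streaming_error.type
#=> "overloaded"
```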
"""
@spec merge_delta(t(), MessageDelta.t() | TokenUsage.t() | {:error, LangChainError.t()}) :: t()
def merge_delta(%LLMChain{} = chain, %MessageDelta{} = new_delta) do
merged = MessageDelta.merge_delta(chain.delta, new_delta)
updated_chain = %LLMChain{chain | delta: merged}
# Augment tool calls with display_text and fire early identification callbacks.
# Uses display_text presence on ToolCall as the "already processed" signal.
augmented_delta = augment_and_notify_tool_calls(updated_chain, updated_chain.delta)
%LLMChain{updated_chain | delta: augmented_delta}
end
def merge_delta(%LLMChain{} = chain, %TokenUsage{} = usage) do
# OpenAI returns the token usage in a separate chunk after the last delta. We want to merge it into the final delta.
fake_delta = MessageDelta.new!(%{role: :assistant, metadata: %{usage: usage}})
merged = MessageDelta.merge_delta(chain.delta, fake_delta)
%LLMChain{chain | delta: merged}
end
# Handle when the server is overloaded and cancelled the stream on the server side.
def merge_delta(%LLMChain{} = chain, {:error, %LangChainError{type: "overloaded"} = error}) do
cancel_delta(chain, :cancelled, error)
end
# Handle any other error received during streaming (e.g. content filtering, invalid_request_error).
def merge_delta(%LLMChain{} = chain, {:error, %LangChainError{} = error}) do
Logger.warning("Received error during streaming: #{error.message}")
cancel_delta(chain, :cancelled, error)
end
# Unified function to augment tool calls with display_text and optionally
# fire :on_tool_call_identified callbacks. Works on any struct with a
# `tool_calls` field (both MessageDelta and Message).
#
# Uses the presence of `display_text` on each ToolCall as the "already
# processed" signal — no separate tracking map needed.
#
# Options:
# - notify: true (default) — fire :on_tool_call_identified for new tool calls
# - notify: false — only augment display_text, don't fire callbacks
@spec augment_and_notify_tool_calls(t(), struct(), keyword()) :: struct()
defp augment_and_notify_tool_calls(chain, struct, opts \\ [])
defp augment_and_notify_tool_calls(
%LLMChain{} = chain,
%{tool_calls: tool_calls} = struct,
opts
)
when is_list(tool_calls) and tool_calls != [] do
notify = Keyword.get(opts, :notify, true)
augmented_calls =
Enum.map(tool_calls, fn call ->
cond do
# Already augmented — skip
call.display_text != nil ->
call
# No name yet — can't look up
call.name == nil ->
call
# New tool call to augment
true ->
display_text = resolve_display_text(chain._tool_map, call.name)
augmented_call = %{call | display_text: display_text}
if notify do
case chain._tool_map[call.name] do
%Function{} = func ->
Callbacks.fire(chain.callbacks, :on_tool_call_identified, [
chain,
augmented_call,
func
])
nil ->
:ok
end
end
augmented_call
end
end)
%{struct | tool_calls: augmented_calls}
end
defp augment_and_notify_tool_calls(_chain, struct, _opts), do: struct
@doc """
Drop the current delta without converting it to a message. This is useful when
a partial or complete delta should be ignored, for example because the message
is handled in a different way.
"""
@spec drop_delta(t()) :: t()
def drop_delta(%LLMChain{} = chain) do
reset_streaming_state(chain)
end
@spec reset_streaming_state(t()) :: t()
defp reset_streaming_state(%LLMChain{} = chain) do
%LLMChain{chain | delta: nil}
end
# Resolve display text for a tool call by looking up the Function definition.
# Falls back to a humanized version of the tool name.
@spec resolve_display_text(map(), String.t()) :: String.t()
defp resolve_display_text(tool_map, tool_name) do
case tool_map[tool_name] do
%Function{display_text: dt} when not is_nil(dt) -> dt
_ -> Utils.humanize_tool_name(tool_name)
end
end
@doc """
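Convert the chain's current delta to a message and add it to the chain once the
delta is finished (status `:complete` or `:length`). The message is handled by
`process_message/2`. If the delta is `nil` or still incomplete, the chain is
returned unmodified. If the conversion fails, the error is logged and the delta
is discarded.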
"""
@spec delta_to_message_when_complete(t()) :: t()
def delta_to_message_when_complete(
%LLMChain{delta: %MessageDelta{status: status} = delta} = chain
)
when status in [:complete, :length] do
# it's complete. Attempt to convert delta to a message
case MessageDelta.to_message(delta) do
{:ok, %Message{} = message} ->
process_message(reset_streaming_state(chain), message)
{:error, reason} ->
# Delta conversion failed. Log the error and clear the delta to prevent
# it from interfering with subsequent API calls.
Logger.warning("Error applying delta message. Reason: #{inspect(reason)}")
reset_streaming_state(chain)
end
end
def delta_to_message_when_complete(%LLMChain{} = chain) do
# either no delta or incomplete
chain
end
# Process an assistant message sequentially through each message processor.
@doc false
@spec run_message_processors(t(), Message.t()) ::
Message.t() | {:halted, Message.t(), Message.t()}
def run_message_processors(
%LLMChain{message_processors: processors} = chain,
%Message{role: :assistant} = message
)
when is_list(processors) and processors != [] do
# start `processed_content` with the message's content as a string
message = %Message{
message
| processed_content: ContentPart.content_to_string(message.content) || ""
}
processors
|> Enum.reduce_while(message, fn proc, m = _acc ->
try do
case proc.(chain, m) do
{:cont, updated_msg} ->
if chain.verbose, do: IO.inspect(proc, label: "MESSAGE PROCESSOR EXECUTED")
{:cont, updated_msg}
{:halt, %Message{} = returned_message} ->
if chain.verbose, do: IO.inspect(proc, label: "MESSAGE PROCESSOR HALTED")
# for debugging help, return the message so-far that failed in the
# processor
{:halt, {:halted, m, returned_message}}
end
rescue
err ->
Logger.warning(fn -> "Exception raised in processor #{inspect(proc)}" end)
{:halt,
{:halted, m,
Message.new_user!("ERROR: An exception was raised! Exception: #{inspect(err)}")}}
end
end)
end
# the message is not an assistant message. Skip message processing.
def run_message_processors(%LLMChain{} = _chain, %Message{} = message) do
message
end
@doc """
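Process a new message received from the LLM. Messages with a role of
`:assistant` are first run through the chain's `message_processors` before
being added to the chain and announced through the `:on_message_processed`
callback. If a processor halts, the failure count is incremented and both the
failed assistant message and the processor's response message are added to the
chain. If a processor raises, a user message describing the exception is used
as the response message.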
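## Examples
A minimal sketch with a hypothetical processor, `trim_processor`, that trims
whitespace from the message's `processed_content`. A processor receives the
chain and the message and returns `{:cont, message}` to continue or
`{:halt, response_message}` to stop:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Message
# hypothetical processor; `processed_content` starts as the message text
trim_processor = fn _chain, %Message{} = message ->
  {:cont, %Message{message | processed_content: String.trim(message.processed_content)}}
end
chain = LLMChain.new!(%{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})})
chain = %LLMChain{chain | message_processors: [trim_processor]}
chain = LLMChain.process_message(chain, Message.new_assistant!("  42  "))
chain.last_message.processed_content
#=> "42"
```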
"""
@spec process_message(t(), Message.t()) :: t()
def process_message(%LLMChain{} = chain, %Message{} = message) do
case run_message_processors(chain, message) do
{:halted, failed_message, new_message} ->
if chain.verbose do
IO.inspect(failed_message, label: "PROCESSOR FAILED ON MESSAGE")
IO.inspect(new_message, label: "PROCESSOR FAILURE RESPONSE MESSAGE")
end
# add the received assistant message, then add the newly created user
# return message and return the updated chain
chain
|> increment_current_failure_count()
|> add_message(failed_message)
|> add_message(new_message)
|> fire_callback_and_return(:on_message_processing_error, [failed_message])
|> fire_callback_and_return(:on_error_message_created, [new_message])
%Message{role: :assistant} = updated_message ->
if chain.verbose, do: IO.inspect(updated_message, label: "MESSAGE PROCESSED")
# Augment tool_calls with display_text and fire :on_tool_call_identified.
# For streaming, display_text was already set during merge_delta so this is a no-op.
# For non-streaming, this is the first time we see the tool calls.
augmented_message = augment_and_notify_tool_calls(chain, updated_message)
chain
|> add_message(augmented_message)
|> reset_current_failure_count_if(fn -> !Message.is_tool_related?(augmented_message) end)
|> fire_callback_and_return(:on_message_processed, [augmented_message])
|> fire_usage_callback_and_return(:on_llm_token_usage, [augmented_message])
end
end
@doc """
Add a received Message struct to the chain. The LLMChain tracks the
`last_message` received and the complete list of messages exchanged. Depending
on the message role, the chain may be in a pending or incomplete state where
a response from the LLM is anticipated.
For assistant messages with tool_calls, the tool_calls are automatically
augmented with display_text from the corresponding Function definitions.
This ensures display_text is available to all downstream consumers.
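## Examples
A minimal sketch showing how `needs_response` follows the role of the last
message added:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Message
chain =
  %{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})}
  |> LLMChain.new!()
  |> LLMChain.add_message(Message.new_user!("What is the capital of France?"))
chain.needs_response
#=> true
chain = LLMChain.add_message(chain, Message.new_assistant!("Paris."))
chain.needs_response
#=> false
length(chain.messages)
#=> 2
```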
"""
@spec add_message(t(), Message.t()) :: t()
def add_message(%LLMChain{} = chain, %Message{} = new_message) do
# Augment tool_calls with display_text only. No callbacks fire here; they fire
# in merge_delta for streaming or process_message for non-streaming.
new_message = augment_and_notify_tool_calls(chain, new_message, notify: false)
needs_response =
cond do
new_message.role in [:user, :tool] -> true
Message.is_tool_call?(new_message) -> true
new_message.role in [:system, :assistant] -> false
end
%LLMChain{
chain
| messages: chain.messages ++ [new_message],
last_message: new_message,
exchanged_messages: chain.exchanged_messages ++ [new_message],
needs_response: needs_response
}
end
def add_message(%LLMChain{} = _chain, %PromptTemplate{} = template) do
raise LangChain.LangChainError,
"PromptTemplates must be converted to messages. You can use LLMChain.apply_prompt_templates/3. Received: #{inspect(template)}"
end
@doc """
Add a set of Message structs to the chain. This enables quickly building a chain
for submitting to an LLM.
"""
@spec add_messages(t(), [Message.t()]) :: t()
def add_messages(%LLMChain{} = chain, messages) do
Enum.reduce(messages, chain, fn msg, acc ->
add_message(acc, msg)
end)
end
@doc """
Apply a set of PromptTemplates to the chain. The list of templates can also
include Messages with no templates. Provide the inputs to apply to the
templates for rendering as a message. The prepared messages are applied to the
chain.
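## Examples
A minimal sketch that mixes a plain message with an EEx-based template; the
`:topic` input name is chosen for illustration:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Message
alias LangChain.PromptTemplate
templates = [
  Message.new_system!("You are a concise assistant."),
  PromptTemplate.from_template!("Summarize <%= @topic %> in one sentence.")
]
chain =
  %{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})}
  |> LLMChain.new!()
  |> LLMChain.apply_prompt_templates(templates, %{topic: "Elixir processes"})
# the template is rendered as a user message after the system message
chain.last_message.role
#=> :user
```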
"""
@spec apply_prompt_templates(t(), [Message.t() | PromptTemplate.t()], %{atom() => any()}) ::
t() | no_return()
def apply_prompt_templates(%LLMChain{} = chain, templates, %{} = inputs) do
messages = PromptTemplate.to_messages!(templates, inputs)
add_messages(chain, messages)
end
@doc """
Convenience function for setting the prompt text for the LLMChain. Adds a
default system message (from `Message.new_system!()`) followed by a user
message containing `text`.
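## Examples
A minimal sketch; the prompt text is illustrative:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
chain =
  %{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})}
  |> LLMChain.new!()
  |> LLMChain.quick_prompt("Name three prime numbers.")
Enum.map(chain.messages, & &1.role)
#=> [:system, :user]
```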
"""
@spec quick_prompt(t(), String.t()) :: t()
def quick_prompt(%LLMChain{} = chain, text) do
messages = [
Message.new_system!(),
Message.new_user!(text)
]
add_messages(chain, messages)
end
@doc """
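If the `last_message` from the Assistant includes one or more `ToolCall`s, each
linked tool is executed. Tools marked `async: true` run concurrently and are
awaited up to the chain's `async_tool_timeout` in milliseconds (or the default
timeout when unset); the other tools run one after another. All results are
collected into a single tool result message that is added to the chain. If
there is no `last_message` or the `last_message` is not a tool call, the
LLMChain is returned with no action performed. This makes it safe to call any
time.
The `context` is additional data passed to the executed tools. When given, it
is used instead of the chain's `custom_context`; when `nil`, the chain's
`custom_context` is used.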
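## Examples
A minimal sketch with a hypothetical `add` tool. The assistant message with the
tool call is built by hand here; in a real run it comes from the LLM.
`LLMChain.add_tools/2` registers the tool so the call can be matched by name:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Function
alias LangChain.Message
alias LangChain.Message.ToolCall
# hypothetical tool that adds two integers
add_tool =
  Function.new!(%{
    name: "add",
    description: "Add two integers",
    function: fn %{"a" => a, "b" => b}, _context -> {:ok, to_string(a + b)} end
  })
call =
  ToolCall.new!(%{
    status: :complete,
    type: :function,
    call_id: "call_1",
    name: "add",
    arguments: %{"a" => 2, "b" => 3}
  })
chain =
  %{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})}
  |> LLMChain.new!()
  |> LLMChain.add_tools([add_tool])
  |> LLMChain.add_message(Message.new_assistant!(%{tool_calls: [call]}))
  |> LLMChain.execute_tool_calls()
# a single :tool message now holds the result for "call_1"
chain.last_message.role
#=> :tool
```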
"""
@spec execute_tool_calls(t(), context :: nil | %{atom() => any()}) :: t()
def execute_tool_calls(chain, context \\ nil)
def execute_tool_calls(%LLMChain{last_message: nil} = chain, _context), do: chain
def execute_tool_calls(
%LLMChain{last_message: %Message{} = message} = chain,
context
) do
if Message.is_tool_call?(message) do
# context to use
use_context = context || chain.custom_context
verbose = chain.verbose
# Get all the tools to call. Accumulate them into a map.
grouped =
Enum.reduce(message.tool_calls, %{async: [], sync: [], invalid: []}, fn call, acc ->
case chain._tool_map[call.name] do
%Function{async: true} = func ->
Map.put(acc, :async, acc.async ++ [{call, func}])
%Function{async: false} = func ->
Map.put(acc, :sync, acc.sync ++ [{call, func}])
# invalid tool call
nil ->
Map.put(acc, :invalid, acc.invalid ++ [{call, nil}])
end
end)
# Fire execution started callbacks for ALL valid tools BEFORE execution.
# :on_tool_execution_started is not fired during streaming detection; the
# :on_tool_call_identified callback already fired earlier at that point.
Enum.each(grouped[:async] ++ grouped[:sync], fn {call, func} ->
Callbacks.fire(chain.callbacks, :on_tool_execution_started, [chain, call, func])
end)
# execute all the async calls. This keeps the responses in order too.
# Return tuple with call and func for callback firing
async_results =
grouped[:async]
|> Enum.map(fn {call, func} ->
Task.async(fn ->
result = execute_tool_call(call, func, verbose: verbose, context: use_context)
{call, func, result}
end)
end)
|> Task.await_many(chain.async_tool_timeout || default_async_tool_timeout())
# Fire completed/failed callbacks for async tools and extract results
async_tool_results =
Enum.map(async_results, fn {call, _func, result} ->
cond do
result.is_interrupt ->
:ok
result.is_error ->
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [
chain,
call,
result.content
])
true ->
Callbacks.fire(chain.callbacks, :on_tool_execution_completed, [chain, call, result])
end
result
end)
# Execute sync tools with immediate callbacks
sync_tool_results =
Enum.map(grouped[:sync], fn {call, func} ->
result = execute_tool_call(call, func, verbose: verbose, context: use_context)
# Fire completed/failed callback immediately after execution
cond do
result.is_interrupt ->
:ok
result.is_error ->
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [
chain,
call,
result.content
])
true ->
Callbacks.fire(chain.callbacks, :on_tool_execution_completed, [chain, call, result])
end
result
end)
# log invalid tool calls (can't augment - no func available)
invalid_calls =
Enum.map(grouped[:invalid], fn {call, _} ->
text = "Tool call made to #{call.name} but tool not found"
Logger.warning(text)
# Fire failed callback for invalid tools
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [chain, call, text])
ToolResult.new!(%{tool_call_id: call.call_id, content: text, is_error: true})
end)
combined_results = async_tool_results ++ sync_tool_results ++ invalid_calls
# Fire interrupt callback if any tools interrupted
interrupted_results = Enum.filter(combined_results, & &1.is_interrupt)
if interrupted_results != [] do
Callbacks.fire(chain.callbacks, :on_tool_interrupted, [chain, interrupted_results])
end
# create a single tool message that contains all the tool results
result_message =
Message.new_tool_result!(%{content: nil, tool_results: combined_results})
# add the tool result message to the chain
updated_chain = LLMChain.add_message(chain, result_message)
# if the tool results had an error, increment the failure counter. If not,
# clear it.
updated_chain =
if Message.tool_had_errors?(result_message) do
# something failed, increment our error counter
LLMChain.increment_current_failure_count(updated_chain)
else
# no errors, clear any errors
LLMChain.reset_current_failure_count(updated_chain)
end
# fire the callbacks
if chain.verbose, do: IO.inspect(result_message, label: "TOOL RESULTS")
updated_chain
|> fire_callback_and_return(:on_message_processed, [result_message])
|> fire_callback_and_return(:on_tool_response_created, [result_message])
else
# Not a complete tool call
chain
end
end
@doc """
Execute tool calls with human decisions (approve, edit, reject).
This is used for Human-in-the-Loop workflows where tool calls need human approval
before execution. Each decision controls how the corresponding tool call is handled:
- `:approve` - Execute the tool with original arguments
- `:edit` - Execute the tool with modified arguments from the decision
- `:reject` - Do not execute the tool. Instead, add a result stating that a
human reviewer rejected the call. The result is not flagged as an error (so it
does not count as a failure), but `:on_tool_execution_failed` still fires.
Returns the updated chain with tool results added and callbacks fired. Tools are
executed with the chain's `custom_context`.
## Parameters
* `chain` - The LLMChain instance
* `tool_calls` - List of ToolCall structs to execute
* `decisions` - List of decision maps, one per tool call, matched to
`tool_calls` by position. Each decision must have:
- `:type` - One of `:approve`, `:edit`, or `:reject`
- `:arguments` - (used by `:edit`) The modified arguments map. Defaults to an
empty map when omitted.
## Examples
decisions = [
%{type: :approve},
%{type: :edit, arguments: %{"path" => "modified.txt"}},
%{type: :reject}
]
updated_chain = LLMChain.execute_tool_calls_with_decisions(chain, tool_calls, decisions)
"""
@spec execute_tool_calls_with_decisions(t(), [ToolCall.t()], [map()]) :: t()
def execute_tool_calls_with_decisions(%LLMChain{} = chain, tool_calls, decisions)
when is_list(tool_calls) and is_list(decisions) do
use_context = chain.custom_context
verbose = chain.verbose
# Execute each tool based on its decision
results =
tool_calls
|> Enum.zip(decisions)
|> Enum.map(fn {tool_call, decision} ->
case decision.type do
:approve ->
# Execute with original arguments
case chain._tool_map[tool_call.name] do
%Function{} = func ->
Callbacks.fire(chain.callbacks, :on_tool_execution_started, [
chain,
tool_call,
func
])
result =
execute_tool_call(tool_call, func, verbose: verbose, context: use_context)
# Fire completed/failed callback after execution (skip interrupts)
cond do
result.is_interrupt ->
:ok
result.is_error ->
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [
chain,
tool_call,
result.content
])
true ->
Callbacks.fire(chain.callbacks, :on_tool_execution_completed, [
chain,
tool_call,
result
])
end
result
nil ->
error_msg = "Tool '#{tool_call.name}' not found"
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [
chain,
tool_call,
error_msg
])
ToolResult.new!(%{
tool_call_id: tool_call.call_id,
name: tool_call.name,
content: error_msg,
is_error: true
})
end
:edit ->
# Execute with edited arguments
edited_args = Map.get(decision, :arguments, %{})
case chain._tool_map[tool_call.name] do
%Function{} = func ->
edited_call = %{tool_call | arguments: edited_args}
# Fire started callback before execution
Callbacks.fire(chain.callbacks, :on_tool_execution_started, [
chain,
edited_call,
func
])
result =
execute_tool_call(edited_call, func, verbose: verbose, context: use_context)
# Fire completed/failed callback after execution (skip interrupts)
cond do
result.is_interrupt ->
:ok
result.is_error ->
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [
chain,
edited_call,
result.content
])
true ->
Callbacks.fire(chain.callbacks, :on_tool_execution_completed, [
chain,
edited_call,
result
])
end
result
nil ->
error_msg = "Tool '#{tool_call.name}' not found"
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [
chain,
tool_call,
error_msg
])
ToolResult.new!(%{
tool_call_id: tool_call.call_id,
name: tool_call.name,
content: error_msg,
is_error: true
})
end
:reject ->
# Create rejection result without executing
rejection_msg = "Tool call '#{tool_call.name}' was rejected by a human reviewer."
# Fire failed callback for rejected tools
Callbacks.fire(chain.callbacks, :on_tool_execution_failed, [
chain,
tool_call,
rejection_msg
])
ToolResult.new!(%{
tool_call_id: tool_call.call_id,
name: tool_call.name,
content: rejection_msg,
is_error: false
})
end
end)
# Fire interrupt callback if any tools interrupted
interrupted_results = Enum.filter(results, & &1.is_interrupt)
if interrupted_results != [] do
Callbacks.fire(chain.callbacks, :on_tool_interrupted, [chain, interrupted_results])
end
# Create tool result message
result_message = Message.new_tool_result!(%{content: nil, tool_results: results})
# Add to chain
updated_chain = LLMChain.add_message(chain, result_message)
# Update failure counter based on errors
updated_chain =
if Message.tool_had_errors?(result_message) do
LLMChain.increment_current_failure_count(updated_chain)
else
LLMChain.reset_current_failure_count(updated_chain)
end
# Fire callbacks (same as execute_tool_calls does)
if chain.verbose, do: IO.inspect(result_message, label: "TOOL RESULTS")
updated_chain
|> fire_callback_and_return(:on_message_processed, [result_message])
|> fire_callback_and_return(:on_tool_response_created, [result_message])
end
@doc """
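Replace a tool result in the chain's messages by `tool_call_id`.
Delegates to `Message.replace_tool_result/3`. Only `messages` is updated; the
chain's `last_message` and `exchanged_messages` are left unchanged.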
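## Examples
A minimal sketch that marks an earlier tool result as an error; the `call_1`
id and the content strings are illustrative:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
alias LangChain.Message
alias LangChain.Message.ToolResult
original = ToolResult.new!(%{tool_call_id: "call_1", content: "pending"})
chain =
  %{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})}
  |> LLMChain.new!()
  |> LLMChain.add_message(Message.new_tool_result!(%{tool_results: [original]}))
replacement = ToolResult.new!(%{tool_call_id: "call_1", content: "timed out", is_error: true})
chain = LLMChain.replace_tool_result(chain, "call_1", replacement)
```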
"""
@spec replace_tool_result(t(), String.t(), ToolResult.t()) :: t()
def replace_tool_result(%LLMChain{} = chain, tool_call_id, %ToolResult{} = new_result) do
updated_messages = Message.replace_tool_result(chain.messages, tool_call_id, new_result)
%{chain | messages: updated_messages}
end
@doc """
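Execute a single tool call with its `Function` and return a `ToolResult`. The
function's return value is converted as follows:
- `{:ok, %ToolResult{}}` - used as returned, with `tool_call_id` set and `name`
and `display_text` defaulting to the function's values.
- `{:ok, result}` - `result` becomes the content sent to the LLM.
- `{:ok, llm_result, processed_result}` - `llm_result` is sent to the LLM and
`processed_result` is stored as `processed_content`.
- `{:interrupt, display_message, interrupt_data}` - an interrupt result.
- `{:error, reason}` with a string `reason` - an error result.
Exceptions raised by the function are rescued, logged, and returned as an error
result. Supported options are `:verbose` (default `false`) and `:context`
(default `nil`), which is passed to the function.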
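## Examples
A minimal sketch calling a hypothetical `lookup_user` tool directly. The
function returns a three-element tuple, so the first value is what the LLM sees
and the second is kept as `processed_content`:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.Function
alias LangChain.Message.ToolCall
# hypothetical tool; the user data is made up
lookup =
  Function.new!(%{
    name: "lookup_user",
    description: "Look up a user by id",
    function: fn %{"id" => id}, _context -> {:ok, "User found", %{id: id, name: "Ada"}} end
  })
call =
  ToolCall.new!(%{
    status: :complete,
    type: :function,
    call_id: "call_9",
    name: "lookup_user",
    arguments: %{"id" => 7}
  })
result = LLMChain.execute_tool_call(call, lookup)
result.processed_content
#=> %{id: 7, name: "Ada"}
```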
"""
@spec execute_tool_call(ToolCall.t(), Function.t(), Keyword.t()) :: ToolResult.t()
def execute_tool_call(%ToolCall{} = call, %Function{} = function, opts \\ []) do
verbose = Keyword.get(opts, :verbose, false)
context = Keyword.get(opts, :context, nil)
metadata = %{
tool_name: function.name,
tool_call_id: call.call_id,
async: function.async
}
LangChain.Telemetry.span([:langchain, :tool, :call], metadata, fn ->
try do
if verbose, do: IO.inspect(function.name, label: "EXECUTING FUNCTION")
case Function.execute(function, call.arguments, context) do
{:ok, %ToolResult{} = result} ->
# allow the tool execution to return a ToolResult. Just set the
# tool_call_id and fallback settings for name and display_text. This
# allows the tool to explicitly set the options for the ToolResult.
%{
result
| tool_call_id: call.call_id,
name: result.name || function.name,
display_text: result.display_text || function.display_text
}
{:ok, llm_result, processed_result} ->
if verbose, do: IO.inspect(processed_result, label: "FUNCTION PROCESSED RESULT")
# successful execution and storage of processed_content.
ToolResult.new!(%{
tool_call_id: call.call_id,
content: llm_result,
processed_content: processed_result,
name: function.name,
display_text: function.display_text
})
{:ok, result} ->
if verbose, do: IO.inspect(result, label: "FUNCTION RESULT")
# successful execution.
ToolResult.new!(%{
tool_call_id: call.call_id,
content: result,
name: function.name,
display_text: function.display_text
})
{:interrupt, display_message, interrupt_data} ->
if verbose, do: IO.inspect(display_message, label: "FUNCTION INTERRUPTED")
ToolResult.new!(%{
tool_call_id: call.call_id,
content: display_message,
name: function.name,
display_text: function.display_text,
is_interrupt: true,
interrupt_data: interrupt_data
})
{:error, reason} when is_binary(reason) ->
if verbose, do: IO.inspect(reason, label: "FUNCTION ERROR")
ToolResult.new!(%{
tool_call_id: call.call_id,
content: reason,
name: function.name,
display_text: function.display_text,
is_error: true
})
end
rescue
err ->
Logger.warning(fn ->
"Function #{function.name} failed in execution. Exception: #{LangChainError.format_exception(err, __STACKTRACE__)}"
end)
ToolResult.new!(%{
tool_call_id: call.call_id,
content: "ERROR executing tool: #{inspect(err)}",
is_error: true
})
end
end)
end
@doc """
Remove an incomplete MessageDelta from `delta` and add a Message with the
desired status to the chain.
"""
def cancel_delta(%LLMChain{delta: nil} = chain, _message_status), do: chain
def cancel_delta(%LLMChain{} = chain, message_status) do
cancel_delta(chain, message_status, nil)
end
@doc """
Same as `cancel_delta/2` but stores an optional error in the message's
metadata under `:streaming_error`. This preserves the error reason through the
chain so higher layers (like the Sagents Agent and AgentServer) can detect and
surface it.
"""
def cancel_delta(%LLMChain{delta: nil} = chain, _message_status, _error), do: chain
def cancel_delta(%LLMChain{delta: %MessageDelta{} = delta} = chain, message_status, error) do
# remove the in-progress delta and reset streaming state
updated_chain = reset_streaming_state(chain)
case MessageDelta.to_message(%MessageDelta{delta | status: :complete}) do
{:ok, %Message{} = message} ->
message = %Message{message | status: message_status}
message =
if error do
metadata = (message.metadata || %{}) |> Map.put(:streaming_error, error)
%Message{message | metadata: metadata}
else
message
end
add_message(updated_chain, message)
{:error, _reason} ->
chain
end
end
@doc """
Increment the internal `current_failure_count` by one and return the updated
chain.
"""
@spec increment_current_failure_count(t()) :: t()
def increment_current_failure_count(%LLMChain{} = chain) do
%LLMChain{chain | current_failure_count: chain.current_failure_count + 1}
end
@doc """
Reset the internal current_failure_count to 0. Useful after receiving a
successfully returned and processed message from the LLM.
"""
@spec reset_current_failure_count(t()) :: t()
def reset_current_failure_count(%LLMChain{} = chain) do
%LLMChain{chain | current_failure_count: 0}
end
@doc """
Reset the internal current_failure_count to 0 if the function provided returns
`true`. Helps to make the change conditional.
"""
@spec reset_current_failure_count_if(t(), (-> boolean())) :: t()
def reset_current_failure_count_if(%LLMChain{} = chain, fun) do
if fun.() == true do
%LLMChain{chain | current_failure_count: 0}
else
chain
end
end
@doc """
Append a callback handler map to the chain's `callbacks`. Previously registered
handlers are kept, and all of them fire for matching events.
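## Examples
A minimal sketch registering two handler maps; the `IO.inspect/2` bodies are
placeholders for real handling:
```elixir
alias LangChain.Chains.LLMChain
alias LangChain.ChatModels.ChatOpenAI
usage_handler = %{
  on_llm_token_usage: fn _chain, usage -> IO.inspect(usage, label: "TOKEN USAGE") end
}
message_handler = %{
  on_message_processed: fn _chain, message -> IO.inspect(message.role, label: "PROCESSED") end
}
chain =
  %{llm: ChatOpenAI.new!(%{model: "gpt-4o-mini"})}
  |> LLMChain.new!()
  |> LLMChain.add_callback(usage_handler)
  |> LLMChain.add_callback(message_handler)
length(chain.callbacks)
#=> 2
```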
"""
@spec add_callback(t(), ChainCallbacks.chain_callback_handler()) :: t()
def add_callback(%LLMChain{callbacks: callbacks} = chain, additional_callback) do
%LLMChain{chain | callbacks: callbacks ++ [additional_callback]}
end
# a pipe-friendly execution of callbacks that returns the chain
defp fire_callback_and_return(%LLMChain{} = chain, callback_name, additional_arguments)
when is_list(additional_arguments) do
Callbacks.fire(chain.callbacks, callback_name, [chain] ++ additional_arguments)
chain
end
# fire token usage callback in a pipe-friendly function
defp fire_usage_callback_and_return(
%LLMChain{} = chain,
callback_name,
[%{metadata: %{usage: %TokenUsage{} = usage}}]
) do
Callbacks.fire(chain.callbacks, callback_name, [chain, usage])
chain
end
defp fire_usage_callback_and_return(%LLMChain{} = chain, _callback_name, _additional_arguments),
do: chain
defp clear_exchanged_messages(%LLMChain{} = chain) do
%LLMChain{chain | exchanged_messages: []}
end
defp raise_on_obsolete_run_opts(opts) do
if Keyword.has_key?(opts, :callback_fn) do
raise LangChainError,
"The LLMChain.run option `:callback_fn` was removed; see `add_callback/2` instead."
end
end
# Raise an exception when there are no messages in the LLMChain (checked when running)
defp raise_when_no_messages(%LLMChain{messages: []} = _chain) do
raise LangChainError, "LLMChain cannot be run without messages"
end
defp raise_when_no_messages(%LLMChain{} = chain), do: chain
end