defmodule Handoff do
@moduledoc """
Handoff is a library for building and executing Directed Acyclic Graphs (DAGs) of functions.
It provides tools for defining computation graphs, managing resources, and executing
the graphs in a distributed environment.
"""
@doc """
Creates a new DAG instance.
"""
def new do
Handoff.DAG.new()
end
@doc """
Starts the Handoff supervision tree.
Must be called before executing any DAGs.
"""
def start(opts \\ []) do
Handoff.Supervisor.start_link(opts)
end
@doc """
Executes all functions in a DAG, respecting dependencies.
## Parameters
- dag: The DAG to execute
- opts: Optional execution settings
- :allocation_strategy - Strategy for allocating functions to nodes
(:first_available or :load_balanced, defaults to :first_available)
## Returns
- `{:ok, %{dag_id: dag_id, results: results_map}}` with the DAG ID and a map of function IDs to results on success
- `{:error, reason}` on failure
"""
def execute(dag, opts \\ []) do
# Use DistributedExecutor, which handles local execution if no other nodes are present.
Handoff.DistributedExecutor.execute(dag, opts)
end
@doc """
Executes all functions in a DAG across multiple nodes, respecting dependencies.
## Parameters
- dag: The DAG to execute
- opts: Optional execution settings
- :allocation_strategy - Strategy for allocating functions to nodes
(:first_available or :load_balanced, defaults to :first_available)
- :max_retries - Maximum number of times to retry failed functions (default: 3)
## Returns
- `{:ok, %{dag_id: dag_id, results: results_map}}` with the DAG ID and a map of function IDs to results on success
- `{:error, reason}` on failure
"""
def execute_distributed(dag, opts \\ []) do
Handoff.DistributedExecutor.execute(dag, opts)
end
@doc """
Executes all functions in a DAG strictly on the local node, bypassing resource allocation
and ensuring all functions have `node` set to `Node.self()` and `cost` set to `nil`.
This is useful for ensuring local execution regardless of global configuration or function definitions.
## Parameters
- dag: The DAG to execute
- opts: Optional execution settings (passed to `Handoff.DistributedExecutor`)
## Returns
- Same as `Handoff.execute/2`.
"""
def execute_local(dag, opts \\ []) do
# TODO: use simpler topo-sort + reduce strategy for executing the graph locally.
dag =
update_in(dag, [Access.key(:functions), Access.all()], fn {id, func} ->
modified_func = %{func | node: Node.self(), cost: nil}
{id, modified_func}
end)
Handoff.DistributedExecutor.execute(dag, opts)
end
@doc """
Registers a node with its resource capabilities.
## Parameters
- node: The node to register
- caps: Map of capabilities/resources the node provides
## Example
```
Handoff.register_node(Node.self(), %{cpu: 4, memory: 8000})
```
"""
def register_node(node, caps) do
Handoff.SimpleResourceTracker.register(node, caps)
end
@doc """
Discovers and registers nodes in the cluster with their capabilities.
## Returns
- `{:ok, discovered}` with a map of node names to their capabilities
"""
def discover_nodes do
Handoff.DistributedExecutor.discover_nodes()
end
@doc """
Registers the local node with its capabilities for distributed execution.
## Parameters
- caps: Map of capabilities provided by this node
## Example
```
Handoff.register_local_node(%{cpu: 8, memory: 16000})
```
"""
def register_local_node(caps) do
Handoff.DistributedExecutor.register_local_node(caps)
end
@doc """
Checks if the specified node has the required resources available.
## Parameters
- node: The node to check
- req: Map of resource requirements to check
## Returns
- true if resources are available
- false otherwise
## Example
```
Handoff.resources_available?(Node.self(), %{cpu: 2, memory: 4000})
```
"""
def resources_available?(node, req) do
Handoff.SimpleResourceTracker.available?(node, req)
end
@doc """
Stores a function result locally on the origin node for a specific DAG and registers its location.
The result is stored only on the node where it was produced, not broadcast.
## Parameters
- dag_id: The ID of the DAG
- function_id: The ID of the function
- result: The result to store
- origin_node: The node where the result was produced (defaults to current node)
"""
def store_result(dag_id, function_id, result, origin_node \\ Node.self()) do
Handoff.DistributedResultStore.store_distributed(dag_id, function_id, result, origin_node)
end
@doc """
Retrieves a result for a specific DAG, automatically fetching it from its origin node if necessary.
## Parameters
- dag_id: The ID of the DAG
- id: The ID of the result/argument to retrieve
- timeout: Maximum time to wait in milliseconds, defaults to 5000
## Returns
- `{:ok, result}` on success
- `{:error, :timeout}` if the result is not available within the timeout
"""
def get_result(dag_id, id, timeout \\ 5000) do
Handoff.DistributedResultStore.get_with_timeout(dag_id, id, timeout)
end
@doc """
Directly stores a value in the local store for a specific DAG.
## Parameters
- dag_id: The ID of the DAG
- id: The ID of the value
- value: The value to store
"""
def store_value(dag_id, id, value) do
Handoff.ResultStore.store(dag_id, id, value)
end
@doc """
Retrieves a value from the local store only for a specific DAG.
## Parameters
- dag_id: The ID of the DAG
- id: The ID of the value to retrieve
## Returns
- `{:ok, value}` if found locally
- `{:error, :not_found}` if not found
"""
def get_local_value(dag_id, id) do
Handoff.ResultStore.get(dag_id, id)
end
@doc """
Retrieves a value for a specific DAG, with automatic remote fetching if needed.
## Parameters
- `dag_id`: The ID of the DAG
- `id`: The ID of the value to retrieve
- `from_node`: Optional specific node to fetch from
## Returns
- `{:ok, value}` if found or successfully fetched
- `{:error, reason}` if retrieval failed
"""
def get_value(dag_id, id, from_node \\ nil) do
Handoff.ResultStore.get_with_fetch(dag_id, id, from_node)
end
@doc """
Registers the location of a data item (argument or result) for a specific DAG.
## Parameters
- dag_id: The ID of the DAG
- data_id: The ID of the data
- node_id: The node where the data is stored
"""
def register_data_location(dag_id, data_id, node_id) do
Handoff.DataLocationRegistry.register(dag_id, data_id, node_id)
end
@doc """
Looks up where a data item (argument or result) is stored for a specific DAG.
## Parameters
- `dag_id`: The ID of the DAG
- `data_id`: The ID of the data to look up
## Returns
- `{:ok, node_id}` if the data location is found
- `{:error, :not_found}` if the data location is not registered
"""
def lookup_data_location(dag_id, data_id) do
Handoff.DataLocationRegistry.lookup(dag_id, data_id)
end
end