defmodule Jido.Runic.ActionNode do
@moduledoc """
A Runic workflow node that wraps a Jido Action module.
ActionNode preserves the full identity and semantics of a Jido Action within
a Runic workflow graph. Unlike wrapping an action as a bare `Runic.Workflow.Step`,
ActionNode retains:
- **Stable identity**: Content-addressed hash based on `{action_mod, params}` — not
an anonymous function reference — so hashes are stable across recompiles.
- **Schema introspection**: Input/output schemas derived from the action module's
NimbleOptions schema, exposed via the `Runic.Component` protocol.
- **Full Jido execution semantics**: Execution delegates to `Jido.Exec.run/4`, which
provides parameter validation, lifecycle hooks, output validation, retries,
compensation, and telemetry.
## Usage
alias Jido.Runic.ActionNode
node = ActionNode.new(MyApp.Actions.ValidateOrder, %{strict: true}, name: :validate)
workflow =
Runic.Workflow.new(name: :pipeline)
|> Runic.Workflow.add(node)
workflow
|> Runic.Workflow.react_until_satisfied(%{order_id: "123"})
|> Runic.Workflow.raw_productions()
## Execution
During Runic's three-phase execution cycle:
1. **Prepare** — Extracts the input fact and builds a `%Runnable{}` with causal context.
2. **Execute** — Merges the fact's value into the node's params and calls
`Jido.Exec.run/4`. Timeout defaults to `0` (inline execution) so that
Runic's scheduler owns concurrency and timeout control.
3. **Apply** — The completed Runnable carries event structs and deferred hook
reducers that `Runic.Workflow.apply_runnable/2` folds back into the graph.
"""
alias Runic.Workflow.Components
defstruct [:name, :hash, :action_mod, :params, :context, :exec_opts, :inputs, :outputs, executor: :local]
@type t :: %__MODULE__{
name: atom(),
hash: integer(),
action_mod: module(),
params: map(),
context: map(),
exec_opts: keyword(),
inputs: keyword(),
outputs: keyword(),
executor: :local | {:child, atom()} | {:child, atom(), term()}
}
@doc """
Creates a new ActionNode wrapping the given Jido Action module.
## Options
- `:name` — Node name used for graph lookups. Defaults to the action module's
last segment, underscored (e.g., `MyApp.Actions.ValidateOrder` → `:validate_order`).
- `:context` — Jido execution context map passed to `Jido.Exec.run/4`. Default `%{}`.
- `:timeout` — Override the Exec timeout. Default `0` (inline, no spawned task) so
Runic's scheduler owns concurrency and time budgeting.
- `:executor` — `:local` (default) or `{:child, tag}` / `{:child, tag, spec}` to
delegate execution; validated at construction time for safety.
- Any other options are forwarded to `Jido.Exec.run/4` as exec opts.
"""
@spec new(module(), map(), keyword()) :: t()
def new(action_mod, params \\ %{}, opts \\ []) do
Code.ensure_loaded(action_mod)
{name, opts} = Keyword.pop_lazy(opts, :name, fn -> derive_name(action_mod) end)
{context, opts} = Keyword.pop(opts, :context, %{})
{executor, opts} = Keyword.pop(opts, :executor, :local)
executor = validate_executor!(executor)
exec_opts = Keyword.put_new(opts, :timeout, 0)
%__MODULE__{
name: name,
hash: Components.fact_hash({:jido_action_node, action_mod, params, name}),
action_mod: action_mod,
params: params,
context: context,
exec_opts: exec_opts,
executor: executor,
inputs: derive_inputs(action_mod),
outputs: derive_outputs(action_mod)
}
end
@doc """
Returns the action module's metadata map if available.
"""
@spec action_metadata(t()) :: map() | nil
def action_metadata(%__MODULE__{action_mod: mod}) do
if function_exported?(mod, :to_json, 0), do: mod.to_json(), else: nil
end
defp validate_executor!(:local), do: :local
defp validate_executor!({:child, tag}) when is_atom(tag), do: {:child, tag}
defp validate_executor!({:child, tag, _spec} = exec) when is_atom(tag), do: exec
defp validate_executor!(other) do
raise ArgumentError,
"invalid :executor option for ActionNode. Expected :local or {:child, tag[, spec]}, got: #{inspect(other)}"
end
defp derive_name(action_mod) do
action_mod
|> Module.split()
|> List.last()
|> Macro.underscore()
|> String.to_atom()
end
defp derive_inputs(action_mod) do
if function_exported?(action_mod, :schema, 0) do
case action_mod.schema() do
schema when is_list(schema) and schema != [] ->
Enum.map(schema, fn {key, key_opts} ->
{key, [type: Keyword.get(key_opts, :type, :any), doc: Keyword.get(key_opts, :doc, "")]}
end)
_ ->
default_inputs()
end
else
default_inputs()
end
end
defp derive_outputs(action_mod) do
if function_exported?(action_mod, :output_schema, 0) do
case action_mod.output_schema() do
schema when is_list(schema) and schema != [] ->
Enum.map(schema, fn {key, key_opts} ->
{key, [type: Keyword.get(key_opts, :type, :any), doc: Keyword.get(key_opts, :doc, "")]}
end)
_ ->
default_outputs()
end
else
default_outputs()
end
end
defp default_inputs, do: [input: [type: :any, doc: "Input to the action"]]
defp default_outputs, do: [result: [type: :any, doc: "Action result"]]
end
defimpl Runic.Workflow.Invokable, for: Jido.Runic.ActionNode do
alias Runic.Workflow
alias Runic.Workflow.{Fact, Runnable, CausalContext, HookRunner}
alias Runic.Workflow.Events.{ActivationConsumed, FactProduced, MapReduceTracked}
@doc """
ActionNode is an execute node — it produces facts (not a gate/predicate).
"""
def match_or_execute(_node), do: :execute
@doc """
High-level invoke that delegates to the 3-phase cycle.
"""
def invoke(%Jido.Runic.ActionNode{} = node, workflow, fact) do
{:ok, runnable} = prepare(node, workflow, fact)
executed = execute(node, runnable)
Workflow.apply_runnable(workflow, executed)
end
@doc """
Phase 1: Extract minimal context from workflow into a Runnable.
"""
def prepare(%Jido.Runic.ActionNode{} = node, workflow, fact) do
fan_out_context = build_fan_out_context(workflow, node, fact)
context =
CausalContext.new(
node_hash: node.hash,
input_fact: fact,
ancestry_depth: Workflow.ancestry_depth(workflow, fact),
hooks: Workflow.get_hooks(workflow, node.hash),
fan_out_context: fan_out_context
)
{:ok, Runnable.new(node, fact, context)}
end
@doc """
Phase 2: Execute the Jido Action via Jido.Exec.run/4.
No workflow access — safe for parallel dispatch. The returned runnable carries
event structs plus deferred hook reducers for Runic's apply phase.
"""
def execute(%Jido.Runic.ActionNode{} = node, %Runnable{input_fact: fact, context: ctx} = runnable) do
with {:ok, before_apply_fns} <- HookRunner.run_before(ctx, node, fact) do
merged_params = Map.merge(node.params, to_params(fact.value), fn _k, _node_val, fact_val -> fact_val end)
case Jido.Exec.run(node.action_mod, merged_params, node.context, node.exec_opts) do
{:ok, result} ->
result_fact = Fact.new(value: result, ancestry: {node.hash, fact.hash})
case HookRunner.run_after(ctx, node, fact, result_fact) do
{:ok, after_apply_fns} ->
events = build_events(node, fact, result_fact, ctx)
hook_fns = before_apply_fns ++ after_apply_fns
Runnable.complete(runnable, result_fact, events, hook_fns)
{:error, reason} ->
Runnable.fail(runnable, {:hook_error, reason})
end
{:ok, result, extra} ->
result_fact =
Fact.new(value: %{result: result, extra: extra}, ancestry: {node.hash, fact.hash})
case HookRunner.run_after(ctx, node, fact, result_fact) do
{:ok, after_apply_fns} ->
events = build_events(node, fact, result_fact, ctx)
hook_fns = before_apply_fns ++ after_apply_fns
Runnable.complete(runnable, result_fact, events, hook_fns)
{:error, reason} ->
Runnable.fail(runnable, {:hook_error, reason})
end
{:error, reason} ->
Runnable.fail(runnable, reason)
end
else
{:error, reason} ->
Runnable.fail(runnable, {:hook_error, reason})
end
end
defp to_params(value) when is_map(value), do: value
defp to_params(value), do: %{input: value}
defp build_events(node, input_fact, result_fact, ctx) do
events = [
%FactProduced{
hash: result_fact.hash,
value: result_fact.value,
ancestry: result_fact.ancestry,
producer_label: :produced,
weight: ctx.ancestry_depth + 1
},
%ActivationConsumed{
fact_hash: input_fact.hash,
node_hash: node.hash,
from_label: :runnable
}
]
case ctx.fan_out_context do
%{is_reduced: true, source_fact_hash: sfh, fan_out_hash: foh, fan_out_fact_hash: fofh} ->
events ++
[
%MapReduceTracked{
source_fact_hash: sfh,
fan_out_hash: foh,
fan_out_fact_hash: fofh,
step_hash: node.hash,
result_fact_hash: result_fact.hash
}
]
_ ->
events
end
end
defp build_fan_out_context(workflow, node, fact) do
if is_reduced_in_map?(workflow, node) do
case find_fan_out_info_from_input(workflow, fact) do
{source_fact_hash, fan_out_hash, fan_out_fact_hash} ->
%{
is_reduced: true,
source_fact_hash: source_fact_hash,
fan_out_hash: fan_out_hash,
fan_out_fact_hash: fan_out_fact_hash
}
nil ->
nil
end
else
nil
end
end
defp find_fan_out_info_from_input(
workflow,
%Fact{ancestry: {producer_hash, parent_fact_hash}} = fact
) do
producer = workflow.graph.vertices[producer_hash]
case producer do
%Runic.Workflow.FanOut{} = fan_out ->
{parent_fact_hash, fan_out.hash, fact.hash}
_ ->
find_fan_out_info(workflow, fact)
end
end
defp find_fan_out_info_from_input(_workflow, _fact), do: nil
defp is_reduced_in_map?(workflow, node) do
MapSet.member?(workflow.mapped.mapped_paths, node.hash)
end
defp find_fan_out_info(workflow, %Fact{ancestry: {_producer_hash, input_fact_hash}}) do
do_find_fan_out_info(workflow, input_fact_hash)
end
defp do_find_fan_out_info(_workflow, nil), do: nil
defp do_find_fan_out_info(workflow, fact_hash) do
fact = workflow.graph.vertices[fact_hash]
case fact do
%Fact{ancestry: {producer_hash, parent_fact_hash}} ->
producer = workflow.graph.vertices[producer_hash]
case producer do
%Runic.Workflow.FanOut{} = fan_out ->
{parent_fact_hash, fan_out.hash, fact_hash}
_ ->
do_find_fan_out_info(workflow, parent_fact_hash)
end
_ ->
nil
end
end
end
defimpl Runic.Component, for: Jido.Runic.ActionNode do
alias Runic.Workflow
def connectable?(_node, _other), do: true
def connect(node, to, workflow) when is_list(to) do
join =
to
|> Enum.map(fn
%{hash: hash} -> hash
other -> other
end)
|> Runic.Workflow.Join.new()
wrk =
Enum.reduce(to, workflow, fn
%{hash: _} = parent, wrk -> Workflow.add_step(wrk, parent, join)
_, wrk -> wrk
end)
wrk
|> Workflow.add_step(join, node)
|> Workflow.draw_connection(node, node, :component_of, properties: %{kind: :action_node})
|> Workflow.register_component(node)
end
def connect(node, to, workflow) do
workflow
|> Workflow.add_step(to, node)
|> Workflow.draw_connection(node, node, :component_of, properties: %{kind: :action_node})
|> Workflow.register_component(node)
end
def source(node) do
quote do
Jido.Runic.ActionNode.new(
unquote(node.action_mod),
unquote(Macro.escape(node.params)),
name: unquote(node.name)
)
end
end
def hash(node), do: node.hash
def inputs(%Jido.Runic.ActionNode{inputs: inputs}), do: inputs
def outputs(%Jido.Runic.ActionNode{outputs: outputs}), do: outputs
end
defimpl Runic.Transmutable, for: Jido.Runic.ActionNode do
alias Runic.Workflow
def transmute(node), do: to_workflow(node)
def to_workflow(%Jido.Runic.ActionNode{} = node) do
Workflow.new(name: node.name)
|> Workflow.add(node)
end
def to_component(node), do: node
end