Skip to main content

lib/jido/runic/action_node.ex

defmodule Jido.Runic.ActionNode do
  @moduledoc """
  A Runic workflow node that wraps a Jido Action module.

  ActionNode preserves the full identity and semantics of a Jido Action within
  a Runic workflow graph. Unlike wrapping an action as a bare `Runic.Workflow.Step`,
  ActionNode retains:

  - **Stable identity**: Content-addressed hash based on `{action_mod, params}` — not
    an anonymous function reference — so hashes are stable across recompiles.
  - **Schema introspection**: Input/output schemas derived from the action module's
    NimbleOptions schema, exposed via the `Runic.Component` protocol.
  - **Full Jido execution semantics**: Execution delegates to `Jido.Exec.run/4`, which
    provides parameter validation, lifecycle hooks, output validation, retries,
    compensation, and telemetry.

  ## Usage

      alias Jido.Runic.ActionNode

      node = ActionNode.new(MyApp.Actions.ValidateOrder, %{strict: true}, name: :validate)

      workflow =
        Runic.Workflow.new(name: :pipeline)
        |> Runic.Workflow.add(node)

      workflow
      |> Runic.Workflow.react_until_satisfied(%{order_id: "123"})
      |> Runic.Workflow.raw_productions()

  ## Execution

  During Runic's three-phase execution cycle:

  1. **Prepare** — Extracts the input fact and builds a `%Runnable{}` with causal context.
  2. **Execute** — Merges the fact's value into the node's params and calls
     `Jido.Exec.run/4`. Timeout defaults to `0` (inline execution) so that
     Runic's scheduler owns concurrency and timeout control.
  3. **Apply** — The completed Runnable carries event structs and deferred hook
     reducers that `Runic.Workflow.apply_runnable/2` folds back into the graph.
  """

  alias Runic.Workflow.Components

  defstruct [:name, :hash, :action_mod, :params, :context, :exec_opts, :inputs, :outputs, executor: :local]

  @type t :: %__MODULE__{
          name: atom(),
          hash: integer(),
          action_mod: module(),
          params: map(),
          context: map(),
          exec_opts: keyword(),
          inputs: keyword(),
          outputs: keyword(),
          executor: :local | {:child, atom()} | {:child, atom(), term()}
        }

  @doc """
  Creates a new ActionNode wrapping the given Jido Action module.

  ## Options

  - `:name` — Node name used for graph lookups. Defaults to the action module's
    last segment, underscored (e.g., `MyApp.Actions.ValidateOrder` → `:validate_order`).
  - `:context` — Jido execution context map passed to `Jido.Exec.run/4`. Default `%{}`.
  - `:timeout` — Override the Exec timeout. Default `0` (inline, no spawned task) so
    Runic's scheduler owns concurrency and time budgeting.
  - `:executor` — `:local` (default) or `{:child, tag}` / `{:child, tag, spec}` to
    delegate execution; validated at construction time for safety.
  - Any other options are forwarded to `Jido.Exec.run/4` as exec opts.
  """
  @spec new(module(), map(), keyword()) :: t()
  def new(action_mod, params \\ %{}, opts \\ []) do
    Code.ensure_loaded(action_mod)

    {name, opts} = Keyword.pop_lazy(opts, :name, fn -> derive_name(action_mod) end)
    {context, opts} = Keyword.pop(opts, :context, %{})
    {executor, opts} = Keyword.pop(opts, :executor, :local)
    executor = validate_executor!(executor)
    exec_opts = Keyword.put_new(opts, :timeout, 0)

    %__MODULE__{
      name: name,
      hash: Components.fact_hash({:jido_action_node, action_mod, params, name}),
      action_mod: action_mod,
      params: params,
      context: context,
      exec_opts: exec_opts,
      executor: executor,
      inputs: derive_inputs(action_mod),
      outputs: derive_outputs(action_mod)
    }
  end

  @doc """
  Returns the action module's metadata map if available.
  """
  @spec action_metadata(t()) :: map() | nil
  def action_metadata(%__MODULE__{action_mod: mod}) do
    if function_exported?(mod, :to_json, 0), do: mod.to_json(), else: nil
  end

  defp validate_executor!(:local), do: :local
  defp validate_executor!({:child, tag}) when is_atom(tag), do: {:child, tag}
  defp validate_executor!({:child, tag, _spec} = exec) when is_atom(tag), do: exec

  defp validate_executor!(other) do
    raise ArgumentError,
          "invalid :executor option for ActionNode. Expected :local or {:child, tag[, spec]}, got: #{inspect(other)}"
  end

  defp derive_name(action_mod) do
    action_mod
    |> Module.split()
    |> List.last()
    |> Macro.underscore()
    |> String.to_atom()
  end

  defp derive_inputs(action_mod) do
    if function_exported?(action_mod, :schema, 0) do
      case action_mod.schema() do
        schema when is_list(schema) and schema != [] ->
          Enum.map(schema, fn {key, key_opts} ->
            {key, [type: Keyword.get(key_opts, :type, :any), doc: Keyword.get(key_opts, :doc, "")]}
          end)

        _ ->
          default_inputs()
      end
    else
      default_inputs()
    end
  end

  defp derive_outputs(action_mod) do
    if function_exported?(action_mod, :output_schema, 0) do
      case action_mod.output_schema() do
        schema when is_list(schema) and schema != [] ->
          Enum.map(schema, fn {key, key_opts} ->
            {key, [type: Keyword.get(key_opts, :type, :any), doc: Keyword.get(key_opts, :doc, "")]}
          end)

        _ ->
          default_outputs()
      end
    else
      default_outputs()
    end
  end

  defp default_inputs, do: [input: [type: :any, doc: "Input to the action"]]
  defp default_outputs, do: [result: [type: :any, doc: "Action result"]]
end

defimpl Runic.Workflow.Invokable, for: Jido.Runic.ActionNode do
  alias Runic.Workflow
  alias Runic.Workflow.{Fact, Runnable, CausalContext, HookRunner}
  alias Runic.Workflow.Events.{ActivationConsumed, FactProduced, MapReduceTracked}

  @doc """
  ActionNode is an execute node — it produces facts (not a gate/predicate).
  """
  def match_or_execute(_node), do: :execute

  @doc """
  High-level invoke that delegates to the 3-phase cycle.
  """
  def invoke(%Jido.Runic.ActionNode{} = node, workflow, fact) do
    {:ok, runnable} = prepare(node, workflow, fact)
    executed = execute(node, runnable)
    Workflow.apply_runnable(workflow, executed)
  end

  @doc """
  Phase 1: Extract minimal context from workflow into a Runnable.
  """
  def prepare(%Jido.Runic.ActionNode{} = node, workflow, fact) do
    fan_out_context = build_fan_out_context(workflow, node, fact)

    context =
      CausalContext.new(
        node_hash: node.hash,
        input_fact: fact,
        ancestry_depth: Workflow.ancestry_depth(workflow, fact),
        hooks: Workflow.get_hooks(workflow, node.hash),
        fan_out_context: fan_out_context
      )

    {:ok, Runnable.new(node, fact, context)}
  end

  @doc """
  Phase 2: Execute the Jido Action via Jido.Exec.run/4.
  No workflow access — safe for parallel dispatch. The returned runnable carries
  event structs plus deferred hook reducers for Runic's apply phase.
  """
  def execute(%Jido.Runic.ActionNode{} = node, %Runnable{input_fact: fact, context: ctx} = runnable) do
    with {:ok, before_apply_fns} <- HookRunner.run_before(ctx, node, fact) do
      merged_params = Map.merge(node.params, to_params(fact.value), fn _k, _node_val, fact_val -> fact_val end)

      case Jido.Exec.run(node.action_mod, merged_params, node.context, node.exec_opts) do
        {:ok, result} ->
          result_fact = Fact.new(value: result, ancestry: {node.hash, fact.hash})

          case HookRunner.run_after(ctx, node, fact, result_fact) do
            {:ok, after_apply_fns} ->
              events = build_events(node, fact, result_fact, ctx)
              hook_fns = before_apply_fns ++ after_apply_fns
              Runnable.complete(runnable, result_fact, events, hook_fns)

            {:error, reason} ->
              Runnable.fail(runnable, {:hook_error, reason})
          end

        {:ok, result, extra} ->
          result_fact =
            Fact.new(value: %{result: result, extra: extra}, ancestry: {node.hash, fact.hash})

          case HookRunner.run_after(ctx, node, fact, result_fact) do
            {:ok, after_apply_fns} ->
              events = build_events(node, fact, result_fact, ctx)
              hook_fns = before_apply_fns ++ after_apply_fns
              Runnable.complete(runnable, result_fact, events, hook_fns)

            {:error, reason} ->
              Runnable.fail(runnable, {:hook_error, reason})
          end

        {:error, reason} ->
          Runnable.fail(runnable, reason)
      end
    else
      {:error, reason} ->
        Runnable.fail(runnable, {:hook_error, reason})
    end
  end

  defp to_params(value) when is_map(value), do: value
  defp to_params(value), do: %{input: value}

  defp build_events(node, input_fact, result_fact, ctx) do
    events = [
      %FactProduced{
        hash: result_fact.hash,
        value: result_fact.value,
        ancestry: result_fact.ancestry,
        producer_label: :produced,
        weight: ctx.ancestry_depth + 1
      },
      %ActivationConsumed{
        fact_hash: input_fact.hash,
        node_hash: node.hash,
        from_label: :runnable
      }
    ]

    case ctx.fan_out_context do
      %{is_reduced: true, source_fact_hash: sfh, fan_out_hash: foh, fan_out_fact_hash: fofh} ->
        events ++
          [
            %MapReduceTracked{
              source_fact_hash: sfh,
              fan_out_hash: foh,
              fan_out_fact_hash: fofh,
              step_hash: node.hash,
              result_fact_hash: result_fact.hash
            }
          ]

      _ ->
        events
    end
  end

  defp build_fan_out_context(workflow, node, fact) do
    if is_reduced_in_map?(workflow, node) do
      case find_fan_out_info_from_input(workflow, fact) do
        {source_fact_hash, fan_out_hash, fan_out_fact_hash} ->
          %{
            is_reduced: true,
            source_fact_hash: source_fact_hash,
            fan_out_hash: fan_out_hash,
            fan_out_fact_hash: fan_out_fact_hash
          }

        nil ->
          nil
      end
    else
      nil
    end
  end

  defp find_fan_out_info_from_input(
         workflow,
         %Fact{ancestry: {producer_hash, parent_fact_hash}} = fact
       ) do
    producer = workflow.graph.vertices[producer_hash]

    case producer do
      %Runic.Workflow.FanOut{} = fan_out ->
        {parent_fact_hash, fan_out.hash, fact.hash}

      _ ->
        find_fan_out_info(workflow, fact)
    end
  end

  defp find_fan_out_info_from_input(_workflow, _fact), do: nil

  defp is_reduced_in_map?(workflow, node) do
    MapSet.member?(workflow.mapped.mapped_paths, node.hash)
  end

  defp find_fan_out_info(workflow, %Fact{ancestry: {_producer_hash, input_fact_hash}}) do
    do_find_fan_out_info(workflow, input_fact_hash)
  end

  defp do_find_fan_out_info(_workflow, nil), do: nil

  defp do_find_fan_out_info(workflow, fact_hash) do
    fact = workflow.graph.vertices[fact_hash]

    case fact do
      %Fact{ancestry: {producer_hash, parent_fact_hash}} ->
        producer = workflow.graph.vertices[producer_hash]

        case producer do
          %Runic.Workflow.FanOut{} = fan_out ->
            {parent_fact_hash, fan_out.hash, fact_hash}

          _ ->
            do_find_fan_out_info(workflow, parent_fact_hash)
        end

      _ ->
        nil
    end
  end
end

defimpl Runic.Component, for: Jido.Runic.ActionNode do
  alias Runic.Workflow

  def connectable?(_node, _other), do: true

  def connect(node, to, workflow) when is_list(to) do
    join =
      to
      |> Enum.map(fn
        %{hash: hash} -> hash
        other -> other
      end)
      |> Runic.Workflow.Join.new()

    wrk =
      Enum.reduce(to, workflow, fn
        %{hash: _} = parent, wrk -> Workflow.add_step(wrk, parent, join)
        _, wrk -> wrk
      end)

    wrk
    |> Workflow.add_step(join, node)
    |> Workflow.draw_connection(node, node, :component_of, properties: %{kind: :action_node})
    |> Workflow.register_component(node)
  end

  def connect(node, to, workflow) do
    workflow
    |> Workflow.add_step(to, node)
    |> Workflow.draw_connection(node, node, :component_of, properties: %{kind: :action_node})
    |> Workflow.register_component(node)
  end

  def source(node) do
    quote do
      Jido.Runic.ActionNode.new(
        unquote(node.action_mod),
        unquote(Macro.escape(node.params)),
        name: unquote(node.name)
      )
    end
  end

  def hash(node), do: node.hash

  def inputs(%Jido.Runic.ActionNode{inputs: inputs}), do: inputs
  def outputs(%Jido.Runic.ActionNode{outputs: outputs}), do: outputs
end

defimpl Runic.Transmutable, for: Jido.Runic.ActionNode do
  alias Runic.Workflow

  def transmute(node), do: to_workflow(node)

  def to_workflow(%Jido.Runic.ActionNode{} = node) do
    Workflow.new(name: node.name)
    |> Workflow.add(node)
  end

  def to_component(node), do: node
end