lib/ash/flow/executor/ash_engine.ex

defmodule Ash.Flow.Executor.AshEngine do
  @moduledoc """
  Executes the requests using the Ash engine, which can parallelize individual steps when possible.
  """
  use Ash.Flow.Executor
  require Logger
  require Ash.Tracer
  require Ash.Flow.StepHelpers

  defmodule Step do
    @moduledoc false
    defstruct [:step, :input]
  end

  defmodule Flow do
    @moduledoc false
    defstruct [:steps, :flow, :returns]
  end

  def build(flow, input, opts) do
    steps =
      flow
      |> Ash.Flow.Info.steps()
      |> to_steps(input, opts)
      |> hydrate_flows(opts)

    {:ok,
     %Flow{
       steps: steps,
       flow: flow,
       returns: Ash.Flow.Info.returns(flow)
     }}
  end

  defp to_steps(steps, input, opts) do
    Enum.map(steps, fn step ->
      step =
        case step do
          %{steps: steps} = step ->
            step =
              if Map.has_key?(step, :output) && is_nil(step.output) do
                if last_step = List.last(steps) do
                  %{step | output: last_step.name}
                else
                  step
                end
              else
                step
              end

            %{step | steps: to_steps(steps, input, opts)}

          %{tenant: _tenant} = step ->
            %{step | tenant: Keyword.get(opts, :tenant)}

          step ->
            step
        end

      %Step{step: step, input: input}
    end)
  end

  defp hydrate_flows(steps, opts) do
    Enum.flat_map(steps, fn
      %Step{
        input: input,
        step: %Ash.Flow.Step.RunFlow{
          name: name,
          flow: flow,
          input: run_flow_input,
          wait_for: wait_for,
          halt_if: halt_if
        }
      } = run_flow_step ->
        {run_flow_input, _} = Ash.Flow.Template.handle_input_template(run_flow_input, input)
        {:ok, %{steps: hydrated_steps}} = build(flow, run_flow_input, opts)

        built_steps =
          hydrated_steps
          |> then(fn steps ->
            if wait_for do
              map_steps(steps, fn
                %{wait_for: nil} = step ->
                  %{step | wait_for: wait_for}

                %{wait_for: step_wait_for} = step
                when is_list(step_wait_for) and is_list(wait_for) ->
                  %{step | wait_for: wait_for ++ step_wait_for}

                %{wait_for: step_wait_for} = step when is_list(step_wait_for) ->
                  %{step | wait_for: [wait_for | step_wait_for]}

                %{wait_for: step_wait_for} = step ->
                  %{step | wait_for: [wait_for, step_wait_for]}
              end)
            else
              steps
            end
          end)
          |> then(fn steps ->
            if halt_if do
              map_steps(steps, fn
                %{halt_if: nil} = step ->
                  %{step | halt_if: halt_if}

                %{halt_if: step_halt_if} = step ->
                  %{step | halt_if: Ash.Flow.StepHelpers.expr(^step_halt_if or ^halt_if)}
              end)
            else
              steps
            end
          end)
          |> remap_step_names(fn nested_name ->
            List.wrap(name) ++ List.wrap(nested_name)
          end)
          |> handle_input_templates()

        Enum.concat(built_steps, [
          %{run_flow_step | step: %{run_flow_step.step | built: built_steps}}
        ])
        |> handle_input_templates()

      %Step{step: %{steps: steps} = inner_step} = step ->
        [%{step | step: %{inner_step | steps: hydrate_flows(steps, opts)}}]
        |> handle_input_templates()

      step ->
        [step] |> handle_input_templates()
    end)
  end

  @deps_keys [
    :input,
    :over,
    :record,
    :wait_for,
    :halt_if,
    :tenant,
    :condition,
    :resource,
    :api,
    :action
  ]

  defp handle_input_templates(run_flow_steps) do
    run_flow_steps
    |> map_outer_steps(fn %Step{step: step, input: input} = outer_step ->
      new_step =
        Enum.reduce(@deps_keys, step, fn key, step ->
          case Map.fetch(step, key) do
            {:ok, value} ->
              {new_value, _} = Ash.Flow.Template.handle_input_template(value, input)

              Map.put(step, key, new_value)

            :error ->
              step
          end
        end)

      %{outer_step | step: new_step}
    end)
  end

  def execute(%Flow{steps: steps, flow: flow}, _input, opts) do
    steps
    |> Enum.flat_map(&requests(flow, steps, &1, opts))
    |> resume(opts[:resume])
    |> case do
      {:error, error} ->
        {:error, error}

      {:ok, requests} ->
        requests
        |> Ash.Engine.run(
          actor: opts[:actor],
          verbose?: opts[:verbose?],
          timeout: opts[:timeout],
          authorize?: Keyword.get(opts, :authorize?, true),
          name: inspect(flow),
          failure_mode: :continue,
          tracer: opts[:tracer]
        )
        |> case do
          {:ok, %Ash.Engine{data: data, resource_notifications: resource_notifications}} ->
            runner_metadata = %{data: data}

            {:ok, return_value(steps, data, Ash.Flow.Info.returns(flow)),
             %{notifications: resource_notifications, runner_metadata: runner_metadata}}

          {:error,
           %Ash.Engine{errors: errors, data: data, resource_notifications: resource_notifications}} ->
            {:error, %{runner_metadata: %{data: data}, notifications: resource_notifications},
             Ash.Error.to_error_class(errors)}

          {:error, error} ->
            {:error, error}
        end
    end
  end

  defp resume(requests, nil), do: {:ok, requests}

  defp resume(requests, %Ash.Flow.Result{runner_metadata: %{data: data}}) do
    {:ok,
     Enum.map(requests, fn request ->
       case Ash.Flow.do_fetch_in(data, request.path) do
         {:ok, result} ->
           %{request | completion: true, data: result}

         :error ->
           request
       end
     end)}
  end

  defp resume(_, _) do
    {:error, "Could not resume flow because engine data was not present!"}
  end

  defp must_be_local?(%{resource: resource, touches_resources: touches_resources}) do
    Ash.Engine.must_be_local?(%Ash.Engine.Request{
      resource: resource,
      async?: true,
      touches_resources: touches_resources
    })
  end

  defp must_be_local?(_), do: false

  defp transaction_dependencies(
         transaction_name,
         transaction_steps,
         all_steps
       ) do
    Enum.flat_map(transaction_steps, fn step ->
      deps =
        step.step
        |> get_all_deps()
        |> Enum.flat_map(fn {:_result, dep} ->
          case find_step_dep(all_steps, dep, transaction_name, true) do
            nil ->
              []

            step ->
              [result_path(step)]
          end
        end)

      case step do
        %{step: %{steps: steps}} ->
          deps ++ transaction_dependencies(transaction_name, steps, all_steps)

        _ ->
          deps
      end
    end)
  end

  def deps_keys, do: @deps_keys

  defp get_all_deps(step) do
    Enum.flat_map(@deps_keys, fn key ->
      case Map.fetch(step, key) do
        {:ok, value} ->
          {_, deps} = Ash.Flow.Template.handle_input_template(value, %{})
          deps

        :error ->
          []
      end
    end)
  end

  defp return_value(steps, data, returns) do
    case returns do
      name when is_atom(name) ->
        get_return_value(steps, data, name)

      names when is_list(names) ->
        Map.new(names, fn
          {key, name} ->
            {name, get_return_value(steps, data, key)}

          name ->
            {name, get_return_value(steps, data, name)}
        end)
    end
  end

  defp get_return_value(steps, data, name) do
    case find_step_dep(steps, name, nil) do
      nil ->
        nil

      %Ash.Flow.Step.Transaction{name: ^name, output: output} ->
        get_return_value(steps, data, output)

      %Ash.Flow.Step.Transaction{name: transaction_name} ->
        Ash.Flow.do_get_in(data, [transaction_name, name])

      step ->
        Ash.Flow.do_get_in(data, data_path(step))
    end
  end

  defp halt_if(halt_if, reason, step, results, context, otherwise) do
    halt_if =
      halt_if
      |> Ash.Flow.Template.set_dependent_values(%{
        results: results,
        elements: Map.get(context, :_ash_engine_elements)
      })
      |> Ash.Flow.handle_modifiers()

    if halt_if do
      {:error, Ash.Error.Flow.Halted.exception(step: step, reason: reason)}
    else
      otherwise.()
    end
  end

  defp requests(flow, all_steps, step, opts, request_opts \\ []) do
    additional_context = request_opts[:context] || %{}
    transaction_name = request_opts[:transaction_name]

    case step do
      %Step{
        step: %Ash.Flow.Step.Debug{
          name: name,
          input: input,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        }
      } ->
        {input, deps} = Ash.Flow.Template.handle_input_template(input, %{})
        {_, wait_for_deps} = Ash.Flow.Template.handle_input_template(wait_for, %{})
        {_, halt_if_deps} = Ash.Flow.Template.handle_input_template(halt_if, %{})

        dep_paths =
          get_dep_paths(all_steps, deps, transaction_name, wait_for_deps ++ halt_if_deps)

        request_deps = dependable_request_paths(dep_paths)
        id = System.unique_integer()

        [
          Ash.Engine.Request.new(
            path: [name],
            name: inspect(name),
            async?: false,
            error_path: [name],
            data:
              Ash.Engine.Request.resolve(request_deps, fn context ->
                context = Ash.Helpers.deep_merge_maps(context, additional_context)

                results = results(dep_paths, context)

                halt_if(halt_if, halt_reason, name, results, context, fn ->
                  input =
                    input
                    |> Ash.Flow.Template.set_dependent_values(%{
                      results: results,
                      elements: Map.get(context, :_ash_engine_elements)
                    })
                    |> Ash.Flow.handle_modifiers()

                  output = """
                  Debug Output for: #{inspect(name)} | #{id}

                  #{inspect(input)}
                  """

                  IO.puts(output)

                  {:ok, output}
                end)
              end)
          )
        ]

      %Step{
        step:
          %Ash.Flow.Step.Transaction{
            steps: transaction_steps,
            name: name,
            resource: resource,
            wait_for: wait_for,
            halt_if: halt_if,
            halt_reason: halt_reason,
            touches_resources: touches_resources
          } = transaction
      } ->
        request_name = "Transaction #{inspect(name)}"

        maybe_dynamic(
          [resource, touches_resources],
          request_name,
          [name],
          %{},
          all_steps,
          transaction_name,
          additional_context,
          fn resource, touches_resources ->
            depends_on_requests =
              transaction_dependencies(name, transaction_steps, all_steps) |> Enum.uniq()

            {_, wait_for_deps} = Ash.Flow.Template.handle_input_template(wait_for, %{})
            {_, halt_if_deps} = Ash.Flow.Template.handle_input_template(halt_if, %{})
            dep_paths = get_dep_paths(all_steps, [], name, wait_for_deps ++ halt_if_deps)

            request_deps = dependable_request_paths(dep_paths)

            touches_resources =
              if resource do
                [resource | touches_resources || []]
              else
                touches_resources || []
              end
              |> Enum.filter(&(&1 && is_atom(&1)))

            if touches_resources == [] do
              Logger.error("""
              No resources configured for transaction #{inspect(name)}.
              """)
            end

            [
              Ash.Engine.Request.new(
                path: [name],
                name: request_name,
                async?: Enum.any?(transaction_steps, &must_be_local?/1),
                touches_resources: touches_resources,
                error_path: [name],
                data:
                  Ash.Engine.Request.resolve(depends_on_requests ++ request_deps, fn context ->
                    context = Ash.Helpers.deep_merge_maps(context, additional_context)

                    results = results(dep_paths, context)

                    halt_if(halt_if, halt_reason, name, results, context, fn ->
                      transaction_steps
                      |> Enum.flat_map(
                        &requests(flow, all_steps, &1, opts,
                          context: context,
                          transaction_name: transaction.name
                        )
                      )
                      # TODO: Can we resume a transaction? For now, you cannot.
                      # Either the whole thing fails or the whole thing succeeds.
                      |> case do
                        [] ->
                          {:ok, %{}}

                        [first | rest] ->
                          # only one of the requests needs to be annotated as touching the resources
                          # the transaction claims to touch, since the transaction is run over all touched resources
                          # in all requests
                          [
                            %{
                              first
                              | touches_resources:
                                  Enum.uniq(touches_resources ++ first.touches_resources)
                            }
                            | rest
                          ]
                          |> Ash.Engine.run(
                            transaction_reason: %{
                              type: :flow_transaction,
                              metadata: %{
                                step_name: name,
                                flow: flow
                              }
                            },
                            authorize?: context[:authorize?],
                            resource: resource,
                            name: "Transaction #{inspect(name)}",
                            verbose?: context[:verbose?],
                            tracer: context[:tracer],
                            actor: context[:actor],
                            failure_mode: :stop,
                            transaction?: true
                          )
                          |> case do
                            {:ok, %{data: data, resource_notifications: notifications}} ->
                              if opts[:return_notifications?] do
                                {:ok,
                                 Map.new(transaction_steps, fn step ->
                                   {step.step.name,
                                    Ash.Flow.do_get_in(data, data_path(step.step))}
                                 end), %{notifications: notifications}}
                              else
                                {:ok,
                                 Map.new(transaction_steps, fn step ->
                                   {step.step.name,
                                    Ash.Flow.do_get_in(data, data_path(step.step))}
                                 end)}
                              end

                            {:error, %Ash.Engine{errors: errors}} ->
                              {:error, Ash.Error.to_error_class(errors)}

                            {:error, error} ->
                              {:error, error}
                          end
                      end
                    end)
                  end)
              )
            ]
          end
        )

      %Step{
        step: %Ash.Flow.Step.Branch{
          name: name,
          steps: branch_steps,
          output: output,
          condition: condition,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        },
        input: input
      } ->
        output = output || List.last(branch_steps).step.name

        {condition, deps} = Ash.Flow.Template.handle_input_template(condition, input)
        {_, wait_for_deps} = Ash.Flow.Template.handle_input_template(wait_for, input)
        {_, halt_if_deps} = Ash.Flow.Template.handle_input_template(halt_if, input)

        dep_paths =
          get_dep_paths(all_steps, deps, transaction_name, wait_for_deps ++ halt_if_deps)

        request_deps = dependable_request_paths(dep_paths)

        [
          Ash.Engine.Request.new(
            path: [name],
            async?: true,
            name: "#{inspect(name)}",
            error_path: List.wrap(name),
            data:
              Ash.Engine.Request.resolve(request_deps, fn context ->
                context = Ash.Helpers.deep_merge_maps(context, additional_context)

                results = results(dep_paths, context)

                halt_if(halt_if, halt_reason, name, results, context, fn ->
                  execute? =
                    condition
                    |> Ash.Flow.Template.set_dependent_values(%{
                      results: results,
                      elements: Map.get(context, :_ash_engine_elements)
                    })
                    |> Ash.Flow.handle_modifiers()

                  output_step = Enum.find(branch_steps, &(&1.step.name == output))

                  if is_nil(output_step) do
                    raise "#{inspect(output)} not found in #{inspect(step_names(branch_steps))}"
                  end

                  output_step = output_step.step
                  output_path = result_path(output_step)
                  result = Ash.Flow.do_fetch_in(context, output_path)

                  if execute? && match?({:ok, _}, result) do
                    result
                  else
                    if execute? do
                      all_steps_with_branch_steps =
                        all_steps
                        |> update_step(name, fn step ->
                          %{step | step: %{step.step | steps: branch_steps}}
                        end)

                      {:requests,
                       branch_steps
                       |> Enum.flat_map(
                         &requests(flow, all_steps_with_branch_steps, &1, opts,
                           context: context,
                           transaction_name: transaction_name
                         )
                       ), [output_path]}
                    else
                      requests =
                        branch_steps
                        |> step_names()
                        |> Enum.map(fn name ->
                          Ash.Engine.Request.new(
                            authorize?: false,
                            async?: false,
                            name: "Set nil result for #{inspect(name)}",
                            path: [name],
                            data: {:ok, nil}
                          )
                        end)

                      {:ok, nil,
                       %{
                         requests: requests
                       }}
                    end
                  end
                end)
              end)
          )
        ]

      %Step{
        step: %Ash.Flow.Step.Map{
          name: name,
          steps: map_steps,
          over: over,
          output: output,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        },
        input: input
      } ->
        output = output || List.last(map_steps).step.name

        {over, deps} = Ash.Flow.Template.handle_input_template(over, input)
        {_, wait_for_deps} = Ash.Flow.Template.handle_input_template(wait_for, input)
        {_, halt_if_deps} = Ash.Flow.Template.handle_input_template(halt_if, input)

        dep_paths =
          get_dep_paths(all_steps, deps, transaction_name, wait_for_deps ++ halt_if_deps)

        request_deps = dependable_request_paths(dep_paths)

        [
          Ash.Engine.Request.new(
            path: [name],
            async?: true,
            name: "#{inspect(name)}",
            error_path: List.wrap(name),
            data:
              Ash.Engine.Request.resolve(request_deps, fn context ->
                context = Ash.Helpers.deep_merge_maps(context, additional_context)

                results = results(dep_paths, context)

                halt_if(halt_if, halt_reason, name, results, context, fn ->
                  elements =
                    over
                    |> Ash.Flow.Template.set_dependent_values(%{
                      results: results,
                      elements: Map.get(context, :_ash_engine_elements)
                    })
                    |> Ash.Flow.handle_modifiers()
                    |> Kernel.||([])

                  if Enum.empty?(elements) do
                    {:ok, []}
                  else
                    case Ash.Flow.do_get_in(context, [[name, :elements]]) do
                      nil ->
                        elements
                        |> Enum.with_index()
                        |> Enum.flat_map(fn {element, i} ->
                          new_additional_context =
                            additional_context
                            |> Map.put_new(:_ash_engine_elements, %{})
                            |> Map.update!(:_ash_engine_elements, &Map.put(&1, name, element))
                            |> Map.put_new(:_ash_engine_indices, %{})
                            |> Map.update!(:_ash_engine_indices, &Map.put(&1, name, i))

                          map_steps =
                            map_steps
                            |> remap_step_names(&set_index(&1, i))

                          output_name = set_index(output, i)

                          output_step = Enum.find(map_steps, &(&1.step.name == output_name))

                          if is_nil(output_step) do
                            raise "#{inspect(output_name)} not found in #{inspect(step_names(map_steps))}"
                          end

                          output_step = output_step.step
                          output_path = result_path(output_step)

                          all_steps_with_map_steps =
                            all_steps
                            |> update_step(name, fn step ->
                              %{step | step: %{step.step | steps: map_steps}}
                            end)

                          map_steps
                          |> Enum.flat_map(
                            &requests(flow, all_steps_with_map_steps, &1, opts,
                              context: new_additional_context,
                              transaction_name: transaction_name
                            )
                          )
                          |> Kernel.++([
                            {Ash.Engine.Request.new(
                               path: [[name, :elements], i],
                               error_path: [[name, :elements], i],
                               name: "Handle #{inspect(name)} index #{i}",
                               authorize?: false,
                               additional_context: new_additional_context,
                               data:
                                 Ash.Engine.Request.resolve([output_path], fn context ->
                                   {:ok, Ash.Flow.do_get_in(context, output_path)}
                                 end),
                               async?: true
                             ), :data}
                          ])
                        end)
                        |> case do
                          [] ->
                            {:ok, []}

                          requests ->
                            {:requests, requests}
                        end

                      elements ->
                        {:ok,
                         elements
                         |> Enum.sort_by(&elem(&1, 0))
                         |> Enum.map(&elem(&1, 1))
                         |> Enum.map(&Map.get(&1 || %{}, :data))}
                    end
                  end
                end)
              end)
          )
        ]

      %Step{
        step: %Ash.Flow.Step.Custom{
          name: name,
          input: custom_input,
          custom: {mod, opts},
          async?: async?,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        },
        input: input
      } ->
        {custom_input, deps} = Ash.Flow.Template.handle_input_template(custom_input, input)
        {_, wait_for_deps} = Ash.Flow.Template.handle_input_template(wait_for, input)
        {_, halt_if_deps} = Ash.Flow.Template.handle_input_template(halt_if, input)

        dep_paths =
          get_dep_paths(all_steps, deps, transaction_name, wait_for_deps ++ halt_if_deps)

        request_deps = dependable_request_paths(dep_paths)

        [
          Ash.Engine.Request.new(
            authorize?: false,
            async?: async?,
            name: "Run custom step #{inspect(name)}",
            path: [name],
            error_path: List.wrap(name),
            data:
              Ash.Engine.Request.resolve(request_deps, fn context ->
                context = Ash.Helpers.deep_merge_maps(context, additional_context)

                results = results(dep_paths, context)

                halt_if(halt_if, halt_reason, name, results, context, fn ->
                  custom_input =
                    custom_input
                    |> Ash.Flow.Template.set_dependent_values(%{
                      results: results,
                      elements: Map.get(context, :_ash_engine_elements)
                    })
                    |> Ash.Flow.handle_modifiers()

                  context_arg = Map.take(context, [:actor, :authorize?, :async?, :verbose?])

                  Ash.Tracer.span :custom_flow_step,
                                  "custom step #{inspect(mod)}",
                                  opts[:tracer] do
                    metadata = %{
                      flow_short_name: Ash.Flow.Info.short_name(flow),
                      name: inspect(name),
                      step: inspect(mod)
                    }

                    Ash.Tracer.set_metadata(opts[:tracer], :custom_flow_step, metadata)

                    Ash.Tracer.telemetry_span [:ash, :flow, :custom_step], metadata do
                      mod.run(custom_input, opts, context_arg)
                    end
                  end
                end)
              end)
          )
        ]

      %Step{step: %Ash.Flow.Step.RunFlow{name: name, flow: flow, built: built}} ->
        # No need to do anything with `wait_for` here, because
        # `wait_for` is pushed down into all child steps when we hydrate
        result_dependencies =
          flow
          |> Ash.Flow.Info.returns()
          |> List.wrap()
          |> Enum.map(fn
            {key, _} ->
              key

            other ->
              other
          end)
          |> Enum.map(fn key ->
            {:_result, List.wrap(name) ++ List.wrap(key)}
          end)

        # If you want the result of the run flow step, you only get it
        # when all parts of that flow are complete.
        dep_paths =
          get_dep_paths(
            all_steps,
            result_dependencies,
            transaction_name,
            step_names(built)
          )

        request_deps = dependable_request_paths(dep_paths)

        [
          Ash.Engine.Request.new(
            path: [name],
            name: "Build return value for #{inspect(name)}",
            async?: false,
            error_path: List.wrap(name),
            data:
              Ash.Engine.Request.resolve(request_deps, fn context ->
                case Ash.Flow.Info.returns(flow) do
                  key when is_atom(key) ->
                    {:ok,
                     get_flow_return_value(
                       all_steps,
                       List.wrap(name) ++ [key],
                       context,
                       transaction_name
                     )}

                  list when is_list(list) ->
                    list =
                      Enum.map(list, fn
                        {key, val} ->
                          {key, val}

                        key ->
                          {key, key}
                      end)

                    {:ok,
                     Map.new(list, fn {key, val} ->
                       {val,
                        get_flow_return_value(
                          all_steps,
                          List.wrap(name) ++ [key],
                          context,
                          transaction_name
                        )}
                     end)}
                end
              end)
          )
        ]

      %Step{step: %Ash.Flow.Step.Read{} = read, input: input} ->
        %{
          action: action,
          api: api,
          name: name,
          resource: resource,
          input: action_input,
          tenant: tenant,
          get?: get?,
          not_found_error?: not_found_error?,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        } = read

        List.wrap(
          maybe_dynamic(
            [resource, action],
            name,
            [name, :data],
            input,
            all_steps,
            transaction_name,
            additional_context,
            fn resource, action ->
              %{
                action: action,
                action_input: action_input,
                resource: resource,
                api: api,
                dep_paths: dep_paths,
                tenant: tenant,
                request_deps: request_deps
              } =
                action_request_info(
                  all_steps,
                  resource,
                  action,
                  api,
                  action_input,
                  input,
                  transaction_name,
                  tenant,
                  [wait_for, halt_if]
                )

              Ash.Actions.Read.as_requests([name], resource, api, action,
                error_path: List.wrap(name),
                authorize?: opts[:authorize?],
                actor: opts[:actor],
                tracer: opts[:tracer],
                query_dependencies: request_deps,
                get?: get? || action.get?,
                not_found_error?: not_found_error?,
                tenant: fn context ->
                  context = Ash.Helpers.deep_merge_maps(context, additional_context)
                  results = results(dep_paths, context)

                  tenant
                  |> Ash.Flow.Template.set_dependent_values(%{
                    results: results,
                    elements: Map.get(context, :_ash_engine_elements)
                  })
                  |> Ash.Flow.handle_modifiers()
                end,
                modify_query: fn query, context ->
                  context = Ash.Helpers.deep_merge_maps(context, additional_context)
                  results = results(dep_paths, context)

                  halt_if(halt_if, halt_reason, name, results, context, fn -> query end)
                end,
                query_input: fn context ->
                  context = Ash.Helpers.deep_merge_maps(context, additional_context)

                  results = results(dep_paths, context)

                  action_input
                  |> Ash.Flow.Template.set_dependent_values(%{
                    results: results,
                    elements: Map.get(context, :_ash_engine_elements)
                  })
                  |> Ash.Flow.handle_modifiers()
                end
              )
            end
          )
        )

      %Step{step: %Ash.Flow.Step.Create{} = create, input: input} ->
        %{
          action: action,
          api: api,
          name: name,
          resource: resource,
          input: action_input,
          tenant: tenant,
          wait_for: wait_for,
          halt_if: halt_if,
          upsert?: upsert?,
          upsert_identity: upsert_identity
        } = create

        List.wrap(
          maybe_dynamic(
            [resource, action],
            name,
            [name, :commit],
            input,
            all_steps,
            transaction_name,
            additional_context,
            fn resource, action ->
              %{
                action: action,
                resource: resource,
                api: api,
                action_input: action_input,
                dep_paths: dep_paths,
                tenant: tenant,
                request_deps: request_deps
              } =
                action_request_info(
                  all_steps,
                  resource,
                  action,
                  api,
                  action_input,
                  input,
                  transaction_name,
                  tenant,
                  [wait_for, halt_if]
                )

              Ash.Actions.Create.as_requests([name], resource, api, action,
                error_path: List.wrap(name),
                authorize?: opts[:authorize?],
                actor: opts[:actor],
                tracer: opts[:tracer],
                upsert?: upsert?,
                upsert_identity: upsert_identity,
                changeset_dependencies: request_deps,
                tenant: fn context ->
                  context = Ash.Helpers.deep_merge_maps(context, additional_context)
                  results = results(dep_paths, context)

                  tenant
                  |> Ash.Flow.Template.set_dependent_values(%{
                    results: results,
                    elements: Map.get(context, :_ash_engine_elements)
                  })
                  |> Ash.Flow.handle_modifiers()
                end,
                changeset_input: fn context ->
                  context = Ash.Helpers.deep_merge_maps(context, additional_context)

                  results = results(dep_paths, context)

                  action_input
                  |> Ash.Flow.Template.set_dependent_values(%{
                    results: results,
                    elements: Map.get(context, :_ash_engine_elements)
                  })
                  |> Ash.Flow.handle_modifiers()
                end
              )
            end
          )
        )

      %Step{step: %Ash.Flow.Step.Validate{only_keys: keys} = validate, input: input} ->
        %{
          action: action,
          name: name,
          resource: resource,
          input: action_input,
          record: record,
          tenant: tenant,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        } = validate

        %{
          action: action,
          resource: resource,
          action_input: action_input,
          dep_paths: dep_paths,
          tenant: tenant,
          request_deps: request_deps
        } =
          action_request_info(
            all_steps,
            resource,
            action,
            nil,
            action_input,
            input,
            transaction_name,
            tenant,
            [wait_for, halt_if]
          )

        {changeset_deps, get_request} =
          if action.type in [:create, :read] do
            get_request(
              record,
              input,
              all_steps,
              transaction_name,
              name,
              resource,
              additional_context
            )
          else
            {[], nil}
          end

        validate_request =
          Ash.Engine.Request.new(
            path: [name, :validate],
            name: "Fetch record for #{inspect(name)}",
            error_path: List.wrap(name) ++ [:fetch],
            async?: must_be_local?(%{resource: resource}),
            resource: resource,
            authorize?: false,
            data:
              Ash.Engine.Request.resolve(request_deps ++ changeset_deps, fn context ->
                context = Ash.Helpers.deep_merge_maps(context, additional_context)

                results = results(changeset_deps ++ dep_paths, context)

                halt_if(halt_if, halt_reason, name, results, context, fn ->
                  tenant =
                    tenant
                    |> Ash.Flow.Template.set_dependent_values(%{
                      results: results,
                      elements: Map.get(context, :_ash_engine_elements)
                    })
                    |> Ash.Flow.handle_modifiers()

                  input =
                    action_input
                    |> Ash.Flow.Template.set_dependent_values(%{
                      results: results,
                      elements: Map.get(context, :_ash_engine_elements)
                    })
                    |> Ash.Flow.handle_modifiers()

                  result =
                    case action.type do
                      :read ->
                        Ash.Query.for_read(resource, action.name, input,
                          tenant: tenant,
                          actor: context[:actor],
                          authorize?: context[:authorize?],
                          tracer: context[:tracer]
                        )

                      :create ->
                        Ash.Changeset.for_create(resource, action.name, input,
                          tenant: tenant,
                          actor: context[:actor],
                          authorize?: context[:authorize?],
                          tracer: context[:tracer]
                        )

                      type when type in [:update, :destroy] ->
                        record
                        |> Ash.Flow.Template.set_dependent_values(%{
                          results: results,
                          elements: Map.get(context, :_ash_engine_elements)
                        })
                        |> Ash.Flow.handle_modifiers()
                        |> case do
                          %^resource{} = record ->
                            Ash.Changeset.for_action(record, action.name, input,
                              tenant: tenant,
                              actor: context[:actor],
                              authorize?: context[:authorize?],
                              tracer: context[:tracer]
                            )

                          nil ->
                            {:error, "Cannot validate #{action.name} because input is nil"}

                          value ->
                            {:error, "Invalid record #{inspect(value)}"}
                        end
                    end

                  extract_changeset_or_query_errors(result, keys)
                end)
              end)
          )

        if get_request do
          [get_request, validate_request]
        else
          [validate_request]
        end

      %Step{step: %Ash.Flow.Step.Update{} = update, input: input} ->
        %{
          action: action,
          api: api,
          name: name,
          resource: resource,
          input: action_input,
          record: record,
          tenant: tenant,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        } = update

        List.wrap(
          maybe_dynamic(
            [resource, action],
            name,
            [name, :commit],
            input,
            all_steps,
            transaction_name,
            additional_context,
            fn resource, action ->
              %{
                action: action,
                action_input: action_input,
                resource: resource,
                api: api,
                dep_paths: dep_paths,
                tenant: tenant,
                request_deps: request_deps
              } =
                action_request_info(
                  all_steps,
                  resource,
                  action,
                  api,
                  action_input,
                  input,
                  transaction_name,
                  tenant,
                  [wait_for, halt_if]
                )

              get_request =
                get_request(
                  record,
                  input,
                  all_steps,
                  transaction_name,
                  name,
                  resource,
                  additional_context
                )

              [
                get_request
                | Ash.Actions.Update.as_requests([name], resource, api, action,
                    error_path: List.wrap(name),
                    authorize?: opts[:authorize?],
                    actor: opts[:actor],
                    tracer: opts[:tracer],
                    changeset_dependencies: [[name, :fetch, :data] | request_deps],
                    skip_on_nil_record?: true,
                    modify_changeset: fn changeset, context ->
                      context = Ash.Helpers.deep_merge_maps(context, additional_context)
                      results = results(dep_paths, context)

                      halt_if(halt_if, halt_reason, name, results, context, fn -> changeset end)
                    end,
                    tenant: fn context ->
                      context = Ash.Helpers.deep_merge_maps(context, additional_context)
                      results = results(dep_paths, context)

                      tenant
                      |> Ash.Flow.Template.set_dependent_values(%{
                        results: results,
                        elements: Map.get(context, :_ash_engine_elements)
                      })
                      |> Ash.Flow.handle_modifiers()
                    end,
                    record: fn context ->
                      Ash.Flow.do_get_in(context, [name, :fetch, :data])
                    end,
                    changeset_input: fn context ->
                      context = Ash.Helpers.deep_merge_maps(context, additional_context)

                      results = results(dep_paths, context)

                      action_input
                      |> Ash.Flow.Template.set_dependent_values(%{
                        results: results,
                        elements: Map.get(context, :_ash_engine_elements)
                      })
                      |> Ash.Flow.handle_modifiers()
                    end
                  )
              ]
            end
          )
        )

      %Step{step: %Ash.Flow.Step.Destroy{} = destroy, input: input} ->
        %{
          action: action,
          api: api,
          name: name,
          resource: resource,
          input: action_input,
          record: record,
          tenant: tenant,
          wait_for: wait_for,
          halt_if: halt_if,
          halt_reason: halt_reason
        } = destroy

        List.wrap(
          maybe_dynamic(
            [resource, action],
            name,
            [name, :commit],
            input,
            all_steps,
            transaction_name,
            additional_context,
            fn resource, action ->
              %{
                action: action,
                action_input: action_input,
                resource: resource,
                api: api,
                dep_paths: dep_paths,
                tenant: tenant,
                request_deps: request_deps
              } =
                action_request_info(
                  all_steps,
                  resource,
                  action,
                  api,
                  action_input,
                  input,
                  transaction_name,
                  tenant,
                  [wait_for, halt_if]
                )

              get_request =
                get_request(
                  record,
                  input,
                  all_steps,
                  transaction_name,
                  name,
                  resource,
                  additional_context
                )

              [
                get_request
                | Ash.Actions.Destroy.as_requests([name], resource, api, action,
                    error_path: List.wrap(name),
                    authorize?: opts[:authorize?],
                    actor: opts[:actor],
                    tracer: opts[:tracer],
                    changeset_dependencies: [[name, :fetch, :data] | request_deps],
                    skip_on_nil_record?: true,
                    record: fn context ->
                      Ash.Flow.do_get_in(context, [name, :fetch, :data])
                    end,
                    modify_changeset: fn changeset, context ->
                      context = Ash.Helpers.deep_merge_maps(context, additional_context)
                      results = results(dep_paths, context)

                      halt_if(halt_if, halt_reason, name, results, context, fn -> changeset end)
                    end,
                    tenant: fn context ->
                      context = Ash.Helpers.deep_merge_maps(context, additional_context)
                      results = results(dep_paths, context)

                      tenant
                      |> Ash.Flow.Template.set_dependent_values(%{
                        results: results,
                        elements: Map.get(context, :_ash_engine_elements)
                      })
                      |> Ash.Flow.handle_modifiers()
                    end,
                    changeset_input: fn context ->
                      context = Ash.Helpers.deep_merge_maps(context, additional_context)

                      results = results(dep_paths, context)

                      action_input
                      |> Ash.Flow.Template.set_dependent_values(%{
                        results: results,
                        elements: Map.get(context, :_ash_engine_elements)
                      })
                      |> Ash.Flow.handle_modifiers()
                    end
                  )
              ]
            end
          )
        )
    end
  end

  defp maybe_dynamic(
         items,
         name,
         request_path,
         input,
         all_steps,
         transaction_name,
         additional_context,
         fun
       ) do
    if Ash.Flow.Template.is_template?(items) do
      {items, deps} =
        Enum.reduce(items, {[], []}, fn item, {items, deps} ->
          {item, new_deps} = Ash.Flow.Template.handle_input_template(item, input)
          {[item | items], deps ++ new_deps}
        end)

      dep_paths = get_dep_paths(all_steps, deps, transaction_name, [])
      request_deps = dependable_request_paths(dep_paths)

      Ash.Engine.Request.new(
        authorize?: false,
        async?: false,
        name: "Run dynamic step #{inspect(name)}",
        path: [:dynamic, request_path],
        error_path: [],
        data:
          Ash.Engine.Request.resolve(request_deps, fn context ->
            context = Ash.Helpers.deep_merge_maps(context, additional_context)

            results = results(dep_paths, context)

            items =
              items
              |> Ash.Flow.Template.set_dependent_values(%{
                results: results,
                elements: Map.get(context, :_ash_engine_elements)
              })
              |> Ash.Flow.handle_modifiers()

            requests =
              fun
              |> apply(Enum.reverse(items))
              |> List.wrap()

            {:ok, nil, %{requests: requests}}
          end)
      )
    else
      apply(fun, items)
    end
  end

  defp extract_changeset_or_query_errors(result, keys) do
    keys =
      Enum.map(keys, fn key ->
        if is_list(key) do
          {:lists.droplast(key), List.last(key)}
        else
          {[], key}
        end
      end)

    result.errors
    |> expand_errors()
    |> Enum.filter(&error_applies?(&1, keys))
  end

  defp error_applies?(error, keys) do
    Enum.any?(keys, fn {path, key} ->
      path_matches?(error.path, path ++ [key]) ||
        (path_matches?(error.path || [], path) && field_matches?(error, key))
    end)
  end

  defp field_matches?(error, key) do
    key in List.wrap(Map.get(error, :fields)) || key == Map.get(error, :field)
  end

  defp path_matches?(nil, []), do: true
  defp path_matches?(path, path), do: true
  defp path_matches?(_, _), do: false

  defp expand_errors(errors) when is_list(errors) do
    Enum.flat_map(errors, &expand_errors/1)
  end

  defp expand_errors(%class_mod{} = error)
       when class_mod in [
              Ash.Error.Forbidden,
              Ash.Error.Framework,
              Ash.Error.Invalid,
              Ash.Error.Unkonwn
            ] do
    Enum.flat_map(error.errors, &expand_errors/1)
  end

  defp expand_errors(other), do: List.wrap(other)

  defp set_index(name, i) do
    List.wrap(i) ++ List.wrap(name)
  end

  defp results(dep_paths, context) do
    Map.new(dep_paths, fn {name, %{fetch_path: fetch_path}} ->
      {name, Ash.Flow.do_get_in(context, fetch_path)}
    end)
  end

  defp map_steps(steps, fun) do
    steps
    |> Enum.map(fn %Step{step: inner_step} = step ->
      %{step | step: fun.(inner_step)}
    end)
    |> Enum.map(fn
      %Step{step: %{steps: inner_steps}} = step ->
        %{step | step: %{step.step | steps: map_steps(inner_steps, fun)}}

      step ->
        step
    end)
  end

  defp map_outer_steps(steps, fun) do
    steps
    |> Enum.map(fn step ->
      fun.(step)
    end)
    |> Enum.map(fn
      %Step{step: %{steps: inner_steps}} = step ->
        %{step | step: %{step.step | steps: map_outer_steps(inner_steps, fun)}}

      step ->
        step
    end)
  end

  defp remap_step_names(steps, fun, step_names \\ nil) do
    step_names = step_names || step_names(steps)

    steps
    |> Enum.map(fn
      %Step{step: %{name: name} = inner_step} = step ->
        %{step | step: %{inner_step | name: fun.(name)}}
        |> remap_step(step_names, fun)
    end)
    |> Enum.map(fn
      %Step{step: %{steps: inner_steps}} = step ->
        %{step | step: %{step.step | steps: remap_step_names(inner_steps, fun, step_names)}}

      other ->
        other
    end)
  end

  defp step_names(steps) do
    Enum.flat_map(steps, fn step ->
      case step.step do
        %{name: name, steps: steps} ->
          [name | step_names(steps)]

        %{name: name} ->
          [name]
      end
    end)
  end

  defp update_step(steps, name, func) do
    Enum.map(steps, fn step ->
      if step.step.name == name do
        func.(step)
      else
        case step do
          %{step: %{steps: steps}} ->
            %{step | step: %{step.step | steps: update_step(steps, name, func)}}

          step ->
            step
        end
      end
    end)
  end

  # output needs a config
  @remapped_step_keys [
    input: [],
    over: [],
    output: [
      raw?: true
    ],
    condition: [],
    record: [],
    tenant: [],
    wait_for: [],
    halt_if: []
  ]

  defp remap_step(step, step_names, fun) do
    Enum.reduce(@remapped_step_keys, step, fn {key, config}, step ->
      remap_key(step, step_names, key, config, fun)
    end)
  end

  defp remap_key(%{step: step} = outer_step, step_names, key, config, fun) do
    case Map.fetch(step, key) do
      {:ok, value} ->
        new_value =
          if config[:raw?] do
            if value do
              fun.(value)
            end
          else
            Ash.Flow.Template.remap_result_references(value, fn name ->
              if name in step_names do
                fun.(name)
              else
                name
              end
            end)
          end

        %{outer_step | step: Map.put(step, key, new_value)}

      :error ->
        outer_step
    end
  end

  defp get_flow_return_value(
         steps,
         name,
         data,
         transaction_name,
         in_transaction? \\ false,
         default \\ nil
       ) do
    case find_step_dep(steps, name, transaction_name) do
      nil ->
        default

      %Ash.Flow.Step.Transaction{} = transaction ->
        data =
          if in_transaction? do
            Ash.Flow.do_get_in(data, [transaction.name])
          else
            Ash.Flow.do_get_in(data, [transaction.name, :data])
          end

        get_flow_return_value(transaction.steps, name, data, transaction.name, true, data)

      step ->
        if in_transaction? do
          Ash.Flow.do_get_in(data, [step.name])
        else
          Ash.Flow.do_get_in(data, result_path(%{step | name: name}))
        end
    end
  end

  defp get_request(
         record,
         input,
         all_steps,
         transaction_name,
         name,
         resource,
         additional_context
       ) do
    {record, record_deps} = Ash.Flow.Template.handle_input_template(record, input)

    get_request_dep_paths =
      all_steps
      |> get_dep_paths(record_deps, transaction_name, [])

    get_request_deps = dependable_request_paths(get_request_dep_paths)

    Ash.Engine.Request.new(
      path: [name, :fetch],
      name: "Fetch record for #{inspect(name)}",
      error_path: List.wrap(name) ++ [:fetch],
      async?: must_be_local?(%{resource: resource}),
      resource: resource,
      authorize?: false,
      data:
        Ash.Engine.Request.resolve(get_request_deps, fn context ->
          context = Ash.Helpers.deep_merge_maps(context, additional_context)

          results = results(get_request_dep_paths, context)

          record
          |> Ash.Flow.Template.set_dependent_values(%{
            results: results,
            elements: Map.get(context, :_ash_engine_elements)
          })
          |> Ash.Flow.handle_modifiers()
          |> case do
            %^resource{} = record ->
              {:ok, record}

            nil ->
              {:ok, nil}

            value ->
              {:error, "Invalid record #{inspect(value)}"}
          end
        end)
    )
  end

  defp action_request_info(
         all_steps,
         resource,
         action,
         api,
         action_input,
         input,
         transaction_name,
         tenant,
         additional
       ) do
    {action_input, deps} = Ash.Flow.Template.handle_input_template(action_input, input)
    {tenant, tenant_deps} = Ash.Flow.Template.handle_input_template(tenant, input)
    {_, additional_deps} = Ash.Flow.Template.handle_input_template(additional, input)
    {resource, resource_deps} = Ash.Flow.Template.handle_input_template(resource, input)
    {api, api_deps} = Ash.Flow.Template.handle_input_template(api, input)
    {action, action_deps} = Ash.Flow.Template.handle_input_template(action, input)

    action =
      Ash.Resource.Info.action(resource, action) ||
        raise "No such action #{action} for #{resource}"

    dep_paths =
      get_dep_paths(
        all_steps,
        deps ++ tenant_deps ++ api_deps ++ resource_deps ++ action_deps,
        transaction_name,
        additional_deps
      )

    request_deps = dependable_request_paths(dep_paths)

    %{
      resource: resource,
      api: api,
      action: action,
      action_input: action_input,
      dep_paths: dep_paths,
      tenant: tenant,
      request_deps: request_deps
    }
  end

  defp dependable_request_paths(dep_paths) do
    for {_, %{request_path: request_path, depend?: true}} <- dep_paths do
      request_path
    end
  end

  defp get_dep_paths(all_steps, deps, transaction_name, wait_for) do
    # We favor the result dependencies over the completion dependencies
    Map.merge(
      dep_paths(wait_for, all_steps, transaction_name, :completion),
      dep_paths(deps, all_steps, transaction_name)
    )
  end

  defp dep_paths(deps, steps, transaction_name, result_type \\ :result) do
    deps
    |> Enum.reduce(%{}, fn
      {:_result, dep}, acc ->
        case find_step_dep(steps, dep, transaction_name, false, true) do
          nil ->
            acc

          {step, :no_depend} ->
            add_dep(step, dep, acc, result_type, false)

          step ->
            add_dep(step, dep, acc, result_type, true)
        end

      _, acc ->
        acc
    end)
  end

  defp add_dep(step, dep, acc, type, depend?) do
    case step do
      %Ash.Flow.Step.Transaction{name: name} ->
        request_path =
          if type == :completion do
            [name, :completion]
          else
            [name, :data]
          end

        Map.put(acc, dep, %{
          fetch_path: [name, :data, dep],
          request_path: request_path,
          depend?: depend?
        })

      step ->
        request_path =
          if type == :completion do
            completion_path(step)
          else
            result_path(step)
          end

        Map.put(acc, dep, %{
          fetch_path: request_path,
          request_path: request_path,
          depend?: depend?
        })
    end
  end

  defp find_step_dep(
         steps,
         dep,
         transaction_name,
         stop_at_transaction_name? \\ false,
         no_depend? \\ false
       ) do
    Enum.find_value(steps, fn
      %Step{step: %Ash.Flow.Step.Transaction{name: ^dep} = step} ->
        if transaction_name && no_depend? do
          {step, :no_depend}
        else
          step
        end

      %Step{step: %Ash.Flow.Step.Transaction{name: ^transaction_name}}
      when stop_at_transaction_name? ->
        nil

      %Step{step: %Ash.Flow.Step.Transaction{name: ^transaction_name, steps: steps}} ->
        find_step_dep(steps, dep, nil, stop_at_transaction_name?, no_depend?)

      %Step{step: %Ash.Flow.Step.Transaction{steps: steps} = step} ->
        case find_step_dep(steps, dep, transaction_name, stop_at_transaction_name?, no_depend?) do
          nil ->
            nil

          inner_step ->
            if transaction_name do
              inner_step
            else
              step
            end
        end

      %Step{step: %{name: ^dep} = step} ->
        if transaction_name && no_depend? do
          {step, :no_depend}
        else
          step
        end

      %Step{step: %{steps: steps}} ->
        find_step_dep(steps, dep, transaction_name, stop_at_transaction_name?, no_depend?)

      _ ->
        nil
    end)
  end

  defp result_path(%Ash.Flow.Step.Read{name: name}) do
    [name, :data, :data]
  end

  defp result_path(%Ash.Flow.Step.Create{name: name}) do
    [name, :commit, :data]
  end

  defp result_path(%Ash.Flow.Step.Destroy{name: name}) do
    [name, :commit, :data]
  end

  defp result_path(%Ash.Flow.Step.Validate{name: name}) do
    [name, :validate, :data]
  end

  defp result_path(%Ash.Flow.Step.Update{name: name}) do
    [name, :commit, :data]
  end

  defp result_path(%Ash.Flow.Step.Transaction{name: name}) do
    [name, :data]
  end

  defp result_path(%Ash.Flow.Step.RunFlow{name: name}) do
    [name, :data]
  end

  defp result_path(%Ash.Flow.Step.Branch{name: name}) do
    [name, :data]
  end

  defp result_path(%Ash.Flow.Step.Map{name: name}) do
    [name, :data]
  end

  defp result_path(%Ash.Flow.Step.Custom{name: name}) do
    [name, :data]
  end

  defp result_path(%Ash.Flow.Step.Debug{name: name}) do
    [name, :data]
  end

  defp completion_path(%Ash.Flow.Step.Read{name: name}) do
    [name, :data, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Create{name: name}) do
    [name, :commit, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Validate{name: name}) do
    [name, :commit, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Update{name: name}) do
    [name, :commit, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Destroy{name: name}) do
    [name, :commit, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Transaction{name: name}) do
    [name, :completion]
  end

  defp completion_path(%Ash.Flow.Step.RunFlow{name: name}) do
    [name, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Branch{name: name}) do
    [name, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Map{name: name}) do
    [name, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Custom{name: name}) do
    [name, :completion]
  end

  defp completion_path(%Ash.Flow.Step.Debug{name: name}) do
    [name, :completion]
  end

  defp data_path(%Ash.Flow.Step.Debug{name: name}) do
    [name, :data]
  end

  defp data_path(%Ash.Flow.Step.Transaction{name: name}) do
    [name, :data]
  end

  defp data_path(%Ash.Flow.Step.Read{name: name}) do
    [name, :data]
  end

  defp data_path(%Ash.Flow.Step.Create{name: name}) do
    [name, :commit]
  end

  defp data_path(%Ash.Flow.Step.Destroy{name: name}) do
    [name, :commit]
  end

  defp data_path(%Ash.Flow.Step.Validate{name: name}) do
    [name, :commit]
  end

  defp data_path(%Ash.Flow.Step.Update{name: name}) do
    [name, :commit]
  end

  defp data_path(%Ash.Flow.Step.RunFlow{name: name}) do
    [name]
  end

  defp data_path(%Ash.Flow.Step.Custom{name: name}) do
    [name]
  end

  defp data_path(%Ash.Flow.Step.Branch{name: name}) do
    [name]
  end

  defp data_path(%Ash.Flow.Step.Map{name: name}) do
    [name]
  end
end