lib/ash/engine/engine.ex

defmodule Ash.Engine do
  @moduledoc """
  The Ash engine handles the parallelization/running of requests to Ash.

  Much of the complexity of this doesn't come into play for simple requests.
  The way it works is that it accepts a list of `Ash.Engine.Request` structs.
  Some of values on those structs will be instances of `Ash.Engine.Request.UnresolvedField`.
  These unresolved fields can express a dependence on the field values from other requests.
  This allows the engine to wait on executing some code until it has its required inputs,
  or if all of its dependencies are met, it can execute it immediately. The engine's job is
  to resolve its unresolved fields in the proper order, potentially in parallel.
  It also has knowledge baked in about certain special fields, like `data` which is the
  field we are ultimately trying to resolve, and `query` which is the field that drives authorization
  for read requests. Authorization is done on a *per engine request* basis.

  As the complexity of a system grows, it becomes very difficult to write code that
  is both imperative and performant. This is especially true of a framework that is
  designed to be configurable. What exactly is done, as well as the order it is done in,
  and whether or not is can be parallelized, varies wildly based on factors like how
  the resources are configured and what capabilities the data layer has. By implementing
  a generic "parallel engine", we can let the engine solve that problem. We simply
  have to express the various operations that must happen, and what other pieces of data
  they need in order to happen, and the engine handles the rest.

  There are various tradeoffs in the current design. The original version of the engine started a process
  for each request. While this had the least constrained performance characteristics of all the designs,
  it was problematic for various reasons. The primary reason being that it could deadlock without any
  reasonable way to debug said deadlock because the various states were distributed. The second version
  of the engine introduced a central `Engine` process that helped with some of these issues, but ultimately
  had the same problem. The third (and current) version of the engine is reworked instead to be drastically
  simpler, potentially at the expense of performance for some requests. Instead of starting a process per
  request, it opts to only parallelize the `data` field resolution of fields that are marked as `async?: true`,
  (unlike the previous versions which started a process for the whole request.) Although it does its best
  to prioritize starting any async tasks, it is possible that if some mix of async/sync requests are passed in
  a potentially long running sync task could prevent it from starting an async task, giving this potentially worse
  performance characteristics. In practice, this doesn't really matter because the robust data layers support running
  asynchronously, unless they are in a transaction in which case everything runs serially anyway.

  The current version of the engine can be seen as an event loop that will async some events and yield them. It also
  has support for a concurrency limit (per engine invocation, not globally, although that could now be added much more
  easily). This limit defaults to `2 * schedulers_online`.

  Check out the docs for `Ash.Engine.Request` for some more information. This is a private
  interface at the moment, though, so this documentation is just here to explain how it works
  it is not intended to give you enough information to use the engine directly.
  """

  defstruct [
    :ref,
    :id,
    :resolved_fields,
    :authorize?,
    :actor,
    :verbose?,
    :async?,
    :concurrency_limit,
    # There are no other failure modes, but this is there
    # to express the intent for there to eventually be.
    failure_mode: :complete,
    return_notifications?: false,
    opts: [],
    requests: [],
    data: %{},
    unsent_dependencies: [],
    dependencies_seen: MapSet.new(),
    dependencies: %{},
    reverse_dependencies: %{},
    resource_notifications: [],
    tasks: [],
    errors: [],
    notifications: MapSet.new(),
    pending_tasks: []
  ]

  alias Ash.Engine.Request
  require Logger

  def run(requests, opts \\ []) do
    cond do
      opts[:transaction?] && !opts[:resource] ->
        raise "Engine invoked with `transaction?: true` but no resource, so no transaction could be started."

      opts[:transaction?] && Ash.DataLayer.data_layer_can?(opts[:resource], :transact) ->
        Ash.DataLayer.transaction(
          opts[:resource],
          fn ->
            case do_run(requests, opts) do
              {:ok, result} ->
                result

              {:error, error} ->
                Ash.DataLayer.rollback(opts[:resource], error)
            end
          end,
          opts[:timeout] || :infinity
        )

      true ->
        if !Application.get_env(:ash, :disable_async?) &&
             (is_nil(opts[:resource]) ||
                Ash.DataLayer.data_layer_can?(opts[:resource], :async_engine)) && opts[:timeout] &&
             opts[:timeout] != :infinity && !Ash.DataLayer.in_transaction?(opts[:resource]) do
          task =
            async(fn ->
              do_run(requests, opts)
            end)

          try do
            Task.await(task, opts[:timeout])
          catch
            :exit, {:timeout, {Task, :await, [^task, timeout]}} ->
              {:error, Ash.Error.Invalid.Timeout.exception(timeout: timeout, name: opts[:name])}
          end
        else
          do_run(requests, opts)
        end
    end
    |> case do
      {:ok, %{resource_notifications: resource_notifications} = result} ->
        notifications =
          if opts[:return_notifications?] do
            resource_notifications
          else
            Ash.Notifier.notify(resource_notifications)
          end

        {:ok, %{result | resource_notifications: notifications}}

      {:error, :timeout} ->
        {:error, Ash.Error.Invalid.Timeout.exception(timeout: opts[:timeout], name: opts[:name])}

      other ->
        other
    end
  end

  def do_run(requests, opts \\ []) do
    state =
      %__MODULE__{
        ref: make_ref(),
        id: System.unique_integer([:positive, :monotonic]),
        resolved_fields: %{},
        actor: opts[:actor],
        return_notifications?: opts[:return_notifications?],
        concurrency_limit: System.schedulers_online() * 2,
        authorize?: opts[:authorize?] || false,
        verbose?: opts[:verbose?] || false,
        async?: !Application.get_env(:ash, :disable_async?),
        opts: opts
      }
      |> add_requests(requests)

    log(state, fn ->
      "Engine Starting - #{Enum.map_join(state.requests, ", ", & &1.name)}"
    end)

    case run_to_completion(state) do
      %__MODULE__{errors: []} = result ->
        {:ok,
         result.requests
         |> Enum.reduce(result, &add_data(&2, &1.path, &1.data))}

      state ->
        {:error, state}
    end
  end

  defp add_data(state, path, data) do
    %{
      state
      | data: put_nested_key(state.data, path, data)
    }
  end

  defp set_async(requests) do
    if Application.get_env(:ash, :disable_async?) do
      Enum.map(requests, &Map.put(&1, :async?, false))
    else
      Enum.map(requests, fn request ->
        if must_be_local?(request) do
          %{request | async?: false}
        else
          request
        end
      end)
    end
  end

  @doc false
  def must_be_local?(%{async?: false}), do: true

  def must_be_local?(request) do
    [request.resource | request.touches_resources || []]
    |> Enum.filter(& &1)
    |> Enum.any?(fn resource ->
      (Ash.DataLayer.data_layer_can?(resource, :transact) &&
         Ash.DataLayer.in_transaction?(resource)) ||
        not Ash.DataLayer.data_layer_can?(resource, :async_engine)
    end)
  end

  defp run_to_completion(state) do
    if Enum.all?(state.requests, &(&1.state in [:complete, :error])) do
      log(state, "Engine Complete")
      state
    else
      state
      |> run_iteration()
      |> case do
        ^state ->
          if state.tasks == [] && state.pending_tasks == [] do
            detect_deadlocks(state)

            raise """
            Engine Deadlock! No async tasks and state is the same after iteration.
            #{long_breakdown(state)}
            """
          else
            state
            |> run_iteration()
            |> run_to_completion()
          end

        new_state ->
          run_to_completion(new_state)
      end
    end
  end

  defp errors(state) do
    if state.errors == [] do
      ""
    else
      """
      Errors:
      #{Enum.map_join(state.errors, "\n", fn error -> Exception.format(:error, Ash.Error.to_ash_error(error), stacktrace(error)) end)}
      """
    end
  end

  defp stacktrace(%{stacktraces?: true, stacktrace: %{stacktrace: stacktrace}})
       when not is_nil(stacktrace) do
    stacktrace
  end

  defp stacktrace(_), do: nil

  defp completed_requests(state) do
    state.requests
    |> Enum.filter(&(&1.state == :complete))
    |> case do
      [] ->
        ""

      requests ->
        """
        Complete Requests:
        #{Enum.map_join(requests, "\n\n", fn request -> Request.summarize(request) end)}
        """
    end
  end

  defp errored_requests(state) do
    state.requests
    |> Enum.filter(&(&1.state == :error))
    |> case do
      [] ->
        ""

      requests ->
        """
        Errored Requests:
        #{Enum.map_join(requests, "\n\n", fn request -> Request.summarize(request) end)}
        """
    end
  end

  defp pending_requests(state) do
    state.requests
    |> Enum.reject(&(&1.state in [:complete, :error]))
    |> case do
      [] ->
        ""

      requests ->
        """
        Requests:
        #{Enum.map_join(requests, "\n\n", fn request -> Request.summarize(request) <> "\n  " <> depends_on_summary(request, state) end)}
        """
    end
  end

  defp depends_on_summary(request, state) do
    dependencies = state.dependencies[request.path] || []

    if Enum.empty?(dependencies) do
      " state: #{request.state}"
    else
      " state: #{request.state} | depends on #{Enum.map_join(dependencies, ", ", &name_of(&1, state))}"
    end
  end

  defp name_of({path, dep}, state) do
    "#{inspect(Enum.find(state.requests, &(&1.path == path)).name)}.#{dep}"
  end

  defp run_iteration(%__MODULE__{tasks: tasks} = state) when tasks != [] do
    Enum.reduce(tasks, state, fn task, state ->
      case Task.yield(task, 0) do
        {:ok, {request_path, result}} ->
          state = %{state | tasks: tasks -- [task]}
          request = Enum.find(state.requests, &(&1.path == request_path))

          new_request = %{request | async_fetch_state: {:fetched, result}}

          replace_request(state, new_request)

        nil ->
          state
      end
    end)
  end

  defp run_iteration(
         %__MODULE__{unsent_dependencies: [{request_path, dep} | remaining_unsent_dependencies]} =
           state
       ) do
    state = %{state | unsent_dependencies: remaining_unsent_dependencies}
    request = Enum.find(state.requests, &(&1.path == request_path))
    path = :lists.droplast(dep)
    field = List.last(dep)

    depended_on_request = Enum.find(state.requests, &(&1.path == path))

    cond do
      request.state == :error ->
        state

      depended_on_request.state == :error ->
        case Request.wont_receive(request, path, field) do
          {:stop, :dependency_failed, new_request} ->
            new_request = %{new_request | state: :error}

            state
            |> replace_request(new_request)
            |> notify_error(new_request)
        end

      true ->
        {state, notifications, dependencies} =
          notify_local_request(
            state,
            depended_on_request,
            request,
            field
          )

        state
        |> notify(notifications)
        |> store_dependencies(dependencies)
    end
  end

  defp run_iteration(state) do
    state = start_pending_tasks(state)

    case Enum.find(state.requests, &match?({:requested, _}, &1.async_fetch_state)) do
      nil ->
        {async, sync} =
          state.requests
          |> Enum.filter(fn request ->
            Enum.empty?(state.dependencies[request.path] || [])
          end)
          |> Enum.split_with(& &1.async?)

        {state, do_sync?} =
          Enum.reduce(async, {state, true}, fn
            request, {state, false} ->
              {do_run_iteration(state, request), false}

            request, {state, true} ->
              new_state = do_run_iteration(state, request)
              # We only want to process synchronous requests once all asynchronous requests
              # have done all of their work.
              {new_state, state == new_state}
          end)

        if do_sync? do
          Enum.reduce(sync, state, &do_run_iteration(&2, &1))
        else
          state
        end

      request ->
        {_, resolver_context} = request.async_fetch_state
        request = %{request | async_fetch_state: :fetching}

        if Enum.count(state.tasks) >= state.concurrency_limit do
          pending_task = fn ->
            {request.path, request.data.resolver.(resolver_context)}
          end

          %{state | pending_tasks: [pending_task | state.pending_tasks]}
        else
          task =
            async(fn ->
              {request.path, request.data.resolver.(resolver_context)}
            end)

          %{state | tasks: [task | state.tasks]}
        end
    end
  end

  defp async(func) do
    ash_context = Ash.get_context_for_transfer()

    Task.async(fn ->
      Ash.transfer_context(ash_context)

      func.()
    end)
  end

  defp do_run_iteration(state, request) do
    log(state, fn -> breakdown(state) end)
    {state, notifications, dependencies} = fully_advance_request(state, request)

    state
    |> notify(notifications)
    |> store_dependencies(dependencies)
  end

  defp detect_deadlocks(state) do
    state.dependencies
    |> Enum.each(fn {path, deps} ->
      case Enum.find_value(deps, &depends_on(state, &1, path)) do
        nil ->
          :ok

        {circular_dep, path} ->
          raise "Deadlock detected! #{inspect(path)} and #{circular_dep} depend on each other via #{Enum.map_join(path, " -> ", &inspect/1)}"

        circular_dep ->
          raise "Deadlock detected! #{inspect(path)} and #{circular_dep} depend on each other"
      end
    end)

    state
  end

  defp depends_on(state, source, destination, trail \\ []) do
    deps =
      state.dependencies[source]
      |> Kernel.||([])

    if Enum.any?(deps, fn {path, _} -> path == destination end) do
      source
    else
      deps
      |> Enum.reject(&(&1 in trail))
      |> Enum.find(&depends_on(state, &1, destination, [&1 | []]))
      |> case do
        nil ->
          false

        {circular_dep, path} ->
          {source, path ++ circular_dep}

        circular_dep ->
          {source, [circular_dep]}
      end
    end
  end

  defp breakdown(state) do
    """
    State breakdown:
    #{Enum.map_join(state.requests, "\n", &"#{&1.name}: #{&1.state}")}
    """
  end

  def long_breakdown(state) do
    """
    #{errors(state)}
    #{completed_requests(state)}
    #{errored_requests(state)}
    #{pending_requests(state)}
    """
  end

  defp fully_advance_request(state, request) do
    case advance_request(request, state) do
      {:ok, new_request, notifications, dependencies, resource_notification} ->
        new_state =
          state
          |> replace_request(new_request)
          |> add_resource_notification(resource_notification)

        new_dependencies = build_dependencies(new_request, dependencies)

        {new_state, notifications, new_dependencies}

      {:ok, new_request, notifications, dependencies} ->
        new_state = replace_request(state, new_request)

        new_dependencies = build_dependencies(new_request, dependencies)

        {new_state, notifications, new_dependencies}

      {:error, error, new_request} ->
        new_request = %{new_request | state: :error}

        new_state =
          state
          |> add_error(new_request.path, error)
          |> replace_request(new_request)
          |> notify_error(new_request)

        {new_state, [], []}
    end
  end

  defp notify_error(state, new_request) do
    state.reverse_dependencies[new_request.path]
    |> Kernel.||([])
    |> Enum.reduce(state, fn {request_path_that_wont_receive, dep_they_wont_receive}, state ->
      request = Enum.find(state.requests, &(&1.path == request_path_that_wont_receive))

      if request.state == :error do
        state
      else
        case Request.wont_receive(
               request,
               new_request.path,
               dep_they_wont_receive
             ) do
          {:stop, :dependency_failed, new_request} ->
            new_request = %{new_request | state: :error}

            state
            |> notify_error(new_request)
            |> replace_request(new_request)
        end
      end
    end)
  end

  defp start_pending_tasks(%{pending_tasks: []} = state), do: state

  defp start_pending_tasks(state) do
    available_tasks = state.concurrency_limit - Enum.count(state.tasks)

    {to_start, remaining} = Enum.split(state.pending_tasks, available_tasks)

    state = %{state | pending_tasks: remaining}

    new_tasks = Enum.map(to_start, &Task.async/1)

    %{state | tasks: state.tasks ++ new_tasks}
  end

  defp advance_request(%{state: state} = request, _state) when state in [:error] do
    {:ok, request, [], []}
  end

  defp advance_request(request, state) do
    case Request.next(request) do
      {:already_complete, new_request, new_notifications, new_dependencies} ->
        {:ok, new_request, new_notifications, new_dependencies}

      {:complete, new_request, new_notifications, new_dependencies} ->
        if new_request.resource && new_request.notify? do
          resource_notification =
            sanitize_notification(Request.resource_notification(new_request), state)

          {:ok, new_request, new_notifications, new_dependencies, resource_notification}
        else
          {:ok, new_request, new_notifications, new_dependencies}
        end

      {:continue, new_request, new_notifications} ->
        {:ok, new_request, new_notifications, []}

      {:error, error, new_request} ->
        {:error, error, new_request}

      {:wait, new_request, new_notifications, new_dependencies} ->
        {:ok, new_request, new_notifications, new_dependencies}
    end
  end

  defp notify(state, notifications) do
    notifications =
      notifications
      |> Enum.uniq()
      |> Enum.reject(&MapSet.member?(state.notifications, &1))

    new_state_notifications = Enum.reduce(notifications, state.notifications, &MapSet.put(&2, &1))
    state = %{state | notifications: new_state_notifications}

    notifications
    |> Request.sort_and_clean_notifications()
    |> Enum.reduce(state, fn
      {:requests, requests}, state ->
        state
        |> add_requests(requests)

      {:set_extra_data, key, value}, state ->
        %{state | data: Map.put(state.data, key, value)}

      %Ash.Notifier.Notification{} = resource_notification, state ->
        resource_notification = sanitize_notification(resource_notification, state)

        add_resource_notification(state, resource_notification)

      {receiver_path, request_path, field, value}, state ->
        receiver_request = Enum.find(state.requests, &(&1.path == receiver_path))

        {:continue, new_request} =
          Request.receive_field(receiver_request, request_path, field, value)

        state
        |> Map.update!(:dependencies, fn dependencies ->
          Map.update!(dependencies, receiver_request.path, fn deps ->
            MapSet.delete(deps, {request_path, field})
          end)
        end)
        |> Map.update!(:reverse_dependencies, fn dependencies ->
          Map.update!(dependencies, request_path, fn deps ->
            MapSet.delete(deps, {receiver_request.path, field})
          end)
        end)
        |> replace_request(new_request)
    end)
  end

  defp sanitize_notification(resource_notification, state) do
    %{
      resource_notification
      | from: self(),
        metadata:
          Map.merge(
            resource_notification.metadata || %{},
            state.opts[:notification_metadata] || %{}
          )
    }
  end

  defp store_dependencies(state, dependencies) do
    dependencies
    |> Enum.uniq()
    |> Enum.reject(fn {request_path, dep} = path_and_field ->
      MapSet.member?(state.dependencies_seen, path_and_field) ||
        dep in (state.dependencies[request_path] || [])
    end)
    |> Enum.reduce(
      state,
      fn {request_path, dep} = seen_dep, state ->
        dep_path = :lists.droplast(dep)
        dep_field = List.last(dep)

        depended_on_request = Enum.find(state.requests, &(&1.path == dep_path))

        if !depended_on_request do
          raise "Engine Error in request #{inspect(request_path)}: No request found with path #{inspect(dep_path)}. Available paths:\n #{Enum.map_join(state.requests, "\n", &inspect(&1.path))}"
        end

        # we want to send things from non async requests
        # after we've sent all info to async requests
        unsent_dependencies =
          if depended_on_request.async? do
            state.unsent_dependencies ++ [{request_path, dep}]
          else
            [{request_path, dep} | state.unsent_dependencies]
          end

        %{
          state
          | dependencies:
              state.dependencies
              |> Map.put_new_lazy(request_path, fn -> MapSet.new() end)
              |> Map.update!(request_path, &MapSet.put(&1, {dep_path, dep_field})),
            reverse_dependencies:
              state.reverse_dependencies
              |> Map.put_new_lazy(dep_path, fn -> MapSet.new() end)
              |> Map.update!(dep_path, &MapSet.put(&1, {request_path, dep_field})),
            dependencies_seen: MapSet.put(state.dependencies_seen, seen_dep),
            unsent_dependencies: unsent_dependencies
        }
      end
    )
  end

  defp notify_local_request(
         state,
         depended_on_request,
         request,
         field
       ) do
    case Request.send_field(depended_on_request, request.path, field) do
      {:ok, new_request, new_notifications} ->
        state = replace_request(state, new_request)
        {state, new_notifications, []}

      {:waiting, new_request, new_notifications, new_dependencies} ->
        new_dependencies = build_dependencies(new_request, new_dependencies)

        new_state = replace_request(state, new_request)

        {new_state, new_notifications, new_dependencies}

      {:error, error, new_request} ->
        new_state =
          state
          |> replace_request(%{new_request | state: :error})
          |> add_error(new_request.path, error)

        {new_state, [], []}
    end
  end

  defp add_error(state, path, errors) when is_list(errors) do
    Enum.reduce(errors, state, &add_error(&2, path, &1))
  end

  defp add_error(state, _path, error) do
    error = Ash.Error.to_ash_error(error)

    if error in state.errors do
      state
    else
      %{state | errors: [error | state.errors]}
    end
  end

  defp replace_request(state, request) do
    %{
      state
      | requests:
          Enum.map(state.requests, fn existing_request ->
            if existing_request.path == request.path do
              request
            else
              existing_request
            end
          end)
    }
  end

  defp add_resource_notification(state, resource_notification) do
    if Ash.DataLayer.in_transaction?(resource_notification.resource) ||
         state.return_notifications? do
      %{state | resource_notifications: [resource_notification | state.resource_notifications]}
    else
      unsent = Ash.Notifier.notify(resource_notification)

      %{state | resource_notifications: unsent ++ state.resource_notifications}
    end
  end

  def add_requests(state, requests) do
    requests =
      Enum.reject(requests, fn new_request ->
        Enum.find(state.requests, &(&1.path == new_request.path))
      end)

    {async, non_async} =
      requests
      |> Enum.map(fn request ->
        authorize? = request.authorize? and state.authorize?

        %{
          request
          | authorize?: authorize?,
            authorized?: !authorize?,
            actor: request.actor || state.actor,
            verbose?: request.verbose? || state.verbose?,
            async?: request.async? and state.async?
        }
      end)
      |> Enum.map(&Request.add_initial_authorizer_state/1)
      |> set_async()
      |> Enum.split_with(& &1.async?)

    %{state | requests: async ++ state.requests ++ non_async}
  end

  defp build_dependencies(request, dependencies) do
    Enum.map(dependencies, fn dep ->
      {request.path, dep}
    end)
  end

  defp log(state, message, level \\ :debug)

  defp log(%{verbose?: true, id: id}, message, level) when is_function(message) do
    Logger.log(level, fn -> ["#{id}: ", message.()] end)
  end

  defp log(%{verbose?: true, id: id}, message, level) do
    Logger.log(level, fn -> ["#{id}: ", message] end)
  end

  defp log(_, _, _) do
    false
  end

  def put_nested_key(state, [key], value) do
    Map.put(state, key, value)
  end

  def put_nested_key(state, [key | rest], value) do
    case Map.fetch(state, key) do
      {:ok, nested_state} when is_map(nested_state) ->
        Map.put(state, key, put_nested_key(nested_state, rest, value))

      _ ->
        Map.put(state, key, put_nested_key(%{}, rest, value))
    end
  end

  def put_nested_key(state, key, value) do
    Map.put(state, key, value)
  end
end