lib/ash/query/query.ex

defmodule Ash.Query do
  @moduledoc """
  Utilities around constructing/manipulating ash queries.

  Ash queries are used for read actions and loads, and ultimately
  map to queries to a resource's data layer.

  Queries are run by calling `read` on an API that contains the resource in question

  Examples:

  ```elixir
  MyApp.Post
  |> Ash.Query.filter(likes > 10)
  |> Ash.Query.sort([:title])
  |> MyApp.Api.read!()

  MyApp.Author
  |> Ash.Query.aggregate(:published_post_count, :posts, query: [filter: [published: true]])
  |> Ash.Query.sort(published_post_count: :desc)
  |> Ash.Query.limit(10)
  |> MyApp.Api.read!()

  MyApp.Author
  |> Ash.Query.load([:post_count, :comment_count])
  |> Ash.Query.load(posts: [:comments])
  |> MyApp.Api.read!()
  ```
  """

  defstruct [
    :__validated_for_action__,
    :action,
    :api,
    :distinct,
    :filter,
    :resource,
    :tenant,
    :timeout,
    :lock,
    action_failed?: false,
    after_action: [],
    aggregates: %{},
    arguments: %{},
    before_action: [],
    calculations: %{},
    context: %{},
    errors: [],
    limit: nil,
    load: [],
    offset: 0,
    params: %{},
    phase: :preparing,
    select: nil,
    sort: [],
    valid?: true
  ]

  @type t :: %__MODULE__{
          __validated_for_action__: atom | nil,
          action: Ash.Resource.Actions.Read.t() | nil,
          api: module | nil,
          distinct: [atom],
          filter: Ash.Filter.t() | nil,
          resource: module,
          tenant: any,
          timeout: pos_integer() | nil,
          action_failed?: boolean,
          after_action: [
            (t, [Ash.Resource.record()] ->
               {:ok, [Ash.Resource.record()]}
               | {:ok, [Ash.Resource.record()], [Ash.Notifier.Notification.t()]}
               | {:error, any})
          ],
          aggregates: %{optional(atom) => Ash.Filter.t()},
          arguments: %{optional(atom) => any},
          before_action: [(t -> t)],
          calculations: %{optional(atom) => :wat},
          context: map,
          errors: [Ash.Error.t()],
          limit: nil | non_neg_integer(),
          load: keyword(keyword),
          offset: non_neg_integer(),
          params: %{optional(atom | binary) => any},
          phase: :preparing | :before_action | :after_action | :executing,
          select: nil | [atom],
          sort: [atom | {atom, :asc | :desc}],
          valid?: boolean
        }

  alias Ash.Actions.Sort

  alias Ash.Error.Invalid.TimeoutNotSupported

  alias Ash.Error.Query.{
    AggregatesNotSupported,
    InvalidArgument,
    InvalidCalculationArgument,
    InvalidLimit,
    InvalidOffset,
    NoReadAction,
    ReadActionRequiresActor,
    Required
  }

  alias Ash.Error.Load.{InvalidQuery, NoSuchRelationship}
  alias Ash.Query.{Aggregate, Calculation}

  require Ash.Tracer
  import Ash.Filter.TemplateHelpers, only: [expr?: 1]

  defimpl Inspect do
    import Inspect.Algebra

    def inspect(query, opts) do
      sort? = query.sort != []
      load? = query.load != []
      aggregates? = query.aggregates != %{}
      calculations? = query.calculations != %{}
      limit? = not is_nil(query.limit)
      offset? = not (is_nil(query.offset) || query.offset == 0)
      filter? = not is_nil(query.filter)
      errors? = not Enum.empty?(query.errors)
      tenant? = not is_nil(query.tenant)
      select? = query.select not in [[], nil]
      distinct? = query.distinct not in [[], nil]
      lock? = not is_nil(query.lock)

      container_doc(
        "#Ash.Query<",
        [
          concat("resource: ", inspect(query.resource)),
          or_empty(concat("tenant: ", to_doc(query.tenant, opts)), tenant?),
          arguments(query, opts),
          or_empty(concat("filter: ", to_doc(query.filter, opts)), filter?),
          or_empty(concat("sort: ", to_doc(query.sort, opts)), sort?),
          or_empty(concat("limit: ", to_doc(query.limit, opts)), limit?),
          or_empty(concat("offset: ", to_doc(query.offset, opts)), offset?),
          or_empty(concat("load: ", to_doc(query.load, opts)), load?),
          or_empty(concat("aggregates: ", to_doc(query.aggregates, opts)), aggregates?),
          or_empty(concat("calculations: ", to_doc(query.calculations, opts)), calculations?),
          or_empty(concat("errors: ", to_doc(query.errors, opts)), errors?),
          or_empty(concat("select: ", to_doc(query.select, opts)), select?),
          or_empty(concat("distinct: ", to_doc(query.distinct, opts)), distinct?),
          or_empty(concat("lock: ", to_doc(query.lock, opts)), lock?)
        ],
        ">",
        opts,
        fn str, _ -> str end
      )
    end

    defp arguments(query, opts) do
      if query.action do
        if is_nil(query.action) || Enum.empty?(query.action.arguments) do
          empty()
        else
          arg_string =
            query.action.arguments
            |> Map.new(fn argument ->
              if argument.sensitive? do
                {argument.name, "**redacted**"}
              else
                {argument.name, Ash.Query.get_argument(query, argument.name)}
              end
            end)
            |> to_doc(opts)

          concat(["arguments: ", arg_string])
        end
      else
        empty()
      end
    end

    defp or_empty(value, true), do: value
    defp or_empty(_, false), do: empty()
  end

  defmacrop maybe_already_validated_error!(query) do
    {function, _arity} = __CALLER__.function

    quote do
      query = unquote(query)

      if query.__validated_for_action__ && !query.context[:private][:in_before_action?] do
        IO.warn("""
        Query has already been validated for action #{inspect(query.__validated_for_action__)}.

        For safety, we prevent any changes after that point because they will bypass validations or other action logic.
        However, you should prefer a pattern like the below, which makes any custom modifications *before* calling the action.

          Resource
          |> Ash.Query.new()
          |> Ash.Query.#{unquote(function)}(...)
          |> Ash.Query.for_read(...)
        """)
      end
    end
  end

  @doc """
  Attach a filter statement to the query.

  The filter is applied as an "and" to any filters currently on the query.
  For more information on writing filters, see: `Ash.Filter`.
  """
  defmacro filter(query, %Ash.Filter{} = filter) do
    quote location: :keep do
      Ash.Query.do_filter(unquote(query), unquote(filter))
    end
  end

  defmacro filter(query, nil), do: query
  defmacro filter(query, true), do: query

  defmacro filter(query, false) do
    quote location: :keep do
      Ash.Query.do_filter(unquote(query), false)
    end
  end

  defmacro filter(query, do: body) do
    quote location: :keep do
      Ash.Query.do_filter(unquote(query), unquote(body))
    end
  end

  defmacro filter(query, expression) do
    if Keyword.keyword?(expression) do
      quote location: :keep do
        Ash.Query.do_filter(unquote(query), unquote(expression))
      end
    else
      require Ash.Expr
      expr = Ash.Expr.do_expr(expression)

      quote location: :keep do
        Ash.Query.do_filter(unquote(query), List.wrap(unquote(expr)))
      end
    end
  end

  @doc "Create a new query"
  def new(resource, api \\ nil, opts \\ [])
  def new(%__MODULE__{} = query, _, _opts), do: query

  def new(resource, api, opts) when is_atom(resource) do
    query = %__MODULE__{
      api: api,
      filter: nil,
      resource: resource
    }

    query =
      case Ash.Resource.Info.base_filter(resource) do
        nil ->
          query

        filter ->
          if Keyword.get(opts, :base_filter?, true) do
            filter =
              resource
              |> Ash.Filter.parse!(filter, query.aggregates, query.calculations, query.context)
              |> Ash.Filter.embed_predicates()

            do_filter(query, filter)
          else
            query
          end
      end

    case Ash.Resource.Info.default_context(resource) do
      nil ->
        query

      context ->
        Ash.Query.set_context(query, context)
    end

    context = Process.get(:ash_context, %{}) || %{}

    set_context(query, context)
  end

  @for_read_opts [
    actor: [
      type: :any,
      doc:
        "set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    authorize?: [
      type: :boolean,
      doc:
        "set authorize?, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    authorize?: [
      type: :boolean,
      doc:
        "set tracer, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    tracer: [
      type: :atom,
      doc:
        "A tracer to use. Will be carried over to the action. For more information see `Ash.Tracer`."
    ],
    tenant: [
      type: :any,
      doc: "set the tenant on the query"
    ]
  ]

  def for_read_opts, do: @for_read_opts

  @doc """
  Creates a query for a given read action and prepares it.

  Multitenancy is *not* validated until an action is called. This allows you to avoid specifying a tenant until just before calling
  the api action.

  ### Arguments
  Provide a map or keyword list of arguments for the read action

  ### Opts

  #{Spark.OptionsHelpers.docs(@for_read_opts)}

  """
  def for_read(query, action_name, args \\ %{}, opts \\ []) do
    query = to_query(query)

    {query, opts} = Ash.Actions.Helpers.add_process_context(query.api, query, opts)

    query =
      query
      |> Map.put(:params, Map.merge(query.params, Map.new(args)))
      |> set_context(Keyword.get(opts, :context, %{}))

    action = Ash.Resource.Info.action(query.resource, action_name, :read)

    if action do
      name = "query:" <> Ash.Resource.Info.trace_name(query.resource) <> ":#{action_name}"

      Ash.Tracer.span :query,
                      name,
                      opts[:tracer] do
        Ash.Tracer.telemetry_span [:ash, :query], %{
          resource_short_name: Ash.Resource.Info.short_name(query.resource)
        } do
          metadata = %{
            resource_short_name: Ash.Resource.Info.short_name(query.resource),
            resource: query.resource,
            actor: opts[:actor],
            tenant: opts[:tenant],
            action: action.name,
            authorize?: opts[:authorize?]
          }

          Ash.Tracer.set_metadata(opts[:tracer], :query, metadata)

          query
          |> Map.put(:action, action)
          |> reset_arguments()
          |> timeout(query.timeout || opts[:timeout])
          |> set_actor(opts)
          |> set_authorize?(opts)
          |> set_tracer(opts)
          |> Ash.Query.set_tenant(opts[:tenant] || query.tenant)
          |> cast_params(action, args)
          |> set_argument_defaults(action)
          |> require_arguments(action)
          |> run_preparations(action, opts[:actor], opts[:authorize?], opts[:tracer], metadata)
          |> add_action_filters(action, opts[:actor])
          |> Map.put(:__validated_for_action__, action_name)
        end
      end
    else
      add_error(query, :action, "No such action #{action_name}")
    end
  end

  def timeout(query, timeout) do
    query = to_query(query)

    if Ash.DataLayer.data_layer_can?(query.resource, :timeout) || is_nil(timeout) do
      %{query | timeout: timeout}
    else
      add_error(query, TimeoutNotSupported.exception(resource: query.resource))
    end
  end

  @doc false
  def set_actor(query, opts) do
    if Keyword.has_key?(opts, :actor) do
      put_context(query, :private, %{actor: opts[:actor]})
    else
      query
    end
  end

  @doc false
  def set_authorize?(query, opts) do
    if Keyword.has_key?(opts, :authorize?) do
      put_context(query, :private, %{authorize?: opts[:authorize?]})
    else
      query
    end
  end

  @doc false
  def set_tracer(query, opts) do
    if Keyword.has_key?(opts, :tracer) do
      put_context(query, :private, %{tracer: opts[:tracer]})
    else
      query
    end
  end

  defp require_arguments(query, action) do
    action.arguments
    |> Enum.filter(&(&1.allow_nil? == false))
    |> Enum.reduce(query, fn argument, query ->
      case fetch_argument(query, argument.name) do
        {:ok, value} when not is_nil(value) ->
          query

        _ ->
          add_error(
            query,
            Required.exception(
              resource: query.resource,
              field: argument.name,
              type: :argument
            )
          )
      end
    end)
  end

  defp set_argument_defaults(query, action) do
    Enum.reduce(action.arguments, query, fn argument, query ->
      case fetch_argument(query, argument.name) do
        :error ->
          if is_nil(argument.default) do
            query
          else
            %{
              query
              | arguments:
                  Map.put(query.arguments, argument.name, argument_default(argument.default))
            }
          end

        _ ->
          query
      end
    end)
  end

  defp cast_params(query, action, args) do
    Enum.reduce(args, query, fn {name, value}, query ->
      if has_argument?(action, name) do
        set_argument(query, name, value)
      else
        query
      end
    end)
  end

  defp has_argument?(action, name) when is_atom(name) do
    Enum.any?(action.arguments, &(&1.private? == false && &1.name == name))
  end

  defp has_argument?(action, name) when is_binary(name) do
    Enum.any?(action.arguments, &(&1.private? == false && to_string(&1.name) == name))
  end

  defp run_preparations(query, action, actor, authorize?, tracer, metadata) do
    query.resource
    |> Ash.Resource.Info.preparations()
    |> Enum.concat(action.preparations || [])
    |> Enum.reduce_while(query, fn %{preparation: {module, opts}}, query ->
      Ash.Tracer.span :preparation, "prepare: #{inspect(module)}", tracer do
        Ash.Tracer.telemetry_span [:ash, :preparation], %{
          resource_short_name: Ash.Resource.Info.short_name(query.resource),
          preparation: inspect(module)
        } do
          Ash.Tracer.set_metadata(opts[:tracer], :preparation, metadata)

          case module.init(opts) do
            {:ok, opts} ->
              opts =
                Ash.Filter.build_filter_from_template(
                  opts,
                  actor,
                  query.arguments,
                  query.context
                )

              case module.prepare(query, opts, %{
                     actor: actor,
                     authorize?: authorize?,
                     tracer: tracer
                   }) do
                %__MODULE__{} = prepared ->
                  {:cont, prepared}

                other ->
                  raise """
                  Invalid value returned from #{inspect(module)}.prepare/3

                  A query must be returned, but the following was received instead:

                  #{inspect(other)}
                  """
              end

            {:error, error} ->
              {:halt, Ash.Query.add_error(query, error)}
          end
        end
      end
    end)
  end

  @spec before_action(
          t(),
          (t() -> t() | {t(), list(Ash.Notifier.Notification.t())})
        ) ::
          t()
  def before_action(query, func) do
    query = to_query(query)
    %{query | before_action: [func | query.before_action]}
  end

  @spec after_action(
          t(),
          (t(), [Ash.Resource.record()] ->
             {:ok, [Ash.Resource.record()]}
             | {:ok, [Ash.Resource.record()], list(Ash.Notifier.Notification.t())}
             | {:error, term})
        ) :: t()
  def after_action(query, func) do
    query = to_query(query)
    %{query | after_action: [func | query.after_action]}
  end

  defp add_action_filters(query, %{filter: nil}, _actor), do: query

  defp add_action_filters(query, action, actor) do
    if Ash.Filter.template_references_actor?(action.filter) and is_nil(actor) do
      Ash.Query.add_error(query, ReadActionRequiresActor.exception([]))
    else
      built_filter =
        Ash.Filter.build_filter_from_template(
          action.filter,
          actor,
          query.arguments,
          query.context
        )

      do_filter(query, built_filter)
    end
  end

  @doc "Returns true if the value is one of the expression structs."
  @deprecated "use Ash.Filter.TemplateHelpers.expr?/1"
  def is_expr?(value), do: expr?(value)

  @doc """
  Creates an Ash expression for evaluation later.
  """
  defmacro expr(body) do
    quote location: :keep do
      require Ash.Expr
      Ash.Expr.expr(unquote(body))
    end
  end

  @doc """
  Ensure that only the specified *attributes* are present in the results.

  The first call to `select/2` will replace the default behavior of selecting
  all attributes. Subsequent calls to `select/2` will combine the provided
  fields unless the `replace?` option is provided with a value of `true`.

  If a field has been deselected, selecting it again will override that (because a single list of fields is tracked for selection)

  Primary key attributes are always selected and cannot be deselected.

  When attempting to load a relationship (or manage it with `Ash.Changeset.manage_relationship/3`),
  if the source field is not selected on the query/provided data an error will be produced. If loading
  a relationship with a query, an error is produced if the query does not select the destination field
  of the relationship.

  Use `ensure_selected/2` if you wish to make sure a field has been selected, without deselecting any other fields.
  """
  def select(query, fields, opts \\ []) do
    query = to_query(query)
    fields = List.wrap(fields)

    {fields, non_existent} =
      Enum.split_with(fields, &Ash.Resource.Info.attribute(query.resource, &1))

    query =
      Enum.reduce(non_existent, query, fn field, query ->
        Ash.Query.add_error(
          query,
          Ash.Error.Query.NoSuchAttribute.exception(resource: query.resource, name: field)
        )
      end)

    always_select =
      query.resource
      |> Ash.Resource.Info.attributes()
      |> Enum.filter(& &1.always_select?)
      |> Enum.map(& &1.name)

    if opts[:replace?] do
      %{
        query
        | select:
            Enum.uniq(fields ++ Ash.Resource.Info.primary_key(query.resource) ++ always_select)
      }
    else
      %{
        query
        | select:
            Enum.uniq(
              fields ++
                (query.select || []) ++
                Ash.Resource.Info.primary_key(query.resource) ++ always_select
            )
      }
    end
  end

  @doc """
  Determines if the filter statement of a query is equivalent to the provided expression.

  This uses the satisfiability solver that is used when solving for policy authorizations. In complex scenarios, or when using
  custom database expressions, (like fragments in ash_postgres), this function may return `:maybe`. Use `supserset_of?` to always return
  a boolean.
  """
  defmacro equivalent_to(query, expr) do
    quote do
      require Ash.Expr
      query = unquote(query)
      expr = unquote(Ash.Expr.do_expr(expr))
      require Ash.Query

      case Ash.Query.superset_of(query, expr) do
        :maybe ->
          :maybe

        true ->
          Ash.Query.subset_of(query, expr)

        false ->
          false
      end
    end
  end

  @doc """
  Same as `equivalent_to/2` but always returns a boolean. `:maybe` returns `false`.
  """
  defmacro equivalent_to?(query, expr) do
    quote do
      Ash.Query.equivalent_to(unquote(query), unquote(expr)) == true
    end
  end

  @doc """
  Determines if the provided expression would return data that is a subset of the data returned by the filter on the query.

  This uses the satisfiability solver that is used when solving for policy authorizations. In complex scenarios, or when using
  custom database expressions, (like fragments in ash_postgres), this function may return `:maybe`. Use `supserset_of?` to always return
  a boolean.
  """
  defmacro superset_of(query, expr) do
    quote do
      query = unquote(query)
      require Ash.Expr
      expr = unquote(Ash.Expr.do_expr(expr))
      left_filter = query.filter

      {:ok, left_expression} =
        Ash.Filter.hydrate_refs(left_filter.expression, %{
          resource: query.resource,
          aggregates: query.aggregates,
          calculations: query.calculations,
          public?: false
        })

      left_filter = %{left_filter | expression: left_expression}

      {:ok, right_expression} =
        Ash.Filter.hydrate_refs(expr, %{
          resource: query.resource,
          aggregates: query.aggregates,
          calculations: query.calculations,
          public?: false
        })

      right_filter = %{left_filter | expression: right_expression}

      Ash.SatSolver.strict_filter_subset(left_filter, right_filter)
    end
  end

  @doc """
  Same as `superset_of/2` but always returns a boolean. `:maybe` returns `false`.
  """
  defmacro superset_of?(query, expr) do
    quote do
      Ash.Query.superset_of(unquote(query), unquote(expr)) == true
    end
  end

  @doc """
  Determines if the provided expression would return data that is a suprset of the data returned by the filter on the query.

  This uses the satisfiability solver that is used when solving for policy authorizations. In complex scenarios, or when using
  custom database expressions, (like fragments in ash_postgres), this function may return `:maybe`. Use `subset_of?` to always return
  a boolean.
  """
  defmacro subset_of(query, expr) do
    quote do
      query = unquote(query)
      require Ash.Expr
      expr = unquote(Ash.Expr.do_expr(expr))
      right_filter = query.filter

      {:ok, right_expression} =
        Ash.Filter.hydrate_refs(right_filter.expression, %{
          resource: query.resource,
          aggregates: query.aggregates,
          calculations: query.calculations,
          public?: false
        })

      right_filter = %{right_filter | expression: right_expression}

      {:ok, left_expression} =
        Ash.Filter.hydrate_refs(expr, %{
          resource: query.resource,
          aggregates: query.aggregates,
          calculations: query.calculations,
          public?: false
        })

      left_filter = %{right_filter | expression: left_expression}
      Ash.SatSolver.strict_filter_subset(left_filter, right_filter)
    end
  end

  @doc """
  Same as `subset_of/2` but always returns a boolean. `:maybe` returns `false`.
  """
  defmacro subset_of?(query, expr) do
    quote do
      Ash.Query.subset_of(unquote(query), unquote(expr)) == true
    end
  end

  @doc """
  Ensures that the given attributes are selected.

  The first call to `select/2` will *limit* the fields to only the provided fields.
  Use `ensure_selected/2` to say "select this field (or these fields) without deselecting anything else".

  See `select/2` for more.
  """
  def ensure_selected(query, fields) do
    query = to_query(query)

    if query.select do
      Ash.Query.select(query, List.wrap(fields))
    else
      to_select =
        query.resource
        |> Ash.Resource.Info.attributes()
        |> Enum.map(& &1.name)

      Ash.Query.select(query, to_select)
    end
  end

  @doc """
  Ensure the the specified attributes are `nil` in the query results.
  """
  def deselect(query, fields) do
    query = to_query(query)

    select =
      if query.select do
        query.select
      else
        query.resource
        |> Ash.Resource.Info.attributes()
        |> Enum.map(& &1.name)
      end

    select = select -- List.wrap(fields)

    select(query, select, replace?: true)
  end

  def selecting?(query, field) do
    case query.select do
      nil ->
        not is_nil(Ash.Resource.Info.attribute(query.resource, field))

      select ->
        if field in select do
          true
        else
          attribute = Ash.Resource.Info.attribute(query.resource, field)

          attribute && (attribute.primary_key? || attribute.private?)
        end
    end
  end

  @doc """
  Returns true if the field/relationship or path to field/relationship is being loaded.

  It accepts an atom or a list of atoms, which is treated for as a "path", i.e:

      Resource |> Ash.Query.load(friends: [enemies: [:score]]) |> Ash.Query.loaded?([:friends, :enemies, :score])
      iex> true

      Resource |> Ash.Query.load(friends: [enemies: [:score]]) |> Ash.Query.loaded?([:friends, :score])
      iex> false

      Resource |> Ash.Query.load(friends: [enemies: [:score]]) |> Ash.Query.loaded?(:friends)
      iex> true
  """
  def loading?(query, [last]) do
    loading?(query, last)
  end

  def loading?(query, [first | rest]) do
    case Keyword.get(query.load || [], first) do
      %Ash.Query{} = next -> loading?(next, rest)
      nil -> false
      other -> raise "Cannot check if loading path #{inspect(rest)} of #{inspect(other)}"
    end
  end

  def loading?(query, item) when is_atom(item) do
    Keyword.has_key?(query.load || [], item) || Map.has_key?(query.calculations, item) ||
      Map.has_key?(query.aggregates, item) ||
      Enum.any?(query.calculations, fn {_, %{calc_name: calc_name}} ->
        calc_name == item
      end) ||
      Enum.any?(query.aggregates, fn {_, %{agg_name: agg_name}} ->
        agg_name == item
      end)
  end

  @doc """
  Loads relationships, calculations, or aggregates on the resource.

  Currently, loading attributes has no effects, as all attributes are returned.
  Before long, we will have the default list to load as the attributes, but if you say
  `load(query, [:attribute1])`, that will be the only field filled in. This will let
  data layers make more intelligent "select" statements as well.


  ```elixir
  # Loading nested relationships
  Ash.Query.load(query, [comments: [:author, :ratings]])

  # Loading relationships with a query
  Ash.Query.load(query, [comments: [author: author_query]])
  ```
  """
  @spec load(
          t() | Ash.Resource.t(),
          atom
          | Ash.Query.Calculation.t()
          | list(atom | Ash.Query.Calculation.t())
          | list({atom | Ash.Query.Calculation.t(), term})
        ) ::
          t()
  def load(query, fields) when not is_list(fields) do
    load(query, List.wrap(fields))
  end

  def load(query, fields) do
    query = to_query(query)

    Enum.reduce(fields, query, fn
      {field, %__MODULE__{} = nested}, query ->
        load_relationship(query, [{field, nested}])

      {field, rest}, query ->
        cond do
          rel = Ash.Resource.Info.relationship(query.resource, field) ->
            nested_query = load(rel.destination, rest)

            load_relationship(query, [{field, nested_query}])

          resource_calculation = Ash.Resource.Info.calculation(query.resource, field) ->
            {name, load} =
              cond do
                Keyword.keyword?(rest) ->
                  case Keyword.fetch(rest, :as) do
                    {:ok, value} ->
                      {value, nil}

                    :error ->
                      {resource_calculation.name, resource_calculation.name}
                  end

                is_map(rest) ->
                  case Map.fetch(rest, :as) do
                    {:ok, value} ->
                      {value, nil}

                    :error ->
                      {resource_calculation.name, resource_calculation.name}
                  end

                true ->
                  {resource_calculation.name, resource_calculation.name}
              end

            {module, opts} = resource_calculation.calculation

            with {:ok, args} <- validate_calculation_arguments(resource_calculation, rest),
                 {:ok, calculation} <-
                   Calculation.new(
                     name,
                     module,
                     opts,
                     {resource_calculation.type, resource_calculation.constraints},
                     Map.put(args, :context, query.context),
                     resource_calculation.filterable?,
                     resource_calculation.load
                   ) do
              calculation =
                select_and_load_calc(resource_calculation, %{calculation | load: load}, query)

              Map.update!(query, :calculations, &Map.put(&1, name, calculation))
            else
              {:error, error} ->
                Ash.Query.add_error(query, :load, error)
            end

          true ->
            add_error(query, :load, Ash.Error.Query.InvalidLoad.exception(load: field))
        end

      field, query ->
        do_load(query, field)
    end)
  end

  @doc false
  def select_and_load_calc(resource_calculation, calculation, query) do
    module = calculation.module
    opts = calculation.opts

    resource_calculation_select =
      if resource_calculation do
        List.wrap(resource_calculation.select)
      else
        []
      end

    resource_calculation_load =
      if resource_calculation do
        List.wrap(resource_calculation.load)
      else
        []
      end

    fields_to_select =
      resource_calculation_select
      |> Enum.concat(module.select(query, opts, calculation.context) || [])
      |> Enum.uniq()
      |> Enum.filter(&Ash.Resource.Info.attribute(query.resource, &1))

    loads =
      module.load(
        query,
        opts,
        Map.put(calculation.context, :context, query.context)
      )
      |> Ash.Actions.Helpers.validate_calculation_load!(module)
      |> Enum.concat(resource_calculation_load)
      |> reify_calculations(query)

    %{calculation | select: fields_to_select, required_loads: loads}
  end

  @doc false
  def reify_calculations(loads, query) do
    loads
    |> List.wrap()
    |> Enum.map(fn
      {load, args} ->
        if resource_calculation = Ash.Resource.Info.calculation(query.resource, load) do
          case resource_calc_to_calc(query, load, resource_calculation, args) do
            {:error, _} ->
              {load, args}

            {:ok, calc} ->
              calc
          end
        else
          if relationship = Ash.Resource.Info.relationship(query.resource, load) do
            related_query = new(relationship.destination)
            {load, reify_calculations(args, related_query)}
          else
            {load, args}
          end
        end

      load ->
        if resource_calculation = Ash.Resource.Info.calculation(query.resource, load) do
          case resource_calc_to_calc(query, load, resource_calculation) do
            {:error, _} ->
              load

            {:ok, calc} ->
              calc
          end
        else
          if relationship = Ash.Resource.Info.relationship(query.resource, load) do
            {load, relationship.destination |> new() |> set_tenant(query.tenant)}
          else
            load
          end
        end
    end)
    |> case do
      [%Ash.Query{} = query] -> query
      other -> other
    end
  end

  @doc false
  def resource_calc_to_calc(query, name, resource_calculation, args \\ %{}) do
    with %{calculation: {module, opts}} <- resource_calculation,
         {:ok, args} <- validate_calculation_arguments(resource_calculation, args),
         {:ok, calculation} <-
           Calculation.new(
             resource_calculation.name,
             module,
             opts,
             {resource_calculation.type, resource_calculation.constraints},
             Map.put(args, :context, query.context),
             resource_calculation.filterable?,
             resource_calculation.load
           ) do
      {:ok, select_and_load_calc(resource_calculation, %{calculation | load: name}, query)}
    end
  end

  defp do_load(query, field) do
    cond do
      match?(%Ash.Query.Calculation{}, field) ->
        Map.update!(
          query,
          :calculations,
          &Map.put(
            &1,
            field.name,
            select_and_load_calc(nil, field, query)
          )
        )

      Ash.Resource.Info.attribute(query.resource, field) ->
        Ash.Query.ensure_selected(query, field)

      Ash.Resource.Info.relationship(query.resource, field) ->
        load_relationship(query, field)

      aggregate = Ash.Resource.Info.aggregate(query.resource, field) ->
        related = Ash.Resource.Info.related(query.resource, aggregate.relationship_path)

        read_action =
          aggregate.read_action || Ash.Resource.Info.primary_action!(related, :read).name

        with {:can?, true} <-
               {:can?,
                Ash.DataLayer.data_layer_can?(query.resource, {:aggregate, aggregate.kind})},
             %{valid?: true} = aggregate_query <-
               Ash.Query.for_read(related, read_action),
             %{valid?: true} = aggregate_query <-
               build(aggregate_query, filter: aggregate.filter, sort: aggregate.sort),
             {:ok, query_aggregate} <-
               Aggregate.new(
                 query.resource,
                 aggregate.name,
                 aggregate.kind,
                 path: aggregate.relationship_path,
                 query: aggregate_query,
                 field: aggregate.field,
                 default: aggregate.default,
                 filterable?: aggregate.filterable?,
                 type: aggregate.type,
                 constraints: aggregate.constraints,
                 implementation: aggregate.implementation,
                 uniq?: aggregate.uniq?,
                 read_action: aggregate.read_action,
                 authorize?: aggregate.authorize?
               ) do
          query_aggregate = %{query_aggregate | load: field}
          new_aggregates = Map.put(query.aggregates, aggregate.name, query_aggregate)

          %{query | aggregates: new_aggregates}
        else
          %{errors: errors} ->
            add_error(
              query,
              :aggregates,
              Ash.Error.to_ash_error(errors, nil,
                error_context: "Loading aggregate: #{inspect(field)} for query: #{inspect(query)}"
              )
            )

          {:error, error} ->
            add_error(
              query,
              :aggregates,
              Ash.Error.to_ash_error(error, nil,
                error_context: "Loading aggregate: #{inspect(field)} for query: #{inspect(query)}"
              )
            )

          {:can?, false} ->
            add_error(
              query,
              :aggregate,
              AggregatesNotSupported.exception(resource: query.resource, feature: "using")
            )
        end

      resource_calculation = Ash.Resource.Info.calculation(query.resource, field) ->
        case resource_calc_to_calc(query, resource_calculation.name, resource_calculation) do
          {:error, error} ->
            add_error(query, :load, error)

          {:ok, calc} ->
            Map.update!(query, :calculations, &Map.put(&1, field, calc))
        end

      true ->
        add_error(query, :load, Ash.Error.Query.InvalidLoad.exception(load: field))
    end
  end

  @doc false
  def validate_calculation_arguments(calculation, args) do
    args =
      if Keyword.keyword?(args) do
        Map.new(args)
      else
        args
      end

    has_one_expr? = Enum.any?(args, fn {_, value} -> expr?(value) end)

    Enum.reduce_while(calculation.arguments, {:ok, %{}}, fn argument, {:ok, arg_values} ->
      value =
        default(
          Map.get(args, argument.name, Map.get(args, to_string(argument.name))),
          argument.default
        )

      cond do
        expr?(value) && argument.allow_expr? ->
          {:cont,
           {:ok,
            Map.put(
              arg_values,
              argument.name,
              expr(type(^value, ^argument.type, ^argument.constraints))
            )}}

        expr?(value) ->
          {:halt,
           {:error,
            InvalidCalculationArgument.exception(
              field: argument.name,
              calculation: calculation.name,
              message: "does not support expressions",
              value: value
            )}}

        is_nil(value) && argument.allow_nil? ->
          {:cont, {:ok, Map.put(arg_values, argument.name, nil)}}

        is_nil(value) ->
          {:halt,
           {:error,
            InvalidCalculationArgument.exception(
              field: argument.name,
              calculation: calculation.name,
              message: "is required",
              value: value
            )}}

        is_nil(Map.get(args, argument.name, Map.get(args, to_string(argument.name)))) &&
            not is_nil(value) ->
          if has_one_expr? do
            {:cont,
             {:ok,
              Map.put(
                arg_values,
                argument.name,
                expr(type(^value, ^argument.type, ^argument.constraints))
              )}}
          else
            {:cont,
             {:ok,
              Map.put(
                arg_values,
                argument.name,
                value
              )}}
          end

        true ->
          with {:ok, casted} <- Ash.Type.cast_input(argument.type, value, argument.constraints),
               {:ok, casted} <-
                 Ash.Type.apply_constraints(argument.type, casted, argument.constraints) do
            if has_one_expr? do
              {:cont,
               {:ok,
                Map.put(
                  arg_values,
                  argument.name,
                  expr(type(^casted, ^argument.type, ^argument.constraints))
                )}}
            else
              {:cont, {:ok, Map.put(arg_values, argument.name, casted)}}
            end
          else
            {:error, error} when is_binary(error) ->
              {:halt,
               {:error,
                InvalidCalculationArgument.exception(
                  field: argument.name,
                  calculation: calculation.name,
                  message: error,
                  value: value
                )}}

            {:error, error} ->
              {:halt, {:error, Ash.Error.to_ash_error(error)}}
          end
      end
    end)
  end

  defp default(nil, {module, function, args}), do: apply(module, function, args)
  defp default(nil, value) when is_function(value, 0), do: value.()
  defp default(nil, value), do: value
  defp default(value, _), do: value

  @doc """
  Sets a specific context key to a specific value

  See `set_context/2` for more information.
  """
  @spec put_context(t() | Ash.Resource.t(), atom, term) :: t()
  def put_context(query, key, value) do
    query = to_query(query)
    set_context(query, %{key => value})
  end

  @doc """
  Set the result of the action. This will prevent running the underlying datalayer behavior
  """
  @spec set_result(t(), term) :: t()
  def set_result(changeset, result) do
    set_context(changeset, %{private: %{action_result: result}})
  end

  @doc """
  Removes a result set previously with `set_result/2`
  """
  @spec clear_result(t()) :: t()
  def clear_result(changeset) do
    %{
      changeset
      | context: Map.update(changeset.context, :private, %{}, &Map.delete(&1, :action_result))
    }
  end

  @doc """
  Merge a map of values into the query context
  """
  @spec set_context(t() | Ash.Resource.t(), map | nil) :: t()
  def set_context(query, nil), do: to_query(query)

  def set_context(query, map) do
    query = to_query(query)

    %{query | context: Ash.Helpers.deep_merge_maps(query.context, map)}
  end

  @doc "Gets the value of an argument provided to the query"
  @spec get_argument(t, atom) :: term
  def get_argument(query, argument) when is_atom(argument) do
    Map.get(query.arguments, argument) || Map.get(query.arguments, to_string(argument))
  end

  @doc "fetches the value of an argument provided to the query or `:error`"
  @spec fetch_argument(t, atom) :: {:ok, term} | :error
  def fetch_argument(query, argument) when is_atom(argument) do
    case Map.fetch(query.arguments, argument) do
      {:ok, value} ->
        {:ok, value}

      :error ->
        case Map.fetch(query.arguments, to_string(argument)) do
          {:ok, value} -> {:ok, value}
          :error -> :error
        end
    end
  end

  @doc """
  Add an argument to the query, which can be used in filter templates on actions
  """
  def set_argument(query, argument, value) do
    maybe_already_validated_error!(query)
    query = to_query(query)

    if query.action do
      argument =
        Enum.find(
          query.action.arguments,
          &(&1.name == argument || to_string(&1.name) == argument)
        )

      with {:arg, argument} when not is_nil(argument) <- {:arg, argument},
           {:ok, casted} <-
             Ash.Type.Helpers.cast_input(argument.type, value, argument.constraints, query),
           {:constrained, {:ok, casted}, argument} when not is_nil(casted) <-
             {:constrained,
              Ash.Type.apply_constraints(argument.type, casted, argument.constraints),
              argument} do
        %{query | arguments: Map.put(query.arguments, argument.name, casted)}
      else
        {:arg, nil} ->
          query

        {:constrained, {:ok, nil}, argument} ->
          %{query | arguments: Map.put(query.arguments, argument.name, nil)}

        {:constrained, {:error, error}, argument} ->
          query = %{query | arguments: Map.put(query.arguments, argument.name, value)}
          add_invalid_errors(value, query, argument, error)

        {:error, error} ->
          query = %{query | arguments: Map.put(query.arguments, argument.name, value)}
          add_invalid_errors(value, query, argument, error)

        :error ->
          query = %{query | arguments: Map.put(query.arguments, argument.name, value)}
          add_invalid_errors(value, query, argument, "is invalid")
      end
    else
      %{query | arguments: Map.put(query.arguments, argument, value)}
    end
  end

  defp reset_arguments(%{arguments: arguments} = query) do
    Enum.reduce(arguments, query, fn {key, value}, query ->
      set_argument(query, key, value)
    end)
  end

  defp add_invalid_errors(value, query, argument, error) do
    messages =
      if Keyword.keyword?(error) do
        [error]
      else
        List.wrap(error)
      end

    messages
    |> Enum.reduce(query, fn message, query ->
      message
      |> Ash.Type.Helpers.error_to_exception_opts(argument)
      |> Enum.reduce(query, fn opts, query ->
        add_error(query, InvalidArgument.exception(Keyword.put(opts, :value, value)))
      end)
    end)
  end

  @doc """
  Remove an argument from the query
  """
  def delete_argument(query, argument_or_arguments) do
    query = to_query(query)

    argument_or_arguments
    |> List.wrap()
    |> Enum.reduce(query, fn argument, query ->
      %{query | arguments: Map.delete(query.arguments, argument)}
    end)
  end

  @doc """
  Merge a map of arguments to the arguments list
  """
  def set_arguments(query, map) do
    query = to_query(query)
    %{query | arguments: Map.merge(query.arguments, map)}
  end

  defp argument_default(value) when is_function(value, 0), do: value.()
  defp argument_default(value), do: value

  def struct?(%_{}), do: true
  def struct?(_), do: false

  @spec set_tenant(t() | Ash.Resource.t(), String.t()) :: t()
  def set_tenant(query, tenant) do
    query = to_query(query)
    %{query | tenant: tenant}
  end

  @doc "Removes a field from the list of fields to load"
  @spec unload(t(), list(atom)) :: t()
  def unload(query, fields) do
    query = to_query(query)

    Enum.reduce(fields, query, fn field, query ->
      case field do
        {field, rest} ->
          new_loads = do_unload_load(query.load, {field, rest})
          %{query | load: new_loads}

        field ->
          do_unload(query, field)
      end
    end)
  end

  defp do_unload(query, field) do
    cond do
      Ash.Resource.Info.attribute(query.resource, field) ->
        query

      Ash.Resource.Info.relationship(query.resource, field) ->
        %{query | load: Keyword.delete(query.load, field)}

      Ash.Resource.Info.aggregate(query.resource, field) ->
        new_aggregates =
          Enum.reduce(query.aggregates, %{}, fn
            {_field, %{load: ^field}}, acc ->
              acc

            {field, aggregate}, acc ->
              Map.put(acc, field, aggregate)
          end)

        %{query | aggregates: new_aggregates}
    end
  end

  defp do_unload_load(%__MODULE__{} = query, unload) do
    %{query | load: do_unload_load(query.load, unload)}
  end

  defp do_unload_load(loads, {field, rest}) do
    Enum.reduce(loads, [], fn
      ^field, acc ->
        acc

      {^field, value}, acc ->
        new_value =
          rest
          |> List.wrap()
          |> Enum.reduce(value, &do_unload_load(&2, &1))

        [{field, new_value} | acc]

      value, acc ->
        [value | acc]
    end)
    |> Enum.reverse()
  end

  defp do_unload_load(loads, field) do
    do_unload_load(loads, {field, []})
  end

  @build_opts [
    filter: [
      type: :any,
      doc: "A filter keyword, expression or %Ash.Filter{}"
    ],
    sort: [
      type: :any,
      doc: "A sort list or keyword"
    ],
    limit: [
      type: :integer,
      doc: "A limit to apply"
    ],
    offset: [
      type: :integer,
      doc: "An offset to apply"
    ],
    load: [
      type: :any,
      doc: "A load statement to add to the query"
    ],
    aggregate: [
      type: :any,
      doc:
        "A custom aggregate to add to the query. Can be `{name, type, relationship}` or `{name, type, relationship, build_opts}`"
    ],
    calculate: [
      type: :any,
      doc:
        "A custom calculation to add to the query. Can be `{name, module_and_opts}` or `{name, module_and_opts, context}`"
    ],
    distinct: [
      type: {:list, :atom},
      doc: "A distinct clause to add to the query"
    ],
    context: [
      type: :map,
      doc: "A map to merge into the query context"
    ]
  ]

  @doc false
  def build_opts, do: @build_opts

  @doc """
  Builds a query from a keyword list.

  This is used by certain query constructs like aggregates. It can also be used to manipulate a data structure
  before passing it to an ash query. It allows for building an entire query struct using only a keyword list.

  For example:

  ```elixir
  Ash.Query.build(MyResource, filter: [name: "fred"], sort: [name: :asc], load: [:foo, :bar], offset: 10)
  ```

  If you want to use the expression style filters, you can use `expr/1`.

  For example:

  ```elixir
  import Ash.Expr, only: [expr: 1]

  Ash.Query.build(Myresource, filter: expr(name == "marge"))
  ```

  ## Options

  #{Spark.OptionsHelpers.docs(@build_opts)}
  """
  @spec build(Ash.Resource.t(), Ash.Api.t() | nil, Keyword.t()) :: t()
  def build(resource, api \\ nil, keyword) do
    Enum.reduce(keyword, new(resource, api), fn
      {:filter, value}, query ->
        do_filter(query, value)

      {:sort, value}, query ->
        sort(query, value)

      {:limit, value}, query ->
        limit(query, value)

      {:offset, value}, query ->
        offset(query, value)

      {:load, value}, query ->
        load(query, value)

      {:distinct, value}, query ->
        distinct(query, value)

      {:lock, lock_type}, query ->
        lock(query, lock_type)

      {:aggregate, {name, type, relationship}}, query ->
        aggregate(query, name, type, relationship)

      {:aggregate, {name, type, relationship, agg_query}}, query ->
        aggregate(query, name, type, relationship, agg_query)

      {:calculate, {name, module_and_opts, type}}, query ->
        calculate(query, name, module_and_opts, type)

      {:calculate, {name, module_and_opts, type, context}}, query ->
        calculate(query, name, module_and_opts, type, context)

      {:select, fields}, query ->
        select(query, fields)

      {:deselect, fields}, query ->
        deselect(query, fields)

      {:ensure_selected, fields}, query ->
        ensure_selected(query, fields)

      {:context, context}, query ->
        set_context(query, context)
    end)
  end

  @doc "Set the query's api, and any loaded query's api"
  def set_api(query, api) do
    query = to_query(query)
    %{query | api: api, load: set_load_api(query.load, api)}
  end

  @doc """
  Adds an aggregation to the query.

  Aggregations are made available on the `aggregates` field of the records returned

  The filter option accepts either a filter or a keyword list of options to supply to build a limiting query for that aggregate.
  See the DSL docs for each aggregate type in `Ash.Resource.Dsl` for more information.

  Options:

    * query: The query over the destination resource to use as a base for aggregation
    * default: The default value to use if the aggregate returns nil
    * filterable?: Wether or not this aggregate may be referenced in filters
    * type: The type of the aggregate
    * constraints: Type constraints for the aggregate's type
    * implementation: An implementation used when the aggregate kind is custom
    * read_action: The read action to use on the destination resource
    * authorize?: Wether or not to authorize access to this aggregate
  """
  def aggregate(query, name, kind, relationship) do
    aggregate(query, name, kind, relationship, [])
  end

  def aggregate(query, name, kind, relationship, opts) when is_list(opts) do
    aggregate(
      query,
      name,
      kind,
      relationship,
      opts[:query],
      opts[:default],
      Keyword.get(opts, :filterable?, true),
      opts[:type],
      Keyword.get(opts, :constraints, []),
      opts[:implementation],
      opts[:uniq?],
      opts[:read_action],
      Keyword.get(opts, :authorize?, true)
    )
  end

  @doc """
  Adds an aggregation to the query.

  Aggregations are made available on the `aggregates` field of the records returned

  The filter option accepts either a filter or a keyword list of options to supply to build a limiting query for that aggregate.
  See the DSL docs for each aggregate type in `Ash.Resource.Dsl` for more information.
  """
  @spec aggregate(
          t() | Ash.Resource.t(),
          atom(),
          Ash.Query.Aggregate.kind(),
          atom | list(atom),
          Ash.Query.t() | Keyword.t() | nil
        ) :: t()
  def aggregate(
        query,
        name,
        kind,
        relationship,
        agg_query,
        default \\ nil,
        filterable? \\ true,
        type \\ nil,
        constraints \\ [],
        implementation \\ nil,
        uniq? \\ false,
        read_action \\ nil,
        authorize? \\ true
      ) do
    {field, agg_query} =
      case agg_query do
        %Ash.Query{} = query ->
          {nil, query}

        agg_query ->
          Keyword.pop(agg_query || [], :field)
      end

    query = to_query(query)
    relationship = List.wrap(relationship)

    if Ash.DataLayer.data_layer_can?(query.resource, {:aggregate, kind}) do
      related = Ash.Resource.Info.related(query.resource, relationship)

      agg_query =
        case agg_query do
          [] ->
            read_action = Ash.Resource.Info.primary_action!(related, :read).name

            related
            |> Ash.Query.for_read(read_action)

          options when is_list(options) ->
            read_action = Ash.Resource.Info.primary_action!(related, :read).name

            related
            |> Ash.Query.for_read(read_action)
            |> build(options)

          %Ash.Query{} = query ->
            query
        end

      case Aggregate.new(
             query.resource,
             name,
             kind,
             path: relationship,
             query: agg_query,
             field: field,
             default: default,
             filterable?: filterable?,
             type: type,
             constraints: constraints,
             implementation: implementation,
             uniq?: uniq?,
             read_action: read_action,
             authorize?: authorize?
           ) do
        {:ok, aggregate} ->
          new_aggregates = Map.put(query.aggregates, aggregate.name, aggregate)

          %{query | aggregates: new_aggregates}

        {:error, error} ->
          add_error(query, :aggregate, error)
      end
    else
      add_error(
        query,
        :aggregate,
        AggregatesNotSupported.exception(resource: query.resource, feature: "using #{kind}")
      )
    end
  end

  @doc """
  Adds a calculation to the query.

  Calculations are made available on the `calculations` field of the records returned

  The `module_and_opts` argument accepts either a `module` or a `{module, opts}`. For more information
  on what that module should look like, see `Ash.Calculation`.
  """
  def calculate(query, name, module_and_opts, type, context \\ %{}, constraints \\ []) do
    query = to_query(query)

    {module, opts} =
      case module_and_opts do
        {module, opts} -> {module, opts}
        module -> {module, []}
      end

    case Calculation.new(
           name,
           module,
           opts,
           {type, constraints},
           Map.put(context, :context, query.context)
         ) do
      {:ok, calculation} ->
        fields_to_select =
          query
          |> module.select(opts, calculation.context)
          |> List.wrap()
          |> Enum.concat(calculation.select)
          |> Enum.filter(&Ash.Resource.Info.attribute(query.resource, &1))

        loads =
          module.load(
            query,
            opts,
            Map.put(calculation.context, :context, query.context)
          )
          |> Ash.Actions.Helpers.validate_calculation_load!(module)
          |> Enum.concat(List.wrap(calculation.required_loads))
          |> reify_calculations(query)

        calculation = %{calculation | select: fields_to_select, required_loads: loads}
        %{query | calculations: Map.put(query.calculations, name, calculation)}

      {:error, error} ->
        add_error(query, :calculations, error)
    end
  end

  @doc """
  Adds a resource calculation to the query as a custom calculation with the provided name.
  """
  def load_calculation_as(query, calc_name, as_name, context \\ %{}) do
    query = to_query(query)

    if resource_calculation = Ash.Resource.Info.calculation(query.resource, calc_name) do
      {module, opts} = resource_calculation.calculation

      with {:ok, args} <- validate_calculation_arguments(resource_calculation, context),
           {:ok, calculation} <-
             Calculation.new(
               resource_calculation.name,
               module,
               opts,
               {resource_calculation.type, resource_calculation.constraints},
               Map.put(args, :context, query.context),
               resource_calculation.filterable?,
               resource_calculation.load
             ) do
        calculation =
          select_and_load_calc(
            resource_calculation,
            %{calculation | load: nil, name: as_name},
            query
          )

        Map.update!(query, :calculations, &Map.put(&1, as_name, calculation))
      else
        {:error, error} ->
          Ash.Query.add_error(query, :load, error)
      end
    else
      Ash.Query.add_error(query, "No such calculation: #{inspect(calc_name)}")
    end
  end

  @doc "Limit the results returned from the query"
  @spec limit(t() | Ash.Resource.t(), nil | integer()) :: t()
  def limit(query, nil), do: to_query(query)

  def limit(query, limit) when is_integer(limit) do
    query = to_query(query)

    if Ash.DataLayer.data_layer_can?(query.resource, :limit) do
      query
      |> Map.put(:limit, max(0, limit))
    else
      add_error(query, :limit, "Data layer does not support limits")
    end
  end

  def limit(query, limit) do
    add_error(query, :offset, InvalidLimit.exception(limit: limit))
  end

  @doc "Skip the first n records"
  @spec offset(t() | Ash.Resource.t(), nil | integer()) :: t()
  def offset(query, nil), do: to_query(query)

  def offset(query, offset) when is_integer(offset) do
    query = to_query(query)

    if Ash.DataLayer.data_layer_can?(query.resource, :offset) do
      query
      |> Map.put(:offset, max(0, offset))
    else
      add_error(query, :offset, "Data layer does not support offset")
    end
  end

  def offset(query, offset) do
    query
    |> to_query()
    |> add_error(:offset, InvalidOffset.exception(offset: offset))
  end

  defp load_relationship(query, statement) do
    query = to_query(query)

    with sanitized_statement <- List.wrap(sanitize_loads(statement)),
         :ok <-
           validate_load(query, sanitized_statement),
         new_loads <- merge_load(query.load, sanitized_statement) do
      %{query | load: new_loads}
    else
      {:error, errors} ->
        Enum.reduce(errors, query, &add_error(&2, &1))
    end
  end

  @doc false
  def validate_load(query, loads, path \\ []) do
    case do_validate_load(query, loads, path) do
      [] -> :ok
      errors -> {:error, errors}
    end
  end

  defp do_validate_load(_query, %Ash.Query{} = load_query, path) do
    case load_query.errors do
      [] ->
        []

      errors ->
        Enum.map(errors, &Ash.Error.set_path(&1, path))
    end
  end

  defp do_validate_load(query, {atom, _} = tuple, path) when is_atom(atom) do
    do_validate_load(query, [tuple], path)
  end

  defp do_validate_load(query, loads, path) when is_list(loads) do
    query = to_query(query)

    loads
    |> List.wrap()
    |> Enum.flat_map(fn
      {key, value} ->
        case Ash.Resource.Info.relationship(query.resource, key) do
          nil ->
            [
              NoSuchRelationship.exception(
                resource: query.resource,
                relationship: key,
                load_path: Enum.reverse(path)
              )
            ]

          relationship ->
            cond do
              !Ash.Resource.Info.primary_action(relationship.destination, :read) ->
                [
                  NoReadAction.exception(
                    resource: relationship.destination,
                    when: "loading relationship #{relationship.name}"
                  )
                ]

              relationship.type == :many_to_many &&
                  !Ash.Resource.Info.primary_action(relationship.through, :read) ->
                [
                  NoReadAction.exception(
                    resource: relationship.destination,
                    when: "loading relationship #{relationship.name}"
                  )
                ]

              true ->
                validate_matching_query_and_continue(
                  value,
                  query.resource,
                  key,
                  path,
                  relationship
                )
            end
        end
    end)
  end

  @doc false
  def do_filter(query, %Ash.Filter{} = filter) do
    query = to_query(query)

    if Ash.DataLayer.data_layer_can?(query.resource, :filter) do
      new_filter =
        case query.filter do
          nil ->
            Ash.Filter.parse(query.resource, filter, query.aggregates, query.calculations)

          existing_filter ->
            Ash.Filter.add_to_filter(
              existing_filter,
              filter,
              :and,
              query.aggregates,
              query.calculations
            )
        end

      case new_filter do
        {:ok, filter} ->
          %{query | filter: filter}

        {:error, error} ->
          add_error(query, :filter, error)
      end
    else
      add_error(query, :filter, "Data layer does not support filtering")
    end
  end

  def do_filter(query, nil), do: to_query(query)
  def do_filter(query, []), do: to_query(query)

  def do_filter(query, statement) do
    query = to_query(query)

    if Ash.DataLayer.data_layer_can?(query.resource, :filter) do
      filter =
        if query.filter do
          Ash.Filter.add_to_filter(
            query.filter,
            statement,
            :and,
            query.aggregates,
            query.calculations
          )
        else
          Ash.Filter.parse(
            query.resource,
            statement,
            query.aggregates,
            query.calculations
          )
        end

      case filter do
        {:ok, filter} ->
          Map.put(query, :filter, filter)

        {:error, error} ->
          add_error(query, :filter, error)
      end
    else
      add_error(query, :filter, "Data layer does not support filtering")
    end
  end

  @doc """
  Lock the query results.

  This must be run while in a transaction, and is not supported by all data layers.
  """
  @spec lock(t() | Ash.Resource.t(), Ash.DataLayer.lock_type()) :: t()
  def lock(query, nil), do: %{query | lock: nil}

  def lock(query, lock_type) do
    query = to_query(query)

    if Ash.DataLayer.data_layer_can?(query.resource, {:lock, lock_type}) do
      %{query | lock: lock_type}
    else
      add_error(
        query,
        Ash.Error.Query.LockNotSupported.exception(resource: query.resource, lock_type: lock_type)
      )
    end
  end

  @doc """
  Sort the results based on attributes, aggregates or calculations.

  Calculations are supported if they are defined with expressions, which can be done one of two ways.

  1. with the shorthand `calculate :calc, :type, expr(a + b)`
  2. By defining `expression/2` in a custom calculation module

  See the guide on calculations for more.

  Takes a list of fields to sort on, or a keyword list/mixed keyword list of fields and sort directions.
  The default sort direction is `:asc`.

  Examples:

  ```
  Ash.Query.sort(query, [:foo, :bar])

  Ash.Query.sort(query, [:foo, bar: :desc])

  Ash.Query.sort(query, [foo: :desc, bar: :asc])
  ```

  ## Options

  - `prepend?` - set to `true` to put your sort at the front of the list of a sort is already specified
  """
  @spec sort(t() | Ash.Resource.t(), Ash.Sort.t(), opts :: Keyword.t()) :: t()
  def sort(query, sorts, opts \\ []) do
    query = to_query(query)

    if sorts == [] || sorts == nil do
      query
    else
      if Ash.DataLayer.data_layer_can?(query.resource, :sort) do
        if opts[:prepend?] && query.sort != [] do
          query_sort = query.sort

          query
          |> Map.put(:sort, [])
          |> sort(sorts)
          |> sort(query_sort)
        else
          sorts
          |> List.wrap()
          |> Enum.reduce(query, fn
            {sort, direction}, query ->
              %{query | sort: query.sort ++ [{sort, direction}]}

            sort, query ->
              %{query | sort: query.sort ++ [{sort, :asc}]}
          end)
          |> validate_sort()
        end
      else
        add_error(query, :sort, "Data layer does not support sorting")
      end
    end
  end

  @doc """
  Get results distinct on the provided fields.

  Takes a list of fields to distinct on. Each call is additive, so to remove the `distinct` use
  `unset/2`.

  Examples:

  ```
  Ash.Query.distinct(query, [:first_name, :last_name])

  Ash.Query.distinct(query, :email)
  ```
  """
  @spec distinct(t() | Ash.Resource.t(), Ash.Sort.t()) :: t()
  def distinct(query, distincts) do
    query = to_query(query)

    if Ash.DataLayer.data_layer_can?(query.resource, :distinct) do
      %{query | distinct: List.wrap(query.distinct) ++ List.wrap(distincts)}
    else
      add_error(query, :distinct, "Data layer does not support distincting")
    end
  end

  @spec unset(Ash.Resource.t() | t(), atom | [atom]) :: t()
  def unset(query, keys) when is_list(keys) do
    query = to_query(query)

    new = new(query.resource)

    keys
    |> Enum.reduce(query, fn key, query ->
      do_unset(query, key, new)
    end)
  end

  def unset(query, key) do
    new = new(query.resource)
    do_unset(query, key, new)

    query
    |> to_query()
    |> struct([{key, Map.get(new, key)}])
  end

  defp do_unset(query, key, _new) when key in [:api, :resource] do
    query
  end

  defp do_unset(query, :load, new) do
    query = unset(query, [:calculations, :aggregates])

    struct(query, [{:load, Map.get(new, :load)}])
  end

  defp do_unset(query, key, new) do
    struct(query, [{key, Map.get(new, key)}])
  end

  @doc "Return the underlying data layer query for an ash query"
  def data_layer_query(ash_query, opts \\ [])

  def data_layer_query(%{errors: errors}, _opts) when errors not in [[], nil] do
    {:error, Ash.Error.to_error_class(errors)}
  end

  def data_layer_query(%{resource: resource, api: api} = ash_query, opts) do
    query = opts[:initial_query] || Ash.DataLayer.resource_to_query(resource, api)

    filter_aggregates =
      if ash_query.filter do
        Ash.Filter.used_aggregates(ash_query.filter)
      else
        []
      end

    sort_aggregates =
      Enum.flat_map(ash_query.sort, fn {field, _} ->
        case Map.fetch(ash_query.aggregates, field) do
          :error ->
            []

          {:ok, agg} ->
            [agg]
        end
      end)

    aggregates = Enum.uniq_by(filter_aggregates ++ sort_aggregates, & &1.name)

    with {:ok, query} <-
           Ash.DataLayer.set_context(
             resource,
             query,
             Map.put(ash_query.context, :action, ash_query.action)
           ),
         {:ok, query} <-
           add_tenant(query, ash_query),
         {:ok, query} <- Ash.DataLayer.select(query, ash_query.select, ash_query.resource),
         {:ok, query} <-
           add_aggregates(query, ash_query, aggregates),
         {:ok, query} <-
           Ash.DataLayer.sort(query, ash_query.sort, resource),
         {:ok, query} <- maybe_filter(query, ash_query, opts),
         {:ok, query} <- Ash.DataLayer.distinct(query, ash_query.distinct, resource),
         {:ok, query} <-
           Ash.DataLayer.limit(query, ash_query.limit, resource),
         {:ok, query} <-
           Ash.DataLayer.offset(query, ash_query.offset, resource),
         {:ok, query} <- Ash.DataLayer.lock(query, ash_query.lock, resource) do
      if opts[:no_modify?] || !ash_query.action || !ash_query.action.modify_query do
        {:ok, query}
      else
        case ash_query.action.modify_query do
          {m, f, a} ->
            apply(m, f, a ++ [ash_query, query])

          modify_query when is_function(modify_query, 2) ->
            modify_query.(ash_query, query)
        end
      end
    else
      {:error, error} -> {:error, error}
    end
  end

  defp add_tenant(query, ash_query) do
    with :context <- Ash.Resource.Info.multitenancy_strategy(ash_query.resource),
         tenant when not is_nil(tenant) <- ash_query.tenant,
         {:ok, query} <- Ash.DataLayer.set_tenant(ash_query.resource, query, tenant) do
      {:ok, query}
    else
      {:error, error} -> {:error, error}
      _ -> {:ok, query}
    end
  end

  defp add_aggregates(query, ash_query, aggregates) do
    resource = ash_query.resource

    used =
      ash_query
      |> Ash.Query.Aggregate.aggregates_from_filter()
      |> Enum.map(&elem(&1, 1))

    aggregates =
      aggregates
      |> Enum.concat(used)
      |> Enum.map(&add_tenant_to_aggregate_query(&1, ash_query))

    Ash.DataLayer.add_aggregates(query, aggregates, resource)
  end

  defp add_tenant_to_aggregate_query(aggregate, %{tenant: nil}), do: aggregate

  defp add_tenant_to_aggregate_query(%{query: nil} = aggregate, ash_query) do
    aggregate_with_query = %{aggregate | query: Ash.Query.new(aggregate.resource)}
    add_tenant_to_aggregate_query(aggregate_with_query, ash_query)
  end

  defp add_tenant_to_aggregate_query(aggregate, ash_query) do
    case Ash.Resource.Info.multitenancy_strategy(aggregate.resource) do
      nil ->
        aggregate

      :attribute ->
        attribute = Ash.Resource.Info.multitenancy_attribute(aggregate.resource)
        {m, f, a} = Ash.Resource.Info.multitenancy_parse_attribute(ash_query.resource)
        attribute_value = apply(m, f, [ash_query.tenant | a])
        %{aggregate | query: filter(aggregate.query, ^[{attribute, attribute_value}])}

      :context ->
        %{aggregate | query: set_tenant(aggregate.query, ash_query.tenant)}
    end
  end

  defp validate_sort(%{resource: resource, sort: sort} = query) do
    case Sort.process(resource, sort, query.aggregates, query.context) do
      {:ok, new_sort} -> %{query | sort: new_sort}
      {:error, error} -> add_error(query, :sort, error)
    end
  end

  def add_error(query, keys \\ [], message) do
    keys = List.wrap(keys)
    query = to_query(query)

    message =
      if is_binary(message) do
        string_path =
          case keys do
            [key] -> to_string(key)
            keys -> Enum.join(keys, ".")
          end

        "#{string_path}: #{message}"
      else
        message
      end

    message
    |> Ash.Error.to_ash_error()
    |> case do
      errors when is_list(errors) ->
        errors =
          Enum.map(errors, fn error ->
            Map.update(error, :path, keys, &(keys ++ List.wrap(&1)))
          end)

        %{query | errors: query.errors ++ errors, valid?: false}

      error ->
        error = Map.update(error, :path, keys, &(keys ++ List.wrap(&1)))
        %{query | errors: [error | query.errors], valid?: false}
    end
  end

  defp validate_matching_query_and_continue(value, resource, key, path, relationship) do
    %{destination: relationship_resource} = relationship

    case value do
      %__MODULE__{resource: query_resource} = destination_query
      when query_resource != relationship_resource ->
        [
          InvalidQuery.exception(
            resource: resource,
            relationship: key,
            query: destination_query,
            load_path: Enum.reverse(path)
          )
        ]

      %__MODULE__{} = destination_query ->
        if Map.get(relationship, :manual) &&
             (destination_query.limit ||
                (destination_query.offset && destination_query.offset != 0)) do
          [
            InvalidQuery.exception(
              resource: resource,
              relationship: key,
              query: destination_query,
              load_path: Enum.reverse(path)
            )
          ]
        else
          do_validate_load(relationship.destination, destination_query, [key | path])
        end

      other ->
        do_validate_load(relationship.destination, other, [key | path])
    end
  end

  defp maybe_filter(query, %{filter: nil}, _) do
    {:ok, query}
  end

  defp maybe_filter(query, ash_query, opts) do
    case Ash.DataLayer.filter(query, ash_query.filter, ash_query.resource) do
      {:ok, filtered} ->
        if Keyword.get(opts, :only_validate_filter?, false) do
          {:ok, query}
        else
          {:ok, filtered}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp set_load_api(nil, _), do: nil
  defp set_load_api([], _), do: []

  defp set_load_api(%__MODULE__{} = query, api) do
    set_api(query, api)
  end

  defp set_load_api(loads, api) do
    Enum.map(loads, fn {key, further} ->
      {key, set_load_api(further, api)}
    end)
  end

  @doc """
  Takes a resource or a query and returns a query.
  """
  @spec to_query(t() | Ash.Resource.t()) :: t()
  def to_query(%__MODULE__{} = query), do: query

  def to_query(resource) do
    resource
    |> new()
    |> Ash.DataLayer.transform_query()
  end

  defp merge_load([], right), do: sanitize_loads(right)
  defp merge_load(left, []), do: sanitize_loads(left)

  defp merge_load(
         %__MODULE__{load: left_loads, calculations: left_calculations, tenant: left_tenant},
         %__MODULE__{load: right_loads, calculations: right_calculations} = query
       ) do
    %{
      query
      | load: merge_load(left_loads, right_loads),
        calculations: Map.merge(left_calculations, right_calculations)
    }
    |> set_tenant(query.tenant || left_tenant)
  end

  defp merge_load(%__MODULE__{} = query, right) when is_list(right) do
    load_relationship(query, right)
  end

  defp merge_load(left, %Ash.Query{} = query) when is_list(left) do
    load_relationship(query, left)
  end

  defp merge_load(left, right) when is_atom(left), do: merge_load([{left, []}], right)
  defp merge_load(left, right) when is_atom(right), do: merge_load(left, [{right, []}])

  defp merge_load(left, right) when is_list(left) and is_list(right) do
    right
    |> sanitize_loads()
    |> Enum.reduce(sanitize_loads(left), fn {rel, rest}, acc ->
      Keyword.update(acc, rel, rest, &merge_load(&1, rest))
    end)
  end

  defp sanitize_loads(load) when is_atom(load), do: {load, []}

  defp sanitize_loads(%Ash.Query{} = query) do
    Map.update!(query, :load, &sanitize_loads/1)
  end

  defp sanitize_loads(loads) do
    loads
    |> List.wrap()
    |> Enum.map(fn
      {key, value} ->
        {key, sanitize_loads(value)}

      load_part ->
        cond do
          is_atom(load_part) -> {load_part, []}
          is_list(load_part) -> sanitize_loads(load_part)
          true -> load_part
        end
    end)
  end
end