defmodule Ash.Query.Aggregate do
@moduledoc "Represents an aggregated association value"
defstruct [
:name,
:relationship_path,
:default_value,
:resource,
:query,
:field,
:kind,
:type,
:constraints,
:implementation,
:load,
:read_action,
:agg_name,
join_filters: %{},
context: %{},
authorize?: true,
uniq?: false,
filterable?: true
]
@type t :: %__MODULE__{}
@kinds [:count, :first, :sum, :list, :max, :min, :avg, :exists, :custom]
@type kind :: unquote(Enum.reduce(@kinds, &{:|, [], [&1, &2]}))
alias Ash.Error.Query.{NoReadAction, NoSuchRelationship}
require Ash.Query
@doc false
def kinds, do: @kinds
def new!(resource, name, kind, opts \\ []) do
case new(resource, name, kind, opts) do
{:ok, aggregate} ->
aggregate
{:error, error} ->
raise Ash.Error.to_error_class(error)
end
end
@doc false
def subpaths([]), do: []
def subpaths([first | rest]) do
[[first] | Enum.map(subpaths(rest), &[first | &1])]
end
@schema [
path: [
type: {:list, :atom},
doc: "The relationship path to aggregate over. Only used when adding aggregates to a query."
],
query: [
type: :any,
doc:
"A base query to use for the aggregate, or a keyword list to be passed to `Ash.Query.build/2`"
],
field: [
type: :atom,
doc: "The field to use for the aggregate. Not necessary for all aggregate types."
],
default: [
type: :any,
doc: "A default value to use for the aggregate if it returns `nil`."
],
filterable?: [
type: :boolean,
doc: "Whether or not this aggregate may be used in filters."
],
type: [
type: :any,
doc: "A type to use for the aggregate."
],
constraints: [
type: :any,
doc: "Type constraints to use for the aggregate."
],
implementation: [
type: :any,
doc: "The implementation for any custom aggregates."
],
read_action: [
type: :atom,
doc: "The read action to use for the aggregate, defaults to the primary read action."
],
uniq?: [
type: :boolean,
default: false,
doc:
"Whether or not to only consider unique values. Only relevant for `count` and `list` aggregates."
],
join_filters: [
type: {:map, {:wrap_list, :atom}, :any},
default: %{},
doc: """
A map of relationship paths (an atom or list of atoms), to an expression to apply when fetching the aggregate data. See the aggregates guide for more.
"""
],
authorize?: [
type: :boolean,
default: true,
doc: """
Whether or not the aggregate query should authorize based on the target action.
See `d:Ash.Resource.Dsl.aggregates|count` for more information.
"""
]
]
@keys Keyword.keys(@schema)
@doc false
def opt_keys do
@keys
end
@doc """
Create a new aggregate, used with `Query.aggregate` or `Api.aggregate`
Options:
#{Spark.OptionsHelpers.docs(@schema)}
"""
def new(resource, name, kind, opts \\ []) do
opts =
Enum.reject(opts, fn
{_key, nil} ->
true
_ ->
false
end)
with {:ok, opts} <- Spark.OptionsHelpers.validate(opts, @schema) do
related = Ash.Resource.Info.related(resource, opts[:path] || [])
query =
case opts[:query] || Ash.Query.new(related) do
%Ash.Query{} = query -> query
build_opts -> build_query(related, build_opts)
end
read_action = opts[:read_action] || Ash.Resource.Info.primary_action!(related, :read).name
query =
if query.__validated_for_action__ != read_action do
query
|> Ash.Query.set_context(%{private: %{require_actor?: false}})
|> Ash.Query.for_read(read_action, %{})
else
query
end
opts[:join_filters]
|> Kernel.||(%{})
|> Enum.reduce_while({:ok, %{}}, fn {path, filter}, {:ok, acc} ->
case parse_join_filter(resource, path, filter) do
{:ok, filter} ->
{:cont, {:ok, Map.put(acc, path, filter)}}
{:error, error} ->
{:halt, {:error, error}}
end
end)
|> case do
{:ok, join_filters} ->
new(
resource,
name,
kind,
opts[:path] || [],
query,
opts[:field],
opts[:default],
Keyword.get(opts, :filterable?, true),
opts[:type],
Keyword.get(opts, :constraints, []),
opts[:implementation],
opts[:uniq?],
opts[:read_action],
Keyword.get(opts, :authorize?, true),
join_filters
)
{:error, error} ->
{:error, error}
end
end
end
@deprecated "Use `new/4` instead."
def new(
resource,
name,
kind,
relationship,
query,
field,
default \\ nil,
filterable? \\ true,
type \\ nil,
constraints \\ [],
implementation \\ nil,
uniq? \\ false,
read_action \\ nil,
authorize? \\ true,
join_filters \\ %{}
) do
if kind == :custom && !type do
raise ArgumentError, "Must supply type when building a `custom` aggregate"
end
if kind == :custom && !implementation do
raise ArgumentError, "Must supply implementation when building a `custom` aggregate"
end
related = Ash.Resource.Info.related(resource, relationship)
attribute_type =
if field do
case Ash.Resource.Info.field(related, field) do
%{type: type, constraints: constraints} ->
{:ok, type, constraints}
_ ->
{:error, "No such field for #{inspect(related)}: #{inspect(field)}"}
end
else
{:ok, nil, constraints}
end
default =
if is_function(default) do
default.()
else
default
end
with :ok <- validate_uniq(uniq?, kind),
{:ok, attribute_type, attribute_constraints} <- attribute_type,
:ok <- validate_path(resource, List.wrap(relationship)),
{:ok, type, constraints} <-
get_type(kind, type, attribute_type, attribute_constraints, constraints),
%{valid?: true} = query <- build_query(related, query) do
{:ok,
%__MODULE__{
name: name,
agg_name: name,
resource: resource,
constraints: constraints,
default_value: default || default_value(kind),
relationship_path: List.wrap(relationship),
implementation: implementation,
field: field,
kind: kind,
type: type,
uniq?: uniq?,
query: query,
filterable?: filterable?,
authorize?: authorize?,
read_action: read_action,
join_filters: Map.new(join_filters, fn {key, value} -> {List.wrap(key), value} end)
}}
else
%{valid?: false} = query ->
{:error, query.errors}
{:error, error} ->
{:error, error}
end
end
defp parse_join_filter(resource, path, filter) do
[last_relationship | relationships] =
path_to_reversed_relationships(resource, path)
top_parent_resource = (List.last(relationships) || last_relationship).source
parent_resources =
relationships |> Enum.map(& &1.destination) |> Enum.concat([top_parent_resource])
Ash.Filter.parse(last_relationship.destination, filter, %{}, %{}, %{
parent_stack: parent_resources
})
end
defp path_to_reversed_relationships(resource, path, acc \\ [])
defp path_to_reversed_relationships(_resource, [], acc), do: acc
defp path_to_reversed_relationships(resource, [first | rest], acc) do
relationship = Ash.Resource.Info.relationship(resource, first)
if !relationship do
raise ArgumentError, "No such relationship: #{inspect(resource)}.#{first} in join_filter"
end
path_to_reversed_relationships(relationship.destination, rest, [
relationship | acc
])
end
defp validate_uniq(true, kind) when kind in [:count, :list], do: :ok
defp validate_uniq(true, kind),
do:
{:error,
"#{kind} aggregates do not support the `uniq?` option. Only count and list are supported currently."}
defp validate_uniq(_, _), do: :ok
defp get_type(:custom, type, _, _attribute_constraints, provided_constraints),
do: {:ok, type, provided_constraints || []}
defp get_type(kind, _, attribute_type, attribute_constraints, provided_constraints) do
kind_to_type(kind, attribute_type, attribute_constraints || provided_constraints)
end
defp validate_path(_, []), do: :ok
defp validate_path(resource, [relationship | rest]) do
case Ash.Resource.Info.relationship(resource, relationship) do
nil ->
{:error, NoSuchRelationship.exception(resource: resource, name: relationship)}
%{type: :many_to_many, through: through, destination: destination} ->
cond do
!Ash.Resource.Info.primary_action(through, :read) ->
{:error, NoReadAction.exception(resource: through, when: "aggregating")}
!Ash.Resource.Info.primary_action(destination, :read) ->
{:error, NoReadAction.exception(resource: destination, when: "aggregating")}
!Ash.DataLayer.data_layer(through) == Ash.DataLayer.data_layer(resource) ->
{:error, "Cannot cross data layer boundaries when building an aggregate"}
true ->
validate_path(destination, rest)
end
relationship ->
cond do
!Ash.Resource.Info.primary_action(relationship.destination, :read) ->
NoReadAction.exception(resource: relationship.destination, when: "aggregating")
!Ash.DataLayer.data_layer(relationship.destination) ==
Ash.DataLayer.data_layer(resource) ->
{:error, "Cannot cross data layer boundaries when building an aggregate"}
true ->
validate_path(relationship.destination, rest)
end
end
end
@doc false
def split_aggregate_opts(opts) do
{left, right} = Keyword.split(opts, opt_keys())
right =
case Keyword.fetch(left, :authorize?) do
{:ok, value} ->
Keyword.put(right, :authorize?, value)
:error ->
right
end
case Keyword.fetch(right, :action) do
{:ok, action} ->
{Keyword.put(left, :read_action, action), right}
:error ->
{left, right}
end
end
def default_value(:count), do: 0
def default_value(:first), do: nil
def default_value(:sum), do: nil
def default_value(:max), do: nil
def default_value(:min), do: nil
def default_value(:avg), do: nil
def default_value(:exists), do: nil
def default_value(:list), do: []
def default_value(:custom), do: nil
@doc false
def build_query(resource, nil), do: Ash.Query.new(resource)
def build_query(resource, build_opts) when is_list(build_opts) do
cond do
build_opts[:limit] ->
Ash.Query.add_error(resource, "Cannot set limit on aggregate query")
build_opts[:offset] && build_opts[:offset] != 0 ->
Ash.Query.add_error(resource, "Cannot set offset on aggregate query")
true ->
case Ash.Query.build(resource, build_opts) do
%{valid?: true} = query ->
build_query(resource, query)
%{valid?: false} = query ->
query
end
end
end
def build_query(_resource, %Ash.Query{} = query) do
cond do
query.limit ->
Ash.Query.add_error(query, "Cannot set limit on aggregate query")
query.offset && query.offset != 0 ->
Ash.Query.add_error(query, "Cannot set offset on aggregate query")
true ->
Ash.Query.unset(query, [:load, :limit, :offset])
end
end
@doc false
def kind_to_type({:custom, type}, _attribute_type, _attribute_constraints), do: {:ok, type, []}
def kind_to_type(kind, attribute_type, attribute_constraints)
when kind in [:first, :sum, :max, :min],
do: {:ok, attribute_type, attribute_constraints || []}
def kind_to_type(:list, attribute_type, attribute_constraints),
do: {:ok, {:array, attribute_type}, [items: attribute_constraints || []]}
def kind_to_type(kind, attribute_type, _attribute_constraints) do
with {:ok, type} <- kind_to_type(kind, attribute_type) do
{:ok, type, []}
end
end
@deprecated "use kind to type/3 instead"
def kind_to_type({:custom, type}, _attribute_type), do: {:ok, type}
def kind_to_type(:count, _attribute_type), do: {:ok, Ash.Type.Integer}
def kind_to_type(:exists, _attribute_type), do: {:ok, Ash.Type.Boolean}
def kind_to_type(kind, nil), do: {:error, "Must provide field type for #{kind}"}
def kind_to_type(:avg, _attribute_type), do: {:ok, :float}
def kind_to_type(kind, attribute_type) when kind in [:first, :sum, :max, :min],
do: {:ok, attribute_type}
def kind_to_type(:list, attribute_type), do: {:ok, {:array, attribute_type}}
def kind_to_type(kind, _attribute_type), do: {:error, "Invalid aggregate kind: #{kind}"}
defimpl Inspect do
import Inspect.Algebra
def inspect(%{query: nil} = aggregate, opts) do
container_doc(
"#" <> to_string(aggregate.kind) <> "<",
[Enum.join(aggregate.relationship_path, ".")],
">",
opts,
fn str, _ -> str end,
separator: ""
)
end
def inspect(%{query: query} = aggregate, opts) do
field =
if aggregate.field do
if is_atom(aggregate.field) do
[to_string(aggregate.field)]
else
case aggregate.field do
%{agg_name: agg_name} ->
[to_string(agg_name)]
%{calc_name: calc_name} ->
[to_string(calc_name)]
_ ->
[inspect(aggregate.field)]
end
end
else
[]
end
container_doc(
"#" <> to_string(aggregate.kind) <> "<",
[
concat([
Enum.join(aggregate.relationship_path ++ field, "."),
concat(" from ", to_doc(query, opts))
])
],
">",
opts,
fn str, _ -> str end
)
end
end
end