defmodule Ash.Engine.Request do
@moduledoc """
Represents an individual request to be processed by the engine.
See `new/1` for more information
"""
# Fields of a single engine request. Entries listed as bare atoms default to `nil`.
defstruct [
# Whether `data` may be resolved asynchronously (`new/1` defaults this to `true`).
:async?,
:resource,
:changeset,
# Identifier of the request; dependencies refer to a request by `path ++ [field]`.
:path,
:action_type,
:action,
:data,
# Used as the `data` of the notification built by `resource_notification/1`.
:notification_data,
:name,
:api,
:query,
:data_layer_query,
:authorization_filter,
:write_to_data?,
:strict_check_only?,
:verbose?,
# `new/1` starts at `:strict_check`; `do_next/1` moves through `:fetch_data`,
# `:check` and `:complete`. `store_dependency/4` also recognises `:error`.
:state,
:actor,
:authorize?,
:engine_pid,
# When set, errors produced by this request get this path via `Ash.Error.set_path/2`.
:error_path,
:completion,
:tracer,
:trace_prefix,
# `:not_requested`, `{:requested, resolver_context}` or `{:fetched, result}` as read/written
# in this module; `:requested`/`:fetching` are also checked for.
# NOTE(review): the transitions to `:fetching`/`{:fetched, _}` are not visible here - presumably done by the engine.
async_fetch_state: :not_requested,
touches_resources: [],
# Deep-merged into the context map handed to resolvers.
additional_context: %{},
notify?: false,
authorized?: false,
# Map of authorizer => that authorizer's state (`:authorized` once it has passed).
authorizer_state: %{},
# Map of field => list of receiver paths waiting for that field.
dependencies_to_send: %{},
# Map of full dependency path => value received from another request.
dependency_data: %{}
]
@type t :: %__MODULE__{}
alias Ash.Authorizer
alias Ash.Error.Invalid.{DuplicatedPath, ImpossiblePath}
require Ash.Query
require Ash.Tracer
require Logger
defmodule UnresolvedField do
  @moduledoc """
  A placeholder for a request field whose value is computed later by the engine.

  Holds the `resolver` function, the dependency paths (`deps`) the resolver needs,
  and a `data?` flag that `Ash.Engine.Request.new/1` sets on the `:data` field.
  """
  defstruct [:resolver, deps: [], data?: false]
  @type t :: %__MODULE__{}

  @doc """
  Builds an unresolved field from `dependencies` and the resolver `func`.

  `dependencies` is wrapped into a list, and every entry is wrapped into a list as
  well, so `:a` becomes `[[:a]]` and `[:a, [:b, :c]]` becomes `[[:a], [:b, :c]]`.
  """
  def new(dependencies, func) do
    %__MODULE__{deps: normalize_deps(dependencies), resolver: func}
  end

  # Guarantees the "list of lists" shape regardless of how the caller spelled the deps.
  defp normalize_deps(dependencies) do
    for dependency <- List.wrap(dependencies), do: List.wrap(dependency)
  end
end
defimpl Inspect, for: UnresolvedField do
  # Renders only the dependency paths; the resolver function is omitted from the output.
  def inspect(%{deps: deps}, opts) do
    rendered_deps = Inspect.Algebra.to_doc(deps, opts)

    Inspect.Algebra.concat(
      "#UnresolvedField<",
      Inspect.Algebra.concat(rendered_deps, ">")
    )
  end
end
@doc """
Create an unresolved field.

`dependencies` is a list of dependency paths, each of which is a list of atoms.
All elements before the last comprise the path of a request that is also being
processed, like `[:data]`, and the last element is the key of that request that
is required. Make sure to pass a list of lists of atoms; duplicate paths are dropped.

`func` is the resolver. It receives a map that contains all of the values you
requested, nested at the same path that they were requested at (alongside the
`:verbose?`, `:actor`, `:authorize?` and `:async?` keys added by the request).

For example:

    resolve([[:data, :query], [:data, :data]], fn %{data: %{query: query, data: data}} ->
      # `data` is the data field of the [:data] request
      # `query` is the query field of the [:data] request
      {:ok, result}
      # or
      {:error, error}
    end)

The resolver may also return `{:ok, value, instructions}`, `{:requests, requests}`,
`{:requests, requests, new_deps}` or `{:new_deps, new_deps}`. Any other return value
is turned into an `Ash.Error.Framework.AssumptionFailed` error when the field is resolved.
"""
def resolve(dependencies \\ [], func) do
  dependencies
  |> List.wrap()
  |> Enum.uniq()
  |> UnresolvedField.new(func)
end
@doc """
Creates a new request.

The field values may be explicit values, or they may be
instances of `UnresolvedField`.

When other requests depend on a value from this request, they will
not be sent unless this request has completed its authorization (or this
request has been configured not to do authorization). This allows requests
to depend on each other without those requests happening just before a request
fails with a forbidden error. These fields are `data`, `query`, `changeset`,
`authorized?`, `data_layer_query` and `authorization_filter`.

A field may not be resolved if the data of a request has been resolved and
no other requests depend on that field.

Options:

  * query - The query to be used to fetch data. Used to authorize reads.
  * data - The ultimate goal of a request is to compute the data
  * resource - The primary resource of the request. Used for opening transactions on creates/updates/destroys
  * changeset - Any changes to be made to the resource. Used to authorize writes.
  * path - The path of the request. This serves as a unique id, and is the way that other requests can refer to this one.
    A single atom is accepted and wrapped into a list.
  * action_type - The action_type of the request, defaults to the type of `action` when one is given
  * action - The action being performed on the data
  * async? - Whether or not the request *can* be asynchronous, defaults to `true`.
  * api - The api module being called
  * name - A human readable name for the request, used when logging/in errors
  * strict_check_only? - If true, authorization will not be allowed to proceed to a runtime check (so it cannot run db queries unless authorization is assured)
  * actor - The actor performing the action, used for authorization
  * authorize? - Whether or not to perform authorization (defaults to true)
  * verbose? - print informational logs (warning, this will be a whole lot of logs)
  * write_to_data? - If set to false, this value is not returned from the initial call to the engine (defaults to true)
  * error_path - If set, errors produced by this request have this path set on them
  * touches_resources - Stored on the request as `touches_resources`, defaults to `[]`
  * notify? - Stored on the request as `notify?`; only an explicit `true` enables it

Raises `ArgumentError` if `:query` is anything other than `nil`, an `Ash.Query`
or an `UnresolvedField`.
"""
def new(opts) do
  query =
    case opts[:query] do
      %UnresolvedField{} = query ->
        query

      %Ash.Query{} = query ->
        query

      nil ->
        nil

      other ->
        raise ArgumentError,
          message:
            "Unexpected value passed as `:query` to `Ash.Engine.Request.new/1`: #{inspect(other)}"
    end

  # Mark an unresolved `data` field so it can be told apart from other unresolved fields.
  data =
    case opts[:data] do
      %UnresolvedField{} = unresolved ->
        %{unresolved | data?: true}

      other ->
        other
    end

  # Normalize once so that the stored `path` and the `completion` dependency always
  # agree, and so that a `nil` or single-atom path does not crash on `++`.
  path = List.wrap(opts[:path])

  %__MODULE__{
    resource: opts[:resource],
    changeset: opts[:changeset],
    path: path,
    action_type: opts[:action_type] || (opts[:action] && opts[:action].type),
    action: opts[:action],
    async?: Keyword.get(opts, :async?, true),
    data: data,
    query: query,
    error_path: opts[:error_path],
    touches_resources: opts[:touches_resources] || [],
    data_layer_query: resolve([], fn _ -> nil end),
    api: opts[:api],
    name: opts[:name],
    strict_check_only?: opts[:strict_check_only?],
    state: :strict_check,
    actor: opts[:actor],
    notify?: opts[:notify?] == true,
    authorize?: Keyword.get(opts, :authorize?, true),
    # With authorization disabled the request counts as authorized from the start.
    authorized?: opts[:authorize?] == false,
    verbose?: opts[:verbose?] || false,
    # `completion` depends on this request's own `data` field.
    completion: resolve([path ++ [:data]], fn _ -> {:ok, :complete} end),
    write_to_data?: Keyword.get(opts, :write_to_data?, true)
  }
end
@doc """
Builds the `Ash.Notifier.Notification` for a request.

The notification's `data` comes from the request's `notification_data` field;
every other field is copied from the request field of the same name.
"""
def resource_notification(request) do
  %{
    resource: resource,
    api: api,
    actor: actor,
    action: action,
    notification_data: notification_data,
    changeset: changeset
  } = request

  %Ash.Notifier.Notification{
    changeset: changeset,
    data: notification_data,
    action: action,
    actor: actor,
    api: api,
    resource: resource
  }
end
@doc """
Advances the request by one step and translates the internal result for the caller.

Returns `{:complete | :already_complete | :wait, request, notifications, dependencies}`,
`{:continue, request, notifications}` or `{:error, error, request}`. Errors are converted
with `Ash.Error.to_ash_error/1` and get the request's `error_path` when one is set.
"""
def next(request) do
  # Captured up front: tells a fresh completion apart from re-running a finished request.
  was_complete? = request.state == :complete

  request
  |> do_next()
  |> case do
    {:complete, updated, notifications, dependencies} when was_complete? ->
      {:already_complete, updated, notifications, dependencies}

    {:complete, _updated, _notifications, _dependencies} = completed ->
      completed

    {:waiting, updated, notifications, dependencies} ->
      {:wait, updated, notifications, dependencies}

    {:continue, _updated, _notifications} = continued ->
      continued

    {:error, error, failed_request} ->
      ash_error = Ash.Error.to_ash_error(error)

      located_error =
        case failed_request.error_path do
          unset when unset in [nil, false] -> ash_error
          error_path -> Ash.Error.set_path(ash_error, error_path)
        end

      {:error, located_error, failed_request}
  end
end
# Strict check is skipped entirely when authorization is turned off...
def do_next(%{state: :strict_check, authorize?: false} = request) do
  log(request, fn -> "Skipping strict check due to authorize?: false" end)
  ready_to_fetch = Map.replace!(request, :state, :fetch_data)
  {:continue, ready_to_fetch, []}
end

# ...or when there is no resource whose authorizers could be consulted.
def do_next(%{state: :strict_check, resource: nil} = request),
  do: {:continue, Map.replace!(request, :state, :fetch_data), []}
# Runs the strict check of every authorizer of the resource. Moves on to `:fetch_data`
# when nothing is left to wait for, otherwise reports the outstanding dependencies.
def do_next(%{state: :strict_check} = request) do
  authorizers = Ash.Resource.Info.authorizers(request.resource)

  if authorizers == [] do
    log(request, fn -> "No authorizers found, skipping strict check" end)
    {:continue, %{request | state: :fetch_data}, []}
  else
    case strict_check(authorizers, request) do
      {:error, error} ->
        log(request, fn -> "Strict checking failed" end)
        {:error, error, request}

      {:ok, checked, notifications, []} ->
        authorized = set_authorized(checked)
        log(authorized, fn -> "Strict check complete" end)
        {:continue, %{authorized | state: :fetch_data}, notifications}

      {:ok, pending, notifications, dependencies} ->
        log(pending, fn -> "Strict check incomplete, waiting on dependencies" end)
        {:waiting, pending, notifications, dependencies}
    end
  end
end
# Resolves the changeset first while it is still unresolved, otherwise the data.
# Only a resolved `:data` moves the request on to the `:check` state.
def do_next(%{state: :fetch_data} = request) do
  key = if match?(%UnresolvedField{}, request.changeset), do: :changeset, else: :data

  case try_resolve_local(request, key, true) do
    {:error, error, failed_request} ->
      log(request, fn -> "error fetching #{key}: #{inspect(error)}" end)
      {:error, error, failed_request}

    {:error, error} ->
      log(request, fn -> "error fetching #{key}: #{inspect(error)}" end)
      {:error, error, request}

    {:skipped, skipped_request, notifications, waiting_for} ->
      {:waiting, skipped_request, notifications, waiting_for}

    # The state is left untouched so the next step comes back here to fetch the data.
    {:ok, resolved, notifications, []} when key == :changeset ->
      {:continue, resolved, notifications}

    {:ok, resolved, notifications, []} ->
      log(resolved, fn -> "data fetched: #{inspect(notifications)}" end)
      {:continue, %{resolved | state: :check}, notifications}

    {:ok, pending, notifications, waiting_for} ->
      log(request, fn -> "#{key} waiting on dependencies: #{inspect(waiting_for)}" end)
      {:waiting, pending, notifications, waiting_for}
  end
end
# The runtime check is skipped when authorization is turned off...
def do_next(%{state: :check, authorize?: false} = request) do
  log(request, fn -> "Skipping check due to `authorize?: false`" end)
  finished = Map.replace!(request, :state, :complete)
  {:complete, finished, [], []}
end

# ...or when there is no resource whose authorizers could be consulted.
def do_next(%{state: :check, resource: nil} = request),
  do: {:complete, Map.replace!(request, :state, :complete), [], []}
# Runs the runtime check of every authorizer of the resource and completes the request
# once none of them is waiting on further dependencies.
def do_next(%{state: :check} = request) do
  authorizers = Ash.Resource.Info.authorizers(request.resource)

  if authorizers == [] do
    log(request, fn -> "No authorizers found, skipping check" end)
    {:complete, %{request | state: :complete}, [], []}
  else
    case check(authorizers, request) do
      {:error, error} ->
        log(request, fn -> "Check failed" end)
        {:error, error, request}

      {:ok, checked, notifications, []} ->
        log(checked, fn -> "Check complete" end)
        authorized = set_authorized(checked)
        {:complete, %{authorized | state: :complete}, notifications, []}

      {:ok, pending, notifications, waiting} ->
        log(request, fn -> "Check incomplete, waiting on dependencies" end)
        {:waiting, pending, notifications, waiting}
    end
  end
end
# A completed request only has work left when other requests are still waiting for
# some of its fields: each such field is resolved (without the internal override) and
# the resulting notifications and dependencies are collected.
def do_next(%{state: :complete} = request) do
  if request.dependencies_to_send == %{} do
    {:complete, request, [], []}
  else
    Enum.reduce_while(request.dependencies_to_send, {:complete, request, [], []}, fn
      {field, _paths}, {:complete, request, notifications, deps} ->
        case try_resolve_local(request, field, false) do
          {outcome, new_request, new_notifications, other_deps}
          when outcome in [:skipped, :ok] ->
            new_request = %{new_request | state: :complete}

            {:cont,
             {:complete, new_request, new_notifications ++ notifications, other_deps ++ deps}}

          # Resolvers may hand back an updated request alongside the error (for example
          # carrying the failed changeset or query); keep it instead of crashing with a
          # `CaseClauseError` on the unmatched 3-tuple.
          {:error, error, failed_request} ->
            {:halt, {:error, error, failed_request}}

          {:error, error} ->
            {:halt, {:error, error, request}}
        end
    end)
  end
end
@doc """
Returns a short human readable description of a request.

Requests that have both a resource and an action are rendered as the path, a newline
and `name: Resource.action`; anything else is rendered as `path - name`.
"""
def summarize(%{path: path, name: name, action: %{name: action}, resource: resource})
    when resource != nil do
  location = inspect(path, structs: false)
  target = "#{inspect(resource)}.#{action}"
  "#{location}\n#{name}: #{target}"
end

def summarize(%{path: path, name: name}) do
  location = inspect(path, structs: false)
  "#{location} - #{name}"
end
@doc """
Deduplicates notifications and moves `{:requests, _}` notifications to the front.

The relative order inside each of the two groups is preserved. A single notification
(or `nil`) is accepted and treated as a list.
"""
def sort_and_clean_notifications(notifications) do
  unique = notifications |> List.wrap() |> Enum.uniq()
  request_notification? = &match?({:requests, _}, &1)

  Enum.filter(unique, request_notification?) ++ Enum.reject(unique, request_notification?)
end
@doc """
Tells the request that the dependency `path ++ [field]` will never arrive.

Always returns `{:stop, :dependency_failed, request}`.
"""
def wont_receive(request, path, field) do
  # Kept lazy: the message is only built when the request logs verbosely.
  describe_failure = fn ->
    "Request failed due to failed dependency #{inspect(path ++ [field])}"
  end

  log(request, describe_failure)
  {:stop, :dependency_failed, request}
end
@doc """
Registers `receiver_path` as waiting for `field` and tries to provide the value.

Returns `{:ok, request, notifications}`, `{:waiting, request, notifications, waiting_for}`
or `{:error, error, request}`. Errors are converted with `Ash.Error.to_ash_error/1` and get
the `error_path` of the request that was passed in, when it has one.
"""
def send_field(request, receiver_path, field) do
  log(request, fn -> "Attempting to provide #{inspect(field)} for #{inspect(receiver_path)}" end)

  case store_dependency(request, receiver_path, field) do
    {:error, error, failed_request} ->
      log(request, fn -> "Error resolving #{field}: #{inspect(error)}" end)
      ash_error = Ash.Error.to_ash_error(error)

      located_error =
        case request.error_path do
          unset when unset in [nil, false] -> ash_error
          error_path -> Ash.Error.set_path(ash_error, error_path)
        end

      {:error, located_error, failed_request}

    {:ok, _new_request, _notifications} = provided ->
      provided

    {:waiting, _new_request, _notifications, _waiting_for} = waiting ->
      waiting
  end
end
@doc """
Stores a value that another request provided for the dependency `path ++ [field]`.

Always returns `{:continue, request}`.
"""
def receive_field(request, path, field, value) do
  log(request, fn -> "Receiving field #{field} from #{inspect(path)}" end)
  {:continue, put_dependency_data(request, path ++ [field], value)}
end
# Recomputes `authorized?` for a request that is not yet marked as authorized: it
# becomes true only when every authorizer of the resource is in the `:authorized` state.
defp set_authorized(%{authorized?: false, resource: resource} = request) do
  authorizers = Ash.Resource.Info.authorizers(resource)

  any_pending? =
    Enum.any?(authorizers, fn authorizer ->
      authorizer_state(request, authorizer) != :authorized
    end)

  %{request | authorized?: not any_pending?}
end

# Anything else (e.g. already authorized) is left as it is.
defp set_authorized(request), do: request
@doc """
Stores `value` under the full dependency path `dep` in the request's `dependency_data`.
"""
def put_dependency_data(request, dep, value) do
  Map.update!(request, :dependency_data, fn received -> Map.put(received, dep, value) end)
end
@doc """
Records that `receiver_path` is waiting for `field` and tries to resolve the field.

Returns one of:

  * `{:ok, request, notifications}` - nothing further to wait for (the value was provided,
    or it will be sent later)
  * `{:waiting, request, notifications, waiting_for}` - the field needs the listed dependencies first
  * `{:error, error, request}` - resolving the field failed

A request in the `:error` state is returned untouched with no notifications.
"""
def store_dependency(request, receiver_path, field, internal? \\ false)

def store_dependency(%{state: :error} = request, _receiver_path, _field, _internal?) do
  {:ok, request, []}
end

def store_dependency(request, receiver_path, field, internal?) do
  request = do_store_dependency(request, field, receiver_path)

  case try_resolve_local(request, field, internal?) do
    {:skipped, new_request, notifications, []} ->
      log(request, fn -> "Field #{field} was skipped, no additional dependencies" end)
      {:ok, new_request, notifications}

    {:skipped, new_request, notifications, waiting} ->
      log(request, fn ->
        "Field #{field} was skipped, registering dependencies: #{inspect(waiting)}"
      end)

      {:waiting, new_request, notifications, waiting}

    {:ok, new_request, notifications, waiting} ->
      case Map.get(new_request, field) do
        %UnresolvedField{} ->
          log(request, fn -> "Field could not be resolved #{field}, registering dependency" end)

          if waiting == [] do
            {:ok, new_request, notifications}
          else
            {:waiting, new_request, notifications, waiting}
          end

        value ->
          log(request, fn -> "Field #{field}, was resolved and provided" end)
          {new_request, new_notifications} = notifications(new_request, field, value)

          # Always the documented 3-tuple: the value has been provided, so the
          # outstanding dependencies no longer matter, and callers such as
          # `send_field/3` do not match a 4-element `:ok` tuple.
          {:ok, new_request, new_notifications ++ notifications}
      end

    # Resolvers may return an updated request together with the error; pass it on
    # rather than failing with a `CaseClauseError`.
    {:error, error, failed_request} ->
      {:error, error, failed_request}

    {:error, error} ->
      {:error, error, request}
  end
end
# Puts `receiver_path` at the front of the receivers waiting for `field`, dropping any
# earlier registration of the same path so it is only listed once.
defp do_store_dependency(request, field, receiver_path) do
  log(request, fn -> "storing dependency on #{field} from #{inspect(receiver_path)}" end)

  already_waiting = Map.get(request.dependencies_to_send, field, [])
  others = Enum.reject(already_waiting, fn path -> path == receiver_path end)
  updated = Map.put(request.dependencies_to_send, field, [receiver_path | others])

  %{request | dependencies_to_send: updated}
end
# Runs the strict check for every authorizer that is not yet in the `:authorized` state.
#
# Returns `{:ok, request, notifications, waiting_for}` or `{:error, error}`. A `:waiting`
# result from a single authorizer does not stop the loop: it is folded into the `:ok`
# accumulator, and the caller tells "done" from "waiting" by whether `waiting_for` is empty.
defp strict_check(authorizers, request) do
authorizers
|> Enum.reject(&(authorizer_state(request, &1) == :authorized))
|> Enum.reduce_while({:ok, request, [], []}, fn authorizer,
{:ok, request, notifications, waiting_for} ->
log(request, fn -> "strict checking" end)
case do_strict_check(authorizer, request) do
{:ok, new_request} ->
log(new_request, fn -> "strict check succeeded for #{inspect(authorizer)}" end)
{:cont, {:ok, new_request, notifications, waiting_for}}
{:ok, new_request, new_notifications, new_deps} ->
log(new_request, fn -> "strict check succeeded for #{inspect(authorizer)}" end)
{:cont, {:ok, new_request, new_notifications ++ notifications, waiting_for ++ new_deps}}
{:waiting, new_request, new_notifications, new_deps} ->
log(
new_request,
fn -> "waiting on dependencies: #{inspect(new_deps)} for #{inspect(authorizer)}" end
)
{:cont, {:ok, new_request, notifications ++ new_notifications, new_deps ++ waiting_for}}
# The first failing authorizer aborts the whole strict check.
{:error, error} ->
log(request, fn ->
"strict check failed for #{inspect(authorizer)}: #{inspect(error)}"
end)
{:halt, {:error, error}}
end
end)
end
# Strict-checks a single authorizer.
#
# If any field the authorizer asks for is still an `UnresolvedField`, those fields are
# resolved first (as internal resolution) and the check is retried; when they cannot be
# resolved yet, `{:waiting, request, notifications, waiting_for}` is returned.
# Otherwise returns `{:ok, request, notifications, deps}` or `{:error, error}`.
defp do_strict_check(authorizer, request, notifications \\ []) do
strict_check_only? = request.strict_check_only?
case missing_strict_check_dependencies?(authorizer, request) do
[] ->
case strict_check_authorizer(authorizer, request) do
{:authorized, _authorizer} ->
{:ok, set_authorizer_state(request, authorizer, :authorized), notifications, []}
# A filter result is applied right away, and the query is resolved afterwards
# (`apply_filter/4` is called with `resolve_data?` set to true).
{:filter, authorizer_state, filter} ->
request
|> set_authorizer_state(authorizer, authorizer_state)
|> apply_filter(authorizer, filter, true)
|> case do
{:ok, request} ->
{:ok, request, notifications, []}
{:ok, request, new_notifications, deps} ->
{:ok, request, new_notifications ++ notifications, deps}
other ->
other
end
{:filter, filter} ->
request
|> apply_filter(authorizer, filter, true)
|> case do
{:ok, request} ->
{:ok, request, notifications, []}
{:ok, request, new_notifications, deps} ->
{:ok, request, new_notifications ++ notifications, deps}
other ->
other
end
# With `strict_check_only?` set, any result that would need a later check is an error.
{:filter_and_continue, _, authorizer_state} when strict_check_only? ->
{:error,
Authorizer.exception(
authorizer,
:must_pass_strict_check,
authorizer_state
)}
{:filter_and_continue, filter, new_authorizer_state} ->
request
|> set_authorizer_state(authorizer, new_authorizer_state)
|> apply_filter(authorizer, filter)
|> case do
{:ok, request} ->
{:ok, request, notifications, []}
{:ok, request, new_notifications, deps} ->
{:ok, request, new_notifications ++ notifications, deps}
other ->
other
end
{:continue, authorizer_state} when strict_check_only? ->
{:error,
Authorizer.exception(
authorizer,
:must_pass_strict_check,
authorizer_state
)}
# The authorizer could not decide yet; keep its state for the later check.
{:continue, authorizer_state} ->
{:ok, set_authorizer_state(request, authorizer, authorizer_state), notifications, []}
{:error, error} ->
{:error, error}
end
deps ->
# Turn the missing field names into full dependency paths on this request.
deps =
Enum.map(deps, fn dep ->
request.path ++ [dep]
end)
case try_resolve(request, deps, true) do
{:ok, new_request, new_notifications, []} ->
do_strict_check(authorizer, new_request, notifications ++ new_notifications)
{:ok, new_request, new_notifications, waiting_for} ->
{:waiting, new_request, notifications ++ new_notifications, waiting_for}
{:error, error} ->
{:error, error}
end
end
end
# Applies an authorization filter produced by `authorizer` and marks that authorizer as
# `:authorized`. When `resolve_data?` is true the request's `:query` field is resolved
# afterwards, so the result is whatever `try_resolve/3` returns; otherwise `{:ok, request}`.
defp apply_filter(request, authorizer, filter, resolve_data? \\ false)
# Read actions: the filter is added to the query and recorded in `authorization_filter`.
defp apply_filter(
%{action: %{type: :read}} = request,
authorizer,
filter,
resolve_data?
) do
request =
request
|> Map.update!(:query, &Ash.Query.filter(&1, ^filter))
# `add_to_or_parse/4` handles an `authorization_filter` that is still unset.
|> Map.update(
:authorization_filter,
filter,
&add_to_or_parse(&1, filter, request.resource, request.query)
)
|> set_authorizer_state(authorizer, :authorized)
if resolve_data? do
try_resolve(request, [request.path ++ [:query]], false)
else
{:ok, request}
end
end
# Every other action type: the filter is verified through `do_runtime_filter/4`, and the
# authorizer is only marked as authorized when that succeeds.
defp apply_filter(request, authorizer, filter, resolve_data?) do
case do_runtime_filter(request, filter, authorizer, get_authorizer_state(request, authorizer)) do
{:ok, request} ->
request = set_authorizer_state(request, authorizer, :authorized)
if resolve_data? do
try_resolve(request, [request.path ++ [:query]], false)
else
{:ok, request}
end
{:error, error} ->
{:error, error}
end
end
# Combines `filter` with the authorization filter collected so far. When nothing has been
# collected yet, the filter is parsed against the resource instead. The query's aggregates,
# calculations and context are passed along in both cases.
defp add_to_or_parse(existing_authorization_filter, filter, resource, query) do
  case existing_authorization_filter do
    unset when unset in [nil, false] ->
      Ash.Filter.parse!(resource, filter, query.aggregates, query.calculations, query.context)

    existing ->
      Ash.Filter.add_to_filter(
        existing,
        filter,
        query.aggregates,
        query.calculations,
        query.context
      )
  end
end
# Runs the runtime check for every authorizer that is not yet in the `:authorized` state.
#
# Returns `{:ok, request, notifications, waiting_for}` or `{:error, error}`. As in
# `strict_check/2`, a `:waiting` result is folded into the `:ok` accumulator and the caller
# decides by whether `waiting_for` is empty.
defp check(authorizers, request) do
authorizers
|> Enum.reject(&(authorizer_state(request, &1) == :authorized))
|> Enum.reduce_while({:ok, request, [], []}, fn authorizer,
{:ok, request, notifications, waiting_for} ->
case do_check(authorizer, request) do
{:ok, new_request} ->
log(request, fn -> "check succeeded for #{inspect(authorizer)}" end)
{:cont, {:ok, new_request, notifications, waiting_for}}
{:ok, new_request, new_notifications, new_deps} ->
log(request, fn -> "check succeeded for #{inspect(authorizer)}" end)
{:cont, {:ok, new_request, new_notifications ++ notifications, new_deps ++ waiting_for}}
{:waiting, new_request, new_notifications, new_deps} ->
log(
request,
fn -> "waiting on dependencies: #{inspect(new_deps)} for #{inspect(authorizer)}" end
)
{:cont, {:ok, new_request, new_notifications ++ notifications, new_deps ++ waiting_for}}
# The first failing authorizer aborts the whole check.
{:error, error} ->
log(request, fn -> "check failed for #{inspect(authorizer)}: #{inspect(error)}" end)
{:halt, {:error, error}}
end
end)
end
# Runtime-checks a single authorizer.
#
# Fields the authorizer asks for that are still unresolved are resolved first (as internal
# resolution) and the check is retried; if they cannot be resolved yet,
# `{:waiting, request, notifications, waiting_for}` is returned.
defp do_check(authorizer, request, notifications \\ []) do
case missing_check_dependencies(authorizer, request) do
[] ->
case check_authorizer(authorizer, request) do
:authorized ->
{:ok, set_authorizer_state(request, authorizer, :authorized)}
{:filter, filter} ->
request
|> set_authorizer_state(authorizer, :authorized)
# NOTE(review): the struct always has the `:authorization_filter` key, so the default
# (`filter`) is never used and `Ash.Filter.add_to_filter/2` receives `nil` when no filter
# has been recorded yet - confirm that it accepts `nil` (compare `add_to_or_parse/4`).
|> Map.update(
:authorization_filter,
filter,
&Ash.Filter.add_to_filter(&1, filter)
)
|> runtime_filter(authorizer, filter)
{:error, error} ->
{:error, error}
end
deps ->
# Turn the missing field names into full dependency paths on this request.
deps =
Enum.map(deps, fn dep ->
request.path ++ [dep]
end)
case try_resolve(request, deps, true) do
{:ok, new_request, new_notifications, []} ->
do_check(authorizer, new_request, notifications ++ new_notifications)
{:ok, new_request, new_notifications, waiting_for} ->
{:waiting, new_request, new_notifications ++ notifications, waiting_for}
{:error, error} ->
{:error, error}
end
end
end
# Verifies `filter` through `do_runtime_filter/4`; on success marks the authorizer as
# authorized and resolves the request's `:data` and `:query` fields.
defp runtime_filter(request, authorizer, filter) do
  current_state = get_authorizer_state(request, authorizer)

  with {:ok, filtered} <- do_runtime_filter(request, filter, authorizer, current_state) do
    own_fields = [filtered.path ++ [:data], filtered.path ++ [:query]]

    filtered
    |> set_authorizer_state(authorizer, :authorized)
    |> try_resolve(own_fields, false)
  else
    {:error, error} -> {:error, error}
  end
end
# Without a filter there is nothing to verify.
defp do_runtime_filter(request, nil, _authorizer, _authorizer_state), do: {:ok, request}

# A read that produced no data has no records that could violate the filter.
defp do_runtime_filter(
       %{action: %{type: :read}, data: data} = request,
       _filter,
       _authorizer,
       _authorizer_state
     )
     when data == nil or data == [],
     do: {:ok, request}
# Read actions with data: re-runs the request's query restricted to the primary keys of the
# records already fetched plus the authorization `filter`, then keeps only the records
# whose primary key came back. Returns `{:ok, request}` with the narrowed `data` and the
# new `query`, or `{:error, error}`.
defp do_runtime_filter(
%{action: %{type: :read}} = request,
filter,
_authorizer,
_authorizer_state
) do
pkey = Ash.Resource.Info.primary_key(request.resource)
# One keyword list of primary key values per fetched record.
pkeys =
request.data
|> List.wrap()
|> Enum.map(fn record ->
record |> Map.take(pkey) |> Map.to_list()
end)
# A single record is matched directly, several records are combined with `or`.
primary_key_filter =
case pkeys do
[pkey] -> [pkey]
pkeys -> [or: pkeys]
end
new_query =
request.query
|> Ash.Query.filter(^primary_key_filter)
|> Ash.Query.filter(^filter)
new_query
|> Map.put(:api, request.api)
|> Ash.Query.select([])
|> Ash.Query.data_layer_query()
|> case do
{:ok, data_layer_query} ->
data_layer_query
|> Ash.DataLayer.run_query(request.resource)
|> case do
{:ok, results} ->
pkey = Ash.Resource.Info.primary_key(request.resource)
pkeys = Enum.map(results, &Map.take(&1, pkey))
# NOTE(review): `request.data` is wrapped with `List.wrap/1` above but not here, so
# a single (non-list) record would not be enumerable at this point - confirm `data`
# is always a list for reads.
new_data = Enum.filter(request.data, &(Map.take(&1, pkey) in pkeys))
{:ok, %{request | data: new_data, query: new_query}}
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
end
# Creates cannot be verified against a filter, so they are always rejected here.
defp do_runtime_filter(%{action: %{type: :create}}, _filter, _authorizer, _authorizer_state),
  do: {:error, Ash.Error.Forbidden.CannotFilterCreates.exception([])}
# Remaining action types (those operating on `request.changeset.data`): queries for the
# changeset's record by primary key combined with `filter`. A match means the record
# satisfies the filter; an empty result is reported as `:changeset_doesnt_match_filter`.
defp do_runtime_filter(request, filter, authorizer, authorizer_state) do
pkey = Ash.Resource.Info.primary_key(request.resource)
# Primary key values of the record being changed, as a keyword list usable as a filter.
pkey =
request.changeset.data
|> Map.take(pkey)
|> Map.to_list()
new_query =
request.resource
|> Ash.Query.set_tenant(request.changeset.tenant)
|> Ash.Query.set_context(request.changeset.context)
|> Ash.Query.filter(^filter)
|> Ash.Query.limit(1)
query_with_pkey_filter = Ash.Query.filter(new_query, ^pkey)
query_with_pkey_filter
|> Map.put(:api, request.api)
|> Ash.Query.select([])
|> Ash.Query.data_layer_query()
|> case do
{:ok, data_layer_query} ->
data_layer_query
|> Ash.DataLayer.run_query(request.resource)
|> case do
{:ok, []} ->
# The error carries the filter without the primary key restriction.
{:error,
Authorizer.exception(
authorizer,
{:changeset_doesnt_match_filter, new_query.filter},
authorizer_state
)}
{:ok, [_]} ->
{:ok, request}
{:error, error} ->
{:error, error}
end
{:error, error} ->
{:error, error}
end
rescue
# Any exception raised while building or running the query is logged and reported as an
# `:unknown` authorizer error instead of propagating.
e ->
Logger.error(
"Exception while running authorization query: #{inspect(Exception.format(:error, e, __STACKTRACE__))}"
)
{:error,
Authorizer.exception(
authorizer,
:unknown,
authorizer_state
)}
end
# Walks the dependency paths in order. Dependencies whose value is already available are
# passed over; the others are handed to `do_try_resolve/5`, which may halt with an error.
# Returns `{:ok, request, notifications, skipped}` or `{:error, error}`.
defp try_resolve(request, deps, internal?) do
  Enum.reduce_while(deps, {:ok, request, [], []}, fn dep, {:ok, current, notifications, skipped} ->
    if match?(:error, get_dependency_data(current, dep)) do
      do_try_resolve(current, notifications, skipped, dep, internal?)
    else
      {:cont, {:ok, current, notifications, skipped}}
    end
  end)
end
# One step of `try_resolve/3`, returning a `reduce_while` instruction.
#
# A dependency on one of this request's own fields is resolved locally; a dependency on
# another request cannot be resolved here and is added to the `skipped` list.
defp do_try_resolve(request, notifications, skipped, dep, internal?) do
  if local_dep?(request, dep) do
    case try_resolve_local(request, List.last(dep), internal?) do
      {outcome, request, new_notifications, other_deps} when outcome in [:skipped, :ok] ->
        {:cont, {:ok, request, new_notifications ++ notifications, skipped ++ other_deps}}

      # Resolvers may return an updated request alongside the error. `try_resolve/3` only
      # reports `{:error, error}`, so the request is dropped here rather than letting the
      # unmatched 3-tuple raise a `CaseClauseError`.
      {:error, error, _failed_request} ->
        {:halt, {:error, error}}

      {:error, error} ->
        {:halt, {:error, error}}
    end
  else
    {:cont, {:ok, request, notifications, [dep | skipped]}}
  end
end
# Resolves one of this request's own fields.
#
# Requests for the protected fields are not honored until every authorizer is in the
# `:authorized` state, unless the resolution is internal; in that case only the field's
# dependencies are worked on. Otherwise an unresolved field is resolved and an existing
# value is announced to whoever is waiting for it.
defp try_resolve_local(request, field, internal?) do
  authorized? =
    request.authorizer_state
    |> Map.values()
    |> Enum.all?(fn state -> state == :authorized end)

  protected? =
    field in [
      :data,
      :query,
      :changeset,
      :authorized?,
      :data_layer_query,
      :authorization_filter
    ]

  if protected? and not authorized? and not internal? do
    try_resolve_dependencies_of(request, field, internal?)
  else
    current = Map.get(request, field)

    if match?(%UnresolvedField{}, current) do
      do_try_resolve_local(request, field, current, internal?)
    else
      notify_existing_value(request, field, current)
    end
  end
end
# Works on the dependencies of `field` without resolving the field itself. A successful
# result is reported as `:skipped`; errors are passed through unchanged. A field that is
# not unresolved has no dependencies to work on.
defp try_resolve_dependencies_of(request, field, internal?) do
  case Map.get(request, field) do
    %UnresolvedField{deps: deps} ->
      result = try_resolve(request, deps, internal?)

      if match?({:ok, _, _, _}, result) do
        # Same request, notifications and remaining deps; only the tag changes.
        put_elem(result, 0, :skipped)
      else
        result
      end

    _already_resolved ->
      {:skipped, request, [], []}
  end
end
# The field already holds a value: hand it to every receiver waiting for it and report
# success with nothing left to wait for.
defp notify_existing_value(request, field, value) do
  {drained_request, outgoing} = notifications(request, field, value)
  {:ok, drained_request, outgoing, []}
end
# Resolves an unresolved field of this request.
#
# The field's dependencies are resolved first; if any remain outstanding (or resolving them
# fails) that result is returned as-is by the `with`. Otherwise the resolver is run (or, for
# asynchronous `:data`, requested/collected) and its return value is translated into
# `{:ok | :skipped, request, notifications, deps}`, `{:error, error}` or
# `{:error, error, request}`.
defp do_try_resolve_local(request, field, unresolved, internal?) do
%{deps: deps, resolver: resolver} = unresolved
with {:ok, new_request, notifications, []} <-
try_resolve(request, deps, internal?) do
resolver_context = resolver_context(new_request, deps)
log(request, fn -> "resolving #{field}" end)
# Currently, only the `data` field is resolved asynchronously
cond do
# `completion` is only resolved once the request itself has reached `:complete`.
field == :completion && new_request.state != :complete ->
{:skipped, new_request, notifications, []}
# Asynchronous data that has not been fetched yet: the resolver is not run here, the
# fetch is only marked as requested together with the context it needs.
# NOTE(review): whoever performs the fetch and sets `{:fetched, _}` is not visible here.
field == :data && new_request.async? &&
!match?({:fetched, _}, new_request.async_fetch_state) ->
if new_request.async_fetch_state in [:requested, :fetching] do
{:skipped, new_request, notifications, []}
else
{:skipped, %{new_request | async_fetch_state: {:requested, resolver_context}},
notifications, []}
end
true ->
# Here we reset async_fetch_state to `:not_requested`
# because one of two things is true:
# the resolver returned additional requests, in which case
# we will need to call it again later, *or* the resolution
# moves the request onto the next step in which case
# async_fetch_state is no longer in play.
{new_request, result} =
if field == :data && new_request.async? do
# The cond above guarantees the state is `{:fetched, result}` at this point.
{_, result} = new_request.async_fetch_state
{%{new_request | async_fetch_state: :not_requested}, result}
else
if field == :data do
# Synchronous data resolution is wrapped in a tracer span and a telemetry span.
Ash.Tracer.span :request_step,
request.name,
request.tracer do
metadata = %{
resource_short_name:
request.resource && Ash.Resource.Info.short_name(request.resource),
resource: request.resource,
actor: request.actor,
action: request.action && request.action.name,
authorize?: request.authorize?
}
Ash.Tracer.set_metadata(request.tracer, :request_step, metadata)
Ash.Tracer.telemetry_span [:ash, :request_step], %{name: request.name} do
try do
{%{new_request | async_fetch_state: :not_requested},
resolver.(resolver_context)}
rescue
# Exceptions are converted to an Ash error class and re-raised with context.
e ->
reraise Ash.Error.to_error_class(
Ash.Error.to_ash_error(e, __STACKTRACE__,
error_context: "resolving #{field} on #{request.name}"
)
),
__STACKTRACE__
end
end
end
else
# Every other field: run the resolver directly, with the same exception handling.
try do
{%{new_request | async_fetch_state: :not_requested},
resolver.(resolver_context)}
rescue
e ->
reraise Ash.Error.to_error_class(
Ash.Error.to_ash_error(e, __STACKTRACE__,
error_context: "resolving #{field} on #{request.name}"
)
),
__STACKTRACE__
end
end
end
# Normalize `{:requests, requests}` to the three element form handled below.
result =
case result do
{:requests, requests} ->
{:requests, requests, []}
other ->
other
end
case result do
# The resolver needs more dependencies: remember them on the field and report them.
{:new_deps, new_deps} ->
log(request, fn -> "New dependencies for #{field}: #{inspect(new_deps)}" end)
new_request =
Map.put(
new_request,
field,
%{unresolved | deps: Enum.uniq(unresolved.deps ++ new_deps)}
)
{:skipped, new_request, notifications, new_deps}
# The resolver wants additional requests to be run. Entries given as `{request, key}`
# also become dependencies of this field on that request's `key`.
{:requests, requests, new_deps} ->
log(request, fn ->
paths =
Enum.map(requests, fn
{request, _} ->
request.path
request ->
request.path
end)
"#{field} added requests #{inspect(paths)}"
end)
new_deps =
requests
|> Enum.flat_map(fn
{request, key} ->
[request.path ++ [key]]
_request ->
[]
end)
|> Enum.concat(new_deps)
new_unresolved =
Map.update!(
unresolved,
:deps,
&Enum.uniq(&1 ++ new_deps)
)
new_request = Map.put(new_request, field, new_unresolved)
# Added requests inherit this request's `error_path` unless they have their own.
new_requests =
Enum.map(requests, fn
{request, _} ->
%{request | error_path: request.error_path || new_request.error_path}
request ->
%{request | error_path: request.error_path || new_request.error_path}
end)
{:skipped, new_request,
notifications ++
[{:requests, new_requests}], new_deps}
# Success with instructions: `:extra_data`, `:set_keys`, `:notifications` and
# `:requests` are read from the instructions map.
{:ok, value, instructions} ->
log(request, fn ->
"successfully resolved #{field} with instructions"
end)
set_data_notifications =
Enum.map(Map.get(instructions, :extra_data, %{}), fn {key, value} ->
{:set_extra_data, key, value}
end)
new_request = Map.merge(new_request, instructions[:set_keys] || %{})
resource_notifications = Map.get(instructions, :notifications, [])
extra_requests = Map.get(instructions, :requests, [])
unless Enum.empty?(extra_requests) do
log(new_request, fn ->
"added requests #{inspect(Enum.map(extra_requests, & &1.path))}"
end)
end
request_notifications =
case extra_requests do
[] ->
[]
nil ->
[]
requests ->
[
{:requests,
Enum.map(requests, fn
{request, _} ->
%{request | error_path: request.error_path || new_request.error_path}
request ->
%{request | error_path: request.error_path || new_request.error_path}
end)}
]
end
handle_successful_resolve(
field,
value,
new_request,
notifications ++
resource_notifications ++
set_data_notifications ++ request_notifications
)
{:ok, value} ->
log(request, fn ->
"successfully resolved #{field}"
end)
handle_successful_resolve(
field,
value,
new_request,
notifications
)
# Errors that carry an updated changeset or query return a request holding it.
# NOTE(review): these build on `request` (from before the dependencies were resolved),
# not on `new_request` - confirm that is intended.
{:error, error, %{set: %{changeset: new_changeset}}} ->
{:error, error, %{request | changeset: new_changeset}}
{:error, error, %{set: %{query: new_query}}} ->
{:error, error, %{request | query: new_query}}
{:error, error} ->
log(request, fn ->
"error resolving #{field}:\n #{inspect(error)}"
end)
{:error, error}
# Anything else is not a valid resolver return value.
other ->
{:error,
Ash.Error.Framework.AssumptionFailed.exception(
message: """
Invalid return from request resolver for "#{request.name}" #{field}:
Got:
#{inspect(other)}
"""
)}
end
end
end
end
# Stores a freshly resolved value on the request. The value is post-processed first, then
# announced to every receiver waiting for the field, and the resulting notifications are
# appended to the ones collected so far.
defp handle_successful_resolve(field, value, new_request, notifications) do
  processed = process_resolved_field(field, value, new_request)
  {drained_request, outgoing} = notifications(new_request, field, processed)

  {:ok, Map.put(drained_request, field, processed), notifications ++ outgoing, []}
end
# Resolved queries and changesets get the request's `authorize?` and `actor` added to
# their context; every other value is stored unchanged.
defp process_resolved_field(:query, %Ash.Query{} = query, request),
  do: Ash.Query.set_context(query, resolution_context(request))

defp process_resolved_field(:changeset, %Ash.Changeset{} = changeset, request),
  do: Ash.Changeset.set_context(changeset, resolution_context(request))

defp process_resolved_field(_field, value, _request), do: value

# The context shared by resolved queries and changesets.
defp resolution_context(request) do
  %{authorize?: request.authorize?, actor: request.actor}
end
# Looks up the value of a dependency path. A dependency on this request's own field is read
# from the request itself (still unresolved fields count as missing); everything else comes
# from the data received from other requests. Returns `{:ok, value}` or `:error`.
defp get_dependency_data(request, dep) do
  if local_dep?(request, dep) do
    # Falls through with `{:ok, value}` or `:error` for anything but an unresolved field.
    with {:ok, %UnresolvedField{}} <- Map.fetch(request, List.last(dep)) do
      :error
    end
  else
    Map.fetch(request.dependency_data, dep)
  end
end
# Builds one `{receiver_path, sender_path, field, value}` notification for every receiver
# waiting for `field`, and removes the field from `dependencies_to_send`.
defp notifications(request, field, value) do
  waiting = request.dependencies_to_send

  if Map.has_key?(waiting, field) do
    {receiver_paths, still_waiting} = Map.pop(waiting, field)
    outgoing = for receiver <- receiver_paths, do: {receiver, request.path, field, value}

    {%{request | dependencies_to_send: still_waiting}, outgoing}
  else
    {request, []}
  end
end
# Builds the map handed to a resolver: every available dependency value nested at its
# path, then the request's `verbose?`, `actor`, `authorize?` and `async?`, and finally the
# request's `additional_context` deep-merged on top.
defp resolver_context(request, deps) do
  dependency_values =
    Enum.reduce(deps, %{}, fn dep, collected ->
      case get_dependency_data(request, dep) do
        :error -> collected
        {:ok, value} -> Ash.Engine.put_nested_key(collected, dep, value)
      end
    end)

  request_values = %{
    verbose?: request.verbose?,
    actor: request.actor,
    authorize?: request.authorize?,
    async?: request.async?
  }

  dependency_values
  |> Map.merge(request_values)
  |> Ash.Helpers.deep_merge_maps(request.additional_context || %{})
end
# A dependency is local when everything but its last element (the field name) equals this
# request's own path.
defp local_dep?(request, dep) do
  owner_path = :lists.droplast(dep)
  owner_path == request.path
end
@doc """
Sets the starting state of every authorizer of the request's resource.

With authorization enabled each authorizer gets its `Ash.Authorizer.initial_state/5`;
with authorization disabled each authorizer is marked as `:authorized` right away.
Requests without a resource are returned unchanged.
"""
def add_initial_authorizer_state(%{resource: nil} = request), do: request

def add_initial_authorizer_state(request) do
  authorizers = Ash.Resource.Info.authorizers(request.resource)

  starting_state =
    if request.authorize? do
      fn authorizer ->
        Authorizer.initial_state(
          authorizer,
          request.actor,
          request.resource,
          request.action,
          request.verbose?
        )
      end
    else
      fn _authorizer -> :authorized end
    end

  Enum.reduce(authorizers, request, fn authorizer, acc ->
    set_authorizer_state(acc, authorizer, starting_state.(authorizer))
  end)
end
# Lists the request fields the authorizer asks for in its strict check context that are
# still unresolved. Despite the `?` suffix this returns a list, not a boolean.
defp missing_strict_check_dependencies?(authorizer, request) do
  state = authorizer_state(request, authorizer)
  requested_keys = authorizer |> Authorizer.strict_check_context(state) |> List.wrap()

  for key <- requested_keys, match?(%UnresolvedField{}, Map.get(request, key)), do: key
end
# Lists the request fields the authorizer asks for in its check context that are still
# unresolved.
defp missing_check_dependencies(authorizer, request) do
  state = authorizer_state(request, authorizer)
  requested_keys = Authorizer.check_context(authorizer, state)

  for key <- requested_keys, match?(%UnresolvedField{}, Map.get(request, key)), do: key
end
# Calls the authorizer's strict check, passing only the request fields it asked for.
defp strict_check_authorizer(authorizer, request) do
  log(request, fn -> "strict checking for #{inspect(authorizer)}" end)

  state = authorizer_state(request, authorizer)
  context = Map.take(request, Authorizer.strict_check_context(authorizer, state))

  Authorizer.strict_check(authorizer, state, context)
end
# Calls the authorizer's runtime check, passing only the request fields it asked for.
defp check_authorizer(authorizer, request) do
  log(request, fn -> "checking for #{inspect(authorizer)}" end)

  state = authorizer_state(request, authorizer)
  context = Map.take(request, Authorizer.check_context(authorizer, state))

  Authorizer.check(authorizer, state, context)
end
# Stores `authorizer_state` as the state of `authorizer` on the request.
defp set_authorizer_state(request, authorizer, authorizer_state) do
  Map.update!(request, :authorizer_state, fn states ->
    Map.put(states, authorizer, authorizer_state)
  end)
end
# Returns the stored state of `authorizer`, or an empty map when none is stored.
defp get_authorizer_state(request, authorizer) do
  case Access.get(request.authorizer_state, authorizer) do
    missing when missing in [nil, false] -> %{}
    state -> state
  end
end
# Returns the stored state of `authorizer`, or an empty map when none is stored.
defp authorizer_state(request, authorizer) do
  case Map.fetch(request.authorizer_state, authorizer) do
    {:ok, state} when state not in [nil, false] -> state
    _missing -> %{}
  end
end
@doc """
Validates a list of requests before they are run.

Raises if a path is used by more than one request that writes to data, or if a
request depends on a path that no request provides. Returns `:ok` otherwise.
"""
def validate_requests!(requests) do
  validations = [&validate_unique_paths!/1, &validate_dependencies!/1]
  Enum.each(validations, fn validate! -> validate!.(requests) end)
end
# Raises `DuplicatedPath` listing every path that more than one request with
# `write_to_data?` set shares. Returns `:ok` when there is none.
defp validate_unique_paths!(requests) do
  duplicated_paths =
    for {path, sharing} <- Enum.group_by(requests, & &1.path),
        Enum.count(sharing, & &1.write_to_data?) > 1,
        do: path

  if duplicated_paths == [] do
    :ok
  else
    raise DuplicatedPath, paths: duplicated_paths
  end
end
# Checks the dependencies of every request against the full list of requests.
defp validate_dependencies!(requests) do
  for request <- requests, do: do_build_dependencies(request, requests)
  :ok
end
# Expands the dependencies of every unresolved field of the request; resolved fields are
# ignored. Always returns `:ok` unless `expand_deps/3` raises.
defp do_build_dependencies(request, requests, trail \\ []) do
  for {_field, %UnresolvedField{deps: deps}} <- Map.from_struct(request) do
    expand_deps(deps, requests, trail)
  end

  :ok
end
# Validates a list of dependency paths, following nested dependencies.
#
# Raises `ImpossiblePath` when a dependency refers to a request that does not exist, and
# `ArgumentError` when the dependencies form a cycle. Returns `:ok` otherwise.
defp expand_deps(deps, requests, trail) do
  case do_expand_deps(deps, requests, trail) do
    :ok ->
      :ok

    {:error, {:impossible, dep}} ->
      raise ImpossiblePath, impossible_path: dep, paths: Enum.map(requests, & &1.path)

    # `do_expand_dep/3` reports cycles as `{:circular, dep}`; without this clause they
    # surfaced as an opaque `CaseClauseError`.
    {:error, {:circular, dep}} ->
      raise ArgumentError,
        message: "Circular dependency detected between engine requests at #{inspect(dep)}"
  end
end
# Validates the dependencies one after another, stopping at the first error.
defp do_expand_deps([], _requests, _trail), do: :ok

defp do_expand_deps([dep | remaining], requests, trail) do
  case do_expand_dep(dep, requests, trail) do
    :ok -> do_expand_deps(remaining, requests, trail)
    {:error, error} -> {:error, error}
  end
end
# Validates a single dependency path.
#
# Returns `{:error, {:circular, dep}}` when the dependency is already on the trail,
# `{:error, {:impossible, dep}}` when no request has the dependency's path, and otherwise
# the result of validating the nested dependencies of the target field (`:ok` when the
# target field is not unresolved).
defp do_expand_dep(dep, requests, trail) do
  if Enum.member?(trail, dep) do
    {:error, {:circular, dep}}
  else
    owner_path = :lists.droplast(dep)
    field = List.last(dep)
    owner = Enum.find(requests, fn candidate -> candidate.path == owner_path end)

    case owner do
      nil ->
        {:error, {:impossible, dep}}

      %{^field => %UnresolvedField{deps: nested_deps}} ->
        do_expand_deps(nested_deps, requests, [dep | trail])

      _resolved_or_absent_field ->
        :ok
    end
  end
end
# Logs a lazily built message for verbose requests, prefixed with the request name (and
# the number of records when `data` is a list). Non-verbose requests log nothing and the
# message function is never called.
defp log(request, message, level \\ :debug)

defp log(%{verbose?: true, name: name} = request, message, level) do
  with_count? = is_list(request.data)

  Logger.log(level, fn ->
    text = message.()

    if with_count? do
      "#{name}: #{length(request.data)} #{text}"
    else
      "#{name}: #{text}"
    end
  end)
end

defp log(_request, _message, _level), do: false
end