defmodule Ash.Changeset do
@moduledoc """
Changesets are used to create and update data in Ash.
Create a changeset with `new/1` or `new/2`, and alter the attributes
and relationships using the functions provided in this module. Nothing in this module
actually incurs changes in a data layer. To commit a changeset, see `Ash.create/2`
and `Ash.update/2`.
# Changeset lifecycle
## Action Lifecycle
The following example illustrates the hook lifecycle of a changeset.
```elixir
defmodule AshChangesetLifeCycleExample do
def change(changeset, _, _) do
changeset
# execute code both before and after the transaction
|> Ash.Changeset.around_transaction(fn changeset, callback ->
callback.(changeset)
end)
# execute code before the transaction is started. Use for things like external calls
|> Ash.Changeset.before_transaction(fn changeset -> changeset end)
# execute code in the transaction, before and after the data layer is called
|> Ash.Changeset.around_action(fn changeset, callback ->
callback.(changeset)
end)
# execute code in the transaction, before the data layer is called
|> Ash.Changeset.before_action(fn changeset -> changeset end)
# execute code in the transaction, after the data layer is called, only if the action is successful
|> Ash.Changeset.after_action(fn changeset, result -> {:ok, result} end)
# execute code after the transaction, both in success and error cases
|> Ash.Changeset.after_transaction(fn changeset, success_or_error_result -> success_or_error_result end
end
end
```
"""
defstruct [
:__validated_for_action__,
:action_type,
:action,
:domain,
:data,
:handle_errors,
:resource,
:tenant,
:to_tenant,
:timeout,
dirty_hooks: [],
invalid_keys: MapSet.new(),
filter: nil,
added_filter: nil,
action_failed?: false,
atomics: [],
atomic_validations: [],
after_action: [],
after_transaction: [],
arguments: %{},
around_action: [],
around_transaction: [],
attributes: %{},
before_action: [],
before_transaction: [],
no_atomic_constraints: [],
context: %{},
context_changes: %{},
defaults: [],
errors: [],
params: %{},
action_select: [],
atomic_after_action: [],
attribute_changes: %{},
atomic_changes: [],
casted_attributes: %{},
casted_arguments: %{},
phase: :pending,
relationships: %{},
select: nil,
load: [],
valid?: true
]
defimpl Inspect do
import Inspect.Algebra
@spec inspect(Ash.Changeset.t(), Inspect.Opts.t()) ::
{:doc_cons, :doc_line | :doc_nil | binary | tuple,
:doc_line | :doc_nil | binary | tuple}
| {:doc_group,
:doc_line
| :doc_nil
| binary
| {:doc_collapse, pos_integer}
| {:doc_force, any}
| {:doc_break | :doc_color | :doc_cons | :doc_fits | :doc_group | :doc_string, any,
any}
| {:doc_nest, any, :cursor | :reset | non_neg_integer, :always | :break},
:inherit | :self}
def inspect(changeset, opts) do
context = Map.delete(changeset.context, :private)
context =
if context == %{} do
empty()
else
concat("context: ", to_doc(context, opts))
end
tenant =
if changeset.tenant do
concat(
"tenant: ",
to_doc(changeset.to_tenant, opts)
)
else
empty()
end
domain =
if changeset.domain do
concat("domain: ", to_doc(changeset.domain, opts))
else
empty()
end
select =
if changeset.select do
concat("select: ", to_doc(changeset.select, opts))
else
empty()
end
load =
if changeset.load && changeset.load != [] do
concat("load: ", to_doc(changeset.load, opts))
else
empty()
end
atomics =
if Enum.empty?(changeset.atomics) do
empty()
else
concat("atomics: ", to_doc(changeset.atomics, opts))
end
filter =
case changeset.filter do
nil ->
empty()
%Ash.Filter{expression: nil} ->
empty()
_ ->
concat("filter: ", to_doc(changeset.filter, opts))
end
container_doc(
"#Ash.Changeset<",
[
domain,
concat("action_type: ", inspect(changeset.action_type)),
concat("action: ", inspect(changeset.action && changeset.action.name)),
tenant,
concat("attributes: ", to_doc(changeset.attributes, opts)),
atomics,
concat("relationships: ", to_doc(changeset.relationships, opts)),
arguments(changeset, opts),
concat("errors: ", to_doc(changeset.errors, opts)),
filter,
concat("data: ", to_doc(changeset.data, opts)),
context,
concat("valid?: ", to_doc(changeset.valid?, opts)),
select,
load
],
">",
opts,
fn str, _ -> str end
)
end
defp arguments(changeset, opts) do
if changeset.action do
if Enum.empty?(changeset.action.arguments) do
empty()
else
arg_string =
changeset.action.arguments
|> Enum.filter(fn argument ->
match?({:ok, _}, Ash.Changeset.fetch_argument(changeset, argument.name))
end)
|> Map.new(fn argument ->
value = Ash.Changeset.get_argument(changeset, argument.name)
if argument.sensitive? do
{argument.name, Ash.Helpers.redact(value)}
else
{argument.name, value}
end
end)
|> to_doc(opts)
concat(["arguments: ", arg_string])
end
else
empty()
end
end
end
@type after_action_fun ::
(t, Ash.Resource.record() ->
{:ok, Ash.Resource.record()}
| {:ok, Ash.Resource.record(), [Ash.Notifier.Notification.t()]}
| {:error, any})
@type after_transaction_fun ::
(t, {:ok, Ash.Resource.record()} | {:error, any} ->
{:ok, Ash.Resource.record()} | {:error, any})
@type before_action_fun :: (t -> t | {t, %{notifications: [Ash.Notifier.Notification.t()]}})
@type before_transaction_fun :: (t -> t)
@type around_action_result ::
{:ok, Ash.Resource.record(), t(), %{notifications: list(Ash.Notifier.Notification.t())}}
| {:error, Ash.Error.t()}
@type around_action_callback :: (t -> around_action_result)
@type around_action_fun :: (t, around_action_callback -> around_action_result)
@type around_transaction_result :: {:ok, Ash.Resource.record()} | {:error, any}
@type around_transaction_callback :: (t -> around_transaction_result)
@type around_transaction_fun :: (t, around_transaction_callback -> around_transaction_result)
@phases [
:atomic,
:pending,
:validate,
:before_action,
:after_action,
:before_transaction,
:after_transaction,
:around_action,
:around_transaction
]
@type phase :: unquote(Enum.reduce(@phases, &{:|, [], [&1, &2]}))
@type t :: %__MODULE__{
__validated_for_action__: atom | nil,
action: Ash.Resource.Actions.action() | nil,
action_failed?: boolean,
action_type: Ash.Resource.Actions.action_type() | nil,
after_action: [after_action_fun | {after_action_fun, map}],
after_transaction: [after_transaction_fun | {after_transaction_fun, map}],
atomics: Keyword.t(),
domain: module | nil,
arguments: %{optional(atom) => any},
around_action: [around_action_fun | {around_action_fun, map}],
around_transaction: [around_transaction_fun | {around_transaction_fun, map}],
attributes: %{optional(atom) => any},
before_action: [before_action_fun | {before_action_fun, map}],
before_transaction: [before_transaction_fun | {before_transaction_fun, map}],
context: map,
filter: Ash.Filter.t() | nil,
added_filter: Ash.Filter.t() | nil,
data: Ash.Resource.record() | nil,
defaults: [atom],
errors: [Ash.Error.t()],
handle_errors:
nil | (t, error :: any -> :ignore | t | (error :: any) | {error :: any, t}),
invalid_keys: MapSet.t(),
params: %{optional(atom | binary) => any},
phase: phase(),
relationships: %{
optional(atom) =>
%{optional(atom | binary) => any} | [%{optional(atom | binary) => any}]
},
resource: module,
select: [atom] | nil,
load: keyword(keyword),
tenant: term(),
timeout: pos_integer() | nil,
valid?: boolean
}
@type error_info ::
String.t()
| [
{:field, atom()}
| {:fields, [atom()]}
| {:message, String.t()}
| {:value, any()}
]
| Ash.Error.t()
alias Ash.Error.{
Changes.InvalidArgument,
Changes.InvalidAttribute,
Changes.InvalidChanges,
Changes.InvalidRelationship,
Changes.NoSuchAttribute,
Changes.NoSuchRelationship,
Changes.Required,
Invalid.NoSuchInput,
Invalid.NoSuchResource
}
require Ash.Tracer
import Ash.Expr
require Logger
defmodule OriginalDataNotAvailable do
@moduledoc "A value placed in changeset.data to indicate that the original data is not available"
defstruct reason: :atomic_query_update
@type t :: %__MODULE__{reason: :atomic_query_update}
end
defmacrop maybe_already_validated_error!(changeset, alternative \\ nil) do
{function, arity} = __CALLER__.function
if alternative do
quote do
changeset = unquote(changeset)
{:current_stacktrace, stacktrace} =
Process.info(self(), :current_stacktrace)
if changeset.__validated_for_action__ do
require Logger
Logger.warning("""
Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.
For safety, we prevent any changes after that point because they will bypass validations or other action logic.. To proceed anyway,
you can use `#{unquote(alternative)}/#{unquote(arity)}`. However, you should prefer a pattern like the below, which makes
any custom changes *before* calling the action.
Resource
|> Ash.Changeset.new()
|> Ash.Changeset.#{unquote(function)}(...)
|> Ash.Changeset.for_create(...)
#{Exception.format_stacktrace(stacktrace)}
""")
end
end
else
quote do
changeset = unquote(changeset)
{:current_stacktrace, stacktrace} =
Process.info(self(), :current_stacktrace)
if changeset.__validated_for_action__ do
require Logger
Logger.warning("""
Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.
For safety, we prevent any changes using `#{unquote(function)}/#{unquote(arity)}` after that point because they will bypass validations or other action logic.
Instead, you should change or set this value before calling the action, like so:
Resource
|> Ash.Changeset.new()
|> Ash.Changeset.#{unquote(function)}(...)
|> Ash.Changeset.for_create(...)
#{Exception.format_stacktrace(stacktrace)}
""")
end
end
end
end
@doc """
A guard which checks if the Changeset is valid.
"""
@spec is_valid(t) :: Macro.output()
defguard is_valid(changeset) when is_struct(changeset, __MODULE__) and changeset.valid? == true
@doc """
Returns a new changeset over a resource.
*Warning*: You almost always want to use `for_action` or `for_create`, etc. over this function if possible.
You can use this to start a changeset and make changes prior to calling `for_action`. This is not typically
necessary, but can be useful as an escape hatch. For example:
```elixir
Resource
|> Ash.Changeset.new()
|> Ash.Changeset.change_attribute(:name, "foobar")
|> Ash.Changeset.for_action(...)
```
"""
@spec new(Ash.Resource.t() | Ash.Resource.record()) :: t
def new(record_or_resource) do
{resource, record, action_type} =
case record_or_resource do
%resource{} = record -> {resource, record, :update}
resource -> {resource, struct(resource), :create}
end
tenant =
record
|> Map.get(:__metadata__, %{})
|> Map.get(:tenant, nil)
context = Ash.Resource.Info.default_context(resource) || %{}
if Ash.Resource.Info.resource?(resource) do
%__MODULE__{resource: resource, data: record, action_type: action_type}
|> set_context(context)
|> set_tenant(tenant)
else
%__MODULE__{
resource: resource,
action_type: action_type,
data: struct(resource)
}
|> add_error(NoSuchResource.exception(resource: resource))
|> set_tenant(tenant)
|> set_context(context)
end
end
@doc """
Ensure that only the specified attributes are present in the results.
The first call to `select/2` will replace the default behavior of selecting
all attributes. Subsequent calls to `select/2` will combine the provided
fields unless the `replace?` option is provided with a value of `true`.
If a field has been deselected, selecting it again will override that (because a single list of fields is tracked for selection)
Primary key attributes always selected and cannot be deselected.
When attempting to load a relationship (or manage it with `Ash.Changeset.manage_relationship/3`),
if the source field is not selected on the query/provided data an error will be produced. If loading
a relationship with a query, an error is produced if the query does not select the destination field
of the relationship.
Datalayers currently are not notified of the `select` for a changeset(unlike queries), and creates/updates select all fields when they are performed.
A select provided on a changeset sets the unselected fields to `nil` before returning the result.
Use `ensure_selected/2` if you wish to make sure a field has been selected, without deselecting any other fields.
"""
def select(changeset, fields, opts \\ []) do
if opts[:replace?] do
case fields do
%MapSet{} = fields -> %{changeset | select: Enum.to_list(fields)}
fields -> %{changeset | select: Enum.uniq(List.wrap(fields))}
end
else
case fields do
%MapSet{} ->
%{
changeset
| select: MapSet.union(MapSet.new(changeset.select || []), fields) |> MapSet.to_list()
}
fields ->
%{changeset | select: Enum.uniq(List.wrap(fields) ++ (changeset.select || []))}
end
end
end
@doc false
def set_action_select(%{action: nil} = changeset) do
%{
changeset
| action_select:
MapSet.to_list(
Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
)
}
end
def set_action_select(changeset) do
if Ash.DataLayer.data_layer_can?(changeset.resource, :action_select) do
required =
Ash.Resource.Info.action_select(changeset.resource, changeset.action.name) || []
select =
changeset.select ||
MapSet.to_list(
Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
)
%{
changeset
| action_select: Enum.uniq(List.wrap(required) |> Enum.concat(select))
}
else
%{
changeset
| action_select:
MapSet.to_list(
Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
)
}
end
end
@doc """
Calls the provided load statement on the result of the action at the very end of the action.
"""
@spec load(t(), term()) :: t()
def load(changeset, load) do
query =
changeset.resource
|> Ash.Query.new()
|> Map.put(:errors, [])
|> Ash.Query.load(changeset.load)
|> Ash.Query.load(load)
changeset = %{
changeset
| load: Enum.concat(changeset.load || [], List.wrap(load))
}
Enum.reduce(query.errors, changeset, &add_error(&2, &1))
end
@doc """
Ensures that the given attributes are selected.
The first call to `select/2` will *limit* the fields to only the provided fields.
Use `ensure_selected/2` to say "select this field (or these fields) without deselecting anything else".
See `select/2` for more.
"""
def ensure_selected(changeset, fields) do
if changeset.select do
Ash.Changeset.select(changeset, List.wrap(fields))
else
to_select = Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
Ash.Changeset.select(changeset, to_select)
end
end
@doc """
Ensure the the specified attributes are `nil` in the changeset results.
"""
def deselect(changeset, fields) do
select =
if changeset.select do
changeset.select
else
changeset.resource
|> Ash.Resource.Info.attributes()
|> Enum.map(& &1.name)
end
select = select -- List.wrap(fields)
select(changeset, select, replace?: true)
end
def selecting?(changeset, field) do
case changeset.select do
nil ->
not is_nil(Ash.Resource.Info.attribute(changeset.resource, field))
select ->
if field in select do
true
else
attribute = Ash.Resource.Info.attribute(changeset.resource, field)
attribute && attribute.primary_key?
end
end || loading?(changeset, field)
end
@doc """
Returns true if the field/relationship or path to field/relationship is being loaded.
It accepts an atom or a list of atoms, which is treated for as a "path", i.e:
Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :enemies, :score])
iex> true
Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :score])
iex> false
Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?(:friends)
iex> true
"""
def loading?(changeset, path) do
changeset.resource
|> Ash.Query.new()
|> Ash.Query.load(changeset.load)
|> Ash.Query.loading?(path)
end
@doc """
Returns a list of attributes, aggregates, relationships, and calculations that are being loaded
Provide a list of field types to narrow down the returned results.
"""
def accessing(
changeset,
types \\ [:attributes, :relationships, :calculations, :attributes],
only_public? \\ true
) do
changeset.resource
|> Ash.Query.new()
|> Ash.Query.load(changeset.load)
|> Map.put(:select, changeset.select)
|> Ash.Query.accessing(types, only_public?)
end
@spec fully_atomic_changeset(
resource :: Ash.Resource.t(),
action :: atom() | Ash.Resource.Actions.action(),
params :: map(),
opts :: Keyword.t()
) :: Ash.Changeset.t() | {:not_atomic, String.t()}
def fully_atomic_changeset(resource, action, params, opts \\ []) do
action =
case action do
action when is_atom(action) -> Ash.Resource.Info.action(resource, action)
action -> action
end
if action.manual do
{:not_atomic,
"manual action `#{inspect(resource)}.#{action.name}` cannot be performed atomically"}
else
changeset =
resource
|> Ash.Changeset.new()
|> then(fn changeset ->
if data = opts[:data] do
Map.put(changeset, :data, data)
else
Map.put(changeset, :data, %OriginalDataNotAvailable{})
end
end)
|> Map.put(:context, opts[:context] || %{})
|> Map.put(:params, params)
|> Map.put(:action, action)
|> Map.put(:no_atomic_constraints, opts[:no_atomic_constraints] || [])
|> Map.put(:action_type, action.type)
|> Map.put(:atomics, opts[:atomics] || [])
|> Ash.Changeset.set_tenant(opts[:tenant])
{changeset, _opts} =
Ash.Actions.Helpers.set_context_and_get_opts(
opts[:domain] || Ash.Resource.Info.domain(resource),
changeset,
opts
)
changeset = set_phase(changeset, :atomic)
with :ok <- verify_notifiers_support_atomic(resource, action),
%Ash.Changeset{} = changeset <-
atomic_params(changeset, action, params, opts),
%Ash.Changeset{} = changeset <- set_argument_defaults(changeset, action),
%Ash.Changeset{} = changeset <- require_arguments(changeset, action),
%Ash.Changeset{} = changeset <- atomic_changes(changeset, action),
%Ash.Changeset{} = changeset <- atomic_defaults(changeset),
%Ash.Changeset{} = changeset <- atomic_update(changeset, opts[:atomic_update] || []),
%Ash.Changeset{} = changeset <-
hydrate_atomic_refs(changeset, opts[:actor], Keyword.take(opts, [:eager?])),
%Ash.Changeset{} = changeset <- apply_atomic_constraints(changeset, opts[:actor]) do
changeset
else
{:not_atomic, reason} ->
{:not_atomic, reason}
end
end
|> case do
{:not_atomic, reason} ->
{:not_atomic, reason}
changeset ->
clear_phase(changeset)
end
end
defp atomic_defaults(changeset) do
with %__MODULE__{} <- atomic_static_update_defaults(changeset) do
atomic_lazy_update_defaults(changeset)
end
end
defp atomic_static_update_defaults(changeset) do
changeset.resource
|> Ash.Resource.Info.static_default_attributes(:update)
|> Enum.reject(fn attribute ->
Ash.Changeset.changing_attribute?(changeset, attribute.name)
end)
|> Enum.reduce_while(changeset, fn attribute, changeset ->
case Ash.Type.cast_atomic(
attribute.type,
attribute.update_default,
attribute.constraints
) do
{:atomic, atomic} ->
{:cont, atomic_update(changeset, attribute.name, {:atomic, atomic})}
{:ok, value} ->
allow_nil? =
attribute.allow_nil? and attribute.name not in changeset.action.require_attributes
if is_nil(value) and !allow_nil? do
{:cont, add_required_attribute_error(changeset, attribute)}
else
{:cont,
%{
changeset
| attributes: Map.put(changeset.attributes, attribute.name, value),
atomics: Keyword.delete(changeset.atomics, attribute.name)
}
|> store_casted_attribute(attribute.name, value, true)}
end
{:error, error} ->
{:cont,
add_invalid_errors(attribute.update_default, :attribute, changeset, attribute, error)}
{:not_atomic, reason} ->
{:halt, {:not_atomic, reason}}
end
end)
end
defp atomic_lazy_update_defaults(changeset) do
changeset.resource
|> Ash.Resource.Info.lazy_matching_default_attributes(:update)
|> Enum.concat(
Ash.Resource.Info.lazy_non_matching_default_attributes(changeset.resource, :update)
)
|> Enum.reject(fn attribute ->
Ash.Changeset.changing_attribute?(changeset, attribute.name)
end)
|> Enum.reduce_while(changeset, fn attribute, changeset ->
cond do
attribute.update_default == (&DateTime.utc_now/0) ->
{:cont, atomic_update(changeset, attribute.name, Ash.Expr.expr(now()))}
attribute.update_default == (&Ash.UUID.generate/0) ->
{:cont, atomic_update(changeset, attribute.name, Ash.Expr.expr(^Ash.UUID.generate()))}
true ->
{:halt,
{:not_atomic,
"update_default for `#{inspect(attribute.name)}` cannot be done atomically: #{inspect(attribute.update_default)}"}}
end
end)
end
defp verify_notifiers_support_atomic(resource, action) do
resource
|> Ash.Resource.Info.notifiers()
|> Enum.filter(fn notifier ->
notifier.requires_original_data?(resource, action)
end)
|> case do
[] ->
:ok
notifiers ->
{:not_atomic,
"notifiers #{inspect(notifiers)} require original data for #{inspect(resource)}.#{action.name}"}
end
end
defp atomic_changes(changeset, action) do
changes =
action.changes
|> Enum.concat(Ash.Resource.Info.changes(changeset.resource, changeset.action_type))
|> then(fn changes ->
if changeset.action.skip_global_validations? do
changes
else
Enum.concat(
changes,
Ash.Resource.Info.validations(changeset.resource, changeset.action_type)
)
end
end)
context = %{
actor: changeset.context[:private][:actor],
tenant: changeset.tenant,
authorize?: changeset.context[:private][:authorize?] || false,
tracer: changeset.context[:private][:tracer]
}
changeset = set_phase(changeset, :atomic)
Enum.reduce_while(changes, changeset, fn
%{change: _} = change, changeset ->
case run_atomic_change(changeset, change, context) do
{:not_atomic, reason} ->
{:halt, {:not_atomic, reason}}
changeset ->
{:cont, changeset}
end
%{validation: _} = validation, changeset ->
case run_atomic_validation(changeset, validation, context) do
{:not_atomic, reason} ->
{:halt, {:not_atomic, reason}}
changeset ->
{:cont, changeset}
end
end)
|> case do
{:not_atomic, reason} -> {:not_atomic, reason}
%__MODULE__{} = changeset -> clear_phase(changeset)
end
end
@doc false
def split_atomic_conditions(%{where: []} = validation, _changeset, _actor, _context) do
{:ok, validation}
end
def split_atomic_conditions(
%{where: [{module, opts} | rest]} = validation,
changeset,
actor,
context
) do
if module.has_validate?() do
opts =
Ash.Actions.Helpers.templated_opts(opts, actor, changeset.arguments, changeset.context)
{:ok, opts} = module.init(opts)
case module.validate(
changeset,
opts,
context
) do
:ok -> split_atomic_conditions(%{validation | where: rest}, changeset, actor, context)
_ -> :skip
end
else
if module.atomic?() do
case split_atomic_conditions(%{validation | where: rest}, changeset, actor, context) do
{:ok, %{where: remaining} = validation} ->
{:ok, %{validation | where: [{module, opts} | remaining]}}
other ->
other
end
else
raise "Module #{module} must define one of `atomic/3` or `validate/3`"
end
end
end
@doc false
def run_atomic_validation(changeset, %{where: where} = validation, context) do
with {:atomic, condition} <- atomic_condition(where, changeset, context) do
case condition do
false ->
changeset
true ->
do_run_atomic_validation(changeset, validation, context)
where_condition ->
do_run_atomic_validation(changeset, validation, context, where_condition)
end
end
end
defp do_run_atomic_validation(
changeset,
%{validation: {module, validation_opts}, message: message},
context,
where_condition \\ nil
) do
case List.wrap(
module.atomic(
changeset,
validation_opts,
struct(Ash.Resource.Validation.Context, Map.put(context, :message, message))
)
) do
[{:atomic, _, _, _} | _] = atomics ->
Enum.reduce(atomics, changeset, fn
{:atomic, _fields, condition_expr, error_expr}, changeset ->
condition_expr =
if where_condition do
expr(^where_condition and ^condition_expr)
else
condition_expr
end
condition_expr = rewrite_atomics(changeset, condition_expr)
validate_atomically(changeset, condition_expr, error_expr)
end)
[:ok] ->
changeset
[{:error, error}] ->
if message do
error = override_validation_message(error, message)
Ash.Changeset.add_error(changeset, error)
else
Ash.Changeset.add_error(changeset, error)
end
[{:not_atomic, error}] ->
{:not_atomic, error}
end
end
defp rewrite_atomics(changeset, expr) do
Ash.Expr.walk_template(expr, fn
{:_atomic_ref, ref} ->
atomic_ref(changeset, ref)
other ->
other
end)
end
def run_atomic_change(
changeset,
%{change: {module, change_opts}, where: where},
context
) do
change_opts =
Ash.Expr.fill_template(
change_opts,
changeset.context.private[:actor],
changeset.arguments,
changeset.context
)
with {:atomic, changeset, atomic_changes, validations} <-
atomic_with_changeset(
module.atomic(changeset, change_opts, struct(Ash.Resource.Change.Context, context)),
changeset
),
{:atomic, condition} <- atomic_condition(where, changeset, context) do
changeset =
case condition do
true ->
apply_atomic_update(changeset, atomic_changes)
false ->
changeset
condition ->
atomic_changes =
Map.new(atomic_changes, fn {key, value} ->
new_value =
expr(
if ^condition do
^value
else
^ref(key)
end
)
{key, new_value}
end)
apply_atomic_update(changeset, atomic_changes)
end
Enum.reduce(
List.wrap(validations),
changeset,
fn {:atomic, _, condition_expr, error_expr}, changeset ->
validate_atomically(changeset, condition_expr, error_expr)
end
)
else
{:ok, changeset} ->
changeset
{:not_atomic, reason} ->
{:not_atomic, reason}
:ok ->
changeset
end
end
defp apply_atomic_update(changeset, atomics) when is_list(atomics) or is_map(atomics) do
Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
apply_atomic_update(changeset, key, value)
end)
end
defp apply_atomic_update(changeset, key, {:atomic, value}) do
%{
changeset
| atomics: Keyword.put(changeset.atomics, key, value),
no_atomic_constraints: [key | changeset.no_atomic_constraints]
}
|> record_atomic_update_for_atomic_upgrade(key, value)
end
defp apply_atomic_update(changeset, key, value) do
attribute = Ash.Resource.Info.attribute(changeset.resource, key)
value =
Ash.Expr.walk_template(value, fn
{:_atomic_ref, field} ->
atomic_ref(changeset, field)
other ->
other
end)
case Ash.Type.cast_atomic(attribute.type, value, attribute.constraints) do
{:atomic, value} ->
value =
if attribute.primary_key? do
value
else
set_error_field(value, attribute.name)
end
%{changeset | atomics: Keyword.put(changeset.atomics, attribute.name, value)}
|> record_atomic_update_for_atomic_upgrade(attribute.name, value)
{:not_atomic, message} ->
{:not_atomic,
"Cannot atomically update #{inspect(changeset.resource)}.#{attribute.name}: #{message}"}
{:ok, value} ->
allow_nil? =
attribute.allow_nil? and attribute.name not in changeset.action.require_attributes
if is_nil(value) and !allow_nil? do
add_required_attribute_error(changeset, attribute)
else
%{changeset | attributes: Map.put(changeset.attributes, attribute.name, value)}
|> store_casted_attribute(attribute.name, value, true)
end
{:error, error} ->
{:cont, add_invalid_errors(value, :attribute, changeset, attribute, error)}
end
end
defp atomic_with_changeset({:atomic, changeset, atomics}, _changeset),
do: {:atomic, changeset, atomics, []}
defp atomic_with_changeset({:atomic, atomics}, changeset), do: {:atomic, changeset, atomics, []}
defp atomic_with_changeset(other, _), do: other
defp validate_atomically(changeset, condition_expr, error_expr) do
%{
changeset
| atomic_validations: [{condition_expr, error_expr} | changeset.atomic_validations]
}
end
@doc """
Gets a reference to a field, or the current atomic update expression of that field.
"""
def atomic_ref(changeset, field) do
case Keyword.fetch(changeset.atomics, field) do
{:ok, atomic} ->
attribute = Ash.Resource.Info.attribute(changeset.resource, field)
Ash.Expr.expr(type(^atomic, ^attribute.type, ^attribute.constraints))
:error ->
case Map.fetch(changeset.attributes, field) do
{:ok, new_value} ->
attribute = Ash.Resource.Info.attribute(changeset.resource, field)
Ash.Expr.expr(type(^new_value, ^attribute.type, ^attribute.constraints))
:error ->
expr(^ref(field))
end
end
end
@doc false
# Returns either an appropriate expression for an atomic condition or a value
# indicated that the condition cannot be handled atomically.
#
# Validation logic matches on failure. So, for example, `present(:field)` is
# going to _match_ when `:field` is `nil`. However, when applying this logic
# to a `where` condition, the opposite is desired. The end result is kinda
# ugly because it can end up reading like "not is not equal to" but
# ultimately produces the correct results.
@spec atomic_condition([{module(), keyword()}], Ash.Changeset.t(), map()) ::
{:atomic, Ash.Expr.t() | boolean()} | {:not_atomic, String.t()}
def atomic_condition(where, changeset, context) do
Enum.reduce_while(where, {:atomic, true}, fn {module, validation_opts},
{:atomic, condition_expr} ->
case module.atomic(
changeset,
validation_opts,
struct(Ash.Resource.Validation.Context, context)
) do
:ok ->
{:cont, {:atomic, condition_expr}}
{:atomic, _, expr, _as_error} ->
{:cont, {:atomic, atomic_condition_expr(condition_expr, expr)}}
{:error, _} ->
# Error from the validator, so the validations should just fail with
# a `false` expression.
{:halt, {:atomic, false}}
{:not_atomic, _reason} = not_atomic ->
{:halt, not_atomic}
atomic_conditions when is_list(atomic_conditions) ->
atomic_conditions
|> Enum.reduce(condition_expr, fn {:atomic, _, expr, _as_error}, reduced_expr ->
atomic_condition_expr(reduced_expr, expr)
end)
|> then(&{:cont, {:atomic, &1}})
end
end)
end
# This is not expressly necessary as `expr(true and not ^new_expr)` would also
# work just fine, but the final output from omitting `true` is much easier to
# read if debugging.
defp atomic_condition_expr(true, expr) do
expr(not (^expr))
end
defp atomic_condition_expr(condition_expr, expr) do
expr(^condition_expr and not (^expr))
end
defp atomic_params(changeset, action, params, opts) do
if opts[:assume_casted?] do
Enum.reduce_while(params, changeset, fn {key, value}, changeset ->
cond do
has_argument?(action, key) ->
{:cont, %{changeset | arguments: Map.put(changeset.arguments, key, value)}}
attribute = Ash.Resource.Info.attribute(changeset.resource, key) ->
{:cont, atomic_update(changeset, attribute.name, {:atomic, value})}
match?("_" <> _, key) ->
{:cont, changeset}
:* in List.wrap(opts[:skip_unknown_inputs]) ->
{:cont, changeset}
key in List.wrap(opts[:skip_unknown_inputs]) ->
{:cont, changeset}
true ->
{:cont,
add_error(
changeset,
NoSuchInput.exception(
resource: changeset.resource,
action: action.name,
input: key,
inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
)
)}
end
end)
else
Enum.reduce_while(params, changeset, fn {key, value}, changeset ->
cond do
has_argument?(action, key) ->
{:cont, set_argument(changeset, key, value)}
attribute = Ash.Resource.Info.attribute(changeset.resource, key) ->
cond do
attribute.name in action.accept ->
{:cont, atomic_update(changeset, attribute.name, value)}
:* in List.wrap(opts[:skip_unknown_inputs]) ->
{:cont, changeset}
key in List.wrap(opts[:skip_unknown_inputs]) ->
{:cont, changeset}
match?("_" <> _, key) ->
{:cont, changeset}
true ->
{:cont,
add_error(
changeset,
NoSuchInput.exception(
resource: changeset.resource,
action: action.name,
input: key,
inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
)
)}
end
match?("_" <> _, key) ->
{:cont, changeset}
:* in List.wrap(opts[:skip_unknown_inputs]) ->
{:cont, changeset}
key in List.wrap(opts[:skip_unknown_inputs]) ->
{:cont, changeset}
true ->
{:cont,
add_error(
changeset,
NoSuchInput.exception(
resource: changeset.resource,
action: action.name,
input: key,
inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
)
)}
end
end)
end
end
defp set_error_field(expr, field) do
Ash.Filter.map(expr, fn
%Ash.Query.Function.Error{arguments: [module, nested_expr]} = func
when is_map(nested_expr) and not is_struct(nested_expr) ->
%{func | arguments: [module, Map.put(nested_expr, :field, field)]}
other ->
other
end)
end
@manage_types [:append_and_remove, :append, :remove, :direct_control, :create]
@doc """
Constructs a changeset for a given action, and validates it.
Calls `for_create/4`, `for_update/4` or `for_destroy/4` based on the type of action passed in.
See those functions for more explanation.
"""
def for_action(initial, action, params \\ %{}, opts \\ []) do
resource =
case initial do
%__MODULE__{resource: resource} -> resource
%resource{} -> resource
resource -> resource
end
action = get_action_entity(resource, action)
case action.type do
:create ->
for_create(initial, action, params, opts)
:update ->
for_update(initial, action, params, opts)
:destroy ->
for_destroy(initial, action, params, opts)
:read ->
raise ArgumentError,
"Passed a read action `#{inspect(resource)}.#{action.name}` into `Ash.Changeset.for_action/4`. Use `Ash.Query.for_read/4` instead."
end
end
@for_create_opts [
require?: [
type: :boolean,
default: false,
doc:
"If set to `false`, values are only required when the action is run (instead of immediately)."
],
actor: [
type: :any,
doc:
"set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
],
authorize?: [
type: :any,
doc:
"set authorize?, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
],
tracer: [
type: {:wrap_list, {:behaviour, Ash.Tracer}},
doc:
"A tracer to use. Will be carried over to the action. For more information see `Ash.Tracer`."
],
tenant: [
type: {:protocol, Ash.ToTenant},
doc: "set the tenant on the changeset"
],
skip_unknown_inputs: [
type: {:wrap_list, {:or, [:atom, :string]}},
doc:
"A list of inputs that, if provided, will be ignored if they are not recognized by the action. Use `:*` to indicate all unknown keys."
],
context: [
type: :map,
doc: "Context to set on the query, changeset, or input"
],
private_arguments: [
type: :map,
doc: "Private argument values to set before validations and changes.",
default: %{}
],
return_skipped_upsert?: [
type: :boolean,
default: false,
doc:
"If `true`, and a record was *not* upserted because its filter prevented the upsert, the original record (which was *not* upserted) will be returned."
]
]
@doc false
def for_create_opts, do: @for_create_opts
@doc """
Constructs a changeset for a given create action, and validates it.
Anything that is modified prior to `for_create/4` is validated against the rules of the action, while *anything after it is not*.
This runs any `change`s contained on your action. To have your logic execute *only* during the action, you can use `after_action/2`
or `before_action/2`.
Multitenancy is *not* validated until an action is called. This allows you to avoid specifying a tenant until just before calling
the domain action.
### Params
`params` may be attributes, relationships, or arguments. You can safely pass user/form input directly into this function.
Only public attributes and relationships are supported. If you want to change private attributes as well, see the
Customization section below. `params` are stored directly as given in the `params` field of the changeset, which is used
### Opts
#{Spark.Options.docs(@for_create_opts)}
### Customization
A changeset can be provided as the first argument, instead of a resource, to allow
setting specific attributes ahead of time.
For example:
MyResource
|> Ash.Changeset.new()
|> Ash.Changeset.change_attribute(:foo, 1)
|> Ash.Changeset.for_create(:create, ...opts)
Once a changeset has been validated by `for_create/4` (or `for_update/4`), it isn't validated again in the action.
New changes added are validated individually, though. This allows you to create a changeset according
to a given action, and then add custom changes if necessary.
### What does this function do?
The following steps are run when calling `Ash.Changeset.for_create/4`.
- Cast input params | This is any arguments in addition to any accepted attributes
- Set argument defaults
- Require any missing arguments
- Validate all provided attributes are accepted
- Require any accepted attributes that are `allow_nil?` false
- Set any default values for attributes
- Run action changes & validations
- Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.create.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
"""
def for_create(initial, action, params \\ %{}, opts \\ []) do
changeset =
case initial do
%__MODULE__{action_type: :create} = changeset ->
changeset
resource when is_atom(resource) ->
new(resource)
other ->
raise ArgumentError,
message: """
Initial must be a changeset with the action type of `:create`, or a resource.
Got: #{inspect(other)}
"""
end
action =
get_action_entity(changeset.resource, action) ||
raise_no_action(changeset.resource, action, :create)
upsert_condition =
case opts[:upsert_condition] do
nil -> action && action.upsert_condition
other -> other
end
case action do
%Ash.Resource.Actions.Update{name: name} ->
raise ArgumentError,
message: """
Action #{inspect(changeset.resource)}.#{name} was passed to `Ash.Changeset.for_create`, but it is an update action.
Perhaps you meant to call `Ash.Changeset.for_create` instead?
"""
_ ->
:ok
end
changeset
|> set_context(%{
private: %{
upsert?: opts[:upsert?] || (action && action.upsert?) || false,
return_skipped_upsert?:
opts[:return_skipped_upsert?] || (action && action.return_skipped_upsert?) || false,
upsert_identity: opts[:upsert_identity] || (action && action.upsert_identity),
upsert_fields:
expand_upsert_fields(
opts[:upsert_fields] || (action && action.upsert_fields),
changeset.resource
),
upsert_condition: upsert_condition
}
})
|> then(fn
changeset when upsert_condition != nil -> filter(changeset, upsert_condition)
changeset -> changeset
end)
|> do_for_action(action, params, opts)
end
@for_update_opts @for_create_opts
@doc false
def for_update_opts, do: @for_update_opts
@doc """
Constructs a changeset for a given update action, and validates it.
Anything that is modified prior to `for_update/4` is validated against the rules of the action, while *anything after it is not*.
### What does this function do?
The following steps are run when calling `Ash.Changeset.for_update/4`.
- Cast input params | This is any arguments in addition to any accepted attributes
- Set argument defaults
- Require any missing arguments
- Validate all provided attributes are accepted
- Require any accepted attributes that are `allow_nil?` false
- Set any default values for attributes
- Run action changes & validations
- Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.update.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
"""
def for_update(initial, action, params \\ %{}, opts \\ []) do
changeset =
case initial do
# We accept :destroy here to support soft deletes
%__MODULE__{action_type: type} = changeset when type in [:update, :destroy] ->
changeset
%mod{} = struct when mod != __MODULE__ ->
new(struct)
other ->
raise ArgumentError,
message: """
Initial must be a changeset with the action type of `:update` or `:destroy`, or a record.
Got: #{inspect(other)}
"""
end
do_for_action(changeset, action, params, opts)
end
@doc """
Constructs a changeset for a given destroy action, and validates it.
### Opts
* `:actor` - set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)
* `:tenant` - set the tenant on the changeset
* `:private_arguments` - set private arguments on the changeset before validations and changes are run
Anything that is modified prior to `for_destroy/4` is validated against the rules of the action, while *anything after it is not*.
Once a changeset has been validated by `for_destroy/4`, it isn't validated again in the action.
New changes added are validated individually, though. This allows you to create a changeset according
to a given action, and then add custom changes if necessary.
### What does this function do?
The following steps are run when calling `Ash.Changeset.for_destroy/4`.
- Cast input params | This is any arguments in addition to any accepted attributes
- Set argument defaults
- Require any missing arguments
- Validate all provided attributes are accepted
- Require any accepted attributes that are `allow_nil?` false
- Set any default values for attributes
- Run action changes & validations
- Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.destroy.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
"""
def for_destroy(initial, action_or_name, params \\ %{}, opts \\ []) do
changeset =
case initial do
%__MODULE__{} = changeset ->
changeset
|> Map.put(:action_type, :destroy)
%_{} = struct ->
struct
|> new()
|> Map.put(:action_type, :destroy)
other ->
raise ArgumentError,
message: """
Initial must be a changeset with the action type of `:destroy`, or a record.
Got: #{inspect(other)}
"""
end
action =
get_action_entity(changeset.resource, action_or_name) ||
raise_no_action(changeset.resource, action_or_name, :destroy)
domain =
changeset.domain || opts[:domain] || Ash.Resource.Info.domain(changeset.resource) ||
Ash.Actions.Helpers.maybe_embedded_domain(changeset.resource) ||
raise ArgumentError,
message:
"Could not determine domain for changeset. Provide the `domain` option or configure a domain in the resource directly."
changeset = %{changeset | domain: domain}
if changeset.valid? do
if action do
try do
if action.soft? do
do_for_action(%{changeset | action_type: :destroy}, action, params, opts)
else
{changeset, opts} =
Ash.Actions.Helpers.set_context_and_get_opts(
domain,
changeset,
opts
)
name =
fn ->
"changeset:" <>
Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"
end
Ash.Tracer.span :changeset,
name,
opts[:tracer] do
Ash.Tracer.telemetry_span [:ash, :changeset], fn ->
%{
resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
}
end do
metadata = fn ->
%{
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
resource: changeset.resource,
actor: opts[:actor],
tenant: opts[:tenant],
action: action.name,
authorize?: opts[:authorize?]
}
end
Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)
changeset =
Enum.reduce(opts[:private_arguments] || %{}, changeset, fn {k, v}, changeset ->
Ash.Changeset.set_argument(changeset, k, v)
end)
changeset
|> Map.put(:action, action)
|> handle_errors(action.error_handler)
|> set_actor(opts)
|> set_authorize(opts)
|> set_tracer(opts)
|> set_tenant(opts[:tenant] || changeset.tenant)
|> cast_params(action, params, opts)
|> set_argument_defaults(action)
|> require_arguments(action)
|> validate_attributes_accepted(action)
|> run_action_changes(
action,
opts[:actor],
opts[:authorize?],
opts[:tracer],
metadata
)
|> add_validations(opts[:tracer], metadata, opts[:actor])
|> mark_validated(action.name)
|> Map.put(:__validated_for_action__, action.name)
end
end
end
rescue
e ->
reraise Ash.Error.to_error_class(e,
stacktrace: __STACKTRACE__,
bread_crumbs: [
"building changeset for #{inspect(changeset.resource)}.#{action.name}"
]
),
__STACKTRACE__
end
else
raise_no_action(changeset.resource, action_or_name, :destroy)
end
else
changeset
end
end
@doc """
Adds multiple atomic changes to the changeset
See `atomic_update/3` for more information.
"""
@spec atomic_update(t(), map() | Keyword.t()) :: t()
def atomic_update(changeset, atomics) when is_list(atomics) or is_map(atomics) do
Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
atomic_update(changeset, key, value)
end)
end
@doc """
Adds an atomic change to the changeset.
Atomic changes are applied by the data layer, and as such have guarantees that are not
given by changes that are based on looking at the previous value and updating it. Here
is an example of a change that is not safe to do concurrently:
```elixir
change fn changeset, _ ->
Ash.Changeset.change_attribute(changeset, :score, changeset.data.score + 1)
end
```
If two processes run this concurrently, they will both read the same value of `score`, and
set the new score to the same value. This means that one of the increments will be lost.
If you were to instead do this using `atomic_update`, you would get the correct result:
```elixir
Ash.Changeset.atomic_update(changeset, :score, expr(score + 1))
```
There are drawbacks/things to consider, however. The first is that atomic update results
are not known until after the action is run. The following functional validation would not
be able to enforce the score being less than 10, because the atomic happens after the validation.
```elixir
validate fn changeset, _ ->
if Ash.Changeset.get_attribute(changeset, :score) < 10 do
:ok
else
{:error, field: :score, message: "must be less than 10"}
end
end
```
If you want to use atomic updates, it is suggested to write module-based validations & changes,
and implement the appropriate atomic callbacks on those modules. All builtin validations and changes
implement these callbacks in addition to the standard callbacks. Validations will only be run atomically
when the entire action is being run atomically or if one of the relevant fields is being updated atomically.
"""
@spec atomic_update(t(), atom(), {:atomic, Ash.Expr.t()} | Ash.Expr.t()) :: t()
def atomic_update(changeset, key, {:atomic, value}) do
%{
changeset
| atomics: Keyword.put(changeset.atomics, key, value),
no_atomic_constraints: [key | changeset.no_atomic_constraints]
}
end
def atomic_update(changeset, key, value) do
attribute =
Ash.Resource.Info.attribute(changeset.resource, key) ||
raise "Unknown attribute `#{inspect(changeset.resource)}.#{inspect(key)}`"
value =
Ash.Expr.walk_template(value, fn
{:_atomic_ref, field} ->
atomic_ref(changeset, field)
other ->
other
end)
case Ash.Type.cast_atomic(attribute.type, value, attribute.constraints) do
{:atomic, value} ->
value =
if attribute.primary_key? do
value
else
set_error_field(value, attribute.name)
end
%{changeset | atomics: Keyword.put(changeset.atomics, attribute.name, value)}
|> record_atomic_update_for_atomic_upgrade(attribute.name, value)
{:ok, value} ->
allow_nil? =
if is_nil(changeset.action) do
true
else
attribute.allow_nil? and attribute.name not in changeset.action.require_attributes
end
if is_nil(value) and !allow_nil? do
add_required_attribute_error(changeset, attribute)
else
%{
changeset
| attributes: Map.put(changeset.attributes, attribute.name, value),
atomics: Keyword.delete(changeset.atomics, attribute.name)
}
|> store_casted_attribute(attribute.name, value, true)
end
{:error, error} ->
add_invalid_errors(value, :attribute, changeset, attribute, error)
{:not_atomic, message} ->
add_error(
changeset,
"Cannot atomically update #{inspect(changeset.resource)}.#{attribute.name}: #{message}"
)
end
end
@doc false
def handle_allow_nil_atomics(changeset, actor) do
changeset.atomics
|> Enum.reduce(changeset, fn {key, value}, changeset ->
attribute = Ash.Resource.Info.attribute(changeset.resource, key)
if attribute.primary_key? do
changeset
else
allow_nil? =
attribute.allow_nil? and attribute.name not in changeset.action.require_attributes
value =
if allow_nil? || not Ash.Expr.can_return_nil?(value) do
value
else
expr(
if is_nil(^value) do
error(
^Ash.Error.Changes.Required,
%{
field: ^attribute.name,
type: ^:attribute,
resource: ^changeset.resource
}
)
else
^value
end
)
end
%{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
end
end)
|> Ash.Changeset.hydrate_atomic_refs(actor, eager?: true)
end
@doc """
Set the result of the action. This will prevent running the underlying datalayer behavior
"""
@spec set_result(t(), term) :: t()
def set_result(changeset, result) do
set_context(changeset, %{private: %{action_result: result}})
end
@doc """
Turns the special case {:replace, fields}, :replace_all and {:replace_all_except, fields} upsert_fields
options into a list of fields
"""
def expand_upsert_fields({:replace, fields}, _) do
fields
end
def expand_upsert_fields(:replace_all, resource) do
resource
|> Ash.Resource.Info.attributes()
|> Enum.map(fn %{name: name} -> name end)
end
def expand_upsert_fields({:replace_all_except, except_fields}, resource) do
resource
|> Ash.Resource.Info.attributes()
|> Enum.map(fn %{name: name} -> name end)
|> Enum.reject(fn name -> name in except_fields end)
end
def expand_upsert_fields(fields, _), do: fields
@spec set_on_upsert(t(), list(atom)) :: Keyword.t()
def set_on_upsert(changeset, upsert_keys) do
keys = upsert_keys || Ash.Resource.Info.primary_key(changeset.resource)
if changeset.context[:private][:upsert_fields] do
Keyword.new(changeset.context[:private][:upsert_fields], fn key ->
{key, Ash.Changeset.get_attribute(changeset, key)}
end)
else
explicitly_changing_attributes =
Enum.map(
Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, []) -- keys,
fn key ->
{key, Ash.Changeset.get_attribute(changeset, key)}
end
)
changeset
|> upsert_update_defaults()
|> Keyword.merge(explicitly_changing_attributes)
end
end
defp upsert_update_defaults(changeset) do
changeset.resource
|> static_defaults()
|> Enum.concat(lazy_matching_defaults(changeset.resource))
|> Enum.concat(lazy_non_matching_defaults(changeset.resource))
end
defp static_defaults(resource) do
resource
|> Ash.Resource.Info.static_default_attributes(:update)
|> Enum.map(&{&1.name, &1.update_default})
end
defp lazy_non_matching_defaults(resource) do
resource
|> Ash.Resource.Info.lazy_non_matching_default_attributes(:update)
|> Enum.map(&{&1.name, &1.update_default})
end
defp lazy_matching_defaults(resource) do
resource
|> Ash.Resource.Info.lazy_matching_default_attributes(:update)
|> Enum.group_by(& &1.update_default)
|> Enum.flat_map(fn {default_fun, attributes} ->
default_value =
case default_fun do
function when is_function(function) ->
function.()
{m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
apply(m, f, a)
end
Enum.map(attributes, &{&1.name, default_value})
end)
end
defp do_for_action(changeset, action_or_name, params, opts) do
domain =
changeset.domain || opts[:domain] || Ash.Resource.Info.domain(changeset.resource) ||
Ash.Actions.Helpers.maybe_embedded_domain(changeset.resource) ||
raise ArgumentError,
message:
"Could not determine domain for changeset. Provide the `domain` option or configure a domain in the resource directly."
changeset = %{changeset | domain: domain}
if changeset.valid? do
action = get_action_entity(changeset.resource, action_or_name)
{changeset, opts} =
Ash.Actions.Helpers.set_context_and_get_opts(
domain,
%{changeset | action: action},
opts
)
if action do
name =
fn ->
"changeset:" <> Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"
end
try do
Ash.Tracer.span :changeset,
name,
opts[:tracer] do
Ash.Tracer.telemetry_span [:ash, :changeset], fn ->
%{
resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
}
end do
metadata = fn ->
%{
resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
resource: changeset.resource,
actor: opts[:actor],
tenant: opts[:tenant],
action: action.name,
authorize?: opts[:authorize?]
}
end
Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)
changeset =
Enum.reduce(opts[:private_arguments] || %{}, changeset, fn {k, v}, changeset ->
Ash.Changeset.set_argument(changeset, k, v)
end)
changeset =
changeset
|> prepare_changeset_for_action(action, opts)
|> handle_params(action, params, opts)
|> run_action_changes(
action,
opts[:actor],
opts[:authorize?],
opts[:tracer],
metadata
)
|> add_validations(opts[:tracer], metadata, opts[:actor])
|> mark_validated(action.name)
|> eager_validate_identities()
|> Map.put(:__validated_for_action__, action.name)
if Keyword.get(opts, :require?, true) do
require_values(changeset, action.type)
else
changeset
end
end
end
rescue
e ->
reraise Ash.Error.to_error_class(e,
stacktrace: __STACKTRACE__,
bread_crumbs: [
"building changeset for #{inspect(changeset.resource)}.#{action.name}"
]
),
__STACKTRACE__
end
else
raise_no_action(changeset.resource, action_or_name, changeset.action_type)
end
else
action = get_action_entity(changeset.resource, action_or_name)
{changeset, _opts} =
Ash.Actions.Helpers.set_context_and_get_opts(
domain,
changeset,
opts
)
%{changeset | action: action}
end
end
@doc """
Checks if an argument is not nil or an attribute is not nil, either in the original data, or that it is not being changed to a `nil` value if it is changing.
This also accounts for the `accessing_from` context that is set when using `manage_relationship`, so it is aware that a particular value
*will* be set by `manage_relationship` even if it isn't currently being set.
"""
def present?(changeset, attribute) do
arg_or_attribute_value =
case Ash.Changeset.fetch_argument(changeset, attribute) do
{:ok, nil} ->
Ash.Changeset.get_attribute(changeset, attribute)
:error ->
Ash.Changeset.get_attribute(changeset, attribute)
{:ok, value} ->
{:ok, value}
end
arg_or_attribute_value =
case arg_or_attribute_value do
%Ash.NotLoaded{} ->
nil
%Ash.ForbiddenField{} ->
nil
other ->
other
end
not is_nil(arg_or_attribute_value) ||
belongs_to_attr_of_rel_being_managed?(attribute, changeset, true) ||
is_belongs_to_rel_being_managed?(attribute, changeset, true)
end
@doc """
Checks if an attribute is not nil, either in the original data, or that it is not being changed to a `nil` value if it is changing.
This also accounts for the `accessing_from` context that is set when using `manage_relationship`, so it is aware that a particular value
*will* be set by `manage_relationship` even if it isn't currently being set.
"""
def attribute_present?(changeset, attribute) do
attribute_value = Ash.Changeset.get_attribute(changeset, attribute)
attribute_value =
case attribute_value do
%Ash.NotLoaded{} ->
nil
%Ash.ForbiddenField{} ->
nil
other ->
other
end
not is_nil(attribute_value) ||
belongs_to_attr_of_rel_being_managed?(attribute, changeset, true) ||
is_belongs_to_rel_being_managed?(attribute, changeset, true)
end
def prepare_changeset_for_action(changeset, action, opts) do
changeset
|> Map.put(:action, action)
|> reset_arguments()
|> handle_errors(action.error_handler)
|> set_actor(opts)
|> set_authorize(opts)
|> set_tracer(opts)
|> timeout(changeset.timeout || opts[:timeout])
|> set_tenant(opts[:tenant] || changeset.tenant || changeset.data.__metadata__[:tenant])
|> Map.put(:action_type, action.type)
end
defp reset_arguments(%{arguments: arguments} = changeset) do
Enum.reduce(arguments, changeset, fn {key, value}, changeset ->
set_argument(changeset, key, value)
end)
end
def handle_params(changeset, action, params, handle_params_opts \\ []) do
if Keyword.get(handle_params_opts, :cast_params?, true) do
cast_params(changeset, action, params || %{}, handle_params_opts)
else
changeset
end
|> set_argument_defaults(action)
|> require_arguments(action)
|> validate_attributes_accepted(action)
|> require_values(action.type, false, action.require_attributes)
|> set_defaults(changeset.action_type, false)
end
defp get_action_entity(resource, name) when is_atom(name),
do: Ash.Resource.Info.action(resource, name)
defp get_action_entity(_resource, %struct{} = action)
when struct in [
Ash.Resource.Actions.Update,
Ash.Resource.Actions.Create,
Ash.Resource.Actions.Destroy
] do
action
end
defp get_action_entity(_resource, action) do
raise ArgumentError, "Invalid value provided for action: #{inspect(action)}"
end
defp eager_validate_identities(changeset) do
identities =
changeset.resource
|> Ash.Resource.Info.identities()
case identities do
[] ->
changeset
identities ->
Enum.reduce(identities, changeset, fn identity, changeset ->
changeset =
if identity.eager_check_with do
validate_identity(changeset, identity, identity.eager_check_with)
else
changeset
end
if identity.pre_check_with do
before_action(changeset, &validate_identity(&1, identity, identity.pre_check_with))
else
changeset
end
end)
end
end
defp validate_identity(
%{context: %{private: %{upsert?: true, upsert_identity: name}}} = changeset,
%{name: name},
_domain
) do
changeset
end
defp validate_identity(
%{action: %{soft?: true}} = changeset,
identity,
domain
) do
do_validate_identity(changeset, identity, domain)
end
defp validate_identity(
%{action: %{type: type}} = changeset,
identity,
domain
)
when type in [:create, :update] do
do_validate_identity(changeset, identity, domain)
end
defp validate_identity(
%{action: %{type: type}} = changeset,
identity,
domain
)
when type in [:create, :update] do
do_validate_identity(changeset, identity, domain)
end
defp validate_identity(changeset, _, _), do: changeset
defp do_validate_identity(changeset, identity, domain) do
if changeset.context[:private][:upsert_identity] == identity.name do
changeset
else
if changeset.action_type == :create ||
Enum.any?(identity.keys, &changing_attribute?(changeset, &1)) do
action = Ash.Resource.Info.primary_action(changeset.resource, :read).name
if Enum.any?(identity.keys, fn key ->
Ash.Resource.Info.calculation(changeset.resource, key)
end) do
raise ArgumentError, "Cannot pre or eager check an identity based on calculated fields."
end
values =
Enum.map(identity.keys, fn key ->
case Ash.Changeset.get_attribute(changeset, key) do
nil ->
{key, is_nil: true}
value ->
{key, value}
end
end)
if identity.nils_distinct? && Enum.any?(values, &(elem(&1, 1) == [is_nil: true])) do
changeset
else
tenant =
if identity.all_tenants? do
unless Ash.Resource.Info.multitenancy_global?(changeset.resource) do
raise ArgumentError,
message: """
Cannot pre or eager check an identity that has `all_tenants?: true`
unless the resource supports global multitenancy.
"""
end
nil
else
changeset.tenant
end
changeset.resource
|> Ash.Query.for_read(action, %{},
tenant: tenant,
actor: changeset.context[:private][:actor],
authorize?: changeset.context[:private][:authorize?],
tracer: changeset.context[:private][:tracer],
domain: domain
)
|> Ash.Query.do_filter(values)
|> Ash.Query.limit(1)
|> Ash.Query.set_context(%{private: %{internal?: true}})
|> Ash.read_one(authorize?: false)
|> case do
{:ok, nil} ->
changeset
{:ok, _} ->
error =
Ash.Error.Changes.InvalidChanges.exception(
fields: identity.keys,
message: identity.message || "has already been taken"
)
add_error(changeset, error)
{:error, error} ->
add_error(changeset, error)
end
end
else
changeset
end
end
end
defp require_arguments(changeset, action) do
action.arguments
|> Enum.filter(&(&1.allow_nil? == false))
|> Enum.reduce(changeset, fn argument, changeset ->
case fetch_argument(changeset, argument.name) do
{:ok, value} when not is_nil(value) ->
changeset
_ ->
if argument.name in changeset.invalid_keys do
changeset
else
add_error(
changeset,
Ash.Error.Changes.Required.exception(
resource: changeset.resource,
field: argument.name,
type: :argument
)
)
end
end
end)
end
defp set_argument_defaults(changeset, action) do
Enum.reduce(action.arguments, changeset, fn argument, changeset ->
case fetch_argument(changeset, argument.name) do
:error ->
if is_nil(argument.default) do
changeset
else
%{
changeset
| arguments: Map.put(changeset.arguments, argument.name, default(:create, argument))
}
end
_ ->
changeset
end
end)
end
defp set_actor(changeset, opts) do
if Keyword.has_key?(opts, :actor) do
put_context(changeset, :private, %{actor: opts[:actor]})
else
changeset
end
end
defp set_authorize(changeset, opts) do
if Keyword.has_key?(opts, :authorize?) do
put_context(changeset, :private, %{authorize?: opts[:authorize?]})
else
changeset
end
end
defp set_tracer(changeset, opts) do
if Keyword.has_key?(opts, :tracer) do
put_context(changeset, :private, %{tracer: opts[:tracer]})
else
changeset
end
end
defp raise_no_action(resource, action, type) do
available_actions =
resource
|> Ash.Resource.Info.actions()
|> Enum.filter(&(&1.type == type))
|> Enum.map_join("\n", &" - `#{inspect(&1.name)}`")
raise ArgumentError,
message: """
No such #{type} action on resource #{inspect(resource)}: #{String.slice(inspect(action), 0..50)}
Example Call:
Ash.Changeset.for_#{type}(changeset_or_record, :action_name, input, options)
Available #{type} actions:
#{available_actions}
"""
end
defp mark_validated(changeset, action_name) do
%{changeset | __validated_for_action__: