lib/ash/changeset/changeset.ex

defmodule Ash.Changeset do
  @moduledoc """
  Changesets are used to create and update data in Ash.

  Create a changeset with `new/1` or `new/2`, and alter the attributes
  and relationships using the functions provided in this module.  Nothing in this module
  actually incurs changes in a data layer. To commit a changeset, see `Ash.create/2`
  and `Ash.update/2`.

  # Changeset lifecycle

  ## Action Lifecycle

  The following example illustrates the hook lifecycle of a changeset.

  ```elixir
  defmodule AshChangesetLifeCycleExample do
    def change(changeset, _, _) do
      changeset
      # execute code both before and after the transaction
      |> Ash.Changeset.around_transaction(fn changeset, callback ->
        callback.(changeset)
      end)
      # execute code before the transaction is started. Use for things like external calls
      |> Ash.Changeset.before_transaction(fn changeset -> changeset end)
      # execute code in the transaction, before and after the data layer is called
      |> Ash.Changeset.around_action(fn changeset, callback ->
        callback.(changeset)
      end)
      # execute code in the transaction, before the data layer is called
      |> Ash.Changeset.before_action(fn changeset -> changeset end)
      # execute code in the transaction, after the data layer is called, only if the action is successful
      |> Ash.Changeset.after_action(fn changeset, result -> {:ok, result} end)
      # execute code after the transaction, both in success and error cases
      |> Ash.Changeset.after_transaction(fn changeset, success_or_error_result -> success_or_error_result end
    end
  end
  ```
  """

  defstruct [
    :__validated_for_action__,
    :action_type,
    :action,
    :domain,
    :data,
    :handle_errors,
    :resource,
    :tenant,
    :to_tenant,
    :timeout,
    dirty_hooks: [],
    invalid_keys: MapSet.new(),
    filter: nil,
    added_filter: nil,
    action_failed?: false,
    atomics: [],
    atomic_validations: [],
    after_action: [],
    after_transaction: [],
    arguments: %{},
    around_action: [],
    around_transaction: [],
    attributes: %{},
    before_action: [],
    before_transaction: [],
    no_atomic_constraints: [],
    context: %{},
    context_changes: %{},
    defaults: [],
    errors: [],
    params: %{},
    action_select: [],
    atomic_after_action: [],
    attribute_changes: %{},
    atomic_changes: [],
    casted_attributes: %{},
    casted_arguments: %{},
    phase: :pending,
    relationships: %{},
    select: nil,
    load: [],
    valid?: true
  ]

  defimpl Inspect do
    import Inspect.Algebra

    @spec inspect(Ash.Changeset.t(), Inspect.Opts.t()) ::
            {:doc_cons, :doc_line | :doc_nil | binary | tuple,
             :doc_line | :doc_nil | binary | tuple}
            | {:doc_group,
               :doc_line
               | :doc_nil
               | binary
               | {:doc_collapse, pos_integer}
               | {:doc_force, any}
               | {:doc_break | :doc_color | :doc_cons | :doc_fits | :doc_group | :doc_string, any,
                  any}
               | {:doc_nest, any, :cursor | :reset | non_neg_integer, :always | :break},
               :inherit | :self}
    def inspect(changeset, opts) do
      context = Map.delete(changeset.context, :private)

      context =
        if context == %{} do
          empty()
        else
          concat("context: ", to_doc(context, opts))
        end

      tenant =
        if changeset.tenant do
          concat(
            "tenant: ",
            to_doc(changeset.to_tenant, opts)
          )
        else
          empty()
        end

      domain =
        if changeset.domain do
          concat("domain: ", to_doc(changeset.domain, opts))
        else
          empty()
        end

      select =
        if changeset.select do
          concat("select: ", to_doc(changeset.select, opts))
        else
          empty()
        end

      load =
        if changeset.load && changeset.load != [] do
          concat("load: ", to_doc(changeset.load, opts))
        else
          empty()
        end

      atomics =
        if Enum.empty?(changeset.atomics) do
          empty()
        else
          concat("atomics: ", to_doc(changeset.atomics, opts))
        end

      filter =
        case changeset.filter do
          nil ->
            empty()

          %Ash.Filter{expression: nil} ->
            empty()

          _ ->
            concat("filter: ", to_doc(changeset.filter, opts))
        end

      container_doc(
        "#Ash.Changeset<",
        [
          domain,
          concat("action_type: ", inspect(changeset.action_type)),
          concat("action: ", inspect(changeset.action && changeset.action.name)),
          tenant,
          concat("attributes: ", to_doc(changeset.attributes, opts)),
          atomics,
          concat("relationships: ", to_doc(changeset.relationships, opts)),
          arguments(changeset, opts),
          concat("errors: ", to_doc(changeset.errors, opts)),
          filter,
          concat("data: ", to_doc(changeset.data, opts)),
          context,
          concat("valid?: ", to_doc(changeset.valid?, opts)),
          select,
          load
        ],
        ">",
        opts,
        fn str, _ -> str end
      )
    end

    defp arguments(changeset, opts) do
      if changeset.action do
        if Enum.empty?(changeset.action.arguments) do
          empty()
        else
          arg_string =
            changeset.action.arguments
            |> Enum.filter(fn argument ->
              match?({:ok, _}, Ash.Changeset.fetch_argument(changeset, argument.name))
            end)
            |> Map.new(fn argument ->
              value = Ash.Changeset.get_argument(changeset, argument.name)

              if argument.sensitive? do
                {argument.name, Ash.Helpers.redact(value)}
              else
                {argument.name, value}
              end
            end)
            |> to_doc(opts)

          concat(["arguments: ", arg_string])
        end
      else
        empty()
      end
    end
  end

  @type after_action_fun ::
          (t, Ash.Resource.record() ->
             {:ok, Ash.Resource.record()}
             | {:ok, Ash.Resource.record(), [Ash.Notifier.Notification.t()]}
             | {:error, any})

  @type after_transaction_fun ::
          (t, {:ok, Ash.Resource.record()} | {:error, any} ->
             {:ok, Ash.Resource.record()} | {:error, any})

  @type before_action_fun :: (t -> t | {t, %{notifications: [Ash.Notifier.Notification.t()]}})

  @type before_transaction_fun :: (t -> t)

  @type around_action_result ::
          {:ok, Ash.Resource.record(), t(), %{notifications: list(Ash.Notifier.Notification.t())}}
          | {:error, Ash.Error.t()}
  @type around_action_callback :: (t -> around_action_result)
  @type around_action_fun :: (t, around_action_callback -> around_action_result)

  @type around_transaction_result :: {:ok, Ash.Resource.record()} | {:error, any}
  @type around_transaction_callback :: (t -> around_transaction_result)
  @type around_transaction_fun :: (t, around_transaction_callback -> around_transaction_result)

  @phases [
    :atomic,
    :pending,
    :validate,
    :before_action,
    :after_action,
    :before_transaction,
    :after_transaction,
    :around_action,
    :around_transaction
  ]

  @type phase :: unquote(Enum.reduce(@phases, &{:|, [], [&1, &2]}))

  @type t :: %__MODULE__{
          __validated_for_action__: atom | nil,
          action: Ash.Resource.Actions.action() | nil,
          action_failed?: boolean,
          action_type: Ash.Resource.Actions.action_type() | nil,
          after_action: [after_action_fun | {after_action_fun, map}],
          after_transaction: [after_transaction_fun | {after_transaction_fun, map}],
          atomics: Keyword.t(),
          domain: module | nil,
          arguments: %{optional(atom) => any},
          around_action: [around_action_fun | {around_action_fun, map}],
          around_transaction: [around_transaction_fun | {around_transaction_fun, map}],
          attributes: %{optional(atom) => any},
          before_action: [before_action_fun | {before_action_fun, map}],
          before_transaction: [before_transaction_fun | {before_transaction_fun, map}],
          context: map,
          filter: Ash.Filter.t() | nil,
          added_filter: Ash.Filter.t() | nil,
          data: Ash.Resource.record() | nil,
          defaults: [atom],
          errors: [Ash.Error.t()],
          handle_errors:
            nil | (t, error :: any -> :ignore | t | (error :: any) | {error :: any, t}),
          invalid_keys: MapSet.t(),
          params: %{optional(atom | binary) => any},
          phase: phase(),
          relationships: %{
            optional(atom) =>
              %{optional(atom | binary) => any} | [%{optional(atom | binary) => any}]
          },
          resource: module,
          select: [atom] | nil,
          load: keyword(keyword),
          tenant: term(),
          timeout: pos_integer() | nil,
          valid?: boolean
        }

  @type error_info ::
          String.t()
          | [
              {:field, atom()}
              | {:fields, [atom()]}
              | {:message, String.t()}
              | {:value, any()}
            ]
          | Ash.Error.t()

  alias Ash.Error.{
    Changes.InvalidArgument,
    Changes.InvalidAttribute,
    Changes.InvalidChanges,
    Changes.InvalidRelationship,
    Changes.NoSuchAttribute,
    Changes.NoSuchRelationship,
    Changes.Required,
    Invalid.NoSuchInput,
    Invalid.NoSuchResource
  }

  require Ash.Tracer
  import Ash.Expr
  require Logger

  defmodule OriginalDataNotAvailable do
    @moduledoc "A value placed in changeset.data to indicate that the original data is not available"
    defstruct reason: :atomic_query_update
    @type t :: %__MODULE__{reason: :atomic_query_update}
  end

  defmacrop maybe_already_validated_error!(changeset, alternative \\ nil) do
    {function, arity} = __CALLER__.function

    if alternative do
      quote do
        changeset = unquote(changeset)

        {:current_stacktrace, stacktrace} =
          Process.info(self(), :current_stacktrace)

        if changeset.__validated_for_action__ do
          require Logger

          Logger.warning("""
          Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.

          For safety, we prevent any changes after that point because they will bypass validations or other action logic.. To proceed anyway,
          you can use `#{unquote(alternative)}/#{unquote(arity)}`. However, you should prefer a pattern like the below, which makes
          any custom changes *before* calling the action.

            Resource
            |> Ash.Changeset.new()
            |> Ash.Changeset.#{unquote(function)}(...)
            |> Ash.Changeset.for_create(...)

          #{Exception.format_stacktrace(stacktrace)}
          """)
        end
      end
    else
      quote do
        changeset = unquote(changeset)

        {:current_stacktrace, stacktrace} =
          Process.info(self(), :current_stacktrace)

        if changeset.__validated_for_action__ do
          require Logger

          Logger.warning("""
          Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.

          For safety, we prevent any changes using `#{unquote(function)}/#{unquote(arity)}` after that point because they will bypass validations or other action logic.
          Instead, you should change or set this value before calling the action, like so:

            Resource
            |> Ash.Changeset.new()
            |> Ash.Changeset.#{unquote(function)}(...)
            |> Ash.Changeset.for_create(...)

          #{Exception.format_stacktrace(stacktrace)}
          """)
        end
      end
    end
  end

  @doc """
  A guard which checks if the Changeset is valid.
  """
  @spec is_valid(t) :: Macro.output()
  defguard is_valid(changeset) when is_struct(changeset, __MODULE__) and changeset.valid? == true

  @doc """
  Returns a new changeset over a resource.

  *Warning*: You almost always want to use `for_action` or `for_create`, etc. over this function if possible.

  You can use this to start a changeset and make changes prior to calling `for_action`. This is not typically
  necessary, but can be useful as an escape hatch. For example:

  ```elixir
  Resource
  |> Ash.Changeset.new()
  |> Ash.Changeset.change_attribute(:name, "foobar")
  |> Ash.Changeset.for_action(...)
  ```
  """
  @spec new(Ash.Resource.t() | Ash.Resource.record()) :: t

  def new(record_or_resource) do
    {resource, record, action_type} =
      case record_or_resource do
        %resource{} = record -> {resource, record, :update}
        resource -> {resource, struct(resource), :create}
      end

    tenant =
      record
      |> Map.get(:__metadata__, %{})
      |> Map.get(:tenant, nil)

    context = Ash.Resource.Info.default_context(resource) || %{}

    if Ash.Resource.Info.resource?(resource) do
      %__MODULE__{resource: resource, data: record, action_type: action_type}
      |> set_context(context)
      |> set_tenant(tenant)
    else
      %__MODULE__{
        resource: resource,
        action_type: action_type,
        data: struct(resource)
      }
      |> add_error(NoSuchResource.exception(resource: resource))
      |> set_tenant(tenant)
      |> set_context(context)
    end
  end

  @doc """
  Ensure that only the specified attributes are present in the results.

  The first call to `select/2` will replace the default behavior of selecting
  all attributes. Subsequent calls to `select/2` will combine the provided
  fields unless the `replace?` option is provided with a value of `true`.

  If a field has been deselected, selecting it again will override that (because a single list of fields is tracked for selection)

  Primary key attributes always selected and cannot be deselected.

  When attempting to load a relationship (or manage it with `Ash.Changeset.manage_relationship/3`),
  if the source field is not selected on the query/provided data an error will be produced. If loading
  a relationship with a query, an error is produced if the query does not select the destination field
  of the relationship.

  Datalayers currently are not notified of the `select` for a changeset(unlike queries), and creates/updates select all fields when they are performed.
  A select provided on a changeset sets the unselected fields to `nil` before returning the result.

  Use `ensure_selected/2` if you wish to make sure a field has been selected, without deselecting any other fields.
  """
  def select(changeset, fields, opts \\ []) do
    if opts[:replace?] do
      case fields do
        %MapSet{} = fields -> %{changeset | select: Enum.to_list(fields)}
        fields -> %{changeset | select: Enum.uniq(List.wrap(fields))}
      end
    else
      case fields do
        %MapSet{} ->
          %{
            changeset
            | select: MapSet.union(MapSet.new(changeset.select || []), fields) |> MapSet.to_list()
          }

        fields ->
          %{changeset | select: Enum.uniq(List.wrap(fields) ++ (changeset.select || []))}
      end
    end
  end

  @doc false
  def set_action_select(%{action: nil} = changeset) do
    %{
      changeset
      | action_select:
          MapSet.to_list(
            Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
          )
    }
  end

  def set_action_select(changeset) do
    if Ash.DataLayer.data_layer_can?(changeset.resource, :action_select) do
      required =
        Ash.Resource.Info.action_select(changeset.resource, changeset.action.name) || []

      select =
        changeset.select ||
          MapSet.to_list(
            Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
          )

      %{
        changeset
        | action_select: Enum.uniq(List.wrap(required) |> Enum.concat(select))
      }
    else
      %{
        changeset
        | action_select:
            MapSet.to_list(
              Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
            )
      }
    end
  end

  @doc """
  Calls the provided load statement on the result of the action at the very end of the action.
  """
  @spec load(t(), term()) :: t()
  def load(changeset, load) do
    query =
      changeset.resource
      |> Ash.Query.new()
      |> Map.put(:errors, [])
      |> Ash.Query.load(changeset.load)
      |> Ash.Query.load(load)

    changeset = %{
      changeset
      | load: Enum.concat(changeset.load || [], List.wrap(load))
    }

    Enum.reduce(query.errors, changeset, &add_error(&2, &1))
  end

  @doc """
  Ensures that the given attributes are selected.

  The first call to `select/2` will *limit* the fields to only the provided fields.
  Use `ensure_selected/2` to say "select this field (or these fields) without deselecting anything else".

  See `select/2` for more.
  """
  def ensure_selected(changeset, fields) do
    if changeset.select do
      Ash.Changeset.select(changeset, List.wrap(fields))
    else
      to_select = Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)

      Ash.Changeset.select(changeset, to_select)
    end
  end

  @doc """
  Ensure the the specified attributes are `nil` in the changeset results.
  """
  def deselect(changeset, fields) do
    select =
      if changeset.select do
        changeset.select
      else
        changeset.resource
        |> Ash.Resource.Info.attributes()
        |> Enum.map(& &1.name)
      end

    select = select -- List.wrap(fields)

    select(changeset, select, replace?: true)
  end

  def selecting?(changeset, field) do
    case changeset.select do
      nil ->
        not is_nil(Ash.Resource.Info.attribute(changeset.resource, field))

      select ->
        if field in select do
          true
        else
          attribute = Ash.Resource.Info.attribute(changeset.resource, field)

          attribute && attribute.primary_key?
        end
    end || loading?(changeset, field)
  end

  @doc """
  Returns true if the field/relationship or path to field/relationship is being loaded.

  It accepts an atom or a list of atoms, which is treated for as a "path", i.e:

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :enemies, :score])
      iex> true

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :score])
      iex> false

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?(:friends)
      iex> true
  """
  def loading?(changeset, path) do
    changeset.resource
    |> Ash.Query.new()
    |> Ash.Query.load(changeset.load)
    |> Ash.Query.loading?(path)
  end

  @doc """
  Returns a list of attributes, aggregates, relationships, and calculations that are being loaded

  Provide a list of field types to narrow down the returned results.
  """
  def accessing(
        changeset,
        types \\ [:attributes, :relationships, :calculations, :attributes],
        only_public? \\ true
      ) do
    changeset.resource
    |> Ash.Query.new()
    |> Ash.Query.load(changeset.load)
    |> Map.put(:select, changeset.select)
    |> Ash.Query.accessing(types, only_public?)
  end

  @spec fully_atomic_changeset(
          resource :: Ash.Resource.t(),
          action :: atom() | Ash.Resource.Actions.action(),
          params :: map(),
          opts :: Keyword.t()
        ) :: Ash.Changeset.t() | {:not_atomic, String.t()}
  def fully_atomic_changeset(resource, action, params, opts \\ []) do
    action =
      case action do
        action when is_atom(action) -> Ash.Resource.Info.action(resource, action)
        action -> action
      end

    if action.manual do
      {:not_atomic,
       "manual action `#{inspect(resource)}.#{action.name}` cannot be performed atomically"}
    else
      changeset =
        resource
        |> Ash.Changeset.new()
        |> then(fn changeset ->
          if data = opts[:data] do
            Map.put(changeset, :data, data)
          else
            Map.put(changeset, :data, %OriginalDataNotAvailable{})
          end
        end)
        |> Map.put(:context, opts[:context] || %{})
        |> Map.put(:params, params)
        |> Map.put(:action, action)
        |> Map.put(:no_atomic_constraints, opts[:no_atomic_constraints] || [])
        |> Map.put(:action_type, action.type)
        |> Map.put(:atomics, opts[:atomics] || [])
        |> Ash.Changeset.set_tenant(opts[:tenant])

      {changeset, _opts} =
        Ash.Actions.Helpers.set_context_and_get_opts(
          opts[:domain] || Ash.Resource.Info.domain(resource),
          changeset,
          opts
        )

      changeset = set_phase(changeset, :atomic)

      with :ok <- verify_notifiers_support_atomic(resource, action),
           %Ash.Changeset{} = changeset <-
             atomic_params(changeset, action, params, opts),
           %Ash.Changeset{} = changeset <- set_argument_defaults(changeset, action),
           %Ash.Changeset{} = changeset <- require_arguments(changeset, action),
           %Ash.Changeset{} = changeset <- atomic_changes(changeset, action),
           %Ash.Changeset{} = changeset <- atomic_defaults(changeset),
           %Ash.Changeset{} = changeset <- atomic_update(changeset, opts[:atomic_update] || []),
           %Ash.Changeset{} = changeset <-
             hydrate_atomic_refs(changeset, opts[:actor], Keyword.take(opts, [:eager?])),
           %Ash.Changeset{} = changeset <- apply_atomic_constraints(changeset, opts[:actor]) do
        changeset
      else
        {:not_atomic, reason} ->
          {:not_atomic, reason}
      end
    end
    |> case do
      {:not_atomic, reason} ->
        {:not_atomic, reason}

      changeset ->
        clear_phase(changeset)
    end
  end

  defp atomic_defaults(changeset) do
    with %__MODULE__{} <- atomic_static_update_defaults(changeset) do
      atomic_lazy_update_defaults(changeset)
    end
  end

  defp atomic_static_update_defaults(changeset) do
    changeset.resource
    |> Ash.Resource.Info.static_default_attributes(:update)
    |> Enum.reject(fn attribute ->
      Ash.Changeset.changing_attribute?(changeset, attribute.name)
    end)
    |> Enum.reduce_while(changeset, fn attribute, changeset ->
      case Ash.Type.cast_atomic(
             attribute.type,
             attribute.update_default,
             attribute.constraints
           ) do
        {:atomic, atomic} ->
          {:cont, atomic_update(changeset, attribute.name, {:atomic, atomic})}

        {:ok, value} ->
          allow_nil? =
            attribute.allow_nil? and attribute.name not in changeset.action.require_attributes

          if is_nil(value) and !allow_nil? do
            {:cont, add_required_attribute_error(changeset, attribute)}
          else
            {:cont,
             %{
               changeset
               | attributes: Map.put(changeset.attributes, attribute.name, value),
                 atomics: Keyword.delete(changeset.atomics, attribute.name)
             }
             |> store_casted_attribute(attribute.name, value, true)}
          end

        {:error, error} ->
          {:cont,
           add_invalid_errors(attribute.update_default, :attribute, changeset, attribute, error)}

        {:not_atomic, reason} ->
          {:halt, {:not_atomic, reason}}
      end
    end)
  end

  defp atomic_lazy_update_defaults(changeset) do
    changeset.resource
    |> Ash.Resource.Info.lazy_matching_default_attributes(:update)
    |> Enum.concat(
      Ash.Resource.Info.lazy_non_matching_default_attributes(changeset.resource, :update)
    )
    |> Enum.reject(fn attribute ->
      Ash.Changeset.changing_attribute?(changeset, attribute.name)
    end)
    |> Enum.reduce_while(changeset, fn attribute, changeset ->
      cond do
        attribute.update_default == (&DateTime.utc_now/0) ->
          {:cont, atomic_update(changeset, attribute.name, Ash.Expr.expr(now()))}

        attribute.update_default == (&Ash.UUID.generate/0) ->
          {:cont, atomic_update(changeset, attribute.name, Ash.Expr.expr(^Ash.UUID.generate()))}

        true ->
          {:halt,
           {:not_atomic,
            "update_default for `#{inspect(attribute.name)}` cannot be done atomically: #{inspect(attribute.update_default)}"}}
      end
    end)
  end

  defp verify_notifiers_support_atomic(resource, action) do
    resource
    |> Ash.Resource.Info.notifiers()
    |> Enum.filter(fn notifier ->
      notifier.requires_original_data?(resource, action)
    end)
    |> case do
      [] ->
        :ok

      notifiers ->
        {:not_atomic,
         "notifiers #{inspect(notifiers)} require original data for #{inspect(resource)}.#{action.name}"}
    end
  end

  defp atomic_changes(changeset, action) do
    changes =
      action.changes
      |> Enum.concat(Ash.Resource.Info.changes(changeset.resource, changeset.action_type))
      |> then(fn changes ->
        if changeset.action.skip_global_validations? do
          changes
        else
          Enum.concat(
            changes,
            Ash.Resource.Info.validations(changeset.resource, changeset.action_type)
          )
        end
      end)

    context = %{
      actor: changeset.context[:private][:actor],
      tenant: changeset.tenant,
      authorize?: changeset.context[:private][:authorize?] || false,
      tracer: changeset.context[:private][:tracer]
    }

    changeset = set_phase(changeset, :atomic)

    Enum.reduce_while(changes, changeset, fn
      %{change: _} = change, changeset ->
        case run_atomic_change(changeset, change, context) do
          {:not_atomic, reason} ->
            {:halt, {:not_atomic, reason}}

          changeset ->
            {:cont, changeset}
        end

      %{validation: _} = validation, changeset ->
        case run_atomic_validation(changeset, validation, context) do
          {:not_atomic, reason} ->
            {:halt, {:not_atomic, reason}}

          changeset ->
            {:cont, changeset}
        end
    end)
    |> case do
      {:not_atomic, reason} -> {:not_atomic, reason}
      %__MODULE__{} = changeset -> clear_phase(changeset)
    end
  end

  @doc false
  def split_atomic_conditions(%{where: []} = validation, _changeset, _actor, _context) do
    {:ok, validation}
  end

  def split_atomic_conditions(
        %{where: [{module, opts} | rest]} = validation,
        changeset,
        actor,
        context
      ) do
    if module.has_validate?() do
      opts =
        Ash.Actions.Helpers.templated_opts(opts, actor, changeset.arguments, changeset.context)

      {:ok, opts} = module.init(opts)

      case module.validate(
             changeset,
             opts,
             context
           ) do
        :ok -> split_atomic_conditions(%{validation | where: rest}, changeset, actor, context)
        _ -> :skip
      end
    else
      if module.atomic?() do
        case split_atomic_conditions(%{validation | where: rest}, changeset, actor, context) do
          {:ok, %{where: remaining} = validation} ->
            {:ok, %{validation | where: [{module, opts} | remaining]}}

          other ->
            other
        end
      else
        raise "Module #{module} must define one of `atomic/3` or `validate/3`"
      end
    end
  end

  @doc false
  def run_atomic_validation(changeset, %{where: where} = validation, context) do
    with {:atomic, condition} <- atomic_condition(where, changeset, context) do
      case condition do
        false ->
          changeset

        true ->
          do_run_atomic_validation(changeset, validation, context)

        where_condition ->
          do_run_atomic_validation(changeset, validation, context, where_condition)
      end
    end
  end

  defp do_run_atomic_validation(
         changeset,
         %{validation: {module, validation_opts}, message: message},
         context,
         where_condition \\ nil
       ) do
    case List.wrap(
           module.atomic(
             changeset,
             validation_opts,
             struct(Ash.Resource.Validation.Context, Map.put(context, :message, message))
           )
         ) do
      [{:atomic, _, _, _} | _] = atomics ->
        Enum.reduce(atomics, changeset, fn
          {:atomic, _fields, condition_expr, error_expr}, changeset ->
            condition_expr =
              if where_condition do
                expr(^where_condition and ^condition_expr)
              else
                condition_expr
              end

            condition_expr = rewrite_atomics(changeset, condition_expr)

            validate_atomically(changeset, condition_expr, error_expr)
        end)

      [:ok] ->
        changeset

      [{:error, error}] ->
        if message do
          error = override_validation_message(error, message)
          Ash.Changeset.add_error(changeset, error)
        else
          Ash.Changeset.add_error(changeset, error)
        end

      [{:not_atomic, error}] ->
        {:not_atomic, error}
    end
  end

  defp rewrite_atomics(changeset, expr) do
    Ash.Expr.walk_template(expr, fn
      {:_atomic_ref, ref} ->
        atomic_ref(changeset, ref)

      other ->
        other
    end)
  end

  def run_atomic_change(
        changeset,
        %{change: {module, change_opts}, where: where},
        context
      ) do
    change_opts =
      Ash.Expr.fill_template(
        change_opts,
        changeset.context.private[:actor],
        changeset.arguments,
        changeset.context
      )

    with {:atomic, changeset, atomic_changes, validations} <-
           atomic_with_changeset(
             module.atomic(changeset, change_opts, struct(Ash.Resource.Change.Context, context)),
             changeset
           ),
         {:atomic, condition} <- atomic_condition(where, changeset, context) do
      changeset =
        case condition do
          true ->
            apply_atomic_update(changeset, atomic_changes)

          false ->
            changeset

          condition ->
            atomic_changes =
              Map.new(atomic_changes, fn {key, value} ->
                new_value =
                  expr(
                    if ^condition do
                      ^value
                    else
                      ^ref(key)
                    end
                  )

                {key, new_value}
              end)

            apply_atomic_update(changeset, atomic_changes)
        end

      Enum.reduce(
        List.wrap(validations),
        changeset,
        fn {:atomic, _, condition_expr, error_expr}, changeset ->
          validate_atomically(changeset, condition_expr, error_expr)
        end
      )
    else
      {:ok, changeset} ->
        changeset

      {:not_atomic, reason} ->
        {:not_atomic, reason}

      :ok ->
        changeset
    end
  end

  defp apply_atomic_update(changeset, atomics) when is_list(atomics) or is_map(atomics) do
    Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
      apply_atomic_update(changeset, key, value)
    end)
  end

  defp apply_atomic_update(changeset, key, {:atomic, value}) do
    %{
      changeset
      | atomics: Keyword.put(changeset.atomics, key, value),
        no_atomic_constraints: [key | changeset.no_atomic_constraints]
    }
    |> record_atomic_update_for_atomic_upgrade(key, value)
  end

  defp apply_atomic_update(changeset, key, value) do
    attribute = Ash.Resource.Info.attribute(changeset.resource, key)

    value =
      Ash.Expr.walk_template(value, fn
        {:_atomic_ref, field} ->
          atomic_ref(changeset, field)

        other ->
          other
      end)

    case Ash.Type.cast_atomic(attribute.type, value, attribute.constraints) do
      {:atomic, value} ->
        value =
          if attribute.primary_key? do
            value
          else
            set_error_field(value, attribute.name)
          end

        %{changeset | atomics: Keyword.put(changeset.atomics, attribute.name, value)}
        |> record_atomic_update_for_atomic_upgrade(attribute.name, value)

      {:not_atomic, message} ->
        {:not_atomic,
         "Cannot atomically update #{inspect(changeset.resource)}.#{attribute.name}: #{message}"}

      {:ok, value} ->
        allow_nil? =
          attribute.allow_nil? and attribute.name not in changeset.action.require_attributes

        if is_nil(value) and !allow_nil? do
          add_required_attribute_error(changeset, attribute)
        else
          %{changeset | attributes: Map.put(changeset.attributes, attribute.name, value)}
          |> store_casted_attribute(attribute.name, value, true)
        end

      {:error, error} ->
        {:cont, add_invalid_errors(value, :attribute, changeset, attribute, error)}
    end
  end

  defp atomic_with_changeset({:atomic, changeset, atomics}, _changeset),
    do: {:atomic, changeset, atomics, []}

  defp atomic_with_changeset({:atomic, atomics}, changeset), do: {:atomic, changeset, atomics, []}
  defp atomic_with_changeset(other, _), do: other

  defp validate_atomically(changeset, condition_expr, error_expr) do
    %{
      changeset
      | atomic_validations: [{condition_expr, error_expr} | changeset.atomic_validations]
    }
  end

  @doc """
  Gets a reference to a field, or the current atomic update expression of that field.
  """
  def atomic_ref(changeset, field) do
    case Keyword.fetch(changeset.atomics, field) do
      {:ok, atomic} ->
        attribute = Ash.Resource.Info.attribute(changeset.resource, field)
        Ash.Expr.expr(type(^atomic, ^attribute.type, ^attribute.constraints))

      :error ->
        case Map.fetch(changeset.attributes, field) do
          {:ok, new_value} ->
            attribute = Ash.Resource.Info.attribute(changeset.resource, field)
            Ash.Expr.expr(type(^new_value, ^attribute.type, ^attribute.constraints))

          :error ->
            expr(^ref(field))
        end
    end
  end

  @doc false
  # Returns either an appropriate expression for an atomic condition or a value
  # indicated that the condition cannot be handled atomically.
  #
  # Validation logic matches on failure. So, for example, `present(:field)` is
  # going to _match_ when `:field` is `nil`. However, when applying this logic
  # to a `where` condition, the opposite is desired. The end result is kinda
  # ugly because it can end up reading like "not is not equal to" but
  # ultimately produces the correct results.
  @spec atomic_condition([{module(), keyword()}], Ash.Changeset.t(), map()) ::
          {:atomic, Ash.Expr.t() | boolean()} | {:not_atomic, String.t()}
  def atomic_condition(where, changeset, context) do
    Enum.reduce_while(where, {:atomic, true}, fn {module, validation_opts},
                                                 {:atomic, condition_expr} ->
      case module.atomic(
             changeset,
             validation_opts,
             struct(Ash.Resource.Validation.Context, context)
           ) do
        :ok ->
          {:cont, {:atomic, condition_expr}}

        {:atomic, _, expr, _as_error} ->
          {:cont, {:atomic, atomic_condition_expr(condition_expr, expr)}}

        {:error, _} ->
          # Error from the validator, so the validations should just fail with
          # a `false` expression.
          {:halt, {:atomic, false}}

        {:not_atomic, _reason} = not_atomic ->
          {:halt, not_atomic}

        atomic_conditions when is_list(atomic_conditions) ->
          atomic_conditions
          |> Enum.reduce(condition_expr, fn {:atomic, _, expr, _as_error}, reduced_expr ->
            atomic_condition_expr(reduced_expr, expr)
          end)
          |> then(&{:cont, {:atomic, &1}})
      end
    end)
  end

  # This is not expressly necessary as `expr(true and not ^new_expr)` would also
  # work just fine, but the final output from omitting `true` is much easier to
  # read if debugging.
  defp atomic_condition_expr(true, expr) do
    expr(not (^expr))
  end

  defp atomic_condition_expr(condition_expr, expr) do
    expr(^condition_expr and not (^expr))
  end

  defp atomic_params(changeset, action, params, opts) do
    if opts[:assume_casted?] do
      Enum.reduce_while(params, changeset, fn {key, value}, changeset ->
        cond do
          has_argument?(action, key) ->
            {:cont, %{changeset | arguments: Map.put(changeset.arguments, key, value)}}

          attribute = Ash.Resource.Info.attribute(changeset.resource, key) ->
            {:cont, atomic_update(changeset, attribute.name, {:atomic, value})}

          match?("_" <> _, key) ->
            {:cont, changeset}

          :* in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          key in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          true ->
            {:cont,
             add_error(
               changeset,
               NoSuchInput.exception(
                 resource: changeset.resource,
                 action: action.name,
                 input: key,
                 inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
               )
             )}
        end
      end)
    else
      Enum.reduce_while(params, changeset, fn {key, value}, changeset ->
        cond do
          has_argument?(action, key) ->
            {:cont, set_argument(changeset, key, value)}

          attribute = Ash.Resource.Info.attribute(changeset.resource, key) ->
            cond do
              attribute.name in action.accept ->
                {:cont, atomic_update(changeset, attribute.name, value)}

              :* in List.wrap(opts[:skip_unknown_inputs]) ->
                {:cont, changeset}

              key in List.wrap(opts[:skip_unknown_inputs]) ->
                {:cont, changeset}

              match?("_" <> _, key) ->
                {:cont, changeset}

              true ->
                {:cont,
                 add_error(
                   changeset,
                   NoSuchInput.exception(
                     resource: changeset.resource,
                     action: action.name,
                     input: key,
                     inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
                   )
                 )}
            end

          match?("_" <> _, key) ->
            {:cont, changeset}

          :* in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          key in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          true ->
            {:cont,
             add_error(
               changeset,
               NoSuchInput.exception(
                 resource: changeset.resource,
                 action: action.name,
                 input: key,
                 inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
               )
             )}
        end
      end)
    end
  end

  defp set_error_field(expr, field) do
    Ash.Filter.map(expr, fn
      %Ash.Query.Function.Error{arguments: [module, nested_expr]} = func
      when is_map(nested_expr) and not is_struct(nested_expr) ->
        %{func | arguments: [module, Map.put(nested_expr, :field, field)]}

      other ->
        other
    end)
  end

  @manage_types [:append_and_remove, :append, :remove, :direct_control, :create]

  @doc """
  Constructs a changeset for a given action, and validates it.

  Calls `for_create/4`, `for_update/4` or `for_destroy/4` based on the type of action passed in.

  See those functions for more explanation.
  """
  def for_action(initial, action, params \\ %{}, opts \\ []) do
    resource =
      case initial do
        %__MODULE__{resource: resource} -> resource
        %resource{} -> resource
        resource -> resource
      end

    action = get_action_entity(resource, action)

    case action.type do
      :create ->
        for_create(initial, action, params, opts)

      :update ->
        for_update(initial, action, params, opts)

      :destroy ->
        for_destroy(initial, action, params, opts)

      :read ->
        raise ArgumentError,
              "Passed a read action `#{inspect(resource)}.#{action.name}` into `Ash.Changeset.for_action/4`. Use `Ash.Query.for_read/4` instead."
    end
  end

  @for_create_opts [
    require?: [
      type: :boolean,
      default: false,
      doc:
        "If set to `false`, values are only required when the action is run (instead of immediately)."
    ],
    actor: [
      type: :any,
      doc:
        "set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    authorize?: [
      type: :any,
      doc:
        "set authorize?, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    tracer: [
      type: {:wrap_list, {:behaviour, Ash.Tracer}},
      doc:
        "A tracer to use. Will be carried over to the action. For more information see `Ash.Tracer`."
    ],
    tenant: [
      type: {:protocol, Ash.ToTenant},
      doc: "set the tenant on the changeset"
    ],
    skip_unknown_inputs: [
      type: {:wrap_list, {:or, [:atom, :string]}},
      doc:
        "A list of inputs that, if provided, will be ignored if they are not recognized by the action. Use `:*` to indicate all unknown keys."
    ],
    context: [
      type: :map,
      doc: "Context to set on the query, changeset, or input"
    ],
    private_arguments: [
      type: :map,
      doc: "Private argument values to set before validations and changes.",
      default: %{}
    ],
    return_skipped_upsert?: [
      type: :boolean,
      default: false,
      doc:
        "If `true`, and a record was *not* upserted because its filter prevented the upsert, the original record (which was *not* upserted) will be returned."
    ]
  ]

  @doc false
  def for_create_opts, do: @for_create_opts

  @doc """
  Constructs a changeset for a given create action, and validates it.

  Anything that is modified prior to `for_create/4` is validated against the rules of the action, while *anything after it is not*.
  This runs any `change`s contained on your action. To have your logic execute *only* during the action, you can use `after_action/2`
  or `before_action/2`.

  Multitenancy is *not* validated until an action is called. This allows you to avoid specifying a tenant until just before calling
  the domain action.

  ### Params
  `params` may be attributes, relationships, or arguments. You can safely pass user/form input directly into this function.
  Only public attributes and relationships are supported. If you want to change private attributes as well, see the
  Customization section below. `params` are stored directly as given in the `params` field of the changeset, which is used

  ### Opts

  #{Spark.Options.docs(@for_create_opts)}

  ### Customization

  A changeset can be provided as the first argument, instead of a resource, to allow
  setting specific attributes ahead of time.

  For example:

      MyResource
      |> Ash.Changeset.new()
      |> Ash.Changeset.change_attribute(:foo, 1)
      |> Ash.Changeset.for_create(:create, ...opts)

  Once a changeset has been validated by `for_create/4` (or `for_update/4`), it isn't validated again in the action.
  New changes added are validated individually, though. This allows you to create a changeset according
  to a given action, and then add custom changes if necessary.

  ### What does this function do?

  The following steps are run when calling `Ash.Changeset.for_create/4`.

  - Cast input params | This is any arguments in addition to any accepted attributes
  - Set argument defaults
  - Require any missing arguments
  - Validate all provided attributes are accepted
  - Require any accepted attributes that are `allow_nil?` false
  - Set any default values for attributes
  - Run action changes & validations
  - Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.create.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
  """
  def for_create(initial, action, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        %__MODULE__{action_type: :create} = changeset ->
          changeset

        resource when is_atom(resource) ->
          new(resource)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:create`, or a resource.

            Got: #{inspect(other)}
            """
      end

    action =
      get_action_entity(changeset.resource, action) ||
        raise_no_action(changeset.resource, action, :create)

    upsert_condition =
      case opts[:upsert_condition] do
        nil -> action && action.upsert_condition
        other -> other
      end

    case action do
      %Ash.Resource.Actions.Update{name: name} ->
        raise ArgumentError,
          message: """
          Action #{inspect(changeset.resource)}.#{name} was passed to `Ash.Changeset.for_create`, but it is an update action.

          Perhaps you meant to call `Ash.Changeset.for_create` instead?
          """

      _ ->
        :ok
    end

    changeset
    |> set_context(%{
      private: %{
        upsert?: opts[:upsert?] || (action && action.upsert?) || false,
        return_skipped_upsert?:
          opts[:return_skipped_upsert?] || (action && action.return_skipped_upsert?) || false,
        upsert_identity: opts[:upsert_identity] || (action && action.upsert_identity),
        upsert_fields:
          expand_upsert_fields(
            opts[:upsert_fields] || (action && action.upsert_fields),
            changeset.resource
          ),
        upsert_condition: upsert_condition
      }
    })
    |> then(fn
      changeset when upsert_condition != nil -> filter(changeset, upsert_condition)
      changeset -> changeset
    end)
    |> do_for_action(action, params, opts)
  end

  @for_update_opts @for_create_opts

  @doc false
  def for_update_opts, do: @for_update_opts

  @doc """
  Constructs a changeset for a given update action, and validates it.

  Anything that is modified prior to `for_update/4` is validated against the rules of the action, while *anything after it is not*.

  ### What does this function do?

  The following steps are run when calling `Ash.Changeset.for_update/4`.

  - Cast input params | This is any arguments in addition to any accepted attributes
  - Set argument defaults
  - Require any missing arguments
  - Validate all provided attributes are accepted
  - Require any accepted attributes that are `allow_nil?` false
  - Set any default values for attributes
  - Run action changes & validations
  - Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.update.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
  """
  def for_update(initial, action, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        # We accept :destroy here to support soft deletes
        %__MODULE__{action_type: type} = changeset when type in [:update, :destroy] ->
          changeset

        %mod{} = struct when mod != __MODULE__ ->
          new(struct)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:update` or `:destroy`, or a record.

            Got: #{inspect(other)}
            """
      end

    do_for_action(changeset, action, params, opts)
  end

  @doc """
  Constructs a changeset for a given destroy action, and validates it.

  ### Opts

  * `:actor` - set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)
  * `:tenant` - set the tenant on the changeset
  * `:private_arguments` - set private arguments on the changeset before validations and changes are run

  Anything that is modified prior to `for_destroy/4` is validated against the rules of the action, while *anything after it is not*.

  Once a changeset has been validated by `for_destroy/4`, it isn't validated again in the action.
  New changes added are validated individually, though. This allows you to create a changeset according
  to a given action, and then add custom changes if necessary.

  ### What does this function do?

  The following steps are run when calling `Ash.Changeset.for_destroy/4`.

  - Cast input params | This is any arguments in addition to any accepted attributes
  - Set argument defaults
  - Require any missing arguments
  - Validate all provided attributes are accepted
  - Require any accepted attributes that are `allow_nil?` false
  - Set any default values for attributes
  - Run action changes & validations
  - Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.destroy.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
  """
  def for_destroy(initial, action_or_name, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        %__MODULE__{} = changeset ->
          changeset
          |> Map.put(:action_type, :destroy)

        %_{} = struct ->
          struct
          |> new()
          |> Map.put(:action_type, :destroy)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:destroy`, or a record.

            Got: #{inspect(other)}
            """
      end

    action =
      get_action_entity(changeset.resource, action_or_name) ||
        raise_no_action(changeset.resource, action_or_name, :destroy)

    domain =
      changeset.domain || opts[:domain] || Ash.Resource.Info.domain(changeset.resource) ||
        Ash.Actions.Helpers.maybe_embedded_domain(changeset.resource) ||
        raise ArgumentError,
          message:
            "Could not determine domain for changeset. Provide the `domain` option or configure a domain in the resource directly."

    changeset = %{changeset | domain: domain}

    if changeset.valid? do
      if action do
        try do
          if action.soft? do
            do_for_action(%{changeset | action_type: :destroy}, action, params, opts)
          else
            {changeset, opts} =
              Ash.Actions.Helpers.set_context_and_get_opts(
                domain,
                changeset,
                opts
              )

            name =
              fn ->
                "changeset:" <>
                  Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"
              end

            Ash.Tracer.span :changeset,
                            name,
                            opts[:tracer] do
              Ash.Tracer.telemetry_span [:ash, :changeset], fn ->
                %{
                  resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
                }
              end do
                metadata = fn ->
                  %{
                    resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                    resource: changeset.resource,
                    actor: opts[:actor],
                    tenant: opts[:tenant],
                    action: action.name,
                    authorize?: opts[:authorize?]
                  }
                end

                Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)

                changeset =
                  Enum.reduce(opts[:private_arguments] || %{}, changeset, fn {k, v}, changeset ->
                    Ash.Changeset.set_argument(changeset, k, v)
                  end)

                changeset
                |> Map.put(:action, action)
                |> handle_errors(action.error_handler)
                |> set_actor(opts)
                |> set_authorize(opts)
                |> set_tracer(opts)
                |> set_tenant(opts[:tenant] || changeset.tenant)
                |> cast_params(action, params, opts)
                |> set_argument_defaults(action)
                |> require_arguments(action)
                |> validate_attributes_accepted(action)
                |> run_action_changes(
                  action,
                  opts[:actor],
                  opts[:authorize?],
                  opts[:tracer],
                  metadata
                )
                |> add_validations(opts[:tracer], metadata, opts[:actor])
                |> mark_validated(action.name)
                |> Map.put(:__validated_for_action__, action.name)
              end
            end
          end
        rescue
          e ->
            reraise Ash.Error.to_error_class(e,
                      stacktrace: __STACKTRACE__,
                      bread_crumbs: [
                        "building changeset for #{inspect(changeset.resource)}.#{action.name}"
                      ]
                    ),
                    __STACKTRACE__
        end
      else
        raise_no_action(changeset.resource, action_or_name, :destroy)
      end
    else
      changeset
    end
  end

  @doc """
  Adds multiple atomic changes to the changeset

  See `atomic_update/3` for more information.
  """
  @spec atomic_update(t(), map() | Keyword.t()) :: t()
  def atomic_update(changeset, atomics) when is_list(atomics) or is_map(atomics) do
    Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
      atomic_update(changeset, key, value)
    end)
  end

  @doc """
  Adds an atomic change to the changeset.

  Atomic changes are applied by the data layer, and as such have guarantees that are not
  given by changes that are based on looking at the previous value and updating it. Here
  is an example of a change that is not safe to do concurrently:

  ```elixir
  change fn changeset, _ ->
    Ash.Changeset.change_attribute(changeset, :score, changeset.data.score + 1)
  end
  ```

  If two processes run this concurrently, they will both read the same value of `score`, and
  set the new score to the same value. This means that one of the increments will be lost.
  If you were to instead do this using `atomic_update`, you would get the correct result:

  ```elixir
  Ash.Changeset.atomic_update(changeset, :score, expr(score + 1))
  ```

  There are drawbacks/things to consider, however. The first is that atomic update results
  are not known until after the action is run. The following functional validation would not
  be able to enforce the score being less than 10, because the atomic happens after the validation.

  ```elixir
  validate fn changeset, _ ->
    if Ash.Changeset.get_attribute(changeset, :score) < 10 do
      :ok
    else
      {:error, field: :score, message: "must be less than 10"}
    end
  end
  ```

  If you want to use atomic updates, it is suggested to write module-based validations & changes,
  and implement the appropriate atomic callbacks on those modules. All builtin validations and changes
  implement these callbacks in addition to the standard callbacks. Validations will only be run atomically
  when the entire action is being run atomically or if one of the relevant fields is being updated atomically.
  """
  @spec atomic_update(t(), atom(), {:atomic, Ash.Expr.t()} | Ash.Expr.t()) :: t()
  def atomic_update(changeset, key, {:atomic, value}) do
    %{
      changeset
      | atomics: Keyword.put(changeset.atomics, key, value),
        no_atomic_constraints: [key | changeset.no_atomic_constraints]
    }
  end

  def atomic_update(changeset, key, value) do
    attribute =
      Ash.Resource.Info.attribute(changeset.resource, key) ||
        raise "Unknown attribute `#{inspect(changeset.resource)}.#{inspect(key)}`"

    value =
      Ash.Expr.walk_template(value, fn
        {:_atomic_ref, field} ->
          atomic_ref(changeset, field)

        other ->
          other
      end)

    case Ash.Type.cast_atomic(attribute.type, value, attribute.constraints) do
      {:atomic, value} ->
        value =
          if attribute.primary_key? do
            value
          else
            set_error_field(value, attribute.name)
          end

        %{changeset | atomics: Keyword.put(changeset.atomics, attribute.name, value)}
        |> record_atomic_update_for_atomic_upgrade(attribute.name, value)

      {:ok, value} ->
        allow_nil? =
          if is_nil(changeset.action) do
            true
          else
            attribute.allow_nil? and attribute.name not in changeset.action.require_attributes
          end

        if is_nil(value) and !allow_nil? do
          add_required_attribute_error(changeset, attribute)
        else
          %{
            changeset
            | attributes: Map.put(changeset.attributes, attribute.name, value),
              atomics: Keyword.delete(changeset.atomics, attribute.name)
          }
          |> store_casted_attribute(attribute.name, value, true)
        end

      {:error, error} ->
        add_invalid_errors(value, :attribute, changeset, attribute, error)

      {:not_atomic, message} ->
        add_error(
          changeset,
          "Cannot atomically update #{inspect(changeset.resource)}.#{attribute.name}: #{message}"
        )
    end
  end

  @doc false
  def handle_allow_nil_atomics(changeset, actor) do
    changeset.atomics
    |> Enum.reduce(changeset, fn {key, value}, changeset ->
      attribute = Ash.Resource.Info.attribute(changeset.resource, key)

      if attribute.primary_key? do
        changeset
      else
        allow_nil? =
          attribute.allow_nil? and attribute.name not in changeset.action.require_attributes

        value =
          if allow_nil? || not Ash.Expr.can_return_nil?(value) do
            value
          else
            expr(
              if is_nil(^value) do
                error(
                  ^Ash.Error.Changes.Required,
                  %{
                    field: ^attribute.name,
                    type: ^:attribute,
                    resource: ^changeset.resource
                  }
                )
              else
                ^value
              end
            )
          end

        %{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
      end
    end)
    |> Ash.Changeset.hydrate_atomic_refs(actor, eager?: true)
  end

  @doc """
  Set the result of the action. This will prevent running the underlying datalayer behavior
  """
  @spec set_result(t(), term) :: t()
  def set_result(changeset, result) do
    set_context(changeset, %{private: %{action_result: result}})
  end

  @doc """
  Turns the special case {:replace, fields}, :replace_all and {:replace_all_except, fields} upsert_fields
  options into a list of fields
  """
  def expand_upsert_fields({:replace, fields}, _) do
    fields
  end

  def expand_upsert_fields(:replace_all, resource) do
    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.map(fn %{name: name} -> name end)
  end

  def expand_upsert_fields({:replace_all_except, except_fields}, resource) do
    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.map(fn %{name: name} -> name end)
    |> Enum.reject(fn name -> name in except_fields end)
  end

  def expand_upsert_fields(fields, _), do: fields

  @spec set_on_upsert(t(), list(atom)) :: Keyword.t()
  def set_on_upsert(changeset, upsert_keys) do
    keys = upsert_keys || Ash.Resource.Info.primary_key(changeset.resource)

    if changeset.context[:private][:upsert_fields] do
      Keyword.new(changeset.context[:private][:upsert_fields], fn key ->
        {key, Ash.Changeset.get_attribute(changeset, key)}
      end)
    else
      explicitly_changing_attributes =
        Enum.map(
          Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, []) -- keys,
          fn key ->
            {key, Ash.Changeset.get_attribute(changeset, key)}
          end
        )

      changeset
      |> upsert_update_defaults()
      |> Keyword.merge(explicitly_changing_attributes)
    end
  end

  defp upsert_update_defaults(changeset) do
    changeset.resource
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(changeset.resource))
    |> Enum.concat(lazy_non_matching_defaults(changeset.resource))
  end

  defp static_defaults(resource) do
    resource
    |> Ash.Resource.Info.static_default_attributes(:update)
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_non_matching_defaults(resource) do
    resource
    |> Ash.Resource.Info.lazy_non_matching_default_attributes(:update)
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_matching_defaults(resource) do
    resource
    |> Ash.Resource.Info.lazy_matching_default_attributes(:update)
    |> Enum.group_by(& &1.update_default)
    |> Enum.flat_map(fn {default_fun, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  defp do_for_action(changeset, action_or_name, params, opts) do
    domain =
      changeset.domain || opts[:domain] || Ash.Resource.Info.domain(changeset.resource) ||
        Ash.Actions.Helpers.maybe_embedded_domain(changeset.resource) ||
        raise ArgumentError,
          message:
            "Could not determine domain for changeset. Provide the `domain` option or configure a domain in the resource directly."

    changeset = %{changeset | domain: domain}

    if changeset.valid? do
      action = get_action_entity(changeset.resource, action_or_name)

      {changeset, opts} =
        Ash.Actions.Helpers.set_context_and_get_opts(
          domain,
          %{changeset | action: action},
          opts
        )

      if action do
        name =
          fn ->
            "changeset:" <> Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"
          end

        try do
          Ash.Tracer.span :changeset,
                          name,
                          opts[:tracer] do
            Ash.Tracer.telemetry_span [:ash, :changeset], fn ->
              %{
                resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
              }
            end do
              metadata = fn ->
                %{
                  resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                  resource: changeset.resource,
                  actor: opts[:actor],
                  tenant: opts[:tenant],
                  action: action.name,
                  authorize?: opts[:authorize?]
                }
              end

              Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)

              changeset =
                Enum.reduce(opts[:private_arguments] || %{}, changeset, fn {k, v}, changeset ->
                  Ash.Changeset.set_argument(changeset, k, v)
                end)

              changeset =
                changeset
                |> prepare_changeset_for_action(action, opts)
                |> handle_params(action, params, opts)
                |> run_action_changes(
                  action,
                  opts[:actor],
                  opts[:authorize?],
                  opts[:tracer],
                  metadata
                )
                |> add_validations(opts[:tracer], metadata, opts[:actor])
                |> mark_validated(action.name)
                |> eager_validate_identities()
                |> Map.put(:__validated_for_action__, action.name)

              if Keyword.get(opts, :require?, true) do
                require_values(changeset, action.type)
              else
                changeset
              end
            end
          end
        rescue
          e ->
            reraise Ash.Error.to_error_class(e,
                      stacktrace: __STACKTRACE__,
                      bread_crumbs: [
                        "building changeset for #{inspect(changeset.resource)}.#{action.name}"
                      ]
                    ),
                    __STACKTRACE__
        end
      else
        raise_no_action(changeset.resource, action_or_name, changeset.action_type)
      end
    else
      action = get_action_entity(changeset.resource, action_or_name)

      {changeset, _opts} =
        Ash.Actions.Helpers.set_context_and_get_opts(
          domain,
          changeset,
          opts
        )

      %{changeset | action: action}
    end
  end

  @doc """
  Checks if an argument is not nil or an attribute is not nil, either in the original data, or that it is not being changed to a `nil` value if it is changing.

  This also accounts for the `accessing_from` context that is set when using `manage_relationship`, so it is aware that a particular value
  *will* be set by `manage_relationship` even if it isn't currently being set.
  """
  def present?(changeset, attribute) do
    arg_or_attribute_value =
      case Ash.Changeset.fetch_argument(changeset, attribute) do
        {:ok, nil} ->
          Ash.Changeset.get_attribute(changeset, attribute)

        :error ->
          Ash.Changeset.get_attribute(changeset, attribute)

        {:ok, value} ->
          {:ok, value}
      end

    arg_or_attribute_value =
      case arg_or_attribute_value do
        %Ash.NotLoaded{} ->
          nil

        %Ash.ForbiddenField{} ->
          nil

        other ->
          other
      end

    not is_nil(arg_or_attribute_value) ||
      belongs_to_attr_of_rel_being_managed?(attribute, changeset, true) ||
      is_belongs_to_rel_being_managed?(attribute, changeset, true)
  end

  @doc """
  Checks if an attribute is not nil, either in the original data, or that it is not being changed to a `nil` value if it is changing.

  This also accounts for the `accessing_from` context that is set when using `manage_relationship`, so it is aware that a particular value
  *will* be set by `manage_relationship` even if it isn't currently being set.
  """
  def attribute_present?(changeset, attribute) do
    attribute_value = Ash.Changeset.get_attribute(changeset, attribute)

    attribute_value =
      case attribute_value do
        %Ash.NotLoaded{} ->
          nil

        %Ash.ForbiddenField{} ->
          nil

        other ->
          other
      end

    not is_nil(attribute_value) ||
      belongs_to_attr_of_rel_being_managed?(attribute, changeset, true) ||
      is_belongs_to_rel_being_managed?(attribute, changeset, true)
  end

  def prepare_changeset_for_action(changeset, action, opts) do
    changeset
    |> Map.put(:action, action)
    |> reset_arguments()
    |> handle_errors(action.error_handler)
    |> set_actor(opts)
    |> set_authorize(opts)
    |> set_tracer(opts)
    |> timeout(changeset.timeout || opts[:timeout])
    |> set_tenant(opts[:tenant] || changeset.tenant || changeset.data.__metadata__[:tenant])
    |> Map.put(:action_type, action.type)
  end

  defp reset_arguments(%{arguments: arguments} = changeset) do
    Enum.reduce(arguments, changeset, fn {key, value}, changeset ->
      set_argument(changeset, key, value)
    end)
  end

  def handle_params(changeset, action, params, handle_params_opts \\ []) do
    if Keyword.get(handle_params_opts, :cast_params?, true) do
      cast_params(changeset, action, params || %{}, handle_params_opts)
    else
      changeset
    end
    |> set_argument_defaults(action)
    |> require_arguments(action)
    |> validate_attributes_accepted(action)
    |> require_values(action.type, false, action.require_attributes)
    |> set_defaults(changeset.action_type, false)
  end

  defp get_action_entity(resource, name) when is_atom(name),
    do: Ash.Resource.Info.action(resource, name)

  defp get_action_entity(_resource, %struct{} = action)
       when struct in [
              Ash.Resource.Actions.Update,
              Ash.Resource.Actions.Create,
              Ash.Resource.Actions.Destroy
            ] do
    action
  end

  defp get_action_entity(_resource, action) do
    raise ArgumentError, "Invalid value provided for action: #{inspect(action)}"
  end

  defp eager_validate_identities(changeset) do
    identities =
      changeset.resource
      |> Ash.Resource.Info.identities()

    case identities do
      [] ->
        changeset

      identities ->
        Enum.reduce(identities, changeset, fn identity, changeset ->
          changeset =
            if identity.eager_check_with do
              validate_identity(changeset, identity, identity.eager_check_with)
            else
              changeset
            end

          if identity.pre_check_with do
            before_action(changeset, &validate_identity(&1, identity, identity.pre_check_with))
          else
            changeset
          end
        end)
    end
  end

  defp validate_identity(
         %{context: %{private: %{upsert?: true, upsert_identity: name}}} = changeset,
         %{name: name},
         _domain
       ) do
    changeset
  end

  defp validate_identity(
         %{action: %{soft?: true}} = changeset,
         identity,
         domain
       ) do
    do_validate_identity(changeset, identity, domain)
  end

  defp validate_identity(
         %{action: %{type: type}} = changeset,
         identity,
         domain
       )
       when type in [:create, :update] do
    do_validate_identity(changeset, identity, domain)
  end

  defp validate_identity(
         %{action: %{type: type}} = changeset,
         identity,
         domain
       )
       when type in [:create, :update] do
    do_validate_identity(changeset, identity, domain)
  end

  defp validate_identity(changeset, _, _), do: changeset

  defp do_validate_identity(changeset, identity, domain) do
    if changeset.context[:private][:upsert_identity] == identity.name do
      changeset
    else
      if changeset.action_type == :create ||
           Enum.any?(identity.keys, &changing_attribute?(changeset, &1)) do
        action = Ash.Resource.Info.primary_action(changeset.resource, :read).name

        if Enum.any?(identity.keys, fn key ->
             Ash.Resource.Info.calculation(changeset.resource, key)
           end) do
          raise ArgumentError, "Cannot pre or eager check an identity based on calculated fields."
        end

        values =
          Enum.map(identity.keys, fn key ->
            case Ash.Changeset.get_attribute(changeset, key) do
              nil ->
                {key, is_nil: true}

              value ->
                {key, value}
            end
          end)

        if identity.nils_distinct? && Enum.any?(values, &(elem(&1, 1) == [is_nil: true])) do
          changeset
        else
          tenant =
            if identity.all_tenants? do
              unless Ash.Resource.Info.multitenancy_global?(changeset.resource) do
                raise ArgumentError,
                  message: """
                  Cannot pre or eager check an identity that has `all_tenants?: true`
                  unless the resource supports global multitenancy.
                  """
              end

              nil
            else
              changeset.tenant
            end

          changeset.resource
          |> Ash.Query.for_read(action, %{},
            tenant: tenant,
            actor: changeset.context[:private][:actor],
            authorize?: changeset.context[:private][:authorize?],
            tracer: changeset.context[:private][:tracer],
            domain: domain
          )
          |> Ash.Query.do_filter(values)
          |> Ash.Query.limit(1)
          |> Ash.Query.set_context(%{private: %{internal?: true}})
          |> Ash.read_one(authorize?: false)
          |> case do
            {:ok, nil} ->
              changeset

            {:ok, _} ->
              error =
                Ash.Error.Changes.InvalidChanges.exception(
                  fields: identity.keys,
                  message: identity.message || "has already been taken"
                )

              add_error(changeset, error)

            {:error, error} ->
              add_error(changeset, error)
          end
        end
      else
        changeset
      end
    end
  end

  defp require_arguments(changeset, action) do
    action.arguments
    |> Enum.filter(&(&1.allow_nil? == false))
    |> Enum.reduce(changeset, fn argument, changeset ->
      case fetch_argument(changeset, argument.name) do
        {:ok, value} when not is_nil(value) ->
          changeset

        _ ->
          if argument.name in changeset.invalid_keys do
            changeset
          else
            add_error(
              changeset,
              Ash.Error.Changes.Required.exception(
                resource: changeset.resource,
                field: argument.name,
                type: :argument
              )
            )
          end
      end
    end)
  end

  defp set_argument_defaults(changeset, action) do
    Enum.reduce(action.arguments, changeset, fn argument, changeset ->
      case fetch_argument(changeset, argument.name) do
        :error ->
          if is_nil(argument.default) do
            changeset
          else
            %{
              changeset
              | arguments: Map.put(changeset.arguments, argument.name, default(:create, argument))
            }
          end

        _ ->
          changeset
      end
    end)
  end

  defp set_actor(changeset, opts) do
    if Keyword.has_key?(opts, :actor) do
      put_context(changeset, :private, %{actor: opts[:actor]})
    else
      changeset
    end
  end

  defp set_authorize(changeset, opts) do
    if Keyword.has_key?(opts, :authorize?) do
      put_context(changeset, :private, %{authorize?: opts[:authorize?]})
    else
      changeset
    end
  end

  defp set_tracer(changeset, opts) do
    if Keyword.has_key?(opts, :tracer) do
      put_context(changeset, :private, %{tracer: opts[:tracer]})
    else
      changeset
    end
  end

  defp raise_no_action(resource, action, type) do
    available_actions =
      resource
      |> Ash.Resource.Info.actions()
      |> Enum.filter(&(&1.type == type))
      |> Enum.map_join("\n", &"    - `#{inspect(&1.name)}`")

    raise ArgumentError,
      message: """
      No such #{type} action on resource #{inspect(resource)}: #{String.slice(inspect(action), 0..50)}

      Example Call:

        Ash.Changeset.for_#{type}(changeset_or_record, :action_name, input, options)

      Available #{type} actions:

      #{available_actions}
      """
  end

  defp mark_validated(changeset, action_name) do
    %{changeset | __validated_for_action__: