lib/ash/changeset/changeset.ex

defmodule Ash.Changeset do
  @moduledoc """
  Changesets are used to create and update data in Ash.

  Create a changeset with `new/1` or `new/2`, and alter the attributes
  and relationships using the functions provided in this module.  Nothing in this module
  actually incurs changes in a data layer. To commit a changeset, see `Ash.create/2`
  and `Ash.update/2`.

  # Changeset lifecycle

  ## Action Lifecycle

  The following example illustrates the hook lifecycle of a changeset.

  ```elixir
  defmodule AshChangesetLifeCycleExample do
    def change(changeset, _, _) do
      changeset
      # execute code both before and after the transaction
      |> Ash.Changeset.around_transaction(fn changeset, callback ->
        callback.(changeset)
      end)
      # execute code before the transaction is started. Use for things like external calls
      |> Ash.Changeset.before_transaction(fn changeset -> changeset end)
      # execute code in the transaction, before and after the data layer is called
      |> Ash.Changeset.around_action(fn changeset, callback ->
        callback.(changeset)
      end)
      # execute code in the transaction, before the data layer is called
      |> Ash.Changeset.before_action(fn changeset -> changeset end)
      # execute code in the transaction, after the data layer is called, only if the action is successful
      |> Ash.Changeset.after_action(fn changeset, result -> {:ok, result} end)
      # execute code after the transaction, both in success and error cases
      |> Ash.Changeset.after_transaction(fn changeset, success_or_error_result -> success_or_error_result end
    end
  end
  ```
  """

  defstruct [
    :__validated_for_action__,
    :action_type,
    :action,
    :domain,
    :data,
    :handle_errors,
    :resource,
    :tenant,
    :to_tenant,
    :timeout,
    dirty_hooks: [],
    invalid_keys: MapSet.new(),
    filter: nil,
    added_filter: nil,
    action_failed?: false,
    atomics: [],
    atomic_validations: [],
    after_action: [],
    after_transaction: [],
    arguments: %{},
    around_action: [],
    around_transaction: [],
    attributes: %{},
    before_action: [],
    before_transaction: [],
    no_atomic_constraints: [],
    context: %{},
    context_changes: %{},
    defaults: [],
    errors: [],
    params: %{},
    action_select: [],
    atomic_after_action: [],
    attribute_changes: %{},
    atomic_changes: [],
    casted_attributes: %{},
    casted_arguments: %{},
    phase: :pending,
    relationships: %{},
    select: nil,
    load: [],
    valid?: true
  ]

  defimpl Inspect do
    import Inspect.Algebra

    @spec inspect(Ash.Changeset.t(), Inspect.Opts.t()) ::
            {:doc_cons, :doc_line | :doc_nil | binary | tuple,
             :doc_line | :doc_nil | binary | tuple}
            | {:doc_group,
               :doc_line
               | :doc_nil
               | binary
               | {:doc_collapse, pos_integer}
               | {:doc_force, any}
               | {:doc_break | :doc_color | :doc_cons | :doc_fits | :doc_group | :doc_string, any,
                  any}
               | {:doc_nest, any, :cursor | :reset | non_neg_integer, :always | :break},
               :inherit | :self}
    def inspect(changeset, opts) do
      context = Map.delete(changeset.context, :private)

      context =
        if context == %{} do
          empty()
        else
          concat("context: ", to_doc(context, opts))
        end

      tenant =
        if changeset.tenant do
          concat(
            "tenant: ",
            to_doc(changeset.to_tenant, opts)
          )
        else
          empty()
        end

      domain =
        if changeset.domain do
          concat("domain: ", to_doc(changeset.domain, opts))
        else
          empty()
        end

      select =
        if changeset.select do
          concat("select: ", to_doc(changeset.select, opts))
        else
          empty()
        end

      load =
        if changeset.load && changeset.load != [] do
          concat("load: ", to_doc(changeset.load, opts))
        else
          empty()
        end

      atomics =
        if Enum.empty?(changeset.atomics) do
          empty()
        else
          concat("atomics: ", to_doc(changeset.atomics, opts))
        end

      filter =
        case changeset.filter do
          nil ->
            empty()

          %Ash.Filter{expression: nil} ->
            empty()

          _ ->
            concat("filter: ", to_doc(changeset.filter, opts))
        end

      container_doc(
        "#Ash.Changeset<",
        [
          domain,
          concat("action_type: ", inspect(changeset.action_type)),
          concat("action: ", inspect(changeset.action && changeset.action.name)),
          tenant,
          concat("attributes: ", to_doc(changeset.attributes, opts)),
          atomics,
          concat("relationships: ", to_doc(changeset.relationships, opts)),
          arguments(changeset, opts),
          concat("errors: ", to_doc(changeset.errors, opts)),
          filter,
          concat("data: ", to_doc(changeset.data, opts)),
          context,
          concat("valid?: ", to_doc(changeset.valid?, opts)),
          select,
          load
        ],
        ">",
        opts,
        fn str, _ -> str end
      )
    end

    defp arguments(changeset, opts) do
      if changeset.action do
        if Enum.empty?(changeset.action.arguments) do
          empty()
        else
          arg_string =
            changeset.action.arguments
            |> Enum.filter(fn argument ->
              match?({:ok, _}, Ash.Changeset.fetch_argument(changeset, argument.name))
            end)
            |> Map.new(fn argument ->
              value = Ash.Changeset.get_argument(changeset, argument.name)

              if argument.sensitive? do
                {argument.name, Ash.Helpers.redact(value)}
              else
                {argument.name, value}
              end
            end)
            |> to_doc(opts)

          concat(["arguments: ", arg_string])
        end
      else
        empty()
      end
    end
  end

  @type after_action_fun ::
          (t, Ash.Resource.record() ->
             {:ok, Ash.Resource.record()}
             | {:ok, Ash.Resource.record(), [Ash.Notifier.Notification.t()]}
             | {:error, any})

  @type after_transaction_fun ::
          (t, {:ok, Ash.Resource.record()} | {:error, any} ->
             {:ok, Ash.Resource.record()} | {:error, any})

  @type before_action_fun :: (t -> t | {t, %{notifications: [Ash.Notifier.Notification.t()]}})

  @type before_transaction_fun :: (t -> t)

  @type around_action_result ::
          {:ok, Ash.Resource.record(), t(), %{notifications: list(Ash.Notifier.Notification.t())}}
          | {:error, Ash.Error.t()}
  @type around_action_callback :: (t -> around_action_result)
  @type around_action_fun :: (t, around_action_callback -> around_action_result)

  @type around_transaction_result :: {:ok, Ash.Resource.record()} | {:error, any}
  @type around_transaction_callback :: (t -> around_transaction_result)
  @type around_transaction_fun :: (t, around_transaction_callback -> around_transaction_result)

  @phases [
    :atomic,
    :pending,
    :validate,
    :before_action,
    :after_action,
    :before_transaction,
    :after_transaction,
    :around_action,
    :around_transaction
  ]

  @type phase :: unquote(Enum.reduce(@phases, &{:|, [], [&1, &2]}))

  @type t :: %__MODULE__{
          __validated_for_action__: atom | nil,
          action: Ash.Resource.Actions.action() | nil,
          action_failed?: boolean,
          action_type: Ash.Resource.Actions.action_type() | nil,
          after_action: [after_action_fun | {after_action_fun, map}],
          after_transaction: [after_transaction_fun | {after_transaction_fun, map}],
          atomics: Keyword.t(),
          domain: module | nil,
          arguments: %{optional(atom) => any},
          around_action: [around_action_fun | {around_action_fun, map}],
          around_transaction: [around_transaction_fun | {around_transaction_fun, map}],
          attributes: %{optional(atom) => any},
          before_action: [before_action_fun | {before_action_fun, map}],
          before_transaction: [before_transaction_fun | {before_transaction_fun, map}],
          context: map,
          filter: Ash.Filter.t() | nil,
          added_filter: Ash.Filter.t() | nil,
          data: Ash.Resource.record() | nil,
          defaults: [atom],
          errors: [Ash.Error.t()],
          handle_errors:
            nil | (t, error :: any -> :ignore | t | (error :: any) | {error :: any, t}),
          invalid_keys: MapSet.t(),
          params: %{optional(atom | binary) => any},
          phase: phase(),
          relationships: %{
            optional(atom) =>
              %{optional(atom | binary) => any} | [%{optional(atom | binary) => any}]
          },
          resource: module,
          select: [atom] | nil,
          load: keyword(keyword),
          tenant: term(),
          timeout: pos_integer() | nil,
          valid?: boolean
        }

  @doc deprecated: "Use `Ash.Error.error_input()` instead"
  @type error_info :: Ash.Error.error_input()

  alias Ash.Error.{
    Changes.InvalidArgument,
    Changes.InvalidAttribute,
    Changes.InvalidChanges,
    Changes.InvalidRelationship,
    Changes.NoSuchAttribute,
    Changes.NoSuchRelationship,
    Changes.Required,
    Invalid.NoSuchInput,
    Invalid.NoSuchResource
  }

  require Ash.Tracer
  import Ash.Expr
  require Logger

  defmodule OriginalDataNotAvailable do
    @moduledoc "A value placed in changeset.data to indicate that the original data is not available"
    defstruct reason: :atomic_query_update
    @type t :: %__MODULE__{reason: :atomic_query_update}
  end

  defmacrop maybe_already_validated_error!(changeset, alternative \\ nil) do
    {function, arity} = __CALLER__.function

    if alternative do
      quote do
        changeset = unquote(changeset)

        {:current_stacktrace, stacktrace} =
          Process.info(self(), :current_stacktrace)

        if changeset.__validated_for_action__ do
          require Logger

          Logger.warning("""
          Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.

          For safety, we prevent any changes after that point because they will bypass validations or other action logic.. To proceed anyway,
          you can use `#{unquote(alternative)}/#{unquote(arity)}`. However, you should prefer a pattern like the below, which makes
          any custom changes *before* calling the action.

            Resource
            |> Ash.Changeset.new()
            |> Ash.Changeset.#{unquote(function)}(...)
            |> Ash.Changeset.for_create(...)

          #{Exception.format_stacktrace(stacktrace)}
          """)
        end
      end
    else
      quote do
        changeset = unquote(changeset)

        {:current_stacktrace, stacktrace} =
          Process.info(self(), :current_stacktrace)

        if changeset.__validated_for_action__ do
          require Logger

          Logger.warning("""
          Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.

          For safety, we prevent any changes using `#{unquote(function)}/#{unquote(arity)}` after that point because they will bypass validations or other action logic.
          Instead, you should change or set this value before calling the action, like so:

            Resource
            |> Ash.Changeset.new()
            |> Ash.Changeset.#{unquote(function)}(...)
            |> Ash.Changeset.for_create(...)

          #{Exception.format_stacktrace(stacktrace)}
          """)
        end
      end
    end
  end

  @doc """
  A guard which checks if the Changeset is valid.
  """
  @spec is_valid(t) :: Macro.output()
  defguard is_valid(changeset) when is_struct(changeset, __MODULE__) and changeset.valid? == true

  @doc """
  Returns a new changeset over a resource.

  *Warning*: You almost always want to use `for_action` or `for_create`, etc. over this function if possible.

  You can use this to start a changeset and make changes prior to calling `for_action`. This is not typically
  necessary, but can be useful as an escape hatch. For example:

  ```elixir
  Resource
  |> Ash.Changeset.new()
  |> Ash.Changeset.change_attribute(:name, "foobar")
  |> Ash.Changeset.for_action(...)
  ```
  """
  @spec new(Ash.Resource.t() | Ash.Resource.record()) :: t

  def new(record_or_resource) do
    {resource, record, action_type} =
      case record_or_resource do
        %resource{} = record -> {resource, record, :update}
        resource -> {resource, struct(resource), :create}
      end

    tenant =
      record
      |> Map.get(:__metadata__, %{})
      |> Map.get(:tenant, nil)

    context = Ash.Resource.Info.default_context(resource) || %{}

    if Ash.Resource.Info.resource?(resource) do
      %__MODULE__{resource: resource, data: record, action_type: action_type}
      |> set_context(context)
      |> set_tenant(tenant)
    else
      %__MODULE__{
        resource: resource,
        action_type: action_type,
        data: struct(resource)
      }
      |> add_error(NoSuchResource.exception(resource: resource))
      |> set_tenant(tenant)
      |> set_context(context)
    end
  end

  @doc """
  Ensure that only the specified attributes are present in the results.

  The first call to `select/2` will replace the default behavior of selecting
  all attributes. Subsequent calls to `select/2` will combine the provided
  fields unless the `replace?` option is provided with a value of `true`.

  If a field has been deselected, selecting it again will override that (because a single list of fields is tracked for selection)

  Primary key attributes always selected and cannot be deselected.

  When attempting to load a relationship (or manage it with `Ash.Changeset.manage_relationship/3`),
  if the source field is not selected on the query/provided data an error will be produced. If loading
  a relationship with a query, an error is produced if the query does not select the destination field
  of the relationship.

  Datalayers currently are not notified of the `select` for a changeset(unlike queries), and creates/updates select all fields when they are performed.
  A select provided on a changeset sets the unselected fields to `nil` before returning the result.

  Use `ensure_selected/2` if you wish to make sure a field has been selected, without deselecting any other fields.
  """
  def select(changeset, fields, opts \\ []) do
    if opts[:replace?] do
      case fields do
        %MapSet{} = fields -> %{changeset | select: Enum.to_list(fields)}
        fields -> %{changeset | select: Enum.uniq(List.wrap(fields))}
      end
    else
      case fields do
        %MapSet{} ->
          %{
            changeset
            | select: MapSet.union(MapSet.new(changeset.select || []), fields) |> MapSet.to_list()
          }

        fields ->
          %{changeset | select: Enum.uniq(List.wrap(fields) ++ (changeset.select || []))}
      end
    end
  end

  @doc false
  def set_action_select(%{action: nil} = changeset) do
    %{
      changeset
      | action_select:
          MapSet.to_list(
            Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
          )
    }
  end

  def set_action_select(changeset) do
    if Ash.DataLayer.data_layer_can?(changeset.resource, :action_select) do
      required =
        Ash.Resource.Info.action_select(changeset.resource, changeset.action.name) || []

      select =
        changeset.select ||
          MapSet.to_list(
            Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
          )

      %{
        changeset
        | action_select: Enum.uniq(List.wrap(required) |> Enum.concat(select))
      }
    else
      %{
        changeset
        | action_select:
            MapSet.to_list(
              Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)
            )
      }
    end
  end

  @doc """
  Calls the provided load statement on the result of the action at the very end of the action.
  """
  @spec load(t(), term()) :: t()
  def load(changeset, load) do
    query =
      changeset.resource
      |> Ash.Query.new()
      |> Map.put(:errors, [])
      |> Ash.Query.load(changeset.load)
      |> Ash.Query.load(load)

    changeset = %{
      changeset
      | load: Enum.concat(changeset.load || [], List.wrap(load))
    }

    Enum.reduce(query.errors, changeset, &add_error(&2, &1))
  end

  @doc """
  Ensures that the given attributes are selected.

  The first call to `select/2` will *limit* the fields to only the provided fields.
  Use `ensure_selected/2` to say "select this field (or these fields) without deselecting anything else".

  See `select/2` for more.
  """
  def ensure_selected(changeset, fields) do
    if changeset.select do
      Ash.Changeset.select(changeset, List.wrap(fields))
    else
      to_select = Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource)

      Ash.Changeset.select(changeset, to_select)
    end
  end

  @doc """
  Ensure the the specified attributes are `nil` in the changeset results.
  """
  def deselect(changeset, fields) do
    select =
      if changeset.select do
        changeset.select -- List.wrap(fields)
      else
        MapSet.difference(
          Ash.Resource.Info.selected_by_default_attribute_names(changeset.resource),
          MapSet.new(List.wrap(fields))
        )
      end

    select(changeset, select, replace?: true)
  end

  def selecting?(changeset, field) do
    case changeset.select do
      nil ->
        not is_nil(Ash.Resource.Info.attribute(changeset.resource, field))

      select ->
        if field in select do
          true
        else
          attribute = Ash.Resource.Info.attribute(changeset.resource, field)

          attribute && attribute.primary_key?
        end
    end || loading?(changeset, field)
  end

  @doc """
  Returns true if the field/relationship or path to field/relationship is being loaded.

  It accepts an atom or a list of atoms, which is treated for as a "path", i.e:

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :enemies, :score])
      iex> true

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :score])
      iex> false

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?(:friends)
      iex> true
  """
  def loading?(changeset, path) do
    changeset.resource
    |> Ash.Query.new()
    |> Ash.Query.load(changeset.load)
    |> Ash.Query.loading?(path)
  end

  @doc """
  Returns a list of attributes, aggregates, relationships, and calculations that are being loaded

  Provide a list of field types to narrow down the returned results.
  """
  def accessing(
        changeset,
        types \\ [:attributes, :relationships, :calculations, :attributes],
        only_public? \\ true
      ) do
    changeset.resource
    |> Ash.Query.new()
    |> Ash.Query.load(changeset.load)
    |> Map.put(:select, changeset.select)
    |> Ash.Query.accessing(types, only_public?)
  end

  @spec fully_atomic_changeset(
          resource :: Ash.Resource.t(),
          action :: atom() | Ash.Resource.Actions.action(),
          params :: map(),
          opts :: Keyword.t()
        ) :: Ash.Changeset.t() | {:not_atomic, String.t()}
  def fully_atomic_changeset(resource, action, params, opts \\ []) do
    action =
      case action do
        action when is_atom(action) -> Ash.Resource.Info.action(resource, action)
        action -> action
      end

    if action.manual do
      {:not_atomic,
       "manual action `#{inspect(resource)}.#{action.name}` cannot be performed atomically"}
    else
      changeset =
        resource
        |> Ash.Changeset.new()
        |> then(fn changeset ->
          if data = opts[:data] do
            Map.put(changeset, :data, data)
          else
            Map.put(changeset, :data, %OriginalDataNotAvailable{})
          end
        end)
        |> Map.put(:context, opts[:context] || %{})
        |> Map.put(:params, params)
        |> Map.put(:action, action)
        |> Map.put(:no_atomic_constraints, opts[:no_atomic_constraints] || [])
        |> Map.put(:action_type, action.type)
        |> Map.put(:atomics, opts[:atomics] || [])
        |> Ash.Changeset.set_tenant(opts[:tenant])

      {changeset, _opts} =
        Ash.Actions.Helpers.set_context_and_get_opts(
          opts[:domain] || Ash.Resource.Info.domain(resource),
          changeset,
          opts
        )

      changeset = set_phase(changeset, :atomic)

      with :ok <- verify_notifiers_support_atomic(resource, action),
           %Ash.Changeset{} = changeset <-
             atomic_params(changeset, action, params, opts),
           %Ash.Changeset{} = changeset <- set_argument_defaults(changeset, action),
           %Ash.Changeset{} = changeset <- require_arguments(changeset, action),
           %Ash.Changeset{} = changeset <- atomic_changes(changeset, action),
           %Ash.Changeset{} = changeset <- atomic_update(changeset, opts[:atomic_update] || []),
           %Ash.Changeset{} = changeset <-
             Ash.Changeset.set_context(changeset, %{
               changed?:
                 not (Enum.empty?(changeset.atomics) and Enum.empty?(changeset.attributes))
             }),
           %Ash.Changeset{} = changeset <- atomic_defaults(changeset),
           %Ash.Changeset{} = changeset <-
             hydrate_atomic_refs(
               changeset,
               opts[:actor],
               opts
               |> Keyword.take([:eager?])
               |> Keyword.put(:error_is_not_atomic?, true)
             ),
           %Ash.Changeset{} = changeset <-
             apply_atomic_constraints(changeset, opts[:actor]) do
        changeset
      else
        {:not_atomic, reason} ->
          {:not_atomic, reason}
      end
    end
    |> case do
      {:not_atomic, reason} ->
        {:not_atomic, reason}

      changeset ->
        clear_phase(changeset)
    end
  end

  def atomic_defaults(changeset) do
    if changeset.context.changed? do
      with %__MODULE__{} <- atomic_static_update_defaults(changeset) do
        atomic_lazy_update_defaults(changeset)
      end
    else
      changeset
    end
  end

  defp atomic_static_update_defaults(changeset) do
    initial_changeset = changeset

    changeset.resource
    |> Ash.Resource.Info.static_default_attributes(:update)
    |> Enum.reject(fn attribute ->
      Ash.Changeset.changing_attribute?(changeset, attribute.name)
    end)
    |> Enum.reduce_while(changeset, fn attribute, changeset ->
      case Ash.Type.cast_atomic(
             attribute.type,
             attribute.update_default,
             attribute.constraints
           ) do
        {:atomic, atomic} ->
          {:cont,
           atomic_update(
             changeset,
             attribute.name,
             {:atomic, atomic_default_condition(initial_changeset, attribute.name, atomic)}
           )}

        {:ok, value} ->
          {:cont,
           atomic_update(
             changeset,
             attribute.name,
             {:atomic, atomic_default_condition(initial_changeset, attribute.name, value)}
           )}

        {:error, error} ->
          {:cont,
           add_invalid_errors(attribute.update_default, :attribute, changeset, attribute, error)}

        {:not_atomic, reason} ->
          {:halt, {:not_atomic, reason}}
      end
    end)
  end

  defp atomic_lazy_update_defaults(changeset) do
    initial_changeset = changeset

    changeset.resource
    |> Ash.Resource.Info.lazy_matching_default_attributes(:update)
    |> Enum.concat(
      Ash.Resource.Info.lazy_non_matching_default_attributes(changeset.resource, :update)
    )
    |> Enum.reject(fn attribute ->
      Ash.Changeset.changing_attribute?(changeset, attribute.name)
    end)
    |> Enum.reduce_while(changeset, fn attribute, changeset ->
      cond do
        attribute.update_default == (&DateTime.utc_now/0) ->
          {:cont,
           atomic_update(
             changeset,
             attribute.name,
             {:atomic,
              atomic_default_condition(initial_changeset, attribute.name, Ash.Expr.expr(now()))}
           )}

        attribute.update_default == (&Ash.UUID.generate/0) ->
          {:cont,
           atomic_update(
             changeset,
             attribute.name,
             {:atomic,
              atomic_default_condition(
                initial_changeset,
                attribute.name,
                Ash.Expr.expr(^Ash.UUID.generate())
              )}
           )}

        true ->
          {:cont,
           atomic_update(
             changeset,
             attribute.name,
             {:atomic,
              atomic_default_condition(
                initial_changeset,
                attribute.name,
                attribute.update_default.()
              )}
           )}
      end
    end)
  end

  defp atomic_default_condition(changeset, key, value) do
    Enum.reduce(changeset.atomics ++ Map.to_list(changeset.attributes), nil, fn
      {key, atomic}, expr ->
        atomic = strip_errors(atomic)

        if is_nil(expr) do
          Ash.Expr.expr(^atomic != ^ref(key))
        else
          Ash.Expr.expr(^expr or ^atomic != ^ref(key))
        end
    end)
    |> then(fn expr ->
      Ash.Expr.expr(
        if ^expr do
          ^value
        else
          ^ref(key)
        end
      )
    end)
  end

  defp strip_errors(atomic) do
    Ash.Filter.map(atomic, fn
      %Ash.Query.Function.If{
        arguments: [_condition, left, %Ash.Query.Function.Error{}]
      } ->
        left

      other ->
        other
    end)
  end

  defp verify_notifiers_support_atomic(resource, action) do
    resource
    |> Ash.Resource.Info.notifiers()
    |> Enum.filter(fn notifier ->
      notifier.requires_original_data?(resource, action)
    end)
    |> case do
      [] ->
        :ok

      notifiers ->
        {:not_atomic,
         "notifiers #{inspect(notifiers)} require original data for #{inspect(resource)}.#{action.name}"}
    end
  end

  defp atomic_changes(changeset, action) do
    changes =
      action.changes
      |> Enum.concat(Ash.Resource.Info.changes(changeset.resource, changeset.action_type))
      |> then(fn changes ->
        if changeset.action.skip_global_validations? do
          changes
        else
          Enum.concat(
            changes,
            Ash.Resource.Info.validations(changeset.resource, changeset.action_type)
          )
        end
      end)

    context = %{
      actor: changeset.context[:private][:actor],
      tenant: changeset.tenant,
      authorize?: changeset.context[:private][:authorize?] || false,
      tracer: changeset.context[:private][:tracer]
    }

    changeset = set_phase(changeset, :atomic)

    Enum.reduce_while(changes, changeset, fn
      %{change: _} = change, changeset ->
        case run_atomic_change(changeset, change, context) do
          {:not_atomic, reason} ->
            {:halt, {:not_atomic, reason}}

          changeset ->
            {:cont, changeset}
        end

      %{validation: _} = validation, changeset ->
        case run_atomic_validation(changeset, validation, context) do
          {:not_atomic, reason} ->
            {:halt, {:not_atomic, reason}}

          changeset ->
            {:cont, changeset}
        end
    end)
    |> case do
      {:not_atomic, reason} -> {:not_atomic, reason}
      %__MODULE__{} = changeset -> clear_phase(changeset)
    end
  end

  @doc false
  def split_atomic_conditions(%{where: []} = validation, _changeset, _actor, _context) do
    {:ok, validation}
  end

  def split_atomic_conditions(
        %{where: [{module, opts} | rest]} = validation,
        changeset,
        actor,
        context
      ) do
    if module.has_validate?() do
      opts =
        Ash.Actions.Helpers.templated_opts(
          opts,
          actor,
          changeset.tenant,
          changeset.arguments,
          changeset.context,
          changeset
        )

      {:ok, opts} = module.init(opts)

      case module.validate(
             changeset,
             opts,
             context
           ) do
        :ok -> split_atomic_conditions(%{validation | where: rest}, changeset, actor, context)
        _ -> :skip
      end
    else
      if module.atomic?() do
        case split_atomic_conditions(%{validation | where: rest}, changeset, actor, context) do
          {:ok, %{where: remaining} = validation} ->
            {:ok, %{validation | where: [{module, opts} | remaining]}}

          other ->
            other
        end
      else
        raise "Module #{module} must define one of `atomic/3` or `validate/3`"
      end
    end
  end

  @doc false
  def run_atomic_validation(changeset, %{where: where} = validation, context) do
    if Ash.DataLayer.data_layer_can?(changeset.resource, :expr_error) do
      with {:atomic, condition} <- atomic_condition(where, changeset, context) do
        case condition do
          false ->
            changeset

          true ->
            do_run_atomic_validation(changeset, validation, context)

          where_condition ->
            do_run_atomic_validation(changeset, validation, context, where_condition)
        end
      end
    else
      {:not_atomic,
       "data layer `#{Ash.DataLayer.data_layer(changeset.resource)}` does not support the expr_error"}
    end
  end

  defp do_run_atomic_validation(
         changeset,
         %{validation: {module, validation_opts}, message: message},
         context,
         where_condition \\ nil
       ) do
    case List.wrap(
           module.atomic(
             changeset,
             validation_opts,
             struct(Ash.Resource.Validation.Context, Map.put(context, :message, message))
           )
         ) do
      [{:atomic, _, _, _} | _] = atomics ->
        Enum.reduce(atomics, changeset, fn
          {:atomic, _fields, condition_expr, error_expr}, changeset ->
            condition_expr =
              if where_condition do
                expr(^where_condition and ^condition_expr)
              else
                condition_expr
              end

            condition_expr = rewrite_atomics(changeset, condition_expr)

            validate_atomically(changeset, condition_expr, error_expr)
        end)

      [:ok] ->
        changeset

      [{:error, error}] ->
        if message do
          error = override_validation_message(error, message)
          Ash.Changeset.add_error(changeset, error)
        else
          Ash.Changeset.add_error(changeset, error)
        end

      [{:not_atomic, error}] ->
        {:not_atomic, error}
    end
  end

  defp rewrite_atomics(changeset, expr) do
    Ash.Expr.walk_template(expr, fn
      {:_atomic_ref, ref} ->
        atomic_ref(changeset, ref)

      other ->
        other
    end)
  end

  @doc false
  def run_atomic_change(
        changeset,
        %{change: {module, change_opts}, where: where},
        context
      ) do
    change_opts =
      Ash.Expr.fill_template(
        change_opts,
        actor: changeset.context.private[:actor],
        tenant: changeset.to_tenant,
        args: changeset.arguments,
        context: changeset.context,
        changeset: changeset
      )

    with {:ok, change_opts} <- module.init(change_opts),
         {:atomic, condition} <-
           atomic_condition(where, changeset, context),
         {{:atomic, modified_changeset?, new_changeset, atomic_changes, validations}, condition} <-
           {atomic_with_changeset(
              module.atomic(changeset, change_opts, struct(Ash.Resource.Change.Context, context)),
              changeset
            ), condition} do
      case condition do
        true ->
          {:atomic, apply_atomic_update(new_changeset, atomic_changes)}

        false ->
          {:atomic, changeset}

        condition ->
          if modified_changeset? do
            {:not_atomic,
             "change `#{inspect(module)}` modified the changeset, but had a condition that could not be checked without running the action"}
          else
            atomic_changes =
              Map.new(atomic_changes, fn
                {key, {:atomic, value}} ->
                  new_value =
                    expr(
                      if ^condition do
                        ^value
                      else
                        ^atomic_ref(key)
                      end
                    )

                  {key, {:atomic, new_value}}

                {key, value} ->
                  new_value =
                    expr(
                      if ^condition do
                        ^value
                      else
                        ^atomic_ref(key)
                      end
                    )

                  {key, new_value}
              end)

            {:atomic, apply_atomic_update(new_changeset, atomic_changes)}
          end
      end
      |> case do
        {:not_atomic, reason} ->
          {:not_atomic, reason}

        {:atomic, changeset} ->
          Enum.reduce(
            List.wrap(validations),
            changeset,
            fn {:atomic, _, condition_expr, error_expr}, changeset ->
              validate_atomically(
                changeset,
                Ash.Expr.expr(^condition and ^condition_expr),
                error_expr
              )
            end
          )
      end
    else
      {{:ok, new_changeset}, _condition} ->
        new_changeset

      {{:error, error}, _condition} ->
        {:error, error}

      {:error, error} ->
        {:error, error}

      {{:not_atomic, reason}, _} ->
        {:not_atomic, reason}

      {:not_atomic, reason} ->
        {:not_atomic, reason}

      {:ok, _} ->
        changeset
    end
  end

  defp apply_atomic_update(changeset, atomics) when is_list(atomics) or is_map(atomics) do
    Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
      apply_atomic_update(changeset, key, value)
    end)
  end

  defp apply_atomic_update(changeset, key, {:atomic, value}) do
    value =
      Ash.Expr.walk_template(value, fn
        {:_atomic_ref, field} ->
          atomic_ref(changeset, field)

        other ->
          other
      end)

    %{
      changeset
      | atomics: Keyword.put(changeset.atomics, key, value),
        no_atomic_constraints: [key | changeset.no_atomic_constraints]
    }
    |> record_atomic_update_for_atomic_upgrade(key, {:atomic, value})
  end

  defp apply_atomic_update(changeset, key, value) do
    attribute = Ash.Resource.Info.attribute(changeset.resource, key)

    value =
      Ash.Expr.walk_template(value, fn
        {:_atomic_ref, field} ->
          atomic_ref(changeset, field)

        other ->
          other
      end)

    case Ash.Type.cast_atomic(attribute.type, value, attribute.constraints) do
      {:atomic, value} ->
        value =
          if attribute.primary_key? do
            value
          else
            set_error_field(value, attribute.name)
          end

        %{changeset | atomics: Keyword.put(changeset.atomics, attribute.name, value)}
        |> record_atomic_update_for_atomic_upgrade(attribute.name, value)

      {:not_atomic, message} ->
        {:not_atomic,
         "Cannot atomically update #{inspect(changeset.resource)}.#{attribute.name}: #{message}"}

      {:ok, value} ->
        allow_nil? =
          attribute.allow_nil? and attribute.name not in changeset.action.require_attributes

        if is_nil(value) and !allow_nil? do
          add_required_attribute_error(
            %{changeset | atomics: Keyword.delete(changeset.atomics, attribute.name)},
            attribute
          )
        else
          %{
            changeset
            | attributes: Map.put(changeset.attributes, attribute.name, value),
              atomics: Keyword.delete(changeset.atomics, attribute.name)
          }
          |> store_casted_attribute(attribute.name, value, true)
        end

      {:error, error} ->
        {:cont, add_invalid_errors(value, :attribute, changeset, attribute, error)}
    end
  end

  defp atomic_with_changeset({:atomic, changeset, atomics, validations}, _changeset),
    do: {:atomic, true, changeset, atomics, validations}

  defp atomic_with_changeset({:atomic, %Ash.Changeset{} = changeset, atomics}, _changeset),
    do: {:atomic, true, changeset, atomics, []}

  defp atomic_with_changeset({:atomic, atomics, validations}, changeset),
    do: {:atomic, false, changeset, atomics, validations}

  defp atomic_with_changeset({:atomic, atomics}, changeset),
    do: {:atomic, false, changeset, atomics, []}

  defp atomic_with_changeset({:ok, changeset}, _) do
    {:atomic, true, changeset, %{}, []}
  end

  defp atomic_with_changeset(other, _), do: other

  defp validate_atomically(changeset, condition_expr, error_expr) do
    %{
      changeset
      | atomic_validations: [{condition_expr, error_expr} | changeset.atomic_validations]
    }
  end

  @doc """
  Gets a reference to a field, or the current atomic update expression of that field.
  """
  def atomic_ref(changeset, field) do
    case Keyword.fetch(changeset.atomics, field) do
      {:ok, atomic} ->
        attribute = Ash.Resource.Info.attribute(changeset.resource, field)
        Ash.Expr.expr(type(^atomic, ^attribute.type, ^attribute.constraints))

      :error ->
        case Map.fetch(changeset.attributes, field) do
          {:ok, new_value} ->
            attribute = Ash.Resource.Info.attribute(changeset.resource, field)
            Ash.Expr.expr(type(^new_value, ^attribute.type, ^attribute.constraints))

          :error ->
            expr(^ref(field))
        end
    end
  end

  @doc false
  # Returns either an appropriate expression for an atomic condition or a value
  # indicated that the condition cannot be handled atomically.
  #
  # Validation logic matches on failure. So, for example, `present(:field)` is
  # going to _match_ when `:field` is `nil`. However, when applying this logic
  # to a `where` condition, the opposite is desired. The end result is kinda
  # ugly because it can end up reading like "not is not equal to" but
  # ultimately produces the correct results.
  @spec atomic_condition([{module(), keyword()}], Ash.Changeset.t(), map()) ::
          {:atomic, Ash.Expr.t() | boolean()} | {:not_atomic, String.t()}
  def atomic_condition(where, changeset, context) do
    Enum.reduce_while(where, {:atomic, true}, fn {module, validation_opts},
                                                 {:atomic, condition_expr} ->
      case module.atomic(
             changeset,
             validation_opts,
             struct(Ash.Resource.Validation.Context, context)
           ) do
        :ok ->
          {:cont, {:atomic, condition_expr}}

        {:atomic, _, expr, _as_error} ->
          {:cont, {:atomic, atomic_condition_expr(condition_expr, expr)}}

        {:error, _} ->
          # Error from the validator, so the validations should just fail with
          # a `false` expression.
          {:halt, {:atomic, false}}

        {:not_atomic, _reason} = not_atomic ->
          {:halt, not_atomic}

        atomic_conditions when is_list(atomic_conditions) ->
          atomic_conditions
          |> Enum.reduce(condition_expr, fn {:atomic, _, expr, _as_error}, reduced_expr ->
            atomic_condition_expr(reduced_expr, expr)
          end)
          |> then(&{:cont, {:atomic, &1}})
      end
    end)
    |> case do
      {:atomic, expr} ->
        case Ash.Expr.eval(expr,
               resource: changeset.resource,
               unknown_on_unknown_refs?: true
             ) do
          {:ok, value} ->
            {:atomic, value}

          :unknown ->
            {:atomic, expr}

          {:error, _} ->
            {:atomic, false}
        end
    end
  end

  # This is not expressly necessary as `expr(true and not ^new_expr)` would also
  # work just fine, but the final output from omitting `true` is much easier to
  # read if debugging.
  defp atomic_condition_expr(condition_expr, {:atomic, expr}) do
    {:atomic, atomic_condition_expr(condition_expr, expr)}
  end

  defp atomic_condition_expr(true, expr) do
    expr(not (^expr))
  end

  defp atomic_condition_expr(condition_expr, expr) do
    expr(^condition_expr and not (^expr))
  end

  defp atomic_params(changeset, action, params, opts) do
    if opts[:assume_casted?] do
      Enum.reduce_while(params, changeset, fn {key, value}, changeset ->
        cond do
          has_argument?(action, key) ->
            {:cont, %{changeset | arguments: Map.put(changeset.arguments, key, value)}}

          attribute = Ash.Resource.Info.attribute(changeset.resource, key) ->
            case Ash.Type.dump_to_native(attribute.type, value, attribute.constraints) do
              {:ok, value} ->
                {:cont, atomic_update(changeset, attribute.name, {:atomic, value})}

              :error ->
                {:halt,
                 {:error,
                  "Failed to dump #{inspect(value)} to native as type #{attribute.type}(#{inspect(attribute.constraints)})"}}
            end

          match?("_" <> _, key) ->
            {:cont, changeset}

          :* in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          key in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          true ->
            {:cont,
             add_error(
               changeset,
               NoSuchInput.exception(
                 resource: changeset.resource,
                 action: action.name,
                 input: key,
                 inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
               )
             )}
        end
      end)
    else
      Enum.reduce_while(params, changeset, fn {key, value}, changeset ->
        cond do
          has_argument?(action, key) ->
            {:cont, set_argument(changeset, key, value)}

          attribute = Ash.Resource.Info.attribute(changeset.resource, key) ->
            cond do
              attribute.name in action.accept ->
                {:cont, atomic_update(changeset, attribute.name, value)}

              :* in List.wrap(opts[:skip_unknown_inputs]) ->
                {:cont, changeset}

              key in List.wrap(opts[:skip_unknown_inputs]) ->
                {:cont, changeset}

              match?("_" <> _, key) ->
                {:cont, changeset}

              true ->
                {:cont,
                 add_error(
                   changeset,
                   NoSuchInput.exception(
                     resource: changeset.resource,
                     action: action.name,
                     input: key,
                     inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
                   )
                 )}
            end

          match?("_" <> _, key) ->
            {:cont, changeset}

          :* in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          key in List.wrap(opts[:skip_unknown_inputs]) ->
            {:cont, changeset}

          true ->
            {:cont,
             add_error(
               changeset,
               NoSuchInput.exception(
                 resource: changeset.resource,
                 action: action.name,
                 input: key,
                 inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
               )
             )}
        end
      end)
    end
  end

  defp set_error_field(expr, field) do
    Ash.Filter.map(expr, fn
      %Ash.Query.Function.Error{arguments: [module, nested_expr]} = func
      when is_map(nested_expr) and not is_struct(nested_expr) ->
        %{func | arguments: [module, Map.put(nested_expr, :field, field)]}

      other ->
        other
    end)
  end

  @manage_types [:append_and_remove, :append, :remove, :direct_control, :create]

  @doc """
  Constructs a changeset for a given action, and validates it.

  Calls `for_create/4`, `for_update/4` or `for_destroy/4` based on the type of action passed in.

  See those functions for more explanation.
  """
  def for_action(initial, action, params \\ %{}, opts \\ []) do
    resource =
      case initial do
        %__MODULE__{resource: resource} -> resource
        %resource{} -> resource
        resource -> resource
      end

    action = get_action_entity(resource, action)

    case action.type do
      :create ->
        for_create(initial, action, params, opts)

      :update ->
        for_update(initial, action, params, opts)

      :destroy ->
        for_destroy(initial, action, params, opts)

      :read ->
        raise ArgumentError,
              "Passed a read action `#{inspect(resource)}.#{action.name}` into `Ash.Changeset.for_action/4`. Use `Ash.Query.for_read/4` instead."
    end
  end

  @for_create_opts [
    require?: [
      type: :boolean,
      default: false,
      doc:
        "If set to `false`, values are only required when the action is run (instead of immediately)."
    ],
    actor: [
      type: :any,
      doc:
        "set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    authorize?: [
      type: :any,
      doc:
        "set authorize?, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    tracer: [
      type: {:wrap_list, {:behaviour, Ash.Tracer}},
      doc:
        "A tracer to use. Will be carried over to the action. For more information see `Ash.Tracer`."
    ],
    tenant: [
      type: {:protocol, Ash.ToTenant},
      doc: "set the tenant on the changeset"
    ],
    skip_unknown_inputs: [
      type: {:wrap_list, {:or, [:atom, :string]}},
      doc:
        "A list of inputs that, if provided, will be ignored if they are not recognized by the action. Use `:*` to indicate all unknown keys."
    ],
    load: [
      type: :any,
      doc: "Data to load on the result after running the action."
    ],
    context: [
      type: :map,
      doc: "Context to set on the query, changeset, or input"
    ],
    private_arguments: [
      type: :map,
      doc: "Private argument values to set before validations and changes.",
      default: %{}
    ],
    return_skipped_upsert?: [
      type: :boolean,
      default: false,
      doc:
        "If `true`, and a record was *not* upserted because its filter prevented the upsert, the original record (which was *not* upserted) will be returned."
    ]
  ]

  @doc false
  def for_create_opts, do: @for_create_opts

  @doc """
  Constructs a changeset for a given create action, and validates it.

  Anything that is modified prior to `for_create/4` is validated against the rules of the action, while *anything after it is not*.
  This runs any `change`s contained on your action. To have your logic execute *only* during the action, you can use `after_action/2`
  or `before_action/2`.

  Multitenancy is *not* validated until an action is called. This allows you to avoid specifying a tenant until just before calling
  the domain action.

  ### Params
  `params` may be attributes, relationships, or arguments. You can safely pass user/form input directly into this function.
  Only public attributes and relationships are supported. If you want to change private attributes as well, see the
  Customization section below. `params` are stored directly as given in the `params` field of the changeset, which can be
  used to retrieve the originally input value.

  ### Opts

  #{Spark.Options.docs(@for_create_opts)}

  ### Customization

  A changeset can be provided as the first argument, instead of a resource, to allow
  setting specific attributes ahead of time.

  For example:

      MyResource
      |> Ash.Changeset.new()
      |> Ash.Changeset.change_attribute(:foo, 1)
      |> Ash.Changeset.for_create(:create, ...opts)

  Once a changeset has been validated by `for_create/4` (or `for_update/4`), it isn't validated again in the action.
  New changes added are validated individually, though. This allows you to create a changeset according
  to a given action, and then add custom changes if necessary.

  ### What does this function do?

  The following steps are run when calling `Ash.Changeset.for_create/4`.

  - Cast input params | This is any arguments in addition to any accepted attributes
  - Set argument defaults
  - Require any missing arguments
  - Validate all provided attributes are accepted
  - Require any accepted attributes that are `allow_nil?` false
  - Set any default values for attributes
  - Run action changes & validations
  - Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.create.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
  """
  def for_create(initial, action, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        %__MODULE__{action_type: :create} = changeset ->
          changeset

        resource when is_atom(resource) ->
          new(resource)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:create`, or a resource.

            Got: #{inspect(other)}
            """
      end

    action =
      get_action_entity(changeset.resource, action) ||
        raise_no_action(changeset.resource, action, :create)

    upsert_condition =
      case opts[:upsert_condition] do
        nil -> action && action.upsert_condition
        other -> other
      end

    case action do
      %Ash.Resource.Actions.Update{name: name} ->
        raise ArgumentError,
          message: """
          Action #{inspect(changeset.resource)}.#{name} was passed to `Ash.Changeset.for_create`, but it is an update action.

          Perhaps you meant to call `Ash.Changeset.for_create` instead?
          """

      _ ->
        :ok
    end

    changeset
    |> set_context(%{
      private: %{
        upsert?: opts[:upsert?] || (action && action.upsert?) || false,
        return_skipped_upsert?:
          opts[:return_skipped_upsert?] || (action && action.return_skipped_upsert?) || false,
        upsert_identity: opts[:upsert_identity] || (action && action.upsert_identity),
        upsert_fields:
          expand_upsert_fields(
            opts[:upsert_fields] || (action && action.upsert_fields),
            changeset.resource
          ),
        upsert_condition: upsert_condition
      }
    })
    |> then(fn
      changeset when upsert_condition != nil -> filter(changeset, upsert_condition)
      changeset -> changeset
    end)
    |> do_for_action(action, params, opts)
  end

  @for_update_opts @for_create_opts

  @doc false
  def for_update_opts, do: @for_update_opts

  @doc """
  Constructs a changeset for a given update action, and validates it.

  Anything that is modified prior to `for_update/4` is validated against the rules of the action, while *anything after it is not*.

  ### What does this function do?

  The following steps are run when calling `Ash.Changeset.for_update/4`.

  - Cast input params | This is any arguments in addition to any accepted attributes
  - Set argument defaults
  - Require any missing arguments
  - Validate all provided attributes are accepted
  - Require any accepted attributes that are `allow_nil?` false
  - Set any default values for attributes
  - Run action changes & validations
  - Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.update.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
  """
  def for_update(initial, action, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        # We accept :destroy here to support soft deletes
        %__MODULE__{action_type: type} = changeset when type in [:update, :destroy] ->
          changeset

        %mod{} = struct when mod != __MODULE__ ->
          new(struct)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:update` or `:destroy`, or a record.

            Got: #{inspect(other)}
            """
      end

    do_for_action(changeset, action, params, opts)
  end

  @doc """
  Constructs a changeset for a given destroy action, and validates it.

  ### Opts

  * `:actor` - set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)
  * `:tenant` - set the tenant on the changeset
  * `:private_arguments` - set private arguments on the changeset before validations and changes are run

  Anything that is modified prior to `for_destroy/4` is validated against the rules of the action, while *anything after it is not*.

  Once a changeset has been validated by `for_destroy/4`, it isn't validated again in the action.
  New changes added are validated individually, though. This allows you to create a changeset according
  to a given action, and then add custom changes if necessary.

  ### What does this function do?

  The following steps are run when calling `Ash.Changeset.for_destroy/4`.

  - Cast input params | This is any arguments in addition to any accepted attributes
  - Set argument defaults
  - Require any missing arguments
  - Validate all provided attributes are accepted
  - Require any accepted attributes that are `allow_nil?` false
  - Set any default values for attributes
  - Run action changes & validations
  - Run validations, or add them in `before_action` hooks if using `d:Ash.Resource.Dsl.actions.destroy.validate|before_action?`. Any global validations are skipped if the action has `skip_global_validations?` set to `true`.
  """
  def for_destroy(initial, action_or_name, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        %__MODULE__{} = changeset ->
          changeset
          |> Map.put(:action_type, :destroy)

        %_{} = struct ->
          struct
          |> new()
          |> Map.put(:action_type, :destroy)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:destroy`, or a record.

            Got: #{inspect(other)}
            """
      end

    action =
      get_action_entity(changeset.resource, action_or_name) ||
        raise_no_action(changeset.resource, action_or_name, :destroy)

    domain =
      changeset.domain || opts[:domain] || Ash.Resource.Info.domain(changeset.resource) ||
        Ash.Actions.Helpers.maybe_embedded_domain(changeset.resource) ||
        raise ArgumentError,
          message:
            "Could not determine domain for changeset. Provide the `domain` option or configure a domain in the resource directly."

    changeset = %{changeset | domain: domain}

    if changeset.valid? do
      if action do
        try do
          if action.soft? do
            do_for_action(%{changeset | action_type: :destroy}, action, params, opts)
          else
            {changeset, opts} =
              Ash.Actions.Helpers.set_context_and_get_opts(
                domain,
                changeset,
                opts
              )

            name =
              fn ->
                "changeset:" <>
                  Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"
              end

            Ash.Tracer.span :changeset,
                            name,
                            opts[:tracer] do
              Ash.Tracer.telemetry_span [:ash, :changeset], fn ->
                %{
                  resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
                }
              end do
                metadata = fn ->
                  %{
                    resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                    resource: changeset.resource,
                    actor: opts[:actor],
                    tenant: opts[:tenant],
                    action: action.name,
                    authorize?: opts[:authorize?]
                  }
                end

                Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)

                changeset =
                  Enum.reduce(opts[:private_arguments] || %{}, changeset, fn {k, v}, changeset ->
                    set_private_argument_for_action(changeset, k, v)
                  end)

                changeset
                |> Map.put(:action, action)
                |> handle_errors(action.error_handler)
                |> set_actor(opts)
                |> set_authorize(opts)
                |> set_tracer(opts)
                |> load(opts[:load])
                |> set_tenant(opts[:tenant] || changeset.tenant)
                |> cast_params(action, params, opts)
                |> set_argument_defaults(action)
                |> require_arguments(action)
                |> validate_attributes_accepted(action)
                |> run_action_changes(
                  action,
                  opts[:actor],
                  opts[:authorize?],
                  opts[:tracer],
                  metadata
                )
                |> add_validations(opts[:tracer], metadata, opts[:actor])
                |> mark_validated(action.name)
                |> Map.put(:__validated_for_action__, action.name)
              end
            end
          end
        rescue
          e ->
            reraise Ash.Error.to_error_class(e,
                      stacktrace: __STACKTRACE__,
                      bread_crumbs: [
                        "building changeset for #{inspect(changeset.resource)}.#{action.name}"
                      ]
                    ),
                    __STACKTRACE__
        end
      else
        raise_no_action(changeset.resource, action_or_name, :destroy)
      end
    else
      changeset
    end
  end

  @doc """
  Adds multiple atomic changes to the changeset

  See `atomic_update/3` for more information.
  """
  @spec atomic_update(t(), map() | Keyword.t()) :: t()
  def atomic_update(changeset, atomics) when is_list(atomics) or is_map(atomics) do
    Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
      atomic_update(changeset, key, value)
    end)
  end

  @doc """
  Adds an atomic change to the changeset.

  Atomic changes are applied by the data layer, and as such have guarantees that are not
  given by changes that are based on looking at the previous value and updating it. Here
  is an example of a change that is not safe to do concurrently:

  ```elixir
  change fn changeset, _ ->
    Ash.Changeset.change_attribute(changeset, :score, changeset.data.score + 1)
  end
  ```

  If two processes run this concurrently, they will both read the same value of `score`, and
  set the new score to the same value. This means that one of the increments will be lost.
  If you were to instead do this using `atomic_update`, you would get the correct result:

  ```elixir
  Ash.Changeset.atomic_update(changeset, :score, expr(score + 1))
  ```

  There are drawbacks/things to consider, however. The first is that atomic update results
  are not known until after the action is run. The following functional validation would not
  be able to enforce the score being less than 10, because the atomic happens after the validation.

  ```elixir
  validate fn changeset, _ ->
    if Ash.Changeset.get_attribute(changeset, :score) < 10 do
      :ok
    else
      {:error, field: :score, message: "must be less than 10"}
    end
  end
  ```

  If you want to use atomic updates, it is suggested to write module-based validations & changes,
  and implement the appropriate atomic callbacks on those modules. All builtin validations and changes
  implement these callbacks in addition to the standard callbacks. Validations will only be run atomically
  when the entire action is being run atomically or if one of the relevant fields is being updated atomically.
  """
  @spec atomic_update(t(), atom(), {:atomic, Ash.Expr.t()} | Ash.Expr.t()) :: t()
  def atomic_update(changeset, key, {:atomic, value}) do
    %{
      changeset
      | atomics: Keyword.put(changeset.atomics, key, value),
        no_atomic_constraints: [key | changeset.no_atomic_constraints]
    }
  end

  def atomic_update(changeset, key, value) do
    attribute =
      Ash.Resource.Info.attribute(changeset.resource, key) ||
        raise "Unknown attribute `#{inspect(changeset.resource)}.#{inspect(key)}`"

    value =
      Ash.Expr.walk_template(value, fn
        {:_atomic_ref, field} ->
          atomic_ref(changeset, field)

        other ->
          other
      end)

    case Ash.Type.cast_atomic(attribute.type, value, attribute.constraints) do
      {:atomic, value} ->
        value =
          if attribute.primary_key? do
            value
          else
            set_error_field(value, attribute.name)
          end

        %{changeset | atomics: Keyword.put(changeset.atomics, attribute.name, value)}
        |> record_atomic_update_for_atomic_upgrade(attribute.name, value)

      {:ok, value} ->
        allow_nil? =
          if is_nil(changeset.action) do
            true
          else
            attribute.allow_nil? and attribute.name not in changeset.action.require_attributes
          end

        if is_nil(value) and !allow_nil? do
          add_required_attribute_error(changeset, attribute)
        else
          %{
            changeset
            | attributes: Map.put(changeset.attributes, attribute.name, value),
              atomics: Keyword.delete(changeset.atomics, attribute.name)
          }
          |> store_casted_attribute(attribute.name, value, true)
        end

      {:error, error} ->
        add_invalid_errors(value, :attribute, changeset, attribute, error)

      {:not_atomic, message} ->
        add_error(
          changeset,
          "Cannot atomically update #{inspect(changeset.resource)}.#{attribute.name}: #{message}"
        )
    end
  end

  @doc false
  def handle_allow_nil_atomics(changeset, actor) do
    changeset.atomics
    |> Enum.reduce(changeset, fn {key, value}, changeset ->
      attribute = Ash.Resource.Info.attribute(changeset.resource, key)

      if attribute.primary_key? do
        changeset
      else
        allow_nil? =
          attribute.allow_nil? and attribute.name not in changeset.action.require_attributes

        if allow_nil? || not Ash.Expr.can_return_nil?(value) do
          value
        else
          if Ash.DataLayer.data_layer_can?(changeset.resource, :expr_error) do
            expr(
              if is_nil(^value) do
                error(
                  ^Ash.Error.Changes.Required,
                  %{
                    field: ^attribute.name,
                    type: ^:attribute,
                    resource: ^changeset.resource
                  }
                )
              else
                ^value
              end
            )
          else
            {:not_atomic,
             "Failed to validate expression #{inspect(value)}: data layer `#{Ash.DataLayer.data_layer(changeset.resource)}` does not support the expr_error"}
          end
        end
        |> case do
          {:not_atomic, error} ->
            Ash.Changeset.add_error(changeset, error)

          value ->
            %{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
        end
      end
    end)
    |> Ash.Changeset.hydrate_atomic_refs(actor, eager?: true)
    |> then(fn changeset ->
      if changeset.action.type == :update do
        attributes =
          changeset.attributes
          |> Map.keys()
          |> Enum.reject(&Ash.Resource.Info.attribute(changeset.resource, &1).allow_nil?)

        require_values(changeset, :update, false, attributes)
      else
        changeset
      end
    end)
  end

  @doc """
  Set the result of the action. This will prevent running the underlying datalayer behavior
  """
  @spec set_result(t(), term) :: t()
  def set_result(changeset, result) do
    set_context(changeset, %{private: %{action_result: result}})
  end

  @doc """
  Turns the special case {:replace, fields}, :replace_all and {:replace_all_except, fields} upsert_fields
  options into a list of fields
  """
  def expand_upsert_fields({:replace, fields}, _) do
    fields
  end

  def expand_upsert_fields(:replace_all, resource) do
    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.map(fn %{name: name} -> name end)
  end

  def expand_upsert_fields({:replace_all_except, except_fields}, resource) do
    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.map(fn %{name: name} -> name end)
    |> Enum.reject(fn name -> name in except_fields end)
  end

  def expand_upsert_fields(fields, _), do: fields

  @spec set_on_upsert(t(), list(atom)) :: Keyword.t()

  @doc false
  def set_on_upsert(changeset, upsert_keys) do
    keys = upsert_keys || Ash.Resource.Info.primary_key(changeset.resource)

    if changeset.context[:private][:upsert_fields] do
      Keyword.new(changeset.context[:private][:upsert_fields], fn key ->
        {key, Ash.Changeset.get_attribute(changeset, key)}
      end)
    else
      explicitly_changing_attributes =
        Enum.map(
          Map.keys(changeset.attributes) -- (Map.get(changeset, :defaults, []) -- keys),
          fn key ->
            {key, Ash.Changeset.get_attribute(changeset, key)}
          end
        )

      changeset
      |> upsert_update_defaults()
      |> Keyword.merge(explicitly_changing_attributes)
    end
  end

  defp upsert_update_defaults(changeset) do
    changeset.resource
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(changeset.resource))
    |> Enum.concat(lazy_non_matching_defaults(changeset.resource))
  end

  defp static_defaults(resource) do
    resource
    |> Ash.Resource.Info.static_default_attributes(:update)
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_non_matching_defaults(resource) do
    resource
    |> Ash.Resource.Info.lazy_non_matching_default_attributes(:update)
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_matching_defaults(resource) do
    resource
    |> Ash.Resource.Info.lazy_matching_default_attributes(:update)
    |> Enum.group_by(& &1.update_default)
    |> Enum.flat_map(fn {default_fun, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  defp do_for_action(changeset, action_or_name, params, opts) do
    domain =
      changeset.domain || opts[:domain] || Ash.Resource.Info.domain(changeset.resource) ||
        Ash.Actions.Helpers.maybe_embedded_domain(changeset.resource) ||
        raise ArgumentError,
          message:
            "Could not determine domain for changeset. Provide the `domain` option or configure a domain in the resource directly."

    changeset = %{changeset | domain: domain}

    if changeset.valid? do
      action = get_action_entity(changeset.resource, action_or_name)

      {changeset, opts} =
        Ash.Actions.Helpers.set_context_and_get_opts(
          domain,
          %{changeset | action: action},
          opts
        )

      if action do
        name =
          fn ->
            "changeset:" <> Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"
          end

        try do
          Ash.Tracer.span :changeset,
                          name,
                          opts[:tracer] do
            Ash.Tracer.telemetry_span [:ash, :changeset], fn ->
              %{
                resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
              }
            end do
              metadata = fn ->
                %{
                  resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                  resource: changeset.resource,
                  actor: opts[:actor],
                  tenant: opts[:tenant],
                  action: action.name,
                  authorize?: opts[:authorize?]
                }
              end

              Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)

              changeset =
                Enum.reduce(opts[:private_arguments] || %{}, changeset, fn {k, v}, changeset ->
                  set_private_argument_for_action(changeset, k, v)
                end)

              changeset =
                changeset
                |> prepare_changeset_for_action(action, opts)
                |> handle_params(action, params, opts)
                |> run_action_changes(
                  action,
                  opts[:actor],
                  opts[:authorize?],
                  opts[:tracer],
                  metadata
                )
                |> add_validations(opts[:tracer], metadata, opts[:actor])
                |> mark_validated(action.name)
                |> eager_validate_identities()
                |> Map.put(:__validated_for_action__, action.name)

              if Keyword.get(opts, :require?, true) do
                require_values(changeset, action.type)
              else
                changeset
              end
            end
          end
        rescue
          e ->
            reraise Ash.Error.to_error_class(e,
                      stacktrace: __STACKTRACE__,
                      bread_crumbs: [
                        "building changeset for #{inspect(changeset.resource)}.#{action.name}"
                      ]
                    ),
                    __STACKTRACE__
        end
      else
        raise_no_action(changeset.resource, action_or_name, changeset.action_type)
      end
    else
      action = get_action_entity(changeset.resource, action_or_name)

      {changeset, _opts} =
        Ash.Actions.Helpers.set_context_and_get_opts(
          domain,
          changeset,
          opts
        )

      %{changeset | action: action}
    end
  end

  @doc """
  Checks if an argument is not nil or an attribute is not nil, either in the original data, or that it is not being changed to a `nil` value if it is changing.

  This also accounts for the `accessing_from` context that is set when using `manage_relationship`, so it is aware that a particular value
  *will* be set by `manage_relationship` even if it isn't currently being set.
  """
  def present?(changeset, attribute) do
    arg_or_attribute_value =
      case Ash.Changeset.fetch_argument(changeset, attribute) do
        {:ok, nil} ->
          Ash.Changeset.get_attribute(changeset, attribute)

        :error ->
          Ash.Changeset.get_attribute(changeset, attribute)

        {:ok, value} ->
          {:ok, value}
      end

    arg_or_attribute_value =
      case arg_or_attribute_value do
        %Ash.NotLoaded{} ->
          nil

        %Ash.ForbiddenField{} ->
          nil

        other ->
          other
      end

    not is_nil(arg_or_attribute_value) ||
      belongs_to_attr_of_rel_being_managed?(attribute, changeset, true) ||
      is_belongs_to_rel_being_managed?(attribute, changeset, true)
  end

  @doc """
  Checks if an attribute is not nil, either in the original data, or that it is not being changed to a `nil` value if it is changing.

  This also accounts for the `accessing_from` context that is set when using `manage_relationship`, so it is aware that a particular value
  *will* be set by `manage_relationship` even if it isn't currently being set.
  """
  def attribute_present?(changeset, attribute) do
    attribute_value = Ash.Changeset.get_attribute(changeset, attribute)

    attribute_value =
      case attribute_value do
        %Ash.NotLoaded{} ->
          nil

        %Ash.ForbiddenField{} ->
          nil

        other ->
          other
      end

    not is_nil(attribute_value) ||
      belongs_to_attr_of_rel_being_managed?(attribute, changeset, true) ||
      is_belongs_to_rel_being_managed?(attribute, changeset, true)
  end

  def prepare_changeset_for_action(changeset, action, opts) do
    changeset
    |> Map.put(:action, action)
    |> reset_arguments()
    |> handle_errors(action.error_handler)
    |> set_actor(opts)
    |> set_authorize(opts)
    |> set_tracer(opts)
    |> load(opts[:load])
    |> timeout(changeset.timeout || opts[:timeout])
    |> set_tenant(opts[:tenant] || changeset.tenant || changeset.data.__metadata__[:tenant])
    |> Map.put(:action_type, action.type)
  end

  defp reset_arguments(%{arguments: arguments} = changeset) do
    Enum.reduce(arguments, changeset, fn {key, value}, changeset ->
      set_argument(changeset, key, value)
    end)
  end

  @doc false
  def handle_params(changeset, action, params, handle_params_opts \\ []) do
    if Keyword.get(handle_params_opts, :cast_params?, true) do
      cast_params(changeset, action, params || %{}, handle_params_opts)
    else
      changeset
    end
    |> set_argument_defaults(action)
    |> require_arguments(action)
    |> validate_attributes_accepted(action)
    |> require_values(action.type, false, action.require_attributes)
    |> set_defaults(changeset.action_type, false)
  end

  defp get_action_entity(resource, name) when is_atom(name),
    do: Ash.Resource.Info.action(resource, name)

  defp get_action_entity(_resource, %struct{} = action)
       when struct in [
              Ash.Resource.Actions.Update,
              Ash.Resource.Actions.Create,
              Ash.Resource.Actions.Destroy
            ] do
    action
  end

  defp get_action_entity(_resource, action) do
    raise ArgumentError, "Invalid value provided for action: #{inspect(action)}"
  end

  defp eager_validate_identities(changeset) do
    identities =
      changeset.resource
      |> Ash.Resource.Info.identities()

    case identities do
      [] ->
        changeset

      identities ->
        Enum.reduce(identities, changeset, fn identity, changeset ->
          changeset =
            if identity.eager_check_with do
              validate_identity(changeset, identity, identity.eager_check_with)
            else
              changeset
            end

          if identity.pre_check_with do
            before_action(changeset, &validate_identity(&1, identity, identity.pre_check_with))
          else
            changeset
          end
        end)
    end
  end

  defp validate_identity(
         %{context: %{private: %{upsert?: true, upsert_identity: name}}} = changeset,
         %{name: name},
         _domain
       ) do
    changeset
  end

  defp validate_identity(
         %{action: %{soft?: true}} = changeset,
         identity,
         domain
       ) do
    do_validate_identity(changeset, identity, domain)
  end

  defp validate_identity(
         %{action: %{type: type}} = changeset,
         identity,
         domain
       )
       when type in [:create, :update] do
    do_validate_identity(changeset, identity, domain)
  end

  defp validate_identity(
         %{action: %{type: type}} = changeset,
         identity,
         domain
       )
       when type in [:create, :update] do
    do_validate_identity(changeset, identity, domain)
  end

  defp validate_identity(changeset, _, _), do: changeset

  defp do_validate_identity(changeset, identity, domain) do
    if changeset.context[:private][:upsert_identity] == identity.name do
      changeset
    else
      if changeset.action_type == :create ||
           Enum.any?(identity.keys, &changing_attribute?(changeset, &1)) do
        action = Ash.Resource.Info.primary_action(changeset.resource, :read).name

        if Enum.any?(identity.keys, fn key ->
             Ash.Resource.Info.calculation(changeset.resource, key)
           end) do
          raise ArgumentError, "Cannot pre or eager check an identity based on calculated fields."
        end

        values =
          Enum.map(identity.keys, fn key ->
            case Ash.Changeset.get_attribute(changeset, key) do
              nil ->
                {key, is_nil: true}

              value ->
                {key, value}
            end
          end)

        if identity.nils_distinct? && Enum.any?(values, &(elem(&1, 1) == [is_nil: true])) do
          changeset
        else
          tenant =
            if identity.all_tenants? do
              if !Ash.Resource.Info.multitenancy_global?(changeset.resource) do
                raise ArgumentError,
                  message: """
                  Cannot pre or eager check an identity that has `all_tenants?: true`
                  unless the resource supports global multitenancy.
                  """
              end

              nil
            else
              changeset.tenant
            end

          changeset.resource
          |> Ash.Query.for_read(action, %{},
            tenant: tenant,
            actor: changeset.context[:private][:actor],
            authorize?: changeset.context[:private][:authorize?],
            tracer: changeset.context[:private][:tracer],
            domain: domain
          )
          |> Ash.Query.do_filter(values)
          |> Ash.Query.limit(1)
          |> Ash.Query.set_context(%{private: %{internal?: true}})
          |> Ash.read_one(authorize?: false)
          |> case do
            {:ok, nil} ->
              changeset

            {:ok, _} ->
              error =
                Ash.Error.Changes.InvalidChanges.exception(
                  fields: identity.field_names || identity.keys,
                  message: identity.message || "has already been taken"
                )

              add_error(changeset, error)

            {:error, error} ->
              add_error(changeset, error)
          end
        end
      else
        changeset
      end
    end
  end

  defp require_arguments(changeset, action) do
    action.arguments
    |> Enum.filter(&(&1.allow_nil? == false))
    |> Enum.reduce(changeset, fn argument, changeset ->
      case fetch_argument(changeset, argument.name) do
        {:ok, value} when not is_nil(value) ->
          changeset

        _ ->
          if argument.name in changeset.invalid_keys do
            changeset
          else
            add_error(
              changeset,
              Ash.Error.Changes.Required.exception(
                resource: changeset.resource,
                field: argument.name,
                type: :argument
              )
            )
          end
      end
    end)
  end

  defp set_argument_defaults(changeset, action) do
    Enum.reduce(action.arguments, changeset, fn argument, changeset ->
      case fetch_argument(changeset, argument.name) do
        :error ->
          if is_nil(argument.default) do
            changeset
          else
            %{
              changeset
              | arguments: Map.put(changeset.arguments, argument.name, default(:create, argument))
            }
          end

        _ ->
          changeset
      end
    end)
  end

  defp set_actor(changeset, opts) do
    if Keyword.has_key?(opts, :actor) do
      put_context(changeset, :private, %{actor: opts[:actor]})
    else
      changeset
    end
  end

  defp set_authorize(changeset, opts) do
    if Keyword.has_key?(opts, :authorize?) do
      put_context(changeset, :private, %{authorize?: opts[:authorize?]})
    else
      changeset
    end
  end

  defp set_tracer(changeset, opts) do
    if Keyword.has_key?(opts, :tracer) do
      put_context(changeset, :private, %{tracer: opts[:tracer]})
    else
      changeset
    end
  end

  defp raise_no_action(resource, action, type) do
    available_actions =
      resource
      |> Ash.Resource.Info.actions()
      |> Enum.filter(&(&1.type == type))
      |> Enum.map_join("\n", &"    - `#{inspect(&1.name)}`")

    raise ArgumentError,
      message: """
      No such #{type} action on resource #{inspect(resource)}: #{String.slice(inspect(action), 0..50)}

      Example Call:

        Ash.Changeset.for_#{type}(changeset_or_record, :action_name, input, options)

      Available #{type} actions:

      #{available_actions}
      """
  end

  defp mark_validated(changeset, action_name) do
    %{changeset | __validated_for_action__: action_name}
  end

  @doc false
  def validate_multitenancy(changeset) do
    if Ash.Resource.Info.multitenancy_strategy(changeset.resource) &&
         not Ash.Resource.Info.multitenancy_global?(changeset.resource) &&
         is_nil(changeset.tenant) do
      add_error(
        changeset,
        "#{inspect(changeset.resource)} changesets require a tenant to be specified"
      )
    else
      changeset
    end
  end

  defp cast_params(changeset, action, params, opts) do
    changeset = %{
      changeset
      | params: Map.merge(changeset.params, Enum.into(params, %{})),
        casted_arguments: %{},
        casted_attributes: %{}
    }

    skip_unknown_inputs = opts[:skip_unknown_inputs] || []

    Enum.reduce(params, changeset, fn {name, value}, changeset ->
      cond do
        !Ash.Resource.Info.action_input?(changeset.resource, action.name, name) ->
          cond do
            :* in skip_unknown_inputs ->
              changeset

            name in skip_unknown_inputs ->
              changeset

            match?("_" <> _, name) ->
              changeset

            true ->
              add_error(
                changeset,
                NoSuchInput.exception(
                  resource: changeset.resource,
                  action: action.name,
                  input: name,
                  inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
                )
              )
          end

        argument = get_action_argument(action, name) ->
          do_set_argument(changeset, argument.name, value, true)

        attr = Ash.Resource.Info.attribute(changeset.resource, name) ->
          if attr.writable? && attr.name in changeset.action.accept do
            do_change_attribute(changeset, attr.name, value, true)
          else
            cond do
              name in skip_unknown_inputs ->
                changeset

              match?("_" <> _, name) ->
                changeset

              :* in skip_unknown_inputs ->
                changeset

              true ->
                add_error(
                  changeset,
                  NoSuchInput.exception(
                    resource: changeset.resource,
                    action: action.name,
                    input: name,
                    inputs: Ash.Resource.Info.action_inputs(changeset.resource, action.name)
                  )
                )
            end
          end

        true ->
          changeset
      end
    end)
  end

  defp get_action_argument(action, name) when is_atom(name) do
    Enum.find(action.arguments, &(&1.public? && &1.name == name))
  end

  defp get_action_argument(action, name) when is_binary(name) do
    Enum.find(action.arguments, &(to_string(&1.name) == name))
  end

  defp has_argument?(action, name) when is_atom(name) do
    Enum.any?(action.arguments, &(&1.name == name))
  end

  defp has_argument?(action, name) when is_binary(name) do
    Enum.any?(action.arguments, &(to_string(&1.name) == name))
  end

  defp validate_attributes_accepted(changeset, %{accept: nil}), do: changeset

  defp validate_attributes_accepted(changeset, %{accept: accepted_attributes}) do
    changeset.attributes
    |> Map.keys()
    |> Kernel.--(accepted_attributes)
    |> Enum.reduce(changeset, fn key, changeset ->
      add_error(
        changeset,
        InvalidAttribute.exception(
          field: key,
          message: "cannot be changed",
          value: changeset.attributes[key]
        )
      )
    end)
  end

  defp run_action_changes(changeset, %{changes: changes}, actor, authorize?, tracer, metadata) do
    changeset = set_phase(changeset, :validate)

    changes =
      Enum.map(changes, &{:action, &1}) ++
        Enum.map(
          Ash.Resource.Info.changes(changeset.resource, changeset.action_type),
          &{:global, &1}
        )

    context = %{
      actor: actor,
      tenant: changeset.tenant,
      authorize?: authorize? || false,
      tracer: tracer
    }

    changes
    |> Enum.reduce(changeset, fn {location, change_or_validation}, changeset ->
      try do
        context = %{context | tenant: changeset.tenant}

        run_change_or_validation(
          change_or_validation,
          changeset,
          context,
          tracer,
          metadata,
          actor
        )
      rescue
        e ->
          bread_crumb =
            case change_or_validation do
              %{validation: {Ash.Resource.Validation.Function, opts}} ->
                "#{location} validation #{inspect(opts[:fun])}"

              %{validation: other} ->
                "#{location} validation #{inspect(other)}"

              %{change: {Ash.Resource.Change.Function, opts}} ->
                "#{location} change #{inspect(opts[:fun])}"

              %{change: other} ->
                "#{location} change #{inspect(other)}"
            end

          reraise Ash.Error.to_error_class(e,
                    stacktrace: __STACKTRACE__,
                    bread_crumbs: [
                      bread_crumb
                    ]
                  ),
                  __STACKTRACE__
      end
    end)
    |> clear_phase()
  end

  defp run_change_or_validation(change_or_validation, changeset, context, tracer, metadata, actor) do
    case {change_or_validation, changeset} do
      {%{only_when_valid?: true}, %{valid?: false} = changeset} ->
        changeset

      {%{always_atomic?: true, change: {module, _}} = change, changeset} ->
        if changeset.action.type == :create do
          Ash.Changeset.add_error(
            changeset,
            Ash.Error.Framework.CanNotBeAtomic.exception(
              resource: changeset.resource,
              change: module,
              reason: "Create actions cannot be made atomic"
            )
          )
        else
          case run_atomic_change(changeset, change, context) do
            {:not_atomic, reason} ->
              Ash.Changeset.add_error(
                changeset,
                "Change #{inspect(module)} was configured with `always_atomic?` to `true`, but could not be done atomically: #{reason}"
              )

            changeset ->
              changeset
          end
        end

      {%{always_atomic?: true, validation: {module, _}} = change, changeset} ->
        if changeset.action.type == :create do
          Ash.Changeset.add_error(
            changeset,
            Ash.Error.Framework.CanNotBeAtomic.exception(
              resource: changeset.resource,
              change: module,
              reason: "Create actions cannot be made atomic"
            )
          )
        else
          case run_atomic_validation(changeset, change, context) do
            {:not_atomic, reason} ->
              Ash.Changeset.add_error(
                changeset,
                "Validation #{change.module} must be run atomically, but it could not be: #{reason}"
              )

            changeset ->
              changeset
          end
        end

      {%{change: {module, opts}, where: where} = change, changeset} ->
        if module.has_change?() do
          if Enum.all?(where || [], fn {module, opts} ->
               Ash.Tracer.span :validation,
                               fn -> "change condition: #{inspect(module)}" end,
                               tracer do
                 Ash.Tracer.telemetry_span [:ash, :validation], fn ->
                   %{
                     resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                     validation: inspect(module)
                   }
                 end do
                   Ash.Tracer.set_metadata(tracer, :validation, metadata)

                   opts =
                     Ash.Expr.fill_template(
                       opts,
                       actor: actor,
                       tenant: changeset.to_tenant,
                       args: changeset.arguments,
                       context: changeset.context,
                       changeset: changeset
                     )

                   module.validate(
                     changeset,
                     opts,
                     struct(Ash.Resource.Validation.Context, context)
                   ) == :ok
                 end
               end
             end) do
            Ash.Tracer.span :change, fn -> "change: #{inspect(module)}" end, tracer do
              Ash.Tracer.telemetry_span [:ash, :change], fn ->
                %{
                  resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                  change: inspect(module)
                }
              end do
                {:ok, opts} = module.init(opts)

                Ash.Tracer.set_metadata(tracer, :change, metadata)

                opts =
                  Ash.Expr.fill_template(
                    opts,
                    actor: actor,
                    tenant: changeset.to_tenant,
                    args: changeset.arguments,
                    context: changeset.context,
                    changeset: changeset
                  )

                module.change(
                  changeset,
                  opts,
                  struct(Ash.Resource.Change.Context, context)
                )
              end
            end
          else
            changeset
          end
        else
          if changeset.action.type == :create do
            Ash.Changeset.add_error(
              changeset,
              Ash.Error.Framework.CanNotBeAtomic.exception(
                resource: changeset.resource,
                change: module,
                reason: "Create actions cannot be made atomic"
              )
            )
          else
            case run_atomic_change(changeset, change, context) do
              {:not_atomic, reason} ->
                Ash.Changeset.add_error(
                  changeset,
                  "Change #{inspect(module)} must be atomic, but could not be done atomically: #{reason}"
                )

              changeset ->
                changeset
            end
          end
        end

      {%{validation: _} = validation, changeset} ->
        validate(changeset, validation, tracer, metadata, actor)
    end
  end

  @doc false
  def hydrate_atomic_refs(changeset, actor, opts \\ []) do
    changeset
    |> add_atomic_validations(actor, opts)
    |> do_hydrate_atomic_refs(actor)
    |> case do
      {:ok, hydrated_changeset} ->
        hydrated_changeset
        |> extract_atomic_eager_errors(actor, opts)
        |> case do
          %Ash.Changeset{} = changeset ->
            %{changeset | atomic_validations: []}

          other ->
            other
        end

      other ->
        other
    end
  end

  defp do_hydrate_atomic_refs(changeset, actor) do
    Enum.reduce_while(
      changeset.atomics,
      {:ok, %{changeset | atomics: []}},
      fn {key, expr}, {:ok, changeset} ->
        expr =
          Ash.Expr.fill_template(
            expr,
            actor: actor,
            tenant: changeset.to_tenant,
            args: changeset.arguments,
            context: changeset.context,
            changeset: changeset
          )

        case Ash.Filter.hydrate_refs(expr, %{resource: changeset.resource, public?: false}) do
          {:ok, expr} ->
            {:cont, {:ok, %{changeset | atomics: Keyword.put(changeset.atomics, key, expr)}}}

          {:error, error} ->
            {:halt,
             {:not_atomic, "Failed to validate expression #{inspect(expr)}: #{inspect(error)}"}}
        end
      end
    )
  end

  @doc false
  def apply_atomic_constraints(changeset, actor, opts \\ []) do
    changeset
    |> do_apply_atomic_constraints()
    |> do_hydrate_atomic_refs(actor)
    |> case do
      {:ok, changeset} ->
        changeset

      {:not_atomic, error} ->
        add_error(changeset, error)
    end
    |> extract_atomic_eager_errors(actor, opts)
  end

  defp do_apply_atomic_constraints(%Ash.Changeset{} = changeset) do
    Enum.reduce(changeset.atomics, %{changeset | atomics: []}, fn {key, value}, changeset ->
      attribute = Ash.Resource.Info.attribute(changeset.resource, key)

      if key in changeset.no_atomic_constraints do
        value =
          if(attribute.primary_key?) do
            value
          else
            set_error_field(value, attribute.name)
          end

        %{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
      else
        case Ash.Type.apply_atomic_constraints(attribute.type, value, attribute.constraints) do
          {:ok, ^value} ->
            %{changeset | atomics: Keyword.put(changeset.atomics, key, value)}

          {:ok, value} ->
            value = expr(type(^value, ^attribute.type, ^attribute.constraints))

            value =
              if attribute.primary_key? do
                value
              else
                set_error_field(value, attribute.name)
              end

            %{changeset | atomics: Keyword.put(changeset.atomics, key, value)}

          {:error, error} ->
            add_error(changeset, error)
        end
      end
    end)
  end

  defp do_apply_atomic_constraints(value), do: value

  defp extract_atomic_eager_errors(changeset, _actor, opts) do
    if Keyword.get(opts, :eager?, true) do
      Enum.reduce(
        changeset.atomics,
        changeset,
        fn
          {_key,
           %Ash.Query.Function.Error{
             arguments: arguments
           } = error},
          changeset ->
            Enum.reduce_while(arguments, {:ok, []}, fn argument, {:ok, args} ->
              case Ash.Expr.eval(argument,
                     resource: changeset.resource,
                     unknown_on_unknown_refs?: true
                   ) do
                {:ok, value} ->
                  {:cont, {:ok, [value | args]}}

                _ ->
                  {:halt, :error}
              end
            end)
            |> case do
              {:ok, args} ->
                error = %{error | arguments: Enum.reverse(args)}

                case Ash.Expr.eval(error,
                       resource: changeset.resource,
                       unknown_on_unknown_refs?: true
                     ) do
                  {:error, error} ->
                    Ash.Changeset.add_error(changeset, error)

                  _ ->
                    changeset
                end

              _ ->
                changeset
            end

          {_key, _value}, changeset ->
            changeset
        end
      )
    else
      changeset
    end
  end

  @doc false
  def add_atomic_validations(changeset, actor, opts) do
    eager? = Keyword.get(opts, :eager?, true)

    changeset.atomic_validations
    |> Enum.reduce_while(
      %{changeset | atomic_validations: []},
      fn
        {condition_expr, error_expr}, changeset ->
          condition_expr =
            Ash.Expr.fill_template(
              condition_expr,
              actor: actor,
              tenant: changeset.to_tenant,
              args: changeset.arguments,
              context: changeset.context,
              changeset: changeset
            )

          error_expr =
            Ash.Expr.fill_template(
              error_expr,
              actor: actor,
              tenant: changeset.to_tenant,
              args: changeset.arguments,
              context: changeset.context,
              changeset: changeset
            )

          with {:expr, {:ok, condition_expr}, _expr} <-
                 {:expr,
                  Ash.Filter.hydrate_refs(condition_expr, %{
                    resource: changeset.resource,
                    public?: false
                  }), condition_expr},
               {:expr, {:ok, error_expr}, _} <-
                 {:expr,
                  Ash.Filter.hydrate_refs(error_expr, %{
                    resource: changeset.resource,
                    public?: false
                  }), error_expr} do
            eager_condition_expr =
              if eager? do
                Ash.Expr.eval(condition_expr,
                  resource: changeset.resource,
                  unknown_on_unknown_refs?: true
                )
              else
                {:ok, condition_expr}
              end

            eager_error_expr =
              if eager? do
                Ash.Expr.eval(error_expr,
                  resource: changeset.resource,
                  unknown_on_unknown_refs?: true
                )
              else
                {:ok, error_expr}
              end

            case extract_eager_error(eager_condition_expr, eager_error_expr, eager?) do
              {:ok, error} ->
                {:cont,
                 add_error(
                   changeset,
                   error
                 )}

              :error ->
                if changeset.action.type == :update || Map.get(changeset.action, :soft?) do
                  [first_pkey_field | _] = Ash.Resource.Info.primary_key(changeset.resource)

                  full_atomic_update =
                    expr(
                      if ^condition_expr do
                        ^error_expr
                      else
                        ^atomic_ref(changeset, first_pkey_field)
                      end
                    )

                  case Ash.Filter.hydrate_refs(full_atomic_update, %{
                         resource: changeset.resource,
                         public: false
                       }) do
                    {:ok, full_atomic_update} ->
                      {:cont,
                       atomic_update(
                         changeset,
                         first_pkey_field,
                         full_atomic_update
                       )}

                    {:error, error} ->
                      if Keyword.get(opts, :error_is_not_atomic?, false) do
                        {:halt,
                         {:not_atomic,
                          "Failed to validate expression #{inspect(full_atomic_update)}: #{inspect(error)}"}}
                      else
                        {:cont,
                         Ash.Changeset.add_error(
                           changeset,
                           "Failed to validate expression #{inspect(full_atomic_update)}: #{inspect(error)}"
                         )}
                      end
                  end
                else
                  {:cont,
                   filter(
                     changeset,
                     expr(
                       if ^condition_expr do
                         ^error_expr
                       else
                         true
                       end
                     )
                   )}
                end
            end
          else
            {:expr, {:error, error}, expr} ->
              if Keyword.get(opts, :error_is_not_atomic?, false) do
                {:halt,
                 {:not_atomic,
                  "Failed to validate expression #{inspect(expr)}: #{inspect(error)}"}}
              else
                {:cont,
                 Ash.Changeset.add_error(
                   changeset,
                   "Failed to validate expression #{inspect(expr)}: #{inspect(error)}"
                 )}
              end
          end
      end
    )
  end

  defp extract_eager_error({:ok, true}, {:error, %{class: :invalid} = error}, true) do
    {:ok, error}
  end

  defp extract_eager_error(_, _, _), do: :error

  @doc false
  def set_defaults(changeset, action_type, lazy? \\ false)

  def set_defaults(changeset, :create, lazy?) do
    with_static_defaults =
      changeset.resource
      |> Ash.Resource.Info.static_default_attributes(:create)
      |> Enum.reduce(changeset, fn attribute, changeset ->
        if changing_attribute?(changeset, attribute.name) do
          changeset
        else
          changeset
          |> force_change_attribute(attribute.name, default(:create, attribute))
          |> Map.update!(:defaults, fn defaults ->
            [attribute.name | defaults]
          end)
        end
      end)
      |> Map.update!(:defaults, &Enum.uniq/1)

    if lazy? do
      set_lazy_defaults(with_static_defaults, :create)
    else
      with_static_defaults
    end
    |> Map.update!(:defaults, &Enum.uniq/1)
  end

  def set_defaults(changeset, :update, lazy?) do
    with_static_defaults =
      changeset.resource
      |> Ash.Resource.Info.static_default_attributes(:update)
      |> Enum.reduce(changeset, fn attribute, changeset ->
        if changing_attribute?(changeset, attribute.name) do
          changeset
        else
          changeset
          |> force_change_attribute(attribute.name, default(:update, attribute))
          |> Map.update!(:defaults, fn defaults ->
            [attribute.name | defaults]
          end)
        end
      end)

    if lazy? do
      set_lazy_defaults(with_static_defaults, :update)
    else
      with_static_defaults
    end
    |> Map.update!(:defaults, &Enum.uniq/1)
  end

  def set_defaults(changeset, _, _) do
    changeset
  end

  defp set_lazy_defaults(changeset, type) do
    changeset
    |> set_lazy_non_matching_defaults(type)
    |> set_lazy_matching_defaults(type)
  end

  defp set_lazy_non_matching_defaults(changeset, type) do
    changeset.resource
    |> Ash.Resource.Info.lazy_non_matching_default_attributes(type)
    |> Enum.reduce(changeset, fn attribute, changeset ->
      if changing_attribute?(changeset, attribute.name) do
        changeset
      else
        changeset
        |> force_change_attribute(attribute.name, default(type, attribute))
        |> Map.update!(:defaults, fn defaults ->
          [attribute.name | defaults]
        end)
      end
    end)
  end

  defp set_lazy_matching_defaults(changeset, type) do
    changeset.resource
    |> Ash.Resource.Info.lazy_matching_default_attributes(type)
    |> Enum.group_by(fn attribute ->
      case type do
        :create ->
          attribute.default

        :update ->
          attribute.update_default
      end
    end)
    |> Enum.reduce(changeset, fn {default_fun, attributes}, changeset ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.reduce(attributes, changeset, fn attribute, changeset ->
        if changing_attribute?(changeset, attribute.name) do
          changeset
        else
          changeset
          |> force_change_attribute(attribute.name, default_value)
          |> Map.update!(:defaults, fn defaults ->
            [attribute.name | defaults]
          end)
        end
      end)
    end)
  end

  defp default(:create, %{default: {mod, func, args}}), do: apply(mod, func, args)
  defp default(:create, %{default: function}) when is_function(function, 0), do: function.()
  defp default(:create, %{default: value}), do: value

  defp default(:update, %{update_default: {mod, func, args}}), do: apply(mod, func, args)

  defp default(:update, %{update_default: function}) when is_function(function, 0),
    do: function.()

  defp default(:update, %{update_default: value}), do: value

  defp add_validations(changeset, tracer, metadata, actor) do
    if changeset.action.skip_global_validations? do
      changeset
    else
      changeset.resource
      # We use the `changeset.action_type` to support soft deletes
      # Because a delete is an `update` with an action type of `update`
      |> Ash.Resource.Info.validations(changeset.action_type)
      |> then(fn validations ->
        if changeset.action.delay_global_validations? do
          Enum.map(validations, &%{&1 | before_action?: true})
        else
          validations
        end
      end)
      |> Enum.reduce(changeset, &validate(&2, &1, tracer, metadata, actor))
    end
  end

  defp validate(changeset, validation, tracer, metadata, actor) do
    if validation.module.has_validate?() &&
         Enum.all?(validation.where, fn {module, _} ->
           module.has_validate?()
         end) do
      if validation.before_action? do
        before_action(changeset, fn changeset ->
          if validation.only_when_valid? and not changeset.valid? do
            changeset
          else
            do_validation(changeset, validation, tracer, metadata, actor)
          end
        end)
      else
        if validation.only_when_valid? and not changeset.valid? do
          changeset
        else
          do_validation(changeset, validation, tracer, metadata, actor)
        end
      end
    else
      if changeset.action.type == :create do
        Ash.Changeset.add_error(
          changeset,
          Ash.Error.Framework.CanNotBeAtomic.exception(
            resource: changeset.resource,
            change: validation.module,
            reason: "Create actions cannot be made atomic"
          )
        )
      else
        context = %{
          actor: changeset.context[:private][:actor],
          tenant: changeset.tenant,
          authorize?: changeset.context[:private][:authorize?] || false,
          tracer: changeset.context[:private][:tracer]
        }

        case run_atomic_validation(changeset, validation, context) do
          {:not_atomic, reason} ->
            Ash.Changeset.add_error(
              changeset,
              "Validation #{validation.module} must be run atomically, but it could not be: #{reason}"
            )

          changeset ->
            changeset
        end
      end
    end
  end

  defp do_validation(changeset, validation, tracer, metadata, actor) do
    context = %{
      actor: changeset.context[:private][:actor],
      tenant: changeset.tenant,
      authorize?: changeset.context[:private][:authorize?] || false,
      tracer: changeset.context[:private][:tracer]
    }

    if Enum.all?(validation.where || [], fn {module, opts} ->
         opts =
           Ash.Expr.fill_template(
             opts,
             actor: actor,
             tenant: changeset.to_tenant,
             args: changeset.arguments,
             context: changeset.context,
             changeset: changeset
           )

         case module.init(opts) do
           {:ok, opts} ->
             module.validate(changeset, opts, struct(Ash.Resource.Validation.Context, context)) ==
               :ok

           _ ->
             false
         end
       end) do
      Ash.Tracer.span :validation, fn -> "validate: #{inspect(validation.module)}" end, tracer do
        Ash.Tracer.telemetry_span [:ash, :validation], fn ->
          %{
            resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
            validation: inspect(validation.module)
          }
        end do
          Ash.Tracer.set_metadata(tracer, :validation, metadata)

          opts =
            Ash.Expr.fill_template(
              validation.opts,
              actor: actor,
              tenant: changeset.to_tenant,
              args: changeset.arguments,
              context: changeset.context,
              changeset: changeset
            )

          with {:ok, opts} <- validation.module.init(opts),
               :ok <-
                 validation.module.validate(
                   changeset,
                   opts,
                   struct(
                     Ash.Resource.Validation.Context,
                     Map.put(context, :message, validation.message)
                   )
                 ) do
            changeset
          else
            :ok ->
              changeset

            {:error, error} when is_binary(error) ->
              Ash.Changeset.add_error(changeset, validation.message || error)

            {:error, error} when is_exception(error) ->
              if validation.message do
                error = override_validation_message(error, validation.message)
                Ash.Changeset.add_error(changeset, error)
              else
                Ash.Changeset.add_error(changeset, error)
              end

            {:error, errors} when is_list(errors) ->
              if validation.message do
                errors =
                  Enum.map(errors, fn error ->
                    override_validation_message(error, validation.message)
                  end)

                Ash.Changeset.add_error(changeset, errors)
              else
                Ash.Changeset.add_error(changeset, errors)
              end

            {:error, error} ->
              error =
                if Keyword.keyword?(error) do
                  Keyword.put(error, :message, validation.message || error[:message])
                else
                  validation.message || error
                end

              Ash.Changeset.add_error(changeset, error)
          end
        end
      end
    else
      changeset
    end
  end

  @doc false
  def override_validation_message(error, message) do
    case error do
      %{field: field} = error when not is_nil(field) ->
        error
        |> Map.take([:field, :vars])
        |> Map.to_list()
        |> Keyword.put(:message, message)
        |> Keyword.put(:value, Map.get(error, :value))
        |> InvalidAttribute.exception()

      %{fields: fields} when fields not in [nil, []] ->
        error
        |> Map.take([:fields, :vars])
        |> Map.to_list()
        |> Keyword.put(:message, message)
        |> Keyword.put(:value, Map.get(error, :value))
        |> InvalidChanges.exception()

      _ ->
        message
    end
  end

  @doc false
  def require_values(changeset, action_type, private_and_belongs_to? \\ false, attrs \\ nil)

  def require_values(changeset, :create, private_and_belongs_to?, attrs) do
    attributes =
      attrs ||
        attributes_to_require(changeset.resource, changeset.action, private_and_belongs_to?)

    attributes
    |> Enum.map(fn
      attribute when is_struct(attribute, Ash.Resource.Attribute) ->
        attribute

      name when is_atom(name) ->
        Ash.Resource.Info.attribute(changeset.resource, name)
    end)
    |> then(fn attributes ->
      if private_and_belongs_to? do
        attributes
      else
        Enum.reject(attributes, &belongs_to_attr_of_rel_being_managed?(&1.name, changeset))
      end
    end)
    |> Enum.reduce(changeset, fn required_attribute, changeset ->
      if changing_attribute?(changeset, required_attribute.name) do
        if is_nil(get_attribute(changeset, required_attribute.name)) do
          if required_attribute.name in changeset.invalid_keys do
            changeset
          else
            add_required_attribute_error(changeset, required_attribute)
          end
        else
          changeset
        end
      else
        if is_nil(required_attribute.default) ||
             required_attribute.name in changeset.action.require_attributes do
          if required_attribute.name in changeset.invalid_keys do
            changeset
          else
            add_required_attribute_error(changeset, required_attribute)
          end
        else
          changeset
        end
      end
    end)
  end

  def require_values(changeset, :update, private_and_belongs_to?, attrs) do
    attributes =
      attrs ||
        attributes_to_require(changeset.resource, changeset.action, private_and_belongs_to?)

    attributes
    |> Enum.map(fn
      attribute when is_struct(attribute, Ash.Resource.Attribute) ->
        attribute

      name when is_atom(name) ->
        Ash.Resource.Info.attribute(changeset.resource, name)
    end)
    |> then(fn attributes ->
      if private_and_belongs_to? do
        attributes
      else
        Enum.reject(attributes, &belongs_to_attr_of_rel_being_managed?(&1.name, changeset))
      end
    end)
    |> Enum.reduce(changeset, fn required_attribute, changeset ->
      setting? =
        Map.has_key?(changeset.attributes, required_attribute.name) ||
          Keyword.has_key?(changeset.atomics, required_attribute.name) ||
          Map.has_key?(changeset.casted_attributes, required_attribute.name)

      if setting? do
        if is_nil(get_attribute(changeset, required_attribute.name)) do
          if required_attribute.name in changeset.invalid_keys do
            changeset
          else
            add_required_attribute_error(changeset, required_attribute)
          end
        else
          changeset
        end
      else
        changeset
      end
    end)
  end

  def require_values(changeset, _, _, _), do: changeset

  defp add_required_attribute_error(changeset, required_attribute) do
    changeset.resource
    |> Ash.Resource.Info.relationships()
    |> Enum.find(&(&1.type == :belongs_to && &1.source_attribute == required_attribute.name))
    |> case do
      nil ->
        add_error(
          changeset,
          Required.exception(
            resource: changeset.resource,
            field: required_attribute.name,
            type: :attribute
          )
        )

      %{name: name} ->
        if required_attribute.name in changeset.action.accept do
          add_error(
            changeset,
            Required.exception(
              resource: changeset.resource,
              field: required_attribute.name,
              type: :attribute
            )
          )
        else
          add_error(
            changeset,
            Required.exception(
              resource: changeset.resource,
              field: name,
              type: :relationship
            )
          )
        end
    end
  end

  defp is_belongs_to_rel_being_managed?(attribute, changeset, only_if_relating?) do
    Enum.any?(changeset.relationships, fn
      {key, [{rels, _}]} ->
        relationship = Ash.Resource.Info.relationship(changeset.resource, key)

        relationship.type == :belongs_to && relationship.name == attribute &&
          (not only_if_relating? || rels != [])

      {_key, list} when is_list(list) ->
        false
    end)
  end

  defp belongs_to_attr_of_rel_being_managed?(attribute, changeset, only_if_relating? \\ false) do
    do_belongs_to_attr_of_rel_being_managed?(changeset, attribute, only_if_relating?) ||
      belongs_to_attr_of_being_managed_through?(changeset, attribute, only_if_relating?)
  end

  defp do_belongs_to_attr_of_rel_being_managed?(changeset, attribute, only_if_relating?) do
    Enum.any?(changeset.relationships, fn
      {key, [{rels, opts}]} ->
        if attribute == opts[:order_is_key] do
          true
        else
          relationship = Ash.Resource.Info.relationship(changeset.resource, key)

          relationship.type == :belongs_to && relationship.source_attribute == attribute &&
            (not only_if_relating? || rels != [])
        end

      {_key, list} when is_list(list) ->
        false
    end)
  end

  defp belongs_to_attr_of_being_managed_through?(
         %{context: %{accessing_from: %{unrelating?: true}}},
         _attribute,
         true
       ) do
    false
  end

  defp belongs_to_attr_of_being_managed_through?(
         %{context: %{accessing_from: %{source: source, name: relationship} = accessing_from}},
         attribute,
         _
       ) do
    with opts when not is_nil(opts) <- accessing_from[:manage_relationship_opts],
         key when not is_nil(key) <- opts[:order_is_key],
         true <- key == attribute do
      true
    else
      _ ->
        case Ash.Resource.Info.relationship(source, relationship) do
          %{type: :belongs_to} ->
            false

          relationship ->
            relationship.destination_attribute == attribute
        end
    end
  end

  defp belongs_to_attr_of_being_managed_through?(_, _, _), do: false

  # Attributes that are private and/or are the source field of a belongs_to relationship
  # are typically not set by input, so they aren't required until the actual action
  # is run.
  defp attributes_to_require(resource, _action, true = _final?) do
    Ash.Resource.Info.attributes_to_require(resource)
  end

  defp attributes_to_require(resource, action, false = _final?) do
    Ash.Resource.Info.attributes_to_require(resource, action.name)
  end

  @doc """
  Wraps a function in the before/after action hooks of a changeset.

  The function takes a changeset and if it returns
  `{:ok, result}`, the result will be passed through the after
  action hooks.
  """
  @spec with_hooks(
          t(),
          (t() ->
             {:ok, term, %{notifications: list(Ash.Notifier.Notification.t())}}
             | {:error, term}),
          Keyword.t()
        ) ::
          {:ok, term, t(), %{notifications: list(Ash.Notifier.Notification.t())}} | {:error, term}
  def with_hooks(changeset, func, opts \\ [])

  def with_hooks(changeset, _func, _opts) when changeset.valid? == false do
    {:error, changeset.errors}
  end

  def with_hooks(changeset, func, opts) do
    data_layer_prefers_transaction? = Ash.DataLayer.prefer_transaction?(changeset.resource)
    # We check if *all* hooks that *could* add transaction hooks are empty
    # Later before starting the transaction we do the same but only checking
    # the actual hooks
    prefer_transaction? =
      if !(changeset.action && changeset.action.manual) &&
           Enum.empty?(changeset.before_transaction) && Enum.empty?(changeset.around_transaction) &&
           Enum.empty?(changeset.before_action) && Enum.empty?(changeset.after_action) &&
           Enum.empty?(changeset.around_action) && Enum.empty?(changeset.relationships) do
        data_layer_prefers_transaction?
      else
        true
      end

    if prefer_transaction? && opts[:transaction?] &&
         Ash.DataLayer.data_layer_can?(changeset.resource, :transact) do
      transaction_hooks(changeset, fn changeset ->
        resources =
          changeset.resource
          |> List.wrap()
          |> Enum.concat(changeset.action.touches_resources)
          |> Enum.uniq()

        notify? = !Process.put(:ash_started_transaction?, true)

        resources = Enum.reject(resources, &Ash.DataLayer.in_transaction?/1)

        try do
          resources
          |> Ash.DataLayer.transaction(
            fn ->
              case run_around_actions(changeset, func) do
                {:error, error} ->
                  if opts[:rollback_on_error?] do
                    Ash.DataLayer.rollback(
                      changeset.resource,
                      error
                    )
                  else
                    {:error, error}
                  end

                other ->
                  other
              end
            end,
            changeset.timeout || :infinity,
            Map.put(
              opts[:transaction_metadata],
              :data_layer_context,
              changeset.context[:data_layer] || %{}
            )
          )
          |> case do
            {:ok, {:ok, value, changeset, instructions}} ->
              {:ok, value, changeset, Map.put(instructions, :gather_notifications?, notify?)}

            {:ok, {:error, error}} ->
              {:error, error}

            {:error, error} ->
              {:error, error}
          end
        after
          if notify? do
            Process.delete(:ash_started_transaction?)
          end
        end
      end)
    else
      if changeset.timeout do
        Ash.ProcessHelpers.task_with_timeout(
          fn ->
            transaction_hooks(changeset, fn changeset ->
              run_around_actions(changeset, func)
            end)
          end,
          changeset.resource,
          changeset.timeout,
          fn -> "#{inspect(changeset.resource)}.#{changeset.action.name}" end,
          opts[:tracer]
        )
      else
        transaction_hooks(changeset, fn changeset ->
          run_around_actions(changeset, func)
        end)
      end
    end
    |> case do
      {:ok, value, changeset, instructions} ->
        if opts[:return_notifications?] do
          {:ok, value, changeset, instructions}
        else
          if Process.get(:ash_started_transaction?) do
            current_notifications = List.wrap(Process.get(:ash_notifications, []))

            Process.put(
              :ash_notifications,
              current_notifications ++ List.wrap(instructions[:notifications])
            )

            {:ok, value, changeset, Map.put(instructions, :notifications, [])}
          else
            notifications =
              instructions[:notifications] || []

            notifications =
              if instructions[:gather_notifications?] do
                Enum.concat(List.wrap(Process.delete(:ash_notifications) || []), notifications)
              else
                notifications
              end

            {:ok, value, changeset,
             Map.put(instructions, :notifications, Ash.Notifier.notify(notifications))}
          end
        end

      other ->
        other
    end
  end

  defp warn_on_transaction_hooks(_, [], _), do: :ok

  defp warn_on_transaction_hooks(changeset, _, type) do
    if Application.get_env(:ash, :warn_on_transaction_hooks?) != false &&
         changeset.context[:warn_on_transaction_hooks?] != false &&
         Ash.DataLayer.in_transaction?(changeset.resource) &&
         (changeset.before_transaction != [] or changeset.around_transaction != []) do
      message =
        if type in ["before_transaction", "around_transaction"] do
          "already"
        else
          "still"
        end

      Logger.warning("""
      One or more `#{type}` hooks on `#{inspect(changeset.resource)}.#{changeset.action.name}` are being executed,
      but there is an ongoing transaction #{message} happening.

      This means that you may be running an action in a transaction that you did not design with the intent of running in a surrounding transaction.
      You should either

      1. If you are testing, and your data layer runs in a transaction/sandbox mode, set `config :ash, warn_on_transaction_hooks?: false` in `config/test.exs`
      2. Create another action that is safe to use in a surrounding transaction, and use that instead of this one
      3. Silence this warning using `set_context(%{warn_on_transaction_hooks?: false})` in the action definition
      4. If building a changeset manually, do #2 except programmatically, `Ash.Changeset.set_context(changeset, %{warn_on_transaction_hooks?: false})`
      """)
    end
  end

  defp transaction_hooks(changeset, func) do
    warn_on_transaction_hooks(changeset, changeset.around_transaction, "around_transaction")

    run_around_transaction_hooks(changeset, fn changeset ->
      warn_on_transaction_hooks(changeset, changeset.before_transaction, "before_transaction")

      changeset_result =
        try do
          {:changeset, run_before_transaction_hooks(changeset)}
        rescue
          exception ->
            {:raise, exception, __STACKTRACE__}
        catch
          :exit, reason ->
            {:exit, reason}
        end

      case changeset_result do
        {:changeset, %{valid?: true} = changeset} ->
          result =
            try do
              func.(clear_phase(changeset))
            rescue
              exception ->
                {:raise, exception, __STACKTRACE__}
            catch
              :exit, reason ->
                {:exit, reason}
            end

          case result do
            {:exit, reason} ->
              error = Ash.Error.to_ash_error(reason)

              case run_after_transactions({:error, error}, changeset) do
                {:ok, result} ->
                  {:ok, result, %{}}

                {:error, new_error} when new_error == error ->
                  exit(reason)

                {:error, new_error} ->
                  exit(new_error)
              end

            {:raise, exception, stacktrace} ->
              case run_after_transactions({:error, exception}, changeset) do
                {:ok, result} ->
                  {:ok, result, changeset, %{}}

                {:error, error} ->
                  reraise error, stacktrace
              end

            {:ok, result, changeset, notifications} ->
              case run_after_transactions({:ok, result}, changeset) do
                {:ok, result} ->
                  {:ok, result, changeset, notifications}

                {:error, error} ->
                  {:error, error}
              end

            {:ok, result, notifications} ->
              case run_after_transactions({:ok, result}, changeset) do
                {:ok, result} ->
                  {:ok, result, changeset, notifications}

                {:error, error} ->
                  {:error, error}
              end

            {:error, error} ->
              case run_after_transactions({:error, error}, changeset) do
                {:ok, result} ->
                  {:ok, result, changeset, %{}}

                {:error, error} ->
                  {:error, error}
              end
          end

        {:changeset, changeset} ->
          case run_after_transactions(
                 {:error, Ash.Error.to_error_class(changeset.errors)},
                 changeset
               ) do
            {:ok, result} ->
              {:ok, result, changeset, %{}}

            {:error, error} ->
              {:error, error}
          end

        {:exit, reason} ->
          error = Ash.Error.to_ash_error(reason)

          case run_after_transactions({:error, error}, changeset) do
            {:ok, result} ->
              {:ok, result, %{}}

            {:error, new_error} when new_error == error ->
              exit(reason)

            {:error, new_error} ->
              exit(new_error)
          end

        {:raise, exception, stacktrace} ->
          case run_after_transactions({:error, exception}, changeset) do
            {:ok, result} ->
              {:ok, result, changeset, %{}}

            {:error, error} ->
              reraise error, stacktrace
          end
      end
    end)
  end

  defp run_around_transaction_hooks(%{around_transaction: []} = changeset, func),
    do: func.(changeset)

  defp run_around_transaction_hooks(%{around_transaction: [around | rest]} = changeset, func) do
    changeset
    |> set_phase(:around_transaction)
    |> around.(fn changeset ->
      run_around_transaction_hooks(%{changeset | around_transaction: rest}, func)
    end)
  end

  def run_before_transaction_hooks(changeset) do
    Enum.reduce_while(
      changeset.before_transaction,
      set_phase(changeset, :before_transaction),
      fn before_transaction, changeset ->
        metadata = fn ->
          %{
            domain: changeset.domain,
            resource: changeset.resource,
            resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
            actor: changeset.context[:private][:actor],
            tenant: changeset.context[:private][:tenant],
            action: changeset.action && changeset.action.name,
            authorize?: changeset.context[:private][:authorize?]
          }
        end

        tracer = changeset.context[:private][:tracer]

        result =
          Ash.Tracer.span :before_transaction,
                          "before_transaction",
                          tracer do
            Ash.Tracer.set_metadata(tracer, :before_transaction, metadata)

            Ash.Tracer.telemetry_span [:ash, :before_transaction], metadata do
              before_transaction.(changeset)
            end
          end

        case result do
          {:error, error} ->
            {:halt, {:error, error}}

          changeset ->
            cont =
              if changeset.valid? do
                :cont
              else
                :halt
              end

            {cont, changeset}
        end
      end
    )
  end

  @doc false
  def run_before_actions(%{before_action: []} = changeset), do: {changeset, %{notifications: []}}

  def run_before_actions(%{valid?: false} = changeset), do: changeset

  def run_before_actions(changeset) do
    can_do_atomic? = data_layer_can_do_atomic_for_changest?(changeset)

    Enum.reduce_while(
      changeset.before_action,
      {changeset, %{notifications: []}},
      fn before_action, {changeset, instructions} ->
        metadata = fn ->
          %{
            domain: changeset.domain,
            resource: changeset.resource,
            resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
            actor: changeset.context[:private][:actor],
            tenant: changeset.context[:private][:actor],
            action: changeset.action && changeset.action.name,
            authorize?: changeset.context[:private][:authorize?]
          }
        end

        tracer = changeset.context[:private][:tracer]

        result =
          Ash.Tracer.span :before_action,
                          "before_action",
                          tracer do
            Ash.Tracer.set_metadata(tracer, :before_action, metadata)

            Ash.Tracer.telemetry_span [:ash, :before_action], metadata do
              before_action.(changeset)
            end
          end

        case result do
          {:error, error} ->
            {:halt, {:error, error}}

          {changeset, %{notifications: notifications}} ->
            cont =
              if changeset.valid? do
                :cont
              else
                :halt
              end

            {cont,
             {changeset,
              %{
                instructions
                | notifications: List.wrap(instructions.notifications) ++ List.wrap(notifications)
              }}}

          changeset ->
            cont =
              if changeset.valid? do
                :cont
              else
                :halt
              end

            {cont, {changeset, instructions}}
        end
      end
    )
    |> case do
      {:error, error} ->
        {:error, error}

      {%{atomics: atomics} = changeset, _} when atomics != [] and not can_do_atomic? ->
        Ash.Changeset.add_error(
          changeset,
          Ash.Error.Invalid.AtomicsNotSupported.exception(
            resource: changeset.resource,
            action_type: changeset.action_type
          )
        )

      {%{valid?: true} = changeset, instructions} ->
        {Ash.Changeset.hydrate_atomic_refs(changeset, changeset.context[:private][:actor]),
         instructions}

      {changeset, instructions} ->
        {changeset, instructions}
    end
  end

  @doc false
  def run_after_transactions(result, changeset) do
    warn_on_transaction_hooks(changeset, changeset.after_transaction, "after_transaction")

    changeset = set_phase(changeset, :after_transaction)

    changeset.after_transaction
    |> Enum.reduce(
      result,
      fn after_transaction, result ->
        tracer = changeset.context[:private][:tracer]

        metadata = fn ->
          %{
            domain: changeset.domain,
            resource: changeset.resource,
            resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
            actor: changeset.context[:private][:actor],
            tenant: changeset.context[:private][:actor],
            action: changeset.action && changeset.action.name,
            authorize?: changeset.context[:private][:authorize?]
          }
        end

        Ash.Tracer.span :after_transaction,
                        "after_transaction",
                        tracer do
          Ash.Tracer.set_metadata(tracer, :after_transaction, metadata)

          Ash.Tracer.telemetry_span [:ash, :after_transaction], metadata do
            after_transaction.(changeset, result)
          end
        end
      end
    )
    |> case do
      {:ok, new_result} ->
        {:ok, new_result}

      {:error, error} ->
        {:error, error}
    end
  end

  defp run_around_actions(%{around_action: []} = changeset, func) do
    changeset =
      changeset
      |> set_phase(:before_action)

    result =
      if changeset.atomics != [] && !data_layer_can_do_atomic_for_changest?(changeset) do
        {:error,
         Ash.Error.Invalid.AtomicsNotSupported.exception(
           resource: changeset.resource,
           action_type: changeset.action_type
         )}
      else
        run_before_actions(changeset)
      end

    case result do
      {:error, error} ->
        {:error, error}

      {changeset, %{notifications: before_action_notifications}} ->
        changed? =
          Ash.Changeset.changing_attributes?(changeset) or
            not Enum.empty?(changeset.atomics)

        changeset =
          Ash.Changeset.put_context(changeset, :changed?, changed?)

        changeset
        |> clear_phase()
        |> func.()
        |> case do
          {:ok, result, instructions} ->
            run_after_actions(
              result,
              instructions[:new_changeset] || changeset,
              List.wrap(instructions[:notifications]) ++ List.wrap(before_action_notifications)
            )

          {:ok, result} ->
            run_after_actions(result, changeset, before_action_notifications)

          {:error, error} ->
            {:error, error}
        end
    end
  end

  defp run_around_actions(
         %{around_action: [around | rest]} = changeset,
         func
       ) do
    changeset
    |> set_phase(:around_action)
    |> around.(fn changeset ->
      run_around_actions(%{changeset | around_action: rest}, func)
    end)
  end

  @doc false
  def run_after_actions(result, changeset, before_action_notifications) do
    changeset = set_phase(changeset, :after_action)

    Enum.reduce_while(
      changeset.after_action,
      {:ok, result, changeset, %{notifications: before_action_notifications}},
      fn after_action, {:ok, result, changeset, %{notifications: notifications} = acc} ->
        tracer = changeset.context[:private][:tracer]

        metadata = fn ->
          %{
            domain: changeset.domain,
            resource: changeset.resource,
            resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
            actor: changeset.context[:private][:actor],
            tenant: changeset.context[:private][:actor],
            action: changeset.action && changeset.action.name,
            authorize?: changeset.context[:private][:authorize?]
          }
        end

        result =
          Ash.Tracer.span :after_action,
                          "after_action",
                          tracer do
            Ash.Tracer.set_metadata(tracer, :after_action, metadata)

            Ash.Tracer.telemetry_span [:ash, :after_action], metadata do
              after_action.(changeset, result)
            end
          end

        case result do
          {:ok, new_result, new_notifications} ->
            all_notifications =
              Enum.map(
                List.wrap(notifications) ++ List.wrap(new_notifications),
                fn notification ->
                  %{
                    notification
                    | resource: notification.resource || changeset.resource,
                      action:
                        notification.action ||
                          Ash.Resource.Info.action(
                            changeset.resource,
                            changeset.action,
                            changeset.action_type
                          ),
                      data: notification.data || new_result,
                      changeset: notification.changeset || changeset,
                      actor: notification.actor || changeset.context[:private][:actor]
                  }
                end
              )

            {:cont,
             {:ok, new_result, clear_phase(changeset), %{acc | notifications: all_notifications}}}

          {:ok, new_result} ->
            {:cont, {:ok, new_result, clear_phase(changeset), acc}}

          {:error, error} ->
            {:halt, {:error, error}}

          other ->
            raise """
            Invalid return value from after_action hook. Expected one of:

            * {:ok, result}
            * {:ok, result, notifications}
            * {:error, error}

            Got:

            #{inspect(other)}
            """
        end
      end
    )
  end

  defp data_layer_can_do_atomic_for_changest?(changeset) do
    ability = if changeset.action_type == :update, do: :update, else: :upsert
    Ash.DataLayer.data_layer_can?(changeset.resource, {:atomic, ability})
  end

  @doc "Gets the value of an argument provided to the changeset."
  @spec get_argument(t, atom) :: term
  def get_argument(changeset, argument) when is_atom(argument) do
    if Map.has_key?(changeset.arguments, argument) do
      Map.get(changeset.arguments, argument)
    else
      Map.get(changeset.arguments, to_string(argument))
    end
  end

  def get_argument(changeset, argument) when is_binary(argument) do
    changeset.arguments
    |> Enum.find(fn {key, _} ->
      to_string(key) == argument
    end)
    |> case do
      {_key, value} ->
        value

      _ ->
        nil
    end
  end

  @doc "Fetches the value of an argument provided to the changeset or `:error`."
  @spec fetch_argument(t, atom) :: {:ok, term} | :error
  def fetch_argument(changeset, argument) when is_atom(argument) do
    case Map.fetch(changeset.arguments, argument) do
      {:ok, value} ->
        {:ok, value}

      :error ->
        case Map.fetch(changeset.arguments, to_string(argument)) do
          {:ok, value} -> {:ok, value}
          :error -> :error
        end
    end
  end

  def fetch_argument(changeset, argument) when is_binary(argument) do
    changeset.arguments
    |> Enum.find(fn {key, _} ->
      to_string(key) == argument
    end)
    |> case do
      {_key, value} ->
        {:ok, value}

      _ ->
        :error
    end
  end

  @doc "Gets the changing value or the original value of an attribute."
  @spec get_attribute(t, atom) :: term
  def get_attribute(changeset, attribute) do
    case fetch_change(changeset, attribute) do
      {:ok, value} ->
        value

      :error ->
        get_data(changeset, attribute)
    end
  end

  @doc "Gets the value of an argument provided to the changeset, falling back to `Ash.Changeset.get_attribute/2` if nothing was provided."
  @spec get_argument_or_attribute(t, atom) :: term
  def get_argument_or_attribute(changeset, attribute) do
    case fetch_argument(changeset, attribute) do
      {:ok, value} -> value
      :error -> get_attribute(changeset, attribute)
    end
  end

  @doc "Gets the new value for an attribute, or `:error` if it is not being changed."
  @spec fetch_change(t, atom) :: {:ok, any} | :error
  def fetch_change(changeset, attribute) do
    Map.fetch(changeset.attributes, attribute)
  end

  @doc "Gets the value of an argument provided to the changeset, falling back to `Ash.Changeset.fetch_change/2` if nothing was provided."
  @spec fetch_argument_or_change(t, atom) :: {:ok, any} | :error
  def fetch_argument_or_change(changeset, attribute) do
    case fetch_argument(changeset, attribute) do
      {:ok, value} -> {:ok, value}
      :error -> fetch_change(changeset, attribute)
    end
  end

  @doc "Gets the original value for an attribute"
  @spec get_data(t, atom) :: term
  def get_data(changeset, attribute) do
    Map.get(changeset.data, attribute)
  end

  @doc """
  Puts a key/value in the changeset context that can be used later.

  Do not use the `private` key in your custom context, as that is reserved for internal use.
  """
  @spec put_context(t(), atom, term) :: t()
  def put_context(changeset, key, value) do
    set_context(changeset, %{key => value})
  end

  @spec set_tenant(t(), Ash.ToTenant.t()) :: t()
  def set_tenant(changeset, tenant) do
    %{changeset | tenant: tenant, to_tenant: Ash.ToTenant.to_tenant(tenant, changeset.resource)}
  end

  @spec timeout(t(), nil | pos_integer, nil | pos_integer) :: t()
  def timeout(changeset, timeout, default \\ nil) do
    %{changeset | timeout: timeout || default}
  end

  @doc """
  Deep merges the provided map into the changeset context that can be used later.

  Do not use the `private` key in your custom context, as that is reserved for internal use.
  """
  @spec set_context(t(), map | nil) :: t()
  def set_context(changeset, nil), do: changeset

  def set_context(changeset, map) do
    %{changeset | context: Ash.Helpers.deep_merge_maps(changeset.context, map)}
    |> store_context_changes(map)
  end

  defp store_context_changes(%{phase: :pending} = changeset, map) do
    %{changeset | context_changes: Ash.Helpers.deep_merge_maps(changeset.context_changes, map)}
  end

  defp store_context_changes(changeset, _), do: changeset

  @type manage_relationship_type ::
          :append_and_remove | :append | :remove | :direct_control | :create

  @spec manage_relationship_opts(manage_relationship_type()) :: Keyword.t()

  @doc false
  def manage_relationship_opts(:append_and_remove) do
    [
      on_lookup: :relate,
      on_no_match: :error,
      on_match: :ignore,
      on_missing: :unrelate
    ]
  end

  @doc false
  def manage_relationship_opts(:append) do
    [
      on_lookup: :relate,
      on_no_match: :error,
      on_match: :ignore,
      on_missing: :ignore
    ]
  end

  @doc false
  def manage_relationship_opts(:remove) do
    [
      on_no_match: :error,
      on_match: :unrelate,
      on_missing: :ignore
    ]
  end

  @doc false
  def manage_relationship_opts(:create) do
    [
      on_no_match: :create,
      on_match: :ignore
    ]
  end

  @doc false
  def manage_relationship_opts(:direct_control) do
    [
      on_lookup: :ignore,
      on_no_match: :create,
      on_match: :update,
      on_missing: :destroy
    ]
  end

  @manage_opts [
    type: [
      type: {:one_of, @manage_types},
      doc: """
      If the `type` is specified, the default values of each option is modified to match that `type` of operation.

      This allows for specifying certain operations much more succinctly. The defaults that are modified are listed below:

      - `:append_and_remove`

            [
              on_lookup: :relate,
              on_no_match: :error,
              on_match: :ignore,
              on_missing: :unrelate
            ]

      - `:append`

            [
              on_lookup: :relate,
              on_no_match: :error,
              on_match: :ignore,
              on_missing: :ignore
            ]

      - `:remove`

            [
              on_no_match: :error,
              on_match: :unrelate,
              on_missing: :ignore
            ]

      - `:direct_control`

            [
              on_lookup: :ignore,
              on_no_match: :create,
              on_match: :update,
              on_missing: :destroy
            ]

      - `:create`

            [
              on_no_match: :create,
              on_match: :ignore
            ]
      """
    ],
    authorize?: [
      type: :boolean,
      default: true,
      doc:
        "Authorize reads and changes to the destination records, if the primary change is being authorized as well."
    ],
    eager_validate_with: [
      type: :atom,
      default: false,
      doc:
        "Validates that any referenced entities exist *before* the action is being performed, using the provided domain for the read."
    ],
    on_no_match: [
      type: :any,
      default: :ignore,
      doc: """
      Instructions for handling records where no matching record existed in the relationship.

      * `:ignore` (default) - those inputs are ignored
      * `:match` - For `has_one` and `belongs_to` only, any input is treated as a match for an existing value. For `has_many` and `many_to_many`, this is the same as `:ignore`.
      * `:create` - the records are created using the destination's primary create action
      * `{:create, :action_name}` - the records are created using the specified action on the destination resource
      * `{:create, :action_name, :join_table_action_name, [:list, :of, :join_table, :params]}` - Same as `{:create, :action_name}` but takes
          the list of params specified out and applies them when creating the join record, with the provided join_table_action_name.
      * `:error`  - an error is returned indicating that a record would have been created
        *  If `on_lookup` is set, and the data contained a primary key or identity, then the error is a `NotFound` error
        * Otherwise, an `InvalidRelationship` error is returned
      """
    ],
    value_is_key: [
      type: :atom,
      doc: """
      Configures what key to use when a single value is provided.

      This is useful when you use things like a list of strings i.e `friend_emails` to manage the relationship, instead of a list of maps.

      By default, we assume it is the primary key of the destination resource, unless it is a composite primary key.
      """
    ],
    order_is_key: [
      type: :atom,
      doc: """
      If set, the order that each input appears in the list will be added to the input as this key.

      This is useful when you want to accept an ordered list of related records and write that order to the entity.
      This should only currently be used with `type: :direct_control` or `type: :create` when there are no currently
      existing related records (like when creating the source record).

      If you have an identity on the field and relationship id on the destination, and you are using
      AshPostgres, you will want to use the `deferrable` option to ensure that conflicting orders are temporarily
      allowed within a single transaction.
      """
    ],
    identity_priority: [
      type: {:list, :atom},
      doc: """
      The list, in priority order, of identities to use when looking up records for `on_lookup`, and matching records with `on_match`.

      Use `:_primary_key` to prioritize checking a match with the primary key.
      All identities, along with `:_primary_key` are checked regardless, this only allows ensuring that some are checked first.
      Defaults to the list provided by `use_identities`, so you typically won't need this option.
      """
    ],
    use_identities: [
      type: {:list, :atom},
      doc: """
      A list of identities that may be used to look up and compare records. Use `:_primary_key` to include the primary key. By default, only `[:_primary_key]` is used.
      """
    ],
    on_lookup: [
      type: :any,
      default: :ignore,
      doc: """
      Before creating a record (because no match was found in the relationship), the record can be looked up and related.

      * `:ignore` (default) - Does not look for existing entries (matches in the current relationship are still considered updates)
      * `:relate` - Same as calling `{:relate, primary_action_name}`
      * `{:relate, :action_name}` - the records are looked up by primary key/the first identity that is found (using the primary read action), and related. The action should be:
          * `many_to_many` - a create action on the join resource
          * `has_many` - an update action on the destination resource
          * `has_one` - an update action on the destination resource
          * `belongs_to` - an update action on the source resource
      * `{:relate, :action_name, :read_action_name}` - Same as the above, but customizes the read action called to search for matches.
      * `:relate_and_update` - Same as `:relate`, but the remaining parameters from the lookup are passed into the action that is used to change the relationship key
      * `{:relate_and_update, :action_name}` - Same as the above, but customizes the action used. The action should be:
          * `many_to_many` - a create action on the join resource
          * `has_many` - an update action on the destination resource
          * `has_one` - an update action on the destination resource
          * `belongs_to` - an update action on the source resource
      * `{:relate_and_update, :action_name, :read_action_name}` - Same as the above, but customizes the read action called to search for matches.
      * `{:relate_and_update, :action_name, :read_action_name, [:list, :of, :join_table, :params]}` - Same as the above, but uses the provided list of parameters when creating
          the join row (only relevant for many to many relationships). Use `:*` to *only* update the join record, and pass all parameters to its action
      """
    ],
    on_match: [
      type: :any,
      default: :ignore,
      doc: """
      Instructions for handling records where a matching record existed in the relationship already.

      * `:ignore` (default) - those inputs are ignored
      * `:update` - the record is updated using the destination's primary update action
      * `{:update, :action_name}` - the record is updated using the specified action on the destination resource
      * `{:update, :action_name, :join_table_action_name, [:list, :of, :params]}` - Same as `{:update, :action_name}` but takes
          the list of params specified out and applies them as an update to the join record (only valid for many to many)
      * `:update_join` - update only the join record (only valid for many to many)
      * `{:update_join, :join_table_action_name}` - use the specified update action on a join resource
      * `{:update_join, :join_table_action_name, [:list, :of, :params]}` - pass specified params from input into a join resource update action
      * `{:destroy, :action_name}` - the record is destroyed using the specified action on the destination resource. The action should be:
        * `many_to_many` - a destroy action on the join record
        * `has_many` - a destroy action on the destination resource
        * `has_one` - a destroy action on the destination resource
        * `belongs_to` - a destroy action on the destination resource
      * `:error`  - an error is returned indicating that a record would have been updated
      * `:no_match` - follows the `on_no_match` instructions with these records
      * `:missing` - follows the `on_missing` instructions with these records
      * `:unrelate` - the related item is not destroyed, but the data is "unrelated", making this behave like `remove_from_relationship/3`. The action should be:
        * `many_to_many` - the join resource row is destroyed
        * `has_many` - the `destination_attribute` (on the related record) is set to `nil`
        * `has_one` - the `destination_attribute` (on the related record) is set to `nil`
        * `belongs_to` - the `source_attribute` (on this record) is set to `nil`
      * `{:unrelate, :action_name}` - the record is unrelated using the provided update action. The action should be:
        * `many_to_many` - a destroy action on the join resource
        * `has_many` - an update action on the destination resource
        * `has_one` - an update action on the destination resource
        * `belongs_to` - an update action on the source resource
      """
    ],
    on_missing: [
      type: :any,
      default: :ignore,
      doc: """
      Instructions for handling records that existed in the current relationship but not in the input.

      * `:ignore` (default) - those inputs are ignored
      * `:destroy` - the record is destroyed using the destination's primary destroy action
      * `{:destroy, :action_name}` - the record is destroyed using the specified action on the destination resource
      * `{:destroy, :action_name, :join_resource_action_name}` - the record is destroyed using the specified action on the destination resource,
        but first the join resource is destroyed with its specified action
      * `:error`  - an error is returned indicating that a record would have been updated
      * `:unrelate` - the related item is not destroyed, but the data is "unrelated", making this behave like `remove_from_relationship/3`. The action should be:
        * `many_to_many` - the join resource row is destroyed
        * `has_many` - the `destination_attribute` (on the related record) is set to `nil`
        * `has_one` - the `destination_attribute` (on the related record) is set to `nil`
        * `belongs_to` - the `source_attribute` (on this record) is set to `nil`
      * `{:unrelate, :action_name}` - the record is unrelated using the provided update action. The action should be:
        * `many_to_many` - a destroy action on the join resource
        * `has_many` - an update action on the destination resource
        * `has_one` - an update action on the destination resource
        * `belongs_to` - an update action on the source resource
      """
    ],
    error_path: [
      type: :any,
      doc: """
      By default, errors added to the changeset will use the path `[:relationship_name]`, or `[:relationship_name, <index>]`.
      If you want to modify this path, you can specify `error_path`, e.g if had a `change` on an action that takes an argument
      and uses that argument data to call `manage_relationship`, you may want any generated errors to appear under the name of that
      argument, so you could specify `error_path: :argument_name` when calling `manage_relationship`.
      """
    ],
    join_keys: [
      type: {:list, :atom},
      doc: """
      For many to many relationships specifies the parameters to pick from the input and pass into a join resource action.
      Applicable in cases like `on_no_match: :create`, `on_match: :update` and `on_lookup: :relate`.
      Can be overwritten by a full form instruction tuple which contains join parameters at the end.
      """
    ],
    meta: [
      type: :any,
      doc: """
      Freeform data that will be retained along with the options, which can be used to track/manage the changes
      that are added to the `relationships` key. Use the `meta[:order]` option to specify the order in which multiple
      calls to `manage_relationship` should be executed.
      """
    ],
    ignore?: [
      type: :any,
      default: false,
      doc: """
      This tells Ash to ignore the provided inputs when actually running the action. This can be useful for
      building up a set of instructions that you intend to handle manually.
      """
    ]
  ]

  @doc false
  def manage_relationship_schema, do: @manage_opts

  manage_opts = @manage_opts

  defmodule ManageRelationshipOpts do
    @moduledoc false

    use Spark.Options.Validator, schema: manage_opts
  end

  @doc """
  Manages the related records by creating, updating, or destroying them as necessary.

  Keep in mind that the default values for all `on_*` are `:ignore`, meaning nothing will happen
  unless you provide instructions.

  The input provided to `manage_relationship` should be a map, in the case of to_one relationships, or a list of maps
  in the case of to_many relationships. The following steps are followed for each input provided:

  - The input is checked against the currently related records to find any matches. The primary key and unique identities are used to find matches.
  - For any input that had a match in the current relationship, the `:on_match` behavior is triggered
  - For any input that does not have a match:
    - if there is `on_lookup` behavior:
      - we try to find the record in the data layer.
      - if the record is found, the on_lookup behavior is triggered
      - if the record is not found, the `on_no_match` behavior is triggered
    - if there is no `on_lookup` behavior:
      - the `on_no_match` behavior is triggered
  - finally, for any records present in the *current relationship* that had no match *in the input*, the `on_missing` behavior is triggered

  ## Options

  #{Spark.Options.docs(@manage_opts)}

  Each call to this function adds new records that will be handled according to their options. For example,
  if you tracked "tags to add" and "tags to remove" in separate fields, you could input them like so:

  ```elixir
  changeset
  |> Ash.Changeset.manage_relationship(
    :tags,
    [%{name: "backend"}],
    on_lookup: :relate, #relate that tag if it exists in the database
    on_no_match: :error # error if a tag with that name doesn't exist
  )
  |> Ash.Changeset.manage_relationship(
    :tags,
    [%{name: "frontend"}],
    on_no_match: :error, # error if a tag with that name doesn't exist in the relationship
    on_match: :unrelate # if a tag with that name is related, unrelate it
  )
  ```

  When calling this multiple times with the `on_missing` option set, the list of records that are considered missing are checked
  after each set of inputs is processed. For example, if you manage the relationship once with `on_missing: :unrelate`, the records
  missing from your input will be removed, and *then* your next call to `manage_relationship` will be resolved (with those records unrelated).
  For this reason, it is suggested that you don't call this function multiple times with an `on_missing` instruction, as you may be
  surprised by the result.

  If you want the input to update existing entities, you need to ensure that the primary key (or unique identity) is provided as
  part of the input. See the example below:

      changeset
      |> Ash.Changeset.manage_relationship(
        :comments,
        [%{rating: 10, contents: "foo"}],
        on_no_match: {:create, :create_action},
        on_missing: :ignore
      )
      |> Ash.Changeset.manage_relationship(
        :comments,
        [%{id: 10, rating: 10, contents: "foo"}],
        on_match: {:update, :update_action},
        on_no_match: {:create, :create_action})

  This is a simple way to manage a relationship. If you need custom behavior, you can customize the action that is
  called, which allows you to add arguments/changes. However, at some point you may want to forego this function
  and make the changes yourself. For example:

      input = [%{id: 10, rating: 10, contents: "foo"}]

      changeset
      |> Ash.Changeset.after_action(fn _changeset, result ->
        # An example of updating comments based on a result of other changes
        for comment <- input do
          comment = Ash.get(Comment, comment.id)

          comment
          |> Map.update(:rating, 0, &(&1 * result.rating_weight))
          |> Ash.update!()
        end

        {:ok, result}
      end)

  ## Using records as input

  Records can be supplied as the input values. If you do:

  * if it would be looked up due to `on_lookup`, the record is used as-is
  * if it would be created due to `on_no_match`, the record is used as-is
  * Instead of specifying `join_keys`, those keys must go in `__metadata__.join_keys`. If `join_keys` is specified in the options, it is ignored.

  For example:

  ```elixir
  post1 =
    changeset
    |> Ash.create!()
    |> Ash.Resource.put_metadata(:join_keys, %{type: "a"})

  post2 =
    changeset2
    |> Ash.create!()
    |> Ash.Resource.put_metadata(:join_keys, %{type: "b"})

  author = Ash.create!(author_changeset)

  Ash.Changeset.manage_relationship(
    author,
    :posts,
    [post1, post2],
    on_lookup: :relate
  )
  ```
  """
  def manage_relationship(changeset, relationship, input, opts \\ [])

  def manage_relationship(changeset, relationship, "", opts) do
    manage_relationship(changeset, relationship, nil, opts)
  end

  def manage_relationship(changeset, relationship, input, opts) do
    changeset = maybe_dirty_hook(changeset, :manage_relationships)

    if opts == [] do
      IO.warn("Calling `manage_relationship` without any options will not do anything")
    end

    opts =
      if opts[:type] == :replace do
        Logger.warning(
          "`type: :replace` has been renamed to `:append_and_remove` in 2.0, and it will be removed in 2.1"
        )

        Keyword.put(opts, :type, :append_and_remove)
      else
        opts
      end

    inputs_was_list? = is_list(input)

    opts =
      if opts[:type] do
        defaults = manage_relationship_opts(opts[:type])
        Keyword.merge(defaults, opts)
      else
        opts
      end

    opts =
      opts
      |> ManageRelationshipOpts.validate!()

    keyword_opts =
      ManageRelationshipOpts.to_options(opts)

    opts =
      if opts.meta && Keyword.has_key?(opts.meta, :inputs_was_list?) do
        opts
      else
        %{opts | meta: Keyword.put(opts.meta || [], :inputs_was_list?, inputs_was_list?)}
      end

    case Ash.Resource.Info.relationship(changeset.resource, relationship) do
      nil ->
        error =
          NoSuchRelationship.exception(
            resource: changeset.resource,
            relationship: relationship
          )

        add_error(changeset, error)

      %{writable?: false} = relationship ->
        error =
          InvalidRelationship.exception(
            relationship: relationship.name,
            message: "relationship is not editable"
          )

        add_error(changeset, error)

      %{manual: manual} = relationship when not is_nil(manual) ->
        error =
          InvalidRelationship.exception(
            relationship: relationship.name,
            message: "cannot manage a manual relationship"
          )

        add_error(changeset, error)

      %{cardinality: :one, type: type} = relationship when is_list(input) and length(input) > 1 ->
        error =
          InvalidRelationship.exception(
            relationship: relationship.name,
            message: "cannot manage a #{type} relationship with a list of records"
          )

        add_error(changeset, error)

      relationship ->
        key =
          opts.value_is_key ||
            changeset.resource
            |> Ash.Resource.Info.related(relationship.name)
            |> Ash.Resource.Info.primary_key()
            |> case do
              [key] ->
                key

              _ ->
                nil
            end

        if relationship.cardinality == :many && is_map(input) && !is_struct(input) do
          case map_input_to_list(input) do
            {:ok, input} ->
              input =
                if key do
                  Enum.map(input, fn input ->
                    if is_map(input) || is_list(input) do
                      input
                    else
                      %{key => input}
                    end
                  end)
                else
                  input
                end

              manage_relationship(changeset, relationship.name, input, keyword_opts)

            :error ->
              manage_relationship(changeset, relationship.name, List.wrap(input), keyword_opts)
          end
        else
          input =
            if key do
              input
              |> List.wrap()
              |> Enum.map(fn input ->
                if !opts.value_is_key && (is_map(input) || Keyword.keyword?(input)) do
                  input
                else
                  %{key => input}
                end
              end)
            else
              input
            end
            |> List.wrap()

          input =
            if opts.order_is_key do
              input
              |> Enum.with_index()
              |> Enum.map(fn {map, index} ->
                if is_map(map) do
                  Map.put(map, opts.order_is_key, index)
                else
                  Keyword.put(map, opts.order_is_key, index)
                end
              end)
            else
              input
            end

          if Enum.any?(
               input,
               &(is_struct(&1) && Ash.Resource.Info.resource?(&1.__struct__) &&
                   &1.__struct__ != relationship.destination)
             ) do
            add_error(
              changeset,
              InvalidRelationship.exception(
                relationship: relationship.name,
                message: "cannot provide structs that don't match the destination"
              )
            )
          else
            relationships =
              changeset.relationships
              |> Map.put_new(relationship.name, [])
              |> Map.update!(relationship.name, &(&1 ++ [{input, keyword_opts}]))

            changeset = %{changeset | relationships: relationships}

            if opts.eager_validate_with do
              eager_validate_relationship_input(
                relationship,
                input,
                changeset,
                opts.eager_validate_with,
                opts.error_path || opts.meta[:id] || relationship.name,
                keyword_opts
              )
            else
              changeset
            end
          end
        end
    end
  end

  defp eager_validate_relationship_input(
         _relationship,
         [],
         _changeset,
         _domain,
         _error_path,
         _opts
       ),
       do: :ok

  defp eager_validate_relationship_input(relationship, input, changeset, domain, error_path, opts) do
    pkeys = Ash.Actions.ManagedRelationships.pkeys(relationship, opts)

    pkeys =
      Enum.map(pkeys, fn pkey ->
        Enum.map(pkey, fn key ->
          Ash.Resource.Info.attribute(relationship.destination, key)
        end)
      end)

    search =
      Enum.reduce(input, false, fn item, expr ->
        filter =
          Enum.find_value(pkeys, fn pkey ->
            this_filter =
              pkey
              |> Enum.reject(&(&1.name == relationship.destination_attribute))
              |> Enum.all?(fn key ->
                case fetch_identity_field(
                       item,
                       changeset.data,
                       key,
                       relationship
                     ) do
                  {:ok, _value} ->
                    true

                  :error ->
                    false
                end
              end)
              |> case do
                true ->
                  case Map.take(
                         item,
                         Enum.map(pkey, & &1.name) ++ Enum.map(pkey, &to_string(&1.name))
                       ) do
                    empty when empty == %{} -> nil
                    filter -> filter
                  end

                false ->
                  nil
              end

            if Enum.any?(pkey, &(&1.name == relationship.destination_attribute)) &&
                 relationship.type in [:has_many, :has_one] do
              destination_value = Map.get(changeset.data, relationship.source_attribute)

              expr(
                ^this_filter and
                  (is_nil(^ref(relationship.destination_attribute)) or
                     ^ref(relationship.destination_attribute) == ^destination_value)
              )
            else
              this_filter
            end
          end)

        if filter && filter != %{} do
          expr(^expr or ^filter)
        else
          expr
        end
      end)

    results =
      if search == false do
        {:ok, []}
      else
        action =
          relationship.read_action ||
            Ash.Resource.Info.primary_action!(relationship.destination, :read).name

        relationship.destination
        |> Ash.Query.for_read(action, %{},
          domain: domain,
          actor: changeset.context[:private][:actor],
          authorize?: changeset.context[:private][:authorize?],
          tenant: changeset.tenant
        )
        |> Ash.Query.limit(Enum.count(input))
        |> Ash.Query.do_filter(search)
        |> Ash.read()
      end

    case results do
      {:error, error} ->
        {:error, error}

      {:ok, results} ->
        case Enum.find(input, fn item ->
               no_pkey_all_matches(results, pkeys, fn result, key ->
                 case fetch_identity_field(item, changeset.data, key, relationship) do
                   {:ok, value} ->
                     Ash.Type.equal?(
                       key.type,
                       value,
                       Map.get(result, key.name)
                     )

                   :error ->
                     false
                 end
               end)
             end) do
          nil ->
            :ok

          item ->
            pkey_search =
              Enum.find_value(pkeys, fn pkey ->
                if Enum.all?(pkey, fn key ->
                     Map.has_key?(item, key.name) || Map.has_key?(item, to_string(key.name))
                   end) do
                  Map.take(item, pkey ++ Enum.map(pkey, &to_string(&1.name)))
                end
              end)

            {:error,
             Ash.Error.Query.NotFound.exception(
               primary_key: pkey_search,
               resource: relationship.destination
             )}
        end
    end
    |> case do
      :ok ->
        changeset

      {:error, error} ->
        add_error(changeset, Ash.Error.set_path(error, error_path))
    end
  end

  defp no_pkey_all_matches(results, pkeys, func) do
    !Enum.any?(results, fn result ->
      Enum.any?(pkeys, fn pkey ->
        Enum.all?(pkey, fn key ->
          func.(result, key)
        end)
      end)
    end)
  end

  defp fetch_identity_field(item, data, attribute, relationship) do
    if attribute.name == relationship.destination_attribute &&
         relationship.type in [:has_many, :has_one] do
      {:ok, Map.get(data, relationship.source_attribute)}
    else
      string_attribute = to_string(attribute.name)

      if Map.has_key?(item, attribute.name) || Map.has_key?(item, string_attribute) do
        input_value = Map.get(item, attribute.name) || Map.get(item, string_attribute)

        case Ash.Type.cast_input(attribute.type, input_value, attribute.constraints) do
          {:ok, casted_input_value} ->
            {:ok, casted_input_value}

          _ ->
            :error
        end
      else
        :error
      end
    end
  end

  defp map_input_to_list(input) when input == %{} do
    :error
  end

  defp map_input_to_list(input) do
    input
    |> Enum.reduce_while({:ok, []}, fn
      {key, value}, {:ok, acc} when is_integer(key) ->
        {:cont, {:ok, [{key, value} | acc]}}

      {key, value}, {:ok, acc} when is_binary(key) ->
        case Integer.parse(key) do
          {int, ""} ->
            {:cont, {:ok, [{int, value} | acc]}}

          _ ->
            {:halt, :error}
        end

      _, _ ->
        {:halt, :error}
    end)
    |> case do
      {:ok, value} ->
        {:ok,
         value
         |> Enum.sort_by(&elem(&1, 0))
         |> Enum.map(&elem(&1, 1))}

      :error ->
        :error
    end
  end

  @doc "Returns true if any attributes on the resource are being changed."
  @spec changing_attributes?(t()) :: boolean
  def changing_attributes?(changeset) do
    changeset.resource
    |> Ash.Resource.Info.attributes()
    |> Enum.any?(&changing_attribute?(changeset, &1.name))
  end

  @doc "Returns true if an attribute exists in the changes"
  @spec changing_attribute?(t(), atom) :: boolean
  def changing_attribute?(changeset, attribute) do
    Map.has_key?(changeset.attributes, attribute) ||
      Keyword.has_key?(changeset.atomics, attribute)
  end

  @doc "Returns true if a relationship exists in the changes"
  @spec changing_relationship?(t(), atom) :: boolean
  def changing_relationship?(changeset, relationship) do
    Map.has_key?(changeset.relationships, relationship)
  end

  @doc "Change an attribute only if is not currently being changed"
  @spec change_new_attribute(t(), atom, term) :: t()
  def change_new_attribute(changeset, attribute, value) do
    maybe_already_validated_error!(changeset, :force_change_new_attribute)

    if changing_attribute?(changeset, attribute) do
      changeset
    else
      change_attribute(changeset, attribute, value)
    end
  end

  @doc """
  Change an attribute if is not currently being changed, by calling the provided function.

  Use this if you want to only perform some expensive calculation for an attribute value
  only if there isn't already a change for that attribute.
  """
  @spec change_new_attribute_lazy(t(), atom, (-> any)) :: t()
  def change_new_attribute_lazy(changeset, attribute, func) do
    maybe_already_validated_error!(changeset, :force_change_new_attribute_lazy)

    if changing_attribute?(changeset, attribute) do
      changeset
    else
      change_attribute(changeset, attribute, func.())
    end
  end

  @doc """
  Updates an existing attribute change by applying a function to it.

  This is useful for applying some kind of normalization to the attribute.

  ```elixir
  Ash.Changeset.update_change(changeset, :serial, &String.downcase/1)
  ```

  The update function gets called with the value already cast to the correct type.

  ```elixir
  changeset
  |> Ash.Changeset.change_attribute(:integer_attribute, "3")
  |> Ash.Changeset.update_change(:integer_attribute, fn x -> x + 1 end)
  ```

  ## Invalid value handling

  If `update_change` is called with a changeset that has not been validated yet, the update
  function must handle potentially invalid and `nil` values.

  To only deal with valid values, you can call `update_change` in a `before_action` hook.
  """
  @spec update_change(t(), atom, (any -> any)) :: t()
  def update_change(changeset, attribute, fun) do
    case fetch_change(changeset, attribute) do
      {:ok, change} ->
        force_change_attribute(changeset, attribute, fun.(change))

      :error ->
        changeset
    end
  end

  @doc """
  Add an argument to the changeset, which will be provided to the action.
  """
  def set_argument(changeset, argument, value) do
    maybe_already_validated_error!(changeset, :force_set_argument)
    do_set_argument(changeset, argument, value)
  end

  @doc """
  Add a private argument to the changeset, which will be provided to the action.
  """
  @spec set_private_argument(t(), atom, term) :: t()
  def set_private_argument(changeset, argument, value) do
    do_set_private_argument(
      changeset,
      argument,
      value,
      "can't set public arguments with set_private_argument/3"
    )
  end

  defp set_private_argument_for_action(changeset, argument, value) do
    do_set_private_argument(
      changeset,
      argument,
      value,
      "can't set public arguments using the private_arguments option."
    )
  end

  defp do_set_private_argument(changeset, name, value, error_msg) do
    argument =
      Enum.find(
        changeset.action.arguments,
        &(&1.name == name || to_string(&1.name) == name)
      )

    cond do
      is_nil(argument) ->
        changeset

      argument.public? ->
        add_invalid_errors(
          value,
          :argument,
          changeset,
          argument,
          error_msg
        )

      true ->
        set_argument(changeset, name, value)
    end
  end

  @doc """
  Add an argument to the changeset, which will be provided to the action.

  Does not show a warning when used in before/after action hooks.
  """
  def force_set_argument(changeset, argument, value) do
    do_set_argument(changeset, argument, value)
  end

  defp do_set_argument(changeset, argument, value, store_casted? \\ false) do
    if changeset.action do
      argument =
        Enum.find(
          changeset.action.arguments,
          &(&1.name == argument || to_string(&1.name) == argument)
        )

      if argument do
        with value <- Ash.Type.Helpers.handle_indexed_maps(argument.type, value),
             constraints <-
               Ash.Type.include_source(argument.type, changeset, argument.constraints),
             {:ok, casted} <-
               Ash.Type.cast_input(argument.type, value, constraints),
             {:constrained, {:ok, casted}, _last_val} when not is_nil(casted) <-
               {:constrained, Ash.Type.apply_constraints(argument.type, casted, constraints),
                casted} do
          %{changeset | arguments: Map.put(changeset.arguments, argument.name, casted)}
          |> store_casted_argument(argument.name, casted, store_casted?)
        else
          {:constrained, {:ok, nil}, _} ->
            %{changeset | arguments: Map.put(changeset.arguments, argument.name, nil)}
            |> store_casted_argument(argument.name, nil, store_casted?)

          {:constrained, {:error, error}, last_val} ->
            add_invalid_errors(value, :argument, changeset, argument, error)
            |> store_casted_argument(argument.name, last_val, store_casted?)

          {:error, error} ->
            add_invalid_errors(value, :argument, changeset, argument, error)
        end
      else
        %{changeset | arguments: Map.put(changeset.arguments, argument, value)}
        |> store_casted_argument(argument, value, store_casted?)
      end
    else
      %{changeset | arguments: Map.put(changeset.arguments, argument, value)}
      |> store_casted_argument(argument, value, store_casted?)
    end
  end

  defp store_casted_argument(changeset, name, value, true) do
    %{
      changeset
      | casted_arguments: Map.put(changeset.casted_arguments, name, value)
    }
  end

  defp store_casted_argument(changeset, _name, _value, _store_casted?) do
    changeset
  end

  @doc """
  Remove an argument from the changeset
  """
  def delete_argument(changeset, argument_or_arguments) do
    maybe_already_validated_error!(changeset)

    argument_or_arguments
    |> List.wrap()
    |> Enum.reduce(changeset, fn argument, changeset ->
      %{changeset | arguments: Map.delete(changeset.arguments, argument)}
    end)
  end

  @doc """
  Remove an argument from the changeset, not warning if the changeset has already been validated.
  """
  def force_delete_argument(changeset, argument_or_arguments) do
    argument_or_arguments
    |> List.wrap()
    |> Enum.reduce(changeset, fn argument, changeset ->
      %{changeset | arguments: Map.delete(changeset.arguments, argument)}
    end)
  end

  @doc """
  Merge a map of arguments to the arguments list.
  """
  def set_arguments(changeset, map) do
    Enum.reduce(map, changeset, fn {key, value}, changeset ->
      set_argument(changeset, key, value)
    end)
  end

  @doc """
  Merge a map of arguments to the arguments list.

  Does not show a warning when used in before/after action hooks.
  """
  def force_set_arguments(changeset, map) do
    Enum.reduce(map, changeset, fn {key, value}, changeset ->
      force_set_argument(changeset, key, value)
    end)
  end

  @doc """
  Force change an attribute if it is not currently being changed.

  See `change_new_attribute/3` for more.
  """
  @spec force_change_new_attribute(t(), atom, term) :: t()
  def force_change_new_attribute(changeset, attribute, value) do
    if changing_attribute?(changeset, attribute) do
      changeset
    else
      force_change_attribute(changeset, attribute, value)
    end
  end

  @doc """
  Force change an attribute if it is not currently being changed, by calling the provided function.

  See `change_new_attribute_lazy/3` for more.
  """
  @spec force_change_new_attribute_lazy(t(), atom, (-> any)) :: t()
  def force_change_new_attribute_lazy(changeset, attribute, func) do
    if changing_attribute?(changeset, attribute) do
      changeset
    else
      force_change_attribute(changeset, attribute, func.())
    end
  end

  @doc "Calls `change_attribute/3` for each key/value pair provided."
  @spec change_attributes(t(), map | Keyword.t()) :: t()
  def change_attributes(changeset, changes) do
    maybe_already_validated_error!(changeset, :force_change_attributes)

    Enum.reduce(changes, changeset, fn {key, value}, changeset ->
      change_attribute(changeset, key, value)
    end)
  end

  @doc "Adds a change to the changeset, unless the value matches the existing value."
  @spec change_attribute(t(), atom, any) :: t()
  def change_attribute(changeset, attribute, value) do
    maybe_already_validated_error!(changeset, :force_change_attribute)
    do_change_attribute(changeset, attribute, value)
  end

  defp do_change_attribute(changeset, attribute, value, store_casted? \\ false) do
    case Ash.Resource.Info.attribute(changeset.resource, attribute) do
      nil ->
        error =
          NoSuchAttribute.exception(
            resource: changeset.resource,
            attribute: attribute
          )

        add_error(changeset, error)

      %{writable?: false} = attribute ->
        add_invalid_errors(value, :attribute, changeset, attribute, "Attribute is not writable")

      attribute ->
        with value <- Ash.Type.Helpers.handle_indexed_maps(attribute.type, value),
             constraints <-
               Ash.Type.include_source(attribute.type, changeset, attribute.constraints),
             {{:ok, prepared}, _} <-
               {prepare_change(changeset, attribute, value, constraints), value},
             {{:ok, casted}, _} <-
               {Ash.Type.cast_input(
                  attribute.type,
                  prepared,
                  constraints
                ), prepared},
             {{:ok, casted}, _} <-
               {handle_change(
                  changeset,
                  attribute,
                  casted,
                  constraints
                ), casted},
             {{:ok, casted}, _} <-
               {Ash.Type.apply_constraints(attribute.type, casted, constraints), casted} do
          data_value =
            if changeset.action_type != :create do
              case changeset.data do
                %Ash.Changeset.OriginalDataNotAvailable{} ->
                  nil

                data ->
                  Map.get(data, attribute.name)
              end
            end
            |> case do
              %Ash.ForbiddenField{} -> nil
              %Ash.NotLoaded{} -> nil
              v -> v
            end

          changeset = remove_default(changeset, attribute.name)

          cond do
            changeset.action_type == :create ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }
              |> store_casted_attribute(attribute.name, casted, store_casted?)

            match?(%OriginalDataNotAvailable{}, changeset.data) ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }
              |> store_casted_attribute(attribute.name, casted, store_casted?)

            is_nil(data_value) and is_nil(casted) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name),
                  defaults: changeset.defaults -- [attribute.name]
              }
              |> store_casted_attribute(attribute.name, nil, store_casted?)

            Ash.Type.equal?(attribute.type, casted, data_value) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name),
                  defaults: changeset.defaults -- [attribute.name]
              }
              |> store_casted_attribute(attribute.name, casted, store_casted?)

            true ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }
              |> store_casted_attribute(attribute.name, casted, store_casted?)
          end
        else
          {{:error, error_or_errors}, last_val} ->
            add_invalid_errors(value, :attribute, changeset, attribute, error_or_errors)
            |> store_casted_attribute(attribute.name, last_val, store_casted?)

          {:error, error_or_errors} ->
            add_invalid_errors(value, :attribute, changeset, attribute, error_or_errors)
        end
    end
  end

  defp store_casted_attribute(changeset, name, value, true) do
    %{changeset | casted_attributes: Map.put(changeset.casted_attributes, name, value)}
  end

  defp store_casted_attribute(changeset, _name, _value, _store_casted?) do
    changeset
  end

  @doc """
  The same as `change_attribute`, but annotates that the attribute is currently holding a default value.

  This information can be used in changes to see if a value was explicitly set or if it was set by being the default.
  Additionally, this is used in `upsert` actions to not overwrite existing values with the default.
  """
  @spec change_default_attribute(t(), atom, any) :: t()
  def change_default_attribute(changeset, attribute, value) do
    maybe_already_validated_error!(changeset)

    case Ash.Resource.Info.attribute(changeset.resource, attribute) do
      nil ->
        error =
          NoSuchAttribute.exception(
            resource: changeset.resource,
            attribute: attribute
          )

        add_error(changeset, error)

      attribute ->
        changeset
        |> change_attribute(attribute.name, value)
        |> Map.update!(:defaults, fn defaults ->
          Enum.uniq([attribute.name | defaults])
        end)
    end
  end

  @doc "Calls `force_change_attribute/3` for each key/value pair provided."
  @spec force_change_attributes(t(), map | Keyword.t()) :: t()
  def force_change_attributes(changeset, changes) do
    Enum.reduce(changes, changeset, fn {key, value}, changeset ->
      force_change_attribute(changeset, key, value)
    end)
  end

  @doc "Changes an attribute even if it isn't writable"
  @spec force_change_attribute(t(), atom, any) :: t()
  def force_change_attribute(changeset, attribute, value) do
    case Ash.Resource.Info.attribute(changeset.resource, attribute) do
      nil ->
        error =
          NoSuchAttribute.exception(
            resource: changeset.resource,
            attribute: attribute
          )

        add_error(changeset, error)

      attribute ->
        with value <- Ash.Type.Helpers.handle_indexed_maps(attribute.type, value),
             constraints <-
               Ash.Type.include_source(attribute.type, changeset, attribute.constraints),
             {:ok, prepared} <-
               prepare_change(changeset, attribute, value, constraints),
             {:ok, casted} <-
               Ash.Type.cast_input(attribute.type, prepared, constraints),
             {:ok, casted} <- handle_change(changeset, attribute, casted, constraints),
             {:ok, casted} <-
               Ash.Type.apply_constraints(attribute.type, casted, constraints) do
          data_value =
            if changeset.action_type != :create do
              case changeset.data do
                %Ash.Changeset.OriginalDataNotAvailable{} ->
                  nil

                data ->
                  Map.get(data, attribute.name)
              end
            end
            |> case do
              %Ash.ForbiddenField{} -> nil
              %Ash.NotLoaded{} -> nil
              v -> v
            end

          cond do
            changeset.action_type == :create ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }

            match?(%OriginalDataNotAvailable{}, changeset.data) ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }

            is_nil(data_value) and is_nil(casted) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name),
                  defaults: changeset.defaults -- [attribute.name]
              }

            Ash.Type.equal?(attribute.type, casted, data_value) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name),
                  defaults: changeset.defaults -- [attribute.name]
              }

            true ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }
          end
          |> remove_default(attribute.name)
          |> record_attribute_change_for_atomic_upgrade(attribute.name, casted)
        else
          :error ->
            add_invalid_errors(value, :attribute, changeset, attribute)

          {:error, error_or_errors} ->
            add_invalid_errors(value, :attribute, changeset, attribute, error_or_errors)
        end
    end
  end

  defp record_attribute_change_for_atomic_upgrade(changeset, name, casted) do
    if changeset.phase == :pending do
      %{changeset | attribute_changes: Map.put(changeset.attribute_changes, name, casted)}
    else
      changeset
    end
  end

  defp record_atomic_update_for_atomic_upgrade(changeset, name, value) do
    if changeset.phase == :pending do
      %{changeset | atomic_changes: Keyword.put(changeset.atomic_changes, name, value)}
    else
      changeset
    end
  end

  @doc """
  Adds a before_action hook to the changeset.

  Provide the option `prepend?: true` to place the hook before all
  other hooks instead of after.
  """
  @spec before_action(
          changeset :: t(),
          fun :: before_action_fun(),
          opts :: Keyword.t()
        ) ::
          t()
  def before_action(changeset, func, opts \\ []) do
    changeset = maybe_dirty_hook(changeset, :before_action)

    if opts[:prepend?] do
      %{changeset | before_action: [func | changeset.before_action]}
    else
      %{changeset | before_action: changeset.before_action ++ [func]}
    end
  end

  @doc """
  Adds a before_transaction hook to the changeset.

  Provide the option `prepend?: true` to place the hook before all
  other hooks instead of after.
  """
  @spec before_transaction(
          t(),
          before_transaction_fun(),
          Keyword.t()
        ) :: t()
  def before_transaction(changeset, func, opts \\ []) do
    changeset = maybe_dirty_hook(changeset, :before_transaction)

    if opts[:prepend?] do
      %{changeset | before_transaction: [func | changeset.before_transaction]}
    else
      %{changeset | before_transaction: changeset.before_transaction ++ [func]}
    end
  end

  @doc """
  Adds an after_action hook to the changeset.

  Provide the option `prepend?: true` to place the hook before all
  other hooks instead of after.
  """
  @spec after_action(
          t(),
          after_action_fun(),
          Keyword.t()
        ) :: t()
  def after_action(changeset, func, opts \\ []) do
    changeset = maybe_dirty_hook(changeset, :after_action)

    if opts[:prepend?] do
      if changeset.phase == :pending do
        %{
          changeset
          | after_action: [func | changeset.after_action],
            atomic_after_action: [func | changeset.atomic_after_action]
        }
      else
        %{changeset | after_action: [func | changeset.after_action]}
      end
    else
      if changeset.phase == :pending do
        %{
          changeset
          | after_action: changeset.after_action ++ [func],
            atomic_after_action: changeset.atomic_after_action ++ [func]
        }
      else
        %{changeset | after_action: changeset.after_action ++ [func]}
      end
    end
  end

  @doc """
  Adds an after_transaction hook to the changeset. Cannot be called within other hooks.

  `after_transaction` hooks differ from `after_action` hooks in that they are run
  on success *and* failure of the action or some previous hook.

  Provide the option `prepend?: true` to place the hook before all
  other hooks instead of after.
  """
  @spec after_transaction(
          t(),
          after_transaction_fun(),
          Keyword.t()
        ) :: t()
  def after_transaction(changeset, func, opts \\ []) do
    changeset = maybe_dirty_hook(changeset, :after_transaction)

    if changeset.phase in [:pending, :validate] do
      if opts[:prepend?] do
        %{changeset | after_transaction: [func | changeset.after_transaction]}
      else
        %{changeset | after_transaction: changeset.after_transaction ++ [func]}
      end
    else
      Ash.Changeset.add_error(
        changeset,
        "Cannot add after_transaction hooks inside of other hooks, or in atomic hooks. Current phase: #{inspect(changeset.phase)}."
      )
    end
  end

  @doc """
  Adds an around_action hook to the changeset.

  Your function will get the changeset, and a callback that must be called with a changeset (that may be modified).
  The callback will return `{:ok, result, changeset, instructions}` or `{:error, error}`. You can modify these values, but the
  return value must be one of those types. Instructions contains the notifications in its `notifications` key, i.e
  `%{notifications: [%Ash.Resource.Notification{}, ...]}`.

  The around_action calls happen first, and then (after they each resolve their callbacks) the `before_action`
  hooks are called, followed by the action itself occurring at the data layer and then the `after_action` hooks being run.
  Then, the code that appeared *after* the callbacks were called is then run.

  For example:
  ```elixir
  changeset
  |> Ash.Changeset.around_action(fn changeset, callback ->
    IO.puts("first around: before")
    result = callback.(changeset)
    IO.puts("first around: after")

    result
  end)
  |> Ash.Changeset.around_action(fn changeset, callback ->
    IO.puts("second around: before")
    result = callback.(changeset)
    IO.puts("second around: after")

    result
  end)
  |> Ash.Changeset.before_action(fn changeset ->
    IO.puts("first before")
    changeset
  end)
  |> Ash.Changeset.before_action(fn changeset ->
    IO.puts("second before")
    changeset
  end)
  |> Ash.Changeset.after_action(fn changeset, result ->
    IO.puts("first after")
    {:ok, result}
  end)
  |> Ash.Changeset.after_action(fn changeset, result ->
    IO.puts("second after")
    {:ok, result}
  end)
  ```

  This would print:
  ```
  first around: before
  second around: before
  first before
  second before
               <-- action happens here
  first after
  second after
  second around: after <-- Notice that because of the callbacks, the after of the around hooks is reversed from the before
  first around: after
  ```

  Warning: using this without understanding how it works can cause big problems.
  You *must* call the callback function that is provided to your hook, and the return value must
  contain the same structure that was given to you, i.e `{:ok, result_of_action, instructions}`.

  You can almost always get the same effect by using `before_action`, setting some context on the changeset
  and reading it out in an `after_action` hook.
  """

  @spec around_action(t(), around_action_fun()) :: t()
  def around_action(changeset, func) do
    changeset = maybe_dirty_hook(changeset, :around_action)
    %{changeset | around_action: changeset.around_action ++ [func]}
  end

  @doc """
  Adds an around_transaction hook to the changeset.

  Your function will get the changeset, and a callback that must be called with a changeset (that may be modified).
  The callback will return `{:ok, result}` or `{:error, error}`. You can modify these values, but the return value
  must be one of those types.

  The around_transaction calls happen first, and then (after they each resolve their callbacks) the `before_transaction`
  hooks are called, followed by the action hooks and then the `after_transaction` hooks being run.
  Then, the code that appeared *after* the callbacks were called is then run.

  For example:

  ```elixir
  changeset
  |> Ash.Changeset.around_transaction(fn changeset, callback ->
    IO.puts("first around: before")
    result = callback.(changeset)
    IO.puts("first around: after")

    result
  end)
  |> Ash.Changeset.around_transaction(fn changeset, callback ->
    IO.puts("second around: before")
    result = callback.(changeset)
    IO.puts("second around: after")

    result
  end)
  |> Ash.Changeset.before_transaction(fn changeset ->
    IO.puts("first before")
    changeset
  end)
  |> Ash.Changeset.before_transaction(fn changeset ->
    IO.puts("second before")
    changeset
  end)
  |> Ash.Changeset.after_transaction(fn changeset, result ->
    IO.puts("first after")
    result
  end)
  |> Ash.Changeset.after_transaction(fn changeset, result ->
    IO.puts("second after")
    result
  end)
  ```

  This would print:

  ```
  first around: before
  second around: before
  first before
  second before
               <-- action hooks happens here
  first after
  second after
  second around: after <-- Notice that because of the callbacks, the after of the around hooks is reversed from the before
  first around: after
  ```

  Warning: using this without understanding how it works can cause big problems.
  You *must* call the callback function that is provided to your hook, and the return value must
  contain the same structure that was given to you, i.e `{:ok, result_of_action}`.

  You can almost always get the same effect by using `before_transaction`, setting some context on the changeset
  and reading it out in an `after_transaction` hook.
  """

  @spec around_transaction(t(), around_transaction_fun()) :: t()
  def around_transaction(changeset, func) do
    changeset = maybe_dirty_hook(changeset, :around_transaction)
    %{changeset | around_transaction: changeset.around_transaction ++ [func]}
  end

  defp maybe_dirty_hook(changeset, type) do
    if changeset.phase == :pending do
      %{changeset | dirty_hooks: Enum.uniq([type | changeset.dirty_hooks])}
    else
      changeset
    end
  end

  @doc """
  Returns the original data with attribute changes merged, if the changeset is valid.

  Options:

  * force? - applies current attributes even if the changeset is not valid
  """
  @spec apply_attributes(t(), opts :: Keyword.t()) :: {:ok, Ash.Resource.record()} | {:error, t()}
  def apply_attributes(changeset, opts \\ [])

  def apply_attributes(%{valid?: true} = changeset, _opts) do
    changeset = set_defaults(changeset, changeset.action_type, true)

    {:ok,
     Enum.reduce(changeset.attributes, changeset.data, fn {attribute, value}, data ->
       Map.put(data, attribute, value)
     end)}
  end

  def apply_attributes(changeset, opts) do
    if opts[:force?] do
      apply_attributes(%{changeset | valid?: true}, opts)
    else
      {:error, changeset}
    end
  end

  defp remove_default(changeset, attribute) do
    %{changeset | defaults: changeset.defaults -- [attribute]}
  end

  @doc "Clears an attribute or relationship change off of the changeset."
  def clear_change(changeset, field) do
    cond do
      attr = Ash.Resource.Info.attribute(changeset.resource, field) ->
        %{
          changeset
          | attributes: Map.delete(changeset.attributes, attr.name),
            atomics: Keyword.delete(changeset.atomics, attr.name)
        }

      rel = Ash.Resource.Info.relationship(changeset.resource, field) ->
        %{changeset | relationships: Map.delete(changeset.relationships, rel.name)}

      true ->
        changeset
    end
  end

  @doc """
  Sets a custom error handler on the changeset.

  The error handler should be a two argument function or an mfa, in which case the first two arguments will be set
  to the changeset and the error, w/ the supplied arguments following those. The changeset will be marked
  as invalid regardless of the outcome of this callback.

  Any errors generated are passed to `handle_errors`, which can return any of the following:

  * `:ignore` - the error is discarded.
  * `changeset` - a new (or the same) changeset. The error is not added.
  * `{changeset, error}` - a new (or the same) error and changeset. The error is added to the changeset.
  * `anything_else` - is treated as a new, transformed version of the error. The result is added as an error to the changeset.
  """
  @spec handle_errors(
          t(),
          (t(), error :: term -> :ignore | t() | (error :: term) | {error :: term, t()})
          | {module, atom, [term]}
        ) :: t()
  def handle_errors(changeset, {m, f, a}) do
    %{changeset | handle_errors: &apply(m, f, [&1, &2 | a])}
  end

  def handle_errors(changeset, func) do
    %{changeset | handle_errors: func}
  end

  @doc """
  Adds a filter for a record being updated or destroyed.

  Used by optimistic locking. See `Ash.Resource.Change.Builtins.optimistic_lock/1` for more.
  """
  @spec filter(t(), Ash.Expr.t()) :: t()
  def filter(changeset, expr) when expr in [nil, %{}, []] do
    changeset
  end

  def filter(changeset, expression) do
    if Ash.DataLayer.data_layer_can?(changeset.resource, :changeset_filter) do
      expression = Ash.Filter.parse!(changeset.resource, expression)

      actor = changeset.context[:private][:actor]

      if is_nil(actor) && Ash.Expr.template_references_actor?(expression) do
        add_error(changeset, Ash.Error.Changes.ActionRequiresActor.exception([]))
      else
        expression =
          Ash.Expr.fill_template(
            expression,
            actor: actor,
            tenant: changeset.to_tenant,
            args: changeset.arguments,
            context: changeset.context,
            changeset: changeset
          )

        with {:ok, expression} <-
               Ash.Filter.hydrate_refs(expression, %{
                 resource: changeset.resource,
                 public?: false
               }),
             {:ok, full_expression} <-
               Ash.Filter.add_to_filter(
                 changeset.filter,
                 expression
               ) do
          %{changeset | filter: full_expression}
          |> record_added_filter(expression)
        else
          {:error, error} ->
            Ash.Changeset.add_error(changeset, error)
        end
      end
    else
      IO.warn(
        "changeset.filter is not supported by the #{inspect(Ash.DataLayer.data_layer(changeset.resource))} data layer"
      )

      changeset
    end
  end

  defp record_added_filter(%{phase: :pending} = changeset, expression) do
    case Ash.Filter.add_to_filter(changeset.added_filter, expression) do
      {:ok, new_added_filter} ->
        %{changeset | added_filter: new_added_filter}

      {:error, error} ->
        Ash.Changeset.add_error(changeset, Ash.Error.to_ash_error(error))
    end
  end

  defp record_added_filter(changeset, _), do: changeset

  @doc """
  Add an error to the errors list and mark the changeset as invalid.

  See `Ash.Error.to_ash_error/3` for more on supported values for `error`
  """
  @spec add_error(t(), Ash.Error.error_input(), path :: Ash.Error.path_input()) :: t()
  @spec add_error(t(), Ash.Error.error_input()) :: t()

  def add_error(changeset, errors, path \\ [])

  def add_error(changeset, [], _path) do
    changeset
  end

  def add_error(changeset, errors, path) when is_list(errors) do
    if Keyword.keyword?(errors) do
      errors
      |> to_change_error()
      |> Ash.Error.set_path(path)
      |> handle_error(changeset)
    else
      Enum.reduce(errors, changeset, &add_error(&2, &1, path))
    end
  end

  def add_error(changeset, error, path) when is_binary(error) do
    add_error(
      changeset,
      InvalidChanges.exception(message: error),
      path
    )
  end

  def add_error(changeset, %__MODULE__{errors: errors}, path) do
    add_error(changeset, errors, path)
  end

  def add_error(changeset, error, path) do
    error
    |> Ash.Error.set_path(path)
    |> handle_error(changeset)
  end

  defp handle_error(error, %{handle_errors: nil} = changeset) do
    %{changeset | valid?: false, errors: [error | changeset.errors]}
  end

  defp handle_error(error, changeset) do
    changeset
    |> changeset.handle_errors.(error)
    |> case do
      :ignore ->
        %{changeset | valid?: false}

      {:ignore, changeset} ->
        %{changeset | valid?: false}

      %__MODULE__{} = changeset ->
        %{changeset | valid?: false}

      {changeset, error} ->
        %{changeset | valid?: false, errors: [error | changeset.errors]}

      error ->
        %{changeset | valid?: false, errors: [error | changeset.errors]}
    end
  end

  defp to_change_error(keyword) do
    error =
      if keyword[:field] do
        InvalidAttribute.exception(
          field: keyword[:field],
          message: keyword[:message],
          value: keyword[:value],
          vars: keyword
        )
      else
        InvalidChanges.exception(
          fields: keyword[:fields] || [],
          message: keyword[:message],
          value: keyword[:value],
          vars: keyword
        )
      end

    if keyword[:path] do
      Ash.Error.set_path(error, keyword[:path])
    else
      error
    end
  end

  defp prepare_change(%{action_type: :create}, _attribute, value, _constraints), do: {:ok, value}

  defp prepare_change(changeset, attribute, value, constraints) do
    old_value = Map.get(changeset.data, attribute.name)
    Ash.Type.prepare_change(attribute.type, old_value, value, constraints)
  end

  defp handle_change(%{action_type: :create}, _attribute, value, _constraints), do: {:ok, value}

  defp handle_change(changeset, attribute, value, constraints) do
    old_value = Map.get(changeset.data, attribute.name)
    Ash.Type.handle_change(attribute.type, old_value, value, constraints)
  end

  defp add_invalid_errors(value, type, changeset, attribute, message \\ nil) do
    changeset = %{changeset | invalid_keys: MapSet.put(changeset.invalid_keys, attribute.name)}

    message
    |> Ash.Helpers.flatten_preserving_keywords()
    |> Enum.reduce(changeset, fn message, changeset ->
      if is_exception(message) do
        error =
          message
          |> Ash.Error.to_ash_error()

        errors =
          case error do
            %class{errors: errors}
            when class in [
                   Ash.Error.Invalid,
                   Ash.Error.Unknown,
                   Ash.Error.Forbidden,
                   Ash.Error.Framework
                 ] ->
              errors

            error ->
              [error]
          end

        Enum.reduce(errors, changeset, fn error, changeset ->
          add_error(changeset, Ash.Error.set_path(error, attribute.name))
        end)
      else
        opts = Ash.Type.Helpers.error_to_exception_opts(message, attribute)

        exception =
          case type do
            :attribute -> InvalidAttribute
            :argument -> InvalidArgument
          end

        Enum.reduce(opts, changeset, fn opts, changeset ->
          error =
            exception.exception(
              value: value,
              field: Keyword.get(opts, :field),
              message: Keyword.get(opts, :message),
              vars: opts
            )

          error =
            if opts[:path] do
              Ash.Error.set_path(error, opts[:path])
            else
              error
            end

          add_error(changeset, error)
        end)
      end
    end)
  end

  defp set_phase(changeset, phase) when changeset.phase == phase, do: changeset

  defp set_phase(changeset, phase) when phase in @phases, do: %{changeset | phase: phase}
  defp clear_phase(changeset), do: %{changeset | phase: :pending}
end