lib/ash/changeset/changeset.ex

defmodule Ash.Changeset do
  @moduledoc """
  Changesets are used to create and update data in Ash.

  Create a changeset with `new/1` or `new/2`, and alter the attributes
  and relationships using the functions provided in this module.  Nothing in this module
  actually incurs changes in a data layer. To commit a changeset, see `c:Ash.Api.create/2`
  and `c:Ash.Api.update/2`.

  See the action DSL documentation for more.
  """

  defstruct [
    :__validated_for_action__,
    :action_type,
    :action,
    :api,
    :data,
    :handle_errors,
    :resource,
    :tenant,
    :timeout,
    invalid_keys: MapSet.new(),
    filters: %{},
    action_failed?: false,
    atomics: [],
    after_action: [],
    after_transaction: [],
    arguments: %{},
    around_action: [],
    around_transaction: [],
    attributes: %{},
    before_action: [],
    before_transaction: [],
    context: %{},
    defaults: [],
    errors: [],
    params: %{},
    phase: :validate,
    relationships: %{},
    select: nil,
    load: [],
    valid?: true
  ]

  defimpl Inspect do
    import Inspect.Algebra

    @spec inspect(Ash.Changeset.t(), Inspect.Opts.t()) ::
            {:doc_cons, :doc_line | :doc_nil | binary | tuple,
             :doc_line | :doc_nil | binary | tuple}
            | {:doc_group,
               :doc_line
               | :doc_nil
               | binary
               | {:doc_collapse, pos_integer}
               | {:doc_force, any}
               | {:doc_break | :doc_color | :doc_cons | :doc_fits | :doc_group | :doc_string, any,
                  any}
               | {:doc_nest, any, :cursor | :reset | non_neg_integer, :always | :break},
               :inherit | :self}
    def inspect(changeset, opts) do
      context = Map.delete(changeset.context, :private)

      context =
        if context == %{} do
          empty()
        else
          concat("context: ", to_doc(context, opts))
        end

      tenant =
        if changeset.tenant do
          concat("tenant: ", to_doc(changeset.tenant, opts))
        else
          empty()
        end

      api =
        if changeset.api do
          concat("api: ", to_doc(changeset.api, opts))
        else
          empty()
        end

      select =
        if changeset.select do
          concat("select: ", to_doc(changeset.select, opts))
        else
          empty()
        end

      load =
        if changeset.load && changeset.load != [] do
          concat("load: ", to_doc(changeset.load, opts))
        else
          empty()
        end

      container_doc(
        "#Ash.Changeset<",
        [
          api,
          concat("action_type: ", inspect(changeset.action_type)),
          concat("action: ", inspect(changeset.action && changeset.action.name)),
          tenant,
          concat("attributes: ", to_doc(changeset.attributes, opts)),
          concat("relationships: ", to_doc(changeset.relationships, opts)),
          arguments(changeset, opts),
          concat("errors: ", to_doc(changeset.errors, opts)),
          concat("data: ", to_doc(changeset.data, opts)),
          context,
          concat("valid?: ", to_doc(changeset.valid?, opts)),
          select,
          load
        ],
        ">",
        opts,
        fn str, _ -> str end
      )
    end

    defp arguments(changeset, opts) do
      if changeset.action do
        if Enum.empty?(changeset.action.arguments) do
          empty()
        else
          arg_string =
            changeset.action.arguments
            |> Enum.filter(fn argument ->
              match?({:ok, _}, Ash.Changeset.fetch_argument(changeset, argument.name))
            end)
            |> Map.new(fn argument ->
              if argument.sensitive? do
                {argument.name, "**redacted**"}
              else
                {argument.name, Ash.Changeset.get_argument(changeset, argument.name)}
              end
            end)
            |> to_doc(opts)

          concat(["arguments: ", arg_string])
        end
      else
        empty()
      end
    end
  end

  @type after_action_fun ::
          (t, Ash.Resource.record() ->
             {:ok, Ash.Resource.record()}
             | {:ok, Ash.Resource.record(), [Ash.Notifier.Notification.t()]}
             | {:error, any})

  # TODO: make these structs i.e %Changeset.AfterActionHook{}
  @type after_transaction_fun ::
          (t, {:ok, Ash.Resource.record()} | {:error, any} ->
             {:ok, Ash.Resource.record()} | {:error, any})

  @type before_action_fun :: (t -> t | {t, %{notifications: [Ash.Notifier.Notification.t()]}})

  @type before_transaction_fun :: (t -> t)

  @type around_result ::
          {:ok, Ash.Resource.record(), t(), %{notifications: list(Ash.Notifier.Notification.t())}}
          | {:error, Ash.Error.t()}
  @type around_callback :: (t() -> around_result)
  @type around_action_fun :: (t, around_callback -> around_result)

  @type around_transaction_fun :: (t -> {:ok, Ash.Resource.record()} | {:error, any})

  @type t :: %__MODULE__{
          __validated_for_action__: atom | nil,
          action: Ash.Resource.Actions.action() | nil,
          action_failed?: boolean,
          action_type: Ash.Resource.Actions.action_type() | nil,
          after_action: [after_action_fun | {after_action_fun, map}],
          after_transaction: [after_transaction_fun | {after_transaction_fun, map}],
          api: module | nil,
          arguments: %{optional(atom) => any},
          around_action: [around_action_fun | {around_action_fun, map}],
          around_transaction: [around_transaction_fun | {around_transaction_fun, map}],
          attributes: %{optional(atom) => any},
          before_action: [before_action_fun | {around_action_fun, map}],
          before_transaction: [before_transaction_fun | {before_transaction_fun, map}],
          context: map,
          data: Ash.Resource.record() | nil,
          defaults: [atom],
          errors: [Ash.Error.t()],
          handle_errors:
            nil | (t, error :: any -> :ignore | t | (error :: any) | {error :: any, t}),
          invalid_keys: MapSet.t(),
          params: %{optional(atom | binary) => any},
          phase:
            :validate
            | :before_transaction
            | :before_action
            | :after_action
            | :after_transaction
            | :around_action
            | :around_transaction,
          relationships: %{
            optional(atom) =>
              %{optional(atom | binary) => any} | [%{optional(atom | binary) => any}]
          },
          resource: module,
          select: [atom] | nil,
          load: keyword(keyword),
          tenant: any,
          timeout: pos_integer() | nil,
          valid?: boolean
        }

  @type error_info ::
          String.t()
          | [
              {:field, atom()}
              | {:fields, [atom()]}
              | {:message, String.t()}
              | {:value, any()}
            ]
          | %{:__struct__ => atom(), required(atom()) => any()}

  alias Ash.Error.{
    Changes.InvalidArgument,
    Changes.InvalidAttribute,
    Changes.InvalidChanges,
    Changes.InvalidRelationship,
    Changes.NoSuchAttribute,
    Changes.NoSuchRelationship,
    Changes.Required,
    Invalid.NoSuchResource
  }

  require Ash.Tracer
  require Ash.Expr
  require Logger

  defmacrop maybe_already_validated_error!(changeset, alternative \\ nil) do
    {function, arity} = __CALLER__.function

    if alternative do
      quote do
        changeset = unquote(changeset)

        if changeset.__validated_for_action__ && !changeset.context[:private][:in_before_action?] do
          IO.warn("""
          Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.

          In the future, this will become an error.

          For safety, we prevent any changes after that point because they will bypass validations or other action logic.. To proceed anyway,
          you can use `#{unquote(alternative)}/#{unquote(arity)}`. However, you should prefer a pattern like the below, which makes
          any custom changes *before* calling the action.

            Resource
            |> Ash.Changeset.new()
            |> Ash.Changeset.#{unquote(function)}(...)
            |> Ash.Changeset.for_create(...)
          """)
        end
      end
    else
      quote do
        changeset = unquote(changeset)

        if changeset.__validated_for_action__ && !changeset.context[:private][:in_before_action?] do
          IO.warn("""
          Changeset has already been validated for action #{inspect(changeset.__validated_for_action__)}.

          In the future, this will become an error.

          For safety, we prevent any changes using `#{unquote(function)}/#{unquote(arity)}` after that point because they will bypass validations or other action logic.
          Instead, you should change or set this value before calling the action, like so:

            Resource
            |> Ash.Changeset.new()
            |> Ash.Changeset.#{unquote(function)}(...)
            |> Ash.Changeset.for_create(...)
          """)
        end
      end
    end
  end

  @doc """
  Returns a new changeset over a resource.

  *Warning*: You almost always want to use `for_action` or `for_create`, etc. over this function if possible.

  You can use this to start a changeset and make a few changes prior to calling `for_action`. For example:

  ```elixir
  Resource
  |> Ash.Changeset.new()
  |> Ash.Changeset.change_attribute(:name, "foobar")
  |> Ash.Changeset.for_action(...)
  ```
  """
  @spec new(Ash.Resource.t() | Ash.Resource.record(), params :: map) :: t
  def new(resource, params \\ %{})

  def new(%resource{} = record, params) do
    tenant =
      record
      |> Map.get(:__metadata__, %{})
      |> Map.get(:tenant, nil)

    context = Ash.Resource.Info.default_context(resource) || %{}

    if Ash.Resource.Info.resource?(resource) do
      %__MODULE__{resource: resource, data: record, action_type: :update}
      |> change_attributes(params)
      |> set_context(context)
      |> set_tenant(tenant)
    else
      %__MODULE__{
        resource: resource,
        action_type: :update,
        data: struct(resource)
      }
      |> add_error(NoSuchResource.exception(resource: resource))
      |> set_tenant(tenant)
      |> set_context(context)
    end
  end

  def new(resource, params) do
    if Ash.Resource.Info.resource?(resource) do
      %__MODULE__{
        resource: resource,
        action_type: :create,
        data: struct(resource)
      }
      |> change_attributes(params)
    else
      %__MODULE__{resource: resource, action_type: :create, data: struct(resource)}
      |> add_error(NoSuchResource.exception(resource: resource))
    end
  end

  @doc """
  Ensure that only the specified attributes are present in the results.

  The first call to `select/2` will replace the default behavior of selecting
  all attributes. Subsequent calls to `select/2` will combine the provided
  fields unless the `replace?` option is provided with a value of `true`.

  If a field has been deselected, selecting it again will override that (because a single list of fields is tracked for selection)

  Primary key attributes always selected and cannot be deselected.

  When attempting to load a relationship (or manage it with `Ash.Changeset.manage_relationship/3`),
  if the source field is not selected on the query/provided data an error will be produced. If loading
  a relationship with a query, an error is produced if the query does not select the destination field
  of the relationship.

  Datalayers currently are not notified of the `select` for a changeset(unlike queries), and creates/updates select all fields when they are performed.
  A select provided on a changeset sets the unselected fields to `nil` before returning the result.

  Use `ensure_selected/2` if you wish to make sure a field has been selected, without deselecting any other fields.
  """
  def select(changeset, fields, opts \\ []) do
    if opts[:replace?] do
      %{changeset | select: Enum.uniq(List.wrap(fields))}
    else
      %{changeset | select: Enum.uniq(List.wrap(fields) ++ (changeset.select || []))}
    end
  end

  @doc """
  Calls the provided load statement on the result of the action at the very end of the action.
  """
  def load(changeset, load) do
    query =
      changeset.resource
      |> Ash.Query.new()
      |> Map.put(:errors, [])
      |> Ash.Query.load(changeset.load)
      |> Ash.Query.load(load)

    changeset = %{
      changeset
      | load: Enum.concat(changeset.load || [], List.wrap(load))
    }

    Enum.reduce(query.errors, changeset, &add_error(&2, &1))
  end

  @doc """
  Ensures that the given attributes are selected.

  The first call to `select/2` will *limit* the fields to only the provided fields.
  Use `ensure_selected/2` to say "select this field (or these fields) without deselecting anything else".

  See `select/2` for more.
  """
  def ensure_selected(changeset, fields) do
    if changeset.select do
      Ash.Changeset.select(changeset, List.wrap(fields))
    else
      to_select =
        changeset.resource
        |> Ash.Resource.Info.attributes()
        |> Enum.map(& &1.name)

      Ash.Changeset.select(changeset, to_select)
    end
  end

  @doc """
  Ensure the the specified attributes are `nil` in the changeset results.
  """
  def deselect(changeset, fields) do
    select =
      if changeset.select do
        changeset.select
      else
        changeset.resource
        |> Ash.Resource.Info.attributes()
        |> Enum.map(& &1.name)
      end

    select = select -- List.wrap(fields)

    select(changeset, select, replace?: true)
  end

  def selecting?(changeset, field) do
    case changeset.select do
      nil ->
        not is_nil(Ash.Resource.Info.attribute(changeset.resource, field))

      select ->
        if field in select do
          true
        else
          attribute = Ash.Resource.Info.attribute(changeset.resource, field)

          attribute && (attribute.private? || attribute.primary_key?)
        end
    end || loading?(changeset, field)
  end

  @doc """
  Returns true if the field/relationship or path to field/relationship is being loaded.

  It accepts an atom or a list of atoms, which is treated for as a "path", i.e:

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :enemies, :score])
      iex> true

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?([:friends, :score])
      iex> false

      Resource |> Ash.Changeset.load(friends: [enemies: [:score]]) |> Ash.Changeset.loading?(:friends)
      iex> true
  """
  def loading?(changeset, path) do
    changeset.resource
    |> Ash.Query.new()
    |> Ash.Query.load(changeset.load)
    |> Ash.Query.loading?(path)
  end

  @doc """
  Returns a list of attributes, aggregates, relationships, and calculations that are being loaded

  Provide a list of field types to narrow down the returned results.
  """
  def accessing(changeset, types \\ [:attributes, :relationships, :calculations, :attributes]) do
    changeset.resource
    |> Ash.Query.new()
    |> Ash.Query.load(changeset.load)
    |> Map.put(:select, changeset.select)
    |> Ash.Query.accessing(types)
  end

  @manage_types [:append_and_remove, :append, :remove, :direct_control, :create]

  @doc """
  Constructs a changeset for a given action, and validates it.

  Calls `for_create/4`, `for_update/4` or `for_destroy/4` based on the type of action passed in.

  See those functions for more explanation.
  """
  def for_action(initial, action, params \\ %{}, opts \\ []) do
    resource =
      case initial do
        %__MODULE__{resource: resource} -> resource
        %resource{} -> resource
        resource -> resource
      end

    action = get_action_entity(resource, action)

    case action.type do
      :create ->
        for_create(initial, action, params, opts)

      :update ->
        for_update(initial, action, params, opts)

      :destroy ->
        for_destroy(initial, action, params, opts)

      :read ->
        raise ArgumentError,
              "Passed a read action `#{inspect(resource)}.#{action.name}` into `Ash.Changeset.for_action/4`. Use `Ash.Query.for_read/4` instead."
    end
  end

  @for_create_opts [
    require?: [
      type: :boolean,
      default: false,
      doc:
        "If set to `false`, values are only required when the action is run (instead of immediately)."
    ],
    actor: [
      type: :any,
      doc:
        "set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    authorize?: [
      type: :any,
      doc:
        "set authorize?, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)"
    ],
    tracer: [
      type: {:wrap_list, {:behaviour, Ash.Tracer}},
      doc:
        "A tracer to use. Will be carried over to the action. For more information see `Ash.Tracer`."
    ],
    tenant: [
      type: :any,
      doc: "set the tenant on the changeset"
    ]
  ]

  @doc false
  def for_create_opts, do: @for_create_opts

  @doc """
  Constructs a changeset for a given create action, and validates it.

  Anything that is modified prior to `for_create/4` is validated against the rules of the action, while *anything after it is not*.
  This runs any `change`s contained on your action. To have your logic execute *only* during the action, you can use `after_action/2`
  or `before_action/2`.

  Multitenancy is *not* validated until an action is called. This allows you to avoid specifying a tenant until just before calling
  the api action.

  ### Params
  `params` may be attributes, relationships, or arguments. You can safely pass user/form input directly into this function.
  Only public attributes and relationships are supported. If you want to change private attributes as well, see the
  Customization section below. `params` are stored directly as given in the `params` field of the changeset, which is used

  ### Opts

  #{Spark.OptionsHelpers.docs(@for_create_opts)}

  ### Customization

  A changeset can be provided as the first argument, instead of a resource, to allow
  setting specific attributes ahead of time.

  For example:

      MyResource
      |> Ash.Changeset.new()
      |> Ash.Changeset.change_attribute(:foo, 1)
      |> Ash.Changeset.for_create(:create, ...opts)

  Once a changeset has been validated by `for_create/4` (or `for_update/4`), it isn't validated again in the action.
  New changes added are validated individually, though. This allows you to create a changeset according
  to a given action, and then add custom changes if necessary.
  """
  def for_create(initial, action, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        %__MODULE__{action_type: :create} = changeset ->
          changeset

        resource when is_atom(resource) ->
          new(resource)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:create`, or a resource.

            Got: #{inspect(other)}
            """
      end

    action =
      get_action_entity(changeset.resource, action) ||
        raise_no_action(changeset.resource, action, :create)

    changeset
    |> set_context(%{
      private: %{
        upsert?: opts[:upsert?] || (action && action.upsert?) || false,
        upsert_identity: opts[:upsert_identity] || (action && action.upsert_identity),
        upsert_fields: opts[:upsert_fields] || (action && action.upsert_fields)
      }
    })
    |> do_for_action(action, params, opts)
  end

  @doc """
  Constructs a changeset for a given update action, and validates it.

  Anything that is modified prior to `for_update/4` is validated against the rules of the action, while *anything after it is not*.

  See `for_create/4` for more information
  """
  def for_update(initial, action, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        # We accept :destroy here to support soft deletes
        %__MODULE__{action_type: type} = changeset when type in [:update, :destroy] ->
          changeset

        %mod{} = struct when mod != __MODULE__ ->
          new(struct)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:update` or `:destroy`, or a record.

            Got: #{inspect(other)}
            """
      end

    do_for_action(changeset, action, params, opts)
  end

  @doc """
  Constructs a changeset for a given destroy action, and validates it.

  ### Opts

  * `:actor` - set the actor, which can be used in any `Ash.Resource.Change`s configured on the action. (in the `context` argument)
  * `:tenant` - set the tenant on the changeset

  Anything that is modified prior to `for_destroy/4` is validated against the rules of the action, while *anything after it is not*.

  Once a changeset has been validated by `for_destroy/4`, it isn't validated again in the action.
  New changes added are validated individually, though. This allows you to create a changeset according
  to a given action, and then add custom changes if necessary.
  """
  def for_destroy(initial, action_or_name, params \\ %{}, opts \\ []) do
    changeset =
      case initial do
        %__MODULE__{} = changeset ->
          changeset
          |> Map.put(:action_type, :destroy)

        %_{} = struct ->
          struct
          |> new()
          |> Map.put(:action_type, :destroy)

        other ->
          raise ArgumentError,
            message: """
            Initial must be a changeset with the action type of `:destroy`, or a record.

            Got: #{inspect(other)}
            """
      end

    action =
      get_action_entity(changeset.resource, action_or_name) ||
        raise_no_action(changeset.resource, action_or_name, :destroy)

    if changeset.valid? do
      if action do
        if action.soft? do
          do_for_action(%{changeset | action_type: :destroy}, action, params, opts)
        else
          {changeset, opts} =
            Ash.Actions.Helpers.add_process_context(changeset.api, changeset, opts)

          name =
            "changeset:" <> Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"

          Ash.Tracer.span :changeset,
                          name,
                          opts[:tracer] do
            Ash.Tracer.telemetry_span [:ash, :changeset], %{
              resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
            } do
              metadata = %{
                resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                resource: changeset.resource,
                actor: opts[:actor],
                tenant: opts[:tenant],
                action: action.name,
                authorize?: opts[:authorize?]
              }

              Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)

              changeset
              |> Map.put(:action, action)
              |> handle_errors(action.error_handler)
              |> set_actor(opts)
              |> set_authorize(opts)
              |> set_tracer(opts)
              |> set_tenant(opts[:tenant] || changeset.tenant)
              |> cast_params(action, params)
              |> set_argument_defaults(action)
              |> require_arguments(action)
              |> run_action_changes(
                action,
                opts[:actor],
                opts[:authorize?],
                opts[:tracer],
                metadata
              )
              |> add_validations(opts[:tracer], metadata, opts[:actor])
              |> mark_validated(action.name)
              |> Map.put(:__validated_for_action__, action.name)
            end
          end
        end
      else
        raise_no_action(changeset.resource, action_or_name, :destroy)
      end
    else
      changeset
    end
  end

  @doc """
  Adds atomic changes to the changeset

  i.e `Ash.Changeset.atomic_update(changeset, score: [Ash.Expr.expr(score + 1)])`
  """
  def atomic_update(changeset, atomics) when is_list(atomics) do
    Enum.reduce(atomics, changeset, fn {key, value}, changeset ->
      atomic_update(changeset, key, value)
    end)
  end

  @doc """
  Adds an atomic change to the changeset

  i.e `Ash.Changeset.atomic_update(changeset, :score, [Ash.Expr.expr(score + 1)])`
  """
  def atomic_update(changeset, key, value) do
    %{changeset | atomics: Keyword.put(changeset.atomics, key, value)}
  end

  @doc """
  Set the result of the action. This will prevent running the underlying datalayer behavior
  """
  @spec set_result(t(), term) :: t()
  def set_result(changeset, result) do
    set_context(changeset, %{private: %{action_result: result}})
  end

  @spec set_on_upsert(t(), list(atom)) :: Keyword.t()
  def set_on_upsert(changeset, upsert_keys) do
    case changeset.context[:private][:upsert_fields] do
      fields when is_list(fields) ->
        create_upsert_list(fields, changeset)

      {:replace, fields} ->
        create_upsert_list(fields, changeset)

      :replace_all ->
        changeset.resource
        |> Ash.Resource.Info.attributes()
        |> Enum.map(fn %{name: name} -> name end)
        |> create_upsert_list(changeset)

      {:replace_all_except, except_fields} ->
        changeset.resource
        |> Ash.Resource.Info.attributes()
        |> Enum.map(fn %{name: name} -> name end)
        |> Enum.reject(fn name -> name in except_fields end)
        |> create_upsert_list(changeset)

      nil ->
        keys = upsert_keys || Ash.Resource.Info.primary_key(changeset.resource)

        explicitly_changing_attributes =
          Enum.map(
            Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, []) -- keys,
            fn key ->
              {key, Ash.Changeset.get_attribute(changeset, key)}
            end
          )

        changeset
        |> upsert_update_defaults()
        |> Keyword.merge(explicitly_changing_attributes)
    end
  end

  defp create_upsert_list(fields, changeset) do
    Keyword.new(fields, fn key ->
      {key, Ash.Changeset.get_attribute(changeset, key)}
    end)
  end

  defp upsert_update_defaults(changeset) do
    changeset.resource
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(changeset.resource))
    |> Enum.concat(lazy_non_matching_defaults(changeset.resource))
  end

  defp static_defaults(resource) do
    resource
    |> Ash.Resource.Info.static_default_attributes(:update)
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_non_matching_defaults(resource) do
    resource
    |> Ash.Resource.Info.lazy_non_matching_default_attributes(:update)
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_matching_defaults(resource) do
    resource
    |> Ash.Resource.Info.lazy_matching_default_attributes(:update)
    |> Enum.group_by(& &1.update_default)
    |> Enum.flat_map(fn {default_fun, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  defp do_for_action(changeset, action_or_name, params, opts) do
    {changeset, opts} = Ash.Actions.Helpers.add_process_context(changeset.api, changeset, opts)

    if changeset.valid? do
      action = get_action_entity(changeset.resource, action_or_name)

      if action do
        name =
          "changeset:" <> Ash.Resource.Info.trace_name(changeset.resource) <> ":#{action.name}"

        Ash.Tracer.span :changeset,
                        name,
                        opts[:tracer] do
          Ash.Tracer.telemetry_span [:ash, :changeset], %{
            resource_short_name: Ash.Resource.Info.short_name(changeset.resource)
          } do
            metadata = %{
              resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
              resource: changeset.resource,
              actor: opts[:actor],
              tenant: opts[:tenant],
              action: action.name,
              authorize?: opts[:authorize?]
            }

            Ash.Tracer.set_metadata(opts[:tracer], :changeset, metadata)

            changeset =
              changeset
              |> prepare_changeset_for_action(action, opts)
              |> handle_params(action, params)
              |> run_action_changes(
                action,
                opts[:actor],
                opts[:authorize?],
                opts[:tracer],
                metadata
              )
              |> add_validations(opts[:tracer], metadata, opts[:actor])
              |> mark_validated(action.name)
              |> eager_validate_identities()
              |> Map.put(:__validated_for_action__, action.name)

            if Keyword.get(opts, :require?, true) do
              require_values(changeset, action.type)
            else
              changeset
            end
          end
        end
      else
        raise_no_action(changeset.resource, action_or_name, changeset.action_type)
      end
    else
      changeset
    end
  end

  def prepare_changeset_for_action(changeset, action, opts) do
    changeset
    |> Map.put(:action, action)
    |> reset_arguments()
    |> handle_errors(action.error_handler)
    |> set_actor(opts)
    |> set_authorize(opts)
    |> set_tracer(opts)
    |> timeout(changeset.timeout || opts[:timeout])
    |> set_tenant(opts[:tenant] || changeset.tenant || changeset.data.__metadata__[:tenant])
  end

  defp reset_arguments(%{arguments: arguments} = changeset) do
    Enum.reduce(arguments, changeset, fn {key, value}, changeset ->
      set_argument(changeset, key, value)
    end)
  end

  def handle_params(changeset, action, params, handle_params_opts \\ []) do
    if Keyword.get(handle_params_opts, :cast_params?, true) do
      cast_params(changeset, action, params || %{})
    else
      changeset
    end
    |> set_argument_defaults(action)
    |> require_arguments(action)
    |> validate_attributes_accepted(action)
    |> require_values(action.type, false, action.require_attributes)
    |> set_defaults(changeset.action_type, false)
  end

  defp get_action_entity(resource, name) when is_atom(name),
    do: Ash.Resource.Info.action(resource, name)

  defp get_action_entity(_resource, action), do: action

  defp eager_validate_identities(changeset) do
    identities =
      changeset.resource
      |> Ash.Resource.Info.identities()

    case identities do
      [] ->
        changeset

      identities ->
        Enum.reduce(identities, changeset, fn identity, changeset ->
          changeset =
            if identity.eager_check_with do
              validate_identity(changeset, identity, identity.eager_check_with)
            else
              changeset
            end

          if identity.pre_check_with do
            before_action(changeset, &validate_identity(&1, identity, identity.pre_check_with))
          else
            changeset
          end
        end)
    end
  end

  defp validate_identity(
         %{context: %{private: %{upsert?: true, upsert_identity: name}}} = changeset,
         %{name: name},
         _api
       ) do
    changeset
  end

  defp validate_identity(
         %{action: %{soft?: true}} = changeset,
         identity,
         api
       ) do
    do_validate_identity(changeset, identity, api)
  end

  defp validate_identity(
         %{action: %{type: type}} = changeset,
         identity,
         api
       )
       when type in [:create, :update] do
    do_validate_identity(changeset, identity, api)
  end

  defp validate_identity(
         %{action: %{type: type}} = changeset,
         identity,
         api
       )
       when type in [:create, :update] do
    do_validate_identity(changeset, identity, api)
  end

  defp validate_identity(changeset, _, _), do: changeset

  defp do_validate_identity(changeset, identity, api) do
    if changeset.context[:private][:upsert_identity] == identity.name do
      changeset
    else
      if Enum.any?(identity.keys, &changing_attribute?(changeset, &1)) do
        action = Ash.Resource.Info.primary_action(changeset.resource, :read).name

        values =
          Enum.map(identity.keys, fn key ->
            {key, Ash.Changeset.get_attribute(changeset, key)}
          end)

        changeset.resource
        |> Ash.Query.for_read(action, %{},
          tenant: changeset.tenant,
          actor: changeset.context[:private][:actor],
          authorize?: changeset.context[:private][:authorize?],
          tracer: changeset.context[:private][:tracer]
        )
        |> Ash.Query.do_filter(values)
        |> Ash.Query.limit(1)
        |> Ash.Query.set_context(%{private: %{internal?: true}})
        |> api.read_one(authorize?: false)
        |> case do
          {:ok, nil} ->
            changeset

          {:ok, _} ->
            error =
              Ash.Error.Changes.InvalidChanges.exception(
                fields: identity.keys,
                message: identity.message || "has already been taken"
              )

            add_error(changeset, error)

          {:error, error} ->
            add_error(changeset, error)
        end
      else
        changeset
      end
    end
  end

  defp require_arguments(changeset, action) do
    action.arguments
    |> Enum.filter(&(&1.allow_nil? == false))
    |> Enum.reduce(changeset, fn argument, changeset ->
      case fetch_argument(changeset, argument.name) do
        {:ok, value} when not is_nil(value) ->
          changeset

        _ ->
          if argument.name in changeset.invalid_keys do
            changeset
          else
            add_error(
              changeset,
              Ash.Error.Changes.Required.exception(
                resource: changeset.resource,
                field: argument.name,
                type: :argument
              )
            )
          end
      end
    end)
  end

  defp set_argument_defaults(changeset, action) do
    Enum.reduce(action.arguments, changeset, fn argument, changeset ->
      case fetch_argument(changeset, argument.name) do
        :error ->
          if is_nil(argument.default) do
            changeset
          else
            %{
              changeset
              | arguments: Map.put(changeset.arguments, argument.name, default(:create, argument))
            }
          end

        _ ->
          changeset
      end
    end)
  end

  defp set_actor(changeset, opts) do
    if Keyword.has_key?(opts, :actor) do
      put_context(changeset, :private, %{actor: opts[:actor]})
    else
      changeset
    end
  end

  defp set_authorize(changeset, opts) do
    if Keyword.has_key?(opts, :authorize?) do
      put_context(changeset, :private, %{authorize?: opts[:authorize?]})
    else
      changeset
    end
  end

  defp set_tracer(changeset, opts) do
    if Keyword.has_key?(opts, :tracer) do
      put_context(changeset, :private, %{tracer: opts[:tracer]})
    else
      changeset
    end
  end

  defp raise_no_action(resource, action, type) do
    available_actions =
      resource
      |> Ash.Resource.Info.actions()
      |> Enum.filter(&(&1.type == type))
      |> Enum.map_join("\n", &"    - `#{inspect(&1.name)}`")

    raise ArgumentError,
      message: """
      No such #{type} action on resource #{inspect(resource)}: #{String.slice(inspect(action), 0..50)}

      Example Call:

        Ash.Changeset.for_#{type}(changeset_or_record, :action_name, input, options)

      Available #{type} actions:

      #{available_actions}
      """
  end

  defp mark_validated(changeset, action_name) do
    %{changeset | __validated_for_action__: action_name}
  end

  @doc false
  def validate_multitenancy(changeset) do
    if Ash.Resource.Info.multitenancy_strategy(changeset.resource) &&
         not Ash.Resource.Info.multitenancy_global?(changeset.resource) &&
         is_nil(changeset.tenant) do
      add_error(
        changeset,
        "#{inspect(changeset.resource)} changesets require a tenant to be specified"
      )
    else
      changeset
    end
  end

  defp cast_params(changeset, action, params) do
    changeset = %{
      changeset
      | params: Map.merge(changeset.params, Enum.into(params, %{}))
    }

    Enum.reduce(params, changeset, fn {name, value}, changeset ->
      cond do
        has_argument?(action, name) ->
          set_argument(changeset, name, value)

        attr = Ash.Resource.Info.public_attribute(changeset.resource, name) ->
          if attr.writable? do
            change_attribute(changeset, attr.name, value)
          else
            changeset
          end

        true ->
          changeset
      end
    end)
  end

  defp has_argument?(action, name) when is_atom(name) do
    Enum.any?(action.arguments, &(&1.private? == false && &1.name == name))
  end

  defp has_argument?(action, name) when is_binary(name) do
    Enum.any?(action.arguments, &(&1.private? == false && to_string(&1.name) == name))
  end

  defp validate_attributes_accepted(changeset, %{accept: nil}), do: changeset

  defp validate_attributes_accepted(changeset, %{accept: accepted_attributes}) do
    changeset.attributes
    |> Enum.reject(fn {key, _value} ->
      key in accepted_attributes
    end)
    |> Enum.reduce(changeset, fn {key, value}, changeset ->
      add_error(
        changeset,
        InvalidAttribute.exception(
          field: key,
          message: "cannot be changed",
          value: value
        )
      )
    end)
  end

  defp run_action_changes(changeset, %{changes: changes}, actor, authorize?, tracer, metadata) do
    changes = changes ++ Ash.Resource.Info.changes(changeset.resource, changeset.action_type)

    Enum.reduce(changes, changeset, fn
      %{only_when_valid?: true}, %{valid?: false} = changeset ->
        changeset

      %{change: {module, opts}, where: where}, changeset ->
        if Enum.all?(where || [], fn {module, opts} ->
             Ash.Tracer.span :validation, "change condition: #{inspect(module)}", tracer do
               Ash.Tracer.telemetry_span [:ash, :validation], %{
                 resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
                 validation: inspect(module)
               } do
                 Ash.Tracer.set_metadata(tracer, :validation, metadata)

                 opts =
                   Ash.Filter.build_filter_from_template(
                     opts,
                     actor,
                     changeset.arguments,
                     changeset.context
                   )

                 module.validate(changeset, opts) == :ok
               end
             end
           end) do
          Ash.Tracer.span :change, "change: #{inspect(module)}", tracer do
            Ash.Tracer.telemetry_span [:ash, :change], %{
              resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
              change: inspect(module)
            } do
              {:ok, opts} = module.init(opts)

              Ash.Tracer.set_metadata(tracer, :change, metadata)

              opts =
                Ash.Filter.build_filter_from_template(
                  opts,
                  actor,
                  changeset.arguments,
                  changeset.context
                )

              module.change(changeset, opts, %{
                actor: actor,
                authorize?: authorize? || false,
                tracer: tracer
              })
            end
          end
        else
          changeset
        end

      %{validation: _} = validation, changeset ->
        validate(changeset, validation, tracer, metadata, actor)
    end)
  end

  @doc false
  def hydrate_atomic_refs(changeset, actor) do
    %{
      changeset
      | atomics:
          Enum.map(changeset.atomics, fn {key, expr} ->
            expr =
              Ash.Filter.build_filter_from_template(
                expr,
                actor,
                changeset.arguments,
                changeset.context
              )

            {:ok, expr} =
              Ash.Filter.hydrate_refs(expr, %{resource: changeset.resource, public?: false})

            {key, expr}
          end)
    }
  end

  @doc false
  def set_defaults(changeset, action_type, lazy? \\ false)

  def set_defaults(changeset, :create, lazy?) do
    with_static_defaults =
      changeset.resource
      |> Ash.Resource.Info.static_default_attributes(:create)
      |> Enum.reduce(changeset, fn attribute, changeset ->
        if changing_attribute?(changeset, attribute.name) do
          changeset
        else
          changeset
          |> unsafe_change_attribute(attribute.name, default(:create, attribute))
          |> Map.update!(:defaults, fn defaults ->
            [attribute.name | defaults]
          end)
        end
      end)
      |> Map.update!(:defaults, &Enum.uniq/1)

    if lazy? do
      set_lazy_defaults(with_static_defaults, :create)
    else
      with_static_defaults
    end
    |> Map.update!(:defaults, &Enum.uniq/1)
  end

  def set_defaults(changeset, :update, lazy?) do
    with_static_defaults =
      changeset.resource
      |> Ash.Resource.Info.static_default_attributes(:update)
      |> Enum.reduce(changeset, fn attribute, changeset ->
        if changing_attribute?(changeset, attribute.name) do
          changeset
        else
          changeset
          |> unsafe_change_attribute(attribute.name, default(:update, attribute))
          |> Map.update!(:defaults, fn defaults ->
            [attribute.name | defaults]
          end)
        end
      end)

    if lazy? do
      set_lazy_defaults(with_static_defaults, :update)
    else
      with_static_defaults
    end
    |> Map.update!(:defaults, &Enum.uniq/1)
  end

  def set_defaults(changeset, _, _) do
    changeset
  end

  defp set_lazy_defaults(changeset, type) do
    changeset
    |> set_lazy_non_matching_defaults(type)
    |> set_lazy_matching_defaults(type)
  end

  defp set_lazy_non_matching_defaults(changeset, type) do
    changeset.resource
    |> Ash.Resource.Info.lazy_non_matching_default_attributes(type)
    |> Enum.reduce(changeset, fn attribute, changeset ->
      if changing_attribute?(changeset, attribute.name) do
        changeset
      else
        changeset
        |> force_change_attribute(attribute.name, default(type, attribute))
        |> Map.update!(:defaults, fn defaults ->
          [attribute.name | defaults]
        end)
      end
    end)
  end

  defp set_lazy_matching_defaults(changeset, type) do
    changeset.resource
    |> Ash.Resource.Info.lazy_matching_default_attributes(type)
    |> Enum.group_by(fn attribute ->
      case type do
        :create ->
          attribute.default

        :update ->
          attribute.update_default
      end
    end)
    |> Enum.reduce(changeset, fn {default_fun, attributes}, changeset ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.reduce(attributes, changeset, fn attribute, changeset ->
        if changing_attribute?(changeset, attribute.name) do
          changeset
        else
          changeset
          |> unsafe_change_attribute(attribute.name, default_value)
          |> Map.update!(:defaults, fn defaults ->
            [attribute.name | defaults]
          end)
        end
      end)
    end)
  end

  defp default(:create, %{default: {mod, func, args}}), do: apply(mod, func, args)
  defp default(:create, %{default: function}) when is_function(function, 0), do: function.()
  defp default(:create, %{default: value}), do: value

  defp default(:update, %{update_default: {mod, func, args}}), do: apply(mod, func, args)

  defp default(:update, %{update_default: function}) when is_function(function, 0),
    do: function.()

  defp default(:update, %{update_default: value}), do: value

  defp add_validations(changeset, tracer, metadata, actor) do
    if changeset.action.skip_global_validations? do
      changeset
    else
      changeset.resource
      # We use the `changeset.action_type` to support soft deletes
      # Because a delete is an `update` with an action type of `update`
      |> Ash.Resource.Info.validations(changeset.action_type)
      |> then(fn validations ->
        if changeset.action.delay_global_validations? do
          Enum.map(validations, &%{&1 | before_action?: true})
        else
          validations
        end
      end)
      |> Enum.reduce(changeset, &validate(&2, &1, tracer, metadata, actor))
    end
  end

  defp validate(changeset, validation, tracer, metadata, actor) do
    if validation.before_action? do
      before_action(
        changeset,
        fn changeset ->
          if validation.only_when_valid? and not changeset.valid? do
            changeset
          else
            do_validation(changeset, validation, tracer, metadata, actor)
          end
        end,
        append?: true
      )
    else
      if validation.only_when_valid? and not changeset.valid? do
        changeset
      else
        do_validation(changeset, validation, tracer, metadata, actor)
      end
    end
  end

  defp do_validation(changeset, validation, tracer, metadata, actor) do
    if Enum.all?(validation.where || [], fn {module, opts} ->
         opts =
           Ash.Filter.build_filter_from_template(
             opts,
             actor,
             changeset.arguments,
             changeset.context
           )

         case module.init(opts) do
           {:ok, opts} ->
             module.validate(changeset, opts) == :ok

           _ ->
             false
         end
       end) do
      Ash.Tracer.span :validation, "validate: #{inspect(validation.module)}", tracer do
        Ash.Tracer.telemetry_span [:ash, :validation], %{
          resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
          validation: inspect(validation.module)
        } do
          Ash.Tracer.set_metadata(tracer, :validation, metadata)

          opts =
            Ash.Filter.build_filter_from_template(
              validation.opts,
              actor,
              changeset.arguments,
              changeset.context
            )

          with {:ok, opts} <- validation.module.init(opts),
               :ok <- validation.module.validate(changeset, opts) do
            changeset
          else
            :ok ->
              changeset

            {:error, error} when is_binary(error) ->
              Ash.Changeset.add_error(changeset, validation.message || error)

            {:error, error} when is_exception(error) ->
              if validation.message do
                error = override_validation_message(error, validation.message)
                Ash.Changeset.add_error(changeset, error)
              else
                Ash.Changeset.add_error(changeset, error)
              end

            {:error, errors} when is_list(errors) ->
              if validation.message do
                errors =
                  Enum.map(errors, fn error ->
                    override_validation_message(error, validation.message)
                  end)

                Ash.Changeset.add_error(changeset, errors)
              else
                Ash.Changeset.add_error(changeset, errors)
              end

            {:error, error} ->
              error =
                if Keyword.keyword?(error) do
                  Keyword.put(error, :message, validation.message || error[:message])
                else
                  validation.message || error
                end

              Ash.Changeset.add_error(changeset, error)
          end
        end
      end
    else
      changeset
    end
  end

  defp override_validation_message(error, message) do
    case error do
      %{field: field} = error when not is_nil(field) ->
        error
        |> Map.take([:field, :vars])
        |> Map.to_list()
        |> Keyword.put(:message, message)
        |> Keyword.put(:value, Map.get(error, :value))
        |> InvalidAttribute.exception()

      %{fields: fields} when fields not in [nil, []] ->
        error
        |> Map.take([:fields, :vars])
        |> Map.to_list()
        |> Keyword.put(:message, message)
        |> Keyword.put(:value, Map.get(error, :value))
        |> InvalidChanges.exception()

      _ ->
        message
    end
  end

  @doc false
  def require_values(changeset, action_type, private_and_belongs_to? \\ false, attrs \\ nil)

  def require_values(changeset, :create, private_and_belongs_to?, attrs) do
    attributes =
      attrs ||
        attributes_to_require(changeset.resource, changeset.action, private_and_belongs_to?)

    attributes
    |> Enum.map(fn
      attribute when is_struct(attribute, Ash.Resource.Attribute) ->
        attribute

      name when is_atom(name) ->
        Ash.Resource.Info.attribute(changeset.resource, name)
    end)
    |> then(fn attributes ->
      if private_and_belongs_to? do
        attributes
      else
        Enum.reject(attributes, &belongs_to_attr_of_rel_being_managed?(&1, changeset))
      end
    end)
    |> Enum.reduce(changeset, fn required_attribute, changeset ->
      if changing_attribute?(changeset, required_attribute.name) do
        if is_nil(get_attribute(changeset, required_attribute.name)) do
          if required_attribute.name in changeset.invalid_keys do
            changeset
          else
            add_error(
              changeset,
              Required.exception(
                resource: changeset.resource,
                field: required_attribute.name,
                type: :attribute
              )
            )
          end
        else
          changeset
        end
      else
        if is_nil(required_attribute.default) ||
             required_attribute.name in changeset.action.require_attributes do
          if required_attribute.name in changeset.invalid_keys do
            changeset
          else
            add_error(
              changeset,
              Required.exception(
                resource: changeset.resource,
                field: required_attribute.name,
                type: :attribute
              )
            )
          end
        else
          changeset
        end
      end
    end)
  end

  def require_values(changeset, :update, private_and_belongs_to?, attrs) do
    attributes =
      attrs ||
        attributes_to_require(changeset.resource, changeset.action, private_and_belongs_to?)

    attributes
    |> Enum.map(fn
      attribute when is_struct(attribute, Ash.Resource.Attribute) ->
        attribute

      name when is_atom(name) ->
        Ash.Resource.Info.attribute(changeset.resource, name)
    end)
    |> then(fn attributes ->
      if private_and_belongs_to? do
        attributes
      else
        Enum.reject(attributes, &belongs_to_attr_of_rel_being_managed?(&1, changeset))
      end
    end)
    |> Enum.reduce(changeset, fn required_attribute, changeset ->
      if changing_attribute?(changeset, required_attribute.name) do
        if is_nil(get_attribute(changeset, required_attribute.name)) do
          if required_attribute.name in changeset.invalid_keys do
            changeset
          else
            add_error(
              changeset,
              Required.exception(
                resource: changeset.resource,
                field: required_attribute.name,
                type: :attribute
              )
            )
          end
        else
          changeset
        end
      else
        changeset
      end
    end)
  end

  def require_values(changeset, _, _, _), do: changeset

  defp belongs_to_attr_of_rel_being_managed?(attribute, changeset) do
    do_belongs_to_attr_of_rel_being_managed?(changeset, attribute) ||
      belongs_to_attr_of_being_managed_through?(changeset, attribute)
  end

  defp do_belongs_to_attr_of_rel_being_managed?(changeset, attribute) do
    Enum.any?(changeset.relationships, fn {key, _} ->
      relationship = Ash.Resource.Info.relationship(changeset.resource, key)
      relationship.type == :belongs_to && relationship.source_attribute == attribute.name
    end)
  end

  defp belongs_to_attr_of_being_managed_through?(
         %{context: %{accessing_from: %{source: source, name: relationship}}},
         attribute
       ) do
    case Ash.Resource.Info.relationship(source, relationship) do
      %{type: :belongs_to} -> false
      relationship -> relationship.destination_attribute == attribute.name
    end
  end

  defp belongs_to_attr_of_being_managed_through?(_, _), do: false

  # Attributes that are private and/or are the source field of a belongs_to relationship
  # are typically not set by input, so they aren't required until the actual action
  # is run.
  defp attributes_to_require(
         resource,
         %{type: :create},
         true
       ) do
    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.reject(&(&1.allow_nil? || &1.generated?))
  end

  defp attributes_to_require(
         resource,
         %{type: :create, accept: accept, require_attributes: require_attributes} = action,
         false
       ) do
    resource
    |> do_attributes_to_require(action)
    |> Enum.filter(&(&1.name in accept || &1.name in require_attributes))
  end

  defp attributes_to_require(resource, _action, true = _private_and_belongs_to?) do
    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.reject(&(&1.allow_nil? || &1.generated?))
  end

  defp attributes_to_require(resource, action, false = _private_and_belongs_to?) do
    do_attributes_to_require(resource, action)
  end

  defp do_attributes_to_require(resource, action) do
    action =
      case action do
        action when is_atom(action) ->
          Ash.Resource.Info.action(resource, action)

        _ ->
          action
      end

    allow_nil_input =
      case action do
        %{allow_nil_input: allow_nil_input} ->
          allow_nil_input

        _ ->
          []
      end

    masked_argument_names = Enum.map(action.arguments, & &1.name)

    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.reject(
      &(&1.allow_nil? || &1.private? || !&1.writable? || &1.generated? ||
          &1.name in masked_argument_names ||
          &1.name in allow_nil_input)
    )
  end

  @doc """
  Wraps a function in the before/after action hooks of a changeset.

  The function takes a changeset and if it returns
  `{:ok, result}`, the result will be passed through the after
  action hooks.
  """
  @spec with_hooks(
          t(),
          (t() ->
             {:ok, term, %{notifications: list(Ash.Notifier.Notification.t())}}
             | {:error, term}),
          Keyword.t()
        ) ::
          {:ok, term, t(), %{notifications: list(Ash.Notifier.Notification.t())}} | {:error, term}
  def with_hooks(changeset, func, opts \\ [])

  def with_hooks(changeset, _func, _opts) when changeset.valid? == false do
    {:error, changeset.errors}
  end

  def with_hooks(changeset, func, opts) do
    if opts[:transaction?] && Ash.DataLayer.data_layer_can?(changeset.resource, :transact) do
      transaction_hooks(changeset, fn changeset ->
        resources =
          changeset.resource
          |> List.wrap()
          |> Enum.concat(changeset.action.touches_resources)
          |> Enum.uniq()

        notify? =
          if Process.get(:ash_started_transaction?) do
            false
          else
            Process.put(:ash_started_transaction?, true)
            true
          end

        resources = Enum.reject(resources, &Ash.DataLayer.in_transaction?/1)

        try do
          resources
          |> Ash.DataLayer.transaction(
            fn ->
              case run_around_actions(changeset, func) do
                {:error, error} ->
                  if opts[:rollback_on_error?] do
                    Ash.DataLayer.rollback(
                      changeset.resource,
                      error
                    )
                  else
                    {:error, error}
                  end

                other ->
                  other
              end
            end,
            changeset.timeout || :infinity,
            Map.put(
              opts[:transaction_metadata],
              :data_layer_context,
              changeset.context[:data_layer] || %{}
            )
          )
          |> case do
            {:ok, {:ok, value, changeset, instructions}} ->
              notifications =
                if notify? && !opts[:return_notifications?] do
                  Enum.concat(
                    instructions[:notifications] || [],
                    Process.delete(:ash_notifications) || []
                  )
                else
                  instructions[:notifications] || []
                end

              {:ok, value, changeset, Map.put(instructions, :notifications, notifications)}

            {:ok, {:error, error}} ->
              {:error, error}

            {:error, error} ->
              {:error, error}
          end
        after
          if notify? do
            Process.delete(:ash_started_transaction?)
          end
        end
      end)
    else
      transaction_hooks(changeset, fn changeset ->
        if changeset.timeout do
          Ash.Engine.task_with_timeout(
            fn ->
              run_around_actions(changeset, func)
            end,
            changeset.resource,
            changeset.timeout,
            "#{inspect(changeset.resource)}.#{changeset.action.name}",
            opts[:tracer]
          )
        else
          run_around_actions(changeset, func)
        end
      end)
    end
    |> case do
      {:ok, value, changeset, instructions} ->
        if opts[:return_notifications?] do
          {:ok, value, changeset, instructions}
        else
          if Process.get(:ash_started_transaction?) do
            current_notifications = Process.get(:ash_notifications, [])

            Process.put(
              :ash_notifications,
              current_notifications ++ (instructions[:notifications] || [])
            )

            {:ok, value, changeset, Map.put(instructions, :notifications, [])}
          else
            notifications = Ash.Notifier.notify(instructions[:notifications] || [])

            {:ok, value, changeset, Map.put(instructions, :notifications, notifications)}
          end
        end

      other ->
        other
    end
  end

  defp warn_on_transaction_hooks(_, [], _), do: :ok

  defp warn_on_transaction_hooks(changeset, _, type) do
    if changeset.context[:warn_on_transaction_hooks?] != false &&
         Ash.DataLayer.in_transaction?(changeset.resource) &&
         (changeset.before_transaction != [] or changeset.around_transaction != []) do
      message =
        if type in ["before_transaction", "around_transaction"] do
          "already"
        else
          "still"
        end

      Logger.warning("""
      One or more `#{type}` hooks on `#{inspect(changeset.resource)}.#{changeset.action.name}` are being executed,
      but there is an ongoing transaction #{message} happening.

      This means that you may be running an action in a transaction that you did not design with the intent of running in a surrounding transaction.
      You should either

      1. Create another action that is safe to use in a surrounding transaction, and use that instead of this one
      2. Silence this warning using `set_context(%{warn_on_transaction_hooks?: false})` in the action definition
      3. If building a changeset manually, do #2 except programmatically, `Ash.Changeset.set_context(changeset, %{warn_on_transaction_hooks?: false})`
      """)
    end
  end

  defp transaction_hooks(changeset, func) do
    warn_on_transaction_hooks(changeset, changeset.around_transaction, "around_transaction")

    run_around_transaction_hooks(changeset, fn changeset ->
      warn_on_transaction_hooks(changeset, changeset.before_transaction, "before_transaction")

      changeset = run_before_transaction_hooks(changeset)

      result =
        try do
          func.(clear_phase(changeset))
        rescue
          exception ->
            {:raise, exception, __STACKTRACE__}
        catch
          :exit, reason ->
            {:exit, reason}
        end

      case result do
        {:exit, reason} ->
          error = Ash.Error.to_ash_error(reason)

          case run_after_transactions({:error, error}, changeset) do
            {:ok, result} ->
              {:ok, result, %{}}

            {:error, new_error} when new_error == error ->
              exit(reason)

            {:error, new_error} ->
              exit(new_error)
          end

        {:raise, exception, stacktrace} ->
          case run_after_transactions({:error, exception}, changeset) do
            {:ok, result} ->
              {:ok, result, changeset, %{}}

            {:error, error} ->
              reraise error, stacktrace
          end

        {:ok, result, changeset, notifications} ->
          case run_after_transactions({:ok, result}, changeset) do
            {:ok, result} ->
              {:ok, result, changeset, notifications}

            {:error, error} ->
              {:error, error}
          end

        {:ok, result, notifications} ->
          case run_after_transactions({:ok, result}, changeset) do
            {:ok, result} ->
              {:ok, result, changeset, notifications}

            {:error, error} ->
              {:error, error}
          end

        {:error, error} ->
          case run_after_transactions({:error, error}, changeset) do
            {:ok, result} ->
              {:ok, result, changeset, %{}}

            {:error, error} ->
              {:error, error}
          end
      end
    end)
  end

  defp run_around_transaction_hooks(%{around_transaction: []} = changeset, func),
    do: func.(changeset)

  defp run_around_transaction_hooks(%{around_transaction: [around | rest]} = changeset, func) do
    changeset
    |> set_phase(:around_transaction)
    |> around.(fn changeset ->
      run_around_transaction_hooks(%{changeset | around_transaction: rest}, func)
    end)
  end

  def run_before_transaction_hooks(changeset) do
    Enum.reduce_while(
      changeset.before_transaction,
      set_phase(changeset, :before_transaction),
      fn before_transaction, changeset ->
        metadata = %{
          api: changeset.api,
          resource: changeset.resource,
          resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
          actor: changeset.context[:private][:actor],
          tenant: changeset.context[:private][:tenant],
          action: changeset.action && changeset.action.name,
          authorize?: changeset.context[:private][:authorize?]
        }

        tracer = changeset.context[:private][:tracer]

        result =
          Ash.Tracer.span :before_transaction,
                          "before_transaction",
                          tracer do
            Ash.Tracer.set_metadata(tracer, :before_transaction, metadata)

            Ash.Tracer.telemetry_span [:ash, :before_transaction], metadata do
              before_transaction.(changeset)
            end
          end

        case result do
          {:error, error} ->
            {:halt, {:error, error}}

          changeset ->
            cont =
              if changeset.valid? do
                :cont
              else
                :halt
              end

            {cont, changeset}
        end
      end
    )
  end

  @doc false
  def run_before_actions(%{before_action: []} = changeset), do: {changeset, %{notifications: []}}

  def run_before_actions(changeset) do
    can_atomic_upsert? = Ash.DataLayer.data_layer_can?(changeset.resource, {:atomic, :upsert})

    Enum.reduce_while(
      changeset.before_action,
      {changeset, %{notifications: []}},
      fn before_action, {changeset, instructions} ->
        metadata = %{
          api: changeset.api,
          resource: changeset.resource,
          resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
          actor: changeset.context[:private][:actor],
          tenant: changeset.context[:private][:actor],
          action: changeset.action && changeset.action.name,
          authorize?: changeset.context[:private][:authorize?]
        }

        tracer = changeset.context[:private][:tracer]

        result =
          Ash.Tracer.span :before_action,
                          "before_action",
                          tracer do
            Ash.Tracer.set_metadata(tracer, :before_action, metadata)

            Ash.Tracer.telemetry_span [:ash, :before_action], metadata do
              before_action.(changeset)
            end
          end

        case result do
          {:error, error} ->
            {:halt, {:error, error}}

          {changeset, %{notifications: notifications}} ->
            cont =
              if changeset.valid? do
                :cont
              else
                :halt
              end

            {cont,
             {changeset,
              %{
                instructions
                | notifications: instructions.notifications ++ List.wrap(notifications)
              }}}

          changeset ->
            cont =
              if changeset.valid? do
                :cont
              else
                :halt
              end

            {cont, {changeset, instructions}}
        end
      end
    )
    |> case do
      {:error, error} ->
        {:error, error}

      {%{atomics: atomics} = changeset, _} when atomics != [] and not can_atomic_upsert? ->
        Ash.Changeset.add_error(
          changeset,
          Ash.Error.Invalid.AtomicsNotSupported.exception(
            resource: changeset.resource,
            action_type: :create
          )
        )

      {%{valid?: true} = changeset, instructions} ->
        {Ash.Changeset.hydrate_atomic_refs(changeset, changeset.context[:private][:actor]),
         instructions}

      {changeset, instructions} ->
        {changeset, instructions}
    end
  end

  @doc false
  def run_after_transactions(result, changeset) do
    warn_on_transaction_hooks(changeset, changeset.before_transaction, "after_transaction")

    changeset = set_phase(changeset, :after_transaction)

    changeset.after_transaction
    |> Enum.reduce(
      result,
      fn after_transaction, result ->
        tracer = changeset.context[:private][:tracer]

        metadata = %{
          api: changeset.api,
          resource: changeset.resource,
          resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
          actor: changeset.context[:private][:actor],
          tenant: changeset.context[:private][:actor],
          action: changeset.action && changeset.action.name,
          authorize?: changeset.context[:private][:authorize?]
        }

        Ash.Tracer.span :after_transaction,
                        "after_transaction",
                        tracer do
          Ash.Tracer.set_metadata(tracer, :after_transaction, metadata)

          Ash.Tracer.telemetry_span [:ash, :after_transaction], metadata do
            after_transaction.(changeset, result)
          end
        end
      end
    )
    |> case do
      {:ok, new_result} ->
        {:ok, new_result}

      {:error, error} ->
        {:error, error}
    end
  end

  defp run_around_actions(%{around_action: []} = changeset, func) do
    changeset =
      changeset
      |> put_context(:private, %{in_before_action?: true})
      |> set_phase(:before_action)

    result =
      if changeset.atomics != [] &&
           !Ash.DataLayer.data_layer_can?(changeset.resource, {:atomic, :upsert}) do
        {:error,
         Ash.Error.Invalid.AtomicsNotSupported.exception(
           resource: changeset.resource,
           action_type: :create
         )}
      else
        run_before_actions(changeset)
      end

    case result do
      {:error, error} ->
        {:error, error}

      {changeset, %{notifications: before_action_notifications}} ->
        changeset
        |> clear_phase()
        |> func.()
        |> case do
          {:ok, result, instructions} ->
            run_after_actions(
              result,
              instructions[:new_changeset] || changeset,
              List.wrap(instructions[:notifications]) ++ before_action_notifications
            )

          {:ok, result} ->
            run_after_actions(result, changeset, before_action_notifications)

          {:error, error} ->
            {:error, error}
        end
    end
  end

  defp run_around_actions(
         %{around_action: [around | rest]} = changeset,
         func
       ) do
    changeset
    |> set_phase(:around_action)
    |> around.(fn changeset ->
      run_around_actions(%{changeset | around_action: rest}, func)
    end)
  end

  @doc false
  def run_after_actions(result, changeset, before_action_notifications) do
    changeset = set_phase(changeset, :after_action)

    Enum.reduce_while(
      changeset.after_action,
      {:ok, result, changeset, %{notifications: before_action_notifications}},
      fn after_action, {:ok, result, changeset, %{notifications: notifications} = acc} ->
        tracer = changeset.context[:private][:tracer]

        metadata = %{
          api: changeset.api,
          resource: changeset.resource,
          resource_short_name: Ash.Resource.Info.short_name(changeset.resource),
          actor: changeset.context[:private][:actor],
          tenant: changeset.context[:private][:actor],
          action: changeset.action && changeset.action.name,
          authorize?: changeset.context[:private][:authorize?]
        }

        result =
          Ash.Tracer.span :after_action,
                          "after_action",
                          tracer do
            Ash.Tracer.set_metadata(tracer, :after_action, metadata)

            Ash.Tracer.telemetry_span [:ash, :after_action], metadata do
              after_action.(changeset, result)
            end
          end

        case result do
          {:ok, new_result, new_notifications} ->
            all_notifications =
              Enum.map(notifications ++ new_notifications, fn notification ->
                %{
                  notification
                  | resource: notification.resource || changeset.resource,
                    action:
                      notification.action ||
                        Ash.Resource.Info.action(
                          changeset.resource,
                          changeset.action,
                          changeset.action_type
                        ),
                    data: notification.data || new_result,
                    changeset: notification.changeset || changeset,
                    actor: notification.actor || changeset.context[:private][:actor]
                }
              end)

            {:cont,
             {:ok, new_result, clear_phase(changeset), %{acc | notifications: all_notifications}}}

          {:ok, new_result} ->
            {:cont, {:ok, new_result, clear_phase(changeset), acc}}

          {:error, error} ->
            {:halt, {:error, error}}

          other ->
            raise """
            Invalid return value from after_action hook. Expected one of:

            * {:ok, result}
            * {:ok, result, notifications}
            * {:error, error}

            Got:

            #{inspect(other)}
            """
        end
      end
    )
  end

  @doc "Gets the value of an argument provided to the changeset."
  @spec get_argument(t, atom) :: term
  def get_argument(changeset, argument) when is_atom(argument) do
    if Map.has_key?(changeset.arguments, argument) do
      Map.get(changeset.arguments, argument)
    else
      Map.get(changeset.arguments, to_string(argument))
    end
  end

  @doc "Fetches the value of an argument provided to the changeset or `:error`."
  @spec fetch_argument(t, atom) :: {:ok, term} | :error
  def fetch_argument(changeset, argument) when is_atom(argument) do
    case Map.fetch(changeset.arguments, argument) do
      {:ok, value} ->
        {:ok, value}

      :error ->
        case Map.fetch(changeset.arguments, to_string(argument)) do
          {:ok, value} -> {:ok, value}
          :error -> :error
        end
    end
  end

  @doc "Gets the changing value or the original value of an attribute."
  @spec get_attribute(t, atom) :: term
  def get_attribute(changeset, attribute) do
    case fetch_change(changeset, attribute) do
      {:ok, value} ->
        value

      :error ->
        get_data(changeset, attribute)
    end
  end

  @doc "Gets the value of an argument provided to the changeset, falling back to `Ash.Changeset.get_attribute/2` if nothing was provided."
  @spec get_argument_or_attribute(t, atom) :: term
  def get_argument_or_attribute(changeset, attribute) do
    case fetch_argument(changeset, attribute) do
      {:ok, value} -> value
      :error -> get_attribute(changeset, attribute)
    end
  end

  @doc "Gets the new value for an attribute, or `:error` if it is not being changed."
  @spec fetch_change(t, atom) :: {:ok, any} | :error
  def fetch_change(changeset, attribute) do
    Map.fetch(changeset.attributes, attribute)
  end

  @doc "Gets the value of an argument provided to the changeset, falling back to `Ash.Changeset.fetch_change/2` if nothing was provided."
  @spec fetch_argument_or_change(t, atom) :: {:ok, any} | :error
  def fetch_argument_or_change(changeset, attribute) do
    case fetch_argument(changeset, attribute) do
      {:ok, value} -> {:ok, value}
      :error -> fetch_change(changeset, attribute)
    end
  end

  @doc "Gets the original value for an attribute"
  @spec get_data(t, atom) :: term
  def get_data(changeset, attribute) do
    Map.get(changeset.data, attribute)
  end

  @doc """
  Puts a key/value in the changeset context that can be used later.

  Do not use the `private` key in your custom context, as that is reserved for internal use.
  """
  @spec put_context(t(), atom, term) :: t()
  def put_context(changeset, key, value) do
    set_context(changeset, %{key => value})
  end

  @spec set_tenant(t(), String.t()) :: t()
  def set_tenant(changeset, tenant) do
    %{changeset | tenant: tenant}
  end

  @spec timeout(t(), nil | pos_integer, nil | pos_integer) :: t()
  def timeout(changeset, timeout, default \\ nil) do
    %{changeset | timeout: timeout || default}
  end

  @doc """
  Deep merges the provided map into the changeset context that can be used later.

  Do not use the `private` key in your custom context, as that is reserved for internal use.
  """
  @spec set_context(t(), map | nil) :: t()
  def set_context(changeset, nil), do: changeset

  def set_context(changeset, map) do
    %{changeset | context: Ash.Helpers.deep_merge_maps(changeset.context, map)}
  end

  @type manage_relationship_type ::
          :append_and_remove | :append | :remove | :direct_control | :create

  @spec manage_relationship_opts(manage_relationship_type()) :: Keyword.t()
  def manage_relationship_opts(:replace) do
    Logger.warning(
      "`type: :replace` has been renamed to `:append_and_remove` in 2.0, and it will be removed in 2.1"
    )

    manage_relationship_opts(:append_and_remove)
  end

  def manage_relationship_opts(:append_and_remove) do
    [
      on_lookup: :relate,
      on_no_match: :error,
      on_match: :ignore,
      on_missing: :unrelate
    ]
  end

  def manage_relationship_opts(:append) do
    [
      on_lookup: :relate,
      on_no_match: :error,
      on_match: :ignore,
      on_missing: :ignore
    ]
  end

  def manage_relationship_opts(:remove) do
    [
      on_no_match: :error,
      on_match: :unrelate,
      on_missing: :ignore
    ]
  end

  def manage_relationship_opts(:create) do
    [
      on_no_match: :create,
      on_match: :ignore
    ]
  end

  def manage_relationship_opts(:direct_control) do
    [
      on_lookup: :ignore,
      on_no_match: :create,
      on_match: :update,
      on_missing: :destroy
    ]
  end

  @manage_opts [
    type: [
      type: {:one_of, @manage_types},
      doc: """
      If the `type` is specified, the default values of each option is modified to match that `type` of operation.

      This allows for specifying certain operations much more succinctly. The defaults that are modified are listed below:

      - `:append_and_remove`

            [
              on_lookup: :relate,
              on_no_match: :error,
              on_match: :ignore,
              on_missing: :unrelate
            ]

      - `:append`

            [
              on_lookup: :relate,
              on_no_match: :error,
              on_match: :ignore,
              on_missing: :ignore
            ]

      - `:remove`

            [
              on_no_match: :error,
              on_match: :unrelate,
              on_missing: :ignore
            ]

      - `:direct_control`

            [
              on_lookup: :ignore,
              on_no_match: :create,
              on_match: :update,
              on_missing: :destroy
            ]

      - `:create`

            [
              on_no_match: :create,
              on_match: :ignore
            ]
      """
    ],
    authorize?: [
      type: :boolean,
      default: true,
      doc:
        "Authorize reads and changes to the destination records, if the primary change is being authorized as well."
    ],
    eager_validate_with: [
      type: :atom,
      default: false,
      doc:
        "Validates that any referenced entities exist *before* the action is being performed, using the provided API for the read."
    ],
    on_no_match: [
      type: :any,
      default: :ignore,
      doc: """
      Instructions for handling records where no matching record existed in the relationship.

      * `:ignore` (default) - those inputs are ignored
      * `:match` - For `has_one` and `belongs_to` only, any input is treated as a match for an existing value. For `has_many` and `many_to_many`, this is the same as `:ignore`.
      * `:create` - the records are created using the destination's primary create action
      * `{:create, :action_name}` - the records are created using the specified action on the destination resource
      * `{:create, :action_name, :join_table_action_name, [:list, :of, :join_table, :params]}` - Same as `{:create, :action_name}` but takes
          the list of params specified out and applies them when creating the join record, with the provided join_table_action_name.
      * `:error`  - an error is returned indicating that a record would have been created
        *  If `on_lookup` is set, and the data contained a primary key or identity, then the error is a `NotFound` error
        * Otherwise, an `InvalidRelationship` error is returned
      """
    ],
    value_is_key: [
      type: :atom,
      doc: """
      Configures what key to use when a single value is provided.

      This is useful when you use things like a list of strings i.e `friend_emails` to manage the relationship, instead of a list of maps.

      By default, we assume it is the primary key of the destination resource, unless it is a composite primary key.
      """
    ],
    identity_priority: [
      type: {:list, :atom},
      doc: """
      The list, in priority order, of identities to use when looking up records for `on_lookup`, and matching records with `on_match`.

      Use `:_primary_key` to prioritize checking a match with the primary key.
      All identities, along with `:_primary_key` are checked regardless, this only allows ensuring that some are checked first.
      Defaults to the list provided by `use_identities`, so you typically won't need this option.
      """
    ],
    use_identities: [
      type: {:list, :atom},
      doc: """
      A list of identities that may be used to look up and compare records. Use `:_primary_key` to include the primary key. By default, only `[:_primary_key]` is used.
      """
    ],
    on_lookup: [
      type: :any,
      default: :ignore,
      doc: """
      Before creating a record (because no match was found in the relationship), the record can be looked up and related.

      * `:ignore` (default) - Does not look for existing entries (matches in the current relationship are still considered updates)
      * `:relate` - Same as calling `{:relate, primary_action_name}`
      * `{:relate, :action_name}` - the records are looked up by primary key/the first identity that is found (using the primary read action), and related. The action should be:
          * `many_to_many` - a create action on the join resource
          * `has_many` - an update action on the destination resource
          * `has_one` - an update action on the destination resource
          * `belongs_to` - an update action on the source resource
      * `{:relate, :action_name, :read_action_name}` - Same as the above, but customizes the read action called to search for matches.
      * `:relate_and_update` - Same as `:relate`, but the remaining parameters from the lookup are passed into the action that is used to change the relationship key
      * `{:relate_and_update, :action_name}` - Same as the above, but customizes the action used. The action should be:
          * `many_to_many` - a create action on the join resource
          * `has_many` - an update action on the destination resource
          * `has_one` - an update action on the destination resource
          * `belongs_to` - an update action on the source resource
      * `{:relate_and_update, :action_name, :read_action_name}` - Same as the above, but customizes the read action called to search for matches.
      * `{:relate_and_update, :action_name, :read_action_name, [:list, :of, :join_table, :params]}` - Same as the above, but uses the provided list of parameters when creating
          the join row (only relevant for many to many relationships). Use `:all` to *only* update the join record, and pass all parameters to its action
      """
    ],
    on_match: [
      type: :any,
      default: :ignore,
      doc: """
      Instructions for handling records where a matching record existed in the relationship already.

      * `:ignore` (default) - those inputs are ignored
      * `:update` - the record is updated using the destination's primary update action
      * `{:update, :action_name}` - the record is updated using the specified action on the destination resource
      * `{:update, :action_name, :join_table_action_name, [:list, :of, :params]}` - Same as `{:update, :action_name}` but takes
          the list of params specified out and applies them as an update to the join record (only valid for many to many).
      * `{:destroy, :action_name}` - the record is destroyed using the specified action on the destination resource. The action should be:
        * `many_to_many` - a destroy action on the join record
        * `has_many` - a destroy action on the destination resource
        * `has_one` - a destroy action on the destination resource
        * `belongs_to` - a destroy action on the destination resource
      * `:error`  - an error is returned indicating that a record would have been updated
      * `:no_match` - ignores the primary key match and follows the `on_no_match` instructions with these records instead.
      * `:unrelate` - the related item is not destroyed, but the data is "unrelated", making this behave like `remove_from_relationship/3`. The action should be:
        * `many_to_many` - the join resource row is destroyed
        * `has_many` - the `destination_attribute` (on the related record) is set to `nil`
        * `has_one` - the `destination_attribute` (on the related record) is set to `nil`
        * `belongs_to` - the `source_attribute` (on this record) is set to `nil`
      * `{:unrelate, :action_name}` - the record is unrelated using the provided update action. The action should be:
        * `many_to_many` - a destroy action on the join resource
        * `has_many` - an update action on the destination resource
        * `has_one` - an update action on the destination resource
        * `belongs_to` - an update action on the source resource
      """
    ],
    on_missing: [
      type: :any,
      default: :ignore,
      doc: """
      Instructions for handling records that existed in the current relationship but not in the input.

      * `:ignore` (default) - those inputs are ignored
      * `:destroy` - the record is destroyed using the destination's primary destroy action
      * `{:destroy, :action_name}` - the record is destroyed using the specified action on the destination resource
      * `{:destroy, :action_name, :join_resource_action_name, [:join, :keys]}` - the record is destroyed using the specified action on the destination resource,
        but first the join resource is destroyed with its specified action
      * `:error`  - an error is returned indicating that a record would have been updated
      * `:unrelate` - the related item is not destroyed, but the data is "unrelated", making this behave like `remove_from_relationship/3`. The action should be:
        * `many_to_many` - the join resource row is destroyed
        * `has_many` - the `destination_attribute` (on the related record) is set to `nil`
        * `has_one` - the `destination_attribute` (on the related record) is set to `nil`
        * `belongs_to` - the `source_attribute` (on this record) is set to `nil`
      * `{:unrelate, :action_name}` - the record is unrelated using the provided update action. The action should be:
        * `many_to_many` - a destroy action on the join resource
        * `has_many` - an update action on the destination resource
        * `has_one` - an update action on the destination resource
        * `belongs_to` - an update action on the source resource
      """
    ],
    error_path: [
      type: :any,
      doc: """
      By default, errors added to the changeset will use the path `[:relationship_name]`, or `[:relationship_name, <index>]`.
      If you want to modify this path, you can specify `error_path`, e.g if had a `change` on an action that takes an argument
      and uses that argument data to call `manage_relationship`, you may want any generated errors to appear under the name of that
      argument, so you could specify `error_path: :argument_name` when calling `manage_relationship`.
      """
    ],
    meta: [
      type: :any,
      doc:
        "Freeform data that will be retained along with the options, which can be used to track/manage the changes that are added to the `relationships` key."
    ],
    ignore?: [
      type: :any,
      default: false,
      doc: """
      This tells Ash to ignore the provided inputs when actually running the action. This can be useful for
      building up a set of instructions that you intend to handle manually.
      """
    ]
  ]

  @doc false
  def manage_relationship_schema, do: @manage_opts

  @doc """
  Manages the related records by creating, updating, or destroying them as necessary.

  Keep in mind that the default values for all `on_*` are `:ignore`, meaning nothing will happen
  unless you provide instructions.

  The input provided to `manage_relationship` should be a map, in the case of to_one relationships, or a list of maps
  in the case of to_many relationships. The following steps are followed for each input provided:

  - The input is checked against the currently related records to find any matches. The primary key and unique identities are used to find matches.
  - For any input that had a match in the current relationship, the `:on_match` behavior is triggered
  - For any input that does not have a match:
    - if there is `on_lookup` behavior:
      - we try to find the record in the data layer.
      - if the record is found, the on_lookup behavior is triggered
      - if the record is not found, the `on_no_match` behavior is triggered
    - if there is no `on_lookup` behavior:
      - the `on_no_match` behavior is triggered
  - finally, for any records present in the *current relationship* that had no match *in the input*, the `on_missing` behavior is triggered

  ## Options

  #{Spark.OptionsHelpers.docs(@manage_opts)}

  Each call to this function adds new records that will be handled according to their options. For example,
  if you tracked "tags to add" and "tags to remove" in separate fields, you could input them like so:

  ```elixir
  changeset
  |> Ash.Changeset.manage_relationship(
    :tags,
    [%{name: "backend"}],
    on_lookup: :relate, #relate that tag if it exists in the database
    on_no_match: :error # error if a tag with that name doesn't exist
  )
  |> Ash.Changeset.manage_relationship(
    :tags,
    [%{name: "frontend"}],
    on_no_match: :error, # error if a tag with that name doesn't exist in the relationship
    on_match: :unrelate # if a tag with that name is related, unrelate it
  )
  ```

  When calling this multiple times with the `on_missing` option set, the list of records that are considered missing are checked
  after each set of inputs is processed. For example, if you manage the relationship once with `on_missing: :unrelate`, the records
  missing from your input will be removed, and *then* your next call to `manage_relationship` will be resolved (with those records unrelated).
  For this reason, it is suggested that you don't call this function multiple times with an `on_missing` instruction, as you may be
  surprised by the result.

  If you want the input to update existing entities, you need to ensure that the primary key (or unique identity) is provided as
  part of the input. See the example below:

      changeset
      |> Ash.Changeset.manage_relationship(
        :comments,
        [%{rating: 10, contents: "foo"}],
        on_no_match: {:create, :create_action},
        on_missing: :ignore
      )
      |> Ash.Changeset.manage_relationship(
        :comments,
        [%{id: 10, rating: 10, contents: "foo"}],
        on_match: {:update, :update_action},
        on_no_match: {:create, :create_action})

  This is a simple way to manage a relationship. If you need custom behavior, you can customize the action that is
  called, which allows you to add arguments/changes. However, at some point you may want to forego this function
  and make the changes yourself. For example:

      input = [%{id: 10, rating: 10, contents: "foo"}]

      changeset
      |> Ash.Changeset.after_action(fn _changeset, result ->
        # An example of updating comments based on a result of other changes
        for comment <- input do
          comment = MyApi.get(Comment, comment.id)

          comment
          |> Map.update(:rating, 0, &(&1 * result.rating_weight))
          |> MyApi.update!()
        end

        {:ok, result}
      end)

  ## Using records as input

  Records can be supplied as the input values. If you do:

  * if it would be looked up due to `on_lookup`, the record is used as-is
  * if it would be created due to `on_no_match`, the record is used as-is
  * Instead of specifying `join_keys`, those keys must go in `__metadata__.join_keys`. If `join_keys` is specified in the options, it is ignored.

  For example:

  ```elixir
  post1 =
    changeset
    |> Api.create!()
    |> Ash.Resource.put_metadata(:join_keys, %{type: "a"})

  post1 =
    changeset2
    |> Api.create!()
    |> Ash.Resource.put_metadata(:join_keys, %{type: "b"})

  author = Api.create!(author_changeset)

  Ash.Changeset.manage_relationship(
    author,
    :posts,
    [post1, post2],
    on_lookup: :relate
  )
  ```
  """
  def manage_relationship(changeset, relationship, input, opts \\ [])

  def manage_relationship(changeset, relationship, "", opts) do
    manage_relationship(changeset, relationship, nil, opts)
  end

  def manage_relationship(changeset, relationship, input, opts) do
    opts =
      if opts[:type] == :replace do
        Logger.warning(
          "`type: :replace` has been renamed to `:append_and_remove` in 2.0, and it will be removed in 2.1"
        )

        Keyword.put(opts, :type, :append_and_remove)
      else
        opts
      end

    inputs_was_list? = is_list(input)

    manage_opts =
      if opts[:type] do
        defaults = manage_relationship_opts(opts[:type])

        Enum.reduce(defaults, @manage_opts, fn {key, value}, manage_opts ->
          Spark.OptionsHelpers.set_default!(manage_opts, key, value)
        end)
      else
        @manage_opts
      end

    opts = Spark.OptionsHelpers.validate!(opts, manage_opts)

    opts =
      if Keyword.has_key?(opts[:meta] || [], :inputs_was_list?) do
        opts
      else
        Keyword.update(
          opts,
          :meta,
          [inputs_was_list?: inputs_was_list?],
          &Keyword.put(&1, :inputs_was_list?, inputs_was_list?)
        )
      end

    case Ash.Resource.Info.relationship(changeset.resource, relationship) do
      nil ->
        error =
          NoSuchRelationship.exception(
            resource: changeset.resource,
            name: relationship
          )

        add_error(changeset, error)

      %{writable?: false} = relationship ->
        error =
          InvalidRelationship.exception(
            relationship: relationship.name,
            message: "relationship is not editable"
          )

        add_error(changeset, error)

      %{manual: manual} = relationship when not is_nil(manual) ->
        error =
          InvalidRelationship.exception(
            relationship: relationship.name,
            message: "cannot manage a manual relationship"
          )

        add_error(changeset, error)

      %{cardinality: :one, type: type} = relationship when is_list(input) and length(input) > 1 ->
        error =
          InvalidRelationship.exception(
            relationship: relationship.name,
            message: "cannot manage a #{type} relationship with a list of records"
          )

        add_error(changeset, error)

      relationship ->
        key =
          opts[:value_is_key] ||
            changeset.resource
            |> Ash.Resource.Info.related(relationship.name)
            |> Ash.Resource.Info.primary_key()
            |> case do
              [key] ->
                key

              _ ->
                nil
            end

        if relationship.cardinality == :many && is_map(input) && !is_struct(input) do
          case map_input_to_list(input) do
            {:ok, input} ->
              input =
                if key do
                  Enum.map(input, fn input ->
                    if is_map(input) || is_list(input) do
                      input
                    else
                      %{key => input}
                    end
                  end)
                else
                  input
                end

              manage_relationship(changeset, relationship.name, input, opts)

            :error ->
              manage_relationship(changeset, relationship.name, List.wrap(input), opts)
          end
        else
          input =
            if key do
              input
              |> List.wrap()
              |> Enum.map(fn input ->
                if is_map(input) || is_list(input) do
                  input
                else
                  %{key => input}
                end
              end)
            else
              input
            end

          if Enum.any?(
               List.wrap(input),
               &(is_struct(&1) && Ash.Resource.Info.resource?(&1.__struct__) &&
                   &1.__struct__ != relationship.destination)
             ) do
            add_error(
              changeset,
              InvalidRelationship.exception(
                relationship: relationship.name,
                message: "cannot provide structs that don't match the destination"
              )
            )
          else
            relationships =
              changeset.relationships
              |> Map.put_new(relationship.name, [])
              |> Map.update!(relationship.name, &(&1 ++ [{input, opts}]))

            changeset = %{changeset | relationships: relationships}

            if opts[:eager_validate_with] do
              eager_validate_relationship_input(
                relationship,
                input,
                changeset,
                opts[:eager_validate_with],
                opts[:error_path] || opts[:meta][:id] || relationship.name,
                opts
              )
            else
              changeset
            end
          end
        end
    end
  end

  defp eager_validate_relationship_input(_relationship, [], _changeset, _api, _error_path, _opts),
    do: :ok

  defp eager_validate_relationship_input(relationship, input, changeset, api, error_path, opts) do
    pkeys = Ash.Actions.ManagedRelationships.pkeys(relationship, opts)

    pkeys =
      Enum.map(pkeys, fn pkey ->
        Enum.map(pkey, fn key ->
          Ash.Resource.Info.attribute(relationship.destination, key)
        end)
      end)

    search =
      Enum.reduce(input, false, fn item, expr ->
        filter =
          Enum.find_value(pkeys, fn pkey ->
            this_filter =
              pkey
              |> Enum.reject(&(&1.name == relationship.destination_attribute))
              |> Enum.all?(fn key ->
                case fetch_identity_field(
                       item,
                       changeset.data,
                       key,
                       relationship
                     ) do
                  {:ok, _value} ->
                    true

                  :error ->
                    false
                end
              end)
              |> case do
                true ->
                  case Map.take(
                         item,
                         Enum.map(pkey, & &1.name) ++ Enum.map(pkey, &to_string(&1.name))
                       ) do
                    empty when empty == %{} -> nil
                    filter -> filter
                  end

                false ->
                  nil
              end

            if Enum.any?(pkey, &(&1.name == relationship.destination_attribute)) &&
                 relationship.type in [:has_many, :has_one] do
              destination_value = Map.get(changeset.data, relationship.source_attribute)

              Ash.Expr.expr(
                ^this_filter and
                  (is_nil(ref(^relationship.destination_attribute, [])) or
                     ref(^relationship.destination_attribute, []) == ^destination_value)
              )
            else
              this_filter
            end
          end)

        if filter && filter != %{} do
          Ash.Expr.expr(^expr or ^filter)
        else
          expr
        end
      end)

    results =
      if search == false do
        {:ok, []}
      else
        action =
          relationship.read_action ||
            Ash.Resource.Info.primary_action!(relationship.destination, :read).name

        relationship.destination
        |> Ash.Query.for_read(action, %{},
          actor: changeset.context[:private][:actor],
          authorize?: changeset.context[:private][:authorize?],
          tenant: changeset.tenant
        )
        |> Ash.Query.limit(Enum.count(input))
        |> Ash.Query.do_filter(search)
        |> api.read()
      end

    case results do
      {:error, error} ->
        {:error, error}

      {:ok, results} ->
        case Enum.find(input, fn item ->
               no_pkey_all_matches(results, pkeys, fn result, key ->
                 case fetch_identity_field(item, changeset.data, key, relationship) do
                   {:ok, value} ->
                     Ash.Type.equal?(
                       key.type,
                       value,
                       Map.get(result, key.name)
                     )

                   :error ->
                     false
                 end
               end)
             end) do
          nil ->
            :ok

          item ->
            pkey_search =
              Enum.find_value(pkeys, fn pkey ->
                if Enum.all?(pkey, fn key ->
                     Map.has_key?(item, key.name) || Map.has_key?(item, to_string(key.name))
                   end) do
                  Map.take(item, pkey ++ Enum.map(pkey, &to_string(&1.name)))
                end
              end)

            {:error,
             Ash.Error.Query.NotFound.exception(
               primary_key: pkey_search,
               resource: relationship.destination
             )}
        end
    end
    |> case do
      :ok ->
        changeset

      {:error, error} ->
        add_error(changeset, Ash.Error.set_path(error, error_path))
    end
  end

  defp no_pkey_all_matches(results, pkeys, func) do
    !Enum.any?(results, fn result ->
      Enum.any?(pkeys, fn pkey ->
        Enum.all?(pkey, fn key ->
          func.(result, key)
        end)
      end)
    end)
  end

  defp fetch_identity_field(item, data, attribute, relationship) do
    if attribute.name == relationship.destination_attribute &&
         relationship.type in [:has_many, :has_one] do
      {:ok, Map.get(data, relationship.source_attribute)}
    else
      string_attribute = to_string(attribute.name)

      if Map.has_key?(item, attribute.name) || Map.has_key?(item, string_attribute) do
        input_value = Map.get(item, attribute.name) || Map.get(item, string_attribute)

        case Ash.Type.cast_input(attribute.type, input_value, attribute.constraints) do
          {:ok, casted_input_value} ->
            {:ok, casted_input_value}

          _ ->
            :error
        end
      else
        :error
      end
    end
  end

  defp map_input_to_list(input) when input == %{} do
    :error
  end

  defp map_input_to_list(input) do
    input
    |> Enum.reduce_while({:ok, []}, fn
      {key, value}, {:ok, acc} when is_integer(key) ->
        {:cont, {:ok, [{key, value} | acc]}}

      {key, value}, {:ok, acc} when is_binary(key) ->
        case Integer.parse(key) do
          {int, ""} ->
            {:cont, {:ok, [{int, value} | acc]}}

          _ ->
            {:halt, :error}
        end

      _, _ ->
        {:halt, :error}
    end)
    |> case do
      {:ok, value} ->
        {:ok,
         value
         |> Enum.sort_by(&elem(&1, 0))
         |> Enum.map(&elem(&1, 1))}

      :error ->
        :error
    end
  end

  @doc """
  Appends a record or a list of records to a relationship.

  Alias for:

  ```elixir
  manage_relationship(changeset, relationship, input,
    on_lookup: :relate, # If a record is not in the relationship, and can be found, relate it
    on_no_match: :error, # If a record is not found in the relationship or the database, we error
    on_match: :ignore, # If a record is found in the relationship we don't change it
    on_missing: :ignore, # If a record is not found in the input, we ignore it
  )
  ```

  Provide `opts` to customize/override the behavior.
  """
  @spec append_to_relationship(
          t,
          atom,
          Ash.Resource.record() | map | term | [Ash.Resource.record() | map | term],
          Keyword.t()
        ) ::
          t()
  @deprecated "Use manage_relationship/4 instead"
  def append_to_relationship(changeset, relationship, record_or_records, opts \\ []) do
    manage_relationship(
      changeset,
      relationship,
      record_or_records,
      Keyword.merge(
        [
          on_lookup: :relate,
          on_no_match: :error,
          on_match: :ignore,
          on_missing: :ignore,
          authorize?: false
        ],
        opts
      )
    )
  end

  @doc """
  Removes a record or a list of records to a relationship.

  Alias for:

  ```elixir
  manage_relationship(changeset, relationship, record_or_records,
    on_no_match: :error, # If a record is not found in the relationship, we error
    on_match: :unrelate, # If a record is found in the relationship we unrelate it
    on_missing: :ignore, # If a record is not found in the relationship
    authorize?: false
  )
  ```
  """
  @deprecated "Use manage_relationship/4 instead"
  @spec remove_from_relationship(
          t,
          atom,
          Ash.Resource.record() | map | term | [Ash.Resource.record() | map | term],
          Keyword.t()
        ) ::
          t()
  def remove_from_relationship(changeset, relationship, record_or_records, opts \\ []) do
    manage_relationship(
      changeset,
      relationship,
      record_or_records,
      Keyword.merge(
        [
          on_no_match: :error,
          on_match: :unrelate,
          on_missing: :ignore,
          authorize?: false
        ],
        opts
      )
    )
  end

  @doc """
  Alias for:

  ```elixir
  manage_relationship(
    changeset,
    relationship,
    record_or_records,
    on_lookup: :relate, # If a record is not found in the relationship, but is found in the database, relate it and apply the input as an update
    on_no_match: :error, # If a record is not found in the relationship or the database, we error
    on_match: :ignore, # If a record is found in the relationship we make no changes to it
    on_missing: :unrelate, # If a record is not found in the relationship, we unrelate it
    authorize?: false
  )
  ```
  """
  @spec replace_relationship(
          t(),
          atom(),
          Ash.Resource.record() | map | term | [Ash.Resource.record() | map | term] | nil,
          Keyword.t()
        ) :: t()
  @deprecated "Use manage_relationship/4 instead"
  def replace_relationship(changeset, relationship, record_or_records, opts \\ []) do
    manage_relationship(
      changeset,
      relationship,
      record_or_records,
      Keyword.put(opts, :type, :append_and_remove)
    )
  end

  @doc "Returns true if any attributes on the resource are being changed."
  @spec changing_attributes?(t()) :: boolean
  def changing_attributes?(changeset) do
    changeset.resource
    |> Ash.Resource.Info.attributes()
    |> Enum.any?(&changing_attribute?(changeset, &1.name))
  end

  @doc "Returns true if an attribute exists in the changes"
  @spec changing_attribute?(t(), atom) :: boolean
  def changing_attribute?(changeset, attribute) do
    Map.has_key?(changeset.attributes, attribute)
  end

  @doc "Returns true if a relationship exists in the changes"
  @spec changing_relationship?(t(), atom) :: boolean
  def changing_relationship?(changeset, relationship) do
    Map.has_key?(changeset.relationships, relationship)
  end

  @doc "Change an attribute only if is not currently being changed"
  @spec change_new_attribute(t(), atom, term) :: t()
  def change_new_attribute(changeset, attribute, value) do
    maybe_already_validated_error!(changeset, :force_change_new_attribute)

    if changing_attribute?(changeset, attribute) do
      changeset
    else
      change_attribute(changeset, attribute, value)
    end
  end

  @doc """
  Change an attribute if is not currently being changed, by calling the provided function.

  Use this if you want to only perform some expensive calculation for an attribute value
  only if there isn't already a change for that attribute.
  """
  @spec change_new_attribute_lazy(t(), atom, (-> any)) :: t()
  def change_new_attribute_lazy(changeset, attribute, func) do
    maybe_already_validated_error!(changeset, :force_change_new_attribute_lazy)

    if changing_attribute?(changeset, attribute) do
      changeset
    else
      change_attribute(changeset, attribute, func.())
    end
  end

  @doc """
  Add an argument to the changeset, which will be provided to the action.
  """
  def set_argument(changeset, argument, value) do
    maybe_already_validated_error!(changeset, :set_argument)
    do_set_argument(changeset, argument, value)
  end

  @doc """
  Add an argument to the changeset, which will be provided to the action.

  Does not show a warning when used in before/after action hooks.
  """
  def force_set_argument(changeset, argument, value) do
    do_set_argument(changeset, argument, value)
  end

  defp do_set_argument(changeset, argument, value) do
    if changeset.action do
      argument =
        Enum.find(
          changeset.action.arguments,
          &(&1.name == argument || to_string(&1.name) == argument)
        )

      if argument do
        with {:ok, casted} <-
               Ash.Type.Helpers.cast_input(argument.type, value, argument.constraints, changeset),
             {:constrained, {:ok, casted}, argument} when not is_nil(casted) <-
               {:constrained,
                Ash.Type.apply_constraints(argument.type, casted, argument.constraints),
                argument} do
          %{changeset | arguments: Map.put(changeset.arguments, argument.name, casted)}
        else
          {:constrained, {:ok, nil}, _argument} ->
            %{changeset | arguments: Map.put(changeset.arguments, argument.name, nil)}

          {:constrained, {:error, error}, argument} ->
            add_invalid_errors(value, :argument, changeset, argument, error)

          {:error, error} ->
            add_invalid_errors(value, :argument, changeset, argument, error)
        end
      else
        %{changeset | arguments: Map.put(changeset.arguments, argument, value)}
      end
    else
      %{changeset | arguments: Map.put(changeset.arguments, argument, value)}
    end
  end

  @doc """
  Remove an argument from the changeset
  """
  def delete_argument(changeset, argument_or_arguments) do
    maybe_already_validated_error!(changeset)

    argument_or_arguments
    |> List.wrap()
    |> Enum.reduce(changeset, fn argument, changeset ->
      %{changeset | arguments: Map.delete(changeset.arguments, argument)}
    end)
  end

  @doc """
  Merge a map of arguments to the arguments list.
  """
  def set_arguments(changeset, map) do
    Enum.reduce(map, changeset, fn {key, value}, changeset ->
      set_argument(changeset, key, value)
    end)
  end

  @doc """
  Merge a map of arguments to the arguments list.

  Does not show a warning when used in before/after action hooks.
  """
  def force_set_arguments(changeset, map) do
    Enum.reduce(map, changeset, fn {key, value}, changeset ->
      force_set_argument(changeset, key, value)
    end)
  end

  @doc """
  Force change an attribute if it is not currently being changed.

  See `change_new_attribute/3` for more.
  """
  @spec force_change_new_attribute(t(), atom, term) :: t()
  def force_change_new_attribute(changeset, attribute, value) do
    if changing_attribute?(changeset, attribute) do
      changeset
    else
      force_change_attribute(changeset, attribute, value)
    end
  end

  @doc """
  Force change an attribute if it is not currently being changed, by calling the provided function.

  See `change_new_attribute_lazy/3` for more.
  """
  @spec force_change_new_attribute_lazy(t(), atom, (-> any)) :: t()
  def force_change_new_attribute_lazy(changeset, attribute, func) do
    if changing_attribute?(changeset, attribute) do
      changeset
    else
      force_change_attribute(changeset, attribute, func.())
    end
  end

  @doc "Calls `change_attribute/3` for each key/value pair provided."
  @spec change_attributes(t(), map | Keyword.t()) :: t()
  def change_attributes(changeset, changes) do
    maybe_already_validated_error!(changeset, :force_change_attributes)

    Enum.reduce(changes, changeset, fn {key, value}, changeset ->
      change_attribute(changeset, key, value)
    end)
  end

  @doc "Adds a change to the changeset, unless the value matches the existing value."
  @spec change_attribute(t(), atom, any) :: t()
  def change_attribute(changeset, attribute, value) do
    maybe_already_validated_error!(changeset, :change_attribute)

    case Ash.Resource.Info.attribute(changeset.resource, attribute) do
      nil ->
        error =
          NoSuchAttribute.exception(
            resource: changeset.resource,
            name: attribute
          )

        add_error(changeset, error)

      %{writable?: false} = attribute ->
        add_invalid_errors(value, :attribute, changeset, attribute, "Attribute is not writable")

      attribute ->
        with value <- Ash.Type.Helpers.handle_indexed_maps(attribute.type, value),
             {:ok, prepared} <-
               prepare_change(changeset, attribute, value, attribute.constraints),
             {:ok, casted} <-
               Ash.Type.Helpers.cast_input(
                 attribute.type,
                 prepared,
                 attribute.constraints,
                 changeset,
                 true
               ),
             {:ok, casted} <-
               handle_change(changeset, attribute, casted, attribute.constraints),
             {:ok, casted} <-
               Ash.Type.apply_constraints(attribute.type, casted, attribute.constraints) do
          data_value = Map.get(changeset.data, attribute.name)
          changeset = remove_default(changeset, attribute.name)

          cond do
            changeset.action_type == :create ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }

            is_nil(data_value) and is_nil(casted) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name),
                  defaults: changeset.defaults -- [attribute.name]
              }

            Ash.Type.equal?(attribute.type, casted, data_value) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name),
                  defaults: changeset.defaults -- [attribute.name]
              }

            true ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted),
                  defaults: changeset.defaults -- [attribute.name]
              }
          end
        else
          {{:error, error_or_errors}, _last_val} ->
            add_invalid_errors(value, :attribute, changeset, attribute, error_or_errors)

          :error ->
            add_invalid_errors(value, :attribute, changeset, attribute)

          {:error, error_or_errors} ->
            add_invalid_errors(value, :attribute, changeset, attribute, error_or_errors)
        end
    end
  end

  @doc """
  The same as `change_attribute`, but annotates that the attribute is currently holding a default value.

  This information can be used in changes to see if a value was explicitly set or if it was set by being the default.
  Additionally, this is used in `upsert` actions to not overwrite existing values with the default.
  """
  @spec change_default_attribute(t(), atom, any) :: t()
  def change_default_attribute(changeset, attribute, value) do
    maybe_already_validated_error!(changeset)

    case Ash.Resource.Info.attribute(changeset.resource, attribute) do
      nil ->
        error =
          NoSuchAttribute.exception(
            resource: changeset.resource,
            name: attribute
          )

        add_error(changeset, error)

      attribute ->
        changeset
        |> change_attribute(attribute.name, value)
        |> Map.update!(:defaults, fn defaults ->
          Enum.uniq([attribute.name | defaults])
        end)
    end
  end

  @doc "Calls `force_change_attribute/3` for each key/value pair provided."
  @spec force_change_attributes(t(), map | Keyword.t()) :: t()
  def force_change_attributes(changeset, changes) do
    Enum.reduce(changes, changeset, fn {key, value}, changeset ->
      force_change_attribute(changeset, key, value)
    end)
  end

  @doc "Changes an attribute even if it isn't writable"
  @spec force_change_attribute(t(), atom, any) :: t()
  def force_change_attribute(changeset, attribute, value) do
    case Ash.Resource.Info.attribute(changeset.resource, attribute) do
      nil ->
        error =
          NoSuchAttribute.exception(
            resource: changeset.resource,
            name: attribute
          )

        add_error(changeset, error)

      attribute when is_nil(value) ->
        changeset = remove_default(changeset, attribute.name)
        %{changeset | attributes: Map.put(changeset.attributes, attribute.name, nil)}

      attribute ->
        with value <- Ash.Type.Helpers.handle_indexed_maps(attribute.type, value),
             {:ok, prepared} <-
               prepare_change(changeset, attribute, value, attribute.constraints),
             {:ok, casted} <-
               Ash.Type.Helpers.cast_input(
                 attribute.type,
                 prepared,
                 attribute.constraints,
                 changeset
               ),
             {:ok, casted} <- handle_change(changeset, attribute, casted, attribute.constraints),
             {:ok, casted} <-
               Ash.Type.apply_constraints(attribute.type, casted, attribute.constraints) do
          data_value = Map.get(changeset.data, attribute.name)

          changeset = remove_default(changeset, attribute.name)

          cond do
            changeset.action_type == :create ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted)
              }

            is_nil(data_value) and is_nil(casted) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name)
              }

            Ash.Type.equal?(attribute.type, casted, data_value) ->
              %{
                changeset
                | attributes: Map.delete(changeset.attributes, attribute.name)
              }

            true ->
              %{
                changeset
                | attributes: Map.put(changeset.attributes, attribute.name, casted)
              }
          end
        else
          :error ->
            add_invalid_errors(value, :attribute, changeset, attribute)

          {:error, error_or_errors} ->
            add_invalid_errors(value, :attribute, changeset, attribute, error_or_errors)
        end
    end
  end

  @doc "Calls `unsafe_change_attribute/3` for each key/value pair provided."
  @spec unsafe_change_attributes(t(), map | Keyword.t()) :: t()
  def unsafe_change_attributes(changeset, changes) do
    Enum.reduce(changes, changeset, fn {key, value}, changeset ->
      unsafe_change_attribute(changeset, key, value)
    end)
  end

  @doc "Changes an attribute even if it isn't writable, doing no type casting or validation"
  @spec unsafe_change_attribute(t(), atom, any) :: t()
  def unsafe_change_attribute(changeset, attribute, value) do
    %{changeset | attributes: Map.put(changeset.attributes, attribute, value)}
  end

  @doc """
  Adds a before_action hook to the changeset.

  Provide the option `append?: true` to place the hook after all
  other hooks instead of before.
  """
  @spec before_action(
          t(),
          before_action_fun(),
          Keyword.t()
        ) ::
          t()
  def before_action(changeset, func, opts \\ []) do
    if opts[:append?] do
      %{changeset | before_action: changeset.before_action ++ [func]}
    else
      %{changeset | before_action: [func | changeset.before_action]}
    end
  end

  @doc """
  Adds a before_transaction hook to the changeset.

  Provide the option `append?: true` to place the hook after all
  other hooks instead of before.
  """
  @spec before_transaction(
          t(),
          before_transaction_fun(),
          Keyword.t()
        ) :: t()
  def before_transaction(changeset, func, opts \\ []) do
    if opts[:append?] do
      %{changeset | before_transaction: changeset.before_transaction ++ [func]}
    else
      %{changeset | before_transaction: [func | changeset.before_transaction]}
    end
  end

  @doc """
  Adds an after_action hook to the changeset.

  Provide the option `prepend?: true` to place the hook before all
  other hooks instead of after.
  """
  @spec after_action(
          t(),
          after_action_fun(),
          Keyword.t()
        ) :: t()
  def after_action(changeset, func, opts \\ []) do
    if opts[:prepend?] do
      %{changeset | after_action: [func | changeset.after_action]}
    else
      %{changeset | after_action: changeset.after_action ++ [func]}
    end
  end

  @doc """
  Adds an after_transaction hook to the changeset.

  `after_transaction` hooks differ from `after_action` hooks in that they are run
  on success *and* failure of the action or some previous hook.

  Provide the option `prepend?: true` to place the hook before all
  other hooks instead of after.
  """
  @spec after_transaction(
          t(),
          after_transaction_fun(),
          Keyword.t()
        ) :: t()
  def after_transaction(changeset, func, opts \\ []) do
    if opts[:prepend?] do
      %{changeset | after_transaction: [func | changeset.after_transaction]}
    else
      %{changeset | after_transaction: changeset.after_transaction ++ [func]}
    end
  end

  @doc """
  Adds an around_action hook to the changeset.

  Your function will get the changeset, and a callback that must be called with a changeset (that may be modified).
  The callback will return `{:ok, result, instructions}` or `{:error, error}`. You can modify these values, but the
  return value must be one of those types. Instructions contains the notifications in its `notifications` key, i.e
  `%{notifications: [%Ash.Resource.Notification{}, ...]}`.

  The around_action calls happen first, and then (after they each resolve their callbacks) the `before_action`
  hooks are called, followed by the action itself occurring at the data layer and then the `after_action` hooks being run.
  Then, the code that appeared *after* the callbacks were called is then run.

  For example:
  ```elixir
  changeset
  |> Ash.Changeset.around_action(fn changeset, callback ->
    IO.puts("first around: before")
    result = callback.(changeset)
    IO.puts("first around: after")

    result
  end)
  |> Ash.Changeset.around_action(fn changeset, callback ->
    IO.puts("second around: before")
    result = callback.(changeset)
    IO.puts("second around: after")

    result
  end)
  |> Ash.Changeset.before_action(fn changeset ->
    IO.puts("first before")
    changeset
  end, append?: true)
  |> Ash.Changeset.before_action(fn changeset ->
    IO.puts("second before")
    changeset
  end, append?: true)
  |> Ash.Changeset.after_action(fn changeset, result ->
    IO.puts("first after")
    {:ok, result}
  end)
  |> Ash.Changeset.after_action(fn changeset, result ->
    IO.puts("second after")
    {:ok, result}
  end)
  ```

  This would print:
  ```
  first around: before
  second around: before
  first before
  second before
               <-- action happens here
  first after
  second after
  second around: after <-- Notice that because of the callbacks, the after of the around hooks is reversed from the before
  first around: after
  ```

  Warning: using this without understanding how it works can cause big problems.
  You *must* call the callback function that is provided to your hook, and the return value must
  contain the same structure that was given to you, i.e `{:ok, result_of_action, instructions}`.

  You can almost always get the same effect by using `before_action`, setting some context on the changeset
  and reading it out in an `after_action` hook.
  """

  @spec around_action(t(), around_action_fun()) :: t()
  def around_action(changeset, func) do
    %{changeset | around_action: changeset.around_action ++ [func]}
  end

  @doc """
  Adds an around_transaction hook to the changeset.

  Your function will get the changeset, and a callback that must be called with a changeset (that may be modified).
  The callback will return `{:ok, result}` or `{:error, error}`. You can modify these values, but the return value
  must be one of those types.

  The around_transaction calls happen first, and then (after they each resolve their callbacks) the `before_transaction`
  hooks are called, followed by the action hooks and then the `after_transaction` hooks being run.
  Then, the code that appeared *after* the callbacks were called is then run.

  For example:

  ```elixir
  changeset
  |> Ash.Changeset.around_transaction(fn changeset, callback ->
    IO.puts("first around: before")
    result = callback.(changeset)
    IO.puts("first around: after")

    result
  end)
  |> Ash.Changeset.around_transaction(fn changeset, callback ->
    IO.puts("second around: before")
    result = callback.(changeset)
    IO.puts("second around: after")

    result
  end)
  |> Ash.Changeset.before_transaction(fn changeset ->
    IO.puts("first before")
    changeset
  end, append?: true)
  |> Ash.Changeset.before_transaction(fn changeset ->
    IO.puts("second before")
    changeset
  end, append?: true)
  |> Ash.Changeset.after_transaction(fn changeset, result ->
    IO.puts("first after")
    result
  end)
  |> Ash.Changeset.after_transaction(fn changeset, result ->
    IO.puts("second after")
    result
  end)
  ```

  This would print:

  ```
  first around: before
  second around: before
  first before
  second before
               <-- action hooks happens here
  first after
  second after
  second around: after <-- Notice that because of the callbacks, the after of the around hooks is reversed from the before
  first around: after
  ```

  Warning: using this without understanding how it works can cause big problems.
  You *must* call the callback function that is provided to your hook, and the return value must
  contain the same structure that was given to you, i.e `{:ok, result_of_action}`.

  You can almost always get the same effect by using `before_transaction`, setting some context on the changeset
  and reading it out in an `after_transaction` hook.
  """

  @spec around_transaction(t(), around_transaction_fun()) :: t()
  def around_transaction(changeset, func) do
    %{changeset | around_transaction: changeset.around_transaction ++ [func]}
  end

  @doc """
  Returns the original data with attribute changes merged, if the changeset is valid.

  Options:

  * force? - applies current attributes even if the changeset is not valid
  """
  @spec apply_attributes(t(), opts :: Keyword.t()) :: {:ok, Ash.Resource.record()} | {:error, t()}
  def apply_attributes(changeset, opts \\ [])

  def apply_attributes(%{valid?: true} = changeset, _opts) do
    {:ok,
     Enum.reduce(changeset.attributes, changeset.data, fn {attribute, value}, data ->
       Map.put(data, attribute, value)
     end)}
  end

  def apply_attributes(changeset, opts) do
    if opts[:force?] do
      apply_attributes(%{changeset | valid?: true}, opts)
    else
      {:error, changeset}
    end
  end

  defp remove_default(changeset, attribute) do
    %{changeset | defaults: changeset.defaults -- [attribute]}
  end

  @doc "Clears an attribute or relationship change off of the changeset."
  def clear_change(changeset, field) do
    cond do
      attr = Ash.Resource.Info.attribute(changeset.resource, field) ->
        %{changeset | attributes: Map.delete(changeset.attributes, attr.name)}

      rel = Ash.Resource.Info.relationship(changeset.resource, field) ->
        %{changeset | relationships: Map.delete(changeset.relationships, rel.name)}

      true ->
        changeset
    end
  end

  @doc """
  Sets a custom error handler on the changeset.

  The error handler should be a two argument function or an mfa, in which case the first two arguments will be set
  to the changeset and the error, w/ the supplied arguments following those.

  Any errors generated are passed to `handle_errors`, which can return any of the following:

  * `:ignore` - the error is discarded, and the changeset is not marked as invalid
  * `changeset` - a new (or the same) changeset. The error is not added (you'll want to add an error yourself), but the changeset *is* marked as invalid.
  * `{changeset, error}` - a new (or the same) error and changeset. The error is added to the changeset, and the changeset is marked as invalid.
  * `anything_else` - is treated as a new, transformed version of the error. The result is added as an error to the changeset, and the changeset is marked as invalid.
  """
  @spec handle_errors(
          t(),
          (t(), error :: term -> :ignore | t() | (error :: term) | {error :: term, t()})
          | {module, atom, [term]}
        ) :: t()
  def handle_errors(changeset, {m, f, a}) do
    %{changeset | handle_errors: &apply(m, f, [&1, &2 | a])}
  end

  def handle_errors(changeset, func) do
    %{changeset | handle_errors: func}
  end

  @doc """
  Adds a filter for a record being updated or destroyed.

  Used by optimistic locking. See `Ash.Resource.Change.Builtins.optimistic_lock/1` for more.
  """
  @spec filter(t(), %{optional(atom) => term}) :: t()
  def filter(changeset, fields) do
    if Ash.DataLayer.data_layer_can?(changeset.resource, :changeset_filter) || fields == %{} do
      %{changeset | filters: Map.merge(changeset.filters, fields)}
    else
      IO.warn(
        "Filters (used by optimistic locking) is not supported in the #{inspect(Ash.DataLayer.data_layer(changeset.resource))} data layer"
      )

      changeset
    end
  end

  @doc """
  Adds an error to the changesets errors list, and marks the change as `valid?: false`.

  ## Error Data

  The given `errors` argument can be a string, a keyword list, a struct, or a list of any of the three.

  If `errors` is a keyword list, or a list of keyword lists, the following keys are supported in the keyword list:

  - `field` (atom) - the field that the error is for. This is required, unless `fields` is given.
  - `fields` (list of atoms) - the fields that the error is for. This is required, unless `field` is given.
  - `message` (string) - the error message
  - `value` (any) - (optional) the field value that caused the error
  """
  @spec add_error(t(), error_info() | [error_info()], Keyword.t()) :: t()
  @spec add_error(t(), term | String.t() | list(term | String.t())) :: t()
  def add_error(changeset, errors, path \\ [])

  def add_error(changeset, errors, path) when is_list(errors) do
    if Keyword.keyword?(errors) do
      errors
      |> to_change_error()
      |> Ash.Error.set_path(path)
      |> handle_error(changeset)
    else
      Enum.reduce(errors, changeset, &add_error(&2, &1, path))
    end
  end

  def add_error(changeset, error, path) when is_binary(error) do
    add_error(
      changeset,
      InvalidChanges.exception(message: error),
      path
    )
  end

  def add_error(changeset, %__MODULE__{errors: errors}, path) do
    add_error(changeset, errors, path)
  end

  def add_error(changeset, error, path) do
    error
    |> Ash.Error.set_path(path)
    |> handle_error(changeset)
  end

  defp handle_error(error, %{handle_errors: nil} = changeset) do
    %{changeset | valid?: false, errors: [error | changeset.errors]}
  end

  defp handle_error(error, changeset) do
    changeset
    |> changeset.handle_errors.(error)
    |> case do
      :ignore ->
        changeset

      {:ignore, changeset} ->
        changeset

      %__MODULE__{} = changeset ->
        %{changeset | valid?: false}

      {changeset, error} ->
        %{changeset | valid?: false, errors: [error | changeset.errors]}

      error ->
        %{changeset | valid?: false, errors: [error | changeset.errors]}
    end
  end

  defp to_change_error(keyword) do
    error =
      if keyword[:field] do
        InvalidAttribute.exception(
          field: keyword[:field],
          message: keyword[:message],
          value: keyword[:value],
          vars: keyword
        )
      else
        InvalidChanges.exception(
          fields: keyword[:fields] || [],
          message: keyword[:message],
          value: keyword[:value],
          vars: keyword
        )
      end

    if keyword[:path] do
      Ash.Error.set_path(error, keyword[:path])
    else
      error
    end
  end

  defp prepare_change(%{action_type: :create}, _attribute, value, _constraints), do: {:ok, value}

  defp prepare_change(changeset, attribute, value, constraints) do
    old_value = Map.get(changeset.data, attribute.name)
    Ash.Type.prepare_change(attribute.type, old_value, value, constraints)
  end

  defp handle_change(%{action_type: :create}, _attribute, value, _constraints), do: {:ok, value}

  defp handle_change(changeset, attribute, value, constraints) do
    old_value = Map.get(changeset.data, attribute.name)
    Ash.Type.handle_change(attribute.type, old_value, value, constraints)
  end

  defp add_invalid_errors(value, type, changeset, attribute, message \\ nil) do
    changeset = %{changeset | invalid_keys: MapSet.put(changeset.invalid_keys, attribute.name)}

    messages =
      if Keyword.keyword?(message) do
        [message]
      else
        List.wrap(message)
      end

    Enum.reduce(messages, changeset, fn message, changeset ->
      if is_exception(message) do
        error =
          message
          |> Ash.Error.to_ash_error()

        errors =
          case error do
            %class{errors: errors}
            when class in [
                   Ash.Error.Invalid,
                   Ash.Error.Unknown,
                   Ash.Error.Forbidden,
                   Ash.Error.Framework
                 ] ->
              errors

            error ->
              [error]
          end

        Enum.reduce(errors, changeset, fn error, changeset ->
          add_error(changeset, Ash.Error.set_path(error, attribute.name))
        end)
      else
        opts = Ash.Type.Helpers.error_to_exception_opts(message, attribute)

        exception =
          case type do
            :attribute -> InvalidAttribute
            :argument -> InvalidArgument
          end

        Enum.reduce(opts, changeset, fn opts, changeset ->
          error =
            exception.exception(
              value: value,
              field: Keyword.get(opts, :field),
              message: Keyword.get(opts, :message),
              vars: opts
            )

          error =
            if opts[:path] do
              Ash.Error.set_path(error, opts[:path])
            else
              error
            end

          add_error(changeset, error)
        end)
      end
    end)
  end

  defp set_phase(changeset, phase) when changeset.phase == phase, do: changeset

  defp set_phase(changeset, phase)
       when phase in ~w[validate before_action after_action before_transaction after_transaction around_action around_transaction]a,
       do: %{changeset | phase: phase}

  defp clear_phase(changeset), do: %{changeset | phase: :validate}
end