lib/data_layer.ex

defmodule AshPostgres.DataLayer do
  require Ecto.Query
  require Ash.Expr

  @manage_tenant %Spark.Dsl.Section{
    name: :manage_tenant,
    describe: """
    Configuration for the behavior of a resource that manages a tenant
    """,
    examples: [
      """
      manage_tenant do
        template ["organization_", :id]
        create? true
        update? false
      end
      """
    ],
    schema: [
      template: [
        type: {:wrap_list, {:or, [:string, :atom]}},
        required: true,
        doc: """
        A template that will cause the resource to create/manage the specified schema.
        """
      ],
      create?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically create a tenant when a record is created"
      ],
      update?: [
        type: :boolean,
        default: true,
        doc: "Whether or not to automatically update the tenant name if the record is udpated"
      ]
    ]
  }
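
  # Illustrative only (no such resource is defined in this file): with the
  # template ["organization_", :id] shown above, creating a record with id 123
  # creates the schema "organization_123"; with `update? true`, the schema is
  # renamed if the templated attribute changes on update.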

  @index %Spark.Dsl.Entity{
    name: :index,
    describe: """
    Add an index to be managed by the migration generator.
    """,
    examples: [
      "index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
    ],
    target: AshPostgres.CustomIndex,
    schema: AshPostgres.CustomIndex.schema(),
    transform: {AshPostgres.CustomIndex, :transform, []},
    args: [:fields]
  }

  @custom_indexes %Spark.Dsl.Section{
    name: :custom_indexes,
    describe: """
    A section for configuring indexes to be created by the migration generator.

    In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
    for declaring more complex indexes.
    """,
    examples: [
      """
      custom_indexes do
        index [:column1, :column2], unique: true, where: "thing = TRUE"
      end
      """
    ],
    entities: [
      @index
    ]
  }

  @statement %Spark.Dsl.Entity{
    name: :statement,
    describe: """
    Add a custom statement for migrations.
    """,
    examples: [
      """
      statement :pgweb_idx do
        up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
        down "DROP INDEX pgweb_idx;"
      end
      """
    ],
    target: AshPostgres.Statement,
    schema: AshPostgres.Statement.schema(),
    args: [:name]
  }

  @custom_statements %Spark.Dsl.Section{
    name: :custom_statements,
    describe: """
    A section for configuring custom statements to be added to migrations.

    Changing custom statements may require manual intervention, because Ash can't determine what order they should run
    in (i.e. if they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
    for custom statements happen first, and any `up` statements happen last.

    Additionally, when changing a custom statement, we must make some assumptions, i.e. that we should migrate
    the old structure down using the previously configured `down` and recreate it.

    This may not be what you want, in which case you can simply modify the old migration by hand and delete whatever was
    generated by the migration generator. As always: read your migrations after generating them!
    """,
    examples: [
      """
      custom_statements do
        # the name is used to detect if you remove or modify the statement
        statement :pgweb_idx do
          up "CREATE INDEX pgweb_idx ON pgweb USING GIN (to_tsvector('english', title || ' ' || body));"
          down "DROP INDEX pgweb_idx;"
        end
      end
      """
    ],
    entities: [
      @statement
    ]
  }

  @reference %Spark.Dsl.Entity{
    name: :reference,
    describe: """
    Configures the reference for a relationship in resource migrations.

    Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
    In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
    in an error, across this resource and any other resources that share a table with this one. For this reason,
    instead of adding a reference configuration for `:nothing`, it's best to just leave the configuration out, as that
    is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
    """,
    examples: [
      "reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
    ],
    args: [:relationship],
    target: AshPostgres.Reference,
    schema: AshPostgres.Reference.schema()
  }

  @references %Spark.Dsl.Section{
    name: :references,
    describe: """
    A section for configuring the references (foreign keys) in resource migrations.

    This section is only relevant if you are using the migration generator with this resource.
    Otherwise, it has no effect.
    """,
    examples: [
      """
      references do
        reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
      end
      """
    ],
    entities: [@reference],
    schema: [
      polymorphic_on_delete: [
        type:
          {:or,
           [
             {:one_of, [:delete, :nilify, :nothing, :restrict]},
             {:tagged_tuple, :nilify, {:wrap_list, :atom}}
           ]},
        doc:
          "For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_on_update: [
        type: {:one_of, [:update, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_name: [
        type: :string,
        doc:
          "For polymorphic resources, then index name to use for the foreign key to the source table."
      ]
    ]
  }
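
  # Illustrative polymorphic configuration (not tied to any resource in this
  # file). The tagged-tuple form limits the nilify to the listed columns:
  #
  #   references do
  #     polymorphic_on_delete {:nilify, [:source_id]}
  #     polymorphic_on_update :update
  #   end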

  @check_constraint %Spark.Dsl.Entity{
    name: :check_constraint,
    describe: """
    Add a check constraint to be validated.

    If a check constraint exists on the table but not in this section, and it produces an error, a runtime error will be raised.

    Provide a list of attributes instead of a single attribute to add the message to multiple attributes.

    If you add the `check` option, the migration generator will include the constraint when generating migrations.
    """,
    examples: [
      """
      check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
      """
    ],
    args: [:attribute, :name],
    target: AshPostgres.CheckConstraint,
    schema: AshPostgres.CheckConstraint.schema()
  }
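
  # Illustrative only: pass a list of attributes to attach the same message to
  # each of them, e.g.
  #
  #   check_constraint [:price, :quantity], "price_and_quantity_positive",
  #     check: "price > 0 AND quantity > 0",
  #     message: "price and quantity must be positive"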

  @check_constraints %Spark.Dsl.Section{
    name: :check_constraints,
    describe: """
    A section for configuring the check constraints for a given table.

    This can be used to automatically create those check constraints, or just to provide a message for when they are violated.
    """,
    examples: [
      """
      check_constraints do
        check_constraint :price, "price_must_be_positive", check: "price > 0", message: "price must be positive"
      end
      """
    ],
    entities: [@check_constraint]
  }

  @postgres %Spark.Dsl.Section{
    name: :postgres,
    describe: """
    Postgres data layer configuration
    """,
    sections: [
      @custom_indexes,
      @custom_statements,
      @manage_tenant,
      @references,
      @check_constraints
    ],
    modules: [
      :repo
    ],
    examples: [
      """
      postgres do
        repo MyApp.Repo
        table "organizations"
      end
      """
    ],
    schema: [
      repo: [
        type: {:or, [{:behaviour, Ecto.Repo}, {:fun, 2}]},
        required: true,
        doc:
          "The repo that will be used to fetch your data. See the `AshPostgres.Repo` documentation for more. Can also be a function that takes a resource and a type `:read | :mutate` and returns the repo"
      ],
      migrate?: [
        type: :boolean,
        default: true,
        doc:
          "Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
      ],
      storage_types: [
        type: :keyword_list,
        default: [],
        doc:
          "A keyword list of attribute names to the ecto type that should be used for that attribute. Only necessary if you need to override the defaults."
      ],
      migration_types: [
        type: :keyword_list,
        default: [],
        doc:
          "A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
      ],
      migration_defaults: [
        type: :keyword_list,
        default: [],
        doc: """
        A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\\\"now()\\\\")`, or for `nil`, use `\\\\"nil\\\\"`.
        """
      ],
      calculations_to_sql: [
        type: :keyword_list,
        doc:
          "A keyword list of calculations and their SQL representation. Used when creating unique indexes for identities over calculations"
      ],
      identity_wheres_to_sql: [
        type: :keyword_list,
        doc:
          "A keyword list of identity names and the SQL representation of their `where` clause. See `AshPostgres.DataLayer.Info.identity_wheres_to_sql/1` for more details."
      ],
      base_filter_sql: [
        type: :string,
        doc:
          "A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
      ],
      simple_join_first_aggregates: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of `:first` type aggregate names that can be joined to using a simple join. Use when you have a `:first` aggregate that uses a to-many relationship, but your `filter` statement ensures that there is only one result. Optimizes the generated query.
        """
      ],
      skip_unique_indexes: [
        type: {:wrap_list, :atom},
        default: [],
        doc: "Skip generating unique indexes when generating migrations"
      ],
      unique_index_names: [
        type:
          {:list,
           {:or,
            [{:tuple, [{:list, :atom}, :string]}, {:tuple, [{:list, :atom}, :string, :string]}]}},
        default: [],
        doc: """
        A list of unique index names that could raise errors that are not configured in identities, or an mfa to a function that takes a changeset and returns the list. In the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`
        """
      ],
      exclusion_constraint_names: [
        type: :any,
        default: [],
        doc: """
        A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
        """
      ],
      identity_index_names: [
        type: :any,
        default: [],
        doc: """
        A keyword list of identity names to the unique index name that they should use when being managed by the migration generator.
        """
      ],
      foreign_key_names: [
        type:
          {:list,
           {:or,
            [
              {:tuple, [{:or, [:atom, :string]}, :string]},
              {:tuple, [{:or, [:atom, :string]}, :string, :string]}
            ]}},
        default: [],
        doc: """
        A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns a list. In the format: `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
        """
      ],
      migration_ignore_attributes: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of attributes that will be ignored when generating migrations.
        """
      ],
      table: [
        type: :string,
        doc: """
        The table to store and read the resource from. If this is changed, the migration generator will not remove the old table.
        """
      ],
      schema: [
        type: :string,
        doc: """
        The schema that the table is located in. Schema-based multitenancy will supersede this option. If this is changed, the migration generator will not remove the old schema.
        """
      ],
      polymorphic?: [
        type: :boolean,
        default: false,
        doc: """
        Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/resources/polymorphic-resources.md) for more.
        """
      ]
    ]
  }
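
  # Illustrative only: `repo` can also be a two-arity function, e.g. to route
  # reads to a replica (MyApp.ReadReplicaRepo and MyApp.Repo are hypothetical):
  #
  #   postgres do
  #     repo fn
  #       _resource, :read -> MyApp.ReadReplicaRepo
  #       _resource, :mutate -> MyApp.Repo
  #     end
  #
  #     table "organizations"
  #   end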

  @behaviour Ash.DataLayer

  @sections [@postgres]

  @moduledoc """
  A postgres data layer that leverages Ecto's postgres capabilities.
  """

  use Spark.Dsl.Extension,
    sections: @sections,
    verifiers: [
      AshPostgres.Verifiers.PreventMultidimensionalArrayAggregates,
      AshPostgres.Verifiers.ValidateReferences,
      AshPostgres.Verifiers.PreventAttributeMultitenancyAndNonFullMatchType,
      AshPostgres.Verifiers.EnsureTableOrPolymorphic,
      AshPostgres.Verifiers.ValidateIdentityIndexNames
    ]
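
  # The functions below back the optional `Ash.DataLayer` codegen/setup hooks,
  # so umbrella tasks (e.g. `mix ash.codegen`, `mix ash.setup`, `mix ash.rollback`)
  # can delegate to the corresponding ash_postgres mix tasks.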

  def migrate(args) do
    Mix.Task.reenable("ash_postgres.migrate")
    Mix.Task.run("ash_postgres.migrate", args)
  end

  def rollback(args) do
    repos = AshPostgres.Mix.Helpers.repos!([], args)

    show_for_repo? = Enum.count_until(repos, 2) == 2

    for repo <- repos do
      {:ok, _, _} =
        Ecto.Migrator.with_repo(repo, fn repo ->
          for_repo =
            if show_for_repo? do
              " for repo #{inspect(repo)}"
            else
              ""
            end

          migrations_path = AshPostgres.Mix.Helpers.migrations_path([], repo)
          tenant_migrations_path = AshPostgres.Mix.Helpers.tenant_migrations_path([], repo)

          current_migrations =
            Ecto.Query.from(row in "schema_migrations",
              select: row.version
            )
            |> repo.all()
            |> Enum.map(&to_string/1)

          files =
            migrations_path
            |> Path.join("**/*.exs")
            |> Path.wildcard()
            |> Enum.sort()
            |> Enum.reverse()
            |> Enum.filter(fn file ->
              Enum.any?(current_migrations, &String.starts_with?(Path.basename(file), &1))
            end)
            |> Enum.take(20)
            |> Enum.map(&String.trim_leading(&1, migrations_path))
            |> Enum.with_index()
            |> Enum.map(fn {file, index} -> "#{index + 1}: #{file}" end)

          n =
            Mix.shell().prompt(
              """
              How many migrations should be rolled back#{for_repo}? (default: 0)

              Last 20 migration names, with the input you must provide to
              roll back up to *and including* that migration:

              #{Enum.join(files, "\n")}
              Rollback to:
              """
              |> String.trim_trailing()
            )
            |> String.trim()
            |> case do
              "" ->
                0

              n ->
                try do
                  String.to_integer(n)
                rescue
                  _ ->
                    reraise "Required an integer value, got: #{n}", __STACKTRACE__
                end
            end

          Mix.Task.run("ash_postgres.rollback", args ++ ["-r", inspect(repo), "-n", to_string(n)])
          Mix.Task.reenable("ash_postgres.rollback")

          tenant_files =
            tenant_migrations_path
            |> Path.join("**/*.exs")
            |> Path.wildcard()
            |> Enum.sort()
            |> Enum.reverse()

          if !Enum.empty?(tenant_files) do
            first_tenant = repo.all_tenants() |> Enum.at(0)

            if first_tenant do
              current_tenant_migrations =
                Ecto.Query.from(row in "schema_migrations",
                  select: row.version
                )
                |> repo.all(prefix: first_tenant)
                |> Enum.map(&to_string/1)

              tenant_files =
                tenant_files
                |> Enum.filter(fn file ->
                  Enum.any?(
                    current_tenant_migrations,
                    &String.starts_with?(Path.basename(file), &1)
                  )
                end)
                |> Enum.take(20)
                |> Enum.map(&String.trim_leading(&1, tenant_migrations_path))
                |> Enum.with_index()
                |> Enum.map(fn {file, index} -> "#{index + 1}: #{file}" end)

              n =
                Mix.shell().prompt(
                  """

                  How many _tenant_ migrations should be rolled back#{for_repo}? (default: 0)

                  IMPORTANT: we are assuming that all of your tenants have had the same migrations run.
                  If each tenant may be in a different state: *abort this command and roll them back individually*.
                  To do so, use the `--only-tenants` option to `mix ash_postgres.rollback`.

                  Last 20 migration names, with the input you must provide to
                  roll back up to *and including* that migration:

                  #{Enum.join(tenant_files, "\n")}

                  Rollback to:
                  """
                  |> String.trim_trailing()
                )
                |> String.trim()
                |> case do
                  "" ->
                    0

                  n ->
                    try do
                      String.to_integer(n)
                    rescue
                      _ ->
                        reraise "Required an integer value, got: #{n}", __STACKTRACE__
                    end
                end

              Mix.Task.run(
                "ash_postgres.rollback",
                args ++ ["--tenants", "-r", inspect(repo), "-n", to_string(n)]
              )

              Mix.Task.reenable("ash_postgres.rollback")
            end
          end
        end)
    end
  end

  def codegen(args) do
    Mix.Task.reenable("ash_postgres.generate_migrations")
    Mix.Task.run("ash_postgres.generate_migrations", args)
  end

  def setup(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_postgres.create", args)
    Mix.Task.run("ash_postgres.migrate", args)

    []
    |> AshPostgres.Mix.Helpers.repos!(args)
    |> Enum.all?(&(not has_tenant_migrations?(&1)))
    |> case do
      true ->
        :ok

      _ ->
        Mix.Task.run("ash_postgres.migrate", ["--tenant" | args])
    end
  end

  def tear_down(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_postgres.drop", args)
  end

  defp has_tenant_migrations?(repo) do
    []
    |> AshPostgres.Mix.Helpers.tenant_migrations_path(repo)
    |> Path.join("**/*.exs")
    |> Path.wildcard()
    |> Enum.empty?()
    |> Kernel.not()
  end

  import Ecto.Query, only: [from: 2, subquery: 1]

  @impl true
  def prefer_transaction?(resource) do
    AshPostgres.DataLayer.Info.repo(resource, :mutate).prefer_transaction?()
  end

  @impl true
  def prefer_transaction_for_atomic_updates?(resource) do
    AshPostgres.DataLayer.Info.repo(resource, :mutate).prefer_transaction_for_atomic_updates?()
  end

  @impl true
  def can?(_, :async_engine), do: true
  def can?(_, :combine), do: true
  def can?(_, {:combine, _}), do: true
  def can?(_, :bulk_create), do: true

  def can?(_, :action_select), do: true

  def can?(resource, :update_query) do
    # We can't currently support updating a record from a query
    # if that record manages a tenant on update
    !AshPostgres.DataLayer.Info.manage_tenant_update?(resource)
  end

  def can?(_, :destroy_query), do: true

  def can?(_, {:lock, :for_update}), do: true
  def can?(_, :composite_types), do: true

  def can?(_, {:lock, string}) do
    string = String.trim_trailing(string, " NOWAIT")

    String.upcase(string) in [
      "FOR UPDATE",
      "FOR NO KEY UPDATE",
      "FOR SHARE",
      "FOR KEY SHARE"
    ]
  end
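
  # Illustrative usage (the query is hypothetical): `Ash.Query.lock(query, "FOR SHARE NOWAIT")`
  # is supported because the trailing " NOWAIT" is stripped before matching
  # against the list above.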

  def can?(_, :transact), do: true
  def can?(_, :composite_primary_key), do: true

  def can?(resource, {:atomic, :update}),
    do: not AshPostgres.DataLayer.Info.repo(resource, :mutate).disable_atomic_actions?()

  def can?(resource, {:atomic, :upsert}),
    do: not AshPostgres.DataLayer.Info.repo(resource, :mutate).disable_atomic_actions?()

  def can?(_, :upsert), do: true
  def can?(_, :changeset_filter), do: true

  def can?(resource, {:join, other_resource}) do
    data_layer = Ash.DataLayer.data_layer(resource)
    other_data_layer = Ash.DataLayer.data_layer(other_resource)

    data_layer == other_data_layer and
      AshPostgres.DataLayer.Info.repo(resource, :read) ==
        AshPostgres.DataLayer.Info.repo(other_resource, :read)
  end

  def can?(resource, {:lateral_join, resources}) do
    repo = AshPostgres.DataLayer.Info.repo(resource, :read)
    data_layer = Ash.DataLayer.data_layer(resource)

    data_layer == __MODULE__ &&
      Enum.all?(resources, fn resource ->
        Ash.DataLayer.data_layer(resource) == data_layer &&
          AshPostgres.DataLayer.Info.repo(resource, :read) == repo
      end)
  end

  def can?(_, :boolean_filter), do: true

  def can?(_, {:aggregate, type})
      when type in [:count, :sum, :first, :list, :avg, :max, :min, :exists, :custom],
      do: true

  def can?(_, :aggregate_filter), do: true
  def can?(_, :aggregate_sort), do: true
  def can?(_, :calculate), do: true
  def can?(_, :expression_calculation), do: true
  def can?(_, :expression_calculation_sort), do: true
  def can?(_, :create), do: true
  def can?(_, :select), do: true
  def can?(_, :read), do: true

  def can?(resource, action) when action in ~w[update destroy]a do
    resource
    |> Ash.Resource.Info.primary_key()
    |> Enum.any?()
  end

  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :multitenancy), do: true

  def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
    Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
  end

  def can?(_, {:filter_relationship, _}), do: true

  def can?(_, {:aggregate_relationship, %{manual: {module, _}}}) do
    Spark.implements_behaviour?(module, AshPostgres.ManualRelationship)
  end

  def can?(_, {:aggregate_relationship, _}), do: true

  def can?(_, :timeout), do: true

  def can?(resource, :expr_error),
    do: not AshPostgres.DataLayer.Info.repo(resource, :mutate).disable_expr_error?()

  def can?(resource, {:filter_expr, %Ash.Query.Function.Error{}}) do
    not AshPostgres.DataLayer.Info.repo(resource, :mutate).disable_expr_error?() &&
      "ash-functions" in AshPostgres.DataLayer.Info.repo(resource, :read).installed_extensions() &&
      "ash-functions" in AshPostgres.DataLayer.Info.repo(resource, :mutate).installed_extensions()
  end

  def can?(_, {:filter_expr, _}), do: true
  def can?(_, :nested_expressions), do: true
  def can?(_, {:query_aggregate, _}), do: true
  def can?(_, :sort), do: true
  def can?(_, :distinct_sort), do: true
  def can?(_, :distinct), do: true
  def can?(_, {:sort, _}), do: true
  def can?(_, _), do: false

  @impl true
  def in_transaction?(resource) do
    AshPostgres.DataLayer.Info.repo(resource, :mutate).in_transaction?()
  end

  @impl true
  def limit(query, nil, _), do: {:ok, query}

  def limit(query, limit, _resource) do
    {:ok, from(row in query, limit: ^limit)}
  end

  @impl true
  def source(resource) do
    AshPostgres.DataLayer.Info.table(resource) || ""
  end

  @impl true
  def set_context(resource, data_layer_query, context) do
    AshSql.Query.set_context(resource, data_layer_query, AshPostgres.SqlImplementation, context)
  end

  @impl true
  def offset(query, nil, _), do: {:ok, query}

  def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
    {:ok, query}
  end

  def offset(query, offset, _resource) do
    {:ok, from(row in query, offset: ^offset)}
  end

  @impl true
  def return_query(query, resource) do
    query
    |> AshSql.Bindings.default_bindings(resource, AshPostgres.SqlImplementation)
    |> AshSql.Query.return_query(resource)
  end

  @impl true
  def run_query(query, resource) do
    query = AshSql.Bindings.default_bindings(query, resource, AshPostgres.SqlImplementation)

    if AshPostgres.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
      raise_table_error!(resource, :read)
    else
      repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, query)

      with_savepoint(repo, query, fn ->
        repo.all(
          query,
          AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, nil, resource)
        )
        |> AshSql.Query.remap_mapped_fields(query)
        |> then(fn results ->
          if query.__ash_bindings__.context[:data_layer][:combination_of_queries?] do
            Enum.map(results, fn result ->
              struct(resource, result)
              |> Map.put(:__meta__, %Ecto.Schema.Metadata{
                state: :loaded
              })
            end)
          else
            results
          end
        end)
        |> then(&{:ok, &1})
      end)
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, query, resource)
  end

  defp no_table?(%{from: %{source: {"", _}}}), do: true
  defp no_table?(_), do: false

  @impl true
  def functions(resource) do
    config = AshPostgres.DataLayer.Info.repo(resource, :mutate).config()

    functions = [
      AshPostgres.Functions.Like,
      AshPostgres.Functions.ILike,
      AshPostgres.Functions.Binding
    ]

    functions =
      if "pg_trgm" in (config[:installed_extensions] || []) do
        functions ++
          [
            AshPostgres.Functions.TrigramSimilarity
          ]
      else
        functions
      end

    if "vector" in (config[:installed_extensions] || []) do
      functions ++
        [
          AshPostgres.Functions.VectorCosineDistance,
          AshPostgres.Functions.VectorL2Distance
        ]
    else
      functions
    end
  end

  @impl true
  def run_aggregate_query(original_query, aggregates, resource) do
    AshSql.AggregateQuery.run_aggregate_query(
      original_query,
      aggregates,
      resource,
      AshPostgres.SqlImplementation
    )
  end

  @impl true
  def set_tenant(_resource, query, tenant) do
    {:ok, Map.put(Ecto.Query.put_query_prefix(query, to_string(tenant)), :__tenant__, tenant)}
  end

  @impl true
  def run_aggregate_query_with_lateral_join(
        query,
        aggregates,
        root_data,
        destination_resource,
        path
      ) do
    {can_group, cant_group} =
      aggregates
      |> Enum.split_with(&AshSql.Aggregate.can_group?(destination_resource, &1, query))
      |> case do
        {[one], cant_group} -> {[], [one | cant_group]}
        {can_group, cant_group} -> {can_group, cant_group}
      end

    case lateral_join_query(
           query,
           root_data,
           path
         ) do
      {:ok, lateral_join_query} ->
        source_resource =
          path
          |> Enum.at(0)
          |> elem(0)
          |> Map.get(:resource)

        subquery = from(row in subquery(lateral_join_query), as: ^0, select: %{})

        subquery =
          AshSql.Bindings.default_bindings(
            subquery,
            source_resource,
            AshPostgres.SqlImplementation
          )

        {global_filter, can_group} =
          AshSql.Aggregate.extract_shared_filters(can_group)

        original_subquery = subquery

        subquery =
          case global_filter do
            {:ok, global_filter} ->
              filter(subquery, global_filter, destination_resource)

            :error ->
              {:ok, subquery}
          end

        case subquery do
          {:error, error} ->
            {:error, error}

          {:ok, subquery} ->
            query =
              Enum.reduce(
                can_group,
                subquery,
                fn agg, subquery ->
                  has_exists? =
                    Ash.Filter.find(agg.query && agg.query.filter, fn
                      %Ash.Query.Exists{} -> true
                      _ -> false
                    end)

                  first_relationship =
                    Ash.Resource.Info.relationship(
                      source_resource,
                      agg.relationship_path |> Enum.at(0)
                    )

                  AshSql.Aggregate.add_subquery_aggregate_select(
                    subquery,
                    agg.relationship_path |> Enum.drop(1),
                    agg,
                    destination_resource,
                    has_exists?,
                    first_relationship
                  )
                end
              )

            result =
              case can_group do
                [] ->
                  %{}

                _ ->
                  repo =
                    AshSql.dynamic_repo(source_resource, AshPostgres.SqlImplementation, query)

                  repo.one(
                    query,
                    AshSql.repo_opts(
                      repo,
                      AshPostgres.SqlImplementation,
                      nil,
                      nil,
                      source_resource
                    )
                  )
              end

            {:ok,
             AshSql.AggregateQuery.add_single_aggs(
               result,
               source_resource,
               original_subquery,
               cant_group,
               AshPostgres.SqlImplementation
             )}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def run_query_with_lateral_join(
        query,
        root_data,
        _destination_resource,
        path
      ) do
    {calculations_require_rewrite, aggregates_require_rewrite, query} =
      AshSql.Query.rewrite_nested_selects(query)

    case lateral_join_query(
           query,
           root_data,
           path
         ) do
      {:ok, lateral_join_query} ->
        source_resource =
          path
          |> Enum.at(0)
          |> elem(0)
          |> Map.get(:resource)

        repo =
          AshSql.dynamic_repo(source_resource, AshPostgres.SqlImplementation, lateral_join_query)

        # patching strange behavior that sets `take` to this empty list even though I'm not telling it to
        lateral_join_query =
          case lateral_join_query do
            %{select: %{take: %{0 => {:map, []}}}} -> put_in(lateral_join_query.select.take, %{})
            _ -> lateral_join_query
          end

        results =
          repo.all(
            lateral_join_query,
            AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, nil, source_resource)
          )
          |> AshSql.Query.remap_mapped_fields(
            query,
            calculations_require_rewrite,
            aggregates_require_rewrite
          )

        {:ok, results}

      {:error, error} ->
        {:error, error}
    end
  end

  defp lateral_join_query(
         query,
         root_data,
         [{source_query, source_attribute, destination_attribute, relationship}] = path
       ) do
    source_query = Ash.Query.new(source_query)

    base_query =
      if query.__ash_bindings__[:__order__?] do
        from(row in query,
          select_merge: %{__order__: over(row_number(), :order)}
        )
      else
        query
      end

    base_query =
      if Map.get(relationship, :from_many?) do
        from(row in base_query, limit: 1)
      else
        base_query
      end

    base_query =
      cond do
        Map.get(relationship, :manual) ->
          {module, opts} = relationship.manual

          case module.ash_postgres_subquery(opts, 0, 0, base_query) do
            {:ok, subquery} ->
              subquery

            {:error, error} ->
              {:error, error}

            subquery ->
              subquery
          end

        Map.get(relationship, :no_attributes?) ->
          base_query

        true ->
          from(destination in base_query,
            where:
              field(destination, ^destination_attribute) ==
                field(parent_as(^0), ^source_attribute)
          )
      end

    subquery =
      base_query
      |> set_subquery_prefix(source_query, relationship.destination)
      |> subquery()

    source_pkey = Ash.Resource.Info.primary_key(source_query.resource)

    case lateral_join_source_query(query, source_query, root_data, path) do
      {:ok, data_layer_query} ->
        source_values = Enum.map(root_data, &Map.get(&1, source_attribute))

        data_layer_query =
          if Map.get(relationship, :manual) || Map.get(relationship, :no_attributes?) do
            data_layer_query
          else
            source_filter =
              case source_pkey do
                [] ->
                  Ecto.Query.dynamic([source], field(source, ^source_attribute) in ^source_values)

                [field] ->
                  values = Enum.map(root_data, &Map.get(&1, field))
                  Ecto.Query.dynamic([source], field(source, ^field) in ^values)

                fields ->
                  Enum.reduce(root_data, nil, fn record, acc ->
                    row_match =
                      Enum.reduce(fields, nil, fn field, acc ->
                        if is_nil(acc) do
                          Ecto.Query.dynamic(
                            [source],
                            field(source, ^field) == ^Map.get(record, field)
                          )
                        else
                          Ecto.Query.dynamic(
                            [source],
                            field(source, ^field) == ^Map.get(record, field) and ^acc
                          )
                        end
                      end)

                    if is_nil(acc) do
                      row_match
                    else
                      Ecto.Query.dynamic(^row_match or ^acc)
                    end
                  end)
              end

            from(source in data_layer_query,
              where: ^source_filter
            )
          end

        data_layer_query =
          data_layer_query
          |> Ecto.Query.exclude(:distinct)
          |> Ecto.Query.exclude(:select)

        if query.__ash_bindings__[:__order__?] do
          {:ok,
           from(source in data_layer_query,
             inner_lateral_join: destination in ^subquery,
             on: true,
             order_by: destination.__order__,
             select: merge(destination, %{__lateral_join_source__: map(source, ^source_pkey)}),
             distinct: true
           )}
        else
          {:ok,
           from(source in data_layer_query,
             inner_lateral_join: destination in ^subquery,
             on: true,
             select: merge(destination, %{__lateral_join_source__: map(source, ^source_pkey)}),
             distinct: true
           )}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp lateral_join_query(
         query,
         root_data,
         [
           {source_query, source_attribute, source_attribute_on_join_resource, relationship},
           {through_resource, destination_attribute_on_join_resource, destination_attribute,
            through_relationship}
         ] = path
       ) do
    source_query = Ash.Query.new(source_query)
    source_values = Enum.map(root_data, &Map.get(&1, source_attribute))
    source_pkey = Ash.Resource.Info.primary_key(source_query.resource)

    case lateral_join_source_query(query, source_query, root_data, path) do
      {:ok, data_layer_query} ->
        data_layer_query = Ecto.Query.exclude(data_layer_query, :select)

        through_binding = Map.get(query, :__ash_bindings__)[:current]

        through_resource
        |> Ash.Query.new()
        |> Ash.Query.put_context(:data_layer, %{
          start_bindings_at: through_binding
        })
        |> Ash.Query.set_context(through_relationship.context)
        |> Ash.Query.do_filter(through_relationship.filter)
        |> Ash.Query.set_tenant(source_query.tenant)
        |> set_lateral_join_prefix(query)
        |> case do
          %{valid?: true} = through_query ->
            through_query
            |> Ash.Query.data_layer_query()

          query ->
            {:error, query}
        end
        |> case do
          {:ok, through_query} ->
            through_query = Ecto.Query.exclude(through_query, :select)

            through_query =
              if through_query.joins && through_query.joins != [] do
                subquery(
                  set_subquery_prefix(
                    through_query,
                    source_query,
                    relationship.through
                  )
                )
              else
                through_query
              end

            if query.__ash_bindings__[:__order__?] do
              subquery =
                subquery(
                  from(
                    destination in query,
                    select_merge: %{__order__: over(row_number(), :order)},
                    join: through in ^through_query,
                    as: ^through_binding,
                    on:
                      field(through, ^destination_attribute_on_join_resource) ==
                        field(destination, ^destination_attribute),
                    where:
                      field(through, ^source_attribute_on_join_resource) ==
                        field(
                          parent_as(^0),
                          ^source_attribute
                        )
                  )
                  |> set_subquery_prefix(
                    source_query,
                    relationship.destination
                  )
                )

              {:ok,
               from(source in data_layer_query,
                 where: field(source, ^source_attribute) in ^source_values,
                 inner_lateral_join: destination in ^subquery,
                 on: true,
                 select: destination,
                 select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
                 order_by: destination.__order__,
                 distinct: true
               )}
            else
              subquery =
                subquery(
                  from(
                    destination in query,
                    join: through in ^through_query,
                    as: ^through_binding,
                    on:
                      field(through, ^destination_attribute_on_join_resource) ==
                        field(destination, ^destination_attribute),
                    where:
                      field(through, ^source_attribute_on_join_resource) ==
                        field(
                          parent_as(^0),
                          ^source_attribute
                        )
                  )
                  |> set_subquery_prefix(
                    source_query,
                    relationship.destination
                  )
                )

              data_layer_query = Ecto.Query.exclude(data_layer_query, :distinct)

              {:ok,
               from(source in data_layer_query,
                 where: field(source, ^source_attribute) in ^source_values,
                 inner_lateral_join: destination in ^subquery,
                 on: true,
                 select: destination,
                 select_merge: %{__lateral_join_source__: map(source, ^source_pkey)},
                 distinct: true
               )}
            end

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp lateral_join_source_query(
         %{
           __ash_bindings__: %{
             lateral_join_source_query: lateral_join_source_query
           }
         },
         source_query,
         _root_data,
         _path
       )
       when not is_nil(lateral_join_source_query) do
    {:ok,
     lateral_join_source_query
     |> set_subquery_prefix(source_query, lateral_join_source_query.__ash_bindings__.resource)}
  end

  defp lateral_join_source_query(query, source_query, root_data, path) do
    source_query.resource
    |> Ash.Query.set_context(%{:data_layer => source_query.context[:data_layer]})
    |> Ash.Query.set_context(%{
      :data_layer =>
        Map.put(
          source_query.context[:data_layer] || %{},
          :no_inner_join?,
          true
        )
        |> Map.delete(:lateral_join_source)
    })
    |> Ash.Query.set_tenant(source_query.tenant)
    |> filter_for_records(root_data)
    |> set_lateral_join_prefix(query)
    |> case do
      %{valid?: true} = query ->
        relationship = path |> List.first() |> elem(3)

        {:ok, expr} =
          Ash.Filter.hydrate_refs(relationship.filter, %{
            resource: relationship.destination,
            parent_stack: [relationship.source]
          })

        parent_expr = AshSql.Join.parent_expr(expr)

        used_aggregates =
          Ash.Filter.used_aggregates(parent_expr, [])

        with {:ok, query} <- Ash.Query.data_layer_query(query) do
          AshSql.Aggregate.add_aggregates(
            query,
            used_aggregates,
            relationship.source,
            false,
            query.__ash_bindings__.root_binding
          )
        end

      query ->
        {:error, query}
    end
  end

  defp filter_for_records(query, records) do
    keys =
      case Ash.Resource.Info.primary_key(query.resource) do
        [] ->
          case Ash.Resource.Info.identities(query.resource) do
            [%{keys: keys} | _] -> keys
            _ -> []
          end

        pkey ->
          pkey
      end

    expr =
      case keys do
        [] ->
          raise "Cannot use lateral joins with a resource that has no primary key and no identities"

        [key] ->
          Ash.Expr.expr(^Ash.Expr.ref(key) in ^Enum.map(records, &Map.get(&1, key)))

        keys ->
          Enum.reduce(records, Ash.Expr.expr(false), fn record, filter_expr ->
            all_keys_match_expr =
              Enum.reduce(keys, Ash.Expr.expr(true), fn key, key_expr ->
                Ash.Expr.expr(^key_expr and ^Ash.Expr.ref(key) == ^Map.get(record, key))
              end)

            Ash.Expr.expr(^filter_expr or ^all_keys_match_expr)
          end)
      end

    Ash.Query.do_filter(query, expr)
  end

  @doc false
  def set_subquery_prefix(data_layer_query, source_query, resource) do
    repo = AshPostgres.DataLayer.Info.repo(resource, :mutate)
    config = repo.config()

    case data_layer_query do
      %{__ash_bindings__: %{context: %{data_layer: %{schema: schema}}}} when not is_nil(schema) ->
        data_layer_query

      _ ->
        query_tenant =
          case source_query do
            %{__tenant__: tenant} -> tenant
            %{tenant: tenant} -> Ash.ToTenant.to_tenant(tenant, resource)
            _ -> nil
          end

        if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
          %{
            data_layer_query
            | prefix:
                query_tenant || AshPostgres.DataLayer.Info.schema(resource) ||
                  config[:default_prefix] ||
                  "public"
          }
        else
          %{
            data_layer_query
            | prefix:
                AshPostgres.DataLayer.Info.schema(resource) || config[:default_prefix] ||
                  "public"
          }
        end
    end
  end

  defp set_lateral_join_prefix(ash_query, query) do
    if Ash.Resource.Info.multitenancy_strategy(ash_query.resource) == :context do
      Ash.Query.set_tenant(ash_query, query.prefix)
    else
      ash_query
    end
  end

  @impl true
  def resource_to_query(resource, domain) do
    AshSql.Query.resource_to_query(resource, AshPostgres.SqlImplementation, domain)
  end

  @impl true
  def combination_of(combination_of, resource, domain) do
    AshSql.Query.combination_of(combination_of, resource, domain, AshPostgres.SqlImplementation)
  end

  @impl true
  def update_query(query, changeset, resource, options) do
    repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)

    ecto_changeset =
      case changeset.data do
        %Ash.Changeset.OriginalDataNotAvailable{} ->
          changeset.resource.__struct__()

        data ->
          data
      end
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :update, repo, true)

    case bulk_updatable_query(
           query,
           resource,
           changeset.atomics,
           options[:calculations] || [],
           changeset.context
         ) do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        try do
          repo_opts =
            AshSql.repo_opts(
              repo,
              AshPostgres.SqlImplementation,
              changeset.timeout,
              changeset.tenant,
              changeset.resource
            )

          case AshSql.Atomics.query_with_atomics(
                 resource,
                 query,
                 changeset.filter,
                 changeset.atomics,
                 ecto_changeset.changes,
                 []
               ) do
            {:empty, query} ->
              if options[:return_records?] do
                if changeset.context[:data_layer][:use_atomic_update_data?] do
                  case query.__ash_bindings__ do
                    %{expression_accumulator: %AshSql.Expr.ExprInfo{has_error?: true}} ->
                      # if the query could produce an error
                      # we must run it even if we will just be returning the original data.
                      repo.all(query, repo_opts)

                    _ ->
                      :ok
                  end

                  {:ok, [changeset.data]}
                else
                  {:ok, repo.all(query, repo_opts)}
                end
              else
                :ok
              end

            {:ok, query} ->
              query =
                if options[:return_records?] do
                  {:ok, query} =
                    if options[:action_select] do
                      query
                      |> Ecto.Query.exclude(:select)
                      |> Ecto.Query.select([row], struct(row, ^options[:action_select]))
                      |> add_calculations(options[:calculations] || [], resource)
                    else
                      query
                      |> Ecto.Query.exclude(:select)
                      |> Ecto.Query.select([row], row)
                      |> add_calculations(options[:calculations] || [], resource)
                    end

                  query
                else
                  Ecto.Query.exclude(query, :select)
                end

              {_, results} =
                with_savepoint(repo, query, fn ->
                  repo.update_all(
                    Map.delete(query, :__ash_bindings__),
                    [],
                    repo_opts
                  )
                end)

              if options[:return_records?] do
                results = AshSql.Query.remap_mapped_fields(results, query)

                if changeset.context[:data_layer][:use_atomic_update_data?] &&
                     Enum.count_until(results, 2) == 1 do
                  modifying =
                    Map.keys(changeset.attributes) ++
                      Keyword.keys(changeset.atomics) ++ Ash.Resource.Info.primary_key(resource)

                  result = hd(results)

                  Map.merge(changeset.data, Map.take(result, modifying))
                  |> Map.update!(:aggregates, &Map.merge(&1, result.aggregates))
                  |> Map.update!(:calculations, &Map.merge(&1, result.calculations))
                  |> then(&{:ok, [&1]})
                else
                  {:ok, results}
                end
              else
                :ok
              end

            {:error, error} ->
              {:error, error}
          end
        rescue
          e ->
            handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
        end
    end
  end

  defp bulk_updatable_query(query, resource, atomics, calculations, context, type \\ :update) do
    Enum.reduce_while(atomics, {:ok, query}, fn {_, expr}, {:ok, query} ->
      used_aggregates =
        Ash.Filter.used_aggregates(expr, [])

      with {:ok, query} <-
             AshSql.Join.join_all_relationships(
               query,
               %Ash.Filter{
                 resource: resource,
                 expression: expr
               },
               left_only?: true
             ),
           {:ok, query} <-
             AshSql.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0) do
        {:cont, {:ok, query}}
      else
        {:error, error} ->
          {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, query} ->
        requires_adding_inner_join? =
          case type do
            :update ->
              # could potentially optimize this to avoid the subquery by shuffling free
              # inner joins to the top of the query
              has_inner_join_to_start? =
                case Enum.at(query.joins, 0) do
                  nil ->
                    false

                  %{qual: :inner} ->
                    true

                  _ ->
                    false
                end

              cond do
                has_inner_join_to_start? ->
                  false

                Enum.any?(query.joins, &(&1.qual != :inner)) ->
                  true

                Enum.any?(atomics ++ calculations, fn {_, expr} ->
                  Ash.Filter.list_refs(expr) |> Enum.any?(&(&1.relationship_path != []))
                end) ->
                  true

                true ->
                  false
              end

            :destroy ->
              Enum.any?(query.joins, &(&1.qual != :inner)) ||
                Enum.any?(atomics ++ calculations, fn {_, expr} ->
                  expr |> Ash.Filter.list_refs() |> Enum.any?(&(&1.relationship_path != []))
                end)
          end

        has_exists? =
          Enum.any?(atomics, fn {_key, expr} ->
            Ash.Filter.find(expr, fn
              %Ash.Query.Exists{} ->
                true

              _ ->
                false
            end)
          end)

        needs_to_join? =
          requires_adding_inner_join? || query.distinct ||
            query.limit || query.offset || has_exists? || query.combinations != []

        query =
          if needs_to_join? do
            root_query = Ecto.Query.exclude(query, :select)

            root_query_result =
              cond do
                query.limit || query.offset ->
                  with {:ok, root_query} <-
                         AshSql.Atomics.select_atomics(resource, root_query, atomics) do
                    {:ok, from(row in Ecto.Query.subquery(root_query), []),
                     root_query.__ash_bindings__.expression_accumulator, atomics != []}
                  end

                !Enum.empty?(query.joins) || has_exists? ->
                  with root_query <- Ecto.Query.exclude(root_query, :order_by),
                       {:ok, root_query} <-
                         AshSql.Atomics.select_atomics(resource, root_query, atomics) do
                    {:ok, from(row in Ecto.Query.subquery(root_query), []),
                     root_query.__ash_bindings__.expression_accumulator, atomics != []}
                  end

                true ->
                  {:ok, Ecto.Query.exclude(root_query, :order_by),
                   Map.get(root_query, :__ash_bindings__).expression_accumulator, false}
              end

            case root_query_result do
              {:ok, root_query, acc, selected_atomics?} ->
                dynamic =
                  Enum.reduce(Ash.Resource.Info.primary_key(resource), nil, fn pkey, dynamic ->
                    if dynamic do
                      Ecto.Query.dynamic(
                        [row, joining],
                        field(row, ^pkey) == field(joining, ^pkey) and ^dynamic
                      )
                    else
                      Ecto.Query.dynamic(
                        [row, joining],
                        field(row, ^pkey) == field(joining, ^pkey)
                      )
                    end
                  end)

                faked_query =
                  from(row in query.from.source,
                    inner_join: limiter in ^root_query,
                    as: ^0,
                    on: ^dynamic
                  )
                  |> AshSql.Bindings.default_bindings(
                    query.__ash_bindings__.resource,
                    AshPostgres.SqlImplementation,
                    context
                  )
                  |> AshSql.Bindings.merge_expr_accumulator(acc)
                  |> then(fn query ->
                    if selected_atomics? do
                      Map.update!(query, :__ash_bindings__, &Map.put(&1, :atomics_in_binding, 0))
                    else
                      query
                    end
                  end)

                {:ok, faked_query}

              {:error, error} ->
                {:error, error}
            end
          else
            {:ok,
             query
             |> Ecto.Query.exclude(:select)
             |> Ecto.Query.exclude(:order_by)}
          end

        case query do
          {:ok, query} ->
            Enum.reduce_while(calculations, {:ok, query}, fn {_, expr}, {:ok, query} ->
              used_aggregates =
                Ash.Filter.used_aggregates(expr, [])

              with {:ok, query} <-
                     AshSql.Join.join_all_relationships(
                       query,
                       %Ash.Filter{
                         resource: resource,
                         expression: expr
                       },
                       left_only?: true
                     ),
                   {:ok, query} <-
                     AshSql.Aggregate.add_aggregates(query, used_aggregates, resource, false, 0) do
                {:cont, {:ok, query}}
              else
                {:error, error} ->
                  {:halt, {:error, error}}
              end
            end)

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def destroy_query(query, changeset, resource, options) do
    repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)

    ecto_changeset =
      case changeset.data do
        %Ash.Changeset.OriginalDataNotAvailable{} ->
          changeset.resource.__struct__()

        data ->
          data
      end
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :delete, repo, true)

    case bulk_updatable_query(
           query,
           resource,
           [],
           options[:calculations] || [],
           changeset.context,
           :destroy
         ) do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
        try do
          repo_opts =
            AshSql.repo_opts(
              repo,
              AshPostgres.SqlImplementation,
              changeset.timeout,
              changeset.tenant,
              changeset.resource
            )

          query =
            if options[:return_records?] do
              {:ok, query} =
                case options[:action_select] do
                  nil ->
                    query
                    |> Ecto.Query.exclude(:select)
                    |> Ecto.Query.select([row], row)
                    |> add_calculations(options[:calculations] || [], resource)

                  action_select ->
                    query
                    |> Ecto.Query.exclude(:select)
                    |> Ecto.Query.select([row], struct(row, ^action_select))
                    |> add_calculations(options[:calculations] || [], resource)
                end

              query
            else
              Ecto.Query.exclude(query, :select)
            end

          # query = Ecto.Query.exclude(query, :distinct)

          {_, results} =
            with_savepoint(repo, query, fn ->
              repo.delete_all(
                query,
                repo_opts
              )
            end)

          if options[:return_records?] do
            {:ok, AshSql.Query.remap_mapped_fields(results, query)}
          else
            :ok
          end
        rescue
          e ->
            handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
        end
    end
  end

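  # Data layer callback for standalone expression evaluation. Each expression is
  # compiled to an Ecto dynamic and selected (keyed by index) from a single-row
  # `(VALUES(1))` source, so no resource table is queried. Results are returned
  # in the same order as the given expressions.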
  @impl true
  def calculate(resource, expressions, context) do
    ash_query =
      resource
      |> Ash.Query.new()
      |> Map.put(:context, context)

    {:ok, query} = Ash.Query.data_layer_query(ash_query)

    query =
      AshSql.Bindings.default_bindings(query, resource, AshPostgres.SqlImplementation)

    try do
      {dynamics, query} =
        Enum.reduce(expressions, {[], query}, fn expression, {dynamics, query} ->
          {dynamic, acc} = AshSql.Expr.dynamic_expr(query, expression, query.__ash_bindings__)

          dynamic =
            case dynamic do
              %Ecto.Query.DynamicExpr{} ->
                dynamic

              other ->
                Ecto.Query.dynamic(^other)
            end

          {[dynamic | dynamics], AshSql.Bindings.merge_expr_accumulator(query, acc)}
        end)

      dynamics =
        dynamics
        |> Enum.with_index()
        |> Map.new(fn {dynamic, index} -> {index, dynamic} end)

      query =
        Ecto.Query.from(row in fragment("(VALUES(1))"), select: ^dynamics)
        |> Map.put(:__ash_bindings__, query.__ash_bindings__)

      repo =
        AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, ash_query)

      with_savepoint(repo, query, fn ->
        {:ok,
         repo.one(query) |> Enum.sort_by(&elem(&1, 0)) |> Enum.map(&elem(&1, 1)) |> Enum.reverse()}
      end)
    rescue
      e ->
        handle_raised_error(e, __STACKTRACE__, query, resource)
    end
  end

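  # Data layer callback for bulk creates. Materializes the stream of changesets,
  # configures `:returning` from `action_select` when records should be returned,
  # and, for upserts, builds the `ON CONFLICT` update (via `upsert_set/4` and
  # `AshSql.Atomics.query_with_atomics/6`) along with the conflict target. The
  # inserts run through `repo.insert_all/3` inside a savepoint, and returned rows
  # are zipped back with their changesets to attach `:bulk_create_index` metadata.
  #
  # A sketch of how this is typically reached (resource and inputs hypothetical):
  #
  #     Ash.bulk_create!([%{title: "a"}, %{title: "b"}], MyApp.Post, :create)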
  @impl true
  def bulk_create(resource, stream, options) do
    changesets = Enum.to_list(stream)

    repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, Enum.at(changesets, 0))

    opts = AshSql.repo_opts(repo, AshPostgres.SqlImplementation, nil, options[:tenant], resource)

    opts =
      if options.return_records? do
        returning =
          case options[:action_select] do
            nil -> true
            [] -> Ash.Resource.Info.primary_key(resource)
            fields -> fields
          end

        Keyword.put(opts, :returning, returning)
      else
        opts
      end

    source = resolve_source(resource, Enum.at(changesets, 0))

    try do
      opts =
        if options[:upsert?] do
          # Ash groups changesets by their atomics before dispatching them to the
          # data layer, so every changeset in this batch has the same atomics.
          %{atomics: atomics, filter: filter} = Enum.at(changesets, 0)

          query = from(row in source, as: ^0)

          query =
            query
            |> AshSql.Bindings.default_bindings(resource, AshPostgres.SqlImplementation)

          upsert_set =
            upsert_set(
              resource,
              changesets,
              options[:upsert_keys] || Ash.Resource.Info.primary_key(resource),
              options
            )

          on_conflict =
            case AshSql.Atomics.query_with_atomics(
                   resource,
                   query,
                   filter,
                   atomics,
                   %{},
                   upsert_set
                 ) do
              {:empty, _query} ->
                raise "Cannot upsert with no fields to specify in the upsert statement. This can only happen on resources without a primary key."

              {:ok, query} ->
                query

              {:error, error} ->
                raise Ash.Error.to_ash_error(error)
            end

          opts
          |> Keyword.put(:on_conflict, on_conflict)
          |> Keyword.put(
            :conflict_target,
            conflict_target(
              resource,
              options[:identity],
              options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
            )
          )
        else
          opts
        end

      ecto_changesets = Enum.map(changesets, & &1.attributes)

      opts =
        if schema = Enum.at(changesets, 0).context[:data_layer][:schema] do
          Keyword.put(opts, :prefix, schema)
        else
          opts
        end

      result =
        with_savepoint(repo, opts[:on_conflict], fn ->
          repo.insert_all(source, ecto_changesets, opts)
        end)

      case result do
        {_, nil} ->
          :ok

        {_, results} ->
          if options[:single?] do
            Enum.each(results, &maybe_create_tenant!(resource, &1))

            {:ok, results}
          else
            {:ok,
             Stream.zip_with(results, changesets, fn result, changeset ->
               if !options[:upsert?] do
                 maybe_create_tenant!(resource, result)
               end

               Ash.Resource.put_metadata(
                 result,
                 :bulk_create_index,
                 changeset.context.bulk_create.index
               )
             end)}
          end
      end
    rescue
      e ->
        changeset =
          case source do
            {table, resource} ->
              resource
              |> Ash.Changeset.new()
              |> Ash.Changeset.put_context(:data_layer, %{table: table})

            resource ->
              resource
              |> Ash.Changeset.new()
          end

        handle_raised_error(
          e,
          __STACKTRACE__,
          {:bulk_create, ecto_changeset(changeset.data, changeset, :create, repo, false)},
          resource
        )
    end
  end

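  # When the built query's expression accumulator indicates that an expression may
  # raise (`has_error?: true`) and we are already inside a transaction, wrap the
  # statement in a Postgres savepoint. That way an `ash_error:`-style
  # `RAISE EXCEPTION` can be rolled back to the savepoint and re-raised without
  # aborting the surrounding transaction. Roughly:
  #
  #     SAVEPOINT a<uuid>;
  #     -- run the statement
  #     ROLLBACK TO a<uuid>;  -- only when an ash_error exception is raised
  #     RELEASE a<uuid>;      -- on success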
  defp with_savepoint(
         repo,
         %{
           __ash_bindings__: %{
             expression_accumulator: %AshSql.Expr.ExprInfo{has_error?: true}
           }
         },
         fun
       ) do
    if repo.in_transaction?() do
      savepoint_id = "a" <> (Ash.UUID.generate() |> String.replace("-", "_"))

      repo.query!("SAVEPOINT #{savepoint_id}")

      result =
        try do
          {:ok, fun.()}
        rescue
          e in Postgrex.Error ->
            case e do
              %Postgrex.Error{
                postgres: %{
                  code: :raise_exception,
                  message: "ash_error:" <> _,
                  severity: "ERROR"
                }
              } ->
                repo.query!("ROLLBACK TO #{savepoint_id}")
                # This kind of exception won't trigger
                # a rollback
                {:exception, e, __STACKTRACE__}

              _ ->
                {:exception, e, __STACKTRACE__}
            end
        end

      case result do
        {:exception, e, stacktrace} ->
          reraise e, stacktrace

        {:ok, result} ->
          repo.query!("RELEASE #{savepoint_id}")
          result
      end
    else
      fun.()
    end
  end

  defp with_savepoint(_repo, _acc, fun) do
    fun.()
  end

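  # Builds the `{field, value}` pairs used as the `ON CONFLICT DO UPDATE` set.
  # Requested upsert fields are read from `EXCLUDED`; when an attribute also has
  # an update default, the value is `COALESCE(EXCLUDED.<col>, <default>)` (or just
  # the default if the field was not explicitly requested). The resulting SQL is
  # roughly the following (column names are only illustrative):
  #
  #     ON CONFLICT (...) DO UPDATE SET
  #       title = EXCLUDED.title,
  #       updated_at = COALESCE(EXCLUDED.updated_at, $1)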
  defp upsert_set(resource, changesets, keys, options) do
    attributes_changing_anywhere =
      changesets |> Enum.flat_map(&Map.keys(&1.attributes)) |> Enum.uniq()

    update_defaults = update_defaults(resource)
    # We can't reference EXCLUDED for a field if at least one changeset in the stream
    # is not changing that value (and we wouldn't want to even if we could, as it
    # would be unnecessary).

    upsert_fields =
      (options[:upsert_fields] || []) |> Enum.filter(&(&1 in attributes_changing_anywhere))

    fields_to_upsert =
      upsert_fields --
        (Keyword.keys(Enum.at(changesets, 0).atomics) -- keys)

    fields_to_upsert =
      case fields_to_upsert do
        [] -> keys
        fields_to_upsert -> fields_to_upsert
      end

    fields_to_upsert
    |> Enum.uniq()
    |> Enum.map(fn upsert_field ->
      # As a final safety check, ensure that every value in upsert_fields is the
      # name of an attribute, since `identifier/1` is used below to interpolate
      # them into the query.
      if is_nil(resource.__schema__(:type, upsert_field)) do
        raise "Only attribute names can be used in upsert_fields"
      end

      case Keyword.fetch(update_defaults, upsert_field) do
        {:ok, default} ->
          if upsert_field in upsert_fields do
            {upsert_field,
             Ecto.Query.dynamic(
               [],
               fragment(
                 "COALESCE(EXCLUDED.?, ?)",
                 identifier(^to_string(get_source_for_upsert_field(upsert_field, resource))),
                 ^default
               )
             )}
          else
            {upsert_field, default}
          end

        :error ->
          {upsert_field,
           Ecto.Query.dynamic(
             [],
             fragment(
               "EXCLUDED.?",
               identifier(^to_string(get_source_for_upsert_field(upsert_field, resource)))
             )
           )}
      end
    end)
  end

  @doc false
  def get_source_for_upsert_field(field, resource) do
    case Ash.Resource.Info.attribute(resource, field) do
      %{source: source} when not is_nil(source) ->
        source

      _ ->
        field
    end
  end

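  # Single-record create, implemented on top of `bulk_create/3` with
  # `single?: true` and `return_records?: true`.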
  @impl true
  def create(resource, changeset) do
    changeset = %{
      changeset
      | data:
          Map.update!(
            changeset.data,
            :__meta__,
            &Map.put(&1, :source, table(resource, changeset))
          )
    }

    case bulk_create(resource, [changeset], %{
           single?: true,
           tenant: Map.get(changeset, :to_tenant, changeset.tenant),
           action_select: changeset.action_select,
           return_records?: true
         }) do
      {:ok, [result]} ->
        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  end

  defp maybe_create_tenant!(resource, result) do
    if AshPostgres.DataLayer.Info.manage_tenant_create?(resource) do
      tenant_name = tenant_name(resource, result)

      AshPostgres.MultiTenancy.create_tenant!(
        tenant_name,
        AshPostgres.DataLayer.Info.repo(resource, :read)
      )
    else
      :ok
    end
  end

  defp maybe_update_tenant(resource, changeset, result) do
    if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
      changing_tenant_name? =
        resource
        |> AshPostgres.DataLayer.Info.manage_tenant_template()
        |> Enum.filter(&is_atom/1)
        |> Enum.any?(&Ash.Changeset.changing_attribute?(changeset, &1))

      if changing_tenant_name? do
        old_tenant_name = tenant_name(resource, changeset.data)

        new_tenant_name = tenant_name(resource, result)

        AshPostgres.MultiTenancy.rename_tenant(
          AshPostgres.DataLayer.Info.repo(resource, :read),
          old_tenant_name,
          new_tenant_name
        )
      end
    end

    :ok
  end

  defp tenant_name(resource, result) do
    resource
    |> AshPostgres.DataLayer.Info.manage_tenant_template()
    |> Enum.map_join(fn item ->
      if is_binary(item) do
        item
      else
        result
        |> Map.get(item)
        |> to_string()
      end
    end)
  end

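  # Wraps the record in an `Ecto.Changeset` (no casting), forces the non-atomic
  # attribute changes, and attaches unique/check/exclusion/foreign-key constraint
  # metadata so that constraint violations raised by Postgres can be converted
  # into Ash errors. The resource's own foreign keys are added for creates,
  # updates, and upserts; foreign keys of related resources (which could be left
  # dangling) are added for updates, upserts, and deletes.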
  defp ecto_changeset(record, changeset, type, repo, table_error?) do
    attributes =
      changeset.resource
      |> Ash.Resource.Info.attributes()
      |> Enum.map(& &1.name)

    attributes_to_change =
      Enum.reject(attributes, fn attribute ->
        Keyword.has_key?(changeset.atomics, attribute)
      end)

    ecto_changeset =
      record
      |> to_ecto()
      |> set_table(changeset, type, table_error?)
      |> Ecto.Changeset.cast(%{}, [])
      |> force_changes(Map.take(changeset.attributes, attributes_to_change))
      |> add_configured_foreign_key_constraints(record.__struct__, repo)
      |> add_unique_indexes(record.__struct__, changeset, repo)
      |> add_check_constraints(record.__struct__, repo)
      |> add_exclusion_constraints(record.__struct__, repo)

    case type do
      :create ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__, repo)

      type when type in [:upsert, :update] ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__, repo)
        |> add_related_foreign_key_constraints(record.__struct__, repo)

      :delete ->
        ecto_changeset
        |> add_related_foreign_key_constraints(record.__struct__, repo)
    end
  end

  defp force_changes(changeset, changes) do
    Enum.reduce(changes, changeset, fn {key, value}, changeset ->
      Ecto.Changeset.force_change(changeset, key, value)
    end)
  end

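  # Maps exceptions raised by the repo into Ash errors:
  #
  #   * `Ecto.StaleEntryError` -> `Ash.Error.Changes.StaleRecord`
  #   * lock timeouts (`lock_not_available`) -> `Ash.Error.Invalid.Unavailable`
  #   * `RAISE EXCEPTION 'ash_error: ...'` payloads -> decoded back into the
  #     original Ash exception via `Ash.Error.from_json/2`, returned as
  #     `{:error, :no_rollback, error}`
  #   * other `Postgrex.Error`s -> constraint errors when a known constraint
  #     matches, otherwise `Ash.Error.to_ash_error/2`
  #
  # The `ash_error:` message body is JSON shaped roughly like
  # `{"exception": "Ash.Error...", "input": {...}}` (see the decoding below).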
  defp handle_raised_error(
         %Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Changes.StaleRecord.exception(resource: resource, filter: filters),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Postgrex.Error{
           postgres: %{
             code: :lock_not_available,
             message: message
           }
         },
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Invalid.Unavailable.exception(
        resource: resource,
        source: inspect(context, pretty: true),
        reason: message
      ),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Postgrex.Error{
           postgres: %{
             code: :raise_exception,
             message: "ash_error: \"" <> json,
             severity: "ERROR"
           }
         },
         _,
         _,
         _
       ) do
    %{"exception" => exception, "input" => input} =
      json
      |> String.trim_trailing("\"")
      |> String.replace("\\\"", "\"")
      |> Jason.decode!()

    exception = Module.concat([exception])

    {:error, :no_rollback, Ash.Error.from_json(exception, input)}
  end

  defp handle_raised_error(
         %Postgrex.Error{
           postgres: %{
             code: :raise_exception,
             message: "ash_error: " <> json,
             severity: "ERROR"
           }
         },
         _,
         _,
         _
       ) do
    %{"exception" => exception, "input" => input} =
      Jason.decode!(json)

    exception = Module.concat([exception])

    {:error, :no_rollback, Ash.Error.from_json(exception, input)}
  end

  defp handle_raised_error(
         %Postgrex.Error{} = error,
         stacktrace,
         {:bulk_create, fake_changeset},
         resource
       ) do
    case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
      [] ->
        {:error, Ash.Error.to_ash_error(error, stacktrace)}

      constraints ->
        {:error,
         fake_changeset
         |> constraints_to_errors(:insert, constraints, resource, error)
         |> Ash.Error.to_ash_error()}
    end
  end

  defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
    handle_raised_error(
      Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Postgrex.Error{} = error,
         stacktrace,
         changeset,
         resource
       ) do
    case Ecto.Adapters.Postgres.Connection.to_constraints(error, []) do
      [] ->
        {:error, Ash.Error.to_ash_error(error, stacktrace)}

      constraints ->
        {:error,
         changeset
         |> constraints_to_errors(:insert, constraints, resource, error)
         |> Ash.Error.to_ash_error()}
    end
  end

  defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
    {:error, Ash.Error.to_ash_error(error, stacktrace)}
  end

  defp constraints_to_errors(
         %{constraints: user_constraints} = changeset,
         action,
         constraints,
         resource,
         error
       ) do
    Enum.map(constraints, fn {type, constraint} ->
      user_constraint =
        Enum.find(user_constraints, fn c ->
          case {c.type, c.constraint, c.match} do
            {^type, ^constraint, :exact} -> true
            {^type, cc, :suffix} -> String.ends_with?(constraint, cc)
            {^type, cc, :prefix} -> String.starts_with?(constraint, cc)
            {^type, %Regex{} = r, _match} -> Regex.match?(r, constraint)
            _ -> false
          end
        end)

      case user_constraint do
        %{field: field, error_message: error_message, type: type, constraint: constraint} ->
          identities = Ash.Resource.Info.identities(resource)
          table = AshPostgres.DataLayer.Info.table(resource)

          identity =
            Enum.find(identities, fn identity ->
              "#{table}_#{identity.name}_index" == constraint
            end)

          field_names = if identity, do: identity.field_names || [field], else: [field]

          Enum.map(field_names, fn field_name ->
            Ash.Error.Changes.InvalidAttribute.exception(
              field: field_name,
              message: error_message,
              private_vars: [
                constraint: constraint,
                constraint_type: type,
                detail: error.postgres.detail
              ]
            )
          end)

        nil ->
          Ecto.ConstraintError.exception(
            action: action,
            type: type,
            constraint: constraint,
            changeset: changeset
          )
      end
    end)
  end

  defp set_table(record, changeset, operation, table_error?) do
    if AshPostgres.DataLayer.Info.polymorphic?(record.__struct__) do
      table =
        changeset.context[:data_layer][:table] ||
          AshPostgres.DataLayer.Info.table(record.__struct__)

      record =
        if table do
          Ecto.put_meta(record, source: table)
        else
          if table_error? do
            raise_table_error!(changeset.resource, operation)
          else
            record
          end
        end

      prefix =
        changeset.context[:data_layer][:schema] ||
          AshPostgres.DataLayer.Info.schema(record.__struct__)

      if prefix do
        Ecto.put_meta(record, prefix: prefix)
      else
        record
      end
    else
      record
    end
  end

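  # `from_ecto/1` and `to_ecto/1` translate relationship values between the Ash
  # and Ecto representations: `%Ecto.Association.NotLoaded{}` becomes the
  # resource struct's default for that relationship (typically `%Ash.NotLoaded{}`)
  # and vice versa, recursing through lists and loaded related records.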
  def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
  def from_ecto({:error, _} = other), do: other

  def from_ecto(nil), do: nil

  def from_ecto(value) when is_list(value) do
    Enum.map(value, &from_ecto/1)
  end

  def from_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      empty = struct(resource)

      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        case Map.get(record, relationship.name) do
          %Ecto.Association.NotLoaded{} ->
            Map.put(record, relationship.name, Map.get(empty, relationship.name))

          value ->
            Map.put(record, relationship.name, from_ecto(value))
        end
      end)
    else
      record
    end
  end

  def from_ecto(other), do: other

  def to_ecto(nil), do: nil

  def to_ecto(value) when is_list(value) do
    Enum.map(value, &to_ecto/1)
  end

  def to_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        value =
          case Map.get(record, relationship.name) do
            %Ash.NotLoaded{} ->
              %Ecto.Association.NotLoaded{
                __field__: relationship.name,
                __cardinality__: relationship.cardinality
              }

            value ->
              to_ecto(value)
          end

        Map.put(record, relationship.name, value)
      end)
    else
      record
    end
  end

  def to_ecto(other), do: other

  defp add_check_constraints(changeset, resource, repo) do
    resource
    |> AshPostgres.DataLayer.Info.check_constraints()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      constraint.attribute
      |> List.wrap()
      |> Enum.reduce(changeset, fn attribute, changeset ->
        case repo.default_constraint_match_type(:check, constraint.name) do
          {:regex, regex} ->
            Ecto.Changeset.check_constraint(changeset, attribute,
              name: regex,
              message: constraint.message || "is invalid",
              match: :exact
            )

          match ->
            Ecto.Changeset.check_constraint(changeset, attribute,
              name: constraint.name,
              message: constraint.message || "is invalid",
              match: match
            )
        end
      end)
    end)
  end

  defp add_exclusion_constraints(changeset, resource, repo) do
    resource
    |> AshPostgres.DataLayer.Info.exclusion_constraint_names()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      case constraint do
        {key, name} ->
          case repo.default_constraint_match_type(:check, name) do
            {:regex, regex} ->
              Ecto.Changeset.exclusion_constraint(changeset, key,
                name: regex,
                match: :exact
              )

            match ->
              Ecto.Changeset.exclusion_constraint(changeset, key,
                name: name,
                match: match
              )
          end

        {key, name, message} ->
          case repo.default_constraint_match_type(:check, name) do
            {:regex, regex} ->
              Ecto.Changeset.exclusion_constraint(changeset, key,
                name: regex,
                message: message,
                match: :exact
              )

            match ->
              Ecto.Changeset.exclusion_constraint(changeset, key,
                name: name,
                message: message,
                match: match
              )
          end
      end
    end)
  end

  defp add_related_foreign_key_constraints(changeset, resource, repo) do
    # TODO: this doesn't guarantee that we get all of them, because if something is
    # related to this resource and there is no back-relation, this won't catch its
    # foreign key constraints
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.map(& &1.destination)
    |> Enum.uniq()
    |> Enum.flat_map(fn related ->
      related
      |> Ash.Resource.Info.relationships()
      |> Enum.filter(&(&1.destination == resource))
      |> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute, :name]))
    end)
    |> Enum.reduce(changeset, fn %{
                                   source: source,
                                   source_attribute: source_attribute,
                                   destination_attribute: destination_attribute,
                                   name: relationship_name
                                 },
                                 changeset ->
      case AshPostgres.DataLayer.Info.reference(resource, relationship_name) do
        %{name: name} when not is_nil(name) ->
          case repo.default_constraint_match_type(:foreign, name) do
            {:regex, regex} ->
              Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
                name: regex,
                message: "would leave records behind",
                match: :exact
              )

            match ->
              Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
                name: name,
                message: "would leave records behind",
                match: match
              )
          end

        _ ->
          name = "#{AshPostgres.DataLayer.Info.table(source)}_#{source_attribute}_fkey"

          case repo.default_constraint_match_type(:foreign, name) do
            {:regex, regex} ->
              Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
                name: regex,
                message: "would leave records behind",
                match: :exact
              )

            match ->
              Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
                name: name,
                message: "would leave records behind",
                match: match
              )
          end
      end
    end)
  end

  defp add_my_foreign_key_constraints(changeset, resource, repo) do
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(changeset, fn relationship, changeset ->
      # Check if there's a custom reference name defined in the DSL
      name =
        case AshPostgres.DataLayer.Info.reference(resource, relationship.name) do
          %{name: custom_name} when not is_nil(custom_name) ->
            custom_name

          _ ->
            "#{AshPostgres.DataLayer.Info.table(resource)}_#{relationship.source_attribute}_fkey"
        end

      case repo.default_constraint_match_type(:foreign, name) do
        {:regex, regex} ->
          Ecto.Changeset.foreign_key_constraint(changeset, relationship.source_attribute,
            name: regex,
            match: :exact
          )

        match ->
          Ecto.Changeset.foreign_key_constraint(changeset, relationship.source_attribute,
            name: name,
            match: match
          )
      end
    end)
  end

  defp add_configured_foreign_key_constraints(changeset, resource, repo) do
    resource
    |> AshPostgres.DataLayer.Info.foreign_key_names()
    |> case do
      {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
      value -> List.wrap(value)
    end
    |> Enum.reduce(changeset, fn
      {key, name}, changeset ->
        case repo.default_constraint_match_type(:foreign, name) do
          {:regex, regex} ->
            Ecto.Changeset.foreign_key_constraint(changeset, key,
              name: regex,
              match: :exact
            )

          match ->
            Ecto.Changeset.foreign_key_constraint(changeset, key,
              name: name,
              match: match
            )
        end

      {key, name, message}, changeset ->
        case repo.default_constraint_match_type(:foreign, name) do
          {:regex, regex} ->
            Ecto.Changeset.foreign_key_constraint(changeset, key,
              name: regex,
              message: message,
              match: :exact
            )

          match ->
            Ecto.Changeset.foreign_key_constraint(changeset, key,
              name: name,
              message: message,
              match: match
            )
        end
    end)
  end

  defp add_unique_indexes(changeset, resource, ash_changeset, repo) do
    table = table(resource, ash_changeset)
    pkey = Ash.Resource.Info.primary_key(resource)

    changeset =
      resource
      |> Ash.Resource.Info.identities()
      |> Enum.reduce(changeset, fn identity, changeset ->
        name =
          AshPostgres.DataLayer.Info.identity_index_names(resource)[identity.name] ||
            "#{table}_#{identity.name}_index"

        opts =
          case repo.default_constraint_match_type(:unique, name) do
            {:regex, regex} ->
              if Map.get(identity, :message) do
                [name: regex, message: identity.message, match: :exact]
              else
                [name: regex, match: :exact]
              end

            index_match_type ->
              if Map.get(identity, :message) do
                [name: name, message: identity.message, match: index_match_type]
              else
                [name: name, match: index_match_type]
              end
          end

        fields =
          case identity.keys do
            [] ->
              pkey

            keys ->
              keys
          end

        Ecto.Changeset.unique_constraint(changeset, fields, opts)
      end)

    changeset =
      resource
      |> AshPostgres.DataLayer.Info.custom_indexes()
      |> Enum.reduce(changeset, fn index, changeset ->
        name = index.name || AshPostgres.CustomIndex.name(table, index)

        opts =
          case repo.default_constraint_match_type(:custom, name) do
            {:regex, regex} ->
              if Map.get(index, :message) do
                [name: regex, message: index.message, match: :exact]
              else
                [name: regex, match: :exact]
              end

            index_match_type ->
              if Map.get(index, :message) do
                [name: name, message: index.message, match: index_match_type]
              else
                [name: name, match: index_match_type]
              end
          end

        fields =
          if index.error_fields do
            case index.error_fields do
              [] -> pkey
              fields -> fields
            end
          else
            case Enum.filter(index.fields, &is_atom/1) do
              [] -> pkey
              fields -> fields
            end
          end

        Ecto.Changeset.unique_constraint(changeset, fields, opts)
      end)

    names =
      resource
      |> AshPostgres.DataLayer.Info.unique_index_names()
      |> case do
        {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
        value -> List.wrap(value)
      end

    names =
      case Ash.Resource.Info.primary_key(resource) do
        [] ->
          names

        fields ->
          if table = table(resource, ash_changeset) do
            [{fields, table <> "_pkey"} | names]
          else
            names
          end
      end

    Enum.reduce(names, changeset, fn
      {keys, name}, changeset ->
        case repo.default_constraint_match_type(:unique, name) do
          {:regex, regex} ->
            Ecto.Changeset.unique_constraint(changeset, List.wrap(keys),
              name: regex,
              match: :exact
            )

          match ->
            Ecto.Changeset.unique_constraint(changeset, List.wrap(keys),
              name: name,
              match: match
            )
        end

      {keys, name, message}, changeset ->
        case repo.default_constraint_match_type(:unique, name) do
          {:regex, regex} ->
            Ecto.Changeset.unique_constraint(changeset, List.wrap(keys),
              name: regex,
              message: message,
              match: :exact
            )

          match ->
            Ecto.Changeset.unique_constraint(changeset, List.wrap(keys),
              name: name,
              message: message,
              match: match
            )
        end
    end)
  end

  @impl true
  def upsert(resource, changeset, keys, identity) do
    if AshPostgres.DataLayer.Info.manage_tenant_update?(resource) do
      {:error, "Cannot currently upsert a resource that owns a tenant"}
    else
      keys = keys || Ash.Resource.Info.primary_key(resource)

      update_defaults = update_defaults(resource)

      explicitly_changing_attributes =
        changeset.attributes
        |> Map.keys()
        |> Enum.concat(Keyword.keys(update_defaults))
        |> Kernel.--(Map.get(changeset, :defaults, []))
        |> Kernel.--(keys)

      upsert_fields =
        changeset.context[:private][:upsert_fields] || explicitly_changing_attributes

      case bulk_create(resource, [changeset], %{
             single?: true,
             upsert?: true,
             tenant: changeset.tenant,
             identity: identity,
             upsert_keys: keys,
             action_select: changeset.action_select,
             upsert_fields: upsert_fields,
             return_records?: true
           }) do
        {:ok, []} ->
          key_filters =
            Enum.map(keys, fn key ->
              {key,
               Ash.Changeset.get_attribute(changeset, key) || Map.get(changeset.params, key) ||
                 Map.get(changeset.params, to_string(key))}
            end)

          ash_query =
            resource
            |> Ash.Query.do_filter(and: [key_filters])
            |> then(fn
              query when is_nil(identity) or is_nil(identity.where) -> query
              query -> Ash.Query.do_filter(query, identity.where)
            end)
            |> Ash.Query.set_tenant(changeset.tenant)

          {:ok,
           {:upsert_skipped, ash_query,
            fn ->
              with {:ok, ecto_query} <- Ash.Query.data_layer_query(ash_query),
                   {:ok, [result]} <- run_query(ecto_query, resource) do
                {:ok, Ash.Resource.put_metadata(result, :upsert_skipped, true)}
              end
            end}}

        {:ok, [result]} ->
          {:ok, result}

        {:error, :no_rollback, error} ->
          {:error, :no_rollback, error}

        {:error, error} ->
          {:error, error}
      end
    end
  end

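  # Determines the `:conflict_target` for upserts. When every key is a plain
  # attribute and there is no base filter or identity `where`, the key list is
  # used directly. Otherwise an `{:unsafe_fragment, ...}` is built so the upsert
  # targets the matching (possibly partial) unique index, which requires
  # `base_filter_sql` / `identity_wheres_to_sql` to be configured. The fragment
  # renders to SQL roughly like this (columns and predicate illustrative):
  #
  #     ON CONFLICT ("org_id", "slug") WHERE (archived_at IS NULL) DO UPDATE ...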
  defp conflict_target(resource, identity, keys) do
    identity_where =
      case identity do
        %{name: name, where: where} when not is_nil(where) ->
          AshPostgres.DataLayer.Info.identity_where_to_sql(resource, name) ||
            raise("""
            Must provide an entry for :#{identity.name} in `postgres.identity_wheres_to_sql` to use it as an upsert_identity.

            See https://hexdocs.pm/ash_postgres/AshPostgres.DataLayer.Info.html#identity_wheres_to_sql/1 for an example.
            """)

        _ ->
          nil
      end

    base_filter_sql =
      case Ash.Resource.Info.base_filter(resource) do
        nil ->
          nil

        _base_filter ->
          AshPostgres.DataLayer.Info.base_filter_sql(resource) ||
            raise """
            Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the postgres section.
            """
      end

    where =
      case {base_filter_sql, identity_where} do
        {nil, nil} ->
          nil

        {base_filter_sql, nil} ->
          " WHERE (#{base_filter_sql})"

        {nil, identity_where} ->
          " WHERE (#{identity_where})"

        {base_filter_sql, identity_where} ->
          " WHERE ((#{base_filter_sql}) AND (#{identity_where}))"
      end

    if is_nil(where) && Enum.all?(keys, &Ash.Resource.Info.attribute(resource, &1)) do
      keys
    else
      sources = sources_to_sql(resource, keys)
      {:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ")#{where}"}
    end
  end

  defp sources_to_sql(resource, keys) do
    Enum.map(keys, fn key ->
      case Ash.Resource.Info.field(resource, key) do
        %Ash.Resource.Attribute{source: source, name: name} ->
          ~s("#{source || name}")

        %Ash.Resource.Calculation{name: name} ->
          if sql = AshPostgres.DataLayer.Info.calculation_to_sql(resource, name) do
            "(" <> sql <> ")"
          else
            raise ArgumentError,
                  "Calculation #{inspect(key)} used in `AshPostgres.DataLayer` conflict target must have its sql defined in `calculations_to_sql`"
          end

        _other ->
          raise ArgumentError,
                "Unsupported field #{inspect(key)} used in `AshPostgres.DataLayer` conflict target"
      end
    end)
  end

  defp update_defaults(resource) do
    attributes =
      resource
      |> Ash.Resource.Info.attributes()
      |> Enum.reject(&is_nil(&1.update_default))

    attributes
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(attributes))
    |> Enum.concat(lazy_non_matching_defaults(attributes))
  end

  defp static_defaults(attributes) do
    attributes
    |> Enum.reject(&get_default_fun(&1))
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_non_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(!&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.map(fn attribute ->
      default_value =
        case attribute.update_default do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      {:ok, default_value} =
        Ash.Type.cast_input(attribute.type, default_value, attribute.constraints)

      {attribute.name, default_value}
    end)
  end

  defp lazy_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.group_by(&{&1.update_default, &1.type, &1.constraints})
    |> Enum.flat_map(fn {{default_fun, type, constraints}, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      {:ok, default_value} =
        Ash.Type.cast_input(type, default_value, constraints)

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  defp get_default_fun(attribute) do
    if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
      attribute.update_default
    end
  end

  @impl true
  def update(resource, changeset) do
    source = resolve_source(resource, changeset)

    query =
      from(row in source, as: ^0)
      |> AshSql.Bindings.default_bindings(
        resource,
        AshPostgres.SqlImplementation,
        changeset.context
      )
      |> pkey_filter(changeset.data)
      |> then(fn query ->
        Map.put(
          query,
          :__ash_bindings__,
          Map.put_new(query.__ash_bindings__, :tenant, changeset.tenant)
        )
      end)

    changeset =
      Ash.Changeset.set_context(changeset, %{
        data_layer: %{
          use_atomic_update_data?: true
        }
      })

    case update_query(query, changeset, resource, %{
           return_records?: true,
           action_select: changeset.action_select,
           calculations: []
         }) do
      {:ok, []} ->
        {:error,
         Ash.Error.Changes.StaleRecord.exception(
           resource: resource,
           filter: changeset.filter
         )}

      {:ok, [record]} ->
        maybe_update_tenant(resource, changeset, record)

        {:ok, record}

      {:error, error} ->
        {:error, error}

      {:error, :no_rollback, error} ->
        {:error, :no_rollback, error}
    end
  end

  defp pkey_filter(query, %resource{} = record) do
    pkey =
      record
      |> Map.take(Ash.Resource.Info.primary_key(resource))
      |> Map.to_list()

    Ecto.Query.where(query, ^pkey)
  end

  @impl true
  def destroy(resource, %{data: record} = changeset) do
    source = resolve_source(resource, changeset)

    query =
      from(row in source, as: ^0)
      |> AshSql.Bindings.default_bindings(
        resource,
        AshPostgres.SqlImplementation,
        changeset.context
      )
      |> pkey_filter(record)

    repo = AshSql.dynamic_repo(resource, AshPostgres.SqlImplementation, changeset)

    with {:ok, query} <- filter(query, changeset.filter, resource) do
      ecto_changeset =
        case changeset.data do
          %Ash.Changeset.OriginalDataNotAvailable{} ->
            changeset.resource.__struct__()

          data ->
            data
        end
        |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
        |> ecto_changeset(changeset, :delete, repo, true)

      case bulk_updatable_query(
             query,
             resource,
             [],
             [],
             changeset.context,
             :destroy
           ) do
        {:error, error} ->
          {:error, error}

        {:ok, query} ->
          try do
            repo_opts =
              AshSql.repo_opts(
                repo,
                AshPostgres.SqlImplementation,
                changeset.timeout,
                changeset.tenant,
                changeset.resource
              )

            query = Ecto.Query.exclude(query, :select)

            with_savepoint(repo, query, fn ->
              repo.delete_all(
                query,
                repo_opts
              )
              |> case do
                {0, _} ->
                  {:error,
                   Ash.Error.Changes.StaleRecord.exception(
                     resource: resource,
                     filter: changeset.filter
                   )}

                _ ->
                  :ok
              end
            end)
          rescue
            e ->
              handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
          end
      end
    end
  end

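  # Translates Ash lock types into row locks scoped to the root binding
  # (`FOR UPDATE OF ?`, plus the `NOWAIT` / `SKIP LOCKED` variants generated
  # below). Postgres does not allow `FOR UPDATE` together with `DISTINCT`, so
  # when the query is distinct the lock is taken in an inner subquery and the
  # distinct clause is re-applied to the outer query.
  #
  # A sketch of how a caller might request a lock (resource hypothetical):
  #
  #     MyApp.Ticket
  #     |> Ash.Query.lock("FOR UPDATE SKIP LOCKED")
  #     |> Ash.read!()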
  @impl true
  def lock(query, :for_update, _) do
    if query.distinct do
      new_query =
        Ecto.Query.lock(%{query | distinct: nil}, [{^0, a}], fragment("FOR UPDATE OF ?", a))

      q = from(row in subquery(new_query), [])
      {:ok, %{q | distinct: query.distinct}}
    else
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment("FOR UPDATE OF ?", a))}
    end
  end

  @locks [
    "FOR UPDATE",
    "FOR NO KEY UPDATE",
    "FOR SHARE",
    "FOR KEY SHARE"
  ]

  for lock <- @locks do
    frag = "#{lock} OF ?"

    def lock(query, unquote(lock), _) do
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
    end

    frag = "#{lock} OF ? NOWAIT"
    lock = "#{lock} NOWAIT"

    def lock(query, unquote(lock), _) do
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
    end

    frag = "#{lock} OF ? SKIP LOCKED"
    lock = "#{lock} SKIP LOCKED"

    def lock(query, unquote(lock), _) do
      {:ok, Ecto.Query.lock(query, [{^0, a}], fragment(unquote(frag), a))}
    end
  end

  @impl true
  def sort(query, sort, _resource) do
    query = maybe_subquery_upgrade(query, {:sort, sort})

    {:ok,
     Map.update!(
       query,
       :__ash_bindings__,
       &Map.put(&1, :sort, sort)
     )}
  end

  @impl true
  def select(query, select, _resource) do
    query = maybe_subquery_upgrade(query, {:select, select})

    if query.__ash_bindings__.context[:data_layer][:combination_query?] ||
         query.__ash_bindings__.context[:data_layer][:combination_of_queries?] do
      binding = query.__ash_bindings__.root_binding

      {query, select} =
        if field_set = query.__ash_bindings__[:already_selected] do
          {query, select -- field_set}
        else
          {from(row in Ecto.Query.exclude(query, :select), select: %{}), select}
        end

      Enum.reduce(select, query, fn field, query ->
        from(row in query, select_merge: %{^field => field(as(^binding), ^field)})
      end)
      |> then(&{:ok, &1})
    else
      {:ok, from(row in query, select: struct(row, ^Enum.uniq(select)))}
    end
  end

  @impl true
  def distinct_sort(query, sort, _) when sort in [nil, []] do
    {:ok, query}
  end

  def distinct_sort(query, sort, _) do
    query = maybe_subquery_upgrade(query, {:distinct_sort, sort})
    {:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :distinct_sort, sort))}
  end

  # If the order by does not match the initial sort clause, then we use a subquery
  # to limit to only distinct rows. This may not perform that well, so we may need
  # to come up with alternatives here.
  @impl true
  def distinct(query, distinct, resource) do
    query = maybe_subquery_upgrade(query, {:distinct, distinct})
    AshSql.Distinct.distinct(query, distinct, resource)
  end

  @impl true
  def filter(query, filter, resource, opts \\ []) do
    query = maybe_subquery_upgrade(query, {:filter, filter})
    used_aggregates = Ash.Filter.used_aggregates(filter, [])

    query =
      AshSql.Bindings.default_bindings(query, resource, AshPostgres.SqlImplementation)

    query
    |> AshSql.Join.join_all_relationships(filter, opts)
    |> case do
      {:ok, query} ->
        query
        |> AshSql.Aggregate.add_aggregates(
          used_aggregates,
          resource,
          false,
          query.__ash_bindings__.root_binding
        )
        |> case do
          {:ok, query} ->
            {:ok, AshSql.Filter.add_filter_expression(query, filter)}

          {:error, error} ->
            {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

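  # For combination queries: when a filter, sort, distinct, select, or calculation
  # references a field that is not part of the combination fieldset (or crosses a
  # relationship), the combined query is wrapped in a subquery and joined back to
  # the resource on its primary key so those fields become available. This requires
  # the primary key to be included in the combination fieldset; otherwise it raises.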
  defp maybe_subquery_upgrade(
         %{__ash_bindings__: %{subquery_upgrade?: true}} = query,
         _
       ) do
    query
  end

  defp maybe_subquery_upgrade(query, type) do
    fieldset = query.__ash_bindings__.context[:data_layer][:combination_fieldset]

    if query.__ash_bindings__.context[:data_layer][:combination_of_queries?] && fieldset do
      requires_join? =
        case type do
          {:filter, contents} ->
            Enum.any?(
              Ash.Filter.list_refs(contents),
              &(&1.relationship_path != [] || &1.attribute.name not in fieldset)
            )

          {:calculations, calculations} ->
            Enum.any?(calculations, fn {_, expr} ->
              Enum.any?(
                Ash.Filter.list_refs(expr),
                &(&1.relationship_path != [] || &1.attribute.name not in fieldset)
              )
            end)

          {sort, sorts} when sort in [:sort, :distinct, :distinct_sort] ->
            Enum.any?(sorts, fn
              {atom, _} when is_atom(atom) ->
                atom not in fieldset

              {%Ash.Query.Calculation{} = calc, _} ->
                calc.opts
                |> calc.module.expression(calc.context)
                |> Ash.Filter.hydrate_refs(%{
                  resource: query.__ash_bindings__.resource,
                  parent_stack: query.__ash_bindings__[:parent_resources] || [],
                  public?: false
                })
                |> Ash.Filter.list_refs()
                |> Enum.any?(&(&1.relationship_path != [] || &1.attribute.name not in fieldset))

              _ ->
                true
            end)

          {:select, select} ->
            Enum.any?(select, &(&1 not in fieldset))
        end

      resource = query.__ash_bindings__.resource

      if requires_join? do
        primary_key = Ash.Resource.Info.primary_key(query.__ash_bindings__.resource)

        if primary_key != [] && primary_key -- fieldset == [] do
          dynamic =
            Enum.reduce(primary_key, nil, fn key, expr ->
              if is_nil(expr) do
                Ecto.Query.dynamic([l, r], field(l, ^key) == field(r, ^key))
              else
                Ecto.Query.dynamic([l, r], field(l, ^key) == field(r, ^key) and ^expr)
              end
            end)

          default_select =
            MapSet.to_list(
              Ash.Resource.Info.selected_by_default_attribute_names(
                query.__ash_bindings__.resource
              )
            )

          query_with_select =
            from(sub in Ecto.Query.exclude(query, :select),
              join: row in ^query.__ash_bindings__.resource,
              # why doesn't `.root_binding` work the way I expect it to here?
              on: ^dynamic,
              select: map(row, ^default_select),
              select_merge: map(sub, ^fieldset)
            )

          from(row in subquery(query_with_select), as: ^0)
          |> AshSql.Bindings.default_bindings(resource, AshPostgres.SqlImplementation)
          |> Map.update!(
            :__ash_bindings__,
            &Map.merge(&1, %{
              already_selected: fieldset,
              subquery_upgrade?: true,
              sort: query.__ash_bindings__[:sort],
              context: query.__ash_bindings__.context
            })
          )
        else
          raise """
          Unsupported combination query. Combinations must select the primary key if referencing
          any fields that are *not* selected by the combinations in filter, sort & distinct.
          """
        end
      else
        query
      end
    else
      query
    end
  end

  @impl true
  def add_aggregates(query, aggregates, resource) do
    AshSql.Aggregate.add_aggregates(
      query,
      aggregates,
      resource,
      true,
      query.__ash_bindings__.root_binding
    )
  end

  @impl true
  def add_calculations(query, calculations, resource, select? \\ true) do
    query = maybe_subquery_upgrade(query, {:calculations, calculations})

    AshSql.Calculation.add_calculations(
      query,
      calculations,
      resource,
      query.__ash_bindings__.root_binding,
      select?
    )
  end

  def add_known_binding(query, data, known_binding) do
    bindings = query.__ash_bindings__.bindings

    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, known_binding, data)
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  @impl true
  def transaction(resource, func, timeout \\ nil, reason \\ %{type: :custom, metadata: %{}}) do
    repo =
      case reason[:data_layer_context] do
        %{repo: repo} when not is_nil(repo) ->
          repo

        _ ->
          AshPostgres.DataLayer.Info.repo(resource, :read)
      end

    func = fn ->
      repo.on_transaction_begin(reason)

      func.()
    end

    if timeout do
      repo.transaction(func, timeout: timeout)
    else
      repo.transaction(func)
    end
  end

  if Code.ensure_loaded?(Igniter) do
    def install(igniter, module, Ash.Resource, _path, argv) do
      table_name =
        module
        |> Module.split()
        |> List.last()
        |> Macro.underscore()
        |> Igniter.Inflex.pluralize()

      {options, _, _} = OptionParser.parse(argv, switches: [repo: :string])

      repo =
        case options[:repo] do
          nil ->
            Igniter.Project.Module.module_name(igniter, "Repo")

          repo ->
            Igniter.Project.Module.parse(repo)
        end

      igniter
      |> Spark.Igniter.set_option(module, [:postgres, :table], table_name)
      |> Spark.Igniter.set_option(module, [:postgres, :repo], repo)
    end

    def install(igniter, _, _, _), do: igniter
  end

  @impl true
  def rollback(resource, term) do
    AshPostgres.DataLayer.Info.repo(resource, :mutate).rollback(term)
  end

  defp table(resource, changeset) do
    changeset.context[:data_layer][:table] || AshPostgres.DataLayer.Info.table(resource)
  end

  defp raise_table_error!(resource, operation) do
    if AshPostgres.DataLayer.Info.polymorphic?(resource) do
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.

      Polymorphic resources require that the `data_layer[:table]` context is provided.
      See the guide on polymorphic resources for more information.
      """
    else
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.
      """
    end
  end

  defp resolve_source(resource, changeset) do
    if table = changeset.context[:data_layer][:table] do
      {table, resource}
    else
      resource
    end
  end
end