lib/data_layer.ex

defmodule AshSqlite.DataLayer do
  @index %Spark.Dsl.Entity{
    name: :index,
    describe: """
    Add an index to be managed by the migration generator.
    """,
    examples: [
      "index [\"column\", \"column2\"], unique: true, where: \"thing = TRUE\""
    ],
    target: AshSqlite.CustomIndex,
    schema: AshSqlite.CustomIndex.schema(),
    transform: {AshSqlite.CustomIndex, :transform, []},
    args: [:fields]
  }

  @custom_indexes %Spark.Dsl.Section{
    name: :custom_indexes,
    describe: """
    A section for configuring indexes to be created by the migration generator.

    In general, prefer to use `identities` for simple unique constraints. This is a tool to allow
    for declaring more complex indexes.
    """,
    examples: [
      """
      custom_indexes do
        index [:column1, :column2], unique: true, where: "thing = TRUE"
      end
      """
    ],
    entities: [
      @index
    ]
  }

  @statement %Spark.Dsl.Entity{
    name: :statement,
    describe: """
    Add a custom statement for migrations.
    """,
    examples: [
      """
      statement :posts_fts do
        up "CREATE VIRTUAL TABLE posts_fts USING fts5(title, body);"
        down "DROP TABLE posts_fts;"
      end
      """
    ],
    target: AshSqlite.Statement,
    schema: AshSqlite.Statement.schema(),
    args: [:name]
  }

  @custom_statements %Spark.Dsl.Section{
    name: :custom_statements,
    describe: """
    A section for configuring custom statements to be added to migrations.

    Changing custom statements may require manual intervention, because Ash can't determine what order they should run
    in (i.e. whether they depend on table structure that you've added, or vice versa). As such, any `down` statements we run
    for custom statements happen first, and any `up` statements happen last.

    Additionally, when changing a custom statement, we must make some assumptions, i.e. that we should migrate
    the old structure down using the previously configured `down` and then recreate it.

    This may not be desired, in which case you may end up simply modifying the old migration and deleting whatever was
    generated by the migration generator. As always: read your migrations after generating them!
    """,
    examples: [
      """
      custom_statements do
        # the name is used to detect if you remove or modify the statement
        statement :posts_fts do
          up "CREATE VIRTUAL TABLE posts_fts USING fts5(title, body);"
          down "DROP TABLE posts_fts;"
        end
      end
      """
    ],
    entities: [
      @statement
    ]
  }

  @reference %Spark.Dsl.Entity{
    name: :reference,
    describe: """
    Configures the reference for a relationship in resource migrations.

    Keep in mind that multiple relationships can theoretically involve the same destination and foreign keys.
    In those cases, you only need to configure the `reference` behavior for one of them. Any conflicts will result
    in an error, across this resource and any other resources that share a table with this one. For this reason,
    instead of adding a reference configuration for `:nothing`, it's best to just leave the configuration out, as that
    is the default behavior if *no* relationship anywhere has configured the behavior of that reference.
    """,
    examples: [
      "reference :post, on_delete: :delete, on_update: :update, name: \"comments_to_posts_fkey\""
    ],
    args: [:relationship],
    target: AshSqlite.Reference,
    schema: AshSqlite.Reference.schema()
  }

  @references %Spark.Dsl.Section{
    name: :references,
    describe: """
    A section for configuring the references (foreign keys) in resource migrations.

    This section is only relevant if you are using the migration generator with this resource.
    Otherwise, it has no effect.
    """,
    examples: [
      """
      references do
        reference :post, on_delete: :delete, on_update: :update, name: "comments_to_posts_fkey"
      end
      """
    ],
    entities: [@reference],
    schema: [
      polymorphic_on_delete: [
        type: {:one_of, [:delete, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_delete behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_on_update: [
        type: {:one_of, [:update, :nilify, :nothing, :restrict]},
        doc:
          "For polymorphic resources, configures the on_update behavior of the automatically generated foreign keys to source tables."
      ],
      polymorphic_name: [
        type: :string,
        doc:
          "For polymorphic resources, configures the name of the automatically generated foreign keys to source tables."
      ]
    ]
  }

  @sqlite %Spark.Dsl.Section{
    name: :sqlite,
    describe: """
    SQLite data layer configuration
    """,
    sections: [
      @custom_indexes,
      @custom_statements,
      @references
    ],
    modules: [
      :repo
    ],
    examples: [
      """
      sqlite do
        repo MyApp.Repo
        table "organizations"
      end
      """
    ],
    schema: [
      repo: [
        type: :atom,
        required: true,
        doc:
          "The repo that will be used to fetch your data. See the `AshSqlite.Repo` documentation for more"
      ],
      migrate?: [
        type: :boolean,
        default: true,
        doc:
          "Whether or not to include this resource in the generated migrations with `mix ash.generate_migrations`"
      ],
      migration_types: [
        type: :keyword_list,
        default: [],
        doc:
          "A keyword list of attribute names to the ecto migration type that should be used for that attribute. Only necessary if you need to override the defaults."
      ],
      migration_defaults: [
        type: :keyword_list,
        default: [],
        doc: """
        A keyword list of attribute names to the ecto migration default that should be used for that attribute. The string you use will be placed verbatim in the migration. Use fragments like `fragment(\\"datetime('now')\\")`, or for `nil`, use `\\"nil\\"`.
        """
      ],
      base_filter_sql: [
        type: :string,
        doc:
          "A raw sql version of the base_filter, e.g `representative = true`. Required if trying to create a unique constraint on a resource with a base_filter"
      ],
      skip_unique_indexes: [
        type: {:wrap_list, :atom},
        default: false,
        doc: "Skip generating unique indexes when generating migrations"
      ],
      unique_index_names: [
        type:
          {:list,
           {:or,
            [{:tuple, [{:list, :atom}, :string]}, {:tuple, [{:list, :atom}, :string, :string]}]}},
        default: [],
        doc: """
        A list of unique index names that could raise errors that are not configured in identities, or an mfa to a function that takes a changeset and returns the list. In the format `{[:affected, :keys], "name_of_constraint"}` or `{[:affected, :keys], "name_of_constraint", "custom error message"}`
        """
      ],
      exclusion_constraint_names: [
        type: :any,
        default: [],
        doc: """
        A list of exclusion constraint names that could raise errors. Must be in the format `{:affected_key, "name_of_constraint"}` or `{:affected_key, "name_of_constraint", "custom error message"}`
        """
      ],
      identity_index_names: [
        type: :any,
        default: [],
        doc: """
        A keyword list of identity names to the unique index name that they should use when being managed by the migration generator.
        """
      ],
      foreign_key_names: [
        type: {:list, {:or, [{:tuple, [:atom, :string]}, {:tuple, [:string, :string]}]}},
        default: [],
        doc: """
        A list of foreign keys that could raise errors, or an mfa to a function that takes a changeset and returns a list. In the format: `{:key, "name_of_constraint"}` or `{:key, "name_of_constraint", "custom error message"}`
        """
      ],
      migration_ignore_attributes: [
        type: {:list, :atom},
        default: [],
        doc: """
        A list of attributes that will be ignored when generating migrations.
        """
      ],
      table: [
        type: :string,
        doc: """
        The table to store and read the resource from. If this is changed, the migration generator will not remove the old table.
        """
      ],
      polymorphic?: [
        type: :boolean,
        default: false,
        doc: """
        Declares this resource as polymorphic. See the [polymorphic resources guide](/documentation/topics/polymorphic_resources.md) for more.
        """
      ]
    ]
  }

  alias Ash.Filter
  alias Ash.Query.{BooleanExpression, Not}

  @behaviour Ash.DataLayer

  @sections [@sqlite]

  @moduledoc """
  A SQLite data layer that leverages Ecto's SQLite capabilities.
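
  For example, a resource might opt into this data layer roughly as follows
  (the resource and repo module names here are illustrative):

      defmodule MyApp.Organization do
        use Ash.Resource,
          data_layer: AshSqlite.DataLayer

        sqlite do
          repo MyApp.Repo
          table "organizations"
        end
      end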
  """

  use Spark.Dsl.Extension,
    sections: @sections,
    transformers: [
      AshSqlite.Transformers.ValidateReferences,
      AshSqlite.Transformers.VerifyRepo,
      AshSqlite.Transformers.EnsureTableOrPolymorphic
    ]

  def migrate(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.migrate", args)
  end

  def codegen(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.generate_migrations", args)
  end

  def setup(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.create", args)
    Mix.Task.run("ash_sqlite.migrate", args)
  end

  def tear_down(args) do
    # TODO: take args that we care about
    Mix.Task.run("ash_sqlite.drop", args)
  end

  import Ecto.Query, only: [from: 2, subquery: 1]

  @impl true
  def can?(_, :async_engine), do: false
  def can?(_, :bulk_create), do: true
  def can?(_, {:lock, _}), do: false

  def can?(_, :transact), do: false
  def can?(_, :composite_primary_key), do: true
  def can?(_, {:atomic, :update}), do: true
  def can?(_, {:atomic, :upsert}), do: true
  def can?(_, :upsert), do: true
  def can?(_, :changeset_filter), do: true

  def can?(resource, {:join, other_resource}) do
    data_layer = Ash.DataLayer.data_layer(resource)
    other_data_layer = Ash.DataLayer.data_layer(other_resource)

    data_layer == other_data_layer and
      AshSqlite.DataLayer.Info.repo(resource) == AshSqlite.DataLayer.Info.repo(other_resource)
  end

  def can?(_resource, {:lateral_join, _}) do
    false
  end

  def can?(_, :boolean_filter), do: true

  def can?(_, {:aggregate, _type}), do: false

  def can?(_, :aggregate_filter), do: false
  def can?(_, :aggregate_sort), do: false
  def can?(_, :expression_calculation), do: true
  def can?(_, :expression_calculation_sort), do: true
  def can?(_, :create), do: true
  def can?(_, :select), do: true
  def can?(_, :read), do: true

  def can?(resource, action) when action in ~w[update destroy]a do
    resource
    |> Ash.Resource.Info.primary_key()
    |> Enum.any?()
  end

  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :multitenancy), do: false

  def can?(_, {:filter_relationship, %{manual: {module, _}}}) do
    Spark.implements_behaviour?(module, AshSqlite.ManualRelationship)
  end

  def can?(_, {:filter_relationship, _}), do: true

  def can?(_, {:aggregate_relationship, _}), do: false

  def can?(_, :timeout), do: true
  def can?(_, {:filter_expr, _}), do: true
  def can?(_, :nested_expressions), do: true
  def can?(_, {:query_aggregate, _}), do: true
  def can?(_, :sort), do: true
  def can?(_, :distinct_sort), do: false
  def can?(_, :distinct), do: false
  def can?(_, {:sort, _}), do: true
  def can?(_, _), do: false

  @impl true
  def limit(query, nil, _), do: {:ok, query}

  def limit(query, limit, _resource) do
    {:ok, from(row in query, limit: ^limit)}
  end

  @impl true
  def source(resource) do
    AshSqlite.DataLayer.Info.table(resource) || ""
  end

  @impl true
  def set_context(resource, data_layer_query, context) do
    start_bindings = context[:data_layer][:start_bindings_at] || 0
    data_layer_query = from(row in data_layer_query, as: ^start_bindings)

    data_layer_query =
      if context[:data_layer][:table] do
        %{
          data_layer_query
          | from: %{data_layer_query.from | source: {context[:data_layer][:table], resource}}
        }
      else
        data_layer_query
      end

    {:ok, default_bindings(data_layer_query, resource, context)}
  end

  @impl true
  def offset(query, nil, _), do: {:ok, query}

  def offset(%{offset: old_offset} = query, 0, _resource) when old_offset in [0, nil] do
    {:ok, query}
  end

  def offset(query, offset, _resource) do
    {:ok, from(row in query, offset: ^offset)}
  end

  @impl true
  def run_aggregate_query(query, aggregates, resource) do
    {exists, aggregates} = Enum.split_with(aggregates, &(&1.kind == :exists))
    query = default_bindings(query, resource)

    query =
      if query.distinct || query.limit do
        query =
          query
          |> Ecto.Query.exclude(:select)
          |> Ecto.Query.exclude(:order_by)
          |> Map.put(:windows, [])

        from(row in subquery(query), as: ^0, select: %{})
      else
        query
        |> Ecto.Query.exclude(:select)
        |> Ecto.Query.exclude(:order_by)
        |> Map.put(:windows, [])
        |> Ecto.Query.select(%{})
      end

    query_before_select = query

    query =
      Enum.reduce(
        aggregates,
        query,
        fn agg, query ->
          AshSqlite.Aggregate.add_subquery_aggregate_select(
            query,
            agg.relationship_path |> Enum.drop(1),
            agg,
            resource,
            true
          )
        end
      )

    result =
      case aggregates do
        [] ->
          %{}

        _ ->
          dynamic_repo(resource, query).one(query, repo_opts(nil, nil, resource))
      end

    {:ok, add_exists_aggs(result, resource, query_before_select, exists)}
  end

  defp add_exists_aggs(result, resource, query, exists) do
    repo = dynamic_repo(resource, query)
    repo_opts = repo_opts(nil, nil, resource)

    Enum.reduce(exists, result, fn agg, result ->
      {:ok, filtered} =
        case agg do
          %{query: %{filter: filter}} when not is_nil(filter) ->
            filter(query, filter, resource)

          _ ->
            {:ok, query}
        end

      Map.put(
        result || %{},
        agg.name,
        repo.exists?(filtered, repo_opts)
      )
    end)
  end

  @impl true
  def run_query(query, resource) do
    query = default_bindings(query, resource)

    with_sort_applied =
      if query.__ash_bindings__[:sort_applied?] do
        {:ok, query}
      else
        apply_sort(query, query.__ash_bindings__[:sort], resource)
      end

    case with_sort_applied do
      {:error, error} ->
        {:error, error}

      {:ok, query} ->
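        # If ordering was stashed in an `:order` window, move its `order_by`
        # expression back into the query's order_bys and drop the window;
        # otherwise just drop the `:order` window.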
        query =
          if query.__ash_bindings__[:__order__?] && query.windows[:order] do
            order_by = %{query.windows[:order] | expr: query.windows[:order].expr[:order_by]}

            %{
              query
              | windows: Keyword.delete(query.windows, :order),
                order_bys: [order_by]
            }
          else
            %{query | windows: Keyword.delete(query.windows, :order)}
          end

        if AshSqlite.DataLayer.Info.polymorphic?(resource) && no_table?(query) do
          raise_table_error!(resource, :read)
        else
          primary_key = Ash.Resource.Info.primary_key(resource)

          {:ok,
           dynamic_repo(resource, query).all(query, repo_opts(nil, nil, resource))
           |> Enum.uniq_by(&Map.take(&1, primary_key))}
        end
    end
  rescue
    e ->
      handle_raised_error(e, __STACKTRACE__, query, resource)
  end

  defp no_table?(%{from: %{source: {"", _}}}), do: true
  defp no_table?(_), do: false

  defp repo_opts(timeout, _tenant, _resource) do
    []
    |> add_timeout(timeout)
  end

  defp repo_opts(timeout, _resource) do
    add_timeout([], timeout)
  end

  defp add_timeout(opts, timeout) when not is_nil(timeout) do
    Keyword.put(opts, :timeout, timeout)
  end

  defp add_timeout(opts, _), do: opts

  @impl true
  def functions(_resource) do
    [
      AshSqlite.Functions.Fragment,
      AshSqlite.Functions.Like,
      AshSqlite.Functions.ILike
    ]
  end

  @impl true
  def resource_to_query(resource, _) do
    from(row in {AshSqlite.DataLayer.Info.table(resource) || "", resource}, [])
  end

  @impl true
  def bulk_create(resource, stream, options) do
    # Cell-wise default values are not supported in INSERT statements by SQLite.
    # This requires that we group changesets by which attributes are changing
    # and *omit* any defaults, instead of using something like `(1, 2, DEFAULT)`
    # as we could with Postgres.
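    # For example, changesets setting both :title and :body are inserted in a
    # separate `insert_all/3` call from changesets setting only :title, so each
    # INSERT lists exactly the columns its rows provide.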
    stream
    |> Enum.group_by(&Map.keys(&1.attributes))
    |> Enum.reduce_while({:ok, []}, fn {_, changesets}, {:ok, acc} ->
      opts = repo_opts(nil, options[:tenant], resource)

      opts =
        if options.return_records? do
          Keyword.put(opts, :returning, true)
        else
          opts
        end

      opts =
        if options[:upsert?] do
          # Ash groups changesets by atomics before dispatching them to the data layer
          # this means that all changesets have the same atomics
          %{atomics: atomics, filters: filters} = Enum.at(changesets, 0)

          query = from(row in resource, as: ^0)

          query =
            query
            |> default_bindings(resource)

          upsert_set =
            upsert_set(resource, changesets, options)

          on_conflict =
            case query_with_atomics(
                   resource,
                   query,
                   filters,
                   atomics,
                   %{},
                   upsert_set
                 ) do
              :empty ->
                :nothing

              {:ok, query} ->
                query

              {:error, error} ->
                raise Ash.Error.to_ash_error(error)
            end

          opts
          |> Keyword.put(:on_conflict, on_conflict)
          |> Keyword.put(
            :conflict_target,
            conflict_target(
              resource,
              options[:upsert_keys] || Ash.Resource.Info.primary_key(resource)
            )
          )
        else
          opts
        end

      ecto_changesets = changesets |> Enum.map(& &1.attributes)

      source =
        if table = Enum.at(changesets, 0).context[:data_layer][:table] do
          {table, resource}
        else
          resource
        end

      repo = dynamic_repo(resource, Enum.at(changesets, 0))

      source
      |> repo.insert_all(ecto_changesets, opts)
      |> case do
        {_, nil} ->
          {:cont, {:ok, acc}}

        {_, results} ->
          if options[:single?] do
            {:cont, {:ok, acc ++ results}}
          else
            {:cont,
             {:ok,
              acc ++
                Enum.zip_with(results, changesets, fn result, changeset ->
                  Ash.Resource.put_metadata(
                    result,
                    :bulk_create_index,
                    changeset.context.bulk_create.index
                  )
                end)}}
          end
      end
    end)
    |> case do
      {:ok, records} ->
        if options[:return_records?] do
          {:ok, records}
        else
          :ok
        end

      {:error, error} ->
        {:error, error}
    end
  rescue
    e ->
      changeset = Ash.Changeset.new(resource)

      handle_raised_error(
        e,
        __STACKTRACE__,
        {:bulk_create, ecto_changeset(changeset.data, changeset, :create, false)},
        resource
      )
  end

  defp upsert_set(resource, changesets, options) do
    attributes_changing_anywhere =
      changesets |> Enum.flat_map(&Map.keys(&1.attributes)) |> Enum.uniq()

    update_defaults = update_defaults(resource)
    # We can't reference EXCLUDED if at least one of the changesets in the stream is not
    # changing the value (and we wouldn't want to even if we could as it would be unnecessary)
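    # (`EXCLUDED` is how SQLite's `ON CONFLICT DO UPDATE` clause refers to the
    # row that was proposed for insertion.)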

    upsert_fields =
      (options[:upsert_fields] || []) |> Enum.filter(&(&1 in attributes_changing_anywhere))

    fields_to_upsert =
      (upsert_fields ++ Keyword.keys(update_defaults)) --
        Keyword.keys(Enum.at(changesets, 0).atomics)

    Enum.map(fields_to_upsert, fn upsert_field ->
      # for safety, we check once more at the end that all values in
      # upsert_fields are names of attributes. This is because
      # below we use `literal/1` to bring them into the query
      if is_nil(resource.__schema__(:type, upsert_field)) do
        raise "Only attribute names can be used in upsert_fields"
      end

      case Keyword.fetch(update_defaults, upsert_field) do
        {:ok, default} ->
          if upsert_field in upsert_fields do
            {upsert_field,
             Ecto.Query.dynamic(
               [],
               fragment(
                 "COALESCE(EXCLUDED.?, ?)",
                 literal(^to_string(upsert_field)),
                 ^default
               )
             )}
          else
            {upsert_field, default}
          end

        :error ->
          {upsert_field,
           Ecto.Query.dynamic(
             [],
             fragment("EXCLUDED.?", literal(^to_string(upsert_field)))
           )}
      end
    end)
  end

  @impl true
  def create(resource, changeset) do
    changeset = %{
      changeset
      | data:
          Map.update!(
            changeset.data,
            :__meta__,
            &Map.put(&1, :source, table(resource, changeset))
          )
    }

    case bulk_create(resource, [changeset], %{
           single?: true,
           tenant: changeset.tenant,
           return_records?: true
         }) do
      {:ok, [result]} ->
        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  end

  defp handle_errors({:error, %Ecto.Changeset{errors: errors}}) do
    {:error, Enum.map(errors, &to_ash_error/1)}
  end

  defp to_ash_error({field, {message, vars}}) do
    Ash.Error.Changes.InvalidAttribute.exception(
      field: field,
      message: message,
      private_vars: vars
    )
  end

  defp ecto_changeset(record, changeset, type, table_error? \\ true) do
    filters =
      if changeset.action_type == :create do
        %{}
      else
        Map.get(changeset, :filters, %{})
      end

    filters =
      if changeset.action_type == :create do
        filters
      else
        changeset.resource
        |> Ash.Resource.Info.primary_key()
        |> Enum.reduce(filters, fn key, filters ->
          Map.put(filters, key, Map.get(record, key))
        end)
      end

    attributes =
      changeset.resource
      |> Ash.Resource.Info.attributes()
      |> Enum.map(& &1.name)

    attributes_to_change =
      Enum.reject(attributes, fn attribute ->
        Keyword.has_key?(changeset.atomics, attribute)
      end)

    ecto_changeset =
      record
      |> to_ecto()
      |> set_table(changeset, type, table_error?)
      |> Ecto.Changeset.change(Map.take(changeset.attributes, attributes_to_change))
      |> Map.update!(:filters, &Map.merge(&1, filters))
      |> add_configured_foreign_key_constraints(record.__struct__)
      |> add_unique_indexes(record.__struct__, changeset)
      |> add_exclusion_constraints(record.__struct__)

    case type do
      :create ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)

      type when type in [:upsert, :update] ->
        ecto_changeset
        |> add_my_foreign_key_constraints(record.__struct__)
        |> add_related_foreign_key_constraints(record.__struct__)

      :delete ->
        ecto_changeset
        |> add_related_foreign_key_constraints(record.__struct__)
    end
  end

  defp handle_raised_error(
         %Ecto.StaleEntryError{changeset: %{data: %resource{}, filters: filters}},
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Changes.StaleRecord.exception(resource: resource, filters: filters),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(%Ecto.Query.CastError{} = e, stacktrace, context, resource) do
    handle_raised_error(
      Ash.Error.Query.InvalidFilterValue.exception(value: e.value, context: context),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Exqlite.Error{
           message: "FOREIGN KEY constraint failed"
         },
         stacktrace,
         context,
         resource
       ) do
    handle_raised_error(
      Ash.Error.Changes.InvalidChanges.exception(
        fields: Ash.Resource.Info.primary_key(resource),
        message: "referenced something that does not exist"
      ),
      stacktrace,
      context,
      resource
    )
  end

  defp handle_raised_error(
         %Exqlite.Error{
           message: "UNIQUE constraint failed: " <> fields
         },
         _stacktrace,
         _context,
         resource
       ) do
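    # SQLite reports the violated columns as "table.column1, table.column2", so
    # split that list and map each column back to a resource attribute.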
    names =
      fields
      |> String.split(", ")
      |> Enum.map(fn field ->
        field |> String.split(".", trim: true) |> Enum.drop(1) |> Enum.at(0)
      end)
      |> Enum.map(fn field ->
        Ash.Resource.Info.attribute(resource, field)
      end)
      |> Enum.reject(&is_nil/1)
      |> Enum.map(fn %{name: name} ->
        name
      end)

    message = find_constraint_message(resource, names)

    {:error,
     names
     |> Enum.map(fn name ->
       Ash.Error.Changes.InvalidAttribute.exception(
         field: name,
         message: message
       )
     end)}
  end

  defp handle_raised_error(error, stacktrace, _ecto_changeset, _resource) do
    {:error, Ash.Error.to_ash_error(error, stacktrace)}
  end

  defp find_constraint_message(resource, names) do
    find_custom_index_message(resource, names) || find_identity_message(resource, names) ||
      "has already been taken"
  end

  defp find_custom_index_message(resource, names) do
    resource
    |> AshSqlite.DataLayer.Info.custom_indexes()
    |> Enum.find(fn %{fields: fields} ->
      fields |> Enum.map(&to_string/1) |> Enum.sort() ==
        names |> Enum.map(&to_string/1) |> Enum.sort()
    end)
    |> case do
      %{message: message} when is_binary(message) -> message
      _ -> nil
    end
  end

  defp find_identity_message(resource, names) do
    resource
    |> Ash.Resource.Info.identities()
    |> Enum.find(fn %{keys: fields} ->
      fields |> Enum.map(&to_string/1) |> Enum.sort() ==
        names |> Enum.map(&to_string/1) |> Enum.sort()
    end)
    |> case do
      %{message: message} when is_binary(message) ->
        message

      _ ->
        nil
    end
  end

  defp set_table(record, changeset, operation, table_error?) do
    if AshSqlite.DataLayer.Info.polymorphic?(record.__struct__) do
      table =
        changeset.context[:data_layer][:table] ||
          AshSqlite.DataLayer.Info.table(record.__struct__)

      if table do
        Ecto.put_meta(record, source: table)
      else
        if table_error? do
          raise_table_error!(changeset.resource, operation)
        else
          record
        end
      end
    else
      record
    end
  end

  def from_ecto({:ok, result}), do: {:ok, from_ecto(result)}
  def from_ecto({:error, _} = other), do: other

  def from_ecto(nil), do: nil

  def from_ecto(value) when is_list(value) do
    Enum.map(value, &from_ecto/1)
  end

  def from_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      empty = struct(resource)

      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        case Map.get(record, relationship.name) do
          %Ecto.Association.NotLoaded{} ->
            Map.put(record, relationship.name, Map.get(empty, relationship.name))

          value ->
            Map.put(record, relationship.name, from_ecto(value))
        end
      end)
    else
      record
    end
  end

  def from_ecto(other), do: other

  def to_ecto(nil), do: nil

  def to_ecto(value) when is_list(value) do
    Enum.map(value, &to_ecto/1)
  end

  def to_ecto(%resource{} = record) do
    if Spark.Dsl.is?(resource, Ash.Resource) do
      resource
      |> Ash.Resource.Info.relationships()
      |> Enum.reduce(record, fn relationship, record ->
        value =
          case Map.get(record, relationship.name) do
            %Ash.NotLoaded{} ->
              %Ecto.Association.NotLoaded{
                __field__: relationship.name,
                __cardinality__: relationship.cardinality
              }

            value ->
              to_ecto(value)
          end

        Map.put(record, relationship.name, value)
      end)
    else
      record
    end
  end

  def to_ecto(other), do: other

  defp add_exclusion_constraints(changeset, resource) do
    resource
    |> AshSqlite.DataLayer.Info.exclusion_constraint_names()
    |> Enum.reduce(changeset, fn constraint, changeset ->
      case constraint do
        {key, name} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name)

        {key, name, message} ->
          Ecto.Changeset.exclusion_constraint(changeset, key, name: name, message: message)
      end
    end)
  end

  defp add_related_foreign_key_constraints(changeset, resource) do
    # TODO: this doesn't guarantee that we get all of them, because if something is related to this
    # schema and there is no back-relation, then this won't catch its foreign key constraints
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.map(& &1.destination)
    |> Enum.uniq()
    |> Enum.flat_map(fn related ->
      related
      |> Ash.Resource.Info.relationships()
      |> Enum.filter(&(&1.destination == resource))
      |> Enum.map(&Map.take(&1, [:source, :source_attribute, :destination_attribute, :name]))
    end)
    |> Enum.reduce(changeset, fn %{
                                   source: source,
                                   source_attribute: source_attribute,
                                   destination_attribute: destination_attribute,
                                   name: relationship_name
                                 },
                                 changeset ->
      case AshSqlite.DataLayer.Info.reference(resource, relationship_name) do
        %{name: name} when not is_nil(name) ->
          Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
            name: name,
            message: "would leave records behind"
          )

        _ ->
          Ecto.Changeset.foreign_key_constraint(changeset, destination_attribute,
            name: "#{AshSqlite.DataLayer.Info.table(source)}_#{source_attribute}_fkey",
            message: "would leave records behind"
          )
      end
    end)
  end

  defp add_my_foreign_key_constraints(changeset, resource) do
    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(changeset, &Ecto.Changeset.foreign_key_constraint(&2, &1.source_attribute))
  end

  defp add_configured_foreign_key_constraints(changeset, resource) do
    resource
    |> AshSqlite.DataLayer.Info.foreign_key_names()
    |> case do
      {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
      value -> List.wrap(value)
    end
    |> Enum.reduce(changeset, fn
      {key, name}, changeset ->
        Ecto.Changeset.foreign_key_constraint(changeset, key, name: name)

      {key, name, message}, changeset ->
        Ecto.Changeset.foreign_key_constraint(changeset, key, name: name, message: message)
    end)
  end

  defp add_unique_indexes(changeset, resource, ash_changeset) do
    changeset =
      resource
      |> Ash.Resource.Info.identities()
      |> Enum.reduce(changeset, fn identity, changeset ->
        name =
          AshSqlite.DataLayer.Info.identity_index_names(resource)[identity.name] ||
            "#{table(resource, ash_changeset)}_#{identity.name}_index"

        opts =
          if Map.get(identity, :message) do
            [name: name, message: identity.message]
          else
            [name: name]
          end

        Ecto.Changeset.unique_constraint(changeset, identity.keys, opts)
      end)

    changeset =
      resource
      |> AshSqlite.DataLayer.Info.custom_indexes()
      |> Enum.reduce(changeset, fn index, changeset ->
        opts =
          if index.message do
            [name: index.name, message: index.message]
          else
            [name: index.name]
          end

        Ecto.Changeset.unique_constraint(changeset, index.fields, opts)
      end)

    names =
      resource
      |> AshSqlite.DataLayer.Info.unique_index_names()
      |> case do
        {m, f, a} -> List.wrap(apply(m, f, [changeset | a]))
        value -> List.wrap(value)
      end

    names =
      case Ash.Resource.Info.primary_key(resource) do
        [] ->
          names

        fields ->
          if table = table(resource, ash_changeset) do
            [{fields, table <> "_pkey"} | names]
          else
            names
          end
      end

    Enum.reduce(names, changeset, fn
      {keys, name}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name)

      {keys, name, message}, changeset ->
        Ecto.Changeset.unique_constraint(changeset, List.wrap(keys), name: name, message: message)
    end)
  end

  @impl true
  def upsert(resource, changeset, keys \\ nil) do
    keys = keys || Ash.Resource.Info.primary_key(resource)

    explicitly_changing_attributes =
      Map.keys(changeset.attributes) -- Map.get(changeset, :defaults, []) -- keys

    upsert_fields =
      changeset.context[:private][:upsert_fields] || explicitly_changing_attributes

    case bulk_create(resource, [changeset], %{
           single?: true,
           upsert?: true,
           tenant: changeset.tenant,
           upsert_keys: keys,
           upsert_fields: upsert_fields,
           return_records?: true
         }) do
      {:ok, [result]} ->
        {:ok, result}

      {:error, error} ->
        {:error, error}
    end
  end

  defp conflict_target(resource, keys) do
    if Ash.Resource.Info.base_filter(resource) do
      base_filter_sql =
        AshSqlite.DataLayer.Info.base_filter_sql(resource) ||
          raise """
          Cannot use upserts with resources that have a base_filter without also adding `base_filter_sql` in the sqlite section.
          """

      sources =
        Enum.map(keys, fn key ->
          ~s("#{Ash.Resource.Info.attribute(resource, key).source || key}")
        end)

      {:unsafe_fragment, "(" <> Enum.join(sources, ", ") <> ") WHERE (#{base_filter_sql})"}
    else
      keys
    end
  end

  defp update_defaults(resource) do
    attributes =
      resource
      |> Ash.Resource.Info.attributes()
      |> Enum.reject(&is_nil(&1.update_default))

    attributes
    |> static_defaults()
    |> Enum.concat(lazy_matching_defaults(attributes))
    |> Enum.concat(lazy_non_matching_defaults(attributes))
  end

  defp static_defaults(attributes) do
    attributes
    |> Enum.reject(&get_default_fun(&1))
    |> Enum.map(&{&1.name, &1.update_default})
  end

  defp lazy_non_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(!&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.map(fn attribute ->
      default_value =
        case attribute.update_default do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      {attribute.name, default_value}
    end)
  end

  defp lazy_matching_defaults(attributes) do
    attributes
    |> Enum.filter(&(&1.match_other_defaults? && get_default_fun(&1)))
    |> Enum.group_by(& &1.update_default)
    |> Enum.flat_map(fn {default_fun, attributes} ->
      default_value =
        case default_fun do
          function when is_function(function) ->
            function.()

          {m, f, a} when is_atom(m) and is_atom(f) and is_list(a) ->
            apply(m, f, a)
        end

      Enum.map(attributes, &{&1.name, default_value})
    end)
  end

  defp get_default_fun(attribute) do
    if is_function(attribute.update_default) or match?({_, _, _}, attribute.update_default) do
      attribute.update_default
    end
  end

  @impl true
  def update(resource, changeset) do
    ecto_changeset =
      changeset.data
      |> Map.update!(:__meta__, &Map.put(&1, :source, table(resource, changeset)))
      |> ecto_changeset(changeset, :update)

    try do
      query = from(row in resource, as: ^0)

      select = Keyword.keys(changeset.atomics) ++ Ash.Resource.Info.primary_key(resource)

      query =
        query
        |> default_bindings(resource, changeset.context)
        |> Ecto.Query.select(^select)

      case query_with_atomics(
             resource,
             query,
             ecto_changeset.filters,
             changeset.atomics,
             ecto_changeset.changes,
             []
           ) do
        :empty ->
          {:ok, changeset.data}

        {:ok, query} ->
          repo_opts = repo_opts(changeset.timeout, changeset.tenant, changeset.resource)

          repo_opts =
            Keyword.put(repo_opts, :returning, Keyword.keys(changeset.atomics))

          result =
            dynamic_repo(resource, changeset).update_all(
              query,
              [],
              repo_opts
            )

          case result do
            {0, []} ->
              {:error,
               Ash.Error.Changes.StaleRecord.exception(
                 resource: resource,
                 filters: ecto_changeset.filters
               )}

            {1, [result]} ->
              record =
                changeset.data
                |> Map.merge(changeset.attributes)
                |> Map.merge(Map.take(result, Keyword.keys(changeset.atomics)))

              {:ok, record}
          end

        {:error, error} ->
          {:error, error}
      end
    rescue
      e ->
        handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
    end
  end

  defp query_with_atomics(
         resource,
         query,
         filters,
         atomics,
         updating_one_changes,
         existing_set
       ) do
    query =
      Enum.reduce(filters, query, fn {key, value}, query ->
        from(row in query,
          where: field(row, ^key) == ^value
        )
      end)

    atomics_result =
      Enum.reduce_while(atomics, {:ok, query, []}, fn {field, expr}, {:ok, query, set} ->
        with {:ok, query} <-
               AshSqlite.Join.join_all_relationships(
                 query,
                 %Ash.Filter{
                   resource: resource,
                   expression: expr
                 },
                 left_only?: true
               ),
             dynamic <-
               AshSqlite.Expr.dynamic_expr(query, expr, query.__ash_bindings__) do
          {:cont, {:ok, query, Keyword.put(set, field, dynamic)}}
        else
          other ->
            {:halt, other}
        end
      end)

    case atomics_result do
      {:ok, query, dynamics} ->
        {params, set, count} =
          updating_one_changes
          |> Map.to_list()
          |> Enum.reduce({[], [], 0}, fn {key, value}, {params, set, count} ->
            {[{value, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1}
          end)

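        # Expand each dynamic expression into the update's `set` clause, threading
        # the accumulated positional params and bind count through each expansion.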
        {params, set, _} =
          Enum.reduce(
            dynamics ++ existing_set,
            {params, set, count},
            fn {key, value}, {params, set, count} ->
              case AshSqlite.Expr.dynamic_expr(query, value, query.__ash_bindings__) do
                %Ecto.Query.DynamicExpr{} = dynamic ->
                  result =
                    Ecto.Query.Builder.Dynamic.partially_expand(
                      :select,
                      query,
                      dynamic,
                      params,
                      count
                    )

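                  # The first two elements of the result are the expanded expression
                  # and the updated params; the last element is the updated bind count.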
                  expr = elem(result, 0)
                  new_params = elem(result, 1)

                  new_count =
                    result |> Tuple.to_list() |> List.last()

                  {new_params, [{key, expr} | set], new_count}

                other ->
                  {[{other, {0, key}} | params], [{key, {:^, [], [count]}} | set], count + 1}
              end
            end
          )

        case set do
          [] ->
            :empty

          set ->
            {:ok,
             Map.put(query, :updates, [
               %Ecto.Query.QueryExpr{
                 # why do I have to reverse the `set`???
                 # it breaks if I don't
                 expr: [set: Enum.reverse(set)],
                 params: Enum.reverse(params)
               }
             ])}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def destroy(resource, %{data: record} = changeset) do
    ecto_changeset = ecto_changeset(record, changeset, :delete)

    try do
      ecto_changeset
      |> dynamic_repo(resource, changeset).delete(
        repo_opts(changeset.timeout, changeset.resource)
      )
      |> from_ecto()
      |> case do
        {:ok, _record} ->
          :ok

        {:error, error} ->
          handle_errors({:error, error})
      end
    rescue
      e ->
        handle_raised_error(e, __STACKTRACE__, ecto_changeset, resource)
    end
  end

  @impl true
  def sort(query, sort, _resource) do
    {:ok, Map.update!(query, :__ash_bindings__, &Map.put(&1, :sort, sort))}
  end

  @impl true
  def select(query, select, resource) do
    query = default_bindings(query, resource)

    {:ok,
     from(row in query,
       select: struct(row, ^Enum.uniq(select))
     )}
  end

  defp apply_sort(query, sort, _resource) when sort in [nil, []] do
    {:ok, query |> set_sort_applied()}
  end

  defp apply_sort(query, sort, resource) do
    query
    |> AshSqlite.Sort.sort(sort, resource, [], 0)
    |> case do
      {:ok, query} ->
        {:ok, query |> set_sort_applied()}

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
  def unwrap_one([thing]), do: thing
  def unwrap_one([]), do: nil
  def unwrap_one(other), do: other

  defp set_sort_applied(query) do
    Map.update!(query, :__ash_bindings__, &Map.put(&1, :sort_applied?, true))
  end

  @impl true
  def filter(query, filter, resource, opts \\ []) do
    query = default_bindings(query, resource)

    query
    |> AshSqlite.Join.join_all_relationships(filter, opts)
    |> case do
      {:ok, query} ->
        {:ok, add_filter_expression(query, filter)}

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
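  # Seeds the query with the `__ash_bindings__` map this module uses to track
  # named bindings, calculations, context, and the next binding position.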
  def default_bindings(query, resource, context \\ %{}) do
    start_bindings = context[:data_layer][:start_bindings_at] || 0

    Map.put_new(query, :__ash_bindings__, %{
      resource: resource,
      current: Enum.count(query.joins) + 1 + start_bindings,
      in_group?: false,
      calculations: %{},
      parent_resources: [],
      context: context,
      bindings: %{start_bindings => %{path: [], type: :root, source: resource}}
    })
  end

  @impl true
  def add_calculations(query, calculations, resource) do
    AshSqlite.Calculation.add_calculations(query, calculations, resource, 0)
  end

  @doc false
  def get_binding(resource, path, query, type, name_match \\ nil)

  def get_binding(resource, path, %{__ash_bindings__: _} = query, type, name_match) do
    types = List.wrap(type)

    Enum.find_value(query.__ash_bindings__.bindings, fn
      {binding, %{path: candidate_path, type: binding_type} = data} ->
        if binding_type in types do
          if name_match do
            if data[:name] == name_match do
              if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
                binding
              end
            end
          else
            if Ash.SatSolver.synonymous_relationship_paths?(resource, candidate_path, path) do
              binding
            else
              false
            end
          end
        end

      _ ->
        nil
    end)
  end

  def get_binding(_, _, _, _, _), do: nil

  defp add_filter_expression(query, filter) do
    filter
    |> split_and_statements()
    |> Enum.reduce(query, fn filter, query ->
      dynamic = AshSqlite.Expr.dynamic_expr(query, filter, query.__ash_bindings__)

      Ecto.Query.where(query, ^dynamic)
    end)
  end

  defp split_and_statements(%Filter{expression: expression}) do
    split_and_statements(expression)
  end

  defp split_and_statements(%BooleanExpression{op: :and, left: left, right: right}) do
    split_and_statements(left) ++ split_and_statements(right)
  end

  defp split_and_statements(%Not{expression: %Not{expression: expression}}) do
    split_and_statements(expression)
  end

  defp split_and_statements(%Not{
         expression: %BooleanExpression{op: :or, left: left, right: right}
       }) do
    split_and_statements(%BooleanExpression{
      op: :and,
      left: %Not{expression: left},
      right: %Not{expression: right}
    })
  end

  defp split_and_statements(other), do: [other]

  @doc false
  def add_binding(query, data, additional_bindings \\ 0) do
    current = query.__ash_bindings__.current
    bindings = query.__ash_bindings__.bindings

    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, current, data),
        current: current + 1 + additional_bindings
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  def add_known_binding(query, data, known_binding) do
    bindings = query.__ash_bindings__.bindings

    new_ash_bindings = %{
      query.__ash_bindings__
      | bindings: Map.put(bindings, known_binding, data)
    }

    %{query | __ash_bindings__: new_ash_bindings}
  end

  @impl true
  def rollback(resource, term) do
    AshSqlite.DataLayer.Info.repo(resource).rollback(term)
  end

  defp table(resource, changeset) do
    changeset.context[:data_layer][:table] || AshSqlite.DataLayer.Info.table(resource)
  end

  defp raise_table_error!(resource, operation) do
    if AshSqlite.DataLayer.Info.polymorphic?(resource) do
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.

      Polymorphic resources require that the `data_layer[:table]` context is provided.
      See the guide on polymorphic resources for more information.
      """
    else
      raise """
      Could not determine table for #{operation} on #{inspect(resource)}.
      """
    end
  end

  defp dynamic_repo(resource, %{__ash_bindings__: %{context: %{data_layer: %{repo: repo}}}}) do
    repo || AshSqlite.DataLayer.Info.repo(resource)
  end

  defp dynamic_repo(resource, %{context: %{data_layer: %{repo: repo}}}) do
    repo || AshSqlite.DataLayer.Info.repo(resource)
  end

  defp dynamic_repo(resource, _) do
    AshSqlite.DataLayer.Info.repo(resource)
  end
end