lib/ash/data_layer/ets/ets.ex

defmodule Ash.DataLayer.Ets do
  @behaviour Ash.DataLayer
  require Ash.Query
  import Ash.Expr
  require Logger

  @ets %Spark.Dsl.Section{
    name: :ets,
    describe: """
    A section for configuring the ETS data layer
    """,
    examples: [
      """
      ets do
        # Used in testing
        private? true
      end
      """
    ],
    schema: [
      private?: [
        type: :boolean,
        default: false,
        doc:
          "Sets the ets table protection to private, and scopes it to only this process. The table name will not be used directly if this is true, to allow multiple processes to use this resource separately."
      ],
      table: [
        type: :atom,
        doc: """
        The name of the table. Defaults to the resource name.
        """
      ]
    ]
  }

  @moduledoc """
  An ETS (Erlang Term Storage) backed Ash data layer, for testing and lightweight usage.

  Remember, this data layer does not support transactions! It is not recommended for production
  use, especially in multi-user applications. It can, however, be great for prototyping.
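
  ## Usage

  A minimal sketch of a resource using this data layer (the module, domain,
  and attribute names below are illustrative):

  ```elixir
  defmodule MyApp.Post do
    use Ash.Resource,
      domain: MyApp.Blog,
      data_layer: Ash.DataLayer.Ets

    ets do
      # Keep the table private to the current process, handy in tests
      private? true
    end

    attributes do
      uuid_primary_key :id
      attribute :title, :string
    end
  end
  ```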
  """

  use Spark.Dsl.Extension,
    sections: [@ets],
    verifiers: [Ash.DataLayer.Verifiers.RequirePreCheckWith]

  alias Ash.Actions.Sort

  defmodule Query do
    @moduledoc false
    defstruct [
      :resource,
      :filter,
      :limit,
      :sort,
      :tenant,
      :domain,
      :distinct,
      :distinct_sort,
      context: %{},
      calculations: [],
      aggregates: [],
      relationships: %{},
      offset: 0
    ]
  end

  defmodule TableManager do
    @moduledoc false
    use GenServer

    def start(resource, tenant) do
      table =
        if tenant && Ash.Resource.Info.multitenancy_strategy(resource) == :context do
          Module.concat(to_string(Ash.DataLayer.Ets.Info.table(resource)), to_string(tenant))
        else
          Ash.DataLayer.Ets.Info.table(resource)
        end

      if Ash.DataLayer.Ets.Info.private?(resource) do
        do_wrap_existing(resource, table)
      else
        case GenServer.start(__MODULE__, {resource, table},
               name: Module.concat(table, TableManager)
             ) do
          {:error, {:already_started, _pid}} ->
            ETS.Set.wrap_existing(table)

          {:error, error} ->
            {:error, error}

          _ ->
            ETS.Set.wrap_existing(table)
        end
      end
    end

    def init({resource, table}) do
      case do_wrap_existing(resource, table) do
        {:ok, table} ->
          {:ok, {resource, table}, :hibernate}

        {:error, error} ->
          {:error, error}
      end
    end

    def handle_call(:wait, _, state), do: {:reply, :ok, state}

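    # Wrap the named table if it already exists; otherwise create it, falling
    # back to wrapping when another process wins the creation race.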
    defp do_wrap_existing(_resource, table) do
      case ETS.Set.wrap_existing(table) do
        {:error, :table_not_found} ->
          case ETS.Set.new(
                 name: table,
                 protection: :public,
                 ordered: true,
                 read_concurrency: true
               ) do
            {:ok, tab} ->
              {:ok, tab}

            {:error, :table_already_exists} ->
              ETS.Set.wrap_existing(table)

            other ->
              other
          end

        {:ok, table} ->
          {:ok, table}

        {:error, other} ->
          {:error, other}
      end
    end
  end

  @doc "Stops the storage for a given resource/tenant (deleting all of the data)"
  # sobelow_skip ["DOS.StringToAtom"]
  def stop(resource, tenant \\ nil) do
    tenant =
      if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
        tenant
      end

    if Ash.DataLayer.Ets.Info.private?(resource) do
      case Process.get({:ash_ets_table, resource, tenant}) do
        nil ->
          :ok

        table ->
          ETS.Set.delete(table)
      end
    else
      table =
        if tenant && Ash.Resource.Info.multitenancy_strategy(resource) == :context do
          String.to_atom(to_string(tenant) <> to_string(resource))
        else
          resource
        end

      name = Module.concat(table, TableManager)

      case Process.whereis(name) do
        nil ->
          :ok

        pid ->
          Process.exit(pid, :shutdown)
      end
    end
  end

  @doc false
  @impl true
  def can?(_, :distinct_sort), do: true
  def can?(resource, :async_engine), do: not Ash.DataLayer.Ets.Info.private?(resource)
  def can?(_, {:lateral_join, _}), do: true
  def can?(_, :bulk_create), do: true
  def can?(_, :composite_primary_key), do: true
  def can?(_, :expression_calculation), do: true
  def can?(_, :expression_calculation_sort), do: true
  def can?(_, :multitenancy), do: true
  def can?(_, :upsert), do: true
  def can?(_, :calculate), do: true
  def can?(_, :aggregate_filter), do: true
  def can?(_, :aggregate_sort), do: true
  def can?(_, {:aggregate_relationship, _}), do: true
  def can?(_, {:filter_relationship, _}), do: true
  def can?(_, {:aggregate, :count}), do: true
  def can?(_, {:aggregate, :first}), do: true
  def can?(_, {:aggregate, :sum}), do: true
  def can?(_, {:aggregate, :list}), do: true
  def can?(_, {:aggregate, :max}), do: true
  def can?(_, {:aggregate, :min}), do: true
  def can?(_, {:aggregate, :avg}), do: true
  def can?(_, {:aggregate, :exists}), do: true
  def can?(_, :changeset_filter), do: true
  def can?(_, :update_query), do: true
  def can?(_, :destroy_query), do: true
  def can?(resource, {:query_aggregate, kind}), do: can?(resource, {:aggregate, kind})

  def can?(_, :create), do: true
  def can?(_, :read), do: true

  def can?(resource, action_type) when action_type in ~w[update destroy]a do
    resource
    |> Ash.Resource.Info.primary_key()
    |> Enum.any?()
  end

  def can?(_, :sort), do: true
  def can?(_, :filter), do: true
  def can?(_, :limit), do: true
  def can?(_, :offset), do: true
  def can?(_, :boolean_filter), do: true
  def can?(_, :distinct), do: true
  def can?(_, :transact), do: false
  def can?(_, {:filter_expr, _}), do: true

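  # Join support is compile-time configurable: `config :ash, :no_join_mnesia_ets`
  # can be `false` (joins allowed), `true` (joins disabled), or `:dynamic`
  # (decided at runtime via the `:mnesia_ets_join?` application env).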
  case Application.compile_env(:ash, :no_join_mnesia_ets) || false do
    false ->
      def can?(_, {:join, _resource}) do
        # we synthesize all filters under the hood using `Ash.Filter.Runtime`
        true
      end

    true ->
      def can?(_, {:join, _resource}) do
        # we synthesize all filters under the hood using `Ash.Filter.Runtime`
        false
      end

    :dynamic ->
      def can?(_, {:join, resource}) do
        Ash.Resource.Info.data_layer(resource) == __MODULE__ ||
          Application.get_env(:ash, :mnesia_ets_join?, true)
      end
  end

  def can?(_, :nested_expressions), do: true
  def can?(_, :expr_error), do: true
  def can?(_, {:sort, _}), do: true
  def can?(_, {:atomic, :update}), do: true
  def can?(_, {:atomic, :upsert}), do: true
  def can?(_, _), do: false

  @doc false
  @impl true
  def resource_to_query(resource, domain) do
    %Query{
      resource: resource,
      domain: domain
    }
  end

  @doc false
  @impl true
  def limit(query, limit, _), do: {:ok, %{query | limit: limit}}

  @doc false
  @impl true
  def offset(query, offset, _), do: {:ok, %{query | offset: offset}}

  @doc false
  @impl true
  def add_calculations(query, calculations, _) do
    {:ok, %{query | calculations: query.calculations ++ calculations}}
  end

  @doc false
  @impl true
  def add_aggregate(query, aggregate, _),
    do: {:ok, %{query | aggregates: [aggregate | query.aggregates]}}

  @doc false
  @impl true
  def set_tenant(_resource, query, tenant) do
    {:ok, %{query | tenant: tenant}}
  end

  @doc false
  @impl true
  def set_context(_resource, query, context) do
    {:ok, %{query | context: context}}
  end

  @doc false
  @impl true
  def filter(query, filter, _resource) do
    if query.filter do
      {:ok, %{query | filter: Ash.Filter.add_to_filter!(query.filter, filter)}}
    else
      {:ok, %{query | filter: filter}}
    end
  end

  @doc false
  @impl true
  def sort(query, sort, _resource) do
    {:ok, %{query | sort: sort}}
  end

  @doc false
  @impl true
  def distinct(query, distinct, _resource) do
    {:ok, %{query | distinct: distinct}}
  end

  @impl true
  def distinct_sort(query, distinct_sort, _resource) do
    {:ok, %{query | distinct_sort: distinct_sort}}
  end

  @doc false
  @impl true
  def run_aggregate_query(%{domain: domain} = query, aggregates, resource) do
    case run_query(query, resource) do
      {:ok, results} ->
        Enum.reduce_while(aggregates, {:ok, %{}}, fn
          %{
            kind: kind,
            name: name,
            query: query,
            field: field,
            resource: resource,
            uniq?: uniq?,
            include_nil?: include_nil?,
            default_value: default_value,
            context: context
          },
          {:ok, acc} ->
            results
            |> filter_matches(
              Map.get(query || %{}, :filter),
              domain,
              context[:tenant],
              context[:actor]
            )
            |> case do
              {:ok, matches} ->
                field = field || Enum.at(Ash.Resource.Info.primary_key(resource), 0)

                value = aggregate_value(matches, kind, field, uniq?, include_nil?, default_value)
                {:cont, {:ok, Map.put(acc, name, value)}}

              {:error, error} ->
                {:halt, {:error, error}}
            end
        end)

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
  @impl true
  def run_query(
        %Query{
          resource: resource,
          filter: filter,
          offset: offset,
          limit: limit,
          sort: sort,
          distinct: distinct,
          distinct_sort: distinct_sort,
          tenant: tenant,
          calculations: calculations,
          aggregates: aggregates,
          domain: domain,
          context: context
        },
        _resource,
        parent \\ nil
      ) do
    with {:ok, records} <- get_records(resource, tenant),
         {:ok, records} <-
           filter_matches(
             records,
             filter,
             domain,
             context[:private][:tenant],
             context[:private][:actor],
             parent
           ),
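         # Distinct is applied in three passes: sort by the distinct sort
         # (falling back to the regular sort), keep one record per distinct
         # key, then re-apply the requested sort.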
         records <- Sort.runtime_sort(records, distinct_sort || sort, domain: domain),
         records <- Sort.runtime_distinct(records, distinct, domain: domain),
         records <- Sort.runtime_sort(records, sort, domain: domain),
         records <- Enum.drop(records, offset || 0),
         records <- do_limit(records, limit),
         {:ok, records} <-
           do_add_aggregates(records, domain, resource, aggregates),
         {:ok, records} <-
           do_add_calculations(
             records,
             resource,
             calculations,
             domain
           ) do
      {:ok, records}
    else
      {:error, error} ->
        {:error, error}
    end
  end

  defp do_limit(records, nil), do: records
  defp do_limit(records, limit), do: Enum.take(records, limit)

  @impl true
  def prefer_lateral_join_for_many_to_many?, do: false

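  # Lateral joins are simulated: the relationship's query is re-run once per
  # source record, and each result is tagged with `__lateral_join_source__` so
  # callers can attribute results to their parent records.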
  @impl true
  def run_query_with_lateral_join(
        query,
        root_data,
        _destination_resource,
        [
          {source_query, source_attribute, destination_attribute, relationship}
        ]
      ) do
    source_query =
      source_query
      |> Ash.Query.unset(:load)
      |> Ash.Query.unset(:page)
      |> Ash.Query.set_context(%{private: %{internal?: true}})
      |> Ash.Query.set_domain(query.domain)

    primary_key = Ash.Resource.Info.primary_key(source_query.resource)

    source_query =
      case primary_key do
        [] ->
          source_attributes = Enum.map(root_data, &Map.get(&1, source_attribute))

          Ash.Query.filter(source_query, ^ref(source_attribute) in ^source_attributes)

        [field] ->
          source_attributes = Enum.map(root_data, &Map.get(&1, field))
          Ash.Query.filter(source_query, ^ref(field) in ^source_attributes)

        fields ->
          filter = [
            or:
              Enum.map(root_data, fn record ->
                [and: Map.take(record, fields) |> Map.to_list()]
              end)
          ]

          Ash.Query.do_filter(source_query, filter)
      end

    source_query
    |> Ash.Actions.Read.unpaginated_read(nil, authorize?: false)
    |> case do
      {:error, error} ->
        {:error, error}

      {:ok, root_data} ->
        root_data
        |> Enum.reduce_while({:ok, []}, fn parent, {:ok, results} ->
          new_filter =
            if Map.get(relationship, :no_attributes?) do
              query.filter
            else
              filter =
                if is_nil(query.filter) do
                  %Ash.Filter{resource: query.resource, expression: true}
                else
                  query.filter
                end

              Ash.Filter.add_to_filter!(
                filter,
                Ash.Filter.parse!(
                  query.resource,
                  Ash.Expr.expr(^ref(destination_attribute) == ^Map.get(parent, source_attribute))
                )
              )
            end

          query = %{query | filter: new_filter}

          case run_query(query, relationship.source, parent) do
            {:ok, new_results} ->
              new_results =
                Enum.map(
                  new_results,
                  &Map.put(&1, :__lateral_join_source__, Map.take(parent, primary_key))
                )

              {:cont, {:ok, new_results ++ results}}

            {:error, error} ->
              {:halt, {:error, error}}
          end
        end)
    end
  end

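  # Many-to-many lateral join: read the source records, then the join rows for
  # each source, then the destination records whose keys appear in those join
  # rows.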
  def run_query_with_lateral_join(query, root_data, _destination_resource, [
        {source_query, source_attribute, source_attribute_on_join_resource, relationship},
        {through_query, destination_attribute_on_join_resource, destination_attribute,
         _through_relationship}
      ]) do
    source_query =
      source_query
      |> Ash.Query.unset(:load)
      |> Ash.Query.unset(:page)
      |> Ash.Query.set_context(%{private: %{internal?: true}})
      |> Ash.Query.set_domain(query.domain)

    primary_key = Ash.Resource.Info.primary_key(source_query.resource)

    source_query =
      case primary_key do
        [] ->
          source_attributes = Enum.map(root_data, &Map.get(&1, source_attribute))

          Ash.Query.filter(source_query, ^ref(source_attribute) in ^source_attributes)

        [field] ->
          source_attributes = Enum.map(root_data, &Map.get(&1, field))
          Ash.Query.filter(source_query, ^ref(field) in ^source_attributes)

        fields ->
          filter = [
            or:
              Enum.map(root_data, fn record ->
                [and: Map.take(record, fields) |> Map.to_list()]
              end)
          ]

          Ash.Query.do_filter(source_query, filter)
      end

    source_query
    |> Ash.read(authorize?: false)
    |> case do
      {:error, error} ->
        {:error, error}

      {:ok, root_data} ->
        root_data
        |> Enum.reduce_while({:ok, []}, fn parent, {:ok, results} ->
          through_query
          |> Ash.Query.filter(
            ^ref(source_attribute_on_join_resource) ==
              ^Map.get(parent, source_attribute)
          )
          |> Ash.Query.set_context(%{private: %{internal?: true}})
          |> Ash.Query.set_domain(query.domain)
          |> Ash.read(authorize?: false)
          |> case do
            {:ok, join_data} ->
              join_attrs =
                Enum.map(join_data, &Map.get(&1, destination_attribute_on_join_resource))

              new_filter =
                if is_nil(query.filter) do
                  Ash.Filter.parse!(query.resource, [
                    {destination_attribute, [in: join_attrs]}
                  ])
                else
                  Ash.Filter.add_to_filter!(query.filter, [
                    {destination_attribute, [in: join_attrs]}
                  ])
                end

              query = %{query | filter: new_filter}

              case run_query(query, relationship.source, parent) do
                {:ok, new_results} ->
                  new_results =
                    Enum.flat_map(new_results, fn result ->
                      join_data
                      |> Enum.flat_map(fn join_row ->
                        # TODO: use `Ash.Type.equal?`
                        if Map.get(join_row, destination_attribute_on_join_resource) ==
                             Map.get(result, destination_attribute) do
                          [
                            Map.put(
                              result,
                              :__lateral_join_source__,
                              Map.take(parent, primary_key)
                            )
                          ]
                        else
                          []
                        end
                      end)
                    end)

                  {:cont, {:ok, new_results ++ results}}

                {:error, error} ->
                  {:halt, {:error, error}}
              end

            {:error, error} ->
              {:halt, {:error, error}}
          end
        end)
    end
  end

  def do_add_calculations(records, _resource, [], _domain), do: {:ok, records}

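  # Evaluates each calculation expression against each record. Expressions
  # that cannot be resolved at runtime (`:unknown`) are loaded as `nil`.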
  def do_add_calculations(records, resource, calculations, domain) do
    Enum.reduce_while(records, {:ok, []}, fn record, {:ok, records} ->
      calculations
      |> Enum.reduce_while({:ok, record}, fn {calculation, expression}, {:ok, record} ->
        case Ash.Filter.hydrate_refs(expression, %{
               resource: resource,
               public?: false
             }) do
          {:ok, expression} ->
            case Ash.Expr.eval_hydrated(expression,
                   record: record,
                   resource: resource,
                   domain: domain,
                   actor: calculation.context.actor,
                   tenant: calculation.context.tenant
                 ) do
              {:ok, value} ->
                if calculation.load do
                  {:cont, {:ok, Map.put(record, calculation.load, value)}}
                else
                  {:cont,
                   {:ok,
                    Map.update!(
                      record,
                      :calculations,
                      &Map.put(&1, calculation.name, value)
                    )}}
                end

              :unknown ->
                if calculation.load do
                  {:cont, {:ok, Map.put(record, calculation.load, nil)}}
                else
                  {:cont,
                   {:ok,
                    Map.update!(
                      record,
                      :calculations,
                      &Map.put(&1, calculation.name, nil)
                    )}}
                end

              {:error, error} ->
                {:halt, {:error, error}}
            end

          {:error, error} ->
            {:halt, {:error, error}}
        end
      end)
      |> case do
        {:ok, record} ->
          {:cont, {:ok, [record | records]}}

        {:error, error} ->
          {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, records} ->
        {:ok, Enum.reverse(records)}

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
  def do_add_aggregates(records, _domain, _resource, []), do: {:ok, records}

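  # For each record, loads the aggregate's relationship path, filters and
  # sorts the related records with the aggregate's query, then computes the
  # aggregate value.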
  def do_add_aggregates(records, domain, _resource, aggregates) do
    Enum.reduce_while(records, {:ok, []}, fn record, {:ok, records} ->
      aggregates
      |> Enum.reduce_while(
        {:ok, record},
        fn
          %{
            kind: kind,
            field: field,
            relationship_path: relationship_path,
            query: query,
            name: name,
            load: load,
            uniq?: uniq?,
            include_nil?: include_nil?,
            context: context,
            default_value: default_value,
            join_filters: join_filters
          },
          {:ok, record} ->
            with {:ok, loaded_record} <-
                   Ash.load(
                     record,
                     record.__struct__
                     |> Ash.Query.load(relationship_path_to_load(relationship_path, field))
                     |> Ash.Query.set_context(%{private: %{internal?: true}}),
                     domain: domain,
                     tenant: context[:tenant],
                     actor: context[:actor],
                     authorize?: false
                   ),
                 related <-
                   Ash.Filter.Runtime.get_related(
                     loaded_record,
                     relationship_path,
                     false,
                     join_filters,
                     [record],
                     domain
                   ),
                 {:ok, filtered} <-
                   filter_matches(
                     related,
                     query.filter,
                     domain,
                     context[:tenant],
                     context[:actor]
                   ),
                 sorted <- Sort.runtime_sort(filtered, query.sort, domain: domain) do
              field = field || Enum.at(Ash.Resource.Info.primary_key(query.resource), 0)

              value =
                aggregate_value(sorted, kind, field, uniq?, include_nil?, default_value)

              if load do
                {:cont, {:ok, Map.put(record, load, value)}}
              else
                {:cont, {:ok, Map.update!(record, :aggregates, &Map.put(&1, name, value))}}
              end
            else
              other ->
                {:halt, other}
            end
        end
      )
      |> case do
        {:ok, record} ->
          {:cont, {:ok, [record | records]}}

        {:error, error} ->
          {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, records} ->
        {:ok, Enum.reverse(records)}

      {:error, error} ->
        {:error, error}
    end
  end

  defp relationship_path_to_load([], leaf) do
    leaf
  end

  defp relationship_path_to_load([key | rest], leaf) do
    [{key, relationship_path_to_load(rest, leaf)}]
  end

  @doc false
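  # Computes a single aggregate over records that have already been filtered
  # and sorted. A nil result falls back to the aggregate's default value.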
  def aggregate_value(records, kind, field, uniq?, include_nil?, default) do
    case kind do
      :count ->
        if uniq? do
          records
          |> Stream.map(&field_value(&1, field))
          |> Stream.uniq()
          |> Stream.reject(&is_nil/1)
          |> Enum.count()
        else
          Enum.count(records, &(not is_nil(field_value(&1, field))))
        end

      :exists ->
        case records do
          [] ->
            false

          _ ->
            true
        end

      :first ->
        if include_nil? do
          case records do
            [] ->
              nil

            [record | _rest] ->
              field_value(record, field)
          end
        else
          Enum.find_value(records, fn record ->
            case field_value(record, field) do
              nil ->
                nil

              value ->
                {:value, value}
            end
          end)
          |> case do
            nil -> nil
            {:value, value} -> value
          end
        end

      :list ->
        records
        |> Enum.map(fn record ->
          field_value(record, field)
        end)
        |> then(fn values ->
          if include_nil? do
            values
          else
            Enum.reject(values, &is_nil/1)
          end
        end)
        |> then(fn values ->
          if uniq? do
            Enum.uniq(values)
          else
            values
          end
        end)

      :avg ->
        records
        |> then(fn records ->
          if uniq? do
            records
            |> Stream.map(&field_value(&1, field))
            |> Stream.uniq()
          else
            records
            |> Stream.map(&field_value(&1, field))
          end
        end)
        |> Enum.reduce({nil, 0}, fn value, {sum, count} ->
          case value do
            nil ->
              {sum, count}

            value ->
              case {sum, value} do
                {nil, %Decimal{}} ->
                  {Decimal.new(value), count + 1}

                {_not_nil, %Decimal{}} ->
                  {Decimal.add(sum, value), count + 1}

                {nil, _not_decimal} ->
                  {value, count + 1}

                {_not_nil, _not_decimal} ->
                  {sum + value, count + 1}
              end
          end
        end)
        |> case do
          {_, 0} ->
            nil

          {%Decimal{} = sum, count} ->
            Decimal.div(sum, count)

          {sum, count} ->
            sum / count
        end

      kind when kind in [:sum, :max, :min] ->
        records
        |> Enum.map(&field_value(&1, field))
        |> case do
          [] ->
            nil

          items ->
            items =
              if uniq? do
                items |> Stream.uniq() |> Stream.reject(&is_nil/1)
              else
                items |> Stream.reject(&is_nil/1)
              end

            first_item = List.first(Enum.to_list(Stream.take(items, 1)))

            case kind do
              :sum ->
                if is_struct(first_item, Decimal) do
                  Enum.reduce(items, Decimal.new(0), &Decimal.add(&1, &2))
                else
                  Enum.sum(items)
                end

              :max ->
                if is_struct(first_item, Decimal) do
                  Enum.reduce(items, &Decimal.max(&1, &2))
                else
                  Enum.max(items)
                end

              :min ->
                if is_struct(first_item, Decimal) do
                  Enum.reduce(items, &Decimal.min(&1, &2))
                else
                  Enum.min(items)
                end
            end
        end
    end
    |> case do
      nil -> default
      other -> other
    end
  end

  defp field_value(nil, _), do: nil

  defp field_value(record, field) when is_atom(field) do
    Map.get(record, field)
  end

  defp field_value(record, %struct{load: load, name: name})
       when struct in [Ash.Query.Aggregate, Ash.Query.Calculation] do
    if load do
      Map.get(record, load)
    else
      case struct do
        Ash.Query.Aggregate ->
          Map.get(record.aggregates, name)

        Ash.Query.Calculation ->
          Map.get(record.calculations, name)
      end
    end
  end

  defp field_value(record, %{name: name}) do
    Map.get(record, name)
  end

  defp get_records(resource, tenant) do
    with {:ok, table} <- wrap_or_create_table(resource, tenant),
         {:ok, record_tuples} <- ETS.Set.to_list(table),
         records <- Enum.map(record_tuples, &elem(&1, 1)) do
      cast_records(records, resource)
    end
  end

  @doc false
  def cast_records(records, resource) do
    records
    |> Enum.reduce_while({:ok, []}, fn record, {:ok, casted} ->
      case cast_record(record, resource) do
        {:ok, casted_record} ->
          {:cont, {:ok, [casted_record | casted]}}

        {:error, error} ->
          {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, records} ->
        {:ok, Enum.reverse(records)}

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
  def cast_record(record, resource) do
    resource
    |> Ash.Resource.Info.attributes()
    |> Enum.reduce_while({:ok, %{}}, fn attribute, {:ok, attrs} ->
      case Map.get(record, attribute.name) do
        nil ->
          {:cont, {:ok, Map.put(attrs, attribute.name, nil)}}

        value ->
          case Ash.Type.cast_stored(attribute.type, value, attribute.constraints) do
            {:ok, value} ->
              {:cont, {:ok, Map.put(attrs, attribute.name, value)}}

            :error ->
              {:halt,
               {:error, "Failed to load #{inspect(value)} as type #{inspect(attribute.type)}"}}

            {:error, error} ->
              {:halt, {:error, error}}
          end
      end
    end)
    |> case do
      {:ok, attrs} ->
        {:ok,
         %{
           struct(resource, attrs)
           | __meta__: %Ecto.Schema.Metadata{state: :loaded, schema: resource}
         }}

      {:error, error} ->
        {:error, error}
    end
  end

  defp filter_matches(
         records,
         filter,
         domain,
         _tenant,
         actor,
         parent \\ nil,
         conflicting_upsert_values \\ nil
       )

  defp filter_matches([], _, _domain, _tenant, _actor, _parent, _conflicting_upsert_values),
    do: {:ok, []}

  defp filter_matches(
         records,
         nil,
         _domain,
         _tenant,
         _actor,
         _parent,
         _conflicting_upsert_values
       ),
       do: {:ok, records}

  defp filter_matches(
         records,
         filter,
         domain,
         tenant,
         actor,
         parent,
         conflicting_upsert_values
       ) do
    Ash.Filter.Runtime.filter_matches(domain, records, filter,
      parent: parent,
      tenant: tenant,
      actor: actor,
      conflicting_upsert_values: conflicting_upsert_values
    )
  end

  @doc false
  @impl true
  def upsert(resource, changeset, keys, identity, opts \\ [from_bulk_create?: false]) do
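    # Upserts are synthesized: with a nil upsert key (and nils not distinct),
    # fall back to a plain create; otherwise look up the conflicting record by
    # the keys and create, skip, or update accordingly.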
    pkey = Ash.Resource.Info.primary_key(resource)
    keys = keys || pkey

    if (is_nil(identity) || !identity.nils_distinct?) &&
         Enum.any?(keys, &is_nil(Ash.Changeset.get_attribute(changeset, &1))) do
      create(resource, changeset, opts[:from_bulk_create?])
    else
      key_filters =
        Enum.map(keys, fn key ->
          {key,
           Ash.Changeset.get_attribute(changeset, key) || Map.get(changeset.params, key) ||
             Map.get(changeset.params, to_string(key))}
        end)

      query =
        resource
        |> Ash.Query.do_filter(and: [key_filters])
        |> then(fn query ->
          if is_nil(identity) || is_nil(identity.where) do
            query
          else
            Ash.Query.do_filter(query, identity.where)
          end
        end)

      to_set = Ash.Changeset.set_on_upsert(changeset, keys)

      resource
      |> resource_to_query(changeset.domain)
      |> Map.put(:filter, query.filter)
      |> Map.put(:tenant, changeset.tenant)
      |> run_query(resource)
      |> case do
        {:ok, []} ->
          create(resource, changeset, opts[:from_bulk_create?])

        {:ok, [result]} ->
          with {:ok, conflicting_upsert_values} <- Ash.Changeset.apply_attributes(changeset),
               {:ok, [^result]} <-
                 upsert_conflict_check(
                   changeset,
                   result,
                   conflicting_upsert_values
                 ) do
            changeset =
              changeset
              |> Map.put(:attributes, %{})
              |> Map.put(:data, result)
              |> Ash.Changeset.force_change_attributes(to_set)

            update(
              resource,
              %{changeset | action_type: :update, filter: nil},
              Map.take(result, pkey),
              opts[:from_bulk_create?]
            )
          else
            {:ok, []} ->
              {:ok, Ash.Resource.put_metadata(result, :upsert_skipped, true)}

            {:error, reason} ->
              {:error, reason}
          end

        {:ok, _} ->
          {:error, "Multiple records matching keys"}
      end
    end
  end

  @spec upsert_conflict_check(
          changeset :: Ash.Changeset.t(),
          subject :: record,
          conflicting_upsert_values :: record
        ) :: {:ok, [record]} | {:error, reason}
        when record: Ash.Resource.record(), reason: term()
  defp upsert_conflict_check(changeset, subject, conflicting_upsert_values)

  defp upsert_conflict_check(
         %Ash.Changeset{filter: nil},
         result,
         _conflicting_upsert_values
       ),
       do: {:ok, [result]}

  defp upsert_conflict_check(
         %Ash.Changeset{filter: filter, domain: domain, context: context},
         result,
         conflicting_upsert_values
       ) do
    filter_matches(
      [result],
      filter,
      domain,
      context.private[:tenant],
      context.private[:actor],
      nil,
      conflicting_upsert_values
    )
  end

  @impl true
  def bulk_create(resource, stream, options) do
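    # Upserts are applied one changeset at a time; plain creates are dumped to
    # their native representation and written to the table in a single batch.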
    stream = Enum.to_list(stream)
    log_bulk_create(resource, stream, options)

    if options[:upsert?] do
      # This is not optimized, but that's okay for now
      stream
      |> Enum.reduce_while({:ok, []}, fn changeset, {:ok, results} ->
        changeset =
          Ash.Changeset.set_context(changeset, %{
            private: %{upsert_fields: options[:upsert_fields] || []}
          })

        case upsert(
               resource,
               changeset,
               options.upsert_keys,
               options.identity,
               Map.put(options, :from_bulk_create?, true)
             ) do
          {:ok, result} ->
            if Ash.Resource.get_metadata(result, :upsert_skipped) do
              {:cont, {:ok, results}}
            else
              {:cont,
               {:ok,
                [
                  Ash.Resource.put_metadata(
                    result,
                    :bulk_create_index,
                    changeset.context.bulk_create.index
                  )
                  | results
                ]}}
            end

          {:error, error} ->
            {:halt, {:error, error}}
        end
      end)
    else
      with {:ok, table} <- wrap_or_create_table(resource, options.tenant) do
        Enum.reduce_while(stream, {:ok, []}, fn changeset, {:ok, results} ->
          pkey =
            resource
            |> Ash.Resource.Info.primary_key()
            |> Enum.into(%{}, fn attr ->
              {attr, Ash.Changeset.get_attribute(changeset, attr)}
            end)

          with {:ok, record} <- Ash.Changeset.apply_attributes(changeset),
               record <- unload_relationships(resource, record) do
            {:cont, {:ok, [{pkey, changeset.context.bulk_create.index, record} | results]}}
          else
            {:error, error} ->
              {:halt, {:error, error}}
          end
        end)
        |> case do
          {:ok, records} ->
            case put_or_insert_new_batch(table, records, resource, options.return_records?) do
              :ok ->
                :ok

              {:ok, records} ->
                {:ok, Stream.map(records, &set_loaded/1)}

              {:error, error} ->
                {:error, error}
            end

          {:error, error} ->
            {:error, error}
        end
      end
    end
  end

  @doc false
  @impl true
  def create(resource, changeset, from_bulk_create? \\ false) do
    pkey =
      resource
      |> Ash.Resource.Info.primary_key()
      |> Enum.into(%{}, fn attr ->
        {attr, Ash.Changeset.get_attribute(changeset, attr)}
      end)

    with {:ok, table} <- wrap_or_create_table(resource, changeset.tenant),
         _ <- if(!from_bulk_create?, do: log_create(resource, changeset)),
         {:ok, record} <- Ash.Changeset.apply_attributes(changeset),
         record <- unload_relationships(resource, record),
         {:ok, record} <- put_or_insert_new(table, {pkey, record}, resource) do
      {:ok, set_loaded(record)}
    else
      {:error, error} -> {:error, error}
    end
  end

  defp set_loaded(%resource{} = record) do
    %{record | __meta__: %Ecto.Schema.Metadata{state: :loaded, schema: resource}}
  end

  defp put_or_insert_new(table, {pkey, record}, resource) do
    attributes = resource |> Ash.Resource.Info.attributes()

    case dump_to_native(record, attributes) do
      {:ok, casted} ->
        case ETS.Set.put(table, {pkey, casted}) do
          {:ok, set} ->
            {_key, record} = ETS.Set.get!(set, pkey)
            cast_record(record, resource)

          other ->
            other
        end

      other ->
        other
    end
  end

  defp put_or_insert_new_batch(table, records, resource, return_records?) do
    attributes = resource |> Ash.Resource.Info.attributes()

    Enum.reduce_while(records, {:ok, [], []}, fn {pkey, index, record}, {:ok, acc, indices} ->
      case dump_to_native(record, attributes) do
        {:ok, casted} ->
          {:cont, {:ok, [{pkey, casted} | acc], [{pkey, index} | indices]}}

        {:error, error} ->
          {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, batch, indices} ->
        case ETS.Set.put(table, batch) do
          {:ok, set} ->
            if return_records? do
              Enum.reduce_while(indices, {:ok, []}, fn {pkey, index}, {:ok, acc} ->
                {_key, record} = ETS.Set.get!(set, pkey)

                case cast_record(record, resource) do
                  {:ok, casted} ->
                    {:cont,
                     {:ok, [Ash.Resource.put_metadata(casted, :bulk_create_index, index) | acc]}}

                  {:error, error} ->
                    {:halt, {:error, error}}
                end
              end)
            else
              :ok
            end

          other ->
            other
        end

      other ->
        other
    end
  end

  @doc false
  def dump_to_native(record, attributes) do
    Enum.reduce_while(attributes, {:ok, %{}}, fn attribute, {:ok, attrs} ->
      case Map.fetch(record, attribute.name) do
        :error ->
          {:cont, {:ok, attrs}}

        {:ok, value} ->
          case Ash.Type.dump_to_native(
                 attribute.type,
                 value,
                 attribute.constraints
               ) do
            {:ok, casted_value} ->
              {:cont, {:ok, Map.put(attrs, attribute.name, casted_value)}}

            :error ->
              {:halt,
               {:error,
                "Failed to dump #{inspect(Map.get(record, attribute.name))} as type #{inspect(attribute.type)}"}}

            {:error, error} ->
              {:halt, {:error, error}}
          end
      end
    end)
  end

  @doc false
  @impl true
  # This is synthesized behavior. It's not truly atomic.
  def destroy_query(query, changeset, resource, options) do
    acc =
      if options[:return_records?] do
        {:ok, []}
      else
        :ok
      end

    log_destroy_query(resource, query)

    query
    |> run_query(resource)
    |> case do
      {:ok, results} ->
        results
        |> Enum.reduce_while(acc, fn result, acc ->
          case destroy(query.resource, %{changeset | data: result}) do
            :ok ->
              case acc do
                :ok ->
                  {:cont, :ok}

                {:ok, results} ->
                  {:cont, {:ok, [result | results]}}
              end

            {:error, error} ->
              {:halt, {:error, error}}
          end
        end)
        |> case do
          :ok -> :ok
          {:ok, results} -> {:ok, Enum.reverse(results)}
          {:error, error} -> {:error, error}
        end

      {:error, error} ->
        {:error, error}
    end
  end

  @doc false
  @impl true
  def destroy(resource, %{data: record, filter: filter} = changeset) do
    do_destroy(
      resource,
      record,
      changeset.tenant,
      filter,
      changeset.domain,
      changeset.context[:private][:actor]
    )
  end

  defp do_destroy(resource, record, tenant, filter, domain, actor) do
    with {:ok, table} <- wrap_or_create_table(resource, tenant) do
      pkey = Map.take(record, Ash.Resource.Info.primary_key(resource))

      if has_filter?(filter) do
        case ETS.Set.get(table, pkey) do
          {:ok, {_key, record}} when is_map(record) ->
            with {:ok, record} <- cast_record(record, resource),
                 {:ok, [_]} <- filter_matches([record], filter, domain, tenant, actor) do
              with {:ok, _} <- ETS.Set.delete(table, pkey) do
                :ok
              end
            else
              {:ok, []} ->
                {:error,
                 Ash.Error.Changes.StaleRecord.exception(
                   resource: resource,
                   filter: filter
                 )}

              {:error, error} ->
                {:error, error}
            end

          {:error, error} ->
            {:error, error}
        end
      else
        with {:ok, _} <- ETS.Set.delete(table, pkey) do
          :ok
        end
      end
    end
  end

  defp has_filter?(filter) when filter in [nil, true], do: false
  defp has_filter?(%Ash.Filter{expression: expression}) when expression == true, do: false
  defp has_filter?(_filter), do: true

  @doc false
  @impl true
  # This is synthesized behavior. It's not truly atomic.
  def update_query(query, changeset, resource, options) do
    acc =
      if options[:return_records?] do
        {:ok, []}
      else
        :ok
      end

    log_update_query(resource, query, changeset)

    query
    |> Map.update!(:filter, fn filter ->
      if is_nil(changeset.filter) do
        filter
      else
        filter = filter || %Ash.Filter{resource: changeset.resource}
        Ash.Filter.add_to_filter!(filter, changeset.filter)
      end
    end)
    |> run_query(resource)
    |> case do
      {:ok, results} ->
        Enum.reduce_while(results, acc, fn result, acc ->
          case update(query.resource, %{changeset | data: result}, nil, true) do
            {:ok, result} ->
              case acc do
                :ok ->
                  {:cont, :ok}

                {:ok, results} ->
                  {:cont, {:ok, [result | results]}}
              end

            {:error, error} ->
              {:halt, {:error, error}}
          end
        end)

      {:error, error} ->
        {:error, error}
    end
    |> case do
      :ok -> :ok
      {:ok, results} -> {:ok, Enum.reverse(results)}
      {:error, error} -> {:error, error}
    end
  end

  @doc false
  @impl true
  def update(resource, changeset, pkey \\ nil, from_bulk? \\ false) do
    pkey = pkey || pkey_map(resource, changeset.data)

    with {:ok, table} <- wrap_or_create_table(resource, changeset.tenant),
         _ <- if(!from_bulk?, do: log_update(resource, pkey, changeset)),
         {:ok, record} <-
           do_update(
             table,
             {pkey, changeset.attributes, changeset.atomics, changeset.filter},
             changeset.domain,
             changeset.tenant,
             resource,
             changeset.context[:private][:actor]
           ),
         {:ok, record} <- cast_record(record, resource),
         record <- retain_fields(record, changeset) do
      new_pkey = pkey_map(resource, record)

      if new_pkey != pkey do
        case destroy(resource, changeset) do
          :ok ->
            {:ok, %{record | __meta__: %Ecto.Schema.Metadata{state: :loaded, schema: resource}}}

          {:error, error} ->
            {:error, error}
        end
      else
        {:ok, %{record | __meta__: %Ecto.Schema.Metadata{state: :loaded, schema: resource}}}
      end
    else
      {:error, error} ->
        {:error, error}
    end
  end

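  # Merges the freshly cast attribute values (and metadata) back into the
  # original record so that already-loaded relationships and calculations are
  # preserved.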
  defp retain_fields(%struct{} = record, %{data: %struct{} = data}) do
    attributes = Enum.map(Ash.Resource.Info.attributes(struct), & &1.name)
    take = [:__metadata__, :__meta__ | attributes]
    Map.merge(data, Map.take(record, take))
  end

  defp retain_fields(record, _) do
    record
  end

  @impl true
  def calculate(resource, expressions, context) do
    Enum.reduce_while(expressions, {:ok, []}, fn expression, {:ok, results} ->
      case Ash.Expr.eval(expression, resource: resource, context: context) do
        {:ok, result} -> {:cont, {:ok, [result | results]}}
        {:error, error} -> {:halt, {:error, error}}
      end
    end)
    |> case do
      {:ok, results} -> {:ok, Enum.reverse(results)}
      {:error, error} -> {:error, error}
    end
  end

  @doc false
  def pkey_map(resource, data) do
    resource
    |> Ash.Resource.Info.primary_key()
    |> Enum.into(%{}, fn attr ->
      {attr, Map.get(data, attr)}
    end)
  end

  defp do_update(
         table,
         {pkey, record, atomics, changeset_filter},
         domain,
         tenant,
         resource,
         actor
       ) do
    attributes = resource |> Ash.Resource.Info.attributes()

    case dump_to_native(record, attributes) do
      {:ok, casted} ->
        case ETS.Set.get(table, pkey) do
          {:ok, {_key, record}} when is_map(record) ->
            with {:ok, casted_record} <- cast_record(record, resource),
                 {:ok, [casted_record]} <-
                   filter_matches([casted_record], changeset_filter, domain, tenant, actor) do
              case atomics do
                empty when empty in [nil, []] ->
                  data = Map.merge(record, casted)

                  put_data(table, pkey, data)

                atomics ->
                  with {:ok, atomics} <- make_atomics(atomics, resource, domain, casted_record) do
                    data = record |> Map.merge(casted) |> Map.merge(atomics)
                    put_data(table, pkey, data)
                  end
              end
            else
              {:error, error} ->
                {:error, error}

              {:ok, []} ->
                {:error,
                 Ash.Error.Changes.StaleRecord.exception(
                   resource: resource,
                   filter: changeset_filter
                 )}
            end

          {:ok, _} ->
            {:error,
             Ash.Error.Changes.StaleRecord.exception(
               resource: record.__struct__,
               filter: changeset_filter
             )}

          other ->
            other
        end

      {:error, error} ->
        {:error, error}
    end
  end

  defp put_data(table, pkey, data) do
    case ETS.Set.put(
           table,
           {pkey, data}
         ) do
      {:ok, _set} ->
        {:ok, data}

      error ->
        error
    end
  end

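  # Evaluates each atomic update expression against the current record; any
  # expression that cannot be fully evaluated is an error, since ETS has no
  # server-side expression support.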
  defp make_atomics(atomics, resource, domain, record) do
    Enum.reduce_while(atomics, {:ok, %{}}, fn {key, expr}, {:ok, acc} ->
      case Ash.Expr.eval(expr,
             resource: resource,
             record: record,
             domain: domain,
             unknown_on_unknown_refs?: true
           ) do
        {:ok, value} ->
          {:cont, {:ok, Map.put(acc, key, value)}}

        {:error, error} ->
          {:halt, {:error, error}}

        :unknown ->
          {:halt, {:error, "Could not evaluate expression #{inspect(expr)}"}}
      end
    end)
  end

  defp unload_relationships(resource, record) do
    empty = resource.__struct__()

    resource
    |> Ash.Resource.Info.relationships()
    |> Enum.reduce(record, fn relationship, record ->
      Map.put(record, relationship.name, Map.get(empty, relationship.name))
    end)
  end

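  # Private tables are created on demand and cached in the process dictionary,
  # so they are scoped to the calling process. Public tables are owned by a
  # TableManager GenServer and shared by name.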
  # sobelow_skip ["DOS.StringToAtom"]
  defp wrap_or_create_table(resource, tenant) do
    tenant =
      if Ash.Resource.Info.multitenancy_strategy(resource) == :context do
        tenant
      end

    if Ash.DataLayer.Ets.Info.private?(resource) do
      configured_table = Ash.DataLayer.Ets.Info.table(resource)

      case Process.get({:ash_ets_table, configured_table, tenant}) do
        nil ->
          case ETS.Set.new(
                 protection: :private,
                 ordered: true,
                 read_concurrency: true
               ) do
            {:ok, table} ->
              Process.put({:ash_ets_table, configured_table, tenant}, table)
              {:ok, table}

            {:error, error} ->
              {:error, error}
          end

        tab ->
          {:ok, tab}
      end
    else
      TableManager.start(resource, tenant)
    end
  end

  defp log_bulk_create(resource, stream, options) do
    Logger.debug(
      "#{bulk_create_operation(options, stream)} #{inspect(resource)}: #{inspect(stream)}"
    )
  end

  defp bulk_create_operation(
         %{
           upsert?: true,
           upsert_keys: upsert_keys,
           upsert_fields: upsert_fields,
           upsert_where: expr
         },
         stream
       ) do
    where_expr =
      if is_nil(expr) do
        ""
      else
        "where #{inspect(expr)}"
      end

    "Upserting #{Enum.count(stream)} on #{inspect(upsert_keys)} #{where_expr}, setting #{inspect(List.wrap(upsert_fields))}"
  end

  defp bulk_create_operation(_options, stream) do
    "Creating #{Enum.count(stream)}"
  end

  defp log_destroy_query(resource, query) do
    limit =
      if query.limit do
        "#{query.limit} "
      else
        ""
      end

    offset =
      if query.offset && query.offset != 0 do
        " skipping #{query.offset} records"
      else
        ""
      end

    sort =
      if query.sort && query.sort != [] do
        " sorted by #{inspect(query.sort)}"
      else
        ""
      end

    filter =
      if query.filter && query.filter.expression != nil do
        " where `#{inspect(query.filter.expression)}`"
      else
        ""
      end

    Logger.debug("""
    ETS: Destroying #{limit}#{inspect(resource)}#{offset}#{sort}#{filter}
    """)

    :ok
  end

  defp log_update_query(resource, query, changeset) do
    limit =
      if query.limit do
        "#{query.limit} "
      else
        ""
      end

    offset =
      if query.offset && query.offset != 0 do
        " skipping #{query.offset} records"
      else
        ""
      end

    sort =
      if query.sort && query.sort != [] do
        " sorted by #{inspect(query.sort)}"
      else
        ""
      end

    filter =
      if query.filter && query.filter.expression != nil do
        " matching filter `#{inspect(query.filter.expression)}`"
      else
        ""
      end

    Logger.debug("""
    ETS: Updating #{limit}#{inspect(resource)}#{offset}#{sort}#{filter}:

    #{inspect(Map.merge(changeset.attributes, Map.new(changeset.atomics)), pretty: true)}
    """)

    :ok
  end

  defp log_create(resource, changeset) do
    Logger.debug("""
    Creating #{inspect(resource)}:

    #{inspect(Map.merge(changeset.attributes, Map.new(changeset.atomics)), pretty: true)}
    """)
  end

  defp log_update(resource, pkey, changeset) do
    pkey =
      if Enum.count_until(pkey, 2) == 2 do
        inspect(pkey)
      else
        inspect(pkey |> Enum.at(0) |> elem(1))
      end

    Logger.debug("""
    "Updating #{inspect(resource)} #{pkey}:

    #{inspect(Map.merge(changeset.attributes, Map.new(changeset.atomics)), pretty: true)}
    """)
  end
end