lib/ecto/adapters/mnesia.ex

defmodule Ecto.Adapters.Mnesia do
  @moduledoc """
  Please find the adapter documentation in the [README](README.md)
  """

  # @dialyzer :no_return

  @behaviour Ecto.Adapter
  @behaviour Ecto.Adapter.Queryable
  @behaviour Ecto.Adapter.Schema
  @behaviour Ecto.Adapter.Storage
  @behaviour Ecto.Adapter.Transaction

  case Application.load(:ecto) do
    :ok -> :ok
    {:error, {:already_loaded, _}} -> :ok
  end

  @ecto_vsn :ecto |> Application.spec(:vsn) |> to_string()

  import Ecto.Query

  alias Ecto.Adapters.Mnesia
  alias Ecto.Adapters.Mnesia.Config
  alias Ecto.Adapters.Mnesia.Connection
  alias Ecto.Adapters.Mnesia.Constraint
  alias Ecto.Adapters.Mnesia.Query
  alias Ecto.Adapters.Mnesia.Record
  alias Ecto.Adapters.Mnesia.Source
  alias Ecto.Adapters.Mnesia.Storage
  alias Ecto.Adapters.Mnesia.Type

  require Logger

  @impl Ecto.Adapter
  defmacro __before_compile__(_env), do: []

  @impl Ecto.Adapter
  def checkout(meta, _options, function) do
    :ok = Connection.checkout(meta)
    Process.put({__MODULE__, :checkout}, true)

    ret = function.()

    Process.delete({__MODULE__, :checkout})
    ret
  end

  @impl Ecto.Adapter
  def checked_out?(_adapter_meta) do
    Process.get({__MODULE__, :checkout}, false)
  end

  @impl Ecto.Adapter
  def ensure_all_started(options, _type) do
    _config =
      options
      |> Config.new()
      |> Config.ensure_mnesia_config()

    Application.ensure_all_started(:mnesia)
  end

  @impl Ecto.Adapter
  def init(options \\ []) do
    config = Config.new(options)

    if Storage.status(config) == :down do
      raise "Mnesia storage is down. Please run `mix ecto.create`"
    end

    {:ok, Connection.child_spec(config), config}
  end

  @impl Ecto.Adapter
  def dumpers(:utc_datetime, type), do: [type, &Type.dump_datetime(&1, :second)]

  def dumpers(:utc_datetime_usec, type), do: [type, &Type.dump_datetime(&1, :microsecond)]

  def dumpers(:naive_datetime, type), do: [type, &Type.dump_naive_datetime(&1, :second)]

  def dumpers(:naive_datetime_usec, type), do: [type, &Type.dump_naive_datetime(&1, :microsecond)]

  def dumpers(:date, type), do: [type, &Type.dump_date/1]

  def dumpers(:time, type), do: [type, &Type.dump_time/1]

  def dumpers(:time_usec, type), do: [type, &Type.dump_time_usec/1]

  def dumpers(_, type), do: [type]

  @impl Ecto.Adapter
  def loaders(:utc_datetime, type), do: [&Type.load_datetime(&1, :second), type]

  def loaders(:utc_datetime_usec, type), do: [&Type.load_datetime(&1, :microsecond), type]

  def loaders(:naive_datetime, type), do: [&Type.load_naive_datetime(&1, :second), type]

  def loaders(:naive_datetime_usec, type), do: [&Type.load_naive_datetime(&1, :microsecond), type]

  def loaders(:date, type), do: [&Type.load_date/1, type]

  def loaders(:time, type), do: [&Type.load_time/1, type]

  def loaders(:time_usec, type), do: [&Type.load_time_usec/1, type]

  def loaders(_primitive, type), do: [type]

  @impl Ecto.Adapter.Queryable
  def prepare(type, %Ecto.Query{} = query) do
    q = Query.from_ecto_query(type, query)
    {q.cache, q}
  end

  @impl Ecto.Adapter.Queryable
  def execute(adapter_meta, _query_meta, {:cache, update, prepared}, params, _opts) do
    {:ok, result} = mnesia_call(adapter_meta, prepared, params)
    update.(prepared)
    result
  end

  def execute(adapter_meta, _query_meta, {:cached, _update, _reset, cached}, params, _opts) do
    {:ok, result} = mnesia_call(adapter_meta, cached, params)
    result
  end

  def execute(adapter_meta, _query_meta, {:nocache, prepared}, params, _opts) do
    {:ok, result} = mnesia_call(adapter_meta, prepared, params)
    result
  end

  @impl Ecto.Adapter.Queryable
  def stream(
        _adapter_meta,
        _query_meta,
        {:nocache, %Mnesia.Query{query: query, answers: answers}},
        params,
        _opts
      ) do
    case :mnesia.transaction(fn ->
           query.(params)
           |> answers.(params)
           |> Enum.map(&Tuple.to_list(&1))
         end) do
      {:atomic, result} ->
        result

      _ ->
        []
    end
  end

  @impl Ecto.Adapter.Schema
  def autogenerate(:id), do: nil

  def autogenerate(:binary_id), do: Ecto.UUID.generate()

  @doc """
  Increment autogenerated id and return new value for given source and field
  """
  @spec next_id(atom(), atom()) :: integer()
  def next_id(table_name, key) do
    :mnesia.dirty_update_counter(Connection.id_seq({table_name, key}), 1)
  end

  @impl Ecto.Adapter.Schema
  def insert(adapter_meta, schema_meta, params, on_conflict, returning, _opts) do
    source = Source.new(schema_meta)

    case tc_tx(fn -> upsert(source, params, on_conflict, adapter_meta) end) do
      {time, {:atomic, [record]}} ->
        result = Record.select(record, returning, source)
        Logger.debug("QUERY OK source=#{inspect(schema_meta.source)} type=insert db=#{time}µs")
        {:ok, result}

      {time, {:aborted, {:invalid, constraints}}} ->
        Logger.debug(
          "QUERY ERROR source=#{inspect(schema_meta.source)} type=insert db=#{time}µs #{
            inspect(constraints)
          }"
        )

        {:invalid, constraints}
    end
  end

  if Version.compare(@ecto_vsn, "3.6.0") in [:eq, :gt] do
    @impl Ecto.Adapter.Schema
    def insert_all(adapter_meta, schema, header, records, on_conflict, returning, [], opts),
      do: do_insert_all(adapter_meta, schema, header, records, on_conflict, returning, opts)

    @impl Ecto.Adapter.Schema
    def insert_all(_, _, _, _, _, _, _placeholders, _),
      do: raise(ArgumentError, ":placeholders is not supported by mnesia")
  else
    @impl Ecto.Adapter.Schema
    def insert_all(adapter_meta, schema, header, records, on_conflict, returning, opts),
      do: do_insert_all(adapter_meta, schema, header, records, on_conflict, returning, opts)
  end

  @impl Ecto.Adapter.Schema
  def update(_adapter_meta, schema_meta, params, filters, returning, _opts) do
    source = Connection.source(schema_meta)
    {_cache, prepared} = Mnesia.Query.Qlc.query(:all, [], [source])
    query = prepared.(filters)

    select_fun = fn ->
      try do
        query.(params) |> Mnesia.Query.Qlc.answers(nil, nil).(params) |> Enum.to_list()
      rescue
        _ ->
          {:atomic, []}
      end
    end

    with {selectTime, {:atomic, [attributes]}} <-
           tc_tx(select_fun),
         {updateTime, {:atomic, update}} <-
           tc_tx(fn ->
             try do
               do_update(attributes, params, source)
             rescue
               _ ->
                 {:atomic, nil}
             end
           end) do
      result = Record.select(update, returning, source)

      Logger.debug(
        "QUERY OK source=#{inspect(source.table)} type=update db=#{selectTime + updateTime}µs"
      )

      {:ok, result}
    else
      {time, {:atomic, []}} ->
        Logger.debug(
          "QUERY ERROR source=#{inspect(source.table)} type=update db=#{time}µs \"No results\""
        )

        {:error, :stale}

      {time, {:aborted, {:invalid, constraints}}} ->
        Logger.debug(
          "QUERY ERROR source=#{inspect(source.table)} type=update db=#{time}µs #{
            inspect(constraints)
          }"
        )

        {:invalid, constraints}
    end
  end

  @impl Ecto.Adapter.Schema
  def delete(_adapter_meta, schema_meta, filters, _opts) do
    source = Connection.source(schema_meta)
    {_cache, prepared} = Mnesia.Query.Qlc.query(:all, [], [source])
    query = prepared.(filters)

    select_fun = fn ->
      try do
        Mnesia.Query.Qlc.answers(nil, nil).(query.([]), [])
        |> Enum.map(&Tuple.to_list(&1))
      rescue
        _ ->
          {:atomic, []}
      end
    end

    with {selectTime, {:atomic, [[id | _t]]}} <-
           tc_tx(select_fun),
         {deleteTime, {:atomic, :ok}} <-
           tc_tx(fn ->
             :mnesia.delete(source.table, id, :write)
           end) do
      Logger.debug(
        "QUERY OK source=#{inspect(source.table)} type=delete db=#{selectTime + deleteTime}µs"
      )

      {:ok, []}
    else
      {time, {:atomic, []}} ->
        Logger.debug(
          "QUERY ERROR source=#{inspect(source.table)} type=delete db=#{time}µs \"No results\""
        )

        {:error, :stale}

      {time, {:aborted, constraints}} ->
        Logger.debug(
          "QUERY ERROR source=#{inspect(source.table)} type=delete db=#{time}µs #{
            inspect(constraints)
          }"
        )

        {:invalid, constraints}
    end
  end

  @impl Ecto.Adapter.Transaction
  def in_transaction?(_adapter_meta), do: :mnesia.is_transaction()

  @impl Ecto.Adapter.Transaction
  def transaction(_adapter_meta, _options, function) when is_function(function, 0) do
    case :mnesia.transaction(function) do
      {:atomic, result} -> {:ok, result}
      {:aborted, reason} -> {:error, reason}
    end
  end

  @impl Ecto.Adapter.Transaction
  def rollback(_adapter_meta, value) do
    if :mnesia.is_transaction() do
      throw(:mnesia.abort(value))
    else
      raise "not inside transaction"
    end
  end

  @impl Ecto.Adapter.Storage
  defdelegate storage_up(config), to: Storage, as: :up

  @impl Ecto.Adapter.Storage
  defdelegate storage_down(config), to: Storage, as: :down

  @impl Ecto.Adapter.Storage
  defdelegate storage_status(config), to: Storage, as: :status

  ###
  ### Priv
  ###
  defp do_insert_all(adapter_meta, schema_meta, _header, records, on_conflict, returning, _opts) do
    source = Source.new(schema_meta)

    case tc_tx(fn ->
           Enum.map(records, fn params ->
             upsert(source, params, on_conflict, adapter_meta)
           end)
         end) do
      {time, {:atomic, created_records}} ->
        result =
          Enum.map(created_records, fn [record] ->
            record
            |> Record.select(returning, source)
            |> Enum.map(&elem(&1, 1))
          end)

        Logger.debug(
          "QUERY OK source=#{inspect(schema_meta.source)} type=insert_all db=#{time}µs"
        )

        {length(result), result}

      {time, {:aborted, {:invalid, constraints}}} ->
        Logger.debug(
          "QUERY ERROR source=#{inspect(schema_meta.source)} type=insert_all db=#{time}µs #{
            inspect(constraints)
          }"
        )

        {0, nil}
    end
  end

  defp mnesia_call(
         _adapter_meta,
         %Mnesia.Query{
           type: :all,
           sources: sources,
           query: query,
           sort: sort,
           answers: answers
         },
         params
       ) do
    case tc_tx(fn ->
           query.(params)
           |> sort.()
           |> answers.(params)
           |> Enum.map(&Tuple.to_list(&1))
         end) do
      {time, {:atomic, result}} ->
        Logger.debug("QUERY OK sources=#{inspect(sources)} type=all db=#{time}µs")

        {:ok, {length(result), result}}

      {time, {:aborted, error}} ->
        Logger.debug(
          "QUERY ERROR sources=#{inspect(sources)} type=all db=#{time}µs #{inspect(error)}"
        )

        {:ok, {0, []}}
    end
  end

  defp mnesia_call(
         _adapter_meta,
         %Mnesia.Query{
           type: :update_all,
           sources: [%Source{} = source | _] = sources,
           query: query,
           answers: answers,
           new_record: new_record
         },
         params
       ) do
    case tc_tx(fn ->
           query.(params)
           |> answers.(params)
           |> Enum.map(&new_record.(&1, params))
           |> Enum.map(fn record ->
             with :ok <- :mnesia.write(source.table, record, :write) do
               Record.to_schema(record, source)
             end
           end)
         end) do
      {time, {:atomic, result}} ->
        Logger.debug(
          "QUERY OK sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=update_all db=#{
            time
          }µs"
        )

        {:ok, {length(result), result}}

      {time, {:aborted, error}} ->
        Logger.debug(
          "QUERY ERROR sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=update_all db=#{
            time
          }µs #{inspect(error)}"
        )

        {:ok, {0, nil}}
    end
  end

  defp mnesia_call(
         _adapter_meta,
         %Mnesia.Query{
           original: original,
           type: :delete_all,
           sources: [%Source{} = source | _] = sources,
           query: query,
           answers: answers
         },
         params
       ) do
    case tc_tx(fn ->
           query.(params)
           |> answers.(params)
           |> Enum.map(fn tuple ->
             # Works only if query selects id at first, see: https://gitlab.com/patatoid/ecto3_mnesia/-/issues/15
             id = elem(tuple, 0)
             :mnesia.delete(source.table, id, :write)
             Tuple.to_list(tuple)
           end)
         end) do
      {time, {:atomic, records}} ->
        Logger.debug(
          "QUERY OK sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=delete_all db=#{
            time
          }µs"
        )

        result =
          case original.select do
            nil -> nil
            %Ecto.Query.SelectExpr{} -> records
          end

        {:ok, {length(records), result}}

      {time, {:aborted, error}} ->
        Logger.debug(
          "QUERY ERROR sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=delete_all db=#{
            time
          }µs #{inspect(error)}"
        )

        {:ok, {0, nil}}
    end
  end

  defp upsert(source, params, {:raise, [], []}, adapter_meta) do
    case conflict?(params, source, adapter_meta) do
      nil ->
        do_insert(params, source)

      {_rec, constraints} ->
        :mnesia.abort({:invalid, constraints})
    end
  end

  defp upsert(source, params, {:nothing, [], []}, adapter_meta) do
    case conflict?(params, source, adapter_meta) do
      nil ->
        do_insert(params, source)

      {_rec, _constraints} ->
        [Record.new(params, source)]
    end
  end

  defp upsert(source, params, {fields, [], []}, adapter_meta) when is_list(fields) do
    all_fields = Source.fields(source)

    case all_fields -- fields do
      [] ->
        # ie replace_all
        do_insert(params, source)

      _ ->
        case conflict?(params, source, adapter_meta) do
          nil ->
            do_insert(params, source)

          {conflict, _constraints} ->
            updated = conflict |> Record.new(source) |> Record.update(params, source, fields)
            with :ok <- :mnesia.write(source.table, updated, :write), do: [updated]
        end
    end
  end

  defp do_insert(params, source) do
    params = Record.gen_id(params, source)

    with {:constraints, []} <- {:constraints, Constraint.check(source, params)},
         record = Record.new(params, source),
         :ok <- :mnesia.write(source.table, record, :write) do
      [record]
    else
      {:constraints, constraints} -> :mnesia.abort({:invalid, constraints})
    end
  end

  defp do_update(orig, updates, source) do
    updated =
      orig
      |> Record.new(source)
      |> Record.update(updates, source)

    params = Record.select(updated, source.attributes, source)

    with {:constraints, []} <- {:constraints, Constraint.check(source, params)},
         :ok <- :mnesia.write(source.table, updated, :write) do
      updated
    else
      {:constraints, constraints} -> :mnesia.abort({:invalid, constraints})
    end
  end

  defp conflict?(params, source, %{repo: repo}) do
    source
    |> Source.uniques(params)
    |> case do
      [] ->
        nil

      uniques ->
        source.schema
        |> from()
        |> where(^uniques)
        |> repo.one()
        |> case do
          nil ->
            nil

          conflict ->
            constraints =
              uniques
              |> Enum.reduce([], fn {key, value}, acc ->
                case Map.get(conflict, key) do
                  ^value -> [{:unique, "#{source.table}_#{key}_index"} | acc]
                  _ -> acc
                end
              end)

            {conflict, constraints}
        end
    end
  end

  defp tc_tx(fun), do: :timer.tc(:mnesia, :transaction, [fun])
end