defmodule Ecto.Adapters.Mnesia do
@moduledoc """
Please find the adapter documentation in the [README](README.md)
"""
# @dialyzer :no_return
@behaviour Ecto.Adapter
@behaviour Ecto.Adapter.Queryable
@behaviour Ecto.Adapter.Schema
@behaviour Ecto.Adapter.Storage
@behaviour Ecto.Adapter.Transaction
case Application.load(:ecto) do
:ok -> :ok
{:error, {:already_loaded, _}} -> :ok
end
@ecto_vsn :ecto |> Application.spec(:vsn) |> to_string()
import Ecto.Query
alias Ecto.Adapters.Mnesia
alias Ecto.Adapters.Mnesia.Config
alias Ecto.Adapters.Mnesia.Connection
alias Ecto.Adapters.Mnesia.Constraint
alias Ecto.Adapters.Mnesia.Query
alias Ecto.Adapters.Mnesia.Record
alias Ecto.Adapters.Mnesia.Source
alias Ecto.Adapters.Mnesia.Storage
alias Ecto.Adapters.Mnesia.Type
require Logger
@impl Ecto.Adapter
defmacro __before_compile__(_env), do: []
@impl Ecto.Adapter
def checkout(meta, _options, function) do
:ok = Connection.checkout(meta)
Process.put({__MODULE__, :checkout}, true)
ret = function.()
Process.delete({__MODULE__, :checkout})
ret
end
@impl Ecto.Adapter
def checked_out?(_adapter_meta) do
Process.get({__MODULE__, :checkout}, false)
end
@impl Ecto.Adapter
def ensure_all_started(options, _type) do
_config =
options
|> Config.new()
|> Config.ensure_mnesia_config()
Application.ensure_all_started(:mnesia)
end
@impl Ecto.Adapter
def init(options \\ []) do
config = Config.new(options)
if Storage.status(config) == :down do
raise "Mnesia storage is down. Please run `mix ecto.create`"
end
{:ok, Connection.child_spec(config), config}
end
@impl Ecto.Adapter
def dumpers(:utc_datetime, type), do: [type, &Type.dump_datetime(&1, :second)]
def dumpers(:utc_datetime_usec, type), do: [type, &Type.dump_datetime(&1, :microsecond)]
def dumpers(:naive_datetime, type), do: [type, &Type.dump_naive_datetime(&1, :second)]
def dumpers(:naive_datetime_usec, type), do: [type, &Type.dump_naive_datetime(&1, :microsecond)]
def dumpers(:date, type), do: [type, &Type.dump_date/1]
def dumpers(:time, type), do: [type, &Type.dump_time/1]
def dumpers(:time_usec, type), do: [type, &Type.dump_time_usec/1]
def dumpers(_, type), do: [type]
@impl Ecto.Adapter
def loaders(:utc_datetime, type), do: [&Type.load_datetime(&1, :second), type]
def loaders(:utc_datetime_usec, type), do: [&Type.load_datetime(&1, :microsecond), type]
def loaders(:naive_datetime, type), do: [&Type.load_naive_datetime(&1, :second), type]
def loaders(:naive_datetime_usec, type), do: [&Type.load_naive_datetime(&1, :microsecond), type]
def loaders(:date, type), do: [&Type.load_date/1, type]
def loaders(:time, type), do: [&Type.load_time/1, type]
def loaders(:time_usec, type), do: [&Type.load_time_usec/1, type]
def loaders(_primitive, type), do: [type]
@impl Ecto.Adapter.Queryable
def prepare(type, %Ecto.Query{} = query) do
q = Query.from_ecto_query(type, query)
{q.cache, q}
end
@impl Ecto.Adapter.Queryable
def execute(adapter_meta, _query_meta, {:cache, update, prepared}, params, _opts) do
{:ok, result} = mnesia_call(adapter_meta, prepared, params)
update.(prepared)
result
end
def execute(adapter_meta, _query_meta, {:cached, _update, _reset, cached}, params, _opts) do
{:ok, result} = mnesia_call(adapter_meta, cached, params)
result
end
def execute(adapter_meta, _query_meta, {:nocache, prepared}, params, _opts) do
{:ok, result} = mnesia_call(adapter_meta, prepared, params)
result
end
@impl Ecto.Adapter.Queryable
def stream(
_adapter_meta,
_query_meta,
{:nocache, %Mnesia.Query{query: query, answers: answers}},
params,
_opts
) do
case :mnesia.transaction(fn ->
query.(params)
|> answers.(params)
|> Enum.map(&Tuple.to_list(&1))
end) do
{:atomic, result} ->
result
_ ->
[]
end
end
@impl Ecto.Adapter.Schema
def autogenerate(:id), do: nil
def autogenerate(:binary_id), do: Ecto.UUID.generate()
@doc """
Increment autogenerated id and return new value for given source and field
"""
@spec next_id(atom(), atom()) :: integer()
def next_id(table_name, key) do
:mnesia.dirty_update_counter(Connection.id_seq({table_name, key}), 1)
end
@impl Ecto.Adapter.Schema
def insert(adapter_meta, schema_meta, params, on_conflict, returning, _opts) do
source = Source.new(schema_meta)
case tc_tx(fn -> upsert(source, params, on_conflict, adapter_meta) end) do
{time, {:atomic, [record]}} ->
result = Record.select(record, returning, source)
Logger.debug("QUERY OK source=#{inspect(schema_meta.source)} type=insert db=#{time}µs")
{:ok, result}
{time, {:aborted, {:invalid, constraints}}} ->
Logger.debug(
"QUERY ERROR source=#{inspect(schema_meta.source)} type=insert db=#{time}µs #{
inspect(constraints)
}"
)
{:invalid, constraints}
end
end
if Version.compare(@ecto_vsn, "3.6.0") in [:eq, :gt] do
@impl Ecto.Adapter.Schema
def insert_all(adapter_meta, schema, header, records, on_conflict, returning, [], opts),
do: do_insert_all(adapter_meta, schema, header, records, on_conflict, returning, opts)
@impl Ecto.Adapter.Schema
def insert_all(_, _, _, _, _, _, _placeholders, _),
do: raise(ArgumentError, ":placeholders is not supported by mnesia")
else
@impl Ecto.Adapter.Schema
def insert_all(adapter_meta, schema, header, records, on_conflict, returning, opts),
do: do_insert_all(adapter_meta, schema, header, records, on_conflict, returning, opts)
end
@impl Ecto.Adapter.Schema
def update(_adapter_meta, schema_meta, params, filters, returning, _opts) do
source = Connection.source(schema_meta)
{_cache, prepared} = Mnesia.Query.Qlc.query(:all, [], [source])
query = prepared.(filters)
select_fun = fn ->
try do
query.(params) |> Mnesia.Query.Qlc.answers(nil, nil).(params) |> Enum.to_list()
rescue
_ ->
{:atomic, []}
end
end
with {selectTime, {:atomic, [attributes]}} <-
tc_tx(select_fun),
{updateTime, {:atomic, update}} <-
tc_tx(fn ->
try do
do_update(attributes, params, source)
rescue
_ ->
{:atomic, nil}
end
end) do
result = Record.select(update, returning, source)
Logger.debug(
"QUERY OK source=#{inspect(source.table)} type=update db=#{selectTime + updateTime}µs"
)
{:ok, result}
else
{time, {:atomic, []}} ->
Logger.debug(
"QUERY ERROR source=#{inspect(source.table)} type=update db=#{time}µs \"No results\""
)
{:error, :stale}
{time, {:aborted, {:invalid, constraints}}} ->
Logger.debug(
"QUERY ERROR source=#{inspect(source.table)} type=update db=#{time}µs #{
inspect(constraints)
}"
)
{:invalid, constraints}
end
end
@impl Ecto.Adapter.Schema
def delete(_adapter_meta, schema_meta, filters, _opts) do
source = Connection.source(schema_meta)
{_cache, prepared} = Mnesia.Query.Qlc.query(:all, [], [source])
query = prepared.(filters)
select_fun = fn ->
try do
Mnesia.Query.Qlc.answers(nil, nil).(query.([]), [])
|> Enum.map(&Tuple.to_list(&1))
rescue
_ ->
{:atomic, []}
end
end
with {selectTime, {:atomic, [[id | _t]]}} <-
tc_tx(select_fun),
{deleteTime, {:atomic, :ok}} <-
tc_tx(fn ->
:mnesia.delete(source.table, id, :write)
end) do
Logger.debug(
"QUERY OK source=#{inspect(source.table)} type=delete db=#{selectTime + deleteTime}µs"
)
{:ok, []}
else
{time, {:atomic, []}} ->
Logger.debug(
"QUERY ERROR source=#{inspect(source.table)} type=delete db=#{time}µs \"No results\""
)
{:error, :stale}
{time, {:aborted, constraints}} ->
Logger.debug(
"QUERY ERROR source=#{inspect(source.table)} type=delete db=#{time}µs #{
inspect(constraints)
}"
)
{:invalid, constraints}
end
end
@impl Ecto.Adapter.Transaction
def in_transaction?(_adapter_meta), do: :mnesia.is_transaction()
@impl Ecto.Adapter.Transaction
def transaction(_adapter_meta, _options, function) when is_function(function, 0) do
case :mnesia.transaction(function) do
{:atomic, result} -> {:ok, result}
{:aborted, reason} -> {:error, reason}
end
end
@impl Ecto.Adapter.Transaction
def rollback(_adapter_meta, value) do
if :mnesia.is_transaction() do
throw(:mnesia.abort(value))
else
raise "not inside transaction"
end
end
@impl Ecto.Adapter.Storage
defdelegate storage_up(config), to: Storage, as: :up
@impl Ecto.Adapter.Storage
defdelegate storage_down(config), to: Storage, as: :down
@impl Ecto.Adapter.Storage
defdelegate storage_status(config), to: Storage, as: :status
###
### Priv
###
defp do_insert_all(adapter_meta, schema_meta, _header, records, on_conflict, returning, _opts) do
source = Source.new(schema_meta)
case tc_tx(fn ->
Enum.map(records, fn params ->
upsert(source, params, on_conflict, adapter_meta)
end)
end) do
{time, {:atomic, created_records}} ->
result =
Enum.map(created_records, fn [record] ->
record
|> Record.select(returning, source)
|> Enum.map(&elem(&1, 1))
end)
Logger.debug(
"QUERY OK source=#{inspect(schema_meta.source)} type=insert_all db=#{time}µs"
)
{length(result), result}
{time, {:aborted, {:invalid, constraints}}} ->
Logger.debug(
"QUERY ERROR source=#{inspect(schema_meta.source)} type=insert_all db=#{time}µs #{
inspect(constraints)
}"
)
{0, nil}
end
end
defp mnesia_call(
_adapter_meta,
%Mnesia.Query{
type: :all,
sources: sources,
query: query,
sort: sort,
answers: answers
},
params
) do
case tc_tx(fn ->
query.(params)
|> sort.()
|> answers.(params)
|> Enum.map(&Tuple.to_list(&1))
end) do
{time, {:atomic, result}} ->
Logger.debug("QUERY OK sources=#{inspect(sources)} type=all db=#{time}µs")
{:ok, {length(result), result}}
{time, {:aborted, error}} ->
Logger.debug(
"QUERY ERROR sources=#{inspect(sources)} type=all db=#{time}µs #{inspect(error)}"
)
{:ok, {0, []}}
end
end
defp mnesia_call(
_adapter_meta,
%Mnesia.Query{
type: :update_all,
sources: [%Source{} = source | _] = sources,
query: query,
answers: answers,
new_record: new_record
},
params
) do
case tc_tx(fn ->
query.(params)
|> answers.(params)
|> Enum.map(&new_record.(&1, params))
|> Enum.map(fn record ->
with :ok <- :mnesia.write(source.table, record, :write) do
Record.to_schema(record, source)
end
end)
end) do
{time, {:atomic, result}} ->
Logger.debug(
"QUERY OK sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=update_all db=#{
time
}µs"
)
{:ok, {length(result), result}}
{time, {:aborted, error}} ->
Logger.debug(
"QUERY ERROR sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=update_all db=#{
time
}µs #{inspect(error)}"
)
{:ok, {0, nil}}
end
end
defp mnesia_call(
_adapter_meta,
%Mnesia.Query{
original: original,
type: :delete_all,
sources: [%Source{} = source | _] = sources,
query: query,
answers: answers
},
params
) do
case tc_tx(fn ->
query.(params)
|> answers.(params)
|> Enum.map(fn tuple ->
# Works only if query selects id at first, see: https://gitlab.com/patatoid/ecto3_mnesia/-/issues/15
id = elem(tuple, 0)
:mnesia.delete(source.table, id, :write)
Tuple.to_list(tuple)
end)
end) do
{time, {:atomic, records}} ->
Logger.debug(
"QUERY OK sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=delete_all db=#{
time
}µs"
)
result =
case original.select do
nil -> nil
%Ecto.Query.SelectExpr{} -> records
end
{:ok, {length(records), result}}
{time, {:aborted, error}} ->
Logger.debug(
"QUERY ERROR sources=#{sources |> Enum.map(& &1.table) |> Enum.join(",")} type=delete_all db=#{
time
}µs #{inspect(error)}"
)
{:ok, {0, nil}}
end
end
defp upsert(source, params, {:raise, [], []}, adapter_meta) do
case conflict?(params, source, adapter_meta) do
nil ->
do_insert(params, source)
{_rec, constraints} ->
:mnesia.abort({:invalid, constraints})
end
end
defp upsert(source, params, {:nothing, [], []}, adapter_meta) do
case conflict?(params, source, adapter_meta) do
nil ->
do_insert(params, source)
{_rec, _constraints} ->
[Record.new(params, source)]
end
end
defp upsert(source, params, {fields, [], []}, adapter_meta) when is_list(fields) do
all_fields = Source.fields(source)
case all_fields -- fields do
[] ->
# ie replace_all
do_insert(params, source)
_ ->
case conflict?(params, source, adapter_meta) do
nil ->
do_insert(params, source)
{conflict, _constraints} ->
updated = conflict |> Record.new(source) |> Record.update(params, source, fields)
with :ok <- :mnesia.write(source.table, updated, :write), do: [updated]
end
end
end
defp do_insert(params, source) do
params = Record.gen_id(params, source)
with {:constraints, []} <- {:constraints, Constraint.check(source, params)},
record = Record.new(params, source),
:ok <- :mnesia.write(source.table, record, :write) do
[record]
else
{:constraints, constraints} -> :mnesia.abort({:invalid, constraints})
end
end
defp do_update(orig, updates, source) do
updated =
orig
|> Record.new(source)
|> Record.update(updates, source)
params = Record.select(updated, source.attributes, source)
with {:constraints, []} <- {:constraints, Constraint.check(source, params)},
:ok <- :mnesia.write(source.table, updated, :write) do
updated
else
{:constraints, constraints} -> :mnesia.abort({:invalid, constraints})
end
end
defp conflict?(params, source, %{repo: repo}) do
source
|> Source.uniques(params)
|> case do
[] ->
nil
uniques ->
source.schema
|> from()
|> where(^uniques)
|> repo.one()
|> case do
nil ->
nil
conflict ->
constraints =
uniques
|> Enum.reduce([], fn {key, value}, acc ->
case Map.get(conflict, key) do
^value -> [{:unique, "#{source.table}_#{key}_index"} | acc]
_ -> acc
end
end)
{conflict, constraints}
end
end
end
defp tc_tx(fun), do: :timer.tc(:mnesia, :transaction, [fun])
end