Skip to main content

src/graffeo_mnesia.erl

-module(graffeo_mnesia).
-moduledoc """
Mnesia-backed, transactional / replicated handle backend.

The fourth graffeo backend. Its reason to exist is **atomic
multi-mutation transactions** and **multi-node replicated graphs**
— not raw speed (every Mnesia write costs an ETS write plus
overhead; prefer `graffeo_ets` for non-persistent, non-transactional
use — PF-12).

Each graph is a set of three named Mnesia tables (vtab/etab/ntab)
within the node-global Mnesia instance. Reads are dirty by default
(fast, lock-free); inside a `transaction/1` they use locked reads
for a consistent snapshot.

Distributed Mnesia is available via the `nodes`/`majority` options
in `open/2`; its partition behaviour is Mnesia's own — use knowingly
(DIST-14).

Constructive ops (`subgraph`, `condensation`, `filter_edges`,
`contract`) are not supported over Mnesia — use `graffeo:copy/2`
to materialise an in-memory result into a Mnesia graph.
""".

-behaviour(graffeo_backend).
-behaviour(graffeo_builder).

-export([
    new/0,
    open/1, open/2,
    close/1,
    delete/1,
    transaction/1
]).

-export([
    add_vertex/2, add_vertex/3,
    add_edge/3, add_edge/4,
    del_vertex/2,
    del_vertices/2,
    del_edge/3,
    del_edges/2
]).

-export([
    vertices/1,
    out_neighbours/2,
    in_neighbours/2,
    in_degree/2,
    out_degree/2,
    no_edges/1,
    no_vertices/1,
    edge_meta/3,
    vertex_label/2
]).

-export([
    empty_like/1,
    build_add_vertex/2, build_add_vertex/3,
    build_add_edge/4
]).

-record(graffeo_vtab, {v, label}).
-record(graffeo_etab, {pair, meta}).
-record(graffeo_ntab, {key, target}).

-record(mnesia_ref, {
    vtab :: atom(),
    etab :: atom(),
    ntab :: atom()
}).

-doc "Opaque reference to the Mnesia-backed graph tables.".
-opaque ref() :: #mnesia_ref{}.
-export_type([ref/0]).

%%% === Lifecycle ===

-doc "Create a new ephemeral Mnesia graph (ram_copies).".
-spec new() -> graffeo:graph().
new() ->
    ensure_mnesia(),
    Name = "tmp_" ++ integer_to_list(erlang:unique_integer([positive])),
    create_tables(Name, #{storage => ram_copies}).

-doc "Open or create a persistent named Mnesia graph (disc_copies).".
-spec open(string() | binary()) -> graffeo:graph().
open(Name) ->
    open(Name, #{}).

-doc """
Open or create a named Mnesia graph with options.

Options:
- `storage`: `ram_copies` | `disc_copies` | `disc_only_copies`
  (default `disc_copies`).
- `nodes`: list of nodes for replication (default `[node()]`).
- `majority`: boolean (default `false`).
""".
-spec open(string() | binary(), map()) -> graffeo:graph().
open(Name, Opts) when is_binary(Name) ->
    open(binary_to_list(Name), Opts);
open(Name, Opts) when is_list(Name) ->
    ensure_mnesia(),
    create_tables(Name, Opts).

-doc "Release the graph handle (Mnesia keeps disc tables).".
-spec close(graffeo:graph()) -> ok.
close(G) ->
    _ = require_handle(close, G),
    ok.

-doc "Delete the graph's Mnesia tables.".
-spec delete(graffeo:graph()) -> ok.
delete(G) ->
    Ref = require_handle(delete, G),
    {atomic, ok} = mnesia:delete_table(Ref#mnesia_ref.vtab),
    {atomic, ok} = mnesia:delete_table(Ref#mnesia_ref.etab),
    {atomic, ok} = mnesia:delete_table(Ref#mnesia_ref.ntab),
    ok.

-doc """
Run a function inside a Mnesia transaction.

Read and write operations on the graph automatically dispatch to
locked Mnesia operations when called inside a transaction, giving
atomic mutations and consistent-snapshot reads.
""".
-spec transaction(fun(() -> T)) -> {atomic, T} | {aborted, term()}.
transaction(Fun) ->
    mnesia:transaction(Fun).

%%% === Mutation ===

-doc "Add a vertex with the default label.".
-spec add_vertex(graffeo:graph(), graffeo:vertex()) -> ok.
add_vertex(G, V) ->
    add_vertex(G, V, undefined).

-doc "Add a vertex with a label.".
-spec add_vertex(graffeo:graph(), graffeo:vertex(), graffeo:label()) -> ok.
add_vertex(G, V, Label) ->
    Ref = require_handle(add_vertex, G),
    write(Ref#mnesia_ref.vtab, #graffeo_vtab{v = V, label = Label}),
    ok.

-doc "Add an edge with default metadata.".
-spec add_edge(graffeo:graph(), graffeo:vertex(), graffeo:vertex()) -> ok.
add_edge(G, From, To) ->
    add_edge(G, From, To, #{weight => 1}).

-doc "Add an edge with metadata (last-writer-wins).".
-spec add_edge(
    graffeo:graph(),
    graffeo:vertex(),
    graffeo:vertex(),
    graffeo:edge_meta()
) -> ok.
add_edge(G, From, To, Meta) ->
    Ref = require_handle(add_edge, G),
    ensure_vertex_raw(Ref, From),
    ensure_vertex_raw(Ref, To),
    case read(Ref#mnesia_ref.etab, {From, To}) of
        [] ->
            write(
                Ref#mnesia_ref.ntab,
                #graffeo_ntab{key = {out, From}, target = To}
            ),
            write(
                Ref#mnesia_ref.ntab,
                #graffeo_ntab{key = {in, To}, target = From}
            );
        _ ->
            ok
    end,
    write(
        Ref#mnesia_ref.etab,
        #graffeo_etab{pair = {From, To}, meta = Meta}
    ),
    ok.

-doc "Remove vertex `V` and all its incident edges.".
-spec del_vertex(graffeo:graph(), graffeo:vertex()) -> ok.
del_vertex(G, V) ->
    Ref = require_handle(del_vertex, G),
    OutNbrs = out_neighbours_raw(Ref, V),
    InNbrs = in_neighbours_raw(Ref, V),
    lists:foreach(fun(To) -> del_edge_raw(Ref, V, To) end, OutNbrs),
    lists:foreach(fun(From) -> del_edge_raw(Ref, From, V) end, InNbrs),
    delete_record(Ref#mnesia_ref.vtab, V),
    ok.

-doc "Delete a list of vertices and their incident edges.".
-spec del_vertices(graffeo:graph(), [graffeo:vertex()]) -> ok.
del_vertices(G, Vs) ->
    lists:foreach(fun(V) -> del_vertex(G, V) end, Vs),
    ok.

-doc "Remove the `From→To` edge.".
-spec del_edge(graffeo:graph(), graffeo:vertex(), graffeo:vertex()) -> ok.
del_edge(G, From, To) ->
    Ref = require_handle(del_edge, G),
    del_edge_raw(Ref, From, To),
    ok.

-doc "Delete a list of `{From, To}` edges.".
-spec del_edges(graffeo:graph(), [{graffeo:vertex(), graffeo:vertex()}]) -> ok.
del_edges(G, Pairs) ->
    Ref = require_handle(del_edges, G),
    lists:foreach(fun({From, To}) -> del_edge_raw(Ref, From, To) end, Pairs),
    ok.

%%% === graffeo_backend callbacks ===

-doc "All vertices in the graph.".
-spec vertices(ref()) -> [graffeo:vertex()].
vertices(Ref) ->
    Keys = dirty_all_keys(Ref#mnesia_ref.vtab),
    Keys.

-doc "Outgoing neighbours of `V`.".
-spec out_neighbours(ref(), graffeo:vertex()) -> [graffeo:vertex()].
out_neighbours(Ref, V) ->
    out_neighbours_raw(Ref, V).

-doc "Incoming neighbours of `V`.".
-spec in_neighbours(ref(), graffeo:vertex()) -> [graffeo:vertex()].
in_neighbours(Ref, V) ->
    in_neighbours_raw(Ref, V).

-doc "Number of incoming edges to `V`.".
-spec in_degree(ref(), graffeo:vertex()) -> non_neg_integer().
in_degree(Ref, V) ->
    length(in_neighbours_raw(Ref, V)).

-doc "Number of outgoing edges from `V`.".
-spec out_degree(ref(), graffeo:vertex()) -> non_neg_integer().
out_degree(Ref, V) ->
    length(out_neighbours_raw(Ref, V)).

-doc "Total number of edges.".
-spec no_edges(ref()) -> non_neg_integer().
no_edges(Ref) ->
    table_size(Ref#mnesia_ref.etab).

-doc "Total number of vertices.".
-spec no_vertices(ref()) -> non_neg_integer().
no_vertices(Ref) ->
    table_size(Ref#mnesia_ref.vtab).

-doc "Get edge metadata.".
-spec edge_meta(ref(), graffeo:vertex(), graffeo:vertex()) ->
    {ok, graffeo:edge_meta()} | error.
edge_meta(Ref, From, To) ->
    case read(Ref#mnesia_ref.etab, {From, To}) of
        [#graffeo_etab{meta = Meta}] -> {ok, Meta};
        [] -> error
    end.

-doc "Get the label of a vertex.".
-spec vertex_label(ref(), graffeo:vertex()) ->
    {ok, graffeo:label()} | error.
vertex_label(Ref, V) ->
    case read(Ref#mnesia_ref.vtab, V) of
        [#graffeo_vtab{label = Label}] -> {ok, Label};
        [] -> error
    end.

%%% === graffeo_builder callbacks ===

-doc false.
-spec empty_like(graffeo:graph()) -> no_return().
empty_like(_G) ->
    erlang:error({unsupported_on_backend, empty_like, ?MODULE}).

-doc false.
-spec build_add_vertex(graffeo:graph(), graffeo:vertex()) -> graffeo:graph().
build_add_vertex(G, V) ->
    ok = add_vertex(G, V),
    G.

-doc false.
-spec build_add_vertex(graffeo:graph(), graffeo:vertex(), graffeo:label()) ->
    graffeo:graph().
build_add_vertex(G, V, Label) ->
    ok = add_vertex(G, V, Label),
    G.

-doc false.
-spec build_add_edge(
    graffeo:graph(),
    graffeo:vertex(),
    graffeo:vertex(),
    graffeo:edge_meta()
) -> graffeo:graph().
build_add_edge(G, From, To, Meta) ->
    ok = add_edge(G, From, To, Meta),
    G.

%%% === Internal: bootstrap ===

-spec ensure_mnesia() -> ok.
ensure_mnesia() ->
    case mnesia:system_info(is_running) of
        yes ->
            ok;
        _ ->
            MnesiaDir = filename:join(graffeo_config:data_dir(), "mnesia"),
            ok = filelib:ensure_dir(filename:join(MnesiaDir, "probe")),
            application:set_env(mnesia, dir, MnesiaDir),
            case mnesia:create_schema([node()]) of
                ok -> ok;
                {error, {_, {already_exists, _}}} -> ok
            end,
            ok = mnesia:start()
    end.

-spec create_tables(string(), map()) -> graffeo:graph().
create_tables(Name, Opts) ->
    Storage = maps:get(storage, Opts, disc_copies),
    Nodes = maps:get(nodes, Opts, [node()]),
    Majority = maps:get(majority, Opts, false),
    VTab = table_name(Name, "vtab"),
    ETab = table_name(Name, "etab"),
    NTab = table_name(Name, "ntab"),
    ensure_table(
        VTab,
        set,
        graffeo_vtab,
        record_info(fields, graffeo_vtab),
        Storage,
        Nodes,
        Majority
    ),
    ensure_table(
        ETab,
        set,
        graffeo_etab,
        record_info(fields, graffeo_etab),
        Storage,
        Nodes,
        Majority
    ),
    ensure_table(
        NTab,
        bag,
        graffeo_ntab,
        record_info(fields, graffeo_ntab),
        Storage,
        Nodes,
        Majority
    ),
    ok = mnesia:wait_for_tables([VTab, ETab, NTab], 5000),
    Ref = #mnesia_ref{vtab = VTab, etab = ETab, ntab = NTab},
    graffeo:wrap_ref(?MODULE, Ref).

-spec ensure_table(
    atom(),
    atom(),
    atom(),
    [atom()],
    atom(),
    [node()],
    boolean()
) -> ok.
ensure_table(Tab, Type, RecName, Fields, Storage, Nodes, Majority) ->
    case
        mnesia:create_table(Tab, [
            {type, Type},
            {record_name, RecName},
            {attributes, Fields},
            {Storage, Nodes},
            {majority, Majority}
        ])
    of
        {atomic, ok} -> ok;
        {aborted, {already_exists, Tab}} -> ok
    end.

-dialyzer({no_underspecs, table_name/2}).
-spec table_name(string(), nonempty_string()) -> atom().
table_name(Name, Suffix) ->
    list_to_atom("graffeo_" ++ Name ++ "_" ++ Suffix).

%%% === Internal: dirty/txn dispatch ===

-type record() :: #graffeo_vtab{} | #graffeo_etab{} | #graffeo_ntab{}.
-type ntab_record() :: #graffeo_ntab{}.

-spec write(atom(), record()) -> ok.
write(Tab, Record) ->
    case mnesia:is_transaction() of
        true -> mnesia:write(Tab, Record, write);
        false -> mnesia:dirty_write(Tab, Record)
    end.

-spec read(atom(), term()) -> [tuple()].
read(Tab, Key) ->
    case mnesia:is_transaction() of
        true -> mnesia:read(Tab, Key);
        false -> mnesia:dirty_read(Tab, Key)
    end.

-spec delete_record(atom(), term()) -> ok.
delete_record(Tab, Key) ->
    case mnesia:is_transaction() of
        true -> mnesia:delete(Tab, Key, write);
        false -> mnesia:dirty_delete(Tab, Key)
    end.

-spec delete_object(atom(), ntab_record()) -> ok.
delete_object(Tab, Record) ->
    case mnesia:is_transaction() of
        true -> mnesia:delete_object(Tab, Record, write);
        false -> mnesia:dirty_delete_object(Tab, Record)
    end.

-spec dirty_all_keys(atom()) -> [term()].
dirty_all_keys(Tab) ->
    mnesia:dirty_all_keys(Tab).

-spec table_size(atom()) -> non_neg_integer().
table_size(Tab) ->
    mnesia:table_info(Tab, size).

%%% === Internal: helpers ===

-spec require_handle(atom(), graffeo:graph()) -> ref().
require_handle(Op, G) ->
    case graffeo:backend(G) of
        ?MODULE -> graffeo:extract_ref(?MODULE, G);
        Other -> erlang:error({handle_only, Op, Other})
    end.

-spec ensure_vertex_raw(ref(), graffeo:vertex()) -> ok.
ensure_vertex_raw(Ref, V) ->
    case read(Ref#mnesia_ref.vtab, V) of
        [] ->
            write(
                Ref#mnesia_ref.vtab,
                #graffeo_vtab{v = V, label = undefined}
            );
        _ ->
            ok
    end.

-spec out_neighbours_raw(ref(), graffeo:vertex()) -> [graffeo:vertex()].
out_neighbours_raw(Ref, V) ->
    Recs = read(Ref#mnesia_ref.ntab, {out, V}),
    [T || #graffeo_ntab{target = T} <- Recs].

-spec in_neighbours_raw(ref(), graffeo:vertex()) -> [graffeo:vertex()].
in_neighbours_raw(Ref, V) ->
    Recs = read(Ref#mnesia_ref.ntab, {in, V}),
    [T || #graffeo_ntab{target = T} <- Recs].

-spec del_edge_raw(ref(), graffeo:vertex(), graffeo:vertex()) -> ok.
del_edge_raw(Ref, From, To) ->
    delete_record(Ref#mnesia_ref.etab, {From, To}),
    delete_object(
        Ref#mnesia_ref.ntab,
        #graffeo_ntab{key = {out, From}, target = To}
    ),
    delete_object(
        Ref#mnesia_ref.ntab,
        #graffeo_ntab{key = {in, To}, target = From}
    ),
    ok.