src/cets_status.erl

%% @doc Status of all tables.
-module(cets_status).
-export([status/1]).

%% RPC callbacks
-export([get_local_table_to_other_nodes_map_from_disco/1]).
%% Export for debugging/testing
-export([gather_data/1, format_data/1]).

-ignore_xref([
    status/1, get_local_table_to_other_nodes_map_from_disco/1, gather_data/1, format_data/1
]).

-include_lib("kernel/include/logger.hrl").

-type tab_nodes_map() :: #{Table :: atom() => Nodes :: ordsets:ordset(node())}.
%% Mapping from a table name to a node list.

-type node_to_tab_nodes_map() :: #{node() => tab_nodes_map()}.
%% Mapping from a node name to `tab_nodes_map'.

-type table_name() :: cets:table_name().
%% Table name (atom).

-type disco_name() :: atom().
%% Discovery process name (atom).

-type info() :: #{
    %% Nodes that are connected to us and have the CETS disco process started.
    available_nodes := [node()],
    %% Nodes that do not respond to our pings.
    unavailable_nodes := [node()],
    %% Nodes that has our local tables running (but could also have some unknown tables).
    %% All joined nodes replicate data between each other.
    joined_nodes := [node()],
    %% Nodes that are extracted from the discovery backend.
    discovered_nodes := [node()],
    %% True, if discovery backend returned the list of nodes the last time we've tried
    %% to call it.
    discovery_works := boolean(),
    %% Nodes with stopped disco
    remote_nodes_without_disco := [node()],
    %% Nodes that have more tables registered than the local node.
    remote_nodes_with_unknown_tables := [node()],
    remote_unknown_tables := [table_name()],
    %% Nodes that are available, but do not host one of our local tables.
    remote_nodes_with_missing_tables => [node()],
    remote_missing_tables := [table_name()],
    %% Nodes that replicate at least one of our local tables to a different list of nodes
    %% (could temporary happen during a netsplit)
    conflict_nodes := [node()],
    conflict_tables := [table_name()]
}.
%% Status information.

-type status_data() :: #{
    this_node := node(),
    online_nodes := [node()],
    system_info := cets_discovery:system_info(),
    available_nodes := [node()],
    local_table_to_other_nodes_map := tab_nodes_map(),
    node_to_tab_nodes_map := node_to_tab_nodes_map()
}.
%% Intermediate status information.

%% @doc Collects and analyses status information.
-spec status(disco_name()) -> info().
status(Disco) when is_atom(Disco) ->
    %% Gathering and formatting data is separated to simplify testing/debugging
    Data = gather_data(Disco),
    format_data(Data).

%% @doc Collects status information.
-spec gather_data(disco_name()) -> status_data().
gather_data(Disco) ->
    ThisNode = node(),
    Info = cets_discovery:system_info(Disco),
    #{tables := Tables} = Info,
    OnlineNodes = lists:sort([ThisNode | nodes()]),
    AvailNodes = available_nodes(Disco, OnlineNodes),
    Expected = get_local_table_to_other_nodes_map(Tables),
    OtherTabNodes = get_node_to_tab_nodes_map(AvailNodes, Disco),
    #{
        this_node => ThisNode,
        online_nodes => OnlineNodes,
        system_info => Info,
        available_nodes => AvailNodes,
        local_table_to_other_nodes_map => Expected,
        node_to_tab_nodes_map => OtherTabNodes
    }.

%% @doc Formats gathered data.
%%
%% Contains pure functions logic.
-spec format_data(status_data()) -> info().
format_data(#{
    this_node := ThisNode,
    online_nodes := OnlineNodes,
    system_info := Info,
    available_nodes := AvailNodes,
    local_table_to_other_nodes_map := Expected,
    node_to_tab_nodes_map := OtherTabNodes
}) ->
    %% The node lists could not match for different nodes
    %% because they are updated periodically
    #{unavailable_nodes := UnNodes, nodes := DiscoNodes, tables := Tables} = Info,
    DiscoNodesSorted = lists:sort(DiscoNodes),
    NoDiscoNodes = remote_nodes_without_disco(DiscoNodesSorted, AvailNodes, OnlineNodes),
    JoinedNodes = joined_nodes(ThisNode, Expected, OtherTabNodes),
    AllTables = all_tables(Expected, OtherTabNodes),
    {UnknownTables, NodesWithUnknownTables} = unknown_tables(OtherTabNodes, Tables, AllTables),
    {MissingTables, NodesWithMissingTables} = missing_tables(OtherTabNodes, Tables),
    {ConflictTables, ConflictNodes} = conflict_tables(Expected, OtherTabNodes),
    #{
        available_nodes => AvailNodes,
        unavailable_nodes => UnNodes,
        joined_nodes => JoinedNodes,
        discovered_nodes => DiscoNodesSorted,
        discovery_works => discovery_works(Info),
        remote_nodes_without_disco => NoDiscoNodes,
        remote_unknown_tables => UnknownTables,
        remote_missing_tables => MissingTables,
        remote_nodes_with_unknown_tables => NodesWithUnknownTables,
        remote_nodes_with_missing_tables => NodesWithMissingTables,
        conflict_nodes => ConflictNodes,
        conflict_tables => ConflictTables
    }.

%% Nodes, that host the discovery process
-spec available_nodes(disco_name(), [node(), ...]) -> [node()].
available_nodes(Disco, OnlineNodes) ->
    Results = erpc:multicall(OnlineNodes, erlang, whereis, [Disco], infinity),
    [Node || {Node, {ok, Pid}} <- lists:zip(OnlineNodes, Results), is_pid(Pid)].

remote_nodes_without_disco(DiscoNodes, AvailNodes, OnlineNodes) ->
    lists:filter(fun(Node) -> is_node_without_disco(Node, AvailNodes, OnlineNodes) end, DiscoNodes).

is_node_without_disco(Node, AvailNodes, OnlineNodes) ->
    lists:member(Node, OnlineNodes) andalso not lists:member(Node, AvailNodes).

-spec get_node_to_tab_nodes_map(AvailNodes, Disco) -> OtherTabNodes when
    AvailNodes :: [node()],
    Disco :: disco_name(),
    OtherTabNodes :: node_to_tab_nodes_map().
get_node_to_tab_nodes_map(AvailNodes, Disco) ->
    OtherNodes = lists:delete(node(), AvailNodes),
    F = get_local_table_to_other_nodes_map_from_disco,
    Results = erpc:multicall(OtherNodes, ?MODULE, F, [Disco], infinity),
    OtherTabNodes = [{Node, Result} || {Node, {ok, Result}} <- lists:zip(OtherNodes, Results)],
    maps:from_list(OtherTabNodes).

%% Nodes that has our local tables running (but could also have some unknown tables).
%% All joined nodes replicate data between each other.
-spec joined_nodes(node(), tab_nodes_map(), node_to_tab_nodes_map()) -> [node()].
joined_nodes(ThisNode, Expected, OtherTabNodes) ->
    ExpectedTables = maps_keys_sorted(Expected),
    OtherJoined = maps:fold(
        fun(Node, TabNodes, Acc) ->
            case maps:with(ExpectedTables, TabNodes) =:= Expected of
                true -> [Node | Acc];
                false -> Acc
            end
        end,
        [],
        OtherTabNodes
    ),
    lists:sort([ThisNode | OtherJoined]).

unknown_tables(OtherTabNodes, Tables, AllTables) ->
    UnknownTables = ordsets:subtract(AllTables, Tables),
    NodesWithUnknownTables =
        maps:fold(
            fun(Node, TabNodes, Acc) ->
                case tabnodes_has_any_of(TabNodes, UnknownTables) of
                    true -> [Node | Acc];
                    false -> Acc
                end
            end,
            [],
            OtherTabNodes
        ),
    {UnknownTables, NodesWithUnknownTables}.

-spec missing_tables(node_to_tab_nodes_map(), [table_name()]) -> {[table_name()], [node()]}.
missing_tables(OtherTabNodes, LocalTables) ->
    Zip = maps:fold(
        fun(Node, TabNodes, Acc) ->
            RemoteTables = maps_keys_sorted(TabNodes),
            MissingTables = ordsets:subtract(LocalTables, RemoteTables),
            case MissingTables of
                [] -> Acc;
                [_ | _] -> [{MissingTables, Node} | Acc]
            end
        end,
        [],
        OtherTabNodes
    ),
    {MissingTables, NodesWithMissingTables} = lists:unzip(Zip),
    {lists:usort(lists:append(MissingTables)), NodesWithMissingTables}.

-spec tabnodes_has_any_of([table_name()], [table_name()]) -> boolean().
tabnodes_has_any_of(TabNodes, UnknownTables) ->
    lists:any(fun(Tab) -> maps:is_key(Tab, TabNodes) end, UnknownTables).

%% Nodes that replicate at least one of our local tables to a different list of nodes
%% (could temporary happen during a netsplit)
-spec conflict_tables(tab_nodes_map(), node_to_tab_nodes_map()) -> {[table_name()], [node()]}.
conflict_tables(Expected, OtherTabNodes) ->
    F = fun(Node, NodeTabs, Acc) ->
        FF = fun(Table, OtherNodes, Acc2) ->
            case maps:get(Table, Expected, undefined) of
                Nodes when Nodes =:= OtherNodes ->
                    Acc2;
                undefined ->
                    Acc2;
                _ ->
                    [{Table, Node} | Acc2]
            end
        end,
        maps:fold(FF, Acc, NodeTabs)
    end,
    TabNodes = maps:fold(F, [], OtherTabNodes),
    {ConflictTables, ConflictNodes} = lists:unzip(TabNodes),
    {lists:usort(ConflictTables), lists:usort(ConflictNodes)}.

-spec all_tables(tab_nodes_map(), node_to_tab_nodes_map()) -> [table_name()].
all_tables(Expected, OtherTabNodes) ->
    TableNodesVariants = [Expected | maps:values(OtherTabNodes)],
    TableVariants = lists:map(fun maps_keys_sorted/1, TableNodesVariants),
    ordsets:union(TableVariants).

%% Returns nodes for each table hosted on node()
-spec get_local_table_to_other_nodes_map_from_disco(disco_name()) -> tab_nodes_map().
get_local_table_to_other_nodes_map_from_disco(Disco) ->
    {ok, Tables} = cets_discovery:get_tables(Disco),
    get_local_table_to_other_nodes_map(Tables).

%% Returns nodes for each table in the Tables list
-spec get_local_table_to_other_nodes_map([table_name()]) -> tab_nodes_map().
get_local_table_to_other_nodes_map(Tables) ->
    RequestIds = lists:map(fun cets:get_nodes_request/1, Tables),
    Responses = cets:wait_responses(RequestIds, infinity),
    Zip = lists:zip(Tables, Responses),
    [
        begin
            ?LOG_ERROR(#{what => get_nodes_request_failed, table => Table, reason => Reason}),
            ok
        end
     || {Table, {error, Reason}} <- Zip
    ],
    %% Skip bad replies
    maps:from_list([{Table, Nodes} || {Table, {reply, Nodes}} <- Zip, is_list(Nodes)]).

-spec discovery_works(cets_discovery:system_info()) -> boolean().
discovery_works(#{last_get_nodes_result := {ok, _}}) ->
    true;
discovery_works(_) ->
    false.

maps_keys_sorted(Map) ->
    lists:sort(maps:keys(Map)).