src/partisan_orchestration_backend.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2018 Christopher S. Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

-module(partisan_orchestration_backend).
-author("Christopher Meiklejohn <christopher.meiklejohn@gmail.com>").

-behaviour(gen_server).

-include("partisan.hrl").
-include("partisan_logger.hrl").

-define(TIMEOUT, infinity).

%% API
-export([start_link/0,
         start_link/1,
         graph/0,
         tree/0,
         orchestration/0,
         orchestrated/0,
         was_connected/0,
         servers/0,
         nodes/0]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% Periodic timer intervals (in milliseconds) and the messages they deliver

-define(REFRESH_INTERVAL, 1000).
-define(REFRESH_MESSAGE,  refresh).

-define(BUILD_GRAPH_INTERVAL, 5000).
-define(BUILD_GRAPH_MESSAGE,  build_graph).

-define(ARTIFACT_INTERVAL, 1000).
-define(ARTIFACT_MESSAGE,  artifact).

-callback(clients(term()) -> term()).
-callback(servers(term()) -> term()).
-callback(download_artifact(term(), node()) -> term()).
-callback(upload_artifact(term(), node(), term()) -> term()).


-eqwalizer({nowarn_function, breadth_first/3}).



%% =============================================================================
%% API
%% =============================================================================



%% @doc Starts the orchestration backend without options.
%% Same as start_link([]).
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    start_link([]).

%% @doc Start and link to calling process.
%% The server is registered locally under the module name and `Opts' is
%% handed unchanged to init/1. Note that init/1 only has a clause for the
%% empty list.
-spec start_link(list())-> {ok, pid()} | ignore | {error, term()}.
start_link(Opts) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).

%% @doc Returns the vertices and edges of the membership graph currently held
%% by the server. Edges are `{From, To}' pairs. The call has no timeout.
-spec graph() ->
    {ok, {[digraph:vertex()], [{digraph:vertex(), digraph:vertex()}]}}.
graph() ->
    gen_server:call(?MODULE, graph, ?TIMEOUT).

%% @doc Returns the vertices and edges of the broadcast tree currently held
%% by the server. Edges are `{From, To}' pairs. The call has no timeout.
-spec tree() ->
    {ok, {[digraph:vertex()], [{digraph:vertex(), digraph:vertex()}]}}.
tree() ->
    gen_server:call(?MODULE, tree, ?TIMEOUT).

%% @doc Returns `{ok, true}' once any graph analysis performed by this server
%% has found the graph connected, `{ok, false}' otherwise. The flag is
%% sticky: it never goes back to `false'. The call has no timeout.
-spec was_connected() -> {ok, boolean()}.
was_connected() ->
    gen_server:call(?MODULE, was_connected, ?TIMEOUT).

%% @doc Returns `{ok, Name}' where `Name' is an atom identifying the
%% orchestration system in use, or `{ok, false}' when no orchestration
%% strategy is configured. The call has no timeout.
-spec orchestration() -> {ok, atom()}.
orchestration() ->
    gen_server:call(?MODULE, orchestration, ?TIMEOUT).

%% @doc Returns `true' when an orchestration strategy is configured and
%% `false' otherwise. Unlike the other calls of this module the reply is a
%% bare boolean, not an `{ok, _}' tuple. The call has no timeout.
-spec orchestrated() -> boolean().
orchestrated() ->
    gen_server:call(?MODULE, orchestrated, ?TIMEOUT).

%% @doc Returns the list of servers currently stored in the server state.
%% The call has no timeout.
-spec servers() -> {ok, [node()]}.
servers() ->
    gen_server:call(?MODULE, servers, ?TIMEOUT).

%% @doc Returns the list of nodes (servers followed by clients after a
%% membership refresh) currently stored in the server state.
%% The call has no timeout.
-spec nodes() -> {ok, [node()]}.
nodes() ->
    gen_server:call(?MODULE, nodes, ?TIMEOUT).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
%% @doc Builds the initial server state.
%%
%% Reads the configured orchestration strategy, arms the periodic timers
%% when a strategy is set, seeds the server/node lists when it is not, and
%% opens a Redis connection for the Kubernetes and Compose strategies.
-spec init([]) -> {ok, #orchestration_strategy_state{}}.
init([]) ->
    Strategy = partisan_config:get(
        orchestration_strategy, ?DEFAULT_ORCHESTRATION_STRATEGY
    ),
    PeerService = ?PEER_SERVICE_MANAGER,

    %% Timers are armed before anything else is computed; the Redis
    %% connection is opened last.
    schedule_initial_timers(Strategy),
    Servers = initial_servers(Strategy),
    Nodes = initial_nodes(Strategy),
    Eredis = start_eredis(Strategy),

    {ok, #orchestration_strategy_state{
                eredis=Eredis,
                nodes=Nodes,
                peer_service=PeerService,
                servers=Servers,
                is_connected=false,
                was_connected=false,
                orchestration_strategy=Strategy,
                attempted_nodes=sets:new(),
                graph=digraph:new(),
                tree=digraph:new()}}.

%% @private
%% @doc Arms the periodic timers. Nothing is scheduled when no strategy is
%% configured. Graph building is only scheduled on nodes tagged `server'.
schedule_initial_timers(undefined) ->
    ?LOG_INFO(#{description => "Not using container orchestration; disabling."}),
    ok;
schedule_initial_timers(Strategy) ->
    ?LOG_INFO("OrchestrationStrategy: ~p", [Strategy]),

    %% Only construct the graph and attempt to repair the graph
    %% from the designated server node.
    case partisan_config:get(tag, client) of
        server ->
            ?LOG_INFO(#{description => "Tag is server."}),
            schedule_build_graph();
        client ->
            ?LOG_INFO(#{description => "Tag is client."}),
            ok;
        undefined ->
            ?LOG_INFO(#{description => "Tag is undefined."}),
            ok
    end,

    %% All nodes should upload artifacts.
    schedule_artifact_upload(),

    %% All nodes should attempt to refresh the membership.
    schedule_membership_refresh().

%% @private
%% @doc Initial server list: the configured `lasp_server' (if any) when no
%% strategy is set, otherwise empty.
initial_servers(undefined) ->
    %% TODO: What am I?
    case partisan_config:get(lasp_server, undefined) of
        undefined ->
            [];
        Server ->
            [Server]
    end;
initial_servers(_Strategy) ->
    [].

%% @private
%% @doc Initial node list: the peer service membership when no strategy is
%% set, otherwise empty.
initial_nodes(undefined) ->
    members_for_orchestration();
initial_nodes(_Strategy) ->
    [].

%% @private
%% @doc Opens the Redis connection for the strategies that use it; returns
%% `undefined' for every other strategy.
start_eredis(partisan_kubernetes_orchestration_strategy) ->
    connect_redis();
start_eredis(partisan_compose_orchestration_strategy) ->
    connect_redis();
start_eredis(_Strategy) ->
    undefined.

%% @private
%% @doc Connects to the Redis instance named by the REDIS_SERVICE_HOST and
%% REDIS_SERVICE_PORT environment variables, defaulting to 127.0.0.1:6379.
%% Crashes (badmatch) when the connection cannot be started.
connect_redis() ->
    RedisHost = os:getenv("REDIS_SERVICE_HOST", "127.0.0.1"),
    RedisPort = os:getenv("REDIS_SERVICE_PORT", "6379"),
    {ok, Connection} = eredis:start_link(RedisHost, list_to_integer(RedisPort)),
    Connection.

%% @private
%% @doc Serves the synchronous queries of the public API.
%%
%% Every known request replies from the current state without modifying it.
%% Unknown requests are logged and answered with `ok'.
-spec handle_call(term(), {pid(), term()}, #orchestration_strategy_state{}) ->
    {reply, term(), #orchestration_strategy_state{}}.

handle_call(nodes, _From, #orchestration_strategy_state{nodes=Nodes}=State) ->
    {reply, {ok, Nodes}, State};

handle_call(servers, _From, #orchestration_strategy_state{servers=Servers}=State) ->
    {reply, {ok, Servers}, State};

handle_call(orchestration, _From, #orchestration_strategy_state{orchestration_strategy=OrchestrationStrategy}=State) ->
    {reply, {ok, orchestration_name(OrchestrationStrategy)}, State};

handle_call(orchestrated, _From, #orchestration_strategy_state{orchestration_strategy=OrchestrationStrategy}=State) ->
    %% Bare boolean reply (no `{ok, _}' wrapper); callers rely on this shape.
    {reply, OrchestrationStrategy =/= undefined, State};

handle_call(was_connected, _From, #orchestration_strategy_state{was_connected=WasConnected}=State) ->
    {reply, {ok, WasConnected}, State};

handle_call(graph, _From, #orchestration_strategy_state{graph=Graph}=State) ->
    {reply, {ok, vertices_and_edges(Graph)}, State};

handle_call(tree, _From, #orchestration_strategy_state{tree=Tree}=State) ->
    {reply, {ok, vertices_and_edges(Tree)}, State};

handle_call(Event, _From, State) ->
    ?LOG_WARNING(#{description => "Unhandled call event", event => Event}),
    {reply, ok, State}.

%% @private
%% @doc Maps a strategy module to the short name reported by the
%% `orchestration' call. `false' means no orchestration is configured.
%% Strategies without a dedicated short name are reported by their module
%% name, so the query never crashes the server on a valid configuration.
orchestration_name(undefined) ->
    false;
orchestration_name(partisan_kubernetes_orchestration_strategy) ->
    kubernetes;
orchestration_name(partisan_compose_orchestration_strategy) ->
    compose;
orchestration_name(Strategy) when is_atom(Strategy) ->
    Strategy.

%% @private
%% @doc This server defines no cast protocol: every cast is logged as a
%% warning and the state is left untouched.
-spec handle_cast(term(), #orchestration_strategy_state{}) ->
    {noreply, #orchestration_strategy_state{}}.

handle_cast(Msg, State) ->
    ?LOG_WARNING(#{
        description => "Unhandled cast event",
        event => Msg
    }),
    {noreply, State}.

%% @private
%% @doc Handles the three periodic timer messages of this server.
%%
%% <ul>
%% <li>Refresh: asks the strategy for servers and clients, attempts to join
%% the ones selected for this node's tag and peer service manager, stores
%% the resulting node/server names and re-arms the timer.</li>
%% <li>Artifact: uploads this node's spec and current membership through the
%% strategy and re-arms the timer.</li>
%% <li>Build graph: rebuilds the broadcast tree and the membership graph from
%% the uploaded artifacts, checks the graph for connectedness, logs isolated
%% nodes and re-arms the timer.</li>
%% </ul>
%%
%% Any other message is logged as a warning and ignored.
-spec handle_info(term(), #orchestration_strategy_state{}) -> {noreply, #orchestration_strategy_state{}}.
handle_info(?REFRESH_MESSAGE, #orchestration_strategy_state{orchestration_strategy=OrchestrationStrategy,
                                     peer_service=PeerService,
                                     attempted_nodes=SeenNodes}=State) ->
    Tag = partisan_config:get(tag, client),
    PeerServiceManager = ?PEER_SERVICE_MANAGER,

    %% Both results are used as sets below (sets:union/2, sets:to_list/1).
    Servers = OrchestrationStrategy:servers(State),

    Clients = OrchestrationStrategy:clients(State),

    %% Get list of nodes to connect to: this specialized logic isn't
    %% required when the node count is small, but is required with a
    %% larger node count to ensure the network stabilizes correctly
    %% because HyParView doesn't guarantee graph connectivity: it is
    %% only probabilistic.
    %%
    ToConnectNodes = case {Tag, PeerServiceManager} of
        {_, partisan_pluggable_peer_service_manager} ->
            %% By default, full connectivity; but,
            %% connect all nodes to all other nodes for now.
            sets:union(Servers, Clients);
        {client, partisan_client_server_peer_service_manager} ->
            %% If we're a client, and we're in client/server mode, then
            %% always connect with the server.
            Servers;
        {server, partisan_client_server_peer_service_manager} ->
            %% If we're a server, and we're in client/server mode, then
            %% always initiate connections with clients.
            Clients;
        {client, partisan_hyparview_peer_service_manager} ->
            %% If we're a client, and we're in HyParView, only ever
            %% initiate connections to the servers.
            Servers;
        {server, partisan_hyparview_peer_service_manager} ->
            %% If we're a server, and we're in HyParView, initiate no
            %% connections at all (empty set).
            %% NOTE(review): presumably clients are expected to join
            %% through the server -- confirm.
            sets:new();
        {Tag, PeerServiceManager} ->
            %% Catch all.
            ?LOG_INFO(#{description => "Invalid mode: not connecting to any nodes."}),
            ?LOG_INFO("Tag: ~p; PeerServiceManager: ~p",
                       [Tag, PeerServiceManager]),
            sets:new()
    end,

    %% Attempt to connect nodes that are not connected.
    AttemptedNodes = maybe_connect(PeerService, ToConnectNodes, SeenNodes),

    %% Store plain names: servers first, then clients.
    ServerNames = node_names(sets:to_list(Servers)),
    ClientNames = node_names(sets:to_list(Clients)),
    Nodes = ServerNames ++ ClientNames,

    schedule_membership_refresh(),

    {noreply, State#orchestration_strategy_state
                          {nodes=Nodes,
                          servers=ServerNames,
                          attempted_nodes=AttemptedNodes}};

handle_info(?ARTIFACT_MESSAGE, State) ->
    %% Get current membership.
    Nodes = members_for_orchestration(),

    %% Store membership.
    %% The artifact key is the deployment prefix followed by this node's name.
    Node = prefix(atom_to_list(partisan:node())),
    Payload = term_to_binary({partisan:node_spec(), Nodes}),

    ?LOG_TRACE("Uploading membership for node ~p: ~p", [Node, Nodes]),

    upload_artifact(State, Node, Payload),

    schedule_artifact_upload(),

    {noreply, State};
handle_info(?BUILD_GRAPH_MESSAGE, #orchestration_strategy_state{
                                         orchestration_strategy=OrchestrationStrategy,
                                         graph=Graph0,
                                         tree=Tree0,
                                         was_connected=WasConnected0}=State) ->
    % _ = ?LOG_INFO(#{description => "Beginning graph analysis."}),

    %% Delete existing graphs to prevent ets table leak.
    digraph:delete(Tree0),
    digraph:delete(Graph0),

    %% Get all running nodes, because we need the list of *everything*
    %% to analyze the graph for connectedness.
    Servers = OrchestrationStrategy:servers(State),
    Clients = OrchestrationStrategy:clients(State),
    ServerNames = node_names(sets:to_list(Servers)),
    ClientNames = node_names(sets:to_list(Clients)),
    Nodes = ServerNames ++ ClientNames,

    %% Build the tree.
    Tree = digraph:new(),

    %% The tree is only populated when broadcast is enabled. Any failure
    %% (including an empty server list, where hd/1 raises) is swallowed and
    %% leaves the tree as built so far.
    case partisan_config:get(broadcast, false) of
        true ->
            try
                Root = hd(ServerNames),
                populate_tree(Root, Nodes, Tree)
            catch
                _:_ ->
                    ok
            end;
        false ->
            ok
    end,

    %% Build the graph.
    Graph = digraph:new(),
    Orphaned = populate_graph(State, Nodes, Graph),

    %% Connected means: every traversed node has symmetric views and the
    %% traversal from this node reached as many names as there are nodes.
    {SymmetricViews, VisitedNames} = breadth_first(partisan:node(), Graph, ordsets:new()),
    AllNodesVisited = length(Nodes) == length(VisitedNames),

    Connected = SymmetricViews andalso AllNodesVisited,

    case Connected of
        true ->
            ?LOG_TRACE(#{description => "Graph is connected!"}),
            ok;
        false ->
            ServerMembership = members_for_orchestration(),
            ?LOG_INFO(#{
                description => "Graph is not connected!",
                membership => ServerMembership,
                member_count => length(ServerMembership),
                visited_count => length(VisitedNames),
                node => partisan:node(),
                visited => VisitedNames
            }),
            ok
    end,

    %% Sticky flag: stays true once the graph has been seen connected.
    WasConnected = Connected orelse WasConnected0,

    case length(Orphaned) of
        0 ->
            ok;
        Length ->
            ?LOG_INFO(#{
                description => "Isolated nodes",
                count => Length,
                nodes => Orphaned
            })
    end,

    schedule_build_graph(),

    {noreply, State#orchestration_strategy_state{
                          is_connected=Connected,
                          was_connected=WasConnected,
                          graph=Graph,
                          tree=Tree}};

handle_info(Event, State) ->
    ?LOG_WARNING(#{description => "Unhandled info event", event => Event}),
    {noreply, State}.

%% @private
%% @doc Performs no explicit cleanup on shutdown; always returns `ok'.
-spec terminate(term(), #orchestration_strategy_state{}) -> term().
terminate(_Reason, _State) ->
    ok.

%% @private
%% @doc Code upgrade hook; the state is returned unchanged.
-spec code_change(term() | {down, term()}, #orchestration_strategy_state{}, term()) -> {ok, #orchestration_strategy_state{}}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% @private
%% @doc Attempts to join the nodes in `Nodes' and returns the updated set of
%% nodes that have been attempted.
%%
%% A node is normally attempted only the first time it is seen, because it
%% might since have been migrated to a passive view of the membership. When
%% this node is isolated (its membership holds nobody but itself) every node
%% in `Nodes' is attempted again.
%%
%% `Nodes' and `SeenNodes' are sets; the result is their union.
maybe_connect(PeerService, Nodes, SeenNodes) ->
    Membership = members_for_orchestration(),
    Others = Membership -- [partisan:node()],

    Candidates =
        case Others of
            [] ->
                %% Isolated: always retry everything.
                Nodes;
            [_ | _] ->
                sets:subtract(Nodes, SeenNodes)
        end,

    case sets:to_list(Candidates) of
        [] ->
            ok;
        Pending ->
            ?LOG_INFO("Attempting to connect: ~p", [Pending])
    end,

    %% Attempt connection to any new nodes; join results are ignored.
    sets:fold(
        fun(Peer, ok) ->
            connect(PeerService, Peer),
            ok
        end,
        ok,
        Candidates
    ),

    %% Everything offered in this round now counts as seen.
    sets:union(Nodes, SeenNodes).

%% @private
%% @doc Asks the peer service module to join `Node'; the result of
%% `PeerService:join/1' is returned unchanged.
connect(PeerService, Node) ->
    PeerService:join(Node).


%% -----------------------------------------------------------------------------
%% @private
%% @doc Traverses `Graph' from `Root' and checks that views are symmetric.
%%
%% A node has a symmetric view when its set of in-neighbours equals its set
%% of out-neighbours. Returns `{Symmetric, Visited}' where `Symmetric' is
%% `true' only if every node checked during the traversal had a symmetric
%% view, and `Visited' is the ordset of nodes accumulated so far.
%%
%% When `Root' itself is not symmetric the traversal below it stops, the
%% mismatch is logged and `{false, []}' is returned for that subtree (the
%% caller keeps whatever it had already accumulated).
%%
%% Despite the name, the implementation recurses into each unvisited
%% out-neighbour in turn, i.e. it walks the graph depth-first.
%% @end
%% -----------------------------------------------------------------------------
-spec breadth_first(
    Root :: node(), Graph :: digraph:graph(), ordsets:ordset(node())) ->
    {boolean(), ordsets:ordset(node())}.

breadth_first(Root, Graph, Visited0) when is_atom(Root) ->
    %% Check if every link is bidirectional
    %% If not, stop traversal
    In = ordsets:from_list(digraph:in_neighbours(Graph, Root)),
    Out = ordsets:from_list(digraph:out_neighbours(Graph, Root)),

    %% Mark the root as visited before descending so peers do not come back
    %% to it.
    Visited1 = ordsets:union(Visited0, [Root]),

    case In == Out of
        true ->
            %% Recurse into every out-neighbour not visited when this call
            %% started, threading the symmetry flag and the visited set.
            {SymmetricViews, VisitedNodes} =
                ordsets:fold(
                    fun
                        (Peer, {IsSymmetric0, VisitedNodes0})
                        when is_boolean(IsSymmetric0), is_atom(Peer) ->
                            {IsSymmetric1, VisitedNodes1} = breadth_first(
                                Peer, Graph, VisitedNodes0
                            ),
                            IsSymmetric2 = IsSymmetric0 andalso IsSymmetric1,
                            {
                                IsSymmetric2,
                                ordsets:union(VisitedNodes0, VisitedNodes1)
                            }
                    end,
                    {true, Visited1},
                    ordsets:subtract(Out, Visited1)
                ),
            %% All direct out-neighbours count as visited, including those
            %% whose own subtree reported an asymmetric view.
            {SymmetricViews, ordsets:union(VisitedNodes, Out)};

        false ->
            ?LOG_INFO(
                "Non symmetric views for node ~p. In ~p; Out ~p",
                [Root, In, Out]
            ),
            {false, ordsets:new()}
    end.

%% @private
%% @doc Builds the artifact key for `File':
%% "partisan/<deployment_identifier>/<deployment_timestamp>/<File>".
%% The identifier defaults to the atom `undefined' and the timestamp to 0
%% when they are not configured.
prefix(File) ->
    Identifier = partisan_config:get(deployment_identifier, undefined),
    Timestamp = partisan_config:get(deployment_timestamp, 0),
    lists:append([
        "partisan",
        "/",
        atom_to_list(Identifier),
        "/",
        integer_to_list(Timestamp),
        "/",
        File
    ]).

%% @private
%% @doc Arms a one-shot timer that delivers the build-graph message to the
%% calling process after the build-graph interval plus a random jitter of
%% 1..interval milliseconds. Returns the timer reference.
%%
%% Uses erlang:send_after/3 so the timer is tied to the receiving process
%% and is dropped automatically if that process dies first.
schedule_build_graph() ->
    %% Add random jitter.
    Jitter = rand:uniform(?BUILD_GRAPH_INTERVAL),
    erlang:send_after(
        ?BUILD_GRAPH_INTERVAL + Jitter, self(), ?BUILD_GRAPH_MESSAGE
    ).

%% @private
%% @doc Arms a one-shot timer that delivers the artifact message to the
%% calling process after the artifact interval plus a random jitter of
%% 1..interval milliseconds. Returns the timer reference.
%%
%% Uses erlang:send_after/3 so the timer is tied to the receiving process
%% and is dropped automatically if that process dies first.
schedule_artifact_upload() ->
    %% Add random jitter.
    Jitter = rand:uniform(?ARTIFACT_INTERVAL),
    erlang:send_after(
        ?ARTIFACT_INTERVAL + Jitter, self(), ?ARTIFACT_MESSAGE
    ).

%% @private
%% @doc Arms a one-shot timer that delivers the refresh message to the
%% calling process after the refresh interval plus a random jitter of
%% 1..interval milliseconds. Returns the timer reference.
%%
%% Uses erlang:send_after/3 so the timer is tied to the receiving process
%% and is dropped automatically if that process dies first.
schedule_membership_refresh() ->
    %% Add random jitter.
    Jitter = rand:uniform(?REFRESH_INTERVAL),
    erlang:send_after(
        ?REFRESH_INTERVAL + Jitter, self(), ?REFRESH_MESSAGE
    ).

%% @private
%% @doc Flattens a digraph into `{Vertices, Edges}' where every edge is
%% reduced to its `{From, To}' endpoints (edge identifiers and labels are
%% dropped).
vertices_and_edges(Graph) ->
    Endpoints =
        fun(EdgeId) ->
            {_Id, From, To, _Label} = digraph:edge(Graph, EdgeId),
            {From, To}
        end,
    {
        digraph:vertices(Graph),
        [Endpoints(EdgeId) || EdgeId <- digraph:edges(Graph)]
    }.

%% @private
%% @doc Normalises a list of nodes to plain names: a map with a `name' key
%% is replaced by that name, anything else is kept as is. Order is
%% preserved.
node_names(Nodes) ->
    lists:map(
        fun
            (#{name := Name}) -> Name;
            (Name) -> Name
        end,
        Nodes
    ).

%% @private
%% @doc Adds the uploaded membership of every node in `Nodes' to `Graph' and
%% returns the list of orphaned nodes, i.e. nodes whose uploaded membership
%% contains nobody but themselves.
%%
%% For each node the artifact stored under the node's prefixed name is
%% downloaded. A missing artifact (`undefined') leaves the graph untouched.
%% An `{aws_error, _}' exception adds the node as a vertex without edges.
%% Any other exception propagates to the caller.
populate_graph(State, Nodes, Graph) ->
    lists:foldl(
        fun(Node, OrphanedNodes) ->
            File = prefix(atom_to_list(Node)),
            try
                case download_artifact(State, File) of
                    undefined ->
                        OrphanedNodes;
                    Body ->
                        %% NOTE(review): binary_to_term/1 is applied to data
                        %% fetched from the artifact store. If that store can
                        %% be written by untrusted parties this needs the
                        %% `safe' option or validation -- confirm.
                        {_NodeMyself, NodeMembership} = binary_to_term(Body),
                        add_membership(
                            Node, NodeMembership, Graph, OrphanedNodes
                        )
                end
            catch
                _:{aws_error, Error} ->
                    %% TODO: Move me inside download artifact.
                    add_edges(Node, [], Graph),
                    ?LOG_INFO("Could not get graph object; ~p", [Error]),
                    OrphanedNodes
            end
        end,
        [],
        Nodes
    ).

%% @private
%% @doc Adds one node's membership to `Graph' and returns the updated orphan
%% list. Membership entries are normalised to names first, so a node that
%% only knows itself is detected as orphaned whether the entry is stored as
%% a plain name or as a map carrying a `name' key.
add_membership(Node, NodeMembership, Graph, OrphanedNodes) ->
    case node_names(NodeMembership) of
        [Node] ->
            add_edges(Node, [], Graph),
            [Node | OrphanedNodes];
        PeerNames ->
            add_edges(Node, PeerNames, Graph),
            OrphanedNodes
    end.

%% @private
%% @doc Fills `Tree' with the broadcast tree reported by
%% `partisan_plumtree_broadcast:debug_get_tree/3' for `Root' and `Nodes'.
%% A node reported as `down' becomes a vertex without edges; otherwise an
%% edge is added from the node to each of its eager peers (lazy peers are
%% ignored).
populate_tree(Root, Nodes, Tree) ->
    Entries = partisan_plumtree_broadcast:debug_get_tree(Root, Nodes, 5000),
    AddEntry =
        fun
            ({Node, down}) ->
                add_edges(Node, [], Tree);
            ({Node, {Eager, _Lazy}}) ->
                add_edges(Node, Eager, Tree)
        end,
    lists:foreach(AddEntry, Entries).

%% @private
%% @doc Ensures `Name' and every member of `Membership' are vertices of
%% `Graph' and adds a directed edge from `Name' to each member.
%% Always returns `ok'; the graph is updated in place.
add_edges(Name, Membership, Graph) ->
    %% Add node to graph.
    digraph:add_vertex(Graph, Name),

    lists:foreach(
        fun(Peer) ->
            %% The peer must exist as a vertex before the edge can be added.
            digraph:add_vertex(Graph, Peer),
            digraph:add_edge(Graph, Name, Peer)
        end,
        Membership
    ).

%% @private
%% @doc Delegates to the `upload_artifact/3' callback of the orchestration
%% strategy module stored in the state. Within this module `Node' is the
%% prefixed artifact key built by prefix/1 and `Payload' is a binary.
upload_artifact(#orchestration_strategy_state{orchestration_strategy=OrchestrationStrategy}=State, Node, Payload) ->
    OrchestrationStrategy:upload_artifact(State, Node, Payload).

%% @private
%% @doc Delegates to the `download_artifact/2' callback of the orchestration
%% strategy module stored in the state. Within this module `Node' is the
%% prefixed artifact key built by prefix/1; the caller treats a result of
%% `undefined' as "no artifact".
download_artifact(#orchestration_strategy_state{orchestration_strategy=OrchestrationStrategy}=State, Node) ->
    OrchestrationStrategy:download_artifact(State, Node).


%% @private
%% @doc Best-effort lookup of the current membership from the peer service
%% manager. Returns the member list on an `{ok, Members}' reply and the
%% empty list on any other reply or on any exception.
members_for_orchestration() ->
    %% Assumes full membership.
    try ?PEER_SERVICE_MANAGER:members_for_orchestration() of
        {ok, Members} ->
            Members;
        _Unexpected ->
            []
    catch
        _:_ ->
            []
    end.