src/partisan_scamp_v2_membership_strategy.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2018 Christopher S. Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% -----------------------------------------------------------------------------
%% @doc
%% @reference https://people.maths.bris.ac.uk/~maajg/hiscamp-sigops.pdf
%%
%%
%% == Isolation prevention ==
%%
%% The protocol has no specific description of how detection of recovery or
%% isolation is performed, so we use the protocol specification for that taken
%% from the scamp_v1 membership strategy.
%%
%% @todo Join of InView.
%% @todo Node unsubscription.
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_scamp_v2_membership_strategy).
-behaviour(partisan_membership_strategy).

-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").

-include("partisan.hrl").
-include("partisan_logger.hrl").



%% Strategy state threaded through every callback of this module.
-record(scamp_v2, {
    %% Node spec of the local node, taken from partisan:node_spec() in init/1.
    myself              ::  partisan:node_spec(),
    %% Identity value handed to init/1.
    actor               ::  partisan:actor(),
    %% Local partial view: nodes added via join/3 or kept when handling a
    %% forward_subscription message (initially just ourselves).
    partial_view        ::  [partisan:node_spec()],
    %% Nodes that sent us a keep_subscription message.
    in_view             ::  [partisan:node_spec()],
    %% Timestamp of the most recently received ping; stays undefined until
    %% the first ping is handled.
    last_message_time   ::  erlang:timestamp() | undefined
}).


%% The strategy state as seen by the callbacks' specs.
-type t()               ::  #scamp_v2{}.

-export([init/1]).
-export([join/3]).
-export([leave/2]).
-export([compare/2]).
-export([periodic/1]).
-export([prune/2]).
-export([handle_message/2]).



%% =============================================================================
%% API
%% =============================================================================



%% @doc Build the initial strategy state.
%%      The partial view starts out containing only the local node and the
%%      in view starts out empty. Returns `{ok, PartialView, State}'.
init(Identity) ->
    Self = partisan:node_spec(),
    InitialView = [Self],
    {ok, InitialView, #scamp_v2{
        actor = Identity,
        myself = Self,
        partial_view = InitialView,
        in_view = []
    }}.


%% @doc When a remote node is connected, notify that node to add us.  Then,
%% perform forwarding, if necessary.
%%
%% `Node' is prepended to the partial view. The outgoing messages are, in
%% order:
%%
%%   1. a `forward_subscription' for ourselves, addressed to `Node';
%%   2. a `forward_subscription' for `Node', addressed to every member of the
%%      previous partial view;
%%   3. up to `c - 1' additional `forward_subscription' copies for `Node',
%%      addressed to randomly selected members of the previous partial view.
%%
%% Returns `{ok, PartialView, OutgoingMessages, State}'.
join(Node, _NodeState, #scamp_v2{partial_view = PartialView0} = State0) ->
    %% 1. Add node to our state.
    ?LOG_INFO("~p: Adding node ~p to our partial_view.", [partisan:node(), Node]),
    PartialView = [Node | PartialView0],

    %% 2. Notify node to add us to its state.
    %%    This is lazily done to ensure we can setup the TCP connection both ways, first.
    Myself = partisan:node_spec(),
    NotifyMessage = {Node, {membership_strategy, {forward_subscription, Myself}}},

    %% 3. Notify all members we know about to add node to their partial_view.
    %%    Built with a single pass instead of appending to the accumulator on
    %%    every step, which was quadratic in the size of the view.
    MemberMessages = lists:map(fun(N) ->
        ?LOG_DEBUG(
            "~p: Forwarding subscription for ~p to node: ~p",
            [partisan:node(), Node, N]
        ),

        {N, {membership_strategy, {forward_subscription, Node}}}
        end, PartialView0),

    %% 4. Use 'c - 1' (failure tolerance value) to send forwarded subscriptions for node.
    %%
    %% @todo: Ambiguity here: "These forwarded subscriptions may be kept by the neighbours or
    %%                        "forwarded, but are not destroyed until some node keeps them?"
    %%
    %% What does this mean?
    %%
    %% Important difference from scamp_v1: (c - 1) additional copies instead of c!
    C = partisan_config:get(scamp_c, ?SCAMP_C_VALUE),
    ForwardMessages = lists:map(fun(N) ->
        ?LOG_INFO(
            "~p: Forwarding additional subscription for ~p to node: ~p",
            [partisan:node(), Node, N]
        ),

        {N, {membership_strategy, {forward_subscription, Node}}}
        end, select_random_sublist(State0, C - 1)),

    OutgoingMessages = [NotifyMessage | MemberMessages] ++ ForwardMessages,

    {ok, PartialView, OutgoingMessages, State0#scamp_v2{partial_view = PartialView}}.

%% @doc Leave a node from the cluster.
%%
%% Begins the unsubscription process by addressing a
%% `bootstrap_remove_subscription' message for `Node' to every peer in the
%% partial view. The partial view and the state are returned unchanged.
leave(Node, #scamp_v2{} = State0) ->
    ?LOG_INFO("~p: Issuing remove_subscription for node ~p.", [partisan:node(), Node]),

    Peers = State0#scamp_v2.partial_view,
    Envelope = {membership_strategy, {bootstrap_remove_subscription, Node}},
    {ok, Peers, [{Peer, Envelope} || Peer <- Peers], State0}.


%% -----------------------------------------------------------------------------
%% @doc Returns the tuple `{Joiners, Leavers}'.
%%
%% The callback contract is that `Joiners' lists the node specifications in
%% `Members' that are not in the membership set, and `Leavers' lists the
%% current members that are not in `Members'. This strategy does not compute
%% either yet: both lists are always empty.
%% @end
%% -----------------------------------------------------------------------------
-spec compare(Members :: [partisan:node_spec()], t()) ->
    {Joiners :: [partisan:node_spec()], Leavers :: [partisan:node_spec()]}.

compare(_Members, #scamp_v2{}) ->
    %% TODO at the moment this is called only when peer_service:update_members
    %% is called. We need to define what happens in this case, as we maintain
    %% a partial view of the cluster and Members could be the complete cluster
    %% view as discovered by partisan_peer_discovery_agent or manually by the
    %% user.
    Joiners = [],
    Leavers = [],
    {Joiners, Leavers}.




%% -----------------------------------------------------------------------------
%% @doc Pruning is not implemented for this strategy: the given nodes are
%% ignored and the current partial view and state are returned unchanged.
%% @end
%% -----------------------------------------------------------------------------
prune(_Nodes, #scamp_v2{} = State) ->
    %% Not implemented
    {ok, State#scamp_v2.partial_view, State}.


%% @doc Periodic protocol maintenance.
%%
%% Emits a `ping' for every node in the partial view. When the time elapsed
%% since the last received ping exceeds
%% `?PERIODIC_INTERVAL * ?SCAMP_MESSAGE_WINDOW', a `forward_subscription' for
%% ourselves is additionally sent to one randomly selected member of the
%% partial view; those messages are placed ahead of the pings. The partial
%% view and the state are returned unchanged.
periodic(#scamp_v2{} = State) ->
    #scamp_v2{partial_view = View, last_message_time = LastSeen} = State,
    Self = partisan:node_spec(),

    %% Isolation detection:
    %%
    %% We do not know how often other nodes transmit, so every known node is
    %% pinged on each period. Each node records when it last received a ping;
    %% going without one for too long is taken as a sign of isolation.
    Pings = [{Peer, {membership_strategy, {ping, Self}}} || Peer <- View],

    %% Before the first ping has been seen, the elapsed time counts as zero.
    %% NOTE(review): timer:now_diff/2 yields microseconds -- confirm that the
    %% unit of ?PERIODIC_INTERVAL makes the comparison below meaningful.
    Elapsed =
        case LastSeen of
            undefined ->
                0;
            _ ->
                timer:now_diff(erlang:timestamp(), LastSeen)
        end,

    Resubscriptions =
        case Elapsed > (?PERIODIC_INTERVAL * ?SCAMP_MESSAGE_WINDOW) of
            false ->
                %% Node is not isolated.
                [];
            true ->
                %% Node is isolated.
                ?LOG_TRACE("~p: Node is possibly isolated.", [partisan:node()]),

                Subscriber = partisan:node_spec(),
                Targets = select_random_sublist(State, 1),

                lists:map(fun(Target) ->
                    ?LOG_TRACE(
                        "~p: Forwarding additional subscription for ~p to node: ~p",
                        [partisan:node(), Subscriber, Target]
                    ),

                    {Target, {membership_strategy, {forward_subscription, Subscriber}}}
                end, Targets)
        end,

    {ok, View, Resubscriptions ++ Pings, State}.

%% @doc Handle an incoming protocol message.
%%
%% Every clause returns `{ok, PartialView, OutgoingMessages, State}'.
%% Supported messages:
%%
%%   `{ping, SourceNode}' - records the time of receipt in the state.
%%
%%   `{bootstrap_remove_subscription, Node}' - when `Node' is the local node,
%%   asks in_view peers to replace or drop our subscription and clears both
%%   views; otherwise it is ignored.
%%
%%   `{replace_subscription, Node, Replacement}' - substitutes `Replacement'
%%   for every occurrence of `Node' in the partial view.
%%
%%   `{remove_subscription, Node}' - removes `Node' from the partial view if
%%   present and gossips the removal to the previous partial view.
%%
%%   `{forward_subscription, Node}' - either keeps the subscription (adding
%%   `Node' to the partial view and answering with `keep_subscription') or
%%   forwards it to one randomly selected member of the partial view.
%%
%%   `{keep_subscription, Node}' - adds `Node' to the in view.
handle_message({ping, SourceNode}, #scamp_v2{partial_view=PartialView0}=State) ->
    ?LOG_TRACE("~p: Received ping from node ~p.", [partisan:node(), SourceNode]),

    LastMessageTime = erlang:timestamp(),
    OutgoingMessages = [],
    {ok, PartialView0, OutgoingMessages, State#scamp_v2{last_message_time=LastMessageTime}};
handle_message({bootstrap_remove_subscription, Node}, #scamp_v2{partial_view=PartialView0, in_view=InView0}=State0) ->
    ?LOG_TRACE(
        "~p: Received bootstrap_remove_subscription from node ~p.",
        [partisan:node(), Node]
    ),

    Myself = partisan:node_spec(),
    C = partisan_config:get(scamp_c, ?SCAMP_C_VALUE),

    case Node of
        %% Remove ourselves, but attempt to preserve the scaling relation.
        Myself ->
            InViewSize = length(InView0),
            ViewSize = length(PartialView0),

            %% The first `InViewSize - (C - 1)' in_view peers are asked to
            %% replace us; the count is clamped to [0, InViewSize] so a short
            %% (or empty) in_view never indexes out of range. With an empty
            %% partial view there is nothing to offer as a replacement, so
            %% every in_view peer is asked to remove us instead.
            NumToReplace = case ViewSize of
                0 ->
                    0;
                _ ->
                    min(InViewSize, max(0, InViewSize - (C - 1)))
            end,
            {ToReplace, ToRemove} = lists:split(NumToReplace, InView0),

            %% 1. Notify the leading in_view peers to replace us with a member
            %%    of our partial view, wrapping around the partial view when
            %%    it is shorter than the number of peers to notify.
            ReplacementMessages = lists:map(fun({Index, Peer}) ->
                Replacement = lists:nth(((Index - 1) rem ViewSize) + 1, PartialView0),
                {Peer, {membership_strategy, {replace_subscription, Node, Replacement}}}
            end, lists:zip(lists:seq(1, NumToReplace), ToReplace)),

            %% 2. Notify the remaining in_view peers to remove us without
            %%    replacement.
            RemovalMessages = [
                {Peer, {membership_strategy, {remove_subscription, Node}}}
                || Peer <- ToRemove
            ],

            %% Reset our state.
            {ok, [], ReplacementMessages ++ RemovalMessages, State0#scamp_v2{in_view=[], partial_view=[]}};
        _ ->
            %% Not us, do nothing.
            {ok, PartialView0, [], State0}
    end;
handle_message({replace_subscription, Node, Replacement}, #scamp_v2{partial_view=PartialView0}=State0) ->
    ?LOG_TRACE(
        "~p: Received replace_subscription for node ~p => ~p.",
        [partisan:node(), Node, Replacement]
    ),

    %% Replacement reorganizes the graphs so that the removed nodes parents connect to
    %% its children; but, this doesn't update in links, right?  Is that missing in the
    %% protocol description?

    PartialView = lists:map(fun(N) ->
            case N of
                Node ->
                    Replacement;
                _ ->
                    N
            end
        end, PartialView0),

    {ok, PartialView, [], State0#scamp_v2{partial_view=PartialView}};
handle_message({remove_subscription, Node}, #scamp_v2{partial_view=PartialView0}=State0) ->
    ?LOG_TRACE("~p: Received remove_subscription for node ~p.", [partisan:node(), Node]),

    case lists:member(Node, PartialView0) of
        true ->
            %% Remove.
            PartialView = PartialView0 -- [Node],

            %% Gossip removals to everyone in the view as it was before the
            %% removal.
            Message = {remove_subscription, Node},
            OutgoingMessages = lists:map(fun(Peer) -> {Peer, {membership_strategy, Message}} end, PartialView0),

            %% Update state.
            {ok, PartialView, OutgoingMessages, State0#scamp_v2{partial_view=PartialView}};
        false ->
            %% Unknown node: nothing to remove and nothing to gossip, which
            %% also stops the gossip from circulating further.
            OutgoingMessages = [],
            {ok, PartialView0, OutgoingMessages, State0}
    end;
handle_message({forward_subscription, Node}, #scamp_v2{partial_view=PartialView0}=State0) ->
    ?LOG_TRACE("~p: Received subscription for node ~p.", [partisan:node(), Node]),

    %% Probability: P = 1 / (1 + sizeOf(View))
    %%
    %% NOTE(review): random_0_or_1/0 only ever yields 0 or 1, so Keep is 0
    %% exactly when it yields 0; the keep decision therefore does not depend
    %% on the view size as the formula above intends. Confirm and revisit.
    Random = random_0_or_1(),
    Keep = trunc((length(PartialView0) + 1) * Random),

    case Keep =:= 0 andalso not lists:member(Node, PartialView0) of
        true ->
            ?LOG_TRACE("~p: Adding subscription for node: ~p", [partisan:node(), Node]),
            PartialView = [Node|PartialView0],

            %% Respond to the node that's joining and tell them to keep us.
            ?LOG_TRACE(
                "~p: Notifying ~p to keep us: ~p", [partisan:node(), Node, partisan:node()]
            ),
            OutgoingMessages = [{Node, {membership_strategy, {keep_subscription, partisan:node_spec()}}}],

            {ok, PartialView, OutgoingMessages, State0#scamp_v2{partial_view=PartialView}};
        false ->
            %% Not kept here: pass the subscription on to one random member.
            OutgoingMessages = lists:map(fun(N) ->
                ?LOG_TRACE(
                    "~p: Forwarding subscription for ~p to node: ~p",
                    [partisan:node(), Node, N]
                ),
                {N, {membership_strategy, {forward_subscription, Node}}}
            end, select_random_sublist(State0, 1)),
            {ok, PartialView0, OutgoingMessages, State0}
    end;
handle_message({keep_subscription, Node}, #scamp_v2{partial_view=PartialView0, in_view=InView0}=State) ->
    ?LOG_TRACE("~p: Received keep_subscription for node ~p.", [partisan:node(), Node]),

    InView = [Node|InView0],
    OutgoingMessages = [],
    {ok, PartialView0, OutgoingMessages, State#scamp_v2{in_view=InView}}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% @private
%% Returns at most `K' members of the partial view, taken from the front of a
%% freshly shuffled copy of it.
select_random_sublist(#scamp_v2{} = State, K) ->
    Shuffled = shuffle(State#scamp_v2.partial_view),
    lists:sublist(Shuffled, K).

%% -----------------------------------------------------------------------------
%% @private
%% @doc Returns the elements of `L' in random order: each element is paired
%% with a random float key, the pairs are sorted, and the keys are dropped.
%%
%% http://stackoverflow.com/questions/8817171/shuffling-elements-in-a-list-randomly-re-arrange-list-elements/8820501#8820501
%% @end
%% -----------------------------------------------------------------------------
shuffle(L) ->
    Keyed = [{rand:uniform(), Elem} || Elem <- L],
    {_Keys, Shuffled} = lists:unzip(lists:sort(Keyed)),
    Shuffled.

%% @private
%% Returns 0 or 1, each with equal probability.
%%
%% rand:uniform(2) yields 1 or 2 uniformly; subtracting one maps that onto
%% 0 or 1. (Thresholding rand:uniform(10) at `>= 5' returned 1 for six of the
%% ten possible draws, biasing the result towards 1.)
random_0_or_1() ->
    rand:uniform(2) - 1.