src/partisan_full_membership_strategy.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2018 Christopher S. Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% -----------------------------------------------------------------------------
%% @doc This module implements the full-mesh membership strategy to be used
%% with {link partisan_pluggable_peer_service_manager}.
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_full_membership_strategy).

-behaviour(partisan_membership_strategy).

-include("partisan.hrl").
-include("partisan_logger.hrl").

-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").


-record(full_v1, {
    actor           ::  partisan:actor(),
    membership      ::  partisan_membership_set:t()
}).

-type t()                   :: #full_v1{}.
-type membership_list()     :: partisan_membership_strategy:membership_list().
-type outgoing_messages()   :: partisan_membership_strategy:outgoing_messages().


%% PARTISAN_MEMBERSHIP_STRATEGY CALLBACKS
-export([init/1]).
-export([join/3]).
-export([leave/2]).
-export([compare/2]).
-export([periodic/1]).
-export([prune/2]).
-export([handle_message/2]).



%% =============================================================================
%% PARTISAN_MEMBERSHIP_STRATEGY CALLBACKS
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @doc Initialize the strategy state.
%% @end
%% -----------------------------------------------------------------------------
-spec init(partisan:actor()) ->
    {ok, membership_list(), State :: any()}.

init(Actor) ->
    State = maybe_load_state_from_disk(Actor),
    Members = members(State),
    ok = persist_state(State),
    {ok, Members, State}.


%% -----------------------------------------------------------------------------
%% @doc When a node is connected, return the state, membership and outgoing
%% message queue to be transmitted.
%% @end
%% -----------------------------------------------------------------------------
-spec join(partisan:node_spec(), PeerState :: any(), State :: any()) ->
    {ok, membership_list(), outgoing_messages(), NewState :: any()}.

join(_Node, #full_v1{} = PeerState, #full_v1{} = State0) ->
    M0 = State0#full_v1.membership,
    PeerM = PeerState#full_v1.membership,

    M = partisan_membership_set:merge(M0, PeerM),
    State = State0#full_v1{membership = M},
    ok = persist_state(State),

    Members = members(State),
    OutgoingMessages = gossip_messages(State),

    {ok, Members, OutgoingMessages, State}.


%% -----------------------------------------------------------------------------
%% @doc Periodic protocol maintenance.
%% @end
%% -----------------------------------------------------------------------------
-spec periodic(State :: any()) ->
    {ok, membership_list(), outgoing_messages(), NewState :: any()}.

periodic(State) ->
    Members = members(State),
    OutgoingMessages = gossip_messages(State),
    persist_state(State),

    {ok, Members, OutgoingMessages, State}.


%% -----------------------------------------------------------------------------
%% @doc Returns the tuple `{Joiners, Leavers}' where `Joiners' is the list of
%% node specifications that are elements of `List' but are not in the
%% membership set, and `Leavers' are the node specifications for the current
%% members that are not elements in `List'.
%% @end
%% -----------------------------------------------------------------------------
-spec compare(Members :: [partisan:node_spec()], t()) ->
    {Joiners :: [partisan:node_spec()], Leavers :: [partisan:node_spec()]}.

compare(Members, State) ->
    partisan_membership_set:compare(Members, State#full_v1.membership).


%% -----------------------------------------------------------------------------
%% @doc Handling incoming protocol message.
%% @end
%% -----------------------------------------------------------------------------
-spec handle_message(partisan:message(), State :: any()) ->
    {ok, membership_list(), outgoing_messages(), NewState :: any()}.

handle_message({#{name := From}, #full_v1{} = State1}, #full_v1{} = State0) ->

    M0 = State0#full_v1.membership,
    M1 = State1#full_v1.membership,

    ?LOG_DEBUG(
        fun([Node, M]) ->
            #{
                description => "Received membership_strategy",
                from => Node,
                membership => partisan_membership_set:to_list(M)
            }
        end,
        [From, M1]
    ),

    case partisan_membership_set:equal(M0, M1) of
        true ->
            %% Convergence of gossip at this node.
            Members = partisan_membership_set:to_list(M0),
            OutgoingMessages = [],
            {ok, Members, OutgoingMessages, State0};
        false ->
            %% Merge, persist, reforward to peers.
            M = partisan_membership_set:merge(M0, M1),
            State = State0#full_v1{membership = M},
            ok = persist_state(State),

            Members = partisan_membership_set:to_list(M),
            OutgoingMessages = gossip_messages(State),

            {ok, Members, OutgoingMessages, State}
    end.


%% -----------------------------------------------------------------------------
%% @doc Leave a node from the cluster.
%% @end
%% -----------------------------------------------------------------------------
-spec leave(partisan:node_spec(), State :: any()) ->
    {ok, membership_list(), outgoing_messages(), NewState :: any()}.

leave(#{name := NameLeaving}, #full_v1{} = State0) ->
    Actor = State0#full_v1.actor,
    Membership0 = State0#full_v1.membership,

    %% Node may exist in the membership on multiple ports, so we need to
    %% remove all.
    Membership = lists:foldl(
        fun
            (#{name := Name} = N, Acc0) when Name == NameLeaving ->
                partisan_membership_set:remove(N, Actor, Acc0);
            (_, Acc0) ->
                Acc0
        end,
        Membership0,
        members(State0)
    ),

    %% Self-leave removes our own state and resets it.
    StateToGossip = State0#full_v1{membership = Membership},

    State = case partisan:node() of
        NameLeaving ->
            %% Reset our state, store this, but gossip the state with us
            %% removed to the remainder of the members.
            new_state(Actor);
        _ ->
            %% Gossip state with member removed.
            StateToGossip
    end,

    ok = persist_state(State),

    Members = members(State),

    %% Gossip new membership to existing members, so they remove themselves.
    OutgoingMessages = gossip_messages(State0, StateToGossip),

    {ok, Members, OutgoingMessages, State}.


%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
-spec prune([partisan:node_spec()], State :: any()) ->
    {ok, membership_list(), NewState :: any()}.

prune([H|T], #full_v1{membership = M0} = State0) ->
    Actor = State0#full_v1.actor,
    M = partisan_membership_set:remove(H, Actor, M0),
    State = State0#full_v1{membership = M},
    prune(T, State);

prune([], State) ->
    {ok, members(State), State}.



%% =============================================================================
%% PRIVATE
%% =============================================================================



%% @private
members(#full_v1{membership = M}) ->
    partisan_membership_set:to_list(M).


%% @private
gossip_messages(State) ->
    gossip_messages(State, State).


%% @private
gossip_messages(State0, #full_v1{} = State) ->
    case partisan_config:get(gossip, true) of
        true ->
            %% All nodes excluding myself
            M = State0#full_v1.membership,
            Peers = partisan_membership_set:to_peer_list(M),
            gossip_messages(State, Peers);
        _ ->
            []
    end;

gossip_messages(_, []) ->
    [];

gossip_messages(State, Peers) when is_list(Peers) ->
    Msg = {membership_strategy, {partisan:node_spec(), State}},
    [{Peer, Msg} || Peer <- Peers].


%% @private
maybe_load_state_from_disk(Actor) ->
    case data_root() of
        undefined ->
            new_state(Actor);
        Dir ->
            case filelib:is_regular(filename:join(Dir, "cluster_state")) of
                true ->
                    {ok, Bin} = file:read_file(filename:join(Dir, "cluster_state")),
                    binary_to_term(Bin);
                false ->
                    new_state(Actor)
            end
    end.


%% @private
new_state(Actor) ->
    Membership = partisan_membership_set:add(
        partisan:node_spec(), Actor, partisan_membership_set:new()
    ),
    State = #full_v1{actor = Actor, membership = Membership},
    ok = persist_state(State),
    State.


%% @private
persist_state(State) ->
    case partisan_config:get(persist_state, true) of
        true ->
            try
                write_state_to_disk(State)
            catch
                Class:Reason:Stacktrace ->
                    ?LOG_ERROR(#{
                        description => "Error while persisting membership",
                        class => Class,
                        reason => Reason,
                        stacktrace => Stacktrace
                    }),
                    ok
            end;
        false ->
            ok
    end.


%% @private
write_state_to_disk(State) ->
    case data_root() of
        undefined ->
            ok;
        Dir ->
            File = filename:join(Dir, "cluster_state"),
            ok = filelib:ensure_dir(File),
            ok = file:write_file(File, term_to_binary(State))
    end.


%% @private
data_root() ->
    case application:get_env(partisan, partisan_data_dir) of
        {ok, PRoot} ->
            filename:join(PRoot, "default_peer_service");
        undefined ->
            undefined
    end.