%% -------------------------------------------------------------------
%%
%% Copyright (c) 2018 Christopher S. Meiklejohn. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(partisan_full_membership_strategy).
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").
-behaviour(partisan_membership_strategy).
-export([init/1,
join/3,
leave/2,
periodic/1,
handle_message/2]).
-define(SET, state_orset).
-record(full_v1, {actor, membership}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Initialize the strategy state.
init(Identity) ->
State = maybe_load_state_from_disk(Identity),
MembershipList = membership_list(State),
persist_state(State),
{ok, MembershipList, State}.
%% @doc When a node is connected, return the state, membership and outgoing message queue to be transmitted.
join(#full_v1{membership=Membership0} = State0, _Node, #full_v1{membership=NodeMembership}) ->
Membership = ?SET:merge(Membership0, NodeMembership),
State = State0#full_v1{membership=Membership},
MembershipList = membership_list(State),
OutgoingMessages = gossip_messages(State),
persist_state(State),
{ok, MembershipList, OutgoingMessages, State}.
%% @doc Leave a node from the cluster.
leave(#full_v1{membership=Membership0, actor=Actor}=State0, #{name := NameToRemove}) ->
%% Node may exist in the membership on multiple ports, so we need to
%% remove all.
Membership = lists:foldl(fun(#{name := Name} = N, M0) ->
case NameToRemove of
Name ->
{ok, M} = ?SET:mutate({rmv, N}, Actor, M0),
M;
_ ->
M0
end
end, Membership0, membership_list(State0)),
%% Self-leave removes our own state and resets it.
StateToGossip = State0#full_v1{membership=Membership},
State = case partisan_peer_service_manager:mynode() of
NameToRemove ->
%% Reset our state, store this, but gossip the state with us removed to the remainder of the members.
new_state(Actor);
_ ->
%% Gossip state with member removed.
StateToGossip
end,
MembershipList = membership_list(State),
%% Gossip new membership to existing members, so they remove themselves.
OutgoingMessages = gossip_messages(State0, StateToGossip),
persist_state(State),
{ok, MembershipList, OutgoingMessages, State}.
%% @doc Periodic protocol maintenance.
periodic(State) ->
MembershipList = membership_list(State),
OutgoingMessages = gossip_messages(State),
{ok, MembershipList, OutgoingMessages, State}.
%% @doc Handling incoming protocol message.
handle_message(#full_v1{membership=Membership0}=State0, {#{name := _From}, #full_v1{membership=NodeMembership}}) ->
case ?SET:equal(Membership0, NodeMembership) of
true ->
%% Convergence of gossip at this node.
MembershipList = membership_list(State0),
OutgoingMessages = [],
{ok, MembershipList, OutgoingMessages, State0};
false ->
%% Merge, persist, reforward to peers.
Membership = ?SET:merge(Membership0, NodeMembership),
State = State0#full_v1{membership=Membership},
MembershipList = membership_list(State),
OutgoingMessages = gossip_messages(State),
persist_state(State),
{ok, MembershipList, OutgoingMessages, State}
end.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% @private
membership_list(#full_v1{membership=Membership}) ->
sets:to_list(?SET:query(Membership)).
%% @private
gossip_messages(State) ->
gossip_messages(State, State).
%% @private
gossip_messages(State0, State) ->
MembershipList = membership_list(State0),
case partisan_config:get(gossip, true) of
true ->
case MembershipList of
[] ->
[];
AllPeers ->
lists:map(fun(Peer) -> {Peer, {membership_strategy, {myself(), State}}} end, AllPeers)
end;
_ ->
[]
end.
%% @private
maybe_load_state_from_disk(Actor) ->
case data_root() of
undefined ->
new_state(Actor);
Dir ->
case filelib:is_regular(filename:join(Dir, "cluster_state")) of
true ->
{ok, Bin} = file:read_file(filename:join(Dir, "cluster_state")),
binary_to_term(Bin);
false ->
new_state(Actor)
end
end.
%% @private
data_root() ->
case application:get_env(partisan, partisan_data_dir) of
{ok, PRoot} ->
filename:join(PRoot, "default_peer_service");
undefined ->
undefined
end.
%% @private
new_state(Actor) ->
{ok, Membership} = ?SET:mutate({add, myself()}, Actor, ?SET:new()),
LocalState = #full_v1{membership=Membership, actor=Actor},
persist_state(LocalState),
LocalState.
%% @private
myself() ->
partisan_peer_service_manager:myself().
%% @private
persist_state(State) ->
case partisan_config:get(persist_state, true) of
true ->
write_state_to_disk(State);
false ->
ok
end.
%% @private
write_state_to_disk(State) ->
case data_root() of
undefined ->
ok;
Dir ->
File = filename:join(Dir, "cluster_state"),
ok = filelib:ensure_dir(File),
ok = file:write_file(File, term_to_binary(State))
end.