%% -------------------------------------------------------------------
%%
%% Copyright (c) 2017 Christopher S. Meiklejohn. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% -----------------------------------------------------------------------------
%% @doc This modules implements a server that realises the
%% {@link partisan_plumtree_broadcast_handler} behaviour in order to diseminate
%% heartbeat messages. Partisan uses these heartbeat messages to stimulate the
%% Epidemic Broadcast Tree construction.
%%
%% The server will schedule the sending of a `hearbeat` message periodically
%% using the `broadcast_heartbeat_interval` option.
%%
%% Notice that this handler does not perform AAE Exchanges, as we will always
%% have a periodic heartbeat. For that reason, the implementation
%% of the {@link partisan_plumtree_broadcast_handler:exchange/1}
%% callback always returns `ignore'.
%%
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_plumtree_backend).
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").
-behaviour(gen_server).
-behaviour(partisan_plumtree_broadcast_handler).
-include("partisan.hrl").
-include("partisan_logger.hrl").
-record(state, {
node :: node(),
epoch :: integer(),
monotonic = 0 :: non_neg_integer()
}).
-type state() :: #state{}.
-record(broadcast, {
timestamp :: timestamp()
}).
-type broadcast_message() :: #broadcast{}.
-type timestamp() :: {
Node :: node(),
Epoch :: integer(),
Monotonic :: integer()
}.
-type broadcast_id() :: timestamp().
-type broadcast_payload() :: timestamp().
%% API
-export([start_link/0]).
-export([start_link/1]).
-export([timestamps/0]).
%% transmission callbacks
-export([extract_log_type_and_payload/1]).
%% partisan_plumtree_broadcast_handler callbacks
-export([broadcast_channel/0]).
-export([broadcast_data/1]).
-export([exchange/1]).
-export([graft/1]).
-export([is_stale/1]).
-export([merge/2]).
%% gen_server callbacks
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
%% =============================================================================
%% API
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Same as start_link([]).
%% @end
%% -----------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
start_link([]).
%% -----------------------------------------------------------------------------
%% @doc Start and link to calling process.
%% @end
%% -----------------------------------------------------------------------------
-spec start_link(list())-> {ok, pid()} | ignore | {error, term()}.
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).
%% -----------------------------------------------------------------------------
%% @doc Returns all the timestamps.
%%
%% Notice this can copy a lot of data from the ets table to the calling process.
%%
%% In case `partisan_config:get(broadcast)' returns `false', this function
%% returns an empty list.
%% @end
%% -----------------------------------------------------------------------------
-spec timestamps() -> [timestamp()].
timestamps() ->
try
ets:tab2list(?MODULE)
catch
error:badarg ->
[]
end.
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
extract_log_type_and_payload({prune, Root, From}) ->
[{broadcast_protocol, {Root, From}}];
extract_log_type_and_payload({ignored_i_have, MessageId, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {MessageId, Round, Root, From}}];
extract_log_type_and_payload({graft, MessageId, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {MessageId, Round, Root, From}}];
extract_log_type_and_payload(
{broadcast, MessageId, Timestamp, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {Timestamp, MessageId, Round, Root, From}}];
extract_log_type_and_payload({i_have, MessageId, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {MessageId, Round, Root, From}}];
extract_log_type_and_payload(Message) ->
?LOG_INFO("No match for extracted payload: ~p", [Message]),
[].
%% =============================================================================
%% PARTISAN_PLUMTREE_BROADCAST_HANDLER CALLBACKS
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Returns the channel to be used when broadcasting a message
%% on behalf of this handler.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_channel() -> partisan:channel().
broadcast_channel() ->
?MEMBERSHIP_CHANNEL.
%% -----------------------------------------------------------------------------
%% @doc Returns from the broadcast message the identifier and the payload.
%% In this case a tuple where both arguments have the broadcast message
%% `timestamp'. These messages are used by Partisan as a stimulus for the
%% Epidemic Broadcast Tree (Plumtree) construction.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_data(broadcast_message()) ->
{broadcast_id(), broadcast_payload()}.
broadcast_data(#broadcast{timestamp = Timestamp}) ->
{Timestamp, Timestamp}.
%% -----------------------------------------------------------------------------
%% @doc Perform a merge of an incoming object with an object in the
%% local datastore.
%% @end
%% -----------------------------------------------------------------------------
-spec merge(broadcast_id(), broadcast_payload()) -> boolean().
merge(Timestamp, Timestamp) ->
?LOG_DEBUG("Heartbeat received: ~p", [Timestamp]),
case is_stale(Timestamp) of
true ->
false;
false ->
gen_server:call(?MODULE, {merge, Timestamp}, infinity),
true
end.
%% -----------------------------------------------------------------------------
%% @doc Use the clock on the object to determine if this message is
%% stale or not.
%% A message is stale if the received message is causually newer than an
%% existing one. If the message is missing or if the context does not represent
%% an ancestor of message, false is returned. Otherwise, true is
%% returned.
%% @end
%% -----------------------------------------------------------------------------
-spec is_stale(broadcast_id()) -> boolean().
is_stale({Node, Epoch, Monotonic}) ->
try ets:lookup(?MODULE, Node) of
[] ->
false;
[{_, Epoch, ISet}] ->
partisan_interval_sets:is_element(Monotonic, ISet);
[{_, Epoch0, _}] ->
Epoch0 > Epoch
catch
_:_ ->
%% we return true so that merge is not called
true
end.
%% -----------------------------------------------------------------------------
%% @doc Given a message identifier and a clock, return a given message.
%% @end
%% -----------------------------------------------------------------------------
-spec graft(broadcast_id()) ->
stale | {ok, broadcast_payload()} | {error, {not_found, timestamp()}}.
graft({Node, Epoch, Monotonic} = Timestamp) ->
try ets:lookup(?MODULE, Node) of
[] ->
?LOG_DEBUG(#{
description => "Heartbeat message not found for graft.",
timestamp => Timestamp
}),
not_found(Timestamp);
[{Node, Epoch, ISet}] ->
case partisan_interval_sets:is_element(Monotonic, ISet) of
true ->
{ok, Timestamp};
false ->
not_found(Timestamp)
end;
[{_, Epoch0, _}] when Epoch0 > Epoch ->
stale;
[{_, Epoch0, _}] when Epoch0 < Epoch ->
not_found(Timestamp)
catch
_:_ ->
not_found(Timestamp)
end.
%% -----------------------------------------------------------------------------
%% @doc Returns `ignore`.
%% This is because we don't need to worry about reliable delivery: we always
%% know we'll have another heartbeat message to further repair during the next
%% interval.
%% @end
%% -----------------------------------------------------------------------------
-spec exchange(node()) -> {ok, pid()} | {error, any()} | ignore.
exchange(_Peer) ->
ignore.
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
-spec init([]) -> {ok, state()}.
init([]) ->
%% Seed the random number generator.
partisan_config:seed(),
schedule_heartbeat(),
%% Open an ETS table for tracking heartbeat messages.
ets:new(?MODULE, [named_table, set, protected]),
State = #state{
node = node(),
epoch = erlang:system_time(),
monotonic = 0
},
{ok, State}.
-spec handle_call(term(), {pid(), term()}, state()) ->
{reply, term(), state()}.
handle_call({merge, {_, _, _} = Timestamp}, _From, State) ->
true = add_timestamp(Timestamp),
{reply, ok, State};
handle_call(Event, _From, State) ->
?LOG_WARNING(#{description => "Unhandled call event", event => Event}),
{reply, ok, State}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(Event, State) ->
?LOG_WARNING(#{description => "Unhandled cast event", event => Event}),
{noreply, State}.
handle_info(heartbeat, State) ->
%% Generate message with monotonically increasing integer.
Monotonic = State#state.monotonic + 1,
%% Make sure the node prefixes the timestamp with it's own
%% identifier: this means that we can have this tree
%% participate in multiple trees, each rooted at a different
%% node.
Timestamp = {State#state.node, State#state.epoch, Monotonic},
%% Insert a new message into the table.
true = add_timestamp(Timestamp),
%% Send message with monotonically increasing integer.
ok = partisan_plumtree_broadcast:broadcast(
#broadcast{timestamp = Timestamp},
?MODULE
),
?LOG_DEBUG(
"Heartbeat triggered: sending ping ~p to ensure tree.",
[Timestamp]
),
%% Schedule report.
schedule_heartbeat(),
{noreply, State#state{monotonic = Monotonic}};
handle_info(Event, State) ->
?LOG_WARNING(#{description => "Unhandled info event", event => Event}),
{noreply, State}.
-spec terminate(term(), state()) -> term().
terminate(_Reason, _State) ->
ok.
-spec code_change(term() | {down, term()}, state(), term()) ->
{ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% =============================================================================
%% PRIVATE
%% =============================================================================
%% @private
not_found(Timestamp) ->
{error, {not_found, Timestamp}}.
%% @private
add_timestamp({Node, Epoch, Monotonic}) ->
case ets:lookup(?MODULE, Node) of
[] ->
ISet = partisan_interval_sets:from_list([Monotonic]),
true = ets:insert(?MODULE, [{Node, Epoch, ISet}]);
[{_, Epoch0, _}] when Epoch0 < Epoch ->
ISet = partisan_interval_sets:from_list([Monotonic]),
true = ets:insert(?MODULE, [{Node, Epoch, ISet}]);
[{_, Epoch, ISet0}] ->
ISet = partisan_interval_sets:add_element(Monotonic, ISet0),
true = ets:insert(?MODULE, [{Node, Epoch, ISet}]);
[{_, Epoch0, _}] when Epoch0 > Epoch ->
%% We ignore as it is an old message
true
end.
%% @private
schedule_heartbeat() ->
case partisan_config:get(broadcast, false) of
true ->
Interval = partisan_config:get(broadcast_heartbeat_interval, 10000),
timer:send_after(Interval, heartbeat);
false ->
ok
end.