%% -------------------------------------------------------------------
%%
%% Copyright (c) 2017 Christopher S. Meiklejohn. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% -----------------------------------------------------------------------------
%% @doc This modules implements a server that realises the
%% {@link partisan_plumtree_broadcast_handler} behaviour in order to diseminate
%% heartbeat messages. Partisan uses these heartbeat messages to stimulate the
%% Epidemic Broadcast Tree construction.
%%
%% The server will schedule the sending of a `hearbeat` message periodically
%% using the `broadcast_heartbeat_interval` option.
%%
%% Notice that this handler does not perform AAE Exchanges, as we will always
%% have a periodic heartbeat. For that reason, the implementation
%% of the {@link partisan_plumtree_broadcast_handler:exchange/1}
%% callback always returns `ignore'.
%%
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_plumtree_backend).
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").
-behaviour(gen_server).
-behaviour(partisan_plumtree_broadcast_handler).
-include("partisan.hrl").
-include("partisan_logger.hrl").
-record(state, {}).
-type state() :: #state{}.
-record(broadcast, {
timestamp :: timestamp()
}).
-type broadcast_message() :: #broadcast{}.
-type timestamp() :: {node(), non_neg_integer()}.
-type broadcast_id() :: timestamp().
-type broadcast_payload() :: timestamp().
%% API
-export([start_link/0]).
-export([start_link/1]).
%% transmission callbacks
-export([extract_log_type_and_payload/1]).
%% partisan_plumtree_broadcast_handler callbacks
-export([broadcast_channel/0]).
-export([broadcast_data/1]).
-export([exchange/1]).
-export([graft/1]).
-export([is_stale/1]).
-export([merge/2]).
%% gen_server callbacks
-export([init/1]).
-export([handle_call/3]).
-export([handle_cast/2]).
-export([handle_info/2]).
-export([terminate/2]).
-export([code_change/3]).
%% =============================================================================
%% API
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Same as start_link([]).
%% @end
%% -----------------------------------------------------------------------------
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
start_link([]).
%% -----------------------------------------------------------------------------
%% @doc Start and link to calling process.
%% @end
%% -----------------------------------------------------------------------------
-spec start_link(list())-> {ok, pid()} | ignore | {error, term()}.
start_link(Opts) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, Opts, []).
%% -----------------------------------------------------------------------------
%% @doc
%% @end
%% -----------------------------------------------------------------------------
extract_log_type_and_payload({prune, Root, From}) ->
[{broadcast_protocol, {Root, From}}];
extract_log_type_and_payload({ignored_i_have, MessageId, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {MessageId, Round, Root, From}}];
extract_log_type_and_payload({graft, MessageId, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {MessageId, Round, Root, From}}];
extract_log_type_and_payload(
{broadcast, MessageId, Timestamp, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {Timestamp, MessageId, Round, Root, From}}];
extract_log_type_and_payload({i_have, MessageId, _Mod, Round, Root, From}) ->
[{broadcast_protocol, {MessageId, Round, Root, From}}];
extract_log_type_and_payload(Message) ->
?LOG_INFO("No match for extracted payload: ~p", [Message]),
[].
%% =============================================================================
%% PARTISAN_PLUMTREE_BROADCAST_HANDLER CALLBACKS
%% =============================================================================
%% -----------------------------------------------------------------------------
%% @doc Returns the channel to be used when broadcasting a message
%% on behalf of this handler.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_channel() -> partisan:channel().
broadcast_channel() ->
?MEMBERSHIP_CHANNEL.
%% -----------------------------------------------------------------------------
%% @doc Returns from the broadcast message the identifier and the payload.
%% In this case a tuple where both arguments have the broadcast message
%% `timestamp'. These messages are used by Partisan as a stimulus for the
%% Epidemic Broadcast Tree (Plumtree) construction.
%% @end
%% -----------------------------------------------------------------------------
-spec broadcast_data(broadcast_message()) ->
{broadcast_id(), broadcast_payload()}.
broadcast_data(#broadcast{timestamp = Timestamp}) ->
{Timestamp, Timestamp}.
%% -----------------------------------------------------------------------------
%% @doc Perform a merge of an incoming object with an object in the
%% local datastore.
%% @end
%% -----------------------------------------------------------------------------
-spec merge(broadcast_id(), broadcast_payload()) -> boolean().
merge(Timestamp, Timestamp) ->
?LOG_DEBUG("Heartbeat received: ~p", [Timestamp]),
case is_stale(Timestamp) of
true ->
false;
false ->
gen_server:call(?MODULE, {merge, Timestamp}, infinity),
true
end.
%% -----------------------------------------------------------------------------
%% @doc Use the clock on the object to determine if this message is
%% stale or not.
%% @end
%% -----------------------------------------------------------------------------
-spec is_stale(broadcast_id()) -> boolean().
is_stale(Timestamp) ->
gen_server:call(?MODULE, {is_stale, Timestamp}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Given a message identifier and a clock, return a given message.
%% @end
%% -----------------------------------------------------------------------------
-spec graft(broadcast_id()) ->
stale | {ok, broadcast_payload()} | {error, term()}.
graft(Timestamp) ->
gen_server:call(?MODULE, {graft, Timestamp}, infinity).
%% -----------------------------------------------------------------------------
%% @doc Returns `ignore`.
%% This is because we don't need to worry about reliable delivery: we always
%% know we'll have another heartbeat message to further repair during the next
%% interval.
%% @end
%% -----------------------------------------------------------------------------
-spec exchange(node()) -> {ok, pid()} | {error, any()} | ignore.
exchange(_Peer) ->
ignore.
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
-spec init([]) -> {ok, state()}.
init([]) ->
%% Seed the random number generator.
partisan_config:seed(),
schedule_heartbeat(),
%% Open an ETS table for tracking heartbeat messages.
ets:new(?MODULE, [named_table]),
{ok, #state{}}.
-spec handle_call(term(), {pid(), term()}, state()) ->
{reply, term(), state()}.
handle_call({is_stale, Timestamp}, _From, State) ->
Result = case ets:lookup(?MODULE, Timestamp) of
[] ->
false;
_ ->
true
end,
{reply, Result, State};
handle_call({graft, Timestamp}, _From, State) ->
Result = case ets:lookup(?MODULE, Timestamp) of
[] ->
?LOG_DEBUG(#{
description => "Heartbeat message not found for graft.",
timestamp => Timestamp
}),
{error, {not_found, Timestamp}};
[{Timestamp, _}] ->
{ok, Timestamp}
end,
{reply, Result, State};
handle_call({merge, Timestamp}, _From, State) ->
true = ets:insert(?MODULE, [{Timestamp, true}]),
{reply, ok, State};
handle_call(Event, _From, State) ->
?LOG_WARNING(#{description => "Unhandled call event", event => Event}),
{reply, ok, State}.
-spec handle_cast(term(), state()) -> {noreply, state()}.
handle_cast(Event, State) ->
?LOG_WARNING(#{description => "Unhandled cast event", event => Event}),
{noreply, State}.
handle_info(heartbeat, State) ->
%% Generate message with monotonically increasing integer.
Counter = erlang:unique_integer([monotonic, positive]),
%% Make sure the node prefixes the timestamp with it's own
%% identifier: this means that we can have this tree
%% participate in multiple trees, each rooted at a different
%% node.
Timestamp = {partisan:node(), Counter},
%% Insert a new message into the table.
true = ets:insert(?MODULE, [{Timestamp, true}]),
%% Send message with monotonically increasing integer.
ok = partisan_plumtree_broadcast:broadcast(
#broadcast{timestamp = Timestamp},
?MODULE
),
?LOG_DEBUG(
"Heartbeat triggered: sending ping ~p to ensure tree.",
[Timestamp]
),
%% Schedule report.
schedule_heartbeat(),
{noreply, State};
handle_info(Event, State) ->
?LOG_WARNING(#{description => "Unhandled info event", event => Event}),
{noreply, State}.
-spec terminate(term(), state()) -> term().
terminate(_Reason, _State) ->
ok.
-spec code_change(term() | {down, term()}, state(), term()) ->
{ok, state()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% =============================================================================
%% PRIVATE
%% =============================================================================
%% @private
schedule_heartbeat() ->
case partisan_config:get(broadcast, false) of
true ->
Interval = partisan_config:get(broadcast_heartbeat_interval, 10000),
timer:send_after(Interval, heartbeat);
false ->
ok
end.