src/partisan_trace_orchestrator.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2018 Christopher S. Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% -----------------------------------------------------------------------------
%% @doc
%% This module requires `disterl'.
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_trace_orchestrator).
-author("Christopher Meiklejohn <christopher.meiklejohn@gmail.com>").

-behaviour(gen_server).

-include("partisan.hrl").
-include("partisan_logger.hrl").

-define(MANAGER, partisan_pluggable_peer_service_manager).

%% API
-export([start_link/0,
         start_link/1,
         stop/0,
         trace/2,
         replay/2,
         reset/0,
         enable/1,
         identify/1,
         print/0,
         perform_preloads/1]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% gen_server state of the trace orchestrator.
-record(state, {previous_trace=[],      % replay mode: recorded entries not yet delivered, consumed from the head
                trace=[],               % entries recorded via trace/2, oldest first
                enabled=false,          % set to true by a successful enable/1
                nodes=[],               % {Name, Node} pairs the '$tracing' interposition funs were installed on
                replay=false,           % true when the REPLAY environment variable is set
                shrinking=false,        % true when replaying with the SHRINKING environment variable set
                blocked_processes=[],   % [{{Type, Message}, From}]: replay callers awaiting a deferred reply
                identifier=undefined}). % value supplied by identify/1

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Same as start_link([]): start the orchestrator with no arguments.
start_link() ->
    NoArgs = [],
    start_link(NoArgs).

%% @doc Stop the globally registered orchestrator with reason `normal',
%% waiting without a timeout for it to terminate.
stop() ->
    ServerRef = {global, ?MODULE},
    gen_server:stop(ServerRef, normal, infinity).

%% @doc Start the orchestrator, registered globally under the module name,
%% and link it to the calling process. `Args' is handed to init/1, which
%% only accepts `[]'.
start_link(Args) ->
    Name = {global, ?MODULE},
    gen_server:start_link(Name, ?MODULE, Args, []).

%% @doc Record trace message: asynchronously append `{Type, Message}' to the
%% orchestrator's trace. Returns `ok' immediately.
trace(Type, Message) ->
    Request = {trace, Type, Message},
    gen_server:cast({global, ?MODULE}, Request).

%% @doc Replay trace: ask the orchestrator whether `{Type, Message}' may
%% proceed. Waits without a timeout; in replay mode the reply may be held
%% back until the entry's turn in the recorded trace.
replay(Type, Message) ->
    Request = {replay, Type, Message},
    gen_server:call({global, ?MODULE}, Request, infinity).

%% @doc Enable trace: install the tracing interposition functions on
%% `Nodes' (a list of `{Name, Node}' pairs). Once tracing is enabled, a
%% further enable is not acted on (it still returns `ok').
enable(Nodes) ->
    Request = {enable, Nodes},
    gen_server:call({global, ?MODULE}, Request, infinity).

%% @doc Reset trace: re-initialize the orchestrator state from the
%% environment. Ignored (still returns `ok') while tracing is disabled.
reset() ->
    Server = {global, ?MODULE},
    gen_server:call(Server, reset, infinity).

%% @doc Print trace: log the recorded trace and write it to the trace file.
%% Ignored (still returns `ok') while tracing is disabled.
print() ->
    Server = {global, ?MODULE},
    gen_server:call(Server, print, infinity).

%% @doc Identify trace: tag the current trace with `Identifier'.
%% Ignored (still returns `ok') while tracing is disabled.
identify(Identifier) ->
    Request = {identify, Identifier},
    gen_server:call({global, ?MODULE}, Request, infinity).

%% @doc Perform preloads: install the preload omissions and the fault
%% injection interposition functions on `Nodes' (a list of `{Name, Node}'
%% pairs). Used for replays. Returns `ok'.
perform_preloads(Nodes) ->
    replay_debug("loading preload omissions.", []),
    _ = preload_omissions(Nodes),
    replay_debug("preloads finished.", []),
    ok.

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
%% Log start-up and build the initial state from the environment via
%% initialize_state/0. Only an empty argument list is accepted.
-spec init([]) -> {ok, #state{}}.
init(Args) when Args =:= [] ->
    debug("test orchestrator started on node: ~p", [partisan:node()]),
    {ok, initialize_state()}.

%% @private
%% Handle synchronous requests.
%%
%% While tracing is disabled, only `{enable, Nodes}' (with `Nodes' a list)
%% is acted on. Every other call, including `reset', `identify' and `print',
%% is logged and answered with `ok' without touching the state.
%%
%% Once tracing is enabled:
%% - `{replay, Type, Message}': in replay mode the caller gets no reply until
%%   its entry becomes deliverable according to the previously recorded
%%   trace. Outside replay mode it is answered immediately.
%% - `reset': re-initializes the state from the environment. Callers still
%%   blocked in replay are first released with `ok'. They wait with an
%%   infinite timeout and would otherwise never be answered once their
%%   entries are discarded.
%% - `{identify, Identifier}': records an identifier for the trace.
%% - `print': logs the recorded trace and writes it to the trace file.
-spec handle_call(term(), {pid(), term()}, #state{}) ->
    {reply, term(), #state{}} | {noreply, #state{}}.

handle_call({enable, Nodes}, _From, #state{enabled=false}=State)
when is_list(Nodes) ->
    ?LOG_INFO("enabling tracing for nodes: ~p", [Nodes]),
    install_tracing_interposition_funs(Nodes),
    {reply, ok, State#state{nodes=Nodes, enabled=true}};

handle_call(Message, _From, #state{enabled=false}=State) ->
    replay_debug("ignoring ~p as tracing is disabled.", [Message]),
    {reply, ok, State};

handle_call({replay, Type, Message}, From, #state{replay=true}=State) ->
    case should_enforce_replay_order(Type, Message) of
        false ->
            {reply, ok, State};
        true ->
            enforce_replay_order({Type, Message}, From, State)
    end;
handle_call({replay, _Type, _Message}, _From, #state{replay=false}=State) ->
    {reply, ok, State};
handle_call(reset, _From, #state{blocked_processes=BlockedProcesses}) ->
    replay_debug("resetting trace.", []),
    %% The new state forgets every blocked caller, so answer them now.
    release_blocked_processes(BlockedProcesses),
    State = initialize_state(),
    {reply, ok, State};
handle_call({identify, Identifier}, _From, State) ->
    replay_debug("identifying trace: ~p", [Identifier]),
    {reply, ok, State#state{identifier=Identifier}};
handle_call(print, _From, #state{trace=Trace}=State) ->
    replay_debug("printing trace", []),
    lists:foreach(fun print_trace_entry/1, Trace),
    write_trace(Trace),
    {reply, ok, State};
handle_call(Msg, _From, State) ->
    replay_debug("Unhandled call messages: ~p", [Msg]),
    {reply, ok, State}.

%% @private
%% Install the `'$tracing'' pre- and post-interposition functions, via RPC
%% to the peer service manager, on every `{Name, Node}' in `Nodes'. All
%% pre-interposition functions are installed before any post-interposition
%% function. Crashes (badmatch) if an RPC does not return `ok'.
install_tracing_interposition_funs(Nodes) ->
    %% Add send and receive pre-interposition functions to enforce message
    %% ordering.
    PreInterpositionFun = fun({Type, OriginNode, OriginalMessage}) ->
        %% TODO: This needs to be fixed: replay and trace need to be done
        %% atomically otherwise processes will race to write trace entry when
        %% they are unblocked from retry: this means that under replay the trace
        %% file might generate small permutations of messages which means it's
        %% technically not the same trace.
        ?LOG_INFO("pre interposition fired for message type: ~p", [Type]),

        %% Under replay ensure they match the trace order (but only for
        %% pre-interposition messages).
        ok = partisan_trace_orchestrator:replay(
            pre_interposition_fun, {node(), Type, OriginNode, OriginalMessage}
        ),

        %% Record message incoming and outgoing messages.
        ok = partisan_trace_orchestrator:trace(
            pre_interposition_fun, {node(), Type, OriginNode, OriginalMessage}
        )
    end,

    lists:foreach(
        fun({_Name, Node}) when is_atom(Node) ->
            ?LOG_INFO(
                "Installing $tracing pre-interposition function on node: ~p",
                [Node]
            ),
            ok = rpc:call(
                Node,
                ?MANAGER,
                add_pre_interposition_fun,
                ['$tracing', PreInterpositionFun]
            )
        end,
        Nodes
    ),

    %% Add send and receive post-interposition functions to perform tracing.
    PostInterpositionFun = fun({Type, OriginNode, OriginalMessage}, {Type, OriginNode, RewrittenMessage}) ->
        %% Record outgoing message after transformation.
        ?LOG_INFO("post interposition fired for message type: ~p", [Type]),
        ok = partisan_trace_orchestrator:trace(
            post_interposition_fun,
            {node(), OriginNode, Type, OriginalMessage, RewrittenMessage}
        )
    end,

    lists:foreach(
        fun({_Name, Node}) when is_atom(Node) ->
            ok = rpc:call(
                Node,
                ?MANAGER,
                add_post_interposition_fun,
                ['$tracing', PostInterpositionFun]
            )
        end,
        Nodes
    ),

    ok.

%% @private
%% Whether a replayed entry must wait for its turn in the recorded trace.
%% Membership-strategy pre-interposition messages are only ordered when
%% membership strategy tracing is enabled. Everything else always is.
should_enforce_replay_order(pre_interposition_fun, Message) ->
    %% Destructure pre-interposition trace message.
    {_TracingNode, InterpositionType, _OriginNode, MessagePayload} = Message,

    case is_membership_strategy_message(InterpositionType, MessagePayload) of
        true ->
            membership_strategy_tracing();
        false ->
            true
    end;
should_enforce_replay_order(_Type, _Message) ->
    true.

%% @private
%% If the trace allows `Entry' now, deliver it, which may also unblock
%% earlier callers, and reply. Otherwise park `From' in the blocked list
%% without replying. trace_deliver_log_flush/3 replies once its turn comes.
enforce_replay_order(Entry, From, State) ->
    #state{previous_trace=PreviousTrace0,
           shrinking=Shrinking,
           blocked_processes=BlockedProcesses0} = State,

    case can_deliver_based_on_trace(Shrinking, Entry, PreviousTrace0, BlockedProcesses0) of
        true ->
            %% Deliver as much as we can.
            {PreviousTrace, BlockedProcesses} =
                trace_deliver(Shrinking, Entry, PreviousTrace0, BlockedProcesses0),

            %% Record new trace position and new list of blocked processes.
            {reply, ok, State#state{blocked_processes=BlockedProcesses,
                                    previous_trace=PreviousTrace}};
        false ->
            %% Block the caller until its entry can be processed.
            {noreply, State#state{blocked_processes=[{Entry, From} | BlockedProcesses0]}}
    end.

%% @private
%% Answer every blocked replay caller with `ok'.
release_blocked_processes(BlockedProcesses) ->
    lists:foreach(
        fun({_Entry, From}) -> gen_server:reply(From, ok) end,
        BlockedProcesses
    ).

%% @private
%% Log one trace entry through replay_debug/2. Pre-interposition entries are
%% not logged. Received messages that were left unchanged are not logged.
%% Membership-strategy post-interposition entries are skipped unless
%% membership strategy tracing is enabled.
print_trace_entry({enter_command, Message}) ->
    {TracingNode, Command} = Message,
    replay_debug("~p entering command: ~p", [TracingNode, Command]);
print_trace_entry({exit_command, Message}) ->
    {TracingNode, Command} = Message,
    replay_debug("~p leaving command: ~p", [TracingNode, Command]);
print_trace_entry({pre_interposition_fun, Message}) ->
    {_TracingNode, _InterpositionType, _OriginNode, _MessagePayload} = Message,
    ok;
print_trace_entry({interposition_fun, Message}) ->
    {TracingNode, OriginNode, InterpositionType, MessagePayload} = Message,
    case InterpositionType of
        receive_message ->
            ok;
        forward_message ->
            replay_debug("~p => ~p: ~p", [TracingNode, OriginNode, MessagePayload])
    end;
print_trace_entry({post_interposition_fun, Message}) ->
    {TracingNode, OriginNode, InterpositionType, MessagePayload, RewrittenMessagePayload} = Message,
    case is_membership_strategy_message(InterpositionType, MessagePayload)
            andalso not membership_strategy_tracing() of
        true ->
            %% Protocol message and we're not tracing protocol messages.
            ok;
        _ ->
            print_post_interposition(
                InterpositionType, TracingNode, OriginNode,
                MessagePayload, RewrittenMessagePayload
            )
    end;
print_trace_entry({Type, Message}) ->
    replay_debug("unknown message type: ~p, message: ~p", [Type, Message]).

%% @private
%% Log a post-interposition entry according to what the interposition did to
%% the payload. The cases are: left unchanged, dropped (rewritten to
%% `undefined'), or rewritten to another payload.
print_post_interposition(receive_message, _TracingNode, _OriginNode, Payload, Payload) ->
    ok;
print_post_interposition(forward_message, TracingNode, OriginNode, Payload, Payload) ->
    replay_debug("~p => ~p: ~p", [TracingNode, OriginNode, Payload]);
print_post_interposition(receive_message, TracingNode, OriginNode, Payload, undefined) ->
    replay_debug("* ~p <- ~p: DROPPED ~p", [TracingNode, OriginNode, Payload]);
print_post_interposition(forward_message, TracingNode, OriginNode, Payload, undefined) ->
    replay_debug("~p => ~p: DROPPED ~p", [TracingNode, OriginNode, Payload]);
print_post_interposition(receive_message, TracingNode, OriginNode, Payload, Rewritten) ->
    replay_debug("* ~p <- ~p: REWROTE ~p to ~p", [TracingNode, OriginNode, Payload, Rewritten]);
print_post_interposition(forward_message, TracingNode, OriginNode, Payload, Rewritten) ->
    replay_debug("~p => ~p: REWROTE ~p to ~p", [TracingNode, OriginNode, Payload, Rewritten]).

%% @private
%% Append `{Type, Message}' to the end of the recorded trace. Entries are
%% recorded even while tracing is disabled. Other casts are logged and
%% ignored.
-spec handle_cast(term(), #state{}) -> {noreply, #state{}}.
handle_cast({trace, Type, Message}, #state{trace=Trace}=State) ->
    %% NOTE(review): appending costs O(length(Trace)) per entry. The trace
    %% is kept oldest-first.
    UpdatedTrace = Trace ++ [{Type, Message}],
    {noreply, State#state{trace=UpdatedTrace}};
handle_cast(Request, State) ->
    replay_debug("Unhandled cast messages: ~p", [Request]),
    {noreply, State}.

%% @private
%% No out-of-band messages are expected. Anything that arrives is logged
%% and dropped.
-spec handle_info(term(), #state{}) -> {noreply, #state{}}.
handle_info(Info, State0) ->
    replay_debug("Unhandled info messages: ~p", [Info]),
    {noreply, State0}.

%% @private
%% Best-effort removal of the `'$tracing'' interposition functions from
%% every traced node: first all pre-interposition functions, then all
%% post-interposition functions. RPC results are deliberately ignored.
-spec terminate(term(), #state{}) -> term().
terminate(_Reason, #state{nodes=Nodes}) ->
    Removals = [
        {"removing tracing pre-interposition on node: ~p", remove_pre_interposition_fun},
        {"removing tracing post-interposition on node: ~p", remove_post_interposition_fun}
    ],
    lists:foreach(
        fun({LogLine, RemoveFunction}) ->
            lists:foreach(
                fun({_Name, Node}) ->
                    replay_debug(LogLine, [Node]),
                    rpc:call(Node, ?MANAGER, RemoveFunction, ['$tracing'])
                end,
                Nodes
            )
        end,
        Removals
    ),
    ok.

%% @private
%% No state migration is needed. The state is carried over unchanged.
-spec code_change(term() | {down, term()}, #state{}, term()) -> {ok, #state{}}.
code_change(_PreviousVersion, UnchangedState, _ExtraData) ->
    {ok, UnchangedState}.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% @private
%% Decide whether the replayed entry `{Type, Message}' may proceed, given
%% the remaining recorded trace `PreviousTrace':
%% - if it is the head of the trace, it may proceed;
%% - if it appears later in the trace, it may not (it must wait its turn);
%% - if it is absent from the trace, or the trace is exhausted, it may
%%   proceed only when `Shrinking' is true.
%% `BlockedProcesses' is used only to log how many callers are waiting.
-spec can_deliver_based_on_trace(term(), term(), term(), [term()]) ->
    true | false.

can_deliver_based_on_trace(Shrinking, {Type, Message} = Candidate, PreviousTrace, BlockedProcesses) ->
    replay_debug("determining if message ~p: ~p can be delivered.", [Type, Message]),
    case PreviousTrace of
        [] ->
            %% The trace is exhausted but messages keep arriving. Expected
            %% when shrinking from the back of the trace forward (foldr).
            case Shrinking of
                false ->
                    replay_debug("=> message should not have been delivered, blocking.", []),
                    false;
                true ->
                    replay_debug("=> CONDITIONAL YES, message doesn't exist in previous trace, but shrinking: ~p ~p", [Type, Message]),
                    true
            end;
        [{NextType, NextMessage} = Next | _] ->
            replay_debug("waiting for message ~p: ~p ", [NextType, NextMessage]),
            Verdict = verdict_against_trace(Shrinking, Candidate, Next, PreviousTrace),
            replay_debug("can deliver: ~p", [Verdict]),
            replay_debug("blocked processes: ~p", [length(BlockedProcesses)]),
            Verdict
    end.

%% @private
%% Verdict for `Candidate' against a non-empty trace whose head is the
%% second argument.
verdict_against_trace(_Shrinking, Next, Next, _PreviousTrace) ->
    replay_debug("=> YES!", []),
    true;
verdict_against_trace(Shrinking, {Type, Message} = Candidate, {NextType, NextMessage}, PreviousTrace) ->
    case {lists:member(Candidate, PreviousTrace), Shrinking} of
        {true, _} ->
            replay_debug("=> NO, waiting for message: ~p: ~p", [NextType, NextMessage]),
            false;
        {false, true} ->
            %% New message in the middle of the trace. Expected when
            %% shrinking from the start of the trace forward (foldl).
            replay_debug("=> CONDITIONAL YES, message doesn't exist in previous trace, but shrinking: ~p ~p", [Type, Message]),
            true;
        {false, false} ->
            replay_debug("=> NO, waiting for message: ~p: ~p", [NextType, NextMessage]),
            false
    end.

%% @private
%% Account for the delivery of `{Type, Message}' against the recorded trace.
%% If it was the head, the head is consumed and blocked callers that became
%% deliverable are flushed. Otherwise (it was let through while shrinking)
%% the trace and the blocked list are returned untouched.
%% Returns `{Trace, BlockedProcesses}'.
trace_deliver(Shrinking, {Type, Message}, [{Type, Message} | Remaining], Blocked) ->
    replay_debug("delivering single message!", []),
    trace_deliver_log_flush(Shrinking, Remaining, Blocked);
trace_deliver(_Shrinking, {_, _}, [{_, _} | _] = Untouched, Blocked) ->
    replay_debug("delivering single message (not in the trace)!", []),
    %% Nothing was consumed, so nothing new can have become deliverable.
    {Untouched, Blocked};
trace_deliver(_Shrinking, {_, _}, [], Blocked) ->
    replay_debug("delivering single message (not in the trace -- end of trace)!", []),
    {[], Blocked}.

%% @private
%% Release every blocked replay caller whose entry has become deliverable.
%% Passes over the blocked list repeat until a pass releases nobody. A
%% released caller is answered with `ok' via gen_server:reply/2.
%%
%% The trace head is consumed only when it is the released entry. An entry
%% released because it is absent from the trace while shrinking leaves the
%% trace untouched, as trace_deliver/4 does. The trace must not lose an
%% unrelated head entry, nor crash when it is already exhausted.
%%
%% Returns `{RemainingTrace, StillBlocked}'.
trace_deliver_log_flush(Shrinking, Trace0, BlockedProcesses0) ->
    replay_debug("attempting to flush blocked messages!", []),

    %% One pass over the callers blocked at the start of the pass, checked
    %% against the trace as it advances.
    {NumDelivered, Trace, BlockedProcesses} = lists:foldl(
        fun({{NextType, NextMessage} = Entry, From} = Blocked,
            {NumDelivered1, Trace1, BlockedProcesses1}) ->
            case can_deliver_based_on_trace(Shrinking, Entry, Trace1, BlockedProcesses0) of
                true ->
                    replay_debug("message ~p ~p can be unblocked!", [NextType, NextMessage]),

                    %% Unblock the caller.
                    gen_server:reply(From, ok),

                    {NumDelivered1 + 1,
                     advance_trace(Entry, Trace1),
                     BlockedProcesses1 -- [Blocked]};
                false ->
                    replay_debug("pid ~p CANNOT be unblocked yet, unmet dependencies!", [From]),

                    {NumDelivered1, Trace1, BlockedProcesses1}
            end
        end,
        {0, Trace0, BlockedProcesses0},
        BlockedProcesses0
    ),

    %% Releasing someone may have made others deliverable: try again.
    case NumDelivered > 0 of
        true ->
            trace_deliver_log_flush(Shrinking, Trace, BlockedProcesses);
        false ->
            replay_debug("flush attempt finished.", []),
            {Trace, BlockedProcesses}
    end.

%% @private
%% Consume the head of the trace only if it is `Entry'.
advance_trace(Entry, [Entry | Rest]) ->
    Rest;
advance_trace(_Entry, Trace) ->
    Trace.

%% @private
%% Path from the `PRELOAD_OMISSIONS_FILE' environment variable, or
%% `undefined' when it is not set.
preload_omission_file() ->
    Path = os:getenv("PRELOAD_OMISSIONS_FILE"),
    if
        Path =:= false -> undefined;
        true -> Path
    end.

%% @private
%% Path the recorded trace is written to: the `TRACE_FILE' environment
%% variable, defaulting to "/tmp/partisan-latest.trace".
trace_file() ->
    os:getenv("TRACE_FILE", "/tmp/partisan-latest.trace").

%% @private
%% Path of the trace to replay: the `REPLAY_TRACE_FILE' environment
%% variable, defaulting to "/tmp/partisan-latest.trace".
replay_trace_file() ->
    os:getenv("REPLAY_TRACE_FILE", "/tmp/partisan-latest.trace").

%% @private
%% Build the initial server state from environment variables. It also
%% updates partisan_config:
%% - `REPLAY_DEBUG' equal to "true" turns replay debug logging on. Any other
%%   value, or none, turns it off.
%% - `REPLAY' set (to any value) selects replay mode. It sets `replaying',
%%   loads the previous trace from replay_trace_file/0 (crashing if it
%%   cannot be read), and sets `shrinking' when `SHRINKING' is also set.
%% - `REPLAY' unset selects record mode, with empty traces.
initialize_state() ->
    ReplayDebug = os:getenv("REPLAY_DEBUG", "false") =:= "true",
    partisan_config:set(replay_debug, ReplayDebug),

    IsReplay = os:getenv("REPLAY") =/= false,
    case IsReplay of
        true ->
            %% Mark that we are in replay mode.
            partisan_config:set(replaying, true),

            replay_debug("loading previous trace for replay.", []),
            {ok, Lines} = partisan_trace_file:read(replay_trace_file()),
            lists:foreach(fun(Line) -> replay_debug("~p", [Line]) end, Lines),
            replay_debug("trace loaded.", []),

            Shrinking = os:getenv("SHRINKING") =/= false,
            case Shrinking of
                true ->
                    %% Mark that we are in shrinking mode.
                    partisan_config:set(shrinking, true);
                false ->
                    ok
            end,

            #state{enabled=false,
                   trace=[],
                   previous_trace=Lines,
                   replay=true,
                   shrinking=Shrinking,
                   blocked_processes=[],
                   identifier=undefined};
        false ->
            %% Record mode: the trace will be written out on print.
            replay_debug("recording trace to file.", []),
            #state{enabled=false,
                   trace=[],
                   blocked_processes=[],
                   identifier=undefined}
    end.

%% @private
%% Write the entries worth replaying to trace_file/0:
%% - command enter/exit markers;
%% - pre-interposition entries, except membership-strategy messages when
%%   membership strategy tracing is off.
%% Everything else is dropped. Crashes if the write does not return `ok'.
write_trace(Trace) ->
    IsReplayable = fun
        ({pre_interposition_fun, Message}) ->
            {_TracingNode, InterpositionType, _OriginNode, MessagePayload} = Message,
            case is_membership_strategy_message(InterpositionType, MessagePayload) of
                true ->
                    %% Protocol messages only when protocol tracing is on.
                    membership_strategy_tracing();
                false ->
                    true
            end;
        ({enter_command, _Message}) ->
            true;
        ({exit_command, _Message}) ->
            true;
        ({_Type, _Message}) ->
            false
    end,
    FilteredTrace = [Entry || Entry <- Trace, IsReplayable(Entry)],

    ok = partisan_trace_file:write(trace_file(), FilteredTrace),
    ok.

%% Log `Line' (an io format string, prefixed with the module name) at info
%% level, but only when the `replay_debug' config value is `true'.
replay_debug(Line, Args) ->
    Enabled = partisan_config:get(replay_debug) =:= true,
    if
        Enabled ->
            ?LOG_INFO("~p: " ++ Line, [?MODULE | Args]);
        true ->
            ok
    end.

%% Unconditionally log `Line' at info level, prefixed with the module name.
debug(Line, Args) ->
    Format = "~p: " ++ Line,
    ?LOG_INFO(Format, [?MODULE | Args]).

%% @private
%% Whether membership-strategy messages are traced. Reads the
%% `membership_strategy_tracing' config, falling back to
%% ?MEMBERSHIP_STRATEGY_TRACING.
membership_strategy_tracing() ->
    Default = ?MEMBERSHIP_STRATEGY_TRACING,
    partisan_config:get(membership_strategy_tracing, Default).

%% @private
%% Install the interposition functions used when replaying with preloads.
%% Nodes: list of `{Name, Node}' pairs.
%%
%% 1. If `PRELOAD_OMISSIONS_FILE' is set, each `pre_interposition_fun' entry
%%    in it becomes an interposition on that entry's tracing node. The
%%    interposition drops the matching forwarded message and sets that
%%    node's `faulted_for_background' config.
%% 2. On every node in `Nodes', a `{faulted, Node}' interposition drops
%%    every message while the node's `faulted' config is `true'.
%% 3. On every node in `Nodes', a `{faulted_for_background, Node}'
%%    interposition drops messages whose type is in the background
%%    annotations while the node's `faulted_for_background' config is
%%    `true'.
%%
%% Crashes if the omissions file cannot be read or an RPC does not return
%% `ok'.
preload_omissions(Nodes) ->
    install_preload_omissions(preload_omission_file()),

    %% Load background annotations.
    BackgroundAnnotations = background_annotations(),
    debug("background annotations are: ~p", [BackgroundAnnotations]),

    %% Install faulted tracing interposition function.
    lists:foreach(fun({_, Node}) -> install_faulted_fun(Node) end, Nodes),

    %% Install faulted_for_background tracing interposition function.
    lists:foreach(
        fun({_, Node}) ->
            install_faulted_for_background_fun(Node, BackgroundAnnotations)
        end,
        Nodes
    ),

    ok.

%% @private
%% Install one send-omission interposition per preload entry in `File'.
%% `undefined' means no preload file is configured.
install_preload_omissions(undefined) ->
    replay_debug("no preload omissions file...", []),
    ok;
install_preload_omissions(File) ->
    {ok, Omissions} = partisan_trace_file:read(File),

    %% Preload each omission at the correct node.
    lists:foldl(
        fun({pre_interposition_fun, Message}, OmissionNodes0) ->
                {TracingNode, forward_message, OriginNode, MessagePayload} = Message,

                replay_debug("enabling preload omission for ~p => ~p: ~p", [TracingNode, OriginNode, MessagePayload]),

                InterpositionFun = preload_omission_fun(TracingNode, OriginNode, MessagePayload, Message),

                %% Install function.
                ok = rpc:call(TracingNode, ?MANAGER, add_interposition_fun, [{send_omission, OriginNode, Message}, InterpositionFun]),

                lists:usort(OmissionNodes0 ++ [TracingNode]);
           ({Other, _Message}, OmissionNodes0) ->
                replay_debug("unknown preload: ~p", [Other]),
                OmissionNodes0
        end,
        [],
        Omissions
    ),
    ok.

%% @private
%% Build the interposition that drops forwarded message `MessagePayload'
%% when it is sent to `OriginNode'. On the first hit it sets
%% `faulted_for_background' in the config of the node running the
%% interposition. `Message' is the preload entry and is used only for
%% logging.
preload_omission_fun(TracingNode, OriginNode, MessagePayload, Message) ->
    fun
        ({forward_message, N, M}) ->
            case N of
                OriginNode ->
                    case M of
                        MessagePayload ->
                            ?LOG_INFO("~p: dropping packet from ~p to ~p due to preload interposition.", [partisan:node(), TracingNode, OriginNode]),

                            %% Must be the key read by the
                            %% faulted_for_background interposition.
                            case partisan_config:get(faulted_for_background) of
                                true ->
                                    ok;
                                _ ->
                                    ?LOG_INFO("~p: setting node ~p to faulted due to preload interposition hit on message: ~p", [partisan:node(), TracingNode, Message]),
                                    partisan_config:set(faulted_for_background, true)
                            end,

                            undefined;
                        Other ->
                            ?LOG_INFO("~p: allowing message, doesn't match interposition payload while node matches", [partisan:node()]),
                            ?LOG_INFO("~p: => expecting: ~p", [partisan:node(), MessagePayload]),
                            ?LOG_INFO("~p: => got: ~p", [partisan:node(), Other]),
                            M
                    end;
                OtherNode ->
                    ?LOG_INFO("~p: allowing message, doesn't match interposition as destination is ~p and not ~p", [partisan:node(), OtherNode, OriginNode]),
                    M
            end;
        ({receive_message, _N, M}) ->
            M
    end.

%% @private
%% Install on `Node' an interposition that drops every forwarded or
%% received message while that node's `faulted' config is `true'.
install_faulted_fun(Node) ->
    InterpositionFun = fun
        ({forward_message, _N, M}) ->
            case partisan_config:get(faulted) of
                true ->
                    case M of
                        undefined ->
                            undefined;
                        _ ->
                            replay_debug("~p: faulted during forward_message of background message, message ~p should be dropped.", [partisan:node(), M]),
                            undefined
                    end;
                _ ->
                    M
            end;
        ({receive_message, _N, M}) ->
            case partisan_config:get(faulted) of
                true ->
                    case M of
                        undefined ->
                            undefined;
                        _ ->
                            replay_debug("~p: faulted during receive_message of background message, message ~p should be dropped.", [partisan:node(), M]),
                            undefined
                    end;
                _ ->
                    M
            end
    end,

    %% Install function.
    replay_debug("installing faulted pre-interposition for node: ~p", [Node]),
    ok = rpc:call(Node, ?MANAGER, add_interposition_fun, [{faulted, Node}, InterpositionFun]).

%% @private
%% Install on `Node' an interposition that, while that node's
%% `faulted_for_background' config is `true', drops messages whose type
%% (per message_type/2) is in `BackgroundAnnotations'.
install_faulted_for_background_fun(Node, BackgroundAnnotations) ->
    InterpositionFun = fun
        ({forward_message, _N, M}) ->
            replay_debug("~p: interposition called for message: ~p", [partisan:node(), M]),

            case partisan_config:get(faulted_for_background) of
                true ->
                    case M of
                        undefined ->
                            undefined;
                        _ ->
                            MessageType = message_type(forward_message, M),

                            case lists:member(element(2, MessageType), BackgroundAnnotations) of
                                true ->
                                    ?LOG_INFO("~p: faulted_for_background during forward_message of background message, message ~p should be dropped.", [partisan:node(), M]),
                                    undefined;
                                false ->
                                    ?LOG_INFO("~p: faulted_for_background, but forward_message payload is not background message: ~p, message_type: ~p", [partisan:node(), M, MessageType]),
                                    M
                            end
                    end;
                _ ->
                    M
            end;
        ({receive_message, _N, M}) ->
            case partisan_config:get(faulted_for_background) of
                true ->
                    case M of
                        undefined ->
                            undefined;
                        _ ->
                            MessageType = message_type(receive_message, M),

                            case lists:member(element(2, MessageType), BackgroundAnnotations) of
                                true ->
                                    ?LOG_INFO("~p: faulted_for_background during receive_message of background message, message ~p should be dropped.", [partisan:node(), M]),
                                    undefined;
                                false ->
                                    ?LOG_INFO("~p: faulted_for_background, but receive_message payload is not background message: ~p, message_type: ~p", [partisan:node(), M, MessageType]),
                                    M
                            end
                    end;
                _ ->
                    M
            end
    end,

    %% Install function.
    replay_debug("installing faulted_for_background pre-interposition for node: ~p", [Node]),
    ok = rpc:call(Node, ?MANAGER, add_interposition_fun, [{faulted_for_background, Node}, InterpositionFun]).

%% @private
%% Classify an intercepted message by the first element of its payload.
%% If that first element is '$gen_cast', the first element of the payload's
%% second element is used instead.
%% - Forwarded payloads are inspected directly.
%% - Received payloads must have the shape `{forward_message, Module, Payload}';
%%   `Payload' is inspected.
%% Returns `{InterpositionType, Type}'.
message_type(InterpositionType, MessagePayload) ->
    ?LOG_INFO("interposition_type: ~p, payload: ~p", [InterpositionType, MessagePayload]),

    Inspected = case InterpositionType of
        forward_message ->
            MessagePayload;
        receive_message ->
            {forward_message, _Module, Payload} = MessagePayload,
            Payload
    end,

    {InterpositionType, unwrapped_message_type(Inspected)}.

%% @private
%% First element of `Payload'. If that element is '$gen_cast', returns the
%% first element of `Payload''s second element instead.
unwrapped_message_type(Payload) ->
    case element(1, Payload) of
        '$gen_cast' ->
            element(1, element(2, Payload));
        Type ->
            Type
    end.


%% @private
%% Background message annotations for the module named by the
%% `IMPLEMENTATION_MODULE' environment variable.
background_annotations() ->
    ModuleString = os:getenv("IMPLEMENTATION_MODULE"),
    background_annotations(ModuleString).

%% @private
%% Load the `background' annotations from
%% "./annotations/partisan-annotations-<ModuleString>".
%% The file must contain a single term: a list of `{Key, Value}' pairs that
%% includes `background'; if a key is repeated, the last value wins.
%% Returns `[]' when no module is given (`false') or the file is absent.
%% Crashes if the file cannot be parsed or has no `background' entry.
background_annotations(false) ->
    debug("IMPLEMENTATION_MODULE env var undefined", []),
    [];

background_annotations(ModuleString) ->
    AnnotationsFile = "./annotations/partisan-annotations-" ++ ModuleString,

    case filelib:is_file(AnnotationsFile) of
        true ->
            {ok, [RawAnnotations]} = file:consult(AnnotationsFile),
            #{background := BackgroundAnnotations} = maps:from_list(RawAnnotations),
            BackgroundAnnotations;
        false ->
            debug("Annotations file doesn't exist: ~p~n", [AnnotationsFile]),
            []
    end.

%%%===================================================================
%%% Trace filtering: super hack, until we can refactor these messages.
%%%===================================================================

%% TODO: Find a way to disable distance metrics, which are non-deterministic
%% for the test execution.

%% TODO: Rename protocol messages, find a way to specify an external
%% function for filtering the trace -- do it from the harness.  Maybe
%% some sort of glob function for receives and forwards to filter.

%% TODO: Now that every message goes through the interposition mechanism we
%% can capture everything, which is good, but we only want to see a small
%% subset of it: acks always, membership updates sometimes, distance
%% messages never. Until this is refactored, the filtering below is a hack.

%% TODO: Change "protocol" tracing to "membership strategy" tracing.

%% Whether an intercepted message belongs to the membership strategy.
%% Both `forward_message' and `receive_message' can carry it in two
%% shapes:
%% - bare, as `{membership_strategy, _}';
%% - wrapped, as the third element of a 3-tuple.
%% Pre-interposition sees bare forwards and wrapped receives.
%% Post-interposition sees wrapped forwards and bare receives.
is_membership_strategy_message(Type, {membership_strategy, _})
        when Type =:= forward_message; Type =:= receive_message ->
    true;
is_membership_strategy_message(Type, {_, _, {membership_strategy, _}})
        when Type =:= forward_message; Type =:= receive_message ->
    true;
is_membership_strategy_message(_Type, _Message) ->
    false.