src/partisan_peer_discovery_agent.erl

%% =============================================================================
%%  partisan_peer_discovery_agent.erl - peer discovery state machine
%%
%%  Copyright (c) 2022 Alejandro M. Ramallo. All rights reserved.
%%
%%  Licensed under the Apache License, Version 2.0 (the "License");
%%  you may not use this file except in compliance with the License.
%%  You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%%  Unless required by applicable law or agreed to in writing, software
%%  distributed under the License is distributed on an "AS IS" BASIS,
%%  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%%  See the License for the specific language governing permissions and
%%  limitations under the License.
%% =============================================================================

%% -----------------------------------------------------------------------------
%% @doc This state machine is responsible for discovering cluster peers
%% using the configured implementation backend (callback module).
%%
%%
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_peer_discovery_agent).
-behaviour(gen_statem).

-include_lib("kernel/include/logger.hrl").
-include("partisan_util.hrl").


%% Agent data carried across the `enabled' and `disabled' states.
-record(state, {
    %% Set to `true' by init/1 when discovery is configured, `false' otherwise.
    %% NOTE(review): not updated by enable/disable transitions; status/0
    %% reports the live state name instead.
    enabled                                 ::  boolean(),
    %% Module implementing this behaviour; `undefined' when started disabled.
    callback_mod                            ::  module() | undefined,
    %% The `config' map passed to the callback module's init/1.
    callback_config                         ::  map() | undefined,
    %% Opaque callback state returned by init/1 and threaded through lookup/2.
    callback_state                          ::  any() | undefined,
    %% Milliseconds to wait before the first lookup (init default: 10s).
    initial_delay                           ::  integer() | undefined,
    %% Milliseconds between lookups (init default: 60s).
    polling_interval                        ::  integer() | undefined,
    %% Timeout passed to the callback's lookup/2 (init default: 5s).
    timeout                                 ::  integer() | undefined,
    %% NOTE(review): not read or written by the state functions in this module.
    peers = []                              ::  [partisan:node_spec()]
}).


%% API
-export([start/0]).
-export([start_link/0]).
-export([lookup/0]).
-export([enable/0]).
-export([disable/0]).
-export([status/0]).

%% gen_statem callbacks
-export([init/1]).
-export([callback_mode/0]).
-export([terminate/3]).
-export([code_change/4]).
-export([format_status/2]).

%% gen_statem states
-export([enabled/3]).
-export([disabled/3]).



%% =============================================================================
%% CALLBACKS
%% =============================================================================



%% -----------------------------------------------------------------------------
%% Initializes the peer discovery agent implementation.
%% Called from the agent's init/1 with the `config' map taken from the
%% `peer_discovery' options (`#{}' when absent). Returning `{ok, State}' starts
%% the agent in the `enabled' state with `State' as the callback state;
%% returning `{error, Reason}' makes the agent stop with `Reason'.
%% -----------------------------------------------------------------------------
-callback init(Opts :: map()) ->
    {ok, State :: any()}
    | {error, Reason ::  any()}.


%% -----------------------------------------------------------------------------
%% Looks up the current set of cluster peers.
%% `State' is the callback state returned by init/1 (or by a previous lookup),
%% and `Timeout' is the agent's configured `timeout' option. On success the
%% returned node specs, plus the local node spec, are handed to
%% partisan_peer_service:update_members/1. `NewState' replaces the callback
%% state.
%% -----------------------------------------------------------------------------
-callback lookup(State :: any(), Timeout :: timeout()) ->
    {ok, [partisan:node_spec()], NewState :: any()}
    | {error, Reason :: any(), NewState :: any()}.



%% =============================================================================
%% API
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @doc Starts the agent as a standalone (unlinked) process registered locally
%% under the module name. The agent options are read from the
%% `peer_discovery' configuration key, defaulting to an empty map.
%% @end
%% -----------------------------------------------------------------------------
-spec start() ->
    {ok, pid()} | ignore | {error, term()}.

start() ->
    Name = {local, ?MODULE},
    gen_statem:start(
        Name, ?MODULE, [partisan_config:get(peer_discovery, #{})], []
    ).


%% -----------------------------------------------------------------------------
%% @doc Starts the agent linked to the caller (for use under a supervisor),
%% registered locally under the module name. The agent options are read from
%% the `peer_discovery' configuration key, defaulting to an empty map.
%% @end
%% -----------------------------------------------------------------------------
-spec start_link() ->
    {ok, pid()} | ignore | {error, term()}.

start_link() ->
    Name = {local, ?MODULE},
    gen_statem:start_link(
        Name, ?MODULE, [partisan_config:get(peer_discovery, #{})], []
    ).


%% -----------------------------------------------------------------------------
%% @doc Asks the locally registered agent to perform a peer discovery lookup.
%% Returns `ok' when the request is accepted, otherwise `{error, Reason}'.
%% The call times out after 5 seconds.
%% @end
%% -----------------------------------------------------------------------------
-spec lookup() -> ok | {error, term()}.

lookup() ->
    gen_statem:call(?MODULE, lookup, 5000).


%% -----------------------------------------------------------------------------
%% @doc Switches the agent to the `enabled' state, where it periodically looks
%% up peers. Returns `{error, missing_configuration}' when the agent was
%% started without a callback module (i.e. discovery was never configured).
%% The call times out after 5 seconds.
%% @end
%% -----------------------------------------------------------------------------
-spec enable() -> ok | {error, missing_configuration}.

enable() ->
    gen_statem:call(?MODULE, enable, 5000).


%% -----------------------------------------------------------------------------
%% @doc Switches the agent to the `disabled' state, which stops periodic
%% lookups. Calling it while already disabled is a no-op that still returns
%% `ok'. The call times out after 5 seconds.
%% @end
%% -----------------------------------------------------------------------------
-spec disable() -> ok.

disable() ->
    CallTimeout = 5000,
    gen_statem:call(?MODULE, disable, CallTimeout).


%% -----------------------------------------------------------------------------
%% @doc Returns the name of the agent's current state. The call times out
%% after 5 seconds.
%% @end
%% -----------------------------------------------------------------------------
-spec status() -> enabled | disabled.

status() ->
    CallTimeout = 5000,
    gen_statem:call(?MODULE, status, CallTimeout).



%% =============================================================================
%% GEN_STATEM CALLBACKS
%% =============================================================================



%% gen_statem init/1.
%% When `enabled' is `true' and `type' is an atom (the callback module), the
%% callback is initialised with the `config' option and the agent starts in
%% `enabled'. Either the first lookup is delayed by `initial_delay' ms or, when
%% the delay is not positive, it runs immediately. `enabled => true' without a
%% valid `type' stops with `{invalid_config, Opts}'. Proplists are converted to
%% maps first. Anything else starts the agent in `disabled' without a callback
%% module.
init([#{enabled := true, type := Mod} = Opts]) when is_atom(Mod) ->
    CBConfig = maps:get(config, Opts, #{}),
    case Mod:init(CBConfig) of
        {error, Reason} ->
            ?LOG_ERROR(#{
                description =>
                    "Peer discovery agent could not start due to "
                    "misconfiguration.",
                reason => Reason
            }),
            {stop, Reason};
        {ok, CBState} ->
            Data = new_enabled_state(Mod, CBConfig, CBState, Opts),
            {ok, enabled, Data, [first_lookup_action(Data#state.initial_delay)]}
    end;

init([#{enabled := true} = Opts]) ->
    {stop, {invalid_config, Opts}};

init([Opts]) when is_list(Opts) ->
    init([maps:from_list(Opts)]);

init(_) ->
    {ok, disabled, #state{enabled = false}}.


%% @private
%% Builds the initial `enabled' data, applying the option defaults
%% (10s initial delay, 60s polling interval, 5s lookup timeout).
new_enabled_state(Mod, CBConfig, CBState, Opts) ->
    #state{
        enabled = true,
        callback_mod = Mod,
        callback_config = CBConfig,
        callback_state = CBState,
        initial_delay = maps:get(initial_delay, Opts, timer:seconds(10)),
        polling_interval = maps:get(polling_interval, Opts, timer:seconds(60)),
        timeout = maps:get(timeout, Opts, timer:seconds(5))
    }.


%% @private
%% Chooses how the first lookup is triggered: a state timeout after a positive
%% delay, or an immediate internal event otherwise.
first_lookup_action(Delay) when Delay > 0 ->
    ?LOG_INFO(#{
        description =>
            "Peer discovery agent will start after "
            "initial delay",
        delay_msecs => Delay
    }),
    {state_timeout, Delay, lookup, []};

first_lookup_action(_Delay) ->
    {next_event, internal, lookup}.


%% Each state is implemented by a function of the same name
%% (enabled/3, disabled/3).
callback_mode() -> state_functions.


%% No resources to release on shutdown.
terminate(_Why, _CurrentState, _Data) -> ok.


%% Hot code upgrades keep the state name and data unchanged.
code_change(_FromVsn, CurrentState, Data, _UpgradeExtra) ->
    {ok, CurrentState, Data}.


%% Dialyzer does not know the existence of record_info
-dialyzer([{nowarn_function, format_status/2}]).

%% Renders the agent data as a map of record fields, with the current state
%% name stored under the `state' key.
format_status(_Opts, [_PDict, StateName, #state{} = State]) ->
    StatusMap = ?RECORD_TO_MAP(State),
    StatusMap#{state => StateName}.



%% =============================================================================
%% STATE FUNCTIONS
%% =============================================================================


%% -----------------------------------------------------------------------------
%% @doc In this state the agent uses the callback module to discover peers
%% by calling its lookup/2 callback.
%%
%% A lookup runs on the initial/polling state timeout or on demand via
%% lookup/0. Every lookup reschedules the next one after `polling_interval'
%% ms, whether it succeeded or failed. A failed callback lookup is logged and
%% the returned callback state is kept, so the agent keeps polling.
%% @end
%% -----------------------------------------------------------------------------
enabled({call, From}, enable, _State) ->
    ok = gen_statem:reply(From, ok),
    keep_state_and_data;

enabled({call, From}, disable, State) ->
    ok = gen_statem:reply(From, ok),
    %% Changing state cancels the pending state_timeout, stopping the polling.
    {next_state, disabled, State};

enabled({call, From}, lookup, _State) ->
    %% On-demand lookup (lookup/0): acknowledge immediately and run the lookup
    %% as an internal event, which also reschedules the polling timer.
    ok = gen_statem:reply(From, ok),
    {keep_state_and_data, [{next_event, internal, lookup}]};

enabled(state_timeout, lookup, _State) ->
    %% The polling interval timeout, we need to perform a lookup
    {keep_state_and_data, [{next_event, internal, lookup}]};

enabled(internal, lookup, State0) ->
    CBMod = State0#state.callback_mod,
    CBState0 = State0#state.callback_state,
    Timeout = State0#state.timeout,

    CBState =
        case CBMod:lookup(CBState0, Timeout) of
            {ok, Peers, NewCBState} ->
                ?LOG_DEBUG(#{
                    description => "Got peer discovery lookup response",
                    callback_mod => CBMod,
                    response => Peers
                }),
                %% Add/remove peers from the membership view, this is the
                %% right way to do it as opposed to individually join the
                %% peers. This is so that the peer service can decide which
                %% nodes to join based on the topology/strategy.
                Members = [partisan:node_spec() | Peers],
                %% update_members/1 will deduplicate members.
                ok = partisan_peer_service:update_members(Members),
                NewCBState;

            {error, Reason, NewCBState} ->
                %% The callback contract allows lookup failures; keep the
                %% agent alive and retry on the next polling interval.
                ?LOG_WARNING(#{
                    description => "Peer discovery lookup failed",
                    callback_mod => CBMod,
                    reason => Reason
                }),
                NewCBState
        end,

    State = State0#state{callback_state = CBState},

    %% Schedule next lookup
    Action = {state_timeout, State#state.polling_interval, lookup, []},
    {keep_state, State, [Action]};

enabled(EventType, EventContent, State) ->
    handle_common_event(EventType, EventContent, enabled, State).



%% -----------------------------------------------------------------------------
%% @doc In this state no lookups are performed. enable/0 moves the agent back
%% to `enabled' and triggers an immediate lookup, which also reschedules
%% polling. That requires a callback module; when the agent was started
%% without one, enable/0 replies `{error, missing_configuration}'.
%% lookup/0 is rejected with `{error, disabled}'.
%% @end
%% -----------------------------------------------------------------------------
disabled({call, From}, disable, _State) ->
    ok = gen_statem:reply(From, ok),
    keep_state_and_data;

disabled({call, From}, enable, #state{callback_mod = undefined}) ->
    ok = gen_statem:reply(From, {error, missing_configuration}),
    keep_state_and_data;

disabled({call, From}, enable, State) ->
    ok = gen_statem:reply(From, ok),
    %% The event must be `lookup': it is the only internal event `enabled'
    %% handles, and handling it is what schedules the next polling timeout.
    {next_state, enabled, State, [{next_event, internal, lookup}]};

disabled({call, From}, lookup, _State) ->
    ok = gen_statem:reply(From, {error, disabled}),
    keep_state_and_data;

disabled(EventType, EventContent, State) ->
    handle_common_event(EventType, EventContent, disabled, State).



%% =============================================================================
%% PRIVATE
%% =============================================================================




%% @private
%% Handles events common to every state: `status' calls are answered with the
%% current state name. Any other call is answered with
%% `{error, unknown_call}'. Every unhandled event is logged at debug level and
%% otherwise ignored.
handle_common_event({call, From}, status, StateName, _State) ->
    ok = gen_statem:reply(From, StateName),
    keep_state_and_data;

handle_common_event({call, From} = EventType, EventContent, _StateName, State) ->
    ok = gen_statem:reply(From, {error, unknown_call}),
    log_unhandled_event(EventType, EventContent, State);

handle_common_event(EventType, EventContent, _StateName, State) ->
    log_unhandled_event(EventType, EventContent, State).


%% @private
%% Logs an event no state handled and leaves state and data untouched.
log_unhandled_event(Type, Content, #state{callback_mod = CBMod}) ->
    ?LOG_DEBUG(#{
        description => "Unhandled event",
        callback_mod => CBMod,
        type => Type,
        content => Content
    }),
    keep_state_and_data.