src/cets_call.erl

%% @doc Helper module for extended gen_server API.
-module(cets_call).

-export([long_call/2, long_call/3]).
-export([async_operation/2]).
-export([sync_operation/2]).
-export([send_leader_op/2]).
-export([wait_responses/2]).

-include_lib("kernel/include/logger.hrl").

-type request_id() :: cets:request_id().
%% Asynchronous request reference.

-type op() :: cets:op().
%% Operations are messages which could be buffered when a server is paused.
%% Operations are also broadcasted to the whole cluster.

-type server_ref() :: cets:server_ref().
%% Server name or pid.

-type long_msg() :: cets:long_msg().
%% Message type.

-type ok_or_error() :: ok | {error, Reason :: term()}.
%% Return result.

%% @doc Makes gen_server:call with better error reporting.
%%
%% It would log a warning if the call takes too long.
-spec long_call(server_ref(), long_msg()) -> term().
long_call(Server, Msg) ->
    long_call(Server, Msg, #{msg => Msg}).

%% @doc Makes gen_server:call with better error reporting.
-spec long_call(server_ref(), long_msg(), map()) -> term().
long_call(Server, Msg, Info) ->
    case where(Server) of
        Pid when is_pid(Pid) ->
            Info2 = Info#{server => Server, pid => Pid, node => node(Pid)},
            F = fun() -> gen_server:call(Pid, Msg, infinity) end,
            cets_long:run_tracked(Info2, F);
        undefined ->
            error({pid_not_found, Server})
    end.

%% @doc Contacts the local server to broadcast multinode operation.
%%
%% Returns immediately.
%% You can wait for response from all nodes by calling `wait_response/2'.
-spec async_operation(server_ref(), op()) -> request_id().
async_operation(Server, Op) ->
    gen_server:send_request(Server, {op, Op}).

%% @doc Contacts the local server to broadcast multinode operation.
%%
%% Blocks until the operation is applied on all nodes.
-spec sync_operation(server_ref(), op()) -> ok.
sync_operation(Server, Op) ->
    ok = gen_server:call(Server, {op, Op}, infinity).

-spec maybe_sync_operation(server_ref(), op()) -> ok_or_error().
maybe_sync_operation(Server, Op) ->
    gen_server:call(Server, {op, Op}, infinity).

-spec where(server_ref()) -> pid() | undefined.
where(Pid) when is_pid(Pid) -> Pid;
where(Name) when is_atom(Name) -> whereis(Name);
where({global, Name}) -> global:whereis_name(Name);
where({local, Name}) -> whereis(Name);
where({via, Module, Name}) -> Module:whereis_name(Name).

%% Wait around 15 seconds before giving up
%% (we don't count how much we spend calling the leader though)
%% If fails - this means there are some major issues
backoff_intervals() ->
    [10, 50, 100, 500, 1000, 5000, 5000].

%% @doc Sends all requests to a single node in the cluster.
-spec send_leader_op(server_ref(), op()) -> ok_or_error().
send_leader_op(Server, Op) ->
    send_leader_op(Server, Op, backoff_intervals()).

send_leader_op(Server, Op, Backoff) ->
    Leader = cets:get_leader(Server),
    Res = maybe_sync_operation(Leader, {leader_op, Op}),
    case Res of
        {error, {wrong_leader, ExpectedLeader}} ->
            ?LOG_WARNING(#{
                what => wrong_leader,
                server => Server,
                operation => Op,
                called_leader => Leader,
                expected_leader => ExpectedLeader
            }),
            %% This could happen if a new node joins the cluster.
            %% So, a simple retry should help.
            case Backoff of
                [Milliseconds | NextBackoff] ->
                    timer:sleep(Milliseconds),
                    send_leader_op(Server, Op, NextBackoff);
                [] ->
                    error(send_leader_op_failed)
            end;
        _ ->
            Res
    end.

%% @doc Waits for multiple responses at once.
-spec wait_responses([gen_server:request_id()], cets:response_timeout()) ->
    [cets:response_return()].
wait_responses([ReqId], Timeout) ->
    [gen_server:receive_response(ReqId, Timeout)];
wait_responses(ReqIds, Timeout) when is_integer(Timeout) ->
    Start = erlang:monotonic_time(millisecond),
    wait_responses(ReqIds, {abs, Start + Timeout});
wait_responses(ReqIds, {abs, _} = Timeout) ->
    [gen_server:receive_response(ReqId, Timeout) || ReqId <- ReqIds];
wait_responses(ReqIds, infinity) ->
    [gen_server:receive_response(ReqId, infinity) || ReqId <- ReqIds].