%% =============================================================================
%% SPDX-FileCopyrightText: 2016 Christopher Meiklejohn
%% SPDX-FileCopyrightText: 2021 - 2025 Alejandro Ramallo
%% SPDX-License-Identifier: Apache-2.0
%% =============================================================================
-module(partisan_peer_service_server).
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").
-behaviour(acceptor).
-behaviour(gen_server).
-include("partisan.hrl").
-include("partisan_logger.hrl").
-include("partisan_peer_socket.hrl").
-record(state, {
socket :: partisan_peer_socket:t(),
peer_node :: node(),
channel :: partisan:channel(),
ref :: reference(),
ping_idle_timeout :: non_neg_integer(),
ping_tref :: optional(partisan_remote_ref:r()),
ping_retry :: optional(partisan_retry:t()),
ping_id :: optional(partisan:reference())
}).
-type state_t() :: #state{}.
%% Acceptor callbacks
-export([acceptor_init/3,
acceptor_continue/3,
acceptor_terminate/2]).
%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3]).
%% =============================================================================
%% ACCEPTOR CALLBACKS
%% =============================================================================
acceptor_init(_SockName, LSocket, []) ->
%% monitor listen socket to gracefully close when it closes
MRef = monitor(port, LSocket),
{ok, MRef}.
acceptor_continue(_PeerName, Socket0, MRef) ->
put({?MODULE, ingress_delay}, partisan_config:get(ingress_delay, 0)),
Socket = partisan_peer_socket:accept(Socket0),
send_message(Socket, {hello, partisan:node()}),
State0 = #state{socket = Socket, ref = MRef},
State = maybe_enable_ping(
State0,
partisan_config:get(connection_ping, #{})
),
gen_server:enter_loop(?MODULE, [], State).
acceptor_terminate(Reason, _) ->
%% Something went wrong. Either the acceptor_pool is terminating
%% or the accept failed.
exit(Reason).
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
init(_) ->
{stop, acceptor}.
handle_call(Req, _, State) ->
{stop, {bad_call, Req}, State}.
handle_cast(Req, State) ->
{stop, {bad_cast, Req}, State}.
handle_info({Tag, _RawSocket, Data}, #state{} = State)
when ?DATA_MSG(Tag) ->
Msg = binary_to_term(Data),
?LOG_TRACE("Received data from socket: ~p", [Msg]),
ok = maybe_delay(),
ok = reset_socket_opts(State),
handle_inbound(Msg, State);
handle_info({Tag, _RawSocket, Reason}, #state{} = State)
when ?ERROR_MSG(Tag) ->
?LOG_ERROR(#{
description => "Connection socket error, closing",
socket => State#state.socket,
reason => Reason
}),
{stop, Reason, State};
handle_info({Tag, _RawSocket}, State) when ?CLOSED_MSG(Tag) ->
?LOG_TRACE(
"Connection socket ~p has been remotely closed",
[State#state.socket]
),
{stop, normal, State};
handle_info({'DOWN', MRef, port, _, _}, #state{ref=MRef} = State) ->
%% Listen socket closed
{stop, normal, State};
handle_info(
{timeout, Ref, ping_idle_timeout}, #state{ping_tref = Ref} = State) ->
?LOG_INFO(#{
description => "Connection idle, sending ping",
peer_node => State#state.peer_node,
channel => State#state.channel,
attempt => partisan_retry:count(State#state.ping_retry)
}),
maybe_send_ping(State);
handle_info({timeout, Ref, ping_timeout}, #state{ping_tref = Ref} = State) ->
?LOG_INFO(#{
description => "Ping timeout, retrying",
attempts => partisan_retry:count(State#state.ping_retry)
}),
maybe_send_ping(State);
handle_info(_, State) ->
{noreply, State}.
terminate(_, State) ->
ok = partisan_peer_socket:close(State#state.socket),
ok.
-spec code_change(term() | {down, term()}, state_t(), term()) ->
{ok, state_t()}.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%% =============================================================================
%% PRIVATE
%% =============================================================================
handle_inbound({hello, Node, Channel}, #state{} = State0) ->
%% Get our tag, if set.
Tag = partisan_config:get(tag, undefined),
%% Store node and channel in the process dictionary.
put({?MODULE, peer}, Node),
put({?MODULE, channel}, Channel),
State = State0#state{peer_node = Node},
%% Connect the node with Distributed Erlang, just for now for
%% control messaging in the test suite execution.
case maybe_connect_disterl(Node) of
ok ->
%% Send our state to the remote service, in case they want
%% it to bootstrap.
Manager = partisan_peer_service:manager(),
{ok, LocalState} = Manager:get_local_state(),
send_message(State#state.socket, {state, Tag, LocalState}),
ok;
error ->
?LOG_INFO(#{description => "Node could not be connected."}),
send_message(State#state.socket, {hello, {error, pang}}),
ok
end,
{noreply, reset_ping(State)};
handle_inbound(#ping{from = Node} = Ping, #state{peer_node = Node} = State) ->
ok = send_pong(State, Ping),
{noreply, reset_ping(State)};
handle_inbound(#ping{} = Ping, #state{} = State) ->
?LOG_WARNING(#{
description => "Received invalid ping message",
message => Ping
}),
{noreply, State};
handle_inbound(
#pong{from = Node, id = Id, timestamp = Ts},
#state{peer_node = Node, ping_id = Id} = State) ->
ok = telemetry:execute(
[partisan, connection, server, hearbeat],
#{latency => erlang:system_time(millisecond) - Ts},
#{
node => partisan:node(),
channel => State#state.channel,
socket => State#state.socket,
peer_node => Node
}
),
{noreply, reset_ping(State)};
handle_inbound(#pong{} = Pong, #state{} = State) ->
{stop, {invalid_ping_response, Pong}, State};
handle_inbound(Message, State) ->
PeerNode = get({?MODULE, peer_node}),
Channel = get({?MODULE, channel}),
Manager = partisan_peer_service:manager(),
_ = Manager:receive_message(PeerNode, Channel, Message),
?LOG_TRACE("Dispatched ~p to manager.", [Message]),
{noreply, reset_ping(State)}.
%% @private
reset_socket_opts(State) ->
partisan_peer_socket:setopts(State#state.socket, [{active, once}]).
%% @private
maybe_delay() ->
case get({?MODULE, ingress_delay}) of
0 ->
ok;
Other ->
timer:sleep(Other)
end.
%% @private
send_message(Socket, Message) ->
Data = erlang:term_to_iovec(Message),
send_data(Socket, Data).
send_data(Socket, Data) ->
partisan_peer_socket:send(Socket, Data).
%% @private
maybe_connect_disterl(Node) ->
case partisan_config:get(connect_disterl, false) of
true ->
case net_adm:ping(Node) of
pong ->
ok;
pang ->
error
end;
false ->
ok
end.
%% =============================================================================
%% PRIVATE: KEEP ALIVE PING
%% =============================================================================
%% @private
maybe_enable_ping(State, #{enabled := true} = PingOpts) ->
IdleTimeout = maps:get(idle_timeout, PingOpts),
Timeout = maps:get(timeout, PingOpts),
Attempts = maps:get(max_attempts, PingOpts),
Retry = partisan_retry:init(
ping_timeout,
#{
deadline => 0, % disable, use max_retries only
interval => Timeout,
max_retries => Attempts,
backoff_enabled => false
}
),
State#state{
ping_idle_timeout = IdleTimeout,
ping_id = partisan:make_ref(),
ping_retry = Retry
};
maybe_enable_ping(#{enabled := false}, State) ->
State.
%% @private
reset_ping(State) ->
%% The client ping idle_timeout is the same as ours,
%% so we offset the server timeout to avoid synchronization.
reset_ping(State, trunc(State#state.ping_idle_timeout * 0.25)).
%% @private
reset_ping(#state{ping_retry = undefined} = State, _Offset) ->
State;
reset_ping(#state{ping_tref = undefined} = State, Offset) ->
Time = State#state.ping_idle_timeout + Offset,
Ref = erlang:start_timer(Time, self(), ping_idle_timeout),
State#state{ping_tref = Ref};
reset_ping(#state{} = State, Offset) ->
_ = erlang:cancel_timer(State#state.ping_tref),
{_, Retry} = partisan_retry:succeed(State#state.ping_retry),
Time = State#state.ping_idle_timeout + Offset,
Ref = erlang:start_timer(Time, self(), ping_idle_timeout),
State#state{
ping_retry = Retry,
ping_tref = Ref
}.
%% @private
maybe_send_ping(#state{ping_idle_timeout = undefined} = State) ->
{noreply, State};
maybe_send_ping(#state{} = State) ->
{Result, Retry} = partisan_retry:fail(State#state.ping_retry),
maybe_send_ping(Result, State#state{ping_retry = Retry}).
%% @private
maybe_send_ping(Limit, State)
when Limit == deadline orelse Limit == max_retries ->
{stop, {shutdown, ping_timeout}, State};
maybe_send_ping(_Time, #state{} = State0) ->
State = send_ping(State0),
{noreply, State}.
%% @private
send_ping(State) ->
Ref = partisan:make_ref(),
Ping = #ping{
from = partisan:node(),
id = Ref,
timestamp = erlang:system_time(millisecond)
},
ok = send_message(State#state.socket, Ping),
%% We schedule the next retry
Tref = partisan_retry:fire(State#state.ping_retry),
State#state{ping_id = Ref, ping_tref = Tref}.
%% @private
send_pong(State, #ping{id = Id, timestamp = Ts}) ->
Pong = #pong{
from = partisan:node(),
id = Id,
timestamp = Ts
},
send_message(State#state.socket, Pong).