%% src/partisan_peer_socket.erl

%% -------------------------------------------------------------------
%%
%% Copyright (c) 2017 Christopher Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------

%% -----------------------------------------------------------------------------
%% @doc Wrapper that allows transparent usage of plain TCP or TLS socket
%% for peer connections.
%%
%% This module also implements the monotonic channel functionality.
%% @end
%% -----------------------------------------------------------------------------
-module(partisan_peer_socket).

%% State carried around for one wrapped peer connection.
-record(partisan_peer_socket, {
    %% The underlying socket handed to the transport/control modules.
    socket              :: gen_tcp:socket() | ssl:sslsocket() | socket:socket(),
    %% Module used for connect, send, recv and close on `socket'.
    transport           :: gen_tcp | ssl,
    %% Module used for setopts on `socket'.
    control             :: inet | ssl,
    %% When true, send/2 may skip a transmission (see
    %% monotonic_should_send/2); when false every send is transmitted.
    monotonic = false   :: boolean()
}).

%% A wrapped peer connection.
-type t()               :: #partisan_peer_socket{}.
%% Error reasons used in the send/2 and recv/2,3 specs.
-type reason()          :: closed | inet:posix().
%% Socket options; a map is converted with maps:to_list/1 before use.
-type options()         :: [gen_tcp:option()] | map().


-export_type([t/0]).


-export([accept/1]).
-export([close/1]).
-export([connect/3]).
-export([connect/4]).
-export([connect/5]).
-export([recv/2]).
-export([recv/3]).
-export([send/2]).
-export([setopts/2]).
-export([socket/1]).



%% =============================================================================
%% API
%% =============================================================================



%% -----------------------------------------------------------------------------
%% @doc Wraps an accepted TCP socket into a `t()' so that it can later be
%% used for sending, receiving and option control.
%%
%% When TLS is enabled the socket is first upgraded with `ssl:handshake/2',
%% using the options stored under the `tls_server_options' configuration
%% key, and the upgraded socket is the one that gets wrapped. A failed
%% handshake is not handled here: it crashes the caller with a `badmatch'.
%% When TLS is not enabled the TCP socket is wrapped untouched.
%% @end
%% -----------------------------------------------------------------------------
-spec accept(gen_tcp:socket()) -> t().

accept(TCPSocket) ->
    do_accept(tls_enabled(), TCPSocket).


%% @private
%% First argument is the TLS switch; anything other than `true' means plain
%% TCP.
do_accept(true, TCPSocket) ->
    ServerOpts = partisan_config:get(tls_server_options),
    %% See http://erlang.org/doc/man/ssl.html#ssl_accept-1: the socket has to
    %% be in {active, false} mode before the upgrade starts, otherwise
    %% whether the upgrade succeeds depends on timing.
    inet:setopts(TCPSocket, [{active, false}]),
    {ok, Upgraded} = ssl:handshake(TCPSocket, ServerOpts),
    %% Put the socket back into the {active, once} mode that is expected
    %% after accepting.
    ssl:setopts(Upgraded, [{active, once}]),
    #partisan_peer_socket{socket = Upgraded, transport = ssl, control = ssl};

do_accept(_, TCPSocket) ->
    #partisan_peer_socket{
        socket = TCPSocket,
        transport = gen_tcp,
        control = inet
    }.


%% -----------------------------------------------------------------------------
%% @doc Sends `Data' over the wrapped socket using the connection's
%% transport module.
%%
%% On a monotonic connection the transmission is conditional: the length of
%% the calling process' message queue and the `last_transmission_time' entry
%% of its process dictionary are passed to `monotonic_should_send/2'. If
%% that returns `false' nothing is sent and `ok' is returned; otherwise the
%% entry is refreshed with the current monotonic time and the data is sent.
%% @see gen_tcp:send/2
%% @see ssl:send/2
%% @end
%% -----------------------------------------------------------------------------
-spec send(t(), iodata()) -> ok | {error, reason()}.

send(
    #partisan_peer_socket{
        monotonic = false, socket = Socket, transport = Transport
    },
    Data
) ->
    send(Transport, Socket, Data);

send(
    #partisan_peer_socket{
        monotonic = true, socket = Socket, transport = Transport
    },
    Data
) ->
    %% How many messages are waiting in the caller's mailbox right now.
    {message_queue_len, Pending} = process_info(self(), message_queue_len),

    %% When this process last transmitted, as kept in its process dictionary.
    LastSent = get(last_transmission_time),

    case monotonic_should_send(Pending, LastSent) of
        true ->
            %% Remember this transmission before handing off the data.
            put(last_transmission_time, monotonic_now()),
            send(Transport, Socket, Data);
        false ->
            %% Skipped on purpose; the caller still sees success.
            ok
    end.


%% -----------------------------------------------------------------------------
%% @doc Receives `Length' bytes from the connection, waiting without a time
%% limit. Same as `recv(Conn, Length, infinity)'.
%% @see gen_tcp:recv/2
%% @see ssl:recv/2
%% @end
%% -----------------------------------------------------------------------------
-spec recv(t(), integer()) -> {ok, iodata()} | {error, reason()}.

recv(Conn, Length) ->
    Timeout = infinity,
    recv(Conn, Length, Timeout).


%% -----------------------------------------------------------------------------
%% @doc Receives `Length' bytes from the wrapped socket through the
%% connection's transport module, giving up after `Timeout'.
%% @see gen_tcp:recv/3
%% @see ssl:recv/3
%% @end
%% -----------------------------------------------------------------------------
-spec recv(t(), integer(), timeout()) ->
    {ok, iodata()} | {error, reason()}.

recv(#partisan_peer_socket{} = Conn, Length, Timeout) ->
    Mod = Conn#partisan_peer_socket.transport,
    Mod:recv(Conn#partisan_peer_socket.socket, Length, Timeout).


%% -----------------------------------------------------------------------------
%% @doc Applies `Options' to the wrapped socket through the connection's
%% control module. `Options' may be a list or a map; a map is turned into a
%% list with `maps:to_list/1' first.
%% @see inet:setopts/2
%% @see ssl:setopts/2
%% @end
%% -----------------------------------------------------------------------------
-spec setopts(t(), options()) -> ok | {error, inet:posix()}.

setopts(#partisan_peer_socket{} = Conn, Options) ->
    OptList =
        case is_map(Options) of
            true -> maps:to_list(Options);
            false -> Options
        end,
    Mod = Conn#partisan_peer_socket.control,
    Mod:setopts(Conn#partisan_peer_socket.socket, OptList).


%% -----------------------------------------------------------------------------
%% @doc Closes the wrapped socket through the connection's transport module.
%% @see gen_tcp:close/1
%% @see ssl:close/1
%% @end
%% -----------------------------------------------------------------------------
-spec close(t()) -> ok.

close(#partisan_peer_socket{} = Conn) ->
    Mod = Conn#partisan_peer_socket.transport,
    Mod:close(Conn#partisan_peer_socket.socket).


%% -----------------------------------------------------------------------------
%% @doc Connects to `Address':`Port' with the given socket options and no
%% connect timeout. Same as `connect(Address, Port, Options, infinity)'.
%% @see gen_tcp:connect/3
%% @see ssl:connect/3
%% @end
%% -----------------------------------------------------------------------------
-spec connect(
    inet:socket_address() | inet:hostname(), inet:port_number(), options()) ->
    {ok, t()} | {error, inet:posix()}.

connect(Address, Port, Options) ->
    Timeout = infinity,
    connect(Address, Port, Options, Timeout).


%% -----------------------------------------------------------------------------
%% @doc Connects to `Address':`Port' with the given socket options and
%% connect timeout, using no Partisan-specific options. Same as
%% `connect(Address, Port, Options, Timeout, #{})'.
%% @end
%% -----------------------------------------------------------------------------
-spec connect(
    inet:socket_address() | inet:hostname(),
    inet:port_number(),
    options(),
    timeout()) ->
    {ok, t()} | {error, inet:posix()}.

connect(Address, Port, Options, Timeout) ->
    NoPartisanOptions = #{},
    connect(Address, Port, Options, Timeout, NoPartisanOptions).


%% -----------------------------------------------------------------------------
%% @doc Connects to `Address':`Port' and returns the wrapped connection.
%%
%% `Options' (a list or a map) are the socket options; `{nodelay, true}' is
%% always appended to them. When TLS is enabled the connection is made with
%% `ssl' and the value of the `tls_client_options' configuration key is
%% appended as well; otherwise the connection is made with `gen_tcp'.
%%
%% `PartisanOptions' may be a map or a list accepted by `maps:from_list/1'.
%% Its `monotonic' key (default `false') decides whether the resulting
%% connection uses monotonic sends.
%% @end
%% -----------------------------------------------------------------------------
-spec connect(
    inet:socket_address() | inet:hostname(),
    inet:port_number(),
    options(),
    timeout(),
    map() | list()) -> {ok, t()} | {error, inet:posix()}.

connect(Address, Port, Options, Timeout, PartisanOptions)
when is_list(PartisanOptions) ->
    AsMap = maps:from_list(PartisanOptions),
    connect(Address, Port, Options, Timeout, AsMap);

connect(Address, Port, Options0, Timeout, PartisanOptions)
when is_map(PartisanOptions) ->
    BaseOpts = connection_options(Options0),

    %% Pick the transport module, the control module and the final option
    %% list in one go; anything other than `true' means plain TCP.
    {Transport, Control, ConnectOpts} =
        case tls_enabled() of
            true ->
                {ssl, ssl, BaseOpts ++ partisan_config:get(tls_client_options)};
            _ ->
                {gen_tcp, inet, BaseOpts}
        end,

    do_connect(
        Address, Port, ConnectOpts, Timeout, Transport, Control, PartisanOptions
    ).


%% -----------------------------------------------------------------------------
%% @doc Extracts the underlying socket stored in the connection wrapper.
%% @end
%% -----------------------------------------------------------------------------
-spec socket(t()) -> gen_tcp:socket() | ssl:sslsocket().
socket(Conn) ->
    Wrapped = Conn#partisan_peer_socket.socket,
    Wrapped.



%% =============================================================================
%% PRIVATE
%% =============================================================================



%% @private
%% Connects with `Transport:connect/4' and, on success, wraps the socket
%% together with the transport/control modules and the `monotonic' flag read
%% from `Opts' (default `false'). Any other connect result is returned
%% unchanged.
do_connect(Address, Port, ConnectOpts, Timeout, Transport, Control, Opts) ->
    %% Read the flag before connecting so that a bad `Opts' fails early.
    IsMonotonic = maps:get(monotonic, Opts, false),
    Result = Transport:connect(Address, Port, ConnectOpts, Timeout),
    wrap_connection(Result, Transport, Control, IsMonotonic).


%% @private
%% Turns a successful connect result into a wrapped connection and passes
%% everything else through.
wrap_connection({ok, Socket}, Transport, Control, IsMonotonic) ->
    {ok, #partisan_peer_socket{
        socket = Socket,
        transport = Transport,
        control = Control,
        monotonic = IsMonotonic
    }};

wrap_connection(Error, _Transport, _Control, _IsMonotonic) ->
    Error.


%% @private
%% Normalises the user supplied socket options to a list (a map is converted
%% with `maps:to_list/1') and appends `{nodelay, true}' at the end.
connection_options(Options) when is_list(Options) ->
    Forced = [{nodelay, true}],
    Options ++ Forced;

connection_options(Options) when is_map(Options) ->
    connection_options(maps:to_list(Options)).


%% @private
%% Returns the value of the `tls' configuration key. Callers in this module
%% treat `true' as "TLS on" and any other value as "TLS off".
tls_enabled() ->
    Setting = partisan_config:get(tls),
    Setting.


%% @private
%% Current Erlang monotonic time, expressed in milliseconds.
monotonic_now() ->
    Unit = millisecond,
    erlang:monotonic_time(Unit).

%% @private
%% Hands `Payload' to the given transport module for transmission on
%% `Socket' and returns whatever that module's send/2 returns.
send(Mod, Socket, Payload) ->
    Mod:send(Socket, Payload).


%% Determine if we should transmit:
%%
%% If there's another message in the queue, we can skip
%% sending this message.  However, if the arrival rate of
%% messages is too high, we risk starvation where
%% we may never send.  Therefore, we must force a transmission
%% after a given period with no transmissions.
%%
%% `MessageQueueLen' is the caller's current message queue length.
%% `LastTransmissionTime' is the `monotonic_now/0' value recorded at the
%% previous transmission, or `undefined' when no transmission has been
%% recorded yet (send/2 reads it straight from the process dictionary, which
%% yields `undefined' for a key that was never set).
%%
%% Returns `true' when the message must be sent and `false' when it may be
%% skipped. The forced-transmission period is the `send_window'
%% configuration value (default 1000), compared against milliseconds.
%%
%% @private
monotonic_should_send(MessageQueueLen, _LastTransmissionTime)
when MessageQueueLen =< 0 ->
    %% No messages in queue; transmit.
    true;

monotonic_should_send(_MessageQueueLen, undefined) ->
    %% Nothing has been transmitted by this process so far, so there is no
    %% earlier transmission to rely on: transmit. Without this clause the
    %% subtraction below would fail with badarith on the very first send.
    true;

monotonic_should_send(_MessageQueueLen, LastTransmissionTime) ->
    %% Messages in queue; conditional send.
    NowTime = monotonic_now(),

    Diff = abs(NowTime - LastTransmissionTime),

    SendWindow = partisan_config:get(send_window, 1000),

    %% Only force a transmission once the window has fully elapsed.
    Diff > SendWindow.