%% -------------------------------------------------------------------
%%
%% Copyright (c) 2016 Christopher Meiklejohn.  All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License.  You may obtain
%% a copy of the License at
%%
%%   http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied.  See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(partisan_peer_service_server).
-author("Christopher S. Meiklejohn <christopher.meiklejohn@gmail.com>").
-include("partisan.hrl").
-include("partisan_logger.hrl").
-include("partisan_peer_socket.hrl").
-behaviour(acceptor).
-behaviour(gen_server).
%% acceptor api
-export([acceptor_init/3,
         acceptor_continue/3,
         acceptor_terminate/2]).
%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).
-record(state, {
    socket      ::  partisan_peer_socket:connection(),
    channel     ::  partisan:channel() | undefined,
    ref         ::  reference()
}).
-type state_t() :: #state{}.
acceptor_init(_SockName, LSocket, []) ->
    %% monitor listen socket to gracefully close when it closes
    MRef = monitor(port, LSocket),
    {ok, MRef}.
acceptor_continue(_PeerName, Socket0, MRef) ->
    put({?MODULE, ingress_delay}, partisan_config:get(ingress_delay, 0)),
    Socket = partisan_peer_socket:accept(Socket0),
    send_message(Socket, {hello, partisan:node()}),
    gen_server:enter_loop(?MODULE, [], #state{socket = Socket, ref = MRef}).
acceptor_terminate(Reason, _) ->
    %% Something went wrong. Either the acceptor_pool is terminating
    %% or the accept failed.
    exit(Reason).
%% =============================================================================
%% GEN_SERVER CALLBACKS
%% =============================================================================
init(_) ->
    {stop, acceptor}.
handle_call(Req, _, State) ->
    {stop, {bad_call, Req}, State}.
handle_cast(Req, State) ->
    {stop, {bad_cast, Req}, State}.
handle_info({Tag, _RawSocket, Data}, State=#state{socket = Socket})
when ?DATA_MSG(Tag) ->
    Decoded = binary_to_term(Data),
    ?LOG_TRACE("Received data from socket: ~p", [Decoded]),
    case get({?MODULE, ingress_delay}) of
        0 ->
            ok;
        Other ->
            timer:sleep(Other)
    end,
    handle_message(binary_to_term(Data), State),
    ok = partisan_peer_socket:setopts(Socket, [{active, once}]),
    {noreply, State};
handle_info({Tag, _RawSocket, Reason}, State=#state{socket=Socket})
when ?ERROR_MSG(Tag) ->
    ?LOG_ERROR(#{
        description => "Connection socket error, closing",
        socket => Socket,
        reason => Reason
    }),
    {stop, Reason, State};
handle_info({Tag, _RawSocket}, State) when ?CLOSED_MSG(Tag) ->
    ?LOG_TRACE(
        "Connection socket ~p has been remotely closed",
        [State#state.socket]
    ),
    {stop, normal, State};
handle_info({'DOWN', MRef, port, _, _}, #state{ref=MRef} = State) ->
    %% Listen socket closed
    {stop, normal, State};
handle_info(_, State) ->
    {noreply, State}.
terminate(_, State) ->
    ok = partisan_peer_socket:close(State#state.socket),
    ok.
-spec code_change(term() | {down, term()}, state_t(), term()) ->
    {ok, state_t()}.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.
%% =============================================================================
%% PRIVATE
%% =============================================================================
handle_message({hello, Node, Channel}, #state{socket = Socket}) ->
    %% Get our tag, if set.
    Tag = partisan_config:get(tag, undefined),
    %% Store node and channel in the process dictionary.
    put({?MODULE, peer}, Node),
    put({?MODULE, channel}, Channel),
    %% Connect the node with Distributed Erlang, just for now for
    %% control messaging in the test suite execution.
    case maybe_connect_disterl(Node) of
        ok ->
            %% Send our state to the remote service, in case they want
            %% it to bootstrap.
            Manager = partisan_peer_service:manager(),
            {ok, LocalState} = Manager:get_local_state(),
            send_message(Socket, {state, Tag, LocalState}),
            ok;
        error ->
            ?LOG_INFO(#{description => "Node could not be connected."}),
            send_message(Socket, {hello, {error, pang}}),
            ok
    end;
handle_message(Message, _State) ->
    Peer = get({?MODULE, peer}),
    Channel = get({?MODULE, channel}),
    ?LOG_TRACE("Received message from peer ~p: ~p", [Peer, Message]),
    Manager = partisan_peer_service:manager(),
    Manager:receive_message(Peer, Channel, Message),
    ?LOG_TRACE("Dispatched ~p to manager.", [Message]),
    ok.
%% @private
send_message(Socket, Message) ->
    EncodedMessage = erlang:term_to_binary(Message),
    partisan_peer_socket:send(Socket, EncodedMessage).
%% @private
maybe_connect_disterl(Node) ->
    case partisan_config:get(connect_disterl, false) of
        true ->
            case net_adm:ping(Node) of
                pong ->
                    ok;
                pang ->
                    error
            end;
        false ->
            ok
    end.