src/mqtt_sessions_incoming.erl

%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2018 Marc Worrell

%% Copyright 2018 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(mqtt_sessions_incoming).

-export([
    incoming_connect/3,
    send_connack_error/3
]).


-include("../include/mqtt_sessions.hrl").
-include_lib("mqtt_packet_map/include/mqtt_packet_map.hrl").


%% @doc Handle an incoming connect packet; the packet MUST be of type `connect'.
%% A connect with an empty client id always starts a new session. A connect
%% with a client id is forwarded to the session the registry knows for that id
%% in the pool; if the registry reports `notfound' a new session is started.
-spec incoming_connect( Pool :: atom(), mqtt_packet_map:mqtt_packet(), mqtt_sessions:msg_options() ) ->
        {ok, mqtt_sessions:session_ref()}.
incoming_connect(Pool, #{ type := connect, client_id := <<>> } = ConnectMsg, MsgOptions) ->
    % No client id given, so there is nothing to look up in the registry.
    start_session(Pool, ConnectMsg, MsgOptions);
incoming_connect(Pool, #{ type := connect, client_id := ClientId } = ConnectMsg, MsgOptions) ->
    % Only the two registry results below are handled, any other result raises
    % a case_clause error. A branch that answered other `{error, _}' results
    % with send_connack_error/3 is currently disabled.
    case mqtt_sessions_registry:find_session(Pool, ClientId) of
        {error, notfound} ->
            start_session(Pool, ConnectMsg, MsgOptions);
        {ok, Session} ->
            ok = mqtt_sessions_process:incoming_connect(Session, ConnectMsg, MsgOptions),
            {ok, Session}
    end.

%% @doc Send a connack packet carrying ReasonCode to the transport given in
%% Options, followed by a disconnect request. Meant for the case where the
%% session pool returned an error when trying to map the client-id.
%%
%% The packet is encoded for the protocol version of the connect message. The
%% `transport' option is either a pid, which receives
%% `{mqtt_transport, self(), Data}' messages, or a function, which is called
%% with the data. Without a transport nothing is sent and `ok' is returned;
%% otherwise the result of the final (disconnect) send is returned.
%% Crashes with a badmatch if the connack packet cannot be encoded.
send_connack_error(ReasonCode, #{ protocol_version := PV }, Options) ->
    AckMsg = #{
        type => connack,
        reason_code => ReasonCode
    },
    % encode/2 returns a tagged result; unwrap it so the transport receives the
    % encoded packet itself and not the `{ok, Packet}' tuple.
    {ok, AckMsgB} = mqtt_packet_map:encode(PV, AckMsg),
    case maps:get(transport, Options, undefined) of
        undefined ->
            ok;
        Pid when is_pid(Pid) ->
            Pid ! {mqtt_transport, self(), AckMsgB},
            Pid ! {mqtt_transport, self(), disconnect};
        F when is_function(F) ->
            F(AckMsgB),
            F(disconnect)
    end.

%% @doc Start a new session process in the pool and hand it the connect packet.
%% Only the `peer_ip' and `context_prefs' message options are passed on as
%% session options, defaulting to `undefined' and an empty map respectively.
%% The client id returned by the supervisor is not used here.
-spec start_session( atom(), mqtt_packet_map:mqtt_packet(), mqtt_sessions:msg_options() ) -> {ok, pid()}.
start_session(Pool, #{ client_id := ClientId } = ConnectMsg, MsgOptions) ->
    PeerIp = maps:get(peer_ip, MsgOptions, undefined),
    ContextPrefs = maps:get(context_prefs, MsgOptions, #{}),
    {ok, {SessionPid, _AssignedClientId}} = mqtt_sessions_process_sup:new_session(
        Pool,
        ClientId,
        #{ peer_ip => PeerIp, context_prefs => ContextPrefs }
    ),
    ok = mqtt_sessions_process:incoming_connect(SessionPid, ConnectMsg, MsgOptions),
    {ok, SessionPid}.