src/support/z_mqtt.erl

%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2013-2018 Marc Worrell

%% @doc Interface to MQTT pubsub functionality
%% @end

%% Copyright 2013-2018 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(z_mqtt).

-export([
    publish/3,
    publish/4,
    publish/2,

    subscribe/2,
    subscribe/3,
    subscribe/4,
    subscribe/5,

    unsubscribe/2,
    unsubscribe/3,

    call/2,
    call/3,

    temp_response_topic/1,
    await_response/2,
    await_response/3,

    map_topic/2,
    map_topic_filter/2,

    origin_topic/1,
    flatten_topic/1,

    is_client_alive/1,
    whereis_client/1
]).

%% A topic as understood by mqtt_sessions.
-type topic() :: mqtt_sessions:topic().
%% Anything map_topic_filter/2 can turn into a topic filter: a plain topic,
%% a resource id (mapped to the resource event topic) or an `{object, Props}' /
%% `{subject, Props}' tuple (mapped to an edge event topic, where Props is a
%% property list from which the `id' and `predicate' keys are read).
-type topic_any() :: mqtt_sessions:topic()
                   | m_rsc:resource_id()
                   | {object, list()}
                   | {subject, list()}.
%% Options for publish/4; a missing `qos' defaults to 0, a missing `retain' to false.
-type publish_options() :: #{ retain => boolean(), qos => 0 | 1 | 2 }.
%% Subscription callback handed to mqtt_sessions:subscribe/6: a pid or an
%% `{Module, Function, Args}' tuple.
-type callback() :: pid() | {module(), atom(), list()}.

-export_type([
    topic/0,
    topic_any/0,
    callback/0,
    publish_options/0
]).

-include_lib("zotonic.hrl").


%% Timeout that call/2 passes to await_response/3 while waiting for a reply.
%% NOTE(review): presumably milliseconds (60 seconds) - confirm against
%% mqtt_sessions:await_response/3.
-define(MQTT_CALL_TIMEOUT, 60000).


%% @doc Publish a payload on a topic using the default options: QoS 0 and
%% not retained.
-spec publish( topic(), term(), z:context()) -> ok | {error, term()}.
publish(Topic, Payload, Context) ->
    DefaultOptions = #{ retain => false, qos => 0 },
    publish(Topic, Payload, DefaultOptions, Context).

%% @doc Publish a payload on a topic. Only the `qos' and `retain' keys of the
%% options are used; they default to 0 and false when absent.
-spec publish( topic(), term(), publish_options(), z:context()) -> ok | {error, term()}.
publish(Topic, Payload, Options, Context) ->
    QoS = maps:get(qos, Options, 0),
    IsRetained = maps:get(retain, Options, false),
    Packet = #{
        type => publish,
        topic => Topic,
        payload => Payload,
        qos => QoS,
        retain => IsRetained
    },
    publish(Packet, Context).


%% @doc Publish a complete MQTT publish packet. The packet's topic is first
%% mapped with map_topic/2; if the mapping fails that error is returned and
%% nothing is published.
-spec publish( mqtt_packet_map:mqtt_packet(), z:context()) -> ok | {error, term()}.
publish(#{ type := publish, topic := Topic } = Msg, Context) ->
    case map_topic(Topic, Context) of
        {error, _} = Error ->
            Error;
        {ok, MappedTopic} ->
            Site = z_context:site(Context),
            % The topic key is known to exist (matched in the head), so update it.
            mqtt_sessions:publish(Site, Msg#{ topic := MappedTopic }, Context)
    end.

%% @doc Publish a message and wait for the reply. A temporary response topic is
%% requested and set as the `response_topic' property of the message, after
%% which the message is published and the reply is awaited for at most
%% ?MQTT_CALL_TIMEOUT. Returns the `payload' of the reply message, or
%% `undefined' if the reply has no payload. Any error from requesting the
%% response topic, publishing or waiting is returned as `{error, Reason}'.
-spec call( mqtt_packet_map:mqtt_packet(), z:context() ) -> {ok, term()} | {error, term()}.
call( #{ type := publish } = Msg, Context ) ->
    % Return the error instead of crashing with a badmatch when no response
    % topic could be created; the spec promises an error tuple.
    case temp_response_topic(Context) of
        {ok, RespTopic} ->
            Props = maps:get(properties, Msg, #{}),
            Msg1 = Msg#{
                properties => Props#{ response_topic => RespTopic }
            },
            publish_and_await(Msg1, RespTopic, Context);
        {error, _} = Error ->
            Error
    end.

%% Publish the request and wait for the reply on the given response topic.
publish_and_await(Msg, RespTopic, Context) ->
    case publish(Msg, Context) of
        ok ->
            await_reply_payload(RespTopic, Context);
        {error, _} = Error ->
            Error
    end.

%% Wait for the reply message and extract its payload.
await_reply_payload(RespTopic, Context) ->
    case await_response(RespTopic, ?MQTT_CALL_TIMEOUT, Context) of
        {ok, #{ message := ReplyMsg } } ->
            {ok, maps:get(payload, ReplyMsg, undefined)};
        {error, _} = Error ->
            Error
    end.

%% @doc Publish a payload on a topic (QoS 0, not retained) and wait for the
%% reply, see call/2.
-spec call( topic(), term(), z:context() ) -> {ok, term()} | {error, term()}.
call( Topic, Payload, Context ) ->
    Request = #{
        type => publish,
        qos => 0,
        retain => false,
        topic => Topic,
        payload => Payload
    },
    call(Request, Context).


%% @doc Subscribe the calling process to a topic filter with QoS 0. The calling
%% process is used both as the callback and as the owner of the subscription.
-spec subscribe( topic(), z:context() ) -> ok | {error, term()}.
subscribe(TopicFilter, Context) ->
    Self = self(),
    subscribe(TopicFilter, Self, Self, #{ qos => 0 }, Context).

%% @doc Subscribe a process to a topic filter with QoS 0. The process is used
%% both as the callback and as the owner of the subscription.
-spec subscribe( topic(), pid(), z:context() ) -> ok | {error, term()}.
subscribe(TopicFilter, Pid, Context) when is_pid(Pid) ->
    DefaultOptions = #{ qos => 0 },
    subscribe(TopicFilter, Pid, Pid, DefaultOptions, Context).


%% @doc Subscribe a callback to a topic filter with QoS 0, on behalf of the
%% given owner process.
-spec subscribe( topic(), callback(), pid(), z:context() ) -> ok | {error, term()}.
subscribe(TopicFilter, Callback, OwnerPid, Context) when is_pid(OwnerPid) ->
    DefaultOptions = #{ qos => 0 },
    subscribe(TopicFilter, Callback, OwnerPid, DefaultOptions, Context).

%% @doc Subscribe a callback to a topic filter with the given subscriber
%% options, on behalf of the given owner process. The topic filter is first
%% mapped with map_topic_filter/2; a mapping error is returned unchanged.
-spec subscribe( topic(), callback(), pid(), mqtt_sessions:subscriber_options(), z:context() ) -> ok | {error, term()}.
subscribe(TopicFilter, Callback, OwnerPid, Options, Context) when is_pid(OwnerPid) ->
    case map_topic_filter(TopicFilter, Context) of
        {error, _} = Error ->
            Error;
        {ok, MappedFilter} ->
            Site = z_context:site(Context),
            mqtt_sessions:subscribe(Site, MappedFilter, Callback, OwnerPid, Options, Context)
    end.


%% @doc Remove the subscription on a topic filter that is owned by the calling
%% process.
-spec unsubscribe( topic(), z:context() ) -> ok | {error, term()}.
unsubscribe(TopicFilter, Context) ->
    Self = self(),
    unsubscribe(TopicFilter, Self, Context).

%% @doc Remove the subscription on a topic filter that is owned by the given
%% process. The topic filter is first mapped with map_topic_filter/2; a mapping
%% error is returned unchanged.
-spec unsubscribe( topic(), pid(), z:context() ) -> ok | {error, term()}.
unsubscribe(TopicFilter, OwnerPid, Context) when is_pid(OwnerPid) ->
    case map_topic_filter(TopicFilter, Context) of
        {error, _} = Error ->
            Error;
        {ok, MappedFilter} ->
            Site = z_context:site(Context),
            mqtt_sessions:unsubscribe(Site, MappedFilter, OwnerPid)
    end.


%% @doc Request a temporary response topic from mqtt_sessions for the site of
%% the context. The context is pruned with z_context:prune_for_async/1 before
%% it is handed over.
-spec temp_response_topic( z:context() ) -> {ok, mqtt_sessions:topic()} | {error, term()}.
temp_response_topic(Context) ->
    Site = z_context:site(Context),
    AsyncContext = z_context:prune_for_async(Context),
    mqtt_sessions:temp_response_topic(Site, AsyncContext).

%% @doc Wait for a message on a response topic. The context is only used to
%% select the site; the timeout is the one applied by
%% mqtt_sessions:await_response/2.
-spec await_response( mqtt_sessions:topic(), z:context() ) -> {ok, mqtt_packet_map:mqtt_packet()} | {error, timeout}.
await_response( Topic, Context ) ->
    Site = z_context:site(Context),
    mqtt_sessions:await_response(Site, Topic).

%% @doc Wait for a message on a response topic, giving up after the given
%% timeout. The context is only used to select the site.
-spec await_response( mqtt_sessions:topic(), pos_integer(), z:context() ) -> {ok, mqtt_packet_map:mqtt_packet()} | {error, timeout}.
await_response( Topic, Timeout, Context ) ->
    Site = z_context:site(Context),
    mqtt_sessions:await_response(Site, Topic, Timeout).



%% @doc Map a topic to the topic used for publishing.
%%
%% <ul>
%% <li>`undefined', the empty binary and the empty list give `{error, no_topic}'.</li>
%% <li>A binary is split on "/" and mapped as a list of topic levels.</li>
%% <li>A leading `~client' level is replaced with the client topic of the
%%     context; `{error, no_client}' if the context has no client topic.</li>
%% <li>A leading `~user' level is replaced with `user/Id' for the user of the
%%     context, or with `user/anonymous' if there is no user.</li>
%% <li>Integers and tuples are mapped with map_topic_filter/2.</li>
%% <li>Any other atom is assumed to be the unique name of a resource, it is
%%     looked up with m_rsc:rid/2 and the result is mapped again.</li>
%% <li>All other topics are returned as-is.</li>
%% </ul>
-spec map_topic(Topic, Context) -> {ok, mqtt_sessions:topic()} | {error, no_topic | no_client | term()} when
    Topic :: topic_any() | undefined | atom(),
    Context :: z:context().
map_topic(undefined, _Context) ->
    {error, no_topic};
map_topic(<<>>, _Context) ->
    {error, no_topic};
map_topic([], _Context) ->
    % An empty list of levels is as much "no topic" as an empty binary; this
    % also matches what map_topic_filter/2 returns for [].
    {error, no_topic};
map_topic(Topic, Context) when is_binary(Topic) ->
    map_topic(binary:split(Topic, <<"/">>, [global]), Context);
map_topic([ <<"~client">> | _ ], #context{ client_topic = undefined }) ->
    {error, no_client};
map_topic([ <<"~client">> | T ], #context{ client_topic = Route }) ->
    {ok, Route ++ T};
map_topic([ <<"~user">> | T ], #context{ user_id = UserId }) when is_integer(UserId) ->
    {ok, [ <<"user">>, integer_to_binary(UserId) | T ]};
map_topic([ <<"~user">> | T ], #context{ user_id = undefined }) ->
    {ok, [ <<"user">>, <<"anonymous">> | T ]};
map_topic(Topic, Context) when is_tuple(Topic); is_integer(Topic) ->
    map_topic_filter(Topic, Context);
map_topic(Topic, Context) when is_atom(Topic) ->
    % Assume this is the unique name of a resource. An unknown name gives
    % 'undefined', which is mapped to {error, no_topic} by the first clause.
    map_topic(m_rsc:rid(Topic, Context), Context);
map_topic(Topic, _Context) ->
    {ok, Topic}.


%% @doc Map a subscription topic to a topic filter.
%%
%% <ul>
%% <li>`undefined', the empty list and the empty binary give `{error, no_topic}'.</li>
%% <li>Lists and binaries are mapped with map_topic/2.</li>
%% <li>A resource id is mapped to `model/rsc/event/Id/+'.</li>
%% <li>Any other atom is looked up with m_rsc:rid/2 and the result is mapped again.</li>
%% <li>`{object, Props}' and `{subject, Props}' are mapped to the edge event
%%     topic `model/edge/event/Id/o/Predicate' or `model/edge/event/Id/s/Predicate',
%%     using the `id' and `predicate' properties.</li>
%% </ul>
-spec map_topic_filter(TopicAny, Context) -> {ok, topic()} | {error, no_topic | term()} when
    TopicAny ::  topic_any() | undefined | atom(),
    Context :: z:context().
map_topic_filter(undefined, _Context) ->
    {error, no_topic};
map_topic_filter([], _Context) ->
    {error, no_topic};
map_topic_filter(Topic, Context) when is_list(Topic); is_binary(Topic) ->
    % map_topic/2 rejects the empty binary with {error, no_topic} and splits
    % all other binaries on "/" before mapping them. Splitting here first
    % would turn <<>> into [<<>>], which was then accepted as a topic filter.
    map_topic(Topic, Context);
map_topic_filter(RscId, _Context) when is_integer(RscId) ->
    {ok, [ <<"model">>, <<"rsc">>, <<"event">>, z_convert:to_binary(RscId), <<"+">> ]};
map_topic_filter(RscId, Context) when is_atom(RscId) ->
    % Unique name of a resource; an unknown name gives 'undefined', which is
    % mapped to {error, no_topic} by the first clause.
    map_topic_filter(m_rsc:rid(RscId, Context), Context);
map_topic_filter({object, Props}, Context) when is_list(Props) ->
    map_topic_edge(<<"o">>, Props, Context);
map_topic_filter({subject, Props}, Context) when is_list(Props) ->
    map_topic_edge(<<"s">>, Props, Context).

%% Build the edge event topic filter `model/edge/event/Id/ObjSub/Predicate'
%% from the `id' and `predicate' properties. The predicate is resolved with
%% to_predicate_name/2, whose error is returned unchanged.
%% NOTE(review): a missing `id' is not rejected, the result of
%% z_convert:to_binary(undefined) is then used as the topic level - confirm
%% that this is intended.
map_topic_edge(ObjSub, Props, Context) ->
    Predicate = proplists:get_value(predicate, Props),
    case to_predicate_name(Predicate, Context) of
        {error, _} = Error ->
            Error;
        {ok, PredName} ->
            IdLevel = z_convert:to_binary(proplists:get_value(id, Props)),
            {ok, [ <<"model">>, <<"edge">>, <<"event">>, IdLevel, ObjSub, PredName ]}
    end.

%% Map a predicate to the topic level used in edge event topics. A missing,
%% empty or "*" predicate becomes the single level wildcard "+". Everything
%% else is looked up as a resource and mapped to its predicate name.
-spec to_predicate_name(Name, Context) -> {ok, binary()} | {error, term()}
    when Name :: binary() | string() | undefined | m_rsc:resource(),
         Context :: z:context().
to_predicate_name(Name, _Context)
    when Name =:= undefined;
         Name =:= <<"*">>;
         Name =:= "*";
         Name =:= '*';
         Name =:= <<>> ->
    {ok, <<"+">>};
to_predicate_name(Id, Context) ->
    predicate_name_of(m_rsc:rid(Id, Context), Id, Context).

%% Find the name of the predicate with the given resource id; the original
%% predicate is only used for the error report.
predicate_name_of(undefined, Id, _Context) ->
    {error, {unknown_predicate, Id}};
predicate_name_of(RId, _Id, Context) ->
    case m_predicate:id_to_name(RId, Context) of
        {error, _} = Error ->
            Error;
        {ok, Name} ->
            {ok, z_convert:to_binary(Name)}
    end.


%% @doc Ensure that the topic is prefixed with "bridge/origin/". A topic that
%% already has the prefix is returned unchanged. A binary without the prefix
%% is returned as a binary; a list without the prefix is prefixed and then
%% flattened to a binary with flatten_topic/1.
-spec origin_topic(Topic) -> OriginTopic
    when Topic :: mqtt_sessions:topic(),
         OriginTopic :: mqtt_sessions:topic().
origin_topic(Topic) when is_binary(Topic) ->
    case Topic of
        <<"bridge/origin/", _/binary>> ->
            Topic;
        _ ->
            <<"bridge/origin/", Topic/binary>>
    end;
origin_topic(Topic) when is_list(Topic) ->
    case Topic of
        [ <<"bridge">>, <<"origin">> | _ ] ->
            Topic;
        _ ->
            flatten_topic([ <<"bridge">>, <<"origin">> | Topic ])
    end.

%% @doc Flatten a topic to a single binary. A binary is returned as-is. For a
%% list every level is converted to a binary with z_convert:to_binary/1, after
%% which the levels are joined with "/" using z_utils:join_defined/2.
-spec flatten_topic(Topic) -> binary()
    when Topic :: binary() | list().
flatten_topic(Topic) when is_binary(Topic) ->
    Topic;
flatten_topic(Topic) when is_list(Topic) ->
    Levels = [ z_convert:to_binary(Level) || Level <- Topic ],
    iolist_to_binary( z_utils:join_defined($/, Levels) ).


%% @doc Check if the MQTT client for this request context is alive. Returns
%% false if the context has no client id or if no session is registered for
%% the client id.
-spec is_client_alive( z:context() ) -> boolean().
is_client_alive(Context) ->
    case whereis_client(Context) of
        {ok, Pid} when is_pid(Pid) ->
            erlang:is_process_alive(Pid);
        {error, _} ->
            false
    end.

%% @doc Find the Pid of the MQTT client process for this request context. The
%% client id of the context is looked up in the session registry of the
%% context's site; an error from either step is returned unchanged.
-spec whereis_client( z:context() ) -> {ok, pid()} | {error, term()}.
whereis_client(Context) ->
    case z_context:client_id(Context) of
        {ok, ClientId} ->
            Site = z_context:site(Context),
            mqtt_sessions_registry:find_session(Site, ClientId);
        {error, _} = Error ->
            Error
    end.