%%%-------------------------------------------------------------------
%%% @author Niclas Axelsson <niclas@burbas.se>
%%% @doc
%%% Pubsub system for Nova, built on top of OTP's pg (process groups) module.
%%%
%%% The pubsub subsystem is started together with Nova and does not need any
%%% additional configuration. It is backed by the pg module. It provides a
%%% simple way of distributing messages to a large set of receivers and
%%% exposes a small set of functions for doing so.
%%%
%%%
%%% A simple example of how to use pubsub in a ping/pong inspired game engine:
%%%
%%% -module(test_module).
%%% -export([player1/0,
%%% player2/0,
%%% start_game/0]).
%%%
%%% player1() ->
%%%     spawn(fun() ->
%%%                   nova_pubsub:join(game_of_pong),
%%%                   game_loop(1, "pong", "ping")
%%%           end).
%%%
%%% player2() ->
%%%     spawn(fun() ->
%%%                   nova_pubsub:join(game_of_pong),
%%%                   game_loop(2, "ping", "pong")
%%%           end).
%%%
%%% %% Broadcast messages arrive wrapped in a #nova_pubsub{} record (see
%%% %% nova_pubsub.hrl); the text sent by the other player is its payload.
%%% game_loop(Player, ExpectedMessage, Smash) ->
%%%     receive
%%%         #nova_pubsub{payload = ExpectedMessage} ->
%%%             io:format("Player ~p received ~s and returning ~s~n", [Player, ExpectedMessage, Smash]),
%%%             nova_pubsub:broadcast(game_of_pong, "match1", Smash),
%%%             game_loop(Player, ExpectedMessage, Smash);
%%%         _ ->
%%%             game_loop(Player, ExpectedMessage, Smash)
%%%     end.
%%%
%%% @end
%%% Created : 8 Apr 2022 by Niclas Axelsson <niclas@burbas.se>
%%%-------------------------------------------------------------------
-module(nova_pubsub).
%% Public API: scope start-up, channel membership (join/leave), message
%% distribution (broadcast/local_broadcast) and membership queries.
-export([
start/0,
join/1,
join/2,
leave/1,
leave/2,
broadcast/3,
local_broadcast/3,
get_members/1,
get_local_members/1
]).
%% Name of the pg scope passed to every pg call made by this module.
-define(SCOPE, nova_scope).
%% Header expected to provide the #nova_pubsub{} record that is used as the
%% message envelope (fields used here: channel, sender, topic, payload).
-include("../include/nova_pubsub.hrl").
%%--------------------------------------------------------------------
%% @doc
%% Boots the pg scope that backs the pubsub subsystem. Intended to be
%% called by Nova's internal supervisor only. The value returned by
%% pg:start/1 is discarded and ok is returned.
%% @hidden
%% @end
%%--------------------------------------------------------------------
-spec start() -> ok.
start() ->
    %% The start result is intentionally not inspected.
    _StartResult = pg:start(?SCOPE),
    ok.
%%--------------------------------------------------------------------
%% @doc
%% Adds the calling process to Channel. Shorthand for join/2 with
%% self() as the process. Returns ok.
%% @end
%%--------------------------------------------------------------------
-spec join(Channel :: atom()) -> ok.
join(Channel) ->
    Caller = self(),
    join(Channel, Caller).
%%--------------------------------------------------------------------
%% @doc
%% Removes the calling process from Channel. Shorthand for leave/2 with
%% self() as the process. Returns ok on success and not_joined if the
%% calling process was not a member of the channel.
%% @end
%%--------------------------------------------------------------------
-spec leave(Channel :: atom()) -> ok | not_joined.
leave(Channel) ->
    Caller = self(),
    leave(Channel, Caller).
%%--------------------------------------------------------------------
%% @doc
%% Adds the process Pid to Channel within Nova's pg scope. Works like
%% join/1 but for an explicitly given process. Only accepts a pid; any
%% other value fails with a function_clause error.
%% @end
%%--------------------------------------------------------------------
-spec join(Channel :: atom(), Pid :: pid()) -> ok.
join(Channel, Pid) when is_pid(Pid) ->
    JoinResult = pg:join(?SCOPE, Channel, Pid),
    JoinResult.
%%--------------------------------------------------------------------
%% @doc
%% Removes the process Pid from Channel within Nova's pg scope. Works
%% like leave/1 but for an explicitly given process. The result of
%% pg:leave/3 is returned unchanged.
%% @end
%%--------------------------------------------------------------------
-spec leave(Channel :: atom(), Pid :: pid()) -> ok | not_joined.
leave(Channel, Pid) ->
    LeaveResult = pg:leave(?SCOPE, Channel, Pid),
    LeaveResult.
%%--------------------------------------------------------------------
%% @doc
%% Sends Message to every member of Channel, as reported by
%% get_members/1. The message is delivered wrapped in a #nova_pubsub{}
%% envelope carrying the channel, the calling process as sender, the
%% topic and the payload. Topic lets receivers tell apart different
%% kinds of messages on the same channel. The member list is not
%% filtered, so the caller also receives the message if it has joined
%% the channel. Always returns ok.
%% @end
%%--------------------------------------------------------------------
-spec broadcast(Channel :: atom(), Topic :: list() | binary(), Message :: any()) -> ok.
broadcast(Channel, Topic, Message) ->
    Envelope = create_envelope(Channel, self(), Topic, Message),
    Deliver = fun(Receiver) -> Receiver ! Envelope end,
    %% NOTE(review): if a process has joined the channel more than once and
    %% get_members/1 lists it repeatedly, it gets one copy per entry -
    %% confirm this is intended.
    lists:foreach(Deliver, get_members(Channel)),
    ok.
%%--------------------------------------------------------------------
%% @doc
%% Node-local variant of broadcast/3: the #nova_pubsub{} envelope is
%% sent only to the members returned by get_local_members/1. The
%% calling process is recorded as the sender. Always returns ok.
%% @end
%%--------------------------------------------------------------------
-spec local_broadcast(Channel :: atom(), Topic :: list() | binary(), Message :: any()) -> ok.
local_broadcast(Channel, Topic, Message) ->
    LocalReceivers = get_local_members(Channel),
    Envelope = create_envelope(Channel, self(), Topic, Message),
    lists:foreach(
        fun(Receiver) -> Receiver ! Envelope end,
        LocalReceivers
    ),
    ok.
%%--------------------------------------------------------------------
%% @doc
%% Returns the processes that are members of Channel, as reported by
%% pg:get_members/2 for Nova's pg scope.
%% @end
%%--------------------------------------------------------------------
-spec get_members(Channel :: atom()) -> [pid()].
get_members(Channel) ->
    Members = pg:get_members(?SCOPE, Channel),
    Members.
%%--------------------------------------------------------------------
%% @doc
%% Like get_members/1, but returns only the members of Channel that
%% pg:get_local_members/2 reports for Nova's pg scope, i.e. processes
%% running on the same node as the caller.
%% @end
%%--------------------------------------------------------------------
-spec get_local_members(Channel :: atom()) -> [pid()].
get_local_members(Channel) ->
    LocalMembers = pg:get_local_members(?SCOPE, Channel),
    LocalMembers.
%% Internal helper: wraps a message in the #nova_pubsub{} record that is
%% sent to receivers, recording the channel, sender, topic and payload.
create_envelope(Channel, Sender, Topic, Payload) ->
    #nova_pubsub{
        payload = Payload,
        topic = Topic,
        sender = Sender,
        channel = Channel
    }.