-module(lightspeed@channel).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/channel.gleam").
-export([new_single_node/0, new_cluster_ready/2, event_label/1, handle/2, flush_outbox/1, crashed/1, adapter_label/1, subscribers/2, presences/2, membership_count/1, outbound_label/1]).
-export_type([event/0, outbound/0, membership/0, channel/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Channel runtime model with topic routing, pubsub, and presence.\n").
%% Inbound channel events, with the field order apply_event/2 destructures:
%%   {join, ClientId, Topic, UserId, NowMs}
%%   {rejoin, ClientId, Topic, UserId, NowMs}
%%   {leave, ClientId, Topic}
%%   {broadcast, ClientId, Topic, Payload}
%%   {push, ClientId, Topic, Event, Payload}
%%   {crash, Reason}
%%   restart
-type event() :: {join, binary(), binary(), binary(), integer()} |
{rejoin, binary(), binary(), binary(), integer()} |
{leave, binary(), binary()} |
{broadcast, binary(), binary(), binary()} |
{push, binary(), binary(), binary(), binary()} |
{crash, binary()} |
restart.
%% Messages queued in the channel outbox, with the field order used where
%% they are emitted in this module:
%%   {reply, ClientId, Topic, Status, Payload}      Status is <<"ok">> or <<"error">>
%%   {push_message, ClientId, Topic, Event, Payload}
%%   {broadcast_message, Topic, Payload, Recipients}
%%   {presence_diff, Topic, Diff}
%%   {system, Label}
-type outbound() :: {reply, binary(), binary(), binary(), binary()} |
{push_message, binary(), binary(), binary(), binary()} |
{broadcast_message, binary(), binary(), list(binary())} |
{presence_diff, binary(), lightspeed@presence:diff()} |
{system, binary()}.
%% One client/topic membership: {membership, ClientId, Topic, UserId, PresenceRef}.
%% PresenceRef is the value built by membership_ref/2 when the client joins.
-type membership() :: {membership, binary(), binary(), binary(), binary()}.
%% Channel state: {channel, Broker, Tracker, Memberships, Outbox, Crashed}.
%%   Broker      - pubsub broker holding topic subscriptions
%%   Tracker     - presence tracker
%%   Memberships - current client/topic memberships
%%   Outbox      - pending outbound messages, newest first (emit/2 prepends,
%%                 flush_outbox/1 reverses)
%%   Crashed     - true between a crash event and the next restart
-opaque channel() :: {channel,
lightspeed@pubsub:broker(binary()),
lightspeed@presence:tracker(),
list(membership()),
list(outbound()),
boolean()}.
-file("src/lightspeed/channel.gleam", 50).
?DOC(" Start a single-node channel.\n").
%% Builds an empty channel around a single-node pubsub broker and a fresh
%% presence tracker: no memberships, empty outbox, crashed flag false.
-spec new_single_node() -> channel().
new_single_node() ->
    Broker = lightspeed@pubsub:new_single_node(),
    Tracker = lightspeed@presence:new(),
    {channel, Broker, Tracker, [], [], false}.
-file("src/lightspeed/channel.gleam", 61).
?DOC(" Start a cluster-ready channel.\n").
%% Same empty channel as new_single_node/0, except the broker is created by
%% lightspeed@pubsub:new_cluster_ready/2 from the given node id and cluster.
-spec new_cluster_ready(binary(), binary()) -> channel().
new_cluster_ready(Node_id, Cluster) ->
    Broker = lightspeed@pubsub:new_cluster_ready(Node_id, Cluster),
    Tracker = lightspeed@presence:new(),
    {channel, Broker, Tracker, [], [], false}.
-file("src/lightspeed/channel.gleam", 359).
%% Queue one outbound message on the channel. The outbox is kept newest-first,
%% so the message is prepended; every other field is carried over unchanged.
-spec emit(channel(), outbound()) -> channel().
emit({channel, Broker, Tracker, Memberships, Outbox, Crashed}, Outbound) ->
    {channel, Broker, Tracker, Memberships, [Outbound | Outbox], Crashed}.
-file("src/lightspeed/channel.gleam", 370).
%% Return {some, Membership} for the first membership whose client id and
%% topic both equal the arguments, or none when the list has no such entry.
-spec find_membership(list(membership()), binary(), binary()) -> gleam@option:option(membership()).
find_membership([], _Client_id, _Topic) ->
    none;
find_membership([{membership, Client_id, Topic, _, _} = Found | _], Client_id, Topic) ->
    %% Repeated variables in the head require an exact match on both fields.
    {some, Found};
find_membership([_Other | Remaining], Client_id, Topic) ->
    find_membership(Remaining, Client_id, Topic).
-file("src/lightspeed/channel.gleam", 363).
%% True when the channel's membership list holds an entry for this client
%% and topic.
-spec has_membership(channel(), binary(), binary()) -> boolean().
has_membership(Channel, Client_id, Topic) ->
    Memberships = erlang:element(4, Channel),
    %% find_membership/3 yields either none or {some, _}.
    find_membership(Memberships, Client_id, Topic) =/= none.
-file("src/lightspeed/channel.gleam", 337).
%% Emit one push_message (event name <<"broadcast">>) per pubsub delivery,
%% walking the deliveries front to back.
-spec push_deliveries(channel(), list(lightspeed@pubsub:delivery(binary()))) -> channel().
push_deliveries(Channel, Deliveries) ->
    Queue_one = fun(Delivery, Acc) ->
        Message = {push_message,
            lightspeed@pubsub:delivery_subscriber(Delivery),
            lightspeed@pubsub:delivery_topic(Delivery),
            <<"broadcast"/utf8>>,
            lightspeed@pubsub:delivery_message(Delivery)},
        emit(Acc, Message)
    end,
    lists:foldl(Queue_one, Channel, Deliveries).
-file("src/lightspeed/channel.gleam", 412).
%% Remove the first membership matching client id and topic.
%% Returns {RemainingList, {some, Removed}} with list order preserved, or
%% {OriginalList, none} when nothing matched.
-spec remove_membership_loop(list(membership()), binary(), binary()) -> {list(membership()),
    gleam@option:option(membership())}.
remove_membership_loop([], _Client_id, _Topic) ->
    {[], none};
remove_membership_loop([{membership, Client_id, Topic, _, _} = Hit | Tail], Client_id, Topic) ->
    %% Stop at the first hit; later entries are left untouched.
    {Tail, {some, Hit}};
remove_membership_loop([Kept | Tail], Client_id, Topic) ->
    {Kept_tail, Removed} = remove_membership_loop(Tail, Client_id, Topic),
    {[Kept | Kept_tail], Removed}.
-file("src/lightspeed/channel.gleam", 402).
%% Drop the client's membership for the topic from the channel, returning
%% the updated channel together with the removed membership (or none).
%% Only the membership list changes; broker and tracker are not touched here.
-spec remove_membership(channel(), binary(), binary()) -> {channel(),
    gleam@option:option(membership())}.
remove_membership({channel, Broker, Tracker, Memberships, Outbox, Crashed}, Client_id, Topic) ->
    {Remaining, Removed} = remove_membership_loop(Memberships, Client_id, Topic),
    {{channel, Broker, Tracker, Remaining, Outbox, Crashed}, Removed}.
-file("src/lightspeed/channel.gleam", 300).
%% Handle a leave request.
%% Without a membership the client gets an error reply (<<"not_joined">>).
%% Otherwise the client is unsubscribed from the topic, its presence entry is
%% removed, and an ok reply followed by the presence diff is emitted.
-spec leave(channel(), binary(), binary()) -> channel().
leave(Channel, Client_id, Topic) ->
    case remove_membership(Channel, Client_id, Topic) of
        {Unchanged, none} ->
            emit(
                Unchanged,
                {reply, Client_id, Topic, <<"error"/utf8>>, <<"not_joined"/utf8>>}
            );
        {Trimmed, {some, Gone}} ->
            Broker = lightspeed@pubsub:unsubscribe(
                erlang:element(2, Trimmed),
                Topic,
                Client_id
            ),
            %% Presence is released using the topic, user id and ref stored
            %% on the removed membership.
            {Tracker, Diff} = lightspeed@presence:leave(
                erlang:element(3, Trimmed),
                erlang:element(3, Gone),
                erlang:element(4, Gone),
                erlang:element(5, Gone)
            ),
            Detached = {channel,
                Broker,
                Tracker,
                erlang:element(4, Trimmed),
                erlang:element(5, Trimmed),
                erlang:element(6, Trimmed)},
            Acked = emit(
                Detached,
                {reply, Client_id, Topic, <<"ok"/utf8>>, <<"left"/utf8>>}
            ),
            emit(Acked, {presence_diff, Topic, Diff})
    end.
-file("src/lightspeed/channel.gleam", 385).
%% Insert or replace a membership keyed by client id and topic.
%% The first entry with the same key is replaced in place; when there is
%% none, the target is appended at the end of the list.
-spec upsert_membership(list(membership()), membership()) -> list(membership()).
upsert_membership([], Target) ->
    [Target];
upsert_membership(
    [{membership, Client_id, Topic, _, _} | Tail],
    {membership, Client_id, Topic, _, _} = Target
) ->
    [Target | Tail];
upsert_membership([Other | Tail], Target) ->
    [Other | upsert_membership(Tail, Target)].
-file("src/lightspeed/channel.gleam", 431).
%% Build the presence ref for a membership: the client id and the topic
%% joined by "@".
-spec membership_ref(binary(), binary()) -> binary().
membership_ref(Client_id, Topic) ->
    <<Client_id/binary, "@"/utf8, Topic/binary>>.
-file("src/lightspeed/channel.gleam", 257).
%% Subscribe a client to a topic, track its presence, and record the
%% membership. Emits an ok reply carrying Reply_payload, then the presence
%% diff produced by the tracker.
%% NOTE(review): an existing membership for the same client/topic is simply
%% replaced by upsert_membership/2 here, with no presence leave first; only
%% the rejoin path in apply_event/2 releases the previous presence. Confirm
%% that a repeated plain join is meant to behave this way.
-spec join(channel(), binary(), binary(), binary(), integer(), binary()) -> channel().
join(
    {channel, Broker0, Tracker0, Memberships0, Outbox, Crashed},
    Client_id,
    Topic,
    User_id,
    Now_ms,
    Reply_payload
) ->
    Ref = membership_ref(Client_id, Topic),
    Broker1 = lightspeed@pubsub:subscribe(Broker0, Topic, Client_id),
    {Tracker1, Diff} = lightspeed@presence:join(
        Tracker0,
        Topic,
        User_id,
        {meta, Ref, Now_ms}
    ),
    Memberships1 = upsert_membership(
        Memberships0,
        {membership, Client_id, Topic, User_id, Ref}
    ),
    Joined = {channel, Broker1, Tracker1, Memberships1, Outbox, Crashed},
    Acked = emit(Joined, {reply, Client_id, Topic, <<"ok"/utf8>>, Reply_payload}),
    emit(Acked, {presence_diff, Topic, Diff}).
-file("src/lightspeed/channel.gleam", 150).
%% Apply one event to a channel that is not crashed (handle/2 does the
%% crashed-state gating) and return the updated channel.
%%
%%   join      - subscribe, track presence, reply <<"joined">>
%%   rejoin    - release any previous membership for the client/topic, then
%%               join again with reply <<"rejoined">>
%%   leave     - delegate to leave/3
%%   broadcast - members only; publish to the topic's subscribers
%%   push      - members only; push the event back to the sending client
%%   crash     - set the crashed flag and emit a system message
%%   restart   - no-op here (the channel is not crashed)
-spec apply_event(channel(), event()) -> channel().
apply_event(Channel, {join, Client_id, Topic, User_id, Now_ms}) ->
    join(Channel, Client_id, Topic, User_id, Now_ms, <<"joined"/utf8>>);
apply_event(Channel, {rejoin, Client_id, Topic, User_id, Now_ms}) ->
    {Without, Previous} = remove_membership(Channel, Client_id, Topic),
    Released = release_previous(Without, Previous),
    join(Released, Client_id, Topic, User_id, Now_ms, <<"rejoined"/utf8>>);
apply_event(Channel, {leave, Client_id, Topic}) ->
    leave(Channel, Client_id, Topic);
apply_event(Channel, {broadcast, Client_id, Topic, Payload}) ->
    case has_membership(Channel, Client_id, Topic) of
        false ->
            reply_not_joined(Channel, Client_id, Topic);
        true ->
            broadcast_from(Channel, Client_id, Topic, Payload)
    end;
apply_event(Channel, {push, Client_id, Topic, Event, Payload}) ->
    case has_membership(Channel, Client_id, Topic) of
        false ->
            reply_not_joined(Channel, Client_id, Topic);
        true ->
            push_to_sender(Channel, Client_id, Topic, Event, Payload)
    end;
apply_event(Channel, {crash, Reason}) ->
    Crashed = {channel,
        erlang:element(2, Channel),
        erlang:element(3, Channel),
        erlang:element(4, Channel),
        erlang:element(5, Channel),
        true},
    emit(Crashed, {system, <<"crashed:"/utf8, Reason/binary>>});
apply_event(Channel, restart) ->
    Channel.

%% Undo the subscription and presence of a membership that a rejoin is about
%% to replace, emitting the resulting presence diff. No-op for none.
-spec release_previous(channel(), gleam@option:option(membership())) -> channel().
release_previous(Channel, none) ->
    Channel;
release_previous(Channel, {some, Previous}) ->
    Old_topic = erlang:element(3, Previous),
    Broker = lightspeed@pubsub:unsubscribe(
        erlang:element(2, Channel),
        Old_topic,
        erlang:element(2, Previous)
    ),
    {Tracker, Diff} = lightspeed@presence:leave(
        erlang:element(3, Channel),
        Old_topic,
        erlang:element(4, Previous),
        erlang:element(5, Previous)
    ),
    Detached = {channel,
        Broker,
        Tracker,
        erlang:element(4, Channel),
        erlang:element(5, Channel),
        erlang:element(6, Channel)},
    emit(Detached, {presence_diff, Old_topic, Diff}).

%% Emit the error reply sent when a client acts on a topic it has not joined.
-spec reply_not_joined(channel(), binary(), binary()) -> channel().
reply_not_joined(Channel, Client_id, Topic) ->
    emit(
        Channel,
        {reply, Client_id, Topic, <<"error"/utf8>>, <<"not_joined"/utf8>>}
    ).

%% Publish a payload to the topic: one push_message per delivery, then a
%% broadcast_message summary, then an ok reply <<"broadcast:N">> where N is
%% the number of recipients.
-spec broadcast_from(channel(), binary(), binary(), binary()) -> channel().
broadcast_from(Channel, Client_id, Topic, Payload) ->
    %% NOTE(review): the first element of the publish result is discarded and
    %% the channel keeps its current broker; presumably it is an updated
    %% broker. Confirm that dropping it is intended.
    {_, Deliveries} = lightspeed@pubsub:publish(
        erlang:element(2, Channel),
        Topic,
        Payload
    ),
    Recipients = gleam@list:map(
        Deliveries,
        fun lightspeed@pubsub:delivery_subscriber/1
    ),
    Pushed = push_deliveries(Channel, Deliveries),
    Announced = emit(Pushed, {broadcast_message, Topic, Payload, Recipients}),
    Count = erlang:integer_to_binary(erlang:length(Recipients)),
    emit(
        Announced,
        {reply,
            Client_id,
            Topic,
            <<"ok"/utf8>>,
            <<"broadcast:"/utf8, Count/binary>>}
    ).

%% Push an event back to the sending client and acknowledge it with an ok
%% reply <<"pushed">>.
-spec push_to_sender(channel(), binary(), binary(), binary(), binary()) -> channel().
push_to_sender(Channel, Client_id, Topic, Event, Payload) ->
    Pushed = emit(Channel, {push_message, Client_id, Topic, Event, Payload}),
    emit(Pushed, {reply, Client_id, Topic, <<"ok"/utf8>>, <<"pushed"/utf8>>}).
-file("src/lightspeed/channel.gleam", 138).
?DOC(" Stable event label.\n").
%% Labels are "<kind>:<topic>" for topic events, "push:<topic>:<event>" for
%% pushes, "crash:<reason>" for crashes and "restart" for restart.
-spec event_label(event()) -> binary().
event_label({join, _Client, Topic, _User, _Now}) ->
    <<"join:"/utf8, Topic/binary>>;
event_label({rejoin, _Client, Topic, _User, _Now}) ->
    <<"rejoin:"/utf8, Topic/binary>>;
event_label({leave, _Client, Topic}) ->
    <<"leave:"/utf8, Topic/binary>>;
event_label({broadcast, _Client, Topic, _Payload}) ->
    <<"broadcast:"/utf8, Topic/binary>>;
event_label({push, _Client, Topic, Name, _Payload}) ->
    <<"push:"/utf8, Topic/binary, ":"/utf8, Name/binary>>;
event_label({crash, Reason}) ->
    <<"crash:"/utf8, Reason/binary>>;
event_label(restart) ->
    <<"restart"/utf8>>.
-file("src/lightspeed/channel.gleam", 72).
?DOC(" Handle one channel event.\n").
%% A healthy channel applies the event via apply_event/2. A crashed channel
%% honours only restart (clears the flag and emits system "restarted"); any
%% other event is dropped with a system "ignored_while_crashed:<label>".
-spec handle(channel(), event()) -> channel().
handle({channel, _, _, _, _, false} = Channel, Event) ->
    apply_event(Channel, Event);
handle({channel, Broker, Tracker, Memberships, Outbox, true}, restart) ->
    Revived = {channel, Broker, Tracker, Memberships, Outbox, false},
    emit(Revived, {system, <<"restarted"/utf8>>});
handle({channel, _, _, _, _, true} = Channel, Event) ->
    Label = event_label(Event),
    emit(Channel, {system, <<"ignored_while_crashed:"/utf8, Label/binary>>}).
-file("src/lightspeed/channel.gleam", 92).
?DOC(" Flush outbox in emit order.\n").
%% Returns the channel with an empty outbox plus the drained messages.
%% The outbox is stored newest-first, so it is reversed to give emit order.
-spec flush_outbox(channel()) -> {channel(), list(outbound())}.
flush_outbox({channel, Broker, Tracker, Memberships, Pending_rev, Crashed}) ->
    Drained = {channel, Broker, Tracker, Memberships, [], Crashed},
    {Drained, lists:reverse(Pending_rev)}.
-file("src/lightspeed/channel.gleam", 98).
?DOC(" True when channel is crashed.\n").
%% Reads the crashed flag, the last field of the channel tuple.
-spec crashed(channel()) -> boolean().
crashed({channel, _Broker, _Tracker, _Memberships, _Outbox, Is_crashed}) ->
    Is_crashed.
-file("src/lightspeed/channel.gleam", 103).
?DOC(" PubSub adapter label.\n").
%% Looks up the broker's adapter and returns that adapter's label.
-spec adapter_label(channel()) -> binary().
adapter_label(Channel) ->
    Broker = erlang:element(2, Channel),
    Adapter = lightspeed@pubsub:adapter(Broker),
    lightspeed@pubsub:adapter_label(Adapter).
-file("src/lightspeed/channel.gleam", 110).
?DOC(" Subscribers for one topic.\n").
%% Delegates to the channel's pubsub broker.
-spec subscribers(channel(), binary()) -> list(binary()).
subscribers(Channel, Topic) ->
    Broker = erlang:element(2, Channel),
    lightspeed@pubsub:subscribers(Broker, Topic).
-file("src/lightspeed/channel.gleam", 115).
?DOC(" Presence entries for one topic.\n").
%% Delegates to the channel's presence tracker.
-spec presences(channel(), binary()) -> list(lightspeed@presence:entry()).
presences(Channel, Topic) ->
    Tracker = erlang:element(3, Channel),
    lightspeed@presence:list_topic(Tracker, Topic).
-file("src/lightspeed/channel.gleam", 120).
?DOC(" Membership count.\n").
%% Number of client/topic memberships currently recorded on the channel.
-spec membership_count(channel()) -> integer().
membership_count({channel, _Broker, _Tracker, Memberships, _Outbox, _Crashed}) ->
    erlang:length(Memberships).
-file("src/lightspeed/channel.gleam", 125).
?DOC(" Stable outbound label.\n").
%% Labels: "reply:<topic>:<status>", "push:<topic>:<event>",
%% "broadcast:<topic>:<recipient count>", "presence:<topic>:<diff label>"
%% and "system:<label>".
-spec outbound_label(outbound()) -> binary().
outbound_label({reply, _Client, Topic, Status, _Payload}) ->
    <<"reply:"/utf8, Topic/binary, ":"/utf8, Status/binary>>;
outbound_label({push_message, _Client, Topic, Event, _Payload}) ->
    <<"push:"/utf8, Topic/binary, ":"/utf8, Event/binary>>;
outbound_label({broadcast_message, Topic, _Payload, Recipients}) ->
    Count = erlang:integer_to_binary(erlang:length(Recipients)),
    <<"broadcast:"/utf8, Topic/binary, ":"/utf8, Count/binary>>;
outbound_label({presence_diff, Topic, Diff}) ->
    Diff_label = lightspeed@presence:diff_label(Diff),
    <<"presence:"/utf8, Topic/binary, ":"/utf8, Diff_label/binary>>;
outbound_label({system, Label}) ->
    <<"system:"/utf8, Label/binary>>.