-module(lightspeed@pubsub).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pubsub.gleam").
-export([new_single_node/0, new_cluster_ready/2, adapter/1, adapter_label/1, subscribe/3, unsubscribe/3, subscribers/2, publish/3, subscriber_count/2, topic_labels/1, delivery_subscriber/1, delivery_topic/1, delivery_message/1]).
-export_type([adapter/0, delivery/1, topic_subscribers/0, broker/1]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Deterministic PubSub abstraction for channel fanout.\n").
%% Fanout backend selector: either the bare atom `single_node` or a
%% `cluster_ready` tuple whose two binaries are the node id and the cluster
%% name, in that order (see new_cluster_ready/2).
-type adapter() :: single_node | {cluster_ready, binary(), binary()}.
%% One fanout result, laid out as `{delivery, Topic, SubscriberId, Message}`.
-type delivery(GYA) :: {delivery, binary(), binary(), GYA}.
%% One topic entry, laid out as `{topic_subscribers, Topic, SubscriberIds}`.
%% New subscriber ids are prepended, so the list is stored newest-first;
%% subscribers/2 reverses it before returning.
-type topic_subscribers() :: {topic_subscribers, binary(), list(binary())}.
%% Broker state, laid out as `{broker, Adapter, TopicEntries}`.
%% NOTE(review): the `gleam_phantom` alternative is never constructed in this
%% module; presumably it only keeps the type parameter referenced — confirm.
-opaque broker(GYB) :: {broker, adapter(), list(topic_subscribers())} |
{gleam_phantom, GYB}.
%% Builds a broker bound to the `single_node` adapter with no topic entries.
-file("src/lightspeed/pubsub.gleam", 27).
?DOC(" New single-node broker.\n").
-spec new_single_node() -> broker(any()).
new_single_node() ->
    Topics = [],
    {broker, single_node, Topics}.
%% Builds a broker bound to a `cluster_ready` adapter that records the given
%% node id and cluster name; the broker starts with no topic entries.
-file("src/lightspeed/pubsub.gleam", 32).
?DOC(" New cluster-ready broker.\n").
-spec new_cluster_ready(binary(), binary()) -> broker(any()).
new_cluster_ready(Node_id, Cluster) ->
    Adapter = {cluster_ready, Node_id, Cluster},
    {broker, Adapter, []}.
%% Returns the adapter stored in the second field of the broker tuple.
-file("src/lightspeed/pubsub.gleam", 40).
?DOC(" Broker adapter.\n").
-spec adapter(broker(any())) -> adapter().
adapter({broker, Adapter, _Topics}) ->
    Adapter.
%% Renders an adapter as a binary label:
%%   single_node                     -> <<"single_node">>
%%   {cluster_ready, NodeId, Cluster} -> <<"cluster_ready:", Cluster, ":", NodeId>>
%% Note that the cluster name comes before the node id in the label.
-file("src/lightspeed/pubsub.gleam", 45).
?DOC(" Stable adapter label.\n").
-spec adapter_label(adapter()) -> binary().
adapter_label(single_node) ->
    <<"single_node"/utf8>>;
adapter_label({cluster_ready, Node_id, Cluster}) ->
    <<"cluster_ready:"/utf8, Cluster/binary, ":"/utf8, Node_id/binary>>.
%% True when Target is exactly equal (=:=) to some element of Values.
%% lists:member/2 uses the same exact-match comparison.
-file("src/lightspeed/pubsub.gleam", 189).
-spec contains(list(binary()), binary()) -> boolean().
contains(Values, Target) ->
    lists:member(Target, Values).
%% Adds Subscriber_id to the entry for Topic.
%%   * No entry for Topic: a fresh single-subscriber entry is appended at the
%%     end of the list.
%%   * Entry exists and already holds the subscriber: list returned unchanged.
%%   * Entry exists without the subscriber: the id is prepended to that
%%     entry's subscriber list (so ids are kept newest-first).
%% Only the first entry whose topic matches is considered.
-file("src/lightspeed/pubsub.gleam", 122).
-spec upsert_subscriber(list(topic_subscribers()), binary(), binary()) -> list(topic_subscribers()).
upsert_subscriber([], Topic, Subscriber_id) ->
    [{topic_subscribers, Topic, [Subscriber_id]}];
upsert_subscriber([{topic_subscribers, Topic, Members} = Entry | Rest], Topic, Subscriber_id) ->
    case contains(Members, Subscriber_id) of
        true ->
            [Entry | Rest];
        false ->
            [{topic_subscribers, Topic, [Subscriber_id | Members]} | Rest]
    end;
upsert_subscriber([{topic_subscribers, _, _} = Entry | Rest], Topic, Subscriber_id) ->
    [Entry | upsert_subscriber(Rest, Topic, Subscriber_id)].
%% Returns a broker with the same adapter whose topic entries now include
%% Subscriber_id under Topic (see upsert_subscriber/3 for the entry rules).
-file("src/lightspeed/pubsub.gleam", 54).
?DOC(" Subscribe one subscriber to one topic.\n").
-spec subscribe(broker(GYI), binary(), binary()) -> broker(GYI).
subscribe({broker, Adapter, Topics}, Topic, Subscriber_id) ->
    Updated = upsert_subscriber(Topics, Topic, Subscriber_id),
    {broker, Adapter, Updated}.
%% Returns Values with every element exactly equal to Target dropped; the
%% relative order of the remaining elements is kept.
-file("src/lightspeed/pubsub.gleam", 200).
-spec remove_item(list(binary()), binary()) -> list(binary()).
remove_item(Values, Target) ->
    [Value || Value <- Values, Value =/= Target].
%% Removes Subscriber_id from the first entry whose topic equals Topic.
%% If that leaves the entry with no subscribers the whole entry is dropped;
%% otherwise the entry is rebuilt with the remaining ids. Entries for other
%% topics are passed through untouched, and a missing topic is a no-op.
-file("src/lightspeed/pubsub.gleam", 149).
-spec remove_subscriber(list(topic_subscribers()), binary(), binary()) -> list(topic_subscribers()).
remove_subscriber([], _Topic, _Subscriber_id) ->
    [];
remove_subscriber([{topic_subscribers, Topic, Members} | Rest], Topic, Subscriber_id) ->
    case remove_item(Members, Subscriber_id) of
        [] ->
            Rest;
        Remaining ->
            [{topic_subscribers, Topic, Remaining} | Rest]
    end;
remove_subscriber([{topic_subscribers, _, _} = Entry | Rest], Topic, Subscriber_id) ->
    [Entry | remove_subscriber(Rest, Topic, Subscriber_id)].
%% Returns a broker with the same adapter whose topic entries no longer list
%% Subscriber_id under Topic (see remove_subscriber/3 for the entry rules).
-file("src/lightspeed/pubsub.gleam", 66).
?DOC(" Unsubscribe one subscriber from one topic.\n").
-spec unsubscribe(broker(GYL), binary(), binary()) -> broker(GYL).
unsubscribe({broker, Adapter, Topics}, Topic, Subscriber_id) ->
    Pruned = remove_subscriber(Topics, Topic, Subscriber_id),
    {broker, Adapter, Pruned}.
%% Builds one `{delivery, Topic, SubscriberId, Message}` tuple per subscriber,
%% in the order the subscribers are given. Deliveries_rev is an accumulator
%% holding earlier deliveries in reverse order: it is reversed and placed in
%% front of the newly built deliveries.
-file("src/lightspeed/pubsub.gleam", 211).
-spec deliver_loop(binary(), GZQ, list(binary()), list(delivery(GZQ))) -> list(delivery(GZQ)).
deliver_loop(Topic, Message, Subscribers, Deliveries_rev) ->
    Fresh = [{delivery, Topic, Subscriber_id, Message} || Subscriber_id <- Subscribers],
    %% lists:reverse/2 reverses its first argument and appends the second.
    lists:reverse(Deliveries_rev, Fresh).
%% Returns the stored subscriber list of the first entry whose topic equals
%% Topic, exactly as stored (newest-first), or [] when no entry matches.
-file("src/lightspeed/pubsub.gleam", 175).
-spec find_topic(list(topic_subscribers()), binary()) -> list(binary()).
find_topic([], _Topic) ->
    [];
find_topic([{topic_subscribers, Topic, Members} | _Rest], Topic) ->
    Members;
find_topic([{topic_subscribers, _, _} | Rest], Topic) ->
    find_topic(Rest, Topic).
%% Looks up Topic in the broker's entries and returns its subscriber ids.
%% Ids are stored newest-first, so the stored list is reversed to yield the
%% order in which subscribers joined. Unknown topics yield [].
-file("src/lightspeed/pubsub.gleam", 88).
?DOC(" Subscribers for one topic in stable join order.\n").
-spec subscribers(broker(any()), binary()) -> list(binary()).
subscribers({broker, _Adapter, Topics}, Topic) ->
    lists:reverse(find_topic(Topics, Topic)).
%% Produces one delivery per current subscriber of Topic, in join order, and
%% returns them paired with the broker. The broker itself is returned as-is.
-file("src/lightspeed/pubsub.gleam", 78).
?DOC(" Publish one message to one topic.\n").
-spec publish(broker(GYO), binary(), GYO) -> {broker(GYO), list(delivery(GYO))}.
publish(Broker, Topic, Message) ->
    Deliveries = deliver_loop(Topic, Message, subscribers(Broker, Topic), []),
    {Broker, Deliveries}.
%% Counts the subscriber ids currently listed under Topic (0 when the topic
%% has no entry).
-file("src/lightspeed/pubsub.gleam", 95).
?DOC(" Number of subscribers for one topic.\n").
-spec subscriber_count(broker(any()), binary()) -> integer().
subscriber_count(Broker, Topic) ->
    Members = subscribers(Broker, Topic),
    erlang:length(Members).
%% Renders every topic entry as <<"Topic:Count">>, where Count is the decimal
%% number of subscriber ids in that entry. Labels follow the stored order of
%% the broker's topic entries.
-file("src/lightspeed/pubsub.gleam", 100).
?DOC(" Stable topic labels for tests and logs.\n").
-spec topic_labels(broker(any())) -> list(binary()).
topic_labels({broker, _Adapter, Topics}) ->
    Label = fun({topic_subscribers, Name, Members}) ->
        Count = erlang:integer_to_binary(erlang:length(Members)),
        <<Name/binary, ":"/utf8, Count/binary>>
    end,
    lists:map(Label, Topics).
%% Returns the subscriber id, the third field of the delivery tuple.
-file("src/lightspeed/pubsub.gleam", 108).
?DOC(" Subscriber id for one delivery.\n").
-spec delivery_subscriber(delivery(any())) -> binary().
delivery_subscriber({delivery, _Topic, Subscriber_id, _Message}) ->
    Subscriber_id.
%% Returns the topic, the second field of the delivery tuple.
-file("src/lightspeed/pubsub.gleam", 113).
?DOC(" Topic for one delivery.\n").
-spec delivery_topic(delivery(any())) -> binary().
delivery_topic({delivery, Topic, _Subscriber_id, _Message}) ->
    Topic.
%% Returns the message payload, the fourth field of the delivery tuple.
-file("src/lightspeed/pubsub.gleam", 118).
?DOC(" Message payload for one delivery.\n").
-spec delivery_message(delivery(GZF)) -> GZF.
delivery_message({delivery, _Topic, _Subscriber_id, Message}) ->
    Message.