src/lightspeed@pubsub.erl

-module(lightspeed@pubsub).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pubsub.gleam").
-export([new_single_node/0, new_cluster_ready/2, adapter/1, adapter_label/1, subscribe/3, unsubscribe/3, subscribers/2, publish/3, subscriber_count/2, topic_labels/1, delivery_subscriber/1, delivery_topic/1, delivery_message/1]).
-export_type([adapter/0, delivery/1, topic_subscribers/0, broker/1]).

%% Documentation attribute shims.
%% On OTP 27 and later the macros expand to the -moduledoc/-doc attributes.
%% On older releases they expand to an empty -compile([]) attribute, which
%% adds no compiler options, so the doc strings are simply dropped.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Deterministic PubSub abstraction for channel fanout.\n").

%% Broker backend descriptor: either the atom `single_node`, or a
%% `cluster_ready` tuple carrying the node id followed by the cluster name
%% (the order used by new_cluster_ready/2).
-type adapter() :: single_node | {cluster_ready, binary(), binary()}.

%% One fanned-out message: {delivery, Topic, SubscriberId, Message}.
-type delivery(GYA) :: {delivery, binary(), binary(), GYA}.

%% Subscriber ids registered for one topic: {topic_subscribers, Topic, Ids}.
%% upsert_subscriber/3 prepends new ids, so the list is stored newest-first;
%% subscribers/2 reverses it before returning it.
-type topic_subscribers() :: {topic_subscribers, binary(), list(binary())}.

%% Broker state: {broker, Adapter, TopicEntries}.
%% NOTE(review): no function in this file constructs the `gleam_phantom`
%% alternative; presumably it exists only so the type parameter is referenced
%% in the definition -- confirm against the Gleam compiler's conventions.
-opaque broker(GYB) :: {broker, adapter(), list(topic_subscribers())} |
    {gleam_phantom, GYB}.

-file("src/lightspeed/pubsub.gleam", 27).
?DOC(" New single-node broker.\n").
%% Builds a broker that uses the `single_node` adapter and has no topics yet.
-spec new_single_node() -> broker(any()).
new_single_node() ->
    Adapter = single_node,
    No_topics = [],
    {broker, Adapter, No_topics}.

-file("src/lightspeed/pubsub.gleam", 32).
?DOC(" New cluster-ready broker.\n").
%% Builds a broker with no topics whose adapter is
%% {cluster_ready, Node_id, Cluster}.
-spec new_cluster_ready(binary(), binary()) -> broker(any()).
new_cluster_ready(Node_id, Cluster) ->
    Adapter = {cluster_ready, Node_id, Cluster},
    {broker, Adapter, []}.

-file("src/lightspeed/pubsub.gleam", 40).
?DOC(" Broker adapter.\n").
%% Returns the adapter stored in the second slot of the broker tuple.
-spec adapter(broker(any())) -> adapter().
adapter({broker, Adapter, _Topics}) ->
    Adapter.

-file("src/lightspeed/pubsub.gleam", 45).
?DOC(" Stable adapter label.\n").
%% Renders an adapter as a binary label:
%%   single_node                      -> <<"single_node">>
%%   {cluster_ready, Node_id, Cluster} -> <<"cluster_ready:", Cluster, ":", Node_id>>
%% Note that the cluster name comes before the node id in the label.
-spec adapter_label(adapter()) -> binary().
adapter_label(single_node) ->
    <<"single_node"/utf8>>;
adapter_label({cluster_ready, Node_id, Cluster}) ->
    Prefix = <<"cluster_ready:"/utf8, Cluster/binary>>,
    <<Prefix/binary, ":"/utf8, Node_id/binary>>.

-file("src/lightspeed/pubsub.gleam", 189).
%% Reports whether `Target` occurs in `Values`.
%%
%% Delegates to lists:member/2 from the standard library rather than scanning
%% the list by hand. lists:member/2 compares by exact match, i.e. the same
%% semantics as `=:=`. Returns `false` for the empty list.
-spec contains(list(binary()), binary()) -> boolean().
contains(Values, Target) ->
    lists:member(Target, Values).

-file("src/lightspeed/pubsub.gleam", 122).
%% Registers `Subscriber_id` under `Topic` in the list of topic entries.
%%
%% - No entry for the topic: a new entry is appended at the end of the list.
%% - Entry exists and already holds the id: the list is returned unchanged.
%% - Entry exists without the id: the id is prepended to that entry's ids.
%% Entries for other topics keep their position.
-spec upsert_subscriber(list(topic_subscribers()), binary(), binary()) -> list(topic_subscribers()).
upsert_subscriber([], Topic, Subscriber_id) ->
    [{topic_subscribers, Topic, [Subscriber_id]}];
upsert_subscriber(
    [{topic_subscribers, Topic, Members} = Entry | Rest],
    Topic,
    Subscriber_id
) ->
    Already_joined = contains(Members, Subscriber_id),
    case Already_joined of
        true ->
            [Entry | Rest];
        false ->
            [{topic_subscribers, Topic, [Subscriber_id | Members]} | Rest]
    end;
upsert_subscriber([{topic_subscribers, _, _} = Entry | Rest], Topic, Subscriber_id) ->
    [Entry | upsert_subscriber(Rest, Topic, Subscriber_id)].

-file("src/lightspeed/pubsub.gleam", 54).
?DOC(" Subscribe one subscriber to one topic.\n").
%% Returns a broker with the same adapter whose topic entries have been
%% passed through upsert_subscriber/3 for `Topic` and `Subscriber_id`.
-spec subscribe(broker(GYI), binary(), binary()) -> broker(GYI).
subscribe({broker, Adapter, Topics}, Topic, Subscriber_id) ->
    Updated = upsert_subscriber(Topics, Topic, Subscriber_id),
    {broker, Adapter, Updated}.

-file("src/lightspeed/pubsub.gleam", 200).
%% Returns `Values` without any element exactly equal to `Target`.
%% Every occurrence is dropped and the remaining elements keep their order.
-spec remove_item(list(binary()), binary()) -> list(binary()).
remove_item(Values, Target) ->
    [Kept || Kept <- Values, Kept =/= Target].

-file("src/lightspeed/pubsub.gleam", 149).
%% Removes `Subscriber_id` from the entry for `Topic`.
%%
%% Only the first entry whose topic matches is touched. If removing the id
%% leaves that entry without subscribers, the entry itself is dropped from
%% the list. A list with no matching entry is returned as it was.
-spec remove_subscriber(list(topic_subscribers()), binary(), binary()) -> list(topic_subscribers()).
remove_subscriber([], _Topic, _Subscriber_id) ->
    [];
remove_subscriber(
    [{topic_subscribers, Topic, Members} | Rest],
    Topic,
    Subscriber_id
) ->
    case remove_item(Members, Subscriber_id) of
        [] ->
            Rest;
        Remaining ->
            [{topic_subscribers, Topic, Remaining} | Rest]
    end;
remove_subscriber([{topic_subscribers, _, _} = Entry | Rest], Topic, Subscriber_id) ->
    [Entry | remove_subscriber(Rest, Topic, Subscriber_id)].

-file("src/lightspeed/pubsub.gleam", 66).
?DOC(" Unsubscribe one subscriber from one topic.\n").
%% Returns a broker with the same adapter whose topic entries have been
%% passed through remove_subscriber/3 for `Topic` and `Subscriber_id`.
-spec unsubscribe(broker(GYL), binary(), binary()) -> broker(GYL).
unsubscribe({broker, Adapter, Topics}, Topic, Subscriber_id) ->
    Pruned = remove_subscriber(Topics, Topic, Subscriber_id),
    {broker, Adapter, Pruned}.

-file("src/lightspeed/pubsub.gleam", 211).
%% Builds one {delivery, Topic, Subscriber_id, Message} tuple per subscriber.
%%
%% `Deliveries_rev` is an accumulator in reverse order: each new delivery is
%% pushed onto its front and the whole list is reversed once at the end, so
%% the result lists the accumulator's items first (un-reversed), followed by
%% the new deliveries in the order of `Subscribers`.
-spec deliver_loop(binary(), GZQ, list(binary()), list(delivery(GZQ))) -> list(delivery(GZQ)).
deliver_loop(Topic, Message, Subscribers, Deliveries_rev) ->
    Push = fun(Subscriber_id, Acc) ->
        [{delivery, Topic, Subscriber_id, Message} | Acc]
    end,
    Collected_rev = lists:foldl(Push, Deliveries_rev, Subscribers),
    lists:reverse(Collected_rev).

-file("src/lightspeed/pubsub.gleam", 175).
%% Looks up the stored subscriber ids for `Topic`.
%%
%% Returns the id list of the first entry whose topic matches exactly, in its
%% stored order, or `[]` when no entry matches.
-spec find_topic(list(topic_subscribers()), binary()) -> list(binary()).
find_topic([], _Topic) ->
    [];
find_topic([{topic_subscribers, Topic, Members} | _Rest], Topic) ->
    Members;
find_topic([{topic_subscribers, _, _} | Rest], Topic) ->
    find_topic(Rest, Topic).

-file("src/lightspeed/pubsub.gleam", 88).
?DOC(" Subscribers for one topic in stable join order.\n").
%% Fetches the stored id list for `Topic` via find_topic/2 and reverses it.
%% Returns `[]` when the topic has no entry.
-spec subscribers(broker(any()), binary()) -> list(binary()).
subscribers({broker, _Adapter, Topics}, Topic) ->
    Stored = find_topic(Topics, Topic),
    lists:reverse(Stored).

-file("src/lightspeed/pubsub.gleam", 78).
?DOC(" Publish one message to one topic.\n").
%% Produces one delivery per current subscriber of `Topic`, in the order
%% returned by subscribers/2. The broker is returned unchanged alongside
%% the deliveries.
-spec publish(broker(GYO), binary(), GYO) -> {broker(GYO), list(delivery(GYO))}.
publish(Broker, Topic, Message) ->
    Deliveries = deliver_loop(Topic, Message, subscribers(Broker, Topic), []),
    {Broker, Deliveries}.

-file("src/lightspeed/pubsub.gleam", 95).
?DOC(" Number of subscribers for one topic.\n").
%% Counts the ids returned by subscribers/2 for `Topic`; 0 when the topic
%% has no entry.
-spec subscriber_count(broker(any()), binary()) -> integer().
subscriber_count(Broker, Topic) ->
    Joined = subscribers(Broker, Topic),
    erlang:length(Joined).

-file("src/lightspeed/pubsub.gleam", 100).
?DOC(" Stable topic labels for tests and logs.\n").
%% Maps every topic entry of the broker to a binary of the form
%% <<Topic, ":", SubscriberCount>>, where the count is rendered in decimal.
-spec topic_labels(broker(any())) -> list(binary()).
topic_labels({broker, _Adapter, Topics}) ->
    Label = fun({topic_subscribers, Name, Members}) ->
        Count = erlang:integer_to_binary(erlang:length(Members)),
        <<Name/binary, ":"/utf8, Count/binary>>
    end,
    gleam@list:map(Topics, Label).

-file("src/lightspeed/pubsub.gleam", 108).
?DOC(" Subscriber id for one delivery.\n").
%% Returns the third slot of the delivery tuple.
-spec delivery_subscriber(delivery(any())) -> binary().
delivery_subscriber({delivery, _Topic, Subscriber_id, _Message}) ->
    Subscriber_id.

-file("src/lightspeed/pubsub.gleam", 113).
?DOC(" Topic for one delivery.\n").
%% Returns the second slot of the delivery tuple.
-spec delivery_topic(delivery(any())) -> binary().
delivery_topic({delivery, Topic, _Subscriber_id, _Message}) ->
    Topic.

-file("src/lightspeed/pubsub.gleam", 118).
?DOC(" Message payload for one delivery.\n").
%% Returns the fourth slot of the delivery tuple, i.e. the published payload.
-spec delivery_message(delivery(GZF)) -> GZF.
delivery_message({delivery, _Topic, _Subscriber_id, Message}) ->
    Message.