src/lightspeed@channel.erl

-module(lightspeed@channel).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/channel.gleam").
-export([new_single_node/0, new_cluster_ready/2, event_label/1, handle/2, flush_outbox/1, crashed/1, adapter_label/1, subscribers/2, presences/2, membership_count/1, outbound_label/1]).
-export_type([event/0, outbound/0, membership/0, channel/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Channel runtime model with topic routing, pubsub, and presence.\n").

-type event() :: {join, binary(), binary(), binary(), integer()} |
    {rejoin, binary(), binary(), binary(), integer()} |
    {leave, binary(), binary()} |
    {broadcast, binary(), binary(), binary()} |
    {push, binary(), binary(), binary(), binary()} |
    {crash, binary()} |
    restart.

-type outbound() :: {reply, binary(), binary(), binary(), binary()} |
    {push_message, binary(), binary(), binary(), binary()} |
    {broadcast_message, binary(), binary(), list(binary())} |
    {presence_diff, binary(), lightspeed@presence:diff()} |
    {system, binary()}.

-type membership() :: {membership, binary(), binary(), binary(), binary()}.

-opaque channel() :: {channel,
        lightspeed@pubsub:broker(binary()),
        lightspeed@presence:tracker(),
        list(membership()),
        list(outbound()),
        boolean()}.

-file("src/lightspeed/channel.gleam", 50).
?DOC(" Start a single-node channel.\n").
-spec new_single_node() -> channel().
new_single_node() ->
    {channel,
        lightspeed@pubsub:new_single_node(),
        lightspeed@presence:new(),
        [],
        [],
        false}.

-file("src/lightspeed/channel.gleam", 61).
?DOC(" Start a cluster-ready channel.\n").
-spec new_cluster_ready(binary(), binary()) -> channel().
new_cluster_ready(Node_id, Cluster) ->
    {channel,
        lightspeed@pubsub:new_cluster_ready(Node_id, Cluster),
        lightspeed@presence:new(),
        [],
        [],
        false}.

-file("src/lightspeed/channel.gleam", 359).
-spec emit(channel(), outbound()) -> channel().
emit(Channel, Outbound) ->
    {channel,
        erlang:element(2, Channel),
        erlang:element(3, Channel),
        erlang:element(4, Channel),
        [Outbound | erlang:element(5, Channel)],
        erlang:element(6, Channel)}.

-file("src/lightspeed/channel.gleam", 370).
-spec find_membership(list(membership()), binary(), binary()) -> gleam@option:option(membership()).
find_membership(Memberships_rev, Client_id, Topic) ->
    case Memberships_rev of
        [] ->
            none;

        [Membership | Rest] ->
            case (erlang:element(2, Membership) =:= Client_id) andalso (erlang:element(
                3,
                Membership
            )
            =:= Topic) of
                true ->
                    {some, Membership};

                false ->
                    find_membership(Rest, Client_id, Topic)
            end
    end.

-file("src/lightspeed/channel.gleam", 363).
-spec has_membership(channel(), binary(), binary()) -> boolean().
has_membership(Channel, Client_id, Topic) ->
    case find_membership(erlang:element(4, Channel), Client_id, Topic) of
        {some, _} ->
            true;

        none ->
            false
    end.

-file("src/lightspeed/channel.gleam", 337).
-spec push_deliveries(channel(), list(lightspeed@pubsub:delivery(binary()))) -> channel().
push_deliveries(Channel, Deliveries) ->
    case Deliveries of
        [] ->
            Channel;

        [Delivery | Rest] ->
            push_deliveries(
                emit(
                    Channel,
                    {push_message,
                        lightspeed@pubsub:delivery_subscriber(Delivery),
                        lightspeed@pubsub:delivery_topic(Delivery),
                        <<"broadcast"/utf8>>,
                        lightspeed@pubsub:delivery_message(Delivery)}
                ),
                Rest
            )
    end.

-file("src/lightspeed/channel.gleam", 412).
-spec remove_membership_loop(list(membership()), binary(), binary()) -> {list(membership()),
    gleam@option:option(membership())}.
remove_membership_loop(Memberships_rev, Client_id, Topic) ->
    case Memberships_rev of
        [] ->
            {[], none};

        [Membership | Rest] ->
            case (erlang:element(2, Membership) =:= Client_id) andalso (erlang:element(
                3,
                Membership
            )
            =:= Topic) of
                true ->
                    {Rest, {some, Membership}};

                false ->
                    {Updated_rest, Removed} = remove_membership_loop(
                        Rest,
                        Client_id,
                        Topic
                    ),
                    {[Membership | Updated_rest], Removed}
            end
    end.

-file("src/lightspeed/channel.gleam", 402).
-spec remove_membership(channel(), binary(), binary()) -> {channel(),
    gleam@option:option(membership())}.
remove_membership(Channel, Client_id, Topic) ->
    {Memberships_rev, Removed} = remove_membership_loop(
        erlang:element(4, Channel),
        Client_id,
        Topic
    ),
    {{channel,
            erlang:element(2, Channel),
            erlang:element(3, Channel),
            Memberships_rev,
            erlang:element(5, Channel),
            erlang:element(6, Channel)},
        Removed}.

-file("src/lightspeed/channel.gleam", 300).
-spec leave(channel(), binary(), binary()) -> channel().
leave(Channel, Client_id, Topic) ->
    {Updated, Membership} = remove_membership(Channel, Client_id, Topic),
    case Membership of
        none ->
            emit(
                Updated,
                {reply,
                    Client_id,
                    Topic,
                    <<"error"/utf8>>,
                    <<"not_joined"/utf8>>}
            );

        {some, Membership@1} ->
            Broker = lightspeed@pubsub:unsubscribe(
                erlang:element(2, Updated),
                Topic,
                Client_id
            ),
            {Tracker, Diff} = lightspeed@presence:leave(
                erlang:element(3, Updated),
                erlang:element(3, Membership@1),
                erlang:element(4, Membership@1),
                erlang:element(5, Membership@1)
            ),
            _pipe = {channel,
                Broker,
                Tracker,
                erlang:element(4, Updated),
                erlang:element(5, Updated),
                erlang:element(6, Updated)},
            _pipe@1 = emit(
                _pipe,
                {reply, Client_id, Topic, <<"ok"/utf8>>, <<"left"/utf8>>}
            ),
            emit(_pipe@1, {presence_diff, Topic, Diff})
    end.

-file("src/lightspeed/channel.gleam", 385).
-spec upsert_membership(list(membership()), membership()) -> list(membership()).
upsert_membership(Memberships_rev, Target) ->
    case Memberships_rev of
        [] ->
            [Target];

        [Membership | Rest] ->
            case (erlang:element(2, Membership) =:= erlang:element(2, Target))
            andalso (erlang:element(3, Membership) =:= erlang:element(3, Target)) of
                true ->
                    [Target | Rest];

                false ->
                    [Membership | upsert_membership(Rest, Target)]
            end
    end.

-file("src/lightspeed/channel.gleam", 431).
-spec membership_ref(binary(), binary()) -> binary().
membership_ref(Client_id, Topic) ->
    <<<<Client_id/binary, "@"/utf8>>/binary, Topic/binary>>.

-file("src/lightspeed/channel.gleam", 257).
-spec join(channel(), binary(), binary(), binary(), integer(), binary()) -> channel().
join(Channel, Client_id, Topic, User_id, Now_ms, Reply_payload) ->
    Presence_ref = membership_ref(Client_id, Topic),
    Broker = lightspeed@pubsub:subscribe(
        erlang:element(2, Channel),
        Topic,
        Client_id
    ),
    {Tracker, Diff} = lightspeed@presence:join(
        erlang:element(3, Channel),
        Topic,
        User_id,
        {meta, Presence_ref, Now_ms}
    ),
    Memberships_rev = upsert_membership(
        erlang:element(4, Channel),
        {membership, Client_id, Topic, User_id, Presence_ref}
    ),
    _pipe = {channel,
        Broker,
        Tracker,
        Memberships_rev,
        erlang:element(5, Channel),
        erlang:element(6, Channel)},
    _pipe@1 = emit(
        _pipe,
        {reply, Client_id, Topic, <<"ok"/utf8>>, Reply_payload}
    ),
    emit(_pipe@1, {presence_diff, Topic, Diff}).

-file("src/lightspeed/channel.gleam", 150).
-spec apply_event(channel(), event()) -> channel().
apply_event(Channel, Event) ->
    case Event of
        {join, Client_id, Topic, User_id, Now_ms} ->
            join(Channel, Client_id, Topic, User_id, Now_ms, <<"joined"/utf8>>);

        {rejoin, Client_id@1, Topic@1, User_id@1, Now_ms@1} ->
            {Membership_state, Previous} = remove_membership(
                Channel,
                Client_id@1,
                Topic@1
            ),
            Membership_state@1 = case Previous of
                none ->
                    Membership_state;

                {some, Membership} ->
                    Broker = lightspeed@pubsub:unsubscribe(
                        erlang:element(2, Membership_state),
                        erlang:element(3, Membership),
                        erlang:element(2, Membership)
                    ),
                    {Tracker, Diff} = lightspeed@presence:leave(
                        erlang:element(3, Membership_state),
                        erlang:element(3, Membership),
                        erlang:element(4, Membership),
                        erlang:element(5, Membership)
                    ),
                    _pipe = {channel,
                        Broker,
                        Tracker,
                        erlang:element(4, Membership_state),
                        erlang:element(5, Membership_state),
                        erlang:element(6, Membership_state)},
                    emit(
                        _pipe,
                        {presence_diff, erlang:element(3, Membership), Diff}
                    )
            end,
            join(
                Membership_state@1,
                Client_id@1,
                Topic@1,
                User_id@1,
                Now_ms@1,
                <<"rejoined"/utf8>>
            );

        {leave, Client_id@2, Topic@2} ->
            leave(Channel, Client_id@2, Topic@2);

        {broadcast, Client_id@3, Topic@3, Payload} ->
            case has_membership(Channel, Client_id@3, Topic@3) of
                false ->
                    emit(
                        Channel,
                        {reply,
                            Client_id@3,
                            Topic@3,
                            <<"error"/utf8>>,
                            <<"not_joined"/utf8>>}
                    );

                true ->
                    {_, Deliveries} = lightspeed@pubsub:publish(
                        erlang:element(2, Channel),
                        Topic@3,
                        Payload
                    ),
                    Recipients = gleam@list:map(
                        Deliveries,
                        fun lightspeed@pubsub:delivery_subscriber/1
                    ),
                    Pushed = push_deliveries(Channel, Deliveries),
                    _pipe@1 = Pushed,
                    _pipe@2 = emit(
                        _pipe@1,
                        {broadcast_message, Topic@3, Payload, Recipients}
                    ),
                    emit(
                        _pipe@2,
                        {reply,
                            Client_id@3,
                            Topic@3,
                            <<"ok"/utf8>>,
                            <<"broadcast:"/utf8,
                                (erlang:integer_to_binary(
                                    erlang:length(Recipients)
                                ))/binary>>}
                    )
            end;

        {push, Client_id@4, Topic@4, Event@1, Payload@1} ->
            case has_membership(Channel, Client_id@4, Topic@4) of
                false ->
                    emit(
                        Channel,
                        {reply,
                            Client_id@4,
                            Topic@4,
                            <<"error"/utf8>>,
                            <<"not_joined"/utf8>>}
                    );

                true ->
                    _pipe@3 = Channel,
                    _pipe@4 = emit(
                        _pipe@3,
                        {push_message, Client_id@4, Topic@4, Event@1, Payload@1}
                    ),
                    emit(
                        _pipe@4,
                        {reply,
                            Client_id@4,
                            Topic@4,
                            <<"ok"/utf8>>,
                            <<"pushed"/utf8>>}
                    )
            end;

        {crash, Reason} ->
            _pipe@5 = {channel,
                erlang:element(2, Channel),
                erlang:element(3, Channel),
                erlang:element(4, Channel),
                erlang:element(5, Channel),
                true},
            emit(_pipe@5, {system, <<"crashed:"/utf8, Reason/binary>>});

        restart ->
            Channel
    end.

-file("src/lightspeed/channel.gleam", 138).
?DOC(" Stable event label.\n").
-spec event_label(event()) -> binary().
event_label(Event) ->
    case Event of
        {join, _, Topic, _, _} ->
            <<"join:"/utf8, Topic/binary>>;

        {rejoin, _, Topic@1, _, _} ->
            <<"rejoin:"/utf8, Topic@1/binary>>;

        {leave, _, Topic@2} ->
            <<"leave:"/utf8, Topic@2/binary>>;

        {broadcast, _, Topic@3, _} ->
            <<"broadcast:"/utf8, Topic@3/binary>>;

        {push, _, Topic@4, Event@1, _} ->
            <<<<<<"push:"/utf8, Topic@4/binary>>/binary, ":"/utf8>>/binary,
                Event@1/binary>>;

        {crash, Reason} ->
            <<"crash:"/utf8, Reason/binary>>;

        restart ->
            <<"restart"/utf8>>
    end.

-file("src/lightspeed/channel.gleam", 72).
?DOC(" Handle one channel event.\n").
-spec handle(channel(), event()) -> channel().
handle(Channel, Event) ->
    case erlang:element(6, Channel) of
        true ->
            case Event of
                restart ->
                    _pipe = {channel,
                        erlang:element(2, Channel),
                        erlang:element(3, Channel),
                        erlang:element(4, Channel),
                        erlang:element(5, Channel),
                        false},
                    emit(_pipe, {system, <<"restarted"/utf8>>});

                _ ->
                    emit(
                        Channel,
                        {system,
                            <<"ignored_while_crashed:"/utf8,
                                (event_label(Event))/binary>>}
                    )
            end;

        false ->
            apply_event(Channel, Event)
    end.

-file("src/lightspeed/channel.gleam", 92).
?DOC(" Flush outbox in emit order.\n").
-spec flush_outbox(channel()) -> {channel(), list(outbound())}.
flush_outbox(Channel) ->
    Outbox = lists:reverse(erlang:element(5, Channel)),
    {{channel,
            erlang:element(2, Channel),
            erlang:element(3, Channel),
            erlang:element(4, Channel),
            [],
            erlang:element(6, Channel)},
        Outbox}.

-file("src/lightspeed/channel.gleam", 98).
?DOC(" True when channel is crashed.\n").
-spec crashed(channel()) -> boolean().
crashed(Channel) ->
    erlang:element(6, Channel).

-file("src/lightspeed/channel.gleam", 103).
?DOC(" PubSub adapter label.\n").
-spec adapter_label(channel()) -> binary().
adapter_label(Channel) ->
    _pipe = erlang:element(2, Channel),
    _pipe@1 = lightspeed@pubsub:adapter(_pipe),
    lightspeed@pubsub:adapter_label(_pipe@1).

-file("src/lightspeed/channel.gleam", 110).
?DOC(" Subscribers for one topic.\n").
-spec subscribers(channel(), binary()) -> list(binary()).
subscribers(Channel, Topic) ->
    lightspeed@pubsub:subscribers(erlang:element(2, Channel), Topic).

-file("src/lightspeed/channel.gleam", 115).
?DOC(" Presence entries for one topic.\n").
-spec presences(channel(), binary()) -> list(lightspeed@presence:entry()).
presences(Channel, Topic) ->
    lightspeed@presence:list_topic(erlang:element(3, Channel), Topic).

-file("src/lightspeed/channel.gleam", 120).
?DOC(" Membership count.\n").
-spec membership_count(channel()) -> integer().
membership_count(Channel) ->
    erlang:length(erlang:element(4, Channel)).

-file("src/lightspeed/channel.gleam", 125).
?DOC(" Stable outbound label.\n").
-spec outbound_label(outbound()) -> binary().
outbound_label(Outbound) ->
    case Outbound of
        {reply, _, Topic, Status, _} ->
            <<<<<<"reply:"/utf8, Topic/binary>>/binary, ":"/utf8>>/binary,
                Status/binary>>;

        {push_message, _, Topic@1, Event, _} ->
            <<<<<<"push:"/utf8, Topic@1/binary>>/binary, ":"/utf8>>/binary,
                Event/binary>>;

        {broadcast_message, Topic@2, _, Recipients} ->
            <<<<<<"broadcast:"/utf8, Topic@2/binary>>/binary, ":"/utf8>>/binary,
                (erlang:integer_to_binary(erlang:length(Recipients)))/binary>>;

        {presence_diff, Topic@3, Diff} ->
            <<<<<<"presence:"/utf8, Topic@3/binary>>/binary, ":"/utf8>>/binary,
                (lightspeed@presence:diff_label(Diff))/binary>>;

        {system, Label} ->
            <<"system:"/utf8, Label/binary>>
    end.