%% src/lake_connection_state.erl

%%% @doc Constructor, accessors and updaters for the `#state{}' record of a
%%% connection. The record itself is not defined in this file; presumably it is
%%% declared in the included `lake_connection_state.hrl'.
%%%
%%% The functions below maintain:
%%% <ul>
%%%   <li>the running correlation id and the map of pending requests,</li>
%%%   <li>subscriptions, indexed by subscription id and by stream,</li>
%%%   <li>publishers, indexed by publisher id and by stream.</li>
%%% </ul>
-module(lake_connection_state).

-include("lake_connection_state.hrl").

-export([
    new/3,
    publisher_ids_from_stream/2,
    subscription_ids_from_stream/2,
    inc_correlation_id/1,
    register_pending_request/4,
    remove_pending_request/2,
    add_subscription/4,
    remove_subscription/2,
    add_publisher/4,
    remove_publisher/2,
    stream_subscriptions/2,
    stream_publishers/2,
    subscriber_from_subscription_id/2,
    publisher_from_publisher_id/2,
    requestor_from_correlation_id/2
]).

%% @doc Builds the initial connection state.
%%
%% `Socket' and `CorrelationId' are stored unchanged. When `Heartbeat' compares
%% greater than 0, a heartbeat process is started via
%% `lake_heartbeat:start_link/2' (with the calling process as second argument)
%% and kept in the `heartbeat' field; otherwise that field is `undefined'.
%% Every other record field keeps its default.
%%
%% Crashes with `badmatch' if `lake_heartbeat:start_link/2' returns anything
%% other than `{ok, Pid}'.
new(Socket, CorrelationId, Heartbeat) ->
    #state{
        socket = Socket,
        correlation_id = CorrelationId,
        heartbeat = start_heartbeat_if_enabled(Heartbeat)
    }.

%% Starts the heartbeat process for a positive `Heartbeat' and returns its pid;
%% returns `undefined' without starting anything otherwise.
start_heartbeat_if_enabled(Heartbeat) when Heartbeat > 0 ->
    {ok, Pid} = lake_heartbeat:start_link(Heartbeat, self()),
    Pid;
start_heartbeat_if_enabled(_Disabled) ->
    undefined.

%% @doc Returns the ids of all publishers indexed under `Stream' as a list.
%%
%% Yields `[]' when nothing is indexed under `Stream'. The order of the ids is
%% whatever `sets:to_list/1' produces.
publisher_ids_from_stream(Stream, State) ->
    publisher_ids_or_empty(Stream, State#state.publishers_by_stream).

%% Looks `Stream' up in the by-stream index. A missing key makes the guard
%% fail instead of crashing, which routes the call to the empty-list clause.
publisher_ids_or_empty(Stream, ByStream) when is_map_key(Stream, ByStream) ->
    sets:to_list(map_get(Stream, ByStream));
publisher_ids_or_empty(_Stream, _ByStream) ->
    [].

%% @doc Returns the ids of all subscriptions indexed under `Stream' as a list.
%%
%% Yields `[]' when nothing is indexed under `Stream'. The order of the ids is
%% whatever `sets:to_list/1' produces.
subscription_ids_from_stream(Stream, State) ->
    subscription_ids_or_empty(Stream, State#state.subscriptions_by_stream).

%% Looks `Stream' up in the by-stream index. A missing key makes the guard
%% fail instead of crashing, which routes the call to the empty-list clause.
subscription_ids_or_empty(Stream, ByStream) when is_map_key(Stream, ByStream) ->
    sets:to_list(map_get(Stream, ByStream));
subscription_ids_or_empty(_Stream, _ByStream) ->
    [].

%% @doc Returns `State' with its `correlation_id' field increased by one.
inc_correlation_id(#state{correlation_id = Current} = State) ->
    Next = Current + 1,
    State#state{correlation_id = Next}.

%% @doc Records a request that is awaiting its response.
%%
%% Two entries are written into the `pending_requests' map:
%% `Corr => {From, Extra}' and the reverse lookup `From => Corr'.
%% Existing entries under either key are overwritten.
register_pending_request(Corr, From, Extra, State) ->
    Pending0 = State#state.pending_requests,
    Pending1 = Pending0#{Corr => {From, Extra}},
    %% The reverse entry is written last, so it wins should both keys coincide.
    Pending2 = Pending1#{From => Corr},
    State#state{pending_requests = Pending2}.

%% @doc Forgets the pending request registered under `Corr'.
%%
%% Removes both the `Corr' entry and the reverse entry keyed by the `From'
%% stored with it. Crashes with `badmatch' when `Corr' is not a pending
%% request (or is not mapped to a `{From, Extra}' pair).
remove_pending_request(Corr, State) ->
    %% Assertive match: the request must exist; it also yields the reverse key.
    #state{pending_requests = #{Corr := {From, _Extra}}} = State,
    Pending = State#state.pending_requests,
    State#state{pending_requests = maps:without([Corr, From], Pending)}.

%% @doc Registers a subscription in both indexes of the state.
%%
%% `subscriptions' gets `SubscriptionId => {Subscriber, Stream}' (overwriting
%% any previous entry for that id) and `SubscriptionId' is added to the set
%% kept for `Stream' in `subscriptions_by_stream'.
add_subscription(SubscriptionId, Subscriber, Stream, State) ->
    ById = (State#state.subscriptions)#{SubscriptionId => {Subscriber, Stream}},
    ByStream = add_to_subscriptions_by_stream(
        Stream,
        SubscriptionId,
        State#state.subscriptions_by_stream
    ),
    State#state{
        subscriptions = ById,
        subscriptions_by_stream = ByStream
    }.

%% Adds `SubscriptionId' to the set stored under `Stream', creating a fresh
%% set when the stream has no entry yet. Returns the updated map.
add_to_subscriptions_by_stream(Stream, SubscriptionId, SubscriptionsByStream) ->
    Insert = fun(Ids) -> sets:add_element(SubscriptionId, Ids) end,
    %% The initial value is only used when `Stream' is not yet a key.
    maps:update_with(Stream, Insert, Insert(sets:new()), SubscriptionsByStream).

%% @doc Removes the subscription `SubscriptionId' from both indexes.
%%
%% The stream is looked up from the `subscriptions' entry, so callers pass
%% only the id. Crashes with `badmatch' when the id is unknown.
remove_subscription(SubscriptionId, State) ->
    #state{
        subscriptions = ById0,
        subscriptions_by_stream = ByStream0
    } = State,
    %% Assertive match: the subscription must exist; it tells us the stream.
    #{SubscriptionId := {_Subscriber, Stream}} = ById0,
    ByStream1 = remove_subscription_by_stream(ByStream0, Stream, SubscriptionId),
    ById1 = maps:remove(SubscriptionId, ById0),
    State#state{
        subscriptions = ById1,
        subscriptions_by_stream = ByStream1
    }.

%% Drops `SubscriptionId' from the set stored under `Stream'. When the set
%% becomes empty the `Stream' key itself is removed, so the map never holds
%% empty sets. Crashes with `badmatch' when `Stream' is not a key of the map.
remove_subscription_by_stream(SubscriptionsByStream, Stream, SubscriptionId) ->
    #{Stream := Before} = SubscriptionsByStream,
    Remaining = sets:del_element(SubscriptionId, Before),
    case sets:size(Remaining) of
        0 ->
            maps:remove(Stream, SubscriptionsByStream);
        _NonZero ->
            SubscriptionsByStream#{Stream := Remaining}
    end.

%% @doc Registers a publisher in both indexes of the state.
%%
%% `publishers' gets `PublisherId => {Publisher, Stream}' (overwriting any
%% previous entry for that id) and `PublisherId' is added to the set kept for
%% `Stream' in `publishers_by_stream'.
add_publisher(PublisherId, Publisher, Stream, State) ->
    ById = (State#state.publishers)#{PublisherId => {Publisher, Stream}},
    ByStream = add_to_publishers_by_stream(
        State#state.publishers_by_stream,
        Stream,
        PublisherId
    ),
    State#state{
        publishers = ById,
        publishers_by_stream = ByStream
    }.

%% Adds `PublisherId' to the set stored under `Stream', creating a fresh set
%% when the stream has no entry yet. Returns the updated map.
add_to_publishers_by_stream(PublishersByStream, Stream, PublisherId) ->
    Insert = fun(Ids) -> sets:add_element(PublisherId, Ids) end,
    %% The initial value is only used when `Stream' is not yet a key.
    maps:update_with(Stream, Insert, Insert(sets:new()), PublishersByStream).

%% @doc Removes the publisher `PublisherId' from both indexes.
%%
%% The stream is looked up from the `publishers' entry, so callers pass only
%% the id. Crashes with `badmatch' when the id is unknown.
remove_publisher(PublisherId, State) ->
    #state{
        publishers = ById0,
        publishers_by_stream = ByStream0
    } = State,
    %% Assertive match: the publisher must exist; it tells us the stream.
    #{PublisherId := {_Publisher, Stream}} = ById0,
    ByStream1 = remove_publisher_by_stream(ByStream0, Stream, PublisherId),
    ById1 = maps:remove(PublisherId, ById0),
    State#state{
        publishers = ById1,
        publishers_by_stream = ByStream1
    }.

%% Drops `PublisherId' from the set stored under `Stream'. When the set becomes
%% empty the `Stream' key itself is removed, so the map never holds empty sets.
%% Crashes with `badmatch' when `Stream' is not a key of the map.
remove_publisher_by_stream(PublishersByStream, Stream, PublisherId) ->
    #{Stream := Before} = PublishersByStream,
    Remaining = sets:del_element(PublisherId, Before),
    case sets:size(Remaining) of
        0 ->
            maps:remove(Stream, PublishersByStream);
        _NonZero ->
            PublishersByStream#{Stream := Remaining}
    end.

%% @doc Returns the subscribers of every subscription indexed under `Stream'.
%%
%% Unlike `subscription_ids_from_stream/2', this crashes with `badmatch' when
%% `Stream' has no entry in `subscriptions_by_stream'. The order of the result
%% follows `sets:to_list/1'.
stream_subscriptions(Stream, State) ->
    #state{subscriptions_by_stream = #{Stream := IdSet}} = State,
    ToSubscriber = fun(Id) -> subscriber_from_subscription_id(Id, State) end,
    lists:map(ToSubscriber, sets:to_list(IdSet)).

%% @doc Returns the publishers of every publisher id indexed under `Stream'.
%%
%% Unlike `publisher_ids_from_stream/2', this crashes with `badmatch' when
%% `Stream' has no entry in `publishers_by_stream'. The order of the result
%% follows `sets:to_list/1'.
stream_publishers(Stream, State) ->
    #state{publishers_by_stream = #{Stream := IdSet}} = State,
    ToPublisher = fun(Id) -> publisher_from_publisher_id(Id, State) end,
    lists:map(ToPublisher, sets:to_list(IdSet)).

%% @doc Returns the subscriber stored for `SubscriptionId'.
%%
%% Crashes with `badmatch' when the id is not present in `subscriptions'.
subscriber_from_subscription_id(SubscriptionId, State) ->
    #state{subscriptions = #{SubscriptionId := {Owner, _}}} = State,
    Owner.

%% @doc Returns the publisher stored for `PublisherId'.
%%
%% Crashes with `badmatch' when the id is not present in `publishers'.
publisher_from_publisher_id(PublisherId, State) ->
    #state{publishers = #{PublisherId := {Owner, _}}} = State,
    Owner.

%% @doc Returns the `From' that was registered together with `Corr' by
%% `register_pending_request/4'.
%%
%% Crashes with `badmatch' when `Corr' is not a pending request.
requestor_from_correlation_id(Corr, State) ->
    #state{pending_requests = #{Corr := {Requestor, _}}} = State,
    Requestor.