src/lake_connection_state.erl

-module(lake_connection_state).

-include("lake_connection_state.hrl").

-export([new/3,
         publisher_ids_from_stream/2,
         subscription_ids_from_stream/2,
         inc_correlation_id/1,
         register_pending_request/4,
         remove_pending_request/2,
         add_subscription/4,
         remove_subscription/2,
         add_publisher/4,
         remove_publisher/2,
         stream_subscriptions/2,
         stream_publishers/2,
         subscriber_from_subscription_id/2,
         publisher_from_publisher_id/2,
         requestor_from_correlation_id/2]).

new(Socket, CorrelationId, Heartbeat) ->
    HeartbeatProcess =
        if Heartbeat > 0 ->
               {ok, HeartbeatProcess0} = lake_heartbeat:start_link(Heartbeat, self()),
               HeartbeatProcess0;
           true ->
               undefined
        end,
    #state{correlation_id = CorrelationId,
           socket = Socket,
           heartbeat = HeartbeatProcess}.

publisher_ids_from_stream(Stream, State) ->
    case State#state.publishers_by_stream of
        #{Stream := Publishers} ->
            sets:to_list(Publishers);
        _ ->
            []
    end.

subscription_ids_from_stream(Stream, State) ->
    case State#state.subscriptions_by_stream of
        #{Stream := Subscriptions} ->
            sets:to_list(Subscriptions);
        _ ->
            []
    end.

inc_correlation_id(State = #state{correlation_id = Corr}) ->
    State#state{correlation_id = Corr + 1}.

register_pending_request(Corr, From, Extra, State) ->
    PendingRequests = State#state.pending_requests,
    State#state{pending_requests = PendingRequests#{Corr => {From, Extra}, From => Corr}}.

remove_pending_request(Corr, State) ->
    #state{pending_requests = #{Corr := {From, _Extra}}} = State,
    State#state{pending_requests = maps:remove(From, maps:remove(Corr, State#state.pending_requests))}.

add_subscription(SubscriptionId, Subscriber, Stream, State) ->
    Subscriptions = State#state.subscriptions,
    State#state{subscriptions = Subscriptions#{SubscriptionId => {Subscriber, Stream}},
                subscriptions_by_stream =
                    add_to_subscriptions_by_stream(Stream,
                                                   SubscriptionId,
                                                   State#state.subscriptions_by_stream)}.

add_to_subscriptions_by_stream(Stream, SubscriptionId, SubscriptionsByStream) ->
    SubscriptionIds = maps:get(Stream, SubscriptionsByStream, sets:new()),
    SubscriptionsByStream#{Stream => sets:add_element(SubscriptionId, SubscriptionIds)}.

remove_subscription(SubscriptionId, State) ->
    #state{subscriptions = Subscriptions, subscriptions_by_stream = SubscriptionsByStream} = State,
    #{SubscriptionId := {_Subscriber, Stream}} = Subscriptions,
    State#state{subscriptions = maps:remove(SubscriptionId, Subscriptions),
                subscriptions_by_stream =
                    remove_subscription_by_stream(SubscriptionsByStream, Stream, SubscriptionId)}.

remove_subscription_by_stream(SubscriptionsByStream, Stream, SubscriptionId) ->
    #{Stream := Subscriptions0} = SubscriptionsByStream,
    Subscriptions1 = sets:del_element(SubscriptionId, Subscriptions0),
    case sets:is_empty(Subscriptions1) of
        true ->
            maps:remove(Stream, SubscriptionsByStream);
        false ->
            SubscriptionsByStream#{Stream := Subscriptions1}
    end.

add_publisher(PublisherId, Publisher, Stream, State) ->
    Publishers = State#state.publishers,
    State#state{publishers = Publishers#{PublisherId => {Publisher, Stream}},
                publishers_by_stream =
                    add_to_publishers_by_stream(State#state.publishers_by_stream, Stream, PublisherId)}.

add_to_publishers_by_stream(PublishersByStream, Stream, PublisherId) ->
    Set = maps:get(Stream, PublishersByStream, sets:new()),
    PublishersByStream#{Stream => sets:add_element(PublisherId, Set)}.

remove_publisher(PublisherId, State) ->
    #state{publishers = Publishers, publishers_by_stream = PublishersByStream} = State,
    #{PublisherId := {_Publisher, Stream}} = Publishers,
    State#state{publishers = maps:remove(PublisherId, State#state.publishers),
                publishers_by_stream = remove_publisher_by_stream(PublishersByStream, Stream, PublisherId)}.

remove_publisher_by_stream(PublishersByStream, Stream, PublisherId) ->
    #{Stream := Publishers0} = PublishersByStream,
    Publishers1 = sets:del_element(PublisherId, Publishers0),
    case sets:is_empty(Publishers1) of
        true ->
            maps:remove(Stream, PublishersByStream);
        false ->
            PublishersByStream#{Stream := Publishers1}
    end.

stream_subscriptions(Stream, State) ->
    SubscriptionIds = maps:get(Stream, State#state.subscriptions_by_stream, sets:new()),
    [subscriber_from_subscription_id(SubscriptionId, State)
     || SubscriptionId <- sets:to_list(SubscriptionIds)].

stream_publishers(Stream, State) ->
    PublisherIds = maps:get(Stream, State#state.publishers_by_stream, sets:new()),
    [publisher_from_publisher_id(PublisherId, State) || PublisherId <- sets:to_list(PublisherIds)].

subscriber_from_subscription_id(SubscriptionId, State) ->
    #state{subscriptions = #{SubscriptionId := {Subscriber, _Stream}}} = State,
    Subscriber.

publisher_from_publisher_id(PublisherId, State) ->
    #state{publishers = #{PublisherId := {Publisher, _Stream}}} = State,
    Publisher.

requestor_from_correlation_id(Corr, State) ->
    #state{pending_requests = #{Corr := {From, _Extra}}} = State,
    From.