-module(lake_connection_state).
-include("lake_connection_state.hrl").
-export([new/3,
publisher_ids_from_stream/2,
subscription_ids_from_stream/2,
inc_correlation_id/1,
register_pending_request/4,
remove_pending_request/2,
add_subscription/4,
remove_subscription/2,
add_publisher/4,
remove_publisher/2,
stream_subscriptions/2,
stream_publishers/2,
subscriber_from_subscription_id/2,
publisher_from_publisher_id/2,
requestor_from_correlation_id/2]).
%% @doc Build the initial connection state.
%%
%% Stores `Socket' and `CorrelationId' in a fresh `#state{}' record. When
%% `Heartbeat' compares greater than 0, `lake_heartbeat:start_link/2' is called
%% with `Heartbeat' and the calling process, and the process it returns is kept
%% in the `heartbeat' field; otherwise that field is set to `undefined'.
%% Any result other than `{ok, _}' from `start_link/2' fails the match and
%% crashes the caller.
new(Socket, CorrelationId, Heartbeat) ->
    HeartbeatRef =
        case Heartbeat > 0 of
            true ->
                {ok, Started} = lake_heartbeat:start_link(Heartbeat, self()),
                Started;
            false ->
                undefined
        end,
    #state{socket = Socket,
           correlation_id = CorrelationId,
           heartbeat = HeartbeatRef}.
%% @doc List the publisher ids indexed under `Stream'.
%%
%% Reads the `publishers_by_stream' field of `State' and returns the members of
%% the set stored for `Stream', in the order `sets:to_list/1' yields them.
%% Returns `[]' when there is no entry for `Stream'.
publisher_ids_from_stream(Stream, State) ->
    ByStream = State#state.publishers_by_stream,
    if
        is_map_key(Stream, ByStream) ->
            sets:to_list(map_get(Stream, ByStream));
        true ->
            []
    end.
%% @doc List the subscription ids indexed under `Stream'.
%%
%% Reads the `subscriptions_by_stream' field of `State' and returns the members
%% of the set stored for `Stream', in the order `sets:to_list/1' yields them.
%% Returns `[]' when there is no entry for `Stream'.
subscription_ids_from_stream(Stream, State) ->
    ByStream = State#state.subscriptions_by_stream,
    if
        is_map_key(Stream, ByStream) ->
            sets:to_list(map_get(Stream, ByStream));
        true ->
            []
    end.
%% @doc Return `State' with its `correlation_id' field increased by one.
inc_correlation_id(#state{correlation_id = Current} = State) ->
    Next = Current + 1,
    State#state{correlation_id = Next}.
%% @doc Record a request in the `pending_requests' map of `State'.
%%
%% Two entries are written: `Corr' maps to `{From, Extra}', and `From' maps
%% back to `Corr'. Anything already stored under either key is overwritten.
register_pending_request(Corr, From, Extra, State) ->
    Pending0 = State#state.pending_requests,
    %% Forward entry first, reverse entry second, so the reverse entry wins
    %% if both keys happen to be equal (same as a single map update).
    Pending1 = Pending0#{Corr => {From, Extra}},
    State#state{pending_requests = Pending1#{From => Corr}}.
%% @doc Drop the pending request stored under `Corr'.
%%
%% Looks up the `{From, Extra}' entry for `Corr' in `pending_requests' and
%% removes both that entry and the reverse entry keyed by `From'. The lookup
%% is a plain match, so a `Corr' without such an entry raises `badmatch'.
remove_pending_request(Corr, State) ->
    #state{pending_requests = Pending = #{Corr := {From, _Extra}}} = State,
    State#state{pending_requests = maps:without([Corr, From], Pending)}.
%% @doc Register a subscription in `State'.
%%
%% Stores `{Subscriber, Stream}' under `SubscriptionId' in the `subscriptions'
%% map and adds `SubscriptionId' to the set kept for `Stream' in
%% `subscriptions_by_stream'.
%%
%% NOTE(review): if `SubscriptionId' is already registered for a different
%% stream, the entry in that stream's set is left in place - confirm callers
%% never reuse an id without removing it first.
add_subscription(SubscriptionId, Subscriber, Stream, State) ->
    ById = maps:put(SubscriptionId, {Subscriber, Stream}, State#state.subscriptions),
    ByStream =
        add_to_subscriptions_by_stream(Stream,
                                       SubscriptionId,
                                       State#state.subscriptions_by_stream),
    State#state{subscriptions = ById, subscriptions_by_stream = ByStream}.
%% Add `SubscriptionId' to the set stored under `Stream' in
%% `SubscriptionsByStream', creating a one-element set when the stream has no
%% entry yet. Returns the updated map.
add_to_subscriptions_by_stream(Stream, SubscriptionId, SubscriptionsByStream) ->
    Insert = fun(Ids) -> sets:add_element(SubscriptionId, Ids) end,
    maps:update_with(Stream, Insert, Insert(sets:new()), SubscriptionsByStream).
%% @doc Unregister the subscription `SubscriptionId' from `State'.
%%
%% Finds the stream recorded for the id in the `subscriptions' map, deletes the
%% id from that map, and removes it from the stream's set in
%% `subscriptions_by_stream'. An id that is not registered fails the lookup
%% match and raises `badmatch'.
remove_subscription(SubscriptionId, State) ->
    #state{subscriptions = ById, subscriptions_by_stream = ByStream} = State,
    #{SubscriptionId := {_Subscriber, Stream}} = ById,
    RemainingById = maps:remove(SubscriptionId, ById),
    RemainingByStream = remove_subscription_by_stream(ByStream, Stream, SubscriptionId),
    State#state{subscriptions = RemainingById,
                subscriptions_by_stream = RemainingByStream}.
%% Remove `SubscriptionId' from the set stored under `Stream'. When that leaves
%% the set empty the `Stream' key itself is dropped, otherwise the shrunken set
%% replaces the old one. `Stream' must be present in the map; a missing key
%% raises `badmatch'.
remove_subscription_by_stream(SubscriptionsByStream, Stream, SubscriptionId) ->
    #{Stream := Ids} = SubscriptionsByStream,
    Remaining = sets:del_element(SubscriptionId, Ids),
    case sets:size(Remaining) of
        0 ->
            maps:remove(Stream, SubscriptionsByStream);
        _ ->
            maps:update(Stream, Remaining, SubscriptionsByStream)
    end.
%% @doc Register a publisher in `State'.
%%
%% Stores `{Publisher, Stream}' under `PublisherId' in the `publishers' map and
%% adds `PublisherId' to the set kept for `Stream' in `publishers_by_stream'.
%%
%% NOTE(review): if `PublisherId' is already registered for a different
%% stream, the entry in that stream's set is left in place - confirm callers
%% never reuse an id without removing it first.
add_publisher(PublisherId, Publisher, Stream, State) ->
    ById = maps:put(PublisherId, {Publisher, Stream}, State#state.publishers),
    ByStream =
        add_to_publishers_by_stream(State#state.publishers_by_stream, Stream, PublisherId),
    State#state{publishers = ById, publishers_by_stream = ByStream}.
%% Add `PublisherId' to the set stored under `Stream' in `PublishersByStream',
%% creating a one-element set when the stream has no entry yet. Returns the
%% updated map.
add_to_publishers_by_stream(PublishersByStream, Stream, PublisherId) ->
    Insert = fun(Ids) -> sets:add_element(PublisherId, Ids) end,
    maps:update_with(Stream, Insert, Insert(sets:new()), PublishersByStream).
%% @doc Unregister the publisher `PublisherId' from `State'.
%%
%% Finds the stream recorded for the id in the `publishers' map, deletes the id
%% from that map, and removes it from the stream's set in
%% `publishers_by_stream'. An id that is not registered fails the lookup match
%% and raises `badmatch'.
remove_publisher(PublisherId, State) ->
    #state{publishers = ById, publishers_by_stream = ByStream} = State,
    #{PublisherId := {_Publisher, Stream}} = ById,
    RemainingById = maps:remove(PublisherId, ById),
    RemainingByStream = remove_publisher_by_stream(ByStream, Stream, PublisherId),
    State#state{publishers = RemainingById,
                publishers_by_stream = RemainingByStream}.
%% Remove `PublisherId' from the set stored under `Stream'. When that leaves
%% the set empty the `Stream' key itself is dropped, otherwise the shrunken set
%% replaces the old one. `Stream' must be present in the map; a missing key
%% raises `badmatch'.
remove_publisher_by_stream(PublishersByStream, Stream, PublisherId) ->
    #{Stream := Ids} = PublishersByStream,
    Remaining = sets:del_element(PublisherId, Ids),
    case sets:size(Remaining) of
        0 ->
            maps:remove(Stream, PublishersByStream);
        _ ->
            maps:update(Stream, Remaining, PublishersByStream)
    end.
%% @doc Return the subscriber of every subscription indexed under `Stream'.
%%
%% Takes the id set for `Stream' from `subscriptions_by_stream' (an empty set
%% when the stream has no entry) and resolves each id with
%% `subscriber_from_subscription_id/2'. The result is a list with one
%% subscriber per id.
stream_subscriptions(Stream, State) ->
    Ids = maps:get(Stream, State#state.subscriptions_by_stream, sets:new()),
    Resolve = fun(SubscriptionId) ->
                      subscriber_from_subscription_id(SubscriptionId, State)
              end,
    lists:map(Resolve, sets:to_list(Ids)).
%% @doc Return the publisher of every publisher id indexed under `Stream'.
%%
%% Takes the id set for `Stream' from `publishers_by_stream' (an empty set when
%% the stream has no entry) and resolves each id with
%% `publisher_from_publisher_id/2'. The result is a list with one publisher
%% per id.
stream_publishers(Stream, State) ->
    Ids = maps:get(Stream, State#state.publishers_by_stream, sets:new()),
    Resolve = fun(PublisherId) ->
                      publisher_from_publisher_id(PublisherId, State)
              end,
    lists:map(Resolve, sets:to_list(Ids)).
%% @doc Return the subscriber stored for `SubscriptionId'.
%%
%% The id must have a `{Subscriber, Stream}' entry in the `subscriptions' map
%% of `State'; otherwise the match fails with `badmatch'.
subscriber_from_subscription_id(SubscriptionId, State) ->
    #state{subscriptions = #{SubscriptionId := {Owner, _}}} = State,
    Owner.
%% @doc Return the publisher stored for `PublisherId'.
%%
%% The id must have a `{Publisher, Stream}' entry in the `publishers' map of
%% `State'; otherwise the match fails with `badmatch'.
publisher_from_publisher_id(PublisherId, State) ->
    #state{publishers = #{PublisherId := {Owner, _}}} = State,
    Owner.
%% @doc Return the `From' value recorded for the pending request `Corr'.
%%
%% `Corr' must have a `{From, Extra}' entry in the `pending_requests' map of
%% `State'; otherwise the match fails with `badmatch'.
requestor_from_correlation_id(Corr, State) ->
    #state{pending_requests = #{Corr := {Requestor, _}}} = State,
    Requestor.