-module(lake_connection).
-export([connect/6,
tls_connect/6,
stop/1]).
-export([declare_publisher/4,
publish_sync/3,
publish_async/3,
query_publisher_sequence/3,
delete_publisher/2,
credit_async/3,
create/3,
delete/2,
subscribe/6,
store_offset/4,
query_offset/3,
unsubscribe/2,
metadata/2,
route/3,
partitions/2,
stream_stats/2,
consumer_update_response/4]).
-behaviour(gen_server).
-export([init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2]).
-include("response_codes.hrl").
-include("lake_connection_state.hrl").
tls_connect(Host, Port, User, Password, Vhost, Options) ->
gen_server:start_link(?MODULE, [tls, [Host, Port, User, Password, Vhost, Options]], []).
connect(Host, Port, User, Password, Vhost, Options) ->
gen_server:start_link(?MODULE, [plain, [Host, Port, User, Password, Vhost, Options]], []).
%% FIXME creating the message binaries should be done in the caller if possible
%% (not possible if the caller needs to be blocked, ie. for requests with
%% CorrId)
declare_publisher(Connection, Stream, PublisherId, PublisherReference) ->
Result =
gen_server:call(Connection, {declare_publisher, self(), Stream, PublisherId, PublisherReference}),
case Result of
{declare_publisher_response, _, ?RESPONSE_OK} ->
ok;
{declare_publisher_response, _, ResponseCode} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
publish_sync(Connection, PublisherId, Messages) ->
ok = publish_async(Connection, PublisherId, Messages),
%% We need to wait for all confirmations
MessageCount = length(Messages),
wait_for_confirmations(MessageCount, []).
publish_async(Connection, PublisherId, Messages) ->
%% FIXME encode messages here?
ok = gen_server:call(Connection, {publish_async, PublisherId, Messages}).
wait_for_confirmations(0, PublishingIds) ->
[case Result of
{Id, Code} ->
{Id, lake_utils:response_code_to_atom(Code)};
Id ->
{Id, lake_utils:response_code_to_atom(?RESPONSE_OK)}
end
|| Result <- PublishingIds];
wait_for_confirmations(Count, PublishingIds0) ->
receive
{publish_confirm, _PublisherId, PublishingIdCount, PublishingIds} ->
wait_for_confirmations(Count - PublishingIdCount, PublishingIds ++ PublishingIds0);
{publish_error, _PublisherId, PublishingIdCount, PublishingCodeById} ->
wait_for_confirmations(Count - PublishingIdCount, PublishingCodeById ++ PublishingIds0)
end.
query_publisher_sequence(Connection, PublisherReference, Stream) ->
case gen_server:call(Connection, {query_publisher_sequence, PublisherReference, Stream}) of
{query_publisher_sequence_response, _, ?RESPONSE_OK, Sequence} ->
{ok, Sequence};
{query_publisher_sequence_response, _, ResponseCode, _} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
delete_publisher(Connection, PublisherId) ->
case gen_server:call(Connection, {delete_publisher, PublisherId}) of
{delete_publisher_response, _, ?RESPONSE_OK} ->
ok;
{delete_publisher_response, _, ResponseCode} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
credit_async(Connection, SubscriptionId, Credit) ->
gen_server:call(Connection, {credit, SubscriptionId, Credit}).
create(Connection, Stream, Arguments) ->
case gen_server:call(Connection, {create, Stream, Arguments}) of
{create_response, _, ?RESPONSE_OK} ->
ok;
{create_response, _, ?RESPONSE_STREAM_ALREADY_EXISTS} ->
ok;
{create_response, _, ResponseCode} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
delete(Connection, Stream) ->
case gen_server:call(Connection, {delete, Stream}) of
{delete_response, _, ?RESPONSE_OK} ->
ok;
{delete_response, _, ResponseCode} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
subscribe(Connection, Stream, SubscriptionId, OffsetDefinition, Credit, Properties) ->
Subscriber = self(),
Result =
gen_server:call(Connection,
{subscribe,
Subscriber,
Stream,
SubscriptionId,
OffsetDefinition,
Credit,
Properties}),
case Result of
{subscribe_response, _, ?RESPONSE_OK} ->
ok;
{subscribe_response, _, ResponseCode} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
store_offset(Connection, PublisherReference, Stream, Offset) ->
gen_server:call(Connection, {store_offset, PublisherReference, Stream, Offset}).
query_offset(Connection, PublisherReference, Stream) ->
case gen_server:call(Connection, {query_offset, PublisherReference, Stream}) of
{query_offset_response, _, ?RESPONSE_OK, Offset} ->
{ok, Offset};
{query_offset_response, _, ResponseCode, _Offset} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
unsubscribe(Connection, SubscriptionId) ->
case gen_server:call(Connection, {unsubscribe, SubscriptionId}) of
{unsubscribe_response, _CorrelationId, ?RESPONSE_OK} ->
ok;
{unsubscribe_response, _CorrelationId, ResponseCode} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
metadata(Connection, Streams) ->
{metadata_response, _CorrelationId, Endpoints, Metadata} =
gen_server:call(Connection, {metadata, Streams}),
{ok, Endpoints, Metadata}.
route(Connection, RoutingKey, SuperStream) ->
case gen_server:call(Connection, {route, RoutingKey, SuperStream}) of
{route_response, _CorrelationId, ?RESPONSE_OK, Streams} ->
{ok, Streams};
{route_response, _CorrelationId, ResponseCode, _} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
partitions(Connection, SuperStream) ->
case gen_server:call(Connection, {partitions, SuperStream}) of
{partitions_response, _CorrelationId, ?RESPONSE_OK, Streams} ->
{ok, Streams};
{partitions_response, _CorrelationId, ResponseCode, _} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
stream_stats(Connection, SuperStream) ->
case gen_server:call(Connection, {stream_stats, SuperStream}) of
{stream_stats_response, _CorrelationId, ?RESPONSE_OK, Streams} ->
{ok, Streams};
{stream_stats_response, _CorrelationId, ResponseCode, _} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
consumer_update_response(Connection, CorrelationId, ResponseCode, OffsetSpecification) ->
gen_server:call(Connection,
{consumer_update_response, CorrelationId, ResponseCode, OffsetSpecification}).
stop(Connection) ->
case gen_server:call(Connection, {close, ?RESPONSE_OK, <<"stop">>}) of
{close_response, _Corr, ?RESPONSE_OK} ->
ok;
{close_response, _Corr, ResponseCode} ->
{error, lake_utils:response_code_to_atom(ResponseCode)}
end.
init([Type, ConnectionArgs]) ->
case create_connection(Type, ConnectionArgs) of
E = {error, _} ->
E;
{ok, {Socket, _FrameMax, NegotiatedHeartbeat}} ->
{ok, Heartbeat} = heartbeat_process(NegotiatedHeartbeat),
{ok,
#state{correlation_id = 3, % sent two messages to establish the connection
socket = Socket,
heartbeat = Heartbeat}}
end.
create_connection(tls, ConnectionArgs) ->
lake_raw_connection:tls_connect(ConnectionArgs);
create_connection(plain, ConnectionArgs) ->
lake_raw_connection:connect(ConnectionArgs).
heartbeat_process(NegotiatedHeartbeat) when NegotiatedHeartbeat > 0 ->
lake_heartbeat:start_link(NegotiatedHeartbeat, self());
heartbeat_process(_) ->
{ok, undefined}.
handle_call({declare_publisher, Publisher, Stream, PublisherId, PublisherReference},
From,
State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
DeclarePublisher = lake_messages:declare_publisher(Corr, Stream, PublisherId, PublisherReference),
ok = lake_utils:send_message(Socket, DeclarePublisher),
State1 =
lake_connection_state:register_pending_request(Corr,
From,
{Publisher, PublisherId, Stream},
State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({publish_async, PublisherId, Messages}, From, State) ->
Socket = State#state.socket,
Publish = lake_messages:publish(PublisherId, Messages),
gen_server:reply(From, ok),
ok = lake_utils:send_message(Socket, Publish),
{noreply, State};
handle_call({query_publisher_sequence, PublisherReference, Stream}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
QueryPublisherSequence = lake_messages:query_publisher_sequence(Corr, PublisherReference, Stream),
ok = lake_utils:send_message(Socket, QueryPublisherSequence),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({delete_publisher, PublisherId}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
DeletePublisher = lake_messages:delete_publisher(Corr, PublisherId),
ok = lake_utils:send_message(Socket, DeletePublisher),
State1 = lake_connection_state:register_pending_request(Corr, From, PublisherId, State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({credit, SubscriptionId, Credit}, _From, State) ->
Socket = State#state.socket,
case State#state.subscriptions of
#{SubscriptionId := _} ->
CreditMessage = lake_messages:credit(SubscriptionId, Credit),
{reply, lake_utils:send_message(Socket, CreditMessage), State};
_ ->
{reply, {error, unknown_subscription}, State}
end;
handle_call({create, Stream, Arguments}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Create = lake_messages:create(Corr, Stream, Arguments),
ok = lake_utils:send_message(Socket, Create),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({delete, Stream}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Delete = lake_messages:delete(Corr, Stream),
ok = lake_utils:send_message(Socket, Delete),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({subscribe, Subscriber, Stream, SubscriptionId, OffsetDefinition, Credit, Properties},
From,
State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Subscribe =
lake_messages:subscribe(Corr, Stream, SubscriptionId, OffsetDefinition, Credit, Properties),
ok = lake_utils:send_message(Socket, Subscribe),
State1 =
lake_connection_state:register_pending_request(Corr,
From,
{Subscriber, SubscriptionId, Stream},
State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({store_offset, PublisherReference, Stream, Offset}, _From, State) ->
Socket = State#state.socket,
StoreOffset = lake_messages:store_offset(PublisherReference, Stream, Offset),
{reply, lake_utils:send_message(Socket, StoreOffset), State};
handle_call({query_offset, PublisherReference, Stream}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
QueryOffset = lake_messages:query_offset(Corr, PublisherReference, Stream),
ok = lake_utils:send_message(Socket, QueryOffset),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({unsubscribe, SubscriptionId}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Unsubscribe = lake_messages:unsubscribe(Corr, SubscriptionId),
ok = lake_utils:send_message(Socket, Unsubscribe),
State1 = lake_connection_state:register_pending_request(Corr, From, SubscriptionId, State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({metadata, Streams}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Metadata = lake_messages:metadata(Corr, Streams),
ok = lake_utils:send_message(Socket, Metadata),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({close, ResponseCode, Reason}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Close = lake_messages:close(Corr, ResponseCode, Reason),
ok = lake_utils:send_message(Socket, Close),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
%% When shutting down, we need to preemptively stop the heartbeat process. The process might
%% send a heartbeat request during the time this connection is still alive, but the socket is
%% already closed. That would result in a crash.
case State0#state.heartbeat of
undefined ->
ok;
HB ->
flush_heartbeat_messages(),
ok = gen_server:stop(HB)
end,
{noreply, State2};
handle_call({route, RoutingKey, SuperStream}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Frame = lake_messages:route(Corr, RoutingKey, SuperStream),
ok = lake_utils:send_message(Socket, Frame),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({partitions, SuperStream}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Frame = lake_messages:partitions(Corr, SuperStream),
ok = lake_utils:send_message(Socket, Frame),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({stream_stats, SuperStream}, From, State0) ->
Corr = State0#state.correlation_id,
Socket = State0#state.socket,
Frame = lake_messages:stream_stats(Corr, SuperStream),
ok = lake_utils:send_message(Socket, Frame),
State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
State2 = lake_connection_state:inc_correlation_id(State1),
{noreply, State2};
handle_call({consumer_update_response, Corr, ResponseCode, OffsetSpecification}, _From, State) ->
Socket = State#state.socket,
Metadata = lake_messages:consumer_update_response(Corr, ResponseCode, OffsetSpecification),
ok = lake_utils:send_message(Socket, Metadata),
{reply, ok, State};
handle_call({debug, forward, Message}, _From, State = #state{socket = {Module, Socket}}) ->
%% This call is meant for testing.
{reply, Module:send(Socket, Message), State};
handle_call(Call, _From, State) ->
{reply, {unknown, Call}, State}.
handle_cast(heartbeat, State) ->
Socket = State#state.socket,
Heartbeat = lake_messages:heartbeat(),
ok = lake_utils:send_message(Socket, Heartbeat),
{noreply, State};
handle_cast(_Cast, State) ->
{noreply, State}.
%% FIXME can we avoid parsing here? Only determine where the message should end up and forward_and_reply it to the owner
%% This might require to copy the binary slices (if the buffer holds multiple messages) - a large message might be referenced by multiple subscribers.
handle_info({tcp, Socket, Packet}, State = #state{socket = {_Module, Socket}}) ->
inet:setopts(Socket, [{active, once}]),
handle_packet(Packet, State);
handle_info({ssl, Socket, Packet}, State = #state{socket = {_Module, Socket}}) ->
ssl:setopts(Socket, [{active, once}]),
handle_packet(Packet, State);
handle_info({tcp_closed, Port}, State = #state{socket = {_Module, Port}}) ->
{stop, normal, State};
handle_info({ssl_closed, Port}, State = #state{socket = {_Module, Port}}) ->
{stop, normal, State}.
handle_packet(Packet, State0 = #state{rx_buf = Buf0}) ->
{Messages, BufRest} = split_into_messages(<<Buf0/binary, Packet/binary>>),
Socket = State0#state.socket,
State1 = add_new_subscriptions(Messages, State0),
State2 = add_new_publishers(Messages, State1),
forward_and_reply(Messages, State2),
State3 = clean_publishers(Messages, State2),
State4 = clean_subscriptions(Messages, State3),
State5 = clean_pending_requests(Messages, State4),
State6 = State5#state{rx_buf = BufRest},
%% Check if the remote told us to close the connection
case lists:keyfind(close, 1, Messages) of
{close, Corr, _ResponseCode, Reason} ->
CloseResponse = lake_messages:close_response(Corr, ?RESPONSE_OK),
ok = lake_utils:send_message(Socket, CloseResponse),
{stop, {close, Reason}, State6};
false ->
{noreply, State6}
end.
terminate(Reason, _State) ->
Reason.
split_into_messages(Buffer) ->
split_into_messages(Buffer, []).
split_into_messages(<<Size:32, Buf:Size/binary, Rest/binary>>, Acc) ->
split_into_messages(Rest, [lake_messages:parse(Buf) | Acc]);
split_into_messages(Rest, Acc) ->
{lists:reverse(Acc), Rest}.
add_new_subscriptions(Messages, State) ->
lists:foldl(fun add_new_subscriptions1/2, State, Messages).
add_new_subscriptions1({subscribe_response, Corr, ?RESPONSE_OK}, State0) ->
#state{pending_requests = #{Corr := {_From, {Subscriber, SubscriptionId, Stream}}}} = State0,
lake_connection_state:add_subscription(SubscriptionId, Subscriber, Stream, State0);
add_new_subscriptions1({subscribe_response, _Corr, _ResponseCode}, State) ->
State;
add_new_subscriptions1(_Other, State) ->
State.
add_new_publishers(Messages, State) ->
lists:foldl(fun add_new_publishers1/2, State, Messages).
add_new_publishers1({declare_publisher_response, Corr, ?RESPONSE_OK}, State0) ->
#state{pending_requests = #{Corr := {_From, {Publisher, PublisherId, Stream}}}} = State0,
lake_connection_state:add_publisher(PublisherId, Publisher, Stream, State0);
add_new_publishers1({declare_publisher_response, _Corr, _}, State) ->
State;
add_new_publishers1(_Other, State) ->
State.
forward_and_reply(Messages, State) ->
lists:foreach(fun(Message) -> forward_and_reply1(recipient_from_message(Message, State), Message)
end,
Messages).
forward_and_reply1({send, Pid}, Message) ->
Pid ! Message;
forward_and_reply1({reply, From}, Message) ->
gen_server:reply(From, Message);
forward_and_reply1({cast, Pid}, Message) ->
gen_server:cast(Pid, Message);
forward_and_reply1(ignore, _Message) ->
ok;
forward_and_reply1(List, Message) when is_list(List) ->
lists:foreach(fun(Recipient) -> forward_and_reply1(Recipient, Message) end, List).
recipient_from_message({declare_publisher_response, Corr, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({publish_confirm, PublisherId, _, _}, State) ->
recipient_from_publishers(PublisherId, State);
recipient_from_message({publish_error, PublisherId, _, _}, State) ->
recipient_from_publishers(PublisherId, State);
recipient_from_message({query_publisher_sequence_response, Corr, _, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({delete_publisher_response, Corr, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({subscribe_response, Corr, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({deliver, SubscriptionId, _}, State) ->
recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({deliver_v2, SubscriptionId, _, _}, State) ->
recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({credit_response, SubscriptionId, _}, State) ->
recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({query_offset_response, Corr, _, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({unsubscribe_response, Corr, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({create_response, Corr, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({delete_response, Corr, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({metadata_response, Corr, _, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({metadata_update, _, Stream}, State) ->
recipients_from_stream(Stream, State);
recipient_from_message({close_response, Corr, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({close, _, _, _}, _) ->
%% RabbitMQ sent us a message to close the connection; we need to handle this separately.
[];
recipient_from_message({heartbeat}, State) ->
case State#state.heartbeat of
undefined ->
ignore;
Pid ->
{cast, Pid}
end;
recipient_from_message({route_response, Corr, _, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({partitions_response, Corr, _, _}, State) ->
recipient_from_corr(Corr, State);
recipient_from_message({consumer_update, _Corr, SubscriptionId, _}, State) ->
recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({stream_stats_response, Corr, _, _}, State) ->
recipient_from_corr(Corr, State).
recipient_from_corr(Corr, State) ->
From = lake_connection_state:requestor_from_correlation_id(Corr, State),
{reply, From}.
recipient_from_publishers(PublisherId, State) ->
Publisher = lake_connection_state:publisher_from_publisher_id(PublisherId, State),
{send, Publisher}.
recipient_from_subscriptions(SubscriptionId, State) ->
Subscriber = lake_connection_state:subscriber_from_subscription_id(SubscriptionId, State),
{send, Subscriber}.
recipients_from_stream(Stream, State) ->
Subscribers = lake_connection_state:stream_subscriptions(Stream, State),
Publishers = lake_connection_state:stream_publishers(Stream, State),
[{send, Pid} || Pid <- Subscribers ++ Publishers].
clean_publishers(Messages, State) ->
lists:foldl(fun clean_publishers1/2, State, Messages).
clean_publishers1({delete_publisher_response, Corr, ?RESPONSE_OK}, State0) ->
#state{pending_requests = #{Corr := {_From, PublisherId}}} = State0,
lake_connection_state:remove_publisher(PublisherId, State0);
clean_publishers1({delete_publisher_response, _Corr, _}, State) ->
State;
clean_publishers1({metadata_update, ?RESPONSE_OK, _Stream}, _State) ->
exit("MetadataUpdate with ?RESPONSE_OK not handled");
clean_publishers1({metadata_update, _, Stream}, State) ->
PublisherIds = lake_connection_state:publisher_ids_from_stream(Stream, State),
lists:foldl(fun lake_connection_state:remove_publisher/2, State, PublisherIds);
clean_publishers1(_Other, State) ->
State.
clean_subscriptions(Messages, State) ->
lists:foldl(fun clean_subscriptions1/2, State, Messages).
clean_subscriptions1({unsubscribe_response, Corr, ?RESPONSE_OK}, State0) ->
#state{pending_requests = #{Corr := {_From, SubscriptionId}}} = State0,
lake_connection_state:remove_subscription(SubscriptionId, State0);
clean_subscriptions1({unsubscribe_response, _Corr, _}, _State) ->
exit("UnsubscribeResponse without ?RESPONSE_OK not handled");
clean_subscriptions1({metadata_update, ?RESPONSE_OK, _Stream}, _State) ->
exit("MetadataUpdate with ?RESPONSE_OK not handled");
clean_subscriptions1({metadata_update, _, Stream}, State) ->
SubscriptionIds = lake_connection_state:subscription_ids_from_stream(Stream, State),
lists:foldl(fun lake_connection_state:remove_subscription/2, State, SubscriptionIds);
clean_subscriptions1(_Other, State) ->
State.
clean_pending_requests(Messages, State) ->
lists:foldl(fun clean_pending_requests1/2, State, Messages).
clean_pending_requests1(Message, State) ->
case lake_messages:message_to_correlation_id(Message) of
{ok, Corr} ->
lake_connection_state:remove_pending_request(Corr, State);
{error, _} ->
State
end.
flush_heartbeat_messages() ->
receive
{'$gen_cast', heartbeat} ->
flush_heartbeat_messages()
after 0 ->
ok
end.