src/lake_connection.erl

-module(lake_connection).

-export([connect/6,
         tls_connect/6,
         stop/1]).
-export([declare_publisher/4,
         publish_sync/3,
         publish_async/3,
         query_publisher_sequence/3,
         delete_publisher/2,
         credit_async/3,
         create/3,
         delete/2,
         subscribe/6,
         store_offset/4,
         query_offset/3,
         unsubscribe/2,
         metadata/2,
         route/3,
         partitions/2,
         stream_stats/2,
         consumer_update_response/4]).

-behaviour(gen_server).

-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2]).

-include("response_codes.hrl").
-include("lake_connection_state.hrl").

tls_connect(Host, Port, User, Password, Vhost, Options) ->
    gen_server:start_link(?MODULE, [tls, [Host, Port, User, Password, Vhost, Options]], []).

connect(Host, Port, User, Password, Vhost, Options) ->
    gen_server:start_link(?MODULE, [plain, [Host, Port, User, Password, Vhost, Options]], []).

%% FIXME creating the message binaries should be done in the caller if possible
%% (not possible if the caller needs to be blocked, ie. for requests with
%% CorrId)
declare_publisher(Connection, Stream, PublisherId, PublisherReference) ->
    Result =
        gen_server:call(Connection, {declare_publisher, self(), Stream, PublisherId, PublisherReference}),
    case Result of
        {declare_publisher_response, _, ?RESPONSE_OK} ->
            ok;
        {declare_publisher_response, _, ResponseCode} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

publish_sync(Connection, PublisherId, Messages) ->
    ok = publish_async(Connection, PublisherId, Messages),
    %% We need to wait for all confirmations
    MessageCount = length(Messages),
    wait_for_confirmations(MessageCount, []).

publish_async(Connection, PublisherId, Messages) ->
    %% FIXME encode messages here?
    ok = gen_server:call(Connection, {publish_async, PublisherId, Messages}).

wait_for_confirmations(0, PublishingIds) ->
    [case Result of
         {Id, Code} ->
             {Id, lake_utils:response_code_to_atom(Code)};
         Id ->
             {Id, lake_utils:response_code_to_atom(?RESPONSE_OK)}
     end
     || Result <- PublishingIds];
wait_for_confirmations(Count, PublishingIds0) ->
    receive
        {publish_confirm, _PublisherId, PublishingIdCount, PublishingIds} ->
            wait_for_confirmations(Count - PublishingIdCount, PublishingIds ++ PublishingIds0);
        {publish_error, _PublisherId, PublishingIdCount, PublishingCodeById} ->
            wait_for_confirmations(Count - PublishingIdCount, PublishingCodeById ++ PublishingIds0)
    end.

query_publisher_sequence(Connection, PublisherReference, Stream) ->
    case gen_server:call(Connection, {query_publisher_sequence, PublisherReference, Stream}) of
        {query_publisher_sequence_response, _, ?RESPONSE_OK, Sequence} ->
            {ok, Sequence};
        {query_publisher_sequence_response, _, ResponseCode, _} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

delete_publisher(Connection, PublisherId) ->
    case gen_server:call(Connection, {delete_publisher, PublisherId}) of
        {delete_publisher_response, _, ?RESPONSE_OK} ->
            ok;
        {delete_publisher_response, _, ResponseCode} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

credit_async(Connection, SubscriptionId, Credit) ->
    gen_server:call(Connection, {credit, SubscriptionId, Credit}).

create(Connection, Stream, Arguments) ->
    case gen_server:call(Connection, {create, Stream, Arguments}) of
        {create_response, _, ?RESPONSE_OK} ->
            ok;
        {create_response, _, ?RESPONSE_STREAM_ALREADY_EXISTS} ->
            ok;
        {create_response, _, ResponseCode} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

delete(Connection, Stream) ->
    case gen_server:call(Connection, {delete, Stream}) of
        {delete_response, _, ?RESPONSE_OK} ->
            ok;
        {delete_response, _, ResponseCode} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

subscribe(Connection, Stream, SubscriptionId, OffsetDefinition, Credit, Properties) ->
    Subscriber = self(),
    Result =
        gen_server:call(Connection,
                        {subscribe,
                         Subscriber,
                         Stream,
                         SubscriptionId,
                         OffsetDefinition,
                         Credit,
                         Properties}),
    case Result of
        {subscribe_response, _, ?RESPONSE_OK} ->
            ok;
        {subscribe_response, _, ResponseCode} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

store_offset(Connection, PublisherReference, Stream, Offset) ->
    gen_server:call(Connection, {store_offset, PublisherReference, Stream, Offset}).

query_offset(Connection, PublisherReference, Stream) ->
    case gen_server:call(Connection, {query_offset, PublisherReference, Stream}) of
        {query_offset_response, _, ?RESPONSE_OK, Offset} ->
            {ok, Offset};
        {query_offset_response, _, ResponseCode, _Offset} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

unsubscribe(Connection, SubscriptionId) ->
    case gen_server:call(Connection, {unsubscribe, SubscriptionId}) of
        {unsubscribe_response, _CorrelationId, ?RESPONSE_OK} ->
            ok;
        {unsubscribe_response, _CorrelationId, ResponseCode} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

metadata(Connection, Streams) ->
    {metadata_response, _CorrelationId, Endpoints, Metadata} =
        gen_server:call(Connection, {metadata, Streams}),
    {ok, Endpoints, Metadata}.

route(Connection, RoutingKey, SuperStream) ->
    case gen_server:call(Connection, {route, RoutingKey, SuperStream}) of
        {route_response, _CorrelationId, ?RESPONSE_OK, Streams} ->
            {ok, Streams};
        {route_response, _CorrelationId, ResponseCode, _} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

partitions(Connection, SuperStream) ->
    case gen_server:call(Connection, {partitions, SuperStream}) of
        {partitions_response, _CorrelationId, ?RESPONSE_OK, Streams} ->
            {ok, Streams};
        {partitions_response, _CorrelationId, ResponseCode, _} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

stream_stats(Connection, SuperStream) ->
    case gen_server:call(Connection, {stream_stats, SuperStream}) of
        {stream_stats_response, _CorrelationId, ?RESPONSE_OK, Streams} ->
            {ok, Streams};
        {stream_stats_response, _CorrelationId, ResponseCode, _} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

consumer_update_response(Connection, CorrelationId, ResponseCode, OffsetSpecification) ->
    gen_server:call(Connection,
                    {consumer_update_response, CorrelationId, ResponseCode, OffsetSpecification}).

stop(Connection) ->
    case gen_server:call(Connection, {close, ?RESPONSE_OK, <<"stop">>}) of
        {close_response, _Corr, ?RESPONSE_OK} ->
            ok;
        {close_response, _Corr, ResponseCode} ->
            {error, lake_utils:response_code_to_atom(ResponseCode)}
    end.

init([Type, ConnectionArgs]) ->
    case create_connection(Type, ConnectionArgs) of
        E = {error, _} ->
            E;
        {ok, {Socket, _FrameMax, NegotiatedHeartbeat}} ->
            {ok, Heartbeat} = heartbeat_process(NegotiatedHeartbeat),
            {ok,
             #state{correlation_id = 3, % sent two messages to establish the connection
                    socket = Socket,
                    heartbeat = Heartbeat}}
    end.

create_connection(tls, ConnectionArgs) ->
    lake_raw_connection:tls_connect(ConnectionArgs);
create_connection(plain, ConnectionArgs) ->
    lake_raw_connection:connect(ConnectionArgs).

heartbeat_process(NegotiatedHeartbeat) when NegotiatedHeartbeat > 0 ->
    lake_heartbeat:start_link(NegotiatedHeartbeat, self());
heartbeat_process(_) ->
    {ok, undefined}.

handle_call({declare_publisher, Publisher, Stream, PublisherId, PublisherReference},
            From,
            State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    DeclarePublisher = lake_messages:declare_publisher(Corr, Stream, PublisherId, PublisherReference),
    ok = lake_utils:send_message(Socket, DeclarePublisher),
    State1 =
        lake_connection_state:register_pending_request(Corr,
                                                       From,
                                                       {Publisher, PublisherId, Stream},
                                                       State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({publish_async, PublisherId, Messages}, From, State) ->
    Socket = State#state.socket,
    Publish = lake_messages:publish(PublisherId, Messages),
    gen_server:reply(From, ok),
    ok = lake_utils:send_message(Socket, Publish),
    {noreply, State};
handle_call({query_publisher_sequence, PublisherReference, Stream}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    QueryPublisherSequence = lake_messages:query_publisher_sequence(Corr, PublisherReference, Stream),
    ok = lake_utils:send_message(Socket, QueryPublisherSequence),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({delete_publisher, PublisherId}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    DeletePublisher = lake_messages:delete_publisher(Corr, PublisherId),
    ok = lake_utils:send_message(Socket, DeletePublisher),
    State1 = lake_connection_state:register_pending_request(Corr, From, PublisherId, State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({credit, SubscriptionId, Credit}, _From, State) ->
    Socket = State#state.socket,
    case State#state.subscriptions of
        #{SubscriptionId := _} ->
            CreditMessage = lake_messages:credit(SubscriptionId, Credit),
            {reply, lake_utils:send_message(Socket, CreditMessage), State};
        _ ->
            {reply, {error, unknown_subscription}, State}
    end;
handle_call({create, Stream, Arguments}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Create = lake_messages:create(Corr, Stream, Arguments),
    ok = lake_utils:send_message(Socket, Create),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({delete, Stream}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Delete = lake_messages:delete(Corr, Stream),
    ok = lake_utils:send_message(Socket, Delete),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({subscribe, Subscriber, Stream, SubscriptionId, OffsetDefinition, Credit, Properties},
            From,
            State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Subscribe =
        lake_messages:subscribe(Corr, Stream, SubscriptionId, OffsetDefinition, Credit, Properties),
    ok = lake_utils:send_message(Socket, Subscribe),
    State1 =
        lake_connection_state:register_pending_request(Corr,
                                                       From,
                                                       {Subscriber, SubscriptionId, Stream},
                                                       State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({store_offset, PublisherReference, Stream, Offset}, _From, State) ->
    Socket = State#state.socket,
    StoreOffset = lake_messages:store_offset(PublisherReference, Stream, Offset),
    {reply, lake_utils:send_message(Socket, StoreOffset), State};
handle_call({query_offset, PublisherReference, Stream}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    QueryOffset = lake_messages:query_offset(Corr, PublisherReference, Stream),
    ok = lake_utils:send_message(Socket, QueryOffset),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({unsubscribe, SubscriptionId}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Unsubscribe = lake_messages:unsubscribe(Corr, SubscriptionId),
    ok = lake_utils:send_message(Socket, Unsubscribe),
    State1 = lake_connection_state:register_pending_request(Corr, From, SubscriptionId, State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({metadata, Streams}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Metadata = lake_messages:metadata(Corr, Streams),
    ok = lake_utils:send_message(Socket, Metadata),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({close, ResponseCode, Reason}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Close = lake_messages:close(Corr, ResponseCode, Reason),
    ok = lake_utils:send_message(Socket, Close),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    %% When shutting down, we need to preemptively stop the heartbeat process. The process might
    %% send a heartbeat request during the time this connection is still alive, but the socket is
    %% already closed. That would result in a crash.
    case State0#state.heartbeat of
        undefined ->
            ok;
        HB ->
            flush_heartbeat_messages(),
            ok = gen_server:stop(HB)
    end,
    {noreply, State2};
handle_call({route, RoutingKey, SuperStream}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Frame = lake_messages:route(Corr, RoutingKey, SuperStream),
    ok = lake_utils:send_message(Socket, Frame),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({partitions, SuperStream}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Frame = lake_messages:partitions(Corr, SuperStream),
    ok = lake_utils:send_message(Socket, Frame),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({stream_stats, SuperStream}, From, State0) ->
    Corr = State0#state.correlation_id,
    Socket = State0#state.socket,
    Frame = lake_messages:stream_stats(Corr, SuperStream),
    ok = lake_utils:send_message(Socket, Frame),
    State1 = lake_connection_state:register_pending_request(Corr, From, [], State0),
    State2 = lake_connection_state:inc_correlation_id(State1),
    {noreply, State2};
handle_call({consumer_update_response, Corr, ResponseCode, OffsetSpecification}, _From, State) ->
    Socket = State#state.socket,
    Metadata = lake_messages:consumer_update_response(Corr, ResponseCode, OffsetSpecification),
    ok = lake_utils:send_message(Socket, Metadata),
    {reply, ok, State};
handle_call({debug, forward, Message}, _From, State = #state{socket = {Module, Socket}}) ->
    %% This call is meant for testing.
    {reply, Module:send(Socket, Message), State};
handle_call(Call, _From, State) ->
    {reply, {unknown, Call}, State}.

handle_cast(heartbeat, State) ->
    Socket = State#state.socket,
    Heartbeat = lake_messages:heartbeat(),
    ok = lake_utils:send_message(Socket, Heartbeat),
    {noreply, State};
handle_cast(_Cast, State) ->
    {noreply, State}.

%% FIXME can we avoid parsing here? Only determine where the message should end up and forward_and_reply it to the owner
%% This might require to copy the binary slices (if the buffer holds multiple messages) - a large message might be referenced by multiple subscribers.
handle_info({tcp, Socket, Packet}, State = #state{socket = {_Module, Socket}}) ->
    inet:setopts(Socket, [{active, once}]),
    handle_packet(Packet, State);
handle_info({ssl, Socket, Packet}, State = #state{socket = {_Module, Socket}}) ->
    ssl:setopts(Socket, [{active, once}]),
    handle_packet(Packet, State);
handle_info({tcp_closed, Port}, State = #state{socket = {_Module, Port}}) ->
    {stop, normal, State};
handle_info({ssl_closed, Port}, State = #state{socket = {_Module, Port}}) ->
    {stop, normal, State}.

handle_packet(Packet, State0 = #state{rx_buf = Buf0}) ->
    {Messages, BufRest} = split_into_messages(<<Buf0/binary, Packet/binary>>),
    Socket = State0#state.socket,
    State1 = add_new_subscriptions(Messages, State0),
    State2 = add_new_publishers(Messages, State1),
    forward_and_reply(Messages, State2),
    State3 = clean_publishers(Messages, State2),
    State4 = clean_subscriptions(Messages, State3),
    State5 = clean_pending_requests(Messages, State4),
    State6 = State5#state{rx_buf = BufRest},
    %% Check if the remote told us to close the connection
    case lists:keyfind(close, 1, Messages) of
        {close, Corr, _ResponseCode, Reason} ->
            CloseResponse = lake_messages:close_response(Corr, ?RESPONSE_OK),
            ok = lake_utils:send_message(Socket, CloseResponse),
            {stop, {close, Reason}, State6};
        false ->
            {noreply, State6}
    end.

terminate(Reason, _State) ->
    Reason.

split_into_messages(Buffer) ->
    split_into_messages(Buffer, []).

split_into_messages(<<Size:32, Buf:Size/binary, Rest/binary>>, Acc) ->
    split_into_messages(Rest, [lake_messages:parse(Buf) | Acc]);
split_into_messages(Rest, Acc) ->
    {lists:reverse(Acc), Rest}.

add_new_subscriptions(Messages, State) ->
    lists:foldl(fun add_new_subscriptions1/2, State, Messages).

add_new_subscriptions1({subscribe_response, Corr, ?RESPONSE_OK}, State0) ->
    #state{pending_requests = #{Corr := {_From, {Subscriber, SubscriptionId, Stream}}}} = State0,
    lake_connection_state:add_subscription(SubscriptionId, Subscriber, Stream, State0);
add_new_subscriptions1({subscribe_response, _Corr, _ResponseCode}, State) ->
    State;
add_new_subscriptions1(_Other, State) ->
    State.

add_new_publishers(Messages, State) ->
    lists:foldl(fun add_new_publishers1/2, State, Messages).

add_new_publishers1({declare_publisher_response, Corr, ?RESPONSE_OK}, State0) ->
    #state{pending_requests = #{Corr := {_From, {Publisher, PublisherId, Stream}}}} = State0,
    lake_connection_state:add_publisher(PublisherId, Publisher, Stream, State0);
add_new_publishers1({declare_publisher_response, _Corr, _}, State) ->
    State;
add_new_publishers1(_Other, State) ->
    State.

forward_and_reply(Messages, State) ->
    lists:foreach(fun(Message) -> forward_and_reply1(recipient_from_message(Message, State), Message)
                  end,
                  Messages).

forward_and_reply1({send, Pid}, Message) ->
    Pid ! Message;
forward_and_reply1({reply, From}, Message) ->
    gen_server:reply(From, Message);
forward_and_reply1({cast, Pid}, Message) ->
    gen_server:cast(Pid, Message);
forward_and_reply1(ignore, _Message) ->
    ok;
forward_and_reply1(List, Message) when is_list(List) ->
    lists:foreach(fun(Recipient) -> forward_and_reply1(Recipient, Message) end, List).

recipient_from_message({declare_publisher_response, Corr, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({publish_confirm, PublisherId, _, _}, State) ->
    recipient_from_publishers(PublisherId, State);
recipient_from_message({publish_error, PublisherId, _, _}, State) ->
    recipient_from_publishers(PublisherId, State);
recipient_from_message({query_publisher_sequence_response, Corr, _, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({delete_publisher_response, Corr, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({subscribe_response, Corr, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({deliver, SubscriptionId, _}, State) ->
    recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({deliver_v2, SubscriptionId, _, _}, State) ->
    recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({credit_response, SubscriptionId, _}, State) ->
    recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({query_offset_response, Corr, _, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({unsubscribe_response, Corr, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({create_response, Corr, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({delete_response, Corr, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({metadata_response, Corr, _, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({metadata_update, _, Stream}, State) ->
    recipients_from_stream(Stream, State);
recipient_from_message({close_response, Corr, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({close, _, _, _}, _) ->
    %% RabbitMQ sent us a message to close the connection; we need to handle this separately.
    [];
recipient_from_message({heartbeat}, State) ->
    case State#state.heartbeat of
        undefined ->
            ignore;
        Pid ->
            {cast, Pid}
    end;
recipient_from_message({route_response, Corr, _, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({partitions_response, Corr, _, _}, State) ->
    recipient_from_corr(Corr, State);
recipient_from_message({consumer_update, _Corr, SubscriptionId, _}, State) ->
    recipient_from_subscriptions(SubscriptionId, State);
recipient_from_message({stream_stats_response, Corr, _, _}, State) ->
    recipient_from_corr(Corr, State).

recipient_from_corr(Corr, State) ->
    From = lake_connection_state:requestor_from_correlation_id(Corr, State),
    {reply, From}.

recipient_from_publishers(PublisherId, State) ->
    Publisher = lake_connection_state:publisher_from_publisher_id(PublisherId, State),
    {send, Publisher}.

recipient_from_subscriptions(SubscriptionId, State) ->
    Subscriber = lake_connection_state:subscriber_from_subscription_id(SubscriptionId, State),
    {send, Subscriber}.

recipients_from_stream(Stream, State) ->
    Subscribers = lake_connection_state:stream_subscriptions(Stream, State),
    Publishers = lake_connection_state:stream_publishers(Stream, State),
    [{send, Pid} || Pid <- Subscribers ++ Publishers].

clean_publishers(Messages, State) ->
    lists:foldl(fun clean_publishers1/2, State, Messages).

clean_publishers1({delete_publisher_response, Corr, ?RESPONSE_OK}, State0) ->
    #state{pending_requests = #{Corr := {_From, PublisherId}}} = State0,
    lake_connection_state:remove_publisher(PublisherId, State0);
clean_publishers1({delete_publisher_response, _Corr, _}, State) ->
    State;
clean_publishers1({metadata_update, ?RESPONSE_OK, _Stream}, _State) ->
    exit("MetadataUpdate with ?RESPONSE_OK not handled");
clean_publishers1({metadata_update, _, Stream}, State) ->
    PublisherIds = lake_connection_state:publisher_ids_from_stream(Stream, State),
    lists:foldl(fun lake_connection_state:remove_publisher/2, State, PublisherIds);
clean_publishers1(_Other, State) ->
    State.

clean_subscriptions(Messages, State) ->
    lists:foldl(fun clean_subscriptions1/2, State, Messages).

clean_subscriptions1({unsubscribe_response, Corr, ?RESPONSE_OK}, State0) ->
    #state{pending_requests = #{Corr := {_From, SubscriptionId}}} = State0,
    lake_connection_state:remove_subscription(SubscriptionId, State0);
clean_subscriptions1({unsubscribe_response, _Corr, _}, _State) ->
    exit("UnsubscribeResponse without ?RESPONSE_OK not handled");
clean_subscriptions1({metadata_update, ?RESPONSE_OK, _Stream}, _State) ->
    exit("MetadataUpdate with ?RESPONSE_OK not handled");
clean_subscriptions1({metadata_update, _, Stream}, State) ->
    SubscriptionIds = lake_connection_state:subscription_ids_from_stream(Stream, State),
    lists:foldl(fun lake_connection_state:remove_subscription/2, State, SubscriptionIds);
clean_subscriptions1(_Other, State) ->
    State.

clean_pending_requests(Messages, State) ->
    lists:foldl(fun clean_pending_requests1/2, State, Messages).

clean_pending_requests1(Message, State) ->
    case lake_messages:message_to_correlation_id(Message) of
        {ok, Corr} ->
            lake_connection_state:remove_pending_request(Corr, State);
        {error, _} ->
            State
    end.

flush_heartbeat_messages() ->
    receive
        {'$gen_cast', heartbeat} ->
            flush_heartbeat_messages()
    after 0 ->
        ok
    end.