src/lake_messages.erl

-module(lake_messages).

-export([parse/1,
         message_to_correlation_id/1]).
-export([peer_properties/2,
         sasl_handshake/1,
         sasl_authenticate/4,
         open/2,
         close/3,
         close_response/2,
         tune/2,
         declare_publisher/4,
         publish/2,
         query_publisher_sequence/3,
         delete_publisher/2,
         credit/2,
         create/3,
         delete/2,
         subscribe/6,
         store_offset/3,
         query_offset/3,
         unsubscribe/2,
         metadata/2,
         heartbeat/0,
         route/3,
         partitions/2,
         stream_stats/2,
         command_versions/0,
         exchange_command_versions/1,
         consumer_update_response/3]).
-export([chunk_to_messages/1]).

-include("response_codes.hrl").

-define(REQUEST, 0).
-define(RESPONSE, 1).
-define(VERSION_1, 1).
-define(VERSION_2, 2).
-define(MIN_COMMAND_INDEX, 1).
-define(DECLARE_PUBLISHER, 1).
-define(PUBLISH, 2).
-define(PUBLISH_CONFIRM, 3).
-define(PUBLISH_ERROR, 4).
-define(QUERY_PUBLISHER_SEQUENCE, 5).
-define(DELETE_PUBLISHER, 6).
-define(SUBSCRIBE, 7).
-define(DELIVER, 8).
-define(CREDIT, 9).
-define(STORE_OFFSET, 10).
-define(QUERY_OFFSET, 11).
-define(UNSUBSCRIBE, 12).
-define(CREATE, 13).
-define(DELETE, 14).
-define(METADATA, 15).
-define(METADATA_UPDATE, 16).
-define(PEER_PROPERTIES, 17).
-define(SASL_HANDSHAKE, 18).
-define(SASL_AUTHENTICATE, 19).
-define(TUNE, 20).
-define(OPEN, 21).
-define(CLOSE, 22).
-define(HEARTBEAT, 23).
-define(ROUTE, 24).
-define(PARTITIONS, 25).
-define(CONSUMER_UPDATE, 26).
-define(EXCHANGE_COMMAND_VERSIONS, 27).
-define(STREAM_STATS, 28).
-define(MAX_COMMAND_INDEX, 28). % Increase this when adding a new command
-define(OFFSET_TYPE_NONE, 0).
-define(OFFSET_TYPE_FIRST, 1).
-define(OFFSET_TYPE_LAST, 2).
-define(OFFSET_TYPE_NEXT, 3).
-define(OFFSET_TYPE_OFFSET, 4).
-define(OFFSET_TYPE_TIMESTAMP, 5).

parse(<<?RESPONSE:1, ?DECLARE_PUBLISHER:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {declare_publisher_response, Corr, ResponseCode};
parse(<<?PUBLISH_CONFIRM:16,
        ?VERSION_1:16,
        PublisherId:8,
        PublishingIdCount:32,
        PublishingIds/binary>>) ->
    {publish_confirm, PublisherId, PublishingIdCount, parse_list_of_longs(PublishingIds, [])};
parse(<<?PUBLISH_ERROR:16, ?VERSION_1:16, PublisherId:8, PublishingIdCount:32, Details/binary>>) ->
    ErrorById = [{Id, Code} || <<Id:64, Code:16>> <= Details],
    {publish_error, PublisherId, PublishingIdCount, ErrorById};
parse(<<?RESPONSE:1,
        ?QUERY_PUBLISHER_SEQUENCE:15,
        ?VERSION_1:16,
        Corr:32,
        ResponseCode:16,
        Sequence:64>>) ->
    {query_publisher_sequence_response, Corr, ResponseCode, Sequence};
parse(<<?RESPONSE:1, ?DELETE_PUBLISHER:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {delete_publisher_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?CREDIT:15, ?VERSION_1:16, ResponseCode:16, SubscriptionId:8>>) ->
    {credit_response, SubscriptionId, ResponseCode};
parse(<<?RESPONSE:1, ?SUBSCRIBE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {subscribe_response, Corr, ResponseCode};
parse(<<?DELIVER:16, ?VERSION_1:16, SubscriptionId:8, OsirisChunk/binary>>) ->
    {deliver, SubscriptionId, OsirisChunk};
parse(<<?DELIVER:16, ?VERSION_2:16, SubscriptionId:8, CommittedChunkId:64, OsirisChunk/binary>>) ->
    {deliver_v2, SubscriptionId, CommittedChunkId, OsirisChunk};
parse(<<?RESPONSE:1, ?QUERY_OFFSET:15, ?VERSION_1:16, Corr:32, ResponseCode:16, Offset:64>>) ->
    {query_offset_response, Corr, ResponseCode, Offset};
parse(<<?RESPONSE:1, ?UNSUBSCRIBE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {unsubscribe_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?CREATE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {create_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?DELETE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {delete_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?METADATA:15, ?VERSION_1:16, Corr:32, Metadata0/binary>>) ->
    <<NumEndpoints:32, EndpointsAndMetadata/binary>> = Metadata0,
    {Endpoints, Rest} = parse_endpoints(NumEndpoints, EndpointsAndMetadata),
    StreamsMetadata = parse_streams_metadata(Rest),
    {metadata_response, Corr, Endpoints, StreamsMetadata};
parse(<<?METADATA_UPDATE:16,
        ?VERSION_1:16,
        ResponseCode:16,
        StreamSize:16,
        Stream:StreamSize/binary>>) ->
    {metadata_update, ResponseCode, Stream};
parse(<<?RESPONSE:1,
        ?PEER_PROPERTIES:15,
        ?VERSION_1:16,
        Corr:32,
        ResponseCode:16,
        _PeerPropertiesCount:32,
        PeerProperties/binary>>) ->
    {peer_properties_response, Corr, ResponseCode, parse_map_binary_values(PeerProperties)};
parse(<<?RESPONSE:1,
        ?SASL_HANDSHAKE:15,
        ?VERSION_1:16,
        Corr:32,
        ResponseCode:16,
        _MechanismsCount:32,
        Mechanisms/binary>>) ->
    {sasl_handshake_response, Corr, ResponseCode, parse_list_of_strings(Mechanisms)};
parse(<<?RESPONSE:1,
        ?SASL_AUTHENTICATE:15,
        ?VERSION_1:16,
        Corr:32,
        ResponseCode:16,
        SaslOpaque/binary>>) ->
    {sasl_authenticate_response, Corr, ResponseCode, SaslOpaque};
parse(<<?REQUEST:1, ?TUNE:15, ?VERSION_1:16, FrameMax:32, Heartbeat:32>>) ->
    {tune, FrameMax, Heartbeat};
parse(<<?RESPONSE:1,
        ?OPEN:15,
        ?VERSION_1:16,
        Corr:32,
        ?RESPONSE_OK:16,
        _ConnectionPropertiesCount:32,
        ConnectionProperties/binary>>) ->
    {open_response, Corr, ?RESPONSE_OK, parse_map_binary_values(ConnectionProperties)};
parse(<<?RESPONSE:1, ?CLOSE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {close_response, Corr, ResponseCode};
parse(<<?REQUEST:1,
        ?CLOSE:15,
        ?VERSION_1:16,
        Corr:32,
        ResponseCode:16,
        ReasonSize:16,
        Reason:ReasonSize/binary>>) ->
    {close, Corr, ResponseCode, Reason};
parse(<<?RESPONSE:1, ?OPEN:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
    {open_response, Corr, ResponseCode};
parse(<<?HEARTBEAT:16, ?VERSION_1:16>>) ->
    {heartbeat};
parse(<<?RESPONSE:1,
        ?ROUTE:15,
        ?VERSION_1:16,
        Corr:32,
        ?RESPONSE_OK:16,
        _StreamsCount:32,
        Streams/binary>>) ->
    {route_response, Corr, ?RESPONSE_OK, parse_list_of_strings(Streams)};
parse(<<?RESPONSE:1, ?ROUTE:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _/binary>>) ->
    {route_response, Corr, ResponseCode, []};
parse(<<?RESPONSE:1,
        ?PARTITIONS:15,
        ?VERSION_1:16,
        Corr:32,
        ?RESPONSE_OK:16,
        _StreamsCount:32,
        Streams/binary>>) ->
    {partitions_response, Corr, ?RESPONSE_OK, parse_list_of_strings(Streams)};
parse(<<?RESPONSE:1, ?PARTITIONS:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _/binary>>) ->
    {partitions_response, Corr, ResponseCode, []};
parse(<<?REQUEST:1, ?CONSUMER_UPDATE:15, ?VERSION_1:16, Corr:32, SubscriptionId:8, Active:8>>) ->
    {consumer_update, Corr, SubscriptionId, Active == <<"1">>};
parse(<<?RESPONSE:1,
        ?EXCHANGE_COMMAND_VERSIONS:15,
        ?VERSION_1:16,
        Corr:32,
        ?RESPONSE_OK:16,
        _CommandsCount:32,
        CommandVersions/binary>>) ->
    {exchange_command_versions_response, Corr, ?RESPONSE_OK, parse_command_versions(CommandVersions)};
parse(<<?RESPONSE:1,
        ?EXCHANGE_COMMAND_VERSIONS:15,
        ?VERSION_1:16,
        Corr:32,
        ResponseCode:16,
        _/binary>>) ->
    {exchange_command_versions_response, Corr, ResponseCode, []};
parse(<<?RESPONSE:1,
        ?STREAM_STATS:15,
        ?VERSION_1:16,
        Corr:32,
        ?RESPONSE_OK:16,
        _StreamStatsCount:32,
        StreamStats/binary>>) ->
    {stream_stats_response, Corr, ?RESPONSE_OK, parse_stream_stats(StreamStats)};
parse(<<?RESPONSE:1, ?STREAM_STATS:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _/binary>>) ->
    {stream_stats_response, Corr, ResponseCode, []};
parse(Unknown) ->
    {error, {unknown, Unknown}}.

-define(OSIRIS_MAGIC, 5).
-define(OSIRIS_VERSION_1, 0).
-define(OSIRIS_CHUNK_TYPE_USER, 0).

chunk_to_messages(<<?OSIRIS_MAGIC:4,
                    ?OSIRIS_VERSION_1:4,
                    ?OSIRIS_CHUNK_TYPE_USER:8,
                    NumberOfEntries:16,
                    NumberOfRecords:32,
                    Timestamp:64,
                    Epoch:64,
                    ChunkId:64,
                    DataCRC:32,
                    DataLength:32,
                    _TrailerLength:32,
                    _Reserved:32,
                    Data:DataLength/binary,
                    _Trailer/binary>>) ->
    case erlang:crc32(Data) of
        DataCRC ->
            %% FIXME why isn't Trailer of length TrailerLength?
            Messages = parse_data(Data, []),
            Info =
                #{chunk_id => ChunkId,
                  number_of_entries => NumberOfEntries,
                  number_of_records => NumberOfRecords,
                  timestamp => Timestamp,
                  epoch => Epoch},
            {ok, {Messages, Info}};
        MismatchingCRC ->
            {error, {crc_mismatch, [{expected, DataCRC}, {received, MismatchingCRC}]}}
    end;
chunk_to_messages(Other) ->
    {error, {invalid_osiris_chunk, Other}}.

parse_data(<<>>, Acc) ->
    lists:reverse(Acc);
parse_data(<<0:1, Size:31, Data:Size/binary, Rest/binary>>, Acc) ->
    parse_data(Rest, [Data | Acc]).

peer_properties(CorrelationId, Properties) ->
    PropertiesCount = length(Properties),
    EncodedProperties = encode_keywords(Properties),
    <<?REQUEST:1,
      ?PEER_PROPERTIES:15,
      ?VERSION_1:16,
      CorrelationId:32,
      PropertiesCount:32,
      EncodedProperties/binary>>.

sasl_handshake(CorrelationId) ->
    <<?REQUEST:1, ?SASL_HANDSHAKE:15, ?VERSION_1:16, CorrelationId:32>>.

tune(FrameMax, Heartbeat) ->
    <<?REQUEST:1, ?TUNE:15, ?VERSION_1:16, FrameMax:32, Heartbeat:32>>.

sasl_authenticate(CorrelationId, Mechanism = <<"PLAIN">>, User, Password) ->
    MechanismSize = byte_size(Mechanism),
    Fragment = <<0:8, User/binary, 0:8, Password/binary>>,
    FragmentSize = byte_size(Fragment),
    <<?REQUEST:1,
      ?SASL_AUTHENTICATE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      MechanismSize:16,
      Mechanism:MechanismSize/binary,
      FragmentSize:32,
      Fragment:FragmentSize/binary>>.

open(CorrelationId, Vhost) ->
    VhostSize = byte_size(Vhost),
    <<?REQUEST:1, ?OPEN:15, ?VERSION_1:16, CorrelationId:32, VhostSize:16, Vhost:VhostSize/binary>>.

close(CorrelationId, ResponseCode, Reason) ->
    ReasonSize = byte_size(Reason),
    <<?REQUEST:1,
      ?CLOSE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      ResponseCode:16,
      ReasonSize:16,
      Reason/binary>>.

close_response(CorrelationId, ResponseCode) ->
    <<?RESPONSE:1, ?CLOSE:15, ?VERSION_1:16, CorrelationId:32, ResponseCode:16>>.

declare_publisher(CorrelationId, Stream, PublisherId, PublisherReference) ->
    StreamSize = byte_size(Stream),
    PublisherReferenceSize = byte_size(PublisherReference),
    <<?REQUEST:1,
      ?DECLARE_PUBLISHER:15,
      ?VERSION_1:16,
      CorrelationId:32,
      PublisherId:8,
      PublisherReferenceSize:16,
      PublisherReference:PublisherReferenceSize/binary,
      StreamSize:16,
      Stream:StreamSize/binary>>.

publish(PublisherId, Messages) ->
    MessageCount = length(Messages),
    EncodedMessages = encode_messages(Messages),
    <<?PUBLISH:16, ?VERSION_1:16, PublisherId:8, MessageCount:32, EncodedMessages/binary>>.

query_publisher_sequence(CorrelationId, PublisherReference, Stream) ->
    PublisherReferenceSize = byte_size(PublisherReference),
    StreamSize = byte_size(Stream),
    <<?REQUEST:1,
      ?QUERY_PUBLISHER_SEQUENCE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      PublisherReferenceSize:16,
      PublisherReference:PublisherReferenceSize/binary,
      StreamSize:16,
      Stream:StreamSize/binary>>.

delete_publisher(CorrelationId, PublisherId) ->
    <<?REQUEST:1, ?DELETE_PUBLISHER:15, ?VERSION_1:16, CorrelationId:32, PublisherId:8>>.

credit(SubscriptionId, Credit) ->
    <<?REQUEST:1, ?CREDIT:15, ?VERSION_1:16, SubscriptionId:8, Credit:16>>.

encode_messages(Messages) ->
    encode_messages(Messages, <<>>).

encode_messages([], Acc) ->
    Acc;
encode_messages([{Id, Message} | Rest], Acc) when is_integer(Id), is_binary(Message) ->
    Size = byte_size(Message),
    encode_messages(Rest, <<Acc/binary, Id:64, 0:1, Size:31, Message:Size/binary>>).

to_offset_bin(OffsetSpecification) ->
    case OffsetSpecification of
        %% FIXME why no OFFSET_TYPE_NONE?
        first ->
            <<?OFFSET_TYPE_FIRST:16>>;
        last ->
            <<?OFFSET_TYPE_LAST:16>>;
        next ->
            <<?OFFSET_TYPE_NEXT:16>>;
        {offset, Offset} ->
            <<?OFFSET_TYPE_OFFSET:16, Offset:64>>;
        {timestamp, Offset} ->
            <<?OFFSET_TYPE_TIMESTAMP:16, Offset:64/signed>>
    end.

subscribe(CorrelationId, Stream, SubscriptionId, OffsetSpecification, Credit, Properties) ->
    StreamSize = byte_size(Stream),
    OffsetBin = to_offset_bin(OffsetSpecification),
    EncodedProperties = encode_keywords(Properties),
    PropertiesCount = length(Properties),
    <<?REQUEST:1,
      ?SUBSCRIBE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      SubscriptionId:8,
      StreamSize:16,
      Stream:StreamSize/binary,
      OffsetBin/binary,
      Credit:16,
      PropertiesCount:32,
      EncodedProperties/binary>>.

store_offset(PublisherReference, Stream, Offset) ->
    PublisherReferenceSize = byte_size(PublisherReference),
    StreamSize = byte_size(Stream),
    <<?STORE_OFFSET:16,
      ?VERSION_1:16,
      PublisherReferenceSize:16,
      PublisherReference:PublisherReferenceSize/binary,
      StreamSize:16,
      Stream:StreamSize/binary,
      Offset:64>>.

query_offset(CorrelationId, PublisherReference, Stream) ->
    PublisherReferenceSize = byte_size(PublisherReference),
    StreamSize = byte_size(Stream),

    <<?REQUEST:1,
      ?QUERY_OFFSET:15,
      ?VERSION_1:16,
      CorrelationId:32,
      PublisherReferenceSize:16,
      PublisherReference:PublisherReferenceSize/binary,
      StreamSize:16,
      Stream:StreamSize/binary>>.

unsubscribe(CorrelationId, SubscriptionId) ->
    <<?REQUEST:1, ?UNSUBSCRIBE:15, ?VERSION_1:16, CorrelationId:32, SubscriptionId:8>>.

create(CorrelationId, Stream, Arguments0) ->
    StreamSize = byte_size(Stream),
    ArgumentsCount = length(Arguments0),
    Arguments = encode_keywords(Arguments0),
    <<?REQUEST:1,
      ?CREATE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      StreamSize:16,
      Stream:StreamSize/binary,
      ArgumentsCount:32,
      Arguments/binary>>.

delete(CorrelationId, Stream) ->
    StreamSize = byte_size(Stream),
    <<?REQUEST:1,
      ?DELETE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      StreamSize:16,
      Stream:StreamSize/binary>>.

metadata(CorrelationId, Streams) ->
    StreamsCount = length(Streams),
    EncodedStreams = encode_list_of_strings(Streams),
    <<?REQUEST:1,
      ?METADATA:15,
      ?VERSION_1:16,
      CorrelationId:32,
      StreamsCount:32,
      EncodedStreams/binary>>.

heartbeat() ->
    <<?HEARTBEAT:16, ?VERSION_1:16>>.

route(CorrelationId, RoutingKey, SuperStream) ->
    RoutingKeySize = byte_size(RoutingKey),
    SuperStreamSize = byte_size(SuperStream),
    <<?REQUEST:1,
      ?ROUTE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      RoutingKeySize:16,
      RoutingKey/binary,
      SuperStreamSize:16,
      SuperStream/binary>>.

partitions(CorrelationId, SuperStream) ->
    SuperStreamSize = byte_size(SuperStream),
    <<?REQUEST:1,
      ?PARTITIONS:15,
      ?VERSION_1:16,
      CorrelationId:32,
      SuperStreamSize:16,
      SuperStream/binary>>.

stream_stats(CorrelationId, Stream) ->
    StreamSize = byte_size(Stream),
    <<?REQUEST:1, ?STREAM_STATS:15, ?VERSION_1:16, CorrelationId:32, StreamSize:16, Stream/binary>>.

command_versions() ->
    maps:from_list([{Command, version_range_by_id(Command)}
                    || Command <- lists:seq(?MIN_COMMAND_INDEX, ?MAX_COMMAND_INDEX)]).

version_range_by_id(?DELIVER) ->
    {?VERSION_1, ?VERSION_2};
version_range_by_id(_) ->
    {?VERSION_1, ?VERSION_1}.

exchange_command_versions(CorrelationId) ->
    CommandsBinary =
        << <<Command:16, MinVersion:16, MaxVersion:16>>
           || {Command, {MinVersion, MaxVersion}} <- maps:to_list(command_versions()) >>,
    CommandsCount = maps:size(command_versions()),
    <<?REQUEST:1,
      ?EXCHANGE_COMMAND_VERSIONS:15,
      ?VERSION_1:16,
      CorrelationId:32,
      CommandsCount:32,
      CommandsBinary/binary>>.

consumer_update_response(CorrelationId, ResponseCode, OffsetSpecification) ->
    OffsetBin = to_offset_bin(OffsetSpecification),
    <<?RESPONSE:1,
      ?CONSUMER_UPDATE:15,
      ?VERSION_1:16,
      CorrelationId:32,
      ResponseCode:16,
      OffsetBin/binary>>.

parse_map_binary_values(Bin) when is_binary(Bin) ->
    parse_map_binary_values(Bin, #{}).

parse_map_binary_values(<<>>, Acc) ->
    Acc;
parse_map_binary_values(<<KeySize:16,
                          Key:KeySize/binary,
                          ValueSize:16,
                          Value:ValueSize/binary,
                          Rest/binary>>,
                        Acc) ->
    parse_map_binary_values(Rest, Acc#{Key => Value}).

parse_command_versions(Bin) when is_binary(Bin) ->
    parse_command_versions(Bin, #{}).

parse_command_versions(<<>>, Acc) ->
    Acc;
parse_command_versions(<<Key:16, MinVersion:16, MaxVersion:16, Rest/binary>>, Acc) ->
    parse_command_versions(Rest, Acc#{Key => {MinVersion, MaxVersion}}).

parse_stream_stats(Bin) when is_binary(Bin) ->
    parse_stream_stats(Bin, #{}).

parse_stream_stats(<<>>, Acc) ->
    Acc;
parse_stream_stats(<<KeySize:16, Key:KeySize/binary, Value:64/signed, Rest/binary>>, Acc) ->
    parse_stream_stats(Rest, Acc#{Key => Value}).

parse_list_of_strings(Bin) ->
    parse_list_of_strings(Bin, []).

parse_list_of_strings(<<>>, Acc) ->
    lists:reverse(Acc);
parse_list_of_strings(<<Size:16, String:Size/binary, Rest/binary>>, Acc) ->
    parse_list_of_strings(Rest, [String | Acc]).

parse_list_of_longs(<<>>, Acc) ->
    lists:reverse(Acc);
parse_list_of_longs(<<Id:64, Rest/binary>>, Acc) ->
    parse_list_of_longs(Rest, [Id | Acc]).

encode_keywords(Keywords) ->
    encode_keywords(Keywords, <<>>).

encode_keywords([], Acc) ->
    Acc;
encode_keywords([{Key, Value} | Rest], Acc) ->
    SizeKey = byte_size(Key),
    SizeValue = byte_size(Value),
    encode_keywords(Rest, <<Acc/binary, SizeKey:16, Key/binary, SizeValue:16, Value/binary>>).

encode_list_of_strings(List) ->
    encode_list_of_strings(List, <<>>).

encode_list_of_strings([], Acc) ->
    Acc;
encode_list_of_strings([Bin | Rest], Acc) ->
    Size = byte_size(Bin),
    encode_list_of_strings(Rest, <<Acc/binary, Size:16, Bin:Size/binary>>).

parse_endpoints(NumEndpoints, Bin) ->
    parse_endpoints(NumEndpoints, Bin, []).

parse_endpoints(0, Rest, Endpoints) ->
    {Endpoints, Rest};
parse_endpoints(Cnt,
                <<Index:16, HostLength:16, Host:HostLength/binary, Port:32, Rest/binary>>,
                Acc) ->
    EndpointMetadata =
        #{index => Index,
          host => Host,
          port => Port},
    parse_endpoints(Cnt - 1, Rest, [EndpointMetadata | Acc]).

parse_streams_metadata(<<NumStreams:32, Meta/binary>>) ->
    parse_streams_metadata(NumStreams, Meta, []).

parse_streams_metadata(0, <<>>, Acc) ->
    Acc;
%% Error case
parse_streams_metadata(NumStreams,
                       <<StreamLength:16,
                         Stream:StreamLength/binary,
                         Code:16,
                         (-1):16/signed,
                         0:32,
                         Rest/binary>>,
                       Acc) ->
    StreamMetadata = #{stream => Stream, code => Code},
    parse_streams_metadata(NumStreams - 1, Rest, [StreamMetadata | Acc]);
%% Success case
parse_streams_metadata(NumStreams,
                       <<StreamLength:16,
                         Stream:StreamLength/binary,
                         ?RESPONSE_OK:16,
                         LeaderIndex:16,
                         ReplicasCount:32,
                         ReplicasAndRest/binary>>,
                       Acc) ->
    ReplicasSize = ReplicasCount * 2,
    <<ReplicasBin:ReplicasSize/binary, Rest/binary>> = ReplicasAndRest,
    Replicas = [EndpointIndex || <<EndpointIndex:16>> <= ReplicasBin],
    StreamMetadata =
        #{stream => Stream,
          leader_index => LeaderIndex,
          replicas_count => ReplicasCount,
          replicas => Replicas},
    parse_streams_metadata(NumStreams - 1, Rest, [StreamMetadata | Acc]).

message_to_correlation_id({declare_publisher_response, Corr, _}) ->
    {ok, Corr};
message_to_correlation_id({query_publisher_sequence, Corr, _, _}) ->
    {ok, Corr};
message_to_correlation_id({delete_publisher_response, Corr, _}) ->
    {ok, Corr};
message_to_correlation_id({subscribe_response, Corr, _}) ->
    {ok, Corr};
message_to_correlation_id({query_offset_response, Corr, _, _}) ->
    {ok, Corr};
message_to_correlation_id({unsubscribe_response, Corr, _}) ->
    {ok, Corr};
message_to_correlation_id({create_response, Corr, _}) ->
    {ok, Corr};
message_to_correlation_id({delete_response, Corr, _}) ->
    {ok, Corr};
message_to_correlation_id({metadata_response, Corr, _, _}) ->
    {ok, Corr};
message_to_correlation_id({route_response, Corr, _, _}) ->
    {ok, Corr};
message_to_correlation_id(Other) ->
    {error, {no_correlation_id, Other}}.