-module(lake_messages).
-export([parse/1, message_to_correlation_id/1]).
-export([
peer_properties/2,
sasl_handshake/1,
sasl_authenticate/4,
open/2,
close/3,
close_response/2,
tune/2,
declare_publisher/4,
publish/2,
query_publisher_sequence/3,
delete_publisher/2,
credit/2,
create/3,
delete/2,
subscribe/6,
store_offset/3,
query_offset/3,
unsubscribe/2,
metadata/2,
heartbeat/0,
route/3,
partitions/2,
stream_stats/2,
command_versions/0,
exchange_command_versions/1,
consumer_update_response/3
]).
-export([chunk_to_messages/1]).
-include("response_codes.hrl").
-define(REQUEST, 0).
-define(RESPONSE, 1).
-define(VERSION_1, 1).
-define(VERSION_2, 2).
-define(MIN_COMMAND_INDEX, 1).
-define(DECLARE_PUBLISHER, 1).
-define(PUBLISH, 2).
-define(PUBLISH_CONFIRM, 3).
-define(PUBLISH_ERROR, 4).
-define(QUERY_PUBLISHER_SEQUENCE, 5).
-define(DELETE_PUBLISHER, 6).
-define(SUBSCRIBE, 7).
-define(DELIVER, 8).
-define(CREDIT, 9).
-define(STORE_OFFSET, 10).
-define(QUERY_OFFSET, 11).
-define(UNSUBSCRIBE, 12).
-define(CREATE, 13).
-define(DELETE, 14).
-define(METADATA, 15).
-define(METADATA_UPDATE, 16).
-define(PEER_PROPERTIES, 17).
-define(SASL_HANDSHAKE, 18).
-define(SASL_AUTHENTICATE, 19).
-define(TUNE, 20).
-define(OPEN, 21).
-define(CLOSE, 22).
-define(HEARTBEAT, 23).
-define(ROUTE, 24).
-define(PARTITIONS, 25).
-define(CONSUMER_UPDATE, 26).
-define(EXCHANGE_COMMAND_VERSIONS, 27).
-define(STREAM_STATS, 28).
-define(MAX_COMMAND_INDEX, 28). % Increase this when adding a new command
-define(OFFSET_TYPE_NONE, 0).
-define(OFFSET_TYPE_FIRST, 1).
-define(OFFSET_TYPE_LAST, 2).
-define(OFFSET_TYPE_NEXT, 3).
-define(OFFSET_TYPE_OFFSET, 4).
-define(OFFSET_TYPE_TIMESTAMP, 5).
parse(<<?RESPONSE:1, ?DECLARE_PUBLISHER:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{declare_publisher_response, Corr, ResponseCode};
parse(<<?PUBLISH_CONFIRM:16, ?VERSION_1:16, PublisherId:8, PublishingIdCount:32, PublishingIds/binary>>) ->
{publish_confirm, PublisherId, PublishingIdCount, parse_list_of_longs(PublishingIds, [])};
parse(<<?PUBLISH_ERROR:16, ?VERSION_1:16, PublisherId:8, PublishingIdCount:32, Details/binary>>) ->
ErrorById = [{Id, Code} || <<Id:64, Code:16>> <= Details],
{publish_error, PublisherId, PublishingIdCount, ErrorById};
parse(<<?RESPONSE:1, ?QUERY_PUBLISHER_SEQUENCE:15, ?VERSION_1:16, Corr:32, ResponseCode:16, Sequence:64>>) ->
{query_publisher_sequence_response, Corr, ResponseCode, Sequence};
parse(<<?RESPONSE:1, ?DELETE_PUBLISHER:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{delete_publisher_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?CREDIT:15, ?VERSION_1:16, ResponseCode:16, SubscriptionId:8>>) ->
{credit_response, SubscriptionId, ResponseCode};
parse(<<?RESPONSE:1, ?SUBSCRIBE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{subscribe_response, Corr, ResponseCode};
parse(<<?DELIVER:16, ?VERSION_1:16, SubscriptionId:8, OsirisChunk/binary>>) ->
{deliver, SubscriptionId, OsirisChunk};
parse(<<?DELIVER:16, ?VERSION_2:16, SubscriptionId:8, CommittedChunkId:64, OsirisChunk/binary>>) ->
{deliver_v2, SubscriptionId, CommittedChunkId, OsirisChunk};
parse(<<?RESPONSE:1, ?QUERY_OFFSET:15, ?VERSION_1:16, Corr:32, ResponseCode:16, Offset:64>>) ->
{query_offset_response, Corr, ResponseCode, Offset};
parse(<<?RESPONSE:1, ?UNSUBSCRIBE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{unsubscribe_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?CREATE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{create_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?DELETE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{delete_response, Corr, ResponseCode};
parse(<<?RESPONSE:1, ?METADATA:15, ?VERSION_1:16, Corr:32, Metadata0/binary>>) ->
<<NumEndpoints:32, EndpointsAndMetadata/binary>> = Metadata0,
{Endpoints, Rest} =
parse_endpoints(NumEndpoints, EndpointsAndMetadata),
StreamsMetadata =
parse_streams_metadata(Rest),
{metadata_response, Corr, Endpoints, StreamsMetadata};
parse(<<?METADATA_UPDATE:16, ?VERSION_1:16, ResponseCode:16, StreamSize:16, Stream:StreamSize/binary>>) ->
{metadata_update, ResponseCode, Stream};
parse(<<?RESPONSE:1, ?PEER_PROPERTIES:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _PeerPropertiesCount:32, PeerProperties/binary>>) ->
{peer_properties_response, Corr, ResponseCode, parse_map_binary_values(PeerProperties)};
parse(<<?RESPONSE:1, ?SASL_HANDSHAKE:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _MechanismsCount:32, Mechanisms/binary>>) ->
{sasl_handshake_response, Corr, ResponseCode, parse_list_of_strings(Mechanisms)};
parse(<<?RESPONSE:1, ?SASL_AUTHENTICATE:15, ?VERSION_1:16, Corr:32, ResponseCode:16, SaslOpaque/binary>>) ->
{sasl_authenticate_response, Corr, ResponseCode, SaslOpaque};
parse(<<?REQUEST:1, ?TUNE:15, ?VERSION_1:16, FrameMax:32, Heartbeat:32>>) ->
{tune, FrameMax, Heartbeat};
parse(<<?RESPONSE:1, ?OPEN:15, ?VERSION_1:16, Corr:32, ?RESPONSE_OK:16, _ConnectionPropertiesCount:32, ConnectionProperties/binary>>) ->
{open_response, Corr, ?RESPONSE_OK, parse_map_binary_values(ConnectionProperties)};
parse(<<?RESPONSE:1, ?CLOSE:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{close_response, Corr, ResponseCode};
parse(<<?REQUEST:1, ?CLOSE:15, ?VERSION_1:16, Corr:32, ResponseCode:16, ReasonSize:16, Reason:ReasonSize/binary>>) ->
{close, Corr, ResponseCode, Reason};
parse(<<?RESPONSE:1, ?OPEN:15, ?VERSION_1:16, Corr:32, ResponseCode:16>>) ->
{open_response, Corr, ResponseCode};
parse(<<?HEARTBEAT:16, ?VERSION_1:16>>) ->
{heartbeat};
parse(<<?RESPONSE:1, ?ROUTE:15, ?VERSION_1:16, Corr:32, ?RESPONSE_OK:16, _StreamsCount:32, Streams/binary>>) ->
{route_response, Corr, ?RESPONSE_OK, parse_list_of_strings(Streams)};
parse(<<?RESPONSE:1, ?ROUTE:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _/binary>>) ->
{route_response, Corr, ResponseCode, []};
parse(<<?RESPONSE:1, ?PARTITIONS:15, ?VERSION_1:16, Corr:32, ?RESPONSE_OK:16, _StreamsCount:32, Streams/binary>>) ->
{partitions_response, Corr, ?RESPONSE_OK, parse_list_of_strings(Streams)};
parse(<<?RESPONSE:1, ?PARTITIONS:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _/binary>>) ->
{partitions_response, Corr, ResponseCode, []};
parse(<<?REQUEST:1, ?CONSUMER_UPDATE:15, ?VERSION_1:16, Corr:32, SubscriptionId:8, Active:8>>) ->
{consumer_update, Corr, SubscriptionId, Active == <<"1">>};
parse(<<?RESPONSE:1, ?EXCHANGE_COMMAND_VERSIONS:15, ?VERSION_1:16, Corr:32, ?RESPONSE_OK:16, _CommandsCount:32, CommandVersions/binary>>) ->
{exchange_command_versions_response, Corr, ?RESPONSE_OK, parse_command_versions(CommandVersions)};
parse(<<?RESPONSE:1, ?EXCHANGE_COMMAND_VERSIONS:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _/binary>>) ->
{exchange_command_versions_response, Corr, ResponseCode, []};
parse(<<?RESPONSE:1, ?STREAM_STATS:15, ?VERSION_1:16, Corr:32, ?RESPONSE_OK:16, _StreamStatsCount:32, StreamStats/binary>>) ->
{stream_stats_response, Corr, ?RESPONSE_OK, parse_stream_stats(StreamStats)};
parse(<<?RESPONSE:1, ?STREAM_STATS:15, ?VERSION_1:16, Corr:32, ResponseCode:16, _/binary>>) ->
{stream_stats_response, Corr, ResponseCode, []};
parse(Unknown) ->
{error, {unknown, Unknown}}.
-define(OSIRIS_MAGIC, 5).
-define(OSIRIS_VERSION_1, 0).
-define(OSIRIS_CHUNK_TYPE_USER, 0).
chunk_to_messages(
<<?OSIRIS_MAGIC:4, ?OSIRIS_VERSION_1:4, ?OSIRIS_CHUNK_TYPE_USER:8, NumberOfEntries:16,
NumberOfRecords:32, Timestamp:64, Epoch:64, ChunkId:64, DataCRC:32, DataLength:32,
_TrailerLength:32, _Reserved:32, Data:DataLength/binary, _Trailer/binary>>
) ->
case erlang:crc32(Data) of
DataCRC ->
%% FIXME why isn't Trailer of length TrailerLength?
Messages = parse_data(Data, []),
Info = #{
chunk_id => ChunkId,
number_of_entries => NumberOfEntries,
number_of_records => NumberOfRecords,
timestamp => Timestamp,
epoch => Epoch
},
{ok, {Messages, Info}};
MismatchingCRC ->
{error, {crc_mismatch, [{expected, DataCRC}, {received, MismatchingCRC}]}}
end;
chunk_to_messages(Other) ->
{error, {invalid_osiris_chunk, Other}}.
parse_data(<<>>, Acc) ->
lists:reverse(Acc);
parse_data(<<0:1, Size:31, Data:Size/binary, Rest/binary>>, Acc) ->
parse_data(Rest, [Data | Acc]).
peer_properties(CorrelationId, Properties) ->
PropertiesCount = length(Properties),
EncodedProperties = encode_keywords(Properties),
<<
?REQUEST:1,
?PEER_PROPERTIES:15,
?VERSION_1:16,
CorrelationId:32,
PropertiesCount:32,
EncodedProperties/binary
>>.
sasl_handshake(CorrelationId) ->
<<
?REQUEST:1,
?SASL_HANDSHAKE:15,
?VERSION_1:16,
CorrelationId:32
>>.
tune(FrameMax, Heartbeat) ->
<<
?REQUEST:1,
?TUNE:15,
?VERSION_1:16,
FrameMax:32,
Heartbeat:32
>>.
sasl_authenticate(CorrelationId, Mechanism = <<"PLAIN">>, User, Password) ->
MechanismSize = byte_size(Mechanism),
Fragment = <<0:8, User/binary, 0:8, Password/binary>>,
FragmentSize = byte_size(Fragment),
<<
?REQUEST:1,
?SASL_AUTHENTICATE:15,
?VERSION_1:16,
CorrelationId:32,
MechanismSize:16,
Mechanism:MechanismSize/binary,
FragmentSize:32,
Fragment:FragmentSize/binary
>>.
open(CorrelationId, Vhost) ->
VhostSize = byte_size(Vhost),
<<
?REQUEST:1,
?OPEN:15,
?VERSION_1:16,
CorrelationId:32,
VhostSize:16,
Vhost:VhostSize/binary
>>.
close(CorrelationId, ResponseCode, Reason) ->
ReasonSize = byte_size(Reason),
<<
?REQUEST:1,
?CLOSE:15,
?VERSION_1:16,
CorrelationId:32,
ResponseCode:16,
ReasonSize:16,
Reason/binary
>>.
close_response(CorrelationId, ResponseCode) ->
<<
?RESPONSE:1,
?CLOSE:15,
?VERSION_1:16,
CorrelationId:32,
ResponseCode:16
>>.
declare_publisher(CorrelationId, Stream, PublisherId, PublisherReference) ->
StreamSize = byte_size(Stream),
PublisherReferenceSize = byte_size(PublisherReference),
<<
?REQUEST:1,
?DECLARE_PUBLISHER:15,
?VERSION_1:16,
CorrelationId:32,
PublisherId:8,
PublisherReferenceSize:16,
PublisherReference:PublisherReferenceSize/binary,
StreamSize:16,
Stream:StreamSize/binary
>>.
publish(PublisherId, Messages) ->
MessageCount = length(Messages),
EncodedMessages = encode_messages(Messages),
<<
?PUBLISH:16,
?VERSION_1:16,
PublisherId:8,
MessageCount:32,
EncodedMessages/binary
>>.
query_publisher_sequence(CorrelationId, PublisherReference, Stream) ->
PublisherReferenceSize = byte_size(PublisherReference),
StreamSize = byte_size(Stream),
<<
?REQUEST:1,
?QUERY_PUBLISHER_SEQUENCE:15,
?VERSION_1:16,
CorrelationId:32,
PublisherReferenceSize:16,
PublisherReference:PublisherReferenceSize/binary,
StreamSize:16,
Stream:StreamSize/binary
>>.
delete_publisher(CorrelationId, PublisherId) ->
<<
?REQUEST:1,
?DELETE_PUBLISHER:15,
?VERSION_1:16,
CorrelationId:32,
PublisherId:8
>>.
credit(SubscriptionId, Credit) ->
<<
?REQUEST:1,
?CREDIT:15,
?VERSION_1:16,
SubscriptionId:8,
Credit:16
>>.
encode_messages(Messages) ->
encode_messages(Messages, <<>>).
encode_messages([], Acc) ->
Acc;
encode_messages([{Id, Message} | Rest], Acc) when is_integer(Id), is_binary(Message) ->
Size = byte_size(Message),
encode_messages(Rest, <<Acc/binary, Id:64, 0:1, Size:31, Message:Size/binary>>).
to_offset_bin(OffsetSpecification) ->
case OffsetSpecification of
%% FIXME why no OFFSET_TYPE_NONE?
first -> <<?OFFSET_TYPE_FIRST:16>>;
last -> <<?OFFSET_TYPE_LAST:16>>;
next -> <<?OFFSET_TYPE_NEXT:16>>;
{offset, Offset} -> <<?OFFSET_TYPE_NEXT:16, Offset:64>>;
{timestamp, Offset} -> <<?OFFSET_TYPE_TIMESTAMP:16, Offset:64/signed>>
end.
subscribe(CorrelationId, Stream, SubscriptionId, OffsetSpecification, Credit, Properties) ->
StreamSize = byte_size(Stream),
OffsetBin = to_offset_bin(OffsetSpecification),
EncodedProperties = encode_keywords(Properties),
PropertiesCount = length(Properties),
<<
?REQUEST:1,
?SUBSCRIBE:15,
?VERSION_1:16,
CorrelationId:32,
SubscriptionId:8,
StreamSize:16,
Stream:StreamSize/binary,
OffsetBin/binary,
Credit:16,
PropertiesCount:32,
EncodedProperties/binary
>>.
store_offset(PublisherReference, Stream, Offset) ->
PublisherReferenceSize = byte_size(PublisherReference),
StreamSize = byte_size(Stream),
<<
?STORE_OFFSET:16,
?VERSION_1:16,
PublisherReferenceSize:16,
PublisherReference:PublisherReferenceSize/binary,
StreamSize:16,
Stream:StreamSize/binary,
Offset:64
>>.
query_offset(CorrelationId, PublisherReference, Stream) ->
PublisherReferenceSize = byte_size(PublisherReference),
StreamSize = byte_size(Stream),
<<
?REQUEST:1,
?QUERY_OFFSET:15,
?VERSION_1:16,
CorrelationId:32,
PublisherReferenceSize:16,
PublisherReference:PublisherReferenceSize/binary,
StreamSize:16,
Stream:StreamSize/binary
>>.
unsubscribe(CorrelationId, SubscriptionId) ->
<<
?REQUEST:1,
?UNSUBSCRIBE:15,
?VERSION_1:16,
CorrelationId:32,
SubscriptionId:8
>>.
create(CorrelationId, Stream, Arguments0) ->
StreamSize = byte_size(Stream),
ArgumentsCount = length(Arguments0),
Arguments = encode_keywords(Arguments0),
<<
?REQUEST:1,
?CREATE:15,
?VERSION_1:16,
CorrelationId:32,
StreamSize:16,
Stream:StreamSize/binary,
ArgumentsCount:32,
Arguments/binary
>>.
delete(CorrelationId, Stream) ->
StreamSize = byte_size(Stream),
<<
?REQUEST:1,
?DELETE:15,
?VERSION_1:16,
CorrelationId:32,
StreamSize:16,
Stream:StreamSize/binary
>>.
metadata(CorrelationId, Streams) ->
StreamsCount = length(Streams),
EncodedStreams = encode_list_of_strings(Streams),
<<
?REQUEST:1,
?METADATA:15,
?VERSION_1:16,
CorrelationId:32,
StreamsCount:32,
EncodedStreams/binary
>>.
heartbeat() ->
<<
?HEARTBEAT:16,
?VERSION_1:16
>>.
route(CorrelationId, RoutingKey, SuperStream) ->
RoutingKeySize = byte_size(RoutingKey),
SuperStreamSize = byte_size(SuperStream),
<<
?REQUEST:1,
?ROUTE:15,
?VERSION_1:16,
CorrelationId:32,
RoutingKeySize:16,
RoutingKey/binary,
SuperStreamSize:16,
SuperStream/binary
>>.
partitions(CorrelationId, SuperStream) ->
SuperStreamSize = byte_size(SuperStream),
<<
?REQUEST:1,
?PARTITIONS:15,
?VERSION_1:16,
CorrelationId:32,
SuperStreamSize:16,
SuperStream/binary
>>.
stream_stats(CorrelationId, Stream) ->
StreamSize = byte_size(Stream),
<<
?REQUEST:1,
?STREAM_STATS:15,
?VERSION_1:16,
CorrelationId:32,
StreamSize:16,
Stream/binary
>>.
command_versions() ->
maps:from_list([{Command, version_range_by_id(Command)} ||
Command <- lists:seq(?MIN_COMMAND_INDEX, ?MAX_COMMAND_INDEX)]).
version_range_by_id(?DELIVER) -> {?VERSION_1, ?VERSION_2};
version_range_by_id(_) -> {?VERSION_1, ?VERSION_1}.
exchange_command_versions(CorrelationId) ->
CommandsBinary =
<< <<Command:16,MinVersion:16,MaxVersion:16>> ||
{Command,{MinVersion, MaxVersion}} <- maps:to_list(command_versions())>>,
CommandsCount = maps:size(command_versions()),
<<
?REQUEST:1,
?EXCHANGE_COMMAND_VERSIONS:15,
?VERSION_1:16,
CorrelationId:32,
CommandsCount:32,
CommandsBinary/binary
>>.
consumer_update_response(CorrelationId, ResponseCode, OffsetSpecification) ->
OffsetBin = to_offset_bin(OffsetSpecification),
<<
?RESPONSE:1,
?CONSUMER_UPDATE:15,
?VERSION_1:16,
CorrelationId:32,
ResponseCode:16,
OffsetBin/binary
>>.
parse_map_binary_values(Bin) when is_binary(Bin) ->
parse_map_binary_values(Bin, #{}).
parse_map_binary_values(<<>>, Acc) ->
Acc;
parse_map_binary_values(<<KeySize:16, Key:KeySize/binary, ValueSize:16, Value:ValueSize/binary, Rest/binary>>, Acc) ->
parse_map_binary_values(Rest, Acc#{Key => Value}).
parse_command_versions(Bin) when is_binary(Bin) ->
parse_command_versions(Bin, #{}).
parse_command_versions(<<>>, Acc) ->
Acc;
parse_command_versions(<<Key:16, MinVersion:16, MaxVersion:16, Rest/binary>>, Acc) ->
parse_command_versions(Rest, Acc#{Key => {MinVersion, MaxVersion}}).
parse_stream_stats(Bin) when is_binary(Bin) ->
parse_stream_stats(Bin, #{}).
parse_stream_stats(<<>>, Acc) ->
Acc;
parse_stream_stats(<<KeySize:16, Key:KeySize/binary, Value:64/signed, Rest/binary>>, Acc) ->
parse_stream_stats(Rest, Acc#{Key => Value}).
parse_list_of_strings(Bin) ->
parse_list_of_strings(Bin, []).
parse_list_of_strings(<<>>, Acc) ->
lists:reverse(Acc);
parse_list_of_strings(<<Size:16, String:Size/binary, Rest/binary>>, Acc) ->
parse_list_of_strings(Rest, [String | Acc]).
parse_list_of_longs(<<>>, Acc) ->
lists:reverse(Acc);
parse_list_of_longs(<<Id:64, Rest/binary>>, Acc) ->
parse_list_of_longs(Rest, [Id | Acc]).
encode_keywords(Keywords) ->
encode_keywords(Keywords, <<>>).
encode_keywords([], Acc) ->
Acc;
encode_keywords([{Key, Value} | Rest], Acc) ->
SizeKey = byte_size(Key),
SizeValue = byte_size(Value),
encode_keywords(Rest, <<Acc/binary, SizeKey:16, Key/binary, SizeValue:16, Value/binary>>).
encode_list_of_strings(List) ->
encode_list_of_strings(List, <<>>).
encode_list_of_strings([], Acc) ->
Acc;
encode_list_of_strings([Bin | Rest], Acc) ->
Size = byte_size(Bin),
encode_list_of_strings(Rest, <<Acc/binary, Size:16, Bin:Size/binary>>).
parse_endpoints(NumEndpoints, Bin) ->
parse_endpoints(NumEndpoints, Bin, []).
parse_endpoints(0, Rest, Endpoints) ->
{Endpoints, Rest};
parse_endpoints(
Cnt,
<<Index:16, HostLength:16, Host:HostLength/binary, Port:32, Rest/binary>>,
Acc
) ->
EndpointMetadata = #{
index => Index,
host => Host,
port => Port
},
parse_endpoints(Cnt - 1, Rest, [EndpointMetadata | Acc]).
parse_streams_metadata(<<NumStreams:32, Meta/binary>>) ->
parse_streams_metadata(NumStreams, Meta, []).
parse_streams_metadata(0, <<>>, Acc) ->
Acc;
%% Error case
parse_streams_metadata(
NumStreams,
<<StreamLength:16, Stream:StreamLength/binary, Code:16, -1:16/signed, 0:32, Rest/binary>>,
Acc
) ->
StreamMetadata = #{
stream => Stream,
code => Code
},
parse_streams_metadata(NumStreams - 1, Rest, [StreamMetadata | Acc]);
%% Success case
parse_streams_metadata(
NumStreams,
<<StreamLength:16, Stream:StreamLength/binary, ?RESPONSE_OK:16, LeaderIndex:16,
ReplicasCount:32, ReplicasAndRest/binary>>,
Acc
) ->
ReplicasSize = ReplicasCount * 2,
<<ReplicasBin:ReplicasSize/binary, Rest/binary>> = ReplicasAndRest,
Replicas = [EndpointIndex || <<EndpointIndex:16>> <= ReplicasBin],
StreamMetadata = #{
stream => Stream,
leader_index => LeaderIndex,
replicas_count => ReplicasCount,
replicas => Replicas
},
parse_streams_metadata(NumStreams - 1, Rest, [StreamMetadata | Acc]).
message_to_correlation_id({declare_publisher_response, Corr, _}) ->
{ok, Corr};
message_to_correlation_id({query_publisher_sequence, Corr, _, _}) ->
{ok, Corr};
message_to_correlation_id({delete_publisher_response, Corr, _}) ->
{ok, Corr};
message_to_correlation_id({subscribe_response, Corr, _}) ->
{ok, Corr};
message_to_correlation_id({query_offset_response, Corr, _, _}) ->
{ok, Corr};
message_to_correlation_id({unsubscribe_response, Corr, _}) ->
{ok, Corr};
message_to_correlation_id({create_response, Corr, _}) ->
{ok, Corr};
message_to_correlation_id({delete_response, Corr, _}) ->
{ok, Corr};
message_to_correlation_id({metadata_response, Corr, _, _}) ->
{ok, Corr};
message_to_correlation_id({route_response, Corr, _, _}) ->
{ok, Corr};
message_to_correlation_id(Other) ->
{error, {no_correlation_id, Other}}.