defmodule Kayrock.MessageSet do
@moduledoc """
Represents a set of messages in the v0 or v1 message format.

This is the old format that KafkaEx supported.

See https://kafka.apache.org/documentation/#recordbatch
"""
defmodule Message do
  @moduledoc """
  A single message in the v0 or v1 message format.

  This is the old format that KafkaEx supported. Every field defaults to
  `nil` except `compression`, which defaults to `:none`.

  See https://kafka.apache.org/documentation/#recordbatch
  """

  defstruct [
    offset: nil,
    compression: :none,
    key: nil,
    value: nil,
    attributes: nil,
    crc: nil,
    timestamp: nil,
    timestamp_type: nil
  ]

  @type t :: %__MODULE__{}
end
import Bitwise
# `messages` holds the `Message` structs that make up the set (empty by default).
# `magic` records the magic value handed to `deserialize/2` (0 by default); the
# serializer in this module does not read it.
defstruct messages: [], magic: 0
@type t :: %__MODULE__{}
@doc """
Serializes a message set into iodata.

The result is a 32-bit signed size prefix followed by the encoded messages.
The compression setting of the *first* message decides how the whole set is
encoded.

An empty `messages` list serializes to a zero size prefix followed by an
empty body instead of raising.

Raises `MatchError` if the first element of a non-empty list is not a
`Message` struct.
"""
@spec serialize(t) :: iodata
def serialize(%__MODULE__{messages: []}), do: [<<0::32-signed>>, ""]

def serialize(%__MODULE__{messages: messages}) when is_list(messages) do
  [%Message{compression: compression} | _] = messages
  # note when we serialize we never have an offset
  {message, msize} = create_message_set(messages, compression)
  [<<msize::32-signed>>, message]
end
@doc """
Decodes a binary message set into a `Kayrock.MessageSet` struct.

`magic` is stored on the returned struct as given (0 when omitted); each
message's own magic byte is read from `data` while decoding.
"""
@spec deserialize(binary) :: t
def deserialize(data, magic \\ 0) do
  %__MODULE__{messages: do_deserialize(data, [], 0), magic: magic}
end
# Walks a binary message set, decoding one entry per step.
#
# Each entry is laid out as a 64-bit offset, a 32-bit size, and then `size`
# bytes holding crc (32 bits), magic (8 bits), attributes (8 bits), an int64
# timestamp (magic 1 only), the key and the value.
#
# `acc` collects decoded entries newest-first. An uncompressed entry becomes a
# single `%Message{}`. A compressed entry is decompressed, decoded
# recursively, offset-corrected and pushed as a *reversed* sub-list, so the
# final flatten-and-reverse restores the original order.
defp do_deserialize(
       <<offset::64-signed, msg_size::32-signed, payload::size(msg_size)-binary, remaining::bits>>,
       acc,
       add_offset
     ) do
  <<crc::32, magic::8-signed, attributes::8-signed, body::bits>> = payload

  # Only magic 1 messages carry a timestamp; any other magic value crashes here.
  {timestamp, after_timestamp} =
    case magic do
      0 -> {nil, body}
      1 -> Kayrock.Deserialize.deserialize(:int64, body)
    end

  {key, after_key} = deserialize_string(after_timestamp)
  # The value must consume the rest of the message exactly.
  {value, <<>>} = deserialize_string(after_key)

  entry =
    case compression_from_attributes(attributes) do
      0 ->
        %Message{
          offset: offset + add_offset,
          crc: crc,
          attributes: attributes,
          key: key,
          value: value,
          timestamp: timestamp,
          timestamp_type: timestamp_type_from_attributes(attributes, magic)
        }

      codec ->
        # The value of a compressed wrapper is itself an encoded message set.
        inner_data = Kayrock.Compression.decompress(codec, value)
        inner_messages = do_deserialize(inner_data, [], 0)
        corrected = correct_offsets(inner_messages, offset)
        Enum.reverse(corrected)
    end

  do_deserialize(remaining, [entry | acc], add_offset)
end

# Anything that does not start with a complete entry (empty input or a
# truncated trailing message) ends the walk.
defp do_deserialize(_, acc, _add_offset) do
  acc
  |> List.flatten()
  |> Enum.reverse()
end
# Normalises the offsets of messages unpacked from a compressed wrapper.
#
# `messages` is the decoded inner message list (in order) and `real_offset`
# is the offset carried by the wrapper (container) message.
#
#   - If the last inner message already carries the wrapper's offset, the
#     inner offsets are taken to be absolute and the list is returned as is.
#   - Otherwise the inner offsets are treated as relative and `real_offset`
#     is added to each of them.
#
# An empty inner list is returned unchanged; without this clause
# `List.last/1` yields `nil` and the offset lookup crashes.
#
# NOTE(review): the container offset is described as the 'real' offset of the
# last message in the compressed batch, yet the relative branch adds it to
# every inner offset rather than rebasing on the last one. Confirm against
# the Kafka message format documentation before changing.
defp correct_offsets([], _real_offset), do: []

defp correct_offsets(messages, real_offset) do
  max_relative_offset = messages |> List.last() |> Map.fetch!(:offset)

  if max_relative_offset == real_offset do
    messages
  else
    Enum.map(messages, fn msg -> %{msg | offset: msg.offset + real_offset} end)
  end
end
# Encodes a list of messages, returning `{iodata, byte_size}`.
#
#   - an empty list encodes to an empty body of size 0
#   - `:none` encodes each message individually
#   - any other compression type encodes the messages uncompressed, compresses
#     that blob, and wraps it in a single message with a nil key whose
#     attributes are the value returned by the compressor
defp create_message_set([], _compression_type), do: {"", 0}

defp create_message_set(messages, :none), do: create_message_set_uncompressed(messages)

defp create_message_set(messages, compression_type) do
  {plain_set, _plain_size} = create_message_set_uncompressed(messages)
  {compressed, attribute} = Kayrock.Compression.compress(compression_type, plain_set)
  {wrapper, wrapper_size} = create_message(compressed, nil, attribute)

  # 8 bytes of offset (always 0) + 4 bytes of size + the wrapper itself
  {[<<0::64-signed>>, <<wrapper_size::32-signed>>, wrapper], 12 + wrapper_size}
end
# Encodes the head message with a zero offset and its size, then recurses on
# the remaining messages. Returns `{iodata, byte_size}`.
defp create_message_set_uncompressed([%Message{key: key, value: value} | remaining]) do
  {encoded, encoded_size} = create_message(value, key)
  head_entry = [<<0::64-signed>>, <<encoded_size::32-signed>>, encoded]

  {tail_entries, tail_size} = create_message_set(remaining, :none)

  # 8 bytes of offset + 4 bytes of size precede every encoded message
  {[head_entry, tail_entries], 8 + 4 + encoded_size + tail_size}
end
# Builds one encoded message: crc32, magic byte (always 0), attributes byte,
# key and value. Returns `{iodata, byte_size}`.
defp create_message(value, key, attributes \\ 0) do
  {key_io, key_size} = bytes(key)
  {value_io, value_size} = bytes(value)

  # The checksum covers everything after the crc field itself.
  payload = [<<0::8, attributes::8-signed>>, key_io, value_io]
  checksum = :erlang.crc32(payload)

  # 4 bytes of crc + 1 byte of magic + 1 byte of attributes + key + value
  {[<<checksum::32>>, payload], 4 + 2 + key_size + value_size}
end
# The three least significant bits of the attributes byte select the
# compression codec; 0 means the message is not compressed.
defp compression_from_attributes(attributes), do: Bitwise.band(attributes, 7)
# For magic 1 messages, returns the attributes masked with 8 (bit 3). The
# result is the raw masked value (0 or 8), not a shifted flag. Any other magic
# value yields nil.
defp timestamp_type_from_attributes(attributes, 1), do: Bitwise.band(attributes, 8)
defp timestamp_type_from_attributes(_attributes, _magic), do: nil
# Reads a length-prefixed byte string. A 32-bit signed length of -1 denotes
# nil; otherwise that many bytes are taken. Returns `{value, remaining_bits}`.
defp deserialize_string(<<-1::32-signed, tail::bitstring>>) do
  {nil, tail}
end

defp deserialize_string(<<len::32-signed, contents::binary-size(len), tail::bitstring>>) do
  {contents, tail}
end
# Encodes a key or value as length-prefixed iodata, returning
# `{iodata, byte_size}` where the size includes the 4-byte length prefix.
#
#   - nil is encoded as a length of -1 with no payload
#   - empty data is encoded as a length of 0 with no payload
defp bytes(nil), do: {<<-1::32-signed>>, 4}

defp bytes(data) do
  payload_size = IO.iodata_length(data)

  if payload_size == 0 do
    {<<0::32>>, 4}
  else
    {[<<payload_size::32>>, data], 4 + payload_size}
  end
end
end