defmodule(Kayrock.Fetch) do
# Atom naming the Kafka API this module covers; interpolated into @moduledoc below.
@api :fetch
@moduledoc "Kayrock-generated module for the Kafka `#{@api}` API\n"
# No-op expression that embeds a "generated code" banner in the module body.
_ = " THIS CODE IS GENERATED BY KAYROCK"
(
# Version bounds for this API; the nested request modules in this file are V0 through V7.
# NOTE(review): presumably read by min/max version accessors defined later in this module - confirm.
@vmin 0
@vmax 7
)
defmodule V0.Request do
  @vsn 0
  @api :fetch
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    topics:
      {:array,
       [
         topic: :string,
         partitions: {:array, [partition: :int32, fetch_offset: :int64, max_bytes: :int32]}
       ]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            topics: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc "Request struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V0.Response.t(), binary})
  def response_deserializer, do: &V0.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      encode_topics(Map.fetch!(request, :topics))
    ]

    [header, body]
  end

  # Array encoding: length -1 for nil, 0 for an empty list, otherwise the
  # element count followed by every encoded element.
  defp encode_topics(topics) do
    case topics do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_topic/1)]
    end
  end

  # One topic entry: its name followed by its partitions array.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_partitions(Map.fetch!(entry, :partitions))
    ]
  end

  # Same array framing as encode_topics/1, applied to partition entries.
  defp encode_partitions(partitions) do
    case partitions do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_partition/1)]
    end
  end

  # One partition entry, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end
end
defimpl Kayrock.Request, for: V0.Request do
  # Delegates to V0.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V0.Request{} = request) do
    V0.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V0.Request{}), do: V0.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V0.Request{}), do: V0.Request.response_deserializer()
end
defmodule V1.Request do
  @vsn 1
  @api :fetch
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    topics:
      {:array,
       [
         topic: :string,
         partitions: {:array, [partition: :int32, fetch_offset: :int64, max_bytes: :int32]}
       ]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            topics: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc "Request struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V1.Response.t(), binary})
  def response_deserializer, do: &V1.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      encode_topics(Map.fetch!(request, :topics))
    ]

    [header, body]
  end

  # Array encoding: length -1 for nil, 0 for an empty list, otherwise the
  # element count followed by every encoded element.
  defp encode_topics(topics) do
    case topics do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_topic/1)]
    end
  end

  # One topic entry: its name followed by its partitions array.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_partitions(Map.fetch!(entry, :partitions))
    ]
  end

  # Same array framing as encode_topics/1, applied to partition entries.
  defp encode_partitions(partitions) do
    case partitions do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_partition/1)]
    end
  end

  # One partition entry, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end
end
defimpl Kayrock.Request, for: V1.Request do
  # Delegates to V1.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V1.Request{} = request) do
    V1.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V1.Request{}), do: V1.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V1.Request{}), do: V1.Request.response_deserializer()
end
defmodule V2.Request do
  @vsn 2
  @api :fetch
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    topics:
      {:array,
       [
         topic: :string,
         partitions: {:array, [partition: :int32, fetch_offset: :int64, max_bytes: :int32]}
       ]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            topics: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc "Request struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V2.Response.t(), binary})
  def response_deserializer, do: &V2.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      encode_topics(Map.fetch!(request, :topics))
    ]

    [header, body]
  end

  # Array encoding: length -1 for nil, 0 for an empty list, otherwise the
  # element count followed by every encoded element.
  defp encode_topics(topics) do
    case topics do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_topic/1)]
    end
  end

  # One topic entry: its name followed by its partitions array.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_partitions(Map.fetch!(entry, :partitions))
    ]
  end

  # Same array framing as encode_topics/1, applied to partition entries.
  defp encode_partitions(partitions) do
    case partitions do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_partition/1)]
    end
  end

  # One partition entry, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end
end
defimpl Kayrock.Request, for: V2.Request do
  # Delegates to V2.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V2.Request{} = request) do
    V2.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V2.Request{}), do: V2.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V2.Request{}), do: V2.Request.response_deserializer()
end
defmodule V3.Request do
  @vsn 3
  @api :fetch
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    max_bytes: :int32,
    topics:
      {:array,
       [
         topic: :string,
         partitions: {:array, [partition: :int32, fetch_offset: :int64, max_bytes: :int32]}
       ]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            max_bytes: nil,
            topics: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc "Request struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          max_bytes: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V3.Response.t(), binary})
  def response_deserializer, do: &V3.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      serialize(:int32, Map.fetch!(request, :max_bytes)),
      encode_topics(Map.fetch!(request, :topics))
    ]

    [header, body]
  end

  # Array encoding: length -1 for nil, 0 for an empty list, otherwise the
  # element count followed by every encoded element.
  defp encode_topics(topics) do
    case topics do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_topic/1)]
    end
  end

  # One topic entry: its name followed by its partitions array.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_partitions(Map.fetch!(entry, :partitions))
    ]
  end

  # Same array framing as encode_topics/1, applied to partition entries.
  defp encode_partitions(partitions) do
    case partitions do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_partition/1)]
    end
  end

  # One partition entry, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end
end
defimpl Kayrock.Request, for: V3.Request do
  # Delegates to V3.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V3.Request{} = request) do
    V3.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V3.Request{}), do: V3.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V3.Request{}), do: V3.Request.response_deserializer()
end
defmodule V4.Request do
  @vsn 4
  @api :fetch
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    max_bytes: :int32,
    isolation_level: :int8,
    topics:
      {:array,
       [
         topic: :string,
         partitions: {:array, [partition: :int32, fetch_offset: :int64, max_bytes: :int32]}
       ]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            max_bytes: nil,
            isolation_level: nil,
            topics: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc "Request struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          max_bytes: nil | integer(),
          isolation_level: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V4.Response.t(), binary})
  def response_deserializer, do: &V4.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      serialize(:int32, Map.fetch!(request, :max_bytes)),
      serialize(:int8, Map.fetch!(request, :isolation_level)),
      encode_topics(Map.fetch!(request, :topics))
    ]

    [header, body]
  end

  # Array encoding: length -1 for nil, 0 for an empty list, otherwise the
  # element count followed by every encoded element.
  defp encode_topics(topics) do
    case topics do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_topic/1)]
    end
  end

  # One topic entry: its name followed by its partitions array.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_partitions(Map.fetch!(entry, :partitions))
    ]
  end

  # Same array framing as encode_topics/1, applied to partition entries.
  defp encode_partitions(partitions) do
    case partitions do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_partition/1)]
    end
  end

  # One partition entry, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end
end
defimpl Kayrock.Request, for: V4.Request do
  # Delegates to V4.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V4.Request{} = request) do
    V4.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V4.Request{}), do: V4.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V4.Request{}), do: V4.Request.response_deserializer()
end
defmodule V5.Request do
  @vsn 5
  @api :fetch
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    max_bytes: :int32,
    isolation_level: :int8,
    topics:
      {:array,
       [
         topic: :string,
         partitions:
           {:array,
            [
              partition: :int32,
              fetch_offset: :int64,
              log_start_offset: :int64,
              max_bytes: :int32
            ]}
       ]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            max_bytes: nil,
            isolation_level: nil,
            topics: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc "Request struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          max_bytes: nil | integer(),
          isolation_level: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  log_start_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V5.Response.t(), binary})
  def response_deserializer, do: &V5.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      serialize(:int32, Map.fetch!(request, :max_bytes)),
      serialize(:int8, Map.fetch!(request, :isolation_level)),
      encode_topics(Map.fetch!(request, :topics))
    ]

    [header, body]
  end

  # Array encoding: length -1 for nil, 0 for an empty list, otherwise the
  # element count followed by every encoded element.
  defp encode_topics(topics) do
    case topics do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_topic/1)]
    end
  end

  # One topic entry: its name followed by its partitions array.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_partitions(Map.fetch!(entry, :partitions))
    ]
  end

  # Same array framing as encode_topics/1, applied to partition entries.
  defp encode_partitions(partitions) do
    case partitions do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_partition/1)]
    end
  end

  # One partition entry, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int64, Map.fetch!(entry, :log_start_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end
end
defimpl Kayrock.Request, for: V5.Request do
  # Delegates to V5.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V5.Request{} = request) do
    V5.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V5.Request{}), do: V5.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V5.Request{}), do: V5.Request.response_deserializer()
end
defmodule V6.Request do
  @vsn 6
  @api :fetch
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    max_bytes: :int32,
    isolation_level: :int8,
    topics:
      {:array,
       [
         topic: :string,
         partitions:
           {:array,
            [
              partition: :int32,
              fetch_offset: :int64,
              log_start_offset: :int64,
              max_bytes: :int32
            ]}
       ]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            max_bytes: nil,
            isolation_level: nil,
            topics: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc "Request struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          max_bytes: nil | integer(),
          isolation_level: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  log_start_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V6.Response.t(), binary})
  def response_deserializer, do: &V6.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      serialize(:int32, Map.fetch!(request, :max_bytes)),
      serialize(:int8, Map.fetch!(request, :isolation_level)),
      encode_topics(Map.fetch!(request, :topics))
    ]

    [header, body]
  end

  # Array encoding: length -1 for nil, 0 for an empty list, otherwise the
  # element count followed by every encoded element.
  defp encode_topics(topics) do
    case topics do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_topic/1)]
    end
  end

  # One topic entry: its name followed by its partitions array.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_partitions(Map.fetch!(entry, :partitions))
    ]
  end

  # Same array framing as encode_topics/1, applied to partition entries.
  defp encode_partitions(partitions) do
    case partitions do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, &encode_partition/1)]
    end
  end

  # One partition entry, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int64, Map.fetch!(entry, :log_start_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end
end
defimpl Kayrock.Request, for: V6.Request do
  # Delegates to V6.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V6.Request{} = request) do
    V6.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V6.Request{}), do: V6.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V6.Request{}), do: V6.Request.response_deserializer()
end
defmodule V7.Request do
  @vsn 7
  @api :fetch
  # NOTE: `forgetten_topics_data` keeps its historical (misspelled) key so the
  # struct stays compatible with existing callers. On the wire, the Kafka
  # Fetch v7 request defines each forgotten topic's `partitions` as an array
  # of INT32 partition ids, not an array of partition structs, so that is
  # what the schema declares and what serialize/1 emits.
  @schema [
    replica_id: :int32,
    max_wait_time: :int32,
    min_bytes: :int32,
    max_bytes: :int32,
    isolation_level: :int8,
    session_id: :int32,
    epoch: :int32,
    topics:
      {:array,
       [
         topic: :string,
         partitions:
           {:array,
            [
              partition: :int32,
              fetch_offset: :int64,
              log_start_offset: :int64,
              max_bytes: :int32
            ]}
       ]},
    forgetten_topics_data: {:array, [topic: :string, partitions: {:array, :int32}]}
  ]
  @moduledoc "Kayrock-generated request struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct replica_id: nil,
            max_wait_time: nil,
            min_bytes: nil,
            max_bytes: nil,
            isolation_level: nil,
            session_id: nil,
            epoch: nil,
            topics: [],
            forgetten_topics_data: [],
            correlation_id: nil,
            client_id: nil

  import Kayrock.Serialize, only: [serialize: 2]

  @typedoc """
  Request struct for the Kafka `#{@api}` API v#{@vsn}

  Forgotten-topic partitions are plain integer partition ids. For backward
  compatibility a map carrying a `:partition` key is also accepted; only that
  key is serialized.
  """
  @type t :: %__MODULE__{
          replica_id: nil | integer(),
          max_wait_time: nil | integer(),
          min_bytes: nil | integer(),
          max_bytes: nil | integer(),
          isolation_level: nil | integer(),
          session_id: nil | integer(),
          epoch: nil | integer(),
          topics: [
            %{
              topic: nil | binary(),
              partitions: [
                %{
                  partition: nil | integer(),
                  fetch_offset: nil | integer(),
                  log_start_offset: nil | integer(),
                  max_bytes: nil | integer()
                }
              ]
            }
          ],
          forgetten_topics_data: [
            %{
              topic: nil | binary(),
              partitions: [
                integer()
                | %{required(:partition) => integer(), optional(atom()) => term()}
              ]
            }
          ],
          correlation_id: nil | integer(),
          client_id: nil | binary()
        }

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the function that decodes the broker's wire response to this request version.\n"
  @spec response_deserializer() :: (binary -> {V7.Response.t(), binary})
  def response_deserializer, do: &V7.Response.deserialize/1

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Serializes the request to iodata for transfer to a Kafka broker.

  The output is the request header (API key, API version, correlation id and
  length-prefixed client id) followed by the body fields in schema order.

  Each entry of `forgetten_topics_data` is written as the topic name followed
  by an array of INT32 partition ids. A partition may be given as an integer
  or, for compatibility with the previous struct shape, as a map with a
  `:partition` key. Any other shape raises.
  """
  @spec serialize(t()) :: iodata
  def serialize(%__MODULE__{} = request) do
    header =
      <<api_key()::16, api_vsn()::16, request.correlation_id::32,
        byte_size(request.client_id)::16, request.client_id::binary>>

    body = [
      serialize(:int32, Map.fetch!(request, :replica_id)),
      serialize(:int32, Map.fetch!(request, :max_wait_time)),
      serialize(:int32, Map.fetch!(request, :min_bytes)),
      serialize(:int32, Map.fetch!(request, :max_bytes)),
      serialize(:int8, Map.fetch!(request, :isolation_level)),
      serialize(:int32, Map.fetch!(request, :session_id)),
      serialize(:int32, Map.fetch!(request, :epoch)),
      encode_array(Map.fetch!(request, :topics), &encode_topic/1),
      encode_array(Map.fetch!(request, :forgetten_topics_data), &encode_forgotten_topic/1)
    ]

    [header, body]
  end

  # Array framing shared by every array field: length -1 for nil, 0 for an
  # empty list, otherwise the element count followed by each encoded element.
  defp encode_array(values, encoder) do
    case values do
      nil ->
        <<-1::32-signed>>

      [] ->
        <<0::32-signed>>

      entries when is_list(entries) ->
        [<<length(entries)::32-signed>>, Enum.map(entries, encoder)]
    end
  end

  # One fetched topic: its name followed by its partition entries.
  defp encode_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_array(Map.fetch!(entry, :partitions), &encode_partition/1)
    ]
  end

  # One fetched partition, fields in schema order.
  defp encode_partition(entry) do
    [
      serialize(:int32, Map.fetch!(entry, :partition)),
      serialize(:int64, Map.fetch!(entry, :fetch_offset)),
      serialize(:int64, Map.fetch!(entry, :log_start_offset)),
      serialize(:int32, Map.fetch!(entry, :max_bytes))
    ]
  end

  # One forgotten topic: its name followed by an array of partition ids.
  defp encode_forgotten_topic(entry) do
    [
      serialize(:string, Map.fetch!(entry, :topic)),
      encode_array(Map.fetch!(entry, :partitions), &encode_forgotten_partition/1)
    ]
  end

  # A forgotten partition is a bare INT32 id on the wire. The legacy map shape
  # is still accepted, but only its :partition id is emitted.
  defp encode_forgotten_partition(partition) when is_integer(partition) do
    serialize(:int32, partition)
  end

  defp encode_forgotten_partition(%{partition: partition}) do
    serialize(:int32, partition)
  end
end
defimpl Kayrock.Request, for: V7.Request do
  # Delegates to V7.Request.serialize/1; any exception raised while encoding is
  # re-raised as Kayrock.InvalidRequestError carrying the original error and
  # the offending request, with the original stacktrace preserved.
  def serialize(%V7.Request{} = request) do
    V7.Request.serialize(request)
  rescue
    error ->
      reraise Kayrock.InvalidRequestError, {error, request}, __STACKTRACE__
  end

  # API version implemented by the request struct's module.
  def api_vsn(%V7.Request{}), do: V7.Request.api_vsn()

  # Function used to decode the matching response from the broker.
  def response_deserializer(%V7.Request{}), do: V7.Request.response_deserializer()
end
@doc "Builds an empty request struct for the requested version (0 through 7) of this API"
@spec get_request_struct(integer) :: request_t
def get_request_struct(0), do: %V0.Request{}
def get_request_struct(1), do: %V1.Request{}
def get_request_struct(2), do: %V2.Request{}
def get_request_struct(3), do: %V3.Request{}
def get_request_struct(4), do: %V4.Request{}
def get_request_struct(5), do: %V5.Request{}
def get_request_struct(6), do: %V6.Request{}
def get_request_struct(7), do: %V7.Request{}
defmodule V0.Response do
  @vsn 0
  @api :fetch
  @schema [
    responses:
      {:array,
       [
         topic: :string,
         partition_responses:
           {:array,
            [
              partition_header: [
                partition: :int32,
                error_code: :int16,
                high_watermark: :int64
              ],
              record_set: :records
            ]}
       ]}
  ]
  @moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  defstruct responses: [], correlation_id: nil

  @typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          responses: [
            %{
              topic: nil | binary(),
              partition_responses: [
                %{
                  partition_header: %{
                    partition: nil | integer(),
                    error_code: nil | integer(),
                    high_watermark: nil | integer()
                  },
                  record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
                }
              ]
            }
          ],
          correlation_id: integer()
        }

  import Kayrock.Deserialize, only: [deserialize: 2]

  @doc "Looks up the Kafka API key registered for the `#{@api}` API."
  @spec api_key() :: integer()
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(@api)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn() :: integer()
  def api_vsn, do: @vsn

  @doc "Returns the field schema of this message as a keyword list.\n"
  @spec schema() :: term()
  def schema, do: @schema

  @doc """
  Decodes a wire response for this API version.

  Reads the 32-bit correlation id and then the `responses` array, and returns
  `{response_struct, rest}` where `rest` is whatever bytes were not consumed.
  """
  @spec deserialize(binary) :: {t(), binary}
  def deserialize(data) do
    <<correlation_id::32-signed, payload::binary>> = data
    {responses, remainder} = read_array(payload, &read_topic_response/1)
    {%__MODULE__{correlation_id: correlation_id, responses: responses}, remainder}
  end

  # Reads a 32-bit element count followed by that many elements, each decoded
  # by `reader`. A count of zero or less yields an empty list.
  defp read_array(data, reader) do
    <<count::32-signed, body::binary>> = data

    if count > 0 do
      Enum.map_reduce(1..count, body, fn _index, bytes -> reader.(bytes) end)
    else
      {[], body}
    end
  end

  # One element of `responses`: the topic name and its partition responses.
  defp read_topic_response(data) do
    {topic, after_topic} = deserialize(:string, data)
    {partition_responses, remainder} = read_array(after_topic, &read_partition_response/1)
    {%{topic: topic, partition_responses: partition_responses}, remainder}
  end

  # One element of `partition_responses`: the header followed by a
  # size-prefixed record set that Kayrock.RecordBatch decodes.
  defp read_partition_response(data) do
    {header, after_header} = read_partition_header(data)

    <<byte_count::32-signed, records::size(byte_count)-binary, remainder::bits>> = after_header

    record_set = Kayrock.RecordBatch.deserialize(byte_count, records)
    {%{partition_header: header, record_set: record_set}, remainder}
  end

  # The fixed partition header: partition id, error code and high watermark.
  defp read_partition_header(data) do
    {partition, after_partition} = deserialize(:int32, data)
    {error_code, after_error_code} = deserialize(:int16, after_partition)
    {high_watermark, remainder} = deserialize(:int64, after_error_code)

    {%{partition: partition, error_code: error_code, high_watermark: high_watermark}, remainder}
  end
end
defmodule V1.Response do
  @vsn 1
  @api :fetch
  @schema throttle_time_ms: :int32,
          responses:
            {:array,
             [
               topic: :string,
               partition_responses:
                 {:array,
                  [
                    partition_header: [
                      partition: :int32,
                      error_code: :int16,
                      high_watermark: :int64
                    ],
                    record_set: :records
                  ]}
             ]}
  @moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  # Decoding overview: deserialize/1 reads a signed 32-bit correlation id and then the
  # fields of @schema in order. Every deserialize_field/4 clause decodes one field of
  # one scope, stores it in the accumulator and hands the remaining bytes to the clause
  # of the next field; a `nil` field name ends the scope and returns {acc, rest}.

  defstruct throttle_time_ms: nil, responses: [], correlation_id: nil

  @typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          throttle_time_ms: nil | integer(),
          responses: [
            %{
              topic: nil | binary(),
              partition_responses: [
                %{
                  partition_header: %{
                    partition: nil | integer(),
                    error_code: nil | integer(),
                    high_watermark: nil | integer()
                  },
                  record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
                }
              ]
            }
          ],
          correlation_id: integer()
        }
  import Elixir.Kayrock.Deserialize

  @doc "Returns the Kafka API key for this API"
  @spec api_key :: integer
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(:fetch)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn :: integer
  def api_vsn, do: @vsn

  @doc "Returns the schema of this message\n\nSee [above](#).\n"
  @spec schema :: term
  def schema, do: @schema

  @doc "Deserialize data for this version of this API\n"
  @spec deserialize(binary) :: {t(), binary}
  def deserialize(data) do
    # The correlation id precedes the schema fields on the wire.
    <<correlation_id::32-signed, rest::binary>> = data
    deserialize_field(:root, :throttle_time_ms, %__MODULE__{correlation_id: correlation_id}, rest)
  end

  # --- root scope (accumulator is the response struct) ---

  defp deserialize_field(:root, :throttle_time_ms, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:root, :responses, %{acc | throttle_time_ms: val}, rest)
  end

  # --- responses scope (one map per topic) ---

  defp deserialize_field(:responses, :topic, acc, data) do
    {val, rest} = deserialize(:string, data)
    deserialize_field(:responses, :partition_responses, Map.put(acc, :topic, val), rest)
  end

  # --- partition_header scope ---

  defp deserialize_field(:partition_header, :partition, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:partition_header, :error_code, Map.put(acc, :partition, val), rest)
  end

  defp deserialize_field(:partition_header, :error_code, acc, data) do
    {val, rest} = deserialize(:int16, data)
    deserialize_field(:partition_header, :high_watermark, Map.put(acc, :error_code, val), rest)
  end

  defp deserialize_field(:partition_header, :high_watermark, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:partition_header, nil, Map.put(acc, :high_watermark, val), rest)
  end

  # --- partition_responses scope ---

  # The header is a nested structure decoded into its own map.
  defp deserialize_field(:partition_responses, :partition_header, acc, data) do
    {val, rest} = deserialize_field(:partition_header, :partition, %{}, data)
    deserialize_field(:partition_responses, :record_set, Map.put(acc, :partition_header, val), rest)
  end

  # A null RECORDS value is encoded as length -1 with no payload bytes. The general
  # clause below cannot match it (a negative binary size never matches) and would
  # raise a MatchError, so a null record set is stored as nil instead.
  defp deserialize_field(:partition_responses, :record_set, acc, <<size::32-signed, rest::bits>>)
       when size == -1 do
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, nil), rest)
  end

  # Signed 32-bit byte length followed by that many bytes, both handed to
  # Kayrock.RecordBatch.deserialize/2.
  defp deserialize_field(:partition_responses, :record_set, acc, data) do
    <<msg_set_size::32-signed, msg_set_data::size(msg_set_size)-binary, rest::bits>> = data
    val = Elixir.Kayrock.RecordBatch.deserialize(msg_set_size, msg_set_data)
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, val), rest)
  end

  # --- array-valued fields ---

  defp deserialize_field(:responses, :partition_responses, acc, data) do
    {vals, rest} = read_struct_array(:partition_responses, :partition_header, data)
    deserialize_field(:responses, nil, Map.put(acc, :partition_responses, vals), rest)
  end

  defp deserialize_field(:root, :responses, acc, data) do
    {vals, rest} = read_struct_array(:responses, :topic, data)
    deserialize_field(:root, nil, %{acc | responses: vals}, rest)
  end

  # End of a scope: return what was accumulated and the unconsumed bytes.
  defp deserialize_field(_, nil, acc, rest), do: {acc, rest}

  # Reads a signed 32-bit element count and then that many elements, each decoded
  # into a fresh map by running the field chain of `scope` starting at `first_field`.
  # Returns {elements_in_wire_order, remaining_bytes}; a count of zero or less
  # yields an empty list.
  defp read_struct_array(scope, first_field, data) do
    <<num_elements::32-signed, rest::binary>> = data

    if num_elements > 0 do
      Enum.map_reduce(1..num_elements, rest, fn _ix, bin ->
        deserialize_field(scope, first_field, %{}, bin)
      end)
    else
      {[], rest}
    end
  end
end
defmodule V2.Response do
  @vsn 2
  @api :fetch
  @schema throttle_time_ms: :int32,
          responses:
            {:array,
             [
               topic: :string,
               partition_responses:
                 {:array,
                  [
                    partition_header: [
                      partition: :int32,
                      error_code: :int16,
                      high_watermark: :int64
                    ],
                    record_set: :records
                  ]}
             ]}
  @moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  # Decoding overview: deserialize/1 reads a signed 32-bit correlation id and then the
  # fields of @schema in order. Every deserialize_field/4 clause decodes one field of
  # one scope, stores it in the accumulator and hands the remaining bytes to the clause
  # of the next field; a `nil` field name ends the scope and returns {acc, rest}.

  defstruct throttle_time_ms: nil, responses: [], correlation_id: nil

  @typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          throttle_time_ms: nil | integer(),
          responses: [
            %{
              topic: nil | binary(),
              partition_responses: [
                %{
                  partition_header: %{
                    partition: nil | integer(),
                    error_code: nil | integer(),
                    high_watermark: nil | integer()
                  },
                  record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
                }
              ]
            }
          ],
          correlation_id: integer()
        }
  import Elixir.Kayrock.Deserialize

  @doc "Returns the Kafka API key for this API"
  @spec api_key :: integer
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(:fetch)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn :: integer
  def api_vsn, do: @vsn

  @doc "Returns the schema of this message\n\nSee [above](#).\n"
  @spec schema :: term
  def schema, do: @schema

  @doc "Deserialize data for this version of this API\n"
  @spec deserialize(binary) :: {t(), binary}
  def deserialize(data) do
    # The correlation id precedes the schema fields on the wire.
    <<correlation_id::32-signed, rest::binary>> = data
    deserialize_field(:root, :throttle_time_ms, %__MODULE__{correlation_id: correlation_id}, rest)
  end

  # --- root scope (accumulator is the response struct) ---

  defp deserialize_field(:root, :throttle_time_ms, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:root, :responses, %{acc | throttle_time_ms: val}, rest)
  end

  # --- responses scope (one map per topic) ---

  defp deserialize_field(:responses, :topic, acc, data) do
    {val, rest} = deserialize(:string, data)
    deserialize_field(:responses, :partition_responses, Map.put(acc, :topic, val), rest)
  end

  # --- partition_header scope ---

  defp deserialize_field(:partition_header, :partition, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:partition_header, :error_code, Map.put(acc, :partition, val), rest)
  end

  defp deserialize_field(:partition_header, :error_code, acc, data) do
    {val, rest} = deserialize(:int16, data)
    deserialize_field(:partition_header, :high_watermark, Map.put(acc, :error_code, val), rest)
  end

  defp deserialize_field(:partition_header, :high_watermark, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:partition_header, nil, Map.put(acc, :high_watermark, val), rest)
  end

  # --- partition_responses scope ---

  # The header is a nested structure decoded into its own map.
  defp deserialize_field(:partition_responses, :partition_header, acc, data) do
    {val, rest} = deserialize_field(:partition_header, :partition, %{}, data)
    deserialize_field(:partition_responses, :record_set, Map.put(acc, :partition_header, val), rest)
  end

  # A null RECORDS value is encoded as length -1 with no payload bytes. The general
  # clause below cannot match it (a negative binary size never matches) and would
  # raise a MatchError, so a null record set is stored as nil instead.
  defp deserialize_field(:partition_responses, :record_set, acc, <<size::32-signed, rest::bits>>)
       when size == -1 do
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, nil), rest)
  end

  # Signed 32-bit byte length followed by that many bytes, both handed to
  # Kayrock.RecordBatch.deserialize/2.
  defp deserialize_field(:partition_responses, :record_set, acc, data) do
    <<msg_set_size::32-signed, msg_set_data::size(msg_set_size)-binary, rest::bits>> = data
    val = Elixir.Kayrock.RecordBatch.deserialize(msg_set_size, msg_set_data)
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, val), rest)
  end

  # --- array-valued fields ---

  defp deserialize_field(:responses, :partition_responses, acc, data) do
    {vals, rest} = read_struct_array(:partition_responses, :partition_header, data)
    deserialize_field(:responses, nil, Map.put(acc, :partition_responses, vals), rest)
  end

  defp deserialize_field(:root, :responses, acc, data) do
    {vals, rest} = read_struct_array(:responses, :topic, data)
    deserialize_field(:root, nil, %{acc | responses: vals}, rest)
  end

  # End of a scope: return what was accumulated and the unconsumed bytes.
  defp deserialize_field(_, nil, acc, rest), do: {acc, rest}

  # Reads a signed 32-bit element count and then that many elements, each decoded
  # into a fresh map by running the field chain of `scope` starting at `first_field`.
  # Returns {elements_in_wire_order, remaining_bytes}; a count of zero or less
  # yields an empty list.
  defp read_struct_array(scope, first_field, data) do
    <<num_elements::32-signed, rest::binary>> = data

    if num_elements > 0 do
      Enum.map_reduce(1..num_elements, rest, fn _ix, bin ->
        deserialize_field(scope, first_field, %{}, bin)
      end)
    else
      {[], rest}
    end
  end
end
defmodule V3.Response do
  @vsn 3
  @api :fetch
  @schema throttle_time_ms: :int32,
          responses:
            {:array,
             [
               topic: :string,
               partition_responses:
                 {:array,
                  [
                    partition_header: [
                      partition: :int32,
                      error_code: :int16,
                      high_watermark: :int64
                    ],
                    record_set: :records
                  ]}
             ]}
  @moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  # Decoding overview: deserialize/1 reads a signed 32-bit correlation id and then the
  # fields of @schema in order. Every deserialize_field/4 clause decodes one field of
  # one scope, stores it in the accumulator and hands the remaining bytes to the clause
  # of the next field; a `nil` field name ends the scope and returns {acc, rest}.

  defstruct throttle_time_ms: nil, responses: [], correlation_id: nil

  @typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          throttle_time_ms: nil | integer(),
          responses: [
            %{
              topic: nil | binary(),
              partition_responses: [
                %{
                  partition_header: %{
                    partition: nil | integer(),
                    error_code: nil | integer(),
                    high_watermark: nil | integer()
                  },
                  record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
                }
              ]
            }
          ],
          correlation_id: integer()
        }
  import Elixir.Kayrock.Deserialize

  @doc "Returns the Kafka API key for this API"
  @spec api_key :: integer
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(:fetch)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn :: integer
  def api_vsn, do: @vsn

  @doc "Returns the schema of this message\n\nSee [above](#).\n"
  @spec schema :: term
  def schema, do: @schema

  @doc "Deserialize data for this version of this API\n"
  @spec deserialize(binary) :: {t(), binary}
  def deserialize(data) do
    # The correlation id precedes the schema fields on the wire.
    <<correlation_id::32-signed, rest::binary>> = data
    deserialize_field(:root, :throttle_time_ms, %__MODULE__{correlation_id: correlation_id}, rest)
  end

  # --- root scope (accumulator is the response struct) ---

  defp deserialize_field(:root, :throttle_time_ms, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:root, :responses, %{acc | throttle_time_ms: val}, rest)
  end

  # --- responses scope (one map per topic) ---

  defp deserialize_field(:responses, :topic, acc, data) do
    {val, rest} = deserialize(:string, data)
    deserialize_field(:responses, :partition_responses, Map.put(acc, :topic, val), rest)
  end

  # --- partition_header scope ---

  defp deserialize_field(:partition_header, :partition, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:partition_header, :error_code, Map.put(acc, :partition, val), rest)
  end

  defp deserialize_field(:partition_header, :error_code, acc, data) do
    {val, rest} = deserialize(:int16, data)
    deserialize_field(:partition_header, :high_watermark, Map.put(acc, :error_code, val), rest)
  end

  defp deserialize_field(:partition_header, :high_watermark, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:partition_header, nil, Map.put(acc, :high_watermark, val), rest)
  end

  # --- partition_responses scope ---

  # The header is a nested structure decoded into its own map.
  defp deserialize_field(:partition_responses, :partition_header, acc, data) do
    {val, rest} = deserialize_field(:partition_header, :partition, %{}, data)
    deserialize_field(:partition_responses, :record_set, Map.put(acc, :partition_header, val), rest)
  end

  # A null RECORDS value is encoded as length -1 with no payload bytes. The general
  # clause below cannot match it (a negative binary size never matches) and would
  # raise a MatchError, so a null record set is stored as nil instead.
  defp deserialize_field(:partition_responses, :record_set, acc, <<size::32-signed, rest::bits>>)
       when size == -1 do
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, nil), rest)
  end

  # Signed 32-bit byte length followed by that many bytes, both handed to
  # Kayrock.RecordBatch.deserialize/2.
  defp deserialize_field(:partition_responses, :record_set, acc, data) do
    <<msg_set_size::32-signed, msg_set_data::size(msg_set_size)-binary, rest::bits>> = data
    val = Elixir.Kayrock.RecordBatch.deserialize(msg_set_size, msg_set_data)
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, val), rest)
  end

  # --- array-valued fields ---

  defp deserialize_field(:responses, :partition_responses, acc, data) do
    {vals, rest} = read_struct_array(:partition_responses, :partition_header, data)
    deserialize_field(:responses, nil, Map.put(acc, :partition_responses, vals), rest)
  end

  defp deserialize_field(:root, :responses, acc, data) do
    {vals, rest} = read_struct_array(:responses, :topic, data)
    deserialize_field(:root, nil, %{acc | responses: vals}, rest)
  end

  # End of a scope: return what was accumulated and the unconsumed bytes.
  defp deserialize_field(_, nil, acc, rest), do: {acc, rest}

  # Reads a signed 32-bit element count and then that many elements, each decoded
  # into a fresh map by running the field chain of `scope` starting at `first_field`.
  # Returns {elements_in_wire_order, remaining_bytes}; a count of zero or less
  # yields an empty list.
  defp read_struct_array(scope, first_field, data) do
    <<num_elements::32-signed, rest::binary>> = data

    if num_elements > 0 do
      Enum.map_reduce(1..num_elements, rest, fn _ix, bin ->
        deserialize_field(scope, first_field, %{}, bin)
      end)
    else
      {[], rest}
    end
  end
end
defmodule V4.Response do
  @vsn 4
  @api :fetch
  @schema throttle_time_ms: :int32,
          responses:
            {:array,
             [
               topic: :string,
               partition_responses:
                 {:array,
                  [
                    partition_header: [
                      partition: :int32,
                      error_code: :int16,
                      high_watermark: :int64,
                      last_stable_offset: :int64,
                      aborted_transactions:
                        {:array, [producer_id: :int64, first_offset: :int64]}
                    ],
                    record_set: :records
                  ]}
             ]}
  @moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  # Decoding overview: deserialize/1 reads a signed 32-bit correlation id and then the
  # fields of @schema in order. Every deserialize_field/4 clause decodes one field of
  # one scope, stores it in the accumulator and hands the remaining bytes to the clause
  # of the next field; a `nil` field name ends the scope and returns {acc, rest}.

  defstruct throttle_time_ms: nil, responses: [], correlation_id: nil

  @typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          throttle_time_ms: nil | integer(),
          responses: [
            %{
              topic: nil | binary(),
              partition_responses: [
                %{
                  partition_header: %{
                    partition: nil | integer(),
                    error_code: nil | integer(),
                    high_watermark: nil | integer(),
                    last_stable_offset: nil | integer(),
                    aborted_transactions: [
                      %{producer_id: nil | integer(), first_offset: nil | integer()}
                    ]
                  },
                  record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
                }
              ]
            }
          ],
          correlation_id: integer()
        }
  import Elixir.Kayrock.Deserialize

  @doc "Returns the Kafka API key for this API"
  @spec api_key :: integer
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(:fetch)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn :: integer
  def api_vsn, do: @vsn

  @doc "Returns the schema of this message\n\nSee [above](#).\n"
  @spec schema :: term
  def schema, do: @schema

  @doc "Deserialize data for this version of this API\n"
  @spec deserialize(binary) :: {t(), binary}
  def deserialize(data) do
    # The correlation id precedes the schema fields on the wire.
    <<correlation_id::32-signed, rest::binary>> = data
    deserialize_field(:root, :throttle_time_ms, %__MODULE__{correlation_id: correlation_id}, rest)
  end

  # --- root scope (accumulator is the response struct) ---

  defp deserialize_field(:root, :throttle_time_ms, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:root, :responses, %{acc | throttle_time_ms: val}, rest)
  end

  # --- responses scope (one map per topic) ---

  defp deserialize_field(:responses, :topic, acc, data) do
    {val, rest} = deserialize(:string, data)
    deserialize_field(:responses, :partition_responses, Map.put(acc, :topic, val), rest)
  end

  # --- partition_header scope ---

  defp deserialize_field(:partition_header, :partition, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:partition_header, :error_code, Map.put(acc, :partition, val), rest)
  end

  defp deserialize_field(:partition_header, :error_code, acc, data) do
    {val, rest} = deserialize(:int16, data)
    deserialize_field(:partition_header, :high_watermark, Map.put(acc, :error_code, val), rest)
  end

  defp deserialize_field(:partition_header, :high_watermark, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :last_stable_offset,
      Map.put(acc, :high_watermark, val),
      rest
    )
  end

  defp deserialize_field(:partition_header, :last_stable_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :aborted_transactions,
      Map.put(acc, :last_stable_offset, val),
      rest
    )
  end

  # --- aborted_transactions scope (one map per array element) ---

  defp deserialize_field(:aborted_transactions, :producer_id, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:aborted_transactions, :first_offset, Map.put(acc, :producer_id, val), rest)
  end

  defp deserialize_field(:aborted_transactions, :first_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:aborted_transactions, nil, Map.put(acc, :first_offset, val), rest)
  end

  # Last field of the partition header: an array of aborted-transaction entries.
  defp deserialize_field(:partition_header, :aborted_transactions, acc, data) do
    {vals, rest} = read_struct_array(:aborted_transactions, :producer_id, data)
    deserialize_field(:partition_header, nil, Map.put(acc, :aborted_transactions, vals), rest)
  end

  # --- partition_responses scope ---

  # The header is a nested structure decoded into its own map.
  defp deserialize_field(:partition_responses, :partition_header, acc, data) do
    {val, rest} = deserialize_field(:partition_header, :partition, %{}, data)
    deserialize_field(:partition_responses, :record_set, Map.put(acc, :partition_header, val), rest)
  end

  # A null RECORDS value is encoded as length -1 with no payload bytes. The general
  # clause below cannot match it (a negative binary size never matches) and would
  # raise a MatchError, so a null record set is stored as nil instead.
  defp deserialize_field(:partition_responses, :record_set, acc, <<size::32-signed, rest::bits>>)
       when size == -1 do
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, nil), rest)
  end

  # Signed 32-bit byte length followed by that many bytes, both handed to
  # Kayrock.RecordBatch.deserialize/2.
  defp deserialize_field(:partition_responses, :record_set, acc, data) do
    <<msg_set_size::32-signed, msg_set_data::size(msg_set_size)-binary, rest::bits>> = data
    val = Elixir.Kayrock.RecordBatch.deserialize(msg_set_size, msg_set_data)
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, val), rest)
  end

  # --- array-valued fields of the outer scopes ---

  defp deserialize_field(:responses, :partition_responses, acc, data) do
    {vals, rest} = read_struct_array(:partition_responses, :partition_header, data)
    deserialize_field(:responses, nil, Map.put(acc, :partition_responses, vals), rest)
  end

  defp deserialize_field(:root, :responses, acc, data) do
    {vals, rest} = read_struct_array(:responses, :topic, data)
    deserialize_field(:root, nil, %{acc | responses: vals}, rest)
  end

  # End of a scope: return what was accumulated and the unconsumed bytes.
  defp deserialize_field(_, nil, acc, rest), do: {acc, rest}

  # Reads a signed 32-bit element count and then that many elements, each decoded
  # into a fresh map by running the field chain of `scope` starting at `first_field`.
  # Returns {elements_in_wire_order, remaining_bytes}; a count of zero or less
  # yields an empty list.
  defp read_struct_array(scope, first_field, data) do
    <<num_elements::32-signed, rest::binary>> = data

    if num_elements > 0 do
      Enum.map_reduce(1..num_elements, rest, fn _ix, bin ->
        deserialize_field(scope, first_field, %{}, bin)
      end)
    else
      {[], rest}
    end
  end
end
defmodule V5.Response do
  @vsn 5
  @api :fetch
  @schema throttle_time_ms: :int32,
          responses:
            {:array,
             [
               topic: :string,
               partition_responses:
                 {:array,
                  [
                    partition_header: [
                      partition: :int32,
                      error_code: :int16,
                      high_watermark: :int64,
                      last_stable_offset: :int64,
                      log_start_offset: :int64,
                      aborted_transactions:
                        {:array, [producer_id: :int64, first_offset: :int64]}
                    ],
                    record_set: :records
                  ]}
             ]}
  @moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  # Decoding overview: deserialize/1 reads a signed 32-bit correlation id and then the
  # fields of @schema in order. Every deserialize_field/4 clause decodes one field of
  # one scope, stores it in the accumulator and hands the remaining bytes to the clause
  # of the next field; a `nil` field name ends the scope and returns {acc, rest}.

  defstruct throttle_time_ms: nil, responses: [], correlation_id: nil

  @typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          throttle_time_ms: nil | integer(),
          responses: [
            %{
              topic: nil | binary(),
              partition_responses: [
                %{
                  partition_header: %{
                    partition: nil | integer(),
                    error_code: nil | integer(),
                    high_watermark: nil | integer(),
                    last_stable_offset: nil | integer(),
                    log_start_offset: nil | integer(),
                    aborted_transactions: [
                      %{producer_id: nil | integer(), first_offset: nil | integer()}
                    ]
                  },
                  record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
                }
              ]
            }
          ],
          correlation_id: integer()
        }
  import Elixir.Kayrock.Deserialize

  @doc "Returns the Kafka API key for this API"
  @spec api_key :: integer
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(:fetch)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn :: integer
  def api_vsn, do: @vsn

  @doc "Returns the schema of this message\n\nSee [above](#).\n"
  @spec schema :: term
  def schema, do: @schema

  @doc "Deserialize data for this version of this API\n"
  @spec deserialize(binary) :: {t(), binary}
  def deserialize(data) do
    # The correlation id precedes the schema fields on the wire.
    <<correlation_id::32-signed, rest::binary>> = data
    deserialize_field(:root, :throttle_time_ms, %__MODULE__{correlation_id: correlation_id}, rest)
  end

  # --- root scope (accumulator is the response struct) ---

  defp deserialize_field(:root, :throttle_time_ms, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:root, :responses, %{acc | throttle_time_ms: val}, rest)
  end

  # --- responses scope (one map per topic) ---

  defp deserialize_field(:responses, :topic, acc, data) do
    {val, rest} = deserialize(:string, data)
    deserialize_field(:responses, :partition_responses, Map.put(acc, :topic, val), rest)
  end

  # --- partition_header scope ---

  defp deserialize_field(:partition_header, :partition, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:partition_header, :error_code, Map.put(acc, :partition, val), rest)
  end

  defp deserialize_field(:partition_header, :error_code, acc, data) do
    {val, rest} = deserialize(:int16, data)
    deserialize_field(:partition_header, :high_watermark, Map.put(acc, :error_code, val), rest)
  end

  defp deserialize_field(:partition_header, :high_watermark, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :last_stable_offset,
      Map.put(acc, :high_watermark, val),
      rest
    )
  end

  defp deserialize_field(:partition_header, :last_stable_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :log_start_offset,
      Map.put(acc, :last_stable_offset, val),
      rest
    )
  end

  defp deserialize_field(:partition_header, :log_start_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :aborted_transactions,
      Map.put(acc, :log_start_offset, val),
      rest
    )
  end

  # --- aborted_transactions scope (one map per array element) ---

  defp deserialize_field(:aborted_transactions, :producer_id, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:aborted_transactions, :first_offset, Map.put(acc, :producer_id, val), rest)
  end

  defp deserialize_field(:aborted_transactions, :first_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:aborted_transactions, nil, Map.put(acc, :first_offset, val), rest)
  end

  # Last field of the partition header: an array of aborted-transaction entries.
  defp deserialize_field(:partition_header, :aborted_transactions, acc, data) do
    {vals, rest} = read_struct_array(:aborted_transactions, :producer_id, data)
    deserialize_field(:partition_header, nil, Map.put(acc, :aborted_transactions, vals), rest)
  end

  # --- partition_responses scope ---

  # The header is a nested structure decoded into its own map.
  defp deserialize_field(:partition_responses, :partition_header, acc, data) do
    {val, rest} = deserialize_field(:partition_header, :partition, %{}, data)
    deserialize_field(:partition_responses, :record_set, Map.put(acc, :partition_header, val), rest)
  end

  # A null RECORDS value is encoded as length -1 with no payload bytes. The general
  # clause below cannot match it (a negative binary size never matches) and would
  # raise a MatchError, so a null record set is stored as nil instead.
  defp deserialize_field(:partition_responses, :record_set, acc, <<size::32-signed, rest::bits>>)
       when size == -1 do
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, nil), rest)
  end

  # Signed 32-bit byte length followed by that many bytes, both handed to
  # Kayrock.RecordBatch.deserialize/2.
  defp deserialize_field(:partition_responses, :record_set, acc, data) do
    <<msg_set_size::32-signed, msg_set_data::size(msg_set_size)-binary, rest::bits>> = data
    val = Elixir.Kayrock.RecordBatch.deserialize(msg_set_size, msg_set_data)
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, val), rest)
  end

  # --- array-valued fields of the outer scopes ---

  defp deserialize_field(:responses, :partition_responses, acc, data) do
    {vals, rest} = read_struct_array(:partition_responses, :partition_header, data)
    deserialize_field(:responses, nil, Map.put(acc, :partition_responses, vals), rest)
  end

  defp deserialize_field(:root, :responses, acc, data) do
    {vals, rest} = read_struct_array(:responses, :topic, data)
    deserialize_field(:root, nil, %{acc | responses: vals}, rest)
  end

  # End of a scope: return what was accumulated and the unconsumed bytes.
  defp deserialize_field(_, nil, acc, rest), do: {acc, rest}

  # Reads a signed 32-bit element count and then that many elements, each decoded
  # into a fresh map by running the field chain of `scope` starting at `first_field`.
  # Returns {elements_in_wire_order, remaining_bytes}; a count of zero or less
  # yields an empty list.
  defp read_struct_array(scope, first_field, data) do
    <<num_elements::32-signed, rest::binary>> = data

    if num_elements > 0 do
      Enum.map_reduce(1..num_elements, rest, fn _ix, bin ->
        deserialize_field(scope, first_field, %{}, bin)
      end)
    else
      {[], rest}
    end
  end
end
defmodule V6.Response do
  @vsn 6
  @api :fetch
  @schema throttle_time_ms: :int32,
          responses:
            {:array,
             [
               topic: :string,
               partition_responses:
                 {:array,
                  [
                    partition_header: [
                      partition: :int32,
                      error_code: :int16,
                      high_watermark: :int64,
                      last_stable_offset: :int64,
                      log_start_offset: :int64,
                      aborted_transactions:
                        {:array, [producer_id: :int64, first_offset: :int64]}
                    ],
                    record_set: :records
                  ]}
             ]}
  @moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{inspect(@schema, pretty: true)}\n```\n"
  _ = " THIS CODE IS GENERATED BY KAYROCK"

  # Decoding overview: deserialize/1 reads a signed 32-bit correlation id and then the
  # fields of @schema in order. Every deserialize_field/4 clause decodes one field of
  # one scope, stores it in the accumulator and hands the remaining bytes to the clause
  # of the next field; a `nil` field name ends the scope and returns {acc, rest}.

  defstruct throttle_time_ms: nil, responses: [], correlation_id: nil

  @typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
  @type t :: %__MODULE__{
          throttle_time_ms: nil | integer(),
          responses: [
            %{
              topic: nil | binary(),
              partition_responses: [
                %{
                  partition_header: %{
                    partition: nil | integer(),
                    error_code: nil | integer(),
                    high_watermark: nil | integer(),
                    last_stable_offset: nil | integer(),
                    log_start_offset: nil | integer(),
                    aborted_transactions: [
                      %{producer_id: nil | integer(), first_offset: nil | integer()}
                    ]
                  },
                  record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
                }
              ]
            }
          ],
          correlation_id: integer()
        }
  import Elixir.Kayrock.Deserialize

  @doc "Returns the Kafka API key for this API"
  @spec api_key :: integer
  def api_key, do: Kayrock.KafkaSchemaMetadata.api_key(:fetch)

  @doc "Returns the API version (#{@vsn}) implemented by this module"
  @spec api_vsn :: integer
  def api_vsn, do: @vsn

  @doc "Returns the schema of this message\n\nSee [above](#).\n"
  @spec schema :: term
  def schema, do: @schema

  @doc "Deserialize data for this version of this API\n"
  @spec deserialize(binary) :: {t(), binary}
  def deserialize(data) do
    # The correlation id precedes the schema fields on the wire.
    <<correlation_id::32-signed, rest::binary>> = data
    deserialize_field(:root, :throttle_time_ms, %__MODULE__{correlation_id: correlation_id}, rest)
  end

  # --- root scope (accumulator is the response struct) ---

  defp deserialize_field(:root, :throttle_time_ms, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:root, :responses, %{acc | throttle_time_ms: val}, rest)
  end

  # --- responses scope (one map per topic) ---

  defp deserialize_field(:responses, :topic, acc, data) do
    {val, rest} = deserialize(:string, data)
    deserialize_field(:responses, :partition_responses, Map.put(acc, :topic, val), rest)
  end

  # --- partition_header scope ---

  defp deserialize_field(:partition_header, :partition, acc, data) do
    {val, rest} = deserialize(:int32, data)
    deserialize_field(:partition_header, :error_code, Map.put(acc, :partition, val), rest)
  end

  defp deserialize_field(:partition_header, :error_code, acc, data) do
    {val, rest} = deserialize(:int16, data)
    deserialize_field(:partition_header, :high_watermark, Map.put(acc, :error_code, val), rest)
  end

  defp deserialize_field(:partition_header, :high_watermark, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :last_stable_offset,
      Map.put(acc, :high_watermark, val),
      rest
    )
  end

  defp deserialize_field(:partition_header, :last_stable_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :log_start_offset,
      Map.put(acc, :last_stable_offset, val),
      rest
    )
  end

  defp deserialize_field(:partition_header, :log_start_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)

    deserialize_field(
      :partition_header,
      :aborted_transactions,
      Map.put(acc, :log_start_offset, val),
      rest
    )
  end

  # --- aborted_transactions scope (one map per array element) ---

  defp deserialize_field(:aborted_transactions, :producer_id, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:aborted_transactions, :first_offset, Map.put(acc, :producer_id, val), rest)
  end

  defp deserialize_field(:aborted_transactions, :first_offset, acc, data) do
    {val, rest} = deserialize(:int64, data)
    deserialize_field(:aborted_transactions, nil, Map.put(acc, :first_offset, val), rest)
  end

  # Last field of the partition header: an array of aborted-transaction entries.
  defp deserialize_field(:partition_header, :aborted_transactions, acc, data) do
    {vals, rest} = read_struct_array(:aborted_transactions, :producer_id, data)
    deserialize_field(:partition_header, nil, Map.put(acc, :aborted_transactions, vals), rest)
  end

  # --- partition_responses scope ---

  # The header is a nested structure decoded into its own map.
  defp deserialize_field(:partition_responses, :partition_header, acc, data) do
    {val, rest} = deserialize_field(:partition_header, :partition, %{}, data)
    deserialize_field(:partition_responses, :record_set, Map.put(acc, :partition_header, val), rest)
  end

  # A null RECORDS value is encoded as length -1 with no payload bytes. The general
  # clause below cannot match it (a negative binary size never matches) and would
  # raise a MatchError, so a null record set is stored as nil instead.
  defp deserialize_field(:partition_responses, :record_set, acc, <<size::32-signed, rest::bits>>)
       when size == -1 do
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, nil), rest)
  end

  # Signed 32-bit byte length followed by that many bytes, both handed to
  # Kayrock.RecordBatch.deserialize/2.
  defp deserialize_field(:partition_responses, :record_set, acc, data) do
    <<msg_set_size::32-signed, msg_set_data::size(msg_set_size)-binary, rest::bits>> = data
    val = Elixir.Kayrock.RecordBatch.deserialize(msg_set_size, msg_set_data)
    deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, val), rest)
  end

  # --- array-valued fields of the outer scopes ---

  defp deserialize_field(:responses, :partition_responses, acc, data) do
    {vals, rest} = read_struct_array(:partition_responses, :partition_header, data)
    deserialize_field(:responses, nil, Map.put(acc, :partition_responses, vals), rest)
  end

  defp deserialize_field(:root, :responses, acc, data) do
    {vals, rest} = read_struct_array(:responses, :topic, data)
    deserialize_field(:root, nil, %{acc | responses: vals}, rest)
  end

  # End of a scope: return what was accumulated and the unconsumed bytes.
  defp deserialize_field(_, nil, acc, rest), do: {acc, rest}

  # Reads a signed 32-bit element count and then that many elements, each decoded
  # into a fresh map by running the field chain of `scope` starting at `first_field`.
  # Returns {elements_in_wire_order, remaining_bytes}; a count of zero or less
  # yields an empty list.
  defp read_struct_array(scope, first_field, data) do
    <<num_elements::32-signed, rest::binary>> = data

    if num_elements > 0 do
      Enum.map_reduce(1..num_elements, rest, fn _ix, bin ->
        deserialize_field(scope, first_field, %{}, bin)
      end)
    else
      {[], rest}
    end
  end
end
defmodule(V7.Response) do
# API version and API name; both are interpolated into the documentation strings below.
@vsn 7
@api :fetch
# Field layout of this response as a nested keyword list. It is pretty-printed into
# @moduledoc, and schema/0 returns the same structure.
@schema throttle_time_ms: :int32,
error_code: :int16,
session_id: :int32,
responses:
{:array,
[
topic: :string,
partition_responses:
{:array,
[
partition_header: [
partition: :int32,
error_code: :int16,
high_watermark: :int64,
last_stable_offset: :int64,
log_start_offset: :int64,
aborted_transactions:
{:array, [producer_id: :int64, first_offset: :int64]}
],
record_set: :records
]}
]}
@moduledoc "Kayrock-generated response struct for Kafka `#{@api}` v#{@vsn} API\nmessages\n\nThe schema of this API is\n```\n#{
inspect(@schema, pretty: true)
}\n```\n"
_ = " THIS CODE IS GENERATED BY KAYROCK"
# One struct field per top-level schema field, plus `correlation_id`, which
# deserialize/1 reads from the start of the response before the schema fields.
defstruct(
throttle_time_ms: nil,
error_code: nil,
session_id: nil,
responses: [],
correlation_id: nil
)
@typedoc "Response struct for the Kafka `#{@api}` API v#{@vsn}\n"
# Nested entries (topics, partition responses, headers, aborted transactions) are
# plain maps rather than structs.
@type t :: %__MODULE__{
throttle_time_ms: nil | integer(),
error_code: nil | integer(),
session_id: nil | integer(),
responses: [
%{
topic: nil | binary(),
partition_responses: [
%{
partition_header: %{
partition: nil | integer(),
error_code: nil | integer(),
high_watermark: nil | integer(),
last_stable_offset: nil | integer(),
log_start_offset: nil | integer(),
aborted_transactions: [
%{producer_id: nil | integer(), first_offset: nil | integer()}
]
},
record_set: nil | Kayrock.MessageSet.t() | Kayrock.RecordBatch.t()
}
]
}
],
correlation_id: integer()
}
import(Elixir.Kayrock.Deserialize)
@doc "Returns the Kafka API key for this API"
@spec api_key :: integer
def(api_key) do
Kayrock.KafkaSchemaMetadata.api_key(:fetch)
end
@doc "Returns the API version (#{@vsn}) implemented by this module"
@spec api_vsn :: integer
def(api_vsn) do
7
end
@doc "Returns the schema of this message\n\nSee [above](#).\n"
@spec schema :: term
def(schema) do
[
throttle_time_ms: :int32,
error_code: :int16,
session_id: :int32,
responses:
{:array,
[
topic: :string,
partition_responses:
{:array,
[
partition_header: [
partition: :int32,
error_code: :int16,
high_watermark: :int64,
last_stable_offset: :int64,
log_start_offset: :int64,
aborted_transactions: {:array, [producer_id: :int64, first_offset: :int64]}
],
record_set: :records
]}
]}
]
end
@doc "Deserialize data for this version of this API\n"
@spec deserialize(binary) :: {t(), binary}
def(deserialize(data)) do
<<correlation_id::32-signed, rest::binary>> = data
deserialize_field(
:root,
:throttle_time_ms,
%__MODULE__{correlation_id: correlation_id},
rest
)
end
defp(deserialize_field(:root, :throttle_time_ms, acc, data)) do
{val, rest} = deserialize(:int32, data)
deserialize_field(:root, :error_code, Map.put(acc, :throttle_time_ms, val), rest)
end
defp(deserialize_field(:root, :error_code, acc, data)) do
{val, rest} = deserialize(:int16, data)
deserialize_field(:root, :session_id, Map.put(acc, :error_code, val), rest)
end
defp(deserialize_field(:root, :session_id, acc, data)) do
{val, rest} = deserialize(:int32, data)
deserialize_field(:root, :responses, Map.put(acc, :session_id, val), rest)
end
defp(deserialize_field(:responses, :topic, acc, data)) do
{val, rest} = deserialize(:string, data)
deserialize_field(:responses, :partition_responses, Map.put(acc, :topic, val), rest)
end
defp(deserialize_field(:partition_header, :partition, acc, data)) do
{val, rest} = deserialize(:int32, data)
deserialize_field(:partition_header, :error_code, Map.put(acc, :partition, val), rest)
end
defp(deserialize_field(:partition_header, :error_code, acc, data)) do
{val, rest} = deserialize(:int16, data)
deserialize_field(:partition_header, :high_watermark, Map.put(acc, :error_code, val), rest)
end
defp(deserialize_field(:partition_header, :high_watermark, acc, data)) do
{val, rest} = deserialize(:int64, data)
deserialize_field(
:partition_header,
:last_stable_offset,
Map.put(acc, :high_watermark, val),
rest
)
end
defp(deserialize_field(:partition_header, :last_stable_offset, acc, data)) do
{val, rest} = deserialize(:int64, data)
deserialize_field(
:partition_header,
:log_start_offset,
Map.put(acc, :last_stable_offset, val),
rest
)
end
defp(deserialize_field(:partition_header, :log_start_offset, acc, data)) do
{val, rest} = deserialize(:int64, data)
deserialize_field(
:partition_header,
:aborted_transactions,
Map.put(acc, :log_start_offset, val),
rest
)
end
defp(deserialize_field(:aborted_transactions, :producer_id, acc, data)) do
{val, rest} = deserialize(:int64, data)
deserialize_field(
:aborted_transactions,
:first_offset,
Map.put(acc, :producer_id, val),
rest
)
end
defp(deserialize_field(:aborted_transactions, :first_offset, acc, data)) do
{val, rest} = deserialize(:int64, data)
deserialize_field(:aborted_transactions, nil, Map.put(acc, :first_offset, val), rest)
end
defp(deserialize_field(:partition_header, :aborted_transactions, acc, data)) do
<<num_elements::32-signed, rest::binary>> = data
{vals, rest} =
if(num_elements > 0) do
Enum.reduce(1..num_elements, {[], rest}, fn _ix, {acc, d} ->
{val, r} = deserialize_field(:aborted_transactions, :producer_id, %{}, d)
{[val | acc], r}
end)
else
{[], rest}
end
deserialize_field(
:partition_header,
nil,
Map.put(acc, :aborted_transactions, Enum.reverse(vals)),
rest
)
end
defp(deserialize_field(:partition_responses, :partition_header, acc, data)) do
{val, rest} = deserialize_field(:partition_header, :partition, %{}, data)
deserialize_field(
:partition_responses,
:record_set,
Map.put(acc, :partition_header, val),
rest
)
end
defp(deserialize_field(:partition_responses, :record_set, acc, data)) do
<<msg_set_size::32-signed, msg_set_data::size(msg_set_size)-binary, rest::bits>> = data
val = Elixir.Kayrock.RecordBatch.deserialize(msg_set_size, msg_set_data)
deserialize_field(:partition_responses, nil, Map.put(acc, :record_set, val), rest)
end
defp(deserialize_field(:responses, :partition_responses, acc, data)) do
<<num_elements::32-signed, rest::binary>> = data
{vals, rest} =
if(num_elements > 0) do
Enum.reduce(1..num_elements, {[], rest}, fn _ix, {acc, d} ->
{val, r} = deserialize_field(:partition_responses, :partition_header, %{}, d)
{[val | acc], r}
end)
else
{[], rest}
end
deserialize_field(
:responses,
nil,
Map.put(acc, :partition_responses, Enum.reverse(vals)),
rest
)
end
defp(deserialize_field(:root, :responses, acc, data)) do
<<num_elements::32-signed, rest::binary>> = data
{vals, rest} =
if(num_elements > 0) do
Enum.reduce(1..num_elements, {[], rest}, fn _ix, {acc, d} ->
{val, r} = deserialize_field(:responses, :topic, %{}, d)
{[val | acc], r}
end)
else
{[], rest}
end
deserialize_field(:root, nil, Map.put(acc, :responses, Enum.reverse(vals)), rest)
end
defp(deserialize_field(_, nil, acc, rest)) do
{acc, rest}
end
end
# Version dispatch: each supported API version has its own clause that
# forwards to that version's response module. A version without a clause
# raises FunctionClauseError.
@doc "Deserializes raw wire data for this API with the given version"
@spec deserialize(integer, binary) :: {response_t(), binary}
def deserialize(0, data), do: V0.Response.deserialize(data)
def deserialize(1, data), do: V1.Response.deserialize(data)
def deserialize(2, data), do: V2.Response.deserialize(data)
def deserialize(3, data), do: V3.Response.deserialize(data)
def deserialize(4, data), do: V4.Response.deserialize(data)
def deserialize(5, data), do: V5.Response.deserialize(data)
def deserialize(6, data), do: V6.Response.deserialize(data)
def deserialize(7, data), do: V7.Response.deserialize(data)
# Members are listed in ascending version order; the union is unchanged.
@typedoc "Union type for all request structs for this API"
@type request_t ::
        Kayrock.Fetch.V0.Request.t()
        | Kayrock.Fetch.V1.Request.t()
        | Kayrock.Fetch.V2.Request.t()
        | Kayrock.Fetch.V3.Request.t()
        | Kayrock.Fetch.V4.Request.t()
        | Kayrock.Fetch.V5.Request.t()
        | Kayrock.Fetch.V6.Request.t()
        | Kayrock.Fetch.V7.Request.t()
# Members are listed in ascending version order; the union is unchanged.
@typedoc "Union type for all response structs for this API"
@type response_t ::
        Kayrock.Fetch.V0.Response.t()
        | Kayrock.Fetch.V1.Response.t()
        | Kayrock.Fetch.V2.Response.t()
        | Kayrock.Fetch.V3.Response.t()
        | Kayrock.Fetch.V4.Response.t()
        | Kayrock.Fetch.V5.Response.t()
        | Kayrock.Fetch.V6.Response.t()
        | Kayrock.Fetch.V7.Response.t()
@doc "Returns the minimum version of this API supported by Kayrock (#{@vmin})"
@spec min_vsn :: integer
def min_vsn, do: 0
@doc "Returns the maximum version of this API supported by Kayrock (#{@vmax})"
@spec max_vsn :: integer
def max_vsn, do: 7
end