-module(aws@internal@codec@event_stream).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aws/internal/codec/event_stream.gleam").
-export([string_header/2, encode/1, decode/1, events_to_streaming_body/1, decode_all/1, fold_events/3, iter_events/1]).
-export_type([event/0, header/0, header_value/0, decode_error/0, iter_step/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" `application/vnd.amazon.eventstream` framing codec.\n"
"\n"
" AWS event-stream operations (Transcribe, Kinesis SubscribeToShard,\n"
" Bedrock streaming responses, S3 SelectObjectContent, etc.) deliver\n"
" their bodies as a sequence of self-describing frames rather than a\n"
" single payload. Each frame carries a small header set and an\n"
" opaque payload; the protocol handler unpacks one frame at a time\n"
" off the streaming transport.\n"
"\n"
" On-wire layout (big-endian throughout):\n"
"\n"
" ```\n"
" +-----------------------+\n"
" | Total length [4] | <-- includes all four boxes below\n"
" | Headers length [4] | <-- bytes of Headers section\n"
" | Prelude CRC32 [4] | <-- of the two ints above\n"
" | Headers [N1] |\n"
" | Payload [N2] |\n"
" | Message CRC32 [4] | <-- of every byte before this one\n"
" +-----------------------+\n"
" ```\n"
"\n"
" Each Header is `name_len[1] | name[name_len] | type[1] | value[...]`\n"
" where `type` selects the header-value shape. All ten header-value\n"
" shapes the protocol defines (bool true/false, byte, short, int,\n"
" long, binary, string, timestamp, uuid) are implemented — see\n"
" `HeaderValue` for the wire-code mapping.\n"
).
-type event() :: {event, list(header()), bitstring()}.
-type header() :: {header, binary(), header_value()}.
-type header_value() :: bool_true_value |
bool_false_value |
{byte_value, integer()} |
{int16_value, integer()} |
{int32_value, integer()} |
{int64_value, integer()} |
{binary_value, bitstring()} |
{string_value, binary()} |
{timestamp_value, integer()} |
{uuid_value, bitstring()}.
-type decode_error() :: {malformed_frame, binary()} |
bad_prelude_crc |
bad_message_crc |
{unknown_header_type, integer()}.
-type iter_step() :: {yield, event(), fun(() -> iter_step())} |
done |
{failed, decode_error()}.
-file("src/aws/internal/codec/event_stream.gleam", 49).
?DOC(
" Look up the first `StringValue` header on `event` matching `name`.\n"
" Used by codegen-emitted `parse_<op>_event` functions to dispatch\n"
" on `:event-type` / `:message-type` (both are string-valued per the\n"
" event-stream spec).\n"
).
-spec string_header(event(), binary()) -> {ok, binary()} | {error, nil}.
string_header(Event, Name) ->
case gleam@list:find(
erlang:element(2, Event),
fun(H) -> erlang:element(2, H) =:= Name end
) of
{ok, {header, _, {string_value, V}}} ->
{ok, V};
_ ->
{error, nil}
end.
-file("src/aws/internal/codec/event_stream.gleam", 149).
-spec pow2(integer()) -> integer().
pow2(Bits) ->
case Bits of
8 ->
256;
16 ->
65536;
32 ->
4294967296;
64 ->
18446744073709551616;
_ ->
0
end.
-file("src/aws/internal/codec/event_stream.gleam", 142).
-spec wrap(integer(), integer()) -> integer().
wrap(N, Bits) ->
case N < 0 of
true ->
N + pow2(Bits);
false ->
N
end.
-file("src/aws/internal/codec/event_stream.gleam", 117).
-spec encode_header_value(header_value()) -> bitstring().
encode_header_value(Value) ->
case Value of
bool_true_value ->
<<0:8>>;
bool_false_value ->
<<1:8>>;
{byte_value, N} ->
<<2:8, ((wrap(N, 8))):8/big>>;
{int16_value, N@1} ->
<<3:8, ((wrap(N@1, 16))):16/big>>;
{int32_value, N@2} ->
<<4:8, ((wrap(N@2, 32))):32/big>>;
{int64_value, N@3} ->
<<5:8, ((wrap(N@3, 64))):64/big>>;
{binary_value, Bytes} ->
Len = erlang:byte_size(Bytes),
<<6:8, Len:16/big, Bytes/bitstring>>;
{string_value, S} ->
Bytes@1 = gleam_stdlib:identity(S),
Len@1 = erlang:byte_size(Bytes@1),
<<7:8, Len@1:16/big, Bytes@1/bitstring>>;
{timestamp_value, Millis} ->
<<8:8, ((wrap(Millis, 64))):64/big>>;
{uuid_value, Bytes@2} ->
<<9:8, Bytes@2/bitstring>>
end.
-file("src/aws/internal/codec/event_stream.gleam", 110).
-spec encode_header(header()) -> bitstring().
encode_header(Header) ->
Name_bytes = gleam_stdlib:identity(erlang:element(2, Header)),
Name_len = erlang:byte_size(Name_bytes),
Value_bytes = encode_header_value(erlang:element(3, Header)),
<<Name_len:8, Name_bytes/bitstring, Value_bytes/bitstring>>.
-file("src/aws/internal/codec/event_stream.gleam", 104).
-spec encode_headers(list(header())) -> bitstring().
encode_headers(Headers) ->
gleam@list:fold(
Headers,
<<>>,
fun(Acc, Header) ->
<<Acc/bitstring, (encode_header(Header))/bitstring>>
end
).
-file("src/aws/internal/codec/event_stream.gleam", 86).
?DOC(
" Frame an `Event` for transmission. Computes both CRC32s (prelude\n"
" over the first two ints, message over every preceding byte) and\n"
" returns the assembled BitArray ready to hand to the streaming\n"
" transport.\n"
).
-spec encode(event()) -> bitstring().
encode(Event) ->
Headers_bytes = encode_headers(erlang:element(2, Event)),
Headers_len = erlang:byte_size(Headers_bytes),
Payload_len = erlang:byte_size(erlang:element(3, Event)),
Total_len = ((12 + Headers_len) + Payload_len) + 4,
Prelude = <<Total_len:32/big, Headers_len:32/big>>,
Prelude_crc = erlang:crc32(Prelude),
Body = <<Prelude/bitstring,
Prelude_crc:32/big,
Headers_bytes/bitstring,
(erlang:element(3, Event))/bitstring>>,
Message_crc = erlang:crc32(Body),
<<Body/bitstring, Message_crc:32/big>>.
-file("src/aws/internal/codec/event_stream.gleam", 359).
-spec slice_after(bitstring(), integer()) -> bitstring().
slice_after(Bytes, Offset) ->
case gleam_stdlib:bit_array_slice(
Bytes,
Offset,
erlang:byte_size(Bytes) - Offset
) of
{ok, B} ->
B;
{error, _} ->
<<>>
end.
-file("src/aws/internal/codec/event_stream.gleam", 326).
-spec decode_uuid_header(bitstring()) -> {ok, {header_value(), bitstring()}} |
{error, decode_error()}.
decode_uuid_header(Rest) ->
gleam@result:'try'(
begin
_pipe = gleam_stdlib:bit_array_slice(Rest, 0, 16),
gleam@result:replace_error(
_pipe,
{malformed_frame, <<"uuid header truncated"/utf8>>}
)
end,
fun(Uuid_bytes) ->
{ok, {{uuid_value, Uuid_bytes}, slice_after(Rest, 16)}}
end
).
-file("src/aws/internal/codec/event_stream.gleam", 303).
-spec unsign(integer(), integer()) -> integer().
unsign(N, Bits) ->
Half = pow2(Bits) div 2,
case N >= Half of
true ->
N - pow2(Bits);
false ->
N
end.
-file("src/aws/internal/codec/event_stream.gleam", 287).
-spec decode_int_header(
bitstring(),
integer(),
fun((integer()) -> header_value())
) -> {ok, {header_value(), bitstring()}} | {error, decode_error()}.
decode_int_header(Rest, Bits, Wrap_in) ->
case {Bits, Rest} of
{8, <<N:8/big, After/bitstring>>} ->
{ok, {Wrap_in(unsign(N, 8)), After}};
{16, <<N@1:16/big, After@1/bitstring>>} ->
{ok, {Wrap_in(unsign(N@1, 16)), After@1}};
{32, <<N@2:32/big, After@2/bitstring>>} ->
{ok, {Wrap_in(unsign(N@2, 32)), After@2}};
{64, <<N@3:64/big, After@3/bitstring>>} ->
{ok, {Wrap_in(unsign(N@3, 64)), After@3}};
{_, _} ->
{error, {malformed_frame, <<"int header truncated"/utf8>>}}
end.
-file("src/aws/internal/codec/event_stream.gleam", 336).
-spec decode_string_header(bitstring()) -> {ok, {header_value(), bitstring()}} |
{error, decode_error()}.
decode_string_header(Rest) ->
case Rest of
<<Len:16/big, Value_and_rest/bitstring>> ->
gleam@result:'try'(
begin
_pipe = gleam_stdlib:bit_array_slice(Value_and_rest, 0, Len),
gleam@result:replace_error(
_pipe,
{malformed_frame, <<"string slice"/utf8>>}
)
end,
fun(Value_bytes) ->
gleam@result:'try'(
begin
_pipe@1 = gleam@bit_array:to_string(Value_bytes),
gleam@result:replace_error(
_pipe@1,
{malformed_frame, <<"string utf8"/utf8>>}
)
end,
fun(S) ->
{ok,
{{string_value, S},
slice_after(Value_and_rest, Len)}}
end
)
end
);
_ ->
{error, {malformed_frame, <<"string header truncated"/utf8>>}}
end.
-file("src/aws/internal/codec/event_stream.gleam", 311).
-spec decode_binary_header(bitstring()) -> {ok, {header_value(), bitstring()}} |
{error, decode_error()}.
decode_binary_header(Rest) ->
case Rest of
<<Len:16/big, Value_and_rest/bitstring>> ->
gleam@result:'try'(
begin
_pipe = gleam_stdlib:bit_array_slice(Value_and_rest, 0, Len),
gleam@result:replace_error(
_pipe,
{malformed_frame, <<"binary slice"/utf8>>}
)
end,
fun(Value_bytes) ->
{ok,
{{binary_value, Value_bytes},
slice_after(Value_and_rest, Len)}}
end
);
_ ->
{error, {malformed_frame, <<"binary header truncated"/utf8>>}}
end.
-file("src/aws/internal/codec/event_stream.gleam", 268).
-spec decode_header_value_body(integer(), bitstring()) -> {ok,
{header_value(), bitstring()}} |
{error, decode_error()}.
decode_header_value_body(Type_code, Rest) ->
case Type_code of
0 ->
{ok, {bool_true_value, Rest}};
1 ->
{ok, {bool_false_value, Rest}};
2 ->
decode_int_header(Rest, 8, fun(N) -> {byte_value, N} end);
3 ->
decode_int_header(Rest, 16, fun(N@1) -> {int16_value, N@1} end);
4 ->
decode_int_header(Rest, 32, fun(N@2) -> {int32_value, N@2} end);
5 ->
decode_int_header(Rest, 64, fun(N@3) -> {int64_value, N@3} end);
6 ->
decode_binary_header(Rest);
7 ->
decode_string_header(Rest);
8 ->
decode_int_header(Rest, 64, fun(N@4) -> {timestamp_value, N@4} end);
9 ->
decode_uuid_header(Rest);
Other ->
{error, {unknown_header_type, Other}}
end.
-file("src/aws/internal/codec/event_stream.gleam", 259).
-spec decode_header_value(bitstring()) -> {ok, {header_value(), bitstring()}} |
{error, decode_error()}.
decode_header_value(Bytes) ->
case Bytes of
<<Type_code:8, Rest/bitstring>> ->
decode_header_value_body(Type_code, Rest);
_ ->
{error,
{malformed_frame, <<"header value missing type byte"/utf8>>}}
end.
-file("src/aws/internal/codec/event_stream.gleam", 236).
-spec decode_headers(bitstring(), list(header())) -> {ok, list(header())} |
{error, decode_error()}.
decode_headers(Bytes, Acc) ->
case Bytes of
<<>> ->
{ok, lists:reverse(Acc)};
<<Name_len:8, Rest/bitstring>> ->
gleam@result:'try'(
begin
_pipe = gleam_stdlib:bit_array_slice(Rest, 0, Name_len),
gleam@result:replace_error(
_pipe,
{malformed_frame, <<"header name slice"/utf8>>}
)
end,
fun(Name_bytes) ->
gleam@result:'try'(
begin
_pipe@1 = gleam@bit_array:to_string(Name_bytes),
gleam@result:replace_error(
_pipe@1,
{malformed_frame, <<"header name utf8"/utf8>>}
)
end,
fun(Name) ->
Value_rest = slice_after(Rest, Name_len),
gleam@result:'try'(
decode_header_value(Value_rest),
fun(_use0) ->
{Value, After_value} = _use0,
decode_headers(
After_value,
[{header, Name, Value} | Acc]
)
end
)
end
)
end
);
_ ->
{error, {malformed_frame, <<"header truncated"/utf8>>}}
end.
-file("src/aws/internal/codec/event_stream.gleam", 366).
-spec bytes_to_int_be(bitstring()) -> integer().
bytes_to_int_be(Bytes) ->
case Bytes of
<<N:32/big>> ->
N;
_ ->
0
end.
-file("src/aws/internal/codec/event_stream.gleam", 194).
-spec decode_after_prelude(integer(), integer(), bitstring(), bitstring()) -> {ok,
{event(), bitstring()}} |
{error, decode_error()}.
decode_after_prelude(Total, Headers_len, Rest_after_prelude, Original_bytes) ->
Payload_len = ((Total - 12) - Headers_len) - 4,
case Payload_len < 0 of
true ->
{error, {malformed_frame, <<"negative payload length"/utf8>>}};
false ->
gleam@result:'try'(
begin
_pipe = gleam_stdlib:bit_array_slice(
Rest_after_prelude,
0,
Headers_len
),
gleam@result:replace_error(
_pipe,
{malformed_frame, <<"headers slice failed"/utf8>>}
)
end,
fun(Headers_bytes) ->
gleam@result:'try'(
begin
_pipe@1 = gleam_stdlib:bit_array_slice(
Rest_after_prelude,
Headers_len,
Payload_len
),
gleam@result:replace_error(
_pipe@1,
{malformed_frame,
<<"payload slice failed"/utf8>>}
)
end,
fun(Payload) ->
Trailing_offset = Headers_len + Payload_len,
gleam@result:'try'(
begin
_pipe@2 = gleam_stdlib:bit_array_slice(
Rest_after_prelude,
Trailing_offset,
4
),
gleam@result:replace_error(
_pipe@2,
{malformed_frame,
<<"message crc slice failed"/utf8>>}
)
end,
fun(Msg_crc_bytes) ->
Body_len = Total - 4,
gleam@result:'try'(
begin
_pipe@3 = gleam_stdlib:bit_array_slice(
Original_bytes,
0,
Body_len
),
gleam@result:replace_error(
_pipe@3,
{malformed_frame,
<<"body slice failed"/utf8>>}
)
end,
fun(Body) ->
case erlang:crc32(Body) =:= bytes_to_int_be(
Msg_crc_bytes
) of
false ->
{error, bad_message_crc};
true ->
gleam@result:'try'(
decode_headers(
Headers_bytes,
[]
),
fun(Headers) ->
Rest = slice_after(
Rest_after_prelude,
Trailing_offset
+ 4
),
{ok,
{{event,
Headers,
Payload},
Rest}}
end
)
end
end
)
end
)
end
)
end
)
end.
-file("src/aws/internal/codec/event_stream.gleam", 181).
?DOC(
" Decode one framed message off the front of `bytes`. Returns the\n"
" decoded `Event` plus the trailing bytes (which may hold the next\n"
" frame; callers call `decode` again on the rest).\n"
"\n"
" Validates both CRCs end-to-end — partial / corrupted streams\n"
" surface as `BadPreludeCrc` / `BadMessageCrc` rather than silently\n"
" returning garbage.\n"
).
-spec decode(bitstring()) -> {ok, {event(), bitstring()}} |
{error, decode_error()}.
decode(Bytes) ->
case Bytes of
<<Total:32/big, Headers_len:32/big, Prelude_crc:32/big, Rest/bitstring>> ->
Prelude = <<Total:32/big, Headers_len:32/big>>,
case erlang:crc32(Prelude) =:= Prelude_crc of
false ->
{error, bad_prelude_crc};
true ->
decode_after_prelude(Total, Headers_len, Rest, Bytes)
end;
_ ->
{error, {malformed_frame, <<"shorter than prelude"/utf8>>}}
end.
-file("src/aws/internal/codec/event_stream.gleam", 383).
?DOC(
" Frame a list of events as a single `StreamingBody`. Each event\n"
" is `encode`d in turn; the result is the concatenated frames in\n"
" list order. Use this on the request side of `@eventStream`\n"
" operations to hand the framed bytes to the streaming transport.\n"
"\n"
" The body is a `Chunked` `StreamingBody` carrying one chunk per\n"
" event, so the streaming transport can write them on the wire\n"
" one frame at a time (`fold_chunks` preserves order). Buffered-\n"
" then-streamed callers see the same wire bytes — `to_bit_array`\n"
" concatenates in order.\n"
).
-spec events_to_streaming_body(list(event())) -> aws@streaming:streaming_body().
events_to_streaming_body(Events) ->
_pipe = Events,
_pipe@1 = gleam@list:map(_pipe, fun encode/1),
aws@streaming:from_chunks(_pipe@1).
-file("src/aws/internal/codec/event_stream.gleam", 404).
-spec decode_all_bytes(bitstring(), list(event())) -> {ok, list(event())} |
{error, decode_error()}.
decode_all_bytes(Bytes, Acc) ->
case Bytes of
<<>> ->
{ok, lists:reverse(Acc)};
_ ->
gleam@result:'try'(
decode(Bytes),
fun(_use0) ->
{Event, Rest} = _use0,
decode_all_bytes(Rest, [Event | Acc])
end
)
end.
-file("src/aws/internal/codec/event_stream.gleam", 400).
?DOC(
" Decode every frame from a streaming body. Materialises the full\n"
" list of events — appropriate when the response is short (control\n"
" messages, handshakes) or the call site wants to handle every\n"
" event after the stream terminates. Long-lived subscription\n"
" streams (`SubscribeToShard`, `StartStreamTranscription`) want\n"
" `fold_events` instead so each event surfaces incrementally.\n"
"\n"
" The streaming body's chunks are concatenated first; the framing\n"
" protocol's length fields make incremental parsing safe across\n"
" chunk boundaries, but materialising-then-parsing is simpler and\n"
" equally correct for buffer-bounded responses.\n"
).
-spec decode_all(aws@streaming:streaming_body()) -> {ok, list(event())} |
{error, decode_error()}.
decode_all(Body) ->
decode_all_bytes(aws@streaming:to_bit_array(Body), []).
-file("src/aws/internal/codec/event_stream.gleam", 440).
-spec fold_events_bytes(bitstring(), MJR, fun((MJR, event()) -> MJR)) -> {ok,
MJR} |
{error, decode_error()}.
fold_events_bytes(Bytes, Acc, F) ->
case Bytes of
<<>> ->
{ok, Acc};
_ ->
gleam@result:'try'(
decode(Bytes),
fun(_use0) ->
{Event, Rest} = _use0,
fold_events_bytes(Rest, F(Acc, Event), F)
end
)
end.
-file("src/aws/internal/codec/event_stream.gleam", 432).
?DOC(
" Reduce a streaming body's event frames left-to-right by\n"
" accumulating one decoded event at a time. The natural consumer\n"
" API for long-lived subscription streams — the folder can update\n"
" running state (counts, partial outputs, signals) without holding\n"
" the whole event list in memory.\n"
"\n"
" Returns `Error(DecodeError)` the moment a frame fails CRC or\n"
" length checks, preserving the accumulator up to (but not\n"
" including) the bad frame. Callers that want to keep going past\n"
" a bad frame must do their own resync.\n"
"\n"
" Reads the full body up front via `streaming.to_bit_array` so the\n"
" fold runs on a single contiguous buffer; a future chunk-by-chunk\n"
" consumer that decodes events as bytes arrive can keep this same\n"
" surface — only the implementation changes.\n"
).
-spec fold_events(
aws@streaming:streaming_body(),
MJO,
fun((MJO, event()) -> MJO)
) -> {ok, MJO} | {error, decode_error()}.
fold_events(Body, Initial, F) ->
fold_events_bytes(aws@streaming:to_bit_array(Body), Initial, F).
-file("src/aws/internal/codec/event_stream.gleam", 485).
-spec iter_step(bitstring()) -> iter_step().
iter_step(Remaining) ->
case Remaining of
<<>> ->
done;
_ ->
case decode(Remaining) of
{ok, {Event, Rest}} ->
{yield, Event, fun() -> iter_step(Rest) end};
{error, Err} ->
{failed, Err}
end
end.
-file("src/aws/internal/codec/event_stream.gleam", 481).
?DOC(
" Wrap a streaming body as a pull-based event iterator. Each call\n"
" to `next` returns either `Yield(event, next)` — the next decoded\n"
" event plus a continuation for the rest of the stream — or `Done`\n"
" at clean end-of-stream, or `Failed(err)` if the wire bytes don't\n"
" parse.\n"
"\n"
" Useful for callers that want to drive consumption explicitly\n"
" rather than handing the whole stream to `fold_events`. The\n"
" codegen-emitted `<op>_event_stream(client, input)` wrappers\n"
" return a `streaming.Response`; pipe `resp.body` through this\n"
" helper to get a typed iterator without buffering the full event\n"
" list in memory.\n"
"\n"
" Today materialises the body up front (same as `fold_events` —\n"
" `streaming.to_bit_array`); a follow-up that streams chunk-by-\n"
" chunk lands when the wire transport surfaces partial frames.\n"
).
-spec iter_events(aws@streaming:streaming_body()) -> iter_step().
iter_events(Body) ->
iter_step(aws@streaming:to_bit_array(Body)).