%% src/aws@internal@codec@event_stream.erl

%% Module attributes for the event-stream codec. The ?FILEPATH macro and the
%% -file directives further down point at a `.gleam` file, so this module
%% appears to be compiler output for that source (NOTE(review): confirm before
%% hand-editing).
-module(aws@internal@codec@event_stream).
%% `no_auto_import` is in effect, which is why BIF calls in this module are
%% written with an explicit `erlang:` prefix.
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aws/internal/codec/event_stream.gleam").
-export([string_header/2, encode/1, decode/1, events_to_streaming_body/1, decode_all/1, fold_events/3, iter_events/1]).
-export_type([event/0, header/0, header_value/0, decode_error/0, iter_step/0]).

%% Documentation macros: on OTP 27 and later they expand to the -moduledoc and
%% -doc attributes; on older releases they expand to an empty -compile([])
%% attribute so the same source still compiles.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " `application/vnd.amazon.eventstream` framing codec.\n"
    "\n"
    " AWS event-stream operations (Transcribe, Kinesis SubscribeToShard,\n"
    " Bedrock streaming responses, S3 SelectObjectContent, etc.) deliver\n"
    " their bodies as a sequence of self-describing frames rather than a\n"
    " single payload. Each frame carries a small header set and an\n"
    " opaque payload; the protocol handler unpacks one frame at a time\n"
    " off the streaming transport.\n"
    "\n"
    " On-wire layout (big-endian throughout):\n"
    "\n"
    " ```\n"
    " +-----------------------+\n"
    " | Total length      [4] |   <-- whole frame, including this field\n"
    " | Headers length    [4] |   <-- bytes of Headers section\n"
    " | Prelude CRC32     [4] |   <-- of the two ints above\n"
    " | Headers       [N1]    |\n"
    " | Payload       [N2]    |\n"
    " | Message CRC32     [4] |   <-- of every byte before this one\n"
    " +-----------------------+\n"
    " ```\n"
    "\n"
    " Each Header is `name_len[1] | name[name_len] | type[1] | value[...]`\n"
    " where `type` selects the header-value shape. All ten header-value\n"
    " shapes the protocol defines (bool true/false, byte, short, int,\n"
    " long, binary, string, timestamp, uuid) are implemented — see\n"
    " `HeaderValue` for the wire-code mapping.\n"
).

%% One frame: the list of headers followed by the raw payload bits.
-type event() :: {event, list(header()), bitstring()}.

%% A named header: the name followed by its typed value.
-type header() :: {header, binary(), header_value()}.

%% Typed header values. The trailing comments give the one-byte wire type code
%% that encode_header_value/1 writes and decode_header_value_body/2 reads.
-type header_value() :: bool_true_value | % wire type 0, no value bytes
    bool_false_value | % wire type 1, no value bytes
    {byte_value, integer()} | % wire type 2, 8-bit value
    {int16_value, integer()} | % wire type 3, 16-bit value
    {int32_value, integer()} | % wire type 4, 32-bit value
    {int64_value, integer()} | % wire type 5, 64-bit value
    {binary_value, bitstring()} | % wire type 6, 16-bit length prefix + bytes
    {string_value, binary()} | % wire type 7, 16-bit length prefix + bytes
    {timestamp_value, integer()} | % wire type 8, 64-bit value
    {uuid_value, bitstring()}. % wire type 9, read back as 16 bytes

%% Reasons a decode can fail. `malformed_frame` carries a short description of
%% the failing step; `unknown_header_type` carries the unrecognised type code.
-type decode_error() :: {malformed_frame, binary()} |
    bad_prelude_crc |
    bad_message_crc |
    {unknown_header_type, integer()}.

%% One step of the pull iterator built by iter_events/1: a decoded event plus a
%% zero-argument continuation, the end-of-input marker, or a decode failure.
-type iter_step() :: {yield, event(), fun(() -> iter_step())} |
    done |
    {failed, decode_error()}.

-file("src/aws/internal/codec/event_stream.gleam", 49).
?DOC(
    " Look up the first `StringValue` header on `event` matching `name`.\n"
    " Used by codegen-emitted `parse_<op>_event` functions to dispatch\n"
    " on `:event-type` / `:message-type` (both are string-valued per the\n"
    " event-stream spec).\n"
).
%% Returns `{ok, Value}` for the first header whose name equals `Name` AND
%% whose value is a `string_value`; returns `{error, nil}` when no header
%% satisfies both conditions.
%%
%% A header that carries the requested name but a non-string value is skipped
%% instead of ending the search, so a later string-valued header with the same
%% name is still found, which is what the documented contract describes.
-spec string_header(event(), binary()) -> {ok, binary()} | {error, nil}.
string_header(Event, Name) ->
    first_string_header(erlang:element(2, Event), Name).

%% Walks the header list front to back. `Name` appears twice in the first
%% clause head, so that clause only matches when the header name is exactly
%% equal to the requested name.
-spec first_string_header(list(header()), binary()) ->
    {ok, binary()} | {error, nil}.
first_string_header([{header, Name, {string_value, Value}} | _], Name) ->
    {ok, Value};
first_string_header([_ | Rest], Name) ->
    first_string_header(Rest, Name);
first_string_header([], _Name) ->
    {error, nil}.

-file("src/aws/internal/codec/event_stream.gleam", 149).
%% Modulus 2^Bits for the integer widths used by the header codec.
%% Only 8, 16, 32 and 64 are recognised; every other argument yields 0.
-spec pow2(integer()) -> integer().
pow2(8) -> 256;
pow2(16) -> 65536;
pow2(32) -> 4294967296;
pow2(64) -> 18446744073709551616;
pow2(_) -> 0.

-file("src/aws/internal/codec/event_stream.gleam", 142).
%% Maps a negative integer onto its unsigned representation for a field of
%% `Bits` bits by adding pow2(Bits). Non-negative input is returned unchanged.
-spec wrap(integer(), integer()) -> integer().
wrap(N, Bits) when N < 0 ->
    N + pow2(Bits);
wrap(N, _Bits) ->
    N.

-file("src/aws/internal/codec/event_stream.gleam", 117).
%% Serialises one header value as a type byte followed by its value bytes.
%%
%% Type bytes: 0/1 booleans (no value bytes), 2..5 integers of 8/16/32/64 bits,
%% 6 binary and 7 string (both prefixed with a 16-bit big-endian byte length),
%% 8 timestamp (64 bits), 9 uuid (bytes appended as given).
%% Integer variants go through wrap/2 before being written.
-spec encode_header_value(header_value()) -> bitstring().
encode_header_value(bool_true_value) ->
    <<0:8>>;
encode_header_value(bool_false_value) ->
    <<1:8>>;
encode_header_value({byte_value, Int}) ->
    <<2:8, (wrap(Int, 8)):8/big>>;
encode_header_value({int16_value, Int}) ->
    <<3:8, (wrap(Int, 16)):16/big>>;
encode_header_value({int32_value, Int}) ->
    <<4:8, (wrap(Int, 32)):32/big>>;
encode_header_value({int64_value, Int}) ->
    <<5:8, (wrap(Int, 64)):64/big>>;
encode_header_value({binary_value, Blob}) ->
    <<6:8, (erlang:byte_size(Blob)):16/big, Blob/bitstring>>;
encode_header_value({string_value, Text}) ->
    %% The text is already a UTF-8 binary, so it is written as-is.
    <<7:8, (erlang:byte_size(Text)):16/big, Text/bitstring>>;
encode_header_value({timestamp_value, Millis}) ->
    <<8:8, (wrap(Millis, 64)):64/big>>;
encode_header_value({uuid_value, Uuid}) ->
    <<9:8, Uuid/bitstring>>.

-file("src/aws/internal/codec/event_stream.gleam", 110).
%% Serialises one header as: 1-byte name length, the name bytes, then the
%% encoded value produced by encode_header_value/1.
-spec encode_header(header()) -> bitstring().
encode_header({header, Name, Value}) ->
    NameSize = erlang:byte_size(Name),
    Encoded = encode_header_value(Value),
    <<NameSize:8, Name/bitstring, Encoded/bitstring>>.

-file("src/aws/internal/codec/event_stream.gleam", 104).
%% Concatenates the encoding of every header, in list order. An empty list
%% produces an empty bit array.
-spec encode_headers(list(header())) -> bitstring().
encode_headers(Headers) ->
    << <<(encode_header(Header))/bitstring>> || Header <- Headers >>.

-file("src/aws/internal/codec/event_stream.gleam", 86).
?DOC(
    " Frame an `Event` for transmission. Computes both CRC32s (prelude\n"
    " over the first two ints, message over every preceding byte) and\n"
    " returns the assembled BitArray ready to hand to the streaming\n"
    " transport.\n"
).
%% Frame size = 16 fixed bytes (two 32-bit lengths, the prelude CRC and the
%% trailing message CRC) + header block + payload.
-spec encode(event()) -> bitstring().
encode(Event) ->
    HeaderBlock = encode_headers(erlang:element(2, Event)),
    Payload = erlang:element(3, Event),
    HeaderSize = erlang:byte_size(HeaderBlock),
    FrameSize = 16 + HeaderSize + erlang:byte_size(Payload),
    Prelude = <<FrameSize:32/big, HeaderSize:32/big>>,
    %% Everything the message CRC covers: prelude, prelude CRC, headers, payload.
    Covered = <<Prelude/bitstring,
        (erlang:crc32(Prelude)):32/big,
        HeaderBlock/bitstring,
        Payload/bitstring>>,
    <<Covered/bitstring, (erlang:crc32(Covered)):32/big>>.

-file("src/aws/internal/codec/event_stream.gleam", 359).
%% Returns the part of `Bytes` from byte `Offset` to the end. If the slice call
%% reports an error, an empty bit array is returned instead.
-spec slice_after(bitstring(), integer()) -> bitstring().
slice_after(Bytes, Offset) ->
    Remaining = erlang:byte_size(Bytes) - Offset,
    case gleam_stdlib:bit_array_slice(Bytes, Offset, Remaining) of
        {error, _} -> <<>>;
        {ok, Tail} -> Tail
    end.

-file("src/aws/internal/codec/event_stream.gleam", 326).
%% Reads a uuid header value: the first 16 bytes of `Rest`. Returns the value
%% together with the bytes that follow it, or a `malformed_frame` error when
%% the 16-byte slice cannot be taken.
-spec decode_uuid_header(bitstring()) -> {ok, {header_value(), bitstring()}} |
    {error, decode_error()}.
decode_uuid_header(Rest) ->
    case gleam_stdlib:bit_array_slice(Rest, 0, 16) of
        {ok, Uuid} ->
            {ok, {{uuid_value, Uuid}, slice_after(Rest, 16)}};
        {error, _} ->
            {error, {malformed_frame, <<"uuid header truncated"/utf8>>}}
    end.

-file("src/aws/internal/codec/event_stream.gleam", 303).
%% Reinterprets an unsigned `Bits`-wide value as signed: values at or above
%% half of pow2(Bits) have pow2(Bits) subtracted, smaller values are returned
%% unchanged.
-spec unsign(integer(), integer()) -> integer().
unsign(N, Bits) ->
    Modulus = pow2(Bits),
    case N >= Modulus div 2 of
        true -> N - Modulus;
        false -> N
    end.

-file("src/aws/internal/codec/event_stream.gleam", 287).
%% Reads a big-endian integer of `Bits` bits (8, 16, 32 or 64) from the front
%% of `Rest`, converts it to a signed value with unsign/2 and wraps it with
%% `Wrap_in`. Returns the wrapped value plus the remaining bits.
%%
%% Any other width, or input too short for the requested width, yields the
%% "int header truncated" error.
-spec decode_int_header(
    bitstring(),
    integer(),
    fun((integer()) -> header_value())
) -> {ok, {header_value(), bitstring()}} | {error, decode_error()}.
decode_int_header(Rest, Bits, Wrap_in)
        when Bits =:= 8; Bits =:= 16; Bits =:= 32; Bits =:= 64 ->
    case Rest of
        <<Raw:Bits/big, After/bitstring>> ->
            {ok, {Wrap_in(unsign(Raw, Bits)), After}};
        _ ->
            {error, {malformed_frame, <<"int header truncated"/utf8>>}}
    end;
decode_int_header(_Rest, _Bits, _Wrap_in) ->
    {error, {malformed_frame, <<"int header truncated"/utf8>>}}.

-file("src/aws/internal/codec/event_stream.gleam", 336).
%% Reads a string header value: a 16-bit big-endian byte length followed by
%% that many bytes, which must convert to a string. Returns the value plus the
%% bytes after it. Each failing step has its own `malformed_frame` message.
-spec decode_string_header(bitstring()) -> {ok, {header_value(), bitstring()}} |
    {error, decode_error()}.
decode_string_header(<<Len:16/big, Tail/bitstring>>) ->
    case gleam_stdlib:bit_array_slice(Tail, 0, Len) of
        {error, _} ->
            {error, {malformed_frame, <<"string slice"/utf8>>}};
        {ok, Raw} ->
            case gleam@bit_array:to_string(Raw) of
                {error, _} ->
                    {error, {malformed_frame, <<"string utf8"/utf8>>}};
                {ok, Text} ->
                    {ok, {{string_value, Text}, slice_after(Tail, Len)}}
            end
    end;
decode_string_header(_) ->
    {error, {malformed_frame, <<"string header truncated"/utf8>>}}.

-file("src/aws/internal/codec/event_stream.gleam", 311).
%% Reads a binary header value: a 16-bit big-endian byte length followed by
%% that many bytes. Returns the value plus the bytes after it.
-spec decode_binary_header(bitstring()) -> {ok, {header_value(), bitstring()}} |
    {error, decode_error()}.
decode_binary_header(<<Len:16/big, Tail/bitstring>>) ->
    case gleam_stdlib:bit_array_slice(Tail, 0, Len) of
        {ok, Blob} ->
            {ok, {{binary_value, Blob}, slice_after(Tail, Len)}};
        {error, _} ->
            {error, {malformed_frame, <<"binary slice"/utf8>>}}
    end;
decode_binary_header(_) ->
    {error, {malformed_frame, <<"binary header truncated"/utf8>>}}.

-file("src/aws/internal/codec/event_stream.gleam", 268).
%% Dispatches on the header type code to the matching value decoder.
%% Codes 0 and 1 carry no value bytes; codes outside 0..9 are reported as
%% `unknown_header_type` together with the offending code.
-spec decode_header_value_body(integer(), bitstring()) -> {ok,
        {header_value(), bitstring()}} |
    {error, decode_error()}.
decode_header_value_body(0, Rest) ->
    {ok, {bool_true_value, Rest}};
decode_header_value_body(1, Rest) ->
    {ok, {bool_false_value, Rest}};
decode_header_value_body(2, Rest) ->
    decode_int_header(Rest, 8, fun(Int) -> {byte_value, Int} end);
decode_header_value_body(3, Rest) ->
    decode_int_header(Rest, 16, fun(Int) -> {int16_value, Int} end);
decode_header_value_body(4, Rest) ->
    decode_int_header(Rest, 32, fun(Int) -> {int32_value, Int} end);
decode_header_value_body(5, Rest) ->
    decode_int_header(Rest, 64, fun(Int) -> {int64_value, Int} end);
decode_header_value_body(6, Rest) ->
    decode_binary_header(Rest);
decode_header_value_body(7, Rest) ->
    decode_string_header(Rest);
decode_header_value_body(8, Rest) ->
    decode_int_header(Rest, 64, fun(Int) -> {timestamp_value, Int} end);
decode_header_value_body(9, Rest) ->
    decode_uuid_header(Rest);
decode_header_value_body(Unknown, _Rest) ->
    {error, {unknown_header_type, Unknown}}.

-file("src/aws/internal/codec/event_stream.gleam", 259).
%% Splits off the one-byte type code and hands the rest to
%% decode_header_value_body/2. Input with no type byte is a malformed frame.
-spec decode_header_value(bitstring()) -> {ok, {header_value(), bitstring()}} |
    {error, decode_error()}.
decode_header_value(<<TypeCode:8, Rest/bitstring>>) ->
    decode_header_value_body(TypeCode, Rest);
decode_header_value(_) ->
    {error, {malformed_frame, <<"header value missing type byte"/utf8>>}}.

-file("src/aws/internal/codec/event_stream.gleam", 236).
%% Decodes a header block into a list of headers, preserving wire order.
%% `Acc` collects headers in reverse and is reversed once the input is empty.
%% The first failing header aborts the whole decode with its error.
-spec decode_headers(bitstring(), list(header())) -> {ok, list(header())} |
    {error, decode_error()}.
decode_headers(<<>>, Acc) ->
    {ok, lists:reverse(Acc)};
decode_headers(<<NameLen:8, Rest/bitstring>>, Acc) ->
    case decode_header_name(Rest, NameLen) of
        {ok, Name} ->
            case decode_header_value(slice_after(Rest, NameLen)) of
                {ok, {Value, Remaining}} ->
                    decode_headers(Remaining, [{header, Name, Value} | Acc]);
                {error, _} = Failure ->
                    Failure
            end;
        {error, _} = Failure ->
            Failure
    end;
decode_headers(_, _Acc) ->
    {error, {malformed_frame, <<"header truncated"/utf8>>}}.

%% Takes the first `NameLen` bytes of `Rest` and converts them to a string,
%% reporting which of the two steps failed.
-spec decode_header_name(bitstring(), integer()) -> {ok, binary()} |
    {error, decode_error()}.
decode_header_name(Rest, NameLen) ->
    case gleam_stdlib:bit_array_slice(Rest, 0, NameLen) of
        {error, _} ->
            {error, {malformed_frame, <<"header name slice"/utf8>>}};
        {ok, RawName} ->
            case gleam@bit_array:to_string(RawName) of
                {ok, Name} ->
                    {ok, Name};
                {error, _} ->
                    {error, {malformed_frame, <<"header name utf8"/utf8>>}}
            end
    end.

-file("src/aws/internal/codec/event_stream.gleam", 366).
%% Interprets exactly four bytes as an unsigned big-endian integer.
%% Input of any other size yields 0.
-spec bytes_to_int_be(bitstring()) -> integer().
bytes_to_int_be(<<Value:32/big>>) ->
    Value;
bytes_to_int_be(_) ->
    0.

-file("src/aws/internal/codec/event_stream.gleam", 194).
%% Decodes the part of a frame that follows the 12-byte prelude.
%%
%% `Total` and `Headers_len` are the two prelude lengths, `Rest_after_prelude`
%% is the input after the prelude and `Original_bytes` is the input including
%% the prelude (needed because the message CRC covers the prelude too).
%%
%% Steps, in order: reject a negative payload length, slice out the headers,
%% payload, message CRC and CRC-covered body, compare the CRC, decode the
%% headers. Returns the event plus whatever follows the frame.
-spec decode_after_prelude(integer(), integer(), bitstring(), bitstring()) -> {ok,
        {event(), bitstring()}} |
    {error, decode_error()}.
decode_after_prelude(Total, Headers_len, Rest_after_prelude, Original_bytes) ->
    %% 16 = 12 prelude bytes + 4 trailing message-CRC bytes.
    PayloadLen = Total - 16 - Headers_len,
    case PayloadLen < 0 of
        true ->
            {error, {malformed_frame, <<"negative payload length"/utf8>>}};
        false ->
            decode_frame_sections(
                Total,
                Headers_len,
                PayloadLen,
                Rest_after_prelude,
                Original_bytes
            )
    end.

%% Cuts the frame into its sections and verifies the message CRC before the
%% headers are decoded.
-spec decode_frame_sections(
    integer(),
    integer(),
    integer(),
    bitstring(),
    bitstring()
) -> {ok, {event(), bitstring()}} | {error, decode_error()}.
decode_frame_sections(Total, HeadersLen, PayloadLen, AfterPrelude, Frame) ->
    CrcOffset = HeadersLen + PayloadLen,
    %% Slices are attempted in this order; the first failure decides the error.
    Plan = [
        {AfterPrelude, 0, HeadersLen, <<"headers slice failed"/utf8>>},
        {AfterPrelude, HeadersLen, PayloadLen, <<"payload slice failed"/utf8>>},
        {AfterPrelude, CrcOffset, 4, <<"message crc slice failed"/utf8>>},
        {Frame, 0, Total - 4, <<"body slice failed"/utf8>>}
    ],
    case take_frame_slices(Plan, []) of
        {ok, [HeaderBytes, Payload, CrcBytes, Covered]} ->
            case erlang:crc32(Covered) =:= bytes_to_int_be(CrcBytes) of
                false ->
                    {error, bad_message_crc};
                true ->
                    build_event(HeaderBytes, Payload, AfterPrelude, CrcOffset + 4)
            end;
        {error, _} = Failure ->
            Failure
    end.

%% Performs the planned slices front to back, stopping at the first one that
%% fails and reporting it as `malformed_frame` with that step's message.
-spec take_frame_slices(
    list({bitstring(), integer(), integer(), binary()}),
    list(bitstring())
) -> {ok, list(bitstring())} | {error, decode_error()}.
take_frame_slices([], Taken) ->
    {ok, lists:reverse(Taken)};
take_frame_slices([{Source, Start, Size, Complaint} | Pending], Taken) ->
    case gleam_stdlib:bit_array_slice(Source, Start, Size) of
        {ok, Slice} ->
            take_frame_slices(Pending, [Slice | Taken]);
        {error, _} ->
            {error, {malformed_frame, Complaint}}
    end.

%% Decodes the header block and pairs the resulting event with the input that
%% follows the frame (everything in `AfterPrelude` from `FrameEnd` onwards).
-spec build_event(bitstring(), bitstring(), bitstring(), integer()) ->
    {ok, {event(), bitstring()}} | {error, decode_error()}.
build_event(HeaderBytes, Payload, AfterPrelude, FrameEnd) ->
    case decode_headers(HeaderBytes, []) of
        {ok, Headers} ->
            {ok, {{event, Headers, Payload}, slice_after(AfterPrelude, FrameEnd)}};
        {error, _} = Failure ->
            Failure
    end.

-file("src/aws/internal/codec/event_stream.gleam", 181).
?DOC(
    " Decode one framed message off the front of `bytes`. Returns the\n"
    " decoded `Event` plus the trailing bytes (which may hold the next\n"
    " frame; callers call `decode` again on the rest).\n"
    "\n"
    " Validates both CRCs end-to-end — partial / corrupted streams\n"
    " surface as `BadPreludeCrc` / `BadMessageCrc` rather than silently\n"
    " returning garbage.\n"
).
%% Implementation note: input shorter than the 12-byte prelude is reported as
%% `malformed_frame` ("shorter than prelude"), not as a CRC error. The prelude
%% CRC is checked here; everything after the prelude is handled by
%% decode_after_prelude/4.
-spec decode(bitstring()) -> {ok, {event(), bitstring()}} |
    {error, decode_error()}.
decode(<<Total:32/big, HeadersLen:32/big, PreludeCrc:32/big, Rest/bitstring>> = Frame) ->
    case erlang:crc32(<<Total:32/big, HeadersLen:32/big>>) of
        PreludeCrc ->
            decode_after_prelude(Total, HeadersLen, Rest, Frame);
        _Mismatch ->
            {error, bad_prelude_crc}
    end;
decode(_) ->
    {error, {malformed_frame, <<"shorter than prelude"/utf8>>}}.

-file("src/aws/internal/codec/event_stream.gleam", 383).
?DOC(
    " Frame a list of events as a single `StreamingBody`. Each event\n"
    " is `encode`d in turn; the result is the concatenated frames in\n"
    " list order. Use this on the request side of `@eventStream`\n"
    " operations to hand the framed bytes to the streaming transport.\n"
    "\n"
    " The body is a `Chunked` `StreamingBody` carrying one chunk per\n"
    " event, so the streaming transport can write them on the wire\n"
    " one frame at a time (`fold_chunks` preserves order). Buffered-\n"
    " then-streamed callers see the same wire bytes — `to_bit_array`\n"
    " concatenates in order.\n"
).
%% Encodes every event with encode/1, keeping list order, and passes the
%% resulting list of frames to aws@streaming:from_chunks/1.
-spec events_to_streaming_body(list(event())) -> aws@streaming:streaming_body().
events_to_streaming_body(Events) ->
    Frames = [encode(Event) || Event <- Events],
    aws@streaming:from_chunks(Frames).

-file("src/aws/internal/codec/event_stream.gleam", 404).
%% Decodes frames off the front of `Bytes` until it is empty, returning the
%% events in wire order. The first decode error aborts and is returned as-is.
-spec decode_all_bytes(bitstring(), list(event())) -> {ok, list(event())} |
    {error, decode_error()}.
decode_all_bytes(<<>>, Acc) ->
    {ok, lists:reverse(Acc)};
decode_all_bytes(Bytes, Acc) ->
    case decode(Bytes) of
        {ok, {Event, Rest}} ->
            decode_all_bytes(Rest, [Event | Acc]);
        {error, _} = Failure ->
            Failure
    end.

-file("src/aws/internal/codec/event_stream.gleam", 400).
?DOC(
    " Decode every frame from a streaming body. Materialises the full\n"
    " list of events — appropriate when the response is short (control\n"
    " messages, handshakes) or the call site wants to handle every\n"
    " event after the stream terminates. Long-lived subscription\n"
    " streams (`SubscribeToShard`, `StartStreamTranscription`) want\n"
    " `fold_events` instead so each event surfaces incrementally.\n"
    "\n"
    " The streaming body's chunks are concatenated first; the framing\n"
    " protocol's length fields make incremental parsing safe across\n"
    " chunk boundaries, but materialising-then-parsing is simpler and\n"
    " equally correct for buffer-bounded responses.\n"
).
%% Converts the body to one bit array, then decodes it frame by frame with
%% decode_all_bytes/2.
-spec decode_all(aws@streaming:streaming_body()) -> {ok, list(event())} |
    {error, decode_error()}.
decode_all(Body) ->
    Buffered = aws@streaming:to_bit_array(Body),
    decode_all_bytes(Buffered, []).

-file("src/aws/internal/codec/event_stream.gleam", 440).
%% Left fold over the frames in `Bytes`: each decoded event is passed to
%% `F(Acc, Event)` and the result becomes the next accumulator. Returns the
%% final accumulator once the input is empty, or the first decode error.
-spec fold_events_bytes(bitstring(), MJR, fun((MJR, event()) -> MJR)) -> {ok,
        MJR} |
    {error, decode_error()}.
fold_events_bytes(<<>>, Acc, _F) ->
    {ok, Acc};
fold_events_bytes(Bytes, Acc, F) ->
    case decode(Bytes) of
        {ok, {Event, Rest}} ->
            fold_events_bytes(Rest, F(Acc, Event), F);
        {error, _} = Failure ->
            Failure
    end.

-file("src/aws/internal/codec/event_stream.gleam", 432).
?DOC(
    " Reduce a streaming body's event frames left-to-right by\n"
    " accumulating one decoded event at a time. The natural consumer\n"
    " API for long-lived subscription streams — the folder can update\n"
    " running state (counts, partial outputs, signals) without holding\n"
    " the whole event list in memory.\n"
    "\n"
    " Returns `Error(DecodeError)` the moment a frame fails CRC or\n"
    " length checks, preserving the accumulator up to (but not\n"
    " including) the bad frame. Callers that want to keep going past\n"
    " a bad frame must do their own resync.\n"
    "\n"
    " Reads the full body up front via `streaming.to_bit_array` so the\n"
    " fold runs on a single contiguous buffer; a future chunk-by-chunk\n"
    " consumer that decodes events as bytes arrive can keep this same\n"
    " surface — only the implementation changes.\n"
).
%% Converts the body to one bit array, then folds over its frames with
%% fold_events_bytes/3, starting from `Initial`.
-spec fold_events(
    aws@streaming:streaming_body(),
    MJO,
    fun((MJO, event()) -> MJO)
) -> {ok, MJO} | {error, decode_error()}.
fold_events(Body, Initial, F) ->
    Buffered = aws@streaming:to_bit_array(Body),
    fold_events_bytes(Buffered, Initial, F).

-file("src/aws/internal/codec/event_stream.gleam", 485).
%% Produces one iterator step from the undecoded remainder: `done` when
%% nothing is left, `{failed, Err}` when the next frame does not decode, and
%% otherwise the event plus a continuation that closes over the bytes after it.
-spec iter_step(bitstring()) -> iter_step().
iter_step(<<>>) ->
    done;
iter_step(Remaining) ->
    case decode(Remaining) of
        {error, Err} ->
            {failed, Err};
        {ok, {Event, Rest}} ->
            {yield, Event, fun() -> iter_step(Rest) end}
    end.

-file("src/aws/internal/codec/event_stream.gleam", 481).
?DOC(
    " Wrap a streaming body as a pull-based event iterator. Each call\n"
    " to `next` returns either `Yield(event, next)` — the next decoded\n"
    " event plus a continuation for the rest of the stream — or `Done`\n"
    " at clean end-of-stream, or `Failed(err)` if the wire bytes don't\n"
    " parse.\n"
    "\n"
    " Useful for callers that want to drive consumption explicitly\n"
    " rather than handing the whole stream to `fold_events`. The\n"
    " codegen-emitted `<op>_event_stream(client, input)` wrappers\n"
    " return a `streaming.Response`; pipe `resp.body` through this\n"
    " helper to get a typed iterator without buffering the full event\n"
    " list in memory.\n"
    "\n"
    " Today materialises the body up front (same as `fold_events` —\n"
    " `streaming.to_bit_array`); a follow-up that streams chunk-by-\n"
    " chunk lands when the wire transport surfaces partial frames.\n"
).
%% Converts the body to one bit array and returns the first iterator step
%% computed by iter_step/1.
-spec iter_events(aws@streaming:streaming_body()) -> iter_step().
iter_events(Body) ->
    Buffered = aws@streaming:to_bit_array(Body),
    iter_step(Buffered).