src/marina_body.erl

-module(marina_body).
-include("marina_internal.hrl").

-compile(inline).
-compile({inline_size, 512}).

-export([
    decode/1
]).

%% public
-spec decode(frame()) -> {ok, term()} | {error, atom()}.

decode(#frame {flags = Flags, body = Body, opcode = Opcode}) ->
    Body1 = maybe_decompress(Flags, Body),
    %% Spec order inside a flagged response body:
    %%   [tracing_id][warnings][custom_payload]<message>
    Body2 = skip_tracing(Flags, Body1),
    Body3 = skip_warnings(Flags, Body2),
    Body4 = skip_custom_payload(Flags, Body3),
    decode(Opcode, Body4).

%% private
decode(?OP_ERROR, Body) ->
    {Code, Rest} = marina_types:decode_int(Body),
    {Msg, _Rest2} = marina_types:decode_string(Rest),
    {error, {Code, Msg}};
decode(?OP_READY, _) ->
    {ok, undefined};
decode(?OP_AUTHENTICATE, Body) ->
    {Authenticator, <<>>} = marina_types:decode_string(Body),
    {ok, Authenticator};
decode(?OP_RESULT, <<1:32/integer>>) ->
    {ok, undefined};
decode(?OP_RESULT, <<2:32/integer, Rest/binary>>) ->
    {Metadata, Rest2} = decode_result_metadata(Rest),
    {RowsCount, Rest3} = marina_types:decode_int(Rest2),
    ColumnsCount = Metadata#result_metadata.columns_count,
    {Rows, <<>>} = decode_rows(Rest3, RowsCount, ColumnsCount),

    {ok, #result {
        metadata = Metadata,
        rows_count = RowsCount,
        rows = Rows
    }};
decode(?OP_RESULT, <<3:32/integer, Rest/binary>>) ->
    {Keyspace, <<>>} = marina_types:decode_string(Rest),
    {ok, Keyspace};
decode(?OP_RESULT, <<4:32/integer, Rest/binary>>) ->
    {Id, Rest2} = marina_types:decode_short_bytes(Rest),
    {_Metadata, Rest3} = decode_prepared_metadata(Rest2),
    {_ResultMetadata, <<>>} = decode_result_metadata(Rest3),
    {ok, Id};
decode(?OP_RESULT, <<5:32/integer, Rest/binary>>) ->
    {ChangeType, Rest2} = marina_types:decode_string(Rest),
    {Target, Rest3} = marina_types:decode_string(Rest2),

    Options = case Target of
        <<"KEYSPACE">> ->
            {Option, <<>>} = marina_types:decode_string(Rest3),
            {Option};
        <<"TABLE">> ->
            {Option, Rest4} = marina_types:decode_string(Rest3),
            {Option2, <<>>} = marina_types:decode_string(Rest4),
            {Option, Option2};
        <<"TYPE">> ->
            {Option, Rest4} = marina_types:decode_string(Rest3),
            {Option2, <<>>} = marina_types:decode_string(Rest4),
            {Option, Option2};
        Fn when Fn =:= <<"FUNCTION">>; Fn =:= <<"AGGREGATE">> ->
            {Keyspace, Rest4} = marina_types:decode_string(Rest3),
            {Name, Rest5} = marina_types:decode_string(Rest4),
            {ArgTypes, <<>>} = marina_types:decode_string_list(Rest5),
            {Keyspace, Name, ArgTypes}
    end,

    {ok, {ChangeType, Target, Options}};
decode(?OP_AUTH_CHALLENGE, Body) ->
    {Token, <<>>} = marina_types:decode_bytes(Body),
    {ok, {auth_challenge, Token}};
decode(?OP_AUTH_SUCCESS, _) ->
    %% Spec allows a trailing [bytes] token here. marina's authenticate
    %% path matches on {ok, undefined} so we keep the existing shape —
    %% the token is only meaningful for SASL mechanisms that do a
    %% multi-round handshake, which marina does not currently support.
    {ok, undefined};
decode(?OP_EVENT, Body) ->
    {EventType, Rest} = marina_types:decode_string(Body),
    decode_event(EventType, Rest).

decode_event(<<"TOPOLOGY_CHANGE">>, Body) ->
    {Kind, Rest} = marina_types:decode_string(Body),
    {Addr, _Port, <<>>} = decode_inet(Rest),
    {ok, {event, topology_change, event_kind(Kind), Addr}};
decode_event(<<"STATUS_CHANGE">>, Body) ->
    {Kind, Rest} = marina_types:decode_string(Body),
    {Addr, _Port, <<>>} = decode_inet(Rest),
    {ok, {event, status_change, event_kind(Kind), Addr}};
decode_event(<<"SCHEMA_CHANGE">>, Body) ->
    %% Payload shape matches OP_RESULT kind 5. marina does not consume schema
    %% events, so pass the raw body through without committing to a shape —
    %% a future schema-aware caller can decode it at the call site.
    {ok, {event, schema_change, Body}}.

event_kind(<<"NEW_NODE">>) -> new_node;
event_kind(<<"REMOVED_NODE">>) -> removed_node;
event_kind(<<"MOVED_NODE">>) -> moved_node;
event_kind(<<"UP">>) -> up;
event_kind(<<"DOWN">>) -> down;
event_kind(Other) -> Other.

%% [inet] = <byte N><N bytes of IP><int port>
decode_inet(<<4, A, B, C, D, Port:32/signed, Rest/binary>>) ->
    {{A, B, C, D}, Port, Rest};
decode_inet(<<16, A:16, B:16, C:16, D:16, E:16, F:16, G:16, H:16,
              Port:32/signed, Rest/binary>>) ->
    {{A, B, C, D, E, F, G, H}, Port, Rest}.

maybe_decompress(Flags, Body) when Flags band 16#01 =:= 16#01 ->
    {ok, Body2} = marina_utils:unpack(Body),
    Body2;
maybe_decompress(_Flags, Body) ->
    Body.

skip_tracing(Flags, <<_Uuid:16/binary, Rest/binary>>)
  when Flags band 16#02 =:= 16#02 ->
    Rest;
skip_tracing(_Flags, Body) ->
    Body.

skip_custom_payload(Flags, <<N:16/unsigned, Rest/binary>>)
  when Flags band 16#04 =:= 16#04 ->
    skip_bytes_map(N, Rest);
skip_custom_payload(_Flags, Body) ->
    Body.

skip_bytes_map(0, Body) ->
    Body;
skip_bytes_map(N, <<KLen:16/unsigned, _Key:KLen/binary,
                    VLen:32/signed, VBody/binary>>) when VLen >= 0 ->
    <<_Val:VLen/binary, Rest/binary>> = VBody,
    skip_bytes_map(N - 1, Rest);
skip_bytes_map(N, <<KLen:16/unsigned, _Key:KLen/binary,
                    _VLen:32/signed, Rest/binary>>) ->
    skip_bytes_map(N - 1, Rest).

skip_warnings(Flags, <<N:16/unsigned, Rest/binary>>)
  when Flags band 16#08 =:= 16#08 ->
    skip_string_list(N, Rest);
skip_warnings(_Flags, Body) ->
    Body.

skip_string_list(0, Body) ->
    Body;
skip_string_list(N, <<Len:16/unsigned, _S:Len/binary, Rest/binary>>) ->
    skip_string_list(N - 1, Rest).

decode_columns(Bin, Count) ->
    decode_columns(Bin, Count, []).

decode_columns(Rest, 0, Acc) ->
    {lists:reverse(Acc), Rest};
decode_columns(Bin, Count, Acc) ->
    {Column, Rest} = marina_types:decode_bytes(Bin),
    decode_columns(Rest, Count - 1, [Column | Acc]).

decode_columns_metadata(Bin, ColumnsCount, true) ->
    {Keyspace, Rest} = marina_types:decode_string(Bin),
    {Table, Rest2} = marina_types:decode_string(Rest),
    decode_columns_metadata(Rest2, ColumnsCount, {Keyspace, Table}, []);
decode_columns_metadata(Bin, ColumnsCount, false) ->
    decode_columns_metadata(Bin, ColumnsCount, {undefined, undefined}, []).

decode_columns_metadata(Rest, 0, _GlobalTableSpec, Acc) ->
    {lists:reverse(Acc), Rest};
decode_columns_metadata(Bin, Count, {undefined, undefined} = GlobalTableSpec,
    Acc) ->

    {Keyspace, Bin2} = marina_types:decode_string(Bin),
    {Table, Bin3} = marina_types:decode_string(Bin2),
    {Name, Bin4} = marina_types:decode_string(Bin3),
    {Type, Bin5} = decode_type(Bin4),
    ColumnSpec = #column_spec {
        keyspace = Keyspace,
        table = Table,
        name = Name,
        type = Type
    },
    decode_columns_metadata(Bin5, Count - 1, GlobalTableSpec,
        [ColumnSpec | Acc]);
decode_columns_metadata(Bin, Count, {Keyspace, Table} = GlobalTableSpec,
    Acc) ->

    {Name, Bin2} = marina_types:decode_string(Bin),
    {Type, Bin3} = decode_type(Bin2),
    ColumnSpec = #column_spec {
        keyspace = Keyspace,
        table = Table,
        name = Name,
        type = Type
    },
    decode_columns_metadata(Bin3, Count - 1, GlobalTableSpec,
        [ColumnSpec | Acc]).

decode_elements(Bin, N) ->
    decode_elements(Bin, N, []).

decode_elements(Rest, 0, Acc) ->
    {lists:reverse(Acc), Rest};
decode_elements(Bin, N, Acc) ->
    {Type, Rest} = decode_type(Bin),
    decode_elements(Rest, N - 1, [Type | Acc]).

decode_fields(Bin, N) ->
    decode_fields(Bin, N, []).

decode_fields(Rest, 0, Acc) ->
    {lists:reverse(Acc), Rest};
decode_fields(Bin, N, Acc) ->
    {Name, Rest} = marina_types:decode_string(Bin),
    {Type, Rest2} = decode_type(Rest),
    decode_fields(Rest2, N - 1, [{Name, Type} | Acc]).

decode_result_flags(Flags) ->
    GlobalTableSpec = Flags band 1 == 1,
    HasMorePages = Flags band 2 == 2,
    NoMetaData = Flags band 4 == 4,
    {GlobalTableSpec, HasMorePages, NoMetaData}.

decode_result_paging_state(Bin, true) ->
    {Paging, Rest} = marina_types:decode_bytes(Bin),
    {Paging, Rest};
decode_result_paging_state(Rest, false) ->
    {undefined, Rest}.

decode_result_metadata(<<Flags:32/integer, ColumnsCount:32/integer,
    Rest/binary>>) ->

    {GlobalTableSpec, HasMorePages, NoMetaData} = decode_result_flags(Flags),
    {PagingState, Rest2} = decode_result_paging_state(Rest, HasMorePages),
    {Columns, Rest3} = case NoMetaData of
        true -> {[], Rest2};
        false ->
            decode_columns_metadata(Rest2, ColumnsCount, GlobalTableSpec)
    end,

    {#result_metadata {
        columns_count = ColumnsCount,
        columns = Columns,
        paging_state = PagingState
    }, Rest3}.

%% PREPARED result metadata (v4+) carries a pk_count + pk_indexes preamble
%% that the plain ROWS result_metadata does not. marina does not consume the
%% pk info for routing, so we skip it and fall through to the shared tail.
decode_prepared_metadata(<<Flags:32/integer, ColumnsCount:32/integer,
    PkCount:32/integer, PkRest/binary>>) ->

    PkSkip = PkCount * 2,
    <<_PkIndexes:PkSkip/binary, Rest/binary>> = PkRest,
    {GlobalTableSpec, HasMorePages, NoMetaData} = decode_result_flags(Flags),
    {PagingState, Rest2} = decode_result_paging_state(Rest, HasMorePages),
    {Columns, Rest3} = case NoMetaData of
        true -> {[], Rest2};
        false ->
            decode_columns_metadata(Rest2, ColumnsCount, GlobalTableSpec)
    end,

    {#result_metadata {
        columns_count = ColumnsCount,
        columns = Columns,
        paging_state = PagingState
    }, Rest3}.

decode_rows(Bin, Count, ColumnsCount) ->
    decode_rows(Bin, Count, ColumnsCount, []).

decode_rows(Rest, 0, _ColumnsCount, Acc) ->
    {lists:reverse(Acc), Rest};
decode_rows(Bin, Count, ColumnsCount, Acc) ->
    {Row, Rest} = decode_columns(Bin, ColumnsCount),
    decode_rows(Rest, Count - 1, ColumnsCount, [Row | Acc]).

decode_type(<<16#0:16, Rest/binary>>) ->
    {Type, Rest2} = marina_types:decode_string(Rest),
    {Type, Rest2};
decode_type(<<16#1:16, Rest/binary>>) ->
    {ascii, Rest};
decode_type(<<16#2:16, Rest/binary>>) ->
    {bigint, Rest};
decode_type(<<16#3:16, Rest/binary>>) ->
    {blob, Rest};
decode_type(<<16#4:16, Rest/binary>>) ->
    {boolean, Rest};
decode_type(<<16#5:16, Rest/binary>>) ->
    {counter, Rest};
decode_type(<<16#6:16, Rest/binary>>) ->
    {decimal, Rest};
decode_type(<<16#7:16, Rest/binary>>) ->
    {double, Rest};
decode_type(<<16#8:16, Rest/binary>>) ->
    {float, Rest};
decode_type(<<16#9:16, Rest/binary>>) ->
    {int, Rest};
decode_type(<<16#B:16, Rest/binary>>) ->
    {timestamp, Rest};
decode_type(<<16#C:16, Rest/binary>>) ->
    {uid, Rest};
decode_type(<<16#D:16, Rest/binary>>) ->
    {varchar, Rest};
decode_type(<<16#E:16, Rest/binary>>) ->
    {varint, Rest};
decode_type(<<16#F:16, Rest/binary>>) ->
    {timeuuid, Rest};
decode_type(<<16#10:16, Rest/binary>>) ->
    {inet, Rest};
decode_type(<<16#11:16, Rest/binary>>) ->
    {date, Rest};
decode_type(<<16#12:16, Rest/binary>>) ->
    {time, Rest};
decode_type(<<16#13:16, Rest/binary>>) ->
    {smallint, Rest};
decode_type(<<16#14:16, Rest/binary>>) ->
    {tinyint, Rest};
decode_type(<<16#15:16, Rest/binary>>) ->
    {duration, Rest};
decode_type(<<16#20:16, Rest/binary>>) ->
    {Type, Rest2} = decode_type(Rest),
    {{list, Type}, Rest2};
decode_type(<<16#21:16, Rest/binary>>) ->
    {KeyType, Rest2} = decode_type(Rest),
    {ValueType, Rest3} = decode_type(Rest2),
    {{map, KeyType, ValueType}, Rest3};
decode_type(<<16#22:16, Rest/binary >>) ->
    {Type, Rest2} = decode_type(Rest),
    {{set, Type}, Rest2};
decode_type(<<16#30:16, Rest/binary >>) ->
    {Keyspace, Rest2} = marina_types:decode_string(Rest),
    {Name, Rest3} = marina_types:decode_string(Rest2),
    {FieldsCount, Rest4} = marina_types:decode_short(Rest3),
    {Fields, Rest5} = decode_fields(Rest4, FieldsCount),
    {{udt, Keyspace, Name, Fields}, Rest5};
decode_type(<<16#31:16, Rest/binary >>) ->
    {ElementsCount, Rest2} = marina_types:decode_short(Rest),
    {Elements, Rest3} = decode_elements(Rest2, ElementsCount),
    {{tuple, Elements}, Rest3}.