Skip to main content

src/livery_grpc_wire.erl

-module(livery_grpc_wire).
-moduledoc """
Message <-> wire glue shared by the server and the client.

Combines the three wire concerns into one place: protobuf coding
(`livery_grpc_codec`), per-message compression (`livery_grpc_compression`),
and length-prefixed framing (`livery_grpc_frame`). The server and client
encode and decode messages through here so the rules stay identical on
both sides.
""".

-export([encode/4, decode_frame/4, decode_all/5]).

-doc """
Encode one message to a wire frame: protobuf-encode, compress with
`Algorithm`, then length-prefix.
""".
-spec encode(module(), atom(), map() | tuple(), livery_grpc_compression:algorithm()) ->
    {ok, iodata()} | {error, {encode, term()}}.
encode(Proto, MsgName, Msg, Algorithm) ->
    case livery_grpc_codec:encode(Proto, MsgName, Msg) of
        {ok, Bin} ->
            {Compressed, Bytes} = livery_grpc_compression:compress(Algorithm, Bin),
            {ok, livery_grpc_frame:encode(Bytes, Compressed)};
        {error, _} = E ->
            E
    end.

-doc """
Decode one framed message: decompress per the frame flag and message
`Encoding`, then protobuf-decode as `MsgName`.
""".
-spec decode_frame(
    module(), atom(), livery_grpc_compression:algorithm(), livery_grpc_frame:frame()
) ->
    {ok, map() | tuple()} | {error, term()}.
decode_frame(Proto, MsgName, Encoding, {Compressed, Payload}) ->
    try livery_grpc_compression:decompress(Compressed, Encoding, Payload) of
        Raw -> livery_grpc_codec:decode(Proto, MsgName, Raw)
    catch
        error:{grpc_compression, _} = Reason -> {error, Reason}
    end.

-doc """
Decode every whole frame in `Bin` as `MsgName`. Returns `{error,
incomplete}` if a trailing partial frame remains, or the first frame's
decode error. `Max` caps a single message's declared length.
""".
-spec decode_all(
    module(), atom(), livery_grpc_compression:algorithm(), binary(), non_neg_integer() | infinity
) ->
    {ok, [map() | tuple()]} | {error, term()}.
decode_all(Proto, MsgName, Encoding, Bin, Max) ->
    case livery_grpc_frame:push(Bin, livery_grpc_frame:new(), Max) of
        {ok, Frames, Buf} ->
            case livery_grpc_frame:is_empty(Buf) of
                true -> decode_each(Proto, MsgName, Encoding, Frames, []);
                false -> {error, incomplete}
            end;
        {error, _} = E ->
            E
    end.

-spec decode_each(
    module(), atom(), livery_grpc_compression:algorithm(), [livery_grpc_frame:frame()], [
        map() | tuple()
    ]
) ->
    {ok, [map() | tuple()]} | {error, term()}.
decode_each(_Proto, _MsgName, _Encoding, [], Acc) ->
    {ok, lists:reverse(Acc)};
decode_each(Proto, MsgName, Encoding, [Frame | Rest], Acc) ->
    case decode_frame(Proto, MsgName, Encoding, Frame) of
        {ok, Msg} -> decode_each(Proto, MsgName, Encoding, Rest, [Msg | Acc]);
        {error, _} = E -> E
    end.