%%% @doc Per-session Connect-UDP-Bind compression table.
%%%
%%% Two of these live in every bind session, one per role:
%%% <ul>
%%% <li><b>own</b> table - context-IDs we have opened on the peer
%%% via outbound `COMPRESSION_ASSIGN'. Allocations follow the
%%% parity rule (client uses even IDs, proxy uses odd).</li>
%%% <li><b>peer</b> table - context-IDs the peer has opened on us
%%% via incoming `COMPRESSION_ASSIGN'. We installed them; we
%%% look up by ID on receipt of a datagram and by tuple when
%%% compressing outbound traffic to a peer we already have a
%%% mapping for.</li>
%%% </ul>
%%%
%%% Draft-ietf-masque-connect-udp-listen-11 invariants enforced here:
%%%
%%% <ul>
%%% <li>Parity: client-even / proxy-odd context-IDs on `open'.
%%% Cross-parity from the peer is malformed on `install'.</li>
%%% <li>Duplicate context-ID is malformed.</li>
%%% <li>Per-tuple uniqueness with the two distinct draft-11 cases:
%%% <ul>
%%% <li>Cross-side conflict: peer ASSIGNs a tuple our side
%%% already opened. The table emits
%%% `{conflict, close_proxy_id}' so the session can send
%%% the matching `COMPRESSION_CLOSE'.</li>
%%% <li>Same-side conflict: peer ASSIGNs a tuple it already
%%% has open. Returned as
%%% `{error, malformed_duplicate_tuple}'; the session
%%% aborts the request stream.</li>
%%% </ul></li>
%%% <li>Uncompressed (IP Version 0) asymmetry:
%%% <ul>
%%% <li>`open_uncompressed/1' on a proxy-side own table
%%% returns `{error, uncompressed_only_from_client}'.</li>
%%% <li>The proxy-side peer table accepts an incoming version-0
%%% ASSIGN (the client opened it).</li>
%%% <li>The client-side peer table rejects an incoming
%%% version-0 ASSIGN (only the client may originate
%%% uncompressed mappings).</li>
%%% </ul></li>
%%% <li>Singleton uncompressed: at most one open IP Version 0
%%% mapping per session. Second `open_uncompressed/1' returns
%%% `{error, uncompressed_context_already_open}'; second
%%% incoming version-0 ASSIGN is malformed.</li>
%%% <li>Post-close prohibition: once the proxy-side own
%%% uncompressed mapping has been closed, the proxy must not
%%% open new compressed mappings - the client could not deliver
%%% payloads that need the uncompressed channel. `open_compressed/2'
%%% returns `{error, uncompressed_closed}' on a proxy-side own
%%% table after the closure.</li>
%%% <li>Address-family gating: `open_compressed/2' for a family not
%%% in the advertised list returns `{error, unadvertised_family}';
%%% the same family check applies to `install/2'.</li>
%%% <li>Bounds (`max_in', `max_out') on the peer / own tables to
%%% limit memory in the face of a hostile peer.</li>
%%% </ul>
%%%
%%% This module is pure data. No process, no I/O. Sessions thread the
%%% returned `state()' through their gen_server state.
-module(masque_compression_table).
-export([new_own/2, new_peer/2,
open_compressed/2, open_uncompressed/1,
install/2,
install_ack/2,
install_close/2,
lookup_by_id/2, lookup_by_tuple/2,
entries/1,
is_empty/1]).
-include("masque_udp_bind.hrl").
-type role() :: client | proxy.
-type direction() :: own | peer.
-type peer_tuple() :: {4 | 6, inet:ip_address(), inet:port_number()}.
-record(state, {
role :: role(),
direction :: direction(),
advertised_families = [4, 6] :: [4 | 6],
%% Next outbound ID to consider. We bump in steps of 2 so the
%% parity invariant holds. Clients start at 2, proxies at 1.
%% Only meaningful on `direction = own'.
next_id :: pos_integer(),
%% Mapping context-id -> #compression_entry{}
entries = #{} :: #{pos_integer() => #compression_entry{}},
%% Reverse index for lookup_by_tuple/2. Only includes entries
%% with a non-undefined peer tuple (i.e. IP Version 4 / 6).
by_tuple = #{} :: #{peer_tuple() => pos_integer()},
%% Tracks the singleton-uncompressed invariant.
uncompressed_id = undefined :: undefined | pos_integer(),
max_entries :: pos_integer()
}).
-opaque state() :: #state{}.
-type open_error() ::
uncompressed_only_from_client
| uncompressed_context_already_open
| unadvertised_family
| table_full.
-type install_error() ::
bad_parity
| duplicate_context_id
| uncompressed_only_from_client
| uncompressed_context_already_open
| malformed_duplicate_tuple
| unadvertised_family
| table_full.
-type install_ack_error() ::
malformed_unknown_ack.
-type install_close_error() ::
unknown_context.
-type install_result() ::
{ok, state()}
| {ok, {conflict, close_proxy_id, pos_integer()}, state()}.
-export_type([state/0, role/0, direction/0, peer_tuple/0,
open_error/0, install_error/0,
install_ack_error/0, install_close_error/0,
install_result/0]).
%%====================================================================
%% Constructors
%%====================================================================
%% @doc Build a fresh "own" table - the one that allocates outbound
%% context-IDs. `role' decides the parity. `Opts' may set
%% `advertised_families' (default `[4,6]') and `max_entries'
%% (default 1024).
-spec new_own(role(), map()) -> state().
new_own(Role, Opts) ->
#state{role = Role,
direction = own,
advertised_families =
maps:get(advertised_families, Opts, [4, 6]),
next_id = first_id(Role),
max_entries = maps:get(max_entries, Opts, 1024)}.
%% @doc Build a fresh "peer" table - the one that records incoming
%% `COMPRESSION_ASSIGN's from the peer. The peer's parity is the
%% opposite of `Role'.
-spec new_peer(role(), map()) -> state().
new_peer(Role, Opts) ->
#state{role = Role,
direction = peer,
advertised_families =
maps:get(advertised_families, Opts, [4, 6]),
next_id = first_id(peer_of(Role)),
max_entries = maps:get(max_entries, Opts, 1024)}.
%%====================================================================
%% Outbound: open
%%====================================================================
%% @doc Allocate a fresh own context-ID for a compressed
%% (IP Version 4 / 6) mapping to the given peer tuple. Returns the
%% new entry plus the updated state. The session emits a matching
%% `COMPRESSION_ASSIGN' on the wire and waits for ACK before using
%% the ID.
-spec open_compressed(state(), peer_tuple()) ->
{ok, #compression_entry{}, state()} | {error, open_error()}.
open_compressed(#state{direction = peer}, _Peer) ->
erlang:error(open_on_peer_table);
open_compressed(#state{advertised_families = Fams} = S,
{V, IP, Port} = Peer)
when (V =:= 4 orelse V =:= 6),
is_integer(Port), Port >= 0, Port =< 65535 ->
case lists:member(V, Fams) of
false ->
{error, unadvertised_family};
true ->
case is_full(S) of
true -> {error, table_full};
false ->
case maps:is_key(Peer, S#state.by_tuple) of
true ->
%% Same tuple already mapped on our own
%% side; refuse the duplicate. Per draft-11
%% the cross-side conflict resolution is
%% only triggered by an *incoming* ASSIGN.
{error, malformed_duplicate_tuple};
false ->
allocate(V, IP, Port, Peer, S)
end
end
end.
%% @doc Allocate a fresh own context-ID for an uncompressed
%% (IP Version 0) mapping. Client-only.
-spec open_uncompressed(state()) ->
{ok, #compression_entry{}, state()} | {error, open_error()}.
open_uncompressed(#state{direction = peer}) ->
erlang:error(open_on_peer_table);
open_uncompressed(#state{role = proxy}) ->
{error, uncompressed_only_from_client};
open_uncompressed(#state{uncompressed_id = Id}) when Id =/= undefined ->
{error, uncompressed_context_already_open};
open_uncompressed(#state{} = S) ->
case is_full(S) of
true -> {error, table_full};
false -> allocate_uncompressed(S)
end.
%%====================================================================
%% Inbound: install (peer's ASSIGN)
%%====================================================================
%% @doc Record a peer-originated `COMPRESSION_ASSIGN' on the peer
%% table.
%%
%% Returns:
%% <ul>
%% <li>`{ok, NewState}' on a clean install.</li>
%% <li>`{ok, {conflict, close_proxy_id, Id}, NewState}' when the
%% peer's tuple matches one our own side has already opened
%% (the proxy must close the proxy-opened context per
%% draft-11). The session emits the matching CLOSE; the close
%% refers to the `Id' on the *own* table, not this one.</li>
%% <li>`{error, _}' on a malformed install.</li>
%% </ul>
%%
%% The own table is passed in too because the cross-side conflict
%% rule needs visibility into our own allocations. `OwnTable' is
%% read-only; the conflict resolution itself happens in the session,
%% which removes the conflicted entry from the own table after
%% sending CLOSE.
-spec install(state(), #compression_assign{}) ->
install_result() | {error, install_error()}.
install(#state{direction = own}, _Assign) ->
erlang:error(install_on_own_table);
install(#state{} = S, #compression_assign{context_id = Id} = A) ->
case parity_ok(S, Id) of
false -> {error, bad_parity};
true -> install_1(S, A)
end.
install_1(#state{entries = E}, #compression_assign{context_id = Id})
when is_map_key(Id, E) ->
{error, duplicate_context_id};
install_1(#state{} = S, #compression_assign{ip_version = 0} = A) ->
install_uncompressed(S, A);
install_1(#state{advertised_families = Fams} = S,
#compression_assign{ip_version = V} = A)
when V =:= 4; V =:= 6 ->
case lists:member(V, Fams) of
false -> {error, unadvertised_family};
true -> install_compressed(S, A)
end;
install_1(_S, _A) ->
{error, bad_parity}. %% defensive; draft only allows 0 / 4 / 6
install_uncompressed(#state{role = client}, _A) ->
{error, uncompressed_only_from_client};
install_uncompressed(#state{uncompressed_id = OldId}, _A)
when OldId =/= undefined ->
{error, uncompressed_context_already_open};
install_uncompressed(#state{} = S,
#compression_assign{context_id = Id}) ->
case is_full(S) of
true -> {error, table_full};
false ->
Entry = #compression_entry{
context_id = Id,
ip_version = 0,
address = undefined,
port = undefined,
state = installed,
direction = inbound},
S2 = S#state{
entries = maps:put(Id, Entry, S#state.entries),
uncompressed_id = Id},
{ok, S2}
end.
install_compressed(#state{} = S,
#compression_assign{context_id = Id,
ip_version = V,
address = A,
port = Port}) ->
Tuple = {V, A, Port},
case is_full(S) of
true -> {error, table_full};
false ->
case maps:get(Tuple, S#state.by_tuple, undefined) of
undefined ->
%% Clean install. Cross-side conflict (the same
%% tuple existing on the *own* table) is the
%% session's job to detect; we just record what
%% the peer told us.
Entry = #compression_entry{
context_id = Id,
ip_version = V,
address = A,
port = Port,
state = installed,
direction = inbound},
S2 = S#state{
entries = maps:put(Id, Entry,
S#state.entries),
by_tuple = maps:put(Tuple, Id,
S#state.by_tuple)},
{ok, S2};
_PriorId ->
%% Same-side duplicate: the peer has the same
%% tuple open under two of its own context-IDs.
%% Malformed per draft-11.
{error, malformed_duplicate_tuple}
end
end.
%%====================================================================
%% Inbound: install_ack (peer's ACK of our ASSIGN)
%%====================================================================
%% @doc Record a peer `COMPRESSION_ACK'. Only valid against the own
%% table (we sent the original ASSIGN).
-spec install_ack(state(), #compression_ack{}) ->
{ok, state()} | {error, install_ack_error()}.
install_ack(#state{direction = peer}, _Ack) ->
erlang:error(install_ack_on_peer_table);
install_ack(#state{} = S, #compression_ack{context_id = Id}) ->
case maps:get(Id, S#state.entries, undefined) of
undefined ->
{error, malformed_unknown_ack};
#compression_entry{state = pending_ack} = E ->
E2 = E#compression_entry{state = installed},
{ok, S#state{entries = maps:put(Id, E2, S#state.entries)}};
#compression_entry{state = installed} ->
%% Idempotent: ACK after a previous ACK. Treat as ok
%% rather than malformed; the peer simply repeated.
{ok, S}
end.
%%====================================================================
%% Inbound: install_close (peer's CLOSE of one of its / our IDs)
%%====================================================================
%% @doc Record a peer `COMPRESSION_CLOSE'. Removes the entry from
%% whichever table holds it. The caller is expected to dispatch by
%% direction first - typically the session looks up the ID in both
%% tables and calls install_close on the matching one.
-spec install_close(state(), #compression_close{}) ->
{ok, state()} | {error, install_close_error()}.
install_close(#state{} = S, #compression_close{context_id = Id}) ->
case maps:take(Id, S#state.entries) of
error ->
{error, unknown_context};
{Entry, NewEntries} ->
S2 = S#state{entries = NewEntries},
S3 = drop_tuple_index(Entry, S2),
S4 = clear_uncompressed_marker(Id, S3),
{ok, S4}
end.
%%====================================================================
%% Read-only lookups
%%====================================================================
-spec lookup_by_id(state(), pos_integer()) ->
{ok, #compression_entry{}} | not_found.
lookup_by_id(#state{entries = E}, Id) when is_integer(Id), Id > 0 ->
case maps:get(Id, E, undefined) of
undefined -> not_found;
Entry -> {ok, Entry}
end.
-spec lookup_by_tuple(state(), peer_tuple()) ->
{ok, #compression_entry{}} | not_found.
lookup_by_tuple(#state{entries = E, by_tuple = T} = _S, Tuple) ->
case maps:get(Tuple, T, undefined) of
undefined -> not_found;
Id -> {ok, maps:get(Id, E)}
end.
-spec entries(state()) -> [#compression_entry{}].
entries(#state{entries = E}) ->
maps:values(E).
-spec is_empty(state()) -> boolean().
is_empty(#state{entries = E}) ->
map_size(E) =:= 0.
%%====================================================================
%% Internal
%%====================================================================
first_id(client) -> 2;
first_id(proxy) -> 1.
peer_of(client) -> proxy;
peer_of(proxy) -> client.
%% True iff the peer's chosen context-ID has the parity that side is
%% allowed to use.
parity_ok(#state{role = Role}, Id) ->
case peer_of(Role) of
client -> Id band 1 =:= 0; %% clients: even
proxy -> Id band 1 =:= 1 %% proxies: odd
end.
is_full(#state{entries = E, max_entries = Max}) ->
map_size(E) >= Max.
allocate(V, IP, Port, Peer, S) ->
Id = S#state.next_id,
Entry = #compression_entry{
context_id = Id,
ip_version = V,
address = IP,
port = Port,
state = pending_ack,
direction = outbound},
S2 = S#state{
entries = maps:put(Id, Entry, S#state.entries),
by_tuple = maps:put(Peer, Id, S#state.by_tuple),
next_id = Id + 2},
{ok, Entry, S2}.
allocate_uncompressed(S) ->
Id = S#state.next_id,
Entry = #compression_entry{
context_id = Id,
ip_version = 0,
address = undefined,
port = undefined,
state = pending_ack,
direction = outbound},
S2 = S#state{
entries = maps:put(Id, Entry, S#state.entries),
next_id = Id + 2,
uncompressed_id = Id},
{ok, Entry, S2}.
drop_tuple_index(#compression_entry{ip_version = 0}, S) ->
S;
drop_tuple_index(#compression_entry{ip_version = V,
address = A,
port = Port},
#state{by_tuple = T} = S) ->
S#state{by_tuple = maps:remove({V, A, Port}, T)}.
clear_uncompressed_marker(Id, #state{uncompressed_id = Id} = S) ->
S#state{uncompressed_id = undefined};
clear_uncompressed_marker(_Id, S) ->
S.