-module(lightspeed@transport@matrix).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/transport/matrix.gleam").
-export([default_security_policy/0, default_timeout_policy/0, fallback_available/1, connect_with_policies/7, connect/4, reconnect_with_policies/6, reconnect/3, receive_with_hooks/5, 'receive'/4, poll/1, session_state/1, profile_label/1, mode_label/1, progressive_enhancement_active/1, active_transport/1, active_transport_label/1, capability_enabled/2, queued_outbound_count/1]).
-export_type([profile/0, mode/0, security_policy/0, timeout_policy/0, capability/0, connect_request/0, adapter_state/0, connect_result/0, receive_result/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Transport profile matrix with progressive enhancement semantics for M24.\n").
-type profile() :: web_socket_only |
web_socket_with_server_sent_events_fallback |
web_socket_with_long_polling_fallback.
-type mode() :: primary_realtime | fallback_polling.
-type security_policy() :: {security_policy, boolean(), boolean(), integer()}.
-type timeout_policy() :: {timeout_policy, integer(), integer()}.
-type capability() :: ordered_client_events |
reconnect_semantics |
server_push_frames |
bidirectional_channel |
progressive_fallback.
-type connect_request() :: {connect_request,
binary(),
binary(),
binary(),
integer(),
boolean()}.
-type adapter_state() :: {adapter_state,
lightspeed@transport@wisp_websocket:adapter_state(),
profile(),
mode(),
list(binary()),
integer(),
integer(),
security_policy(),
timeout_policy()}.
-type connect_result() :: {connected, adapter_state(), list(binary())} |
{rejected, lightspeed@transport@contract:adapter_error()}.
-type receive_result() :: {receive_result, adapter_state(), list(binary())}.
-file("src/lightspeed/transport/matrix.gleam", 84).
?DOC(" Default transport security policy.\n").
-spec default_security_policy() -> security_policy().
default_security_policy() ->
{security_policy, true, true, 4096}.
-file("src/lightspeed/transport/matrix.gleam", 93).
?DOC(" Default timeout policy.\n").
-spec default_timeout_policy() -> timeout_policy().
default_timeout_policy() ->
{timeout_policy, 30000, 120000}.
-file("src/lightspeed/transport/matrix.gleam", 375).
?DOC(" Whether the profile has a fallback mode available.\n").
-spec fallback_available(profile()) -> boolean().
fallback_available(Profile) ->
case Profile of
web_socket_only ->
false;
web_socket_with_server_sent_events_fallback ->
true;
web_socket_with_long_polling_fallback ->
true
end.
-file("src/lightspeed/transport/matrix.gleam", 423).
-spec select_mode(profile(), boolean()) -> mode().
select_mode(Profile, Prefer_fallback) ->
case Prefer_fallback andalso fallback_available(Profile) of
true ->
fallback_polling;
false ->
primary_realtime
end.
-file("src/lightspeed/transport/matrix.gleam", 469).
-spec to_ws_request(connect_request()) -> lightspeed@transport@wisp_websocket:web_socket_request().
to_ws_request(Request) ->
{web_socket_request,
erlang:element(2, Request),
erlang:element(3, Request),
erlang:element(4, Request),
erlang:element(5, Request)}.
-file("src/lightspeed/transport/matrix.gleam", 444).
-spec is_insecure_origin(binary()) -> boolean().
is_insecure_origin(Origin) ->
case (gleam_stdlib:string_starts_with(Origin, <<"https://"/utf8>>) orelse gleam_stdlib:string_starts_with(
Origin,
<<"http://localhost"/utf8>>
))
orelse gleam_stdlib:string_starts_with(Origin, <<"http://127.0.0.1"/utf8>>) of
true ->
false;
false ->
true
end.
-file("src/lightspeed/transport/matrix.gleam", 430).
-spec ensure_connection_security(connect_request(), security_policy()) -> {ok,
nil} |
{error, lightspeed@transport@contract:adapter_error()}.
ensure_connection_security(Request, Policy) ->
case erlang:element(3, Policy) andalso (erlang:element(3, Request) =:= <<""/utf8>>) of
true ->
{error, {protection_rejected, <<"csrf_token_missing"/utf8>>}};
false ->
case erlang:element(2, Policy) andalso is_insecure_origin(
erlang:element(4, Request)
) of
true ->
{error, {protection_rejected, <<"insecure_origin"/utf8>>}};
false ->
{ok, nil}
end
end.
-file("src/lightspeed/transport/matrix.gleam", 116).
?DOC(" Connect with explicit protection, security, and timeout policies.\n").
-spec connect_with_policies(
lightspeed@agent@session:session(),
connect_request(),
profile(),
lightspeed@transport@contract:auth_hook(),
lightspeed@transport@contract:protection_hook(),
security_policy(),
timeout_policy()
) -> connect_result().
connect_with_policies(
Session_state,
Request,
Profile,
Auth_hook,
Protection_hook,
Security_policy,
Timeout_policy
) ->
case ensure_connection_security(Request, Security_policy) of
{error, Error} ->
{rejected, Error};
{ok, nil} ->
case lightspeed@transport@wisp_websocket:connect_with_hooks(
Session_state,
to_ws_request(Request),
Auth_hook,
Protection_hook
) of
{rejected, Error@1} ->
{rejected, Error@1};
{connected, Ws_state, Frames} ->
Mode = select_mode(Profile, erlang:element(6, Request)),
State = {adapter_state,
Ws_state,
Profile,
Mode,
case Mode of
primary_realtime ->
[];
fallback_polling ->
Frames
end,
0,
erlang:element(5, Request),
Security_policy,
Timeout_policy},
{connected, State, case Mode of
primary_realtime ->
Frames;
fallback_polling ->
[]
end}
end
end.
-file("src/lightspeed/transport/matrix.gleam", 98).
?DOC(" Connect with default protection, security, and timeout policies.\n").
-spec connect(
lightspeed@agent@session:session(),
connect_request(),
profile(),
lightspeed@transport@contract:auth_hook()
) -> connect_result().
connect(Session_state, Request, Profile, Auth_hook) ->
connect_with_policies(
Session_state,
Request,
Profile,
Auth_hook,
lightspeed@transport@contract:allow_protection(),
default_security_policy(),
default_timeout_policy()
).
-file("src/lightspeed/transport/matrix.gleam", 485).
-spec append(list(binary()), list(binary())) -> list(binary()).
append(Left, Right) ->
case Left of
[] ->
Right;
[Entry | Rest] ->
[Entry | append(Rest, Right)]
end.
-file("src/lightspeed/transport/matrix.gleam", 179).
?DOC(" Reconnect with explicit protection, security, and timeout policies.\n").
-spec reconnect_with_policies(
adapter_state(),
connect_request(),
lightspeed@transport@contract:auth_hook(),
lightspeed@transport@contract:protection_hook(),
security_policy(),
timeout_policy()
) -> connect_result().
reconnect_with_policies(
State,
Request,
Auth_hook,
Protection_hook,
Security_policy,
Timeout_policy
) ->
case ensure_connection_security(Request, Security_policy) of
{error, Error} ->
{rejected, Error};
{ok, nil} ->
case (erlang:element(5, Request) - erlang:element(7, State)) > erlang:element(
3,
Timeout_policy
) of
true ->
{rejected,
{invalid_adapter_state,
<<"reconnect_grace_exceeded"/utf8>>}};
false ->
case lightspeed@transport@wisp_websocket:reconnect_with_hooks(
erlang:element(2, State),
to_ws_request(Request),
Auth_hook,
Protection_hook
) of
{rejected, Error@1} ->
{rejected, Error@1};
{connected, Ws_state, Frames} ->
Next_mode = select_mode(
erlang:element(3, State),
erlang:element(6, Request)
),
Merged_queue = case Next_mode of
primary_realtime ->
[];
fallback_polling ->
append(erlang:element(5, State), Frames)
end,
Next = {adapter_state,
Ws_state,
erlang:element(3, State),
Next_mode,
Merged_queue,
erlang:element(6, State),
erlang:element(5, Request),
Security_policy,
Timeout_policy},
{connected, Next, case Next_mode of
primary_realtime ->
case erlang:element(4, State) of
primary_realtime ->
Frames;
fallback_polling ->
append(
erlang:element(5, State),
Frames
)
end;
fallback_polling ->
[]
end}
end
end
end.
-file("src/lightspeed/transport/matrix.gleam", 163).
?DOC(" Reconnect with the state's existing policy defaults.\n").
-spec reconnect(
adapter_state(),
connect_request(),
lightspeed@transport@contract:auth_hook()
) -> connect_result().
reconnect(State, Request, Auth_hook) ->
reconnect_with_policies(
State,
Request,
Auth_hook,
lightspeed@transport@contract:allow_protection(),
erlang:element(8, State),
erlang:element(9, State)
).
-file("src/lightspeed/transport/matrix.gleam", 455).
-spec emit_frames(adapter_state(), list(binary())) -> receive_result().
emit_frames(State, Frames) ->
case erlang:element(4, State) of
primary_realtime ->
{receive_result, State, Frames};
fallback_polling ->
{receive_result,
{adapter_state,
erlang:element(2, State),
erlang:element(3, State),
erlang:element(4, State),
append(erlang:element(5, State), Frames),
erlang:element(6, State),
erlang:element(7, State),
erlang:element(8, State),
erlang:element(9, State)},
[]}
end.
-file("src/lightspeed/transport/matrix.gleam", 478).
-spec event_order_violation_reason(integer(), integer()) -> binary().
event_order_violation_reason(Last, Incoming) ->
<<<<<<"event_order_violation:last="/utf8,
(erlang:integer_to_binary(Last))/binary>>/binary,
":incoming="/utf8>>/binary,
(erlang:integer_to_binary(Incoming))/binary>>.
-file("src/lightspeed/transport/matrix.gleam", 259).
?DOC(" Receive one client frame with explicit rate-limit hook.\n").
-spec receive_with_hooks(
adapter_state(),
binary(),
integer(),
integer(),
lightspeed@transport@contract:rate_limit_hook()
) -> receive_result().
receive_with_hooks(State, Payload, Now_ms, Client_sequence, Rate_limit_hook) ->
case (Now_ms - erlang:element(7, State)) > erlang:element(
2,
erlang:element(9, State)
) of
true ->
emit_frames(
State,
[lightspeed@protocol:encode(
{failure,
<<""/utf8>>,
lightspeed@transport@contract:error_to_string(
{invalid_adapter_state,
<<"heartbeat_timeout"/utf8>>}
)}
)]
);
false ->
case string:length(Payload) > erlang:element(
4,
erlang:element(8, State)
) of
true ->
emit_frames(
{adapter_state,
erlang:element(2, State),
erlang:element(3, State),
erlang:element(4, State),
erlang:element(5, State),
erlang:element(6, State),
Now_ms,
erlang:element(8, State),
erlang:element(9, State)},
[lightspeed@protocol:encode(
{failure,
<<""/utf8>>,
lightspeed@transport@contract:error_to_string(
{unsupported_client_frame,
<<"payload_too_large"/utf8>>}
)}
)]
);
false ->
case Client_sequence =< erlang:element(6, State) of
true ->
emit_frames(
{adapter_state,
erlang:element(2, State),
erlang:element(3, State),
erlang:element(4, State),
erlang:element(5, State),
erlang:element(6, State),
Now_ms,
erlang:element(8, State),
erlang:element(9, State)},
[lightspeed@protocol:encode(
{failure,
<<""/utf8>>,
lightspeed@transport@contract:error_to_string(
{invalid_adapter_state,
event_order_violation_reason(
erlang:element(6, State),
Client_sequence
)}
)}
)]
);
false ->
Received = lightspeed@transport@wisp_websocket:receive_with_hooks(
erlang:element(2, State),
Payload,
Now_ms,
Rate_limit_hook
),
Next = {adapter_state,
erlang:element(2, Received),
erlang:element(3, State),
erlang:element(4, State),
erlang:element(5, State),
Client_sequence,
Now_ms,
erlang:element(8, State),
erlang:element(9, State)},
emit_frames(Next, erlang:element(3, Received))
end
end
end.
-file("src/lightspeed/transport/matrix.gleam", 243).
?DOC(
" Receive one client frame.\n"
"\n"
" `client_sequence` must be monotonically increasing per connection.\n"
).
-spec 'receive'(adapter_state(), binary(), integer(), integer()) -> receive_result().
'receive'(State, Payload, Now_ms, Client_sequence) ->
receive_with_hooks(
State,
Payload,
Now_ms,
Client_sequence,
lightspeed@transport@contract:allow_rate_limit()
).
-file("src/lightspeed/transport/matrix.gleam", 333).
?DOC(" Poll queued server frames.\n").
-spec poll(adapter_state()) -> receive_result().
poll(State) ->
case erlang:element(4, State) of
primary_realtime ->
{receive_result, State, []};
fallback_polling ->
{receive_result,
{adapter_state,
erlang:element(2, State),
erlang:element(3, State),
erlang:element(4, State),
[],
erlang:element(6, State),
erlang:element(7, State),
erlang:element(8, State),
erlang:element(9, State)},
erlang:element(5, State)}
end.
-file("src/lightspeed/transport/matrix.gleam", 345).
?DOC(" Session state accessor.\n").
-spec session_state(adapter_state()) -> lightspeed@agent@session:session().
session_state(State) ->
lightspeed@transport@wisp_websocket:session_state(erlang:element(2, State)).
-file("src/lightspeed/transport/matrix.gleam", 350).
?DOC(" Active profile label.\n").
-spec profile_label(profile()) -> binary().
profile_label(Profile) ->
case Profile of
web_socket_only ->
<<"websocket_only"/utf8>>;
web_socket_with_server_sent_events_fallback ->
<<"websocket_plus_sse"/utf8>>;
web_socket_with_long_polling_fallback ->
<<"websocket_plus_long_polling"/utf8>>
end.
-file("src/lightspeed/transport/matrix.gleam", 359).
?DOC(" Active mode label.\n").
-spec mode_label(adapter_state()) -> binary().
mode_label(State) ->
case erlang:element(4, State) of
primary_realtime ->
<<"primary_realtime"/utf8>>;
fallback_polling ->
<<"fallback_polling"/utf8>>
end.
-file("src/lightspeed/transport/matrix.gleam", 367).
?DOC(" Whether fallback mode is active.\n").
-spec progressive_enhancement_active(adapter_state()) -> boolean().
progressive_enhancement_active(State) ->
case erlang:element(4, State) of
primary_realtime ->
false;
fallback_polling ->
true
end.
-file("src/lightspeed/transport/matrix.gleam", 384).
?DOC(" Active transport for current profile+mode.\n").
-spec active_transport(adapter_state()) -> lightspeed@transport:transport().
active_transport(State) ->
case {erlang:element(4, State), erlang:element(3, State)} of
{primary_realtime, _} ->
web_socket;
{fallback_polling, web_socket_with_server_sent_events_fallback} ->
server_sent_events;
{fallback_polling, web_socket_with_long_polling_fallback} ->
long_polling;
{fallback_polling, web_socket_only} ->
web_socket
end.
-file("src/lightspeed/transport/matrix.gleam", 395).
?DOC(" Active transport label.\n").
-spec active_transport_label(adapter_state()) -> binary().
active_transport_label(State) ->
lightspeed@transport:label(active_transport(State)).
-file("src/lightspeed/transport/matrix.gleam", 400).
?DOC(" Capability predicate for fixtures and docs.\n").
-spec capability_enabled(adapter_state(), capability()) -> boolean().
capability_enabled(State, Capability) ->
case Capability of
ordered_client_events ->
true;
reconnect_semantics ->
true;
server_push_frames ->
case erlang:element(4, State) of
primary_realtime ->
true;
fallback_polling ->
false
end;
bidirectional_channel ->
case erlang:element(4, State) of
primary_realtime ->
true;
fallback_polling ->
false
end;
progressive_fallback ->
progressive_enhancement_active(State)
end.
-file("src/lightspeed/transport/matrix.gleam", 492).
-spec list_length(list(any())) -> integer().
list_length(Values) ->
case Values of
[] ->
0;
[_ | Rest] ->
1 + list_length(Rest)
end.
-file("src/lightspeed/transport/matrix.gleam", 419).
?DOC(" Number of queued frames pending `poll`.\n").
-spec queued_outbound_count(adapter_state()) -> integer().
queued_outbound_count(State) ->
list_length(erlang:element(5, State)).