-module(rally@generator@ws_handler).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/rally/generator/ws_handler.gleam").
-export([generate/8]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" WebSocket handler codegen.\n"
"\n"
" Generates ws_handler.gleam: connection init (with optional auth resolve),\n"
" frame decoding via protocol_wire, page-init handling (topic subscription,\n"
" auth policy checks), RPC dispatch, push frame delivery, and reauth on\n"
" stale sessions. The output wires together page dispatch, libero RPC\n"
" dispatch, and the system message logger.\n"
).
-file("src/rally/generator/ws_handler.gleam", 838).
-spec helpers_string(binary()) -> binary().
helpers_string(Protocol) ->
Send_fn = case Protocol of
<<"json"/utf8>> ->
<<"send_text_frame"/utf8>>;
_ ->
<<"send_binary_frame"/utf8>>
end,
<<<<"
fn send_pending_frames(conn: WebsocketConnection) -> Nil {
let frames = effect.drain_outgoing_frames()
list.each(frames, fn(frame) {
let _send_result = mist."/utf8,
Send_fn/binary>>/binary,
"(conn, frame)
Nil
})
}
fn debug_log(message: String) -> Nil {
use <- bool.guard(when: !env.is_dev(), return: Nil)
io.println_error(message)
}"/utf8>>.
-file("src/rally/generator/ws_handler.gleam", 173).
-spec generate_frame_handler_no_auth(binary()) -> binary().
generate_frame_handler_no_auth(Protocol) ->
Rpc_branch = <<"\n _ -> {
case wire.decode_ws_rpc_envelope(msg) {
Ok(envelope) -> {
let request_id = wire.rpc_request_id(envelope)
debug_log(\"[rally:ws] RPC: request_id=\" <> int.to_string(request_id))
let assert Ok(server_context) = effect.get_stored_server_context()
let current_page = effect.get_ws_page()
let start = timestamp.system_time()
let #(result, new_ctx) = wire.dispatch_rpc(envelope, server_context)
let elapsed_ms =
timestamp.difference(start, timestamp.system_time())
|> duration.to_milliseconds()
let session_id = effect.get_ws_session()
let assert Ok(db_conn) = system.get_conn()
system.log_to_server(
db: db_conn,
session_id: session_id,
user_id: Error(Nil),
page: current_page,
variant_name: wire.rpc_identity(envelope),
raw_payload: wire.rpc_raw_payload(envelope),
elapsed_ms: elapsed_ms,
)
let Nil = effect.put_ws_state(conn, new_ctx, current_page)
wire.send_rpc_result(conn, result)
send_pending_frames(conn)
mist.continue(state)
}
Error(Nil) -> {
let result = wire.malformed_rpc_result()
wire.send_rpc_result(conn, result)
send_pending_frames(conn)
mist.continue(state)
}
}
}"/utf8>>,
Binary_branch = case Protocol of
<<"json"/utf8>> ->
<<"\n mist.Binary(_data) -> {
let error_frame = wire.encode_error(None, [JsonError(\"frame\", \"binary frames are not supported by the JSON protocol\")])
let _send_result = mist.send_text_frame(conn, error_frame)
mist.continue(state)
}"/utf8>>;
_ ->
<<"\n mist.Binary(data) -> {
debug_log(\"[rally:ws] Binary frame: \" <> int.to_string(bit_array.byte_size(data)) <> \" bytes\")
case wire.decode_ws_rpc_envelope(msg) {
Ok(envelope) -> {
let request_id = wire.rpc_request_id(envelope)
debug_log(\"[rally:ws] RPC: request_id=\" <> int.to_string(request_id))
let assert Ok(server_context) = effect.get_stored_server_context()
let current_page = effect.get_ws_page()
let start = timestamp.system_time()
let #(result, new_ctx) = wire.dispatch_rpc(envelope, server_context)
let elapsed_ms =
timestamp.difference(start, timestamp.system_time())
|> duration.to_milliseconds()
let session_id = effect.get_ws_session()
let assert Ok(db_conn) = system.get_conn()
system.log_to_server(
db: db_conn,
session_id: session_id,
user_id: Error(Nil),
page: current_page,
variant_name: wire.rpc_identity(envelope),
raw_payload: wire.rpc_raw_payload(envelope),
elapsed_ms: elapsed_ms,
)
let Nil = effect.put_ws_state(conn, new_ctx, current_page)
wire.send_rpc_result(conn, result)
send_pending_frames(conn)
mist.continue(state)
}
Error(Nil) ->
case wire.decode_request(data) {
Ok(#(page, request_id, _value)) if request_id == 0 -> {
debug_log(\"[rally:ws] page_init: \" <> page)
let old_page = effect.get_ws_page()
let assert Ok(server_context) = effect.get_stored_server_context()
let Nil = effect.put_ws_state(conn, server_context, page)
case old_page {
\"\" -> Nil
_ -> topics.leave(\"page:\" <> old_page)
}
topics.join(\"page:\" <> page)
let response_frame = wire.encode_response(request_id:, value: wire.page_init_ok())
let _send_result = mist.send_binary_frame(conn, response_frame)
send_pending_frames(conn)
mist.continue(state)
}
_ -> {
debug_log(\"[rally:ws] decode_request FAILED\")
mist.continue(state)
}
}
}
}"/utf8>>
end,
Text_branch = case Protocol of
<<"json"/utf8>> ->
gleam@string:replace(
Rpc_branch,
<<"\n _ ->"/utf8>>,
<<"\n mist.Text(_data) ->"/utf8>>
);
_ ->
<<""/utf8>>
end,
Send_fn = case Protocol of
<<"json"/utf8>> ->
<<"send_text_frame"/utf8>>;
_ ->
<<"send_binary_frame"/utf8>>
end,
Msg_catch_all = case Protocol of
<<"json"/utf8>> ->
<<""/utf8>>;
_ ->
<<"\n _ -> mist.continue(state)"/utf8>>
end,
<<<<<<<<<<<<<<<<<<<<"pub fn handler(
state state: Nil,
msg msg: WebsocketMessage(a),
conn conn: WebsocketConnection,
) {
debug_log(\"[rally:ws] handler called\")
case msg {"/utf8,
Binary_branch/binary>>/binary,
Text_branch/binary>>/binary,
"
mist.Custom(msg) -> {
case effect.decode_rally_push"/utf8>>/binary,
(case Protocol of
<<"json"/utf8>> ->
<<"_json"/utf8>>;
_ ->
<<""/utf8>>
end)/binary>>/binary,
"(msg) {
Ok(frame) -> {
let _send_result = mist."/utf8>>/binary,
Send_fn/binary>>/binary,
"(conn, frame)
mist.continue(state)
}
Error(Nil) -> mist.continue(state)
}
}
mist.Closed -> mist.stop()
mist.Shutdown -> mist.stop()"/utf8>>/binary,
Msg_catch_all/binary>>/binary,
"
}
}
"/utf8>>/binary,
(helpers_string(Protocol))/binary>>.
-file("src/rally/generator/ws_handler.gleam", 668).
-spec page_has_authorize_fn(
list({rally@types:scanned_route(), rally@types:page_contract()})
) -> binary().
page_has_authorize_fn(Page_contracts) ->
Arms = begin
_pipe = gleam@list:filter_map(
Page_contracts,
fun(Pair) ->
{Route, Contract} = Pair,
case erlang:element(16, Contract) of
true ->
{ok,
<<<<" \""/utf8,
(erlang:element(3, Route))/binary>>/binary,
"\" -> True"/utf8>>};
false ->
{error, nil}
end
end
),
_pipe@1 = gleam@string:join(_pipe, <<"\n"/utf8>>),
(fun(S) -> case S of
<<""/utf8>> ->
<<" _ -> False"/utf8>>;
_ ->
<<S/binary, "\n _ -> False"/utf8>>
end end)(_pipe@1)
end,
<<<<"fn page_has_authorize(page: String) -> Bool {
case page {
"/utf8,
Arms/binary>>/binary,
"
}
}"/utf8>>.
-file("src/rally/generator/ws_handler.gleam", 547).
-spec rpc_body(boolean(), binary()) -> binary().
rpc_body(Has_endpoints, Auth_ref) ->
case Has_endpoints of
true ->
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<" Ok(envelope) -> {\n"/utf8,
" let request_id = wire.rpc_request_id(envelope)\n"/utf8>>/binary,
" debug_log(\"[rally:ws] RPC: request_id=\" <> int.to_string(request_id))\n"/utf8>>/binary,
" let assert Ok(server_context) = effect.get_stored_server_context()\n"/utf8>>/binary,
" let current_page = effect.get_ws_page()\n"/utf8>>/binary,
" case effect.get_ws_identity() {\n"/utf8>>/binary,
" Error(Nil) -> {\n"/utf8>>/binary,
" let result = wire.auth_error_result(request_id, \"auth:forbidden\")\n"/utf8>>/binary,
" wire.send_rpc_result(conn, result)\n"/utf8>>/binary,
" send_pending_frames(conn)\n"/utf8>>/binary,
" mist.continue(state)\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" Ok(identity) -> {\n"/utf8>>/binary,
" case handler_page_info(wire.rpc_identity(envelope)) {\n"/utf8>>/binary,
" Error(Nil) -> {\n"/utf8>>/binary,
" let result = wire.auth_error_result(request_id, \"auth:unknown_rpc\")\n"/utf8>>/binary,
" wire.send_rpc_result(conn, result)\n"/utf8>>/binary,
" send_pending_frames(conn)\n"/utf8>>/binary,
" mist.continue(state)\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" Ok(info) -> {\n"/utf8>>/binary,
" let owning_page = info.page\n"/utf8>>/binary,
" let required = info.required\n"/utf8>>/binary,
" let has_authorize = info.has_authorize\n"/utf8>>/binary,
" case owning_page != current_page {\n"/utf8>>/binary,
" True -> {\n"/utf8>>/binary,
" let result = wire.auth_error_result(request_id, \"auth:page_mismatch\")\n"/utf8>>/binary,
" wire.send_rpc_result(conn, result)\n"/utf8>>/binary,
" send_pending_frames(conn)\n"/utf8>>/binary,
" mist.continue(state)\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" False -> {\n"/utf8>>/binary,
" case required && !"/utf8>>/binary,
Auth_ref/binary>>/binary,
".is_authenticated(identity) {\n"/utf8>>/binary,
" True -> {\n"/utf8>>/binary,
" let result = wire.auth_error_result(request_id, \"auth:redirect:\" <> "/utf8>>/binary,
Auth_ref/binary>>/binary,
".redirect_url)\n"/utf8>>/binary,
" wire.send_rpc_result(conn, result)\n"/utf8>>/binary,
" send_pending_frames(conn)\n"/utf8>>/binary,
" mist.continue(state)\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" False -> {\n"/utf8>>/binary,
" case has_authorize && !check_page_authorize(owning_page, server_context, identity) {\n"/utf8>>/binary,
" True -> {\n"/utf8>>/binary,
" let result = wire.auth_error_result(request_id, \"auth:forbidden\")\n"/utf8>>/binary,
" wire.send_rpc_result(conn, result)\n"/utf8>>/binary,
" send_pending_frames(conn)\n"/utf8>>/binary,
" mist.continue(state)\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" False -> {\n"/utf8>>/binary,
" let start = timestamp.system_time()\n"/utf8>>/binary,
" let #(result, new_ctx) = wire.dispatch_rpc(envelope, server_context, identity)\n"/utf8>>/binary,
" let elapsed_ms =\n"/utf8>>/binary,
" timestamp.difference(start, timestamp.system_time())\n"/utf8>>/binary,
" |> duration.to_milliseconds()\n"/utf8>>/binary,
"\n"/utf8>>/binary,
" let session_id = effect.get_ws_session()\n"/utf8>>/binary,
" let assert Ok(db_conn) = system.get_conn()\n"/utf8>>/binary,
" system.log_to_server(db: db_conn, session_id: session_id, user_id: Error(Nil), page: current_page, variant_name: wire.rpc_identity(envelope), raw_payload: wire.rpc_raw_payload(envelope), elapsed_ms: elapsed_ms)\n"/utf8>>/binary,
"\n"/utf8>>/binary,
" let Nil = effect.put_ws_state(conn, new_ctx, current_page)\n"/utf8>>/binary,
" wire.send_rpc_result(conn, result)\n"/utf8>>/binary,
" send_pending_frames(conn)\n"/utf8>>/binary,
" mist.continue(state)\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }\n"/utf8>>/binary,
" }"/utf8>>;
false ->
<<<<<<<<<<<<<<" Ok(envelope) -> {\n"/utf8,
" let request_id = wire.rpc_request_id(envelope)\n"/utf8>>/binary,
" debug_log(\"[rally:ws] RPC: request_id=\" <> int.to_string(request_id))\n"/utf8>>/binary,
" let result = wire.auth_error_result(request_id, \"auth:unknown_rpc\")\n"/utf8>>/binary,
" wire.send_rpc_result(conn, result)\n"/utf8>>/binary,
" send_pending_frames(conn)\n"/utf8>>/binary,
" mist.continue(state)\n"/utf8>>/binary,
" }"/utf8>>
end.
-file("src/rally/generator/ws_handler.gleam", 860).
-spec last_segment(binary()) -> binary().
last_segment(Module_path) ->
case begin
_pipe = gleam@string:split(Module_path, <<"/"/utf8>>),
gleam@list:last(_pipe)
end of
{ok, Seg} ->
Seg;
{error, nil} ->
Module_path
end.
-file("src/rally/generator/ws_handler.gleam", 878).
-spec import_as(binary(), binary()) -> binary().
import_as(Module_path, Alias) ->
case last_segment(Module_path) =:= Alias of
true ->
<<"import "/utf8, Module_path/binary>>;
false ->
<<<<<<"import "/utf8, Module_path/binary>>/binary, " as "/utf8>>/binary,
Alias/binary>>
end.
-file("src/rally/generator/ws_handler.gleam", 826).
-spec stub_check_page_authorize(binary()) -> binary().
stub_check_page_authorize(Auth_ref) ->
<<<<"fn check_page_authorize(page: String, _server_context: ServerContext, _identity: "/utf8,
Auth_ref/binary>>/binary,
".Identity) -> Bool {
case page {
_ -> False
}
}"/utf8>>.
-file("src/rally/generator/ws_handler.gleam", 867).
-spec module_to_alias(binary()) -> binary().
module_to_alias(Module_path) ->
gleam@string:replace(Module_path, <<"/"/utf8>>, <<"_"/utf8>>).
-file("src/rally/generator/ws_handler.gleam", 694).
-spec generate_ws_check_page_authorize(
list({rally@types:scanned_route(), rally@types:page_contract()}),
binary()
) -> binary().
generate_ws_check_page_authorize(Page_contracts, Auth_ref) ->
With_auth = gleam@list:filter_map(
Page_contracts,
fun(Pair) ->
{Route, Contract} = Pair,
case erlang:element(16, Contract) of
true ->
{ok,
{erlang:element(3, Route),
erlang:element(5, Route),
module_to_alias(erlang:element(5, Route))}};
false ->
{error, nil}
end
end
),
case With_auth of
[] ->
stub_check_page_authorize(Auth_ref);
_ ->
Arms = begin
_pipe = gleam@list:map(
With_auth,
fun(Pair@1) ->
{Page, _, Alias} = Pair@1,
<<<<<<<<" \""/utf8, Page/binary>>/binary,
"\" -> "/utf8>>/binary,
Alias/binary>>/binary,
".authorize(server_context, identity)"/utf8>>
end
),
_pipe@1 = gleam@string:join(_pipe, <<"\n"/utf8>>),
(fun(S) -> <<S/binary, "\n _ -> False"/utf8>> end)(_pipe@1)
end,
Imports = begin
_pipe@2 = gleam@list:map(
With_auth,
fun(Pair@2) ->
{_, Module_path, Alias@1} = Pair@2,
import_as(Module_path, Alias@1)
end
),
_pipe@3 = gleam@list:unique(_pipe@2),
gleam@string:join(_pipe@3, <<"\n"/utf8>>)
end,
<<<<<<<<<<Imports/binary,
"\n\nfn check_page_authorize(page: String, server_context: ServerContext, identity: "/utf8>>/binary,
Auth_ref/binary>>/binary,
".Identity) -> Bool {
case page {
"/utf8>>/binary,
Arms/binary>>/binary,
"
}
}"/utf8>>
end.
-file("src/rally/generator/ws_handler.gleam", 871).
-spec bool_str(boolean()) -> binary().
bool_str(B) ->
case B of
true ->
<<"True"/utf8>>;
false ->
<<"False"/utf8>>
end.
-file("src/rally/generator/ws_handler.gleam", 804).
-spec endpoint_wire_tags(libero@scanner:handler_endpoint(), binary()) -> list(binary()).
endpoint_wire_tags(Endpoint, Protocol) ->
case Protocol of
<<"json"/utf8>> ->
[rally@generator@json_rpc_dispatch:endpoint_json_tag(Endpoint)];
_ ->
Function_tag = <<"server_"/utf8,
(erlang:element(3, Endpoint))/binary>>,
Hash_tags = case erlang:element(8, Endpoint) of
{some, {Module_path, Type_name}} ->
Fields = gleam@list:map(
erlang:element(6, Endpoint),
fun(Param) -> erlang:element(2, Param) end
),
{_, Hash} = libero@wire_identity:wire_identity(
Module_path,
Type_name,
Fields
),
[Hash];
none ->
[]
end,
lists:append([Function_tag], Hash_tags)
end.
-file("src/rally/generator/ws_handler.gleam", 748).
-spec generate_handler_page_info(
list({rally@types:scanned_route(), rally@types:page_contract()}),
list(libero@scanner:handler_endpoint()),
binary()
) -> binary().
generate_handler_page_info(Page_contracts, Endpoints, Protocol) ->
Contract_for = gleam@list:filter_map(
Page_contracts,
fun(Pair) ->
{Route, Contract} = Pair,
{ok, {erlang:element(5, Route), erlang:element(3, Route), Contract}}
end
),
Arms = begin
_pipe@1 = gleam@list:flat_map(
Endpoints,
fun(Endpoint) ->
case gleam@list:find_map(
Contract_for,
fun(Pair@1) ->
{Module_path, Variant_name, Contract@1} = Pair@1,
case Module_path =:= erlang:element(2, Endpoint) of
true ->
{ok, {Variant_name, Contract@1}};
false ->
{error, nil}
end
end
) of
{ok, Page_info} ->
{Variant_name@1, Contract@2} = Page_info,
_pipe = endpoint_wire_tags(Endpoint, Protocol),
gleam@list:map(
_pipe,
fun(Wire_tag) ->
<<<<<<<<<<<<<<<<" \""/utf8, Wire_tag/binary>>/binary,
"\" -> Ok(PageAuthInfo(page: \""/utf8>>/binary,
Variant_name@1/binary>>/binary,
"\", required: "/utf8>>/binary,
(bool_str(
erlang:element(
15,
Contract@2
)
))/binary>>/binary,
", has_authorize: "/utf8>>/binary,
(bool_str(
erlang:element(16, Contract@2)
))/binary>>/binary,
"))"/utf8>>
end
);
{error, nil} ->
erlang:error(#{gleam_error => panic,
message => <<"rally codegen: WS handler has no matching page contract"/utf8>>,
file => <<?FILEPATH/utf8>>,
module => <<"rally/generator/ws_handler"/utf8>>,
function => <<"generate_handler_page_info"/utf8>>,
line => 787})
end
end
),
_pipe@2 = gleam@string:join(_pipe@1, <<"\n"/utf8>>),
(fun(S) -> <<S/binary, "\n _ -> Error(Nil)"/utf8>> end)(_pipe@2)
end,
<<<<"type PageAuthInfo {
PageAuthInfo(page: String, required: Bool, has_authorize: Bool)
}
fn handler_page_info(variant: String) -> Result(PageAuthInfo, Nil) {
case variant {
"/utf8,
Arms/binary>>/binary,
"
}
}"/utf8>>.
-file("src/rally/generator/ws_handler.gleam", 639).
-spec generate_page_auth_policy(
list({rally@types:scanned_route(), rally@types:page_contract()})
) -> binary().
generate_page_auth_policy(Page_contracts) ->
Arms = begin
_pipe = gleam@list:filter_map(
Page_contracts,
fun(Pair) ->
{Route, Contract} = Pair,
case erlang:element(14, Contract) of
true ->
Policy = case erlang:element(15, Contract) of
true ->
<<"rally_auth.Required"/utf8>>;
false ->
<<"rally_auth.Optional"/utf8>>
end,
{ok,
<<<<<<" \""/utf8,
(erlang:element(3, Route))/binary>>/binary,
"\" -> "/utf8>>/binary,
Policy/binary>>};
false ->
{error, nil}
end
end
),
_pipe@1 = gleam@string:join(_pipe, <<"\n"/utf8>>),
(fun(S) -> <<S/binary, "\n _ -> rally_auth.Optional"/utf8>> end)(
_pipe@1
)
end,
<<<<"import rally_runtime/auth as rally_auth
fn page_auth_policy(page: String) -> rally_auth.AuthPolicy {
case page {
"/utf8,
Arms/binary>>/binary,
"
}
}"/utf8>>.
-file("src/rally/generator/ws_handler.gleam", 324).
-spec generate_frame_handler_with_auth(
list({rally@types:scanned_route(), rally@types:page_contract()}),
binary(),
binary(),
list(libero@scanner:handler_endpoint()),
binary()
) -> binary().
generate_frame_handler_with_auth(
Page_contracts,
Auth_ref,
From_session_ref,
Endpoints,
Protocol
) ->
Has_endpoints = not gleam@list:is_empty(Endpoints),
Page_auth_policy_fn = generate_page_auth_policy(Page_contracts),
Handler_page_info_fn = case Has_endpoints of
true ->
generate_handler_page_info(Page_contracts, Endpoints, Protocol);
false ->
<<""/utf8>>
end,
Check_page_authorize_fn = generate_ws_check_page_authorize(
Page_contracts,
Auth_ref
),
Binary_branch = case Protocol of
<<"json"/utf8>> ->
<<"\n mist.Binary(_data) -> {
let error_frame = wire.encode_error(None, [JsonError(\"frame\", \"binary frames are not supported by the JSON protocol\")])
let _send_result = mist.send_text_frame(conn, error_frame)
mist.continue(state)
}"/utf8>>;
_ ->
<<<<<<<<<<<<<<<<<<<<<<<<"\n mist.Binary(data) -> {
debug_log(\"[rally:ws] Binary frame: \" <> int.to_string(bit_array.byte_size(data)) <> \" bytes\")
// Reauth: re-resolve identity if the last auth check is stale.
let epoch = timestamp.from_unix_seconds(0)
let now = duration.to_milliseconds(timestamp.difference(epoch, timestamp.system_time()))
let last_auth = effect.get_ws_auth_timestamp()
case now - last_auth > 1800000 {
True -> {
let session_id = effect.get_ws_session()
let hostname = effect.get_ws_hostname()
let current_page = effect.get_ws_page()
let assert Ok(server_context) = effect.get_stored_server_context()
case "/utf8,
Auth_ref/binary>>/binary,
".resolve(server_context, session_id) {
Error(Nil) -> {
effect.clear_ws_auth_state()
let Nil = effect.put_ws_state(conn, server_context, current_page)
}
Ok(identity) -> {
let #(_, enriched_sc) = "/utf8>>/binary,
From_session_ref/binary>>/binary,
".from_session(server_context: server_context, session_id: session_id, hostname: hostname, identity: identity)
let Nil = effect.put_ws_state(conn, enriched_sc, current_page)
let Nil = effect.put_ws_identity(identity)
let Nil = effect.put_ws_auth_timestamp(now)
}
}
}
False -> Nil
}
case wire.decode_ws_rpc_envelope(msg) {
"/utf8>>/binary,
(rpc_body(Has_endpoints, Auth_ref))/binary>>/binary,
"
Error(Nil) ->
case wire.decode_request(data) {
Ok(#(page, request_id, _value)) if request_id == 0 -> {
debug_log(\"[rally:ws] page_init: \" <> page)
let #(can_proceed, response_frame) = case page_auth_policy(page) {
rally_auth.Required -> {
case effect.get_ws_identity() {
Error(Nil) ->
#(False, wire.encode_response(request_id:, value: Error(\"auth:redirect:\" <> "/utf8>>/binary,
Auth_ref/binary>>/binary,
".redirect_url)))
Ok(identity) -> {
case "/utf8>>/binary,
Auth_ref/binary>>/binary,
".is_authenticated(identity) {
False ->
#(False, wire.encode_response(request_id:, value: Error(\"auth:redirect:\" <> "/utf8>>/binary,
Auth_ref/binary>>/binary,
".redirect_url)))
True -> {
case page_has_authorize(page) {
True -> {
let assert Ok(server_context) = effect.get_stored_server_context()
case check_page_authorize(page, server_context, identity) {
False ->
#(False, wire.encode_response(request_id:, value: Error(\"auth:forbidden\")))
True -> #(True, wire.encode_response(request_id:, value: wire.page_init_ok()))
}
}
False -> #(True, wire.encode_response(request_id:, value: wire.page_init_ok()))
}
}
}
}
}
}
rally_auth.Optional -> {
case page_has_authorize(page) {
True -> {
case effect.get_ws_identity() {
Error(Nil) ->
#(False, wire.encode_response(request_id:, value: Error(\"auth:forbidden\")))
Ok(identity) -> {
let assert Ok(server_context) = effect.get_stored_server_context()
case check_page_authorize(page, server_context, identity) {
False ->
#(False, wire.encode_response(request_id:, value: Error(\"auth:forbidden\")))
True -> #(True, wire.encode_response(request_id:, value: wire.page_init_ok()))
}
}
}
}
False -> #(True, wire.encode_response(request_id:, value: wire.page_init_ok()))
}
}
}
case can_proceed {
True -> {
let old_page = effect.get_ws_page()
let assert Ok(server_context) = effect.get_stored_server_context()
let Nil = effect.put_ws_state(conn, server_context, page)
case old_page {
\"\" -> Nil
_ -> topics.leave(\"page:\" <> old_page)
}
topics.join(\"page:\" <> page)
let _send_result = mist.send_binary_frame(conn, response_frame)
send_pending_frames(conn)
mist.continue(state)
}
False -> {
let _send_result = mist.send_binary_frame(conn, response_frame)
send_pending_frames(conn)
mist.continue(state)
}
}
}
_ -> {
debug_log(\"[rally:ws] decode_request FAILED\")
mist.continue(state)
}
}
}
}"/utf8>>
end,
Text_branch = case Protocol of
<<"json"/utf8>> ->
<<<<<<<<<<<<"\n mist.Text(_data) -> {
// Reauth: re-resolve identity if the last auth check is stale.
let epoch = timestamp.from_unix_seconds(0)
let now = duration.to_milliseconds(timestamp.difference(epoch, timestamp.system_time()))
let last_auth = effect.get_ws_auth_timestamp()
case now - last_auth > 1800000 {
True -> {
let session_id = effect.get_ws_session()
let hostname = effect.get_ws_hostname()
let current_page = effect.get_ws_page()
let assert Ok(server_context) = effect.get_stored_server_context()
case "/utf8,
Auth_ref/binary>>/binary,
".resolve(server_context, session_id) {
Error(Nil) -> {
effect.clear_ws_auth_state()
let Nil = effect.put_ws_state(conn, server_context, current_page)
}
Ok(identity) -> {
let #(_, enriched_sc) = "/utf8>>/binary,
From_session_ref/binary>>/binary,
".from_session(server_context: server_context, session_id: session_id, hostname: hostname, identity: identity)
let Nil = effect.put_ws_state(conn, enriched_sc, current_page)
let Nil = effect.put_ws_identity(identity)
let Nil = effect.put_ws_auth_timestamp(now)
}
}
}
False -> Nil
}
case wire.decode_ws_rpc_envelope(msg) {
"/utf8>>/binary,
(rpc_body(Has_endpoints, Auth_ref))/binary>>/binary,
"
Error(Nil) -> {
let result = wire.malformed_rpc_result()
wire.send_rpc_result(conn, result)
send_pending_frames(conn)
mist.continue(state)
}
}
}"/utf8>>;
_ ->
<<""/utf8>>
end,
Send_fn = case Protocol of
<<"json"/utf8>> ->
<<"send_text_frame"/utf8>>;
_ ->
<<"send_binary_frame"/utf8>>
end,
Msg_catch_all = case Protocol of
<<"json"/utf8>> ->
<<""/utf8>>;
_ ->
<<"\n _ -> mist.continue(state)"/utf8>>
end,
Handler = <<<<<<<<<<<<<<<<<<<<"pub fn handler(
state state: Nil,
msg msg: WebsocketMessage(a),
conn conn: WebsocketConnection,
) {
debug_log(\"[rally:ws] handler called\")
case msg {"/utf8,
Binary_branch/binary>>/binary,
Text_branch/binary>>/binary,
"
mist.Custom(msg) -> {
case effect.decode_rally_push"/utf8>>/binary,
(case Protocol of
<<"json"/utf8>> ->
<<"_json"/utf8>>;
_ ->
<<""/utf8>>
end)/binary>>/binary,
"(msg) {
Ok(frame) -> {
let _send_result = mist."/utf8>>/binary,
Send_fn/binary>>/binary,
"(conn, frame)
mist.continue(state)
}
Error(Nil) -> mist.continue(state)
}
}
mist.Closed -> mist.stop()
mist.Shutdown -> mist.stop()"/utf8>>/binary,
Msg_catch_all/binary>>/binary,
"
}
}
"/utf8>>/binary,
(helpers_string(Protocol))/binary>>,
Page_info_section = case Has_endpoints of
true ->
<<"\n\n"/utf8, Handler_page_info_fn/binary>>;
false ->
<<""/utf8>>
end,
<<<<<<<<<<<<<<Handler/binary, "\n\n"/utf8>>/binary,
Page_auth_policy_fn/binary>>/binary,
"\n\n"/utf8>>/binary,
(page_has_authorize_fn(Page_contracts))/binary>>/binary,
Page_info_section/binary>>/binary,
"\n\n"/utf8>>/binary,
Check_page_authorize_fn/binary>>.
-file("src/rally/generator/ws_handler.gleam", 99).
-spec generate_init_no_auth() -> binary().
generate_init_no_auth() ->
<<"pub fn on_init(
conn conn: WebsocketConnection,
server_context server_context: ServerContext,
session_id session_id: String,
hostname _hostname: String,
) {
ensure_atoms()
topics.start()
let Nil = effect.put_ws_state(conn, server_context, \"\")
let Nil = effect.put_ws_session(session_id)
topics.join(\"app\")
topics.join(\"session:\" <> session_id)
let selector = process.new_selector()
|> process.select_other(fn(msg) { msg })
#(Nil, Some(selector))
}
pub fn on_close(_state: Nil) -> Nil {
Nil
}"/utf8>>.
-file("src/rally/generator/ws_handler.gleam", 124).
-spec generate_init_with_auth(binary(), binary()) -> binary().
generate_init_with_auth(Auth_ref, From_session_ref) ->
<<<<<<<<"pub fn on_init(
conn conn: WebsocketConnection,
server_context server_context: ServerContext,
session_id session_id: String,
hostname hostname: String,
) {
ensure_atoms()
topics.start()
let Nil = effect.put_ws_session(session_id)
case "/utf8,
Auth_ref/binary>>/binary,
".resolve(server_context, session_id) {
Error(Nil) -> {
// Infrastructure failure — no identity stored. Connection starts with
// the un-enriched server_context. Subsequent page-init and RPC will see
// get_ws_identity() -> Error(Nil) and fail closed for Required pages.
let Nil = effect.put_ws_state(conn, server_context, \"\")
topics.join(\"app\")
topics.join(\"session:\" <> session_id)
let selector = process.new_selector()
|> process.select_other(fn(msg) { msg })
#(Nil, Some(selector))
}
Ok(identity) -> {
let #(_, enriched_sc) = "/utf8>>/binary,
From_session_ref/binary>>/binary,
".from_session(server_context: server_context, session_id: session_id, hostname: hostname, identity: identity)
let Nil = effect.put_ws_state(conn, enriched_sc, \"\")
let Nil = effect.put_ws_identity(identity)
let Nil = effect.put_ws_hostname(hostname)
let epoch = timestamp.from_unix_seconds(0)
let now = duration.to_milliseconds(timestamp.difference(epoch, timestamp.system_time()))
let Nil = effect.put_ws_auth_timestamp(now)
topics.join(\"app\")
topics.join(\"session:\" <> session_id)
let selector = process.new_selector()
|> process.select_other(fn(msg) { msg })
#(Nil, Some(selector))
}
}
}
pub fn on_close(_state: Nil) -> Nil {
Nil
}"/utf8>>.
-file("src/rally/generator/ws_handler.gleam", 19).
-spec generate(
list({rally@types:scanned_route(), rally@types:page_contract()}),
binary(),
binary(),
gleam@option:option(rally@types:auth_config()),
binary(),
list(libero@scanner:handler_endpoint()),
binary(),
binary()
) -> binary().
generate(
Page_contracts,
Atoms_module,
_,
Auth_config,
From_session_module,
Endpoints,
Wire_import_module,
Protocol
) ->
Has_auth = gleam@option:is_some(Auth_config),
Has_endpoints = not gleam@list:is_empty(Endpoints),
Auth_module_ref = case Auth_config of
{some, {auth_config, Auth_module}} ->
last_segment(Auth_module);
none ->
<<""/utf8>>
end,
From_session_ref = last_segment(From_session_module),
Auth_imports = case Auth_config of
{some, {auth_config, Auth_module@1}} ->
<<<<(import_as(Auth_module@1, Auth_module_ref))/binary, "\n"/utf8>>/binary,
(case From_session_ref =:= <<"server_context"/utf8>> of
true ->
<<""/utf8>>;
false ->
<<(import_as(From_session_module, From_session_ref))/binary,
"\n"/utf8>>
end)/binary>>;
none ->
<<""/utf8>>
end,
System_import = case Has_auth andalso not Has_endpoints of
true ->
<<""/utf8>>;
false ->
<<"import rally_runtime/system\n"/utf8>>
end,
Json_imports = case Protocol of
<<"json"/utf8>> ->
<<"import libero/json/error.{JsonError}\n"/utf8>>;
_ ->
<<""/utf8>>
end,
Bit_array_import = case Protocol of
<<"json"/utf8>> ->
<<""/utf8>>;
_ ->
<<"import gleam/bit_array\n"/utf8>>
end,
Time_imports = <<"import gleam/time/duration\n"/utf8,
"import gleam/time/timestamp\n"/utf8>>,
Header = <<<<<<<<<<<<<<<<<<<<<<<<"// Generated by Rally — do not edit.\n\nimport gleam/bool\n"/utf8,
Bit_array_import/binary>>/binary,
Time_imports/binary>>/binary,
"import gleam/int\nimport gleam/io\nimport gleam/list\nimport gleam/option.{None, Some}\nimport gleam/erlang/process\nimport rally_runtime/effect\nimport rally_runtime/env\n"/utf8>>/binary,
(import_as(
Wire_import_module,
<<"wire"/utf8>>
))/binary>>/binary,
"\nimport rally_runtime/topics\n"/utf8>>/binary,
System_import/binary>>/binary,
"import server_context.{type ServerContext}\nimport mist.{type WebsocketConnection, type WebsocketMessage}\n"/utf8>>/binary,
Auth_imports/binary>>/binary,
Json_imports/binary>>/binary,
"\n@external(erlang, \""/utf8>>/binary,
Atoms_module/binary>>/binary,
"\", \"ensure\")\nfn ensure_atoms() -> Nil\n"/utf8>>,
Init = case Has_auth of
true ->
generate_init_with_auth(Auth_module_ref, From_session_ref);
false ->
generate_init_no_auth()
end,
Handler = case Has_auth of
true ->
generate_frame_handler_with_auth(
Page_contracts,
Auth_module_ref,
From_session_ref,
Endpoints,
Protocol
);
false ->
generate_frame_handler_no_auth(Protocol)
end,
<<(gleam@string:join([Header, Init, Handler], <<"\n\n"/utf8>>))/binary,
"\n"/utf8>>.