-module(aion@internal@pump).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/internal/pump.gleam").
-export([run/1, shield/1]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" The workflow-side query pump loop around suspending awaits.\n"
"\n"
" The engine answers workflow queries at yield points (AT-007 C20): when a\n"
" query is pending for the workflow, a suspending await returns the\n"
" sentinel `Error(\"aion_query:\" <> json)` instead of resolving. `run`\n"
" recognises the sentinel, services the query through\n"
" `aion_flow_query_pump` (handler lookup, try/catch, reply), and re-enters\n"
" the same await, which re-resolves identically — pump iterations are\n"
" invisible to history and to replay. Every other result passes through\n"
" untouched.\n"
"\n"
" ## The suspension call-shape contract\n"
"\n"
" Every suspending engine NIF (`sleep`, `receive_signal`,\n"
" `await_activity_result`, `await_child`, `collect_*`) parks the workflow\n"
" process with beamr's message-wakeable suspension: a wake RE-EXECUTES the\n"
" BEAM call instruction that invoked the NIF, and the NIF re-resolves from\n"
" recorded history (the engine's two-phase suspend). Re-execution is only\n"
" sound when that call instruction is idempotent. A tail call\n"
" (`call_ext_last`) is NOT idempotent — it deallocates the caller's stack\n"
" frame before the NIF runs, so re-executing it on wake pops a second\n"
" frame, desyncing the return path (observed as the NIF's result value\n"
" being *called as a function*: `bad function term {ok, <<\"fired\">>}`).\n"
"\n"
" Therefore every suspending FFI call MUST sit in non-tail position. The\n"
" thunks passed to `run` enforce this with `shield`: the FFI call is the\n"
" *argument* of a cross-module call, and the Erlang compiler can neither\n"
" tail-call nor inline a remote call, so the suspending call always\n"
" compiles to a plain `call_ext` whose re-execution is safe.\n"
"\n"
" In addition, every thunk's body must be exactly one shielded FFI call on\n"
" *captured values*: arguments are precomputed outside the thunk, never\n"
" derived inside it. Nothing in the thunk may recompute state on re-entry —\n"
" the same contract the engine documents for hand-rolled await funs in\n"
" `crates/aion/tests/fixtures/aion_fixture_query.erl` — and the pump itself\n"
" relies on it when it re-enters the same await after servicing a query.\n"
).
-file("src/aion/internal/pump.gleam", 50).
?DOC(
" Run a suspending await thunk, servicing any pending queries the engine\n"
" surfaces as `aion_query:` sentinels before the await's own resolution.\n"
"\n"
" The loop is tail-recursive: each serviced query is answered exactly once,\n"
" then the await is re-entered until it resolves with a non-sentinel\n"
" result. A query handler raise never crashes the workflow — the Erlang\n"
" pump converts it into a `reply_query_error` and the loop continues.\n"
"\n"
" Thunk authors: the suspending FFI call inside the thunk MUST be wrapped\n"
" in [`shield`] (see the module docs for the call-shape contract).\n"
).
-spec run(fun(() -> {ok, binary()} | {error, binary()})) ->
    {ok, binary()} | {error, binary()}.
run(Do) ->
    %% The thunk is invoked as an argument, i.e. in non-tail position; the
    %% result is then classified by the clause heads of pump_step/2.
    pump_step(Do(), Do).

%% Classify a single await result.
%%
%% A result of the form `{error, <<"aion_query:", Json/binary>>}` is the
%% query sentinel: the payload after the prefix is passed to
%% `aion_flow_query_pump:service/1` and the same thunk is run again.
%% Anything else is the await's own resolution and is returned unchanged.
pump_step({error, <<"aion_query:"/utf8, Query_json/binary>>}, Do) ->
    aion_flow_query_pump:service(Query_json),
    run(Do);
pump_step(Resolved, _Do) ->
    Resolved.
-file("src/aion/internal/pump.gleam", 70).
?DOC(
" Pin a suspending FFI call out of tail position.\n"
"\n"
" Called as `pump.shield(ffi.sleep(...))` from another module, the FFI\n"
" call is evaluated as the argument of a remote call: argument position is\n"
" never tail position, and the Erlang compiler never inlines remote calls\n"
" (hot-code-loading semantics), so the suspending NIF is always invoked by\n"
" a re-execution-safe `call_ext`. The body re-matches the result so the\n"
" function cannot collapse to an identity even under whole-program\n"
" optimisation. See the module docs for why a `call_ext_last` to a\n"
" suspending NIF corrupts the stack on wake.\n"
).
-spec shield({ok, binary()} | {error, binary()}) ->
    {ok, binary()} | {error, binary()}.
shield(Outcome) ->
    %% Take the result apart and rebuild it, rather than returning
    %% `Outcome` directly: only `{ok, _}` and `{error, _}` pairs are
    %% accepted, and any other term fails the match with `case_clause`.
    case Outcome of
        {Tag, Payload} when Tag =:= ok; Tag =:= error ->
            {Tag, Payload}
    end.