Skip to main content

src/aion@internal@pump.erl

-module(aion@internal@pump).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/internal/pump.gleam").
-export([run/1, shield/1]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " The workflow-side query pump loop around suspending awaits.\n"
    "\n"
    " The engine answers workflow queries at yield points (AT-007 C20): when a\n"
    " query is pending for the workflow, a suspending await returns the\n"
    " sentinel `Error(\"aion_query:\" <> json)` instead of resolving. `run`\n"
    " recognises the sentinel, services the query through\n"
    " `aion_flow_query_pump` (handler lookup, try/catch, reply), and re-enters\n"
    " the same await, which re-resolves identically — pump iterations are\n"
    " invisible to history and to replay. Every other result passes through\n"
    " untouched.\n"
    "\n"
    " ## The suspension call-shape contract\n"
    "\n"
    " Every suspending engine NIF (`sleep`, `receive_signal`,\n"
    " `await_activity_result`, `await_child`, `collect_*`) parks the workflow\n"
    " process with beamr's message-wakeable suspension: a wake RE-EXECUTES the\n"
    " BEAM call instruction that invoked the NIF, and the NIF re-resolves from\n"
    " recorded history (the engine's two-phase suspend). Re-execution is only\n"
    " sound when that call instruction is idempotent. A tail call\n"
    " (`call_ext_last`) is NOT idempotent — it deallocates the caller's stack\n"
    " frame before the NIF runs, so re-executing it on wake pops a second\n"
    " frame, desyncing the return path (observed as the NIF's result value\n"
    " being *called as a function*: `bad function term {ok, <<\"fired\">>}`).\n"
    "\n"
    " Therefore every suspending FFI call MUST sit in non-tail position. The\n"
    " thunks passed to `run` enforce this with `shield`: the FFI call is the\n"
    " *argument* of a cross-module call, and the Erlang compiler can neither\n"
    " tail-call nor inline a remote call, so the suspending call always\n"
    " compiles to a plain `call_ext` whose re-execution is safe.\n"
    "\n"
    " In addition, every thunk's body must be exactly one shielded FFI call on\n"
    " *captured values*: arguments are precomputed outside the thunk, never\n"
    " derived inside it. Nothing in the thunk may recompute state on re-entry —\n"
    " the same contract the engine documents for hand-rolled await funs in\n"
    " `crates/aion/tests/fixtures/aion_fixture_query.erl` — and the pump itself\n"
    " relies on it when it re-enters the same await after servicing a query.\n"
).

-file("src/aion/internal/pump.gleam", 50).
?DOC(
    " Run a suspending await thunk, servicing any pending queries the engine\n"
    " surfaces as `aion_query:` sentinels before the await's own resolution.\n"
    "\n"
    " The loop is tail-recursive: each serviced query is answered exactly once,\n"
    " then the await is re-entered until it resolves with a non-sentinel\n"
    " result. A query handler raise never crashes the workflow — the Erlang\n"
    " pump converts it into a `reply_query_error` and the loop continues.\n"
    "\n"
    " Thunk authors: the suspending FFI call inside the thunk MUST be wrapped\n"
    " in [`shield`] (see the module docs for the call-shape contract).\n"
).
-spec run(fun(() -> {ok, binary()} | {error, binary()})) -> {ok, binary()} |
    {error, binary()}.
run(Do) ->
    case Do() of
        {error, <<"aion_query:"/utf8, Sentinel_payload/binary>>} ->
            aion_flow_query_pump:service(Sentinel_payload),
            run(Do);

        Outcome ->
            Outcome
    end.

-file("src/aion/internal/pump.gleam", 70).
?DOC(
    " Pin a suspending FFI call out of tail position.\n"
    "\n"
    " Called as `pump.shield(ffi.sleep(...))` from another module, the FFI\n"
    " call is evaluated as the argument of a remote call: argument position is\n"
    " never tail position, and the Erlang compiler never inlines remote calls\n"
    " (hot-code-loading semantics), so the suspending NIF is always invoked by\n"
    " a re-execution-safe `call_ext`. The body re-matches the result so the\n"
    " function cannot collapse to an identity even under whole-program\n"
    " optimisation. See the module docs for why a `call_ext_last` to a\n"
    " suspending NIF corrupts the stack on wake.\n"
).
-spec shield({ok, binary()} | {error, binary()}) -> {ok, binary()} |
    {error, binary()}.
shield(Outcome) ->
    case Outcome of
        {ok, Value} ->
            {ok, Value};

        {error, Reason} ->
            {error, Reason}
    end.