Skip to main content

src/aion@query.erl

%% Query support module. FILEPATH and the -file attributes further down point
%% at src/aion/query.gleam as the originating source (presumably this file is
%% compiled from Gleam — confirm before editing it by hand).
-module(aion@query).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/aion/query.gleam").
%% Public API: handler registration, query dispatch, and the observation
%% counter accessor.
-export([handler/3, dispatch/2, recorded_observations/0]).

%% Documentation-attribute shim. On OTP 27 and later, ?MODULEDOC/?DOC expand
%% to the -moduledoc/-doc attributes. On older releases they expand to an
%% empty -compile([]) attribute, which adds no compiler options, so every
%% ?DOC(...) use still parses as a valid attribute.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Typed query handler registration and replies.\n").

-file("src/aion/query.gleam", 107).
?DOC(
    " Translate a raw engine query error string into the typed taxonomy.\n"
    "\n"
    " Recognised inputs, as emitted by the engine:\n"
    "\n"
    " - `unknown:<name>` — no query is registered under that name.\n"
    " - `timeout` — the reply deadline expired.\n"
    " - `not_running:<workflow_id>` — the target is terminal or non-resident.\n"
    " - `handler_failed:<message>` — the handler ran and reported failure\n"
    "   (the `query_failed` wire code).\n"
    "\n"
    " Every other string, including the engine-fault forms `unknown_workflow:`,\n"
    " `reply_dropped` and `engine:`, surfaces as `QueryEngineFailure`.\n"
).
-spec query_error(binary()) -> aion@error:query_error().
query_error(<<"unknown:"/utf8, Query_name/binary>>) ->
    {unknown_query, Query_name};
query_error(<<"timeout"/utf8>> = Raw) ->
    %% Only the exact string matches; the raw text is carried in the payload.
    {query_timed_out, {timed_out, Raw}};
query_error(<<"not_running:"/utf8, Target_id/binary>>) ->
    {query_not_running, Target_id};
query_error(<<"handler_failed:"/utf8, Reason/binary>>) ->
    {query_handler_failed, Reason};
query_error(Raw) ->
    %% Anything unrecognised is passed through verbatim as an engine failure.
    {query_engine_failure, Raw}.

%% Configuration payload sent with a query registration: an empty JSON object
%% serialised through gleam@json:to_string/1.
-file("src/aion/query.gleam", 91).
-spec register_config() -> binary().
register_config() ->
    gleam@json:to_string(gleam@json:object([])).

-file("src/aion/query.gleam", 33).
?DOC(
    " Register a typed, read-only query handler under `name`.\n"
    "\n"
    " The `Codec(a)` given here fixes the handler's return type. When the engine\n"
    " dispatches a query with this name, the SDK calls `reply`, encodes the value\n"
    " it returns with the codec, and answers on the engine-provided one-shot\n"
    " reply channel. A query return value never crosses the FFI boundary\n"
    " without passing through a codec.\n"
    "\n"
    " Registration happens in two steps: the encoding handler is stored in the\n"
    " workflow process dictionary (the engine-contract location read by the\n"
    " yield-point query pump), and then the name is registered with the engine.\n"
    " Register before the first yield point that should answer the query —\n"
    " `workflow.sleep`, `workflow.receive`, `workflow.run`, `child.await`,\n"
    " `workflow.all`/`workflow.race` — because awaits reached earlier cannot\n"
    " service it. Workflow code re-executes from the top on replay, so\n"
    " re-registration after recovery is automatic and a recovered workflow\n"
    " answers queries without any extra author code.\n"
    "\n"
    " Queries bind to AT's read-only query service: they append no workflow\n"
    " `Event`, are answered at engine yield points, and never block workflow\n"
    " progress. The callback's type only lets it return a value; by\n"
    " workflow-author convention it must also not call activity-dispatch\n"
    " primitives such as `workflow.run` or otherwise mutate workflow state. The\n"
    " engine refuses recording calls made while a query is being serviced and\n"
    " surfaces them as a typed handler failure.\n"
).
-spec handler(binary(), aion@codec:codec(EEC), fun(() -> EEC)) -> {ok, nil} |
    {error, aion@error:query_error()}.
handler(Name, Value_codec, Reply) ->
    %% Invoked per query: look up the function stored at position 2 of the
    %% codec tuple, apply it to the freshly produced reply value, and answer
    %% under the supplied query id.
    Answer = fun(Query_id) ->
        Encode = erlang:element(2, Value_codec),
        aion_flow_ffi:reply_query(Query_id, Encode(Reply()))
    end,
    %% The pump registration is performed before the engine registration.
    aion_flow_query_pump:register(Name, Answer),
    Registration = aion_flow_ffi:register_query(Name, register_config()),
    case Registration of
        {error, Raw_error} ->
            {error, query_error(Raw_error)};

        {ok, _} ->
            {ok, nil}
    end.

%% Configuration payload sent with a query dispatch: an empty JSON object
%% serialised through gleam@json:to_string/1.
-file("src/aion/query.gleam", 95).
-spec dispatch_config() -> binary().
dispatch_config() ->
    gleam@json:to_string(gleam@json:object([])).

-file("src/aion/query.gleam", 57).
?DOC(
    " Dispatch a typed query through the in-engine/client-side binding.\n"
    "\n"
    " Intended for callers that are already inside the engine boundary and for\n"
    " the pure Gleam test harness. The named handler is requested from AT's\n"
    " query service and its encoded reply is decoded with the supplied codec.\n"
    "\n"
    " An unregistered name yields `Error(UnknownQuery(name))`; a reply the codec\n"
    " cannot decode yields `Error(QueryDecodeFailed(_))`. Query dispatch records\n"
    " no workflow event.\n"
).
-spec dispatch(binary(), aion@codec:codec(EEG)) -> {ok, EEG} |
    {error, aion@error:query_error()}.
dispatch(Name, Value_codec) ->
    case aion_flow_ffi:dispatch_query(Name, dispatch_config()) of
        {error, Raw_error} ->
            {error, query_error(Raw_error)};

        {ok, Raw_payload} ->
            %% Position 3 of the codec tuple holds the function applied to
            %% the raw reply payload.
            Decode = erlang:element(3, Value_codec),
            case Decode(Raw_payload) of
                {error, Reason} ->
                    {error, {query_decode_failed, Reason}};

                {ok, _} = Decoded ->
                    Decoded
            end
    end.

-file("src/aion/query.gleam", 78).
?DOC(
    " Fetch the observation count reported by the backing query service.\n"
    "\n"
    " A production engine may expose this value as diagnostic data. The shipped\n"
    " test double uses it to assert that dispatching a query appends neither\n"
    " recorded observations nor history events.\n"
).
-spec recorded_observations() -> {ok, integer()} |
    {error, aion@error:query_error()}.
recorded_observations() ->
    case aion_flow_ffi:query_recorded_observations() of
        {error, Raw_error} ->
            {error, query_error(Raw_error)};

        {ok, Raw_count} ->
            %% The count arrives in raw form and must parse as an integer;
            %% a parse failure is reported as an engine failure.
            Parsed = gleam_stdlib:parse_int(Raw_count),
            case Parsed of
                {ok, _} = Counted ->
                    Counted;

                {error, _} ->
                    {error,
                        {query_engine_failure,
                            <<"invalid query observation count"/utf8>>}}
            end
    end.