-module(lightspeed@async@backpressure).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/async/backpressure.gleam").
-export([boundary/4, default_boundary/0, new/1, valid/1, enqueue/2, start_available/2, state_label/1, succeed/2, fail/3, cancel/3, retry/2, state/2, runtime_boundary/1, in_flight_count/1, queued_count/1, mode_label/1, error_label/1, tasks/1, signature/1]).
-export_type([adapter_mode/0, boundary/0, task_state/0, runtime_error/0, task/0, runtime/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Async runtime boundaries for cancellation and backpressure-aware adapters.\n").
%% Adapter delivery mode; exactly one of the three atoms below.
-type adapter_mode() :: push_pull | push_only | pull_only.
%% Boundary profile, laid out as
%% {boundary, Mode, MaxInFlight, MaxQueued, CancellationBudgetMs}
%% (the argument order of boundary/4).
-type boundary() :: {boundary, adapter_mode(), integer(), integer(), integer()}.
%% Lifecycle state of a single task. The first integer of every variant is the
%% sequence number assigned by enqueue/2. in_flight also carries the timestamp
%% passed to start_available/2; failed and cancelled carry the reason binary
%% supplied by the caller.
-type task_state() :: {queued, integer()} |
{in_flight, integer(), integer()} |
{succeeded, integer()} |
{failed, integer(), binary()} |
{cancelled, integer(), binary()}.
%% Errors reported by the runtime operations:
%%   {duplicate_task, Key}                     - Key is already tracked.
%%   {queue_saturated, Key, MaxQueued}         - queue limit reached on enqueue.
%%   {unknown_task, Key}                       - no task with that key.
%%   {invalid_transition, Key, State, Action}  - Action is not allowed from the
%%                                               task's current state label.
-type runtime_error() :: {duplicate_task, binary()} |
{queue_saturated, binary(), integer()} |
{unknown_task, binary()} |
{invalid_transition, binary(), binary(), binary()}.
%% One tracked task: {task, Key, State}.
-type task() :: {task, binary(), task_state()}.
%% Runtime state, laid out as
%% {runtime, Boundary, NextSeq, Tasks, InFlightCount, QueuedCount}.
%% NextSeq starts at 1 and is bumped on every successful enqueue; new tasks are
%% prepended to Tasks; the two counters are maintained incrementally by
%% enqueue/2, the start loop and the state transitions.
-opaque runtime() :: {runtime,
boundary(),
integer(),
list(task()),
integer(),
integer()}.
-file("src/lightspeed/async/backpressure.gleam", 57).
?DOC(" Build a boundary profile.\n").
%% Packs the four settings into a boundary tuple, in argument order.
%% Nothing is validated here; valid/1 performs the range checks.
-spec boundary(adapter_mode(), integer(), integer(), integer()) -> boundary().
boundary(Mode, MaxInFlight, MaxQueued, CancellationBudgetMs) ->
    Profile = {boundary, Mode, MaxInFlight, MaxQueued, CancellationBudgetMs},
    Profile.
-file("src/lightspeed/async/backpressure.gleam", 72).
?DOC(" Default boundary profile for mixed push/pull adapters.\n").
%% Returns the stock profile: push_pull mode, 3 in-flight tasks, 6 queued
%% tasks and a 250 ms cancellation budget.
-spec default_boundary() -> boundary().
default_boundary() ->
    MaxInFlight = 3,
    MaxQueued = 6,
    CancellationBudgetMs = 250,
    {boundary, push_pull, MaxInFlight, MaxQueued, CancellationBudgetMs}.
-file("src/lightspeed/async/backpressure.gleam", 82).
?DOC(" Build a runtime.\n").
%% Creates an empty runtime for Boundary: the next sequence number is 1, no
%% tasks are tracked, and both the in-flight and queued counters are zero.
-spec new(boundary()) -> runtime().
new(Boundary) ->
    FirstSeq = 1,
    NoTasks = [],
    {runtime, Boundary, FirstSeq, NoTasks, 0, 0}.
-file("src/lightspeed/async/backpressure.gleam", 93).
?DOC(" Validate boundary settings.\n").
%% Checks the boundary carried by the runtime: the in-flight limit must be
%% positive, the queue limit non-negative, and the cancellation budget
%% positive. The checks short-circuit in that order.
-spec valid(runtime()) -> boolean().
valid({runtime, {boundary, _Mode, MaxInFlight, MaxQueued, BudgetMs}, _, _, _, _}) ->
    MaxInFlight > 0 andalso MaxQueued >= 0 andalso BudgetMs > 0.
-file("src/lightspeed/async/backpressure.gleam", 427).
%% True when any task in the list carries exactly the given key.
-spec has_task(list(task()), binary()) -> boolean().
has_task(Tasks_rev, Key) ->
    SameKey = fun(Task) -> erlang:element(2, Task) =:= Key end,
    lists:any(SameKey, Tasks_rev).
-file("src/lightspeed/async/backpressure.gleam", 100).
?DOC(" Enqueue one task with backpressure checks.\n").
%% Adds a task in the queued state under Key.
%%
%% Returns {Runtime, {error, {duplicate_task, Key}}} when the key is already
%% tracked (in any state), and {Runtime, {error, {queue_saturated, Key, Max}}}
%% when the queued counter has reached the boundary's queue limit. The
%% duplicate check wins when both apply. On success the task receives the
%% runtime's next sequence number, is prepended to the task list, and the
%% queued counter grows by one.
-spec enqueue(runtime(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
enqueue({runtime, Boundary, NextSeq, TasksRev, InFlight, Queued} = Runtime, Key) ->
    MaxQueued = erlang:element(4, Boundary),
    case {has_task(TasksRev, Key), Queued >= MaxQueued} of
        {true, _} ->
            {Runtime, {error, {duplicate_task, Key}}};
        {false, true} ->
            {Runtime, {error, {queue_saturated, Key, MaxQueued}}};
        {false, false} ->
            Entry = {task, Key, {queued, NextSeq}},
            Grown = {runtime, Boundary, NextSeq + 1, [Entry | TasksRev], InFlight, Queued + 1},
            {Grown, {ok, nil}}
    end.
-file("src/lightspeed/async/backpressure.gleam", 343).
%% Concatenates two task lists, keeping the element order of both.
%% Delegates to the built-in ++ operator rather than re-implementing list
%% concatenation with body recursion.
-spec append(list(task()), list(task())) -> list(task()).
append(Left, Right) ->
    Left ++ Right.
-file("src/lightspeed/async/backpressure.gleam", 339).
%% Puts Value in front of Values (argument order is list first, element
%% second, so it can be used in a pipeline).
-spec prepend(list(task()), task()) -> list(task()).
prepend(Tail, Head) ->
    [Head | Tail].
-file("src/lightspeed/async/backpressure.gleam", 314).
%% Finds the first queued task, scanning from the head of the list, and marks
%% it in-flight with start time Now_ms.
%%
%% Returns none when no task is queued, otherwise
%% {some, {Seq, Key, UpdatedTasks}}. The started task keeps its position in
%% UpdatedTasks: the already-scanned prefix (accumulated in reverse in
%% Seen_rev) is restored in front of it and the untouched remainder follows.
%% Keeping the position matches update_task/4 and keeps the list order that
%% tasks/1 relies on; the earlier version moved the started task to the head
%% of the list, which reordered the task list after every start.
%%
%% NOTE(review): enqueue/2 prepends, so scanning from the head starts the most
%% recently enqueued queued task first - confirm that this start order is
%% intended.
-spec pop_next_queued(list(task()), integer(), list(task())) -> gleam@option:option({integer(),
    binary(),
    list(task())}).
pop_next_queued([], _Now_ms, _Seen_rev) ->
    none;
pop_next_queued([{task, Key, {queued, Seq}} | Rest], Now_ms, Seen_rev) ->
    Started = {task, Key, {in_flight, Seq, Now_ms}},
    %% lists:reverse/2 un-reverses the scanned prefix onto [Started | Rest].
    {some, {Seq, Key, lists:reverse(Seen_rev, [Started | Rest])}};
pop_next_queued([Task | Rest], Now_ms, Seen_rev) ->
    pop_next_queued(Rest, Now_ms, [Task | Seen_rev]).
-file("src/lightspeed/async/backpressure.gleam", 287).
%% Repeatedly promotes queued tasks to in-flight until either the in-flight
%% counter reaches the boundary's in-flight limit or no queued task is left.
%%
%% Each promotion bumps the in-flight counter, decrements the queued counter
%% and records a "Key:Seq" label. Returns the final runtime together with the
%% labels in the order the tasks were started.
-spec start_loop(runtime(), integer(), list(binary())) -> {runtime(),
    list(binary())}.
start_loop(Runtime, Now_ms, Started_rev) ->
    {runtime, Boundary, NextSeq, TasksRev, InFlight, Queued} = Runtime,
    Candidate =
        case InFlight < erlang:element(3, Boundary) of
            true -> pop_next_queued(TasksRev, Now_ms, []);
            %% In-flight limit reached: treat as "nothing to start".
            false -> none
        end,
    case Candidate of
        none ->
            {Runtime, lists:reverse(Started_rev)};
        {some, {Seq, Key, UpdatedRev}} ->
            Label = <<Key/binary, ":"/utf8, (erlang:integer_to_binary(Seq))/binary>>,
            Advanced = {runtime, Boundary, NextSeq, UpdatedRev, InFlight + 1, Queued - 1},
            start_loop(Advanced, Now_ms, [Label | Started_rev])
    end.
-file("src/lightspeed/async/backpressure.gleam", 133).
?DOC(" Start queued tasks up to in-flight limit.\n").
%% Public entry point of the start loop: begins with an empty list of started
%% labels and returns {UpdatedRuntime, StartedLabels}, where every label has
%% the form "Key:Seq".
-spec start_available(runtime(), integer()) -> {runtime(), list(binary())}.
start_available(Runtime, Now_ms) ->
    NothingStartedYet = [],
    start_loop(Runtime, Now_ms, NothingStartedYet).
-file("src/lightspeed/async/backpressure.gleam", 246).
?DOC(" Stable task-state label.\n").
%% Maps a task state to its fixed textual label; the payload of the state
%% (sequence number, timestamp, reason) is ignored.
-spec state_label(task_state()) -> binary().
state_label({queued, _Seq}) -> <<"queued"/utf8>>;
state_label({in_flight, _Seq, _StartedMs}) -> <<"in_flight"/utf8>>;
state_label({succeeded, _Seq}) -> <<"succeeded"/utf8>>;
state_label({failed, _Seq, _Reason}) -> <<"failed"/utf8>>;
state_label({cancelled, _Seq, _Reason}) -> <<"cancelled"/utf8>>.
-file("src/lightspeed/async/backpressure.gleam", 388).
%% Contribution of one state to the in-flight counter: 1 for in_flight,
%% 0 for anything else.
-spec in_flight_count_of(task_state()) -> integer().
in_flight_count_of({in_flight, _Seq, _StartedMs}) -> 1;
in_flight_count_of(_Other) -> 0.
-file("src/lightspeed/async/backpressure.gleam", 377).
%% Change to apply to the in-flight counter when a task moves from state
%% From to state To: -1, 0 or 1.
-spec in_flight_delta(task_state(), task_state()) -> integer().
in_flight_delta(From, To) ->
    ToCount = in_flight_count_of(To),
    FromCount = in_flight_count_of(From),
    ToCount - FromCount.
-file("src/lightspeed/async/backpressure.gleam", 381).
%% Contribution of one state to the queued counter: 1 for queued, 0 for
%% anything else.
-spec queued_count_of(task_state()) -> integer().
queued_count_of({queued, _Seq}) -> 1;
queued_count_of(_Other) -> 0.
-file("src/lightspeed/async/backpressure.gleam", 373).
%% Change to apply to the queued counter when a task moves from state From
%% to state To: -1, 0 or 1.
-spec queued_delta(task_state(), task_state()) -> integer().
queued_delta(From, To) ->
    ToCount = queued_count_of(To),
    FromCount = queued_count_of(From),
    ToCount - FromCount.
-file("src/lightspeed/async/backpressure.gleam", 420).
%% Reverses Left and places the result in front of Right, e.g.
%% reverse_into([A, B], [C]) yields [B, A, C]. This is exactly what the
%% standard lists:reverse/2 does, so the work is delegated to it instead of
%% keeping a hand-written loop.
-spec reverse_into(list(task()), list(task())) -> list(task()).
reverse_into(Left, Right) ->
    lists:reverse(Left, Right).
-file("src/lightspeed/async/backpressure.gleam", 395).
%% Applies With to the state of the first task whose key equals Key.
%%
%% Seen_rev accumulates the tasks already skipped (in reverse) so the list can
%% be rebuilt with the updated task in its original position.
%% Returns:
%%   {ok, {UpdatedTasks, OldState, NewState}} when With accepts the change,
%%   the {error, _} produced by With when it rejects the change,
%%   {error, {unknown_task, Key}} when no task has that key.
-spec update_task(
    list(task()),
    binary(),
    fun((task_state()) -> {ok, task_state()} | {error, runtime_error()}),
    list(task())
) -> {ok, {list(task()), task_state(), task_state()}} | {error, runtime_error()}.
update_task([], Key, _With, _Seen_rev) ->
    {error, {unknown_task, Key}};
update_task([{task, Key, Current} | Rest], Key, With, Seen_rev) ->
    case With(Current) of
        {ok, Next} ->
            Rebuilt = reverse_into(Seen_rev, [{task, Key, Next} | Rest]),
            {ok, {Rebuilt, Current, Next}};
        {error, _} = Rejected ->
            Rejected
    end;
update_task([Other | Rest], Key, With, Seen_rev) ->
    update_task(Rest, Key, With, [Other | Seen_rev]).
-file("src/lightspeed/async/backpressure.gleam", 350).
%% Shared driver for succeed/fail/cancel/retry.
%%
%% Runs With against the task stored under Key and, when it succeeds, stores
%% the new state and adjusts the in-flight and queued counters by the
%% difference between the old and new state. On any error the runtime is
%% returned unchanged next to the error. The third argument (the action
%% label) is accepted but not used here.
-spec transition(
    runtime(),
    binary(),
    binary(),
    fun((task_state()) -> {ok, task_state()} | {error, runtime_error()})
) -> {runtime(), {ok, nil} | {error, runtime_error()}}.
transition(Runtime, Key, _Action, With) ->
    {runtime, Boundary, NextSeq, TasksRev, InFlight, Queued} = Runtime,
    case update_task(TasksRev, Key, With, []) of
        {ok, {UpdatedRev, Prev, Next}} ->
            Moved = {runtime,
                Boundary,
                NextSeq,
                UpdatedRev,
                InFlight + in_flight_delta(Prev, Next),
                Queued + queued_delta(Prev, Next)},
            {Moved, {ok, nil}};
        {error, _} = Rejected ->
            {Runtime, Rejected}
    end.
-file("src/lightspeed/async/backpressure.gleam", 141).
?DOC(" Mark one in-flight task as succeeded.\n").
%% Moves the task under Key from in_flight to succeeded, keeping its sequence
%% number. Any other current state yields
%% {error, {invalid_transition, Key, StateLabel, <<"succeed">>}}; an unknown
%% key yields {error, {unknown_task, Key}}.
-spec succeed(runtime(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
succeed(Runtime, Key) ->
    Action = <<"succeed"/utf8>>,
    Step = fun
        ({in_flight, Seq, _StartedMs}) ->
            {ok, {succeeded, Seq}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).
-file("src/lightspeed/async/backpressure.gleam", 159).
?DOC(" Mark one in-flight task as failed.\n").
%% Moves the task under Key from in_flight to failed, recording Reason and
%% keeping its sequence number. Any other current state yields
%% {error, {invalid_transition, Key, StateLabel, <<"fail">>}}; an unknown key
%% yields {error, {unknown_task, Key}}.
-spec fail(runtime(), binary(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
fail(Runtime, Key, Reason) ->
    Action = <<"fail"/utf8>>,
    Step = fun
        ({in_flight, Seq, _StartedMs}) ->
            {ok, {failed, Seq, Reason}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).
-file("src/lightspeed/async/backpressure.gleam", 178).
?DOC(" Cancel one queued or in-flight task.\n").
%% Moves the task under Key to cancelled, recording Reason and keeping its
%% sequence number. Allowed from queued and from in_flight; any other current
%% state yields {error, {invalid_transition, Key, StateLabel, <<"cancel">>}},
%% and an unknown key yields {error, {unknown_task, Key}}.
-spec cancel(runtime(), binary(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
cancel(Runtime, Key, Reason) ->
    Action = <<"cancel"/utf8>>,
    Step = fun
        ({queued, Seq}) ->
            {ok, {cancelled, Seq, Reason}};
        ({in_flight, Seq, _StartedMs}) ->
            {ok, {cancelled, Seq, Reason}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).
-file("src/lightspeed/async/backpressure.gleam", 198).
?DOC(" Retry a failed/cancelled task by putting it back in queue.\n").
%% Puts a failed or cancelled task back into the queued state, reusing its
%% original sequence number (the stored reason is dropped). Any other current
%% state yields {error, {invalid_transition, Key, StateLabel, <<"retry">>}};
%% an unknown key yields {error, {unknown_task, Key}}.
%%
%% NOTE(review): unlike enqueue/2, this path does not compare the queued
%% counter with the boundary's queue limit, so retries can push the queued
%% count past that limit - confirm whether that is intended.
-spec retry(runtime(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
retry(Runtime, Key) ->
    Action = <<"retry"/utf8>>,
    Step = fun
        ({failed, Seq, _Reason}) ->
            {ok, {queued, Seq}};
        ({cancelled, Seq, _Reason}) ->
            {ok, {queued, Seq}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).
-file("src/lightspeed/async/backpressure.gleam", 438).
%% Looks up the state of the first task whose key equals Key.
%% Returns {some, State} on a hit and none when no task matches.
-spec find_state(list(task()), binary()) -> gleam@option:option(task_state()).
find_state([], _Key) ->
    none;
find_state([{task, Key, State} | _Rest], Key) ->
    {some, State};
find_state([_Other | Rest], Key) ->
    find_state(Rest, Key).
-file("src/lightspeed/async/backpressure.gleam", 217).
?DOC(" Lookup task state by key.\n").
%% Returns {some, State} for the task stored under Key in this runtime, or
%% none when the key is not tracked.
-spec state(runtime(), binary()) -> gleam@option:option(task_state()).
state({runtime, _Boundary, _NextSeq, TasksRev, _InFlight, _Queued}, Key) ->
    find_state(TasksRev, Key).
-file("src/lightspeed/async/backpressure.gleam", 222).
?DOC(" Boundary accessor.\n").
%% Returns the boundary profile the runtime was created with.
-spec runtime_boundary(runtime()) -> boundary().
runtime_boundary({runtime, Boundary, _NextSeq, _Tasks, _InFlight, _Queued}) ->
    Boundary.
-file("src/lightspeed/async/backpressure.gleam", 227).
?DOC(" In-flight task count.\n").
%% Returns the runtime's in-flight counter (the fifth tuple field).
-spec in_flight_count(runtime()) -> integer().
in_flight_count({runtime, _Boundary, _NextSeq, _Tasks, InFlight, _Queued}) ->
    InFlight.
-file("src/lightspeed/async/backpressure.gleam", 232).
?DOC(" Queued task count.\n").
%% Returns the runtime's queued counter (the sixth tuple field).
-spec queued_count(runtime()) -> integer().
queued_count({runtime, _Boundary, _NextSeq, _Tasks, _InFlight, Queued}) ->
    Queued.
-file("src/lightspeed/async/backpressure.gleam", 237).
?DOC(" Stable adapter mode label.\n").
%% Maps an adapter mode atom to its fixed textual label.
-spec mode_label(adapter_mode()) -> binary().
mode_label(push_pull) -> <<"push_pull"/utf8>>;
mode_label(push_only) -> <<"push_only"/utf8>>;
mode_label(pull_only) -> <<"pull_only"/utf8>>.
-file("src/lightspeed/async/backpressure.gleam", 257).
?DOC(" Stable runtime-error label.\n").
%% Renders a runtime error as a colon-separated label: the error tag first,
%% followed by its fields in tuple order (integers in decimal).
-spec error_label(runtime_error()) -> binary().
error_label({duplicate_task, Key}) ->
    <<"duplicate_task:"/utf8, Key/binary>>;
error_label({queue_saturated, Key, MaxQueued}) ->
    Limit = erlang:integer_to_binary(MaxQueued),
    <<"queue_saturated:"/utf8, Key/binary, ":"/utf8, Limit/binary>>;
error_label({unknown_task, Key}) ->
    <<"unknown_task:"/utf8, Key/binary>>;
error_label({invalid_transition, Key, State, Action}) ->
    <<"invalid_transition:"/utf8, Key/binary, ":"/utf8, State/binary, ":"/utf8, Action/binary>>.
-file("src/lightspeed/async/backpressure.gleam", 449).
%% Renders one task state as a colon-separated label: the state name, the
%% sequence number in decimal, then the start timestamp (in_flight) or the
%% reason (failed, cancelled) where the state has one.
-spec task_signature(task_state()) -> binary().
task_signature({queued, Seq}) ->
    <<"queued:"/utf8, (erlang:integer_to_binary(Seq))/binary>>;
task_signature({in_flight, Seq, StartedMs}) ->
    SeqText = erlang:integer_to_binary(Seq),
    StartedText = erlang:integer_to_binary(StartedMs),
    <<"in_flight:"/utf8, SeqText/binary, ":"/utf8, StartedText/binary>>;
task_signature({succeeded, Seq}) ->
    <<"succeeded:"/utf8, (erlang:integer_to_binary(Seq))/binary>>;
task_signature({failed, Seq, Reason}) ->
    SeqText = erlang:integer_to_binary(Seq),
    <<"failed:"/utf8, SeqText/binary, ":"/utf8, Reason/binary>>;
task_signature({cancelled, Seq, Reason}) ->
    SeqText = erlang:integer_to_binary(Seq),
    <<"cancelled:"/utf8, SeqText/binary, ":"/utf8, Reason/binary>>.
-file("src/lightspeed/async/backpressure.gleam", 281).
?DOC(" Tasks in stable sequence order.\n").
%% Returns the state of every tracked task, walking the stored task list from
%% its last element to its first (the list is kept with new tasks prepended).
-spec tasks(runtime()) -> list(task_state()).
tasks(Runtime) ->
    OldestFirst = lists:reverse(erlang:element(4, Runtime)),
    [erlang:element(3, Task) || Task <- OldestFirst].
-file("src/lightspeed/async/backpressure.gleam", 461).
%% Joins Values into one binary with Separator between neighbouring elements.
%% An empty list gives the empty binary; a single element is returned without
%% any separator.
%%
%% The pieces are interleaved with lists:join/2 and flattened once by
%% erlang:iolist_to_binary/1, instead of building a fresh binary at every
%% recursion level (which re-copied the already joined tail each time).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).
-file("src/lightspeed/async/backpressure.gleam", 269).
?DOC(" Stable runtime signature for deterministic fixtures.\n").
%% Renders the whole runtime as one line of the form
%%   mode=<mode>|in_flight=<n>|queued=<n>|tasks=<sig>,<sig>,...
%% where the mode comes from the boundary, the two numbers are the runtime's
%% counters, and each <sig> is the task_signature/1 of a state returned by
%% tasks/1, in that order.
-spec signature(runtime()) -> binary().
signature(Runtime) ->
    Mode = mode_label(erlang:element(2, erlang:element(2, Runtime))),
    InFlight = erlang:integer_to_binary(erlang:element(5, Runtime)),
    Queued = erlang:integer_to_binary(erlang:element(6, Runtime)),
    TaskPart = join_with(
        <<","/utf8>>,
        [task_signature(State) || State <- tasks(Runtime)]
    ),
    <<"mode="/utf8, Mode/binary,
        "|in_flight="/utf8, InFlight/binary,
        "|queued="/utf8, Queued/binary,
        "|tasks="/utf8, TaskPart/binary>>.