%% src/lightspeed@async@backpressure.erl

%% Module header for the async backpressure runtime.
%% NOTE(review): the FILEPATH macro and the -file directives further down all
%% point at src/lightspeed/async/backpressure.gleam, so this file appears to be
%% Gleam compiler output — confirm before editing it by hand.
-module(lightspeed@async@backpressure).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
%% Presumably the path of the originating Gleam source file.
-define(FILEPATH, "src/lightspeed/async/backpressure.gleam").
%% Public functions.
-export([boundary/4, default_boundary/0, new/1, valid/1, enqueue/2, start_available/2, state_label/1, succeed/2, fail/3, cancel/3, retry/2, state/2, runtime_boundary/1, in_flight_count/1, queued_count/1, mode_label/1, error_label/1, tasks/1, signature/1]).
%% Public types.
-export_type([adapter_mode/0, boundary/0, task_state/0, runtime_error/0, task/0, runtime/0]).

%% Documentation shims. When ?OTP_RELEASE is 27 or higher, ?MODULEDOC/?DOC
%% expand to real -moduledoc/-doc attributes; otherwise they expand to an
%% empty -compile([]) attribute, so the call sites stay valid forms either way.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

%% Module-level documentation string.
?MODULEDOC(" Async runtime boundaries for cancellation and backpressure-aware adapters.\n").

%% Adapter delivery mode; rendered to text by mode_label/1.
-type adapter_mode() :: push_pull | push_only | pull_only.

%% {boundary, Mode, MaxInFlight, MaxQueued, CancellationBudgetMs}, in the
%% argument order used by boundary/4.
-type boundary() :: {boundary, adapter_mode(), integer(), integer(), integer()}.

%% Lifecycle state of one task. The first integer of every variant is the
%% sequence number assigned by enqueue/2. in_flight additionally carries the
%% Now_ms value passed when the task was started; failed and cancelled carry
%% the reason binary supplied by the caller.
-type task_state() :: {queued, integer()} |
    {in_flight, integer(), integer()} |
    {succeeded, integer()} |
    {failed, integer(), binary()} |
    {cancelled, integer(), binary()}.

%% Errors returned by the runtime operations:
%%   {duplicate_task, Key}                       - key is already tracked
%%   {queue_saturated, Key, MaxQueued}           - queued count reached the limit
%%   {unknown_task, Key}                         - no task with that key
%%   {invalid_transition, Key, StateLabel, Action} - action not allowed in that state
-type runtime_error() :: {duplicate_task, binary()} |
    {queue_saturated, binary(), integer()} |
    {unknown_task, binary()} |
    {invalid_transition, binary(), binary(), binary()}.

%% {task, Key, State}.
-type task() :: {task, binary(), task_state()}.

%% {runtime, Boundary, NextSeq, Tasks, InFlightCount, QueuedCount}.
%% enqueue/2 prepends to Tasks, so the list is kept newest-first.
-opaque runtime() :: {runtime,
        boundary(),
        integer(),
        list(task()),
        integer(),
        integer()}.

%% Packs the four settings into a boundary tuple, in argument order: adapter
%% mode, in-flight limit, queue limit, cancellation budget in milliseconds.
%% No range checking is done here.
-file("src/lightspeed/async/backpressure.gleam", 57).
?DOC(" Build a boundary profile.\n").
-spec boundary(adapter_mode(), integer(), integer(), integer()) -> boundary().
boundary(AdapterMode, InFlightLimit, QueueLimit, CancelBudgetMs) ->
    Profile = {boundary, AdapterMode, InFlightLimit, QueueLimit, CancelBudgetMs},
    Profile.

%% Returns the stock boundary: push_pull mode, at most 3 tasks in flight,
%% at most 6 queued, and a cancellation budget of 250 ms.
-file("src/lightspeed/async/backpressure.gleam", 72).
?DOC(" Default boundary profile for mixed push/pull adapters.\n").
-spec default_boundary() -> boundary().
default_boundary() ->
    AdapterMode = push_pull,
    InFlightLimit = 3,
    QueueLimit = 6,
    CancelBudgetMs = 250,
    {boundary, AdapterMode, InFlightLimit, QueueLimit, CancelBudgetMs}.

%% Creates an empty runtime around Boundary: the next sequence number starts
%% at 1, the task list is empty, and both counters (in-flight, queued) are 0.
-file("src/lightspeed/async/backpressure.gleam", 82).
?DOC(" Build a runtime.\n").
-spec new(boundary()) -> runtime().
new(Boundary) ->
    FirstSeq = 1,
    NoTasks = [],
    {runtime, Boundary, FirstSeq, NoTasks, 0, 0}.

%% Checks the runtime's boundary settings: the in-flight limit must be
%% positive, the queue limit must be non-negative, and the cancellation
%% budget must be positive. Returns true only when all three hold.
-file("src/lightspeed/async/backpressure.gleam", 93).
?DOC(" Validate boundary settings.\n").
-spec valid(runtime()) -> boolean().
valid({runtime, {boundary, _Mode, InFlightLimit, QueueLimit, CancelBudgetMs}, _, _, _, _}) ->
    InFlightLimit > 0 andalso QueueLimit >= 0 andalso CancelBudgetMs > 0.

%% Reports whether any task in the list carries exactly the given key.
%% The repeated Key variable in the second clause makes the match exact.
-file("src/lightspeed/async/backpressure.gleam", 427).
-spec has_task(list(task()), binary()) -> boolean().
has_task([], _Key) ->
    false;
has_task([{task, Key, _State} | _Others], Key) ->
    true;
has_task([_Different | Others], Key) ->
    has_task(Others, Key).

%% Adds a new task in the queued state.
%%
%% Returns {Runtime, Result}:
%%   * {error, {duplicate_task, Key}} when the key is already tracked (in any
%%     state); the runtime is returned unchanged.
%%   * {error, {queue_saturated, Key, QueueLimit}} when the queued counter has
%%     reached the boundary's queue limit; the runtime is returned unchanged.
%%   * {ok, nil} otherwise; the task is prepended with the current sequence
%%     number, and both the sequence number and queued counter grow by one.
%% The duplicate check takes precedence over the saturation check.
-file("src/lightspeed/async/backpressure.gleam", 100).
?DOC(" Enqueue one task with backpressure checks.\n").
-spec enqueue(runtime(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
enqueue(Runtime, Key) ->
    {runtime, Boundary, Seq, Tasks_rev, In_flight, Queued} = Runtime,
    Queue_limit = erlang:element(4, Boundary),
    case {has_task(Tasks_rev, Key), Queued >= Queue_limit} of
        {true, _} ->
            {Runtime, {error, {duplicate_task, Key}}};

        {false, true} ->
            {Runtime, {error, {queue_saturated, Key, Queue_limit}}};

        {false, false} ->
            Entry = {task, Key, {queued, Seq}},
            Grown = {runtime, Boundary, Seq + 1, [Entry | Tasks_rev], In_flight, Queued + 1},
            {Grown, {ok, nil}}
    end.

%% Concatenates two task lists: every element of Left, in order, followed by
%% Right. Uses the built-in ++ operator instead of a hand-written,
%% non-tail-recursive loop.
-file("src/lightspeed/async/backpressure.gleam", 343).
-spec append(list(task()), list(task())) -> list(task()).
append(Left, Right) ->
    Left ++ Right.

%% Puts a single task at the front of a task list. The list comes first in
%% the argument order so the function can be used in a pipeline.
-file("src/lightspeed/async/backpressure.gleam", 339).
-spec prepend(list(task()), task()) -> list(task()).
prepend(Tail, Head) ->
    [Head | Tail].

%% Finds the first queued task when scanning Tasks_rev from its head and
%% switches it to {in_flight, Seq, Now_ms}.
%%
%% Returns none when no task is queued, otherwise
%% {some, {Seq, Key, UpdatedTasks}} where UpdatedTasks is the input list with
%% only that one entry replaced. Seen_rev accumulates the already-scanned
%% prefix in reverse; callers start with [].
%%
%% The updated task keeps its original position. The previous implementation
%% moved it in front of the scanned prefix, which scrambled the list order
%% that tasks/1 relies on and disagreed with how update_task/4 rebuilds the
%% list.
%%
%% NOTE(review): the list is newest-first (enqueue/2 prepends), so scanning
%% from the head starts the most recently enqueued task first. Confirm that
%% this start order is intended.
-file("src/lightspeed/async/backpressure.gleam", 314).
-spec pop_next_queued(list(task()), integer(), list(task())) -> gleam@option:option({integer(),
    binary(),
    list(task())}).
pop_next_queued([], _Now_ms, _Seen_rev) ->
    none;
pop_next_queued([{task, Key, {queued, Seq}} | Rest], Now_ms, Seen_rev) ->
    Started = {task, Key, {in_flight, Seq, Now_ms}},
    %% lists:reverse/2 restores the scanned prefix in its original order and
    %% places the updated task exactly where the queued one used to be.
    {some, {Seq, Key, lists:reverse(Seen_rev, [Started | Rest])}};
pop_next_queued([Task | Rest], Now_ms, Seen_rev) ->
    pop_next_queued(Rest, Now_ms, [Task | Seen_rev]).

%% Repeatedly starts queued tasks until either the in-flight counter reaches
%% the boundary's in-flight limit or no queued task is left.
%%
%% Each started task bumps the in-flight counter, lowers the queued counter
%% and contributes a "Key:Seq" label. Labels are accumulated in reverse and
%% returned in start order together with the final runtime.
-file("src/lightspeed/async/backpressure.gleam", 287).
-spec start_loop(runtime(), integer(), list(binary())) -> {runtime(),
    list(binary())}.
start_loop({runtime, {boundary, _, In_flight_limit, _, _}, _, _, In_flight, _} = Runtime,
           _Now_ms,
           Labels_rev) when In_flight >= In_flight_limit ->
    %% Limit reached: nothing more may start.
    {Runtime, lists:reverse(Labels_rev)};
start_loop({runtime, Boundary, Next_seq, Tasks_rev, In_flight, Queued} = Runtime,
           Now_ms,
           Labels_rev) ->
    case pop_next_queued(Tasks_rev, Now_ms, []) of
        {some, {Seq, Key, Updated_tasks}} ->
            Label = <<Key/binary, ":"/utf8, (erlang:integer_to_binary(Seq))/binary>>,
            Advanced = {runtime, Boundary, Next_seq, Updated_tasks, In_flight + 1, Queued - 1},
            start_loop(Advanced, Now_ms, [Label | Labels_rev]);

        none ->
            {Runtime, lists:reverse(Labels_rev)}
    end.

%% Public entry point for starting work: delegates to start_loop/3 with an
%% empty label accumulator. Returns the updated runtime and the "Key:Seq"
%% labels of the tasks that were started, stamped with Now_ms.
-file("src/lightspeed/async/backpressure.gleam", 133).
?DOC(" Start queued tasks up to in-flight limit.\n").
-spec start_available(runtime(), integer()) -> {runtime(), list(binary())}.
start_available(Runtime, Now_ms) ->
    Nothing_started_yet = [],
    start_loop(Runtime, Now_ms, Nothing_started_yet).

%% Maps a task state to its fixed textual name, ignoring the payload fields.
-file("src/lightspeed/async/backpressure.gleam", 246).
?DOC(" Stable task-state label.\n").
-spec state_label(task_state()) -> binary().
state_label({queued, _Seq}) ->
    <<"queued"/utf8>>;
state_label({in_flight, _Seq, _Started_ms}) ->
    <<"in_flight"/utf8>>;
state_label({succeeded, _Seq}) ->
    <<"succeeded"/utf8>>;
state_label({failed, _Seq, _Reason}) ->
    <<"failed"/utf8>>;
state_label({cancelled, _Seq, _Reason}) ->
    <<"cancelled"/utf8>>.

%% Contribution of one state to the in-flight counter: 1 for in_flight,
%% 0 for everything else.
-file("src/lightspeed/async/backpressure.gleam", 388).
-spec in_flight_count_of(task_state()) -> integer().
in_flight_count_of({in_flight, _Seq, _Started_ms}) ->
    1;
in_flight_count_of(_Other) ->
    0.

%% Change to apply to the in-flight counter when a task moves from state
%% From to state To: +1, 0 or -1.
-file("src/lightspeed/async/backpressure.gleam", 377).
-spec in_flight_delta(task_state(), task_state()) -> integer().
in_flight_delta(From, To) ->
    Before = in_flight_count_of(From),
    After = in_flight_count_of(To),
    After - Before.

%% Contribution of one state to the queued counter: 1 for queued,
%% 0 for everything else.
-file("src/lightspeed/async/backpressure.gleam", 381).
-spec queued_count_of(task_state()) -> integer().
queued_count_of({queued, _Seq}) ->
    1;
queued_count_of(_Other) ->
    0.

%% Change to apply to the queued counter when a task moves from state From
%% to state To: +1, 0 or -1.
-file("src/lightspeed/async/backpressure.gleam", 373).
-spec queued_delta(task_state(), task_state()) -> integer().
queued_delta(From, To) ->
    Before = queued_count_of(From),
    After = queued_count_of(To),
    After - Before.

%% Reverses Left and places the result in front of Right. This is exactly
%% what lists:reverse/2 does, so the standard-library function replaces the
%% hand-written loop.
-file("src/lightspeed/async/backpressure.gleam", 420).
-spec reverse_into(list(task()), list(task())) -> list(task()).
reverse_into(Left, Right) ->
    lists:reverse(Left, Right).

%% Locates the task with the given key and applies With to its current state.
%%
%% Returns:
%%   * {error, {unknown_task, Key}} when the list holds no such key;
%%   * whatever {error, _} With produced, unchanged;
%%   * {ok, {NewTasks, OldState, NewState}} when With returns {ok, NewState};
%%     NewTasks is the input list with that one entry replaced in place.
%% Seen_rev accumulates the already-scanned prefix in reverse; callers start
%% with [].
-file("src/lightspeed/async/backpressure.gleam", 395).
-spec update_task(
    list(task()),
    binary(),
    fun((task_state()) -> {ok, task_state()} | {error, runtime_error()}),
    list(task())
) -> {ok, {list(task()), task_state(), task_state()}} | {error, runtime_error()}.
update_task([], Key, _With, _Seen_rev) ->
    {error, {unknown_task, Key}};
update_task([{task, Key, Current} | Rest], Key, With, Seen_rev) ->
    case With(Current) of
        {ok, Replacement} ->
            Rebuilt = reverse_into(Seen_rev, [{task, Key, Replacement} | Rest]),
            {ok, {Rebuilt, Current, Replacement}};

        {error, _} = Rejected ->
            Rejected
    end;
update_task([Other | Rest], Key, With, Seen_rev) ->
    update_task(Rest, Key, With, [Other | Seen_rev]).

%% Shared driver for all state changes. Runs With on the task's state via
%% update_task/4 and, on success, adjusts the in-flight and queued counters
%% by the difference between the old and new state.
%%
%% On failure the original runtime is returned together with the error.
%% The third argument (the action label) is accepted but not used here.
-file("src/lightspeed/async/backpressure.gleam", 350).
-spec transition(
    runtime(),
    binary(),
    binary(),
    fun((task_state()) -> {ok, task_state()} | {error, runtime_error()})
) -> {runtime(), {ok, nil} | {error, runtime_error()}}.
transition(Runtime, Key, _Action, With) ->
    {runtime, Boundary, Next_seq, Tasks_rev, In_flight, Queued} = Runtime,
    case update_task(Tasks_rev, Key, With, []) of
        {ok, {Updated_tasks, Before, After}} ->
            Moved = {runtime,
                Boundary,
                Next_seq,
                Updated_tasks,
                In_flight + in_flight_delta(Before, After),
                Queued + queued_delta(Before, After)},
            {Moved, {ok, nil}};

        {error, _} = Rejected ->
            {Runtime, Rejected}
    end.

%% Moves an in-flight task to succeeded, keeping its sequence number.
%% Any other current state yields an invalid_transition error tagged with the
%% state's label and the action name "succeed".
-file("src/lightspeed/async/backpressure.gleam", 141).
?DOC(" Mark one in-flight task as succeeded.\n").
-spec succeed(runtime(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
succeed(Runtime, Key) ->
    Action = <<"succeed"/utf8>>,
    Step = fun
        ({in_flight, Seq, _Started_ms}) ->
            {ok, {succeeded, Seq}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).

%% Moves an in-flight task to failed, keeping its sequence number and
%% recording Reason. Any other current state yields an invalid_transition
%% error tagged with the state's label and the action name "fail".
-file("src/lightspeed/async/backpressure.gleam", 159).
?DOC(" Mark one in-flight task as failed.\n").
-spec fail(runtime(), binary(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
fail(Runtime, Key, Reason) ->
    Action = <<"fail"/utf8>>,
    Step = fun
        ({in_flight, Seq, _Started_ms}) ->
            {ok, {failed, Seq, Reason}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).

%% Moves a queued or in-flight task to cancelled, keeping its sequence number
%% and recording Reason. Any other current state yields an invalid_transition
%% error tagged with the state's label and the action name "cancel".
-file("src/lightspeed/async/backpressure.gleam", 178).
?DOC(" Cancel one queued or in-flight task.\n").
-spec cancel(runtime(), binary(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
cancel(Runtime, Key, Reason) ->
    Action = <<"cancel"/utf8>>,
    Step = fun
        ({queued, Seq}) ->
            {ok, {cancelled, Seq, Reason}};
        ({in_flight, Seq, _Started_ms}) ->
            {ok, {cancelled, Seq, Reason}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).

%% Puts a failed or cancelled task back into the queued state, reusing its
%% original sequence number. Any other current state yields an
%% invalid_transition error tagged with the state's label and the action
%% name "retry".
%%
%% NOTE(review): unlike enqueue/2, nothing on this path compares the queued
%% counter with the boundary's queue limit, so a retry can push the queued
%% count past that limit — confirm this is intended.
-file("src/lightspeed/async/backpressure.gleam", 198).
?DOC(" Retry a failed/cancelled task by putting it back in queue.\n").
-spec retry(runtime(), binary()) -> {runtime(),
    {ok, nil} | {error, runtime_error()}}.
retry(Runtime, Key) ->
    Action = <<"retry"/utf8>>,
    Step = fun
        ({failed, Seq, _Reason}) ->
            {ok, {queued, Seq}};
        ({cancelled, Seq, _Reason}) ->
            {ok, {queued, Seq}};
        (Other) ->
            {error, {invalid_transition, Key, state_label(Other), Action}}
    end,
    transition(Runtime, Key, Action, Step).

%% Returns {some, State} for the first task whose key matches exactly, or
%% none when the list holds no such key.
-file("src/lightspeed/async/backpressure.gleam", 438).
-spec find_state(list(task()), binary()) -> gleam@option:option(task_state()).
find_state([], _Key) ->
    none;
find_state([{task, Key, State} | _Others], Key) ->
    {some, State};
find_state([_Different | Others], Key) ->
    find_state(Others, Key).

%% Looks up the current state of the task with the given key in the
%% runtime's task list; {some, State} when present, none otherwise.
-file("src/lightspeed/async/backpressure.gleam", 217).
?DOC(" Lookup task state by key.\n").
-spec state(runtime(), binary()) -> gleam@option:option(task_state()).
state({runtime, _Boundary, _Next_seq, Tasks_rev, _In_flight, _Queued}, Key) ->
    find_state(Tasks_rev, Key).

%% Returns the boundary the runtime was created with.
-file("src/lightspeed/async/backpressure.gleam", 222).
?DOC(" Boundary accessor.\n").
-spec runtime_boundary(runtime()) -> boundary().
runtime_boundary({runtime, Boundary, _Next_seq, _Tasks_rev, _In_flight, _Queued}) ->
    Boundary.

%% Returns the runtime's in-flight counter (the fifth tuple field).
-file("src/lightspeed/async/backpressure.gleam", 227).
?DOC(" In-flight task count.\n").
-spec in_flight_count(runtime()) -> integer().
in_flight_count({runtime, _Boundary, _Next_seq, _Tasks_rev, In_flight, _Queued}) ->
    In_flight.

%% Returns the runtime's queued counter (the sixth tuple field).
-file("src/lightspeed/async/backpressure.gleam", 232).
?DOC(" Queued task count.\n").
-spec queued_count(runtime()) -> integer().
queued_count({runtime, _Boundary, _Next_seq, _Tasks_rev, _In_flight, Queued}) ->
    Queued.

%% Maps an adapter mode atom to its fixed textual name.
-file("src/lightspeed/async/backpressure.gleam", 237).
?DOC(" Stable adapter mode label.\n").
-spec mode_label(adapter_mode()) -> binary().
mode_label(push_pull) ->
    <<"push_pull"/utf8>>;
mode_label(push_only) ->
    <<"push_only"/utf8>>;
mode_label(pull_only) ->
    <<"pull_only"/utf8>>.

%% Renders a runtime error as a colon-separated label: the error name
%% followed by its fields in tuple order (the queue limit is printed in
%% decimal).
-file("src/lightspeed/async/backpressure.gleam", 257).
?DOC(" Stable runtime-error label.\n").
-spec error_label(runtime_error()) -> binary().
error_label({duplicate_task, Key}) ->
    <<"duplicate_task:"/utf8, Key/binary>>;
error_label({queue_saturated, Key, Queue_limit}) ->
    Limit_text = erlang:integer_to_binary(Queue_limit),
    <<"queue_saturated:"/utf8, Key/binary, ":"/utf8, Limit_text/binary>>;
error_label({unknown_task, Key}) ->
    <<"unknown_task:"/utf8, Key/binary>>;
error_label({invalid_transition, Key, State_name, Action}) ->
    <<"invalid_transition:"/utf8, Key/binary, ":"/utf8, State_name/binary, ":"/utf8, Action/binary>>.

%% Renders one task state as a colon-separated string: the state name, the
%% sequence number in decimal, and then the start timestamp (in_flight) or
%% the reason text (failed, cancelled) where the state carries one.
-file("src/lightspeed/async/backpressure.gleam", 449).
-spec task_signature(task_state()) -> binary().
task_signature({queued, Seq}) ->
    <<"queued:"/utf8, (erlang:integer_to_binary(Seq))/binary>>;
task_signature({in_flight, Seq, Started_ms}) ->
    Seq_text = erlang:integer_to_binary(Seq),
    Started_text = erlang:integer_to_binary(Started_ms),
    <<"in_flight:"/utf8, Seq_text/binary, ":"/utf8, Started_text/binary>>;
task_signature({succeeded, Seq}) ->
    <<"succeeded:"/utf8, (erlang:integer_to_binary(Seq))/binary>>;
task_signature({failed, Seq, Reason}) ->
    Seq_text = erlang:integer_to_binary(Seq),
    <<"failed:"/utf8, Seq_text/binary, ":"/utf8, Reason/binary>>;
task_signature({cancelled, Seq, Reason}) ->
    Seq_text = erlang:integer_to_binary(Seq),
    <<"cancelled:"/utf8, Seq_text/binary, ":"/utf8, Reason/binary>>.

%% Returns the state of every tracked task. The runtime's internal task list
%% is reversed first, then each task is reduced to its state (third field).
-file("src/lightspeed/async/backpressure.gleam", 281).
?DOC(" Tasks in stable sequence order.\n").
-spec tasks(runtime()) -> list(task_state()).
tasks(Runtime) ->
    Stored = erlang:element(4, Runtime),
    [erlang:element(3, Task) || Task <- lists:reverse(Stored)].

%% Joins binaries with Separator between consecutive elements.
%%   join_with(Sep, [])      -> <<>>
%%   join_with(Sep, [A])     -> A
%%   join_with(Sep, [A, B])  -> <<A/binary, Sep/binary, B/binary>>
%%
%% Built as an iolist via lists:join/2 and flattened once. The previous
%% hand-written recursion was not tail-recursive and re-copied the growing
%% suffix at every level.
-file("src/lightspeed/async/backpressure.gleam", 461).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).

%% Renders the whole runtime as one line of text:
%%   mode=<mode>|in_flight=<n>|queued=<n>|tasks=<sig>,<sig>,...
%% The task part is the comma-joined task_signature/1 of every state returned
%% by tasks/1, in that order.
-file("src/lightspeed/async/backpressure.gleam", 269).
?DOC(" Stable runtime signature for deterministic fixtures.\n").
-spec signature(runtime()) -> binary().
signature(Runtime) ->
    Mode_text = mode_label(erlang:element(2, erlang:element(2, Runtime))),
    In_flight_text = erlang:integer_to_binary(erlang:element(5, Runtime)),
    Queued_text = erlang:integer_to_binary(erlang:element(6, Runtime)),
    Task_text = join_with(
        <<","/utf8>>,
        [task_signature(State) || State <- tasks(Runtime)]
    ),
    <<"mode="/utf8, Mode_text/binary,
        "|in_flight="/utf8, In_flight_text/binary,
        "|queued="/utf8, Queued_text/binary,
        "|tasks="/utf8, Task_text/binary>>.