src/lightspeed@data_plane.erl

-module(lightspeed@data_plane).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/data_plane.gleam").
-export([new/3, row/2, request/2, valid/1, query_window/2, apply_updates/3, contains_full_root_churn/1, steady_state_incremental/1, dataset_label/1, query_error_label/1, window_signature/1, signature/1, target/1, dataset/1, rows/1]).
-export_type([dataset/0, row/0, window_request/0, window/0, update/0, query_error/0, plane/0]).

-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Large data-plane contracts with windowed queries and incremental updates.\n").

-type dataset() :: list_dataset |
    {grid_dataset, integer()} |
    {chart_dataset, binary()}.

-type row() :: {row, binary(), binary()}.

-type window_request() :: {window_request, integer(), integer()}.

-type window() :: {window, integer(), integer(), integer(), list(row())}.

-type update() :: {upsert, row()} | {delete, binary()}.

-type query_error() :: {invalid_window, integer(), integer()}.

-opaque plane() :: {plane, binary(), dataset(), list(row())}.

-file("src/lightspeed/data_plane.gleam", 46).
?DOC(" Build one large-data plane.\n").
-spec new(binary(), dataset(), list(row())) -> plane().
new(Target, Dataset, Rows) ->
    {plane, Target, Dataset, Rows}.

-file("src/lightspeed/data_plane.gleam", 51).
?DOC(" Build one row.\n").
-spec row(binary(), binary()) -> row().
row(Id, Payload) ->
    {row, Id, Payload}.

-file("src/lightspeed/data_plane.gleam", 56).
?DOC(" Build one window request.\n").
-spec request(integer(), integer()) -> window_request().
request(Offset, Limit) ->
    {window_request, Offset, Limit}.

-file("src/lightspeed/data_plane.gleam", 197).
-spec contains_string(list(binary()), binary()) -> boolean().
contains_string(Values, Target) ->
    case Values of
        [] ->
            false;

        [Value | Rest] ->
            case Value =:= Target of
                true ->
                    true;

                false ->
                    contains_string(Rest, Target)
            end
    end.

-file("src/lightspeed/data_plane.gleam", 186).
-spec unique_ids(list(row()), list(binary())) -> boolean().
unique_ids(Rows, Seen) ->
    case Rows of
        [] ->
            true;

        [Entry | Rest] ->
            case contains_string(Seen, erlang:element(2, Entry)) of
                true ->
                    false;

                false ->
                    unique_ids(Rest, [erlang:element(2, Entry) | Seen])
            end
    end.

-file("src/lightspeed/data_plane.gleam", 178).
-spec valid_dataset(dataset()) -> boolean().
valid_dataset(Dataset) ->
    case Dataset of
        list_dataset ->
            true;

        {grid_dataset, Columns} ->
            Columns > 0;

        {chart_dataset, Series_key} ->
            Series_key /= <<""/utf8>>
    end.

-file("src/lightspeed/data_plane.gleam", 61).
?DOC(" Validate one large-data plane contract.\n").
-spec valid(plane()) -> boolean().
valid(Plane) ->
    ((erlang:element(2, Plane) /= <<""/utf8>>) andalso valid_dataset(
        erlang:element(3, Plane)
    ))
    andalso unique_ids(erlang:element(4, Plane), []).

-file("src/lightspeed/data_plane.gleam", 241).
-spec drop(list(row()), integer()) -> list(row()).
drop(Rows, Count) ->
    case Count =< 0 of
        true ->
            Rows;

        false ->
            case Rows of
                [] ->
                    [];

                [_ | Rest] ->
                    drop(Rest, Count - 1)
            end
    end.

-file("src/lightspeed/data_plane.gleam", 252).
-spec take(list(row()), integer()) -> list(row()).
take(Rows, Count) ->
    case Count =< 0 of
        true ->
            [];

        false ->
            case Rows of
                [] ->
                    [];

                [Row | Rest] ->
                    [Row | take(Rest, Count - 1)]
            end
    end.

-file("src/lightspeed/data_plane.gleam", 68).
?DOC(" Execute one windowed query.\n").
-spec query_window(plane(), window_request()) -> {ok, window()} |
    {error, query_error()}.
query_window(Plane, Request) ->
    case (erlang:element(2, Request) < 0) orelse (erlang:element(3, Request) =< 0) of
        true ->
            {error,
                {invalid_window,
                    erlang:element(2, Request),
                    erlang:element(3, Request)}};

        false ->
            Total = erlang:length(erlang:element(4, Plane)),
            Start = gleam@int:clamp(erlang:element(2, Request), 0, Total),
            Rows = take(
                drop(erlang:element(4, Plane), Start),
                erlang:element(3, Request)
            ),
            {ok, {window, Start, erlang:element(3, Request), Total, Rows}}
    end.

-file("src/lightspeed/data_plane.gleam", 263).
-spec to_keyed_nodes(list(row())) -> list(lightspeed@diff:keyed_node()).
to_keyed_nodes(Rows) ->
    case Rows of
        [] ->
            [];

        [Row | Rest] ->
            [lightspeed@diff:keyed_node(
                    erlang:element(2, Row),
                    erlang:element(3, Row)
                ) |
                to_keyed_nodes(Rest)]
    end.

-file("src/lightspeed/data_plane.gleam", 230).
-spec remove(list(row()), binary()) -> list(row()).
remove(Rows, Target_id) ->
    case Rows of
        [] ->
            [];

        [Row | Rest] ->
            case erlang:element(2, Row) =:= Target_id of
                true ->
                    remove(Rest, Target_id);

                false ->
                    [Row | remove(Rest, Target_id)]
            end
    end.

-file("src/lightspeed/data_plane.gleam", 219).
-spec upsert(list(row()), row()) -> list(row()).
upsert(Rows, Next) ->
    case Rows of
        [] ->
            [Next];

        [Row | Rest] ->
            case erlang:element(2, Row) =:= erlang:element(2, Next) of
                true ->
                    [Next | Rest];

                false ->
                    [Row | upsert(Rest, Next)]
            end
    end.

-file("src/lightspeed/data_plane.gleam", 208).
-spec apply_all(list(row()), list(update())) -> list(row()).
apply_all(Rows, Updates) ->
    case Updates of
        [] ->
            Rows;

        [Update | Rest] ->
            case Update of
                {upsert, Row} ->
                    apply_all(upsert(Rows, Row), Rest);

                {delete, Id} ->
                    apply_all(remove(Rows, Id), Rest)
            end
    end.

-file("src/lightspeed/data_plane.gleam", 84).
?DOC(" Apply incremental updates and return window patches for one request.\n").
-spec apply_updates(plane(), window_request(), list(update())) -> {plane(),
    {ok, window()} | {error, query_error()},
    list(lightspeed@diff:patch())}.
apply_updates(Plane, Request, Updates) ->
    Previous_window = query_window(Plane, Request),
    Next_rows = apply_all(erlang:element(4, Plane), Updates),
    Next_plane = {plane,
        erlang:element(2, Plane),
        erlang:element(3, Plane),
        Next_rows},
    Next_window = query_window(Next_plane, Request),
    Patches = case {Previous_window, Next_window} of
        {{ok, Previous}, {ok, Current}} ->
            lightspeed@diff:keyed_patch_plan(
                erlang:element(2, Plane),
                to_keyed_nodes(erlang:element(5, Previous)),
                to_keyed_nodes(erlang:element(5, Current))
            );

        {_, _} ->
            []
    end,
    {Next_plane, Next_window, Patches}.

-file("src/lightspeed/data_plane.gleam", 107).
?DOC(" True when a patch set includes full-root churn operations.\n").
-spec contains_full_root_churn(list(lightspeed@diff:patch())) -> boolean().
contains_full_root_churn(Patches) ->
    case Patches of
        [] ->
            false;

        [Patch | Rest] ->
            case Patch of
                {upsert_keyed, _, _, _} ->
                    contains_full_root_churn(Rest);

                {remove_keyed, _, _} ->
                    contains_full_root_churn(Rest);

                _ ->
                    true
            end
    end.

-file("src/lightspeed/data_plane.gleam", 120).
?DOC(" True when a patch set is steady-state incremental (keyed-only or empty).\n").
-spec steady_state_incremental(list(lightspeed@diff:patch())) -> boolean().
steady_state_incremental(Patches) ->
    not contains_full_root_churn(Patches).

-file("src/lightspeed/data_plane.gleam", 125).
?DOC(" Stable dataset label.\n").
-spec dataset_label(dataset()) -> binary().
dataset_label(Dataset) ->
    case Dataset of
        list_dataset ->
            <<"list"/utf8>>;

        {grid_dataset, Columns} ->
            <<"grid:"/utf8, (erlang:integer_to_binary(Columns))/binary>>;

        {chart_dataset, Series_key} ->
            <<"chart:"/utf8, Series_key/binary>>
    end.

-file("src/lightspeed/data_plane.gleam", 134).
?DOC(" Stable query-error label.\n").
-spec query_error_label(query_error()) -> binary().
query_error_label(Error) ->
    case Error of
        {invalid_window, Offset, Limit} ->
            <<<<<<"invalid_window:"/utf8,
                        (erlang:integer_to_binary(Offset))/binary>>/binary,
                    ":"/utf8>>/binary,
                (erlang:integer_to_binary(Limit))/binary>>
    end.

-file("src/lightspeed/data_plane.gleam", 273).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    case Values of
        [] ->
            <<""/utf8>>;

        [Value] ->
            Value;

        [Value@1 | Rest] ->
            <<<<Value@1/binary, Separator/binary>>/binary,
                (join_with(Separator, Rest))/binary>>
    end.

-file("src/lightspeed/data_plane.gleam", 142).
?DOC(" Stable window signature for fixture assertions.\n").
-spec window_signature(window()) -> binary().
window_signature(Window) ->
    <<<<<<<<<<<<<<"offset="/utf8,
                                (erlang:integer_to_binary(
                                    erlang:element(2, Window)
                                ))/binary>>/binary,
                            "|limit="/utf8>>/binary,
                        (erlang:integer_to_binary(erlang:element(3, Window)))/binary>>/binary,
                    "|total="/utf8>>/binary,
                (erlang:integer_to_binary(erlang:element(4, Window)))/binary>>/binary,
            "|ids="/utf8>>/binary,
        (join_with(
            <<","/utf8>>,
            gleam@list:map(
                erlang:element(5, Window),
                fun(Row) -> erlang:element(2, Row) end
            )
        ))/binary>>.

-file("src/lightspeed/data_plane.gleam", 154).
?DOC(" Stable plane signature for deterministic fixtures.\n").
-spec signature(plane()) -> binary().
signature(Plane) ->
    <<<<<<<<<<"target="/utf8, (erlang:element(2, Plane))/binary>>/binary,
                    "|dataset="/utf8>>/binary,
                (dataset_label(erlang:element(3, Plane)))/binary>>/binary,
            "|rows="/utf8>>/binary,
        (erlang:integer_to_binary(erlang:length(erlang:element(4, Plane))))/binary>>.

-file("src/lightspeed/data_plane.gleam", 164).
?DOC(" Plane target selector.\n").
-spec target(plane()) -> binary().
target(Plane) ->
    erlang:element(2, Plane).

-file("src/lightspeed/data_plane.gleam", 169).
?DOC(" Plane dataset profile.\n").
-spec dataset(plane()) -> dataset().
dataset(Plane) ->
    erlang:element(3, Plane).

-file("src/lightspeed/data_plane.gleam", 174).
?DOC(" Plane rows in stored order.\n").
-spec rows(plane()) -> list(row()).
rows(Plane) ->
    erlang:element(4, Plane).