-module(lightspeed@data_plane).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/data_plane.gleam").
-export([new/3, row/2, request/2, valid/1, query_window/2, apply_updates/3, contains_full_root_churn/1, steady_state_incremental/1, dataset_label/1, query_error_label/1, window_signature/1, signature/1, target/1, dataset/1, rows/1]).
-export_type([dataset/0, row/0, window_request/0, window/0, update/0, query_error/0, plane/0]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Large data-plane contracts with windowed queries and incremental updates.\n").
-type dataset() :: list_dataset |
{grid_dataset, integer()} |
{chart_dataset, binary()}.
-type row() :: {row, binary(), binary()}.
-type window_request() :: {window_request, integer(), integer()}.
-type window() :: {window, integer(), integer(), integer(), list(row())}.
-type update() :: {upsert, row()} | {delete, binary()}.
-type query_error() :: {invalid_window, integer(), integer()}.
-opaque plane() :: {plane, binary(), dataset(), list(row())}.
-file("src/lightspeed/data_plane.gleam", 46).
?DOC(" Build one large-data plane.\n").
-spec new(binary(), dataset(), list(row())) -> plane().
new(Target, Dataset, Rows) ->
{plane, Target, Dataset, Rows}.
-file("src/lightspeed/data_plane.gleam", 51).
?DOC(" Build one row.\n").
-spec row(binary(), binary()) -> row().
row(Id, Payload) ->
{row, Id, Payload}.
-file("src/lightspeed/data_plane.gleam", 56).
?DOC(" Build one window request.\n").
-spec request(integer(), integer()) -> window_request().
request(Offset, Limit) ->
{window_request, Offset, Limit}.
-file("src/lightspeed/data_plane.gleam", 197).
-spec contains_string(list(binary()), binary()) -> boolean().
contains_string(Values, Target) ->
case Values of
[] ->
false;
[Value | Rest] ->
case Value =:= Target of
true ->
true;
false ->
contains_string(Rest, Target)
end
end.
-file("src/lightspeed/data_plane.gleam", 186).
-spec unique_ids(list(row()), list(binary())) -> boolean().
unique_ids(Rows, Seen) ->
case Rows of
[] ->
true;
[Entry | Rest] ->
case contains_string(Seen, erlang:element(2, Entry)) of
true ->
false;
false ->
unique_ids(Rest, [erlang:element(2, Entry) | Seen])
end
end.
-file("src/lightspeed/data_plane.gleam", 178).
-spec valid_dataset(dataset()) -> boolean().
valid_dataset(Dataset) ->
case Dataset of
list_dataset ->
true;
{grid_dataset, Columns} ->
Columns > 0;
{chart_dataset, Series_key} ->
Series_key /= <<""/utf8>>
end.
-file("src/lightspeed/data_plane.gleam", 61).
?DOC(" Validate one large-data plane contract.\n").
-spec valid(plane()) -> boolean().
valid(Plane) ->
((erlang:element(2, Plane) /= <<""/utf8>>) andalso valid_dataset(
erlang:element(3, Plane)
))
andalso unique_ids(erlang:element(4, Plane), []).
-file("src/lightspeed/data_plane.gleam", 241).
-spec drop(list(row()), integer()) -> list(row()).
drop(Rows, Count) ->
case Count =< 0 of
true ->
Rows;
false ->
case Rows of
[] ->
[];
[_ | Rest] ->
drop(Rest, Count - 1)
end
end.
-file("src/lightspeed/data_plane.gleam", 252).
-spec take(list(row()), integer()) -> list(row()).
take(Rows, Count) ->
case Count =< 0 of
true ->
[];
false ->
case Rows of
[] ->
[];
[Row | Rest] ->
[Row | take(Rest, Count - 1)]
end
end.
-file("src/lightspeed/data_plane.gleam", 68).
?DOC(" Execute one windowed query.\n").
-spec query_window(plane(), window_request()) -> {ok, window()} |
{error, query_error()}.
query_window(Plane, Request) ->
case (erlang:element(2, Request) < 0) orelse (erlang:element(3, Request) =< 0) of
true ->
{error,
{invalid_window,
erlang:element(2, Request),
erlang:element(3, Request)}};
false ->
Total = erlang:length(erlang:element(4, Plane)),
Start = gleam@int:clamp(erlang:element(2, Request), 0, Total),
Rows = take(
drop(erlang:element(4, Plane), Start),
erlang:element(3, Request)
),
{ok, {window, Start, erlang:element(3, Request), Total, Rows}}
end.
-file("src/lightspeed/data_plane.gleam", 263).
-spec to_keyed_nodes(list(row())) -> list(lightspeed@diff:keyed_node()).
to_keyed_nodes(Rows) ->
case Rows of
[] ->
[];
[Row | Rest] ->
[lightspeed@diff:keyed_node(
erlang:element(2, Row),
erlang:element(3, Row)
) |
to_keyed_nodes(Rest)]
end.
-file("src/lightspeed/data_plane.gleam", 230).
-spec remove(list(row()), binary()) -> list(row()).
remove(Rows, Target_id) ->
case Rows of
[] ->
[];
[Row | Rest] ->
case erlang:element(2, Row) =:= Target_id of
true ->
remove(Rest, Target_id);
false ->
[Row | remove(Rest, Target_id)]
end
end.
-file("src/lightspeed/data_plane.gleam", 219).
-spec upsert(list(row()), row()) -> list(row()).
upsert(Rows, Next) ->
case Rows of
[] ->
[Next];
[Row | Rest] ->
case erlang:element(2, Row) =:= erlang:element(2, Next) of
true ->
[Next | Rest];
false ->
[Row | upsert(Rest, Next)]
end
end.
-file("src/lightspeed/data_plane.gleam", 208).
-spec apply_all(list(row()), list(update())) -> list(row()).
apply_all(Rows, Updates) ->
case Updates of
[] ->
Rows;
[Update | Rest] ->
case Update of
{upsert, Row} ->
apply_all(upsert(Rows, Row), Rest);
{delete, Id} ->
apply_all(remove(Rows, Id), Rest)
end
end.
-file("src/lightspeed/data_plane.gleam", 84).
?DOC(" Apply incremental updates and return window patches for one request.\n").
-spec apply_updates(plane(), window_request(), list(update())) -> {plane(),
{ok, window()} | {error, query_error()},
list(lightspeed@diff:patch())}.
apply_updates(Plane, Request, Updates) ->
Previous_window = query_window(Plane, Request),
Next_rows = apply_all(erlang:element(4, Plane), Updates),
Next_plane = {plane,
erlang:element(2, Plane),
erlang:element(3, Plane),
Next_rows},
Next_window = query_window(Next_plane, Request),
Patches = case {Previous_window, Next_window} of
{{ok, Previous}, {ok, Current}} ->
lightspeed@diff:keyed_patch_plan(
erlang:element(2, Plane),
to_keyed_nodes(erlang:element(5, Previous)),
to_keyed_nodes(erlang:element(5, Current))
);
{_, _} ->
[]
end,
{Next_plane, Next_window, Patches}.
-file("src/lightspeed/data_plane.gleam", 107).
?DOC(" True when a patch set includes full-root churn operations.\n").
-spec contains_full_root_churn(list(lightspeed@diff:patch())) -> boolean().
contains_full_root_churn(Patches) ->
case Patches of
[] ->
false;
[Patch | Rest] ->
case Patch of
{upsert_keyed, _, _, _} ->
contains_full_root_churn(Rest);
{remove_keyed, _, _} ->
contains_full_root_churn(Rest);
_ ->
true
end
end.
-file("src/lightspeed/data_plane.gleam", 120).
?DOC(" True when a patch set is steady-state incremental (keyed-only or empty).\n").
-spec steady_state_incremental(list(lightspeed@diff:patch())) -> boolean().
steady_state_incremental(Patches) ->
not contains_full_root_churn(Patches).
-file("src/lightspeed/data_plane.gleam", 125).
?DOC(" Stable dataset label.\n").
-spec dataset_label(dataset()) -> binary().
dataset_label(Dataset) ->
case Dataset of
list_dataset ->
<<"list"/utf8>>;
{grid_dataset, Columns} ->
<<"grid:"/utf8, (erlang:integer_to_binary(Columns))/binary>>;
{chart_dataset, Series_key} ->
<<"chart:"/utf8, Series_key/binary>>
end.
-file("src/lightspeed/data_plane.gleam", 134).
?DOC(" Stable query-error label.\n").
-spec query_error_label(query_error()) -> binary().
query_error_label(Error) ->
case Error of
{invalid_window, Offset, Limit} ->
<<<<<<"invalid_window:"/utf8,
(erlang:integer_to_binary(Offset))/binary>>/binary,
":"/utf8>>/binary,
(erlang:integer_to_binary(Limit))/binary>>
end.
-file("src/lightspeed/data_plane.gleam", 273).
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
case Values of
[] ->
<<""/utf8>>;
[Value] ->
Value;
[Value@1 | Rest] ->
<<<<Value@1/binary, Separator/binary>>/binary,
(join_with(Separator, Rest))/binary>>
end.
-file("src/lightspeed/data_plane.gleam", 142).
?DOC(" Stable window signature for fixture assertions.\n").
-spec window_signature(window()) -> binary().
window_signature(Window) ->
<<<<<<<<<<<<<<"offset="/utf8,
(erlang:integer_to_binary(
erlang:element(2, Window)
))/binary>>/binary,
"|limit="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(3, Window)))/binary>>/binary,
"|total="/utf8>>/binary,
(erlang:integer_to_binary(erlang:element(4, Window)))/binary>>/binary,
"|ids="/utf8>>/binary,
(join_with(
<<","/utf8>>,
gleam@list:map(
erlang:element(5, Window),
fun(Row) -> erlang:element(2, Row) end
)
))/binary>>.
-file("src/lightspeed/data_plane.gleam", 154).
?DOC(" Stable plane signature for deterministic fixtures.\n").
-spec signature(plane()) -> binary().
signature(Plane) ->
<<<<<<<<<<"target="/utf8, (erlang:element(2, Plane))/binary>>/binary,
"|dataset="/utf8>>/binary,
(dataset_label(erlang:element(3, Plane)))/binary>>/binary,
"|rows="/utf8>>/binary,
(erlang:integer_to_binary(erlang:length(erlang:element(4, Plane))))/binary>>.
-file("src/lightspeed/data_plane.gleam", 164).
?DOC(" Plane target selector.\n").
-spec target(plane()) -> binary().
target(Plane) ->
erlang:element(2, Plane).
-file("src/lightspeed/data_plane.gleam", 169).
?DOC(" Plane dataset profile.\n").
-spec dataset(plane()) -> dataset().
dataset(Plane) ->
erlang:element(3, Plane).
-file("src/lightspeed/data_plane.gleam", 174).
?DOC(" Plane rows in stored order.\n").
-spec rows(plane()) -> list(row()).
rows(Plane) ->
erlang:element(4, Plane).