-module(lightspeed@pipeline@quality).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/quality.gleam").
-export([field/3, schema/3, string_value/2, int_value/2, bool_value/2, null_value/1, valid_schema/1, field_type_label/1, check_compatibility/2, validate/2, value_label/1, validation_error_label/1, validation_result_label/1, schema_signature/1]).
-export_type([field_type/0, field/0, schema/0, value/0, boundary_value/0, validation_error/0, validation_result/0]).
%% Documentation macros. On OTP 27 and newer, ?MODULEDOC/?DOC expand to the
%% -moduledoc/-doc attributes.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
%% On older releases both macros expand to an empty -compile attribute, so
%% every ?MODULEDOC/?DOC use is still a valid form but carries no doc text.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(" Data-quality and schema-evolution contracts for ETL pipeline boundaries.\n").
%% Declared type of a schema field.
-type field_type() :: string_type | int_type | bool_type.
%% One field contract: {field, Name, Kind, Required}.
-type field() :: {field, binary(), field_type(), boolean()}.
%% One schema contract: {schema, Name, Version, Fields}.
-type schema() :: {schema, binary(), integer(), list(field())}.
%% A payload value tagged with its runtime kind; `null_value` carries no data.
-type value() :: {string_value, binary()} |
{int_value, integer()} |
{bool_value, boolean()} |
null_value.
%% A named payload entry: {boundary_value, Name, Value}.
-type boundary_value() :: {boundary_value, binary(), value()}.
%% Validation failures. `type_mismatch` carries the field name, the expected
%% field type and a label for the kind of value actually supplied.
-type validation_error() :: {invalid_schema, binary()} |
{missing_field, binary()} |
{unexpected_field, binary()} |
{type_mismatch, binary(), field_type(), binary()}.
%% Outcome of validating a payload: the normalized values on success, or the
%% list of errors on failure.
-type validation_result() :: {validation_passed, list(boundary_value())} |
{validation_failed, list(validation_error())}.
-file("src/lightspeed/pipeline/quality.gleam", 52).
?DOC(" Build one field contract.\n").
%% Packs a field contract into its tagged-tuple form.
%%   Name     - field name
%%   Kind     - declared field type
%%   Required - whether the field is required
%% Nothing is validated here.
-spec field(binary(), field_type(), boolean()) -> field().
field(Name, Kind, Required) ->
    Contract = {field, Name, Kind, Required},
    Contract.
-file("src/lightspeed/pipeline/quality.gleam", 57).
?DOC(" Build one schema contract.\n").
%% Packs a schema contract into its tagged-tuple form.
%%   Name    - schema name
%%   Version - schema version number
%%   Fields  - field contracts belonging to the schema
%% Nothing is validated here.
-spec schema(binary(), integer(), list(field())) -> schema().
schema(Name, Version, Fields) ->
    Contract = {schema, Name, Version, Fields},
    Contract.
-file("src/lightspeed/pipeline/quality.gleam", 62).
?DOC(" Build one named string value.\n").
%% Wraps a binary as a string-kinded payload entry stored under Name.
-spec string_value(binary(), binary()) -> boundary_value().
string_value(Name, Value) ->
    Tagged = {string_value, Value},
    {boundary_value, Name, Tagged}.
-file("src/lightspeed/pipeline/quality.gleam", 67).
?DOC(" Build one named integer value.\n").
%% Wraps an integer as an int-kinded payload entry stored under Name.
-spec int_value(binary(), integer()) -> boundary_value().
int_value(Name, Value) ->
    Tagged = {int_value, Value},
    {boundary_value, Name, Tagged}.
-file("src/lightspeed/pipeline/quality.gleam", 72).
?DOC(" Build one named boolean value.\n").
%% Wraps a boolean as a bool-kinded payload entry stored under Name.
-spec bool_value(binary(), boolean()) -> boundary_value().
bool_value(Name, Value) ->
    Tagged = {bool_value, Value},
    {boundary_value, Name, Tagged}.
-file("src/lightspeed/pipeline/quality.gleam", 77).
?DOC(" Build one named null value.\n").
%% Builds a payload entry for Name whose value is the `null_value` marker.
-spec null_value(binary()) -> boundary_value().
null_value(Name) ->
    Absent = null_value,
    {boundary_value, Name, Absent}.
-file("src/lightspeed/pipeline/quality.gleam", 376).
%% Returns true when Name occurs in Names, false otherwise.
%% The repeated variable in the second clause head is an exact (=:=) match,
%% and the scan stops at the first hit.
-spec contains_name(list(binary()), binary()) -> boolean().
contains_name([], _Name) ->
    false;
contains_name([Name | _Rest], Name) ->
    true;
contains_name([_Other | Rest], Name) ->
    contains_name(Rest, Name).
-file("src/lightspeed/pipeline/quality.gleam", 314).
%% Returns true when no field in Fields shares its name with an earlier field
%% or with a name already present in Seen.
%% Seen accumulates the names visited so far; valid_schema/1 starts it at [].
-spec field_names_unique(list(field()), list(binary())) -> boolean().
field_names_unique([], _Seen) ->
    true;
field_names_unique([{field, Name, _Kind, _Required} | Rest], Seen) ->
    %% Stop at the first duplicate; otherwise remember the name and continue.
    (not contains_name(Seen, Name)) andalso
        field_names_unique(Rest, [Name | Seen]).
-file("src/lightspeed/pipeline/quality.gleam", 307).
%% Returns true when every field has a non-empty name.
%% An empty field list is trivially valid. lists:all/2 stops at the first
%% field that fails the check.
-spec fields_valid(list(field())) -> boolean().
fields_valid(Fields) ->
    Has_name = fun({field, Name, _Kind, _Required}) -> Name =/= <<""/utf8>> end,
    lists:all(Has_name, Fields).
-file("src/lightspeed/pipeline/quality.gleam", 82).
?DOC(" Validate schema invariants.\n").
%% A schema is valid when all of the following hold, checked in this order
%% with short-circuiting:
%%   1. its name is non-empty,
%%   2. its version is greater than zero,
%%   3. it declares at least one field,
%%   4. every field has a non-empty name,
%%   5. no two fields share a name.
-spec valid_schema(schema()) -> boolean().
valid_schema({schema, Name, Version, Fields}) ->
    (Name =/= <<""/utf8>>) andalso
        (Version > 0) andalso
        (Fields =/= []) andalso
        fields_valid(Fields) andalso
        field_names_unique(Fields, []).
-file("src/lightspeed/pipeline/quality.gleam", 325).
%% Looks up the first field whose name equals Name.
%% Returns {some, Field} on a hit and `none` when no field matches.
-spec find_field(list(field()), binary()) -> gleam@option:option(field()).
find_field([], _Name) ->
    none;
find_field([{field, Name, _Kind, _Required} = Match | _Rest], Name) ->
    {some, Match};
find_field([_Other | Rest], Name) ->
    find_field(Rest, Name).
-file("src/lightspeed/pipeline/quality.gleam", 235).
%% Checks the fields of the next schema against the previous schema's fields.
%% A required field that has no counterpart in the previous fields is rejected
%% with {error, <<"new_required_field:", Name>>}; the first such field wins.
%% Optional fields, and required fields that already existed, are accepted.
%% Returns {ok, nil} when no field is rejected.
-spec compatible_new_fields(list(field()), list(field())) -> {ok, nil} |
    {error, binary()}.
compatible_new_fields(_Previous_fields, []) ->
    {ok, nil};
compatible_new_fields(Previous_fields, [{field, Name, _Kind, true} | Rest]) ->
    case find_field(Previous_fields, Name) of
        none ->
            {error, <<"new_required_field:"/utf8, Name/binary>>};
        {some, _Existing} ->
            compatible_new_fields(Previous_fields, Rest)
    end;
compatible_new_fields(Previous_fields, [_Optional | Rest]) ->
    %% Optional fields can never violate the rule, so no lookup is needed.
    compatible_new_fields(Previous_fields, Rest).
-file("src/lightspeed/pipeline/quality.gleam", 145).
?DOC(" Field-type label.\n").
%% Maps a field type to its textual label.
-spec field_type_label(field_type()) -> binary().
field_type_label(string_type) ->
    <<"string"/utf8>>;
field_type_label(int_type) ->
    <<"int"/utf8>>;
field_type_label(bool_type) ->
    <<"bool"/utf8>>.
-file("src/lightspeed/pipeline/quality.gleam", 205).
%% Checks that every field of the previous schema survives unchanged in the
%% next schema. Fields are examined in order and the first problem is returned:
%%   - the field is absent from Next_fields       -> "missing_field:Name"
%%   - its type differs                            -> "type_changed:Name:Old->New"
%%   - its required flag differs                   -> "requiredness_changed:Name"
%% Returns {ok, nil} when no previous field has a problem.
-spec compatible_previous_fields(list(field()), list(field())) -> {ok, nil} |
    {error, binary()}.
compatible_previous_fields([], _Next_fields) ->
    {ok, nil};
compatible_previous_fields([Previous | Rest], Next_fields) ->
    Counterpart = find_field(Next_fields, erlang:element(2, Previous)),
    case previous_field_conflict(Previous, Counterpart) of
        ok ->
            compatible_previous_fields(Rest, Next_fields);
        {error, _Reason} = Failure ->
            Failure
    end.

%% Compares one previous field with its counterpart in the next schema (if any).
%% The type check takes precedence over the requiredness check.
-spec previous_field_conflict(field(), gleam@option:option(field())) ->
    ok | {error, binary()}.
previous_field_conflict({field, Name, _Kind, _Required}, none) ->
    {error, <<"missing_field:"/utf8, Name/binary>>};
previous_field_conflict({field, Name, Kind, _Required},
                        {some, {field, _Next_name, Next_kind, _Next_required}})
  when Kind =/= Next_kind ->
    {error,
        <<"type_changed:"/utf8,
          Name/binary,
          ":"/utf8,
          (field_type_label(Kind))/binary,
          "->"/utf8,
          (field_type_label(Next_kind))/binary>>};
previous_field_conflict({field, Name, _Kind, Required},
                        {some, {field, _Next_name, _Next_kind, Next_required}})
  when Required =/= Next_required ->
    {error, <<"requiredness_changed:"/utf8, Name/binary>>};
previous_field_conflict(_Previous, {some, _Next}) ->
    ok.
-file("src/lightspeed/pipeline/quality.gleam", 91).
?DOC(" Check whether `next` is compatibility-safe relative to `previous`.\n").
%% Decides whether Next may replace Previous. Checks run in this order and the
%% first failure is returned as {error, Reason}:
%%   1. schema names must be equal   -> "schema_name_mismatch:Prev:Next"
%%   2. version must not go backwards -> "schema_version_regression:Prev->Next"
%%      (an equal version is accepted)
%%   3. both schemas must pass valid_schema/1 -> "invalid_schema"
%%   4. every previous field must survive unchanged
%%   5. no newly introduced field may be required
%% Returns {ok, nil} when every check passes.
-spec check_compatibility(schema(), schema()) -> {ok, nil} | {error, binary()}.
check_compatibility({schema, Previous_name, _, _}, {schema, Next_name, _, _})
  when Previous_name =/= Next_name ->
    {error,
        <<"schema_name_mismatch:"/utf8,
          Previous_name/binary,
          ":"/utf8,
          Next_name/binary>>};
check_compatibility({schema, _, Previous_version, _}, {schema, _, Next_version, _})
  when Next_version < Previous_version ->
    {error,
        <<"schema_version_regression:"/utf8,
          (erlang:integer_to_binary(Previous_version))/binary,
          "->"/utf8,
          (erlang:integer_to_binary(Next_version))/binary>>};
check_compatibility(Previous, Next) ->
    case valid_schema(Previous) andalso valid_schema(Next) of
        false ->
            {error, <<"invalid_schema"/utf8>>};
        true ->
            check_field_compatibility(
                erlang:element(4, Previous),
                erlang:element(4, Next)
            )
    end.

%% Runs the two field-level checks: previous fields first, then new fields.
-spec check_field_compatibility(list(field()), list(field())) ->
    {ok, nil} | {error, binary()}.
check_field_compatibility(Previous_fields, Next_fields) ->
    case compatible_previous_fields(Previous_fields, Next_fields) of
        {ok, _} ->
            compatible_new_fields(Previous_fields, Next_fields);
        {error, _Reason} = Failure ->
            Failure
    end.
-file("src/lightspeed/pipeline/quality.gleam", 288).
%% Walks the payload in order and prepends an {unexpected_field, Name} error to
%% Errors_rev for every entry whose name is not declared in Fields.
%% The accumulator is returned still in reverse order; the caller reverses it.
-spec validate_unexpected_payload_fields(
    list(boundary_value()),
    list(field()),
    list(validation_error())
) -> list(validation_error()).
validate_unexpected_payload_fields(Payload, Fields, Errors_rev) ->
    Collect = fun({boundary_value, Name, _Value}, Acc) ->
        case find_field(Fields, Name) of
            none ->
                [{unexpected_field, Name} | Acc];
            {some, _Declared} ->
                Acc
        end
    end,
    lists:foldl(Collect, Errors_rev, Payload).
-file("src/lightspeed/pipeline/quality.gleam", 367).
%% Names the kind of a payload value, ignoring the data it carries.
-spec value_kind_label(value()) -> binary().
value_kind_label({string_value, _Text}) ->
    <<"string"/utf8>>;
value_kind_label({int_value, _Number}) ->
    <<"int"/utf8>>;
value_kind_label({bool_value, _Flag}) ->
    <<"bool"/utf8>>;
value_kind_label(null_value) ->
    <<"null"/utf8>>.
-file("src/lightspeed/pipeline/quality.gleam", 350).
%% Tells whether Value is acceptable for the field contract Entry.
%% A null value is accepted exactly when the field is optional, whatever its
%% declared type. Any other value is accepted when its kind equals the
%% field's declared type.
-spec value_matches(field(), value()) -> boolean().
value_matches({field, _Name, _Kind, Required}, null_value) ->
    not Required;
value_matches({field, _Name, Kind, _Required}, {string_value, _Text}) ->
    Kind =:= string_type;
value_matches({field, _Name, Kind, _Required}, {int_value, _Number}) ->
    Kind =:= int_type;
value_matches({field, _Name, Kind, _Required}, {bool_value, _Flag}) ->
    Kind =:= bool_type.
-file("src/lightspeed/pipeline/quality.gleam", 336).
%% Looks up the first payload entry stored under Name.
%% Returns {some, Entry} on a hit and `none` when the payload has no such
%% entry. Later entries with the same name are never returned.
-spec find_value(list(boundary_value()), binary()) -> gleam@option:option(boundary_value()).
find_value([], _Name) ->
    none;
find_value([{boundary_value, Name, _Value} = Match | _Rest], Name) ->
    {some, Match};
find_value([_Other | Rest], Name) ->
    find_value(Rest, Name).
-file("src/lightspeed/pipeline/quality.gleam", 249).
%% Checks the payload against each schema field in order.
%% For every field, either a normalized value is prepended to Normalized_rev
%% or an error is prepended to Errors_rev. Both accumulators are returned in
%% reverse order as {Normalized_rev, Errors_rev}.
-spec validate_schema_fields(
    list(field()),
    list(boundary_value()),
    list(boundary_value()),
    list(validation_error())
) -> {list(boundary_value()), list(validation_error())}.
validate_schema_fields([], _Payload, Normalized_rev, Errors_rev) ->
    {Normalized_rev, Errors_rev};
validate_schema_fields([Entry | Rest], Payload, Normalized_rev, Errors_rev) ->
    Supplied = find_value(Payload, erlang:element(2, Entry)),
    case resolve_field_value(Entry, Supplied) of
        {ok, Value} ->
            validate_schema_fields(Rest, Payload, [Value | Normalized_rev], Errors_rev);
        {error, Problem} ->
            validate_schema_fields(Rest, Payload, Normalized_rev, [Problem | Errors_rev])
    end.

%% Decides the outcome for a single field given the payload lookup result:
%%   - required field absent from the payload -> missing_field error
%%   - optional field absent from the payload -> normalized to a null entry
%%   - value present and acceptable           -> the payload entry itself
%%   - value present but not acceptable       -> type_mismatch error
-spec resolve_field_value(field(), gleam@option:option(boundary_value())) ->
    {ok, boundary_value()} | {error, validation_error()}.
resolve_field_value({field, Name, _Kind, true}, none) ->
    {error, {missing_field, Name}};
resolve_field_value({field, Name, _Kind, false}, none) ->
    {ok, null_value(Name)};
resolve_field_value({field, Name, Kind, _Required} = Entry,
                    {some, {boundary_value, _Payload_name, Value} = Supplied}) ->
    case value_matches(Entry, Value) of
        true ->
            {ok, Supplied};
        false ->
            {error, {type_mismatch, Name, Kind, value_kind_label(Value)}}
    end.
-file("src/lightspeed/pipeline/quality.gleam", 120).
?DOC(" Validate one payload against one schema contract.\n").
%% Validates Payload against Schema.
%% An invalid schema fails immediately with a single invalid_schema error.
%% Otherwise the schema fields are checked first, then payload entries that
%% the schema does not declare. Errors are reported in that order.
%% On success the normalized values are returned in schema field order.
-spec validate(schema(), list(boundary_value())) -> validation_result().
validate(Schema, Payload) ->
    case valid_schema(Schema) of
        false ->
            {validation_failed, [{invalid_schema, <<"invalid_schema"/utf8>>}]};
        true ->
            Fields = erlang:element(4, Schema),
            {Normalized_rev, Field_errors_rev} =
                validate_schema_fields(Fields, Payload, [], []),
            %% Unexpected-field errors are stacked on top of the field errors,
            %% so one reverse restores the overall reporting order.
            case validate_unexpected_payload_fields(Payload, Fields, Field_errors_rev) of
                [] ->
                    {validation_passed, lists:reverse(Normalized_rev)};
                All_errors_rev ->
                    {validation_failed, lists:reverse(All_errors_rev)}
            end
    end.
-file("src/lightspeed/pipeline/quality.gleam", 387).
%% Renders a boolean as the binary <<"true">> or <<"false">>.
-spec bool_label(boolean()) -> binary().
bool_label(true) ->
    <<"true"/utf8>>;
bool_label(false) ->
    <<"false"/utf8>>.
-file("src/lightspeed/pipeline/quality.gleam", 154).
?DOC(" Value label.\n").
%% Renders a payload value as "<kind>:<data>", or "null" for the null marker.
-spec value_label(value()) -> binary().
value_label({string_value, Text}) ->
    <<"string:"/utf8, Text/binary>>;
value_label({int_value, Number}) ->
    <<"int:"/utf8, (erlang:integer_to_binary(Number))/binary>>;
value_label({bool_value, Flag}) ->
    <<"bool:"/utf8, (bool_label(Flag))/binary>>;
value_label(null_value) ->
    <<"null"/utf8>>.
-file("src/lightspeed/pipeline/quality.gleam", 164).
?DOC(" Validation-error label.\n").
%% Renders one validation error as text. Every label starts with the error tag;
%% a type mismatch also reports the expected type and the actual value kind.
-spec validation_error_label(validation_error()) -> binary().
validation_error_label({invalid_schema, Reason}) ->
    <<"invalid_schema:"/utf8, Reason/binary>>;
validation_error_label({missing_field, Name}) ->
    <<"missing_field:"/utf8, Name/binary>>;
validation_error_label({unexpected_field, Name}) ->
    <<"unexpected_field:"/utf8, Name/binary>>;
validation_error_label({type_mismatch, Name, Expected, Actual}) ->
    <<"type_mismatch:"/utf8,
      Name/binary,
      ":expected="/utf8,
      (field_type_label(Expected))/binary,
      ":actual="/utf8,
      Actual/binary>>.
-file("src/lightspeed/pipeline/quality.gleam", 394).
%% Joins Values into a single binary with Separator between adjacent elements.
%%   join_with(<<",">>, [])                 -> <<>>
%%   join_with(<<",">>, [<<"a">>])          -> <<"a">>
%%   join_with(<<",">>, [<<"a">>, <<"b">>]) -> <<"a,b">>
%% lists:join/2 interleaves the separator and erlang:iolist_to_binary/1 then
%% produces the result binary from that list, so no intermediate binaries are
%% built per element and the call depth does not grow with the list length.
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).
-file("src/lightspeed/pipeline/quality.gleam", 180).
?DOC(" Validation-result label.\n").
%% Renders a validation result as one line of text.
%%   passed -> "valid:" followed by comma-separated "Name=ValueLabel" pairs
%%   failed -> "invalid:" followed by comma-separated error labels
-spec validation_result_label(validation_result()) -> binary().
validation_result_label({validation_passed, Normalized}) ->
    Entry_label = fun({boundary_value, Name, Value}) ->
        <<Name/binary, "="/utf8, (value_label(Value))/binary>>
    end,
    Labels = lists:map(Entry_label, Normalized),
    <<"valid:"/utf8, (join_with(<<","/utf8>>, Labels))/binary>>;
validation_result_label({validation_failed, Errors}) ->
    Labels = lists:map(fun validation_error_label/1, Errors),
    <<"invalid:"/utf8, (join_with(<<","/utf8>>, Labels))/binary>>.
-file("src/lightspeed/pipeline/quality.gleam", 359).
%% Renders one field contract as "Name:Type:required=Bool".
-spec field_signature(field()) -> binary().
field_signature({field, Name, Kind, Required}) ->
    Kind_label = field_type_label(Kind),
    Required_label = bool_label(Required),
    <<Name/binary,
      ":"/utf8,
      Kind_label/binary,
      ":required="/utf8,
      Required_label/binary>>.
-file("src/lightspeed/pipeline/quality.gleam", 196).
?DOC(" Stable schema signature.\n").
%% Renders a schema as "name=N|version=V|fields=F1,F2,...", where each Fi is
%% the signature of one field, in declaration order.
-spec schema_signature(schema()) -> binary().
schema_signature({schema, Name, Version, Fields}) ->
    Version_label = erlang:integer_to_binary(Version),
    Field_labels = lists:map(fun field_signature/1, Fields),
    <<"name="/utf8,
      Name/binary,
      "|version="/utf8,
      Version_label/binary,
      "|fields="/utf8,
      (join_with(<<","/utf8>>, Field_labels))/binary>>.