%% src/lightspeed@pipeline@quality.erl

-module(lightspeed@pipeline@quality).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src/lightspeed/pipeline/quality.gleam").
-export([field/3, schema/3, string_value/2, int_value/2, bool_value/2, null_value/1, valid_schema/1, field_type_label/1, check_compatibility/2, validate/2, value_label/1, validation_error_label/1, validation_result_label/1, schema_signature/1]).
-export_type([field_type/0, field/0, schema/0, value/0, boundary_value/0, validation_error/0, validation_result/0]).

%% Documentation attributes are only emitted on OTP 27 and later; on older
%% releases both macros expand to an empty `-compile([])` attribute instead.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(" Data-quality and schema-evolution contracts for ETL pipeline boundaries.\n").

%% Primitive kinds a schema field can declare.
-type field_type() :: string_type | int_type | bool_type.

%% One field contract: `{field, Name, Kind, Required}`.
-type field() :: {field, binary(), field_type(), boolean()}.

%% One schema contract: `{schema, Name, Version, Fields}`.
-type schema() :: {schema, binary(), integer(), list(field())}.

%% A payload value: a tagged string, integer or boolean, or the bare atom
%% `null_value`.
-type value() :: {string_value, binary()} |
    {int_value, integer()} |
    {bool_value, boolean()} |
    null_value.

%% A named payload entry: `{boundary_value, Name, Value}`.
-type boundary_value() :: {boundary_value, binary(), value()}.

%% Reasons a schema or payload is rejected. `type_mismatch` carries the field
%% name, the type the schema declares, and a label for the kind of value that
%% was actually supplied.
-type validation_error() :: {invalid_schema, binary()} |
    {missing_field, binary()} |
    {unexpected_field, binary()} |
    {type_mismatch, binary(), field_type(), binary()}.

%% Outcome of validation: the normalized entries on success, or the collected
%% errors on failure.
-type validation_result() :: {validation_passed, list(boundary_value())} |
    {validation_failed, list(validation_error())}.

-file("src/lightspeed/pipeline/quality.gleam", 52).
?DOC(" Build one field contract.\n").
%% Bundles a field name, its declared type and its required flag into a
%% `{field, Name, Kind, Required}` tuple.
-spec field(binary(), field_type(), boolean()) -> field().
field(FieldName, FieldKind, IsRequired) ->
    Contract = {field, FieldName, FieldKind, IsRequired},
    Contract.

-file("src/lightspeed/pipeline/quality.gleam", 57).
?DOC(" Build one schema contract.\n").
%% Bundles a schema name, version number and field list into a
%% `{schema, Name, Version, Fields}` tuple. No validation happens here.
-spec schema(binary(), integer(), list(field())) -> schema().
schema(SchemaName, SchemaVersion, FieldContracts) ->
    Contract = {schema, SchemaName, SchemaVersion, FieldContracts},
    Contract.

-file("src/lightspeed/pipeline/quality.gleam", 62).
?DOC(" Build one named string value.\n").
%% Tags the binary as a string value and pairs it with the entry name.
-spec string_value(binary(), binary()) -> boundary_value().
string_value(EntryName, Text) ->
    Tagged = {string_value, Text},
    {boundary_value, EntryName, Tagged}.

-file("src/lightspeed/pipeline/quality.gleam", 67).
?DOC(" Build one named integer value.\n").
%% Tags the integer as an int value and pairs it with the entry name.
-spec int_value(binary(), integer()) -> boundary_value().
int_value(EntryName, Number) ->
    Tagged = {int_value, Number},
    {boundary_value, EntryName, Tagged}.

-file("src/lightspeed/pipeline/quality.gleam", 72).
?DOC(" Build one named boolean value.\n").
%% Tags the boolean as a bool value and pairs it with the entry name.
-spec bool_value(binary(), boolean()) -> boundary_value().
bool_value(EntryName, Flag) ->
    Tagged = {bool_value, Flag},
    {boundary_value, EntryName, Tagged}.

-file("src/lightspeed/pipeline/quality.gleam", 77).
?DOC(" Build one named null value.\n").
%% Pairs the entry name with the bare `null_value` atom.
-spec null_value(binary()) -> boundary_value().
null_value(EntryName) ->
    Absent = null_value,
    {boundary_value, EntryName, Absent}.

-file("src/lightspeed/pipeline/quality.gleam", 376).
%% Reports whether `Name` occurs in `Names`. Elements are compared by exact
%% match, and an empty list always yields `false`.
-spec contains_name(list(binary()), binary()) -> boolean().
contains_name(Names, Name) ->
    lists:member(Name, Names).

-file("src/lightspeed/pipeline/quality.gleam", 314).
%% Returns `true` when no field name appears twice. `Seen` holds the names
%% already visited; callers start it as `[]`. The walk stops at the first
%% repeated name.
-spec field_names_unique(list(field()), list(binary())) -> boolean().
field_names_unique([], _Seen) ->
    true;
field_names_unique([Field | Remaining], Seen) ->
    Name = erlang:element(2, Field),
    (not contains_name(Seen, Name))
        andalso field_names_unique(Remaining, [Name | Seen]).

-file("src/lightspeed/pipeline/quality.gleam", 307).
%% Returns `true` when every field has a non-empty name. An empty field list
%% is considered valid here; checking stops at the first empty name.
-spec fields_valid(list(field())) -> boolean().
fields_valid(Fields) ->
    HasName = fun(Field) -> erlang:element(2, Field) /= <<""/utf8>> end,
    lists:all(HasName, Fields).

-file("src/lightspeed/pipeline/quality.gleam", 82).
?DOC(" Validate schema invariants.\n").
%% A schema is valid when all of the following hold, checked in this order:
%%   - its name is not the empty binary;
%%   - its version is greater than zero;
%%   - it declares at least one field;
%%   - every field has a non-empty name;
%%   - no two fields share a name.
-spec valid_schema(schema()) -> boolean().
valid_schema({schema, Name, Version, Fields}) ->
    Name /= <<""/utf8>>
        andalso Version > 0
        andalso Fields /= []
        andalso fields_valid(Fields)
        andalso field_names_unique(Fields, []).

-file("src/lightspeed/pipeline/quality.gleam", 325).
%% Looks up the first field whose name equals `Name`.
%% Returns `{some, Field}` on a hit and `none` when no field matches.
-spec find_field(list(field()), binary()) -> gleam@option:option(field()).
find_field([], _Name) ->
    none;
find_field([Candidate | Others], Name) ->
    case erlang:element(2, Candidate) of
        %% `Name` is already bound, so this clause is an exact-equality test.
        Name ->
            {some, Candidate};
        _Other ->
            find_field(Others, Name)
    end.

-file("src/lightspeed/pipeline/quality.gleam", 235).
%% Rejects the first field of `Next_fields` that is both required and absent
%% from `Previous_fields`, returning `{error, <<"new_required_field:NAME">>}`.
%% Returns `{ok, nil}` when no such field exists.
-spec compatible_new_fields(list(field()), list(field())) -> {ok, nil} |
    {error, binary()}.
compatible_new_fields(_Previous_fields, []) ->
    {ok, nil};
compatible_new_fields(Previous_fields, [Candidate | Remaining]) ->
    Name = erlang:element(2, Candidate),
    Required = erlang:element(4, Candidate),
    case find_field(Previous_fields, Name) of
        %% Only a field that is new *and* required breaks compatibility.
        none when Required ->
            {error, <<"new_required_field:"/utf8, Name/binary>>};
        _Known_or_optional ->
            compatible_new_fields(Previous_fields, Remaining)
    end.

-file("src/lightspeed/pipeline/quality.gleam", 145).
?DOC(" Field-type label.\n").
%% Maps each field type atom to its short textual label.
-spec field_type_label(field_type()) -> binary().
field_type_label(string_type) ->
    <<"string"/utf8>>;
field_type_label(int_type) ->
    <<"int"/utf8>>;
field_type_label(bool_type) ->
    <<"bool"/utf8>>.

-file("src/lightspeed/pipeline/quality.gleam", 205).
%% Verifies that every field of `Previous_fields` is still present in
%% `Next_fields` with the same type and the same required flag.
%% Stops at the first offending field and returns one of:
%%   `{error, <<"missing_field:NAME">>}`
%%   `{error, <<"type_changed:NAME:OLD->NEW">>}`
%%   `{error, <<"requiredness_changed:NAME">>}`
%% Returns `{ok, nil}` when every previous field is preserved.
-spec compatible_previous_fields(list(field()), list(field())) -> {ok, nil} |
    {error, binary()}.
compatible_previous_fields([], _Next_fields) ->
    {ok, nil};
compatible_previous_fields([Previous | Remaining], Next_fields) ->
    Name = erlang:element(2, Previous),
    Previous_kind = erlang:element(3, Previous),
    Previous_required = erlang:element(4, Previous),
    case find_field(Next_fields, Name) of
        none ->
            {error, <<"missing_field:"/utf8, Name/binary>>};

        {some, Next} ->
            Next_kind = erlang:element(3, Next),
            Next_required = erlang:element(4, Next),
            if
                %% A type change is reported before a requiredness change.
                Previous_kind /= Next_kind ->
                    {error,
                        <<"type_changed:"/utf8,
                            Name/binary,
                            ":"/utf8,
                            (field_type_label(Previous_kind))/binary,
                            "->"/utf8,
                            (field_type_label(Next_kind))/binary>>};

                Previous_required /= Next_required ->
                    {error, <<"requiredness_changed:"/utf8, Name/binary>>};

                true ->
                    compatible_previous_fields(Remaining, Next_fields)
            end
    end.

-file("src/lightspeed/pipeline/quality.gleam", 91).
?DOC(" Check whether `next` is compatibility-safe relative to `previous`.\n").
%% Checks run in a fixed order and stop at the first failure:
%%   1. the schema names must be equal;
%%   2. the next version must not be lower than the previous one;
%%   3. both schemas must pass `valid_schema/1`;
%%   4. every previous field must survive unchanged in the next schema;
%%   5. fields that are new in the next schema must not be required.
%% Returns `{ok, nil}` or `{error, Reason}` with a binary reason.
-spec check_compatibility(schema(), schema()) -> {ok, nil} | {error, binary()}.
check_compatibility(Previous, Next) ->
    Previous_name = erlang:element(2, Previous),
    Next_name = erlang:element(2, Next),
    Previous_version = erlang:element(3, Previous),
    Next_version = erlang:element(3, Next),
    if
        Previous_name /= Next_name ->
            {error,
                <<"schema_name_mismatch:"/utf8,
                    Previous_name/binary,
                    ":"/utf8,
                    Next_name/binary>>};

        Next_version < Previous_version ->
            {error,
                <<"schema_version_regression:"/utf8,
                    (erlang:integer_to_binary(Previous_version))/binary,
                    "->"/utf8,
                    (erlang:integer_to_binary(Next_version))/binary>>};

        true ->
            case valid_schema(Previous) andalso valid_schema(Next) of
                false ->
                    {error, <<"invalid_schema"/utf8>>};

                true ->
                    Previous_fields = erlang:element(4, Previous),
                    Next_fields = erlang:element(4, Next),
                    case compatible_previous_fields(Previous_fields, Next_fields) of
                        {ok, _} ->
                            compatible_new_fields(Previous_fields, Next_fields);

                        {error, _} = Failure ->
                            Failure
                    end
            end
    end.

-file("src/lightspeed/pipeline/quality.gleam", 288).
%% Prepends an `{unexpected_field, Name}` error to `Errors_rev` for every
%% payload entry whose name is not declared in `Fields`. Entries are visited
%% in payload order, so the accumulator stays in reverse order of discovery.
-spec validate_unexpected_payload_fields(
    list(boundary_value()),
    list(field()),
    list(validation_error())
) -> list(validation_error()).
validate_unexpected_payload_fields(Payload, Fields, Errors_rev) ->
    Collect = fun(Item, Acc) ->
        Name = erlang:element(2, Item),
        case find_field(Fields, Name) of
            none ->
                [{unexpected_field, Name} | Acc];

            {some, _} ->
                Acc
        end
    end,
    lists:foldl(Collect, Errors_rev, Payload).

-file("src/lightspeed/pipeline/quality.gleam", 367).
%% Names the kind of a payload value, ignoring the data it carries.
-spec value_kind_label(value()) -> binary().
value_kind_label({string_value, _}) ->
    <<"string"/utf8>>;
value_kind_label({int_value, _}) ->
    <<"int"/utf8>>;
value_kind_label({bool_value, _}) ->
    <<"bool"/utf8>>;
value_kind_label(null_value) ->
    <<"null"/utf8>>.

-file("src/lightspeed/pipeline/quality.gleam", 350).
%% Decides whether `Value` is acceptable for the field contract `Entry`.
%% A null value is acceptable only when the field is not required; any other
%% value is acceptable only when its kind equals the field's declared type.
-spec value_matches(field(), value()) -> boolean().
value_matches(Entry, null_value) ->
    not erlang:element(4, Entry);
value_matches(Entry, {string_value, _}) ->
    string_type =:= erlang:element(3, Entry);
value_matches(Entry, {int_value, _}) ->
    int_type =:= erlang:element(3, Entry);
value_matches(Entry, {bool_value, _}) ->
    bool_type =:= erlang:element(3, Entry).

-file("src/lightspeed/pipeline/quality.gleam", 336).
%% Looks up the first payload entry whose name equals `Name`.
%% Returns `{some, Entry}` on a hit and `none` when no entry matches.
-spec find_value(list(boundary_value()), binary()) -> gleam@option:option(boundary_value()).
find_value([], _Name) ->
    none;
find_value([Candidate | Others], Name) ->
    case erlang:element(2, Candidate) of
        %% `Name` is already bound, so this clause is an exact-equality test.
        Name ->
            {some, Candidate};
        _Other ->
            find_value(Others, Name)
    end.

-file("src/lightspeed/pipeline/quality.gleam", 249).
%% Checks each schema field against the payload, in schema order, and returns
%% `{Normalized_rev, Errors_rev}` with new items prepended to the accumulators
%% that were passed in. For each field:
%%   - no payload entry, field required: a `missing_field` error is added;
%%   - no payload entry, field optional: a null entry for the field is added
%%     to the normalized values;
%%   - payload entry accepted by `value_matches/2`: the entry is kept as is;
%%   - payload entry rejected: a `type_mismatch` error is added carrying the
%%     field name, the declared type and the kind label of the actual value.
-spec validate_schema_fields(
    list(field()),
    list(boundary_value()),
    list(boundary_value()),
    list(validation_error())
) -> {list(boundary_value()), list(validation_error())}.
validate_schema_fields(Fields, Payload, Normalized_rev, Errors_rev) ->
    Check = fun(Field, {Normalized, Errors}) ->
        Name = erlang:element(2, Field),
        case find_value(Payload, Name) of
            none ->
                case erlang:element(4, Field) of
                    true ->
                        {Normalized, [{missing_field, Name} | Errors]};

                    false ->
                        {[null_value(Name) | Normalized], Errors}
                end;

            {some, Supplied} ->
                Actual = erlang:element(3, Supplied),
                case value_matches(Field, Actual) of
                    true ->
                        {[Supplied | Normalized], Errors};

                    false ->
                        Mismatch = {type_mismatch,
                            Name,
                            erlang:element(3, Field),
                            value_kind_label(Actual)},
                        {Normalized, [Mismatch | Errors]}
                end
        end
    end,
    lists:foldl(Check, {Normalized_rev, Errors_rev}, Fields).

-file("src/lightspeed/pipeline/quality.gleam", 120).
?DOC(" Validate one payload against one schema contract.\n").
%% An invalid schema fails immediately with a single `invalid_schema` error.
%% Otherwise every schema field is checked against the payload, then payload
%% entries not declared by the schema are flagged.
%%
%% On failure the errors are listed with schema-field errors first (in schema
%% order) followed by unexpected-field errors (in payload order). On success
%% the normalized entries are returned in schema order.
-spec validate(schema(), list(boundary_value())) -> validation_result().
validate(Schema, Payload) ->
    case valid_schema(Schema) of
        true ->
            Fields = erlang:element(4, Schema),
            {Normalized_rev, Field_errors_rev} =
                validate_schema_fields(Fields, Payload, [], []),
            case validate_unexpected_payload_fields(Payload, Fields, Field_errors_rev) of
                [] ->
                    {validation_passed, lists:reverse(Normalized_rev)};

                All_errors_rev ->
                    {validation_failed, lists:reverse(All_errors_rev)}
            end;

        false ->
            {validation_failed, [{invalid_schema, <<"invalid_schema"/utf8>>}]}
    end.

-file("src/lightspeed/pipeline/quality.gleam", 387).
%% Renders a boolean as the binary `<<"true">>` or `<<"false">>`.
-spec bool_label(boolean()) -> binary().
bool_label(true) ->
    <<"true"/utf8>>;
bool_label(false) ->
    <<"false"/utf8>>.

-file("src/lightspeed/pipeline/quality.gleam", 154).
?DOC(" Value label.\n").
%% Renders a payload value as `kind:data`, for example `int:42`; the null
%% value renders as plain `null`.
-spec value_label(value()) -> binary().
value_label({string_value, Text}) ->
    <<"string:"/utf8, Text/binary>>;
value_label({int_value, Number}) ->
    <<"int:"/utf8, (erlang:integer_to_binary(Number))/binary>>;
value_label({bool_value, Flag}) ->
    <<"bool:"/utf8, (bool_label(Flag))/binary>>;
value_label(null_value) ->
    <<"null"/utf8>>.

-file("src/lightspeed/pipeline/quality.gleam", 164).
?DOC(" Validation-error label.\n").
%% Renders one validation error as text: the error tag, a colon, then the
%% reason or field name. A type mismatch additionally lists the expected type
%% label and the actual kind label.
-spec validation_error_label(validation_error()) -> binary().
validation_error_label({invalid_schema, Reason}) ->
    <<"invalid_schema:"/utf8, Reason/binary>>;
validation_error_label({missing_field, Name}) ->
    <<"missing_field:"/utf8, Name/binary>>;
validation_error_label({unexpected_field, Name}) ->
    <<"unexpected_field:"/utf8, Name/binary>>;
validation_error_label({type_mismatch, Name, Expected, Actual}) ->
    Expected_label = field_type_label(Expected),
    <<"type_mismatch:"/utf8,
        Name/binary,
        ":expected="/utf8,
        Expected_label/binary,
        ":actual="/utf8,
        Actual/binary>>.

-file("src/lightspeed/pipeline/quality.gleam", 394).
%% Concatenates `Values`, placing `Separator` between consecutive elements.
%% Returns the empty binary for an empty list and a binary equal to the sole
%% element for a singleton list.
%%
%% The pieces are interleaved with `lists:join/2` and flattened by a single
%% `erlang:iolist_to_binary/1` call, instead of rebuilding a binary that
%% re-copies the already-joined tail once per element.
-spec join_with(binary(), list(binary())) -> binary().
join_with(Separator, Values) ->
    erlang:iolist_to_binary(lists:join(Separator, Values)).

-file("src/lightspeed/pipeline/quality.gleam", 180).
?DOC(" Validation-result label.\n").
%% Renders a validation result as text. A pass becomes `valid:` followed by
%% comma-separated `name=value-label` pairs; a failure becomes `invalid:`
%% followed by comma-separated error labels.
-spec validation_result_label(validation_result()) -> binary().
validation_result_label({validation_passed, Normalized}) ->
    Pairs = [
        <<(erlang:element(2, Entry))/binary,
            "="/utf8,
            (value_label(erlang:element(3, Entry)))/binary>>
     || Entry <- Normalized
    ],
    <<"valid:"/utf8, (join_with(<<","/utf8>>, Pairs))/binary>>;
validation_result_label({validation_failed, Errors}) ->
    Labels = [validation_error_label(Error) || Error <- Errors],
    <<"invalid:"/utf8, (join_with(<<","/utf8>>, Labels))/binary>>.

-file("src/lightspeed/pipeline/quality.gleam", 359).
%% Renders one field as `NAME:TYPE:required=BOOL`.
-spec field_signature(field()) -> binary().
field_signature(Field) ->
    Name = erlang:element(2, Field),
    Kind_label = field_type_label(erlang:element(3, Field)),
    Required_label = bool_label(erlang:element(4, Field)),
    <<Name/binary,
        ":"/utf8,
        Kind_label/binary,
        ":required="/utf8,
        Required_label/binary>>.

-file("src/lightspeed/pipeline/quality.gleam", 196).
?DOC(" Stable schema signature.\n").
%% Renders a schema as `name=NAME|version=N|fields=F1,F2,...`, where each
%% field is rendered by `field_signature/1` in the order the schema lists it.
-spec schema_signature(schema()) -> binary().
schema_signature(Schema) ->
    Name = erlang:element(2, Schema),
    Version_label = erlang:integer_to_binary(erlang:element(3, Schema)),
    Field_labels = [field_signature(Field) || Field <- erlang:element(4, Schema)],
    Joined_fields = join_with(<<","/utf8>>, Field_labels),
    <<"name="/utf8,
        Name/binary,
        "|version="/utf8,
        Version_label/binary,
        "|fields="/utf8,
        Joined_fields/binary>>.