Skip to main content

src/aqueduct.erl

-module(aqueduct).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src\\aqueduct.gleam").
-export([empty/0, single/1, from_list/1, from_iterator/2, from_divider/2, repeat/1, next/1, repeat_list/1, prepend/2, repeat_stream/1, collect/1, collect_count/2, collect_until/2, foldl/3, foldl_count/4, join/2, take_count/2, take_until/2, drop/2, map/2, map2/3, filter/2, filter_map/2, each/2, maybe_prepend/2, append/2, maybe_append/2, concat/2, wrap/2, maybe_wrap/2]).
-export_type([step/1, stream/1]).

%% Documentation shims. On OTP 27 and later the macros expand to the native
%% `-moduledoc` / `-doc` attributes. On older releases they expand to
%% `-compile([])` (an empty option list), so the doc string argument is unused.
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.

?MODULEDOC(
    " Main module of the `aqueduct` library. Contains all of the type definitions and public\n"
    " functions for operating on them.\n"
    " \n"
    " The primary use cases of this library are creating [`Stream`s](aqueduct.html#Stream)\n"
    " that emit successive values by executing some side effect (such as reading a file,\n"
    " consuming a message, awaiting user input, etc), and wrapping them in successive\n"
    " operations through [`map`](aqueduct.html#map), [`filter`](aqueduct.html#filter),\n"
    " [`filter_map`](aqueduct.html#filter_map), which are only executed whenever the internal\n"
    " `step` function of a `Stream` is called.\n"
    " \n"
    " It is extremely similar to the [`gleam_yielder`](https://gleam-yielder.hexdocs.pm) library,\n"
    " but the one advantage this library offers over the `yielder`, is that the fundamental data\n"
    " types provided ([`Stream`](aqueduct.html#Stream) and [`Step`](aqueduct.html#Step)) are not\n"
    " `opaque`, and thus the user can freely extend the provided functionality by simply\n"
    " implementing their own functions that operate on `Stream`s.\n"
    " \n"
    " ## Introduction\n"
    " To start, you first need to create a [`Stream`](aqueduct.html#Stream):\n"
    " \n"
    " ```gleam\n"
    " aqueduct.single(\"element\")\n"
    " ```\n"
    " And then do something with it:\n"
    " ```gleam\n"
    " assert aqueduct.next(stream) == Next(Stream(String), \"element\")\n"
    " ```\n"
    " \n"
    " You can also transform the values:\n"
    " ```gleam\n"
    " stream\n"
    " |> aqueduct.map(int.parse)\n"
    " |> aqueduct.collect()\n"
    " // -> [Error(Nil)]\n"
    " ```\n"
    " and filter them:\n"
    " ```gleam\n"
    " stream\n"
    " |> aqueduct.filter_map(int.parse)\n"
    " |> aqueduct.collect()\n"
    " // -> []\n"
    " ```\n"
    " > Note: Take care when filtering infinite `Stream`s, as the `Stream` produced from filtering\n"
    "   traverses the original `Stream` element by element, trying to find the next one that passes\n"
    "   the specified predicate. If *none* of the elements in an infinite `Stream` pass, trying to\n"
    "   get the next element will never terminate.\n"
    " \n"
    " ### Cutting apart a given input\n"
    " For the feature I'm proudest of in this package, take a\n"
    " look at [`from_divider`](aqueduct.html#from_divider).\n"
    " \n"
    " It takes in a `source` and a function that cuts that source into smaller pieces that are\n"
    " then emitted as elements by the resulting `Stream`.\n"
    " \n"
    " In [`mesv`](https://github.com/Octaeon/mesv), I used this function to create a\n"
    " `Stream(String)` of rows in a file by splitting a `String` on newlines that were not\n"
    " somewhere between two 'escapers'.\n"
    " \n"
    " Specifically, the code of the function was thus:\n"
    " ```gleam\n"
    " aqueduct.from_divider(\n"
    "   source,\n"
    "   util.take_until_unescaped(row_separator, not_in: escaper),\n"
    " )\n"
    " ```\n"
    " And the code of the function used to split the source was:\n"
    " ```gleam\n"
    " // util.gleam\n"
    " pub fn take_until_unescaped(\n"
    "   separator el: String,\n"
    "   not_in escaper: String,\n"
    " ) -> fn(String) -> Result(#(String, String), String) {\n"
    "   fn(source: String) {\n"
    "     take_until_unescaped_loop(source, el, escaper, None)\n"
    "     |> result.map(pair.swap)\n"
    "     |> result.map_error(fn(_) { source })\n"
    "   }\n"
    " }\n"
    " \n"
    " fn take_until_unescaped_loop(\n"
    "   from: String,\n"
    "   separator: String,\n"
    "   esc: String,\n"
    "   acc: Option(String),\n"
    " ) -> Result(#(String, String), Nil) {\n"
    "   case string.split_once(from, on: separator) {\n"
    "     Ok(#(head, rest)) -> {\n"
    "       let value = case acc {\n"
    "         Some(str) -> str <> separator <> head\n"
    "         None -> head\n"
    "       }\n"
    "       case count_non_overlapping(in: value, of: esc) % 2 == 0 {\n"
    "         True -> Ok(#(value, rest))\n"
    "         False ->\n"
    "           take_until_unescaped_loop(\n"
    "             rest,\n"
    "             separator,\n"
    "             esc,\n"
    "             Some(value),\n"
    "           )\n"
    "       }\n"
    "     }\n"
    "     Error(Nil) -> Error(Nil)\n"
    "   }\n"
    " }\n"
    " ```\n"
    " \n"
    " For specifics on this function's behaviour, look at its' documentation.\n"
    " \n"
).

%% Result of advancing a stream once: either `{next, Rest, Value}`, carrying the
%% stream to continue from and the emitted value, or the atom `done`.
-type step(DKT) :: {next, stream(DKT), DKT} | done.

%% A stream is a tagged zero-argument fun; calling the fun yields a step.
-type stream(DKU) :: {stream, fun(() -> step(DKU))}.

-file("src\\aqueduct.gleam", 172).
?DOC(
    " Create an empty `Stream` that always returns `Done` for the next step.\n"
    " \n"
    " Since it never returns a `Next` variant of the [`Step`](aqueduct.html#Step) type,\n"
    " it does not have any type specialization, and can thus be used with any type of\n"
    " specialized `Stream`.\n"
).
-spec empty() -> stream(any()).
%% The wrapped fun takes no input and answers `done` on every call.
empty() ->
    AlwaysDone = fun() -> done end,
    {stream, AlwaysDone}.

-file("src\\aqueduct.gleam", 178).
?DOC(" Create a `Stream` with a singular element that it returns once, then finishes.\n").
-spec single(DKX) -> stream(DKX).
%% The step emits `El` and hands back an empty stream as the continuation.
single(El) ->
    EmitOnce = fun() ->
        Exhausted = empty(),
        {next, Exhausted, El}
    end,
    {stream, EmitOnce}.

-file("src\\aqueduct.gleam", 194).
?DOC(
    " Create a `Stream` from a `List` of elements, which it will output in order,\n"
    " after which it will be finished.\n"
    " \n"
    " ## Note\n"
    " This splitting is done lazily - internally, the function encapsulated in the `Stream`\n"
    " deconstructs the provided `List`.\n"
    " \n"
    " If the `List` is empty, it returns `Done` for the next step;\n"
    " \n"
    " If it has at least one element, it returns a `Next` Step, with the `Stream` being a\n"
    " recursive call to this function with the tail of the `List`.\n"
).
-spec from_list(list(DKZ)) -> stream(DKZ).
%% The list is only inspected when the step fun runs, not when the stream is
%% built: each step peels off the head and wraps the tail in a new stream.
from_list(From) ->
    Step = fun() ->
        case From of
            [Item | Remaining] ->
                {next, from_list(Remaining), Item};
            [] ->
                done
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 216).
?DOC(
    " Create a `Stream` from a provided iterating function and an initial value, which will\n"
    " forever return the next element as obtained by calling the iterating function on\n"
    " the current element.\n"
    " \n"
    " The initial value provided is the first element of the returned `Stream`.\n"
    " \n"
    " ## Note\n"
    " The resulting `Stream` will never return `Done` - it will go on forever.\n"
    " \n"
    " Thus, if you try and traverse the entirety of this `Stream` (using functions such as\n"
    " [`collect`](aqueduct.html#collect), [`foldl`](aqueduct.html#foldl), or\n"
    " [`each`](aqueduct.html#each)), that function call will never terminate.\n"
).
-spec from_iterator(fun((DLC) -> DLC), DLC) -> stream(DLC).
%% Each step emits the current value and seeds the continuation with
%% `Iter(Initial)`; `Iter` is only applied when the step fun is invoked.
from_iterator(Iter, Initial) ->
    Step = fun() ->
        Successor = Iter(Initial),
        {next, from_iterator(Iter, Successor), Initial}
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 258).
?DOC(
    " Create a `Stream` from a given `source`, as well as a function that takes `chunk`s out\n"
    " of the source and returns it, diminished in some way (or not).\n"
    " \n"
    " The signature of the `chunk` argument is `fn(a) -> Result(#(a, b), b)`,\n"
    " and the produced stream is `Stream(b)`.\n"
    " \n"
    " The `Result` type here is used to indicate whether the `source` has run out of content or not.\n"
    " - If the `chunk` function returns `Ok(#(a, b))`, it means that there is still some stuff left\n"
    "   in the `source`, so keep going;\n"
    " - If the `chunk` function returns `Error(b)`, it means that the entirety of the `source`\n"
    "   has been consumed, and the `b` is the last element emitted.\n"
    " \n"
    " ## Use cases\n"
    " Due to the type signature of this function, the `source` and output types can be different,\n"
    " making it possible to directly transform the source somehow.\n"
    " \n"
    " In my case, I used this function to create a `Stream` that emits successive rows in a `String`\n"
    " by splitting on `\\n`, as long as they're not inside of a cell wrapped in doublequotes.\n"
    " \n"
    " Or, if you have a producer of a byte stream, the chunk function can at the same time request\n"
    " more, cut it off at appropriate points, and parse it.\n"
    " \n"
    " In another example, this could be used to implement a custom\n"
    " [`from_iterator`](aqueduct.html#from_iterator) function by returning an iterated\n"
    " value with each `chunk`.\n"
    " \n"
    " ## Note\n"
    " It's important to mention that the `source` provided to this function cannot be extracted\n"
    " from within the resulting `Stream`, without consuming the `Stream` in its entirety and then\n"
    " reversing the `chunk` function to collect it.\n"
    " \n"
    " Lastly, also keep in mind that blocking operations called from within the `chunk` function\n"
    " will block the process from which the `Stream` was consumed to get the next value;\n"
    " However, the `Stream` has no idea whether the provided function is blocking or not.\n"
    " \n"
    " As such, if you create a blocking `Stream`, it's up to you as the user to remember that\n"
    " it is blocking.\n"
).
-spec from_divider(DLE, fun((DLE) -> {ok, {DLE, DLF}} | {error, DLF})) -> stream(DLF).
%% `Chunk` is applied to the source each time the stream is stepped.
from_divider(Source, Chunk) ->
    Step = fun() ->
        case Chunk(Source) of
            {error, Final} ->
                %% Source exhausted: emit the final piece, then finish.
                {next, empty(), Final};
            {ok, {Remaining, Piece}} ->
                %% More to cut: continue from the reduced source.
                {next, from_divider(Remaining, Chunk), Piece}
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 272).
?DOC(" Create an infinite `Stream` that only ever emits this single value and never emits `Done`.\n").
-spec repeat(DLJ) -> stream(DLJ).
%% Every step emits `Element` and continues with a fresh copy of this stream.
repeat(Element) ->
    Step = fun() ->
        Again = repeat(Element),
        {next, Again, Element}
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 586).
?DOC(
    " Get the next value from the `Stream`.\n"
    " \n"
    " ## Note\n"
    " If the `Stream` was constructed from a function, that function is called to produce that\n"
    " value, so if that function never terminates, then this function will also never terminate.\n"
).
-spec next(stream(DOB)) -> step(DOB).
%% Unwraps the step fun from the `stream` tuple and invokes it once.
next(Stream) ->
    {stream, Advance} = Stream,
    Advance().

-file("src\\aqueduct.gleam", 295).
-spec repeat_list_loop(list(DLQ), stream(DLQ)) -> stream(DLQ).
%% `Repeat` is the list to cycle through; `Acc` is the stream for the current pass.
repeat_list_loop(Repeat, Acc) ->
    Step = fun() ->
        case next(Acc) of
            done ->
                %% Current pass is exhausted: start a new pass and step it.
                Restarted = repeat_list_loop(Repeat, from_list(Repeat)),
                next(Restarted);
            {next, Remaining, Item} ->
                {next, repeat_list_loop(Repeat, Remaining), Item}
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 288).
?DOC(
    " Create an infinite `Stream` that only ever emits the input `List` in a loop.\n"
    " \n"
    " In most cases it's equivalent to [`repeat_stream`](aqueduct.html#repeat_stream), but with\n"
    " the added constraint of the fact that a `List` cannot be infinite in length, so it's\n"
    " guaranteed that the provided `List` will cycle.\n"
    " \n"
    " ## Note\n"
    " This function returns a `Result` because it doesn't make sense to create an infinite `Stream`\n"
    " by repeating elements from an empty `List`.\n"
    " \n"
    " So this function returns `Error(Nil)` if the provided `List` is length 0.\n"
).
-spec repeat_list(list(DLL)) -> {ok, stream(DLL)} | {error, nil}.
%% An empty list is rejected; any other input is cycled by the loop helper.
repeat_list([]) ->
    {error, nil};
repeat_list(Items) ->
    FirstPass = from_list(Items),
    {ok, repeat_list_loop(Items, FirstPass)}.

-file("src\\aqueduct.gleam", 758).
?DOC(
    " Add the provided element to the **start** of the Stream - it will be the next element shown.\n"
    " \n"
    " Useful when the `Stream` you made is not 'pure' - ie, it executes some side effect in order\n"
    " to obtain the next element. In such a case, it's not guaranteed that after calling\n"
    " [`next`](aqueduct.html#next) on such a `Stream`, and then returning to using the original\n"
    " value would behave the same as expected.\n"
    " \n"
    " Thus, if you wish to write generic functions that operate on `Stream`s and wish to reverse a\n"
    " `Step`, instead of reusing the old value, you should prefer to take the new `Stream` and the\n"
    " value from the `Next` variant of a `Step` and use this function to prepend that value\n"
    " to the new stream.\n"
).
-spec prepend(stream(DPD), DPD) -> stream(DPD).
%% The new stream's first step emits `Element` and continues with `Stream`
%% untouched; `Stream` itself is not stepped here.
prepend(Stream, Element) ->
    Step = fun() -> {next, Stream, Element} end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 328).
-spec repeat_stream_loop(stream(DLZ), stream(DLZ)) -> stream(DLZ).
%% `Repeat` is the stream to replay; `Acc` is the stream for the current pass.
repeat_stream_loop(Repeat, Acc) ->
    Step = fun() ->
        case next(Acc) of
            done ->
                %% Current pass finished: replay from `Repeat` and step that.
                Restarted = repeat_stream_loop(Repeat, Repeat),
                next(Restarted);
            {next, Remaining, Item} ->
                {next, repeat_stream_loop(Repeat, Remaining), Item}
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 318).
?DOC(
    " Transform a finite `Stream` into an infinite one by making it loop forever.\n"
    " \n"
    " It can also be used with infinite `Stream`s, but since this function works by waiting until\n"
    " the stream is finished then replacing the `Done` value with another instance of the input\n"
    " `Stream`, using it on infinite `Stream`s is pointless, since they never return `Done`.\n"
    " \n"
    " ## Note\n"
    " This function returns a `Result` because it's impossible to convert an empty `Stream` into\n"
    " an infinite one, since there are no elements to infinitely repeat.\n"
    " \n"
    " If this check were not in place, passing in an empty `Stream` would create a `Stream`\n"
    " that would never emit the next element, as trying to call the internal function would\n"
    " start an infinite recursive loop.\n"
).
-spec repeat_stream(stream(DLU)) -> {ok, stream(DLU)} | {error, nil}.
%% Steps the input once to detect emptiness. The consumed element is put back
%% with prepend/2 so the replayed stream still starts with it.
repeat_stream(Stream) ->
    case next(Stream) of
        done ->
            {error, nil};
        {next, Remaining, First} ->
            Replay = prepend(Remaining, First),
            {ok, repeat_stream_loop(Replay, empty())}
    end.

-file("src\\aqueduct.gleam", 349).
-spec collect_loop(stream(DMG), list(DMG)) -> list(DMG).
%% Accumulates elements in reverse and flips the list once the stream is done.
collect_loop(Stream, Acc) ->
    case next(Stream) of
        done ->
            lists:reverse(Acc);
        {next, Remaining, Item} ->
            collect_loop(Remaining, [Item | Acc])
    end.

-file("src\\aqueduct.gleam", 345).
?DOC(
    " Consume the provided `Stream` and collect all of the values into a `List`.\n"
    " \n"
    " ## Note\n"
    " As this function attempts to eagerly evaluate all of the elements until it encounters the\n"
    " `Done` next step, if called on an infinite `Stream`, it will never terminate.\n"
).
-spec collect(stream(DMD)) -> list(DMD).
%% Entry point: starts the collecting loop with an empty accumulator.
collect(Stream) ->
    EmptyAcc = [],
    collect_loop(Stream, EmptyAcc).

-file("src\\aqueduct.gleam", 381).
-spec collect_count_loop(stream(DMN), integer(), list(DMN)) -> list(DMN).
%% Stops when the budget reaches zero (or below) or when the stream ends,
%% whichever comes first. The stream is not stepped once the budget is spent.
collect_count_loop(_Stream, Count, Acc) when Count =< 0 ->
    lists:reverse(Acc);
collect_count_loop(Stream, Count, Acc) ->
    case next(Stream) of
        done ->
            lists:reverse(Acc);
        {next, Remaining, Item} ->
            collect_count_loop(Remaining, Count - 1, [Item | Acc])
    end.

-file("src\\aqueduct.gleam", 377).
?DOC(
    " Consume the provided `Stream` and collect `count` number of values into a `List`.\n"
    " \n"
    " Since this function has a built in limit, as long as all of the elements in the `Stream`\n"
    " can be evaluated and terminate, it will also terminate, even if the `Stream` is infinite.\n"
    " \n"
    " Basically, using this function protects you against the infinite length of the `Stream`,\n"
    " but cannot protect you against potentially infinite requirements of the internal\n"
    " function of the `Stream`.\n"
    " \n"
    " Such a `Stream` with infinite requirements of the internal function would be created\n"
    " by something like this:\n"
    " ```gleam\n"
    " aqueduct.from_iterator(fn(num) { num + 2 }, 1)\n"
    " |> aqueduct.filter(fn(num) { num % 2 == 0 })\n"
    " ```\n"
    " This creates a `Stream` that emits odd numbers, while filtering all non-even numbers out.\n"
    " Trying to call [`next`](aqueduct.html#next) on this `Stream` would never terminate\n"
    " (as the [`filter`](aqueduct.html#filter) function would recursively call itself to get\n"
    " the next element ad infinitum), and since this function uses `next` internally,\n"
    " it too would never terminate.\n"
).
-spec collect_count(stream(DMK), integer()) -> list(DMK).
%% Entry point: starts the bounded collecting loop with an empty accumulator.
collect_count(Stream, Count) ->
    EmptyAcc = [],
    collect_count_loop(Stream, Count, EmptyAcc).

-file("src\\aqueduct.gleam", 421).
-spec collect_until_loop(stream(DMU), fun((DMU) -> boolean()), list(DMU)) -> list(DMU).
%% Collects elements until `Stop` accepts one or the stream ends. The element
%% that triggers the stop is not included in the result.
collect_until_loop(Stream, Stop, Acc) ->
    case next(Stream) of
        done ->
            lists:reverse(Acc);
        {next, Remaining, Item} ->
            case Stop(Item) of
                false ->
                    collect_until_loop(Remaining, Stop, [Item | Acc]);
                true ->
                    lists:reverse(Acc)
            end
    end.

-file("src\\aqueduct.gleam", 417).
?DOC(
    " Consume the provided `Stream` and collect elements into a `List` until the provided `stop`\n"
    " function encounters an element for which it returns `True`, or the `Stream` ends.\n"
    " \n"
    " Since this function has a built in limit, as long as all of the elements in the `Stream`\n"
    " can be evaluated and terminate and there exists an element for which the provided `stop`\n"
    " function returns `True`, it will also terminate, even if the `Stream` is infinite.\n"
    " \n"
    " Basically, using this function protects you against the infinite length of the `Stream`,\n"
    " but cannot protect you against potentially infinite requirements of the internal\n"
    " function of the `Stream`.\n"
    " \n"
    " ## Example\n"
    " Such a `Stream` with infinite requirements of the internal function would be created\n"
    " by something like this:\n"
    " ```gleam\n"
    " aqueduct.from_iterator(fn(num) { num + 2 }, 1)\n"
    " |> aqueduct.filter(fn(num) { num % 2 == 0 })\n"
    " ```\n"
    " This creates a `Stream` that emits odd numbers, while filtering all non-even numbers out.\n"
    " Trying to call [`next`](aqueduct.html#next) on this `Stream` would never terminate\n"
    " (as the [`filter`](aqueduct.html#filter) function would recursively call itself to get the\n"
    " next element ad infinitum), and since this function uses `next` internally, it too\n"
    " would never terminate.\n"
).
-spec collect_until(stream(DMR), fun((DMR) -> boolean())) -> list(DMR).
%% Entry point: starts the predicate-bounded loop with an empty accumulator.
collect_until(Stream, Stop) ->
    EmptyAcc = [],
    collect_until_loop(Stream, Stop, EmptyAcc).

-file("src\\aqueduct.gleam", 443).
?DOC(
    " Consume all of the elements of a `Stream` and fold them into a single value,\n"
    " using the provided function and initial accumulator.\n"
    " \n"
    " ## Note\n"
    " Since this function tries to collect all of the elements of the input `Stream`,\n"
    " if the `Stream` is infinite, then it will never terminate.\n"
).
-spec foldl(stream(DMY), fun((DMY, DNA) -> DNA), DNA) -> DNA.
%% `Fun` receives the element first and the accumulator second.
foldl(Stream, Fun, Acc) ->
    case next(Stream) of
        done ->
            Acc;
        {next, Remaining, Item} ->
            Folded = Fun(Item, Acc),
            foldl(Remaining, Fun, Folded)
    end.

-file("src\\aqueduct.gleam", 462).
?DOC(
    " Consume `count` number of the elements of a `Stream` and fold them into a single value,\n"
    " using the provided function and initial accumulator.\n"
    " \n"
    " This function returns a Pair:\n"
    " - The `Stream` with `count` elements removed (if the `Stream` was shorter than `count`,\n"
    "   it is an [`empty`](aqueduct.html#empty) `Stream`)\n"
    " - The folded value.\n"
    " \n"
    " ## Note\n"
    " This function is safe to use with infinite `Stream`s, since it has a built in termination\n"
    " point - it takes only `count` elements, and returns the folded result of those elements.\n"
).
-spec foldl_count(stream(DNB), integer(), fun((DNB, DND) -> DND), DND) -> {stream(DNB),
    DND}.
%% Folds at most `Count` elements. Returns the unconsumed stream with the
%% folded value, or an empty stream if the input ran out first.
foldl_count(Stream, Count, _Fun, Acc) when Count =< 0 ->
    {Stream, Acc};
foldl_count(Stream, Count, Fun, Acc) ->
    case next(Stream) of
        done ->
            {empty(), Acc};
        {next, Remaining, Item} ->
            foldl_count(Remaining, Count - 1, Fun, Fun(Item, Acc))
    end.

-file("src\\aqueduct.gleam", 498).
?DOC(
    " Consume all of the elements of a `Stream` and join them into a single value,\n"
    " using the provided function.\n"
    " \n"
    " If the stream is empty, return `Error(Nil)`, and if there's only a single value,\n"
    " return that. Only if there are two or more elements is the function called.\n"
    " \n"
    " ## Use case\n"
    " I made this function to imitate the output of the `string.join` function, but since unlike\n"
    " `String`s, this function works for an arbitrary element, I can't just return an empty\n"
    " string like `string.join` does.\n"
    " \n"
    " So, under the hood, this function just gets the next step of the `Stream` once, and then\n"
    " calls [`foldl`](aqueduct.html#foldl), with the initial accumulator being the first element\n"
    " of the `Stream`.\n"
    " \n"
    " ## Note\n"
    " Since this function tries to collect all of the elements of the input `Stream`,\n"
    " if the `Stream` is infinite, then it will never terminate.\n"
).
-spec join(stream(DNF), fun((DNF, DNF) -> DNF)) -> {ok, DNF} | {error, nil}.
%% The first element seeds the fold over the rest; an empty stream has no
%% seed, so it yields `{error, nil}`.
join(Stream, Fun) ->
    case next(Stream) of
        done ->
            {error, nil};
        {next, Remaining, Seed} ->
            Joined = foldl(Remaining, Fun, Seed),
            {ok, Joined}
    end.

-file("src\\aqueduct.gleam", 521).
-spec take_count_loop(stream(DNN), integer(), list(DNN)) -> {stream(DNN),
    list(DNN)}.
%% Takes up to `Count` elements and returns them in emission order together
%% with the remaining stream (an empty stream if the input ran out).
take_count_loop(Stream, Count, Acc) when Count =< 0 ->
    {Stream, lists:reverse(Acc)};
take_count_loop(Stream, Count, Acc) ->
    case next(Stream) of
        done ->
            {empty(), lists:reverse(Acc)};
        {next, Remaining, Item} ->
            take_count_loop(Remaining, Count - 1, [Item | Acc])
    end.

-file("src\\aqueduct.gleam", 517).
?DOC(
    " Collect `count` number of elements from the `Stream` into a `List`, and return both the\n"
    " `List` and the `Stream` without the collected elements.\n"
    " \n"
    " Use this function over [`collect_count`](aqueduct.html#collect_count) if you don't\n"
    " want to discard the `Stream` after taking the specified number of elements.\n"
    " \n"
    " ## Note\n"
    " This is done by recursively traversing the `Stream` output while decrementing `count`\n"
    " until it reaches 0.\n"
    " \n"
    " As such, if trying to call [`next`](aqueduct.html#next) doesn't terminate, neither will this function.\n"
).
-spec take_count(stream(DNJ), integer()) -> {stream(DNJ), list(DNJ)}.
%% Entry point: starts the counting loop with an empty accumulator.
take_count(Stream, Count) ->
    EmptyAcc = [],
    take_count_loop(Stream, Count, EmptyAcc).

-file("src\\aqueduct.gleam", 565).
-spec take_until_loop(stream(DNW), fun((DNW) -> boolean()), list(DNW)) -> {stream(DNW),
    list(DNW)}.
%% Gathers elements until `Stop` accepts one or the stream ends.
take_until_loop(Stream, Stop, Acc) ->
    case next(Stream) of
        done ->
            {empty(), lists:reverse(Acc)};
        {next, Remaining, Item} ->
            case Stop(Item) of
                false ->
                    take_until_loop(Remaining, Stop, [Item | Acc]);
                true ->
                    %% Put the matching element back so the returned stream
                    %% starts with it; it is not part of the collected list.
                    {prepend(Remaining, Item), lists:reverse(Acc)}
            end
    end.

-file("src\\aqueduct.gleam", 558).
?DOC(
    " Collect the values inside of the `Stream` into the List until an element evaluates\n"
    " `True` when passed into the `stop` argument.\n"
    " \n"
    " When an element evaluates `True`, the function ends, and returns the List containing all\n"
    " of the previous elements **without** that one, and the `Stream` which **does**\n"
    " contain that element.\n"
    " \n"
    " ## Note\n"
    " This is done by recursively traversing the `Stream` output until we encounter an element\n"
    " that evaluates to `True` when passed to the `stop` function, and then prepending\n"
    " that element to the `Stream`.\n"
    " \n"
    " As such, if your `Stream` is created from a function that executes some side-effect to\n"
    " obtain the next value, if you use this function, the very next iteration of this `Stream`\n"
    " will not execute that operation, since it has the output of that function stored inside of it.\n"
    " \n"
    " The simplest example is if you had a `Stream` that returned the system time whenever you\n"
    " called it. Then, if you for some reason used this function, the very next element you'd\n"
    " get from this `Stream` would be the time in the past, when the function was evaluated\n"
    " inside of this function.\n"
).
-spec take_until(stream(DNS), fun((DNS) -> boolean())) -> {stream(DNS),
    list(DNS)}.
%% Entry point: starts the predicate-bounded loop with an empty accumulator.
take_until(Stream, Stop) ->
    EmptyAcc = [],
    take_until_loop(Stream, Stop, EmptyAcc).

-file("src\\aqueduct.gleam", 598).
?DOC(
    " Drop `count` elements from the beginning of the `Stream`.\n"
    " \n"
    " If there are no elements left (the [`next`](aqueduct.html#next) function returned `Done`),\n"
    " an empty `Stream` is returned.\n"
).
-spec drop(stream(DOE), integer()) -> stream(DOE).
%% Discards one element per remaining count. A count of zero or below returns
%% the stream as-is, and running out of elements yields an empty stream.
drop(Stream, Count) when Count =< 0 ->
    Stream;
drop(Stream, Count) ->
    case next(Stream) of
        done ->
            empty();
        {next, Remaining, _Skipped} ->
            drop(Remaining, Count - 1)
    end.

-file("src\\aqueduct.gleam", 621).
?DOC(
    " Transform the provided `Stream` using the given function.\n"
    " \n"
    " This is done lazily - a new `Stream` is constructed, whose internal function just gets the\n"
    " next [`Step`](aqueduct.html#Step) from the old `Stream`, transforms the emitted element using\n"
    " the provided function, and constructs a new `Stream` by calling itself on the one that was\n"
    " returned inside of the [`Step`](aqueduct.html#Step) type.\n"
    " \n"
    " As such, it is safe to use with infinite `Stream`s, since each step from the input `Stream`\n"
    " that is called (importantly **that terminated** already) is guaranteed to return in finite\n"
    " time... as long as the function provided takes finite time.\n"
).
-spec map(stream(DOH), fun((DOH) -> DOJ)) -> stream(DOJ).
%% Nothing is pulled from `Stream` until the returned stream is stepped; each
%% step pulls one element and applies `Fun` to it.
map(Stream, Fun) ->
    Step = fun() ->
        case next(Stream) of
            done ->
                done;
            {next, Remaining, Item} ->
                MappedRest = map(Remaining, Fun),
                {next, MappedRest, Fun(Item)}
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 640).
?DOC(
    " Combines two `Stream`s into another using the given function.\n"
    " \n"
    " This is done lazily - a new `Stream` is constructed, whose internal function just gets the\n"
    " next [`Step`](aqueduct.html#Step) from the old `Stream`s, and if both return a `Next` variant,\n"
    " return a `Next` step by calling itself recursively on the new streams and call the function\n"
    " on the two values.\n"
    " \n"
    " Thus, if either of the `Stream`s end, the resulting `Stream` ends - in short, the length\n"
    " of the resulting `Stream` is the minimum of the length of the two input `Stream`s.\n"
).
-spec map2(stream(DOL), stream(DON), fun((DOL, DON) -> DOP)) -> stream(DOP).
%% Lazily combines two streams element by element with `Fun`. The result ends
%% as soon as either input ends.
%%
%% `First` is stepped before `Second`. `Second` is only stepped once `First`
%% has produced an element, so when `First` is exhausted no element of
%% `Second` is consumed (and no side effect of its step fun is triggered).
map2(First, Second, Fun) ->
    Step = fun() ->
        case next(First) of
            done ->
                %% Short-circuit: leave `Second` untouched.
                done;
            {next, RestFirst, ElementFirst} ->
                case next(Second) of
                    done ->
                        done;
                    {next, RestSecond, ElementSecond} ->
                        {next,
                            map2(RestFirst, RestSecond, Fun),
                            Fun(ElementFirst, ElementSecond)}
                end
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 674).
?DOC(
    " Transform a `Stream` to only retain values that match the predicate.\n"
    " \n"
    " ## Note\n"
    " This function works by creating a new `Stream`, which for every element that is requested,\n"
    " requests an element from the provided `Stream`.\n"
    " \n"
    " If the returned element passes the function, then it stops there and returns itself;\n"
    " \n"
    " If it does not, then it recursively calls itself until the element passes.\n"
    " \n"
    " As such, if a `Stream` created using this function were to be based on an infinite `Stream`\n"
    " and the values that pass were extremely rare, then calling [`next`](aqueduct.html#next)\n"
    " on such a `Stream` would take a long time.\n"
    " \n"
    " Furthermore, if no elements in the input infinite `Stream` were to pass the predicate,\n"
    " then calling [`next`](aqueduct.html#next) on such a `Stream` will never return.\n"
).
-spec filter(stream(DOR), fun((DOR) -> boolean())) -> stream(DOR).
%% Each step pulls from `Stream` until an element satisfies `Predicate` or the
%% stream ends; rejected elements are skipped within the same step.
filter(Stream, Predicate) ->
    Step = fun() ->
        case next(Stream) of
            done ->
                done;
            {next, Remaining, Item} ->
                case Predicate(Item) of
                    false ->
                        %% Rejected: keep looking in the rest of the stream.
                        next(filter(Remaining, Predicate));
                    true ->
                        {next, filter(Remaining, Predicate), Item}
                end
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 710).
?DOC(
    " Transform a `Stream` to only retain values that are returned in an `Ok` variant\n"
    " of the `Result` type.\n"
    " \n"
    " Basically equivalent to the composition of [`filter`](aqueduct.html#filter) and\n"
    " [`map`](aqueduct.html#map), just a bit more optimized.\n"
    " \n"
    " ## Note\n"
    " This function works by creating a new `Stream`, which for every element that is\n"
    " requested, requests an element from the provided `Stream`.\n"
    " \n"
    " If the returned element passes the function, then it stops there and returns\n"
    " that transformed element and calls itself on the returned `Stream`;\n"
    " \n"
    " If it does not, then it recursively calls itself until the element passes.\n"
    " \n"
    " As such, if a `Stream` created using this function were to be based on an infinite\n"
    " `Stream` and the values that pass were extremely rare, then calling\n"
    " [`next`](aqueduct.html#next) on such a `Stream` would take a long time.\n"
    " \n"
    " Furthermore, if no elements in the input infinite `Stream` were to pass the predicate,\n"
    " then calling [`next`](aqueduct.html#next) on such a `Stream` will never return.\n"
).
-spec filter_map(stream(DOU), fun((DOU) -> {ok, DOW} | {error, any()})) -> stream(DOW).
%% Each step pulls from `Stream` until `Predicate` returns `{ok, Value}` or the
%% stream ends; elements mapped to `{error, _}` are skipped within the same step.
filter_map(Stream, Predicate) ->
    Step = fun() ->
        case next(Stream) of
            done ->
                done;
            {next, Remaining, Item} ->
                case Predicate(Item) of
                    {error, _} ->
                        %% Rejected: keep looking in the rest of the stream.
                        next(filter_map(Remaining, Predicate));
                    {ok, Converted} ->
                        {next, filter_map(Remaining, Predicate), Converted}
                end
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 736).
?DOC(
    " Consume the provided `Stream` and execute a function on each element, eagerly consuming them.\n"
    " \n"
    " Use to execute side-effects based on the values in the `Stream` using functions that cannot\n"
    " fail, when you don't care about the consumed values on their own afterwards.\n"
    " \n"
    " ## Note\n"
    " If the provided `Stream` is infinite, this function will never return, and constantly\n"
    " execute side effects on successive elements emitted by the `Stream`.\n"
).
%% Eagerly drain Stream, calling Fun on every element in order.
%% Fun's return value is discarded; the function returns `nil` once
%% `next/1` reports `done`.
-spec each(stream(DPB), fun((DPB) -> nil)) -> nil.
each(Stream, Fun) ->
    case next(Stream) of
        done ->
            nil;

        {next, Tail, Item} ->
            Fun(Item),
            %% Tail call, so draining the stream does not grow the stack.
            each(Tail, Fun)
    end.

-file("src\\aqueduct.gleam", 767).
?DOC(
    " If the provided element is `Some`, add it to the **start** of the Stream - it will be\n"
    " the next element shown.\n"
    " \n"
    " Otherwise, return the `Stream` unchanged.\n"
).
%% Prepend the wrapped value when Maybe_element is `{some, Value}`;
%% hand Stream back untouched when it is `none`.
-spec maybe_prepend(stream(DPG), gleam@option:option(DPG)) -> stream(DPG).
maybe_prepend(Stream, Maybe_element) ->
    case Maybe_element of
        none ->
            Stream;

        {some, Value} ->
            prepend(Stream, Value)
    end.

-file("src\\aqueduct.gleam", 783).
?DOC(
    " Add the provided element to the **end** of the Stream - it will be the last element shown.\n"
    " \n"
    " ## Note\n"
    " If the stream is infinite, this element will never be returned, since this function only\n"
    " waits until the stream returns `Done`, and then replaces that `Done`\n"
    " with `Next(empty(), element)`.\n"
    " \n"
    " So if the `Stream` never returns `Done`, that element will never be returned.\n"
).
%% Lazily add Element after the last element of Stream.
%%
%% Returns a `{stream, Fun}` tuple. Each time the fun is invoked it pulls
%% one step from Stream:
%%   * `{next, Rest, Current}` - Current is emitted and the remainder is
%%     wrapped again so that Element is still appended at the very end;
%%   * `done` - Element itself is emitted, followed by `empty()`.
%%
%% The recursive call must carry Element (the value to append), not the
%% value just pulled from the stream. Passing the pulled value made the
%% last element of Stream appear twice and dropped Element whenever Stream
%% was non-empty.
-spec append(stream(DPK), DPK) -> stream(DPK).
append(Stream, Element) ->
    {stream, fun() -> case next(Stream) of
                {next, Rest, Current} ->
                    {next, append(Rest, Element), Current};

                done ->
                    {next, empty(), Element}
            end end}.

-file("src\\aqueduct.gleam", 804).
?DOC(
    " If the provided element is `Some`, add it to the **end** of the Stream - it will be the\n"
    " last element shown.\n"
    " \n"
    " Otherwise, return the `Stream` unchanged.\n"
    " \n"
    " ## Note\n"
    " If the stream is infinite, this element will never be returned, since this function only\n"
    " waits until the stream returns `Done`, and then replaces that `Done` with\n"
    " `Next(empty(), element)`.\n"
    " \n"
    " So if the `Stream` never returns done, that element will never be returned.\n"
).
%% Append the wrapped value when Maybe_element is `{some, Value}`;
%% hand Stream back untouched when it is `none`.
-spec maybe_append(stream(DPN), gleam@option:option(DPN)) -> stream(DPN).
maybe_append(Stream, Maybe_element) ->
    case Maybe_element of
        none ->
            Stream;

        {some, Value} ->
            append(Stream, Value)
    end.

-file("src\\aqueduct.gleam", 816).
?DOC(
    " Concatenate two `Stream`s that emit the same type of element together.\n"
    " \n"
    " ## Note\n"
    " If the first `Stream` is infinite, then the second `Stream` will never appear.\n"
).
%% Lazily chain two streams: every element of First is emitted, and once
%% First reports `done` the step is delegated to Second.
-spec concat(stream(DPR), stream(DPR)) -> stream(DPR).
concat(First, Second) ->
    Step = fun() ->
        case next(First) of
            done ->
                %% First is exhausted: continue with Second as-is.
                next(Second);

            {next, Tail, Item} ->
                {next, concat(Tail, Second), Item}
        end
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 830).
?DOC(
    " Helper function to both *prepend* and *append* the provided element to the `Stream`.\n"
    " \n"
    " It is basically equivalent to first calling the [`append`](aqueduct.html#append) function,\n"
    " then the [`prepend`](aqueduct.html#prepend) function.\n"
).
%% Surround Stream with Element: the first step emits Element, and the
%% remainder is Stream with Element appended via `append/2`.
%% The appended stream is only built when the step fun is invoked.
-spec wrap(stream(DPV), DPV) -> stream(DPV).
wrap(Stream, Element) ->
    Step = fun() ->
        {next, append(Stream, Element), Element}
    end,
    {stream, Step}.

-file("src\\aqueduct.gleam", 843).
?DOC(
    " Helper function to both *prepend* and *append* the provided `Option(element)` to the `Stream`.\n"
    " \n"
    " If the provided element is `None`, the stream is returned unmodified.\n"
    " \n"
    " If it's `Some`, it just calls [`wrap`](aqueduct.html#wrap).\n"
    " \n"
    " It is basically equivalent to first calling the [`maybe_append`](aqueduct.html#maybe_append)\n"
    " function, then the [`maybe_prepend`](aqueduct.html#maybe_prepend) function.\n"
).
%% Wrap Stream with the contained value when Maybe_element is
%% `{some, Value}`; hand Stream back untouched when it is `none`.
-spec maybe_wrap(stream(DPY), gleam@option:option(DPY)) -> stream(DPY).
maybe_wrap(Stream, Maybe_element) ->
    case Maybe_element of
        none ->
            Stream;

        {some, Value} ->
            wrap(Stream, Value)
    end.