-module(aqueduct).
-compile([no_auto_import, nowarn_unused_vars, nowarn_unused_function, nowarn_nomatch, inline]).
-define(FILEPATH, "src\\aqueduct.gleam").
-export([empty/0, single/1, from_list/1, from_iterator/2, from_divider/2, repeat/1, next/1, repeat_list/1, prepend/2, repeat_stream/1, collect/1, collect_count/2, collect_until/2, foldl/3, foldl_count/4, join/2, take_count/2, take_until/2, drop/2, map/2, map2/3, filter/2, filter_map/2, each/2, maybe_prepend/2, append/2, maybe_append/2, concat/2, wrap/2, maybe_wrap/2]).
-export_type([step/1, stream/1]).
-if(?OTP_RELEASE >= 27).
-define(MODULEDOC(Str), -moduledoc(Str)).
-define(DOC(Str), -doc(Str)).
-else.
-define(MODULEDOC(Str), -compile([])).
-define(DOC(Str), -compile([])).
-endif.
?MODULEDOC(
" Main module of the `aqueduct` library. Contains all of the type definitions and public\n"
" functions for operating on them.\n"
" \n"
" The primary use cases of this library are creating [`Stream`s](aqueduct.html#Stream)\n"
" that emit successive values by executing some side effect (such as reading a file,\n"
" consuming a message, awaiting user input, etc), and wrapping them in successive\n"
" operations through [`map`](aqueduct.html#map), [`filter`](aqueduct.html#filter),\n"
" [`filter_map`](aqueduct.html#filter_map), which are only executed whenever the internal\n"
" `step` function of a `Stream` is called.\n"
" \n"
" It is extremely similar to the [`gleam_yielder`](https://gleam-yielder.hexdocs.pm) library,\n"
" but the one advantage this library offers over the `yielder`, is that the fundamental data\n"
" types provided ([`Stream`](aqueduct.html#Stream) and [`Step`](aqueduct.html#Step)) are not\n"
" `opaque`, and thus the user can freely extend the provided functionality by simply\n"
" implementing their own functions that operate on `Stream`s.\n"
" \n"
" ## Introduction\n"
" To start, you first need to create a [`Stream`](aqueduct.html#Stream):\n"
" \n"
" ```gleam\n"
" aqueduct.single(\"element\")\n"
" ```\n"
" And then do something with it:\n"
" ```gleam\n"
" assert aqueduct.next(stream) == Next(Stream(String), \"element\")\n"
" ```\n"
" \n"
" You can also transform the values:\n"
" ```gleam\n"
" stream\n"
" |> aqueduct.map(int.parse)\n"
" |> aqueduct.collect()\n"
" // -> [Error(Nil)]\n"
" ```\n"
" and filter them:\n"
" ```gleam\n"
" stream\n"
" |> aqueduct.filter_map(int.parse)\n"
" |> aqueduct.collect()\n"
" // -> []\n"
" ```\n"
" > Note: Take care when filtering infinite `Stream`s, as the `Stream` produced from filtering\n"
" traverses the original `Stream` element by element, trying to find the next one that passes\n"
" the specified predicate. If *none* of the elements in an infinite `Stream` pass, trying to\n"
" get the next element will never terminate.\n"
" \n"
" ### Cutting apart a given input\n"
" For the feature I'm proudest of in this package, take a\n"
" look at [`from_divider`](aqueduct.html#from_divider).\n"
" \n"
" It takes in a `source` and a function that cuts that source into smaller pieces that are\n"
" then emitted as elements by the resulting `Stream`.\n"
" \n"
" In [`mesv`](https://github.com/Octaeon/mesv), I used this function to create a\n"
" `Stream(String)` of rows in a file by splitting a `String` on newlines that were not\n"
" somewhere between two 'escapers'.\n"
" \n"
" Specifically, the code of the function was thus:\n"
" ```gleam\n"
" aqueduct.from_divider(\n"
" source,\n"
" util.take_until_unescaped(row_separator, not_in: escaper),\n"
" )\n"
" ```\n"
" And the code of the function used to split the source was:\n"
" ```gleam\n"
" // util.gleam\n"
" pub fn take_until_unescaped(\n"
" separator el: String,\n"
" not_in escaper: String,\n"
" ) -> fn(String) -> Result(#(String, String), String) {\n"
" fn(source: String) {\n"
" take_until_unescaped_loop(source, el, escaper, None)\n"
" |> result.map(pair.swap)\n"
" |> result.map_error(fn(_) { source })\n"
" }\n"
" }\n"
" \n"
" fn take_until_unescaped_loop(\n"
" from: String,\n"
" separator: String,\n"
" esc: String,\n"
" acc: Option(String),\n"
" ) -> Result(#(String, String), Nil) {\n"
" case string.split_once(from, on: separator) {\n"
" Ok(#(head, rest)) -> {\n"
" let value = case acc {\n"
" Some(str) -> str <> separator <> head\n"
" None -> head\n"
" }\n"
" case count_non_overlapping(in: value, of: esc) % 2 == 0 {\n"
" True -> Ok(#(value, rest))\n"
" False ->\n"
" take_until_unescaped_loop(\n"
" rest,\n"
" separator,\n"
" esc,\n"
" Some(value),\n"
" )\n"
" }\n"
" }\n"
" Error(Nil) -> Error(Nil)\n"
" }\n"
" }\n"
" ```\n"
" \n"
" For specifics on this function's behaviour, look at its' documentation.\n"
" \n"
).
-type step(DKT) :: {next, stream(DKT), DKT} | done.
-type stream(DKU) :: {stream, fun(() -> step(DKU))}.
-file("src\\aqueduct.gleam", 172).
?DOC(
" Create an empty `Stream` that always returns `Done` for the next step.\n"
" \n"
" Since it never returns a `Next` variant of the [`Step`](aqueduct.html#Step) type,\n"
" it does not have any type specialization, and can thus be used with any type of\n"
" specialized `Stream`.\n"
).
-spec empty() -> stream(any()).
empty() ->
{stream, fun() -> done end}.
-file("src\\aqueduct.gleam", 178).
?DOC(" Create a `Stream` with a singular element that it returns once, then finishes.\n").
-spec single(DKX) -> stream(DKX).
single(El) ->
{stream, fun() -> {next, empty(), El} end}.
-file("src\\aqueduct.gleam", 194).
?DOC(
" Create a `Stream` from a `List` of elements, which it will output in order,\n"
" after which it will be finished.\n"
" \n"
" ## Note\n"
" This splitting is done lazily - internally, the function encapsulated in the `Stream`\n"
" deconstructs the provided `List`.\n"
" \n"
" If the `List` is empty, it returs `Done` for the next step;\n"
" \n"
" If it has at least one element, it returns a `Next` Step, with the `Stream` being a\n"
" recursive call to this function with the tail of the `List`.\n"
).
-spec from_list(list(DKZ)) -> stream(DKZ).
from_list(From) ->
{stream, fun() -> case From of
[] ->
done;
[Head | Rest] ->
{next, from_list(Rest), Head}
end end}.
-file("src\\aqueduct.gleam", 216).
?DOC(
" Create a `Stream` from a provided iterating function and an initial value, which will\n"
" forever return the next element as obtained by calling the iterating function on\n"
" the current element.\n"
" \n"
" The initial value provided is the first element of the returned `Stream`.\n"
" \n"
" ## Note\n"
" The resulting `Stream` will never return `Done` - it will go on forever.\n"
" \n"
" Thus, if you try and traverse the entirety of this `Stream` (using functions such as\n"
" [`collect`](aqueduct.html#collect), [`foldl`](aqueduct.html#foldl), or\n"
" [`each`](aqueduct.html#each)), that function call will never terminate.\n"
).
-spec from_iterator(fun((DLC) -> DLC), DLC) -> stream(DLC).
from_iterator(Iter, Initial) ->
{stream, fun() -> {next, from_iterator(Iter, Iter(Initial)), Initial} end}.
-file("src\\aqueduct.gleam", 258).
?DOC(
" Create a `Stream` from a given `source`, as well as a function that takes `chunk`s out\n"
" of the source and returns it, diminished in some way (or not).\n"
" \n"
" The signature of the `chunk` argument is `fn(a) -> Result(#(a, b), b)`,\n"
" and the produced stream is `Stream(b)`.\n"
" \n"
" The `Result` type here is used to indicate wheter the `source` has run out of content or not.\n"
" - If the `chunk` function returns `Ok(#(a, b))`, it means that there is still some stuff left\n"
" in the `source`, so keep going;\n"
" - If the `chunk` function returns `Error(b)`, it means that the entirety of the `source`\n"
" has been consumed, and the `b` is the last element emitted.\n"
" \n"
" ## Use cases\n"
" Due to the type signature of this function, the `source` and output types can be different,\n"
" making it possible to directly transform the source somehow.\n"
" \n"
" In my case, I used this function to create a `Stream` that emits successive rows in a `String`\n"
" by splitting on `\\n`, as long as they're not inside of a cell wrapped in doublequotes.\n"
" \n"
" Or, if you have a producer of a byte stream, the chunk function can at the same time request\n"
" more, cut it off at appropriate points, and parse it.\n"
" \n"
" In another example, this could be used to implement a custom\n"
" [`from_iterator`](aqueduct.html#from_iterator) function by returning an iterated\n"
" value with each `chunk`.\n"
" \n"
" ## Note\n"
" It's important to mention that the `source` provided to this function cannot be extracted\n"
" from within the resulting `Stream`, without consuing the `Stream` in its' entirety and then\n"
" reversing the `chunk` function to collect it.\n"
" \n"
" Lastly, also keep in mind that blocking operations called from within the `chunk` function\n"
" will block the process from which the `Stream` was consumed to get the next value;\n"
" However, the `Stream` has no idea whether the provided function is blocking or not.\n"
" \n"
" As such, if you create a blocking `Stream`, it's up to you as the user to remember that\n"
" it is blocking.\n"
).
-spec from_divider(DLE, fun((DLE) -> {ok, {DLE, DLF}} | {error, DLF})) -> stream(DLF).
from_divider(Source, Chunk) ->
{stream, fun() -> case Chunk(Source) of
{ok, {Source@1, Value}} ->
{next, from_divider(Source@1, Chunk), Value};
{error, Last_value} ->
{next, empty(), Last_value}
end end}.
-file("src\\aqueduct.gleam", 272).
?DOC(" Create an infinite `Stream` that only ever emits this single value and never emits `Done`.\n").
-spec repeat(DLJ) -> stream(DLJ).
repeat(Element) ->
{stream, fun() -> {next, repeat(Element), Element} end}.
-file("src\\aqueduct.gleam", 586).
?DOC(
" Get the next value from the `Stream`.\n"
" \n"
" ## Note\n"
" If the `Stream` was constructed from a function, that function is called to produce that\n"
" value, so if that function never terminates, then this function will also never terminate.\n"
).
-spec next(stream(DOB)) -> step(DOB).
next(Stream) ->
{stream, Step} = Stream,
Step().
-file("src\\aqueduct.gleam", 295).
-spec repeat_list_loop(list(DLQ), stream(DLQ)) -> stream(DLQ).
repeat_list_loop(Repeat, Acc) ->
{stream, fun() -> case next(Acc) of
{next, Rest, Element} ->
{next, repeat_list_loop(Repeat, Rest), Element};
done ->
next(repeat_list_loop(Repeat, from_list(Repeat)))
end end}.
-file("src\\aqueduct.gleam", 288).
?DOC(
" Create an infinite `Stream` that only ever emits the input `List` in a loop.\n"
" \n"
" In most cases it's quivalent to [`repeat_stream`](aqueduct.html#repeat_stream), but with\n"
" the added constraint of the fact that a `List` cannot be infinite in length, so it's\n"
" guaranteed that the provided `List` will cycle.\n"
" \n"
" ## Note\n"
" This function returns a `Result` because it doesn't make sense to create an infinite `Stream`\n"
" by repeating elements from an empty `List`.\n"
" \n"
" So this function returns `Error(Nil)` if the provided `List` is length 0.\n"
).
-spec repeat_list(list(DLL)) -> {ok, stream(DLL)} | {error, nil}.
repeat_list(From) ->
case From of
[] ->
{error, nil};
Non_empty ->
{ok, repeat_list_loop(Non_empty, from_list(Non_empty))}
end.
-file("src\\aqueduct.gleam", 758).
?DOC(
" Add the provided element to the **start** of the Stream - it will be the next element shown.\n"
" \n"
" Useful when the `Stream` you made is not 'pure' - ie, it executes some side effect in order\n"
" to obtain the next element. In such a case, it's not guaranteed that after calling\n"
" [`next`](aqueduct.html#next) on such a `Stream`, and then returning to using the original\n"
" value would behave the same as expected.\n"
" \n"
" Thus, if you wish to write generic functions that operate on `Stream`s and wish to reverse a\n"
" `Step`, instead of reusing the old value, you should prefer to take the new `Stream` and the\n"
" value from the `Next` variant of a `Step` and use this function to prepend that value\n"
" to the new stream.\n"
).
-spec prepend(stream(DPD), DPD) -> stream(DPD).
prepend(Stream, Element) ->
{stream, fun() -> {next, Stream, Element} end}.
-file("src\\aqueduct.gleam", 328).
-spec repeat_stream_loop(stream(DLZ), stream(DLZ)) -> stream(DLZ).
repeat_stream_loop(Repeat, Acc) ->
{stream, fun() -> case next(Acc) of
{next, Stream, Value} ->
{next, repeat_stream_loop(Repeat, Stream), Value};
done ->
next(repeat_stream_loop(Repeat, Repeat))
end end}.
-file("src\\aqueduct.gleam", 318).
?DOC(
" Transform a finite `Stream` into an infinite one by making it loop forever.\n"
" \n"
" It can also be used with infinite `Stream`s, but since this function works by waiting until\n"
" the stream is finished then replacing the `Done` value with another instance of the input\n"
" `Stream`, using it on infinite `Stream`s is pointless, since they never return `Done`.\n"
" \n"
" ## Note\n"
" This function returns a `Result` because it's impossible to convert an empty `Stream` into\n"
" an infinite one, since there are no elements to infinitely repeat.\n"
" \n"
" If this check were not in place, passing in an empty `Stream` would create a `Stream`\n"
" that would never emit the next element, as trying to call the internal function would\n"
" start an infinite recursive loop.\n"
).
-spec repeat_stream(stream(DLU)) -> {ok, stream(DLU)} | {error, nil}.
repeat_stream(Stream) ->
case next(Stream) of
{next, New_stream, Element} ->
{ok, repeat_stream_loop(prepend(New_stream, Element), empty())};
done ->
{error, nil}
end.
-file("src\\aqueduct.gleam", 349).
-spec collect_loop(stream(DMG), list(DMG)) -> list(DMG).
collect_loop(Stream, Acc) ->
case next(Stream) of
{next, Rest, Element} ->
collect_loop(Rest, [Element | Acc]);
done ->
lists:reverse(Acc)
end.
-file("src\\aqueduct.gleam", 345).
?DOC(
" Consume the provided `Stream` and collect all of the values into a `List`.\n"
" \n"
" ## Note\n"
" As this function attempts to eagerly evaluate all of the elements until it encounters the\n"
" `Done` next step, if called on an infinite `Stream`, it will never terminate.\n"
).
-spec collect(stream(DMD)) -> list(DMD).
collect(Stream) ->
collect_loop(Stream, []).
-file("src\\aqueduct.gleam", 381).
-spec collect_count_loop(stream(DMN), integer(), list(DMN)) -> list(DMN).
collect_count_loop(Stream, Count, Acc) ->
case Count of
C when C =< 0 ->
lists:reverse(Acc);
Positive_count ->
case next(Stream) of
{next, Rest, Element} ->
collect_count_loop(
Rest,
Positive_count - 1,
[Element | Acc]
);
done ->
lists:reverse(Acc)
end
end.
-file("src\\aqueduct.gleam", 377).
?DOC(
" Consume the provided `Stream` and collect `count` number of values into a `List`.\n"
" \n"
" Since this function has a built in limit, as long as all of the elements in the `Stream`\n"
" can be evaluated and terminate, it will also terminate, even if the `Stream` is infinite.\n"
" \n"
" Basically, using this function protects you against the infinite length of the `Stream`,\n"
" but cannot protect you against potentially infinite requirements of the internal\n"
" function of the `Stream`.\n"
" \n"
" Such a `Stream` with infinite requirements of the internal function would be created\n"
" by something like this:\n"
" ```gleam\n"
" aqueduct.from_iterator(fn(num) { num + 2 }, 1)\n"
" |> aqueduct.filter(fn(num) { num % 2 == 0 })\n"
" ```\n"
" This creates a `Stream` that emits odd numbers, while filtering all non-even numbers out.\n"
" Trying to call [`next`](aqueduct.html#next) on this `Stream` would never terminate\n"
" (as the [`filter`](aqueduct.html#filter) function would recursively call itself to get\n"
" the next element ad infinitum), and since this function uses `next` internally,\n"
" it too would never terminate.\n"
).
-spec collect_count(stream(DMK), integer()) -> list(DMK).
collect_count(Stream, Count) ->
collect_count_loop(Stream, Count, []).
-file("src\\aqueduct.gleam", 421).
-spec collect_until_loop(stream(DMU), fun((DMU) -> boolean()), list(DMU)) -> list(DMU).
collect_until_loop(Stream, Stop, Acc) ->
case next(Stream) of
{next, Rest, Element} ->
case Stop(Element) of
true ->
lists:reverse(Acc);
false ->
collect_until_loop(Rest, Stop, [Element | Acc])
end;
done ->
lists:reverse(Acc)
end.
-file("src\\aqueduct.gleam", 417).
?DOC(
" Consume the provided `Stream` and collect elements into a `List` until the provided `stop`\n"
" function encounters an element for which it returns `True`, or the `Stream` ends.\n"
" \n"
" Since this function has a built in limit, as long as all of the elements in the `Stream`\n"
" can be evaluated and terminate and there exists an element for which the provided `stop`\n"
" function returns `True`, it will also terminate, even if the `Stream` is infinite.\n"
" \n"
" Basically, using this function protects you against the infinite length of the `Stream`,\n"
" but cannot protect you against potentially infinite requirements of the internal\n"
" function of the `Stream`.\n"
" \n"
" ## Example\n"
" Such a `Stream` with infinite requirements of the internal function would be created\n"
" by something like this:\n"
" ```gleam\n"
" aqueduct.from_iterator(fn(num) { num + 2 }, 1)\n"
" |> aqueduct.filter(fn(num) { num % 2 == 0 })\n"
" ```\n"
" This creates a `Stream` that emits odd numbers, while filtering all non-even numbers out.\n"
" Trying to call [`next`](aqueduct.html#next) on this `Stream` would never terminate\n"
" (as the [`filter`](aqueduct.html#filter) function would recursively call itself to get the\n"
" next element ad infinitum), and since this function uses `next` internally, it too\n"
" would never terminate.\n"
).
-spec collect_until(stream(DMR), fun((DMR) -> boolean())) -> list(DMR).
collect_until(Stream, Stop) ->
collect_until_loop(Stream, Stop, []).
-file("src\\aqueduct.gleam", 443).
?DOC(
" Consume all of the elements of a `Stream` and fold them into a single value,\n"
" using the provided function and initial accumulator.\n"
" \n"
" ## Note\n"
" Since this function tries to collect all of the elements of the input `Stream`,\n"
" if the `Stream` is infinite, then it will never terminate.\n"
).
-spec foldl(stream(DMY), fun((DMY, DNA) -> DNA), DNA) -> DNA.
foldl(Stream, Fun, Acc) ->
case next(Stream) of
{next, Rest, Element} ->
foldl(Rest, Fun, Fun(Element, Acc));
done ->
Acc
end.
-file("src\\aqueduct.gleam", 462).
?DOC(
" Consume `count` number of the elements of a `Stream` and fold them into a single value,\n"
" using the provided function and initial accumulator.\n"
" \n"
" This function returns a Pair:\n"
" - The `Stream` with `count` elements removed (if the `Stream` was shorter than `count`,\n"
" it is an [`empty`](aqueduct.html#empty) `Stream`)\n"
" - The folded value.\n"
" \n"
" ## Note\n"
" This function is safe to use with infinite `Stream`s, since it has a built in termination\n"
" point - it takes only `count` elements, and returns the folded result of those elements.\n"
).
-spec foldl_count(stream(DNB), integer(), fun((DNB, DND) -> DND), DND) -> {stream(DNB),
DND}.
foldl_count(Stream, Count, Fun, Acc) ->
case Count of
C when C =< 0 ->
{Stream, Acc};
Positive_count ->
case next(Stream) of
{next, Rest, Element} ->
foldl_count(
Rest,
Positive_count - 1,
Fun,
Fun(Element, Acc)
);
done ->
{empty(), Acc}
end
end.
-file("src\\aqueduct.gleam", 498).
?DOC(
" Consume all of the elements of a `Stream` and join them into a single value,\n"
" using the provided function.\n"
" \n"
" If the stream is empty, return `Error(Nil)`, and if there's only a single value,\n"
" return that. Only if there are two or more elements is the function called.\n"
" \n"
" ## Use case\n"
" I made this function to imitate the output of the `string.join` function, but since unlike\n"
" `String`s, this function works for an arbitrary element, I can't just return an empty\n"
" string like `string.join` does.\n"
" \n"
" So, under the hood, this function just gets the next step of the `Stream` once, and then\n"
" calls [`foldl`](aqueduct.html#foldl), with the initial accumulator being the first element\n"
" of the `Stream`.\n"
" \n"
" ## Note\n"
" Since this function tries to collect all of the elements of the input `Stream`,\n"
" if the `Stream` is infinite, then it will never terminate.\n"
).
-spec join(stream(DNF), fun((DNF, DNF) -> DNF)) -> {ok, DNF} | {error, nil}.
join(Stream, Fun) ->
case next(Stream) of
{next, Rest, Element} ->
{ok, foldl(Rest, Fun, Element)};
done ->
{error, nil}
end.
-file("src\\aqueduct.gleam", 521).
-spec take_count_loop(stream(DNN), integer(), list(DNN)) -> {stream(DNN),
list(DNN)}.
take_count_loop(Stream, Count, Acc) ->
case Count of
C when C =< 0 ->
{Stream, lists:reverse(Acc)};
Positive_count ->
case next(Stream) of
{next, Rest, Element} ->
take_count_loop(Rest, Positive_count - 1, [Element | Acc]);
done ->
{empty(), lists:reverse(Acc)}
end
end.
-file("src\\aqueduct.gleam", 517).
?DOC(
" Collect `count` number of elements from the `Stream` into a `List`, and return both the\n"
" `List` and the `Stream` without the collected elements.\n"
" \n"
" Use this function over [`collect_count`](aqueduct.html#collect_count) if you don't\n"
" want to discard the `Stream` after taking the specified number of elements.\n"
" \n"
" ## Note\n"
" This is done by recursively traversing the `Stream` output while decrementing `count`\n"
" until it reaches 0.\n"
" \n"
" As such, if trying to call [`next`](aqueduct.html#next) doesn't terminate, neither will this function.\n"
).
-spec take_count(stream(DNJ), integer()) -> {stream(DNJ), list(DNJ)}.
take_count(Stream, Count) ->
take_count_loop(Stream, Count, []).
-file("src\\aqueduct.gleam", 565).
-spec take_until_loop(stream(DNW), fun((DNW) -> boolean()), list(DNW)) -> {stream(DNW),
list(DNW)}.
take_until_loop(Stream, Stop, Acc) ->
case next(Stream) of
{next, Rest, Element} ->
case Stop(Element) of
true ->
{prepend(Rest, Element), lists:reverse(Acc)};
false ->
take_until_loop(Rest, Stop, [Element | Acc])
end;
done ->
{empty(), lists:reverse(Acc)}
end.
-file("src\\aqueduct.gleam", 558).
?DOC(
" Collect the values inside of the `Stream` into the List until an element evaluates\n"
" `True` when passed into the `stop` argument.\n"
" \n"
" When an element evaluates `True`, the function ends, and returns the List containing all\n"
" of the previous elements **without** that one, and the `Stream` which **does**\n"
" contain that element.\n"
" \n"
" ## Note\n"
" This is done by recursively traversing the `Stream` output until we encounter an element\n"
" that evaluates to `True` when passed to the `stop` function, and then prepending\n"
" that element to the `Stream`.\n"
" \n"
" As such, if your `Stream` is created from a function that executes some side-effect to\n"
" obtain the next value, if you use this function, the very next iteration of this `Stream`\n"
" will not execute that operation, since it has the output of that function stored inside of it.\n"
" \n"
" The simplest example is if you had a `Stream` that returned the system time whenever you\n"
" called it. Then, if you for some reason used this function, the very next element you'd\n"
" get from this `Stream` would be the time in the past, when the function was evaluated\n"
" inside of this function.\n"
).
-spec take_until(stream(DNS), fun((DNS) -> boolean())) -> {stream(DNS),
list(DNS)}.
take_until(Stream, Stop) ->
take_until_loop(Stream, Stop, []).
-file("src\\aqueduct.gleam", 598).
?DOC(
" Drop `count` elements from the beginning of the `Stream`.\n"
" \n"
" If there are no elements left (the [`next`](aqueduct.html#next) function returned `Done`),\n"
" an empty `Stream` is returned.\n"
).
-spec drop(stream(DOE), integer()) -> stream(DOE).
drop(Stream, Count) ->
case Count of
C when C =< 0 ->
Stream;
Positive_count ->
case next(Stream) of
{next, Rest, _} ->
drop(Rest, Positive_count - 1);
done ->
empty()
end
end.
-file("src\\aqueduct.gleam", 621).
?DOC(
" Transform the provided `Stream` using the given function.\n"
" \n"
" This is done lazily - a new `Stream` is constructed, whose internal function just gets the\n"
" next [`Step`](aqueduct.html#Step) from the old `Stream`, transforms the emitted element using\n"
" the provided function, and constructs a new `Stream` by calling itself on the one that was\n"
" returned inside of the [`Step`](aqueduct.html#Step) type.\n"
" \n"
" As such, it is safe to use with infinite `Stream`s, since each step from the input `Stream`\n"
" that is called (importantly **that terminated** already) is guaranteed to return in finite\n"
" time... as long as the function provided takes finite time.\n"
).
-spec map(stream(DOH), fun((DOH) -> DOJ)) -> stream(DOJ).
map(Stream, Fun) ->
{stream, fun() -> case next(Stream) of
{next, Rest, Element} ->
{next, map(Rest, Fun), Fun(Element)};
done ->
done
end end}.
-file("src\\aqueduct.gleam", 640).
?DOC(
" Combines two `Stream`s into another using the given function.\n"
" \n"
" This is done lazily - a new `Stream` is constructed, whose internal function just gets the\n"
" next [`Step`](aqueduct.html#Step) from the old `Stream`s, and if both return a `Next` variant,\n"
" return a `Next` step by calling itself recursively on the new streams and call the function\n"
" on the two values.\n"
" \n"
" Thus, if either of the `Stream`s end, the resulting `Stream` ends - in short, the length\n"
" of the resulting `Stream` is the minimum of the length of the two input `Stream`s.\n"
).
-spec map2(stream(DOL), stream(DON), fun((DOL, DON) -> DOP)) -> stream(DOP).
map2(First, Second, Fun) ->
{stream, fun() -> case {next(First), next(Second)} of
{{next, Rest_first, Element_first},
{next, Rest_second, Element_second}} ->
{next,
map2(Rest_first, Rest_second, Fun),
Fun(Element_first, Element_second)};
{_, _} ->
done
end end}.
-file("src\\aqueduct.gleam", 674).
?DOC(
" Transform a `Stream` to only retain values that match the predicate.\n"
" \n"
" ## Note\n"
" This function works by creating a new `Stream`, which for every element that is requested,\n"
" requests an element from the provided `Stream`.\n"
" \n"
" If the returned element passes the function, then it stops there and returns itself;\n"
" \n"
" If it does not, then it recursively calls itself until the element passes.\n"
" \n"
" As such, if a `Stream` created using this function were to be based on an infinite `Stream`\n"
" and the values that pass were extremely rare, then calling [`next`](aqueduct.html#next)\n"
" on such a `Stream` would take a long time.\n"
" \n"
" Furthermore, if no elements in the input infinite `Stream` were to pass the predicate,\n"
" then calling [`next`](aqueduct.html#next) on such a `Stream` will never return.\n"
).
-spec filter(stream(DOR), fun((DOR) -> boolean())) -> stream(DOR).
filter(Stream, Predicate) ->
{stream, fun() -> case next(Stream) of
{next, Rest, Element} ->
case Predicate(Element) of
true ->
{next, filter(Rest, Predicate), Element};
false ->
next(filter(Rest, Predicate))
end;
done ->
done
end end}.
-file("src\\aqueduct.gleam", 710).
?DOC(
" Transform a `Stream` to only retain values that are returned in an `Ok` variant\n"
" of the `Result` type.\n"
" \n"
" Basically equivalent to the composition of [`filter`](aqueduct.html#filter) and\n"
" [`map`](aqueduct.html#map), just a bit more optimized.\n"
" \n"
" ## Note\n"
" This function works by creating a new `Stream`, which for every element that is\n"
" requested, requests an element from the provided `Stream`.\n"
" \n"
" If the returned element passes the function, then it stops there and returns\n"
" that transformed element and calls itself on the returned `Stream`;\n"
" \n"
" If it does not, then it recursively calls itself until the element passes.\n"
" \n"
" As such, if a `Stream` created using this function were to be based on an infinite\n"
" `Stream` and the values that pass were extremely rare, then calling\n"
" [`next`](aqueduct.html#next) on such a `Stream` would take a long time.\n"
" \n"
" Furthermore, if no elements in the input infinite `Stream` were to pass the predicate,\n"
" then calling [`next`](aqueduct.html#next) on such a `Stream` will never return.\n"
).
-spec filter_map(stream(DOU), fun((DOU) -> {ok, DOW} | {error, any()})) -> stream(DOW).
filter_map(Stream, Predicate) ->
{stream, fun() -> case next(Stream) of
{next, Rest, Element} ->
case Predicate(Element) of
{ok, New_element} ->
{next, filter_map(Rest, Predicate), New_element};
{error, _} ->
next(filter_map(Rest, Predicate))
end;
done ->
done
end end}.
-file("src\\aqueduct.gleam", 736).
?DOC(
" Consume the provided `Stream` and execute a function on each element, eagerly consuming them.\n"
" \n"
" Use to execute side-effects based on the values in the `Stream` using functions that cannot\n"
" fail, when you don't care about the consumed values on their own afterwards.\n"
" \n"
" ## Note\n"
" If the provided `Stream` is infinite, this function will never return, and constantly\n"
" execute side effects on successive elements emitted by the `Stream`.\n"
).
-spec each(stream(DPB), fun((DPB) -> nil)) -> nil.
each(Stream, Fun) ->
case next(Stream) of
{next, Rest, Element} ->
Fun(Element),
each(Rest, Fun);
done ->
nil
end.
-file("src\\aqueduct.gleam", 767).
?DOC(
" If the provided element is `Some`, add it to the **start** of the Stream - it will be\n"
" the next element shown.\n"
" \n"
" Otherwise, return the `Stream` unchanged.\n"
).
-spec maybe_prepend(stream(DPG), gleam@option:option(DPG)) -> stream(DPG).
maybe_prepend(Stream, Maybe_element) ->
case Maybe_element of
{some, El} ->
prepend(Stream, El);
none ->
Stream
end.
-file("src\\aqueduct.gleam", 783).
?DOC(
" Add the provided element to the **end** of the Stream - it will be the last element shown.\n"
" \n"
" ## Note\n"
" If the stream is infinite, this element will never be returned, since this function only\n"
" waits until the stream returns `Done`, and then replaces that `Done`\n"
" with `Next(empty(), element)`.\n"
" \n"
" So if the `Stream` never returns `Done`, that element will never be returned.\n"
).
-spec append(stream(DPK), DPK) -> stream(DPK).
append(Stream, Element) ->
{stream, fun() -> case next(Stream) of
{next, Rest, Element@1} ->
{next, append(Rest, Element@1), Element@1};
done ->
{next, empty(), Element}
end end}.
-file("src\\aqueduct.gleam", 804).
?DOC(
" If the provided element is `Some`, it to the **end** of the Stream - it will be the\n"
" last element shown.\n"
" \n"
" Otherwise, return the `Stream` unchanged.\n"
" \n"
" ## Note\n"
" If the stream is infinite, this element will never be returned, since this function only\n"
" waits until the stream returns `Done`, and then replaces that `Done` with\n"
" `Next(empty(), element)`.\n"
" \n"
" So if the `Stream` never returns done, that element will never be returned.\n"
).
-spec maybe_append(stream(DPN), gleam@option:option(DPN)) -> stream(DPN).
maybe_append(Stream, Maybe_element) ->
case Maybe_element of
{some, Element} ->
append(Stream, Element);
none ->
Stream
end.
-file("src\\aqueduct.gleam", 816).
?DOC(
" Concatonate two `Stream`s that emit the same type of element together.\n"
" \n"
" ## Note\n"
" If the first `Stream` is infinite, then the second `Stream` will never appear.\n"
).
-spec concat(stream(DPR), stream(DPR)) -> stream(DPR).
concat(First, Second) ->
{stream, fun() -> case next(First) of
{next, Rest, Element} ->
{next, concat(Rest, Second), Element};
done ->
next(Second)
end end}.
-file("src\\aqueduct.gleam", 830).
?DOC(
" Helper function to both *prepend* and *append* the provided element to the `Stream`.\n"
" \n"
" It is basically equivalent to first calling the [`append`](aqueduct.html#append) function,\n"
" then the [`prepend`](aqueduct.html#prepend) function.\n"
).
-spec wrap(stream(DPV), DPV) -> stream(DPV).
wrap(Stream, Element) ->
{stream, fun() -> {next, append(Stream, Element), Element} end}.
-file("src\\aqueduct.gleam", 843).
?DOC(
" Helper function to both *prepend* and *append* the provided `Option(element)` to the `Stream`.\n"
" \n"
" If the provided element is `None`, the stream is returned unmodified.\n"
" \n"
" If it's `Some`, it just calls [`wrap`](aqueduct.html#wrap).\n"
" \n"
" It is basically equivalent to first calling the [`maybe_append`](aqueduct.html#maybe_append)\n"
" function, then the [`maybe_prepend`](aqueduct.html#maybe_prepend) function.\n"
).
-spec maybe_wrap(stream(DPY), gleam@option:option(DPY)) -> stream(DPY).
maybe_wrap(Stream, Maybe_element) ->
case Maybe_element of
{some, Element} ->
wrap(Stream, Element);
none ->
Stream
end.