//// Main module of the `aqueduct` library. Contains all of the type definitions and public
//// functions for operating on them.
////
//// The primary use cases of this library are creating [`Stream`s](aqueduct.html#Stream)
//// that emit successive values by executing some side effect (such as reading a file,
//// consuming a message, awaiting user input, etc), and wrapping them in successive
//// operations through [`map`](aqueduct.html#map), [`filter`](aqueduct.html#filter),
//// [`filter_map`](aqueduct.html#filter_map), which are only executed whenever the internal
//// `step` function of a `Stream` is called.
////
//// It is extremely similar to the [`gleam_yielder`](https://gleam-yielder.hexdocs.pm) library.
//// The one advantage this library offers over `yielder` is that the fundamental data
//// types it provides ([`Stream`](aqueduct.html#Stream) and [`Step`](aqueduct.html#Step)) are not
//// `opaque`, so the user can freely extend the provided functionality by simply
//// implementing their own functions that operate on `Stream`s.
////
//// ## Introduction
//// To start, you first need to create a [`Stream`](aqueduct.html#Stream):
////
//// ```gleam
//// let stream = aqueduct.single("element")
//// ```
//// And then do something with it:
//// ```gleam
//// let assert aqueduct.Next(_rest, "element") = aqueduct.next(stream)
//// ```
////
//// You can also transform the values:
//// ```gleam
//// stream
//// |> aqueduct.map(int.parse)
//// |> aqueduct.collect()
//// // -> [Error(Nil)]
//// ```
//// and filter them:
//// ```gleam
//// stream
//// |> aqueduct.filter_map(int.parse)
//// |> aqueduct.collect()
//// // -> []
//// ```
//// > Note: Take care when filtering infinite `Stream`s, as the `Stream` produced from filtering
//// traverses the original `Stream` element by element, trying to find the next one that passes
//// the specified predicate. If *none* of the elements in an infinite `Stream` pass, trying to
//// get the next element will never terminate.
////
//// ### Cutting apart a given input
//// For the feature I'm proudest of in this package, take a
//// look at [`from_divider`](aqueduct.html#from_divider).
////
//// It takes in a `source` and a function that cuts that source into smaller pieces that are
//// then emitted as elements by the resulting `Stream`.
////
//// In [`mesv`](https://github.com/Octaeon/mesv), I used this function to create a
//// `Stream(String)` of rows in a file by splitting a `String` on newlines that were not
//// somewhere between two 'escapers'.
////
//// Specifically, the code of the function was thus:
//// ```gleam
//// aqueduct.from_divider(
//// source,
//// util.take_until_unescaped(row_separator, not_in: escaper),
//// )
//// ```
//// And the code of the function used to split the source was:
//// ```gleam
//// // util.gleam
//// pub fn take_until_unescaped(
//// separator el: String,
//// not_in escaper: String,
//// ) -> fn(String) -> Result(#(String, String), String) {
//// fn(source: String) {
//// take_until_unescaped_loop(source, el, escaper, None)
//// |> result.map(pair.swap)
//// |> result.map_error(fn(_) { source })
//// }
//// }
////
//// fn take_until_unescaped_loop(
//// from: String,
//// separator: String,
//// esc: String,
//// acc: Option(String),
//// ) -> Result(#(String, String), Nil) {
//// case string.split_once(from, on: separator) {
//// Ok(#(head, rest)) -> {
//// let value = case acc {
//// Some(str) -> str <> separator <> head
//// None -> head
//// }
//// case count_non_overlapping(in: value, of: esc) % 2 == 0 {
//// True -> Ok(#(value, rest))
//// False ->
//// take_until_unescaped_loop(
//// rest,
//// separator,
//// esc,
//// Some(value),
//// )
//// }
//// }
//// Error(Nil) -> Error(Nil)
//// }
//// }
//// ```
////
//// For specifics on this function's behaviour, look at its documentation.
////
import gleam/list
import gleam/option.{type Option, None, Some}
// ==== Public Types ====
/// The outcome of asking a `Stream` for its next element, as returned by
/// [`next`](aqueduct.html#next).
///
/// A step is either `Next`, carrying the emitted element together with the `Stream` to
/// continue from, or `Done`, indicating that the `Stream` is finished.
///
/// It is normally produced by the [`next`](aqueduct.html#next) function or by calling the
/// internal `step` function after deconstructing a `Stream`, but since this type is not
/// opaque, there's nothing stopping you from constructing it yourself.
///
pub type Step(a) {
/// The `Stream` emitted an element. The first field is the successive `Stream` to
/// continue from; the second field is the emitted element.
///
Next(Stream(a), a)
/// The `Stream` has terminated and has no more elements to emit.
///
Done
}
/// Represents a lazily evaluated `Stream` of some type of data.
///
/// It does not care specifically *how* the next element is obtained - it merely wraps
/// a zero-argument function that, when called, produces a [`Step`](aqueduct.html#Step).
/// What this function does internally depends on how the `Stream` was defined.
///
/// Furthermore, a `Stream` can be finite or infinite, and currently there's no good way to
/// check whether a given `Stream` terminates.
///
/// To create it, either use one of the provided constructors:
/// - [`empty`](aqueduct.html#empty)
/// - [`single`](aqueduct.html#single)
/// - [`from_list`](aqueduct.html#from_list)
/// - [`from_iterator`](aqueduct.html#from_iterator)
/// - [`from_divider`](aqueduct.html#from_divider)
/// - [`repeat`](aqueduct.html#repeat)
/// - [`repeat_list`](aqueduct.html#repeat_list)
///
/// Or simply build it yourself using the `Stream` constructor below.
///
pub type Stream(a) {
/// Create a `Stream` from scratch by passing in a function that returns a `Step`.
///
/// The function takes no arguments and is only called when the next step is requested
/// (for example through [`next`](aqueduct.html#next)).
///
/// Most useful to combine with recursive functions that call themselves to
/// construct the next `Stream`.
///
Stream(fn() -> Step(a))
}
// ==== Public API ====
// => Constructors
/// Build a `Stream` that holds no elements: every request for its next step yields `Done`.
///
/// Because it never produces a `Next` variant of the [`Step`](aqueduct.html#Step) type,
/// its element type stays fully generic, so it can stand in wherever a `Stream` of any
/// element type is expected.
///
pub fn empty() -> Stream(a) {
  let finished = fn() { Done }
  Stream(finished)
}
/// Build a `Stream` holding exactly one element: it emits `el` once and is finished
/// afterwards.
///
pub fn single(el: a) -> Stream(a) {
  // An empty `Stream` with `el` placed in front of it emits `el`, then `Done`.
  empty()
  |> prepend(el)
}
/// Turn a `List` into a `Stream` that emits the list's elements in order and then finishes.
///
/// ## Note
/// The `List` is taken apart lazily - the function wrapped by the returned `Stream` only
/// deconstructs the `List` when the next step is requested.
///
/// If the `List` is empty, that step is `Done`;
///
/// If it has at least one element, that step is a `Next` carrying the head of the `List`,
/// together with a `Stream` produced by calling this function again on the tail.
///
pub fn from_list(from: List(a)) -> Stream(a) {
  let step = fn() {
    case from {
      [first, ..remaining] -> {
        let tail_stream = from_list(remaining)
        Next(tail_stream, first)
      }
      [] -> Done
    }
  }
  Stream(step)
}
/// Build an endless `Stream` from a starting value and an iterating function: each emitted
/// element is the result of applying the iterating function to the previous one.
///
/// The `initial` value itself is the first element of the returned `Stream`.
///
/// The iterating function is applied when a step is requested, in order to compute the
/// value the *following* `Stream` starts from - so it always runs one element ahead of
/// the element being emitted.
///
/// ## Note
/// The resulting `Stream` never returns `Done` - it goes on forever.
///
/// Thus, if you try to traverse the entirety of this `Stream` (using functions such as
/// [`collect`](aqueduct.html#collect), [`foldl`](aqueduct.html#foldl), or
/// [`each`](aqueduct.html#each)), that function call will never terminate.
///
pub fn from_iterator(iter: fn(a) -> a, initial: a) -> Stream(a) {
  let step = fn() {
    let successor = iter(initial)
    Next(from_iterator(iter, successor), initial)
  }
  Stream(step)
}
/// Build a `Stream` out of a `source` and a `chunk` function that cuts one piece off the
/// source at a time, returning the piece along with the (possibly diminished) source.
///
/// The signature of the `chunk` argument is `fn(a) -> Result(#(a, b), b)`,
/// and the produced stream is `Stream(b)`.
///
/// The `Result` type is used to indicate whether the `source` has run out of content:
/// - If `chunk` returns `Ok(#(a, b))`, `b` is emitted and the `Stream` continues by calling
///   `chunk` again on the returned `a` when the following element is requested;
/// - If `chunk` returns `Error(b)`, the entirety of the `source` has been consumed: `b` is
///   emitted as the last element, after which the `Stream` is finished.
///
/// ## Use cases
/// Since the `source` and output types may differ, the `chunk` function can transform the
/// source while cutting it apart.
///
/// In my case, I used this function to create a `Stream` that emits successive rows of a
/// `String` by splitting on `\n`, as long as the newline is not inside of a cell wrapped in
/// double quotes.
///
/// Or, if you have a producer of a byte stream, the chunk function can at the same time
/// request more, cut it off at appropriate points, and parse it.
///
/// As another example, this could be used to implement a custom
/// [`from_iterator`](aqueduct.html#from_iterator) function by returning an iterated
/// value with each `chunk`.
///
/// ## Note
/// The `source` provided to this function cannot be extracted from within the resulting
/// `Stream` without consuming the `Stream` in its entirety and then reversing the `chunk`
/// function to reassemble it.
///
/// Also keep in mind that blocking operations called from within the `chunk` function
/// will block the process that requests the next value from the `Stream`;
/// the `Stream` itself has no idea whether the provided function is blocking or not.
///
/// As such, if you create a blocking `Stream`, it's up to you as the user to remember that
/// it is blocking.
///
pub fn from_divider(
  source: a,
  chunk: fn(a) -> Result(#(a, b), b),
) -> Stream(b) {
  let step = fn() {
    case chunk(source) {
      // The source is exhausted: emit the final piece and stop afterwards.
      Error(final_piece) -> Next(empty(), final_piece)
      // More content remains: emit this piece and keep dividing what is left.
      Ok(#(remainder, piece)) -> {
        let continuation = from_divider(remainder, chunk)
        Next(continuation, piece)
      }
    }
  }
  Stream(step)
}
/// Build an endless `Stream` that emits the given value over and over, never returning `Done`.
///
pub fn repeat(element: a) -> Stream(a) {
  // Iterating with the identity function reproduces the same value forever.
  from_iterator(fn(same) { same }, element)
}
/// Build an endless `Stream` that emits the elements of the input `List` in order, starting
/// over from the beginning every time the end of the `List` is reached.
///
/// In most cases it's equivalent to [`repeat_stream`](aqueduct.html#repeat_stream), but with
/// the added guarantee that, since a `List` cannot be infinite in length, the provided
/// `List` is certain to cycle.
///
/// ## Note
/// This function returns a `Result` because it doesn't make sense to create an infinite
/// `Stream` by repeating the elements of an empty `List`.
///
/// So `Error(Nil)` is returned if the provided `List` has no elements, and `Ok` with the
/// looping `Stream` otherwise.
///
pub fn repeat_list(from: List(a)) -> Result(Stream(a), Nil) {
  case from {
    [_, ..] -> {
      let first_pass = from_list(from)
      Ok(repeat_list_loop(from, first_pass))
    }
    [] -> Error(Nil)
  }
}

// Emits whatever is left of the current pass over the list; once that pass is exhausted,
// starts a fresh pass over the full list and returns its first step.
fn repeat_list_loop(cycle: List(a), current: Stream(a)) -> Stream(a) {
  let step = fn() {
    case next(current) {
      Done -> {
        let restarted = repeat_list_loop(cycle, from_list(cycle))
        next(restarted)
      }
      Next(remaining, item) -> Next(repeat_list_loop(cycle, remaining), item)
    }
  }
  Stream(step)
}
/// Turn a finite `Stream` into an infinite one by making it start over whenever it ends.
///
/// It can also be called on infinite `Stream`s, but that is pointless: this function works
/// by waiting for the `Stream` to return `Done` and continuing with another pass over the
/// input `Stream` instead, and an infinite `Stream` never returns `Done`.
///
/// To check that the input is not empty, its first step is requested immediately when this
/// function is called. The element obtained that way is put back in front of the rest of
/// the `Stream`, and that combined `Stream` is what gets repeated.
///
/// ## Note
/// This function returns a `Result` because it's impossible to convert an empty `Stream`
/// into an infinite one, since there are no elements to repeat.
///
/// If this check were not in place, passing in an empty `Stream` would create a `Stream`
/// that could never emit its next element, as calling its internal function would
/// start an infinite recursive loop.
///
pub fn repeat_stream(stream: Stream(a)) -> Result(Stream(a), Nil) {
  case next(stream) {
    // Nothing to repeat: handing an empty `Stream` to `repeat_stream_loop` would create
    // a `Stream` that can never return its next value.
    Done -> Error(Nil)
    Next(tail, head) -> {
      // Put the element we just pulled back in front, so every pass includes it.
      let whole = prepend(tail, head)
      Ok(repeat_stream_loop(whole, empty()))
    }
  }
}

// Emits whatever is left of the current pass; once that pass returns `Done`, starts a
// new pass over `cycle` and returns its first step.
fn repeat_stream_loop(cycle: Stream(a), current: Stream(a)) -> Stream(a) {
  let step = fn() {
    case next(current) {
      Done -> next(repeat_stream_loop(cycle, cycle))
      Next(remaining, item) -> Next(repeat_stream_loop(cycle, remaining), item)
    }
  }
  Stream(step)
}
// => Destructors (Getters)
/// Run the provided `Stream` to completion and gather every emitted value into a `List`,
/// in the order the values were emitted.
///
/// ## Note
/// This function eagerly evaluates elements until the `Stream` returns `Done`, so if it is
/// called on an infinite `Stream`, it will never terminate.
///
pub fn collect(stream: Stream(a)) -> List(a) {
  // Folding prepends each element, producing the values in reverse emission order,
  // so the result is reversed once at the end.
  stream
  |> foldl(fn(item, gathered) { [item, ..gathered] }, [])
  |> list.reverse
}
/// Gather at most `count` values from the start of the provided `Stream` into a `List`.
///
/// Fewer than `count` values are returned if the `Stream` ends early, and an empty `List`
/// is returned if `count` is zero or negative.
///
/// Since this function has a built in limit, as long as each requested element of the
/// `Stream` can be evaluated in finite time, it terminates even if the `Stream` is infinite.
///
/// In other words, this function protects you against the infinite length of a `Stream`,
/// but it cannot protect you against a `Stream` whose internal function never returns.
///
/// Such a `Stream` would be created by something like this:
/// ```gleam
/// aqueduct.from_iterator(fn(num) { num + 2 }, 1)
/// |> aqueduct.filter(fn(num) { num % 2 == 0 })
/// ```
/// This takes a `Stream` of odd numbers and keeps only the even ones - of which there are
/// none. Calling [`next`](aqueduct.html#next) on this `Stream` never terminates
/// (as [`filter`](aqueduct.html#filter) keeps requesting elements ad infinitum while
/// looking for one that passes), and since this function relies on `next`,
/// it would never terminate either.
///
pub fn collect_count(stream: Stream(a), count: Int) -> List(a) {
  // Same traversal as `take_count`; the leftover `Stream` is simply discarded.
  let #(_leftover, taken) = take_count(stream, count)
  taken
}
/// Gather values from the provided `Stream` into a `List` until the `stop` function returns
/// `True` for an element, or the `Stream` ends - whichever comes first.
///
/// The element for which `stop` returned `True` is **not** included in the result.
///
/// As long as each requested element of the `Stream` can be evaluated in finite time and
/// some element makes `stop` return `True`, this function terminates even if the `Stream`
/// is infinite.
///
/// In other words, this function can protect you against the infinite length of a `Stream`,
/// but it cannot protect you against a `Stream` whose internal function never returns.
///
/// ## Example
/// Such a `Stream` would be created by something like this:
/// ```gleam
/// aqueduct.from_iterator(fn(num) { num + 2 }, 1)
/// |> aqueduct.filter(fn(num) { num % 2 == 0 })
/// ```
/// This takes a `Stream` of odd numbers and keeps only the even ones - of which there are
/// none. Calling [`next`](aqueduct.html#next) on this `Stream` never terminates
/// (as [`filter`](aqueduct.html#filter) keeps requesting elements ad infinitum while
/// looking for one that passes), and since this function relies on `next`, it would never
/// terminate either.
///
pub fn collect_until(stream: Stream(a), stop: fn(a) -> Bool) -> List(a) {
  collect_until_loop(stream, stop, [])
}

// Accumulates elements in reverse order and reverses them once, on the way out.
fn collect_until_loop(
  remaining: Stream(a),
  should_stop: fn(a) -> Bool,
  gathered: List(a),
) -> List(a) {
  case next(remaining) {
    Done -> list.reverse(gathered)
    Next(tail, item) -> {
      let halt = should_stop(item)
      case halt {
        False -> collect_until_loop(tail, should_stop, [item, ..gathered])
        True -> list.reverse(gathered)
      }
    }
  }
}
/// Run the `Stream` to completion, combining its elements into a single value.
///
/// For every emitted element, `fun` is called with that element as its first argument and
/// the current accumulator as its second; its result becomes the new accumulator. When the
/// `Stream` returns `Done`, the accumulator is returned. For an empty `Stream` that is
/// simply the initial `acc`.
///
/// ## Note
/// Since this function consumes every element of the input `Stream`,
/// it will never terminate if the `Stream` is infinite.
///
pub fn foldl(stream: Stream(a), fun: fn(a, b) -> b, acc: b) -> b {
  case next(stream) {
    Done -> acc
    Next(remaining, item) -> {
      let folded = fun(item, acc)
      foldl(remaining, fun, folded)
    }
  }
}
/// Fold at most `count` elements from the start of a `Stream` into a single value,
/// using the provided function and initial accumulator.
///
/// For every consumed element, `fun` is called with that element as its first argument and
/// the current accumulator as its second.
///
/// This function returns a pair:
/// - The `Stream` with the consumed elements removed (if the `Stream` ended before `count`
///   elements were consumed, this is an [`empty`](aqueduct.html#empty) `Stream`);
/// - The folded value.
///
/// If `count` is zero or negative, the `Stream` and the accumulator are returned untouched.
///
/// ## Note
/// This function is safe to use with infinite `Stream`s, since it has a built in
/// termination point - it consumes no more than `count` elements.
///
pub fn foldl_count(
  stream: Stream(a),
  count: Int,
  fun: fn(a, b) -> b,
  acc: b,
) -> #(Stream(a), b) {
  case count > 0 {
    False -> #(stream, acc)
    True ->
      case next(stream) {
        Done -> #(empty(), acc)
        Next(remaining, item) ->
          foldl_count(remaining, count - 1, fun, fun(item, acc))
      }
  }
}
/// Run the `Stream` to completion, joining its elements into a single value using the
/// provided function.
///
/// - An empty `Stream` gives `Error(Nil)`;
/// - A `Stream` with a single element gives that element, without calling the function;
/// - Otherwise the first element serves as the initial accumulator, and the function is
///   called for each following element, with that element as its first argument and the
///   accumulator as its second.
///
/// ## Use case
/// This function imitates `string.join`, but since it works for arbitrary element types,
/// it cannot fall back to an empty string for an empty input like `string.join` does -
/// hence the `Result`.
///
/// Under the hood, it requests the first step of the `Stream` and then hands the rest over
/// to [`foldl`](aqueduct.html#foldl).
///
/// ## Note
/// Since this function consumes every element of the input `Stream`,
/// it will never terminate if the `Stream` is infinite.
///
pub fn join(stream: Stream(a), fun: fn(a, a) -> a) -> Result(a, Nil) {
  case next(stream) {
    Done -> Error(Nil)
    Next(remaining, first) ->
      remaining
      |> foldl(fun, first)
      |> Ok
  }
}
/// Take at most `count` elements from the start of the `Stream`, returning the `Stream`
/// that remains after them together with a `List` of the taken elements, in emission order.
///
/// Use this function over [`collect_count`](aqueduct.html#collect_count) if you don't
/// want to discard the `Stream` after taking the specified number of elements.
///
/// If the `Stream` ends before `count` elements were taken, the returned `Stream` is an
/// [`empty`](aqueduct.html#empty) one. If `count` is zero or negative, the `Stream` is
/// returned untouched along with an empty `List`.
///
/// ## Note
/// This is done by recursively requesting steps from the `Stream` while counting down from
/// `count` to 0.
///
/// As such, if a call to [`next`](aqueduct.html#next) doesn't terminate, neither will this
/// function.
///
pub fn take_count(stream: Stream(a), count: Int) -> #(Stream(a), List(a)) {
  take_count_loop(stream, count, [])
}

// Accumulates elements in reverse order and reverses them once, on the way out.
fn take_count_loop(
  remaining: Stream(a),
  left: Int,
  taken: List(a),
) -> #(Stream(a), List(a)) {
  case left > 0 {
    False -> #(remaining, list.reverse(taken))
    True ->
      case next(remaining) {
        Done -> #(empty(), list.reverse(taken))
        Next(tail, item) -> take_count_loop(tail, left - 1, [item, ..taken])
      }
  }
}
/// Take elements from the start of the `Stream` until one of them makes the `stop` function
/// return `True`, returning the remaining `Stream` together with a `List` of the taken
/// elements.
///
/// The element that made `stop` return `True` is **not** part of the returned `List`;
/// instead, it is the first element of the returned `Stream`.
///
/// If the `Stream` ends before `stop` returns `True`, the returned `Stream` is an
/// [`empty`](aqueduct.html#empty) one and the `List` holds every element.
///
/// ## Note
/// This is done by recursively requesting steps from the `Stream` until an element makes
/// `stop` return `True`, and then prepending that already obtained element to the rest of
/// the `Stream`.
///
/// As such, if your `Stream` executes some side effect to obtain its next value, the very
/// next step of the returned `Stream` will not execute it, since the value produced earlier
/// is stored inside of the `Stream`.
///
/// The simplest example is a `Stream` that returns the system time whenever it is asked for
/// a value. After using this function, the very next element of the returned `Stream` would
/// be a time in the past - the moment the element was evaluated inside of this function.
///
pub fn take_until(
  stream: Stream(a),
  stop: fn(a) -> Bool,
) -> #(Stream(a), List(a)) {
  take_until_loop(stream, stop, [])
}

// Accumulates elements in reverse order and reverses them once, on the way out.
fn take_until_loop(
  remaining: Stream(a),
  should_stop: fn(a) -> Bool,
  taken: List(a),
) -> #(Stream(a), List(a)) {
  case next(remaining) {
    Done -> #(empty(), list.reverse(taken))
    Next(tail, item) -> {
      let halt = should_stop(item)
      case halt {
        False -> take_until_loop(tail, should_stop, [item, ..taken])
        // Hand the stopping element back by placing it in front of the remaining `Stream`.
        True -> #(prepend(tail, item), list.reverse(taken))
      }
    }
  }
}
/// Request the next [`Step`](aqueduct.html#Step) of the `Stream`.
///
/// ## Note
/// This calls the function wrapped inside of the `Stream`, so if that function never
/// terminates, neither will this one.
///
pub fn next(stream: Stream(a)) -> Step(a) {
  case stream {
    Stream(advance) -> advance()
  }
}
// => Transformations
/// Discard the first `count` elements of the `Stream` and return what remains.
///
/// If the `Stream` runs out before `count` elements were discarded (a call to
/// [`next`](aqueduct.html#next) returned `Done`), an empty `Stream` is returned.
/// If `count` is zero or negative, the `Stream` is returned untouched.
///
/// ## Note
/// The discarded elements are requested right away, when this function is called - not
/// lazily, when the returned `Stream` is first used.
///
pub fn drop(stream: Stream(a), count: Int) -> Stream(a) {
  case count > 0 {
    False -> stream
    True ->
      case next(stream) {
        Done -> empty()
        Next(remaining, _) -> drop(remaining, count - 1)
      }
  }
}
/// Build a `Stream` that emits the elements of the provided `Stream`, each transformed by
/// the given function.
///
/// This is done lazily - the internal function of the new `Stream` requests the next
/// [`Step`](aqueduct.html#Step) of the old `Stream`, applies the provided function to the
/// emitted element, and continues with the result of calling `map` on the `Stream` carried
/// inside of that [`Step`](aqueduct.html#Step).
///
/// As such, it is safe to use with infinite `Stream`s: every step of the new `Stream`
/// requests exactly one step of the input `Stream`, so it returns in finite time whenever
/// that step does... as long as the provided function takes finite time.
///
pub fn map(stream: Stream(a), fun: fn(a) -> b) -> Stream(b) {
  let step = fn() {
    case next(stream) {
      Done -> Done
      Next(remaining, item) -> {
        let mapped_rest = map(remaining, fun)
        Next(mapped_rest, fun(item))
      }
    }
  }
  Stream(step)
}
/// Combine two `Stream`s into one by applying the given function to their elements pairwise.
///
/// This is done lazily - the internal function of the new `Stream` requests the next
/// [`Step`](aqueduct.html#Step) of both input `Stream`s (first of `first`, then of
/// `second`). If both are `Next`, it emits the result of the function applied to the two
/// elements and continues with `map2` of the two remaining `Stream`s.
///
/// If either of the `Stream`s ends, the resulting `Stream` ends - in short, the length
/// of the resulting `Stream` is the minimum of the lengths of the two input `Stream`s.
///
/// ## Note
/// A step is always requested from both `Stream`s, even when one of them turns out to be
/// finished - an element emitted by the other one in that step is discarded.
///
pub fn map2(
  first: Stream(a),
  second: Stream(b),
  fun: fn(a, b) -> c,
) -> Stream(c) {
  let step = fn() {
    let left = next(first)
    let right = next(second)
    case left, right {
      Next(left_rest, left_item), Next(right_rest, right_item) -> {
        let paired_rest = map2(left_rest, right_rest, fun)
        Next(paired_rest, fun(left_item, right_item))
      }
      _, _ -> Done
    }
  }
  Stream(step)
}
/// Build a `Stream` that only emits those elements of the provided `Stream` for which the
/// predicate returns `True`.
///
/// ## Note
/// The new `Stream` works by requesting an element from the provided `Stream` whenever one
/// is requested from it.
///
/// If that element passes the predicate, it is emitted;
///
/// If it does not, the filtered remainder of the `Stream` is asked instead, repeating the
/// process until some element passes or the `Stream` ends.
///
/// As such, if the underlying `Stream` is infinite and passing values are extremely rare,
/// calling [`next`](aqueduct.html#next) on the filtered `Stream` can take a long time.
///
/// Furthermore, if no element of an infinite input `Stream` passes the predicate,
/// calling [`next`](aqueduct.html#next) on the filtered `Stream` will never return.
///
pub fn filter(stream: Stream(a), predicate: fn(a) -> Bool) -> Stream(a) {
  let step = fn() {
    case next(stream) {
      Done -> Done
      Next(remaining, item) -> {
        let filtered_rest = filter(remaining, predicate)
        case predicate(item) {
          // Rejected: skip ahead to the next element that passes.
          False -> next(filtered_rest)
          True -> Next(filtered_rest, item)
        }
      }
    }
  }
  Stream(step)
}
/// Build a `Stream` that applies a function returning a `Result` to every element of the
/// provided `Stream`, emitting the contents of the `Ok` results and skipping the elements
/// that produced an `Error`.
///
/// Basically equivalent to a composition of [`map`](aqueduct.html#map) and
/// [`filter`](aqueduct.html#filter), done in a single pass.
///
/// ## Note
/// The new `Stream` works by requesting an element from the provided `Stream` whenever one
/// is requested from it.
///
/// If the function returns `Ok` for that element, the transformed value is emitted;
///
/// If it returns `Error`, the transformed remainder of the `Stream` is asked instead,
/// repeating the process until some element passes or the `Stream` ends.
///
/// As such, if the underlying `Stream` is infinite and passing values are extremely rare,
/// calling [`next`](aqueduct.html#next) on the resulting `Stream` can take a long time.
///
/// Furthermore, if no element of an infinite input `Stream` passes,
/// calling [`next`](aqueduct.html#next) on the resulting `Stream` will never return.
///
pub fn filter_map(
  stream: Stream(a),
  predicate: fn(a) -> Result(b, e),
) -> Stream(b) {
  let step = fn() {
    case next(stream) {
      Done -> Done
      Next(remaining, item) -> {
        let transformed_rest = filter_map(remaining, predicate)
        case predicate(item) {
          // Rejected: skip ahead to the next element that passes.
          Error(_) -> next(transformed_rest)
          Ok(kept) -> Next(transformed_rest, kept)
        }
      }
    }
  }
  Stream(step)
}
/// Run the provided `Stream` to completion, eagerly calling a function on every element
/// in the order the elements are emitted.
///
/// Use it to execute side effects based on the values of the `Stream` with functions that
/// cannot fail, when you don't need the consumed values afterwards.
///
/// ## Note
/// If the provided `Stream` is infinite, this function will never return, and will keep
/// executing side effects on successive elements emitted by the `Stream`.
///
pub fn each(stream: Stream(a), evaluate fun: fn(a) -> Nil) -> Nil {
  // A fold whose accumulator is always `Nil`: only the side effects of `fun` matter.
  foldl(stream, fn(item, _done) { fun(item) }, Nil)
}
/// Place the provided element at the **start** of the `Stream` - it becomes the next
/// element emitted, followed by the elements of the original `Stream`.
///
/// Useful when your `Stream` is not 'pure' - i.e., it executes some side effect in order
/// to obtain its next element. In such a case, there's no guarantee that going back to the
/// original `Stream` value after calling [`next`](aqueduct.html#next) on it would behave
/// as expected.
///
/// Thus, if you write generic functions that operate on `Stream`s and need to undo a
/// `Step`, don't reuse the old `Stream`: take the new `Stream` and the value out of the
/// `Next` variant of the `Step`, and use this function to put the value back in front of
/// the new `Stream`.
///
pub fn prepend(stream: Stream(a), element: a) -> Stream(a) {
  let step = fn() { Next(stream, element) }
  Stream(step)
}
/// Place the value inside of the provided `Option` at the **start** of the `Stream`, if
/// there is one - it becomes the next element emitted.
///
/// If the `Option` is `None`, the `Stream` is returned unchanged.
///
pub fn maybe_prepend(stream: Stream(a), maybe_element: Option(a)) -> Stream(a) {
  maybe_element
  |> option.map(prepend(stream, _))
  |> option.unwrap(stream)
}
/// Add the provided element to the **end** of the `Stream` - it will be the last element
/// emitted, after all of the elements of the original `Stream`.
///
/// This is done lazily - the new `Stream` passes through every element of the original
/// `Stream`, and once the original returns `Done`, it emits the appended element followed
/// by an [`empty`](aqueduct.html#empty) `Stream`.
///
/// ## Note
/// If the `Stream` is infinite, the appended element will never be emitted, since this
/// function only waits until the `Stream` returns `Done`, and then replaces that `Done`
/// with `Next(empty(), element)`.
///
/// So if the `Stream` never returns `Done`, that element will never be returned.
///
pub fn append(stream: Stream(a), element: a) -> Stream(a) {
  Stream(fn() {
    case next(stream) {
      // The emitted value is bound to its own name: binding it to `element` would shadow
      // the parameter, and the recursive call would then append the wrong value.
      Next(rest, current) -> Next(append(rest, element), current)
      Done -> Next(empty(), element)
    }
  })
}
/// Add the value inside of the provided `Option` to the **end** of the `Stream`, if there
/// is one, by calling [`append`](aqueduct.html#append).
///
/// If the `Option` is `None`, the `Stream` is returned unchanged.
///
/// ## Note
/// If the `Stream` is infinite, the appended element will never be emitted, since
/// [`append`](aqueduct.html#append) only acts once the `Stream` returns `Done`.
///
/// So if the `Stream` never returns `Done`, that element will never be returned.
///
pub fn maybe_append(stream: Stream(a), maybe_element: Option(a)) -> Stream(a) {
  case maybe_element {
    None -> stream
    Some(last) -> append(stream, last)
  }
}
/// Concatenate two `Stream`s that emit the same type of element: the resulting `Stream`
/// emits every element of the first `Stream`, followed by every element of the second.
///
/// This is done lazily - the second `Stream` is only asked for its first step once the
/// first `Stream` has returned `Done`.
///
/// ## Note
/// If the first `Stream` is infinite, the elements of the second `Stream` will never appear.
///
pub fn concat(first: Stream(a), second: Stream(a)) -> Stream(a) {
  let step = fn() {
    case next(first) {
      // The first `Stream` is finished: carry on with the second one.
      Done -> next(second)
      Next(remaining, item) -> {
        let joined_rest = concat(remaining, second)
        Next(joined_rest, item)
      }
    }
  }
  Stream(step)
}
/// Helper function that surrounds the `Stream` with the provided element, placing it both
/// at the **start** and at the **end**.
///
/// It is equivalent to first calling the [`append`](aqueduct.html#append) function,
/// then the [`prepend`](aqueduct.html#prepend) function, with the same element.
///
pub fn wrap(stream: Stream(a), in element: a) -> Stream(a) {
  stream
  |> append(element)
  |> prepend(element)
}
/// Helper function that surrounds the `Stream` with the value inside of the provided
/// `Option`, if there is one.
///
/// If the `Option` is `None`, the `Stream` is returned unmodified.
///
/// If it's `Some`, the value is handed over to [`wrap`](aqueduct.html#wrap).
///
/// It is basically equivalent to first calling the
/// [`maybe_append`](aqueduct.html#maybe_append) function, then the
/// [`maybe_prepend`](aqueduct.html#maybe_prepend) function.
///
pub fn maybe_wrap(stream: Stream(a), in maybe_element: Option(a)) -> Stream(a) {
  case maybe_element {
    None -> stream
    Some(delimiter) -> wrap(stream, in: delimiter)
  }
}