lib/stream_split.ex

defmodule StreamSplit do
  @enforce_keys [:continuation, :stream]
  defstruct @enforce_keys

  @doc """
  This function is a combination of `Enum.take/2` and `Enum.drop/2` returning
  first `n` dropped elements and the rest of the enum as a stream.

  The important difference is that the enumerable is only iterated once, and
  only for the required `n` items. The rest of the enumerable may be iterated
  lazily later from the returned stream.

  ## Examples
      iex> {head, tail} = take_and_drop(Stream.cycle(1..3), 4)
      iex> head
      [1, 2, 3, 1]
      iex> Enum.take(tail, 7)
      [2, 3, 1, 2, 3, 1, 2]
  """
  @spec take_and_drop(Enumerable.t(), pos_integer) :: {List.t(), Enumerable.t()}
  def take_and_drop(enum, n) when n > 0 do
    case apply_reduce(enum, n) do
      {:done, {_, list}} ->
        {:lists.reverse(list), []}

      {:suspended, {_, list}, cont} ->
        stream_split = %__MODULE__{continuation: cont, stream: continuation_to_stream(cont)}
        {:lists.reverse(list), stream_split}

      {:halted, {_, list}} ->
        {:lists.reverse(list), []}
    end
  end

  def take_and_drop(enum, 0) do
    {[], enum}
  end

  defp apply_reduce(%__MODULE__{continuation: cont}, n) do
    cont.({:cont, {n, []}})
  end

  defp apply_reduce(enum, n) do
    Enumerable.reduce(enum, {:cont, {n, []}}, &reducer_helper/2)
  end

  defp reducer_helper(item, :tail) do
    {:suspend, item}
  end

  defp reducer_helper(item, {c, list}) when c > 1 do
    {:cont, {c - 1, [item | list]}}
  end

  defp reducer_helper(item, {_, list}) do
    {:suspend, {0, [item | list]}}
  end

  defp continuation_to_stream(cont) do
    wrapped = fn {_, _, acc_cont} ->
      case acc_cont.({:cont, :tail}) do
        acc = {:suspended, item, _cont} ->
          {[item], acc}

        {:halted, acc} ->
          {:halt, acc}

        {:done, acc} ->
          {:halt, acc}
      end
    end

    cleanup = fn
      {:suspended, _, acc_cont} ->
        acc_cont.({:halt, nil})

      _ ->
        nil
    end

    Stream.resource(fn -> {:suspended, nil, cont} end, wrapped, cleanup)
  end

  @doc """
  This function looks at the first `n` items in a stream. The remainder of the
  enumerable is returned as a stream that may be lazily enumerated at a later
  time.

  You may think of this function as popping `n` items of the enumerable, then
  pushing them back after making a copy.

  Use this function with a stream to peek at items, but not iterate a stream
  with side effects more than once.

  ## Examples
      iex> {head, new_enum} = peek(Stream.cycle(1..3), 4)
      iex> head
      [1, 2, 3, 1]
      iex> Enum.take(new_enum, 7)
      [1, 2, 3, 1, 2, 3, 1]
  """
  @spec peek(Enumerable.t(), pos_integer) :: {List.t(), Enumerable.t()}
  def peek(enum, n) when n >= 0 do
    {h, t} = take_and_drop(enum, n)
    {h, Stream.concat(h, t)}
  end

  @doc """
  This function may be seen as splitting head and tail for a `List`, but for
  enumerables.

  It is implemented on top of `take_and_drop/2`

  ## Examples
      iex> {head, tail} = pop(Stream.cycle(1..3))
      iex> head
      1
      iex> Enum.take(tail, 7)
      [2, 3, 1, 2, 3, 1, 2]
  """
  @spec pop(Enumerable.t()) :: {any, Enumerable.t()}
  def pop(enum) do
    {[h], rest} = take_and_drop(enum, 1)
    {h, rest}
  end
end

defimpl Enumerable, for: StreamSplit do
  def count(_stream_split), do: {:error, __MODULE__}

  def member?(_stream_split, _value), do: {:error, __MODULE__}

  def slice(_stream_split), do: {:error, __MODULE__}

  def reduce(%StreamSplit{stream: stream}, acc, fun) do
    Enumerable.reduce(stream, acc, fun)
  end
end