# lib/kvasir/storage/event_stream.ex

defmodule EventStream do
  defmodule State do
    # Struct carrying the same fields as the enclosing stream struct.
    # Every field except `:endless` must be supplied when the struct is built.
    @enforce_keys [:source, :topic, :id, :partition, :from, :events]
    defstruct [:source, :topic, :id, :partition, :from, :events, endless: false]
  end

  # The stream struct: all fields default to `nil` except `:endless`, which defaults to `false`.
  @type t :: %__MODULE__{}
  defstruct [:source, :topic, :id, :partition, :from, :events, endless: false]

  @doc """
  Spawns an unlinked process that enumerates `stream` and sends every element
  as `{:event, element}` to the receiving process.

  The receiver is `opts[:pid]` when given, otherwise the calling process.
  Returns the pid of the spawned process.
  """
  def start(stream, opts \\ []) do
    # Resolve the receiver in the caller; inside the spawned function `self()`
    # would refer to the new process instead.
    receiver = opts[:pid] || self()
    forward = fn event -> send(receiver, {:event, event}) end

    spawn(fn -> Enum.each(stream, forward) end)
  end

  @doc """
  Same as `start/2`, but the spawned process is linked to the caller.

  Every element of `stream` is delivered as `{:event, element}` to `opts[:pid]`,
  or to the calling process when no `:pid` option is given. Returns the pid of
  the spawned process.
  """
  def start_link(stream, opts \\ []) do
    # Captured here so the spawned process sends to the caller, not to itself.
    destination = opts[:pid] || self()

    spawn_link(fn ->
      Enum.each(stream, fn item -> send(destination, {:event, item}) end)
    end)
  end

  @doc """
  Returns the elements of `stream` at zero-based positions `from` through `to`
  (both inclusive) as a list.

  Enumeration halts as soon as the element at position `to` has been collected,
  so the function can be used on streams that never finish. When the stream ends
  before `to` is reached, the elements collected so far are returned.

  Raises a `RuntimeError` when either bound is negative.

  ## Examples

      iex> EventStream.slice([:a, :b, :c, :d], 1..2)
      [:b, :c]

  """
  def slice(stream, from..to) do
    if from < 0 or to < 0, do: raise("Negative ranges not allowed.")

    # The accumulator is always `{collected_in_reverse, index}`; every reducer
    # result must be a 2-tuple `{tag, acc}` as required by `Enumerable.reduce/3`.
    {_tag, {collected, _index}} =
      Enumerable.reduce(stream, {:cont, {[], 0}}, fn item, {items, index} ->
        cond do
          index < from -> {:cont, {items, index + 1}}
          index < to -> {:cont, {[item | items], index + 1}}
          true -> {:halt, {[item | items], index + 1}}
        end
      end)

    :lists.reverse(collected)
  end

  defimpl Enumerable, for: __MODULE__ do
    # Counting is not implemented directly; the `{:error, module}` reply makes
    # `Enum` fall back to a `reduce/3`-based count.
    def count(_stream) do
      {:error, __MODULE__}
    end

    # Membership is not implemented directly; the `{:error, module}` reply makes
    # `Enum` fall back to a `reduce/3`-based search.
    def member?(_stream, _value) do
      {:error, __MODULE__}
    end

    # Slicing is not implemented directly; the `{:error, module}` reply makes
    # `Enum` fall back to a `reduce/3`-based slice.
    def slice(_stream) do
      {:error, __MODULE__}
    end

    # Reduces the stream in two phases:
    #
    #   1. the storages selected by `storages/4`, followed by the source itself,
    #      are reduced one after another by `cold_storage/4`;
    #   2. when `es.endless` is set, the calling process subscribes through
    #      `source.listen/3` and keeps reducing incoming `{:event, event}` messages.
    #
    # Returns `{:halted, acc}` as soon as the reducer halts during the first phase
    # (the live phase is then skipped), `{:done, acc}` for a finite stream that was
    # consumed, and otherwise whatever `listen/2` returns.
    def reduce(es = %EventStream{source: source}, acc, fun) do
      # Wrap the caller's reducer so the accumulator is paired with an offset that
      # is moved to one past every reduced event, keyed by the event's partition.
      fun_x = fn e, {o, a} ->
        {x, y} = fun.(e, a)
        {x, {Kvasir.Offset.set(o, e.__meta__.partition, e.__meta__.offset + 1), y}}
      end

      {acc_t, acc_v} = acc
      acc = {acc_t, {Kvasir.Offset.create(), acc_v}}

      {type, {offset, cold}} =
        source.__storages__()
        |> storages(es.topic, es.from)
        |> Kernel.++([{source.__source__(), Module.concat(source, Source)}])
        |> cold_storage(es, acc, fun_x)

      cond do
        # The reducer asked to stop: report it as halted and never start the
        # live subscription, which would otherwise block the caller.
        type == :halted ->
          {:halted, cold}

        es.endless ->
          streaming = self()

          # When nothing was reduced, resume from the requested offset (if it
          # names any partition) rather than from the empty tracked offset.
          o =
            if offset.partitions == %{} and (es.from && es.from.partitions != %{}),
              do: es.from,
              else: offset

          # NOTE(review): the subscription is never cancelled here — confirm
          # whether `source.listen/3` needs an explicit teardown.
          source.listen(
            es.topic.topic,
            fn event ->
              send(streaming, {:event, event})
              :ok
            end,
            from: o
          )

          listen(fun, {:cont, cold})

        true ->
          {:done, cold}
      end
    end

    # Live phase of the reduction: takes `{:event, event}` messages from the
    # mailbox of the calling process and feeds them to the reducer `fun`.
    #
    # The second argument is a tagged accumulator as defined by the `Enumerable`
    # protocol:
    #
    #   * `{:cont, acc}`    - wait for the next event and call `fun.(event, acc)`
    #                         with the bare accumulator;
    #   * `{:halt, acc}`    - stop and return `{:halted, acc}`;
    #   * `{:suspend, acc}` - return `{:suspended, acc, continuation}`, where the
    #                         continuation resumes listening with a new tagged
    #                         accumulator.
    #
    # Messages of any other shape are left untouched in the mailbox.
    defp listen(fun, {:cont, acc}) do
      receive do
        {:event, e} -> listen(fun, fun.(e, acc))
      end
    end

    defp listen(_fun, {:halt, acc}), do: {:halted, acc}

    defp listen(fun, {:suspend, acc}), do: {:suspended, acc, &listen(fun, &1)}

    # Picks the storages to read from, returned in reverse order of the input.
    #
    # Without a starting offset (`from` is `nil`) every storage is kept. Otherwise
    # the list is walked until a storage reports `true` for `contains?/3`; that
    # storage and all storages visited before it are returned.
    defp storages(candidates, topic, from, selected \\ [])

    defp storages([], _topic, _from, selected), do: selected

    defp storages(candidates, _topic, nil, _selected), do: :lists.reverse(candidates)

    defp storages([{module, name} = storage | rest], topic, from, selected) do
      selected = [storage | selected]

      case module.contains?(name, topic, from) do
        true -> selected
        answer when answer in [false, :maybe] -> storages(rest, topic, from, selected)
      end
    end

    # Reduces the given `{module, name}` storages one after another, threading the
    # tagged accumulator from one storage into the next.
    #
    # `acc` is a tagged accumulator whose value is expected to be an
    # `{offset, user_acc}` pair, and `fun` a reducer working on such pairs.
    # Returns a tagged tuple: `{:halted, {offset, user_acc}}` when the reducer
    # halted, `{:done, acc}` when the last storage was consumed, or the
    # accumulator handed to the empty-list clause.

    # No storages left: hand back the accumulator untouched.
    defp cold_storage([], _es, result, _fun), do: result

    defp cold_storage([{cold, name} | tail], es, acc, fun) do
      # Only an explicit `false` skips the storage; any other answer is read.
      if cold.contains?(name, es.topic, es.from) in [false] do
        cold_storage(tail, es, acc, fun)
      else
        # A reply other than `{:ok, stream}` raises a MatchError here.
        {:ok, stream} =
          cold.stream(name, es.topic,
            from: es.from,
            id: es.id,
            key: es.id,
            partition: es.partition,
            events: es.events
          )

        # Normalise the reduction result into `{type, {new_acc, offset}}`.
        # NOTE(review): `Enumerable.reduce/3` normally reports suspension as a
        # 3-tuple `{:suspended, acc, continuation}`, which none of the clauses
        # below match — confirm whether suspension is meant to be supported.
        {type, {new_acc, o}} =
          case Enumerable.reduce(stream, acc, fun) do
            {:halted, {offset, a}} -> {:halted, {a, offset}}
            {:suspended, {offset, a}} -> {:suspend, {a, offset}}
            # Empty list accumulator: fall back to the offset the read started at.
            {t, []} -> {t, {[], es.from}}
            # List of `{offset, value}` pairs: keep the values, take the offset
            # of the first pair.
            {t, a = [{offset, _} | _]} -> {t, {Enum.map(a, &elem(&1, 1)), offset}}
            # `{offset, value}` pair: keep it as the accumulator, expose the offset.
            {t, a = {offset, _}} -> {t, {a, offset}}
          end

        cond do
          # The reducer halted: stop here and do not visit the remaining storages.
          type == :halted ->
            {type, {o, new_acc}}

          # Last storage fully consumed.
          type == :done and tail == [] ->
            {type, new_acc}

          :default ->
            # A finished storage continues into the next one with `:cont`.
            tt = if(type == :done, do: :cont, else: type)

            if o.partitions == %{} do
              # Nothing was read from this storage: keep the starting offset.
              cold_storage(tail, es, {tt, new_acc}, fun)
            else
              # Start the next storage from the merged offset.
              o2 = Kvasir.Offset.merge(es.from, o)
              cold_storage(tail, %{es | from: o2}, {tt, new_acc}, fun)
            end
        end
      end
    end
  end

  defimpl Inspect, for: __MODULE__ do
    import Inspect.Algebra

    # Renders the stream as `#EventStream<topic>` or, for an endless stream,
    # `∞EventStream<topic>`. The `id`, `partition` and `from` fields that are not
    # `nil` are appended after a comma as a keyword list passed through `filter/1`.
    def inspect(%{topic: topic, id: id, partition: partition, from: from, endless: e}, opts) do
      criteria =
        for {_key, value} = pair <- [id: id, partition: partition, from: from],
            not is_nil(value),
            do: pair

      marker = if e, do: "∞", else: "#"

      case criteria do
        [] ->
          concat([marker, "EventStream<", to_doc(topic.topic, opts), ">"])

        _ ->
          rendered = filter(to_doc(criteria, opts))
          concat([marker, "EventStream<", to_doc(topic.topic, opts), ",", rendered, ">"])
      end
    end

    # Rewrites the algebra document that `to_doc/2` produced for the keyword list
    # of stream criteria.
    #
    # The clauses match on the raw tuple layout of the document rather than going
    # through the `Inspect.Algebra` API.
    # NOTE(review): that layout is presumably specific to certain Elixir
    # versions — confirm which ones these patterns were written for.
    # A document matching neither pattern is returned unchanged.
    @spec filter(tuple) :: tuple
    # First layout: the `_b2b2b` element is replaced by a coloured single space
    # and the `_b3` element by a fixed break/colour sequence.
    defp filter({a, {b1, {b2a, {b2b1, {b2b2a, _b2b2b, b2b2c}, b2b3}, b2c, b2d}, _b3}, c}) do
      {a,
       {b1, {b2a, {b2b1, {b2b2a, {:doc_color, " ", :default_color}, b2b2c}, b2b3}, b2c, b2d},
        {:doc_cons, {:doc_break, "", :strict},
         {:doc_cons, {:doc_color, "", :default_color}, {:doc_color, :doc_nil, [:reset, :yellow]}}}},
       c}
    end

    # Second layout: the first element inside the nested document is replaced by
    # a single space, and only `close` is kept from the trailing `:doc_cons`.
    defp filter(
           {a, {:doc_cons, {:doc_nest, {:doc_cons, _, inside}, b, b2}, {:doc_cons, close, _}}, c}
         ) do
      {a, {:doc_cons, {:doc_nest, {:doc_cons, " ", inside}, b, b2}, close}, c}
    end

    # Anything else is passed through untouched.
    defp filter(unknown), do: unknown
  end
end