lib/clickhouse/stream.ex

defmodule ClickHouse.Stream do
  @moduledoc """
  Defines a `ClickHouse.Stream` struct returned by `ClickHouse.stream!/4`.
  """

  defstruct [:client, :query, :opts, :id]

  @type t :: %__MODULE__{
          client: ClickHouse.Client.t(),
          query: ClickHouse.Query.t(),
          opts: keyword(),
          id: nil | reference()
        }

  @doc false
  @spec new(ClickHouse.client(), ClickHouse.Query.t(), opts :: keyword()) :: ClickHouse.Stream.t()
  def new(client, query, opts) do
    %__MODULE__{client: client, query: query, opts: opts}
  end
end

defimpl Enumerable, for: ClickHouse.Stream do
  def reduce(stream, acc, fun) do
    stream_start = fn ->
      case stream.client.interface.stream_start(stream) do
        {:ok, stream} ->
          stream

        {:error, error} ->
          raise error
      end
    end

    stream_next = fn stream ->
      case stream.client.interface.stream_next(stream) do
        {:cont, stream} ->
          {[], stream}

        {:cont, stream, data} ->
          {[data], stream}

        {:halt, stream} ->
          {:halt, stream}

        {:error, error} ->
          raise error
      end
    end

    Stream.resource(stream_start, stream_next, & &1).(acc, fun)
  end

  def member?(_, _) do
    {:error, __MODULE__}
  end

  def count(_) do
    {:error, __MODULE__}
  end

  def slice(_) do
    {:error, __MODULE__}
  end
end

defimpl Collectable, for: ClickHouse.Stream do
  def into(stream) do
    stream =
      case stream.client.interface.stream_into_start(stream) do
        {:ok, stream} -> stream
        {:error, error} -> raise error
      end

    collector_fun = fn stream, command ->
      case stream.client.interface.stream_into_next(stream, command) do
        {:ok, result} -> result
        {:error, error} -> raise error
      end
    end

    {stream, collector_fun}
  end
end