lib/etfs/stream.ex

defmodule ETFs.Stream do
  defstruct path: nil, format: :v3

  @fourcc "ETFs"

  @doc false
  def open(path, opts \\ []) do
    format = Keyword.get(opts, :format, :v3)
    %__MODULE__{path: path, format: format}
  end

  def record_count(%__MODULE__{path: path, format: :v3}) do
    with {:ok, f} <- File.open(path, [:read]),
         @fourcc <- IO.binread(f, 4),
         <<record_count::integer-size(32)>> <- IO.binread(f, 4),
         :ok = File.close(f) do
      {:ok, record_count}
    else
      err -> {:error, err}
    end
  end

  def stream_all_records!(%__MODULE__{path: path, format: :v3}) do
    Stream.resource(
      fn ->
        {:ok, io} = File.open(path, [:read])
        @fourcc = IO.binread(io, 4)
        <<record_count::integer-size(32)>> = IO.binread(io, 4)
        <<_toc_pos::integer-size(64)>> = IO.binread(io, 8)
        {io, record_count}
      end,
      fn
        {io, 0} ->
          {:halt, io}

        {io, records_left} ->
          with <<record_len::integer-size(32)>> <- IO.binread(io, 4),
               record when is_binary(record) <- IO.binread(io, record_len) do
            {[record], {io, records_left - 1}}
          else
            :eof -> {:halt, io}
            {:error, _} -> {:halt, io}
          end
      end,
      &File.close/1
    )
    |> Stream.map(&:erlang.binary_to_term/1)
  end

  def slice_records(%__MODULE__{path: path, format: :v3}) do
    {:ok, io} = File.open(path, [:read])

    @fourcc = IO.binread(io, 4)
    <<record_count::integer-size(32)>> = IO.binread(io, 4)
    <<toc_pos::integer-size(64)>> = IO.binread(io, 8)

    f = fn start, count ->
      first_record_pos_pos = toc_pos + start * 8
      {:ok, <<first_record_pos::integer-size(64)>>} = :file.pread(io, first_record_pos_pos, 8)
      :file.position(io, {:bof, first_record_pos})

      records =
        for _i <- 0..(count - 1) do
          <<record_len::integer-size(32)>> = IO.binread(io, 4)
          IO.binread(io, record_len) |> :erlang.binary_to_term()
        end

      File.close(io)

      records
    end

    {:ok, record_count, f}
  end

  def collect_into(%__MODULE__{path: path, format: :v3}) do
    {:ok, io} = File.open(path, [:write])

    IO.binwrite(io, [@fourcc, <<0::integer-size(32)>>, <<0::integer-size(64)>>])

    collector_fun = fn
      {io, pos, toc}, {:cont, record} ->
        record_bin = :erlang.term_to_binary(record, [:compressed, minor_version: 2])
        msg = [<<byte_size(record_bin)::integer-size(32)>>, record_bin]

        IO.binwrite(io, msg)

        msg_size = 4 + byte_size(record_bin)
        {io, pos + msg_size, [{pos, msg_size} | toc]}

      {io, toc_pos, toc}, :done ->
        toc
        |> Enum.reverse()
        |> Enum.each(fn {pos, _msg_size} ->
          IO.binwrite(io, [
            <<pos::integer-size(64)>>
          ])
        end)

        :file.position(io, {:bof, 4})
        IO.binwrite(io, <<length(toc)::integer-size(32)>>)
        IO.binwrite(io, <<toc_pos::integer-size(64)>>)

        File.close(io)

      _set, :halt ->
        File.close(io)
    end

    {{io, 16, []}, collector_fun}
  end
end

defimpl Enumerable, for: ETFs.Stream do
  def reduce(etfs, acc, fun) do
    s = ETFs.Stream.stream_all_records!(etfs)
    Enumerable.reduce(s, acc, fun)
  end

  def slice(etfs), do: ETFs.Stream.slice_records(etfs)

  def member?(etfs, element) do
    s = ETFs.Stream.stream_all_records!(etfs)
    Enumerable.member?(s, element)
  end

  def count(etfs) do
    ETFs.Stream.record_count(etfs)
  end
end

defimpl Collectable, for: ETFs.Stream do
  def into(etfs) do
    ETFs.Stream.collect_into(etfs)
  end
end