lib/dets_plus.ex

defmodule DetsPlus do
  @moduledoc """
  DetsPlus persistent tuple storage.

  [DetsPlus](https://github.com/dominicletz/dets_plus) has a similar API to `:dets`, but without
  the 2 GB file storage limit. Writes are buffered in an
  internal ETS table and synced every `auto_save` interval
  to persistent storage.

  While a `sync/1` or `auto_save` operation is in progress the database
  can still be read and written.

  There is no commit log, so writes that have not yet been synced are
  lost on a crash. Lookups are done by key; non-matches are accelerated
  using a bloom filter. The persistent file layout follows
  D. J. Bernstein's CDB database format (https://cr.yp.to/cdb.html),
  but uses an Elixir-encoded header.

  Limits are:
  - Total file size: 18_446 petabytes
  - Maximum entry size: 4 gigabytes
  - Maximum entry count: `:infinity`
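
  A minimal usage sketch (table name and data are placeholders):

  ```
  {:ok, :users} = DetsPlus.open_file(:users)
  :ok = DetsPlus.insert(:users, {"alice", 23})
  [{"alice", 23}] = DetsPlus.lookup(:users, "alice")
  :ok = DetsPlus.close(:users)
  ```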
  """

  # This limits the total database size to 18_446 PB
  @slot_size 8
  @slot_size_bits @slot_size * 8

  # This limits the biggest entry size to 4 GB
  @entry_size_size 4
  @entry_size_size_bits @entry_size_size * 8
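
  # Both limits follow from the field widths above:
  # 2^(8*8) bytes ≈ 18_446 PB of total file, 2^(4*8) bytes = 4 GB per entry.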

  # We're using sha256 - hash collisions are not expected in practice
  @hash_size 32
  @hash_size_bits @hash_size * 8

  use GenServer
  require Logger

  @enforce_keys [:version]
  defstruct [
    :version,
    :header_offset,
    :bloom,
    :bloom_size,
    :filename,
    :fp,
    :name,
    :ets,
    :mode,
    :auto_save,
    :keypos,
    :type,
    :file_entries,
    :slot_counts,
    :table_offsets,
    :sync,
    :sync_waiters,
    :file_size,
    :sync_fallback
  ]

  @version 1

  @doc """
  Opens an existing table or creates a new table. If no
  `file` argument is provided the table name will be used.

  Arguments:

  - `auto_save` - The autosave interval. If the interval is an integer `time`, the table is flushed to disk whenever it is not accessed for `time` milliseconds. A table that has been flushed requires no repair when reopened after an uncontrolled emulator halt. If the interval is the atom `:infinity`, autosave is disabled. Defaults to `180_000` (3 minutes).
  - `keypos` - The position of the element of each object to be used as the key. Defaults to 1. The ability to explicitly state the key position is most convenient when we want to store Erlang records in which the first position of the record is the name of the record type.
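
  Example (`:mytable` is a placeholder name):

  ```
  {:ok, :mytable} = DetsPlus.open_file(:mytable, auto_save: 60_000)
  ```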
  """
  def open_file(name, args \\ []) when is_atom(name) do
    filename = Keyword.get(args, :file, name) |> do_string()

    state =
      if File.exists?(filename) do
        load_state(filename)
        |> init_table_offsets()
      else
        mode = Keyword.get(args, :access, :read_write)
        auto_save = Keyword.get(args, :auto_save, 180_000)
        keypos = Keyword.get(args, :keypos, 1)
        type = Keyword.get(args, :type, :set)

        # unused options from dets:
        # - ram_file
        # - max_no_slots
        # - min_no_slots
        # - repair

        %DetsPlus{
          version: @version,
          bloom: "",
          bloom_size: 0,
          header_offset: 0,
          filename: filename,
          fp: nil,
          name: name,
          mode: mode,
          auto_save: auto_save,
          keypos: keypos,
          type: type,
          file_entries: 0,
          slot_counts: %{},
          file_size: 0,
          sync: nil,
          sync_waiters: [],
          sync_fallback: %{}
        }
      end

    case GenServer.start_link(__MODULE__, state, hibernate_after: 5_000, name: name) do
      {:ok, _pid} -> {:ok, name}
      err -> err
    end
  end

  defp load_state(filename) do
    fp = file_open(filename)

    {:ok, <<header_size::unsigned-size(@entry_size_size_bits)>>} =
      PagedFile.pread(fp, 0, @entry_size_size)

    {:ok, header} = PagedFile.pread(fp, @entry_size_size, header_size)

    %DetsPlus{version: version} = state = :erlang.binary_to_term(header)

    if version > @version do
      raise("incompatible dets+ version #{version}")
    end

    %DetsPlus{
      state
      | fp: fp,
        header_offset: header_size + @entry_size_size,
        file_size: File.stat!(filename).size
    }
  end

  defp store_state(state = %DetsPlus{}, fp) do
    bin =
      :erlang.term_to_binary(
        %DetsPlus{
          state
          | fp: nil,
            sync: nil,
            sync_waiters: [],
            header_offset: 0,
            table_offsets: %{},
            ets: nil
        },
        [:compressed]
      )

    size = byte_size(bin)
    :ok = PagedFile.pwrite(fp, 0, <<size::unsigned-size(@entry_size_size_bits)>>)
    :ok = PagedFile.pwrite(fp, @entry_size_size, bin)
    %DetsPlus{state | header_offset: size + @entry_size_size}
  end

  @impl true
  def init(state = %DetsPlus{auto_save: auto_save}) do
    if is_integer(auto_save) do
      :timer.send_interval(auto_save, :auto_save)
    end

    {:ok, init_ets(state)}
  end

  defp init_ets(state = %DetsPlus{name: name, type: type, keypos: keypos}) do
    %DetsPlus{state | ets: :ets.new(name, [type, {:keypos, keypos}])}
  end

  defp init_table_offsets(
         state = %DetsPlus{header_offset: header_offset, slot_counts: slot_counts}
       ) do
    table_offsets =
      Enum.reduce(1..256, %{0 => header_offset}, fn table_idx, table_offsets ->
        offset =
          Map.get(table_offsets, table_idx - 1) +
            Map.get(slot_counts, table_idx - 1, 0) * @slot_size

        Map.put(table_offsets, table_idx, offset)
      end)

    %DetsPlus{state | table_offsets: table_offsets}
  end

  @doc """
  Syncs pending writes to the persistent file and closes the table.
  """
  @spec close(atom | pid) :: :ok
  def close(pid) do
    call(pid, :sync)
    GenServer.stop(pid)
  end

  @doc """
  Inserts one or more objects into the table. If there already exists an object with a key matching the key of some of the given objects, the old object will be replaced.
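
  Example (assuming a table `:abc` is open):

  ```
  :ok = DetsPlus.insert(:abc, {1, 2})
  :ok = DetsPlus.insert(:abc, [{2, 3}, {3, 4}])
  ```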
  """
  @spec insert(atom | pid, tuple() | [tuple()]) :: :ok | {:error, atom()}
  def insert(pid, tuple) do
    call(pid, {:insert, tuple})
  end

  @doc """
    Inserts one or more objects into the table. If there already exists an object with a key matching the key of some of the given objects, the old object will be replaced.
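
  Example (assuming a table `:abc` is open):

  ```
  true = DetsPlus.insert_new(:abc, {5, 6})
  false = DetsPlus.insert_new(:abc, {5, 7})
  ```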
  """
  @spec insert_new(atom | pid, tuple() | [tuple()]) :: boolean()
  def insert_new(pid, tuple) do
    case call(pid, {:insert_new, tuple}) do
      :ok -> true
      false -> false
    end
  end

  @doc """
  Returns a list of all objects with key Key stored in the table.

  Example:

  ```
  iex> DetsPlus.open_file(:abc)
  {:ok, :abc}
  iex> DetsPlus.insert(:abc, {1, 2, 3})
  :ok
  iex> DetsPlus.insert(:abc, {1, 3, 4})
  :ok
  iex> DetsPlus.lookup(:abc, 1)
  [{1, 3, 4}]
  ```

  If the table type is set, the function returns either the empty list or a list with one object, as there cannot be more than one object with a given key. If the table type is bag or duplicate_bag, the function returns a list of arbitrary length.

  Notice that the order of objects returned is unspecified. In particular, the order in which objects were inserted is not reflected.
  """
  @spec lookup(atom | pid, any) :: [tuple()] | {:error, atom()}
  def lookup(pid, key) do
    call(pid, {:lookup, key})
  end

  @doc """
  Works like `lookup/2`, but does not return the objects. Returns `true` if one or more table elements have the key `key`, otherwise `false`.
  """
  @spec member?(atom | pid, any) :: false | true | {:error, atom}
  def member?(pid, key) do
    case lookup(pid, key) do
      [] -> false
      list when is_list(list) -> true
      error -> error
    end
  end

  @doc """
  Same as `member?/2`
  """
  @spec member(atom | pid, any) :: false | true | {:error, atom}
  def member(pid, key) do
    member?(pid, key)
  end

  @doc """
  Ensures that all updates made to the table are written to disk. While the sync is running the
  table can still be used for reads and writes, but writes issued after the `sync/1` call
  will not be part of the persistent file. These new changes will only be included in the
  next sync.
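
  Example:

  ```
  :ok = DetsPlus.sync(:abc)
  ```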
  """
  @spec sync(atom | pid) :: :ok
  def sync(pid) do
    call(pid, :sync, :infinity)
  end

  @doc """
  Returns information about the table as a list of tuples:

  - `{file_size, integer() >= 0}` - The file size, in bytes.
  - `{filename, file:name()}` - The name of the file where objects are stored.
  - `{keypos, keypos()}` - The key position.
  - `{size, integer() >= 0}` - The estimated number of objects in the table.
  - `{type, type()}` - The table type.
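
  Example (illustrative values):

  ```
  DetsPlus.info(:abc)
  #=> [file_size: 1178, filename: "abc", keypos: 1, size: 2, type: :set]
  ```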
  """
  @spec info(atom | pid) :: [{atom(), any()}] | nil
  def info(pid) do
    call(pid, :info)
  end

  @doc """
  Returns the information associated with `item` for the table. The following items are allowed:

  - `{file_size, integer() >= 0}` - The file size, in bytes.
  - `{filename, file:name()}` - The name of the file where objects are stored.
  - `{keypos, keypos()}` - The key position.
  - `{size, integer() >= 0}` - The estimated number of objects in the table.
  - `{type, type()}` - The table type.
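
  Example:

  ```
  DetsPlus.info(:abc, :type)
  #=> :set
  ```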
  """
  @spec info(atom | pid, :file_size | :filename | :keypos | :size | :type) :: any()
  def info(pid, item)
      when item == :file_size or item == :filename or item == :keypos or item == :size or
             item == :type do
    case info(pid) do
      nil -> nil
      list -> Keyword.get(list, item)
    end
  end

  defp call(pid, cmd, timeout \\ :infinity) do
    GenServer.call(pid, cmd, timeout)
  end

  @impl true
  def handle_call(
        {:insert, tuple},
        from,
        state = %DetsPlus{
          ets: ets,
          sync: sync,
          sync_fallback: fallback,
          sync_waiters: sync_waiters
        }
      ) do
    if sync == nil do
      :ets.insert(ets, tuple)
      {:reply, :ok, check_auto_save(state)}
    else
      fallback =
        List.wrap(tuple)
        |> Enum.reduce(fallback, fn tuple, fallback ->
          Map.put(fallback, do_key(state, tuple), tuple)
        end)

      if map_size(fallback) > 1_000_000 do
        # this pause exists to protect from out_of_memory situations when the writer can't
        # finish in time
        Logger.warn(
          "DetsPlus flush slower than new inserts - pausing writes until flush is complete"
        )

        {:noreply,
         %DetsPlus{state | sync_fallback: fallback, sync_waiters: [from | sync_waiters]}}
      else
        {:reply, :ok, %DetsPlus{state | sync_fallback: fallback}}
      end
    end
  end

  def handle_call(
        {:insert_new, tuple},
        from,
        state = %DetsPlus{ets: ets, sync_fallback: fallback}
      ) do
    tuples = List.wrap(tuple)

    exists =
      Enum.any?(tuples, fn tuple ->
        key = do_key(state, tuple)

        Map.has_key?(fallback, key) || :ets.lookup(ets, key) != [] ||
          file_lookup(state, key) != []
      end)

    if exists do
      {:reply, false, state}
    else
      handle_call({:insert, tuples}, from, state)
    end
  end

  def handle_call(
        {:lookup, key},
        _from,
        state = %DetsPlus{ets: ets, sync_fallback: fallback}
      ) do
    case Map.get(fallback, key) do
      nil ->
        case :ets.lookup(ets, key) do
          [] -> {:reply, file_lookup(state, key), state}
          other -> {:reply, other, state}
        end

      value ->
        {:reply, [value], state}
    end
  end

  def handle_call(
        :info,
        _from,
        state = %DetsPlus{
          filename: filename,
          keypos: keypos,
          type: type,
          ets: ets,
          file_size: file_size,
          file_entries: file_entries
        }
      ) do
    size = :ets.info(ets, :size) + file_entries
    info = [file_size: file_size, filename: filename, keypos: keypos, size: size, type: type]
    {:reply, info, state}
  end

  def handle_call(:sync, from, state = %DetsPlus{sync: nil, ets: ets}) do
    if :ets.info(ets, :size) == 0 do
      {:reply, :ok, state}
    else
      sync = spawn_sync_worker(state)
      {:noreply, %DetsPlus{state | sync: sync, sync_waiters: [from]}}
    end
  end

  def handle_call(:sync, from, state = %DetsPlus{sync: sync, sync_waiters: sync_waiters})
      when is_pid(sync) do
    {:noreply, %DetsPlus{state | sync_waiters: [from | sync_waiters]}}
  end

  @impl true
  def handle_cast(
        {:sync_complete, new_filename, new_state = %DetsPlus{}},
        %DetsPlus{
          fp: fp,
          filename: filename,
          ets: ets,
          sync_waiters: waiters,
          sync_fallback: fallback
        }
      ) do
    if fp != nil do
      :ok = PagedFile.close(fp)
    end

    File.rename!(new_filename, filename)
    fp = file_open(filename)

    :ets.delete_all_objects(ets)
    :ets.insert(ets, Map.values(fallback))

    for w <- waiters do
      :ok = GenServer.reply(w, :ok)
    end

    {:noreply, %DetsPlus{new_state | fp: fp, sync: nil, sync_fallback: %{}, sync_waiters: []}}
  end

  @impl true
  def handle_info(:auto_save, state = %DetsPlus{sync: sync, ets: ets}) do
    sync =
      if sync == nil and :ets.info(ets, :size) > 0 do
        spawn_sync_worker(state)
      else
        sync
      end

    {:noreply, %DetsPlus{state | sync: sync}}
  end

  defp check_auto_save(state) do
    state
  end

  defp spawn_sync_worker(
         state = %DetsPlus{
           ets: ets,
           fp: fp,
           filename: filename,
           file_entries: file_entries
         }
       ) do
    # assumptions here
    # 1. ets data set is small enough to fit into memory
    # 2. fp entries are sorted by hash
    dets = self()

    worker =
      spawn_link(fn ->
        new_dataset = :ets.tab2list(ets)
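        # Hand control back to the owner: once the ETS table has been copied,
        # the GenServer resumes serving requests (new writes go to sync_fallback)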
        send(dets, :continue)

        # Sort the new dataset by hash so it can be merged with the
        # (hash-sorted) entries of the old file
        new_dataset =
          parallel_hash(state, new_dataset)
          |> Enum.sort_by(fn {hash, _tuple} -> hash end, :asc)

        new_filename = "#{filename}.buffer"

        # opts =
        #   if String.ends_with?(filename, ".gz") do
        #     [:raw, :read, :read_ahead, :write, :delayed_write, :binary, :compressed]
        #   else
        #     [:raw, :read, :read_ahead, :write, :delayed_write, :binary]
        #   end
        opts = [page_size: 1_000_000, max_pages: 1000]

        # setting the bloom size based on a size estimate
        bloom_size = (file_entries + length(new_dataset)) * 10
        state = bloom_create(state, bloom_size)

        old_file =
          if fp != nil do
            FileReader.new(fp, table_offset(state, 256), module: PagedFile, buffer_size: 512_000)
          else
            nil
          end

        state = pre_scan_file(state, new_dataset, old_file)

        PagedFile.delete(new_filename)
        {:ok, new_file} = PagedFile.open(new_filename, opts)

        state =
          %DetsPlus{state | fp: new_file}
          |> bloom_finalize()
          |> store_state(new_file)
          |> init_table_offsets()

        state = write_to_file(state, table_offset(state, 256), new_dataset, old_file)
        PagedFile.close(new_file)
        GenServer.cast(dets, {:sync_complete, new_filename, state})
      end)

    # Profiler.fprof(worker)

    receive do
      :continue ->
        :erlang.garbage_collect()
        worker
    end
  end

  @min_chunk_size 10_000
  @max_tasks 4
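  # Hashes the new dataset in up to @max_tasks concurrent tasks, splitting
  # recursively while each chunk is still larger than @min_chunk_size entries.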
  defp parallel_hash(state, new_dataset, tasks \\ 1) do
    len = length(new_dataset)

    if len > @min_chunk_size and tasks < @max_tasks do
      {a, b} = Enum.split(new_dataset, div(len, 2))
      task = Task.async(fn -> parallel_hash(state, a, tasks * 2) end)
      result = parallel_hash(state, b, tasks * 2)
      Task.await(task, :infinity) ++ result
    else
      Enum.map(new_dataset, fn tuple ->
        key = do_key(state, tuple)
        hash = do_hash(state, key)
        {hash, tuple}
      end)
    end
  end

  defp pre_scan_file(state, new_dataset, old_file) do
    state = %DetsPlus{state | file_entries: 0, slot_counts: %{}}

    state =
      %DetsPlus{slot_counts: slot_counts} =
      iterate(
        state,
        state,
        new_dataset,
        old_file,
        fn state = %DetsPlus{file_entries: file_entries, slot_counts: slot_counts},
           entry_hash,
           _ ->
          table_idx = rem(entry_hash, 256)
          slot_counts = Map.update(slot_counts, table_idx, 1, fn count -> count + 1 end)
          state = bloom_add(state, entry_hash)
          %DetsPlus{state | file_entries: file_entries + 1, slot_counts: slot_counts}
        end
      )

    new_slot_counts =
      Enum.map(slot_counts, fn {key, value} -> {key, trunc(value * 1.5) + 1} end)
      |> Map.new()

    %DetsPlus{state | slot_counts: new_slot_counts}
  end

  defp write_to_file(
         state = %DetsPlus{},
         new_file_offset,
         new_dataset,
         old_file
       ) do
    {state, new_file_offset} =
      iterate(state, {state, new_file_offset}, new_dataset, old_file, fn {state, new_file_offset},
                                                                         entry_hash,
                                                                         entry ->
        new_file_offset = file_put_entry(state, entry_hash, entry, new_file_offset)
        {state, new_file_offset}
      end)

    %DetsPlus{state | file_size: new_file_offset}
  end

  # this function takes a new_dataset and an old file and merges them, it calls
  # on every entry the callback `fun.(acc, entry_hash, binary_entry)` and returns the final acc
  defp iterate(
         state = %DetsPlus{},
         acc,
         new_dataset,
         file_reader,
         fun
       ) do
    # Reading a new entry from the top of the dataset or nil
    {new_entry, new_entry_hash} =
      case new_dataset do
        [{new_entry_hash, new_entry} | _rest] ->
          {new_entry, new_entry_hash}

        [] ->
          {nil, nil}
      end

    # Read the next entry from the old file, falling through to the `else`
    # clause when there is no old file or it has been fully consumed
    with false <- file_reader == nil,
         {new_file_reader, <<old_size::unsigned-size(@entry_size_size_bits)>>} <-
           FileReader.read(file_reader, @entry_size_size) do
      {new_file_reader, old_entry} = FileReader.read(new_file_reader, old_size)
      entry = :erlang.binary_to_term(old_entry)
      entry_key = do_key(state, entry)
      entry_hash = do_hash(state, entry_key)

      case new_entry_hash do
        # reached end of new_dataset
        nil ->
          iterate(state, fun.(acc, entry_hash, old_entry), [], new_file_reader, fun)

        # replacing an old entry with a new entry
        ^entry_hash ->
          bin = :erlang.term_to_binary(new_entry)
          iterate(state, fun.(acc, new_entry_hash, bin), tl(new_dataset), new_file_reader, fun)

        # inserting the new entry before the old
        new_entry_hash when new_entry_hash < entry_hash ->
          bin = :erlang.term_to_binary(new_entry)
          iterate(state, fun.(acc, new_entry_hash, bin), tl(new_dataset), file_reader, fun)

        # inserting the old entry before the new
        new_entry_hash when new_entry_hash > entry_hash ->
          iterate(state, fun.(acc, entry_hash, old_entry), new_dataset, new_file_reader, fun)
      end
    else
      atom when atom == true or atom == :eof ->
        # the old file is exhausted (or there is no old file)
        if new_entry == nil do
          # both inputs are exhausted
          acc
        else
          # drain the remaining new entries into the new file
          bin = :erlang.term_to_binary(new_entry)
          iterate(state, fun.(acc, new_entry_hash, bin), tl(new_dataset), nil, fun)
        end
    end
  end

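  # The bloom filter uses a single hash function and one byte per slot,
  # sized at ~10 slots per entry (roughly a 10% false-positive rate). It is
  # built inside a :ram file so that slot writes can be batched.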
  defp bloom_create(state = %DetsPlus{}, bloom_size) do
    {:ok, ram_file} = :file.open(:binary.copy(<<0>>, bloom_size), [:ram, :read, :write, :binary])
    %DetsPlus{state | bloom_size: bloom_size, bloom: {ram_file, [], 0}}
  end

  defp bloom_add(state = %DetsPlus{bloom_size: bloom_size, bloom: {ram_file, keys, n}}, hash) do
    key = rem(hash, bloom_size)
    keys = [key | keys]
    n = n + 1

    if n < 128 do
      %DetsPlus{state | bloom: {ram_file, keys, n}}
    else
      :ok = :file.pwrite(ram_file, Enum.zip(keys, List.duplicate(<<1>>, n)))
      %DetsPlus{state | bloom: {ram_file, [], 0}}
    end
  end

  defp bloom_finalize(state = %DetsPlus{bloom: {ram_file, keys, n}, bloom_size: bloom_size}) do
    :ok = :file.pwrite(ram_file, Enum.zip(keys, List.duplicate(<<1>>, n)))
    {:ok, binary} = :file.pread(ram_file, 0, bloom_size)
    :file.close(ram_file)
    %DetsPlus{state | bloom: binary}
  end

  defp bloom_lookup(%DetsPlus{bloom_size: bloom_size, bloom: bloom}, hash) do
    :binary.at(bloom, rem(hash, bloom_size)) == 1
  end

  defp table_offset(%DetsPlus{table_offsets: nil}, _table_idx) do
    nil
  end

  defp table_offset(%DetsPlus{table_offsets: offsets}, table_idx) do
    Map.get(offsets, table_idx)
  end

  defp file_put_entry(
         state = %DetsPlus{fp: fp, slot_counts: slot_counts},
         hash,
         entry,
         offset
       ) do
    size = byte_size(entry)
    table_idx = rem(hash, 256)

    slot_count = Map.get(slot_counts, table_idx)
    slot = rem(div(hash, 256), slot_count)
    base_offset = table_offset(state, table_idx)
    file_put_entry_probe(fp, base_offset, slot, slot_count, offset)

    :ok =
      PagedFile.pwrite(
        fp,
        offset,
        <<size::unsigned-size(@entry_size_size_bits), entry::binary()>>
      )

    offset + size + @entry_size_size
  end

  # recursively retry the next slot if the current slot is already used
  defp file_put_entry_probe(fp, base_offset, slot, slot_count, value) do
    slot = rem(slot, slot_count)
    # There could be some analytics here for number of probings worst case etc...

    # probe batching to improve file io
    batch_size = min(32, slot_count - slot)

    {slot, probe} =
      case PagedFile.pread(fp, base_offset + slot * @slot_size, @slot_size * batch_size) do
        # eof means the file has not been extended yet, so safe to write
        :eof ->
          {slot, true}

        {:ok, data} ->
          data = data <> :binary.copy(<<0>>, @slot_size * batch_size - byte_size(data))
          # all zeros means there is no entry address stored yet
          for(<<x::unsigned-size(@slot_size_bits) <- data>>, do: x == 0)
          |> Enum.with_index()
          |> Enum.find(fn {x, _index} -> x end)
          |> case do
            nil -> {slot, false}
            {true, idx} -> {slot + idx, true}
          end
      end

    if probe do
      :ok =
        PagedFile.pwrite(
          fp,
          base_offset + slot * @slot_size,
          <<value::unsigned-size(@slot_size_bits)>>
        )
    else
      file_put_entry_probe(fp, base_offset, slot + batch_size, slot_count, value)
    end
  end

  defp file_lookup(%DetsPlus{file_entries: 0}, _key), do: []

  defp file_lookup(state = %DetsPlus{slot_counts: slot_counts}, key) do
    hash = do_hash(state, key)
    table_idx = rem(hash, 256)
    slot_count = Map.get(slot_counts, table_idx, 0)

    if bloom_lookup(state, hash) and slot_count > 0 do
      # if slot_count > 0 do
      slot = rem(div(hash, 256), slot_count)

      {ret, _n} =
        file_lookup_slot_loop(state, key, table_offset(state, table_idx), slot, slot_count)

      # if n > 2 do
      #   IO.inspect({ret, n, slot_count})
      # end

      ret
    else
      []
    end
  end

  defp batch_read(fp, point, count) do
    {:ok, data} = PagedFile.pread(fp, point, @slot_size * count)
    data = data <> :binary.copy(<<0>>, @slot_size * count - byte_size(data))
    for <<offset::unsigned-size(@slot_size_bits) <- data>>, do: offset
  end

  @batch_size 32
  defp file_lookup_slot_loop(
         state = %DetsPlus{fp: fp},
         key,
         base_offset,
         slot,
         slot_count,
         n \\ 0
       ) do
    slot = rem(slot, slot_count)
    point = base_offset + slot * @slot_size
    batch_size = min(@batch_size, slot_count - slot)
    offsets = batch_read(fp, point, batch_size)

    if hd(offsets) == 0 do
      {[], n}
    else
      # a zero is an indication of the end
      offsets = Enum.take_while(offsets, fn x -> x != 0 end)
      len = length(offsets)

      {:ok, sizes} = PagedFile.pread(fp, Enum.zip(offsets, List.duplicate(@entry_size_size, len)))
      sizes = Enum.map(sizes, fn <<size::unsigned-size(@entry_size_size_bits)>> -> size end)

      offsets = Enum.map(offsets, fn offset -> offset + @entry_size_size end)
      {:ok, entries} = PagedFile.pread(fp, Enum.zip(offsets, sizes))

      Enum.find_value(entries, fn entry ->
        entry = :erlang.binary_to_term(entry)

        if do_key(state, entry) == key do
          entry
        end
      end)
      |> case do
        nil ->
          if len < batch_size do
            {[], n}
          else
            file_lookup_slot_loop(state, key, base_offset, slot + batch_size, slot_count, n + 1)
          end

        entry ->
          {[entry], n}
      end
    end
  end

  defp do_key(%DetsPlus{keypos: keypos}, tuple) do
    elem(tuple, keypos - 1)
  end

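  # The sha256 digest is interpreted as one unsigned integer: rem(hash, 256)
  # selects one of the 256 hash tables and div(hash, 256) selects the slot
  # within it (see file_put_entry/4 and file_lookup/2).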
  defp do_hash(%DetsPlus{}, key) do
    <<slot::unsigned-size(@hash_size_bits)>> = :crypto.hash(:sha256, :erlang.term_to_binary(key))

    slot
  end

  defp do_string(atom) when is_atom(atom), do: Atom.to_string(atom)
  defp do_string(list) when is_list(list), do: List.to_string(list)
  defp do_string(string) when is_binary(string), do: string

  defp file_open(filename) do
    # opts =
    #   if String.ends_with?(filename, ".gz") do
    #     [:read, :read_ahead, :binary, :compressed]
    #   else
    #     [:read, :read_ahead, :binary]
    #   end
    opts = [page_size: 1_000_000, max_pages: 1000]
    {:ok, fp} = PagedFile.open(filename, opts)
    fp
  end
end