lib/paged_file.ex

defmodule PagedFile do
  @moduledoc """
  [PagedFile](https://github.com/dominicletz/paged_file) provides fast
  reads and writes by using multiple buffers of `page_size`. This makes
  many `:pread`/`:pwrite` calls faster. Especially useful for read-modify-write
  use cases.

  # Example

  ```
  {:ok, fp} = __MODULE__.open("test_file")
  :ok = __MODULE__.pwrite(fp, 10, "hello")
  {:ok, "hello"} = __MODULE__.pread(fp, 10, 5)
  :ok = __MODULE__.close(fp)
  ```

  """
  use GenServer
  defstruct [:fp, :filename, :page_size, :max_pages, :pages, :pq, :dirty_pages, :file_size]

  @doc """

  Opens file File. Files are always opened for `:read` and `:write` and in `:binary` mode.
  The underlying usage of pages and memory can be controlled with the following options:any()

  - `page_size` - The default page size of loading disk data into memory and writing it back again.
  - `max_pages` - The maximum number of pages that should be kept in memory.

  """
  @spec open(binary | list(), keyword) :: {:ok, pid}
  def open(filename, args \\ []) do
    # default 0.5mb page size with 250 pages max (up to 125mb)
    page_size = Keyword.get(args, :page_size, 512_000)
    max_pages = Keyword.get(args, :max_pages, 250)

    file_size =
      case File.stat(filename) do
        {:ok, %File.Stat{size: size}} -> size
        _ -> 0
      end

    state = %__MODULE__{
      filename: filename,
      page_size: page_size,
      max_pages: max_pages,
      file_size: file_size,
      pages: %{},
      pq: :queue.new(),
      dirty_pages: MapSet.new()
    }

    GenServer.start_link(__MODULE__, state, hibernate_after: 5_000)
  end

  @doc """
  Performs a sequence of `pread/3` in one operation, which is more efficient than
  calling them one at a time. Returns `{ok, [Data, ...]}`,
  where each Data, the result of the corresponding pread, is a binary or `:eof`
  if the requested position is beyond end of file.
  """
  @spec pread(atom | pid, [{integer(), integer()}]) :: {:ok, [binary() | :eof]}
  def pread(_pid, []) do
    {:ok, []}
  end

  def pread(pid, locnums) do
    {:ok, call(pid, {:pread, locnums})}
  end

  @spec pread(atom | pid, integer(), integer()) :: {:ok, binary()} | :eof
  @doc """
  Executes are of `num` bytes at the position `loc`.
  """
  def pread(pid, loc, num) when loc >= 0 and num >= 0 do
    case call(pid, {:pread, [{loc, num}]}) do
      [bin] when is_binary(bin) -> {:ok, bin}
      [:eof] -> :eof
      [error] when is_atom(error) -> {:error, error}
    end
  end

  @doc """
  Performs a sequence of `pwrite/3` in one operation, which is more efficient
  than calling them one at a time. Returns `:ok`.
  """
  @spec pwrite(atom | pid, [{integer(), binary()}]) :: :ok
  def pwrite(_pid, []) do
    :ok
  end

  def pwrite(pid, locnums) do
    send(pid, {:pwrite, locnums})
    :ok
  end

  @doc """
  Writes `data` to the position `loc` in the file. This is call is executed
  asynchrounosly and the file size is extended if needed to complete this call.
  """
  @spec pwrite(atom | pid, integer(), binary()) :: :ok
  def pwrite(pid, loc, data) when loc >= 0, do: pwrite(pid, [{loc, data}])

  @doc """
  Ensures that any all pages that have changes are written to disk.
  """
  @spec sync(atom | pid) :: :ok
  def sync(pid) do
    call(pid, :sync)
  end

  @doc """
  Returns the current file size
  """
  @spec size(atom | pid) :: non_neg_integer()
  def size(pid) do
    call(pid, :size)
  end

  @doc false
  def info(pid) do
    call(pid, :info)
  end

  @doc """
  Writes all pending changes to disk and closes the file.
  """
  @spec close(atom | pid) :: :ok
  def close(pid) do
    sync(pid)
    GenServer.stop(pid)
  end

  @spec delete(atom | binary | [atom | list | char]) :: :ok | {:error, atom}
  @doc """
  Deletes the given file. Same as `:file.delete(filename)`
  """
  def delete(filename) do
    :file.delete(filename)
  end

  defp call(pid, cmd) do
    GenServer.call(pid, cmd, :infinity)
  end

  @impl true
  @doc false
  def init(state = %__MODULE__{filename: filename}) do
    with {:ok, fp} <- :file.open(filename, [:raw, :read, :write, :binary]) do
      {:ok, %__MODULE__{state | fp: fp}}
    end
  end

  @impl true
  def handle_call(:size, _from, state = %__MODULE__{file_size: file_size}) do
    {:reply, file_size, state}
  end

  def handle_call(:sync, _from, state = %__MODULE__{}) do
    {:reply, :ok, sync_dirty_pages(state)}
  end

  def handle_call(:info, _from, state = %__MODULE__{}) do
    {:reply, state, state}
  end

  def handle_call({:pread, locnums}, _from, state = %__MODULE__{}) do
    {rets, state} =
      Enum.reduce(locnums, {[], state}, fn {loc, num}, {rets, state} ->
        {ret, state} = do_read(state, loc, num)
        {rets ++ [ret], state}
      end)

    {:reply, rets, state}
  end

  def handle_call({:pwrite, locnums}, _from, state = %__MODULE__{}) do
    {rets, state} =
      Enum.reduce(locnums, {[], state}, fn {loc, data}, {rets, state} ->
        {ret, state} = do_write(state, loc, data)
        {rets ++ [ret], state}
      end)

    {:reply, rets, state}
  end

  def handle_call(:sync, _from, state) do
    {:reply, :ok, state}
  end

  @impl true
  def handle_info({:pwrite, locnums}, state = %__MODULE__{}) do
    locnums = locnums ++ collect_pwrites()

    state =
      Enum.reduce(locnums, state, fn {loc, data}, state ->
        {_ret, state} = do_write(state, loc, data)
        state
      end)

    {:noreply, state}
  end

  defp collect_pwrites() do
    receive do
      message ->
        case message do
          {:pwrite, locnums} ->
            locnums ++ collect_pwrites()

          other ->
            send(self(), other)
            []
        end
    after
      0 -> []
    end
  end

  defp do_read(state = %__MODULE__{file_size: file_size}, loc, _num) when loc >= file_size do
    {:eof, state}
  end

  defp do_read(state = %__MODULE__{page_size: page_size, file_size: file_size}, loc, num) do
    page_idx = div(loc, page_size)
    page_start = rem(loc, page_size)

    state = %__MODULE__{pages: pages} = load_page(state, page_idx)
    num = min(file_size - loc, num)

    ram_file = Map.get(pages, page_idx)
    {:ok, data} = :file.pread(ram_file, page_start, num)

    if byte_size(data) < num do
      {rest, state} = do_read(state, (page_idx + 1) * page_size, num - byte_size(data))
      {data <> rest, state}
    else
      {data, state}
    end
  end

  defp do_write(
         state = %__MODULE__{page_size: page_size},
         loc,
         data
       ) do
    page_idx = div(loc, page_size)
    page_start = rem(loc, page_size)

    state =
      %__MODULE__{pages: pages, dirty_pages: dirty_pages, file_size: file_size} =
      load_page(state, page_idx)

    write_len = min(page_size - page_start, byte_size(data))

    ram_file = Map.get(pages, page_idx)
    :ok = :file.pwrite(ram_file, page_start, binary_part(data, 0, write_len))

    state = %__MODULE__{
      state
      | file_size: max(file_size, page_size * page_idx + page_start + write_len),
        dirty_pages: MapSet.put(dirty_pages, page_idx)
    }

    if write_len < byte_size(data) do
      do_write(
        state,
        (page_idx + 1) * page_size,
        binary_part(data, write_len, byte_size(data) - write_len)
      )
    else
      {:ok, state}
    end
  end

  defp load_page(state = %__MODULE__{pages: pages, page_size: page_size, fp: fp}, page_idx) do
    if Map.get(pages, page_idx) != nil do
      state
    else
      page =
        case :file.pread(fp, page_idx * page_size, page_size) do
          {:ok, page} -> page
          # loading pages beyond physical boundaries, because of pwrites
          # that made the file longer
          :eof -> ""
        end

      delta = page_size - byte_size(page)
      page = page <> :binary.copy(<<0>>, delta)
      {:ok, page} = :file.open(page, [:ram, :read, :write, :binary])
      state = %__MODULE__{pages: pages, pq: pq} = flush_pages(state)
      %__MODULE__{state | pages: Map.put(pages, page_idx, page), pq: :queue.in(page_idx, pq)}
    end
  end

  defp flush_pages(state = %__MODULE__{pages: pages, max_pages: max_pages}) do
    if map_size(pages) > max_pages do
      flush_page(state)
    else
      state
    end
  end

  defp sync_dirty_pages(state = %__MODULE__{dirty_pages: dirty_pages}) do
    Enum.reduce(dirty_pages, state, fn page_idx, state -> sync_page(state, page_idx) end)
  end

  defp sync_page(
         state = %__MODULE__{
           pages: pages,
           dirty_pages: dirty_pages,
           fp: fp,
           file_size: file_size,
           page_size: page_size
         },
         page_idx
       ) do
    loc = page_idx * page_size
    num = min(file_size - loc, page_size)
    {:ok, data} = :file.pread(Map.get(pages, page_idx), 0, num)
    :file.pwrite(fp, loc, data)
    dirty_pages = MapSet.delete(dirty_pages, page_idx)
    %__MODULE__{state | dirty_pages: dirty_pages}
  end

  defp flush_page(state = %__MODULE__{dirty_pages: dirty_pages, pq: pq}) do
    {:value, page_idx} = :queue.peek(pq)

    state =
      %__MODULE__{pages: pages, pq: ^pq} =
      if MapSet.member?(dirty_pages, page_idx) do
        sync_page(state, page_idx)
      else
        state
      end

    {page, pages} = Map.pop(pages, page_idx)
    {{:value, ^page_idx}, pq} = :queue.out(pq)
    :file.close(page)
    %__MODULE__{state | pages: pages, pq: pq}
  end
end