lib/ex_wal.ex

defmodule ExWal do
  @moduledoc "README.md"
             |> File.read!()
             |> String.split("<!-- MDOC !-->")
             |> Enum.fetch!(1)

  use GenServer

  alias ExWal.LRU
  alias ExWal.Models.Block
  alias ExWal.Models.Entry
  alias ExWal.Models.Segment
  alias ExWal.Store
  alias ExWal.Uvarint

  @external_resource "README.md"
  # Option schema validated by NimbleOptions in `start_link/1`. The `doc` strings are
  # interpolated into the `start_link/1` documentation, so each one must state the
  # default that is actually configured next to it.
  @wal_option_schema [
    path: [
      type: :string,
      doc: "The path of the WAL. Default is `./wal`.",
      default: "./wal"
    ],
    nosync: [
      type: :boolean,
      doc: "If true, WAL will not sync to disk. Default is false.",
      default: false
    ],
    segment_size: [
      type: :integer,
      doc: "The size of each segment file. Default is 16MB.",
      default: 16 * 1024 * 1024
    ],
    segment_cache_size: [
      type: :integer,
      doc: "The size of the segment cache. Default is 2.",
      default: 2
    ],
    store: [
      type: :atom,
      doc: "The store module which implement `ExWal.Store`. Default is `ExWal.Store.File`.",
      default: Store.File
    ],
    store_opts: [
      type: :keyword_list,
      doc: "The options for the store module.",
      default: []
    ],
    dir_permission: [
      type: :integer,
      doc: "The permission of the directory. Default is 0o750.",
      default: 0o750
    ],
    file_permission: [
      type: :integer,
      doc: "The permission of the file. Default is 0o640.",
      default: 0o640
    ],
    name: [
      type: :atom,
      doc: "The name of the WAL. Default is `ExWal`.",
      default: __MODULE__
    ]
  ]

  # Server state of the WAL process.
  #
  #   * `data_path`          - absolute directory that holds the segment files
  #   * `hot`                - the segment currently being appended to
  #   * `cold`               - Erlang `:array` of older segments; `cycle/1` appends the
  #                            retired hot segment at the end of it
  #   * `first_index`        - index of the oldest entry still kept
  #   * `last_index`         - index of the newest entry (0 for an empty WAL)
  #   * `lru_cache`          - name of the LRU process caching loaded cold segments
  #   * `store`              - store value returned by the configured store module's `new/1`
  #   * `tail_store_handler` - handle returned by `Store.open/4` for the hot segment file
  #   * `opts`               - runtime options; `init/1` always overwrites these defaults
  #                            with the validated start options
  defstruct data_path: "",
            hot: %Segment{},
            cold: :array.new(),
            first_index: 0,
            last_index: 0,
            lru_cache: nil,
            store: nil,
            tail_store_handler: nil,
            opts: [
              nosync: false,
              segment_size: 64 * 1024 * 1024,
              dir_permission: 0o750,
              file_permission: 0o640
            ]

  # Typespec mirroring the struct defined by `defstruct` in this module.
  @type t :: %__MODULE__{
          data_path: String.t(),
          hot: Segment.t(),
          cold: :array.array(),
          first_index: non_neg_integer(),
          last_index: non_neg_integer(),
          lru_cache: atom(),
          store: Store.t(),
          tail_store_handler: Store.handler(),
          opts: [
            nosync: boolean(),
            segment_size: non_neg_integer(),
            dir_permission: non_neg_integer(),
            file_permission: non_neg_integer()
          ]
        }

  # Position of an entry inside the WAL.
  @type index :: non_neg_integer()

  # Keyword-list type generated from `@wal_option_schema`; used by `start_link/1`.
  @type wal_option_schema_t :: [unquote(NimbleOptions.option_typespec(@wal_option_schema))]

  @doc """
  Starts a WAL process linked to the caller.

  The options are validated with `NimbleOptions.validate!/2` before the process is
  started, and the process is registered under the `:name` option.

  ## Options
  #{NimbleOptions.docs(@wal_option_schema)}
  """
  @spec start_link(wal_option_schema_t()) :: GenServer.on_start()
  def start_link(opts) do
    opts
    |> NimbleOptions.validate!(@wal_option_schema)
    |> then(fn validated ->
      GenServer.start_link(__MODULE__, validated, name: validated[:name])
    end)
  end

  @doc """
  Stops the WAL process identified by `name_or_pid`.
  """
  @spec stop(atom() | pid()) :: :ok
  def stop(name_or_pid), do: GenServer.stop(name_or_pid)

  @doc """
  Write entries to WAL, the entries must be strictly consecutive and incremental,
  and the index of the first entry must be WAL's last_index + 1.

  The check is performed inside the WAL process: a batch that violates it makes the
  server raise `ArgumentError`, so this call exits instead of returning an error tuple.

  ## Parameters

    * `name_or_pid` - The name or process identifier of the WAL GenServer.
    * `entries` - The list of entries to append.
    * `timeout` (optional) - The timeout value in milliseconds (default: 5000).

  ## Examples

      iex> last_index = ExWal.last_index(:wal_name)
      iex> entries = Enum.map((last_index + 1)..(last_index + 10), fn i -> Entry.new(i, "some data") end)
      iex> :ok = ExWal.write(:wal_name, entries)
  """
  @spec write(atom() | pid(), [Entry.t()], non_neg_integer()) :: :ok
  def write(name_or_pid, entries, timeout \\ 5000) do
    GenServer.call(name_or_pid, {:write, entries}, timeout)
  end

  @doc """
  Read entry content from WAL by index. if index is not found, return {:error, :index_not_found}

  ## Parameters

    * `name_or_pid` - The name or process identifier of the WAL GenServer.
    * `index` - The index of the entry to read.
    * `timeout` (optional) - The timeout value in milliseconds (default: 5000).

  ## Examples

      iex> {:ok, data} = ExWal.read(:wal_name, 1)
  """
  # Both arities generated by the default argument are specified, matching the
  # other client functions that accept a timeout.
  @spec read(atom() | pid(), index()) :: {:ok, binary()} | {:error, :index_not_found}
  @spec read(atom() | pid(), index(), non_neg_integer()) ::
          {:ok, binary()} | {:error, :index_not_found}
  def read(name_or_pid, index, timeout \\ 5000) do
    GenServer.call(name_or_pid, {:read, index}, timeout)
  end

  @doc """
  Returns the index of the newest entry stored in the WAL.
  """
  @spec last_index(atom() | pid()) :: index()
  def last_index(name_or_pid), do: GenServer.call(name_or_pid, :last_index)

  @doc """
  Returns the index of the oldest entry stored in the WAL.
  """
  @spec first_index(atom() | pid()) :: index()
  def first_index(name_or_pid), do: GenServer.call(name_or_pid, :first_index)

  @doc """
  Returns how many segments the WAL currently consists of.
  """
  @spec segment_count(atom() | pid()) :: non_neg_integer()
  def segment_count(name_or_pid), do: GenServer.call(name_or_pid, :segment_count)

  @doc """
  Drops every entry that follows `index`, making `index` the last index of the WAL.

  ## Parameters

    * `name_or_pid` - The name or process identifier of the WAL GenServer.
    * `index` - The last index to keep.
    * `timeout` (optional) - The timeout value in milliseconds (default: 5000).

  ## Returns

  - `:ok` - The log now ends at `index` (also returned when `index` already is the last index).
  - `{:error, :index_out_of_range}` - `index` is 0 or lies outside the stored range.

  ## Examples

      iex> ExWal.truncate_after(:my_wal, 10)
      :ok

  """
  @spec truncate_after(atom() | pid(), index(), non_neg_integer()) :: :ok | {:error, :index_out_of_range}
  def truncate_after(name_or_pid, index, timeout \\ 5000),
    do: GenServer.call(name_or_pid, {:truncate_after, index}, timeout)

  @doc """
  Drops every entry that precedes `index`, making `index` the first index of the WAL.

  ## Parameters

    * `name_or_pid` - The name or process identifier of the WAL GenServer.
    * `index` - The first index to keep.
    * `timeout` (optional) - The timeout value in milliseconds (default: 5000).

  ## Returns

  - `:ok` - The log now starts at `index` (also returned when `index` already is the first index).
  - `{:error, :index_out_of_range}` - `index` is 0, the WAL is empty, or `index` lies
    outside the stored range.

  ## Examples

      iex> ExWal.truncate_before(:my_wal, 10)
      :ok

  """
  @spec truncate_before(atom() | pid(), index(), non_neg_integer()) :: :ok | {:error, :index_out_of_range}
  def truncate_before(name_or_pid, index, timeout \\ 5000),
    do: GenServer.call(name_or_pid, {:truncate_before, index}, timeout)

  @doc """
  Removes every entry from the write-ahead log (WAL).

  The request is sent as a cast, so `:ok` is returned as soon as the message has been
  handed to the WAL process, not when the clearing has finished.

  ## Parameters

    * `name_or_pid` - The name or process identifier of the WAL GenServer.

  ## Examples

      iex> ExWal.clear(:my_wal)
      :ok

  """
  @spec clear(atom() | pid()) :: :ok
  def clear(name_or_pid), do: GenServer.cast(name_or_pid, :clear)

  # ----------------- Server  -----------------

  # Boots the WAL: starts the LRU cache, creates the data directory and discovers the
  # segment files already on disk.
  #
  #   * no segment file  -> a fresh segment with index 1 is opened as the hot segment
  #   * existing files   -> the newest segment becomes the hot one (its content is loaded
  #                         in `handle_continue(:load_segment, ...)`), all older ones
  #                         become cold segments
  @impl GenServer
  def init(opts) do
    lru_name = :"#{opts[:name]}_lru"
    {:ok, _} = LRU.start_link(lru_name, opts[:segment_cache_size])

    store = apply(opts[:store], :new, [opts[:store_opts]])

    path = Path.absname(opts[:path])
    Store.mkdir(store, path, opts[:dir_permission])

    # Segment files are named with a 20-digit zero-padded index, so sorting the names
    # in descending order puts the newest segment first. Shorter names (for example the
    # "tmp" file used while truncating) are not segments and are skipped.
    segments =
      path
      |> Path.join("*")
      |> Path.wildcard()
      |> Enum.reject(fn x -> File.dir?(x) end)
      |> Enum.map(&Path.basename(&1))
      |> Enum.sort(:desc)
      |> Enum.reject(&(String.length(&1) < 20))
      |> Enum.map(fn x ->
        %Segment{
          path: Path.join(path, x),
          index: parse_segment_filename(x)
        }
      end)

    base_state = %__MODULE__{
      data_path: path,
      lru_cache: lru_name,
      store: store,
      opts: [
        nosync: opts[:nosync],
        segment_size: opts[:segment_size],
        dir_permission: opts[:dir_permission],
        file_permission: opts[:file_permission]
      ]
    }

    case segments do
      [] ->
        seg1_path = Path.join(path, segment_filename(1))

        {:ok, h} =
          Store.open(
            store,
            seg1_path,
            [:read, :append],
            opts[:file_permission]
          )

        {:ok,
         %__MODULE__{
           base_state
           | hot: %Segment{path: seg1_path, index: 1},
             cold: :array.new(),
             first_index: 1,
             last_index: 0,
             tail_store_handler: h
         }}

      [%Segment{path: spath} = seg | older] ->
        # `segments` is newest-first, so its last element carries the first index.
        first_index = List.last(segments).index

        # `cycle/1` appends to `cold` and `bin_search/2` expects ascending indexes, so
        # the older segments are reversed into ascending order here. Entries are
        # consecutive, which means a segment holds exactly as many blocks as the
        # distance to the next segment's first index; recording that as `block_count`
        # lets `bin_search/2` locate entries without loading the file.
        {cold, _next_index} =
          Enum.reduce(older, {[], seg.index}, fn %Segment{index: index} = s, {acc, next_index} ->
            {[%Segment{s | block_count: next_index - index} | acc], index}
          end)

        {:ok, h} = Store.open(store, spath, [:read, :append], opts[:file_permission])

        {:ok,
         %__MODULE__{
           base_state
           | hot: seg,
             cold: :array.from_list(cold),
             first_index: first_index,
             tail_store_handler: h
         }, {:continue, :load_segment}}
    end
  end

  # Shutdown hook: stops the LRU cache, then flushes and closes the hot segment file.
  @impl GenServer
  def terminate(_reason, %__MODULE__{lru_cache: cache, store: store, tail_store_handler: handler}) do
    LRU.stop(cache)
    Store.sync(store, handler)
    Store.close(store, handler)
  end

  # Deferred part of `init/1`: reads the hot segment from the store and derives the
  # last index from its begin index and block count.
  @impl GenServer
  def handle_continue(:load_segment, %__MODULE__{hot: hot, store: store} = state) do
    %Segment{block_count: count, index: first} = loaded = load_segment(hot, store)
    newest = first + count - 1

    {:noreply, %__MODULE__{state | hot: loaded, last_index: newest}}
  end

  # Read-only queries answered straight from the server state.
  @impl GenServer
  def handle_call(:last_index, _, %__MODULE__{last_index: newest} = state) do
    {:reply, newest, state}
  end

  def handle_call(:first_index, _, %__MODULE__{first_index: oldest} = state) do
    {:reply, oldest, state}
  end

  # The hot segment is not part of `cold`, hence the `+ 1`.
  def handle_call(:segment_count, _, %__MODULE__{cold: cold} = state),
    do: {:reply, :array.size(cold) + 1, state}

  # Appends a batch of entries. The batch must continue exactly at `last_index + 1`;
  # otherwise the server raises.
  def handle_call(
        {:write, entries},
        _from,
        %__MODULE__{last_index: last_index, hot: %Segment{buf: hot_buf}, opts: opts} = state
      ) do
    consecutive? =
      entries
      |> Enum.with_index(last_index + 1)
      |> Enum.all?(fn {%Entry{index: index}, expected} -> index == expected end)

    if not consecutive? do
      raise ArgumentError, "invalid index"
    end

    # start a new segment first when the hot one has already outgrown its limit
    ready = if byte_size(hot_buf) > opts[:segment_size], do: cycle(state), else: state

    {:reply, :ok, write_entries(entries, ready)}
  end

  # Reads outside the stored index range are answered without touching any segment.
  def handle_call({:read, index}, _from, %__MODULE__{first_index: oldest, last_index: newest} = state)
      when index < oldest or index > newest,
      do: {:reply, {:error, :index_not_found}, state}

  # Locates the segment holding `index`, cuts the entry's block out of the segment
  # buffer and strips the uvarint length prefix.
  def handle_call({:read, index}, _from, %__MODULE__{} = state) do
    %Segment{index: base, blocks: blocks, buf: buf} = find_segment(index, state)
    %Block{offset: offset, size: size} = :array.get(index - base, blocks)

    {payload_size, _prefix_bytes, payload} =
      buf
      |> binary_part(offset, size)
      |> Uvarint.decode()

    # the decoded length prefix must agree with what is left of the block
    if byte_size(payload) != payload_size do
      raise ArgumentError, "invalid data"
    end

    {:reply, {:ok, payload}, state}
  end

  # Truncation after `index`: index 0 and anything outside the stored range is
  # rejected, truncating at the current last index is a no-op, everything else is
  # delegated to `__truncate_after/2`.
  def handle_call({:truncate_after, 0}, _from, state) do
    {:reply, {:error, :index_out_of_range}, state}
  end

  def handle_call({:truncate_after, index}, _from, %__MODULE__{first_index: oldest, last_index: newest} = state)
      when index < oldest or index > newest,
      do: {:reply, {:error, :index_out_of_range}, state}

  def handle_call({:truncate_after, index}, _from, %__MODULE__{last_index: newest} = state)
      when index == newest do
    {:reply, :ok, state}
  end

  def handle_call({:truncate_after, index}, _from, state),
    do: {:reply, :ok, __truncate_after(index, state)}

  # Truncation before `index`: index 0, an empty WAL and anything outside the stored
  # range is rejected, truncating at the current first index is a no-op, everything
  # else is delegated to `__truncate_before/2`.
  def handle_call({:truncate_before, 0}, _from, state) do
    {:reply, {:error, :index_out_of_range}, state}
  end

  def handle_call({:truncate_before, _}, _from, %__MODULE__{last_index: 0} = state) do
    {:reply, {:error, :index_out_of_range}, state}
  end

  def handle_call({:truncate_before, index}, _from, %__MODULE__{last_index: newest, first_index: oldest} = state)
      when index < oldest or index > newest,
      do: {:reply, {:error, :index_out_of_range}, state}

  def handle_call({:truncate_before, index}, _from, %__MODULE__{first_index: oldest} = state)
      when index == oldest do
    {:reply, :ok, state}
  end

  def handle_call({:truncate_before, index}, _from, state),
    do: {:reply, :ok, __truncate_before(index, state)}

  # Wipes the WAL: empties the cache, deletes the hot and all cold segment files and
  # starts over with an empty segment whose index is 1.
  @impl GenServer
  def handle_cast(
        :clear,
        %__MODULE__{
          data_path: data_path,
          lru_cache: lru,
          hot: %Segment{path: hot_path},
          store: store,
          tail_store_handler: old_handler,
          cold: cold,
          opts: opts
        } = state
      ) do
    LRU.clear(lru)

    # the hot file has to be closed before it can be removed
    :ok = Store.close(store, old_handler)
    :ok = Store.rm(store, hot_path)

    cold
    |> :array.to_list()
    |> Enum.each(fn %Segment{path: cold_path} -> :ok = Store.rm(store, cold_path) end)

    fresh_path = Path.join(data_path, segment_filename(1))
    {:ok, fresh_handler} = Store.open(store, fresh_path, [:read, :append], opts[:file_permission])

    fresh_state = %__MODULE__{
      state
      | hot: %Segment{path: fresh_path, index: 1},
        cold: :array.new(),
        first_index: 1,
        last_index: 0,
        tail_store_handler: fresh_handler
    }

    {:noreply, fresh_state}
  end

  # ----------------- Private -----------------

  # Reads the whole segment file through the store and rebuilds the in-memory view:
  # the raw buffer, the block table and the block count.
  @spec load_segment(Segment.t(), Store.t()) :: Segment.t()
  defp load_segment(%Segment{path: path} = seg, store) do
    {:ok, raw} = Store.read_all(store, path)
    {count, block_list} = parse_blocks(raw, 0, 0, [])

    %Segment{seg | buf: raw, blocks: :array.from_list(block_list), block_count: count}
  end

  # Converts a zero-padded segment file name back into the index it encodes.
  #
  # `String.to_integer/1` accepts leading zeros itself, so nothing is trimmed first;
  # trimming turned an all-zero name into "" and made the conversion raise.
  # Raises `ArgumentError` when the name is not a decimal integer.
  @spec parse_segment_filename(String.t()) :: index()
  defp parse_segment_filename(filename), do: String.to_integer(filename)

  # Builds the file name of the segment starting at `index`: the decimal index,
  # left-padded with zeros to 20 characters.
  @spec segment_filename(index()) :: String.t()
  defp segment_filename(index) do
    digits = Integer.to_string(index)
    String.pad_leading(digits, 20, "0")
  end

  # Walks a segment buffer and returns `{block_count, blocks}`. Every block is a
  # uvarint length prefix followed by that many payload bytes; the recorded offset
  # points at the prefix and the recorded size covers prefix plus payload.
  # Raises `ArgumentError` when a length prefix decodes to 0.
  @spec parse_blocks(
          data :: binary(),
          since :: non_neg_integer(),
          block_count :: non_neg_integer(),
          blocks :: [Block.t()]
        ) ::
          {block_count :: non_neg_integer(), blocks :: [Block.t()]}
  defp parse_blocks(<<>>, _offset, count, acc), do: {count, Enum.reverse(acc)}

  defp parse_blocks(data, offset, count, acc) do
    {payload_size, prefix_size, remainder} = Uvarint.decode(data)

    if payload_size == 0 do
      raise ArgumentError, "invalid block size"
    end

    # skip the payload; whatever follows is the next block
    <<_payload::binary-size(payload_size), rest::binary>> = remainder

    block_size = payload_size + prefix_size
    block = %Block{offset: offset, size: block_size}

    parse_blocks(rest, offset + block_size, count + 1, [block | acc])
  end

  # Appends one entry to the in-memory segment and returns `{encoded, segment}`, where
  # `encoded` is the uvarint length prefix plus payload that still has to be written
  # to the store. Raises `ArgumentError` unless the entry index directly follows the
  # segment's last block.
  @spec append_entry(Segment.t(), Entry.t()) :: {binary(), Segment.t()}
  defp append_entry(%Segment{} = seg, %Entry{index: index, data: data}) do
    %Segment{buf: buf, index: begin_index, block_count: bc, blocks: blocks} = seg

    if index != begin_index + bc do
      raise ArgumentError,
            "invalid index, begin_index: #{begin_index}, bc: #{bc}, index: #{index}"
    end

    encoded = Uvarint.encode(byte_size(data)) <> data
    block = %Block{offset: byte_size(buf), size: byte_size(encoded)}

    grown = %Segment{
      seg
      | buf: buf <> encoded,
        block_count: bc + 1,
        blocks: :array.set(bc, block, blocks)
    }

    {encoded, grown}
  end

  # Appends the entries one by one to the hot segment, both in memory and in the
  # store, and returns the updated state. After each entry the hot segment is cycled
  # once its buffer has reached the configured `segment_size`.
  @spec write_entries([Entry.t()], t()) :: t()
  defp write_entries([], m), do: m

  defp write_entries([entry | t], %__MODULE__{hot: seg, opts: opts, store: store, tail_store_handler: h} = m) do
    {data, %Segment{buf: buf, index: begin_index, block_count: bc} = seg} =
      append_entry(seg, entry)

    :ok = Store.append(store, h, data)

    # `nosync: true` means "do not sync", so the sync is issued only when the option
    # is off (the check used to be inverted).
    if !opts[:nosync] do
      :ok = Store.sync(store, h)
    end

    m = %__MODULE__{m | hot: seg, last_index: begin_index + bc - 1}
    m = if byte_size(buf) < opts[:segment_size], do: m, else: cycle(m)

    write_entries(t, m)
  end

  # Retires the hot segment and opens a new, empty one that starts at `last_index + 1`.
  # The retired segment is synced, closed and put into the LRU cache with its content;
  # the copy appended to `cold` keeps only its metadata (buffer and blocks are dropped).
  @spec cycle(t()) :: t()
  defp cycle(
         %__MODULE__{
           hot: %Segment{path: old_path} = old_seg,
           cold: cold,
           last_index: last_index,
           data_path: data_path,
           store: store,
           tail_store_handler: old_handler,
           lru_cache: lru,
           opts: opts
         } = m
       ) do
    :ok = Store.sync(store, old_handler)
    :ok = Store.close(store, old_handler)
    :ok = LRU.put(lru, old_path, old_seg)

    next_index = last_index + 1
    next_path = Path.join(data_path, segment_filename(next_index))
    {:ok, new_handler} = Store.open(store, next_path, [:read, :append], opts[:file_permission])

    archived = %Segment{old_seg | blocks: nil, buf: ""}

    %__MODULE__{
      m
      | tail_store_handler: new_handler,
        hot: %Segment{path: next_path, index: next_index},
        cold: :array.set(:array.size(cold), archived, cold)
    }
  end

  # Returns the segment containing `index`: the hot segment when the index is at or
  # past its begin index, otherwise a cold segment.
  @spec find_segment(index :: index(), t()) :: Segment.t()
  defp find_segment(index, %__MODULE__{hot: %Segment{index: hot_begin} = hot}) when index >= hot_begin,
    do: hot

  defp find_segment(index, %__MODULE__{hot: %Segment{}} = state), do: find_cold_segment(index, state)

  # Returns the loaded cold segment containing `index`. The LRU cache is asked first;
  # on a miss the segment is located by binary search, loaded from the store and put
  # into the cache.
  @spec find_cold_segment(index(), t()) :: Segment.t()
  defp find_cold_segment(index, %__MODULE__{lru_cache: lru, cold: cold, store: store}) do
    covers_index? = fn %Segment{index: first, block_count: count} ->
      index >= first and index < first + count
    end

    case LRU.select(lru, covers_index?) do
      nil ->
        position = bin_search(cold, index)
        %Segment{path: path} = loaded = load_segment(:array.get(position, cold), store)
        LRU.put(lru, path, loaded)
        loaded

      cached ->
        cached
    end
  end

  # Binary search over the cold array for the position of the segment whose index
  # range `[index, index + block_count)` contains `target_index`; `nil` when no
  # segment matches.
  @spec bin_search(:array.array(), index()) :: non_neg_integer() | nil
  defp bin_search(cold, target_index) do
    do_search(cold, target_index, 0, :array.size(cold) - 1)
  end

  # empty search window
  defp do_search(_cold, _target_index, low, high) when low > high, do: nil

  # a single candidate is left: it either contains the target or nothing does
  defp do_search(cold, target_index, low, high) when low == high do
    %Segment{index: first, block_count: count} = :array.get(low, cold)

    if target_index >= first and target_index < first + count, do: low
  end

  defp do_search(cold, target_index, low, high) do
    middle = div(low + high, 2)
    %Segment{index: first, block_count: count} = :array.get(middle, cold)

    cond do
      target_index < first -> do_search(cold, target_index, low, middle)
      target_index >= first and target_index < first + count -> middle
      true -> do_search(cold, target_index, middle + 1, high)
    end
  end

  # Drops every entry after `index` and returns the updated state.
  #
  # First clause: `index` lives in the hot segment, so only that segment is cut and
  # its file rewritten.
  # Second clause: `index` lives in a cold segment; that segment is cut and becomes
  # the new hot segment, while every newer segment (cold ones and the old hot one) is
  # deleted.
  #
  # In both cases the file is rewritten by writing the truncated buffer to a "tmp"
  # file and renaming it over the segment file.
  @spec __truncate_after(index(), t()) :: t()
  defp __truncate_after(
         index,
         %__MODULE__{
           hot: %Segment{index: begin_index, path: path} = seg,
           data_path: data_path,
           store: store,
           tail_store_handler: h
         } = state
       )
       when index >= begin_index do
    new_seg = truncate_segment_after(seg, index)

    # write the TRUNCATED buffer; the original buffer still contains the dropped entries
    temp_file = Path.join(data_path, "tmp")
    :ok = Store.write_all(store, temp_file, new_seg.buf)

    # swap the tmp file with the old segment file
    :ok = Store.close(store, h)
    :ok = Store.rename(store, temp_file, path)

    # reopen tail handler
    {:ok, h} = Store.open(store, path, [:read, :append], state.opts[:file_permission])

    %__MODULE__{
      state
      | hot: new_seg,
        last_index: index,
        tail_store_handler: h
    }
  end

  defp __truncate_after(
         index,
         %__MODULE__{
           cold: cold,
           store: store,
           data_path: data_path,
           tail_store_handler: h,
           lru_cache: lru,
           hot: %Segment{path: hot_path}
         } = state
       ) do
    # find segment by index
    idx = bin_search(cold, index)

    %Segment{path: path} =
      seg =
      idx
      |> :array.get(cold)
      |> load_segment(store)

    new_seg = truncate_segment_after(seg, index)

    # create tmp file to store the truncated buf
    temp_file = Path.join(data_path, "tmp")
    :ok = Store.write_all(store, temp_file, new_seg.buf)

    # swap the tmp file with the old segment file
    :ok = Store.rename(store, temp_file, path)

    # cleanup
    # all segments and cache after the new segment should be deleted
    :ok = LRU.clear(lru)

    if idx + 1 < :array.size(cold) do
      Enum.each((idx + 1)..(:array.size(cold) - 1), fn i ->
        %Segment{path: path} = :array.get(i, cold)
        :ok = Store.rm(store, path)
      end)
    end

    :ok = Store.close(store, h)
    :ok = Store.rm(store, hot_path)

    # reopen tail handler
    {:ok, h} = Store.open(store, path, [:read, :append], state.opts[:file_permission])

    %__MODULE__{
      state
      | hot: new_seg,
        cold: :array.resize(idx, cold),
        tail_store_handler: h,
        last_index: index
    }
  end

  # Cuts a loaded segment so that `index` is its last entry. The number of blocks kept
  # is `index - begin_index + 1`; `block_count` must equal that number, because
  # `append_entry/2` requires the next entry index to be `begin_index + block_count`.
  # A segment that already ends at `index` is returned unchanged.
  @spec truncate_segment_after(Segment.t(), index()) :: Segment.t()
  defp truncate_segment_after(%Segment{index: begin_index, blocks: blocks, buf: buf} = seg, index) do
    keep = index - begin_index + 1

    if keep < :array.size(blocks) do
      # the first dropped block starts exactly where the kept data ends
      %Block{offset: offset} = :array.get(keep, blocks)

      %Segment{
        seg
        | buf: binary_part(buf, 0, offset),
          block_count: keep,
          blocks: :array.resize(keep, blocks)
      }
    else
      seg
    end
  end

  # Drops every entry before `index` and returns the updated state.
  #
  # This clause handles an `index` inside the hot segment: the hot segment is rebuilt
  # so that it starts at `index`, written to a file named after `index`, and every
  # cold segment is deleted.
  @spec __truncate_before(index(), t()) :: t()
  defp __truncate_before(
         index,
         %__MODULE__{
           hot: %Segment{index: begin_index, blocks: blocks, buf: buf, path: path},
           cold: cold,
           store: store,
           data_path: data_path,
           tail_store_handler: h,
           lru_cache: lru
         } = state
       )
       when index >= begin_index do
    %Block{offset: offset} = :array.get(index - begin_index, blocks)

    # keep everything from the block of `index` onwards and rebuild the block table,
    # since all offsets shift
    new_buf = binary_part(buf, offset, byte_size(buf) - offset)
    {bc, blocks} = parse_blocks(new_buf, 0, 0, [])

    # remake last segment
    new_seg = %Segment{
      index: index,
      buf: new_buf,
      path: Path.join(data_path, segment_filename(index)),
      block_count: bc,
      blocks: :array.from_list(blocks)
    }

    # rewrite file
    # make a tmp file to store new buf
    temp_file = Path.join(data_path, "tmp")
    :ok = Store.write_all(store, temp_file, new_seg.buf)

    # swap the tmp file with the old segment file
    :ok = Store.close(store, h)
    :ok = Store.rename(store, temp_file, new_seg.path)

    # When `index` is the hot segment's own begin index the new file has the same
    # name as the old one; removing the "old" path would then delete the file that
    # was just renamed into place.
    if path != new_seg.path do
      :ok = Store.rm(store, path)
    end

    # delete all segment files before the new segment
    cold_size = :array.size(cold)

    if cold_size > 0 do
      Enum.each(0..(cold_size - 1), fn i ->
        %Segment{path: path} = :array.get(i, cold)
        :ok = Store.rm(store, path)
        :ok = LRU.delete(lru, path)
      end)
    end

    # reopen tail handler
    {:ok, h} = Store.open(store, new_seg.path, [:read, :append], state.opts[:file_permission])

    %__MODULE__{state | hot: new_seg, first_index: index, cold: :array.new(), tail_store_handler: h}
  end

  # `index` lives in a cold segment: that segment is rebuilt so that it starts at
  # `index`, every older segment file is deleted, and the cold array is re-created
  # with the rebuilt segment at position 0.
  defp __truncate_before(
         index,
         %__MODULE__{hot: %Segment{index: hot_begin}, cold: cold, store: store, data_path: data_path, lru_cache: lru} =
           state
       )
       when index < hot_begin do
    position = bin_search(cold, index)

    %Segment{buf: buf, blocks: blocks, index: seg_begin} =
      load_segment(:array.get(position, cold), store)

    dropped = index - seg_begin
    %Block{offset: offset} = :array.get(dropped, blocks)

    rebuilt = %Segment{
      index: index,
      buf: binary_part(buf, offset, byte_size(buf) - offset),
      path: Path.join(data_path, segment_filename(index)),
      block_count: :array.size(blocks) - dropped
    }

    # cached segments may refer to files that are about to disappear
    LRU.clear(lru)

    # remove the files of every segment up to and including the one being rebuilt
    for i <- 0..position do
      %Segment{path: old_path} = :array.get(i, cold)
      :ok = Store.rm(store, old_path)
    end

    # write the kept data to a tmp file, then move it to its final name
    temp_file = Path.join(data_path, "tmp")
    :ok = Store.write_all(store, temp_file, rebuilt.buf)
    :ok = Store.rename(store, temp_file, rebuilt.path)

    last_position = :array.size(cold) - 1
    remaining = array_slice(cold, position..last_position)
    new_cold = :array.set(0, %Segment{rebuilt | buf: ""}, remaining)

    %__MODULE__{state | first_index: index, cold: new_cold}
  end

  # Copies the elements at the positions given by `range` into a new array.
  @spec array_slice(:array.array(), Range.t()) :: :array.array()
  defp array_slice(array, range) do
    range
    |> Enum.map(fn position -> :array.get(position, array) end)
    |> :array.from_list()
  end
end