Skip to main content

lib/ex_sql/file_format.ex

defmodule ExSQL.FileFormat do
  @moduledoc """
  Reads the SQLite on-disk file format (`btree.c` / `btreeInt.h` territory)
  into an `ExSQL.Database` value.

  The reader walks the database header, the sqlite_schema B-tree on page 1,
  and each table's B-tree, decoding varint record headers and serial types.
  Schema objects are rebuilt by running their stored `CREATE ...` SQL
  through the ordinary executor, then table rows are bulk-loaded.

      {:ok, db} = ExSQL.FileFormat.read("app.db")
      ExSQL.Executor.run(db, "SELECT * FROM users")

  Writing serializes a `Database` value into a fresh, SQLite-valid file:
  rowid and WITHOUT ROWID tables, secondary and unique-auto indexes (with
  multi-level interior B-trees), AUTOINCREMENT sequences, overflow-page chains
  for rows larger than a page, and an optional WAL sidecar. It is a whole-file
  re-serialize, not in-place page editing.

  Limitations: reads UTF-8 databases only (UTF-16 is rejected with a clear
  error).
  """

  alias ExSQL.{Database, Executor, Table, Value}

  @doc """
  Writes an in-memory `ExSQL.Database` into a SQLite `.db` file.

  Returns `{:ok, path}` on success.
  """
  @spec write(Database.t(), Path.t(), keyword()) :: {:ok, String.t()} | {:error, String.t()}
  def write(db, path, opts \\ []) do
    try do
      {:ok, binary, metadata} = serialize(db)
      journal_mode = Keyword.get(opts, :journal_mode, db.journal_mode)

      with :ok <- File.write(path, binary),
           :ok <- maybe_write_wal(path, binary, metadata, journal_mode) do
        {:ok, path}
      else
        {:error, reason} ->
          {:error, "unable to write database file: #{reason}"}
      end
    rescue
      e in ExSQL.Error -> {:error, e.message}
    end
  end

  @doc "Reads a database file into an in-memory `ExSQL.Database`."
  @spec read(Path.t()) :: {:ok, Database.t()} | {:error, String.t()}
  def read(path) do
    try do
      with {:ok, data} <- read_file(path),
           {:empty, false} <- {:empty, data == <<>>},
           {:ok, file} <- parse_header(data),
           {:ok, file} <- replay_wal(path, file) do
        load(file)
      else
        {:empty, true} -> {:ok, Database.new()}
        other -> other
      end
    rescue
      e in ExSQL.Error -> {:error, e.message}
    end
  end

  # -- writing ------------------------------------------------------------------

  defp serialize(db) do
    page_size = normalize_page_size(db.page_size)

    state = %{page_size: page_size, usable: page_size, next_page: 2, pages: %{}}
    {schema_rows, state} = collect_schema_rows(db, state)
    {schema_page, state} = build_schema_page(schema_rows, state)
    state = put_page(state, 1, schema_page)

    page_count = page_count(state)
    header = build_header(db, page_size, page_count)
    binary = assemble_file(header, state, page_count)

    {:ok, binary, %{page_size: page_size, page_count: page_count}}
  end

  defp maybe_write_wal(path, binary, metadata, journal_mode) do
    if wal_mode?(journal_mode) do
      write_wal_file(path, binary, metadata)
    else
      _ = File.rm(path <> "-wal")
      _ = File.rm(path <> "-shm")
      :ok
    end
  end

  defp wal_mode?(mode) when is_binary(mode), do: String.upcase(mode) == "WAL"
  defp wal_mode?(mode) when is_atom(mode), do: mode == :wal
  defp wal_mode?(_), do: false

  defp write_wal_file(path, binary, %{page_size: page_size, page_count: page_count}) do
    wal_path = "#{path}-wal"
    wal_header = build_wal_header(page_size)
    salt1 = 0
    salt2 = 0

    pages =
      1..page_count
      |> Enum.map(fn page_no ->
        page = binary_part(binary, (page_no - 1) * page_size, page_size)
        build_wal_frame(page_no, page_count, salt1, salt2, page)
      end)

    File.write(wal_path, [wal_header | pages])
  end

  defp build_wal_header(_page_size) do
    magic = 0x377F0682
    version = 3_007_000
    wal_page_size = 0
    checkpoint_seqno = 0
    salt = 0

    <<magic::32, version::32, wal_page_size::32, checkpoint_seqno::32, salt::32, salt::32, 0::32,
      0::32>>
  end

  defp build_wal_frame(page_number, page_count, salt1, salt2, page) do
    <<page_number::32, page_count::32, salt1::32, salt2::32, 0::32, 0::32>> <> page
  end

  defp normalize_page_size(size) do
    cond do
      size == 65_536 ->
        size

      size < 512 or size > 65_536 ->
        raise(ExSQL.Error, message: "unsupported page size: #{size}")

      not power_of_two?(size) ->
        raise(ExSQL.Error, message: "page size must be a power of two: #{size}")

      true ->
        size
    end
  end

  defp power_of_two?(n), do: n > 0 and Bitwise.band(n, n - 1) == 0

  defp page_count(state), do: state.next_page - 1

  defp collect_schema_rows(db, state) do
    tables = db.tables |> Map.values() |> Enum.sort_by(&Table.key(&1.name))

    {table_rows, state} =
      Enum.map_reduce(tables, state, fn table, acc_state ->
        collect_table_schema_rows(table, acc_state)
      end)

    table_rows = Enum.concat(table_rows)

    {sequence_rows, state} =
      if sqlite_sequence_needed?(db) do
        build_sqlite_sequence_rows(db, state)
      else
        {[], state}
      end

    {table_rows ++ sequence_rows, state}
  end

  defp collect_table_schema_rows(table, state) do
    {rootpage, state} =
      if table.without_rowid do
        build_without_rowid_table_pages(table, state)
      else
        build_rowid_table_pages(table, state)
      end

    table_row = [
      "table",
      table.name,
      table.name,
      rootpage,
      emit_create_table_sql(table)
    ]

    {autoindex_rows, state} =
      table.autoindexes
      |> Enum.sort_by(&Table.key(&1.name))
      |> Enum.map_reduce(state, fn index, acc_state ->
        {rootpage, acc_state} = build_index_pages(table, index, acc_state)
        {["index", index.name, table.name, rootpage, nil], acc_state}
      end)

    {index_rows, state} =
      table.indexes
      |> Enum.sort_by(&Table.key(&1.name))
      |> Enum.map_reduce(state, fn index, acc_state ->
        {rootpage, acc_state} = build_index_pages(table, index, acc_state)

        {["index", index.name, table.name, rootpage, emit_create_index_sql(table, index)],
         acc_state}
      end)

    {[table_row | autoindex_rows ++ index_rows], state}
  end

  defp sqlite_sequence_needed?(db) do
    Enum.any?(db.tables, fn {_k, table} ->
      table.autoincrement and table.sequence_row and table.sequence > 0
    end) or db.sqlite_sequence_orphans != %{}
  end

  defp build_sqlite_sequence_rows(db, state) do
    visible_rows =
      db.tables
      |> Map.values()
      |> Enum.filter(&(&1.autoincrement and &1.sequence_row))
      |> Enum.map(&{Table.key(&1.name), [&1.name, &1.sequence]})

    orphan_rows =
      db.sqlite_sequence_orphans
      |> Enum.map(fn {key, {name, sequence}} -> {key, [name, sequence]} end)

    entries =
      (visible_rows ++ orphan_rows)
      |> Enum.sort_by(fn {key, _row} -> key end)
      |> Enum.map(fn {_key, row} -> row end)

    {rootpage, state} = build_rowid_table_rows(entries, "sqlite_sequence", state)

    {
      [
        [
          "table",
          "sqlite_sequence",
          "sqlite_sequence",
          rootpage,
          "CREATE TABLE sqlite_sequence(name,seq)"
        ]
      ],
      state
    }
  end

  defp build_schema_page(rows, state) do
    {schema_root, state} = build_rowid_table_rows(rows, :sqlite_schema, state)
    schema_page = Map.fetch!(state.pages, schema_root)
    schema_page = relocate_schema_page(schema_page, state)

    pages = Map.delete(state.pages, schema_root)

    state =
      if schema_root == page_count(state) do
        %{state | pages: pages, next_page: schema_root}
      else
        %{state | pages: pages}
      end

    {schema_page, state}
  end

  defp relocate_schema_page(schema_page, state) do
    case :binary.at(schema_page, 0) do
      page_type when page_type in [5, 13] ->
        page_size = page_size(state)
        region = page_size - 100
        header_size = page_header_size(page_type)
        ncells = u16(schema_page, 3)
        content_start = u16(schema_page, 5)

        pointers =
          if ncells == 0 do
            <<>>
          else
            0..(ncells - 1)
            |> Enum.map(fn index -> u16(schema_page, header_size + index * 2) end)
            |> Enum.map_join(&<<&1::16>>)
          end

        header = binary_part(schema_page, 0, header_size)
        pointer_end = 100 + header_size + 2 * ncells
        gap_size = content_start - pointer_end

        if gap_size < 0 do
          raise ExSQL.Error, message: "unable to relocate schema page to first page"
        end

        cell_area = binary_part(schema_page, content_start, page_size - content_start)

        body = header <> pointers <> String.duplicate(<<0>>, gap_size) <> cell_area

        if byte_size(body) > region do
          raise ExSQL.Error, message: "unable to relocate schema page to first page"
        end

        String.duplicate(<<0>>, 100) <> body <> String.duplicate(<<0>>, region - byte_size(body))

      _ ->
        schema_page
    end
  end

  defp build_rowid_table_pages(table, state) when is_map(table) do
    stored_columns =
      table.columns
      |> Enum.reject(&match?({:virtual, _}, &1.generated))

    rows =
      table
      |> Table.scan()
      |> Enum.map(fn {rowid, row} ->
        values =
          stored_columns
          |> Enum.map(fn column ->
            key = Table.key(column.name)

            if key == table.rowid_alias do
              nil
            else
              Map.get(row, key)
            end
          end)

        {rowid, values}
      end)

    build_table_pages(rows, :rowid, state)
  end

  defp build_rowid_table_rows(rows, table_name, state) when is_list(rows) do
    row_entries =
      rows
      |> Enum.with_index(1)
      |> Enum.map(fn {values, rowid} -> {rowid, values} end)

    build_table_pages(row_entries, :rowid, state, table_name: table_name)
  end

  defp build_without_rowid_table_pages(table, state) do
    stored_columns = without_rowid_storage_columns(table)

    rows =
      table
      |> Table.scan()
      |> Enum.map(fn {_rowid, row} ->
        Enum.map(stored_columns, &Map.get(row, Table.key(&1.name)))
      end)

    build_index_btree_pages(rows, state, "WITHOUT ROWID table")
  end

  defp build_index_pages(%{without_rowid: true}, _index, state), do: {0, state}

  defp build_index_pages(table, index, state) do
    rows =
      table
      |> serializable_index_entries(index)
      |> Enum.flat_map(fn {values, rowids} ->
        values = Tuple.to_list(values)
        Enum.map(rowids, fn rowid -> values ++ [rowid] end)
      end)
      |> Enum.sort()

    build_index_btree_pages(rows, state, "index")
  end

  defp serializable_index_entries(table, %{where: nil} = index) do
    members = emit_index_members(index)

    if Enum.all?(members, &match?({:column, _}, &1)) do
      table
      |> Table.scan()
      |> Enum.reduce(%{}, fn {rowid, row}, entries ->
        values =
          Enum.map(members, fn {:column, key} ->
            Map.get(row, Table.key(key))
          end)

        Map.update(entries, List.to_tuple(values), [rowid], fn rowids -> [rowid | rowids] end)
      end)
      |> Map.new(fn {values, rowids} -> {values, Enum.sort(rowids)} end)
    else
      Map.get(index, :entries, %{})
    end
  end

  defp serializable_index_entries(_table, index), do: Map.get(index, :entries, %{})

  defp build_table_pages(entries, :rowid, state, _opts) do
    {leaf_pages, state} = pack_rowid_leaf_pages(entries, state)

    case leaf_pages do
      [] ->
        page_no = state.next_page
        state = allocate_page(state)
        page = build_leaf_page(page_no, :rowid, [], state)
        {page_no, put_page(state, page_no, page)}

      [{page_no, _max}] ->
        {page_no, state}

      _ ->
        build_rowid_interior_tree(leaf_pages, state)
    end
  end

  defp build_index_btree_pages([], state, _context) do
    page_no = state.next_page
    state = allocate_page(state)
    page = build_leaf_page(page_no, :index, [], state)
    {page_no, put_page(state, page_no, page)}
  end

  defp build_index_btree_pages(entries, state, _context) do
    {leaf_nodes, state} = pack_index_leaf_pages(entries, state, [])
    build_index_interior_tree(leaf_nodes, state)
  end

  # Pack index records into leaf pages. SQLite's index B-tree is a true B-tree:
  # the record that overflows a leaf is *promoted* — held back as the divider in
  # the parent interior page — rather than stored in a leaf. `pack` returns
  # `{leaf_page, divider_payload}` nodes (the last node's divider is `nil`).
  defp pack_index_leaf_pages([], state, acc), do: {Enum.reverse(acc), state}

  defp pack_index_leaf_pages(entries, state, acc) do
    page_no = state.next_page
    state = allocate_page(state)
    capacity = page_region_size(state, page_no)
    {cells, divider, remaining} = collect_index_leaf_cells(entries, capacity, 0, 0, [])
    page = build_leaf_page(page_no, :index, cells, state)
    state = put_page(state, page_no, page)
    pack_index_leaf_pages(remaining, state, [{page_no, divider} | acc])
  end

  defp collect_index_leaf_cells([], _capacity, _count, _used, acc),
    do: {Enum.reverse(acc), nil, []}

  defp collect_index_leaf_cells([entry | rest], capacity, count, used, acc) do
    payload = encode_record_payload(entry)
    cell = encode_varint(byte_size(payload)) <> payload
    needed = page_header_size(10) + 2 * (count + 1) + used + byte_size(cell)

    if needed <= capacity do
      collect_index_leaf_cells(rest, capacity, count + 1, used + byte_size(cell), [cell | acc])
    else
      if count == 0 do
        raise ExSQL.Error, message: "index entry too large for a single page"
      end

      {Enum.reverse(acc), payload, rest}
    end
  end

  defp build_index_interior_tree([{page_no, _divider}], state), do: {page_no, state}

  defp build_index_interior_tree(nodes, state) do
    {parents, state} = pack_index_interior_pages(nodes, state, [])
    build_index_interior_tree(parents, state)
  end

  defp pack_index_interior_pages([], state, acc), do: {Enum.reverse(acc), state}

  defp pack_index_interior_pages(nodes, state, acc) do
    page_no = state.next_page
    state = allocate_page(state)
    capacity = page_region_size(state, page_no)

    {cells, right_most, promoted, remaining} =
      collect_index_interior_cells(nodes, capacity, 0, 0, [])

    page = build_interior_page(state, page_no, cells, right_most, 2)
    state = put_page(state, page_no, page)
    pack_index_interior_pages(remaining, state, [{page_no, promoted} | acc])
  end

  # An interior page covers a run of children: cells `<child::32, divider>` for
  # all but its rightmost child, the rightmost in the header, and the divider
  # after its rightmost promoted up to the next level.
  defp collect_index_interior_cells([{page, divider}], _capacity, _count, _used, cells) do
    {Enum.reverse(cells), page, divider, []}
  end

  defp collect_index_interior_cells([{page, divider} | rest], capacity, count, used, cells) do
    cell = render_index_interior_cell(page, divider)
    needed = page_header_size(:interior) + 2 * (count + 1) + used + byte_size(cell)

    if needed <= capacity do
      collect_index_interior_cells(rest, capacity, count + 1, used + byte_size(cell), [
        cell | cells
      ])
    else
      if count == 0 do
        raise ExSQL.Error, message: "index interior cell too large for a single page"
      end

      {Enum.reverse(cells), page, divider, rest}
    end
  end

  defp render_index_interior_cell(page, payload) do
    <<page::32, encode_varint(byte_size(payload))::binary, payload::binary>>
  end

  defp build_table_pages(entries, :rowid, state),
    do: build_table_pages(entries, :rowid, state, [])

  defp pack_rowid_leaf_pages(entries, state) do
    do_pack_rowid_leaf_pages(entries, state, [])
  end

  defp do_pack_rowid_leaf_pages([], state, chunks), do: {Enum.reverse(chunks), state}

  defp do_pack_rowid_leaf_pages(entries, state, chunks) do
    page_no = state.next_page
    state = allocate_page(state)
    capacity = page_region_size(state, page_no)

    {page_rows, remaining, state} =
      collect_rowid_leaf_rows(
        entries,
        capacity,
        0,
        0,
        [],
        state
      )

    if page_rows == [] do
      raise ExSQL.Error,
        message:
          "row too large for a single rowid-table page: #{inspect(List.first(entries) |> elem(0))}"
    end

    cells = Enum.map(page_rows, &elem(&1, 1))
    page = build_leaf_page(page_no, :rowid, cells, state)
    state = put_page(state, page_no, page)

    max_rowid = List.last(page_rows) |> elem(0)

    if remaining == [] do
      {Enum.reverse([{page_no, max_rowid} | chunks]), state}
    else
      do_pack_rowid_leaf_pages(remaining, state, [{page_no, max_rowid} | chunks])
    end
  end

  defp collect_rowid_leaf_rows([], _capacity, _row_count, _used, rows_acc, state) do
    {Enum.reverse(rows_acc), [], state}
  end

  defp collect_rowid_leaf_rows(
         [{rowid, values} | remaining],
         capacity,
         row_count,
         used,
         rows_acc,
         state
       ) do
    payload = encode_record_payload(values)
    payload_size = byte_size(payload)
    # Size the cell (inline, or inline-prefix + 4-byte overflow pointer) without
    # allocating overflow pages yet, so a row that spills to the next page
    # doesn't leave orphan pages behind.
    cell_size = rowid_leaf_cell_size(rowid, payload_size, state.usable)

    needed =
      page_header_size(:rowid) +
        2 * (row_count + 1) +
        used +
        cell_size

    if needed <= capacity do
      {cell, state} = rowid_leaf_cell(rowid, payload, state)

      collect_rowid_leaf_rows(
        remaining,
        capacity,
        row_count + 1,
        used + byte_size(cell),
        [{rowid, cell} | rows_acc],
        state
      )
    else
      if row_count == 0 do
        raise ExSQL.Error,
          message: "row too large for a single rowid-table page: #{inspect(rowid)}"
      end

      {Enum.reverse(rows_acc), [{rowid, values} | remaining], state}
    end
  end

  defp build_rowid_interior_tree([{page_no, _max_rowid}], state), do: {page_no, state}

  defp build_rowid_interior_tree(nodes, state) do
    {parents, state} = pack_rowid_interior_pages(nodes, state)
    build_rowid_interior_tree(parents, state)
  end

  defp pack_rowid_interior_pages(nodes, state) do
    nodes
    |> rowid_interior_chunks(state)
    |> Enum.map_reduce(state, fn chunk, acc_state ->
      build_rowid_interior_page(chunk, acc_state)
    end)
  end

  defp rowid_interior_chunks(nodes, state) do
    nodes
    |> do_rowid_interior_chunks(state, [])
    |> rebalance_single_child_tail()
  end

  defp do_rowid_interior_chunks([], _state, chunks), do: Enum.reverse(chunks)

  defp do_rowid_interior_chunks([first | rest], state, chunks) do
    {chunk, remaining} = collect_rowid_interior_chunk(rest, state, [first], 0, 0)
    do_rowid_interior_chunks(remaining, state, [chunk | chunks])
  end

  defp collect_rowid_interior_chunk([], _state, children, _cell_count, _used) do
    {Enum.reverse(children), []}
  end

  defp collect_rowid_interior_chunk(
         [next | rest] = remaining,
         state,
         [right_most | _] = children,
         cell_count,
         used
       ) do
    cell = render_rowid_interior_cell(right_most)

    needed =
      page_header_size(:interior) +
        2 * (cell_count + 1) +
        used +
        byte_size(cell)

    if needed <= page_region_size(state, state.next_page) do
      collect_rowid_interior_chunk(
        rest,
        state,
        [next | children],
        cell_count + 1,
        used + byte_size(cell)
      )
    else
      if cell_count == 0 do
        raise ExSQL.Error,
          message: "rowid-table interior cell too large for a single page"
      end

      {Enum.reverse(children), remaining}
    end
  end

  defp rebalance_single_child_tail([_only] = chunks), do: chunks

  defp rebalance_single_child_tail(chunks) do
    case List.last(chunks) do
      [_single] ->
        previous_index = length(chunks) - 2
        previous = Enum.at(chunks, previous_index)

        case previous do
          [_a, _b, _c | _rest] ->
            {moved, previous} = List.pop_at(previous, -1)

            chunks
            |> List.replace_at(previous_index, previous)
            |> List.update_at(-1, &[moved | &1])

          _other ->
            raise ExSQL.Error,
              message: "unable to rebalance rowid-table interior pages"
        end

      _tail ->
        chunks
    end
  end

  defp build_rowid_interior_page(children, state) do
    page_no = state.next_page
    state = allocate_page(state)

    cells =
      children
      |> Enum.take(length(children) - 1)
      |> Enum.map(&render_rowid_interior_cell/1)

    right_most = children |> List.last() |> elem(0)
    page = build_interior_page(state, page_no, cells, right_most)
    max_rowid = children |> List.last() |> elem(1)

    {{page_no, max_rowid}, put_page(state, page_no, page)}
  end

  defp build_leaf_page(page_no, :rowid, cells, state) do
    build_leaf_page(page_no, 13, cells, state)
  end

  defp build_leaf_page(page_no, :index, cells, state) do
    build_leaf_page(page_no, 10, cells, state)
  end

  defp build_leaf_page(page_no, page_type, cells, state) do
    capacity = page_region_size(state, page_no)
    header_size = page_header_size(page_type)
    page_offset = page_start_offset(page_no)

    cell_area = IO.iodata_to_binary(cells)
    content_start = page_offset + capacity - byte_size(cell_area)

    pointers =
      cells
      |> Enum.reduce({[], content_start}, fn cell, {acc, cursor} ->
        {[cursor | acc], cursor + byte_size(cell)}
      end)
      |> elem(0)
      |> Enum.reverse()

    header =
      case page_type do
        13 ->
          <<13::8, 0::16, length(cells)::16, content_start::16, 0::8>>

        10 ->
          <<10::8, 0::16, length(cells)::16, content_start::16, 0::8>>

        _ ->
          fail("unsupported leaf page type: #{page_type}")
      end

    pointer_area = Enum.map_join(pointers, <<>>, &<<&1::16>>)
    pointer_end = page_offset + header_size + byte_size(pointer_area)
    gap_size = content_start - pointer_end

    if gap_size < 0 do
      raise ExSQL.Error, message: "not enough room to build page #{page_no}"
    end

    content =
      String.duplicate(<<0>>, page_offset) <>
        header <> pointer_area <> String.duplicate(<<0>>, gap_size) <> cell_area

    if byte_size(content) > capacity + page_offset do
      raise ExSQL.Error, message: "not enough room to build page #{page_no}"
    end

    content <> String.duplicate(<<0>>, page_size(state) - byte_size(content))
  end

  defp build_interior_page(state, page_no, cells, right_most, page_type \\ 5) do
    capacity = page_region_size(state, page_no)
    page_offset = page_start_offset(page_no)
    header_size = 12
    cell_area = IO.iodata_to_binary(cells)
    content_start = page_offset + capacity - byte_size(cell_area)

    pointers =
      cells
      |> Enum.reduce({[], content_start}, fn cell, {acc, cursor} ->
        {[cursor | acc], cursor + byte_size(cell)}
      end)
      |> elem(0)
      |> Enum.reverse()

    header =
      <<
        page_type::8,
        0::16,
        length(cells)::16,
        content_start::16,
        0::8,
        right_most::32
      >>

    pointer_area = Enum.map_join(pointers, <<>>, &<<&1::16>>)
    pointer_end = page_offset + header_size + byte_size(pointer_area)
    gap_size = content_start - pointer_end

    if gap_size < 0 do
      raise ExSQL.Error, message: "not enough room to build interior page #{page_no}"
    end

    content =
      String.duplicate(<<0>>, page_offset) <>
        header <> pointer_area <> String.duplicate(<<0>>, gap_size) <> cell_area

    if byte_size(content) > capacity + page_offset do
      raise ExSQL.Error, message: "not enough room to build interior page #{page_no}"
    end

    content <> String.duplicate(<<0>>, page_size(state) - byte_size(content))
  end

  # SQLite's local-payload formula (mirrors `read_payload/4`): a table-leaf cell
  # stores up to `max_local` payload bytes inline; a larger payload keeps a
  # computed prefix inline followed by a 4-byte pointer to an overflow-page
  # chain holding the rest.
  defp rowid_leaf_local_size(payload_size, usable) do
    max_local = usable - 35

    if payload_size <= max_local do
      payload_size
    else
      min_local = div((usable - 12) * 32, 255) - 23
      surplus = min_local + rem(payload_size - min_local, usable - 4)
      if surplus <= max_local, do: surplus, else: min_local
    end
  end

  defp rowid_leaf_cell_size(rowid, payload_size, usable) do
    local = rowid_leaf_local_size(payload_size, usable)
    overflow_pointer = if local < payload_size, do: 4, else: 0

    byte_size(encode_varint(payload_size)) + byte_size(encode_varint(rowid)) + local +
      overflow_pointer
  end

  defp rowid_leaf_cell(rowid, payload, state) do
    payload_size = byte_size(payload)
    local = rowid_leaf_local_size(payload_size, state.usable)
    header = encode_varint(payload_size) <> encode_varint(rowid)

    if local >= payload_size do
      {header <> payload, state}
    else
      <<local_payload::binary-size(^local), overflow::binary>> = payload
      {first_page, state} = write_overflow_chain(overflow, state)
      {header <> local_payload <> <<first_page::32>>, state}
    end
  end

  # Splits the spilled payload into `usable - 4` byte chunks across a linked list
  # of overflow pages (each page: 4-byte next-page pointer, then data; last = 0),
  # and returns the first page number.
  defp write_overflow_chain(data, state) do
    chunk_size = state.usable - 4
    chunks = chunk_binary(data, chunk_size)

    {page_nos, state} =
      Enum.map_reduce(chunks, state, fn _chunk, acc ->
        {acc.next_page, allocate_page(acc)}
      end)

    state =
      chunks
      |> Enum.zip(page_nos)
      |> Enum.with_index()
      |> Enum.reduce(state, fn {{chunk, page_no}, idx}, acc ->
        next = Enum.at(page_nos, idx + 1, 0)
        put_page(acc, page_no, build_overflow_page(acc, chunk, next))
      end)

    {hd(page_nos), state}
  end

  defp build_overflow_page(state, data, next_page) do
    content = <<next_page::32>> <> data
    content <> String.duplicate(<<0>>, page_size(state) - byte_size(content))
  end

  defp chunk_binary(<<>>, _size), do: []

  defp chunk_binary(data, size) when byte_size(data) <= size, do: [data]

  defp chunk_binary(data, size) do
    <<head::binary-size(^size), rest::binary>> = data
    [head | chunk_binary(rest, size)]
  end

  defp render_rowid_interior_cell({page_no, max_rowid}) do
    <<page_no::32, encode_varint(max_rowid)::binary>>
  end

  defp page_header_size(13), do: 8
  defp page_header_size(10), do: 8
  defp page_header_size(5), do: 12
  defp page_header_size(:interior), do: 12
  defp page_header_size(_), do: 8

  defp page_region_size(%{usable: usable}, 1), do: usable - 100
  defp page_region_size(%{usable: usable}, _page_no), do: usable

  defp page_start_offset(1), do: 100
  defp page_start_offset(_), do: 0

  defp page_size(%{page_size: page_size}), do: page_size

  defp allocate_page(%{next_page: next_page} = state), do: %{state | next_page: next_page + 1}

  defp put_page(state, page_no, page) do
    %{state | pages: Map.put(state.pages, page_no, page)}
  end

  defp build_header(db, page_size, page_count) do
    magic = "SQLite format 3" <> <<0>>
    db_page_size = if page_size == 65_536, do: 1, else: page_size

    payload =
      <<db_page_size::16, 1, 1, 0, 64, 32, 32, 0::32, page_count::32, 0::32, 0::32, 0::32, 4::32,
        0::32, db.auto_vacuum::32, 1::32>>

    padding = String.duplicate(<<0>>, 100 - 16 - byte_size(payload))
    magic <> payload <> padding
  end

  defp assemble_file(header, state, page_count) do
    page_size = state.page_size

    pages =
      for page_no <- 1..page_count do
        case Map.get(state.pages, page_no) do
          nil when page_no == 1 ->
            header <> String.duplicate(<<0>>, page_size - 100)

          nil ->
            String.duplicate(<<0>>, page_size)

          page when page_no == 1 ->
            header <> binary_part(page, 100, page_size - 100)

          page ->
            page
        end
      end

    IO.iodata_to_binary(pages)
  end

  defp encode_record_payload(values) do
    {serial_bytes, body} =
      values
      |> Enum.map(&encode_value/1)
      |> Enum.unzip()

    serial = encode_records_header(serial_bytes)
    header_size = byte_size(serial) + byte_size(encode_varint(byte_size(serial) + 1))
    header = encode_varint(header_size)
    IO.iodata_to_binary([header, serial | body])
  end

  defp encode_records_header(serials) do
    Enum.map_join(serials, <<>>, &encode_varint/1)
  end

  defp encode_varint(value) when is_integer(value) and value < 0 do
    raise ExSQL.Error, message: "negative varints are not supported: #{value}"
  end

  defp encode_varint(value) when is_integer(value) do
    if value > 18_446_744_073_709_551_615 do
      raise ExSQL.Error, message: "varint overflow: #{value}"
    end

    value
    |> varint_groups([])
    |> encode_varint_bytes()
  end

  defp varint_groups(value, groups) when value < 128 and length(groups) < 8 do
    [value | groups]
  end

  defp varint_groups(value, groups) when value < 256 and length(groups) == 8 do
    [value | groups]
  end

  defp varint_groups(value, groups) do
    varint_groups(div(value, 128), [Bitwise.band(value, 0x7F) | groups])
  end

  defp encode_varint_bytes([byte]), do: <<byte::8>>

  defp encode_varint_bytes([head | rest]) do
    <<Bitwise.bor(head, 0x80)::8, encode_varint_bytes(rest)::binary>>
  end

  defp encode_value(nil), do: {0, <<>>}
  defp encode_value(:nan), do: {7, <<:math.pow(-1.0, 0.5)::float-64>>}

  defp encode_value({:blob, blob}) when is_binary(blob) do
    {12 + byte_size(blob) * 2, blob}
  end

  defp encode_value(value) when is_binary(value) do
    {13 + 2 * byte_size(value), value}
  end

  defp encode_value({:json, json}) when is_binary(json) do
    {13 + 2 * byte_size(json), json}
  end

  defp encode_value(value) when is_integer(value) do
    cond do
      value == 0 ->
        {8, <<>>}

      value == 1 ->
        {9, <<>>}

      value >= -128 and value <= 127 ->
        {1, <<value::signed-size(8)>>}

      value >= -32_768 and value <= 32_767 ->
        {2, <<value::signed-size(16)>>}

      value >= -8_388_608 and value <= 8_388_607 ->
        {3, <<value::signed-size(24)>>}

      value >= -2_147_483_648 and value <= 2_147_483_647 ->
        {4, <<value::signed-size(32)>>}

      value >= -140_737_488_355_328 and value <= 140_737_488_355_327 ->
        {5, <<value::signed-size(48)>>}

      true ->
        {6, <<value::signed-size(64)>>}
    end
  end

  defp encode_value(value) when is_float(value) do
    {7, <<value::float-size(64)>>}
  end

  defp encode_value(_other), do: {0, <<>>}

  defp emit_create_table_sql(table) do
    definition_sql =
      table.columns
      |> Enum.map(&emit_column_def_sql/1)
      |> Kernel.++(Enum.map(table.foreign_keys, &emit_table_foreign_key_sql(table, &1)))
      |> Enum.join(", ")

    options =
      [
        if(table.without_rowid, do: "WITHOUT ROWID"),
        if(table.strict, do: "STRICT")
      ]
      |> Enum.reject(&is_nil/1)

    suffix = if options == [], do: "", else: " " <> Enum.join(options, ", ")
    "CREATE TABLE #{table.name}(#{definition_sql})#{suffix}"
  end

  defp emit_column_def_sql(column) do
    [
      column.name,
      column.declared_type,
      emit_generated_sql(column),
      if(column.primary_key, do: "PRIMARY KEY"),
      if(column.autoincrement, do: "AUTOINCREMENT"),
      if(column.not_null, do: "NOT NULL"),
      if(column.unique, do: "UNIQUE"),
      if(column.default, do: "DEFAULT #{emit_default(column.default)}"),
      if(column.collate, do: "COLLATE #{column.collate}"),
      emit_references_sql(column)
    ]
    |> Enum.reject(&is_nil/1)
    |> Enum.join(" ")
  end

  defp emit_generated_sql(%{generated: {kind, expr}}) do
    "GENERATED ALWAYS AS (#{emit_expr(expr)}) #{kind |> Atom.to_string() |> String.upcase()}"
  end

  defp emit_generated_sql(_column), do: nil

  defp emit_references_sql(%{references: {table, [], actions}}) do
    "REFERENCES #{table}#{emit_fk_actions(actions)}"
  end

  defp emit_references_sql(%{references: {table, columns, actions}}) do
    "REFERENCES #{table}(#{Enum.join(columns, ", ")})#{emit_fk_actions(actions)}"
  end

  defp emit_references_sql(_column), do: nil

  defp emit_fk_actions(actions) do
    [
      if(actions.on_delete != :no_action, do: " ON DELETE #{emit_fk_action(actions.on_delete)}"),
      if(actions.on_update != :no_action, do: " ON UPDATE #{emit_fk_action(actions.on_update)}"),
      if(actions.deferred, do: " DEFERRABLE INITIALLY DEFERRED")
    ]
    |> Enum.reject(&is_nil/1)
    |> Enum.join()
  end

  defp emit_fk_action(:restrict), do: "RESTRICT"
  defp emit_fk_action(:set_null), do: "SET NULL"
  defp emit_fk_action(:set_default), do: "SET DEFAULT"
  defp emit_fk_action(:cascade), do: "CASCADE"

  defp emit_table_foreign_key_sql(table, {child_keys, parent_table, parent_keys, actions}) do
    child_columns =
      child_keys
      |> Enum.map_join(", ", &display_column_name(table, &1))

    references =
      case parent_keys do
        [] -> parent_table
        keys -> "#{parent_table}(#{Enum.join(keys, ", ")})"
      end

    "FOREIGN KEY(#{child_columns}) REFERENCES #{references}#{emit_fk_actions(actions)}"
  end

  defp emit_create_index_sql(table, index) do
    unique = if index.unique, do: "UNIQUE ", else: ""

    columns =
      index
      |> emit_index_members()
      |> Enum.map_join(", ", fn
        {:column, key} -> display_column_name(table, key)
        {:expr, expr} -> emit_expr(expr)
      end)

    where = if index.where, do: " WHERE #{emit_expr(index.where)}", else: ""
    "CREATE #{unique}INDEX #{index.name} ON #{table.name}(#{columns})#{where}"
  end

  defp emit_default({:literal, nil}), do: "NULL"
  defp emit_default({:literal, {:blob, blob}}), do: "x'#{Base.encode16(blob)}'"
  defp emit_default({:literal, value}) when is_binary(value), do: "'#{value}'"
  defp emit_default({:literal, value}), do: Value.to_text(value)
  defp emit_default({:negate, {:literal, value}}), do: "-#{Value.to_text(value)}"
  defp emit_default({:column, nil, word}), do: word
  # An expression default (e.g. `(strftime('%Y-%m-%dT%H:%M:%SZ','now'))`): emit
  # the expression, parenthesized as SQLite stores and re-parses it. Without
  # this the schema round-trip through the file format would lose the default.
  defp emit_default(expr), do: "(#{emit_expr(expr)})"

  defp emit_expr({:param, _index, raw}), do: raw
  defp emit_expr({:column, nil, name}), do: name
  defp emit_expr({:column, table, name}), do: "#{table}.#{name}"
  defp emit_expr({:literal, nil}), do: "NULL"
  defp emit_expr({:literal, {:blob, b}}), do: "x'#{Base.encode16(b)}'"
  defp emit_expr({:literal, v}) when is_binary(v), do: "'#{v}'"
  defp emit_expr({:literal, v}), do: Value.to_text(v)
  defp emit_expr({:function, name, :star}), do: "#{name}(*)"

  defp emit_expr({:function, name, {:distinct, args}}),
    do: "#{name}(DISTINCT #{Enum.map_join(args, ", ", &emit_expr/1)})"

  defp emit_expr({:function, name, args}),
    do: "#{name}(#{Enum.map_join(args, ", ", &emit_expr/1)})"

  defp emit_expr({:binary, op, left, right}),
    do: "#{emit_expr(left)} #{op} #{emit_expr(right)}"

  defp emit_expr({:collate, expr, name}), do: "#{emit_expr(expr)} COLLATE #{name}"
  defp emit_expr({:negate, expr}), do: "-#{emit_expr(expr)}"
  defp emit_expr({:not, expr}), do: "NOT #{emit_expr(expr)}"
  defp emit_expr(_expr), do: "expr"

  defp emit_index_members(index),
    do: Map.get(index, :members) || Enum.map(index.columns, &{:column, &1})

  defp display_column_name(table, column_key) do
    case Enum.find(table.columns, &(Table.key(&1.name) == column_key)) do
      nil -> column_key
      column -> column.name
    end
  end

  defp read_file(path) do
    case File.read(path) do
      {:ok, data} -> {:ok, data}
      {:error, reason} -> {:error, "unable to open database file: #{reason}"}
    end
  end

  # -- file header (the first 100 bytes) ----------------------------------------

  defp parse_header(<<"SQLite format 3", 0, rest::binary>> = data) do
    <<page_size::16, _write_version, _read_version, reserved, _max_frac, _min_frac, _leaf_frac,
      _change_count::32, _db_pages::32, _freelist_first::32, _freelist_count::32,
      _schema_cookie::32, _schema_format::32, _cache_size::32, _vacuum::32, encoding::32,
      _rest::binary>> = rest

    page_size = if page_size == 1, do: 65_536, else: page_size

    cond do
      encoding not in [0, 1] ->
        {:error, "unsupported text encoding (only UTF-8 databases are readable)"}

      byte_size(data) < page_size ->
        {:error, "file is not a database"}

      true ->
        {:ok, %{data: data, page_size: page_size, usable: page_size - reserved, pages: %{}}}
    end
  end

  defp parse_header(_data), do: {:error, "file is not a database"}

  # -- WAL replay ---------------------------------------------------------------

  defp replay_wal(path, file) do
    case File.read(path <> "-wal") do
      {:ok, wal} when byte_size(wal) >= 32 ->
        wal_pages(wal, file)

      {:ok, _empty_or_short} ->
        {:ok, file}

      {:error, :enoent} ->
        {:ok, file}

      {:error, reason} ->
        {:error, "unable to open WAL file: #{reason}"}
    end
  end

  defp wal_pages(wal, file) do
    <<magic::32, _version::32, wal_page_size::32, _checkpoint::32, salt1::32, salt2::32,
      _checksum1::32, _checksum2::32, frames::binary>> = wal

    page_size = if wal_page_size == 0, do: file.page_size, else: wal_page_size

    cond do
      magic not in [0x377F0682, 0x377F0683] ->
        {:ok, file}

      page_size != file.page_size ->
        {:error, "WAL page size does not match database page size"}

      true ->
        committed =
          frames
          |> wal_frames(page_size, salt1, salt2)
          |> committed_wal_pages()

        {:ok, %{file | pages: Map.merge(file.pages, committed)}}
    end
  rescue
    MatchError -> {:ok, file}
    ArgumentError -> {:ok, file}
  end

  defp wal_frames(frames, page_size, salt1, salt2) do
    frame_size = page_size + 24
    complete_frames = div(byte_size(frames), frame_size)

    Enum.map(0..(complete_frames - 1)//1, fn index ->
      offset = index * frame_size
      frame = binary_part(frames, offset, frame_size)

      <<page_number::32, db_size::32, ^salt1::32, ^salt2::32, _checksum1::32, _checksum2::32,
        page::binary-size(^page_size)>> = frame

      %{page_number: page_number, db_size: db_size, page: page}
    end)
  end

  defp committed_wal_pages(frames) do
    commit_index =
      frames
      |> Enum.with_index()
      |> Enum.reduce(nil, fn
        {%{db_size: db_size}, index}, _last when db_size > 0 -> index
        _frame, last -> last
      end)

    case commit_index do
      nil ->
        %{}

      index ->
        frames
        |> Enum.take(index + 1)
        |> Map.new(fn %{page_number: page_number, page: page} -> {page_number, page} end)
    end
  end

  # -- loading -------------------------------------------------------------------

  defp load(file) do
    schema_rows = btree_rows(file, 1)

    schema =
      Enum.map(schema_rows, fn {_rowid, [type, name, tbl_name, rootpage, sql]} ->
        %{type: type, name: name, tbl_name: tbl_name, rootpage: rootpage, sql: sql}
      end)

    db =
      Enum.reduce(schema, Database.new(), fn entry, db ->
        case entry do
          %{name: "sqlite_sequence"} ->
            db

          %{sql: nil} ->
            # Auto-indexes for UNIQUE/PK constraints have no SQL; the
            # constraints themselves come back with the CREATE TABLE.
            db

          %{sql: sql} ->
            case Executor.run(db, sql) do
              {:ok, _results, db} -> db
              {:error, error, _db} -> fail("schema error in #{entry.name}: #{error.message}")
            end
        end
      end)

    db =
      Enum.reduce(schema, db, fn entry, db ->
        case entry do
          %{type: "table", name: name, rootpage: rootpage}
          when name != "sqlite_sequence" and rootpage > 0 ->
            load_table_rows(file, db, name, rootpage)

          _other ->
            db
        end
      end)

    {:ok, load_sequences(file, db, schema)}
  end

  defp load_table_rows(file, db, name, rootpage) do
    case Database.fetch_table(db, name) do
      {:error, _} ->
        db

      {:ok, %{without_rowid: true} = table} ->
        storage_columns = without_rowid_storage_columns(table)

        rows =
          file
          |> index_btree_records(rootpage)
          |> Enum.with_index(1)
          |> Map.new(fn {values, rowid} ->
            row =
              storage_columns
              |> zip_pad(values)
              |> Map.new(fn {column, value} -> {Table.key(column.name), value} end)

            {rowid, row}
          end)

        next_rowid = map_size(rows) + 1

        Database.put_table(
          db,
          clear_index_entries(
            Table.narrow_all_rows(%{table | rows: rows, next_rowid: next_rowid})
          )
        )

      {:ok, table} ->
        stored_columns = Enum.reject(table.columns, &match?({:virtual, _}, &1.generated))

        rows =
          for {rowid, values} <- btree_rows(file, rootpage), into: %{} do
            row =
              stored_columns
              |> zip_pad(values)
              |> Map.new(fn {column, value} ->
                key = Table.key(column.name)

                value =
                  if key == table.rowid_alias and value == nil do
                    rowid
                  else
                    value
                  end

                {key, value}
              end)

            {rowid, row}
          end

        next_rowid = (rows |> Map.keys() |> Enum.max(fn -> 0 end)) + 1

        Database.put_table(
          db,
          clear_index_entries(
            Table.narrow_all_rows(%{table | rows: rows, next_rowid: next_rowid})
          )
        )
    end
  end

  # Rows are bulk-loaded directly into `table.rows`, bypassing the executor's
  # per-row index maintenance. The schema replay ran CREATE INDEX against the
  # then-empty table, leaving `entries: %{}` — which the executor treats as
  # already materialized. Drop `:entries` so the lazy `ensure_index_entries`
  # path rebuilds them from the loaded rows on first use.
  defp clear_index_entries(%{indexes: indexes, autoindexes: autoindexes} = table) do
    %{
      table
      | indexes: Enum.map(indexes, &Map.delete(&1, :entries)),
        autoindexes: Enum.map(autoindexes, &Map.delete(&1, :entries))
    }
  end

  defp clear_index_entries(table), do: table

  defp without_rowid_storage_columns(table) do
    stored_columns = Enum.reject(table.columns, &match?({:virtual, _}, &1.generated))

    primary_keys =
      case table.composite_keys do
        [{_name, keys} | _rest] ->
          keys

        [] ->
          stored_columns
          |> Enum.filter(& &1.primary_key)
          |> Enum.map(&Table.key(&1.name))
      end

    primary_key_set = MapSet.new(primary_keys)

    primary_columns =
      Enum.map(primary_keys, fn key -> Enum.find(stored_columns, &(Table.key(&1.name) == key)) end)
      |> Enum.reject(&is_nil/1)

    rest_columns =
      Enum.reject(stored_columns, &(Table.key(&1.name) in primary_key_set))

    primary_columns ++ rest_columns
  end

  # Records written before an ALTER TABLE ADD COLUMN are short; the missing
  # trailing columns take their declared defaults.
  defp zip_pad([], _values), do: []

  defp zip_pad([column | columns], []),
    do: [{column, stored_default(column)} | zip_pad(columns, [])]

  defp zip_pad([column | columns], [value | values]),
    do: [{column, value} | zip_pad(columns, values)]

  defp stored_default(%{default: {:literal, value}}), do: value
  defp stored_default(_column), do: nil

  defp load_sequences(file, db, schema) do
    case Enum.find(schema, &(&1.type == "table" and &1.name == "sqlite_sequence")) do
      nil ->
        db

      %{rootpage: rootpage} ->
        Enum.reduce(btree_rows(file, rootpage), db, fn {_rowid, [name, seq]}, db ->
          case Database.fetch_table(db, name || "") do
            {:ok, table} when is_integer(seq) ->
              Database.put_table(db, %{table | sequence: seq, sequence_row: true})

            _other ->
              db
          end
        end)
    end
  end

  # -- B-tree walking --------------------------------------------------------------

  # All {rowid, values} rows of the table B-tree rooted at `page_number`.
  defp btree_rows(file, page_number) do
    page = page(file, page_number)
    header_offset = if page_number == 1, do: 100, else: 0
    page_type = :binary.at(page, header_offset)
    ncells = u16(page, header_offset + 3)

    case page_type do
      # interior table page
      5 ->
        right_most = u32(page, header_offset + 8)

        children =
          page
          |> cell_pointers(header_offset + 12, ncells)
          |> Enum.map(&u32(page, &1))

        Enum.flat_map(children ++ [right_most], &btree_rows(file, &1))

      # leaf table page
      13 ->
        page
        |> cell_pointers(header_offset + 8, ncells)
        |> Enum.map(&leaf_cell(file, page, &1))

      other ->
        fail("unsupported B-tree page type: #{other}")
    end
  end

  # All records in an index B-tree rooted at `page_number`. WITHOUT ROWID
  # tables are stored as index B-trees whose key payload is the complete row.
  defp index_btree_records(file, page_number) do
    page = page(file, page_number)
    header_offset = if page_number == 1, do: 100, else: 0
    page_type = :binary.at(page, header_offset)
    ncells = u16(page, header_offset + 3)

    case page_type do
      # interior index page
      2 ->
        right_most = u32(page, header_offset + 8)

        page
        |> cell_pointers(header_offset + 12, ncells)
        |> Enum.flat_map(fn offset ->
          child = u32(page, offset)
          index_btree_records(file, child) ++ [index_interior_cell(file, page, offset)]
        end)
        |> Kernel.++(index_btree_records(file, right_most))

      # leaf index page
      10 ->
        page
        |> cell_pointers(header_offset + 8, ncells)
        |> Enum.map(&index_leaf_cell(file, page, &1))

      other ->
        fail("unsupported B-tree page type: #{other}")
    end
  end

  defp page(%{pages: pages, data: data, page_size: page_size}, page_number) do
    case Map.fetch(pages, page_number) do
      {:ok, page} ->
        page

      :error ->
        binary_part(data, (page_number - 1) * page_size, page_size)
    end
  end

  defp u16(bin, offset), do: :binary.decode_unsigned(binary_part(bin, offset, 2))
  defp u32(bin, offset), do: :binary.decode_unsigned(binary_part(bin, offset, 4))

  defp cell_pointers(page, array_offset, ncells) do
    for index <- 0..(ncells - 1)//1, do: u16(page, array_offset + index * 2)
  end

  defp leaf_cell(file, page, offset) do
    cell = binary_part(page, offset, byte_size(page) - offset)
    {payload_size, cell} = varint(cell)
    {rowid, cell} = varint(cell)
    payload = read_payload(file, cell, payload_size, :table_leaf)
    {rowid, decode_record(payload)}
  end

  defp index_leaf_cell(file, page, offset) do
    cell = binary_part(page, offset, byte_size(page) - offset)
    {payload_size, cell} = varint(cell)
    payload = read_payload(file, cell, payload_size, :index)
    decode_record(payload)
  end

  defp index_interior_cell(file, page, offset) do
    cell = binary_part(page, offset + 4, byte_size(page) - offset - 4)
    {payload_size, cell} = varint(cell)
    payload = read_payload(file, cell, payload_size, :index)
    decode_record(payload)
  end

  # Payload overflow: a leaf cell stores up to maxLocal bytes locally; the
  # remainder lives on a chain of overflow pages (btreeInt.h's local-payload
  # computation for table leaves).
  defp read_payload(%{usable: usable} = file, cell, payload_size, kind) do
    max_local =
      case kind do
        :table_leaf -> usable - 35
        :index -> div((usable - 12) * 64, 255) - 23
      end

    if payload_size <= max_local do
      binary_part(cell, 0, payload_size)
    else
      min_local = div((usable - 12) * 32, 255) - 23
      surplus = min_local + rem(payload_size - min_local, usable - 4)
      local = if surplus <= max_local, do: surplus, else: min_local

      <<local_payload::binary-size(^local), first_overflow::32, _::binary>> = cell
      overflow = read_overflow(file, first_overflow, payload_size - local)
      local_payload <> overflow
    end
  end

  defp read_overflow(_file, _page_number, 0), do: ""

  defp read_overflow(%{usable: usable} = file, page_number, remaining) do
    <<next::32, content::binary>> = page(file, page_number)
    take = min(remaining, usable - 4)
    binary_part(content, 0, take) <> read_overflow(file, next, remaining - take)
  end

  # -- record decoding --------------------------------------------------------------

  defp decode_record(payload) do
    {header_size, rest} = varint(payload)
    header_bytes = header_size - (byte_size(payload) - byte_size(rest))
    <<header::binary-size(^header_bytes), body::binary>> = rest

    header
    |> serial_types([])
    |> Enum.map_reduce(body, &decode_value/2)
    |> elem(0)
  end

  defp serial_types(<<>>, acc), do: Enum.reverse(acc)

  defp serial_types(header, acc) do
    {serial_type, rest} = varint(header)
    serial_types(rest, [serial_type | acc])
  end

  defp decode_value(0, body), do: {nil, body}

  defp decode_value(n, body) when n in 1..6 do
    bits = Enum.at([8, 16, 24, 32, 48, 64], n - 1)
    <<value::signed-size(^bits), rest::binary>> = body
    {value, rest}
  end

  defp decode_value(7, <<value::float-64, rest::binary>>), do: {value, rest}
  defp decode_value(8, body), do: {0, body}
  defp decode_value(9, body), do: {1, body}

  defp decode_value(n, body) when n >= 12 and rem(n, 2) == 0 do
    size = div(n - 12, 2)
    <<value::binary-size(^size), rest::binary>> = body
    {{:blob, value}, rest}
  end

  defp decode_value(n, body) when n >= 13 do
    size = div(n - 13, 2)
    <<value::binary-size(^size), rest::binary>> = body
    {value, rest}
  end

  # Big-endian base-128 varint, at most 9 bytes (the 9th holds 8 raw bits).
  defp varint(data), do: varint(data, 0, 0)

  defp varint(<<byte, rest::binary>>, shift_acc, 8) do
    {Bitwise.bsl(shift_acc, 8) + byte, rest}
  end

  defp varint(<<byte, rest::binary>>, acc, count) do
    acc = Bitwise.bsl(acc, 7) + Bitwise.band(byte, 0x7F)

    if Bitwise.band(byte, 0x80) == 0 do
      {acc, rest}
    else
      varint(rest, acc, count + 1)
    end
  end

  defp fail(message), do: raise(ExSQL.Error, message: message)
end