defmodule ExSQL.FileFormat do
@moduledoc """
Reads the SQLite on-disk file format (`btree.c` / `btreeInt.h` territory)
into an `ExSQL.Database` value.
The reader walks the database header, the sqlite_schema B-tree on page 1,
and each table's B-tree, decoding varint record headers and serial types.
Schema objects are rebuilt by running their stored `CREATE ...` SQL
through the ordinary executor, then table rows are bulk-loaded.
{:ok, db} = ExSQL.FileFormat.read("app.db")
ExSQL.Executor.run(db, "SELECT * FROM users")
Writing serializes a `Database` value into a fresh, SQLite-valid file:
rowid and WITHOUT ROWID tables, secondary and unique-auto indexes (with
multi-level interior B-trees), AUTOINCREMENT sequences, overflow-page chains
for rows larger than a page, and an optional WAL sidecar. It is a whole-file
re-serialize, not in-place page editing.
Limitations: reads UTF-8 databases only (UTF-16 is rejected with a clear
error).
"""
alias ExSQL.{Database, Executor, Table, Value}
@doc """
Serializes `db` and writes the result to `path` as a SQLite database file.

## Options

  * `:journal_mode` - overrides `db.journal_mode`. When it selects WAL mode a
    `-wal` sidecar is written next to the database; otherwise any existing
    `-wal` and `-shm` sidecars are removed.

Returns `{:ok, path}` on success, or `{:error, message}` when a file cannot be
written or serialization raises an `ExSQL.Error`.
"""
@spec write(Database.t(), Path.t(), keyword()) :: {:ok, String.t()} | {:error, String.t()}
def write(db, path, opts \\ []) do
  {:ok, image, metadata} = serialize(db)
  mode = Keyword.get(opts, :journal_mode, db.journal_mode)

  with :ok <- File.write(path, image),
       :ok <- maybe_write_wal(path, image, metadata, mode) do
    {:ok, path}
  else
    {:error, reason} ->
      {:error, "unable to write database file: #{reason}"}
  end
rescue
  # Serialization helpers signal unsupported input by raising ExSQL.Error.
  e in ExSQL.Error -> {:error, e.message}
end
@doc """
Reads the database file at `path` into an in-memory `ExSQL.Database`.

A zero-byte file yields a fresh, empty database. Otherwise the header is
parsed, a `-wal` sidecar (if any) is replayed, and the pages are loaded.
Returns `{:error, message}` when a step fails or an `ExSQL.Error` is raised.
"""
@spec read(Path.t()) :: {:ok, Database.t()} | {:error, String.t()}
def read(path) do
  case read_file(path) do
    {:ok, <<>>} ->
      {:ok, Database.new()}

    {:ok, data} ->
      with {:ok, header} <- parse_header(data),
           {:ok, file} <- replay_wal(path, header) do
        load(file)
      end

    failure ->
      failure
  end
rescue
  e in ExSQL.Error -> {:error, e.message}
end
# -- writing ------------------------------------------------------------------
# Builds the complete file image for `db`.
#
# Page numbering starts at 2 because page 1 is reserved for the schema page,
# which is built last (it needs every object's root page number) and then
# stored under page 1. Returns `{:ok, binary, metadata}` where `metadata`
# carries the page size and page count.
defp serialize(db) do
  page_size = normalize_page_size(db.page_size)
  initial = %{page_size: page_size, usable: page_size, next_page: 2, pages: %{}}

  {schema_rows, built} = collect_schema_rows(db, initial)
  {schema_page, built} = build_schema_page(schema_rows, built)
  built = put_page(built, 1, schema_page)

  total_pages = page_count(built)

  image =
    db
    |> build_header(page_size, total_pages)
    |> assemble_file(built, total_pages)

  {:ok, image, %{page_size: page_size, page_count: total_pages}}
end
# Writes the WAL sidecar when `journal_mode` selects WAL; otherwise removes
# any `-wal` / `-shm` sidecars (best effort: removal results are ignored).
defp maybe_write_wal(path, binary, metadata, journal_mode) do
  case wal_mode?(journal_mode) do
    true ->
      write_wal_file(path, binary, metadata)

    false ->
      Enum.each(["-wal", "-shm"], fn suffix -> File.rm(path <> suffix) end)
      :ok
  end
end
# True when `mode` names WAL journaling: the atom `:wal` or the string "wal"
# in any letter case. Every other value is treated as non-WAL.
defp wal_mode?(:wal), do: true
defp wal_mode?(mode) when is_binary(mode), do: String.upcase(mode) == "WAL"
defp wal_mode?(_other), do: false
# Writes `<path>-wal` containing a WAL header followed by one frame per
# database page, each frame carrying the page bytes sliced out of `binary`.
# Both salts are written as zero.
defp write_wal_file(path, binary, %{page_size: page_size, page_count: page_count}) do
  {salt1, salt2} = {0, 0}

  frames =
    for page_no <- 1..page_count do
      offset = (page_no - 1) * page_size
      build_wal_frame(page_no, page_count, salt1, salt2, binary_part(binary, offset, page_size))
    end

  File.write("#{path}-wal", [build_wal_header(page_size) | frames])
end
# Builds the 32-byte WAL header as eight big-endian 32-bit fields: magic,
# version, page size (written as 0), checkpoint sequence number, salt-1,
# salt-2 and two trailing zero words. The page-size argument is not used.
defp build_wal_header(_page_size) do
  fields = [0x377F0682, 3_007_000, 0, 0, 0, 0, 0, 0]

  for field <- fields, into: <<>> do
    <<field::32>>
  end
end
# Builds one WAL frame: a 24-byte frame header (page number, database size in
# pages, both salts, then two zero words) followed by the page bytes.
defp build_wal_frame(page_number, page_count, salt1, salt2, page) do
  frame_header = <<page_number::32, page_count::32, salt1::32, salt2::32, 0::64>>
  frame_header <> page
end
# Validates a page size and returns it unchanged.
# Accepts powers of two from 512 up to 65_536; raises `ExSQL.Error` otherwise.
defp normalize_page_size(size) when size == 65_536, do: size

defp normalize_page_size(size) when size < 512 or size > 65_536 do
  raise(ExSQL.Error, message: "unsupported page size: #{size}")
end

defp normalize_page_size(size) do
  if power_of_two?(size) do
    size
  else
    raise(ExSQL.Error, message: "page size must be a power of two: #{size}")
  end
end
# True when `n` is a positive power of two (exactly one bit set).
defp power_of_two?(n) when n > 0, do: Bitwise.band(n, n - 1) == 0
defp power_of_two?(_n), do: false
# Number of pages allocated so far: one less than the next free page number.
defp page_count(state) do
  Map.fetch!(state, :next_page) - 1
end
# Builds the pages of every table (in table-key order) and returns the
# sqlite_schema rows describing them, followed by the `sqlite_sequence` row
# when one is needed, together with the updated writer state.
defp collect_schema_rows(db, state) do
  sorted_tables = Enum.sort_by(Map.values(db.tables), &Table.key(&1.name))

  {rows_per_table, state} =
    Enum.map_reduce(sorted_tables, state, &collect_table_schema_rows/2)

  {sequence_rows, state} =
    case sqlite_sequence_needed?(db) do
      true -> build_sqlite_sequence_rows(db, state)
      false -> {[], state}
    end

  {Enum.concat(rows_per_table) ++ sequence_rows, state}
end
# Writes one table's B-tree plus the B-trees of its indexes and returns the
# matching sqlite_schema rows (`[type, name, tbl_name, rootpage, sql]`):
# the table row first, then auto-indexes (stored with `nil` SQL), then
# explicit indexes, each group sorted by index key.
defp collect_table_schema_rows(table, state) do
  {rootpage, state} =
    if table.without_rowid do
      build_without_rowid_table_pages(table, state)
    else
      build_rowid_table_pages(table, state)
    end

  table_row = ["table", table.name, table.name, rootpage, emit_create_table_sql(table)]

  # Shared walker: builds each index's pages and renders its schema row.
  rows_for = fn indexes, sql_for, start_state ->
    indexes
    |> Enum.sort_by(&Table.key(&1.name))
    |> Enum.map_reduce(start_state, fn index, acc_state ->
      {index_root, acc_state} = build_index_pages(table, index, acc_state)
      {["index", index.name, table.name, index_root, sql_for.(index)], acc_state}
    end)
  end

  {autoindex_rows, state} = rows_for.(table.autoindexes, fn _index -> nil end, state)

  {index_rows, state} =
    rows_for.(table.indexes, fn index -> emit_create_index_sql(table, index) end, state)

  {[table_row | autoindex_rows ++ index_rows], state}
end
# True when the file needs a `sqlite_sequence` table: some AUTOINCREMENT table
# has a sequence row with a positive sequence, or orphaned sequence rows exist.
defp sqlite_sequence_needed?(db) do
  live_sequence? = fn {_key, table} ->
    table.autoincrement and table.sequence_row and table.sequence > 0
  end

  Enum.any?(db.tables, live_sequence?) or db.sqlite_sequence_orphans != %{}
end
# Writes the `sqlite_sequence` table and returns its single sqlite_schema row.
# Rows come from AUTOINCREMENT tables that have a sequence row plus the
# orphaned entries kept on the database, ordered by key.
defp build_sqlite_sequence_rows(db, state) do
  table_entries =
    for table <- Map.values(db.tables), table.autoincrement and table.sequence_row do
      {Table.key(table.name), [table.name, table.sequence]}
    end

  orphan_entries =
    Enum.map(db.sqlite_sequence_orphans, fn {key, {name, sequence}} ->
      {key, [name, sequence]}
    end)

  sequence_rows =
    (table_entries ++ orphan_entries)
    |> Enum.sort_by(&elem(&1, 0))
    |> Enum.map(fn {_key, row} -> row end)

  {rootpage, state} = build_rowid_table_rows(sequence_rows, "sqlite_sequence", state)

  schema_row = [
    "table",
    "sqlite_sequence",
    "sqlite_sequence",
    rootpage,
    "CREATE TABLE sqlite_sequence(name,seq)"
  ]

  {[schema_row], state}
end
# Builds the sqlite_schema B-tree and extracts its root so the caller can store
# it as page 1. The root is first built like any other page, then re-laid out
# for page 1 and removed from the page map; if it was the most recently
# numbered page, its page number is handed back for reuse.
defp build_schema_page(rows, state) do
  {schema_root, state} = build_rowid_table_rows(rows, :sqlite_schema, state)
  {built_page, other_pages} = Map.pop!(state.pages, schema_root)
  relocated = relocate_schema_page(built_page, state)

  next_page =
    if schema_root == page_count(state), do: schema_root, else: state.next_page

  {relocated, %{state | pages: other_pages, next_page: next_page}}
end
# Re-lays out a table B-tree page (type 5 or 13) so it can live on page 1,
# whose first 100 bytes belong to the file header: the page header and cell
# pointer array move to offset 100 while the cell content stays at its
# original offsets. Raises `ExSQL.Error` when the shifted pointer array would
# collide with the cell content. Pages of any other type are returned as-is.
defp relocate_schema_page(schema_page, state) do
  page_type = :binary.at(schema_page, 0)

  if page_type in [5, 13] do
    total = page_size(state)
    region = total - 100
    header_size = page_header_size(page_type)
    cell_count = u16(schema_page, 3)
    content_start = u16(schema_page, 5)

    pointer_array =
      for index <- 0..(cell_count - 1)//1, into: <<>> do
        <<u16(schema_page, header_size + index * 2)::16>>
      end

    header = binary_part(schema_page, 0, header_size)
    gap_size = content_start - (100 + header_size + 2 * cell_count)

    if gap_size < 0 do
      raise ExSQL.Error, message: "unable to relocate schema page to first page"
    end

    cell_area = binary_part(schema_page, content_start, total - content_start)

    body =
      IO.iodata_to_binary([header, pointer_array, String.duplicate(<<0>>, gap_size), cell_area])

    if byte_size(body) > region do
      raise ExSQL.Error, message: "unable to relocate schema page to first page"
    end

    # The first 100 bytes are placeholders for the file header.
    String.duplicate(<<0>>, 100) <> body <> String.duplicate(<<0>>, region - byte_size(body))
  else
    schema_page
  end
end
# Writes the B-tree for a rowid table and returns `{rootpage, state}`.
# Virtual generated columns are not stored, and the column aliasing the rowid
# is stored as NULL (its value is the cell's rowid).
defp build_rowid_table_pages(table, state) when is_map(table) do
  stored_columns = Enum.reject(table.columns, &match?({:virtual, _}, &1.generated))

  stored_value = fn column, row ->
    key = Table.key(column.name)
    if key == table.rowid_alias, do: nil, else: Map.get(row, key)
  end

  entries =
    Enum.map(Table.scan(table), fn {rowid, row} ->
      {rowid, Enum.map(stored_columns, &stored_value.(&1, row))}
    end)

  build_table_pages(entries, :rowid, state)
end
# Writes a rowid table from a plain list of rows, numbering them 1, 2, 3, ...
# in list order. Returns `{rootpage, state}`.
defp build_rowid_table_rows(rows, table_name, state) when is_list(rows) do
  numbered = Enum.with_index(rows, fn values, index -> {index + 1, values} end)
  build_table_pages(numbered, :rowid, state, table_name: table_name)
end
# Writes a WITHOUT ROWID table as an index-style B-tree whose records hold the
# table's storage columns, in `Table.scan/1` order. Returns `{rootpage, state}`.
defp build_without_rowid_table_pages(table, state) do
  stored_columns = without_rowid_storage_columns(table)

  record_for = fn {_rowid, row} ->
    Enum.map(stored_columns, fn column -> Map.get(row, Table.key(column.name)) end)
  end

  records = Enum.map(Table.scan(table), record_for)
  build_index_btree_pages(records, state, "WITHOUT ROWID table")
end
# Writes the B-tree for one secondary index and returns `{rootpage, state}`.
#
# Indexes on WITHOUT ROWID tables are not serialized; they report root page 0.
#
# Each index record is the indexed values followed by the rowid. Records are
# ordered by SQLite's storage-class order (NULL < numeric < text < blob)
# rather than by Erlang term order, which would put `nil` after numbers and
# `{:blob, _}` tuples before text and so produce a mis-ordered index B-tree.
# NOTE(review): column collations and DESC index columns are not taken into
# account here; ordering is always ascending with bytewise text comparison.
defp build_index_pages(%{without_rowid: true}, _index, state), do: {0, state}

defp build_index_pages(table, index, state) do
  rows =
    table
    |> serializable_index_entries(index)
    |> Enum.flat_map(fn {values, rowids} ->
      values = Tuple.to_list(values)
      Enum.map(rowids, fn rowid -> values ++ [rowid] end)
    end)
    |> Enum.sort_by(fn row -> Enum.map(row, &index_sort_key/1) end)

  build_index_btree_pages(rows, state, "index")
end

# Maps a value to `{storage_class_rank, comparable}` so that plain term
# comparison of the mapped records follows SQLite's cross-type ordering.
# Values that are written as NULL (or read back as NULL) share rank 0.
defp index_sort_key(nil), do: {0, nil}
defp index_sort_key(value) when is_number(value), do: {1, value}
defp index_sort_key(value) when is_binary(value), do: {2, value}
defp index_sort_key({:json, json}) when is_binary(json), do: {2, json}
defp index_sort_key({:blob, blob}) when is_binary(blob), do: {3, blob}
defp index_sort_key(other), do: {0, other}
# Returns the index content as a map of `value_tuple => sorted rowids`.
#
# For a non-partial index made only of plain columns the entries are rebuilt
# from a table scan; expression and partial indexes fall back to the entries
# stored on the index (or an empty map when none are stored).
defp serializable_index_entries(table, %{where: nil} = index) do
  members = emit_index_members(index)

  case Enum.all?(members, &match?({:column, _}, &1)) do
    true ->
      indexed_values = fn row ->
        members
        |> Enum.map(fn {:column, key} -> Map.get(row, Table.key(key)) end)
        |> List.to_tuple()
      end

      table
      |> Table.scan()
      |> Enum.group_by(
        fn {_rowid, row} -> indexed_values.(row) end,
        fn {rowid, _row} -> rowid end
      )
      |> Map.new(fn {values, rowids} -> {values, Enum.sort(rowids)} end)

    false ->
      Map.get(index, :entries, %{})
  end
end

defp serializable_index_entries(_table, index), do: Map.get(index, :entries, %{})
# Writes a rowid-table B-tree from `{rowid, values}` entries and returns
# `{rootpage, state}`: an empty leaf when there are no rows, the single leaf
# when everything fits on one page, otherwise the root of an interior tree.
# The options argument is accepted but not used.
defp build_table_pages(entries, :rowid, state, _opts) do
  case pack_rowid_leaf_pages(entries, state) do
    {[], state} ->
      root = state.next_page
      state = allocate_page(state)
      empty_leaf = build_leaf_page(root, :rowid, [], state)
      {root, put_page(state, root, empty_leaf)}

    {[{only_leaf, _max_rowid}], state} ->
      {only_leaf, state}

    {leaf_pages, state} ->
      build_rowid_interior_tree(leaf_pages, state)
  end
end
# Writes an index-style B-tree from already ordered records and returns
# `{rootpage, state}`. With no records a single empty index leaf is written.
# The context argument is accepted but not used.
defp build_index_btree_pages([], state, _context) do
  root = state.next_page
  allocated = allocate_page(state)
  empty_leaf = build_leaf_page(root, :index, [], allocated)
  {root, put_page(allocated, root, empty_leaf)}
end

defp build_index_btree_pages(entries, state, _context) do
  {leaf_nodes, state} = pack_index_leaf_pages(entries, state, [])
  build_index_interior_tree(leaf_nodes, state)
end
# Packs index records into leaf pages, one page per recursion step.
#
# SQLite's index B-tree is a true B-tree: the record that overflows a leaf is
# *promoted* — held back as the divider for the parent interior page — rather
# than stored in a leaf. The result is a list of `{leaf_page, divider_payload}`
# nodes in key order; a node's divider is `nil` when the entry list ran out
# without any record overflowing that leaf.
defp pack_index_leaf_pages([], state, acc), do: {Enum.reverse(acc), state}

defp pack_index_leaf_pages(entries, state, acc) do
  leaf_no = state.next_page
  state = allocate_page(state)

  {cells, divider, remaining} =
    collect_index_leaf_cells(entries, page_region_size(state, leaf_no), 0, 0, [])

  state = put_page(state, leaf_no, build_leaf_page(leaf_no, :index, cells, state))
  pack_index_leaf_pages(remaining, state, [{leaf_no, divider} | acc])
end
# Collects as many index records as fit on one leaf page.
#
# Arguments: the remaining entries, the page capacity in bytes, the number of
# cells taken so far, the bytes those cells use, and the reversed cell list.
# Returns `{cells, divider_payload, remaining_entries}`.
#
# The record that does not fit is promoted as the divider (it lives in the
# parent, not in a leaf), and packing continues with the entries after it.
# A divider needs a leaf on its right, so when the only entry left after the
# current one would overflow this page, the current entry is promoted instead
# and the final entry is left for the next leaf. Without this look-ahead the
# final entry would be promoted with nothing to its right and silently dropped
# when the tree root is chosen.
#
# Raises `ExSQL.Error` when a single record does not fit on an empty page.
defp collect_index_leaf_cells([], _capacity, _count, _used, acc),
  do: {Enum.reverse(acc), nil, []}

defp collect_index_leaf_cells([entry | rest], capacity, count, used, acc) do
  payload = encode_record_payload(entry)
  cell = encode_varint(byte_size(payload)) <> payload
  needed = page_header_size(10) + 2 * (count + 1) + used + byte_size(cell)

  cond do
    needed > capacity and count == 0 ->
      raise ExSQL.Error, message: "index entry too large for a single page"

    needed > capacity ->
      {Enum.reverse(acc), payload, rest}

    index_tail_overflows?(rest, capacity, needed) ->
      # Promote this entry so the last one still gets a leaf of its own.
      {Enum.reverse(acc), payload, rest}

    true ->
      collect_index_leaf_cells(rest, capacity, count + 1, used + byte_size(cell), [cell | acc])
  end
end

# True when exactly one entry remains and adding it (2-byte cell pointer plus
# its cell) after the current one would exceed the page capacity.
defp index_tail_overflows?([last], capacity, needed) do
  last_payload = encode_record_payload(last)
  last_cell_size = byte_size(encode_varint(byte_size(last_payload))) + byte_size(last_payload)
  needed + 2 + last_cell_size > capacity
end

defp index_tail_overflows?(_rest, _capacity, _needed), do: false
# Stacks interior levels on top of `{page, divider}` nodes until a single node
# remains; that node's page is the tree root. Returns `{rootpage, state}`.
defp build_index_interior_tree(nodes, state) do
  case nodes do
    [{root, _divider}] ->
      {root, state}

    _several ->
      {parents, state} = pack_index_interior_pages(nodes, state, [])
      build_index_interior_tree(parents, state)
  end
end
# Groups child nodes into index interior pages (page type 2) and returns the
# parent-level `{page, promoted_divider}` nodes in key order.
defp pack_index_interior_pages([], state, acc), do: {Enum.reverse(acc), state}

defp pack_index_interior_pages(nodes, state, acc) do
  parent_no = state.next_page
  state = allocate_page(state)

  {cells, right_most, promoted, remaining} =
    collect_index_interior_cells(nodes, page_region_size(state, parent_no), 0, 0, [])

  parent = build_interior_page(state, parent_no, cells, right_most, 2)
  state = put_page(state, parent_no, parent)
  pack_index_interior_pages(remaining, state, [{parent_no, promoted} | acc])
end
# An interior page covers a run of children: it holds a `<child::32, divider>`
# cell for every child except its rightmost, the rightmost child's page number
# goes in the page header, and the divider that follows the rightmost child is
# promoted to the next level up.
#
# Returns `{cells, right_most_page, promoted_divider, remaining_nodes}`.
# Raises `ExSQL.Error` when not even one cell fits on the page.
defp collect_index_interior_cells([{page, divider}], _capacity, _count, _used, cells) do
  {Enum.reverse(cells), page, divider, []}
end

defp collect_index_interior_cells([{page, divider} | rest], capacity, count, used, cells) do
  cell = render_index_interior_cell(page, divider)
  cell_size = byte_size(cell)
  fits? = page_header_size(:interior) + 2 * (count + 1) + used + cell_size <= capacity

  cond do
    fits? ->
      collect_index_interior_cells(rest, capacity, count + 1, used + cell_size, [cell | cells])

    count == 0 ->
      raise ExSQL.Error, message: "index interior cell too large for a single page"

    true ->
      # This child becomes the page's rightmost; its divider moves up a level.
      {Enum.reverse(cells), page, divider, rest}
  end
end
# Renders an index interior cell: 4-byte left-child page number, the payload
# size as a varint, then the payload itself.
defp render_index_interior_cell(page, payload) do
  size_prefix = encode_varint(byte_size(payload))
  <<page::32>> <> size_prefix <> payload
end
# Three-argument convenience form: same as the four-argument version with no
# options.
defp build_table_pages(entries, :rowid, state) do
  build_table_pages(entries, :rowid, state, [])
end
# Packs `{rowid, values}` entries into rowid-table leaf pages and returns
# `{[{page_no, max_rowid}], state}` in rowid order.
defp pack_rowid_leaf_pages(entries, state),
  do: do_pack_rowid_leaf_pages(entries, state, [])
# Worker for `pack_rowid_leaf_pages/2`: fills one leaf page per step and
# accumulates `{page_no, max_rowid}` for each page written. Raises
# `ExSQL.Error` if a page ends up with no rows.
defp do_pack_rowid_leaf_pages([], state, chunks), do: {Enum.reverse(chunks), state}

defp do_pack_rowid_leaf_pages(entries, state, chunks) do
  leaf_no = state.next_page
  state = allocate_page(state)
  capacity = page_region_size(state, leaf_no)

  case collect_rowid_leaf_rows(entries, capacity, 0, 0, [], state) do
    {[], _remaining, _state} ->
      raise ExSQL.Error,
        message:
          "row too large for a single rowid-table page: #{inspect(List.first(entries) |> elem(0))}"

    {page_rows, remaining, state} ->
      cells = Enum.map(page_rows, fn {_rowid, cell} -> cell end)
      state = put_page(state, leaf_no, build_leaf_page(leaf_no, :rowid, cells, state))
      {max_rowid, _cell} = List.last(page_rows)
      do_pack_rowid_leaf_pages(remaining, state, [{leaf_no, max_rowid} | chunks])
  end
end
# Takes rows for one rowid leaf page until the next row no longer fits.
# Returns `{[{rowid, cell}], remaining_entries, state}`; the state changes
# because cells with large payloads allocate overflow pages.
defp collect_rowid_leaf_rows([], _capacity, _row_count, _used, rows_acc, state) do
  {Enum.reverse(rows_acc), [], state}
end

defp collect_rowid_leaf_rows(
       [{rowid, values} | remaining] = pending,
       capacity,
       row_count,
       used,
       rows_acc,
       state
     ) do
  payload = encode_record_payload(values)

  # Size the cell (inline, or inline prefix + 4-byte overflow pointer) before
  # building it, so a row that spills to the next page never allocates
  # overflow pages that would be left orphaned.
  cell_size = rowid_leaf_cell_size(rowid, byte_size(payload), state.usable)
  needed = page_header_size(:rowid) + 2 * (row_count + 1) + used + cell_size

  cond do
    needed <= capacity ->
      {cell, state} = rowid_leaf_cell(rowid, payload, state)

      collect_rowid_leaf_rows(
        remaining,
        capacity,
        row_count + 1,
        used + byte_size(cell),
        [{rowid, cell} | rows_acc],
        state
      )

    row_count == 0 ->
      raise ExSQL.Error,
        message: "row too large for a single rowid-table page: #{inspect(rowid)}"

    true ->
      {Enum.reverse(rows_acc), pending, state}
  end
end
# Stacks interior levels on top of `{page_no, max_rowid}` nodes until one node
# remains; its page is the tree root. Returns `{rootpage, state}`.
defp build_rowid_interior_tree(nodes, state) do
  case nodes do
    [{root, _max_rowid}] ->
      {root, state}

    _several ->
      {parents, state} = pack_rowid_interior_pages(nodes, state)
      build_rowid_interior_tree(parents, state)
  end
end
# Splits the child nodes into page-sized groups and writes one interior page
# per group. Returns `{[{page_no, max_rowid}], state}` for the new level.
defp pack_rowid_interior_pages(nodes, state) do
  groups = rowid_interior_chunks(nodes, state)
  Enum.map_reduce(groups, state, &build_rowid_interior_page/2)
end
# Partitions child nodes into per-page groups, then fixes up a trailing group
# that ended up with a single child.
defp rowid_interior_chunks(nodes, state) do
  rebalance_single_child_tail(do_rowid_interior_chunks(nodes, state, []))
end
# Worker for `rowid_interior_chunks/2`: peels one page-sized group of children
# off the front of the node list per step.
defp do_rowid_interior_chunks([], _state, chunks), do: Enum.reverse(chunks)

defp do_rowid_interior_chunks([seed | others], state, chunks) do
  {group, leftover} = collect_rowid_interior_chunk(others, state, [seed], 0, 0)
  do_rowid_interior_chunks(leftover, state, [group | chunks])
end
# Grows one group of children for a rowid interior page.
#
# `children` is kept in reverse order; its head is the current rightmost
# child. Accepting another child turns the current rightmost into a cell, so
# the space check is made for that cell. Returns `{children, remaining_nodes}`
# with the children back in key order. Raises `ExSQL.Error` when not even one
# cell fits on the page.
defp collect_rowid_interior_chunk([], _state, children, _cell_count, _used) do
  {Enum.reverse(children), []}
end

defp collect_rowid_interior_chunk(
       [next | rest] = remaining,
       state,
       [right_most | _] = children,
       cell_count,
       used
     ) do
  cell_size = right_most |> render_rowid_interior_cell() |> byte_size()
  needed = page_header_size(:interior) + 2 * (cell_count + 1) + used + cell_size

  cond do
    needed <= page_region_size(state, state.next_page) ->
      collect_rowid_interior_chunk(
        rest,
        state,
        [next | children],
        cell_count + 1,
        used + cell_size
      )

    cell_count == 0 ->
      raise ExSQL.Error,
        message: "rowid-table interior cell too large for a single page"

    true ->
      {Enum.reverse(children), remaining}
  end
end
# Makes sure the last group has more than one child when there are several
# groups: a lone trailing child borrows the last child of the previous group.
# The previous group must have at least three children to lend one; otherwise
# `ExSQL.Error` is raised. Any other input is returned unchanged.
defp rebalance_single_child_tail([_only] = chunks), do: chunks

defp rebalance_single_child_tail(chunks) do
  case Enum.split(chunks, -2) do
    {leading, [previous, [single]]} ->
      case previous do
        [_a, _b, _c | _rest] ->
          {kept, [moved]} = Enum.split(previous, -1)
          leading ++ [kept, [moved, single]]

        _too_small ->
          raise ExSQL.Error,
            message: "unable to rebalance rowid-table interior pages"
      end

    _no_single_tail ->
      chunks
  end
end
# Writes one rowid interior page for a group of `{page_no, max_rowid}`
# children: every child but the last becomes a cell, the last is the page's
# rightmost pointer. Returns `{{page_no, max_rowid}, state}`, where
# `max_rowid` is the last child's maximum rowid.
defp build_rowid_interior_page(children, state) do
  parent_no = state.next_page
  state = allocate_page(state)

  cells =
    children
    |> Enum.drop(-1)
    |> Enum.map(&render_rowid_interior_cell/1)

  last_child = List.last(children)
  page = build_interior_page(state, parent_no, cells, elem(last_child, 0))
  {{parent_no, elem(last_child, 1)}, put_page(state, parent_no, page)}
end
# Renders a complete leaf page: `:rowid` maps to page type 13, `:index` to
# page type 10.
#
# Layout: `page_offset` zero bytes (non-zero only for page 1), the 8-byte page
# header, the 2-byte cell pointer array, zero padding, then the cells packed
# against the end of the usable region. Any bytes past the usable region are
# zero-filled up to the page size. Raises `ExSQL.Error` when the pointer array
# and the cells would overlap.
defp build_leaf_page(page_no, :rowid, cells, state),
  do: build_leaf_page(page_no, 13, cells, state)

defp build_leaf_page(page_no, :index, cells, state),
  do: build_leaf_page(page_no, 10, cells, state)

defp build_leaf_page(page_no, page_type, cells, state) do
  capacity = page_region_size(state, page_no)
  header_size = page_header_size(page_type)
  page_offset = page_start_offset(page_no)
  cell_area = IO.iodata_to_binary(cells)
  content_start = page_offset + capacity - byte_size(cell_area)

  # Each pointer is the absolute in-page offset of the matching cell.
  {pointer_entries, _cell_area_end} =
    Enum.map_reduce(cells, content_start, fn cell, cursor ->
      {<<cursor::16>>, cursor + byte_size(cell)}
    end)

  header =
    if page_type in [13, 10] do
      <<page_type::8, 0::16, length(cells)::16, content_start::16, 0::8>>
    else
      fail("unsupported leaf page type: #{page_type}")
    end

  pointer_area = IO.iodata_to_binary(pointer_entries)
  gap_size = content_start - (page_offset + header_size + byte_size(pointer_area))

  if gap_size < 0 do
    raise ExSQL.Error, message: "not enough room to build page #{page_no}"
  end

  content =
    IO.iodata_to_binary([
      String.duplicate(<<0>>, page_offset),
      header,
      pointer_area,
      String.duplicate(<<0>>, gap_size),
      cell_area
    ])

  if byte_size(content) > capacity + page_offset do
    raise ExSQL.Error, message: "not enough room to build page #{page_no}"
  end

  content <> String.duplicate(<<0>>, page_size(state) - byte_size(content))
end
# Renders a complete interior page (page type 5 by default; callers pass 2 for
# index trees).
#
# Layout mirrors the leaf pages, but the page header is 12 bytes: the usual
# 8 bytes followed by the 4-byte rightmost-child page number. Raises
# `ExSQL.Error` when the pointer array and the cells would overlap.
defp build_interior_page(state, page_no, cells, right_most, page_type \\ 5) do
  capacity = page_region_size(state, page_no)
  page_offset = page_start_offset(page_no)
  header_size = 12
  cell_area = IO.iodata_to_binary(cells)
  content_start = page_offset + capacity - byte_size(cell_area)

  # Each pointer is the absolute in-page offset of the matching cell.
  {pointer_entries, _cell_area_end} =
    Enum.map_reduce(cells, content_start, fn cell, cursor ->
      {<<cursor::16>>, cursor + byte_size(cell)}
    end)

  header =
    <<page_type::8, 0::16, length(cells)::16, content_start::16, 0::8, right_most::32>>

  pointer_area = IO.iodata_to_binary(pointer_entries)
  gap_size = content_start - (page_offset + header_size + byte_size(pointer_area))

  if gap_size < 0 do
    raise ExSQL.Error, message: "not enough room to build interior page #{page_no}"
  end

  content =
    IO.iodata_to_binary([
      String.duplicate(<<0>>, page_offset),
      header,
      pointer_area,
      String.duplicate(<<0>>, gap_size),
      cell_area
    ])

  if byte_size(content) > capacity + page_offset do
    raise ExSQL.Error, message: "not enough room to build interior page #{page_no}"
  end

  content <> String.duplicate(<<0>>, page_size(state) - byte_size(content))
end
# SQLite's local-payload formula (mirrors `read_payload/4`): a table-leaf cell
# keeps up to `usable - 35` payload bytes inline. A larger payload keeps only a
# computed prefix inline, followed by a 4-byte pointer to the overflow-page
# chain that holds the rest. Returns the number of payload bytes stored inline.
defp rowid_leaf_local_size(payload_size, usable) when payload_size <= usable - 35,
  do: payload_size

defp rowid_leaf_local_size(payload_size, usable) do
  max_local = usable - 35
  min_local = div((usable - 12) * 32, 255) - 23
  candidate = min_local + rem(payload_size - min_local, usable - 4)

  if candidate > max_local, do: min_local, else: candidate
end
# Size in bytes of a rowid leaf cell without building it: payload-size varint,
# rowid varint, the inline payload bytes, plus a 4-byte overflow pointer when
# part of the payload spills.
defp rowid_leaf_cell_size(rowid, payload_size, usable) do
  inline = rowid_leaf_local_size(payload_size, usable)
  pointer_bytes = if inline < payload_size, do: 4, else: 0
  varint_bytes = byte_size(encode_varint(payload_size)) + byte_size(encode_varint(rowid))

  varint_bytes + inline + pointer_bytes
end
# Builds a rowid leaf cell and returns `{cell, state}`. When the payload does
# not fit inline, the spilled tail is written to an overflow chain (allocating
# pages in `state`) and the cell ends with the chain's first page number.
defp rowid_leaf_cell(rowid, payload, state) do
  total = byte_size(payload)
  inline = rowid_leaf_local_size(total, state.usable)
  prefix = encode_varint(total) <> encode_varint(rowid)

  cond do
    inline >= total ->
      {prefix <> payload, state}

    true ->
      <<kept::binary-size(^inline), spilled::binary>> = payload
      {first_page, state} = write_overflow_chain(spilled, state)
      {prefix <> kept <> <<first_page::32>>, state}
  end
end
# Splits the spilled payload into `usable - 4` byte chunks across a linked list
# of overflow pages (each page: 4-byte next-page pointer, then data; the last
# page points to 0) and returns `{first_page_number, state}`.
#
# Page numbers are allocated for the whole chain first so every page can name
# its successor. Successors are paired up front by zipping the page list with
# itself shifted by one, which keeps the work linear in the chain length
# (looking each successor up by index would be quadratic for long chains).
defp write_overflow_chain(data, state) do
  chunks = chunk_binary(data, state.usable - 4)

  {page_nos, state} =
    Enum.map_reduce(chunks, state, fn _chunk, acc ->
      {acc.next_page, allocate_page(acc)}
    end)

  next_pointers = Enum.drop(page_nos, 1) ++ [0]

  state =
    [chunks, page_nos, next_pointers]
    |> Enum.zip()
    |> Enum.reduce(state, fn {chunk, page_no, next}, acc ->
      put_page(acc, page_no, build_overflow_page(acc, chunk, next))
    end)

  {hd(page_nos), state}
end
# Renders one overflow page: 4-byte next-page number, the data chunk, then
# zero padding up to the page size.
defp build_overflow_page(state, data, next_page) do
  used = 4 + byte_size(data)
  IO.iodata_to_binary([<<next_page::32>>, data, String.duplicate(<<0>>, page_size(state) - used)])
end
# Splits a binary into consecutive chunks of `size` bytes; the last chunk may
# be shorter. An empty binary yields an empty list.
defp chunk_binary(data, size) do
  cond do
    data == <<>> ->
      []

    byte_size(data) <= size ->
      [data]

    true ->
      {head, rest} = :erlang.split_binary(data, size)
      [head | chunk_binary(rest, size)]
  end
end
# Renders a rowid interior cell: 4-byte child page number followed by the
# child's largest rowid as a varint.
defp render_rowid_interior_cell({page_no, max_rowid}) do
  <<page_no::32>> <> encode_varint(max_rowid)
end
# B-tree page header size in bytes: 12 for interior pages (page type 5 or the
# `:interior` tag), 8 for everything else.
defp page_header_size(type) when type in [5, :interior], do: 12
defp page_header_size(_type), do: 8
# Bytes available for B-tree content on a page: the usable size, minus the
# 100 bytes of page 1 that hold the file header.
defp page_region_size(%{usable: usable}, page_no) do
  if page_no === 1, do: usable - 100, else: usable
end
# Offset at which B-tree content starts within a page: 100 on page 1 (after
# the file header), 0 on every other page.
defp page_start_offset(page_no) do
  if page_no === 1, do: 100, else: 0
end
# Reads the page size (in bytes) out of the writer state.
defp page_size(%{page_size: bytes}) do
  bytes
end
# Reserves the next page number by advancing the state's `next_page` counter.
# Callers read `state.next_page` before calling this to learn the number.
defp allocate_page(%{next_page: current} = state) do
  %{state | next_page: current + 1}
end
# Stores the rendered bytes of `page` under `page_no` in the state's page map,
# replacing any page already stored under that number.
defp put_page(state, page_no, page) do
  Map.update!(state, :pages, fn pages -> Map.put(pages, page_no, page) end)
end
# Builds the 100-byte database file header.
#
# Fields after the 16-byte magic string, in order: page size (65_536 is stored
# as 1), write version, read version, reserved bytes per page, the three
# payload-fraction bytes (64, 32, 32), change counter, database size in pages,
# first freelist page, freelist page count, schema cookie, schema format (4),
# cache size, `db.auto_vacuum`, and text encoding (1). The remaining bytes are
# zero.
defp build_header(db, page_size, page_count) do
  stored_page_size = if page_size == 65_536, do: 1, else: page_size

  fixed_fields =
    <<"SQLite format 3", 0, stored_page_size::16, 1, 1, 0, 64, 32, 32, 0::32,
      page_count::32, 0::32, 0::32, 0::32, 4::32, 0::32, db.auto_vacuum::32, 1::32>>

  fixed_fields <> String.duplicate(<<0>>, 100 - byte_size(fixed_fields))
end
# Concatenates pages 1..page_count into the final file image. Page 1 always
# starts with the 100-byte file header (the first 100 bytes of a stored page 1
# are replaced by it); page numbers with no stored page become zero-filled
# pages.
defp assemble_file(header, state, page_count) do
  page_size = state.page_size

  render = fn page_no ->
    stored = Map.get(state.pages, page_no)

    cond do
      page_no == 1 and is_nil(stored) ->
        [header, String.duplicate(<<0>>, page_size - 100)]

      page_no == 1 ->
        [header, binary_part(stored, 100, page_size - 100)]

      is_nil(stored) ->
        String.duplicate(<<0>>, page_size)

      true ->
        stored
    end
  end

  1..page_count
  |> Enum.map(render)
  |> IO.iodata_to_binary()
end
# Encodes a list of values as a SQLite record: a header (total header size as
# a varint, then one serial-type varint per value) followed by the value
# bodies in order.
#
# The header size counts its own varint. It is computed the way SQLite does:
# add the varint length of the serial-type array size, then add one more byte
# if that pushes the total across a varint length boundary. Estimating the
# length from `serial_size + 1` alone is one byte short when the serial-type
# array is exactly 16382 bytes.
defp encode_record_payload(values) do
  {serial_types, bodies} =
    values
    |> Enum.map(&encode_value/1)
    |> Enum.unzip()

  serial = encode_records_header(serial_types)
  serial_size = byte_size(serial)
  size_len = byte_size(encode_varint(serial_size))
  candidate = serial_size + size_len

  header_size =
    if byte_size(encode_varint(candidate)) > size_len, do: candidate + 1, else: candidate

  IO.iodata_to_binary([encode_varint(header_size), serial | bodies])
end
# Concatenates the varint encoding of each serial type, in order.
defp encode_records_header(serials) do
  serials
  |> Enum.map(&encode_varint/1)
  |> IO.iodata_to_binary()
end
# Encodes a non-negative integer (up to 2^64 - 1) as a SQLite varint.
#
# Values below 2^56 use 1-8 bytes of 7 bits each, big-endian, with the high
# bit set on every byte except the last. Larger values use the 9-byte form:
# the top 56 bits go into eight continuation bytes of 7 bits each, and the
# ninth byte carries the low 8 bits in full. That form is laid out here
# directly, because splitting from the low end in 7-bit groups would put only
# 7 bits in the last byte and set the continuation bit on an 8-bit first byte.
#
# Raises `ExSQL.Error` for negative values and for values above 2^64 - 1.
defp encode_varint(value) when is_integer(value) and value < 0 do
  raise ExSQL.Error, message: "negative varints are not supported: #{value}"
end

defp encode_varint(value) when is_integer(value) and value > 18_446_744_073_709_551_615 do
  raise ExSQL.Error, message: "varint overflow: #{value}"
end

defp encode_varint(value) when is_integer(value) and value > 0x00FF_FFFF_FFFF_FFFF do
  <<g1::7, g2::7, g3::7, g4::7, g5::7, g6::7, g7::7, g8::7, low::8>> = <<value::64>>

  <<1::1, g1::7, 1::1, g2::7, 1::1, g3::7, 1::1, g4::7, 1::1, g5::7, 1::1, g6::7, 1::1,
    g7::7, 1::1, g8::7, low::8>>
end

defp encode_varint(value) when is_integer(value) do
  value
  |> varint_groups([])
  |> encode_varint_bytes()
end
# Splits `value` into varint digit groups, most significant first, by peeling
# 7 bits at a time off the low end. The most significant group may hold up to
# 8 bits once eight lower groups have already been collected.
defp varint_groups(value, groups) do
  collected = length(groups)

  cond do
    value < 128 and collected < 8 ->
      [value | groups]

    value < 256 and collected == 8 ->
      [value | groups]

    true ->
      varint_groups(div(value, 128), [Bitwise.band(value, 0x7F) | groups])
  end
end
# Turns digit groups into bytes: every group except the last gets the
# continuation bit (0x80) set.
defp encode_varint_bytes([last]), do: <<last::8>>

defp encode_varint_bytes([group | more]) do
  <<Bitwise.bor(group, 0x80)::8>> <> encode_varint_bytes(more)
end
# Encodes one value as `{serial_type, body}` for a SQLite record.
#
#   * `nil` and unrecognized values -> serial type 0 (NULL), empty body
#   * integers -> types 8 and 9 for the constants 0 and 1, otherwise the
#     smallest of the 1/2/3/4/6/8-byte signed big-endian forms (types 1-6)
#   * floats and `:nan` -> type 7, 8-byte IEEE 754 big-endian
#   * `{:blob, binary}` -> type `12 + 2 * size`
#   * text binaries and `{:json, binary}` -> type `13 + 2 * size`
#
# `:nan` is written as the quiet-NaN bit pattern directly: Erlang floats cannot
# hold NaN, and `:math.pow(-1.0, 0.5)` raises `ArithmeticError` instead of
# returning one.
defp encode_value(nil), do: {0, <<>>}
defp encode_value(:nan), do: {7, <<0x7FF8_0000_0000_0000::64>>}

defp encode_value({:blob, blob}) when is_binary(blob) do
  {12 + byte_size(blob) * 2, blob}
end

defp encode_value(value) when is_binary(value) do
  {13 + 2 * byte_size(value), value}
end

defp encode_value({:json, json}) when is_binary(json) do
  {13 + 2 * byte_size(json), json}
end

defp encode_value(value) when is_integer(value) do
  cond do
    value == 0 ->
      {8, <<>>}

    value == 1 ->
      {9, <<>>}

    value >= -128 and value <= 127 ->
      {1, <<value::signed-size(8)>>}

    value >= -32_768 and value <= 32_767 ->
      {2, <<value::signed-size(16)>>}

    value >= -8_388_608 and value <= 8_388_607 ->
      {3, <<value::signed-size(24)>>}

    value >= -2_147_483_648 and value <= 2_147_483_647 ->
      {4, <<value::signed-size(32)>>}

    value >= -140_737_488_355_328 and value <= 140_737_488_355_327 ->
      {5, <<value::signed-size(48)>>}

    true ->
      {6, <<value::signed-size(64)>>}
  end
end

defp encode_value(value) when is_float(value) do
  {7, <<value::float-size(64)>>}
end

defp encode_value(_other), do: {0, <<>>}
# Renders the `CREATE TABLE` statement stored in sqlite_schema: column
# definitions, then table-level foreign keys, then the optional
# `WITHOUT ROWID` / `STRICT` table options.
defp emit_create_table_sql(table) do
  column_defs = Enum.map(table.columns, &emit_column_def_sql/1)
  foreign_keys = Enum.map(table.foreign_keys, &emit_table_foreign_key_sql(table, &1))
  definition_sql = Enum.join(column_defs ++ foreign_keys, ", ")

  options =
    for {enabled, keyword} <- [{table.without_rowid, "WITHOUT ROWID"}, {table.strict, "STRICT"}],
        enabled,
        do: keyword

  suffix =
    case options do
      [] -> ""
      _some -> " " <> Enum.join(options, ", ")
    end

  "CREATE TABLE #{table.name}(#{definition_sql})#{suffix}"
end
# Renders one column definition: name, declared type, generated-column clause,
# then the constraints that are set on the column, separated by single spaces.
# Parts that do not apply are left out.
defp emit_column_def_sql(column) do
  keyword_if = fn enabled, sql -> if enabled, do: sql end

  parts = [
    column.name,
    column.declared_type,
    emit_generated_sql(column),
    keyword_if.(column.primary_key, "PRIMARY KEY"),
    keyword_if.(column.autoincrement, "AUTOINCREMENT"),
    keyword_if.(column.not_null, "NOT NULL"),
    keyword_if.(column.unique, "UNIQUE"),
    if(column.default, do: "DEFAULT #{emit_default(column.default)}"),
    if(column.collate, do: "COLLATE #{column.collate}"),
    emit_references_sql(column)
  ]

  parts
  |> Enum.filter(&(&1 != nil))
  |> Enum.join(" ")
end
# Renders the `GENERATED ALWAYS AS (...)` clause for a generated column, with
# the storage kind atom upper-cased; returns `nil` for ordinary columns.
defp emit_generated_sql(%{generated: {kind, expr}}) do
  expression_sql = emit_expr(expr)
  storage = kind |> Atom.to_string() |> String.upcase()
  "GENERATED ALWAYS AS (" <> expression_sql <> ") " <> storage
end

defp emit_generated_sql(_column), do: nil
# Renders a column-level `REFERENCES` clause. The parent column list is left
# out when it is empty. Returns `nil` when the column has no reference.
defp emit_references_sql(%{references: {table, columns, actions}}) do
  parent_columns =
    case columns do
      [] -> ""
      _listed -> "(#{Enum.join(columns, ", ")})"
    end

  "REFERENCES #{table}#{parent_columns}#{emit_fk_actions(actions)}"
end

defp emit_references_sql(_column), do: nil
# Renders the trailing foreign-key action clauses. `ON DELETE` / `ON UPDATE`
# are left out when the action is `:no_action`; the deferral clause appears
# only when `deferred` is set. Every emitted clause starts with a space.
defp emit_fk_actions(actions) do
  on_delete =
    if actions.on_delete != :no_action,
      do: " ON DELETE #{emit_fk_action(actions.on_delete)}",
      else: ""

  on_update =
    if actions.on_update != :no_action,
      do: " ON UPDATE #{emit_fk_action(actions.on_update)}",
      else: ""

  deferred = if actions.deferred, do: " DEFERRABLE INITIALLY DEFERRED", else: ""

  on_delete <> on_update <> deferred
end
# SQL keyword(s) for a foreign-key action atom: the atom's name upper-cased,
# with underscores turned into spaces.
defp emit_fk_action(action)
     when action in [:restrict, :set_null, :set_default, :cascade] do
  action
  |> Atom.to_string()
  |> String.replace("_", " ")
  |> String.upcase()
end
# Renders a table-level `FOREIGN KEY(...) REFERENCES ...` constraint. Child
# columns are shown with their declared spelling; the parent column list is
# left out when it is empty.
defp emit_table_foreign_key_sql(table, {child_keys, parent_table, parent_keys, actions}) do
  child_columns = Enum.map_join(child_keys, ", ", fn key -> display_column_name(table, key) end)

  references =
    if parent_keys == [] do
      parent_table
    else
      "#{parent_table}(#{Enum.join(parent_keys, ", ")})"
    end

  "FOREIGN KEY(#{child_columns}) REFERENCES #{references}#{emit_fk_actions(actions)}"
end
# Renders the `CREATE [UNIQUE] INDEX` statement for an explicit index,
# including expression members and the partial-index `WHERE` clause.
defp emit_create_index_sql(table, index) do
  unique =
    case index.unique do
      falsy when falsy in [nil, false] -> ""
      _truthy -> "UNIQUE "
    end

  member_sql = fn
    {:column, key} -> display_column_name(table, key)
    {:expr, expr} -> emit_expr(expr)
  end

  columns = index |> emit_index_members() |> Enum.map_join(", ", member_sql)
  where = if index.where, do: " WHERE #{emit_expr(index.where)}", else: ""

  "CREATE #{unique}INDEX #{index.name} ON #{table.name}(#{columns})#{where}"
end
# Renders a column's DEFAULT value as SQL text.
#
# Text literals are single-quoted with embedded single quotes doubled, so a
# default such as `it's` produces a literal that parses back to the same
# text instead of a broken statement.
defp emit_default({:literal, nil}), do: "NULL"
defp emit_default({:literal, {:blob, blob}}), do: "x'#{Base.encode16(blob)}'"

defp emit_default({:literal, value}) when is_binary(value),
  do: "'#{String.replace(value, "'", "''")}'"

defp emit_default({:literal, value}), do: Value.to_text(value)
defp emit_default({:negate, {:literal, value}}), do: "-#{Value.to_text(value)}"
defp emit_default({:column, nil, word}), do: word
# An expression default (e.g. `(strftime('%Y-%m-%dT%H:%M:%SZ','now'))`): emit
# the expression, parenthesized as SQLite stores and re-parses it. Without
# this the schema round-trip through the file format would lose the default.
defp emit_default(expr), do: "(#{emit_expr(expr)})"
# Renders an expression AST node as SQL text for stored schema statements.
#
# Text literals are single-quoted with embedded single quotes doubled, so the
# emitted SQL parses back to the same literal. Node shapes that are not
# handled here are emitted as the placeholder text "expr".
# NOTE(review): operands of binary, unary and COLLATE nodes are emitted
# without parentheses — confirm the AST never nests a lower-precedence
# operation inside a higher-precedence one, or grouping is lost on re-parse.
defp emit_expr({:param, _index, raw}), do: raw
defp emit_expr({:column, nil, name}), do: name
defp emit_expr({:column, table, name}), do: "#{table}.#{name}"
defp emit_expr({:literal, nil}), do: "NULL"
defp emit_expr({:literal, {:blob, b}}), do: "x'#{Base.encode16(b)}'"

defp emit_expr({:literal, v}) when is_binary(v),
  do: "'#{String.replace(v, "'", "''")}'"

defp emit_expr({:literal, v}), do: Value.to_text(v)
defp emit_expr({:function, name, :star}), do: "#{name}(*)"

defp emit_expr({:function, name, {:distinct, args}}),
  do: "#{name}(DISTINCT #{Enum.map_join(args, ", ", &emit_expr/1)})"

defp emit_expr({:function, name, args}),
  do: "#{name}(#{Enum.map_join(args, ", ", &emit_expr/1)})"

defp emit_expr({:binary, op, left, right}),
  do: "#{emit_expr(left)} #{op} #{emit_expr(right)}"

defp emit_expr({:collate, expr, name}), do: "#{emit_expr(expr)} COLLATE #{name}"
defp emit_expr({:negate, expr}), do: "-#{emit_expr(expr)}"
defp emit_expr({:not, expr}), do: "NOT #{emit_expr(expr)}"
defp emit_expr(_expr), do: "expr"
# Returns the index's member list. Indexes that carry no `:members` are
# treated as plain column indexes: each entry of `index.columns` becomes a
# `{:column, key}` member.
defp emit_index_members(index) do
  case Map.get(index, :members) do
    missing when missing in [nil, false] ->
      Enum.map(index.columns, fn column -> {:column, column} end)

    members ->
      members
  end
end
# Looks up the declared spelling of a column from its normalized key; falls
# back to the key itself when the table has no such column.
defp display_column_name(table, column_key) do
  table.columns
  |> Enum.find(fn column -> Table.key(column.name) == column_key end)
  |> case do
    nil -> column_key
    %{name: declared_name} -> declared_name
  end
end
# Reads the whole file at `path`; a failure is turned into a readable
# `{:error, message}` that includes the reason.
defp read_file(path) do
  with {:error, reason} <- File.read(path) do
    {:error, "unable to open database file: #{reason}"}
  end
end
# -- file header (the first 100 bytes) ----------------------------------------
# Parses the fixed fields of the database header and returns
# `{:ok, %{data, page_size, usable, pages}}`, or `{:error, message}`.
#
# The whole fixed-field layout is matched in the function head, so a file that
# starts with the magic string but is cut short before the text-encoding field
# is reported as "file is not a database" rather than raising a `MatchError`.
# A stored page size of 1 means 65_536. The page size must be a power of two
# of at least 512, and the file must hold at least one full page. `usable` is
# the page size minus the reserved bytes per page.
defp parse_header(
       <<"SQLite format 3", 0, raw_page_size::16, _write_version, _read_version, reserved,
         _max_frac, _min_frac, _leaf_frac, _change_count::32, _db_pages::32,
         _freelist_first::32, _freelist_count::32, _schema_cookie::32, _schema_format::32,
         _cache_size::32, _vacuum::32, encoding::32, _rest::binary>> = data
     ) do
  page_size = if raw_page_size == 1, do: 65_536, else: raw_page_size

  cond do
    encoding not in [0, 1] ->
      {:error, "unsupported text encoding (only UTF-8 databases are readable)"}

    page_size < 512 or Bitwise.band(page_size, page_size - 1) != 0 ->
      {:error, "file is not a database"}

    byte_size(data) < page_size ->
      {:error, "file is not a database"}

    true ->
      {:ok, %{data: data, page_size: page_size, usable: page_size - reserved, pages: %{}}}
  end
end

defp parse_header(_data), do: {:error, "file is not a database"}
# -- WAL replay ---------------------------------------------------------------
# Looks for a `<path>-wal` sidecar next to the database file.
#
# A missing sidecar, or one shorter than 32 bytes, leaves `file` unchanged.
# Any other read failure is reported as `{:error, message}`. Otherwise the
# WAL contents are handed to `wal_pages/2`.
defp replay_wal(path, file) do
  wal_path = path <> "-wal"

  case File.read(wal_path) do
    {:error, :enoent} ->
      {:ok, file}

    {:error, reason} ->
      {:error, "unable to open WAL file: #{reason}"}

    {:ok, wal} ->
      if byte_size(wal) >= 32, do: wal_pages(wal, file), else: {:ok, file}
  end
end
# Decodes the 32-byte WAL header and overlays the committed WAL pages onto
# `file.pages`.
#
# * An unrecognised magic number means the WAL is ignored (`{:ok, file}`).
# * A WAL page size of 0 is read as "same as the database"; any other
#   mismatch with the database page size is an error.
# * A `MatchError` or `ArgumentError` raised while decoding is treated as
#   "no usable WAL" and also yields `{:ok, file}`.
defp wal_pages(wal, file) do
  <<magic::32, _version::32, declared_page_size::32, _checkpoint::32, salt1::32, salt2::32,
    _checksum1::32, _checksum2::32, frames::binary>> = wal

  page_size =
    if declared_page_size == 0 do
      file.page_size
    else
      declared_page_size
    end

  recognised_magic? = magic in [0x377F0682, 0x377F0683]

  cond do
    not recognised_magic? ->
      {:ok, file}

    page_size != file.page_size ->
      {:error, "WAL page size does not match database page size"}

    true ->
      overlay = committed_wal_pages(wal_frames(frames, page_size, salt1, salt2))
      {:ok, %{file | pages: Map.merge(file.pages, overlay)}}
  end
rescue
  MatchError -> {:ok, file}
  ArgumentError -> {:ok, file}
end
# Splits the WAL body into its leading run of valid frames.
#
# Each frame is a 24-byte header (page number, database size after commit,
# two salts, two checksums) followed by `page_size` bytes of page content.
# Scanning stops at the first frame whose salts differ from the WAL header's
# `salt1`/`salt2`, or at a trailing partial frame: such frames are leftovers
# and must not invalidate the frames before them. Previously a salt mismatch
# raised `MatchError`, which made the caller discard the entire WAL.
#
# Returns a list of `%{page_number:, db_size:, page:}` maps in file order.
# Frame checksums are not verified.
defp wal_frames(frames, page_size, salt1, salt2) do
  frames
  |> Stream.unfold(fn
    <<page_number::32, db_size::32, ^salt1::32, ^salt2::32, _checksum1::32, _checksum2::32,
      page::binary-size(^page_size), rest::binary>> ->
      {%{page_number: page_number, db_size: db_size, page: page}, rest}

    _stale_or_partial ->
      nil
  end)
  |> Enum.to_list()
end
# Keeps only the frames up to and including the last commit frame (a frame
# whose `db_size` is greater than zero) and returns them as a
# `%{page_number => page}` map. When the same page appears more than once the
# later frame wins. With no commit frame at all the result is an empty map.
defp committed_wal_pages(frames) do
  frames
  |> Enum.reverse()
  # Trailing frames after the final commit frame are dropped.
  |> Enum.drop_while(fn frame -> not match?(%{db_size: size} when size > 0, frame) end)
  |> Enum.reverse()
  |> Map.new(fn %{page_number: number, page: bytes} -> {number, bytes} end)
end
# -- loading -------------------------------------------------------------------
# Builds a `Database` from a parsed file in three passes:
#
#   1. read the schema rows from the B-tree rooted at page 1,
#   2. replay every stored SQL statement through the executor (entries named
#      "sqlite_sequence" and entries without SQL are skipped),
#   3. bulk-load the rows of each table entry with a positive root page, then
#      apply the stored sequence values via `load_sequences/3`.
#
# Raises through `fail/1` when the executor rejects a stored statement.
defp load(file) do
  schema =
    file
    |> btree_rows(1)
    |> Enum.map(fn {_rowid, [type, name, tbl_name, rootpage, sql]} ->
      %{type: type, name: name, tbl_name: tbl_name, rootpage: rootpage, sql: sql}
    end)

  with_schema =
    Enum.reduce(schema, Database.new(), fn
      %{name: "sqlite_sequence"}, db ->
        db

      # Auto-indexes for UNIQUE/PK constraints have no SQL; the constraints
      # themselves come back with the CREATE TABLE.
      %{sql: nil}, db ->
        db

      %{sql: sql} = entry, db ->
        case Executor.run(db, sql) do
          {:ok, _results, updated} -> updated
          {:error, error, _db} -> fail("schema error in #{entry.name}: #{error.message}")
        end
    end)

  with_rows =
    Enum.reduce(schema, with_schema, fn
      %{type: "table", name: name, rootpage: rootpage}, db
      when name != "sqlite_sequence" and rootpage > 0 ->
        load_table_rows(file, db, name, rootpage)

      _other, db ->
        db
    end)

  {:ok, load_sequences(file, with_rows, schema)}
end
# Bulk-loads the rows stored under `rootpage` into the table called `name`.
#
# * Unknown table: `db` is returned unchanged.
# * WITHOUT ROWID table: records come from the index B-tree, are numbered
#   1..n in traversal order, and `next_rowid` becomes n + 1.
# * Rowid table: records come from the table B-tree keyed by their rowid; a
#   NULL stored in the rowid-alias column is replaced by the cell's rowid, and
#   `next_rowid` becomes the largest rowid + 1 (1 for an empty table).
#
# Virtual generated columns are excluded from the stored column list in the
# rowid branch; short records are padded by `zip_pad/2`.
defp load_table_rows(file, db, name, rootpage) do
  case Database.fetch_table(db, name) do
    {:ok, %{without_rowid: true} = table} ->
      layout = without_rowid_storage_columns(table)
      records = index_btree_records(file, rootpage)

      rows =
        for {values, position} <- Enum.with_index(records, 1), into: %{} do
          row =
            for {column, value} <- zip_pad(layout, values), into: %{} do
              {Table.key(column.name), value}
            end

          {position, row}
        end

      put_loaded_rows(db, table, rows, map_size(rows) + 1)

    {:ok, table} ->
      stored = Enum.reject(table.columns, fn column -> match?({:virtual, _}, column.generated) end)

      rows =
        for {rowid, values} <- btree_rows(file, rootpage), into: %{} do
          row =
            for {column, value} <- zip_pad(stored, values), into: %{} do
              key = Table.key(column.name)

              case {key == table.rowid_alias, value} do
                {true, nil} -> {key, rowid}
                _stored_value -> {key, value}
              end
            end

          {rowid, row}
        end

      highest_rowid = Enum.max(Map.keys(rows), fn -> 0 end)
      put_loaded_rows(db, table, rows, highest_rowid + 1)

    {:error, _} ->
      db
  end
end

# Installs freshly loaded rows on the table, narrows them, clears stale index
# entries and stores the table back in the database.
defp put_loaded_rows(db, table, rows, next_rowid) do
  loaded = %{table | rows: rows, next_rowid: next_rowid}
  Database.put_table(db, clear_index_entries(Table.narrow_all_rows(loaded)))
end
# Bulk loading writes straight into `table.rows` and so skips the executor's
# per-row index maintenance. Replaying CREATE INDEX against the still-empty
# table left each index with `entries: %{}`, which the executor reads as
# "already materialized". Removing the `:entries` key makes the lazy
# `ensure_index_entries` path rebuild them from the loaded rows on first use.
#
# Tables without both `:indexes` and `:autoindexes` are returned untouched.
defp clear_index_entries(table) do
  case table do
    %{indexes: indexes, autoindexes: autoindexes} ->
      strip = fn index -> Map.drop(index, [:entries]) end
      %{table | indexes: Enum.map(indexes, strip), autoindexes: Enum.map(autoindexes, strip)}

    _without_indexes ->
      table
  end
end
# Column order used when decoding a WITHOUT ROWID table's records: the
# primary-key columns first (in key order), then every remaining stored
# column in declaration order. Virtual generated columns are excluded.
#
# The key order comes from the first entry of `table.composite_keys` when one
# exists, otherwise from the columns flagged `primary_key`. Key names that
# match no stored column are ignored.
defp without_rowid_storage_columns(table) do
  stored = for column <- table.columns, not match?({:virtual, _}, column.generated), do: column

  key_order =
    case table.composite_keys do
      [] -> for column <- stored, column.primary_key, do: Table.key(column.name)
      [{_name, keys} | _rest] -> keys
    end

  key_columns =
    Enum.flat_map(key_order, fn key ->
      stored
      |> Enum.find(fn column -> Table.key(column.name) == key end)
      |> List.wrap()
    end)

  key_lookup = MapSet.new(key_order)

  payload_columns =
    Enum.reject(stored, fn column -> MapSet.member?(key_lookup, Table.key(column.name)) end)

  key_columns ++ payload_columns
end
# Pairs each column with its stored value. Records written before an
# ALTER TABLE ADD COLUMN are shorter than the column list; the columns left
# over are paired with their declared default (see `stored_default/1`).
# Values beyond the last column are discarded.
defp zip_pad(columns, values) do
  {pairs, _unused_values} =
    Enum.map_reduce(columns, values, fn
      column, [] -> {{column, stored_default(column)}, []}
      column, [value | remaining] -> {{column, value}, remaining}
    end)

  pairs
end
# The value a missing trailing column takes: the literal from a
# `{:literal, value}` default, or nil for any other (or absent) default.
defp stored_default(column) do
  case column do
    %{default: {:literal, value}} -> value
    _no_literal_default -> nil
  end
end
# Applies the rows of the `sqlite_sequence` table (name, seq) to the loaded
# tables: each known table with an integer `seq` gets `sequence: seq` and
# `sequence_row: true`. Rows naming an unknown table or carrying a
# non-integer sequence are ignored. Without a `sqlite_sequence` schema entry
# the database is returned unchanged.
defp load_sequences(file, db, schema) do
  sequence_entry =
    Enum.find(schema, fn entry -> entry.type == "table" and entry.name == "sqlite_sequence" end)

  case sequence_entry do
    nil ->
      db

    %{rootpage: rootpage} ->
      file
      |> btree_rows(rootpage)
      |> Enum.reduce(db, fn {_rowid, [name, seq]}, acc ->
        lookup = Database.fetch_table(acc, name || "")

        if is_integer(seq) and match?({:ok, _}, lookup) do
          {:ok, table} = lookup
          Database.put_table(acc, %{table | sequence: seq, sequence_row: true})
        else
          acc
        end
      end)
  end
end
# -- B-tree walking --------------------------------------------------------------
# Collects every `{rowid, values}` row of the table B-tree rooted at
# `page_number`, in traversal order.
#
# Page 1 carries the 100-byte file header, so its B-tree header starts at
# offset 100. Interior pages (type 5) recurse into each child pointer and
# then the right-most pointer; leaf pages (type 13) decode their cells.
# Any other page type raises through `fail/1`.
defp btree_rows(file, page_number) do
  bytes = page(file, page_number)
  base = if page_number == 1, do: 100, else: 0
  kind = :binary.at(bytes, base)
  cell_count = u16(bytes, base + 3)

  case kind do
    # leaf table page
    13 ->
      for pointer <- cell_pointers(bytes, base + 8, cell_count) do
        leaf_cell(file, bytes, pointer)
      end

    # interior table page
    5 ->
      last_child = u32(bytes, base + 8)

      left_children =
        for pointer <- cell_pointers(bytes, base + 12, cell_count), do: u32(bytes, pointer)

      Enum.flat_map(left_children ++ [last_child], fn child -> btree_rows(file, child) end)

    other ->
      fail("unsupported B-tree page type: #{other}")
  end
end
# Collects every record of the index B-tree rooted at `page_number`, in
# traversal order. WITHOUT ROWID tables are stored as index B-trees whose key
# payload is the complete row.
#
# Interior pages (type 2) hold a record in each cell: for every cell the
# child subtree is emitted first, then the cell's own record; the right-most
# subtree comes last. Leaf pages (type 10) decode their cells directly.
# Any other page type raises through `fail/1`.
defp index_btree_records(file, page_number) do
  bytes = page(file, page_number)
  base = if page_number == 1, do: 100, else: 0
  kind = :binary.at(bytes, base)
  cell_count = u16(bytes, base + 3)

  case kind do
    # leaf index page
    10 ->
      for pointer <- cell_pointers(bytes, base + 8, cell_count) do
        index_leaf_cell(file, bytes, pointer)
      end

    # interior index page
    2 ->
      last_child = u32(bytes, base + 8)

      left_records =
        Enum.flat_map(cell_pointers(bytes, base + 12, cell_count), fn pointer ->
          subtree = index_btree_records(file, u32(bytes, pointer))
          subtree ++ [index_interior_cell(file, bytes, pointer)]
        end)

      left_records ++ index_btree_records(file, last_child)

    other ->
      fail("unsupported B-tree page type: #{other}")
  end
end
# Returns the bytes of page `page_number` (1-based). An entry in the `pages`
# override map takes precedence; otherwise the page is sliced out of the raw
# file data.
defp page(%{pages: pages, data: data, page_size: page_size}, page_number) do
  Map.get_lazy(pages, page_number, fn ->
    start = (page_number - 1) * page_size
    binary_part(data, start, page_size)
  end)
end
# Reads the big-endian unsigned 16-bit integer stored at `offset` in `bin`.
defp u16(bin, offset) do
  <<value::unsigned-big-integer-size(16)>> = binary_part(bin, offset, 2)
  value
end
# Reads the big-endian unsigned 32-bit integer stored at `offset` in `bin`.
defp u32(bin, offset) do
  <<value::unsigned-big-integer-size(32)>> = binary_part(bin, offset, 4)
  value
end
# Reads `ncells` consecutive 16-bit values starting at `array_offset`.
# Callers use each value as the offset of a cell within `page`.
# A count of zero yields an empty list.
defp cell_pointers(page, array_offset, ncells) do
  Enum.map(0..(ncells - 1)//1, fn slot -> u16(page, array_offset + slot * 2) end)
end
# Decodes the table-leaf cell that starts at `offset` in `page`.
#
# The cell is: payload-size varint, rowid varint, then the payload (which may
# spill onto overflow pages — see `read_payload/4`).
#
# Returns `{rowid, values}` where `values` is the decoded record.
defp leaf_cell(file, page, offset) do
  cell = binary_part(page, offset, byte_size(page) - offset)
  {payload_size, cell} = varint(cell)
  {raw_rowid, cell} = varint(cell)

  # `varint/1` yields an unsigned value, but rowids are 64-bit two's-complement
  # integers: a negative rowid arrives as a 9-byte varint with the top bit
  # set. Reinterpret the 64 bits as signed so such rowids keep their sign.
  <<rowid::signed-64>> = <<raw_rowid::unsigned-64>>

  payload = read_payload(file, cell, payload_size, :table_leaf)
  {rowid, decode_record(payload)}
end
# Decodes the index-leaf cell at `offset`: a payload-size varint followed by
# the payload. Returns the decoded record values.
defp index_leaf_cell(file, page, offset) do
  tail_length = byte_size(page) - offset
  {size, body} = page |> binary_part(offset, tail_length) |> varint()

  file
  |> read_payload(body, size, :index)
  |> decode_record()
end
# Decodes the record held by the index-interior cell at `offset`. The first
# four bytes of the cell are skipped (callers read them separately as the
# child page number); what follows is a payload-size varint and the payload.
defp index_interior_cell(file, page, offset) do
  start = offset + 4
  {size, body} = page |> binary_part(start, byte_size(page) - start) |> varint()

  file
  |> read_payload(body, size, :index)
  |> decode_record()
end
# Returns the complete payload of a cell, following overflow pages if needed.
#
# A cell keeps at most `inline_limit` bytes locally (btreeInt.h's maxLocal:
# `usable - 35` for table leaves, `(usable - 12) * 64 / 255 - 23` for index
# cells). A larger payload keeps only a prefix in the cell, followed by the
# 4-byte number of the first overflow page; the prefix length follows the
# minLocal / surplus computation from btreeInt.h.
#
# `cell` must start at the first payload byte; `kind` is :table_leaf or :index.
defp read_payload(%{usable: usable} = file, cell, payload_size, kind) do
  inline_limit =
    case kind do
      :table_leaf -> usable - 35
      :index -> div((usable - 12) * 64, 255) - 23
    end

  cond do
    payload_size <= inline_limit ->
      binary_part(cell, 0, payload_size)

    true ->
      floor_local = div((usable - 12) * 32, 255) - 23
      candidate = floor_local + rem(payload_size - floor_local, usable - 4)
      kept = if candidate <= inline_limit, do: candidate, else: floor_local

      <<inline::binary-size(^kept), first_overflow::32, _::binary>> = cell
      inline <> read_overflow(file, first_overflow, payload_size - kept)
  end
end
# Reads `remaining` payload bytes from the overflow chain that starts at
# `page_number`. Each overflow page begins with the 4-byte number of the next
# page in the chain and contributes up to `usable - 4` content bytes.
defp read_overflow(_file, _page_number, 0), do: ""

defp read_overflow(%{usable: usable} = file, page_number, remaining) do
  <<next_page::32, content::binary>> = page(file, page_number)
  chunk_size = min(remaining, usable - 4)
  chunk = binary_part(content, 0, chunk_size)
  rest = read_overflow(file, next_page, remaining - chunk_size)
  chunk <> rest
end
# -- record decoding --------------------------------------------------------------
# Decodes a record payload into its list of values.
#
# The payload starts with a varint giving the header size (which counts the
# varint itself), followed by one serial-type varint per column and then the
# column bodies in the same order.
defp decode_record(payload) do
  {header_size, after_size} = varint(payload)
  size_field_bytes = byte_size(payload) - byte_size(after_size)
  types_length = header_size - size_field_bytes

  <<type_bytes::binary-size(^types_length), body::binary>> = after_size

  {values, _unread} =
    type_bytes
    |> serial_types([])
    |> Enum.map_reduce(body, &decode_value/2)

  values
end
# Decodes a run of varints into a list of serial types, in header order.
# `acc` collects them in reverse and is flipped once the header is exhausted.
defp serial_types(header, acc) do
  case header do
    <<>> ->
      Enum.reverse(acc)

    _more ->
      {type, remaining} = varint(header)
      serial_types(remaining, [type | acc])
  end
end
# Decodes one column value of the given serial type from the front of `body`
# and returns `{value, rest_of_body}`.
#
#   0        -> nil (no bytes consumed)
#   1..6     -> signed big-endian integer of 8/16/24/32/48/64 bits
#   7        -> 64-bit float
#   8, 9     -> the constants 0 and 1 (no bytes consumed)
#   even >=12 -> `{:blob, bytes}` of (n - 12) / 2 bytes
#   odd  >=13 -> binary text of (n - 13) / 2 bytes
#
# Serial types 10 and 11 have no clause.
# NOTE(review): the float pattern only matches finite values; a stored
# infinity would find no matching clause — confirm whether that needs handling.
defp decode_value(0, body), do: {nil, body}

defp decode_value(n, body) when n in 1..6 do
  width = elem({8, 16, 24, 32, 48, 64}, n - 1)
  <<int::signed-integer-size(^width), tail::binary>> = body
  {int, tail}
end

defp decode_value(7, <<real::float-size(64), tail::binary>>), do: {real, tail}
defp decode_value(8, body), do: {0, body}
defp decode_value(9, body), do: {1, body}

defp decode_value(n, body) when n >= 12 do
  # Integer division gives the same length for 2k + 12 (blob) and 2k + 13 (text).
  length = div(n - 12, 2)
  <<bytes::binary-size(^length), tail::binary>> = body

  if rem(n, 2) == 0 do
    {{:blob, bytes}, tail}
  else
    {bytes, tail}
  end
end
# Decodes a big-endian base-128 varint from the front of `data` and returns
# `{value, rest}`. Each of the first eight bytes contributes its low 7 bits
# and continues while its high bit is set; a ninth byte, when reached,
# contributes all 8 of its bits and always ends the varint.
defp varint(data), do: varint(data, 0, 0)

# Ninth byte: all eight bits are payload.
defp varint(<<last, tail::binary>>, acc, 8), do: {acc * 256 + last, tail}

# High bit clear: this is the final byte.
defp varint(<<0::1, low::7, tail::binary>>, acc, _count), do: {acc * 128 + low, tail}

# High bit set: keep the low 7 bits and continue with the next byte.
defp varint(<<1::1, low::7, tail::binary>>, acc, count),
  do: varint(tail, acc * 128 + low, count + 1)
# Raises an `ExSQL.Error` carrying `message`.
defp fail(message) do
  raise ExSQL.Error, message: message
end
end