defmodule ExSQL.Log do
@moduledoc """
A logical redo log with an async writer, the BEAM-native answer to SQLite's
WAL + checkpoint.
Each committed transaction's effects are appended to a per-database log file;
on open the base SQLite file is read and the log replayed on top; periodically
the log is folded back into a fresh base file (a checkpoint). Write cost per
commit is O(change), not O(total DB size) — unlike the whole-file rewrite of
`:file` mode.
One writer process per base path (registered in `ExSQL.LogRegistry`, started
under `ExSQL.LogSupervisor`) owns the log file. A connection `cast`s redo
records to it and returns immediately; the writer batches appends and fsyncs
on a timer / on `flush/1` / checkpoint / shutdown. With `pool_size: 1` there is
a single ordered caster, so the log is in commit order.
## Record format
One record per committed transaction: `term_to_binary([{sql, params}, …])`,
framed as `<<size::32, crc32::32, payload::binary>>`. A torn or
crc-mismatched tail stops replay, so an interrupted append is dropped whole.
## Crash-safe checkpoint
Folding the log into the base must survive a crash at any step. Done in the
serialized writer (no appends interleave):
1. fsync the active log
2. `db = read(base) + replay(log)`
3. write `base.new` (temp); fsync
4. rename `log` → `log.archived`; open a fresh empty `log`
5. commit point: rename `base.new` → `base`
6. delete `log.archived`
Recovery decides from which files exist (see `recover/1`): `base.new` present
⇒ checkpoint didn't commit (discard it); `base.new` absent + `log.archived`
present ⇒ it committed (base already folded — drop the archive). Never
double-applies.
## Determinism (v1, statement log)
Replay re-runs the logged SQL, so SQL-level volatile functions (`random()`,
`datetime('now')`) replay differently. rowid/AUTOINCREMENT is deterministic
under ordered replay. The Ecto path passes resolved values as params, so this
does not affect it.
"""
use GenServer
alias ExSQL.{Database, Executor, FileFormat}
# Default interval, in milliseconds, between timer-driven fsyncs of the log.
# A writer can override it with the `:flush_ms` start option (see `init/1`).
@flush_ms 50
# -- client -------------------------------------------------------------------
@doc """
Starts the writer for `base_path` unless it is already running, then asks it
for the database rebuilt from the base file plus the replayed log.

Any result of starting the writer other than `{:ok, pid}` is returned unchanged.
"""
@spec open(Path.t(), keyword()) :: {:ok, Database.t()} | {:error, term()}
def open(base_path, opts \\ []) do
  case ensure_started(base_path, opts) do
    {:ok, _pid} ->
      db = GenServer.call(via(base_path), :load_db, :infinity)
      {:ok, db}

    failure ->
      failure
  end
end
@doc """
Hands one committed transaction's statements (a list of `{sql, params}`) to the writer.

The records are `cast` to the writer process, so this returns without waiting
for the write. An empty list sends nothing.
"""
@spec append(Path.t(), [{String.t(), list()}]) :: :ok
def append(base_path, records) do
  case records do
    [] -> :ok
    _ -> GenServer.cast(via(base_path), {:append, records})
  end
end
@doc """
Asks the writer to fsync any appended-but-unsynced records.

Blocks (without a timeout) until the writer replies, i.e. until the sync is done.
"""
@spec flush(Path.t()) :: :ok
def flush(base_path) do
  base_path
  |> via()
  |> GenServer.call(:flush, :infinity)
end
@doc """
Asks the writer to fold the log into a fresh base file and start an empty log.

Blocks (without a timeout) until the checkpoint has finished.
"""
@spec checkpoint(Path.t()) :: :ok
def checkpoint(base_path) do
  base_path
  |> via()
  |> GenServer.call(:checkpoint, :infinity)
end
@doc """
Stops the writer registered for `base_path`, if there is one.

The writer's `terminate/2` syncs and closes the log on the way out. Returns
`:ok` when no writer is registered.
"""
@spec stop(Path.t()) :: :ok
def stop(base_path) do
  ExSQL.LogRegistry
  |> Registry.lookup(base_path)
  |> case do
    [] -> :ok
    [{pid, _}] -> GenServer.stop(pid)
  end
end
@doc false
# Starts the writer under ExSQL.LogSupervisor. An already-running writer counts
# as success; every other result is passed through untouched.
def ensure_started(base_path, opts \\ []) do
  child = {__MODULE__, {base_path, opts}}

  with {:error, {:already_started, pid}} <-
         DynamicSupervisor.start_child(ExSQL.LogSupervisor, child) do
    {:ok, pid}
  end
end
@doc false
# Starts the writer and registers it under its base path, so there is at most
# one writer per path.
def start_link({base_path, _opts} = arg) do
  GenServer.start_link(__MODULE__, arg, name: via(base_path))
end
# Child spec keyed by base path. `:transient` means the supervisor restarts the
# writer only after an abnormal exit.
def child_spec({base_path, _opts} = arg) do
  child_id = {__MODULE__, base_path}
  starter = {__MODULE__, :start_link, [arg]}

  %{id: child_id, start: starter, restart: :transient}
end
# Registry-backed process name for the writer that owns `base_path`.
defp via(base_path) do
  registry_key = {ExSQL.LogRegistry, base_path}
  {:via, Registry, registry_key}
end
# -- server -------------------------------------------------------------------
@impl true
# Prepares the on-disk layout before the writer accepts any message: create the
# directory, run crash recovery, and only then open the log for appending.
def init({base_path, opts}) do
  paths = paths(base_path)

  base_path
  |> Path.dirname()
  |> File.mkdir_p!()

  recover(paths)

  state = %{
    paths: paths,
    io: open_log(paths.log),
    sync: Keyword.get(opts, :sync, false),
    flush_ms: Keyword.get(opts, :flush_ms, @flush_ms),
    unsynced: false
  }

  # Arm the first periodic flush; handle_info/2 re-arms it after every tick.
  schedule_flush(state)
  {:ok, state}
end
@impl true
# :load_db    -> replies with the database rebuilt from base + log
# :flush      -> syncs pending appends, replies :ok
# :checkpoint -> folds the log into the base, replies :ok
def handle_call(:load_db, _from, state) do
  db = load_from_files(state.paths)
  {:reply, db, state}
end

def handle_call(:flush, _from, state) do
  synced = do_sync(state)
  {:reply, :ok, synced}
end

def handle_call(:checkpoint, _from, state) do
  checkpointed = do_checkpoint(state)
  {:reply, :ok, checkpointed}
end
@impl true
# Appends one framed record to the log. With the `:sync` option set, the append
# is fsynced right away; otherwise it waits for the next flush.
def handle_cast({:append, records}, state) do
  frame = encode(records)
  :ok = :file.write(state.io, frame)

  dirty = %{state | unsynced: true}
  next = if dirty.sync, do: do_sync(dirty), else: dirty

  {:noreply, next}
end
@impl true
# Periodic flush. do_sync/1 returns the state untouched when nothing is
# pending, so it can be called on every tick.
def handle_info(:flush_tick, state) do
  synced = do_sync(state)
  schedule_flush(synced)
  {:noreply, synced}
end
@impl true
# Last-chance durability: sync whatever is pending, then release the log handle.
# The results of both calls are deliberately ignored.
def terminate(_reason, %{io: io} = state) do
  do_sync(state)
  :file.close(io)
  :ok
end
# -- file handling ------------------------------------------------------------
# Derives every file name the writer touches from the base path.
defp paths(base_path) do
  with_suffix = fn suffix -> base_path <> suffix end

  %{
    base: base_path,
    base_new: with_suffix.(".new"),
    log: with_suffix.(".log"),
    archived: with_suffix.(".log.archived")
  }
end
# Opens the log for appending as a raw binary file; raises if it cannot be opened.
defp open_log(path) do
  modes = [:append, :binary, :raw]
  File.open!(path, modes)
end
# Arms a one-shot :flush_tick message to this process after `flush_ms` milliseconds.
defp schedule_flush(%{flush_ms: interval}) do
  Process.send_after(self(), :flush_tick, interval)
end
# Fsyncs the log if something was appended since the last sync and clears the
# `unsynced` flag. Returns the state unchanged when the flag is already false.
defp do_sync(state) do
  case state do
    %{unsynced: false} ->
      state

    _ ->
      :ok = :file.sync(state.io)
      %{state | unsynced: false}
  end
end
# Reduce any post-crash file state back to the canonical {base, log} layout.
#
# `base.new` is the only evidence that a checkpoint stopped before its commit
# point (the rename onto `base`), so it must be the LAST file removed here. If
# it were deleted first and recovery then failed, the next start would see
# "archive without base.new", take that for a committed checkpoint, and delete
# transactions that were never folded into the base.
defp recover(p) do
  cond do
    File.exists?(p.base_new) ->
      # Checkpoint did not reach its commit point — roll it back.
      undo_checkpoint(p)

    File.exists?(p.archived) ->
      # Checkpoint committed (base already includes the archive) but cleanup
      # was interrupted — just drop the archive.
      File.rm!(p.archived)

    true ->
      :ok
  end
end

# Rolls an uncommitted checkpoint back to {base, log}. Nothing is folded into
# the base here: the restored log is simply replayed on top of the old base the
# next time the database is loaded, so no record can be applied twice.
defp undo_checkpoint(p) do
  cond do
    not File.exists?(p.archived) ->
      # Interrupted before the log was archived: the log is still in place and
      # only the partial base has to go.
      File.rm!(p.base_new)

    empty_log?(p.log) ->
      # The log the checkpoint opened after archiving holds nothing, so renaming
      # the archive back over it restores the pre-checkpoint log in one step.
      # base.new is removed only afterwards; a crash in between leaves
      # {base, base.new, log}, which the branch above cleans up.
      :ok = File.rename(p.archived, p.log)
      File.rm!(p.base_new)

    true ->
      # An archive next to a non-empty log is not a state do_checkpoint/1 leaves
      # behind (it appends nothing between archiving and its commit point).
      # Keep the previous behaviour for it: fold base + archive + log into a
      # fresh base. This path is not atomic across a second failure.
      File.rm!(p.base_new)
      canonicalize(p)
  end
end

# True when the file at `path` is missing or zero bytes long. Any other stat
# failure raises, so recovery never guesses about a log it cannot inspect.
defp empty_log?(path) do
  case File.stat(path) do
    {:ok, %File.Stat{size: size}} -> size == 0
    {:error, :enoent} -> true
    {:error, reason} -> raise File.Error, reason: reason, action: "read file stats", path: path
  end
end
# Folds base + archived log + active log (in that order) into a fresh base,
# then deletes whichever of the two logs exist.
defp canonicalize(p) do
  folded =
    p.base
    |> read_base()
    |> maybe_replay(p.archived)
    |> maybe_replay(p.log)
    |> rollback_if_open()

  write_base(folded, p)

  if File.exists?(p.archived) do
    File.rm!(p.archived)
  end

  if File.exists?(p.log) do
    File.rm!(p.log)
  end
end
# Folds the log into the base file and starts a fresh, empty log. Returns the
# state holding the new log handle with `unsynced` cleared.
# The statement order below IS the crash-safety protocol — do not reorder.
defp do_checkpoint(state) do
p = state.paths
# 1. Sync the active log before it is read back from disk.
:ok = :file.sync(state.io)
# 2. Rebuild the database from base + replayed log.
db = load_from_files(p)
# 3. Write it to base.new (write_base_new/2 fsyncs the file).
write_base_new(db, p)
# 4. Archive the log and open a fresh one. The handle is closed first so the
#    rename does not happen under an open file.
:ok = :file.close(state.io)
:ok = File.rename(p.log, p.archived)
io = open_log(p.log)
# 5. Commit point: once base.new has replaced base, the archive is redundant.
:ok = File.rename(p.base_new, p.base)
# 6. Cleanup; if this is interrupted, recover/1 drops the leftover archive.
if File.exists?(p.archived), do: File.rm!(p.archived)
%{state | io: io, unsynced: false}
end
# Rebuilds the database as base + replay(active log), rolling back a
# transaction the log left open. Only the active log is read; recover/1 has
# already removed any archive.
defp load_from_files(p) do
  p.base
  |> read_base()
  |> maybe_replay(p.log)
  |> rollback_if_open()
end
# Statements (BEGIN/COMMIT/SAVEPOINT included) are logged verbatim and replayed
# in order, so a log that stops before COMMIT leaves a transaction open after
# replay. Such a transaction is rolled back here, mirroring how SQLite discards
# an incomplete transaction on recovery. An empty txn_stack means there is
# nothing to undo.
defp rollback_if_open(db) do
  case db do
    %Database{txn_stack: []} ->
      db

    _ ->
      case Executor.run(db, "ROLLBACK") do
        {:ok, _results, rolled_back} -> rolled_back
        {:error, _error, after_error} -> after_error
      end
  end
end
# Loads the base file, or starts from an empty database when there is none.
# A base file that exists but cannot be read raises ExSQL.Error.
defp read_base(base) do
  case File.exists?(base) do
    false ->
      Database.new()

    true ->
      case FileFormat.read(base) do
        {:error, message} ->
          raise ExSQL.Error, message: "log: unreadable base file: #{message}"

        {:ok, db} ->
          db
      end
  end
end
# Replays every intact record of the log at `log_path` on top of `db`.
#
# A missing log means there is nothing to replay and `db` is returned as is.
# Any other read failure raises, like an unreadable base file does in
# read_base/1. Treating such a log as absent would silently skip committed
# transactions, and the next checkpoint would then archive and delete that log.
defp maybe_replay(db, log_path) do
  case File.read(log_path) do
    {:ok, bin} ->
      bin
      |> decode_frames([])
      |> Enum.reduce(db, &apply_record/2)

    {:error, :enoent} ->
      db

    {:error, reason} ->
      raise ExSQL.Error,
        message: "log: unreadable log file #{log_path}: #{inspect(reason)}"
  end
end
# Re-runs the statements of one logged transaction, in order, against `db`.
defp apply_record(records, db), do: Enum.reduce(records, db, &replay_statement/2)

# Runs one logged `{sql, params}` pair. Whether the executor reports :ok or
# :error, the database it hands back is carried forward.
defp replay_statement({sql, params}, db) do
  case Executor.run(db, sql, params) do
    {:ok, _results, next} -> next
    {:error, _error, next} -> next
  end
end
# Writes `db` to base.new (fsynced by write_base_new/2), then renames it over
# the base file.
defp write_base(db, p) do
  write_base_new(db, p)

  %{base_new: staged, base: target} = p
  :ok = File.rename(staged, target)
end
# Serializes `db` into base.new and fsyncs that file. Raises ExSQL.Error when
# the file cannot be written.
defp write_base_new(db, p) do
  staged = p.base_new

  case FileFormat.write(db, staged, journal_mode: :memory) do
    {:error, message} ->
      raise ExSQL.Error, message: "log: cannot write base: #{message}"

    {:ok, _path} ->
      fsync_file(staged)
  end
end
# Fsyncs an already-written file by reopening it, syncing, and closing it
# again. Every step is asserted, so a failure raises MatchError.
defp fsync_file(path) do
  open_modes = [:read, :write, :raw, :binary]
  {:ok, handle} = :file.open(path, open_modes)
  :ok = :file.sync(handle)
  :ok = :file.close(handle)
end
# -- framing ------------------------------------------------------------------
# Frames one transaction as <<size::32, crc32::32, payload>>, where payload is
# the external-term encoding of `records`.
defp encode(records) do
  payload = :erlang.term_to_binary(records)
  size = byte_size(payload)
  checksum = :erlang.crc32(payload)

  <<size::32, checksum::32, payload::binary>>
end
# Decodes consecutive frames into a list of records, oldest first. Decoding
# stops at the first frame that is truncated or whose crc32 does not match, so
# a torn tail is dropped whole.
defp decode_frames(<<size::32, crc::32, payload::binary-size(size), tail::binary>>, acc) do
  case :erlang.crc32(payload) do
    ^crc -> decode_frames(tail, [:erlang.binary_to_term(payload, [:safe]) | acc])
    _mismatch -> Enum.reverse(acc)
  end
end

defp decode_frames(_partial, acc) do
  Enum.reverse(acc)
end
end