# lib/ex_sql/log.ex

defmodule ExSQL.Log do
  @moduledoc """
  A logical redo log with an async writer, the BEAM-native answer to SQLite's
  WAL + checkpoint.

  Each committed transaction's effects are appended to a per-database log file;
  on open the base SQLite file is read and the log replayed on top; periodically
  the log is folded back into a fresh base file (a checkpoint). Write cost per
  commit is O(change), not O(total DB size) — unlike the whole-file rewrite of
  `:file` mode.

  One writer process per base path (registered in `ExSQL.LogRegistry`, started
  under `ExSQL.LogSupervisor`) owns the log file. A connection `cast`s redo
  records to it and returns immediately; the writer batches appends and fsyncs
  on a timer / on `flush/1` / checkpoint / shutdown. The writer traps exits so
  that `terminate/2` (final fsync + close) also runs when its supervisor shuts
  it down. With `pool_size: 1` there is a single ordered caster, so the log is
  in commit order.

  ## Record format

  One record per committed transaction: `term_to_binary([{sql, params}, …])`,
  framed as `<<size::32, crc32::32, payload::binary>>`. A torn or
  crc-mismatched tail stops replay, so an interrupted append is dropped whole.
  When the writer starts, such a tail is truncated off the log before anything
  new is appended; otherwise later records would sit behind the garbage and
  never be replayed.

  ## Crash-safe checkpoint

  Folding the log into the base must survive a crash at any step. Done in the
  serialized writer (no appends interleave):

    1. fsync the active log
    2. `db = read(base) + replay(log)`
    3. write `base.new` (temp); fsync
    4. rename `log` → `log.archived`; open a fresh empty `log`
    5. commit point: rename `base.new` → `base`
    6. delete `log.archived`

  Recovery decides from which files exist (see `recover/1`):

    * `base.new` present ⇒ the checkpoint didn't commit, so `base` is still
      the pre-checkpoint base. The rotation is undone (`log.archived` becomes
      the active log again) and only then is `base.new` discarded, so the
      "not committed" marker always outlives the archive.
    * `base.new` absent + `log.archived` present ⇒ it committed (base already
      folded — drop the archive).

  Never double-applies.

  ## Determinism (v1, statement log)

  Replay re-runs the logged SQL, so SQL-level volatile functions (`random()`,
  `datetime('now')`) replay differently. rowid/AUTOINCREMENT is deterministic
  under ordered replay. The Ecto path passes resolved values as params, so this
  does not affect it.
  """

  use GenServer

  alias ExSQL.{Database, Executor, FileFormat}

  @flush_ms 50

  # Frame header: 32-bit payload size followed by 32-bit crc32.
  @header_bytes 8

  # -- client -------------------------------------------------------------------

  @doc "Ensures the writer for `base_path` is running and returns the recovered database."
  @spec open(Path.t(), keyword()) :: {:ok, Database.t()} | {:error, term()}
  def open(base_path, opts \\ []) do
    with {:ok, _pid} <- ensure_started(base_path, opts) do
      {:ok, GenServer.call(via(base_path), :load_db, :infinity)}
    end
  end

  @doc "Appends one committed transaction's statements (a list of `{sql, params}`)."
  @spec append(Path.t(), [{String.t(), list()}]) :: :ok
  def append(_base_path, []), do: :ok
  def append(base_path, records), do: GenServer.cast(via(base_path), {:append, records})

  @doc "Forces pending appends to disk (fsync). Returns after the write is durable."
  @spec flush(Path.t()) :: :ok
  def flush(base_path), do: GenServer.call(via(base_path), :flush, :infinity)

  @doc "Folds the log into a fresh base file and truncates the log."
  @spec checkpoint(Path.t()) :: :ok
  def checkpoint(base_path), do: GenServer.call(via(base_path), :checkpoint, :infinity)

  @doc "Stops the writer for `base_path` (flushing first)."
  @spec stop(Path.t()) :: :ok
  def stop(base_path) do
    case Registry.lookup(ExSQL.LogRegistry, base_path) do
      [{pid, _}] -> GenServer.stop(pid)
      [] -> :ok
    end
  end

  @doc false
  @spec ensure_started(Path.t(), keyword()) :: DynamicSupervisor.on_start_child()
  def ensure_started(base_path, opts \\ []) do
    case DynamicSupervisor.start_child(ExSQL.LogSupervisor, {__MODULE__, {base_path, opts}}) do
      {:ok, pid} -> {:ok, pid}
      {:error, {:already_started, pid}} -> {:ok, pid}
      other -> other
    end
  end

  @doc false
  @spec start_link({Path.t(), keyword()}) :: GenServer.on_start()
  def start_link({base_path, opts}) do
    GenServer.start_link(__MODULE__, {base_path, opts}, name: via(base_path))
  end

  @spec child_spec({Path.t(), keyword()}) :: Supervisor.child_spec()
  def child_spec({base_path, opts}) do
    %{
      id: {__MODULE__, base_path},
      start: {__MODULE__, :start_link, [{base_path, opts}]},
      restart: :transient
    }
  end

  defp via(base_path), do: {:via, Registry, {ExSQL.LogRegistry, base_path}}

  # -- server -------------------------------------------------------------------

  @impl true
  def init({base_path, opts}) do
    # Without trapping exits, a supervisor shutdown kills the writer without
    # calling terminate/2, so the final fsync would be skipped.
    Process.flag(:trap_exit, true)

    paths = paths(base_path)
    File.mkdir_p!(Path.dirname(base_path))
    recover(paths)

    # The log is opened in append mode, so garbage left by an interrupted
    # append must be cut off first or every later frame would be unreachable.
    truncate_torn_tail(paths.log)

    io = open_log(paths.log)

    state = %{
      paths: paths,
      io: io,
      sync: Keyword.get(opts, :sync, false),
      flush_ms: Keyword.get(opts, :flush_ms, @flush_ms),
      unsynced: false
    }

    schedule_flush(state)
    {:ok, state}
  end

  @impl true
  def handle_call(:load_db, _from, state) do
    {:reply, load_from_files(state.paths), state}
  end

  def handle_call(:flush, _from, state), do: {:reply, :ok, do_sync(state)}

  def handle_call(:checkpoint, _from, state), do: {:reply, :ok, do_checkpoint(state)}

  @impl true
  def handle_cast({:append, records}, state) do
    :ok = :file.write(state.io, encode(records))
    state = %{state | unsynced: true}
    {:noreply, if(state.sync, do: do_sync(state), else: state)}
  end

  @impl true
  def handle_info(:flush_tick, state) do
    state = if state.unsynced, do: do_sync(state), else: state
    schedule_flush(state)
    {:noreply, state}
  end

  # Exit signals from linked processes arrive as messages now that exits are
  # trapped. Mirror the non-trapping behaviour: ignore normal exits, die with
  # the same reason otherwise (terminate/2 still runs).
  def handle_info({:EXIT, _pid, :normal}, state), do: {:noreply, state}
  def handle_info({:EXIT, _pid, reason}, state), do: {:stop, reason, state}

  @impl true
  def terminate(_reason, state) do
    # Best effort: the handle may already be closed if a checkpoint failed
    # midway, and raising here would mask the original exit reason.
    if state.unsynced, do: :file.sync(state.io)
    _ = :file.close(state.io)
    :ok
  end

  # -- file handling ------------------------------------------------------------

  defp paths(base_path) do
    %{
      base: base_path,
      base_new: base_path <> ".new",
      log: base_path <> ".log",
      archived: base_path <> ".log.archived"
    }
  end

  defp open_log(path), do: File.open!(path, [:append, :binary, :raw])

  defp schedule_flush(%{flush_ms: ms}), do: Process.send_after(self(), :flush_tick, ms)

  defp do_sync(%{unsynced: false} = state), do: state

  defp do_sync(state) do
    :ok = :file.sync(state.io)
    %{state | unsynced: false}
  end

  # Reduce any post-crash file state back to the canonical {base, log} layout.
  defp recover(p) do
    cond do
      File.exists?(p.base_new) ->
        # Checkpoint did not reach its commit point: `base` is still the
        # pre-checkpoint base and nothing has been folded into it.
        undo_checkpoint(p)

      File.exists?(p.archived) ->
        # Checkpoint committed (base already includes the archive) but cleanup
        # was interrupted — just drop the archive.
        File.rm!(p.archived)

      true ->
        :ok
    end
  end

  # Rolls an uncommitted checkpoint back to {base, log} without folding.
  #
  # `base.new` doubles as the "not committed" marker, so in every branch it is
  # the archive that disappears first (or at the same instant): an archive seen
  # without `base.new` is treated as already folded and deleted by recover/1.
  defp undo_checkpoint(p) do
    cond do
      not File.exists?(p.archived) ->
        # Crashed before the log was rotated: base + log are already canonical.
        File.rm!(p.base_new)

      empty_or_missing?(p.log) ->
        # Crashed between rotation and commit: the fresh log holds nothing, so
        # the archive simply becomes the active log again (atomic rename).
        :ok = File.rename(p.archived, p.log)
        File.rm!(p.base_new)

      true ->
        # Not produced by do_checkpoint/1 (no appends interleave with it), but
        # handled so no record is dropped: stage archive ++ log in `base.new`,
        # then rename it over the log. That single rename both installs the
        # merged log and clears the marker, so a crash on either side of it
        # recovers to the same result and nothing is applied twice. The
        # archive's torn tail (if any) is cut so it cannot hide the log's frames.
        merged = [valid_prefix(File.read!(p.archived)), File.read!(p.log)]
        File.write!(p.base_new, merged)
        fsync_file(p.base_new)
        :ok = File.rename(p.base_new, p.log)
        File.rm!(p.archived)
    end
  end

  defp empty_or_missing?(path) do
    case File.stat(path) do
      {:ok, %File.Stat{size: 0}} -> true
      {:error, :enoent} -> true
      _ -> false
    end
  end

  # Cuts a torn or crc-mismatched tail off the log at `path`. Replay already
  # ignores everything from the first bad frame on, so this discards nothing
  # that could have been replayed.
  defp truncate_torn_tail(path) do
    case File.read(path) do
      {:ok, bin} ->
        valid = valid_prefix_size(bin, 0)
        if valid < byte_size(bin), do: truncate_file(path, valid)
        :ok

      {:error, :enoent} ->
        :ok

      {:error, reason} ->
        raise ExSQL.Error, message: "log: unreadable log file #{path}: #{inspect(reason)}"
    end
  end

  defp truncate_file(path, size) do
    {:ok, fd} = :file.open(path, [:read, :write, :raw, :binary])

    try do
      {:ok, ^size} = :file.position(fd, size)
      :ok = :file.truncate(fd)
      :ok = :file.sync(fd)
    after
      :file.close(fd)
    end
  end

  defp do_checkpoint(state) do
    p = state.paths
    :ok = :file.sync(state.io)
    db = load_from_files(p)
    write_base_new(db, p)

    # Rotate the log before the commit point so that, once `base` is replaced,
    # the only stale file left is the archive.
    :ok = :file.close(state.io)
    :ok = File.rename(p.log, p.archived)
    io = open_log(p.log)

    # Commit point.
    :ok = File.rename(p.base_new, p.base)
    if File.exists?(p.archived), do: File.rm!(p.archived)

    %{state | io: io, unsynced: false}
  end

  # base + replay(active log). After recover/1 there is never an archive here.
  defp load_from_files(p), do: read_base(p.base) |> maybe_replay(p.log) |> rollback_if_open()

  # Each statement (including BEGIN/COMMIT/SAVEPOINT) is logged verbatim and
  # replayed in order, so transaction semantics reproduce themselves. A log that
  # ends mid-transaction (a crash before COMMIT) leaves the engine with an open
  # transaction; discard it, mirroring SQLite rolling back an incomplete txn on
  # recovery.
  defp rollback_if_open(%Database{txn_stack: []} = db), do: db

  defp rollback_if_open(db) do
    case Executor.run(db, "ROLLBACK") do
      {:ok, _results, db} -> db
      {:error, _error, db} -> db
    end
  end

  defp read_base(base) do
    if File.exists?(base) do
      case FileFormat.read(base) do
        {:ok, db} -> db
        {:error, message} -> raise ExSQL.Error, message: "log: unreadable base file: #{message}"
      end
    else
      Database.new()
    end
  end

  # Replays the log at `log_path` on top of `db`. Only a missing file means
  # "no log"; any other read error raises rather than silently dropping every
  # logged transaction.
  defp maybe_replay(db, log_path) do
    case File.read(log_path) do
      {:ok, bin} ->
        bin |> decode_frames([]) |> Enum.reduce(db, &apply_record/2)

      {:error, :enoent} ->
        db

      {:error, reason} ->
        raise ExSQL.Error, message: "log: unreadable log file #{log_path}: #{inspect(reason)}"
    end
  end

  # A statement that errors on replay is skipped and replay continues with the
  # database the executor hands back.
  defp apply_record(records, db) do
    Enum.reduce(records, db, fn {sql, params}, db ->
      case Executor.run(db, sql, params) do
        {:ok, _results, db} -> db
        {:error, _error, db} -> db
      end
    end)
  end

  # base.new := db, then fsync. The caller performs the commit rename.
  defp write_base_new(db, p) do
    case FileFormat.write(db, p.base_new, journal_mode: :memory) do
      {:ok, _path} -> fsync_file(p.base_new)
      {:error, message} -> raise ExSQL.Error, message: "log: cannot write base: #{message}"
    end
  end

  defp fsync_file(path) do
    {:ok, fd} = :file.open(path, [:read, :write, :raw, :binary])

    try do
      :ok = :file.sync(fd)
    after
      :file.close(fd)
    end
  end

  # -- framing ------------------------------------------------------------------

  defp encode(records) do
    payload = :erlang.term_to_binary(records)
    <<byte_size(payload)::32, :erlang.crc32(payload)::32, payload::binary>>
  end

  defp decode_frames(<<size::32, crc::32, rest::binary>>, acc) when byte_size(rest) >= size do
    <<payload::binary-size(^size), tail::binary>> = rest

    if :erlang.crc32(payload) == crc do
      decode_frames(tail, [:erlang.binary_to_term(payload, [:safe]) | acc])
    else
      Enum.reverse(acc)
    end
  end

  defp decode_frames(_partial, acc), do: Enum.reverse(acc)

  # The leading part of `bin` made of complete, crc-valid frames.
  defp valid_prefix(bin), do: binary_part(bin, 0, valid_prefix_size(bin, 0))

  # Byte length of the leading run of complete, crc-valid frames. Stops at the
  # same frame decode_frames/2 stops at, but does not decode the payloads.
  defp valid_prefix_size(<<size::32, crc::32, rest::binary>>, offset)
       when byte_size(rest) >= size do
    <<payload::binary-size(^size), tail::binary>> = rest

    if :erlang.crc32(payload) == crc do
      valid_prefix_size(tail, offset + @header_bytes + size)
    else
      offset
    end
  end

  defp valid_prefix_size(_partial, offset), do: offset
end