defmodule ExSQL.Connection do
  @moduledoc """
  A stateful database handle, holding an `ExSQL.Database` value in a GenServer.

  This is the process-shaped counterpart to a `sqlite3*` connection pointer:
  statements from any process are serialized through the server, which is the
  BEAM-native answer to SQLite's connection mutex. The functional core stays
  pure — this module only threads the database value through `ExSQL.Executor`.
  """

  use GenServer

  alias ExSQL.{Database, Executor}

  # Matches the default timeout of `GenServer.call/3`, so `execute/3` callers
  # see the same behaviour as before the timeout became configurable.
  @default_timeout 5_000

  # -- client ------------------------------------------------------------------

  @doc """
  Starts a connection holding an empty database.

  `opts` are handed unchanged to `GenServer.start_link/3`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Executes all statements in `sql`, returning `{:ok, results}` with one
  `ExSQL.Result` per statement. On error, statements before the failure have
  taken effect (as with `sqlite3_exec`).

  Bind parameters (`?`, `?NNN`, `:name`, `@name`, `$name`) are bound from
  `params`: a list for positional parameters (1-based) or a map for named
  ones (keys may include or omit the sigil; integer keys bind by index).

  `timeout` is the `GenServer.call/3` timeout in milliseconds (or `:infinity`)
  and defaults to `#{@default_timeout}`. Raise it for long-running statements:
  when it elapses the caller exits, but the server keeps working on the
  statement, so its effects may still be applied.
  """
  @spec execute(GenServer.server(), String.t(), [ExSQL.Value.t()] | map(), timeout()) ::
          {:ok, [ExSQL.Result.t()]} | {:error, ExSQL.Error.t()}
  def execute(conn, sql, params \\ [], timeout \\ @default_timeout) do
    GenServer.call(conn, {:execute, sql, params}, timeout)
  end

  @doc "Registers or replaces a connection-local scalar function."
  @spec create_function(GenServer.server(), String.t(), non_neg_integer(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_function(conn, name, arity, callback) do
    GenServer.call(conn, {:create_function, name, arity, callback})
  end

  @doc "Registers or replaces a connection-local aggregate function."
  @spec create_aggregate(GenServer.server(), String.t(), non_neg_integer(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_aggregate(conn, name, arity, callback) do
    GenServer.call(conn, {:create_aggregate, name, arity, callback})
  end

  @doc "Registers or replaces a connection-local aggregate window function."
  @spec create_window_function(GenServer.server(), String.t(), non_neg_integer(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_window_function(conn, name, arity, callback) do
    GenServer.call(conn, {:create_window_function, name, arity, callback})
  end

  @doc "Registers or replaces a connection-local incremental aggregate window function."
  @spec create_incremental_window_function(
          GenServer.server(),
          String.t(),
          non_neg_integer(),
          map()
        ) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_incremental_window_function(conn, name, arity, callbacks) do
    GenServer.call(conn, {:create_incremental_window_function, name, arity, callbacks})
  end

  @doc "Registers or replaces a connection-local collation."
  @spec create_collation(GenServer.server(), String.t(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_collation(conn, name, callback) do
    GenServer.call(conn, {:create_collation, name, callback})
  end

  @doc "Returns a snapshot of the current database value."
  @spec database(GenServer.server()) :: Database.t()
  def database(conn), do: GenServer.call(conn, :database)

  @doc """
  Captures a consistent point-in-time snapshot of the database.

  The result is an immutable `ExSQL.Database` value. Reads run against it with
  `read/3` see that exact point in time and are unaffected by writes committed
  afterwards — the BEAM-native equivalent of a SQLite WAL read transaction /
  `sqlite3_snapshot`.

  This returns the same value as `database/1`; the separate name documents the
  intent at call sites.
  """
  @spec snapshot(GenServer.server()) :: Database.t()
  def snapshot(conn), do: database(conn)

  @doc """
  Runs a read-only query against a snapshot **in the calling process**, so it
  executes concurrently with — and without blocking — the connection's other
  work. Accepts either a snapshot from `snapshot/1` or a connection (which is
  snapshotted first).

  Because the snapshot is immutable, the read is isolated and consistent. Any
  statement that would modify the database has no persistent effect (the
  connection's state is untouched); `read/3` is for `SELECT`-style queries.
  """
  @spec read(GenServer.server() | Database.t(), String.t(), [ExSQL.Value.t()] | map()) ::
          {:ok, [ExSQL.Result.t()]} | {:error, ExSQL.Error.t()}
  def read(conn_or_snapshot, sql, params \\ [])

  def read(%Database{} = snapshot, sql, params) do
    # The database value returned by the executor is dropped on purpose: only
    # the results (or the error) leave this function.
    case Executor.run(snapshot, sql, params) do
      {:ok, results, _discarded} -> {:ok, results}
      {:error, error, _discarded} -> {:error, error}
    end
  end

  def read(conn, sql, params) do
    conn
    |> snapshot()
    |> read(sql, params)
  end

  # -- server -------------------------------------------------------------------

  @impl true
  def init(:ok), do: {:ok, Database.new()}

  @impl true
  def handle_call({:execute, sql, params}, _from, db) do
    # Both outcomes carry a database value; whichever one the executor hands
    # back becomes the new server state, including on error.
    case Executor.run(db, sql, params) do
      {:ok, results, new_db} -> {:reply, {:ok, results}, new_db}
      {:error, error, new_db} -> {:reply, {:error, error}, new_db}
    end
  end

  def handle_call({:create_function, name, arity, callback}, _from, db) do
    db
    |> Database.create_scalar_function(name, arity, callback)
    |> reply_registration(db)
  end

  def handle_call({:create_aggregate, name, arity, callback}, _from, db) do
    db
    |> Database.create_aggregate_function(name, arity, callback)
    |> reply_registration(db)
  end

  def handle_call({:create_window_function, name, arity, callback}, _from, db) do
    db
    |> Database.create_window_function(name, arity, callback)
    |> reply_registration(db)
  end

  def handle_call({:create_incremental_window_function, name, arity, callbacks}, _from, db) do
    db
    |> Database.create_incremental_window_function(name, arity, callbacks)
    |> reply_registration(db)
  end

  def handle_call({:create_collation, name, callback}, _from, db) do
    db
    |> Database.create_collation(name, callback)
    |> reply_registration(db)
  end

  def handle_call(:database, _from, db), do: {:reply, db, db}

  # Converts the `{:ok, db} | {:error, message}` result of a `Database.create_*`
  # call into a GenServer reply. On success the updated database becomes the
  # state; on failure the previous database (second argument) is kept and the
  # message is wrapped in an `ExSQL.Error`.
  defp reply_registration({:ok, new_db}, _previous_db), do: {:reply, :ok, new_db}

  defp reply_registration({:error, message}, previous_db) do
    {:reply, {:error, %ExSQL.Error{message: message}}, previous_db}
  end
end