Skip to main content

lib/ex_sql/connection.ex

defmodule ExSQL.Connection do
  @moduledoc """
  A stateful database handle, holding an `ExSQL.Database` value in a GenServer.

  This is the process-shaped counterpart to a `sqlite3*` connection pointer:
  statements from any process are serialized through the server, which is the
  BEAM-native answer to SQLite's connection mutex. The functional core stays
  pure — this module only threads the database value through `ExSQL.Executor`.
  """

  use GenServer

  alias ExSQL.{Database, Executor}

  # -- client ------------------------------------------------------------------

  @doc "Starts a connection holding an empty database."
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @doc """
  Executes all statements in `sql`, returning `{:ok, results}` with one
  `ExSQL.Result` per statement. On error, statements before the failure have
  taken effect (as with `sqlite3_exec`).

  Bind parameters (`?`, `?NNN`, `:name`, `@name`, `$name`) are bound from
  `params`: a list for positional parameters (1-based) or a map for named
  ones (keys may include or omit the sigil; integer keys bind by index).
  """
  @spec execute(GenServer.server(), String.t(), [ExSQL.Value.t()] | map()) ::
          {:ok, [ExSQL.Result.t()]} | {:error, ExSQL.Error.t()}
  def execute(conn, sql, params \\ []) do
    GenServer.call(conn, {:execute, sql, params})
  end

  @doc "Registers or replaces a connection-local scalar function."
  @spec create_function(GenServer.server(), String.t(), non_neg_integer(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_function(conn, name, arity, callback) do
    GenServer.call(conn, {:create_function, name, arity, callback})
  end

  @doc "Registers or replaces a connection-local aggregate function."
  @spec create_aggregate(GenServer.server(), String.t(), non_neg_integer(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_aggregate(conn, name, arity, callback) do
    GenServer.call(conn, {:create_aggregate, name, arity, callback})
  end

  @doc "Registers or replaces a connection-local aggregate window function."
  @spec create_window_function(GenServer.server(), String.t(), non_neg_integer(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_window_function(conn, name, arity, callback) do
    GenServer.call(conn, {:create_window_function, name, arity, callback})
  end

  @doc "Registers or replaces a connection-local incremental aggregate window function."
  @spec create_incremental_window_function(
          GenServer.server(),
          String.t(),
          non_neg_integer(),
          map()
        ) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_incremental_window_function(conn, name, arity, callbacks) do
    GenServer.call(conn, {:create_incremental_window_function, name, arity, callbacks})
  end

  @doc "Registers or replaces a connection-local collation."
  @spec create_collation(GenServer.server(), String.t(), function()) ::
          :ok | {:error, ExSQL.Error.t()}
  def create_collation(conn, name, callback) do
    GenServer.call(conn, {:create_collation, name, callback})
  end

  @doc "Returns a snapshot of the current database value."
  @spec database(GenServer.server()) :: Database.t()
  def database(conn), do: GenServer.call(conn, :database)

  @doc """
  Captures a consistent point-in-time snapshot of the database.

  The result is an immutable `ExSQL.Database` value. Reads run against it with
  `read/3` see that exact point in time and are unaffected by writes committed
  afterwards — the BEAM-native equivalent of a SQLite WAL read transaction /
  `sqlite3_snapshot`.
  """
  @spec snapshot(GenServer.server()) :: Database.t()
  def snapshot(conn), do: GenServer.call(conn, :database)

  @doc """
  Runs a read-only query against a snapshot **in the calling process**, so it
  executes concurrently with — and without blocking — the connection's other
  work. Accepts either a snapshot from `snapshot/1` or a connection (which is
  snapshotted first).

  Because the snapshot is immutable, the read is isolated and consistent. Any
  statement that would modify the database has no persistent effect (the
  connection's state is untouched); `read/3` is for `SELECT`-style queries.
  """
  @spec read(GenServer.server() | Database.t(), String.t(), [ExSQL.Value.t()] | map()) ::
          {:ok, [ExSQL.Result.t()]} | {:error, ExSQL.Error.t()}
  def read(conn_or_snapshot, sql, params \\ [])

  def read(%Database{} = snapshot, sql, params) do
    case Executor.run(snapshot, sql, params) do
      {:ok, results, _db} -> {:ok, results}
      {:error, error, _db} -> {:error, error}
    end
  end

  def read(conn, sql, params), do: read(snapshot(conn), sql, params)

  # -- server -------------------------------------------------------------------

  @impl true
  def init(:ok), do: {:ok, Database.new()}

  @impl true
  def handle_call({:execute, sql, params}, _from, db) do
    case Executor.run(db, sql, params) do
      {:ok, results, db} -> {:reply, {:ok, results}, db}
      {:error, error, db} -> {:reply, {:error, error}, db}
    end
  end

  def handle_call({:create_function, name, arity, callback}, _from, db) do
    case Database.create_scalar_function(db, name, arity, callback) do
      {:ok, db} -> {:reply, :ok, db}
      {:error, message} -> {:reply, {:error, %ExSQL.Error{message: message}}, db}
    end
  end

  def handle_call({:create_aggregate, name, arity, callback}, _from, db) do
    case Database.create_aggregate_function(db, name, arity, callback) do
      {:ok, db} -> {:reply, :ok, db}
      {:error, message} -> {:reply, {:error, %ExSQL.Error{message: message}}, db}
    end
  end

  def handle_call({:create_window_function, name, arity, callback}, _from, db) do
    case Database.create_window_function(db, name, arity, callback) do
      {:ok, db} -> {:reply, :ok, db}
      {:error, message} -> {:reply, {:error, %ExSQL.Error{message: message}}, db}
    end
  end

  def handle_call({:create_incremental_window_function, name, arity, callbacks}, _from, db) do
    case Database.create_incremental_window_function(db, name, arity, callbacks) do
      {:ok, db} -> {:reply, :ok, db}
      {:error, message} -> {:reply, {:error, %ExSQL.Error{message: message}}, db}
    end
  end

  def handle_call({:create_collation, name, callback}, _from, db) do
    case Database.create_collation(db, name, callback) do
      {:ok, db} -> {:reply, :ok, db}
      {:error, message} -> {:reply, {:error, %ExSQL.Error{message: message}}, db}
    end
  end

  def handle_call(:database, _from, db), do: {:reply, db, db}
end