defmodule Selecto.ConnectionPool do
@moduledoc """
Connection pooling and management for Selecto.
Provides a high-performance connection pool using DBConnection for efficient
database connection reuse, prepared statement caching, and connection health monitoring.
## Features
- Connection pooling with configurable pool sizes
- Prepared statement caching for repeated queries
- Connection health monitoring and automatic recovery
- Adapter-owned pool startup and pooled execution
- Graceful fallback to direct connections when pooling is disabled
## Configuration
# Application config
config :selecto, Selecto.ConnectionPool,
pool_size: 10,
max_overflow: 20,
prepared_statement_cache_size: 1000,
connection_timeout: 5000,
checkout_timeout: 5000
## Usage
# Start a connection pool
{:ok, pool} = Selecto.ConnectionPool.start_pool(connection_input)
# Configure Selecto with pooled connection
selecto = Selecto.configure(domain, {:pool, pool})
# Or use default pool management
selecto = Selecto.configure(domain, connection_input, pool: true)
"""
use GenServer
require Logger
@default_pool_config [
pool_size: 10,
max_overflow: 20,
prepared_statement_cache_size: 1000,
connection_timeout: 5000,
checkout_timeout: 5000
]
@type pool_ref :: pid() | atom()
@type connection_config :: Keyword.t() | map()
@type pool_options :: Keyword.t()
@runtime Selecto.ConnectionPool.Runtime
@registry Selecto.ConnectionPool.Registry
@manager_supervisor Selecto.ConnectionPool.ManagerSupervisor
@prepared_statements_table :selecto_prepared_statements
@checkout_refs_table :selecto_checkout_refs
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) when is_list(opts) do
pool_name = Keyword.fetch!(opts, :pool_name)
GenServer.start_link(__MODULE__, opts, name: via_manager(pool_name))
end
def child_spec(opts) do
pool_name = Keyword.fetch!(opts, :pool_name)
%{
id: {__MODULE__, pool_name},
start: {__MODULE__, :start_link, [opts]},
restart: :permanent,
shutdown: 5000,
type: :worker
}
end
@doc """
Start a connection pool with the given configuration.
## Parameters
- `connection_config` - Database connection configuration
- `pool_options` - Pool-specific options (optional)
## Returns
- `{:ok, pool_ref}` - Pool started successfully
- `{:error, reason}` - Pool startup failed
## Examples
# Start pool with default adapter config
config = [
hostname: "localhost",
username: "user",
password: "pass",
database: "mydb"
]
{:ok, pool} = Selecto.ConnectionPool.start_pool(config)
# Start pool with custom options and adapter
{:ok, pool} = Selecto.ConnectionPool.start_pool(config,
pool_size: 20,
adapter: SelectoDBMySQL.Adapter
)
"""
@spec start_pool(connection_config(), pool_options()) :: {:ok, pool_ref()} | {:error, term()}
def start_pool(connection_config, pool_options \\ []) do
pool_config = Keyword.merge(@default_pool_config, pool_options)
adapter = Keyword.get(pool_options, :adapter, Selecto.AdapterSupport.default_adapter())
# Create unique pool name based on connection config and adapter
pool_name = generate_pool_name(%{adapter: adapter, connection_config: connection_config})
with :ok <- ensure_runtime_started() do
cond do
Selecto.AdapterSupport.callback_available?(adapter, :start_pool, 3) ->
adapter.start_pool(connection_config, pool_config, pool_name)
Selecto.AdapterSupport.postgresql_adapter?(adapter) ->
{:error, {:adapter_pool_start_unavailable, adapter}}
true ->
start_generic_pool(adapter, connection_config, pool_config, pool_name)
end
end
end
defp start_generic_pool(adapter, connection_config, pool_config, pool_name) do
case get_manager_pid_by_name(pool_name) do
{:ok, manager_pid} ->
build_pool_ref_from_manager(manager_pid)
:error ->
cond do
not is_atom(adapter) ->
{:error, {:invalid_adapter, adapter}}
not Selecto.AdapterSupport.callback_available?(adapter, :connect, 1) ->
{:error, {:unsupported_adapter, adapter}}
true ->
case adapter.connect(connection_config) do
{:ok, connection} ->
manager_opts = [
adapter: adapter,
connection: connection,
pool_name: pool_name,
pool_config: pool_config,
connection_config: connection_config
]
case start_manager(manager_opts) do
{:ok, manager_pid, :started} ->
build_pool_ref_from_manager(manager_pid)
{:ok, manager_pid, :existing} ->
maybe_stop_connection(connection)
build_pool_ref_from_manager(manager_pid)
{:error, reason} ->
maybe_stop_connection(connection)
{:error, reason}
end
{:error, reason} ->
{:error, reason}
end
end
end
end
@doc """
Stop a connection pool.
Gracefully shuts down the pool and all its connections.
"""
@spec stop_pool(pool_ref()) :: :ok
def stop_pool(%{pool: pool_pid, manager: manager_pid}) do
GenServer.stop(manager_pid)
GenServer.stop(pool_pid)
end
def stop_pool(%{manager: manager_pid, connection: connection}) do
GenServer.stop(manager_pid)
maybe_stop_connection(connection)
:ok
end
def stop_pool(pool_pid) when is_pid(pool_pid) do
GenServer.stop(pool_pid)
end
@doc """
Execute a query using a pooled connection.
Automatically handles connection checkout/checkin and prepared statement caching.
"""
@spec execute(pool_ref(), String.t(), list(), Keyword.t()) ::
{:ok, term()} | {:error, term()}
def execute(pool_ref, query, params, opts \\ [])
def execute({:pool, pool_ref}, query, params, opts) do
execute(pool_ref, query, params, opts)
end
def execute(%{adapter: adapter, connection: connection}, query, params, opts) do
if Selecto.AdapterSupport.postgresql_adapter?(adapter) do
execute(%{adapter: adapter}, query, params, opts)
else
if Selecto.AdapterSupport.callback_available?(adapter, :execute, 4) do
adapter.execute(connection, query, params, opts)
else
{:error, {:unsupported_adapter, adapter}}
end
end
end
def execute(pool_ref, query, params, opts) do
adapter = pool_adapter(pool_ref)
cond do
Selecto.AdapterSupport.callback_available?(adapter, :execute_pool, 4) ->
adapter.execute_pool(pool_ref, query, params, opts)
true ->
{:error, {:unsupported_adapter, adapter}}
end
end
@doc """
Get pool statistics for monitoring.
Returns information about pool health, connection counts, and cache statistics.
"""
@spec pool_stats(pool_ref()) :: map()
def pool_stats({:pool, pool_ref}) do
pool_stats(pool_ref)
end
def pool_stats(pool_ref) do
case get_manager_pid(pool_ref) do
{:ok, manager_pid} ->
GenServer.call(manager_pid, :get_stats)
{:error, _reason} ->
%{error: "Pool manager not available"}
end
end
@doc """
Clear prepared statement cache for a pool.
"""
@spec clear_cache(pool_ref()) :: :ok
def clear_cache({:pool, pool_ref}) do
clear_cache(pool_ref)
end
def clear_cache(pool_ref) do
maybe_clear_prepared_flags(pool_ref)
case get_manager_pid(pool_ref) do
{:ok, manager_pid} ->
GenServer.cast(manager_pid, :clear_cache)
{:error, _reason} ->
:ok
end
end
# GenServer Implementation
@impl GenServer
def init(opts) do
pool_pid = Keyword.get(opts, :pool_pid)
connection = Keyword.get(opts, :connection)
adapter = Keyword.get(opts, :adapter, Selecto.AdapterSupport.default_adapter())
pool_name = Keyword.fetch!(opts, :pool_name)
pool_config = Keyword.fetch!(opts, :pool_config)
connection_config = Keyword.fetch!(opts, :connection_config)
state = %{
adapter: adapter,
pool_pid: pool_pid,
connection: connection,
pool_name: pool_name,
pool_config: pool_config,
connection_config: connection_config,
stats: %{
queries_executed: 0,
cache_hits: 0,
cache_misses: 0,
connections_created: 0,
errors: 0
}
}
# Setup periodic health checks
schedule_health_check()
{:ok, state}
end
@impl GenServer
def handle_call(:pool_reference, _from, state) do
reference =
if Selecto.AdapterSupport.postgresql_adapter?(state.adapter) do
%{
adapter: state.adapter,
pool: state.pool_pid,
manager: self(),
name: state.pool_name
}
else
%{
adapter: state.adapter,
connection: state.connection,
manager: self(),
name: state.pool_name
}
end
{:reply, reference, state}
end
@impl GenServer
def handle_call(:get_stats, _from, state) do
pool_info =
if is_pid(state.pool_pid) do
try do
DBConnection.status(state.pool_pid)
rescue
_ -> %{available: 0, size: 0}
end
else
%{available: nil, size: nil, mode: :adapter_managed}
end
stats =
Map.merge(state.stats, %{
pool_info: pool_info,
cache_size: prepared_cache_size(state.pool_pid),
uptime: System.system_time(:second)
})
{:reply, stats, state}
end
@impl GenServer
def handle_cast(:clear_cache, state) do
clear_prepared_flags(state.pool_pid)
{:noreply, state}
end
@impl GenServer
def handle_info(:health_check, state) do
# Perform health check
case validate_pool_health(state) do
:ok ->
schedule_health_check()
{:noreply, state}
{:error, reason} ->
Logger.warning("Pool health check failed: #{inspect(reason)}")
schedule_health_check()
{:noreply, update_in(state.stats.errors, &(&1 + 1))}
end
end
@impl GenServer
def terminate(_reason, _state), do: :ok
# Private Functions
@doc """
Checkout a connection from the pool for manual management.
This is useful for transactions or when you need to execute multiple
queries on the same connection.
## Parameters
- `pool_ref` - The pool reference
- `opts` - Options including :timeout
## Returns
- `{:ok, connection_ref}` - Connection checked out successfully
- `{:error, reason}` - Checkout failed
## Examples
{:ok, conn} = Selecto.ConnectionPool.checkout(pool)
try do
run_checkout_queries(conn)
after
Selecto.ConnectionPool.checkin(pool, conn)
end
"""
@spec checkout(pool_ref(), Keyword.t()) :: {:ok, DBConnection.conn()} | {:error, term()}
def checkout(pool_ref, opts \\ []) do
_timeout = Keyword.get(opts, :timeout, 15_000)
case get_pool_pid(pool_ref) do
{:ok, pool_pid} ->
# DBConnection pools handle checkout internally when you call query functions
# For manual checkout semantics, we return a reference to the pool
# The actual connection checkout happens when you execute a query
ref = make_ref()
put_checkout_ref(ref, pool_pid, self())
{:ok, {:selecto_conn, ref, pool_pid}}
{:error, reason} ->
{:error, reason}
end
end
@doc """
Return a checked-out connection to the pool.
Always call this after checkout/2 to return the connection.
"""
@spec checkin(pool_ref(), term()) :: :ok
def checkin(_pool_ref, {:selecto_conn, ref, _pool_pid}) do
delete_checkout_ref(ref)
:ok
end
def checkin(_pool_ref, _conn), do: :ok
@doc false
def checkout_lookup(ref) do
case lookup_checkout_ref(ref) do
{pool_pid, _owner_pid} -> {:ok, pool_pid}
:error -> :error
end
end
@doc """
Execute a function with a checked-out connection.
This is the recommended way to use connections for multiple operations.
The connection is automatically returned to the pool when the function completes.
## Examples
Selecto.ConnectionPool.with_connection(pool, fn conn ->
run_queries_with_adapter_connection(conn)
end)
"""
@spec with_connection(pool_ref(), (DBConnection.conn() -> result)) ::
{:ok, result} | {:error, term()}
when result: term()
def with_connection(pool_ref, fun) when is_function(fun, 1) do
adapter = pool_adapter(pool_ref)
cond do
Selecto.AdapterSupport.callback_available?(adapter, :with_connection, 2) ->
adapter.with_connection(pool_ref, fun)
true ->
{:error, {:unsupported_adapter, adapter}}
end
end
@doc """
Execute a transaction on a pooled connection.
All queries in the function are executed within a database transaction.
## Examples
Selecto.ConnectionPool.transaction(pool, fn conn ->
run_transaction_steps(conn)
end)
"""
@spec transaction(pool_ref(), (DBConnection.conn() -> result), Keyword.t()) ::
{:ok, result} | {:error, term()}
when result: term()
def transaction(pool_ref, fun, opts \\ []) when is_function(fun, 1) do
adapter = pool_adapter(pool_ref)
cond do
Selecto.AdapterSupport.callback_available?(adapter, :transaction, 3) ->
adapter.transaction(pool_ref, fun, opts)
true ->
{:error, {:unsupported_adapter, adapter}}
end
end
defp pool_adapter(%{adapter: adapter}) when is_atom(adapter), do: adapter
defp pool_adapter(_pool_ref), do: Selecto.AdapterSupport.default_adapter()
defp validate_pool_health(%{pool_pid: pool_pid}) when is_pid(pool_pid) do
try do
if Process.alive?(pool_pid) do
:ok
else
{:error, "Pool process not alive"}
end
rescue
error -> {:error, error}
end
end
defp validate_pool_health(%{adapter: adapter, connection: connection}) do
cond do
is_nil(connection) ->
{:error, "Adapter connection not available"}
Selecto.AdapterSupport.callback_available?(adapter, :supports?, 1) ->
:ok
true ->
{:error, "Adapter does not implement health capabilities"}
end
end
defp schedule_health_check() do
Process.send_after(self(), :health_check, 30_000)
end
# Make these public for testing
def get_pool_pid(%{pool: pool_pid}), do: {:ok, pool_pid}
def get_pool_pid(pool_pid) when is_pid(pool_pid), do: {:ok, pool_pid}
def get_pool_pid(_), do: {:error, "Invalid pool reference"}
def get_manager_pid(%{manager: manager_pid}), do: {:ok, manager_pid}
def get_manager_pid(%{name: pool_name}) when is_atom(pool_name) do
case get_manager_pid_by_name(pool_name) do
{:ok, manager_pid} -> {:ok, manager_pid}
:error -> {:error, "Invalid pool reference"}
end
end
def get_manager_pid(pool_name) when is_atom(pool_name) do
case get_manager_pid_by_name(pool_name) do
{:ok, manager_pid} -> {:ok, manager_pid}
:error -> {:error, "Invalid pool reference"}
end
end
def get_manager_pid(_), do: {:error, "Invalid pool reference"}
defp ensure_runtime_started do
case @runtime.ensure_started() do
{:ok, _pid} -> :ok
{:error, reason} -> {:error, reason}
end
end
defp via_manager(pool_name) when is_atom(pool_name) do
{:via, Registry, {@registry, {:manager, pool_name}}}
end
@doc false
def get_manager_pid_by_name(pool_name) when is_atom(pool_name) do
case Process.whereis(@registry) do
nil ->
:error
_pid ->
case Registry.lookup(@registry, {:manager, pool_name}) do
[{pid, _value}] -> {:ok, pid}
[] -> :error
end
end
end
@doc false
def start_manager(manager_opts) when is_list(manager_opts) do
pool_name = Keyword.fetch!(manager_opts, :pool_name)
case DynamicSupervisor.start_child(@manager_supervisor, {__MODULE__, manager_opts}) do
{:ok, manager_pid} ->
{:ok, manager_pid, :started}
{:error, {:already_started, manager_pid}} ->
{:ok, manager_pid, :existing}
{:error, {:already_present, _child_id}} ->
case get_manager_pid_by_name(pool_name) do
{:ok, manager_pid} -> {:ok, manager_pid, :existing}
:error -> {:error, :manager_start_conflict}
end
{:error, reason} ->
{:error, reason}
end
end
@doc false
def build_pool_ref_from_manager(manager_pid) when is_pid(manager_pid) do
case GenServer.call(manager_pid, :pool_reference) do
%{} = reference -> {:ok, reference}
other -> {:error, {:invalid_pool_reference, other}}
end
catch
:exit, reason -> {:error, {:manager_unavailable, reason}}
end
defp maybe_stop_connection(connection) when is_pid(connection) do
if Process.alive?(connection) do
Process.exit(connection, :normal)
else
:ok
end
end
defp maybe_stop_connection(_), do: :ok
# Expose for testing
def generate_pool_name(connection_config) do
# Create a unique pool name based on connection parameters
hash =
:crypto.hash(:md5, inspect(connection_config))
|> Base.encode16(case: :lower)
|> String.slice(0, 8)
:"selecto_pool_#{hash}"
end
def generate_cache_key(query) do
:crypto.hash(:md5, query) |> Base.encode16(case: :lower)
end
@doc false
def prepared_statement_cached?(pool_pid, cache_key)
when is_pid(pool_pid) and is_binary(cache_key) do
table = ensure_prepared_statements_table()
:ets.member(table, {pool_pid, cache_key})
end
@doc false
def mark_prepared_statement(pool_pid, cache_key)
when is_pid(pool_pid) and is_binary(cache_key) do
table = ensure_prepared_statements_table()
:ets.insert(table, {{pool_pid, cache_key}, true})
:ok
end
defp maybe_clear_prepared_flags(pool_ref) do
case get_pool_pid(pool_ref) do
{:ok, pool_pid} -> clear_prepared_flags(pool_pid)
{:error, _reason} -> :ok
end
end
defp prepared_cache_size(pool_pid) when is_pid(pool_pid) do
table = ensure_prepared_statements_table()
:ets.select_count(table, [
{{{pool_pid, :_}, :_}, [], [true]}
])
end
defp prepared_cache_size(_), do: 0
defp clear_prepared_flags(pool_pid) when is_pid(pool_pid) do
table = ensure_prepared_statements_table()
:ets.select_delete(table, [
{{{pool_pid, :_}, :_}, [], [true]}
])
:ok
end
defp clear_prepared_flags(_), do: :ok
defp put_checkout_ref(ref, pool_pid, owner_pid)
when is_reference(ref) and is_pid(pool_pid) and is_pid(owner_pid) do
table = ensure_checkout_refs_table()
:ets.insert(table, {ref, pool_pid, owner_pid})
:ok
end
defp delete_checkout_ref(ref) when is_reference(ref) do
table = ensure_checkout_refs_table()
:ets.delete(table, ref)
:ok
end
defp lookup_checkout_ref(ref) when is_reference(ref) do
table = ensure_checkout_refs_table()
case :ets.lookup(table, ref) do
[{^ref, pool_pid, owner_pid}] -> {pool_pid, owner_pid}
[] -> :error
end
end
defp ensure_prepared_statements_table do
ensure_named_table(@prepared_statements_table)
end
defp ensure_checkout_refs_table do
ensure_named_table(@checkout_refs_table)
end
defp ensure_named_table(name) when is_atom(name) do
case :ets.whereis(name) do
:undefined ->
try do
:ets.new(name, [
:set,
:public,
:named_table,
read_concurrency: true,
write_concurrency: true
])
rescue
ArgumentError -> :ets.whereis(name)
end
table ->
table
end
end
end