%% @author Arjan Scherpenisse <arjan@scherpenisse.net>
%% @copyright 2014-2025 Arjan Scherpenisse
%% @doc Postgresql pool worker. Supervises a database connection, ensures
%% it is connected. The connection is given to the query process. If the
%% query process doesn't return the connection, then the connection is reset.
%%
%% The connection is given to the query process to prevent extra copying of
%% query results between the worker and the query process.
%% @end
%% Copyright 2014-2025 Arjan Scherpenisse
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(z_db_pgsql).
-behaviour(gen_server).
-behaviour(poolboy_worker).
-behaviour(z_db_worker).
-include("zotonic.hrl").
-include_lib("epgsql/include/epgsql.hrl").
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
%% poolboy_worker callbacks
-export([start_link/1]).
%% z_db_worker callbacks
-export([
pool_return_connection/2,
pool_get_connection/1,
is_connection_alive/1,
build_connect_options/2,
ensure_all_started/0,
test_connection/1,
squery/3,
equery/4,
execute_batch/4,
get_raw_connection/1,
release_raw_connection/1
]).
-define(CONNECT_TIMEOUT, 5000).
-define(IDLE_TIMEOUT, 60000).
-define(RAW_CONN_TIMEOUT, 2*3600*1000).
-define(CONNECT_RETRIES, 50).
-define(CONNECT_RETRY_SHORT, 100).
-define(CONNECT_RETRY_MIDDLE, 1000).
-define(CONNECT_RETRY_SLEEP, 10000).
%% @doc Threshold above which we do an automatic explain of traced queries.
-define(DBTRACE_EXPLAIN_MSEC, 100).
-record(state, {
conn = undefined :: undefined | pid(),
conn_args = undefined :: undefined | list(),
busy_monitor = undefined :: undefined | reference(),
busy_pid = undefined :: undefined | pid(),
busy_ref = undefined :: undefined | reference(),
busy_timeout = undefined :: undefined | integer(),
busy_start = undefined :: undefined | pos_integer(),
busy_sql = undefined :: undefined | string() | binary() | raw,
busy_params = [] :: list(),
busy_tracing = false :: boolean(),
canceled_busy_ref = undefined :: undefined | pid(),
is_paused = false :: boolean(),
pause_waiting = undefined
}).
-type error() :: {error, query_error()}
| epgsql_sock:error().
-type query_error() :: epgsql:query_error()
| query_timeout
| connection_down
| paused
| term().
-type query_result() :: squery_result()
| equery_result().
-type squery_result() :: epgsql_cmd_squery:response()
| error().
-type equery_result() :: epgsql_cmd_equery:response()
| error().
-export_type([
query_result/0,
squery_result/0,
equery_result/0,
query_error/0,
error/0
]).
%%
%% API
%%
start_link(Args) when is_list(Args) ->
gen_server:start_link(?MODULE, Args, []).
-spec test_connection( list() ) -> ok | {error, term()}.
test_connection(Args) ->
case try_connect_tcp(Args) of
ok ->
test_connection_1(Args);
{error, _} = Error ->
Error
end.
ensure_all_started() ->
application:ensure_all_started(epgsql, permanent).
test_connection_1(Args) ->
case connect(Args) of
{ok, Conn} ->
{dbschema, Schema} = proplists:lookup(dbschema, Args),
case z_db:schema_exists_conn(Conn, Schema) of
true ->
epgsql:close(Conn),
ok;
false ->
epgsql:close(Conn),
{error, noschema}
end;
{error, _} = E ->
E
end.
-spec is_connection_alive(Worker) -> boolean() when
Worker :: pid().
is_connection_alive(Worker) ->
erlang:is_process_alive(Worker).
-spec pool_get_connection(Context) -> {ok, Conn} | {error, Reason} when
Context :: z:context(),
Conn :: pid(),
Reason :: term().
pool_get_connection(Context) ->
z_db_pool:get_connection(Context).
-spec pool_return_connection(Worker, Context) -> ok | {error, Reason} when
Worker :: pid(),
Context :: z:context(),
Reason :: term().
pool_return_connection(Worker, Context) ->
case is_connection_alive(Worker) of
true ->
try
case gen_server:call(Worker, {pool_return_connection_check, self()}) of
ok ->
z_db_pool:return_connection(Worker, Context);
{error, _} = Error ->
Error
end
catch
exit:Reason:Stack ->
z_context:logger_md(Context),
?LOG_ERROR(#{
text => <<"Return connection failed.">>,
in => zotonic_core,
result => exit,
reason => Reason,
stack => Stack,
worker_pid => Worker
}),
{error, Reason}
end;
false ->
{error, connection_down}
end.
%% @doc Simple query without parameters, the query is interrupted if it takes
%% longer than Timeout msec.
-spec squery(Worker, Sql, Timeout) -> Result when
Worker :: pid(),
Sql :: string() | binary(),
Timeout :: pos_integer(),
Result :: squery_result().
squery(Worker, Sql, Timeout) ->
case fetch_conn(Worker, Sql, [], Timeout) of
{ok, {Conn, Ref}} ->
try
maybe_map_error(epgsql:squery(Conn, Sql))
after
ok = return_conn(Worker, Ref)
end;
{error, _} = Error ->
Error
end.
%% @doc Query with parameters, the query is interrupted if it takes
%% longer than Timeout msec.
-spec equery(Worker, Sql, Parameters, Timeout) -> Result when
Worker :: pid(),
Sql :: string() | binary(),
Parameters :: list(),
Timeout :: pos_integer(),
Result :: equery_result().
equery(Worker, Sql, Parameters, Timeout) ->
case fetch_conn(Worker, Sql, Parameters, Timeout) of
{ok, {Conn, Ref}} ->
try
maybe_map_error(epgsql:equery(Conn, Sql, Parameters))
after
ok = return_conn(Worker, Ref)
end;
{error, _} = Error ->
Error
end.
%% @doc Batch Query, the query is interrupted if it takes
%% longer than Timeout msec.
-spec execute_batch(Worker, Sql, Batch, Timeout) -> Result when
Worker :: pid(),
Sql :: string() | binary(),
Batch :: list( list() ),
Timeout :: pos_integer(),
Result :: {ok, [ equery_result() ]}
| {error, connection_down | term()}.
execute_batch(Worker, Sql, Batch, Timeout) ->
case fetch_conn(Worker, Sql, Batch, Timeout) of
{ok, {Conn, Ref}} ->
try
{Columns, Result} = epgsql:execute_batch(Conn, Sql, Batch),
Result1 = lists:map(
fun
({ok, Count, Rows}) when is_list(Rows) ->
{ok, Count, Columns, Rows};
({ok, Rows}) when is_list(Rows) ->
{ok, Columns, Rows};
({ok, _} = Ok) ->
Ok;
({error, _} = Error) ->
maybe_map_error(Error)
end,
Result),
{ok, Result1}
after
ok = return_conn(Worker, Ref)
end;
{error, _} = Error ->
Error
end.
maybe_map_error({error, #error{ codename = query_canceled }}) ->
{error, query_timeout};
maybe_map_error({error, _} = Error) ->
Error;
maybe_map_error({ok, _, _, _} = Result) ->
Result;
maybe_map_error({ok, _, _} = Result) ->
Result;
maybe_map_error({ok, _} = Result) ->
Result.
%% @doc Request the SQL connection from the worker. The query is passed for logging
% purposes. This caller will do the query using the returned connection.
-spec fetch_conn(Worker, Sql, Parameters, Timeout) -> {ok, {Conn, Ref}} | {error, Reason} when
Worker :: pid(),
Sql :: string() | binary(),
Parameters :: list(),
Timeout :: pos_integer(),
Conn :: pid(),
Ref :: reference(),
Reason :: paused | connection_down | term().
fetch_conn(Worker, Sql, Parameters, Timeout) ->
Ref = erlang:make_ref(),
try
case gen_server:call(Worker, {fetch_conn, Ref, self(), Sql, Parameters, Timeout, is_tracing()}) of
{ok, Conn} ->
{ok, {Conn, Ref}};
{error, paused} ->
{error, paused}
end
catch
exit:{noproc, _} ->
%% The worker process is gone.
{error, connection_down};
exit:{normal, _} ->
%% The worker went down after the call was sent, but
%% the worker exited normally.
{error, connection_down};
exit:Reason:Stack ->
?LOG_ERROR(#{
text => <<"Fetch connection failed.">>,
in => zotonic_core,
result => exit,
reason => Reason,
stack => Stack,
worker_pid => Worker,
sql => Sql
}),
{error, Reason}
end.
%% @doc Return the SQL connection to the worker, must be done within the timeout
%% specified in the fetch_conn/4 call.
-spec return_conn(Worker, Ref) -> ok | {error, Reason} when
Worker :: pid(),
Ref :: reference(),
Reason :: term().
return_conn(Worker, Ref) ->
case is_connection_alive(Worker) of
true ->
gen_server:call(Worker, {return_conn, Ref, self()});
false ->
{error, connection_down}
end.
%% @doc Return the tracing flag from the process dictionary.
-spec is_tracing() -> boolean().
is_tracing() ->
case erlang:get(is_dbtrace) of
true -> true;
_ -> false
end.
%% @doc This function MUST NOT be used, but currently is required by the
%% install / upgrade routines. Can only be called from inside a
%% z_db:transaction/2.
-spec get_raw_connection(Context) -> {ok, ConnPid} | {error, Reason} when
Context :: z:context(),
ConnPid :: pid(),
Reason :: term().
get_raw_connection(#context{dbc=Worker}) when Worker =/= undefined ->
gen_server:call(Worker, {get_raw_connection, self()}).
%% @doc After a connection is fetched using get_raw_connection/1, use this
%% to release the connection again. Otherwise the connection can not be used
%% for other SQL queries. This must be called from inside the same transaction
%% as get_raw_connection/1 was called.
-spec release_raw_connection(Context) -> ok | {error, Reason} when
Context :: z:context(),
Reason :: term().
release_raw_connection(#context{dbc=Worker}) when Worker =/= undefined ->
gen_server:call(Worker, {release_raw_connection, self()}).
%%
%% gen_server callbacks
%%
init(Args) ->
%% Start disconnected
process_flag(trap_exit, true),
{ok, #state{conn=undefined, conn_args=Args}, ?IDLE_TIMEOUT}.
handle_call(pause, _From, #state{ is_paused = true } = State) ->
{reply, ok, State};
handle_call(pause, _From, #state{ busy_pid = undefined } = State) ->
{reply, ok, State#state{ is_paused = true, pause_waiting = undefined }};
handle_call(pause, From, #state{ busy_pid = _Pid } = State) ->
{noreply, State#state{ is_paused = true, pause_waiting = From }};
handle_call(unpause, _From, State) ->
{reply, ok, State#state{ is_paused = false, pause_waiting = undefined }};
handle_call({pool_return_connection_check, _CallerPid}, _From, #state{ busy_pid = undefined } = State) ->
{reply, ok, State};
handle_call({pool_return_connection_check, CallerPid}, From, #state{
busy_pid = Pid,
busy_sql = Sql,
busy_params = Params
} = State) ->
?LOG_ERROR(#{
text => <<"Connection return to pool by worker but still running">>,
in => zotonic_core,
result => error,
reason => running,
request_pid => CallerPid,
busy_pid => Pid,
query => Sql,
args => Params,
worker_pid => self()
}),
gen_server:reply(From, {error, checkin_busy}),
State1 = disconnect(State),
{stop, normal, {error, running}, State1};
handle_call({fetch_conn, _Ref, _CallerPid, _Sql, _Params, _Timeout, _IsTracing}, _From, #state{ is_paused = true } = State) ->
{reply, {error, paused}, State};
handle_call({fetch_conn, _Ref, _CallerPid, _Sql, _Params, _Timeout, _IsTracing} = Cmd, From,
#state{ busy_pid = undefined, conn = undefined, conn_args = Args } = State) ->
case connect(Args, From) of
{ok, Conn} ->
erlang:monitor(process, Conn),
handle_call(Cmd, From, State#state{conn=Conn});
{error, _} = E ->
{reply, E, State}
end;
handle_call({fetch_conn, Ref, CallerPid, Sql, Params, Timeout, IsTracing}, _From, #state{ busy_pid = undefined } = State) ->
Start = trace_start(),
State1 = State#state{
busy_monitor = erlang:monitor(process, CallerPid),
busy_pid = CallerPid,
busy_ref = Ref,
busy_timeout = Timeout,
busy_start = Start,
busy_sql = Sql,
busy_params = Params,
busy_tracing = IsTracing,
canceled_busy_ref = undefined
},
{reply, {ok, State#state.conn}, State1, Timeout};
handle_call({fetch_conn, _Ref, CallerPid, Sql, Params, _Timeout, _IsTracing}, From, #state{ busy_pid = OtherPid } = State)
when CallerPid =:= OtherPid ->
% Caller is confused - starting a request whilst the current request isn't finished yet.
% Log an error, stop the running query, and kill this worker.
% No hope of recovery, as the caller is in an illegal state reusing this connection
% for multiple queries.
?LOG_ERROR(#{
text => <<"Connection requested but already using same connection">>,
in => zotonic_core,
result => error,
reason => busy,
request_pid => CallerPid,
query => Sql,
args => Params,
worker_pid => self()
}),
gen_server:reply(From, {error, busy}),
State1 = disconnect(State),
{stop, normal, {error, busy}, State1};
handle_call({fetch_conn, _Ref, CallerPid, Sql, Params, _Timeout, _IsTracing}, _From, #state{ busy_pid = OtherPid } = State) ->
% This can happen if a connection is shared by two processes.
% Deny the request and continue with the running request.
?LOG_ERROR(#{
text => <<"Connection requested but in use by other pid">>,
in => zotonic_core,
result => error,
reason => busy,
request_pid => CallerPid,
busy_pid => OtherPid,
query => Sql,
args => Params,
worker_pid => self()
}),
{reply, {error, busy}, State, timeout(State)};
handle_call({return_conn, Ref, Pid}, _From,
#state{
busy_monitor = Monitor,
busy_ref = Ref,
busy_pid = Pid,
busy_sql = Sql,
busy_params = Params,
busy_start = Start,
busy_tracing = IsTracing,
conn = Conn
} = State) ->
erlang:demonitor(Monitor),
trace_end(IsTracing, Start, Sql, Params, Conn),
State1 = reset_busy_state(State),
{reply, ok, State1, timeout(State1)};
handle_call({return_conn, Ref, Pid}, _From, #state{ busy_pid = undefined, canceled_busy_ref = Ref } = State) ->
?LOG_INFO(#{
text => <<"SQL connection returned after cancel">>,
in => zotonic_core,
request_pid => Pid
}),
{reply, ok, State#state{ canceled_busy_ref = undefined }, timeout(State)};
handle_call({return_conn, _Ref, Pid}, _From, #state{ busy_pid = OtherPid } = State) ->
?LOG_ERROR(#{
text => <<"SQL connection returned but in use by other pid">>,
in => zotonic_core,
result => error,
reason => busy,
request_pid => Pid,
busy_pid => OtherPid,
worker_pid => self()
}),
{reply, {error, notyours}, State, timeout(State)};
handle_call({get_raw_connection, CallerPid}, _From, #state{ busy_pid = BusyPid } = State) when BusyPid =/= undefined, CallerPid =/= BusyPid ->
?LOG_ERROR(#{
text => <<"Raw SQL connection requested but already in use by other pid">>,
in => zotonic_core,
result => error,
reason => busy,
request_pid => CallerPid,
busy_pid => BusyPid,
worker_pid => self()
}),
{reply, {error, busy}, State, timeout(State)};
handle_call({get_raw_connection, CallerPid}, From, #state{ conn = undefined, conn_args = Args } = State) ->
case connect(Args, From) of
{ok, Conn} ->
erlang:monitor(process, Conn),
handle_call({get_raw_connection, CallerPid}, From, State#state{conn=Conn});
{error, _} = E ->
{reply, E, State}
end;
handle_call({get_raw_connection, CallerPid}, _From, #state{ conn = Conn } = State) ->
State1 = State#state{
busy_monitor = erlang:monitor(process, CallerPid),
busy_pid = CallerPid,
busy_sql = raw
},
{reply, {ok, Conn}, State1, ?RAW_CONN_TIMEOUT};
handle_call({release_raw_connection, CallerPid}, _From, #state{ busy_pid = BusyPid } = State) when BusyPid =:= CallerPid ->
State1 = demonitor_busy(State),
{reply, ok, State1, ?IDLE_TIMEOUT};
handle_call({release_raw_connection, CallerPid}, _From, #state{ busy_pid = BusyPid } = State) ->
?LOG_ERROR(#{
text => <<"Raw SQL connection released but in use by other pid">>,
in => zotonic_core,
result => error,
reason => busy,
request_pid => CallerPid,
busy_pid => BusyPid,
worker_pid => self()
}),
{reply, {error, notyours}, State, ?RAW_CONN_TIMEOUT};
handle_call(Message, _From, State) ->
?LOG_NOTICE(#{
text => <<"SQL unexpected call message">>,
in => zotonic_core,
request => Message,
worker_pid => self()
}),
{reply, {error, unknown_call}, State, timeout(State)}.
handle_cast(_Msg, State) ->
{noreply, State, ?IDLE_TIMEOUT}.
handle_info(disconnect, #state{ conn = undefined } = State) ->
{noreply, State};
handle_info(disconnect, #state{ busy_pid = undefined } = State) ->
Database = get_arg(dbdatabase, State#state.conn_args),
Schema = get_arg(dbschema, State#state.conn_args),
?LOG_DEBUG(#{
text => <<"SQL closing connection">>,
in => zotonic_core,
database => Database,
schema => Schema,
worker_pid => self()
}),
{noreply, disconnect(State), hibernate};
handle_info(disconnect, State) ->
Database = get_arg(dbdatabase, State#state.conn_args),
Schema = get_arg(dbschema, State#state.conn_args),
?LOG_ERROR(#{
text => <<"SQL disconnect whilst busy with query">>,
in => zotonic_core,
result => error,
reason => disconnect,
database => Database,
schema => Schema,
query => State#state.busy_sql,
args => State#state.busy_params,
worker_pid => self()
}),
{stop, normal, disconnect(State)};
handle_info(timeout, #state{ busy_pid = undefined } = State) ->
% Idle timeout - no SQL query is running
{noreply, disconnect(State), hibernate};
handle_info(timeout, #state{
busy_pid = Pid,
busy_sql = Sql,
busy_params = Params,
busy_timeout = Timeout
} = State) ->
% Query timeout - cancel the running query.
Database = get_arg(dbdatabase, State#state.conn_args),
Schema = get_arg(dbschema, State#state.conn_args),
?LOG_ERROR(#{
text => <<"SQL Timeout">>,
in => zotonic_core,
result => error,
reason => timeout,
busy_pid => Pid,
timeout => Timeout,
database => Database,
schema => Schema,
query => Sql,
args => Params,
worker_pid => self()
}),
State1 = cancel(State),
{noreply, State1, hibernate};
handle_info({'DOWN', _Ref, process, BusyPid, Reason}, #state{
busy_pid = BusyPid,
busy_sql = Sql,
busy_params = Params
} = State) ->
% The process using our connection is down.
% As it might have been in a transaction, we just kill
% the connection and let the database clean up.
Database = get_arg(dbdatabase, State#state.conn_args),
Schema = get_arg(dbschema, State#state.conn_args),
?LOG_DEBUG(#{
text => <<"SQL caller down during query">>,
in => zotonic_core,
result => error,
reason => Reason,
busy_pid => BusyPid,
database => Database,
schema => Schema,
query => Sql,
args => Params,
worker_pid => self()
}),
{noreply, disconnect(State), hibernate};
handle_info({'DOWN', _Ref, process, ConnPid, Reason}, #state{
conn = ConnPid,
busy_pid = BusyPid,
busy_sql = Sql,
busy_params = Params
} = State) when is_pid(BusyPid) ->
% Unexpected DOWN from the connection during query
Database = get_arg(dbdatabase, State#state.conn_args),
Schema = get_arg(dbschema, State#state.conn_args),
?LOG_ERROR(#{
text => <<"SQL connection drop during query">>,
in => zotonic_core,
result => error,
reason => Reason,
busy_pid => BusyPid,
database => Database,
schema => Schema,
query => Sql,
args => Params,
worker_pid => self()
}),
State1 = State#state{ conn = undefined },
{noreply, disconnect(State1), hibernate};
handle_info({'DOWN', _Ref, process, Pid, _Reason}, #state{ conn = Pid } = State) ->
% Connection down, no processes running, ok to hibernate
State1 = State#state{ conn = undefined },
{noreply, disconnect(State1), hibernate};
handle_info({'DOWN', _Ref, process, _Pid, _Reason}, #state{ busy_pid = undefined } = State) ->
% Might be a late down message from the busy pid, ignore.
{noreply, State, timeout(State)};
handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
% Stray 'DOWN' message, might be a race condition.
?LOG_NOTICE(#{
text => <<"SQL got 'DOWN' message from unknown process">>,
in => zotonic_core,
down_pid => Pid,
down_reason => Reason,
state => State,
worker_pid => self()
}),
{noreply, State, timeout(State)};
handle_info({'EXIT', _Pid, _Reason}, State) ->
% Ignore - we have monitors for the connection and the request caller.
{noreply, State};
handle_info(Info, State) ->
?LOG_WARNING(#{
text => <<"SQL unexpected info message">>,
in => zotonic_core,
message => Info,
state => State,
worker_pid => self()
}),
{noreply, State, timeout(State)}.
terminate(_Reason, #state{} = State) ->
disconnect(State),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%
%% Helper functions
%%
%% @doc Cancel the running query.
cancel(#state{ conn = Conn, busy_pid = Pid, busy_ref = Ref } = State) when is_pid(Pid), Conn =/= undefined ->
ok = epgsql:cancel(Conn),
State1 = demonitor_busy(State),
State1#state{
canceled_busy_ref = Ref
};
cancel(#state{ conn = undefined } = State) ->
State.
%% @doc Cancel any running query and close the connection to the SQL server
disconnect(#state{ conn = undefined } = State) ->
demonitor_busy(State);
disconnect(#state{ conn = Conn, busy_pid = Pid, busy_ref = Ref } = State) when is_pid(Pid) ->
ok = epgsql:cancel(Conn),
State1 = disconnect_1(State),
State1#state{
canceled_busy_ref = Ref
};
disconnect(#state{ busy_pid = undefined } = State) ->
disconnect_1(State).
disconnect_1(#state{ conn = Conn} = State) ->
ok = epgsql:close(Conn),
State1 = receive
{'DOWN', _Ref, process, Conn, _Reason} ->
% The SQL connection sent the error to the busy pid
reset_busy_state(State)
after 500 ->
% Assume busy pid did not receive the error, kill it
erlang:exit(Conn, kill),
demonitor_busy(State)
end,
State1#state{ conn = undefined }.
%% @doc Demonitor the busy process - it will kill itself by using the
%% now defunct connection process.
demonitor_busy(#state{ busy_pid = Pid } = State) when is_pid(Pid) ->
#state{
busy_monitor = Monitor,
busy_sql = Sql,
busy_params = Params,
busy_start = Start,
busy_tracing = IsTracing,
conn = Conn
} = State,
erlang:demonitor(Monitor),
trace_end(IsTracing, Start, Sql, Params, Conn),
reset_busy_state(State);
demonitor_busy(State) ->
reset_busy_state(State).
reset_busy_state(#state{ is_paused = true, pause_waiting = From } = State) when From =/= undefined ->
gen_server:reply(From, ok),
reset_busy_state(State#state{ pause_waiting = undefined });
reset_busy_state(State) ->
State#state{
busy_monitor = undefined,
busy_pid = undefined,
busy_ref = undefined,
busy_timeout = undefined,
busy_start = undefined,
busy_sql = undefined,
busy_params = [],
canceled_busy_ref = undefined
}.
%% @doc Calculate the remaining timeout for the running query.
timeout(#state{ busy_timeout = undefined }) ->
?IDLE_TIMEOUT;
timeout(#state{ busy_timeout = Timeout, busy_start = Start }) ->
Now = msec(),
erlang:max(1, Timeout - (Now - Start)).
try_connect_tcp(Args) ->
Addr = get_arg(dbhost, Args),
Port = get_arg(dbport, Args),
SockOpts = [{active, false}, {packet, raw}, binary],
case gen_tcp:connect(Addr, Port, SockOpts, ?CONNECT_TIMEOUT) of
{ok, Sock} ->
gen_tcp:close(Sock),
ok;
{error, _} = Error ->
Error
end.
connect(Args) when is_list(Args) ->
connect(Args, 0, undefined).
connect(Args, {Pid, _Ref}) when is_list(Args) ->
MRef = monitor(process, Pid),
Result = connect(Args, 0, MRef),
demonitor(MRef),
Result.
connect(_Args, RetryCt, _MRef) when RetryCt >= ?CONNECT_RETRIES ->
{error, econnrefused};
connect(Args, RetryCt, undefined) ->
connect_1(Args, RetryCt, undefined);
connect(Args, RetryCt, MRef) ->
receive
{'DOWN', MRef, process, _Pid, _Reson} ->
{error, caller_down}
after 0 ->
connect_1(Args, RetryCt, MRef)
end.
% Suppress warning about epgsql_connect not returning {error, econnrefused}
% It is returning it, but the type spec in epgsql is wrong.
-dialyzer({nowarn_function, connect_1/3}).
connect_1(Args, RetryCt, MRef) ->
try
Database = get_arg(dbdatabase, Args),
Schema = get_arg(dbschema, Args),
Options = build_connect_options(Database, Args),
case epgsql:connect(Options) of
{ok, Conn} ->
set_schema(Conn, Schema);
{error, #error{ codename = too_many_connections }} ->
retry(Args, too_many_connections, RetryCt, MRef);
{error, #error{ codename = out_of_memory }} ->
retry(Args, out_of_memory, RetryCt, MRef);
{error, #error{ codename = admin_shutdown }} ->
retry(Args, admin_shutdown, RetryCt, MRef);
{error, #error{ codename = crash_shutdown }} ->
retry(Args, crash_shutdown, RetryCt, MRef);
{error, #error{ codename = cannot_connect_now }} ->
retry(Args, cannot_connect_now, RetryCt, MRef);
{error, econnrefused} ->
retry(Args, econnrefused, RetryCt, MRef);
{error, Reason} = E ->
?LOG_WARNING(#{
text => <<"Database psql connect returned error">>,
in => zotonic_core,
database => Database,
schema => Schema,
options => Options,
result => error,
reason => Reason
}),
E
end
catch
A:B ->
retry(Args, {A, B}, RetryCt, MRef)
end.
build_connect_options(DatabaseName, Args) ->
Opts = #{
database => DatabaseName,
codecs => [{z_db_pgsql_codec, []}],
nulls => [undefined, null, {term, undefined}, {term_json, undefined}]
},
maybe_put_args([{dbhost, host}, {dbport, port}, {dbuser, username}, {dbpassword, password}], Args, Opts).
maybe_put_args([], _, Map) ->
Map;
maybe_put_args([{ArgsKey, Key} | T], Args, Map) ->
case get_arg(ArgsKey, Args) of
undefined ->
maybe_put_args(T, Args, Map);
Value ->
maybe_put_args(T, Args, Map#{Key => Value})
end.
set_schema(Conn, Schema) ->
case epgsql:squery(Conn,"SET TIME ZONE 'UTC'; SET search_path TO \"" ++ Schema ++ "\"") of
[{ok, [], []}, {ok, [], []}] ->
{ok, Conn};
Error ->
catch epgsql:close(Conn),
{error, Error}
end.
%% @doc Retry connection to PostgreSQL server.
retry(Args, Reason, RetryCt, MRef) ->
Hostname = get_arg(dbhost, Args),
Port = get_arg(dbport, Args),
Delay = retry_delay(Reason, RetryCt),
?LOG_WARNING(#{
text => <<"Database psql connect failed">>,
in => zotonic_core,
hostname => Hostname,
port => Port,
result => error,
reason => Reason,
retry_msec => Delay,
worker_pid => self()
}),
maybe_close_connections(Reason),
timer:sleep(Delay),
connect(Args, RetryCt + 1, MRef).
maybe_close_connections(out_of_memory) ->
z_db_pool:close_connections();
maybe_close_connections(too_many_connections) ->
z_db_pool:close_connections();
maybe_close_connections(_) ->
nop.
retry_delay(_, RetryCount) when RetryCount < 2 ->
?CONNECT_RETRY_SHORT;
retry_delay(too_many_connections, _) ->
?CONNECT_RETRY_MIDDLE;
retry_delay(_, _RetryCount) ->
?CONNECT_RETRY_SLEEP.
get_arg(K, Args) ->
maybe_default(K, proplists:get_value(K, Args)).
maybe_default(dbport, 0) -> z_config:get(dbport);
maybe_default(K, undefined) -> z_config:get(K);
maybe_default(K, "") -> z_config:get(K);
maybe_default(K, <<>>) -> z_config:get(K);
maybe_default(_K, V) -> V.
%%
%% Request tracing
%%
trace_start() ->
msec().
trace_end(false, _Start, _Sql, _Params, _Conn) ->
ok;
trace_end(true, Start, Sql, Params, Conn) ->
Duration = msec() - Start,
?LOG_NOTICE(#{
text => <<"SQL TRACE">>,
in => zotonic_core,
msec => Duration,
sql => Sql,
params => Params
}),
maybe_explain(Duration, Sql, Params, Conn).
maybe_explain(Duration, _Sql, _Params, _Conn) when Duration < ?DBTRACE_EXPLAIN_MSEC ->
ok;
maybe_explain(_Duration, Sql, Params, Conn) ->
case is_explainable(z_string:to_lower(Sql)) of
true ->
Sql1 = "explain "++Sql,
R = epgsql:equery(Conn, Sql1, Params),
maybe_log_query_plan(R);
false ->
ok
end.
is_explainable(<<"begin", _/binary>>) -> false;
is_explainable(<<"commit", _/binary>>) -> false;
is_explainable(<<"rollback", _/binary>>) -> false;
is_explainable(<<"explain ", _/binary>>) -> false;
is_explainable(<<"alter ", _/binary>>) -> false;
is_explainable(<<"drop ", _/binary>>) -> false;
is_explainable(<<"create ", _/binary>>) -> false;
is_explainable(_) -> true.
maybe_log_query_plan({ok, [ #column{ name = <<"QUERY PLAN">> } ], Rows}) ->
Lines = lists:map( fun({R}) -> [ 10, R ] end, Rows ),
?LOG_NOTICE(#{
text => <<"SQL EXPLAIN">>,
in => zotonic_core,
explain => iolist_to_binary(Lines)
});
maybe_log_query_plan(Other) ->
?LOG_NOTICE(#{
text => <<"SQL EXPLAIN">>,
in => zotonic_core,
explain => Other
}),
ok.
msec() ->
{A, B, C} = os:timestamp(),
A * 1000000000 + B * 1000 + C div 1000.