src/db/z_db_pgsql.erl

%% @author Arjan Scherpenisse <arjan@scherpenisse.net>
%% @copyright 2014-2020 Arjan Scherpenisse
%% @doc Postgresql pool worker

%% Copyright 2014-2020 Arjan Scherpenisse
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(z_db_pgsql).
-behaviour(gen_server).

-behaviour(poolboy_worker).
-behaviour(z_db_worker).

-include("zotonic.hrl").
-include_lib("epgsql/include/epgsql.hrl").

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
         code_change/3]).

%% poolboy_worker callbacks
-export([start_link/1]).

%% z_db_worker callbacks
-export([
    pool_return_connection/2,
    pool_get_connection/1,
    is_connection_alive/1,

    ensure_all_started/0,
    test_connection/1,
    squery/3,
    equery/4,
    get_raw_connection/1
]).

%% Used by the z_install_update to access props columns
-export([
    decode_value/1
    ]).

%% Bit-syntax segment (32 bits, big-endian, unsigned) that encode_value/1 puts
%% in front of term_to_binary/1 data and that decode_value/1 matches on.
-define(TERM_MAGIC_NUMBER, 16#01326A3A:1/big-unsigned-unit:32).

%% Timeout passed to gen_tcp:connect/4 in try_connect_tcp/1.
-define(CONNECT_TIMEOUT, 5000).
%% gen_server timeout used while no query is running; on expiry the worker
%% closes its database connection.
-define(IDLE_TIMEOUT, 60000).

%% Maximum number of connect attempts before connect/3 gives up.
-define(CONNECT_RETRIES, 50).
%% Delays handed to timer:sleep/1 between connect attempts, see retry_delay/2.
-define(CONNECT_RETRY_SHORT,   100).
-define(CONNECT_RETRY_MIDDLE, 1000).
-define(CONNECT_RETRY_SLEEP, 10000).

%% @doc Threshold above which we do an automatic explain of traced queries.
 -define(DBTRACE_EXPLAIN_MSEC, 100).

%% Worker state. The busy_* fields describe the query that currently holds the
%% connection; busy_pid is undefined when the worker is idle.
-record(state, {
    % Connection as returned by connect/1,2; undefined when disconnected
    conn,
    % Connection arguments as passed to init/1
    conn_args = undefined :: undefined | list(),
    % Monitor on the process that fetched the connection
    busy_monitor = undefined :: undefined | reference(),
    % Process that fetched the connection
    busy_pid = undefined :: undefined | pid(),
    % Reference the caller supplied in fetch_conn, must match in return_conn
    busy_ref = undefined :: undefined | reference(),
    % Timeout requested for the running query
    busy_timeout = undefined :: undefined | integer(),
    % Value of trace_start/0 taken when the connection was fetched
    busy_start = undefined :: undefined | pos_integer(),
    % Query and parameters, kept for logging and tracing
    busy_sql = undefined :: undefined | string(),
    busy_params = [] :: list(),
    % Tracing flag supplied by the caller in fetch_conn
    busy_tracing = false :: boolean()
}).

%% Result type declared for squery/3 and equery/4.
-type query_result() :: epgsql:reply(epgsql:equery_row())
                      | epgsql:reply(epgsql:squery_row()).

-export_type([ query_result/0 ]).


%%
%% API
%%

%% @doc poolboy_worker callback: start an unregistered worker process, the
%%      connection arguments are handed unchanged to init/1.
start_link(ConnArgs) when is_list(ConnArgs) ->
    gen_server:start_link(?MODULE, ConnArgs, []).

%% @doc Test the connection arguments: first a plain TCP connect to the
%%      host/port, and only if that succeeds a full database connect with a
%%      schema check.
-spec test_connection( list() ) -> ok | {error, term()}.
test_connection(Args) ->
    TcpResult = try_connect_tcp(Args),
    case TcpResult of
        {error, _} ->
            TcpResult;
        ok ->
            test_connection_1(Args)
    end.

%% @doc Start the epgsql application and the applications it depends on.
ensure_all_started() ->
    application:ensure_all_started(epgsql).

%% @doc Connect to the database and check that the configured schema exists.
%%      Returns ok, {error, noschema} or the error returned by connect/1.
%%
%%      The schema is resolved with get_arg/2, the same lookup connect_1/3
%%      uses for the search_path, so a missing or empty dbschema argument
%%      falls back to the configuration instead of crashing with a badmatch.
%%      The test connection is always closed, also when the schema check
%%      raises an exception.
test_connection_1(Args) ->
    case connect(Args) of
        {ok, Conn} ->
            Schema = get_arg(dbschema, Args),
            try z_db:schema_exists_conn(Conn, Schema) of
                true ->
                    ok;
                false ->
                    {error, noschema}
            after
                epgsql:close(Conn)
            end;
        {error, _} = E ->
            E
    end.

%% @doc Check if the worker process is alive. This checks the worker process
%%      only, the worker may still be disconnected from the database.
-spec is_connection_alive( pid() ) -> boolean().
is_connection_alive(Worker) ->
    erlang:is_process_alive(Worker).

%% @doc z_db_worker callback: request a worker from z_db_pool for the context.
-spec pool_get_connection( z:context() ) -> {ok, pid()} | {error, term()}.
pool_get_connection(Context) ->
    z_db_pool:get_connection(Context).

%% @doc Hand the worker back to the pool. The worker is first asked if it is
%%      still busy with a query; only an idle worker is returned to the pool.
%%      An exit during the check or the return is logged and reported as
%%      {error, Reason}.
-spec pool_return_connection( pid(), z:context() ) -> ok | {error, term()}.
pool_return_connection(Worker, Context) ->
    case is_connection_alive(Worker) of
        false ->
            {error, connection_down};
        true ->
            try
                CheckResult = gen_server:call(Worker, {pool_return_connection_check, self()}),
                case CheckResult of
                    {error, _} ->
                        CheckResult;
                    ok ->
                        z_db_pool:return_connection(Worker, Context)
                end
            catch
                exit:ExitReason:Stacktrace ->
                    z_context:logger_md(Context),
                    ?LOG_ERROR(#{
                        text => <<"Return connection failed.">>,
                        in => zotonic_core,
                        result => exit,
                        reason => ExitReason,
                        stack => Stacktrace,
                        worker_pid => Worker
                    }),
                    {error, ExitReason}
            end
    end.

%% @doc Run a simple query (no parameters) on the connection of the worker.
%%      The worker interrupts the query if the connection is not returned
%%      within Timeout msec.
-spec squery( pid(), string() | binary(), pos_integer() ) -> query_result().
squery(Worker, Sql, Timeout) ->
    case is_connection_alive(Worker) of
        false ->
            {error, connection_down};
        true ->
            case fetch_conn(Worker, Sql, [], Timeout) of
                {error, _} = Error ->
                    Error;
                {ok, {Conn, Ref}} ->
                    Reply = epgsql:squery(Conn, Sql),
                    % The connection must be handed back before decoding the reply.
                    ok = return_conn(Worker, Ref),
                    decode_reply(Reply)
            end
    end.

%% @doc Run a query with parameters on the connection of the worker. The
%%      parameters are encoded with encode_values/1. The worker interrupts
%%      the query if the connection is not returned within Timeout msec.
-spec equery( pid(), string() | binary(), list(), pos_integer() ) -> query_result().
equery(Worker, Sql, Parameters, Timeout) ->
    case is_connection_alive(Worker) of
        false ->
            {error, connection_down};
        true ->
            case fetch_conn(Worker, Sql, Parameters, Timeout) of
                {error, _} = Error ->
                    Error;
                {ok, {Conn, Ref}} ->
                    Reply = epgsql:equery(Conn, Sql, encode_values(Parameters)),
                    % The connection must be handed back before decoding the reply.
                    ok = return_conn(Worker, Ref),
                    decode_reply(Reply)
            end
    end.

%% @doc Request the SQL connection from the worker. The query is passed for
%%      logging purposes. The caller runs the query itself on the returned
%%      connection and must hand it back with return_conn/2, using the
%%      returned reference.
%%
%%      The worker can answer with an error tuple (for example {error, busy}
%%      or a connect error); that error is returned as-is instead of crashing
%%      the caller with a badmatch. An exit of the call (worker down, call
%%      timeout) is logged and returned as {error, Reason}.
-spec fetch_conn( pid(), string() | binary(), list(), pos_integer() ) -> {ok, {pid(), reference()}} | {error, term()}.
fetch_conn(Worker, Sql, Parameters, Timeout) ->
    case is_connection_alive(Worker) of
        true ->
            Ref = erlang:make_ref(),
            Request = {fetch_conn, Ref, self(), Sql, Parameters, Timeout, is_tracing()},
            try gen_server:call(Worker, Request) of
                {ok, Conn} ->
                    {ok, {Conn, Ref}};
                {error, _} = Error ->
                    Error
            catch
                exit:Reason:Stack ->
                    ?LOG_ERROR(#{
                        text => <<"Fetch connection failed.">>,
                        in => zotonic_core,
                        result => exit,
                        reason => Reason,
                        stack => Stack,
                        worker_pid => Worker,
                        sql => Sql
                    }),
                    {error, Reason}
            end;
        false ->
            {error, connection_down}
    end.

%% @doc Hand the SQL connection back to the worker. This must happen within
%%      the timeout given to fetch_conn/4, and with the reference returned by
%%      that call.
-spec return_conn(pid(), reference()) -> ok | {error, term()}.
return_conn(Worker, Ref) ->
    case is_connection_alive(Worker) of
        false ->
            {error, connection_down};
        true ->
            ReturnRequest = {return_conn, Ref, self()},
            gen_server:call(Worker, ReturnRequest)
    end.


%% @doc Check the process dictionary of the calling process for the database
%%      tracing flag. Only the exact value 'true' enables tracing.
-spec is_tracing() -> boolean().
is_tracing() ->
    erlang:get(is_dbtrace) =:= true.

%% @doc This function MUST NOT be used, but currently is required by the
%% install / upgrade routines. Can only be called from inside a
%% z_db:transaction/2.
%% Asks the worker in the context for its connection; the worker connects
%% first if needed and replies with an error tuple if that fails.
get_raw_connection(#context{dbc=Worker}) when Worker =/= undefined ->
    gen_server:call(Worker, get_raw_connection).


%%
%% gen_server callbacks
%%

%% @doc Initialize the worker. No database connection is made here, the
%%      worker connects when the connection is first requested.
init(Args) ->
    process_flag(trap_exit, true),
    Disconnected = #state{ conn = undefined, conn_args = Args },
    {ok, Disconnected, ?IDLE_TIMEOUT}.

%% @doc Synchronous requests to the worker:
%%      - {pool_return_connection_check, CallerPid}: may the worker go back to the pool?
%%      - {fetch_conn, Ref, CallerPid, Sql, Params, Timeout, IsTracing}: hand out the connection
%%      - {return_conn, Ref, Pid}: the caller is done with the connection
%%      - get_raw_connection: return the connection without busy bookkeeping
%%      Most replies carry a gen_server timeout: the remaining query time when
%%      busy, the idle timeout otherwise (see timeout/1).

% Worker is idle, ok to return it to the pool.
handle_call({pool_return_connection_check, _CallerPid}, _From, #state{ busy_pid = undefined } = State) ->
    {reply, ok, State};
% Worker is returned to the pool whilst a query is running. Reply with an
% error, close the connection and stop this worker.
handle_call({pool_return_connection_check, CallerPid}, From, #state{
            busy_pid = Pid,
            busy_sql = Sql,
            busy_params = Params
        } = State) ->
    ?LOG_ERROR(#{
        text => <<"Connection return to pool by but still running">>,
        in => zotonic_core,
        result => error,
        reason => running,
        request_pid => CallerPid,
        busy_pid => Pid,
        query => Sql,
        args => Params,
        worker_pid => self()
    }),
    gen_server:reply(From, {error, checkin_busy}),
    State1 = disconnect(State, checkin_busy),
    {stop, normal, State1};

% Idle and not connected: connect first, then handle the request again with
% the fresh connection. The connection process is monitored.
handle_call({fetch_conn, _Ref, _CallerPid, _Sql, _Params, _Timeout, _IsTracing} = Cmd, From,
            #state{ busy_pid = undefined, conn = undefined, conn_args = Args } = State) ->
    case connect(Args, From) of
        {ok, Conn} ->
            erlang:monitor(process, Conn),
            handle_call(Cmd, From, State#state{conn=Conn});
        {error, _} = E ->
            {reply, E, State}
    end;

% Idle and connected: register the caller as busy, monitor it and hand out
% the connection. The query timeout is used as the gen_server timeout.
handle_call({fetch_conn, Ref, CallerPid, Sql, Params, Timeout, IsTracing}, _From, #state{ busy_pid = undefined } = State) ->
    Start = trace_start(),
    State1 = State#state{
        busy_monitor = erlang:monitor(process, CallerPid),
        busy_pid = CallerPid,
        busy_ref = Ref,
        busy_timeout = Timeout,
        busy_start = Start,
        busy_sql = Sql,
        busy_params = Params,
        busy_tracing = IsTracing
    },
    {reply, {ok, State#state.conn}, State1, Timeout};

handle_call({fetch_conn, _Ref, CallerPid, Sql, Params, _Timeout, _IsTracing}, From, #state{ busy_pid = OtherPid } = State)
    when CallerPid =:= OtherPid ->
    % Caller is confused - starting a request whilst the current request isn't finished yet.
    % Log an error, stop the running query, and kill this worker.
    % No hope of recovery, as the caller is in an illegal state reusing this connection
    % for multiple queries.
    ?LOG_ERROR(#{
        text => <<"Connection requested but already using same connection">>,
        in => zotonic_core,
        result => error,
        reason => busy,
        request_pid => CallerPid,
        query => Sql,
        args => Params,
        worker_pid => self()
    }),
    gen_server:reply(From, {error, busy}),
    State1 = disconnect(State, busy),
    {stop, normal, State1};

handle_call({fetch_conn, _Ref, CallerPid, Sql, Params, _Timeout, _IsTracing}, _From, #state{ busy_pid = OtherPid } = State) ->
    % This can happen if a connection is shared by two processes.
    % Deny the request and continue with the running request.
    ?LOG_ERROR(#{
        text => <<"Connection requested but in use by other pid">>,
        in => zotonic_core,
        result => error,
        reason => busy,
        request_pid => CallerPid,
        busy_pid => OtherPid,
        query => Sql,
        args => Params,
        worker_pid => self()
    }),
    {reply, {error, busy}, State, timeout(State)};

% The busy process returns the connection: both the reference and the pid
% must match the ones registered by fetch_conn. Drop the monitor on the
% caller, finish the trace and go back to idle.
handle_call({return_conn, Ref, Pid}, _From,
        #state{
            busy_monitor = Monitor,
            busy_ref = Ref,
            busy_pid = Pid,
            busy_sql = Sql,
            busy_params = Params,
            busy_start = Start,
            busy_tracing = IsTracing,
            conn = Conn
        } = State) ->
    erlang:demonitor(Monitor),
    trace_end(IsTracing, Start, Sql, Params, Conn),
    State1 = reset_busy_state(State),
    {reply, ok, State1, timeout(State1)};

% Connection returned whilst the worker is idle.
handle_call({return_conn, _Ref, Pid}, _From, #state{ busy_pid = undefined } = State) ->
    ?LOG_ERROR(#{
        text => <<"SQL connection returned but not in use.">>,
        in => zotonic_core,
        request_pid => Pid
    }),
    {reply, {error, idle}, State, timeout(State)};

% Connection returned with a pid or reference that does not match the
% registered busy request. The running request is left untouched.
handle_call({return_conn, _Ref, Pid}, _From, #state{ busy_pid = OtherPid } = State) ->
    ?LOG_ERROR(#{
        text => <<"SQL connection returned but in use by other pid">>,
        in => zotonic_core,
        result => error,
        reason => busy,
        request_pid => Pid,
        busy_pid => OtherPid,
        worker_pid => self()
    }),
    {reply, {error, notyours}, State, timeout(State)};

% Raw connection requested whilst not connected: connect first, then handle
% the request again.
handle_call(get_raw_connection, From, #state{ conn = undefined, conn_args = Args } = State) ->
    case connect(Args, From) of
        {ok, Conn} ->
            erlang:monitor(process, Conn),
            handle_call(get_raw_connection, From, State#state{conn=Conn});
        {error, _} = E ->
            {reply, E, State}
    end;
% Note that the reply is the bare connection, not an {ok, Conn} tuple.
handle_call(get_raw_connection, _From, #state{ conn = Conn } = State) ->
    {reply, Conn, State, timeout(State)};

% Unknown request: log it and reply with an error.
handle_call(Message, _From, State) ->
    ?LOG_NOTICE(#{
        text => <<"SQL unexpected call message">>,
        in => zotonic_core,
        request => Message,
        worker_pid => self()
    }),
    {reply, {error, unknown_call}, State, timeout(State)}.


%% @doc No casts are defined; any cast is ignored.
%% NOTE(review): the timeout is always ?IDLE_TIMEOUT here, not timeout(State),
%% so a cast during a running query replaces the remaining query timeout.
handle_cast(_Msg, State) ->
    {noreply, State, ?IDLE_TIMEOUT}.

%% @doc Handle out-of-band messages:
%%      - disconnect: request to close the database connection
%%      - timeout: idle timeout or query timeout, depending on the busy state
%%      - 'DOWN': from the process using the connection or from the connection itself
%%      - 'EXIT': ignored, monitors are used instead

% Disconnect request whilst not connected, nothing to do.
handle_info(disconnect, #state{ conn = undefined } = State) ->
    {noreply, State};

% Disconnect request whilst idle: close the connection.
handle_info(disconnect, #state{ busy_pid = undefined } = State) ->
    Database = get_arg(dbdatabase, State#state.conn_args),
    Schema = get_arg(dbschema, State#state.conn_args),
    ?LOG_DEBUG(#{
        text => <<"SQL closing connection">>,
        in => zotonic_core,
        database => Database,
        schema => Schema,
        worker_pid => self()
    }),
    {noreply, disconnect(State, disconnect), hibernate};

% Disconnect request whilst a query is running: close the connection, the
% busy process is handled by disconnect/2.
handle_info(disconnect, State) ->
    Database = get_arg(dbdatabase, State#state.conn_args),
    Schema = get_arg(dbschema, State#state.conn_args),
    ?LOG_ERROR(#{
        text => <<"SQL disconnect whilst busy with query">>,
        in => zotonic_core,
        result => error,
        reason => disconnect,
        database => Database,
        schema => Schema,
        query => State#state.busy_sql,
        args => State#state.busy_params,
        worker_pid => self()
    }),
    % Return the state after the disconnect. A gen_server callback must
    % return {noreply, NewState, hibernate}, any other tuple size stops the
    % worker with a bad_return_value error.
    {noreply, disconnect(State, disconnect), hibernate};

handle_info(timeout, #state{ busy_pid = undefined } = State) ->
    % Idle timeout - no SQL query is running
    {noreply, disconnect(State, idle), hibernate};

handle_info(timeout, #state{
        busy_pid = Pid,
        busy_sql = Sql,
        busy_params = Params,
        busy_timeout = Timeout
    } = State) ->
    % Query timeout - pull the connection from underneath the caller
    % The connection needs to be killed to stop the out-of-bounds query
    % on the db server. This to prevent that long running queries are
    % filling up all our connections and also slowing down the database.
    Database = get_arg(dbdatabase, State#state.conn_args),
    Schema = get_arg(dbschema, State#state.conn_args),
    ?LOG_ERROR(#{
        text => <<"SQL Timeout">>,
        in => zotonic_core,
        result => error,
        reason => timeout,
        busy_pid => Pid,
        timeout => Timeout,
        database => Database,
        schema => Schema,
        query => Sql,
        args => Params,
        worker_pid => self()
    }),
    State1 = disconnect(State, sql_timeout),
    {stop, normal, State1};

handle_info({'DOWN', _Ref, process, BusyPid, Reason}, #state{
        busy_pid = BusyPid,
        busy_sql = Sql,
        busy_params = Params
    } = State) ->
    % The process using our connection is down.
    % As it might have been in a transaction, we just kill
    % the connection and let the database clean up.
    Database = get_arg(dbdatabase, State#state.conn_args),
    Schema = get_arg(dbschema, State#state.conn_args),
    ?LOG_NOTICE(#{
        text => <<"SQL caller down during query">>,
        in => zotonic_core,
        result => error,
        reason => Reason,
        busy_pid => BusyPid,
        database => Database,
        schema => Schema,
        query => Sql,
        args => Params,
        worker_pid => self()
    }),
    {noreply, disconnect(State, sql_timeout), hibernate};

handle_info({'DOWN', _Ref, process, ConnPid, Reason}, #state{
        conn = ConnPid,
        busy_pid = BusyPid,
        busy_sql = Sql,
        busy_params = Params
    } = State) when is_pid(BusyPid) ->
    % Unexpected DOWN from the connection during query
    Database = get_arg(dbdatabase, State#state.conn_args),
    Schema = get_arg(dbschema, State#state.conn_args),
    ?LOG_ERROR(#{
        text => <<"SQL connection drop during query">>,
        in => zotonic_core,
        result => error,
        reason => Reason,
        busy_pid => BusyPid,
        database => Database,
        schema => Schema,
        query => Sql,
        args => Params,
        worker_pid => self()
    }),
    % The connection is gone already, only the busy process is left to handle.
    State1 = State#state{ conn = undefined },
    {noreply, disconnect(State1, sql_conn_down), hibernate};

handle_info({'DOWN', _Ref, process, Pid, _Reason}, #state{ conn = Pid } = State) ->
    % Connection down, no processes running, ok to hibernate
    State1 = State#state{ conn = undefined },
    {noreply, disconnect(State1, sql_conn_down), hibernate};

handle_info({'DOWN', _Ref, process, _Pid, _Reason}, #state{ busy_pid = undefined } = State) ->
    % Might be a late down message from the busy pid, ignore.
    {noreply, State, timeout(State)};

handle_info({'DOWN', _Ref, process, Pid, Reason}, State) ->
    % Stray 'DOWN' message, might be a race condition.
    ?LOG_NOTICE(#{
        text => <<"SQL got 'DOWN' message from unknown process">>,
        in => zotonic_core,
        down_pid => Pid,
        down_reason => Reason,
        state => State,
        worker_pid => self()
    }),
    {noreply, State, timeout(State)};

handle_info({'EXIT', _Pid, _Reason}, State) ->
    % Ignore - we have monitors for the connection and the request caller.
    {noreply, State};

handle_info(Info, State) ->
    ?LOG_WARNING(#{
        text => <<"SQL unexpected info message">>,
        in => zotonic_core,
        message => Info,
        state => State,
        worker_pid => self()
    }),
    {noreply, State, timeout(State)}.

%% @doc Close the database connection (and handle a still busy process)
%%      when the worker terminates.
terminate(_Reason, #state{} = State) ->
    _ = disconnect(State, sql_conn_terminate),
    ok.

%% @doc Code upgrade callback, the state is kept unchanged.
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.


%%
%% Helper functions
%%

%% @doc Close the connection to the SQL server and clear the busy state.
%%      Reason is the exit reason sent to the busy process if that process
%%      needs to be killed. Returns the state with conn set to undefined.
disconnect(#state{ conn = undefined } = State, Reason) ->
    % Not connected, only the busy process (if any) is left to handle.
    kill_busy(State, Reason);
disconnect(#state{ conn = Conn } = State, Reason) ->
    ok = epgsql:close(Conn),
    % Wait at most 500 msec for the 'DOWN' of the (monitored) connection
    % process. This receive also removes that message from the mailbox, so
    % it is not seen by handle_info/2.
    State1 = receive
        {'DOWN', _Ref, process, Conn, _Reason} ->
            % The SQL connection sent the error to the busy pid
            reset_busy_state(State)
        after 500 ->
            % Assume busy pid did not receive the error, kill it
            kill_busy(State, Reason)
    end,
    State1#state{ conn = undefined }.

%% @doc Send an exit signal with the given reason to the process using the
%%      connection (if any), finish its trace and clear the busy state.
kill_busy(#state{ busy_pid = BusyPid } = State, Reason) when is_pid(BusyPid) ->
    % Drop our monitor first, we are not interested in the 'DOWN' caused by
    % our own exit signal.
    erlang:demonitor(State#state.busy_monitor),
    erlang:exit(BusyPid, Reason),
    trace_end(
        State#state.busy_tracing,
        State#state.busy_start,
        State#state.busy_sql,
        State#state.busy_params,
        State#state.conn),
    reset_busy_state(State);
kill_busy(State, _Reason) ->
    % No busy process registered.
    reset_busy_state(State).


%% @doc Reset all busy_* fields to their idle values. The connection and the
%%      connection arguments are left untouched. The tracing flag is reset as
%%      well, so no stale flag of a previous request is kept in the state.
reset_busy_state(State) ->
    State#state{
        busy_monitor = undefined,
        busy_pid = undefined,
        busy_ref = undefined,
        busy_timeout = undefined,
        busy_start = undefined,
        busy_sql = undefined,
        busy_params = [],
        busy_tracing = false
    }.

%% @doc Timeout to return from the gen_server callbacks: the idle timeout
%%      when no query is running, otherwise what is left of the query
%%      timeout (at least 1 msec).
timeout(#state{ busy_timeout = undefined }) ->
    ?IDLE_TIMEOUT;
timeout(#state{ busy_timeout = QueryTimeout, busy_start = StartedAt }) ->
    Elapsed = msec() - StartedAt,
    Remaining = QueryTimeout - Elapsed,
    erlang:max(1, Remaining).

%% @doc Check if a TCP connection can be made to the database host and port.
%%      The socket is closed directly after a successful connect.
try_connect_tcp(Args) ->
    Host = get_arg(dbhost, Args),
    PortNr = get_arg(dbport, Args),
    Options = [{active, false}, {packet, raw}, binary],
    ConnectResult = gen_tcp:connect(Host, PortNr, Options, ?CONNECT_TIMEOUT),
    case ConnectResult of
        {error, _} ->
            ConnectResult;
        {ok, Socket} ->
            gen_tcp:close(Socket),
            ok
    end.

%% @doc Connect to the database, without a process waiting for the result.
connect(Args) when is_list(Args) ->
    connect(Args, 0, undefined).

%% @doc Connect to the database on behalf of the gen_server caller. The
%%      caller is monitored during the connect, the retries stop with
%%      {error, caller_down} when it goes down.
connect(Args, {CallerPid, _Tag}) when is_list(Args) ->
    CallerMonitor = monitor(process, CallerPid),
    ConnectResult = connect(Args, 0, CallerMonitor),
    demonitor(CallerMonitor),
    ConnectResult.

%% @doc Single connect attempt, number RetryCt. Gives up after
%%      ?CONNECT_RETRIES attempts.
connect(_Args, RetryCt, _MRef) when RetryCt >= ?CONNECT_RETRIES ->
    {error, econnrefused};
connect(Args, RetryCt, undefined) ->
    % Nobody to check on, try to connect.
    connect_1(Args, RetryCt, undefined);
connect(Args, RetryCt, MRef) ->
    % Non-blocking check if the caller went down since the previous attempt.
    receive
        {'DOWN', MRef, process, _DownPid, _DownReason} ->
            {error, caller_down}
    after 0 ->
        connect_1(Args, RetryCt, MRef)
    end.

% Suppress warning about epgsql_connect not returning {error, econnrefused}
% It is returning it, but the type spec in epgsql is wrong.
-dialyzer({nowarn_function, connect_1/3}).

%% Connect to the database and select the schema. Errors that are likely to
%% be temporary (server overloaded, restarting or not reachable) and any
%% exception are retried via retry/4; all other errors are logged and
%% returned.
connect_1(Args, RetryCt, MRef) ->
    Hostname = get_arg(dbhost, Args),
    Port = get_arg(dbport, Args),
    Database = get_arg(dbdatabase, Args),
    Username = get_arg(dbuser, Args),
    Password = get_arg(dbpassword, Args),
    Schema = get_arg(dbschema, Args),
    ConnectOptions = [{database, Database}, {port, Port}],
    try
        case epgsql:connect(Hostname, Username, Password, ConnectOptions) of
            {ok, Conn} ->
                set_schema(Conn, Schema);
            {error, #error{ codename = Codename }}
                when Codename =:= too_many_connections;
                     Codename =:= out_of_memory;
                     Codename =:= admin_shutdown;
                     Codename =:= crash_shutdown;
                     Codename =:= cannot_connect_now ->
                retry(Args, Codename, RetryCt, MRef);
            {error, econnrefused} ->
                retry(Args, econnrefused, RetryCt, MRef);
            {error, Reason} = Error ->
                ?LOG_WARNING(#{
                    text => <<"Database psql connect returned error">>,
                    in => zotonic_core,
                    database => Database,
                    schema => Schema,
                    username => Username,
                    hostname => Hostname,
                    port => Port,
                    result => error,
                    reason => Reason
                }),
                Error
        end
    catch
        Class:Exception ->
            retry(Args, {Class, Exception}, RetryCt, MRef)
    end.

%% @doc Set the time zone of the connection to UTC and the search path to
%%      the schema. The connection is closed if either statement does not
%%      return the expected result.
set_schema(Conn, Schema) ->
    Sql = "SET TIME ZONE 'UTC'; SET search_path TO \"" ++ Schema ++ "\"",
    case epgsql:squery(Conn, Sql) of
        [{ok, [], []}, {ok, [], []}] ->
            {ok, Conn};
        Unexpected ->
            catch epgsql:close(Conn),
            {error, Unexpected}
    end.

%% @doc Log the failed connect, sleep for a delay that depends on the reason
%%      and the number of attempts, and try to connect again. For some
%%      reasons the connections in the pool are closed before sleeping.
retry(Args, Reason, RetryCt, MRef) ->
    Delay = retry_delay(Reason, RetryCt),
    ?LOG_WARNING(#{
        text => <<"Database psql connect failed">>,
        in => zotonic_core,
        hostname => get_arg(dbhost, Args),
        port => get_arg(dbport, Args),
        result => error,
        reason => Reason,
        retry_msec => Delay,
        worker_pid => self()
    }),
    maybe_close_connections(Reason),
    timer:sleep(Delay),
    NextRetryCt = RetryCt + 1,
    connect(Args, NextRetryCt, MRef).

%% Ask the pool to close connections when the database server ran out of
%% memory or connection slots; nothing is done for other reasons.
maybe_close_connections(Reason)
    when Reason =:= out_of_memory;
         Reason =:= too_many_connections ->
    z_db_pool:close_connections();
maybe_close_connections(_Reason) ->
    nop.

%% Delay before the next connect attempt: short for the first two retries,
%% after that medium for too_many_connections and long for anything else.
retry_delay(Reason, RetryCount) ->
    if
        RetryCount < 2 ->
            ?CONNECT_RETRY_SHORT;
        Reason =:= too_many_connections ->
            ?CONNECT_RETRY_MIDDLE;
        true ->
            ?CONNECT_RETRY_SLEEP
    end.


%% Fetch a connection argument, falling back to the configuration (see
%% maybe_default/2) when the argument is missing or empty.
get_arg(Key, Args) ->
    Value = proplists:get_value(Key, Args),
    maybe_default(Key, Value).

%% Use the value from z_config when the argument is not set: undefined, an
%% empty string, an empty binary, or 0 for the port.
maybe_default(dbport, 0) ->
    z_config:get(dbport);
maybe_default(Key, Value)
    when Value =:= undefined;
         Value =:= "";
         Value =:= <<>> ->
    z_config:get(Key);
maybe_default(_Key, Value) ->
    Value.

%%
%% Request tracing
%%

%% Start time of a request in msec; the value is used by trace_end/5 and by
%% timeout/1 to calculate the elapsed time.
trace_start() ->
    msec().

%% Finish the trace of a request. When tracing is enabled the query is
%% logged with its duration, and passed on to maybe_explain/4.
trace_end(true, Start, Sql, Params, Conn) ->
    Elapsed = msec() - Start,
    ?LOG_NOTICE(#{
        text => <<"SQL TRACE">>,
        in => zotonic_core,
        msec => Elapsed,
        sql => Sql,
        params => Params
    }),
    maybe_explain(Elapsed, Sql, Params, Conn);
trace_end(false, _Start, _Sql, _Params, _Conn) ->
    ok.

%% Log the query plan of a traced query that took ?DBTRACE_EXPLAIN_MSEC or
%% longer.
%%
%% This is also reached from kill_busy/2, after the connection was closed or
%% dropped. The explain is skipped when the connection is not a live process,
%% as querying a dead (or undefined) connection would crash the worker.
maybe_explain(Duration, _Sql, _Params, _Conn) when Duration < ?DBTRACE_EXPLAIN_MSEC ->
    ok;
maybe_explain(_Duration, Sql, Params, Conn) when is_pid(Conn) ->
    case erlang:is_process_alive(Conn)
        andalso is_explainable(z_string:to_lower(Sql))
    of
        true ->
            % Sql is a string or a binary, build the query as iodata.
            ExplainSql = ["explain ", Sql],
            R = epgsql:equery(Conn, ExplainSql, encode_values(Params)),
            maybe_log_query_plan(R);
        false ->
            ok
    end;
maybe_explain(_Duration, _Sql, _Params, _Conn) ->
    % No connection available (anymore).
    ok.

%% Check if an "explain" can be done for the (lowercased, binary) query.
%% Transaction control, explain itself and DDL statements are excluded by
%% their prefix. Anything else, including non-binary input, is explainable.
is_explainable(Sql) when is_binary(Sql) ->
    ExcludedPrefixes = [
        <<"begin">>,
        <<"commit">>,
        <<"rollback">>,
        <<"explain ">>,
        <<"alter ">>,
        <<"drop ">>,
        <<"create ">>
    ],
    HasPrefix = fun(Prefix) ->
        binary:longest_common_prefix([Prefix, Sql]) =:= byte_size(Prefix)
    end,
    not lists:any(HasPrefix, ExcludedPrefixes);
is_explainable(_) ->
    true.

%% Log the result of an "explain" query. A regular query plan (a single
%% "QUERY PLAN" column) is logged as one binary, every plan row on a new
%% line; any other result is logged as-is.
maybe_log_query_plan({ok, [ #column{ name = <<"QUERY PLAN">> } ], Rows}) ->
    PlanLines = lists:map(fun({PlanRow}) -> [ $\n, PlanRow ] end, Rows),
    Plan = iolist_to_binary(PlanLines),
    ?LOG_NOTICE(#{
        text => <<"SQL EXPLAIN">>,
        in => zotonic_core,
        explain => Plan
    });
maybe_log_query_plan(Unexpected) ->
    ?LOG_NOTICE(#{
        text => <<"SQL EXPLAIN">>,
        in => zotonic_core,
        explain => Unexpected
    }),
    ok.

%% Current OS system time in milliseconds.
msec() ->
    os:system_time(millisecond).

%%
%% These are conversion routines between how z_db expects values and how epgsql expects them.

%% Notable differences:
%% - Input values {term, ...} (use the ?DB_PROPS(...) macro!) are term_to_binary encoded and decoded
%% - null <-> undefined
%% - date/datetimes have a floating-point second argument in epgsql, in Zotonic they don't.

%% Encode a list of query parameters for epgsql, see encode_value/1.
encode_values(Params) when is_list(Params) ->
    [ encode_value(Param) || Param <- Params ].

%% Encode a single query parameter for epgsql:
%% - undefined (bare or wrapped in term / term_json) becomes null
%% - {term, T} becomes the magic number followed by term_to_binary(T)
%% - {term_json, T} is encoded with jsxrecord
%% - all other values are passed unchanged
encode_value(undefined) ->
    null;
encode_value({Tag, undefined}) when Tag =:= term; Tag =:= term_json ->
    null;
encode_value({term, Term}) ->
    <<?TERM_MAGIC_NUMBER, (term_to_binary(Term))/binary>>;
encode_value({term_json, Term}) ->
    jsxrecord:encode(Term);
encode_value(Other) ->
    Other.


%% Decode the rows of a query reply, with or without a row count. Replies
%% without rows (errors, plain counts) are returned unchanged.
decode_reply({ok, Columns, Rows}) ->
    DecodedRows = [ decode_values(Row) || Row <- Rows ],
    {ok, Columns, DecodedRows};
decode_reply({ok, Count, Columns, Rows}) ->
    DecodedRows = [ decode_values(Row) || Row <- Rows ],
    {ok, Count, Columns, DecodedRows};
decode_reply(Reply) ->
    Reply.

%% Decode all values of a row. A row is a tuple or a list, the result has
%% the same shape as the input.
decode_values(Row) when is_tuple(Row) ->
    Decoded = [ decode_value(Value) || Value <- tuple_to_list(Row) ],
    list_to_tuple(Decoded);
decode_values(Row) when is_list(Row) ->
    [ decode_value(Value) || Value <- Row ].

%% @doc Decode a single value as returned by epgsql:
%%      - null becomes undefined
%%      - binaries starting with the term magic number are decoded with binary_to_term/1
%%      - float seconds in times and datetimes are truncated to integers
%%      - a value wrapped in a 1-tuple is decoded inside that tuple
%%      - all other values are returned unchanged
decode_value(null) ->
    undefined;
decode_value({Wrapped}) ->
    {decode_value(Wrapped)};
decode_value({Hour, Minute, Second}) when is_float(Second) ->
    {Hour, Minute, trunc(Second)};
decode_value({{Year, Month, Day}, {Hour, Minute, Second}}) when is_float(Second) ->
    {{Year, Month, Day}, {Hour, Minute, trunc(Second)}};
decode_value(<<?TERM_MAGIC_NUMBER, TermBin/binary>>) ->
    % NOTE(review): binary_to_term/1 without the 'safe' option, the stored
    % data must be trusted.
    binary_to_term(TermBin);
decode_value(Value) ->
    Value.