src/db/z_db.erl

%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2009-2021 Marc Worrell
%% @doc Interface to database, uses database definition from Context

%% Copyright 2009-2021 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.

-module(z_db).
-author("Marc Worrell <marc@worrell.nl>").
-author("Arjan Scherpenisse <arjan@scherpenisse.net>").

-define(TIMEOUT, 30000).

%% interface functions
-export([

    has_connection/1,
    database_version_string/1,
    database_version/1,

    transaction/2,
    transaction/3,
    transaction_clear/1,

    assoc_row/2,
    assoc_row/3,
    assoc_props_row/2,
    assoc_props_row/3,
    assoc/2,
    assoc/3,
    assoc/4,
    assoc_props/2,
    assoc_props/3,
    assoc_props/4,

    qmap_row/2,
    qmap_row/3,
    qmap_row/4,
    qmap_props_row/2,
    qmap_props_row/3,
    qmap_props_row/4,
    qmap/2,
    qmap/3,
    qmap/4,
    qmap_props/2,
    qmap_props/3,
    qmap_props/4,

    q/2,
    q/3,
    q/4,
    q1/2,
    q1/3,
    q1/4,
    q_row/2,
    q_row/3,

    squery/2,
    squery/3,
    equery/2,
    equery/3,
    equery/4,

    insert/2,
    insert/3,
    update/4,
    delete/3,
    select/3,

    column/3,
    columns/2,
    columns/3,
    column_names/2,
    column_names_bin/2,
    column_exists/3,
    to_column_value/4,

    estimate_rows/3,

    get_current_props/3,
    update_sequence/3,
    prepare_database/1,
    constraint_exists/3,
    key_exists/3,
    table_exists/2,
    create_table/3,
    drop_table/2,
    flush/1,

    assert_table_name/1,
    quoted_table_name/1,
    prepare_cols/2,

    ensure_database/2,
    ensure_schema/2,
    schema_exists_conn/2,
    drop_schema/1
]).

-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.

-type database_server() :: postgresql.

-type sql() :: string() | iodata().
-type query_error() :: nodb | enoent | epgsql:query_error() | term().
-type query_timeout() :: integer().

-type transaction_fun() :: fun((z:context()) -> term()).
-type table_name() :: atom() | string() | binary().
-type column_name() :: atom() | string() | binary().
-type schema_name() :: default | atom() | string() | binary().
-type parameters() :: list( parameter() ).
-type parameter() :: tuple()
                   | calendar:datetime()
                   | atom()
                   | string()
                   | binary()
                   | integer()
                   | boolean()
                   | float()
                   | list().

-type qmap_options() :: list( qmap_option() ).
-type qmap_option() :: {keys, binary|atom}
                     | {timeout, non_neg_integer()}.

-type props() :: proplists:proplist() | props_map().
-type props_map() :: #{ prop_key() => term() }.
-type prop_key() :: binary() | atom().

-type id() :: pos_integer().

-type query_result() :: {ok, Columns :: list(), Rows :: list()}
                      | {ok, Count :: non_neg_integer(), Columns :: list(), Rows :: list()}
                      | {ok, Count :: non_neg_integer()}
                      | {error, term()}.

-export_type([
    sql/0,
    query_error/0,
    query_timeout/0,
    query_result/0,
    transaction_fun/0,
    parameters/0,
    parameter/0,
    table_name/0,
    column_name/0,
    schema_name/0,
    props/0,
    props_map/0,
    prop_key/0,

    id/0,

    qmap_options/0,
    qmap_option/0
]).


-include_lib("zotonic.hrl").
-include_lib("epgsql/include/epgsql.hrl").

%% @doc Perform a function inside a transaction, do a rollback on exceptions
-spec transaction(transaction_fun(), z:context()) -> any() | {error, term()}.
transaction(Function, Context) ->
    transaction(Function, [], Context).

% @doc Perform a transaction with extra options. Default retry on deadlock
-spec transaction(transaction_fun(), list(), z:context()) -> any() | {error, term()}.
transaction(Function, Options, Context) ->
    z_context:logger_md(Context),
    Result = case transaction1(Function, Context) of
                {rollback, {{error, #error{ codename = deadlock_detected }}, Trace1}} ->
                    {rollback, {deadlock, Trace1}};
                {rollback, {{case_clause, {error, #error{ codename = deadlock_detected }}}, Trace1}} ->
                    {rollback, {deadlock, Trace1}};
                {rollback, {{badmatch, {error, #error{ codename = deadlock_detected }}}, Trace1}} ->
                    {rollback, {deadlock, Trace1}};
                Other ->
                    Other
            end,
    case Result of
        {rollback, {deadlock, Stack}} = DeadlockError ->
            case proplists:get_value(noretry_on_deadlock, Options) of
                true ->
                    ?LOG_ERROR(#{
                        text => <<"DEADLOCK on database transaction, NO RETRY">>,
                        in => zotonic_core,
                        stack => Stack
                    }),
                    DeadlockError;
                _False ->
                    ?LOG_WARNING(#{
                        text => <<"DEADLOCK on database transaction, will retry">>,
                        in => zotonic_core,
                        stack => Stack
                    }),
                    % Sleep random time, then retry transaction
                    timer:sleep(z_ids:number(100)),
                    transaction(Function, Options, Context)
            end;
        R ->
            R
    end.


% @doc Perform the transaction, return error when the transaction function crashed
transaction1(Function, #context{dbc=undefined} = Context) ->
    case has_connection(Context) of
        true ->
            case with_connection(
                fun(C) ->
                    Context1 = Context#context{dbc=C},
                    DbDriver = z_context:db_driver(Context),
                    try
                        case DbDriver:squery(C, "BEGIN", ?TIMEOUT) of
                            {ok, [], []} -> ok;
                            {error, _} = ErrorBegin -> throw(ErrorBegin)
                        end,
                        case Function(Context1) of
                            {rollback, R} ->
                                DbDriver:squery(C, "ROLLBACK", ?TIMEOUT),
                                R;
                            R ->
                                case DbDriver:is_connection_alive(C) of
                                    true ->
                                        case DbDriver:squery(C, "COMMIT", ?TIMEOUT) of
                                            {ok, [], []} -> ok;
                                            {error, _} = ErrorCommit ->
                                                z_notifier:notify_queue_flush(Context),
                                                throw(ErrorCommit)
                                        end,
                                        R;
                                    false ->
                                        {rollback, {error, connection_down}}
                                end
                        end
                    catch
                        E:Why:S ->
                            ?LOG_ERROR(#{
                                text => <<"Error on database transaction">>,
                                in => zotonic_core,
                                result => E,
                                reason => Why,
                                stack => S
                            }),
                            case DbDriver:is_connection_alive(C) of
                                true ->
                                    DbDriver:squery(C, "ROLLBACK", ?TIMEOUT);
                                false ->
                                    ok
                            end,
                            {rollback, {Why, S}}
                    end
                end,
                Context)
            of
                {rollback, _} = Result ->
                    z_notifier:notify_queue_flush(Context),
                    Result;
                Result ->
                    z_notifier:notify_queue(Context),
                    Result
            end;
        false ->
            {rollback, {no_database_connection, []}}
    end;
transaction1(Function, Context) ->
    % Nested transaction, only keep the outermost transaction
    Function(Context).


%% @doc Clear any transaction in the context, useful when starting a thread with this context.
transaction_clear(#context{dbc=undefined} = Context) ->
    Context;
transaction_clear(Context) ->
    Context#context{dbc=undefined}.


%% @doc Check if we have database connection up and runnng
-spec has_connection(z:context() | atom()) -> boolean().
has_connection(Site) when is_atom(Site) ->
    is_pid(erlang:whereis(z_db_pool:db_pool_name(Site)));
has_connection(Context) ->
    is_pid(erlang:whereis(z_context:db_pool(Context))).


%% @doc Return the version of the database. This is the long string describing the
%% database version. Returns the empty string if there is no database.
-spec database_version_string( z:context() ) -> binary().
database_version_string(Context) ->
    case has_connection(Context) of
        true ->
            z_db:q1("select version()", Context);
        false ->
            <<>>
    end.

%% @doc Return the version of the database. Returns {postgres, Major, Minor} for
%% the database being used.
-spec database_version( z:context() ) ->
      {ok, {database_server(), non_neg_integer(), non_neg_integer()}}
    | {error, no_database_connection}.
database_version(Context) ->
    case has_connection(Context) of
        true ->
            Ver = z_db:q1("show server_version", Context),
            {Major, Minor} = case binary:split(Ver, <<".">>, [ global ]) of
                [ A ] -> {binary_to_integer(A), 0};
                [ A, B | _ ] -> {binary_to_integer(A), binary_to_integer(B)}
            end,
            {ok, {postgresql, Major, Minor}};
        false ->
            {error, no_database_connection}
    end.

%% @doc Transaction handler safe function for fetching a db connection
-spec get_connection( z:context() ) -> {ok, pid()} | {error, nodatabase | none | full}.
get_connection(#context{dbc=undefined} = Context) ->
    case has_connection(Context) of
        true ->
            set_dbtrace_flag(Context),
            DbDriver = z_context:db_driver(Context),
            DbDriver:pool_get_connection(Context);
        false ->
            {error, nodatabase}
    end;
get_connection(Context) ->
    {ok, Context#context.dbc}.

set_dbtrace_flag(Context) ->
    case erlang:get(is_dbtrace) of
        true -> ok;
        false -> ok;
        _ ->
            IsTrace = case z_notifier:first({server_storage, secure_lookup, is_dbtrace, undefined}, Context) of
                {ok, true} -> true;
                _ -> false
            end,
            erlang:put(is_dbtrace, IsTrace),
            ok
    end.

%% @doc Transaction handler safe function for releasing a db connection
return_connection(C, Context=#context{dbc=undefined}) ->
    DbDriver = z_context:db_driver(Context),
    DbDriver:pool_return_connection(C, Context);
return_connection(_C, _Context) ->
    ok.

%% @doc Apply function F with a connection as parameter. Make sure the
%% connection is returned after usage.
-spec with_connection(fun(), z:context()) -> any().
with_connection(F, Context) ->
    with_connection(F, get_connection(Context), Context).

with_connection(F, {error, nodatabase}, _Context) ->
    F(none);
with_connection(_F, {error, _} = Error, _Context) ->
    Error;
with_connection(F, {ok, Connection}, Context) when is_pid(Connection) ->
    z_stats:record_event(db, requests, Context),
    try
        z_context:ensure_logger_md(Context),
        {Time, Result} = timer:tc(F, [Connection]),
        z_stats:record_duration(db, request, Time, Context),
        Result
    after
        return_connection(Connection, Context)
end.


%% ----------------------------------------------------------------
%% Query - return proplists
%% ----------------------------------------------------------------

-spec assoc_row(sql(), z:context()) -> proplists:proplist().
assoc_row(Sql, Context) ->
    assoc_row(Sql, [], Context).

-spec assoc_row(sql(), parameters(), z:context()) -> proplists:proplist() | undefined.
assoc_row(Sql, Parameters, Context) ->
    case assoc(Sql, Parameters, Context) of
        [Row|_] -> Row;
        [] -> undefined
    end.

-spec assoc_props_row(sql(), z:context()) -> proplists:proplist() | undefined.
assoc_props_row(Sql, Context) ->
    assoc_props_row(Sql, [], Context).

-spec assoc_props_row(sql(), list(), z:context()) -> proplists:proplist() | undefined.
assoc_props_row(Sql, Parameters, Context) ->
    case assoc_props(Sql, Parameters, Context) of
        [Row|_] -> Row;
        [] -> undefined
    end.

%% @doc Return property lists of the results of a query on the database in the Context
-spec assoc(sql(), z:context()) -> list( proplists:proplist() ).
assoc(Sql, Context) ->
    assoc(Sql, [], Context).

-spec assoc(sql(), list(), z:context()) -> list( proplists:proplist() ).
assoc(Sql, Parameters, #context{} = Context) ->
    assoc(Sql, Parameters, Context, ?TIMEOUT);
assoc(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
    assoc(Sql, [], Context, Timeout).

-spec assoc(sql(), list(), z:context(), integer()) -> list( proplists:proplist() ).
assoc(Sql, Parameters, Context, Timeout) ->
    DbDriver = z_context:db_driver(Context),
    F = fun
       (none) -> [];
       (C) ->
            case assoc1(DbDriver, C, Sql, Parameters, Timeout) of
                {ok, Result} when is_list(Result) ->
                    Result
            end
    end,
    with_connection(F, Context).


-spec assoc_props(sql(), z:context()) -> list( proplists:proplist() ).
assoc_props(Sql, Context) ->
    assoc_props(Sql, [], Context).

-spec assoc_props(sql(), list(), z:context()) -> list( proplists:proplist() ).
assoc_props(Sql, Parameters, #context{} = Context) ->
    assoc_props(Sql, Parameters, Context, ?TIMEOUT);
assoc_props(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
    assoc_props(Sql, [], Context, Timeout).

-spec assoc_props(sql(), list(), z:context(), integer()) -> list( proplists:proplist() ).
assoc_props(Sql, Parameters, Context, Timeout) ->
    DbDriver = z_context:db_driver(Context),
    F = fun
        (none) -> [];
        (C) ->
            case assoc1(DbDriver, C, Sql, Parameters, Timeout) of
                {ok, Result} when is_list(Result) ->
                    merge_props(Result)
            end
    end,
    with_connection(F, Context).


%% ----------------------------------------------------------------
%% Query - return maps and error codes
%% ----------------------------------------------------------------

-spec qmap_row( sql(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_row(Sql, Context) ->
    qmap_row(Sql, [], [], Context).

-spec qmap_row( sql(), parameters(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_row(Sql, Args, Context) ->
    qmap_row(Sql, Args, [], Context).

-spec qmap_row( sql(), parameters(), qmap_options(), z:context()) -> {ok, map()} | {error, query_error()}.
qmap_row(Sql, Args, Options, Context) ->
    case qmap(Sql, Args, Options, Context) of
        {ok, [ M | _ ]} when is_map(M) ->
            {ok, M};
        {ok, []} ->
            {error, enoent};
        Other ->
            Other
    end.

-spec qmap_props_row( sql(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_props_row(Sql, Context) ->
    qmap_props_row(Sql, [], [], Context).

-spec qmap_props_row( sql(), parameters(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_props_row(Sql, Args, Context) ->
    qmap_props_row(Sql, Args, [], Context).

-spec qmap_props_row( sql(), parameters(), qmap_options(), z:context()) -> {ok, map()} | {error, query_error()}.
qmap_props_row(Sql, Args, Options, Context) ->
    case qmap_props(Sql, Args, Options, Context) of
        {ok, [ M | _ ]} when is_map(M) ->
            {ok, M};
        {ok, []} ->
            {error, enoent};
        Other ->
            Other
    end.

-spec qmap( sql(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap(Sql, Context) ->
    qmap(Sql, [], [], Context).

-spec qmap( sql(), parameters(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap(Sql, Args, Context) ->
    qmap(Sql, Args, [], Context).

-spec qmap( sql(), parameters(), qmap_options(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap(Sql, Args, Options, Context) ->
    DbDriver = z_context:db_driver(Context),
    F = fun
        (none) ->
            {error, nodb};
        (C) ->
            Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
            Keys = proplists:get_value(keys, Options, binary),
            case DbDriver:equery(C, Sql, Args, Timeout) of
                {ok, _Affected, Cols, Rows} when is_list(Rows) ->
                    {ok, cols_map(Cols, Rows, false, Keys)};
                {ok, Cols, Rows} when is_list(Rows) ->
                    {ok, cols_map(Cols, Rows, false, Keys)};
                {ok, Value} when is_list(Value); is_integer(Value) ->
                    {ok, Value};
                {error, Reason} = Error ->
                    ?LOG_ERROR(#{
                        text => <<"z_db error in query">>,
                        in => zotonic_core,
                        result => error,
                        reason => Reason,
                        query => Sql,
                        args => Args
                    }),
                    Error
            end

    end,
    with_connection(F, Context).

-spec qmap_props( sql(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap_props(Sql, Context) ->
    qmap_props(Sql, [], [], Context).

-spec qmap_props( sql(), parameters(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap_props(Sql, Args, Context) ->
    qmap_props(Sql, Args, [], Context).

-spec qmap_props( sql(), parameters(), list(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap_props(Sql, Args, Options, Context) ->
    DbDriver = z_context:db_driver(Context),
    F = fun
        (none) ->
            {error, nodb};
        (C) ->
            Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
            Keys = proplists:get_value(keys, Options, binary),
            case DbDriver:equery(C, Sql, Args, Timeout) of
                {ok, _Affected, Cols, Rows} when is_list(Rows) ->
                    {ok, cols_map(Cols, Rows, true, Keys)};
                {ok, Cols, Rows} when is_list(Rows) ->
                    {ok, cols_map(Cols, Rows, true, Keys)};
                {ok, Value} when is_list(Value); is_integer(Value) ->
                    {ok, Value};
                {error, Reason} = Error ->
                    ?LOG_ERROR(#{
                        text => <<"z_db error in query">>,
                        in => zotonic_core,
                        result => error,
                        reason => Reason,
                        query => Sql,
                        args => Args
                    }),
                    Error
            end

    end,
    with_connection(F, Context).


%% @doc Make associative maps from all the rows in the result set.
cols_map(Cols, Rows, IsMergeProps, Keys) ->
    ColNames = [ Name || #column{ name = Name } <- Cols ],
    ColNames1 = case Keys of
        atom -> [ binary_to_atom(K, utf8) || K <- ColNames ];
        binary -> ColNames
    end,
    ColIndices = lists:zip( lists:seq(1, length(ColNames1)), ColNames1 ),
    lists:map(
        fun(Row) ->
            lists:foldl(
                fun
                    ({Nr, Col}, Acc) when IsMergeProps and (Col =:= props_json orelse Col =:= <<"props_json">>) ->
                        JSON = erlang:element(Nr, Row),
                        case is_binary(JSON) of
                            true ->
                                map_merge_props(jsxrecord:decode(JSON), Acc);
                            false ->
                                Acc
                        end;

                    ({Nr, Col}, Acc) when IsMergeProps and (Col =:= props orelse Col =:= <<"props">>) ->
                        Props = erlang:element(Nr, Row),
                        map_merge_props(Props, Acc);

                    ({Nr, Col}, Acc) ->
                        Acc#{ Col => erlang:element(Nr, Row) }
                end,
                #{},
                ColIndices)
        end,
        Rows).

map_merge_props(M, Acc) when is_map(M) ->
    maps:merge(M, Acc);
map_merge_props(Props, Acc) when is_list(Props) ->
    maps:merge(z_props:from_props(Props), Acc);
map_merge_props(_, Acc) ->
    Acc.


%% ----------------------------------------------------------------
%% Simple queries - return tuples or number of rows updated.
%% ----------------------------------------------------------------

q(Sql, Context) ->
    q(Sql, [], Context, ?TIMEOUT).

q(Sql, Parameters, #context{} = Context) ->
    q(Sql, Parameters, Context, ?TIMEOUT);
q(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
    q(Sql, [], Context, Timeout).

q(Sql, Parameters, Context, Timeout) ->
    F = fun
        (none) -> [];
        (C) ->
            DbDriver = z_context:db_driver(Context),
            case DbDriver:equery(C, Sql, Parameters, Timeout) of
                {ok, _Affected, _Cols, Rows} when is_list(Rows) -> Rows;
                {ok, _Cols, Rows} when is_list(Rows) -> Rows;
                {ok, Value} when is_list(Value); is_integer(Value) -> Value;
                {error, Reason} = Error ->
                    ?LOG_ERROR(#{
                        text => <<"z_db error in query">>,
                        in => zotonic_core,
                        result => error,
                        reason => Reason,
                        query => Sql,
                        args => Parameters
                    }),
                    throw(Error)
            end
    end,
    with_connection(F, Context).

q1(Sql, Context) ->
    q1(Sql, [], Context).

q1(Sql, Parameters, #context{} = Context) ->
    q1(Sql, Parameters, Context, ?TIMEOUT);
q1(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
    q1(Sql, [], Context, Timeout).

-spec q1(sql(), parameters(), #context{}, pos_integer()) -> term() | undefined.
q1(Sql, Parameters, Context, Timeout) ->
    F = fun
        (none) -> undefined;
        (C) ->
            DbDriver = z_context:db_driver(Context),
            case equery1(DbDriver, C, Sql, Parameters, Timeout) of
                {ok, Value} -> Value;
                {error, noresult} -> undefined;
                {error, Reason} = Error ->
                    ?LOG_ERROR(#{
                        text => <<"z_db error in query">>,
                        in => zotonic_core,
                        result => error,
                        reason => Reason,
                        query => Sql,
                        args => Parameters
                    }),
                    throw(Error)
            end
    end,
    with_connection(F, Context).


q_row(Sql, Context) ->
    q_row(Sql, [], Context).

q_row(Sql, Args, Context) ->
    case q(Sql, Args, Context) of
        [Row|_] -> Row;
        [] -> undefined
    end.


squery(Sql, Context) ->
    squery(Sql, Context, ?TIMEOUT).

squery(Sql, Context, Timeout) when is_integer(Timeout) ->
    F = fun(C) when C =:= none -> {error, noresult};
           (C) ->
                DbDriver = z_context:db_driver(Context),
                DbDriver:squery(C, Sql, Timeout)
        end,
    with_connection(F, Context).


equery(Sql, Context) ->
    equery(Sql, [], Context).

equery(Sql, Parameters, #context{} = Context) ->
    equery(Sql, Parameters, Context, ?TIMEOUT);
equery(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
    equery(Sql, [], Context, Timeout).

-spec equery(sql(), parameters(), z:context(), integer()) -> query_result().
equery(Sql, Parameters, Context, Timeout) ->
    F = fun(C) when C =:= none -> {error, noresult};
           (C) ->
                DbDriver = z_context:db_driver(Context),
                DbDriver:equery(C, Sql, Parameters, Timeout)
        end,
    with_connection(F, Context).


%% @doc Insert a new row in a table, use only default values.
-spec insert(table_name(), z:context()) -> {ok, pos_integer()|undefined} | {error, term()}.
insert(Table, Context) ->
    {_Schema, _Tab, QTab} = quoted_table_name(Table),
    with_connection(
        fun(C) ->
            DbDriver = z_context:db_driver(Context),
            equery1(DbDriver, C, "insert into "++QTab++" default values returning id")
        end,
        Context).


%% @doc Insert a row, setting the fields to the props. Unknown columns are
%% serialized in the props column. When the table has an 'id' column then the
%% new id is returned.
-spec insert(table_name(), props(), z:context()) -> {ok, integer()|undefined} | {error, term()}.
insert(Table, Parameters, Context) when is_list(Parameters) ->
    insert(Table, z_props:from_props(Parameters), Context);
insert(Table, Parameters, Context) ->
    {Schema, Tab, QTab} = quoted_table_name(Table),
    Cols = column_names_bin(Schema, Tab, Context),
    BinParams = ensure_binary_keys(Parameters),
    case prepare_cols(Cols, BinParams) of
        {ok, InsertProps} ->
            HasProps = maps:is_key(<<"props">>, InsertProps), 
            HasPropsJSON = maps:is_key(<<"props_json">>, InsertProps), 

            InsertProps1 = if HasPropsJSON ->
                                  #{<<"props_json">> := PropsJSONCol} = InsertProps,
                                  case is_map(PropsJSONCol) of
                                      true ->
                                          InsertProps#{
                                            <<"props_json">> => ?DB_PROPS_JSON(filter_empty_props(PropsJSONCol))
                                           };
                                      false ->
                                          InsertProps
                                  end;
                              HasProps ->
                                  #{<<"props">> := PropsCol} = InsertProps,
                                  case PropsCol of
                                      #{} ->
                                          InsertProps#{
                                            <<"props">> => ?DB_PROPS(filter_empty_props(PropsCol))
                                           };
                                      [ {_, _} | _ ] ->
                                          Props1 = z_props:from_props(PropsCol),
                                          InsertProps#{
                                            <<"props">> => ?DB_PROPS(filter_empty_props(Props1))
                                           };
                                      _ ->
                                          InsertProps
                                  end;
                              not HasPropsJSON and not HasProps ->
                                  InsertProps
                           end,

            %% Build the SQL insert statement
            {ColNames, ColParams} = lists:unzip( maps:to_list(InsertProps1) ),
            Sql = iolist_to_binary([
                "insert into ", QTab, " (\"",
                    lists:join("\", \"", ColNames),
                "\") values (",
                    lists:join(", ", [ [$$ | integer_to_list(N)] || N <- lists:seq(1, length(ColParams)) ]),
                ")"
            ]),
            FinalSql = case lists:member(<<"id">>, Cols) of
                true -> <<Sql/binary, " returning id">>;
                false -> Sql
            end,

            F = fun(C) ->
                 DbDriver = z_context:db_driver(Context),
                 case equery1(DbDriver, C, FinalSql, ColParams) of
                     {ok, Id} ->
                        {ok, Id};
                     {error, noresult} ->
                        {ok, undefined};
                     {error, #error{ codename = unique_violation }} = Error ->
                        ?LOG_NOTICE(#{ text => "z_db unique_violation in insert",
                                       table => Table,
                                       parameters => Parameters}),
                        Error;
                     {error, Reason} = Error ->
                        ?LOG_ERROR(#{ text => "z_db error in query",
                                      reason => Reason,
                                      query => FinalSql, 
                                      parameters => ColParams }),
                        Error
                 end
            end,
            with_connection(F, Context);
        {error, _} = Error ->
            Error
    end.


%% @doc Update a row in a table, merging the props list with any new props values
-spec update(table_name(), id(), props(), z:context()) -> {ok, RowsUpdated::integer()} | {error, term()}.
update(Table, Id, Parameters, Context) when is_list(Parameters) ->
    update(Table, Id, z_props:from_props(Parameters), Context);
update(Table, Id, Parameters, Context) when is_map(Parameters) ->
    {Schema, Tab, QTab} = quoted_table_name(Table),
    DbDriver = z_context:db_driver(Context),
    Cols = column_names_bin(Schema, Tab, Context),
    BinParams = ensure_binary_keys(Parameters),
    case prepare_cols(Cols, BinParams) of
        {ok, UpdateProps} ->
            F = fun(C) ->
                UpdateProps1 = update_merge_props(DbDriver, C, Table, Cols, Id, UpdateProps, Context),
                UpdateProps2 = update_map_atom_arrays(UpdateProps1),
                {ColNames, Params} = lists:unzip( maps:to_list(UpdateProps2) ),
                ColNamesNr = lists:zip(ColNames, lists:seq(2, length(ColNames)+1)),
                ColAssigns = [
                    ["\"", ColName, "\" = $", integer_to_list(Nr)]
                    || {ColName, Nr} <- ColNamesNr
                ],
                Sql = iolist_to_binary([
                    "update ", QTab, " set ",
                    lists:join(", ", ColAssigns),
                    " where id = $1"
                ]),
                case equery1(DbDriver, C, Sql, [Id | Params]) of
                    {ok, _RowsUpdated} = Ok -> Ok;
                    {error, Reason} = Error ->
                        ?LOG_ERROR(#{
                            text => <<"z_db error in query">>,
                            result => error,
                            reason => Reason,
                            query => Sql,
                            args => [Id | Params]
                        }),
                        Error
                end
            end,
            with_connection(F, Context);
        {error, _} = Error ->
            Error
    end.


%% @doc Estimate the number of rows matching a query. This uses the PostgreSQL query planner
%% to return an estimate of the number of rows.
-spec estimate_rows(Query, Args, Context) -> {ok, Rows} | {error, term()}
    when Query :: string() | binary(),
         Args :: list(),
         Context :: z:context(),
         Rows :: non_neg_integer().
estimate_rows(Query, Args, Context) ->
    Query1 = "explain " ++ z_convert:to_list(Query),
    try
        find_estimate( z_db:q(Query1, Args, Context) )
    catch
        throw:{error, _} = Error -> Error
    end.

find_estimate([]) ->
    {ok, 0};
find_estimate([{R}|Rs]) ->
    case re:run(R, <<" rows=([0-9]+)">>, [{capture, all_but_first, binary}]) of
        nomatch -> find_estimate(Rs);
        {match, [Rows]} -> {ok, binary_to_integer(Rows)}
    end.


get_current_props(Table, Id, Context) ->
    DBDriver = z_context:db_driver(Context),
    F = fun(C) ->
                get_current_props(DBDriver, C, Table, Id, Context)
        end,
    with_connection(F, Context).

get_current_props(DBDriver, Connection, Table, Id, Context) when is_atom(Table) ->
    get_current_props(DBDriver, Connection, atom_to_list(Table), Id, Context);
get_current_props(DBDriver, Connection, Table, Id, Context) ->
    ColNames = column_names(Table, Context), 
    get_current_props(DBDriver, Connection,
                      lists:member(props_json, ColNames), lists:member(props, ColNames), Table, Id, Context).

   
get_current_props(_DBDriver, _Connection, false, false, _Table, _Id, _Context) ->
    %% There is no props column
    {error, no_properties};
get_current_props(DBDriver, Connection, false, true, Table, Id, _Context) ->
    %% There is only a props column.
    case equery1(DBDriver, Connection, "select props from \"" ++ Table ++ "\" where id = $1", [Id]) of
        {ok, Props} when is_list(Props) -> {ok, z_props:from_props(Props)};
        {ok, Props} when is_map(Props) -> {ok, Props};
        _E ->
            {error, no_properties}
    end;
get_current_props(DBDriver, Connection, true, false, Table, Id, _Context) ->
    %% There is only a props_json column
    case equery1(DBDriver, Connection, "select props_json from \"" ++ Table ++ "\" where id = $1", [Id]) of
        {ok, JSON} when is_binary(JSON) ->
            {ok, jsxrecord:decode(JSON)};
        _ ->
            {error, no_properties}
    end;
get_current_props(DBDriver, Connection, true, true, Table, Id, _Context) ->
    %% There is a props_json, and a props column.
    R = case DBDriver:equery(Connection, "select props, props_json from \"" ++ Table ++ "\" where id = $1", [Id], ?TIMEOUT) of
        {ok, _Columns, []} -> {error, noresult};
        {ok, _RowCount, _Columns, []} -> {error, noresult};
        {ok, _Columns, [Row|_]} -> {ok, element(1, Row), element(2, Row)};
        {ok, _RowCount, _Columns, [Row|_]} -> {ok, element(1, Row), element(2, Row)};
        Other -> Other
    end,

    %% Merge the properties found in the columns, the props_json column gets priority.
    case R of
        {ok, Props, undefined} when is_list(Props) ->
            {ok, z_props:from_props(Props)};
        {ok, Props, undefined} when is_map(Props) ->
            {ok, Props};
        {ok, undefined, JSON} when is_binary(JSON) ->
            {ok, jsxrecord:decode(JSON)};
        {ok, Props, JSON} when is_list(Props) andalso is_binary(JSON) ->
            {ok, maps:merge(z_props:from_props(Props), jsxrecord:decode(JSON))};
        {ok, Props, JSON} when is_map(Props) andalso is_binary(JSON) ->
            {ok, maps:merge(Props, jsxrecord:decode(JSON))};
        _ ->
            {error, no_properties}
    end.

update_map_atom_arrays(Props) ->
    maps:map(
        fun
            (_K, [ A | _ ] = V) when is_atom(A) ->
                [ atom_to_binary(X, utf8) || X <- V ];
            (_K, V) ->
                V
        end,
        Props).

update_merge_props(DbDriver, Connection, Table, Cols, Id, #{ <<"props_json">> := NewProps }=UpdateProps, Context) ->
    P = case get_current_props(DbDriver, Connection, Table, Id, Context) of
            {ok, OldProps} ->
                NewProps1 = maps:merge(OldProps, NewProps),
                NewProps2 = maps:without(Cols, NewProps1),
                UpdateProps#{<<"props_json">> => ?DB_PROPS_JSON( drop_undefined(NewProps2) ) };
            _ ->
                UpdateProps#{<<"props_json">> => ?DB_PROPS_JSON( drop_undefined(NewProps) )}
        end,

    %% Clear the existing props column
    case lists:member(<<"props">>, Cols) of
        true -> P#{ <<"props">> => null };
        false -> P
    end;
update_merge_props(DbDriver, Connection, Table, Cols, Id, #{ <<"props">> := NewProps }=UpdateProps, Context) ->
    case get_current_props(DbDriver, Connection, Table, Id, Context) of
        {ok, OldProps} ->
            NewProps1 = maps:merge(OldProps, NewProps),
            NewProps2 = maps:without(Cols, NewProps1),
            UpdateProps#{ <<"props">> => ?DB_PROPS( drop_undefined(NewProps2) )};
        _ ->
            UpdateProps#{ <<"props">> => ?DB_PROPS( drop_undefined(NewProps) )}
    end;
update_merge_props(_DbDriver, _Connection, _Table, _Cols, _Id, #{}=UpdateProps, _Context) ->
    UpdateProps;
update_merge_props(DbDriver, Connection, Table, Cols, Id, NewProps, Context) when is_list(NewProps) ->
    Props1 = z_props:from_props(NewProps),
    update_merge_props(DbDriver, Connection, Table, Cols, Id, Props1, Context).


drop_undefined(Props) ->
    maps:fold(
        fun
            (_K, undefined, Acc) -> Acc;
            (K, V, Acc) -> Acc#{ K => V }
        end,
        #{},
        Props).

ensure_binary_keys(Ps) ->
    maps:fold(
        fun
            (K, V, Acc) when is_atom(K) ->
                Acc#{
                    atom_to_binary(K, utf8) => V
                };
            (K, V, Acc) when is_binary(K) ->
                Acc#{
                    K => V
                }
        end,
        #{},
        Ps).

filter_empty_props(Map) ->
    maps:filter(
        fun
            (_K, undefined) -> false;
            (_K, <<>>) -> false;
            (_K, #trans{ tr = Tr }) ->
                lists:any(
                    fun
                        ({_, <<>>}) -> false;
                        (_) -> true
                    end,
                    Tr);
            (_K, _V) ->
                true
        end,
        Map).


%% @doc Delete a row from a table, the row must have a column with the name 'id'
-spec delete(Table::table_name(), Id::integer(), z:context()) -> {ok, RowsDeleted::non_neg_integer()} | {error, term()}.
delete(Table, Id, Context) ->
    {_Schema, _Tab, QTab} = quoted_table_name(Table),
    Sql = "delete from "++QTab++" where id = $1",
    case equery(Sql, [Id], Context) of
        {ok, _} = Ok ->
            Ok;
        {error, _} = Error ->
            Error
    end.

%% @doc Read a row from a table, the row must have a column with the name 'id'.
%% The props column contents is merged with the other properties returned.
-spec select(table_name(), any(), z:context()) -> {ok, Row :: map()} | {error, term()}.
select(Table, Id, Context) ->
    select(Table, Id, [], Context).


-spec select(table_name(), any(), qmap_options(), z:context()) -> {ok, Row :: map()} | {error, term()}.
select(Table, Id, Options, Context) ->
    {_Schema, _Tab, QTab} = quoted_table_name(Table),
    Sql = "select * from "++QTab++" where id = $1 limit 1",
    qmap_props_row(Sql, [ Id ], Options, Context).


%% @doc Check if all cols are valid columns in the target table, move unknown properties
%%      to the props column (if exists).
prepare_cols(Cols, Props) ->
    case split_props(Props, Cols) of
        {ok, {CProps, PProps}} ->
            case maps:size(PProps) of
                0 ->
                    {ok, CProps};
                _  ->
                    case lists:member(<<"props_json">>, Cols) of
                        true ->
                            PropsCol = merge_properties(maps:get(<<"props_json">>, CProps, undefined), PProps),
                            {ok, CProps#{ <<"props_json">> => PropsCol }};
                        false ->
                            PropsCol = merge_properties(maps:get(<<"props">>, CProps, undefined), PProps),
                            {ok, CProps#{ <<"props">> => PropsCol }}
                    end
            end;
        {error, _} = Error ->
            Error
    end.

merge_properties(Properties, Props) when is_list(Properties) ->
    maps:merge(z_props:from_props(Properties), Props);
merge_properties(Properties, Props) when is_map(Properties) ->
    maps:merge(Properties, Props);
merge_properties(_, Props) ->
    Props.

split_props(Props, Cols) ->
    {CProps, PProps} = lists:foldl(
        fun(Col, {CAcc, PAcc}) ->
            case maps:find(Col, PAcc) of
                {ok, V} ->
                    PAcc1 = maps:remove(Col, PAcc),
                    CAcc1 = CAcc#{ Col => V },
                    {CAcc1, PAcc1};
                error ->
                    {CAcc, PAcc}
            end
        end,
        {#{}, Props},
        Cols),
    case maps:size(PProps) of
        0 ->
            {ok, {CProps, PProps}};
        _  ->
            case lists:member(<<"props_json">>, Cols) orelse lists:member(<<"props">>, Cols) of
                true ->
                    {ok, {CProps, PProps}};
                false ->
                    Ks = maps:keys(PProps),
                    {error, {unknown_column, Ks}}
            end
    end.


%% @doc Return a property list with all columns of the table. (example: [{id,int4,modifier},...])
-spec columns(table_name(), z:context()) -> list( #column_def{} ).
columns(Table, Context) ->
    {Schema, Table1, _QTab} = quoted_table_name(Table),
    columns(Schema, Table1, Context).

-spec columns(schema_name(), table_name(), z:context()) -> list( #column_def{} ).
columns(Schema, Table, Context) when is_binary(Table) ->
    columns(Schema, binary_to_list(Table), Context);
columns(Schema, Table, Context) when is_atom(Table) ->
    columns(Schema, atom_to_list(Table), Context);
columns(Schema, Table, Context) when is_list(Table) ->
    assert_table_name(Table),
    Options = z_db_pool:get_database_options(Context),
    Db = proplists:get_value(dbdatabase, Options),
    Schema1 = case Schema of
        default -> proplists:get_value(dbschema, Options);
        S when is_list(S) -> S;
        S when is_binary(S) -> S
    end,
    case z_depcache:get({columns, Db, Schema1, Table}, Context) of
        {ok, Cols} ->
            Cols;
        _ ->
            Cols = q("  select column_name, data_type, character_maximum_length,
                               is_nullable, column_default, udt_name
                        from information_schema.columns
                        where table_catalog = $1
                          and table_schema = $2
                          and table_name = $3
                        order by ordinal_position", [Db, Schema1, Table], Context),
            Cols1 = [ columns1(Col) || Col <- Cols ],
            z_depcache:set({columns, Db, Schema1, Table}, Cols1, ?YEAR, [{database, Db}], Context),
            Cols1
    end.


columns1({<<"id">>, <<"integer">>, undefined, Nullable, <<"nextval(", _/binary>>, _UdtName}) ->
    #column_def{
        name = id,
        type = "serial",
        length = undefined,
        is_nullable = z_convert:to_bool(Nullable),
        default = undefined
    };
columns1({Name, <<"ARRAY">>, _MaxLength, Nullable, Default, UdtName}) ->
    % @todo derive the type of the array elements.
    #column_def{
        name = z_convert:to_atom(Name),
        type = case UdtName of
            <<"_text">> -> "text";
            <<"_varchar">> -> "character varying";
            <<"_int", _/binary>> -> "integer";
            _ -> UdtName
        end,
        length = undefined,
        is_array = true,
        is_nullable = z_convert:to_bool(Nullable),
        default = column_default(Default)
    };
columns1({Name,Type,MaxLength,Nullable,Default,_UdtName}) ->
    #column_def{
        name = z_convert:to_atom(Name),
        type = z_convert:to_list(Type),
        length = MaxLength,
        is_nullable = z_convert:to_bool(Nullable),
        default = column_default(Default)
    }.

column_default(undefined) -> undefined;
column_default(<<"nextval(", _/binary>>) -> undefined;
column_default(Default) -> binary_to_list(Default).


-spec column( table_name(), column_name(), z:context()) ->
        {ok, #column_def{}} | {error, enoent}.
column(Table, Column, Context) ->
    {Schema, Table1, _QTab} = quoted_table_name(Table),
    column(Schema, Table1, Column, Context).

-spec column( schema_name(), table_name(), column_name(), z:context()) ->
        {ok, #column_def{}} | {error, enoent}.
column(Schema, Table, Column0, Context) ->
    Columns = columns(Schema, Table, Context),
    Column = to_existing_atom(Column0),
    case lists:filter(
        fun
            (#column_def{ name = Name }) when Name =:= Column -> true;
            (_) -> false
        end,
        Columns)
    of
        [] -> {error, enoent};
        [ #column_def{} = Col ] -> {ok, Col}
    end.

to_existing_atom(C) ->
    try
        to_existing_atom_1(C)
    catch
        error:badarg ->
            'ERROR'
    end.

to_existing_atom_1(C) when is_binary(C) ->
    binary_to_existing_atom(C, utf8);
to_existing_atom_1(C) when is_list(C) ->
    list_to_existing_atom(C);
to_existing_atom_1(C) when is_atom(C) ->
    C.

%% @doc Return a list with the column names of a table.  The names are sorted.
-spec column_names(table_name(), z:context()) -> list( atom() ).
column_names(Table, Context) ->
    {Schema, Table1, _QTab} = quoted_table_name(Table),
    column_names(Schema, Table1, Context).

-spec column_names(schema_name(), table_name(), z:context()) -> list( atom() ).
column_names(Schema, Table, Context) ->
    Names = [ C#column_def.name || C <- columns(Schema, Table, Context)],
    lists:sort(Names).

-spec column_names_bin(table_name(), z:context()) -> list( binary() ).
column_names_bin(Table, Context) ->
    [ atom_to_binary(Col, utf8) || Col <- column_names(Table, Context) ].

-spec column_names_bin(schema_name(), table_name(), z:context()) -> list( binary() ).
column_names_bin(Schema, Table, Context) ->
    [ atom_to_binary(Col, utf8) || Col <- column_names(Schema, Table, Context) ].

%% @doc Check if a column exists in a table.
-spec column_exists(table_name(), column_name(), z:context()) -> boolean().
column_exists(Table, Column, Context) when is_atom(Column) ->
    lists:member(Column, column_names(Table, Context));
column_exists(Table, Column, Context) when is_atom(Column) ->
    Col1 = z_convert:to_binary(Column),
    lists:member(Col1, column_names_bin(Table, Context)).


%% @doc Convert a value so that it is compatible with the column type
-spec to_column_value(table_name(), column_name(), term(), z:context()) -> {ok, term()} | {error, term()}.
to_column_value(Table, Column, Value, Context) ->
    case column(Table, Column, Context) of
        {ok, #column_def{ type = Type, length = Length }} ->
            try
                {ok, convert_value(Type, Length, Value)}
            catch
                _:Reason ->
                    {error, Reason}
            end;
        {error, _} = Error ->
            Error
    end.

convert_value("text", _, V) ->
    z_convert:to_binary(V);
convert_value("character varying", Len, V) ->
    V1 = z_convert:to_binary(V),
    z_string:truncatechars(V1, Len);
convert_value("integer", _, V) ->
    z_convert:to_integer(V);
convert_value("bigint", _, V) ->
    z_convert:to_integer(V);
convert_value("smallint", _, V) ->
    z_convert:to_integer(V);
convert_value("serial", _, V) ->
    z_convert:to_integer(V);
convert_value("bigserial", _, V) ->
    z_convert:to_integer(V);
convert_value("smallserial", _, V) ->
    z_convert:to_integer(V);
convert_value("numeric", _, V) ->
    z_convert:to_float(V);
convert_value("decimal", _, V) ->
    z_convert:to_float(V);
convert_value("float", _, V) ->
    z_convert:to_float(V);
convert_value("double", _, V) ->
    z_convert:to_float(V);
convert_value("boolean", _, V) ->
    z_convert:to_bool_strict(V);
convert_value("timestamp" ++ _, _, V) ->
    z_datetime:to_datetime(V);
convert_value("datetime" ++ _, _, V) ->
    z_datetime:to_datetime(V);
convert_value("date" ++ _, _, V) ->
    z_datetime:to_datetime(V);
convert_value("bytea", _, V) ->
    ?DB_PROPS(V);
convert_value("tsvector", _, V) ->
    z_convert:to_binary(V);
convert_value("ARRAY", _, V) when is_list(V) ->
    V;
convert_value("ARRAY", _, V) ->
    [ V ];
convert_value(Type, _, V) ->
    ?LOG_WARNING(#{
        text => <<"No type conversion for column type">>,
        type => Type,
        value => V
    }),
    V.


%% @doc Flush all cached information about the database.
flush(Context) ->
    Options = z_db_pool:get_database_options(Context),
    Db = proplists:get_value(dbdatabase, Options),
    z_depcache:flush({database, Db}, Context).


%% @doc Update the sequence of the ids in the table. They will be renumbered according to their position in the id list.
%% @todo Make the steps of the sequence bigger, and try to keep the old sequence numbers in tact (needs a diff routine)
-spec update_sequence(table_name(), list( integer() ), z:context()) -> any().
update_sequence(Table, Ids, Context) ->
    {_Schema, _Table, QTab} = quoted_table_name(Table),
    DbDriver = z_context:db_driver(Context),
    Args = lists:zip(Ids, lists:seq(1, length(Ids))),
    F = fun
        (none) ->
            [];
        (C) ->
            [ {ok, _} = equery1(DbDriver, C, "update "++QTab++" set seq = $2 where id = $1", tuple_to_list(Arg)) || Arg <- Args ]
    end,
    with_connection(F, Context).

%% @doc Create database and schema if they do not yet exist
-spec prepare_database(z:context()) -> ok | {error, term()}.
prepare_database(Context) ->
    Options = z_db_pool:get_database_options(Context),
    Site = z_context:site(Context),
    case ensure_database(Site, Options) of
        ok ->
            ensure_schema(Site, Options);
        {error, _} = Error ->
            Error
    end.

ensure_database(Site, Options) ->
    Database = proplists:get_value(dbdatabase, Options),
    case open_connection("postgres", Options) of
        {ok, PgConnection} ->
            Result = case database_exists(PgConnection, Database) of
                true ->
                    ok;
                false ->
                    AnonOptions = proplists:delete(dbpassword, Options),
                    ?LOG_NOTICE(#{
                        text => <<"Creating database">>,
                        database => Database,
                        options => AnonOptions
                    }),
                    create_database(Site, PgConnection, Database)
            end,
            close_connection(PgConnection),
            Result;
        {error, Reason} = Error ->
            ?LOG_ERROR(#{
                text => <<"Cannot create database because user cannot connect to the 'postgres' database">>,
                result => error,
                reason => Reason,
                database => Database,
                dbuser => proplists:get_value(dbuser, Options)
            }),
            Error
    end.

ensure_schema(Site, Options) ->
    Database = proplists:get_value(dbdatabase, Options),
    Schema = proplists:get_value(dbschema, Options),
    {ok, DbConnection} = open_connection(Database, Options),
    Result = case schema_exists_conn(DbConnection, Schema) of
        true ->
            ok;
        false ->
            ?LOG_NOTICE(#{
                text => <<"Creating schema in database">>,
                database => Database,
                schema => Schema
            }),
            create_schema(Site, DbConnection, Schema)
    end,
    close_connection(DbConnection),
    Result.

drop_schema(Context) ->
    Options = z_db_pool:get_database_options(Context),
    Schema = proplists:get_value(dbschema, Options),
    Database = proplists:get_value(dbdatabase, Options),
    case open_connection(Database, Options) of
        {ok, DbConnection} ->
            Result = case schema_exists_conn(DbConnection, Schema) of
                true ->
                    case epgsql:equery(
                        DbConnection,
                        "DROP SCHEMA \"" ++ Schema ++ "\" CASCADE"
                    ) of
                        {ok, _, _} = OK ->
                            ?LOG_NOTICE(#{
                                text => <<"Dropped schema">>,
                                result => ok,
                                database => Database,
                                schema => Schema,
                                return_value => OK
                            }),
                            ok;
                        {error, Reason} = Error ->
                            ?LOG_ERROR(#{
                                text => <<"z_db error when dropping schema">>,
                                result => error,
                                reason => Reason,
                                database => Database,
                                schema => Schema
                            }),
                            Error
                    end;
                false ->
                    ?LOG_WARNING(#{
                        text => <<"Could not drop schema as it does not exist">>,
                        result => warning,
                        reason => noschema,
                        database => Database,
                        schema => Schema
                    }),
                    ok
            end,
            close_connection(DbConnection),
            Result;
        {error, Reason} ->
            ?LOG_ERROR(#{
                text => <<"z_db error when connecting for dropping schema">>,
                result => error,
                reason => Reason,
                database => Database,
                schema => Schema
            }),
            ok
    end.

open_connection(DatabaseName, Options) ->
    epgsql:connect(
        proplists:get_value(dbhost, Options),
        proplists:get_value(dbuser, Options),
        proplists:get_value(dbpassword, Options),
        [
            {port, proplists:get_value(dbport, Options)},
            {database, DatabaseName}
        ]
    ).

close_connection(Connection) ->
    epgsql:close(Connection).

%% @doc Check whether database exists
-spec database_exists(epgsql:connection(), string()) -> boolean().
database_exists(Connection, Database) ->
    {ok, _, [{Count}]} = epgsql:equery(
        Connection,
        "SELECT COUNT(*) FROM pg_catalog.pg_database WHERE datname = $1",
        [Database]
    ),
    case z_convert:to_integer(Count) of
        0 -> false;
        1 -> true
    end.

%% @doc Create a database
-spec create_database(atom(), epgsql:connection(), string()) -> ok | {error, term()}.
create_database(_Site, Connection, Database) ->
    %% Use template0 to prevent ERROR: new encoding (UTF8) is incompatible with
    %% the encoding of the template database (SQL_ASCII)
    assert_database_name(Database),
    case epgsql:equery(
        Connection,
        "CREATE DATABASE \"" ++ Database ++ "\" ENCODING = 'UTF8' TEMPLATE template0"
    ) of
        {error, Reason} = Error ->
            ?LOG_ERROR(#{
                text => <<"z_db error when creating database">>,
                result => error,
                reason => Reason,
                database => Database
            }),
            Error;
        {ok, _, _} ->
            ok
    end.

%% @doc Check whether schema exists
-spec schema_exists_conn(epgsql:connection(), string()) -> boolean().
schema_exists_conn(Connection, Schema) ->
    {ok, _, Schemas} = epgsql:equery(
        Connection,
        "SELECT schema_name FROM information_schema.schemata"),
    lists:member( {z_convert:to_binary(Schema)}, Schemas ).

%% @doc Create a schema
-spec create_schema(atom(), epgsql:connection(), string()) -> ok | {error, term()}.
create_schema(_Site, Connection, Schema) ->
    assert_schema_name(Schema),
    case epgsql:equery(
        Connection,
        "CREATE SCHEMA \"" ++ Schema ++ "\""
    ) of
        {ok, _, _} ->
            ok;
        {error, #error{ codename = duplicate_schema, message = Msg }} ->
            ?LOG_NOTICE(#{
                text => <<"Schema already exists">>,
                result => warning,
                reason => duplicate_schema,
                message => Msg
            }),
            ok;
        {error, Reason} = Error ->
            ?LOG_ERROR(#{
                text => <<"z_db error when creating schema">>,
                result => error,
                reason => Reason,
                schema => Schema
            }),
            Error
    end.


-spec constraint_exists( table_name(), binary() | string() | atom(), z:context() ) -> boolean().
constraint_exists(Table, Constraint, Context) ->
    Options = z_db_pool:get_database_options(Context),
    Db = proplists:get_value(dbdatabase, Options),
    Schema = proplists:get_value(dbschema, Options),
    HasConstraint = q1("
            select count(*)
            from information_schema.table_constraints
            where constraint_catalog = $1
              and constraint_schema = $2
              and table_name = $3
              and constraint_name = $4",
            [Db, Schema, Table, z_convert:to_binary(Constraint)],
            Context),
    HasConstraint >= 1.


-spec key_exists( table_name(), binary() | string() | atom(), z:context() ) -> boolean().
key_exists(Table, Key, Context) ->
    Options = z_db_pool:get_database_options(Context),
    Schema = proplists:get_value(dbschema, Options),
    HasKey = z_db:q1("
        select count(*)
        from pg_indexes
        where schemaname = $1
          and tablename = $2
          and indexname = $3",
        [ Schema, Table, Key ],
        Context),
    HasKey >= 1.


%% @doc Check the information schema if a certain table exists in the context database.
-spec table_exists(table_name(), z:context()) -> boolean().
table_exists(Table, Context) ->
    {Schema, Tab, _QTab} = quoted_table_name(Table),
    table_exists(Schema, Tab, Context).

-spec table_exists( schema_name(), table_name(), z:context() ) -> boolean().
table_exists(Schema, Table, Context) ->
    Options = z_db_pool:get_database_options(Context),
    Db = proplists:get_value(dbdatabase, Options),
    Schema1 = case Schema of
        default -> proplists:get_value(dbschema, Options);
        S -> S
    end,
    case q1("   select count(*)
                from information_schema.tables
                where table_catalog = $1
                  and table_name = $2
                  and table_schema = $3
                  and table_type = 'BASE TABLE'", [Db, Table, Schema1], Context) of
        1 -> true;
        0 -> false
    end.


%% @doc Make sure that a table is dropped, only when the table exists
-spec drop_table(table_name(), z:context()) -> ok.
drop_table(Table, Context) ->
    {_Schema, _Tab, QTab} = quoted_table_name(Table),
    case table_exists(Table, Context) of
        true -> q("drop table " ++ QTab, Context), ok;
        false -> ok
    end.


%% @doc Ensure that a table with the given columns exists, alter any existing table
%% to add, modify or drop columns.  The 'id' (with type serial) column _must_ be defined
%% when creating the table.
-spec create_table(table_name(), list(), z:context()) -> ok.
create_table(Table, Cols, Context) ->
    {_Schema, _Tab, QTab} = quoted_table_name(Table),
    ColsSQL = ensure_table_create_cols(Cols, []),
    z_db:q("CREATE TABLE "++QTab++" ("++string:join(ColsSQL, ",")
        ++ table_create_primary_key(Cols) ++ ")", Context),
    ok.


table_create_primary_key([]) -> [];
table_create_primary_key([#column_def{name=id, type="serial"}|_]) -> ", primary key(id)";
table_create_primary_key([#column_def{name=N, primary_key=true}|_]) -> ", primary key(" ++ z_convert:to_list(N) ++ ")";
table_create_primary_key([_|Cols]) -> table_create_primary_key(Cols).

ensure_table_create_cols([], Acc) ->
    lists:reverse(Acc);
ensure_table_create_cols([C|Cols], Acc) ->
    M = lists:flatten([$", atom_to_list(C#column_def.name), $", 32, column_spec(C)]),
    ensure_table_create_cols(Cols, [M|Acc]).

column_spec(#column_def{type=Type, length=Length, is_nullable=Nullable, is_array = IsArray, default=Default, unique=Unique}) ->
    L = case Length of
            undefined -> [];
            _ -> [$(, integer_to_list(Length), $)]
        end,
    A = column_spec_array(IsArray),
    N = column_spec_nullable(Nullable),
    D = column_spec_default(Default),
    U = column_spec_unique(Unique),
    lists:flatten([Type, L, A, N, D, U]).

column_spec_array(true) -> "[]";
column_spec_array(false) -> "".

column_spec_nullable(true) -> "";
column_spec_nullable(false) -> " not null".

column_spec_default(undefined) -> "";
column_spec_default(Default) -> [" DEFAULT ", Default].

column_spec_unique(false) -> "";
column_spec_unique(true) -> " UNIQUE".

%% @doc Check if a name is a valid SQL table name. Crashes when invalid
-spec assert_table_name( table_name() ) -> true.
assert_table_name(A) when is_atom(A) ->
    assert_table_name1(atom_to_list(A));
assert_table_name([ C | _ ] = Table) when C =/= $. ->
    assert_table_name1(Table);
assert_table_name(<< C,  _/binary>> = Table) when C =/= $. ->
    assert_table_name1b(Table).

assert_table_name1([]) -> true;
assert_table_name1([$_|T]) -> assert_table_name1(T);
assert_table_name1([$.|T]) -> assert_table_name1(T);
assert_table_name1([H|T]) when (H >= $a andalso H =< $z) ->
    assert_table_name1(T);
assert_table_name1([H|T]) when (H >= $0 andalso H =< $9) ->
    assert_table_name1(T).

assert_table_name1b(<<>>) -> true;
assert_table_name1b(<<$_, T/binary>>) -> assert_table_name1b(T);
assert_table_name1b(<<$., T/binary>>) -> assert_table_name1b(T);
assert_table_name1b(<<H, T/binary>>) when (H >= $a andalso H =< $z) ->
    assert_table_name1b(T);
assert_table_name1b(<<H, T/binary>>) when (H >= $0 andalso H =< $9) ->
    assert_table_name1b(T).

%% @doc Check if a name is a valid SQL database name. Crashes when invalid
-spec assert_database_name( string() ) -> true.
assert_database_name([]) -> true;
assert_database_name([$.|T]) -> assert_database_name(T);
assert_database_name([$_|T]) -> assert_database_name(T);
assert_database_name([H|T]) when (H >= $a andalso H =< $z) ->
    assert_database_name(T);
assert_database_name([H|T]) when (H >= $0 andalso H =< $9) ->
    assert_database_name(T).

%% @doc Check if a name is a valid SQL schema name. Crashes when invalid
-spec assert_schema_name( string() ) -> true.
assert_schema_name([]) -> true;
assert_schema_name([$_|T]) -> assert_schema_name(T);
assert_schema_name([H|T]) when (H >= $a andalso H =< $z) ->
    assert_schema_name(T);
assert_schema_name([H|T]) when (H >= $0 andalso H =< $9) ->
    assert_schema_name(T).


-spec quoted_table_name( table_name() ) -> {default | string(), string(), string()}.
quoted_table_name(TableName) ->
    assert_table_name(TableName),
    case binary:split(z_convert:to_binary(TableName), <<".">>, [ global ]) of
        [ Schema, Table ] ->
            QTab = binary_to_list(
                iolist_to_binary([
                    $", Schema, $", $., $", Table, $"
                ])),
            {binary_to_list(Schema), binary_to_list(Table), QTab};
        [ Table ] ->
            QTab = binary_to_list(
                iolist_to_binary([
                    $", Table, $"
                ])),
            {default, binary_to_list(Table), QTab}
    end.

%% @doc Merge the contents of the props column into the result rows
-spec merge_props([proplists:proplist() | map()]) -> list() | undefined.
merge_props(List) ->
    merge_props(List, []).

merge_props([], Acc) ->
    lists:reverse(Acc);
merge_props([R|Rest], Acc) when is_list(R) ->
    case {proplists:get_value(props, R, undefined), proplists:get_value(props_json, R, undefined)} of
        {Props, PropsJSON} when (Props == undefined orelse Props == <<>>) andalso (PropsJSON == undefined orelse PropsJSON == <<>>)  ->
            merge_props(Rest, [R|Acc]);
        {Term, PropsJSON} when PropsJSON == undefined orelse PropsJSON == <<>> ->
            case Term of
                T when is_list(T) ->
                    merge_props(Rest, [lists:keydelete(props, 1, R)++Term|Acc]);
                T when is_map(T) ->
                    T1 = lists:map(fun({K,V}) -> {z_convert:to_atom(K), V} end, maps:to_list(Term)),
                    merge_props(Rest, [lists:keydelete(props, 1, R)++T1|Acc])
            end;
        {Term, PropsJSON} when Term == undefined orelse Term == <<>> ->
            Map = jsxrecord:decode(PropsJSON),
            T1 = lists:map(fun({K,V}) ->
                                   {z_convert:to_atom(K), V}
                           end,
                           maps:to_list(Map)),
            merge_props(Rest, [lists:keydelete(props_json, 1, R)++ T1| Acc]);
        {Term, PropsJSON} ->
            PropsTerm = case Term of
                            L when is_list(L) ->
                                L;
                            M when is_map(M) ->
                                lists:map(fun({K,V}) -> {z_convert:to_atom(K), V} end, maps:to_list(Term))
                        end,
            PropsJSONTerm = lists:map(fun({K,V}) ->
                                              {z_convert:to_atom(K), V}
                                      end, maps:to_list(jsxrecord:decode(PropsJSON))),
            PropsMerged = z_utils:props_merge(PropsJSONTerm, PropsTerm),

            merge_props(Rest, [ lists:keydelete(props_json, 1, lists:keydelete(props, 1, R))  ++ PropsMerged | Acc])
    end.


-spec assoc1(atom(), z:context(), sql(), parameters(), pos_integer()) -> {ok, [proplists:proplist()]}.
assoc1(DbDriver, C, Sql, Parameters, Timeout) ->
    case DbDriver:equery(C, Sql, Parameters, Timeout) of
        {ok, Columns, Rows} ->
            Names = [ list_to_atom(binary_to_list(Name)) || #column{name=Name} <- Columns ],
            Rows1 = [ lists:zip(Names, tuple_to_list(Row)) || Row <- Rows ],
            {ok, Rows1};
        Other -> Other
    end.


equery1(DbDriver, C, Sql) ->
    equery1(DbDriver, C, Sql, [], ?TIMEOUT).

equery1(DbDriver, C, Sql, Parameters) when is_list(Parameters); is_tuple(Parameters) ->
    equery1(DbDriver, C, Sql, Parameters, ?TIMEOUT).

equery1(DbDriver, C, Sql, Parameters, Timeout) ->
    case DbDriver:equery(C, Sql, Parameters, Timeout) of
        {ok, _Columns, []} -> {error, noresult};
        {ok, _RowCount, _Columns, []} -> {error, noresult};
        {ok, _Columns, [Row|_]} -> {ok, element(1, Row)};
        {ok, _RowCount, _Columns, [Row|_]} -> {ok, element(1, Row)};
        Other -> Other
    end.


%%
%% Tests
%%

-ifdef(TEST).

prepare_cols_test() ->
    ?assertEqual({ok, #{}}, prepare_cols([], #{})),

    % Props go to the right place
    ?assertEqual({ok, #{<<"a">> => <<"a value">>}},
                 prepare_cols([<<"a">>, <<"b">>], #{<<"a">> => <<"a value">>})),
    ?assertEqual({ok, #{<<"a">> => <<"a value">>, <<"b">> => <<"b value">>}},
                 prepare_cols([<<"a">>, <<"b">>], #{<<"a">> => <<"a value">>,
                                                    <<"b">> => <<"b value">>})),

    % Column is not known
    ?assertEqual({error, {unknown_column,[<<"c">>]}},
                 prepare_cols([<<"a">>, <<"b">>], #{<<"a">> => <<"a value">>,
                                                    <<"c">> => <<"c value">>})),

    % When there is a props column, unknown properties go to that column.
    ?assertEqual({ok,#{<<"a">> => <<"a value">>,
                       <<"props">> => #{<<"c">> => <<"c value">>}}},
                 prepare_cols([<<"a">>, <<"b">>, <<"props">>], #{<<"a">> => <<"a value">>,
                                                                 <<"c">> => <<"c value">>})),

    % An existing props map will be merged with any new values.
    ?assertEqual({ok,#{<<"a">> => <<"a value">>,
                       <<"props">> => #{<<"c">> => <<"c value">>,
                                        <<"d">> => <<"d value">>}}},
                 prepare_cols([<<"a">>, <<"b">>, <<"props">>],
                              #{<<"a">> => <<"a value">>,
                                <<"c">> => <<"c value">>,
                                <<"props">> => #{<<"d">> => <<"d value">>}})),

    % When there is a props_json column, unknown properties go to that column.
    ?assertEqual({ok,#{<<"a">> => <<"a value">>,
                       <<"props_json">> => #{<<"c">> => <<"c value">>}}},
                 prepare_cols([<<"a">>, <<"b">>, <<"props">>, <<"props_json">>],
                              #{<<"a">> => <<"a value">>,
                                <<"c">> => <<"c value">>})),

    % When there is a props_json column, that gets priority
    ?assertEqual({ok,#{<<"a">> => <<"a value">>,
                       <<"props_json">> => #{<<"c">> => <<"c value">>}}},
                 prepare_cols([<<"a">>, <<"b">>, <<"props">>, <<"props_json">>],
                              #{<<"a">> => <<"a value">>, <<"c">> => <<"c value">>})),
    ?assertEqual({ok,#{<<"a">> => <<"a value">>,
                       <<"props_json">> => #{<<"c">> => <<"c value">>}}},
                 prepare_cols([<<"a">>, <<"b">>, <<"props_json">>],
                              #{<<"a">> => <<"a value">>, <<"c">> => <<"c value">>})),

    % existing props and props_json fields are merged
    ?assertEqual({ok,#{<<"a">> => <<"a value">>,
                       <<"props_json">> => #{<<"c">> => <<"c value">>,
                                             <<"e">> => <<"e value">>}}},
                 prepare_cols([<<"a">>, <<"b">>, <<"props">>, <<"props_json">>],
                              #{<<"a">> => <<"a value">>,
                                <<"c">> => <<"c value">>,
                                <<"props_json">> => #{<<"e">> => <<"e value">>}
                               })),


    ok.

merge_props_test() ->
    M = merge_props([[{id,1},
                      {is_visible,true},
                      {rsc_id,330},
                      {user_id,undefined},
                      {email,<<"test@example.com">>},
                      {name,<<"foo">>},
                      {keep_informed,false},
                      {props, undefined},
                      {created,{{2020,6,25},{10,54,37}}},
                      {props_json,undefined}],
                     [{id,2},
                      {is_visible,true},
                      {rsc_id,330},
                      {user_id,undefined},
                      {email,<<"test@example.com">>},
                      {name,<<"foo">>},
                      {keep_informed,false},
                      {props,#{<<"message">> => <<"test test">>}},
                      {created,{{2020,6,25},{10,54,37}}},
                      {props_json,undefined}],
                     [{id,3},
                      {is_visible,true},
                      {rsc_id,330},
                      {user_id,undefined},
                      {email,<<"test@example.com">>},
                      {name,<<"foo">>},
                      {keep_informed,false},
                      {props,[{message, <<"test test">>}]},
                      {created,{{2020,6,25},{10,54,37}}},
                      {props_json,undefined}],
                     [{id,4},
                      {is_visible,true},
                      {rsc_id,330},
                      {user_id,undefined},
                      {email,<<"test@example.com">>},
                      {name,<<"foo">>},
                      {keep_informed,false},
                      {props,undefined},
                      {created,{{2020,6,25},{10,54,37}}},
                      {props_json,<<"{\"message\": \"test test\"}">>}],
                     [{id,5},
                      {is_visible,true},
                      {rsc_id,330},
                      {user_id,undefined},
                      {email,<<"test@example.com">>},
                      {name,<<"foo">>},
                      {keep_informed,false},
                      {props,#{<<"message">> => <<"test test">>}},
                      {created,{{2020,6,25},{11,54,55}}},
                      {props_json,<<"{\"message\": \"123\"}">>}],
                     [{id,6},
                      {is_visible,true},
                      {rsc_id,330},
                      {user_id,undefined},
                      {email,<<"test@example.com">>},
                      {name,<<"foo">>},
                      {keep_informed,false},
                      {props, [{message,  <<"test test">>}, {extra, <<"hello">>} ]},
                      {created,{{2020,6,25},{11,54,55}}},
                      {props_json,<<"{\"message\": \"123\"}">>}] ]),
    
    ?assertEqual([[{id,1},
                   {is_visible,true},
                   {rsc_id,330},
                   {user_id,undefined},
                   {email,<<"test@example.com">>},
                   {name,<<"foo">>},
                   {keep_informed,false},
                   {props,undefined},
                   {created,{{2020,6,25},{10,54,37}}},
                   {props_json,undefined}],
                  [{id,2},
                   {is_visible,true},
                   {rsc_id,330},
                   {user_id,undefined},
                   {email,<<"test@example.com">>},
                   {name,<<"foo">>},
                   {keep_informed,false},
                   {created,{{2020,6,25},{10,54,37}}},
                   {props_json,undefined},
                   {message,<<"test test">>}],
                  [{id,3},
                   {is_visible,true},
                   {rsc_id,330},
                   {user_id,undefined},
                   {email,<<"test@example.com">>},
                   {name,<<"foo">>},
                   {keep_informed,false},
                   {created,{{2020,6,25},{10,54,37}}},
                   {props_json,undefined},
                   {message,<<"test test">>}],
                  [{id,4},
                   {is_visible,true},
                   {rsc_id,330},
                   {user_id,undefined},
                   {email,<<"test@example.com">>},
                   {name,<<"foo">>},
                   {keep_informed,false},
                   {props,undefined},
                   {created,{{2020,6,25},{10,54,37}}},
                   {message,<<"test test">>}],
                  [{id,5},
                   {is_visible,true},
                   {rsc_id,330},
                   {user_id,undefined},
                   {email,<<"test@example.com">>},
                   {name,<<"foo">>},
                   {keep_informed,false},
                   {created,{{2020,6,25},{11,54,55}}},
                   {message,<<"123">>}],
                  [{id,5},
                   {is_visible,true},
                   {rsc_id,330},
                   {user_id,undefined},
                   {email,<<"test@example.com">>},
                   {name,<<"foo">>},
                   {keep_informed,false},
                   {created,{{2020,6,25},{11,54,55}}},
                   {extra, <<"hello">>},
                   {message,<<"123">>}]], M),
    ok.


-endif.