%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2009-2025 Marc Worrell
%% @doc Interface to database, uses database definition from Context.
%% @end
%% Copyright 2009-2025 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(z_db).
-author("Marc Worrell <marc@worrell.nl>").
-author("Arjan Scherpenisse <arjan@scherpenisse.net>").
-define(TIMEOUT, 30000).
-define(IS_PROPS_COL(Col), (Col =:= <<"props">> orelse Col =:= <<"props_json">>)).
-define(IS_EMPTY(V), (V =:= undefined orelse V =:= <<>>)).
%% interface functions
-export([
has_connection/1,
database_version_string/1,
database_version/1,
dbschema/1,
dbdatabase/1,
dbusername/1,
transaction/2,
transaction/3,
transaction_clear/1,
assoc_row/2,
assoc_row/3,
assoc_props_row/2,
assoc_props_row/3,
assoc/2,
assoc/3,
assoc/4,
assoc_props/2,
assoc_props/3,
assoc_props/4,
qmap_row/2,
qmap_row/3,
qmap_row/4,
qmap_props_row/2,
qmap_props_row/3,
qmap_props_row/4,
qmap/2,
qmap/3,
qmap/4,
qmap_props/2,
qmap_props/3,
qmap_props/4,
q/2,
q/3,
q/4,
q1/2,
q1/3,
q1/4,
q_row/2,
q_row/3,
squery/2,
squery/3,
equery/2,
equery/3,
equery/4,
execute_batch/3,
execute_batch/4,
insert/2,
insert/3,
update/4,
delete/3,
select/3,
column/3,
columns/2,
columns/3,
column_names/2,
column_names_bin/2,
column_exists/3,
to_column_value/4,
estimate_rows/3,
get_current_props/3,
update_sequence/3,
prepare_database/1,
constraint_exists/3,
function_exists/2,
foreign_keys/2,
foreign_key/3,
key_exists/3,
table_exists/2,
table_keys/2,
create_table/3,
alter_table/3,
drop_table/2,
flush/1,
assert_table_name/1,
quoted_table_name/1,
prepare_cols/2,
merge_props/1,
ensure_database/2,
ensure_schema/2,
schema_exists_conn/2,
drop_schema/1
]).
-type database_server() :: postgresql.
-type sql() :: string() | iodata().
-type query_error() :: nodb | enoent | epgsql:query_error() | term().
-type query_timeout() :: integer().
-type transaction_fun() :: fun((z:context()) -> term()).
-type table_name() :: atom() | nonempty_string() | nonempty_binary().
-type column_name() :: atom() | string() | binary().
-type schema_name() :: default | atom() | string() | binary().
-type parameters() :: list( parameter() ).
-type parameter() :: tuple()
| calendar:datetime()
| atom()
| string()
| binary()
| integer()
| boolean()
| float()
| list().
-type qmap_options() :: list( qmap_option() ).
-type qmap_option() :: {keys, binary|atom}
| {timeout, non_neg_integer()}.
-type props() :: proplists:proplist() | props_map().
-type props_map() :: #{ prop_key() => term() }.
-type prop_key() :: binary() | atom().
-type id() :: pos_integer().
-type id_key() :: binary().
-type query_result() :: {ok, Columns :: list(), Rows :: list()}
| {ok, Count :: non_neg_integer(), Columns :: list(), Rows :: list()}
| {ok, Count :: non_neg_integer()}
| {error, term()}.
-export_type([
sql/0,
query_error/0,
query_timeout/0,
query_result/0,
transaction_fun/0,
parameters/0,
parameter/0,
table_name/0,
column_name/0,
schema_name/0,
props/0,
props_map/0,
prop_key/0,
id/0,
qmap_options/0,
qmap_option/0
]).
-include_lib("zotonic.hrl").
-include_lib("epgsql/include/epgsql.hrl").
%% @doc Perform a function inside a transaction, do a rollback on exceptions
-spec transaction(transaction_fun(), z:context()) -> any() | {error, term()}.
transaction(Function, Context) ->
transaction(Function, [], Context).
% @doc Perform a transaction with extra options. Default retry on deadlock
-spec transaction(transaction_fun(), list(), z:context()) -> any() | {error, term()}.
transaction(Function, Options, Context) ->
z_context:logger_md(Context),
Result = case transaction1(Function, Context) of
{rollback, {error, #error{ codename = deadlock_detected }}} ->
{rollback, {deadlock, []}};
{rollback, {{error, #error{ codename = deadlock_detected }}, Trace1}} ->
{rollback, {deadlock, Trace1}};
{rollback, {{case_clause, {error, #error{ codename = deadlock_detected }}}, Trace1}} ->
{rollback, {deadlock, Trace1}};
{rollback, {{badmatch, {error, #error{ codename = deadlock_detected }}}, Trace1}} ->
{rollback, {deadlock, Trace1}};
{error, #error{ codename = deadlock_detected }} ->
{rollback, {deadlock, []}};
Other ->
Other
end,
case Result of
{rollback, {deadlock, Stack}} = DeadlockError ->
case proplists:get_value(noretry_on_deadlock, Options) of
true ->
?LOG_ERROR(#{
text => <<"DEADLOCK on database transaction, NO RETRY">>,
in => zotonic_core,
stack => Stack
}),
DeadlockError;
_False ->
?LOG_WARNING(#{
text => <<"DEADLOCK on database transaction, will retry">>,
in => zotonic_core,
stack => Stack
}),
% Sleep random time, then retry transaction
timer:sleep(z_ids:number(100)),
transaction(Function, Options, Context)
end;
R ->
R
end.
% @doc Perform the transaction, return error when the transaction function crashed
transaction1(Function, #context{dbc=undefined} = Context) ->
case has_connection(Context) of
true ->
case with_connection(
fun(C) ->
Context1 = Context#context{dbc=C},
DbDriver = z_context:db_driver(Context),
try
case DbDriver:squery(C, "BEGIN", ?TIMEOUT) of
{ok, [], []} -> ok;
{error, _} = ErrorBegin -> throw(ErrorBegin)
end,
case Function(Context1) of
{rollback, R} ->
DbDriver:squery(C, "ROLLBACK", ?TIMEOUT),
R;
R ->
case DbDriver:squery(C, "COMMIT", ?TIMEOUT) of
{ok, [], []} ->
R;
{error, connection_down} ->
{rollback, {error, connection_down}};
{error, _} = ErrorCommit ->
z_notifier:notify_queue_flush(Context),
throw(ErrorCommit)
end
end
catch
E:Why:S ->
?LOG_ERROR(#{
text => <<"Error on database transaction">>,
in => zotonic_core,
result => E,
reason => Why,
stack => S
}),
case DbDriver:is_connection_alive(C) of
true ->
DbDriver:squery(C, "ROLLBACK", ?TIMEOUT);
false ->
ok
end,
{rollback, {Why, S}}
end
end,
Context)
of
{rollback, _} = Result ->
z_notifier:notify_queue_flush(Context),
Result;
Result ->
z_notifier:notify_queue(Context),
Result
end;
false ->
{rollback, {no_database_connection, []}}
end;
transaction1(Function, Context) ->
% Nested transaction, only keep the outermost transaction
Function(Context).
%% @doc Clear any transaction in the context, useful when starting a thread with this context.
transaction_clear(#context{dbc=undefined} = Context) ->
Context;
transaction_clear(Context) ->
Context#context{dbc=undefined}.
%% @doc Check if we have database connection up and runnng
-spec has_connection(z:context() | atom()) -> boolean().
has_connection(Site) when is_atom(Site) ->
is_pid(erlang:whereis(z_db_pool:db_pool_name(Site)));
has_connection(Context) ->
is_pid(erlang:whereis(z_context:db_pool(Context))).
%% @doc Return the version of the database. This is the long string describing the
%% database version. Returns the empty string if there is no database.
-spec database_version_string( z:context() ) -> binary().
database_version_string(Context) ->
case has_connection(Context) of
true ->
z_db:q1("select version()", Context);
false ->
<<>>
end.
%% @doc Return the version of the database. Returns {postgres, Major, Minor} for
%% the database being used.
-spec database_version( z:context() ) ->
{ok, {database_server(), non_neg_integer(), non_neg_integer()}}
| {error, no_database_connection}.
database_version(Context) ->
case has_connection(Context) of
true ->
Ver = z_db:q1("show server_version", Context),
{Major, Minor} = case binary:split(Ver, <<".">>, [ global ]) of
[ A ] -> {binary_to_integer(A), 0};
[ A, B | _ ] -> {binary_to_integer(A), binary_to_integer(B)}
end,
{ok, {postgresql, Major, Minor}};
false ->
{error, no_database_connection}
end.
%% @doc Return the schema name used for the current site.
-spec dbschema(Context) -> Schema when
Context :: z:context(),
Schema :: string() | undefined.
dbschema(Context) ->
Options = z_db_pool:get_database_options(Context),
proplists:get_value(dbschema, Options).
%% @doc Return the database name used for the current site.
-spec dbdatabase(Context) -> Database when
Context :: z:context(),
Database :: string() | undefined.
dbdatabase(Context) ->
Options = z_db_pool:get_database_options(Context),
proplists:get_value(dbdatabase, Options).
%% @doc Return the user name used for the current site.
-spec dbusername(Context) -> Username when
Context :: z:context(),
Username :: string() | undefined.
dbusername(Context) ->
Options = z_db_pool:get_database_options(Context),
proplists:get_value(dbusername, Options).
%% @doc Transaction handler safe function for fetching a db connection
-spec get_connection( z:context() ) -> {ok, pid()} | {error, nodatabase | none | full}.
get_connection(#context{dbc=undefined} = Context) ->
case has_connection(Context) of
true ->
set_dbtrace_flag(Context),
DbDriver = z_context:db_driver(Context),
DbDriver:pool_get_connection(Context);
false ->
{error, nodatabase}
end;
get_connection(Context) ->
{ok, Context#context.dbc}.
set_dbtrace_flag(Context) ->
case erlang:get(is_dbtrace) of
true -> ok;
false -> ok;
_ ->
IsTrace = case z_notifier:first({server_storage, secure_lookup, is_dbtrace, undefined}, Context) of
{ok, true} -> true;
_ -> false
end,
erlang:put(is_dbtrace, IsTrace),
ok
end.
%% @doc Transaction handler safe function for releasing a db connection
return_connection(C, Context=#context{dbc=undefined}) ->
DbDriver = z_context:db_driver(Context),
DbDriver:pool_return_connection(C, Context);
return_connection(_C, _Context) ->
ok.
%% @doc Apply function F with a connection as parameter. Make sure the
%% connection is returned after usage.
-spec with_connection(fun(), z:context()) -> any().
with_connection(F, Context) ->
with_connection(F, get_connection(Context), Context).
with_connection(F, {error, nodatabase}, _Context) ->
F(none);
with_connection(_F, {error, _} = Error, _Context) ->
Error;
with_connection(F, {ok, Connection}, Context) when is_pid(Connection) ->
z_stats:record_event(db, requests, Context),
try
z_context:ensure_logger_md(Context),
{Time, Result} = timer:tc(F, [Connection]),
z_stats:record_duration(db, request, Time, Context),
Result
after
return_connection(Connection, Context)
end.
%% ----------------------------------------------------------------
%% Query - return proplists
%% ----------------------------------------------------------------
-spec assoc_row(sql(), z:context()) -> proplists:proplist() | undefined.
assoc_row(Sql, Context) ->
assoc_row(Sql, [], Context).
-spec assoc_row(sql(), parameters(), z:context()) -> proplists:proplist() | undefined.
assoc_row(Sql, Parameters, Context) ->
case assoc(Sql, Parameters, Context) of
[Row|_] -> Row;
[] -> undefined
end.
-spec assoc_props_row(sql(), z:context()) -> proplists:proplist() | undefined.
assoc_props_row(Sql, Context) ->
assoc_props_row(Sql, [], Context).
-spec assoc_props_row(sql(), list(), z:context()) -> proplists:proplist() | undefined.
assoc_props_row(Sql, Parameters, Context) ->
case assoc_props(Sql, Parameters, Context) of
[Row|_] -> Row;
[] -> undefined
end.
%% @doc Return property lists of the results of a query on the database in the Context
-spec assoc(sql(), z:context()) -> list( proplists:proplist() ).
assoc(Sql, Context) ->
assoc(Sql, [], Context).
-spec assoc(sql(), list(), z:context()) -> list( proplists:proplist() ).
assoc(Sql, Parameters, #context{} = Context) ->
assoc(Sql, Parameters, Context, ?TIMEOUT);
assoc(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
assoc(Sql, [], Context, Timeout).
-spec assoc(sql(), list(), z:context(), integer()) -> list( proplists:proplist() ).
assoc(Sql, Parameters, Context, Timeout) ->
DbDriver = z_context:db_driver(Context),
F = fun
(none) -> [];
(C) ->
case assoc1(DbDriver, C, Sql, Parameters, Timeout) of
{ok, Result} when is_list(Result) ->
Result
end
end,
with_connection(F, Context).
-spec assoc_props(sql(), z:context()) -> list( proplists:proplist() ).
assoc_props(Sql, Context) ->
assoc_props(Sql, [], Context).
-spec assoc_props(sql(), list(), z:context()) -> list( proplists:proplist() ).
assoc_props(Sql, Parameters, #context{} = Context) ->
assoc_props(Sql, Parameters, Context, ?TIMEOUT);
assoc_props(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
assoc_props(Sql, [], Context, Timeout).
-spec assoc_props(sql(), list(), z:context(), integer()) -> list( proplists:proplist() ).
assoc_props(Sql, Parameters, Context, Timeout) ->
DbDriver = z_context:db_driver(Context),
F = fun
(none) -> [];
(C) ->
case assoc1(DbDriver, C, Sql, Parameters, Timeout) of
{ok, Result} when is_list(Result) ->
merge_props(Result)
end
end,
with_connection(F, Context).
%% ----------------------------------------------------------------
%% Query - return maps and error codes
%% ----------------------------------------------------------------
-spec qmap_row( sql(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_row(Sql, Context) ->
qmap_row(Sql, [], [], Context).
-spec qmap_row( sql(), parameters(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_row(Sql, Args, Context) ->
qmap_row(Sql, Args, [], Context).
-spec qmap_row( sql(), parameters(), qmap_options(), z:context()) -> {ok, map()} | {error, query_error()}.
qmap_row(Sql, Args, Options, Context) ->
case qmap(Sql, Args, Options, Context) of
{ok, [ M | _ ]} when is_map(M) ->
{ok, M};
{ok, []} ->
{error, enoent};
Other ->
Other
end.
-spec qmap_props_row( sql(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_props_row(Sql, Context) ->
qmap_props_row(Sql, [], [], Context).
-spec qmap_props_row( sql(), parameters(), z:context() ) -> {ok, map()} | {error, query_error()}.
qmap_props_row(Sql, Args, Context) ->
qmap_props_row(Sql, Args, [], Context).
-spec qmap_props_row( sql(), parameters(), qmap_options(), z:context()) -> {ok, map()} | {error, query_error()}.
qmap_props_row(Sql, Args, Options, Context) ->
case qmap_props(Sql, Args, Options, Context) of
{ok, [ M | _ ]} when is_map(M) ->
{ok, M};
{ok, []} ->
{error, enoent};
Other ->
Other
end.
-spec qmap( sql(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap(Sql, Context) ->
qmap(Sql, [], [], Context).
-spec qmap( sql(), parameters(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap(Sql, Args, Context) ->
qmap(Sql, Args, [], Context).
-spec qmap( sql(), parameters(), qmap_options(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap(Sql, Args, Options, Context) ->
DbDriver = z_context:db_driver(Context),
F = fun
(none) ->
{error, nodb};
(C) ->
Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
Keys = proplists:get_value(keys, Options, binary),
case DbDriver:equery(C, Sql, Args, Timeout) of
{ok, _Affected, Cols, Rows} when is_list(Rows) ->
{ok, cols_map(Cols, Rows, false, Keys)};
{ok, Cols, Rows} when is_list(Rows) ->
{ok, cols_map(Cols, Rows, false, Keys)};
{ok, Value} when is_list(Value); is_integer(Value) ->
{ok, Value};
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"z_db error in query">>,
in => zotonic_core,
result => error,
reason => Reason,
query => Sql,
args => Args
}),
Error
end
end,
with_connection(F, Context).
-spec qmap_props( sql(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap_props(Sql, Context) ->
qmap_props(Sql, [], [], Context).
-spec qmap_props( sql(), parameters(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap_props(Sql, Args, Context) ->
qmap_props(Sql, Args, [], Context).
-spec qmap_props( sql(), parameters(), list(), z:context() ) -> {ok, [ map() ]} | {error, query_error()}.
qmap_props(Sql, Args, Options, Context) ->
DbDriver = z_context:db_driver(Context),
F = fun
(none) ->
{error, nodb};
(C) ->
Timeout = proplists:get_value(timeout, Options, ?TIMEOUT),
Keys = proplists:get_value(keys, Options, binary),
case DbDriver:equery(C, Sql, Args, Timeout) of
{ok, _Affected, Cols, Rows} when is_list(Rows) ->
{ok, cols_map(Cols, Rows, true, Keys)};
{ok, Cols, Rows} when is_list(Rows) ->
{ok, cols_map(Cols, Rows, true, Keys)};
{ok, Value} when is_list(Value); is_integer(Value) ->
{ok, Value};
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"z_db error in query">>,
in => zotonic_core,
result => error,
reason => Reason,
query => Sql,
args => Args
}),
Error
end
end,
with_connection(F, Context).
%% @doc Make associative maps from all the rows in the result set.
cols_map(_Cols, [], _IsMergeProps, _Keys) -> [];
cols_map(Cols, Rows, IsMergeProps, Keys) ->
ColProps = build_col_props(Cols, Keys, IsMergeProps),
[ map_row(ColProps, Row) || Row <- Rows ].
map_row(ColProps, Row) ->
lists:foldl(
fun({Nr, Col, NeedsMap}, Acc) ->
Val = erlang:element(Nr, Row),
map_cell(Col, NeedsMap, Val, Acc)
end,
#{},
ColProps).
map_cell(_Col, true, Cell, Acc) ->
map_merge_props(Cell, Acc);
map_cell(Col, false, Cell, Acc) ->
maps:put(Col, Cell, Acc).
build_col_props(Cols, Keys, IsMergeProps) ->
build_col_props(Cols, Keys, IsMergeProps, 1, []).
build_col_props([], _Keys, _IsMergeProps, _N, Acc) ->
lists:reverse(Acc);
build_col_props([#column{ name = Name } | Rest], Keys, IsMergeProps, N, Acc) ->
Name1 = case Keys of
atom -> binary_to_atom(Name, utf8);
binary -> Name
end,
NeedsMerge = IsMergeProps andalso ?IS_PROPS_COL(Name),
build_col_props(Rest, Keys, IsMergeProps, N + 1, [{N, Name1, NeedsMerge} | Acc]).
map_merge_props(M, Acc) when is_map(M) ->
maps:merge(M, Acc);
map_merge_props(Props, Acc) when is_list(Props) ->
maps:merge(z_props:from_props(Props), Acc);
map_merge_props(_, Acc) ->
Acc.
%% ----------------------------------------------------------------
%% Simple queries - return tuples or number of rows updated.
%% ----------------------------------------------------------------
%% @doc Do an SQL query, return its results. Throws if the query errors.
%% There is a query timeout of 30 seconds.
-spec q(SQL, Context) -> Result when
SQL :: sql(),
Context :: z:context(),
Result :: list() | non_neg_integer().
q(Sql, Context) ->
q(Sql, [], Context, ?TIMEOUT).
%% @doc Do an SQL query, return its results. Throws if the query errors.
%% The parameters are used for argument $1 etc. in the query. Query timeout
%% of 30 seconds.
-spec q(SQL, Parameters, Context) -> Result when
SQL :: sql(),
Parameters :: parameters(),
Context :: z:context(),
Result :: term()
; (SQL, Context, Timeout) -> Result when
SQL :: sql(),
Context :: z:context(),
Timeout :: pos_integer(),
Result :: list() | non_neg_integer().
q(Sql, Parameters, #context{} = Context) ->
q(Sql, Parameters, Context, ?TIMEOUT);
q(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
q(Sql, [], Context, Timeout).
%% @doc Do an SQL query, return its results. Throws if the query errors.
%% The parameters are used for argument $1 etc. in the query. Supply a
%% timeout in milliseconds after which the query is canceled.
-spec q(SQL, Parameters, Context, Timeout) -> Result when
SQL :: sql(),
Parameters :: parameters(),
Context :: z:context(),
Timeout :: pos_integer(),
Result :: list() | non_neg_integer().
q(Sql, Parameters, Context, Timeout) ->
F = fun
(none) -> [];
(C) ->
DbDriver = z_context:db_driver(Context),
case DbDriver:equery(C, Sql, Parameters, Timeout) of
{ok, _Affected, _Cols, Rows} when is_list(Rows) -> Rows;
{ok, _Cols, Rows} when is_list(Rows) -> Rows;
{ok, Value} when is_list(Value); is_integer(Value) -> Value;
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"z_db error in query">>,
in => zotonic_core,
result => error,
reason => Reason,
query => Sql,
args => Parameters
}),
throw(Error)
end
end,
with_connection(F, Context).
%% @doc Do an SQL query, returns the first result of
%% the first row or undefined if there are no returned rows. Crash if the
%% query errors. There is a query timeout of 30 seconds.
-spec q1(SQL, Context) -> Result when
SQL :: sql(),
Context :: z:context(),
Result :: term() | undefined.
q1(Sql, Context) ->
q1(Sql, [], Context).
%% @doc Do an SQL query, returns the first result of
%% the first row or undefined if there are no returned rows. The parameters
%% are used for argument $1 etc. Crash if the query errors. There is a query
%% timeout of 30 seconds.
-spec q1(SQL, Parameters, Context) -> Result when
SQL :: sql(),
Parameters :: parameters(),
Context :: z:context(),
Result :: term() | undefined
; (SQL, Context, Timeout) -> Result when
SQL :: sql(),
Context :: z:context(),
Timeout :: pos_integer(),
Result :: term() | undefined.
q1(Sql, Parameters, #context{} = Context) ->
q1(Sql, Parameters, Context, ?TIMEOUT);
q1(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
q1(Sql, [], Context, Timeout).
%% @doc Do an SQL query, returns the first result of
%% the first row or undefined if there are no returned rows. Crash if
%% the query errors. The parameters are used for argument $1 etc. in the query.
%% Supply a timeout in milliseconds after which the query is canceled.
-spec q1(SQL, Parameters, Context, Timeout) -> Result when
SQL :: sql(),
Parameters :: parameters(),
Context :: z:context(),
Timeout :: pos_integer(),
Result :: term() | undefined.
q1(Sql, Parameters, Context, Timeout) ->
F = fun
(none) -> undefined;
(C) ->
DbDriver = z_context:db_driver(Context),
case equery1(DbDriver, C, Sql, Parameters, Timeout) of
{ok, Value} -> Value;
{error, noresult} -> undefined;
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"z_db error in query">>,
in => zotonic_core,
result => error,
reason => Reason,
query => Sql,
args => Parameters
}),
throw(Error)
end
end,
with_connection(F, Context).
%% @doc Do an SQL query, return the first row or undefined if no rows are
%% returned. Crash if the query errors. There is a query timeout of 30 seconds.
-spec q_row(SQL, Context) -> Row | undefined when
SQL :: sql(),
Context :: z:context(),
Row :: tuple() | undefined.
q_row(Sql, Context) ->
q_row(Sql, [], Context).
%% @doc Do an SQL query, return the first row or undefined if no rows are
%% returned. Crash if the query errors. There is a query timeout of 30 seconds.
-spec q_row(SQL, Parameters, Context) -> Row | undefined when
SQL :: sql(),
Parameters :: parameter(),
Context :: z:context(),
Row :: tuple() | undefined.
q_row(Sql, Args, Context) ->
case q(Sql, Args, Context) of
[Row|_] -> Row;
[] -> undefined
end.
%% @doc Do an SQL query without parameters, returns the result without mapping
%% from the database driver. There is a query timeout of 30 seconds.
-spec squery(SQL, Context) -> Result when
SQL :: sql(),
Context :: z:context(),
Result :: query_result().
squery(Sql, Context) ->
squery(Sql, Context, ?TIMEOUT).
%% @doc Do an SQL query without parameters, returns the result without mapping
%% from the database driver. There is a query timeout of 30 seconds.
-spec squery(SQL, Context, Timeout) -> Result when
SQL :: sql(),
Context :: z:context(),
Timeout :: pos_integer(),
Result :: query_result().
squery(Sql, Context, Timeout) when is_integer(Timeout) ->
F = fun(C) when C =:= none -> {error, noresult};
(C) ->
DbDriver = z_context:db_driver(Context),
DbDriver:squery(C, Sql, Timeout)
end,
with_connection(F, Context).
%% @doc Do an SQL query with empty parameters, returns the result without mapping
%% from the database driver. There is a query timeout of 30 seconds.
-spec equery(SQL, Context) -> Result when
SQL :: sql(),
Context :: z:context(),
Result :: query_result().
equery(Sql, Context) ->
equery(Sql, [], Context).
%% @doc Do an SQL query with parameters, returns the result without mapping
%% from the database driver. There is a query timeout of 30 seconds.
-spec equery(SQL, Parameters, Context) -> Result when
SQL :: sql(),
Parameters :: parameters(),
Context :: z:context(),
Result :: query_result().
equery(Sql, Parameters, #context{} = Context) ->
equery(Sql, Parameters, Context, ?TIMEOUT);
equery(Sql, #context{} = Context, Timeout) when is_integer(Timeout) ->
equery(Sql, [], Context, Timeout).
%% @doc Do an SQL query empty parameters, returns the result without mapping
%% from the database driver. The given timeout is in milliseconds.
-spec equery(SQL, Parameters, Context, Timeout) -> Result when
SQL :: sql(),
Parameters :: parameters(),
Context :: z:context(),
Timeout :: pos_integer(),
Result :: query_result().
equery(Sql, Parameters, Context, Timeout) ->
F = fun(C) when C =:= none -> {error, noresult};
(C) ->
DbDriver = z_context:db_driver(Context),
DbDriver:equery(C, Sql, Parameters, Timeout)
end,
with_connection(F, Context).
%% @doc Execute the same SQL statement for a list of parameters. Default timeout
%% of 30 seconds.
-spec execute_batch(SQL, ParametersList, Context) -> Result when
SQL :: sql(),
ParametersList :: list( parameters() ),
Context :: z:context(),
Result :: {ok, [ query_result() ]}
| {error, term()}.
execute_batch(Sql, Batch, Context) ->
execute_batch(Sql, Batch, Context, ?TIMEOUT).
%% @doc Execute the same SQL statement for a list of parameters.
-spec execute_batch(SQL, ParametersList, Context, Timeout) -> Result when
SQL :: sql(),
ParametersList :: list( parameters() ),
Context :: z:context(),
Timeout :: pos_integer(),
Result :: {ok, [ query_result() ]}
| {error, term()}.
execute_batch(Sql, Batch, Context, Timeout) ->
F = fun(none) ->
{error, noresult};
(C) ->
DbDriver = z_context:db_driver(Context),
DbDriver:execute_batch(C, Sql, Batch, Timeout)
end,
with_connection(F, Context).
%% @doc Insert a new row in a table, use only default values and return the new record id.
%% If the table has an 'id' column then the new id is returned. The 'id' column shoud be
%% the primary key column and have type 'serial' (or bigserial) if it is not given in the
%% insert statement. All columns must have a default value or be nullable.
-spec insert(Table, Context) -> {ok, NewId | undefined} | {error, Reason} when
Table :: table_name(),
Context :: z:context(),
NewId :: id(),
Reason :: term().
insert(Table, Context) ->
{_Schema, _Tab, QTab} = quoted_table_name(Table),
with_connection(
fun(C) ->
DbDriver = z_context:db_driver(Context),
equery1(DbDriver, C, "insert into "++QTab++" default values returning id")
end,
Context).
%% @doc Insert a new row in a table and return the new record id.
%% Unknown columns are serialized in the props or props_json column. If the table has an 'id'
%% column then the new id is returned. The 'id' column shoud be the primary key column
%% and have type 'serial' (or bigserial) if it is not given in the passed parameters.
-spec insert(Table, Parameters, Context) -> {ok, NewId | undefined} | {error, Reason} when
Table :: table_name(),
Parameters :: props(),
Context :: z:context(),
NewId :: id(),
Reason :: term().
insert(Table, Parameters, Context) when is_list(Parameters) ->
insert(Table, z_props:from_props(Parameters), Context);
insert(Table, Parameters, Context) ->
{Schema, Tab, QTab} = quoted_table_name(Table),
Cols = column_names_bin(Schema, Tab, Context),
BinParams = ensure_binary_keys(Parameters),
case prepare_cols(Cols, BinParams) of
{ok, InsertProps} ->
HasProps = maps:is_key(<<"props">>, InsertProps),
HasPropsJSON = maps:is_key(<<"props_json">>, InsertProps),
InsertProps1 = if HasPropsJSON ->
#{<<"props_json">> := PropsJSONCol} = InsertProps,
case is_map(PropsJSONCol) of
true ->
InsertProps#{
<<"props_json">> => ?DB_PROPS_JSON(filter_empty_props(PropsJSONCol))
};
false ->
InsertProps
end;
HasProps ->
#{<<"props">> := PropsCol} = InsertProps,
case PropsCol of
#{} ->
InsertProps#{
<<"props">> => ?DB_PROPS(filter_empty_props(PropsCol))
};
[ {_, _} | _ ] ->
Props1 = z_props:from_props(PropsCol),
InsertProps#{
<<"props">> => ?DB_PROPS(filter_empty_props(Props1))
};
_ ->
InsertProps
end;
not HasPropsJSON and not HasProps ->
InsertProps
end,
%% Build the SQL insert statement
{ColNames, ColParams} = lists:unzip( maps:to_list(InsertProps1) ),
Sql = iolist_to_binary([
"insert into ", QTab, " (\"",
lists:join("\", \"", ColNames),
"\") values (",
lists:join(", ", [ [$$ | integer_to_list(N)] || N <- lists:seq(1, length(ColParams)) ]),
")"
]),
FinalSql = case lists:member(<<"id">>, Cols) of
true -> <<Sql/binary, " returning id">>;
false -> Sql
end,
F = fun(C) ->
DbDriver = z_context:db_driver(Context),
case equery1(DbDriver, C, FinalSql, ColParams) of
{ok, Id} ->
{ok, Id};
{error, noresult} ->
{ok, undefined};
{error, #error{ codename = unique_violation, message = Message }} = Error ->
?LOG_NOTICE(#{
in => zotonic_core,
text => <<"z_db unique_violation in insert">>,
result => error,
reason => unique_violation,
message => Message,
table => Table,
parameters => Parameters
}),
Error;
{error, #error{ codename = ErrCode, message = Message }} = Error ->
?LOG_ERROR(#{
in => zotonic_core,
text => <<"z_db error in insert">>,
result => error,
reason => ErrCode,
message => Message,
table => Table,
query => FinalSql,
parameters => Parameters
}),
Error;
{error, Reason} = Error ->
?LOG_ERROR(#{
in => zotonic_core,
text => <<"z_db error in query">>,
result => error,
reason => Reason,
table => Table,
query => FinalSql,
parameters => ColParams
}),
Error
end
end,
with_connection(F, Context);
{error, _} = Error ->
Error
end.
%% @doc Update a row in a table, merging the properties with any new property values. The table
%% must have a column id of some integer type. If there is no matching column then 0 is returned
%% for the number of updated columns. The update is done within a transaction, first the old values
%% are read and then merged with the new values.
-spec update(Table, Id, Props, Context) -> {ok, RowsUpdated} | {error, Reason} when
Table :: table_name(),
Id :: id() | id_key(),
Props :: props(),
Context :: z:context(),
RowsUpdated :: non_neg_integer(),
Reason :: term().
update(Table, Id, Parameters, Context) when is_list(Parameters) ->
update(Table, Id, z_props:from_props(Parameters), Context);
update(Table, Id, Parameters, Context) when is_map(Parameters) ->
{Schema, Tab, QTab} = quoted_table_name(Table),
DbDriver = z_context:db_driver(Context),
Cols = column_names_bin(Schema, Tab, Context),
BinParams = ensure_binary_keys(Parameters),
case prepare_cols(Cols, BinParams) of
{ok, UpdateProps} ->
F = fun(C) ->
UpdateProps1 = update_merge_props(DbDriver, C, Table, Cols, Id, UpdateProps, Context),
UpdateProps2 = update_map_atom_arrays(UpdateProps1),
{ColNames, Params} = lists:unzip( maps:to_list(UpdateProps2) ),
ColNamesNr = lists:zip(ColNames, lists:seq(2, length(ColNames)+1)),
ColAssigns = [
["\"", ColName, "\" = $", integer_to_list(Nr)]
|| {ColName, Nr} <- ColNamesNr
],
Sql = iolist_to_binary([
"update ", QTab, " set ",
lists:join(", ", ColAssigns),
" where id = $1"
]),
SqlParams = [ Id | Params ],
case equery1(DbDriver, C, Sql, SqlParams) of
{ok, _RowsUpdated} = Ok ->
Ok;
{error, #error{ codename = unique_violation, message = Message }} = Error ->
?LOG_NOTICE(#{
in => zotonic_core,
text => <<"z_db unique_violation in update">>,
message => Message,
result => error,
reason => unique_violation,
table => Table,
parameters => SqlParams
}),
Error;
{error, #error{ codename = ErrCode, message = Message }} = Error ->
?LOG_ERROR(#{
in => zotonic_core,
text => <<"z_db error in update">>,
result => error,
reason => ErrCode,
message => Message,
table => Table,
query => Sql,
parameters => SqlParams
}),
Error;
{error, Reason} = Error ->
?LOG_ERROR(#{
in => zotonic_core,
text => <<"z_db error in query">>,
result => error,
reason => Reason,
table => Table,
query => Sql,
parameters => SqlParams
}),
Error
end
end,
with_connection(F, Context);
{error, _} = Error ->
Error
end.
%% @doc Estimate the number of rows matching a query. This uses the PostgreSQL query planner
%% to return an estimate of the number of rows.
-spec estimate_rows(Query, Args, Context) -> {ok, Rows} | {error, term()}
when Query :: string() | binary(),
Args :: list(),
Context :: z:context(),
Rows :: non_neg_integer().
estimate_rows(Query, Args, Context) ->
Query1 = "explain " ++ z_convert:to_list(Query),
try
find_estimate( z_db:q(Query1, Args, Context) )
catch
throw:{error, _} = Error -> Error
end.
find_estimate([]) ->
{ok, 0};
find_estimate([{R}|Rs]) ->
case re:run(R, <<" rows=([0-9]+)">>, [{capture, all_but_first, binary}]) of
nomatch -> find_estimate(Rs);
{match, [Rows]} -> {ok, binary_to_integer(Rows)}
end.
get_current_props(Table, Id, Context) ->
DBDriver = z_context:db_driver(Context),
F = fun(C) ->
get_current_props(DBDriver, C, Table, Id, Context)
end,
with_connection(F, Context).
get_current_props(DBDriver, Connection, Table, Id, Context) when is_atom(Table) ->
get_current_props(DBDriver, Connection, atom_to_list(Table), Id, Context);
get_current_props(DBDriver, Connection, Table, Id, Context) ->
ColNames = column_names(Table, Context),
get_current_props(DBDriver, Connection,
lists:member(props_json, ColNames), lists:member(props, ColNames), Table, Id, Context).
get_current_props(_DBDriver, _Connection, false, false, _Table, _Id, _Context) ->
%% There is no props column
{error, no_properties};
get_current_props(DBDriver, Connection, false, true, Table, Id, _Context) ->
%% There is only a props column.
case equery1(DBDriver, Connection, "select props from \"" ++ Table ++ "\" where id = $1", [Id]) of
{ok, Props} when is_list(Props) -> {ok, z_props:from_props(Props)};
{ok, Props} when is_map(Props) -> {ok, Props};
_E ->
{error, no_properties}
end;
get_current_props(DBDriver, Connection, true, false, Table, Id, _Context) ->
%% There is only a props_json column
case equery1(DBDriver, Connection, "select props_json from \"" ++ Table ++ "\" where id = $1", [Id]) of
{ok, JSON} when is_binary(JSON) ->
{ok, jsxrecord:decode(JSON)};
_ ->
{error, no_properties}
end;
get_current_props(DBDriver, Connection, true, true, Table, Id, _Context) ->
%% There is a props_json, and a props column.
R = case DBDriver:equery(Connection, "select props, props_json from \"" ++ Table ++ "\" where id = $1", [Id], ?TIMEOUT) of
{ok, _Columns, []} -> {error, noresult};
{ok, _RowCount, _Columns, []} -> {error, noresult};
{ok, _Columns, [Row|_]} -> {ok, element(1, Row), element(2, Row)};
{ok, _RowCount, _Columns, [Row|_]} -> {ok, element(1, Row), element(2, Row)};
Other -> Other
end,
%% Merge the properties found in the columns, the props_json column gets priority.
case R of
{ok, Props, undefined} when is_list(Props) ->
{ok, z_props:from_props(Props)};
{ok, Props, undefined} when is_map(Props) ->
{ok, Props};
{ok, undefined, JSON} when is_binary(JSON) ->
{ok, jsxrecord:decode(JSON)};
{ok, Props, JSON} when is_list(Props) andalso is_binary(JSON) ->
{ok, maps:merge(z_props:from_props(Props), jsxrecord:decode(JSON))};
{ok, Props, JSON} when is_map(Props) andalso is_binary(JSON) ->
{ok, maps:merge(Props, jsxrecord:decode(JSON))};
_ ->
{error, no_properties}
end.
update_map_atom_arrays(Props) ->
maps:map(
fun
(_K, [ A | _ ] = V) when is_atom(A) ->
[ atom_to_binary(X, utf8) || X <- V ];
(_K, V) ->
V
end,
Props).
update_merge_props(DbDriver, Connection, Table, Cols, Id, #{ <<"props_json">> := NewProps }=UpdateProps, Context) ->
P = case get_current_props(DbDriver, Connection, Table, Id, Context) of
{ok, OldProps} ->
NewProps1 = maps:merge(OldProps, NewProps),
NewProps2 = maps:without(Cols, NewProps1),
UpdateProps#{<<"props_json">> => ?DB_PROPS_JSON( drop_undefined(NewProps2) ) };
_ ->
UpdateProps#{<<"props_json">> => ?DB_PROPS_JSON( drop_undefined(NewProps) )}
end,
%% Clear the existing props column
case lists:member(<<"props">>, Cols) of
true -> P#{ <<"props">> => null };
false -> P
end;
update_merge_props(DbDriver, Connection, Table, Cols, Id, #{ <<"props">> := NewProps }=UpdateProps, Context) ->
case get_current_props(DbDriver, Connection, Table, Id, Context) of
{ok, OldProps} ->
NewProps1 = maps:merge(OldProps, NewProps),
NewProps2 = maps:without(Cols, NewProps1),
UpdateProps#{ <<"props">> => ?DB_PROPS( drop_undefined(NewProps2) )};
_ ->
UpdateProps#{ <<"props">> => ?DB_PROPS( drop_undefined(NewProps) )}
end;
update_merge_props(_DbDriver, _Connection, _Table, _Cols, _Id, #{}=UpdateProps, _Context) ->
UpdateProps;
update_merge_props(DbDriver, Connection, Table, Cols, Id, NewProps, Context) when is_list(NewProps) ->
Props1 = z_props:from_props(NewProps),
update_merge_props(DbDriver, Connection, Table, Cols, Id, Props1, Context).
drop_undefined(Props) ->
maps:fold(
fun
(_K, undefined, Acc) -> Acc;
(K, V, Acc) -> Acc#{ K => V }
end,
#{},
Props).
ensure_binary_keys(Ps) ->
maps:fold(
fun
(K, V, Acc) when is_atom(K) ->
Acc#{
atom_to_binary(K, utf8) => V
};
(K, V, Acc) when is_binary(K) ->
Acc#{
K => V
}
end,
#{},
Ps).
filter_empty_props(Map) ->
maps:filter(
fun
(_K, undefined) -> false;
(_K, <<>>) -> false;
(_K, #trans{ tr = Tr }) ->
lists:any(
fun
({_, <<>>}) -> false;
(_) -> true
end,
Tr);
(_K, _V) ->
true
end,
Map).
%% @doc Delete a row from a table, the row must have a column with the name 'id'
-spec delete(Table, Id, Context) -> {ok, RowsDeleted} | {error, Reason} when
Table :: table_name(),
Id :: id() | id_key(),
Context :: z:context(),
RowsDeleted :: non_neg_integer(),
Reason :: term().
delete(Table, Id, Context) ->
{_Schema, _Tab, QTab} = quoted_table_name(Table),
Sql = "delete from "++QTab++" where id = $1",
case equery(Sql, [Id], Context) of
{ok, _} = Ok ->
Ok;
{error, _} = Error ->
Error
end.
%% @doc Read a row from a table, the row must have a column with the name 'id'.
%% The props column contents is merged with the other properties returned.
-spec select(Table, Id, Context) -> {ok, Row} | {error, Reason} when
Table :: table_name(),
Id :: id() | id_key(),
Context :: z:context(),
Row :: map(),
Reason :: term().
select(Table, Id, Context) ->
select(Table, Id, [], Context).
-spec select(Table, Id, Options, Context) -> {ok, Row} | {error, Reason} when
Table :: table_name(),
Id :: id() | id_key(),
Options :: qmap_options(),
Context :: z:context(),
Row :: map(),
Reason :: term().
select(Table, Id, Options, Context) ->
{_Schema, _Tab, QTab} = quoted_table_name(Table),
Sql = "select * from "++QTab++" where id = $1 limit 1",
qmap_props_row(Sql, [ Id ], Options, Context).
%% @doc Check if all cols are valid columns in the target table, move unknown properties
%% to the props column (if exists).
prepare_cols(Cols, Props) ->
case split_props(Props, Cols) of
{ok, {CProps, PProps}} ->
case maps:size(PProps) of
0 ->
{ok, CProps};
_ ->
case lists:member(<<"props_json">>, Cols) of
true ->
PropsCol = merge_properties(maps:get(<<"props_json">>, CProps, undefined), PProps),
{ok, CProps#{ <<"props_json">> => PropsCol }};
false ->
PropsCol = merge_properties(maps:get(<<"props">>, CProps, undefined), PProps),
{ok, CProps#{ <<"props">> => PropsCol }}
end
end;
{error, _} = Error ->
Error
end.
merge_properties(Properties, Props) when is_list(Properties) ->
maps:merge(z_props:from_props(Properties), Props);
merge_properties(Properties, Props) when is_map(Properties) ->
maps:merge(Properties, Props);
merge_properties(_, Props) ->
Props.
split_props(Props, Cols) ->
{CProps, PProps} = lists:foldl(
fun(Col, {CAcc, PAcc}) ->
case maps:find(Col, PAcc) of
{ok, V} ->
PAcc1 = maps:remove(Col, PAcc),
CAcc1 = CAcc#{ Col => V },
{CAcc1, PAcc1};
error ->
{CAcc, PAcc}
end
end,
{#{}, Props},
Cols),
case maps:size(PProps) of
0 ->
{ok, {CProps, PProps}};
_ ->
case lists:member(<<"props_json">>, Cols) orelse lists:member(<<"props">>, Cols) of
true ->
{ok, {CProps, PProps}};
false ->
Ks = maps:keys(PProps),
{error, {unknown_column, Ks}}
end
end.
%% @doc Return a list of column definitions for all columns of the table.
-spec columns(table_name(), z:context()) -> list( #column_def{} ).
columns(Table, Context) ->
z_db_table:columns(Table, Context).
%% @doc Return a property list with all columns of the table. (example: [{id,int4,modifier},...])
-spec columns(schema_name(), table_name(), z:context()) -> list( #column_def{} ).
columns(Schema, Table, Context) ->
z_db_table:columns(Schema, Table, Context).
%% @doc Return the column definition of the given column.
-spec column( table_name(), column_name(), z:context()) ->
{ok, #column_def{}} | {error, enoent}.
column(Table, Column, Context) ->
z_db_table:column(Table, Column, Context).
%% @doc Return a list with the column (atom) names of a table. The names are sorted.
-spec column_names(table_name(), z:context()) -> list( atom() ).
column_names(Table, Context) ->
{Schema, Table1, _QTab} = quoted_table_name(Table),
column_names(Schema, Table1, Context).
%% @doc Return a list with all (atom) columns names of a table. The names are sorted.
-spec column_names(schema_name(), table_name(), z:context()) -> list( atom() ).
column_names(Schema, Table, Context) ->
Names = [ C#column_def.name || C <- z_db_table:columns(Schema, Table, Context)],
lists:sort(Names).
%% @doc Return a list with all (binary) columns names of a table. The names are not sorted.
-spec column_names_bin(table_name(), z:context()) -> list( binary() ).
column_names_bin(Table, Context) ->
[ atom_to_binary(Col, utf8) || Col <- column_names(Table, Context) ].
%% @doc Return a list with all (binary) columns names of a table. The names are not sorted.
-spec column_names_bin(schema_name(), table_name(), z:context()) -> list( binary() ).
column_names_bin(Schema, Table, Context) ->
[ atom_to_binary(Col, utf8) || Col <- column_names(Schema, Table, Context) ].
%% @doc Check if a column exists in a table.
-spec column_exists(table_name(), column_name(), z:context()) -> boolean().
column_exists(Table, Column, Context) when is_atom(Column) ->
lists:member(Column, column_names(Table, Context));
column_exists(Table, Column, Context) when is_atom(Column) ->
Col1 = z_convert:to_binary(Column),
lists:member(Col1, column_names_bin(Table, Context)).
%% @doc Convert a value so that it is compatible with the column type
-spec to_column_value(table_name(), column_name(), term(), z:context()) -> {ok, term()} | {error, term()}.
to_column_value(Table, Column, Value, Context) ->
case column(Table, Column, Context) of
{ok, #column_def{ type = Type, length = Length, is_array = false }} ->
try
{ok, convert_value(Type, Length, Value)}
catch
_:Reason ->
{error, Reason}
end;
{ok, #column_def{ type = Type, length = Length, is_array = true }} ->
try
{ok, convert_array_value(Type, Length, Value)}
catch
_:Reason ->
{error, Reason}
end;
{error, _} = Error ->
Error
end.
convert_value(<<"text">>, _, V) -> z_convert:to_binary(V);
convert_value(<<"character varying">>, Len, V) -> V1 = z_convert:to_binary(V), z_string:truncatechars(V1, Len);
convert_value(<<"integer">>, _, V) -> z_convert:to_integer(V);
convert_value(<<"bigint">>, _, V) -> z_convert:to_integer(V);
convert_value(<<"smallint">>, _, V) -> z_convert:to_integer(V);
convert_value(<<"serial">>, _, V) -> z_convert:to_integer(V);
convert_value(<<"bigserial">>, _, V) -> z_convert:to_integer(V);
convert_value(<<"smallserial">>, _, V) -> z_convert:to_integer(V);
convert_value(<<"numeric">>, _, V) -> z_convert:to_float(V);
convert_value(<<"decimal">>, _, V) -> z_convert:to_float(V);
convert_value(<<"float">>, _, V) -> z_convert:to_float(V);
convert_value(<<"double">>, _, V) -> z_convert:to_float(V);
convert_value(<<"boolean">>, _, V) -> z_convert:to_bool_strict(V);
convert_value(<<"timestamp", _/binary>>, _, V) -> z_datetime:to_datetime(V);
convert_value(<<"datetime", _/binary>>, _, V) -> z_datetime:to_datetime(V);
convert_value(<<"date", _/binary>>, _, V) -> z_datetime:to_datetime(V);
convert_value(<<"bytea">>, _, V) -> ?DB_PROPS(V);
convert_value(<<"tsvector">>, _, V) -> z_convert:to_binary(V);
convert_value(<<"ARRAY">>, _, V) when is_list(V) -> V;
convert_value(<<"ARRAY">>, _, V) -> [ V ];
convert_value(<<"array">>, _, V) when is_list(V) -> V;
convert_value(<<"array">>, _, V) -> [ V ];
convert_value(Type, _, V) when is_binary(Type) ->
?LOG_WARNING(#{
in => zotonic_core,
text => <<"No type conversion for column type">>,
result => error,
type => Type,
value => V
}),
V.
convert_array_value(Type, Length, V) when is_list(V) ->
[ convert_value(Type, Length, E) || E <- V ];
convert_array_value(Type, Length, V) ->
[ convert_value(Type, Length, V) ].
%% @doc Flush all cached information about the database.
-spec flush(Context) -> ok when
Context :: z:context().
flush(Context) ->
Options = z_db_pool:get_database_options(Context),
Db = proplists:get_value(dbdatabase, Options),
z_depcache:flush({database, Db}, Context).
%% @doc Update the sequence of the ids in the table. They will be renumbered according to their position in the id list.
%% @todo Make the steps of the sequence bigger, and try to keep the old sequence numbers in tact (needs a diff routine)
-spec update_sequence(table_name(), list( integer() ), z:context()) -> any().
update_sequence(Table, Ids, Context) ->
{_Schema, _Table, QTab} = quoted_table_name(Table),
DbDriver = z_context:db_driver(Context),
Args = lists:zip(Ids, lists:seq(1, length(Ids))),
F = fun
(none) ->
[];
(C) ->
[ {ok, _} = equery1(DbDriver, C, "update "++QTab++" set seq = $2 where id = $1", tuple_to_list(Arg)) || Arg <- Args ]
end,
with_connection(F, Context).
%% @doc Create database and schema if they do not yet exist
-spec prepare_database(z:context()) -> ok | {error, term()}.
prepare_database(Context) ->
Options = z_db_pool:get_database_options(Context),
Site = z_context:site(Context),
case ensure_database(Site, Options) of
ok ->
ensure_schema(Site, Options);
{error, _} = Error ->
Error
end.
ensure_database(Site, Options) ->
Database = proplists:get_value(dbdatabase, Options),
case open_connection("postgres", Options) of
{ok, PgConnection} ->
Result = case database_exists(PgConnection, Database) of
true ->
ok;
false ->
AnonOptions = proplists:delete(dbpassword, Options),
?LOG_NOTICE(#{
text => <<"Creating database">>,
database => Database,
options => AnonOptions
}),
create_database(Site, PgConnection, Database)
end,
close_connection(PgConnection),
Result;
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"Cannot create database because user cannot connect to the 'postgres' database">>,
result => error,
reason => Reason,
database => Database,
dbuser => proplists:get_value(dbuser, Options)
}),
Error
end.
ensure_schema(Site, Options) ->
Database = proplists:get_value(dbdatabase, Options),
Schema = proplists:get_value(dbschema, Options),
{ok, DbConnection} = open_connection(Database, Options),
Result = case schema_exists_conn(DbConnection, Schema) of
true ->
ok;
false ->
?LOG_NOTICE(#{
text => <<"Creating schema in database">>,
database => Database,
schema => Schema
}),
create_schema(Site, DbConnection, Schema)
end,
close_connection(DbConnection),
Result.
drop_schema(#context{} = Context) ->
drop_schema(z_db_pool:get_database_options(Context));
drop_schema(Options) when is_list(Options) ->
Schema = proplists:get_value(dbschema, Options),
Database = proplists:get_value(dbdatabase, Options),
case open_connection(Database, Options) of
{ok, DbConnection} ->
Result = case schema_exists_conn(DbConnection, Schema) of
true ->
case epgsql:equery(
DbConnection,
"DROP SCHEMA \"" ++ Schema ++ "\" CASCADE"
) of
{ok, _, _} = OK ->
?LOG_NOTICE(#{
text => <<"Dropped schema">>,
result => ok,
database => Database,
schema => Schema,
return_value => OK
}),
ok;
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"z_db error when dropping schema">>,
result => error,
reason => Reason,
database => Database,
schema => Schema
}),
Error
end;
false ->
?LOG_WARNING(#{
text => <<"Could not drop schema as it does not exist">>,
result => warning,
reason => noschema,
database => Database,
schema => Schema
}),
ok
end,
close_connection(DbConnection),
Result;
{error, Reason} ->
?LOG_ERROR(#{
text => <<"z_db error when connecting for dropping schema">>,
result => error,
reason => Reason,
database => Database,
schema => Schema
}),
ok
end.
open_connection(DatabaseName, Options) ->
epgsql:connect(z_db_pgsql:build_connect_options(DatabaseName, Options)).
close_connection(Connection) ->
epgsql:close(Connection).
%% @doc Check whether database exists
-spec database_exists(epgsql:connection(), string()) -> boolean().
database_exists(Connection, Database) ->
{ok, _, [{Count}]} = epgsql:equery(
Connection,
"SELECT COUNT(*) FROM pg_catalog.pg_database WHERE datname = $1",
[Database]
),
case z_convert:to_integer(Count) of
0 -> false;
1 -> true
end.
%% @doc Create a database
-spec create_database(atom(), epgsql:connection(), string()) -> ok | {error, term()}.
create_database(_Site, Connection, Database) ->
%% Use template0 to prevent ERROR: new encoding (UTF8) is incompatible with
%% the encoding of the template database (SQL_ASCII)
z_db_table:assert_database_name(Database),
case epgsql:equery(
Connection,
"CREATE DATABASE \"" ++ Database ++ "\" ENCODING = 'UTF8' TEMPLATE template0"
) of
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"z_db error when creating database">>,
result => error,
reason => Reason,
database => Database
}),
Error;
{ok, _, _} ->
ok
end.
%% @doc Check whether schema exists
-spec schema_exists_conn(epgsql:connection(), string()) -> boolean().
schema_exists_conn(Connection, Schema) ->
{ok, _, Schemas} = epgsql:equery(
Connection,
"SELECT schema_name FROM information_schema.schemata"),
lists:member( {z_convert:to_binary(Schema)}, Schemas ).
%% @doc Create a schema
-spec create_schema(atom(), epgsql:connection(), string()) -> ok | {error, term()}.
create_schema(_Site, Connection, Schema) ->
z_db_table:assert_schema_name(Schema),
case epgsql:equery(
Connection,
"CREATE SCHEMA \"" ++ Schema ++ "\""
) of
{ok, _, _} ->
ok;
{error, #error{ codename = duplicate_schema, message = Msg }} ->
?LOG_NOTICE(#{
text => <<"Schema already exists">>,
result => warning,
reason => duplicate_schema,
message => Msg
}),
ok;
{error, Reason} = Error ->
?LOG_ERROR(#{
text => <<"z_db error when creating schema">>,
result => error,
reason => Reason,
schema => Schema
}),
Error
end.
%% @doc Check if a named constraint exists on a table.
-spec constraint_exists(Table, Constraint, Context) -> boolean() when
Table :: table_name(),
Constraint :: binary() | string() | atom(),
Context :: z:context().
constraint_exists(Table, Constraint, Context) ->
Options = z_db_pool:get_database_options(Context),
Db = proplists:get_value(dbdatabase, Options),
Schema = proplists:get_value(dbschema, Options),
HasConstraint = q1("
select count(*)
from information_schema.table_constraints
where constraint_catalog = $1
and constraint_schema = $2
and table_name = $3
and constraint_name = $4",
[Db, Schema, Table, z_convert:to_binary(Constraint)],
Context),
HasConstraint >= 1.
%% @doc Check if a function is defined in the current schema.
-spec function_exists( FunctionName, Context) -> boolean() when
FunctionName :: binary() | string() | atom(),
Context :: z:context().
function_exists(Function, Context) ->
Options = z_db_pool:get_database_options(Context),
Schema = proplists:get_value(dbschema, Options),
HasFunction = z_db:q1("
select count(*)
from pg_proc p
join pg_namespace n
on p.pronamespace = n.oid
where n.nspname = $1
and p.proname = $2",
[Schema, z_convert:to_binary(Function)],
Context),
HasFunction =:= 1.
%% @doc Return a list with all foreign keys on a table.
-spec foreign_keys( table_name(), z:context() ) -> {ok, [ map() ]} | {error, Reason} when
Reason :: enoent | term().
foreign_keys(Table, Context) ->
Options = z_db_pool:get_database_options(Context),
Db = proplists:get_value(dbdatabase, Options),
Schema = proplists:get_value(dbschema, Options),
z_db:qmap("
SELECT
tc.table_schema,
tc.constraint_name,
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_catalog = kcu.table_catalog
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_catalog = tc.table_catalog
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.constraint_catalog = $1
AND tc.table_schema = $2
AND tc.table_name = $3",
[Db, Schema, Table],
[ {keys, atom} ],
Context).
-spec foreign_key( table_name(), Name, z:context() ) -> {ok, map()} | {error, Reason} when
Name :: atom() | string() | binary(),
Reason :: enoent | term().
foreign_key(Table, Name, Context) ->
Options = z_db_pool:get_database_options(Context),
Db = proplists:get_value(dbdatabase, Options),
Schema = proplists:get_value(dbschema, Options),
z_db:qmap_row("
SELECT
tc.table_schema,
tc.constraint_name,
tc.table_name,
kcu.column_name,
ccu.table_schema AS foreign_table_schema,
ccu.table_name AS foreign_table_name,
ccu.column_name AS foreign_column_name
FROM information_schema.table_constraints AS tc
JOIN information_schema.key_column_usage AS kcu
ON tc.constraint_name = kcu.constraint_name
AND tc.table_catalog = kcu.table_catalog
AND tc.table_schema = kcu.table_schema
JOIN information_schema.constraint_column_usage AS ccu
ON ccu.constraint_name = tc.constraint_name
AND ccu.table_catalog = tc.table_catalog
AND ccu.table_schema = tc.table_schema
WHERE tc.constraint_type = 'FOREIGN KEY'
AND tc.constraint_catalog = $1
AND tc.table_schema = $2
AND tc.table_name = $3
AND tc.constraint_name = $4",
[Db, Schema, Table, Name],
[ {keys, atom} ],
Context).
-spec key_exists( table_name(), binary() | string() | atom(), z:context() ) -> boolean().
key_exists(Table, Key, Context) ->
Options = z_db_pool:get_database_options(Context),
Schema = proplists:get_value(dbschema, Options),
HasKey = z_db:q1("
select count(*)
from pg_indexes
where schemaname = $1
and tablename = $2
and indexname = $3",
[ Schema, Table, Key ],
Context),
HasKey >= 1.
%% @doc Return a map of all indices of a table. The key is the name of
%% the index, the value is the definition of the index.
-spec table_keys( table_name(), z:context() ) -> {ok, Indices} when
Indices :: #{ IndexName := IndexDef },
IndexName :: binary(),
IndexDef :: binary().
table_keys(Table, Context) ->
Options = z_db_pool:get_database_options(Context),
Schema = proplists:get_value(dbschema, Options),
Rs = z_db:q("
select indexname, indexdef
from pg_indexes
where schemaname = $1
and tablename = $2",
[ Schema, Table ],
Context),
Map = lists:foldl(
fun({N, D}, Acc) ->
Acc#{ N => D }
end,
#{},
Rs),
{ok, Map}.
%% @doc Check the information schema if a certain table exists in the context database.
-spec table_exists(table_name(), z:context()) -> boolean().
table_exists(Table, Context) ->
z_db_table:table_exists(Table, Context).
%% @doc Ensure that a table with the given columns exists, if the table exists then
%% add, modify or drop columns. The 'id' (with type serial) column _must_ be defined
%% when creating the table.
-spec create_table(Table, Columns, Context) -> ok | {error, Reason} when
Table :: table_name(),
Columns :: list( #column_def{} ),
Context :: z:context(),
Reason :: term().
create_table(Table, Cols, Context) ->
z_db_table:create_table(Table, Cols, Context).
%% @doc Alter a table so that it matches the given column definitions. If the table doesn't
%% exist then it is created. RESTRICTIONS: does NOT change the primary key and unique
%% constraint of existing columns. If the table doesn't exist then it is created.
%% Be careful when adding columns that are not nullable, if the table contains data then
%% adding those columns will fail.
-spec alter_table(Table, Columns, Context) -> ok | {error, Reason} when
Table :: table_name(),
Columns :: list( #column_def{} ),
Context :: z:context(),
Reason :: term().
alter_table(Table, Cols, Context) ->
z_db_table:alter_table(Table, Cols, Context).
%% @doc Make sure that a table is dropped, only if the table exists
-spec drop_table(table_name(), z:context()) -> ok.
drop_table(Table, Context) ->
z_db_table:drop_table(Table, Context).
%% @doc Assert that the table name is safe to use. Crashes if the table name is not safe.
-spec assert_table_name(table_name()) -> true.
assert_table_name(Table) ->
z_db_table:assert_table_name(Table).
%% @doc Quote a table name so that it is safe to use in SQL queries.
-spec quoted_table_name(table_name()) -> {default | string(), string(), string()}.
quoted_table_name(Table) ->
z_db_table:quoted_table_name(Table).
%% @doc Merge the contents of the props column into the result rows
-spec merge_props([proplists:proplist() | map()]) -> list().
merge_props(List) ->
merge_props(List, []).
merge_props([], Acc) ->
lists:reverse(Acc);
merge_props([R | Rest], Acc) when is_list(R) ->
case {proplists:get_value(props, R, undefined), proplists:get_value(props_json, R, undefined)} of
{Props, PropsJSON} when ?IS_EMPTY(Props) andalso ?IS_EMPTY(PropsJSON) ->
merge_props(Rest, [R | Acc]);
{Term, PropsJSON} when ?IS_EMPTY(PropsJSON) ->
case Term of
T when is_list(T) ->
merge_props(Rest, [lists:keydelete(props, 1, R) ++ T | Acc]);
T when is_map(T) ->
T1 = map_to_proplist(Term),
merge_props(Rest, [lists:keydelete(props, 1, R) ++ T1 | Acc])
end;
{Term, PropsJSON} when ?IS_EMPTY(Term) ->
T1 = map_to_proplist(PropsJSON),
merge_props(Rest, [lists:keydelete(props_json, 1, R) ++ T1 | Acc]);
{Term, PropsJSON} ->
PropsTerm = case Term of
L when is_list(L) -> L;
M when is_map(M) -> map_to_proplist(M)
end,
PropsJSONTerm = map_to_proplist(PropsJSON),
PropsMerged = z_utils:props_merge(PropsJSONTerm, PropsTerm),
merge_props(Rest, [ lists:keydelete(props_json, 1, lists:keydelete(props, 1, R)) ++ PropsMerged | Acc])
end.
map_to_proplist(Map) ->
lists:map(fun({K,V}) -> {z_convert:to_atom(K), V} end, maps:to_list(Map)).
-spec assoc1(atom(), z:context(), sql(), parameters(), pos_integer()) -> {ok, [proplists:proplist()]}.
assoc1(DbDriver, C, Sql, Parameters, Timeout) ->
case DbDriver:equery(C, Sql, Parameters, Timeout) of
{ok, Columns, Rows} ->
Names = [ list_to_atom(binary_to_list(Name)) || #column{name=Name} <- Columns ],
Rows1 = [ lists:zip(Names, tuple_to_list(Row)) || Row <- Rows ],
{ok, Rows1};
Other -> Other
end.
equery1(DbDriver, C, Sql) ->
equery1(DbDriver, C, Sql, [], ?TIMEOUT).
equery1(DbDriver, C, Sql, Parameters) when is_list(Parameters); is_tuple(Parameters) ->
equery1(DbDriver, C, Sql, Parameters, ?TIMEOUT).
equery1(DbDriver, C, Sql, Parameters, Timeout) ->
case DbDriver:equery(C, Sql, Parameters, Timeout) of
{ok, _Columns, []} -> {error, noresult};
{ok, _RowCount, _Columns, []} -> {error, noresult};
{ok, _Columns, [Row|_]} -> {ok, element(1, Row)};
{ok, _RowCount, _Columns, [Row|_]} -> {ok, element(1, Row)};
Other -> Other
end.