%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2009-2025 Marc Worrell
%% @doc Search the database, interfaces to specific search routines.
%% @end
%% Copyright 2009-2025 Marc Worrell
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(z_search).
-author("Marc Worrell <marc@worrell.nl").
%% interface functions
-export([
search/5,
search/6,
search/2,
search/3,
search_pager/3,
search_pager/4,
search_result/3,
query_/2,
lookup_qarg_value/2,
lookup_qarg_value/3,
lookup_qarg/2,
lookup_qarg/3,
map_to_options/1,
props_to_map/1,
reformat_sql_query/3,
concat_sql_query/2,
default_pagelen/1
]).
% The tuple format is deprecated. Use separate binary search term with a map.
-type search_query():: {atom(), list()}
| {binary(), map()|undefined}.
-type search_offset() :: Limit :: pos_integer()
| {Offset :: pos_integer(), Limit :: pos_integer()}.
-type search_options() :: #{
properties => list(binary()) | boolean(),
is_count_rows => boolean(),
is_single_page => boolean()
}.
-export_type([
search_query/0,
search_offset/0,
search_options/0
]).
-include_lib("zotonic.hrl").
-define(SEARCH_ALL_LIMIT, 30000).
-define(MIN_LOOKAHEAD, 200).
%% @doc Perform a named search with arguments. The search options are taken from the
%% `<<"options">>' key of the argument map; without that key an empty option map is used.
-spec search(Name, Args, Page, PageLen, Context) -> Result when
    Name :: binary(),
    Args :: map() | proplists:proplist() | undefined,
    Page :: pos_integer() | undefined,
    PageLen :: pos_integer() | undefined,
    Context :: z:context(),
    Result :: #search_result{}.
search(Name, Args, Page, PageLen, Context) ->
    Options = case Args of
        #{ <<"options">> := ArgOptions } -> ArgOptions;
        _ -> #{}
    end,
    search(Name, Args, Page, PageLen, Options, Context).
%% @doc Perform a named search with arguments and explicit search options.
%% Undefined arguments, page and page length are replaced with an empty map, page 1
%% and the default page length. The `#search_query{}' is passed to `z_notifier:first/2';
%% when that returns `undefined' the name is tried as the name of a resource in the
%% category `query', which is then run as a `<<"query">>' search with that `query_id'.
%% Searches that can not be resolved are logged and give an empty result.
-spec search(Name, Args, Page, PageLen, Options, Context) -> Result when
    Name :: binary(),
    Args :: map() | proplists:proplist() | undefined,
    Page :: pos_integer() | undefined,
    PageLen :: pos_integer() | undefined,
    Options :: map(),
    Context :: z:context(),
    Result :: #search_result{}.
search(Name, undefined, Page, PageLen, Options, Context) ->
    search(Name, #{}, Page, PageLen, Options, Context);
search(Name, Args, undefined, PageLen, Options, Context) ->
    search(Name, Args, 1, PageLen, Options, Context);
search(Name, Args, Page, undefined, Options, Context) ->
    search(Name, Args, Page, default_pagelen(Context), Options, Context);
search(Name, Args0, Page, PageLen, Options0, Context) when is_binary(Name), is_map(Options0) ->
    Args = props_to_map(Args0),
    Options = map_to_options(Options0),
    OffsetLimit = offset_limit(Page, PageLen, Options),
    Q = #search_query{
        name = Name,
        args = Args,
        offsetlimit = OffsetLimit,
        options = Options
    },
    case z_notifier:first(Q, Context) of
        undefined when Name =/= <<"query">> ->
            % Not handled, the name could refer to a stored query resource.
            search_named_query(Name, Args, Page, PageLen, Options, Context);
        undefined ->
            unknown_search_result(Name, Args);
        Result ->
            handle_post_options(
                handle_search_result(Result, Page, PageLen, OffsetLimit, Name, Args, Options, Context),
                Context)
    end.

%% Run the search as a stored query if Name is the name of a resource of category 'query'.
search_named_query(Name, Args, Page, PageLen, Options, Context) ->
    case m_rsc:rid(Name, Context) of
        RId when is_integer(RId) ->
            case m_rsc:is_a(RId, 'query', Context) of
                true ->
                    % Named query, perform the query with the stored arguments.
                    Args1 = Args#{
                        <<"query_id">> => RId
                    },
                    search(<<"query">>, Args1, Page, PageLen, Options, Context);
                false ->
                    unknown_search_result(Name, Args)
            end;
        undefined ->
            unknown_search_result(Name, Args)
    end.

%% Log that the search is unknown and return an empty result for it.
unknown_search_result(Name, Args) ->
    ?LOG_NOTICE(#{
        text => <<"z_search: ignored unknown search query">>,
        in => zotonic_core,
        name => Name,
        args => Args
    }),
    #search_result{
        search_name = Name,
        search_args = Args
    }.
%% @doc Search items and handle the paging, using the default page length of the site.
%% @deprecated use search/5
-spec search_pager(Query, Page, Context) -> Result when
    Query :: search_query(),
    Page :: pos_integer(),
    Context :: z:context(),
    Result :: #search_result{}.
search_pager(Search, Page, Context) ->
    PageLen = default_pagelen(Context),
    search_pager(Search, Page, PageLen, Context).
%% @doc Search items and handle the paging. This fetches extra rows beyond the requested
%% rows to ensure that the pager has the information for the "next page" options.
%% The number of extra rows depends on the current page, more for page 1, less for later pages.
%%
%% An undefined page is replaced with 1 and an undefined page length with the default
%% page length. Searches with a binary name are delegated to search/5. Searches with an
%% atom name are run with search_1/5, using the offset and limit from offset_limit/3.
-spec search_pager(Query, Page, PageLen, Context) -> Result when
Query :: search_query(),
Page :: pos_integer() | undefined,
PageLen :: pos_integer() | undefined,
Context :: z:context(),
Result :: #search_result{}.
search_pager(Search, undefined, PageLen, Context) ->
search_pager(Search, 1, PageLen, Context);
search_pager(Search, Page, undefined, Context) ->
search_pager(Search, Page, default_pagelen(Context), Context);
search_pager({Name, Args}, Page, PageLen, Context) when is_binary(Name) ->
search(Name, Args, Page, PageLen, Context);
% A page length below 1 gives an empty result without running a search.
% Binary named searches are matched by the clause above and never arrive here.
search_pager({Name, Args}, _Page, PageLen, _Context) when PageLen < 1 ->
#search_result{
search_name = Name,
search_args = Args,
result = [],
page = 1,
pagelen = PageLen,
pages = 0,
prev = 1,
next = false
};
% Deprecated search with an atom name: run it and let handle_search_result/8 do the paging.
search_pager({Name, Args} = Search, Page, PageLen, Context) when is_atom(Name) ->
OffsetLimit = offset_limit(Page, PageLen, #{}),
SearchResult = search_1(Search, Page, PageLen, OffsetLimit, Context),
handle_search_result(SearchResult, Page, PageLen, OffsetLimit, Name, Args, #{}, Context).
%% @doc Run the search and return the first page of results, using the default page length.
%% @deprecated use search/5
-spec search({atom()|binary(), proplists:proplist()|map()}, z:context()) -> #search_result{}.
search(Search, Context) ->
    OffsetLimit = default_offset_limit(Context),
    search(Search, OffsetLimit, Context).
%% @doc Perform the named search with its arguments. The second argument is the maximum
%% number of rows, an `{Offset, Limit}' tuple, or `undefined' for the maximum of
%% ?SEARCH_ALL_LIMIT rows.
%% @deprecated use search/5
-spec search(search_query(), search_offset() | undefined, z:context() ) -> #search_result{}.
search(Search, undefined, Context) ->
    search(Search, ?SEARCH_ALL_LIMIT, Context);
search(Search, MaxRows, Context) when is_integer(MaxRows) ->
    search_1(Search, 1, MaxRows, {1, MaxRows}, Context);
search(Search, {1, Limit} = OffsetLimit, Context) ->
    search_1(Search, 1, Limit, OffsetLimit, Context);
search(Search, {Offset, Limit} = OffsetLimit, Context) ->
    Skipped = Offset - 1,
    {PageNr, PageLen} = case Skipped rem Limit of
        0 ->
            % The offset is on a page boundary, so the page number is known.
            {Skipped div Limit + 1, Limit};
        _ ->
            % Not on a page boundary, give up on calculating the page number.
            {1, ?SEARCH_ALL_LIMIT}
    end,
    search_1(Search, PageNr, PageLen, OffsetLimit, Context).
%% @doc Return the default page length. This is the config value `site.pagelen' when
%% that converts to a positive integer, and ?SEARCH_PAGELEN otherwise. A zero or negative
%% value is rejected because the page length is used as a divisor in the paging.
-spec default_pagelen( z:context() ) -> pos_integer().
default_pagelen(Context) ->
    Len = m_config:get_value(site, pagelen, Context),
    case z_convert:to_integer(Len) of
        N when is_integer(N), N > 0 -> N;
        _ -> ?SEARCH_PAGELEN
    end.
%% Offset and limit for the first page with the default page length.
-spec default_offset_limit( z:context() ) -> search_offset().
default_offset_limit(Context) ->
    PageLen = default_pagelen(Context),
    {1, PageLen}.
%% @doc Find the value of the named term in the query terms of a search argument map.
%% Returns `undefined' if the term is not found or has no value.
-spec lookup_qarg_value(Arg, Terms) -> TermValue | undefined when
    Arg :: binary(),
    Terms :: map(),
    TermValue :: term().
lookup_qarg_value(Arg, Terms) ->
    Default = undefined,
    lookup_qarg_value(Arg, Terms, Default).
%% @doc Find the value of the named term in the query terms of a search argument map.
%% Returns the default when the term is not found or when the term has no
%% `<<"value">>' key.
-spec lookup_qarg_value(Arg, Terms, Default) -> TermValue | Default when
    Arg :: binary(),
    Terms :: map(),
    Default :: term(),
    TermValue :: term().
lookup_qarg_value(Arg, Terms, Default) ->
    case lookup_qarg(Arg, Terms, undefined) of
        undefined ->
            Default;
        Term when is_map(Term) ->
            maps:get(<<"value">>, Term, Default)
    end.
%% @doc Find the named term in the query terms of a search argument map. Returns
%% `undefined' if the term is not found.
-spec lookup_qarg(Arg, Terms) -> Term | undefined when
    Arg :: binary(),
    Terms :: map(),
    Term :: #{ binary() => term() } | undefined.
lookup_qarg(Arg, Terms) ->
    NotFound = undefined,
    lookup_qarg(Arg, Terms, NotFound).
%% @doc Find the named term in the `<<"q">>' term list of a search argument map.
%% Returns the first term map whose `<<"term">>' equals Arg. Returns the default when
%% no such term is present, or when the map does not have a `<<"q">>' list at all.
-spec lookup_qarg(Arg, Terms, Default) -> Term | Default when
    Arg :: binary(),
    Terms :: map(),
    Default :: term(),
    Term :: #{ binary() => term() }.
lookup_qarg(Arg, #{ <<"q">> := Terms }, Default) when is_list(Terms) ->
    lookup_qarg_1(Arg, Terms, Default);
lookup_qarg(_Arg, Terms, Default) when is_map(Terms) ->
    % No list of query terms in the map, so the term can not be found.
    Default.

%% Walk the list of terms, entries without a matching <<"term">> key are skipped.
lookup_qarg_1(_Arg, [], Default) ->
    Default;
lookup_qarg_1(Arg, [ #{ <<"term">> := Arg } = Term | _ ], _Default) ->
    Term;
lookup_qarg_1(Arg, [ _ | Terms ], Default) ->
    lookup_qarg_1(Arg, Terms, Default).
%% @doc Handle a return value from a search function and turn it into a paged
%% `#search_result{}'. The return value can be a finished search result, a plain list
%% of rows, or an intermediate SQL statement (`#search_sql{}' or `#search_sql_terms{}')
%% that still needs to be augmented with extra ACL checks before it is executed.
-spec handle_search_result( Result, Page, PageLen, OffsetLimit, Name, Args, Options, Context ) -> #search_result{} when
    Result :: list() | #search_result{} | #search_sql{} | #search_sql_terms{},
    Page :: pos_integer(),
    PageLen :: pos_integer(),
    OffsetLimit :: {non_neg_integer(), non_neg_integer()},
    Name :: binary() | atom(),
    Args :: map() | proplists:proplist(),
    Options :: search_options(),
    Context :: z:context().
handle_search_result(#search_result{ result = L, total = Total } = S, Page, PageLen, _OffsetLimit, Name, Args, Options, _Context)
    when is_integer(Total) ->
    % Search result with a known total, derive the paging from the total.
    Pages = (Total+PageLen-1) div PageLen,
    Next = if
        Page < Pages -> Page + 1;
        true -> false
    end,
    S#search_result{
        search_name = Name,
        search_args = Args,
        result = lists:sublist(L, 1, PageLen),
        options = Options,
        page = Page,
        pagelen = PageLen,
        pages = Pages,
        prev = erlang:max(Page-1, 1),
        next = Next
    };
handle_search_result(#search_result{ result = L, total = undefined } = S, Page, PageLen, _OffsetLimit, Name, Args, Options, _Context) ->
    % Search result without page nr, but which should have used the OffsetLimit.
    % More rows than fit on a page means that there is a next page.
    Len = length(L),
    Next = if
        Len > PageLen -> Page + 1;
        true -> false
    end,
    S#search_result{
        search_name = Name,
        search_args = Args,
        result = lists:sublist(L, 1, PageLen),
        options = Options,
        page = Page,
        pagelen = PageLen,
        prev = erlang:max(Page-1, 1),
        next = Next
    };
handle_search_result(L, Page, PageLen, {Offset, _Limit}, Name, Args, Options, _Context) when is_list(L) ->
    % Simple search results that don't do their paging and offset/limit handling, do the paging here.
    Total = length(L),
    Pages = (Total + PageLen - 1) div PageLen,
    Next = if
        Page < Pages -> Page + 1;
        true -> false
    end,
    #search_result{
        search_name = Name,
        search_args = Args,
        result = sublist(L, Offset, PageLen),
        page = Page,
        pagelen = PageLen,
        pages = Pages,
        options = Options,
        total = Total,
        is_total_estimated = false,
        prev = erlang:max(Page-1, 1),
        next = Next
    };
handle_search_result(#search_sql_terms{} = Terms, Page, PageLen, OffsetLimit, Name, Args, Options, Context) ->
    SearchSQL = z_search_terms:combine(Terms),
    handle_search_result(SearchSQL, Page, PageLen, OffsetLimit, Name, Args, Options, Context);
handle_search_result(#search_sql{} = Q, Page, PageLen, {_, Limit} = OffsetLimit, Name, Args, Options, Context) ->
    Q1 = reformat_sql_query(Q, Options, Context),
    {Sql, SqlArgs} = concat_sql_query(Q1, OffsetLimit),
    case Q#search_sql.run_func of
        F when is_function(F) ->
            % The run function receives the generated SQL, so it must be given the SQL
            % arguments that belong to the placeholders in that SQL (not the search args).
            Result = F(Q, Sql, SqlArgs, Context),
            handle_search_result(Result, Page, PageLen, OffsetLimit, Name, Args, Options, Context);
        _ ->
            Rows = case Q#search_sql.assoc of
                false ->
                    Rs = z_db:q(Sql, SqlArgs, Context),
                    case Rs of
                        % Unwrap rows that consist of a single column.
                        [{_}|_] -> [ R || {R} <- Rs ];
                        _ -> Rs
                    end;
                true ->
                    z_db:assoc_props(Sql, SqlArgs, Context)
            end,
            RowCount = length(Rows),
            FoundTotal = (Page-1) * PageLen + RowCount,
            {Total, IsEstimate} = if
                Limit > RowCount andalso RowCount > 0 ->
                    % Didn't return all rows, assume we are at the end of the result set.
                    {FoundTotal, false};
                true ->
                    % No rows or the number of requested rows was returned.
                    {SqlNoLimit, ArgsNoLimit} = concat_sql_query(Q1, undefined),
                    {ok, EstimatedTotal} = z_db:estimate_rows(SqlNoLimit, ArgsNoLimit, Context),
                    if
                        RowCount > 0 ->
                            % Max rows returned, the total is more
                            {erlang:max(FoundTotal, EstimatedTotal), true};
                        RowCount =:= 0 andalso Page =:= 1 ->
                            {0, false};
                        RowCount =:= 0 ->
                            % We are beyond the number of available rows
                            {erlang:min(FoundTotal, EstimatedTotal), true}
                    end
            end,
            Pages = (Total + PageLen - 1) div PageLen,
            Next = if
                Pages > Page -> Page + 1;
                true -> false
            end,
            Result = #search_result{
                search_name = Name,
                search_args = Args,
                result = lists:sublist(Rows, 1, PageLen),
                options = Options,
                pages = Pages,
                page = Page,
                pagelen = PageLen,
                total = Total,
                is_total_estimated = IsEstimate,
                prev = erlang:max(Page-1, 1),
                next = Next
            },
            case Q#search_sql.post_func of
                undefined -> Result;
                Fun -> Fun(Result, Q1, Context)
            end
    end.
%% Calculate the {Offset, Limit} for the query of a page. More rows than a single page
%% are requested, so that a pager can be displayed. Exact results are not needed, as the
%% query planner is used to give an estimated number of rows.
offset_limit(_Page, _PageLen, #{ is_count_rows := true }) ->
    % Counting rows gives a single row.
    {1, 1};
offset_limit(Page, PageLen, #{ is_single_page := true }) ->
    % Only this page, plus one row to know if there is a next page.
    {(Page-1) * PageLen + 1, PageLen + 1};
offset_limit(Page, PageLen, _Options) ->
    case Page of
        1 ->
            % Take 6 pages + 1 or the MIN_LOOKAHEAD
            {1, erlang:max(6 * PageLen + 1, ?MIN_LOOKAHEAD)};
        _ ->
            % Take at least 5 pages + 1 to accommodate for the default slider of the pager.
            Offset = (Page-1) * PageLen + 1,
            Limit = erlang:max(5 * PageLen + 1, ?MIN_LOOKAHEAD - Page * PageLen),
            {Offset, Limit}
    end.
%% Take at most Limit elements from the list, starting at the 1-based Offset.
%% Gives the empty list when lists:sublist/3 fails with a function_clause error.
sublist(L, Offset, Limit) ->
    try lists:sublist(L, Offset, Limit) of
        Sub -> Sub
    catch
        error:function_clause -> []
    end.
%% @doc Handle deprecated searches in the {atom, list()} format. Searches with a
%% binary name are forwarded to search/5. A bare atom is handled as a search without
%% arguments. Anything else is logged and gives an empty search result.
search_1({SearchName, Props}, Page, PageLen, _OffsetLimit, Context) when is_binary(SearchName), is_integer(Page) ->
% Binary search name: convert the arguments and use the new search.
ArgsMap = props_to_map(Props),
search(SearchName, ArgsMap, Page, PageLen, Context);
search_1({SearchName, Props}, Page, PageLen, {Offset, Limit} = OffsetLimit, Context) when is_atom(SearchName), is_list(Props) ->
% Combine all 'cat' properties into a single {cat, Values} property.
Props1 = case proplists:get_all_values(cat, Props) of
[] -> Props;
[[]] -> Props;
Cats -> [{cat, Cats} | proplists:delete(cat, Props)]
end,
% Same for all 'cat_exclude' properties.
Props2 = case proplists:get_all_values(cat_exclude, Props1) of
[] -> Props1;
[[]] -> Props1;
CatsX -> [{cat_exclude, CatsX} | proplists:delete(cat_exclude, Props1)]
end,
PropsSorted = lists:keysort(1, Props2),
Q = #search_query{
search={SearchName, PropsSorted},
offsetlimit=OffsetLimit
},
% PageRest is 0 if the offset is at the start of a page of PageLen rows.
PageRest = (Offset - 1) rem PageLen,
case z_notifier:first(Q, Context) of
undefined when PageRest =:= 0 ->
% Nicely on a page boundary, try the new search with binary search names.
PageNr = (Offset - 1) div PageLen + 1,
SearchNameBin = z_convert:to_binary(SearchName),
ArgsMap = props_to_map(PropsSorted),
search(SearchNameBin, ArgsMap, PageNr, PageLen, Context);
undefined ->
% Not on a page boundary so we can't use the new search, return the empty result.
?LOG_NOTICE(#{
text => <<"z_search: ignored unknown search query">>,
in => zotonic_core,
name => SearchName,
args => PropsSorted
}),
#search_result{};
Result when Page =/= undefined ->
% The page is known, let handle_search_result/8 do the paging.
handle_search_result(Result, Page, PageLen, OffsetLimit, SearchName, PropsSorted, #{}, Context);
Result when PageRest =:= 0 ->
% Only tried when Page is undefined: the page number is calculated with the
% Limit as the page length.
PageNr = (Offset - 1) div Limit + 1,
handle_search_result(Result, PageNr, Limit, OffsetLimit, SearchName, PropsSorted, #{}, Context);
Result ->
% No page information, return the result without any paging.
S = search_result(Result, OffsetLimit, Context),
S#search_result{
search_name = SearchName,
search_args = PropsSorted
}
end;
search_1(Name, Page, PageLen, OffsetLimit, Context) when is_atom(Name) ->
% A search name without arguments.
search_1({Name, []}, Page, PageLen, OffsetLimit, Context);
search_1(Name, _Page, _PageLen, _OffsetLimit, _Context) ->
?LOG_NOTICE(#{
text => <<"z_search: ignored unknown search query">>,
in => zotonic_core,
name => Name
}),
#search_result{}.
%% @doc Apply the search options that are handled after the search has been done.
-spec handle_post_options(#search_result{}, z:context()) -> #search_result{}.
handle_post_options(#search_result{} = Result, Context) ->
    Options = Result#search_result.options,
    maybe_option_properties(Options, Result, Context).
%% If the option 'properties' is set then every result row is passed through
%% option_properties/3, which decides if the row is kept and what it is replaced with.
%% Without the option the search result is returned unchanged.
maybe_option_properties(Options, S, Context) ->
    case {Options, S} of
        {#{ properties := Props }, #search_result{ result = Rows }} ->
            WithProps = fun(Row) -> option_properties(Row, Props, Context) end,
            S#search_result{ result = lists:filtermap(WithProps, Rows) };
        _ ->
            S
    end.
%% Map a result row for the 'properties' search option. The return value follows the
%% contract of lists:filtermap/2: `{true, NewRow}' to keep the (replaced) row and
%% `false' to drop the row.
%%
%% Props is `true' for a default set of properties, or a list of property names. A name
%% starting with "o." or "s." gives the default properties of the objects or subjects
%% connected with that predicate. Rows that are a resource id, or a `{Title, Id}' tuple,
%% are replaced with a map of the requested properties and dropped if the resource is
%% not visible. All other rows are kept unchanged.
option_properties(R, true, Context) ->
    DefaultProps = [
        <<"id">>,
        <<"title">>,
        <<"short_title">>,
        <<"summary">>,
        <<"category_id">>,
        <<"category">>,
        <<"page_url">>,
        <<"thumbnail_url">>
    ],
    option_properties(R, DefaultProps, Context);
option_properties(Id, Props, Context) when is_integer(Id), is_list(Props) ->
    case z_acl:rsc_visible(Id, Context) of
        true ->
            Rsc = lists:foldl(
                fun
                    (<<"category">>, Acc) ->
                        CatId = m_rsc:p(Id, <<"category_id">>, Context),
                        Acc#{
                            <<"category">> => #{
                                <<"id">> => CatId,
                                <<"name">> => m_rsc:p(CatId, <<"name">>, Context),
                                <<"title">> => m_rsc:p(CatId, <<"title">>, Context)
                            }
                        };
                    (<<"o.", Predicate/binary>> = P, Acc) ->
                        OProps = sub_properties(m_edge:objects(Id, Predicate, Context), Context),
                        Acc#{
                            P => OProps
                        };
                    (<<"s.", Predicate/binary>> = P, Acc) ->
                        SProps = sub_properties(m_edge:subjects(Id, Predicate, Context), Context),
                        Acc#{
                            P => SProps
                        };
                    (P, Acc) when is_binary(P) ->
                        Acc#{
                            P => m_rsc:p(Id, P, Context)
                        };
                    (_, Acc) ->
                        % Ignore property names that are not binaries.
                        Acc
                end,
                #{ <<"id">> => Id },
                Props),
            {true, Rsc};
        false ->
            false
    end;
option_properties({Title, Id}, Props, Context) when is_integer(Id) ->
    % Typical result from a search bytitle (or another sort key)
    case option_properties(Id, Props, Context) of
        {true, Rsc} ->
            {true, {Title, Rsc}};
        false ->
            false
    end;
option_properties(R, _Props, _Context) ->
    % Unknown row or properties: keep the row as-is. This must be wrapped in a
    % {true, _} tuple, a bare row is not a valid return value for lists:filtermap/2.
    {true, R}.

%% Fetch the default properties of the connected resources. The visibility check is
%% done by option_properties/3, which returns `{true, Map}' or `false' for a resource id.
%% That return value is passed directly to lists:filtermap/2, so that the list contains
%% the property maps themselves.
sub_properties(Ids, Context) ->
    lists:filtermap(
        fun(Id) ->
            option_properties(Id, true, Context)
        end,
        Ids).
%% @doc Convert a map with search options to the search options map with atom keys.
%% The keys can be given as atoms or as binaries. Unknown options are logged and dropped.
-spec map_to_options( map() ) -> search_options().
map_to_options(Map) ->
    maps:fold(
        fun
            (K, V, Acc) when K =:= properties; K =:= <<"properties">> ->
                Acc#{ properties => opt_props(V) };
            (K, V, Acc) when K =:= is_count_rows; K =:= <<"is_count_rows">> ->
                Acc#{ is_count_rows => z_convert:to_bool(V) };
            (K, V, Acc) when K =:= is_single_page; K =:= <<"is_single_page">> ->
                Acc#{ is_single_page => z_convert:to_bool(V) };
            (K, V, Acc) ->
                ?LOG_INFO(#{
                    text => <<"Dropping unknown search option">>,
                    in => zotonic_core,
                    option => K,
                    value => V
                }),
                Acc
        end,
        #{},
        Map).
%% Normalize the value of the 'properties' option. An empty binary or empty list gives
%% `true'. A binary, or a list of values, is split on commas into a list of binaries.
%% Any other value is converted to a boolean.
opt_props(Empty) when Empty =:= <<>>; Empty =:= [] ->
    true;
opt_props(Names) when is_binary(Names) ->
    opt_props([Names]);
opt_props(Names) when is_list(Names) ->
    lists:flatmap(
        fun(Name) ->
            binary:split(z_convert:to_binary(Name), <<",">>, [global, trim])
        end,
        Names);
opt_props(Flag) ->
    z_convert:to_bool(Flag).
%% @doc Change search arguments to the standard query format. The arguments can be a
%% property list or a map, `undefined' is handled as the empty property list.
-spec props_to_map( list() | map() | undefined ) -> #{ binary() => term() }.
props_to_map(undefined) ->
    props_to_map([]);
props_to_map(Props) when is_list(Props) ->
    z_search_props:from_list(Props);
props_to_map(Props) when is_map(Props) ->
    z_search_props:from_map(Props).
%% @doc Run the `query' search with the arguments and return the found rows, with a
%% maximum of ?SEARCH_ALL_LIMIT rows. Arguments as a property list are deprecated,
%% use an argument map instead.
-spec query_( proplists:proplist() | map(), z:context() ) -> list( m_rsc:resource_id() ).
query_(Args, Context) ->
    Found = case is_map(Args) of
        true ->
            search(<<"query">>, Args, 1, ?SEARCH_ALL_LIMIT, Context);
        false ->
            % deprecated, should use argument maps.
            search({'query', Args}, ?SEARCH_ALL_LIMIT, Context)
    end,
    Found#search_result.result.
%% @doc Handle a return value from a search function, without any paging. A list of rows
%% is wrapped in a search result and a search result is returned as-is. An intermediate
%% SQL statement is augmented with the extra ACL checks and then executed.
-spec search_result(Result, search_offset(), Context) -> #search_result{} when
    Result :: list() | #search_result{} | #search_sql{},
    Context :: z:context().
search_result(Rows, _Limit, _Context) when is_list(Rows) ->
    #search_result{ result = Rows };
search_result(#search_result{} = Result, _Limit, _Context) ->
    Result;
search_result(#search_sql{ run_func = RunFunc } = Q, Limit, Context) ->
    {Sql, SqlArgs} = concat_sql_query(reformat_sql_query(Q, #{}, Context), Limit),
    case is_function(RunFunc) of
        true ->
            % The query has its own function to run the SQL.
            RunFunc(Q, Sql, SqlArgs, Context);
        false ->
            Rows = case Q#search_sql.assoc of
                true ->
                    z_db:assoc_props(Sql, SqlArgs, Context);
                false ->
                    case z_db:q(Sql, SqlArgs, Context) of
                        % Unwrap rows that consist of a single column.
                        [{_}|_] = Rs -> [ R || {R} <- Rs ];
                        Rs -> Rs
                    end
            end,
            #search_result{ result = Rows }
    end.
%% Build the SQL statement for the #search_sql{} record. Returns the SQL as a binary
%% together with the arguments for the SQL. If the record has no limit of its own, and
%% the second argument is an {Offset, Limit} tuple, then an offset and limit are added
%% as two extra arguments.
concat_sql_query(#search_sql{
        select = Select,
        from = From,
        where = Where,
        group_by = GroupBy,
        order = Order,
        limit = SearchLimit,
        args = Args
    }, Limit1) ->
    % Prefix a clause with its keyword, empty clauses are left empty.
    Clause = fun
        (_Keyword, "") -> "";
        (_Keyword, <<>>) -> <<>>;
        (Keyword, Sql) -> [ Keyword, Sql ]
    end,
    Base = [
        "select", Select,
        "from", concat_sql_from(From),
        Clause("where ", Where),
        Clause("group by ", GroupBy),
        Clause("order by ", Order)
    ],
    {Parts, FinalArgs} = case SearchLimit of
        undefined ->
            case Limit1 of
                undefined ->
                    %% No limit. Use with care.
                    {Base, Args};
                {OffsetN, LimitN} ->
                    ArgCount = length(Args),
                    OffsetLimit = [
                        [ "offset $", integer_to_list(ArgCount+1) ],
                        [ "limit $", integer_to_list(ArgCount+2) ]
                    ],
                    {Base ++ OffsetLimit, Args ++ [OffsetN-1, LimitN]}
            end;
        _ ->
            % The query has its own limit clause.
            {Base ++ [ SearchLimit ], Args}
    end,
    {iolist_to_binary( lists:join(" ", Parts) ), FinalArgs}.
%% @doc Inject the ACL checks in the SQL query. Also adds the checks for the categories
%% in the 'cats', 'cats_exclude' and 'cats_exact' fields of the query. The extra checks
%% are combined with the where clause of the query using "and". If the option
%% 'is_count_rows' is set then the query is changed to select "count(*)", without
%% ordering and limit.
-spec reformat_sql_query(#search_sql{}, search_options(), z:context()) -> #search_sql{}.
reformat_sql_query(#search_sql{where=Where, from=From, tables=Tables, args=Args,
cats=TabCats, cats_exclude=TabCatsExclude,
cats_exact=TabCatsExact} = Q,
Options,
Context) ->
% ACL check for every table of the query, this can extend the SQL arguments.
{ExtraWhere, Args1} = lists:foldl(
fun(Table, {Acc,As}) ->
{W,As1} = add_acl_check(Table, As, Q, Context),
{[W|Acc], As1}
end, {[], Args}, Tables),
% Checks for the categories that must match, this can extend the from clause.
{From1, ExtraWhere1} = lists:foldl(
fun({Alias, Cats}, {F, C}) ->
case add_cat_check(F, Alias, false, Cats, Context) of
{_, []} -> {F, C};
{FromNew, CatCheck} -> {FromNew, [CatCheck | C]}
end
end, {From, ExtraWhere}, TabCats),
% Checks for the categories that must be excluded.
{From2, ExtraWhere2} = lists:foldl(
fun({Alias, Cats}, {F, C}) ->
case add_cat_check(F, Alias, true, Cats, Context) of
{_, []} -> {F, C};
{FromNew, CatCheck} -> {FromNew, [CatCheck | C]}
end
end, {From1, ExtraWhere1}, TabCatsExclude),
% Checks for exact category matches, these add the category ids to the SQL arguments.
{ExtraWhere3, Args2} = lists:foldl(
fun({Alias, Cats}, {WAcc,As}) ->
add_cat_exact_check(Cats, Alias, WAcc, As, Context)
end, {ExtraWhere2, Args1}, TabCatsExact),
% Make the where clause a list, so that the extra checks can be prepended.
Where1 = case Where of
<<>> -> [];
B when is_binary(B) -> [ B ];
L when is_list(L) -> L
end,
Where2 = iolist_to_binary(concat_where(ExtraWhere3, Where1)),
Q1 = Q#search_sql{ where=Where2, from=From2, args=Args2 },
case Options of
#{ is_count_rows := true } ->
% Only count the rows, drop the ordering and the limit.
Q1#search_sql{
select = "count(*)",
limit = "",
order = ""
};
#{} ->
Q1
end.
%% @doc Prepend the extra checks to the where clause, separated with " and ".
%% Empty checks (`<<>>' and `[]') are skipped. The first argument is the list with
%% the extra checks, the second argument is the where clause as a list.
concat_where(Checks, Where) ->
    lists:foldl(
        fun
            (<<>>, Acc) -> Acc;
            ([], Acc) -> Acc;
            (Check, []) -> [ Check ];
            (Check, Acc) -> [ Check, " and " | Acc ]
        end,
        Where,
        Checks).
%% @doc Build the SQL "from" clause. The input can be a string, a #search_sql{} record
%% for a sub query, or a list of such elements. The parts are joined with a comma.
concat_sql_from(From) ->
    lists:join(",", concat_sql_from1(From)).
%% Convert the "from" of a query to a list with the parts of the from clause.
%% Accepted are: a binary or string with SQL, a #search_sql{} record for a sub query,
%% a {Source, Alias} tuple, or a list with any of these. Any other term fails with a
%% function_clause error.
concat_sql_from1(From) when is_binary(From) ->
    [ From ]; %% from is string
concat_sql_from1([ H | _ ] = From) when is_integer(H) ->
    [ From ]; %% from is string?
concat_sql_from1([ #search_sql{} = From | T ]) ->
    Subquery = case concat_sql_query(From, undefined) of
        {SQL, []} ->
            %% postgresql: alias for inner SELECT in FROM must be defined
            iolist_to_binary([
                "(", SQL, ") AS z_", z_ids:id()
            ]);
        {SQL, [ {as, Alias} ]} when is_list(Alias); is_binary(Alias) ->
            iolist_to_binary([
                "(", SQL, ") AS ", Alias
            ]);
        {_SQL, A} ->
            throw({badarg, "Use outer #search_sql.args to store args of inner #search_sql. Inner arg.list only can be equals to [] or to [{as, Alias=string()}] for aliasing innered select in FROM (e.g. FROM (SELECT...) AS Alias).", A})
    end,
    [ Subquery | concat_sql_from1(T) ];
concat_sql_from1([ {Source,Alias} | T ]) ->
    Alias2 = case z_utils:is_empty(Alias) of
        false ->
            [ " AS ", z_convert:to_binary(Alias) ];
        _ ->
            []
    end,
    [ [ concat_sql_from1(Source), Alias2 ] | concat_sql_from1(T) ];
concat_sql_from1([H|T]) ->
    [ concat_sql_from1(H) | concat_sql_from1(T) ];
concat_sql_from1([]) ->
    [];
% A single sub query or aliased source: handle it as a list with one element.
% Only these two are wrapped. Wrapping any other term would come back here via the
% [H|T] clause above and recurse without end.
concat_sql_from1(#search_sql{} = SubQuery) ->
    concat_sql_from1([ SubQuery ]);
concat_sql_from1({_Source, _Alias} = Aliased) ->
    concat_sql_from1([ Aliased ]).
%% @doc Create the extra 'where' conditions for checking the access control of a table.
%% Only the table `rsc' (as atom or binary) has a check, other tables have none.
add_acl_check({Table, Alias}, Args, Q, Context) when Table =:= rsc; Table =:= <<"rsc">> ->
    add_acl_check_rsc(Alias, Args, Q, Context);
add_acl_check(_Table, Args, _Q, _Context) ->
    {[], Args}.
%% @doc Create the extra 'where' conditions for checking the access control on the
%% resource table. The check is first requested with an #acl_add_sql_check{}
%% notification. If that gives `undefined' then an admin has no extra check and all
%% others get the publication check.
%% @todo This needs to be changed for the pluggable ACL
add_acl_check_rsc(Alias, Args, SearchSql, Context) ->
    Check = #acl_add_sql_check{ alias = Alias, args = Args, search_sql = SearchSql },
    case z_notifier:first(Check, Context) of
        {_NewSql, _NewArgs} = Handled ->
            Handled;
        undefined ->
            case z_acl:is_admin(Context) of
                false ->
                    %% Others can only see published resources
                    {publish_check(Alias, SearchSql), Args};
                true ->
                    % Admin can see all resources
                    {[], Args}
            end
    end.
%% SQL condition that checks if the resource with the table alias is published and
%% within its publication period. Empty if the 'extra' options of the query contain
%% 'no_publish_check'.
publish_check(Alias, #search_sql{} = SearchSql) ->
    IsSkipped = lists:member(no_publish_check, SearchSql#search_sql.extra),
    if
        IsSkipped ->
            [];
        true ->
            [
                Alias, ".is_published = true and "
                , Alias, ".publication_start <= now() and "
                , Alias, ".publication_end >= now()"
            ]
    end.
%% @doc Create the 'where' conditions for the category check. Returns the (possibly
%% extended) from clause and the condition. The condition is empty if there are no
%% categories to check.
add_cat_check(From, _Alias, _Exclude, [], _Context) ->
    {From, []};
add_cat_check(From, Alias, Exclude, Cats, Context) ->
    case m_category:is_tree_dirty(Context) of
        true ->
            % While the category tree is rebuilding, we use the less efficient version with joins.
            add_cat_check_joined(From, Alias, Exclude, Cats, Context);
        false ->
            % Use range queries on the category_nr pivot column.
            add_cat_check_pivot(From, Alias, Exclude, Cats, Context)
    end.
%% Category check using the ranges of the categories on the pivot_category_nr column.
%% The from clause is returned unchanged. Multiple ranges are combined with "or" for
%% an include check and with "and" for an exclude check.
add_cat_check_pivot(From, Alias, Exclude, Cats, Context) ->
    Checks = [
        cat_check_pivot1(Alias, Exclude, Range)
        || Range <- m_category:ranges(Cats, Context)
    ],
    Where = case Checks of
        [] ->
            [];
        [_] ->
            Checks;
        [_, _ | _] ->
            Separator = case Exclude of
                true -> " and ";
                false -> " or "
            end,
            [ "(", lists:join(Separator, Checks), ")" ]
    end,
    {From, Where}.
%% SQL condition on the pivot_category_nr column for a single range of category
%% numbers. The second argument is `true' for excluding the range and `false' for
%% including the range.
cat_check_pivot1(Alias, IsExclude, {Nr, Nr}) when is_boolean(IsExclude) ->
    % A range of a single number is a simple (in)equality check.
    Operator = case IsExclude of
        false -> ".pivot_category_nr = ";
        true -> ".pivot_category_nr <> "
    end,
    [ Alias, Operator, integer_to_list(Nr) ];
cat_check_pivot1(Alias, false, {Low, High}) ->
    [ Alias, ".pivot_category_nr >= ", integer_to_list(Low)
    , " and ", Alias, ".pivot_category_nr <= ", integer_to_list(High)
    ];
cat_check_pivot1(Alias, true, {Low, High}) ->
    [ $(, Alias, ".pivot_category_nr < ", integer_to_list(Low)
    , " or ", Alias, ".pivot_category_nr > ", integer_to_list(High), ")"
    ].
%% Add category tree range checks by using joins. Less optimal; only
%% used while the category tree is being recalculated.
%% The table "hierarchy" is added to the from clause, with the alias of the resource
%% table plus "_cat", except when there are no ranges to check.
add_cat_check_joined(From, Alias, Exclude, Cats, Context) ->
    Ranges = m_category:ranges(Cats, Context),
    CatAlias = [ Alias, "_cat" ],
    JoinedCheck = fun(Range) ->
        [
            Alias, ".category_id = ", CatAlias, ".id and "
            , CatAlias, ".name = '$category' and "
            , cat_check_joined1(CatAlias, Exclude, Range)
        ]
    end,
    case lists:map(JoinedCheck, Ranges) of
        [] ->
            {From, []};
        [_] = Single ->
            {[ {"hierarchy", CatAlias} | From ], Single};
        Multiple ->
            Separator = case Exclude of
                true -> " and ";
                false -> " or "
            end,
            {[ {"hierarchy", CatAlias} | From ], [
                "(",
                lists:join(Separator, Multiple),
                ")"
            ]}
    end.
%% SQL condition on the "nr" column of the joined category table for a single range
%% of category numbers. The second argument is `true' for excluding the range and
%% `false' for including the range.
cat_check_joined1(CatAlias, IsExclude, {Nr, Nr}) when is_boolean(IsExclude) ->
    % A range of a single number is a simple (in)equality check.
    Operator = case IsExclude of
        false -> ".nr = ";
        true -> ".nr <> "
    end,
    [ CatAlias, Operator, integer_to_list(Nr) ];
cat_check_joined1(CatAlias, false, {Low, High}) ->
    [ CatAlias, ".nr >= ", integer_to_list(Low)
    , " and ", CatAlias, ".nr <= ", integer_to_list(High)
    ];
cat_check_joined1(CatAlias, true, {Low, High}) ->
    [ "(", CatAlias, ".nr < ", integer_to_list(Low)
    , " or ", CatAlias, ".nr > ", integer_to_list(High), ")"
    ].
%% @doc Add a check for an exact category match. The category ids are appended as a
%% single list to the SQL arguments, and the check that refers to that argument is
%% appended to the where conditions. Without categories nothing is changed.
add_cat_exact_check([], _Alias, Where, Args, _Context) ->
    {Where, Args};
add_cat_exact_check(CatsExact, Alias, Where, Args, Context) ->
    CatIds = lists:map(fun(Cat) -> m_rsc:rid(Cat, Context) end, CatsExact),
    ArgNr = integer_to_list(length(Args) + 1),
    Check = [ Alias, [ ".category_id = any($", ArgNr, "::int[])" ] ],
    {Where ++ [ Check ], Args ++ [ CatIds ]}.