%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2009-2022 Marc Worrell, Maas-Maarten Zeeman
%% @doc Pivoting server for the rsc table. Takes care of full text indices. Polls the pivot queue for any changed resources.
%% Copyright 2009-2022 Marc Worrell, Maas-Maarten Zeeman
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(z_pivot_rsc).
-author("Marc Worrell <marc@worrell.nl").
-behaviour(gen_server).
%% gen_server exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([start_link/1]).
%% interface functions
-export([
poll/1,
status/1,
pivot/2,
pivot_delay/1,
pivot_resource_update/4,
queue_all/1,
queue_count/1,
insert_queue/2,
get_pivot_title/1,
get_pivot_title/2,
insert_task/3,
insert_task/4,
insert_task/5,
insert_task_after/6,
get_task/1,
get_task/2,
get_task/3,
get_task/4,
delete_task/3,
delete_task/4,
list_tasks/1,
count_tasks/1,
delete_tasks/1,
task_job_ping/3,
task_job_done/2,
pivot_job_ping/2,
pivot_job_done/1,
stemmer_language/1,
stemmer_language_config/1,
cleanup_tsv_text/1,
pg_lang/1,
pg_lang_extra/1,
% get_pivot_data/2,
define_custom_pivot/3,
lookup_custom_pivot/4,
insert_queue/3
]).
-include("zotonic.hrl").
% -include_lib("epgsql/include/epgsql.hrl").
% Interval (in seconds) to check if there are any items to be pivoted.
-define(PIVOT_POLL_INTERVAL_FAST, 2).
-define(PIVOT_POLL_INTERVAL_SLOW, 20).
% Interval to check for stuck pivot or task jobs (seconds)
-define(JOB_CHECK_INTERVAL, 10*60).
% Timeouts for pivot and tasks, killed if didn't ping in this number of seconds.
-define(PIVOT_JOB_TIMEOUT, 20*60). % 20 minutes for pivots
-define(TASK_JOB_TIMEOUT, 2*60*60). % 2 hours for tasks (might be shorter in future)
% How many PIVOT_POLL_INTERVAL_SLOW we will skip on SQL errors (like timeouts)
-define(BACKOFF_POLL_ERROR, 4).
% Number of queued ids taken from the queue at one go
-define(POLL_BATCH, 50).
-record(state, {
site :: atom(),
is_initial_delay = true :: boolean(),
is_pivot_delay = false :: boolean(),
backoff_counter = 0 :: integer(),
task_pid :: undefined | pid(),
task_id :: undefined | integer(),
task_progress :: undefined | 0..100,
task_ping :: undefined | erlang:timestamp(),
task :: undefined | map(),
pivot_pid :: undefined | pid(),
pivot_rsc_id :: undefined | m_rsc:resource_id(),
pivot_queue = [] :: [ m_rsc:resource_id() ],
pivot_ping :: undefined | erlang:timestamp()
}).
%% @doc Poll the pivot queue for the database in the context
-spec poll( z:context() ) -> ok.
poll(Context) ->
gen_server:cast(Context#context.pivot_server, poll).
-spec status( z:context() ) -> {ok, map()} | {error, term()}.
status(Context) ->
gen_server:call(Context#context.pivot_server, status).
%% @doc An immediate pivot request for a resource
-spec pivot(integer(), z:context()) -> ok.
pivot(Id, Context) ->
gen_server:cast(Context#context.pivot_server, {pivot, Id}).
%% @doc Delay the next pivot, useful when performing big updates
-spec pivot_delay(z:context()) -> ok.
pivot_delay(Context) ->
gen_server:cast(Context#context.pivot_server, pivot_delay).
%% @doc Return a modified property list with fields that need immediate pivoting on an update.
pivot_resource_update(Id, UpdateProps, RawProps, Context) ->
z_pivot_rsc_job:pivot_resource_update(Id, UpdateProps, RawProps, Context).
%% @doc Rebuild the search index by queueing all resources for pivot.
queue_all(Context) ->
erlang:spawn(fun() ->
queue_all(0, Context)
end).
queue_all(FromId, Context) ->
case z_db:q("select id from rsc where id > $1 order by id limit 1000", [FromId], Context) of
[] ->
done;
Rs ->
Ids = [ Id || {Id} <- Rs ],
do_insert_queue(Ids, calendar:universal_time(), Context),
queue_all(lists:last(Ids), Context)
end.
%% @doc Return the length of the pivot queue.
-spec queue_count(z:context()) -> integer().
queue_count(Context) ->
z_db:q1("SELECT COUNT(*) FROM rsc_pivot_queue", Context).
%% @doc Insert a rsc_id in the pivot queue
-spec insert_queue(m_rsc:resource_id() | list(m_rsc:resource_id()), z:context()) -> ok | {error, eexist}.
insert_queue(IdorIds, Context) ->
insert_queue(IdorIds, calendar:universal_time(), Context).
%% @doc Insert a rsc_id in the pivot queue for a certain date
-spec insert_queue(m_rsc:resource_id() | list(m_rsc:resource_id()), calendar:datetime(), z:context()) -> ok | {error, eexist}.
insert_queue(Id, DueDate, Context) when is_integer(Id), is_tuple(DueDate) ->
insert_queue([Id], DueDate, Context);
insert_queue(Ids, DueDate, Context) when is_list(Ids), is_tuple(DueDate) ->
gen_server:cast(Context#context.pivot_server, {insert_queue, DueDate, Ids}).
%% @doc Insert a slow running pivot task. For example syncing category numbers after an category update.
-spec insert_task(Module, Function, Context) -> {ok, TaskId} | {error, term()}
when Module :: atom(),
Function :: atom(),
Context :: z:context(),
TaskId :: integer().
insert_task(Module, Function, Context) ->
insert_task_after(undefined, Module, Function, undefined, [], Context).
%% @doc Insert a slow running pivot task. Use the UniqueKey to prevent double queued tasks.
-spec insert_task(Module, Function, UniqueKey, Context) -> {ok, TaskId} | {error, term()}
when Module :: atom(),
Function :: atom(),
UniqueKey :: undefined | binary() | string() | atom(),
Context :: z:context(),
TaskId :: integer().
insert_task(Module, Function, UniqueKey, Context) ->
insert_task_after(undefined, Module, Function, UniqueKey, [], Context).
%% @doc Insert a slow running pivot task with unique key and arguments.
-spec insert_task(Module, Function, UniqueKey, Args, Context) -> {ok, TaskId} | {error, term()}
when Module :: atom(),
Function :: atom(),
UniqueKey :: undefined | binary() | string() | atom(),
Args :: list()
| fun( ( OldDue :: undefined | calendar:datetime(),
OldArgs :: undefined | list(),
NewDue :: calendar:datetime(),
z:context()
) -> {ok, {calendar:datetime(), list()}} | {error, term()} ),
Context :: z:context(),
TaskId :: integer().
insert_task(Module, Function, UniqueKey, Args, Context) ->
insert_task_after(undefined, Module, Function, UniqueKey, Args, Context).
%% @doc Insert a slow running pivot task with unique key and arguments that should start after Seconds seconds.
%% Always delete any existing transaction, to prevent race conditions when the task is running
%% during this insert. The UniqueKey is used to have multiple entries per module/function. If only
%% a single module/function should be queued, then set the UniqueKey to <tt><<>></tt>.
-spec insert_task_after(SecondsOrDate, Module, Function, UniqueKey, Args, Context) -> {ok, TaskId} | {error, term()}
when SecondsOrDate::undefined | integer() | calendar:datetime(),
Module :: atom(),
Function :: atom(),
UniqueKey :: undefined | binary() | string() | atom(),
Args :: list()
| fun( ( OldDue :: undefined | calendar:datetime(),
OldArgs :: undefined | list(),
NewDue :: calendar:datetime(),
z:context()
) -> {ok, {calendar:datetime(), list()}} | {error, term()} ),
Context :: z:context(),
TaskId :: integer().
insert_task_after(SecondsOrDate, Module, Function, UniqueKey, ArgsFun, Context) ->
gen_server:call(
Context#context.pivot_server,
{insert_task_after, SecondsOrDate, Module, Function, UniqueKey, ArgsFun}).
-spec get_task( z:context() ) -> {ok, [ map() ]} | {error, term()}.
get_task(Context) ->
z_db:qmap("
select *
from pivot_task_queue",
Context).
-spec get_task( module(), z:context() ) -> {ok, [ map() ]} | {error, term()}.
get_task(Module, Context) ->
z_db:qmap("
select *
from pivot_task_queue
where module = $1",
[Module],
Context).
-spec get_task( module(), atom(), z:context() ) -> {ok, [ map() ]} | {error, term()}.
get_task(Module, Function, Context) ->
z_db:qmap("
select *
from pivot_task_queue
where module = $1 and function = $2",
[Module, Function],
Context).
-spec get_task( module(), atom(), binary()|string(), z:context() ) -> {ok, map()} | {error, term()}.
get_task(Module, Function, UniqueKey, Context) ->
UniqueKeyBin = z_convert:to_binary(UniqueKey),
z_db:qmap_row("
select *
from pivot_task_queue
where module = $1 and function = $2 and key = $3",
[Module, Function, UniqueKeyBin],
Context).
to_utc_date(undefined) ->
undefined;
to_utc_date(N) when is_integer(N) ->
calendar:gregorian_seconds_to_datetime(
calendar:datetime_to_gregorian_seconds(calendar:universal_time()) + N);
to_utc_date({Y,M,D} = YMD) when is_integer(Y), is_integer(M), is_integer(D) ->
{YMD,{0,0,0}};
to_utc_date({{Y,M,D},{H,I,S}} = Date) when is_integer(Y), is_integer(M), is_integer(D), is_integer(H), is_integer(I), is_integer(S) ->
Date.
-spec delete_task( module(), atom(), z:context() ) -> non_neg_integer().
delete_task(Module, Function, Context) ->
z_db:q("delete from pivot_task_queue where module = $1 and function = $2",
[Module, Function],
Context).
-spec delete_task( module(), atom(), term(), z:context() ) -> non_neg_integer().
delete_task(Module, Function, UniqueKey, Context) ->
UniqueKeyBin = z_convert:to_binary(UniqueKey),
z_db:q("delete from pivot_task_queue where module = $1 and function = $2 and key = $3",
[Module, Function, UniqueKeyBin],
Context).
-spec list_tasks( z:context() ) -> {ok, list( map() )} | {error, term()}.
list_tasks(Context) ->
z_db:qmap("select * from pivot_task_queue", Context).
-spec count_tasks( z:context() ) -> {ok, list( map() )} | {error, term()}.
count_tasks(Context) ->
z_db:qmap("
select module, function, count(*), min(due) as due,
sum(error_count) as error_count_total,
max(error_count) as error_count_max
from pivot_task_queue
group by module, function
order by due", Context).
-spec delete_tasks( z:context() ) -> non_neg_integer().
delete_tasks(Context) ->
z_db:q("delete from pivot_task_queue", Context).
-spec get_pivot_title( m_rsc:resource_id(), z:context() ) -> binary().
get_pivot_title(Id, Context) ->
z_pivot_rsc_job:get_pivot_title(Id, Context).
-spec get_pivot_title( map() ) -> binary().
get_pivot_title(Props) ->
z_pivot_rsc_job:get_pivot_title(Props).
%% @doc Ping from pivot process to keep alive and report progress
-spec pivot_job_ping( m_rsc:resource_id(), z:context() ) -> ok.
pivot_job_ping(Id, Context) ->
gen_server:cast(Context#context.pivot_server, {pivot_ping, self(), Id}).
%% @doc Signal from pivot job that processing is done.
-spec pivot_job_done( z:context() ) -> ok.
pivot_job_done(Context) ->
gen_server:call(Context#context.pivot_server, {pivot_done, self()}).
%% @doc Ping from task process to keep alive and report progress
-spec task_job_ping( TaskId :: integer(), Percentage :: 0..100, z:context() ) -> ok.
task_job_ping(TaskId, Percentage, Context) ->
gen_server:cast(Context#context.pivot_server, {task_ping, self(), TaskId, Percentage}).
%% @doc Signal from task process that job is finished.
-spec task_job_done( TaskId :: integer(), z:context() ) -> ok.
task_job_done(TaskId, Context) ->
gen_server:call(Context#context.pivot_server, {task_done, self(), TaskId}, infinity).
%% @doc Return the language used for stemming the full text index.
%% We use a single stemming to prevent having seperate indexes per language.
-spec stemmer_language(z:context()) -> string().
stemmer_language(Context) ->
z_pivot_rsc_job:stemmer_language(Context).
-spec stemmer_language_config(z:context()) -> atom().
stemmer_language_config(Context) ->
z_pivot_rsc_job:stemmer_language_config(Context).
-spec cleanup_tsv_text(binary()) -> binary().
cleanup_tsv_text(Text) when is_binary(Text) ->
z_pivot_rsc_job:cleanup_tsv_text(Text).
-spec pg_lang(atom()) -> string().
pg_lang(LangCode) ->
z_pivot_rsc_job:pg_lang(LangCode).
-spec pg_lang_extra(atom()) -> string().
pg_lang_extra(LangCode) ->
z_pivot_rsc_job:pg_lang_extra(LangCode).
%%====================================================================
%% API
%%====================================================================
%% @spec start_link(SiteProps) -> {ok,Pid} | ignore | {error,Error}
%% @doc Starts the server
start_link(Site) ->
Name = z_utils:name_for_site(?MODULE, Site),
gen_server:start_link({local, Name}, ?MODULE, Site, []).
%%====================================================================
%% gen_server callbacks
%%====================================================================
%% @spec init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @doc Initiates the server.
init(Site) ->
z_context:logger_md(Site),
logger:set_process_metadata(#{
site => Site,
module => ?MODULE
}),
timer:send_interval(?JOB_CHECK_INTERVAL*1000, job_check),
timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll),
{ok, #state{
site = Site,
is_initial_delay = true,
is_pivot_delay = false
}}.
handle_call({task_done, _TaskPid, TaskId}, _From, #state{ task_id = TaskId } = State) ->
% Handle task_done messages from task queue jobs
State1 = State#state {
task_id = undefined,
task_pid = undefined
},
{reply, ok, State1};
handle_call({task_done, _TaskPid, TaskId}, _From, State) ->
?LOG_ERROR(#{
text => <<"Pivot received 'task_done' from unknown task job">>,
in => zotonic_core,
result => error,
reason => unknown_task_job,
task_id => TaskId
}),
{reply, {error, unknown_task}, State};
handle_call({pivot_done, PivotPid}, _From, #state{ pivot_pid = PivotPid } = State) ->
{reply, ok, State#state{ pivot_pid = undefined }};
handle_call({pivot_done, PivotPid}, _From, State) ->
?LOG_ERROR(#{
text => <<"Pivot received 'pivot_done' from unknown pivot job">>,
in => zotonic_core,
result => error,
reason => unknown_pivot_job,
pid => PivotPid
}),
{reply, {error, unknown_pivot}, State};
handle_call({insert_task_after, SecondsOrDate, Module, Function, UniqueKey, ArgsFun}, _From, State) ->
Context = z_context:new(State#state.site),
Result = do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, ArgsFun, Context),
{reply, Result, State};
handle_call(status, _From, State) ->
Status = #{
site => State#state.site,
is_initial_delay => State#state.is_initial_delay,
is_pivot_delay => State#state.is_pivot_delay,
task_pid => State#state.task_pid,
task_id => State#state.task_id,
task_progress => State#state.task_progress,
pivot_pid => State#state.pivot_pid,
pivot_rsc_id => State#state.pivot_rsc_id
},
{reply, {ok, Status}, State};
%% @doc Trap unknown calls
handle_call(Message, _From, State) ->
{stop, {unknown_call, Message}, State}.
handle_cast(poll, #state{is_initial_delay=true} = State) ->
% Starting up - wait
{noreply, State};
handle_cast(poll, State) ->
% Poll the pivot queue
try
State1 = do_poll(State),
{noreply, State1}
catch
Type:Err:Stack ->
?LOG_ERROR(#{
text => <<"Pivot poll error">>,
in => zotonic_core,
result => Type,
reason => Err,
stack => Stack
}),
{noreply, State#state{ backoff_counter = ?BACKOFF_POLL_ERROR }}
end;
handle_cast({insert_queue, DueDate, Ids}, State) when is_list(Ids) ->
% Insert an id into the queue.
do_insert_queue(Ids, DueDate, z_context:new(State#state.site)),
z_utils:flush_message({'$gen_cast', {insert_queue, DueDate, Ids}}),
{noreply, State};
handle_cast({pivot, Id}, #state{ is_initial_delay = true } = State) when is_integer(Id) ->
% Immediate pivot of an resource-id
Due = z_datetime:next_minute(calendar:universal_time()),
do_insert_queue([ Id ], Due, z_context:new(State#state.site)),
{noreply, State};
handle_cast({pivot, Id}, #state{ backoff_counter = Ct } = State) when Ct > 0 ->
Due = z_datetime:next_minute(calendar:universal_time()),
do_insert_queue([ Id ], Due, z_context:new(State#state.site)),
{noreply, State};
handle_cast({pivot, Id}, State) when is_integer(Id) ->
State1 = State#state{ pivot_queue = [ Id | State#state.pivot_queue ]},
State2 = do_poll(State1),
{noreply, State2};
handle_cast({pivot_ping, Pid, Id}, #state{ pivot_pid = Pid } = State) ->
State1 = State#state{
pivot_ping = os:timestamp(),
pivot_rsc_id = Id
},
{noreply, State1};
handle_cast({pivot_ping, Pid, Id}, State) ->
?LOG_NOTICE(#{
text => <<"Pivot ping from unknown process">>,
in => zotonic_core,
result => error,
reason => unknown_process,
pid => Pid,
rsc_id => Id,
expected_pid => State#state.pivot_pid
}),
{noreply, State};
handle_cast({task_ping, _Pid, TaskId, Percentage}, #state{ task_id = TaskId } = State) ->
State1 = State#state{
task_ping = os:timestamp(),
task_progress = Percentage
},
{noreply, State1};
handle_cast({task_ping, Pid, TaskId, _Percentage}, State) ->
?LOG_NOTICE(#{
text => <<"Task ping for wrong task id">>,
in => zotonic_core,
result => error,
reason => wrong_task_id,
pid => Pid,
task_id => TaskId,
expected_task_id => State#state.task_id
}),
{noreply, State};
handle_cast(pivot_delay, State) ->
% Delay the next pivot, useful when performing big updates
{noreply, State#state{is_pivot_delay=true}};
handle_cast(Message, State) ->
{stop, {unknown_cast, Message}, State}.
%% @doc Handling all non call/cast messages
handle_info(poll, #state{ is_pivot_delay = true } = State) ->
timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll),
{noreply, State#state{ is_pivot_delay = false}};
handle_info(poll, #state{backoff_counter = Ct} = State) when Ct > 0 ->
timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll),
{noreply, State#state{ backoff_counter = Ct - 1 }};
handle_info(poll, #state{ pivot_pid = Pid } = State) when is_pid(Pid) ->
?LOG_INFO(#{
text => <<"Pivot job still running, delaying next poll">>,
in => zotonic_core,
pivot_pid => Pid,
reason => busy
}),
timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll),
{noreply, State};
handle_info(poll, #state{ site = Site } = State) ->
case z_sites_manager:get_site_status(Site) of
{ok, running} ->
?LOG_DEBUG(#{
text => <<"Pivot poll">>,
in => zotonic_core
}),
try
State1 = do_poll(State),
IsPivoting = is_pid(State1#state.pivot_pid)
orelse is_pid(State1#state.task_pid),
case IsPivoting of
true -> timer:send_after(?PIVOT_POLL_INTERVAL_FAST*1000, poll);
false -> timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll)
end,
{noreply, State1#state{ is_initial_delay = false }}
catch
Type:Err:Stack ->
?LOG_ERROR(#{
text => <<"Pivot error">>,
in => zotonic_core,
result => Type,
reason => Err,
stack => Stack
}),
timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll),
{noreply, State#state{ backoff_counter = ?BACKOFF_POLL_ERROR }}
end;
_ ->
timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll),
{noreply, State#state{ is_initial_delay = true }}
end;
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, #state{ pivot_pid = undefined, task_pid = undefined } = State) ->
{noreply, State};
handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{ pivot_pid = Pid } = State) ->
LastRscId = State#state.pivot_rsc_id,
?LOG_ERROR(#{
text => <<"Pivot received DOWN from pivot job">>,
in => zotonic_core,
rsc_id => LastRscId,
result => 'DOWN',
reason => Reason,
pivot_pid => Pid
}),
Context = z_context:new(State#state.site),
z_db:q("
delete from rsc_pivot_queue
where rsc_id = $1
",
[ LastRscId ],
Context),
{noreply, State#state{
pivot_pid = undefined,
pivot_rsc_id = undefined,
backoff_counter = ?BACKOFF_POLL_ERROR
}};
handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{ task = Task, task_pid = Pid } = State) ->
Context = z_context:new(State#state.site),
z_pivot_rsc_task_job:maybe_schedule_retry(Task, 'DOWN', Reason, [], Context),
{noreply, State#state{ task_id = undefined, task_pid = undefined }};
handle_info(job_check, State) ->
check_pivot_job(State),
check_task_job(State),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
terminate(_Reason, _State) ->
ok.
%% @doc Convert process state when code is changed
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%====================================================================
%% support functions
%%====================================================================
check_pivot_job(#state{ pivot_pid = undefined }) ->
ok;
check_pivot_job(#state{ pivot_pid = Pid, pivot_ping = LastPing } = State) ->
Now = z_datetime:timestamp(),
Ping = z_datetime:datetime_to_timestamp(calendar:now_to_universal_time(LastPing)),
Timeout = Ping + ?PIVOT_JOB_TIMEOUT,
case Now > Timeout of
true ->
?LOG_ERROR(#{
text => <<"Pivot job timeout, killing pivot job">>,
in => zotonic_core,
reason => timeout,
pivot_pid => Pid,
rsc_id => State#state.pivot_rsc_id
}),
erlang:exit(Pid, timeout);
false ->
ok
end.
check_task_job(#state{ task_pid = undefined }) ->
ok;
check_task_job(#state{ task_pid = Pid, task_id = TaskId, task_ping = LastPing } = State) ->
Now = z_datetime:timestamp(),
Ping = z_datetime:datetime_to_timestamp(calendar:now_to_universal_time(LastPing)),
Timeout = Ping + ?TASK_JOB_TIMEOUT,
case Now > Timeout of
true ->
?LOG_ERROR(#{
text => <<"Task job timeout, killing task for later retry">>,
in => zotonic_core,
reason => timeout,
task_id => TaskId,
task_pid => Pid,
progress => State#state.task_progress
}),
% Force exit - the monitor will increment the task error count
% and then restart the task after a backoff.
erlang:exit(Pid, timeout);
false ->
ok
end.
do_insert_task_after(SecondsOrDate, Module, Function, undefined, ArgsFun, Context) ->
UniqueKey = z_ids:id(),
do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, ArgsFun, Context);
do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, ArgsFun, Context) when is_function(ArgsFun) ->
Due = to_utc_date(SecondsOrDate),
UniqueKeyBin = z_convert:to_binary(UniqueKey),
z_db:transaction(
fun(Ctx) ->
OldTask = z_db:q_row("
select props, due
from pivot_task_queue
where module = $1
and function = $2
and key = $3
limit 1
for update",
[ Module, Function, UniqueKeyBin ],
Ctx),
New = case OldTask of
{OldProps, OldDue} ->
OldArgs = get_args(OldProps),
ArgsFun(OldDue, OldArgs, Due, Ctx);
undefined ->
ArgsFun(undefined, undefined, Due, Ctx)
end,
case New of
{ok, {NewDue, NewArgs}} ->
case OldTask of
undefined -> ok;
{_, _} ->
_ = z_db:q("
delete from pivot_task_queue
where module = $1
and function = $2
and key = $3",
[ Module, Function, UniqueKeyBin ],
Ctx)
end,
Fields = #{
<<"module">> => Module,
<<"function">> => Function,
<<"key">> => UniqueKeyBin,
<<"args">> => NewArgs,
<<"due">> => NewDue
},
z_db:insert(pivot_task_queue, Fields, Ctx);
{error, _} = Error ->
Error
end
end,
Context);
do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, Args, Context) ->
Due = to_utc_date(SecondsOrDate),
UniqueKeyBin = z_convert:to_binary(UniqueKey),
z_db:transaction(
fun(Ctx) ->
_ = z_db:q("
delete from pivot_task_queue
where module = $1
and function = $2
and key = $3",
[ Module, Function, UniqueKeyBin ],
Ctx),
Fields = #{
<<"module">> => Module,
<<"function">> => Function,
<<"key">> => UniqueKeyBin,
<<"args">> => Args,
<<"due">> => Due
},
z_db:insert(pivot_task_queue, Fields, Ctx)
end,
Context).
%% @doc Insert a list of ids into the pivot queue.
do_insert_queue(Ids, DueDate, Context) when is_list(Ids) ->
F = fun(Ctx) ->
z_db:q("lock table rsc_pivot_queue in share row exclusive mode", Ctx),
lists:foreach(
fun(Id) ->
case z_db:q1("select id from rsc where id = $1", [Id], Ctx) of
Id ->
case z_db:q("
update rsc_pivot_queue
set serial = serial + 1,
due = $2
where rsc_id = $1",
[ Id, DueDate ],
Ctx)
of
1 -> ok;
0 ->
z_db:q("
insert into rsc_pivot_queue (rsc_id, due, is_update)
select id, $2, true from rsc where id = $1",
[ Id, DueDate ], Ctx)
end;
undefined ->
ok
end
end,
Ids)
end,
case z_db:transaction(F, Context) of
ok ->
ok;
{rollback, Reason} ->
?LOG_ERROR(#{
text => <<"Rollback during pivot queue insert">>,
in => zotonic_core,
reason => Reason
}),
timer:apply_after(100, ?MODULE, insert_queue, [Ids, DueDate, Context]);
{error, Reason} ->
?LOG_ERROR(#{
text => <<"Error during pivot queue insert">>,
in => zotonic_core,
reason => Reason
})
end.
%% @doc Poll a database for any queued updates.
-spec do_poll( #state{} ) -> #state{}.
do_poll(State) ->
Context = z_acl:sudo( z_context:new(State#state.site) ),
State1 = maybe_start_task(State, Context),
maybe_start_pivot(State1, Context).
maybe_start_pivot(#state{ pivot_pid = Pid } = State, _Context) when is_pid(Pid) ->
State;
maybe_start_pivot(#state{ pivot_queue = Queue } = State, Context) ->
Qs = do_poll_queue(Context),
Qs1 = lists:foldl(
fun(Id, Acc) ->
case lists:keymember(Id, 1, Acc) of
true ->
Acc;
false ->
OptSerial = fetch_queue_id(Id, Context),
[ {Id, OptSerial} | Acc ]
end
end,
Qs,
Queue),
case Qs1 of
[] ->
State;
_ ->
case z_pivot_rsc_job:start_pivot(Qs1, Context) of
{ok, Pid} ->
erlang:monitor(process, Pid),
State#state{
pivot_pid = Pid,
pivot_queue = [],
pivot_ping = os:timestamp(),
pivot_rsc_id = undefined
};
{error, _} ->
% overload - ignore
State
end
end.
do_poll_queue(Context) ->
try
fetch_queue(Context)
catch
exit:{timeout, _} ->
[];
throw:{error, econnrefused} ->
[]
end.
maybe_start_task(#state{ task_pid = undefined } = State, Context) ->
case poll_task(Context) of
{ok, #{ task_id := TaskId } = Task} ->
case z_pivot_rsc_task_job:start_task(Task, Context) of
{ok, TaskPid} ->
erlang:monitor(process, TaskPid),
State#state{
task_id = TaskId,
task_pid = TaskPid,
task_ping = os:timestamp(),
task = Task
};
{error, _} ->
State
end;
{error, enoent} ->
State;
{error, nodb} ->
State;
{error, Reason} ->
?LOG_ERROR(#{
text => "Pivot could not check task queue",
in => zotonic_core,
reason => Reason
}),
State
end;
maybe_start_task(State, _Context) ->
State.
%% @doc Fetch the next task uit de task queue, if any.
poll_task(Context) ->
case z_db:qmap_row("
select id, module, function, props, error_count
from pivot_task_queue
where due is null
or due < current_timestamp
order by due asc
limit 1",
[],
[ {keys, atom} ],
Context)
of
{ok, #{
id := Id,
module := Module,
function := Function,
props := Props,
error_count := ErrCt
}} ->
Args = get_args(Props),
{ok, #{
task_id => Id,
mfa => {z_convert:to_atom(Module), z_convert:to_atom(Function), Args},
error_count => ErrCt
}};
{error, _} = Error ->
Error
end.
get_args(Props) when is_map(Props) ->
maps:get(<<"args">>, Props, []);
get_args(Props) when is_list(Props) ->
% deprecated task queue entries
proplists:get_value(args, Props, []);
get_args(undefined) ->
[].
%% @doc Fetch the next batch of ids from the queue. Remembers the serials, as a new
%% pivot request might come in while we are pivoting.
-spec fetch_queue( z:context() ) -> [ { Id::m_rsc:resource_id(), Serial::integer() }].
fetch_queue(Context) ->
z_db:q("
select rsc_id, serial
from rsc_pivot_queue
where due < current_timestamp - '10 second'::interval
order by is_update, due
limit $1", [?POLL_BATCH], Context).
%% @doc Fetch the serial of the id's queue record
-spec fetch_queue_id( m_rsc:resource_id(), z:context() ) -> Serial::integer() | undefined.
fetch_queue_id(Id, Context) ->
z_db:q1("select serial from rsc_pivot_queue where rsc_id = $1", [Id], Context).
%% @spec define_custom_pivot(Module, columns(), Context) -> ok
%% @doc Let a module define a custom pivot
%% columns() -> [column()]
%% column() -> {ColumName::atom(), ColSpec::string()} | {atom(), string(), options::list()}
define_custom_pivot(Module, Columns, Context) ->
TableName = "pivot_" ++ z_convert:to_list(Module),
case z_db:table_exists(TableName, Context) of
true ->
% Compare column names to see if table needs an update
DbColumns = [ Name || #column_def{name=Name} <- z_db:columns(TableName, Context), not(Name == id)],
SpecColumns = lists:map(
fun(ColumnDef) ->
[Name|_] = tuple_to_list(ColumnDef),
Name
end,
Columns
),
case lists:usort(SpecColumns) == lists:usort(DbColumns) of
false ->
z_db:drop_table(TableName, Context),
define_custom_pivot(Module, Columns, Context);
true ->
ok
end;
false ->
ok = z_db:transaction(
fun(Ctx) ->
Fields = custom_columns(Columns),
Sql = "CREATE TABLE " ++ TableName ++ "(" ++
"id int NOT NULL," ++ Fields ++ " primary key(id))",
[] = z_db:q(lists:flatten(Sql), Ctx),
[] = z_db:q("ALTER TABLE " ++ TableName ++
" ADD CONSTRAINT fk_" ++ TableName ++ "_id " ++
" FOREIGN KEY (id) REFERENCES rsc(id) ON UPDATE CASCADE ON DELETE CASCADE", Ctx),
Indexable = lists:filter(fun({_,_}) -> true;
({_,_,Opts}) -> not lists:member(noindex, Opts)
end,
Columns),
Idx = [
begin
K = element(1,Col),
"CREATE INDEX " ++ TableName ++ "_" ++ z_convert:to_list(K) ++ "_key ON "
++ TableName ++ "(" ++ z_convert:to_list(K) ++ ")"
end
|| Col <- Indexable
],
lists:foreach(
fun(Sql1) ->
[] = z_db:q(Sql1, Ctx)
end,
Idx),
ok
end,
Context),
z_db:flush(Context),
ok
end.
custom_columns(Cols) ->
custom_columns(Cols, []).
custom_columns([], Acc) ->
lists:reverse(Acc);
custom_columns([{Name, Spec}|Rest], Acc) ->
custom_columns(Rest, [ [z_convert:to_list(Name), " ", Spec, ","] | Acc]);
custom_columns([{Name, Spec, _Opts}|Rest], Acc) ->
custom_columns(Rest, [ [z_convert:to_list(Name), " ", Spec, ","] | Acc]).
%% @doc Lookup a custom pivot; give back the Id based on a column. Will always return the first Id found.
%% @spec lookup_custom_pivot(Module, Column, Value, Context) -> Id | undefined
lookup_custom_pivot(Module, Column, Value, Context) ->
TableName = "pivot_" ++ z_convert:to_list(Module),
Column1 = z_convert:to_list(Column),
Query = "SELECT id FROM " ++ TableName ++ " WHERE " ++ Column1 ++ " = $1",
case z_db:q(Query, [Value], Context) of
[] -> undefined;
[{Id}|_] -> Id
end.