%% @author Marc Worrell <marc@worrell.nl>
%% @copyright 2009-2026 Marc Worrell, Maas-Maarten Zeeman
%% @doc Pivoting server for the rsc table. Takes care of full text indices. Polls the pivot queue for any changed resources.
%% @end
%% Copyright 2009-2026 Marc Worrell, Maas-Maarten Zeeman
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
-module(z_pivot_rsc).
-author("Marc Worrell <marc@worrell.nl").
-behaviour(gen_server).
%% gen_server exports
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([start_link/1]).
%% interface functions
-export([
poll/1,
status/1,
pivot/2,
pivot_delay/1,
pivot_resource_update/4,
queue_all/1,
queue_count/1,
queue_count_backlog/1,
insert_queue/2,
get_pivot_title/1,
get_pivot_title/2,
insert_task/3,
insert_task/4,
insert_task/5,
insert_task_after/6,
get_task/1,
get_task/2,
get_task/3,
get_task/4,
delete_task/3,
delete_task/4,
list_tasks/1,
count_tasks/1,
delete_tasks/1,
task_job_ping/3,
task_job_done/2,
pivot_job_ping/2,
pivot_job_done/2,
publish_task_event/5,
stemmer_language/1,
stemmer_language_config/1,
cleanup_tsv_text/1,
pg_lang/1,
pg_lang_extra/1,
% get_pivot_data/2,
define_custom_pivot/3,
lookup_custom_pivot/4,
insert_queue/3
]).
-include("zotonic.hrl").
% -include_lib("epgsql/include/epgsql.hrl").
% Interval (in seconds) to check if there are any items to be pivoted.
-define(PIVOT_POLL_INTERVAL_FAST, 2).
-define(PIVOT_POLL_INTERVAL_SLOW, 20).
% Interval to check for stuck pivot or task jobs (seconds)
-define(JOB_CHECK_INTERVAL, 10*60).
% Timeouts for pivot and tasks, killed if didn't ping in this number of seconds.
-define(PIVOT_JOB_TIMEOUT, 20*60). % 20 minutes for pivots
-define(TASK_JOB_TIMEOUT, 2*60*60). % 2 hours for tasks (might be shorter in future)
% How many PIVOT_POLL_INTERVAL_SLOW we will skip on SQL errors (like timeouts)
-define(BACKOFF_POLL_ERROR, 4).
% Number of queued ids taken from the queue at one go
-define(POLL_BATCH, 50).
-record(state, {
site :: atom(),
is_env_backup = false :: boolean(),
is_initial_delay = true :: boolean(),
is_pivot_delay = false :: boolean(),
backoff_counter = 0 :: integer(),
poll_timer :: timer:tref() | undefined,
task_pid :: undefined | pid(),
task_id :: undefined | integer(),
task_progress :: undefined | 0..100,
task_ping :: undefined | erlang:timestamp(),
task :: undefined | map(),
pivot_pid :: undefined | pid(),
pivot_rsc_id :: undefined | m_rsc:resource_id(),
pivot_queue = [] :: [ m_rsc:resource_id() ],
pivot_queue_inflight = [] :: [ m_rsc:resource_id() ],
pivot_queue_inflight_date :: undefined | calendar:datetime(),
pivot_inflight_date :: undefined | calendar:datetime(),
pivot_ping :: undefined | erlang:timestamp()
}).
-type task_key() :: undefined | binary() | string() | atom() | integer().
-type task_return() :: ok
| {delay, task_delay()}
| {delay, task_delay(), NewArgs :: list()}
| any().
-type task_delay() :: non_neg_integer()
| calendar:datetime()
| undefined.
-export_type([
task_key/0,
task_return/0,
task_delay/0
]).
%% @doc Poll the pivot queue for the database in the context
-spec poll( z:context() ) -> ok.
poll(Context) ->
gen_server:cast(Context#context.pivot_server, poll).
-spec status( z:context() ) -> {ok, map()} | {error, term()}.
status(Context) ->
gen_server:call(Context#context.pivot_server, status).
%% @doc An immediate pivot request for a resource
-spec pivot(Id, Context) -> ok when
Id :: m_rsc:resource_id(),
Context :: z:context().
pivot(Id, Context) ->
gen_server:cast(Context#context.pivot_server, {pivot, Id}).
%% @doc Delay the next pivot, useful when performing big updates
-spec pivot_delay(z:context()) -> ok.
pivot_delay(Context) ->
gen_server:cast(Context#context.pivot_server, pivot_delay).
%% @doc Return a modified property list with fields that need immediate pivoting on an update.
-spec pivot_resource_update(Id, UpdateProps, RawProps, Context) -> UpdateProps1 when
Id :: m_rsc:resource_id(),
UpdateProps :: m_rsc:props(),
RawProps :: m_rsc:props(),
Context :: z:context(),
UpdateProps1 :: m_rsc:props().
pivot_resource_update(Id, UpdateProps, RawProps, Context) ->
z_pivot_rsc_job:pivot_resource_update(Id, UpdateProps, RawProps, Context).
%% @doc Rebuild the search index by queueing all resources for pivot.
-spec queue_all(Context) -> ok when
Context :: z:context().
queue_all(Context) ->
?LOG_INFO(#{
in => zotonic_mod_search,
text => <<"Pivot: queueing all resources for repivot - start">>
}),
Max = z_db:q1("select max(id) from rsc", Context),
z_proc:spawn_md(fun() ->
queue_all_1(Max+1, Context)
end),
ok.
queue_all_1(ToId, Context) ->
case z_db:q("
insert into rsc_pivot_log (rsc_id, priority, is_update)
select id, 2, false
from rsc
where id < $1
order by id desc
limit 10000
returning rsc_id",
[ ToId ],
Context)
of
[] ->
?LOG_INFO(#{
in => zotonic_mod_search,
text => <<"Pivot: queueing all resources for repivot - queued">>
}),
done;
Rs ->
Ids = [ Id || {Id} <- Rs ],
queue_all_1(lists:min(Ids), Context)
end.
%% @doc Return the length of the pivot queue.
-spec queue_count(Context) -> QueueLength when
Context :: z:context(),
QueueLength :: non_neg_integer().
queue_count(Context) ->
z_db:q1("SELECT COUNT(distinct rsc_id) FROM rsc_pivot_log", Context).
%% @doc Return the number of pivot queue items scheduled for direct pivot.
-spec queue_count_backlog(Context) -> BacklogLength when
Context :: z:context(),
BacklogLength :: non_neg_integer().
queue_count_backlog(Context) ->
z_db:q1("
select count(distinct rsc_id)
from rsc_pivot_log
where due < current_timestamp", Context).
%% @doc Insert a rsc_id in the pivot queue
-spec insert_queue(IdOrIds, Context) -> ok when
IdOrIds :: m_rsc:resource_id() | list( m_rsc:resource_id() ),
Context :: z:context().
insert_queue(IdorIds, Context) ->
insert_queue(IdorIds, calendar:universal_time(), Context).
%% @doc Insert a rsc_id in the pivot queue for a certain date
-spec insert_queue(IdOrIds, DueDate, Context) -> ok when
IdOrIds :: m_rsc:resource_id() | list( m_rsc:resource_id() ),
DueDate :: calendar:datetime(),
Context :: z:context().
insert_queue(Id, DueDate, Context) when is_integer(Id), is_tuple(DueDate) ->
insert_queue([Id], DueDate, Context);
insert_queue(Ids, DueDate, Context) when is_list(Ids), is_tuple(DueDate) ->
gen_server:cast(Context#context.pivot_server, {insert_queue, DueDate, Ids}).
%% @doc Insert a slow running pivot task. For example syncing category numbers after an category update.
-spec insert_task(Module, Function, Context) -> {ok, TaskId} | {error, term()}
when Module :: atom(),
Function :: atom(),
Context :: z:context(),
TaskId :: integer().
insert_task(Module, Function, Context) ->
insert_task_after(undefined, Module, Function, undefined, [], Context).
%% @doc Insert a slow running pivot task. Use the UniqueKey to prevent double queued tasks for
%% the same module:function.
-spec insert_task(Module, Function, UniqueKey, Context) -> {ok, TaskId} | {error, term()}
when Module :: atom(),
Function :: atom(),
UniqueKey :: task_key(),
Context :: z:context(),
TaskId :: integer().
insert_task(Module, Function, UniqueKey, Context) ->
insert_task_after(undefined, Module, Function, UniqueKey, [], Context).
%% @doc Insert a slow running pivot task with unique key and arguments. The key is unique
%% for the module:function.
-spec insert_task(Module, Function, UniqueKey, Args, Context) -> {ok, TaskId} | {error, term()}
when Module :: atom(),
Function :: atom(),
UniqueKey :: task_key(),
Args :: list()
| fun( ( OldDue :: undefined | calendar:datetime(),
OldArgs :: undefined | list(),
NewDue :: calendar:datetime(),
z:context()
) -> {ok, {calendar:datetime(), list()}} | {error, term()} ),
Context :: z:context(),
TaskId :: integer().
insert_task(Module, Function, UniqueKey, Args, Context) ->
insert_task_after(undefined, Module, Function, UniqueKey, Args, Context).
%% @doc Insert a slow running pivot task with unique key and arguments that should start after Seconds seconds.
%% Always delete any existing transaction, to prevent race conditions when the task is running
%% during this insert. The UniqueKey is used to have multiple entries per module/function. If only
%% a single module/function should be queued, then set the UniqueKey to ``<<>>''.
-spec insert_task_after(Delay, Module, Function, UniqueKey, Args, Context) -> {ok, TaskId} | {error, term()}
when Delay:: task_delay(),
Module :: atom(),
Function :: atom(),
UniqueKey :: task_key(),
Args :: list()
| fun( ( OldDue :: undefined | calendar:datetime(),
OldArgs :: undefined | list(),
NewDue :: calendar:datetime(),
z:context()
) -> {ok, {calendar:datetime(), list()}} | {error, term()} ),
Context :: z:context(),
TaskId :: integer().
insert_task_after(Delay, Module, Function, UniqueKey, ArgsFun, Context) ->
gen_server:call(
Context#context.pivot_server,
{insert_task_after, Delay, Module, Function, UniqueKey, ArgsFun}).
-spec get_task( z:context() ) -> {ok, [ map() ]} | {error, term()}.
get_task(Context) ->
z_db:qmap("
select *
from pivot_task_queue",
Context).
-spec get_task( module(), z:context() ) -> {ok, [ map() ]} | {error, term()}.
get_task(Module, Context) ->
z_db:qmap("
select *
from pivot_task_queue
where module = $1",
[Module],
Context).
-spec get_task( module(), atom(), z:context() ) -> {ok, [ map() ]} | {error, term()}.
get_task(Module, Function, Context) ->
z_db:qmap("
select *
from pivot_task_queue
where module = $1 and function = $2",
[Module, Function],
Context).
-spec get_task( module(), atom(), task_key(), z:context() ) -> {ok, map()} | {error, term()}.
get_task(Module, Function, UniqueKey, Context) ->
UniqueKeyBin = z_convert:to_binary(UniqueKey),
z_db:qmap_row("
select *
from pivot_task_queue
where module = $1 and function = $2 and key = $3",
[Module, Function, UniqueKeyBin],
Context).
to_utc_date(undefined) ->
undefined;
to_utc_date(N) when is_integer(N) ->
calendar:gregorian_seconds_to_datetime(
calendar:datetime_to_gregorian_seconds(calendar:universal_time()) + N);
to_utc_date({Y,M,D} = YMD) when is_integer(Y), is_integer(M), is_integer(D) ->
{YMD,{0,0,0}};
to_utc_date({{Y,M,D},{H,I,S}} = Date) when is_integer(Y), is_integer(M), is_integer(D), is_integer(H), is_integer(I), is_integer(S) ->
Date.
-spec delete_task( module(), atom(), z:context() ) -> non_neg_integer().
delete_task(Module, Function, Context) ->
case z_db:q1("delete from pivot_task_queue where module = $1 and function = $2",
[Module, Function],
Context)
of
0 ->
0;
N ->
publish_task_event(delete, Module, Function, undefined, Context),
N
end.
-spec delete_task( module(), atom(), task_key(), z:context() ) -> non_neg_integer().
delete_task(Module, Function, UniqueKey, Context) ->
UniqueKeyBin = z_convert:to_binary(UniqueKey),
case z_db:q1("delete from pivot_task_queue where module = $1 and function = $2 and key = $3",
[Module, Function, UniqueKeyBin],
Context)
of
0 ->
0;
N ->
publish_task_event(delete, Module, Function, undefined, Context),
N
end.
-spec list_tasks( z:context() ) -> {ok, list( map() )} | {error, term()}.
list_tasks(Context) ->
z_db:qmap("select * from pivot_task_queue", Context).
-spec count_tasks( z:context() ) -> {ok, list( map() )} | {error, term()}.
count_tasks(Context) ->
z_db:qmap("
select module, function, count(*), min(due) as due,
sum(error_count) as error_count_total,
max(error_count) as error_count_max
from pivot_task_queue
group by module, function
order by due", Context).
-spec delete_tasks( z:context() ) -> non_neg_integer().
delete_tasks(Context) ->
case z_db:q1("delete from pivot_task_queue", Context) of
0 ->
0;
N ->
publish_task_event(delete, <<"*">>, <<"*">>, undefined, Context),
N
end.
-spec get_pivot_title( m_rsc:resource_id(), z:context() ) -> binary().
get_pivot_title(Id, Context) ->
z_pivot_rsc_job:get_pivot_title(Id, Context).
-spec get_pivot_title( map() ) -> binary().
get_pivot_title(Props) ->
z_pivot_rsc_job:get_pivot_title(Props).
%% @doc Ping from pivot process to keep alive and report progress
-spec pivot_job_ping( m_rsc:resource_id(), z:context() ) -> ok.
pivot_job_ping(Id, Context) ->
gen_server:cast(Context#context.pivot_server, {pivot_ping, self(), Id}).
%% @doc Signal from pivot job that processing is done.
-spec pivot_job_done(IdsOrError, Context) -> ok when
IdsOrError :: list( m_rsc:resource_id() ) | error,
Context :: z:context().
pivot_job_done(IdsOrError, Context) ->
gen_server:call(Context#context.pivot_server, {pivot_done, self(), IdsOrError}, infinity).
%% @doc Ping from task process to keep alive and report progress
-spec task_job_ping( TaskId :: integer(), Percentage :: 0..100, z:context() ) -> ok.
task_job_ping(TaskId, Percentage, Context) ->
gen_server:cast(Context#context.pivot_server, {task_ping, self(), TaskId, Percentage}).
%% @doc Signal from task process that job is finished.
-spec task_job_done( TaskId :: integer(), z:context() ) -> ok.
task_job_done(TaskId, Context) ->
gen_server:call(Context#context.pivot_server, {task_done, self(), TaskId}, infinity).
%% @doc Return the language used for stemming the full text index.
%% We use a single stemming to prevent having seperate indexes per language.
-spec stemmer_language(z:context()) -> string().
stemmer_language(Context) ->
z_pivot_rsc_job:stemmer_language(Context).
-spec stemmer_language_config(z:context()) -> atom().
stemmer_language_config(Context) ->
z_pivot_rsc_job:stemmer_language_config(Context).
-spec cleanup_tsv_text(binary()) -> binary().
cleanup_tsv_text(Text) when is_binary(Text) ->
z_pivot_rsc_job:cleanup_tsv_text(Text).
-spec pg_lang(atom()) -> string().
pg_lang(LangCode) ->
z_pivot_rsc_job:pg_lang(LangCode).
-spec pg_lang_extra(atom()) -> string().
pg_lang_extra(LangCode) ->
z_pivot_rsc_job:pg_lang_extra(LangCode).
%%====================================================================
%% API
%%====================================================================
-spec start_link(term()) -> {ok, pid()} | ignore | {error, term()}.
%% @doc Starts the server
start_link(Site) ->
Name = z_utils:name_for_site(?MODULE, Site),
gen_server:start_link({local, Name}, ?MODULE, Site, []).
%%====================================================================
%% gen_server callbacks
%%====================================================================
-spec init(term()) -> {ok, term()} | {ok, term(), timeout() | hibernate} | ignore | {stop, term()}.
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% @doc Initiates the server.
init(Site) ->
z_context:logger_md(Site),
logger:set_process_metadata(#{
site => Site,
module => ?MODULE
}),
{IsEnvBackup, PollTimer} = case m_site:environment(Site) of
backup ->
{true, undefined};
_Env ->
timer:send_interval(?JOB_CHECK_INTERVAL*1000, job_check),
{ok, TRefPoll} = timer:send_after(?PIVOT_POLL_INTERVAL_SLOW*1000, poll),
{false, TRefPoll}
end,
{ok, #state{
site = Site,
is_env_backup = IsEnvBackup,
poll_timer = PollTimer,
is_initial_delay = true,
is_pivot_delay = false
}}.
handle_call({task_done, _TaskPid, TaskId}, _From, #state{ task_id = TaskId } = State) ->
% Handle task_done messages from task queue jobs
State1 = State#state {
task_id = undefined,
task_pid = undefined
},
self() ! poll, % Request another poll, there could be more due tasks on the queue.
{reply, ok, State1};
handle_call({task_done, _TaskPid, TaskId}, _From, State) ->
?LOG_ERROR(#{
text => <<"Pivot received 'task_done' from unknown task job">>,
in => zotonic_core,
result => error,
reason => unknown_task_job,
task_id => TaskId
}),
{reply, {error, unknown_task}, State};
handle_call({pivot_done, PivotPid, error}, _From, #state{ pivot_pid = PivotPid } = State) ->
% Error during pivot - keep the pivot log queue as is.
{reply, ok, State#state{
pivot_pid = undefined,
pivot_queue_inflight = [],
pivot_queue_inflight_date = undefined,
pivot_inflight_date = undefined
}};
handle_call({pivot_done, PivotPid, Ids}, _From, #state{ pivot_pid = PivotPid, site = Site } = State) ->
% Signal that ids are pivoted, delete all entries before the cut off date.
Context = z_context:new(Site),
{Prio, Normal} = lists:partition(
fun(Id) -> lists:member(Id, State#state.pivot_queue_inflight) end,
Ids),
delete_queue(Prio, State#state.pivot_queue_inflight_date, Context),
delete_queue(Normal, State#state.pivot_inflight_date, Context),
{reply, ok, State#state{
pivot_pid = undefined,
pivot_queue_inflight = [],
pivot_queue_inflight_date = undefined,
pivot_inflight_date = undefined
}};
handle_call({pivot_done, PivotPid, _IdsOrError}, _From, State) ->
?LOG_ERROR(#{
text => <<"Pivot received 'pivot_done' from unknown pivot job">>,
in => zotonic_core,
result => error,
reason => unknown_pivot_job,
pid => PivotPid
}),
{reply, {error, unknown_pivot}, State};
handle_call({insert_task_after, _Secs, _Module, _Func, _Key, _Args}, _From, #state{ is_env_backup = true } = State) ->
{reply, {error, backup}, State};
handle_call({insert_task_after, SecondsOrDate, Module, Function, UniqueKey, ArgsFun}, _From, State) ->
Context = z_context:new(State#state.site),
Result = do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, ArgsFun, Context),
State1 = if
SecondsOrDate =:= undefined ->
next_poll(State, 0);
true ->
State
end,
{reply, Result, State1};
handle_call(status, _From, State) ->
Status = #{
site => State#state.site,
is_env_backup => State#state.is_env_backup,
is_initial_delay => State#state.is_initial_delay,
is_pivot_delay => State#state.is_pivot_delay,
task_pid => State#state.task_pid,
task_id => State#state.task_id,
task_progress => State#state.task_progress,
pivot_pid => State#state.pivot_pid,
pivot_rsc_id => State#state.pivot_rsc_id
},
{reply, {ok, Status}, State};
%% @doc Trap unknown calls
handle_call(Message, _From, State) ->
{stop, {unknown_call, Message}, State}.
handle_cast(poll, #state{ is_initial_delay = true } = State) ->
% Manual poll of the pivot queue, but starting up - wait
{noreply, State};
handle_cast(poll, #state{ is_env_backup = true } = State) ->
% No pivot or tasks when running as backup.
{noreply, State};
handle_cast(poll, State) ->
% Manual poll of the pivot queue
try
State1 = do_poll(State),
{noreply, State1}
catch
Type:Err:Stack ->
?LOG_ERROR(#{
text => <<"Pivot poll error">>,
in => zotonic_core,
result => Type,
reason => Err,
stack => Stack
}),
{noreply, State#state{ backoff_counter = ?BACKOFF_POLL_ERROR }}
end;
handle_cast({insert_queue, _DueDate, _Ids}, #state{ is_env_backup = true } = State) ->
{noreply, State};
handle_cast({insert_queue, DueDate, Ids}, State) when is_list(Ids) ->
% Insert an id into the queue.
do_insert_queue(Ids, DueDate, z_context:new(State#state.site)),
z_utils:flush_message({'$gen_cast', {insert_queue, DueDate, Ids}}),
{noreply, State};
handle_cast({pivot, _Id}, #state{ is_env_backup = true } = State) ->
{noreply, State};
handle_cast({pivot, Id}, #state{ is_initial_delay = true } = State) when is_integer(Id) ->
% Immediate pivot of an resource-id - but we are still starting up
Due = z_datetime:next_minute(calendar:universal_time()),
do_insert_queue([ Id ], Due, z_context:new(State#state.site)),
{noreply, State};
handle_cast({pivot, Id}, #state{ backoff_counter = Ct } = State) when Ct > 0 ->
% Immediate pivot of an resource-id - but we are in a back off due to errors
Due = z_datetime:next_minute(calendar:universal_time()),
do_insert_queue([ Id ], Due, z_context:new(State#state.site)),
{noreply, State};
handle_cast({pivot, Id}, State) when is_integer(Id) ->
% Immediate pivot of an resource-id - queue and add as priority
do_insert_queue([ Id ], calendar:universal_time(), z_context:new(State#state.site)),
State1 = State#state{ pivot_queue = [ Id | State#state.pivot_queue ]},
State2 = do_poll(State1),
{noreply, State2};
handle_cast({pivot_ping, Pid, Id}, #state{ pivot_pid = Pid } = State) ->
State1 = State#state{
pivot_ping = os:timestamp(),
pivot_rsc_id = Id
},
{noreply, State1};
handle_cast({pivot_ping, Pid, Id}, State) ->
?LOG_NOTICE(#{
text => <<"Pivot ping from unknown process">>,
in => zotonic_core,
result => error,
reason => unknown_process,
pid => Pid,
rsc_id => Id,
expected_pid => State#state.pivot_pid
}),
{noreply, State};
handle_cast({task_ping, _Pid, TaskId, Percentage}, #state{ task_id = TaskId } = State) ->
State1 = State#state{
task_ping = os:timestamp(),
task_progress = Percentage
},
{noreply, State1};
handle_cast({task_ping, Pid, TaskId, _Percentage}, State) ->
?LOG_NOTICE(#{
text => <<"Task ping for wrong task id">>,
in => zotonic_core,
result => error,
reason => wrong_task_id,
pid => Pid,
task_id => TaskId,
expected_task_id => State#state.task_id
}),
{noreply, State};
handle_cast(pivot_delay, State) ->
% Delay the next pivot, useful when performing big updates
{noreply, State#state{is_pivot_delay=true}};
handle_cast(Message, State) ->
{stop, {unknown_cast, Message}, State}.
%% @doc Handling all non call/cast messages
handle_info(poll, #state{ is_env_backup = true } = State) ->
{noreply, State};
handle_info(poll, #state{ is_pivot_delay = true } = State) ->
State1 = State#state{ is_pivot_delay = false },
{noreply, next_poll(State1, ?PIVOT_POLL_INTERVAL_SLOW)};
handle_info(poll, #state{backoff_counter = Ct} = State) when Ct > 0 ->
State1 = State#state{ backoff_counter = Ct - 1 },
{noreply, next_poll(State1, ?PIVOT_POLL_INTERVAL_SLOW)};
handle_info(poll, #state{ pivot_pid = Pid } = State) when is_pid(Pid) ->
?LOG_DEBUG(#{
text => <<"Pivot job still running, delaying next poll">>,
in => zotonic_core,
pivot_pid => Pid,
reason => busy
}),
{noreply, next_poll(State, ?PIVOT_POLL_INTERVAL_FAST)};
handle_info(poll, #state{ site = Site } = State) ->
case z_sites_manager:get_site_status(Site) of
{ok, running} ->
?LOG_DEBUG(#{
text => <<"Pivot poll">>,
in => zotonic_core
}),
try
State1 = do_poll(State),
IsPivoting = is_pid(State1#state.pivot_pid)
orelse is_pid(State1#state.task_pid),
Interval = case IsPivoting of
true -> ?PIVOT_POLL_INTERVAL_FAST;
false -> ?PIVOT_POLL_INTERVAL_SLOW
end,
State2 = State1#state{ is_initial_delay = false },
{noreply, next_poll(State2, Interval)}
catch
Type:Err:Stack ->
?LOG_ERROR(#{
text => <<"Pivot error">>,
in => zotonic_core,
result => Type,
reason => Err,
stack => Stack
}),
StateBackoff = State#state{ backoff_counter = ?BACKOFF_POLL_ERROR },
{noreply, next_poll(StateBackoff, ?PIVOT_POLL_INTERVAL_SLOW)}
end;
_ ->
State1 = State#state{ is_initial_delay = true },
{noreply, next_poll(State1, ?PIVOT_POLL_INTERVAL_SLOW)}
end;
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, #state{ pivot_pid = undefined, task_pid = undefined } = State) ->
{noreply, State};
handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{ pivot_pid = Pid } = State) ->
LastRscId = State#state.pivot_rsc_id,
?LOG_ERROR(#{
text => <<"Pivot received DOWN from pivot job">>,
in => zotonic_core,
rsc_id => LastRscId,
result => 'DOWN',
reason => Reason,
pivot_pid => Pid
}),
Context = z_context:new(State#state.site),
z_db:q("
delete from rsc_pivot_log
where rsc_id = $1
",
[ LastRscId ],
Context),
{noreply, State#state{
pivot_pid = undefined,
pivot_rsc_id = undefined,
backoff_counter = ?BACKOFF_POLL_ERROR
}};
handle_info({'DOWN', _MRef, process, Pid, Reason}, #state{ task = Task, task_pid = Pid } = State) ->
Context = z_context:new(State#state.site),
z_pivot_rsc_task_job:maybe_schedule_retry(Task, 'DOWN', Reason, [], Context),
{noreply, State#state{ task_id = undefined, task_pid = undefined }};
handle_info(job_check, State) ->
check_pivot_job(State),
check_task_job(State),
{noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any necessary
%% cleaning up. When it returns, the gen_server terminates with Reason.
%% The return value is ignored.
terminate(_Reason, _State) ->
ok.
%% @doc Convert process state when code is changed
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%====================================================================
%% support functions
%%====================================================================
publish_task_event(What, Module, Function, undefined, Context) ->
z_mqtt:publish(task_topic(Context),
#{
what => What,
module => Module,
function => Function
},
z_acl:sudo(Context));
publish_task_event(What, Module, Function, Due, Context) ->
z_mqtt:publish(task_topic(Context),
#{
what => What,
module => Module,
function => Function,
due => Due
},
z_acl:sudo(Context)).
task_topic(Context) ->
[ <<"$SYS">>, <<"site">>, z_convert:to_binary(z_context:site(Context)), <<"task-queue">> ].
next_poll(State = #state{ poll_timer = TRef }, Interval) ->
timer:cancel(TRef),
z_utils:flush_message(poll),
{ok, TRefNew} = timer:send_after(Interval*1000, poll),
State#state{ poll_timer = TRefNew }.
check_pivot_job(#state{ pivot_pid = undefined }) ->
ok;
check_pivot_job(#state{ pivot_pid = Pid, pivot_ping = LastPing } = State) ->
Now = z_datetime:timestamp(),
Ping = z_datetime:datetime_to_timestamp(calendar:now_to_universal_time(LastPing)),
Timeout = Ping + ?PIVOT_JOB_TIMEOUT,
case Now > Timeout of
true ->
?LOG_ERROR(#{
text => <<"Pivot job timeout, killing pivot job">>,
in => zotonic_core,
reason => timeout,
pivot_pid => Pid,
rsc_id => State#state.pivot_rsc_id
}),
erlang:exit(Pid, timeout);
false ->
ok
end.
check_task_job(#state{ task_pid = undefined }) ->
ok;
check_task_job(#state{ task_pid = Pid, task_id = TaskId, task_ping = LastPing } = State) ->
Now = z_datetime:timestamp(),
Ping = z_datetime:datetime_to_timestamp(calendar:now_to_universal_time(LastPing)),
Timeout = Ping + ?TASK_JOB_TIMEOUT,
case Now > Timeout of
true ->
?LOG_ERROR(#{
text => <<"Task job timeout, killing task for later retry">>,
in => zotonic_core,
reason => timeout,
task_id => TaskId,
task_pid => Pid,
progress => State#state.task_progress
}),
% Force exit - the monitor will increment the task error count
% and then restart the task after a backoff.
erlang:exit(Pid, timeout);
false ->
ok
end.
do_insert_task_after(SecondsOrDate, Module, Function, undefined, ArgsFun, Context) ->
UniqueKey = z_ids:id(),
do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, ArgsFun, Context);
do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, ArgsFun, Context) when is_function(ArgsFun) ->
Due = to_utc_date(SecondsOrDate),
UniqueKeyBin = z_convert:to_binary(UniqueKey),
case z_db:transaction(
fun(Ctx) ->
OldTask = z_db:q_row("
select props, due
from pivot_task_queue
where module = $1
and function = $2
and key = $3
limit 1
for update",
[ Module, Function, UniqueKeyBin ],
Ctx),
New = case OldTask of
{OldProps, OldDue} ->
OldArgs = get_args(OldProps),
ArgsFun(OldDue, OldArgs, Due, Ctx);
undefined ->
ArgsFun(undefined, undefined, Due, Ctx)
end,
case New of
{ok, {NewDue, NewArgs}} ->
case OldTask of
undefined -> ok;
{_, _} ->
_ = z_db:q("
delete from pivot_task_queue
where module = $1
and function = $2
and key = $3",
[ Module, Function, UniqueKeyBin ],
Ctx)
end,
Fields = #{
<<"module">> => Module,
<<"function">> => Function,
<<"key">> => UniqueKeyBin,
<<"args">> => NewArgs,
<<"due">> => NewDue
},
z_db:insert(pivot_task_queue, Fields, Ctx);
{error, _} = Error ->
Error
end
end,
Context)
of
{ok, _} = Ok ->
z_mqtt:publish(<<"model/sysconfig/event/task">>,
#{
what => <<"insert">>,
module => Module,
function => Function,
due => Due
},
Context),
Ok;
{error, _} = Error ->
Error
end;
do_insert_task_after(SecondsOrDate, Module, Function, UniqueKey, Args, Context) ->
Due = to_utc_date(SecondsOrDate),
UniqueKeyBin = z_convert:to_binary(UniqueKey),
case z_db:transaction(
fun(Ctx) ->
_ = z_db:q("
delete from pivot_task_queue
where module = $1
and function = $2
and key = $3",
[ Module, Function, UniqueKeyBin ],
Ctx),
Fields = #{
<<"module">> => Module,
<<"function">> => Function,
<<"key">> => UniqueKeyBin,
<<"args">> => Args,
<<"due">> => Due
},
z_db:insert(pivot_task_queue, Fields, Ctx)
end,
Context)
of
{ok, _} = Ok ->
z_mqtt:publish(<<"model/sysconfig/event/task">>,
#{
what => <<"insert">>,
module => Module,
function => Function,
due => Due
},
Context),
Ok;
{error, _} = Error ->
Error
end.
%% @doc Insert a list of ids into the pivot queue.
do_insert_queue(Ids, DueDate, Context) when is_list(Ids) ->
z_db:q("insert into rsc_pivot_log as p (rsc_id, due, is_update)
select r.id, $2, true from rsc r where r.id = any($1)",
[ Ids, DueDate ], Context),
ok.
%% @doc Poll a database for any queued updates.
-spec do_poll( #state{} ) -> #state{}.
do_poll(State) ->
Context = z_acl:sudo( z_context:new(State#state.site) ),
State1 = maybe_start_task(State, Context),
maybe_start_pivot(State1, Context).
maybe_start_pivot(#state{ pivot_pid = Pid } = State, _Context) when is_pid(Pid) ->
State;
maybe_start_pivot(#state{ pivot_queue = Queue } = State, Context) ->
case do_poll_queue(Context) of
{_, undefined} ->
State;
{Ids, DueDate} ->
case lists:usort(Queue ++ Ids) of
[] ->
State;
PivotIds ->
case z_pivot_rsc_job:start_pivot(PivotIds, Context) of
{ok, Pid} ->
erlang:monitor(process, Pid),
State#state{
pivot_pid = Pid,
pivot_queue = [],
pivot_queue_inflight = Queue,
pivot_queue_inflight_date = calendar:universal_time(),
pivot_inflight_date = DueDate,
pivot_ping = os:timestamp(),
pivot_rsc_id = undefined
};
{error, _} ->
% overload - ignore
State
end
end
end.
do_poll_queue(Context) ->
try
fetch_queue(Context)
catch
exit:{timeout, _} ->
{[], undefined};
throw:{error, econnrefused} ->
{[], undefined}
end.
maybe_start_task(#state{ task_pid = undefined } = State, Context) ->
case poll_task(Context) of
{ok, #{ task_id := TaskId } = Task} ->
case z_pivot_rsc_task_job:start_task(Task, Context) of
{ok, TaskPid} ->
erlang:monitor(process, TaskPid),
State#state{
task_id = TaskId,
task_pid = TaskPid,
task_ping = os:timestamp(),
task = Task
};
{error, _} ->
State
end;
{error, enoent} ->
State;
{error, nodb} ->
State;
{error, Reason} ->
?LOG_ERROR(#{
text => "Pivot could not check task queue",
in => zotonic_core,
reason => Reason
}),
State
end;
maybe_start_task(State, _Context) ->
State.
%% @doc Fetch the next task uit de task queue, if any.
poll_task(Context) ->
case z_db:qmap_row("
select id, module, function, props, error_count
from pivot_task_queue
where due is null
or due < current_timestamp
order by due asc
limit 1",
[],
[ {keys, atom} ],
Context)
of
{ok, #{
id := Id,
module := Module,
function := Function,
props := Props,
error_count := ErrCt
}} ->
Args = get_args(Props),
{ok, #{
task_id => Id,
mfa => {z_convert:to_atom(Module), z_convert:to_atom(Function), Args},
error_count => ErrCt
}};
{error, _} = Error ->
Error
end.
get_args(Props) when is_map(Props) ->
maps:get(<<"args">>, Props, []);
get_args(Props) when is_list(Props) ->
% deprecated task queue entries
proplists:get_value(args, Props, []);
get_args(undefined) ->
[].
%% @doc Fetch the next batch of ids from the queue. Remembers the serials, as a new
%% pivot request might come in while we are pivoting.
-spec fetch_queue(Context) -> {Ids, DueDate} when
Context :: z:context(),
Ids :: [ m_rsc:resource_id() ],
DueDate :: calendar:datetime().
fetch_queue(Context) ->
PivotDate = z_db:q1("select current_timestamp - '10 second'::interval", Context),
Rows = z_db:q("
select rsc_id
from rsc_pivot_log
where due < $2
order by priority, is_update, due
limit $1",
[ ?POLL_BATCH, PivotDate ],
Context),
if
Rows =:= [] ->
{[], PivotDate};
true ->
% Remove log entries that have a pivot date after the cutoff date.
% They will be pivoted at a later date.
ToPivot = lists:foldl(
fun({Id}, Acc) ->
case z_db:q_row("
select max(due), max(due) >= $2
from rsc_pivot_log
where rsc_id = $1
", [ Id, PivotDate ], Context)
of
{_MaxDue, false} ->
[ Id | Acc ];
{MaxDue, true} ->
z_db:q("
delete from rsc_pivot_log
where rsc_id = $1
and due < $2
", [ Id, MaxDue ],
Context),
Acc
end
end,
[],
lists:usort(Rows)),
if
ToPivot =:= [] ->
fetch_queue(Context);
true ->
{ToPivot, PivotDate}
end
end.
%% @doc Delete pivot log entries for all pivoted ids. Use the due date
%% so that optional newer entries are still queued.
delete_queue(Ids, DueDate, Context) ->
z_db:q("
delete from rsc_pivot_log
where rsc_id = any($1)
and due < $2",
[ Ids, DueDate ],
Context).
%% @doc Let a module define a custom pivot. The custom pivot table is created or
%% changed to reflect the given columns. Per default an index is created for each
%% column.
%% Be careful when adding columns that are not nullable, if the table contains data then
%% adding those columns will fail.
-spec define_custom_pivot(Module, Columns, Context) -> ok when
Module :: atom(),
Columns :: [ Column ],
Column :: #column_def{}
| {#column_def{}, Options}
| {Colname, Colspec}
| {Colname, Colspec, Options},
Colname :: string() | binary() | atom(),
Colspec :: string() | binary(),
Options :: [ Option ],
Option :: noindex,
Context :: z:context().
define_custom_pivot(Module, Columns, Context) ->
TableName = list_to_atom("pivot_" ++ z_convert:to_list(Module)),
ColDefs = to_column_defs(Columns),
case z_db:table_exists(TableName, Context) of
true ->
DbColumns = z_db:column_names(TableName, Context) -- [ id ],
DefColumns = [ Name || {#column_def{ name = Name }, _} <- ColDefs ],
case lists:usort(DefColumns) =:= lists:usort(DbColumns) of
false ->
update_custom_pivot(TableName, ColDefs, Context);
true ->
ok
end;
false ->
update_custom_pivot(TableName, ColDefs, Context)
end.
update_custom_pivot(TableName, DefColumns, Context) ->
Cols = [ Col || {Col, _} <- DefColumns ],
Cols1 = [
#column_def{
name = id,
type = "integer",
primary_key = true,
is_nullable = false
}
| Cols
],
ok = z_db:transaction(
fun(Ctx) ->
ok = z_db:alter_table(TableName, Cols1, Ctx),
Constraint = "fk_" ++ z_convert:to_list(TableName) ++ "_id",
case z_db:constraint_exists(TableName, Constraint, Ctx) of
true ->
ok;
false ->
[] = z_db:q("ALTER TABLE " ++ z_convert:to_list(TableName) ++
" ADD CONSTRAINT " ++ Constraint ++
" FOREIGN KEY (id) REFERENCES rsc(id) ON UPDATE CASCADE ON DELETE CASCADE", Ctx)
end,
lists:foreach(
fun({#column_def{ name = ColName }, Opts}) ->
case proplists:get_bool(noindex, Opts) of
true ->
ok;
false ->
Sql = iolist_to_binary([
"CREATE INDEX IF NOT EXISTS ",
z_convert:to_list(TableName), "_", z_convert:to_list(ColName), "_key ON ",
z_convert:to_list(TableName), "(", z_convert:to_list(ColName), ")"
]),
[] = z_db:q(Sql, Ctx)
end
end,
DefColumns)
end,
Context),
z_db:flush(Context),
ok.
to_column_defs(List) ->
lists:map(
fun
(#column_def{} = Def) -> {Def, []};
({#column_def{}, _Opts} = Def) -> Def;
({Name, Spec}) -> {to_column_def(Name, Spec), []};
({Name, Spec, Opts}) -> {to_column_def(Name, Spec), Opts}
end,
List).
to_column_def(Name, Spec) ->
C = #column_def{ name = z_convert:to_atom(Name), type = <<>> },
{C1, SpecRest} = case binary:split(z_convert:to_binary(Spec), <<"(">>) of
[Type, LenRest] ->
[Len, Rest] = binary:split(LenRest, <<")">>),
Len1 = binary_to_integer(z_string:trim(Len)),
{C#column_def{ length = Len1 }, <<Type/binary, " ", Rest/binary>>};
[Rest] ->
{C, Rest}
end,
case binary:split(z_string:to_lower(SpecRest), <<"not null">>) of
[Type1, NullRest] ->
case binary:split(NullRest, <<"default ">>) of
[_, Default] ->
C1#column_def{ type = z_string:trim(Type1), is_nullable = false, default = z_string:trim(Default) };
[_] ->
C1#column_def{ type = z_string:trim(Type1), is_nullable = false }
end;
[NullRest] ->
case binary:split(NullRest, <<"default ">>) of
[Type1, Default] ->
C1#column_def{ type = z_string:trim(Type1), default = z_string:trim(Default) };
[Type1] ->
C1#column_def{ type = z_string:trim(Type1) }
end
end.
%% @doc Lookup a custom pivot; give back the Id based on a column. Will always return the first Id found.
lookup_custom_pivot(Module, Column, Value, Context) ->
TableName = "pivot_" ++ z_convert:to_list(Module),
Column1 = z_convert:to_list(Column),
Query = "SELECT id FROM " ++ TableName ++ " WHERE " ++ Column1 ++ " = $1",
z_db:q1(Query, [Value], Context).