% This file is licensed to you under the Apache License,
% Version 2.0 (the "License"); you may not use this file
% except in compliance with the License. You may obtain
% a copy of the License at
%
% https://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing,
% software distributed under the License is distributed on an
% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
% KIND, either express or implied. See the License for the
% specific language governing permissions and limitations
% under the License.
%%% @author Fernando Benavides <elbrujohalcon@inaka.net>
%%% @doc Decorator over `gen_server' that lets `wpool_pool'
%%% control certain aspects of the execution
-module(wpool_process).
-behaviour(gen_server).
-record(state,
{name :: atom(),
mod :: atom(),
state :: term(),
options ::
#{time_checker := atom(),
queue_manager := atom(),
overrun_warning := timeout(),
_ => _}}).
-opaque state() :: #state{}.
-export_type([state/0]).
-type from() :: {pid(), reference()}.
-export_type([from/0]).
-type next_step() :: timeout() | hibernate | {continue, term()}.
-export_type([next_step/0]).
-type options() :: [{time_checker | queue_manager, atom()} | wpool:option()].
-export_type([options/0]).
%% api
-export([start_link/4, call/3, cast/2, send_request/2]).
%% gen_server callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2,
handle_continue/2, format_status/2]).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Starts a named process
-spec start_link(wpool:name(), module(), term(), options()) ->
{ok, pid()} | ignore | {error, {already_started, pid()} | term()}.
start_link(Name, Module, InitArgs, Options) ->
FullOpts = wpool_utils:add_defaults(Options),
WorkerOpt = proplists:get_value(worker_opt, FullOpts, []),
gen_server:start_link({local, Name},
?MODULE,
{Name, Module, InitArgs, FullOpts},
WorkerOpt).
%% @equiv gen_server:call(Process, Call, Timeout)
-spec call(wpool:name() | pid(), term(), timeout()) -> term().
call(Process, Call, Timeout) ->
gen_server:call(Process, Call, Timeout).
%% @equiv gen_server:cast(Process, {cast, Cast})
-spec cast(wpool:name() | pid(), term()) -> ok.
cast(Process, Cast) ->
gen_server:cast(Process, Cast).
%% @equiv gen_server:send_request(Process, Request)
-spec send_request(wpool:name() | pid(), term()) -> term().
send_request(Name, Request) ->
gen_server:send_request(Name, Request).
%%%===================================================================
%%% init, terminate, code_change, info callbacks
%%%===================================================================
%% @private
-spec init({atom(), atom(), term(), options()}) ->
{ok, state()} | {ok, state(), next_step()} | {stop, can_not_ignore} | {stop, term()}.
init({Name, Mod, InitArgs, LOptions}) ->
Options = maps:from_list(LOptions),
wpool_process_callbacks:notify(handle_init_start, Options, [Name]),
case Mod:init(InitArgs) of
{ok, ModState} ->
ok = notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok,
#state{name = Name,
mod = Mod,
state = ModState,
options = Options}};
{ok, ModState, NextStep} ->
ok = notify_queue_manager(new_worker, Name, Options),
wpool_process_callbacks:notify(handle_worker_creation, Options, [Name]),
{ok,
#state{name = Name,
mod = Mod,
state = ModState,
options = Options},
NextStep};
ignore ->
{stop, can_not_ignore};
Error ->
Error
end.
%% @private
-spec terminate(atom(), state()) -> term().
terminate(Reason, State) ->
#state{mod = Mod,
state = ModState,
name = Name,
options = Options} =
State,
ok = notify_queue_manager(worker_dead, Name, Options),
wpool_process_callbacks:notify(handle_worker_death, Options, [Name, Reason]),
Mod:terminate(Reason, ModState).
%% @private
-spec code_change(string(), state(), any()) -> {ok, state()} | {error, term()}.
code_change(OldVsn, State, Extra) ->
case (State#state.mod):code_change(OldVsn, State#state.state, Extra) of
{ok, NewState} ->
{ok, State#state{state = NewState}};
Error ->
{error, Error}
end.
%% @private
-spec handle_info(any(), state()) ->
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_info(Info, State) ->
try (State#state.mod):handle_info(Info, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
catch
_:{noreply, NewState} ->
{noreply, State#state{state = NewState}};
_:{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
_:{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
end.
%% @private
-spec handle_continue(any(), state()) ->
{noreply, state()} |
{noreply, state(), next_step()} |
{stop, term(), state()}.
handle_continue(Continue, State) ->
try (State#state.mod):handle_continue(Continue, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
catch
_:{noreply, NewState} ->
{noreply, State#state{state = NewState}};
_:{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
_:{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
end.
%% @private
-spec format_status(normal | terminate, [[{_, _}] | state(), ...]) -> term().
format_status(Opt, [PDict, State]) ->
case erlang:function_exported(State#state.mod, format_status, 2) of
false ->
case Opt % This is copied from gen_server:format_status/4
of
terminate ->
State#state.state;
normal ->
[{data, [{"State", State#state.state}]}]
end;
true ->
(State#state.mod):format_status(Opt, [PDict, State#state.state])
end.
%%%===================================================================
%%% real (i.e. interesting) callbacks
%%%===================================================================
%% @private
-spec handle_cast(term(), state()) ->
{noreply, state()} | {noreply, state(), next_step()} | {stop, term(), state()}.
handle_cast(Cast, #state{options = Options} = State) ->
Task = wpool_utils:task_init({cast, Cast}, Options),
ok = notify_queue_manager(worker_busy, State#state.name, Options),
Reply =
try (State#state.mod):handle_cast(Cast, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
catch
_:{noreply, NewState} ->
{noreply, State#state{state = NewState}};
_:{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
_:{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}}
end,
wpool_utils:task_end(Task),
ok = notify_queue_manager(worker_ready, State#state.name, Options),
Reply.
%% @private
-spec handle_call(term(), from(), state()) ->
{reply, term(), state()} |
{reply, term(), state(), next_step()} |
{noreply, state()} |
{noreply, state(), next_step()} |
{stop, term(), term(), state()} |
{stop, term(), state()}.
handle_call(Call, From, #state{options = Options} = State) ->
Task = wpool_utils:task_init({call, Call}, Options),
ok = notify_queue_manager(worker_busy, State#state.name, Options),
Reply =
try (State#state.mod):handle_call(Call, From, State#state.state) of
{noreply, NewState} ->
{noreply, State#state{state = NewState}};
{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
{reply, Response, NewState} ->
{reply, Response, State#state{state = NewState}};
{reply, Response, NewState, NextStep} ->
{reply, Response, State#state{state = NewState}, NextStep};
{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}};
{stop, Reason, Response, NewState} ->
{stop, Reason, Response, State#state{state = NewState}}
catch
_:{noreply, NewState} ->
{noreply, State#state{state = NewState}};
_:{noreply, NewState, NextStep} ->
{noreply, State#state{state = NewState}, NextStep};
_:{reply, Response, NewState} ->
{reply, Response, State#state{state = NewState}};
_:{reply, Response, NewState, NextStep} ->
{reply, Response, State#state{state = NewState}, NextStep};
_:{stop, Reason, NewState} ->
{stop, Reason, State#state{state = NewState}};
_:{stop, Reason, Response, NewState} ->
{stop, Reason, Response, State#state{state = NewState}}
end,
wpool_utils:task_end(Task),
ok = notify_queue_manager(worker_ready, State#state.name, Options),
Reply.
notify_queue_manager(Function, Name, #{queue_manager := QueueManager}) ->
wpool_queue_manager:Function(QueueManager, Name);
notify_queue_manager(_, _, _) ->
ok.