%% src/treewalker_dispatcher.erl

-module(treewalker_dispatcher).

-include_lib("kernel/include/logger.hrl").
-include_lib("treewalker.hrl").

-behaviour(gen_server).

%% API
-export([start_link/2,
         request/2]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2]).

%% Dispatcher server state.
%%
%% config                  - crawler configuration handed to every worker.
%% visited_urls            - every URL accepted so far; used to reject duplicates.
%% retry_policy            - retry settings handed to every worker.
%% max_worker_delay        - argument given to treewalker_rate_limit:delay/1 when a
%%                           `start' message is scheduled.
%% max_concurrent_worker   - `start' messages are ignored once this many workers
%%                           are being tracked.
%% pending_requests        - FIFO of accepted requests waiting for a worker.
%% callers_by_workers_pids - tracked worker pid -> {request reference, requester pid}.
%%                           Typed with an optional association (`=>'): the map
%%                           starts empty and is empty again whenever no worker
%%                           is running, which a mandatory (`:=') association
%%                           does not describe.
-record(state, {config :: config(),
                visited_urls = sets:new() :: sets:set(url()),
                retry_policy :: retry_policy(),
                max_worker_delay :: pos_integer(),
                max_concurrent_worker :: pos_integer(),
                pending_requests = queue:new() :: queue:queue({reference(), pid(), url()}),
                callers_by_workers_pids = #{} :: #{pid() => {reference(), pid()}}}).

%% `via' tuple used both to register a dispatcher and to address it by its id
%% through gproc.
-define(VIA_GPROC(Id), {via, gproc, {n, l, Id}}).
%% Fallback for the `max_worker_delay' application env key. The unit is
%% whatever treewalker_rate_limit:delay/1 expects -- TODO confirm.
-define(MAX_WORKER_DELAY, 5).
%% Fallback for the `min_retry_delay' application env key
%% (presumably milliseconds -- verify against treewalker_worker).
-define(MIN_RETRY_DELAY, 1000).
%% Fallback for the `max_retry_delay' application env key
%% (presumably milliseconds, i.e. five minutes -- verify against treewalker_worker).
-define(MAX_RETRY_DELAY, 5 * 60000).
%% Fallback for the `max_retries' application env key.
-define(MAX_RETRIES, 5).
%% Fallback for the `max_concurrent_worker' application env key.
-define(MAX_CONCURRENT_WORKERS, 10).
%% NOTE(review): not referenced by any code visible in this module.
-define(MILLISECONDS_PER_SECOND, 1000).

%% Local aliases for types that are defined by other treewalker modules.
-type url() :: treewalker_page:url().
-type retry_policy() :: treewalker_worker:retry_policy().
-type config() :: treewalker_crawler_config:config().
-type id() :: treewalker_crawler_sup:dispatcher_id().

%%%===================================================================
%%% API
%%%===================================================================

%% Starts a dispatcher linked to the calling process and registers it under
%% `Id' through gproc. `Config' is passed to init/1 and kept in the state.
-spec start_link(id(), config()) -> {ok, pid()} | {error, term()}.
start_link(Id, Config) ->
    ServerName = ?VIA_GPROC(Id),
    InitArgs = [Config],
    gen_server:start_link(ServerName, ?MODULE, InitArgs, []).

%% Asks the dispatcher registered under `Id' to crawl `Url' on behalf of the
%% calling process.
%%
%% Returns `{ok, Ref}' when the request was queued, or
%% `{error, {already_present, Url}}' when the URL was accepted before. The
%% outcome is later sent to the calling process as
%% `{treewalker_dispatcher, Ref, Response}'.
-spec request(id(), url()) -> {ok, reference()} | {error, term()}.
request(Id, Url) ->
    Requester = self(),
    Dispatcher = ?VIA_GPROC(Id),
    gen_server:call(Dispatcher, {request, Requester, Url}).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% Builds the initial state from `Config' and the `treewalker' application
%% environment, falling back to the module defaults for unset keys.
%% Exits are trapped so that worker terminations arrive as `EXIT' messages.
init([Config]) ->
    process_flag(trap_exit, true),
    RetryPolicy = #retry_policy{max_retry = app_env(max_retries, ?MAX_RETRIES),
                                min_delay = app_env(min_retry_delay, ?MIN_RETRY_DELAY),
                                max_delay = app_env(max_retry_delay, ?MAX_RETRY_DELAY)},
    InitialState = #state{config = Config,
                          retry_policy = RetryPolicy,
                          max_worker_delay = app_env(max_worker_delay, ?MAX_WORKER_DELAY),
                          max_concurrent_worker = app_env(max_concurrent_worker,
                                                          ?MAX_CONCURRENT_WORKERS)},
    {ok, InitialState}.

%% Reads `Key' from the `treewalker' application environment, or `Default'.
app_env(Key, Default) ->
    application:get_env(treewalker, Key, Default).

%% Handles `{request, Caller, Url}': a URL that was accepted before is
%% rejected, otherwise the request is queued, the URL is recorded as visited
%% and a `start' message is scheduled. Any other call is answered with `ok'.
handle_call({request, Caller, Url}, _From, State = #state{visited_urls = Seen}) ->
    ?LOG_INFO(#{what => request_received, requester => Caller, status => queuing, url => Url}),
    accept_request(sets:is_element(Url, Seen), Caller, Url, State);
handle_call(_Request, _From, State) ->
    {reply, ok, State}.

%% First argument tells whether the URL has already been accepted.
accept_request(true, _Caller, Url, State) ->
    {reply, {error, {already_present, Url}}, State};
accept_request(false, Caller, Url, State) ->
    #state{pending_requests = Queue,
           visited_urls = Seen,
           max_worker_delay = MaxWorkerDelay} = State,
    %% Wake ourselves up later to try to start a worker for a queued request.
    erlang:send_after(treewalker_rate_limit:delay(MaxWorkerDelay), self(), start),
    Ref = make_ref(),
    Queued = queue:in({Ref, Caller, Url}, Queue),
    {reply, {ok, Ref}, State#state{pending_requests = Queued,
                                   visited_urls = sets:add_element(Url, Seen)}}.

%% No cast is handled by this server: every cast is dropped and the state
%% is returned untouched.
handle_cast(_Cast, Unchanged) ->
    {noreply, Unchanged}.

%% Out-of-band messages:
%%   start                         - try to start one queued request, unless the
%%                                   number of tracked workers has reached the limit.
%%   {treewalker_worker, Pid, R}   - a worker reported `R'; forward it to the requester.
%%   {'EXIT', Pid, Reason}         - a linked process exited; if it is a tracked
%%                                   worker, its requester gets `{error, Reason}'.
%%   anything else                 - logged and ignored.
%% Both worker notifications also schedule a new `start' message.
handle_info(start, State = #state{callers_by_workers_pids = Running,
                                  max_concurrent_worker = Limit})
  when map_size(Running) < Limit ->
    ?LOG_INFO(#{what => worker_start, status => start, workers_amount => map_size(Running)}),
    try_start_pending_requests(State);
handle_info(start, State) ->
    {noreply, State};
handle_info({treewalker_worker, Pid, Reason}, State) ->
    ?LOG_DEBUG(#{what => worker_response, pid => Pid}),
    worker_finished(Reason, Pid, State);
handle_info({'EXIT', Pid, Reason}, State) ->
    ?LOG_DEBUG(#{what => worker_exit, pid => Pid, reason => Reason}),
    worker_finished({error, Reason}, Pid, State);
handle_info(Message, State) ->
    ?LOG_WARNING(#{what => unexpected_message, message => Message}),
    {noreply, State}.

%% Forwards `Response' to the requester tracked for `Pid' (if any) and
%% schedules the next `start' message.
worker_finished(Response, Pid, State) ->
    NextState = maybe_response_to_caller(Response, Pid, State),
    erlang:send_after(treewalker_rate_limit:delay(State#state.max_worker_delay), self(), start),
    {noreply, NextState}.

%% No cleanup is performed on shutdown; always returns `ok'.
terminate(_Why, _FinalState) ->
    ok.

%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Starts a linked worker for `Url' on behalf of `Caller'.
%%
%% On success the worker pid is tracked together with `{Ref, Caller}' so the
%% result can be routed back later. On failure the error is sent to `Caller'
%% as `{treewalker_dispatcher, Ref, Error}' and the state is left untouched.
%%
%% Returns the (possibly updated) state.
try_start_worker(Caller, Ref, Url, State=#state{retry_policy = RetryPolicy, config = Config}) ->
    case treewalker_worker:start_link(Url, RetryPolicy, Config, self()) of
        {ok, Pid} ->
            ?LOG_INFO(#{what => request_received, requester => Caller, status => in_progress,
                        worker_pid => Pid, url => Url}),
            CallersByWorkersPids = State#state.callers_by_workers_pids,
            State#state{callers_by_workers_pids = CallersByWorkersPids#{Pid => {Ref, Caller}}};
        Error={error, _} ->
            %% The url is logged as in the success branch so the failure can be
            %% correlated with the request.
            ?LOG_ERROR(#{what => request_received, requester => Caller, status => done,
                         result => error, reason => Error, url => Url}),
            Caller ! {?MODULE, Ref, Error},
            %% This attempt used up a `start' tick without launching a worker.
            %% No worker means no completion message will schedule the next
            %% tick, so schedule one here; otherwise requests still waiting in
            %% the queue could be left there indefinitely.
            Delay = treewalker_rate_limit:delay(State#state.max_worker_delay),
            erlang:send_after(Delay, self(), start),
            State
    end.

%% If `Pid' is a tracked worker, sends `{treewalker_dispatcher, Ref, Response}'
%% to its requester and stops tracking it. Unknown pids leave the state as is.
maybe_response_to_caller(Response, Pid, State) ->
    Tracked = State#state.callers_by_workers_pids,
    case maps:find(Pid, Tracked) of
        error ->
            State;
        {ok, {Ref, Caller}} ->
            Caller ! {?MODULE, Ref, Response},
            State#state{callers_by_workers_pids = maps:remove(Pid, Tracked)}
    end.

%% Takes the oldest queued request, if any, and tries to start a worker for
%% it. Always returns a `{noreply, State}' tuple for handle_info/2.
try_start_pending_requests(State) ->
    Next = queue:out(State#state.pending_requests),
    {noreply, start_next_pending(Next, State)}.

%% Dispatches on the result of queue:out/1.
start_next_pending({empty, _Queue}, State) ->
    State;
start_next_pending({{value, {Ref, Caller, Url}}, Remaining}, State) ->
    try_start_worker(Caller, Ref, Url, State#state{pending_requests = Remaining}).