-module(treewalker_crawler).
-include_lib("kernel/include/logger.hrl").
-behaviour(gen_statem).
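%% Web crawler implemented as a gen_statem with three states:
%%   stopped            - idle; late dispatcher responses are ignored
%%   started            - fetching and parsing robots.txt, retrying on failure
%%   {crawling, Robots} - walking pages from the configured root URL,
%%                        filtering links against the parsed robots.txt rules
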
%% API
-export([start_link/3,
         start_crawler/1,
         stop_crawler/1]).

%% gen_statem callbacks
-export([init/1,
         callback_mode/0,
         handle_event/4,
         terminate/3]).

-record(data, {config :: config(),
               retry_timeout :: pos_integer(),
               requests_by_ids = #{} :: requests_by_ids(),
               dispatcher_id :: dispatcher_id()}).
-define(VIA_GPROC(Id), {via, gproc, {n, l, Id}}).
-type config() :: treewalker_crawler_config:config().
-type requests_by_ids() :: #{request_id() := {url(), depth()}}.
-type depth() :: treewalker_crawler_config:depth().
-type id() :: treewalker_crawler_sup:crawler_id().
-type url() :: treewalker_page:url().
-type request_id() :: reference().
-type dispatcher_id() :: treewalker_crawler_sup:dispatcher_id().
-type agent_rules() :: robots:agent_rules().
-type either(Left, Right) :: {ok, Left} | {error, Right}.
-type data() :: #data{}.
-define(RETRY_TIMEOUT, 5000).
%%%===================================================================
%%% API
%%%===================================================================
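%% Usage sketch -- `my_crawler' and `my_dispatcher' are hypothetical ids; in
%% the application these processes are presumably registered by
%% treewalker_crawler_sup, which defines the crawler_id() and dispatcher_id()
%% types used below:
%%   {ok, _Pid} = treewalker_crawler:start_link(my_crawler, my_dispatcher, Config),
%%   ok = treewalker_crawler:start_crawler(my_crawler),
%%   %% ... later ...
%%   ok = treewalker_crawler:stop_crawler(my_crawler).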
-spec start_link(id(), dispatcher_id(), config()) -> {ok, pid()} | {error, term()}.
start_link(Id, DispatcherId, Config) ->
    gen_statem:start_link(?VIA_GPROC(Id), ?MODULE, [DispatcherId, Config], []).

-spec start_crawler(id()) -> ok.
start_crawler(Id) ->
    gen_statem:cast(?VIA_GPROC(Id), start).

-spec stop_crawler(id()) -> ok.
stop_crawler(Id) ->
    gen_statem:cast(?VIA_GPROC(Id), stop).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
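%% Composite state terms such as {crawling, Robots} require the
%% handle_event_function callback mode; state_functions would restrict state
%% names to atoms.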
callback_mode() ->
    [handle_event_function].

init([DispatcherId, Config]) ->
    RetryTimeout = application:get_env(treewalker, retry_interval, ?RETRY_TIMEOUT),
    {ok, stopped, #data{config = Config,
                        dispatcher_id = DispatcherId,
                        retry_timeout = RetryTimeout}}.
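
%% Event flow: a start cast enters the started state and fires an internal
%% fetch_robots event; once robots.txt is fetched and parsed, the crawler
%% enters {crawling, Robots} and fires an internal crawl event for the root
%% URL at depth 0. Dispatcher replies then arrive as info messages tagged
%% with the request reference.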
handle_event(internal, fetch_robots, started, Data=#data{config = Config}) ->
    Url = treewalker_crawler_config:url(Config),
    ?LOG_DEBUG(#{what => robots_fetch, status => start, url => Url}),
    UriMap = uri_string:parse(Url),
    Recomposed = uri_string:recompose(UriMap#{path => <<"/robots.txt">>}),
    try_fetch_robots(Recomposed, Data);
handle_event(internal, crawl, {crawling, _Robots}, Data=#data{config = Config}) ->
    Url = treewalker_crawler_config:url(Config),
    UpdatedData = crawl(Url, 0, Data),
    {keep_state, UpdatedData};
handle_event(cast, start, _State, Data) ->
    ?LOG_INFO(#{what => crawler_start, status => start}),
    NewData = Data#data{requests_by_ids = #{}},
    {next_state, started, NewData, {next_event, internal, fetch_robots}};
handle_event(cast, stop, _State, Data) ->
    ?LOG_INFO(#{what => crawler_stop, status => stop}),
    NewData = Data#data{requests_by_ids = #{}},
    {next_state, stopped, NewData};
handle_event(cast, _Message, _State, _Data) ->
    keep_state_and_data;
handle_event({call, From}, _Action, _State, Data) ->
    {keep_state_and_data, [{reply, From, Data}]};
handle_event(info, {treewalker_dispatcher, Ref, {ok, {Code, Result}}}, {crawling, Robots}, Data) ->
    ?LOG_DEBUG(#{what => crawl, event => message_received, status => in_progress, code => Code,
                 id => Ref}),
    UpdatedData = walk(Result, Ref, Robots, Data),
    {keep_state, UpdatedData};
handle_event(info, {treewalker_dispatcher, Ref, Error={error, _}}, {crawling, _Robots}, _Data) ->
    ?LOG_WARNING(#{what => crawl, event => message_received, status => in_progress,
                   result => error, id => Ref, reason => Error}),
    keep_state_and_data;
handle_event(info, {treewalker_dispatcher, Ref, _Result}, stopped, _Data) ->
    ?LOG_WARNING(#{what => response_received, id => Ref, status => done,
                   result => ignored, reason => crawler_stopped}),
    keep_state_and_data;
handle_event({timeout, retry}, retry, started, _Data) ->
    {keep_state_and_data, {next_event, internal, fetch_robots}};
handle_event(info, Message, State, _Data) ->
    ?LOG_WARNING(#{what => unexpected_message, message => Message, state => State}),
    keep_state_and_data;
handle_event(Event, Message, State, _Data) ->
    ?LOG_WARNING(#{what => unexpected_event, event => Event, message => Message, state => State}),
    keep_state_and_data.
terminate(_Reason, _State, _Data) ->
    ok.
%%%===================================================================
%%% Internal functions
%%%===================================================================
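%% Handle a completed fetch: if the request's recorded depth is within the
%% configured maximum, store the scraped page and follow its links; otherwise
%% drop it. Unknown references (e.g. responses arriving after a start or stop
%% cast cleared requests_by_ids) are ignored.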
-spec walk(binary(), request_id(), agent_rules(), data()) -> data().
walk(Content, Ref, Robots, Data=#data{config = Config}) ->
    MaxDepth = treewalker_crawler_config:max_depth(Config),
    case maps:take(Ref, Data#data.requests_by_ids) of
        {{Url, Depth}, RequestsByIds} when Depth =< MaxDepth ->
            ok = try_store(Url, Content, Config),
            UpdatedData = Data#data{requests_by_ids = RequestsByIds},
            Scraper = treewalker_crawler_config:scraper(Config),
            Options = treewalker_crawler_config:scraper_options(Config),
            Result = Scraper:scrap_links(Url, Content, Options),
            maybe_walk(Result, Depth, Robots, UpdatedData);
        {{Url, Depth}, RequestsByIds} ->
            ?LOG_INFO(#{what => walk, status => done, reason => max_depth_reached, url => Url,
                        max_depth => MaxDepth, depth => Depth}),
            Data#data{requests_by_ids = RequestsByIds};
        error ->
            Data
    end.
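
%% Scrape a fetched page and hand the result to the configured store module.
%% Scraper errors are logged and swallowed so a single bad page does not
%% abort the crawl.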
-spec try_store(url(), treewalker_scraper:page_data(), config()) -> ok.
try_store(Url, Content, Config) ->
    Scraper = treewalker_crawler_config:scraper(Config),
    Options = treewalker_crawler_config:scraper_options(Config),
    ?LOG_DEBUG(#{what => store, status => start, url => Url}),
    case Scraper:scrap(Url, Content, Options) of
        {ok, Scraped} ->
            Page = page(Url, Scraped, Config),
            Store = treewalker_crawler_config:store(Config),
            StoreOptions = treewalker_crawler_config:store_options(Config),
            Store:store(Page, StoreOptions);
        Error={error, _} ->
            ?LOG_ERROR(#{what => store, url => Url, status => done, result => error,
                         reason => Error}),
            ok
    end.
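
%% Follow scraped links one level deeper, keeping only those that survive
%% normalization, the configured link filter and the robots.txt rules.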
-spec maybe_walk(either([url()], term()), depth(), agent_rules(), data()) -> data().
maybe_walk({ok, Links}, Depth, Robots, Data) ->
    Filter = filter_link(Data, Robots),
    Filtered = lists:filtermap(Filter, Links),
    lists:foldl(fun (Url, Acc) -> crawl(Url, Depth + 1, Acc) end, Data, Filtered);
maybe_walk(Error={error, _}, _Depth, _Robots, Data) ->
    ?LOG_WARNING(#{what => walk, status => done, result => error, reason => Error}),
    Data.
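
%% Ask the dispatcher to fetch Url and record the request reference so the
%% asynchronous reply can be matched back to its URL and depth.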
-spec crawl(url(), depth(), data()) -> data().
crawl(Url, Depth, Data=#data{requests_by_ids = RequestsByIds}) ->
    case treewalker_dispatcher:request(Data#data.dispatcher_id, Url) of
        Error={error, _} ->
            ?LOG_DEBUG(#{what => crawl, status => skip, reason => Error}),
            Data;
        {ok, Ref} ->
            Data#data{requests_by_ids = RequestsByIds#{Ref => {Url, Depth}}}
    end.
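
%% Build a fun suitable for lists:filtermap/2: normalize each link against
%% the root URL, then keep it only if it passes the combined filters.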
-spec filter_link(data(), agent_rules()) -> fun ((url()) -> {true, url()} | false).
filter_link(Data, Robots) ->
    Filter = filter(Data, Robots),
    Normalize = normalize_relative_url(Data),
    fun (Url) ->
            case Normalize(Url) of
                {ok, Normalized} ->
                    case Filter(Normalized) of
                        true ->
                            {true, Normalized};
                        false ->
                            false
                    end;
                Error={error, _} ->
                    ?LOG_WARNING(#{what => walk, status => in_progress,
                                   result => skipping, url => Url, reason => Error}),
                    false
            end
    end.
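
%% Resolve a possibly relative link against the configured root URL. For
%% illustration, with a root of <<"https://example.com">>, a link such as
%% <<"foo/bar">> parses to #{path := <<"foo/bar">>}, inherits the scheme and
%% host from the root in merge_urls/2, gains a leading slash in fix_path/1,
%% and recomposes to <<"https://example.com/foo/bar">>. Note this replaces
%% the base path wholesale rather than performing full RFC 3986 relative
%% resolution.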
-spec normalize_relative_url(data()) -> fun ((url()) -> either(url(), term())).
normalize_relative_url(#data{config = Config}) ->
    Url = treewalker_crawler_config:url(Config),
    UriMap = uri_string:parse(Url),
    Updated = UriMap#{path => <<"/">>},
    fun (Other) ->
            case uri_string:parse(Other) of
                {error, E, I} ->
                    {error, {E, I}};
                OtherUriMap ->
                    Merged = merge_urls(Updated, OtherUriMap),
                    {ok, Merged}
            end
    end.
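
%% Overlay the link's URI components on the root's so that components the
%% link omits (typically scheme and host) are inherited from the root.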
-spec merge_urls(uri_string:uri_map(), uri_string:uri_map()) -> uri_string:uri_string().
merge_urls(Url, Other) ->
    Merged = maps:merge(Url, Other),
    Fixed = fix_path(Merged),
    Normalized = uri_string:normalize(Fixed, [return_map]),
    uri_string:recompose(Normalized).
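
%% Guarantee an absolute path so that recomposition yields a valid URL.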
-spec fix_path(uri_string:uri_map()) -> uri_string:uri_map().
fix_path(Uri=#{path := <<$/, _/binary>>}) ->
    Uri;
fix_path(Uri=#{path := Path}) ->
    Uri#{path := <<$/, Path/binary>>}.
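
%% Combine the user-supplied link filter with the robots.txt rules for the
%% configured user agent.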
-spec filter(data(), agent_rules()) -> fun ((url()) -> boolean()).
filter(#data{config = Config}, Robots) ->
    LinkFilter = treewalker_crawler_config:link_filter(Config),
    UserAgent = treewalker_crawler_config:user_agent(Config),
    fun (Url) ->
            Filtered = LinkFilter:filter(Url),
            #{path := Path} = uri_string:parse(Url),
            Allowed = robots:is_allowed(UserAgent, Path, Robots),
            Filtered andalso Allowed
    end.
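
%% Fetch robots.txt with the configured fetcher. On failure, stay in the
%% started state and arm a generic {timeout, retry} timer that re-fires the
%% internal fetch_robots event after retry_timeout milliseconds.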
try_fetch_robots(Url, Data=#data{config = Config}) ->
    Fetcher = treewalker_crawler_config:fetcher(Config),
    Options = treewalker_crawler_config:fetcher_options(Config),
    UserAgent = treewalker_crawler_config:user_agent(Config),
    case Fetcher:request(Url, UserAgent, Options) of
        {ok, {Code, Body}} ->
            try_parse_robots(Code, Body, Data);
        Error={error, _} ->
            ?LOG_ERROR(#{what => robots_fetch, status => done, result => error, reason => Error}),
            {keep_state_and_data, {{timeout, retry}, Data#data.retry_timeout, retry}}
    end.
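
%% A successful parse is what finally moves the crawler into the
%% {crawling, Robots} state.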
try_parse_robots(Code, Body, Data) ->
    case robots:parse(Body, Code) of
        {ok, Robots} ->
            {next_state, {crawling, Robots}, Data, {next_event, internal, crawl}};
        Error={error, _} ->
            ?LOG_ERROR(#{what => robots_fetch, status => done, result => error, reason => Error}),
            {keep_state_and_data, {{timeout, retry}, Data#data.retry_timeout, retry}}
    end.
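
%% Assemble a treewalker_page from the scraped content; note the configured
%% user agent doubles as the page name.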
-spec page(url(), treewalker_page:content(), config()) -> treewalker_page:page().
page(Url, ScrapedContent, Config) ->
    Page = treewalker_page:init(),
    UserAgent = treewalker_crawler_config:user_agent(Config),
    WithUrl = treewalker_page:url(Url, Page),
    WithContent = treewalker_page:content(ScrapedContent, WithUrl),
    treewalker_page:name(UserAgent, WithContent).