%% Copyright (c) 2017-2022 Guilherme Andrade
%%
%% Permission is hereby granted, free of charge, to any person obtaining a
%% copy of this software and associated documentation files (the "Software"),
%% to deal in the Software without restriction, including without limitation
%% the rights to use, copy, modify, merge, publish, distribute, sublicense,
%% and/or sell copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included in
%% all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
%% DEALINGS IN THE SOFTWARE.
%%
%% locus is an independent project and has not been authorized, sponsored,
%% or otherwise approved by MaxMind.
%% @doc Loads and unpacks databases while managing any associated assets
%% (e.g. reading from and writing to cache)
-module(locus_loader).
-behaviour(gen_server).
-include_lib("stdlib/include/assert.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
-export(
[validate_opts/2,
start_link/4,
valid_blob_formats/0
]).
-ignore_xref(
[start_link/4
]).
-ifdef(TEST).
-export(
[cached_database_path_for_maxmind_edition_name/2,
cached_database_path_for_url/1,
cached_database_path_for_custom_fetcher/2
]).
-endif.
%% ------------------------------------------------------------------
%% gen_server Function Exports
%% ------------------------------------------------------------------
-export(
[init/1,
handle_call/3,
handle_cast/2,
handle_info/2,
terminate/2,
code_change/3
]).
%% ------------------------------------------------------------------
%% Macro Definitions
%% ------------------------------------------------------------------
%% Idle period, in milliseconds, handed to gen_server as the
%% `hibernate_after' start option (see start_link/4).
-define(HIBERNATE_AFTER, (timer:seconds(5))).
%% Boolean test that is true when V is an integer greater than or equal to 1.
-define(is_pos_integer(V), ((is_integer((V)) andalso ((V) >= 1)))).
% https://en.wikipedia.org/wiki/Gzip
%% Expands to two comma-separated byte values, meant to be used as the
%% leading segments of a binary pattern.
-define(GZIP_MAGIC_BYTES, 16#1f, 16#8b).
%% ------------------------------------------------------------------
%% Record and Type Definitions
%% ------------------------------------------------------------------
%% Any option accepted by this module: either handled by the loader itself
%% or forwarded to the fetcher.
-type opt() :: loader_opt() | fetcher_opt().
-export_type([opt/0]).
%% Options consumed by the loader (see validate_loader_opts/2 and
%% customized_settings/2).
-type loader_opt() ::
{update_period, milliseconds_interval()} |
{error_retries, error_retry_behaviour()} |
no_cache |
{database_cache_file, file:filename()}.
-export_type([loader_opt/0]).
%% A strictly positive amount of milliseconds.
-type milliseconds_interval() :: pos_integer().
-export_type([milliseconds_interval/0]).
%% How long to wait before retrying after a failed load attempt
%% (see error_backoff_interval/2).
-type error_retry_behaviour() ::
{backoff, milliseconds_interval()} |
{exponential_backoff, exponential_backoff_params()}.
-export_type([error_retry_behaviour/0]).
%% Parameters of the exponential backoff computed by
%% exponential_error_backoff_interval/2.
-type exponential_backoff_params() ::
#{min_interval := milliseconds_interval(),
max_interval := milliseconds_interval(),
growth_base := milliseconds_interval(),
growth_exponent := number()
}.
-export_type([exponential_backoff_params/0]).
%% Options forwarded to the MaxMind or HTTP downloaders.
-type fetcher_opt() :: locus_maxmind_download:opt() | locus_http_download:opt().
-export_type([fetcher_opt/0]).
%% Messages received from the fetcher process (see handle_fetcher_msg/2).
-type fetcher_msg() ::
locus_http_download:msg() |
locus_filesystem_load:msg() |
locus_custom_fetcher:msg().
%% Payload of a successful fetch, as delivered by the fetcher process.
-type fetcher_success() ::
locus_http_download:success() |
locus_filesystem_load:success() |
locus_custom_fetcher:success().
%% Messages received from the cacher process (see handle_cacher_msg/2).
-type cacher_msg() :: locus_filesystem_store:msg().
%% gen_server state of the loader.
-record(state, {
% Process that receives `{self(), Msg}' notifications (see notify_owner/2).
owner_pid :: pid(),
% Identifier given at start; only stored by the code in this module.
database_id :: atom(),
% Where the database is fetched from.
origin :: origin(),
% Effective loader settings (defaults overridden by loader options).
settings :: settings(),
% Options handed to the MaxMind / HTTP downloaders.
fetcher_opts :: [fetcher_opt()],
% Timer for the next `begin_update' message; `undefined' when none is pending.
update_timer :: reference() | undefined,
% Modification datetime of the last successfully unpacked database.
last_modified :: calendar:datetime() | unknown,
% Version of the last loaded database; `undefined' until a load succeeds.
last_loaded_version :: calendar:datetime() | undefined,
% Metadata kept for custom fetchers between fetches.
fetch_metadata_for_last_successful_load
:: locus_custom_fetcher:successful_fetch_metadata() | undefined,
% Currently running fetcher (if any) and the source it is fetching from.
fetcher_pid :: pid() | undefined,
fetcher_source :: source() | undefined,
% Incremented on failed attempts and reset to zero otherwise
% (see update_error_backoff_count/2).
error_backoff_count :: non_neg_integer(),
% Currently running cacher (if any), the path it writes to and the source
% of the database being cached.
cacher_pid :: pid() | undefined,
cacher_path :: locus_filesystem_store:path() | undefined,
cacher_source :: source() | undefined
}).
-type state() :: #state{}.
%% Loader settings derived from the origin defaults and the loader options.
-record(settings, {
% Time between updates once a database has been loaded, in milliseconds.
update_period :: pos_integer(),
% How to space retries after failures.
error_retry_behaviour :: error_retry_behaviour(),
% Whether the retry behaviour still applies once a database has been loaded
% (see time_to_next_update/2).
error_retry_behaviour_applies_after_readiness :: boolean(),
% Whether the filesystem cache is read at start and written after remote fetches.
use_cache :: boolean(),
% Explicit cache file path; `undefined' means a path is derived from the origin.
database_cache_file :: file:filename() | undefined
}).
-type settings() :: #settings{}.
%% NOTE(review): not referenced by the code visible in this module;
%% presumably used by callers elsewhere.
-type update_period() :: pos_integer() | uniformly_distributed_update_period().
-export_type([update_period/0]).
-type uniformly_distributed_update_period() :: #{ min := pos_integer(), max := pos_integer() }.
-export_type([uniformly_distributed_update_period/0]).
%% Container formats understood by unpack_database_from_blob/2.
-type blob_format() :: tgz | tarball | gzip | gzipped_mmdb | mmdb | unknown.
-export_type([blob_format/0]).
%% Where a database is to be loaded from.
-type origin() ::
{maxmind, atom()} |
{http, locus_http_download:url()} |
{filesystem, locus_filesystem_load:path()} |
locus:custom_fetcher().
-export_type([origin/0]).
%% NOTE(review): not referenced by the code visible in this module.
-type maxmind_origin_params() ::
#{ license_key := unicode:unicode_binary(),
date => calendar:date()
}.
-export_type([maxmind_origin_params/0]).
%% Events forwarded to the owner wrapped as `{event, Event}'
%% (see report_event/2).
-type event() ::
locus_maxmind_download:event() |
locus_http_download:event() |
locus_filesystem_load:event() |
locus_custom_fetcher:event() |
event_cache_attempt_finished().
-export_type([event/0]).
%% Reported once an attempt at writing the cache file has concluded.
-type event_cache_attempt_finished() ::
{cache_attempt_finished, locus_filesystem_store:path(), ok} |
{cache_attempt_finished, locus_filesystem_store:path(), {error, term()}}.
-export_type([event_cache_attempt_finished/0]).
%% The concrete place a load attempt read from.
-type source() ::
{remote, provider_source()} |
{remote, locus_http_download:url()} |
locus_filesystem_load:source() |
locus_custom_fetcher:source().
-export_type([source/0]).
-type provider_source() ::
{maxmind, atom()}.
-export_type([provider_source/0]).
%% Messages sent to the owner process, each wrapped as `{LoaderPid, Msg}'
%% (see notify_owner/2).
-type msg() ::
{event, event()} |
{load_success, source(), calendar:datetime(), locus_mmdb:database()} |
{load_failure, source(), Reason :: term()}.
-export_type([msg/0]).
%% ------------------------------------------------------------------
%% API Function Definitions
%% ------------------------------------------------------------------
%% Validates a mixed option list for `Origin'. Fetcher options are validated
%% and split off first; whatever remains is then checked against the loader
%% options. The first invalid option found is returned as `{error, BadOpt}'.
-spec validate_opts(origin(), proplists:proplist())
        -> {ok, {[opt()], [fetcher_opt()], proplists:proplist()}} |
           {error, BadOpt :: term()}.
%% @private
validate_opts(Origin, MixedOpts) ->
    case validate_fetcher_opts(Origin, MixedOpts) of
        {error, _BadOpt} = Rejection ->
            Rejection;
        {ok, {FetcherOpts, LeftoverOpts}} ->
            validate_loader_opts(LeftoverOpts, FetcherOpts)
    end.
%% Starts a loader linked to the calling process, which becomes its owner.
%% The server is asked to hibernate after ?HIBERNATE_AFTER of inactivity.
-spec start_link(atom(), origin(), [loader_opt()], [fetcher_opt()]) -> {ok, pid()}.
%% @private
start_link(DatabaseId, Origin, LoaderOpts, FetcherOpts) ->
    InitArgs = [self(), DatabaseId, Origin, LoaderOpts, FetcherOpts],
    gen_server:start_link(?MODULE, InitArgs, [{hibernate_after, ?HIBERNATE_AFTER}]).
%% Lists every blob format atom that unpack_database_from_blob/2 dispatches on.
-spec valid_blob_formats() -> [tgz | tarball | gzip | gzipped_mmdb | mmdb | unknown, ...].
%% @private
valid_blob_formats() ->
    [tgz,
     tarball,
     gzip,
     gzipped_mmdb,
     mmdb,
     unknown].
%% ------------------------------------------------------------------
%% gen_server Function Definitions
%% ------------------------------------------------------------------
%% gen_server callback. Traps exits, derives the effective settings and
%% queues a `finish_initialization' message so that the remaining setup
%% runs from handle_info/2.
-spec init([pid() | atom() | origin() | [loader_opt()] | [fetcher_opt()], ...])
        -> {ok, state()}.
%% @private
init([OwnerPid, DatabaseId, Origin, LoaderOpts, FetcherOpts]) ->
    _ = process_flag(trap_exit, true),
    Settings = customized_settings(default_settings(Origin), LoaderOpts),
    InitialState =
        #state{owner_pid = OwnerPid,
               database_id = DatabaseId,
               origin = Origin,
               settings = Settings,
               fetcher_opts = FetcherOpts,
               last_modified = unknown,
               error_backoff_count = 0},
    self() ! finish_initialization,
    {ok, InitialState}.
%% gen_server callback. No calls are part of this server's protocol, so any
%% call stops it with reason `unexpected_call'.
-spec handle_call(term(), {pid(), reference()}, state())
        -> {stop, unexpected_call, state()}.
%% @private
handle_call(_Request, _From, State) ->
    {stop, unexpected_call, State}.
%% gen_server callback. No casts are part of this server's protocol, so any
%% cast stops it with reason `unexpected_cast'.
-spec handle_cast(term(), state())
        -> {stop, unexpected_cast, state()}.
%% @private
handle_cast(_Request, State) ->
    {stop, unexpected_cast, State}.
%% gen_server callback. Dispatches the deferred initialization request, the
%% update timer, messages tagged with the fetcher or cacher pid, and exits
%% of linked processes. Anything else stops the server.
-spec handle_info(term(), state())
        -> {noreply, state()} |
           {stop, term(), state()}.
%% @private
handle_info(finish_initialization, #state{update_timer = undefined} = State) ->
    {noreply, finish_initialization(State)};
handle_info(begin_update, #state{update_timer = Timer} = State) ->
    % This message is what the timer delivers, so the timer has already
    % fired and cancelling it is expected to report `false'.
    false = erlang:cancel_timer(Timer),
    TimerlessState = State#state{update_timer = undefined},
    {noreply, begin_update(TimerlessState)};
handle_info({FetcherPid, Msg}, #state{fetcher_pid = FetcherPid} = State) ->
    handle_fetcher_msg(Msg, State);
handle_info({CacherPid, Msg}, #state{cacher_pid = CacherPid} = State) ->
    handle_cacher_msg(Msg, State);
handle_info({'EXIT', Pid, Reason}, State) ->
    handle_linked_process_death(Pid, Reason, State);
handle_info(_Unexpected, State) ->
    {stop, unexpected_info, State}.
%% gen_server callback. Nothing to clean up explicitly.
-spec terminate(term(), state()) -> ok.
%% @private
terminate(_StopReason, _FinalState) ->
    ok.
%% gen_server callback. The state is carried over unchanged; it must
%% already be a `#state{}' record.
-spec code_change(term(), state(), term()) -> {ok, state()}.
%% @private
code_change(_PreviousVersion, State = #state{}, _Extra) ->
    {ok, State}.
%% ------------------------------------------------------------------
%% Internal Function Definitions - Initialization
%% ------------------------------------------------------------------
%% Splits the fetcher options out of `MixedOpts'. Only MaxMind and HTTP
%% origins have fetcher options; for the other origins every option is
%% left for the loader to validate.
-spec validate_fetcher_opts(origin(), list())
        -> {ok, {[fetcher_opt()], list()}} |
           {error, term()}.
validate_fetcher_opts({maxmind, _EditionName}, MixedOpts) ->
    locus_maxmind_download:validate_opts(MixedOpts);
validate_fetcher_opts({http, _URL}, MixedOpts) ->
    locus_http_download:validate_opts(MixedOpts);
validate_fetcher_opts({custom_fetcher, _Module, _Args}, MixedOpts) ->
    % Whatever opts `Module' needs can be placed in `Args' instead,
    % keeping things simple.
    NoFetcherOpts = [],
    {ok, {NoFetcherOpts, MixedOpts}};
validate_fetcher_opts({filesystem, _Path}, MixedOpts) ->
    NoFetcherOpts = [],
    {ok, {NoFetcherOpts, MixedOpts}}.
%% Partitions `MixedOpts' into loader options and everything else.
%%
%% Returns `{ok, {LoaderOpts, FetcherOpts, OtherOpts}}' when every loader
%% option found is well formed, or `{error, BadOpt}' for the first loader
%% option carrying an invalid value. Options not recognised here are not
%% errors: they end up in `OtherOpts'.
%%
%% A `database_cache_file' is accepted only when it is a string ending in
%% ".mmdb.gz" (case-insensitively) whose directory already exists.
-spec validate_loader_opts(list(), [fetcher_opt()])
        -> {ok, {[loader_opt()], [fetcher_opt()], list()}} |
           {error, term()}.
validate_loader_opts(MixedOpts, FetcherOpts) ->
    try
        lists:partition(
          fun ({update_period, Interval} = Opt) ->
                  ?is_pos_integer(Interval)
                      orelse error({badopt, Opt});
              ({error_retries, Behaviour} = Opt) ->
                  is_error_retry_behaviour(Behaviour)
                      orelse error({badopt, Opt});
              (no_cache) ->
                  true;
              ({database_cache_file, File} = Opt) ->
                  % The type test comes first so that values which are not
                  % strings are reported as bad options rather than crashing
                  % the filename helpers; the directory is only looked up
                  % once the name itself is acceptable.
                  (is_list(File)
                   andalso has_extenstion(File, ["gz", "mmdb"])
                   andalso filelib:is_dir(filename:dirname(File)))
                      orelse error({badopt, Opt});
              (_) ->
                  false
          end,
          MixedOpts)
    of
        {LoaderOpts, OtherOpts} ->
            {ok, {LoaderOpts, FetcherOpts, OtherOpts}}
    catch
        error:{badopt, BadOpt} ->
            {error, BadOpt}
    end.
%% Tells whether a term is a well-formed error retry behaviour: either a
%% constant backoff with a positive integer interval, or an exponential
%% backoff whose intervals and growth base are positive integers, whose
%% growth exponent is a non-negative number and whose maximum interval is
%% not smaller than the minimum one.
-spec is_error_retry_behaviour(term()) -> boolean().
is_error_retry_behaviour({backoff, Interval}) ->
    ?is_pos_integer(Interval);
is_error_retry_behaviour({exponential_backoff, #{min_interval := Min,
                                                 max_interval := Max,
                                                 growth_base := Base,
                                                 growth_exponent := Exponent}}) ->
    % A positive integer is necessarily >= 1, so no separate lower-bound
    % test is needed for the growth base.
    ?is_pos_integer(Min)
        andalso ?is_pos_integer(Max)
        andalso ?is_pos_integer(Base)
        andalso is_number(Exponent)
        andalso Exponent >= 0
        andalso Max >= Min;
is_error_retry_behaviour(_Other) ->
    false.
%% Picks the default settings for an origin: the remote defaults for MaxMind,
%% HTTP and remote custom fetchers, the local defaults for the filesystem
%% and local custom fetchers.
-spec default_settings(origin()) -> settings().
default_settings({OriginType, _}) when OriginType =:= maxmind; OriginType =:= http ->
    default_remote_origin_settings();
default_settings({filesystem, _}) ->
    default_local_origin_settings();
default_settings({custom_fetcher, Module, Args}) ->
    case locus_custom_fetcher:source(Module, Args) of
        {local, {custom, _}} ->
            default_local_origin_settings();
        {remote, {custom, _}} ->
            default_remote_origin_settings()
    end.
%% Defaults for remote origins: update every six hours, retry with the
%% remote backoff behaviour, and use the filesystem cache.
-spec default_remote_origin_settings() -> settings().
default_remote_origin_settings() ->
    RetryBehaviour = default_remote_origin_error_retry_behaviour(),
    #settings{update_period = timer:hours(6),
              error_retry_behaviour = RetryBehaviour,
              error_retry_behaviour_applies_after_readiness = true,
              use_cache = true,
              database_cache_file = undefined}.
%% Default retry behaviour for remote origins: exponential backoff between
%% one second and fifteen minutes.
default_remote_origin_error_retry_behaviour() ->
    Params = #{min_interval => timer:seconds(1),
               max_interval => timer:minutes(15),
               growth_base => timer:seconds(2),
               growth_exponent => 0.625},
    {exponential_backoff, Params}.
%% Defaults for local origins: check for updates every thirty seconds, retry
%% with the local backoff behaviour, and do not use the filesystem cache.
-spec default_local_origin_settings() -> settings().
default_local_origin_settings() ->
    RetryBehaviour = default_local_origin_error_retry_behaviour(),
    #settings{update_period = timer:seconds(30),
              error_retry_behaviour = RetryBehaviour,
              error_retry_behaviour_applies_after_readiness = true,
              use_cache = false,
              database_cache_file = undefined}.
%% Default retry behaviour for local origins: exponential backoff between
%% one and thirty seconds.
default_local_origin_error_retry_behaviour() ->
    Params = #{min_interval => timer:seconds(1),
               max_interval => timer:seconds(30),
               growth_base => timer:seconds(2),
               growth_exponent => 1.0},
    {exponential_backoff, Params}.
%% Applies the (already validated) loader options, in order, on top of the
%% given settings. Later options override earlier ones.
-spec customized_settings(settings(), [loader_opt()]) -> settings().
customized_settings(Settings, LoaderOpts) ->
    ApplyOpt =
        fun ({update_period, Interval}, Current) ->
                Current#settings{update_period = Interval};
            ({error_retries, Behaviour}, Current) ->
                Current#settings{error_retry_behaviour = Behaviour};
            (no_cache, Current) ->
                Current#settings{use_cache = false};
            ({database_cache_file, File}, Current) ->
                Current#settings{database_cache_file = File}
        end,
    lists:foldl(ApplyOpt, Settings, LoaderOpts).
%% Completes the setup deferred by init/1. With caching enabled the cached
%% database file is loaded first, through a filesystem loader; otherwise
%% the first update is scheduled immediately.
-spec finish_initialization(state()) -> state().
finish_initialization(#state{settings = #settings{use_cache = true}} = State) ->
    PreparedState = maybe_mock_fetch_metadata_for_last_successful_load(State),
    CacheSource = {cache, cached_database_path(PreparedState)},
    {ok, LoaderPid} = locus_filesystem_load:start_link(CacheSource, unknown),
    PreparedState#state{fetcher_pid = LoaderPid, fetcher_source = CacheSource};
finish_initialization(State) ->
    schedule_update(0, State).
%% For custom fetcher origins, seeds the state with placeholder fetch
%% metadata; other origins leave the state untouched.
maybe_mock_fetch_metadata_for_last_successful_load(#state{origin = Origin} = State) ->
    case Origin of
        {custom_fetcher, Module, Args} ->
            maybe_mock_custom_fetcher_metadata_for_last_successful_load(Module, Args, State);
        _NotCustom ->
            State
    end.
%% Stores placeholder metadata built from the custom fetcher's description:
%% where it fetches from, with an unknown modification date. The description
%% must declare the database as stored remotely.
maybe_mock_custom_fetcher_metadata_for_last_successful_load(Module, Args, State) ->
    #{database_is_stored_remotely := true, database_is_fetched_from := FetchedFrom}
        = locus_custom_fetcher:description(Module, Args),
    Placeholder = #{fetched_from => FetchedFrom, modified_on => unknown},
    State#state{fetch_metadata_for_last_successful_load = Placeholder}.
%% Arms the timer that delivers `begin_update' after `Interval'
%% milliseconds. No timer may be pending when this is called.
-spec schedule_update(0 | milliseconds_interval(), state()) -> state().
schedule_update(Interval, #state{update_timer = undefined} = State) ->
    State#state{update_timer = erlang:send_after(Interval, self(), begin_update)}.
%% Returns the path of the cache file: the explicitly configured one when
%% present, otherwise a path derived from the origin.
-spec cached_database_path(state()) -> nonempty_string().
cached_database_path(#state{settings = #settings{database_cache_file = ConfiguredPath}})
  when ConfiguredPath =/= undefined ->
    ConfiguredPath;
cached_database_path(#state{origin = Origin} = State) ->
    case Origin of
        {maxmind, EditionName} ->
            MaybeDate = proplists:get_value(date, State#state.fetcher_opts),
            cached_database_path_for_maxmind_edition_name(EditionName, MaybeDate);
        {http, URL} ->
            cached_database_path_for_url(URL);
        {custom_fetcher, Module, _Args} ->
            #{fetched_from := FetchedFrom}
                = State#state.fetch_metadata_for_last_successful_load,
            cached_database_path_for_custom_fetcher(Module, FetchedFrom)
    end.
%% Builds the cache path for a MaxMind edition: a filesystem-safe edition
%% name, optionally followed by a zero-padded date, with the ".mmdb.gz"
%% extension, inside the cache directory.
-spec cached_database_path_for_maxmind_edition_name(atom(), undefined | calendar:date())
        -> nonempty_string().
%% @private
cached_database_path_for_maxmind_edition_name(Edition, MaybeDate) ->
    CacheDir = cache_directory_path(),
    SafeEdition = locus_util:filesystem_safe_name(atom_to_binary(Edition, utf8)),
    {Format, FormatArgs} =
        case MaybeDate of
            undefined ->
                {"~ts.mmdb.gz", [SafeEdition]};
            {Year, Month, Day} ->
                {"~ts.~4..0B-~2..0B-~2..0B.mmdb.gz", [SafeEdition, Year, Month, Day]}
        end,
    [_|_] = Filename = unicode:characters_to_list(io_lib:format(Format, FormatArgs)),
    filename:join(CacheDir, Filename).
%% Builds the cache path for an HTTP origin: the hex-encoded SHA-256 of the
%% URL with the ".mmdb.gz" extension, inside the cache directory.
-spec cached_database_path_for_url(string()) -> nonempty_string().
%% @private
cached_database_path_for_url(URL) ->
    CacheDir = cache_directory_path(),
    HexDigest = locus_util:bin_to_hex_str(crypto:hash(sha256, URL)),
    filename:join(CacheDir, HexDigest ++ ".mmdb.gz").
%% Builds the cache path for a custom fetcher: "custom.", a filesystem-safe
%% module name and a 32-bit hash of where the database is fetched from,
%% with the ".mmdb.gz" extension, inside the cache directory.
-spec cached_database_path_for_custom_fetcher(module(), term()) -> nonempty_string().
%% @private
cached_database_path_for_custom_fetcher(Module, FetchedFrom) ->
    CacheDir = cache_directory_path(),
    SafeModuleName = locus_util:filesystem_safe_name(atom_to_binary(Module, utf8)),
    FetchedFromHash = erlang:phash2(FetchedFrom, 1 bsl 32),
    Chardata = io_lib:format("custom.~ts.~.36..b.mmdb.gz", [SafeModuleName, FetchedFromHash]),
    [_|_] = Filename = unicode:characters_to_list(Chardata),
    filename:join(CacheDir, Filename).
%% Directory under which cache files are placed: the user cache directory
%% of the "locus_erlang" application. Test builds use its "tests"
%% subdirectory instead.
-spec cache_directory_path() -> nonempty_string().
-ifdef(TEST).
cache_directory_path() ->
    UserCacheDir = filename:basedir(user_cache, "locus_erlang"),
    filename:join(UserCacheDir, "tests").
-else.
cache_directory_path() ->
    AppName = "locus_erlang",
    filename:basedir(user_cache, AppName).
-endif.
%% ------------------------------------------------------------------
%% Internal Function Definitions - Database Updates
%% ------------------------------------------------------------------
%% Starts the fetcher matching the origin and records its pid and source in
%% the state. No fetcher may be running when this is called. HTTP-based
%% fetchers receive request headers derived from the last known
%% modification date; filesystem and custom fetchers take no fetcher options.
-spec begin_update(state()) -> state().
begin_update(#state{fetcher_pid = undefined} = State) ->
    #state{origin = Origin,
           fetcher_opts = FetcherOpts,
           last_modified = LastModified} = State,
    {StartedPid, StartedSource} =
        case Origin of
            {maxmind, EditionName} ->
                MaxMindHeaders = http_request_headers(LastModified),
                {ok, MaxMindPid} = locus_maxmind_download:start_link(EditionName,
                                                                     MaxMindHeaders,
                                                                     FetcherOpts),
                {MaxMindPid, {remote, {maxmind, EditionName}}};
            {http, URL} ->
                HttpHeaders = http_request_headers(LastModified),
                {ok, HttpPid} = locus_http_download:start_link(URL, HttpHeaders, FetcherOpts),
                {HttpPid, {remote, URL}};
            {filesystem, _} ->
                ?assertEqual([], FetcherOpts),
                % The origin tuple doubles as the source for filesystem loads.
                {ok, FilesystemPid} = locus_filesystem_load:start_link(Origin, LastModified),
                {FilesystemPid, Origin};
            {custom_fetcher, Module, Args} ->
                ?assertEqual([], FetcherOpts),
                PreviousMetadata = State#state.fetch_metadata_for_last_successful_load,
                CustomSource = locus_custom_fetcher:source(Module, Args),
                {ok, CustomPid} = locus_custom_fetcher:start_link(CustomSource, Module, Args,
                                                                  PreviousMetadata),
                {CustomPid, CustomSource}
        end,
    State#state{fetcher_pid = StartedPid, fetcher_source = StartedSource}.
%% Request headers for HTTP-based fetchers. When the last modification date
%% is known, an "if-modified-since" header is prepended to the base headers.
http_request_headers(unknown) ->
    http_base_request_headers();
http_request_headers(LastModified) ->
    % `httpd_util:rfc1123_date/1' is given local time here, hence the
    % conversion from universal time.
    LocalTime = calendar:universal_time_to_local_time(LastModified),
    Conditional = {"if-modified-since", httpd_util:rfc1123_date(LocalTime)},
    [Conditional | http_base_request_headers()].
%% Headers sent with every HTTP fetch: the accepted archive media types and
%% a request to close the connection afterwards.
http_base_request_headers() ->
    AcceptedMediaTypes =
        ["application/gzip",
         "application/x-gzip",
         "application/x-gtar",
         "application/x-tgz",
         "application/x-tar",
         "application/octet-stream"],
    AcceptHeader = {"accept", join_http_header_values(AcceptedMediaTypes)},
    [AcceptHeader, {"connection", "close"}].
%% Joins the members of a list-valued HTTP header into a single string.
%%
%% List members are separated by commas. A semicolon would instead
%% introduce parameters of the preceding member, turning e.g. an `Accept'
%% list into one media range followed by malformed parameters.
-spec join_http_header_values([string()]) -> string().
join_http_header_values(Values) ->
    string:join(Values, ", ").
%% Handles a message from the running fetcher. Events are forwarded to the
%% owner unless they come from a cache load. A `finished' message clears
%% the fetcher from the state and dispatches on the outcome.
-spec handle_fetcher_msg(fetcher_msg(), state()) -> {noreply, state()}.
handle_fetcher_msg({event, _Event}, #state{fetcher_source = {cache, _}} = State) ->
    {noreply, State};
handle_fetcher_msg({event, Event}, #state{} = State) ->
    report_event(Event, State),
    {noreply, State};
handle_fetcher_msg({finished, Status}, State) ->
    #state{fetcher_pid = FetcherPid, fetcher_source = Source} = State,
    locus_util:expect_linked_process_termination(FetcherPid),
    IdleState = State#state{fetcher_pid = undefined, fetcher_source = undefined},
    case Status of
        {success, Success} ->
            handle_database_fetch_success(Source, Success, IdleState);
        {error, Reason} ->
            handle_database_fetch_error(Source, Reason, IdleState);
        dismissed ->
            handle_database_fetch_dismissal(Source, IdleState)
    end.
%% Handles the conclusion of a cache write: clears the cacher from the
%% state, reports the outcome to the owner and concludes the load attempt.
%% A failed cache write does not count as a load error.
-spec handle_cacher_msg(cacher_msg(), state()) -> {noreply, state()}.
handle_cacher_msg({finished, Status}, State) ->
    #state{cacher_pid = CacherPid, cacher_path = CacherPath, cacher_source = Source} = State,
    locus_util:expect_linked_process_termination(CacherPid),
    IdleState = State#state{cacher_pid = undefined,
                            cacher_path = undefined,
                            cacher_source = undefined},
    Outcome =
        case Status of
            success -> ok;
            {error, Reason} -> {error, Reason}
        end,
    report_event({cache_attempt_finished, CacherPath, Outcome}, IdleState),
    handle_load_attempt_conclusion(Source, reset, IdleState).
%% Handles a fetch that was dismissed. This is only expected once a
%% modification date is known, which the guard asserts.
-spec handle_database_fetch_dismissal(source(), state()) -> {noreply, state()}.
handle_database_fetch_dismissal(Source, #state{last_modified = LastModified} = State)
  when LastModified =/= unknown -> % sanity check
    handle_load_attempt_conclusion(Source, reset, State).
%% Handles a successfully fetched blob: works out its format, unpacks the
%% database and hands over to the unpack success or error handler. Fetch
%% metadata is only saved when unpacking succeeded.
-spec handle_database_fetch_success(source(), fetcher_success(), state()) -> {noreply, state()}.
handle_database_fetch_success(Source, Success, State) ->
    {Format, FetchedBlob} = fetched_database_format_and_blob(Source, Success),
    UnpackResult = unpack_database_from_blob(Format, FetchedBlob),
    case UnpackResult of
        {error, Reason} ->
            handle_database_unpack_error(Source, Reason, State);
        {ok, Database, EncodedDatabase} ->
            ModifiedOn = fetched_database_modification_datetime(Source, Success),
            StateWithMetadata = maybe_save_fetch_metadata(Source, Success, State),
            handle_database_unpack_success(Source, Database, EncodedDatabase,
                                           ModifiedOn, StateWithMetadata)
    end.
%% Keeps the fetch metadata in the state up to date. Custom fetchers store
%% the metadata of the fetch. Cache loads only refresh the modification
%% date of already present metadata. Remote and filesystem sources assert
%% that no metadata is being kept.
-spec maybe_save_fetch_metadata(source(), fetcher_success(), state()) -> state().
maybe_save_fetch_metadata({_, {custom, _}}, Success, State) ->
    #{metadata := FetchMetadata} = Success,
    State#state{fetch_metadata_for_last_successful_load = FetchMetadata};
maybe_save_fetch_metadata({remote, _}, _Success, State) ->
    ?assertEqual(undefined, State#state.fetch_metadata_for_last_successful_load),
    State;
maybe_save_fetch_metadata({cache, _}, Success, State) ->
    case State#state.fetch_metadata_for_last_successful_load of
        undefined ->
            State;
        KnownMetadata ->
            % Not the prettiest hack around.
            #{modified_on := ModifiedOn} = Success,
            Refreshed = maps:update(modified_on, ModifiedOn, KnownMetadata),
            State#state{fetch_metadata_for_last_successful_load = Refreshed}
    end;
maybe_save_fetch_metadata({filesystem, _}, _Success, State) ->
    ?assertEqual(undefined, State#state.fetch_metadata_for_last_successful_load),
    State.
%% Records the newly loaded database in the state and notifies the owner.
%% For remote sources with caching enabled and a known modification date,
%% a cacher is started and the load attempt is concluded once it finishes;
%% otherwise the attempt is concluded right away.
-spec handle_database_unpack_success(source(), locus_mmdb:database(),
                                     binary(),
                                     calendar:datetime(), state())
        -> {noreply, state()}.
handle_database_unpack_success(Source, Database, EncodedDatabase, LastModified, State) ->
    Version = database_version(Database),
    LoadedState = State#state{last_modified = LastModified, last_loaded_version = Version},
    notify_owner({load_success, Source, Version, Database}, LoadedState),
    UseCache = (LoadedState#state.settings)#settings.use_cache,
    ShouldCache =
        case Source of
            {remote, _} -> UseCache =:= true andalso LastModified =/= unknown;
            _ -> false
        end,
    case ShouldCache of
        true ->
            CachePath = cached_database_path(LoadedState),
            CacheBlob = make_cached_database_blob(CachePath, EncodedDatabase),
            {ok, CacherPid} = locus_filesystem_store:start_link(CachePath,
                                                                CacheBlob,
                                                                LastModified),
            {noreply, LoadedState#state{cacher_pid = CacherPid,
                                        cacher_path = CachePath,
                                        cacher_source = Source}};
        false ->
            handle_load_attempt_conclusion(Source, reset, LoadedState)
    end.
%% Derives the database version from the build epoch found in its metadata.
-spec database_version(locus_mmdb:database()) -> calendar:datetime().
database_version(#{metadata := #{build_epoch := BuildEpoch}}) ->
    locus_database:version(BuildEpoch).
%% Reports an unpack failure to the owner and concludes the load attempt,
%% counting it as one more consecutive error.
-spec handle_database_unpack_error(source(), term(), state()) -> {noreply, state()}.
handle_database_unpack_error(Source, Reason, State) ->
    Failure = {load_failure, Source, Reason},
    notify_owner(Failure, State),
    handle_load_attempt_conclusion(Source, +1, State).
%% Reports a fetch failure to the owner and concludes the load attempt.
%% Failing to load from the cache resets the error count; any other source
%% counts as one more consecutive error.
-spec handle_database_fetch_error(source(), term(), state()) -> {noreply, state()}.
handle_database_fetch_error(Source, Reason, State) ->
    notify_owner({load_failure, Source, Reason}, State),
    BackoffUpdate =
        case Source of
            {cache, _} -> reset;
            _ -> +1
        end,
    handle_load_attempt_conclusion(Source, BackoffUpdate, State).
%% Wraps up a load attempt: updates the consecutive error count and
%% schedules the next update accordingly.
-spec handle_load_attempt_conclusion(source(), reset | +1, state()) -> {noreply, state()}.
handle_load_attempt_conclusion(Source, ErrorBackoffUpdate, State) ->
    CountedState = update_error_backoff_count(ErrorBackoffUpdate, State),
    Delay = time_to_next_update(Source, CountedState),
    {noreply, schedule_update(Delay, CountedState)}.
%% Either resets the consecutive error count to zero or adds the given
%% increment to it.
-spec update_error_backoff_count(reset | +1, state()) -> state().
update_error_backoff_count(reset, State) ->
    State#state{error_backoff_count = 0};
update_error_backoff_count(Increment, #state{error_backoff_count = Count} = State) ->
    State#state{error_backoff_count = Count + Increment}.
%% Decides how long to wait, in milliseconds, before the next update attempt.
%%
%% - After a cache load attempt the next update starts immediately (0).
%% - After one or more consecutive errors, the error retry behaviour decides,
%%   provided it applies after readiness or no database has been loaded yet.
%% - Otherwise, once a database has been loaded, the regular update period
%%   applies.
%%
%% Any other combination is unexpected and crashes with `if_clause'.
-spec time_to_next_update(source(), state()) -> 0 | milliseconds_interval().
time_to_next_update(LastFetchSource, State) ->
    #state{settings = Settings, error_backoff_count = ErrorBackoffCount} = State,
    {LastFetchSourceType, _} = LastFetchSource,
    % `last_loaded_version' is `undefined' until the first successful load;
    % that is the value readiness has to be tested against.
    HasAchievedReadiness = (State#state.last_loaded_version =/= undefined),
    if LastFetchSourceType =:= cache ->
           0;
       ErrorBackoffCount > 0,
       (Settings#settings.error_retry_behaviour_applies_after_readiness
        orelse not HasAchievedReadiness) ->
           error_backoff_interval(ErrorBackoffCount, Settings#settings.error_retry_behaviour);
       HasAchievedReadiness ->
           Settings#settings.update_period
    end.
%% Time to wait after `Count' consecutive errors: the fixed interval for a
%% constant backoff, or the computed one for an exponential backoff.
-spec error_backoff_interval(pos_integer(), error_retry_behaviour()) -> milliseconds_interval().
error_backoff_interval(_Count, {backoff, FixedInterval}) ->
    FixedInterval;
error_backoff_interval(Count, {exponential_backoff, BackoffParams}) ->
    exponential_error_backoff_interval(Count, BackoffParams).
%% Computes the exponential backoff interval, in milliseconds, after `Count'
%% consecutive errors.
%%
%% The growth base is converted to seconds, raised to
%% `(Count - 1) * growth_exponent' and converted back to milliseconds. One
%% second (the value of that power on the first error) is subtracted so that
%% growth starts at zero, and the result is added to `min_interval'. The
%% outcome is always kept within `[min_interval, max_interval]'.
exponential_error_backoff_interval(Count, Params) ->
    #{min_interval := Min, max_interval := Max,
      growth_base := GrowthBase, growth_exponent := GrowthExponent} = Params,
    MultipliedGrowthExponent = (Count - 1) * GrowthExponent,
    try 1000 * math:pow(GrowthBase / 1000, MultipliedGrowthExponent) of
        Scaled ->
            % Subtracting the first-error baseline (1000 ms) rather than
            % `Min' is what lets `min_interval' act as the starting point.
            Growth = Scaled - 1000,
            % The lower clamp covers growth bases below one second, for
            % which the growth term is negative.
            max(Min, min(Max, Min + trunc(Growth)))
    catch
        error:badarith ->
            % The power overflowed the float range after a very long run
            % of errors; the interval is far beyond the maximum by then.
            Max
    end.
%% Unpacks a database out of a blob in the given format, dispatching to the
%% unpacker for that format. On success the decoded database is returned
%% together with the raw (uncompressed) MMDB binary it was decoded from.
%% The error tag names the stage that failed.
-spec unpack_database_from_blob(blob_format(), binary())
        -> {ok, locus_mmdb:database(), binary()} |
           {error, {unpack_database_from, tgz_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, gzip_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, gzipped_mmdb_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, tarball_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, mmdb_blob, locus_mmdb:unpack_error()}}.
unpack_database_from_blob(tgz, Blob) ->
    unpack_database_from_tgz_blob(Blob);
unpack_database_from_blob(tarball, Blob) ->
    unpack_database_from_tarball_blob(Blob);
unpack_database_from_blob(gzip, Blob) ->
    unpack_database_from_gzip_blob(Blob);
unpack_database_from_blob(gzipped_mmdb, Blob) ->
    unpack_database_from_gzipped_mmdb_blob(Blob);
unpack_database_from_blob(mmdb, Blob) ->
    unpack_database_from_mmdb_blob(Blob);
unpack_database_from_blob(unknown, Blob) ->
    unpack_database_from_unknown_blob(Blob).
%% Unpacks a database from a gzipped tarball: gunzips the blob and hands
%% the result to the tarball unpacker. A failure to gunzip is returned as
%% a `tgz_blob' error carrying the exception class, reason and stacktrace,
%% with very large binaries purged from the latter two.
-spec unpack_database_from_tgz_blob(binary())
        -> {ok, locus_mmdb:database(), binary()} |
           {error, {unpack_database_from, tgz_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, tarball_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, mmdb_blob, locus_mmdb:unpack_error()}}.
unpack_database_from_tgz_blob(Blob) ->
    try zlib:gunzip(Blob) of
        Tarball ->
            unpack_database_from_tarball_blob(Tarball)
    catch
        Class:Reason:Stacktrace ->
            SaferReason = locus_util:purge_term_of_very_large_binaries(Reason),
            SaferStacktrace = locus_util:purge_term_of_very_large_binaries(Stacktrace),
            {error, {unpack_database_from, tgz_blob, {Class, SaferReason, SaferStacktrace}}}
    end.
%% Unpacks a database from a gzipped blob of unknown content: gunzips it and
%% hands the result to the unknown-blob unpacker. A failure to gunzip is
%% returned as a `gzip_blob' error.
-spec unpack_database_from_gzip_blob(binary())
        -> {ok, locus_mmdb:database(), binary()} |
           {error, {unpack_database_from, gzip_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, tarball_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, mmdb_blob, locus_mmdb:unpack_error()}}.
unpack_database_from_gzip_blob(Blob) ->
    try zlib:gunzip(Blob) of
        Inflated ->
            unpack_database_from_unknown_blob(Inflated)
    catch
        Class:Reason:Stacktrace ->
            Details = {Class,
                       locus_util:purge_term_of_very_large_binaries(Reason),
                       locus_util:purge_term_of_very_large_binaries(Stacktrace)},
            {error, {unpack_database_from, gzip_blob, Details}}
    end.
%% Unpacks a database from a gzipped MMDB file: gunzips the blob and hands
%% the result to the MMDB unpacker. A failure to gunzip is returned as a
%% `gzipped_mmdb_blob' error.
-spec unpack_database_from_gzipped_mmdb_blob(binary())
        -> {ok, locus_mmdb:database(), binary()} |
           {error, {unpack_database_from, gzipped_mmdb_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, mmdb_blob, locus_mmdb:unpack_error()}}.
unpack_database_from_gzipped_mmdb_blob(Blob) ->
    try zlib:gunzip(Blob) of
        Inflated ->
            unpack_database_from_mmdb_blob(Inflated)
    catch
        Class:Reason:Stacktrace ->
            Details = {Class,
                       locus_util:purge_term_of_very_large_binaries(Reason),
                       locus_util:purge_term_of_very_large_binaries(Stacktrace)},
            {error, {unpack_database_from, gzipped_mmdb_blob, Details}}
    end.
%% Unpacks a database from a blob of unknown format: tries it as a tarball
%% first and, should that not succeed, as a plain MMDB file.
-spec unpack_database_from_unknown_blob(binary())
        -> {ok, locus_mmdb:database(), binary()} |
           {error, {unpack_database_from, tarball_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, mmdb_blob, locus_mmdb:unpack_error()}}.
unpack_database_from_unknown_blob(Blob) ->
    TarballAttempt = unpack_database_from_tarball_blob(Blob),
    case TarballAttempt of
        {ok, _, _} ->
            TarballAttempt;
        _NotUnpackedAsTarball ->
            unpack_database_from_mmdb_blob(Blob)
    end.
%% Unpacks a database from a tarball: extracts the MMDB file it contains
%% and hands it to the MMDB unpacker. Any exception raised while extracting
%% is returned as a `tarball_blob' error.
-spec unpack_database_from_tarball_blob(binary())
        -> {ok, locus_mmdb:database(), binary()} |
           {error, {unpack_database_from, tarball_blob, {atom(), term(), [term()]}}} |
           {error, {unpack_database_from, mmdb_blob, locus_mmdb:unpack_error()}}.
unpack_database_from_tarball_blob(Tarball) ->
    try extract_mmdb_from_tarball_blob(Tarball) of
        MmdbBlob ->
            unpack_database_from_mmdb_blob(MmdbBlob)
    catch
        Class:Reason:Stacktrace ->
            Details = {Class,
                       locus_util:purge_term_of_very_large_binaries(Reason),
                       locus_util:purge_term_of_very_large_binaries(Stacktrace)},
            {error, {unpack_database_from, tarball_blob, Details}}
    end.
%% Decodes a raw MMDB binary. On success the decoded database is returned
%% alongside the very binary it was decoded from.
-spec unpack_database_from_mmdb_blob(binary())
        -> {ok, locus_mmdb:database(), binary()} |
           {error, {unpack_database_from, mmdb_blob, locus_mmdb:unpack_error()}}.
unpack_database_from_mmdb_blob(EncodedDatabase) ->
    case locus_mmdb:unpack_database(EncodedDatabase) of
        {error, Reason} ->
            {error, {unpack_database_from, mmdb_blob, Reason}};
        {ok, Database} ->
            {ok, Database, EncodedDatabase}
    end.
%% Extracts, in memory, a file with the "mmdb" extension from a tarball.
%% Crashes if the tarball cannot be read or holds no such file.
-spec extract_mmdb_from_tarball_blob(binary()) -> binary().
extract_mmdb_from_tarball_blob(Tarball) ->
    Archive = {binary, Tarball},
    {ok, ContainedPaths} = erl_tar:table(Archive),
    {true, MmdbPath} = locus_util:lists_anymap(fun has_mmdb_extension/1, ContainedPaths),
    ExtractOpts = [{files, [MmdbPath]}, memory],
    {ok, [{MmdbPath, MmdbBlob}]} = erl_tar:extract(Archive, ExtractOpts),
    MmdbBlob.
%% Tells whether the last extension of a filename is "mmdb", ignoring case.
-spec has_mmdb_extension(nonempty_string()) -> boolean().
has_mmdb_extension(Filename) ->
    MmdbOnly = ["mmdb"],
    has_extenstion(Filename, MmdbOnly).
% Tells whether a filename ends in the expected chain of extensions.
% `ExpectedExtensions' must be given last extension first: for a file named
% "archive.tar.gz" the list to pass is ["gz", "tar"].
-spec has_extenstion(nonempty_string(), [nonempty_string()]) -> boolean().
has_extenstion(Filename, ExpectedExtensions) ->
    lists:prefix(ExpectedExtensions, filename_extension_parts(Filename)).
%% Lists the lowercased extensions of a filename, last extension first:
%% "archive.TAR.gz" yields ["gz", "tar"].
filename_extension_parts(Filename) ->
    filename_extension_parts_recur(Filename, []).

%% Peels extensions off the end of the name one at a time, prepending each
%% to the accumulator, which is reversed once no extension is left.
filename_extension_parts_recur(Filename, Collected) ->
    case filename:extension(Filename) of
        "." ++ RawExtension ->
            LoweredExtension = string:to_lower(RawExtension),
            % Drop the extension and its leading dot from the end of the name.
            StemLength = length(Filename) - length(LoweredExtension) - 1,
            Stem = lists:sublist(Filename, StemLength),
            filename_extension_parts_recur(Stem, [LoweredExtension | Collected]);
        _NoFurtherExtension ->
            lists:reverse(Collected)
    end.
%% Works out the format of a fetched blob and returns it with the blob.
%%
%% Custom fetchers state the format themselves. For HTTP responses it is
%% derived from the "content-type" header, with the gzip magic bytes
%% breaking ties and serving as fallback. For cache and filesystem sources
%% it is derived from the file extensions, again falling back to the gzip
%% magic bytes.
-spec fetched_database_format_and_blob(source(), fetcher_success()) -> {blob_format(), binary()}.
fetched_database_format_and_blob({_, {custom, _}}, #{format := BlobFormat, content := Blob}) ->
    {BlobFormat, Blob};
fetched_database_format_and_blob({remote, From}, #{headers := Headers, body := Body}) ->
    ?assertNotMatch({custom, _}, From),
    ContentType =
        case lists:keyfind("content-type", 1, Headers) of
            {_, HeaderValue} -> HeaderValue;
            _ -> undefined
        end,
    LooksGzipped =
        case Body of
            <<?GZIP_MAGIC_BYTES, _/bytes>> -> true;
            _ -> false
        end,
    Format =
        case {ContentType, LooksGzipped} of
            {"application/gzip", _} -> gzip;
            {"application/x-gzip", _} -> gzip;
            {"application/x-gtar", true} -> tgz;
            {"application/x-tgz", _} -> tgz;
            {"application/x-tar", true} -> tgz;
            {"application/x-tar", false} -> tarball;
            {_, true} -> gzip;
            {_, false} -> unknown
        end,
    {Format, Body};
fetched_database_format_and_blob({SourceType, Path}, #{content := Content})
  when SourceType =:= cache;
       SourceType =:= filesystem ->
    Format =
        case filename_extension_parts(Path) of
            ["tgz" | _] -> tgz;
            ["gz", "tar" | _] -> tgz;
            ["tar" | _] -> tarball;
            ["mmdb" | _] -> mmdb;
            ["gz", "mmdb" | _] -> gzipped_mmdb;
            _UnrecognisedExtensions ->
                case Content of
                    <<?GZIP_MAGIC_BYTES, _/bytes>> -> gzip;
                    _ -> unknown
                end
        end,
    {Format, Content}.
%% Works out when a fetched database was last modified: from the fetch
%% metadata for custom fetchers, from the "last-modified" header for HTTP
%% responses, and from the load result for cache and filesystem sources.
%% Returns `unknown' when a custom fetcher or HTTP response gives no date.
-spec fetched_database_modification_datetime(source(), fetcher_success())
        -> calendar:datetime() | unknown.
fetched_database_modification_datetime({_, {custom, _}}, Success) ->
    case Success of
        #{metadata := #{modified_on := ModifiedOn}} ->
            ModifiedOn;
        #{} ->
            unknown
    end;
fetched_database_modification_datetime({remote, _}, #{headers := Headers}) ->
    case lists:keyfind("last-modified", 1, Headers) of
        false ->
            unknown;
        {"last-modified", HeaderValue} ->
            % The match asserts that the header value was converted into a
            % `{Date, Time}' pair.
            {_, _} = ModifiedOn = httpd_util:convert_request_date(HeaderValue),
            ModifiedOn
    end;
fetched_database_modification_datetime({SourceType, _}, #{modified_on := ModifiedOn})
  when SourceType =:= cache;
       SourceType =:= filesystem ->
    ModifiedOn.
%-spec make_cached_database_blob(nonempty_string(), binary()) -> binary().
%% Produces the bytes to be written to the cache file: the gzipped encoded
%% database. The only cache file name accepted is one ending in ".mmdb.gz";
%% anything else crashes.
make_cached_database_blob(CachePath, EncodedDatabase) ->
    ExtensionParts = filename_extension_parts(CachePath),
    case ExtensionParts of
        ["gz", "mmdb" | _] ->
            zlib:gzip(EncodedDatabase)
    end.
%% ------------------------------------------------------------------
%% Internal Function Definitions - Monitoring and Events
%% ------------------------------------------------------------------
%% Handles the exit of a linked process. The owner going away stops the
%% loader normally. The fetcher or cacher exiting stops it with a reason
%% naming which of them stopped, its pid and its exit reason. An exit from
%% any other process matches no clause and crashes the loader.
-spec handle_linked_process_death(pid(), term(), state())
        -> {stop, Reason, state()}
             when Reason :: normal | FetcherStopped | CacherStopped,
                  FetcherStopped :: {fetcher_stopped, pid(), term()},
                  CacherStopped :: {cacher_stopped, pid(), term()}.
handle_linked_process_death(Pid, _, State)
  when Pid =:= State#state.owner_pid ->
    {stop, normal, State};
handle_linked_process_death(Pid, Reason, State)
  when Pid =:= State#state.fetcher_pid ->
    {stop, {fetcher_stopped, Pid, Reason}, State};
handle_linked_process_death(Pid, Reason, State)
  when Pid =:= State#state.cacher_pid ->
    {stop, {cacher_stopped, Pid, Reason}, State}.
%% Forwards an event to the owner, wrapped as `{event, Event}'.
-spec report_event(event(), state()) -> ok.
report_event(Event, State) ->
    Wrapped = {event, Event},
    notify_owner(Wrapped, State).
%% Sends `{LoaderPid, Msg}' to the owner process, using the `noconnect'
%% send option and ignoring the result of the send.
-spec notify_owner(msg(), state()) -> ok.
notify_owner(Msg, #state{owner_pid = OwnerPid}) ->
    Tagged = {self(), Msg},
    _ = erlang:send(OwnerPid, Tagged, [noconnect]),
    ok.
%% ------------------------------------------------------------------
%% Internal Function Definitions - Unit Tests
%% ------------------------------------------------------------------
-ifdef(TEST).
%% For 100 randomly drawn intervals, checks that a `{backoff, Interval}'
%% retry behaviour yields exactly `Interval' for every consecutive error
%% count from 1 to 1000.
constant_error_backoff_test() ->
    AssertConstant =
        fun (Interval) ->
                RetryBehaviour = {backoff, Interval},
                [?assertEqual(Interval,
                              error_backoff_interval(ErrorCount, RetryBehaviour))
                 || ErrorCount <- lists:seq(1, 1000)]
        end,
    _ = [AssertConstant(rand:uniform(10000)) || _ <- lists:seq(1, 100)],
    ok.
%% Pins the interval expected for the 1st through 30th consecutive error
%% under three `exponential_backoff' configurations. Each expected series is
%% listed in order of consecutive error count and ends at 86400000, the
%% value the assertions expect once growth is capped.
exponential_error_backoff_test() ->
    % Asserts the Nth element of `ExpectedIntervals' against the interval
    % computed for N consecutive errors, for N = 1..length(ExpectedIntervals).
    AssertSeries =
        fun (RetryBehaviour, ExpectedIntervals) ->
                ErrorCounts = lists:seq(1, length(ExpectedIntervals)),
                lists:foreach(
                  fun ({ConsecutiveErrors, Expected}) ->
                          ?assertEqual(Expected,
                                       error_backoff_interval(ConsecutiveErrors,
                                                              RetryBehaviour))
                  end,
                  lists:zip(ErrorCounts, ExpectedIntervals))
        end,

    Trial1 = {exponential_backoff,
              #{min_interval => timer:seconds(1),
                max_interval => timer:hours(24),
                growth_base => timer:seconds(2),
                growth_exponent => 0.625}},
    AssertSeries(
      Trial1,
      [1000, 1542, 2378, 3668, 5656,
       8724, 13454, 20749, 32000, 49350,
       76109, 117376, 181019, 279169, 430538,
       663981, 1024000, 1579223, 2435496, 3756048,
       5792618, 8933439, 13777246, 21247419, 32768000,
       50535164, 77935877, 86400000, 86400000, 86400000]),

    Trial2 = {exponential_backoff,
              #{min_interval => timer:seconds(1),
                max_interval => timer:hours(24),
                growth_base => timer:seconds(3),
                growth_exponent => 0.5}},
    AssertSeries(
      Trial2,
      [1000, 1732, 3000, 5196, 9000,
       15588, 27000, 46765, 81000, 140296,
       243000, 420888, 729000, 1262665, 2187000,
       3787995, 6561000, 11363985, 19683000, 34091956,
       59049000, 86400000, 86400000, 86400000, 86400000,
       86400000, 86400000, 86400000, 86400000, 86400000]),

    Trial3 = {exponential_backoff,
              #{min_interval => timer:seconds(1),
                max_interval => timer:hours(24),
                growth_base => timer:seconds(1.5),
                growth_exponent => 2.0}},
    AssertSeries(
      Trial3,
      [1000, 2250, 5062, 11390, 25628,
       57665, 129746, 291929, 656840, 1477891,
       3325256, 7481827, 16834112, 37876752, 85222692,
       86400000, 86400000, 86400000, 86400000, 86400000,
       86400000, 86400000, 86400000, 86400000, 86400000,
       86400000, 86400000, 86400000, 86400000, 86400000]).
-endif.