%% @author Eric des Courtis <eric.descourtis@fireworkhq.com>
%% @doc eharbor is an Erlang library that allows for running a
%% function in a controlled manner, with the ability to deduplicate
%% function calls with the same arguments. It features configurable
%% deduplication, ordered function calls, and buffering to prevent
%% system overload. This library is useful for running expensive tasks,
%% where deduplication of the same inputs can lead to significant
%% performance gains.
%%
%% The config type is a map with the following keys:
%%
%% `name' (default: `default') The name of the harbor instance. This name is used to derive
%% a coordinator name and a pobox name.
%%
%% `backlog' (default: `1000') How big should our pobox buffer be? This is
%% effectively going to determine how much memory the pobox will
%% be allowed to consume. The underlying pobox acts as our load
%% shedding for overload scenarios. Don't set this parameter to
%% an arbitrary value instead try to calculate how much memory will
%% use when the pobox will be full to make sure that the system
%% will survive.
%%
%% `piers' (default: `100') How many unique parameters to process at once. This
%% effectively limits concurrency on the backend. It can be used as a
%% bulkhead by providing a limit that is smaller than connection pool
%% size for a database like Postgres for example.
%%
%% `breakwater_limit' (default: `2000') Maximum number of processes waiting
%% on a result being processed by the backend. This does
%% not include the `backlog' processes. Keep in mind this
%% value should not be to large otherwise memory spikes
%% could occur during fanout of the results from conductor
%% to follower.
%%
%% `dedup' (default: `true') Is this instance being used as a simple bulkhead
%% or are we tring to accelerate the requests by deduplicating them.
%%
%% `ordered' (default: `true') For a sequential process P where no other sequential
%% process writes. If the process reads and then writes when this parameter
%% is set to false the write may appear to have occured before the read.
%% This results in lower latency and lower overhead at the cost of ordering.
%% The parameter is set to true by default since it that behavior is
%% more intuitive. But if maximum performance is desired set it to false.
%%
%% `group_by_key_fun' (default: `fun(Key) -> Key end') Sometimes some parameters need
%% to be passed into harbor but shouldn't be considered for
%% pier assignment or deduplication. This allows you to ignore
%% some parameters while still passing them into your function.
%%
%% `error_type' (default: `value') How to handle scenarios where the pobox is full
%% what type of error to return (should be one of these `throw | error | raise | exit | value').
%%
%% `error_value' (default: `{error, full}') The value of the throw error, exit etc.
%% In this case we return a value because the default is `value'.
%%
%% `buffer_insert_timeout' (default: `5000') How long to wait for insertion into the
%% pobox buffer normally changing this should not be required.
%%
%% `follower_wait_for_conductor_timeout' (default: `infinity') How long the follower should
%% wait after the conductor. Normally you shouldn't
%% have to modify this value since crashes are detected
%% and will not cause followers to wait until timeout.
%%
%% `conductor_wait_for_coordinator_followers_timeout' (default: `infinity') Don't touch unless
%% you know exactly what you are doing.
%% `assign_role_timeout' (default: `infinity') Don't touch unless you know exactly what you
%% are doing.
%%
-module(eharbor).
-export([run/2, run/3, config_defaults/0, merge_with_defaults/1, pobox_name/1,
coordinator_name/1]).
-export_type([incomplete_config/0, config/0]).
-type config() ::
#{name := atom(),
backlog := non_neg_integer(),
piers := non_neg_integer(),
breakwater_limit := non_neg_integer(),
dedup := boolean(),
ordered := boolean(),
group_by_key_fun := fun((any()) -> any()),
error_type := throw | error | raise | exit | value,
error_value := any(),
buffer_insert_timeout := timeout(),
follower_wait_for_conductor_timeout := timeout(),
conductor_wait_for_coordinator_followers_timeout := timeout(),
assign_role_timeout := timeout()}.
-type incomplete_config() ::
#{name => atom(),
backlog => non_neg_integer(),
piers => non_neg_integer(),
breakwater_limit => non_neg_integer(),
dedup => boolean(),
ordered => boolean(),
group_by_key_fun => fun((any()) -> any()),
error_type => throw | error | raise | exit | value,
error_value => any(),
buffer_insert_timeout => timeout(),
follower_wait_for_conductor_timeout => timeout(),
conductor_wait_for_coordinator_followers_timeout => timeout(),
assign_role_timeout => timeout()}.
-define(CONFIG_DEFAULTS,
#{name => default,
backlog => 1000,
piers => 100,
breakwater_limit => 2000,
dedup => true,
ordered => true,
group_by_key_fun => fun(Key) -> Key end,
error_type => value,
error_value => {error, full},
buffer_insert_timeout => 5000,
follower_wait_for_conductor_timeout => infinity,
conductor_wait_for_coordinator_followers_timeout => infinity,
assign_role_timeout => infinity}).
%% @doc start_link the harbor instance using a configuration
%%
%% @end
-spec start_link(Config :: config()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link(Config) ->
eharbor_coordinator:start_link(Config).
%% @doc Return the configuration defaults for harbor.
%%
%% `config_defaults/2' returns the default configuration parameters.
%% This is later used to convert an `incomplete_config()' into a
%% `config()'.
%%
%% @see merge_with_defaults/1
%% @end
-spec config_defaults() -> config().
config_defaults() ->
?CONFIG_DEFAULTS.
%% @doc Merge an incomplete configuration with the defaults to produce a proper config.
%%
%% `merge_with_defaults/2' returns overridden parameters merged with the default
%% configuration parameters when the paramter is missing.
%%
%% @see config_defaults/0
%% @end
-spec merge_with_defaults(incomplete_config()) -> config().
merge_with_defaults(Map) when is_map(Map) ->
maps:merge_with(fun(_Key, Value1, Value2) ->
case Value2 of
undefined ->
Value1;
_ ->
Value2
end
end,
eharbor:config_defaults(),
Map).
%% @doc Run a function using a function reference and arguments with the
%% config using a harbor instance.
%%
%% @end
-spec run(fun((any()) -> any()), [any()], config()) -> any() | no_return().
run(Function, Arguments, Config = #{ordered := Ordered}) ->
do_run(Function, Arguments, Config, Ordered).
do_run(Function,
Arguments,
Config =
#{name := Name,
buffer_insert_timeout := BufferInsertTimeout,
follower_wait_for_conductor_timeout := FollowerWaitForConductorTimeout,
conductor_wait_for_coordinator_followers_timeout :=
ConductorWaitForCoordinatorFollowersTimeout,
assign_role_timeout := AssignRoleTimeout,
group_by_key_fun := GroupByKeyFun,
error_type := ErrorType,
error_value := ErrorValue},
Ordered) ->
PoboxName = pobox_name(Name),
CoordinatorName = coordinator_name(Name),
%% Start monitoring the coordinator before we start so we don't
%% wait forever if it dies
MRef = erlang:monitor(process, erlang:whereis(CoordinatorName)),
%% Ask pobox to store our arguments (transformed by group_by_key_fun)
%% This is how we dock into the harbor. If the buffer is full it will
%% return :full
case pobox:post_sync(PoboxName,
{'$habor_dock', {self(), MRef}, apply(GroupByKeyFun, Arguments)},
BufferInsertTimeout)
of
%% We successfully docked into the harbor
ok ->
%% Wait until we are assigned a role
receive
{MRef, {follower, ConductorPid}} ->
%% We got assigned a follower role lets monitor the conductor for this pier
%% since he is doing the work
ConductorMRef = erlang:monitor(process, ConductorPid),
receive
{MRef, Result0} ->
%% We have our result computed by the conductor
%% We can stop monitoring and flush monitoring messages
erlang:demonitor(MRef, [flush]),
erlang:demonitor(ConductorMRef, [flush]),
%% Ensure ordering (only the follower needs to care
%% because it may be joining an in progress operation
case Ordered of
true ->
%% Run again we have to wait for the current operation
%% the next one with be synchronized
do_run(Function, Arguments, Config, false);
false ->
%% Error was encapsulated by the conductor lets unpack it so we behave
%% as if we executed it
case Result0 of
%% Return the result
{success, Result1} ->
Result1;
{value, Val} ->
Val;
{Class, Reason, Stacktrace} ->
erlang:raise(Class, Reason, Stacktrace)
end
end;
{'DOWN', MRef, _, _, Reason} ->
%% Coordinator died clean up and exit with the same reason
erlang:demonitor(MRef, [flush]),
erlang:demonitor(ConductorMRef, [flush]),
erlang:exit(Reason);
{'DOWN', ConductorMRef, _, _, Reason} ->
%% Conductor died clean up and exit
erlang:demonitor(MRef, [flush]),
erlang:demonitor(ConductorMRef, [flush]),
erlang:exit({conductor_died, Reason})
after FollowerWaitForConductorTimeout ->
erlang:demonitor(MRef, [flush]),
erlang:exit(timeout)
end;
{MRef, conductor} ->
%% We are assigned the conductor role by the coordinator
Result0 =
try
%% Run the real function specified by the user and encapsulate it
%% in a success tuple
{success, apply(Function, Arguments)}
catch
Class0:Reason0:Stacktrace0 ->
{Class0, Reason0, Stacktrace0}
end,
%% Let the coordinator know that we are done the work so it can stop
%% accumulating followers for this pier and send use the list.
%% We are sending it a reply to reference to call us back with the details.
erlang:send(CoordinatorName, {'$harbor_work_completed', {erlang:self(), MRef}}),
receive
{MRef, {followers, Followers}} ->
%% We have the list of followers from the coordinator now we can fan out
%% the result and stop monitoring the coordinator since we are now undocked
%% from the harbor
erlang:demonitor(MRef, [flush]),
lists:foreach(fun(Follower) -> gen:reply(Follower, Result0) end,
Followers);
{'DOWN', MRef, _, _, Reason1} ->
%% Coordinator died clean up and exit
erlang:demonitor(MRef, [flush]),
erlang:exit(Reason1)
after ConductorWaitForCoordinatorFollowersTimeout ->
erlang:demonitor(MRef, [flush]),
erlang:exit(timeout)
end,
%% Error was encapsulated by the conductor lets unpack it so we behave
%% as if we executed it
case Result0 of
%% Return the result
{success, Result1} ->
Result1;
{value, Val} ->
Val;
{Class2, Reason2, Stacktrace2} ->
erlang:raise(Class2, Reason2, Stacktrace2)
end;
{'DOWN', MRef, _, _, Reason} ->
%% Coordinator died clean up and exit
erlang:demonitor(MRef, [flush]),
erlang:exit(Reason)
after AssignRoleTimeout ->
erlang:demonitor(MRef, [flush]),
erlang:exit(timeout)
end;
full ->
%% We can't dock into the harbor the backlog is full, there is no
%% space available in the buffer. Emit the user chosen error result
erlang:demonitor(MRef, [flush]),
case ErrorType of
throw ->
erlang:throw(ErrorValue);
error ->
erlang:error(ErrorValue);
exit ->
erlang:exit(ErrorValue);
value ->
ErrorValue
end
end.
%% @doc Run a function using an MFA with the config using a harbor instance.
%%
%% @end
-spec run(MFA :: mfa(), Config :: config()) -> any() | no_return().
run({Module, Function, Arguments}, Config) ->
F = fun(Args) -> erlang:apply(Module, Function, Args) end,
run(F, Arguments, Config).
%% @doc From the harbor name return the derived pobox name.
%%
%% The pobox is a named process and to talk to it directly
%% some operations require its name.
%%
%% @end
-spec pobox_name(Name :: atom()) -> atom().
pobox_name(Name) ->
erlang:binary_to_atom(<<(erlang:atom_to_binary(Name))/binary, "_pobox">>).
%% @doc From the harbor name return the derived coordinator name.
%%
%% The coordinator is a named process and to talk to it directly
%% some operations require its name.
%%
%% @end
-spec coordinator_name(Name :: atom()) -> atom().
coordinator_name(Name) ->
erlang:binary_to_atom(<<(erlang:atom_to_binary(Name))/binary, "_coordinator">>).