%%------------------------------------------------------------------------------
%% @doc Throttle given rate over a number of seconds.
%%
%% The implementation uses a time-spacing reservation algorithm in which each
%% allocation of samples reserves a fraction of the throttling window. The
%% reservation is released as time goes by. No more than `Rate' samples are
%% allowed to fit in one `Window', which is given in milliseconds.
%%
%% This is an Erlang implementation of the throttling algorithm from the utxx
%% library found at this URL:
%% [https://github.com/saleyn/utxx/blob/master/include/utxx/rate_throttler.hpp]
%%
%% @author Serge Aleynikov <saleyn at gmail dot com>
%% @end
%%------------------------------------------------------------------------------
%% Copyright (c) 2011 Serge Aleynikov
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without restriction,
%% including without limitation the rights to use, copy, modify, merge,
%% publish, distribute, sublicense, and/or sell copies of the Software,
%% and to permit persons to whom the Software is furnished to do
%% so, subject to the following conditions:
%%
%% The above copyright notice and this permission notice shall be included
%% in all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
%% MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
%% IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
%% CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
%% TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
%% SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
%%------------------------------------------------------------------------------
-module(throttle).
-export([new/1, new/2, new/3, available/1, available/2, used/1, used/2]).
-export([next_timeout/1, call/2, call/3, call/4, call/5]).
-export([now/0, reset/1, reset/2, add/1, add/2, add/3, curr_rps/1, curr_rps/2]).
-compile({no_auto_import,[now/0]}).
%% State of one throttle. Durations and timestamps are kept in microseconds.
-record(throttle, {
rate %% Samples allowed per window; 0 is special-cased by add/3, available/2, used/2 and curr_rps/2
, window :: integer() %% Window length in microseconds (new/3 multiplies its millisecond argument by 1000)
, step :: integer() %% Microseconds reserved by one sample: window div rate (0 when rate is 0)
, next_ts :: integer() %% Microsecond timestamp from which the full rate is available again; advanced by add/3
}).
%% Throttle state as returned by new/1,2,3 and threaded through add/call.
-type throttle() :: #throttle{}.
%% Timestamp in microseconds, as produced by now/0.
-type time() :: non_neg_integer().
%% Options accepted by the call/3,4,5 family; every key is optional.
-type throttle_opts() :: #{
retries => integer(),
retry_delay => integer(),
blocking => boolean()
}.
%% `retries' - number of additional attempts made when the called function
%% raises an exception (default: `0').
%% `retry_delay' - delay in milliseconds between successive retries
%% (default: `1').
%% `blocking' - when `true' the call waits until the throttle frees up;
%% when `false' a throttled call gives up with `{error, throttled}'
%% (default: `true').
%% Outcome of a throttled call: the function's value, or the reason why no
%% value was produced (throttled, or the exception reason with its stack trace).
-type throttle_result() ::
{ok, any()} | {error, throttled | {Reason::any(), StackTrace::list()}}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% @doc Create a throttle that admits `Rate' samples per one-second window.
%% Equivalent to `new(Rate, 1000)'.
-spec new(non_neg_integer()) -> throttle().
new(Rate) ->
    OneSecondMs = 1000,
    new(Rate, OneSecondMs).
%% @doc Create a throttle that admits `Rate' samples per `Window'
%% milliseconds, with the window starting at the current time.
%% @see new/3
-spec new(non_neg_integer(), non_neg_integer()) -> throttle().
new(Rate, Window) ->
    StartedAt = now(),
    new(Rate, Window, StartedAt).
%% @doc Create a new throttle that admits `Rate' samples per `Window'
%% milliseconds. `Now' is expressed in microseconds since epoch, as returned
%% by `now()'.
%%
%% The window is stored in microseconds and the per-sample step is
%% `Window div Rate'; a `Rate' of `0' yields a step of `0'.
-spec new(non_neg_integer(), non_neg_integer(), time()) -> throttle().
new(Rate, Window, Now) when is_integer(Rate), is_integer(Window), is_integer(Now) ->
    WindowUs = Window * 1000,
    StepUs =
        case Rate of
            0 -> 0;
            _ -> WindowUs div Rate
        end,
    #throttle{rate = Rate, window = WindowUs, step = StepUs, next_ts = Now}.
%% @doc Reset the throttle as of the current time.
%% @see reset/2
-spec reset(throttle()) -> throttle().
reset(#throttle{} = Throttle) ->
    Now = now(),
    reset(Throttle, Now).
%% @doc Reset the throttle by moving its reservation timestamp to `Now'
%% (microseconds). Rate, window and step are left untouched.
-spec reset(throttle(), time()) -> throttle().
reset(#throttle{} = Throttle, Now) when is_integer(Now) ->
    Throttle#throttle{next_ts = Now}.
%% @doc Call the zero-arity fun `F' with default options, making sure it is
%% not invoked more frequently than the throttle `T' allows. The current time
%% is taken from `now()'.
%%
%% Example:
%% <code>
%% 1> T = throttle:new(10, 1000).
%% 2> lists:foldl(fun(_,{T1,A}) ->
%%      {T2,R} = throttle:call(T1, fun() -> erlang:system_time(millisecond) end),
%%      {T2, [R|A]}
%%    end, {T,[]}, lists:seq(1, 100)).
%% </code>
%% The `{T2,R}' match above covers the success case only; if `F()' raises,
%% the call returns `{error, {Reason, StackTrace}}' instead.
call(T, F) ->
    NoOpts = #{},
    call2(T, F, NoOpts, now()).
%% @doc Call the zero-arity fun `F' no more often than the throttle `T'
%% allows, starting from the current time. `Opts' is a map of options:
%% <ul>
%% <li>`retries' - how many extra attempts to make when `F()' raises an
%%     exception (default: `0');</li>
%% <li>`retry_delay' - pause in milliseconds before each retry
%%     (default: `1');</li>
%% <li>`blocking' - when `true' (default) wait until the throttle frees up;
%%     when `false' give up immediately with `{error, throttled}'.</li>
%% </ul>
%% On success the result is `{NewThrottle, {ok, Value}}'. An exception that
%% survives all retries is not re-raised; it is reported to the caller as
%% `{error, {Reason, StackTrace}}'.
%%
%% NOTE(review): both error results come back without the throttle, which the
%% spec below does not reflect - confirm which of the two is intended.
-spec call(#throttle{}, fun(() -> any()), throttle_opts()) ->
    {#throttle{}, throttle_result()}.
call(#throttle{} = T, F, Opts) when is_function(F, 0), is_map(Opts) ->
    Now = now(),
    call2(T, F, Opts, Now).
%% @doc Four-argument entry point with two forms, selected by argument types:
%% <ul>
%% <li>`call(T, M, F, A)' - apply `M:F(A...)' under the throttle with default
%%     options at the current time;</li>
%% <li>`call(T, Fun, Opts, Now)' - call the zero-arity `Fun' with the option
%%     map `Opts', using `Now' (microseconds) as the current time.</li>
%% </ul>
%%
%% Example:
%% <code>
%% 1> T = throttle:new(10, 1000).
%% 2> lists:foldl(fun(_,{T1,A}) ->
%%      {T2,R} = throttle:call(T1, erlang, system_time, [millisecond]),
%%      {T2, [R|A]}
%%    end, {T,[]}, lists:seq(1, 100)).
%% </code>
call(T, Mod, Fun, Args) when is_atom(Mod), is_atom(Fun), is_list(Args) ->
    Now = now(),
    call(T, Mod, Fun, Args, #{}, Now);
call(T, Thunk, Opts, Now) when is_function(Thunk, 0), is_map(Opts), is_integer(Now) ->
    call2(T, Thunk, Opts, Now).
%% @doc Apply `M:F(A...)' no more frequently than the throttle allows.
%% The fifth argument is either an explicit current time in microseconds
%% (default options are then used) or an option map (the current time is then
%% taken from `now()').
call(T, Mod, Fun, Args, Now) when is_integer(Now) ->
    DefaultOpts = #{},
    call(T, Mod, Fun, Args, DefaultOpts, Now);
call(T, Mod, Fun, Args, Opts) when is_map(Opts) ->
    Now = now(),
    call(T, Mod, Fun, Args, Opts, Now).
%% @doc Apply `M:F(A...)' no more frequently than the throttle allows, using
%% the option map `Opts' and `Now' (microseconds) as the current time.
%% The MFA is wrapped in a zero-arity fun and handed to `call2/4'.
%%
%% The spec lists `Opts' before `Now', matching the actual argument order of
%% the function head.
%% NOTE(review): on error the underlying call3/6 returns a bare `{error, _}'
%% without the throttle, which the return type below does not reflect -
%% confirm which of the two is intended.
-spec call(#throttle{}, atom(), atom(), [any()], throttle_opts(), non_neg_integer()) ->
    {#throttle{}, throttle_result()}.
call(#throttle{} = T, M, F, A, Opts, Now) when is_atom(M), is_atom(F), is_list(A) ->
    call2(T, fun() -> apply(M, F, A) end, Opts, Now).
%% Resolve the call options against their defaults (no retries, 1 ms retry
%% delay, blocking) and start the throttled call loop.
-spec call2(#throttle{}, fun(() -> any()), throttle_opts(), non_neg_integer()) ->
    {#throttle{}, throttle_result()}.
call2(#throttle{} = T, F, Opts, Now) when is_integer(Now), is_function(F, 0), is_map(Opts) ->
    Defaults = #{retries => 0, retry_delay => 1, blocking => true},
    %% Caller-supplied keys override the defaults; unknown keys are ignored.
    #{retries := Retries, retry_delay := DelayMS, blocking := Block} =
        maps:merge(Defaults, Opts),
    call3(T, F, Now, Block, Retries, DelayMS).
%% Throttled call loop.
%%
%% When the throttle has room, one sample is reserved and `F()' is run:
%%   * success            -> `{UpdatedThrottle, {ok, Value}}';
%%   * exception, retries -> sleep `DelayMS' ms and try again with one retry
%%                           fewer, starting again from the throttle `T' that
%%                           was passed in (the reservation is not kept);
%%   * exception, none    -> `{error, {Reason, StackTrace}}'.
%% When the throttle is full, a non-blocking call returns `{error, throttled}'
%% and a blocking call sleeps for the time reported by next_timeout/2 before
%% re-evaluating with a fresh timestamp.
%%
%% NOTE(review): both error results are returned without a throttle, unlike
%% the `{Throttle, Result}' shape of the success case - confirm intended.
call3(T, F, Now, Block, Retries, DelayMS) ->
    case next_timeout(T, Now) of
        0 ->
            {1, Reserved} = add(T, 1, Now),
            try F() of
                Value ->
                    {Reserved, {ok, Value}}
            catch
                _:_ when Retries > 0 ->
                    receive after DelayMS -> ok end,
                    call3(T, F, now(), Block, Retries - 1, DelayMS);
                _:Reason:Stack ->
                    {error, {Reason, Stack}}
            end;
        _ when not Block ->
            {error, throttled};
        WaitMS ->
            receive after WaitMS -> ok end,
            call3(T, F, now(), Block, Retries, DelayMS)
    end.
%% @doc Add a single sample to the throttle at the current time.
%% Returns whatever `add/3' returns for one sample.
add(T) ->
    OneSample = 1,
    add(T, OneSample).
%% @doc Add `Samples' to the throttle at the current time.
%% Returns whatever `add/3' returns for the same arguments.
add(T, Samples) ->
    Now = now(),
    add(T, Samples, Now).
%% @doc Add `Samples' to the throttle at time `Now' (microseconds).
%% Return `{FitSamples, State}', where `FitSamples' is the number of samples
%% that fit in the throttling window. 0 means that the throttler is fully
%% congested, and more time needs to elapse before it can accept more samples.
%%
%% A throttle created with rate `0' accepts every sample and is returned
%% unchanged. This clause used to return the bare `Samples' count, which
%% contradicted the spec and broke callers matching on `{N, Throttle}'.
-spec add(throttle(), integer(), time()) -> {integer(), throttle()}.
add(#throttle{rate = 0} = T, Samples, _Now) ->
    {Samples, T};
add(#throttle{next_ts = TS, step = Step, window = Win} = T, Samples, Now) ->
    %% Where the reservation would end if all samples were accepted.
    NextTS = TS + Samples * Step,
    %% Positive Diff = microseconds by which the reservation overshoots the
    %% end of the window that starts at Now.
    Diff = NextTS - (Now + Win),
    if
        Diff < -Win ->
            %% Even the extended reservation ends before Now: the throttle
            %% has been idle, so restart the reservation from Now.
            %% NOTE(review): a single Step is reserved here regardless of
            %% Samples - confirm this is intended for Samples > 1.
            {Samples, T#throttle{next_ts = Now + Step}};
        Diff < 0 ->
            %% Everything fits inside the current window.
            {Samples, T#throttle{next_ts = NextTS}};
        true ->
            %% Drop the samples that do not fit: one per whole Step of overshoot.
            N = max(0, Samples - (Diff div Step)),
            {N, T#throttle{next_ts = TS + N * Step}}
    end.
%% @doc Return the number of samples available at the current time.
%% @see available/2
available(T) ->
    Now = now(),
    available(T, Now).
%% @doc Return the number of samples that can still be added at time `Now'
%% (microseconds).
%% For a throttle with rate `0' the window length in microseconds is returned.
%% NOTE(review): that value presumably stands for "effectively unlimited" -
%% confirm against callers.
-spec available(throttle(), time()) -> integer().
available(#throttle{rate = 0, window = Win}, _Now) ->
    Win;
available(#throttle{} = T, Now) ->
    calc_available(T, Now).
%% @doc Return the number of samples in use at the current time.
%% @see used/2
used(T) ->
    Now = now(),
    used(T, Now).
%% @doc Return the number of samples in use at time `Now' (microseconds),
%% i.e. the configured rate minus the samples still available.
%% A throttle with rate `0' always reports `0'.
-spec used(throttle(), time()) -> integer().
used(#throttle{rate = 0}, _Now) ->
    0;
used(#throttle{rate = Rate} = T, Now) ->
    Free = calc_available(T, Now),
    Rate - Free.
%% @doc Return the number of milliseconds to wait, counted from the current
%% time, until the throttle can fit one more sample. `0' means a sample fits
%% right away.
-spec next_timeout(throttle()) -> non_neg_integer().
next_timeout(T) ->
    Now = now(),
    next_timeout(T, Now).
%% Milliseconds to wait at time `Now' (microseconds) before one more sample
%% fits. The overshoot of a one-sample reservation past the end of the window
%% starting at `Now' is rounded up to whole milliseconds.
next_timeout(#throttle{next_ts = TS, step = Step, window = Win}, Now) ->
    Overshoot = (TS + Step) - (Now + Win),
    case Overshoot > 0 of
        true -> ceil(Overshoot / 1000);
        false -> 0
    end.
%% @doc Return the rate per second in use at the current time.
%% @see curr_rps/2
curr_rps(T) ->
    Now = now(),
    curr_rps(T, Now).
%% @doc Return the rate per second in use at time `Now' (microseconds):
%% the number of used samples scaled from the window length to one second.
%% The result is a float, except for a throttle with rate `0', which
%% reports the integer `0'.
-spec curr_rps(throttle(), time()) -> number().
curr_rps(#throttle{rate = 0}, _Now) ->
    0;
curr_rps(#throttle{rate = Rate, window = WinUs} = T, Now) ->
    InUse = Rate - calc_available(T, Now),
    InUse * 1000000 / WinUs.
%% @doc Current system time in microseconds; this is the time base used for
%% every `Now' argument in this module.
%% NOTE(review): this is wall-clock time, so throttling intervals may be
%% affected if the OS clock is adjusted - confirm this is acceptable.
-spec now() -> integer().
now() ->
    Unit = microsecond,
    erlang:system_time(Unit).
%%------------------------------------------------------------------------------
%% Internal functions
%%------------------------------------------------------------------------------
%% Number of samples that can still be added at time `Now' (microseconds).
%% Once `Now' has reached `next_ts' the full rate is available; before that,
%% the free part of the window is converted to whole steps and clamped to
%% the range 0..Rate.
%% The exported callers handle rate = 0 themselves before calling this.
calc_available(#throttle{rate = Rate, window = Win, step = Step, next_ts = NextTS}, Now) ->
    case Now - NextTS of
        Elapsed when Elapsed >= 0 ->
            Rate;
        Remaining ->
            min(Rate, max(0, (Win + Remaining) div Step))
    end.
%%------------------------------------------------------------------------------
%% Unit testing
%%------------------------------------------------------------------------------
-ifdef(EUNIT).
%% Test helper: convert a UTC `{Date, Time}' tuple plus a microsecond offset
%% into microseconds since the POSIX epoch.
time(TS, US) ->
    Seconds = erlang:universaltime_to_posixtime(TS),
    Seconds * 1000000 + US.
%% Checks the reservation arithmetic with fixed timestamps, then runs
%% call/4 and call/2 against the real clock.
%% The real-clock part sleeps: each of the two runs is asserted to take at
%% least two seconds.
all_test() ->
    %% --- Deterministic part: explicit timestamps, no sleeping ---
    Now = time({{2015, 6, 1}, {11,59,58}}, 900000),
    Thr = throttle:new(10, 1000, Now),  %% Throttle 10 samples / sec
    ?assertEqual(100000, Thr#throttle.step),
    ?assertEqual(10, throttle:available(Thr, Now)),
    {N, T1} = throttle:add(Thr, 1, Now),
    ?assertEqual(1, N),
    ?assertEqual(9, throttle:available(T1, Now)),
    ?assertEqual(1, throttle:used(T1, Now)),
    %% One microsecond before the 100 ms reservation expires.
    Now1 = time({{2015, 6, 1}, {11,59,58}}, 999999),
    ?assertEqual(9, throttle:available(T1, Now1)),
    ?assertEqual(1, throttle:used(T1, Now1)),
    %% Exactly when the reservation expires it is released.
    Now2 = time({{2015, 6, 1}, {11,59,59}}, 0),
    ?assertEqual(10,throttle:available(T1, Now2)),
    ?assertEqual(0, throttle:used(T1, Now2)),
    Now3 = time({{2015, 6, 1}, {12,0,0}}, 0),
    % 1 second elapsed, the throttler's interval is reset, and 10 samples are available
    ?assertEqual(10,throttle:available(T1, Now3)),
    %% Checked at Now3 (was Now2, a duplicate of the assertion above).
    ?assertEqual(0, throttle:used(T1, Now3)),
    {N1, T2} = throttle:add(T1, 1, Now3),
    ?assertEqual(1, N1),
    ?assertEqual(9, throttle:available(T2, Now3)),
    ?assertEqual(1, throttle:used(T2, Now3)),

    %% --- Real-clock part: 15 calls at 5 calls/sec via call/4 (MFA form) ---
    TT  = throttle:new(5, 1000),
    Now4 = now(),
    {_,RR} = lists:foldl(fun(_,{TT1,A}) ->
                 {TT2, R} = throttle:call(TT1, erlang, system_time, [millisecond]),
                 {TT2, [R|A]}
             end, {TT, []}, lists:seq(1, 15)),
    Now5 = now(),
    ?assertEqual(15, length(RR)),
    ?assert(Now5 - Now4 >= 2000000),

    %% --- Same scenario via call/2 (fun form) ---
    TT2 = throttle:new(5, 1000),
    {_,RR1} = lists:foldl(fun(_,{TT3,A}) ->
                  {TT4, R} = throttle:call(TT3, fun() -> erlang:system_time(millisecond) end),
                  {TT4, [R|A]}
              end, {TT2, []}, lists:seq(1, 15)),
    Now6 = now(),
    ?assertEqual(15, length(RR1)),
    ?assert(Now6 - Now5 >= 2000000),
    ok.
%% A function that raises on its first two invocations and succeeds on the
%% third must yield `{ok, success}' when three retries are allowed.
%% The attempt counter lives in the process dictionary under `count'.
retry_ok_test() ->
    Started = now(),
    Throttle = throttle:new(10, 1000),
    RetryOpts = #{retries => 3, retry_delay => 100},
    FailsTwice =
        fun() ->
            case get(count) of
                2 ->
                    success;
                undefined ->
                    put(count, 1),
                    erlang:error(exception);
                Seen ->
                    put(count, Seen + 1),
                    erlang:error(exception)
            end
        end,
    {_, R} = throttle:call(Throttle, FailsTwice, RetryOpts, Started),
    erase(count),
    ?assertEqual({ok, success}, R).
%% A function that always raises must end in `{error, {exception, _}}' after
%% the retries are used up, having slept 100 ms before each of the 3 retries.
retry_fail_test() ->
    Started = now(),
    Throttle = throttle:new(10, 1000),
    RetryOpts = #{retries => 3, retry_delay => 100},
    AlwaysFails = fun() -> erlang:error(exception) end,
    ?assertMatch({error, {exception, _}},
                 throttle:call(Throttle, AlwaysFails, RetryOpts, Started)),
    Elapsed = now() - Started,
    %% Three 100 ms pauses: expect roughly 300 ms (values are microseconds).
    ?assert(Elapsed >= 300000),
    ?assert(Elapsed < 400000).
%% Same as retry_fail_test/0 but with a zero retry delay: the retries must be
%% used up almost instantly.
%% NOTE(review): the bounds are in microseconds, so the whole call must take
%% more than 0 and less than 100 us; this looks tight enough to be flaky on a
%% loaded machine or a coarse clock - confirm the intended tolerance.
retry_fail_no_block_test() ->
    Started = now(),
    Throttle = throttle:new(10, 1000),
    ZeroDelayOpts = #{retries => 3, retry_delay => 0},
    AlwaysFails = fun() -> erlang:error(exception) end,
    ?assertMatch({error, {exception, _}},
                 throttle:call(Throttle, AlwaysFails, ZeroDelayOpts, Started)),
    Elapsed = now() - Started,
    %% Expected delay should be around 0ms
    ?assert(Elapsed > 0),
    ?assert(Elapsed < 100).
-endif.