defmodule Regulator do
@moduledoc """
Adaptive concurrency limits.
"""
alias Regulator.Regulators
alias Regulator.LimiterSup
alias Regulator.Limits
alias Regulator.Buffer
alias Regulator.Telemetry
alias Regulator.Monitor
# Context handed out by `ask/1` and consumed by `ok/1`, `error/1` and
# `ignore/1`. `ask/1` builds it with the `:start`, `:name` and `:inflight` keys.
@type token :: map()
# Shape a callback given to `ask/2` must return: a tag telling the regulator
# how to account for the call, paired with the value `ask/2` hands back to
# the caller.
@type result :: {:ok, term()}
| {:error, term()}
| {:ignore, term()}
@doc """
Creates a new regulator.

`name` identifies the regulator. The `{mod, opts}` tuple selects the limit
implementation: `mod.new(opts)` builds the initial limit state, which is
passed along with `name` to a `Regulator.LimiterSup` child started under the
`Regulator.Regulators` dynamic supervisor.

Returns the result of `DynamicSupervisor.start_child/2`.
"""
@spec install(name :: term(), {module(), Keyword.t()}) :: DynamicSupervisor.on_start_child()
def install(name, {mod, opts}) do
  limit_state = mod.new(opts)
  child_args = %{name: name, limit: {mod, limit_state}}

  DynamicSupervisor.start_child(Regulators, {LimiterSup, child_args})
end
@doc """
Removes a regulator.

Returns `:ok` when no process is registered under `name`; otherwise returns
the result of `DynamicSupervisor.terminate_child/2`.
"""
@spec uninstall(name :: term()) :: :ok | {:error, :not_found}
def uninstall(name) do
  # Terminating the limit supervisor also cleans up the ets tables we created,
  # since they are created by the supervisor process. A missing process is
  # treated as "it's already been killed", so there is nothing left to do.
  if pid = Process.whereis(name) do
    DynamicSupervisor.terminate_child(Regulators, pid)
  else
    :ok
  end
end
@doc """
Ask for access to a protected service. If we've reached the concurrency limit
then `ask` will return a `:dropped` atom without executing the callback. Otherwise
the callback will be applied. The callback must return a tuple with the result
as the first element and the desired return value as the second. The available
result atoms are:

  * `:ok` - The call succeeded.
  * `:error` - The call failed or timed out. This is used as a signal to backoff or otherwise adjust the limit.
  * `:ignore` - The call should not be counted in the concurrency limit. This is typically used to filter out status checks and other low latency RPCs.

Returns the second element of the callback's tuple, or `:dropped`.

If the callback raises, throws or exits, the failure propagates to the caller
after the inflight slot has been released. A callback that returns anything
other than one of the tuples above raises `CaseClauseError`; the slot is
released in that case as well.
"""
@spec ask(term(), (-> result())) :: term()
def ask(name, f) do
  with {:ok, ctx} <- ask(name) do
    # Validate the callback's return value *inside* the function given to
    # safe_execute/2. A malformed return then goes through the same cleanup as
    # any other callback failure instead of raising after the protected
    # region and leaking the inflight slot.
    checked = fn ->
      case f.() do
        {tag, _value} = tagged when tag in [:ok, :error, :ignore] ->
          tagged

        other ->
          # Same exception type the unmatched `case` used to raise.
          raise CaseClauseError, term: other
      end
    end

    case safe_execute(ctx, checked) do
      {:ok, result} ->
        ok(ctx)
        result

      {:error, result} ->
        error(ctx)
        result

      {:ignore, result} ->
        ignore(ctx)
        result
    end
  end
end
@doc """
Ask for access to a protected service. Instead of executing a callback this
function returns a `:dropped` atom or `{:ok, context}`. It is the caller's
responsibility to check the context map back in to the regulator using one of
`ok/1`, `error/1` or `ignore/1`. Care must be taken to avoid leaking these
contexts. Otherwise the regulator will not be able to adjust the inflight
count which will eventually deadlock the regulator.
"""
@spec ask(name :: term()) :: {:ok, token()} | :dropped
def ask(name) do
  :ok = Monitor.monitor_me(name)

  # Claim a slot first, then compare the new inflight count with the limit.
  inflight = Limits.add(name)
  limit = Limits.limit(name)
  started_at = Telemetry.start(:ask, %{regulator: name}, %{inflight: min(inflight, limit)})

  if inflight > limit do
    # Over the limit: give the slot back and close the telemetry span.
    Limits.sub(name)
    Monitor.demonitor_me(name)
    Telemetry.stop(:ask, started_at, %{regulator: name, result: :dropped})
    :dropped
  else
    {:ok, %{start: started_at, name: name, inflight: inflight}}
  end
end
@doc """
Checks in a context and marks it as "ok".

Emits the `:ask` stop event with `result: :ok`, records a
`{rtt, inflight, false}` sample in the buffer, and releases the inflight slot
held by the context.
"""
@spec ok(token()) :: :ok
def ok(ctx) do
  # Elapsed time since the context was handed out by ask/1.
  elapsed = System.monotonic_time() - ctx.start

  Telemetry.stop(:ask, ctx.start, %{regulator: ctx.name, result: :ok})

  sample = {elapsed, ctx.inflight, false}
  Buffer.add_sample(ctx.name, sample)

  Limits.sub(ctx.name)
  Monitor.demonitor_me(ctx.name)

  :ok
end
@doc """
Checks in a context and marks it as an error.

Emits the `:ask` stop event with `result: :error`, records a
`{rtt, inflight, true}` sample in the buffer, and releases the inflight slot
held by the context.
"""
@spec error(token()) :: :ok
def error(ctx) do
  # Elapsed time since the context was handed out by ask/1.
  elapsed = System.monotonic_time() - ctx.start

  Telemetry.stop(:ask, ctx.start, %{regulator: ctx.name, result: :error})

  sample = {elapsed, ctx.inflight, true}
  Buffer.add_sample(ctx.name, sample)

  Limits.sub(ctx.name)
  Monitor.demonitor_me(ctx.name)

  :ok
end
@doc """
Checks in a context and ignores the result.

Emits the `:ask` stop event with `result: :ignore` and releases the inflight
slot held by the context. No sample is added to the buffer.
"""
@spec ignore(token()) :: :ok
def ignore(ctx) do
  stop_metadata = %{regulator: ctx.name, result: :ignore}
  Telemetry.stop(:ask, ctx.start, stop_metadata)

  Limits.sub(ctx.name)
  Monitor.demonitor_me(ctx.name)

  :ok
end
# Runs the callback `f` on behalf of the context `ctx`. On success the
# callback's return value is passed through untouched. If the callback fails,
# the inflight slot is released, the monitor is removed, an `:ask` exception
# event is emitted, and the original failure is re-raised with its original
# stacktrace.
defp safe_execute(ctx, f) do
  f.()
rescue
  # Raised exceptions: reported to telemetry with kind :error.
  exception ->
    stacktrace = __STACKTRACE__
    Limits.sub(ctx.name)
    Monitor.demonitor_me(ctx.name)
    Telemetry.exception(:ask, ctx.start, :error, exception, stacktrace, %{regulator: ctx.name})
    reraise exception, stacktrace
catch
  # Anything not handled by `rescue` above (throws and exits).
  kind, payload ->
    stacktrace = __STACKTRACE__
    Limits.sub(ctx.name)
    Monitor.demonitor_me(ctx.name)
    Telemetry.exception(:ask, ctx.start, kind, payload, stacktrace, %{regulator: ctx.name})
    :erlang.raise(kind, payload, stacktrace)
end
end