defmodule Nebulex.Adapters.Local.Generation do
@moduledoc """
Generational garbage collection process.
The generational garbage collector manage the heap as several sub-heaps,
known as generations, based on age of the objects. An object is allocated
in the youngest generation, sometimes called the nursery, and is promoted
to an older generation if its lifetime exceeds the threshold of its current
generation (defined by option `:gc_interval`). Every time the GC runs
(triggered by `:gc_interval` timeout), a new cache generation is created
and the oldest one is deleted.
The only way to create new generations is through this module (this server
is the metadata owner) calling `new/2` function. When a Cache is created,
a generational garbage collector is attached to it automatically,
therefore, this server MUST NOT be started directly.
## Options
These options are configured through the `Nebulex.Adapters.Local` adapter:
* `:gc_interval` - If it is set, an integer > 0 is expected defining the
interval time in milliseconds to garbage collection to run, delete the
oldest generation and create a new one. If this option is not set,
garbage collection is never executed, so new generations must be
created explicitly, e.g.: `MyCache.new_generation(opts)`.
* `:max_size` - If it is set, an integer > 0 is expected defining the
max number of cached entries (cache limit). If it is not set (`nil`),
the check to release memory is not performed (the default).
* `:allocated_memory` - If it is set, an integer > 0 is expected defining
the max size in bytes allocated for a cache generation. When this option
is set and the configured value is reached, a new cache generation is
created so the oldest is deleted and force releasing memory space.
If it is not set (`nil`), the cleanup check to release memory is
not performed (the default).
* `:gc_cleanup_min_timeout` - An integer > 0 defining the min timeout in
milliseconds for triggering the next cleanup and memory check. This will
be the timeout to use when either the max size or max allocated memory
is reached. Defaults to `10_000` (10 seconds).
* `:gc_cleanup_max_timeout` - An integer > 0 defining the max timeout in
milliseconds for triggering the next cleanup and memory check. This is
the timeout used when the cache starts and there are few entries or the
consumed memory is near to `0`. Defaults to `600_000` (10 minutes).
"""
# State
defstruct [
:cache,
:name,
:telemetry,
:telemetry_prefix,
:meta_tab,
:backend,
:backend_opts,
:stats_counter,
:gc_interval,
:gc_heartbeat_ref,
:max_size,
:allocated_memory,
:gc_cleanup_min_timeout,
:gc_cleanup_max_timeout,
:gc_cleanup_ref
]
use GenServer
import Nebulex.Helpers
alias Nebulex.Adapter
alias Nebulex.Adapter.Stats
alias Nebulex.Adapters.Local
alias Nebulex.Adapters.Local.{Backend, Metadata}
alias Nebulex.Telemetry
alias Nebulex.Telemetry.StatsHandler
@type t :: %__MODULE__{}
@type server_ref :: pid | atom | :ets.tid()
@type opts :: Nebulex.Cache.opts()
## API
@doc """
Starts the garbage collector for the built-in local cache adapter.
"""
@spec start_link(opts) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts)
end
@doc """
Creates a new cache generation. Once the max number of generations
is reached, when a new generation is created, the oldest one is
deleted.
## Options
* `:reset_timer` - Indicates if the poll frequency time-out should
be reset or not (default: true).
## Example
Nebulex.Adapters.Local.Generation.new(MyCache)
Nebulex.Adapters.Local.Generation.new(MyCache, reset_timer: false)
"""
@spec new(server_ref, opts) :: [atom]
def new(server_ref, opts \\ []) do
reset_timer? = get_option(opts, :reset_timer, "boolean", &is_boolean/1, true)
do_call(server_ref, {:new_generation, reset_timer?})
end
@doc """
Removes or flushes all entries from the cache (including all its generations).
## Example
Nebulex.Adapters.Local.Generation.delete_all(MyCache)
"""
@spec delete_all(server_ref) :: integer
def delete_all(server_ref) do
do_call(server_ref, :delete_all)
end
@doc """
Reallocates the block of memory that was previously allocated for the given
`server_ref` with the new `size`. In other words, reallocates the max memory
size for a cache generation.
## Example
Nebulex.Adapters.Local.Generation.realloc(MyCache, 1_000_000)
"""
@spec realloc(server_ref, pos_integer) :: :ok
def realloc(server_ref, size) do
do_call(server_ref, {:realloc, size})
end
@doc """
Returns the memory info in a tuple form `{used_mem, total_mem}`.
## Example
Nebulex.Adapters.Local.Generation.memory_info(MyCache)
"""
@spec memory_info(server_ref) :: {used_mem :: non_neg_integer, total_mem :: non_neg_integer}
def memory_info(server_ref) do
do_call(server_ref, :memory_info)
end
@doc """
Resets the timer for pushing new cache generations.
## Example
Nebulex.Adapters.Local.Generation.reset_timer(MyCache)
"""
def reset_timer(server_ref) do
server_ref
|> server()
|> GenServer.cast(:reset_timer)
end
@doc """
Returns the list of the generations in the form `[newer, older]`.
## Example
Nebulex.Adapters.Local.Generation.list(MyCache)
"""
@spec list(server_ref) :: [:ets.tid()]
def list(server_ref) do
server_ref
|> get_meta_tab()
|> Metadata.get(:generations, [])
end
@doc """
Returns the newer generation.
## Example
Nebulex.Adapters.Local.Generation.newer(MyCache)
"""
@spec newer(server_ref) :: :ets.tid()
def newer(server_ref) do
server_ref
|> get_meta_tab()
|> Metadata.get(:generations, [])
|> hd()
end
@doc """
Returns the PID of the GC server for the given `server_ref`.
## Example
Nebulex.Adapters.Local.Generation.server(MyCache)
"""
@spec server(server_ref) :: pid
def server(server_ref) do
server_ref
|> get_meta_tab()
|> Metadata.fetch!(:gc_pid)
end
@doc """
A convenience function for retrieving the state.
"""
@spec get_state(server_ref) :: t
def get_state(server_ref) do
server_ref
|> server()
|> GenServer.call(:get_state)
end
defp do_call(tab, message) do
tab
|> server()
|> GenServer.call(message)
end
defp get_meta_tab(server_ref) when is_atom(server_ref) or is_pid(server_ref) do
Adapter.with_meta(server_ref, fn _, %{meta_tab: meta_tab} ->
meta_tab
end)
end
defp get_meta_tab(server_ref), do: server_ref
## GenServer Callbacks
@impl true
def init(opts) do
# Trap exit signals to run cleanup process
_ = Process.flag(:trap_exit, true)
# Initial state
state = struct(__MODULE__, parse_opts(opts))
# Init cleanup timer
cleanup_ref =
if state.max_size || state.allocated_memory,
do: start_timer(state.gc_cleanup_max_timeout, nil, :cleanup)
# Timer ref
{:ok, ref} =
if state.gc_interval,
do: {new_gen(state), start_timer(state.gc_interval)},
else: {new_gen(state), nil}
# Update state
state = %{state | gc_cleanup_ref: cleanup_ref, gc_heartbeat_ref: ref}
{:ok, state, {:continue, :attach_stats_handler}}
end
defp parse_opts(opts) do
# Get adapter metadata
adapter_meta = Keyword.fetch!(opts, :adapter_meta)
# Add the GC PID to the meta table
meta_tab = Map.fetch!(adapter_meta, :meta_tab)
:ok = Metadata.put(meta_tab, :gc_pid, self())
# Common validators
pos_integer = &(is_integer(&1) and &1 > 0)
pos_integer_or_nil = &((is_integer(&1) and &1 > 0) or is_nil(&1))
Map.merge(adapter_meta, %{
backend_opts: Keyword.get(opts, :backend_opts, []),
gc_interval: get_option(opts, :gc_interval, "an integer > 0", pos_integer_or_nil),
max_size: get_option(opts, :max_size, "an integer > 0", pos_integer_or_nil),
allocated_memory: get_option(opts, :allocated_memory, "an integer > 0", pos_integer_or_nil),
gc_cleanup_min_timeout:
get_option(opts, :gc_cleanup_min_timeout, "an integer > 0", pos_integer, 10_000),
gc_cleanup_max_timeout:
get_option(opts, :gc_cleanup_max_timeout, "an integer > 0", pos_integer, 600_000)
})
end
@impl true
def handle_continue(:attach_stats_handler, %__MODULE__{stats_counter: nil} = state) do
{:noreply, state}
end
def handle_continue(:attach_stats_handler, %__MODULE__{stats_counter: stats_counter} = state) do
_ =
Telemetry.attach_many(
stats_counter,
[state.telemetry_prefix ++ [:command, :stop]],
&StatsHandler.handle_event/4,
stats_counter
)
{:noreply, state}
end
@impl true
def terminate(_reason, state) do
if ref = state.stats_counter, do: Telemetry.detach(ref)
end
@impl true
def handle_call(:delete_all, _from, %__MODULE__{} = state) do
# Get current size
size =
state
|> Map.from_struct()
|> Local.execute(:count_all, nil, [])
# Create new generation
:ok = new_gen(state)
# Delete all objects
:ok =
state.meta_tab
|> list()
|> Enum.each(&state.backend.delete_all_objects(&1))
{:reply, size, %{state | gc_heartbeat_ref: maybe_reset_timer(true, state)}}
end
def handle_call({:new_generation, reset_timer?}, _from, state) do
# Create new generation
:ok = new_gen(state)
# Maybe reset heartbeat timer
heartbeat_ref = maybe_reset_timer(reset_timer?, state)
{:reply, :ok, %{state | gc_heartbeat_ref: heartbeat_ref}}
end
def handle_call(
:memory_info,
_from,
%__MODULE__{backend: backend, meta_tab: meta_tab, allocated_memory: allocated} = state
) do
{:reply, {memory_info(backend, meta_tab), allocated}, state}
end
def handle_call({:realloc, mem_size}, _from, state) do
{:reply, :ok, %{state | allocated_memory: mem_size}}
end
def handle_call(:get_state, _from, state) do
{:reply, state, state}
end
@impl true
def handle_cast(:reset_timer, state) do
{:noreply, %{state | gc_heartbeat_ref: maybe_reset_timer(true, state)}}
end
@impl true
def handle_info(
:heartbeat,
%__MODULE__{
gc_interval: gc_interval,
gc_heartbeat_ref: heartbeat_ref
} = state
) do
# Create new generation
:ok = new_gen(state)
# Reset heartbeat timer
heartbeat_ref = start_timer(gc_interval, heartbeat_ref)
{:noreply, %{state | gc_heartbeat_ref: heartbeat_ref}}
end
def handle_info(:cleanup, state) do
# Check size first, if the cleanup is done, skip checking the memory,
# otherwise, check the memory too.
{_, state} =
with {false, state} <- check_size(state) do
check_memory(state)
end
{:noreply, state}
end
defp check_size(%__MODULE__{max_size: max_size} = state) when not is_nil(max_size) do
maybe_cleanup(:size, state)
end
defp check_size(state) do
{false, state}
end
defp check_memory(%__MODULE__{allocated_memory: allocated} = state) when not is_nil(allocated) do
maybe_cleanup(:memory, state)
end
defp check_memory(state) do
{false, state}
end
defp maybe_cleanup(
info,
%__MODULE__{
cache: cache,
name: name,
gc_cleanup_ref: cleanup_ref,
gc_cleanup_min_timeout: min_timeout,
gc_cleanup_max_timeout: max_timeout,
gc_interval: gc_interval,
gc_heartbeat_ref: heartbeat_ref
} = state
) do
case cleanup_info(info, state) do
{size, max_size} when size >= max_size ->
# Create a new generation
:ok = new_gen(state)
# Purge expired entries
_ = cache.delete_all(:expired, dynamic_cache: name)
# Reset the heartbeat timer
heartbeat_ref = start_timer(gc_interval, heartbeat_ref)
# Reset the cleanup timer
cleanup_ref =
info
|> cleanup_info(state)
|> elem(0)
|> reset_cleanup_timer(max_size, min_timeout, max_timeout, cleanup_ref)
{true, %{state | gc_heartbeat_ref: heartbeat_ref, gc_cleanup_ref: cleanup_ref}}
{size, max_size} ->
# Reset the cleanup timer
cleanup_ref = reset_cleanup_timer(size, max_size, min_timeout, max_timeout, cleanup_ref)
{false, %{state | gc_cleanup_ref: cleanup_ref}}
end
end
defp cleanup_info(:size, %__MODULE__{backend: mod, meta_tab: tab, max_size: max}) do
{size_info(mod, tab), max}
end
defp cleanup_info(:memory, %__MODULE__{backend: mod, meta_tab: tab, allocated_memory: max}) do
{memory_info(mod, tab), max}
end
## Private Functions
defp new_gen(%__MODULE__{
meta_tab: meta_tab,
backend: backend,
backend_opts: backend_opts,
stats_counter: stats_counter
}) do
# Create new generation
gen_tab = Backend.new(backend, meta_tab, backend_opts)
# Update generation list
case list(meta_tab) do
[newer, older] ->
# Since the older generation is deleted, update evictions count
:ok = Stats.incr(stats_counter, :evictions, backend.info(older, :size))
# Update generations
:ok = Metadata.put(meta_tab, :generations, [gen_tab, newer])
# Process the older generation:
# - Delete previously stored deprecated generation
# - Flush the older generation
# - Deprecate it (mark it for deletion)
:ok = process_older_gen(meta_tab, backend, older)
[newer] ->
# Update generations
:ok = Metadata.put(meta_tab, :generations, [gen_tab, newer])
[] ->
# Update generations
:ok = Metadata.put(meta_tab, :generations, [gen_tab])
end
end
# The older generation cannot be removed immediately because there may be
# ongoing operations using it, then it may cause race-condition errors.
# Hence, the idea is to keep it alive till a new generation is pushed, but
# flushing its data before so that we release memory space. By the time a new
# generation is pushed, then it is safe to delete it completely.
defp process_older_gen(meta_tab, backend, older) do
if deprecated = Metadata.get(meta_tab, :deprecated) do
# Delete deprecated generation if it does exist
_ = Backend.delete(backend, meta_tab, deprecated)
end
# Flush older generation to release space so it can be marked for deletion
true = backend.delete_all_objects(older)
# Keep alive older generation reference into the metadata
Metadata.put(meta_tab, :deprecated, older)
end
defp start_timer(time, ref \\ nil, event \\ :heartbeat)
defp start_timer(nil, _, _), do: nil
defp start_timer(time, ref, event) do
_ = if ref, do: Process.cancel_timer(ref)
Process.send_after(self(), event, time)
end
defp maybe_reset_timer(_, %__MODULE__{gc_interval: nil} = state) do
state.gc_heartbeat_ref
end
defp maybe_reset_timer(false, state) do
state.gc_heartbeat_ref
end
defp maybe_reset_timer(true, %__MODULE__{} = state) do
start_timer(state.gc_interval, state.gc_heartbeat_ref)
end
defp reset_cleanup_timer(size, max_size, min_timeout, max_timeout, cleanup_ref) do
size
|> linear_inverse_backoff(max_size, min_timeout, max_timeout)
|> start_timer(cleanup_ref, :cleanup)
end
defp size_info(backend, meta_tab) do
meta_tab
|> list()
|> Enum.reduce(0, &(backend.info(&1, :size) + &2))
end
defp memory_info(backend, meta_tab) do
meta_tab
|> list()
|> Enum.reduce(0, fn gen, acc ->
gen
|> backend.info(:memory)
|> Kernel.*(:erlang.system_info(:wordsize))
|> Kernel.+(acc)
end)
end
defp linear_inverse_backoff(size, _max_size, _min_timeout, max_timeout) when size <= 0 do
max_timeout
end
defp linear_inverse_backoff(size, max_size, min_timeout, _max_timeout) when size >= max_size do
min_timeout
end
defp linear_inverse_backoff(size, max_size, min_timeout, max_timeout) do
round((min_timeout - max_timeout) / max_size * size + max_timeout)
end
end