defmodule DCache do
@doc """
Creates a module to wrap cache operations. This is the preferred way to
create a cache as it performs better.
defmodule App.Cache do
require DCache
DCache.define(Users, 100_000) # creates an App.Cache.Users module
end
opts:
cache: Module
The name of the module to create. This will be nested within the calling module
max: integer
The maximum number of items to hold in the cache
opts:
segments: integer
The number of segments to create (defaults to 100, 10, 3 or 1) depending on `max`
purger: symbol | function
The purger to use, defaults to `:default`
Supported values: `:default`, `:no_spawn`, `:blocking` or a custom function
"""
defmacro define(cache, max, opts \\ []) do
quote location: :keep do
defmodule unquote(cache) do
@config DCache.Impl.config(unquote(cache), unquote(max), unquote(opts))
def setup(), do: DCache.Impl.setup(@config)
def clear(), do: DCache.Impl.clear(@config)
def get(key), do: DCache.Impl.get(key, @config)
def del(key), do: DCache.Impl.del(key, @config)
def take(key), do: DCache.Impl.take(key, @config)
def put(key, value, ttl), do: DCache.Impl.put(key, value, ttl, @config)
def expires(key), do: DCache.Impl.expires(key, @config)
def fetch(key, fun, ttl \\ nil), do: DCache.Impl.fetch(key, fun, ttl, @config)
def fetch!(key, fun, ttl \\ nil), do: DCache.Impl.fetch!(key, fun, ttl, @config)
def size(), do: DCache.Impl.size(@config)
end
end
end
@doc """
Creates the cache
opts:
cache: atom
The name of the cache
max: integer
The maximum number of items to hold in the cache
opts:
segments: integer
The number of segments to create (defaults to 100)
"""
def setup(cache, max, opts \\ []) do
config = DCache.Impl.config(cache, max, opts)
:ets.new(cache, [:set, :public, :named_table, read_concurrency: true])
:ets.insert(cache, {:config, config})
DCache.Impl.setup(config)
end
@doc """
Clears the cache. While the cache is being cleared, concurrent activity is
severely limited.
"""
def clear(cache), do: DCache.Impl.clear(get_config(cache))
@doc """
Gets the value from the cache, returning nil if not found or expired
"""
def get(cache, key), do: DCache.Impl.get(key, get_config(cache))
@doc """
Deletes the value from the cache, safe to call even if the key is not in the cache
"""
def del(cache, key), do: DCache.Impl.del(key, get_config(cache))
@doc """
Deletes and removes the value from the cache. Returns `nil` if not found.
Returns {:ok, {key, value, expires}} if found
"""
def take(cache, key), do: DCache.Impl.take(key, get_config(cache))
@doc """
Gets the unix time in seconds when the value will be considered expired.
Rrturns `nil` if the value is not found. The return value can
be in the past
"""
def expires(cache, key), do: DCache.Impl.expires(key, get_config(cache))
@doc """
Puts the value in the cache. TTL is a relative time in second.
For example, 300 would mean that the value would expire in
5 minutes
"""
def put(cache, key, value, ttl), do: DCache.Impl.put(key, value, ttl, get_config(cache))
@doc """
Gets the value from the cache. Executes `fun/1` if the value is not found.
`fun/1` receives the cache key being looked up and should return one of:
* `{:ok, value}`
* `{:ok, value, ttl}`
* `{:skip, value}`
* `{:error, term}`
"""
def fetch(cache, key, fun, ttl \\ nil), do: DCache.Impl.fetch(key, fun, ttl, get_config(cache))
@doc """
Same as fetch, but unwraps the returned value or raises on error.
"""
def fetch!(cache, key, fun, ttl \\ nil), do: DCache.Impl.fetch!(key, fun, ttl, get_config(cache))
@doc """
Returns the total number of items in the cache, including expired
items. This is O(N) over the number of segments
"""
def size(cache), do: DCache.Impl.size(get_config(cache))
defp get_config(cache) do
[{_, config}] = :ets.lookup(cache, :config)
config
end
defmodule Impl do
@moduledoc false
def config(cache, max, opts) do
purger = Keyword.get(opts, :purger, :default)
segment_count = Keyword.get_lazy(opts, :segments, fn ->
cond do
max >= 10_000 -> 100
max >= 100 -> 10
max >= 10 -> 3
true -> 1
end
end)
segments = Enum.map(0..(segment_count)-1, fn i ->
String.to_atom("#{cache}#{i}")
end)
{List.to_tuple(segments), trunc(max / segment_count), purger}
end
def setup({segments, _max_per_segment, _purger}) do
reduce_segments(segments, nil, fn segment, _->
:ets.new(segment, [
:set,
:public,
:named_table,
read_concurrency: true,
write_concurrency: true,
decentralized_counters: false,
])
end)
:ok
end
def clear({segments, _max_per_segment, _purger}) do
reduce_segments(segments, nil, fn segment, _ ->
:ets.delete_all_objects(segment)
end)
end
def get(key, {segments, _max_per_segment, _purger}) do
key
|> segment_for_key(segments)
|> get_from_segment(key)
end
defp get_from_segment(segment, key) do
with [{_key, value, expires}] <- :ets.lookup(segment, key),
true <- expires > :erlang.system_time(:second)
do
{:ok, value}
else
[] -> nil
false ->
:ets.delete(segment, key)
nil
end
end
def put(key, value, ttl, {segments, max_per_segment, purger}) do
key
|> segment_for_key(segments)
|> put_in_segment(key, value, ttl, max_per_segment, purger)
end
defp put_in_segment(segment, key, value, ttl, max_per_segment, purger) do
expires = :erlang.system_time(:second) + ttl
entry = {key, value, expires}
case :ets.insert_new(segment, entry) do
false -> :ets.insert(segment, entry) # just relace, didn't grow
true ->
if :ets.info(segment, :size) > max_per_segment do
case purger do
:blocking ->
:ets.delete_all_objects(segment)
:ets.insert_new(segment, entry) # re-insert this one we just inserted
:default ->
# Only do this if purger isn't already running.
# Insert a 'valid' entry (key, value, expiration) makes our purging
# simpler, as it doesn't have to special-case this key
if :ets.insert_new(segment, {:__purging, nil, 99999999999}) do
spawn fn -> purge_segment(segment, max_per_segment) end
end
:no_spawn ->
# Only do this if purger isn't already running
# Insert a 'valid' entry (key, value, expiration) makes our purging
# simpler, as it doesn't have to special-case this key
if :ets.insert_new(segment, {:__purging, nil, 99999999999}) do
purge_segment(segment, max_per_segment)
end
fun -> fun.(segment)
end
end
end
:ok
end
def del(key, {segments, _max_per_segment, _purger}) do
segment = segment_for_key(key, segments)
:ets.delete(segment, key)
:ok
end
def take(key, {segments, _max_per_segment, _purger}) do
segment = segment_for_key(key, segments)
case :ets.take(segment, key) do
[] -> nil
[item] -> {:ok, item}
end
end
def expires(key, {segments, _max_per_segment, _purger}) do
segment = segment_for_key(key, segments)
case :ets.lookup(segment, key) do
[{_, _, expires}] -> expires
_ -> nil
end
end
def fetch(key, fun, ttl, {segments, max_per_segment, purger}) do
segment = segment_for_key(key, segments)
case get_from_segment(segment, key) do
nil ->
case fun.(key) do
{:ok, value} = ok -> put_in_segment(segment, key, value, ttl, max_per_segment, purger); ok
{:ok, value, ttl} -> put_in_segment(segment, key, value, ttl, max_per_segment, purger); {:ok, value}
{:error, _} = err -> err
{:skip, value} -> value
end
found -> found
end
end
def fetch!(key, fun, ttl, config) do
case fetch(key, fun, ttl, config) do
{:ok, value} -> value
{:error, err} -> raise err
whatever -> whatever
end
end
defp segment_for_key(key, segments) do
hash = :erlang.phash2(key, tuple_size(segments))
elem(segments, hash)
end
def size({segments, _max_per_segment, _purger}) do
reduce_segments(segments, 0, fn segment, count ->
count + :ets.info(segment, :size)
end)
end
defp reduce_segments(segments, acc, fun) do
Enum.reduce(0..tuple_size(segments)-1, acc, fn i, acc ->
fun.(elem(segments, i), acc)
end)
end
# really small, just lock it and delete it
defp purge_segment(segment, max_per_segment) when max_per_segment < 100 do
:ets.delete_all_objects(segment)
end
defp purge_segment(segment, _max_per_segment) do
now = :erlang.system_time(:second)
:ets.safe_fixtable(segment, true)
# First try to purge expired slots
# If we don't purge anything, purge random slots
# If we still haven't purged anything, then purge everything
with {0, max_slot} <- purge_slots_expired(segment, 0, now, 0),
0 <- purge_slots_random(segment, max_slot)
do
# what else to do?
:ets.delete_all_objects(segment)
end
after
:ets.safe_fixtable(segment, false)
:ets.delete(segment, :__purging)
end
# Purges expired items
defp purge_slots_expired(segment, slot, now, purged) do
case :ets.slot(segment, slot) do
:'$end_of_table' -> {purged, slot}
entries ->
purged =
Enum.reduce(entries, purged, fn {key, _, expires}, purged ->
case expires < now do
false -> purged
true -> :ets.delete(segment, key); purged + 1
end
end)
purge_slots_expired(segment, slot + 1, now, purged)
end
end
# Purges random (expired or not) items. We need to make space
defp purge_slots_random(segment, max_slot) do
start_slot = max(:rand.uniform(max_slot) - 100, 0)
end_slot = min(start_slot + 200, max_slot - 1)
purge_slot_range(segment, start_slot, end_slot, 0)
end
defp purge_slot_range(_segment, slot, end_slot, purged) when slot > end_slot, do: purged
defp purge_slot_range(segment, slot, end_slot, purged) do
case :ets.slot(segment, slot) do
:'$end_of_table' -> purged
entries ->
purged =
Enum.reduce(entries, purged, fn {key, _value, _expires}, purged ->
:ets.delete(segment, key)
purged + 1
end)
purge_slot_range(segment, slot + 1, end_slot, purged)
end
rescue
# happens if we call slot/2 with an invalid slot
ArgumentError -> purged
end
end
end