defmodule Nebulex.Adapters.Cachex do
@moduledoc """
Nebulex adapter for [Cachex][cachex].
This adapter provides a Nebulex interface on top of Cachex, a powerful
in-memory caching library for Elixir. It combines Nebulex's unified caching
API with Cachex's rich feature set including transactions, hooks, expiration,
and statistics. Use this adapter when you need a feature-rich local cache or
as a building block for distributed caching topologies.
[cachex]: https://hexdocs.pm/cachex/Cachex.html
## Options
This adapter supports all Cachex configuration options. The options are passed
directly to [Cachex.start_link/2][cachex_start_link], allowing you to leverage
Cachex's full feature set including expiration, hooks, limits, and warmers.
[cachex_start_link]: https://hexdocs.pm/cachex/Cachex.html#start_link/2
## Example
You can define a cache using Cachex as follows:
defmodule MyApp.Cache do
use Nebulex.Cache,
otp_app: :my_app,
adapter: Nebulex.Adapters.Cachex
end
Where the configuration for the cache must be in your application
environment, usually defined in your `config/config.exs`:
config :my_app, MyApp.Cache,
stats: true,
...
If your application was generated with a supervisor (by passing `--sup`
to `mix new`) you will have a `lib/my_app/application.ex` file containing
the application start callback that defines and starts your supervisor.
You just need to edit the `start/2` function to start the cache as a
supervisor on your application's supervisor:
def start(_type, _args) do
children = [
{MyApp.Cache, []},
]
...
end
Since Cachex uses macros for some configuration options, you could also
pass the options in runtime when the cache is started, either by calling
`MyApp.Cache.start_link/1` directly, or in your app supervision tree:
def start(_type, _args) do
children = [
{MyApp.Cache, cachex_opts()},
]
...
end
defp cachex_opts do
import Cachex.Spec
[
expiration: expiration(
# how often cleanup should occur
interval: :timer.seconds(30),
# default record expiration
default: :timer.seconds(60),
# whether to enable lazy checking
lazy: true
),
# hooks
hooks: [
hook(module: MyHook, name: :my_hook, args: { })
],
...
]
end
> See [Cachex.start_link/2][cachex_start_link] for more information.
## Distributed caching topologies
Using the distributed adapters with `Cachex` as a primary storage is possible.
For example, let's define a multi-level cache (near cache topology), where
the L1 is a local cache using Cachex and the L2 is a partitioned cache.
defmodule MyApp.NearCache do
use Nebulex.Cache,
otp_app: :nebulex,
adapter: Nebulex.Adapters.Multilevel
defmodule L1 do
use Nebulex.Cache,
otp_app: :nebulex,
adapter: Nebulex.Adapters.Cachex
end
defmodule L2 do
use Nebulex.Cache,
otp_app: :nebulex,
adapter: Nebulex.Adapters.Partitioned,
adapter_opts: [primary_storage_adapter: Nebulex.Adapters.Cachex]
end
end
And the configuration may look like:
config :my_app, MyApp.NearCache,
model: :inclusive,
levels: [
{MyApp.NearCache.L1, []},
{MyApp.NearCache.L2, primary: [transactions: true]}
]
> **NOTE:** You could also use [Nebulex.Adapters.Redis][nbx_redis_adapter] for
L2, it would be a matter of changing the adapter for the L2 and the
configuration to set up the Redis adapter.
[nbx_redis_adapter]: https://github.com/elixir-nebulex/nebulex_redis_adapter
See [Nebulex examples](https://github.com/elixir-nebulex/nebulex_examples).
You will find examples for all different topologies, even using other adapters
like Redis; for all examples using the `Nebulex.Adapters.Local` adapter,
you can replace it by `Nebulex.Adapters.Cachex`.
## Query API
The adapter supports querying cached entries using Cachex's query syntax or
explicit key lists.
### Pattern-based queries
Use Cachex query syntax for pattern matching:
# Get all entries
MyApp.Cache.get_all!()
# Count all entries
MyApp.Cache.count_all!()
# Delete all entries
MyApp.Cache.delete_all!()
# Delete expired entries
MyApp.Cache.delete_all!(:expired)
# Stream entries for large datasets
MyApp.Cache.stream!() |> Enum.take(100)
### Explicit key queries
Query specific keys using the `in: keys` syntax:
# Get multiple keys
MyApp.Cache.get_all!(in: ["key1", "key2", "key3"])
# Count specific keys
MyApp.Cache.count_all!(in: ["key1", "key2"])
# Delete specific keys
MyApp.Cache.delete_all!(in: ["key1", "key2"])
## Transactions
The adapter provides full transaction support through Cachex's locking
mechanism, ensuring atomic operations across multiple keys.
MyApp.Cache.transaction(
fn ->
value = MyApp.Cache.get!("counter")
MyApp.Cache.put!("counter", value + 1)
MyApp.Cache.put!("last_updated", DateTime.utc_now())
end,
keys: ["counter", "last_updated"]
)
Transactions automatically handle locking and isolation for the specified keys.
## Stats and Monitoring
Enable statistics collection by setting `stats: true` in your configuration
(enabled by default):
config :my_app, MyApp.Cache,
stats: true
Retrieve statistics:
# Get all stats
iex> MyApp.Cache.info!(:stats)
%{
calls: %{get: 10, put: 5, delete: 2},
evictions: 0,
expirations: 1,
hit_rate: 80.0,
hits: 8,
misses: 2,
...
}
See `Cachex.Stats` for detailed statistics information.
"""
# Provide Cache Implementation
@behaviour Nebulex.Adapter
@behaviour Nebulex.Adapter.KV
@behaviour Nebulex.Adapter.Queryable
@behaviour Nebulex.Adapter.Transaction
# Inherit default info implementation
use Nebulex.Adapters.Common.Info
# Inherit default observable implementation
use Nebulex.Adapter.Observable
import Cachex.Spec
import Nebulex.Utils
alias __MODULE__.Router
alias Cachex.Query
alias Cachex.Services.Locksmith
alias Nebulex.Adapter
alias Nebulex.Cache.Options, as: NbxOptions
# Nebulex options
@nbx_start_opts NbxOptions.__compile_opts__() ++ NbxOptions.__start_opts__()
## Nebulex.Adapter
@impl true
defmacro __before_compile__(_env) do
quote do
@doc """
A convenience function to return the Cachex cache name.
"""
def cache_name(opts \\ []) do
name = Keyword.get(opts, :name, __MODULE__)
name
|> Adapter.lookup_meta()
|> Map.fetch!(:cachex_name)
end
end
end
@impl true
def init(opts) do
# Get the cache name (required)
name = opts[:name] || Keyword.fetch!(opts, :cache)
# Maybe use stats
stats = Keyword.get(opts, :stats, true)
# Stats hooks
stats_hooks =
if Keyword.get(opts, :stats, true) do
[hook(module: Cachex.Stats)]
else
[]
end
adapter_meta = %{
name: name,
cachex_name: camelize_and_concat([name, Cachex]),
stats: stats
}
child_spec =
opts
|> Keyword.drop(@nbx_start_opts)
|> Keyword.put(:name, adapter_meta.cachex_name)
|> Keyword.update(:hooks, stats_hooks, &(stats_hooks ++ &1))
|> Cachex.child_spec()
{:ok, child_spec, adapter_meta}
end
## Nebulex.Adapter.KV
@impl true
def fetch(%{cachex_name: name}, key, opts) do
name
|> Router.route({:fetch, [key, opts]})
|> handle_response()
end
@impl true
def put(%{cachex_name: name}, key, value, on_write, ttl, keep_ttl?, _opts) do
do_put(on_write, name, key, value, ttl, keep_ttl?)
end
defp do_put(:put, name, key, value, ttl, true) do
Cachex.transaction(name, [key], fn worker ->
with {:ok, false} <- Cachex.update(worker, key, value),
{:ok, _} <- Cachex.put(worker, key, value, expire: to_ttl(ttl)) do
{:ok, true}
end
end)
|> handle_response(true)
end
defp do_put(:put, name, key, value, ttl, false) do
name
|> Cachex.put(key, value, expire: to_ttl(ttl))
|> handle_response()
end
defp do_put(:replace, name, key, value, ttl, false) do
Cachex.transaction(name, [key], fn worker ->
with {:ok, true} <- Cachex.update(worker, key, value),
{:ok, _} <- Cachex.expire(worker, key, to_ttl(ttl)) do
{:ok, true}
end
end)
|> handle_response(true)
end
defp do_put(:replace, name, key, value, _ttl, true) do
name
|> Cachex.update(key, value)
|> handle_response()
end
defp do_put(:put_new, name, key, value, ttl, _keep_ttl?) do
name
|> Router.route({:put_new, [key, value, [expire: to_ttl(ttl)]]})
|> handle_response()
end
@impl true
def put_all(adapter_meta, entries, on_write, ttl, opts)
def put_all(%{cachex_name: name}, entries, on_write, ttl, _opts) when is_map(entries) do
do_put_all(name, :maps.to_list(entries), ttl, on_write)
end
def put_all(%{cachex_name: name}, entries, on_write, ttl, _opts) do
do_put_all(name, entries, ttl, on_write)
end
defp do_put_all(name, entries, ttl, :put) do
name
|> Cachex.put_many(entries, expire: to_ttl(ttl))
|> handle_response()
end
defp do_put_all(name, entries, ttl, :put_new) do
name
|> Router.route({:put_new_all, [entries, [expire: to_ttl(ttl)]]})
|> handle_response()
end
@impl true
def delete(%{cachex_name: name}, key, _opts) do
with {:ok, true} <- Cachex.del(name, key) do
:ok
end
|> handle_response()
end
@impl true
def take(%{cachex_name: name}, key, _opts) do
case {Cachex.exists?(name, key), Cachex.take(name, key)} do
{{:ok, true}, {:ok, nil}} ->
{:ok, nil}
{{:ok, false}, {:ok, nil}} ->
wrap_error Nebulex.KeyError, key: key, cache: name, reason: :not_found
{_ignore, result} ->
result
end
|> handle_response()
end
@impl true
def has_key?(%{cachex_name: name}, key, _opts) do
name
|> Cachex.exists?(key)
|> handle_response()
end
@impl true
def ttl(%{cachex_name: name}, key, _opts) do
# TODO(cachex-ttl): Workaround for Cachex nil ambiguity. Remove this once
# Cachex can clearly distinguish "key not found" from "no TTL set".
Cachex.transaction(name, [key], fn worker ->
with {:ok, nil} <- Cachex.ttl(worker, key),
{:ok, true} <- Cachex.exists?(worker, key) do
# Key does exist and hasn't a TTL associated with it
{:ok, :infinity}
else
{:ok, false} ->
# Key does not exist
wrap_error Nebulex.KeyError, key: key, cache: worker, reason: :not_found
other ->
other
end
end)
|> handle_response(true)
end
@impl true
def expire(%{cachex_name: name}, key, ttl, _opts) do
name
|> Cachex.expire(key, to_ttl(ttl))
|> handle_response()
end
@impl true
def touch(%{cachex_name: name}, key, _opts) do
name
|> Cachex.touch(key)
|> handle_response()
end
@impl true
def update_counter(%{cachex_name: name}, key, amount, default, ttl, _opts) do
# TODO(cachex-update-counter-ttl): Workaround because Cachex.incr/4 does
# not support setting TTL atomically. Remove this when Cachex provides TTL
# support for counter updates.
Cachex.transaction(name, [key], fn worker ->
with {:ok, exists?} <- Cachex.exists?(worker, key) do
do_update_counter(worker, key, amount, default, ttl, exists?)
end
end)
|> handle_response(true)
end
defp do_update_counter(name, key, amount, default, _ttl, true) do
Cachex.incr(name, key, amount, default: default)
end
defp do_update_counter(name, key, amount, default, ttl, false) do
with {:ok, _} = ok <- Cachex.incr(name, key, amount, default: default),
{:ok, _} <- Cachex.expire(name, key, to_ttl(ttl)) do
ok
end
end
## Nebulex.Adapter.Queryable
@impl true
def execute(adapter_meta, query_meta, opts)
def execute(_adapter_meta, %{op: :get_all, query: {:in, []}}, _opts) do
{:ok, []}
end
def execute(_adapter_meta, %{op: op, query: {:in, []}}, _opts)
when op in [:count_all, :delete_all] do
{:ok, 0}
end
def execute(%{cachex_name: name}, %{op: :count_all, query: {:q, nil}}, _opts) do
name
|> Cachex.size()
|> handle_response()
end
def execute(%{cachex_name: name}, %{op: :delete_all, query: {:q, nil}}, _opts) do
name
|> Cachex.clear()
|> handle_response()
end
def execute(%{cachex_name: name}, %{op: :delete_all, query: {:q, :expired}}, _opts) do
name
|> Cachex.purge()
|> handle_response()
end
def execute(%{cachex_name: name}, %{op: :count_all, query: {:in, keys}}, opts)
when is_list(keys) do
name
|> Router.route({:count_all, [keys, opts]})
|> handle_response()
end
def execute(%{cachex_name: name}, %{op: :delete_all, query: {:in, keys}}, opts)
when is_list(keys) do
name
|> Router.route({:delete_all, [keys, opts]})
|> handle_response()
end
def execute(adapter_meta, query, opts) do
with {:ok, stream} <- stream(adapter_meta, query, Keyword.put_new(opts, :max_entries, 25)) do
{:ok, Enum.to_list(stream)}
end
|> handle_response()
end
@impl true
def stream(adapter_meta, query_meta, opts)
def stream(adapter_meta, %{query: {:q, nil}, select: select} = query, opts) do
stream(adapter_meta, %{query | query: {:q, Query.build(output: select)}}, opts)
end
def stream(%{cachex_name: name}, %{query: {:q, query}}, opts) do
with {:error, :invalid_match} <-
Cachex.stream(name, query, buffer: Keyword.fetch!(opts, :max_entries)) do
raise Nebulex.QueryError, message: "invalid query #{inspect(query)}", query: query
end
|> handle_response()
end
def stream(%{cachex_name: name}, %{query: {:in, keys}, select: select}, opts) do
max_entries = Keyword.fetch!(opts, :max_entries)
keys
|> Stream.chunk_every(max_entries)
|> Stream.map(&Router.route(name, {:get_all, [&1, select, [max_entries: max_entries]]}))
|> Stream.flat_map(& &1)
|> wrap_ok()
end
## Nebulex.Adapter.Transaction
@impl true
def transaction(%{cachex_name: name}, fun, opts) do
opts = Keyword.validate!(opts, keys: [])
keys = Keyword.fetch!(opts, :keys)
name
|> Cachex.transaction(keys, fun)
|> handle_response()
end
@impl true
def in_transaction?(_adapter_meta, _opts) do
{:ok, Locksmith.transaction?()}
end
## Nebulex.Adapter.Info
@impl true
def info(adapter_meta, spec, opts)
def info(%{cachex_name: name} = adapter_meta, :all, opts) do
with {:ok, stats} <- Cachex.stats(name) do
{:ok, base_info} = super(adapter_meta, :all, opts)
{:ok, Map.merge(base_info, %{stats: stats})}
end
|> handle_response()
end
def info(%{cachex_name: name}, :stats, _opts) do
name
|> Cachex.stats()
|> handle_response()
end
def info(adapter_meta, spec, opts) when is_list(spec) do
Enum.reduce(spec, {:ok, %{}}, fn s, {:ok, acc} ->
{:ok, info} = info(adapter_meta, s, opts)
{:ok, Map.put(acc, s, info)}
end)
|> handle_response()
end
def info(adapter_meta, spec, opts) do
super(adapter_meta, spec, opts)
end
## Private Functions
defp to_ttl(:infinity), do: nil
defp to_ttl(ttl), do: ttl
defp handle_response(response, transaction? \\ false)
defp handle_response({:error, reason}, false) when is_nebulex_exception(reason) do
{:error, reason}
end
defp handle_response({:error, reason}, false) do
wrap_error Nebulex.Error, reason: reason
end
defp handle_response(other, false) do
other
end
defp handle_response({:ok, response}, true) do
handle_response(response)
end
defp handle_response(error, true) do
handle_response(error)
end
end