defmodule Hammer.Backend.EredisCluster do
@moduledoc """
A Hammer backend that keeps its buckets in a Redis Cluster.

This backend uses the [eredis_cluster](https://hex.pm/packages/eredis_cluster) library to connect to Redis.

The backend process is started by calling `start_link`:
Hammer.Backend.EredisCluster.start_link(
expiry_ms: 60_000 * 10,
eredis_cluster_name: :mycluster,
eredis_cluster_init_servers: [
{"127.0.0.1", 30001},
{"127.0.0.1", 30002},
{"127.0.0.1", 30003}
],
eredis_options: [
username: "myuser",
password: "mypassword"
]
)
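Options are:

- `expiry_ms`: Expiry time of buckets in milliseconds,
  used to set the TTL on Redis keys. This option is mandatory.
- `eredis_cluster_name`: The `eredis_cluster` name to use. Optional;
  when omitted, the default cluster name is used.
- `eredis_cluster_init_servers`: List of `{host, port}` tuples, where `host` is a
  hostname (or IP address) and `port` is the port number. Optional.
- `eredis_options`: Keyword list of options that are passed down to `eredis`. Optional.
- `delete_buckets_timeout`: Timeout in milliseconds for the `delete_buckets/2` call.
  Optional; defaults to `5000`.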
"""
# This module implements the Hammer.Backend callbacks (marked with `@impl Hammer.Backend`).
@behaviour Hammer.Backend
# Cluster name kept in the server state when no `:eredis_cluster_name` option is given.
@default_cluster :eredis_cluster_default
use GenServer
# A bucket is addressed by a `{bucket, id}` tuple; see `make_redis_key/1` for the Redis key layout.
@type bucket_key :: {bucket :: integer, id :: String.t()}
# Shape of the bucket data returned by `get_bucket/2`: the key followed by the
# integer `count`, `created` and `updated` fields read from the Redis hash.
@type bucket_info ::
{key :: bucket_key, count :: integer, created :: integer, updated :: integer}
## Public API
@doc """
Starts the backend process without linking it to the caller.

`args` is the keyword list of options handed to `init/1`; it defaults to `[]`.
"""
@spec start :: :ignore | {:error, any} | {:ok, pid}
@spec start(keyword()) :: :ignore | {:error, any} | {:ok, pid}
def start(args \\ []), do: GenServer.start(__MODULE__, args)
@doc """
Starts the backend process linked to the caller.

`args` is the keyword list of options handed to `init/1`; it defaults to `[]`.
"""
@spec start_link :: :ignore | {:error, any} | {:ok, pid}
@spec start_link(keyword()) :: :ignore | {:error, any} | {:ok, pid}
def start_link(args \\ []), do: GenServer.start_link(__MODULE__, args)
@doc """
Stops a backend process.

`server` is the pid (or registered name) of the backend to stop. It defaults to
this module's name to stay compatible with the previous zero-arity `stop/0`.
`start/1` and `start_link/1` do not register the process under that name, so
callers should pass the pid they got back from starting the backend.
"""
@spec stop :: any
@spec stop(GenServer.server()) :: any
def stop(server \\ __MODULE__) do
  GenServer.call(server, :stop)
end
@doc """
Registers a single hit (increment of 1) on the bucket identified by `key`.

`now` is stored in the bucket as its `created`/`updated` value.
"""
@impl Hammer.Backend
def count_hit(pid, key, now) do
  request = {:count_hit, key, now, 1}
  GenServer.call(pid, request)
end
@doc """
Registers a hit on the bucket identified by `key`, adding `increment`
to the bucket's counter instead of 1.
"""
@impl Hammer.Backend
def count_hit(pid, key, now, increment) do
  request = {:count_hit, key, now, increment}
  GenServer.call(pid, request)
end
@doc """
Fetches the stored data of the bucket identified by `key`.

The reply is whatever the backend process answers to the `:get_bucket` request.
"""
@impl Hammer.Backend
def get_bucket(pid, key) do
  request = {:get_bucket, key}
  GenServer.call(pid, request)
end
@doc """
Removes every bucket that belongs to `id`.

The call timeout is not fixed: it is first read from the backend process
(its `delete_buckets_timeout` setting) and then applied to the delete request.
"""
@impl Hammer.Backend
def delete_buckets(pid, id) do
  # First round-trip: ask the server which timeout to use for the deletion.
  timeout = GenServer.call(pid, {:get_delete_buckets_timeout})
  # Second round-trip: the actual deletion, bounded by that timeout.
  GenServer.call(pid, {:delete_buckets, id}, timeout)
end
## GenServer Callbacks
# Builds the server state from the start options and, when init servers are
# configured, connects to the Redis cluster.
#
# Options read from `args`:
#   * `:expiry_ms` (required) - raises `RuntimeError` when missing
#   * `:eredis_cluster_name`, `:eredis_cluster_init_servers`, `:eredis_options` (optional)
#   * `:delete_buckets_timeout` (optional, defaults to 5000)
#
# Returns `{:ok, state}` where `state` holds the cluster name (or the default
# cluster name), `expiry_ms` and `delete_buckets_timeout`.
@impl GenServer
def init(args) do
  expiry_ms = Keyword.get(args, :expiry_ms)

  if !expiry_ms do
    raise RuntimeError, "Missing required config: expiry_ms"
  end

  eredis_cluster_name = Keyword.get(args, :eredis_cluster_name)

  eredis_cluster_init_servers =
    args
    |> Keyword.get(:eredis_cluster_init_servers)
    |> normalize_init_servers()

  eredis_options =
    args
    |> Keyword.get(:eredis_options)
    |> normalize_eredis_options()

  :eredis_cluster.start()
  connect_cluster(eredis_cluster_name, eredis_cluster_init_servers, eredis_options)

  delete_buckets_timeout = Keyword.get(args, :delete_buckets_timeout, 5000)

  {:ok,
   %{
     eredis_cluster_name: eredis_cluster_name || @default_cluster,
     expiry_ms: expiry_ms,
     delete_buckets_timeout: delete_buckets_timeout
   }}
end

# Converts binary hostnames to charlists for the Erlang library; `nil` stays `nil`.
defp normalize_init_servers(nil), do: nil

defp normalize_init_servers(servers) do
  Enum.map(servers, fn
    {host, port} when is_binary(host) -> {String.to_charlist(host), port}
    other -> other
  end)
end

# Converts binary option values to charlists, leaving `:socket_options` and
# non-binary values untouched; `nil` stays `nil`.
defp normalize_eredis_options(nil), do: nil

defp normalize_eredis_options(options) do
  Enum.map(options, fn
    {:socket_options, socket_options} -> {:socket_options, socket_options}
    {key, value} when is_binary(value) -> {key, String.to_charlist(value)}
    other -> other
  end)
end

# Without init servers there is nothing to connect to: the cluster is expected
# to be initialised elsewhere. This also covers a configuration that only names
# the cluster, which previously passed `nil` servers to `:eredis_cluster.connect`.
defp connect_cluster(_name, nil, _options), do: :ok

# We only have an init_servers config.
defp connect_cluster(nil, init_servers, nil), do: :eredis_cluster.connect(init_servers)

# We have an init_servers config and options for eredis.
defp connect_cluster(nil, init_servers, options),
  do: :eredis_cluster.connect(init_servers, options)

# We have a cluster name; options fall back to an empty list so `nil` is never
# handed to the library.
defp connect_cluster(name, init_servers, options),
  do: :eredis_cluster.connect(name, init_servers, options || [])
# Stops the server normally, replying `:ok` to the caller.
@impl GenServer
def handle_call(:stop, _from, state), do: {:stop, :normal, :ok, state}
# Counts a hit on the bucket `key`, using the key TTL (in seconds) derived from the state.
def handle_call({:count_hit, key, now, increment}, _from, %{eredis_cluster_name: cluster} = state) do
  ttl_seconds = get_expiry(state)
  reply = do_count_hit(cluster, key, now, increment, ttl_seconds)

  {:reply, reply, state}
end
# Reads the `count`, `created` and `updated` hash fields of the bucket `key`.
# Replies `{:ok, nil}` when all three fields are undefined, otherwise
# `{:ok, {key, count, created, updated}}` with the fields parsed as integers,
# or the `{:error, reason}` returned by the cluster query.
def handle_call({:get_bucket, key}, _from, %{eredis_cluster_name: cluster} = state) do
  query = ["HMGET", make_redis_key(key), "count", "created", "updated"]

  reply =
    case :eredis_cluster.q(cluster, query) do
      {:ok, [:undefined, :undefined, :undefined]} ->
        {:ok, nil}

      {:ok, [_count, _created, _updated] = raw_fields} ->
        [count, created, updated] = Enum.map(raw_fields, &String.to_integer/1)
        {:ok, {key, count, created, updated}}

      {:error, _reason} = failure ->
        failure
    end

  {:reply, reply, state}
end
# Deletes all buckets of `id`, scanning from cursor "0" with a deleted-count of 0.
def handle_call({:delete_buckets, id}, _from, %{eredis_cluster_name: cluster} = state) do
  reply =
    id
    |> make_redis_key_pattern()
    |> delete_matching(cluster)

  {:reply, reply, state}
end

# Argument-order adapter so the key pattern can be piped into the scan loop.
defp delete_matching(pattern, cluster), do: do_delete_buckets(cluster, pattern, "0", 0)
# Replies with the `delete_buckets_timeout` value stored in the server state.
def handle_call({:get_delete_buckets_timeout}, _from, %{delete_buckets_timeout: timeout} = state),
  do: {:reply, timeout, state}
# Walks the SCAN cursor for keys matching `redis_key_pattern`, issuing a DEL for
# every key of each page, until the cursor comes back as "0".
#
# Returns `{:ok, total}` where `total` is `count_deleted` plus the number of
# keys found by the scan, or the `{:error, reason}` of a failed SCAN.
# The result of the DEL commands themselves is not inspected.
defp do_delete_buckets(name, redis_key_pattern, cursor, count_deleted) do
  scan = ["SCAN", cursor, "MATCH", redis_key_pattern]

  case :eredis_cluster.qk(name, scan, redis_key_pattern) do
    {:ok, [next_cursor, found]} ->
      # An empty page needs no DEL round-trip.
      if found != [] do
        deletions = Enum.map(found, fn redis_key -> ["DEL", redis_key] end)
        :eredis_cluster.qk(name, deletions, redis_key_pattern)
      end

      total = count_deleted + Enum.count(found)

      if next_cursor == "0" do
        {:ok, total}
      else
        do_delete_buckets(name, redis_key_pattern, next_cursor, total)
      end

    {:error, reason} ->
      {:error, reason}
  end
end
# We are using the first method described (called bucketing)
# in https://www.youtube.com/watch?v=CRGPbCbRTHA
# but we add the 'created' and 'updated' meta information fields.
#
# Runs one MULTI/EXEC transaction that increments the bucket's `count` by
# `increment`, sets `created` only if absent, overwrites `updated`, and sets
# the key's TTL to `expiry` seconds.
#
# Returns `{:ok, new_count}` on success. On failure returns an
# `{:error, reason}` tuple: either the error handed back by the cluster query
# as a whole, the first error found among the per-command results, or
# `{:error, {:unexpected_response, response}}` when the reply has no error
# entry but does not have the expected shape either.
defp do_count_hit(name, key, now, increment, expiry) do
  redis_key = make_redis_key(key)

  cmds = [
    ["MULTI"],
    ["HINCRBY", redis_key, "count", increment],
    ["HSETNX", redis_key, "created", now],
    ["HSET", redis_key, "updated", now],
    ["EXPIRE", redis_key, expiry],
    ["EXEC"]
  ]

  case :eredis_cluster.q(name, cmds) do
    [
      {:ok, "OK"},
      {:ok, "QUEUED"},
      {:ok, "QUEUED"},
      {:ok, "QUEUED"},
      {:ok, "QUEUED"},
      {:ok, [new_count, _, _, "1"]}
    ] ->
      {:ok, String.to_integer(new_count)}

    # The query as a whole failed: the reply is a bare error tuple, not a list
    # of per-command results, so it must not be enumerated.
    {:error, _reason} = error ->
      error

    results when is_list(results) ->
      # Surface the first per-command error; never reply with `nil`.
      Enum.find(results, {:error, {:unexpected_response, results}}, &match?({:error, _}, &1))

    other ->
      {:error, {:unexpected_response, other}}
  end
end
@doc """
Builds the Redis key for a `{bucket, id}` pair.

The key has the form `Hammer:EredisCluster:{id}:bucket`; the id is wrapped in
literal braces.
"""
def make_redis_key({bucket, id}) do
  "Hammer:EredisCluster:{" <> to_string(id) <> "}:" <> to_string(bucket)
end
@doc """
Builds the key pattern matching every bucket key of `id`.

The pattern has the form `Hammer:EredisCluster:{id}:*`.
"""
def make_redis_key_pattern(id) do
  "Hammer:EredisCluster:{" <> to_string(id) <> "}:*"
end
# Converts the state's `expiry_ms` to seconds, adds one second, and rounds the
# result to an integer.
defp get_expiry(state) do
  %{expiry_ms: ttl_ms} = state
  ttl_seconds = ttl_ms / 1000 + 1
  round(ttl_seconds)
end
end