defmodule LibRedis.Client do
@moduledoc """
Redis client behaviour, there are 2 implementations of this behaviour:
- LibRedis.Standalone
- LibRedis.Cluster
"""
alias LibRedis.{Typespecs, Error}
@type t :: struct()
@opaque command_t :: Typespecs.command_t()
@type resp_t :: {:ok, term()} | Error.t()
@callback new(keyword()) :: t()
@callback start_link(keyword()) :: Typespecs.on_start()
@callback command(t(), command_t(), keyword()) :: resp_t()
@callback pipeline(t(), [command_t()], keyword()) :: resp_t()
@spec command(t(), command_t(), keyword()) :: resp_t()
def command(client, command, opts), do: delegate(client, :command, [command, opts])
@spec pipeline(t(), [command_t()], keyword()) :: resp_t()
def pipeline(client, commands, opts), do: delegate(client, :pipeline, [commands, opts])
defp delegate(%module{} = client, func, args),
do: apply(module, func, [client | args])
end
defmodule LibRedis do
@external_resource "README.md"
@moduledoc "README.md"
|> File.read!()
|> String.split("<!-- MDOC !-->")
|> Enum.fetch!(1)
alias LibRedis.{Standalone, Cluster, Typespecs, Client}
@client_options_schema [
name: [
type: :atom,
default: :redis,
doc: "The name of the redis client"
],
mode: [
type: {:in, [:standalone, :cluster]},
default: :standalone,
doc: "The mode of the redis client, :standalone or :cluster"
],
url: [
type: :string,
required: true,
doc: "The url of the redis server, like redis://:123456@localhost:6379"
],
password: [
type: :string,
default: "",
doc: "The password of the redis server, this option is useful in cluster mode"
],
pool_size: [
type: :non_neg_integer,
default: 10,
doc: "The pool size of the redis client's pool"
]
]
@type options() :: [unquote(NimbleOptions.option_typespec(@client_options_schema))]
# types
@type t :: %__MODULE__{
name: Typespecs.name(),
mode: :cluster | :standalone,
url: Typespecs.url(),
password: Typespecs.password(),
pool_size: non_neg_integer(),
client: Client.t(),
client_store_module: module(),
slot_store_module: module()
}
@enforce_keys ~w(name mode url password pool_size client client_store_module slot_store_module)a
defstruct @enforce_keys
@doc """
create a new redis client
## Options
#{NimbleOptions.docs(@client_options_schema)}
## Examples
iex> standalone_options = [
name: :redis,
mode: :standalone,
url: "redis://:pwd@localhost:6379",
pool_size: 5
]
iex> LibRedis.new(standalone_options)
"""
@spec new(options()) :: t()
def new(opts \\ []) do
opts = opts |> NimbleOptions.validate!(@client_options_schema)
__MODULE__
|> struct(opts)
|> with_client()
end
defp with_client(%__MODULE__{mode: :standalone} = redis) do
client =
Standalone.new(
name: redis.name,
url: redis.url,
pool_size: redis.pool_size
)
%{redis | client: client}
end
defp with_client(%__MODULE__{mode: :cluster} = redis) do
client =
Cluster.new(
name: redis.name,
urls: redis.url |> url_to_cluster_urls(),
password: redis.password,
pool_size: redis.pool_size
)
%{redis | client: client}
end
@doc """
execute a redis command
## Examples
iex> LibRedis.command(redis, ["SET", "key", "value"])
{:ok, "OK"}
"""
@spec command(t(), Client.command_t(), keyword()) :: Client.resp_t()
def command(redis, command, opts \\ []) do
Client.command(redis.client, command, opts)
end
@doc """
execute a redis pipeline
## Examples
iex> LibRedis.pipeline(redis, [["SET", "key", "value"], ["GET", "key"]])
{:ok, ["OK", "value"]}
"""
@spec pipeline(t(), [Client.command_t()], keyword()) :: Client.resp_t()
def pipeline(redis, commands, opts \\ []) do
Client.pipeline(redis.client, commands, opts)
end
def child_spec(opts) do
redis = Keyword.fetch!(opts, :redis)
%{id: {__MODULE__, redis.name}, start: {__MODULE__, :start_link, [opts]}}
end
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) do
{redis, _opts} = Keyword.pop!(opts, :redis)
case redis.mode do
:standalone ->
LibRedis.Standalone.start_link(pool: redis.client)
:cluster ->
LibRedis.Cluster.start_link(cluster: redis.client)
end
end
defp url_to_cluster_urls(url) do
url
|> String.split(",")
end
end
defmodule LibRedis.Standalone do
@moduledoc """
Standalone redis client is just a delegate to LibRedis.Pool
"""
alias LibRedis.{Client, Pool}
@behaviour Client
@impl Client
defdelegate new(opts), to: Pool
@impl Client
defdelegate command(cli, command, opts), to: Pool
@impl Client
defdelegate pipeline(cli, commands, opts), to: Pool
@impl Client
defdelegate start_link(opts), to: Pool
end
defmodule LibRedis.Cluster do
@moduledoc """
cluster redis client
"""
alias LibRedis.{Client, Typespecs, Error}
require Logger
@behaviour Client
# types
@type t :: %__MODULE__{
name: Typespecs.name(),
urls: [Typespecs.url()],
password: Typespecs.password(),
pool_size: non_neg_integer(),
refresh_interval_ms: non_neg_integer(),
client_store: ClientStore.t(),
slot_store: SlotStore.t()
}
@type node_info :: %{ip: bitstring(), port: non_neg_integer()}
@type slot_info :: %{
start_slot: non_neg_integer(),
end_slot: non_neg_integer(),
master: node_info(),
replicas: [node_info()]
}
@opaque state :: %{cluster: t()}
@url_regex ~r/^redis:\/\/\w+:\d+$/
defstruct [
:name,
:urls,
:password,
:pool_size,
:refresh_interval_ms,
:client_store,
:slot_store
]
@impl Client
def new(opts \\ []) do
opts =
opts
|> Keyword.put_new(:name, :cluster)
|> Keyword.put_new(:password, "")
|> Keyword.put_new(:pool_size, 10)
|> Keyword.put_new(:refresh_interval_ms, 10_000)
|> Keyword.put_new(:client_store, LibRedis.ClientStore.new())
|> Keyword.put_new(:slot_store, LibRedis.SlotStore.new())
opts[:urls]
|> Enum.map(fn url ->
if not valid_redis_url?(url) do
raise ArgumentError, "invalid url: #{url}"
end
end)
struct(__MODULE__, opts)
end
defp error_log(cmd, error),
do: Logger.error(%{"mode" => "cluster", "error" => error, "cmd" => cmd})
defp valid_redis_url?(url), do: Regex.match?(@url_regex, url)
def child_spec(opts) do
cluster = Keyword.fetch!(opts, :cluster)
%{id: {__MODULE__, cluster.name}, start: {__MODULE__, :start_link, [opts]}}
end
@impl Client
@spec start_link(cluster: t()) :: Typespecs.on_start()
def start_link(cluster: cluster) do
GenServer.start(__MODULE__, cluster, name: cluster.name)
end
@impl Client
def command(client, command, opts \\ []) do
GenServer.call(client.name, {:command, command, opts})
end
@impl Client
def pipeline(client, commands, opts \\ []) do
GenServer.call(client.name, {:pipeline, commands, opts})
end
defp url_with_password(url, ""), do: url
defp url_with_password(url, password) do
prefix = String.slice(url, 0, 8)
prefix <> ":" <> password <> "@" <> String.slice(url, 8..-1)
end
@spec parse_url(Typespecs.url()) :: {bitstring(), non_neg_integer()}
defp parse_url(url) do
[ip, port] =
url
|> String.slice(8..-1)
|> String.split(":")
{ip, String.to_integer(port)}
end
def init(cluster) do
{:ok, _} = LibRedis.ClientStore.start_link(store: cluster.client_store)
{:ok, _} = LibRedis.SlotStore.start_link(store: cluster.slot_store)
cluster.urls
|> Enum.map(fn url ->
[
name: {:via, Registry, {cluster.client_store.name, parse_url(url)}},
pool_size: cluster.pool_size,
url: url_with_password(url, cluster.password)
]
end)
|> Enum.map(&LibRedis.Pool.new(&1))
|> Enum.map(&LibRedis.Pool.start_link(pool: &1))
send(self(), :refresh)
{:ok, %{cluster: cluster}}
end
# Response Format:
# - Start slot range
# - End slot range
# - Master for slot range represented as nested IP/Port array
# - First replica of master for slot range
# - Second replica
# ...continues until all replicas for this master are returned.
# Ref - https://redis.io/commands/cluster-slots#nested-result-array
@spec parse_slot_info(any) :: [slot_info()]
defp parse_slot_info(slot_info) do
Enum.map(slot_info, fn [start_slot, end_slot, master | replicas] ->
%{
start_slot: start_slot,
end_slot: end_slot,
master: parse_node_info(master),
replicas: Enum.map(replicas, &parse_node_info/1)
}
end)
end
defp parse_node_info([node_ip, node_port, _node_id | _] = _node) do
%{
ip: node_ip,
port: node_port
}
end
@spec refetch_slot_info(ClientStore.t()) :: [slot_info()]
defp refetch_slot_info(client_store) do
client_store
|> LibRedis.ClientStore.random()
|> LibRedis.Pool.command(["CLUSTER", "SLOTS"])
|> case do
{:ok, slot_info} -> slot_info
{:error, _} -> []
end
|> parse_slot_info()
end
def handle_info(:refresh, %{cluster: cluster} = state) do
cluster.client_store
|> refetch_slot_info()
|> (fn x -> LibRedis.SlotStore.put(cluster.slot_store, x) end).()
Process.send_after(self(), :refresh, cluster.refresh_interval_ms)
{:noreply, state}
end
def handle_call({:command, command, opts}, _from, state) do
{:reply, try_command(state, command, opts, 3), state}
end
def handle_call({:pipeline, commands, opts}, _from, state) do
res = try_pipeline(state, commands, opts, 3)
# resort by original order
commands
|> Enum.map(fn cmd ->
{_, ret} =
Enum.find(res, fn
{^cmd, _} -> true
_ -> false
end)
ret
end)
|> (fn x -> {:reply, {:ok, x}, state} end).()
end
@spec try_command(state(), Typespecs.command_t(), keyword(), non_neg_integer()) ::
{:ok, any()} | {:error, any()}
defp try_command(state, command, opts, retries_left) do
case do_command(state, command, opts) do
{:ok, result} ->
{:ok, result}
{:error, %Redix.Error{message: msg}} when retries_left > 0 ->
if String.starts_with?(msg, "MOVED") do
send(self(), :refresh)
try_command(state, command, opts, retries_left - 1)
else
{:error, Error.new(msg)}
end
{:error, reason} ->
error_log(command, reason)
{:error, Error.new(inspect(reason))}
end
end
@spec try_pipeline(state(), [Typespecs.command_t()], keyword(), non_neg_integer()) ::
[{Typespecs.command_t(), any()}] | {:error, any()}
defp try_pipeline(state, commands, opts, retries_left) do
group_commands(commands, state, %{})
|> Map.to_list()
|> Enum.reduce_while([], fn {client, cmds}, acc ->
do_pipeline(client, cmds, opts)
|> case do
{:ok, res} ->
{:cont, acc ++ res}
{:error, :moved} when retries_left > 0 ->
res = try_pipeline(state, cmds, opts, retries_left - 1)
{:cont, acc ++ res}
err ->
{:halt, err}
end
end)
|> Enum.reverse()
end
@spec do_command(state(), Typespecs.command_t(), Keyword.t()) :: {:ok, term()} | {:error, any}
defp do_command(
%{cluster: cluster},
[_, key | _] = command,
opts
) do
get_client_by_key(cluster, key)
|> case do
nil -> {:error, "slot not found"}
client -> LibRedis.Pool.command(client, command, opts)
end
end
@spec do_pipeline(
LibRedis.Pool.t() | pid(),
[Typespecs.command_t()],
Keyword.t()
) ::
{:ok, [{Typespecs.command_t(), term()}]} | {:error, any}
defp do_pipeline(client, commands, opts) do
case LibRedis.Pool.pipeline(client, commands, opts) do
{:ok, result} ->
{:ok, Enum.zip(commands, result)}
{:error, %Redix.Error{message: msg}} ->
if String.starts_with?(msg, "MOVED") do
send(self(), :refresh)
{:error, :moved}
else
{:error, Error.new(msg)}
end
other ->
other
end
end
# group_commands takes a list of commands and groups them by slot.
# returns a map of standalone-client -> command list
@spec group_commands([Typespecs.command_t()], state(), map()) ::
%{
(LibRedis.Pool.t() | pid()) => [Typespecs.command_t()]
}
| {:error, any()}
defp group_commands([], _, acc), do: acc
defp group_commands([command | t], %{cluster: cluster} = state, acc) do
[_, key | _] = command
get_client_by_key(cluster, key)
|> case do
nil ->
{:error, "slot not found"}
cli ->
group_commands(t, state, Map.update(acc, cli, [command], &(&1 ++ [command])))
end
end
@spec load_client(t(), Typespecs.slot()) :: LibRedis.Pool.t() | pid()
defp load_client(cluster, slot) do
cluster.client_store
|> LibRedis.ClientStore.get({slot.master.ip, slot.master.port})
|> case do
nil ->
[
name: {:via, Registry, {cluster.client_store.name, {slot.master.ip, slot.master.port}}},
pool_size: cluster.pool_size,
url:
url_with_password(
"redis://:#{cluster.password}@#{slot.master.ip}:#{slot.master.port}",
""
)
]
|> LibRedis.Pool.new()
|> tap(&LibRedis.Pool.start_link(pool: &1))
client ->
client
end
end
@spec get_client_by_key(t(), bitstring()) :: LibRedis.Pool.t() | pid() | nil
defp get_client_by_key(cluster, key) do
target = LibRedis.SlotFinder.hash_slot(key)
cluster.slot_store
|> LibRedis.SlotStore.get()
|> Enum.find(fn x -> x.start_slot <= target and x.end_slot >= target end)
|> case do
nil ->
nil
slot ->
load_client(cluster, slot)
end
end
end