lib/lib_redis.ex

defmodule LibRedis do
  @external_resource "README.md"
  @moduledoc "README.md"
             |> File.read!()
             |> String.split("<!-- MDOC !-->")
             |> Enum.fetch!(1)

  alias LibRedis.{Standalone, Cluster, Typespecs}

  # NimbleOptions schema for the keyword list accepted by `new/1`.
  #
  # It is used twice: `new/1` validates caller options against it with
  # `NimbleOptions.validate!/2`, and the `@doc` of `new/1` renders it via
  # `NimbleOptions.docs/1`, so every `doc:` string below ends up in the
  # published documentation.
  #
  # Note that `:password`, `:client_store_module` and `:slot_store_module` are
  # only forwarded to the client in `:cluster` mode; the `:standalone` client is
  # built from `:name`, `:url` and `:pool_size` alone.
  @client_options_schema [
    name: [
      type: :atom,
      default: :redis,
      doc: "The name of the redis client"
    ],
    mode: [
      type: {:in, [:standalone, :cluster]},
      default: :standalone,
      doc: "The mode of the redis client, :standalone or :cluster"
    ],
    # In cluster mode this string is split on "," into one url per node.
    url: [
      type: :string,
      required: true,
      doc: "The url of the redis server, like redis://:123456@localhost:6379"
    ],
    password: [
      type: :string,
      default: "",
      doc: "The password of the redis server, this option is useful in cluster mode"
    ],
    pool_size: [
      type: :non_neg_integer,
      default: 10,
      doc: "The pool size of the redis client's pool"
    ],
    client_store_module: [
      type: :any,
      default: LibRedis.ClientStore.Default,
      doc: "The implementation of the client store, see LibRedis.ClientStore for more details"
    ],
    slot_store_module: [
      type: :any,
      default: LibRedis.SlotStore.Default,
      doc: "The implementation of the slot store, see LibRedis.SlotStore for more details"
    ]
  ]
  # Keyword-list type generated from the schema so the spec of `new/1` stays in
  # sync with the accepted options.
  @type options() :: [unquote(NimbleOptions.option_typespec(@client_options_schema))]
  # types
  # The client type is written fully qualified on purpose: the nested `Client`
  # module is defined further down in this module, so at this point the bare
  # name `Client` is not yet aliased to `LibRedis.Client` and would point at a
  # non-existent top-level `Client` module.
  @type t :: %__MODULE__{
          name: Typespecs.name(),
          mode: :cluster | :standalone,
          url: Typespecs.url(),
          password: Typespecs.password(),
          pool_size: non_neg_integer(),
          client: LibRedis.Client.client(),
          client_store_module: module(),
          slot_store_module: module()
        }

  @enforce_keys ~w(name mode url password pool_size client client_store_module slot_store_module)a

  defstruct @enforce_keys

  defmodule Client do
    @moduledoc """
    Behaviour implemented by the concrete redis clients, plus struct-based
    dispatch helpers.

    `command/3` and `pipeline/3` look at the struct module of the given client
    and call the function of the same name on that module.
    """

    alias LibRedis.Typespecs

    @type client :: struct()
    @type command_t :: [binary() | bitstring()]
    @type resp_t :: {:ok, term()} | {:error, term()}

    @callback new(keyword()) :: client()
    @callback start_link(keyword()) :: Typespecs.on_start()
    @callback command(client(), command_t(), keyword()) :: resp_t()
    @callback pipeline(client(), [command_t()], keyword()) :: resp_t()

    # Runs a single command on whichever implementation owns `client`.
    def command(client, command, opts) do
      delegate(client, :command, [command, opts])
    end

    # Runs a list of commands on whichever implementation owns `client`.
    def pipeline(client, commands, opts) do
      delegate(client, :pipeline, [commands, opts])
    end

    # The struct name of the client doubles as the implementing module; the
    # client itself is always passed as the first argument.
    defp delegate(%impl{} = client, fun_name, rest) do
      apply(impl, fun_name, [client | rest])
    end
  end

  @doc """
  Builds a redis client struct from the given options.

  The options are validated with `NimbleOptions.validate!/2` before the struct
  is built, and the mode-specific client (standalone or cluster) is attached
  to the `:client` field of the result.

  ## Options
  #{NimbleOptions.docs(@client_options_schema)}

  ## Examples

      iex> standalone_options = [
      ...>   name: :redis,
      ...>   mode: :standalone,
      ...>   url: "redis://:pwd@localhost:6379",
      ...>   pool_size: 5
      ...> ]
      iex> LibRedis.new(standalone_options)
  """
  @spec new(options()) :: t()
  def new(opts \\ []) do
    validated = NimbleOptions.validate!(opts, @client_options_schema)

    with_client(struct(__MODULE__, validated))
  end

  # Fills the `:client` field with the client matching the configured mode.
  #
  # Standalone mode only needs the name, url and pool size.
  defp with_client(%__MODULE__{mode: :standalone} = redis) do
    standalone_opts = [
      name: redis.name,
      url: redis.url,
      pool_size: redis.pool_size
    ]

    %{redis | client: Standalone.new(standalone_opts)}
  end

  # Cluster mode turns the comma-separated url into a list of node urls and
  # also forwards the password and the two store modules.
  defp with_client(%__MODULE__{mode: :cluster} = redis) do
    cluster_opts = [
      name: redis.name,
      urls: url_to_cluster_urls(redis.url),
      password: redis.password,
      pool_size: redis.pool_size,
      client_store_module: redis.client_store_module,
      slot_store_module: redis.slot_store_module
    ]

    %{redis | client: Cluster.new(cluster_opts)}
  end

  @doc """
  Runs a single redis command through the client stored in `redis`.

  `opts` is handed unchanged to the underlying client implementation.

  ## Examples

      iex> LibRedis.command(redis, ["SET", "key", "value"])
      {:ok, "OK"}
  """
  @spec command(t(), Client.command_t(), keyword()) :: Client.resp_t()
  def command(redis, command, opts \\ []),
    do: Client.command(redis.client, command, opts)


  @doc """
  Runs a list of redis commands as a pipeline through the client stored in
  `redis`.

  `opts` is handed unchanged to the underlying client implementation.

  ## Examples

      iex> LibRedis.pipeline(redis, [["SET", "key", "value"], ["GET", "key"]])
      {:ok, ["OK", "value"]}
  """
  @spec pipeline(t(), [Client.command_t()], keyword()) :: Client.resp_t()
  def pipeline(redis, commands, opts \\ []),
    do: Client.pipeline(redis.client, commands, opts)

  @doc """
  Returns a supervisor child specification for the client given under the
  required `:redis` option.

  The child id includes the client name, so several clients with different
  names can run under the same supervisor.
  """
  def child_spec(opts) do
    client_name = Keyword.fetch!(opts, :redis).name

    %{
      id: {__MODULE__, client_name},
      start: {__MODULE__, :start_link, [opts]}
    }
  end

  # Starts the process tree behind the client passed as `:redis` (required),
  # choosing the implementation from the client's mode.
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    redis = Keyword.fetch!(opts, :redis)

    case redis.mode do
      :cluster ->
        Cluster.start_link(cluster: redis.client)

      :standalone ->
        Standalone.start_link(pool: redis.client)
    end
  end

  # Splits the comma-separated `:url` option into one url per cluster node.
  #
  # Surrounding whitespace is stripped from every entry so that a value such as
  # "redis://a:6379, redis://b:6379" is accepted; without the trim the second
  # entry would start with a space and be rejected by the cluster url check.
  defp url_to_cluster_urls(url) do
    url
    |> String.split(",")
    |> Enum.map(&String.trim/1)
  end
end

defmodule LibRedis.Standalone do
  @moduledoc false

  # Standalone client: a thin `LibRedis.Client` implementation in which every
  # callback forwards, with unchanged arguments, to the same-named function of
  # `LibRedis.Pool`.

  alias LibRedis.{Client, Pool}

  @behaviour Client

  @impl Client
  def new(opts), do: Pool.new(opts)

  @impl Client
  def command(cli, command, opts), do: Pool.command(cli, command, opts)

  @impl Client
  def pipeline(cli, commands, opts), do: Pool.pipeline(cli, commands, opts)

  @impl Client
  def start_link(opts), do: Pool.start_link(opts)
end

defmodule LibRedis.Cluster do
  @moduledoc """
  cluster redis client
  """

  alias LibRedis.{Client, Typespecs}

  @behaviour Client

  # types
  @type t :: %__MODULE__{
          name: Typespecs.name(),
          urls: [Typespecs.url()],
          password: Typespecs.password(),
          pool_size: non_neg_integer(),
          refresh_interval_ms: non_neg_integer(),
          client_store_module: module(),
          slot_store_module: module()
        }

  @type node_info :: %{ip: bitstring(), port: non_neg_integer()}

  @type slot_info :: %{
          start_slot: non_neg_integer(),
          end_slot: non_neg_integer(),
          master: node_info(),
          replicas: [node_info()]
        }

  # Shape of an accepted cluster node url: `redis://<host>:<port>`, without
  # credentials (the password is supplied separately). The host part allows
  # dots and dashes so dotted hostnames and IPv4 addresses such as
  # `redis://127.0.0.1:6379` or `redis://redis-0.example.com:6379` are valid;
  # `\w+` alone would only accept single-label names like `localhost`.
  @url_regex ~r/^redis:\/\/[\w.-]+:\d+$/
  @enforce_keys ~w(name urls password pool_size refresh_interval_ms client_store_module slot_store_module)a

  defstruct @enforce_keys

  @doc """
  Builds the cluster client struct.

  Missing options fall back to these defaults: `name: :cluster`,
  `urls: ["redis://localhost:6379"]`, `password: ""`, `pool_size: 10`,
  `refresh_interval_ms: 10_000` and the default client/slot store modules.

  Raises `ArgumentError` when any entry of `:urls` does not have the expected
  node url shape.
  """
  @impl Client
  def new(opts \\ []) do
    opts =
      opts
      |> Keyword.put_new(:name, :cluster)
      |> Keyword.put_new(:urls, ["redis://localhost:6379"])
      |> Keyword.put_new(:password, "")
      |> Keyword.put_new(:pool_size, 10)
      |> Keyword.put_new(:refresh_interval_ms, 10_000)
      |> Keyword.put_new(:client_store_module, LibRedis.ClientStore.Default)
      |> Keyword.put_new(:slot_store_module, LibRedis.SlotStore.Default)

    # Validation is a pure side effect (raise or nothing), so iterate with
    # `Enum.each/2` instead of building a throw-away list with `Enum.map/2`.
    Enum.each(opts[:urls], fn url ->
      if not valid_redis_url?(url) do
        raise ArgumentError, "invalid url: #{url}"
      end
    end)

    struct(__MODULE__, opts)
  end

  # True when `url` matches the node url pattern held in `@url_regex`.
  defp valid_redis_url?(url) do
    url =~ @url_regex
  end

  @doc """
  Returns a supervisor child specification for the cluster given under the
  required `:cluster` option. The child id includes the cluster name.
  """
  def child_spec(opts) do
    cluster_name = Keyword.fetch!(opts, :cluster).name

    %{
      id: {__MODULE__, cluster_name},
      start: {__MODULE__, :start_link, [opts]}
    }
  end

  @doc """
  Starts the cluster process and links it to the caller.

  Expects the cluster struct under the required `:cluster` option; the process
  is registered under the cluster's name.
  """
  @impl Client
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts) do
    cluster = Keyword.fetch!(opts, :cluster)

    # Must be `start_link/3`, not `start/3`: this function is the start
    # callback of `child_spec/1`, and a supervisor can only observe and restart
    # children that are linked to it.
    GenServer.start_link(__MODULE__, cluster, name: cluster.name)
  end

  # Sends a single command to the cluster process registered under the
  # client's name and waits for its reply.
  @impl Client
  def command(client, command, opts \\ []) do
    request = {:command, command, opts}
    GenServer.call(client.name, request)
  end

  # Sends a list of commands to the cluster process registered under the
  # client's name and waits for its reply.
  @impl Client
  def pipeline(client, commands, opts \\ []) do
    request = {:pipeline, commands, opts}
    GenServer.call(client.name, request)
  end

  # Inserts `:<password>@` right after the 8-character `redis://` prefix of
  # `url`. An empty password leaves the url untouched.
  #
  # `String.split_at/2` replaces the former `String.slice(url, 8..-1)`, whose
  # `8..-1` range has a negative step that `String.slice/2` deprecates.
  defp url_with_password(url, ""), do: url

  defp url_with_password(url, password) do
    {scheme, host_and_port} = String.split_at(url, 8)
    scheme <> ":" <> password <> "@" <> host_and_port
  end

  # Produces a fresh atom from 16 random bytes (Base64-encoded).
  #
  # NOTE(review): every call creates a brand-new atom and atoms are never
  # garbage collected, so callers on a hot path grow the atom table without
  # bound - confirm how the client store uses this name before relying on it.
  defp random_name() do
    random_bytes = :crypto.strong_rand_bytes(16)
    String.to_atom(Base.encode64(random_bytes))
  end

  # Splits a `redis://<host>:<port>` url into `{host, port}` with the port as
  # an integer. The first 8 characters (the `redis://` prefix) are dropped.
  #
  # Raises `MatchError` when the remainder is not exactly `host:port`, and
  # `ArgumentError` when the port is not an integer.
  #
  # `String.split_at/2` replaces the former `String.slice(url, 8..-1)`, whose
  # `8..-1` range has a negative step that `String.slice/2` deprecates.
  defp parse_url(url) do
    {_scheme, host_and_port} = String.split_at(url, 8)
    [ip, port] = String.split(host_and_port, ":")

    {ip, String.to_integer(port)}
  end

  # GenServer `init/1` callback for the cluster process.
  #
  # `cluster` is the struct passed to `GenServer.start` by `start_link/1`.
  # Returns `{:ok, state}` where `state` keeps the cluster struct together with
  # the client store and slot store handles created here.
  def init(cluster) do
    # Build the store handles first, then start the stores with those handles.
    # Anything other than `{:ok, _}` from a store's `start_link` fails the
    # match and aborts the init.
    slot_store = cluster.slot_store_module.new()
    client_store = cluster.client_store_module.new()
    {:ok, _} = cluster.slot_store_module.start_link(store: slot_store)
    {:ok, _} = cluster.client_store_module.start_link(store: client_store)

    # One client-store entry per configured seed url, each with a random pool
    # name and the cluster password merged into its url.
    # NOTE(review): presumably `LibRedis.ClientStore.get/1` creates or looks up
    # the pooled connection for the entry - confirm against LibRedis.ClientStore.
    cluster.urls
    |> Enum.map(fn url ->
      {host, port} = parse_url(url)

      cluster.client_store_module.new(
        host: host,
        port: port,
        opts: [
          name: random_name(),
          pool_size: cluster.pool_size,
          url: url_with_password(url, cluster.password)
        ]
      )
    end)
    |> Enum.each(&LibRedis.ClientStore.get(&1))

    # Queue the first slot refresh; it is handled by `handle_info(:refresh, _)`
    # once init has returned, which then keeps rescheduling itself.
    send(self(), :refresh)

    {:ok, %{cluster: cluster, client_store: client_store, slot_store: slot_store}}
  end

  # Converts a raw `CLUSTER SLOTS` reply into a list of slot-range maps.
  #
  # Each reply entry is a list laid out as:
  # - first slot of the range
  # - last slot of the range
  # - the master serving the range, as a nested IP/port array
  # - zero or more replicas of that master, in the same nested form
  # Ref - https://redis.io/commands/cluster-slots#nested-result-array
  @spec parse_slot_info(any) :: [slot_info()]
  defp parse_slot_info(slot_info) do
    Enum.map(slot_info, fn [first_slot, last_slot, primary | followers] ->
      primary_node = parse_node_info(primary)
      follower_nodes = for follower <- followers, do: parse_node_info(follower)

      %{
        start_slot: first_slot,
        end_slot: last_slot,
        master: primary_node,
        replicas: follower_nodes
      }
    end)
  end

  # Keeps only the IP and port of a node entry. The entry must have at least
  # three elements (IP, port, node id); anything after the port is ignored.
  defp parse_node_info([ip, port, _id | _extra]), do: %{ip: ip, port: port}

  # Asks a randomly chosen client from the store for `CLUSTER SLOTS` and
  # returns the parsed slot ranges. A failed command yields an empty list.
  defp refetch_slot_info(client_store) do
    node_client = LibRedis.ClientStore.random(client_store)

    raw_slots =
      case LibRedis.Standalone.command(node_client, ["CLUSTER", "SLOTS"], []) do
        {:ok, slot_info} -> slot_info
        {:error, _} -> []
      end

    parse_slot_info(raw_slots)
  end

  # Handles `:refresh`: reloads the slot layout and schedules the next refresh
  # after `cluster.refresh_interval_ms`.
  #
  # Two safeguards:
  # - An empty result is what `refetch_slot_info/1` returns when the fetch
  #   failed, so it is not stored; the last known layout stays in place instead
  #   of every key becoming unroutable until the next successful refresh.
  # - `:refresh` can also arrive outside the periodic schedule (it is sent on
  #   MOVED errors). The pending timer is cancelled before a new one is armed,
  #   otherwise each extra message would start another never-ending timer chain.
  def handle_info(:refresh, %{client_store: cs, slot_store: ss, cluster: cluster} = state) do
    case refetch_slot_info(cs) do
      [] -> :ok
      slots -> LibRedis.SlotStore.put(ss, slots)
    end

    case Map.get(state, :refresh_timer) do
      nil -> :ok
      pending -> Process.cancel_timer(pending)
    end

    timer = Process.send_after(self(), :refresh, cluster.refresh_interval_ms)

    {:noreply, Map.put(state, :refresh_timer, timer)}
  end

  # Single command: executed with up to 3 retries.
  def handle_call({:command, command, opts}, _from, state),
    do: try_command(state, command, opts, 3)

  # Pipeline: commands are grouped per node and executed group by group.
  def handle_call({:pipeline, commands, opts}, _from, state),
    do: try_pipeline(state, commands, opts)

  # Runs `command` via `do_command/3` and wraps the outcome as a GenServer
  # reply tuple, retrying failed attempts while `retries_left` is positive.
  #
  # On a `MOVED` redirection the slot table is reloaded synchronously before
  # the retry. Sending `:refresh` to ourselves is not enough here: this runs
  # inside `handle_call/3`, so that message could only be handled after the
  # reply, and every retry would be routed with the same stale slot table.
  defp try_command(state, command, opts, retries_left) do
    case do_command(state, command, opts) do
      {:ok, result} ->
        {:reply, {:ok, result}, state}

      {:error, %Redix.Error{message: "MOVED" <> _}} when retries_left > 0 ->
        reload_slots_after_moved(state)
        try_command(state, command, opts, retries_left - 1)

      {:error, _reason} when retries_left > 0 ->
        try_command(state, command, opts, retries_left - 1)

      {:error, reason} ->
        {:reply, {:error, reason}, state}
    end
  end

  # Fetches the current slot layout and stores it. An empty result (what
  # `refetch_slot_info/1` returns on a failed fetch) leaves the table untouched.
  defp reload_slots_after_moved(%{client_store: cs, slot_store: ss}) do
    case refetch_slot_info(cs) do
      [] -> :ok
      slots -> LibRedis.SlotStore.put(ss, slots)
    end
  end

  # Executes `commands` as per-node pipelines and replies with the results in
  # the order the commands were given.
  #
  # Replies `{:ok, results}` on success and `{:error, reason}` otherwise. Errors
  # are returned to the caller rather than raised, because raising inside
  # `handle_call/3` would take the whole cluster process down (and `raise` is
  # not even valid for non-exception reasons).
  defp try_pipeline(state, commands, opts) do
    reply =
      case run_pipeline(state, commands, opts, 3) do
        {:ok, pairs} -> {:ok, results_in_order(commands, pairs)}
        {:error, reason} -> {:error, reason}
      end

    {:reply, reply, state}
  end

  # Groups the commands per node and runs each group, stopping at the first
  # error. Returns `{:ok, [{command, result}]}` or `{:error, reason}`.
  defp run_pipeline(state, commands, opts, moved_retries) do
    case group_commands(commands, state, %{}) do
      {:error, reason} ->
        {:error, reason}

      groups ->
        groups
        |> Enum.reduce_while({:ok, []}, fn {client, cmds}, {:ok, acc} ->
          case run_group(state, client, cmds, opts, moved_retries) do
            {:ok, pairs} -> {:cont, {:ok, [pairs | acc]}}
            {:error, reason} -> {:halt, {:error, reason}}
          end
        end)
        |> case do
          {:ok, chunks} -> {:ok, Enum.concat(chunks)}
          error -> error
        end
    end
  end

  # Runs one node's share of the pipeline. After a MOVED redirection the slot
  # table is reloaded right away (a `:refresh` message would only be handled
  # after the current call) and the affected commands are regrouped and
  # retried, at most `moved_retries` times.
  defp run_group(state, client, cmds, opts, moved_retries) do
    case do_pipeline(client, cmds, opts, 3) do
      {:error, :moved} when moved_retries > 0 ->
        reload_slots_for_pipeline(state)
        run_pipeline(state, cmds, opts, moved_retries - 1)

      other ->
        other
    end
  end

  # Stores a freshly fetched slot layout; an empty result (failed fetch) keeps
  # the existing table.
  defp reload_slots_for_pipeline(%{client_store: cs, slot_store: ss}) do
    case refetch_slot_info(cs) do
      [] -> :ok
      slots -> LibRedis.SlotStore.put(ss, slots)
    end
  end

  # Maps every command back to its result. Results are queued per command and
  # consumed one at a time, so a command that appears several times in the
  # pipeline gets its own result for each occurrence instead of the first one.
  defp results_in_order(commands, pairs) do
    queues = Enum.group_by(pairs, &elem(&1, 0), &elem(&1, 1))

    {results, _remaining} =
      Enum.map_reduce(commands, queues, fn cmd, pending ->
        [result | rest] = Map.fetch!(pending, cmd)
        {result, Map.put(pending, cmd, rest)}
      end)

    results
  end

  # Routes one command to the master that owns the hash slot of its key and
  # executes it there.
  #
  # The second element of the command is taken as the key. Returns the result
  # of the standalone client, or `{:error, "slot not found"}` when no known
  # slot range covers the key's slot.
  defp do_command(
         %{cluster: cluster, slot_store: ss},
         [_, key | _] = command,
         opts
       ) do
    target = LibRedis.SlotFinder.hash_slot(key)

    ss
    |> LibRedis.SlotStore.get()
    |> Enum.find(fn x -> x.start_slot <= target and x.end_slot >= target end)
    |> case do
      nil ->
        {:error, "slot not found"}

      slot ->
        master = slot.master

        cluster.client_store_module.new(
          host: master.ip,
          port: master.port,
          opts: [
            name: random_name(),
            pool_size: cluster.pool_size,
            # Same url construction as for the seed nodes in `init/1`: the
            # credentials part is only added when a password is configured,
            # instead of always producing `redis://:@host:port`.
            url: url_with_password("redis://#{master.ip}:#{master.port}", cluster.password)
          ]
        )
        |> LibRedis.ClientStore.get()
        |> LibRedis.Standalone.command(command, opts)
    end
  end

  # A command without a key (fewer than two elements) cannot be routed by slot.
  # Answer with an error tuple instead of crashing the cluster process with a
  # FunctionClauseError.
  defp do_command(_state, _command, _opts), do: {:error, "command has no key"}

  # Runs `commands` as one pipeline on `client`.
  #
  # Returns `{:ok, [{command, result}]}` on success. A `Redix.Error` whose
  # message starts with "MOVED" queues a `:refresh` for this process and
  # returns `{:error, :moved}` without retrying; any other failure is retried
  # while `retries_left` is positive and finally returned as `{:error, reason}`.
  defp do_pipeline(client, commands, opts, retries_left) do
    outcome = LibRedis.Standalone.pipeline(client, commands, opts)

    case outcome do
      {:ok, result} ->
        {:ok, Enum.zip(commands, result)}

      {:error, %Redix.Error{message: msg}} when retries_left > 0 ->
        case String.starts_with?(msg, "MOVED") do
          true ->
            send(self(), :refresh)
            {:error, :moved}

          false ->
            do_pipeline(client, commands, opts, retries_left - 1)
        end

      {:error, reason} ->
        if retries_left > 0 do
          do_pipeline(client, commands, opts, retries_left - 1)
        else
          {:error, reason}
        end
    end
  end

  # Groups a list of commands by the standalone client that serves the hash
  # slot of each command's key (the second element of the command).
  #
  # Returns a map of standalone-client -> command list, with the commands of
  # each client kept in their original relative order. As soon as a command's
  # slot is not covered by any known slot range, `{:error, "slot not found"}`
  # is returned instead of the map.
  defp group_commands([], _, acc), do: acc

  defp group_commands([command | t], %{slot_store: ss, cluster: cluster} = state, acc) do
    [_, key | _] = command
    target = LibRedis.SlotFinder.hash_slot(key)

    ss
    |> LibRedis.SlotStore.get()
    |> Enum.find(fn x -> x.start_slot <= target and x.end_slot >= target end)
    |> case do
      nil ->
        {:error, "slot not found"}

      slot ->
        master = slot.master

        cli =
          cluster.client_store_module.new(
            host: master.ip,
            port: master.port,
            opts: [
              name: random_name(),
              pool_size: cluster.pool_size,
              # Same url construction as for the seed nodes in `init/1`: the
              # credentials part is only added when a password is configured,
              # instead of always producing `redis://:@host:port`.
              url: url_with_password("redis://#{master.ip}:#{master.port}", cluster.password)
            ]
          )
          |> LibRedis.ClientStore.get()

        group_commands(t, state, Map.update(acc, cli, [command], &(&1 ++ [command])))
    end
  end
end