lib/redix_clustered/scanner.ex

defmodule RedixClustered.Scanner do
  alias RedixClustered.Conn
  alias RedixClustered.Namespace

  @max_scan_limit 1000

  # handle scanning a redis cluster, including prefixes and multiple-nodes
  def scan(pattern), do: scan(nil, pattern, [])
  def scan(pattern, opts) when is_list(opts), do: scan(nil, pattern, opts)
  def scan(cluster_name, pattern), do: scan(cluster_name, pattern, [])

  def scan(cluster_name, pattern, opts) do
    scan_nodes(cluster_name, pattern, opts, nodes(cluster_name), 0, [])
  end

  def nuke(pattern), do: nuke(nil, pattern, [])
  def nuke(pattern, opts) when is_list(opts), do: nuke(nil, pattern, opts)
  def nuke(cluster_name, pattern), do: nuke(cluster_name, pattern, [])

  def nuke(cluster_name, pattern, opts) do
    case Mix.env() do
      :test ->
        {:ok, keys} = scan(cluster_name, pattern, opts)
        Enum.each(keys, &RedixClustered.command(cluster_name, ["DEL", &1]))
        length(keys)
    end
  end

  defp scan_nodes(_name, _pattern, _opts, [], _cursor, acc), do: {:ok, acc}

  # COMPLICATED: have to iterate over ALL nodes in cluster-mode
  defp scan_nodes(name, pattern, opts, [node | rest], cursor, acc) do
    if length(acc) >= Keyword.get(opts, :limit, Infinity) do
      {:ok, Enum.take(acc, Keyword.get(opts, :limit))}
    else
      key = prefix(name, pattern, opts)
      scan_limit = Enum.min([Keyword.get(opts, :limit, @max_scan_limit), @max_scan_limit])

      case Conn.command(name, ["SCAN", cursor, "MATCH", key, "COUNT", scan_limit], node) do
        {:ok, ["0", keys]} ->
          scan_nodes(name, pattern, opts, rest, 0, acc ++ unprefix(name, keys, opts))

        {:ok, [next, keys]} ->
          scan_nodes(name, pattern, opts, [node] ++ rest, next, acc ++ unprefix(name, keys, opts))

        err ->
          err
      end
    end
  end

  defp prefix(cluster_name, key, opts) do
    if Keyword.get(opts, :namespace, true) do
      Namespace.prefix(cluster_name, key)
    else
      key
    end
  end

  defp unprefix(cluster_name, key, opts) do
    if Keyword.get(opts, :namespace, true) do
      Namespace.unprefix(cluster_name, key)
    else
      key
    end
  end

  defp nodes(cluster_name) do
    case Conn.command(cluster_name, ["CLUSTER", "SLOTS"]) do
      {:ok, slots} -> Enum.map(slots, &slot_to_node/1)
      _err -> [nil]
    end
  end

  defp slot_to_node([_from, _to, [ip, port, _id] | _rest]), do: "#{ip}:#{port}"
end