lib/redix_clustered/conn.ex

defmodule RedixClustered.Conn do
  @moduledoc """
  Runs Redis commands and pipelines against a named cluster, following `MOVED`
  redirects.

  A connection is obtained from `RedixClustered.Registry`, either by looking
  up the key of the request (see `get_key/1` and `get_pipeline_key/1`) or by
  connecting to an explicitly given node. When Redis answers with a `MOVED`
  error, the slots are refreshed through `RedixClustered.Slots`, a new
  connection is requested from the registry using the text that follows
  `MOVED `, and the same request is sent again, until the redirect limit is
  exceeded.

  `ASK` redirects are not followed.
  """

  alias RedixClustered.Registry
  alias RedixClustered.Slots

  # Redirect limit used when the caller does not pass one.
  @max_redirects 5

  @doc """
  Delegates to `RedixClustered.Registry.nodes/1` for the cluster `name`.
  """
  def nodes(name), do: Registry.nodes(name)

  @doc """
  Runs a single command (e.g. `["GET", "foo"]`) with the default limit of
  #{@max_redirects} redirects.
  """
  def command(name, cmd), do: command(name, cmd, @max_redirects)

  @doc """
  Runs a single command with either an explicit node or a redirect limit.

  When the third argument is a binary it is treated as the node to connect to
  and the default redirect limit applies. Any other value is treated as the
  maximum number of redirects, and the connection is looked up from the
  command's key (see `get_key/1`).
  """
  def command(name, cmd, node) when is_binary(node) do
    command(name, cmd, node, @max_redirects)
  end

  def command(name, cmd, max_redirects) do
    cmd
    |> get_key()
    |> registry_lookup(name)
    |> follow_redirects(name, &Redix.command/2, cmd, max_redirects)
  end

  @doc """
  Runs a single command on the given `node`, following at most `max_redirects`
  redirects.

  Returns whatever `Redix.command/2` returns, or
  `{:error, message}` with a string message once the limit is exceeded.
  """
  def command(name, cmd, node, max_redirects) do
    node
    |> registry_connect(name)
    |> follow_redirects(name, &Redix.command/2, cmd, max_redirects)
  end

  @doc """
  Runs a pipeline (a list of commands) with the default limit of
  #{@max_redirects} redirects.
  """
  def pipeline(name, cmds), do: pipeline(name, cmds, @max_redirects)

  @doc """
  Runs a pipeline with either an explicit node or a redirect limit.

  When the third argument is a binary it is treated as the node to connect to
  and the default redirect limit applies. Any other value is treated as the
  maximum number of redirects, and the connection is looked up from the
  pipeline's key (see `get_pipeline_key/1`).
  """
  def pipeline(name, cmds, node) when is_binary(node) do
    pipeline(name, cmds, node, @max_redirects)
  end

  def pipeline(name, cmds, max_redirects) do
    cmds
    |> get_pipeline_key()
    |> registry_lookup(name)
    |> follow_redirects(name, &Redix.pipeline/2, cmds, max_redirects)
  end

  @doc """
  Runs a pipeline on the given `node`, following at most `max_redirects`
  redirects.

  Returns whatever `Redix.pipeline/2` returns, or
  `{:error, message}` with a string message once the limit is exceeded.
  """
  def pipeline(name, cmds, node, max_redirects) do
    node
    |> registry_connect(name)
    |> follow_redirects(name, &Redix.pipeline/2, cmds, max_redirects)
  end

  @doc """
  Extracts the key used to pick a connection for a command.

  The second element of the command list is taken to be the key. `CLUSTER`
  commands, and anything that is not a list of at least two elements, yield
  `nil`.

  ## Examples

      iex> RedixClustered.Conn.get_key(["GET", "foo"])
      "foo"

      iex> RedixClustered.Conn.get_key(["CLUSTER", "SLOTS"])
      nil

      iex> RedixClustered.Conn.get_key(["PING"])
      nil
  """
  def get_key(["CLUSTER" | _]), do: nil
  def get_key([_, key | _]), do: key
  def get_key(_), do: nil

  @doc """
  Extracts the key used to pick a connection for a pipeline.

  Leading `["MULTI"]` commands are skipped; the key of the first remaining
  command (as given by `get_key/1`) is returned. Only that one command is
  inspected, so the pipeline is routed as if every command used the same key.

  ## Examples

      iex> RedixClustered.Conn.get_pipeline_key([["MULTI"], ["SET", "a", "1"], ["EXEC"]])
      "a"

      iex> RedixClustered.Conn.get_pipeline_key([])
      nil
  """
  def get_pipeline_key([["MULTI"] | rest_cmds]), do: get_pipeline_key(rest_cmds)
  def get_pipeline_key([first_cmd | _rest_cmds]), do: get_key(first_cmd)
  def get_pipeline_key(_), do: nil

  # Argument-swapped wrappers so the registry calls can sit in a pipeline that
  # starts from the key / node.
  defp registry_lookup(key, cluster_name), do: Registry.lookup(cluster_name, key)
  defp registry_connect(node, cluster_name), do: Registry.connect(cluster_name, node)

  # Entry point: starts counting redirects at zero.
  defp follow_redirects(pid, name, redix_fn, args, max) do
    follow_redirects(pid, name, redix_fn, args, max, 0)
  end

  # Gives up once more than `max` redirects would be needed. Nothing is sent
  # to Redis in this clause.
  defp follow_redirects(_pid, _name, _redix_fn, _args, max, attempt) when attempt > max do
    {:error, "Max redis redirects reached after #{attempt}"}
  end

  # Sends `args` through `redix_fn` on `pid` and retries elsewhere when the
  # reply is a MOVED redirect. Every other result is returned untouched.
  defp follow_redirects(pid, name, redix_fn, args, max, attempt) do
    # TODO: ASK support
    case redix_fn.(pid, args) do
      {:error, %Redix.Error{message: "MOVED " <> moved}} ->
        refresh_and_follow(pid, name, redix_fn, args, max, attempt, moved)

      # A list of replies may carry the redirect as an element rather than as
      # the overall result. The whole request is resent on a redirect, which
      # ASSUMES every command in a pipeline targets the same key.
      {:ok, replies} = result when is_list(replies) ->
        case moved_reply(args, replies) do
          nil -> result
          moved -> refresh_and_follow(pid, name, redix_fn, args, max, attempt, moved)
        end

      result ->
        result
    end
  end

  # Returns the text after "MOVED " when the reply to the first routed command
  # is a MOVED error, otherwise nil.
  #
  # Leading ["MULTI"] commands are dropped together with their replies, which
  # mirrors get_pipeline_key/1: Redis acknowledges MULTI before it rejects the
  # first keyed command, so for a transaction pipeline the redirect is never
  # the first reply. For a single command (a flat list) the first clause
  # cannot match and only the head of the reply list is inspected.
  defp moved_reply([["MULTI"] | cmds], [_multi_reply | replies]), do: moved_reply(cmds, replies)
  defp moved_reply(_cmds, [%Redix.Error{message: "MOVED " <> moved} | _rest]), do: moved
  defp moved_reply(_cmds, _replies), do: nil

  # Refreshes the slots via the connection that produced the redirect, then
  # retries the request on a connection obtained for `moved`.
  #
  # NOTE(review): Redis formats the redirect as "MOVED <slot> <host>:<port>",
  # so `moved` is presumably "<slot> <host>:<port>" — confirm that
  # Registry.connect/2 accepts that form.
  defp refresh_and_follow(pid, name, redix_fn, args, max, attempt, moved) do
    Slots.refresh(name, pid)

    moved
    |> registry_connect(name)
    |> follow_redirects(name, redix_fn, args, max, attempt + 1)
  end
end