# lib/redix_clustered/slots.ex

defmodule RedixClustered.Slots do
  use GenServer

  import RedixClustered.Options, only: [slots_name: 1]

  @num_slots 16384
  @slots_key "cluster_slots"
  @refresh_debounce :timer.seconds(10)

  # Starts the slots server. `opts` is handed to GenServer twice: once as the
  # argument passed to `init/1` and once as the GenServer start options.
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, opts)

  # Creates the named, protected ETS table used to cache the slot map. The
  # table is named after the `:name` option; the match below fails when that
  # option is absent. The server state is the last-refresh timestamp and
  # starts at 0.
  @impl true
  def init(opts) do
    {:ok, table} = Keyword.fetch(opts, :name)
    :ets.new(table, [:named_table, :protected, :set])
    {:ok, 0}
  end

  # Synchronous requests. The server state is the last-refresh timestamp.
  #
  #   {:last_refreshed}           - replies with the current state
  #   {:set_last_refreshed, time} - replaces the state with `time`
  #   {:set_slots, name, slots}   - stores `slots` in the ETS table for `name`
  #                                 and sets the state to the current OS time
  #                                 in milliseconds
  @impl true
  def handle_call({:last_refreshed}, _from, refreshed_at) do
    {:reply, refreshed_at, refreshed_at}
  end

  def handle_call({:set_last_refreshed, new_time}, _from, _refreshed_at) do
    {:reply, :ok, new_time}
  end

  def handle_call({:set_slots, name, slots}, _from, _refreshed_at) do
    table = slots_name(name)
    :ets.insert(table, {@slots_key, slots})
    {:reply, :ok, :os.system_time(:millisecond)}
  end

  # Asynchronous slot refresh, debounced: the slot map is re-fetched through
  # `conn` only when more than @refresh_debounce milliseconds have passed
  # since the timestamp held in the state. Otherwise the request is ignored
  # and the state is left untouched.
  #
  # Whatever `get_cluster_slots/1` returns is written to the ETS table for
  # `name`, including the empty list it yields when the command fails.
  @impl true
  def handle_cast({:refresh, name, conn}, last_refreshed) do
    elapsed = :os.system_time(:millisecond) - last_refreshed

    if elapsed <= @refresh_debounce do
      {:noreply, last_refreshed}
    else
      table = slots_name(name)
      fresh_slots = get_cluster_slots(conn)
      :ets.insert(table, {@slots_key, fresh_slots})

      # Stamp with the time *after* the fetch completed.
      {:noreply, :os.system_time(:millisecond)}
    end
  end

  # calls into genserver, mostly for testing
  # Synchronously asks the slots server for `name` for its last-refresh
  # timestamp.
  def last_refreshed(name) do
    name
    |> slots_name()
    |> GenServer.call({:last_refreshed})
  end
  # Synchronously overwrites the last-refresh timestamp of the slots server
  # for `n` with `t`.
  def set_last_refreshed(n, t) do
    server = slots_name(n)
    GenServer.call(server, {:set_last_refreshed, t})
  end
  # Synchronously replaces the cached slot map for `name` with `slots`; the
  # server also updates its last-refresh timestamp.
  def set_slots(name, slots) do
    server = slots_name(name)
    GenServer.call(server, {:set_slots, name, slots})
  end

  # background-refresh cluster slots
  # Fire-and-forget: casts a `{:refresh, name, conn}` request to the slots
  # server for `name`, which re-fetches the slot map through `conn` unless
  # the request is debounced.
  def refresh(name, conn) do
    name
    |> slots_name()
    |> GenServer.cast({:refresh, name, conn})
  end

  # lookup slots
  # Reads the cached slot map for `name` directly from the ETS table, without
  # going through the server process. Returns `[]` when nothing is stored
  # under the slots key.
  def slots(name) do
    table = slots_name(name)

    with [{@slots_key, cached}] <- :ets.lookup(table, @slots_key) do
      cached
    else
      _ -> []
    end
  end

  # lookup the slot for a key
  # Resolves `key` against the cached slot map of `name`; returns the owning
  # node, or nil when `find_node/2` finds none.
  def lookup(name, key) do
    name
    |> slots()
    |> find_node(key)
  end

  # Finds the node responsible for `key` in `nodes`, a list of
  # `{node, range}` tuples. The key is hashed to a slot and the first entry
  # whose range contains that slot wins. Returns nil for a nil key, an empty
  # node list, or when no range contains the slot.
  def find_node(_nodes, nil), do: nil
  def find_node([], _key), do: nil

  def find_node(nodes, key) do
    target_slot = hash(key)

    nodes
    |> Enum.find_value(fn {node, range} -> target_slot in range && node end)
  end

  # Computes the cluster hash slot for `key`: the CRC16 of the hashed part of
  # the key, modulo @num_slots.
  #
  # Binary keys are hashed directly. Any other key (atom, integer, ...) is
  # first converted with `to_string/1`, so every term implementing
  # String.Chars hashes by its string form, exactly as atoms already did via
  # interpolation.
  def hash(key) when is_binary(key) do
    key
    |> hash_tag()
    |> RedixClustered.CRC16.hash()
    |> rem(@num_slots)
  end

  def hash(key), do: key |> to_string() |> hash()

  # Returns the portion of `key` that is hashed. When the key contains a "{"
  # followed later by a "}" with at least one character in between, only the
  # text between the first "{" and the next "}" is used; otherwise (no
  # braces, no closing brace, or an empty "{}") the whole key is used.
  defp hash_tag(key) do
    with [_prefix, after_open] <- String.split(key, "{", parts: 2),
         [tag, _suffix] when tag != "" <- String.split(after_open, "}", parts: 2) do
      tag
    else
      _ -> key
    end
  end

  # Issues `CLUSTER SLOTS` on `conn` and returns the parsed slot map. Any
  # result other than `{:ok, slots}` is collapsed into an empty list.
  def get_cluster_slots(conn) do
    with {:ok, raw_slots} <- Redix.command(conn, ["CLUSTER", "SLOTS"]) do
      parse_cluster_slots(raw_slots)
    else
      _err -> []
    end
  end

  # Converts a raw `CLUSTER SLOTS` reply into a list of
  # `{"ip:port", first_slot..last_slot}` tuples, one per slot range, using the
  # first node listed for each range. Entries that do not have the expected
  # shape are dropped.
  def parse_cluster_slots(slots) do
    slots
    |> Enum.map(&parse_slot/1)
    |> Enum.reject(&is_nil/1)
  end

  # A slot entry is `[range_start, range_end, first_node | replicas]`.
  # Only the first two elements of the node entry (ip and port) are needed.
  # Anything after them is ignored, because the number of trailing elements
  # differs between server versions: none, a node id, or a node id plus a
  # metadata list. Matching an exact length would drop every entry of the
  # other shapes and leave the slot map empty.
  defp parse_slot([range_start, range_end, [ip, port | _node_details] | _replicas]) do
    {"#{ip}:#{port}", range_start..range_end}
  end

  defp parse_slot(_), do: nil
end