lib/nebulex_redis_adapter.ex

defmodule NebulexRedisAdapter do
  @moduledoc ~S"""
  Nebulex adapter for Redis. This adapter is implemented using `Redix`,
  a Redis driver for Elixir.

  **NebulexRedisAdapter** provides three setup alternatives:

    * **Standalone** - The adapter establishes a pool of connections
      with a single Redis node. The `:standalone` is the default mode.

    * **Redis Cluster** - [Redis Cluster](https://redis.io/topics/cluster-tutorial)
      is a built-in feature in Redis since version 3, and it may be the most
      convenient and recommendable way to set up Redis in a cluster and have
      a distributed cache storage out-of-box. This adapter provides the
      `:redis_cluster` mode to set up **Redis Cluster** from the client-side
      automatically and be able to use it transparently.

    * **Built-in client-side cluster based on sharding** - This adapter
      provides a simple client-side cluster implementation based on
      Sharding distribution model via `:client_side_cluster` mode.

  ## Shared Options

  In addition to `Nebulex.Cache` shared options, this adapters supports the
  following options:

    * `:mode` - Defines the mode Redis will be set up. It can be one of the
      next values: `:standalone`, `:client_side_cluster`, `:redis_cluster`.
      Defaults to `:standalone`.

    * `:pool_size` - Number of connections in the pool. Defaults to
      `System.schedulers_online()`.

    * `:conn_opts` - Redis client options (`Redix` options in this case).
      For more information about connection options, see `Redix` docs.

  ## Telemetry events

  This adapter emits the recommended Telemetry events.
  See the "Telemetry events" section in `Nebulex.Cache`
  for more information.

  ## TTL or Expiration Time

  As is explained in `Nebulex.Cache`, most of the write-like functions support
  the `:ttl` option to define the expiration time, and it is defined in
  **milliseconds**. Despite Redis work with **seconds**, the conversion logic
  is handled by the adapter transparently, so when using a cache even with the
  Redis adapter, be sure you pass the `:ttl` option in **milliseconds**.

  ## Data Types

  This adapter only works with strings internally, which means the given
  Elixir terms are encoded to binaries before executing the Redis command.
  The encoding/decoding process is performed by the adapter under-the-hood,
  so it is completely transparent for the user.

  **NOTE:** Support for other Redis Data Types is in the roadmap.

  ## Standalone

  We can define a cache to use Redis as follows:

      defmodule MyApp.RedisCache do
        use Nebulex.Cache,
          otp_app: :nebulex,
          adapter: NebulexRedisAdapter
      end

  The configuration for the cache must be in your application environment,
  usually defined in your `config/config.exs`:

      config :my_app, MyApp.RedisCache,
        conn_opts: [
          host: "127.0.0.1",
          port: 6379
        ]

  ## Redis Cluster

  We can define a cache to use Redis Cluster as follows:

      defmodule MyApp.RedisClusterCache do
        use Nebulex.Cache,
          otp_app: :nebulex,
          adapter: NebulexRedisAdapter
      end

  The config:

      config :my_app, MyApp.RedisClusterCache,
        mode: :redis_cluster,
        master_nodes: [
          [
            host: "127.0.0.1",
            port: 7000
          ],
          [
            url: "redis://127.0.0.1:7001"
          ],
          [
            url: "redis://127.0.0.1:7002"
          ]
        ],
        conn_opts: [
          # Redix options, except `:host` and `:port`; unless we have a cluster
          # of nodes with the same host and/or port, which doesn't make sense.
        ]

  ### Redis Cluster Options

  In addition to shared options, `:redis_cluster` mode supports the following
  options:

    * `:master_nodes` - The list with the configuration for the Redis cluster
      master nodes. The configuration for each master nodes contains the same
      options as `:conn_opts`. The adapter traverses the list trying to
      establish connection at least with one of them and get the cluster slots
      to finally setup the Redis cluster from client side properly. If one
      fails, the adapter retries with the next in the list, that's why at least
      one master node must be set.

    * `:conn_opts` - Same as shared options (optional). The `:conn_opts` will
      be applied to each connection pool with the cluster (they will override
      the host and port retrieved from cluster slots info). For that reason,
      be careful when setting `:host` or `:port` options since they will be
      used globally and can cause connection issues. Normally, we add here
      the desired client options except `:host` and `:port`. If you have a
      cluster with the same host for all nodes, in that case make sense to
      add also the `:host` option.

    * `:pool_size` - Same as shared options (optional). It applies to all
      cluster slots, meaning all connection pools will have the same size.

  ## Client-side cluster

  We can define a cache with "client-side cluster mode" as follows:

      defmodule MyApp.ClusteredCache do
        use Nebulex.Cache,
          otp_app: :nebulex,
          adapter: NebulexRedisAdapter
      end

  The config:

      config :my_app, MyApp.ClusteredCache,
        mode: :client_side_cluster,
        nodes: [
          node1: [
            pool_size: 10,
            conn_opts: [
              host: "127.0.0.1",
              port: 9001
            ]
          ],
          node2: [
            pool_size: 4,
            conn_opts: [
              url: "redis://127.0.0.1:9002"
            ]
          ],
          node3: [
            conn_opts: [
              host: "127.0.0.1",
              port: 9003
            ]
          ]
        ]

  By default, the adapter uses `NebulexRedisAdapter.ClientCluster.Keyslot` for
  the keyslot. Besides, if `:jchash` is defined as dependency, the adapter will
  use consistent-hashing automatically. However, you can also provide your own
  implementation by implementing the `Nebulex.Adapter.Keyslot` and set it into
  the `:keyslot` option. For example:

      defmodule MyApp.ClusteredCache.Keyslot do
        use Nebulex.Adapter.Keyslot

        @impl true
        def hash_slot(key, range) do
          # your implementation goes here
        end
      end

  And the config:

      config :my_app, MyApp.ClusteredCache,
        mode: :client_side_cluster,
        keyslot: MyApp.ClusteredCache.Keyslot,
        nodes: [
          ...
        ]

  ### Client-side cluster options

  In addition to shared options, `:client_side_cluster` mode supports the
  following options:

    * `:nodes` - The list of nodes the adapter will setup the cluster with;
      a pool of connections is established per node. The `:client_side_cluster`
      mode enables resilience to be able to survive in case any node(s) gets
      unreachable. For each element of the list, we set the configuration
      for each node, such as `:conn_opts`, `:pool_size`, etc.

    * `:keyslot` - Defines the module implementing `Nebulex.Adapter.Keyslot`
      behaviour, used to compute the node where the command will be applied to.
      It is highly recommendable to provide a consistent hashing implementation.

  ## Queryable API

  Since the queryable API is implemented by using `KEYS` command:

    * Only strings (`String.t()`) are allowed as query parameter.
    * Only keys can be queried.

  ### Examples

      iex> MyApp.RedisCache.put_all(%{
      ...>   "firstname" => "Albert",
      ...>   "lastname" => "Einstein",
      ...>   "age" => 76
      ...> })
      :ok

      iex> MyApp.RedisCache.all("**name**")
      ["firstname", "lastname"]

      iex> MyApp.RedisCache.all("a??")
      ["age"]

      iex> MyApp.RedisCache.all()
      ["age", "firstname", "lastname"]

      iex> stream = TestCache.stream("**name**")
      iex> stream |> Enum.to_list()
      ["firstname", "lastname"]

      # get the values for the returned queried keys
      iex> "**name**" |> MyApp.RedisCache.all() |> MyApp.RedisCache.get_all()
      %{"firstname" => "Albert", "lastname" => "Einstein"}

  ## Using the cache for executing a Redis command or pipeline

  Since `NebulexRedisAdapter` works on top of `Redix` and provides features like
  connection pools and "Redis Cluster" support, it may be seen also as a sort of
  Redis client, but it is meant to be used mainly with the Nebulex cache API.
  However, Redis API is quite extensive and there are a lot of useful commands
  we may want to run taking advantage of the `NebulexRedisAdapter` features.
  Therefore, the adapter injects two additional/extended functions to the
  defined cache: `command!/3` and `pipeline!/3`.

  ### `command!(key \\ nil, name \\ __MODULE__, command)`

      iex> MyCache.command!("mylist", ["LPUSH", "mylist", "world"])
      1
      iex> MyCache.command!("mylist", ["LPUSH", "mylist", "hello"])
      2
      iex> MyCache.command!("mylist", ["LRANGE", "mylist", "0", "-1"])
      ["hello", "world"]

  ### `pipeline!(key \\ nil, name \\ __MODULE__, commands)`

      iex> cache.pipeline!("mylist", [
      ...>   ["LPUSH", "mylist", "world"],
      ...>   ["LPUSH", "mylist", "hello"],
      ...>   ["LRANGE", "mylist", "0", "-1"]
      ...> ])
      [1, 2, ["hello", "world"]]

  Arguments for `command!/3` and `pipeline!/3`:

    * `key` - it is required when used the adapter in mode `:redis_cluster`
      or `:client_side_cluster` so that the node where the commands will
      take place can be selected properly. For `:standalone` it is optional.
    * `name` - The name of the cache in case you are using dynamic caches,
      otherwise it is not required.
    * `commands` - Redis commands.

  ## Transactions

  This adapter doesn't provide support for transactions, since there is no way
  to guarantee its execution on Redis itself, at least not in the way the
  `c:Nebulex.Adapter.Transaction.transaction/3` works, because the anonymous
  function can have any kind of logic, which cannot be translated easily into
  Redis commands.

  > In the future, it is planned to add to Nebulex a `multi`-like function to
    perform multiple commands at once, perhaps that will be the best way to
    perform [transactions via Redis](https://redis.io/topics/transactions).

  """

  # Provide Cache Implementation
  @behaviour Nebulex.Adapter
  @behaviour Nebulex.Adapter.Entry
  @behaviour Nebulex.Adapter.Queryable

  import Nebulex.Adapter
  import Nebulex.Helpers
  import NebulexRedisAdapter.Encoder

  use Nebulex.Adapter.Stats

  alias Nebulex.Adapter
  alias Nebulex.Adapter.Stats
  alias NebulexRedisAdapter.{ClientCluster, Command, Connection, RedisCluster}

  ## Nebulex.Adapter

  @impl true
  defmacro __before_compile__(_env) do
    quote do
      @doc """
      A convenience function for executing a Redis command.
      """
      def command(key \\ nil, name \\ __MODULE__, command) do
        Adapter.with_meta(name, fn _, meta ->
          Command.exec(meta, command, key)
        end)
      end

      @doc """
      A convenience function for executing a Redis command,
      but raises an exception if an error occurs.
      """
      def command!(key \\ nil, name \\ __MODULE__, command) do
        Adapter.with_meta(name, fn _, meta ->
          Command.exec!(meta, command, key)
        end)
      end

      @doc """
      A convenience function for executing a Redis pipeline.
      """
      def pipeline(key \\ nil, name \\ __MODULE__, commands) do
        Adapter.with_meta(name, fn _, meta ->
          Command.pipeline(meta, commands, key)
        end)
      end

      @doc """
      A convenience function for executing a Redis pipeline,
      but raises an exception if an error occurs.
      """
      def pipeline!(key \\ nil, name \\ __MODULE__, commands) do
        Adapter.with_meta(name, fn _, meta ->
          Command.pipeline!(meta, commands, key)
        end)
      end
    end
  end

  @impl true
  def init(opts) do
    # Required cache name
    name = opts[:name] || Keyword.fetch!(opts, :cache)

    # Init stats
    stats_counter = Stats.init(opts)

    # Adapter mode
    mode = Keyword.get(opts, :mode, :standalone)

    # Local registry
    registry = normalize_module_name([name, Registry])

    # Resolve the pool size
    pool_size =
      get_option(
        opts,
        :pool_size,
        "an integer > 0",
        &(is_integer(&1) and &1 > 0),
        System.schedulers_online()
      )

    # Keyslot module for selecting nodes
    keyslot =
      if keyslot = Keyword.get(opts, :keyslot) do
        assert_behaviour(keyslot, Nebulex.Adapter.Keyslot, "keyslot")
      end

    # Cluster nodes
    nodes =
      for {node_name, node_opts} <- Keyword.get(opts, :nodes, []) do
        {node_name, Keyword.get(node_opts, :pool_size, pool_size)}
      end

    # Init adapter metadata
    adapter_meta = %{
      cache_pid: self(),
      name: name,
      mode: mode,
      keyslot: keyslot,
      nodes: nodes,
      pool_size: pool_size,
      stats_counter: stats_counter,
      registry: registry,
      started_at: DateTime.utc_now(),
      default_dt: Keyword.get(opts, :default_data_type, :object),
      telemetry: Keyword.fetch!(opts, :telemetry),
      telemetry_prefix: Keyword.fetch!(opts, :telemetry_prefix)
    }

    # Init the connections child spec according to the adapter mode
    {conn_child_spec, adapter_meta} = do_init(adapter_meta, opts)

    # Build the child spec
    child_spec =
      Nebulex.Adapters.Supervisor.child_spec(
        name: normalize_module_name([name, Supervisor]),
        strategy: :one_for_all,
        children: [
          {NebulexRedisAdapter.BootstrapServer, adapter_meta},
          {Registry, name: registry, keys: :unique},
          conn_child_spec
        ]
      )

    {:ok, child_spec, adapter_meta}
  end

  defp do_init(%{mode: :standalone} = adapter_meta, opts) do
    Connection.init(adapter_meta, opts)
  end

  defp do_init(%{mode: :redis_cluster} = adapter_meta, opts) do
    RedisCluster.init(adapter_meta, opts)
  end

  defp do_init(%{mode: :client_side_cluster} = adapter_meta, opts) do
    ClientCluster.init(adapter_meta, opts)
  end

  ## Nebulex.Adapter.Entry

  @impl true
  defspan get(adapter_meta, key, _opts) do
    with_pipeline(adapter_meta, key, [["GET", encode(key)]])
  end

  @impl true
  defspan get_all(adapter_meta, keys, _opts) do
    do_get_all(adapter_meta, keys)
  end

  defp do_get_all(%{mode: :standalone} = adapter_meta, keys) do
    mget(nil, adapter_meta, keys)
  end

  defp do_get_all(adapter_meta, keys) do
    keys
    |> group_keys_by_hash_slot(adapter_meta)
    |> Enum.reduce(%{}, fn {hash_slot, keys}, acc ->
      return = mget(hash_slot, adapter_meta, keys)
      Map.merge(acc, return)
    end)
  end

  defp mget(hash_slot_key, adapter_meta, keys) do
    adapter_meta
    |> Command.exec!(["MGET" | for(k <- keys, do: encode(k))], hash_slot_key)
    |> Enum.reduce({keys, %{}}, fn
      nil, {[_key | keys], acc} ->
        {keys, acc}

      value, {[key | keys], acc} ->
        {keys, Map.put(acc, key, decode(value))}
    end)
    |> elem(1)
  end

  @impl true
  defspan put(adapter_meta, key, value, ttl, on_write, opts) do
    cmd_opts = cmd_opts(action: on_write, ttl: fix_ttl(ttl))
    redis_k = encode(key)
    redis_v = encode(value, opts)

    case Command.exec!(adapter_meta, ["SET", redis_k, redis_v | cmd_opts], key) do
      "OK" -> true
      nil -> false
    end
  end

  @impl true
  defspan put_all(adapter_meta, entries, ttl, on_write, opts) do
    ttl = fix_ttl(ttl)

    case adapter_meta.mode do
      :standalone ->
        do_put_all(adapter_meta, nil, entries, ttl, on_write, opts)

      _ ->
        entries
        |> group_keys_by_hash_slot(adapter_meta)
        |> Enum.reduce(:ok, fn {hash_slot, group}, acc ->
          acc && do_put_all(adapter_meta, hash_slot, group, ttl, on_write, opts)
        end)
    end
  end

  defp do_put_all(adapter_meta, hash_slot, entries, ttl, on_write, opts) do
    cmd =
      case on_write do
        :put -> "MSET"
        :put_new -> "MSETNX"
      end

    {mset, expire} =
      Enum.reduce(entries, {[cmd], []}, fn {key, val}, {acc1, acc2} ->
        redis_k = encode(key)

        acc2 =
          if is_integer(ttl),
            do: [["EXPIRE", redis_k, ttl] | acc2],
            else: acc2

        {[encode(val, opts), redis_k | acc1], acc2}
      end)

    adapter_meta
    |> Command.pipeline!([Enum.reverse(mset) | expire], hash_slot)
    |> hd()
    |> case do
      "OK" -> :ok
      1 -> true
      0 -> false
    end
  end

  @impl true
  defspan delete(adapter_meta, key, _opts) do
    _ = Command.exec!(adapter_meta, ["DEL", encode(key)], key)

    :ok
  end

  @impl true
  defspan take(adapter_meta, key, _opts) do
    redis_k = encode(key)

    with_pipeline(adapter_meta, key, [["GET", redis_k], ["DEL", redis_k]])
  end

  @impl true
  defspan has_key?(adapter_meta, key) do
    case Command.exec!(adapter_meta, ["EXISTS", encode(key)], key) do
      1 -> true
      0 -> false
    end
  end

  @impl true
  defspan ttl(adapter_meta, key) do
    case Command.exec!(adapter_meta, ["TTL", encode(key)], key) do
      -1 -> :infinity
      -2 -> nil
      ttl -> ttl * 1000
    end
  end

  @impl true
  defspan expire(adapter_meta, key, ttl) do
    do_expire(adapter_meta, key, ttl)
  end

  defp do_expire(adapter_meta, key, :infinity) do
    redis_k = encode(key)

    case Command.pipeline!(adapter_meta, [["TTL", redis_k], ["PERSIST", redis_k]], key) do
      [-2, 0] -> false
      [_, _] -> true
    end
  end

  defp do_expire(adapter_meta, key, ttl) do
    case Command.exec!(adapter_meta, ["EXPIRE", encode(key), fix_ttl(ttl)], key) do
      1 -> true
      0 -> false
    end
  end

  @impl true
  defspan touch(adapter_meta, key) do
    case Command.exec!(adapter_meta, ["TOUCH", encode(key)], key) do
      1 -> true
      0 -> false
    end
  end

  @impl true
  defspan update_counter(adapter_meta, key, incr, ttl, default, _opts) do
    do_update_counter(adapter_meta, key, incr, ttl, default)
  end

  defp do_update_counter(adapter_meta, key, incr, :infinity, default) do
    redis_k = encode(key)

    adapter_meta
    |> maybe_incr_default(key, redis_k, default)
    |> Command.exec!(["INCRBY", redis_k, incr], key)
  end

  defp do_update_counter(adapter_meta, key, incr, ttl, default) do
    redis_k = encode(key)

    adapter_meta
    |> maybe_incr_default(key, redis_k, default)
    |> Command.pipeline!([["INCRBY", redis_k, incr], ["EXPIRE", redis_k, fix_ttl(ttl)]], key)
    |> hd()
  end

  defp maybe_incr_default(adapter_meta, key, redis_k, default)
       when is_integer(default) and default > 0 do
    case Command.exec!(adapter_meta, ["EXISTS", redis_k], key) do
      1 ->
        adapter_meta

      0 ->
        _ = Command.exec!(adapter_meta, ["INCRBY", redis_k, default], key)
        adapter_meta
    end
  end

  defp maybe_incr_default(adapter_meta, _, _, _), do: adapter_meta

  ## Nebulex.Adapter.Queryable

  @impl true
  defspan execute(adapter_meta, operation, query, _opts) do
    do_execute(adapter_meta, operation, query)
  end

  defp do_execute(%{mode: mode} = adapter_meta, :count_all, nil) do
    exec!(mode, [adapter_meta, ["DBSIZE"]], [0, &Kernel.+(&2, &1)])
  end

  defp do_execute(%{mode: mode} = adapter_meta, :delete_all, nil) do
    size = exec!(mode, [adapter_meta, ["DBSIZE"]], [0, &Kernel.+(&2, &1)])
    _ = exec!(mode, [adapter_meta, ["FLUSHDB"]], [])

    size
  end

  defp do_execute(adapter_meta, :all, query) do
    execute_query(query, adapter_meta)
  end

  @impl true
  defspan stream(adapter_meta, query, _opts) do
    Stream.resource(
      fn ->
        execute_query(query, adapter_meta)
      end,
      fn
        [] -> {:halt, []}
        elems -> {elems, []}
      end,
      & &1
    )
  end

  @impl Nebulex.Adapter.Stats
  def stats(%{started_at: started_at} = adapter_meta) do
    if stats = super(adapter_meta) do
      %{stats | metadata: Map.put(stats.metadata, :started_at, started_at)}
    end
  end

  ## Private Functions

  defp with_pipeline(adapter_meta, key, pipeline) do
    adapter_meta
    |> Command.pipeline!(pipeline, key)
    |> hd()
    |> decode()
  end

  defp cmd_opts(keys), do: Enum.reduce(keys, [], &cmd_opts/2)

  defp cmd_opts({:action, :put}, acc), do: acc
  defp cmd_opts({:action, :put_new}, acc), do: ["NX" | acc]
  defp cmd_opts({:action, :replace}, acc), do: ["XX" | acc]
  defp cmd_opts({:ttl, :infinity}, acc), do: acc
  defp cmd_opts({:ttl, ttl}, acc), do: ["EX", "#{ttl}" | acc]

  defp fix_ttl(:infinity), do: :infinity
  defp fix_ttl(ttl) when is_integer(ttl) and ttl >= 1000, do: div(ttl, 1000)

  defp fix_ttl(ttl) do
    raise ArgumentError,
          "expected ttl: to be an integer >= 1000 or :infinity, got: #{inspect(ttl)}"
  end

  defp execute_query(nil, adapter_meta) do
    "*"
    |> execute_query(adapter_meta)
    |> Enum.map(&decode/1)
  end

  defp execute_query(pattern, %{mode: mode} = adapter_meta) when is_binary(pattern) do
    exec!(mode, [adapter_meta, ["KEYS", pattern]], [[], &Kernel.++(&1, &2)])
  end

  defp execute_query(pattern, _adapter_meta) do
    raise Nebulex.QueryError, message: "invalid pattern", query: pattern
  end

  defp exec!(:standalone, args, _extra_args) do
    apply(Command, :exec!, args)
  end

  defp exec!(:client_side_cluster, args, extra_args) do
    apply(ClientCluster, :exec!, args ++ extra_args)
  end

  defp exec!(:redis_cluster, args, extra_args) do
    apply(RedisCluster, :exec!, args ++ extra_args)
  end

  defp group_keys_by_hash_slot(enum, %{mode: :client_side_cluster, nodes: nodes, keyslot: keyslot}) do
    ClientCluster.group_keys_by_hash_slot(enum, nodes, keyslot)
  end

  defp group_keys_by_hash_slot(enum, %{mode: :redis_cluster, keyslot: keyslot}) do
    RedisCluster.group_keys_by_hash_slot(enum, keyslot)
  end
end