# lib/cluster_kv.ex

defmodule ClusterKV do
  @moduledoc """
  Top-level supervisor and public API of a ClusterKV instance.

  `start_link/1` starts a supervisor registered under the given `:name`.
  Its children are started in this order:

    1. `ClusterKV.ETSTable`, started with `[name: ets_name(name)]`.
    2. A `:poolboy` pool of `ClusterKV.DB` workers registered locally as
       `db_name(name)`; every worker receives `[table: ets_name(name)]`.
    3. `ClusterKV.Ring`, registered locally as `ring_name(name)`.
    4. `Cluster.Supervisor`, started with the configured topologies and
       named `cluster_supervisor_name(name)`.

  The supervisor uses the `:rest_for_one` strategy, so when a child
  terminates, that child and every child started after it are restarted.

  All data operations (`get/4`, `put/4`, `batch/3`, ...) are delegated
  unchanged to `ClusterKV.Ring`.
  """
  use Supervisor
  alias ClusterKV.{DB, ETSTable, Ring}

  @doc """
  Starts the ClusterKV supervision tree.

  ## Options

  All four options are required; they may be given in any order.

    * `:name` - atom under which the supervisor is registered. The names of
      the child processes are derived from it (see `db_name/1`, `ets_name/1`,
      `ring_name/1` and `cluster_supervisor_name/1`).
    * `:topologies` - passed as-is to `Cluster.Supervisor`.
    * `:replicas` - stored in the `ClusterKV.Ring` struct handed to the ring.
    * `:quorum` - stored in the `ClusterKV.Ring` struct handed to the ring.

  Raises `KeyError` when one of the options is missing.
  """
  @spec start_link(
          name: atom(),
          topologies: list(),
          replicas: integer(),
          quorum: integer()
        ) ::
          {:ok, pid()}
          | {:error, {:already_started, pid()} | {:shutdown, term()} | term()}
  def start_link(opts) when is_list(opts) do
    # Fetch by key instead of destructuring the list in the function head: a
    # keyword list is ordered, so a positional match would reject the very same
    # options as soon as the caller writes them in a different order.
    name = Keyword.fetch!(opts, :name)
    topologies = Keyword.fetch!(opts, :topologies)
    replicas = Keyword.fetch!(opts, :replicas)
    quorum = Keyword.fetch!(opts, :quorum)

    Supervisor.start_link(__MODULE__, {name, topologies, replicas, quorum}, name: name)
  end

  @doc false
  @impl true
  def init({name, topologies, replicas, quorum}) do
    db = db_name(name)
    ring = ring_name(name)
    cluster_supervisor = cluster_supervisor_name(name)
    ets = ets_name(name)

    # The pool holds one permanent worker per scheduler and may grow by the
    # same amount of overflow workers.
    pool_size = System.schedulers()
    ring_data = %Ring{name: ring, replicas: replicas, quorum: quorum, db: db}

    # Order matters under :rest_for_one: the DB workers are handed the ETS
    # table name and the ring is handed the DB pool name, so each child is
    # listed after the child whose name it receives.
    children = [
      {ETSTable, [name: ets]},
      :poolboy.child_spec(
        db,
        [name: {:local, db}, worker_module: DB, size: pool_size, max_overflow: pool_size],
        table: ets
      ),
      {Ring, [{:local, ring}, ring_data, []]},
      {Cluster.Supervisor, [topologies, [name: cluster_supervisor]]}
    ]

    # Give up (and let the parent decide) after more than 5 restarts in 10 seconds.
    Supervisor.init(children, strategy: :rest_for_one, max_restarts: 5, max_seconds: 10)
  end

  # NOTE: `DB`, `ETSTable` and `Ring` below are the aliases declared at the top
  # of this module, so they expand to `ClusterKV.DB` etc. With `name` set to
  # `MyKV`, `db_name/1` therefore yields `MyKV.ClusterKV.DB`. `ClusterSupervisor`
  # is not aliased, so `cluster_supervisor_name/1` yields `MyKV.ClusterSupervisor`.

  @doc "Returns the registered name of the DB worker pool belonging to `name`."
  @spec db_name(atom()) :: atom()
  def db_name(name), do: Module.concat([name, DB])

  @doc "Returns the name passed to `ClusterKV.ETSTable` for the instance `name`."
  @spec ets_name(atom()) :: atom()
  def ets_name(name), do: Module.concat([name, ETSTable])

  @doc "Returns the registered name of the ring process belonging to `name`."
  @spec ring_name(atom()) :: atom()
  def ring_name(name), do: Module.concat([name, Ring])

  @doc "Returns the name given to the `Cluster.Supervisor` belonging to `name`."
  @spec cluster_supervisor_name(atom()) :: atom()
  def cluster_supervisor_name(name), do: Module.concat([name, ClusterSupervisor])

  # Public API. Every function below is forwarded, with the same arguments,
  # to the function of the same name in `ClusterKV.Ring`.
  defdelegate get(name, keyspace, key, timeout \\ :infinity), to: Ring
  defdelegate put(name, keyspace, key, value), to: Ring
  defdelegate update(name, keyspace, key, value, fun), to: Ring
  defdelegate update_if(name, keyspace, key, value, fun), to: Ring
  defdelegate batch(name, keyspace, batch), to: Ring
  defdelegate get_wildcard_key(key, split_on, join, wildcard), to: Ring
  defdelegate put_wildcard(name, keyspace, key, value, split_on, join, wildcard), to: Ring
  defdelegate prefix(name, keyspace, key, split_on, min, timeout \\ :infinity), to: Ring
  defdelegate stream(name, timeout \\ :infinity), to: Ring

  defdelegate wildcard(name, keyspace, key, split_on, join, wildcard, timeout \\ :infinity),
    to: Ring
end