# lib/xandra/cluster/load_balancing_policy/dc_aware_round_robin.ex

defmodule Xandra.Cluster.LoadBalancingPolicy.DCAwareRoundRobin do
  @moduledoc since: "0.15.0"

  alias Xandra.Cluster.Host

  @behaviour Xandra.Cluster.LoadBalancingPolicy

  @init_opts_schema NimbleOptions.new!(
                      local_data_center: [
                        type: {:or, [:string, {:in, [:from_first_peer]}]},
                        default: :from_first_peer,
                        doc: """
                        The local data center. If `:from_first_peer`, this policy picks the local
                        data center from the first peer that is added to the cluster. This is
                        usually the first peer that the control connection connects to, which
                        (if everything goes well) is the first peer listed in the `:nodes`
                        option passed to `Xandra.Cluster.start_link/1`. If you want to force
                        a specific data center to be used as the local data center, you can
                        pass a string as the value of this option (such as `"datacenter1"`).
                        """
                      ]
                    )

  @moduledoc """
  A `Xandra.Cluster.LoadBalancingPolicy` that prefers hosts in a "local" data center.

  This policy uses a **round-robin strategy** to pick hosts, giving precedence to hosts
  in a "local" data center. The local data center is determined by the
  `:local_data_center` option (see below). "Giving precedence" means that
  `hosts_plan/1` and `query_plan/1` return all the eligible local hosts first,
  followed by all the eligible remote hosts: `hosts_plan/1` includes hosts that are
  *up* (or connected), while `query_plan/1` only includes hosts that Xandra is
  currently connected to.

  The round-robin strategy is applied to local and remote hosts separately. For example,
  say the local hosts are `LH1`, `LH2`, `LH3`, and the remote hosts are `RH1`, `RH2`, `RH3`.
  The first time, this policy will return `[LH1, LH2, LH3, RH1, RH2, RH3]`. The second time,
  it will return `[LH2, LH3, LH1, RH2, RH3, RH1]`. And so on.

  This policy is available since Xandra *v0.15.0*.

  ## Options

  This policy supports the following initialization options:

  #{NimbleOptions.docs(@init_opts_schema)}
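
  ## Usage

  You can use this policy by passing it alongside its options to
  `Xandra.Cluster.start_link/1` in the `:load_balancing` option. For example
  (the node address and data center name below are placeholders):

      Xandra.Cluster.start_link(
        nodes: ["cassandra-seed.example.net:9042"],
        load_balancing:
          {Xandra.Cluster.LoadBalancingPolicy.DCAwareRoundRobin, local_data_center: "datacenter1"}
      )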
  """

  defstruct [:local_dc, local_hosts: [], remote_hosts: []]

  @impl true
  def init(options) do
    options = NimbleOptions.validate!(options, @init_opts_schema)

    case Keyword.fetch!(options, :local_data_center) do
      :from_first_peer -> %__MODULE__{}
      local_data_center -> %__MODULE__{local_dc: local_data_center}
    end
  end

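  # If the policy was initialized with local_data_center: :from_first_peer, local_dc
  # is still nil when the first host gets added, so that host's DC becomes the local DC
  # (first clause below).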
  @impl true
  def host_added(state, new_host)

  def host_added(%__MODULE__{local_dc: nil, local_hosts: []} = state, %Host{} = host) do
    %__MODULE__{state | local_dc: host.data_center, local_hosts: [{host, :up}]}
  end

  def host_added(%__MODULE__{local_dc: local_dc} = state, %Host{data_center: local_dc} = host)
      when is_binary(local_dc) do
    update_in(state.local_hosts, &(&1 ++ [{host, :up}]))
  end

  def host_added(%__MODULE__{} = state, %Host{} = host) do
    update_in(state.remote_hosts, &(&1 ++ [{host, :up}]))
  end

  @impl true
  def host_removed(%__MODULE__{} = state, %Host{} = host) do
    key = if state.local_dc == host.data_center, do: :local_hosts, else: :remote_hosts

    update_in(state, [Access.key!(key)], fn hosts ->
      Enum.reject(hosts, fn {existing_host, _status} -> host_match?(existing_host, host) end)
    end)
  end

  @impl true
  def host_up(%__MODULE__{} = state, %Host{} = host) do
    key = if state.local_dc == host.data_center, do: :local_hosts, else: :remote_hosts

    update_in(state, [Access.key!(key)], fn hosts ->
      Enum.map(hosts, fn {existing_host, status} ->
        if host_match?(existing_host, host), do: {host, :up}, else: {existing_host, status}
      end)
    end)
  end

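  # Called when Xandra establishes a connection to a host: its status goes from
  # :up to :connected, which makes it eligible for query_plan/1.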
  @impl true
  def host_connected(%__MODULE__{} = state, %Host{} = host) do
    key = if state.local_dc == host.data_center, do: :local_hosts, else: :remote_hosts

    update_in(state, [Access.key!(key)], fn hosts ->
      Enum.map(hosts, fn {existing_host, status} ->
        if host_match?(existing_host, host), do: {host, :connected}, else: {existing_host, status}
      end)
    end)
  end

  @impl true
  def host_down(%__MODULE__{} = state, %Host{} = host) do
    key = if state.local_dc == host.data_center, do: :local_hosts, else: :remote_hosts

    update_in(state, [Access.key!(key)], fn hosts ->
      Enum.map(hosts, fn {existing_host, status} ->
        if host_match?(existing_host, host), do: {host, :down}, else: {existing_host, status}
      end)
    end)
  end

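  # Returns all the up (or connected) hosts, local ones first. Each call rotates
  # both host lists by one (see slide/1), which is what makes this round-robin.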
  @impl true
  def hosts_plan(%__MODULE__{} = state) do
    {local_hosts, state} = get_and_update_in(state.local_hosts, &slide/1)
    {remote_hosts, state} = get_and_update_in(state.remote_hosts, &slide/1)

    hosts =
      for {host, status} when status in [:up, :connected] <- local_hosts ++ remote_hosts, do: host

    {hosts, state}
  end

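  # Like hosts_plan/1, but only includes hosts that Xandra is currently connected
  # to, since those are the only ones that can serve the query.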
  @impl true
  def query_plan(%__MODULE__{} = state) do
    {local_hosts, state} = get_and_update_in(state.local_hosts, &slide/1)
    {remote_hosts, state} = get_and_update_in(state.remote_hosts, &slide/1)

    hosts = for {host, :connected} <- local_hosts ++ remote_hosts, do: host

    {hosts, state}
  end

  defp host_match?(%Host{} = host1, %Host{} = host2) do
    host1.address == host2.address and host1.port == host2.port
  end

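  # Returns the current list plus the same list rotated by one, matching the
  # {get_value, new_value} contract of get_and_update_in/2, so that the next plan
  # starts from the following host. For example:
  # slide([h1, h2, h3]) => {[h1, h2, h3], [h2, h3, h1]}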
  defp slide([]), do: {[], []}
  defp slide([head | rest] = list), do: {list, rest ++ [head]}

  # Made public for testing.
  @doc false
  def local_dc(%__MODULE__{local_dc: local_dc}), do: local_dc

  # Made public for testing.
  @doc false
  def hosts(%__MODULE__{local_hosts: hosts}, :local), do: hosts
  def hosts(%__MODULE__{remote_hosts: hosts}, :remote), do: hosts
end