lib/process_hub/strategy/partition_tolerance/dynamic_quorum.ex

defmodule ProcessHub.Strategy.PartitionTolerance.DynamicQuorum do
  @moduledoc """
  The dynamic quorum strategy provides a dynamic way to handle network partitions in the `ProcessHub` cluster.

  This strategy is suitable for clusters that do not know the number of nodes in the cluster beforehand.

  The dynamic quorum strategy uses a cache to store the quorum log. The quorum log is a list of tuples
  containing the node name and the timestamp when the connection to the node was lost.
  This log is used to calculate the quorum size, which is measured in percentage.
  If the calculated quorum size is less than the `quorum_size` defined in the configuration,
  the `ProcessHub` will be considered to be in a network partition.
  In such a case, the `ProcessHub` will terminate its distributed supervisor process,
  `ProcessHub.DistributedSupervisor`, and all its children. Also, the local event queue will be
  locked by increasing the priority level.

  The quorum size is calculated by the following formula:

      quorum_size = (connected_nodes / (connected_nodes + down_nodes)) * 100

      # The `connected_nodes` are the number of nodes that are currently connected to the `ProcessHub` cluster.
      # The `down_nodes` are the number of nodes that are listed under the quorum log.

  The `threshold_time` is used to determine how long a node should be listed in the quorum log
  before it is removed from the quorum log. This means if we have a `threshold_time` of 10 seconds,
  and we lose one node every 10 seconds until we have lost all nodes, the system won't be
  considered to be in a network partition, regardless of whether the lost nodes are lost due to network
  partition or not. If we lose all those nodes in less than 10 seconds, the system will be considered
  to be in a network partition.

  > #### `Scaling down` {: .error}
  > When scaling down the cluster, make sure to scale down one node at a time
  > and wait for the `threshold_time` before scaling down the next node.
  >
  > Do not scale down too many nodes at once because the system may think that it is in a network partition
  > and terminate the `ProcessHub.DistributedSupervisor` process and all its children.
  """

  alias ProcessHub.Strategy.PartitionTolerance.Base, as: PartitionToleranceStrategy
  alias ProcessHub.Service.LocalStorage
  alias ProcessHub.Service.State

  @typedoc """
  Dynamic quorum strategy configuration.

  - `quorum_size` - The quorum size is measured in percentage. This is the required quorum size.
  For example, `50` means that if we have 10 nodes in the cluster, the system will be
  considered to be in a network partition if we lose 6 (60% of the cluster) nodes or more within
  the `threshold_time` period.
  - `threshold_time` - The threshold time is measured in seconds. This is how long a node should be
  listed in the quorum log before it is removed from the quorum log.
  """
  @type t :: %__MODULE__{
          quorum_size: non_neg_integer(),
          threshold_time: non_neg_integer()
        }
  # `quorum_size` must always be supplied when the struct is built.
  @enforce_keys [:quorum_size]
  # `threshold_time` falls back to 30 when it is not given.
  defstruct [:quorum_size, threshold_time: 30]

  defimpl PartitionToleranceStrategy, for: ProcessHub.Strategy.PartitionTolerance.DynamicQuorum do
    @quorum_cache_key :dynamic_quorum_node_down

    @spec handle_startup(
            ProcessHub.Strategy.PartitionTolerance.DynamicQuorum.t(),
            ProcessHub.hub_id(),
            [node()]
          ) :: :ok
    def handle_startup(_strategy, _hub_id, _cluster_nodes) do
      # Nothing to prepare at startup: within this implementation the quorum log is
      # only written from the node up/down handlers.
      :ok
    end

    @spec handle_node_up(
            ProcessHub.Strategy.PartitionTolerance.DynamicQuorum.t(),
            ProcessHub.hub_id(),
            node(),
            [node()]
          ) :: :ok
    def handle_node_up(strategy, hub_id, node, cluster_nodes) do
      # Rewrite the quorum log without `node` (and without expired rows).
      node_log_func(strategy, hub_id, node, :up)

      # Only a healthy quorum is reported here; a still-failing quorum is left as it is.
      if not quorum_failure?(hub_id, strategy, cluster_nodes) do
        State.toggle_quorum_success(hub_id)
      end

      # Hand the local quorum log over to the node that just came up.
      propagate_quorum_log(hub_id, node)

      :ok
    end

    @spec handle_node_down(
            ProcessHub.Strategy.PartitionTolerance.DynamicQuorum.t(),
            ProcessHub.hub_id(),
            node(),
            [node()]
          ) :: :ok
    def handle_node_down(strategy, hub_id, node, cluster_nodes) do
      # Record the lost node (with the current timestamp) in the quorum log.
      node_log_func(strategy, hub_id, node, :down)

      # Only a lost quorum is reported here; a healthy quorum needs no action.
      if quorum_failure?(hub_id, strategy, cluster_nodes) do
        State.toggle_quorum_failure(hub_id)
      end

      :ok
    end

    # Offers the local quorum log to `node`.
    #
    # The local log is read here and captured by the function spawned on `node`; that
    # function reads the log stored on its own side and replaces it only when the log
    # sent along has more entries. A missing log on either side counts as empty.
    defp propagate_quorum_log(hub_id, node) do
      sender_log =
        with nil <- LocalStorage.get(hub_id, @quorum_cache_key), do: []

      Node.spawn(node, fn ->
        receiver_log =
          with nil <- LocalStorage.get(hub_id, @quorum_cache_key), do: []

        # The longer log wins: overwrite the receiver's log with the sender's entries.
        if length(sender_log) > length(receiver_log) do
          LocalStorage.insert(hub_id, @quorum_cache_key, sender_log)
        end
      end)
    end

    # Decides whether the hub has lost quorum; returns a boolean.
    #
    # `cluster_nodes` is counted as the connected nodes and every entry currently stored
    # in the quorum log (under `@quorum_cache_key`) is counted as a down node. Quorum is
    # lost when
    #
    #     connected / (connected + down) * 100 < strategy.quorum_size
    #
    # The comparison is cross-multiplied so that it runs on exact integer arithmetic:
    #
    #   * float rounding can no longer flip the result when the ratio sits exactly on
    #     the configured threshold, and
    #   * an empty node list combined with an empty log (0 / 0) yields `false` instead
    #     of raising `ArithmeticError`.
    defp quorum_failure?(hub_id, strategy, cluster_nodes) do
      connected_nodes = length(cluster_nodes)

      down_nodes =
        case LocalStorage.get(hub_id, @quorum_cache_key) do
          nil -> 0
          quorum_log -> length(quorum_log)
        end

      connected_nodes * 100 < strategy.quorum_size * (connected_nodes + down_nodes)
    end

    # Recomputes the quorum log for a node `:up` / `:down` event and stores the result
    # under `@quorum_cache_key`, replacing whatever was stored before.
    defp node_log_func(strategy, hub_id, node, type) do
      updated_log = node_log_data(hub_id, node, strategy.threshold_time, type)

      LocalStorage.insert(hub_id, @quorum_cache_key, updated_log)
    end

    # Builds the quorum log that should be stored after `node` went `:up` or `:down`.
    #
    # Both clauses start from the stored log with expired rows and any row for `node`
    # removed. `:up` returns just that pruned log; `:down` additionally puts a fresh
    # `{node, unix_seconds}` row at the head. `threshold_time` is subtracted from the
    # current Unix time in seconds to obtain the expiry cutoff.
    defp node_log_data(hub_id, node, threshold_time, :up) do
      now = DateTime.to_unix(DateTime.utc_now(), :second)

      prune_quorum_log(hub_id, node, now - threshold_time)
    end

    defp node_log_data(hub_id, node, threshold_time, :down) do
      now = DateTime.to_unix(DateTime.utc_now(), :second)

      [{node, now} | prune_quorum_log(hub_id, node, now - threshold_time)]
    end

    # Returns the stored quorum log without rows logged at or before `cutoff` and without
    # rows belonging to `node`; a missing log is treated as empty.
    defp prune_quorum_log(hub_id, node, cutoff) do
      case LocalStorage.get(hub_id, @quorum_cache_key) do
        nil ->
          []

        entries ->
          Enum.reject(entries, fn {logged_node, logged_at} ->
            logged_at <= cutoff or logged_node == node
          end)
      end
    end
  end
end