lib/process_hub/strategy/synchronization/pubsub.ex

defmodule ProcessHub.Strategy.Synchronization.PubSub do
  @moduledoc """
  This PubSub synchronization strategy uses the `:blockade` library to
  dispatch and handle synchronization events. Each `ProcessHub` instance
  has its own event queue that is used to dispatch and handle synchronization events.
  """

  alias ProcessHub.Strategy.Synchronization.Base, as: SynchronizationStrategy
  alias ProcessHub.Utility.Name
  alias ProcessHub.Service.Synchronizer
  alias ProcessHub.Constant.Event
  alias ProcessHub.Constant.PriorityLevel
  alias :blockade, as: Blockade

  @typedoc """
  The PubSub synchronization strategy options.

  - `sync_interval` - the periodic synchronization interval in milliseconds. Defaults to `15000`.
  """
  @type t :: %__MODULE__{
          sync_interval: pos_integer()
        }
  defstruct sync_interval: 15000

  defimpl SynchronizationStrategy, for: ProcessHub.Strategy.Synchronization.PubSub do
    use Event

    @impl true
    def init(_strategy, _hub_id), do: nil

    @impl SynchronizationStrategy
    def propagate(_strategy, hub_id, children, node, :add, opts) do
      Blockade.dispatch_sync(
        Name.event_queue(hub_id),
        @event_children_registration,
        {children, node},
        %{
          priority: PriorityLevel.locked(),
          members: Keyword.get(opts, :members, :global)
        }
      )

      :ok
    end

    def propagate(_strategy, hub_id, child_ids, node, :rem, opts) do
      Blockade.dispatch_sync(
        Name.event_queue(hub_id),
        @event_children_unregistration,
        {child_ids, node},
        %{
          priority: PriorityLevel.locked(),
          members: Keyword.get(opts, :members, :global)
        }
      )

      :ok
    end

    @impl SynchronizationStrategy
    def handle_propagation(_strategy, _hub_id, _propagation_data, _type), do: :ok

    @impl SynchronizationStrategy
    def init_sync(strategy, hub_id, cluster_nodes) do
      local_data = Synchronizer.local_sync_data(hub_id)
      local_node = node()

      cluster_nodes
      |> Enum.filter(&(&1 !== local_node))
      |> Enum.each(fn node ->
        Node.spawn(node, fn ->
          GenServer.cast(
            Name.worker_queue(hub_id),
            {:handle_work,
             fn -> Synchronizer.exec_interval_sync(hub_id, strategy, local_data, local_node) end}
          )
        end)
      end)

      :ok
    end

    @impl SynchronizationStrategy
    def handle_synchronization(_strategy, hub_id, remote_data, remote_node) do
      Synchronizer.append_data(hub_id, %{remote_node => remote_data})
      Synchronizer.detach_data(hub_id, %{remote_node => remote_data})

      :ok
    end
  end
end