lib/registry/registry.ex

# Copyright(c) 2015-2023 ACCESS CO., LTD. All rights reserved.

use Croma

defmodule Antikythera.Registry do
  alias Antikythera.{GearName, TenantId, Context}
  alias Antikythera.ExecutorPool.Id, as: EPoolId
  alias __MODULE__

  @type name :: {:gear, GearName.t(), String.t()} | {:tenant, TenantId.t(), String.t()}

  @doc false
  # Builds the registry key for `name`, scoped by an executor pool ID.
  #
  # The first argument is either an executor pool ID or a `Context`; for a context,
  # its `executor_pool_id` field is used. The pool ID is destructured as a 2-tuple and
  # `name` is appended as the third element, so a pool ID of any other shape
  # raises `MatchError`.
  defun make_name(epool_id_or_context :: v[EPoolId.t() | Context.t()], name :: v[String.t()]) ::
          name do
    {owner_kind, owner_id} = to_epool_id(epool_id_or_context)
    {owner_kind, owner_id, name}
  end

  # Unwraps the executor pool ID carried by a context; any other value is returned untouched.
  defp to_epool_id(%Context{executor_pool_id: epool_id}), do: epool_id
  defp to_epool_id(epool_id), do: epool_id

  defmodule Unique do
    @moduledoc """
    A global (cluster-wide) process registry for 1-to-1 process communication.

    A process name can be an arbitrary string as long as it is unique within an executor pool.
    A process is not allowed to have more than one name.
    When a registered process dies, its name is automatically removed from the registry.

    Note that uniqueness of names is not strictly enforced:
    if registrations of the same name race on different nodes,
    more than one process may succeed in registering it.
    Such a conflict is resolved by keeping a single process and killing the others.
    To stay clear of this kind of naming conflict,
    prefer intrinsically unique IDs, such as login IDs of clients, as names.
    """

    # Scope passed as the first argument of every `:syn` call in this module.
    @scope :antikythera

    @doc """
    Registers the calling process under `name` within the given executor pool.

    The executor pool is given either directly or through a context.
    The result of `:syn.register/3` is returned as is.
    """
    defun register(name :: v[String.t()], epool_id_or_context :: v[EPoolId.t() | Context.t()]) ::
            :ok | {:error, :taken | :pid_already_registered} do
      registered_name = Registry.make_name(epool_id_or_context, name)
      :syn.register(@scope, registered_name, self())
    end

    @doc """
    Sends `message` to the process registered under `name` within the given executor pool.

    Returns `true` if a process was found (and the message was sent to it),
    `false` if the lookup found no process for the name.
    """
    defun send_message(
            name :: v[String.t()],
            epool_id_or_context :: v[EPoolId.t() | Context.t()],
            message :: any
          ) :: boolean do
      registered_name = Registry.make_name(epool_id_or_context, name)

      case :syn.lookup(@scope, registered_name) do
        {target_pid, _meta} ->
          send(target_pid, message)
          true

        :undefined ->
          false
      end
    end
  end

  defmodule Group do
    @moduledoc """
    A global (cluster-wide) process registry for the publisher-subscriber communication pattern.

    Multiple processes can be registered under the same name in this registry,
    and a message can then be broadcast to the whole group of processes sharing that name.
    A group name can be an arbitrary string.
    A process can be a member of multiple groups at the same time.
    When a registered process dies, its pid is automatically removed from every group it has joined.
    """

    # Scope passed as the first argument of every `:syn` call in this module.
    @scope :antikythera

    @doc """
    Adds the calling process to the group `name` within the given executor pool.

    The executor pool is given either directly or through a context.
    The result of `:syn.join/3` is returned as is.
    """
    defun join(name :: v[String.t()], epool_id_or_context :: v[EPoolId.t() | Context.t()]) :: :ok do
      group_name = Registry.make_name(epool_id_or_context, name)
      :syn.join(@scope, group_name, self())
    end

    @doc """
    Removes the calling process from the group `name` within the given executor pool.

    The result of `:syn.leave/3` is returned as is.
    """
    defun leave(name :: v[String.t()], epool_id_or_context :: v[EPoolId.t() | Context.t()]) ::
            :ok | {:error, :pid_not_in_group} do
      group_name = Registry.make_name(epool_id_or_context, name)
      :syn.leave(@scope, group_name, self())
    end

    @doc """
    Publishes `message` to the group `name` within the given executor pool.

    Returns the count reported by `:syn.publish/3`.
    Any result other than `{:ok, count}` raises `MatchError`.
    """
    defun publish(
            name :: v[String.t()],
            epool_id_or_context :: v[EPoolId.t() | Context.t()],
            message :: any
          ) :: non_neg_integer do
      group_name = Registry.make_name(epool_id_or_context, name)
      {:ok, reported_count} = :syn.publish(@scope, group_name, message)
      reported_count
    end
  end
end