defmodule ProcessHub.Service.ProcessRegistry do
@moduledoc """
The process registry service provides API functions for managing the process registry.
"""
@type registry() :: %{
ProcessHub.child_id() => {
ProcessHub.child_spec(),
[{node(), pid()}]
}
}
alias ProcessHub.Utility.Name
alias ProcessHub.Constant.Hook
alias ProcessHub.Service.HookManager
@doc "Returns information about all registered processes."
@spec registry(ProcessHub.hub_id()) :: registry()
def registry(hub_id) do
Name.registry(hub_id)
|> Cachex.export()
|> elem(1)
|> Enum.map(fn {:entry, key, _, _, value} -> {key, value} end)
|> Map.new()
end
@spec process_list(atom(), :global | :local) :: [
{ProcessHub.child_id(), [{node(), pid()}] | pid()}
]
def process_list(hub_id, :global) do
registry(hub_id)
|> Enum.map(fn {child_id, {_child_spec, nodes}} ->
{child_id, nodes}
end)
end
def process_list(hub_id, :local) do
local_node = node()
process_list(hub_id, :global)
|> Enum.map(fn {child_id, nodes} ->
{child_id, Keyword.get(nodes, local_node)}
end)
|> Enum.filter(fn {_, pid} -> pid end)
end
@spec contains_children(ProcessHub.hub_id(), [ProcessHub.child_id()]) :: [ProcessHub.child_id()]
@doc "Returns a list of child_ids that match the given `child_ids` variable."
def contains_children(hub_id, child_ids) do
Enum.reduce(registry(hub_id), [], fn {child_id, _}, acc ->
case Enum.member?(child_ids, child_id) do
true -> [child_id | acc]
false -> acc
end
end)
|> Enum.reverse()
end
@doc "Deletes all objects from the process registry."
@spec clear_all(ProcessHub.hub_id()) :: integer()
def clear_all(hub_id) do
{:ok, number_of_rows} =
Name.registry(hub_id)
|> Cachex.clear()
number_of_rows
end
@doc "Returns information on all processes that are running on the local node."
@spec local_data(ProcessHub.hub_id()) :: [
{ProcessHub.child_id(), {ProcessHub.child_spec(), [{node(), pid()}]}}
]
def local_data(hub_id) do
local_node = node()
registry(hub_id)
|> Enum.filter(fn {_, {_, nodes}} ->
Enum.member?(Keyword.keys(nodes), local_node)
end)
end
@doc "Returns a list of child specs registered under the local node."
@spec local_child_specs(ProcessHub.hub_id()) :: [ProcessHub.child_spec()]
def local_child_specs(hub_id) do
local_data(hub_id)
|> Enum.map(fn
{_, {child_spec, _}} -> child_spec
end)
end
@doc "Return the child_spec, nodes, and pids for the given child_id."
@spec lookup(ProcessHub.hub_id(), ProcessHub.child_id(), [table: atom()] | nil) ::
nil | {ProcessHub.child_spec(), [{node(), pid()}]}
def lookup(hub_id, child_id, opts) do
{:ok, result} =
Keyword.get(opts, :table, Name.registry(hub_id))
|> Cachex.get(child_id)
case result do
nil ->
nil
{_child_spec, _nodes} ->
result
end
end
def lookup(hub_id, child_id) do
lookup(hub_id, child_id, table: Name.registry(hub_id))
end
@doc """
Inserts information about a child process into the registry.
Calling this function will dispatch the `:registry_pid_insert_hook` hook unless the `:skip_hooks` option is set to `true`.
"""
@spec insert(ProcessHub.hub_id(), ProcessHub.child_spec(), [{node(), pid()}], keyword() | nil) ::
:ok
def insert(hub_id, child_spec, child_nodes, opts \\ []) do
Keyword.get(opts, :table, Name.registry(hub_id))
|> Cachex.put(child_spec.id, {child_spec, child_nodes})
unless Keyword.get(opts, :skip_hooks, false) do
HookManager.dispatch_hook(
hub_id,
Hook.registry_pid_inserted(),
{child_spec.id, child_nodes}
)
end
:ok
end
@doc """
Deletes information about a child process from the registry.
Calling this function will dispatch the `:registry_pid_remove_hook` hook unless the `:skip_hooks` option is set to `true`.
"""
@spec delete(ProcessHub.hub_id(), ProcessHub.child_id(), keyword() | nil) :: :ok
def delete(hub_id, child_id, opts \\ []) do
Keyword.get(opts, :table, Name.registry(hub_id))
|> Cachex.del(child_id)
unless Keyword.get(opts, :skip_hooks, false) do
HookManager.dispatch_hook(hub_id, Hook.registry_pid_removed(), child_id)
end
:ok
end
@doc """
Inserts information about multiple child processes into the registry.
Calling this function will dispatch the `:registry_pid_insert_hook` hook unless the `:skip_hooks` option is set to `true`.
"""
@spec bulk_insert(ProcessHub.hub_id(), %{
ProcessHub.child_id() => {ProcessHub.child_spec(), [{node(), pid()}]}
}) :: :ok
def bulk_insert(hub_id, children) do
res =
Cachex.transaction(Name.registry(hub_id), Map.keys(children), fn worker ->
Enum.map(children, fn {child_id, {child_spec, child_nodes}} ->
diff =
case lookup(hub_id, child_id, table: worker) do
nil ->
insert(hub_id, child_spec, child_nodes, skip_hooks: true, table: worker)
child_nodes
{_child_spec, existing_nodes} ->
merge_insert(child_nodes, existing_nodes, hub_id, child_spec, worker)
end
if is_list(diff) && length(diff) > 0 do
{Hook.registry_pid_inserted(), {child_spec.id, diff}}
end
end)
|> Enum.filter(&is_tuple/1)
end)
hooks =
case res do
{:ok, hooks} ->
hooks
{:error, reason} ->
raise reason
end
HookManager.dispatch_hooks(hub_id, hooks)
:ok
end
@doc """
Deletes information about multiple child processes from the registry.
Calling this function will dispatch the `:registry_pid_remove_hook` hooks unless
the `:skip_hooks` option is set to `true`.
"""
@spec bulk_delete(ProcessHub.hub_id(), %{
ProcessHub.child_id() => {ProcessHub.child_spec(), [{node(), pid()}]}
}) :: :ok
def bulk_delete(hub_id, children) do
{:ok, hooks} =
Cachex.transaction(Name.registry(hub_id), Map.keys(children), fn worker ->
Enum.map(children, fn {child_id, rem_nodes} ->
case lookup(hub_id, child_id, table: worker) do
nil ->
nil
{child_spec, nodes} ->
new_nodes =
Enum.filter(nodes, fn {node, _pid} ->
!Enum.member?(rem_nodes, node)
end)
if length(new_nodes) > 0 do
insert(hub_id, child_spec, new_nodes, skip_hooks: true, table: worker)
else
delete(hub_id, child_id, skip_hooks: true, table: worker)
end
{Hook.registry_pid_removed(), {child_id, rem_nodes}}
end
end)
|> Enum.reject(&is_nil/1)
end)
HookManager.dispatch_hooks(hub_id, hooks)
end
defp merge_insert(nodes_new, nodes_existing, hub_id, child_spec, worker) do
cond do
Enum.sort(nodes_new) !== Enum.sort(nodes_existing) ->
merged_data = Keyword.merge(nodes_existing, nodes_new)
insert(hub_id, child_spec, merged_data, skip_hooks: true, table: worker)
insert_diff(nodes_new, nodes_existing)
true ->
nil
end
end
defp insert_diff(nodes_new, nodes_existing) do
Enum.reduce(nodes_new, [], fn {node, pid}, acc ->
case nodes_existing[node] do
nil ->
[{node, pid} | acc]
existing_pid ->
if pid !== existing_pid do
[{node, pid} | acc]
else
acc
end
end
end)
end
end