defmodule Stoker.Activator do
  @moduledoc """
  This GenServer is used to make sure that a process will
  always - and only once - be active in the cluster.

  The way this works is that we use the `:global` naming
  service as a mutex - if the name `{Activator, YourModule}` can be registered,
  then this server runs the "correct" activator.
  If it cannot be registered, this means somebody else is holding it,
  so we put a monitor on the process that actually registered the name,
  and when it dies, we try registering instead.

  On every application, its Activator (or Activators) are always
  started once per server, so the "idle" ones will just be waiting
  for their turn to take over. If a new server joins a healthy cluster,
  its Activator will simply be waiting.

  Once achieved, the only way to lose one's :leader status is by
  death, or disconnection from the main cluster. If two leaders end up
  holding the same name (e.g. when a split cluster reconnects), `:global`
  calls the conflict resolver, which keeps one of them and stops the
  other with reason `{:shutdown, :name_conflict}`.

  TODO: Decommissioning and cluster limits

  ## The Stoker callback

  A module implementing the Stoker callback is initialised with `init/0`
  (on every node), and its `event/3` is called when some of these events
  happen:

  - this GenServer becomes leader (`:now_leader`)
  - a node joins or leaves the cluster (`:cluster_change`); this is only
    delivered to the leader, as only the leader subscribes to node events
  - a timer fires (`:timer`), after the number of milliseconds returned by
    `next_timer_in/1` (no timer is scheduled unless it returns a positive
    integer)
  - this GenServer shuts down (`:shutdown`)

  ## The state

  This GenServer has a state that can be queried by using `dump/1`.
  It contains:

  - `module`: the Stoker module to be called back. This is the
    same module that the Activator is registered under.
  - `mod_state`: a private state for the Stoker module. This state is
    per-machine - this means that when another node becomes :leader,
    its Stoker module starts from whatever its own `init/0` returned;
    nothing is transferred between nodes.
  - `state`: `:leader` or `:follower` (`:unk` only transiently, while
    `init/1` is running).
  - `created_at`: When this GenServer was started (even if not :leader)
  - `active_from`: When this GenServer became :leader, or nil
  - `leader_pid`: The PID of the current :leader (`self()` when :leader)
  - `leader_ref`: The monitor reference to the current :leader, if not
    :leader; nil when :leader
  - `current_node`: The name of the current node.
  - `timer_ref`: The reference of the pending `:tick` timer, or nil
  """
  use GenServer, restart: :permanent
  require Logger

  @type level :: :info | :warning | :error

  @doc "Starts an Activator for the Stoker `module`, linked to the caller."
  def start_link(module) when is_atom(module) do
    GenServer.start_link(__MODULE__, %{module: module})
  end

  @doc """
  Returns the internal state of an Activator.

  Accepts either the Activator's PID or the Stoker module it runs; in the
  latter case the globally registered (leader) Activator is queried, and the
  call exits if none is registered.
  """
  def dump(p) when is_pid(p) do
    GenServer.call(p, :dump)
  end

  def dump(module) when is_atom(module) do
    GenServer.call(whereis(module), :dump)
  end

  @doc "Returns the PID of the leader Activator for `module`, or `:undefined`."
  def whereis(module) when is_atom(module) do
    :global.whereis_name({__MODULE__, module})
  end

  @impl true
  def init(%{module: module}) do
    # Trapping exits turns the :name_conflict exit signal sent by
    # handle_conflict/3 into an {:EXIT, ...} message we can handle.
    Process.flag(:trap_exit, true)
    Logger.warning("Starting #{__MODULE__} relying on #{i(module)}")
    {:ok, initial_mod_state} = module.init()

    state = %{
      module: module,
      mod_state: initial_mod_state,
      state: :unk,
      current_node: Node.self(),
      created_at: DateTime.utc_now(),
      active_from: nil,
      leader_pid: nil,
      leader_ref: nil,
      timer_ref: schedule_tick(module, initial_mod_state)
    }

    {:ok, register(state)}
  end

  @impl true
  def handle_call(:dump, _from, state) do
    {:reply, state, state}
  end

  @impl true
  # Only the monitor on the current leader matters: when it fires the leader
  # is gone, so we try to take over (or start following the new leader).
  def handle_info({:DOWN, ref, :process, _pid, _reason}, %{leader_ref: ref} = state) do
    {:noreply, register(%{state | leader_pid: nil, leader_ref: nil})}
  end

  def handle_info({:EXIT, _pid, :name_conflict}, state) do
    # Sent by handle_conflict/3: another Activator keeps the global name.
    {:stop, {:shutdown, :name_conflict}, state}
  end

  def handle_info({:nodeup, _} = msg, state), do: cluster_changed(state, msg)

  def handle_info({:nodedown, _} = msg, state), do: cluster_changed(state, msg)

  def handle_info(:tick, %{module: module} = state) do
    %{mod_state: mod_state} = new_state = event(state, :timer, :none)
    {:noreply, %{new_state | timer_ref: schedule_tick(module, mod_state)}, :hibernate}
  end

  # Defining handle_info/2 replaces GenServer's default clause, so without this
  # any stray message (stale DOWN, EXIT from a linked process) would crash us.
  def handle_info(msg, state) do
    Logger.warning("#{__MODULE__} ignoring unexpected message: #{i(msg)}")
    {:noreply, state}
  end

  @impl true
  def terminate(reason, state) do
    # GenServer ignores the return value of terminate/2; we only notify the
    # Stoker module.
    _ = event(state, :shutdown, reason)
    :ok
  end

  # Notifies the Stoker module of a :nodeup/:nodedown message.
  defp cluster_changed(state, change_reason) do
    {:noreply, event(state, :cluster_change, change_reason), :hibernate}
  end

  # The :global name this Activator competes for.
  defp name(%{module: module}), do: {__MODULE__, module}

  # :global resolve function for name clashes: keep pid1, ask pid2 to stop.
  defp handle_conflict(_name, pid1, pid2) do
    Process.exit(pid2, :name_conflict)
    pid1
  end

  # Tries to grab the global name; becomes leader on success, follower otherwise.
  defp register(state) do
    case :global.register_name(name(state), self(), &handle_conflict/3) do
      :yes -> started(state)
      :no -> monitor(state)
    end
  end

  defp started(state) do
    Logger.warning("Became Dear Leader")
    # see https://stackoverflow.com/questions/49260444/in-elixir-how-can-i-get-notified-when-a-node-joins-or-leaves-the-cluster
    # Only the leader subscribes, so only the leader receives :nodeup/:nodedown.
    :net_kernel.monitor_nodes(true)
    new_state = event(state, :now_leader)

    %{
      new_state
      | state: :leader,
        active_from: DateTime.utc_now(),
        leader_pid: self(),
        leader_ref: nil
    }
  end

  # Follows the current leader; if it vanished in the meantime, retry registering.
  defp monitor(%{module: module} = state) do
    case whereis(module) do
      :undefined ->
        register(state)

      pid ->
        leader_ref = Process.monitor(pid)
        Logger.warning("Is currently a follower of #{i(pid)}")
        %{state | state: :follower, leader_pid: pid, leader_ref: leader_ref}
    end
  end

  @doc """
  Forwards `event` (with `reason`) to the Stoker module's `event/3` and stores
  the `mod_state` it returns. Crashes if the callback does not return
  `{:ok, new_mod_state}`.
  """
  @spec event(
          %{:mod_state => any, :module => atom, optional(any) => any},
          Stoker.activator_event(),
          any
        ) :: %{
          :mod_state => any,
          :module => atom,
          optional(any) => any
        }
  def event(%{module: module, mod_state: mod_state} = state, event, reason \\ :none) do
    {:ok, new_mod_state} = module.event(mod_state, event, reason)
    %{state | mod_state: new_mod_state}
  end

  defp i(term), do: inspect(term)

  # Asks the Stoker module when the next :tick is due, always passing its own
  # mod_state (both at startup and on every tick).
  defp schedule_tick(module, mod_state) do
    mod_state
    |> module.next_timer_in()
    |> build_timer()
  end

  @doc """
  Schedules a `:tick` message to the calling process after `t` milliseconds
  and returns the timer reference; returns nil unless `t` is a positive integer.
  """
  def build_timer(t) when is_integer(t) and t > 0 do
    Process.send_after(self(), :tick, t)
  end

  def build_timer(_), do: nil
end