defmodule Bonny.Operator.LeaderElector do
@moduledoc """
The leader elector uses a [Kubernetes
Lease](https://kubernetes.io/docs/concepts/architecture/leases/) to make sure
the operator runs on only one replica (the leader) at any given time.
## Enabling the Leader Election
> #### Functionality still in Beta {: .warning}
>
> The leader election is still being tested. Enable it for testing purposes
> only and please report any issues on GitHub.
To enable leader election you have to pass the `enable_leader_election: true` option when [adding the operator to your Supervisor](#adding-the-operator-to-your-supervisor):
```elixir
defmodule MyOperator.Application do
use Application
def start(_type, env: env) do
children = [
{MyOperator.Operator,
conn: MyOperator.K8sConn.get!(env),
watch_namespace: :all,
enable_leader_election: true} # <-- starts the leader elector
]
opts = [strategy: :one_for_one, name: MyOperator.Supervisor]
Supervisor.start_link(children, opts)
end
end
```
"""
use GenServer
import YamlElixir.Sigil
require Logger
# All three durations below are expressed in seconds.
#
# lease_duration is written to the lease as `leaseDurationSeconds`. A
# candidate that is not the holder treats the lease as locked until
# `renewTime + leaseDurationSeconds` has passed; after that it may force
# acquire leadership.
@lease_duration 15
# renew_deadline is the delay the current leader waits before it runs the
# next leadership evaluation, i.e. before it tries to renew its lease.
@renew_deadline 10
# retry_period is the delay a non-leader waits before it runs the next
# leadership evaluation, i.e. before it tries to acquire the lease again.
@retry_period 2
# Elector state:
#   * controllers, operator, init_args - handed unchanged to
#     Bonny.Operator.Supervisor.start_link/3 once leadership is acquired
#   * conn - the `:conn` entry of init_args, used for all lease requests
#   * operator_pid - `{pid, monitor_ref}` of the running operator supervisor
#     while this instance is the leader, `nil` otherwise
defstruct [:controllers, :operator, :init_args, :conn, operator_pid: nil]
@doc """
Starts the leader elector and immediately asks it to run its first
leadership evaluation.

`init_args` must contain a `:conn` entry. The remaining arguments are kept
and passed on to the operator supervisor when leadership is acquired.
"""
@spec start_link(controllers :: list(), operator :: atom(), init_args :: Keyword.t()) ::
        {:ok, pid()}
def start_link(controllers, operator, init_args) do
  {:ok, elector} = GenServer.start_link(__MODULE__, {controllers, operator, init_args})

  # Kick off the evaluation loop; every evaluation re-schedules the next one.
  send(elector, :maybe_acquire_leadership)

  {:ok, elector}
end
# Builds the initial state. Raises if `init_args` has no `:conn` entry.
# The elector starts out as a non-leader (`operator_pid` defaults to nil).
@impl true
def init({controllers, operator, init_args}) do
  initial_state = %__MODULE__{
    controllers: controllers,
    operator: operator,
    conn: Keyword.fetch!(init_args, :conn),
    init_args: init_args
  }

  {:ok, initial_state}
end
# Periodic leadership evaluation.
#
# Tries to acquire or renew the lease and reconciles the outcome with the
# current role of this instance:
#
#   * lease held, already leader -> nothing to do
#   * lease held, not yet leader -> start and monitor the operator supervisor
#   * lease not held, was leader -> shut the operator supervisor down
#   * lease not held, not leader -> nothing to do
#
# Afterwards the next evaluation is scheduled: after @renew_deadline seconds
# when this instance is the leader, after @retry_period seconds otherwise.
@impl true
def handle_info(:maybe_acquire_leadership, state) do
  leader? = state.operator_pid != nil

  Logger.debug("{Operator=#{inspect(state.operator)}} - Starting leadership evaluation",
    library: :bonny
  )

  next_state =
    case {acquire_or_renew(state.conn, state.operator), leader?} do
      {:ok, true} ->
        Logger.debug(
          "{Operator=#{inspect(state.operator)}} - I am the leader - I stay the leader.",
          library: :bonny
        )

        state

      {:ok, false} ->
        Logger.debug(
          "{Operator=#{inspect(state.operator)}} - I am the new leader. Starting the operator.",
          library: :bonny
        )

        {:ok, supervisor} =
          Bonny.Operator.Supervisor.start_link(
            state.controllers,
            state.operator,
            state.init_args
          )

        monitor = Process.monitor(supervisor)
        %{state | operator_pid: {supervisor, monitor}}

      {_not_acquired, true} ->
        Logger.debug(
          "{Operator=#{inspect(state.operator)}} - I was the leader but somebody else took over leadership. Terminating operator.",
          library: :bonny
        )

        {supervisor, _monitor} = state.operator_pid
        Process.exit(supervisor, :shutdown)
        %{state | operator_pid: nil}

      {_not_acquired, false} ->
        Logger.debug("{Operator=#{inspect(state.operator)}} - Somebody else is the leader.",
          library: :bonny
        )

        state
    end

  delay_in_seconds =
    case next_state.operator_pid do
      nil -> @retry_period
      _leader -> @renew_deadline
    end

  Process.send_after(self(), :maybe_acquire_leadership, delay_in_seconds * 1000)
  {:noreply, next_state}
end
# The operator supervisor this elector started and monitors went down.
# Without a running operator there is no point in holding the lease, so it is
# released and the elector falls back to being a non-leader. The periodic
# :maybe_acquire_leadership message keeps arriving, so leadership is
# re-evaluated on the next tick.
#
# The callback has to answer with `{:noreply, state}`; returning the bare
# state makes the GenServer exit with a bad return value.
def handle_info(
      {:DOWN, ref, :process, pid, _reason},
      %__MODULE__{operator_pid: {pid, ref}} = state
    ) do
  Logger.warning(
    "{Operator=#{inspect(state.operator)}} - Uh-oh! Our operator just went down. Guess that means I have to give up leadership. Boohoo!",
    library: :bonny
  )

  release(state.conn, state.operator)
  {:noreply, struct!(state, operator_pid: nil)}
end

# A :DOWN message that does not belong to the operator supervisor currently
# tracked in the state. The lease is released defensively (release/2 only
# touches it if this instance is the holder) and the elector continues as a
# non-leader.
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
  Logger.warning(
    "{Operator=#{inspect(state.operator)}} - Very strange. A process I'm monitoring went down. But I'm not the leader. Looks like a bug in Bonny. Anyway, releaseing the lock if I have it.",
    library: :bonny
  )

  release(state.conn, state.operator)
  {:noreply, struct!(state, operator_pid: nil)}
end
# Called when the elector is going down while it is the leader: the lease is
# released first so another replica can take over, then the operator
# supervisor is asked to shut down. The return value is ignored by GenServer.
#
# NOTE(review): GenServer only invokes terminate/2 on a shutdown request from
# its supervisor if the process traps exits; init/1 in this module does not
# set that flag - confirm whether this callback is reached on shutdown.
@impl true
def terminate(_, %__MODULE__{operator_pid: {pid, _ref}} = state) do
  Logger.debug(
    "{Operator=#{inspect(state.operator)}} - I'm going down - releasing the lock now.",
    library: :bonny
  )

  release(state.conn, state.operator)
  Process.exit(pid, :shutdown)
  struct!(state, operator_pid: nil)
end

# Going down as a non-leader: there is no lease to release and no operator to
# stop. The log call carries the same `library: :bonny` metadata as every
# other log call in this module so that metadata-based filters see it too.
def terminate(_, state) do
  Logger.debug(
    "{Operator=#{inspect(state.operator)}} - I'm going down but I'm not the leader so chill!",
    library: :bonny
  )

  state
end
# Best-effort release of the operator's lease.
#
# If the lease can be fetched and this instance is its holder, the lease
# duration is cut down to one second and the lease is applied again. In every
# other situation (lease cannot be fetched, held by somebody else) nothing is
# done. The result of the apply call is deliberately ignored; the function
# always returns `:ok`.
defp release(conn, operator) do
  holder = Bonny.Config.instance_name()

  with {:ok, %{"spec" => %{"holderIdentity" => ^holder}} = held_lease} <-
         get_lease(conn, operator) do
    shortened_lease = put_in(held_lease, ~w(spec leaseDurationSeconds), 1)
    Bonny.Resource.apply(shortened_lease, conn, [])
  end

  :ok
end
# Attempts to create, take over or renew the operator's lease on behalf of
# this instance.
#
# Returns
#   * `:ok`     - this instance holds the lease after the call
#   * `:locked` - another instance holds an unexpired lease, or won the race
#                 to create it
#   * `:error`  - the lease could not be read or written
#
# Every failure reported by the API is turned into a return value instead of
# an unmatched `case`, so a transient problem (network hiccup, missing
# permissions, ...) no longer crashes the elector; the caller treats anything
# but `:ok` as "not the leader" and tries again on its next evaluation.
defp acquire_or_renew(conn, operator) do
  now = DateTime.utc_now()
  my_lease = lease(now, @lease_duration, operator)

  case get_lease(conn, operator) do
    {:error, %K8s.Client.APIError{reason: "NotFound"}} ->
      Logger.debug("{Operator=#{inspect(operator)}} - Lease not found. Trying to create it.",
        library: :bonny
      )

      create_lease(conn, operator, my_lease)

    {:ok, old_lease} ->
      if locked_by_sbdy_else?(now, old_lease, my_lease) do
        Logger.debug(
          ~s({Operator=#{inspect(operator)}} - Lock is held by "#{old_lease["spec"]["holderIdentity"]}" and has not yet expired.),
          library: :bonny
        )

        :locked
      else
        update_lease(conn, operator, old_lease, my_lease)
      end

    {:error, error} ->
      Logger.warning(
        "{Operator=#{inspect(operator)}} - Failed fetching the lease. #{inspect(error)}",
        library: :bonny
      )

      :error
  end
end

# Creates the lease when none exists yet. Losing the creation race against
# another instance yields `:locked`, any other failure yields `:error`.
defp create_lease(conn, operator, my_lease) do
  result =
    my_lease
    |> K8s.Client.create()
    |> K8s.Client.put_conn(conn)
    |> K8s.Client.run()

  case result do
    {:ok, _} ->
      Logger.debug("{Operator=#{inspect(operator)}} - Lease successfully created.",
        library: :bonny
      )

      :ok

    {:error, %K8s.Client.APIError{reason: "AlreadyExists"}} ->
      Logger.debug(
        "{Operator=#{inspect(operator)}} - Failed creating lease. Seems to have been created by somebody else in the meantime.",
        library: :bonny
      )

      :locked

    {:error, error} ->
      Logger.warning(
        "{Operator=#{inspect(operator)}} - Failed creating lease. #{inspect(error)}",
        library: :bonny
      )

      :error
  end
end

# Renews the lease when this instance already holds it, or takes it over when
# the previous holder let it expire. A renewal keeps the original
# `acquireTime`; a take-over uses the fresh one from `my_lease`.
defp update_lease(conn, operator, old_lease, my_lease) do
  my_lease =
    if old_lease["spec"]["holderIdentity"] == my_lease["spec"]["holderIdentity"] do
      Logger.debug(
        "{Operator=#{inspect(operator)}} - I'm holding the lock. Trying to renew it",
        library: :bonny
      )

      put_in(my_lease, ~w(spec acquireTime), old_lease["spec"]["acquireTime"])
    else
      Logger.debug(
        ~s({Operator=#{inspect(operator)}} - Lock is held by "#{old_lease["spec"]["holderIdentity"]}" but has expired. Trying to acquire it.),
        library: :bonny
      )

      my_lease
    end

  # The resourceVersion observed on the fetched lease is sent along -
  # presumably so the API server rejects the write if somebody else modified
  # the lease in the meantime (TODO confirm against Bonny.Resource.apply/3).
  my_lease =
    put_in(my_lease, ~w(metadata resourceVersion), old_lease["metadata"]["resourceVersion"])

  case Bonny.Resource.apply(my_lease, conn, []) do
    {:ok, _} ->
      Logger.debug(
        ~s({Operator=#{inspect(operator)}} - Lock successfully acquired/renewed.),
        library: :bonny
      )

      :ok

    {:error, exception} when is_exception(exception) ->
      Logger.debug(
        ~s({Operator=#{inspect(operator)}} - Failed aquiring/renewing the lock. #{Exception.message(exception)}),
        library: :bonny
      )

      :error

    {:error, error} ->
      Logger.debug(
        ~s({Operator=#{inspect(operator)}} - Failed aquiring/renewing the lock. #{inspect(error)}),
        library: :bonny
      )

      :error
  end
end
# Tells whether `old_lease` is currently held by an instance other than the
# holder named in `my_lease`.
#
#   * now       - the reference `DateTime` (UTC) the expiry is compared against
#   * old_lease - the lease as fetched from the cluster
#   * my_lease  - the lease this instance would like to write
#
# Returns `true` only if the lease names a different holder AND
# `renewTime + leaseDurationSeconds` still lies in the future.
#
# A lease without a holder (`nil` or `""`) counts as free - `holderIdentity`
# is an optional field of the Kubernetes LeaseSpec. The renew time is parsed
# only when the lease is actually held by somebody else, so a lease that is
# free or already ours never fails on its timestamp.
defp locked_by_sbdy_else?(now, %{"spec" => old_lease_spec}, my_lease) do
  holder = old_lease_spec["holderIdentity"]

  cond do
    holder in [nil, ""] ->
      false

    holder == my_lease["spec"]["holderIdentity"] ->
      false

    true ->
      # from_iso8601/1 already converts to UTC; the original offset is irrelevant.
      {:ok, last_renew, _utc_offset} = DateTime.from_iso8601(old_lease_spec["renewTime"])
      time_of_expiration = DateTime.add(last_renew, old_lease_spec["leaseDurationSeconds"])
      DateTime.compare(time_of_expiration, now) == :gt
  end
end
# Derives the name of the lease for `operator`:
# "<namespace>-<operator name>-<lowercase hex MD5 of the operator module name>".
# The hash is only used to build a stable name, not for anything
# security-related.
defp lease_name(operator) do
  module_name = Atom.to_string(operator)
  digest = :crypto.hash(:md5, module_name)
  operator_hash = Base.encode16(digest, case: :lower)

  "#{Bonny.Config.namespace()}-#{Bonny.Config.name()}-#{operator_hash}"
end
# Fetches the operator's lease from the namespace configured in Bonny.Config
# and returns the result of K8s.Client.run/1 unchanged.
defp get_lease(conn, operator) do
  path_params = [name: lease_name(operator), namespace: Bonny.Config.namespace()]

  "coordination.k8s.io/v1"
  |> K8s.Client.get("Lease", path_params)
  |> K8s.Client.put_conn(conn)
  |> K8s.Client.run()
end
# Builds the Lease manifest this instance would like to see in the cluster.
#
#   * now            - `DateTime` written (as ISO 8601) to both `renewTime` and
#                      `acquireTime`
#   * lease_duration - value for `leaseDurationSeconds`
#   * operator       - operator module; determines the lease name via
#                      lease_name/1
#
# The holder is Bonny.Config.instance_name() and the namespace is
# Bonny.Config.namespace(). The manifest is written as YAML and turned into a
# data structure by the `~y` sigil imported from YamlElixir.Sigil.
# NOTE(review): the values are interpolated into the YAML text, so they
# presumably go through YAML scalar parsing - confirm that names which look
# like numbers or booleans cannot occur here.
defp lease(now, lease_duration, operator) do
~y"""
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
name: #{lease_name(operator)}
namespace: #{Bonny.Config.namespace()}
spec:
holderIdentity: #{Bonny.Config.instance_name()}
leaseDurationSeconds: #{lease_duration}
renewTime: #{DateTime.to_iso8601(now)}
acquireTime: #{DateTime.to_iso8601(now)}
"""
end
end