lib/cloister/monitor/fsm.ex

defmodule Cloister.Monitor.Fsm do
  @moduledoc false

  alias Cloister.Monitor, as: Mon
  alias Finitomata.State, as: State
  alias HashRing.Managed, as: Ring

  @fsm """
  down --> |quorum!| assembling
  assembling --> |assembled| rehashing
  rehashing --> |nonode| nonode
  rehashing --> |rehash| rehashing
  rehashing --> |rehash| ready
  rehashing --> |stop!| stopping
  ready --> |nonode| nonode
  ready --> |rehash| ready
  ready --> |rehash| rehashing
  ready --> |stop!| stopping
  nonode --> |rehash!| assembling
  stopping --> |stop!| stopped
  """

  use Finitomata, fsm: @fsm, auto_terminate: true, timer: 5_000

  @typep t :: Mon.t()

  @impl Finitomata
  def on_timer(:assembling, %State{payload: %Mon{} = mon}) do
    Node.alive?()
    |> assembly_quorum(mon)
    |> case do
      :wait -> :ok
      %Mon{} = mon -> {:transition, :assembled, mon}
    end
  end

  @impl Finitomata
  def on_timer(current, %State{payload: %Mon{ring: ring} = mon})
      when current in ~w|rehashing ready|a do
    {nodes, ring} = nodes_vs_ring(ring)
    if MapSet.equal?(nodes, ring), do: :ok, else: {:transition, :rehash, mon}
  end

  @impl Finitomata
  def on_transition(:*, :__start__, _, %Mon{} = state) do
    net_kernel_magic(node_type(), state.otp_app, state.monitor)
    {:ok, :down, state}
  end

  @impl Finitomata
  def on_transition(current, :rehash, _, %Mon{ring: ring, consensus: consensus} = state)
      when current in ~w|rehashing ready|a do
    {na, nr} = nodes_vs_ring(ring)

    na |> MapSet.difference(nr) |> Enum.each(&Ring.add_node(ring, &1))
    nr |> MapSet.difference(na) |> Enum.each(&Ring.remove_node(ring, &1))

    goto =
      case length(Ring.nodes(ring)) - consensus do
        neg when neg < 0 -> :rehashing
        pos when pos >= 0 -> :ready
      end

    {:ok, goto, state}
  end

  @impl Finitomata
  def on_enter(_entering, %Finitomata.State{
        history: [from | _],
        payload: %Mon{listener: listener} = state
      }) do
    listener.on_state_change(from, state)
  end

  @spec assembly_quorum(boolean(), state :: t()) :: :wait | t()
  @doc false
  defp assembly_quorum(true, %Mon{otp_app: otp_app, consensus: consensus} = state) do
    case active_sentry(otp_app, consensus) do
      [] ->
        :wait

      [_ | _] = active_sentry ->
        %Mon{
          state
          | alive?: true,
            sentry?: Enum.member?(active_sentry, node()),
            clustered?: true
        }
    end
  end

  @doc false
  defp assembly_quorum(false, %Mon{} = state),
    do: %Mon{state | alive?: true, sentry?: true, clustered?: false}

  @spec active_sentry(otp_app :: atom(), consensus :: pos_integer()) :: [node()]
  defp active_sentry(otp_app, consensus) do
    case Application.get_env(:cloister, :sentry, [node()]) do
      service when is_atom(service) ->
        case :inet_tcp.getaddrs(service) do
          {:ok, ip_list} ->
            for {_, _, _, _} = ip4_addr <- ip_list,
                sentry = :"#{otp_app}@#{ip_addr_to_s(ip4_addr)}",
                node() == sentry or Node.connect(sentry),
                do: sentry

          {:error, :nxdomain} ->
            Logger.warning("[🕸️ :#{node()}] Service not found: #{inspect(service)}.")

            case consensus do
              1 -> [node()]
              _ -> []
            end

          {:error, reason} ->
            Logger.warning("[🕸️ #{inspect(service)}] :#{node()} ❓: #{inspect(reason)}.")
            []
        end

      [_ | _] = node_list ->
        for sentry <- node_list,
            node() == sentry or Node.connect(sentry),
            do: sentry
    end
  end

  @spec net_kernel_magic(type :: Mon.node_type(), otp_app :: atom(), monitor :: module()) :: :ok
  defp net_kernel_magic(:longnames, _otp_app, monitor),
    do: send(monitor, :monitor_nodes)

  defp net_kernel_magic(:shortnames, _otp_app, monitor),
    do: send(monitor, :monitor_nodes)

  defp net_kernel_magic(type, otp_app, monitor) do
    {type, _} =
      maybe_host =
      with service when is_atom(service) <- Application.fetch_env!(:cloister, :sentry),
           {:ok, s_ips} <- :inet_tcp.getaddrs(service),
           {:ok, l_ips} <- :inet.getifaddrs() do
        maybe_ips =
          for {_, l_ip_info} <- l_ips,
              l_ip_info_addr = l_ip_info[:addr],
              ^l_ip_info_addr <- s_ips,
              do: ip_addr_to_s(l_ip_info_addr)

        case maybe_ips do
          [] ->
            Logger.warning("[🕸️ :#{node()}] IP could not be found, retrying.")
            net_kernel_magic(type, otp_app, monitor)

          [ip | _] ->
            Logger.debug("[🕸️ :#{node()}] IP found: #{ip}")
            {:longnames, ip}
        end
      else
        expected when expected == {:error, :nxdomain} or is_list(expected) ->
          magic? = Application.get_env(:cloister, :magic?, :longnames)

          case {magic?, :inet.getifaddrs()} do
            {falsey, _} when falsey in [false, :nohost] ->
              {:skip, :magic_disabled_in_config}

            {truthy, {:ok, ip_addrs}} when truthy in [true, :longnames] and is_list(ip_addrs) ->
              pick_up_addr(ip_addrs)

            _ ->
              with {:ok, shortname} <- :inet.gethostname(), do: {:shortnames, shortname}
          end

        other ->
          {:skip, other}
      end

    node_restart(maybe_host, otp_app)
    net_kernel_magic(type, otp_app, monitor)
  end

  defp loopback?, do: Application.get_env(:cloister, :loopback?, false)

  @spec ip_addr_to_s(:inet.ip4_address()) :: binary()
  defp ip_addr_to_s({a, b, c, d}), do: "#{a}.#{b}.#{c}.#{d}"

  @spec pick_up_addr([{[binary()], [any()]}]) :: {:longnames, binary()} | {:skip, any()}
  defp pick_up_addr(addrs) do
    loopback = if loopback?(), do: loopback(addrs)

    case loopback || point_to_point(addrs) || broadcast(addrs) do
      addr when is_binary(addr) -> {:longnames, addr}
      _other -> {:skip, {:unfit, addrs}}
    end
  end

  # second type http://erlang.org/doc/man/inet.html#type-getifaddrs_ifopts
  @spec loopback([{binary(), any()}]) :: binary() | nil
  defp loopback(addrs) do
    case Enum.filter(addrs, fn {_, addr} -> :loopback in addr[:flags] end) do
      [] -> nil
      [{_, addr} | _] -> ip_addr_to_s(addr[:addr])
      _many -> with {:ok, host} <- :inet.gethostname(), do: host
    end
  end

  # second type http://erlang.org/doc/man/inet.html#type-getifaddrs_ifopts
  @spec point_to_point([{binary(), any()}]) :: binary() | nil
  defp point_to_point(addrs) do
    case Enum.filter(addrs, fn {_, addr} -> :pointtopoint in addr[:flags] end) do
      [] -> nil
      [{_, addr} | _] -> ip_addr_to_s(addr[:addr])
      _many -> with {:ok, host} <- :inet.gethostname(), do: host
    end
  end

  # second type http://erlang.org/doc/man/inet.html#type-getifaddrs_ifopts
  @spec broadcast([{binary(), any()}]) :: binary()
  defp broadcast(addrs) do
    case Enum.filter(addrs, fn {_, addr} -> :broadcast in addr[:flags] end) do
      [{_, addr} | _] -> ip_addr_to_s(addr[:addr])
      _any -> with {:ok, host} <- :inet.gethostname(), do: host
    end
  end

  @spec node_restart(
          {:shortnames, binary()} | {:longnames, binary()} | {:skip, any()},
          otp_app :: atom()
        ) ::
          {:ok, pid()} | {:error, term()}
  defp node_restart({:skip, any}, _otp_app) do
    Logger.warning("[🕸️ :#{node()}] skipping restart, expected host, got: [#{inspect(any)}].")
    {:error, any}
  end

  defp node_restart({type, host}, otp_app) when type in [:shortnames, :longnames] do
    stopped = Node.stop()

    Logger.info(
      "[🕸️ :#{node()}] stopped: [#{inspect(stopped)}], starting as: [#{otp_app}@#{host}]."
    )

    Node.start(:"#{otp_app}@#{host}", type)
  end

  @spec node_type :: Mon.node_type()
  defp node_type do
    case node() do
      :nonode@nohost ->
        :nonode

      name ->
        name
        |> Atom.to_string()
        |> String.split("@")
        |> List.last()
        |> String.contains?(".")
        |> if(do: :longnames, else: :shortnames)
    end
  end

  @spec nodes_vs_ring(atom()) :: {MapSet.t(node()), MapSet.t(node())}
  defp nodes_vs_ring(ring),
    do: {MapSet.new([node() | Node.list()]), MapSet.new(Ring.nodes(ring))}
end