lib/strategy/gossip.ex

defmodule Cluster.Strategy.Gossip do
  @moduledoc """
  This clustering strategy uses multicast UDP to gossip node names
  to other nodes on the network. These packets are listened for on
  each node as well, and a connection will be established between the
  two nodes if they are reachable on the network, and share the same
  magic cookie. In this way, a cluster of nodes may be formed dynamically.

  The gossip protocol is extremely simple, with a prelude followed by the node
  name which sent the packet. The node name is parsed from the packet, and a
  connection attempt is made. It will fail if the two nodes do not share a cookie.

  By default, the gossip occurs on port 45892, using the multicast address 233.252.1.32.

  The gossip protocol is not encrypted by default, but can be by providing a secret
  in the configuration of the strategy (as shown below).
  This can also be used to run multiple clusters with the same multicast configuration,
  as nodes not sharing the same encryption key will not be connected.

  You may configure the multicast interface, multicast address, the interface address to bind to, the port,
  the TTL of the packets and the optional secret using the following settings:

      config :libcluster,
        topologies: [
          gossip_example: [
            strategy: #{__MODULE__},
            config: [
              port: 45892,
              if_addr: "0.0.0.0",
              multicast_if: "192.168.1.1",
              multicast_addr: "233.252.1.32",
              multicast_ttl: 1,
              secret: "somepassword"]]]

  A TTL of 1 will limit packets to the local network, and is the default TTL.

  Optionally, the `broadcast_only: true` option can be set, which disables multicast and
  only uses broadcasting. This limits connectivity to the local network, but works in
  scenarios where multicast is not enabled. Use `multicast_addr` as the broadcast address.

  Example for broadcast only:

      config :libcluster,
        topologies: [
          gossip_example: [
            strategy: #{__MODULE__},
            config: [
              port: 45892,
              if_addr: "0.0.0.0",
              multicast_addr: "255.255.255.255",
              broadcast_only: true]]]

  Debug logging is deactivated by default for this clustering strategy, but it can be easily activated by configuring the application:

      use Mix.Config

      config :libcluster,
        debug: true

  All the checks are done at runtime, so you can flip the debug level without being forced to shut down your node.
  """
  use GenServer
  use Cluster.Strategy
  import Cluster.Logger

  alias Cluster.Strategy.State

  @default_port 45892
  @default_addr {0, 0, 0, 0}
  @default_multicast_addr {233, 252, 1, 32}
  @sol_socket 0xFFFF
  @so_reuseport 0x0200

  @doc """
  Starts the gossip strategy process.

  `args` is passed unchanged to `init/1`, which expects a single-element
  list containing the strategy's `%Cluster.Strategy.State{}`.
  """
  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  # Opens the UDP socket used for gossiping and stores everything the other
  # callbacks need in `state.meta` as `{multicast_addr, port, socket, secret}`.
  #
  # Recognised `config` keys (defaults in parentheses):
  #   * `:port`           - UDP port to open and to send heartbeats to (45892)
  #   * `:if_addr`        - local address to bind to ({0, 0, 0, 0})
  #   * `:multicast_addr` - destination address of heartbeats ({233, 252, 1, 32})
  #   * `:multicast_if`   - interface address used for multicast (none)
  #   * `:multicast_ttl`  - TTL of multicast packets (1)
  #   * `:broadcast_only` - when true, no multicast options are set (false)
  #   * `:secret`         - optional binary used to encrypt heartbeats (nil)
  #
  # Addresses may be given as 4-tuples or as dotted-quad strings.
  #
  # Raises (and therefore fails the start) if an address cannot be parsed or
  # if the socket cannot be opened.
  @impl true
  def init([%State{config: config} = state]) do
    port = Keyword.get(config, :port, @default_port)

    ip =
      config
      |> Keyword.get(:if_addr, @default_addr)
      |> sanitize_ip()

    broadcast_only? = Keyword.get(config, :broadcast_only, false)
    ttl = Keyword.get(config, :multicast_ttl, 1)

    multicast_if = Keyword.get(config, :multicast_if)

    multicast_addr =
      config
      |> Keyword.get(:multicast_addr, @default_multicast_addr)
      |> sanitize_ip()

    multicast_opts =
      cond do
        broadcast_only? ->
          []

        multicast_if != nil ->
          # Parse the interface address once and reuse it for both options.
          multicast_if_addr = sanitize_ip(multicast_if)

          [
            multicast_if: multicast_if_addr,
            multicast_ttl: ttl,
            multicast_loop: true,
            add_membership: {multicast_addr, multicast_if_addr}
          ]

        true ->
          [
            multicast_ttl: ttl,
            multicast_loop: true,
            add_membership: {multicast_addr, {0, 0, 0, 0}}
          ]
      end

    options =
      [
        :binary,
        active: true,
        ip: ip,
        reuseaddr: true,
        broadcast: true
      ] ++ multicast_opts ++ reuse_port()

    # Assertive match: a strategy without a socket is useless, so let it crash.
    {:ok, socket} = :gen_udp.open(port, options)

    secret = Keyword.get(config, :secret)
    state = %State{state | meta: {multicast_addr, port, socket, secret}}

    # Kick off the first heartbeat right after init returns.
    # TODO: Remove this version check when we deprecate OTP < 21 support
    if :erlang.system_info(:otp_release) >= ~c"21" do
      {:ok, state, {:continue, nil}}
    else
      # No handle_continue before OTP 21: a zero timeout delivers :timeout instead.
      {:ok, state, 0}
    end
  end

  # Returns the extra socket options that enable port reuse through a raw
  # socket option on the listed Unix flavours, and no options anywhere else.
  #
  # NOTE(review): the numeric level/option values behind @sol_socket and
  # @so_reuseport are platform-specific - confirm they are correct for every
  # OS named in the list below.
  defp reuse_port do
    case :os.type() do
      {:unix, flavour} when flavour in [:darwin, :freebsd, :openbsd, :linux, :netbsd] ->
        [{:raw, @sol_socket, @so_reuseport, <<1::native-32>>}]

      _other ->
        []
    end
  end

  # Normalises an IPv4 address to its 4-tuple form.
  #
  # A 4-tuple is returned untouched; a binary is parsed with
  # `:inet.parse_ipv4_address/1`. An unparsable binary raises a MatchError,
  # any other kind of input raises a CaseClauseError.
  defp sanitize_ip(input) do
    case input do
      text when is_binary(text) ->
        {:ok, parsed} = :inet.parse_ipv4_address(~c"#{text}")
        parsed

      {_, _, _, _} = quad ->
        quad
    end
  end

  # Send stuttered heartbeats
  #
  # Which callback starts the heartbeat loop is decided at compile time:
  # on OTP 21+ `init/1` returns `{:continue, nil}` and `handle_continue/2`
  # fires; on older releases `init/1` returns a zero timeout and the
  # resulting `:timeout` message is used. Both simply delegate to the
  # `:heartbeat` clause of `handle_info/2`.
  # TODO: Remove this version check when we deprecate OTP < 21 support
  if :erlang.system_info(:otp_release) >= ~c"21" do
    @impl true
    def handle_continue(_, state), do: handle_info(:heartbeat, state)
  else
    @impl true
    def handle_info(:timeout, state), do: handle_info(:heartbeat, state)
  end

  # Sends one heartbeat packet for this node to the configured address and
  # port, then re-arms itself after a random delay of 1..5000 ms so that
  # nodes do not all gossip in lock-step. The send result is ignored.
  @impl true
  def handle_info(:heartbeat, %State{meta: {dest_addr, dest_port, sock, _secret}} = state) do
    debug(state.topology, "heartbeat")

    packet = heartbeat(node(), state)
    :gen_udp.send(sock, dest_addr, dest_port, packet)

    next_in_ms = :rand.uniform(5_000)
    Process.send_after(self(), :heartbeat, next_in_ms)

    {:noreply, state}
  end

  # Handle received heartbeats
  #
  # No secret configured: only packets carrying the plaintext prelude are
  # passed on to handle_heartbeat/2.
  def handle_info(
        {:udp, _socket, _ip, _port, <<"heartbeat::", _::binary>> = packet},
        %State{meta: {_, _, _, nil}} = state
      ) do
    handle_heartbeat(state, packet)
    {:noreply, state}
  end

  # Secret configured: the first 16 bytes are the IV and the remainder is the
  # ciphertext. Packets that fail to decrypt are dropped.
  def handle_info(
        {:udp, _socket, _ip, _port, <<iv::binary-size(16), ciphertext::binary>>},
        %State{meta: {_, _, _, secret}} = state
      )
      when is_binary(secret) do
    with {:ok, plaintext} <- decrypt(state, ciphertext, secret, iv) do
      handle_heartbeat(state, plaintext)
    end

    {:noreply, state}
  end

  # Any other datagram (wrong prelude, too short to hold an IV, ...) is ignored.
  def handle_info({:udp, _socket, _ip, _port, _data}, state), do: {:noreply, state}

  # Closes the gossip socket held in `state.meta` when the server stops.
  @impl true
  def terminate(_reason, %State{meta: {_addr, _port, sock, _secret}}) do
    _ = :gen_udp.close(sock)
    :ok
  end

  # Construct iodata representing packet to send
  #
  # Without a secret the packet is the "heartbeat::" prelude followed by the
  # external-term encoding of `%{node: node_name}`.
  defp heartbeat(node_name, %State{meta: {_, _, _, nil}}) do
    ["heartbeat::", :erlang.term_to_binary(%{node: node_name})]
  end

  # With a secret the very same message is encrypted, and the packet becomes
  # the IV followed by the ciphertext.
  defp heartbeat(node_name, %State{meta: {_, _, _, secret}} = state) when is_binary(secret) do
    payload = :erlang.term_to_binary(%{node: node_name})
    {:ok, iv, encrypted} = encrypt(state, "heartbeat::" <> payload, secret)

    [iv, encrypted]
  end

  # Upon receipt of a heartbeat, we check to see if the node
  # is connected to us, and if not, we connect to it.
  # If the connection fails, it's likely because the cookie
  # is different, and thus a node we can ignore
  #
  # The packet must start with the "heartbeat::" prelude followed by the
  # external-term encoding of `%{node: name}`. Heartbeats from ourselves,
  # payloads that cannot be decoded, terms of any other shape and packets
  # without the prelude are all dropped. Always returns :ok.
  @spec handle_heartbeat(State.t(), binary) :: :ok
  defp handle_heartbeat(%State{} = state, <<"heartbeat::", rest::binary>>) do
    self = node()
    topology = state.topology

    case decode_heartbeat(rest) do
      {:ok, %{node: ^self}} ->
        :ok

      {:ok, %{node: n}} when is_atom(n) ->
        debug(topology, "received heartbeat from #{n}")
        Cluster.Strategy.connect_nodes(topology, state.connect, state.list_nodes, [n])
        :ok

      {:ok, _other} ->
        :ok

      :error ->
        debug(topology, "ignoring heartbeat with malformed payload")
        :ok
    end
  end

  defp handle_heartbeat(_state, _packet) do
    :ok
  end

  # Decodes the term that follows the prelude, returning `{:ok, term}` or
  # `:error`. `:erlang.binary_to_term/1` raises ArgumentError on bytes that
  # are not valid external term format; packets come straight off the
  # network, so a garbage payload must not take the server down.
  #
  # NOTE(review): the payload is untrusted and is decoded without the `:safe`
  # option, so it can create new atoms. `:safe` cannot simply be switched on:
  # the name of a node we have not seen before is usually an atom that does
  # not exist locally yet, and such heartbeats would then be rejected.
  defp decode_heartbeat(payload) do
    {:ok, :erlang.binary_to_term(payload)}
  rescue
    ArgumentError -> :error
  end

  # Encrypts `plaintext` with AES-256-CBC.
  #
  # The key is the SHA-256 digest of `password` and a fresh random 16-byte IV
  # is generated per call. The plaintext is PKCS #7 padded before encryption.
  # Returns `{:ok, iv, ciphertext}`.
  defp encrypt(_state, plaintext, password) do
    key = :crypto.hash(:sha256, password)
    iv = :crypto.strong_rand_bytes(16)
    padded = pkcs7_pad(plaintext)

    {:ok, iv, :crypto.crypto_one_time(:aes_256_cbc, key, iv, padded, true)}
  end

  # Decrypts `ciphertext` (AES-256-CBC, key = SHA-256 of `password`) and
  # strips the PKCS #7 padding.
  #
  # Returns:
  #   * `{:ok, plaintext}`     - on success
  #   * `{:error, :decrypt}`   - the cipher step itself failed
  #   * `{:error, :unpadding}` - decryption worked but the padding is invalid
  defp decrypt(state, ciphertext, password, iv) do
    key = :crypto.hash(:sha256, password)

    case safe_decrypt(state, key, iv, ciphertext) do
      {:ok, padded} ->
        case pkcs7_unpad(padded) do
          {:ok, _plaintext} = ok -> ok
          :error -> {:error, :unpadding}
        end

      :error ->
        {:error, :decrypt}
    end
  end

  # Runs the raw AES-256-CBC decryption and turns a crypto error of the shape
  # `{tag, {file, line}, description}` into a logged warning plus `:error`.
  # Returns `{:ok, padded_plaintext}` otherwise; the padding is still attached.
  defp safe_decrypt(state, key, iv, ciphertext) do
    padded = :crypto.crypto_one_time(:aes_256_cbc, key, iv, ciphertext, false)
    {:ok, padded}
  catch
    :error, {tag, {file, line}, desc} ->
      warn(state.topology, "decryption failed: #{inspect(tag)} (#{file}:#{line}): #{desc}")
      :error
  end

  #
  # Pads a message to a multiple of 16 bytes using PKCS #7: between 1 and 16
  # bytes are appended, each holding the number of bytes added. A message
  # that is already block-aligned gets a full extra block.
  #
  # from: https://github.com/izelnakri/aes256/blob/master/lib/aes256.ex
  #
  # See: https://tools.ietf.org/html/rfc2315
  # See: `pkcs7_unpad/1`
  defp pkcs7_pad(message) do
    pad_len = 16 - rem(byte_size(message), 16)
    <<message::binary, :binary.copy(<<pad_len>>, pad_len)::binary>>
  end

  #
  # Unpads a message using the PKCS #7 cryptographic message syntax.
  #
  # The last byte gives the padding length. The padding is valid only when
  # that length is between 1 and 16, does not exceed the message size, and
  # every padding byte carries the same value. Returns `{:ok, unpadded}` for
  # valid padding and `:error` otherwise (including for an empty message).
  #
  # from: https://github.com/izelnakri/aes256/blob/master/lib/aes256.ex
  #
  # See: https://tools.ietf.org/html/rfc2315
  # See: `pkcs7_pad/1`
  defp pkcs7_unpad(<<>>), do: :error

  defp pkcs7_unpad(message) do
    message_size = byte_size(message)
    padding_size = :binary.last(message)

    # A zero length is never produced by padding, and a length larger than
    # the message cannot be sliced off - reject both instead of raising.
    if padding_size in 1..16 and padding_size <= message_size do
      body_size = message_size - padding_size
      <<body::binary-size(body_size), padding::binary>> = message

      if padding === :binary.copy(<<padding_size>>, padding_size) do
        {:ok, body}
      else
        :error
      end
    else
      :error
    end
  end
end