lib/strategy/kubernetes_dns_srv.ex

defmodule Cluster.Strategy.Kubernetes.DNSSRV do
  @default_polling_interval 5_000

  @moduledoc """
  This clustering strategy works by issuing an SRV query against the headless service that
  exposes the StatefulSet containing your nodes.

  > This strategy requires deploying pods as a StatefulSet which is exposed by a headless service.
  > If you want to avoid that, you could use `Cluster.Strategy.Kubernetes.DNS`.

  It assumes that all Erlang nodes use longnames of the form `<basename>@<domain>`, where:

  + all nodes use the same `<basename>`
  + each node uses a unique `<domain>`

  In `<basename>@<domain>`:

  + `<basename>` is the value configured by the `:application_name` option.
  + `<domain>` is the value controlled by the following options:
     - `:service`
     - `:namespace`
     - `:resolver`

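  For example, with `application_name: "myapp"` and a headless service named `myapp-headless`
  in the `default` namespace, the resulting node names look roughly like this (values here
  mirror the worked example at the end of this doc):

      :"myapp@myapp-0.myapp-headless.default.svc.cluster.local"
      :"myapp@myapp-1.myapp-headless.default.svc.cluster.local"
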
  ## Getting `<basename>`

  As noted above, the basename is configured by the `:application_name` option.

  One thing to keep in mind: when building an OTP release, make sure that the name of the OTP
  release matches the name configured by `:application_name`.
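
  For instance, with mix releases this means the release defined in `mix.exs` should be named
  `myapp` when `application_name: "myapp"` is configured (an illustrative sketch, assuming
  your project is called `myapp`):

      # mix.exs
      def project do
        [
          app: :myapp,
          version: "1.0.0",
          releases: [
            myapp: []
          ]
        ]
      end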

  ## Getting `<domain>`

  > For more information, see the kubernetes stateful-application [documentation](https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#using-stable-network-identities)
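
  In short, when a StatefulSet is governed by a headless service, each pod gets a stable DNS
  name of the following form, and that name is what this strategy uses as `<domain>`:

      <statefulset-name>-<ordinal>.<service>.<namespace>.svc.<cluster-domain>
      # e.g. myapp-0.myapp-headless.default.svc.cluster.local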

  ## Setup

  Getting this strategy to work requires:

  1. deploying the pods as a StatefulSet (otherwise, hostnames won't be set for the pods)
  2. exposing that StatefulSet via a headless service (otherwise, the SRV query won't work as expected)
  3. setting the name of each Erlang node according to its pod's hostname

  First, deploy the pods as a StatefulSet exposed by a headless service. Here is an
  example of a corresponding Kubernetes definition:

  ```yaml
  apiVersion: v1
  kind: Service
  metadata:
    name: "myapp-headless"
    labels:
      app: myapp
  spec:
    ports:
    - port: 4000
      name: web
    clusterIP: None
    selector:
      app: myapp
  ---
  apiVersion: apps/v1
  kind: StatefulSet
  metadata:
    name: myapp
  spec:
    serviceName: "myapp-headless"
    replicas: 2
    selector:
      matchLabels:
        app: myapp
    template:
      metadata:
        labels:
          app: myapp
      spec:
        containers:
        - name: myapp
          image: myapp:v1.0.0
          imagePullPolicy: Always
          ports:
          - containerPort: 4000
            name: http
            protocol: TCP
  ```

  Then, set the name of the Erlang node using the pod's hostname. If you use mix releases, you
  can configure the required options in `rel/env.sh.eex`:

      # rel/env.sh.eex
      export RELEASE_DISTRIBUTION=name
      export RELEASE_NODE=<%= @release.name %>@$(hostname -f)

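  Once a pod is up, you can sanity-check the resulting node name from a remote console
  (illustrative output, assuming the example names used throughout this doc):

      iex> Node.self()
      :"myapp@myapp-0.myapp-headless.default.svc.cluster.local"
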
  ## Polling Interval

  The default interval for syncing topologies is `#{@default_polling_interval}` milliseconds
  (#{div(@default_polling_interval, 1000)} seconds). You can configure it with the `:polling_interval` option.

  ## An example configuration

      config :libcluster,
        topologies: [
          erlang_nodes_in_k8s: [
            strategy: #{__MODULE__},
            config: [
              service: "myapp-headless",
              application_name: "myapp",
              namespace: "default",
              polling_interval: 10_000
            ]
          ]
        ]
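
  The `:resolver` option defaults to `&:inet_res.getbyname(&1, :srv)`. You can override it
  (for example, to stub out DNS in tests); the replacement receives the service FQDN as a
  charlist and must follow the same contract as `:inet_res.getbyname/2` (an illustrative
  fragment of the `config:` keyword list):

      config: [
        service: "myapp-headless",
        application_name: "myapp",
        namespace: "default",
        # must return {:ok, :inet.hostent()} or {:error, reason}
        resolver: fn query -> :inet_res.getbyname(query, :srv) end
      ]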

  ## An example of how this strategy extracts topology information from DNS

  ```sh
  $ hostname -f
  myapp-1.myapp-headless.default.svc.cluster.local

  # An SRV query for a headless service returns multiple entries
  $ dig SRV myapp-headless.default.svc.cluster.local

  ; <<>> DiG 9.14.3 <<>> SRV myapp-headless.default.svc.cluster.local
  ;; global options: +cmd
  ;; Got answer:
  ;; WARNING: .local is reserved for Multicast DNS
  ;; You are currently testing what happens when an mDNS query is leaked to DNS
  ;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 7169
  ;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 2

  ;; QUESTION SECTION:
  ;myapp-headless.default.svc.cluster.local. IN SRV

  ;; ANSWER SECTION:
  myapp-headless.default.svc.cluster.local. 30 IN SRV 10 50 0 myapp-0.myapp-headless.default.svc.cluster.local.
  myapp-headless.default.svc.cluster.local. 30 IN SRV 10 50 0 myapp-1.myapp-headless.default.svc.cluster.local.

  ;; ADDITIONAL SECTION:
  myapp-0.myapp-headless.default.svc.cluster.local. 30 IN A 10.1.0.95
  myapp-1.myapp-headless.default.svc.cluster.local. 30 IN A 10.1.0.96

  ;; Query time: 0 msec
  ;; SERVER: 10.96.0.10#53(10.96.0.10)
  ;; WHEN: Wed Jul 03 11:55:27 UTC 2019
  ;; MSG SIZE  rcvd: 167
  ```
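
  In Elixir terms, the default resolver (`:inet_res.getbyname/2` with type `:srv`) returns
  that same answer as a hostent tuple, which this strategy then maps to node names. Roughly
  (values mirror the dig output above):

      iex> :inet_res.getbyname(~c"myapp-headless.default.svc.cluster.local", :srv)
      {:ok,
       {:hostent, ~c"myapp-headless.default.svc.cluster.local", [], :srv, 2,
        [
          {10, 50, 0, ~c"myapp-0.myapp-headless.default.svc.cluster.local"},
          {10, 50, 0, ~c"myapp-1.myapp-headless.default.svc.cluster.local"}
        ]}}

      # Each {priority, weight, port, target} entry becomes <application_name>@<target>:
      # :"myapp@myapp-0.myapp-headless.default.svc.cluster.local"
      # :"myapp@myapp-1.myapp-headless.default.svc.cluster.local"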

  """
  use GenServer
  use Cluster.Strategy
  import Cluster.Logger

  alias Cluster.Strategy.State

  @impl true
  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  def init([%State{meta: nil} = state]) do
    init([%State{state | :meta => MapSet.new()}])
  end

  def init([%State{} = state]) do
    {:ok, load(state), 0}
  end

  @impl true
  def handle_info(:timeout, state) do
    handle_info(:load, state)
  end

  def handle_info(:load, state) do
    {:noreply, load(state)}
  end

  def handle_info(_, state) do
    {:noreply, state}
  end

  defp load(%State{topology: topology, meta: meta} = state) do
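    # Resolve the current node set via an SRV lookup, reconcile connections,
    # and schedule the next poll.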
    new_nodelist = MapSet.new(get_nodes(state))
    removed = MapSet.difference(meta, new_nodelist)

    new_nodelist =
      case Cluster.Strategy.disconnect_nodes(
             topology,
             state.disconnect,
             state.list_nodes,
             MapSet.to_list(removed)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Add back the nodes which should have been removed, but which couldn't be for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.put(acc, n)
          end)
      end

    new_nodelist =
      case Cluster.Strategy.connect_nodes(
             topology,
             state.connect,
             state.list_nodes,
             MapSet.to_list(new_nodelist)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Remove the nodes which should have been added, but couldn't be for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.delete(acc, n)
          end)
      end

    Process.send_after(
      self(),
      :load,
      polling_interval(state)
    )

    %State{state | :meta => new_nodelist}
  end

  @spec get_nodes(State.t()) :: [atom()]
  defp get_nodes(%State{topology: topology, config: config}) do
    app_name = Keyword.fetch!(config, :application_name)
    service = Keyword.fetch!(config, :service)
    namespace = Keyword.fetch!(config, :namespace)

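    # FQDN of the headless service, e.g. "myapp-headless.default.svc.cluster.local."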
    service_k8s_path =
      "#{service}.#{namespace}.svc.#{System.get_env("CLUSTER_DOMAIN", "cluster.local.")}"

    resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :srv))

    cond do
      app_name != nil and service != nil ->
        headless_service = to_charlist(service_k8s_path)

        case resolver.(headless_service) do
          {:ok, {:hostent, _, _, :srv, _count, addresses}} ->
            parse_response(addresses, app_name)

          {:error, reason} ->
            error(
              topology,
              "#{inspect(headless_service)} : lookup against #{service} failed: #{inspect(reason)}"
            )

            []
        end

      app_name == nil ->
        warn(
          topology,
          "kubernetes.DNS strategy is selected, but :application_name is not configured!"
        )

        []

      service == nil ->
        warn(topology, "kubernetes strategy is selected, but :service is not configured!")
        []

      :else ->
        warn(topology, "kubernetes strategy is selected, but is not configured!")
        []
    end
  end

  defp polling_interval(%State{config: config}) do
    Keyword.get(config, :polling_interval, @default_polling_interval)
  end

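  # Turns SRV answers ({priority, weight, port, target}) into node atoms
  # like :"<app_name>@<target>".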
  defp parse_response(addresses, app_name) do
    addresses
    |> Enum.map(&:erlang.list_to_binary(elem(&1, 3)))
    |> Enum.map(&"#{app_name}@#{&1}")
    |> Enum.map(&String.to_atom(&1))
  end
end