lib/strategy/kubernetes.ex

defmodule Cluster.Strategy.Kubernetes do
  @default_polling_interval 5_000
  @kubernetes_master "kubernetes.default.svc"
  @service_account_path "/var/run/secrets/kubernetes.io/serviceaccount"

  @moduledoc """
  This clustering strategy fetches endpoints or pods from the Kubernetes API, filtered by the given
  namespace and label selector.

  > This strategy requires a service account with the ability to list endpoints or pods. If you want
  > to avoid that, you could use one of the DNS-based strategies instead.
  >
  > See `Cluster.Strategy.Kubernetes.DNS` and `Cluster.Strategy.Kubernetes.DNSSRV`.

  It assumes that all Erlang nodes are using longnames - `<basename>@<ip_or_domain>`:

  + all nodes are using the same `<basename>`
  + all nodes are using unique `<ip_or_domain>`

  In `<basename>@<ip_or_domain>`:

  + `<basename>` is the value configured by the `:kubernetes_node_basename` option.
  + `<ip_or_domain>` is determined by the following options:
     - `:kubernetes_namespace`
     - `:kubernetes_selector`
     - `:kubernetes_service_name`
     - `:kubernetes_ip_lookup_mode`
     - `:mode`

  ## Getting `<basename>`

  As said above, the basename is configured by the `:kubernetes_node_basename` option.

  Keep one thing in mind: when building an OTP release, make sure that the name of the OTP
  release matches the name configured by `:kubernetes_node_basename`.

  ## Getting `<ip_or_domain>`

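  ### `:kubernetes_namespace` and `:kubernetes_selector` options

  These two options select which endpoints or pods are queried. `:kubernetes_namespace` is the
  namespace to search; when it is not set, the namespace is read from the service account
  directory. `:kubernetes_selector` is a label selector such as `app=myapp`.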

  ### `:kubernetes_ip_lookup_mode` option

  This option configures where to look up the required IPs.

  Available values:

  + `:endpoints` (default)
  + `:pods`

  #### :endpoints

  With this value, the strategy looks up IPs from endpoints.

  In order for your endpoints to be found they should be returned when you run:

      kubectl get endpoints -l app=myapp

  Then, this strategy will fetch the addresses of all endpoints with that label and attempt to
  connect.

  #### :pods

  With this value, the strategy looks up IPs from pods directly.

  In order for your pods to be found they should be returned when you run:

      kubectl get pods -l app=myapp

  Then, this strategy will fetch the IP of all pods with that label and attempt to connect.
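
  As an illustration, a topology that looks up pod IPs directly might be configured as in the
  sketch below. The topology name, basename, selector and namespace are placeholder values, not
  defaults of this strategy:

  ```elixir
  import Config

  config :libcluster,
    topologies: [
      erlang_nodes_in_k8s: [
        strategy: Cluster.Strategy.Kubernetes,
        config: [
          # Query pods instead of endpoints (the default is :endpoints)
          kubernetes_ip_lookup_mode: :pods,
          kubernetes_node_basename: "myapp",
          kubernetes_selector: "app=myapp",
          kubernetes_namespace: "my_namespace"
        ]
      ]
    ]
  ```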


  ### `:mode` option

  This option configures how the longname is built.

  Available values:

  + `:ip` (default)
  + `:dns`
  + `:hostname`

  #### :ip

  In this mode, the IP address is used directly. The longname will be something like:

      myapp@<ip>

  Getting this mode to work requires:

  1. exposing the pod IP from Kubernetes to the Erlang node
  2. setting the name of the Erlang node according to the exposed information

  First, expose the required information from Kubernetes to the Erlang node as environment variables:

      # deployment.yaml
      env:
      - name: POD_IP
        valueFrom:
          fieldRef:
            fieldPath: status.podIP

  Then, set the name of the Erlang node using the exposed environment variables. If you use mix
  releases, you can configure the required options in `rel/env.sh.eex`:

      # rel/env.sh.eex
      export RELEASE_DISTRIBUTION=name
      export RELEASE_NODE=<%= @release.name %>@${POD_IP}

  > `export RELEASE_DISTRIBUTION=name` will append a `-name` option to the `start` command directly
  > and requires no further changes to the `vm.args`.

  #### :hostname

  In this mode, the hostname is used directly. The longname will be something like:

      myapp@<hostname>.<service_name>.<namespace>.svc.<cluster_domain>

  Getting `:hostname` mode to work requires:

  1. deploying pods as a StatefulSet (otherwise, hostname is not set for pods)
  2. setting `:kubernetes_service_name` to the name of the Kubernetes service that is being looked up
  3. setting the name of the Erlang node according to the hostname of the pod

  Then, set the name of the Erlang node using the hostname of the pod. If you use mix releases, you
  can configure the required options in `rel/env.sh.eex`:

      # rel/env.sh.eex
      export RELEASE_DISTRIBUTION=name
      export RELEASE_NODE=<%= @release.name %>@$(hostname -f)

  > `hostname -f` returns the whole FQDN, which is something like:
  > `$(hostname).${SERVICE_NAME}.${NAMESPACE}.svc.${CLUSTER_DOMAIN}`.
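
  On the libcluster side, a matching configuration might look like the sketch below. The service
  name `myapp-headless` and the other values are hypothetical placeholders. With them, the strategy
  would build node names of the form `myapp@<hostname>.myapp-headless.my_namespace.svc.cluster.local`:

  ```elixir
  import Config

  config :libcluster,
    topologies: [
      erlang_nodes_in_k8s: [
        strategy: Cluster.Strategy.Kubernetes,
        config: [
          mode: :hostname,
          # Name of the Kubernetes service in front of the StatefulSet (placeholder)
          kubernetes_service_name: "myapp-headless",
          kubernetes_node_basename: "myapp",
          kubernetes_selector: "app=myapp",
          kubernetes_namespace: "my_namespace"
        ]
      ]
    ]
  ```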

  #### :dns

  In this mode, an IP-based pod A record is used. The longname will be something like:

      myapp@<pod_a_record>.<namespace>.pod.<cluster_domain>

  Getting `:dns` mode to work requires:

  1. exposing the pod IP from Kubernetes to the Erlang node
  2. setting the name of the Erlang node according to the exposed information

  First, expose the required information from Kubernetes to the Erlang node as environment variables:

      # deployment.yaml
      env:
      - name: NAMESPACE
        valueFrom:
          fieldRef:
            fieldPath: metadata.namespace
      - name: POD_IP
        valueFrom:
          fieldRef:
            fieldPath: status.podIP

  Then, set the name of the Erlang node using the exposed environment variables. If you use mix
  releases, you can configure the required options in `rel/env.sh.eex`:

      # rel/env.sh.eex
      export POD_A_RECORD=$(echo $POD_IP | sed 's/\./-/g')
      export CLUSTER_DOMAIN=cluster.local  # modify this value according to your actual situation
      export RELEASE_DISTRIBUTION=name
      export RELEASE_NODE=<%= @release.name %>@${POD_A_RECORD}.${NAMESPACE}.pod.${CLUSTER_DOMAIN}
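
  For reference, the node name this strategy expects for a pod in `:dns` mode can be derived from
  the pod IP as in the sketch below. It assumes the basename `myapp`, the namespace `my_namespace`
  and the default cluster name `cluster`; the IP is made up:

  ```elixir
  pod_ip = "10.1.2.3"
  namespace = "my_namespace"

  # Dots in the IP become dashes, mirroring the `sed` call in rel/env.sh.eex above
  pod_a_record = String.replace(pod_ip, ".", "-")

  node_name = String.to_atom("myapp@" <> pod_a_record <> "." <> namespace <> ".pod.cluster.local")
  # node_name is :"myapp@10-1-2-3.my_namespace.pod.cluster.local"
  ```

  The name set through `RELEASE_NODE` has to match this value exactly, otherwise the connection
  attempt targets a node that does not exist.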

  ### Which mode is the best one?

  No mode is best in every case; pick the one that fits your deployment:

  + If you're not using a StatefulSet, use `:ip` or `:dns`.
  + If you're using a StatefulSet, use `:hostname`.

  One more thing to consider: when using `:ip` or `:dns`, you can establish a remote shell (as well
  as run observer) by using `kubectl port-forward` in combination with some entries in `/etc/hosts`.

  ## Polling Interval

  The default interval to sync topologies is `#{@default_polling_interval}` milliseconds
  (#{div(@default_polling_interval, 1000)} seconds). You can configure it with the `:polling_interval` option.

  ## Getting cluster information

  > In general, you don't need to read this; the default values will work.

  This strategy fetches information about endpoints or pods by accessing the REST API provided by
  Kubernetes.

  The base URL of the REST API has two parts:

      <master_name>.<cluster_domain>

  `<master_name>` is configured by the following option:

  + `:kubernetes_master` - the default value is `#{@kubernetes_master}`

  `<cluster_domain>` is configured by the following option and environment variable:

  + `:kubernetes_cluster_name` - the default value is `cluster`, and the final cluster domain will be `<cluster_name>.local`
  + `CLUSTER_DOMAIN` - when this environment variable is set, it is used as the cluster domain of the
    REST API host instead of `<cluster_name>.local`. Node names built in `:hostname` and `:dns` modes
    still end with `<cluster_name>.local`.

  > `<master_name>` and `<cluster_domain>` also affect each other: if `<master_name>` already ends
  > with `<cluster_domain>` or with a dot, it is used as-is; otherwise `.<cluster_domain>` is appended.
  > Check out the source code for more details.
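
  The interplay between `<master_name>` and `<cluster_domain>` can be sketched as follows. The
  function name `resolve_master` is hypothetical and only illustrates the rule; it is not part of
  this module's API:

  ```elixir
  resolve_master = fn master_name, cluster_domain ->
    cond do
      # The master name already carries the cluster domain
      String.ends_with?(master_name, cluster_domain) -> master_name
      # A trailing dot marks the name as final
      String.ends_with?(master_name, ".") -> master_name
      # Otherwise the cluster domain is appended
      true -> master_name <> "." <> cluster_domain
    end
  end

  resolve_master.("kubernetes.default.svc", "cluster.local")
  # => "kubernetes.default.svc.cluster.local"

  resolve_master.("kubernetes.default.svc.cluster.local", "cluster.local")
  # => "kubernetes.default.svc.cluster.local"

  resolve_master.("k8s.example.com.", "cluster.local")
  # => "k8s.example.com."
  ```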

  Besides the base URL of the REST API, a service account must be provided. The service account is
  configured by the following option:

  + `:kubernetes_service_account_path` - the default value is `#{@service_account_path}`. The `token`,
    `ca.crt` and `namespace` files are read from this directory.
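
  If the defaults do not fit, these options can be overridden in the topology config. The values
  below are placeholders chosen for illustration only:

  ```elixir
  import Config

  config :libcluster,
    topologies: [
      erlang_nodes_in_k8s: [
        strategy: Cluster.Strategy.Kubernetes,
        config: [
          kubernetes_node_basename: "myapp",
          kubernetes_selector: "app=myapp",
          # The trailing dot marks the name as final, so no cluster domain is appended
          kubernetes_master: "k8s.example.com.",
          # Placeholder directory holding the token, ca.crt and namespace files
          kubernetes_service_account_path: "/etc/myapp/serviceaccount"
        ]
      ]
    ]
  ```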

  ## An example configuration

      config :libcluster,
        topologies: [
          erlang_nodes_in_k8s: [
            strategy: #{__MODULE__},
            config: [
              mode: :ip,
              kubernetes_node_basename: "myapp",
              kubernetes_selector: "app=myapp",
              kubernetes_namespace: "my_namespace",
              polling_interval: 10_000
            ]
          ]
        ]
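
  The topologies still have to be handed to libcluster's `Cluster.Supervisor` in your supervision
  tree. A sketch of one way to do that, assuming hypothetical module names `MyApp.Application`,
  `MyApp.ClusterSupervisor` and `MyApp.Supervisor`:

  ```elixir
  defmodule MyApp.Application do
    use Application

    @impl true
    def start(_type, _args) do
      # Read the topologies defined under `config :libcluster`
      topologies = Application.get_env(:libcluster, :topologies, [])

      children = [
        {Cluster.Supervisor, [topologies, [name: MyApp.ClusterSupervisor]]}
      ]

      Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
    end
  end
  ```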

  """
  use GenServer
  use Cluster.Strategy
  import Cluster.Logger

  alias Cluster.Strategy.State

  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  def init([%State{meta: nil} = state]) do
    init([%State{state | :meta => MapSet.new()}])
  end

  def init([%State{} = state]) do
    {:ok, load(state)}
  end

  @impl true
  def handle_info(:timeout, state) do
    handle_info(:load, state)
  end

  def handle_info(:load, %State{} = state) do
    {:noreply, load(state)}
  end

  def handle_info(_, state) do
    {:noreply, state}
  end

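  # Polls Kubernetes for the current node list, disconnects nodes that have disappeared,
  # connects to the current ones, and schedules the next poll.
  defp load(%State{topology: topology} = state) do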
    new_nodelist = MapSet.new(get_nodes(state))
    removed = MapSet.difference(state.meta, new_nodelist)

    new_nodelist =
      case Cluster.Strategy.disconnect_nodes(
             topology,
             state.disconnect,
             state.list_nodes,
             MapSet.to_list(removed)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Add back the nodes which should have been removed, but which couldn't be for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.put(acc, n)
          end)
      end

    new_nodelist =
      case Cluster.Strategy.connect_nodes(
             topology,
             state.connect,
             state.list_nodes,
             MapSet.to_list(new_nodelist)
           ) do
        :ok ->
          new_nodelist

        {:error, bad_nodes} ->
          # Remove the nodes which should have been added, but couldn't be for some reason
          Enum.reduce(bad_nodes, new_nodelist, fn {n, _}, acc ->
            MapSet.delete(acc, n)
          end)
      end

    Process.send_after(self(), :load, polling_interval(state))

    %State{state | meta: new_nodelist}
  end

  defp polling_interval(%State{config: config}) do
    Keyword.get(config, :polling_interval, @default_polling_interval)
  end

  @spec get_token(String.t()) :: String.t()
  defp get_token(service_account_path) do
    path = Path.join(service_account_path, "token")

    case File.exists?(path) do
      true -> path |> File.read!() |> String.trim()
      false -> ""
    end
  end

  @spec get_ssl_opts(Path.t()) :: Keyword.t()
  defp get_ssl_opts(service_account_path) do
    path = Path.join(service_account_path, "ca.crt")

    case File.exists?(path) do
      true ->
        [
          verify: :verify_peer,
          cacertfile: String.to_charlist(path)
        ]

      false ->
        [verify: :verify_none]
    end
  end

  @spec get_namespace(String.t(), String.t() | nil) :: String.t()
  if Mix.env() == :test do
    defp get_namespace(_service_account_path, nil), do: "__libcluster_test"
  else
    defp get_namespace(service_account_path, nil) do
      path = Path.join(service_account_path, "namespace")

      if File.exists?(path) do
        path |> File.read!() |> String.trim()
      else
        ""
      end
    end
  end

  defp get_namespace(_, namespace), do: namespace

  @spec get_nodes(State.t()) :: [atom()]
  defp get_nodes(%State{topology: topology, config: config, meta: meta}) do
    service_account_path =
      Keyword.get(config, :kubernetes_service_account_path, @service_account_path)

    token = get_token(service_account_path)
    ssl_opts = get_ssl_opts(service_account_path)

    namespace = get_namespace(service_account_path, Keyword.get(config, :kubernetes_namespace))
    app_name = Keyword.fetch!(config, :kubernetes_node_basename)
    cluster_name = Keyword.get(config, :kubernetes_cluster_name, "cluster")
    service_name = Keyword.get(config, :kubernetes_service_name)
    selector = Keyword.fetch!(config, :kubernetes_selector)
    ip_lookup_mode = Keyword.get(config, :kubernetes_ip_lookup_mode, :endpoints)

    master_name = Keyword.get(config, :kubernetes_master, @kubernetes_master)
    cluster_domain = System.get_env("CLUSTER_DOMAIN", "#{cluster_name}.local")

    master =
      cond do
        String.ends_with?(master_name, cluster_domain) ->
          master_name

        String.ends_with?(master_name, ".") ->
          # The dot at the end is used to determine that the name is "final"
          master_name

        :else ->
          master_name <> "." <> cluster_domain
      end

    cond do
      app_name != nil and selector != nil ->
        selector = URI.encode(selector)

        path =
          case ip_lookup_mode do
            :endpoints -> "api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}"
            :pods -> "api/v1/namespaces/#{namespace}/pods?labelSelector=#{selector}"
          end

        headers = [{'authorization', 'Bearer #{token}'}]
        http_options = [ssl: ssl_opts, timeout: 15_000]

        case :httpc.request(:get, {'https://#{master}/#{path}', headers}, http_options, []) do
          {:ok, {{_version, 200, _status}, _headers, body}} ->
            parse_response(ip_lookup_mode, Jason.decode!(body))
            |> Enum.map(fn node_info ->
              format_node(
                Keyword.get(config, :mode, :ip),
                node_info,
                app_name,
                cluster_name,
                service_name
              )
            end)

          {:ok, {{_version, 403, _status}, _headers, body}} ->
            %{"message" => msg} = Jason.decode!(body)
            warn(topology, "cannot query kubernetes (unauthorized): #{msg}")
            []

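          {:ok, {{_version, code, status}, _headers, body}} ->
            warn(topology, "cannot query kubernetes (#{code} #{status}): #{inspect(body)}")
            # Keep the previously known nodes so a failed request does not drop the cluster
            MapSet.to_list(meta)

          {:error, reason} ->
            error(topology, "request to kubernetes failed!: #{inspect(reason)}")
            # Keep the previously known nodes so a failed request does not drop the cluster
            MapSet.to_list(meta)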
        end

      app_name == nil ->
        warn(
          topology,
          "kubernetes strategy is selected, but :kubernetes_node_basename is not configured!"
        )

        []

      selector == nil ->
        warn(
          topology,
          "kubernetes strategy is selected, but :kubernetes_selector is not configured!"
        )

        []

      :else ->
        warn(topology, "kubernetes strategy is selected, but is not configured!")
        []
    end
  end

  defp parse_response(:endpoints, resp) do
    case resp do
      %{"items" => items} when is_list(items) ->
        Enum.reduce(items, [], fn
          %{"subsets" => subsets}, acc when is_list(subsets) ->
            addrs =
              Enum.flat_map(subsets, fn
                %{"addresses" => addresses} when is_list(addresses) ->
                  Enum.map(addresses, fn %{"ip" => ip, "targetRef" => %{"namespace" => namespace}} =
                                           address ->
                    %{ip: ip, namespace: namespace, hostname: address["hostname"]}
                  end)

                _ ->
                  []
              end)

            acc ++ addrs

          _, acc ->
            acc
        end)

      _ ->
        []
    end
  end

  defp parse_response(:pods, resp) do
    case resp do
      %{"items" => items} when is_list(items) ->
        Enum.map(items, fn
          %{
            "status" => %{"podIP" => ip},
            "metadata" => %{"namespace" => ns},
            "spec" => pod_spec
          } ->
            %{ip: ip, namespace: ns, hostname: pod_spec["hostname"]}

          _ ->
            nil
        end)
        |> Enum.reject(&is_nil/1)

      _ ->
        []
    end
  end

  defp format_node(:ip, %{ip: ip}, app_name, _cluster_name, _service_name),
    do: :"#{app_name}@#{ip}"

  defp format_node(
         :hostname,
         %{hostname: hostname, namespace: namespace},
         app_name,
         cluster_name,
         service_name
       ) do
    :"#{app_name}@#{hostname}.#{service_name}.#{namespace}.svc.#{cluster_name}.local"
  end

  defp format_node(:dns, %{ip: ip, namespace: namespace}, app_name, cluster_name, _service_name) do
    ip = String.replace(ip, ".", "-")
    :"#{app_name}@#{ip}.#{namespace}.pod.#{cluster_name}.local"
  end
end