# lib/strategy/droplet.ex

defmodule Cluster.Strategy.Droplet do
  @moduledoc """
  A libcluster strategy for Digital Ocean Droplets. Check out the [README](readme.html) to get
  started.
  """

  use GenServer
  use Cluster.Strategy

  alias Cluster.Logger
  alias Cluster.Strategy
  alias Cluster.Strategy.State

  @interval 5_000
  @api_url "https://api.digitalocean.com/v2/droplets"
  @metadata_url "http://169.254.169.254/metadata/v1/"

  @doc """
  Boots the GenServer that polls the Digital Ocean API and keeps the cluster in sync with it.

  On every poll, nodes that were previously tracked but are no longer returned by the API are
  disconnected, and the returned nodes are connected. Droplets can be filtered by tag name or by
  Droplet name, but not both at once; configuring both raises an `ArgumentError`. Droplets whose
  ID matches the current Droplet are filtered out so the node doesn't try to connect to itself.

  If the API request fails, the tracked node list is left untouched. This is deliberate: we don't
  want all the nodes to disconnect from each other just because the Digital Ocean API is down.
  """
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init([%State{} = state]) do
    # A state without metadata gets an empty node set so the first poll has something to
    # diff against; an existing set is carried over untouched.
    seeded =
      case state.meta do
        nil -> %State{state | meta: MapSet.new()}
        _ -> state
      end

    # The first poll runs synchronously as part of start-up.
    {:ok, poll(seeded)}
  end

  @impl true
  # Both the scheduled `:poll` message and a `:timeout` trigger a polling cycle.
  def handle_info(message, state) when message in [:timeout, :poll] do
    {:noreply, poll(state)}
  end

  # Any other message is ignored and leaves the state untouched.
  def handle_info(_message, state), do: {:noreply, state}

  # Runs a single polling cycle and schedules the next one.
  #
  # Reads `:polling_interval`, `:token`, `:tag_name` and `:name` from the strategy config,
  # fetches the current node list from the API, disconnects nodes that disappeared, connects
  # the current ones, and returns the state with `meta` set to the resulting node set.
  #
  # Raises `KeyError` when `:token` is missing and `ArgumentError` when both `:tag_name` and
  # `:name` are configured.
  defp poll(%State{config: config} = state) do
    %State{topology: topology, connect: connect, disconnect: disconnect, list_nodes: list_nodes} =
      state

    interval = Keyword.get(config, :polling_interval, @interval)
    token = Keyword.fetch!(config, :token)
    tag_name = Keyword.get(config, :tag_name)
    name = Keyword.get(config, :name)

    # Reject an invalid configuration up front, before any HTTP request is made.
    if tag_name && name do
      raise ArgumentError, "Cannot specify both `tag_name` and `name` config values"
    end

    # Only filters that were actually configured end up in the query string.
    filters =
      Enum.reject([tag_name: tag_name, name: name], fn {_key, value} -> is_nil(value) end)

    url = "#{@api_url}?#{URI.encode_query(filters)}"
    id = get_metadata("id")

    nodes =
      case get_nodes(state, url, token, id) do
        :error ->
          # Something went wrong with the API, don't add or remove any nodes
          state.meta

        found ->
          MapSet.new(found)
      end

    # Nodes we were tracking that the API no longer reports.
    removed = state.meta |> MapSet.difference(nodes) |> MapSet.to_list()

    nodes =
      case Strategy.disconnect_nodes(topology, disconnect, list_nodes, removed) do
        :ok ->
          nodes

        {:error, bad_nodes} ->
          # Add back the nodes which should have been removed but couldn't be
          Enum.reduce(bad_nodes, nodes, fn {node, _reason}, acc -> MapSet.put(acc, node) end)
      end

    nodes =
      case Strategy.connect_nodes(topology, connect, list_nodes, MapSet.to_list(nodes)) do
        :ok ->
          nodes

        {:error, bad_nodes} ->
          # Remove the nodes which should have been added but couldn't be
          Enum.reduce(bad_nodes, nodes, fn {node, _reason}, acc -> MapSet.delete(acc, node) end)
      end

    Process.send_after(self(), :poll, interval)

    %{state | meta: nodes}
  end

  @doc """
  Makes a request to the Digital Ocean API for a list of droplets and recurses through the pages.

  Expects a full URL and a valid access token. `id` is the ID of the current droplet and is
  passed on to `to_node_names/3` so that droplet can be left out of the result.

  Returns the list of node names derived from the droplet objects of every page. If any page
  fails to return a `200` response, the failure is logged as an error and `:error` is returned
  for the whole listing; a partial list is never returned, because the caller would treat the
  nodes of the missing pages as removed.

  Raises if a `200` response body is not valid JSON.
  """
  def get_nodes(%State{} = state, url, token, id) do
    headers = [
      {to_charlist("Accept"), to_charlist("application/json")},
      {to_charlist("Authorization"), to_charlist("Bearer #{token}")}
    ]

    case :httpc.request(:get, {to_charlist(url), headers}, [ssl: ssl_opts()], []) do
      {:ok, {{_, 200, _}, _, body}} ->
        body = Jason.decode!(body)
        droplets = Map.get(body, "droplets", [])
        nodes = to_node_names(state, droplets, id)

        if next = get_in(body, ["links", "pages", "next"]) do
          # The recursive call may return `:error`. Appending that atom with `++` would build
          # an improper list, so propagate the failure instead.
          case get_nodes(state, next, token, id) do
            :error -> :error
            rest -> nodes ++ rest
          end
        else
          nodes
        end

      {_, error} ->
        # Covers both transport errors and non-200 responses.
        Logger.error(state.topology, inspect(error))
        :error
    end
  end

  @doc """
  Returns the Droplet metadata top-level index, or specific metadata values.

  `type` is appended to the metadata base URL, e.g. `"id"`. On a `200` response the body is
  returned exactly as `:httpc` delivers it; with the default `:httpc` options used here that is
  a charlist, not a binary. Any other outcome (non-200 status or request error) yields `nil`.

  See https://docs.digitalocean.com/products/droplets/how-to/retrieve-droplet-metadata/
  """
  def get_metadata(type) do
    # `:httpc` is an Erlang API that works with charlist URLs (as `get_nodes/4` already
    # does); an Elixir binary is not reliably accepted across OTP releases.
    url = to_charlist("#{@metadata_url}#{type}")

    case :httpc.request(:get, {url, []}, [], []) do
      {:ok, {{_, 200, _}, _, body}} -> body
      _ -> nil
    end
  end

  @doc """
  Returns a list of node names as described in `to_node_name/2`.

  Will not return node names for droplets that don't have a status of "active", or that match the
  provided ID of the current droplet. The ID may be given as an integer, a string or a charlist
  (the form the metadata request yields); it is compared against each droplet's `"id"` by its
  textual value, so `123`, `"123"` and `'123'` all exclude the droplet with ID `123`.

  Droplets for which `to_node_name/2` returns `nil` are dropped from the result.
  """
  def to_node_names(%State{} = state, droplets, id \\ nil) when is_list(droplets) do
    current_id = normalize_id(id)

    droplets
    |> Enum.filter(&(normalize_id(&1["id"]) != current_id && &1["status"] == "active"))
    |> Enum.map(&to_node_name(state, &1))
    |> Enum.filter(& &1)
  end

  # Brings a droplet ID into a comparable textual form. Integers, strings and printable
  # charlists become trimmed strings; anything else (including `nil`) is returned unchanged.
  defp normalize_id(id) when is_integer(id), do: Integer.to_string(id)
  defp normalize_id(id) when is_binary(id), do: String.trim(id)

  defp normalize_id(id) when is_list(id) do
    if List.ascii_printable?(id) do
      id |> List.to_string() |> String.trim()
    else
      id
    end
  end

  defp normalize_id(id), do: id

  @doc """
  Builds a node name such as `:"foobar@127.0.0.1"` from a droplet map returned by the Digital
  Ocean API.

  The part before the `@` is the `:node_basename` config value, falling back to the droplet's
  `"name"`. The address is taken from the droplet's networks, selected by the `:network` config
  value (default `:private`) and the `:ipv6` flag (default `false`, i.e. IPv4).

  When a `:health_check` is configured it is run against the address first. Returns `nil` if the
  health check fails, or if the droplet has no address for the requested network type and IP
  version (in which case a warning is logged).
  """
  def to_node_name(%State{} = state, droplet) when is_map(droplet) do
    config = state.config

    basename = Keyword.get(config, :node_basename, Map.get(droplet, "name"))
    network_type = Keyword.get(config, :network, :private)
    ip_version = if Keyword.get(config, :ipv6, false), do: "v6", else: "v4"
    health_check = Keyword.get(config, :health_check)

    candidates = droplet |> Map.get("networks", %{}) |> Map.get(ip_version, [])

    case Enum.find(candidates, fn net -> net["type"] == Atom.to_string(network_type) end) do
      %{"ip_address" => ip_address} ->
        # An unhealthy node yields `nil`, the same result as a droplet without an address.
        if healthy?(ip_address, health_check), do: :"#{basename}@#{ip_address}"

      _ ->
        Logger.warn(
          state.topology,
          "No #{network_type} ip#{ip_version} network was found for droplet ##{droplet["id"]}"
        )

        nil
    end
  end

  @doc """
  Runs a health check for the provided IP address.

  With no health check configured (`nil`) every address counts as healthy. With `{:tcp, opts}`
  the address is healthy when a TCP connection to `opts[:port]` can be opened (and closed again)
  within `opts[:timeout]` milliseconds, 500 by default. `:port` is required.
  """
  def healthy?(_ip, nil), do: true

  def healthy?(ip, {:tcp, opts}) do
    port = Keyword.fetch!(opts, :port)
    timeout = Keyword.get(opts, :timeout, 500)

    case :gen_tcp.connect(to_charlist(ip), port, [], timeout) do
      # The socket is only a probe; close it straight away.
      {:ok, socket} -> :gen_tcp.close(socket) == :ok
      _ -> false
    end
  end

  # TLS options passed to `:httpc` for API requests: peer verification against the CAStore
  # certificate bundle, with the HTTPS hostname matching function from `:public_key`.
  defp ssl_opts do
    ca_bundle = CAStore.file_path()
    hostname_check = [match_fun: :public_key.pkix_verify_hostname_match_fun(:https)]

    [
      depth: 3,
      verify: :verify_peer,
      cacertfile: ca_bundle,
      customize_hostname_check: hostname_check
    ]
  end
end