# lib/k8s/conn/pki.ex

defmodule K8s.Conn.PKI do
  @moduledoc """
  Reads certificates and private keys from PEM files or base64 encoded PEM data.

  PEM data is parsed with `:public_key.pem_decode/1`. The first entry of the
  requested kind is returned, with its binary payload exactly as produced by
  `:public_key.pem_decode/1`.
  """

  alias K8s.Conn
  alias K8s.Conn.PKI

  # PEM entry types (as tagged by `:public_key.pem_decode/1`) accepted as private keys.
  @private_key_atoms [
    :RSAPrivateKey,
    :DSAPrivateKey,
    :ECPrivateKey,
    :PrivateKeyInfo
  ]

  @type private_key_data_t :: {atom, binary}

  @doc """
  Reads the certificate described by a map of cluster settings.

  The clauses are tried in this order:

    * `"certificate-authority-data"` (non-nil) - base64 encoded PEM data, decoded
      with `cert_from_base64/1`.
    * `"certificate-authority"` (non-nil) - a file name, resolved against `base_path`
      with `K8s.Conn.resolve_file_path/2` and read with `cert_from_pem/1`.
    * `"insecure-skip-tls-verify" => true` - no certificate; returns `{:ok, nil}`.

  A map matching none of the above raises `FunctionClauseError`.
  """
  @spec cert_from_map(map, binary) ::
          {:error, File.posix() | K8s.Conn.Error.t()} | {:ok, binary() | nil}
  def cert_from_map(%{"certificate-authority-data" => data}, _) when not is_nil(data),
    do: PKI.cert_from_base64(data)

  def cert_from_map(%{"certificate-authority" => file_name}, base_path)
      when not is_nil(file_name) do
    file_name
    |> Conn.resolve_file_path(base_path)
    |> PKI.cert_from_pem()
  end

  def cert_from_map(%{"insecure-skip-tls-verify" => true}, _), do: {:ok, nil}

  @doc """
  Reads the certificate from a PEM file.

  Returns `{:ok, data}` with the payload of the first `:Certificate` entry in the
  file. Returns `{:error, posix}` when the file cannot be read and
  `{:error, %K8s.Conn.Error{}}` when the file contains no certificate entry.
  """
  @spec cert_from_pem(String.t()) ::
          {:ok, binary} | {:error, File.posix() | K8s.Conn.Error.t()}
  def cert_from_pem(file) do
    with {:ok, data} <- File.read(file), do: decode_cert_data(data)
  end

  @doc """
  Decodes the certificate from a base64 encoded string.

  Returns `{:ok, data}` with the payload of the first `:Certificate` entry, or
  `{:error, %K8s.Conn.Error{}}` when the input is not valid base64 or contains no
  certificate entry.
  """
  @spec cert_from_base64(binary) :: {:ok, binary} | {:error, K8s.Conn.Error.t()}
  def cert_from_base64(data) do
    with {:ok, data} <- decode64(data), do: decode_cert_data(data)
  end

  @doc """
  Reads private key from a PEM file.

  Returns `{:ok, {type, data}}` for the first entry whose type is one of
  `#{inspect(@private_key_atoms)}`. Returns `{:error, posix}` when the file cannot be
  read and `{:error, %K8s.Conn.Error{}}` when no supported entry is present.
  """
  @spec private_key_from_pem(String.t()) ::
          {:ok, private_key_data_t} | {:error, File.posix() | K8s.Conn.Error.t()}
  def private_key_from_pem(file) do
    with {:ok, data} <- File.read(file), do: decode_private_key_data(data)
  end

  @doc """
  Decodes private key from a base64 encoded string.

  Returns `{:ok, {type, data}}` for the first entry whose type is one of
  `#{inspect(@private_key_atoms)}`, or `{:error, %K8s.Conn.Error{}}` when the input
  is not valid base64 or no supported entry is present.
  """
  @spec private_key_from_base64(String.t()) ::
          {:ok, private_key_data_t} | {:error, K8s.Conn.Error.t()}
  def private_key_from_base64(encoded_key) do
    with {:ok, data} <- decode64(encoded_key), do: decode_private_key_data(data)
  end

  # Wraps `Base.decode64/1` so that a failure carries a `K8s.Conn.Error`.
  @spec decode64(binary) :: {:ok, binary} | {:error, K8s.Conn.Error.t()}
  defp decode64(data) do
    case Base.decode64(data) do
      {:ok, data} -> {:ok, data}
      :error -> {:error, %K8s.Conn.Error{message: "Invalid Base64"}}
    end
  end

  # Returns the payload of the first `:Certificate` entry found in the PEM data.
  @spec decode_cert_data(binary) :: {:ok, binary} | {:error, K8s.Conn.Error.t()}
  defp decode_cert_data(cert_data) do
    # Non-matching entries must yield `nil` so the scan continues past them; the
    # error is the default, which also covers input with no PEM entries at all.
    cert_data
    |> :public_key.pem_decode()
    |> Enum.find_value({:error, %K8s.Conn.Error{message: "No cert data."}}, fn
      {:Certificate, data, _} -> {:ok, data}
      _ -> nil
    end)
  end

  # Returns `{type, payload}` of the first supported private key entry in the PEM data.
  # NOTE(review): the third element of each entry is ignored, so an entry flagged as
  # encrypted by `:public_key.pem_decode/1` is presumably returned undecrypted - confirm.
  @spec decode_private_key_data(binary) ::
          {:ok, private_key_data_t} | {:error, K8s.Conn.Error.t()}
  defp decode_private_key_data(private_key_data) do
    unsupported =
      {:error,
       %K8s.Conn.Error{
         message: "Unsupported PEM data. Supported types #{inspect(@private_key_atoms)}"
       }}

    # Key files may carry other entries ahead of the key itself (for example
    # EC parameters), so keep scanning until a supported type is found.
    private_key_data
    |> :public_key.pem_decode()
    |> Enum.find_value(unsupported, fn
      {type, data, _} when type in @private_key_atoms -> {:ok, {type, data}}
      _ -> nil
    end)
  end
end