lib/k8s/client/mint_http_provider.ex

defmodule K8s.Client.MintHTTPProvider do
  @moduledoc """
  Mint based `K8s.Client.Provider`
  """
  @behaviour K8s.Client.Provider

  alias K8s.Client.HTTPError
  alias K8s.Client.Mint.ConnectionRegistry
  alias K8s.Client.Mint.HTTPAdapter
  require Logger

  @data_types [:data, :stdout, :stderr, :error]

  @impl true
  def request(method, uri, body, headers, http_opts) do
    with {:ok, stream} <- stream(method, uri, body, headers, http_opts) do
      response =
        stream
        |> Stream.reject(&(&1 == :done))
        |> Enum.reduce(%{data: []}, fn
          {:data, data}, response -> Map.update!(response, :data, &[data | &1])
          {type, value}, response -> Map.put(response, type, value)
          type, response -> Map.put(response, type, true)
        end)

      response
      |> Map.update!(:data, &(&1 |> Enum.reverse() |> IO.iodata_to_binary()))
      |> process_response()
    end
  end

  @impl true
  def stream(method, uri, body, headers, http_opts) do
    case do_stream_to(method, uri, body, headers, http_opts, nil) do
      {:ok, request_ref, adapter_pid} ->
        stream =
          Stream.resource(
            fn -> :pending end,
            fn
              :pending ->
                parts = HTTPAdapter.recv(adapter_pid, request_ref)
                # credo:disable-for-next-line
                next_state = if :done in parts, do: :done, else: :pending
                {parts, next_state}

              :done ->
                {:halt, :ok}
            end,
            fn _ -> :ok end
          )

        {:ok, stream}

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def stream_to(method, uri, body, headers, http_opts, stream_to) do
    with {:ok, _, _} <- do_stream_to(method, uri, body, headers, http_opts, stream_to) do
      :ok
    end
  end

  @spec do_stream_to(
          method :: atom(),
          uri :: URI.t(),
          body :: binary,
          headers :: list(),
          http_opts :: keyword(),
          stream_to :: pid() | nil
        ) :: {:ok, reference(), pid()} | {:error, HTTPError.t()}
  defp do_stream_to(method, uri, body, headers, http_opts, stream_to) do
    opts = [transport_opts: Keyword.fetch!(http_opts, :ssl)]
    method = String.upcase("#{method}")
    headers = Enum.map(headers, fn {header, value} -> {"#{header}", "#{value}"} end)
    path = uri_to_path(uri)

    with {:ok, %{adapter: adapter_pid, pool: pool}} <- ConnectionRegistry.checkout({uri, opts}),
         {:ok, request_ref} <-
           HTTPAdapter.request(adapter_pid, method, path, headers, body, pool, stream_to) do
      {:ok, request_ref, adapter_pid}
    end
  end

  @impl true
  def websocket_request(uri, headers, http_opts) do
    with {:ok, stream} <- websocket_stream(uri, headers, http_opts) do
      response =
        stream
        |> Stream.reject(&(&1 == :done))
        |> Enum.reduce(%{}, fn
          {type, data}, response when type in @data_types ->
            Map.update(response, type, [data], &[data | &1])

          {type, value}, response ->
            Map.put(response, type, value)
        end)

      response =
        @data_types
        |> Enum.reduce(response, fn type, response ->
          Map.update(response, type, nil, &(&1 |> Enum.reverse() |> IO.iodata_to_binary()))
        end)
        |> Map.reject(&(&1 |> elem(1) |> is_nil()))

      {:ok, response}
    end
  end

  @impl true
  def websocket_stream(uri, headers, http_opts) do
    case do_websocket_stream_to(uri, headers, http_opts, self()) do
      {:ok, request_ref, adapter_pid} ->
        stream =
          Stream.resource(
            fn -> :pending end,
            fn
              :pending ->
                parts = HTTPAdapter.recv(adapter_pid, request_ref)

                # credo:disable-for-lines:2
                next_state =
                  if Enum.any?(parts, &(elem(&1, 0) == :close)), do: :done, else: :pending

                {parts, next_state}

              :done ->
                {:halt, :ok}
            end,
            fn _ -> :ok end
          )

        {:ok, stream}

      {:error, error} ->
        {:error, error}
    end
  end

  @impl true
  def websocket_stream_to(uri, headers, http_opts, stream_to) do
    with {:ok, request_ref, adapter_pid} <-
           do_websocket_stream_to(uri, headers, http_opts, stream_to) do
      send_to_websocket = fn data ->
        HTTPAdapter.websocket_send(adapter_pid, request_ref, data)
      end

      {:ok, send_to_websocket}
    end
  end

  @spec do_websocket_stream_to(
          uri :: URI.t(),
          headers :: list(),
          http_opts :: keyword(),
          stream_to :: pid() | nil
        ) :: {:ok, reference(), pid()} | {:error, HTTPError.t()}
  defp do_websocket_stream_to(uri, headers, http_opts, stream_to) do
    opts = [transport_opts: Keyword.fetch!(http_opts, :ssl), protocols: [:http1]]
    path = uri_to_path(uri)
    headers = Enum.map(headers, fn {header, value} -> {"#{header}", "#{value}"} end)

    with {:ok, %{adapter: adapter_pid, pool: pool}} <- ConnectionRegistry.checkout({uri, opts}),
         {:ok, request_ref} <-
           HTTPAdapter.websocket_request(adapter_pid, path, headers, pool, stream_to) do
      {:ok, request_ref, adapter_pid}
    end
  end

  @spec process_response(map()) :: K8s.Client.Provider.response_t()
  defp process_response(%{status: status} = response) when status in 400..599 do
    %{data: data, headers: headers, status: status_code} = response

    case get_content_type(headers) do
      "application/json" = content_type ->
        data
        |> decode(content_type)
        |> K8s.Client.APIError.from_kubernetes_error()

      _other ->
        {:error, K8s.Client.HTTPError.new(message: "HTTP Error #{status_code}")}
    end
  end

  defp process_response(response) do
    content_type = get_content_type(response.headers)
    body = response.data |> decode(content_type)

    {:ok, body}
  end

  @spec get_content_type(keyword()) :: binary | nil
  defp get_content_type(headers) do
    case List.keyfind(headers, "content-type", 0) do
      {_key, content_type} -> content_type |> String.split(";") |> List.first()
      _ -> nil
    end
  end

  @spec decode(binary, binary) :: map | list | nil
  defp decode(body, "text/plain"), do: body

  defp decode(body, "application/json") do
    case Jason.decode(body) do
      {:ok, data} ->
        data

      {:error, error} ->
        Logger.error("The response body is supposed to be JSON but could not be decoded.",
          library: :k8s,
          error: error,
          body: body
        )

        nil
    end
  end

  @spec uri_to_path(URI.t()) :: binary()
  defp uri_to_path(uri) do
    path =
      IO.iodata_to_binary([
        uri.path,
        if(uri.query, do: ["?" | uri.query], else: [])
      ])

    String.trim(path, "?")
  end
end