lib/k8s/client/mint/http_adapter.ex

defmodule K8s.Client.Mint.HTTPAdapter do
  @moduledoc """
  The Mint client implementation. This module handles both, HTTP requests and
  websocket connections.

  ## Processes

  The module creates a process per connection to the Kubernetes API server.
  It supports `HTTP/2` for HTTP requests, but not for websockets. So while
  an open connection can process multiple requests (if the server supports
  `HTTP/2`), it can only process one single websocket connection.
  Therefore, each websocket connection is handled in its own process.

  ## State

  The module keeps track of the `Mint.HTTP` connection struct and a map of
  pending requests (`K8s.Client.Mint.Request`) for that connection, indexed by the
  `Mint.Types.request_ref()`.

  ### Requests

  Requests are calls to the GenServer that immediately return while the GenServer
  receives the response parts in the background. The way these response parts are
  returned depends on the `state_to` argument passed to `request/7` resp.
  `websocket_request/5`. See these function's docs for more details.
  """
  use GenServer, restart: :temporary

  alias K8s.Client.HTTPError
  alias K8s.Client.Mint.Request

  import K8s.Sys.Logger, only: [log_prefix: 1]
  require Logger
  require Mint.HTTP

  # healthcheck frequency in seconds
  @healthcheck_freq 30

  @type connection_args_t ::
          {scheme :: atom(), host :: binary(), port :: integer(), opts :: keyword()}

  @typedoc """
  Describes the state of the connection.

  - `:conn` - The current state of the `Mint` connection.
  - `:requests` - The opened requests over this connection (Only `HTTP/2` connections will hold multiple entries in this field.)
  """
  @type t :: %__MODULE__{
          conn: Mint.HTTP.t(),
          requests: %{reference() => Request.t()}
        }

  defstruct [:conn, requests: %{}]

  @doc """
  Opens a connection to Kubernetes, defined by `uri` and `opts`,
  and starts the GenServer.
  """
  @spec start_link(connection_args_t()) :: GenServer.on_start()
  def start_link(conn_args) do
    GenServer.start_link(__MODULE__, conn_args)
  end

  @doc """
  Returns connection arguments for the given `URI` and HTTP options.
  """
  @spec connection_args(URI.t(), keyword()) :: connection_args_t()
  def connection_args(uri, opts) do
    {String.to_atom(uri.scheme), uri.host, uri.port, opts}
  end

  @spec open?(GenServer.server(), :read | :write | :read_write) :: boolean()
  def open?(pid, type \\ :read_write) do
    GenServer.call(pid, {:open?, type})
  catch
    :exit, _ -> false
  end

  @doc """
  Starts a HTTP request. The way the response parts are returned depends on the
  `stream_to` argument passed to it.

    - `nil` - response parts are buffered. In order to receive them, the caller
      needs to `recv/2` passing it the `request_ref` returned by this function.
    - `pid` - response parts are sent to the process with the given `pid`.
    - `{pid, reference}` - response parts are sent to the process with the given
      `pid`. Messages are of the form `{reference, response_part}`.
  """
  @spec request(
          GenServer.server(),
          method :: binary(),
          path :: binary(),
          Mint.Types.headers(),
          body :: iodata() | nil | :stream,
          pool :: pid() | nil,
          stream_to :: pid() | nil
        ) :: {:ok, reference()} | {:error, HTTPError.t()}
  def request(pid, method, path, headers, body, pool, stream_to) do
    GenServer.call(pid, {:request, method, path, headers, body, pool, stream_to})
  end

  @doc """
  Upgrades to a Websocket connection. The way the frames are returned depends
  on the `stream_to` argument passed to it.

    - `nil` - frames are buffered. In order to receive them, the caller
      needs to `recv/2` passing it the `request_ref` returned by this function.
    - `pid` - frames are sent to the process with the given `pid`.
    - `{pid, reference}` - frames are sent to the process with the given
      `pid`. Messages are of the form `{reference, response_part}`.
  """
  @spec websocket_request(
          pid(),
          path :: binary(),
          Mint.Types.headers(),
          pool :: pid() | nil,
          stream_to :: pid() | nil
        ) :: {:ok, reference()} | {:error, HTTPError.t()}
  def websocket_request(pid, path, headers, pool, stream_to) do
    GenServer.call(pid, {:websocket_request, path, headers, pool, stream_to})
  end

  @doc """
  Returns response parts / frames that were buffered by the process. The
  `stream_to` must have been set to `nil` in `request/7` or
  `websocket_request/5`.

  If the buffer is empty, this call blocks until the next response part /
  frame is received.
  """
  @spec recv(GenServer.server(), reference()) :: list()
  def recv(pid, ref) do
    GenServer.call(pid, {:recv, ref}, :infinity)
  end

  @doc """
  Sends the given `data` throught the websocket specified by the `request_ref`.
  """
  @spec websocket_send(
          GenServer.server(),
          reference(),
          term()
        ) :: :ok
  def websocket_send(pid, request_ref, data) do
    GenServer.cast(pid, {:websocket_send, request_ref, data})
  end

  @impl true
  def init({scheme, host, port, opts}) do
    case Mint.HTTP.connect(scheme, host, port, opts) do
      {:ok, conn} ->
        Process.send_after(self(), :healthcheck, @healthcheck_freq * 1_000)
        state = %__MODULE__{conn: conn}
        {:ok, state}

      {:error, error} ->
        Logger.error(log_prefix("Failed initializing HTTPAdapter GenServer"), library: :k8s)
        {:stop, HTTPError.from_exception(error)}
    end
  end

  @impl true
  def handle_call({:open?, type}, _from, state) do
    {:reply, Mint.HTTP.open?(state.conn, type), state}
  end

  def handle_call({:request, method, path, headers, body, pool, stream_to}, from, state) do
    caller_ref = from |> elem(0) |> Process.monitor()
    conn = state.conn

    # For HTTP2, if the body is larger than the connection window, we've got to
    # stream it to the server.
    {body, pending_request_body} =
      cond do
        Mint.HTTP.protocol(conn) == :http1 -> {body, nil}
        is_nil(body) -> {nil, nil}
        :otherwise -> {:stream, body}
      end

    with {:ok, conn, request_ref} <- Mint.HTTP.request(conn, method, path, headers, body),
         {:request, request} <-
           {:request,
            Request.new(
              request_ref: request_ref,
              pool: pool,
              stream_to: stream_to,
              caller_ref: caller_ref,
              pending_request_body: pending_request_body
            )},
         {:ok, request, conn} <- Request.stream_request_body(request, conn) do
      state = put_in(state.requests[request_ref], request) |> struct!(conn: conn)
      {:reply, {:ok, request_ref}, state}
    else
      {:error, conn, error} ->
        state = struct!(state, conn: conn)

        {:reply, {:error, HTTPError.from_exception(error)}, state}
    end
  end

  def handle_call({:websocket_request, path, headers, pool, stream_to}, from, state) do
    caller_ref = from |> elem(0) |> Process.monitor()

    with {:ok, conn} <- Mint.HTTP.set_mode(state.conn, :passive),
         {:ok, conn, request_ref} <- Mint.WebSocket.upgrade(:wss, conn, path, headers),
         {:ok, conn, response} <- Request.receive_upgrade_response(conn, request_ref),
         {:ok, conn} <- Mint.HTTP.set_mode(conn, :active),
         {:ok, conn, websocket} <-
           Mint.WebSocket.new(conn, request_ref, response.status, response.headers) do
      state =
        put_in(
          state.requests[request_ref],
          Request.new(
            request_ref: request_ref,
            websocket: websocket,
            pool: pool,
            stream_to: stream_to,
            caller_ref: caller_ref
          )
        )

      send(stream_to, {:open, true})
      {:reply, {:ok, request_ref}, struct!(state, conn: conn)}
    else
      {:error, error} ->
        GenServer.reply(from, {:error, HTTPError.from_exception(error)})
        {:stop, :normal, state}

      {:error, conn, error} ->
        GenServer.reply(from, {:error, HTTPError.from_exception(error)})
        {:stop, :normal, struct!(state, conn: conn)}
    end
  end

  def handle_call({:recv, request_ref}, from, state) do
    {_, state} =
      get_and_update_in(
        state.requests[request_ref],
        &Request.recv(&1, from)
      )

    shutdown_if_closed(state)
  end

  @impl true
  def handle_cast({:websocket_send, request_ref, data}, state) do
    request = state.requests[request_ref]

    with {:ok, frame} <- Request.map_outgoing_frame(data),
         {:ok, websocket, data} <- Mint.WebSocket.encode(request.websocket, frame),
         {:ok, conn} <- Mint.WebSocket.stream_request_body(state.conn, request_ref, data) do
      state = struct!(state, conn: conn)
      state = put_in(state.requests[request_ref].websocket, websocket)
      {:noreply, state}
    end
  end

  @impl true
  def handle_info(message, %__MODULE__{conn: conn} = state)
      when Mint.HTTP.is_connection_message(conn, message) do
    with {:ok, conn, responses} <- Mint.WebSocket.stream(state.conn, message),
         {:ok, state} <- stream_pending_request_bodies(struct!(state, conn: conn)) do
      process_responses_or_frames(state, responses)
    else
      {:error, conn, %Mint.TransportError{reason: :closed}, []} ->
        Logger.debug("The connection was closed.", library: :k8s)

        shutdown_if_closed(struct!(state, conn: conn))

      {:error, conn, error} ->
        Logger.warning(
          log_prefix(
            "An error occurred when streaming the request body: #{Exception.message(error)}"
          ),
          error: error,
          library: :k8s
        )

        struct!(state, conn: conn)

      {:error, conn, error, responses} ->
        Logger.warning(
          log_prefix(
            "An error occurred when streaming the response: #{Exception.message(error)}"
          ),
          error: error,
          library: :k8s
        )

        state
        |> struct!(conn: conn)
        |> process_responses_or_frames(responses)
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    state =
      state.requests
      |> Map.filter(fn {_request_ref, request} -> request.caller_ref == ref end)
      |> Map.keys()
      |> Enum.reduce_while(state, fn
        request_ref, state ->
          case pop_in(state.requests[request_ref]) do
            {%Request{}, %{conn: %Mint.HTTP2{}} = state} ->
              conn = Mint.HTTP2.cancel_request(state.conn, request_ref) |> elem(1)
              {:cont, struct!(state, conn: conn)}

            {_, state} ->
              {:halt, {:stop, state}}
          end
      end)

    case state do
      {:stop, state} ->
        Logger.debug(
          log_prefix(
            "Received :DOWN signal from parent process. Terminating HTTPAdapter #{inspect(self())}."
          ),
          library: :k8s
        )

        {:stop, :normal, state}

      state ->
        {:noreply, state}
    end
  end

  # This is called regularly to check whether the connection is still open. If
  # it's not open, and all buffers are emptied, this process is considered
  # garbage and is stopped.
  def handle_info(:healthcheck, state) do
    if Mint.HTTP.open?(state.conn, :read) or any_non_empty_buffers?(state) do
      Process.send_after(self(), :healthcheck, @healthcheck_freq * 1000)
      {:noreply, state}
    else
      Logger.warning(
        log_prefix("Connection closed for reading and writing - stopping this process."),
        library: :k8s
      )

      {:stop, {:shutdown, :closed}, state}
    end
  end

  @impl true
  def terminate(reason, state) do
    state = state

    state.requests
    |> Enum.each(fn
      {_request_ref, request} when is_nil(request.websocket) ->
        Request.put_response(
          request,
          {:error, reason}
        )

      {request_ref, request} ->
        {:ok, _websocket, data} = Mint.WebSocket.encode(request.websocket, :close)
        Mint.WebSocket.stream_request_body(state.conn, request_ref, data)
    end)

    Mint.HTTP.close(state.conn)

    Logger.debug(log_prefix("Terminating HTTPAdapter GenServer #{inspect(self())}"),
      library: :k8s
    )

    :ok
  end

  @spec stream_pending_request_bodies(t()) ::
          {:ok, t()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
  defp stream_pending_request_bodies(state) do
    stream_pending_request_bodies(state, Map.values(state.requests))
  end

  @spec stream_pending_request_bodies(t(), [Request.t()]) ::
          {:ok, t()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
  defp stream_pending_request_bodies(state, []) do
    {:ok, state}
  end

  defp stream_pending_request_bodies(state, [request | rest]) do
    case Request.stream_request_body(
           request,
           state.conn
         ) do
      {:ok, request, conn} ->
        state =
          put_in(state.requests[request.request_ref], request)
          |> struct!(conn: conn)

        stream_pending_request_bodies(state, rest)

      {:error, conn, error} ->
        {:error, conn, error}
    end
  end

  @spec process_responses_or_frames(t(), [Mint.Types.response()]) :: {:noreply, t()}
  defp process_responses_or_frames(state, [{:data, request_ref, data}] = responses) do
    request = state.requests[request_ref]

    if is_nil(request.websocket) do
      process_responses(state, responses)
    else
      {:ok, websocket, frames} = Mint.WebSocket.decode(request.websocket, data)
      state = put_in(state.requests[request_ref].websocket, websocket)
      process_frames(state, request_ref, frames)
    end
  end

  defp process_responses_or_frames(state, responses) do
    process_responses(state, responses)
  end

  @spec process_frames(t(), reference(), list(Mint.WebSocket.frame())) :: {:noreply, t()}
  defp process_frames(state, request_ref, frames) do
    state =
      frames
      |> Enum.map(&Request.map_frame/1)
      |> Enum.reduce(state, fn mapped_frame, state ->
        {_, state} =
          get_and_update_in(
            state.requests[request_ref],
            &Request.put_response(&1, mapped_frame)
          )

        state
      end)

    {:noreply, state}
  end

  @spec process_responses(t(), [Mint.Types.response()]) :: {:noreply, t()}
  defp process_responses(state, responses) do
    state =
      responses
      |> Enum.map(&Request.map_response/1)
      |> Enum.reduce(state, fn {mapped_response, request_ref}, state ->
        {_, state} =
          get_and_update_in(
            state.requests[request_ref],
            &Request.put_response(&1, mapped_response)
          )

        state
      end)

    {:noreply, state}
  end

  @spec shutdown_if_closed(t()) :: {:noreply, t()} | {:stop, {:shutdown, :closed}, t()}
  defp shutdown_if_closed(state) do
    if Mint.HTTP.open?(state.conn, :read) or any_non_empty_buffers?(state) do
      {:noreply, state}
    else
      Logger.warning(
        log_prefix("Connection closed for reading and writing - stopping this process."),
        library: :k8s
      )

      {:stop, {:shutdown, :closed}, state}
    end
  end

  @spec any_non_empty_buffers?(t()) :: boolean()
  defp any_non_empty_buffers?(state) do
    Enum.any?(state.requests, fn {_, request} -> request.buffer != [] end)
  end
end