lib/k8s/client/mint/request.ex

defmodule K8s.Client.Mint.Request do
  @moduledoc """
  Maintains the state of a HTTP or Websocket request.
  """

  alias K8s.Client.Mint.ConnectionRegistry

  @typedoc """
  Describes the mode the request is currently in.

  - `::pending` - The request is still streaming its body to the server
  - `:receiving` - The request is currently receiving response parts / frames
  - `:closing` - Websocket requests only: The `:close` frame was received but the process wasn't terminated yet
  - `:terminating` - HTTP requests only: The `:done` part was received but the request isn't cleaned up yet
  """
  @type request_modes :: :pending | :receiving | :closing | :terminating

  @typedoc """
  Defines the state of the request.

  - `:request_ref` - Mint request reference
  - `:caller_ref` - Monitor reference of the calling process.
  - `:stream_to` - the process expecting response parts sent to.
  - `:pool` - the PID of the pool so we can checkin after the last part is sent.
  - `:websocket` - for WebSocket requests: The websocket state (`Mint.WebSocket.t()`).
  - `:mode` - defines what mode the request is currently in.
  - `:buffer` - Holds the buffered response parts or frames that haven't been
    sent to / received by the caller yet
  - `:pending_request_body` - Part of the request body that has not been sent yet.
  """
  @type t :: %__MODULE__{
          request_ref: Mint.Types.request_ref(),
          caller_ref: reference(),
          stream_to: pid() | {pid(), reference()} | nil,
          pool: pid() | nil,
          websocket: Mint.WebSocket.t() | nil,
          mode: request_modes(),
          buffer: list(),
          pending_request_body: binary()
        }

  defstruct [
    :request_ref,
    :caller_ref,
    :stream_to,
    :pool,
    :websocket,
    :pending_request_body,
    mode: :pending,
    buffer: []
  ]

  @spec new(keyword()) :: t()
  def new(fields) do
    mode = if is_nil(fields[:pending_request_body]), do: :receiving, else: :pending
    fields = Keyword.put(fields, :mode, mode)
    struct!(__MODULE__, fields)
  end

  @spec put_response(t(), :done | {atom(), any()}) :: :pop | {t(), t()}
  def put_response(request, response) do
    request
    |> struct!(buffer: [response | request.buffer])
    |> update_mode(response)
    |> send_response()
    |> maybe_terminate_request()
  end

  @spec recv(t(), GenServer.from()) :: :pop | {t(), t()}
  def recv(request, from) do
    request
    |> struct!(stream_to: {:reply, from})
    |> send_response()
    |> maybe_terminate_request()
  end

  @spec update_mode(t(), :done | {atom(), term()}) :: t()
  defp update_mode(%__MODULE__{mode: mode} = request, _) when mode != :receiving, do: request

  defp update_mode(request, {:close, _}) do
    struct!(request, mode: :closing)
  end

  defp update_mode(request, :done) do
    struct!(request, mode: :terminating)
  end

  defp update_mode(request, _), do: request

  @spec maybe_terminate_request(t()) :: {t(), t()} | :pop
  def maybe_terminate_request(%__MODULE__{mode: :closing, buffer: []}), do: :pop

  def maybe_terminate_request(%__MODULE__{mode: :terminating, buffer: []} = request) do
    Process.demonitor(request.caller_ref)
    ConnectionRegistry.checkin(%{pool: request.pool, adapter: self()})
    :pop
  end

  def maybe_terminate_request(request), do: {request, request}

  @spec send_response(t()) :: t()
  defp send_response(%__MODULE__{stream_to: nil} = request) do
    request
  end

  defp send_response(%__MODULE__{stream_to: {:reply, from}, buffer: [_ | _]} = request) do
    GenServer.reply(from, Enum.reverse(request.buffer))
    struct!(request, stream_to: nil, buffer: [])
  end

  defp send_response(%__MODULE__{stream_to: {pid, ref}} = request) do
    request.buffer |> Enum.reverse() |> Enum.each(&send(pid, {ref, &1}))
    struct!(request, buffer: [])
  end

  defp send_response(%__MODULE__{stream_to: pid} = request) do
    request.buffer |> Enum.reverse() |> Enum.each(&send(pid, &1))
    struct!(request, buffer: [])
  end

  @spec map_response({:done, reference()} | {atom(), reference(), any}) ::
          {:done | {atom(), any}, reference()}
  def map_response({:done, ref}), do: {:done, ref}
  def map_response({type, ref, value}), do: {{type, value}, ref}

  @spec map_frame({:binary, binary} | {:close, any, any}) ::
          {:close, {integer(), binary()}}
          | {:error, binary}
          | {:stderr, binary}
          | {:stdout, binary}
  def map_frame({:close, code, reason}), do: {:close, {code, reason}}
  def map_frame({:binary, <<1, msg::binary>>}), do: {:stdout, msg}
  def map_frame({:binary, <<2, msg::binary>>}), do: {:stderr, msg}
  def map_frame({:binary, <<3, msg::binary>>}), do: {:error, msg}
  def map_frame({:binary, msg}), do: {:stdout, msg}

  @spec map_outgoing_frame({:stdin, binary()} | {:close, integer(), binary()} | :close | :exit) ::
          {:ok, :close | {:text, binary} | {:close, integer(), binary()}}
          | K8s.Client.HTTPError.t()
  def map_outgoing_frame({:stdin, data}), do: {:ok, {:text, <<0>> <> data}}
  def map_outgoing_frame(:close), do: {:ok, :close}
  def map_outgoing_frame(:exit), do: {:ok, :close}
  def map_outgoing_frame({:close, code, reason}), do: {:ok, {:close, code, reason}}

  def map_outgoing_frame(data) do
    K8s.Client.HTTPError.new(
      message: "The given message #{inspect(data)} is not supported to be sent to the Pod."
    )
  end

  @spec receive_upgrade_response(Mint.HTTP.t(), reference()) ::
          {:ok, Mint.HTTP.t(), map()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
  def receive_upgrade_response(conn, ref) do
    Enum.reduce_while(Stream.cycle([:ok]), {conn, %{}}, fn _, {conn, response} ->
      case Mint.HTTP.recv(conn, 0, 5000) do
        {:ok, conn, parts} ->
          response =
            parts
            |> Map.new(fn
              {type, ^ref} -> {type, true}
              {type, ^ref, value} -> {type, value}
            end)
            |> Map.merge(response)

          # credo:disable-for-lines:3
          if response[:done],
            do: {:halt, {:ok, conn, response}},
            else: {:cont, {conn, response}}

        {:error, conn, error, _} ->
          {:halt, {:error, conn, error}}
      end
    end)
  end

  @spec stream_request_body(t(), Mint.HTTP.t()) ::
          {:ok, t(), Mint.HTTP.t()} | {:error, Mint.HTTP.t(), Mint.Types.error()}
  def stream_request_body(%__MODULE__{mode: mode} = req, conn) when mode != :pending,
    do: {:ok, req, conn}

  def stream_request_body(request, conn) do
    chunk_size = chunk_size(request, conn)

    %__MODULE__{
      request_ref: request_ref,
      pending_request_body: <<chunk::binary-size(chunk_size), remaining_request_body::binary>>
    } = request

    with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, chunk),
         {:remaining_request_body, conn, ""} <-
           {:remaining_request_body, conn, remaining_request_body},
         {:ok, conn} <- Mint.HTTP.stream_request_body(conn, request_ref, :eof) do
      {:ok, struct!(request, mode: :receiving, pending_request_body: nil), conn}
    else
      {:remaining_request_body, conn, remaining_request_body} ->
        {:ok, struct!(request, pending_request_body: remaining_request_body), conn}

      {:error, conn, error} ->
        {:error, conn, error}
    end
  end

  @spec chunk_size(t(), Mint.HTTP.t()) :: non_neg_integer()
  defp chunk_size(request, conn) do
    Enum.min([
      get_window_size(conn, {:request, request.request_ref}),
      get_window_size(conn, :connection),
      byte_size(request.pending_request_body)
    ])
  end

  @spec get_window_size(
          Mint.HTTP.t() | Mint.HTTP2.t(),
          :connection | {:request, Mint.Types.request_ref()}
        ) :: non_neg_integer
  defp get_window_size(conn, connection_or_request) do
    # This is necessary becuase conn types in Mint are opaque and dialyzer
    # would raise if we call Mint.HTTP2.get_window_size() directly with a
    # Mint.HTTP.t() conn.
    #
    # See [elixir-mint/mint#380](https://github.com/elixir-mint/mint/issues/380) for
    # the discussion on it.

    Mint.HTTP2.get_window_size(conn, connection_or_request)
  end
end