lib/mint/http2.ex

defmodule Mint.HTTP2 do
  @moduledoc """
  Processless HTTP client with support for HTTP/2.

  This module provides a data structure that represents an HTTP/2 connection to
  a given server. The connection is represented as an opaque struct `%Mint.HTTP2{}`.
  The connection is a data structure and is not backed by a process, and all the
  connection handling happens in the process that creates the struct.

  This module and data structure work exactly like the ones described in the `Mint.HTTP`
  module, with the exception that `Mint.HTTP2` specifically deals with HTTP/2 while
  `Mint.HTTP` deals seamlessly with HTTP/1.1 and HTTP/2. For more information on
  how to use the data structure and client architecture, see `Mint.HTTP`.

  ## HTTP/2 streams and requests

  HTTP/2 introduces the concept of **streams**. A stream is an isolated conversation
  between the client and the server. Each stream is identified by a unique **stream ID**,
  so data belonging to different streams can arrive in any order and still be told apart.
  A stream closely corresponds to a request, so in this documentation and client we will
  mostly refer to streams as "requests". Since responses on different streams can arrive
  in any order, performing request A and then request B does not guarantee that the
  response to A arrives before the response to B. This is why each request is identified
  by a unique reference returned by `request/5`. See `request/5` for more information.
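
  For example, here is a minimal sketch (with hypothetical paths, assuming `conn` is an
  already-established connection) of two requests whose responses are told apart purely
  by their request references:

      {:ok, conn, ref_a} = Mint.HTTP2.request(conn, "GET", "/a", [], nil)
      {:ok, conn, ref_b} = Mint.HTTP2.request(conn, "GET", "/b", [], nil)

      next_message =
        receive do
          msg -> msg
        end

      # Responses can interleave in any order; match on the refs to know which
      # request each response belongs to.
      {:ok, conn, responses} = Mint.HTTP2.stream(conn, next_message)

      Enum.each(responses, fn
        {:status, ^ref_a, status} -> IO.inspect(status, label: "status for request A")
        {:status, ^ref_b, status} -> IO.inspect(status, label: "status for request B")
        _other -> :ok
      end)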

  ## Closed connection

  In HTTP/2, the connection can either be open, closed, or closed only for writing.
  When a connection is closed for writing, the client cannot send requests or stream
  body chunks, but it can still read data that the server might be sending. When the
  connection gets closed on the writing side, a `:server_closed_connection` error is
  returned by `stream/2`, together with an `{:error, request_ref, error}` response for
  each request that the server did not process, where the reason of `error` is
  `:unprocessed`. These requests are safe to retry.
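
  A minimal sketch of handling this case when processing a message (the error and
  response shapes are the ones described above; `conn` and `message` are assumed to
  already be available):

      case Mint.HTTP2.stream(conn, message) do
        {:ok, conn, responses} ->
          {conn, responses}

        {:error, conn, %Mint.HTTPError{reason: {:server_closed_connection, _code, _debug}}, responses} ->
          # Any {:error, request_ref, %Mint.HTTPError{reason: :unprocessed}} in
          # "responses" identifies a request the server never processed, which is
          # safe to retry on another connection.
          {conn, responses}
      end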

  ## HTTP/2 settings

  HTTP/2 supports settings negotiation between servers and clients. The server advertises
  its settings to the client and the client advertises its settings to the server. A peer
  (server or client) has to acknowledge the settings advertised by the other peer before
  those settings take effect (that's why it's called a negotiation).

  A first settings negotiation happens right when the connection starts.
  Servers and clients can renegotiate settings at any time during the life of the
  connection.

  Mint users don't need to care about settings acknowledgements directly since they're
  handled transparently by `stream/2`.

  To retrieve the server settings, use `get_server_setting/2`. This is often useful to
  tune your requests based on what the server supports.

  To communicate client settings to the server, use `put_settings/2` or pass them when
  starting up a connection with `connect/4`. Note that the server needs to acknowledge
  the settings sent through `put_settings/2` before those settings come into effect. The
  server ack is processed transparently by `stream/2`, but this means that if you change
  a setting through `put_settings/2` and try to retrieve the value of that setting right
  after with `get_client_setting/2`, you'll likely get the old value of that setting. Once
  the server acknowledges the new settings, the updated value will be returned by
  `get_client_setting/2`.
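
  For example, a minimal sketch of changing a client setting and reading it back
  (assuming the next message processed by `stream/2` carries the server's SETTINGS
  acknowledgement):

      {:ok, conn} = Mint.HTTP2.put_settings(conn, max_frame_size: 65_536)

      # Before the server acknowledges the change, the old value (16_384 by default)
      # is still returned.
      Mint.HTTP2.get_client_setting(conn, :max_frame_size)
      #=> 16_384

      next_message =
        receive do
          msg -> msg
        end

      {:ok, conn, _responses} = Mint.HTTP2.stream(conn, next_message)

      Mint.HTTP2.get_client_setting(conn, :max_frame_size)
      #=> 65_536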

  ## Server push

  HTTP/2 supports [server push](https://en.wikipedia.org/wiki/HTTP/2_Server_Push), which
  is a way for a server to send a response to a client without the client needing to make
  the corresponding request. The server sends a `:push_promise` response to a normal request:
  this creates a new request reference. Then, the server sends normal responses for the newly
  created request reference.

  Let's see an example. We will ask the server for `"/index.html"` and the server will
  send us a push promise for `"/style.css"`.

      {:ok, conn} = Mint.HTTP2.connect(:https, "example.com", 443)
      {:ok, conn, request_ref} = Mint.HTTP2.request(conn, "GET", "/index.html", _headers = [], _body = "")

      next_message =
        receive do
          msg -> msg
        end

      {:ok, conn, responses} = Mint.HTTP2.stream(conn, next_message)

      [
        {:push_promise, ^request_ref, promised_request_ref, promised_headers},
        {:status, ^request_ref, 200},
        {:headers, ^request_ref, []},
        {:data, ^request_ref, "<html>..."},
        {:done, ^request_ref}
      ] = responses

      promised_headers
      #=> [{":method", "GET"}, {":path", "/style.css"}]

  As you can see in the example above, when the server sends a push promise, a
  `:push_promise` response is returned as a response to a request. The `:push_promise`
  response contains a `promised_request_ref` and some `promised_headers`. The
  `promised_request_ref` is the new request ref that pushed responses will be tagged with.
  `promised_headers` are headers that tell the client *what request* the promised response
  will respond to. The idea is that the server anticipates a request that the client will
  want to make and preemptively sends a response for it. Promised headers
  will always include `:method`, `:path`, and `:authority`.

      next_message =
        receive do
          msg -> msg
        end

      {:ok, conn, responses} = Mint.HTTP2.stream(conn, next_message)

      [
        {:status, ^promised_request_ref, 200},
        {:headers, ^promised_request_ref, []},
        {:data, ^promised_request_ref, "body { ... }"},
        {:done, ^promised_request_ref}
      ]

  The response to a promised request is like a response to any normal request.

  ### Disabling server pushes

  HTTP/2 exposes a boolean setting for enabling or disabling server pushes with `:enable_push`.
  You can pass this option when connecting or in `put_settings/2`. By default server push
  is enabled.
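
  For example, a sketch of disabling server push, either when connecting (through the
  `:client_settings` option) or later on an existing connection:

      {:ok, conn} =
        Mint.HTTP2.connect(:https, "example.com", 443, client_settings: [enable_push: false])

      # Or, on an already-established connection:
      {:ok, conn} = Mint.HTTP2.put_settings(conn, enable_push: false)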
  """

  import Mint.Core.Util
  import Mint.HTTP2.Frame, except: [encode: 1, decode_next: 1]

  alias Mint.{HTTPError, TransportError}
  alias Mint.Types
  alias Mint.Core.Util
  alias Mint.HTTP2.Frame

  require Logger
  require Integer

  @behaviour Mint.Core.Conn

  ## Constants

  @connection_preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
  @transport_opts [alpn_advertised_protocols: ["h2"]]

  @default_window_size 65_535
  @max_window_size 2_147_483_647

  @default_max_frame_size 16_384
  @valid_max_frame_size_range @default_max_frame_size..16_777_215

  @user_agent "mint/" <> Mix.Project.config()[:version]

  # HTTP/2 connection struct.
  defstruct [
    # Transport things.
    :transport,
    :socket,
    :mode,

    # Host things.
    :hostname,
    :port,
    :scheme,

    # Connection state (open, closed, and so on).
    :state,

    # Fields of the connection.
    buffer: "",
    window_size: @default_window_size,
    encode_table: HPAX.new(4096),
    decode_table: HPAX.new(4096),

    # Queue for sent PING frames.
    ping_queue: :queue.new(),

    # Queue for sent SETTINGS frames.
    client_settings_queue: :queue.new(),

    # Stream-set-related things.
    next_stream_id: 3,
    streams: %{},
    open_client_stream_count: 0,
    open_server_stream_count: 0,
    ref_to_stream_id: %{},

    # Settings that the server communicates to the client.
    server_settings: %{
      enable_push: true,
      max_concurrent_streams: 100,
      initial_window_size: @default_window_size,
      max_frame_size: @default_max_frame_size,
      max_header_list_size: :infinity,
      # Only supported by the server: https://www.rfc-editor.org/rfc/rfc8441.html#section-3
      enable_connect_protocol: false
    },

    # Settings that the client communicates to the server.
    client_settings: %{
      max_concurrent_streams: 100,
      max_frame_size: @default_max_frame_size,
      enable_push: true
    },

    # Headers being processed (when headers are split into multiple frames with CONTINUATIONS, all
    # the continuation frames must come one right after the other).
    headers_being_processed: nil,

    # Stores the headers returned by the proxy in the `CONNECT` method
    proxy_headers: [],

    # Private store.
    private: %{}
  ]

  ## Types

  @typedoc """
  HTTP/2 setting with its value.

  This type represents both server settings as well as client settings. To retrieve
  server settings use `get_server_setting/2` and to retrieve client settings use
  `get_client_setting/2`. To send client settings to the server, see `put_settings/2`.

  The supported settings are the following:

    * `:header_table_size` - (integer) corresponds to `SETTINGS_HEADER_TABLE_SIZE`. The maximum
      size, in bytes, of the HPACK header compression table used to decode header blocks.

    * `:enable_push` - (boolean) corresponds to `SETTINGS_ENABLE_PUSH`. Sets whether
      push promises are supported. If you don't want to support push promises,
      use `put_settings/2` to tell the server that your client doesn't want push promises.

    * `:max_concurrent_streams` - (integer) corresponds to `SETTINGS_MAX_CONCURRENT_STREAMS`.
      The maximum number of concurrent streams that the peer sending this setting (client or
      server) supports. As mentioned in the module documentation, HTTP/2 streams are equivalent
      to requests, so knowing the maximum number of streams that the server supports tells you
      how many concurrent requests can be open at any time. Use `get_server_setting/2`
      to find out how many concurrent streams the server supports.

    * `:initial_window_size` - (integer smaller than or equal to `#{inspect(@max_window_size)}`)
      corresponds to `SETTINGS_INITIAL_WINDOW_SIZE`. The initial HTTP/2 window size of the
      peer that sends this setting.

    * `:max_frame_size` - (integer in the range `#{inspect(@valid_max_frame_size_range)}`)
      corresponds to `SETTINGS_MAX_FRAME_SIZE`. The maximum size of a single HTTP/2 frame
      that the peer sending this setting is willing to receive.

    * `:max_header_list_size` - (integer) corresponds to `SETTINGS_MAX_HEADER_LIST_SIZE`.

    * `:enable_connect_protocol` - (boolean) corresponds to `SETTINGS_ENABLE_CONNECT_PROTOCOL`.
      Sets whether the client may invoke the extended connect protocol which is used to
      bootstrap WebSocket connections.

  """
  @type setting() ::
          {:header_table_size, pos_integer()}
          | {:enable_push, boolean()}
          | {:max_concurrent_streams, pos_integer()}
          | {:initial_window_size, 1..2_147_483_647}
          | {:max_frame_size, 16_384..16_777_215}
          | {:max_header_list_size, :infinity | pos_integer()}
          | {:enable_connect_protocol, boolean()}

  @typedoc """
  HTTP/2 settings.

  See `t:setting/0`.
  """
  @type settings() :: [setting()]

  @typedoc """
  An HTTP/2-specific error reason.

  The values can be:

    * `:closed` - when you try to make a request or stream a body chunk but the connection
      is closed.

    * `:closed_for_writing` - when you try to make a request or stream a body chunk but
      the connection is closed for writing. This means you cannot issue any more requests.
      See the "Closed connection" section in the module documentation for more information.

    * `:too_many_concurrent_requests` - when the maximum number of concurrent requests
      allowed by the server is reached. To find out what this limit is, use
      `get_server_setting/2` with the `:max_concurrent_streams` setting name.

    * `{:max_header_list_size_exceeded, size, max_size}` - when the maximum size of
      the header list is reached. `size` is the actual value of the header list size,
      `max_size` is the maximum value allowed. See `get_server_setting/2` to retrieve
      the value of the max size.

    * `{:exceeds_window_size, what, window_size}` - when the data you're trying to send
      exceeds the window size of the connection (if `what` is `:connection`) or of a request
      (if `what` is `:request`). `window_size` is the allowed window size. See
      `get_window_size/2`.

    * `{:stream_not_found, stream_id}` - when the given request is not found.

    * `:unknown_request_to_stream` - when you're trying to stream data on an unknown
      request.

    * `:request_is_not_streaming` - when you try to send data (with `stream_request_body/3`)
      on a request that is not open for streaming.

    * `:unprocessed` - when a request was closed because it was not processed by the server.
      When this error is returned, it means that the server hasn't processed the request at all,
      so it's safe to retry the given request on a different or new connection.

    * `{:server_closed_request, error_code}` - when the server closes the request.
      `error_code` is the reason why the request was closed.

    * `{:server_closed_connection, reason, debug_data}` - when the server closes the connection
      gracefully or because of an error. In HTTP/2, this corresponds to a `GOAWAY` frame.
      `reason` is the reason why the connection was closed. `debug_data` is additional debug data.

    * `{:frame_size_error, frame}` - when there's an error with the size of a frame.
      `frame` is the frame type, such as `:settings` or `:window_update`.

    * `{:protocol_error, debug_data}` - when there's a protocol error.
      `debug_data` is a string that explains the nature of the error.

    * `{:compression_error, debug_data}` - when there's a header compression error.
      `debug_data` is a string that explains the nature of the error.

    * `{:flow_control_error, debug_data}` - when there's a flow control error.
      `debug_data` is a string that explains the nature of the error.

  """
  @type error_reason() :: term()

  @opaque t() :: %Mint.HTTP2{}

  ## Public interface

  @doc """
  Same as `Mint.HTTP.connect/4`, but forces an HTTP/2 connection.
  """
  @spec connect(Types.scheme(), Types.address(), :inet.port_number(), keyword()) ::
          {:ok, t()} | {:error, Types.error()}
  def connect(scheme, address, port, opts \\ []) do
    hostname = Mint.Core.Util.hostname(opts, address)

    transport_opts =
      opts
      |> Keyword.get(:transport_opts, [])
      |> Keyword.merge(@transport_opts)
      |> Keyword.put(:hostname, hostname)

    case negotiate(address, port, scheme, transport_opts) do
      {:ok, socket} ->
        initiate(scheme, socket, hostname, port, opts)

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc false
  @spec upgrade(
          Types.scheme(),
          Mint.Types.socket(),
          Types.scheme(),
          String.t(),
          :inet.port_number(),
          keyword()
        ) :: {:ok, t()} | {:error, Types.error()}
  def upgrade(old_scheme, socket, new_scheme, hostname, port, opts) do
    transport = scheme_to_transport(new_scheme)

    transport_opts =
      opts
      |> Keyword.get(:transport_opts, [])
      |> Keyword.merge(@transport_opts)

    with {:ok, socket} <- transport.upgrade(socket, old_scheme, hostname, port, transport_opts) do
      initiate(new_scheme, socket, hostname, port, opts)
    end
  end

  @doc """
  See `Mint.HTTP.close/1`.
  """
  @impl true
  @spec close(t()) :: {:ok, t()}
  def close(conn)

  def close(%__MODULE__{state: :open} = conn) do
    send_connection_error!(conn, :no_error, "connection peacefully closed by client")
  catch
    {:mint, conn, %HTTPError{reason: {:no_error, _}}} ->
      {:ok, conn}

    # We could have an error sending the GOAWAY frame, but we want to ignore that since
    # we're closing the connection anyway.
    {:mint, conn, %TransportError{}} ->
      conn = put_in(conn.state, :closed)
      {:ok, conn}
  end

  def close(%__MODULE__{state: {:goaway, _error_code, _debug_data}} = conn) do
    _ = conn.transport.close(conn.socket)
    {:ok, put_in(conn.state, :closed)}
  end

  def close(%__MODULE__{state: :closed} = conn) do
    {:ok, conn}
  end

  @doc """
  See `Mint.HTTP.open?/1`.
  """
  @impl true
  @spec open?(t(), :read | :write | :read_write) :: boolean()
  def open?(%Mint.HTTP2{state: state} = _conn, type \\ :read_write)
      when type in [:read, :write, :read_write] do
    case state do
      :open -> true
      {:goaway, _error_code, _debug_data} -> type == :read
      :closed -> false
    end
  end

  @doc """
  See `Mint.HTTP.request/5`.

  In HTTP/2, opening a request means opening a new HTTP/2 stream (see the
  module documentation). This means that a request could fail because the
  maximum number of concurrent streams allowed by the server has been reached.
  In that case, the error reason `:too_many_concurrent_requests` is returned.
  To avoid this error, you can retrieve the maximum number of concurrent streams
  supported by the server through `get_server_setting/2` (passing in the
  `:max_concurrent_streams` setting name).

  ## Header list size

  In HTTP/2, the server can optionally specify a maximum header list size that
  the client needs to respect when sending headers. The header list size is calculated
  by summing the length (in bytes) of each header name plus value, plus 32 bytes for
  each header. Note that pseudo-headers (like `:path` or `:method`) count towards
  this size. If the size is exceeded, an error is returned. To check what the limit
  is, use `get_server_setting/2` with the `:max_header_list_size` setting name.
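
  For example, a request carrying the single header `{"accept", "text/html"}` adds
  6 + 9 + 32 = 47 bytes to the header list size, on top of the pseudo-headers that
  Mint adds for you.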

  ## Request body size

  If the request body is larger than the window size of the HTTP/2 stream created by
  the request, or larger than the window size of the connection, Mint will return an
  `:exceeds_window_size` error.

  To avoid exceeding the window size, stream the request body: pass `:stream` as the
  body when calling `request/5`, send the body in chunks with `stream_request_body/3`,
  and use `get_window_size/2` to check the window size of the request and of the
  connection, as shown in the sketch below.
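
  A minimal sketch of streaming a body, assuming `chunks` is an enumerable of binaries
  that each fit within the current request and connection windows:

      {:ok, conn, request_ref} = Mint.HTTP2.request(conn, "POST", "/upload", [], :stream)

      conn =
        Enum.reduce(chunks, conn, fn chunk, conn ->
          # A real client should check get_window_size/2 before sending and, when the
          # window is too small, wait for the server to refill it (which stream/2
          # handles when processing WINDOW_UPDATE frames).
          {:ok, conn} = Mint.HTTP2.stream_request_body(conn, request_ref, chunk)
          conn
        end)

      {:ok, conn} = Mint.HTTP2.stream_request_body(conn, request_ref, :eof)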
  """
  @impl true
  @spec request(
          t(),
          method :: String.t(),
          path :: String.t(),
          Types.headers(),
          body :: iodata() | nil | :stream
        ) ::
          {:ok, t(), Types.request_ref()}
          | {:error, t(), Types.error()}
  def request(conn, method, path, headers, body)

  def request(%Mint.HTTP2{state: :closed} = conn, _method, _path, _headers, _body) do
    {:error, conn, wrap_error(:closed)}
  end

  def request(
        %Mint.HTTP2{state: {:goaway, _error_code, _debug_data}} = conn,
        _method,
        _path,
        _headers,
        _body
      ) do
    {:error, conn, wrap_error(:closed_for_writing)}
  end

  def request(%Mint.HTTP2{} = conn, method, path, headers, body)
      when is_binary(method) and is_binary(path) and is_list(headers) do
    headers =
      headers
      |> downcase_header_names()
      |> add_pseudo_headers(conn, method, path)
      |> add_default_headers(body)
      |> sort_pseudo_headers_to_front()

    {conn, stream_id, ref} = open_stream(conn)
    {conn, payload} = encode_request_payload(conn, stream_id, headers, body)
    conn = send!(conn, payload)
    {:ok, conn, ref}
  catch
    :throw, {:mint, _conn, reason} ->
      # The stream is invalid and "_conn" may be tracking it, so we return the original connection instead.
      {:error, conn, reason}
  end

  @doc """
  See `Mint.HTTP.stream_request_body/3`.
  """
  @impl true
  @spec stream_request_body(
          t(),
          Types.request_ref(),
          iodata() | :eof | {:eof, trailing_headers :: Types.headers()}
        ) :: {:ok, t()} | {:error, t(), Types.error()}
  def stream_request_body(conn, request_ref, chunk)

  def stream_request_body(%Mint.HTTP2{state: :closed} = conn, _request_ref, _chunk) do
    {:error, conn, wrap_error(:closed)}
  end

  def stream_request_body(
        %Mint.HTTP2{state: {:goaway, _error_code, _debug_data}} = conn,
        _request_ref,
        _chunk
      ) do
    {:error, conn, wrap_error(:closed_for_writing)}
  end

  def stream_request_body(%Mint.HTTP2{} = conn, request_ref, chunk)
      when is_reference(request_ref) do
    case Map.fetch(conn.ref_to_stream_id, request_ref) do
      {:ok, stream_id} ->
        {conn, payload} = encode_stream_body_request_payload(conn, stream_id, chunk)
        conn = send!(conn, payload)
        {:ok, conn}

      :error ->
        {:error, conn, wrap_error(:unknown_request_to_stream)}
    end
  catch
    :throw, {:mint, _conn, reason} ->
      # The stream is invalid and "_conn" may be tracking it, so we return the original connection instead.
      {:error, conn, reason}
  end

  @doc """
  Pings the server.

  This function is specific to HTTP/2 connections. It sends a **ping** request to
  the server `conn` is connected to. A `{:ok, conn, request_ref}` tuple is returned,
  where `conn` is the updated connection and `request_ref` is a unique reference that
  identifies this ping request. The response to a ping request is returned by `stream/2`
  as a `{:pong, request_ref}` tuple. If there's an error, this function returns
  `{:error, conn, reason}` where `conn` is the updated connection and `reason` is the
  error reason.

  `payload` must be an 8-byte binary with arbitrary content. When the server responds to
  a ping request, it will use that same payload. By default, the payload is an 8-byte
  binary with all bits set to `0`.

  Pinging can be used to measure the latency with the server and to ensure the connection
  is alive and well.

  ## Examples

      {:ok, conn, ref} = Mint.HTTP2.ping(conn)
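
  The pong can then be awaited through `stream/2`. A minimal sketch, assuming the
  connection is in active mode and the next message delivers the server's PING
  acknowledgement:

      next_message =
        receive do
          msg -> msg
        end

      {:ok, conn, [{:pong, ^ref}]} = Mint.HTTP2.stream(conn, next_message)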

  """
  @spec ping(t(), <<_::64>>) :: {:ok, t(), Types.request_ref()} | {:error, t(), Types.error()}
  def ping(%Mint.HTTP2{} = conn, payload \\ :binary.copy(<<0>>, 8))
      when byte_size(payload) == 8 do
    {conn, ref} = send_ping(conn, payload)
    {:ok, conn, ref}
  catch
    :throw, {:mint, conn, error} -> {:error, conn, error}
  end

  @doc """
  Communicates the given client settings to the server.

  This function is HTTP/2-specific.

  This function takes a connection and a keyword list of HTTP/2 settings and sends
  the values of those settings to the server. The settings won't be effective until
  the server acknowledges them, which will be handled transparently by `stream/2`.

  This function returns `{:ok, conn}` when sending the settings to the server is
  successful, with `conn` being the updated connection. If there's an error, this
  function returns `{:error, conn, reason}` with `conn` being the updated connection
  and `reason` being the reason of the error.

  ## Supported settings

  See `t:setting/0` for the supported settings. You can see the meaning
  of these settings [in the corresponding section in the HTTP/2
  RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).

  See the "HTTP/2 settings" section in the module documentation for more information.

  ## Examples

      {:ok, conn} = Mint.HTTP2.put_settings(conn, max_frame_size: 100)

  """
  @spec put_settings(t(), settings()) :: {:ok, t()} | {:error, t(), Types.error()}
  def put_settings(%Mint.HTTP2{} = conn, settings) when is_list(settings) do
    conn = send_settings(conn, settings)
    {:ok, conn}
  catch
    :throw, {:mint, conn, error} -> {:error, conn, error}
  end

  @doc """
  Gets the value of the given HTTP/2 server settings.

  This function returns the value of the given HTTP/2 setting that the server
  advertised to the client. This function is HTTP/2 specific.
  For more information on HTTP/2 settings, see [the related section in
  the RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).

  See the "HTTP/2 settings" section in the module documentation for more information.

  ## Supported settings

  The possible settings that can be retrieved are described in `t:setting/0`.
  Any other atom passed as `name` will raise an error.

  ## Examples

      Mint.HTTP2.get_server_setting(conn, :max_concurrent_streams)
      #=> 500

  """
  @spec get_server_setting(t(), atom()) :: term()
  def get_server_setting(%Mint.HTTP2{} = conn, name) when is_atom(name) do
    get_setting(conn.server_settings, name)
  end

  @doc """
  Gets the value of the given HTTP/2 client setting.

  This function returns the value of the given HTTP/2 setting that the client
  advertised to the server. Client settings can be advertised through `put_settings/2`
  or when starting up a connection.

  Client settings have to be acknowledged by the server before coming into effect.

  This function is HTTP/2 specific. For more information on HTTP/2 settings, see
  [the related section in the RFC](https://httpwg.org/specs/rfc7540.html#SettingValues).

  See the "HTTP/2 settings" section in the module documentation for more information.

  ## Supported settings

  The possible settings that can be retrieved are described in `t:setting/0`.
  Any other atom passed as `name` will raise an error.

  ## Examples

      Mint.HTTP2.get_client_setting(conn, :max_concurrent_streams)
      #=> 500

  """
  @spec get_client_setting(t(), atom()) :: term()
  def get_client_setting(%Mint.HTTP2{} = conn, name) when is_atom(name) do
    get_setting(conn.client_settings, name)
  end

  defp get_setting(settings, name) do
    case Map.fetch(settings, name) do
      {:ok, value} -> value
      :error -> raise ArgumentError, "unknown HTTP/2 setting: #{inspect(name)}"
    end
  end

  @doc """
  Cancels an in-flight request.

  This function is HTTP/2 specific. The server could have already sent responses for the
  request you want to cancel: those responses will be parsed by the connection but not
  returned to the user. No more responses to a request will be returned after you call
  `cancel_request/2` on that request.

  If there's no error in canceling the request, `{:ok, conn}` is returned where `conn` is
  the updated connection. If there's an error, `{:error, conn, reason}` is returned where
  `conn` is the updated connection and `reason` is the error reason.

  ## Examples

      {:ok, conn, ref} = Mint.HTTP2.request(conn, "GET", "/", _headers = [])
      {:ok, conn} = Mint.HTTP2.cancel_request(conn, ref)

  """
  @spec cancel_request(t(), Types.request_ref()) :: {:ok, t()} | {:error, t(), Types.error()}
  def cancel_request(%Mint.HTTP2{} = conn, request_ref) when is_reference(request_ref) do
    case Map.fetch(conn.ref_to_stream_id, request_ref) do
      {:ok, stream_id} ->
        conn = close_stream!(conn, stream_id, _error_code = :cancel)
        {:ok, conn}

      :error ->
        {:ok, conn}
    end
  catch
    :throw, {:mint, conn, error} -> {:error, conn, error}
  end

  @doc """
  Returns the window size of the connection or of a single request.

  This function is HTTP/2 specific. It returns the window size of
  either the connection if `connection_or_request` is `:connection` or of a single
  request if `connection_or_request` is `{:request, request_ref}`.

  Use this function to check the window size of the connection before sending a
  full request. Also use this function to check the window size of both the
  connection and of a request if you want to stream body chunks on that request.

  For more information on flow control and window sizes in HTTP/2, see the section
  below.

  ## HTTP/2 flow control

  In HTTP/2, flow control is implemented through a window size. When the client sends
  data to the server, the window size is decreased and the server needs to "refill" it
  on the client side. You don't need to take care of the refilling of the client window
  as it happens behind the scenes in `stream/2`.

  A window size is kept for the entire connection and all requests affect this window
  size. A window size is also kept per request.

  The only thing that affects the window size is the body of a request, regardless of
  whether it's a full request sent with `request/5` or body chunks sent through
  `stream_request_body/3`. That means that if we make a request with a body that is
  five bytes long, like `"hello"`, the window size of the connection and the window size
  of that particular request will decrease by five bytes.

  If we use all the window size before the server refills it, functions like
  `request/5` will return an error.

  ## Examples

  On the connection:

      HTTP2.get_window_size(conn, :connection)
      #=> 65_536

  On a single streamed request:

      {:ok, conn, request_ref} = HTTP2.request(conn, "GET", "/", [], :stream)
      HTTP2.get_window_size(conn, {:request, request_ref})
      #=> 65_536

      {:ok, conn} = HTTP2.stream_request_body(conn, request_ref, "hello")
      HTTP2.get_window_size(conn, {:request, request_ref})
      #=> 65_531

  """
  @spec get_window_size(t(), :connection | {:request, Types.request_ref()}) :: non_neg_integer()
  def get_window_size(conn, connection_or_request)

  def get_window_size(%Mint.HTTP2{} = conn, :connection) do
    conn.window_size
  end

  def get_window_size(%Mint.HTTP2{} = conn, {:request, request_ref}) do
    case Map.fetch(conn.ref_to_stream_id, request_ref) do
      {:ok, stream_id} ->
        conn.streams[stream_id].window_size

      :error ->
        raise ArgumentError,
              "request with request reference #{inspect(request_ref)} was not found"
    end
  end

  @doc """
  See `Mint.HTTP.stream/2`.
  """
  @impl true
  @spec stream(t(), term()) ::
          {:ok, t(), [Types.response()]}
          | {:error, t(), Types.error(), [Types.response()]}
          | :unknown
  def stream(conn, message)

  def stream(%Mint.HTTP2{socket: socket} = conn, {tag, socket, reason})
      when tag in [:tcp_error, :ssl_error] do
    error = conn.transport.wrap_error(reason)
    {:error, %{conn | state: :closed}, error, _responses = []}
  end

  def stream(%Mint.HTTP2{socket: socket} = conn, {tag, socket})
      when tag in [:tcp_closed, :ssl_closed] do
    handle_closed(conn)
  end

  def stream(%Mint.HTTP2{transport: transport, socket: socket} = conn, {tag, socket, data})
      when tag in [:tcp, :ssl] do
    case maybe_concat_and_handle_new_data(conn, data) do
      {:ok, %{mode: mode, state: state} = conn, responses}
      when mode == :active and state != :closed ->
        case transport.setopts(socket, active: :once) do
          :ok -> {:ok, conn, responses}
          {:error, reason} -> {:error, put_in(conn.state, :closed), reason, responses}
        end

      other ->
        other
    end
  catch
    :throw, {:mint, conn, error, responses} -> {:error, conn, error, responses}
  end

  def stream(%Mint.HTTP2{}, _message) do
    :unknown
  end

  @doc """
  See `Mint.HTTP.open_request_count/1`.

  In HTTP/2, the number of open requests is the number of requests **opened by the client**
  that have not yet received a `:done` response. It's important to note that only
  requests opened by the client (with `request/5`) count towards the number of open
  requests, as requests opened from the server with server pushes (see the "Server push"
  section in the module documentation) are not considered open requests. The distinction
  matters because the server limits the number of concurrent requests the client can open,
  so clients may need to track how many of their own requests are still open. To know how
  many requests the client can open, see `get_server_setting/2` with the
  `:max_concurrent_streams` setting.
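
  A minimal sketch of checking this limit before opening another request (the
  `:wait_for_responses` return value is just a hypothetical placeholder):

      max = Mint.HTTP2.get_server_setting(conn, :max_concurrent_streams)

      if Mint.HTTP2.open_request_count(conn) < max do
        Mint.HTTP2.request(conn, "GET", "/", [], nil)
      else
        {:error, :wait_for_responses}
      end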
  """
  @impl true
  @spec open_request_count(t()) :: non_neg_integer()
  def open_request_count(%Mint.HTTP2{} = conn) do
    conn.open_client_stream_count
  end

  @doc """
  See `Mint.HTTP.recv/3`.
  """
  @impl true
  @spec recv(t(), non_neg_integer(), timeout()) ::
          {:ok, t(), [Types.response()]}
          | {:error, t(), Types.error(), [Types.response()]}
  def recv(conn, byte_count, timeout)

  def recv(%__MODULE__{mode: :passive} = conn, byte_count, timeout) do
    case conn.transport.recv(conn.socket, byte_count, timeout) do
      {:ok, data} ->
        maybe_concat_and_handle_new_data(conn, data)

      {:error, %TransportError{reason: :closed}} ->
        handle_closed(conn)

      {:error, error} ->
        {:error, %{conn | state: :closed}, error, _responses = []}
    end
  catch
    :throw, {:mint, conn, error, responses} -> {:error, conn, error, responses}
  end

  def recv(_conn, _byte_count, _timeout) do
    raise ArgumentError,
          "can't use recv/3 to synchronously receive data when the mode is :active. " <>
            "Use Mint.HTTP.set_mode/2 to set the connection to passive mode"
  end

  @doc """
  See `Mint.HTTP.set_mode/2`.
  """
  @impl true
  @spec set_mode(t(), :active | :passive) :: {:ok, t()} | {:error, Types.error()}
  def set_mode(%__MODULE__{} = conn, mode) when mode in [:active, :passive] do
    active =
      case mode do
        :active -> :once
        :passive -> false
      end

    with :ok <- conn.transport.setopts(conn.socket, active: active) do
      {:ok, put_in(conn.mode, mode)}
    end
  end

  @doc """
  See `Mint.HTTP.controlling_process/2`.
  """
  @impl true
  @spec controlling_process(t(), pid()) :: {:ok, t()} | {:error, Types.error()}
  def controlling_process(%__MODULE__{} = conn, new_pid) when is_pid(new_pid) do
    with :ok <- conn.transport.controlling_process(conn.socket, new_pid) do
      {:ok, conn}
    end
  end

  @doc """
  See `Mint.HTTP.put_private/3`.
  """
  @impl true
  @spec put_private(t(), atom(), term()) :: t()
  def put_private(%Mint.HTTP2{private: private} = conn, key, value) when is_atom(key) do
    %{conn | private: Map.put(private, key, value)}
  end

  @doc """
  See `Mint.HTTP.get_private/3`.
  """
  @impl true
  @spec get_private(t(), atom(), term()) :: term()
  def get_private(%Mint.HTTP2{private: private} = _conn, key, default \\ nil) when is_atom(key) do
    Map.get(private, key, default)
  end

  @doc """
  See `Mint.HTTP.delete_private/2`.
  """
  @impl true
  @spec delete_private(t(), atom()) :: t()
  def delete_private(%Mint.HTTP2{private: private} = conn, key) when is_atom(key) do
    %{conn | private: Map.delete(private, key)}
  end

  # http://httpwg.org/specs/rfc7540.html#rfc.section.6.5
  # SETTINGS parameters are not negotiated. We keep client settings and server settings separate.
  @doc false
  @impl true
  @spec initiate(
          Types.scheme(),
          Types.socket(),
          String.t(),
          :inet.port_number(),
          keyword()
        ) :: {:ok, t()} | {:error, Types.error()}
  def initiate(scheme, socket, hostname, port, opts) do
    transport = scheme_to_transport(scheme)
    mode = Keyword.get(opts, :mode, :active)
    client_settings_params = Keyword.get(opts, :client_settings, [])
    validate_settings!(client_settings_params)

    unless mode in [:active, :passive] do
      raise ArgumentError,
            "the :mode option must be either :active or :passive, got: #{inspect(mode)}"
    end

    conn = %Mint.HTTP2{
      hostname: hostname,
      port: port,
      transport: scheme_to_transport(scheme),
      socket: socket,
      mode: mode,
      scheme: Atom.to_string(scheme),
      state: :open
    }

    with :ok <- inet_opts(transport, socket),
         client_settings = settings(stream_id: 0, params: client_settings_params),
         preface = [@connection_preface, Frame.encode(client_settings)],
         :ok <- transport.send(socket, preface),
         conn = update_in(conn.client_settings_queue, &:queue.in(client_settings_params, &1)),
         {:ok, server_settings, buffer, socket} <- receive_server_settings(transport, socket),
         server_settings_ack =
           settings(stream_id: 0, params: [], flags: set_flags(:settings, [:ack])),
         :ok <- transport.send(socket, Frame.encode(server_settings_ack)),
         conn = put_in(conn.buffer, buffer),
         conn = put_in(conn.socket, socket),
         conn = apply_server_settings(conn, settings(server_settings, :params)),
         :ok <- if(mode == :active, do: transport.setopts(socket, active: :once), else: :ok) do
      {:ok, conn}
    else
      error ->
        transport.close(socket)
        error
    end
  catch
    {:mint, conn, error} ->
      {:ok, _conn} = close(conn)
      {:error, error}
  end

  @doc """
  See `Mint.HTTP.get_socket/1`.
  """
  @impl true
  @spec get_socket(t()) :: Mint.Types.socket()
  def get_socket(%Mint.HTTP2{socket: socket} = _conn) do
    socket
  end

  @doc """
  See `Mint.HTTP.get_proxy_headers/1`.
  """
  if Version.compare(System.version(), "1.7.0") in [:eq, :gt] do
    @doc since: "1.4.0"
  end

  @impl true
  @spec get_proxy_headers(t()) :: Mint.Types.headers()
  def get_proxy_headers(%__MODULE__{proxy_headers: proxy_headers}), do: proxy_headers

  ## Helpers

  defp handle_closed(conn) do
    conn = put_in(conn.state, :closed)

    if conn.open_client_stream_count > 0 or conn.open_server_stream_count > 0 do
      error = conn.transport.wrap_error(:closed)
      {:error, conn, error, _responses = []}
    else
      {:ok, conn, _responses = []}
    end
  end

  defp negotiate(address, port, :http, transport_opts) do
    # We don't support protocol negotiation for TCP connections,
    # so we currently just assume the HTTP/2 protocol.
    transport = scheme_to_transport(:http)
    transport.connect(address, port, transport_opts)
  end

  defp negotiate(address, port, :https, transport_opts) do
    transport = scheme_to_transport(:https)

    with {:ok, socket} <- transport.connect(address, port, transport_opts),
         {:ok, protocol} <- transport.negotiated_protocol(socket) do
      if protocol == "h2" do
        {:ok, socket}
      else
        {:error, transport.wrap_error({:bad_alpn_protocol, protocol})}
      end
    end
  end

  defp receive_server_settings(transport, socket) do
    case recv_next_frame(transport, socket, _buffer = "") do
      {:ok, settings(), _buffer, _socket} = result ->
        result

      {:ok, goaway(error_code: error_code, debug_data: debug_data), _buffer, _socket} ->
        error = wrap_error({:server_closed_connection, error_code, debug_data})
        {:error, error}

      {:ok, frame, _buffer, _socket} ->
        debug_data = "received invalid frame #{elem(frame, 0)} during handshake"
        {:error, wrap_error({:protocol_error, debug_data})}

      {:error, error} ->
        {:error, error}
    end
  end

  defp recv_next_frame(transport, socket, buffer) do
    case Frame.decode_next(buffer, @default_max_frame_size) do
      {:ok, frame, rest} ->
        {:ok, frame, rest, socket}

      :more ->
        with {:ok, data} <- transport.recv(socket, 0, _timeout = 10_000) do
          data = maybe_concat(buffer, data)
          recv_next_frame(transport, socket, data)
        end

      {:error, {kind, _info} = reason} when kind in [:frame_size_error, :protocol_error] ->
        {:error, wrap_error(reason)}
    end
  end

  defp open_stream(conn) do
    max_concurrent_streams = conn.server_settings.max_concurrent_streams

    if conn.open_client_stream_count >= max_concurrent_streams do
      throw({:mint, conn, wrap_error(:too_many_concurrent_requests)})
    end

    stream = %{
      id: conn.next_stream_id,
      ref: make_ref(),
      state: :idle,
      window_size: conn.server_settings.initial_window_size,
      received_first_headers?: false
    }

    conn = put_in(conn.streams[stream.id], stream)
    conn = put_in(conn.ref_to_stream_id[stream.ref], stream.id)
    conn = update_in(conn.next_stream_id, &(&1 + 2))
    {conn, stream.id, stream.ref}
  end

  defp encode_stream_body_request_payload(conn, stream_id, :eof) do
    encode_data(conn, stream_id, "", [:end_stream])
  end

  defp encode_stream_body_request_payload(conn, stream_id, {:eof, trailing_headers}) do
    lowered_headers = downcase_header_names(trailing_headers)

    if unallowed_trailing_header = Util.find_unallowed_trailing_header(lowered_headers) do
      error = wrap_error({:unallowed_trailing_header, unallowed_trailing_header})
      throw({:mint, conn, error})
    end

    encode_headers(conn, stream_id, trailing_headers, [:end_headers, :end_stream])
  end

  defp encode_stream_body_request_payload(conn, stream_id, iodata) do
    encode_data(conn, stream_id, iodata, [])
  end

  defp encode_request_payload(conn, stream_id, headers, :stream) do
    encode_headers(conn, stream_id, headers, [:end_headers])
  end

  defp encode_request_payload(conn, stream_id, headers, nil) do
    encode_headers(conn, stream_id, headers, [:end_stream, :end_headers])
  end

  defp encode_request_payload(conn, stream_id, headers, iodata) do
    {conn, headers_payload} = encode_headers(conn, stream_id, headers, [:end_headers])
    {conn, data_payload} = encode_data(conn, stream_id, iodata, [:end_stream])
    {conn, [headers_payload, data_payload]}
  end

  defp encode_headers(conn, stream_id, headers, enabled_flags) do
    assert_headers_smaller_than_max_header_list_size(conn, headers)

    headers = Enum.map(headers, fn {name, value} -> {:store_name, name, value} end)
    {hbf, conn} = get_and_update_in(conn.encode_table, &HPAX.encode(headers, &1))

    payload = headers_to_encoded_frames(conn, stream_id, hbf, enabled_flags)

    stream_state = if :end_stream in enabled_flags, do: :half_closed_local, else: :open

    conn = put_in(conn.streams[stream_id].state, stream_state)
    conn = update_in(conn.open_client_stream_count, &(&1 + 1))

    {conn, payload}
  end

  defp assert_headers_smaller_than_max_header_list_size(
         %{server_settings: %{max_header_list_size: :infinity}},
         _headers
       ) do
    :ok
  end

  defp assert_headers_smaller_than_max_header_list_size(conn, headers) do
    # The value is based on the uncompressed size of header fields, including the length
    # of the name and value in octets plus an overhead of 32 octets for each header field.
    total_size =
      Enum.reduce(headers, 0, fn {name, value}, acc ->
        acc + byte_size(name) + byte_size(value) + 32
      end)

    max_header_list_size = conn.server_settings.max_header_list_size

    if total_size <= max_header_list_size do
      :ok
    else
      error = wrap_error({:max_header_list_size_exceeded, total_size, max_header_list_size})
      throw({:mint, conn, error})
    end
  end

  defp headers_to_encoded_frames(conn, stream_id, hbf, enabled_flags) do
    if IO.iodata_length(hbf) > conn.server_settings.max_frame_size do
      hbf
      |> IO.iodata_to_binary()
      |> split_payload_in_chunks(conn.server_settings.max_frame_size)
      |> split_hbf_to_encoded_frames(stream_id, enabled_flags)
    else
      Frame.encode(
        headers(stream_id: stream_id, hbf: hbf, flags: set_flags(:headers, enabled_flags))
      )
    end
  end

  defp split_hbf_to_encoded_frames({[first_chunk | chunks], last_chunk}, stream_id, enabled_flags) do
    flags = set_flags(:headers, enabled_flags -- [:end_headers])
    first_frame = Frame.encode(headers(stream_id: stream_id, hbf: first_chunk, flags: flags))

    middle_frames =
      Enum.map(chunks, fn chunk ->
        Frame.encode(continuation(stream_id: stream_id, hbf: chunk))
      end)

    flags =
      if :end_headers in enabled_flags do
        set_flags(:continuation, [:end_headers])
      else
        set_flags(:continuation, [])
      end

    last_frame = Frame.encode(continuation(stream_id: stream_id, hbf: last_chunk, flags: flags))

    [first_frame, middle_frames, last_frame]
  end

  defp encode_data(conn, stream_id, data, enabled_flags) do
    stream = fetch_stream!(conn, stream_id)

    if stream.state != :open do
      error = wrap_error(:request_is_not_streaming)
      throw({:mint, conn, error})
    end

    data_size = IO.iodata_length(data)

    cond do
      data_size > stream.window_size ->
        throw({:mint, conn, wrap_error({:exceeds_window_size, :request, stream.window_size})})

      data_size > conn.window_size ->
        throw({:mint, conn, wrap_error({:exceeds_window_size, :connection, conn.window_size})})

      # If the data size is greater than the max frame size, we chunk automatically based
      # on the max frame size.
      data_size > conn.server_settings.max_frame_size ->
        {chunks, last_chunk} =
          data
          |> IO.iodata_to_binary()
          |> split_payload_in_chunks(conn.server_settings.max_frame_size)

        {encoded_chunks, conn} =
          Enum.map_reduce(chunks, conn, fn chunk, acc ->
            {acc, encoded} = encode_data_chunk(acc, stream_id, chunk, [])
            {encoded, acc}
          end)

        {conn, encoded_last_chunk} = encode_data_chunk(conn, stream_id, last_chunk, enabled_flags)
        {conn, [encoded_chunks, encoded_last_chunk]}

      true ->
        encode_data_chunk(conn, stream_id, data, enabled_flags)
    end
  end

  defp encode_data_chunk(%__MODULE__{} = conn, stream_id, chunk, enabled_flags)
       when is_integer(stream_id) and is_list(enabled_flags) do
    chunk_size = IO.iodata_length(chunk)
    frame = data(stream_id: stream_id, flags: set_flags(:data, enabled_flags), data: chunk)
    conn = update_in(conn.streams[stream_id].window_size, &(&1 - chunk_size))
    conn = update_in(conn.window_size, &(&1 - chunk_size))

    conn =
      if :end_stream in enabled_flags do
        put_in(conn.streams[stream_id].state, :half_closed_local)
      else
        conn
      end

    {conn, Frame.encode(frame)}
  end

  defp split_payload_in_chunks(binary, chunk_size),
    do: split_payload_in_chunks(binary, chunk_size, [])

  defp split_payload_in_chunks(chunk, chunk_size, acc) when byte_size(chunk) <= chunk_size do
    {Enum.reverse(acc), chunk}
  end

  defp split_payload_in_chunks(binary, chunk_size, acc) do
    <<chunk::size(chunk_size)-binary, rest::binary>> = binary
    split_payload_in_chunks(rest, chunk_size, [chunk | acc])
  end

  defp send_ping(conn, payload) do
    frame = Frame.ping(stream_id: 0, opaque_data: payload)
    conn = send!(conn, Frame.encode(frame))
    ref = make_ref()
    conn = update_in(conn.ping_queue, &:queue.in({ref, payload}, &1))
    {conn, ref}
  end

  defp send_settings(conn, settings) do
    validate_settings!(settings)
    frame = settings(stream_id: 0, params: settings)
    conn = send!(conn, Frame.encode(frame))
    conn = update_in(conn.client_settings_queue, &:queue.in(settings, &1))
    conn
  end

  defp validate_settings!(settings) do
    unless Keyword.keyword?(settings) do
      raise ArgumentError, "settings must be a keyword list"
    end

    Enum.each(settings, fn
      {:header_table_size, value} ->
        unless is_integer(value) do
          raise ArgumentError, ":header_table_size must be an integer, got: #{inspect(value)}"
        end

      {:enable_push, value} ->
        unless is_boolean(value) do
          raise ArgumentError, ":enable_push must be a boolean, got: #{inspect(value)}"
        end

      {:max_concurrent_streams, value} ->
        unless is_integer(value) do
          raise ArgumentError,
                ":max_concurrent_streams must be an integer, got: #{inspect(value)}"
        end

      {:initial_window_size, value} ->
        unless is_integer(value) and value <= @max_window_size do
          raise ArgumentError,
                ":initial_window_size must be an integer < #{@max_window_size}, " <>
                  "got: #{inspect(value)}"
        end

      {:max_frame_size, value} ->
        unless is_integer(value) and value in @valid_max_frame_size_range do
          raise ArgumentError,
                ":max_frame_size must be an integer in #{inspect(@valid_max_frame_size_range)}, " <>
                  "got: #{inspect(value)}"
        end

      {:max_header_list_size, value} ->
        unless is_integer(value) do
          raise ArgumentError, ":max_header_list_size must be an integer, got: #{inspect(value)}"
        end

      {:enable_connect_protocol, value} ->
        unless is_boolean(value) do
          raise ArgumentError,
                ":enable_connect_protocol must be a boolean, got: #{inspect(value)}"
        end

      {name, _value} ->
        raise ArgumentError, "unknown setting parameter #{inspect(name)}"
    end)
  end

  defp downcase_header_names(headers) do
    for {name, value} <- headers, do: {Util.downcase_ascii(name), value}
  end

  defp add_default_headers(headers, body) do
    headers
    |> Util.put_new_header("user-agent", @user_agent)
    |> add_default_content_length_header(body)
  end

  defp add_default_content_length_header(headers, body) when body in [nil, :stream] do
    headers
  end

  defp add_default_content_length_header(headers, body) do
    Util.put_new_header_lazy(headers, "content-length", fn ->
      body |> IO.iodata_length() |> Integer.to_string()
    end)
  end

  defp add_pseudo_headers(headers, conn, method, path) do
    if String.upcase(method) == "CONNECT" do
      [
        {":method", method},
        {":authority", authority_pseudo_header(conn.scheme, conn.port, conn.hostname)}
        | headers
      ]
    else
      [
        {":method", method},
        {":path", path},
        {":scheme", conn.scheme},
        {":authority", authority_pseudo_header(conn.scheme, conn.port, conn.hostname)}
        | headers
      ]
    end
  end

  defp sort_pseudo_headers_to_front(headers) do
    Enum.sort_by(headers, fn {key, _value} ->
      not String.starts_with?(key, ":")
    end)
  end

  ## Frame handling

  defp maybe_concat_and_handle_new_data(conn, data) do
    data = maybe_concat(conn.buffer, data)
    {conn, responses} = handle_new_data(conn, data, [])
    {:ok, conn, Enum.reverse(responses)}
  end

  defp handle_new_data(%Mint.HTTP2{} = conn, data, responses) do
    case Frame.decode_next(data, conn.client_settings.max_frame_size) do
      {:ok, frame, rest} ->
        assert_valid_frame(conn, frame)
        {conn, responses} = handle_frame(conn, frame, responses)
        handle_new_data(conn, rest, responses)

      :more ->
        conn = put_in(conn.buffer, data)
        handle_consumed_all_frames(conn, responses)

      {:error, :payload_too_big} ->
        # TODO: sometimes, this could be handled with RST_STREAM instead of a GOAWAY frame (for
        # example, if the payload of a DATA frame is too big).
        # http://httpwg.org/specs/rfc7540.html#rfc.section.4.2
        debug_data = "frame payload exceeds connection's max frame size"
        send_connection_error!(conn, :frame_size_error, debug_data)

      {:error, {:frame_size_error, frame}} ->
        debug_data = "error with size of frame: #{inspect(frame)}"
        send_connection_error!(conn, :frame_size_error, debug_data)

      {:error, {:protocol_error, info}} ->
        debug_data = "error when decoding frame: #{inspect(info)}"
        send_connection_error!(conn, :protocol_error, debug_data)
    end
  catch
    :throw, {:mint, conn, error} -> throw({:mint, conn, error, responses})
    :throw, {:mint, _conn, _error, _responses} = thrown -> throw(thrown)
  end

  defp handle_consumed_all_frames(%{state: state} = conn, responses) do
    case state do
      # TODO: should we do something with the debug data here, like logging it?
      {:goaway, :no_error, _debug_data} ->
        {conn, responses}

      {:goaway, error_code, debug_data} ->
        error = wrap_error({:server_closed_connection, error_code, debug_data})
        throw({:mint, conn, error, responses})

      _ ->
        {conn, responses}
    end
  end

  defp assert_valid_frame(_conn, unknown()) do
    # Unknown frames MUST be ignored:
    # https://datatracker.ietf.org/doc/html/rfc7540#section-4.1
    :ok
  end

  defp assert_valid_frame(conn, frame) do
    stream_id = elem(frame, 1)

    assert_frame_on_right_level(conn, elem(frame, 0), stream_id)
    assert_stream_id_is_allowed(conn, stream_id)
    assert_frame_doesnt_interrupt_header_streaming(conn, frame)
  end

  # http://httpwg.org/specs/rfc7540.html#HttpSequence
  defp assert_frame_doesnt_interrupt_header_streaming(conn, frame) do
    case {conn.headers_being_processed, frame} do
      {nil, continuation()} ->
        debug_data = "CONTINUATION received outside of headers streaming"
        send_connection_error!(conn, :protocol_error, debug_data)

      {nil, _frame} ->
        :ok

      {{stream_id, _, _}, continuation(stream_id: stream_id)} ->
        :ok

      _other ->
        debug_data =
          "headers are streaming but got a #{inspect(elem(frame, 0))} frame instead " <>
            "of a CONTINUATION frame"

        send_connection_error!(conn, :protocol_error, debug_data)
    end
  end

  stream_level_frames = [:data, :headers, :priority, :rst_stream, :push_promise, :continuation]
  connection_level_frames = [:settings, :ping, :goaway]

  defp assert_frame_on_right_level(conn, frame, _stream_id = 0)
       when frame in unquote(stream_level_frames) do
    debug_data = "frame #{inspect(frame)} not allowed at the connection level (stream_id = 0)"
    send_connection_error!(conn, :protocol_error, debug_data)
  end

  defp assert_frame_on_right_level(conn, frame, stream_id)
       when frame in unquote(connection_level_frames) and stream_id != 0 do
    debug_data = "frame #{inspect(frame)} only allowed at the connection level"
    send_connection_error!(conn, :protocol_error, debug_data)
  end

  defp assert_frame_on_right_level(_conn, _frame, _stream_id) do
    :ok
  end

  defp assert_stream_id_is_allowed(conn, stream_id) do
    if Integer.is_odd(stream_id) and stream_id >= conn.next_stream_id do
      debug_data = "frame with stream ID #{inspect(stream_id)} has not been opened yet"
      send_connection_error!(conn, :protocol_error, debug_data)
    else
      :ok
    end
  end

  for frame_name <- stream_level_frames ++ connection_level_frames ++ [:window_update, :unknown] do
    function_name = :"handle_#{frame_name}"

    defp handle_frame(conn, Frame.unquote(frame_name)() = frame, responses) do
      unquote(function_name)(conn, frame, responses)
    end
  end

  defp handle_unknown(conn, _frame, responses) do
    # Implementations MUST ignore and discard any frame that has a type that is unknown.
    # see: https://datatracker.ietf.org/doc/html/rfc7540#section-4.1

    {conn, responses}
  end

  # DATA

  defp handle_data(conn, frame, responses) do
    data(stream_id: stream_id, flags: flags, data: data, padding: padding) = frame

    # Regardless of whether we have the stream or not, we need to abide by flow
    # control rules so we still refill the client window for the stream_id we got.
    window_size_increment = byte_size(data) + byte_size(padding || "")

    conn =
      if window_size_increment > 0 do
        refill_client_windows(conn, stream_id, window_size_increment)
      else
        conn
      end

    case Map.fetch(conn.streams, stream_id) do
      {:ok, stream} ->
        assert_stream_in_state(conn, stream, [:open, :half_closed_local])
        responses = [{:data, stream.ref, data} | responses]

        if flag_set?(flags, :data, :end_stream) do
          conn = close_stream!(conn, stream.id, :no_error)
          {conn, [{:done, stream.ref} | responses]}
        else
          {conn, responses}
        end

      :error ->
        _ = Logger.debug(fn -> "Received DATA frame on closed stream ID #{stream_id}" end)
        {conn, responses}
    end
  end

  defp refill_client_windows(conn, stream_id, data_size) do
    connection_frame = window_update(stream_id: 0, window_size_increment: data_size)
    stream_frame = window_update(stream_id: stream_id, window_size_increment: data_size)

    if open?(conn) do
      send!(conn, [Frame.encode(connection_frame), Frame.encode(stream_frame)])
    else
      conn
    end
  end

  # HEADERS

  defp handle_headers(conn, frame, responses) do
    headers(stream_id: stream_id, flags: flags, hbf: hbf) = frame

    stream = Map.get(conn.streams, stream_id)
    end_stream? = flag_set?(flags, :headers, :end_stream)

    if stream do
      assert_stream_in_state(conn, stream, [:open, :half_closed_local, :reserved_remote])
    end

    if flag_set?(flags, :headers, :end_headers) do
      decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?)
    else
      callback = &decode_hbf_and_add_responses(&1, &2, &3, &4, end_stream?)
      conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
      {conn, responses}
    end
  end

  # Here, "stream" can be nil in case the stream was closed. In that case, we
  # still need to process the hbf so that the HPACK table is updated, but then
  # we don't add any responses.
  defp decode_hbf_and_add_responses(conn, responses, hbf, stream, end_stream?) do
    {conn, headers} = decode_hbf(conn, hbf)

    if stream do
      handle_decoded_headers_for_stream(conn, responses, stream, headers, end_stream?)
    else
      _ = Logger.debug(fn -> "Received HEADERS frame on closed stream ID" end)
      {conn, responses}
    end
  end

  defp handle_decoded_headers_for_stream(conn, responses, stream, headers, end_stream?) do
    %{ref: ref, received_first_headers?: received_first_headers?} = stream

    case headers do
      # Interim response (1xx), which is made of only one HEADERS plus zero or more CONTINUATIONs.
      # There can be zero or more interim responses before a "proper" response.
      # https://httpwg.org/specs/rfc9113.html#HttpFraming
      [{":status", <<?1, _, _>> = status} | headers] ->
        cond do
          received_first_headers? ->
            conn = close_stream!(conn, stream.id, :protocol_error)

            debug_data =
              "informational response (1xx) must appear before final response, got a #{status} status"

            error = wrap_error({:protocol_error, debug_data})
            responses = [{:error, stream.ref, error} | responses]
            {conn, responses}

          end_stream? ->
            conn = close_stream!(conn, stream.id, :protocol_error)
            debug_data = "informational response (1xx) must not have the END_STREAM flag set"
            error = wrap_error({:protocol_error, debug_data})
            responses = [{:error, stream.ref, error} | responses]
            {conn, responses}

          true ->
            assert_stream_in_state(conn, stream, [:open, :half_closed_local])
            status = String.to_integer(status)
            headers = join_cookie_headers(headers)
            new_responses = [{:headers, ref, headers}, {:status, ref, status} | responses]
            {conn, new_responses}
        end

      [{":status", status} | headers] when not received_first_headers? ->
        conn = put_in(conn.streams[stream.id].received_first_headers?, true)
        status = String.to_integer(status)
        headers = join_cookie_headers(headers)
        new_responses = [{:headers, ref, headers}, {:status, ref, status} | responses]

        cond do
          # :reserved_remote means that this was a promised stream. As soon as headers come,
          # the stream goes into the :half_closed_local state (unless that's not allowed
          # because of the client's max concurrent streams limit, or END_STREAM is set).
          stream.state == :reserved_remote ->
            cond do
              conn.open_server_stream_count >= conn.client_settings.max_concurrent_streams ->
                conn = close_stream!(conn, stream.id, :refused_stream)
                {conn, responses}

              end_stream? ->
                conn = close_stream!(conn, stream.id, :no_error)
                {conn, [{:done, ref} | new_responses]}

              true ->
                conn = update_in(conn.open_server_stream_count, &(&1 + 1))
                conn = put_in(conn.streams[stream.id].state, :half_closed_local)
                {conn, new_responses}
            end

          end_stream? ->
            conn = close_stream!(conn, stream.id, :no_error)
            {conn, [{:done, ref} | new_responses]}

          true ->
            {conn, new_responses}
        end

      # Trailing headers. We don't care about the :status header here.
      headers when received_first_headers? ->
        if end_stream? do
          conn = close_stream!(conn, stream.id, :no_error)
          headers = headers |> Util.remove_unallowed_trailing_headers() |> join_cookie_headers()
          {conn, [{:done, ref}, {:headers, ref, headers} | responses]}
        else
          # Trailing headers must set the END_STREAM flag because they're
          # the last thing allowed on the stream (other than RST_STREAM and
          # the usual frames).
          conn = close_stream!(conn, stream.id, :protocol_error)
          debug_data = "trailing headers didn't set the END_STREAM flag"
          error = wrap_error({:protocol_error, debug_data})
          responses = [{:error, stream.ref, error} | responses]
          {conn, responses}
        end

      # Non-trailing headers need to have a :status header, otherwise
      # it's a protocol error.
      _headers ->
        conn = close_stream!(conn, stream.id, :protocol_error)
        error = wrap_error(:missing_status_header)
        responses = [{:error, stream.ref, error} | responses]
        {conn, responses}
    end
  end

  defp decode_hbf(conn, hbf) do
    case HPAX.decode(hbf, conn.decode_table) do
      {:ok, headers, decode_table} ->
        conn = put_in(conn.decode_table, decode_table)
        {conn, headers}

      {:error, reason} ->
        debug_data = "unable to decode headers: #{inspect(reason)}"
        send_connection_error!(conn, :compression_error, debug_data)
    end
  end

  # If the port is the default for the scheme, don't add it to the :authority pseudo-header
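  # For example, with the default HTTPS port 443 the authority is just "example.com",
  # while a non-default port such as 8080 yields "example.com:8080".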
  defp authority_pseudo_header(scheme, port, hostname) do
    if URI.default_port(scheme) == port do
      hostname
    else
      "#{hostname}:#{port}"
    end
  end

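  # Per RFC 7540 (section 8.1.2.5), multiple cookie headers can be collapsed into one by
  # joining their values with "; ". For example, [{"cookie", "a=1"}, {"cookie", "b=2"}]
  # becomes [{"cookie", "a=1; b=2"}], prepended to the remaining non-cookie headers.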
  defp join_cookie_headers(headers) do
    # If we have 0 or 1 Cookie headers, we just use the old list of headers.
    case Enum.split_with(headers, fn {name, _value} -> Util.downcase_ascii(name) == "cookie" end) do
      {[], _headers} ->
        headers

      {[_], _headers} ->
        headers

      {cookies, headers} ->
        cookie = Enum.map_join(cookies, "; ", fn {_name, value} -> value end)
        [{"cookie", cookie} | headers]
    end
  end

  # PRIORITY

  # For now we ignore all PRIORITY frames. This shouldn't cause practical trouble.
  defp handle_priority(conn, frame, responses) do
    _ = Logger.warn(fn -> "Ignoring PRIORITY frame: #{inspect(frame)}" end)
    {conn, responses}
  end

  # RST_STREAM

  defp handle_rst_stream(conn, frame, responses) do
    rst_stream(stream_id: stream_id, error_code: error_code) = frame

    # If we receive RST_STREAM on a closed stream, we ignore it.
    case Map.fetch(conn.streams, stream_id) do
      {:ok, stream} ->
        # If we receive RST_STREAM then the stream is definitely closed.
        # We won't send anything else on the stream so we can simply delete
        # it, so that if we get things like DATA on that stream we error out.
        conn = delete_stream(conn, stream)

        if error_code == :no_error do
          {conn, [{:done, stream.ref} | responses]}
        else
          error = wrap_error({:server_closed_request, error_code})
          {conn, [{:error, stream.ref, error} | responses]}
        end

      :error ->
        {conn, responses}
    end
  end

  # SETTINGS

  defp handle_settings(conn, frame, responses) do
    settings(flags: flags, params: params) = frame

    if flag_set?(flags, :settings, :ack) do
      {{:value, params}, conn} = get_and_update_in(conn.client_settings_queue, &:queue.out/1)
      conn = apply_client_settings(conn, params)
      {conn, responses}
    else
      conn = apply_server_settings(conn, params)
      frame = settings(flags: set_flags(:settings, [:ack]), params: [])
      conn = send!(conn, Frame.encode(frame))
      {conn, responses}
    end
  end

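  # Server SETTINGS are applied one parameter at a time. Most parameters are simply stored
  # in conn.server_settings, but :header_table_size also resizes the HPACK encode table
  # and :initial_window_size adjusts the window of the currently open streams.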
  defp apply_server_settings(conn, server_settings) do
    Enum.reduce(server_settings, conn, fn
      {:header_table_size, header_table_size}, conn ->
        update_in(conn.encode_table, &HPAX.resize(&1, header_table_size))

      {:enable_push, enable_push?}, conn ->
        put_in(conn.server_settings.enable_push, enable_push?)

      {:max_concurrent_streams, max_concurrent_streams}, conn ->
        put_in(conn.server_settings.max_concurrent_streams, max_concurrent_streams)

      {:initial_window_size, initial_window_size}, conn ->
        if initial_window_size > @max_window_size do
          debug_data = "INITIAL_WINDOW_SIZE setting of #{initial_window_size} is too big"
          send_connection_error!(conn, :flow_control_error, debug_data)
        end

        update_server_initial_window_size(conn, initial_window_size)

      {:max_frame_size, max_frame_size}, conn ->
        if max_frame_size not in @valid_max_frame_size_range do
          debug_data = "MAX_FRAME_SIZE setting parameter outside of allowed range"
          send_connection_error!(conn, :protocol_error, debug_data)
        end

        put_in(conn.server_settings.max_frame_size, max_frame_size)

      {:max_header_list_size, max_header_list_size}, conn ->
        put_in(conn.server_settings.max_header_list_size, max_header_list_size)

      {:enable_connect_protocol, enable_connect_protocol?}, conn ->
        put_in(conn.server_settings.enable_connect_protocol, enable_connect_protocol?)
    end)
  end

  defp apply_client_settings(conn, client_settings) do
    Enum.reduce(client_settings, conn, fn
      {:max_frame_size, value}, conn ->
        put_in(conn.client_settings.max_frame_size, value)

      {:max_concurrent_streams, value}, conn ->
        put_in(conn.client_settings.max_concurrent_streams, value)

      {:enable_push, value}, conn ->
        put_in(conn.client_settings.enable_push, value)
    end)
  end

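  # Per RFC 7540 (section 6.9.2), a change to SETTINGS_INITIAL_WINDOW_SIZE adjusts the
  # flow-control window of existing streams by the delta between the new and the old value.
  # For example, going from 65_535 to 70_000 adds 4_465 to each affected stream's window.
  # The comprehension below collects into the existing streams map, so the streams it
  # skips keep their current window size.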
  defp update_server_initial_window_size(conn, new_iws) do
    diff = new_iws - conn.server_settings.initial_window_size

    conn =
      update_in(conn.streams, fn streams ->
        for {stream_id, stream} <- streams,
            stream.state in [:open, :half_closed_remote],
            into: streams do
          window_size = stream.window_size + diff

          if window_size > @max_window_size do
            debug_data =
              "INITIAL_WINDOW_SIZE setting of #{new_iws} makes some stream window sizes too big"

            send_connection_error!(conn, :flow_control_error, debug_data)
          end

          {stream_id, %{stream | window_size: window_size}}
        end
      end)

    put_in(conn.server_settings.initial_window_size, new_iws)
  end

  # PUSH_PROMISE

  defp handle_push_promise(
         %Mint.HTTP2{client_settings: %{enable_push: false}} = conn,
         push_promise(),
         _responses
       ) do
    debug_data = "received PUSH_PROMISE frame when SETTINGS_ENABLE_PUSH was false"
    send_connection_error!(conn, :protocol_error, debug_data)
  end

  defp handle_push_promise(conn, push_promise() = frame, responses) do
    push_promise(
      stream_id: stream_id,
      flags: flags,
      promised_stream_id: promised_stream_id,
      hbf: hbf
    ) = frame

    assert_valid_promised_stream_id(conn, promised_stream_id)

    stream = fetch_stream!(conn, stream_id)
    assert_stream_in_state(conn, stream, [:open, :half_closed_local])

    if flag_set?(flags, :push_promise, :end_headers) do
      decode_push_promise_headers_and_add_response(
        conn,
        responses,
        hbf,
        stream,
        promised_stream_id
      )
    else
      callback = &decode_push_promise_headers_and_add_response(&1, &2, &3, &4, promised_stream_id)
      conn = put_in(conn.headers_being_processed, {stream_id, hbf, callback})
      {conn, responses}
    end
  end

  defp decode_push_promise_headers_and_add_response(
         conn,
         responses,
         hbf,
         stream,
         promised_stream_id
       ) do
    {conn, headers} = decode_hbf(conn, hbf)

    promised_stream = %{
      id: promised_stream_id,
      ref: make_ref(),
      state: :reserved_remote,
      window_size: conn.server_settings.initial_window_size,
      received_first_headers?: false
    }

    conn = put_in(conn.streams[promised_stream.id], promised_stream)
    new_response = {:push_promise, stream.ref, promised_stream.ref, headers}
    {conn, [new_response | responses]}
  end

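  # Server-initiated (promised) streams must use even stream IDs, and the server can't
  # promise a stream ID that is already in use; both cases are connection-level errors.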
  defp assert_valid_promised_stream_id(conn, promised_stream_id) do
    cond do
      not is_integer(promised_stream_id) or Integer.is_odd(promised_stream_id) ->
        debug_data = "invalid promised stream ID: #{inspect(promised_stream_id)}"
        send_connection_error!(conn, :protocol_error, debug_data)

      Map.has_key?(conn.streams, promised_stream_id) ->
        debug_data =
          "stream with ID #{inspect(promised_stream_id)} already exists and can't be " <>
            "reserved by the server"

        send_connection_error!(conn, :protocol_error, debug_data)

      true ->
        :ok
    end
  end

  # PING

  defp handle_ping(conn, Frame.ping() = frame, responses) do
    Frame.ping(flags: flags, opaque_data: opaque_data) = frame

    if flag_set?(flags, :ping, :ack) do
      handle_ping_ack(conn, opaque_data, responses)
    else
      ack = Frame.ping(stream_id: 0, flags: set_flags(:ping, [:ack]), opaque_data: opaque_data)
      conn = send!(conn, Frame.encode(ack))
      {conn, responses}
    end
  end

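  # A PING ack carries the same opaque data as the PING it acknowledges. We expect acks in
  # the same order as the PINGs we sent, so we only match against the head of
  # conn.ping_queue and ignore (with a warning) any ack that doesn't line up.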
  defp handle_ping_ack(conn, opaque_data, responses) do
    case :queue.peek(conn.ping_queue) do
      {:value, {ref, ^opaque_data}} ->
        conn = update_in(conn.ping_queue, &:queue.drop/1)
        {conn, [{:pong, ref} | responses]}

      {:value, _} ->
        _ = Logger.warn("Received PING ack that doesn't match next PING request in the queue")
        {conn, responses}

      :empty ->
        _ = Logger.warn("Received PING ack but no PING requests are pending")
        {conn, responses}
    end
  end

  # GOAWAY

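  # In a GOAWAY frame, last_stream_id is the highest stream ID that the server might still
  # process. Requests on streams with a higher ID were not processed and are safe to retry,
  # so we fail them with an :unprocessed error and drop them from the connection.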
  defp handle_goaway(conn, frame, responses) do
    goaway(
      last_stream_id: last_stream_id,
      error_code: error_code,
      debug_data: debug_data
    ) = frame

    # We gather all the unprocessed requests and form {:error, _, _} tuples for each one.
    # At the same time, we delete all the unprocessed requests from the stream set.
    {unprocessed_request_responses, conn} =
      Enum.flat_map_reduce(conn.streams, conn, fn
        {stream_id, _stream}, conn_acc when stream_id <= last_stream_id ->
          {[], conn_acc}

        {_stream_id, stream}, conn_acc ->
          conn_acc = delete_stream(conn_acc, stream)
          {[{:error, stream.ref, wrap_error(:unprocessed)}], conn_acc}
      end)

    conn = put_in(conn.state, {:goaway, error_code, debug_data})
    {conn, unprocessed_request_responses ++ responses}
  end

  # WINDOW_UPDATE

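  # A WINDOW_UPDATE on stream ID 0 targets the connection-level send window; any other
  # stream ID targets that stream's own send window. In both cases the resulting window
  # must not exceed @max_window_size, otherwise it's a flow control error.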
  defp handle_window_update(
         conn,
         window_update(stream_id: 0, window_size_increment: wsi),
         responses
       ) do
    new_window_size = conn.window_size + wsi

    if new_window_size > @max_window_size do
      send_connection_error!(conn, :flow_control_error, "window size too big")
    else
      conn = put_in(conn.window_size, new_window_size)
      {conn, responses}
    end
  end

  defp handle_window_update(
         conn,
         window_update(stream_id: stream_id, window_size_increment: wsi),
         responses
       ) do
    stream = fetch_stream!(conn, stream_id)
    new_window_size = stream.window_size + wsi

    if new_window_size > @max_window_size do
      conn = close_stream!(conn, stream_id, :flow_control_error)
      error = wrap_error({:flow_control_error, "window size too big"})
      {conn, [{:error, stream.ref, error} | responses]}
    else
      conn = put_in(conn.streams[stream_id].window_size, new_window_size)
      {conn, responses}
    end
  end

  # CONTINUATION

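  # CONTINUATION frames carry additional header block fragments for a header block started
  # by a HEADERS or PUSH_PROMISE frame on the same stream. Each fragment is appended to the
  # accumulator in conn.headers_being_processed; the stored callback is only invoked once
  # END_HEADERS is set.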
  defp handle_continuation(conn, frame, responses) do
    continuation(stream_id: stream_id, flags: flags, hbf: hbf_chunk) = frame
    stream = Map.get(conn.streams, stream_id)

    if stream do
      assert_stream_in_state(conn, stream, [:open, :half_closed_local, :reserved_remote])
    end

    {^stream_id, hbf_acc, callback} = conn.headers_being_processed

    if flag_set?(flags, :continuation, :end_headers) do
      hbf = IO.iodata_to_binary([hbf_acc, hbf_chunk])
      conn = put_in(conn.headers_being_processed, nil)
      callback.(conn, responses, hbf, stream)
    else
      conn = put_in(conn.headers_being_processed, {stream_id, [hbf_acc, hbf_chunk], callback})
      {conn, responses}
    end
  end

  ## General helpers

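  # Connection errors are fatal: we notify the server with a GOAWAY frame carrying the
  # error code and debug data, close the socket, mark the connection as :closed, and throw
  # so that the error surfaces from the public function that was called.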
  defp send_connection_error!(conn, error_code, debug_data) do
    frame =
      goaway(stream_id: 0, last_stream_id: 2, error_code: error_code, debug_data: debug_data)

    conn = send!(conn, Frame.encode(frame))
    _ = conn.transport.close(conn.socket)
    conn = put_in(conn.state, :closed)
    throw({:mint, conn, wrap_error({error_code, debug_data})})
  end

  defp close_stream!(conn, stream_id, error_code) do
    stream = Map.fetch!(conn.streams, stream_id)

    # First we send a RST_STREAM frame with the given error code so that the stream
    # transitions to the :closed state, and then we remove it from the connection.
    rst_stream_frame = rst_stream(stream_id: stream_id, error_code: error_code)

    conn =
      if open?(conn) do
        send!(conn, Frame.encode(rst_stream_frame))
      else
        conn
      end

    delete_stream(conn, stream)
  end

  defp delete_stream(conn, stream) do
    conn = update_in(conn.streams, &Map.delete(&1, stream.id))
    conn = update_in(conn.ref_to_stream_id, &Map.delete(&1, stream.ref))

    stream_open? = stream.state in [:open, :half_closed_local, :half_closed_remote]

    conn =
      cond do
        # Stream initiated by the client.
        stream_open? and Integer.is_odd(stream.id) ->
          update_in(conn.open_client_stream_count, &(&1 - 1))

        # Stream initiated by the server.
        stream_open? and Integer.is_even(stream.id) ->
          update_in(conn.open_server_stream_count, &(&1 - 1))

        true ->
          conn
      end

    conn
  end

  defp fetch_stream!(conn, stream_id) do
    case Map.fetch(conn.streams, stream_id) do
      {:ok, stream} -> stream
      :error -> throw({:mint, conn, wrap_error({:stream_not_found, stream_id})})
    end
  end

  defp assert_stream_in_state(conn, %{state: state}, expected_states) do
    if state not in expected_states do
      debug_data =
        "stream was in state #{inspect(state)} and not in one of the expected states: " <>
          Enum.map_join(expected_states, ", ", &inspect/1)

      send_connection_error!(conn, :protocol_error, debug_data)
    end
  end

  defp send!(%Mint.HTTP2{transport: transport, socket: socket} = conn, bytes) do
    case transport.send(socket, bytes) do
      :ok ->
        conn

      {:error, %TransportError{reason: :closed} = error} ->
        throw({:mint, %{conn | state: :closed}, error})

      {:error, reason} ->
        throw({:mint, conn, reason})
    end
  end

  defp wrap_error(reason) do
    %HTTPError{reason: reason, module: __MODULE__}
  end

  @doc false
  def format_error(reason)

  def format_error(:closed) do
    "the connection is closed"
  end

  def format_error(:closed_for_writing) do
    "the connection is closed for writing, which means that you cannot issue any more " <>
      "requests on the connection but you can expect responses to still be delivered for " <>
      "part of the requests that are in flight. If a connection is closed for writing, " <>
      "it usually means that you got a :server_closed_request error already."
  end

  def format_error(:too_many_concurrent_requests) do
    "the number of max concurrent HTTP/2 requests supported by the server has been reached. " <>
      "Use Mint.HTTP2.get_server_setting/2 with the :max_concurrent_streams setting name " <>
      "to find out the maximum number of concurrent requests supported by the server."
  end

  def format_error({:max_header_list_size_exceeded, size, max_size}) do
    "the given header list (of size #{size}) goes over the max header list size of " <>
      "#{max_size} supported by the server. In HTTP/2, the header list size is calculated " <>
      "by summing up the size in bytes of each header name, value, plus 32 for each header."
  end

  def format_error({:exceeds_window_size, what, window_size}) do
    what =
      case what do
        :request -> "request"
        :connection -> "connection"
      end

    "the given data exceeds the #{what} window size, which is #{window_size}. " <>
      "The server will refill the window size of the #{what} when ready. This will be " <>
      "handled transparently by stream/2."
  end

  def format_error({:stream_not_found, stream_id}) do
    "request not found (with stream_id #{inspect(stream_id)})"
  end

  def format_error(:unknown_request_to_stream) do
    "can't stream chunk of data because the request is unknown"
  end

  def format_error(:request_is_not_streaming) do
    "can't send more data on this request since it's not streaming"
  end

  def format_error({:unallowed_trailing_header, {name, value}}) do
    "header #{inspect(name)} (with value #{inspect(value)}) is not allowed as a trailing header"
  end

  def format_error(:missing_status_header) do
    "the :status pseudo-header (which is required in HTTP/2) is missing from the response"
  end

  def format_error({:server_closed_request, error_code}) do
    "server closed request with error code #{inspect(error_code)}"
  end

  def format_error({:server_closed_connection, error, debug_data}) do
    "server closed connection with error code #{inspect(error)} and debug data: " <> debug_data
  end

  def format_error(:unprocessed) do
    "request was not processed by the server, which means that it's safe to retry on a " <>
      "different or new connection"
  end

  def format_error({:frame_size_error, frame}) do
    "frame size error for #{inspect(frame)} frame"
  end

  def format_error({:protocol_error, debug_data}) do
    "protocol error: " <> debug_data
  end

  def format_error({:compression_error, debug_data}) do
    "compression error: " <> debug_data
  end

  def format_error({:flow_control_error, debug_data}) do
    "flow control error: " <> debug_data
  end
end