defmodule Ankh.HTTP2 do
@moduledoc "HTTP/2 protocol implementation"
alias Ankh.{HTTP2, Protocol, Transport}
alias HPack.Table
alias HTTP2.Frame
alias HTTP2.Frame.Settings
alias HTTP2.Stream, as: HTTP2Stream
@opaque t :: %__MODULE__{
buffer: iodata(),
concurrent_streams: non_neg_integer(),
headers_type: :headers | :trailers,
last_stream_id: HTTP2Stream.id(),
last_local_stream_id: HTTP2Stream.id(),
last_remote_stream_id: HTTP2Stream.id(),
recv_hbf_type: HTTP2Stream.hbf_type(),
recv_hpack: Table.t(),
recv_settings: Settings.Payload.settings(),
references: %{HTTP2Stream.id() => reference()},
send_hpack: Table.t(),
send_queue: :queue.queue(Frame.t()),
send_settings: Settings.Payload.settings(),
streams: %{HTTP2Stream.id() => HTTP2Stream.t()},
transport: Transport.t(),
uri: URI.t(),
window_size: integer()
}
defstruct buffer: <<>>,
concurrent_streams: 0,
headers_type: :headers,
last_stream_id: 0,
last_local_stream_id: 0,
last_remote_stream_id: 0,
recv_hbf_type: nil,
recv_hpack: nil,
recv_settings: [],
references: %{},
send_hpack: nil,
send_queue: :queue.new(),
send_settings: [],
streams: %{},
transport: nil,
uri: nil,
window_size: 0
defimpl Protocol do
alias Ankh.{HTTP, Transport}
alias Ankh.HTTP.{Request, Response}
alias Ankh.HTTP2.Frame
alias Ankh.HTTP2.Stream, as: HTTP2Stream
alias Frame.{
Continuation,
Data,
GoAway,
Headers,
Ping,
Priority,
PushPromise,
RstStream,
Settings,
Splittable,
WindowUpdate
}
alias HPack.Table
import Ankh.HTTP2.Stream,
only: [is_local_stream: 2, is_client_stream: 1, is_server_stream: 1]
require Logger
@active_stream_states [:open, :half_closed_local, :half_closed_remote]
@connection_preface "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"
@initial_header_table_size 4_096
@initial_concurrent_streams 128
@initial_frame_size 16_384
@initial_window_size 65_535
@max_window_size 2_147_483_647
@max_frame_size 16_777_215
@default_settings [
header_table_size: @initial_header_table_size,
enable_push: true,
max_concurrent_streams: @initial_concurrent_streams,
initial_window_size: @initial_window_size,
max_frame_size: @initial_frame_size,
max_header_list_size: 128
]
def accept(%HTTP2{} = protocol, uri, transport, socket, options) do
{timeout, options} = Keyword.pop(options, :connect_timeout, 5_000)
with {:ok, protocol} <- new(protocol, options),
{:ok, transport} <- Transport.new(transport, socket),
{:ok, @connection_preface} <- Transport.recv(transport, 24, timeout),
{:ok, transport} <- Transport.accept(transport, options),
{:ok, protocol} <- send_settings(%{protocol | transport: transport, uri: uri}) do
{:ok, protocol}
else
_ ->
{:error, :protocol_error}
end
end
def connect(%HTTP2{} = protocol, uri, transport, options) do
with {:ok, protocol} <- new(protocol, options),
:ok <- Transport.send(transport, @connection_preface),
{:ok, protocol} <-
send_settings(%{
protocol
| last_local_stream_id: -1,
transport: transport,
uri: uri
}) do
{:ok, protocol}
end
end
defp new(%HTTP2{} = protocol, options) do
settings = Keyword.get(options, :settings, [])
recv_settings = Keyword.merge(@default_settings, settings)
header_table_size = Keyword.get(recv_settings, :header_table_size)
with send_hpack <- Table.new(@initial_header_table_size),
recv_hpack <- Table.new(header_table_size) do
{:ok,
%{
protocol
| send_hpack: send_hpack,
send_settings: @default_settings,
recv_hpack: recv_hpack,
recv_settings: recv_settings,
window_size: @initial_window_size
}}
end
end
def error(%HTTP2{} = protocol) do
with {:ok, protocol} <- send_error(protocol, :protocol_error) do
{:ok, protocol}
end
end
def stream(%HTTP2{buffer: buffer, transport: transport} = protocol, msg) do
with {:ok, data} <- Transport.handle_msg(transport, msg),
{:ok, protocol, responses} <- process_buffer(%{protocol | buffer: buffer <> data}) do
{:ok, protocol, Enum.reverse(responses)}
end
end
def request(
%HTTP2{uri: %{authority: authority, scheme: scheme}} = protocol,
%Request{method: method, path: path} = request
) do
request =
request
|> Request.put_header(":method", Atom.to_string(method))
|> Request.put_header(":authority", authority)
|> Request.put_header(":scheme", scheme)
|> Request.put_header(":path", path)
with {:ok, protocol, %{reference: reference} = stream} <- get_stream(protocol, nil),
{:ok, protocol} <- send_headers(protocol, stream, request),
{:ok, protocol} <- send_data(protocol, stream, request),
{:ok, protocol} <- send_trailers(protocol, stream, request) do
{:ok, protocol, reference}
end
end
def respond(%HTTP2{} = protocol, reference, %Response{status: status} = response) do
response = Response.put_header(response, ":status", Integer.to_string(status))
with {:ok, protocol, stream} <- get_stream(protocol, reference),
{:ok, protocol} <- send_headers(protocol, stream, response),
{:ok, protocol} <- send_data(protocol, stream, response),
{:ok, protocol} <- send_trailers(protocol, stream, response) do
{:ok, protocol}
end
end
defp process_buffer(%HTTP2{buffer: buffer, recv_settings: recv_settings} = protocol) do
max_frame_size =
recv_settings
|> Keyword.fetch!(:max_frame_size)
|> min(@max_frame_size)
buffer
|> Frame.stream()
|> Enum.reduce_while({:ok, protocol, []}, fn
{rest, nil}, {:ok, protocol, responses} ->
{:halt, {:ok, %{protocol | buffer: rest}, responses}}
{_rest, {length, _type, _id, _data}}, {:ok, protocol, _responses}
when length > max_frame_size ->
{:halt, send_error(protocol, :frame_size_error)}
{_rest, {_length, _type, id, _data}},
{:ok, %{last_local_stream_id: llid, last_remote_stream_id: lrid}, _responses}
when not is_local_stream(llid, id) and id < lrid ->
{:halt, send_error(protocol, :protocol_error)}
{rest, {_length, type, _id, data}},
{:ok, %{recv_hbf_type: recv_hbf_type} = protocol, responses} ->
with {:ok, frame_type} <- frame_for_type(type),
{:ok, frame} <- Frame.decode(frame_type, data),
{:ok, protocol, responses} <- recv_frame(protocol, frame, responses) do
{:cont, {:ok, %{protocol | buffer: rest}, responses}}
else
{:error, :not_found} when not is_nil(recv_hbf_type) ->
{:halt, {:error, :protocol_error}}
{:error, :not_found} ->
{:cont, {:ok, %{protocol | buffer: rest}, responses}}
{:error, reason} ->
send_error(protocol, reason)
{:halt, {:error, reason}}
end
end)
end
defp frame_for_type(0x0), do: {:ok, %Data{}}
defp frame_for_type(0x1), do: {:ok, %Headers{}}
defp frame_for_type(0x2), do: {:ok, %Priority{}}
defp frame_for_type(0x3), do: {:ok, %RstStream{}}
defp frame_for_type(0x4), do: {:ok, %Settings{}}
defp frame_for_type(0x5), do: {:ok, %PushPromise{}}
defp frame_for_type(0x6), do: {:ok, %Ping{}}
defp frame_for_type(0x7), do: {:ok, %GoAway{}}
defp frame_for_type(0x8), do: {:ok, %WindowUpdate{}}
defp frame_for_type(0x9), do: {:ok, %Continuation{}}
defp frame_for_type(_type), do: {:error, :not_found}
defp get_stream(%HTTP2{references: references} = protocol, reference)
when is_reference(reference),
do: get_stream(protocol, Map.get(references, reference))
defp get_stream(%HTTP2{last_local_stream_id: last_local_stream_id} = protocol, nil = _id) do
stream_id = last_local_stream_id + 2
with {:ok, protocol, stream} <- new_stream(protocol, stream_id) do
{:ok, %{protocol | last_local_stream_id: stream_id}, stream}
end
end
defp get_stream(
%HTTP2{
last_local_stream_id: last_local_stream_id,
streams: streams
} = protocol,
stream_id
) do
case Map.get(streams, stream_id) do
stream when not is_nil(stream) ->
{:ok, protocol, stream}
nil when not is_local_stream(last_local_stream_id, stream_id) ->
with {:ok, protocol, stream} <- new_stream(protocol, stream_id) do
{:ok, protocol, stream}
end
_ ->
{:error, :protocol_error}
end
end
defp new_stream(
%HTTP2{references: references, send_settings: send_settings, streams: streams} =
protocol,
stream_id
) do
window_size = Keyword.get(send_settings, :initial_window_size)
%HTTP2Stream{reference: reference} = stream = HTTP2Stream.new(stream_id, window_size)
{
:ok,
%{
protocol
| references: Map.put(references, reference, stream_id),
streams: Map.put(streams, stream_id, stream)
},
stream
}
end
defp send_frame(%HTTP2{transport: transport} = protocol, %{stream_id: 0} = frame) do
with {:ok, frame, data} <- Frame.encode(frame),
:ok <- Transport.send(transport, data) do
Logger.debug(fn -> "SENT #{inspect(frame)}" end)
{:ok, protocol}
end
end
defp send_frame(%HTTP2{send_queue: send_queue} = protocol, frame),
do: process_send_queue(%{protocol | send_queue: :queue.in(frame, send_queue)})
defp process_send_queue(%HTTP2{send_queue: send_queue} = protocol) do
fn -> {protocol, send_queue} end
|> Stream.resource(
fn {protocol, queue} ->
with {:value, frame} <- :queue.peek(queue),
max_frame_size when max_frame_size > 0 <- frame_size_for(protocol, frame),
{{:value, frame}, queue} <- :queue.out(queue),
{:ok, protocol, []} <-
process_queued_frame(%{protocol | send_queue: queue}, frame, max_frame_size) do
{[protocol], {protocol, queue}}
else
{:ok, protocol, frames} ->
protocol =
Enum.reduce(frames, protocol, fn frame, %{send_queue: queue} = protocol ->
%{protocol | send_queue: :queue.in_r(frame, queue)}
end)
{[protocol], {protocol, :queue.new()}}
max_frame_size when is_integer(max_frame_size) and max_frame_size <= 0 ->
{:halt, protocol}
:empty ->
{:halt, protocol}
{:empty, _queue} ->
{:halt, protocol}
{:error, _reason} ->
{:halt, protocol}
end
end,
fn _protocol -> :ok end
)
|> Enum.reduce({:ok, protocol}, fn protocol, _acc -> {:ok, protocol} end)
end
defp process_queued_frame(
%HTTP2{send_hpack: send_hpack} = protocol,
%Headers{payload: %Headers.Payload{hbf: headers} = payload} = frame,
size
)
when is_list(headers) do
headers
|> HPack.encode(send_hpack)
|> case do
{:ok, send_hpack, hbf} ->
process_queued_frame(
%{protocol | send_hpack: send_hpack},
%{frame | payload: %Headers.Payload{payload | hbf: hbf}},
size
)
_ ->
{:error, :compression_error}
end
end
defp process_queued_frame(%HTTP2{send_queue: send_queue} = protocol, %Data{} = frame, size)
when size <= 0 do
{:ok, %{protocol | send_queue: :queue.in_r(frame, send_queue)}}
end
defp process_queued_frame(protocol, %{stream_id: stream_id} = frame, size) do
frame
|> Splittable.split(size)
|> Enum.reduce_while({:ok, protocol, []}, fn
%Data{length: length} = frame, {:ok, protocol, frames} ->
with {:ok, %{window_size: window_size} = protocol,
%{window_size: stream_window_size}}
when window_size - length >= 0 and stream_window_size - length >= 0 <-
get_stream(protocol, stream_id),
{:ok, protocol} <- send_stream_frame(protocol, frame) do
{:cont, {:ok, protocol, frames}}
else
{:ok, protocol, _stream} ->
{:cont, {:ok, protocol, [frame | frames]}}
{:error, _reason} = error ->
{:halt, error}
end
frame, {:ok, protocol, frames} ->
case send_stream_frame(protocol, frame) do
{:ok, protocol} ->
{:cont, {:ok, protocol, frames}}
{:error, _reason} = error ->
{:halt, error}
end
end)
end
defp send_stream_frame(
%HTTP2{streams: streams, transport: transport} = protocol,
%{stream_id: stream_id} = frame
) do
with {:ok, frame, data} <- Frame.encode(frame),
{:ok, protocol, stream} <- get_stream(protocol, stream_id),
{:ok, stream} <- HTTP2Stream.send(stream, frame),
:ok <- Transport.send(transport, data),
{:ok, protocol} <- reduce_window_size_after_send(protocol, frame) do
{:ok, %{protocol | streams: Map.put(streams, stream_id, stream)}}
else
{:error, _reason} = error ->
error
end
end
defp frame_size_for(
%HTTP2{send_settings: send_settings, window_size: window_size} = protocol,
%Data{stream_id: stream_id}
) do
with {:ok, _protocol, %{window_size: stream_window_size}} <-
get_stream(protocol, stream_id) do
send_settings
|> Keyword.fetch!(:max_frame_size)
|> min(@max_frame_size)
|> min(window_size)
|> min(stream_window_size)
end
end
defp frame_size_for(%{send_settings: send_settings}, _frame) do
send_settings
|> Keyword.fetch!(:max_frame_size)
|> min(@max_frame_size)
end
defp reduce_window_size_after_send(%HTTP2{window_size: window_size} = protocol, %Data{
length: length
}) do
new_window_size = window_size - length
Logger.debug(fn ->
"window_size after send: #{window_size} - #{length} = #{new_window_size}"
end)
{:ok, %{protocol | window_size: new_window_size}}
end
defp reduce_window_size_after_send(protocol, _frame), do: {:ok, protocol}
defp send_settings(%HTTP2{send_hpack: send_hpack, recv_settings: recv_settings} = protocol) do
header_table_size = Keyword.get(recv_settings, :header_table_size)
with {:ok, send_hpack} <- Table.resize(header_table_size, send_hpack),
{:ok, protocol} <-
send_frame(protocol, %Settings{
payload: %Settings.Payload{settings: recv_settings}
}) do
{:ok, %{protocol | send_hpack: send_hpack}}
else
_ ->
{:error, :compression_error}
end
end
defp send_error(
%HTTP2{last_stream_id: last_stream_id, transport: transport} = protocol,
reason
) do
with {:ok, _protocol} <-
send_frame(protocol, %GoAway{
payload: %GoAway.Payload{
last_stream_id: last_stream_id,
error_code: reason
}
}),
{:ok, _transport} <- Transport.close(transport) do
{:error, reason}
end
end
defp send_stream_error(protocol, stream_id, reason) do
send_frame(protocol, %RstStream{
stream_id: stream_id,
payload: %RstStream.Payload{error_code: reason}
})
end
defp recv_frame(
%HTTP2{send_settings: send_settings} = protocol,
%Settings{
stream_id: 0,
flags: %Settings.Flags{ack: false},
payload: %Settings.Payload{settings: settings}
},
responses
) do
new_send_settings = Keyword.merge(send_settings, settings)
old_header_table_size = Keyword.get(send_settings, :header_table_size)
new_header_table_size = Keyword.get(new_send_settings, :header_table_size)
old_window_size = Keyword.get(send_settings, :initial_window_size)
new_window_size = Keyword.get(new_send_settings, :initial_window_size)
with {:ok, protocol} <-
send_frame(protocol, %Settings{flags: %Settings.Flags{ack: true}, payload: nil}),
{:ok, protocol} <-
adjust_header_table_size(protocol, old_header_table_size, new_header_table_size),
{:ok, protocol} <-
adjust_streams_window_size(protocol, old_window_size, new_window_size),
{:ok, protocol} <-
process_send_queue(%{protocol | send_settings: new_send_settings}) do
{:ok, protocol, responses}
else
_ ->
{:error, :compression_error}
end
end
defp recv_frame(
protocol,
%Settings{stream_id: 0, length: 0, flags: %Settings.Flags{ack: true}},
responses
),
do: {:ok, protocol, responses}
defp recv_frame(
_protocol,
%Settings{stream_id: 0, flags: %Settings.Flags{ack: true}},
_responses
),
do: {:error, :frame_size_error}
defp recv_frame(
protocol,
%Ping{stream_id: 0, length: 8, flags: %Ping.Flags{ack: true}},
responses
),
do: {:ok, protocol, responses}
defp recv_frame(_protocol, %Ping{stream_id: 0, length: length}, _responses)
when length != 8,
do: {:error, :frame_size_error}
defp recv_frame(
protocol,
%Ping{stream_id: 0, flags: %Ping.Flags{ack: false} = flags} = frame,
responses
) do
with {:ok, protocol} <-
send_frame(protocol, %Ping{frame | flags: %Ping.Flags{flags | ack: true}}),
do: {:ok, protocol, responses}
end
defp recv_frame(_protocol, %WindowUpdate{stream_id: 0, length: length}, _responses)
when length != 4,
do: {:error, :frame_size_error}
defp recv_frame(
protocol,
%WindowUpdate{stream_id: 0, payload: %WindowUpdate.Payload{increment: 0}},
_responses
),
do: send_error(protocol, :protocol_error)
defp recv_frame(
%HTTP2{window_size: window_size},
%WindowUpdate{stream_id: 0, payload: %WindowUpdate.Payload{increment: increment}},
_responses
)
when window_size + increment > @max_window_size do
new_window_size = window_size + increment
Logger.error(fn ->
"WINDOW_UPDATE window_size: #{new_window_size} larger than max_window_size #{@max_window_size}"
end)
{:error, :flow_control_error}
end
defp recv_frame(
%HTTP2{window_size: window_size} = protocol,
%WindowUpdate{stream_id: 0, payload: %WindowUpdate.Payload{increment: increment}},
responses
) do
new_window_size = window_size + increment
Logger.debug(fn ->
"WINDOW_UPDATE window_size: #{window_size} + #{increment} = #{new_window_size}"
end)
with {:ok, protocol} <- process_send_queue(%{protocol | window_size: new_window_size}) do
{:ok, protocol, responses}
end
end
defp recv_frame(
_protocol,
%GoAway{stream_id: 0, payload: %GoAway.Payload{error_code: reason}},
_responses
),
do: {:error, reason}
defp recv_frame(_protocol, %{stream_id: 0} = _frame, _responses),
do: {:error, :protocol_error}
defp recv_frame(
%HTTP2{streams: streams} = protocol,
%{stream_id: stream_id} = frame,
responses
) do
with {:ok, protocol, %{state: old_state} = stream} <-
get_stream(protocol, stream_id),
{:ok, %{state: new_state, recv_hbf_type: recv_hbf_type} = stream, response} <-
HTTP2Stream.recv(stream, frame),
{:ok, protocol} <-
check_stream_limit(
%{
protocol
| recv_hbf_type: recv_hbf_type,
streams: Map.put(streams, stream_id, stream)
},
old_state,
new_state
),
{:ok, protocol} <-
calculate_last_stream_ids(protocol, frame),
{:ok, protocol, responses} <-
process_stream_response(protocol, frame, responses, response),
{:ok, protocol} <- process_window_update(protocol, frame) do
{:ok, protocol, responses}
else
{:error, reason}
when reason in [:protocol_error, :compression_error, :stream_closed, :refused_stream] ->
{:error, reason}
{:error, reason} ->
send_stream_error(protocol, stream_id, reason)
{:ok, protocol, responses}
end
end
defp check_stream_limit(
%HTTP2{send_settings: send_settings, concurrent_streams: concurrent_streams} =
protocol,
:idle,
new_state
)
when new_state in @active_stream_states do
send_settings
|> Keyword.get(:max_concurrent_streams)
|> case do
max_concurrent_streams when max_concurrent_streams > concurrent_streams ->
{:ok, %{protocol | concurrent_streams: concurrent_streams + 1}}
_ ->
{:error, :refused_stream}
end
end
defp check_stream_limit(
%HTTP2{concurrent_streams: concurrent_streams} = protocol,
old_state,
_new_state
)
when old_state in @active_stream_states,
do: {:ok, %{protocol | concurrent_streams: concurrent_streams - 1}}
defp check_stream_limit(protocol, _old_state, _new_state), do: {:ok, protocol}
defp calculate_last_stream_ids(
%HTTP2{last_stream_id: lsid, last_local_stream_id: llid} = protocol,
%{stream_id: stream_id} = _frame
)
when is_local_stream(llid, stream_id) do
llid = max(llid, stream_id)
{:ok, %{protocol | last_local_stream_id: llid, last_stream_id: max(llid, lsid)}}
end
defp calculate_last_stream_ids(protocol, %Priority{} = _frame), do: {:ok, protocol}
defp calculate_last_stream_ids(
%HTTP2{last_stream_id: lsid, last_remote_stream_id: lrid} = protocol,
%{stream_id: stream_id} = _frame
) do
lrid = max(lrid, stream_id)
{:ok, %{protocol | last_remote_stream_id: lrid, last_stream_id: max(lrid, lsid)}}
end
defp process_window_update(protocol, %WindowUpdate{}), do: process_send_queue(protocol)
defp process_window_update(protocol, _frame), do: {:ok, protocol}
defp process_stream_response(
protocol,
%{length: 0},
responses,
{:data, _ref, _hbf, _end_stream} = response
),
do: {:ok, protocol, [response | responses]}
defp process_stream_response(
protocol,
%{length: length, stream_id: stream_id},
responses,
{:data, _ref, _hbf, _end_stream} = response
) do
window_update = %WindowUpdate{payload: %WindowUpdate.Payload{increment: length}}
with {:ok, protocol} <- send_frame(protocol, window_update),
{:ok, protocol} <- send_frame(protocol, %{window_update | stream_id: stream_id}) do
{:ok, protocol, [response | responses]}
end
end
defp process_stream_response(
protocol,
_frame,
responses,
{_type, _ref, [<<>>], _end_stream} = response
),
do: {:ok, protocol, [response | responses]}
defp process_stream_response(
%HTTP2{recv_hpack: recv_hpack, send_settings: send_settings} = protocol,
frame,
responses,
{type, ref, hbf, end_stream}
)
when type in [:headers, :push_promise] do
max_header_table_size = Keyword.get(send_settings, :header_table_size)
hbf
|> Enum.join()
|> HPack.decode(recv_hpack, max_header_table_size)
|> case do
{:ok, recv_hpack, headers} ->
with {:ok, protocol} <- validate_headers(protocol, frame, headers) do
{
:ok,
%{protocol | recv_hpack: recv_hpack},
[{type, ref, headers, end_stream} | responses]
}
end
_ ->
{:error, :compression_error}
end
end
defp process_stream_response(protocol, _frame, responses, nil),
do: {:ok, protocol, responses}
defp process_stream_response(protocol, _frame, responses, response),
do: {:ok, protocol, [response | responses]}
defp validate_headers(
%HTTP2{headers_type: :headers, last_local_stream_id: llid} = protocol,
%{stream_id: id} = _frame,
headers
)
when is_client_stream(id) and is_local_stream(llid, id) do
with :ok <- Response.validate_headers(headers, true, ["connection"]),
do: {:ok, %{protocol | headers_type: :trailers}}
end
defp validate_headers(
%HTTP2{headers_type: :headers, last_local_stream_id: llid} = protocol,
%{stream_id: id} = _frame,
headers
)
when is_client_stream(id) and not is_local_stream(llid, id) do
with :ok <- Request.validate_headers(headers, true, ["connection"]),
do: {:ok, %{protocol | headers_type: :trailers}}
end
defp validate_headers(
%HTTP2{headers_type: :headers, last_local_stream_id: llid} = protocol,
%{stream_id: id} = _frame,
headers
)
when is_server_stream(id) and is_local_stream(llid, id) do
with :ok <- Request.validate_headers(headers, true, ["connection"]),
do: {:ok, %{protocol | headers_type: :trailers}}
end
defp validate_headers(
%HTTP2{headers_type: :headers, last_local_stream_id: llid} = protocol,
%{stream_id: id} = _frame,
headers
)
when is_server_stream(id) and not is_local_stream(llid, id) do
with :ok <- Response.validate_headers(headers, true, ["connection"]),
do: {:ok, %{protocol | headers_type: :trailers}}
end
defp validate_headers(protocol, _frame, trailers) do
with :ok <- HTTP.validate_trailers(trailers, true),
do: {:ok, protocol}
end
defp send_headers(protocol, %HTTP2Stream{id: stream_id}, %{
headers: headers,
body: body
}) do
send_frame(protocol, %Headers{
stream_id: stream_id,
flags: %Headers.Flags{end_stream: IO.iodata_length(body) == 0},
payload: %Headers.Payload{hbf: headers}
})
end
defp send_data(protocol, _stream, %{body: []}), do: {:ok, protocol}
defp send_data(protocol, %HTTP2Stream{id: stream_id}, %{
body: [_data | _rest] = data,
trailers: trailers
}) do
Enum.reduce_while(data, {:ok, protocol}, fn data, {:ok, protocol} ->
case send_frame(protocol, %Data{
stream_id: stream_id,
flags: %Data.Flags{end_stream: Enum.empty?(trailers)},
payload: %Data.Payload{data: data}
}) do
{:ok, protocol} ->
{:cont, {:ok, protocol}}
{:error, _reason} = error ->
{:halt, error}
end
end)
end
defp send_data(protocol, %HTTP2Stream{id: stream_id}, %{
body: data,
trailers: trailers
})
when is_binary(data) do
send_frame(protocol, %Data{
stream_id: stream_id,
flags: %Data.Flags{end_stream: Enum.empty?(trailers)},
payload: %Data.Payload{data: data}
})
end
defp send_trailers(protocol, _stream, %{trailers: []}), do: {:ok, protocol}
defp send_trailers(protocol, %HTTP2Stream{id: stream_id}, %{trailers: trailers}) do
send_frame(protocol, %Headers{
stream_id: stream_id,
flags: %Headers.Flags{end_stream: true},
payload: %Headers.Payload{hbf: trailers}
})
end
defp adjust_header_table_size(
%HTTP2{send_hpack: send_hpack} = protocol,
old_size,
new_size
) do
new_size
|> Table.resize(send_hpack, old_size)
|> case do
{:ok, send_hpack} -> {:ok, %{protocol | send_hpack: send_hpack}}
_ -> {:error, :compression_error}
end
end
defp adjust_streams_window_size(
%HTTP2{streams: streams} = protocol,
old_window_size,
new_window_size
) do
streams =
Enum.reduce(streams, %{}, fn {id, stream}, streams ->
stream = HTTP2Stream.adjust_window_size(stream, old_window_size, new_window_size)
Map.put(streams, id, stream)
end)
{:ok, %{protocol | streams: streams}}
end
end
end