defmodule GRPC.Client.Stream do
@moduledoc """
A struct that *streaming* clients get from rpc function calls and use to send further requests.
## Fields
* `:channel` - `GRPC.Channel`, the channel established by client
* `:payload` - data used by adapter in a request
* `:path` - the request path to sent
* `:request_mod` - the request module, or nil for untyped protocols
* `:response_mod` - the response module, or nil for untyped protocols
* `:codec` - the codec
* `:req_stream` - indicates if request is streaming
* `:res_stream` - indicates if reply is streaming
"""
@typep stream_payload :: any()
@type t :: %__MODULE__{
channel: GRPC.Channel.t(),
service_name: String.t(),
method_name: String.t(),
grpc_type: atom(),
rpc: tuple(),
payload: stream_payload,
path: String.t(),
request_mod: atom(),
response_mod: atom(),
codec: atom(),
server_stream: boolean(),
canceled: boolean(),
compressor: module(),
accepted_compressors: [module()],
headers: map(),
__interface__: map()
}
defstruct channel: nil,
service_name: nil,
method_name: nil,
grpc_type: nil,
rpc: nil,
payload: %{},
path: nil,
request_mod: nil,
response_mod: nil,
codec: GRPC.Codec.Proto,
server_stream: nil,
# TODO: it's better to get canceled status from adapter
canceled: false,
compressor: nil,
accepted_compressors: [],
headers: %{},
__interface__: %{
send_request: &__MODULE__.send_request/3,
receive_data: &__MODULE__.receive_data/2
}
@doc false
def put_payload(%{payload: payload} = stream, key, val) do
payload = if payload, do: payload, else: %{}
%{stream | payload: Map.put(payload, key, val)}
end
def put_headers(%{headers: existing} = stream, headers) do
new_headers = Map.merge(existing, headers)
%{stream | headers: new_headers}
end
def put_headers(stream, _) do
stream
end
@doc false
@spec send_request(GRPC.Client.Stream.t(), struct, Keyword.t()) :: GRPC.Client.Stream.t()
def send_request(
%{codec: codec, channel: %{adapter: adapter}, compressor: compressor} = stream,
request,
opts
) do
encoded = codec.encode(request)
send_end_stream = Keyword.get(opts, :end_stream) || false
# If compressor exists, compress is true by default
compressor =
if opts[:compress] == false do
nil
else
compressor
end
adapter.send_data(stream, encoded,
send_end_stream: send_end_stream,
compressor: compressor
)
end
def receive_data(%{channel: %{adapter: adapter}} = stream, opts) do
adapter.receive_data(stream, opts)
end
end