# lib/grpc/client/stream.ex

defmodule GRPC.Client.Stream do
  @moduledoc """
  A struct that *streaming* clients get from rpc function calls and use to send further requests.

  ## Fields

    * `:channel`              - `GRPC.Channel`, the channel established by client
    * `:service_name`         - the service name, a string
    * `:method_name`          - the method name, a string
    * `:grpc_type`            - an atom tagging the type of the rpc
    * `:rpc`                  - a tuple describing the rpc
    * `:payload`              - data used by adapter in a request
    * `:path`                 - the request path to send
    * `:request_mod`          - the request module, or nil for untyped protocols
    * `:response_mod`         - the response module, or nil for untyped protocols
    * `:codec`                - the codec, `GRPC.Codec.Proto` by default
    * `:server_stream`        - boolean flag for server-side streaming, unset by default
    * `:canceled`             - boolean, `false` by default
    * `:compressor`           - module handed to the adapter when sending data, unless the
      `compress: false` option is given; `nil` by default
    * `:accepted_compressors` - list of compressor modules, empty by default
    * `:headers`              - map of headers, extended through `put_headers/2`
    * `:__interface__`        - map holding the `:send_request` and `:recv` functions
  """

  @typep stream_payload :: any

  # Every field that `defstruct` initialises to `nil` admits `nil` here as well,
  # so the type describes a freshly built `%GRPC.Client.Stream{}` accurately.
  @type t :: %__MODULE__{
          channel: GRPC.Channel.t() | nil,
          service_name: String.t() | nil,
          method_name: String.t() | nil,
          grpc_type: atom,
          rpc: tuple | nil,
          payload: stream_payload,
          path: String.t() | nil,
          request_mod: atom,
          response_mod: atom,
          codec: atom,
          server_stream: boolean | nil,
          canceled: boolean,
          compressor: module | nil,
          accepted_compressors: [module],
          headers: map,
          __interface__: map
        }

  defstruct channel: nil,
            service_name: nil,
            method_name: nil,
            grpc_type: nil,
            rpc: nil,
            payload: %{},
            path: nil,
            request_mod: nil,
            response_mod: nil,
            codec: GRPC.Codec.Proto,
            server_stream: nil,
            # TODO: it's better to get canceled status from adapter
            canceled: false,
            compressor: nil,
            accepted_compressors: [],
            headers: %{},
            __interface__: %{
              send_request: &__MODULE__.send_request/3,
              recv: &GRPC.Stub.do_recv/2
            }

  # Stores `val` under `key` in the stream's `:payload` map and returns the
  # updated stream. A falsy (`nil`/`false`) payload is replaced by an empty map
  # before the entry is added.
  @doc false
  def put_payload(%{payload: payload} = stream, key, val) do
    %{stream | payload: Map.put(payload || %{}, key, val)}
  end

  @doc """
  Merges `headers` into the `:headers` map of `stream` and returns the updated stream.

  Entries in `headers` take precedence over existing entries with the same key.
  A `nil` `:headers` value is treated as an empty map. A `stream` that has no
  `:headers` key is returned unchanged.
  """
  def put_headers(%{headers: existing} = stream, headers) do
    # `existing || %{}` mirrors the nil-tolerance of `put_payload/3`; without it
    # `Map.merge/2` raises `BadMapError` when the headers were never set.
    %{stream | headers: Map.merge(existing || %{}, headers)}
  end

  def put_headers(stream, _) do
    stream
  end

  # Encodes `request` with the stream's codec and hands the result to the
  # channel adapter's `send_data/3`, whose return value is returned as is.
  #
  # Options read from `opts`:
  #   * `:end_stream` - forwarded to the adapter as `:send_end_stream`, `false` when absent
  #   * `:compress`   - when exactly `false`, no compressor is forwarded to the adapter
  @doc false
  @spec send_request(GRPC.Client.Stream.t(), struct, Keyword.t()) :: GRPC.Client.Stream.t()
  def send_request(
        %{codec: codec, channel: %{adapter: adapter}, compressor: compressor} = stream,
        request,
        opts
      ) do
    encoded = codec.encode(request)
    send_end_stream = Keyword.get(opts, :end_stream, false)

    # If compressor exists, compress is true by default; only an explicit
    # `compress: false` disables it for this request.
    compressor = if opts[:compress] == false, do: nil, else: compressor

    adapter.send_data(stream, encoded,
      send_end_stream: send_end_stream,
      compressor: compressor
    )
  end
end