Skip to main content

lib/http/web_transport/receive_stream.ex

defmodule HTTP.WebTransport.ReceiveStream do
  @moduledoc """
  Readable byte stream owned by a WebTransport session.
  """

  defstruct [:transport, :ref]

  @type t :: %__MODULE__{transport: HTTP.WebTransport.t(), ref: term()}

  @call_timeout 5_000

  @spec read(t(), keyword() | map()) :: {:ok, binary()} | :fin | {:error, term()}
  def read(%__MODULE__{transport: transport, ref: stream_ref}, options \\ []) do
    options = if is_map(options), do: Map.to_list(options), else: options
    timeout = Keyword.get(options, :timeout, :infinity)

    GenServer.call(transport.pid, {:read_stream, stream_ref, timeout}, call_timeout(timeout))
  catch
    :exit, {:timeout, _call} -> {:error, :timeout}
    :exit, _reason -> {:error, :closed}
  end

  @spec cancel(t(), keyword() | map()) :: :ok | {:error, term()}
  def cancel(%__MODULE__{transport: transport, ref: stream_ref}, options \\ []) do
    options = if is_map(options), do: Map.to_list(options), else: options
    code = Keyword.get(options, :code, 0)

    GenServer.call(transport.pid, {:cancel_receive_stream, stream_ref, code}, @call_timeout)
  catch
    :exit, _reason -> {:error, :closed}
  end

  defp call_timeout(:infinity), do: :infinity
  defp call_timeout(timeout) when is_integer(timeout) and timeout >= 0, do: timeout + 1_000
  defp call_timeout(_timeout), do: @call_timeout
end