defprotocol GSMLG.Socket.Stream.Protocol do
  @moduledoc """
  Protocol for stream-oriented sockets: sending data and files, receiving
  data, shutting down and closing.
  """

  @doc """
  Send data through the socket.
  """
  @spec send(t(), iodata()) :: :ok | {:error, term()}
  def send(self, data)

  @doc """
  Send a file through the socket, using non-copying operations where available.

  Success is reported as `:ok`, or as `{:ok, bytes_sent}` by implementations
  that delegate to `:file.sendfile`.
  """
  @spec file(t(), String.t()) :: :ok | {:ok, non_neg_integer()} | {:error, term()}
  @spec file(t(), String.t(), Keyword.t()) ::
          :ok | {:ok, non_neg_integer()} | {:error, term()}
  def file(self, path, options \\ [])

  @doc """
  Receive data from the socket compatible with the packet type.
  """
  @spec recv(t()) :: {:ok, term()} | {:error, term()}
  def recv(self)

  @doc """
  Receive data from the socket with the given length or options.
  """
  @spec recv(t(), non_neg_integer() | Keyword.t()) :: {:ok, term()} | {:error, term()}
  def recv(self, length_or_options)

  @doc """
  Receive data from the socket with the given length and options.
  """
  @spec recv(t(), non_neg_integer(), Keyword.t()) :: {:ok, term()} | {:error, term()}
  def recv(self, length, options)

  @doc """
  Receive data from the socket until `{:error, :closed}` is returned.

  `acc` is the data accumulated so far; received data is appended to it.
  """
  @spec recv_all!(t(), String.t()) :: String.t()
  def recv_all!(self, acc \\ "")

  @doc """
  Shutdown the socket in the given mode, either `:both`, `:read`, or `:write`.
  """
  @spec shutdown(t(), :both | :read | :write) :: :ok | {:error, term()}
  def shutdown(self, how \\ :both)

  @doc """
  Close the socket.
  """
  @spec close(t()) :: :ok | {:error, term()}
  def close(self)
end
defmodule GSMLG.Socket.Stream do
  @moduledoc """
  Facade over `GSMLG.Socket.Stream.Protocol`.

  Delegates the protocol functions and adds `io/3`, which copies the contents
  of an IO device to a socket in chunks.
  """

  @type t :: GSMLG.Socket.Stream.Protocol.t()
  use GSMLG.Socket.Helpers
  # Kernel.send/2 is excluded so that the delegated send/2 below can be called
  # unqualified inside this module.
  import Kernel, except: [send: 2]

  defdelegate send(self, data), to: GSMLG.Socket.Stream.Protocol
  defbang(send(self, data), to: GSMLG.Socket.Stream.Protocol)
  defdelegate file(self, path), to: GSMLG.Socket.Stream.Protocol
  defbang(file(self, path), to: GSMLG.Socket.Stream.Protocol)
  defdelegate file(self, path, options), to: GSMLG.Socket.Stream.Protocol
  defbang(file(self, path, options), to: GSMLG.Socket.Stream.Protocol)
  defdelegate recv(self), to: GSMLG.Socket.Stream.Protocol
  defbang(recv(self), to: GSMLG.Socket.Stream.Protocol)
  defdelegate recv(self, length_or_options), to: GSMLG.Socket.Stream.Protocol
  defbang(recv(self, length_or_options), to: GSMLG.Socket.Stream.Protocol)
  defdelegate recv(self, length, options), to: GSMLG.Socket.Stream.Protocol
  defbang(recv(self, length, options), to: GSMLG.Socket.Stream.Protocol)
  defdelegate recv_all!(self), to: GSMLG.Socket.Stream.Protocol
  defbang(recv_all!(self), to: GSMLG.Socket.Stream.Protocol)
  defdelegate recv_all!(self, acc), to: GSMLG.Socket.Stream.Protocol
  defbang(recv_all!(self, acc), to: GSMLG.Socket.Stream.Protocol)
  defdelegate shutdown(self), to: GSMLG.Socket.Stream.Protocol
  defbang(shutdown(self), to: GSMLG.Socket.Stream.Protocol)
  defdelegate shutdown(self, how), to: GSMLG.Socket.Stream.Protocol
  defbang(shutdown(self, how), to: GSMLG.Socket.Stream.Protocol)
  defdelegate close(self), to: GSMLG.Socket.Stream.Protocol
  defbang(close(self), to: GSMLG.Socket.Stream.Protocol)

  @doc """
  Read from the IO device and send to the socket following the given options.

  Returns `:ok` once the requested data (or everything up to EOF) has been
  sent, or `{:error, reason}` as soon as a read or a send fails.

  ## Options

    - `:size` is the amount of bytes to read from the IO device, if omitted it
      will read until EOF
    - `:offset` is the amount of bytes to read from the IO device before
      starting to send what's being read
    - `:chunk_size` is the size of the chunks read from the IO device at a
      time, defaults to 4096
  """
  @spec io(t, :io.device()) :: :ok | {:error, term}
  @spec io(t, :io.device(), Keyword.t()) :: :ok | {:error, term}
  def io(self, io, options \\ []) do
    size = options[:size] || -1
    chunk_size = options[:chunk_size] || 4096

    case skip_offset(io, options[:offset]) do
      :ok -> do_io(0, self, io, size, chunk_size)
      # The device ended before the offset was reached: nothing to send.
      :eof -> :ok
      {:error, reason} -> {:error, reason}
    end
  end

  # Consumes and discards `offset` bytes from the device.
  defp skip_offset(_io, offset) when offset in [nil, false, 0], do: :ok

  defp skip_offset(io, offset) do
    case IO.binread(io, offset) do
      :eof -> :eof
      {:error, reason} -> {:error, reason}
      _discarded -> :ok
    end
  end

  # The requested size has been sent in full. Stopping here avoids issuing a
  # zero-byte read and sending an empty payload when `size` is an exact
  # multiple of `chunk_size`.
  defp do_io(total, _self, _io, size, _chunk_size) when size > 0 and total >= size do
    :ok
  end

  # Fewer than `chunk_size` bytes remain: read only the remainder and stop.
  defp do_io(total, self, io, size, chunk_size) when size > 0 and total + chunk_size > size do
    case IO.binread(io, size - total) do
      :eof -> :ok
      {:error, reason} -> {:error, reason}
      data -> send(self, data)
    end
  end

  # General case (also used when no positive size limit applies): read one
  # chunk, send it, and continue until EOF.
  defp do_io(total, self, io, size, chunk_size) do
    case IO.binread(io, chunk_size) do
      :eof ->
        :ok

      {:error, reason} ->
        {:error, reason}

      data ->
        # Stop at the first failed send instead of silently carrying on, and
        # count the bytes actually read so a short read cannot truncate the
        # transfer.
        case send(self, data) do
          :ok -> do_io(total + IO.iodata_length(data), self, io, size, chunk_size)
          {:error, reason} -> {:error, reason}
        end
    end
  end

  defbang(io(self, io))
  defbang(io(self, io, options))
end
defimpl GSMLG.Socket.Stream.Protocol, for: Port do
  # Stream protocol implementation for sockets represented as ports, backed by
  # :gen_tcp and :file.sendfile.

  # Sends `data` through the socket.
  def send(self, data) do
    :gen_tcp.send(self, data)
  end

  # Sends the file at `path` through the socket with :file.sendfile.
  #
  # Options:
  #   - :offset     - byte position to start from (default 0)
  #   - :size       - number of bytes to send; when omitted the file is sent
  #                   up to its end
  #   - :chunk_size - forwarded to :file.sendfile as `chunk_size`
  #
  # Returns whatever :file.sendfile returns ({:ok, bytes_sent} on success) or
  # {:error, reason} when the file cannot be opened.
  def file(self, path, options \\ []) do
    offset = options[:offset] || 0
    # A byte count of 0 asks :file.sendfile/5 to send until end of file.
    size = options[:size] || 0

    sendfile_options =
      case options[:chunk_size] do
        nil -> []
        chunk_size -> [chunk_size: chunk_size]
      end

    # :file.sendfile/5 requires a file opened in raw mode, not a path.
    case :file.open(path, [:read, :raw, :binary]) do
      {:ok, fd} ->
        try do
          :file.sendfile(fd, self, offset, size, sendfile_options)
        after
          :file.close(fd)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Receives whatever is available, with no timeout.
  def recv(self) do
    recv(self, 0, [])
  end

  # Receives `length` bytes when given an integer, or uses the given options
  # (with length 0) when given a list.
  def recv(self, length) when is_integer(length) do
    recv(self, length, [])
  end

  def recv(self, options) when is_list(options) do
    recv(self, 0, options)
  end

  # Receives `length` bytes, honouring the :timeout option (default :infinity).
  # A closed socket is reported as {:ok, nil} rather than as an error.
  def recv(self, length, options) do
    timeout = options[:timeout] || :infinity

    case :gen_tcp.recv(self, length, timeout) do
      {:ok, _} = ok ->
        ok

      {:error, :closed} ->
        {:ok, nil}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Keeps receiving and appending to `acc` until the socket is closed, then
  # returns the accumulated data. Any other error raises a CaseClauseError.
  def recv_all!(self, acc \\ "") do
    case :gen_tcp.recv(self, 0) do
      {:ok, data} -> recv_all!(self, acc <> data)
      {:error, :closed} -> acc
    end
  end

  # Shuts the socket down; :both is translated to :gen_tcp's :read_write.
  def shutdown(self, how \\ :both) do
    :gen_tcp.shutdown(
      self,
      case how do
        :read -> :read
        :write -> :write
        :both -> :read_write
      end
    )
  end

  # Closes the socket.
  def close(self) do
    :gen_tcp.close(self)
  end
end
defimpl GSMLG.Socket.Stream.Protocol, for: Tuple do
  # Stream protocol implementation for tuples tagged as `:sslsocket` records,
  # backed by :ssl. Every public clause is guarded so that any other tuple
  # fails with a FunctionClauseError.
  require Record

  # Sends `data` through the SSL socket.
  def send(self, data) when Record.is_record(self, :sslsocket) do
    :ssl.send(self, data)
  end

  # Sends the file at `path` by reading it in chunks and writing each chunk to
  # the socket.
  #
  # Options:
  #   - :offset     - bytes to skip before sending (default 0)
  #   - :size       - number of bytes to send; when omitted the size reported
  #                   by File.stat/1 is used
  #   - :chunk_size - bytes read per chunk (default 4096)
  #
  # Returns :ok or {:error, reason}.
  def file(self, path, options \\ []) when Record.is_record(self, :sslsocket) do
    file(
      self,
      path,
      options[:offset] || 0,
      options[:size] || -1,
      options[:chunk_size] || 4096
    )
  end

  # A size of -1 means "not specified": resolve it from the file itself.
  defp file(self, path, offset, -1, chunk_size) when is_binary(path) do
    case File.stat(path) do
      {:ok, %File.Stat{size: size}} -> file(self, path, offset, size, chunk_size)
      {:error, reason} -> {:error, reason}
    end
  end

  defp file(self, path, offset, size, chunk_size) when is_binary(path) do
    sender =
      &GSMLG.Socket.Stream.io(self, &1, offset: offset, size: size, chunk_size: chunk_size)

    # File.open/3 wraps the callback result in {:ok, result} and returns
    # {:error, reason} when the file cannot be opened; the bang variant
    # returns the bare result and raises instead.
    case File.open(path, [:read], sender) do
      {:ok, result} -> result
      {:error, reason} -> {:error, reason}
    end
  end

  # Receives whatever is available, with no timeout.
  def recv(self) when Record.is_record(self, :sslsocket) do
    recv(self, 0, [])
  end

  # Receives `length` bytes when given an integer, or uses the given options
  # (with length 0) when given a list.
  def recv(self, length) when Record.is_record(self, :sslsocket) and is_integer(length) do
    recv(self, length, [])
  end

  def recv(self, options) when Record.is_record(self, :sslsocket) and is_list(options) do
    recv(self, 0, options)
  end

  # Receives `length` bytes, honouring the :timeout option (default :infinity).
  # A closed socket is reported as {:ok, nil} rather than as an error.
  def recv(self, length, options) when Record.is_record(self, :sslsocket) do
    timeout = options[:timeout] || :infinity

    case :ssl.recv(self, length, timeout) do
      {:ok, _} = ok ->
        ok

      {:error, :closed} ->
        {:ok, nil}

      {:error, reason} ->
        {:error, reason}
    end
  end

  # Keeps receiving and appending to `acc` until the socket is closed, then
  # returns the accumulated data. Any other error raises a CaseClauseError.
  def recv_all!(self, acc \\ "") when Record.is_record(self, :sslsocket) do
    case :ssl.recv(self, 0, :infinity) do
      {:ok, data} -> recv_all!(self, acc <> data)
      {:error, :closed} -> acc
    end
  end

  # Shuts the socket down; :both is translated to :ssl's :read_write.
  def shutdown(self, how \\ :both) when Record.is_record(self, :sslsocket) do
    :ssl.shutdown(
      self,
      case how do
        :read -> :read
        :write -> :write
        :both -> :read_write
      end
    )
  end

  # Closes the SSL socket.
  def close(self) when Record.is_record(self, :sslsocket) do
    :ssl.close(self)
  end
end