lib/membrane_rtmp_plugin/rtmp/source/tcp_server.ex

defmodule Membrane.RTMP.Source.TcpServer do
  @moduledoc """
  A simple tcp server, which handles each new incoming connection.

  The `socket_handler` function passed inside the options should take the socket returned by `:gen_tcp.accept/1`
  and return `{:ok, pid}`, where the `pid` describes a process, which will be interacting with the socket.
  `Membrane.RTMP.Source.TcpServer` will grant that process control over the socket via `:gen_tcp.controlling_process/2`.
  """

  use Task

  @enforce_keys [:port, :listen_options, :socket_handler]

  defstruct @enforce_keys ++ [:parent]

  @typedoc """
  Defines options for the TCP server.
  The `listen_options` are passed to the `:gen_tcp.listen/2` function.
  The `socket_handler` is a function that takes socket returned by `:gen_tcp.accept/1` and returns the pid of a process,
  which will be interacting with the socket. TcpServer will grant that process control over the socket via `:gen_tcp.controlling_process/2`.
  """
  @type t :: %__MODULE__{
          port: :inet.port_number(),
          listen_options: [:inet.inet_backend() | :gen_tcp.listen_option()],
          socket_handler: (:gen_tcp.socket() -> {:ok, pid} | {:error, reason :: any()}),
          parent: pid()
        }

  @spec start_link(t()) :: {:ok, pid}
  def start_link(options) do
    Task.start_link(__MODULE__, :run, [options])
  end

  @spec run(t()) :: nil
  def run(options) do
    {:ok, socket} = :gen_tcp.listen(options.port, options.listen_options)
    if options.parent, do: send(options.parent, {:tcp_server_started, socket})

    accept_loop(socket, options.socket_handler)
  end

  defp accept_loop(socket, socket_handler) do
    {:ok, client} = :gen_tcp.accept(socket)
    {:ok, handler_task} = Task.start(fn -> serve(client, socket_handler) end)

    :ok = :gen_tcp.controlling_process(client, handler_task)
    send(handler_task, :control_granted)

    accept_loop(socket, socket_handler)
  end

  defp serve(socket, socket_handler) do
    {:ok, pid} = socket_handler.(socket)

    receive do
      :control_granted ->
        :ok = :gen_tcp.controlling_process(socket, pid)
    end
  end
end