lib/thousand_island.ex

defmodule ThousandIsland do
  @moduledoc """
  Thousand Island is a modern, pure Elixir socket server, inspired heavily by
  [ranch](https://github.com/ninenines/ranch). It aims to be easy to understand
  & reason about, while also being at least as stable and performant as alternatives.

  Thousand Island is implemented as a supervision tree which is intended to be hosted
  inside a host application, often as a dependency embedded within a higher-level
  protocol library such as [Bandit](https://github.com/mtrudel/bandit). Aside from
  supervising the Thousand Island process tree, applications interact with Thousand
  Island primarily via the `ThousandIsland.Handler` behaviour.

  ## Handlers

  The `ThousandIsland.Handler` behaviour defines the interface that Thousand Island
  uses to pass `ThousandIsland.Socket`s up to the application level; together they
  form the primary interface that most applications will have with Thousand Island.
  Thousand Island comes with a few simple protocol handlers to serve as examples;
  these can be found in the [examples](https://github.com/mtrudel/thousand_island/tree/main/examples)
  folder of this project. A simple implementation would look like this:

  ```elixir
  defmodule Echo do
    use ThousandIsland.Handler

    @impl ThousandIsland.Handler
    def handle_data(data, socket, state) do
      ThousandIsland.Socket.send(socket, data)
      {:continue, state}
    end
  end

  {:ok, pid} = ThousandIsland.start_link(port: 1234, handler_module: Echo)
  ```

  For more information, please consult the `ThousandIsland.Handler` documentation.

  ## Starting a Thousand Island Server

  A typical use of `ThousandIsland` might look like the following:

  ```elixir
  defmodule MyApp.Supervisor do
    # ... other Supervisor boilerplate

    def init(config) do
      children = [
        # ... other children as dictated by your app
        {ThousandIsland, port: 1234, handler_module: MyApp.ConnectionHandler}
      ]

      Supervisor.init(children, strategy: :one_for_one)
    end
  end
  ```

  You can also start servers directly via the `start_link/1` function:

  ```elixir
  {:ok, pid} = ThousandIsland.start_link(port: 1234, handler_module: MyApp.ConnectionHandler)
  ```

  ## Configuration

  A number of options are defined when starting a server. The complete list is
  defined by the `t:ThousandIsland.options/0` type.

  ## Connection Draining & Shutdown

  `ThousandIsland` instances are just a process tree consisting of standard
  `Supervisor`, `GenServer` and `Task` modules, and so the usual rules regarding
  shutdown and shutdown timeouts apply. Immediately upon beginning the shutdown
  sequence the ThousandIsland.ShutdownListener process will cause the listening
  socket to shut down, which in turn will cause all of the
  ThousandIsland.Acceptor processes to shut down as well. At this point all that
  is left in the supervision tree are several layers of Supervisors and whatever
  `Handler` processes were in progress when shutdown was initiated. At this
  point, standard `Supervisor` shutdown timeout semantics give existing
  connections a chance to finish things up. `Handler` processes trap exit, so
  they continue running beyond shutdown until they either complete or are
  `:brutal_kill`ed after their shutdown timeout expires.

  ## Logging & Telemetry

  As a low-level library, Thousand Island purposely does not do any inline
  logging of any kind. The `ThousandIsland.Logger` module defines a number of
  functions to aid in tracing connections at various log levels, and such logging
  can be dynamically enabled and disabled against an already running server. This
  logging is backed by telemetry events internally.

  Thousand Island emits a rich set of telemetry events including spans for each
  server, acceptor process, and individual client connection. These telemetry
  events are documented in the `ThousandIsland.Telemetry` module.
  """

  @typedoc """
  Possible options to configure a server. Valid option values are as follows:

  * `handler_module`: The name of the module used to handle connections to this server.
  The module is expected to implement the `ThousandIsland.Handler` behaviour. Required
  * `handler_options`: A term which is passed as the initial state value to
  `c:ThousandIsland.Handler.handle_connection/2` calls. Optional, defaulting to nil.
  * `genserver_options`: A term which is passed as the value to the handler module's
  underlying `GenServer.start_link/3` call. Optional, defaulting to []
  * `port`: The TCP port number to listen on. If not specified this defaults to 4000.
  If a port number of `0` is given, the server will dynamically assign a port number
  which can then be obtained via `sockname/1`
  * `transport_module`: The name of the module which provides basic socket functions.
  Thousand Island provides `ThousandIsland.Transports.TCP` and `ThousandIsland.Transports.SSL`,
  which provide clear and TLS encrypted TCP sockets respectively. If not specified this
  defaults to `ThousandIsland.Transports.TCP`
  * `transport_options`: A keyword list of options to be passed to the transport module's
  `c:ThousandIsland.Transport.listen/2` function. Valid values depend on the transport
  module specified in `transport_module` and can be found in the documentation for the
  `ThousandIsland.Transports.TCP` and `ThousandIsland.Transports.SSL` modules. Any options
  in terms of interfaces to listen to / certificates and keys to use for SSL connections
  will be passed in via this option
  * `num_acceptors`: The number of acceptor processes to run. Defaults to 100
  * `num_connections`: The maximum number of concurrent connections which each acceptor will
  accept before throttling connections. Connections will be throttled by having the acceptor
  process wait `max_connections_retry_wait` milliseconds, up to `max_connections_retry_count`
  times for existing connections to terminate & make room for this new connection. If there is
  still no room for this new connection after this interval, the acceptor will close the client
  connection and emit a `[:thousand_island, :acceptor, :spawn_error]` telemetry event. This number
  is expressed per-acceptor, so the total number of maximum connections for a Thousand Island
  server is `num_acceptors * num_connections`. Defaults to `16_384`
  * `max_connections_retry_wait`: How long to wait during each iteration as described in
  `num_connectors` above, in milliseconds. Defaults to `1000`
  * `max_connections_retry_count`: How many iterations to wait as described in `num_connectors`
  above. Defaults to `5`
  * `read_timeout`: How long to wait for client data before closing the connection, in
  milliseconds. Defaults to 60_000
  * `shutdown_timeout`: How long to wait for existing client connections to complete before
  forcibly shutting those connections down at server shutdown time, in milliseconds. Defaults to
  15_000. May also be `:infinity` or `:brutal_kill` as described in the `Supervisor`
  documentation
  """
  @type options :: [
          handler_module: module(),
          handler_options: term(),
          genserver_options: GenServer.options(),
          port: :inet.port_number(),
          transport_module: module(),
          transport_options: transport_options(),
          num_acceptors: pos_integer(),
          num_connections: non_neg_integer() | :infinity,
          max_connections_retry_count: non_neg_integer(),
          max_connections_retry_wait: timeout(),
          read_timeout: timeout(),
          shutdown_timeout: timeout()
        ]

  @typedoc "A module implementing `ThousandIsland.Transport` behaviour"
  @type transport_module :: ThousandIsland.Transports.TCP | ThousandIsland.Transports.SSL

  @typedoc "A keyword list of options to be passed to the transport module's `listen/2` function"
  @type transport_options() ::
          ThousandIsland.Transports.TCP.options() | ThousandIsland.Transports.SSL.options()

  @doc false
  @spec child_spec(options()) :: Supervisor.child_spec()
  def child_spec(opts) do
    %{
      id: {__MODULE__, make_ref()},
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor,
      restart: :permanent
    }
  end

  @doc """
  Starts a `ThousandIsland` instance with the given options. Returns a pid
  that can be used to further manipulate the server via other functions defined on
  this module in the case of success, or an error tuple describing the reason the
  server was unable to start in the case of failure.
  """
  @spec start_link(options()) :: Supervisor.on_start()
  def start_link(opts \\ []) do
    opts
    |> ThousandIsland.ServerConfig.new()
    |> ThousandIsland.Server.start_link()
  end

  @doc """
  Returns information about the address and port that the server is listening on
  """
  @spec listener_info(Supervisor.supervisor()) ::
          {:ok, ThousandIsland.Transport.socket_info()} | :error
  def listener_info(supervisor) do
    case ThousandIsland.Server.listener_pid(supervisor) do
      nil -> :error
      pid -> {:ok, ThousandIsland.Listener.listener_info(pid)}
    end
  end

  @doc """
  Gets a list of active connection processes. This is inherently a bit of a leaky notion in the
  face of concurrency, as there may be connections coming and going during the period that this
  function takes to run. Callers should account for the possibility that new connections may have
  been made since / during this call, and that processes returned by this call may have since
  completed. The order that connection processes are returned in is not specified
  """
  @spec connection_pids(Supervisor.supervisor()) :: {:ok, [pid()]} | :error
  def connection_pids(supervisor) do
    case ThousandIsland.Server.acceptor_pool_supervisor_pid(supervisor) do
      nil -> :error
      acceptor_pool_pid -> {:ok, collect_connection_pids(acceptor_pool_pid)}
    end
  end

  @doc """
  Suspend the server. This will close the listening port, and will stop the acceptance of new
  connections. Existing connections will stay connected and will continue to be processed.

  The server can later be resumed by calling `resume/1`, or shut down via standard supervision
  patterns.

  If this function returns `:error`, it is unlikely that the server is in a useable state

  Note that if you do not explicitly set a port (or if you set port to `0`), then the server will
  bind to a different port when you resume it. This new port can be obtained as usual via the
  `listener_info/1` function. This is not a concern if you explicitly set a port value when first
  instantiating the server
  """
  defdelegate suspend(supervisor), to: ThousandIsland.Server

  @doc """
  Resume a suspended server. This will reopen the listening port, and resume the acceptance of new
  connections
  """
  defdelegate resume(supervisor), to: ThousandIsland.Server

  defp collect_connection_pids(acceptor_pool_pid) do
    acceptor_pool_pid
    |> ThousandIsland.AcceptorPoolSupervisor.acceptor_supervisor_pids()
    |> Enum.reduce([], fn acceptor_sup_pid, acc ->
      case ThousandIsland.AcceptorSupervisor.connection_sup_pid(acceptor_sup_pid) do
        nil -> acc
        connection_sup_pid -> connection_pids(connection_sup_pid, acc)
      end
    end)
  end

  defp connection_pids(connection_sup_pid, acc) do
    connection_sup_pid
    |> DynamicSupervisor.which_children()
    |> Enum.reduce(acc, fn
      {_, connection_pid, _, _}, acc when is_pid(connection_pid) ->
        [connection_pid | acc]

      _, acc ->
        acc
    end)
  end

  @doc """
  Synchronously stops the given server, waiting up to the given number of milliseconds
  for existing connections to finish up. Immediately upon calling this function,
  the server stops listening for new connections, and then proceeds to wait until
  either all existing connections have completed or the specified timeout has
  elapsed.
  """
  @spec stop(Supervisor.supervisor(), timeout()) :: :ok
  def stop(supervisor, connection_wait \\ 15_000) do
    Supervisor.stop(supervisor, :normal, connection_wait)
  end
end