-module(roadrunner_listener).
-moduledoc """
Listener gen_server — owns the listening socket and the acceptor pool
for one named roadrunner instance.
Plain TCP is backed by `gen_tcp` with the legacy `inet_drv` backend.
The OTP-27 `{inet_backend, socket}` NIF path was tried but adds
significant own-time overhead on short-lived connections via
per-socket-option lookups. TLS is backed by `ssl`, gated by the
`tls` opt.
Both paths share the same `roadrunner_transport` tagged-socket abstraction.
On `init/1` the listener opens the listen socket, builds the shared
`roadrunner_conn:proto_opts()` (dispatch + body limits + timeouts +
`max_clients` counter), and spawn-links `num_acceptors` (default 10)
`roadrunner_acceptor` processes that pull from the same listen socket.
Connection workers are unlinked from the acceptor so a single
connection crash doesn't take the pool down.
All duration and interval values in `opts()` are in milliseconds —
`request_timeout`, `keep_alive_timeout`, `rate_check_interval`,
`hibernate_after`, and `slot_reconciliation.interval`.
""".
-behaviour(gen_server).
-export([start_link/2, stop/1, drain/2, port/1, info/1, status/1, reload_routes/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2]).
-export_type([opts/0]).
-define(DEFAULT_MAX_CONTENT_LENGTH, 10485760).
-define(DEFAULT_REQUEST_TIMEOUT, 30000).
-define(DEFAULT_KEEP_ALIVE_TIMEOUT, 60000).
-define(DEFAULT_NUM_ACCEPTORS, 10).
-define(DEFAULT_MAX_KEEP_ALIVE, 1000).
-define(DEFAULT_MAX_CLIENTS, 150).
-define(DEFAULT_MIN_BYTES_PER_SECOND, 100).
-doc """
Listener configuration map.
Required:
- `port` — TCP port to bind. `0` lets the kernel pick an ephemeral
port; query it back with `port/1`.
Routing (pick one):
- `routes => module()` — single-handler dispatch. Every request
goes to `Module:handle/1` and `roadrunner_req:state/1`
returns `undefined`.
- `routes => {module(), term()}` — single-handler dispatch with
per-handler state. The opaque second element is reachable from
the handler via `roadrunner_req:state/1`.
- `routes => #{handler := module(), state => term(),
middlewares => [...]}` — map form for single-handler dispatch;
use it to attach per-handler middlewares (or future per-handler
framework knobs) alongside the state.
- `routes => roadrunner_router:routes()` — list of route entries;
each entry is either a `{Path, Handler}` / `{Path, Handler, State}`
tuple or a `#{path := Path, handler := Handler, state => ...,
middlewares => [...]}` map. First match wins.
Optional middleware and timing knobs (durations in milliseconds):
- `middlewares` — listener-wide pipeline applied to every request.
- `max_content_length` — request-body cap; over-cap reads return
`payload_too_large`. Default 10 MB.
- `request_timeout` — header-read timeout on a fresh conn.
Default 30 s.
- `keep_alive_timeout` — idle timeout between requests on a
keep-alive conn. Default 60 s.
- `num_acceptors` — size of the acceptor pool. Default 10.
- `max_keep_alive_requests` — requests served per conn before
forced close. Default 1000.
- `max_clients` — concurrent connection cap. Default 150.
- `min_bytes_per_second` — slow-loris guard on the request-read
phase (0 disables). Default 100.
- `rate_check_interval` — how often the rate guard re-checks
(ms). Default 1000.
- `body_buffering` — `auto` (default; framework reads the full
body before invoking the handler) or `manual` (handler calls
`roadrunner_req:read_body/1,2`).
- `slot_reconciliation` — `disabled` (default) or
`#{interval := Ms}` to periodically reap slots orphaned by
brutal-kill exits.
- `graceful_drain` — opt out of the per-conn pg drain group
(`true` default; `false` trades drain notification for ~10 %
lower per-conn overhead on short-lived workloads).
- `hibernate_after` — when set, idle conns hibernate after this
many milliseconds of main-loop idle time.
- `protocols` — list of `t:protocol_entry/0`. Default `[http1]`.
On TLS this drives `alpn_preferred_protocols` automatically.
- `tls` — `[ssl:tls_server_option()]` for HTTPS. Empty / absent
for plain HTTP.
The inline source comments next to each field carry the deeper
ops-tuning rationale.
""".
-type opts() :: #{
port := inet:port_number(),
routes =>
module()
| {module(), term()}
| #{
handler := module(),
state => term(),
middlewares => roadrunner_middleware:middleware_list()
}
| roadrunner_router:routes(),
middlewares => roadrunner_middleware:middleware_list(),
max_content_length => non_neg_integer(),
request_timeout => non_neg_integer(),
keep_alive_timeout => non_neg_integer(),
num_acceptors => pos_integer(),
max_keep_alive_requests => pos_integer(),
max_clients => pos_integer(),
min_bytes_per_second => non_neg_integer(),
%% How often `reading_request` re-checks the running
%% bytes-per-second average against `min_bytes_per_second`.
%% Default `1000` — matches the 1-second grace period of the
%% rate check itself. Tests use shorter intervals (20–30) to
%% exercise rate-check fires deterministically without
%% second-scale waits; ops can tune for chattier observability.
rate_check_interval => pos_integer(),
body_buffering => auto | manual,
slot_reconciliation => disabled | #{interval := pos_integer()},
%% Opt out of the per-conn `pg` drain group. Default `true`
%% (current behavior). Set to `false` for short-lived h1-only
%% workloads (REST APIs, health-check probes, CLI clients) where
%% conns finish on their own faster than any drain notification
%% could fire. Trades graceful drain notification for ~10% lower
%% per-conn overhead. Long-lived conns (loop handlers, SSE,
%% WebSocket) still rely on this — keep `true` if your handlers
%% have those.
graceful_drain => boolean(),
%% When set, the per-connection process auto-hibernates after
%% `Ms` milliseconds of idle main-loop time. Most useful for
%% long-lived keep-alive HTTP/1.1 connections that mostly sit
%% idle between requests — drops process heap to ~1KB during
%% the wait. Setting this routes `roadrunner_conn_loop`'s recv
%% through the active-mode `recv_with_hibernate/3` path so the
%% receive's `after` clause has a window to call
%% `erlang:hibernate/3`.
hibernate_after => pos_integer(),
%% Protocols this listener accepts. Each entry is either a bare
%% atom (`http1` / `http2`) or a `{Proto, Opts}` tuple carrying
%% protocol-specific tuning. Bare atom means "default opts".
%% Default `[http1]`.
%%
%% On TLS listeners the list drives `alpn_preferred_protocols`
%% automatically (`http1` → `~"http/1.1"`, `http2` → `~"h2"`).
%% An explicit `alpn_preferred_protocols` inside `tls` overrides
%% the derivation.
%%
%% On plain TCP, `[http1]` serves HTTP/1.1 only; `[http2]` serves
%% h2c prior-knowledge (client sends the h2 connection preface
%% `PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n` directly, no Upgrade
%% negotiation). `[http1, http2]` on plain TCP is rejected at
%% `init/1` — roadrunner has no `Upgrade: h2c` implementation,
%% so the two cannot share a plaintext port.
%%
%% HTTP/1 currently has no tunables; its opts map must be empty
%% (room reserved for future additions without an API break).
%%
%% HTTP/2 tunables live under the `http2` tuple's opts map:
%%
%% - `conn_window` — connection-level receive window peak (bytes,
%% `1..2^31-1`). RFC 9113 default `65535`; values above the RFC
%% default emit an early `WINDOW_UPDATE(0, peak - 65535)` after
%% the server SETTINGS. Useful for upload-heavy workloads on
%% non-LAN RTTs. Reference points: gun 8 MB, Go net/http2 1 GB,
%% h2o 16 MB+, Mint 16 MB. Worst-case memory is
%% `max_clients × peak`.
%% - `stream_window` — stream-level receive window peak (bytes,
%% `1..2^31-1`). Advertised via `SETTINGS_INITIAL_WINDOW_SIZE`.
%% Default `65535`. Setting above `conn_window` is allowed but
%% not useful — the conn-level peak is the binding constraint.
%% - `window_refill_threshold` — refill trigger (bytes,
%% `pos_integer`). When the remaining window drops below this,
%% the conn refills back to the peak. Lower threshold = fewer
%% `WINDOW_UPDATE` frames per byte consumed but a smaller live
%% window between refills. Default `32768`.
%%
%% Empty list, unknown protocol atoms, duplicate entries, bad
%% tuple shape, unknown sub-option keys, or out-of-range sub-
%% option values are rejected at `init/1`. See
%% `docs/roadmap.md` "h2 receive-window defaults" for the
%% trade-off behind keeping the conservative RFC defaults.
protocols => [protocol_entry(), ...],
tls => [ssl:tls_server_option()]
}.
-doc """
One protocol entry in the listener's `protocols` list. Either a
bare atom (`http1` / `http2`) for default opts, or a tuple
`{Proto, ProtoOpts}` carrying protocol-specific tuning. HTTP/1
currently has no tunables (its opts map must be empty); HTTP/2
tunables live under `t:http2_opts/0`.
On TLS the list drives `alpn_preferred_protocols`. On plain TCP,
`[http2]` means prior-knowledge h2c (client sends the h2 preface
directly); `[http1, http2]` on plain TCP is rejected at `init/1`
since there's no `Upgrade: h2c` implementation.
""".
-type protocol_entry() :: http1 | http2 | {http1, #{}} | {http2, http2_opts()}.
-doc """
HTTP/2 listener tunables (under `{http2, ThisMap}` in `protocols`).
- `conn_window` — connection-level receive window peak in bytes
(`1..2^31-1`). RFC 9113 default `65535`; values above the
default emit an early `WINDOW_UPDATE(0, peak - 65535)` after
the server SETTINGS. Worst-case memory is
`max_clients × peak`.
- `stream_window` — stream-level receive window peak in bytes
(`1..2^31-1`). Advertised via `SETTINGS_INITIAL_WINDOW_SIZE`.
Default `65535`. Setting above `conn_window` is allowed but
not useful — the conn-level peak is the binding constraint.
- `window_refill_threshold` — refill trigger in bytes. When the
remaining window drops below this, the conn refills back to
the peak. Lower threshold = fewer `WINDOW_UPDATE` frames per
byte consumed but a smaller live window between refills.
Default `32768`.
""".
-type http2_opts() :: #{
conn_window => 1..16#7FFFFFFF,
stream_window => 1..16#7FFFFFFF,
window_refill_threshold => 1..16#7FFFFFFF
}.
-record(state, {
listen_socket :: roadrunner_transport:socket() | closed,
port :: inet:port_number(),
proto_opts :: roadrunner_conn:proto_opts(),
phase = accepting :: accepting | draining | stopped,
%% Slot reconciliation (off by default). When enabled, a periodic
%% timer compares `client_counter` against pg group membership and
%% releases slots that have been orphaned by `kill`-style exits
%% (which bypass `terminate/3`). `prev_diff` tracks the previous
%% tick's diff to filter out spawn-time races (a freshly-started
%% conn has bumped the counter but not yet pg:join'd) — only
%% sustained diffs are reaped.
reconciliation = disabled ::
disabled
| #{
interval := pos_integer(),
prev_diff := non_neg_integer()
}
}).
-doc """
Start a named listener that binds the given TCP port.
`port => 0` lets the kernel choose an ephemeral port — query it back
with `port/1`.
""".
-spec start_link(Name :: atom(), opts()) -> {ok, pid()} | {error, term()}.
start_link(Name, Opts) when is_atom(Name), is_map(Opts) ->
gen_server:start_link({local, Name}, ?MODULE, Opts, []).
-doc "Stop a listener and release its port. In-flight conns are not waited on.".
-spec stop(Name :: atom()) -> ok.
stop(Name) ->
gen_server:stop(Name).
-doc """
Graceful shutdown. Closes the listen socket immediately so no new
connections are accepted, broadcasts `{roadrunner_drain, Deadline}` to
every active conn (so `{loop, ...}` handlers can opt to honor it),
and then polls the live-connection counter until it hits zero or
`Timeout` milliseconds elapse. Conns still alive at the deadline are
hard-killed via `exit(Pid, shutdown)`.
Returns `{ok, drained}` when the counter reached zero before the
deadline, or `{timeout, Remaining}` with the count that was still
alive when the timeout fired (those processes are torn down before
returning).
After `drain/2` returns the listener exits — call `start_link/2`
again to bring it back up.
""".
-spec drain(Name :: atom(), Timeout :: non_neg_integer()) ->
{ok, drained} | {timeout, non_neg_integer()}.
drain(Name, Timeout) ->
gen_server:call(Name, {drain, Timeout}, Timeout + 5000).
-doc "Return the actual TCP port the listener is bound to.".
-spec port(Name :: atom()) -> inet:port_number().
port(Name) ->
gen_server:call(Name, port).
-doc """
Return runtime introspection for a listener:
- `active_clients` — current number of connections held open.
- `max_clients` — the configured cap.
- `requests_served` — cumulative count of requests whose headers
parsed successfully since the listener started. Includes 4xx
responses from the router (404) and the body-size pre-check (413);
excludes wire-level parse failures, idle keep-alive timeouts, and
silent slow-client closes.
Useful for ops dashboards / health endpoints.
""".
-spec info(Name :: atom()) ->
#{
active_clients := non_neg_integer(),
max_clients := pos_integer(),
requests_served := non_neg_integer()
}.
info(Name) ->
gen_server:call(Name, info).
-doc """
Return the listener's lifecycle phase:
- `accepting` — normal serving; new connections are being accepted.
- `draining` — `drain/2` is in progress; the listen socket is
closed and active conns are finishing.
After `drain/2` (or `stop/1`) returns the listener has exited and
this call would fail with a `noproc`.
""".
-spec status(Name :: atom()) -> accepting | draining.
status(Name) ->
gen_server:call(Name, status).
-doc """
Atomically swap the listener's compiled route table without
restarting it. The new `Routes` are compiled via
`roadrunner_router:compile/2` (with the listener's `middlewares`
re-baked) and published to `persistent_term`;
in-flight conns keep using whatever they read at request-resolve
time, but every subsequent dispatch sees the new table.
Returns `ok` on success or `{error, no_routes}` if the listener was
started in single-handler mode (`routes => Module` or no `routes`
opt) — there's no router table to reload.
""".
-spec reload_routes(Name :: atom(), roadrunner_router:routes()) ->
ok | {error, no_routes}.
reload_routes(Name, Routes) ->
gen_server:call(Name, {reload_routes, Routes}).
%% --- gen_server callbacks ---
-doc false.
-spec init(opts()) -> {ok, #state{}} | {stop, term()}.
init(#{port := Port} = Opts) ->
ListenerName = listener_name(),
publish_routes(ListenerName, Opts),
ProtoOpts = build_proto_opts(Opts, ListenerName),
proc_lib:set_label({roadrunner_listener, ListenerName, Port}),
Protocols = maps:get(protocols, ProtoOpts),
case open_listen_socket(Port, Opts, Protocols) of
{ok, LSocket} ->
{ok, BoundPort} = roadrunner_transport:port(LSocket),
NumAcceptors = maps:get(num_acceptors, Opts, ?DEFAULT_NUM_ACCEPTORS),
ok = spawn_acceptors(LSocket, ProtoOpts, NumAcceptors),
Reconciliation = setup_reconciliation(Opts),
{ok, #state{
listen_socket = LSocket,
port = BoundPort,
proto_opts = ProtoOpts,
reconciliation = Reconciliation
}};
{error, Reason} ->
{stop, {listen_failed, Reason}}
end.
-spec setup_reconciliation(opts()) ->
disabled | #{interval := pos_integer(), prev_diff := non_neg_integer()}.
setup_reconciliation(#{slot_reconciliation := #{interval := IntervalMs}}) when
is_integer(IntervalMs), IntervalMs > 0
->
erlang:send_after(IntervalMs, self(), reconcile_slots),
#{interval => IntervalMs, prev_diff => 0};
setup_reconciliation(_Opts) ->
disabled.
%% Compile + publish to `persistent_term` once, at listener start. The
%% conn reads via `persistent_term:get/1` on every request, so the
%% lookup is O(1) and the table is shared across all conns of this
%% listener without copying.
-spec publish_routes(atom(), opts()) -> ok.
publish_routes(ListenerName, #{routes := Routes} = Opts) when is_list(Routes) ->
ListenerMws = maps:get(middlewares, Opts, []),
persistent_term:put(
{roadrunner_routes, ListenerName},
roadrunner_router:compile(Routes, ListenerMws)
);
publish_routes(_ListenerName, _Opts) ->
ok.
%% Recover the registered name we were started with. `start_link/2` always
%% calls `gen_server:start_link({local, Name}, ...)` so the name is set
%% before `init/1` runs.
-spec listener_name() -> atom().
listener_name() ->
{registered_name, Name} = process_info(self(), registered_name),
Name.
-spec open_listen_socket(
inet:port_number(), opts(), [http1 | http2, ...]
) -> {ok, roadrunner_transport:socket()} | {error, term()}.
open_listen_socket(Port, #{tls := UserTlsOpts}, Protocols) ->
%% TLS path — caller supplies cert/key. ALPN is derived from the
%% normalized `protocols` list (`http2` → `~"h2"`, `http1` →
%% `~"http/1.1"`) unless the user supplied
%% `alpn_preferred_protocols` explicitly, in which case the
%% explicit value wins. Hardened defaults are layered underneath
%% (user values win), and the standard transport options sit on
%% top so accepted sockets behave like the plain-TCP variant.
TlsOpts = roadrunner_transport:build_tls_opts(Protocols, UserTlsOpts),
roadrunner_transport:listen_tls(Port, TlsOpts ++ base_listen_opts());
open_listen_socket(Port, _Opts, _Protocols) ->
%% Plain TCP. The legacy `inet_drv` backend (gen_tcp default) has
%% lower per-call overhead than the OTP-27 `socket` backend on
%% short-lived connections. fprof on `connection_storm` shows the
%% `socket` backend's `prim_socket:is_supported_option` + the
%% `maps:fold_1` walking it costs ~46% of per-conn own time
%% (~106 lookups per connection). See
%% `docs/conn_lifecycle_investigation.md`. The new backend's
%% async I/O wins are real for long-lived connections; revisit
%% if/when the workload mix shifts there.
roadrunner_transport:listen(Port, base_listen_opts()).
-spec base_listen_opts() -> [gen_tcp:listen_option()].
base_listen_opts() ->
%% `nodelay` disables Nagle's algorithm on accepted sockets.
%% RFC 9113 §5.2 doesn't mandate it, but every production h2
%% server (nginx, h2o, cowboy, …) sets it because h2 responses
%% emit multiple small frames per request (HEADERS + DATA),
%% and Nagle holds the second write until the client ACKs the
%% first — hitting Linux's 40 ms delayed-ACK timer and
%% capping per-request latency at ~50 ms. h1 isn't affected
%% (one `ssl:send/2` per response) but `nodelay` is the right
%% default for any HTTP server. See
%% `docs/h2_loadgen_artifact.md` for the original investigation.
%%
%% `backlog` overrides OTP's default of 5. With 5, a burst of
%% concurrent connects (real apps, load tests, health-check
%% storms) overflows the kernel listen queue and the new SYNs
%% get dropped — `gen_tcp:connect` succeeds (kernel SYN-cookie
%% path), then the first `send` returns `{error, closed}`
%% because the conn was never queued for `accept`. Cowboy
%% defaults to 1024; matching that. Linux clamps at
%% `net.core.somaxconn` (typically 4096), so this is safely
%% non-truncated everywhere.
%%
%% `buffer` is the emulator's user-space buffer that bounds how
%% many bytes each `{tcp, _, Data}` message carries in
%% `{active, ...}` mode. The OTP default is `min(sndbuf,
%% recbuf)` and on plain TCP with MTU-bounded delivery
%% (1460-byte chunks) this can result in many small messages
%% per request body, each paying the message-passing tax. 64 KB
%% is enough to carry 4 default-sized HTTP/2 DATA frames or a
%% typical request, comfortably above the per-MTU floor without
%% wasting memory at scale (`max_clients × 64KB` ≈ 10 MB at the
%% default `max_clients = 150`). See `erlang/otp#9423` and
%% `ninenines/cowlib#143` for the upstream context that prompted
%% this tuning.
[
binary,
{active, false},
{reuseaddr, true},
{packet, raw},
{nodelay, true},
{backlog, 1024},
{buffer, 65536}
].
%% Multiple acceptor processes all calling gen_tcp:accept on the same listen
%% socket — Linux/BSD accept is thread-safe and avoids thundering-herd via
%% kernel-side queueing.
-spec spawn_acceptors(roadrunner_transport:socket(), roadrunner_conn:proto_opts(), pos_integer()) ->
ok.
spawn_acceptors(LSocket, ProtoOpts, N) when is_integer(N), N >= 0 ->
spawn_acceptors_loop(LSocket, ProtoOpts, 1, N).
-spec spawn_acceptors_loop(
roadrunner_transport:socket(), roadrunner_conn:proto_opts(), pos_integer(), non_neg_integer()
) -> ok.
spawn_acceptors_loop(_LSocket, _ProtoOpts, I, N) when I > N ->
ok;
spawn_acceptors_loop(LSocket, ProtoOpts, I, N) ->
{ok, _Pid} = roadrunner_acceptor:start_link(LSocket, ProtoOpts, I),
spawn_acceptors_loop(LSocket, ProtoOpts, I + 1, N).
-spec build_proto_opts(opts(), atom()) -> roadrunner_conn:proto_opts().
build_proto_opts(Opts, ListenerName) ->
%% Validate + normalize the `protocols` list. Public input may
%% nest `{http2, #{...}}` for protocol-specific tuning; the
%% normalizer flattens HTTP/2 sub-opts onto proto_opts top-level
%% with an `http2_` prefix so the hot path reads each knob via
%% a single `maps:get/2` instead of a nested map dive.
{Protocols, ProtoFlats} = normalize_protocols(Opts),
%% Per-listener atomics: live-connection counter (acceptors bump on
%% accept; conns decrement on exit) and a cumulative requests-served
%% counter (conn bumps on each handler dispatch). Lock-free, ~1ns
%% per op — cheap enough on the hot path.
ClientCounter = atomics:new(1, [{signed, false}]),
RequestsCounter = atomics:new(1, [{signed, false}]),
Base = maps:merge(
#{
dispatch => build_dispatch(Opts, ListenerName),
middlewares => maps:get(middlewares, Opts, []),
max_content_length =>
maps:get(max_content_length, Opts, ?DEFAULT_MAX_CONTENT_LENGTH),
request_timeout => maps:get(request_timeout, Opts, ?DEFAULT_REQUEST_TIMEOUT),
keep_alive_timeout =>
maps:get(keep_alive_timeout, Opts, ?DEFAULT_KEEP_ALIVE_TIMEOUT),
max_keep_alive_requests =>
maps:get(max_keep_alive_requests, Opts, ?DEFAULT_MAX_KEEP_ALIVE),
max_clients => maps:get(max_clients, Opts, ?DEFAULT_MAX_CLIENTS),
client_counter => ClientCounter,
requests_counter => RequestsCounter,
min_bytes_per_second =>
maps:get(min_bytes_per_second, Opts, ?DEFAULT_MIN_BYTES_PER_SECOND),
body_buffering => maps:get(body_buffering, Opts, auto),
listener_name => ListenerName,
graceful_drain => maps:get(graceful_drain, Opts, true),
protocols => Protocols
},
ProtoFlats
),
WithHibernate =
%% Optional `hibernate_after` — `roadrunner_conn_loop` reads it
%% from proto_opts and routes the recv path through
%% `recv_with_hibernate/3` so the conn auto-hibernates after
%% Ms of idle time. Omitted by default because hibernation has
%% a per-wake CPU cost (~tens of microseconds for the GC); only
%% worth enabling for workloads with mostly-idle keep-alive
%% conns where the heap-shrink win dominates.
case Opts of
#{hibernate_after := Ms} when is_integer(Ms), Ms > 0 ->
Base#{hibernate_after => Ms};
#{} ->
Base
end,
%% Optional `rate_check_interval` — the rate-check timer
%% interval inside `reading_request`. Default 1000ms; ops can
%% override.
case Opts of
#{rate_check_interval := IntervalMs} when is_integer(IntervalMs), IntervalMs > 0 ->
WithHibernate#{rate_check_interval => IntervalMs};
#{} ->
WithHibernate
end.
%% `routes` is the unified dispatch option. Single-handler forms
%% (bare atom, `{Mod, State}` tuple, or `#{handler := Mod, ...}` map)
%% all compile to a `{handler, Mod, Pipeline, State}` dispatch tag
%% where `Pipeline` is the pre-composed `next()` fun (listener mws
%% ++ per-handler mws, optionally wrapped in a state-injecting
%% closure, ending in `fun Mod:handle/1`) and `State` is the user's
%% attached state (or `undefined`) exposed alongside for callers
%% that need to introspect outside the request flow. A list of route
%% entries uses the router. When `routes` is omitted, fall back to
%% the default hello-world handler. List-form routes are published
%% to `persistent_term` by `publish_routes/2` — the dispatch tag
%% carries the listener name so the conn can look the table up.
-spec build_dispatch(opts(), atom()) -> roadrunner_conn:dispatch().
%% Validate + normalize the `protocols` opt. Returns a 2-tuple:
%%
%% - `Protocols :: [http1 | http2, ...]` — the enabled protocols as a
%% flat atom list in user-supplied order (ALPN preference).
%% - `ProtoFlats :: #{atom() => term()}` — HTTP/2 sub-opts flattened
%% with `http2_` prefix (`http2_conn_window`, `http2_stream_window`,
%% `http2_window_refill_threshold`), defaults filled. Empty map
%% when http2 is not in the list. Flat keys keep the hot path to
%% one `maps:get/2` per knob; no nested map dives.
%%
%% Error shapes follow the existing `{invalid_listener_opt, K, V}` /
%% `{listener_opt_conflict, ...}` convention:
%%
%% - `{invalid_listener_opt, protocols, V}` for a bad list shape:
%% empty list, non-list, unknown atom, malformed tuple, unknown
%% sub-option key, out-of-range sub-option value, or duplicate
%% entries.
%% - `{listener_opt_conflict, protocols, V, no_h2c_upgrade}` for the
%% one combo we have to reject at config time: both `http1` and
%% `http2` on a plain-TCP listener. Roadrunner has no
%% `Upgrade: h2c` implementation, so the two cannot share a
%% plaintext port; the reason token spells that out so the error
%% message is honest.
-spec normalize_protocols(opts()) -> {[http1 | http2, ...], #{atom() => term()}}.
normalize_protocols(Opts) ->
Raw = maps:get(protocols, Opts, [http1]),
HasTls = maps:is_key(tls, Opts),
Entries = normalize_protocols_list(Raw),
Names = [N || {N, _} <- Entries],
case Names of
[http1, http2] when not HasTls ->
error({listener_opt_conflict, protocols, Raw, no_h2c_upgrade});
[http2, http1] when not HasTls ->
error({listener_opt_conflict, protocols, Raw, no_h2c_upgrade});
_ ->
{Names, flatten_http2_opts(Entries)}
end.
-type protocol_entry_norm() :: {http1, #{}} | {http2, http2_opts()}.
-spec normalize_protocols_list(term()) -> [protocol_entry_norm(), ...].
normalize_protocols_list(L) when is_list(L), L =/= [] ->
Entries = [normalize_protocol_entry(E, L) || E <- L],
Names = [N || {N, _} <- Entries],
case length(lists:usort(Names)) =:= length(Names) of
true -> Entries;
false -> error({invalid_listener_opt, protocols, L})
end;
normalize_protocols_list(L) ->
error({invalid_listener_opt, protocols, L}).
-spec normalize_protocol_entry(term(), term()) -> protocol_entry_norm().
normalize_protocol_entry(http1, _Raw) ->
{http1, #{}};
normalize_protocol_entry(http2, _Raw) ->
{http2, http2_defaults()};
normalize_protocol_entry({http1, Opts}, _Raw) when is_map(Opts), map_size(Opts) =:= 0 ->
{http1, #{}};
normalize_protocol_entry({http2, Opts}, Raw) when is_map(Opts) ->
{http2, validate_http2_opts(Opts, Raw)};
normalize_protocol_entry(_, Raw) ->
error({invalid_listener_opt, protocols, Raw}).
-spec http2_defaults() -> http2_opts().
http2_defaults() ->
#{conn_window => 65535, stream_window => 65535, window_refill_threshold => 32768}.
-spec validate_http2_opts(map(), term()) -> http2_opts().
validate_http2_opts(Opts, Raw) ->
Defaults = http2_defaults(),
maps:fold(
fun(K, V, Acc) ->
case is_map_key(K, Defaults) of
false -> error({invalid_listener_opt, protocols, Raw});
true when is_integer(V, 1, 16#7FFFFFFF) -> Acc#{K => V};
true -> error({invalid_listener_opt, protocols, Raw})
end
end,
Defaults,
Opts
).
%% Flatten the http2 sub-opts onto proto_opts top-level with an
%% `http2_` prefix so the hot path reads each knob with a single
%% `maps:get/2`. Returns an empty map when http2 isn't in the list.
-spec flatten_http2_opts([protocol_entry_norm(), ...]) -> #{atom() => term()}.
flatten_http2_opts(Entries) ->
case lists:keyfind(http2, 1, Entries) of
false ->
#{};
{http2, #{
conn_window := Conn,
stream_window := Stream,
window_refill_threshold := Threshold
}} ->
#{
http2_conn_window => Conn,
http2_stream_window => Stream,
http2_window_refill_threshold => Threshold
}
end.
build_dispatch(#{routes := Module} = Opts, _ListenerName) when is_atom(Module) ->
bake_dispatch(Module, Opts, [], no_state);
build_dispatch(#{routes := {Module, State}} = Opts, _ListenerName) when is_atom(Module) ->
bake_dispatch(Module, Opts, [], {state, State});
build_dispatch(#{routes := #{handler := Module} = Route} = Opts, _ListenerName) when
is_atom(Module)
->
HandlerMws = maps:get(middlewares, Route, []),
StateArg =
case Route of
#{state := S} -> {state, S};
_ -> no_state
end,
bake_dispatch(Module, Opts, HandlerMws, StateArg);
build_dispatch(#{routes := Routes}, ListenerName) when is_list(Routes) ->
{router, ListenerName};
build_dispatch(Opts, _ListenerName) ->
bake_dispatch(roadrunner_default_handler, Opts, [], no_state).
%% Single-handler dispatch counterpart of the router's compile path.
%% Defers to `roadrunner_middleware:compile_pipeline/3` after combining
%% the listener-wide and per-handler mws lists. State is exposed
%% alongside the pipeline so `roadrunner_router:match/2`-shaped
%% callers (and other introspection paths) can read it without
%% running the pipeline.
-spec bake_dispatch(
module(),
opts(),
roadrunner_middleware:middleware_list(),
no_state | {state, term()}
) -> roadrunner_conn:dispatch().
bake_dispatch(Handler, Opts, HandlerMws, StateArg) ->
ListenerMws = maps:get(middlewares, Opts, []),
Pipeline = roadrunner_middleware:compile_pipeline(
ListenerMws ++ HandlerMws, Handler, StateArg
),
State =
case StateArg of
no_state -> undefined;
{state, S} -> S
end,
{handler, Handler, Pipeline, State}.
-doc false.
-spec handle_call(
port
| info
| status
| {drain, non_neg_integer()}
| {reload_routes, roadrunner_router:routes()},
gen_server:from(),
#state{}
) -> {reply, term(), #state{}} | {stop, normal, term(), #state{}}.
handle_call(port, _From, #state{port = Port} = State) ->
{reply, Port, State};
handle_call(status, _From, #state{phase = Phase} = State) ->
{reply, Phase, State};
handle_call({drain, Timeout}, _From, State) ->
{Reply, NewState} = do_drain(State, Timeout),
{stop, normal, Reply, NewState};
handle_call({reload_routes, Routes}, _From, State) ->
Reply = do_reload_routes(State, Routes),
{reply, Reply, State};
handle_call(info, _From, #state{proto_opts = ProtoOpts} = State) ->
#{
client_counter := ClientCounter,
requests_counter := RequestsCounter,
max_clients := MaxClients
} = ProtoOpts,
Reply = #{
active_clients => atomics:get(ClientCounter, 1),
max_clients => MaxClients,
requests_served => atomics:get(RequestsCounter, 1)
},
{reply, Reply, State}.
-spec do_reload_routes(#state{}, roadrunner_router:routes()) ->
ok | {error, no_routes}.
do_reload_routes(
#state{proto_opts = #{dispatch := {router, Name}, middlewares := ListenerMws}},
Routes
) ->
persistent_term:put(
{roadrunner_routes, Name},
roadrunner_router:compile(Routes, ListenerMws)
),
ok;
do_reload_routes(#state{proto_opts = #{dispatch := {handler, _, _, _}}}, _Routes) ->
{error, no_routes}.
-spec do_drain(#state{}, non_neg_integer()) ->
{{ok, drained} | {timeout, non_neg_integer()}, #state{}}.
do_drain(#state{listen_socket = LSocket, proto_opts = ProtoOpts} = State, Timeout) ->
%% Close listen socket — accept fails, acceptors exit cleanly.
ok = roadrunner_transport:close(LSocket),
Deadline = erlang:monotonic_time(millisecond) + Timeout,
Group = drain_group(ProtoOpts),
notify_conns(Group, Deadline),
DrainingState = State#state{listen_socket = closed, phase = draining},
Counter = maps:get(client_counter, ProtoOpts),
Reply = wait_for_drain(Counter, Deadline, Group),
{Reply, DrainingState#state{phase = stopped}}.
%% Best-effort broadcast to in-flight conns. Loop / SSE / WebSocket
%% handlers can pattern-match on `{roadrunner_drain, Deadline}` in
%% `handle_info/3`; non-loop conns ignore the message and fall through
%% to the mailbox check at the next keep-alive boundary.
-spec notify_conns(term(), integer()) -> ok.
notify_conns(Group, Deadline) ->
_ = [Pid ! {roadrunner_drain, Deadline} || Pid <- pg:get_members(Group)],
ok.
%% Poll the active-clients atomics counter every 50ms (or whatever
%% remains, if smaller) until it hits zero or the deadline expires.
-spec wait_for_drain(atomics:atomics_ref(), integer(), term()) ->
{ok, drained} | {timeout, non_neg_integer()}.
wait_for_drain(Counter, Deadline, Group) ->
case atomics:get(Counter, 1) of
0 ->
{ok, drained};
N ->
Remaining = Deadline - erlang:monotonic_time(millisecond),
case Remaining =< 0 of
true ->
_ = [exit(Pid, shutdown) || Pid <- pg:get_members(Group)],
{timeout, N};
false ->
timer:sleep(min(50, Remaining)),
wait_for_drain(Counter, Deadline, Group)
end
end.
-spec drain_group(roadrunner_conn:proto_opts()) -> {roadrunner_drain, atom()}.
drain_group(#{listener_name := Name}) ->
{roadrunner_drain, Name}.
-doc false.
-spec handle_cast(term(), #state{}) -> {noreply, #state{}}.
handle_cast(_Msg, State) ->
{noreply, State}.
-doc false.
-spec handle_info(term(), #state{}) -> {noreply, #state{}}.
handle_info(reconcile_slots, #state{reconciliation = disabled} = State) ->
%% Race: a `slot_reconciliation` opt change between scheduling and
%% receipt would surface as the timer firing in disabled state.
%% Just drop it; the new config is the source of truth.
{noreply, State};
handle_info(
reconcile_slots,
#state{
proto_opts = #{client_counter := Counter, listener_name := Name},
reconciliation = #{interval := Interval, prev_diff := PrevDiff}
} = State
) ->
%% pg is supervised by roadrunner_sup so it's always up when the
%% reconciler runs (which only fires when explicitly opted into).
%%
%% We avoid `length/1` on the member list because `max_clients`
%% can be configured into the tens of thousands and a full O(N)
%% length walk would dominate the tick. We only need to know
%% whether `length(members) >= counter` (no orphans) or
%% `length(members) < counter` (orphans = counter - length); a
%% bounded count short-circuits at the counter so the worst case
%% is `min(length(members), counter)` element visits.
Counter0 = atomics:get(Counter, 1),
PgCountBounded = count_up_to(pg:get_members({roadrunner_drain, Name}), Counter0),
NewDiff = Counter0 - PgCountBounded,
%% Only release slots that have been orphaned for two consecutive
%% ticks — filters out the spawn-time race where a fresh conn has
%% incremented the counter but hasn't yet pg:join'd.
Release = min(PrevDiff, NewDiff),
case Release of
0 ->
ok;
N when N > 0 ->
_ = atomics:sub(Counter, 1, N),
logger:notice(#{
msg => "roadrunner_listener reconciled orphan slots",
listener_name => Name,
released => N,
counter_was => Counter0,
pg_count_bounded => PgCountBounded
}),
ok = roadrunner_telemetry:slots_reconciled(#{
listener_name => Name,
released => N,
counter_was => Counter0
})
end,
erlang:send_after(Interval, self(), reconcile_slots),
{noreply, State#state{
reconciliation = #{interval => Interval, prev_diff => NewDiff - Release}
}};
handle_info(_Msg, State) ->
{noreply, State}.
%% Count list elements, short-circuiting at `Cap`. Used by the slot
%% reconciler so the worst-case walk per tick is bounded by the
%% `client_counter` (i.e. `max_clients`) rather than the absolute
%% size of the pg member list.
-spec count_up_to([term()], non_neg_integer()) -> non_neg_integer().
count_up_to(List, Cap) ->
count_up_to(List, Cap, 0).
-spec count_up_to([term()], non_neg_integer(), non_neg_integer()) -> non_neg_integer().
count_up_to(_, Cap, N) when N >= Cap -> Cap;
count_up_to([], _Cap, N) -> N;
count_up_to([_ | T], Cap, N) -> count_up_to(T, Cap, N + 1).
-doc false.
-spec terminate(term(), #state{}) -> ok.
terminate(_Reason, #state{listen_socket = LSocket, proto_opts = ProtoOpts}) ->
erase_routes(ProtoOpts),
case LSocket of
%% `drain/2` already closed the listen socket on its way out.
closed -> ok;
_ -> roadrunner_transport:close(LSocket)
end.
-spec erase_routes(roadrunner_conn:proto_opts()) -> ok.
erase_routes(#{dispatch := {router, Name}}) ->
_ = persistent_term:erase({roadrunner_routes, Name}),
ok;
erase_routes(_) ->
ok.