Skip to main content

src/roadrunner_sse.erl

-module(roadrunner_sse).
-moduledoc """
Server-Sent Events (SSE) encoding helpers.

Pairs naturally with the `{loop, ...}` handler return: subscribe to a
pubsub topic in `handle/1`, then call `roadrunner_sse:event/1,2,3` from
`handle_info/3` to format each event before emitting it via the
`Push` fun.

Example handler:

```erlang
-behaviour(roadrunner_handler).
-export([handle/1, handle_info/3]).

handle(Req) ->
    ok = my_pubsub:subscribe(self(), notifications),
    Headers = [
        {~"content-type", ~"text/event-stream"},
        {~"cache-control", ~"no-cache"}
    ],
    {{loop, 200, Headers, undefined}, Req}.

handle_info({notification, Body}, Push, State) ->
    _ = Push(roadrunner_sse:event(~"notify", Body)),
    {ok, State};
handle_info(close, Push, State) ->
    _ = Push(roadrunner_sse:comment(~"bye")),
    {stop, State}.
```

Per the SSE spec ([WHATWG HTML §9.2](https://html.spec.whatwg.org/multipage/server-sent-events.html#parsing-an-event-stream)):

- Each event ends with a blank line (`\n\n`).
- `data:` may repeat — newlines in the payload split it into multiple
  `data:` lines, all reassembled into one event by the client.
- `event:` is optional; an unnamed event dispatches as the generic
  `message` event in the browser.
- `id:` lets the client resume from the last event seen.
- `retry: N` tells the client to wait N milliseconds before
  reconnecting.
- A line starting with `:` is a comment — useful for keep-alives over
  proxies that close idle connections.
""".

-on_load(init_patterns/0).

-define(LF_CP_KEY, {?MODULE, lf_cp}).
-define(LINE_BREAK_CP_KEY, {?MODULE, line_break_cp}).

-export([
    event/1,
    event/2,
    event/3,
    comment/1,
    retry/1
]).

-doc "Anonymous event with just a data payload.".
-spec event(Data :: binary()) -> iodata().
event(Data) when is_binary(Data) ->
    [data_lines(Data), $\n].

-doc "Named event with a data payload.".
-spec event(EventName :: binary(), Data :: binary()) -> iodata().
event(Name, Data) when is_binary(Name), is_binary(Data) ->
    ok = check_single_line(Name, event_name),
    [~"event: ", Name, $\n, data_lines(Data), $\n].

-doc """
Named event with an id (for client-side reconnection resume) and a
data payload.
""".
-spec event(EventName :: binary(), Data :: binary(), Id :: binary()) -> iodata().
event(Name, Data, Id) when is_binary(Name), is_binary(Data), is_binary(Id) ->
    ok = check_single_line(Name, event_name),
    ok = check_single_line(Id, event_id),
    [~"event: ", Name, $\n, ~"id: ", Id, $\n, data_lines(Data), $\n].

-doc """
SSE comment line — invisible to the client's event handler but useful
for keep-alives over proxies.
""".
-spec comment(Text :: binary()) -> iodata().
comment(Text) when is_binary(Text) ->
    ok = check_single_line(Text, comment),
    [~": ", Text, ~"\n\n"].

%% SSE field values (event name, id, comment text) MUST NOT contain
%% line separators — `\r` or `\n` would split the value into a second
%% line that's either silently dropped or interpreted as a new field
%% by the client. Crash hard so a programmer bug — usually echoing
%% user input into one of these fields without sanitization — turns
%% into a 500, not a stream-corruption vulnerability. The `data` field
%% is exempt; multi-line `data:` emission is handled by `data_lines/1`.
-spec check_single_line(binary(), atom()) -> ok.
check_single_line(Bin, Field) ->
    case binary:match(Bin, persistent_term:get(?LINE_BREAK_CP_KEY)) of
        nomatch -> ok;
        _ -> error({sse_line_break, Field, Bin})
    end.

-doc """
Tell the client how long (in milliseconds) to wait before retrying
after a connection drop.
""".
-spec retry(Ms :: non_neg_integer()) -> iodata().
retry(Ms) when is_integer(Ms), Ms >= 0 ->
    [~"retry: ", integer_to_binary(Ms), ~"\n\n"].

%% Encode `Data` as one or more `data:` lines (split on each `\n`),
%% each terminated by a single newline. The caller appends the
%% trailing blank line that ends the event.
-spec data_lines(binary()) -> iodata().
data_lines(Data) ->
    [
        [~"data: ", Line, $\n]
     || Line <- binary:split(Data, persistent_term:get(?LF_CP_KEY), [global])
    ].

%% `-on_load` callback. See `feedback_compile_pattern_convention`.
-spec init_patterns() -> ok.
init_patterns() ->
    persistent_term:put(?LF_CP_KEY, binary:compile_pattern(~"\n")),
    persistent_term:put(?LINE_BREAK_CP_KEY, binary:compile_pattern([~"\r", ~"\n"])),
    ok.