defmodule ALLM.Providers.Support.SSE do
  @moduledoc """
  Stateless, line-buffered Server-Sent Events (SSE) decoder.

  Reused by all SSE-streaming providers (currently `ALLM.Providers.OpenAI`;
  Phase 11's `ALLM.Providers.Anthropic` consumes it unchanged). Provider-specific
  interpretation of `data:` payloads happens in each adapter's chunk-to-event
  mapper; this module only parses the SSE wire format per the
  [WHATWG Server-Sent Events spec](https://html.spec.whatwg.org/multipage/server-sent-events.html).

  ## Carve-out: the `:done` sentinel is OpenAI-specific

  OpenAI's Chat Completions streaming terminates with a `data: [DONE]\\n\\n`
  marker that has no native SSE meaning. To keep adapter chunk-mappers simple,
  this decoder returns `:done` as an in-band sentinel inside the message list
  whenever it parses a `data: [DONE]` event. Anthropic (Phase 11) signals
  termination with a regular `event: message_stop` SSE event, so Anthropic-side
  code must not rely on `:done` for termination or blanket-pattern-match on it;
  treat it as just one possible element of the message list.

  ## Accumulator round-trip

  The accumulator is a plain map (no PIDs / refs / funs) and round-trips
  cleanly through `:erlang.term_to_binary/1`. This matters for
  `Stream.resource/3` callers that thread the accumulator through
  `start_fun → next_fun` state — see spec §7.2 and Invariant 8 in the
  Phase 10 design doc's SSE Behaviour & Type Contracts.

  ## Spec sections

  Spec §7.2 (HTTP/1 streaming guidance).
  """

  @typedoc "Parsed SSE message per https://html.spec.whatwg.org/multipage/server-sent-events.html."
  @type message :: %{
          event: String.t() | nil,
          data: String.t(),
          id: String.t() | nil,
          retry: pos_integer() | nil
        }

  @typedoc """
  Parser accumulator: pending byte buffer + in-progress message fields.

  `buffer` normally holds the bytes of a line that has not been terminated yet
  (it never contains CR or LF). As a special case it is exactly `"\\r"` when
  the previous chunk ended on a carriage return: that CR already terminated its
  line, and the marker lets the next chunk drop a leading LF so a CRLF pair
  split across two chunks is not misread as an extra blank line.
  `partial.data` holds the event's data lines in reverse arrival order.
  """
  @type accumulator :: %{
          buffer: binary(),
          partial: %{
            event: String.t() | nil,
            data: [String.t()],
            id: String.t() | nil,
            retry: pos_integer() | nil
          }
        }

  @typedoc "Sentinel emitted in-band for OpenAI's `[DONE]` terminator (provider-specific carve-out)."
  @type done_marker :: :done

  # In-progress message fields (same shape as `accumulator().partial`).
  @typep partial :: %{
           event: String.t() | nil,
           data: [String.t()],
           id: String.t() | nil,
           retry: pos_integer() | nil
         }

  @empty_partial %{event: nil, data: [], id: nil, retry: nil}

  # The three line terminators allowed by the SSE spec. `:binary.split/3`
  # prefers the longest pattern at a given position, so "\r\n" is consumed as a
  # single terminator rather than as "\r" followed by "\n".
  @line_terminators ["\r\n", "\r", "\n"]

  # Buffer marker: "the previous chunk ended on a bare CR". A real line
  # remnant can never equal this, because CR always terminates a line.
  @pending_cr "\r"

  @doc """
  Returns an empty accumulator. Pure; no IO.

  ## Examples

      iex> ALLM.Providers.Support.SSE.new()
      %{buffer: "", partial: %{event: nil, data: [], id: nil, retry: nil}}
  """
  @spec new() :: accumulator()
  def new, do: %{buffer: "", partial: @empty_partial}

  @doc """
  Decodes one chunk of SSE bytes against the running accumulator.

  Total over `accumulator() × binary()`; never raises on malformed input.
  Comment lines (starting with `:`) are dropped. Per the SSE spec, a line
  without a `:` is treated as a field name with an empty value (so a bare
  `data` line contributes an empty data line). Fields other than `data`,
  `event`, `id` and `retry` are ignored.

  Returns `{messages, new_accumulator}` where `messages` is a list of parsed
  `t:message/0` values plus an in-band `:done` sentinel for OpenAI's
  `data: [DONE]` terminator (see module doc).

  Honors all three line terminators from the SSE spec (LF, CR, CRLF),
  including a CRLF pair whose CR and LF arrive in different chunks.

  ## Examples

      iex> {messages, _acc} =
      ...>   ALLM.Providers.Support.SSE.new()
      ...>   |> ALLM.Providers.Support.SSE.decode_chunk("data: hello\\n\\n")
      iex> messages
      [%{event: nil, data: "hello", id: nil, retry: nil}]

      iex> {[], acc} =
      ...>   ALLM.Providers.Support.SSE.new()
      ...>   |> ALLM.Providers.Support.SSE.decode_chunk("data: a\\r")
      iex> {messages, _acc} =
      ...>   ALLM.Providers.Support.SSE.decode_chunk(acc, "\\ndata: b\\r\\n\\r\\n")
      iex> messages
      [%{event: nil, data: "a\\nb", id: nil, retry: nil}]
  """
  @spec decode_chunk(accumulator(), binary()) :: {[message() | done_marker()], accumulator()}
  # The previous chunk ended on a bare CR that already terminated its line.
  # If this chunk opens with LF, the two bytes were one CRLF terminator, so
  # swallow the LF instead of reading it as an (event-dispatching) blank line.
  # An empty chunk leaves the marker in place for the next call.
  def decode_chunk(%{buffer: @pending_cr} = acc, chunk) when is_binary(chunk) do
    case chunk do
      "" -> {[], acc}
      <<"\n", rest::binary>> -> decode_chunk(%{acc | buffer: ""}, rest)
      _other -> decode_chunk(%{acc | buffer: ""}, chunk)
    end
  end

  def decode_chunk(%{buffer: buffer, partial: partial}, chunk) when is_binary(chunk) do
    {lines, leftover} = split_lines(buffer <> chunk)
    {messages, final_partial} = process_lines(lines, partial, [])
    {messages, %{buffer: leftover, partial: final_partial}}
  end

  # --- Line splitting ----------------------------------------------------

  # Splits the accumulated bytes into completed lines and a trailing remnant.
  # A "completed" line is one terminated by LF, CR, or CRLF. The remnant is the
  # unterminated tail. When the bytes end on a CR, it is the pending-CR marker
  # instead: that CR's line has already been emitted as completed.
  @spec split_lines(binary()) :: {[binary()], binary()}
  defp split_lines(bytes) do
    {lines, [tail]} =
      bytes
      |> :binary.split(@line_terminators, [:global])
      |> Enum.split(-1)

    if String.ends_with?(bytes, "\r"), do: {lines, @pending_cr}, else: {lines, tail}
  end

  # --- Line processing ---------------------------------------------------

  # Walks the completed lines and folds them into messages and a residual
  # partial. An empty line dispatches the partial as a message; a comment
  # line is dropped; any other line is parsed as a field and applied to the
  # partial.
  @spec process_lines([binary()], partial(), [message() | done_marker()]) ::
          {[message() | done_marker()], partial()}
  defp process_lines([], partial, messages), do: {Enum.reverse(messages), partial}

  defp process_lines(["" | rest], partial, messages) do
    case dispatch(partial) do
      :no_message -> process_lines(rest, @empty_partial, messages)
      message -> process_lines(rest, @empty_partial, [message | messages])
    end
  end

  # Comment line: drop.
  defp process_lines([<<":", _comment::binary>> | rest], partial, messages),
    do: process_lines(rest, partial, messages)

  defp process_lines([line | rest], partial, messages) do
    {field, value} = parse_field(line)
    process_lines(rest, apply_field(partial, field, value), messages)
  end

  # --- Field parsing -----------------------------------------------------

  # Splits a line on its first `:`. Per the SSE spec, a single space after the
  # colon is stripped from the value, and a line with no colon is a field whose
  # name is the whole line and whose value is the empty string.
  @spec parse_field(binary()) :: {String.t(), String.t()}
  defp parse_field(line) do
    case :binary.split(line, ":") do
      [field, value] -> {field, strip_leading_space(value)}
      [field] -> {field, ""}
    end
  end

  defp strip_leading_space(<<" ", rest::binary>>), do: rest
  defp strip_leading_space(value), do: value

  # Applies a parsed field to the partial. Unknown fields are dropped per
  # the SSE spec.
  @spec apply_field(partial(), String.t(), String.t()) :: partial()
  defp apply_field(partial, "data", value), do: %{partial | data: [value | partial.data]}
  defp apply_field(partial, "event", value), do: %{partial | event: value}

  # Per the SSE spec, an `id` value containing U+0000 NULL is ignored.
  defp apply_field(partial, "id", value) do
    if String.contains?(value, <<0>>), do: partial, else: %{partial | id: value}
  end

  # Per the SSE spec, `retry` must consist solely of ASCII digits. The
  # leading-digit guard rejects signed forms ("+5") that `Integer.parse/1`
  # would accept. 0 is dropped to honour the `pos_integer()` contract.
  defp apply_field(partial, "retry", <<digit, _::binary>> = value) when digit in ?0..?9 do
    case Integer.parse(value) do
      {ms, ""} when ms > 0 -> %{partial | retry: ms}
      _ -> partial
    end
  end

  defp apply_field(partial, _field, _value), do: partial

  # --- Dispatch ----------------------------------------------------------

  # An empty-line dispatch: build a `t:message/0` (or the `:done` sentinel)
  # from the partial. If no `data` line accumulated, emit nothing.
  @spec dispatch(partial()) :: message() | done_marker() | :no_message
  defp dispatch(%{data: []}), do: :no_message

  defp dispatch(%{data: data_lines, event: event, id: id, retry: retry}) do
    # Data lines were prepended as they arrived; restore order and join with LF.
    data = data_lines |> Enum.reverse() |> Enum.join("\n")

    if data == "[DONE]" do
      :done
    else
      %{event: event, data: data, id: id, retry: retry}
    end
  end
end