Skip to main content

lib/agent_sea/providers/sse.ex

defmodule AgentSea.Providers.SSE do
  @moduledoc """
  Frames a stream of raw HTTP body chunks into Server-Sent Events.

  Pure and lazy: it buffers across chunk boundaries (an event may be split mid-
  way between two network reads) and emits one map per complete event with its
  `:event` name (if any) and concatenated `:data` payload.
  """

  @type event :: %{event: String.t() | nil, data: String.t()}

  @doc "Turn an enumerable of binary chunks into a lazy stream of `t:event/0`."
  @spec events(Enumerable.t()) :: Enumerable.t()
  def events(chunks) do
    chunks
    # ensure the final block is flushed even without a trailing blank line
    |> Stream.concat(["\n\n"])
    |> Stream.transform("", fn chunk, buffer ->
      split_blocks(buffer <> chunk)
    end)
    |> Stream.map(&parse_block/1)
    |> Stream.reject(&is_nil/1)
  end

  # Returns {complete_event_blocks, leftover_buffer}.
  defp split_blocks(buffer) do
    parts = String.split(buffer, "\n\n")
    {complete, [rest]} = Enum.split(parts, length(parts) - 1)
    {complete, rest}
  end

  defp parse_block(block) do
    lines =
      block
      |> String.split("\n")
      |> Enum.map(&String.trim_trailing(&1, "\r"))

    event =
      Enum.find_value(lines, fn
        "event:" <> rest -> String.trim(rest)
        _ -> nil
      end)

    data =
      lines
      |> Enum.filter(&String.starts_with?(&1, "data:"))
      |> Enum.map_join("\n", fn "data:" <> rest -> String.trim_leading(rest) end)

    if data == "" and is_nil(event), do: nil, else: %{event: event, data: data}
  end
end