Skip to main content

lib/terminus_db/streaming.ex

defmodule TerminusDB.Streaming do
  @moduledoc """
  Incremental decoding of TerminusDB concatenated-JSON response bodies.

  TerminusDB's document endpoint returns documents as *concatenated JSON*
  (multiple JSON objects back-to-back, not newline-delimited and not a JSON
  array) when the `as_list` query parameter is not set. This module provides a
  brace/depth-aware splitter that incrementally parses chunks emitted by Req's
  streaming (`into: :self`) into a stream of decoded maps.

  Used by `TerminusDB.Document.stream/2` (ADR-0007). Note: `Document.stream/2`
  does not pass `as_list: true`, so the server returns concatenated JSON; do not
  enable `as_list` on a streaming call, as this splitter does not handle JSON
  array bodies.
  """

  @doc """
  Returns a stream of decoded JSON maps from a Req response that delivers chunks
  to `into: :self`. The response body must be concatenated JSON objects (the
  TerminusDB default when `as_list` is not set); JSON array bodies are not
  supported.

  Chunks arrive as messages in the mailbox of the process that made the request,
  so the stream must be consumed by that process. Messages that do not belong to
  `resp` are set aside while streaming and re-sent to the consuming process when
  the stream finishes or is halted early, so they are not lost. They end up
  behind any messages that arrived in the meantime.

  The stream halts when the response signals `:done`, when Req reports an error
  for the response, or when the timeout below elapses. Any incomplete trailing
  data still buffered at that point is discarded.

  ## Options

  - `:timeout` — receive timeout in milliseconds between messages. If no message
    arrives within this window, the stream halts (defaults to `15_000`).

  ## Examples

      # With a Req response streamed via `into: :self`:
      resp = Req.get!(req, url: "document/admin/mydb", into: :self)
      TerminusDB.Streaming.document_stream(resp) |> Enum.take(10)

  """
  @spec document_stream(Req.Response.t(), keyword()) :: Enumerable.t()
  def document_stream(%Req.Response{} = resp, opts \\ []) do
    timeout = opts[:timeout] || 15_000

    Stream.resource(
      fn -> %{resp: resp, buffer: <<>>, timeout: timeout, done?: false, unrelated: []} end,
      &next_documents/1,
      &redeliver_unrelated/1
    )
  end

  @doc """
  Splits a binary accumulator of concatenated JSON into a list of complete JSON
  object binaries and the remaining buffer. Objects are split on the top-level `}`
  that closes the initial `{`, respecting string literals (including escaped
  quotes) and nested braces.

  Bytes that precede an object (such as whitespace between objects) are included
  at the start of that object's binary. Bytes after the last complete object are
  returned as the remaining buffer.

  If the buffer ends inside a string and the last byte is a lone `\\` (an
  incomplete escape sequence), the backslash is retained in the returned buffer
  so the caller can prepend the next chunk and complete the escape.
  """
  @spec split_concatenated(binary()) :: {[binary()], binary()}
  def split_concatenated(buffer) when is_binary(buffer) do
    scan(buffer, buffer, 0, 0, false, [])
  end

  # Walks `rest` (always a suffix of `buffer`) byte by byte, tracking brace depth
  # and whether we are inside a string literal. `start` is the offset in `buffer`
  # where the current, not-yet-complete object begins. Completed objects are
  # sliced out of `buffer` instead of being rebuilt one byte at a time.
  defp scan(<<>>, buffer, start, _depth, _in_string?, acc) do
    {Enum.reverse(acc), binary_part(buffer, start, byte_size(buffer) - start)}
  end

  # Escape inside a string: skip the backslash and the escaped byte together, so
  # an escaped quote does not end the string. A lone trailing backslash does not
  # match here; it falls through to the catch-all and stays in the remainder.
  defp scan(<<?\\, _escaped, rest::binary>>, buffer, start, depth, true, acc),
    do: scan(rest, buffer, start, depth, true, acc)

  # Unescaped quote: enter or leave a string literal.
  defp scan(<<?", rest::binary>>, buffer, start, depth, in_string?, acc),
    do: scan(rest, buffer, start, depth, not in_string?, acc)

  # Opening brace outside a string: one level deeper.
  defp scan(<<?{, rest::binary>>, buffer, start, depth, false, acc),
    do: scan(rest, buffer, start, depth + 1, false, acc)

  # Closing the top-level object: everything from `start` through this `}` is one
  # complete document, and the next object starts right after it.
  defp scan(<<?}, rest::binary>>, buffer, start, 1, false, acc) do
    stop = byte_size(buffer) - byte_size(rest)
    object = binary_part(buffer, start, stop - start)
    scan(rest, buffer, stop, 0, false, [object | acc])
  end

  # Closing a nested object outside a string: one level shallower.
  defp scan(<<?}, rest::binary>>, buffer, start, depth, false, acc) when depth > 1,
    do: scan(rest, buffer, start, depth - 1, false, acc)

  # Any other byte (including whitespace between objects, or a stray `}` at depth
  # 0) just extends the current slice.
  defp scan(<<_byte, rest::binary>>, buffer, start, depth, in_string?, acc),
    do: scan(rest, buffer, start, depth, in_string?, acc)

  # `Stream.resource/3` next-function: waits for one message and emits any
  # documents it completed, or halts on completion, error, or timeout.
  defp next_documents(%{done?: true} = state), do: {:halt, state}

  defp next_documents(%{resp: resp, timeout: timeout} = state) do
    receive do
      message -> handle_message(Req.parse_message(resp, message), message, state)
    after
      # Handled here directly: `Req.parse_message/2` never returns a timeout
      # marker, so a sentinel passed through it would be treated as an unknown
      # message and the stream would wait again indefinitely.
      timeout -> {:halt, %{state | done?: true}}
    end
  end

  defp handle_message({:ok, chunks}, _message, state) do
    {docs, buffer} = process_chunks(chunks, state.buffer)
    {Enum.map(docs, &Jason.decode!/1), %{state | buffer: buffer, done?: :done in chunks}}
  end

  defp handle_message({:error, _reason}, _message, state),
    do: {:halt, %{state | done?: true}}

  # Not a message for this response: keep it (newest first) so it can be re-sent
  # when the stream ends instead of being dropped from the caller's mailbox.
  defp handle_message(:unknown, message, state),
    do: {[], %{state | unrelated: [message | state.unrelated]}}

  # `Stream.resource/3` after-function: puts set-aside messages back into this
  # process's mailbox in the order they were originally received.
  defp redeliver_unrelated(%{unrelated: unrelated}) do
    unrelated
    |> Enum.reverse()
    |> Enum.each(&send(self(), &1))
  end

  # Appends the payload of every `{:data, binary}` chunk to the buffer (other
  # chunk kinds, such as `:done`, carry no body bytes) and splits out complete
  # objects.
  defp process_chunks(chunks, buffer) do
    data = for {:data, bytes} <- chunks, do: bytes
    split_concatenated(IO.iodata_to_binary([buffer | data]))
  end
end