Skip to main content

priv/rx_backend.R

# Extra library directories may be supplied as trailing command-line
# arguments. When any are given they are prepended to the library search
# path, ahead of the directories that were already configured.
.rx_lib_paths <- commandArgs(trailingOnly = TRUE)
if (length(.rx_lib_paths) > 0L) {
  .libPaths(c(.rx_lib_paths, .libPaths()))
}
# Remove the temporary so it does not linger in the global environment.
rm(.rx_lib_paths)

# Optional renv activation. It is switched on only when the environment
# variable RX_RENV_ENABLED is exactly "1".
.rx_renv_enabled <- identical(Sys.getenv("RX_RENV_ENABLED", unset = ""), "1")
if (.rx_renv_enabled) {
  .rx_renv_project <- Sys.getenv("RX_RENV_PROJECT", unset = "")
  .rx_renv_lockfile <- Sys.getenv("RX_RENV_LOCKFILE", unset = "")

  # Both the project path and the lockfile path are mandatory; abort startup
  # with a clear message when either one is missing or empty.
  if (!nzchar(.rx_renv_project)) {
    stop("RX_RENV_PROJECT is required when RX_RENV_ENABLED=1", call. = FALSE)
  }

  if (!nzchar(.rx_renv_lockfile)) {
    stop("RX_RENV_LOCKFILE is required when RX_RENV_ENABLED=1", call. = FALSE)
  }

  # Publish the lockfile path through the environment before renv is loaded.
  # NOTE(review): presumably renv reads RENV_PATHS_LOCKFILE to locate the
  # lockfile — confirm against the renv documentation.
  Sys.setenv(RENV_PATHS_LOCKFILE = .rx_renv_lockfile)

  if (!requireNamespace("renv", quietly = TRUE)) {
    stop("missing required R package: renv", call. = FALSE)
  }

  # Load the project while discarding printed output, messages and warnings.
  # NOTE(review): presumably this keeps stray text out of the framed stdout
  # stream — confirm.
  invisible(capture.output(
    suppressWarnings(suppressMessages(renv::load(.rx_renv_project, quiet = TRUE)))
  ))
}

# Clean up the bootstrap temporaries; the project/lockfile variables only
# exist when the renv branch above was taken.
rm(.rx_renv_enabled)
if (exists(".rx_renv_project", inherits = FALSE)) rm(.rx_renv_project)
if (exists(".rx_renv_lockfile", inherits = FALSE)) rm(.rx_renv_lockfile)

suppressPackageStartupMessages(library(jsonlite))

# R 4.x marks stdout()/stdin() as mode "w"/"r" (text). writeBin/readBin
# require binary connections. Open /dev/fd/0 and /dev/fd/1 explicitly in
# binary mode — this goes straight to the Erlang port pipe without seeking.
# Any warnings raised while opening the two connections are suppressed.
# .rx_in is the frame input connection, .rx_out the frame output connection.
.rx_in  <- suppressWarnings(file("/dev/fd/0", open = "rb"))
.rx_out <- suppressWarnings(file("/dev/fd/1", open = "wb"))

# Read `n` raw bytes from the binary input connection.
#
# Returns the raw vector when all `n` bytes were delivered. A short read is
# treated as end of input: the process exits with status 0 and does not save
# the workspace.
read_exact <- function(n) {
  chunk <- readBin(.rx_in, what = "raw", n = n)
  if (length(chunk) >= n) {
    return(chunk)
  }
  quit(save = "no", status = 0)
  chunk
}

# Read a 4-byte big-endian unsigned integer.
# readBin with signed=FALSE is deprecated for size=4 in R 4.5.x; use
# signed=TRUE and convert negative two's-complement values to unsigned.
# 2147483648 (2^31) must be a double literal — it overflows R's 32-bit integer.
# Value of the 32-bit sign bit (2^31), stored as a double.
.U32_SIGN_BIT <- 2147483648
# Largest value an R integer can hold (2^31 - 1); also used as an upper bound
# for integer-valued wire fields and plot options elsewhere in this file.
.U32_MAX_INT  <- 2147483647L

# Read one 4-byte big-endian unsigned integer from the input stream.
#
# The bytes are read as a signed 32-bit integer and then mapped to the
# unsigned range with plain arithmetic. The bitwise helpers cannot be used
# for this: bitwOr()/bitwAnd() coerce their arguments to 32-bit integers, so
# OR-ing in 2^31 produces NA rather than setting the top bit.
#
# Returns an integer for values below 2^31 and a double for larger values
# (R integers cannot represent them).
read_u32 <- function() {
  n <- readBin(read_exact(4), "integer", size = 4L, endian = "big", signed = TRUE)
  # The bit pattern 0x80000000 is R's NA_integer_; as unsigned it is 2^31.
  if (is.na(n)) {
    return(2147483648)
  }
  # Negative two's-complement values map to n + 2^32 (computed as a double).
  if (n < 0L) n + 4294967296 else n
}

# Read one 8-byte big-endian unsigned integer from the input stream.
#
# The value is assembled from two unsigned 32-bit halves and returned as a
# double, so values above 2^53 lose precision. Each half is read as a signed
# integer and mapped to the unsigned range arithmetically; bitwOr()/bitwAnd()
# are unsuitable because they coerce 2^31 to an NA integer.
read_u64 <- function() {
  raw <- read_exact(8)

  # Interpret four big-endian bytes as an unsigned 32-bit value (double).
  as_u32 <- function(bytes) {
    n <- readBin(bytes, "integer", size = 4L, endian = "big", signed = TRUE)
    if (is.na(n)) {
      # 0x80000000 reads as NA_integer_; as unsigned it is 2^31.
      2147483648
    } else if (n < 0L) {
      n + 4294967296
    } else {
      as.numeric(n)
    }
  }

  as_u32(raw[1:4]) * 4294967296 + as_u32(raw[5:8])
}

# Emit one frame on the binary output connection.
#
# Layout: 4-byte big-endian header length, the header serialised as JSON,
# 8-byte big-endian body length, then the body bytes (omitted when the body
# is empty). The connection is flushed after every frame.
#
# header: list serialised with toJSON (auto-unboxed, NULL written as null).
# body:   raw vector; defaults to an empty body.
write_frame <- function(header, body = raw()) {
  payload <- charToRaw(toJSON(header, auto_unbox = TRUE, null = "null"))
  writeBin(length(payload), .rx_out, size = 4, endian = "big")
  writeBin(payload, .rx_out)

  body_size <- length(body)
  writeBin(body_size, .rx_out, size = 8, endian = "big")
  if (body_size > 0) {
    writeBin(body, .rx_out)
  }

  flush(.rx_out)
}

# Object store for opaque R values
# Two environments keyed by object id: one holds the stored values, the other
# holds optional per-object metadata. Both have the empty environment as
# parent, so lookups never fall through to other scopes.
.__rx_objects__ <- new.env(parent = emptyenv())
.__rx_object_meta__ <- new.env(parent = emptyenv())
# Counter used to mint object ids; incremented by store_object().
.__rx_next_id__ <- 0L
# Session identifier embedded in every object id: the process id followed by
# the digits of the start timestamp (formatted with fractional seconds, with
# every non-digit character stripped).
.__rx_session_id__ <- paste0(
  Sys.getpid(), "_",
  gsub("[^0-9]", "", format(Sys.time(), "%Y%m%d%H%M%OS6"))
)

# Register `value` in the object store and return its freshly minted id.
#
# The id has the form "rx_<session id>_obj_<counter>". When `metadata` is not
# NULL it is stored under the same id in the metadata environment.
store_object <- function(value, metadata = NULL) {
  sequence_number <- .__rx_next_id__ + 1L
  .__rx_next_id__ <<- sequence_number

  handle <- paste0("rx_", .__rx_session_id__, "_obj_", sequence_number)
  assign(handle, value, envir = .__rx_objects__)

  has_metadata <- !is.null(metadata)
  if (has_metadata) {
    assign(handle, metadata, envir = .__rx_object_meta__)
  }

  handle
}

# Tell whether `id` looks like an object handle minted by a different session.
#
# Only a single character string can be stale. It is stale when it carries
# the "rx_" or "rpx_" prefix but not the prefix of the current session id.
is_stale_object_id <- function(id) {
  if (!is.character(id) || length(id) != 1L) {
    return(FALSE)
  }

  handle_like <- startsWith(id, "rx_") || startsWith(id, "rpx_")

  handle_like && !(
    startsWith(id, paste0("rx_", .__rx_session_id__, "_")) ||
      startsWith(id, paste0("rpx_", .__rx_session_id__, "_"))
  )
}

# Fetch a stored object by id.
#
# Raises an error when the id belongs to another backend session (stale
# handle) or when no object is registered under that id.
get_object <- function(id) {
  if (is_stale_object_id(id)) {
    stop("stale object handle from previous R backend session: ", id, call. = FALSE)
  }

  is_registered <- exists(id, envir = .__rx_objects__, inherits = FALSE)
  if (is_registered) {
    return(get(id, envir = .__rx_objects__))
  }

  stop("object not found: ", id, call. = FALSE)
}

# Fetch the metadata stored alongside an object id.
#
# Raises an error for a stale handle; returns NULL when the id has no
# metadata entry.
get_object_metadata <- function(id) {
  if (is_stale_object_id(id)) {
    stop("stale object handle from previous R backend session: ", id, call. = FALSE)
  }

  if (!exists(id, envir = .__rx_object_meta__, inherits = FALSE)) {
    return(NULL)
  }

  get(id, envir = .__rx_object_meta__)
}

# Encode a table-like value into its wire representation.
#
# Returns a list with:
#   kind     - always "table"
#   counts   - the cell values as a list of integers
#   dim      - the extents as a list of integers (the length when `value`
#              has no dim attribute)
#   dimnames - one entry per dimension, each list(name = , values = ), where
#              `name` is NULL for an unnamed dimension
#
# Dimensions without labels receive the default labels "1", "2", ... This
# also applies when only some components of the dimnames list are NULL, so
# the number of labels always matches the corresponding extent.
encode_table <- function(value) {
  dims <- dim(value)
  if (is.null(dims)) dims <- length(value)

  names_list <- dimnames(value)
  if (is.null(names_list)) {
    names_list <- vector("list", length(dims))
  }
  # Fill every missing label vector, whether all dimnames were absent or
  # just an individual component was NULL.
  for (i in seq_along(dims)) {
    if (is.null(names_list[[i]])) {
      names_list[[i]] <- as.character(seq_len(dims[[i]]))
    }
  }

  names_list_names <- names(names_list)
  dimname_entries <- lapply(seq_along(names_list), function(i) {
    name <- if (!is.null(names_list_names) &&
                length(names_list_names) >= i &&
                nzchar(names_list_names[[i]])) {
      names_list_names[[i]]
    } else {
      NULL
    }

    list(
      name = name,
      values = as.list(as.character(names_list[[i]]))
    )
  })

  list(
    kind = "table",
    counts = as.list(as.integer(value)),
    dim = as.list(as.integer(dims)),
    dimnames = dimname_entries
  )
}

# Rebuild a "table" object from its wire representation.
#
# v$counts and v$dim are flattened to integer vectors; each v$dimnames entry
# supplies the labels for one dimension plus an optional dimension name.
# Dimension names are attached only when at least one of them is non-empty.
decode_table <- function(v) {
  cell_counts <- as.integer(unlist(v$counts, use.names = FALSE))
  extents <- as.integer(unlist(v$dim, use.names = FALSE))

  axes <- v$dimnames
  labels <- lapply(axes, function(axis) {
    as.character(unlist(axis$values, use.names = FALSE))
  })

  axis_names <- vapply(axes, function(axis) {
    axis_name <- axis$name
    if (is.null(axis_name)) {
      ""
    } else {
      as.character(axis_name)
    }
  }, character(1))

  if (any(nzchar(axis_names))) {
    names(labels) <- axis_names
  }

  cells <- array(cell_counts, dim = extents, dimnames = labels)
  structure(cells, class = "table")
}

# Build a classed condition describing a data-frame conversion failure.
#
# tag: short machine-readable error tag.
# ...: detail values; each must render to a single string via as.character.
#
# The result is a list with `tag`, `args` (the details as given) and
# `message` ("<tag>: <detail>, <detail>, ..."), carrying the classes
# "rx_dataframe_error", "error" and "condition".
.rx_dataframe_error <- function(tag, ...) {
  details <- list(...)
  rendered <- vapply(details, as.character, character(1))
  text <- paste0(tag, ": ", paste(rendered, collapse = ", "))

  condition <- list(tag = tag, args = details, message = text)
  class(condition) <- c("rx_dataframe_error", "error", "condition")
  condition
}

# conditionMessage() method for "rx_dataframe_error" conditions: returns the
# message text stored on the condition.
conditionMessage.rx_dataframe_error <- function(c) {
  c$message
}

# Write an error frame for request `id` describing `err`.
#
# Every error frame carries the id, status "error" and the condition message.
# For "rx_dataframe_error" conditions an additional `rx_error` field holds
# the structured tag and arguments.
.rx_write_dataframe_error_frame <- function(id, err) {
  frame <- list(id = id, status = "error", message = conditionMessage(err))

  if (inherits(err, "rx_dataframe_error")) {
    frame$rx_error <- list(tag = err$tag, args = err$args)
  }

  write_frame(frame)
}

# Normalise an error reason. Every input, whether a single string or any
# other value, is handed back unchanged.
.rx_dataframe_reason <- function(reason) {
  reason
}

# Classify a data-frame column as "logical", "integer", "double" or
# "character".
#
# Any other column raises an "unsupported_dataframe_column" condition whose
# first argument is an empty column name (callers fill in the real name) and
# whose second argument identifies the reason: "matrix", "factor", "date",
# "posix", "list", a class tuple for other classed columns, "complex", "raw"
# or "unsupported_type".
.rx_dataframe_column_type <- function(column) {
  reject <- function(detail) {
    stop(.rx_dataframe_error("unsupported_dataframe_column", "", detail))
  }

  if (!is.null(dim(column))) reject("matrix")

  class_attr <- attr(column, "class", exact = TRUE)
  is_classed <- !is.null(class_attr) && length(class_attr) > 0L

  # Well-known classes get a dedicated reason before the generic checks.
  if (is_classed) {
    known_classes <- list(
      c("factor", "factor"),
      c("Date", "date"),
      c("POSIXt", "posix")
    )
    for (known in known_classes) {
      if (inherits(column, known[[1L]])) reject(known[[2L]])
    }
  }

  if (is.list(column)) reject("list")

  # Any remaining classed column is reported with its first class name.
  if (is_classed) {
    reject(list(tuple = list("class", as.character(class_attr[[1L]]))))
  }

  if (is.logical(column)) {
    return("logical")
  }
  if (is.integer(column)) {
    return("integer")
  }
  if (is.double(column)) {
    return("double")
  }
  if (is.character(column)) {
    return("character")
  }

  if (is.complex(column)) reject("complex")
  if (is.raw(column)) reject("raw")
  reject("unsupported_type")
}

# TRUE when .row_names_info(value, 1L) reports a non-positive count, which is
# how this file recognises a data frame without custom row names.
.rx_dataframe_default_row_names <- function(value) {
  row_name_count <- .row_names_info(value, 1L)
  row_name_count <= 0L
}

# TRUE when `value` is a list without names, i.e. the shape this file treats
# as a decoded JSON array.
.rx_dataframe_json_array <- function(value) {
  if (!is.list(value)) {
    return(FALSE)
  }
  is.null(names(value))
}

# Look up field `name` in a decoded wire object using exact name matching.
# Returns NULL when `value` is not a list or the field is absent.
.rx_dataframe_wire_field <- function(value, name) {
  if (is.list(value)) {
    value[[name, exact = TRUE]]
  } else {
    NULL
  }
}

# Validate an integer-valued wire field and return it as an R integer.
#
# value: must be a single, finite, whole numeric value (not a list, not NA).
# min:   optional inclusive lower bound (NULL disables the check).
# max:   optional inclusive upper bound; defaults to the largest R integer.
#
# Any violation raises an "invalid_dataframe" / "malformed_wire" condition.
.rx_dataframe_wire_integer <- function(value, min = NULL, max = .U32_MAX_INT) {
  malformed <- function() {
    stop(.rx_dataframe_error("invalid_dataframe", "malformed_wire"))
  }

  is_single_atom <- !is.null(value) && !is.list(value) && length(value) == 1L
  if (!is_single_atom) malformed()

  is_whole_number <- !is.na(value) && is.numeric(value) &&
    is.finite(value) && value == floor(value)
  if (!is_whole_number) malformed()

  too_small <- !is.null(min) && value < min
  if (too_small) malformed()

  too_large <- !is.null(max) && value > max
  if (too_large) malformed()

  as.integer(value)
}

# Validate one non-missing wire value against the declared column type and
# return it coerced to that type.
#
# value: must be a single non-list, non-NA value.
# type:  "logical", "integer", "double" or "character".
#
# Raises "invalid_dataframe" / "mixed_column_type" when the value does not
# fit the type, "unsupported_dataframe_column" / "non_finite_double" for a
# non-finite double, and "invalid_dataframe" / "unsupported_type" for any
# other type name.
.rx_dataframe_wire_value <- function(value, type) {
  mixed <- function() {
    stop(.rx_dataframe_error("invalid_dataframe", "mixed_column_type"))
  }

  is_single_value <- !is.list(value) && !is.null(value) &&
    length(value) == 1L && !is.na(value)
  if (!is_single_value) mixed()

  if (identical(type, "logical")) {
    if (is.logical(value)) {
      return(as.logical(value))
    }
    mixed()
  }

  if (identical(type, "integer")) {
    # Whole numbers within the range of R's non-NA integers only.
    fits_integer <- is.numeric(value) && is.finite(value) &&
      value == floor(value) &&
      value >= -2147483647 && value <= 2147483647
    if (fits_integer) {
      return(as.integer(value))
    }
    mixed()
  }

  if (identical(type, "double")) {
    if (!is.numeric(value)) mixed()
    out <- as.numeric(value)
    if (is.finite(out)) {
      return(out)
    }
    stop(.rx_dataframe_error("unsupported_dataframe_column", "", "non_finite_double"))
  }

  if (identical(type, "character")) {
    if (is.character(value)) {
      return(as.character(value))
    }
    mixed()
  }

  stop(.rx_dataframe_error("invalid_dataframe", "unsupported_type"))
}

# Convert a column into a list of wire values, one per element.
#
# Missing elements become list(kind = "na", type = <type>); other elements
# are coerced according to `type` ("double", "integer", "logical", anything
# else is rendered as character). A double column containing an infinite or
# NaN value raises "unsupported_dataframe_column" / "non_finite_double".
.rx_dataframe_column_values <- function(column, type) {
  if (identical(type, "double")) {
    has_non_finite <- any(is.infinite(column) | is.nan(column), na.rm = TRUE)
    if (has_non_finite) {
      stop(.rx_dataframe_error("unsupported_dataframe_column", "", "non_finite_double"))
    }
  }

  # Pick the coercion once; it is the same for every element.
  coerce <- if (identical(type, "double")) {
    as.numeric
  } else if (identical(type, "integer")) {
    as.integer
  } else if (identical(type, "logical")) {
    as.logical
  } else {
    as.character
  }

  lapply(seq_along(column), function(i) {
    element <- column[[i]]
    if (is.na(element)) {
      return(list(kind = "na", type = type))
    }
    coerce(element)
  })
}

# Encode a data frame into its wire representation.
#
# value:    the object to encode; must inherit from "data.frame".
# max_rows: optional row limit (NULL disables the check).
#
# Returns list(kind = "data_frame", names, n_rows, columns), where each
# column is list(name, type, values). Raises an "rx_dataframe_error" when the
# object is not a data frame, exceeds the row limit, has missing, empty or
# duplicated column names, has rows but no columns, carries custom row names,
# or contains an unsupported column.
.rx_encode_data_frame <- function(value, max_rows = NULL) {
  fail <- function(...) stop(.rx_dataframe_error(...))

  if (!inherits(value, "data.frame")) {
    fail("not_dataframe", "R object is not a data.frame")
  }

  rows <- nrow(value)
  if (!is.null(max_rows) && rows > max_rows) {
    fail("dataframe_too_large", rows, max_rows)
  }

  names_value <- names(value)
  names_unusable <- is.null(names_value) ||
    length(names_value) != length(value) ||
    any(is.na(names_value))
  if (names_unusable) fail("unsupported_dataframe_names", "non_string")
  if (any(!nzchar(names_value))) fail("unsupported_dataframe_names", "empty")
  if (anyDuplicated(names_value) != 0L) fail("unsupported_dataframe_names", "duplicate")

  if (length(value) == 0L && rows > 0L) {
    fail("unsupported_dataframe_shape", "zero_column_nonzero_row")
  }
  if (!.rx_dataframe_default_row_names(value)) {
    fail("unsupported_dataframe_row_names", "custom")
  }

  # Evaluate `expr`; column errors raised without a column name are re-raised
  # with `column_name` filled in, every other data-frame error passes through.
  with_column_name <- function(expr, column_name) {
    tryCatch(
      expr,
      rx_dataframe_error = function(e) {
        is_anonymous <- identical(e$tag, "unsupported_dataframe_column") &&
          identical(e$args[[1L]], "")
        if (is_anonymous) {
          stop(.rx_dataframe_error("unsupported_dataframe_column", column_name, e$args[[2L]]))
        }
        stop(e)
      }
    )
  }

  columns <- lapply(seq_along(value), function(i) {
    column_name <- names_value[[i]]
    column <- value[[i]]
    type <- with_column_name(.rx_dataframe_column_type(column), column_name)
    values <- with_column_name(.rx_dataframe_column_values(column, type), column_name)
    list(name = column_name, type = type, values = values)
  })

  list(kind = "data_frame", names = as.list(names_value), n_rows = rows, columns = columns)
}

# Decode one wire value belonging to a column of the given type.
#
# A list whose `kind` field is "na" is a missing-value marker: its `type`
# must match the column type (otherwise "invalid_dataframe" /
# "mixed_column_type" is raised) and the matching typed NA is returned.
# Every other value is validated by .rx_dataframe_wire_value().
.rx_decode_data_frame_value <- function(value, type) {
  is_na_marker <- is.list(value) &&
    identical(.rx_dataframe_wire_field(value, "kind"), "na")

  if (!is_na_marker) {
    return(.rx_dataframe_wire_value(value, type))
  }

  marker_type <- .rx_dataframe_wire_field(value, "type")
  if (!identical(marker_type, type)) {
    stop(.rx_dataframe_error("invalid_dataframe", "mixed_column_type"))
  }

  switch(type,
    logical = NA,
    integer = NA_integer_,
    double = NA_real_,
    character = NA_character_
  )
}

# Rebuild a data frame from its wire representation.
#
# wire: decoded JSON object with the fields `kind` ("data_frame"), `names`
#       (array of column names), `n_rows` and `columns` (array of
#       list(name, type, values) entries, in the same order as `names`).
#
# Returns a data.frame with one column per entry (strings are not converted
# to factors and column names are not altered). Any structural problem raises
# an "rx_dataframe_error"; the checks run in a fixed order, so the first
# violated rule determines the reported tag and reason.
.rx_decode_data_frame <- function(wire) {
  kind <- .rx_dataframe_wire_field(wire, "kind")
  names_wire <- .rx_dataframe_wire_field(wire, "names")
  n_rows_wire <- .rx_dataframe_wire_field(wire, "n_rows")
  columns <- .rx_dataframe_wire_field(wire, "columns")

  # Envelope checks: correct kind, all fields present, arrays are unnamed lists.
  if (!is.list(wire) || !identical(kind, "data_frame")) {
    stop(.rx_dataframe_error("invalid_dataframe", "malformed_wire"))
  }
  if (is.null(names_wire) || !.rx_dataframe_json_array(names_wire) ||
      is.null(n_rows_wire) || is.null(columns) ||
      !.rx_dataframe_json_array(columns)) {
    stop(.rx_dataframe_error("invalid_dataframe", "malformed_wire"))
  }

  # Every column name must be a single non-NA string.
  names_value <- vapply(names_wire, function(name) {
    if (!is.character(name) || length(name) != 1L || is.na(name)) {
      stop(.rx_dataframe_error("invalid_dataframe", "malformed_wire"))
    }
    name
  }, character(1))
  n_rows <- .rx_dataframe_wire_integer(n_rows_wire, min = 0)
  if (any(!nzchar(names_value))) {
    stop(.rx_dataframe_error("invalid_dataframe", "empty_name"))
  }
  if (anyDuplicated(names_value) != 0L) {
    stop(.rx_dataframe_error("invalid_dataframe", "duplicate_names"))
  }
  if (length(names_value) != length(columns)) {
    stop(.rx_dataframe_error("invalid_dataframe", "column_key_mismatch"))
  }
  if (length(names_value) == 0L && n_rows > 0L) {
    stop(.rx_dataframe_error("unsupported_dataframe_shape", "zero_column_nonzero_row"))
  }

  # Pre-sized result list, named up front so columns are filled by position.
  out <- vector("list", length(columns))
  names(out) <- names_value

  for (i in seq_along(columns)) {
    entry <- columns[[i]]
    name <- .rx_dataframe_wire_field(entry, "name")
    type <- .rx_dataframe_wire_field(entry, "type")
    values_wire <- .rx_dataframe_wire_field(entry, "values")

    # Per-column envelope: all fields present, name/type are single strings.
    if (!is.list(entry) || is.null(name) || is.null(type) ||
        is.null(values_wire) || !.rx_dataframe_json_array(values_wire)) {
      stop(.rx_dataframe_error("invalid_dataframe", "malformed_wire"))
    }
    if (!is.character(name) || length(name) != 1L || is.na(name) ||
        !is.character(type) || length(type) != 1L || is.na(type)) {
      stop(.rx_dataframe_error("invalid_dataframe", "malformed_wire"))
    }
    # The i-th column entry must carry the i-th declared name.
    if (!identical(name, names_value[[i]])) {
      stop(.rx_dataframe_error("invalid_dataframe", "column_key_mismatch"))
    }
    # Decode each value; column errors raised without a column name are
    # re-raised with this column's name, other errors pass through unchanged.
    values <- tryCatch(
      lapply(values_wire, .rx_decode_data_frame_value, type = type),
      rx_dataframe_error = function(e) {
        if (identical(e$tag, "unsupported_dataframe_column") &&
            identical(e$args[[1L]], "")) {
          stop(.rx_dataframe_error("unsupported_dataframe_column", name, e$args[[2L]]))
        }
        stop(e)
      }
    )
    if (length(values) != n_rows) {
      stop(.rx_dataframe_error("invalid_dataframe", "column_length_mismatch"))
    }
    # Flatten the decoded values into an atomic vector of the declared type;
    # an unknown type name falls through to the final stop().
    out[[i]] <- switch(type,
      logical = as.logical(unlist(values, use.names = FALSE)),
      integer = as.integer(unlist(values, use.names = FALSE)),
      double = as.numeric(unlist(values, use.names = FALSE)),
      character = as.character(unlist(values, use.names = FALSE)),
      stop(.rx_dataframe_error("invalid_dataframe", "unsupported_type"))
    )
  }

  as.data.frame(out, stringsAsFactors = FALSE, check.names = FALSE)
}

# TRUE when `value` carries a non-empty class attribute or a non-empty dim
# attribute; such values are passed around as opaque handles rather than
# being flattened.
is_semantic_object <- function(value) {
  has_class <- length(attr(value, "class", exact = TRUE)) > 0L
  has_dim <- length(dim(value)) > 0L
  has_class || has_dim
}

# Encode an R value into its wire representation.
#
# value: the value to encode.
# depth: current nesting level of plain lists (used by the recursion).
#
# Encoding rules, applied in order:
#   * NULL                       -> scalar of type "null"
#   * "table" objects            -> encode_table()
#   * classed or dimensioned     -> opaque object handle
#   * logical/integer/double/character of length 1 -> scalar (or NA marker)
#   * the same types at any other length -> vector, NA elements as markers
#   * plain lists                -> "named_list" when every element has a
#     unique non-empty name, otherwise "list" with per-entry optional keys;
#     nesting deeper than 100 levels yields an opaque handle plus a warning
#   * everything else            -> opaque object handle
encode_value <- function(value, depth = 0L) {
  if (is.null(value)) {
    return(list(kind = "scalar", type = "null", value = NULL))
  }
  if (inherits(value, "table")) {
    return(encode_table(value))
  }
  if (is_semantic_object(value)) {
    return(list(kind = "object", id = store_object(value)))
  }

  # The four supported atomic types share the same scalar/vector encoding.
  atomic_type <- if (is.logical(value)) {
    "logical"
  } else if (is.integer(value)) {
    "integer"
  } else if (is.double(value)) {
    "double"
  } else if (is.character(value)) {
    "character"
  } else {
    NULL
  }

  if (!is.null(atomic_type)) {
    na_marker <- list(kind = "na", type = atomic_type)
    if (length(value) == 1) {
      if (is.na(value)) {
        return(na_marker)
      }
      return(list(kind = "scalar", type = atomic_type, value = value))
    }
    # flat vectors
    vals <- lapply(seq_along(value), function(i) {
      if (is.na(value[i])) na_marker else value[i]
    })
    return(list(kind = "vector", type = atomic_type, values = vals))
  }

  # plain list (not classed — data.frame, S3, S4, POSIXlt, etc. are excluded by identical(class, "list"))
  if (is.list(value) && identical(class(value), "list")) {
    if (depth >= 100L) {
      warning("rx: named-list nesting exceeded depth 100; returned opaque handle")
      return(list(kind = "object", id = store_object(value)))
    }

    n <- length(value)
    if (n == 0L) {
      return(list(kind = "named_list", entries = list()))
    }

    nms <- names(value)
    fully_named <- !is.null(nms) && length(nms) == n &&
      all(nzchar(nms)) && anyDuplicated(nms) == 0L

    # One key per element: the name itself, or NULL where no name is set.
    keys <- if (fully_named) {
      as.list(nms)
    } else if (is.null(nms)) {
      vector("list", n)
    } else {
      lapply(nms, function(nm) if (nzchar(nm)) nm else NULL)
    }

    entries <- lapply(seq_len(n), function(i) {
      list(key = keys[[i]], value = encode_value(value[[i]], depth + 1L))
    })

    list_kind <- if (fully_named) "named_list" else "list"
    return(list(kind = list_kind, entries = entries))
  }

  # Opaque object (classed list, S4, environment, function, etc.)
  list(kind = "object", id = store_object(value))
}

# Decode a wire value back into an R value.
#
# Non-list input is returned unchanged (a bare atomic from an inline value).
# Recognised kinds: "scalar", "na", "object" (looked up in the object store),
# "table", "vector", "named_list" and "list". Anything else decodes to NULL.
#
# List entries are decoded with lapply() rather than by assigning into a
# pre-sized list: `out[[i]] <- NULL` deletes the slot instead of storing
# NULL, which shortened lists whose final entry decodes to NULL and made the
# subsequent names<- fail for named lists.
decode_value <- function(v) {
  # Plain atomic scalar (e.g. a vector element from an __inline__ global)
  if (!is.list(v)) return(v)
  if (identical(v$kind, "scalar")) {
    if (identical(v$type, "null")) return(NULL)
    if (identical(v$type, "logical")) return(as.logical(v$value))
    if (identical(v$type, "integer")) return(as.integer(v$value))
    if (identical(v$type, "double")) return(as.double(v$value))
    if (identical(v$type, "character")) return(as.character(v$value))
  }
  if (identical(v$kind, "na")) {
    if (identical(v$type, "logical")) return(NA)
    if (identical(v$type, "integer")) return(NA_integer_)
    if (identical(v$type, "double")) return(NA_real_)
    if (identical(v$type, "character")) return(NA_character_)
  }
  if (identical(v$kind, "object")) return(get_object(v$id))
  if (identical(v$kind, "table")) return(decode_table(v))
  if (identical(v$kind, "vector")) {
    elems <- lapply(v$values, decode_value)
    return(switch(v$type,
      "double"    = as.double(unlist(elems)),
      "integer"   = as.integer(unlist(elems)),
      "logical"   = as.logical(unlist(elems)),
      "character" = as.character(unlist(elems)),
      unlist(elems)
    ))
  }
  if (identical(v$kind, "named_list")) {
    entries <- v$entries
    if (length(entries) == 0L) return(list())
    # lapply keeps NULL results as list elements, preserving the length.
    out <- unname(lapply(entries, function(e) decode_value(e$value)))
    names(out) <- vapply(entries, function(e) e$key, character(1))
    return(out)
  }
  if (identical(v$kind, "list")) {
    entries <- v$entries
    if (length(entries) == 0L) return(list())
    out <- unname(lapply(entries, function(e) decode_value(e$value)))
    # Entries without a key get an empty name; drop names when all are empty.
    nms <- vapply(entries, function(e) if (is.null(e$key)) "" else e$key, character(1))
    names(out) <- if (all(nms == "")) NULL else nms
    return(out)
  }
  NULL
}

# Send ready frame immediately on startup
# The frame reports the running R version and this backend's session id (the
# same id that is embedded in every object handle).
write_frame(list(
  id = "ready",
  status = "ready",
  version = as.character(getRversion()),
  session_id = .__rx_session_id__
))

# Sentinel for "no expressions evaluated" (comment-only or empty source)
# It is compared with identical() to tell "nothing evaluated" apart from a
# genuine result value.
.NO_RESULT <- list(kind = "no_result")

# Read an integer plot option with range validation.
#
# options: list of user-supplied plot options.
# name:    option to look up.
# default: value used when the option is absent (returned as integer).
# min/max: inclusive bounds.
#
# Raises an error unless the supplied value is a single, finite, whole number
# within [min, max]; otherwise returns it as an integer.
.rx_plot_integer_option <- function(options, name, default, min, max) {
  value <- options[[name]]
  if (is.null(value)) {
    return(as.integer(default))
  }

  acceptable <- is.numeric(value) && length(value) == 1L && !is.na(value) &&
    is.finite(value) && value == floor(value) && value >= min && value <= max

  if (!acceptable) {
    stop(sprintf("plot option %s must be an integer from %s through %s", name, min, max),
         call. = FALSE)
  }

  as.integer(value)
}

# Validate user-supplied plot options and fill in defaults.
#
# Only the "png" format is accepted (it is also the default). Returns a list
# with `format`, `width`, `height`, `res`, `pointsize`, `max_pages` and
# `max_bytes`, each integer option validated against its allowed range.
.rx_plot_options <- function(options) {
  if (is.null(options)) options <- list()

  requested_format <- options$format
  if (is.null(requested_format)) requested_format <- "png"
  # identical() only accepts a plain, single, non-NA "png" string.
  if (!identical(requested_format, "png")) {
    stop("plot option format must be png", call. = FALSE)
  }

  int_option <- function(name, default, min, max) {
    .rx_plot_integer_option(options, name, default, min, max)
  }

  list(
    format = "png",
    width = int_option("width", 640L, 20L, 10000L),
    height = int_option("height", 480L, 20L, 10000L),
    res = int_option("res", 96L, 1L, 10000L),
    pointsize = int_option("pointsize", 12L, 1L, 1000L),
    max_pages = int_option("max_pages", 100L, 1L, 1000L),
    max_bytes = int_option("max_bytes", 64000000L, 1L, .U32_MAX_INT)
  )
}

# Load the PNG pages written into `tmpdir`.
#
# Files named page-<digits>.png are read in sorted order. Raises an error
# when there are more than `max_pages` files, when any file is empty or has
# an unknown size, when the combined size exceeds `max_bytes`, or when a file
# does not start with the PNG signature. Returns a list of raw vectors, one
# per page (an empty list when no page exists).
.rx_read_png_pages <- function(tmpdir, max_pages, max_bytes) {
  page_files <- sort(list.files(
    tmpdir,
    pattern = "^page-[0-9]+[.]png$",
    full.names = TRUE
  ))
  page_total <- length(page_files)

  if (page_total > max_pages) {
    stop("plot produced too many pages", call. = FALSE)
  }
  if (page_total == 0L) {
    return(list())
  }

  byte_sizes <- file.info(page_files)$size
  if (any(is.na(byte_sizes) | byte_sizes <= 0L)) {
    stop("plot device produced an empty PNG file", call. = FALSE)
  }
  if (sum(byte_sizes) > max_bytes) {
    stop("plot PNG output exceeds byte limit", call. = FALSE)
  }

  # The fixed 8-byte signature every PNG file starts with.
  png_signature <- as.raw(c(0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a))

  lapply(seq_len(page_total), function(i) {
    content <- readBin(page_files[[i]], what = "raw", n = byte_sizes[[i]])
    has_signature <- length(content) >= 8L &&
      identical(content[1:8], png_signature)
    if (!has_signature) {
      stop("plot device did not produce a valid PNG file", call. = FALSE)
    }
    content
  })
}

# TRUE when `value` inherits from either of the class names used for ggplot
# objects: "ggplot" or "ggplot2::ggplot".
.rx_is_ggplot <- function(value) {
  ggplot_classes <- c("ggplot", "ggplot2::ggplot")
  inherits(value, ggplot_classes)
}

# Evaluate `code` with a PNG graphics device open and collect the pages it
# draws.
#
# code:    R source text to parse and evaluate.
# globals: named list of wire values, decoded and assigned into the fresh
#          evaluation environment before the code runs.
# options: user-supplied plot options, validated by .rx_plot_options().
#
# Returns list(ok = TRUE, pages, output, options) on success, where `pages`
# is a list of raw PNG byte vectors, or list(ok = FALSE, error, output) when
# option validation, evaluation, device shutdown or page loading fails, or
# when no page was produced. `output` always holds the captured stdout,
# messages and warnings as single strings.
.rx_capture_plot <- function(code, globals, options) {
  eval_env <- new.env(parent = .GlobalEnv)
  # Private scratch directory for the PNG pages; removed when the function exits.
  tmpdir <- tempfile("rx-plot-")
  dir.create(tmpdir, mode = "0700")
  on.exit(unlink(tmpdir, recursive = TRUE, force = TRUE), add = TRUE)

  messages_text <- character()
  warnings_text <- character()
  plot_error <- NULL
  close_error <- NULL
  read_error <- NULL
  pages <- list()
  opts <- NULL
  plot_dev <- NULL
  # Snapshot of the devices that were open (and which one was current) before
  # capture started, so only devices opened during capture are closed later.
  old_devices <- grDevices::dev.list()
  old_devices <- if (is.null(old_devices)) integer() else unname(as.integer(old_devices))
  old_dev <- unname(as.integer(grDevices::dev.cur()))
  old_options <- NULL

  # Close every device opened since the snapshot (highest number first), then
  # the capture device itself if it is still open, and finally make the
  # previously current device current again when it is a real device (not
  # device 1) that is still open. Only the first shutdown error is recorded.
  close_plot_device <- function() {
    open_devices <- grDevices::dev.list()
    if (!is.null(open_devices)) {
      opened_devices <- setdiff(unname(as.integer(open_devices)), old_devices)
      for (device in rev(sort(opened_devices))) {
        tryCatch(
          invisible(grDevices::dev.off(which = device)),
          error = function(e) { if (is.null(close_error)) close_error <<- e }
        )
      }
    }

    open_devices <- grDevices::dev.list()
    if (!is.null(plot_dev) && !is.null(open_devices) &&
        plot_dev %in% unname(as.integer(open_devices))) {
      tryCatch(
        invisible(grDevices::dev.off(which = plot_dev)),
        error = function(e) { if (is.null(close_error)) close_error <<- e }
      )
    }

    open_devices <- grDevices::dev.list()
    if (!is.null(open_devices) && old_dev != 1L &&
        old_dev %in% unname(as.integer(open_devices))) {
      try(invisible(grDevices::dev.set(which = old_dev)), silent = TRUE)
    }
  }

  # Printed output is captured by capture.output(); messages and warnings are
  # collected by the calling handlers and muffled so evaluation continues.
  stdout_lines <- capture.output({
    invisible(withCallingHandlers(
      tryCatch(
        {
          opts <- .rx_plot_options(options)

          if (!is.null(globals) && length(globals) > 0) {
            for (nm in names(globals)) {
              assign(nm, decode_value(globals[[nm]]), envir = eval_env)
            }
          }

          exprs <- parse(text = code)
          file_pattern <- file.path(tmpdir, "page-%06d.png")
          # Replace the "device" option with a function that raises an error
          # for the duration of the capture; the previous option values are
          # kept so they can be restored in the finally block.
          old_options <- options(
            device = function(...) {
              stop("Rx plot capture device is not available", call. = FALSE)
            }
          )

          png_args <- list(
            filename = file_pattern,
            width = opts$width,
            height = opts$height,
            units = "px",
            pointsize = opts$pointsize,
            bg = "white",
            res = opts$res
          )
          # Use the cairo PNG backend when this R build reports support for it.
          if (isTRUE(capabilities("cairo"))) {
            png_args$type <- "cairo"
          }

          do.call(grDevices::png, png_args)
          plot_dev <- unname(as.integer(grDevices::dev.cur()))

          # Evaluate each expression in turn; a visible ggplot result is
          # printed explicitly so that it is drawn on the device.
          for (expr in exprs) {
            evaluated <- withVisible(eval(expr, envir = eval_env))
            if (isTRUE(evaluated$visible) && .rx_is_ggplot(evaluated$value)) {
              print(evaluated$value)
            }
          }
          NULL
        },
        error = function(e) { plot_error <<- e; NULL },
        finally = {
          # Always shut the devices down and restore the saved options.
          close_plot_device()
          if (!is.null(old_options)) options(old_options)
        }
      ),
      message = function(m) {
        messages_text <<- c(messages_text, conditionMessage(m))
        invokeRestart("muffleMessage")
      },
      warning = function(w) {
        warnings_text <<- c(warnings_text, conditionMessage(w))
        invokeRestart("muffleWarning")
      }
    ))
  })

  output <- list(
    stdout = paste(stdout_lines, collapse = "\n"),
    messages = paste(messages_text, collapse = ""),
    warnings = paste(warnings_text, collapse = "")
  )

  # An evaluation error takes precedence over a device shutdown error.
  if (!is.null(plot_error)) return(list(ok = FALSE, error = plot_error, output = output))
  if (!is.null(close_error)) return(list(ok = FALSE, error = close_error, output = output))

  pages <- tryCatch(
    .rx_read_png_pages(tmpdir, opts$max_pages, opts$max_bytes),
    error = function(e) { read_error <<- e; NULL }
  )

  if (!is.null(read_error)) return(list(ok = FALSE, error = read_error, output = output))
  if (length(pages) == 0L) {
    return(list(ok = FALSE, error = simpleError("R code produced no plot"), output = output))
  }

  list(ok = TRUE, pages = pages, output = output, options = opts)
}

# Write an error frame for request `id`.
#
# The frame carries the condition message, its class vector, the deparsed
# call that raised it (NULL when the condition has no call), the current call
# stack rendered as strings, and the captured stdout/messages/warnings from
# `output` (all empty by default).
.rx_write_error_frame <- function(id, error, output = list(stdout = "", messages = "", warnings = "")) {
  origin <- conditionCall(error)
  if (!is.null(origin)) {
    origin <- deparse(origin)
  }

  # The frame is built inside the write_frame() argument on purpose: the call
  # stack is captured at the point where that argument is evaluated.
  write_frame(list(
    id = id,
    status = "error",
    message = conditionMessage(error),
    r_class = class(error),
    call = origin,
    traceback = as.list(as.character(sys.calls())),
    stdout = output$stdout,
    messages = output$messages,
    warnings = output$warnings
  ))
}

repeat {
  header_len <- read_u32()
  header_json <- read_exact(header_len)
  header <- fromJSON(rawToChar(header_json), simplifyVector = FALSE)
  body_len <- read_u64()
  body <- if (body_len > 0) read_exact(body_len) else raw()

  if (identical(header$op, "ping")) {
    write_frame(list(id = header$id, status = "ok", result = list(kind = "pong")))
  } else if (identical(header$op, "eval")) {
    code <- header$code

    # Fresh environment, inheriting from global so library functions are accessible
    eval_env <- new.env(parent = .GlobalEnv)

    messages_text <- character()
    warnings_text <- character()
    eval_error <- NULL

    stdout_lines <- capture.output({
      result_value <- withCallingHandlers(
        tryCatch(
          {
            # Assign globals inside tryCatch so a missing object id is caught
            # and returned as an R error rather than crashing the backend.
            if (!is.null(header$globals) && length(header$globals) > 0) {
              for (nm in names(header$globals)) {
                assign(nm, decode_value(header$globals[[nm]]), envir = eval_env)
              }
            }
            exprs <- parse(text = code)
            if (length(exprs) == 0) {
              .NO_RESULT
            } else {
              res <- .NO_RESULT
              for (expr in exprs) res <- eval(expr, envir = eval_env)
              res
            }
          },
          error = function(e) { eval_error <<- e; .NO_RESULT }
        ),
        message = function(m) {
          messages_text <<- c(messages_text, conditionMessage(m))
          invokeRestart("muffleMessage")
        },
        warning = function(w) {
          warnings_text <<- c(warnings_text, conditionMessage(w))
          invokeRestart("muffleWarning")
        }
      )
    })

    # Collapse everything captured during evaluation into single strings:
    # stdout lines are joined with newlines, while messages and warnings are
    # concatenated without a separator.
    output <- list(
      stdout = paste(stdout_lines, collapse = "\n"),
      messages = paste(messages_text, collapse = ""),
      warnings = paste(warnings_text, collapse = "")
    )

    if (!is.null(eval_error)) {
      # Evaluation failed: report the condition's message, class and call
      # together with whatever output was captured before the error.
      # NOTE(review): deparse() can return several strings for a long call, so
      # `call` may be a character vector of length > 1.
      # NOTE(review): sys.calls() is evaluated here, after the error handler has
      # already returned, so it reflects the dispatcher's own stack rather than
      # the frames that raised the error - confirm this is intended.
      write_frame(list(
        id = header$id, status = "error",
        message = conditionMessage(eval_error),
        r_class = class(eval_error),
        call = deparse(conditionCall(eval_error)),
        traceback = as.character(sys.calls()),
        stdout = output$stdout, messages = output$messages, warnings = output$warnings
      ))
    } else {
      # Evaluation succeeded: encode the bindings of eval_env and the result.
      # Warnings raised while encoding are collected separately and muffled so
      # they do not reach the console.
      encode_warnings <- character()
      withCallingHandlers(
        {
          # ls() without all.names = TRUE omits names that start with a dot.
          global_vars <- ls(eval_env)
          globals_out <- list()
          for (nm in global_vars) {
            globals_out[[nm]] <- encode_value(get(nm, envir = eval_env))
          }
          # The .NO_RESULT sentinel is reported as kind "no_result" instead of
          # being passed through encode_value().
          encoded_result <- if (identical(result_value, .NO_RESULT)) {
            list(kind = "no_result")
          } else {
            encode_value(result_value)
          }
        },
        warning = function(w) {
          encode_warnings <<- c(encode_warnings, conditionMessage(w))
          invokeRestart("muffleWarning")
        }
      )
      # Evaluation-time warnings come first, followed by encoding-time warnings.
      all_warnings <- paste(c(warnings_text, encode_warnings), collapse = "")
      write_frame(list(
        id = header$id, status = "ok",
        result = encoded_result,
        globals = globals_out,
        stdout = output$stdout, messages = output$messages, warnings = all_warnings,
        body_format = NULL
      ))
    }
  } else if (identical(header$op, "plot")) {
    # Run the plotting code; the capture result carries an `ok` flag, the
    # rendered pages, the effective options and the captured console output.
    capture <- .rx_capture_plot(header$code, header$globals, header$options)

    if (isTRUE(capture$ok)) {
      # All pages are concatenated into one binary body; the byte length of
      # each page is sent in `sizes` alongside the page count.
      page_sizes <- lengths(capture$pages)
      body <- do.call(c, capture$pages)

      plot_result <- list(
        kind = "plot_png_pages",
        page_count = length(capture$pages),
        sizes = as.list(as.integer(page_sizes)),
        width = capture$options$width,
        height = capture$options$height
      )

      write_frame(
        list(
          id = header$id,
          status = "ok",
          result = plot_result,
          stdout = capture$output$stdout,
          messages = capture$output$messages,
          warnings = capture$output$warnings,
          body_format = "plot_png_pages"
        ),
        body
      )
    } else {
      # Capture failed: delegate to the shared error-frame writer.
      .rx_write_error_frame(header$id, capture$error, capture$output)
    }
  } else if (identical(header$op, "encode")) {
    # Rebuild the R value from its wire description. The attempt yields a
    # value/failure pair so that a decoded NULL is not mistaken for an error.
    attempt <- tryCatch(
      list(value = decode_value(header$value), failure = NULL),
      error = function(cond) list(value = NULL, failure = cond)
    )
    failure <- attempt[["failure"]]

    if (is.null(failure)) {
      # Keep the decoded value in the object store (together with the wire
      # description it came from) and reply with the resulting handle.
      decoded <- attempt[["value"]]
      object_id <- store_object(decoded, header$value)
      write_frame(list(id = header$id, status = "ok",
                       result = list(kind = "object", id = object_id)))
    } else {
      # Decoding failed: describe the condition in an error frame.
      write_frame(list(id = header$id, status = "error",
                       message = conditionMessage(failure),
                       r_class = class(failure),
                       call = deparse(conditionCall(failure)),
                       traceback = as.character(sys.calls())))
    }
  } else if (identical(header$op, "decode")) {
    # Look the handle up in the object store; a failed lookup is carried as a
    # value/failure pair instead of being recorded through a side effect.
    lookup <- tryCatch(
      list(value = get_object(header$object_id), failure = NULL),
      error = function(cond) list(value = NULL, failure = cond)
    )

    if (is.null(lookup[["failure"]])) {
      stored <- lookup[["value"]]

      # Prefer metadata recorded for the handle; otherwise encode the value now.
      encoded <- get_object_metadata(header$object_id)
      if (is.null(encoded)) encoded <- encode_value(stored)

      # An opaque value would re-encode under a fresh id, so answer with the
      # handle the caller already holds to avoid duplicating it.
      if (identical(encoded$kind, "object")) {
        encoded <- list(kind = "object", id = header$object_id)
      }

      write_frame(list(id = header$id, status = "ok", result = encoded))
    } else {
      write_frame(list(id = header$id, status = "error",
                       message = conditionMessage(lookup[["failure"]])))
    }
  } else if (identical(header$op, "print")) {
    # Print a stored or wire-encoded value and reply with the captured console
    # output. Messages and warnings are collected separately; an error is
    # recorded in print_error and reported in an error frame.
    messages_text <- character()
    warnings_text <- character()
    print_error <- NULL
    # Previous values of any options changed by apply_print_options(), keyed by
    # option name so they can be handed straight back to options().
    old_options <- list()

    # Re-apply the saved option values, if any were saved.
    restore_print_options <- function() {
      if (length(old_options) > 0L) do.call(options, old_options)
    }

    # Apply the optional `width` and `max_print` request options, saving the
    # current value of each R option before overwriting it.
    apply_print_options <- function(opts) {
      if (is.null(opts)) return()
      if (!is.null(opts$width)) {
        old_options[["width"]] <<- getOption("width")
        options(width = as.integer(opts$width))
      }
      if (!is.null(opts$max_print)) {
        old_options[["max.print"]] <<- getOption("max.print")
        options(max.print = as.integer(opts$max_print))
      }
    }

    # capture.output() collects what print() writes; the calling handlers record
    # and muffle messages and warnings so they stay out of the captured text.
    stdout_lines <- capture.output({
      invisible(withCallingHandlers(
        tryCatch(
          {
            # Options are applied inside the tryCatch so that a failure while
            # setting them is reported like any other print error.
            apply_print_options(header$options)

            # A handle takes precedence; otherwise the value is decoded from
            # the request itself.
            value <- if (!is.null(header$object_id)) {
              get_object(header$object_id)
            } else {
              decode_value(header$value)
            }

            print(value)
            NULL
          },
          error = function(e) { print_error <<- e; NULL },
          # Runs on both success and failure, undoing the option changes.
          finally = restore_print_options()
        ),
        message = function(m) {
          messages_text <<- c(messages_text, conditionMessage(m))
          invokeRestart("muffleMessage")
        },
        warning = function(w) {
          warnings_text <<- c(warnings_text, conditionMessage(w))
          invokeRestart("muffleWarning")
        }
      ))
    })

    # stdout lines are joined with newlines; messages and warnings are
    # concatenated without a separator.
    output <- list(
      stdout = paste(stdout_lines, collapse = "\n"),
      messages = paste(messages_text, collapse = ""),
      warnings = paste(warnings_text, collapse = "")
    )

    if (!is.null(print_error)) {
      # NOTE(review): sys.calls() is evaluated after the error handler has
      # returned, so it reflects the dispatcher's stack, not the failing frames.
      write_frame(list(
        id = header$id, status = "error",
        message = conditionMessage(print_error),
        r_class = class(print_error),
        call = deparse(conditionCall(print_error)),
        traceback = as.character(sys.calls()),
        stdout = output$stdout, messages = output$messages, warnings = output$warnings
      ))
    } else {
      # The printed text travels in `stdout`; `result` only tags the reply kind.
      write_frame(list(
        id = header$id, status = "ok",
        result = list(kind = "print_output"),
        stdout = output$stdout, messages = output$messages, warnings = output$warnings,
        body_format = NULL
      ))
    }
  } else if (identical(header$op, "decode_data_frame")) {
    # Fetch the stored object and encode it as a data frame, honouring an
    # optional `max_rows` limit from the request options. Every step runs
    # inside one tryCatch so any failure yields a data-frame error frame.
    outcome <- tryCatch(
      {
        stored <- get_object(header$object_id)
        row_limit_raw <- .rx_dataframe_wire_field(header$options, "max_rows")
        row_limit <- if (is.null(row_limit_raw)) {
          NULL
        } else {
          .rx_dataframe_wire_integer(row_limit_raw, min = 1)
        }
        list(
          value = .rx_encode_data_frame(stored, max_rows = row_limit),
          failure = NULL
        )
      },
      error = function(cond) list(value = NULL, failure = cond)
    )

    if (is.null(outcome[["failure"]])) {
      write_frame(list(id = header$id, status = "ok", result = outcome[["value"]]))
    } else {
      .rx_write_dataframe_error_frame(header$id, outcome[["failure"]])
    }
  } else if (identical(header$op, "encode_data_frame")) {
    # Build a data frame from its wire description and keep it in the object
    # store; the reply carries the new handle.
    failure <- NULL
    frame_value <- tryCatch(
      .rx_decode_data_frame(header$value),
      error = function(cond) {
        failure <<- cond
        NULL
      }
    )

    if (is.null(failure)) {
      id <- store_object(frame_value)
      write_frame(list(id = header$id, status = "ok",
                       result = list(kind = "object", id = id)))
    } else {
      # Decoding failed: delegate to the data-frame error-frame writer.
      .rx_write_dataframe_error_frame(header$id, failure)
    }
  } else if (identical(header$op, "decode_arrow")) {
    # Serialise a stored data frame as an Arrow IPC stream and send it as the
    # binary body of the reply. Each precondition that fails produces its own
    # error frame: unknown handle, non-data-frame object, missing 'arrow'
    # package, or a serialisation failure.
    decode_arrow_error <- NULL
    obj <- tryCatch(
      get_object(header$object_id),
      error = function(e) { decode_arrow_error <<- e; NULL }
    )
    if (!is.null(decode_arrow_error)) {
      write_frame(list(id = header$id, status = "error", message = conditionMessage(decode_arrow_error)))
    } else if (!is.data.frame(obj)) {
      # is.data.frame(NULL) is FALSE, so a NULL object is rejected here too.
      write_frame(list(id = header$id, status = "error", message = "object is not an Arrow-compatible data frame"))
    } else if (!requireNamespace("arrow", quietly = TRUE)) {
      write_frame(list(
        id = header$id,
        status = "error",
        message = "R 'arrow' package is not installed",
        rx_error = list(tag = "missing_r_package", args = list("arrow"))
      ))
    } else {
      # Arrow can reject column types it cannot represent. Catch that here so
      # the caller receives an error frame instead of an uncaught R error.
      serialize_error <- NULL
      arrow_bytes <- tryCatch(
        arrow::write_to_raw(obj, format = "stream"),
        error = function(e) { serialize_error <<- e; NULL }
      )
      if (!is.null(serialize_error)) {
        write_frame(list(
          id = header$id,
          status = "error",
          message = paste0("failed to encode Arrow IPC stream: ", conditionMessage(serialize_error))
        ))
      } else {
        write_frame(
          list(id = header$id, status = "ok", result = list(kind = "arrow", type = "ipc_stream"), body_format = "arrow_stream"),
          arrow_bytes
        )
      }
    }
  } else if (identical(header$op, "encode_dataframe")) {
    # Read an Arrow IPC stream from the request body into a data frame and
    # keep it in the object store. Requires the optional 'arrow' package.
    if (requireNamespace("arrow", quietly = TRUE)) {
      # Any reader failure collapses to NULL and is reported with a generic
      # message below.
      imported <- tryCatch(
        arrow::read_ipc_stream(body, as_data_frame = TRUE),
        error = function(cond) NULL
      )
      if (!is.null(imported)) {
        id <- store_object(imported)
        write_frame(list(id = header$id, status = "ok",
                         result = list(kind = "object", id = id)))
      } else {
        write_frame(list(id = header$id, status = "error", message = "failed to decode Arrow IPC stream"))
      }
    } else {
      write_frame(list(
        id = header$id,
        status = "error",
        message = "R 'arrow' package is not installed",
        rx_error = list(tag = "missing_r_package", args = list("arrow"))
      ))
    }
  } else {
    # No branch matched the requested op: reply with an error frame naming it.
    write_frame(list(id = header$id, status = "error", message = paste("unknown op:", header$op)))
  }
}