# Extra library directories may be supplied as trailing command-line
# arguments. When present they are placed in front of the existing library
# search path; the temporary variable is removed afterwards.
.rx_lib_paths <- commandArgs(trailingOnly = TRUE)
if (length(.rx_lib_paths) != 0L) {
  .libPaths(new = c(.rx_lib_paths, .libPaths()))
}
rm(list = ".rx_lib_paths")
# Optional renv activation, controlled by environment variables.
# It only runs when RX_RENV_ENABLED is exactly "1"; in that case both
# RX_RENV_PROJECT and RX_RENV_LOCKFILE must be non-empty and the renv package
# must be installed. The lockfile path is exported as RENV_PATHS_LOCKFILE
# before the project is loaded, and everything renv prints, messages or warns
# about while loading is discarded.
.rx_renv_enabled <- Sys.getenv("RX_RENV_ENABLED", unset = "") == "1"
if (.rx_renv_enabled) {
  .rx_renv_project <- Sys.getenv("RX_RENV_PROJECT", unset = "")
  .rx_renv_lockfile <- Sys.getenv("RX_RENV_LOCKFILE", unset = "")
  if (!nzchar(.rx_renv_project)) {
    stop("RX_RENV_PROJECT is required when RX_RENV_ENABLED=1", call. = FALSE)
  }
  if (!nzchar(.rx_renv_lockfile)) {
    stop("RX_RENV_LOCKFILE is required when RX_RENV_ENABLED=1", call. = FALSE)
  }
  Sys.setenv(RENV_PATHS_LOCKFILE = .rx_renv_lockfile)
  if (!requireNamespace("renv", quietly = TRUE)) {
    stop("missing required R package: renv", call. = FALSE)
  }
  invisible(capture.output(
    suppressWarnings(
      suppressMessages(
        renv::load(.rx_renv_project, quiet = TRUE)
      )
    )
  ))
}
# Drop the bootstrap variables again; the two path variables only exist when
# activation was requested.
rm(list = ".rx_renv_enabled")
if (exists(".rx_renv_project", inherits = FALSE)) {
  rm(list = ".rx_renv_project")
}
if (exists(".rx_renv_lockfile", inherits = FALSE)) {
  rm(list = ".rx_renv_lockfile")
}
suppressPackageStartupMessages(library(jsonlite))
# In R 4.x, stdin() and stdout() are text-mode connections ("r"/"w"), while
# readBin()/writeBin() need binary ones. File descriptors 0 and 1 are
# therefore re-opened through /dev/fd in binary mode, which talks directly to
# the Erlang port pipe without seeking. Warnings raised while opening are
# suppressed.
.rx_in <- suppressWarnings(file(description = "/dev/fd/0", open = "rb"))
.rx_out <- suppressWarnings(file(description = "/dev/fd/1", open = "wb"))
# Read exactly `n` bytes from the binary input connection `.rx_in`.
#
# n: number of bytes requested.
# Returns a raw vector of length `n`. If fewer bytes are available the R
# process exits with status 0 (presumably the peer closed the pipe).
read_exact <- function(n) {
  chunk <- readBin(con = .rx_in, what = "raw", n = n)
  if (length(chunk) >= n) {
    return(chunk)
  }
  quit(save = "no", status = 0)
}
# 32-bit boundary constants shared with other parts of this file.
# 2147483648 (2^31) must be a double literal — it overflows R's 32-bit integer.
.U32_SIGN_BIT <- 2147483648
.U32_MAX_INT <- 2147483647L
# Read a 4-byte big-endian unsigned integer from the input stream.
#
# The value is assembled from the four bytes in double arithmetic. Reading a
# signed integer with readBin() and patching the sign bit with
# bitwAnd()/bitwOr() is not usable here: the bitw*() functions coerce their
# arguments to 32-bit integers, so OR-ing with 2^31 yields NA, and the bit
# pattern 0x80000000 is itself read back as NA_integer_.
#
# Returns an integer when the value fits R's integer range, otherwise a double
# holding the exact unsigned value (up to 4294967295).
read_u32 <- function() {
  octets <- as.integer(read_exact(4))
  value <- sum(octets * c(16777216, 65536, 256, 1))
  if (value <= .Machine$integer.max) as.integer(value) else value
}
# Read an 8-byte big-endian unsigned integer from the input stream.
#
# Both 32-bit halves are assembled from their bytes in double arithmetic, so
# halves with the top bit set (>= 2^31) decode correctly instead of turning
# into NA via integer coercion inside bitwOr().
#
# Returns a double. Values above 2^53 cannot be represented exactly.
read_u64 <- function() {
  octets <- as.integer(read_exact(8))
  weights <- c(16777216, 65536, 256, 1)
  high <- sum(octets[1:4] * weights)
  low <- sum(octets[5:8] * weights)
  high * 4294967296 + low
}
# Write one protocol frame to the binary output connection `.rx_out`.
#
# Frame layout: 4-byte big-endian header length, JSON header bytes,
# 8-byte big-endian body length, body bytes (omitted when empty).
#
# header: list serialised with jsonlite::toJSON (length-1 vectors unboxed,
#         NULL written as null). `digits = NA` requests full numeric
#         precision; the toJSON default of 4 decimal digits would silently
#         round doubles sent to the peer.
# body:   raw vector payload, empty by default.
#
# The connection is flushed after every frame.
write_frame <- function(header, body = raw()) {
  # Unsigned big-endian encoding built byte by byte. This avoids relying on
  # writeBin(<integer>, size = 8), which is not available on every platform
  # and would emit IEEE double bits if the length were ever a double.
  be_bytes <- function(value, width) {
    as.raw((as.numeric(value) %/% 256^((width - 1L):0L)) %% 256)
  }
  header_raw <- charToRaw(toJSON(header, auto_unbox = TRUE, null = "null", digits = NA))
  writeBin(be_bytes(length(header_raw), 4L), .rx_out)
  writeBin(header_raw, .rx_out)
  writeBin(be_bytes(length(body), 8L), .rx_out)
  if (length(body) > 0) writeBin(body, .rx_out)
  flush(.rx_out)
}
# Registry for opaque R values that are handed to the peer by id.
# The session id combines the process id with a digits-only timestamp
# (microsecond resolution) and is embedded in every handle, so handles from
# another backend process can be recognised.
.__rx_session_id__ <- paste(
  Sys.getpid(),
  gsub("[^0-9]", "", format(Sys.time(), "%Y%m%d%H%M%OS6")),
  sep = "_"
)
# Counter used to number the handles of this session.
.__rx_next_id__ <- 0L
# Handle id -> optional metadata recorded alongside the value.
.__rx_object_meta__ <- new.env(parent = emptyenv())
# Handle id -> stored R value.
.__rx_objects__ <- new.env(parent = emptyenv())
# Put `value` into the object store under a freshly generated handle.
#
# value:    any R value.
# metadata: optional value recorded under the same handle in the metadata
#           store; nothing is recorded when NULL.
# Returns the new handle, "rx_<session id>_obj_<counter>". The global counter
# is advanced as a side effect.
store_object <- function(value, metadata = NULL) {
  serial <- .__rx_next_id__ + 1L
  .__rx_next_id__ <<- serial
  handle <- paste0("rx_", .__rx_session_id__, "_obj_", serial)
  assign(handle, value, envir = .__rx_objects__)
  if (is.null(metadata)) {
    return(handle)
  }
  assign(handle, metadata, envir = .__rx_object_meta__)
  handle
}
# Decide whether `id` looks like an object handle from a different session.
#
# A stale handle is a single string starting with "rx_" or "rpx_" that does
# not carry the current session id directly after that prefix. Anything that
# is not a single string is never considered stale.
is_stale_object_id <- function(id) {
  if (!is.character(id) || length(id) != 1L) {
    return(FALSE)
  }
  looks_like_handle <- startsWith(id, "rx_") || startsWith(id, "rpx_")
  own_prefixes <- paste0(c("rx_", "rpx_"), .__rx_session_id__, "_")
  looks_like_handle && !any(startsWith(id, own_prefixes))
}
# Look up a stored value by handle.
#
# id: object handle.
# Returns the stored value. Raises an error for handles that belong to
# another session and for handles that are not present in the store.
get_object <- function(id) {
  if (is_stale_object_id(id)) {
    stop("stale object handle from previous R backend session: ", id, call. = FALSE)
  }
  known <- exists(id, envir = .__rx_objects__, inherits = FALSE)
  if (known) {
    return(get(id, envir = .__rx_objects__))
  }
  stop("object not found: ", id, call. = FALSE)
}
# Look up the metadata recorded for a handle.
#
# id: object handle.
# Returns the recorded metadata, or NULL when none was stored. Raises an error
# for handles that belong to another session.
get_object_metadata <- function(id) {
  if (is_stale_object_id(id)) {
    stop("stale object handle from previous R backend session: ", id, call. = FALSE)
  }
  if (!exists(id, envir = .__rx_object_meta__, inherits = FALSE)) {
    return(NULL)
  }
  get(id, envir = .__rx_object_meta__)
}
# Convert a table-like value into its wire representation.
#
# value: a table (or any vector/array); without a dim attribute it is treated
#        as one-dimensional.
# Returns list(kind = "table", counts, dim, dimnames). `counts` are the cells
# coerced with as.integer(), so non-integer cell values are truncated. Each
# `dimnames` entry holds the dimension's name (NULL when absent or empty) and
# its labels as strings; missing dimnames are replaced by "1", "2", ...
encode_table <- function(value) {
  extents <- dim(value)
  if (is.null(extents)) {
    extents <- length(value)
  }
  labels <- dimnames(value)
  if (is.null(labels)) {
    labels <- lapply(extents, function(extent) as.character(seq_len(extent)))
  }
  axis_names <- names(labels)
  entries <- lapply(seq_along(labels), function(idx) {
    has_name <- !is.null(axis_names) &&
      length(axis_names) >= idx &&
      nzchar(axis_names[[idx]])
    list(
      name = if (has_name) axis_names[[idx]] else NULL,
      values = as.list(as.character(labels[[idx]]))
    )
  })
  list(
    kind = "table",
    counts = as.list(as.integer(value)),
    dim = as.list(as.integer(extents)),
    dimnames = entries
  )
}
# Rebuild an R table from its wire representation.
#
# v: list with `counts`, `dim` and `dimnames` fields, where every dimnames
#    entry carries `name` (possibly NULL) and `values`.
# Returns an integer array of class "table". Dimension names are only attached
# when at least one entry has a non-empty name.
decode_table <- function(v) {
  entries <- v$dimnames
  labels <- lapply(entries, function(entry) {
    as.character(unlist(entry$values, use.names = FALSE))
  })
  axis_names <- vapply(entries, function(entry) {
    label <- entry$name
    if (is.null(label)) "" else as.character(label)
  }, character(1))
  if (any(nzchar(axis_names))) {
    names(labels) <- axis_names
  }
  cells <- array(
    as.integer(unlist(v$counts, use.names = FALSE)),
    dim = as.integer(unlist(v$dim, use.names = FALSE)),
    dimnames = labels
  )
  structure(cells, class = "table")
}
# Build a classed condition describing a data-frame conversion failure.
#
# tag: short identifier of the error kind.
# ...: further details; each must render to a single string via
#      as.character().
# Returns a condition of class c("rx_dataframe_error", "error", "condition")
# with fields `tag`, `args` (the details as a list) and `message`
# ("<tag>: <detail>, <detail>, ...").
.rx_dataframe_error <- function(tag, ...) {
  args <- list(...)
  rendered <- vapply(args, as.character, character(1))
  message <- paste0(tag, ": ", paste(rendered, collapse = ", "))
  condition <- list(tag = tag, args = args, message = message)
  class(condition) <- c("rx_dataframe_error", "error", "condition")
  condition
}
# conditionMessage() method for rx_dataframe_error: the message is the
# condition's precomputed `message` field.
conditionMessage.rx_dataframe_error <- function(c) {
  c$message
}
# Send an error frame for a failed data-frame operation.
#
# id:  request id echoed back to the peer.
# err: the caught condition. For rx_dataframe_error conditions the structured
#      tag and arguments are added under `rx_error`; any other condition is
#      reported with its message only.
.rx_write_dataframe_error_frame <- function(id, err) {
  frame <- list(id = id, status = "error", message = conditionMessage(err))
  if (inherits(err, "rx_dataframe_error")) {
    frame$rx_error <- list(tag = err$tag, args = err$args)
  }
  write_frame(frame)
}
# Pass a failure reason through unchanged, whatever its type.
.rx_dataframe_reason <- function(reason) {
  reason
}
# Classify a data-frame column as one of the supported wire types.
#
# column: a single column vector.
# Returns "logical", "integer", "double" or "character".
# Any other column raises an "unsupported_dataframe_column" rx_dataframe_error
# whose first argument is "" (callers substitute the column name) and whose
# second argument names the reason: "matrix", "factor", "date", "posix",
# "list", a ("class", <first class>) tuple, "complex", "raw" or
# "unsupported_type". The checks run in exactly that order.
.rx_dataframe_column_type <- function(column) {
  reject <- function(detail) {
    stop(.rx_dataframe_error("unsupported_dataframe_column", "", detail))
  }
  if (!is.null(dim(column))) {
    reject("matrix")
  }
  klass <- attr(column, "class", exact = TRUE)
  classed <- !is.null(klass) && length(klass) > 0L
  if (classed) {
    # Well-known classes get a dedicated reason before the generic one.
    known <- c(factor = "factor", Date = "date", POSIXt = "posix")
    for (cls in names(known)) {
      if (inherits(column, cls)) {
        reject(known[[cls]])
      }
    }
  }
  if (is.list(column)) {
    reject("list")
  }
  if (classed) {
    reject(list(tuple = list("class", as.character(klass[[1L]]))))
  }
  if (is.logical(column)) {
    return("logical")
  }
  if (is.integer(column)) {
    return("integer")
  }
  if (is.double(column)) {
    return("double")
  }
  if (is.character(column)) {
    return("character")
  }
  if (is.complex(column)) {
    reject("complex")
  }
  if (is.raw(column)) {
    reject("raw")
  }
  reject("unsupported_type")
}
# TRUE when `value` carries automatic row names (or has no rows).
# .row_names_info(type = 1L) reports the row count with a negative sign for
# automatic row names, so a positive result means custom row names.
.rx_dataframe_default_row_names <- function(value) {
  signed_count <- .row_names_info(value, type = 1L)
  signed_count <= 0L
}
# TRUE when `value` is a list without names, i.e. what a parsed JSON array
# looks like here (parsed JSON objects carry names).
.rx_dataframe_json_array <- function(value) {
  if (!is.list(value)) {
    return(FALSE)
  }
  is.null(names(value))
}
# Fetch the field `name` from a parsed wire object using exact name matching.
# Returns NULL when `value` is not a list or has no such field.
.rx_dataframe_wire_field <- function(value, name) {
  if (is.list(value)) value[[name, exact = TRUE]] else NULL
}
# Validate a whole-number scalar received on the wire.
#
# value: candidate value; must be a single, finite, non-missing number with
#        no fractional part.
# min:   optional lower bound (inclusive); NULL disables the check.
# max:   optional upper bound (inclusive); defaults to the largest 32-bit
#        signed integer, NULL disables the check.
# Returns the value as an integer; every violation raises the
# "invalid_dataframe" / "malformed_wire" error.
.rx_dataframe_wire_integer <- function(value, min = NULL, max = .U32_MAX_INT) {
  malformed <- function() {
    stop(.rx_dataframe_error("invalid_dataframe", "malformed_wire"))
  }
  well_formed <- !is.null(value) && !is.list(value) && length(value) == 1L &&
    !is.na(value) && is.numeric(value) && is.finite(value) &&
    value == floor(value)
  if (!well_formed) {
    malformed()
  }
  if (!is.null(min) && value < min) {
    malformed()
  }
  if (!is.null(max) && value > max) {
    malformed()
  }
  as.integer(value)
}
# Validate one non-missing cell received on the wire against its column type.
#
# value: the parsed JSON value; must be a single non-NA atomic value.
# type:  declared column type ("logical", "integer", "double", "character").
# Returns the value coerced to that type.
# Raises "invalid_dataframe"/"mixed_column_type" when the value does not fit
# the type, "unsupported_dataframe_column"/"non_finite_double" for non-finite
# doubles, and "invalid_dataframe"/"unsupported_type" for an unknown type.
.rx_dataframe_wire_value <- function(value, type) {
  mixed <- function() {
    stop(.rx_dataframe_error("invalid_dataframe", "mixed_column_type"))
  }
  is_scalar <- !is.list(value) && !is.null(value) && length(value) == 1L && !is.na(value)
  if (!is_scalar) {
    mixed()
  }
  if (identical(type, "logical")) {
    if (is.logical(value)) {
      return(as.logical(value))
    }
    mixed()
  }
  if (identical(type, "integer")) {
    fits <- is.numeric(value) && is.finite(value) && value == floor(value) &&
      value >= -2147483647 && value <= 2147483647
    if (!fits) {
      mixed()
    }
    return(as.integer(value))
  }
  if (identical(type, "double")) {
    if (!is.numeric(value)) {
      mixed()
    }
    number <- as.numeric(value)
    if (is.finite(number)) {
      return(number)
    }
    stop(.rx_dataframe_error("unsupported_dataframe_column", "", "non_finite_double"))
  }
  if (identical(type, "character")) {
    if (is.character(value)) {
      return(as.character(value))
    }
    mixed()
  }
  stop(.rx_dataframe_error("invalid_dataframe", "unsupported_type"))
}
# Turn a column vector into a list of wire cells.
#
# column: atomic column vector.
# type:   wire type previously determined for the column.
# Returns one list element per cell: missing cells become
# list(kind = "na", type = type); all others are coerced according to `type`
# (anything other than double/integer/logical is coerced to character).
# Double columns containing Inf or NaN are rejected with
# "unsupported_dataframe_column" / "non_finite_double".
.rx_dataframe_column_values <- function(column, type) {
  is_double <- identical(type, "double")
  if (is_double && any(is.infinite(column) | is.nan(column), na.rm = TRUE)) {
    stop(.rx_dataframe_error("unsupported_dataframe_column", "", "non_finite_double"))
  }
  # The coercion depends only on `type`, so it is selected once up front.
  convert <- if (is_double) {
    as.numeric
  } else if (identical(type, "integer")) {
    as.integer
  } else if (identical(type, "logical")) {
    as.logical
  } else {
    as.character
  }
  lapply(seq_along(column), function(pos) {
    cell <- column[[pos]]
    if (is.na(cell)) {
      list(kind = "na", type = type)
    } else {
      convert(cell)
    }
  })
}
# Convert a data.frame into its wire representation.
#
# value:    the object to encode; must inherit from "data.frame".
# max_rows: optional row limit; NULL means unlimited.
# Returns list(kind = "data_frame", names, n_rows, columns), where each column
# is list(name, type, values).
# Raises rx_dataframe_error conditions for: non-data-frames, too many rows,
# missing/NA, empty or duplicated column names, zero columns with a non-zero
# row count, custom row names, and unsupported columns.
.rx_encode_data_frame <- function(value, max_rows = NULL) {
  if (!inherits(value, "data.frame")) {
    stop(.rx_dataframe_error("not_dataframe", "R object is not a data.frame"))
  }
  rows <- nrow(value)
  if (!is.null(max_rows) && rows > max_rows) {
    stop(.rx_dataframe_error("dataframe_too_large", rows, max_rows))
  }
  col_names <- names(value)
  if (is.null(col_names) || length(col_names) != length(value) || any(is.na(col_names))) {
    stop(.rx_dataframe_error("unsupported_dataframe_names", "non_string"))
  }
  if (!all(nzchar(col_names))) {
    stop(.rx_dataframe_error("unsupported_dataframe_names", "empty"))
  }
  if (anyDuplicated(col_names) != 0L) {
    stop(.rx_dataframe_error("unsupported_dataframe_names", "duplicate"))
  }
  if (length(value) == 0L && rows > 0L) {
    stop(.rx_dataframe_error("unsupported_dataframe_shape", "zero_column_nonzero_row"))
  }
  if (!.rx_dataframe_default_row_names(value)) {
    stop(.rx_dataframe_error("unsupported_dataframe_row_names", "custom"))
  }
  # The column helpers raise "unsupported_dataframe_column" with an empty
  # column name; this wrapper re-raises such errors with the real name and
  # passes every other rx_dataframe_error through untouched.
  with_column_name <- function(expr, column_name) {
    tryCatch(
      expr,
      rx_dataframe_error = function(e) {
        anonymous <- identical(e$tag, "unsupported_dataframe_column") &&
          identical(e$args[[1L]], "")
        if (anonymous) {
          stop(.rx_dataframe_error("unsupported_dataframe_column", column_name, e$args[[2L]]))
        }
        stop(e)
      }
    )
  }
  columns <- lapply(seq_along(value), function(idx) {
    column_name <- col_names[[idx]]
    column <- value[[idx]]
    type <- with_column_name(.rx_dataframe_column_type(column), column_name)
    values <- with_column_name(.rx_dataframe_column_values(column, type), column_name)
    list(name = column_name, type = type, values = values)
  })
  list(
    kind = "data_frame",
    names = as.list(col_names),
    n_rows = rows,
    columns = columns
  )
}
# Decode one wire cell of a data-frame column.
#
# value: either an NA marker (a list whose `kind` is "na") or a plain value.
# type:  declared column type.
# Returns the typed NA for a marker whose own `type` matches the column type
# (a mismatch raises "invalid_dataframe" / "mixed_column_type"); plain values
# are validated and coerced by .rx_dataframe_wire_value().
.rx_decode_data_frame_value <- function(value, type) {
  is_na_marker <- is.list(value) &&
    identical(.rx_dataframe_wire_field(value, "kind"), "na")
  if (!is_na_marker) {
    return(.rx_dataframe_wire_value(value, type))
  }
  if (!identical(.rx_dataframe_wire_field(value, "type"), type)) {
    stop(.rx_dataframe_error("invalid_dataframe", "mixed_column_type"))
  }
  missing_value <- switch(type,
    logical = NA,
    integer = NA_integer_,
    double = NA_real_,
    character = NA_character_
  )
  missing_value
}
# Rebuild a data.frame from its wire representation.
#
# wire: parsed JSON object with kind = "data_frame", `names` (array of column
#       names), `n_rows` and `columns` (array of list(name, type, values)).
# Returns a data.frame with one column per entry, created without factor
# conversion and without name mangling.
# Raises rx_dataframe_error conditions: "malformed_wire" for structural
# problems, "empty_name", "duplicate_names", "column_key_mismatch",
# "column_length_mismatch", "unsupported_type", the zero-column/non-zero-row
# shape error, plus whatever decoding a single cell raises.
.rx_decode_data_frame <- function(wire) {
  fail <- function(tag, reason) {
    stop(.rx_dataframe_error(tag, reason))
  }
  is_string <- function(x) {
    is.character(x) && length(x) == 1L && !is.na(x)
  }

  kind <- .rx_dataframe_wire_field(wire, "kind")
  names_wire <- .rx_dataframe_wire_field(wire, "names")
  n_rows_wire <- .rx_dataframe_wire_field(wire, "n_rows")
  columns <- .rx_dataframe_wire_field(wire, "columns")
  if (!is.list(wire) || !identical(kind, "data_frame")) {
    fail("invalid_dataframe", "malformed_wire")
  }
  has_arrays <- !is.null(names_wire) && .rx_dataframe_json_array(names_wire) &&
    !is.null(n_rows_wire) && !is.null(columns) &&
    .rx_dataframe_json_array(columns)
  if (!has_arrays) {
    fail("invalid_dataframe", "malformed_wire")
  }

  names_value <- vapply(names_wire, function(name) {
    if (!is_string(name)) {
      fail("invalid_dataframe", "malformed_wire")
    }
    name
  }, character(1))
  n_rows <- .rx_dataframe_wire_integer(n_rows_wire, min = 0)
  if (!all(nzchar(names_value))) {
    fail("invalid_dataframe", "empty_name")
  }
  if (anyDuplicated(names_value) != 0L) {
    fail("invalid_dataframe", "duplicate_names")
  }
  if (length(names_value) != length(columns)) {
    fail("invalid_dataframe", "column_key_mismatch")
  }
  if (length(names_value) == 0L && n_rows > 0L) {
    fail("unsupported_dataframe_shape", "zero_column_nonzero_row")
  }

  decoded <- lapply(seq_along(columns), function(idx) {
    entry <- columns[[idx]]
    name <- .rx_dataframe_wire_field(entry, "name")
    type <- .rx_dataframe_wire_field(entry, "type")
    values_wire <- .rx_dataframe_wire_field(entry, "values")
    entry_ok <- is.list(entry) && !is.null(name) && !is.null(type) &&
      !is.null(values_wire) && .rx_dataframe_json_array(values_wire) &&
      is_string(name) && is_string(type)
    if (!entry_ok) {
      fail("invalid_dataframe", "malformed_wire")
    }
    # Columns must appear in the same order as the top-level name list.
    if (!identical(name, names_value[[idx]])) {
      fail("invalid_dataframe", "column_key_mismatch")
    }
    values <- tryCatch(
      lapply(values_wire, .rx_decode_data_frame_value, type = type),
      rx_dataframe_error = function(e) {
        # Cell-level errors carry an empty column name; fill in the real one.
        anonymous <- identical(e$tag, "unsupported_dataframe_column") &&
          identical(e$args[[1L]], "")
        if (anonymous) {
          stop(.rx_dataframe_error("unsupported_dataframe_column", name, e$args[[2L]]))
        }
        stop(e)
      }
    )
    if (length(values) != n_rows) {
      fail("invalid_dataframe", "column_length_mismatch")
    }
    flat <- unlist(values, use.names = FALSE)
    switch(type,
      logical = as.logical(flat),
      integer = as.integer(flat),
      double = as.numeric(flat),
      character = as.character(flat),
      stop(.rx_dataframe_error("invalid_dataframe", "unsupported_type"))
    )
  })
  names(decoded) <- names_value
  as.data.frame(decoded, stringsAsFactors = FALSE, check.names = FALSE)
}
# TRUE when `value` has a non-empty class attribute or a non-empty dim
# attribute, i.e. it is more than a bare vector or list.
is_semantic_object <- function(value) {
  has_entries <- function(x) !is.null(x) && length(x) > 0L
  klass <- attr(value, "class", exact = TRUE)
  shape <- dim(value)
  has_entries(klass) || has_entries(shape)
}
# Convert an R value into its wire representation.
#
# value: any R value.
# depth: current nesting depth of plain lists (used by the recursion).
#
# Result by input:
#   NULL                      -> scalar of type "null"
#   table                     -> encode_table()
#   classed or dimensioned    -> opaque handle (value kept in the object store)
#   logical/integer/double/character
#       length 1              -> "scalar", or "na" when missing
#       any other length      -> "vector" whose missing elements are "na" markers
#   plain list                -> "named_list" when every element has a unique,
#                                non-empty name (an empty list too), otherwise
#                                "list" with NULL keys for unnamed elements;
#                                elements are encoded recursively
#   anything else             -> opaque handle
# Plain lists nested 100 levels deep or more are returned as an opaque handle
# with a warning.
encode_value <- function(value, depth = 0L) {
  if (is.null(value)) {
    return(list(kind = "scalar", type = "null", value = NULL))
  }
  if (inherits(value, "table")) {
    return(encode_table(value))
  }
  if (is_semantic_object(value)) {
    return(list(kind = "object", id = store_object(value)))
  }

  # The four supported atomic types share one code path.
  atomic_type <- if (is.logical(value)) {
    "logical"
  } else if (is.integer(value)) {
    "integer"
  } else if (is.double(value)) {
    "double"
  } else if (is.character(value)) {
    "character"
  } else {
    NULL
  }
  if (!is.null(atomic_type)) {
    na_marker <- list(kind = "na", type = atomic_type)
    if (length(value) == 1) {
      if (is.na(value)) {
        return(na_marker)
      }
      return(list(kind = "scalar", type = atomic_type, value = value))
    }
    vals <- lapply(seq_along(value), function(i) {
      if (is.na(value[i])) na_marker else value[i]
    })
    return(list(kind = "vector", type = atomic_type, values = vals))
  }

  # Only bare lists are expanded; everything else stays opaque.
  is_plain_list <- is.list(value) && identical(class(value), "list")
  if (!is_plain_list) {
    return(list(kind = "object", id = store_object(value)))
  }
  if (depth >= 100L) {
    warning("rx: named-list nesting exceeded depth 100; returned opaque handle")
    return(list(kind = "object", id = store_object(value)))
  }
  n <- length(value)
  if (n == 0L) {
    return(list(kind = "named_list", entries = list()))
  }
  nms <- names(value)
  fully_named <- !is.null(nms) && length(nms) == n &&
    all(nzchar(nms)) && anyDuplicated(nms) == 0L
  keys <- if (fully_named) {
    as.list(nms)
  } else if (is.null(nms)) {
    vector("list", n)
  } else {
    lapply(nms, function(nm) if (nzchar(nm)) nm else NULL)
  }
  entries <- lapply(seq_len(n), function(i) {
    list(key = keys[[i]], value = encode_value(value[[i]], depth + 1L))
  })
  list(kind = if (fully_named) "named_list" else "list", entries = entries)
}
# Convert a wire value back into an R value.
#
# v: parsed JSON value. Anything that is not a list is returned unchanged
#    (e.g. a bare element of a vector). Lists are dispatched on `kind`:
#      "scalar"     -> NULL or a length-1 logical/integer/double/character
#      "na"         -> the typed NA for `type`
#      "object"     -> the stored value behind the handle `id`
#      "table"      -> decode_table()
#      "vector"     -> atomic vector of `type` built from `values`
#      "named_list" -> list named by every entry's key
#      "list"       -> list; names are set only if some entry has a key
# Returns NULL for unrecognised input.
#
# List entries are collected with lapply() rather than `out[[i]] <- ...`:
# assigning NULL into a list slot deletes the slot, which dropped NULL
# elements and broke the subsequent names<- call when the last entry
# decoded to NULL.
decode_value <- function(v) {
  # Plain atomic scalar (e.g. a vector element from an __inline__ global)
  if (!is.list(v)) return(v)
  if (identical(v$kind, "scalar")) {
    if (identical(v$type, "null")) return(NULL)
    if (identical(v$type, "logical")) return(as.logical(v$value))
    if (identical(v$type, "integer")) return(as.integer(v$value))
    if (identical(v$type, "double")) return(as.double(v$value))
    if (identical(v$type, "character")) return(as.character(v$value))
  }
  if (identical(v$kind, "na")) {
    if (identical(v$type, "logical")) return(NA)
    if (identical(v$type, "integer")) return(NA_integer_)
    if (identical(v$type, "double")) return(NA_real_)
    if (identical(v$type, "character")) return(NA_character_)
  }
  if (identical(v$kind, "object")) return(get_object(v$id))
  if (identical(v$kind, "table")) return(decode_table(v))
  if (identical(v$kind, "vector")) {
    elems <- lapply(v$values, decode_value)
    return(switch(v$type,
      "double" = as.double(unlist(elems)),
      "integer" = as.integer(unlist(elems)),
      "logical" = as.logical(unlist(elems)),
      "character" = as.character(unlist(elems)),
      unlist(elems)
    ))
  }
  if (identical(v$kind, "named_list")) {
    entries <- v$entries
    if (length(entries) == 0L) return(list())
    out <- unname(lapply(entries, function(e) decode_value(e$value)))
    names(out) <- vapply(entries, function(e) e$key, character(1))
    return(out)
  }
  if (identical(v$kind, "list")) {
    entries <- v$entries
    if (length(entries) == 0L) return(list())
    out <- unname(lapply(entries, function(e) decode_value(e$value)))
    nms <- vapply(entries, function(e) if (is.null(e$key)) "" else e$key, character(1))
    names(out) <- if (all(nms == "")) NULL else nms
    return(out)
  }
  NULL
}
# Announce readiness as soon as the script has started: the frame carries the
# running R version and the id of this session.
invisible(local({
  ready <- list(
    id = "ready",
    status = "ready",
    version = as.character(getRversion()),
    session_id = .__rx_session_id__
  )
  write_frame(ready)
}))
# Marker meaning "no expression was evaluated" (empty or comment-only source).
.NO_RESULT <- list(kind = "no_result")
# Read one integer-valued plot option.
#
# options:  list of options supplied by the caller.
# name:     option to look up.
# default:  value used when the option is absent.
# min, max: inclusive bounds.
# Returns the option as an integer. Raises an error when the supplied value is
# not a single whole number within [min, max].
.rx_plot_integer_option <- function(options, name, default, min, max) {
  candidate <- options[[name]]
  if (is.null(candidate)) {
    return(as.integer(default))
  }
  acceptable <- is.numeric(candidate) && length(candidate) == 1L &&
    !is.na(candidate) && is.finite(candidate) &&
    candidate == floor(candidate) && candidate >= min && candidate <= max
  if (acceptable) {
    return(as.integer(candidate))
  }
  stop(sprintf("plot option %s must be an integer from %s through %s", name, min, max),
       call. = FALSE)
}
# Validate plot options and fill in defaults.
#
# options: list of caller-supplied options, or NULL.
# Returns list(format, width, height, res, pointsize, max_pages, max_bytes)
# with integer values. "png" is the only accepted format; each numeric option
# must be a whole number inside its allowed range.
.rx_plot_options <- function(options) {
  if (is.null(options)) {
    options <- list()
  }
  format <- options$format
  if (is.null(format)) {
    format <- "png"
  }
  # Anything that is not exactly the string "png" is rejected.
  if (!identical(format, "png")) {
    stop("plot option format must be png", call. = FALSE)
  }
  int_option <- function(name, default, min, max) {
    .rx_plot_integer_option(options, name, default, min, max)
  }
  list(
    format = "png",
    width = int_option("width", 640L, 20L, 10000L),
    height = int_option("height", 480L, 20L, 10000L),
    res = int_option("res", 96L, 1L, 10000L),
    pointsize = int_option("pointsize", 12L, 1L, 1000L),
    max_pages = int_option("max_pages", 100L, 1L, 1000L),
    max_bytes = int_option("max_bytes", 64000000L, 1L, .U32_MAX_INT)
  )
}
# Load the PNG pages written into `tmpdir`.
#
# tmpdir:    directory containing files named page-<digits>.png.
# max_pages: maximum number of page files accepted.
# max_bytes: maximum combined size of all page files.
# Returns a list of raw vectors, one per page in sorted file-name order; an
# empty list when there are no page files.
# Raises an error when there are too many pages, a file is empty or its size
# is unknown, the combined size exceeds the limit, or a file does not start
# with the PNG signature.
.rx_read_png_pages <- function(tmpdir, max_pages, max_bytes) {
  page_files <- sort(list.files(
    tmpdir,
    pattern = "^page-[0-9]+[.]png$",
    full.names = TRUE
  ))
  n_pages <- length(page_files)
  if (n_pages > max_pages) {
    stop("plot produced too many pages", call. = FALSE)
  }
  if (n_pages == 0L) {
    return(list())
  }
  sizes <- file.info(page_files)$size
  if (any(is.na(sizes) | sizes <= 0L)) {
    stop("plot device produced an empty PNG file", call. = FALSE)
  }
  if (sum(sizes) > max_bytes) {
    stop("plot PNG output exceeds byte limit", call. = FALSE)
  }
  png_signature <- as.raw(c(0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a))
  pages <- vector("list", n_pages)
  for (idx in seq_len(n_pages)) {
    bytes <- readBin(page_files[[idx]], what = "raw", n = sizes[[idx]])
    if (length(bytes) < 8L || !identical(bytes[1:8], png_signature)) {
      stop("plot device did not produce a valid PNG file", call. = FALSE)
    }
    pages[[idx]] <- bytes
  }
  pages
}
# TRUE when `value` inherits from either of the class names "ggplot" or
# "ggplot2::ggplot".
.rx_is_ggplot <- function(value) {
  inherits(value, c("ggplot", "ggplot2::ggplot"))
}
# Evaluate `code` with a PNG graphics device active and collect the pages.
#
# code:    R source text to evaluate.
# globals: named list of wire values; each is decoded and assigned into the
#          evaluation environment before the code runs.
# options: plot options, validated by .rx_plot_options().
#
# Returns list(ok = TRUE, pages, output, options) on success, where `pages`
# is a list of raw PNG byte vectors, or list(ok = FALSE, error, output) when
# option validation, evaluation, closing the device or reading the files
# failed, or when no page was produced. `output` always holds the captured
# stdout, messages and warnings as single strings.
.rx_capture_plot <- function(code, globals, options) {
# Fresh environment per call; it inherits from the global environment.
eval_env <- new.env(parent = .GlobalEnv)
# Pages are written to a private temporary directory that is removed when
# this function exits.
tmpdir <- tempfile("rx-plot-")
dir.create(tmpdir, mode = "0700")
on.exit(unlink(tmpdir, recursive = TRUE, force = TRUE), add = TRUE)
messages_text <- character()
warnings_text <- character()
plot_error <- NULL
close_error <- NULL
read_error <- NULL
pages <- list()
opts <- NULL
plot_dev <- NULL
# Remember which devices were open and which one was current, so that only
# devices opened during this call are closed afterwards.
old_devices <- grDevices::dev.list()
old_devices <- if (is.null(old_devices)) integer() else unname(as.integer(old_devices))
old_dev <- unname(as.integer(grDevices::dev.cur()))
old_options <- NULL
# Close every device opened since entry (highest device number first), then
# the PNG device itself if it is still open, and finally make the previously
# current device current again. The first error raised while closing is kept
# in `close_error`.
close_plot_device <- function() {
open_devices <- grDevices::dev.list()
if (!is.null(open_devices)) {
opened_devices <- setdiff(unname(as.integer(open_devices)), old_devices)
for (device in rev(sort(opened_devices))) {
tryCatch(
invisible(grDevices::dev.off(which = device)),
error = function(e) { if (is.null(close_error)) close_error <<- e }
)
}
}
open_devices <- grDevices::dev.list()
if (!is.null(plot_dev) && !is.null(open_devices) &&
plot_dev %in% unname(as.integer(open_devices))) {
tryCatch(
invisible(grDevices::dev.off(which = plot_dev)),
error = function(e) { if (is.null(close_error)) close_error <<- e }
)
}
open_devices <- grDevices::dev.list()
# Device number 1 is the null device, so there is nothing to restore then.
if (!is.null(open_devices) && old_dev != 1L &&
old_dev %in% unname(as.integer(open_devices))) {
try(invisible(grDevices::dev.set(which = old_dev)), silent = TRUE)
}
}
# Everything printed during evaluation is captured; messages and warnings are
# collected by the calling handlers and muffled.
stdout_lines <- capture.output({
invisible(withCallingHandlers(
tryCatch(
{
opts <- .rx_plot_options(options)
if (!is.null(globals) && length(globals) > 0) {
for (nm in names(globals)) {
assign(nm, decode_value(globals[[nm]]), envir = eval_env)
}
}
exprs <- parse(text = code)
# %06d is filled in by the PNG device with the page number.
file_pattern <- file.path(tmpdir, "page-%06d.png")
# While the code runs, the default `device` option is replaced by a function
# that raises an error — presumably so that an implicitly opened device
# cannot bypass the capture. The previous option is restored in `finally`.
old_options <- options(
device = function(...) {
stop("Rx plot capture device is not available", call. = FALSE)
}
)
png_args <- list(
filename = file_pattern,
width = opts$width,
height = opts$height,
units = "px",
pointsize = opts$pointsize,
bg = "white",
res = opts$res
)
# Use the cairo PNG backend when this R build reports it as available.
if (isTRUE(capabilities("cairo"))) {
png_args$type <- "cairo"
}
do.call(grDevices::png, png_args)
plot_dev <- unname(as.integer(grDevices::dev.cur()))
# Values are not auto-printed by eval(), so visible ggplot results are
# printed explicitly.
for (expr in exprs) {
evaluated <- withVisible(eval(expr, envir = eval_env))
if (isTRUE(evaluated$visible) && .rx_is_ggplot(evaluated$value)) {
print(evaluated$value)
}
}
NULL
},
error = function(e) { plot_error <<- e; NULL },
finally = {
close_plot_device()
if (!is.null(old_options)) options(old_options)
}
),
message = function(m) {
messages_text <<- c(messages_text, conditionMessage(m))
invokeRestart("muffleMessage")
},
warning = function(w) {
warnings_text <<- c(warnings_text, conditionMessage(w))
invokeRestart("muffleWarning")
}
))
})
output <- list(
stdout = paste(stdout_lines, collapse = "\n"),
messages = paste(messages_text, collapse = ""),
warnings = paste(warnings_text, collapse = "")
)
# An evaluation error takes precedence over an error raised while closing.
if (!is.null(plot_error)) return(list(ok = FALSE, error = plot_error, output = output))
if (!is.null(close_error)) return(list(ok = FALSE, error = close_error, output = output))
pages <- tryCatch(
.rx_read_png_pages(tmpdir, opts$max_pages, opts$max_bytes),
error = function(e) { read_error <<- e; NULL }
)
if (!is.null(read_error)) return(list(ok = FALSE, error = read_error, output = output))
if (length(pages) == 0L) {
return(list(ok = FALSE, error = simpleError("R code produced no plot"), output = output))
}
list(ok = TRUE, pages = pages, output = output, options = opts)
}
# Send an error frame describing an R condition.
#
# id:     request id echoed back to the peer.
# error:  the caught condition.
# output: captured stdout/messages/warnings, empty strings by default.
#
# The frame carries the condition message, its class vector, the deparsed
# call (null when the condition has no call) and the call stack as strings.
# The header list is passed inline to write_frame(), so sys.calls() is
# evaluated when write_frame() first uses its argument and reflects the stack
# at that moment rather than the one where the error was raised.
.rx_write_error_frame <- function(id, error, output = list(stdout = "", messages = "", warnings = "")) {
  call_text <- conditionCall(error)
  if (!is.null(call_text)) {
    call_text <- deparse(call_text)
  }
  write_frame(list(
    id = id,
    status = "error",
    message = conditionMessage(error),
    r_class = class(error),
    call = call_text,
    traceback = as.list(as.character(sys.calls())),
    stdout = output$stdout,
    messages = output$messages,
    warnings = output$warnings
  ))
}
# Main request loop.
#
# Every request frame is: 4-byte header length, JSON header, 8-byte body
# length, optional body bytes. The header's `op` selects the operation and
# `id` is echoed back in the response frame. The loop ends when read_exact()
# terminates the process on a short read.
#
# Operations: ping, eval, plot, encode, decode, print, decode_data_frame,
# encode_data_frame, decode_arrow, encode_dataframe. Any other `op` is
# answered with an "unknown op" error frame.
repeat {
  header_len <- read_u32()
  header_json <- read_exact(header_len)
  header <- fromJSON(rawToChar(header_json), simplifyVector = FALSE)
  body_len <- read_u64()
  body <- if (body_len > 0) read_exact(body_len) else raw()
  if (identical(header$op, "ping")) {
    write_frame(list(id = header$id, status = "ok", result = list(kind = "pong")))
  } else if (identical(header$op, "eval")) {
    # eval: run `code` in a fresh environment seeded with the decoded globals
    # and return the last value plus every variable left in that environment.
    code <- header$code
    # Fresh environment, inheriting from global so library functions are accessible
    eval_env <- new.env(parent = .GlobalEnv)
    messages_text <- character()
    warnings_text <- character()
    eval_error <- NULL
    stdout_lines <- capture.output({
      result_value <- withCallingHandlers(
        tryCatch(
          {
            # Assign globals inside tryCatch so a missing object id is caught
            # and returned as an R error rather than crashing the backend.
            if (!is.null(header$globals) && length(header$globals) > 0) {
              for (nm in names(header$globals)) {
                assign(nm, decode_value(header$globals[[nm]]), envir = eval_env)
              }
            }
            exprs <- parse(text = code)
            if (length(exprs) == 0) {
              .NO_RESULT
            } else {
              res <- .NO_RESULT
              for (expr in exprs) res <- eval(expr, envir = eval_env)
              res
            }
          },
          error = function(e) { eval_error <<- e; .NO_RESULT }
        ),
        message = function(m) {
          messages_text <<- c(messages_text, conditionMessage(m))
          invokeRestart("muffleMessage")
        },
        warning = function(w) {
          warnings_text <<- c(warnings_text, conditionMessage(w))
          invokeRestart("muffleWarning")
        }
      )
    })
    output <- list(
      stdout = paste(stdout_lines, collapse = "\n"),
      messages = paste(messages_text, collapse = ""),
      warnings = paste(warnings_text, collapse = "")
    )
    if (!is.null(eval_error)) {
      write_frame(list(
        id = header$id, status = "error",
        message = conditionMessage(eval_error),
        r_class = class(eval_error),
        call = deparse(conditionCall(eval_error)),
        traceback = as.character(sys.calls()),
        stdout = output$stdout, messages = output$messages, warnings = output$warnings
      ))
    } else {
      # Warnings raised while encoding are appended to those raised by the
      # evaluated code.
      encode_warnings <- character()
      withCallingHandlers(
        {
          global_vars <- ls(eval_env)
          globals_out <- list()
          for (nm in global_vars) {
            globals_out[[nm]] <- encode_value(get(nm, envir = eval_env))
          }
          encoded_result <- if (identical(result_value, .NO_RESULT)) {
            list(kind = "no_result")
          } else {
            encode_value(result_value)
          }
        },
        warning = function(w) {
          encode_warnings <<- c(encode_warnings, conditionMessage(w))
          invokeRestart("muffleWarning")
        }
      )
      all_warnings <- paste(c(warnings_text, encode_warnings), collapse = "")
      write_frame(list(
        id = header$id, status = "ok",
        result = encoded_result,
        globals = globals_out,
        stdout = output$stdout, messages = output$messages, warnings = all_warnings,
        body_format = NULL
      ))
    }
  } else if (identical(header$op, "plot")) {
    # plot: render `code` to PNG pages; the pages are concatenated in the
    # frame body and their individual sizes are listed in the header.
    capture <- .rx_capture_plot(header$code, header$globals, header$options)
    if (!isTRUE(capture$ok)) {
      .rx_write_error_frame(header$id, capture$error, capture$output)
    } else {
      sizes <- vapply(capture$pages, length, integer(1))
      body <- do.call(c, capture$pages)
      write_frame(
        list(
          id = header$id,
          status = "ok",
          result = list(
            kind = "plot_png_pages",
            page_count = length(capture$pages),
            sizes = as.list(as.integer(sizes)),
            width = capture$options$width,
            height = capture$options$height
          ),
          stdout = capture$output$stdout,
          messages = capture$output$messages,
          warnings = capture$output$warnings,
          body_format = "plot_png_pages"
        ),
        body
      )
    }
  } else if (identical(header$op, "encode")) {
    # encode: decode a wire value, store it, and return its handle. The wire
    # value is kept as metadata under the same handle.
    encode_error <- NULL
    obj <- tryCatch(
      decode_value(header$value),
      error = function(e) { encode_error <<- e; NULL }
    )
    if (!is.null(encode_error)) {
      write_frame(list(id = header$id, status = "error",
                       message = conditionMessage(encode_error),
                       r_class = class(encode_error),
                       call = deparse(conditionCall(encode_error)),
                       traceback = as.character(sys.calls())))
    } else {
      object_id <- store_object(obj, header$value)
      write_frame(list(id = header$id, status = "ok",
                       result = list(kind = "object", id = object_id)))
    }
  } else if (identical(header$op, "decode")) {
    # decode: return the wire form of a stored object, preferring the
    # metadata recorded when it was stored.
    decode_error <- NULL
    obj <- tryCatch(
      get_object(header$object_id),
      error = function(e) { decode_error <<- e; NULL }
    )
    if (!is.null(decode_error)) {
      write_frame(list(id = header$id, status = "error",
                       message = conditionMessage(decode_error)))
    } else {
      encoded <- get_object_metadata(header$object_id)
      if (is.null(encoded)) {
        encoded <- encode_value(obj)
      }
      # Opaque values re-encode to a new id; return the original to avoid duplication.
      if (identical(encoded$kind, "object")) {
        encoded <- list(kind = "object", id = header$object_id)
      }
      write_frame(list(id = header$id, status = "ok", result = encoded))
    }
  } else if (identical(header$op, "print")) {
    # print: print a stored object or an inline value and return the captured
    # text. The `width` and `max_print` options temporarily override the
    # corresponding R options.
    messages_text <- character()
    warnings_text <- character()
    print_error <- NULL
    old_options <- list()
    restore_print_options <- function() {
      if (length(old_options) > 0L) do.call(options, old_options)
    }
    apply_print_options <- function(opts) {
      if (is.null(opts)) return()
      if (!is.null(opts$width)) {
        old_options[["width"]] <<- getOption("width")
        options(width = as.integer(opts$width))
      }
      if (!is.null(opts$max_print)) {
        old_options[["max.print"]] <<- getOption("max.print")
        options(max.print = as.integer(opts$max_print))
      }
    }
    stdout_lines <- capture.output({
      invisible(withCallingHandlers(
        tryCatch(
          {
            apply_print_options(header$options)
            value <- if (!is.null(header$object_id)) {
              get_object(header$object_id)
            } else {
              decode_value(header$value)
            }
            print(value)
            NULL
          },
          error = function(e) { print_error <<- e; NULL },
          finally = restore_print_options()
        ),
        message = function(m) {
          messages_text <<- c(messages_text, conditionMessage(m))
          invokeRestart("muffleMessage")
        },
        warning = function(w) {
          warnings_text <<- c(warnings_text, conditionMessage(w))
          invokeRestart("muffleWarning")
        }
      ))
    })
    output <- list(
      stdout = paste(stdout_lines, collapse = "\n"),
      messages = paste(messages_text, collapse = ""),
      warnings = paste(warnings_text, collapse = "")
    )
    if (!is.null(print_error)) {
      write_frame(list(
        id = header$id, status = "error",
        message = conditionMessage(print_error),
        r_class = class(print_error),
        call = deparse(conditionCall(print_error)),
        traceback = as.character(sys.calls()),
        stdout = output$stdout, messages = output$messages, warnings = output$warnings
      ))
    } else {
      write_frame(list(
        id = header$id, status = "ok",
        result = list(kind = "print_output"),
        stdout = output$stdout, messages = output$messages, warnings = output$warnings,
        body_format = NULL
      ))
    }
  } else if (identical(header$op, "decode_data_frame")) {
    # decode_data_frame: return a stored data.frame in JSON wire form,
    # optionally limited by the `max_rows` option.
    err <- NULL
    result <- tryCatch({
      obj <- get_object(header$object_id)
      dataframe_options <- header$options
      max_rows_value <- .rx_dataframe_wire_field(dataframe_options, "max_rows")
      max_rows <- if (!is.null(max_rows_value)) {
        .rx_dataframe_wire_integer(max_rows_value, min = 1)
      } else {
        NULL
      }
      .rx_encode_data_frame(obj, max_rows = max_rows)
    }, error = function(e) { err <<- e; NULL })
    if (!is.null(err)) {
      .rx_write_dataframe_error_frame(header$id, err)
    } else {
      write_frame(list(id = header$id, status = "ok", result = result))
    }
  } else if (identical(header$op, "encode_data_frame")) {
    # encode_data_frame: build a data.frame from its JSON wire form, store it
    # and return its handle.
    err <- NULL
    obj <- tryCatch(
      .rx_decode_data_frame(header$value),
      error = function(e) { err <<- e; NULL }
    )
    if (!is.null(err)) {
      .rx_write_dataframe_error_frame(header$id, err)
    } else {
      id <- store_object(obj)
      write_frame(list(id = header$id, status = "ok",
                       result = list(kind = "object", id = id)))
    }
  } else if (identical(header$op, "decode_arrow")) {
    # decode_arrow: return a stored data.frame as an Arrow IPC stream in the
    # frame body.
    decode_arrow_error <- NULL
    obj <- tryCatch(
      get_object(header$object_id),
      error = function(e) { decode_arrow_error <<- e; NULL }
    )
    if (!is.null(decode_arrow_error)) {
      write_frame(list(id = header$id, status = "error", message = conditionMessage(decode_arrow_error)))
    } else if (is.null(obj) || !is.data.frame(obj)) {
      write_frame(list(id = header$id, status = "error", message = "object is not an Arrow-compatible data frame"))
    } else if (!requireNamespace("arrow", quietly = TRUE)) {
      write_frame(list(
        id = header$id,
        status = "error",
        message = "R 'arrow' package is not installed",
        rx_error = list(tag = "missing_r_package", args = list("arrow"))
      ))
    } else {
      # Serialisation can fail (e.g. for columns Arrow cannot convert). Report
      # that as an error frame instead of letting the error end the script and
      # with it the backend process.
      arrow_error <- NULL
      arrow_bytes <- tryCatch(
        arrow::write_to_raw(obj, format = "stream"),
        error = function(e) { arrow_error <<- e; NULL }
      )
      if (!is.null(arrow_error)) {
        write_frame(list(id = header$id, status = "error", message = conditionMessage(arrow_error)))
      } else {
        write_frame(
          list(id = header$id, status = "ok", result = list(kind = "arrow", type = "ipc_stream"), body_format = "arrow_stream"),
          arrow_bytes
        )
      }
    }
  } else if (identical(header$op, "encode_dataframe")) {
    # encode_dataframe: read an Arrow IPC stream from the frame body into a
    # data.frame, store it and return its handle.
    if (!requireNamespace("arrow", quietly = TRUE)) {
      write_frame(list(
        id = header$id,
        status = "error",
        message = "R 'arrow' package is not installed",
        rx_error = list(tag = "missing_r_package", args = list("arrow"))
      ))
    } else {
      df <- tryCatch(
        arrow::read_ipc_stream(body, as_data_frame = TRUE),
        error = function(e) NULL
      )
      if (is.null(df)) {
        write_frame(list(id = header$id, status = "error", message = "failed to decode Arrow IPC stream"))
      } else {
        id <- store_object(df)
        write_frame(list(id = header$id, status = "ok",
                         result = list(kind = "object", id = id)))
      }
    }
  } else {
    write_frame(list(id = header$id, status = "error", message = paste("unknown op:", header$op)))
  }
}