#define _GNU_SOURCE
#include "erl_nif.h"
#include <Rembedded.h>
#include <Rinternals.h>
#include <R_ext/Parse.h>
#include <dlfcn.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
/*
 * Function-pointer types matching the signatures of the R C API entry points
 * this file refers to. Most of them are the member types of `struct r_api`.
 * NOTE(review): dlfcn.h is included and a libR handle is kept in
 * lib_r_handle, so these are presumably resolved with dlsym() at init time
 * instead of being linked directly -- confirm against the init code.
 */
/* Embedded-R startup. */
typedef int (*Rf_initEmbeddedR_fn)(int, char **);
typedef int (*Rf_initialize_R_fn)(int, char **);
typedef void (*setup_Rmainloop_fn)(void);
/* Protection, parsing and evaluation. */
typedef SEXP (*Rf_mkString_fn)(const char *);
typedef SEXP (*Rf_protect_fn)(SEXP);
typedef void (*Rf_unprotect_fn)(int);
typedef SEXP (*R_ParseVector_fn)(SEXP, int, ParseStatus *, SEXP);
typedef SEXP (*R_tryEval_fn)(SEXP, SEXP, int *);
typedef Rboolean (*R_ToplevelExec_fn)(void (*)(void *), void *);
/* Length/type queries and element accessors. */
typedef R_xlen_t (*Rf_xlength_fn)(SEXP);
typedef int (*LENGTH_fn)(SEXP);
typedef int (*TYPEOF_fn)(SEXP);
typedef SEXP (*VECTOR_ELT_fn)(SEXP, R_xlen_t);
typedef int (*INTEGER_ELT_fn)(SEXP, R_xlen_t);
typedef double (*REAL_ELT_fn)(SEXP, R_xlen_t);
typedef int (*LOGICAL_ELT_fn)(SEXP, R_xlen_t);
typedef SEXP (*STRING_ELT_fn)(SEXP, R_xlen_t);
typedef const char *(*R_CHAR_fn)(SEXP);
/* Object lifetime outside the protect stack. */
typedef void (*R_PreserveObject_fn)(SEXP);
typedef void (*R_ReleaseObject_fn)(SEXP);
/* Constructors and element setters. */
typedef SEXP (*Rf_ScalarReal_fn)(double);
typedef SEXP (*Rf_ScalarInteger_fn)(int);
typedef SEXP (*Rf_ScalarLogical_fn)(int);
typedef SEXP (*Rf_mkCharLenCE_fn)(const char *, int, cetype_t);
typedef SEXP (*Rf_ScalarString_fn)(SEXP);
typedef SEXP (*Rf_allocVector_fn)(SEXPTYPE, R_xlen_t);
typedef Rbyte *(*RAW_fn)(SEXP);
typedef void (*SET_INTEGER_ELT_fn)(SEXP, R_xlen_t, int);
typedef void (*SET_REAL_ELT_fn)(SEXP, R_xlen_t, double);
typedef void (*SET_LOGICAL_ELT_fn)(SEXP, R_xlen_t, int);
typedef void (*SET_STRING_ELT_fn)(SEXP, R_xlen_t, SEXP);
typedef SEXP (*SET_VECTOR_ELT_fn)(SEXP, R_xlen_t, SEXP);
/* Attributes, environments, symbols and call construction. */
typedef SEXP (*Rf_setAttrib_fn)(SEXP, SEXP, SEXP);
typedef SEXP (*R_NewEnv_fn)(SEXP, int, int);
typedef SEXP (*Rf_install_fn)(const char *);
typedef void (*Rf_defineVar_fn)(SEXP, SEXP, SEXP);
typedef SEXP (*Rf_findVarInFrame_fn)(SEXP, SEXP);
typedef SEXP (*R_lsInternal_fn)(SEXP, Rboolean);
typedef SEXP (*Rf_lang1_fn)(SEXP);
typedef SEXP (*Rf_lang2_fn)(SEXP, SEXP);
typedef SEXP (*Rf_lang3_fn)(SEXP, SEXP, SEXP);
typedef SEXP (*Rf_lang4_fn)(SEXP, SEXP, SEXP, SEXP);
typedef SEXP (*Rf_getAttrib_fn)(SEXP, SEXP);
typedef Rboolean (*Rf_inherits_fn)(SEXP, const char *);
/* Console output callbacks; same signatures as capture_write_console() and
 * capture_write_console_ex() in this file. */
typedef void (*ptr_R_WriteConsole_fn)(const char *, int);
typedef void (*ptr_R_WriteConsoleEx_fn)(const char *, int, int);
/* NOTE(review): presumably the maximum nesting depth accepted while decoding
 * R values into decoded_value trees; it is not referenced in the part of the
 * file reviewed here -- confirm at the use site. */
#define RX_DECODE_MAX_DEPTH 100
/*
 * Discriminator stored in r_work.kind; identifies what a work item asks for.
 * NOTE(review): WORK_ENCODE_DATAFRAME and WORK_ENCODE_DATA_FRAME are two
 * distinct enumerators with near-identical names. This may be intentional
 * (two encode paths) or a leftover duplicate -- confirm before relying on
 * either one.
 */
typedef enum {
WORK_INIT,
WORK_EVAL_STRING,
WORK_EVAL,
WORK_PRINT,
WORK_PLOT,
WORK_ENCODE_RESOURCE,
WORK_DECODE_RESOURCE,
WORK_ENCODE_DATAFRAME,
WORK_ENCODE_DATA_FRAME,
WORK_DECODE_DATA_FRAME,
WORK_DECODE_ARROW,
WORK_RELEASE
} work_kind;
/*
 * Discriminator stored in r_work.result_kind. It names which of the scalar
 * or vector result fields of r_work carries the value (see the
 * integer_result / integer_values / string_values ... members).
 */
typedef enum {
RESULT_NONE,
RESULT_NULL,
RESULT_INTEGER,
RESULT_DOUBLE,
RESULT_LOGICAL,
RESULT_CHARACTER,
RESULT_INTEGER_VECTOR,
RESULT_DOUBLE_VECTOR,
RESULT_LOGICAL_VECTOR,
RESULT_CHARACTER_VECTOR,
RESULT_UNSUPPORTED
} r_result_kind;
/*
 * Kind of value carried by an encode_value. Values start at 1, so a
 * zero-initialized encode_value has no valid kind. parse_encode_kind() maps
 * the textual type names onto these enumerators and
 * encode_kind_to_resource_kind() maps them onto resource_kind.
 */
typedef enum {
ENCODE_KIND_NULL = 1,
ENCODE_KIND_LOGICAL,
ENCODE_KIND_INTEGER,
ENCODE_KIND_DOUBLE,
ENCODE_KIND_CHARACTER,
ENCODE_KIND_LOGICAL_VECTOR,
ENCODE_KIND_INTEGER_VECTOR,
ENCODE_KIND_DOUBLE_VECTOR,
ENCODE_KIND_CHARACTER_VECTOR,
ENCODE_KIND_NA,
ENCODE_KIND_NAMED_LIST
} encode_kind;
/*
 * Kind tag stored in rx_object_resource.kind. Values start at 1. The first
 * nine enumerators line up one-to-one with the first nine encode_kind
 * enumerators (see encode_kind_to_resource_kind()).
 */
typedef enum {
RESOURCE_KIND_NULL = 1,
RESOURCE_KIND_LOGICAL,
RESOURCE_KIND_INTEGER,
RESOURCE_KIND_DOUBLE,
RESOURCE_KIND_CHARACTER,
RESOURCE_KIND_LOGICAL_VECTOR,
RESOURCE_KIND_INTEGER_VECTOR,
RESOURCE_KIND_DOUBLE_VECTOR,
RESOURCE_KIND_CHARACTER_VECTOR,
RESOURCE_KIND_GENERIC,
RESOURCE_KIND_DATAFRAME
} resource_kind;
/* Discriminator stored in decoded_value.kind; selects which members of a
 * decoded_value node are meaningful. */
typedef enum {
DECODE_VALUE_NULL,
DECODE_VALUE_LOGICAL,
DECODE_VALUE_INTEGER,
DECODE_VALUE_DOUBLE,
DECODE_VALUE_CHARACTER,
DECODE_VALUE_NA,
DECODE_VALUE_VECTOR,
DECODE_VALUE_NAMED_LIST,
DECODE_VALUE_R_LIST,
DECODE_VALUE_OBJECT
} decoded_value_kind;
/* Atomic element type. Used for decoded_value.atomic_type and for
 * encode_value.na_type; parse_decoded_atomic_type() maps the names
 * "logical", "integer", "double" and "character" onto it. */
typedef enum {
DECODE_ATOMIC_LOGICAL,
DECODE_ATOMIC_INTEGER,
DECODE_ATOMIC_DOUBLE,
DECODE_ATOMIC_CHARACTER
} decoded_atomic_type;
/* Process-wide init status, stored in the file-scope `init_state`.
 * record_init_failure() switches it to INIT_STATE_FAILED. */
typedef enum {
INIT_STATE_UNINITIALIZED,
INIT_STATE_INITIALIZED,
INIT_STATE_FAILED
} init_state_tag;
/* Outcome class of an init request, stored in r_work.init_return.
 * set_init_failed() and set_terminal_init_failure_if_present() store
 * INIT_RETURN_FAILED. */
typedef enum {
INIT_RETURN_OK,
INIT_RETURN_PLAIN_ERROR,
INIT_RETURN_MISMATCH,
INIT_RETURN_FAILED
} init_return_kind;
/*
 * Native init options. All pointers are heap-owned by the struct:
 * init_config_copy() fills them with strdup()/calloc() and
 * init_config_clear() frees them and zeroes the struct.
 */
typedef struct init_config {
char *r_home;
char *lib_r_path;
/* Array of lib_paths_count owned strings (NULL when the count is 0). */
char **lib_paths;
unsigned lib_paths_count;
} init_config;
/*
 * Description of a failed init attempt, as written by fill_init_failure():
 * stage and message are truncating copies, and restart_required is always
 * stored as the negation of retryable.
 */
typedef struct init_failure_info {
char stage[64];
char message[1024];
bool retryable;
bool restart_required;
} init_failure_info;
/*
 * Result of comparing two init configurations, filled by
 * fill_init_mismatch(). The three flags say which option differs. `current`
 * and `requested` are deep copies that must be released with
 * free_init_mismatch().
 */
typedef struct init_mismatch_info {
bool r_home;
bool lib_r_path;
bool lib_paths;
char message[1024];
init_config current;
init_config requested;
} init_mismatch_info;
/* Forward declaration: encode_value refers to resources by pointer. */
typedef struct rx_object_resource rx_object_resource;
/*
 * A value parsed from Erlang terms by parse_encode_value(). Which members
 * are populated depends on `kind`; everything heap-allocated is released by
 * free_encode_value().
 */
typedef struct encode_value {
encode_kind kind;
/* Element count for the vector kinds and for named lists. */
R_xlen_t length;
bool logical_scalar;
int integer_scalar;
double double_scalar;
/* Owned copy of the bytes plus a trailing NUL; length in string_scalar_len. */
char *string_scalar;
size_t string_scalar_len;
bool *logical_values;
int *integer_values;
double *double_values;
/* Parallel arrays of `length` owned strings and their byte lengths. */
char **string_values;
size_t *string_value_lens;
/* Element type of the NA when kind is ENCODE_KIND_NA. */
decoded_atomic_type na_type;
/* Parallel arrays of `length` entries for named lists. Each resource has
 * been retained with enif_keep_resource() by parse_encode_named_list(). */
char **entry_names;
rx_object_resource **entry_resources;
} encode_value;
/*
 * Payload of the NIF resource type `object_resource_type`; wraps one R
 * object handle.
 * NOTE(review): `sexp` is presumably kept alive with R_PreserveObject while
 * the resource exists, and `mutex` presumably guards `release_enqueued` --
 * neither is visible in the reviewed part of the file; confirm.
 */
struct rx_object_resource {
SEXP sexp;
uint64_t id;
resource_kind kind;
bool release_enqueued;
pthread_mutex_t mutex;
};
/* Forward declaration: decoded_value is recursive via items/entries. */
typedef struct decoded_value decoded_value;
/* One name/value pair of a decoded_value's `entries` array. Both members are
 * owned and freed by free_decoded_value(). */
typedef struct decoded_entry {
char *name;
decoded_value *value;
} decoded_entry;
/*
 * Node of a decoded value tree. free_decoded_value() frees string_value,
 * recursively frees items[0..count) and entries[0..count), releases
 * `resource` with enif_release_resource() and finally frees the node.
 */
struct decoded_value {
decoded_value_kind kind;
decoded_atomic_type atomic_type;
bool logical_value;
int integer_value;
double double_value;
char *string_value;
size_t string_value_len;
decoded_value **items;
decoded_entry *entries;
/* Number of elements in whichever of items/entries is non-NULL. */
unsigned count;
rx_object_resource *resource;
};
/* Element type of a dataframe_column; dataframe_type_from_name() maps the
 * names "logical", "integer", "double" and "character" onto it. */
typedef enum {
DATAFRAME_COLUMN_LOGICAL,
DATAFRAME_COLUMN_INTEGER,
DATAFRAME_COLUMN_DOUBLE,
DATAFRAME_COLUMN_CHARACTER
} dataframe_column_type;
/* One cell of a dataframe column. string_value is heap-owned and freed by
 * free_dataframe_wire(); which value member applies presumably follows the
 * owning column's type (NOTE(review): confirm at the producer). */
typedef struct dataframe_cell {
bool is_na;
bool logical_value;
int integer_value;
double double_value;
char *string_value;
size_t string_value_len;
} dataframe_cell;
/* One named column: `values` is an owned array of value_count cells; name
 * and values are freed by free_dataframe_wire(). */
typedef struct dataframe_column {
char *name;
dataframe_column_type type;
dataframe_cell *values;
unsigned value_count;
} dataframe_column;
/* Heap-allocated data-frame representation exchanged through r_work
 * (input_dataframe / output_dataframe). Released as a whole, including the
 * struct itself, by free_dataframe_wire(). */
typedef struct dataframe_wire {
/* Owned array of name_count owned strings. */
char **names;
unsigned name_count;
unsigned n_rows;
/* Owned array of column_count columns. */
dataframe_column *columns;
unsigned column_count;
} dataframe_wire;
/* Growable byte buffer managed by buffer_append()/buffer_free(). After a
 * successful non-empty append, data[len] is a NUL terminator that is not
 * counted in len; cap is the allocated size. */
typedef struct byte_buffer {
char *data;
size_t len;
size_t cap;
} byte_buffer;
/*
 * One unit of work together with its inputs, outputs and completion state.
 * NOTE(review): the `next` link, `done` flag and `done_cond` suggest items
 * are queued on queue_head/queue_tail and the submitter waits for
 * completion; the queue code is outside the reviewed part of the file --
 * confirm there.
 */
typedef struct r_work {
work_kind kind;
/* Init options; read by init_config_copy_from_work(). */
char *lib_r_path;
char *r_home;
char **lib_paths;
unsigned lib_paths_count;
char *source;
char *ipc_bytes;
size_t ipc_len;
char *output_bytes;
size_t output_len;
dataframe_wire *input_dataframe;
dataframe_wire *output_dataframe;
bool has_max_rows;
unsigned max_rows;
encode_value encode;
rx_object_resource *resource;
/* Optional print settings; each has_* flag says whether the value is set. */
bool has_print_width;
int print_width;
bool has_print_max_print;
int print_max_print;
/* Plot settings. */
int plot_width;
int plot_height;
int plot_res;
int plot_pointsize;
int plot_max_pages;
int plot_max_bytes;
/* plot_page_count owned pages and their lengths; freed by free_plot_pages(). */
char **plot_pages;
size_t *plot_page_lens;
unsigned plot_page_count;
char **global_names;
rx_object_resource **global_resources;
unsigned global_count;
rx_object_resource *result_resource;
/* Parallel arrays of result_global_count entries; freed/released by
 * free_eval_result_globals(). */
char **result_global_names;
rx_object_resource **result_global_resources;
unsigned result_global_count;
/* Owned tree; freed by free_result_value(). */
decoded_value *decoded_result;
/* Console capture. capture_write_console() and capture_write_console_ex()
 * with otype == 0 append to stdout_buffer; any other otype goes to
 * messages_buffer. All three buffers are freed by free_eval_capture(). */
byte_buffer stdout_buffer;
byte_buffer messages_buffer;
byte_buffer warnings_buffer;
bool structured_error;
char error_message[1024];
/* error_class_count owned strings; freed by free_eval_capture(). */
char **error_classes;
unsigned error_class_count;
char error_call[512];
SEXP release_sexp;
uint64_t release_resource_id;
/* Simple result payload; heap members are freed by free_result_value().
 * string_values holds result_length owned strings. */
r_result_kind result_kind;
R_xlen_t result_length;
int integer_result;
double double_result;
bool logical_result;
char *string_result;
size_t string_result_len;
int *integer_values;
double *double_values;
bool *logical_values;
char **string_values;
size_t *string_value_lens;
int unsupported_type;
bool done;
/* Cleared by set_error() and set_init_failed(). */
bool ok;
/* Message written by set_error() (truncated to fit). */
char error[1024];
init_return_kind init_return;
init_failure_info init_failure;
init_mismatch_info init_mismatch;
pthread_cond_t done_cond;
struct r_work *next;
} r_work;
/*
 * Table of R API entry points and pointers to R global variables. The
 * single instance is the file-scope `r`, which
 * close_uninitialized_lib_r_handle() zeroes again.
 * NOTE(review): the table is presumably populated via dlsym() on
 * lib_r_handle during init -- the loader is outside the reviewed part of the
 * file; confirm.
 */
typedef struct r_api {
Rf_mkString_fn mk_string;
Rf_protect_fn protect;
Rf_unprotect_fn unprotect;
R_ParseVector_fn parse_vector;
R_tryEval_fn try_eval;
R_ToplevelExec_fn toplevel_exec;
Rf_xlength_fn xlength_fn;
LENGTH_fn length_fn;
TYPEOF_fn type_of;
VECTOR_ELT_fn vector_elt;
INTEGER_ELT_fn integer_elt;
REAL_ELT_fn real_elt;
LOGICAL_ELT_fn logical_elt;
STRING_ELT_fn string_elt;
R_CHAR_fn r_char;
R_PreserveObject_fn preserve_object;
R_ReleaseObject_fn release_object;
Rf_ScalarReal_fn scalar_real;
Rf_ScalarInteger_fn scalar_integer;
Rf_ScalarLogical_fn scalar_logical;
Rf_mkCharLenCE_fn mk_char_len_ce;
Rf_ScalarString_fn scalar_string;
Rf_allocVector_fn alloc_vector;
RAW_fn raw;
SET_INTEGER_ELT_fn set_integer_elt;
SET_REAL_ELT_fn set_real_elt;
SET_LOGICAL_ELT_fn set_logical_elt;
SET_STRING_ELT_fn set_string_elt;
SET_VECTOR_ELT_fn set_vector_elt;
Rf_setAttrib_fn set_attrib;
R_NewEnv_fn new_env;
Rf_install_fn install;
Rf_defineVar_fn define_var;
Rf_findVarInFrame_fn find_var_in_frame;
R_lsInternal_fn ls_internal;
Rf_lang1_fn lang1;
Rf_lang2_fn lang2;
Rf_lang3_fn lang3;
Rf_lang4_fn lang4;
Rf_getAttrib_fn get_attrib;
Rf_inherits_fn inherits;
/* Pointers to (not copies of) R's global objects and NA constants. */
SEXP *nil_value;
SEXP *global_env;
SEXP *names_symbol;
SEXP *class_symbol;
SEXP *dim_symbol;
SEXP *na_string;
int *na_int;
double *na_real;
} r_api;
/* Cached atoms and the NIF resource type used for rx_object_resource. */
static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_error;
static ErlNifResourceType *object_resource_type = NULL;
/* Work queue state. NOTE(review): presumably a singly linked list of r_work
 * items (via r_work.next) guarded by queue_mutex and signalled through
 * queue_cond -- the queue code is not in the reviewed part; confirm. */
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static r_work *queue_head = NULL;
static r_work *queue_tail = NULL;
/* Owner thread bookkeeping. */
static pthread_mutex_t owner_start_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t owner_thread;
static bool owner_started = false;
static bool runtime_initialized = false;
/* dlopen handle for libR and the API table; both are reset by
 * close_uninitialized_lib_r_handle(), together with the two console hook
 * pointers below. */
static void *lib_r_handle = NULL;
static r_api r = {0};
static ptr_R_WriteConsole_fn *r_write_console = NULL;
static ptr_R_WriteConsoleEx_fn *r_write_console_ex = NULL;
/* Per-thread work item that capture_write_console()/_ex() append console
 * output to; NULL disables capture. */
static __thread r_work *current_capture_work = NULL;
static SEXP eval_helper = NULL;
static SEXP plot_helper = NULL;
/* Guards next_object_id, owner_thread_id_value, the init_state /
 * last_init_error pair and the release counters below, as used by the
 * reserve_/set_/note_/record_ helpers in this file. */
static pthread_mutex_t diagnostics_mutex = PTHREAD_MUTEX_INITIALIZER;
static uint64_t next_object_id = 1;
static init_state_tag init_state = INIT_STATE_UNINITIALIZED;
static init_config successful_init_config = {0};
static init_config attempted_init_config = {0};
static init_failure_info last_init_error = {0};
static bool has_last_init_error = false;
static uint64_t init_attempt_count = 0;
static uint64_t init_mismatch_count = 0;
/* Release diagnostics, updated by the note_release_* helpers. */
static uint64_t release_count = 0;
static uint64_t release_enqueued_count = 0;
static uint64_t release_success_count = 0;
static uint64_t release_failure_count = 0;
static uint64_t release_skipped_uninitialized_count = 0;
static uint64_t owner_thread_id_value = 0;
static uint64_t last_release_thread_id_value = 0;
static uint64_t last_release_resource_id_value = 0;
static bool has_last_release_resource_id = false;
static char last_release_error[1024] = "";
/*
 * Returns the calling thread's pthread handle converted to a 64-bit integer.
 * The value is used in this file as an opaque identifier for debug logging
 * and release diagnostics.
 */
static uint64_t current_thread_id(void) {
    const pthread_t self = pthread_self();
    return (uint64_t)(uintptr_t)self;
}
/*
 * Reports whether debug logging is switched on through the RX_NATIVE_DEBUG
 * environment variable. Returns 1 when the variable is set to anything other
 * than the empty string or "0", and 0 otherwise.
 */
static int native_debug_enabled(void) {
    const char *flag = getenv("RX_NATIVE_DEBUG");
    if (flag == NULL || flag[0] == '\0') {
        return 0;
    }
    return strcmp(flag, "0") != 0;
}
/*
 * Debug logging: when native_debug_enabled() is true, prints one line to
 * stderr prefixed with the calling thread id, then flushes stderr.
 * `fmt` must be a string literal because it is pasted between two literals.
 * `##__VA_ARGS__` is the GNU extension that drops the preceding comma when
 * no variadic arguments are given.
 */
#define NDBG(fmt, ...) do { \
if (native_debug_enabled()) { \
fprintf(stderr, "[rx_native tid=%llu] " fmt "\n", \
(unsigned long long)current_thread_id(), ##__VA_ARGS__); \
fflush(stderr); \
} \
} while (0)
/*
 * Hands out the next object id. The counter is read and advanced while
 * holding diagnostics_mutex, so concurrent callers get distinct ids.
 */
static uint64_t reserve_object_id(void) {
    uint64_t reserved;

    pthread_mutex_lock(&diagnostics_mutex);
    reserved = next_object_id;
    next_object_id = reserved + 1;
    pthread_mutex_unlock(&diagnostics_mutex);

    return reserved;
}
/* Stores `id` as the recorded owner-thread id, under diagnostics_mutex. */
static void set_owner_thread_id(uint64_t id) {
    pthread_mutex_t *const guard = &diagnostics_mutex;

    pthread_mutex_lock(guard);
    owner_thread_id_value = id;
    pthread_mutex_unlock(guard);
}
/*
 * Records a completed release of resource `resource_id`: bumps the total and
 * success counters, remembers the calling thread and the resource id, and
 * clears the last release error text. All updates happen under
 * diagnostics_mutex.
 */
static void note_release_on_owner_thread(uint64_t resource_id) {
    const uint64_t releasing_thread = current_thread_id();

    pthread_mutex_lock(&diagnostics_mutex);
    release_count += 1;
    release_success_count += 1;
    last_release_thread_id_value = releasing_thread;
    last_release_resource_id_value = resource_id;
    has_last_release_resource_id = true;
    last_release_error[0] = '\0';
    pthread_mutex_unlock(&diagnostics_mutex);
}
/*
 * Records that a release for resource `resource_id` was enqueued: bumps the
 * enqueued counter, remembers the resource id and clears the last release
 * error text, all under diagnostics_mutex. The last-release thread id is
 * left untouched.
 */
static void note_release_enqueued(uint64_t resource_id) {
    pthread_mutex_lock(&diagnostics_mutex);
    release_enqueued_count += 1;
    has_last_release_resource_id = true;
    last_release_resource_id_value = resource_id;
    last_release_error[0] = '\0';
    pthread_mutex_unlock(&diagnostics_mutex);
}
/*
 * Records a release that was skipped: bumps the skipped-uninitialized
 * counter, remembers the calling thread and resource id, and stores
 * `message` (truncated to the buffer size) as the last release error.
 * All updates happen under diagnostics_mutex.
 */
static void note_release_skipped_uninitialized(uint64_t resource_id, const char *message) {
    const uint64_t calling_thread = current_thread_id();

    pthread_mutex_lock(&diagnostics_mutex);
    release_skipped_uninitialized_count += 1;
    last_release_thread_id_value = calling_thread;
    last_release_resource_id_value = resource_id;
    has_last_release_resource_id = true;
    snprintf(last_release_error, sizeof(last_release_error), "%s", message);
    pthread_mutex_unlock(&diagnostics_mutex);
}
/*
 * Records a failed release: bumps the failure counter, remembers the calling
 * thread and resource id, and stores `message` (truncated to the buffer
 * size) as the last release error. All updates happen under
 * diagnostics_mutex.
 */
static void note_release_failure(uint64_t resource_id, const char *message) {
    const uint64_t calling_thread = current_thread_id();

    pthread_mutex_lock(&diagnostics_mutex);
    release_failure_count += 1;
    last_release_thread_id_value = calling_thread;
    last_release_resource_id_value = resource_id;
    has_last_release_resource_id = true;
    snprintf(last_release_error, sizeof(last_release_error), "%s", message);
    pthread_mutex_unlock(&diagnostics_mutex);
}
/* Reads `*value` while holding diagnostics_mutex and returns the snapshot. */
static uint64_t read_diagnostic_uint64(uint64_t *value) {
    uint64_t snapshot;

    pthread_mutex_lock(&diagnostics_mutex);
    snapshot = *value;
    pthread_mutex_unlock(&diagnostics_mutex);

    return snapshot;
}
/* Marks `work` as failed and stores `message` (truncated to fit) in
 * work->error. */
static void set_error(r_work *work, const char *message) {
    const size_t capacity = sizeof(work->error);

    snprintf(work->error, capacity, "%s", message);
    work->ok = false;
}
/*
 * Populates `failure` with truncating copies of `stage` and `message` and
 * the retry policy. restart_required is always the negation of `retryable`.
 */
static void fill_init_failure(init_failure_info *failure, const char *stage, const char *message, bool retryable) {
    const bool needs_restart = !retryable;

    snprintf(failure->stage, sizeof(failure->stage), "%s", stage);
    snprintf(failure->message, sizeof(failure->message), "%s", message);
    failure->retryable = retryable;
    failure->restart_required = needs_restart;
}
/*
 * Publishes `failure` as the process-wide last init error and switches
 * init_state to INIT_STATE_FAILED, all under diagnostics_mutex.
 */
static void record_init_failure(const init_failure_info *failure) {
    pthread_mutex_lock(&diagnostics_mutex);
    last_init_error = *failure;
    has_last_init_error = true;
    init_state = INIT_STATE_FAILED;
    pthread_mutex_unlock(&diagnostics_mutex);
}
/*
 * Fails an init work item: fills work->init_failure, records the same
 * failure process-wide via record_init_failure(), and marks the work item as
 * not ok with init_return == INIT_RETURN_FAILED.
 */
static void set_init_failed(r_work *work, const char *stage, const char *message, bool retryable) {
    init_failure_info *const failure = &work->init_failure;

    fill_init_failure(failure, stage, message, retryable);
    record_init_failure(failure);

    work->init_return = INIT_RETURN_FAILED;
    work->ok = false;
}
/*
 * If init has failed with a recorded, non-retryable error, copies that error
 * into `*out` and returns true. Otherwise leaves `*out` untouched and
 * returns false. The shared state is read under diagnostics_mutex.
 */
static bool copy_terminal_init_failure(init_failure_info *out) {
    bool found = false;

    pthread_mutex_lock(&diagnostics_mutex);
    if (init_state == INIT_STATE_FAILED && has_last_init_error) {
        if (!last_init_error.retryable) {
            *out = last_init_error;
            found = true;
        }
    }
    pthread_mutex_unlock(&diagnostics_mutex);

    return found;
}
/*
 * When a terminal (non-retryable) init failure has been recorded, copies it
 * into `work`, marks the work item as failed and returns true. Returns false
 * and leaves `work` untouched otherwise.
 */
static bool set_terminal_init_failure_if_present(r_work *work) {
    init_failure_info terminal;

    if (copy_terminal_init_failure(&terminal)) {
        work->init_failure = terminal;
        work->init_return = INIT_RETURN_FAILED;
        work->ok = false;
        return true;
    }
    return false;
}
/* True only when the RX_CRASH_REPRO environment variable is exactly "1". */
static bool crash_repro_enabled(void) {
    const char *flag = getenv("RX_CRASH_REPRO");
    if (flag == NULL) {
        return false;
    }
    return strcmp(flag, "1") == 0;
}
/*
 * True when crash-repro mode is on (see crash_repro_enabled()) and
 * RX_NATIVE_FAULT_INIT_STAGE is set to exactly `stage`.
 */
static bool init_fault_stage_enabled(const char *stage) {
    if (!crash_repro_enabled()) {
        return false;
    }
    const char *selected = getenv("RX_NATIVE_FAULT_INIT_STAGE");
    return selected != NULL && strcmp(selected, stage) == 0;
}
/*
 * True when crash-repro mode is on (see crash_repro_enabled()) and
 * RX_NATIVE_FAULT_RELEASE is set to exactly `mode`.
 */
static bool release_fault_enabled(const char *mode) {
    if (!crash_repro_enabled()) {
        return false;
    }
    const char *selected = getenv("RX_NATIVE_FAULT_RELEASE");
    return selected != NULL && strcmp(selected, mode) == 0;
}
/*
 * Drops the libR handle, if one is open, and forgets everything that was
 * derived from it: the whole `r` API table and both console hook pointers
 * are reset so no stale pointers into the closed library remain.
 */
static void close_uninitialized_lib_r_handle(void) {
    void *const handle = lib_r_handle;

    if (handle != NULL) {
        dlclose(handle);
        lib_r_handle = NULL;
    }

    memset(&r, 0, sizeof(r));
    r_write_console = NULL;
    r_write_console_ex = NULL;
}
/*
 * Copies an Erlang binary or Latin-1 charlist into a freshly malloc'd,
 * NUL-terminated C string.
 *
 * @param env   NIF environment owning `term`.
 * @param term  A binary, or a proper list of bytes (0..255).
 * @param out   Receives the new string on success; the caller frees it.
 * @return 1 on success, 0 on failure.
 *
 * On failure no allocation is left behind. `*out` is set to NULL when the
 * binary copy cannot be allocated and is left untouched on every other
 * failure.
 *
 * The charlist length is taken from enif_get_list_length(): calling
 * enif_get_string() with a zero-sized buffer as a size probe always yields
 * 0 (it returns 0 whenever size < 1), which made the charlist path
 * unreachable.
 */
static int get_string(ErlNifEnv *env, ERL_NIF_TERM term, char **out) {
    ErlNifBinary bin;
    if (enif_inspect_binary(env, term, &bin)) {
        char *copy = malloc(bin.size + 1);
        if (copy == NULL) {
            *out = NULL;
            return 0;
        }
        if (bin.size > 0) {
            memcpy(copy, bin.data, bin.size);
        }
        copy[bin.size] = '\0';
        *out = copy;
        return 1;
    }

    unsigned length = 0;
    /* enif_get_string() takes an unsigned size and returns an int count that
     * includes the terminator, so length + 1 must fit in an int. */
    if (!enif_get_list_length(env, term, &length) || length >= (unsigned)INT_MAX) {
        return 0;
    }

    char *text = malloc((size_t)length + 1);
    if (text == NULL) {
        return 0;
    }
    if (enif_get_string(env, term, text, length + 1, ERL_NIF_LATIN1) <= 0) {
        /* Not a byte list after all: release the buffer instead of leaking
         * it, and do not publish it through *out. */
        free(text);
        return 0;
    }
    *out = text;
    return 1;
}
/*
 * Like get_string(), but detects embedded NUL bytes.
 *
 * @param env      NIF environment owning `term`.
 * @param term     A binary, or a proper list of bytes (0..255).
 * @param out      Set to NULL first; receives a malloc'd NUL-terminated copy
 *                 when the input is valid and NUL-free. The caller frees it.
 * @param has_nul  Set to true when the input contains a NUL byte; `*out`
 *                 stays NULL in that case.
 * @return 1 when the term was a string (check `*has_nul` / `*out`), 0 when
 *         it was not a string or allocation failed.
 *
 * The charlist length is taken from enif_get_list_length(): calling
 * enif_get_string() with a zero-sized buffer as a size probe always yields
 * 0 (it returns 0 whenever size < 1), which made the charlist path
 * unreachable.
 */
static int get_string_reject_nul(ErlNifEnv *env, ERL_NIF_TERM term, char **out, bool *has_nul) {
    *has_nul = false;
    *out = NULL;

    ErlNifBinary bin;
    if (enif_inspect_binary(env, term, &bin)) {
        /* Skip memchr for empty binaries so it is never handed a pointer
         * that may not be dereferenceable. */
        if (bin.size > 0 && memchr(bin.data, '\0', bin.size) != NULL) {
            *has_nul = true;
            return 1;
        }
        char *copy = malloc(bin.size + 1);
        if (copy == NULL) {
            return 0;
        }
        if (bin.size > 0) {
            memcpy(copy, bin.data, bin.size);
        }
        copy[bin.size] = '\0';
        *out = copy;
        return 1;
    }

    unsigned length = 0;
    /* enif_get_string() takes an unsigned size and returns an int count that
     * includes the terminator, so length + 1 must fit in an int. */
    if (!enif_get_list_length(env, term, &length) || length >= (unsigned)INT_MAX) {
        return 0;
    }

    char *text = malloc((size_t)length + 1);
    if (text == NULL) {
        return 0;
    }
    if (enif_get_string(env, term, text, length + 1, ERL_NIF_LATIN1) <= 0) {
        free(text);
        return 0;
    }
    /* Only the `length` payload bytes are checked; the terminator at
     * text[length] is expected. */
    if (length > 0 && memchr(text, '\0', length) != NULL) {
        free(text);
        *has_nul = true;
        return 1;
    }
    *out = text;
    return 1;
}
/*
 * Copies a binary name into a new NUL-terminated C string.
 * Only binaries are accepted, and a binary containing a NUL byte is
 * rejected. Returns 1 and stores the malloc'd copy in `*out` on success;
 * returns 0 otherwise (`*out` is NULL if the allocation itself failed).
 */
static int get_global_name(ErlNifEnv *env, ERL_NIF_TERM term, char **out) {
    ErlNifBinary name;
    if (!enif_inspect_binary(env, term, &name)) {
        return 0;
    }
    if (name.size > 0 && memchr(name.data, '\0', name.size) != NULL) {
        return 0;
    }

    char *copy = malloc(name.size + 1);
    *out = copy;
    if (copy == NULL) {
        return 0;
    }
    memcpy(copy, name.data, name.size);
    copy[name.size] = '\0';
    return 1;
}
/*
 * Reads an optional integer term.
 * The atom `nil` means "absent": *has_value = false and *out = 0.
 * An integer that fits in an int means "present": *has_value = true and
 * *out receives it. Any other term fails with 0 and leaves both outputs
 * untouched.
 */
static int get_optional_int(ErlNifEnv *env, ERL_NIF_TERM term, bool *has_value, int *out) {
    const ERL_NIF_TERM nil_atom = enif_make_atom(env, "nil");

    if (!enif_is_identical(term, nil_atom)) {
        int parsed = 0;
        if (!enif_get_int(env, term, &parsed)) {
            return 0;
        }
        *has_value = true;
        *out = parsed;
        return 1;
    }

    *has_value = false;
    *out = 0;
    return 1;
}
/*
 * Frees an array of `count` heap strings and then the array itself.
 * A NULL array is ignored; NULL elements are fine because free(NULL) is a
 * no-op.
 */
static void free_string_array(char **items, unsigned count) {
    if (items == NULL) {
        return;
    }
    char **const end = items + count;
    for (char **cursor = items; cursor != end; cursor++) {
        free(*cursor);
    }
    free(items);
}
/*
 * Releases every string owned by `config` and resets the struct to all
 * zeroes, so it can be cleared again or reused. NULL is ignored.
 */
static void init_config_clear(init_config *config) {
    if (config == NULL) {
        return;
    }

    free_string_array(config->lib_paths, config->lib_paths_count);
    free(config->lib_r_path);
    free(config->r_home);

    memset(config, 0, sizeof(*config));
}
/*
 * Replaces the contents of `dest` with deep copies of the given options.
 * `dest` is cleared first. On any allocation failure it is cleared again
 * and false is returned, so the caller never sees a half-filled config.
 * `lib_paths` must hold `lib_paths_count` strings.
 */
static bool init_config_copy(init_config *dest, const char *lib_r_path, const char *r_home, char **lib_paths, unsigned lib_paths_count) {
    init_config_clear(dest);

    dest->lib_r_path = strdup(lib_r_path);
    dest->r_home = strdup(r_home);
    dest->lib_paths_count = lib_paths_count;
    if (dest->lib_r_path == NULL || dest->r_home == NULL) {
        goto fail;
    }

    if (lib_paths_count == 0) {
        return true;
    }

    dest->lib_paths = calloc(lib_paths_count, sizeof(char *));
    if (dest->lib_paths == NULL) {
        goto fail;
    }
    for (unsigned idx = 0; idx < lib_paths_count; idx++) {
        char *duplicate = strdup(lib_paths[idx]);
        dest->lib_paths[idx] = duplicate;
        if (duplicate == NULL) {
            goto fail;
        }
    }
    return true;

fail:
    /* The array was calloc'd, so untouched slots are NULL and clearing is
     * safe at any point. */
    init_config_clear(dest);
    return false;
}
/* Deep-copies the init options carried by `work` into `dest`; see
 * init_config_copy() for the failure behavior. */
static bool init_config_copy_from_work(init_config *dest, r_work *work) {
    return init_config_copy(dest,
                            work->lib_r_path,
                            work->r_home,
                            work->lib_paths,
                            work->lib_paths_count);
}
/*
 * True when both configs have the same r_home, the same lib_r_path and the
 * same library paths in the same order. Both configs must have non-NULL
 * r_home and lib_r_path.
 */
static bool init_config_equal(const init_config *a, const init_config *b) {
    const bool same_scalars = strcmp(a->r_home, b->r_home) == 0 &&
                              strcmp(a->lib_r_path, b->lib_r_path) == 0 &&
                              a->lib_paths_count == b->lib_paths_count;
    if (!same_scalars) {
        return false;
    }

    unsigned idx = 0;
    while (idx < a->lib_paths_count) {
        if (strcmp(a->lib_paths[idx], b->lib_paths[idx]) != 0) {
            return false;
        }
        idx++;
    }
    return true;
}
/*
 * Describes how `requested` differs from `current`.
 * `mismatch` is zeroed first, then receives the fixed explanatory message,
 * one flag per differing option and deep copies of both configs. Returns
 * false if either copy cannot be allocated; no copy is left behind then.
 * On success the caller releases the copies with free_init_mismatch().
 */
static bool fill_init_mismatch(init_mismatch_info *mismatch, const init_config *current, const init_config *requested) {
    memset(mismatch, 0, sizeof(*mismatch));
    snprintf(mismatch->message, sizeof(mismatch->message),
             "embedded native R is already initialized with different native init options; restart the BEAM to change native init options");

    mismatch->r_home = strcmp(current->r_home, requested->r_home) != 0;
    mismatch->lib_r_path = strcmp(current->lib_r_path, requested->lib_r_path) != 0;

    /* Library paths differ when the counts differ or any position differs. */
    bool paths_differ = current->lib_paths_count != requested->lib_paths_count;
    for (unsigned idx = 0; !paths_differ && idx < current->lib_paths_count; idx++) {
        paths_differ = strcmp(current->lib_paths[idx], requested->lib_paths[idx]) != 0;
    }
    mismatch->lib_paths = paths_differ;

    if (!init_config_copy(&mismatch->current, current->lib_r_path, current->r_home,
                          current->lib_paths, current->lib_paths_count)) {
        return false;
    }
    if (!init_config_copy(&mismatch->requested, requested->lib_r_path, requested->r_home,
                          requested->lib_paths, requested->lib_paths_count)) {
        init_config_clear(&mismatch->current);
        return false;
    }
    return true;
}
/* Releases the two config copies held by `mismatch`. The flags and the
 * message are left as they are. */
static void free_init_mismatch(init_mismatch_info *mismatch) {
    init_config *const held[] = { &mismatch->current, &mismatch->requested };

    for (size_t idx = 0; idx < sizeof(held) / sizeof(held[0]); idx++) {
        init_config_clear(held[idx]);
    }
}
/* Forward declaration: free_encode_value() uses this before its definition. */
static void release_resource_array(rx_object_resource **resources, unsigned count);
/*
 * Appends `len` bytes from `data` to `buffer`, growing it as needed, and
 * keeps a NUL terminator at data[len] (not counted in len).
 *
 * @param buffer  Destination; a zeroed struct is a valid empty buffer.
 * @param data    Source bytes; may be NULL only when len == 0.
 * @param len     Number of bytes to append.
 * @return 1 on success (including the len == 0 no-op), 0 when `data` is
 *         NULL, the resulting size would overflow size_t, or reallocation
 *         fails. The buffer is unchanged on failure.
 */
static int buffer_append(byte_buffer *buffer, const char *data, size_t len) {
    if (len == 0) return 1;
    if (data == NULL) return 0;

    /* We need buffer->len + len + 1 <= SIZE_MAX. len == SIZE_MAX must be
     * rejected first: otherwise SIZE_MAX - len - 1 wraps around to SIZE_MAX
     * and the guard itself would pass a size that overflows below. */
    if (len == SIZE_MAX || buffer->len > SIZE_MAX - len - 1) return 0;
    size_t needed = buffer->len + len + 1;

    if (needed > buffer->cap) {
        /* Grow geometrically from 256 bytes; fall back to the exact size
         * when doubling would overflow. */
        size_t cap = buffer->cap == 0 ? 256 : buffer->cap;
        while (cap < needed) {
            if (cap > SIZE_MAX / 2) {
                cap = needed;
                break;
            }
            cap *= 2;
        }
        /* Keep the old pointer until realloc succeeds so nothing leaks. */
        char *grown = realloc(buffer->data, cap);
        if (grown == NULL) return 0;
        buffer->data = grown;
        buffer->cap = cap;
    }

    memcpy(buffer->data + buffer->len, data, len);
    buffer->len += len;
    buffer->data[buffer->len] = '\0';
    return 1;
}
/* Releases the storage of `buffer` and resets it to the empty state
 * (NULL data, zero length and capacity). NULL is ignored. */
static void buffer_free(byte_buffer *buffer) {
    if (buffer == NULL) {
        return;
    }
    free(buffer->data);
    buffer->data = NULL;
    buffer->cap = 0;
    buffer->len = 0;
}
static void capture_write_console_ex(const char *buf, int len, int otype) {
r_work *work = current_capture_work;
if (work == NULL || buf == NULL || len <= 0) return;
if (otype == 0) {
(void)buffer_append(&work->stdout_buffer, buf, (size_t)len);
} else {
(void)buffer_append(&work->messages_buffer, buf, (size_t)len);
}
}
/*
 * Console callback without an output-type argument: appends the text to the
 * stdout_buffer of the calling thread's current capture work item, if any.
 * Empty or NULL input is ignored, and an append failure is deliberately
 * dropped (best effort).
 */
static void capture_write_console(const char *buf, int len) {
    r_work *work = current_capture_work;
    const bool nothing_to_do = (work == NULL) || (buf == NULL) || (len <= 0);

    if (!nothing_to_do) {
        (void)buffer_append(&work->stdout_buffer, buf, (size_t)len);
    }
}
/*
 * Converts a proper Erlang list of strings into a malloc'd array of
 * malloc'd C strings (each element is converted with get_string()).
 * On success `*out` / `*out_count` receive the array and its length; the
 * caller releases it with free_string_array(). On failure everything
 * allocated so far is freed, the outputs are left untouched and 0 is
 * returned.
 */
static int get_string_list(ErlNifEnv *env, ERL_NIF_TERM list, char ***out, unsigned *out_count) {
    unsigned count;
    if (!enif_get_list_length(env, list, &count)) {
        return 0;
    }

    /* calloc so that unfilled slots are NULL if we bail out midway. */
    char **items = calloc(count, sizeof(char *));
    if (count > 0 && items == NULL) {
        return 0;
    }

    ERL_NIF_TERM rest = list;
    for (unsigned idx = 0; idx < count; idx++) {
        ERL_NIF_TERM element;
        const int converted = enif_get_list_cell(env, rest, &element, &rest) &&
                              get_string(env, element, &items[idx]);
        if (!converted) {
            free_string_array(items, count);
            return 0;
        }
    }

    *out = items;
    *out_count = count;
    return 1;
}
/*
 * Releases everything owned by `value`: scalar and vector storage, the
 * per-element strings, the entry names, and the references held on entry
 * resources. The struct is zeroed afterwards. NULL is ignored.
 */
static void free_encode_value(encode_value *value) {
    if (value == NULL) {
        return;
    }

    const unsigned entry_count = (unsigned)value->length;

    free(value->string_scalar);
    free(value->logical_values);
    free(value->integer_values);
    free(value->double_values);

    if (value->string_values != NULL) {
        for (R_xlen_t idx = 0; idx < value->length; idx++) {
            free(value->string_values[idx]);
        }
        free(value->string_values);
    }
    free(value->string_value_lens);

    free_string_array(value->entry_names, entry_count);
    release_resource_array(value->entry_resources, entry_count);

    memset(value, 0, sizeof(*value));
}
/*
 * Frees a dataframe_wire and everything it owns: the name list, every
 * column's name, every cell string, the cell arrays, the column array and
 * finally the struct itself. NULL is ignored.
 */
static void free_dataframe_wire(dataframe_wire *wire) {
    if (wire == NULL) {
        return;
    }

    free_string_array(wire->names, wire->name_count);

    if (wire->columns != NULL) {
        for (unsigned col = 0; col < wire->column_count; col++) {
            dataframe_column *column = &wire->columns[col];

            free(column->name);
            if (column->values != NULL) {
                for (unsigned row = 0; row < column->value_count; row++) {
                    free(column->values[row].string_value);
                }
                free(column->values);
            }
        }
        free(wire->columns);
    }

    free(wire);
}
/* Forward declaration: free_result_value() uses this before its definition. */
static void free_decoded_value(decoded_value *value);
/*
 * Releases the result payload of `work` (decoded tree, scalar string and
 * all vector storage) and resets those members, so the function can safely
 * be called again. result_kind and the numeric scalar results are left
 * untouched. NULL is ignored.
 */
static void free_result_value(r_work *work) {
    if (work == NULL) {
        return;
    }

    free_decoded_value(work->decoded_result);
    work->decoded_result = NULL;

    free(work->string_result);
    work->string_result = NULL;
    work->string_result_len = 0;

    free(work->integer_values);
    work->integer_values = NULL;
    free(work->double_values);
    work->double_values = NULL;
    free(work->logical_values);
    work->logical_values = NULL;

    /* The per-element strings must be released before result_length is
     * reset below. */
    if (work->string_values != NULL) {
        for (R_xlen_t idx = 0; idx < work->result_length; idx++) {
            free(work->string_values[idx]);
        }
        free(work->string_values);
        work->string_values = NULL;
    }
    free(work->string_value_lens);
    work->string_value_lens = NULL;

    work->result_length = 0;
}
/*
 * Drops one reference on each non-NULL resource in the array with
 * enif_release_resource() and then frees the array. A NULL array is
 * ignored.
 */
static void release_resource_array(rx_object_resource **resources, unsigned count) {
    if (resources == NULL) {
        return;
    }
    for (unsigned idx = 0; idx < count; idx++) {
        rx_object_resource *held = resources[idx];
        if (held == NULL) {
            continue;
        }
        enif_release_resource(held);
    }
    free(resources);
}
/*
 * Recursively frees a decoded value tree: the node's string, its child
 * items, its named entries (name and value), the reference it holds on a
 * resource, and the node itself. `count` is the length of whichever of
 * items/entries is present. NULL is ignored.
 */
static void free_decoded_value(decoded_value *value) {
    if (value == NULL) {
        return;
    }

    const unsigned child_count = value->count;

    free(value->string_value);

    if (value->items != NULL) {
        for (unsigned idx = 0; idx < child_count; idx++) {
            free_decoded_value(value->items[idx]);
        }
        free(value->items);
    }

    if (value->entries != NULL) {
        for (unsigned idx = 0; idx < child_count; idx++) {
            decoded_entry *entry = &value->entries[idx];
            free(entry->name);
            free_decoded_value(entry->value);
        }
        free(value->entries);
    }

    if (value->resource != NULL) {
        enif_release_resource(value->resource);
    }

    free(value);
}
/*
 * Releases the result-global names and resource references of `work` and
 * resets the three related members. NULL is ignored.
 */
static void free_eval_result_globals(r_work *work) {
    if (work == NULL) {
        return;
    }

    const unsigned global_total = work->result_global_count;

    free_string_array(work->result_global_names, global_total);
    work->result_global_names = NULL;

    release_resource_array(work->result_global_resources, global_total);
    work->result_global_resources = NULL;

    work->result_global_count = 0;
}
/*
 * Releases the captured console output (stdout, messages, warnings) and the
 * error class list of `work`, resetting the class list members.
 * NULL is ignored.
 */
static void free_eval_capture(r_work *work) {
    if (work == NULL) {
        return;
    }

    byte_buffer *const captured[] = {
        &work->stdout_buffer,
        &work->messages_buffer,
        &work->warnings_buffer,
    };
    for (size_t idx = 0; idx < sizeof(captured) / sizeof(captured[0]); idx++) {
        buffer_free(captured[idx]);
    }

    free_string_array(work->error_classes, work->error_class_count);
    work->error_classes = NULL;
    work->error_class_count = 0;
}
/*
 * Frees every plot page, the page array and the page-length array of
 * `work`, then resets the three related members. NULL is ignored.
 */
static void free_plot_pages(r_work *work) {
    if (work == NULL) {
        return;
    }

    char **pages = work->plot_pages;
    if (pages != NULL) {
        for (unsigned page = 0; page < work->plot_page_count; page++) {
            free(pages[page]);
        }
        free(pages);
    }
    free(work->plot_page_lens);

    work->plot_pages = NULL;
    work->plot_page_lens = NULL;
    work->plot_page_count = 0;
}
/*
 * Maps an encode_kind onto the resource_kind tag. The scalar and vector
 * kinds map to their same-named resource kinds. ENCODE_KIND_NA,
 * ENCODE_KIND_NAMED_LIST and any out-of-range value map to
 * RESOURCE_KIND_GENERIC.
 */
static resource_kind encode_kind_to_resource_kind(encode_kind kind) {
    static const resource_kind direct[] = {
        [ENCODE_KIND_NULL] = RESOURCE_KIND_NULL,
        [ENCODE_KIND_LOGICAL] = RESOURCE_KIND_LOGICAL,
        [ENCODE_KIND_INTEGER] = RESOURCE_KIND_INTEGER,
        [ENCODE_KIND_DOUBLE] = RESOURCE_KIND_DOUBLE,
        [ENCODE_KIND_CHARACTER] = RESOURCE_KIND_CHARACTER,
        [ENCODE_KIND_LOGICAL_VECTOR] = RESOURCE_KIND_LOGICAL_VECTOR,
        [ENCODE_KIND_INTEGER_VECTOR] = RESOURCE_KIND_INTEGER_VECTOR,
        [ENCODE_KIND_DOUBLE_VECTOR] = RESOURCE_KIND_DOUBLE_VECTOR,
        [ENCODE_KIND_CHARACTER_VECTOR] = RESOURCE_KIND_CHARACTER_VECTOR,
    };
    const long long index = (long long)kind;

    /* Only the contiguous NULL..CHARACTER_VECTOR range has a direct
     * counterpart; everything else is generic. */
    if (index >= (long long)ENCODE_KIND_NULL && index <= (long long)ENCODE_KIND_CHARACTER_VECTOR) {
        return direct[index];
    }
    return RESOURCE_KIND_GENERIC;
}
/*
 * Reads a type name given either as an atom or as a binary into the
 * caller's buffer as a NUL-terminated string. Returns 0 when the buffer
 * size is 0, when the term is neither an atom nor a binary, or when the
 * text does not fit together with its terminator.
 */
static int get_type_name(ErlNifEnv *env, ERL_NIF_TERM term, char *out, size_t out_size) {
    if (out_size == 0) {
        return 0;
    }

    if (enif_get_atom(env, term, out, (unsigned)out_size, ERL_NIF_LATIN1)) {
        return 1;
    }

    ErlNifBinary text;
    if (!enif_inspect_binary(env, term, &text)) {
        return 0;
    }
    if (text.size >= out_size) {
        return 0;
    }
    memcpy(out, text.data, text.size);
    out[text.size] = '\0';
    return 1;
}
/*
 * Copies the bytes of a binary term into a new malloc'd buffer with an
 * extra trailing NUL; embedded NUL bytes are preserved. Binaries larger
 * than INT_MAX bytes are rejected. On success `*out` / `*out_len` receive
 * the buffer and its byte length (terminator excluded). On failure both
 * outputs are left untouched and 0 is returned.
 */
static int copy_binary_bytes(ErlNifEnv *env, ERL_NIF_TERM term, char **out, size_t *out_len) {
    ErlNifBinary bytes;
    if (!enif_inspect_binary(env, term, &bytes)) {
        return 0;
    }
    if (bytes.size > (size_t)INT_MAX) {
        return 0;
    }

    char *duplicate = malloc(bytes.size + 1);
    if (duplicate == NULL) {
        return 0;
    }
    if (bytes.size != 0) {
        memcpy(duplicate, bytes.data, bytes.size);
    }
    duplicate[bytes.size] = '\0';

    *out_len = bytes.size;
    *out = duplicate;
    return 1;
}
/*
 * Reads the atoms `true` / `false` into `*out`. Any other term fails with 0
 * and leaves `*out` untouched.
 */
static int get_boolean(ErlNifEnv *env, ERL_NIF_TERM term, bool *out) {
    const bool is_true = enif_is_identical(term, enif_make_atom(env, "true")) != 0;

    if (!is_true && !enif_is_identical(term, enif_make_atom(env, "false"))) {
        return 0;
    }
    *out = is_true;
    return 1;
}
/*
 * Reads an integer term that must fit in a C int. Returns 0 (leaving
 * `*out` untouched) for non-integers and for values outside
 * [INT_MIN, INT_MAX].
 */
static int get_int32(ErlNifEnv *env, ERL_NIF_TERM term, int *out) {
    ErlNifSInt64 wide;
    if (!enif_get_int64(env, term, &wide)) {
        return 0;
    }
    if (wide < INT_MIN || wide > INT_MAX) {
        return 0;
    }
    *out = (int)wide;
    return 1;
}
/*
 * Reads a numeric term as a double. Floats are taken as-is; integers that
 * fit in 64 bits are converted. Returns 0 for anything else.
 */
static int get_double_value(ErlNifEnv *env, ERL_NIF_TERM term, double *out) {
    if (!enif_get_double(env, term, out)) {
        ErlNifSInt64 whole;
        if (!enif_get_int64(env, term, &whole)) {
            return 0;
        }
        *out = (double)whole;
    }
    return 1;
}
/* Strict variant of get_double_value(): accepts only float terms, never
 * integers. Returns the result of enif_get_double(). */
static int get_double_float(ErlNifEnv *env, ERL_NIF_TERM term, double *out) {
    const int is_float = enif_get_double(env, term, out);
    return is_float;
}
/*
 * Translates a type name (atom or binary, see get_type_name()) into an
 * encode_kind. Returns 1 and stores the kind on a match; returns 0 and
 * leaves `*kind` untouched for unknown or unreadable names.
 */
static int parse_encode_kind(ErlNifEnv *env, ERL_NIF_TERM term, encode_kind *kind) {
    static const struct {
        const char *name;
        encode_kind kind;
    } known_kinds[] = {
        {"null", ENCODE_KIND_NULL},
        {"logical", ENCODE_KIND_LOGICAL},
        {"integer", ENCODE_KIND_INTEGER},
        {"double", ENCODE_KIND_DOUBLE},
        {"character", ENCODE_KIND_CHARACTER},
        {"logical_vector", ENCODE_KIND_LOGICAL_VECTOR},
        {"integer_vector", ENCODE_KIND_INTEGER_VECTOR},
        {"double_vector", ENCODE_KIND_DOUBLE_VECTOR},
        {"character_vector", ENCODE_KIND_CHARACTER_VECTOR},
        {"na", ENCODE_KIND_NA},
        {"named_list", ENCODE_KIND_NAMED_LIST},
    };

    char type[32];
    if (!get_type_name(env, term, type, sizeof(type))) {
        return 0;
    }

    for (size_t idx = 0; idx < sizeof(known_kinds) / sizeof(known_kinds[0]); idx++) {
        if (strcmp(type, known_kinds[idx].name) == 0) {
            *kind = known_kinds[idx].kind;
            return 1;
        }
    }
    return 0;
}
/*
 * Translates one of the names "logical", "integer", "double" or "character"
 * (atom or binary) into a decoded_atomic_type. Returns 0 and leaves `*type`
 * untouched for anything else.
 */
static int parse_decoded_atomic_type(ErlNifEnv *env, ERL_NIF_TERM term, decoded_atomic_type *type) {
    static const struct {
        const char *name;
        decoded_atomic_type type;
    } known_types[] = {
        {"logical", DECODE_ATOMIC_LOGICAL},
        {"integer", DECODE_ATOMIC_INTEGER},
        {"double", DECODE_ATOMIC_DOUBLE},
        {"character", DECODE_ATOMIC_CHARACTER},
    };

    char type_name[32];
    if (!get_type_name(env, term, type_name, sizeof(type_name))) {
        return 0;
    }

    for (size_t idx = 0; idx < sizeof(known_types) / sizeof(known_types[0]); idx++) {
        if (strcmp(type_name, known_types[idx].name) == 0) {
            *type = known_types[idx].type;
            return 1;
        }
    }
    return 0;
}
/*
 * Parses a list term into the vector storage selected by value->kind
 * (logical, integer, double or character vector). value->length is set to
 * the list length; an empty list succeeds without allocating.
 *
 * Elements are strict: booleans must be the atoms true/false, integers must
 * fit in an int, doubles must be floats, and strings must be binaries.
 *
 * Returns 1 on success and 0 on failure. On failure the arrays allocated so
 * far remain attached to `value` (zero-filled beyond the parsed prefix) for
 * the caller to release with free_encode_value().
 */
static int parse_encode_vector(ErlNifEnv *env, ERL_NIF_TERM list, encode_value *value) {
    unsigned count;
    if (!enif_get_list_length(env, list, &count)) {
        return 0;
    }
    value->length = (R_xlen_t)count;
    if (count == 0) {
        return 1;
    }

    /* Step 1: allocate zeroed storage for the element type. */
    const encode_kind kind = value->kind;
    if (kind == ENCODE_KIND_LOGICAL_VECTOR) {
        value->logical_values = calloc(count, sizeof(bool));
        if (value->logical_values == NULL) return 0;
    } else if (kind == ENCODE_KIND_INTEGER_VECTOR) {
        value->integer_values = calloc(count, sizeof(int));
        if (value->integer_values == NULL) return 0;
    } else if (kind == ENCODE_KIND_DOUBLE_VECTOR) {
        value->double_values = calloc(count, sizeof(double));
        if (value->double_values == NULL) return 0;
    } else if (kind == ENCODE_KIND_CHARACTER_VECTOR) {
        value->string_values = calloc(count, sizeof(char *));
        value->string_value_lens = calloc(count, sizeof(size_t));
        if (value->string_values == NULL || value->string_value_lens == NULL) return 0;
    } else {
        return 0;
    }

    /* Step 2: walk the list and convert each element in place. */
    ERL_NIF_TERM rest = list;
    for (unsigned idx = 0; idx < count; idx++) {
        ERL_NIF_TERM element;
        if (!enif_get_list_cell(env, rest, &element, &rest)) {
            return 0;
        }

        int converted;
        switch (kind) {
        case ENCODE_KIND_LOGICAL_VECTOR:
            converted = get_boolean(env, element, &value->logical_values[idx]);
            break;
        case ENCODE_KIND_INTEGER_VECTOR:
            converted = get_int32(env, element, &value->integer_values[idx]);
            break;
        case ENCODE_KIND_DOUBLE_VECTOR:
            converted = get_double_float(env, element, &value->double_values[idx]);
            break;
        case ENCODE_KIND_CHARACTER_VECTOR:
            converted = copy_binary_bytes(env, element, &value->string_values[idx],
                                          &value->string_value_lens[idx]);
            break;
        default:
            converted = 0;
            break;
        }
        if (!converted) {
            return 0;
        }
    }
    return 1;
}
/*
 * Parses a list of {Name, Resource} 2-tuples into value->entry_names and
 * value->entry_resources (parallel arrays of value->length entries).
 * Names must be NUL-free binaries; resources must be of
 * object_resource_type and are retained with enif_keep_resource().
 *
 * Returns 1 on success and 0 on failure. On failure the partially filled
 * arrays remain attached to `value` for the caller to release with
 * free_encode_value(); only resources that were actually retained are
 * non-NULL.
 */
static int parse_encode_named_list(ErlNifEnv *env, ERL_NIF_TERM list, encode_value *value) {
    unsigned count;
    if (!enif_get_list_length(env, list, &count)) {
        return 0;
    }
    value->length = (R_xlen_t)count;
    if (count == 0) {
        return 1;
    }

    value->entry_names = calloc(count, sizeof(char *));
    value->entry_resources = calloc(count, sizeof(rx_object_resource *));
    if (value->entry_names == NULL || value->entry_resources == NULL) {
        return 0;
    }

    ERL_NIF_TERM rest = list;
    for (unsigned idx = 0; idx < count; idx++) {
        ERL_NIF_TERM pair;
        const ERL_NIF_TERM *fields;
        int arity;

        if (!enif_get_list_cell(env, rest, &pair, &rest)) {
            return 0;
        }
        if (!enif_get_tuple(env, pair, &arity, &fields) || arity != 2) {
            return 0;
        }
        if (!get_global_name(env, fields[0], &value->entry_names[idx])) {
            return 0;
        }
        if (!enif_get_resource(env, fields[1], object_resource_type,
                               (void **)&value->entry_resources[idx])) {
            return 0;
        }
        /* Hold our own reference for as long as the encode_value lives. */
        enif_keep_resource(value->entry_resources[idx]);
    }
    return 1;
}
// Decode a (type, value) pair of Erlang terms into `value`.
//
// `value` is zeroed first, then its kind is taken from `type_term` and the
// payload is parsed by the helper matching that kind. Returns non-zero on
// success and 0 when the kind is unknown or the payload does not match it.
static int parse_encode_value(ErlNifEnv *env, ERL_NIF_TERM type_term, ERL_NIF_TERM value_term, encode_value *value) {
    memset(value, 0, sizeof(*value));
    if (!parse_encode_kind(env, type_term, &value->kind)) {
        return 0;
    }

    int parsed = 0;
    switch (value->kind) {
    case ENCODE_KIND_NULL:
        // NULL carries no payload; the value term must be the atom `nil`.
        parsed = enif_is_identical(value_term, enif_make_atom(env, "nil"));
        break;
    case ENCODE_KIND_LOGICAL:
        parsed = get_boolean(env, value_term, &value->logical_scalar);
        break;
    case ENCODE_KIND_INTEGER:
        parsed = get_int32(env, value_term, &value->integer_scalar);
        break;
    case ENCODE_KIND_DOUBLE:
        parsed = get_double_value(env, value_term, &value->double_scalar);
        break;
    case ENCODE_KIND_CHARACTER:
        parsed = copy_binary_bytes(env, value_term, &value->string_scalar, &value->string_scalar_len);
        break;
    case ENCODE_KIND_NA:
        // For NA the value term names the atomic type of the missing value.
        parsed = parse_decoded_atomic_type(env, value_term, &value->na_type);
        break;
    case ENCODE_KIND_LOGICAL_VECTOR:
    case ENCODE_KIND_INTEGER_VECTOR:
    case ENCODE_KIND_DOUBLE_VECTOR:
    case ENCODE_KIND_CHARACTER_VECTOR:
        parsed = parse_encode_vector(env, value_term, value);
        break;
    case ENCODE_KIND_NAMED_LIST:
        parsed = parse_encode_named_list(env, value_term, value);
        break;
    }
    return parsed;
}
// Look up `map[key]` where the key is the binary made from the bytes of the
// NUL-terminated C string `key`.
//
// Returns the result of enif_get_map_value() (non-zero when found, with the
// value stored in *out), or 0 if the key binary could not be created.
static int map_get_binary_key(ErlNifEnv *env, ERL_NIF_TERM map, const char *key, ERL_NIF_TERM *out) {
    const size_t key_len = strlen(key);
    ERL_NIF_TERM binary_key;
    unsigned char *bytes = enif_make_new_binary(env, key_len, &binary_key);

    if (key_len > 0) {
        if (bytes == NULL) {
            return 0;
        }
        memcpy(bytes, key, key_len);
    }
    return enif_get_map_value(env, map, binary_key, out);
}
// Map a column type name ("logical", "integer", "double", "character") to its
// dataframe_column_type. Returns 1 and stores the type on a match; returns 0
// and leaves *type untouched for any other name.
static int dataframe_type_from_name(const char *name, dataframe_column_type *type) {
    static const struct {
        const char *label;
        dataframe_column_type column_type;
    } known_types[] = {
        {"logical", DATAFRAME_COLUMN_LOGICAL},
        {"integer", DATAFRAME_COLUMN_INTEGER},
        {"double", DATAFRAME_COLUMN_DOUBLE},
        {"character", DATAFRAME_COLUMN_CHARACTER},
    };

    for (size_t k = 0; k < sizeof(known_types) / sizeof(known_types[0]); k++) {
        if (strcmp(name, known_types[k].label) == 0) {
            *type = known_types[k].column_type;
            return 1;
        }
    }
    return 0;
}
// Inverse of dataframe_type_from_name(): the wire name for a column type.
// Returns the literal "unknown" for a value outside the four known types.
static const char *dataframe_type_name(dataframe_column_type type) {
    if (type == DATAFRAME_COLUMN_LOGICAL) {
        return "logical";
    }
    if (type == DATAFRAME_COLUMN_INTEGER) {
        return "integer";
    }
    if (type == DATAFRAME_COLUMN_DOUBLE) {
        return "double";
    }
    if (type == DATAFRAME_COLUMN_CHARACTER) {
        return "character";
    }
    return "unknown";
}
// Parse one non-missing data frame cell of the given column type into `cell`.
//
// `cell` is zeroed first. Returns non-zero on success, 0 when the term does
// not fit the column type:
//   - integer cells must lie in [-2147483647, INT_MAX]; INT_MIN itself is
//     rejected (presumably because R uses it as NA_integer_ -- confirm);
//   - double cells must be finite (no NaN / infinity).
static int parse_dataframe_non_na_value(ErlNifEnv *env, ERL_NIF_TERM term, dataframe_column_type type, dataframe_cell *cell) {
    memset(cell, 0, sizeof(*cell));

    if (type == DATAFRAME_COLUMN_LOGICAL) {
        return get_boolean(env, term, &cell->logical_value);
    }

    if (type == DATAFRAME_COLUMN_INTEGER) {
        ErlNifSInt64 wide;
        if (!enif_get_int64(env, term, &wide)) {
            return 0;
        }
        if (wide < -2147483647LL || wide > INT_MAX) {
            return 0;
        }
        cell->integer_value = (int)wide;
        return 1;
    }

    if (type == DATAFRAME_COLUMN_DOUBLE) {
        if (!get_double_value(env, term, &cell->double_value)) {
            return 0;
        }
        return isfinite(cell->double_value) ? 1 : 0;
    }

    if (type == DATAFRAME_COLUMN_CHARACTER) {
        return copy_binary_bytes(env, term, &cell->string_value, &cell->string_value_len);
    }

    return 0;
}
// Parse one data frame cell, which is either a plain value or an NA marker.
//
// A term that is a map containing the binary key "kind" is treated as a
// marker: it is accepted only when kind == "na" and its "type" entry equals
// the wire name of the column type, in which case cell->is_na is set. Any
// other marker is rejected. A term without a "kind" key is parsed as an
// ordinary value of the column type. Returns non-zero on success, 0 otherwise.
static int parse_dataframe_cell(ErlNifEnv *env, ERL_NIF_TERM term, dataframe_column_type type, dataframe_cell *cell) {
    memset(cell, 0, sizeof(*cell));

    ERL_NIF_TERM kind_term;
    if (!map_get_binary_key(env, term, "kind", &kind_term)) {
        return parse_dataframe_non_na_value(env, term, type, cell);
    }

    char *kind_text = NULL;
    if (!copy_binary_bytes(env, kind_term, &kind_text, &(size_t){0})) {
        return 0;
    }
    const bool tagged_na = strcmp(kind_text, "na") == 0;
    free(kind_text);
    if (!tagged_na) {
        return 0;
    }

    ERL_NIF_TERM na_type_term;
    char *na_type_text = NULL;
    if (!map_get_binary_key(env, term, "type", &na_type_term) ||
        !copy_binary_bytes(env, na_type_term, &na_type_text, &(size_t){0})) {
        free(na_type_text);
        return 0;
    }
    // The NA must be declared with the same type as the column it sits in.
    const bool same_type = strcmp(na_type_text, dataframe_type_name(type)) == 0;
    free(na_type_text);
    if (!same_type) {
        return 0;
    }

    cell->is_na = true;
    return 1;
}
// Report whether `name` equals any of the first `index` strings in `names`.
// Returns 1 when a match exists, 0 otherwise (always 0 for index == 0).
static int dataframe_name_duplicate(char **names, unsigned index, const char *name) {
    char **cursor = names;
    unsigned remaining = index;

    while (remaining > 0) {
        if (strcmp(*cursor, name) == 0) {
            return 1;
        }
        cursor++;
        remaining--;
    }
    return 0;
}
// Parse the wire representation of a data frame (an Erlang map with binary
// keys) into a freshly allocated dataframe_wire.
//
// Expected shape, as checked below:
//   "kind"    => "data_frame"
//   "names"   => list of non-empty, pairwise distinct binaries
//   "n_rows"  => integer in [0, INT_MAX]
//   "columns" => list with one map per name, each holding
//                "name" (equal to the name at the same position),
//                "type" (a known column type name) and
//                "values" (list of exactly n_rows cells)
// A frame with no columns must also have zero rows.
//
// On success stores the new object in *out and returns 1; ownership passes to
// the caller. On any failure *out stays NULL, everything allocated here is
// released via free_dataframe_wire(), and 0 is returned.
static int parse_dataframe_wire(ErlNifEnv *env, ERL_NIF_TERM term, dataframe_wire **out) {
    *out = NULL;

    ERL_NIF_TERM tag_term;
    char *tag = NULL;
    if (!map_get_binary_key(env, term, "kind", &tag_term) ||
        !copy_binary_bytes(env, tag_term, &tag, &(size_t){0})) {
        free(tag);
        return 0;
    }
    const bool tagged_dataframe = strcmp(tag, "data_frame") == 0;
    free(tag);
    if (!tagged_dataframe) {
        return 0;
    }

    // Validate the header fields before allocating anything.
    ERL_NIF_TERM names_term;
    ERL_NIF_TERM n_rows_term;
    ERL_NIF_TERM columns_term;
    unsigned name_count = 0;
    unsigned column_count = 0;
    ErlNifSInt64 n_rows_value = 0;
    if (!map_get_binary_key(env, term, "names", &names_term)) return 0;
    if (!map_get_binary_key(env, term, "n_rows", &n_rows_term)) return 0;
    if (!map_get_binary_key(env, term, "columns", &columns_term)) return 0;
    if (!enif_get_list_length(env, names_term, &name_count)) return 0;
    if (!enif_get_list_length(env, columns_term, &column_count)) return 0;
    if (!enif_get_int64(env, n_rows_term, &n_rows_value)) return 0;
    if (n_rows_value < 0 || n_rows_value > INT_MAX) return 0;
    if (name_count != column_count) return 0;
    if (name_count == 0 && n_rows_value > 0) return 0;

    ERL_NIF_TERM cell;
    ERL_NIF_TERM rest;

    dataframe_wire *frame = calloc(1, sizeof(dataframe_wire));
    if (frame == NULL) {
        return 0;
    }
    frame->name_count = name_count;
    frame->column_count = column_count;
    frame->n_rows = (unsigned)n_rows_value;
    if (name_count > 0) {
        frame->names = calloc(name_count, sizeof(char *));
        frame->columns = calloc(column_count, sizeof(dataframe_column));
        if (frame->names == NULL || frame->columns == NULL) {
            goto fail;
        }
    }

    // Column names: non-empty and unique.
    rest = names_term;
    for (unsigned n = 0; n < name_count; n++) {
        size_t unused_len = 0;
        if (!enif_get_list_cell(env, rest, &cell, &rest)) goto fail;
        if (!copy_binary_bytes(env, cell, &frame->names[n], &unused_len)) goto fail;
        if (frame->names[n][0] == '\0') goto fail;
        if (dataframe_name_duplicate(frame->names, n, frame->names[n])) goto fail;
    }

    // Columns: header first, then every cell.
    rest = columns_term;
    for (unsigned c = 0; c < column_count; c++) {
        if (!enif_get_list_cell(env, rest, &cell, &rest)) {
            goto fail;
        }

        dataframe_column *col = &frame->columns[c];
        ERL_NIF_TERM col_name_term;
        ERL_NIF_TERM col_type_term;
        ERL_NIF_TERM col_values_term;
        char *type_text = NULL;
        unsigned cell_total = 0;

        const int header_ok =
            map_get_binary_key(env, cell, "name", &col_name_term) &&
            map_get_binary_key(env, cell, "type", &col_type_term) &&
            map_get_binary_key(env, cell, "values", &col_values_term) &&
            copy_binary_bytes(env, col_name_term, &col->name, &(size_t){0}) &&
            strcmp(col->name, frame->names[c]) == 0 &&
            copy_binary_bytes(env, col_type_term, &type_text, &(size_t){0}) &&
            dataframe_type_from_name(type_text, &col->type) &&
            enif_get_list_length(env, col_values_term, &cell_total) &&
            cell_total == frame->n_rows;
        // The type name is only needed for the lookup above.
        free(type_text);
        if (!header_ok) {
            goto fail;
        }

        col->value_count = cell_total;
        if (cell_total > 0) {
            col->values = calloc(cell_total, sizeof(dataframe_cell));
            if (col->values == NULL) {
                goto fail;
            }
        }

        ERL_NIF_TERM value_cell;
        ERL_NIF_TERM value_rest = col_values_term;
        for (unsigned row = 0; row < cell_total; row++) {
            if (!enif_get_list_cell(env, value_rest, &value_cell, &value_rest)) goto fail;
            if (!parse_dataframe_cell(env, value_cell, col->type, &col->values[row])) goto fail;
        }
    }

    *out = frame;
    return 1;

fail:
    free_dataframe_wire(frame);
    return 0;
}
// Parse a keyword-style option list ([{key, value}, ...]) for data frame
// operations.
//
// Only the `max_rows` key is interpreted: its value must be an integer in
// [1, INT_MAX]; when present *has_max_rows becomes true and *max_rows holds
// the value (the last occurrence wins). Tuples with other keys are skipped.
// Returns 1 on success and 0 if `opts` is not a proper list of 2-tuples or
// max_rows is out of range. The outputs are reset to false / 0 on entry.
static int parse_dataframe_opts(ErlNifEnv *env, ERL_NIF_TERM opts, bool *has_max_rows, unsigned *max_rows) {
    *has_max_rows = false;
    *max_rows = 0;

    unsigned option_total = 0;
    if (!enif_get_list_length(env, opts, &option_total)) {
        return 0;
    }

    ERL_NIF_TERM rest = opts;
    for (unsigned seen = 0; seen < option_total; seen++) {
        ERL_NIF_TERM option;
        const ERL_NIF_TERM *pair;
        int pair_size = 0;

        if (!enif_get_list_cell(env, rest, &option, &rest)) {
            return 0;
        }
        if (!enif_get_tuple(env, option, &pair_size, &pair)) {
            return 0;
        }
        if (pair_size != 2) {
            return 0;
        }
        if (!enif_is_identical(pair[0], enif_make_atom(env, "max_rows"))) {
            continue;
        }

        ErlNifSInt64 limit = 0;
        if (!enif_get_int64(env, pair[1], &limit)) {
            return 0;
        }
        if (limit <= 0 || limit > INT_MAX) {
            return 0;
        }
        *has_max_rows = true;
        *max_rows = (unsigned)limit;
    }
    return 1;
}
// Append `work` to the tail of the global work queue and wake one thread
// waiting in dequeue_work().
//
// The item becomes the new tail, so its `next` link is cleared here rather
// than trusting the caller to have zeroed it: dequeue_work() follows
// `work->next` to find the new head, and a stale link left over from an
// earlier trip through the queue (or from an uninitialised allocation) would
// otherwise be treated as a queued item.
static void enqueue_work(r_work *work) {
    pthread_mutex_lock(&queue_mutex);
    work->next = NULL;
    if (queue_tail) {
        queue_tail->next = work;
    } else {
        queue_head = work;
    }
    queue_tail = work;
    pthread_cond_signal(&queue_cond);
    pthread_mutex_unlock(&queue_mutex);
}
// Remove and return the item at the head of the global work queue, blocking
// on queue_cond (with queue_mutex held) until one is available. The returned
// item's `next` field is left as it was.
static r_work *dequeue_work(void) {
    pthread_mutex_lock(&queue_mutex);

    // Loop rather than a single wait: the predicate is re-checked after
    // every wake-up.
    while (queue_head == NULL) {
        pthread_cond_wait(&queue_cond, &queue_mutex);
    }

    r_work *front = queue_head;
    queue_head = front->next;
    if (queue_head == NULL) {
        // That was the last item; the queue is empty again.
        queue_tail = NULL;
    }

    pthread_mutex_unlock(&queue_mutex);
    return front;
}
// Complete a processed work item.
//
// WORK_RELEASE items are simply freed here. Every other kind is flagged
// `done` and its done_cond is signalled while queue_mutex is held; the item
// itself is not freed by this function.
static void finish_work(r_work *work) {
    if (work->kind != WORK_RELEASE) {
        pthread_mutex_lock(&queue_mutex);
        work->done = true;
        pthread_cond_signal(&work->done_cond);
        pthread_mutex_unlock(&queue_mutex);
        return;
    }
    free(work);
}
// Point R's C-stack bookkeeping variables at the calling thread.
//
// Each variable is looked up in libR by name and skipped when absent:
//   R_CStackStart <- address of a local in this frame
//   R_CStackLimit <- (uintptr_t)-1
//   R_CStackDir   <- -1
// NOTE(review): per "Writing R Extensions", a limit of (uintptr_t)-1 turns
// R's stack-overflow checking off, which is the usual setup when R runs on a
// thread other than the one it would measure by default -- confirm intent.
static void configure_r_c_stack(void) {
    char stack_anchor;

    uintptr_t *start_slot = (uintptr_t *)dlsym(lib_r_handle, "R_CStackStart");
    if (start_slot != NULL) {
        *start_slot = (uintptr_t)&stack_anchor;
    }

    uintptr_t *limit_slot = (uintptr_t *)dlsym(lib_r_handle, "R_CStackLimit");
    if (limit_slot != NULL) {
        *limit_slot = (uintptr_t)-1;
    }

    int *direction_slot = (int *)dlsym(lib_r_handle, "R_CStackDir");
    if (direction_slot != NULL) {
        *direction_slot = -1;
    }
}
// Resolve `name` in the loaded libR handle and store the address in *out.
// Returns true when the symbol was found; *out is NULL otherwise.
static bool load_symbol(void **out, const char *name) {
    void *address = dlsym(lib_r_handle, name);
    *out = address;
    return address != NULL;
}
static int ensure_eval_helper(r_work *work);
static int ensure_plot_helper(r_work *work);
// Resolve every libR entry point and global this file uses and publish them
// in the global `r` table.
//
// Does nothing (returns true) when `r` is already populated. Symbols are
// resolved into a local candidate table in the order listed; on the first
// missing one an error is recorded on `work`, `r` is left untouched and false
// is returned. The two console hook pointers are looked up afterwards and are
// allowed to be NULL.
static bool load_r_api(r_work *work) {
    if (r.mk_string != NULL) {
        return true;
    }

    r_api candidate = {0};
    const struct {
        void **slot;
        const char *symbol;
    } required[] = {
        {(void **)&candidate.mk_string, "Rf_mkString"},
        {(void **)&candidate.protect, "Rf_protect"},
        {(void **)&candidate.unprotect, "Rf_unprotect"},
        {(void **)&candidate.parse_vector, "R_ParseVector"},
        {(void **)&candidate.try_eval, "R_tryEval"},
        {(void **)&candidate.toplevel_exec, "R_ToplevelExec"},
        {(void **)&candidate.xlength_fn, "Rf_xlength"},
        {(void **)&candidate.length_fn, "LENGTH"},
        {(void **)&candidate.type_of, "TYPEOF"},
        {(void **)&candidate.vector_elt, "VECTOR_ELT"},
        {(void **)&candidate.integer_elt, "INTEGER_ELT"},
        {(void **)&candidate.real_elt, "REAL_ELT"},
        {(void **)&candidate.logical_elt, "LOGICAL_ELT"},
        {(void **)&candidate.string_elt, "STRING_ELT"},
        {(void **)&candidate.r_char, "R_CHAR"},
        {(void **)&candidate.preserve_object, "R_PreserveObject"},
        {(void **)&candidate.release_object, "R_ReleaseObject"},
        {(void **)&candidate.scalar_real, "Rf_ScalarReal"},
        {(void **)&candidate.scalar_integer, "Rf_ScalarInteger"},
        {(void **)&candidate.scalar_logical, "Rf_ScalarLogical"},
        {(void **)&candidate.mk_char_len_ce, "Rf_mkCharLenCE"},
        {(void **)&candidate.scalar_string, "Rf_ScalarString"},
        {(void **)&candidate.alloc_vector, "Rf_allocVector"},
        {(void **)&candidate.raw, "RAW"},
        {(void **)&candidate.set_integer_elt, "SET_INTEGER_ELT"},
        {(void **)&candidate.set_real_elt, "SET_REAL_ELT"},
        {(void **)&candidate.set_logical_elt, "SET_LOGICAL_ELT"},
        {(void **)&candidate.set_string_elt, "SET_STRING_ELT"},
        {(void **)&candidate.set_vector_elt, "SET_VECTOR_ELT"},
        {(void **)&candidate.set_attrib, "Rf_setAttrib"},
        {(void **)&candidate.new_env, "R_NewEnv"},
        {(void **)&candidate.install, "Rf_install"},
        {(void **)&candidate.define_var, "Rf_defineVar"},
        {(void **)&candidate.find_var_in_frame, "Rf_findVarInFrame"},
        {(void **)&candidate.ls_internal, "R_lsInternal"},
        {(void **)&candidate.lang1, "Rf_lang1"},
        {(void **)&candidate.lang2, "Rf_lang2"},
        {(void **)&candidate.lang3, "Rf_lang3"},
        {(void **)&candidate.lang4, "Rf_lang4"},
        {(void **)&candidate.get_attrib, "Rf_getAttrib"},
        {(void **)&candidate.inherits, "Rf_inherits"},
        {(void **)&candidate.nil_value, "R_NilValue"},
        {(void **)&candidate.global_env, "R_GlobalEnv"},
        {(void **)&candidate.names_symbol, "R_NamesSymbol"},
        {(void **)&candidate.class_symbol, "R_ClassSymbol"},
        {(void **)&candidate.dim_symbol, "R_DimSymbol"},
        {(void **)&candidate.na_string, "R_NaString"},
        {(void **)&candidate.na_int, "R_NaInt"},
        {(void **)&candidate.na_real, "R_NaReal"},
    };

    for (size_t k = 0; k < sizeof(required) / sizeof(required[0]); k++) {
        if (!load_symbol(required[k].slot, required[k].symbol)) {
            set_error(work, "libR is missing one or more native API symbols");
            return false;
        }
    }

    // Publish only a fully resolved table.
    r = candidate;
    r_write_console =
        (ptr_R_WriteConsole_fn *)dlsym(lib_r_handle, "ptr_R_WriteConsole");
    r_write_console_ex =
        (ptr_R_WriteConsoleEx_fn *)dlsym(lib_r_handle, "ptr_R_WriteConsoleEx");
    return true;
}
// Prepend the configured library paths to R's search path, i.e. evaluate
// `.libPaths(c(<work->lib_paths>, .libPaths()))` in the global environment.
//
// Returns 1 on success (or when no paths are configured). On failure records
// an error on `work` and returns 0. The protect stack is balanced on every
// return path.
static int apply_lib_paths(r_work *work) {
    if (work->lib_paths_count == 0) {
        return 1;
    }

    // Rf_mkCharLenCE takes an int length; refuse paths that would not fit
    // instead of passing a truncated (possibly negative) length to R.
    for (unsigned i = 0; i < work->lib_paths_count; i++) {
        if (strlen(work->lib_paths[i]) > (size_t)INT_MAX) {
            set_error(work, "native R library path is too long");
            return 0;
        }
    }

    int protect_count = 0;
    SEXP paths = r.protect(r.alloc_vector(STRSXP, work->lib_paths_count));
    protect_count++;
    for (unsigned i = 0; i < work->lib_paths_count; i++) {
        const char *path = work->lib_paths[i];
        // Store each CHARSXP straight into the protected vector. Nothing
        // allocates between creating it and storing it, so it needs no
        // protect slot of its own; protecting every element would grow the
        // bounded protect stack by lib_paths_count entries until return.
        r.set_string_elt(paths, i, r.mk_char_len_ce(path, (int)strlen(path), CE_UTF8));
    }

    SEXP current_call = r.protect(r.lang1(r.install(".libPaths")));
    protect_count++;
    int error_occurred = 0;
    // R_tryEval yields a C NULL on error, so check before protecting it.
    SEXP current = r.try_eval(current_call, *r.global_env, &error_occurred);
    if (error_occurred || current == NULL) {
        r.unprotect(protect_count);
        set_error(work, "failed to read native R library paths");
        return 0;
    }
    r.protect(current);
    protect_count++;

    R_xlen_t current_len = r.xlength_fn(current);
    R_xlen_t total_len = work->lib_paths_count + current_len;
    SEXP combined = r.protect(r.alloc_vector(STRSXP, total_len));
    protect_count++;
    // Requested paths first so they take precedence over the existing ones.
    for (unsigned i = 0; i < work->lib_paths_count; i++) {
        r.set_string_elt(combined, i, r.string_elt(paths, i));
    }
    for (R_xlen_t i = 0; i < current_len; i++) {
        r.set_string_elt(combined, work->lib_paths_count + i, r.string_elt(current, i));
    }

    SEXP set_call = r.protect(r.lang2(r.install(".libPaths"), combined));
    protect_count++;
    error_occurred = 0;
    r.try_eval(set_call, *r.global_env, &error_occurred);
    r.unprotect(protect_count);
    if (error_occurred) {
        set_error(work, "failed to apply native R library paths");
        return 0;
    }
    return 1;
}
// Hand SIGCHLD back to its default disposition so embedded R's libc
// system()/popen() collect their own children instead of the BEAM's signal
// dispatcher consuming the exit (see the call site in do_init for the full
// rationale). Runs on the R owner thread; sigaction disposition is
// process-wide.
static void reset_sigchld_to_default(void) {
struct sigaction default_action;
memset(&default_action, 0, sizeof(default_action));
default_action.sa_handler = SIG_DFL;
sigemptyset(&default_action.sa_mask);
sigaction(SIGCHLD, &default_action, NULL);
}
// Handle an init work item: load libR and boot the embedded R runtime with
// the configuration carried by `work` (lib_r_path, r_home, lib_paths).
//
// The outcome is reported through `work`: init_return is one of
// INIT_RETURN_OK / _PLAIN_ERROR / _MISMATCH / _FAILED, with details in
// work->error, work->init_mismatch or work->init_failure.
//
// Flow:
//   1. Copy the requested config and record it as the latest attempt.
//   2. If R is already initialized: succeed when the config is identical,
//      otherwise report a mismatch (R is not re-initialized).
//   3. If an earlier attempt failed in a non-retryable way, replay that
//      failure without touching libR again.
//   4. Otherwise set the environment, dlopen libR, run Rf_initialize_R and
//      setup_Rmainloop, then load the API table, apply library paths and
//      install the eval helper.
//   5. On success, publish the config as successful_init_config and mark the
//      runtime initialized.
//
// Shared diagnostics state (counters, init_state, last_init_error, recorded
// configs) is only touched while diagnostics_mutex is held. Every return path
// clears the local `requested` copy and, once it exists,
// `pending_success_config`.
static void do_init(r_work *work) {
work->ok = true;
work->error[0] = '\0';
work->init_return = INIT_RETURN_OK;
memset(&work->init_failure, 0, sizeof(work->init_failure));
memset(&work->init_mismatch, 0, sizeof(work->init_mismatch));
// Private copy of the requested configuration, used for comparison and
// diagnostics below.
init_config requested = {0};
if (!init_config_copy_from_work(&requested, work)) {
work->init_return = INIT_RETURN_PLAIN_ERROR;
set_error(work, "failed to copy native init config");
return;
}
pthread_mutex_lock(&diagnostics_mutex);
init_attempt_count++;
// Remember what was asked for on this attempt, whatever the outcome.
if (!init_config_copy(&attempted_init_config, requested.lib_r_path, requested.r_home, requested.lib_paths, requested.lib_paths_count)) {
pthread_mutex_unlock(&diagnostics_mutex);
work->init_return = INIT_RETURN_PLAIN_ERROR;
set_error(work, "failed to record native init config");
init_config_clear(&requested);
return;
}
// Already initialized: a repeat init is a no-op success only when it asks
// for exactly the configuration that is running.
if (init_state == INIT_STATE_INITIALIZED) {
bool same_config = init_config_equal(&successful_init_config, &requested);
if (same_config) {
pthread_mutex_unlock(&diagnostics_mutex);
init_config_clear(&requested);
return;
}
init_mismatch_count++;
if (!fill_init_mismatch(&work->init_mismatch, &successful_init_config, &requested)) {
pthread_mutex_unlock(&diagnostics_mutex);
work->init_return = INIT_RETURN_PLAIN_ERROR;
set_error(work, "failed to record native init mismatch diagnostics");
init_config_clear(&requested);
return;
}
pthread_mutex_unlock(&diagnostics_mutex);
work->init_return = INIT_RETURN_MISMATCH;
set_error(work, "native init config mismatch");
init_config_clear(&requested);
return;
}
// A previous non-retryable failure is replayed verbatim instead of trying
// to boot R a second time.
if (init_state == INIT_STATE_FAILED && has_last_init_error && !last_init_error.retryable) {
work->ok = false;
work->init_return = INIT_RETURN_FAILED;
work->init_failure = last_init_error;
pthread_mutex_unlock(&diagnostics_mutex);
init_config_clear(&requested);
return;
}
pthread_mutex_unlock(&diagnostics_mutex);
// Prepared up front so that publishing the successful config at the end
// cannot fail on allocation.
init_config pending_success_config = {0};
if (!init_config_copy(&pending_success_config, requested.lib_r_path, requested.r_home, requested.lib_paths, requested.lib_paths_count)) {
work->init_return = INIT_RETURN_PLAIN_ERROR;
set_error(work, "failed to record native init config");
init_config_clear(&requested);
return;
}
// Environment for the embedded R: R_HOME from the request, R_ENABLE_JIT set
// to "0" and R_DEFAULT_PACKAGES set to "NULL". These overwrite any existing
// values and affect the whole process.
setenv("R_HOME", work->r_home, 1);
setenv("R_ENABLE_JIT", "0", 1);
setenv("R_DEFAULT_PACKAGES", "NULL", 1);
lib_r_handle = dlopen(work->lib_r_path, RTLD_NOW | RTLD_GLOBAL);
// NOTE(review): the last argument of set_init_failed looks like the
// "retryable" flag -- it is true only for the two failures that happen
// before Rf_initialize_R has run. Confirm against set_init_failed.
if (lib_r_handle == NULL) {
const char *message = dlerror();
set_init_failed(work, "dlopen_lib_r", message ? message : "failed to dlopen libR", true);
init_config_clear(&pending_success_config);
init_config_clear(&requested);
return;
}
configure_r_c_stack();
Rf_initialize_R_fn Rf_initialize_R =
(Rf_initialize_R_fn)dlsym(lib_r_handle, "Rf_initialize_R");
setup_Rmainloop_fn setup_Rmainloop =
(setup_Rmainloop_fn)dlsym(lib_r_handle, "setup_Rmainloop");
if (Rf_initialize_R == NULL || setup_Rmainloop == NULL) {
set_init_failed(work, "resolve_boot_symbols", "libR is missing Rf_initialize_R or setup_Rmainloop", true);
// R has not been started yet, so the handle is closed here.
close_uninitialized_lib_r_handle();
init_config_clear(&pending_success_config);
init_config_clear(&requested);
return;
}
char *argv[] = {"R", "--silent", "--no-save", "--no-restore", "--no-readline"};
// From here on the library handle is kept open on failure and failures are
// reported with the flag set to false.
if (Rf_initialize_R(5, argv) < 0) {
set_init_failed(work, "rf_initialize_r", "Rf_initialize_R failed", false);
init_config_clear(&pending_success_config);
init_config_clear(&requested);
return;
}
// Fault-injection hook: reports a failure at this stage when the
// "after_rf_initialize" fault stage is enabled.
if (init_fault_stage_enabled("after_rf_initialize")) {
set_init_failed(work, "rf_initialize_r", "fault injected after Rf_initialize_R", false);
init_config_clear(&pending_success_config);
init_config_clear(&requested);
return;
}
// Re-applied after Rf_initialize_R and before the main loop setup
// (presumably because Rf_initialize_R resets these variables -- confirm).
configure_r_c_stack();
setup_Rmainloop();
// The BEAM owns a process-wide SIGCHLD handler whose signal-dispatcher thread
// consumes child-process exits. Embedded R's libc system()/popen() then block
// forever in wait4() waiting for a child whose exit the BEAM already reaped,
// which hangs every R call that shells out (system/system2/pipe and therefore
// package/namespace loading via requireNamespace). Restore the default
// SIGCHLD disposition so R's own system()/popen() reap their children. This
// is safe for the BEAM: OTP reaps Port children through erl_child_setup, not
// through this handler (verified: Port/System.cmd/os:cmd keep working).
reset_sigchld_to_default();
// Post-boot steps. Each one reports its own stage name and falls back to a
// generic message when the helper left work->error empty.
if (!load_r_api(work)) {
set_init_failed(work, "load_r_api", work->error[0] == '\0' ? "libR is missing one or more native API symbols" : work->error, false);
init_config_clear(&pending_success_config);
init_config_clear(&requested);
return;
}
if (!apply_lib_paths(work)) {
set_init_failed(work, "apply_lib_paths", work->error[0] == '\0' ? "failed to apply native R library paths" : work->error, false);
init_config_clear(&pending_success_config);
init_config_clear(&requested);
return;
}
if (!ensure_eval_helper(work)) {
set_init_failed(work, "ensure_eval_helper", work->error[0] == '\0' ? "failed to initialize native eval helper" : work->error, false);
init_config_clear(&pending_success_config);
init_config_clear(&requested);
return;
}
// Success: publish the configuration and reset failure diagnostics.
pthread_mutex_lock(&diagnostics_mutex);
init_config_clear(&successful_init_config);
// Ownership of the pending copy moves into successful_init_config; the local
// is zeroed so it is not released separately.
successful_init_config = pending_success_config;
memset(&pending_success_config, 0, sizeof(pending_success_config));
init_state = INIT_STATE_INITIALIZED;
has_last_init_error = false;
memset(&last_init_error, 0, sizeof(last_init_error));
pthread_mutex_unlock(&diagnostics_mutex);
runtime_initialized = true;
init_config_clear(&requested);
}
// Mark the work item's result as unsupported and record the SEXP type of the
// value that could not be represented.
static void set_unsupported_result(r_work *work, SEXP value) {
    const int sexp_type = r.type_of(value);

    work->result_kind = RESULT_UNSUPPORTED;
    work->unsupported_type = sexp_type;
}
static rx_object_resource *make_preserved_resource(SEXP value, resource_kind kind);
// Copy the bytes of an R string element (as given by LENGTH / R_CHAR) into a
// newly malloc'd, NUL-terminated buffer.
//
// On success stores the buffer in *out and its length (excluding the
// terminator) in *out_len, and returns 1; the caller owns the buffer.
// Returns 0 when the reported length is negative or allocation fails.
static int copy_r_string(SEXP string, char **out, size_t *out_len) {
    const int r_length = r.length_fn(string);
    if (r_length < 0) {
        return 0;
    }

    const char *source = r.r_char(string);
    const size_t byte_count = (size_t)r_length;

    char *buffer = malloc(byte_count + 1);
    if (buffer == NULL) {
        return 0;
    }
    if (source != NULL && byte_count != 0) {
        memcpy(buffer, source, byte_count);
    }
    buffer[byte_count] = '\0';

    *out_len = byte_count;
    *out = buffer;
    return 1;
}
// Allocate a zero-initialised decoded_value of the given kind.
// Returns NULL when allocation fails.
static decoded_value *alloc_decoded_value(decoded_value_kind kind) {
    decoded_value *fresh = calloc(1, sizeof(decoded_value));
    if (fresh == NULL) {
        return NULL;
    }
    fresh->kind = kind;
    return fresh;
}
// Classify an R object for wrapping as a resource: objects inheriting from
// "data.frame" get RESOURCE_KIND_DATAFRAME, everything else (or everything,
// if Rf_inherits was not resolved) gets RESOURCE_KIND_GENERIC.
static resource_kind resource_kind_for_object(SEXP value) {
    const bool is_dataframe =
        r.inherits != NULL && r.inherits(value, "data.frame");

    return is_dataframe ? RESOURCE_KIND_DATAFRAME : RESOURCE_KIND_GENERIC;
}
// Wrap an R value as an opaque DECODE_VALUE_OBJECT backed by a preserved
// resource. Stores the new value in *out and returns 1 on success; returns 0
// (leaving *out untouched) when either allocation fails.
static int decoded_object(SEXP value, decoded_value **out) {
    decoded_value *wrapper = alloc_decoded_value(DECODE_VALUE_OBJECT);
    if (wrapper == NULL) {
        return 0;
    }

    wrapper->resource = make_preserved_resource(value, resource_kind_for_object(value));
    if (wrapper->resource != NULL) {
        *out = wrapper;
        return 1;
    }

    free_decoded_value(wrapper);
    return 0;
}
// Decode a value that has no structural representation.
//
// At the top level (depth == 0) the work item is flagged with an unsupported
// result and *out is set to NULL; this still counts as success (returns 1).
// When nested, the value is wrapped as an opaque object resource; returns 0
// with an error recorded on `work` if that allocation fails.
static int decode_opaque_value(r_work *work, SEXP value, int depth, decoded_value **out) {
    if (depth != 0) {
        if (decoded_object(value, out)) {
            return 1;
        }
        set_error(work, "failed to allocate native opaque decode result");
        return 0;
    }

    set_unsupported_result(work, value);
    *out = NULL;
    return 1;
}
// Return non-zero when `value` carries the attribute named by `symbol` and
// that attribute has at least one element. A NULL symbol or R's NULL value
// always yields 0.
static int has_non_empty_attribute(SEXP value, SEXP symbol) {
    if (symbol == NULL) {
        return 0;
    }
    if (value == *r.nil_value) {
        return 0;
    }

    SEXP attribute = r.get_attrib(value, symbol);
    if (attribute == *r.nil_value) {
        return 0;
    }
    return r.xlength_fn(attribute) > 0;
}
// A value counts as a "semantic object" when it has a non-empty class or dim
// attribute; decode_r_value() treats such values as opaque instead of
// decoding them structurally.
static int is_semantic_object(SEXP value) {
    if (has_non_empty_attribute(value, *r.class_symbol)) {
        return 1;
    }
    return has_non_empty_attribute(value, *r.dim_symbol);
}
// Convert an R length to an unsigned element count. Returns 1 and stores the
// count when `length` is within [0, UINT_MAX]; returns 0 otherwise.
static int decoded_count(R_xlen_t length, unsigned *out) {
    if (length < 0) {
        return 0;
    }
    if ((uint64_t)length > UINT_MAX) {
        return 0;
    }
    *out = (unsigned)length;
    return 1;
}
// Build a DECODE_VALUE_NA tagged with the atomic type it is missing from.
// Returns 1 and stores it in *out, or 0 on allocation failure.
static int decoded_na(decoded_atomic_type type, decoded_value **out) {
    decoded_value *missing = alloc_decoded_value(DECODE_VALUE_NA);
    if (missing != NULL) {
        missing->atomic_type = type;
        *out = missing;
    }
    return missing != NULL;
}
// Decode one element of an R logical vector. The NA sentinel (R_NaInt)
// becomes a logical NA; otherwise the result is true exactly when the raw
// value equals 1. Returns 1 on success, 0 on allocation failure.
static int decoded_logical(int value, decoded_value **out) {
    if (value == *r.na_int) {
        return decoded_na(DECODE_ATOMIC_LOGICAL, out);
    }

    decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_LOGICAL);
    if (boxed != NULL) {
        boxed->logical_value = (value == 1);
        *out = boxed;
    }
    return boxed != NULL;
}
// Decode one element of an R integer vector. The NA sentinel (R_NaInt)
// becomes an integer NA; any other value is stored as-is. Returns 1 on
// success, 0 on allocation failure.
static int decoded_integer(int value, decoded_value **out) {
    if (value == *r.na_int) {
        return decoded_na(DECODE_ATOMIC_INTEGER, out);
    }

    decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_INTEGER);
    if (boxed != NULL) {
        boxed->integer_value = value;
        *out = boxed;
    }
    return boxed != NULL;
}
// Decode one element of an R double vector. Every NaN -- which includes R's
// NA_real_ as well as ordinary NaN -- is reported as a double NA; all other
// values (infinities included) are stored as-is. Returns 1 on success, 0 on
// allocation failure.
static int decoded_double(double value, decoded_value **out) {
    if (isnan(value)) {
        return decoded_na(DECODE_ATOMIC_DOUBLE, out);
    }

    decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_DOUBLE);
    if (boxed != NULL) {
        boxed->double_value = value;
        *out = boxed;
    }
    return boxed != NULL;
}
// Decode one element of an R character vector. The NA string sentinel
// becomes a character NA; otherwise the bytes are copied into a new
// DECODE_VALUE_CHARACTER. Returns 1 on success, 0 on allocation failure.
static int decoded_character(SEXP string, decoded_value **out) {
    if (string == *r.na_string) {
        return decoded_na(DECODE_ATOMIC_CHARACTER, out);
    }

    decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_CHARACTER);
    if (boxed == NULL) {
        return 0;
    }
    if (copy_r_string(string, &boxed->string_value, &boxed->string_value_len)) {
        *out = boxed;
        return 1;
    }

    free_decoded_value(boxed);
    return 0;
}
static int decode_r_value(r_work *work, SEXP value, int depth, decoded_value **out);
// Decode an atomic R vector of the given element type into a
// DECODE_VALUE_VECTOR holding one decoded_value per element (NA elements
// become DECODE_VALUE_NA items).
//
// Returns 1 and stores the vector in *out on success. On failure records an
// error on `work`, frees anything built so far and returns 0. Vectors longer
// than UINT_MAX elements are rejected.
static int decode_atomic_vector(r_work *work, SEXP value, decoded_atomic_type type, decoded_value **out) {
    unsigned item_total = 0;
    if (!decoded_count(r.xlength_fn(value), &item_total)) {
        set_error(work, "native vector result is too large");
        return 0;
    }

    decoded_value *vector = alloc_decoded_value(DECODE_VALUE_VECTOR);
    if (vector == NULL) {
        set_error(work, "failed to allocate native vector decode result");
        return 0;
    }
    vector->atomic_type = type;
    vector->count = item_total;

    if (item_total > 0) {
        vector->items = calloc(item_total, sizeof(decoded_value *));
        if (vector->items == NULL) {
            free_decoded_value(vector);
            set_error(work, "failed to allocate native vector decode result");
            return 0;
        }
    }

    for (unsigned idx = 0; idx < item_total; idx++) {
        const R_xlen_t at = (R_xlen_t)idx;
        decoded_value **slot = &vector->items[idx];
        int copied = 0;

        switch (type) {
        case DECODE_ATOMIC_LOGICAL:
            copied = decoded_logical(r.logical_elt(value, at), slot);
            break;
        case DECODE_ATOMIC_INTEGER:
            copied = decoded_integer(r.integer_elt(value, at), slot);
            break;
        case DECODE_ATOMIC_DOUBLE:
            copied = decoded_double(r.real_elt(value, at), slot);
            break;
        case DECODE_ATOMIC_CHARACTER:
            copied = decoded_character(r.string_elt(value, at), slot);
            break;
        }

        if (copied) {
            continue;
        }
        // Also reached for an unrecognised element type.
        free_decoded_value(vector);
        set_error(work, "failed to copy native vector decode result");
        return 0;
    }

    *out = vector;
    return 1;
}
// Copy the name of list element `index` out of a names attribute.
//
// *out is set to NULL and 1 is returned when there is no usable name: the
// attribute is NULL, is not a character vector, is too short, or the name is
// NA or empty. Otherwise the name is copied into a new buffer owned by the
// caller. Returns 0 only when that copy fails.
static int copy_list_name(SEXP names, unsigned index, char **out) {
    *out = NULL;

    const int names_usable = names != *r.nil_value &&
                             r.type_of(names) == STRSXP &&
                             r.xlength_fn(names) > (R_xlen_t)index;
    if (!names_usable) {
        return 1;
    }

    SEXP element_name = r.string_elt(names, (R_xlen_t)index);
    if (element_name == *r.na_string) {
        return 1;
    }
    if (r.length_fn(element_name) == 0) {
        return 1;
    }

    size_t unused_len = 0;
    return copy_r_string(element_name, out, &unused_len);
}
// Return 1 when `name` already appears among the first `index` entries.
// A NULL `name` never counts as a duplicate, and unnamed entries are skipped.
static int duplicate_name(decoded_entry *entries, unsigned index, const char *name) {
    if (name == NULL) {
        return 0;
    }

    for (unsigned earlier = 0; earlier < index; earlier++) {
        const char *seen = entries[earlier].name;
        if (seen == NULL) {
            continue;
        }
        if (strcmp(seen, name) == 0) {
            return 1;
        }
    }
    return 0;
}
// Decode an R generic vector (list), recursing into its elements.
//
// Once `depth` reaches RX_DECODE_MAX_DEPTH the list is handed to
// decode_opaque_value() instead of being walked. Otherwise every element is
// decoded at depth + 1 and its name (if any) copied. The result kind is
// DECODE_VALUE_NAMED_LIST when the list is empty or every element has a
// non-empty, non-NA, unique name; otherwise it is DECODE_VALUE_R_LIST.
//
// Returns 1 and stores the result in *out on success; on failure frees the
// partial result and returns 0 with an error recorded on `work` (set here or
// by the recursive call).
static int decode_r_list(r_work *work, SEXP value, int depth, decoded_value **out) {
    if (depth >= RX_DECODE_MAX_DEPTH) {
        return decode_opaque_value(work, value, depth, out);
    }

    const R_xlen_t r_length = r.xlength_fn(value);
    unsigned entry_total = 0;
    if (!decoded_count(r_length, &entry_total)) {
        set_error(work, "native list result is too large");
        return 0;
    }

    decoded_value *list = alloc_decoded_value(DECODE_VALUE_NAMED_LIST);
    if (list == NULL) {
        set_error(work, "failed to allocate native list decode result");
        return 0;
    }
    list->count = entry_total;
    if (entry_total > 0) {
        list->entries = calloc(entry_total, sizeof(decoded_entry));
        if (list->entries == NULL) {
            free_decoded_value(list);
            set_error(work, "failed to allocate native list decode result");
            return 0;
        }
    }

    // The names attribute is only trusted when it is a character vector of
    // exactly the list's length.
    SEXP names_attr = r.get_attrib(value, *r.names_symbol);
    const int names_usable = names_attr != *r.nil_value &&
                             r.type_of(names_attr) == STRSXP &&
                             r.xlength_fn(names_attr) == r_length;
    SEXP name_source = names_usable ? names_attr : *r.nil_value;
    int every_entry_named = (entry_total == 0) ? 1 : names_usable;

    for (unsigned idx = 0; idx < entry_total; idx++) {
        decoded_entry *entry = &list->entries[idx];

        if (!copy_list_name(name_source, idx, &entry->name)) {
            free_decoded_value(list);
            set_error(work, "failed to copy native list name");
            return 0;
        }
        if (entry->name == NULL || duplicate_name(list->entries, idx, entry->name)) {
            every_entry_named = 0;
        }
        if (!decode_r_value(work, r.vector_elt(value, (R_xlen_t)idx), depth + 1, &entry->value)) {
            free_decoded_value(list);
            return 0;
        }
    }

    list->kind = every_entry_named ? DECODE_VALUE_NAMED_LIST : DECODE_VALUE_R_LIST;
    *out = list;
    return 1;
}
// Decode an arbitrary R value into a decoded_value tree.
//
//   - NULL becomes DECODE_VALUE_NULL.
//   - Values with a non-empty class or dim attribute are treated as opaque.
//   - Logical / integer / double / character vectors of length 1 decode to a
//     scalar (or NA); any other length decodes to a vector.
//   - Generic vectors (lists) are decoded recursively.
//   - Every other SEXP type is treated as opaque.
//
// *out is reset to NULL on entry. Returns 1 on success -- note that for an
// opaque value at depth 0 this means *out stays NULL and the work item is
// flagged unsupported -- and 0 on failure with an error recorded on `work`.
static int decode_r_value(r_work *work, SEXP value, int depth, decoded_value **out) {
    *out = NULL;

    if (value == *r.nil_value || r.type_of(value) == NILSXP) {
        decoded_value *null_value = alloc_decoded_value(DECODE_VALUE_NULL);
        if (null_value == NULL) {
            set_error(work, "failed to allocate native NULL decode result");
            return 0;
        }
        *out = null_value;
        return 1;
    }

    if (is_semantic_object(value)) {
        return decode_opaque_value(work, value, depth, out);
    }

    const int sexp_type = r.type_of(value);
    const R_xlen_t element_count = r.xlength_fn(value);

    switch (sexp_type) {
    case LGLSXP:
        if (element_count != 1) {
            return decode_atomic_vector(work, value, DECODE_ATOMIC_LOGICAL, out);
        }
        if (decoded_logical(r.logical_elt(value, 0), out)) {
            return 1;
        }
        set_error(work, "failed to copy native logical decode result");
        return 0;

    case INTSXP:
        if (element_count != 1) {
            return decode_atomic_vector(work, value, DECODE_ATOMIC_INTEGER, out);
        }
        if (decoded_integer(r.integer_elt(value, 0), out)) {
            return 1;
        }
        set_error(work, "failed to copy native integer decode result");
        return 0;

    case REALSXP:
        if (element_count != 1) {
            return decode_atomic_vector(work, value, DECODE_ATOMIC_DOUBLE, out);
        }
        if (decoded_double(r.real_elt(value, 0), out)) {
            return 1;
        }
        set_error(work, "failed to copy native double decode result");
        return 0;

    case STRSXP:
        if (element_count != 1) {
            return decode_atomic_vector(work, value, DECODE_ATOMIC_CHARACTER, out);
        }
        if (decoded_character(r.string_elt(value, 0), out)) {
            return 1;
        }
        set_error(work, "failed to copy native character decode result");
        return 0;

    case VECSXP:
        return decode_r_list(work, value, depth, out);

    default:
        return decode_opaque_value(work, value, depth, out);
    }
}
// Copy a length-1 R value (or NULL) into the scalar result fields of `work`.
//
// result_kind is set to RESULT_NULL, RESULT_INTEGER, RESULT_DOUBLE,
// RESULT_LOGICAL or RESULT_CHARACTER accordingly. Values whose length is not
// 1, or whose type is none of those, are flagged unsupported. NA elements are
// not special-cased here: a logical is true only when the raw value is 1, and
// integer/double/string elements are copied as R returns them. If the string
// copy fails an error is recorded and result_kind stays RESULT_NONE.
static void copy_scalar_result(r_work *work, SEXP value) {
    work->result_kind = RESULT_NONE;
    work->result_length = 0;

    const int sexp_type = r.type_of(value);
    const R_xlen_t element_count = r.xlength_fn(value);

    if (value == *r.nil_value || sexp_type == NILSXP) {
        work->result_kind = RESULT_NULL;
        return;
    }
    if (element_count != 1) {
        set_unsupported_result(work, value);
        return;
    }

    if (sexp_type == INTSXP) {
        work->result_kind = RESULT_INTEGER;
        work->integer_result = r.integer_elt(value, 0);
    } else if (sexp_type == REALSXP) {
        work->result_kind = RESULT_DOUBLE;
        work->double_result = r.real_elt(value, 0);
    } else if (sexp_type == LGLSXP) {
        work->result_kind = RESULT_LOGICAL;
        work->logical_result = r.logical_elt(value, 0) == 1;
    } else if (sexp_type == STRSXP) {
        SEXP element = r.string_elt(value, 0);
        if (copy_r_string(element, &work->string_result, &work->string_result_len)) {
            work->result_kind = RESULT_CHARACTER;
        } else {
            set_error(work, "failed to allocate eval string result");
        }
    } else {
        set_unsupported_result(work, value);
    }
}
// Copy an R logical vector into work->logical_values as an array of bool
// (an element is true exactly when its raw value is 1).
//
// result_length is always set to the vector length. On success result_kind
// becomes RESULT_LOGICAL_VECTOR; on overflow or allocation failure an error
// is recorded and result_kind is left unchanged. No buffer is allocated for
// an empty vector.
static void copy_logical_vector_result(r_work *work, SEXP value) {
    const R_xlen_t total = r.xlength_fn(value);
    work->result_length = total;

    if (total > 0) {
        const uint64_t max_elements = (uint64_t)SIZE_MAX / sizeof(bool);
        if ((uint64_t)total > max_elements) {
            set_error(work, "logical vector result is too large");
            return;
        }
        work->logical_values = malloc((size_t)total * sizeof(bool));
        if (work->logical_values == NULL) {
            set_error(work, "failed to allocate logical vector result");
            return;
        }
    }

    for (R_xlen_t pos = 0; pos < total; pos++) {
        work->logical_values[pos] = r.logical_elt(value, pos) == 1;
    }
    work->result_kind = RESULT_LOGICAL_VECTOR;
}
// Copy an R integer vector into work->integer_values, element by element and
// without translating any value.
//
// result_length is always set to the vector length. On success result_kind
// becomes RESULT_INTEGER_VECTOR; on overflow or allocation failure an error
// is recorded and result_kind is left unchanged. No buffer is allocated for
// an empty vector.
static void copy_integer_vector_result(r_work *work, SEXP value) {
    const R_xlen_t total = r.xlength_fn(value);
    work->result_length = total;

    if (total > 0) {
        const uint64_t max_elements = (uint64_t)SIZE_MAX / sizeof(int);
        if ((uint64_t)total > max_elements) {
            set_error(work, "integer vector result is too large");
            return;
        }
        work->integer_values = malloc((size_t)total * sizeof(int));
        if (work->integer_values == NULL) {
            set_error(work, "failed to allocate integer vector result");
            return;
        }
    }

    for (R_xlen_t pos = 0; pos < total; pos++) {
        work->integer_values[pos] = r.integer_elt(value, pos);
    }
    work->result_kind = RESULT_INTEGER_VECTOR;
}
// Copy an R double vector into work->double_values, element by element and
// without translating any value.
//
// result_length is always set to the vector length. On success result_kind
// becomes RESULT_DOUBLE_VECTOR; on overflow or allocation failure an error is
// recorded and result_kind is left unchanged. No buffer is allocated for an
// empty vector.
static void copy_double_vector_result(r_work *work, SEXP value) {
    const R_xlen_t total = r.xlength_fn(value);
    work->result_length = total;

    if (total > 0) {
        const uint64_t max_elements = (uint64_t)SIZE_MAX / sizeof(double);
        if ((uint64_t)total > max_elements) {
            set_error(work, "double vector result is too large");
            return;
        }
        work->double_values = malloc((size_t)total * sizeof(double));
        if (work->double_values == NULL) {
            set_error(work, "failed to allocate double vector result");
            return;
        }
    }

    for (R_xlen_t pos = 0; pos < total; pos++) {
        work->double_values[pos] = r.real_elt(value, pos);
    }
    work->result_kind = RESULT_DOUBLE_VECTOR;
}
/*
 * Copy an R character vector into two parallel zero-initialised arrays on
 * the work item: work->string_values (one copied string per element) and
 * work->string_value_lens (the length reported by copy_r_string for each).
 *
 * work->result_length is always set to the vector length. The result kind
 * becomes RESULT_CHARACTER_VECTOR only when every element was copied; any
 * failure is reported through set_error() and leaves whatever was already
 * stored on the work item in place.
 */
static void copy_character_vector_result(r_work *work, SEXP value) {
    const R_xlen_t count = r.xlength_fn(value);

    work->result_length = count;
    if (count > 0) {
        /* Both arrays must be addressable; check each element size. */
        const bool pointers_overflow = (uint64_t)count > (uint64_t)SIZE_MAX / sizeof(char *);
        const bool lengths_overflow = (uint64_t)count > (uint64_t)SIZE_MAX / sizeof(size_t);
        if (pointers_overflow || lengths_overflow) {
            set_error(work, "character vector result is too large");
            return;
        }

        work->string_values = calloc((size_t)count, sizeof(char *));
        work->string_value_lens = calloc((size_t)count, sizeof(size_t));
        if (!(work->string_values != NULL && work->string_value_lens != NULL)) {
            set_error(work, "failed to allocate character vector result");
            return;
        }
    }

    for (R_xlen_t idx = 0; idx < count; idx++) {
        SEXP element = r.string_elt(value, idx);
        if (copy_r_string(element, &work->string_values[idx], &work->string_value_lens[idx])) {
            continue;
        }
        set_error(work, "failed to copy character vector result");
        return;
    }

    work->result_kind = RESULT_CHARACTER_VECTOR;
}
/*
 * Replace any previously decoded result on the work item with a fresh
 * decoding of `value`.
 *
 * The old tree is freed and the result kind reset to RESULT_NONE before
 * decode_r_value() is invoked at depth 0. Its return value is deliberately
 * ignored here.
 */
static void copy_generic_result(r_work *work, SEXP value) {
    /* Drop the stale decoded tree before producing a new one. */
    free_decoded_value(work->decoded_result);
    work->result_kind = RESULT_NONE;
    work->decoded_result = NULL;

    (void)decode_r_value(work, value, 0, &work->decoded_result);
}
/*
 * Copy the R value held by a resource into the work item, using the kind
 * recorded for that resource to pick the copy routine.
 *
 * The result kind is reset to RESULT_NONE and the length to 0 first, so a
 * kind that is not handled below leaves the work item in that state. Scalar
 * kinds read element 0 of `value`; vector and generic kinds delegate to the
 * matching copy_* helper; data frames are reported as unsupported.
 */
static void copy_resource_result(r_work *work, SEXP value, resource_kind kind) {
    work->result_kind = RESULT_NONE;
    work->result_length = 0;

    switch (kind) {
    case RESOURCE_KIND_NULL:
        work->result_kind = RESULT_NULL;
        break;

    case RESOURCE_KIND_LOGICAL:
        work->result_kind = RESULT_LOGICAL;
        work->logical_result = (r.logical_elt(value, 0) == 1);
        break;

    case RESOURCE_KIND_INTEGER:
        work->result_kind = RESULT_INTEGER;
        work->integer_result = r.integer_elt(value, 0);
        break;

    case RESOURCE_KIND_DOUBLE:
        work->result_kind = RESULT_DOUBLE;
        work->double_result = r.real_elt(value, 0);
        break;

    case RESOURCE_KIND_CHARACTER: {
        SEXP first = r.string_elt(value, 0);
        if (copy_r_string(first, &work->string_result, &work->string_result_len)) {
            work->result_kind = RESULT_CHARACTER;
        } else {
            set_error(work, "failed to allocate character result");
        }
        break;
    }

    case RESOURCE_KIND_LOGICAL_VECTOR:
        copy_logical_vector_result(work, value);
        break;

    case RESOURCE_KIND_INTEGER_VECTOR:
        copy_integer_vector_result(work, value);
        break;

    case RESOURCE_KIND_DOUBLE_VECTOR:
        copy_double_vector_result(work, value);
        break;

    case RESOURCE_KIND_CHARACTER_VECTOR:
        copy_character_vector_result(work, value);
        break;

    case RESOURCE_KIND_GENERIC:
        copy_generic_result(work, value);
        break;

    case RESOURCE_KIND_DATAFRAME:
        set_unsupported_result(work, value);
        break;
    }
}
/*
 * State handed to eval_string_body() through r.toplevel_exec() and read back
 * by do_eval_string() afterwards.
 */
typedef struct eval_string_ctx {
r_work *work; /* work item whose source is evaluated and which receives the result */
bool ok; /* set from work->ok once the body has run to completion */
char error[1024]; /* failure message written by the body; empty when none was set */
} eval_string_ctx;
/*
 * Body run under r.toplevel_exec() for do_eval_string().
 *
 * Parses work->source, evaluates every parsed expression in the global
 * environment and copies the value of the last one into the work item with
 * copy_scalar_result(). On a parse or evaluation failure a message is left in
 * ctx->error and ctx->ok is not touched; otherwise ctx->ok mirrors work->ok
 * and, when that is false, ctx->error receives a copy of work->error.
 */
static void eval_string_body(void *data) {
    eval_string_ctx *ctx = data;
    r_work *work = ctx->work;
    ParseStatus status = PARSE_NULL;
    int held = 0;

    SEXP text = r.protect(r.mk_string(work->source));
    held += 1;
    SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));
    held += 1;

    if (status != PARSE_OK) {
        snprintf(ctx->error, sizeof(ctx->error), "R parse failed");
        r.unprotect(held);
        return;
    }

    /* With no expressions at all the result stays R's NULL. */
    SEXP last_value = *r.nil_value;
    const R_xlen_t total = r.xlength_fn(parsed);
    R_xlen_t idx = 0;
    while (idx < total) {
        int failed = 0;
        last_value = r.try_eval(r.vector_elt(parsed, idx), *r.global_env, &failed);
        if (failed) {
            snprintf(ctx->error, sizeof(ctx->error), "R evaluation failed");
            r.unprotect(held);
            return;
        }
        idx++;
    }

    copy_scalar_result(work, last_value);
    r.unprotect(held);

    ctx->ok = work->ok;
    if (work->ok) {
        return;
    }
    snprintf(ctx->error, sizeof(ctx->error), "%s", work->error);
}
/*
 * Evaluate work->source as R code and copy the value of the last expression
 * into the work item.
 *
 * Resets the result fields on the work item, bails out if the runtime failed
 * to initialise or is not initialised, makes sure the R API is loaded, then
 * runs eval_string_body() under r.toplevel_exec(). Every failure ends up in
 * set_error().
 */
static void do_eval_string(r_work *work) {
    /* Start from a clean slate for this request. */
    work->ok = true;
    work->error[0] = '\0';
    work->result_kind = RESULT_NONE;
    work->string_result = NULL;
    work->unsupported_type = 0;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }

    eval_string_ctx ctx = {0};
    ctx.work = work;

    if (!r.toplevel_exec(eval_string_body, &ctx)) {
        /* toplevel_exec reported that the body did not complete. */
        set_error(work, "R non-local error during eval_string");
        return;
    }
    if (ctx.ok) {
        return;
    }

    if (ctx.error[0] == '\0') {
        set_error(work, "R evaluation failed");
    } else {
        set_error(work, ctx.error);
    }
}
/*
 * State handed to eval_body() through r.toplevel_exec() and inspected by
 * do_eval() once the call returns.
 */
typedef struct eval_ctx {
r_work *work; /* work item carrying the source, input globals and outputs */
bool ok; /* set to true only when eval_body() reaches its end */
char error[1024]; /* unstructured failure message; empty when none was set */
} eval_ctx;
/*
 * Allocate a NIF resource that owns a preserved reference to `value`.
 *
 * The resource gets a fresh id from reserve_object_id(), the given kind and
 * an initialised mutex. The R object is registered with r.preserve_object()
 * and then published in the resource while holding that mutex.
 *
 * Returns the new resource, or NULL when enif_alloc_resource() fails (in
 * which case `value` is not preserved).
 */
static rx_object_resource *make_preserved_resource(SEXP value, resource_kind kind) {
    rx_object_resource *handle = enif_alloc_resource(object_resource_type, sizeof *handle);
    if (!handle) {
        return NULL;
    }

    handle->sexp = NULL;
    handle->id = reserve_object_id();
    handle->kind = kind;
    handle->release_enqueued = false;
    pthread_mutex_init(&handle->mutex, NULL);

    r.preserve_object(value);

    /* Publish the SEXP under the resource lock. */
    pthread_mutex_lock(&handle->mutex);
    handle->sexp = value;
    pthread_mutex_unlock(&handle->mutex);

    return handle;
}
/*
 * Append every element of an R character vector to `buffer`, writing a
 * newline after each element.
 *
 * R's NULL is accepted and appends nothing. Returns 1 on success and 0 when
 * `value` is not a character vector, an element reports a negative length,
 * or buffer_append() fails.
 */
static int append_character_vector(byte_buffer *buffer, SEXP value) {
    if (value == *r.nil_value) {
        return 1;
    }
    if (r.type_of(value) != STRSXP) {
        return 0;
    }

    const R_xlen_t total = r.xlength_fn(value);
    R_xlen_t idx = 0;
    while (idx < total) {
        SEXP element = r.string_elt(value, idx);
        const char *text = r.r_char(element);
        const int text_len = r.length_fn(element);

        if (text_len < 0) {
            return 0;
        }
        if (!buffer_append(buffer, text, (size_t)text_len) || !buffer_append(buffer, "\n", 1)) {
            return 0;
        }
        idx++;
    }
    return 1;
}
/*
 * Append the elements of an R character vector to `buffer`, inserting
 * `separator` between pieces.
 *
 * A non-empty separator is written before an element whenever the buffer
 * already holds data or the element is not the first one. R's NULL appends
 * nothing. Returns 1 on success and 0 when `value` is not a character vector,
 * an element reports a negative length, or buffer_append() fails.
 */
static int append_character_vector_join(byte_buffer *buffer, SEXP value, const char *separator) {
    if (value == *r.nil_value) {
        return 1;
    }
    if (r.type_of(value) != STRSXP) {
        return 0;
    }

    const size_t sep_len = strlen(separator);
    const R_xlen_t total = r.xlength_fn(value);

    for (R_xlen_t idx = 0; idx < total; idx++) {
        const int wants_separator = sep_len > 0 && (idx > 0 || buffer->len > 0);
        if (wants_separator && !buffer_append(buffer, separator, sep_len)) {
            return 0;
        }

        SEXP element = r.string_elt(value, idx);
        const char *text = r.r_char(element);
        const int text_len = r.length_fn(element);
        if (text_len < 0) {
            return 0;
        }
        if (!buffer_append(buffer, text, (size_t)text_len)) {
            return 0;
        }
    }
    return 1;
}
/*
 * Copy an R character vector into a newly allocated array of C strings.
 *
 * `*out` and `*out_count` are cleared first. R's NULL succeeds and leaves
 * them cleared. On success `*out` receives the array and `*out_count` its
 * element count. Returns 0, with the outputs still cleared, when `value`
 * is not a character vector, has more than UINT_MAX elements, or an
 * allocation or string copy fails (already copied strings are released with
 * free_string_array()).
 */
static int copy_character_vector_to_array(SEXP value, char ***out, unsigned *out_count) {
    *out = NULL;
    *out_count = 0;

    if (value == *r.nil_value) {
        return 1;
    }
    if (r.type_of(value) != STRSXP) {
        return 0;
    }

    const R_xlen_t total = r.xlength_fn(value);
    if (total < 0 || (uint64_t)total > UINT_MAX) {
        return 0;
    }
    const unsigned slots = (unsigned)total;

    char **copies = calloc(slots, sizeof(char *));
    /* A NULL return is only a failure when elements were requested. */
    if (total > 0 && copies == NULL) {
        return 0;
    }

    for (R_xlen_t idx = 0; idx < total; idx++) {
        size_t unused_len = 0;
        if (copy_r_string(r.string_elt(value, idx), &copies[idx], &unused_len)) {
            continue;
        }
        free_string_array(copies, slots);
        return 0;
    }

    *out = copies;
    *out_count = slots;
    return 1;
}
/* Forward declaration: the definition follows fill_print_error_from_helper_result(), which calls it. */
static int copy_optional_character(SEXP value, char *out, size_t out_size);
/*
 * Populate the structured error fields of `work` from the list returned by
 * the R eval helper when printing failed.
 *
 * The helper result is a list of at least 8 elements; this function reads
 * element 2 (condition message), 3 (condition classes), 4 (deparsed call),
 * 5 (captured stdout lines, joined with "\n"), 6 (messages) and
 * 7 (warnings), the last two joined without a separator. An empty message is
 * replaced by "R print failed".
 *
 * Returns 1 on success with work->structured_error set. Returns 0 on any
 * failure with work->structured_error cleared, so that the caller's
 * unstructured fallback message is the one that gets reported (do_print only
 * uses that fallback when structured_error is false). This mirrors
 * fill_plot_error_from_helper_result().
 */
static int fill_print_error_from_helper_result(r_work *work, SEXP helper_result) {
    if (r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) return 0;

    work->structured_error = true;

    if (!copy_optional_character(r.vector_elt(helper_result, 2), work->error_message, sizeof(work->error_message))) {
        goto failed;
    }
    if (work->error_message[0] == '\0') {
        snprintf(work->error_message, sizeof(work->error_message), "R print failed");
    }
    if (!copy_character_vector_to_array(
            r.vector_elt(helper_result, 3),
            &work->error_classes,
            &work->error_class_count)) {
        goto failed;
    }
    if (!copy_optional_character(r.vector_elt(helper_result, 4), work->error_call, sizeof(work->error_call))) {
        goto failed;
    }
    if (!append_character_vector_join(&work->stdout_buffer, r.vector_elt(helper_result, 5), "\n")) goto failed;
    if (!append_character_vector_join(&work->messages_buffer, r.vector_elt(helper_result, 6), "")) goto failed;
    if (!append_character_vector_join(&work->warnings_buffer, r.vector_elt(helper_result, 7), "")) goto failed;
    return 1;

failed:
    /* A half-filled structured error must not mask the caller's fallback. */
    work->structured_error = false;
    return 0;
}
/*
 * Copy the first element of an optional R character vector into a fixed-size
 * C buffer, truncating to fit and always NUL-terminating.
 *
 * value:    R's NULL (treated as "absent", yields an empty string) or a
 *           character vector with at least one element.
 * out:      destination buffer.
 * out_size: capacity of `out` in bytes, including the terminator.
 *
 * Returns 1 on success. Returns 0 when out_size is 0, when `value` is neither
 * NULL nor a non-empty character vector, or when copy_r_string() fails; in
 * the latter two cases `out` holds an empty string.
 *
 * NOTE(review): truncation is byte-based, so a multi-byte character may be
 * cut at the end of the buffer — confirm callers tolerate that.
 */
static int copy_optional_character(SEXP value, char *out, size_t out_size) {
    if (out_size == 0) return 0;
    out[0] = '\0';
    if (value == *r.nil_value) return 1;
    if (r.type_of(value) != STRSXP || r.xlength_fn(value) < 1) return 0;

    /* Temporary heap copy produced by copy_r_string(); freed before returning. */
    char *owned = NULL;
    size_t len = 0;
    if (!copy_r_string(r.string_elt(value, 0), &owned, &len)) return 0;

    /* Keep one byte for the terminator. */
    size_t copy_len = len < out_size - 1 ? len : out_size - 1;
    if (copy_len > 0) memcpy(out, owned, copy_len);
    out[copy_len] = '\0';
    free(owned);
    return 1;
}
/*
 * Record a structured "parse error" on the work item: message
 * "parse error: R parse failed", an empty call and the single class
 * "parseError".
 *
 * Returns 1 on success. Returns 0 when an allocation fails; in that case the
 * work item is left untouched and, in particular, work->structured_error is
 * not set, so the caller's unstructured fallback message is the one that is
 * reported (do_eval only uses that fallback when structured_error is false).
 */
static int set_parse_error_map(r_work *work) {
    /* Build the class list first so a failure leaves `work` unmodified. */
    char **classes = calloc(1, sizeof(char *));
    if (classes == NULL) return 0;
    classes[0] = strdup("parseError");
    if (classes[0] == NULL) {
        free(classes);
        return 0;
    }

    snprintf(work->error_message, sizeof(work->error_message), "parse error: R parse failed");
    work->error_call[0] = '\0';
    work->error_classes = classes;
    work->error_class_count = 1;
    work->structured_error = true;
    return 1;
}
/*
 * Populate the structured error fields of `work` from the list returned by
 * the R eval helper when evaluation failed.
 *
 * The helper result is a list of at least 8 elements; this function reads
 * element 2 (condition message), 3 (condition classes), 4 (deparsed call)
 * and appends elements 5, 6 and 7 (stdout, messages, warnings) to the
 * matching buffers, one line per vector element. An empty message is
 * replaced by "R evaluation failed".
 *
 * Returns 1 on success with work->structured_error set. Returns 0 on any
 * failure with work->structured_error cleared, so that the caller's
 * unstructured fallback message is the one that gets reported (do_eval only
 * uses that fallback when structured_error is false). This mirrors
 * fill_plot_error_from_helper_result().
 */
static int fill_error_from_helper_result(r_work *work, SEXP helper_result) {
    if (r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) return 0;

    work->structured_error = true;

    if (!copy_optional_character(r.vector_elt(helper_result, 2), work->error_message, sizeof(work->error_message))) {
        goto failed;
    }
    if (work->error_message[0] == '\0') {
        snprintf(work->error_message, sizeof(work->error_message), "R evaluation failed");
    }
    if (!copy_character_vector_to_array(
            r.vector_elt(helper_result, 3),
            &work->error_classes,
            &work->error_class_count)) {
        goto failed;
    }
    if (!copy_optional_character(r.vector_elt(helper_result, 4), work->error_call, sizeof(work->error_call))) {
        goto failed;
    }
    if (!append_character_vector(&work->stdout_buffer, r.vector_elt(helper_result, 5))) goto failed;
    if (!append_character_vector(&work->messages_buffer, r.vector_elt(helper_result, 6))) goto failed;
    if (!append_character_vector(&work->warnings_buffer, r.vector_elt(helper_result, 7))) goto failed;
    return 1;

failed:
    /* A half-filled structured error must not mask the caller's fallback. */
    work->structured_error = false;
    return 0;
}
/*
 * Populate the structured error fields of `work` from the list returned by
 * the R plot helper when plotting failed.
 *
 * Reads element 2 (message), 3 (classes) and 4 (call) of the helper result
 * and appends elements 5, 6 and 7 to the stdout, messages and warnings
 * buffers (stdout joined with "\n", the others with no separator). An empty
 * message is replaced by "R plot failed".
 *
 * Returns 1 with work->structured_error set on success. When the helper
 * result has the wrong shape 0 is returned without touching the flag; any
 * later failure clears the flag again before returning 0.
 */
static int fill_plot_error_from_helper_result(r_work *work, SEXP helper_result) {
    if (r.type_of(helper_result) != VECSXP) {
        return 0;
    }
    if (r.xlength_fn(helper_result) < 8) {
        return 0;
    }

    work->structured_error = true;

    if (!copy_optional_character(r.vector_elt(helper_result, 2), work->error_message, sizeof(work->error_message))) {
        goto rollback;
    }
    if (work->error_message[0] == '\0') {
        snprintf(work->error_message, sizeof(work->error_message), "R plot failed");
    }

    if (!copy_character_vector_to_array(
            r.vector_elt(helper_result, 3),
            &work->error_classes,
            &work->error_class_count)) {
        goto rollback;
    }
    if (!copy_optional_character(r.vector_elt(helper_result, 4), work->error_call, sizeof(work->error_call))) {
        goto rollback;
    }

    if (!append_character_vector_join(&work->stdout_buffer, r.vector_elt(helper_result, 5), "\n")) {
        goto rollback;
    }
    if (!append_character_vector_join(&work->messages_buffer, r.vector_elt(helper_result, 6), "")) {
        goto rollback;
    }
    if (!append_character_vector_join(&work->warnings_buffer, r.vector_elt(helper_result, 7), "")) {
        goto rollback;
    }
    return 1;

rollback:
    /* Shared failure exit: undo the structured-error marker. */
    work->structured_error = false;
    return 0;
}
/*
 * R source of the evaluation helper closure, parsed and evaluated once by
 * ensure_eval_helper().
 *
 * The closure takes (exprs, env), evaluates each expression in `env` while
 * standard output is sunk into a text connection, and collects message and
 * warning texts through calling handlers that muffle them. It always returns
 * an 8-element list:
 *   [[1]] TRUE on success, FALSE on error
 *   [[2]] value of the last expression (NULL on error)
 *   [[3]] condition message (NULL on success)
 *   [[4]] condition classes (NULL on success)
 *   [[5]] deparsed condition call, or NULL
 *   [[6]] captured stdout lines
 *   [[7]] captured messages
 *   [[8]] captured warnings
 */
static const char *eval_helper_source =
"function(exprs, env) {\n"
" messages <- character()\n"
" warnings <- character()\n"
" stdout <- character()\n"
" value <- NULL\n"
" con <- textConnection(\"stdout\", \"w\", local = TRUE)\n"
" sinking <- FALSE\n"
" closed <- FALSE\n"
" finish <- function() {\n"
" if (sinking) {\n"
" sink(type = \"output\")\n"
" sinking <<- FALSE\n"
" }\n"
" if (!closed) {\n"
" close(con)\n"
" closed <<- TRUE\n"
" }\n"
" }\n"
" sink(con, type = \"output\")\n"
" sinking <- TRUE\n"
" tryCatch(\n"
" {\n"
" result <- withCallingHandlers({\n"
" for (expr in exprs) value <- eval(expr, envir = env)\n"
" list(TRUE, value, NULL, NULL, NULL, stdout, messages, warnings)\n"
" },\n"
" message = function(m) {\n"
" messages <<- c(messages, conditionMessage(m))\n"
" invokeRestart(\"muffleMessage\")\n"
" },\n"
" warning = function(w) {\n"
" warnings <<- c(warnings, conditionMessage(w))\n"
" invokeRestart(\"muffleWarning\")\n"
" })\n"
" finish()\n"
" result[[6]] <- stdout\n"
" result[[7]] <- messages\n"
" result[[8]] <- warnings\n"
" result\n"
" },\n"
" error = function(e) {\n"
" finish()\n"
" call <- conditionCall(e)\n"
" call_text <- if (is.null(call)) NULL else paste(deparse(call), collapse = \"\\n\")\n"
" list(FALSE, NULL, conditionMessage(e), class(e), call_text, stdout, messages, warnings)\n"
" }\n"
" )\n"
"}";
/*
 * Make sure the global `eval_helper` closure exists.
 *
 * When it is still NULL, eval_helper_source is parsed, its first expression
 * is evaluated in the global environment, and the resulting value is
 * preserved and cached in `eval_helper`. Returns 1 when the helper is
 * available and 0 after recording a parse or initialisation error on `work`.
 */
static int ensure_eval_helper(r_work *work) {
    enum { HELD = 2 }; /* source string + parsed expression vector */

    if (eval_helper != NULL) {
        return 1;
    }

    ParseStatus status = PARSE_NULL;
    SEXP text = r.protect(r.mk_string(eval_helper_source));
    SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));

    const bool parsed_ok = status == PARSE_OK && r.xlength_fn(parsed) >= 1;
    if (!parsed_ok) {
        r.unprotect(HELD);
        set_error(work, "failed to parse native eval helper");
        return 0;
    }

    int failed = 0;
    SEXP closure = r.try_eval(r.vector_elt(parsed, 0), *r.global_env, &failed);
    if (failed) {
        r.unprotect(HELD);
        set_error(work, "failed to initialize native eval helper");
        return 0;
    }

    /* Keep the closure alive beyond this call before caching it. */
    r.preserve_object(closure);
    eval_helper = closure;
    r.unprotect(HELD);
    return 1;
}
/*
 * R source of the plot helper closure, parsed and evaluated once by
 * ensure_plot_helper().
 *
 * The closure takes (exprs, env, options). It attaches the standard graphics
 * packages if needed, opens a PNG device writing "page-%06d.png" files into a
 * private temporary directory, and evaluates each expression in `env`,
 * printing visible ggplot values. While the expressions run, options(device)
 * is replaced with a sentinel, and changing it is reported as an error.
 * Afterwards it closes the devices opened during the call, restores the
 * previous options, and reads the pages back, enforcing max_pages, max_bytes
 * and the PNG signature.
 *
 * It returns an 8-element list:
 *   [[1]] TRUE on success, FALSE on error
 *   [[2]] on success list(width, height, pages) with raw page vectors, else NULL
 *   [[3]] condition message, [[4]] condition classes, [[5]] deparsed call
 *         (all NULL on success)
 *   [[6]] stdout joined with "\n", [[7]] messages and [[8]] warnings, each
 *         pasted into a single string
 */
static const char *plot_helper_source =
"function(exprs, env, options) {\n"
" load_packages <- function() {\n"
" for (pkg in c(\"grDevices\", \"graphics\", \"stats\", \"utils\", \"datasets\")) {\n"
" if (!paste0(\"package:\", pkg) %in% search()) {\n"
" suppressPackageStartupMessages(library(pkg, character.only = TRUE, pos = length(search())))\n"
" }\n"
" }\n"
" }\n"
" is_ggplot <- function(value) {\n"
" inherits(value, \"ggplot\") || inherits(value, \"ggplot2::ggplot\")\n"
" }\n"
" read_pages <- function(tmpdir, max_pages, max_bytes) {\n"
" files <- sort(list.files(tmpdir, pattern = \"^page-[0-9]+[.]png$\", full.names = TRUE))\n"
" if (length(files) > max_pages) stop(\"plot produced too many pages\", call. = FALSE)\n"
" if (length(files) == 0L) return(list())\n"
" info <- file.info(files)\n"
" if (any(is.na(info$size) | info$size <= 0L)) {\n"
" stop(\"plot device produced an empty PNG file\", call. = FALSE)\n"
" }\n"
" if (sum(info$size) > max_bytes) stop(\"plot PNG output exceeds byte limit\", call. = FALSE)\n"
" sig <- as.raw(c(0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a))\n"
" lapply(seq_along(files), function(i) {\n"
" bytes <- readBin(files[[i]], what = \"raw\", n = info$size[[i]])\n"
" if (length(bytes) < 8L || !identical(bytes[1:8], sig)) {\n"
" stop(\"plot device did not produce a valid PNG file\", call. = FALSE)\n"
" }\n"
" bytes\n"
" })\n"
" }\n"
" messages <- character()\n"
" warnings <- character()\n"
" plot_error <- NULL\n"
" close_error <- NULL\n"
" read_error <- NULL\n"
" opts <- NULL\n"
" pages <- list()\n"
" plot_dev <- NULL\n"
" old_devices <- integer()\n"
" old_dev <- 1L\n"
" old_options <- NULL\n"
" devices_snapshot <- FALSE\n"
" device_sentinel <- function(...) {\n"
" stop(\"Rx plot capture device is not available\", call. = FALSE)\n"
" }\n"
" close_plot_device <- function() {\n"
" if (!devices_snapshot) return()\n"
" open_devices <- grDevices::dev.list()\n"
" if (!is.null(open_devices)) {\n"
" opened_devices <- setdiff(unname(as.integer(open_devices)), old_devices)\n"
" for (device in rev(sort(opened_devices))) {\n"
" tryCatch(\n"
" invisible(grDevices::dev.off(which = device)),\n"
" error = function(e) { if (is.null(close_error)) close_error <<- e }\n"
" )\n"
" }\n"
" }\n"
" open_devices <- grDevices::dev.list()\n"
" if (!is.null(plot_dev) && !is.null(open_devices) &&\n"
" plot_dev %in% unname(as.integer(open_devices))) {\n"
" tryCatch(\n"
" invisible(grDevices::dev.off(which = plot_dev)),\n"
" error = function(e) { if (is.null(close_error)) close_error <<- e }\n"
" )\n"
" }\n"
" open_devices <- grDevices::dev.list()\n"
" if (!is.null(open_devices) && old_dev != 1L &&\n"
" old_dev %in% unname(as.integer(open_devices))) {\n"
" try(invisible(grDevices::dev.set(which = old_dev)), silent = TRUE)\n"
" }\n"
" }\n"
" tmpdir <- tempfile(\"rx-plot-\")\n"
" dir.create(tmpdir, mode = \"0700\")\n"
" on.exit(unlink(tmpdir, recursive = TRUE, force = TRUE), add = TRUE)\n"
" stdout <- utils::capture.output({\n"
" invisible(withCallingHandlers(\n"
" tryCatch(\n"
" {\n"
" load_packages()\n"
" opts <- list(\n"
" width = as.integer(options$width),\n"
" height = as.integer(options$height),\n"
" res = as.integer(options$res),\n"
" pointsize = as.integer(options$pointsize),\n"
" max_pages = as.integer(options$max_pages),\n"
" max_bytes = as.numeric(options$max_bytes)\n"
" )\n"
" old_devices <- grDevices::dev.list()\n"
" old_devices <- if (is.null(old_devices)) integer() else unname(as.integer(old_devices))\n"
" old_dev <- unname(as.integer(grDevices::dev.cur()))\n"
" devices_snapshot <- TRUE\n"
" old_options <- base::options(device = device_sentinel)\n"
" file_pattern <- file.path(tmpdir, \"page-%06d.png\")\n"
" png_args <- list(\n"
" filename = file_pattern,\n"
" width = opts$width,\n"
" height = opts$height,\n"
" units = \"px\",\n"
" pointsize = opts$pointsize,\n"
" bg = \"white\",\n"
" res = opts$res\n"
" )\n"
" if (isTRUE(capabilities(\"cairo\"))) png_args$type <- \"cairo\"\n"
" do.call(grDevices::png, png_args)\n"
" plot_dev <- unname(as.integer(grDevices::dev.cur()))\n"
" for (expr in exprs) {\n"
" evaluated <- withVisible(eval(expr, envir = env))\n"
" if (isTRUE(evaluated$visible) && is_ggplot(evaluated$value)) {\n"
" print(evaluated$value)\n"
" }\n"
" }\n"
" if (!identical(base::getOption(\"device\"), device_sentinel)) {\n"
" stop(\"Rx plot capture does not allow changing options(device=...)\", call. = FALSE)\n"
" }\n"
" NULL\n"
" },\n"
" error = function(e) { plot_error <<- e; NULL },\n"
" finally = {\n"
" close_plot_device()\n"
" if (!is.null(old_options)) base::options(old_options)\n"
" }\n"
" ),\n"
" message = function(m) {\n"
" messages <<- c(messages, conditionMessage(m))\n"
" invokeRestart(\"muffleMessage\")\n"
" },\n"
" warning = function(w) {\n"
" warnings <<- c(warnings, conditionMessage(w))\n"
" invokeRestart(\"muffleWarning\")\n"
" }\n"
" ))\n"
" })\n"
" output_stdout <- paste(stdout, collapse = \"\\n\")\n"
" output_messages <- paste(messages, collapse = \"\")\n"
" output_warnings <- paste(warnings, collapse = \"\")\n"
" error_result <- function(e) {\n"
" call <- conditionCall(e)\n"
" call_text <- if (is.null(call)) NULL else paste(deparse(call), collapse = \"\\n\")\n"
" list(FALSE, NULL, conditionMessage(e), class(e), call_text,\n"
" output_stdout, output_messages, output_warnings)\n"
" }\n"
" if (!is.null(plot_error)) return(error_result(plot_error))\n"
" if (!is.null(close_error)) return(error_result(close_error))\n"
" pages <- tryCatch(\n"
" read_pages(tmpdir, opts$max_pages, opts$max_bytes),\n"
" error = function(e) { read_error <<- e; NULL }\n"
" )\n"
" if (!is.null(read_error)) return(error_result(read_error))\n"
" if (length(pages) == 0L) return(error_result(simpleError(\"R code produced no plot\")))\n"
" payload <- list(width = opts$width, height = opts$height, pages = pages)\n"
" list(TRUE, payload, NULL, NULL, NULL, output_stdout, output_messages, output_warnings)\n"
"}";
/*
 * Make sure the global `plot_helper` closure exists.
 *
 * When it is still NULL, plot_helper_source is parsed, its first expression
 * is evaluated in the global environment, and the resulting value is
 * preserved and cached in `plot_helper`. Returns 1 when the helper is
 * available and 0 after recording a parse or initialisation error on `work`.
 */
static int ensure_plot_helper(r_work *work) {
    if (plot_helper != NULL) {
        return 1;
    }

    int held = 0;
    ParseStatus status = PARSE_NULL;

    SEXP text = r.protect(r.mk_string(plot_helper_source));
    ++held;
    SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));
    ++held;

    if (!(status == PARSE_OK && r.xlength_fn(parsed) >= 1)) {
        r.unprotect(held);
        set_error(work, "failed to parse native plot helper");
        return 0;
    }

    int failed = 0;
    SEXP closure = r.try_eval(r.vector_elt(parsed, 0), *r.global_env, &failed);
    if (failed != 0) {
        r.unprotect(held);
        set_error(work, "failed to initialize native plot helper");
        return 0;
    }

    /* Preserve first, then publish through the global. */
    r.preserve_object(closure);
    plot_helper = closure;
    r.unprotect(held);
    return 1;
}
/*
 * Body run under r.toplevel_exec() for do_eval().
 *
 * Steps, in order:
 *   1. create a fresh environment and bind every input global resource in it;
 *   2. parse work->source;
 *   3. call the cached eval helper with the parsed expressions and that
 *      environment;
 *   4. copy captured stdout/messages/warnings, or the structured error;
 *   5. wrap the result value and every binding of the environment in
 *      preserved resources stored on the work item.
 *
 * ctx->ok is set to true only when the end of the function is reached. Every
 * early return leaves it false, after either writing ctx->error or filling
 * the structured error fields of the work item.
 */
static void eval_body(void *data) {
eval_ctx *ctx = (eval_ctx *)data;
r_work *work = ctx->work;
/* Number of values currently on R's protect stack on behalf of this call. */
int protect_count = 0;
ParseStatus parse_status = PARSE_NULL;
/* Evaluation environment created from the global environment via r.new_env. */
SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
protect_count++;
for (unsigned i = 0; i < work->global_count; i++) {
/* Read the resource's SEXP and release flag under its mutex. */
pthread_mutex_lock(&work->global_resources[i]->mutex);
SEXP value = work->global_resources[i]->sexp;
bool release_enqueued = work->global_resources[i]->release_enqueued;
pthread_mutex_unlock(&work->global_resources[i]->mutex);
if (value == NULL || release_enqueued) {
snprintf(ctx->error, sizeof(ctx->error), "native R object resource has already been released");
r.unprotect(protect_count);
return;
}
r.define_var(r.install(work->global_names[i]), value, env);
}
SEXP source = r.protect(r.mk_string(work->source));
protect_count++;
SEXP exprs = r.protect(r.parse_vector(source, -1, &parse_status, *r.nil_value));
protect_count++;
/* PARSE_NULL is accepted here in addition to PARSE_OK. */
if (parse_status != PARSE_OK && parse_status != PARSE_NULL) {
/* Prefer a structured parse error; fall back to a plain message. */
if (!set_parse_error_map(work)) {
snprintf(ctx->error, sizeof(ctx->error), "R parse failed");
}
r.unprotect(protect_count);
return;
}
R_xlen_t expr_count = r.xlength_fn(exprs);
/* Build and evaluate the call eval_helper(exprs, env). */
SEXP call = r.protect(r.lang3(eval_helper, exprs, env));
protect_count++;
int error_occurred = 0;
SEXP helper_result = r.protect(r.try_eval(call, *r.global_env, &error_occurred));
protect_count++;
/* The helper is expected to return a list of at least 8 elements. */
if (error_occurred || r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) {
snprintf(ctx->error, sizeof(ctx->error), "R evaluation failed");
r.unprotect(protect_count);
return;
}
/* Element 0 is the helper's success flag. */
SEXP ok_value = r.vector_elt(helper_result, 0);
bool helper_ok =
r.type_of(ok_value) == LGLSXP && r.xlength_fn(ok_value) >= 1 && r.logical_elt(ok_value, 0) == 1;
if (helper_ok) {
/* Elements 5, 6, 7: captured stdout, messages and warnings. */
if (!append_character_vector(&work->stdout_buffer, r.vector_elt(helper_result, 5)) ||
!append_character_vector(&work->messages_buffer, r.vector_elt(helper_result, 6)) ||
!append_character_vector(&work->warnings_buffer, r.vector_elt(helper_result, 7))) {
snprintf(ctx->error, sizeof(ctx->error), "failed to capture native eval output");
r.unprotect(protect_count);
return;
}
} else {
/* R-level error: report it in structured form when possible. */
if (!fill_error_from_helper_result(work, helper_result)) {
snprintf(ctx->error, sizeof(ctx->error), "R evaluation failed");
}
r.unprotect(protect_count);
return;
}
/* A result resource is only produced when the source had expressions. */
if (expr_count > 0) {
SEXP result = r.vector_elt(helper_result, 1);
work->result_resource = make_preserved_resource(result, RESOURCE_KIND_GENERIC);
if (work->result_resource == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native eval result resource");
r.unprotect(protect_count);
return;
}
}
/* Names of the bindings in the evaluation environment. */
SEXP names = r.protect(r.ls_internal(env, TRUE));
protect_count++;
R_xlen_t name_count = r.xlength_fn(names);
if (name_count < 0 || (uint64_t)name_count > UINT_MAX) {
snprintf(ctx->error, sizeof(ctx->error), "native eval produced too many globals");
r.unprotect(protect_count);
return;
}
work->result_global_count = (unsigned)name_count;
if (work->result_global_count > 0) {
/* Parallel, zero-initialised arrays: one name and one resource per binding. */
work->result_global_names = calloc(work->result_global_count, sizeof(char *));
work->result_global_resources = calloc(work->result_global_count, sizeof(rx_object_resource *));
if (work->result_global_names == NULL || work->result_global_resources == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native eval globals");
r.unprotect(protect_count);
return;
}
}
/* NOTE(review): early returns below leave already created resources on the
 * work item; presumably the work item's cleanup releases them — confirm. */
for (unsigned i = 0; i < work->result_global_count; i++) {
SEXP name_string = r.string_elt(names, (R_xlen_t)i);
size_t name_len = 0;
if (!copy_r_string(name_string, &work->result_global_names[i], &name_len)) {
snprintf(ctx->error, sizeof(ctx->error), "failed to copy native eval global name");
r.unprotect(protect_count);
return;
}
/* Look the binding up by the copied name and wrap it in a resource. */
SEXP value = r.find_var_in_frame(env, r.install(work->result_global_names[i]));
work->result_global_resources[i] = make_preserved_resource(value, RESOURCE_KIND_GENERIC);
if (work->result_global_resources[i] == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native eval global resource");
r.unprotect(protect_count);
return;
}
}
r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Evaluate work->source with captured output and return the results as
 * resources on the work item.
 *
 * Resets the result fields, checks that the runtime and the eval helper are
 * ready, temporarily points R's console write hooks (when available) at the
 * capture callbacks, and runs eval_body() under r.toplevel_exec(). The hooks
 * and current_capture_work are restored on every path after the hooks were
 * installed. A failure is reported either as a structured error (already
 * filled in by the body, in which case only work->ok is cleared) or through
 * set_error().
 */
static void do_eval(r_work *work) {
    work->ok = true;
    work->error[0] = '\0';
    work->result_resource = NULL;
    work->result_global_names = NULL;
    work->result_global_resources = NULL;
    work->result_global_count = 0;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }
    if (!ensure_eval_helper(work)) {
        return;
    }

    eval_ctx ctx = {.work = work, .ok = false, .error = ""};

    /* Remember the current console hooks and install the capturing ones. */
    ptr_R_WriteConsole_fn saved_plain = NULL;
    ptr_R_WriteConsoleEx_fn saved_extended = NULL;
    if (r_write_console != NULL) {
        saved_plain = *r_write_console;
        current_capture_work = work;
        *r_write_console = capture_write_console;
    }
    if (r_write_console_ex != NULL) {
        saved_extended = *r_write_console_ex;
        current_capture_work = work;
        *r_write_console_ex = capture_write_console_ex;
    }

    const bool completed = r.toplevel_exec(eval_body, &ctx) ? true : false;
    if (!completed) {
        set_error(work, "R non-local error during eval");
    }

    /* Single restore point shared by the success and failure paths. */
    if (r_write_console != NULL) {
        *r_write_console = saved_plain;
    }
    if (r_write_console_ex != NULL) {
        *r_write_console_ex = saved_extended;
    }
    current_capture_work = NULL;

    if (!completed || ctx.ok) {
        return;
    }
    if (work->structured_error) {
        work->ok = false;
        return;
    }
    set_error(work, ctx.error[0] == '\0' ? "R evaluation failed" : ctx.error);
}
/*
 * State handed to print_body() through r.toplevel_exec() and inspected by
 * do_print() once the call returns.
 */
typedef struct print_ctx {
r_work *work; /* work item carrying print options and output buffers */
SEXP sexp; /* R object to print; bound as .rx_value by print_body() */
bool ok; /* set to true only when print_body() reaches its end */
char error[1024]; /* unstructured failure message; empty when none was set */
} print_ctx;
/*
 * R source evaluated by print_body() to print `.rx_value`.
 *
 * When `.rx_width` or `.rx_max_print` is not NULL the corresponding R option
 * ("width", "max.print") is overridden for the duration of the print() call.
 * The previous values are saved in `.rx_old_options` and restored in the
 * tryCatch `finally` clause.
 */
static const char *print_eval_source =
".rx_old_options <- list()\n"
"tryCatch({\n"
" if (!is.null(.rx_width)) {\n"
" .rx_old_options$width <- getOption(\"width\")\n"
" options(width = as.integer(.rx_width))\n"
" }\n"
" if (!is.null(.rx_max_print)) {\n"
" .rx_old_options$max.print <- getOption(\"max.print\")\n"
" options(max.print = as.integer(.rx_max_print))\n"
" }\n"
" print(.rx_value)\n"
"}, finally = {\n"
" if (length(.rx_old_options) > 0L) options(.rx_old_options)\n"
"})";
/*
 * Body run under r.toplevel_exec() for do_print().
 *
 * Binds the object and the optional width / max.print settings in a fresh
 * environment, parses print_eval_source, and runs it through the cached eval
 * helper so that printed output, messages and warnings are captured into the
 * work item's buffers.
 *
 * ctx->ok is set to true only when the end of the function is reached. Every
 * early return leaves it false, after either writing ctx->error or filling
 * the structured error fields of the work item.
 */
static void print_body(void *data) {
print_ctx *ctx = (print_ctx *)data;
r_work *work = ctx->work;
/* Number of values currently on R's protect stack on behalf of this call. */
int protect_count = 0;
ParseStatus parse_status = PARSE_NULL;
SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
protect_count++;
r.define_var(r.install(".rx_value"), ctx->sexp, env);
/* Options that were not supplied are bound to R's NULL. */
SEXP width = *r.nil_value;
if (work->has_print_width) {
width = r.protect(r.scalar_integer(work->print_width));
protect_count++;
}
r.define_var(r.install(".rx_width"), width, env);
SEXP max_print = *r.nil_value;
if (work->has_print_max_print) {
max_print = r.protect(r.scalar_integer(work->print_max_print));
protect_count++;
}
r.define_var(r.install(".rx_max_print"), max_print, env);
SEXP source = r.protect(r.mk_string(print_eval_source));
protect_count++;
SEXP exprs = r.protect(r.parse_vector(source, -1, &parse_status, *r.nil_value));
protect_count++;
if (parse_status != PARSE_OK || r.xlength_fn(exprs) < 1) {
snprintf(ctx->error, sizeof(ctx->error), "R print helper parse failed");
r.unprotect(protect_count);
return;
}
/* Build and evaluate the call eval_helper(exprs, env). */
SEXP call = r.protect(r.lang3(eval_helper, exprs, env));
protect_count++;
int error_occurred = 0;
SEXP helper_result = r.protect(r.try_eval(call, *r.global_env, &error_occurred));
protect_count++;
/* The helper is expected to return a list of at least 8 elements. */
if (error_occurred || r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) {
snprintf(ctx->error, sizeof(ctx->error), "R print failed");
r.unprotect(protect_count);
return;
}
/* Element 0 is the helper's success flag. */
SEXP ok_value = r.vector_elt(helper_result, 0);
bool helper_ok =
r.type_of(ok_value) == LGLSXP && r.xlength_fn(ok_value) >= 1 && r.logical_elt(ok_value, 0) == 1;
if (helper_ok) {
/* stdout lines are joined with "\n"; messages and warnings are concatenated. */
if (!append_character_vector_join(&work->stdout_buffer, r.vector_elt(helper_result, 5), "\n") ||
!append_character_vector_join(&work->messages_buffer, r.vector_elt(helper_result, 6), "") ||
!append_character_vector_join(&work->warnings_buffer, r.vector_elt(helper_result, 7), "")) {
snprintf(ctx->error, sizeof(ctx->error), "failed to capture native print output");
r.unprotect(protect_count);
return;
}
} else {
/* R-level error: report it in structured form when possible. */
if (!fill_print_error_from_helper_result(work, helper_result)) {
snprintf(ctx->error, sizeof(ctx->error), "R print failed");
}
r.unprotect(protect_count);
return;
}
r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Print the R object held by work->resource and capture the output.
 *
 * Resets the error fields, checks that the runtime and the eval helper are
 * ready, reads the resource's SEXP under its mutex (rejecting a resource
 * that is released or queued for release), and runs print_body() under
 * r.toplevel_exec(). A failure is reported either as a structured error
 * (already filled in by the body, in which case only work->ok is cleared) or
 * through set_error().
 */
static void do_print(r_work *work) {
    /* Clear both the plain and the structured error state. */
    work->ok = true;
    work->error[0] = '\0';
    work->structured_error = false;
    work->error_message[0] = '\0';
    work->error_call[0] = '\0';
    work->error_classes = NULL;
    work->error_class_count = 0;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }
    if (!ensure_eval_helper(work)) {
        return;
    }

    /* Snapshot the resource state while holding its lock. */
    pthread_mutex_lock(&work->resource->mutex);
    SEXP target = work->resource->sexp;
    const bool pending_release = work->resource->release_enqueued;
    pthread_mutex_unlock(&work->resource->mutex);

    if (pending_release || target == NULL) {
        set_error(work, "native R object resource has already been released");
        return;
    }

    print_ctx ctx = {.work = work, .sexp = target, .ok = false, .error = ""};
    if (!r.toplevel_exec(print_body, &ctx)) {
        set_error(work, "R non-local error during print");
        return;
    }

    if (ctx.ok) {
        return;
    }
    if (work->structured_error) {
        work->ok = false;
        return;
    }
    set_error(work, ctx.error[0] == '\0' ? "R print failed" : ctx.error);
}
/*
 * Context structure for the plot operation, laid out like eval_ctx.
 * NOTE(review): its users are outside this part of the file; presumably it is
 * passed to a plot body through r.toplevel_exec() like the other *_ctx
 * structures — confirm.
 */
typedef struct plot_ctx {
r_work *work; /* work item of the plot request */
bool ok; /* success flag */
char error[1024]; /* failure message buffer */
} plot_ctx;
/*
 * Store `name`, encoded as a UTF-8 CHARSXP, at position `index` of the
 * character vector `names`. The temporary CHARSXP is protected only for the
 * duration of the store.
 */
static void set_plot_option_name(SEXP names, R_xlen_t index, const char *name) {
    const int name_len = (int)strlen(name);
    SEXP encoded = r.protect(r.mk_char_len_ce(name, name_len, CE_UTF8));

    r.set_string_elt(names, index, encoded);
    r.unprotect(1);
}
/*
 * Store `value` as a length-one integer vector at position `index` of the
 * list `options`. The temporary scalar is protected only for the duration of
 * the store.
 */
static void set_plot_option_integer(SEXP options, R_xlen_t index, int value) {
    SEXP boxed = r.scalar_integer(value);

    boxed = r.protect(boxed);
    r.set_vector_elt(options, index, boxed);
    r.unprotect(1);
}
/*
 * Build the named options list passed to the R plot helper.
 *
 * The list has six integer entries named width, height, res, pointsize,
 * max_pages and max_bytes, taken from the matching plot_* fields of `work`.
 * Both the list and its names vector are left on R's protect stack;
 * `*protect_count` is incremented once for each so the caller can unprotect
 * them later.
 *
 * NOTE(review): every value is passed as a C int; if work->plot_max_bytes is
 * wider than int it is narrowed here — confirm the field's type.
 */
static SEXP make_plot_options(r_work *work, int *protect_count) {
    enum { OPTION_COUNT = 6 };
    static const char *const option_names[OPTION_COUNT] = {
        "width", "height", "res", "pointsize", "max_pages", "max_bytes",
    };

    SEXP options = r.protect(r.alloc_vector(VECSXP, OPTION_COUNT));
    *protect_count += 1;
    SEXP names = r.protect(r.alloc_vector(STRSXP, OPTION_COUNT));
    *protect_count += 1;

    /* Fill in all names first, then all values. */
    for (R_xlen_t slot = 0; slot < OPTION_COUNT; slot++) {
        set_plot_option_name(names, slot, option_names[slot]);
    }

    const int option_values[OPTION_COUNT] = {
        work->plot_width,
        work->plot_height,
        work->plot_res,
        work->plot_pointsize,
        work->plot_max_pages,
        work->plot_max_bytes,
    };
    for (R_xlen_t slot = 0; slot < OPTION_COUNT; slot++) {
        set_plot_option_integer(options, slot, option_values[slot]);
    }

    r.set_attrib(options, *r.names_symbol, names);
    return options;
}
/*
 * Read the first element of an R integer or double vector as a C int.
 *
 * value: R vector expected to be INTSXP or REALSXP with at least one element.
 * out:   receives the converted value on success; untouched on failure.
 *
 * Returns 1 on success. Returns 0 when `value` has another type, is empty,
 * or holds a double that is NaN (including NA_real_), infinite, or outside
 * the range of int.
 */
static int copy_plot_integer(SEXP value, int *out) {
    if (r.type_of(value) == INTSXP && r.xlength_fn(value) >= 1) {
        *out = r.integer_elt(value, 0);
        return 1;
    }
    if (r.type_of(value) == REALSXP && r.xlength_fn(value) >= 1) {
        double numeric = r.real_elt(value, 0);
        /*
         * Written as a positive range test on purpose: every comparison with
         * NaN is false, so NaN is rejected here instead of reaching the cast
         * below, where converting it to int would be undefined behaviour.
         */
        if (!(numeric >= (double)INT_MIN && numeric <= (double)INT_MAX)) return 0;
        *out = (int)numeric;
        return 1;
    }
    return 0;
}
/*
 * Report whether a byte buffer starts with the 8-byte PNG file signature.
 *
 * Returns 1 when `bytes` is non-NULL, `len` is at least 8 and the first
 * eight bytes equal the signature; returns 0 otherwise.
 */
static int is_png_bytes(const char *bytes, size_t len) {
    static const unsigned char png_magic[8] = {
        0x89, 'P', 'N', 'G', '\r', '\n', 0x1a, '\n',
    };

    if (bytes == NULL) {
        return 0;
    }
    if (len < sizeof png_magic) {
        return 0;
    }
    for (size_t k = 0; k < sizeof png_magic; k++) {
        if ((unsigned char)bytes[k] != png_magic[k]) {
            return 0;
        }
    }
    return 1;
}
/*
 * Append the output captured by the R plot helper to the work item's
 * buffers: element 5 (stdout, joined with "\n"), element 6 (messages) and
 * element 7 (warnings), the last two joined without a separator.
 *
 * Returns 1 on success and 0 when the helper result is not a list of at
 * least 8 elements or when any append fails; later buffers are not touched
 * after the first failure.
 */
static int append_plot_output(r_work *work, SEXP helper_result) {
    if (r.type_of(helper_result) != VECSXP) {
        return 0;
    }
    if (r.xlength_fn(helper_result) < 8) {
        return 0;
    }

    /* && short-circuits, so the appends stop at the first failure. */
    const int appended =
        append_character_vector_join(&work->stdout_buffer, r.vector_elt(helper_result, 5), "\n") &&
        append_character_vector_join(&work->messages_buffer, r.vector_elt(helper_result, 6), "") &&
        append_character_vector_join(&work->warnings_buffer, r.vector_elt(helper_result, 7), "");
    return appended ? 1 : 0;
}
/*
 * Copies the plot payload of a successful helper result into `work`.
 *
 * The payload is slot 1 of `helper_result`: a list holding the width, the
 * height, and a list of raw vectors with one PNG file per page. Pages already
 * stored on `work` are released before the new ones are copied, and
 * work->plot_width / work->plot_height are overwritten with the reported
 * dimensions. The page count is capped at 1000 and at work->plot_max_pages;
 * the summed page size is capped at work->plot_max_bytes.
 *
 * Returns 1 on success. Returns 0 after writing a message into `error`
 * (capacity `error_size`); once page storage has been set up, a failure also
 * releases it through free_plot_pages.
 */
static int copy_plot_success(r_work *work, SEXP helper_result, char *error, size_t error_size) {
    const char *failure = NULL;
    SEXP payload = NULL;
    SEXP pages = NULL;
    R_xlen_t n_pages = 0;
    size_t copied_bytes = 0;
    int width = 0;
    int height = 0;

    if (r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) {
        failure = "R plot helper returned an invalid result";
        goto report;
    }

    payload = r.vector_elt(helper_result, 1);
    if (r.type_of(payload) != VECSXP || r.xlength_fn(payload) < 3) {
        failure = "R plot helper returned an invalid plot payload";
        goto report;
    }

    if (!copy_plot_integer(r.vector_elt(payload, 0), &width) ||
        !copy_plot_integer(r.vector_elt(payload, 1), &height)) {
        failure = "R plot helper returned invalid plot dimensions";
        goto report;
    }

    pages = r.vector_elt(payload, 2);
    if (r.type_of(pages) != VECSXP) {
        failure = "R plot helper returned invalid plot pages";
        goto report;
    }

    n_pages = r.xlength_fn(pages);
    if (n_pages <= 0) {
        failure = "R code produced no plot";
        goto report;
    }
    /* The hard cap of 1000 is checked before the narrowing cast to int. */
    if ((uint64_t)n_pages > UINT_MAX || n_pages > 1000 || (int)n_pages > work->plot_max_pages) {
        failure = "plot produced too many pages";
        goto report;
    }

    free_plot_pages(work);
    work->plot_width = width;
    work->plot_height = height;
    work->plot_page_count = (unsigned)n_pages;
    work->plot_pages = calloc(work->plot_page_count, sizeof(char *));
    work->plot_page_lens = calloc(work->plot_page_count, sizeof(size_t));
    if (work->plot_pages == NULL || work->plot_page_lens == NULL) {
        failure = "failed to allocate native plot pages";
        goto report_and_release;
    }

    for (unsigned index = 0; index < work->plot_page_count; index++) {
        SEXP page = r.vector_elt(pages, (R_xlen_t)index);
        if (r.type_of(page) != RAWSXP) {
            failure = "R plot helper returned a non-raw page";
            goto report_and_release;
        }

        R_xlen_t raw_len = r.xlength_fn(page);
        if (raw_len <= 0) {
            failure = "plot device produced an empty PNG file";
            goto report_and_release;
        }
        if ((uint64_t)raw_len > SIZE_MAX) {
            failure = "plot PNG output exceeds byte limit";
            goto report_and_release;
        }

        size_t page_bytes = (size_t)raw_len;
        /* First clause guards the addition in the second against wrap-around. */
        if (copied_bytes > SIZE_MAX - page_bytes ||
            copied_bytes + page_bytes > (size_t)work->plot_max_bytes) {
            failure = "plot PNG output exceeds byte limit";
            goto report_and_release;
        }
        copied_bytes += page_bytes;

        char *buffer = malloc(page_bytes);
        if (buffer == NULL) {
            failure = "failed to allocate native plot page";
            goto report_and_release;
        }
        memcpy(buffer, r.raw(page), page_bytes);
        if (!is_png_bytes(buffer, page_bytes)) {
            /* Not yet stored on work, so it must be freed here. */
            free(buffer);
            failure = "plot device did not produce a valid PNG file";
            goto report_and_release;
        }

        work->plot_pages[index] = buffer;
        work->plot_page_lens[index] = page_bytes;
    }
    return 1;

report_and_release:
    snprintf(error, error_size, "%s", failure);
    free_plot_pages(work);
    return 0;

report:
    snprintf(error, error_size, "%s", failure);
    return 0;
}
/*
 * Body of a plot request; do_plot runs it through r.toplevel_exec with a
 * plot_ctx pointer as `data`.
 *
 * Binds the request's global resources into a new environment, parses
 * work->source, and evaluates a call to plot_helper with the parsed
 * expressions, that environment, and the plot options. When the helper
 * reports success, the captured output and the PNG pages are copied into
 * `work` and ctx->ok is set to true. On every failure path ctx->ok is left
 * untouched and everything counted in protect_count is unprotected before
 * returning; the reason is written to ctx->error unless set_parse_error_map
 * or fill_plot_error_from_helper_result reports success
 * (NOTE(review): those helpers are defined elsewhere; confirm what they
 * record on `work`).
 */
static void plot_body(void *data) {
plot_ctx *ctx = (plot_ctx *)data;
r_work *work = ctx->work;
int protect_count = 0;
ParseStatus parse_status = PARSE_NULL;
SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
protect_count++;
/* Bind each named global into env. The SEXP pointer and the release flag are
 * read under the resource mutex; the value is used after the lock is dropped. */
for (unsigned i = 0; i < work->global_count; i++) {
pthread_mutex_lock(&work->global_resources[i]->mutex);
SEXP value = work->global_resources[i]->sexp;
bool release_enqueued = work->global_resources[i]->release_enqueued;
pthread_mutex_unlock(&work->global_resources[i]->mutex);
if (value == NULL || release_enqueued) {
snprintf(ctx->error, sizeof(ctx->error), "native R object resource has already been released");
r.unprotect(protect_count);
return;
}
r.define_var(r.install(work->global_names[i]), value, env);
}
SEXP source = r.protect(r.mk_string(work->source));
protect_count++;
SEXP exprs = r.protect(r.parse_vector(source, -1, &parse_status, *r.nil_value));
protect_count++;
/* PARSE_NULL is accepted alongside PARSE_OK; any other status is a parse failure. */
if (parse_status != PARSE_OK && parse_status != PARSE_NULL) {
if (!set_parse_error_map(work)) {
snprintf(ctx->error, sizeof(ctx->error), "R parse failed");
}
r.unprotect(protect_count);
return;
}
/* make_plot_options receives &protect_count; presumably it adds whatever it
 * protects to the count -- confirm against its definition. */
SEXP options = make_plot_options(work, &protect_count);
SEXP call = r.protect(r.lang4(plot_helper, exprs, env, options));
protect_count++;
int error_occurred = 0;
SEXP helper_result = r.protect(r.try_eval(call, *r.global_env, &error_occurred));
protect_count++;
/* error_occurred is tested first, so helper_result is only inspected when
 * the evaluation did not report an error. */
if (error_occurred || r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) {
snprintf(ctx->error, sizeof(ctx->error), "R plot failed");
r.unprotect(protect_count);
return;
}
/* Slot 0 is the success flag: a logical vector whose first element equals 1. */
SEXP ok_value = r.vector_elt(helper_result, 0);
bool helper_ok =
r.type_of(ok_value) == LGLSXP && r.xlength_fn(ok_value) >= 1 && r.logical_elt(ok_value, 0) == 1;
if (helper_ok) {
if (!append_plot_output(work, helper_result)) {
snprintf(ctx->error, sizeof(ctx->error), "failed to capture native plot output");
r.unprotect(protect_count);
return;
}
/* copy_plot_success writes its own message into ctx->error on failure. */
if (!copy_plot_success(work, helper_result, ctx->error, sizeof(ctx->error))) {
r.unprotect(protect_count);
return;
}
} else {
if (!fill_plot_error_from_helper_result(work, helper_result)) {
snprintf(ctx->error, sizeof(ctx->error), "R plot failed");
}
r.unprotect(protect_count);
return;
}
r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Runs a plot request for `work`.
 *
 * Resets the result and error fields, checks that the embedded runtime, the R
 * API table and the plot helper are available, then runs plot_body under
 * r.toplevel_exec. While the body runs, the console write hooks (when their
 * pointers are available) are redirected to the capture callbacks and
 * current_capture_work points at `work`; the previous hooks are restored
 * afterwards on every path.
 *
 * On any failure, pages that plot_body may already have copied into `work`
 * are released. That includes the non-local-error path, so a partially
 * filled page array is neither leaked nor left visible to the caller. A
 * failure is then reported either through set_error or, when
 * work->structured_error is set, by clearing work->ok only.
 */
static void do_plot(r_work *work) {
    work->ok = true;
    work->error[0] = '\0';
    work->structured_error = false;
    work->error_message[0] = '\0';
    work->error_call[0] = '\0';
    work->error_classes = NULL;
    work->error_class_count = 0;
    work->plot_pages = NULL;
    work->plot_page_lens = NULL;
    work->plot_page_count = 0;
    if (set_terminal_init_failure_if_present(work)) return;
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) return;
    if (!ensure_plot_helper(work)) return;

    plot_ctx ctx = {.work = work, .ok = false, .error = ""};
    ptr_R_WriteConsole_fn previous_write_console = NULL;
    ptr_R_WriteConsoleEx_fn previous_write_console_ex = NULL;
    if (r_write_console != NULL) {
        previous_write_console = *r_write_console;
        current_capture_work = work;
        *r_write_console = capture_write_console;
    }
    if (r_write_console_ex != NULL) {
        previous_write_console_ex = *r_write_console_ex;
        current_capture_work = work;
        *r_write_console_ex = capture_write_console_ex;
    }

    bool non_local_exit = !r.toplevel_exec(plot_body, &ctx);

    /* Restore the console hooks once, regardless of how the body ended. */
    if (r_write_console != NULL) {
        *r_write_console = previous_write_console;
    }
    if (r_write_console_ex != NULL) {
        *r_write_console_ex = previous_write_console_ex;
    }
    current_capture_work = NULL;

    if (non_local_exit) {
        /* The body may have been abandoned after it started copying pages. */
        free_plot_pages(work);
        set_error(work, "R non-local error during plot");
        return;
    }
    if (!ctx.ok) {
        free_plot_pages(work);
        if (!work->structured_error) {
            set_error(work, ctx.error[0] == '\0' ? "R plot failed" : ctx.error);
        } else {
            work->ok = false;
        }
    }
}
/* Argument block handed to encode_resource_body through r.toplevel_exec. */
typedef struct encode_resource_ctx {
/* Request whose `encode` value is converted and whose `resource` receives the result. */
r_work *work;
/* Initialised to false by do_encode_resource; the body sets it to true after storing the value. */
bool ok;
} encode_resource_ctx;
/*
 * Body of an encode request; do_encode_resource runs it through
 * r.toplevel_exec with an encode_resource_ctx pointer as `data`.
 *
 * Builds an R object from work->encode according to its `kind`, preserves it
 * with r.preserve_object, and stores it together with the matching resource
 * kind on work->resource while holding that resource's mutex. ctx->ok is set
 * to true only after the value has been stored.
 *
 * The only failure path inside this function is a named-list entry whose
 * resource has already been released: set_error is called on `work`, the
 * protected objects are released, and the function returns with ctx->ok
 * untouched.
 */
static void encode_resource_body(void *data) {
encode_resource_ctx *ctx = (encode_resource_ctx *)data;
r_work *work = ctx->work;
encode_value *encoded = &work->encode;
int protect_count = 0;
/* Stays the nil value for ENCODE_KIND_NULL and for any kind or NA type not matched below. */
SEXP value = *r.nil_value;
switch (encoded->kind) {
case ENCODE_KIND_NULL:
value = *r.nil_value;
break;
case ENCODE_KIND_LOGICAL:
value = r.protect(r.scalar_logical(encoded->logical_scalar ? 1 : 0));
protect_count++;
break;
case ENCODE_KIND_INTEGER:
value = r.protect(r.scalar_integer(encoded->integer_scalar));
protect_count++;
break;
case ENCODE_KIND_DOUBLE:
value = r.protect(r.scalar_real(encoded->double_scalar));
protect_count++;
break;
case ENCODE_KIND_CHARACTER: {
/* The length is narrowed to int for mk_char_len_ce; the string is tagged CE_UTF8. */
SEXP chars = r.protect(r.mk_char_len_ce(
encoded->string_scalar,
(int)encoded->string_scalar_len,
CE_UTF8));
protect_count++;
value = r.protect(r.scalar_string(chars));
protect_count++;
break;
}
case ENCODE_KIND_NA:
/* A typed NA scalar; the logical NA is built from the same *r.na_int value as the integer NA. */
switch (encoded->na_type) {
case DECODE_ATOMIC_LOGICAL:
value = r.protect(r.scalar_logical(*r.na_int));
protect_count++;
break;
case DECODE_ATOMIC_INTEGER:
value = r.protect(r.scalar_integer(*r.na_int));
protect_count++;
break;
case DECODE_ATOMIC_DOUBLE:
value = r.protect(r.scalar_real(*r.na_real));
protect_count++;
break;
case DECODE_ATOMIC_CHARACTER:
value = r.protect(r.scalar_string(*r.na_string));
protect_count++;
break;
}
break;
case ENCODE_KIND_LOGICAL_VECTOR:
value = r.protect(r.alloc_vector(LGLSXP, encoded->length));
protect_count++;
for (R_xlen_t i = 0; i < encoded->length; i++) {
r.set_logical_elt(value, i, encoded->logical_values[i] ? 1 : 0);
}
break;
case ENCODE_KIND_INTEGER_VECTOR:
value = r.protect(r.alloc_vector(INTSXP, encoded->length));
protect_count++;
for (R_xlen_t i = 0; i < encoded->length; i++) {
r.set_integer_elt(value, i, encoded->integer_values[i]);
}
break;
case ENCODE_KIND_DOUBLE_VECTOR:
value = r.protect(r.alloc_vector(REALSXP, encoded->length));
protect_count++;
for (R_xlen_t i = 0; i < encoded->length; i++) {
r.set_real_elt(value, i, encoded->double_values[i]);
}
break;
case ENCODE_KIND_CHARACTER_VECTOR:
value = r.protect(r.alloc_vector(STRSXP, encoded->length));
protect_count++;
for (R_xlen_t i = 0; i < encoded->length; i++) {
/* Each element string is protected only until it has been stored in the vector. */
SEXP chars = r.protect(r.mk_char_len_ce(
encoded->string_values[i],
(int)encoded->string_value_lens[i],
CE_UTF8));
r.set_string_elt(value, i, chars);
r.unprotect(1);
}
break;
case ENCODE_KIND_NAMED_LIST: {
value = r.protect(r.alloc_vector(VECSXP, encoded->length));
protect_count++;
SEXP names = r.protect(r.alloc_vector(STRSXP, encoded->length));
protect_count++;
for (R_xlen_t i = 0; i < encoded->length; i++) {
/* Entries are existing resources; the pointer and release flag are read under the entry's mutex. */
pthread_mutex_lock(&encoded->entry_resources[i]->mutex);
SEXP entry_value = encoded->entry_resources[i]->sexp;
bool release_enqueued = encoded->entry_resources[i]->release_enqueued;
pthread_mutex_unlock(&encoded->entry_resources[i]->mutex);
if (entry_value == NULL || release_enqueued) {
set_error(work, "native R object resource has already been released");
if (protect_count > 0) r.unprotect(protect_count);
return;
}
r.set_vector_elt(value, i, entry_value);
SEXP chars = r.protect(r.mk_char_len_ce(
encoded->entry_names[i],
(int)strlen(encoded->entry_names[i]),
CE_UTF8));
r.set_string_elt(names, i, chars);
r.unprotect(1);
}
r.set_attrib(value, *r.names_symbol, names);
break;
}
}
/* Preserve first, then drop the protect-stack entries, so the value is never left unreferenced. */
r.preserve_object(value);
pthread_mutex_lock(&work->resource->mutex);
work->resource->sexp = value;
work->resource->kind = encode_kind_to_resource_kind(encoded->kind);
pthread_mutex_unlock(&work->resource->mutex);
if (protect_count > 0) r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Encodes work->encode into an R object stored on work->resource.
 *
 * Clears the error state, checks that the embedded runtime and the R API
 * table are available, then runs encode_resource_body under r.toplevel_exec.
 *
 * Failure reporting:
 *   - a non-local exit from the body is reported as
 *     "R non-local error during encode";
 *   - when the body already recorded a specific error through set_error
 *     (a released named-list entry), that message is kept;
 *   - otherwise the generic "R encode failed" is recorded.
 */
static void do_encode_resource(r_work *work) {
    work->ok = true;
    work->error[0] = '\0';
    if (set_terminal_init_failure_if_present(work)) return;
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) return;

    encode_resource_ctx ctx = {.work = work, .ok = false};
    if (!r.toplevel_exec(encode_resource_body, &ctx)) {
        set_error(work, "R non-local error during encode");
        return;
    }
    if (ctx.ok) return;

    /* Only fall back to the generic message when the body left no error of
     * its own; overwriting it would hide the actual cause from the caller. */
    if (work->ok || work->error[0] == '\0') {
        set_error(work, "R encode failed");
    }
}
/* Argument block handed to decode_resource_body through r.toplevel_exec. */
typedef struct decode_resource_ctx {
/* Request that receives the decoded result. */
r_work *work;
/* R object to decode; do_decode_resource copies it from work->resource under the resource mutex. */
SEXP sexp;
/* Resource kind read together with `sexp`. */
resource_kind kind;
/* Initialised to false; the body sets it to work->ok after the copy. */
bool ok;
/* Copy of work->error made by the body when the copy failed; empty otherwise. */
char error[1024];
} decode_resource_ctx;
static void decode_resource_body(void *data) {
decode_resource_ctx *ctx = (decode_resource_ctx *)data;
r_work *work = ctx->work;
copy_resource_result(work, ctx->sexp, ctx->kind);
ctx->ok = work->ok;
if (!work->ok) {
snprintf(ctx->error, sizeof(ctx->error), "%s", work->error);
}
}
/*
 * Decodes the R object held by work->resource into the result fields of
 * `work`.
 *
 * Resets the result and error state, checks that the embedded runtime and the
 * R API table are available, snapshots the resource's SEXP, kind and release
 * flag under its mutex, and then runs decode_resource_body under
 * r.toplevel_exec. A released resource, a non-local exit, or a failed body
 * are each reported through set_error.
 */
static void do_decode_resource(r_work *work) {
    work->ok = true;
    work->error[0] = '\0';
    work->result_kind = RESULT_NONE;
    work->string_result = NULL;
    work->unsupported_type = 0;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }

    decode_resource_ctx request = {.work = work, .ok = false, .error = ""};
    bool already_released = false;

    pthread_mutex_lock(&work->resource->mutex);
    request.sexp = work->resource->sexp;
    request.kind = work->resource->kind;
    already_released = work->resource->release_enqueued;
    pthread_mutex_unlock(&work->resource->mutex);

    if (request.sexp == NULL || already_released) {
        set_error(work, "native R object resource has already been released");
        return;
    }

    if (!r.toplevel_exec(decode_resource_body, &request)) {
        set_error(work, "R non-local error during decode");
    } else if (!request.ok) {
        const char *message = request.error[0] != '\0' ? request.error : "R decode failed";
        set_error(work, message);
    }
}
/*
 * Parses `source_code` and evaluates each resulting expression in `env`, in
 * order, through r.try_eval.
 *
 * On success returns 1 and stores the value of the last expression in *out
 * (the nil value when there were no expressions). On a parse failure or when
 * any evaluation reports an error, returns 0 with a fixed message in `error`
 * and leaves *out untouched.
 *
 * The source string and parsed expressions are unprotected before returning
 * and the result itself is never protected here, so the value in *out is not
 * on the protect stack when the caller receives it.
 */
static int eval_source_in_env(SEXP env, const char *source_code, SEXP *out, char *error, size_t error_size) {
    ParseStatus status = PARSE_NULL;
    int held = 0;
    int succeeded = 0;

    NDBG("eval_source begin: %.96s", source_code);

    SEXP text = r.protect(r.mk_string(source_code));
    held += 1;
    SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));
    held += 1;

    if (status == PARSE_OK || status == PARSE_NULL) {
        SEXP last = *r.nil_value;
        const R_xlen_t total = r.xlength_fn(parsed);
        R_xlen_t index = 0;
        int failed = 0;

        while (index < total) {
            int error_occurred = 0;
            NDBG("eval_source before try_eval expr=%lld error=%d", (long long)index, error_occurred);
            last = r.try_eval(r.vector_elt(parsed, index), env, &error_occurred);
            NDBG("eval_source after try_eval expr=%lld error=%d", (long long)index, error_occurred);
            if (error_occurred) {
                failed = 1;
                break;
            }
            index++;
        }

        if (failed) {
            snprintf(error, error_size, "R evaluation failed");
        } else {
            *out = last;
            succeeded = 1;
        }
    } else {
        snprintf(error, error_size, "R parse failed");
    }

    r.unprotect(held);
    return succeeded;
}
/*
 * Evaluates `source_code` in `env` and reads the result as a logical flag.
 *
 * Returns 1 and sets *out to whether the first element equals 1. Returns 0
 * with a message in `error` when evaluation fails or when the result is not a
 * logical vector with at least one element.
 */
static int eval_logical_in_env(SEXP env, const char *source_code, bool *out, char *error, size_t error_size) {
    SEXP answer = *r.nil_value;

    if (!eval_source_in_env(env, source_code, &answer, error, error_size)) {
        return 0;
    }

    bool is_logical_vector = r.type_of(answer) == LGLSXP && r.xlength_fn(answer) >= 1;
    if (!is_logical_vector) {
        snprintf(error, error_size, "R expression did not return a logical scalar");
        return 0;
    }

    int first = r.logical_elt(answer, 0);
    *out = (first == 1);
    return 1;
}
/*
 * Checks that the R 'arrow' package can be loaded by evaluating
 * requireNamespace("arrow", quietly = TRUE) in `env`.
 *
 * Returns 1 when the package is available. Returns 0 with `error` filled in:
 * either the evaluation error, or a "missing_r_package: ..." message when the
 * call evaluated to something other than TRUE.
 */
static int require_arrow_in_env(SEXP env, char *error, size_t error_size) {
    bool installed = false;

    NDBG("require_arrow requireNamespace begin");
    int evaluated = eval_logical_in_env(env, "requireNamespace(\"arrow\", quietly = TRUE)", &installed, error, error_size);
    NDBG("require_arrow requireNamespace end ok=%d available=%d", evaluated, installed ? 1 : 0);

    if (evaluated && installed) {
        return 1;
    }
    if (evaluated) {
        /* The call ran but reported that the namespace could not be loaded. */
        snprintf(error, error_size, "missing_r_package: R 'arrow' package is not installed");
    }
    return 0;
}
/* Argument block handed to encode_dataframe_body through r.toplevel_exec. */
typedef struct encode_dataframe_ctx {
/* Request supplying ipc_bytes/ipc_len and receiving result_resource. */
r_work *work;
/* Initialised to false by do_encode_dataframe; the body sets it to true on success. */
bool ok;
/* Failure message written by the body; do_encode_dataframe falls back to a generic message when empty. */
char error[1024];
} encode_dataframe_ctx;
/*
 * R source evaluated by encode_dataframe_body. It reads the raw vector bound
 * to `.rx_ipc_raw` with arrow::read_ipc_stream, stops with an error unless
 * the result inherits from data.frame, and yields the decoded value.
 */
static const char *read_ipc_dataframe_source =
"value <- arrow::read_ipc_stream(.rx_ipc_raw, as_data_frame = TRUE)\n"
"if (!inherits(value, \"data.frame\")) stop(\"Arrow IPC did not decode to a data.frame\")\n"
"value";
static void encode_dataframe_body(void *data) {
encode_dataframe_ctx *ctx = (encode_dataframe_ctx *)data;
r_work *work = ctx->work;
int protect_count = 0;
SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
protect_count++;
SEXP raw = r.protect(r.alloc_vector(RAWSXP, (R_xlen_t)work->ipc_len));
protect_count++;
if (work->ipc_len > 0) {
memcpy(r.raw(raw), work->ipc_bytes, work->ipc_len);
}
r.define_var(r.install(".rx_ipc_raw"), raw, env);
NDBG("encode_dataframe require_arrow begin");
int arrow_ok = require_arrow_in_env(env, ctx->error, sizeof(ctx->error));
NDBG("encode_dataframe require_arrow end");
if (!arrow_ok) {
r.unprotect(protect_count);
return;
}
SEXP value = *r.nil_value;
NDBG("encode_dataframe read_ipc_stream begin");
int read_ok = eval_source_in_env(env, read_ipc_dataframe_source, &value, ctx->error, sizeof(ctx->error));
NDBG("encode_dataframe read_ipc_stream end");
if (!read_ok) {
r.unprotect(protect_count);
return;
}
NDBG("encode_dataframe preserve begin");
work->result_resource = make_preserved_resource(value, RESOURCE_KIND_DATAFRAME);
NDBG("encode_dataframe preserve end");
if (work->result_resource == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native dataframe resource");
r.unprotect(protect_count);
return;
}
r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Converts the Arrow IPC bytes on `work` into a native data.frame resource.
 *
 * Clears the error state and work->result_resource, checks that the embedded
 * runtime and the R API table are available, then runs encode_dataframe_body
 * under r.toplevel_exec. A non-local exit or a failed body is reported
 * through set_error, using the body's message when it left one.
 */
static void do_encode_dataframe(r_work *work) {
    encode_dataframe_ctx request = {.work = work, .ok = false, .error = ""};

    work->ok = true;
    work->error[0] = '\0';
    work->result_resource = NULL;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }

    if (!r.toplevel_exec(encode_dataframe_body, &request)) {
        set_error(work, "R non-local error during encode_dataframe");
    } else if (!request.ok) {
        const char *message = request.error[0] != '\0' ? request.error : "R encode_dataframe failed";
        set_error(work, message);
    }
}
/* Argument block handed to decode_data_frame_body through r.toplevel_exec. */
typedef struct decode_data_frame_ctx {
/* Request supplying the row limit and receiving output_dataframe. */
r_work *work;
/* R object to decode; do_decode_data_frame copies it from work->resource under the resource mutex. */
SEXP sexp;
/* Resource kind read together with `sexp`; the data.frame class check is skipped for RESOURCE_KIND_DATAFRAME. */
resource_kind kind;
/* Initialised to false; the body sets it to true on success. */
bool ok;
/* Failure message written by the body; empty when no specific message was produced. */
char error[1024];
} decode_data_frame_ctx;
/* Argument block handed to encode_data_frame_body through r.toplevel_exec. */
typedef struct encode_data_frame_ctx {
/* Request supplying input_dataframe and receiving result_resource. */
r_work *work;
/* Initialised to false by do_encode_data_frame_wire; the body sets it to true on success. */
bool ok;
/* Failure message written by the body; empty when no specific message was produced. */
char error[1024];
} encode_data_frame_ctx;
/*
 * R expression evaluated by decode_data_frame_body against `.rx_df`; a TRUE
 * result is treated there as "the data frame has default row names".
 * NOTE(review): relies on .row_names_info(x, 1L) being non-positive for
 * automatic row names -- confirm against the R documentation.
 */
static const char *default_row_names_source =
".row_names_info(.rx_df, 1L) <= 0L";
/*
 * Evaluates `source_code` in `env` and reads the first element of the result
 * as a non-negative integer.
 *
 * Integer results are taken as-is; double results must be finite, whole,
 * non-negative and no larger than INT_MAX. Returns 1 with the value in *out,
 * or 0 with a message in `error` when evaluation fails, the result is empty,
 * has another type, or is not a valid non-negative integer.
 */
static int eval_unsigned_in_env(SEXP env, const char *source_code, unsigned *out, char *error, size_t error_size) {
    SEXP answer = *r.nil_value;
    int parsed = 0;

    if (!eval_source_in_env(env, source_code, &answer, error, error_size)) {
        return 0;
    }
    if (r.xlength_fn(answer) < 1) {
        snprintf(error, error_size, "R expression did not return a scalar integer");
        return 0;
    }

    switch (r.type_of(answer)) {
    case INTSXP:
        parsed = r.integer_elt(answer, 0);
        break;
    case REALSXP: {
        double numeric = r.real_elt(answer, 0);
        bool representable =
            isfinite(numeric) && numeric >= 0 && numeric <= (double)INT_MAX && floor(numeric) == numeric;
        if (!representable) {
            snprintf(error, error_size, "R expression returned an invalid integer");
            return 0;
        }
        parsed = (int)numeric;
        break;
    }
    default:
        snprintf(error, error_size, "R expression did not return an integer");
        return 0;
    }

    if (parsed < 0) {
        snprintf(error, error_size, "R expression returned a negative integer");
        return 0;
    }
    *out = (unsigned)parsed;
    return 1;
}
/*
 * Copies `count` column names from the R character vector `names` into a
 * newly allocated array of C strings.
 *
 * With count == 0, *out is set to NULL and 1 is returned. Otherwise `names`
 * must be a character vector of exactly `count` elements, none of which is
 * NA, empty, or a repeat of an earlier name. Returns 1 with the array in *out
 * (the caller takes ownership), or 0 with a message in `error`; on failure
 * the partially filled array is released through free_string_array.
 */
static int copy_dataframe_names(SEXP names, unsigned count, char ***out, char *error, size_t error_size) {
    char **copies = NULL;

    if (count == 0) {
        *out = NULL;
        return 1;
    }

    if (names == *r.nil_value || r.type_of(names) != STRSXP || r.xlength_fn(names) != (R_xlen_t)count) {
        snprintf(error, error_size, "unsupported_dataframe_names: non_string");
        return 0;
    }

    copies = calloc(count, sizeof(char *));
    if (copies == NULL) {
        snprintf(error, error_size, "failed to allocate dataframe names");
        return 0;
    }

    for (unsigned slot = 0; slot < count; slot++) {
        const char *failure = NULL;
        size_t copied_len = 0;
        SEXP element = r.string_elt(names, (R_xlen_t)slot);

        /* The checks run in order and stop at the first one that fails. */
        if (element == *r.na_string) {
            failure = "unsupported_dataframe_names: non_string";
        } else if (!copy_r_string(element, &copies[slot], &copied_len)) {
            failure = "failed to copy dataframe names";
        } else if (copied_len == 0) {
            failure = "unsupported_dataframe_names: empty";
        } else if (dataframe_name_duplicate(copies, slot, copies[slot])) {
            failure = "unsupported_dataframe_names: duplicate";
        }

        if (failure != NULL) {
            snprintf(error, error_size, "%s", failure);
            free_string_array(copies, count);
            return 0;
        }
    }

    *out = copies;
    return 1;
}
/*
 * Maps an R data-frame column to a dataframe_column_type.
 *
 * Returns 1 and sets *type for plain logical, integer, double and character
 * vectors. Returns 0 with a short reason in `reason` for anything else. The
 * checks run in this order: a non-empty dim attribute ("matrix"); the classes
 * factor, Date and POSIXt; list columns ("list"); any other classed object
 * ("class:<first class>", or "unsupported_type" when the class name cannot be
 * copied); then the remaining storage types.
 */
static int dataframe_column_type_from_r(SEXP column, dataframe_column_type *type, char *reason, size_t reason_size) {
    static const struct {
        const char *r_class;
        const char *label;
    } rejected_classes[] = {
        {"factor", "factor"},
        {"Date", "date"},
        {"POSIXt", "posix"},
    };

    SEXP dims = r.get_attrib(column, *r.dim_symbol);
    if (dims != *r.nil_value && r.xlength_fn(dims) > 0) {
        snprintf(reason, reason_size, "matrix");
        return 0;
    }

    SEXP classes = r.get_attrib(column, *r.class_symbol);
    bool classed = classes != *r.nil_value && r.xlength_fn(classes) > 0;

    if (classed) {
        for (size_t k = 0; k < sizeof(rejected_classes) / sizeof(rejected_classes[0]); k++) {
            if (r.inherits(column, rejected_classes[k].r_class)) {
                snprintf(reason, reason_size, "%s", rejected_classes[k].label);
                return 0;
            }
        }
    }

    if (r.type_of(column) == VECSXP) {
        snprintf(reason, reason_size, "list");
        return 0;
    }

    if (classed) {
        SEXP first_class = r.string_elt(classes, 0);
        if (first_class != *r.na_string) {
            char *class_text = NULL;
            size_t class_len = 0;
            if (copy_r_string(first_class, &class_text, &class_len)) {
                snprintf(reason, reason_size, "class:%s", class_text);
                free(class_text);
                return 0;
            }
        }
        snprintf(reason, reason_size, "unsupported_type");
        return 0;
    }

    int storage = r.type_of(column);
    if (storage == LGLSXP) {
        *type = DATAFRAME_COLUMN_LOGICAL;
        return 1;
    }
    if (storage == INTSXP) {
        *type = DATAFRAME_COLUMN_INTEGER;
        return 1;
    }
    if (storage == REALSXP) {
        *type = DATAFRAME_COLUMN_DOUBLE;
        return 1;
    }
    if (storage == STRSXP) {
        *type = DATAFRAME_COLUMN_CHARACTER;
        return 1;
    }

    if (storage == CPLXSXP) {
        snprintf(reason, reason_size, "complex");
    } else if (storage == RAWSXP) {
        snprintf(reason, reason_size, "raw");
    } else {
        snprintf(reason, reason_size, "unsupported_type");
    }
    return 0;
}
/*
 * Reports whether two doubles have identical object representations. Unlike
 * ==, this distinguishes +0.0 from -0.0 and treats two NaNs as equal only
 * when their bit patterns (including payload) match.
 */
static bool same_double_bits(double a, double b) {
    return memcmp(&a, &b, sizeof(double)) == 0;
}
/*
 * Reports whether `value` is R's missing real value, given `na_real` (the
 * runtime's NA_real_).
 *
 * R marks NA as a NaN whose low 32-bit word carries a fixed payload and
 * ignores the remaining mantissa bits when testing for it, so an NA whose
 * upper bits differ (for example after the quiet bit has been set by
 * arithmetic) is still NA. An exact bit match is accepted first; otherwise
 * the low words are compared.
 */
static bool dataframe_double_is_r_na(double value, double na_real) {
    if (!isnan(value)) return false;
    if (same_double_bits(value, na_real)) return true;
    uint64_t value_bits = 0;
    uint64_t na_bits = 0;
    memcpy(&value_bits, &value, sizeof(value_bits));
    memcpy(&na_bits, &na_real, sizeof(na_bits));
    return (uint32_t)(value_bits & 0xFFFFFFFFu) == (uint32_t)(na_bits & 0xFFFFFFFFu);
}

/*
 * Copies every element of the R vector `column` into out->values, according
 * to out->type (which the caller must have set, along with out->name).
 *
 * Sets out->value_count to the column length; for an empty column nothing is
 * allocated. NA elements are recorded with is_na = true. Doubles that are
 * NaN (other than NA) or infinite are rejected.
 *
 * Returns 1 on success. Returns 0 with a message in `error` when the length
 * does not fit in unsigned, allocation fails, a double is non-finite, or a
 * string cannot be copied. On failure out->values may be partially filled;
 * it is left attached to `out` for the caller to release.
 */
static int copy_dataframe_column_values(SEXP column, dataframe_column *out, char *error, size_t error_size) {
    R_xlen_t length = r.xlength_fn(column);
    if (length < 0 || (uint64_t)length > UINT_MAX) {
        snprintf(error, error_size, "invalid_dataframe: column_length_mismatch");
        return 0;
    }
    out->value_count = (unsigned)length;
    if (out->value_count == 0) return 1;
    out->values = calloc(out->value_count, sizeof(dataframe_cell));
    if (out->values == NULL) {
        snprintf(error, error_size, "failed to allocate dataframe column values");
        return 0;
    }
    for (unsigned i = 0; i < out->value_count; i++) {
        dataframe_cell *cell = &out->values[i];
        switch (out->type) {
        case DATAFRAME_COLUMN_LOGICAL: {
            int value = r.logical_elt(column, (R_xlen_t)i);
            if (value == *r.na_int) {
                cell->is_na = true;
            } else {
                cell->logical_value = value == 1;
            }
            break;
        }
        case DATAFRAME_COLUMN_INTEGER: {
            int value = r.integer_elt(column, (R_xlen_t)i);
            if (value == *r.na_int) {
                cell->is_na = true;
            } else {
                cell->integer_value = value;
            }
            break;
        }
        case DATAFRAME_COLUMN_DOUBLE: {
            double value = r.real_elt(column, (R_xlen_t)i);
            if (dataframe_double_is_r_na(value, *r.na_real)) {
                cell->is_na = true;
            } else if (!isfinite(value)) {
                snprintf(error, error_size, "unsupported_dataframe_column: %s: non_finite_double", out->name);
                return 0;
            } else {
                cell->double_value = value;
            }
            break;
        }
        case DATAFRAME_COLUMN_CHARACTER: {
            SEXP string = r.string_elt(column, (R_xlen_t)i);
            if (string == *r.na_string) {
                cell->is_na = true;
            } else if (!copy_r_string(string, &cell->string_value, &cell->string_value_len)) {
                snprintf(error, error_size, "failed to copy dataframe character column");
                return 0;
            }
            break;
        }
        }
    }
    return 1;
}
/*
 * Body of a data.frame -> wire request; do_decode_data_frame runs it through
 * r.toplevel_exec with a decode_data_frame_ctx pointer as `data`.
 *
 * Validates the object in ctx->sexp (data.frame class unless the resource
 * kind already says so, row count against the optional limit, default row
 * names), then copies names and column values into a newly allocated
 * dataframe_wire. On success the wire is stored in work->output_dataframe and
 * ctx->ok is set to true. On every failure a message is written to
 * ctx->error, any wire allocated so far is released with free_dataframe_wire,
 * the environment is unprotected, and ctx->ok is left untouched.
 */
static void decode_data_frame_body(void *data) {
decode_data_frame_ctx *ctx = (decode_data_frame_ctx *)data;
r_work *work = ctx->work;
int protect_count = 0;
/* Scratch environment in which the object is bound as `.rx_df` for the R snippets below. */
SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
protect_count++;
r.define_var(r.install(".rx_df"), ctx->sexp, env);
/* The class check is skipped when the resource was recorded as a data frame. */
if (ctx->kind != RESOURCE_KIND_DATAFRAME && !r.inherits(ctx->sexp, "data.frame")) {
snprintf(ctx->error, sizeof(ctx->error), "not_dataframe: native object is not a data.frame");
r.unprotect(protect_count);
return;
}
/* The length of the object itself is used as the column count. */
R_xlen_t column_count_len = r.xlength_fn(ctx->sexp);
if (column_count_len < 0 || (uint64_t)column_count_len > UINT_MAX) {
snprintf(ctx->error, sizeof(ctx->error), "invalid_dataframe: malformed_wire");
r.unprotect(protect_count);
return;
}
unsigned column_count = (unsigned)column_count_len;
unsigned n_rows = 0;
if (!eval_unsigned_in_env(env, "nrow(.rx_df)", &n_rows, ctx->error, sizeof(ctx->error))) {
r.unprotect(protect_count);
return;
}
/* The row limit is enforced before any column data is copied. */
if (work->has_max_rows && n_rows > work->max_rows) {
snprintf(ctx->error, sizeof(ctx->error), "dataframe_too_large: %u: %u", n_rows, work->max_rows);
r.unprotect(protect_count);
return;
}
if (column_count == 0 && n_rows > 0) {
snprintf(ctx->error, sizeof(ctx->error), "unsupported_dataframe_shape: zero_column_nonzero_row");
r.unprotect(protect_count);
return;
}
bool default_row_names = false;
if (!eval_logical_in_env(env, default_row_names_source, &default_row_names, ctx->error, sizeof(ctx->error))) {
r.unprotect(protect_count);
return;
}
if (!default_row_names) {
snprintf(ctx->error, sizeof(ctx->error), "unsupported_dataframe_row_names: custom");
r.unprotect(protect_count);
return;
}
/* From here on, every failure path must release `wire`. */
dataframe_wire *wire = calloc(1, sizeof(dataframe_wire));
if (wire == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate dataframe wire");
r.unprotect(protect_count);
return;
}
wire->n_rows = n_rows;
wire->name_count = column_count;
wire->column_count = column_count;
SEXP names_attr = r.get_attrib(ctx->sexp, *r.names_symbol);
if (!copy_dataframe_names(names_attr, column_count, &wire->names, ctx->error, sizeof(ctx->error))) {
free_dataframe_wire(wire);
r.unprotect(protect_count);
return;
}
if (column_count > 0) {
wire->columns = calloc(column_count, sizeof(dataframe_column));
if (wire->columns == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate dataframe columns");
free_dataframe_wire(wire);
r.unprotect(protect_count);
return;
}
}
for (unsigned i = 0; i < column_count; i++) {
SEXP column_value = r.vector_elt(ctx->sexp, (R_xlen_t)i);
dataframe_column *column = &wire->columns[i];
/* Each column carries its own copy of the name, separate from wire->names. */
column->name = strdup(wire->names[i]);
if (column->name == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate dataframe column name");
free_dataframe_wire(wire);
r.unprotect(protect_count);
return;
}
char reason[128] = "";
if (!dataframe_column_type_from_r(column_value, &column->type, reason, sizeof(reason))) {
snprintf(ctx->error, sizeof(ctx->error), "unsupported_dataframe_column: %s: %s", column->name, reason);
free_dataframe_wire(wire);
r.unprotect(protect_count);
return;
}
if (!copy_dataframe_column_values(column_value, column, ctx->error, sizeof(ctx->error))) {
free_dataframe_wire(wire);
r.unprotect(protect_count);
return;
}
/* The length check runs after the copy, using the count recorded by the copy. */
if (column->value_count != n_rows) {
snprintf(ctx->error, sizeof(ctx->error), "invalid_dataframe: column_length_mismatch");
free_dataframe_wire(wire);
r.unprotect(protect_count);
return;
}
}
/* Ownership of `wire` passes to work->output_dataframe. */
work->output_dataframe = wire;
r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Decodes the data frame held by work->resource into work->output_dataframe.
 *
 * Clears the error state and the output pointer, checks that the embedded
 * runtime and the R API table are available, snapshots the resource's SEXP,
 * kind and release flag under its mutex, and then runs
 * decode_data_frame_body under r.toplevel_exec. A released resource, a
 * non-local exit, or a failed body are each reported through set_error.
 */
static void do_decode_data_frame(r_work *work) {
    work->ok = true;
    work->error[0] = '\0';
    work->output_dataframe = NULL;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }

    decode_data_frame_ctx request = {.work = work, .ok = false, .error = ""};
    bool already_released = false;

    pthread_mutex_lock(&work->resource->mutex);
    request.sexp = work->resource->sexp;
    request.kind = work->resource->kind;
    already_released = work->resource->release_enqueued;
    pthread_mutex_unlock(&work->resource->mutex);

    if (request.sexp == NULL || already_released) {
        set_error(work, "native R object resource has already been released");
        return;
    }

    if (!r.toplevel_exec(decode_data_frame_body, &request)) {
        set_error(work, "R non-local error during decode_data_frame");
    } else if (!request.ok) {
        const char *message = request.error[0] != '\0' ? request.error : "R decode_data_frame failed";
        set_error(work, message);
    }
}
static SEXP make_dataframe_row_names(unsigned n_rows, int *protect_count) {
if (n_rows == 0) {
SEXP row_names = r.protect(r.alloc_vector(INTSXP, 0));
(*protect_count)++;
return row_names;
}
SEXP row_names = r.protect(r.alloc_vector(INTSXP, 2));
(*protect_count)++;
r.set_integer_elt(row_names, 0, *r.na_int);
r.set_integer_elt(row_names, 1, -(int)n_rows);
return row_names;
}
static SEXP make_dataframe_column_vector(const dataframe_column *column, int *protect_count, char *error, size_t error_size) {
SEXP vector = *r.nil_value;
switch (column->type) {
case DATAFRAME_COLUMN_LOGICAL:
vector = r.protect(r.alloc_vector(LGLSXP, column->value_count));
(*protect_count)++;
for (unsigned i = 0; i < column->value_count; i++) {
r.set_logical_elt(vector, (R_xlen_t)i, column->values[i].is_na ? *r.na_int : (column->values[i].logical_value ? 1 : 0));
}
return vector;
case DATAFRAME_COLUMN_INTEGER:
vector = r.protect(r.alloc_vector(INTSXP, column->value_count));
(*protect_count)++;
for (unsigned i = 0; i < column->value_count; i++) {
r.set_integer_elt(vector, (R_xlen_t)i, column->values[i].is_na ? *r.na_int : column->values[i].integer_value);
}
return vector;
case DATAFRAME_COLUMN_DOUBLE:
vector = r.protect(r.alloc_vector(REALSXP, column->value_count));
(*protect_count)++;
for (unsigned i = 0; i < column->value_count; i++) {
r.set_real_elt(vector, (R_xlen_t)i, column->values[i].is_na ? *r.na_real : column->values[i].double_value);
}
return vector;
case DATAFRAME_COLUMN_CHARACTER:
vector = r.protect(r.alloc_vector(STRSXP, column->value_count));
(*protect_count)++;
for (unsigned i = 0; i < column->value_count; i++) {
if (column->values[i].is_na) {
r.set_string_elt(vector, (R_xlen_t)i, *r.na_string);
} else {
SEXP chars = r.protect(r.mk_char_len_ce(
column->values[i].string_value,
(int)column->values[i].string_value_len,
CE_UTF8));
r.set_string_elt(vector, (R_xlen_t)i, chars);
r.unprotect(1);
}
}
return vector;
}
snprintf(error, error_size, "invalid_dataframe: unsupported_type");
return *r.nil_value;
}
static void encode_data_frame_body(void *data) {
encode_data_frame_ctx *ctx = (encode_data_frame_ctx *)data;
r_work *work = ctx->work;
dataframe_wire *wire = work->input_dataframe;
int protect_count = 0;
SEXP df = r.protect(r.alloc_vector(VECSXP, wire->column_count));
protect_count++;
SEXP names = r.protect(r.alloc_vector(STRSXP, wire->name_count));
protect_count++;
for (unsigned i = 0; i < wire->column_count; i++) {
SEXP column = make_dataframe_column_vector(&wire->columns[i], &protect_count, ctx->error, sizeof(ctx->error));
if (column == *r.nil_value && ctx->error[0] != '\0') {
r.unprotect(protect_count);
return;
}
r.set_vector_elt(df, (R_xlen_t)i, column);
SEXP name = r.protect(r.mk_char_len_ce(wire->names[i], (int)strlen(wire->names[i]), CE_UTF8));
r.set_string_elt(names, (R_xlen_t)i, name);
r.unprotect(1);
}
SEXP class_value = r.protect(r.alloc_vector(STRSXP, 1));
protect_count++;
SEXP class_name = r.protect(r.mk_char_len_ce("data.frame", 10, CE_UTF8));
r.set_string_elt(class_value, 0, class_name);
r.unprotect(1);
SEXP row_names = make_dataframe_row_names(wire->n_rows, &protect_count);
r.set_attrib(df, *r.names_symbol, names);
r.set_attrib(df, *r.class_symbol, class_value);
r.set_attrib(df, r.install("row.names"), row_names);
work->result_resource = make_preserved_resource(df, RESOURCE_KIND_DATAFRAME);
if (work->result_resource == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native dataframe resource");
r.unprotect(protect_count);
return;
}
r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Converts work->input_dataframe into a native data.frame resource.
 *
 * Clears the error state and work->result_resource, checks that the embedded
 * runtime and the R API table are available, then runs
 * encode_data_frame_body under r.toplevel_exec. A non-local exit or a failed
 * body is reported through set_error, using the body's message when it left
 * one.
 */
static void do_encode_data_frame_wire(r_work *work) {
    encode_data_frame_ctx request = {.work = work, .ok = false, .error = ""};

    work->ok = true;
    work->error[0] = '\0';
    work->result_resource = NULL;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }

    if (!r.toplevel_exec(encode_data_frame_body, &request)) {
        set_error(work, "R non-local error during encode_data_frame");
    } else if (!request.ok) {
        const char *message = request.error[0] != '\0' ? request.error : "R encode_data_frame failed";
        set_error(work, message);
    }
}
/* Argument block read by decode_arrow_body. */
typedef struct decode_arrow_ctx {
/* Request that receives output_bytes/output_len. */
r_work *work;
/* R object bound as `.rx_df` and serialised by the body. */
SEXP sexp;
/* Resource kind; the data.frame class check is skipped for RESOURCE_KIND_DATAFRAME. */
resource_kind kind;
/* Set to true by the body once the IPC bytes have been copied. */
bool ok;
/* Failure message written by the body. */
char error[1024];
} decode_arrow_ctx;
/* R expression evaluated by decode_arrow_body to test whether `.rx_df` inherits from data.frame. */
static const char *inherits_dataframe_source = "inherits(.rx_df, \"data.frame\")";
/* R expression evaluated by decode_arrow_body to serialise `.rx_df`; the body requires a raw vector result. */
static const char *write_ipc_dataframe_source =
"arrow::write_to_raw(.rx_df, format = \"stream\")";
static void decode_arrow_body(void *data) {
decode_arrow_ctx *ctx = (decode_arrow_ctx *)data;
r_work *work = ctx->work;
int protect_count = 0;
SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
protect_count++;
r.define_var(r.install(".rx_df"), ctx->sexp, env);
NDBG("decode_arrow require_arrow begin");
int arrow_ok = require_arrow_in_env(env, ctx->error, sizeof(ctx->error));
NDBG("decode_arrow require_arrow end");
if (!arrow_ok) {
r.unprotect(protect_count);
return;
}
if (ctx->kind != RESOURCE_KIND_DATAFRAME) {
SEXP inherits = *r.nil_value;
NDBG("decode_arrow inherits_dataframe begin");
int inherits_ok = eval_source_in_env(env, inherits_dataframe_source, &inherits, ctx->error, sizeof(ctx->error));
NDBG("decode_arrow inherits_dataframe end");
if (!inherits_ok) {
r.unprotect(protect_count);
return;
}
bool is_dataframe =
r.type_of(inherits) == LGLSXP && r.xlength_fn(inherits) >= 1 && r.logical_elt(inherits, 0) == 1;
if (!is_dataframe) {
snprintf(ctx->error, sizeof(ctx->error), "not_dataframe: native object is not an Arrow-compatible data frame");
r.unprotect(protect_count);
return;
}
}
SEXP raw = *r.nil_value;
NDBG("decode_arrow write_to_raw begin");
int write_ok = eval_source_in_env(env, write_ipc_dataframe_source, &raw, ctx->error, sizeof(ctx->error));
NDBG("decode_arrow write_to_raw end");
if (!write_ok) {
r.unprotect(protect_count);
return;
}
if (r.type_of(raw) != RAWSXP) {
snprintf(ctx->error, sizeof(ctx->error), "R arrow writer did not return raw bytes");
r.unprotect(protect_count);
return;
}
R_xlen_t output_len = r.xlength_fn(raw);
if (output_len < 0 || (uint64_t)output_len > SIZE_MAX) {
snprintf(ctx->error, sizeof(ctx->error), "Arrow IPC output is too large");
r.unprotect(protect_count);
return;
}
work->output_len = (size_t)output_len;
work->output_bytes = malloc(work->output_len + 1);
if (work->output_bytes == NULL) {
snprintf(ctx->error, sizeof(ctx->error), "failed to allocate Arrow IPC output");
r.unprotect(protect_count);
return;
}
if (work->output_len > 0) {
memcpy(work->output_bytes, r.raw(raw), work->output_len);
}
work->output_bytes[work->output_len] = '\0';
r.unprotect(protect_count);
ctx->ok = true;
}
/*
 * Owner-thread handler for WORK_DECODE_ARROW.
 *
 * Clears the work item's output fields, verifies the runtime is usable,
 * snapshots the source resource under its mutex, and runs
 * decode_arrow_body() under r.toplevel_exec(). Failures are reported
 * through set_error().
 */
static void do_decode_arrow(r_work *work) {
    work->output_len = 0;
    work->output_bytes = NULL;
    work->error[0] = '\0';
    work->ok = true;

    if (set_terminal_init_failure_if_present(work)) {
        return;
    }
    if (!runtime_initialized) {
        set_error(work, "embedded R runtime is not initialized");
        return;
    }
    if (!load_r_api(work)) {
        return;
    }

    /* Read the resource fields as one consistent snapshot. */
    pthread_mutex_lock(&work->resource->mutex);
    SEXP source_sexp = work->resource->sexp;
    resource_kind source_kind = work->resource->kind;
    bool already_releasing = work->resource->release_enqueued;
    pthread_mutex_unlock(&work->resource->mutex);

    if (already_releasing || source_sexp == NULL) {
        set_error(work, "native R object resource has already been released");
        return;
    }

    decode_arrow_ctx body_ctx = {
        .work = work, .sexp = source_sexp, .kind = source_kind, .ok = false, .error = ""};
    if (!r.toplevel_exec(decode_arrow_body, &body_ctx)) {
        set_error(work, "R non-local error during decode_arrow");
        return;
    }
    if (body_ctx.ok) {
        return;
    }
    set_error(work, body_ctx.error[0] != '\0' ? body_ctx.error : "R decode_arrow failed");
}
/* State handed from do_release_resource() to release_resource_body() through r.toplevel_exec(). */
typedef struct release_resource_ctx {
/* Object passed to r.release_object(). */
SEXP sexp;
/* Set to true by the body after r.release_object() returned. */
bool ok;
} release_resource_ctx;
/*
 * Body run under r.toplevel_exec() by do_release_resource(): releases the
 * object carried in the release_resource_ctx and flags completion.
 */
static void release_resource_body(void *data) {
    release_resource_ctx *release = data;

    r.release_object(release->sexp);
    release->ok = true;
}
/*
 * Owner-thread handler for WORK_RELEASE.
 *
 * Releases work->release_sexp under r.toplevel_exec() and records the
 * outcome through the note_release_* diagnostics helpers. The release is
 * skipped (and noted as such) when the runtime is not initialized, the R
 * API cannot be loaded, or one of the fault-injection switches is enabled.
 */
static void do_release_resource(r_work *work) {
    if (!runtime_initialized) {
        note_release_skipped_uninitialized(work->release_resource_id, "native runtime is not initialized");
        return;
    }

    if (!load_r_api(work)) {
        note_release_failure(
            work->release_resource_id,
            work->error[0] != '\0' ? work->error : "failed to load native R API for release");
        return;
    }

    /* Fault-injection hooks, checked in this order. */
    if (release_fault_enabled("skip_uninitialized")) {
        note_release_skipped_uninitialized(work->release_resource_id, "fault injected release skip before R_ReleaseObject");
        return;
    }
    if (release_fault_enabled("1")) {
        note_release_failure(work->release_resource_id, "fault injected during native resource release");
        return;
    }

    release_resource_ctx release = {.sexp = work->release_sexp, .ok = false};
    bool released = r.toplevel_exec(release_resource_body, &release) && release.ok;
    if (!released) {
        note_release_failure(work->release_resource_id, "R non-local error during resource release");
        return;
    }
    note_release_on_owner_thread(work->release_resource_id);
}
/* Routes one dequeued work item to the handler matching its kind. */
static void owner_dispatch_work(r_work *work) {
    switch (work->kind) {
    case WORK_INIT:
        do_init(work);
        break;
    case WORK_EVAL_STRING:
        do_eval_string(work);
        break;
    case WORK_EVAL:
        do_eval(work);
        break;
    case WORK_PRINT:
        do_print(work);
        break;
    case WORK_PLOT:
        do_plot(work);
        break;
    case WORK_ENCODE_RESOURCE:
        do_encode_resource(work);
        break;
    case WORK_DECODE_RESOURCE:
        do_decode_resource(work);
        break;
    case WORK_ENCODE_DATAFRAME:
        do_encode_dataframe(work);
        break;
    case WORK_ENCODE_DATA_FRAME:
        do_encode_data_frame_wire(work);
        break;
    case WORK_DECODE_DATA_FRAME:
        do_decode_data_frame(work);
        break;
    case WORK_DECODE_ARROW:
        do_decode_arrow(work);
        break;
    case WORK_RELEASE:
        do_release_resource(work);
        break;
    }
}

/*
 * Thread entry point for the owner thread.
 *
 * Records this thread's id, then loops forever: take the next item from
 * dequeue_work(), run its handler, and hand it to finish_work(). An item
 * whose kind matches no handler is passed to finish_work() unchanged.
 * The argument is unused and the trailing return is never reached.
 */
static void *owner_loop(void *arg) {
    (void)arg;
    set_owner_thread_id(current_thread_id());
    while (true) {
        r_work *next_item = dequeue_work();
        owner_dispatch_work(next_item);
        finish_work(next_item);
    }
    return NULL;
}
/*
 * Starts the owner thread on first use; later calls are no-ops.
 *
 * The whole check-and-create sequence runs under owner_start_mutex.
 * Returns 1 when the owner thread is (already) running and 0 when the
 * thread attributes could not be initialized or pthread_create() failed.
 */
static int ensure_owner_thread(void) {
    /* Stack size requested for the owner thread. */
    enum { OWNER_THREAD_STACK_BYTES = 64 * 1024 * 1024 };
    int started = 0;

    pthread_mutex_lock(&owner_start_mutex);
    if (owner_started) {
        started = 1;
    } else {
        pthread_attr_t attr;
        /*
         * pthread_attr_init() can fail; using or destroying an attribute
         * object that was never initialized is undefined, so give up here.
         */
        if (pthread_attr_init(&attr) == 0) {
            /*
             * Best effort, as before: if the size is rejected the thread is
             * still created with the attribute's default stack size.
             */
            (void)pthread_attr_setstacksize(&attr, (size_t)OWNER_THREAD_STACK_BYTES);
            if (pthread_create(&owner_thread, &attr, owner_loop, NULL) == 0) {
                owner_started = true;
                started = 1;
            }
            pthread_attr_destroy(&attr);
        }
    }
    pthread_mutex_unlock(&owner_start_mutex);
    return started;
}
/* Builds {error, Message} where Message is a Latin-1 Erlang string (charlist). */
static ERL_NIF_TERM make_error(ErlNifEnv *env, const char *message) {
    ERL_NIF_TERM reason = enif_make_string(env, message, ERL_NIF_LATIN1);
    return enif_make_tuple2(env, atom_error, reason);
}
/* Builds {error, {Tag, Message}} with Tag as an atom and Message as a Latin-1 string. */
static ERL_NIF_TERM make_tagged_error(ErlNifEnv *env, const char *tag, const char *message) {
    ERL_NIF_TERM tag_atom = enif_make_atom(env, tag);
    ERL_NIF_TERM text = enif_make_string(env, message, ERL_NIF_LATIN1);
    ERL_NIF_TERM detail = enif_make_tuple2(env, tag_atom, text);
    return enif_make_tuple2(env, atom_error, detail);
}
/*
 * Copies `length` bytes from `bytes` into a new Erlang binary.
 *
 * *ok reports the outcome: true with the binary returned, or false with
 * the atom `nil` returned when a non-empty binary could not be allocated.
 * `bytes` is not read when length is 0.
 */
static ERL_NIF_TERM make_binary_term(ErlNifEnv *env, const char *bytes, size_t length, bool *ok) {
    ERL_NIF_TERM binary;
    unsigned char *payload = enif_make_new_binary(env, length, &binary);

    if (length > 0 && payload == NULL) {
        *ok = false;
        return enif_make_atom(env, "nil");
    }
    if (length != 0) {
        memcpy(payload, bytes, length);
    }
    *ok = true;
    return binary;
}
/*
 * Converts a NUL-terminated C string to an Erlang binary.
 * NULL becomes the atom `nil`; if the binary cannot be created the value
 * is returned as a Latin-1 Erlang string instead.
 */
static ERL_NIF_TERM make_binary_string(ErlNifEnv *env, const char *value) {
    if (value == NULL) {
        return enif_make_atom(env, "nil");
    }

    bool made = false;
    ERL_NIF_TERM binary = make_binary_term(env, value, strlen(value), &made);
    return made ? binary : enif_make_string(env, value, ERL_NIF_LATIN1);
}
/*
 * Stores `value` in *map under `key` (converted with make_binary_string())
 * and updates *map in place. Returns the result of enif_make_map_put().
 */
static int map_put_binary_key(ErlNifEnv *env, ERL_NIF_TERM *map, const char *key, ERL_NIF_TERM value) {
    return enif_make_map_put(env, *map, make_binary_string(env, key), value, map);
}
/*
 * Turns a textual data-frame failure reason into a term:
 * "class:<name>" becomes {class, <<"name">>}; anything else becomes an atom
 * with the reason text as its name.
 */
static ERL_NIF_TERM dataframe_reason_term(ErlNifEnv *env, const char *reason) {
    static const char class_prefix[] = "class:";
    const size_t prefix_len = sizeof(class_prefix) - 1;

    if (strncmp(reason, class_prefix, prefix_len) != 0) {
        return enif_make_atom(env, reason);
    }
    ERL_NIF_TERM class_name = make_binary_string(env, reason + prefix_len);
    return enif_make_tuple2(env, enif_make_atom(env, "class"), class_name);
}
/* Builds {error, {Tag, Reason}} with Reason produced by dataframe_reason_term(). */
static ERL_NIF_TERM make_tagged_error_atom(ErlNifEnv *env, const char *tag, const char *reason) {
    ERL_NIF_TERM tag_atom = enif_make_atom(env, tag);
    ERL_NIF_TERM reason_term = dataframe_reason_term(env, reason);
    ERL_NIF_TERM detail = enif_make_tuple2(env, tag_atom, reason_term);
    return enif_make_tuple2(env, atom_error, detail);
}
/*
 * Builds {error, {unsupported_dataframe_column, Name, Reason}} from a
 * "<name>: <reason>" detail string, split at the first ": ".
 * Without a separator the detail is returned as a plain make_error() term.
 */
static ERL_NIF_TERM make_dataframe_column_error(ErlNifEnv *env, const char *detail) {
    const char *split = strstr(detail, ": ");
    if (split == NULL) {
        return make_error(env, detail);
    }

    bool made = false;
    ERL_NIF_TERM column_name = make_binary_term(env, detail, (size_t)(split - detail), &made);
    if (!made) {
        return make_error(env, "failed to allocate dataframe column error");
    }

    ERL_NIF_TERM tag = enif_make_atom(env, "unsupported_dataframe_column");
    ERL_NIF_TERM reason = dataframe_reason_term(env, split + 2);
    ERL_NIF_TERM payload = enif_make_tuple3(env, tag, column_name, reason);
    return enif_make_tuple2(env, atom_error, payload);
}
/*
 * Builds {error, {dataframe_too_large, Rows, MaxRows}} from a
 * "<rows>: <max_rows>" detail string. If the two numbers cannot be parsed
 * the detail is returned as a plain make_error() term.
 */
static ERL_NIF_TERM make_dataframe_too_large_error(ErlNifEnv *env, const char *detail) {
    unsigned row_count = 0;
    unsigned row_limit = 0;

    int fields = sscanf(detail, "%u: %u", &row_count, &row_limit);
    if (fields != 2) {
        return make_error(env, detail);
    }

    ERL_NIF_TERM payload = enif_make_tuple3(
        env,
        enif_make_atom(env, "dataframe_too_large"),
        enif_make_uint(env, row_count),
        enif_make_uint(env, row_limit));
    return enif_make_tuple2(env, atom_error, payload);
}
/* Returns the text following `prefix` when `error` starts with it, else NULL. */
static const char *dataframe_error_after_prefix(const char *error, const char *prefix) {
    size_t prefix_len = strlen(prefix);
    return strncmp(error, prefix, prefix_len) == 0 ? error + prefix_len : NULL;
}

/*
 * Converts a "<category>: <detail>" data-frame error string into a
 * structured error term. The category prefix selects the builder;
 * unrecognized strings fall back to make_error().
 */
static ERL_NIF_TERM make_dataframe_error(ErlNifEnv *env, const char *error) {
    /* Categories whose detail is converted with dataframe_reason_term(). */
    static const struct {
        const char *prefix;
        const char *tag;
    } reason_categories[] = {
        {"unsupported_dataframe_names: ", "unsupported_dataframe_names"},
        {"unsupported_dataframe_row_names: ", "unsupported_dataframe_row_names"},
        {"unsupported_dataframe_shape: ", "unsupported_dataframe_shape"},
        {"invalid_dataframe: ", "invalid_dataframe"},
    };

    const char *detail = dataframe_error_after_prefix(error, "not_dataframe: ");
    if (detail != NULL) {
        return make_tagged_error(env, "not_dataframe", detail);
    }

    detail = dataframe_error_after_prefix(error, "unsupported_dataframe_column: ");
    if (detail != NULL) {
        return make_dataframe_column_error(env, detail);
    }

    for (size_t i = 0; i < sizeof(reason_categories) / sizeof(reason_categories[0]); i++) {
        detail = dataframe_error_after_prefix(error, reason_categories[i].prefix);
        if (detail != NULL) {
            return make_tagged_error_atom(env, reason_categories[i].tag, detail);
        }
    }

    detail = dataframe_error_after_prefix(error, "dataframe_too_large: ");
    if (detail != NULL) {
        return make_dataframe_too_large_error(env, detail);
    }

    return make_error(env, error);
}
/*
 * Converts one data-frame cell into a term stored in *out.
 *
 * NA cells become a map with binary keys "kind" ("na") and "type" (the
 * column type name). Other cells become a boolean atom, integer, double or
 * binary according to `type`. Returns 1 on success and 0 on failure or for
 * a column type that is not handled.
 */
static int make_dataframe_cell_term(ErlNifEnv *env, const dataframe_cell *cell, dataframe_column_type type, ERL_NIF_TERM *out) {
    if (cell->is_na) {
        ERL_NIF_TERM marker = enif_make_new_map(env);
        if (!map_put_binary_key(env, &marker, "kind", make_binary_string(env, "na"))) {
            return 0;
        }
        if (!map_put_binary_key(env, &marker, "type", make_binary_string(env, dataframe_type_name(type)))) {
            return 0;
        }
        *out = marker;
        return 1;
    }

    bool made = true;
    switch (type) {
    case DATAFRAME_COLUMN_LOGICAL:
        *out = enif_make_atom(env, cell->logical_value ? "true" : "false");
        break;
    case DATAFRAME_COLUMN_INTEGER:
        *out = enif_make_int(env, cell->integer_value);
        break;
    case DATAFRAME_COLUMN_DOUBLE:
        *out = enif_make_double(env, cell->double_value);
        break;
    case DATAFRAME_COLUMN_CHARACTER:
        *out = make_binary_term(env, cell->string_value, cell->string_value_len, &made);
        break;
    default:
        return 0;
    }
    return made ? 1 : 0;
}
/*
 * Builds an Erlang list holding every cell of `column`, in order.
 * Returns 1 with the list in *out, or 0 when the scratch array cannot be
 * allocated or a cell cannot be converted (*out is then left untouched).
 */
static int make_dataframe_values_list(ErlNifEnv *env, const dataframe_column *column, ERL_NIF_TERM *out) {
    if (column->value_count == 0) {
        *out = enif_make_list(env, 0);
        return 1;
    }

    ERL_NIF_TERM *cells = enif_alloc((size_t)column->value_count * sizeof(ERL_NIF_TERM));
    if (cells == NULL) {
        return 0;
    }

    int built = 1;
    for (unsigned i = 0; built && i < column->value_count; i++) {
        built = make_dataframe_cell_term(env, &column->values[i], column->type, &cells[i]) != 0;
    }
    if (built) {
        *out = enif_make_list_from_array(env, cells, column->value_count);
    }
    /* The scratch array is released on both the success and failure paths. */
    enif_free(cells);
    return built;
}
/*
 * Builds the map representation of a data-frame wire value in *out.
 *
 * The map uses binary keys: "kind" (<<"data_frame">>), "names", "n_rows"
 * and "columns"; each column is itself a map with "name", "type" and
 * "values". Returns 1 on success and 0 if any part could not be built.
 */
static int make_dataframe_wire_term(ErlNifEnv *env, const dataframe_wire *wire, ERL_NIF_TERM *out) {
    /* Lists are assembled back to front so the final order matches the arrays. */
    ERL_NIF_TERM name_list = enif_make_list(env, 0);
    unsigned name_index = wire->name_count;
    while (name_index-- > 0) {
        ERL_NIF_TERM name = make_binary_string(env, wire->names[name_index]);
        name_list = enif_make_list_cell(env, name, name_list);
    }

    ERL_NIF_TERM column_list = enif_make_list(env, 0);
    unsigned column_index = wire->column_count;
    while (column_index-- > 0) {
        const dataframe_column *column = &wire->columns[column_index];

        ERL_NIF_TERM values;
        if (!make_dataframe_values_list(env, column, &values)) {
            return 0;
        }

        ERL_NIF_TERM entry = enif_make_new_map(env);
        if (!map_put_binary_key(env, &entry, "name", make_binary_string(env, column->name))) {
            return 0;
        }
        if (!map_put_binary_key(env, &entry, "type", make_binary_string(env, dataframe_type_name(column->type)))) {
            return 0;
        }
        if (!map_put_binary_key(env, &entry, "values", values)) {
            return 0;
        }
        column_list = enif_make_list_cell(env, entry, column_list);
    }

    ERL_NIF_TERM result = enif_make_new_map(env);
    if (!map_put_binary_key(env, &result, "kind", make_binary_string(env, "data_frame"))) {
        return 0;
    }
    if (!map_put_binary_key(env, &result, "names", name_list)) {
        return 0;
    }
    if (!map_put_binary_key(env, &result, "n_rows", enif_make_uint(env, wire->n_rows))) {
        return 0;
    }
    if (!map_put_binary_key(env, &result, "columns", column_list)) {
        return 0;
    }

    *out = result;
    return 1;
}
/*
 * Builds a map describing an init_config with atom keys r_home,
 * lib_r_path and lib_paths; path values go through make_binary_string().
 */
static ERL_NIF_TERM make_init_config_map(ErlNifEnv *env, const init_config *config) {
    ERL_NIF_TERM result = enif_make_new_map(env);

    /* Cons from the last path backwards so the list keeps array order. */
    ERL_NIF_TERM paths = enif_make_list(env, 0);
    unsigned index = config->lib_paths_count;
    while (index-- > 0) {
        ERL_NIF_TERM path = make_binary_string(env, config->lib_paths[index]);
        paths = enif_make_list_cell(env, path, paths);
    }

    ERL_NIF_TERM r_home = make_binary_string(env, config->r_home);
    enif_make_map_put(env, result, enif_make_atom(env, "r_home"), r_home, &result);

    ERL_NIF_TERM lib_r_path = make_binary_string(env, config->lib_r_path);
    enif_make_map_put(env, result, enif_make_atom(env, "lib_r_path"), lib_r_path, &result);

    enif_make_map_put(env, result, enif_make_atom(env, "lib_paths"), paths, &result);
    return result;
}
/* Returns the config map, or the atom `nil` when r_home or lib_r_path is unset. */
static ERL_NIF_TERM make_optional_config(ErlNifEnv *env, const init_config *config) {
    bool configured = config->r_home != NULL && config->lib_r_path != NULL;
    if (!configured) {
        return enif_make_atom(env, "nil");
    }
    return make_init_config_map(env, config);
}
/*
 * Builds the list of mismatching config fields as atoms, in the order
 * r_home, lib_r_path, lib_paths (only the flagged ones are included).
 */
static ERL_NIF_TERM make_mismatch_list(ErlNifEnv *env, const init_mismatch_info *mismatch) {
    /* Prepend in reverse so the resulting list reads in the documented order. */
    ERL_NIF_TERM fields = enif_make_list(env, 0);

    if (mismatch->lib_paths) {
        fields = enif_make_list_cell(env, enif_make_atom(env, "lib_paths"), fields);
    }
    if (mismatch->lib_r_path) {
        fields = enif_make_list_cell(env, enif_make_atom(env, "lib_r_path"), fields);
    }
    if (mismatch->r_home) {
        fields = enif_make_list_cell(env, enif_make_atom(env, "r_home"), fields);
    }
    return fields;
}
/*
 * Builds {error, {native_init_mismatch, Info}} where Info is a map with
 * atom keys message, mismatches, current, requested and restart_required
 * (always true).
 */
static ERL_NIF_TERM make_native_init_mismatch_error(ErlNifEnv *env, const init_mismatch_info *mismatch) {
    ERL_NIF_TERM info = enif_make_new_map(env);

    ERL_NIF_TERM message = make_binary_string(env, mismatch->message);
    enif_make_map_put(env, info, enif_make_atom(env, "message"), message, &info);

    ERL_NIF_TERM fields = make_mismatch_list(env, mismatch);
    enif_make_map_put(env, info, enif_make_atom(env, "mismatches"), fields, &info);

    ERL_NIF_TERM current = make_init_config_map(env, &mismatch->current);
    enif_make_map_put(env, info, enif_make_atom(env, "current"), current, &info);

    ERL_NIF_TERM requested = make_init_config_map(env, &mismatch->requested);
    enif_make_map_put(env, info, enif_make_atom(env, "requested"), requested, &info);

    enif_make_map_put(env, info, enif_make_atom(env, "restart_required"), enif_make_atom(env, "true"), &info);

    ERL_NIF_TERM detail = enif_make_tuple2(env, enif_make_atom(env, "native_init_mismatch"), info);
    return enif_make_tuple2(env, atom_error, detail);
}
/* Maps an init_state_tag to an atom; anything unrecognized reads as `uninitialized`. */
static ERL_NIF_TERM init_state_atom(ErlNifEnv *env, init_state_tag state) {
    const char *name = "uninitialized";

    if (state == INIT_STATE_INITIALIZED) {
        name = "initialized";
    } else if (state == INIT_STATE_FAILED) {
        name = "failed";
    }
    return enif_make_atom(env, name);
}
/*
 * Returns the atom `nil` when has_failure is false; otherwise a map with
 * atom keys stage (atom), message (binary), retryable and restart_required
 * (boolean atoms) taken from `failure`.
 */
static ERL_NIF_TERM make_optional_init_failure(ErlNifEnv *env, const init_failure_info *failure, bool has_failure) {
    if (!has_failure) {
        return enif_make_atom(env, "nil");
    }

    ERL_NIF_TERM info = enif_make_new_map(env);

    ERL_NIF_TERM stage = enif_make_atom(env, failure->stage);
    enif_make_map_put(env, info, enif_make_atom(env, "stage"), stage, &info);

    ERL_NIF_TERM message = make_binary_string(env, failure->message);
    enif_make_map_put(env, info, enif_make_atom(env, "message"), message, &info);

    ERL_NIF_TERM retryable = enif_make_atom(env, failure->retryable ? "true" : "false");
    enif_make_map_put(env, info, enif_make_atom(env, "retryable"), retryable, &info);

    ERL_NIF_TERM restart = enif_make_atom(env, failure->restart_required ? "true" : "false");
    enif_make_map_put(env, info, enif_make_atom(env, "restart_required"), restart, &info);

    return info;
}
/* Builds {error, {native_init_failed, FailureMap}} for the given failure record. */
static ERL_NIF_TERM make_native_init_failed_error(ErlNifEnv *env, const init_failure_info *failure) {
    ERL_NIF_TERM tag = enif_make_atom(env, "native_init_failed");
    ERL_NIF_TERM info = make_optional_init_failure(env, failure, true);
    ERL_NIF_TERM detail = enif_make_tuple2(env, tag, info);
    return enif_make_tuple2(env, atom_error, detail);
}
/* Adds one atom-keyed entry to the diagnostics map, updating *map in place. */
static void diagnostics_put(ErlNifEnv *env, ERL_NIF_TERM *map, const char *key, ERL_NIF_TERM value) {
    enif_make_map_put(env, *map, enif_make_atom(env, key), value, map);
}

/*
 * NIF taking no arguments: returns a map snapshot of the init state,
 * init configs, last init error, and the init/release counters.
 * The whole snapshot is built while holding diagnostics_mutex.
 */
static ERL_NIF_TERM rx_diagnostics(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    (void)argv;
    if (argc != 0) {
        return enif_make_badarg(env);
    }

    pthread_mutex_lock(&diagnostics_mutex);
    ERL_NIF_TERM report = enif_make_new_map(env);

    diagnostics_put(env, &report, "init_state", init_state_atom(env, init_state));
    diagnostics_put(env, &report, "init_config", make_optional_config(env, &successful_init_config));
    diagnostics_put(env, &report, "attempted_init_config", make_optional_config(env, &attempted_init_config));
    diagnostics_put(env, &report, "last_init_error",
                    make_optional_init_failure(env, &last_init_error, has_last_init_error));
    diagnostics_put(env, &report, "init_attempt_count", enif_make_uint64(env, init_attempt_count));
    diagnostics_put(env, &report, "init_mismatch_count", enif_make_uint64(env, init_mismatch_count));
    diagnostics_put(env, &report, "owner_thread_id", enif_make_uint64(env, owner_thread_id_value));
    diagnostics_put(env, &report, "release_enqueued_count", enif_make_uint64(env, release_enqueued_count));
    diagnostics_put(env, &report, "release_success_count", enif_make_uint64(env, release_success_count));
    diagnostics_put(env, &report, "release_failure_count", enif_make_uint64(env, release_failure_count));
    diagnostics_put(env, &report, "release_skipped_uninitialized_count",
                    enif_make_uint64(env, release_skipped_uninitialized_count));
    diagnostics_put(env, &report, "last_release_thread_id", enif_make_uint64(env, last_release_thread_id_value));

    ERL_NIF_TERM last_id = has_last_release_resource_id
                               ? enif_make_uint64(env, last_release_resource_id_value)
                               : enif_make_atom(env, "nil");
    diagnostics_put(env, &report, "last_release_resource_id", last_id);

    ERL_NIF_TERM last_error = last_release_error[0] != '\0'
                                  ? make_binary_string(env, last_release_error)
                                  : enif_make_atom(env, "nil");
    diagnostics_put(env, &report, "last_release_error", last_error);

    pthread_mutex_unlock(&diagnostics_mutex);
    return report;
}
/*
 * Stores work->result_length in *out as an unsigned count.
 * Returns 0 (leaving *out untouched) when the length is negative or does
 * not fit in an unsigned int, 1 otherwise.
 */
static int result_list_count(r_work *work, unsigned *out) {
    bool in_range = work->result_length >= 0 && (uint64_t)work->result_length <= UINT_MAX;
    if (!in_range) {
        return 0;
    }
    *out = (unsigned)work->result_length;
    return 1;
}
/*
 * Builds an Erlang list from the vector result stored in `work`.
 *
 * `kind` selects which array is read (integer_values, double_values,
 * logical_values or string_values). An empty result yields an empty list
 * for any kind. Returns 1 with the list in *out, or 0 when the length is
 * out of range, allocation fails, an element cannot be converted, or
 * `kind` is not a vector kind.
 */
static int make_result_list(ErlNifEnv *env, r_work *work, r_result_kind kind, ERL_NIF_TERM *out) {
    unsigned count;
    if (!result_list_count(work, &count)) {
        return 0;
    }
    if (count == 0) {
        *out = enif_make_list(env, 0);
        return 1;
    }

    ERL_NIF_TERM *elements = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
    if (elements == NULL) {
        return 0;
    }

    int built = 1;
    for (unsigned i = 0; built && i < count; i++) {
        switch (kind) {
        case RESULT_INTEGER_VECTOR:
            elements[i] = enif_make_int(env, work->integer_values[i]);
            break;
        case RESULT_DOUBLE_VECTOR:
            elements[i] = enif_make_double(env, work->double_values[i]);
            break;
        case RESULT_LOGICAL_VECTOR:
            elements[i] = enif_make_atom(env, work->logical_values[i] ? "true" : "false");
            break;
        case RESULT_CHARACTER_VECTOR: {
            bool made = false;
            elements[i] = make_binary_term(env, work->string_values[i], work->string_value_lens[i], &made);
            built = made ? 1 : 0;
            break;
        }
        default:
            built = 0;
            break;
        }
    }

    if (built) {
        *out = enif_make_list_from_array(env, elements, count);
    }
    enif_free(elements);
    return built;
}
/* Returns the atom naming a scalar atomic type; `unknown` for unrecognized values. */
static ERL_NIF_TERM decoded_atomic_atom(ErlNifEnv *env, decoded_atomic_type type) {
    const char *name = "unknown";

    switch (type) {
    case DECODE_ATOMIC_LOGICAL:
        name = "logical";
        break;
    case DECODE_ATOMIC_INTEGER:
        name = "integer";
        break;
    case DECODE_ATOMIC_DOUBLE:
        name = "double";
        break;
    case DECODE_ATOMIC_CHARACTER:
        name = "character";
        break;
    default:
        break;
    }
    return enif_make_atom(env, name);
}
/* Returns the atom naming a vector of an atomic type; `unknown_vector` for unrecognized values. */
static ERL_NIF_TERM decoded_vector_atom(ErlNifEnv *env, decoded_atomic_type type) {
    const char *name = "unknown_vector";

    switch (type) {
    case DECODE_ATOMIC_LOGICAL:
        name = "logical_vector";
        break;
    case DECODE_ATOMIC_INTEGER:
        name = "integer_vector";
        break;
    case DECODE_ATOMIC_DOUBLE:
        name = "double_vector";
        break;
    case DECODE_ATOMIC_CHARACTER:
        name = "character_vector";
        break;
    default:
        break;
    }
    return enif_make_atom(env, name);
}
/* Forward declaration: the list builders below and make_decoded_term() call each other. */
static int make_decoded_term(ErlNifEnv *env, decoded_value *decoded, ERL_NIF_TERM *out);
/*
 * Converts `count` decoded values into an Erlang list, preserving order.
 * Returns 1 with the list in *out, or 0 when allocation or any element
 * conversion fails (*out is then left untouched).
 */
static int make_decoded_value_list(ErlNifEnv *env, decoded_value **values, unsigned count, ERL_NIF_TERM *out) {
    if (count == 0) {
        *out = enif_make_list(env, 0);
        return 1;
    }

    ERL_NIF_TERM *elements = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
    if (elements == NULL) {
        return 0;
    }

    int built = 1;
    for (unsigned i = 0; built && i < count; i++) {
        built = make_decoded_term(env, values[i], &elements[i]) != 0;
    }
    if (built) {
        *out = enif_make_list_from_array(env, elements, count);
    }
    enif_free(elements);
    return built;
}
/*
 * Converts `count` decoded entries into a list of {Name, Value} tuples.
 * Name is a binary, or the atom `nil` for entries without a name.
 * Returns 1 with the list in *out, or 0 when allocation or any conversion
 * fails (*out is then left untouched).
 */
static int make_decoded_entry_list(ErlNifEnv *env, decoded_entry *entries, unsigned count, ERL_NIF_TERM *out) {
    if (count == 0) {
        *out = enif_make_list(env, 0);
        return 1;
    }

    ERL_NIF_TERM *pairs = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
    if (pairs == NULL) {
        return 0;
    }

    int built = 1;
    for (unsigned i = 0; i < count; i++) {
        decoded_entry *entry = &entries[i];

        ERL_NIF_TERM label = enif_make_atom(env, "nil");
        if (entry->name != NULL) {
            bool made = false;
            label = make_binary_term(env, entry->name, strlen(entry->name), &made);
            if (!made) {
                built = 0;
                break;
            }
        }

        ERL_NIF_TERM value;
        if (!make_decoded_term(env, entry->value, &value)) {
            built = 0;
            break;
        }
        pairs[i] = enif_make_tuple2(env, label, value);
    }

    if (built) {
        *out = enif_make_list_from_array(env, pairs, count);
    }
    enif_free(pairs);
    return built;
}
/*
 * Converts a decoded value tree into a {Tag, Payload} term stored in *out.
 *
 * The tag is chosen from decoded->kind (null, logical, integer, double,
 * character, na, <type>_vector, named_list, r_list, object); vectors and
 * lists recurse through the list builders. Returns 1 on success and 0 for
 * a NULL input, an unrecognized kind, an object without a resource, or
 * any nested conversion failure.
 */
static int make_decoded_term(ErlNifEnv *env, decoded_value *decoded, ERL_NIF_TERM *out) {
    if (decoded == NULL) {
        return 0;
    }

    ERL_NIF_TERM tag;
    ERL_NIF_TERM payload;

    switch (decoded->kind) {
    case DECODE_VALUE_NULL:
        tag = enif_make_atom(env, "null");
        payload = enif_make_atom(env, "nil");
        break;
    case DECODE_VALUE_LOGICAL:
        tag = enif_make_atom(env, "logical");
        payload = enif_make_atom(env, decoded->logical_value ? "true" : "false");
        break;
    case DECODE_VALUE_INTEGER:
        tag = enif_make_atom(env, "integer");
        payload = enif_make_int(env, decoded->integer_value);
        break;
    case DECODE_VALUE_DOUBLE:
        tag = enif_make_atom(env, "double");
        payload = enif_make_double(env, decoded->double_value);
        break;
    case DECODE_VALUE_CHARACTER: {
        bool made = false;
        payload = make_binary_term(env, decoded->string_value, decoded->string_value_len, &made);
        if (!made) {
            return 0;
        }
        tag = enif_make_atom(env, "character");
        break;
    }
    case DECODE_VALUE_NA:
        tag = enif_make_atom(env, "na");
        payload = decoded_atomic_atom(env, decoded->atomic_type);
        break;
    case DECODE_VALUE_VECTOR:
        if (!make_decoded_value_list(env, decoded->items, decoded->count, &payload)) {
            return 0;
        }
        tag = decoded_vector_atom(env, decoded->atomic_type);
        break;
    case DECODE_VALUE_NAMED_LIST:
        if (!make_decoded_entry_list(env, decoded->entries, decoded->count, &payload)) {
            return 0;
        }
        tag = enif_make_atom(env, "named_list");
        break;
    case DECODE_VALUE_R_LIST:
        if (!make_decoded_entry_list(env, decoded->entries, decoded->count, &payload)) {
            return 0;
        }
        tag = enif_make_atom(env, "r_list");
        break;
    case DECODE_VALUE_OBJECT:
        if (decoded->resource == NULL) {
            return 0;
        }
        tag = enif_make_atom(env, "object");
        payload = enif_make_resource(env, decoded->resource);
        break;
    default:
        return 0;
    }

    *out = enif_make_tuple2(env, tag, payload);
    return 1;
}
/*
 * Builds the reply for a successful string evaluation.
 *
 * When work->decoded_result is set it takes precedence and is converted
 * with make_decoded_term(). Otherwise work->result_kind selects a
 * {Tag, Value} payload. The normal reply is {ok, {Tag, Value}}; allocation
 * failures, unsupported results and missing results yield error tuples.
 */
static ERL_NIF_TERM make_eval_success(ErlNifEnv *env, r_work *work) {
    if (work->decoded_result != NULL) {
        ERL_NIF_TERM decoded;
        if (!make_decoded_term(env, work->decoded_result, &decoded)) {
            return make_error(env, "failed to allocate native decode result");
        }
        return enif_make_tuple2(env, atom_ok, decoded);
    }

    const char *tag = NULL;
    ERL_NIF_TERM payload;

    switch (work->result_kind) {
    case RESULT_NULL:
        tag = "null";
        payload = enif_make_atom(env, "nil");
        break;
    case RESULT_INTEGER:
        tag = "integer";
        payload = enif_make_int(env, work->integer_result);
        break;
    case RESULT_DOUBLE:
        tag = "double";
        payload = enif_make_double(env, work->double_result);
        break;
    case RESULT_LOGICAL:
        tag = "logical";
        payload = enif_make_atom(env, work->logical_result ? "true" : "false");
        break;
    case RESULT_CHARACTER: {
        bool made = false;
        payload = make_binary_term(env, work->string_result, work->string_result_len, &made);
        if (!made) {
            return make_error(env, "failed to allocate character binary");
        }
        tag = "character";
        break;
    }
    case RESULT_INTEGER_VECTOR:
        if (!make_result_list(env, work, RESULT_INTEGER_VECTOR, &payload)) {
            return make_error(env, "failed to allocate integer vector result");
        }
        tag = "integer_vector";
        break;
    case RESULT_DOUBLE_VECTOR:
        if (!make_result_list(env, work, RESULT_DOUBLE_VECTOR, &payload)) {
            return make_error(env, "failed to allocate double vector result");
        }
        tag = "double_vector";
        break;
    case RESULT_LOGICAL_VECTOR:
        if (!make_result_list(env, work, RESULT_LOGICAL_VECTOR, &payload)) {
            return make_error(env, "failed to allocate logical vector result");
        }
        tag = "logical_vector";
        break;
    case RESULT_CHARACTER_VECTOR:
        if (!make_result_list(env, work, RESULT_CHARACTER_VECTOR, &payload)) {
            return make_error(env, "failed to allocate character vector result");
        }
        tag = "character_vector";
        break;
    case RESULT_UNSUPPORTED:
        return make_tagged_error(env, "unsupported_r_value", "R result type is not supported by the native harness");
    case RESULT_NONE:
    default:
        return make_error(env, "native eval did not produce a result");
    }

    ERL_NIF_TERM tagged = enif_make_tuple2(env, enif_make_atom(env, tag), payload);
    return enif_make_tuple2(env, atom_ok, tagged);
}
/*
 * Converts one captured output buffer to a binary. A NULL data pointer is
 * read as the empty string; if the binary cannot be created an empty
 * Latin-1 string is returned instead.
 */
static ERL_NIF_TERM make_output_text_term(ErlNifEnv *env, const char *data, size_t len) {
    bool made = false;
    ERL_NIF_TERM text = make_binary_term(env, data == NULL ? "" : data, len, &made);
    return made ? text : enif_make_string(env, "", ERL_NIF_LATIN1);
}

/* Builds a map with atom keys stdout, messages and warnings from the work item's capture buffers. */
static ERL_NIF_TERM make_output_map(ErlNifEnv *env, r_work *work) {
    ERL_NIF_TERM captured = enif_make_new_map(env);

    ERL_NIF_TERM stdout_text =
        make_output_text_term(env, work->stdout_buffer.data, work->stdout_buffer.len);
    ERL_NIF_TERM messages_text =
        make_output_text_term(env, work->messages_buffer.data, work->messages_buffer.len);
    ERL_NIF_TERM warnings_text =
        make_output_text_term(env, work->warnings_buffer.data, work->warnings_buffer.len);

    enif_make_map_put(env, captured, enif_make_atom(env, "stdout"), stdout_text, &captured);
    enif_make_map_put(env, captured, enif_make_atom(env, "messages"), messages_text, &captured);
    enif_make_map_put(env, captured, enif_make_atom(env, "warnings"), warnings_text, &captured);
    return captured;
}
/*
 * Converts an array of C strings to a list of binaries.
 * Returns the atom `nil` for an empty array or when the scratch array
 * cannot be allocated. An element that cannot become a binary is added as
 * a Latin-1 string instead.
 */
static ERL_NIF_TERM make_string_list_term(ErlNifEnv *env, char **items, unsigned count) {
    if (count == 0) {
        return enif_make_atom(env, "nil");
    }

    ERL_NIF_TERM *elements = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
    if (elements == NULL) {
        return enif_make_atom(env, "nil");
    }

    for (unsigned i = 0; i < count; i++) {
        bool made = false;
        ERL_NIF_TERM binary = make_binary_term(env, items[i], strlen(items[i]), &made);
        elements[i] = made ? binary : enif_make_string(env, items[i], ERL_NIF_LATIN1);
    }

    ERL_NIF_TERM result = enif_make_list_from_array(env, elements, count);
    enif_free(elements);
    return result;
}
/*
 * Builds the list of {Name, Resource} tuples for the globals returned by
 * an evaluation. Returns 1 with the list in *out, or 0 when allocation or
 * a name conversion fails (*out is then left untouched).
 */
static int make_eval_global_list(ErlNifEnv *env, r_work *work, ERL_NIF_TERM *out) {
    if (work->result_global_count == 0) {
        *out = enif_make_list(env, 0);
        return 1;
    }

    ERL_NIF_TERM *pairs = enif_alloc((size_t)work->result_global_count * sizeof(ERL_NIF_TERM));
    if (pairs == NULL) {
        return 0;
    }

    int built = 1;
    for (unsigned i = 0; i < work->result_global_count; i++) {
        bool made = false;
        ERL_NIF_TERM label =
            make_binary_term(env, work->result_global_names[i], strlen(work->result_global_names[i]), &made);
        if (!made) {
            built = 0;
            break;
        }
        ERL_NIF_TERM handle = enif_make_resource(env, work->result_global_resources[i]);
        pairs[i] = enif_make_tuple2(env, label, handle);
    }

    if (built) {
        *out = enif_make_list_from_array(env, pairs, work->result_global_count);
    }
    enif_free(pairs);
    return built;
}
/*
 * Builds {ok, {Result, Globals, Output}} for a native evaluation.
 * Result is the result resource term or `nil`; Globals comes from
 * make_eval_global_list() and Output from make_output_map().
 * Returns an error tuple if the globals list cannot be built.
 */
static ERL_NIF_TERM make_native_eval_success(ErlNifEnv *env, r_work *work) {
    ERL_NIF_TERM result_term;
    if (work->result_resource != NULL) {
        result_term = enif_make_resource(env, work->result_resource);
    } else {
        result_term = enif_make_atom(env, "nil");
    }

    ERL_NIF_TERM global_list;
    if (!make_eval_global_list(env, work, &global_list)) {
        return make_error(env, "failed to allocate native eval globals result");
    }

    ERL_NIF_TERM output = make_output_map(env, work);
    return enif_make_tuple2(env, atom_ok, enif_make_tuple3(env, result_term, global_list, output));
}
/*
 * Builds a list with one binary per rendered plot page.
 * Returns 1 with the list in *out, or 0 when allocation or a page
 * conversion fails (*out is then left untouched).
 */
static int make_plot_page_list(ErlNifEnv *env, r_work *work, ERL_NIF_TERM *out) {
    if (work->plot_page_count == 0) {
        *out = enif_make_list(env, 0);
        return 1;
    }

    ERL_NIF_TERM *pages = enif_alloc((size_t)work->plot_page_count * sizeof(ERL_NIF_TERM));
    if (pages == NULL) {
        return 0;
    }

    int built = 1;
    for (unsigned i = 0; built && i < work->plot_page_count; i++) {
        bool made = false;
        pages[i] = make_binary_term(env, work->plot_pages[i], work->plot_page_lens[i], &made);
        built = made ? 1 : 0;
    }

    if (built) {
        *out = enif_make_list_from_array(env, pages, work->plot_page_count);
    }
    enif_free(pages);
    return built;
}
/*
 * Builds {ok, {Width, Height, Pages, Output}} for a plot request, or an
 * error tuple when the page list cannot be built.
 */
static ERL_NIF_TERM make_native_plot_success(ErlNifEnv *env, r_work *work) {
    ERL_NIF_TERM page_list;
    if (!make_plot_page_list(env, work, &page_list)) {
        return make_error(env, "failed to allocate native plot pages result");
    }

    ERL_NIF_TERM width = enif_make_int(env, work->plot_width);
    ERL_NIF_TERM height = enif_make_int(env, work->plot_height);
    ERL_NIF_TERM output = make_output_map(env, work);
    return enif_make_tuple2(env, atom_ok, enif_make_tuple4(env, width, height, page_list, output));
}
/*
 * Builds {error, Map} describing an R evaluation error.
 * Map has atom keys message, r_class, call (binary or `nil` when empty),
 * traceback (always the empty list here) and output. Text that cannot be
 * turned into a binary is included as a Latin-1 string instead.
 */
static ERL_NIF_TERM make_native_eval_error(ErlNifEnv *env, r_work *work) {
    ERL_NIF_TERM report = enif_make_new_map(env);

    bool message_made = false;
    ERL_NIF_TERM message_term =
        make_binary_term(env, work->error_message, strlen(work->error_message), &message_made);
    if (!message_made) {
        message_term = enif_make_string(env, work->error_message, ERL_NIF_LATIN1);
    }

    ERL_NIF_TERM call_term = enif_make_atom(env, "nil");
    if (work->error_call[0] != '\0') {
        bool call_made = false;
        call_term = make_binary_term(env, work->error_call, strlen(work->error_call), &call_made);
        if (!call_made) {
            call_term = enif_make_string(env, work->error_call, ERL_NIF_LATIN1);
        }
    }

    ERL_NIF_TERM classes = make_string_list_term(env, work->error_classes, work->error_class_count);

    enif_make_map_put(env, report, enif_make_atom(env, "message"), message_term, &report);
    enif_make_map_put(env, report, enif_make_atom(env, "r_class"), classes, &report);
    enif_make_map_put(env, report, enif_make_atom(env, "call"), call_term, &report);
    enif_make_map_put(env, report, enif_make_atom(env, "traceback"), enif_make_list(env, 0), &report);
    enif_make_map_put(env, report, enif_make_atom(env, "output"), make_output_map(env, work), &report);
    return enif_make_tuple2(env, atom_error, report);
}
/*
 * Resource destructor for rx_object_resource.
 *
 * Instead of releasing the R object here, it allocates a WORK_RELEASE
 * item carrying the resource's sexp and id and hands it to the owner
 * thread's queue. Nothing is enqueued when the resource has no sexp or a
 * release was already enqueued. The resource mutex is destroyed on every
 * path; failures are recorded through note_release_failure().
 */
static void rx_object_resource_dtor(ErlNifEnv *env, void *obj) {
(void)env;
rx_object_resource *resource = (rx_object_resource *)obj;
pthread_mutex_lock(&resource->mutex);
/* Copy what the release work needs while the lock is held. */
SEXP sexp = resource->sexp;
uint64_t resource_id = resource->id;
if (resource->release_enqueued || sexp == NULL) {
/* Nothing to release: either already handed off or never populated. */
pthread_mutex_unlock(&resource->mutex);
pthread_mutex_destroy(&resource->mutex);
return;
}
r_work *work = calloc(1, sizeof(r_work));
if (work == NULL) {
note_release_failure(resource_id, "failed to allocate native resource release work");
pthread_mutex_unlock(&resource->mutex);
pthread_mutex_destroy(&resource->mutex);
return;
}
resource->release_enqueued = true;
pthread_mutex_unlock(&resource->mutex);
pthread_mutex_destroy(&resource->mutex);
work->kind = WORK_RELEASE;
work->release_sexp = sexp;
work->release_resource_id = resource_id;
work->next = NULL;
if (!ensure_owner_thread()) {
/* No owner thread to run the release: record the failure and drop the work item. */
note_release_failure(resource_id, "failed to start native R owner thread for release");
free(work);
return;
}
note_release_enqueued(resource_id);
enqueue_work(work);
}
static ERL_NIF_TERM rx_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
if (argc != 3) return enif_make_badarg(env);
r_work work = {.kind = WORK_INIT, .done = false, .ok = false, .next = NULL};
if (!get_string(env, argv[0], &work.lib_r_path) ||
!get_string(env, argv[1], &work.r_home) ||
!get_string_list(env, argv[2], &work.lib_paths, &work.lib_paths_count)) {
free(work.lib_r_path);
free(work.r_home);
free_string_array(work.lib_paths, work.lib_paths_count);
return enif_make_badarg(env);
}
if (!ensure_owner_thread()) {
free(work.lib_r_path);
free(work.r_home);
free_string_array(work.lib_paths, work.lib_paths_count);
return make_error(env, "failed to start embedded R owner thread");
}
pthread_cond_init(&work.done_cond, NULL);
enqueue_work(&work);
pthread_mutex_lock(&queue_mutex);
while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
pthread_mutex_unlock(&queue_mutex);
pthread_cond_destroy(&work.done_cond);
free(work.lib_r_path);
free(work.r_home);
free_string_array(work.lib_paths, work.lib_paths_count);
ERL_NIF_TERM result;
if (work.ok) {
result = atom_ok;
} else if (work.init_return == INIT_RETURN_MISMATCH) {
result = make_native_init_mismatch_error(env, &work.init_mismatch);
} else if (work.init_return == INIT_RETURN_FAILED) {
result = make_native_init_failed_error(env, &work.init_failure);
} else {
result = make_error(env, work.error);
}
free_init_mismatch(&work.init_mismatch);
return result;
}
/*
 * NIF `eval_string/1`: submits argv[0] as the source of a WORK_EVAL_STRING
 * request and waits for its completion.
 *
 * Returns badarg for a wrong arity or when get_string() rejects argv[0].
 * On failure the error text is compared verbatim: "R evaluation failed" is
 * tagged `r_error`, "R parse failed" is tagged `parse_error`, anything else
 * becomes a plain error term.
 */
static ERL_NIF_TERM rx_eval_string(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 1) {
        return enif_make_badarg(env);
    }

    r_work work = {.kind = WORK_EVAL_STRING, .done = false, .ok = false, .next = NULL};

    if (!get_string(env, argv[0], &work.source)) {
        return enif_make_badarg(env);
    }
    if (!ensure_owner_thread()) {
        free(work.source);
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Queue the stack-allocated request and block until `done` is set. */
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);

    free(work.source);

    ERL_NIF_TERM reply;
    if (work.ok) {
        reply = make_eval_success(env, &work);
    } else if (work.init_return == INIT_RETURN_FAILED) {
        reply = make_native_init_failed_error(env, &work.init_failure);
    } else if (strcmp(work.error, "R evaluation failed") == 0) {
        reply = make_tagged_error(env, "r_error", work.error);
    } else if (strcmp(work.error, "R parse failed") == 0) {
        reply = make_tagged_error(env, "parse_error", work.error);
    } else {
        reply = make_error(env, work.error);
    }

    free_result_value(&work);
    return reply;
}
/*
 * NIF `eval/2`: evaluates a source string with a list of named globals.
 *
 * argv[0] is the source, decoded with get_string_reject_nul(); argv[1] is a
 * list whose elements must each be a 2-tuple of a name (decoded with
 * get_global_name()) and an object resource. Every resource taken from the
 * list is kept for the duration of the request and released afterwards.
 *
 * Returns badarg for a wrong arity or malformed arguments, a plain error
 * term when the source contains a NUL byte or allocation fails, and
 * otherwise a term built from the completed work item.
 */
static ERL_NIF_TERM rx_eval(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 2) {
        return enif_make_badarg(env);
    }

    r_work work = {.kind = WORK_EVAL, .done = false, .ok = false, .next = NULL};

    bool nul_in_source = false;
    if (!get_string_reject_nul(env, argv[0], &work.source, &nul_in_source)) {
        return enif_make_badarg(env);
    }
    if (nul_in_source) {
        return make_error(env, "source contains NUL byte");
    }

    unsigned n_globals;
    if (!enif_get_list_length(env, argv[1], &n_globals)) {
        free(work.source);
        return enif_make_badarg(env);
    }
    work.global_count = n_globals;

    /* Both arrays are zero-filled so partial cleanup can tell used slots apart. */
    if (n_globals > 0) {
        work.global_names = calloc(n_globals, sizeof(char *));
        work.global_resources = calloc(n_globals, sizeof(rx_object_resource *));
        if (!work.global_names || !work.global_resources) {
            free(work.source);
            free_string_array(work.global_names, n_globals);
            free(work.global_resources);
            return make_error(env, "failed to allocate native eval globals");
        }
    }

    ERL_NIF_TERM cursor = argv[1];
    for (unsigned idx = 0; idx < n_globals; idx++) {
        ERL_NIF_TERM entry;
        const ERL_NIF_TERM *pair;
        int pair_arity;
        const bool entry_ok =
            enif_get_list_cell(env, cursor, &entry, &cursor) &&
            enif_get_tuple(env, entry, &pair_arity, &pair) &&
            pair_arity == 2 &&
            get_global_name(env, pair[0], &work.global_names[idx]) &&
            enif_get_resource(env, pair[1], object_resource_type,
                              (void **)&work.global_resources[idx]);
        if (!entry_ok) {
            free(work.source);
            free_string_array(work.global_names, n_globals);
            release_resource_array(work.global_resources, n_globals);
            return enif_make_badarg(env);
        }
        enif_keep_resource(work.global_resources[idx]);
    }

    if (!ensure_owner_thread()) {
        free(work.source);
        free_string_array(work.global_names, work.global_count);
        release_resource_array(work.global_resources, work.global_count);
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Queue the stack-allocated request and block until `done` is set. */
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);

    free(work.source);
    free_string_array(work.global_names, work.global_count);
    release_resource_array(work.global_resources, work.global_count);

    ERL_NIF_TERM reply;
    if (work.ok) {
        reply = make_native_eval_success(env, &work);
    } else if (work.structured_error) {
        reply = make_native_eval_error(env, &work);
    } else if (work.init_return == INIT_RETURN_FAILED) {
        reply = make_native_init_failed_error(env, &work.init_failure);
    } else if (strcmp(work.error, "R evaluation failed") == 0) {
        reply = make_tagged_error(env, "r_error", work.error);
    } else if (strcmp(work.error, "R parse failed") == 0) {
        reply = make_tagged_error(env, "parse_error", work.error);
    } else {
        reply = make_error(env, work.error);
    }

    /* Drop this function's reference to the result resource, if one was set. */
    if (work.result_resource != NULL) {
        enif_release_resource(work.result_resource);
    }
    free_eval_result_globals(&work);
    free_eval_capture(&work);
    return reply;
}
/*
 * NIF `print/3`: submits a WORK_PRINT request for an object resource.
 *
 * argv[0] must be an object resource; argv[1] and argv[2] are decoded with
 * get_optional_int() into the print width and max-print settings, each with
 * a flag recording whether a value was supplied.
 *
 * Returns badarg for a wrong arity or undecodable arguments; on success
 * returns {ok, Map} where the map comes from make_output_map().
 */
static ERL_NIF_TERM rx_print(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 3) {
        return enif_make_badarg(env);
    }

    rx_object_resource *target;
    if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&target)) {
        return enif_make_badarg(env);
    }

    bool width_given = false;
    int width_value = 0;
    bool max_print_given = false;
    int max_print_value = 0;
    const bool opts_ok =
        get_optional_int(env, argv[1], &width_given, &width_value) &&
        get_optional_int(env, argv[2], &max_print_given, &max_print_value);
    if (!opts_ok) {
        return enif_make_badarg(env);
    }

    r_work work = {
        .kind = WORK_PRINT,
        .done = false,
        .ok = false,
        .resource = target,
        .has_print_width = width_given,
        .print_width = width_value,
        .has_print_max_print = max_print_given,
        .print_max_print = max_print_value,
        .next = NULL};

    if (!ensure_owner_thread()) {
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Hold a reference on the resource while the request is outstanding. */
    enif_keep_resource(target);
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);
    enif_release_resource(target);

    ERL_NIF_TERM reply;
    if (work.ok) {
        reply = enif_make_tuple2(env, atom_ok, make_output_map(env, &work));
    } else if (work.structured_error) {
        reply = make_native_eval_error(env, &work);
    } else if (work.init_return == INIT_RETURN_FAILED) {
        reply = make_native_init_failed_error(env, &work.init_failure);
    } else {
        reply = make_error(env, work.error);
    }

    free_eval_capture(&work);
    return reply;
}
/*
 * Decodes `term` into `*out` via get_int32() and accepts it only when the
 * decoded value is strictly greater than zero.
 *
 * Returns 1 on success, 0 when decoding fails or the value is <= 0.
 */
static int get_required_positive_int(ErlNifEnv *env, ERL_NIF_TERM term, int *out) {
    return get_int32(env, term, out) && *out > 0;
}
/*
 * NIF `plot/8`: submits a WORK_PLOT request and waits for its completion.
 *
 * argv[0] is the source, decoded with get_string_reject_nul(); argv[1] is a
 * list whose elements must each be a 2-tuple of a name (decoded with
 * get_global_name()) and an object resource; argv[2..7] must be strictly
 * positive integers and fill, in order, plot_width, plot_height, plot_res,
 * plot_pointsize, plot_max_pages and plot_max_bytes.
 *
 * Returns badarg for a wrong arity or malformed arguments, a plain error
 * term when the source contains a NUL byte or allocation fails, and
 * otherwise a term built from the completed work item.
 */
static ERL_NIF_TERM rx_plot(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 8) {
        return enif_make_badarg(env);
    }

    r_work work = {.kind = WORK_PLOT, .done = false, .ok = false, .next = NULL};

    bool nul_in_source = false;
    if (!get_string_reject_nul(env, argv[0], &work.source, &nul_in_source)) {
        return enif_make_badarg(env);
    }
    if (nul_in_source) {
        return make_error(env, "source contains NUL byte");
    }

    unsigned n_globals;
    if (!enif_get_list_length(env, argv[1], &n_globals)) {
        free(work.source);
        return enif_make_badarg(env);
    }
    work.global_count = n_globals;

    /* Both arrays are zero-filled so partial cleanup can tell used slots apart. */
    if (n_globals > 0) {
        work.global_names = calloc(n_globals, sizeof(char *));
        work.global_resources = calloc(n_globals, sizeof(rx_object_resource *));
        if (!work.global_names || !work.global_resources) {
            free(work.source);
            free_string_array(work.global_names, n_globals);
            free(work.global_resources);
            return make_error(env, "failed to allocate native plot globals");
        }
    }

    ERL_NIF_TERM cursor = argv[1];
    for (unsigned idx = 0; idx < n_globals; idx++) {
        ERL_NIF_TERM entry;
        const ERL_NIF_TERM *pair;
        int pair_arity;
        const bool entry_ok =
            enif_get_list_cell(env, cursor, &entry, &cursor) &&
            enif_get_tuple(env, entry, &pair_arity, &pair) &&
            pair_arity == 2 &&
            get_global_name(env, pair[0], &work.global_names[idx]) &&
            enif_get_resource(env, pair[1], object_resource_type,
                              (void **)&work.global_resources[idx]);
        if (!entry_ok) {
            free(work.source);
            free_string_array(work.global_names, n_globals);
            release_resource_array(work.global_resources, n_globals);
            return enif_make_badarg(env);
        }
        enif_keep_resource(work.global_resources[idx]);
    }

    /* Device settings are validated left to right; the first bad one stops. */
    const bool settings_ok =
        get_required_positive_int(env, argv[2], &work.plot_width) &&
        get_required_positive_int(env, argv[3], &work.plot_height) &&
        get_required_positive_int(env, argv[4], &work.plot_res) &&
        get_required_positive_int(env, argv[5], &work.plot_pointsize) &&
        get_required_positive_int(env, argv[6], &work.plot_max_pages) &&
        get_required_positive_int(env, argv[7], &work.plot_max_bytes);
    if (!settings_ok) {
        free(work.source);
        free_string_array(work.global_names, work.global_count);
        release_resource_array(work.global_resources, work.global_count);
        return enif_make_badarg(env);
    }

    if (!ensure_owner_thread()) {
        free(work.source);
        free_string_array(work.global_names, work.global_count);
        release_resource_array(work.global_resources, work.global_count);
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Queue the stack-allocated request and block until `done` is set. */
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);

    free(work.source);
    free_string_array(work.global_names, work.global_count);
    release_resource_array(work.global_resources, work.global_count);

    ERL_NIF_TERM reply;
    if (work.ok) {
        reply = make_native_plot_success(env, &work);
    } else if (work.structured_error) {
        reply = make_native_eval_error(env, &work);
    } else if (work.init_return == INIT_RETURN_FAILED) {
        reply = make_native_init_failed_error(env, &work.init_failure);
    } else {
        reply = make_error(env, work.error);
    }

    free_eval_capture(&work);
    free_plot_pages(&work);
    return reply;
}
/*
 * NIF `encode/2`: parses an Erlang value description and asks for it to be
 * encoded into a freshly allocated object resource.
 *
 * argv[0] and argv[1] are handed to parse_encode_value(). A new
 * rx_object_resource is allocated with a NULL sexp, a reserved object id and
 * an initialized mutex, and a WORK_ENCODE_RESOURCE request is queued for it.
 *
 * Returns badarg for a wrong arity or an unparsable value, an error term when
 * allocation, owner-thread startup or the encode itself fails, and
 * {ok, Resource} on success.
 */
static ERL_NIF_TERM rx_encode(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 2) {
        return enif_make_badarg(env);
    }

    /*
     * Zero-initialize: the failure path below passes this struct to
     * free_encode_value(), which must never see indeterminate fields if
     * parse_encode_value() bails out before filling every member.
     */
    encode_value encoded = {0};
    if (!parse_encode_value(env, argv[0], argv[1], &encoded)) {
        free_encode_value(&encoded);
        return enif_make_badarg(env);
    }

    rx_object_resource *resource =
        enif_alloc_resource(object_resource_type, sizeof(rx_object_resource));
    if (resource == NULL) {
        free_encode_value(&encoded);
        return make_error(env, "failed to allocate native R object resource");
    }
    resource->sexp = NULL;
    resource->id = reserve_object_id();
    resource->kind = encode_kind_to_resource_kind(encoded.kind);
    resource->release_enqueued = false;
    pthread_mutex_init(&resource->mutex, NULL);

    /* The work item takes a copy of the parsed value; cleanup goes through it. */
    r_work work = {
        .kind = WORK_ENCODE_RESOURCE,
        .done = false,
        .ok = false,
        .encode = encoded,
        .resource = resource,
        .next = NULL};

    if (!ensure_owner_thread()) {
        enif_release_resource(resource);
        free_encode_value(&work.encode);
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Queue the stack-allocated request and block until `done` is set. */
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);

    if (!work.ok) {
        enif_release_resource(resource);
        free_encode_value(&work.encode);
        if (work.init_return == INIT_RETURN_FAILED) {
            return make_native_init_failed_error(env, &work.init_failure);
        }
        return make_error(env, work.error);
    }

    /* The term holds its own reference, so the allocation reference is dropped. */
    ERL_NIF_TERM resource_term = enif_make_resource(env, resource);
    enif_release_resource(resource);
    free_encode_value(&work.encode);
    return enif_make_tuple2(env, atom_ok, resource_term);
}
/*
 * NIF `decode/1`: submits a WORK_DECODE_RESOURCE request for the object
 * resource in argv[0] and waits for its completion.
 *
 * Returns badarg for a wrong arity or when argv[0] is not an object
 * resource; otherwise the term produced by make_eval_success() on success or
 * an error term on failure.
 */
static ERL_NIF_TERM rx_decode(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 1) {
        return enif_make_badarg(env);
    }

    rx_object_resource *target;
    if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&target)) {
        return enif_make_badarg(env);
    }

    r_work work = {
        .kind = WORK_DECODE_RESOURCE,
        .done = false,
        .ok = false,
        .resource = target,
        .next = NULL};

    if (!ensure_owner_thread()) {
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Hold a reference on the resource while the request is outstanding. */
    enif_keep_resource(target);
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);
    enif_release_resource(target);

    ERL_NIF_TERM reply;
    if (work.ok) {
        reply = make_eval_success(env, &work);
    } else if (work.init_return == INIT_RETURN_FAILED) {
        reply = make_native_init_failed_error(env, &work.init_failure);
    } else {
        reply = make_error(env, work.error);
    }

    free_result_value(&work);
    return reply;
}
/*
 * NIF `encode_dataframe/1`: submits the bytes of argv[0] (obtained through
 * copy_binary_bytes()) as a WORK_ENCODE_DATAFRAME request.
 *
 * Returns badarg for a wrong arity or when the bytes cannot be obtained.
 * On failure, an error text starting with "missing_r_package: " is returned
 * as a `missing_r_package` tagged error carrying the remainder of the text.
 * On success returns {ok, Resource} for the work item's result resource.
 */
static ERL_NIF_TERM rx_encode_dataframe(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    static const char missing_pkg_prefix[] = "missing_r_package: ";
    const size_t missing_pkg_len = sizeof missing_pkg_prefix - 1;

    if (argc != 1) {
        return enif_make_badarg(env);
    }

    r_work work = {
        .kind = WORK_ENCODE_DATAFRAME,
        .done = false,
        .ok = false,
        .next = NULL};

    if (!copy_binary_bytes(env, argv[0], &work.ipc_bytes, &work.ipc_len)) {
        return enif_make_badarg(env);
    }
    if (!ensure_owner_thread()) {
        free(work.ipc_bytes);
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Queue the stack-allocated request and block until `done` is set. */
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);

    free(work.ipc_bytes);

    if (work.ok) {
        /* The term holds its own reference, so ours is dropped afterwards. */
        ERL_NIF_TERM resource_term = enif_make_resource(env, work.result_resource);
        enif_release_resource(work.result_resource);
        return enif_make_tuple2(env, atom_ok, resource_term);
    }

    if (work.result_resource != NULL) {
        enif_release_resource(work.result_resource);
    }
    if (work.init_return == INIT_RETURN_FAILED) {
        return make_native_init_failed_error(env, &work.init_failure);
    }
    if (strncmp(work.error, missing_pkg_prefix, missing_pkg_len) == 0) {
        return make_tagged_error(env, "missing_r_package", work.error + missing_pkg_len);
    }
    return make_error(env, work.error);
}
/*
 * NIF `encode_data_frame/1`: parses argv[0] with parse_dataframe_wire() and
 * submits the result as a WORK_ENCODE_DATA_FRAME request.
 *
 * A wrong arity yields badarg; an unparsable wire value yields a dataframe
 * error with the text "invalid_dataframe: malformed_wire". On success
 * returns {ok, Resource} for the work item's result resource.
 */
static ERL_NIF_TERM rx_encode_data_frame(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 1) {
        return enif_make_badarg(env);
    }

    dataframe_wire *parsed = NULL;
    if (!parse_dataframe_wire(env, argv[0], &parsed)) {
        return make_dataframe_error(env, "invalid_dataframe: malformed_wire");
    }

    r_work work = {
        .kind = WORK_ENCODE_DATA_FRAME,
        .done = false,
        .ok = false,
        .input_dataframe = parsed,
        .next = NULL};

    if (!ensure_owner_thread()) {
        free_dataframe_wire(parsed);
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Queue the stack-allocated request and block until `done` is set. */
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);

    free_dataframe_wire(parsed);

    if (work.ok) {
        /* The term holds its own reference, so ours is dropped afterwards. */
        ERL_NIF_TERM resource_term = enif_make_resource(env, work.result_resource);
        enif_release_resource(work.result_resource);
        return enif_make_tuple2(env, atom_ok, resource_term);
    }

    if (work.result_resource != NULL) {
        enif_release_resource(work.result_resource);
    }
    if (work.init_return == INIT_RETURN_FAILED) {
        return make_native_init_failed_error(env, &work.init_failure);
    }
    return make_dataframe_error(env, work.error);
}
/*
 * NIF `decode_data_frame/2`: submits a WORK_DECODE_DATA_FRAME request for
 * the object resource in argv[0], with options parsed from argv[1] by
 * parse_dataframe_opts() (an optional max_rows limit).
 *
 * Returns badarg for a wrong arity or undecodable arguments. On success the
 * output dataframe is converted with make_dataframe_wire_term() and returned
 * as {ok, Wire}; if that conversion fails an error term is returned instead.
 */
static ERL_NIF_TERM rx_decode_data_frame(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    if (argc != 2) {
        return enif_make_badarg(env);
    }

    rx_object_resource *target;
    if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&target)) {
        return enif_make_badarg(env);
    }

    bool limit_given = false;
    unsigned row_limit = 0;
    if (!parse_dataframe_opts(env, argv[1], &limit_given, &row_limit)) {
        return enif_make_badarg(env);
    }

    r_work work = {
        .kind = WORK_DECODE_DATA_FRAME,
        .done = false,
        .ok = false,
        .resource = target,
        .has_max_rows = limit_given,
        .max_rows = row_limit,
        .next = NULL};

    if (!ensure_owner_thread()) {
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Hold a reference on the resource while the request is outstanding. */
    enif_keep_resource(target);
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);
    enif_release_resource(target);

    ERL_NIF_TERM reply;
    if (work.ok) {
        ERL_NIF_TERM wire_term;
        if (make_dataframe_wire_term(env, work.output_dataframe, &wire_term)) {
            reply = enif_make_tuple2(env, atom_ok, wire_term);
        } else {
            reply = make_error(env, "failed to allocate dataframe wire result");
        }
    } else if (work.init_return == INIT_RETURN_FAILED) {
        reply = make_native_init_failed_error(env, &work.init_failure);
    } else {
        reply = make_dataframe_error(env, work.error);
    }

    free_dataframe_wire(work.output_dataframe);
    return reply;
}
/*
 * NIF `decode_arrow/1`: submits a WORK_DECODE_ARROW request for the object
 * resource in argv[0] and returns the produced bytes as a binary.
 *
 * Returns badarg for a wrong arity or when argv[0] is not an object
 * resource. On failure, error texts starting with "missing_r_package: " or
 * "not_dataframe: " become tagged errors carrying the remainder of the text.
 */
static ERL_NIF_TERM rx_decode_arrow(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    static const char missing_pkg_prefix[] = "missing_r_package: ";
    static const char not_df_prefix[] = "not_dataframe: ";
    const size_t missing_pkg_len = sizeof missing_pkg_prefix - 1;
    const size_t not_df_len = sizeof not_df_prefix - 1;

    if (argc != 1) {
        return enif_make_badarg(env);
    }

    rx_object_resource *target;
    if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&target)) {
        return enif_make_badarg(env);
    }

    r_work work = {
        .kind = WORK_DECODE_ARROW,
        .done = false,
        .ok = false,
        .resource = target,
        .next = NULL};

    if (!ensure_owner_thread()) {
        return make_error(env, "failed to start embedded R owner thread");
    }

    /* Hold a reference on the resource while the request is outstanding. */
    enif_keep_resource(target);
    pthread_cond_init(&work.done_cond, NULL);
    enqueue_work(&work);
    pthread_mutex_lock(&queue_mutex);
    while (!work.done) {
        pthread_cond_wait(&work.done_cond, &queue_mutex);
    }
    pthread_mutex_unlock(&queue_mutex);
    pthread_cond_destroy(&work.done_cond);
    enif_release_resource(target);

    ERL_NIF_TERM reply;
    if (work.ok) {
        bool binary_ok = false;
        ERL_NIF_TERM binary = make_binary_term(env, work.output_bytes, work.output_len, &binary_ok);
        if (binary_ok) {
            reply = enif_make_tuple2(env, atom_ok, binary);
        } else {
            reply = make_error(env, "failed to allocate Arrow IPC binary");
        }
    } else if (work.init_return == INIT_RETURN_FAILED) {
        reply = make_native_init_failed_error(env, &work.init_failure);
    } else if (strncmp(work.error, missing_pkg_prefix, missing_pkg_len) == 0) {
        reply = make_tagged_error(env, "missing_r_package", work.error + missing_pkg_len);
    } else if (strncmp(work.error, not_df_prefix, not_df_len) == 0) {
        reply = make_tagged_error(env, "not_dataframe", work.error + not_df_len);
    } else {
        reply = make_error(env, work.error);
    }

    free(work.output_bytes);
    return reply;
}
/*
 * NIF `owner_thread_id/0`: returns the value read from owner_thread_id_value
 * via read_diagnostic_uint64() as an unsigned 64-bit integer term, or badarg
 * when called with any arguments.
 */
static ERL_NIF_TERM rx_owner_thread_id(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    (void)argv;
    return argc == 0
               ? enif_make_uint64(env, read_diagnostic_uint64(&owner_thread_id_value))
               : enif_make_badarg(env);
}
/*
 * NIF `release_count/0`: returns the value read from release_count via
 * read_diagnostic_uint64() as an unsigned 64-bit integer term, or badarg
 * when called with any arguments.
 */
static ERL_NIF_TERM rx_release_count(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    (void)argv;
    return argc == 0
               ? enif_make_uint64(env, read_diagnostic_uint64(&release_count))
               : enif_make_badarg(env);
}
/*
 * NIF `last_release_thread_id/0`: returns the value read from
 * last_release_thread_id_value via read_diagnostic_uint64() as an unsigned
 * 64-bit integer term, or badarg when called with any arguments.
 */
static ERL_NIF_TERM rx_last_release_thread_id(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    (void)argv;
    return argc == 0
               ? enif_make_uint64(env, read_diagnostic_uint64(&last_release_thread_id_value))
               : enif_make_badarg(env);
}
/*
 * NIF load callback.
 *
 * Opens (creating or taking over) the "rx_object_resource" resource type
 * with rx_object_resource_dtor as its destructor, then creates the `ok` and
 * `error` atoms used by the entry points.
 *
 * Returns 0 on success and 1 when the resource type could not be opened.
 */
static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
    (void)priv_data;
    (void)load_info;

    const ErlNifResourceFlags open_flags =
        (ErlNifResourceFlags)(ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER);

    object_resource_type = enif_open_resource_type(
        env, NULL, "rx_object_resource", rx_object_resource_dtor, open_flags, NULL);
    if (!object_resource_type) {
        return 1;
    }

    atom_ok = enif_make_atom(env, "ok");
    atom_error = enif_make_atom(env, "error");
    return 0;
}
static ErlNifFunc funcs[] = {
{"init", 3, rx_init, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"eval_string", 1, rx_eval_string, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"eval", 2, rx_eval, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"print", 3, rx_print, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"plot", 8, rx_plot, ERL_NIF_DIRTY_JOB_IO_BOUND},
{"encode", 2, rx_encode, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"decode", 1, rx_decode, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"encode_dataframe", 1, rx_encode_dataframe, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"encode_data_frame", 1, rx_encode_data_frame, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"decode_data_frame", 2, rx_decode_data_frame, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"decode_arrow", 1, rx_decode_arrow, ERL_NIF_DIRTY_JOB_CPU_BOUND},
{"diagnostics", 0, rx_diagnostics, 0},
{"owner_thread_id", 0, rx_owner_thread_id, 0},
{"release_count", 0, rx_release_count, 0},
{"last_release_thread_id", 0, rx_last_release_thread_id, 0},
};
/*
 * Registers the `funcs` table for the module Elixir.Rx.Native. `load` is the
 * only lifecycle callback supplied; the remaining three callback slots are
 * NULL.
 */
ERL_NIF_INIT(Elixir.Rx.Native, funcs, load, NULL, NULL, NULL)