/* native/c_src/rx_nif.c */

#define _GNU_SOURCE

#include "erl_nif.h"
#include <Rembedded.h>
#include <Rinternals.h>
#include <R_ext/Parse.h>
#include <dlfcn.h>
#include <limits.h>
#include <math.h>
#include <pthread.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Function-pointer types, one per R C API entry point used by this file.
 * Each typedef is named after the R symbol it describes with an `_fn` suffix;
 * most of them are the member types of the `r_api` table declared below.
 * NOTE(review): presumably these are resolved at runtime from the loaded
 * libR handle (see `lib_r_handle`) rather than linked directly — confirm
 * against the loader code, which is outside this section.
 */
typedef int (*Rf_initEmbeddedR_fn)(int, char **);
typedef int (*Rf_initialize_R_fn)(int, char **);
typedef void (*setup_Rmainloop_fn)(void);
typedef SEXP (*Rf_mkString_fn)(const char *);
typedef SEXP (*Rf_protect_fn)(SEXP);
typedef void (*Rf_unprotect_fn)(int);
typedef SEXP (*R_ParseVector_fn)(SEXP, int, ParseStatus *, SEXP);
typedef SEXP (*R_tryEval_fn)(SEXP, SEXP, int *);
typedef Rboolean (*R_ToplevelExec_fn)(void (*)(void *), void *);
typedef R_xlen_t (*Rf_xlength_fn)(SEXP);
typedef int (*LENGTH_fn)(SEXP);
typedef int (*TYPEOF_fn)(SEXP);
typedef SEXP (*VECTOR_ELT_fn)(SEXP, R_xlen_t);
typedef int (*INTEGER_ELT_fn)(SEXP, R_xlen_t);
typedef double (*REAL_ELT_fn)(SEXP, R_xlen_t);
typedef int (*LOGICAL_ELT_fn)(SEXP, R_xlen_t);
typedef SEXP (*STRING_ELT_fn)(SEXP, R_xlen_t);
typedef const char *(*R_CHAR_fn)(SEXP);
typedef void (*R_PreserveObject_fn)(SEXP);
typedef void (*R_ReleaseObject_fn)(SEXP);
typedef SEXP (*Rf_ScalarReal_fn)(double);
typedef SEXP (*Rf_ScalarInteger_fn)(int);
typedef SEXP (*Rf_ScalarLogical_fn)(int);
typedef SEXP (*Rf_mkCharLenCE_fn)(const char *, int, cetype_t);
typedef SEXP (*Rf_ScalarString_fn)(SEXP);
typedef SEXP (*Rf_allocVector_fn)(SEXPTYPE, R_xlen_t);
typedef Rbyte *(*RAW_fn)(SEXP);
typedef void (*SET_INTEGER_ELT_fn)(SEXP, R_xlen_t, int);
typedef void (*SET_REAL_ELT_fn)(SEXP, R_xlen_t, double);
typedef void (*SET_LOGICAL_ELT_fn)(SEXP, R_xlen_t, int);
typedef void (*SET_STRING_ELT_fn)(SEXP, R_xlen_t, SEXP);
typedef SEXP (*SET_VECTOR_ELT_fn)(SEXP, R_xlen_t, SEXP);
typedef SEXP (*Rf_setAttrib_fn)(SEXP, SEXP, SEXP);
typedef SEXP (*R_NewEnv_fn)(SEXP, int, int);
typedef SEXP (*Rf_install_fn)(const char *);
typedef void (*Rf_defineVar_fn)(SEXP, SEXP, SEXP);
typedef SEXP (*Rf_findVarInFrame_fn)(SEXP, SEXP);
typedef SEXP (*R_lsInternal_fn)(SEXP, Rboolean);
typedef SEXP (*Rf_lang1_fn)(SEXP);
typedef SEXP (*Rf_lang2_fn)(SEXP, SEXP);
typedef SEXP (*Rf_lang3_fn)(SEXP, SEXP, SEXP);
typedef SEXP (*Rf_lang4_fn)(SEXP, SEXP, SEXP, SEXP);
typedef SEXP (*Rf_getAttrib_fn)(SEXP, SEXP);
typedef Rboolean (*Rf_inherits_fn)(SEXP, const char *);
/* Signatures of R's console-output hooks; the (buf, len, otype) form matches
 * capture_write_console_ex() and the (buf, len) form capture_write_console(). */
typedef void (*ptr_R_WriteConsole_fn)(const char *, int);
typedef void (*ptr_R_WriteConsoleEx_fn)(const char *, int, int);

/* NOTE(review): presumably the maximum nesting depth accepted while decoding
 * values; the use sites are outside this section — confirm there. */
#define RX_DECODE_MAX_DEPTH 100

/*
 * Tag stored in r_work.kind that says which operation a work item requests.
 * NOTE(review): both WORK_ENCODE_DATAFRAME and WORK_ENCODE_DATA_FRAME exist
 * with distinct values; confirm against the dispatcher whether both are used
 * or one is a leftover.
 */
typedef enum {
  WORK_INIT,
  WORK_EVAL_STRING,
  WORK_EVAL,
  WORK_PRINT,
  WORK_PLOT,
  WORK_ENCODE_RESOURCE,
  WORK_DECODE_RESOURCE,
  WORK_ENCODE_DATAFRAME,
  WORK_ENCODE_DATA_FRAME,
  WORK_DECODE_DATA_FRAME,
  WORK_DECODE_ARROW,
  WORK_RELEASE
} work_kind;

/*
 * Tag stored in r_work.result_kind describing which of the scalar/vector
 * result fields of the work item is meaningful.
 */
typedef enum {
  RESULT_NONE,
  RESULT_NULL,
  RESULT_INTEGER,
  RESULT_DOUBLE,
  RESULT_LOGICAL,
  RESULT_CHARACTER,
  RESULT_INTEGER_VECTOR,
  RESULT_DOUBLE_VECTOR,
  RESULT_LOGICAL_VECTOR,
  RESULT_CHARACTER_VECTOR,
  RESULT_UNSUPPORTED
} r_result_kind;

/*
 * Kind of value carried by an encode_value. Numbering starts at 1, so a
 * zero-filled encode_value holds no valid kind. The textual names accepted
 * for each kind are listed in parse_encode_kind().
 */
typedef enum {
  ENCODE_KIND_NULL = 1,
  ENCODE_KIND_LOGICAL,
  ENCODE_KIND_INTEGER,
  ENCODE_KIND_DOUBLE,
  ENCODE_KIND_CHARACTER,
  ENCODE_KIND_LOGICAL_VECTOR,
  ENCODE_KIND_INTEGER_VECTOR,
  ENCODE_KIND_DOUBLE_VECTOR,
  ENCODE_KIND_CHARACTER_VECTOR,
  ENCODE_KIND_NA,
  ENCODE_KIND_NAMED_LIST
} encode_kind;

/*
 * Kind recorded on an rx_object_resource. Numbering starts at 1.
 * encode_kind_to_resource_kind() maps encode kinds onto these values, with
 * NA and named-list values falling into RESOURCE_KIND_GENERIC.
 */
typedef enum {
  RESOURCE_KIND_NULL = 1,
  RESOURCE_KIND_LOGICAL,
  RESOURCE_KIND_INTEGER,
  RESOURCE_KIND_DOUBLE,
  RESOURCE_KIND_CHARACTER,
  RESOURCE_KIND_LOGICAL_VECTOR,
  RESOURCE_KIND_INTEGER_VECTOR,
  RESOURCE_KIND_DOUBLE_VECTOR,
  RESOURCE_KIND_CHARACTER_VECTOR,
  RESOURCE_KIND_GENERIC,
  RESOURCE_KIND_DATAFRAME
} resource_kind;

/* Discriminator stored in decoded_value.kind for a node of a decoded value tree. */
typedef enum {
  DECODE_VALUE_NULL,
  DECODE_VALUE_LOGICAL,
  DECODE_VALUE_INTEGER,
  DECODE_VALUE_DOUBLE,
  DECODE_VALUE_CHARACTER,
  DECODE_VALUE_NA,
  DECODE_VALUE_VECTOR,
  DECODE_VALUE_NAMED_LIST,
  DECODE_VALUE_R_LIST,
  DECODE_VALUE_OBJECT
} decoded_value_kind;

/*
 * Atomic element type. Parsed from the names "logical", "integer", "double"
 * and "character" by parse_decoded_atomic_type(); also used as
 * encode_value.na_type and decoded_value.atomic_type.
 */
typedef enum {
  DECODE_ATOMIC_LOGICAL,
  DECODE_ATOMIC_INTEGER,
  DECODE_ATOMIC_DOUBLE,
  DECODE_ATOMIC_CHARACTER
} decoded_atomic_type;

/*
 * Process-wide initialization state kept in the `init_state` global.
 * record_init_failure() moves it to INIT_STATE_FAILED.
 */
typedef enum {
  INIT_STATE_UNINITIALIZED,
  INIT_STATE_INITIALIZED,
  INIT_STATE_FAILED
} init_state_tag;

/*
 * Outcome category of an init request, stored in r_work.init_return.
 * INIT_RETURN_FAILED is set together with r_work.init_failure by
 * set_init_failed() and set_terminal_init_failure_if_present().
 */
typedef enum {
  INIT_RETURN_OK,
  INIT_RETURN_PLAIN_ERROR,
  INIT_RETURN_MISMATCH,
  INIT_RETURN_FAILED
} init_return_kind;

/*
 * Owned copy of the options an init request was made with. All strings are
 * heap-allocated by init_config_copy() and released by init_config_clear().
 */
typedef struct init_config {
  char *r_home;
  char *lib_r_path;
  char **lib_paths;          /* array of lib_paths_count owned strings */
  unsigned lib_paths_count;
} init_config;

/*
 * Description of a failed initialization, filled by fill_init_failure().
 * Both strings are truncated to their buffer size by snprintf.
 */
typedef struct init_failure_info {
  char stage[64];
  char message[1024];
  bool retryable;
  bool restart_required;     /* always the negation of `retryable` when filled by fill_init_failure() */
} init_failure_info;

/*
 * Result of comparing a requested init configuration against the current one,
 * built by fill_init_mismatch(). The three flags say which option differs;
 * `current` and `requested` are deep copies that free_init_mismatch() releases.
 */
typedef struct init_mismatch_info {
  bool r_home;
  bool lib_r_path;
  bool lib_paths;
  char message[1024];
  init_config current;
  init_config requested;
} init_mismatch_info;

/* Forward declaration so encode_value can hold resource pointers. */
typedef struct rx_object_resource rx_object_resource;

/*
 * A value parsed from an Erlang term, waiting to be encoded. `kind` selects
 * which of the scalar or array members is meaningful. free_encode_value()
 * treats `length` as the element count of string_values, entry_names and
 * entry_resources, and zeroes the whole struct after releasing them.
 */
typedef struct encode_value {
  encode_kind kind;
  R_xlen_t length;
  bool logical_scalar;
  int integer_scalar;
  double double_scalar;
  char *string_scalar;
  size_t string_scalar_len;
  bool *logical_values;
  int *integer_values;
  double *double_values;
  char **string_values;
  size_t *string_value_lens;   /* byte length of each string_values[i] */
  decoded_atomic_type na_type;
  char **entry_names;
  rx_object_resource **entry_resources;
} encode_value;

/*
 * Payload of a NIF resource object that refers to an R value. Instances are
 * released with enif_release_resource() (see release_resource_array()).
 * NOTE(review): `id` presumably comes from reserve_object_id(), and `mutex`
 * presumably guards `release_enqueued`; the code that establishes this is
 * outside this section — confirm there.
 */
struct rx_object_resource {
  SEXP sexp;
  uint64_t id;
  resource_kind kind;
  bool release_enqueued;
  pthread_mutex_t mutex;
};

/* Forward declaration: decoded values form a recursive tree. */
typedef struct decoded_value decoded_value;

/* One name/value pair of a decoded value; both members are owned and freed by
 * free_decoded_value(). */
typedef struct decoded_entry {
  char *name;
  decoded_value *value;
} decoded_entry;

/*
 * Node of a decoded value tree. `kind` selects the meaningful members.
 * `count` is the element count of whichever of `items` or `entries` is set.
 * free_decoded_value() frees the node recursively and drops the reference
 * held in `resource`.
 */
struct decoded_value {
  decoded_value_kind kind;
  decoded_atomic_type atomic_type;
  bool logical_value;
  int integer_value;
  double double_value;
  char *string_value;
  size_t string_value_len;
  decoded_value **items;
  decoded_entry *entries;
  unsigned count;
  rx_object_resource *resource;
};

/* Element type of a dataframe_column. */
typedef enum {
  DATAFRAME_COLUMN_LOGICAL,
  DATAFRAME_COLUMN_INTEGER,
  DATAFRAME_COLUMN_DOUBLE,
  DATAFRAME_COLUMN_CHARACTER
} dataframe_column_type;

/*
 * One cell of a dataframe column. The member that matches the owning column's
 * type carries the value; `string_value` is owned and freed by
 * free_dataframe_wire().
 */
typedef struct dataframe_cell {
  bool is_na;
  bool logical_value;
  int integer_value;
  double double_value;
  char *string_value;
  size_t string_value_len;
} dataframe_cell;

/* A named, typed column holding `value_count` cells; name and cells are owned. */
typedef struct dataframe_column {
  char *name;
  dataframe_column_type type;
  dataframe_cell *values;
  unsigned value_count;
} dataframe_column;

/*
 * Heap-allocated, C-side representation of a data frame. Everything reachable
 * from it, and the struct itself, is released by free_dataframe_wire().
 */
typedef struct dataframe_wire {
  char **names;              /* name_count owned strings */
  unsigned name_count;
  unsigned n_rows;
  dataframe_column *columns; /* column_count columns */
  unsigned column_count;
} dataframe_wire;

/*
 * Growable byte buffer used by buffer_append(). After a successful append
 * `data[len]` is a NUL terminator, so `len` excludes it and `cap` includes
 * room for it. A zero-filled struct is a valid empty buffer.
 */
typedef struct byte_buffer {
  char *data;
  size_t len;
  size_t cap;
} byte_buffer;

/*
 * One unit of work together with its inputs and outputs. `kind` selects the
 * operation; only the members relevant to that operation are meaningful.
 * NOTE(review): items are presumably queued through `next` on the
 * queue_head/queue_tail list and completed by setting `done` and signalling
 * `done_cond`; the queue code is outside this section — confirm there.
 */
typedef struct r_work {
  work_kind kind;
  /* Init options; init_config_copy_from_work() copies these four members. */
  char *lib_r_path;
  char *r_home;
  char **lib_paths;
  unsigned lib_paths_count;
  char *source;
  char *ipc_bytes;
  size_t ipc_len;
  char *output_bytes;
  size_t output_len;
  dataframe_wire *input_dataframe;
  dataframe_wire *output_dataframe;
  bool has_max_rows;
  unsigned max_rows;
  encode_value encode;
  rx_object_resource *resource;
  bool has_print_width;
  int print_width;
  bool has_print_max_print;
  int print_max_print;
  int plot_width;
  int plot_height;
  int plot_res;
  int plot_pointsize;
  int plot_max_pages;
  int plot_max_bytes;
  /* plot_page_count owned page buffers; released by free_plot_pages(). */
  char **plot_pages;
  size_t *plot_page_lens;
  unsigned plot_page_count;
  char **global_names;
  rx_object_resource **global_resources;
  unsigned global_count;
  rx_object_resource *result_resource;
  /* Released by free_eval_result_globals(). */
  char **result_global_names;
  rx_object_resource **result_global_resources;
  unsigned result_global_count;
  decoded_value *decoded_result;
  /* Console capture: capture_write_console_ex() appends otype 0 output to
   * stdout_buffer and every other otype to messages_buffer. */
  byte_buffer stdout_buffer;
  byte_buffer messages_buffer;
  byte_buffer warnings_buffer;
  bool structured_error;
  char error_message[1024];
  char **error_classes;
  unsigned error_class_count;
  char error_call[512];
  SEXP release_sexp;
  uint64_t release_resource_id;
  /* Result payload; the heap members are released by free_result_value(),
   * which uses result_length as the element count of string_values. */
  r_result_kind result_kind;
  R_xlen_t result_length;
  int integer_result;
  double double_result;
  bool logical_result;
  char *string_result;
  size_t string_result_len;
  int *integer_values;
  double *double_values;
  bool *logical_values;
  char **string_values;
  size_t *string_value_lens;
  int unsupported_type;
  bool done;
  bool ok;
  char error[1024];          /* written by set_error(), which also clears `ok` */
  init_return_kind init_return;
  init_failure_info init_failure;
  init_mismatch_info init_mismatch;
  pthread_cond_t done_cond;
  struct r_work *next;
} r_work;

/*
 * Table of R API entry points (function pointers) followed by addresses of R
 * global variables. The single instance is the file-level `r`, which starts
 * zero-filled and is zeroed again by close_uninitialized_lib_r_handle().
 */
typedef struct r_api {
  Rf_mkString_fn mk_string;
  Rf_protect_fn protect;
  Rf_unprotect_fn unprotect;
  R_ParseVector_fn parse_vector;
  R_tryEval_fn try_eval;
  R_ToplevelExec_fn toplevel_exec;
  Rf_xlength_fn xlength_fn;
  LENGTH_fn length_fn;
  TYPEOF_fn type_of;
  VECTOR_ELT_fn vector_elt;
  INTEGER_ELT_fn integer_elt;
  REAL_ELT_fn real_elt;
  LOGICAL_ELT_fn logical_elt;
  STRING_ELT_fn string_elt;
  R_CHAR_fn r_char;
  R_PreserveObject_fn preserve_object;
  R_ReleaseObject_fn release_object;
  Rf_ScalarReal_fn scalar_real;
  Rf_ScalarInteger_fn scalar_integer;
  Rf_ScalarLogical_fn scalar_logical;
  Rf_mkCharLenCE_fn mk_char_len_ce;
  Rf_ScalarString_fn scalar_string;
  Rf_allocVector_fn alloc_vector;
  RAW_fn raw;
  SET_INTEGER_ELT_fn set_integer_elt;
  SET_REAL_ELT_fn set_real_elt;
  SET_LOGICAL_ELT_fn set_logical_elt;
  SET_STRING_ELT_fn set_string_elt;
  SET_VECTOR_ELT_fn set_vector_elt;
  Rf_setAttrib_fn set_attrib;
  R_NewEnv_fn new_env;
  Rf_install_fn install;
  Rf_defineVar_fn define_var;
  Rf_findVarInFrame_fn find_var_in_frame;
  R_lsInternal_fn ls_internal;
  Rf_lang1_fn lang1;
  Rf_lang2_fn lang2;
  Rf_lang3_fn lang3;
  Rf_lang4_fn lang4;
  Rf_getAttrib_fn get_attrib;
  Rf_inherits_fn inherits;
  /* Pointers to R's global objects and NA sentinels (stored by address). */
  SEXP *nil_value;
  SEXP *global_env;
  SEXP *names_symbol;
  SEXP *class_symbol;
  SEXP *dim_symbol;
  SEXP *na_string;
  int *na_int;
  double *na_real;
} r_api;

/* Cached atoms and the resource type handle for rx_object_resource.
 * NOTE(review): presumably assigned in the NIF load callback — confirm. */
static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_error;
static ErlNifResourceType *object_resource_type = NULL;

/* Singly linked list of pending r_work items plus its lock and condition
 * variable. NOTE(review): presumably drained by `owner_thread` — confirm
 * against the queue code outside this section. */
static pthread_mutex_t queue_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
static r_work *queue_head = NULL;
static r_work *queue_tail = NULL;
static pthread_mutex_t owner_start_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t owner_thread;
static bool owner_started = false;
static bool runtime_initialized = false;
/* Handle of the loaded R library; closed and reset, together with `r` and the
 * two console hook pointers, by close_uninitialized_lib_r_handle(). */
static void *lib_r_handle = NULL;
static r_api r = {0};
static ptr_R_WriteConsole_fn *r_write_console = NULL;
static ptr_R_WriteConsoleEx_fn *r_write_console_ex = NULL;
/* Per-thread work item that capture_write_console{,_ex}() append output to;
 * NULL disables capture on that thread. */
static __thread r_work *current_capture_work = NULL;
static SEXP eval_helper = NULL;
static SEXP plot_helper = NULL;

/*
 * Diagnostics state. The helpers in this file (reserve_object_id(), the
 * note_release_*() family, record_init_failure(), read_diagnostic_uint64(),
 * ...) access these variables only while holding diagnostics_mutex.
 */
static pthread_mutex_t diagnostics_mutex = PTHREAD_MUTEX_INITIALIZER;
static uint64_t next_object_id = 1;   /* next value handed out by reserve_object_id() */
static init_state_tag init_state = INIT_STATE_UNINITIALIZED;
static init_config successful_init_config = {0};
static init_config attempted_init_config = {0};
static init_failure_info last_init_error = {0};
static bool has_last_init_error = false;
static uint64_t init_attempt_count = 0;
static uint64_t init_mismatch_count = 0;
static uint64_t release_count = 0;
static uint64_t release_enqueued_count = 0;
static uint64_t release_success_count = 0;
static uint64_t release_failure_count = 0;
static uint64_t release_skipped_uninitialized_count = 0;
static uint64_t owner_thread_id_value = 0;
static uint64_t last_release_thread_id_value = 0;
static uint64_t last_release_resource_id_value = 0;
static bool has_last_release_resource_id = false;
static char last_release_error[1024] = "";

/* Returns the calling thread's pthread handle converted to a 64-bit integer,
 * for use as an identifier in diagnostics and debug output. */
static uint64_t current_thread_id(void) {
  const uintptr_t self_bits = (uintptr_t)pthread_self();
  return (uint64_t)self_bits;
}

/* Returns 1 when the RX_NATIVE_DEBUG environment variable is set to anything
 * other than the empty string or "0", and 0 otherwise. */
static int native_debug_enabled(void) {
  const char *setting = getenv("RX_NATIVE_DEBUG");

  if (setting == NULL) return 0;
  if (setting[0] == '\0') return 0;
  return strcmp(setting, "0") == 0 ? 0 : 1;
}

/*
 * Debug logging macro. When native_debug_enabled() is true it prints a line
 * to stderr prefixed with "[rx_native tid=<thread id>] " and flushes stderr;
 * otherwise the arguments are not evaluated. `fmt` must be a string literal
 * because it is concatenated with the prefix. Uses the GNU `##__VA_ARGS__`
 * extension so the macro can be called with no variadic arguments.
 */
#define NDBG(fmt, ...) do { \
  if (native_debug_enabled()) { \
    fprintf(stderr, "[rx_native tid=%llu] " fmt "\n", \
            (unsigned long long)current_thread_id(), ##__VA_ARGS__); \
    fflush(stderr); \
  } \
} while (0)

/* Hands out the next object id: returns the current value of next_object_id
 * and advances the counter by one, all under diagnostics_mutex. */
static uint64_t reserve_object_id(void) {
  uint64_t reserved;

  pthread_mutex_lock(&diagnostics_mutex);
  reserved = next_object_id;
  next_object_id = reserved + 1;
  pthread_mutex_unlock(&diagnostics_mutex);

  return reserved;
}

/* Records `id` as the owner thread id in the diagnostics state. */
static void set_owner_thread_id(uint64_t id) {
  const uint64_t recorded = id;

  pthread_mutex_lock(&diagnostics_mutex);
  owner_thread_id_value = recorded;
  pthread_mutex_unlock(&diagnostics_mutex);
}

/*
 * Records a successful release of resource `resource_id` performed on the
 * calling thread: bumps the total and success counters, remembers the thread
 * and resource ids, and clears the last release error.
 */
static void note_release_on_owner_thread(uint64_t resource_id) {
  const uint64_t releasing_thread = current_thread_id();

  pthread_mutex_lock(&diagnostics_mutex);
  last_release_error[0] = '\0';
  has_last_release_resource_id = true;
  last_release_resource_id_value = resource_id;
  last_release_thread_id_value = releasing_thread;
  release_success_count += 1;
  release_count += 1;
  pthread_mutex_unlock(&diagnostics_mutex);
}

/*
 * Records that a release of resource `resource_id` was enqueued: bumps the
 * enqueued counter, remembers the resource id and clears the last release
 * error. The last-release thread id is left untouched.
 */
static void note_release_enqueued(uint64_t resource_id) {
  pthread_mutex_lock(&diagnostics_mutex);
  last_release_error[0] = '\0';
  has_last_release_resource_id = true;
  last_release_resource_id_value = resource_id;
  release_enqueued_count += 1;
  pthread_mutex_unlock(&diagnostics_mutex);
}

/*
 * Records that the release of resource `resource_id` was skipped: bumps the
 * skipped-uninitialized counter, remembers the calling thread and resource
 * ids, and stores `message` (truncated to the buffer size) as the last
 * release error.
 */
static void note_release_skipped_uninitialized(uint64_t resource_id, const char *message) {
  const uint64_t skipping_thread = current_thread_id();

  pthread_mutex_lock(&diagnostics_mutex);
  has_last_release_resource_id = true;
  last_release_resource_id_value = resource_id;
  last_release_thread_id_value = skipping_thread;
  release_skipped_uninitialized_count += 1;
  snprintf(last_release_error, sizeof last_release_error, "%s", message);
  pthread_mutex_unlock(&diagnostics_mutex);
}

/*
 * Records a failed release of resource `resource_id`: bumps the failure
 * counter, remembers the calling thread and resource ids, and stores
 * `message` (truncated to the buffer size) as the last release error.
 */
static void note_release_failure(uint64_t resource_id, const char *message) {
  const uint64_t failing_thread = current_thread_id();

  pthread_mutex_lock(&diagnostics_mutex);
  has_last_release_resource_id = true;
  last_release_resource_id_value = resource_id;
  last_release_thread_id_value = failing_thread;
  release_failure_count += 1;
  snprintf(last_release_error, sizeof last_release_error, "%s", message);
  pthread_mutex_unlock(&diagnostics_mutex);
}

/* Returns a snapshot of `*value`, read while holding diagnostics_mutex.
 * `value` must not be NULL. */
static uint64_t read_diagnostic_uint64(uint64_t *value) {
  uint64_t snapshot;

  pthread_mutex_lock(&diagnostics_mutex);
  snapshot = *value;
  pthread_mutex_unlock(&diagnostics_mutex);

  return snapshot;
}

/* Marks `work` as failed and stores `message` in work->error, truncated to
 * the size of that buffer. */
static void set_error(r_work *work, const char *message) {
  work->ok = false;
  (void)snprintf(work->error, sizeof work->error, "%s", message);
}

/*
 * Populates `failure` with the failing `stage`, a human-readable `message`
 * (both truncated to their buffers) and the retry policy. restart_required
 * is always the opposite of `retryable`.
 */
static void fill_init_failure(init_failure_info *failure, const char *stage, const char *message, bool retryable) {
  (void)snprintf(failure->stage, sizeof failure->stage, "%s", stage);
  (void)snprintf(failure->message, sizeof failure->message, "%s", message);

  if (retryable) {
    failure->retryable = true;
    failure->restart_required = false;
  } else {
    failure->retryable = false;
    failure->restart_required = true;
  }
}

/* Publishes `failure` as the last init error and switches the global init
 * state to INIT_STATE_FAILED, under diagnostics_mutex. */
static void record_init_failure(const init_failure_info *failure) {
  pthread_mutex_lock(&diagnostics_mutex);
  last_init_error = *failure;
  has_last_init_error = true;
  init_state = INIT_STATE_FAILED;
  pthread_mutex_unlock(&diagnostics_mutex);
}

/*
 * Fails an init work item: fills work->init_failure from the arguments,
 * records it globally via record_init_failure(), and marks the work item as
 * not ok with INIT_RETURN_FAILED.
 */
static void set_init_failed(r_work *work, const char *stage, const char *message, bool retryable) {
  init_failure_info *details = &work->init_failure;

  fill_init_failure(details, stage, message, retryable);
  record_init_failure(details);

  work->init_return = INIT_RETURN_FAILED;
  work->ok = false;
}

/*
 * Copies the last init error into `*out` when initialization has failed with
 * a non-retryable error. Returns true in that case; otherwise returns false
 * and leaves `*out` untouched.
 */
static bool copy_terminal_init_failure(init_failure_info *out) {
  pthread_mutex_lock(&diagnostics_mutex);

  const bool is_terminal = init_state == INIT_STATE_FAILED &&
                           has_last_init_error &&
                           !last_init_error.retryable;
  if (is_terminal) {
    *out = last_init_error;
  }

  pthread_mutex_unlock(&diagnostics_mutex);
  return is_terminal;
}

/*
 * If a non-retryable init failure has been recorded, copies it into `work`,
 * marks the work item as failed with INIT_RETURN_FAILED and returns true.
 * Otherwise returns false without modifying `work`.
 */
static bool set_terminal_init_failure_if_present(r_work *work) {
  init_failure_info terminal_failure;

  if (copy_terminal_init_failure(&terminal_failure)) {
    work->ok = false;
    work->init_return = INIT_RETURN_FAILED;
    work->init_failure = terminal_failure;
    return true;
  }

  return false;
}

/* Returns true only when the RX_CRASH_REPRO environment variable is exactly "1". */
static bool crash_repro_enabled(void) {
  const char *flag = getenv("RX_CRASH_REPRO");

  if (flag == NULL) return false;
  return flag[0] == '1' && flag[1] == '\0';
}

/*
 * Returns true when crash-repro mode is on (see crash_repro_enabled()) and
 * the RX_NATIVE_FAULT_INIT_STAGE environment variable equals `stage`.
 */
static bool init_fault_stage_enabled(const char *stage) {
  if (!crash_repro_enabled()) return false;

  const char *configured = getenv("RX_NATIVE_FAULT_INIT_STAGE");
  if (configured == NULL) return false;

  return strcmp(configured, stage) == 0;
}

/*
 * Returns true when crash-repro mode is on (see crash_repro_enabled()) and
 * the RX_NATIVE_FAULT_RELEASE environment variable equals `mode`.
 */
static bool release_fault_enabled(const char *mode) {
  if (!crash_repro_enabled()) return false;

  const char *configured = getenv("RX_NATIVE_FAULT_RELEASE");
  if (configured == NULL) return false;

  return strcmp(configured, mode) == 0;
}

/*
 * Closes the R library handle if one is open and forgets everything that was
 * derived from it: the `r` API table is zeroed and both console hook pointers
 * are reset to NULL. The dlclose() result is not checked.
 */
static void close_uninitialized_lib_r_handle(void) {
  void *handle = lib_r_handle;

  lib_r_handle = NULL;
  if (handle != NULL) {
    dlclose(handle);
  }

  memset(&r, 0, sizeof r);
  r_write_console_ex = NULL;
  r_write_console = NULL;
}

/*
 * Copies an Erlang binary or Latin-1 charlist into a freshly malloc'd,
 * NUL-terminated C string.
 *
 * On success returns 1 and stores the string in `*out`; the caller owns it
 * and must free() it. Returns 0 when `term` is neither a binary nor a
 * Latin-1 charlist, or when allocation fails. On allocation or decoding
 * failure `*out` is set to NULL and nothing is leaked; for an unsupported
 * term `*out` is left untouched. Embedded NUL bytes are not rejected here
 * (see get_string_reject_nul() for that).
 */
static int get_string(ErlNifEnv *env, ERL_NIF_TERM term, char **out) {
  ErlNifBinary bin;
  if (enif_inspect_binary(env, term, &bin)) {
    char *copy = malloc(bin.size + 1);
    if (copy == NULL) {
      *out = NULL;
      return 0;
    }
    if (bin.size > 0) memcpy(copy, bin.data, bin.size);
    copy[bin.size] = '\0';
    *out = copy;
    return 1;
  }

  /* enif_get_string() returns 0 when given a zero-sized buffer, so it cannot
   * be used to probe the required size. A Latin-1 charlist occupies exactly
   * one byte per list element, so size the buffer from the list length. */
  unsigned length = 0;
  if (!enif_get_list_length(env, term, &length)) return 0;
  if (length >= (unsigned)INT_MAX) return 0; /* result is reported as int */

  char *copy = malloc((size_t)length + 1);
  if (copy == NULL) {
    *out = NULL;
    return 0;
  }

  if (enif_get_string(env, term, copy, length + 1, ERL_NIF_LATIN1) <= 0) {
    free(copy);
    *out = NULL;
    return 0;
  }

  *out = copy;
  return 1;
}

/*
 * Like get_string(), but detects embedded NUL bytes.
 *
 * Returns 0 when `term` is neither a binary nor a Latin-1 charlist or when
 * allocation fails; `*out` is NULL in that case. Returns 1 otherwise, with
 * one of two outcomes:
 *   - `*has_nul == false`: `*out` is a malloc'd NUL-terminated copy owned by
 *     the caller;
 *   - `*has_nul == true`: the input contained a NUL byte and `*out` is NULL.
 */
static int get_string_reject_nul(ErlNifEnv *env, ERL_NIF_TERM term, char **out, bool *has_nul) {
  *has_nul = false;
  *out = NULL;

  ErlNifBinary bin;
  if (enif_inspect_binary(env, term, &bin)) {
    if (bin.size > 0 && memchr(bin.data, '\0', bin.size) != NULL) {
      *has_nul = true;
      return 1;
    }

    char *copy = malloc(bin.size + 1);
    if (copy == NULL) return 0;
    if (bin.size > 0) memcpy(copy, bin.data, bin.size);
    copy[bin.size] = '\0';
    *out = copy;
    return 1;
  }

  /* enif_get_string() returns 0 when given a zero-sized buffer, so it cannot
   * be used to probe the required size. A Latin-1 charlist occupies exactly
   * one byte per list element, so size the buffer from the list length. */
  unsigned length = 0;
  if (!enif_get_list_length(env, term, &length)) return 0;
  if (length >= (unsigned)INT_MAX) return 0; /* result is reported as int */

  char *copy = malloc((size_t)length + 1);
  if (copy == NULL) return 0;

  if (enif_get_string(env, term, copy, length + 1, ERL_NIF_LATIN1) <= 0) {
    free(copy);
    return 0;
  }

  /* Only the payload is scanned; the final byte is the terminator. */
  if (length > 0 && memchr(copy, '\0', (size_t)length) != NULL) {
    free(copy);
    *has_nul = true;
    return 1;
  }

  *out = copy;
  return 1;
}

/*
 * Copies a binary term into a malloc'd NUL-terminated string in `*out`.
 * Returns 0 (leaving `*out` unset unless allocation was attempted) when the
 * term is not a binary, contains a NUL byte, or allocation fails; returns 1
 * on success, in which case the caller owns `*out`.
 */
static int get_global_name(ErlNifEnv *env, ERL_NIF_TERM term, char **out) {
  ErlNifBinary bin;
  if (!enif_inspect_binary(env, term, &bin)) return 0;

  const size_t n = bin.size;
  if (n > 0 && memchr(bin.data, '\0', n) != NULL) return 0;

  char *name = malloc(n + 1);
  *out = name;
  if (name == NULL) return 0;

  memcpy(name, bin.data, n);
  name[n] = '\0';
  return 1;
}

/*
 * Reads an optional integer term. The atom `nil` yields *has_value = false
 * and *out = 0; an integer yields *has_value = true and its value in *out.
 * Returns 1 in both cases and 0 (outputs untouched) for any other term.
 */
static int get_optional_int(ErlNifEnv *env, ERL_NIF_TERM term, bool *has_value, int *out) {
  const ERL_NIF_TERM nil_atom = enif_make_atom(env, "nil");

  if (!enif_is_identical(term, nil_atom)) {
    int parsed = 0;
    if (!enif_get_int(env, term, &parsed)) return 0;

    *out = parsed;
    *has_value = true;
    return 1;
  }

  *out = 0;
  *has_value = false;
  return 1;
}

/* Frees the first `count` strings of `items` and then the array itself.
 * A NULL array is ignored; NULL elements are allowed. */
static void free_string_array(char **items, unsigned count) {
  if (items != NULL) {
    char **const end = items + count;
    for (char **cursor = items; cursor != end; ++cursor) {
      free(*cursor);
    }
    free(items);
  }
}

/* Releases every string owned by `config` and zeroes the struct so it can be
 * reused or cleared again safely. A NULL pointer is ignored. */
static void init_config_clear(init_config *config) {
  if (config != NULL) {
    free_string_array(config->lib_paths, config->lib_paths_count);
    free(config->lib_r_path);
    free(config->r_home);
    memset(config, 0, sizeof *config);
  }
}

/*
 * Replaces the contents of `dest` with deep copies of the given options.
 * `dest` is cleared first, so it must be either zeroed or hold a valid
 * configuration. Returns true on success; on any allocation failure `dest`
 * is left cleared and false is returned.
 */
static bool init_config_copy(init_config *dest, const char *lib_r_path, const char *r_home, char **lib_paths, unsigned lib_paths_count) {
  init_config_clear(dest);

  dest->lib_paths_count = lib_paths_count;
  dest->r_home = strdup(r_home);
  dest->lib_r_path = strdup(lib_r_path);
  if (dest->r_home == NULL || dest->lib_r_path == NULL) goto fail;

  if (lib_paths_count == 0) return true;

  dest->lib_paths = calloc(lib_paths_count, sizeof *dest->lib_paths);
  if (dest->lib_paths == NULL) goto fail;

  for (unsigned idx = 0; idx < lib_paths_count; idx++) {
    char *duplicate = strdup(lib_paths[idx]);
    dest->lib_paths[idx] = duplicate;
    if (duplicate == NULL) goto fail;
  }

  return true;

fail:
  /* Clearing frees whatever was copied so far, including partial arrays. */
  init_config_clear(dest);
  return false;
}

/* Deep-copies the init options carried by `work` into `dest`; see
 * init_config_copy() for the ownership and failure contract. */
static bool init_config_copy_from_work(init_config *dest, r_work *work) {
  const char *library_path = work->lib_r_path;
  const char *home = work->r_home;

  return init_config_copy(dest, library_path, home, work->lib_paths, work->lib_paths_count);
}

/*
 * Returns true when both configurations have the same r_home, the same
 * lib_r_path and the same lib_paths in the same order. Both configurations
 * must be fully populated (no NULL strings).
 */
static bool init_config_equal(const init_config *a, const init_config *b) {
  bool same = strcmp(a->r_home, b->r_home) == 0 &&
              strcmp(a->lib_r_path, b->lib_r_path) == 0 &&
              a->lib_paths_count == b->lib_paths_count;

  for (unsigned idx = 0; same && idx < a->lib_paths_count; idx++) {
    same = strcmp(a->lib_paths[idx], b->lib_paths[idx]) == 0;
  }

  return same;
}

/*
 * Builds a mismatch report in `mismatch`: the struct is zeroed, a fixed
 * explanatory message is stored, each flag is set when the corresponding
 * option differs between `current` and `requested`, and deep copies of both
 * configurations are attached. Returns false, with no copies left attached,
 * when copying either configuration fails.
 */
static bool fill_init_mismatch(init_mismatch_info *mismatch, const init_config *current, const init_config *requested) {
  memset(mismatch, 0, sizeof *mismatch);
  snprintf(mismatch->message, sizeof mismatch->message,
           "embedded native R is already initialized with different native init options; restart the BEAM to change native init options");

  mismatch->r_home = strcmp(current->r_home, requested->r_home) != 0;
  mismatch->lib_r_path = strcmp(current->lib_r_path, requested->lib_r_path) != 0;

  /* Library paths differ when the counts differ or any position differs. */
  bool paths_differ = current->lib_paths_count != requested->lib_paths_count;
  for (unsigned idx = 0; !paths_differ && idx < current->lib_paths_count; idx++) {
    paths_differ = strcmp(current->lib_paths[idx], requested->lib_paths[idx]) != 0;
  }
  mismatch->lib_paths = paths_differ;

  if (init_config_copy(&mismatch->current, current->lib_r_path, current->r_home, current->lib_paths, current->lib_paths_count)) {
    if (init_config_copy(&mismatch->requested, requested->lib_r_path, requested->r_home, requested->lib_paths, requested->lib_paths_count)) {
      return true;
    }
    init_config_clear(&mismatch->current);
  }

  return false;
}

/* Releases the two configuration copies held by a mismatch report; the flags
 * and message are left as they are. */
static void free_init_mismatch(init_mismatch_info *mismatch) {
  init_config *requested_copy = &mismatch->requested;
  init_config *current_copy = &mismatch->current;

  init_config_clear(current_copy);
  init_config_clear(requested_copy);
}

/* Forward declaration: free_encode_value() uses this before its definition. */
static void release_resource_array(rx_object_resource **resources, unsigned count);

/*
 * Appends `len` bytes from `data` to `buffer`, growing it as needed and
 * keeping the contents NUL-terminated (the terminator is not counted in
 * buffer->len).
 *
 * Returns 1 on success, including the no-op case len == 0. Returns 0 when
 * `data` is NULL with a non-zero `len`, when the resulting size would not fit
 * in size_t, or when reallocation fails; the buffer is unchanged on failure.
 */
static int buffer_append(byte_buffer *buffer, const char *data, size_t len) {
  if (len == 0) return 1;
  if (data == NULL) return 0;

  /* Require buffer->len + len + 1 <= SIZE_MAX. Written as a subtraction from
   * SIZE_MAX by buffer->len only, so the check itself cannot wrap even when
   * len is SIZE_MAX. */
  if (len >= SIZE_MAX - buffer->len) return 0;
  size_t needed = buffer->len + len + 1;

  if (needed > buffer->cap) {
    /* Grow geometrically from at least 256 bytes; fall back to the exact
     * size when doubling would overflow. */
    size_t cap = buffer->cap == 0 ? 256 : buffer->cap;
    while (cap < needed) {
      if (cap > SIZE_MAX / 2) {
        cap = needed;
        break;
      }
      cap *= 2;
    }

    char *grown = realloc(buffer->data, cap);
    if (grown == NULL) return 0;
    buffer->data = grown;
    buffer->cap = cap;
  }

  memcpy(buffer->data + buffer->len, data, len);
  buffer->len += len;
  buffer->data[buffer->len] = '\0';
  return 1;
}

/* Releases the storage of `buffer` and resets it to the empty state.
 * A NULL pointer is ignored. */
static void buffer_free(byte_buffer *buffer) {
  if (buffer != NULL) {
    free(buffer->data);
    buffer->cap = 0;
    buffer->len = 0;
    buffer->data = NULL;
  }
}

static void capture_write_console_ex(const char *buf, int len, int otype) {
  r_work *work = current_capture_work;
  if (work == NULL || buf == NULL || len <= 0) return;

  if (otype == 0) {
    (void)buffer_append(&work->stdout_buffer, buf, (size_t)len);
  } else {
    (void)buffer_append(&work->messages_buffer, buf, (size_t)len);
  }
}

/*
 * Console hook without output type: appends `len` bytes of `buf` to the
 * stdout_buffer of the current thread's capture work item. Does nothing when
 * no capture is active or the input is empty; append failures are
 * deliberately ignored.
 */
static void capture_write_console(const char *buf, int len) {
  r_work *work = current_capture_work;

  if (work != NULL && buf != NULL && len > 0) {
    (void)buffer_append(&work->stdout_buffer, buf, (size_t)len);
  }
}

/*
 * Converts an Erlang list of strings into a malloc'd array of malloc'd C
 * strings using get_string() for each element. On success returns 1 and
 * stores the array and its length in `*out` / `*out_count`; the caller frees
 * it with free_string_array(). Returns 0, with the outputs untouched and
 * nothing leaked, when `list` is not a proper list, an element cannot be
 * converted, or allocation fails.
 */
static int get_string_list(ErlNifEnv *env, ERL_NIF_TERM list, char ***out, unsigned *out_count) {
  unsigned total;
  if (!enif_get_list_length(env, list, &total)) return 0;

  char **collected = calloc(total, sizeof *collected);
  if (total > 0 && collected == NULL) return 0;

  ERL_NIF_TERM rest = list;
  for (unsigned idx = 0; idx < total; idx++) {
    ERL_NIF_TERM element;
    const int converted = enif_get_list_cell(env, rest, &element, &rest) &&
                          get_string(env, element, &collected[idx]);
    if (!converted) {
      free_string_array(collected, total);
      return 0;
    }
  }

  *out_count = total;
  *out = collected;
  return 1;
}

/*
 * Releases everything owned by `value` and zeroes the struct. value->length
 * is used as the element count for string_values, entry_names and
 * entry_resources. A NULL pointer is ignored.
 */
static void free_encode_value(encode_value *value) {
  if (value == NULL) return;

  const R_xlen_t element_count = value->length;

  free(value->string_scalar);
  free(value->logical_values);
  free(value->integer_values);
  free(value->double_values);

  char **strings = value->string_values;
  if (strings != NULL) {
    for (R_xlen_t idx = 0; idx < element_count; idx++) {
      free(strings[idx]);
    }
  }
  free(strings);
  free(value->string_value_lens);

  free_string_array(value->entry_names, (unsigned)element_count);
  release_resource_array(value->entry_resources, (unsigned)element_count);

  memset(value, 0, sizeof *value);
}

/*
 * Frees a dataframe_wire and everything it owns: the name array, each
 * column's name and cells (including cell strings), the column array and the
 * struct itself. A NULL pointer is ignored.
 */
static void free_dataframe_wire(dataframe_wire *wire) {
  if (wire == NULL) return;

  free_string_array(wire->names, wire->name_count);

  if (wire->columns != NULL) {
    for (unsigned col_idx = 0; col_idx < wire->column_count; col_idx++) {
      dataframe_column *column = &wire->columns[col_idx];

      free(column->name);

      if (column->values != NULL) {
        for (unsigned row_idx = 0; row_idx < column->value_count; row_idx++) {
          free(column->values[row_idx].string_value);
        }
      }
      free(column->values);
    }
  }

  free(wire->columns);
  free(wire);
}

/* Forward declaration: free_result_value() uses this before its definition. */
static void free_decoded_value(decoded_value *value);

/*
 * Releases the result payload of `work`: the decoded result tree, the scalar
 * string and all vector arrays (result_length is the element count of
 * string_values). The corresponding members are reset to NULL/0 afterwards so
 * the function can be called again safely. A NULL pointer is ignored.
 */
static void free_result_value(r_work *work) {
  if (work == NULL) return;

  free_decoded_value(work->decoded_result);
  work->decoded_result = NULL;

  free(work->string_result);
  work->string_result = NULL;
  work->string_result_len = 0;

  free(work->integer_values);
  work->integer_values = NULL;

  free(work->double_values);
  work->double_values = NULL;

  free(work->logical_values);
  work->logical_values = NULL;

  char **strings = work->string_values;
  if (strings != NULL) {
    for (R_xlen_t idx = 0; idx < work->result_length; idx++) {
      free(strings[idx]);
    }
  }
  free(strings);
  work->string_values = NULL;

  free(work->string_value_lens);
  work->string_value_lens = NULL;

  work->result_length = 0;
}

/* Drops one reference on each non-NULL resource among the first `count`
 * entries, in order, and frees the array. A NULL array is ignored. */
static void release_resource_array(rx_object_resource **resources, unsigned count) {
  if (resources == NULL) return;

  unsigned idx = 0;
  while (idx < count) {
    rx_object_resource *held = resources[idx++];
    if (held != NULL) {
      enif_release_resource(held);
    }
  }

  free(resources);
}

/*
 * Recursively frees a decoded value tree: the node's string, its child items,
 * its named entries (names and values), the reference held on its resource,
 * and finally the node itself. value->count is the element count for both
 * `items` and `entries`. A NULL pointer is ignored.
 */
static void free_decoded_value(decoded_value *value) {
  if (value == NULL) return;

  const unsigned child_count = value->count;

  free(value->string_value);

  decoded_value **children = value->items;
  if (children != NULL) {
    for (unsigned idx = 0; idx < child_count; idx++) {
      free_decoded_value(children[idx]);
    }
    free(children);
  }

  decoded_entry *named = value->entries;
  if (named != NULL) {
    for (unsigned idx = 0; idx < child_count; idx++) {
      free(named[idx].name);
      free_decoded_value(named[idx].value);
    }
    free(named);
  }

  if (value->resource != NULL) {
    enif_release_resource(value->resource);
  }

  free(value);
}

/* Releases the result-global names and resources of `work` and resets the
 * three related members. A NULL pointer is ignored. */
static void free_eval_result_globals(r_work *work) {
  if (work != NULL) {
    const unsigned held = work->result_global_count;

    free_string_array(work->result_global_names, held);
    work->result_global_names = NULL;

    release_resource_array(work->result_global_resources, held);
    work->result_global_resources = NULL;

    work->result_global_count = 0;
  }
}

/* Releases the three captured-output buffers and the error class list of
 * `work`, leaving them empty. A NULL pointer is ignored. */
static void free_eval_capture(r_work *work) {
  if (work != NULL) {
    free_string_array(work->error_classes, work->error_class_count);
    work->error_class_count = 0;
    work->error_classes = NULL;

    buffer_free(&work->warnings_buffer);
    buffer_free(&work->messages_buffer);
    buffer_free(&work->stdout_buffer);
  }
}

/* Frees every plot page buffer of `work`, the page and length arrays, and
 * resets the page count to zero. A NULL pointer is ignored. */
static void free_plot_pages(r_work *work) {
  if (work == NULL) return;

  char **pages = work->plot_pages;
  if (pages != NULL) {
    const unsigned page_total = work->plot_page_count;
    for (unsigned page = 0; page < page_total; page++) {
      free(pages[page]);
    }
  }
  free(pages);
  work->plot_pages = NULL;

  free(work->plot_page_lens);
  work->plot_page_lens = NULL;

  work->plot_page_count = 0;
}

/*
 * Maps an encode kind to the resource kind recorded on the resulting object.
 * Scalar and vector kinds map one-to-one; NA, named lists and any value
 * outside the enum map to RESOURCE_KIND_GENERIC.
 */
static resource_kind encode_kind_to_resource_kind(encode_kind kind) {
  static const resource_kind by_encode_kind[] = {
    [ENCODE_KIND_NULL] = RESOURCE_KIND_NULL,
    [ENCODE_KIND_LOGICAL] = RESOURCE_KIND_LOGICAL,
    [ENCODE_KIND_INTEGER] = RESOURCE_KIND_INTEGER,
    [ENCODE_KIND_DOUBLE] = RESOURCE_KIND_DOUBLE,
    [ENCODE_KIND_CHARACTER] = RESOURCE_KIND_CHARACTER,
    [ENCODE_KIND_LOGICAL_VECTOR] = RESOURCE_KIND_LOGICAL_VECTOR,
    [ENCODE_KIND_INTEGER_VECTOR] = RESOURCE_KIND_INTEGER_VECTOR,
    [ENCODE_KIND_DOUBLE_VECTOR] = RESOURCE_KIND_DOUBLE_VECTOR,
    [ENCODE_KIND_CHARACTER_VECTOR] = RESOURCE_KIND_CHARACTER_VECTOR,
    [ENCODE_KIND_NA] = RESOURCE_KIND_GENERIC,
    [ENCODE_KIND_NAMED_LIST] = RESOURCE_KIND_GENERIC,
  };

  /* Slot 0 of the table is unused because the enum starts at 1, so anything
   * outside the declared range is answered explicitly. */
  const int index = (int)kind;
  if (index < (int)ENCODE_KIND_NULL || index > (int)ENCODE_KIND_NAMED_LIST) {
    return RESOURCE_KIND_GENERIC;
  }

  return by_encode_kind[index];
}

/*
 * Reads a type name given either as an atom or as a binary into `out`
 * (capacity `out_size`, always NUL-terminated on success). Returns 1 on
 * success and 0 when the buffer has no capacity, the term is neither an atom
 * nor a binary, or the name does not fit.
 */
static int get_type_name(ErlNifEnv *env, ERL_NIF_TERM term, char *out, size_t out_size) {
  if (out_size == 0) return 0;

  if (enif_get_atom(env, term, out, (unsigned)out_size, ERL_NIF_LATIN1)) return 1;

  ErlNifBinary bin;
  if (enif_inspect_binary(env, term, &bin) && bin.size < out_size) {
    memcpy(out, bin.data, bin.size);
    out[bin.size] = '\0';
    return 1;
  }

  return 0;
}

/*
 * Copies the bytes of a binary term into a malloc'd buffer with an extra
 * trailing NUL. On success returns 1 and stores the buffer in `*out` and the
 * byte count (without the NUL) in `*out_len`. Returns 0, leaving the outputs
 * untouched, when the term is not a binary, is larger than INT_MAX bytes, or
 * allocation fails. Embedded NUL bytes are preserved.
 */
static int copy_binary_bytes(ErlNifEnv *env, ERL_NIF_TERM term, char **out, size_t *out_len) {
  ErlNifBinary bin;
  if (!enif_inspect_binary(env, term, &bin)) return 0;
  if (bin.size > (size_t)INT_MAX) return 0;

  const size_t byte_count = bin.size;
  char *bytes = malloc(byte_count + 1);
  if (bytes == NULL) return 0;

  if (byte_count != 0) {
    memcpy(bytes, bin.data, byte_count);
  }
  bytes[byte_count] = '\0';

  *out_len = byte_count;
  *out = bytes;
  return 1;
}

/* Reads the atoms `true` / `false` into `*out`. Returns 1 on success and 0,
 * leaving `*out` untouched, for any other term. */
static int get_boolean(ErlNifEnv *env, ERL_NIF_TERM term, bool *out) {
  const bool is_true = enif_is_identical(term, enif_make_atom(env, "true"));

  if (!is_true && !enif_is_identical(term, enif_make_atom(env, "false"))) {
    return 0;
  }

  *out = is_true;
  return 1;
}

/* Reads an integer term that fits in a C int into `*out`. Returns 0, leaving
 * `*out` untouched, when the term is not an integer or is out of int range. */
static int get_int32(ErlNifEnv *env, ERL_NIF_TERM term, int *out) {
  ErlNifSInt64 wide;

  if (!enif_get_int64(env, term, &wide)) return 0;
  if (wide < INT_MIN || wide > INT_MAX) return 0;

  *out = (int)wide;
  return 1;
}

/* Reads a number as a double: floats are taken as-is and 64-bit integers are
 * converted. Returns 1 on success and 0 for any other term. */
static int get_double_value(ErlNifEnv *env, ERL_NIF_TERM term, double *out) {
  ErlNifSInt64 whole;

  if (enif_get_double(env, term, out)) return 1;
  if (!enif_get_int64(env, term, &whole)) return 0;

  *out = (double)whole;
  return 1;
}

/* Reads a float term into `*out`. Unlike get_double_value(), integer terms
 * are not accepted. Returns the result of enif_get_double(). */
static int get_double_float(ErlNifEnv *env, ERL_NIF_TERM term, double *out) {
  const int is_float = enif_get_double(env, term, out);
  return is_float;
}

/*
 * Parses a type name (atom or binary, see get_type_name()) into an
 * encode_kind. Returns 1 and stores the kind on a match; returns 0, leaving
 * `*kind` untouched, when the term is unreadable or the name is unknown.
 */
static int parse_encode_kind(ErlNifEnv *env, ERL_NIF_TERM term, encode_kind *kind) {
  static const struct {
    const char *label;
    encode_kind kind;
  } known[] = {
    {"null", ENCODE_KIND_NULL},
    {"logical", ENCODE_KIND_LOGICAL},
    {"integer", ENCODE_KIND_INTEGER},
    {"double", ENCODE_KIND_DOUBLE},
    {"character", ENCODE_KIND_CHARACTER},
    {"logical_vector", ENCODE_KIND_LOGICAL_VECTOR},
    {"integer_vector", ENCODE_KIND_INTEGER_VECTOR},
    {"double_vector", ENCODE_KIND_DOUBLE_VECTOR},
    {"character_vector", ENCODE_KIND_CHARACTER_VECTOR},
    {"na", ENCODE_KIND_NA},
    {"named_list", ENCODE_KIND_NAMED_LIST},
  };

  char label[32];
  if (!get_type_name(env, term, label, sizeof label)) return 0;

  for (size_t idx = 0; idx < sizeof known / sizeof known[0]; idx++) {
    if (strcmp(label, known[idx].label) == 0) {
      *kind = known[idx].kind;
      return 1;
    }
  }

  return 0;
}

/*
 * Parses a type name (atom or binary, see get_type_name()) into a
 * decoded_atomic_type. Returns 1 and stores the type on a match; returns 0,
 * leaving `*type` untouched, when the term is unreadable or the name is
 * unknown.
 */
static int parse_decoded_atomic_type(ErlNifEnv *env, ERL_NIF_TERM term, decoded_atomic_type *type) {
  static const struct {
    const char *label;
    decoded_atomic_type type;
  } known[] = {
    {"logical", DECODE_ATOMIC_LOGICAL},
    {"integer", DECODE_ATOMIC_INTEGER},
    {"double", DECODE_ATOMIC_DOUBLE},
    {"character", DECODE_ATOMIC_CHARACTER},
  };

  char label[32];
  if (!get_type_name(env, term, label, sizeof label)) return 0;

  for (size_t idx = 0; idx < sizeof known / sizeof known[0]; idx++) {
    if (strcmp(label, known[idx].label) == 0) {
      *type = known[idx].type;
      return 1;
    }
  }

  return 0;
}

// Copy an Erlang list into the typed array of `value` selected by value->kind.
//
// value->length is set to the list length. An empty list succeeds without
// allocating (and without looking at the kind). Otherwise a zeroed array of
// the right element type is attached to `value` and filled element by element.
//
// Returns 1 on success, 0 if `list` is not a proper list, the kind is not a
// vector kind, an allocation fails, or an element has the wrong type. On
// failure the arrays already attached to `value` are not freed here —
// presumably the caller releases `value`; confirm against the call site.
static int parse_encode_vector(ErlNifEnv *env, ERL_NIF_TERM list, encode_value *value) {
  unsigned n_items;
  if (!enif_get_list_length(env, list, &n_items)) return 0;

  value->length = (R_xlen_t)n_items;
  if (n_items == 0) return 1;

  const encode_kind kind = value->kind;

  if (kind == ENCODE_KIND_LOGICAL_VECTOR) {
    value->logical_values = calloc(n_items, sizeof(bool));
    if (value->logical_values == NULL) return 0;
  } else if (kind == ENCODE_KIND_INTEGER_VECTOR) {
    value->integer_values = calloc(n_items, sizeof(int));
    if (value->integer_values == NULL) return 0;
  } else if (kind == ENCODE_KIND_DOUBLE_VECTOR) {
    value->double_values = calloc(n_items, sizeof(double));
    if (value->double_values == NULL) return 0;
  } else if (kind == ENCODE_KIND_CHARACTER_VECTOR) {
    // Both arrays are requested before either result is inspected.
    value->string_values = calloc(n_items, sizeof(char *));
    value->string_value_lens = calloc(n_items, sizeof(size_t));
    if (value->string_values == NULL || value->string_value_lens == NULL) return 0;
  } else {
    return 0;
  }

  ERL_NIF_TERM rest = list;
  for (unsigned idx = 0; idx < n_items; idx++) {
    ERL_NIF_TERM item;
    int parsed = 0;

    if (!enif_get_list_cell(env, rest, &item, &rest)) return 0;

    if (kind == ENCODE_KIND_LOGICAL_VECTOR) {
      parsed = get_boolean(env, item, &value->logical_values[idx]);
    } else if (kind == ENCODE_KIND_INTEGER_VECTOR) {
      parsed = get_int32(env, item, &value->integer_values[idx]);
    } else if (kind == ENCODE_KIND_DOUBLE_VECTOR) {
      // Vector elements go through the strict float reader.
      parsed = get_double_float(env, item, &value->double_values[idx]);
    } else if (kind == ENCODE_KIND_CHARACTER_VECTOR) {
      parsed = copy_binary_bytes(env, item, &value->string_values[idx], &value->string_value_lens[idx]);
    }

    if (!parsed) return 0;
  }

  return 1;
}

// Parse a list of {name, resource} 2-tuples into value->entry_names and
// value->entry_resources.
//
// value->length is set to the list length; an empty list succeeds without
// allocating. Each successfully read resource is retained with
// enif_keep_resource.
//
// Returns 1 on success, 0 on a malformed list or entry, or on allocation
// failure. On failure nothing already stored in `value` is released here —
// presumably the caller cleans up `value`; confirm against the call site.
static int parse_encode_named_list(ErlNifEnv *env, ERL_NIF_TERM list, encode_value *value) {
  unsigned n_entries;
  if (!enif_get_list_length(env, list, &n_entries)) return 0;

  value->length = (R_xlen_t)n_entries;
  if (n_entries == 0) return 1;

  value->entry_names = calloc(n_entries, sizeof(char *));
  value->entry_resources = calloc(n_entries, sizeof(rx_object_resource *));
  if (value->entry_names == NULL || value->entry_resources == NULL) return 0;

  ERL_NIF_TERM rest = list;
  for (unsigned idx = 0; idx < n_entries; idx++) {
    ERL_NIF_TERM entry;
    const ERL_NIF_TERM *pair;
    int pair_arity;

    if (!enif_get_list_cell(env, rest, &entry, &rest)) return 0;
    if (!enif_get_tuple(env, entry, &pair_arity, &pair) || pair_arity != 2) return 0;
    if (!get_global_name(env, pair[0], &value->entry_names[idx])) return 0;
    if (!enif_get_resource(env, pair[1], object_resource_type, (void **)&value->entry_resources[idx])) {
      return 0;
    }

    // Take a reference only once the entry has been fully validated.
    enif_keep_resource(value->entry_resources[idx]);
  }

  return 1;
}

// Parse a {type, value} pair of Erlang terms into `value`.
//
// `value` is zeroed first, then its kind is taken from type_term and the
// payload is read from value_term by the reader matching that kind. For the
// "null" kind the payload must be the atom `nil`. Returns non-zero on success
// and 0 on any failure.
static int parse_encode_value(ErlNifEnv *env, ERL_NIF_TERM type_term, ERL_NIF_TERM value_term, encode_value *value) {
  memset(value, 0, sizeof(*value));

  if (!parse_encode_kind(env, type_term, &value->kind)) return 0;

  int parsed = 0;

  switch (value->kind) {
    case ENCODE_KIND_NULL:
      parsed = enif_is_identical(value_term, enif_make_atom(env, "nil"));
      break;

    case ENCODE_KIND_LOGICAL:
      parsed = get_boolean(env, value_term, &value->logical_scalar);
      break;

    case ENCODE_KIND_INTEGER:
      parsed = get_int32(env, value_term, &value->integer_scalar);
      break;

    case ENCODE_KIND_DOUBLE:
      parsed = get_double_value(env, value_term, &value->double_scalar);
      break;

    case ENCODE_KIND_CHARACTER:
      parsed = copy_binary_bytes(env, value_term, &value->string_scalar, &value->string_scalar_len);
      break;

    case ENCODE_KIND_NA:
      // The payload of an NA is the name of the atomic type it stands for.
      parsed = parse_decoded_atomic_type(env, value_term, &value->na_type);
      break;

    case ENCODE_KIND_LOGICAL_VECTOR:
    case ENCODE_KIND_INTEGER_VECTOR:
    case ENCODE_KIND_DOUBLE_VECTOR:
    case ENCODE_KIND_CHARACTER_VECTOR:
      parsed = parse_encode_vector(env, value_term, value);
      break;

    case ENCODE_KIND_NAMED_LIST:
      parsed = parse_encode_named_list(env, value_term, value);
      break;
  }

  return parsed;
}

// Look up `map[key]` where the key is the binary holding the bytes of the C
// string `key` (without the terminating NUL).
//
// Returns the result of enif_get_map_value (non-zero with *out written when
// the key is present), or 0 if a non-empty key binary could not be created.
static int map_get_binary_key(ErlNifEnv *env, ERL_NIF_TERM map, const char *key, ERL_NIF_TERM *out) {
  const size_t key_len = strlen(key);
  ERL_NIF_TERM key_binary;
  unsigned char *bytes = enif_make_new_binary(env, key_len, &key_binary);

  if (key_len > 0) {
    if (bytes == NULL) return 0;
    memcpy(bytes, key, key_len);
  }

  return enif_get_map_value(env, map, key_binary, out);
}

// Map a column type name ("logical", "integer", "double", "character") to its
// dataframe_column_type. Returns 1 and writes *type on a match, otherwise 0
// with *type untouched.
static int dataframe_type_from_name(const char *name, dataframe_column_type *type) {
  static const struct {
    const char *label;
    dataframe_column_type column_type;
  } known_types[] = {
      {"logical", DATAFRAME_COLUMN_LOGICAL},
      {"integer", DATAFRAME_COLUMN_INTEGER},
      {"double", DATAFRAME_COLUMN_DOUBLE},
      {"character", DATAFRAME_COLUMN_CHARACTER},
  };

  for (size_t i = 0; i < sizeof(known_types) / sizeof(known_types[0]); i++) {
    if (strcmp(name, known_types[i].label) == 0) {
      *type = known_types[i].column_type;
      return 1;
    }
  }

  return 0;
}

// Inverse of dataframe_type_from_name: the wire name of a column type, or
// "unknown" for a value outside the four known enumerators.
static const char *dataframe_type_name(dataframe_column_type type) {
  const char *label = "unknown";

  switch (type) {
    case DATAFRAME_COLUMN_LOGICAL:
      label = "logical";
      break;
    case DATAFRAME_COLUMN_INTEGER:
      label = "integer";
      break;
    case DATAFRAME_COLUMN_DOUBLE:
      label = "double";
      break;
    case DATAFRAME_COLUMN_CHARACTER:
      label = "character";
      break;
  }

  return label;
}

// Parse one non-NA data frame value of the given column type into `cell`.
//
// `cell` is zeroed first. Integers must lie in [-2147483647, INT_MAX]; INT_MIN
// is rejected (it is the bit pattern R uses for NA_integer_ — presumably the
// reason for the asymmetric bound). Doubles accept floats or 64-bit integers
// and must be finite. Returns non-zero on success, 0 otherwise.
static int parse_dataframe_non_na_value(ErlNifEnv *env, ERL_NIF_TERM term, dataframe_column_type type, dataframe_cell *cell) {
  memset(cell, 0, sizeof(*cell));

  if (type == DATAFRAME_COLUMN_LOGICAL) {
    return get_boolean(env, term, &cell->logical_value);
  }

  if (type == DATAFRAME_COLUMN_INTEGER) {
    ErlNifSInt64 wide;
    if (!enif_get_int64(env, term, &wide)) return 0;
    if (wide < -2147483647LL || wide > INT_MAX) return 0;
    cell->integer_value = (int)wide;
    return 1;
  }

  if (type == DATAFRAME_COLUMN_DOUBLE) {
    if (!get_double_value(env, term, &cell->double_value)) return 0;
    return isfinite(cell->double_value) ? 1 : 0;
  }

  if (type == DATAFRAME_COLUMN_CHARACTER) {
    return copy_binary_bytes(env, term, &cell->string_value, &cell->string_value_len);
  }

  return 0;
}

// Parse one data frame cell, which is either a plain value or an NA marker.
//
// A term with a binary "kind" key is treated as a marker map: it is accepted
// only when kind is "na" and its "type" names the same type as the column, in
// which case cell->is_na is set. Any other marker map is rejected. A term
// without a "kind" key is handed to parse_dataframe_non_na_value.
//
// Returns non-zero on success, 0 on failure.
static int parse_dataframe_cell(ErlNifEnv *env, ERL_NIF_TERM term, dataframe_column_type type, dataframe_cell *cell) {
  memset(cell, 0, sizeof(*cell));

  ERL_NIF_TERM kind_term;
  if (!map_get_binary_key(env, term, "kind", &kind_term)) {
    return parse_dataframe_non_na_value(env, term, type, cell);
  }

  size_t unused_len = 0;
  char *kind = NULL;
  if (!copy_binary_bytes(env, kind_term, &kind, &unused_len)) return 0;

  const bool kind_is_na = strcmp(kind, "na") == 0;
  free(kind);
  if (!kind_is_na) return 0;

  ERL_NIF_TERM type_term;
  char *declared_type = NULL;
  if (!map_get_binary_key(env, term, "type", &type_term) ||
      !copy_binary_bytes(env, type_term, &declared_type, &unused_len)) {
    free(declared_type);
    return 0;
  }

  // The NA must be declared with the same type as the column it sits in.
  const bool same_type = strcmp(declared_type, dataframe_type_name(type)) == 0;
  free(declared_type);
  if (!same_type) return 0;

  cell->is_na = true;
  return 1;
}

// Report whether `name` equals any of the first `index` entries of `names`.
// Returns 1 when a match exists, 0 otherwise (always 0 when index is 0).
static int dataframe_name_duplicate(char **names, unsigned index, const char *name) {
  unsigned seen = 0;

  while (seen < index) {
    if (strcmp(names[seen], name) == 0) {
      return 1;
    }
    seen++;
  }

  return 0;
}

// Parse the Erlang map form of a data frame into a freshly allocated
// dataframe_wire.
//
// All keys are looked up as binaries. "kind" must equal "data_frame", and
// "names", "n_rows" and "columns" must be present. Validation enforced here:
//   - "names" and "columns" are proper lists of equal length;
//   - "n_rows" is an integer in [0, INT_MAX] and must be 0 when there are no
//     columns;
//   - every name has a non-NUL first byte and does not repeat an earlier name;
//   - column i is a map whose "name" equals names[i], whose "type" is accepted
//     by dataframe_type_from_name, and whose "values" list has exactly n_rows
//     elements, each accepted by parse_dataframe_cell.
//
// Returns 1 and stores the wire in *out on success. Returns 0 on any failure;
// *out is then NULL and any partially built wire has been handed to
// free_dataframe_wire.
static int parse_dataframe_wire(ErlNifEnv *env, ERL_NIF_TERM term, dataframe_wire **out) {
  *out = NULL;

  ERL_NIF_TERM kind_term;
  char *kind = NULL;
  if (!map_get_binary_key(env, term, "kind", &kind_term) ||
      !copy_binary_bytes(env, kind_term, &kind, &(size_t){0})) {
    free(kind);
    return 0;
  }

  // The kind string is only needed for this comparison.
  bool is_dataframe = strcmp(kind, "data_frame") == 0;
  free(kind);
  if (!is_dataframe) return 0;

  ERL_NIF_TERM names_term;
  ERL_NIF_TERM n_rows_term;
  ERL_NIF_TERM columns_term;
  unsigned name_count = 0;
  unsigned column_count = 0;
  ErlNifSInt64 n_rows_value = 0;

  // Shape checks that need no allocation are done before the wire is created.
  if (!map_get_binary_key(env, term, "names", &names_term) ||
      !map_get_binary_key(env, term, "n_rows", &n_rows_term) ||
      !map_get_binary_key(env, term, "columns", &columns_term) ||
      !enif_get_list_length(env, names_term, &name_count) ||
      !enif_get_list_length(env, columns_term, &column_count) ||
      !enif_get_int64(env, n_rows_term, &n_rows_value) ||
      n_rows_value < 0 || n_rows_value > INT_MAX ||
      name_count != column_count ||
      (name_count == 0 && n_rows_value > 0)) {
    return 0;
  }

  dataframe_wire *wire = calloc(1, sizeof(dataframe_wire));
  if (wire == NULL) return 0;
  wire->name_count = name_count;
  wire->column_count = column_count;
  wire->n_rows = (unsigned)n_rows_value;

  // With zero columns both arrays stay NULL and the loops below do not run.
  if (name_count > 0) {
    wire->names = calloc(name_count, sizeof(char *));
    wire->columns = calloc(column_count, sizeof(dataframe_column));
    if (wire->names == NULL || wire->columns == NULL) {
      free_dataframe_wire(wire);
      return 0;
    }
  }

  // Pass 1: copy the column names, rejecting empty and duplicate names.
  ERL_NIF_TERM head;
  ERL_NIF_TERM tail = names_term;
  for (unsigned i = 0; i < name_count; i++) {
    size_t ignored_len = 0;
    if (!enif_get_list_cell(env, tail, &head, &tail) ||
        !copy_binary_bytes(env, head, &wire->names[i], &ignored_len) ||
        wire->names[i][0] == '\0' ||
        dataframe_name_duplicate(wire->names, i, wire->names[i])) {
      free_dataframe_wire(wire);
      return 0;
    }
  }

  // Pass 2: parse each column map, in the same order as the names.
  tail = columns_term;
  for (unsigned i = 0; i < column_count; i++) {
    if (!enif_get_list_cell(env, tail, &head, &tail)) {
      free_dataframe_wire(wire);
      return 0;
    }

    dataframe_column *column = &wire->columns[i];
    ERL_NIF_TERM name_term;
    ERL_NIF_TERM type_term;
    ERL_NIF_TERM values_term;
    char *type_name = NULL;
    unsigned value_count = 0;

    // column->name is stored in the wire (released with it); type_name is a
    // temporary owned by this iteration and freed on both paths below.
    if (!map_get_binary_key(env, head, "name", &name_term) ||
        !map_get_binary_key(env, head, "type", &type_term) ||
        !map_get_binary_key(env, head, "values", &values_term) ||
        !copy_binary_bytes(env, name_term, &column->name, &(size_t){0}) ||
        strcmp(column->name, wire->names[i]) != 0 ||
        !copy_binary_bytes(env, type_term, &type_name, &(size_t){0}) ||
        !dataframe_type_from_name(type_name, &column->type) ||
        !enif_get_list_length(env, values_term, &value_count) ||
        value_count != wire->n_rows) {
      free(type_name);
      free_dataframe_wire(wire);
      return 0;
    }
    free(type_name);

    column->value_count = value_count;
    if (value_count > 0) {
      column->values = calloc(value_count, sizeof(dataframe_cell));
      if (column->values == NULL) {
        free_dataframe_wire(wire);
        return 0;
      }
    }

    // Parse every cell against the column's declared type.
    ERL_NIF_TERM value_head;
    ERL_NIF_TERM value_tail = values_term;
    for (unsigned j = 0; j < value_count; j++) {
      if (!enif_get_list_cell(env, value_tail, &value_head, &value_tail) ||
          !parse_dataframe_cell(env, value_head, column->type, &column->values[j])) {
        free_dataframe_wire(wire);
        return 0;
      }
    }
  }

  *out = wire;
  return 1;
}

// Parse a list of {key, value} option tuples for data frame handling.
//
// Only `max_rows` is interpreted: its value must be an integer in
// (0, INT_MAX]. Tuples with any other key are skipped, and when `max_rows`
// appears more than once the last occurrence wins. Outputs start as
// "no limit" (*has_max_rows false, *max_rows 0).
//
// Returns 1 on success, 0 if `opts` is not a proper list of 2-tuples or a
// max_rows value is invalid.
static int parse_dataframe_opts(ErlNifEnv *env, ERL_NIF_TERM opts, bool *has_max_rows, unsigned *max_rows) {
  *has_max_rows = false;
  *max_rows = 0;

  unsigned n_opts = 0;
  if (!enif_get_list_length(env, opts, &n_opts)) return 0;

  ERL_NIF_TERM rest = opts;
  for (unsigned idx = 0; idx < n_opts; idx++) {
    ERL_NIF_TERM opt;
    const ERL_NIF_TERM *pair;
    int pair_arity = 0;

    if (!enif_get_list_cell(env, rest, &opt, &rest)) return 0;
    if (!enif_get_tuple(env, opt, &pair_arity, &pair) || pair_arity != 2) return 0;

    if (!enif_is_identical(pair[0], enif_make_atom(env, "max_rows"))) continue;

    ErlNifSInt64 limit = 0;
    if (!enif_get_int64(env, pair[1], &limit) || limit <= 0 || limit > INT_MAX) return 0;

    *has_max_rows = true;
    *max_rows = (unsigned)limit;
  }

  return 1;
}

// Append `work` to the tail of the work queue and signal queue_cond, all
// while holding queue_mutex. `work->next` is not modified here.
static void enqueue_work(r_work *work) {
  pthread_mutex_lock(&queue_mutex);

  if (queue_tail == NULL) {
    queue_head = work;
  } else {
    queue_tail->next = work;
  }
  queue_tail = work;

  pthread_cond_signal(&queue_cond);
  pthread_mutex_unlock(&queue_mutex);
}

// Remove and return the item at the head of the work queue, blocking on
// queue_cond until one is available. The tail pointer is cleared when the
// queue becomes empty.
static r_work *dequeue_work(void) {
  pthread_mutex_lock(&queue_mutex);

  // Re-check the predicate after every wakeup.
  while (queue_head == NULL) {
    pthread_cond_wait(&queue_cond, &queue_mutex);
  }

  r_work *next_item = queue_head;
  queue_head = next_item->next;
  if (queue_head == NULL) {
    queue_tail = NULL;
  }

  pthread_mutex_unlock(&queue_mutex);
  return next_item;
}

// Complete a work item.
//
// WORK_RELEASE items are simply freed here. Every other kind is marked done
// and its done_cond is signalled while holding queue_mutex; the item itself
// is not freed by this function.
static void finish_work(r_work *work) {
  if (work->kind != WORK_RELEASE) {
    pthread_mutex_lock(&queue_mutex);
    work->done = true;
    pthread_cond_signal(&work->done_cond);
    pthread_mutex_unlock(&queue_mutex);
    return;
  }

  free(work);
}

// Point R's C-stack bookkeeping at the calling thread.
//
// Looks up R_CStackStart, R_CStackLimit and R_CStackDir in the loaded libR and,
// for each one that resolves, stores: the address of a local on the current
// stack, (uintptr_t)-1, and -1 respectively. A limit of (uintptr_t)-1 is the
// value "Writing R Extensions" gives for disabling R's stack checking.
// Symbols that do not resolve are silently skipped.
static void configure_r_c_stack(void) {
  char stack_anchor;

  uintptr_t *start_slot = dlsym(lib_r_handle, "R_CStackStart");
  uintptr_t *limit_slot = dlsym(lib_r_handle, "R_CStackLimit");
  int *dir_slot = dlsym(lib_r_handle, "R_CStackDir");

  if (start_slot) *start_slot = (uintptr_t)&stack_anchor;
  if (limit_slot) *limit_slot = (uintptr_t)-1;
  if (dir_slot) *dir_slot = -1;
}

// Resolve `name` in the loaded libR and store the address in *out.
// Returns true when the symbol was found; *out is NULL otherwise.
static bool load_symbol(void **out, const char *name) {
  void *address = dlsym(lib_r_handle, name);
  *out = address;
  return address != NULL;
}

static int ensure_eval_helper(r_work *work);
static int ensure_plot_helper(r_work *work);

// Resolve every libR function and data symbol listed below into the global
// `r` table.
//
// A non-NULL r.mk_string is used as the "already loaded" marker, so repeated
// calls return true without doing any lookups. Symbols are resolved into a
// local candidate first and `r` is assigned only after every lookup has
// succeeded, so a failure leaves `r` untouched. On failure an error message is
// recorded on `work` via set_error and false is returned.
//
// After a successful load the ptr_R_WriteConsole / ptr_R_WriteConsoleEx hook
// addresses are looked up as well; those two results are not checked for NULL
// here.
static bool load_r_api(r_work *work) {
  if (r.mk_string != NULL) return true;

  r_api candidate = {0};

  // The || chain stops at the first symbol that fails to resolve.
  if (!load_symbol((void **)&candidate.mk_string, "Rf_mkString") ||
      !load_symbol((void **)&candidate.protect, "Rf_protect") ||
      !load_symbol((void **)&candidate.unprotect, "Rf_unprotect") ||
      !load_symbol((void **)&candidate.parse_vector, "R_ParseVector") ||
      !load_symbol((void **)&candidate.try_eval, "R_tryEval") ||
      !load_symbol((void **)&candidate.toplevel_exec, "R_ToplevelExec") ||
      !load_symbol((void **)&candidate.xlength_fn, "Rf_xlength") ||
      !load_symbol((void **)&candidate.length_fn, "LENGTH") ||
      !load_symbol((void **)&candidate.type_of, "TYPEOF") ||
      !load_symbol((void **)&candidate.vector_elt, "VECTOR_ELT") ||
      !load_symbol((void **)&candidate.integer_elt, "INTEGER_ELT") ||
      !load_symbol((void **)&candidate.real_elt, "REAL_ELT") ||
      !load_symbol((void **)&candidate.logical_elt, "LOGICAL_ELT") ||
      !load_symbol((void **)&candidate.string_elt, "STRING_ELT") ||
      !load_symbol((void **)&candidate.r_char, "R_CHAR") ||
      !load_symbol((void **)&candidate.preserve_object, "R_PreserveObject") ||
      !load_symbol((void **)&candidate.release_object, "R_ReleaseObject") ||
      !load_symbol((void **)&candidate.scalar_real, "Rf_ScalarReal") ||
      !load_symbol((void **)&candidate.scalar_integer, "Rf_ScalarInteger") ||
      !load_symbol((void **)&candidate.scalar_logical, "Rf_ScalarLogical") ||
      !load_symbol((void **)&candidate.mk_char_len_ce, "Rf_mkCharLenCE") ||
      !load_symbol((void **)&candidate.scalar_string, "Rf_ScalarString") ||
      !load_symbol((void **)&candidate.alloc_vector, "Rf_allocVector") ||
      !load_symbol((void **)&candidate.raw, "RAW") ||
      !load_symbol((void **)&candidate.set_integer_elt, "SET_INTEGER_ELT") ||
      !load_symbol((void **)&candidate.set_real_elt, "SET_REAL_ELT") ||
      !load_symbol((void **)&candidate.set_logical_elt, "SET_LOGICAL_ELT") ||
      !load_symbol((void **)&candidate.set_string_elt, "SET_STRING_ELT") ||
      !load_symbol((void **)&candidate.set_vector_elt, "SET_VECTOR_ELT") ||
      !load_symbol((void **)&candidate.set_attrib, "Rf_setAttrib") ||
      !load_symbol((void **)&candidate.new_env, "R_NewEnv") ||
      !load_symbol((void **)&candidate.install, "Rf_install") ||
      !load_symbol((void **)&candidate.define_var, "Rf_defineVar") ||
      !load_symbol((void **)&candidate.find_var_in_frame, "Rf_findVarInFrame") ||
      !load_symbol((void **)&candidate.ls_internal, "R_lsInternal") ||
      !load_symbol((void **)&candidate.lang1, "Rf_lang1") ||
      !load_symbol((void **)&candidate.lang2, "Rf_lang2") ||
      !load_symbol((void **)&candidate.lang3, "Rf_lang3") ||
      !load_symbol((void **)&candidate.lang4, "Rf_lang4") ||
      !load_symbol((void **)&candidate.get_attrib, "Rf_getAttrib") ||
      !load_symbol((void **)&candidate.inherits, "Rf_inherits") ||
      // From here on the symbols are libR data objects, not functions: the
      // table stores their addresses and callers dereference them (e.g.
      // *r.nil_value).
      !load_symbol((void **)&candidate.nil_value, "R_NilValue") ||
      !load_symbol((void **)&candidate.global_env, "R_GlobalEnv") ||
      !load_symbol((void **)&candidate.names_symbol, "R_NamesSymbol") ||
      !load_symbol((void **)&candidate.class_symbol, "R_ClassSymbol") ||
      !load_symbol((void **)&candidate.dim_symbol, "R_DimSymbol") ||
      !load_symbol((void **)&candidate.na_string, "R_NaString") ||
      !load_symbol((void **)&candidate.na_int, "R_NaInt") ||
      !load_symbol((void **)&candidate.na_real, "R_NaReal")) {
    set_error(work, "libR is missing one or more native API symbols");
    return false;
  }

  // Publish the fully resolved table in one assignment.
  r = candidate;
  r_write_console =
      (ptr_R_WriteConsole_fn *)dlsym(lib_r_handle, "ptr_R_WriteConsole");
  r_write_console_ex =
      (ptr_R_WriteConsoleEx_fn *)dlsym(lib_r_handle, "ptr_R_WriteConsoleEx");

  return true;
}

// Prepend the configured library directories to R's library search path.
//
// Builds a character vector from work->lib_paths, reads the current search
// path by evaluating `.libPaths()` in the global environment, and then
// evaluates `.libPaths(<configured followed by current>)`. A lib_paths_count
// of zero is a no-op.
//
// Returns 1 on success. Returns 0, after recording a message on `work` via
// set_error, when a path is too long for R's int-sized string length or when
// either evaluation reports an error. Everything protected here is unprotected
// before returning, and the protect-stack depth used is constant regardless of
// how many paths are configured.
static int apply_lib_paths(r_work *work) {
  if (work->lib_paths_count == 0) return 1;

  int protect_count = 0;
  SEXP paths = r.protect(r.alloc_vector(STRSXP, work->lib_paths_count));
  protect_count++;

  for (unsigned i = 0; i < work->lib_paths_count; i++) {
    size_t path_len = strlen(work->lib_paths[i]);
    if (path_len > INT_MAX) {
      // Rf_mkCharLenCE takes an int length; refuse instead of truncating.
      r.unprotect(protect_count);
      set_error(work, "native R library path is too long");
      return 0;
    }

    // Store the new CHARSXP straight into the protected vector. Once stored it
    // is reachable from `paths`, so it needs no protect-stack slot of its own.
    r.set_string_elt(paths, i, r.mk_char_len_ce(work->lib_paths[i], (int)path_len, CE_UTF8));
  }

  SEXP current_call = r.protect(r.lang1(r.install(".libPaths")));
  protect_count++;

  // Check the error flag before protecting, so the result of a failed
  // evaluation is never pushed onto the protect stack.
  int error_occurred = 0;
  SEXP current = r.try_eval(current_call, *r.global_env, &error_occurred);
  if (error_occurred) {
    r.unprotect(protect_count);
    set_error(work, "failed to read native R library paths");
    return 0;
  }
  r.protect(current);
  protect_count++;

  R_xlen_t current_len = r.xlength_fn(current);
  R_xlen_t total_len = work->lib_paths_count + current_len;
  SEXP combined = r.protect(r.alloc_vector(STRSXP, total_len));
  protect_count++;

  // Configured paths first, so they take precedence over the existing ones.
  for (unsigned i = 0; i < work->lib_paths_count; i++) {
    r.set_string_elt(combined, i, r.string_elt(paths, i));
  }

  for (R_xlen_t i = 0; i < current_len; i++) {
    r.set_string_elt(combined, work->lib_paths_count + i, r.string_elt(current, i));
  }

  SEXP set_call = r.protect(r.lang2(r.install(".libPaths"), combined));
  protect_count++;

  error_occurred = 0;
  r.try_eval(set_call, *r.global_env, &error_occurred);

  r.unprotect(protect_count);

  if (error_occurred) {
    set_error(work, "failed to apply native R library paths");
    return 0;
  }

  return 1;
}

// Hand SIGCHLD back to its default disposition so embedded R's libc
// system()/popen() collect their own children instead of the BEAM's signal
// dispatcher consuming the exit (see the call site in do_init for the full
// rationale). Runs on the R owner thread; sigaction disposition is
// process-wide.
static void reset_sigchld_to_default(void) {
  struct sigaction default_action;
  memset(&default_action, 0, sizeof(default_action));
  default_action.sa_handler = SIG_DFL;
  sigemptyset(&default_action.sa_mask);
  sigaction(SIGCHLD, &default_action, NULL);
}

// Handle an init work item: initialize embedded R, or validate the request
// against an earlier initialization.
//
// The outcome is reported through `work`: work->init_return is set to
// INIT_RETURN_OK, INIT_RETURN_PLAIN_ERROR, INIT_RETURN_MISMATCH or
// INIT_RETURN_FAILED, with details in work->error, work->init_mismatch or
// work->init_failure.
//
// Flow:
//   1. Copy the requested config out of `work` and, under diagnostics_mutex,
//      count the attempt and record it as attempted_init_config.
//   2. If R is already initialized: an identical config is a successful no-op,
//      a different config is reported as a mismatch.
//   3. If the previous attempt failed with a non-retryable error, report that
//      stored failure again without retrying.
//   4. Otherwise set the R environment variables, dlopen libR, boot R
//      (Rf_initialize_R + setup_Rmainloop), reset SIGCHLD, load the API table,
//      apply the library paths and install the eval helper.
//   5. On success publish the config as successful_init_config, mark the state
//      initialized and clear the stored error.
//
// Every exit path clears the local config copies it still owns.
static void do_init(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->init_return = INIT_RETURN_OK;
  memset(&work->init_failure, 0, sizeof(work->init_failure));
  memset(&work->init_mismatch, 0, sizeof(work->init_mismatch));

  init_config requested = {0};
  if (!init_config_copy_from_work(&requested, work)) {
    work->init_return = INIT_RETURN_PLAIN_ERROR;
    set_error(work, "failed to copy native init config");
    return;
  }

  // The diagnostics counters and recorded configs are updated under
  // diagnostics_mutex.
  pthread_mutex_lock(&diagnostics_mutex);
  init_attempt_count++;

  if (!init_config_copy(&attempted_init_config, requested.lib_r_path, requested.r_home, requested.lib_paths, requested.lib_paths_count)) {
    pthread_mutex_unlock(&diagnostics_mutex);
    work->init_return = INIT_RETURN_PLAIN_ERROR;
    set_error(work, "failed to record native init config");
    init_config_clear(&requested);
    return;
  }

  if (init_state == INIT_STATE_INITIALIZED) {
    // Already initialized: only an identical config is accepted.
    bool same_config = init_config_equal(&successful_init_config, &requested);
    if (same_config) {
      pthread_mutex_unlock(&diagnostics_mutex);
      init_config_clear(&requested);
      return;
    }

    init_mismatch_count++;
    if (!fill_init_mismatch(&work->init_mismatch, &successful_init_config, &requested)) {
      pthread_mutex_unlock(&diagnostics_mutex);
      work->init_return = INIT_RETURN_PLAIN_ERROR;
      set_error(work, "failed to record native init mismatch diagnostics");
      init_config_clear(&requested);
      return;
    }

    pthread_mutex_unlock(&diagnostics_mutex);
    work->init_return = INIT_RETURN_MISMATCH;
    set_error(work, "native init config mismatch");
    init_config_clear(&requested);
    return;
  }

  // A stored non-retryable failure is replayed instead of trying again.
  if (init_state == INIT_STATE_FAILED && has_last_init_error && !last_init_error.retryable) {
    work->ok = false;
    work->init_return = INIT_RETURN_FAILED;
    work->init_failure = last_init_error;
    pthread_mutex_unlock(&diagnostics_mutex);
    init_config_clear(&requested);
    return;
  }

  pthread_mutex_unlock(&diagnostics_mutex);

  // Second copy of the config; its ownership moves to successful_init_config
  // at the end of a successful run.
  init_config pending_success_config = {0};
  if (!init_config_copy(&pending_success_config, requested.lib_r_path, requested.r_home, requested.lib_paths, requested.lib_paths_count)) {
    work->init_return = INIT_RETURN_PLAIN_ERROR;
    set_error(work, "failed to record native init config");
    init_config_clear(&requested);
    return;
  }

  // Environment for the embedded R, set before libR is loaded. The setenv
  // results are not checked.
  setenv("R_HOME", work->r_home, 1);
  setenv("R_ENABLE_JIT", "0", 1);
  setenv("R_DEFAULT_PACKAGES", "NULL", 1);

  lib_r_handle = dlopen(work->lib_r_path, RTLD_NOW | RTLD_GLOBAL);
  if (lib_r_handle == NULL) {
    const char *message = dlerror();
    // NOTE(review): the last argument of set_init_failed looks like the
    // "retryable" flag (true here and for the boot-symbol failure, false for
    // every later stage) — confirm against its definition.
    set_init_failed(work, "dlopen_lib_r", message ? message : "failed to dlopen libR", true);
    init_config_clear(&pending_success_config);
    init_config_clear(&requested);
    return;
  }

  configure_r_c_stack();

  Rf_initialize_R_fn Rf_initialize_R =
      (Rf_initialize_R_fn)dlsym(lib_r_handle, "Rf_initialize_R");
  setup_Rmainloop_fn setup_Rmainloop =
      (setup_Rmainloop_fn)dlsym(lib_r_handle, "setup_Rmainloop");

  if (Rf_initialize_R == NULL || setup_Rmainloop == NULL) {
    set_init_failed(work, "resolve_boot_symbols", "libR is missing Rf_initialize_R or setup_Rmainloop", true);
    close_uninitialized_lib_r_handle();
    init_config_clear(&pending_success_config);
    init_config_clear(&requested);
    return;
  }

  // From this point on, failures pass false to set_init_failed and leave
  // lib_r_handle open.
  char *argv[] = {"R", "--silent", "--no-save", "--no-restore", "--no-readline"};
  if (Rf_initialize_R(5, argv) < 0) {
    set_init_failed(work, "rf_initialize_r", "Rf_initialize_R failed", false);
    init_config_clear(&pending_success_config);
    init_config_clear(&requested);
    return;
  }

  // Fault-injection hook: when the "after_rf_initialize" stage is enabled,
  // report a failure at this point.
  if (init_fault_stage_enabled("after_rf_initialize")) {
    set_init_failed(work, "rf_initialize_r", "fault injected after Rf_initialize_R", false);
    init_config_clear(&pending_success_config);
    init_config_clear(&requested);
    return;
  }

  // Applied a second time after Rf_initialize_R — presumably because that call
  // resets the stack variables; TODO confirm.
  configure_r_c_stack();
  setup_Rmainloop();

  // The BEAM owns a process-wide SIGCHLD handler whose signal-dispatcher thread
  // consumes child-process exits. Embedded R's libc system()/popen() then block
  // forever in wait4() waiting for a child whose exit the BEAM already reaped,
  // which hangs every R call that shells out (system/system2/pipe and therefore
  // package/namespace loading via requireNamespace). Restore the default
  // SIGCHLD disposition so R's own system()/popen() reap their children. This
  // is safe for the BEAM: OTP reaps Port children through erl_child_setup, not
  // through this handler (verified: Port/System.cmd/os:cmd keep working).
  reset_sigchld_to_default();

  // Each remaining stage reports the message already in work->error when there
  // is one, and a stage-specific default otherwise.
  if (!load_r_api(work)) {
    set_init_failed(work, "load_r_api", work->error[0] == '\0' ? "libR is missing one or more native API symbols" : work->error, false);
    init_config_clear(&pending_success_config);
    init_config_clear(&requested);
    return;
  }

  if (!apply_lib_paths(work)) {
    set_init_failed(work, "apply_lib_paths", work->error[0] == '\0' ? "failed to apply native R library paths" : work->error, false);
    init_config_clear(&pending_success_config);
    init_config_clear(&requested);
    return;
  }

  if (!ensure_eval_helper(work)) {
    set_init_failed(work, "ensure_eval_helper", work->error[0] == '\0' ? "failed to initialize native eval helper" : work->error, false);
    init_config_clear(&pending_success_config);
    init_config_clear(&requested);
    return;
  }

  // Success: move pending_success_config into successful_init_config (the
  // local is zeroed so nothing is freed twice) and reset the failure record.
  pthread_mutex_lock(&diagnostics_mutex);
  init_config_clear(&successful_init_config);
  successful_init_config = pending_success_config;
  memset(&pending_success_config, 0, sizeof(pending_success_config));
  init_state = INIT_STATE_INITIALIZED;
  has_last_init_error = false;
  memset(&last_init_error, 0, sizeof(last_init_error));
  pthread_mutex_unlock(&diagnostics_mutex);

  runtime_initialized = true;
  init_config_clear(&requested);
}

// Mark the work result as unsupported and record the R type code of `value`.
static void set_unsupported_result(r_work *work, SEXP value) {
  work->result_kind = RESULT_UNSUPPORTED;

  int sexp_type = r.type_of(value);
  work->unsupported_type = sexp_type;
}

static rx_object_resource *make_preserved_resource(SEXP value, resource_kind kind);

// Copy the bytes of an R string element into a new NUL-terminated heap buffer.
//
// The byte count is taken from r.length_fn and the data from r.r_char.
// Returns 1 and writes the buffer to *out and its length (without the NUL) to
// *out_len; the caller owns the buffer. Returns 0 on a negative length or
// allocation failure, leaving both outputs untouched.
static int copy_r_string(SEXP string, char **out, size_t *out_len) {
  const int r_length = r.length_fn(string);
  if (r_length < 0) return 0;

  const char *source = r.r_char(string);
  const size_t n_bytes = (size_t)r_length;

  char *buffer = malloc(n_bytes + 1);
  if (buffer == NULL) return 0;

  if (source != NULL && n_bytes > 0) {
    memcpy(buffer, source, n_bytes);
  }
  buffer[n_bytes] = '\0';

  *out_len = n_bytes;
  *out = buffer;
  return 1;
}

// Allocate a zero-initialized decoded_value tagged with `kind`.
// Returns NULL if the allocation fails.
static decoded_value *alloc_decoded_value(decoded_value_kind kind) {
  decoded_value *fresh = calloc(1, sizeof(*fresh));
  if (fresh == NULL) return NULL;

  fresh->kind = kind;
  return fresh;
}

// Classify an R object for resource wrapping: RESOURCE_KIND_DATAFRAME when it
// inherits from "data.frame", RESOURCE_KIND_GENERIC otherwise (including when
// r.inherits has not been loaded).
static resource_kind resource_kind_for_object(SEXP value) {
  const bool is_dataframe = r.inherits != NULL && r.inherits(value, "data.frame");
  return is_dataframe ? RESOURCE_KIND_DATAFRAME : RESOURCE_KIND_GENERIC;
}

// Wrap an R object in a DECODE_VALUE_OBJECT node backed by a resource made
// with make_preserved_resource.
//
// Returns 1 and writes the node to *out on success. Returns 0 if either the
// node or the resource cannot be created; the node is freed and *out is left
// untouched.
static int decoded_object(SEXP value, decoded_value **out) {
  decoded_value *wrapper = alloc_decoded_value(DECODE_VALUE_OBJECT);
  if (wrapper == NULL) return 0;

  resource_kind kind = resource_kind_for_object(value);
  wrapper->resource = make_preserved_resource(value, kind);

  if (wrapper->resource != NULL) {
    *out = wrapper;
    return 1;
  }

  free_decoded_value(wrapper);
  return 0;
}

// Decode a value that is not converted structurally.
//
// At depth 0 the whole result is flagged as unsupported on `work` and *out is
// set to NULL; this still counts as success. At any other depth the value is
// wrapped as an object node. Returns 1 on success, 0 (with an error recorded
// on `work`) if the wrapper cannot be created.
static int decode_opaque_value(r_work *work, SEXP value, int depth, decoded_value **out) {
  if (depth != 0) {
    if (decoded_object(value, out)) return 1;

    set_error(work, "failed to allocate native opaque decode result");
    return 0;
  }

  set_unsupported_result(work, value);
  *out = NULL;
  return 1;
}

// Return 1 when `value` carries the attribute `symbol` with length > 0.
// A NULL symbol, an R NULL value, and a missing attribute all yield 0.
static int has_non_empty_attribute(SEXP value, SEXP symbol) {
  if (symbol == NULL) return 0;
  if (value == *r.nil_value) return 0;

  SEXP attribute = r.get_attrib(value, symbol);
  if (attribute == *r.nil_value) return 0;

  return r.xlength_fn(attribute) > 0;
}

// Return 1 when `value` has a non-empty class or dim attribute, else 0.
// decode_r_value routes such values to decode_opaque_value.
static int is_semantic_object(SEXP value) {
  if (has_non_empty_attribute(value, *r.class_symbol)) return 1;
  return has_non_empty_attribute(value, *r.dim_symbol);
}

// Narrow an R length to unsigned. Returns 1 and writes *out when `length` is
// in [0, UINT_MAX]; returns 0 and leaves *out untouched otherwise.
static int decoded_count(R_xlen_t length, unsigned *out) {
  if (length < 0) return 0;
  if ((uint64_t)length > UINT_MAX) return 0;

  *out = (unsigned)length;
  return 1;
}

// Build a DECODE_VALUE_NA node that remembers which atomic type the NA
// belongs to. Returns 1 with *out set, or 0 on allocation failure.
static int decoded_na(decoded_atomic_type type, decoded_value **out) {
  decoded_value *marker = alloc_decoded_value(DECODE_VALUE_NA);
  if (!marker) return 0;

  marker->atomic_type = type;
  *out = marker;
  return 1;
}

// Build a decoded node for one R logical element.
//
// A value equal to *r.na_int becomes a logical NA node. Otherwise the node is
// true only when the value is exactly 1; every other value is false.
// Returns 1 with *out set, or 0 on allocation failure.
static int decoded_logical(int value, decoded_value **out) {
  const bool is_missing = value == *r.na_int;
  if (is_missing) return decoded_na(DECODE_ATOMIC_LOGICAL, out);

  decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_LOGICAL);
  if (boxed == NULL) return 0;

  boxed->logical_value = (value == 1);
  *out = boxed;
  return 1;
}

// Build a decoded node for one R integer element. A value equal to *r.na_int
// becomes an integer NA node. Returns 1 with *out set, or 0 on allocation
// failure.
static int decoded_integer(int value, decoded_value **out) {
  const bool is_missing = value == *r.na_int;
  if (is_missing) return decoded_na(DECODE_ATOMIC_INTEGER, out);

  decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_INTEGER);
  if (boxed == NULL) return 0;

  boxed->integer_value = value;
  *out = boxed;
  return 1;
}

// Build a decoded node for one R double element. Any value for which isnan()
// is true (so NaN as well as NA_real_) becomes a double NA node; infinities
// are stored as-is. Returns 1 with *out set, or 0 on allocation failure.
static int decoded_double(double value, decoded_value **out) {
  const bool is_missing = isnan(value);
  if (is_missing) return decoded_na(DECODE_ATOMIC_DOUBLE, out);

  decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_DOUBLE);
  if (boxed == NULL) return 0;

  boxed->double_value = value;
  *out = boxed;
  return 1;
}

// Build a decoded node for one R string element. The NA string becomes a
// character NA node; anything else is copied into the node with
// copy_r_string. Returns 1 with *out set, or 0 if an allocation or the copy
// fails (the node is freed and *out is left untouched).
static int decoded_character(SEXP string, decoded_value **out) {
  if (string == *r.na_string) {
    return decoded_na(DECODE_ATOMIC_CHARACTER, out);
  }

  decoded_value *boxed = alloc_decoded_value(DECODE_VALUE_CHARACTER);
  if (boxed == NULL) return 0;

  if (copy_r_string(string, &boxed->string_value, &boxed->string_value_len)) {
    *out = boxed;
    return 1;
  }

  free_decoded_value(boxed);
  return 0;
}

static int decode_r_value(r_work *work, SEXP value, int depth, decoded_value **out);

// Decode an atomic R vector into a DECODE_VALUE_VECTOR node holding one child
// node per element (NA elements become NA nodes).
//
// Returns 1 and writes the node to *out on success. Returns 0 with an error
// recorded on `work` when the length does not fit in unsigned, an allocation
// fails, or an element cannot be copied; the partial node is freed.
static int decode_atomic_vector(r_work *work, SEXP value, decoded_atomic_type type, decoded_value **out) {
  unsigned n_items = 0;
  if (!decoded_count(r.xlength_fn(value), &n_items)) {
    set_error(work, "native vector result is too large");
    return 0;
  }

  decoded_value *vector = alloc_decoded_value(DECODE_VALUE_VECTOR);
  if (vector == NULL) {
    set_error(work, "failed to allocate native vector decode result");
    return 0;
  }

  vector->atomic_type = type;
  vector->count = n_items;

  // An empty vector keeps items == NULL.
  if (n_items > 0) {
    vector->items = calloc(n_items, sizeof(decoded_value *));
    if (vector->items == NULL) {
      free_decoded_value(vector);
      set_error(work, "failed to allocate native vector decode result");
      return 0;
    }
  }

  for (unsigned idx = 0; idx < n_items; idx++) {
    const R_xlen_t pos = (R_xlen_t)idx;
    decoded_value **slot = &vector->items[idx];
    int copied;

    if (type == DECODE_ATOMIC_LOGICAL) {
      copied = decoded_logical(r.logical_elt(value, pos), slot);
    } else if (type == DECODE_ATOMIC_INTEGER) {
      copied = decoded_integer(r.integer_elt(value, pos), slot);
    } else if (type == DECODE_ATOMIC_DOUBLE) {
      copied = decoded_double(r.real_elt(value, pos), slot);
    } else if (type == DECODE_ATOMIC_CHARACTER) {
      copied = decoded_character(r.string_elt(value, pos), slot);
    } else {
      copied = 0;
    }

    if (!copied) {
      free_decoded_value(vector);
      set_error(work, "failed to copy native vector decode result");
      return 0;
    }
  }

  *out = vector;
  return 1;
}

// Copy the name at position `index` of an R names vector.
//
// *out is NULL (and the call still succeeds) when `names` is R NULL, is not a
// character vector, is too short, or the entry is NA or empty. Otherwise *out
// receives a heap copy owned by the caller. Returns 0 only when the copy
// itself fails.
static int copy_list_name(SEXP names, unsigned index, char **out) {
  *out = NULL;

  const R_xlen_t pos = (R_xlen_t)index;
  const int names_usable = names != *r.nil_value &&
                           r.type_of(names) == STRSXP &&
                           r.xlength_fn(names) > pos;
  if (!names_usable) return 1;

  SEXP entry = r.string_elt(names, pos);
  if (entry == *r.na_string) return 1;
  if (r.length_fn(entry) == 0) return 1;

  size_t unused_len = 0;
  return copy_r_string(entry, out, &unused_len);
}

// Return 1 when `name` equals the name of any of the first `index` entries.
// A NULL `name` never counts as a duplicate, and entries without a name are
// skipped.
static int duplicate_name(decoded_entry *entries, unsigned index, const char *name) {
  if (name == NULL) return 0;

  for (unsigned pos = 0; pos < index; pos++) {
    const char *earlier = entries[pos].name;
    if (earlier == NULL) continue;
    if (strcmp(earlier, name) == 0) return 1;
  }

  return 0;
}

// Decode an R list (VECSXP) into a list node with one entry per element.
//
// At or beyond RX_DECODE_MAX_DEPTH the list is not descended into and is
// handed to decode_opaque_value instead. The node kind is
// DECODE_VALUE_NAMED_LIST when the list is empty, or when it has a character
// names attribute of matching length in which every name is present,
// non-empty and unique; otherwise it is DECODE_VALUE_R_LIST.
//
// Returns 1 and writes the node to *out on success, 0 on failure (partial
// results are freed; nested decode failures leave the error recorded by the
// nested call).
static int decode_r_list(r_work *work, SEXP value, int depth, decoded_value **out) {
  if (depth >= RX_DECODE_MAX_DEPTH) {
    return decode_opaque_value(work, value, depth, out);
  }

  const R_xlen_t r_length = r.xlength_fn(value);
  unsigned n_entries = 0;
  if (!decoded_count(r_length, &n_entries)) {
    set_error(work, "native list result is too large");
    return 0;
  }

  decoded_value *list = alloc_decoded_value(DECODE_VALUE_NAMED_LIST);
  if (list == NULL) {
    set_error(work, "failed to allocate native list decode result");
    return 0;
  }

  list->count = n_entries;
  if (n_entries > 0) {
    list->entries = calloc(n_entries, sizeof(decoded_entry));
    if (list->entries == NULL) {
      free_decoded_value(list);
      set_error(work, "failed to allocate native list decode result");
      return 0;
    }
  }

  SEXP names_attr = r.get_attrib(value, *r.names_symbol);
  const int names_usable = names_attr != *r.nil_value &&
                           r.type_of(names_attr) == STRSXP &&
                           r.xlength_fn(names_attr) == r_length;

  // An unusable names attribute is treated as if there were none at all.
  SEXP name_source = names_usable ? names_attr : *r.nil_value;
  int all_named = (n_entries == 0) || names_usable;

  for (unsigned idx = 0; idx < n_entries; idx++) {
    decoded_entry *entry = &list->entries[idx];

    if (!copy_list_name(name_source, idx, &entry->name)) {
      free_decoded_value(list);
      set_error(work, "failed to copy native list name");
      return 0;
    }

    // A missing or repeated name downgrades the result to a plain list.
    if (entry->name == NULL || duplicate_name(list->entries, idx, entry->name)) {
      all_named = 0;
    }

    if (!decode_r_value(work, r.vector_elt(value, (R_xlen_t)idx), depth + 1, &entry->value)) {
      free_decoded_value(list);
      return 0;
    }
  }

  list->kind = all_named ? DECODE_VALUE_NAMED_LIST : DECODE_VALUE_R_LIST;
  *out = list;
  return 1;
}

/*
 * Decodes an arbitrary R value into a decoded_value stored in *out.
 *
 * Dispatch order:
 *   1. R NULL becomes a DECODE_VALUE_NULL node.
 *   2. Values for which is_semantic_object() is true go to decode_opaque_value.
 *   3. Logical, integer, double and character vectors of length exactly 1 are
 *      decoded as scalars; any other length goes to decode_atomic_vector.
 *   4. Generic vectors (VECSXP) go to decode_r_list.
 *   5. Every other type goes to decode_opaque_value.
 *
 * *out is reset to NULL on entry. Returns 1 on success and 0 on failure.
 */
static int decode_r_value(r_work *work, SEXP value, int depth, decoded_value **out) {
  *out = NULL;

  if (value == *r.nil_value || r.type_of(value) == NILSXP) {
    decoded_value *null_node = alloc_decoded_value(DECODE_VALUE_NULL);
    if (null_node != NULL) {
      *out = null_node;
      return 1;
    }
    set_error(work, "failed to allocate native NULL decode result");
    return 0;
  }

  if (is_semantic_object(value)) {
    return decode_opaque_value(work, value, depth, out);
  }

  const int sexp_type = r.type_of(value);
  const R_xlen_t element_count = r.xlength_fn(value);
  const bool is_scalar = element_count == 1;

  switch (sexp_type) {
    case LGLSXP:
      if (!is_scalar) {
        return decode_atomic_vector(work, value, DECODE_ATOMIC_LOGICAL, out);
      }
      if (decoded_logical(r.logical_elt(value, 0), out)) {
        return 1;
      }
      set_error(work, "failed to copy native logical decode result");
      return 0;

    case INTSXP:
      if (!is_scalar) {
        return decode_atomic_vector(work, value, DECODE_ATOMIC_INTEGER, out);
      }
      if (decoded_integer(r.integer_elt(value, 0), out)) {
        return 1;
      }
      set_error(work, "failed to copy native integer decode result");
      return 0;

    case REALSXP:
      if (!is_scalar) {
        return decode_atomic_vector(work, value, DECODE_ATOMIC_DOUBLE, out);
      }
      if (decoded_double(r.real_elt(value, 0), out)) {
        return 1;
      }
      set_error(work, "failed to copy native double decode result");
      return 0;

    case STRSXP:
      if (!is_scalar) {
        return decode_atomic_vector(work, value, DECODE_ATOMIC_CHARACTER, out);
      }
      if (decoded_character(r.string_elt(value, 0), out)) {
        return 1;
      }
      set_error(work, "failed to copy native character decode result");
      return 0;

    case VECSXP:
      return decode_r_list(work, value, depth, out);

    default:
      return decode_opaque_value(work, value, depth, out);
  }
}

/*
 * Copies a length-1 atomic R value (or NULL) into the scalar result fields of
 * `work`.
 *
 * result_kind is reset to RESULT_NONE and result_length to 0 first. R NULL
 * yields RESULT_NULL. Integer, double, logical and character vectors of
 * length 1 fill the matching *_result field. Any other length or type is
 * reported through set_unsupported_result. A logical is stored as true only
 * when the R element equals 1.
 */
static void copy_scalar_result(r_work *work, SEXP value) {
  work->result_kind = RESULT_NONE;
  work->result_length = 0;

  const int sexp_type = r.type_of(value);
  const R_xlen_t element_count = r.xlength_fn(value);

  if (value == *r.nil_value || sexp_type == NILSXP) {
    work->result_kind = RESULT_NULL;
    return;
  }

  if (element_count != 1) {
    set_unsupported_result(work, value);
    return;
  }

  if (sexp_type == INTSXP) {
    work->result_kind = RESULT_INTEGER;
    work->integer_result = r.integer_elt(value, 0);
  } else if (sexp_type == REALSXP) {
    work->result_kind = RESULT_DOUBLE;
    work->double_result = r.real_elt(value, 0);
  } else if (sexp_type == LGLSXP) {
    work->result_kind = RESULT_LOGICAL;
    work->logical_result = r.logical_elt(value, 0) == 1;
  } else if (sexp_type == STRSXP) {
    SEXP element = r.string_elt(value, 0);
    if (copy_r_string(element, &work->string_result, &work->string_result_len)) {
      work->result_kind = RESULT_CHARACTER;
    } else {
      set_error(work, "failed to allocate eval string result");
    }
  } else {
    set_unsupported_result(work, value);
  }
}

/*
 * Copies an R logical vector into work->logical_values as C bools.
 *
 * result_length is always set to the vector length. For a non-empty vector a
 * bool array of that length is malloc'ed; an oversized or failed allocation is
 * reported with set_error and leaves result_kind untouched. Each element is
 * stored as true only when the R element equals 1. On success result_kind
 * becomes RESULT_LOGICAL_VECTOR.
 */
static void copy_logical_vector_result(r_work *work, SEXP value) {
  const R_xlen_t total = r.xlength_fn(value);
  work->result_length = total;

  if (total > 0) {
    // Reject lengths whose byte size would not fit in size_t.
    const uint64_t max_elements = (uint64_t)SIZE_MAX / sizeof(bool);
    if ((uint64_t)total > max_elements) {
      set_error(work, "logical vector result is too large");
      return;
    }

    work->logical_values = malloc((size_t)total * sizeof(bool));
    if (work->logical_values == NULL) {
      set_error(work, "failed to allocate logical vector result");
      return;
    }
  }

  R_xlen_t at = 0;
  while (at < total) {
    work->logical_values[at] = r.logical_elt(value, at) == 1;
    at++;
  }

  work->result_kind = RESULT_LOGICAL_VECTOR;
}

/*
 * Copies an R integer vector into work->integer_values.
 *
 * result_length is always set to the vector length. For a non-empty vector an
 * int array of that length is malloc'ed; an oversized or failed allocation is
 * reported with set_error and leaves result_kind untouched. On success
 * result_kind becomes RESULT_INTEGER_VECTOR.
 */
static void copy_integer_vector_result(r_work *work, SEXP value) {
  const R_xlen_t total = r.xlength_fn(value);
  work->result_length = total;

  if (total > 0) {
    // Reject lengths whose byte size would not fit in size_t.
    const uint64_t max_elements = (uint64_t)SIZE_MAX / sizeof(int);
    if ((uint64_t)total > max_elements) {
      set_error(work, "integer vector result is too large");
      return;
    }

    work->integer_values = malloc((size_t)total * sizeof(int));
    if (work->integer_values == NULL) {
      set_error(work, "failed to allocate integer vector result");
      return;
    }
  }

  R_xlen_t at = 0;
  while (at < total) {
    work->integer_values[at] = r.integer_elt(value, at);
    at++;
  }

  work->result_kind = RESULT_INTEGER_VECTOR;
}

/*
 * Copies an R double vector into work->double_values.
 *
 * result_length is always set to the vector length. For a non-empty vector a
 * double array of that length is malloc'ed; an oversized or failed allocation
 * is reported with set_error and leaves result_kind untouched. On success
 * result_kind becomes RESULT_DOUBLE_VECTOR.
 */
static void copy_double_vector_result(r_work *work, SEXP value) {
  const R_xlen_t total = r.xlength_fn(value);
  work->result_length = total;

  if (total > 0) {
    // Reject lengths whose byte size would not fit in size_t.
    const uint64_t max_elements = (uint64_t)SIZE_MAX / sizeof(double);
    if ((uint64_t)total > max_elements) {
      set_error(work, "double vector result is too large");
      return;
    }

    work->double_values = malloc((size_t)total * sizeof(double));
    if (work->double_values == NULL) {
      set_error(work, "failed to allocate double vector result");
      return;
    }
  }

  R_xlen_t at = 0;
  while (at < total) {
    work->double_values[at] = r.real_elt(value, at);
    at++;
  }

  work->result_kind = RESULT_DOUBLE_VECTOR;
}

/*
 * Copies an R character vector into work->string_values and
 * work->string_value_lens.
 *
 * result_length is always set to the vector length. For a non-empty vector
 * two zero-initialised arrays are allocated with calloc, one of string
 * pointers and one of lengths. Each element is then duplicated with
 * copy_r_string. Any failure is reported with set_error and leaves
 * result_kind untouched; whatever was allocated so far stays attached to
 * `work`. On success result_kind becomes RESULT_CHARACTER_VECTOR.
 */
static void copy_character_vector_result(r_work *work, SEXP value) {
  const R_xlen_t total = r.xlength_fn(value);
  work->result_length = total;

  if (total > 0) {
    const uint64_t wanted = (uint64_t)total;
    const bool pointers_overflow = wanted > (uint64_t)SIZE_MAX / sizeof(char *);
    const bool lengths_overflow = wanted > (uint64_t)SIZE_MAX / sizeof(size_t);
    if (pointers_overflow || lengths_overflow) {
      set_error(work, "character vector result is too large");
      return;
    }

    work->string_values = calloc((size_t)total, sizeof(char *));
    work->string_value_lens = calloc((size_t)total, sizeof(size_t));
    if (work->string_values == NULL || work->string_value_lens == NULL) {
      set_error(work, "failed to allocate character vector result");
      return;
    }
  }

  for (R_xlen_t at = 0; at < total; at++) {
    SEXP element = r.string_elt(value, at);
    if (!copy_r_string(element, &work->string_values[at], &work->string_value_lens[at])) {
      set_error(work, "failed to copy character vector result");
      return;
    }
  }

  work->result_kind = RESULT_CHARACTER_VECTOR;
}

/*
 * Replaces work->decoded_result with a fresh decode of `value`.
 *
 * The previous decoded result is freed and the slot cleared, result_kind is
 * reset to RESULT_NONE, and `value` is decoded from depth 0 into the slot.
 * The return value of decode_r_value is deliberately discarded here.
 * NOTE(review): callers presumably detect failure through `work` (the decode
 * paths visible in this file call set_error) -- confirm.
 */
static void copy_generic_result(r_work *work, SEXP value) {
  decoded_value **slot = &work->decoded_result;

  free_decoded_value(*slot);
  *slot = NULL;
  work->result_kind = RESULT_NONE;

  (void)decode_r_value(work, value, 0, slot);
}

/*
 * Copies the R value held by a resource into `work`, according to the
 * resource's recorded kind rather than by inspecting the value's type.
 *
 * result_kind is reset to RESULT_NONE and result_length to 0 first. Scalar
 * kinds read element 0 of `value`; vector kinds delegate to the matching
 * copy_*_vector_result helper; RESOURCE_KIND_GENERIC delegates to
 * copy_generic_result; RESOURCE_KIND_DATAFRAME is reported through
 * set_unsupported_result. A kind not listed below leaves the result as
 * RESULT_NONE.
 */
static void copy_resource_result(r_work *work, SEXP value, resource_kind kind) {
  work->result_kind = RESULT_NONE;
  work->result_length = 0;

  switch (kind) {
    case RESOURCE_KIND_NULL:
      work->result_kind = RESULT_NULL;
      break;

    case RESOURCE_KIND_LOGICAL:
      work->result_kind = RESULT_LOGICAL;
      work->logical_result = r.logical_elt(value, 0) == 1;
      break;

    case RESOURCE_KIND_INTEGER:
      work->result_kind = RESULT_INTEGER;
      work->integer_result = r.integer_elt(value, 0);
      break;

    case RESOURCE_KIND_DOUBLE:
      work->result_kind = RESULT_DOUBLE;
      work->double_result = r.real_elt(value, 0);
      break;

    case RESOURCE_KIND_CHARACTER: {
      SEXP element = r.string_elt(value, 0);
      if (copy_r_string(element, &work->string_result, &work->string_result_len)) {
        work->result_kind = RESULT_CHARACTER;
      } else {
        set_error(work, "failed to allocate character result");
      }
      break;
    }

    case RESOURCE_KIND_LOGICAL_VECTOR:
      copy_logical_vector_result(work, value);
      break;

    case RESOURCE_KIND_INTEGER_VECTOR:
      copy_integer_vector_result(work, value);
      break;

    case RESOURCE_KIND_DOUBLE_VECTOR:
      copy_double_vector_result(work, value);
      break;

    case RESOURCE_KIND_CHARACTER_VECTOR:
      copy_character_vector_result(work, value);
      break;

    case RESOURCE_KIND_GENERIC:
      copy_generic_result(work, value);
      break;

    case RESOURCE_KIND_DATAFRAME:
      set_unsupported_result(work, value);
      break;
  }
}

/*
 * Context handed to eval_string_body through r.toplevel_exec.
 * do_eval_string creates it with ok = false and an empty error string.
 */
typedef struct eval_string_ctx {
  r_work *work;      // request being evaluated; receives the copied result
  bool ok;           // set by eval_string_body from work->ok after the result copy
  char error[1024];  // failure text; empty when eval_string_body did not write one
} eval_string_ctx;

/*
 * r.toplevel_exec callback that parses and evaluates work->source in the
 * global environment.
 *
 * `data` is an eval_string_ctx. Every parsed expression is evaluated in order
 * with r.try_eval; the value of the last one (R NULL when there are none) is
 * copied into `work` with copy_scalar_result.
 *
 * On a parse or evaluation failure a fixed message is written to ctx->error
 * and ctx->ok is left untouched. Otherwise ctx->ok mirrors work->ok, and
 * ctx->error receives work->error when the copy itself failed.
 */
static void eval_string_body(void *data) {
  eval_string_ctx *ctx = data;
  r_work *work = ctx->work;
  ParseStatus status = PARSE_NULL;

  // Two objects stay protected for the whole body: the source string and the
  // parsed expression vector.
  SEXP text = r.protect(r.mk_string(work->source));
  SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));
  const int held = 2;

  if (status != PARSE_OK) {
    snprintf(ctx->error, sizeof(ctx->error), "R parse failed");
    r.unprotect(held);
    return;
  }

  SEXP last = *r.nil_value;
  const R_xlen_t total = r.xlength_fn(parsed);
  R_xlen_t next = 0;

  while (next < total) {
    int failed = 0;
    last = r.try_eval(r.vector_elt(parsed, next), *r.global_env, &failed);

    if (failed) {
      snprintf(ctx->error, sizeof(ctx->error), "R evaluation failed");
      r.unprotect(held);
      return;
    }
    next++;
  }

  copy_scalar_result(work, last);
  r.unprotect(held);

  ctx->ok = work->ok;
  if (!ctx->ok) {
    snprintf(ctx->error, sizeof(ctx->error), "%s", work->error);
  }
}

/*
 * Evaluates work->source and stores a scalar result in `work`.
 *
 * Resets the outcome fields of `work`, then bails out early when a terminal
 * initialisation failure is recorded, when the embedded runtime is not
 * initialised, or when the R API cannot be loaded. Otherwise
 * eval_string_body runs under r.toplevel_exec. Every failure ends up in
 * `work` through set_error.
 */
static void do_eval_string(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->result_kind = RESULT_NONE;
  work->string_result = NULL;
  work->unsupported_type = 0;

  if (set_terminal_init_failure_if_present(work)) {
    return;
  }

  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }

  if (!load_r_api(work)) {
    return;
  }

  eval_string_ctx ctx = {.work = work, .ok = false, .error = ""};

  // A false return means the body was left through a non-local R exit.
  if (!r.toplevel_exec(eval_string_body, &ctx)) {
    set_error(work, "R non-local error during eval_string");
    return;
  }

  if (ctx.ok) {
    return;
  }

  const char *reason = ctx.error[0] != '\0' ? ctx.error : "R evaluation failed";
  set_error(work, reason);
}

/*
 * Context handed to eval_body through r.toplevel_exec.
 * do_eval creates it with ok = false and an empty error string.
 */
typedef struct eval_ctx {
  r_work *work;      // request being evaluated; receives results and captured output
  bool ok;           // set to true by eval_body only when it ran to completion
  char error[1024];  // failure text; may stay empty when a structured error was recorded on work
} eval_ctx;

/*
 * Allocates an rx_object_resource that keeps `value` alive.
 *
 * The resource gets a freshly reserved object id, the given kind, a cleared
 * release flag and an initialised mutex. `value` is registered with
 * r.preserve_object before it is stored in the resource under that mutex.
 *
 * Returns the new resource, or NULL when enif_alloc_resource fails (in which
 * case `value` is not preserved).
 */
static rx_object_resource *make_preserved_resource(SEXP value, resource_kind kind) {
  rx_object_resource *handle = enif_alloc_resource(object_resource_type, sizeof *handle);
  if (handle == NULL) {
    return NULL;
  }

  // Leave the slot empty until the value has been preserved.
  handle->sexp = NULL;
  handle->id = reserve_object_id();
  handle->kind = kind;
  handle->release_enqueued = false;
  pthread_mutex_init(&handle->mutex, NULL);

  r.preserve_object(value);

  pthread_mutex_lock(&handle->mutex);
  handle->sexp = value;
  pthread_mutex_unlock(&handle->mutex);

  return handle;
}

/*
 * Appends every element of an R character vector to `buffer`, each followed
 * by a newline.
 *
 * R NULL is accepted and appends nothing. Returns 1 on success; returns 0
 * when `value` is not a character vector, when an element reports a negative
 * length, or when buffer_append fails.
 */
static int append_character_vector(byte_buffer *buffer, SEXP value) {
  if (value == *r.nil_value) {
    return 1;
  }
  if (r.type_of(value) != STRSXP) {
    return 0;
  }

  const R_xlen_t total = r.xlength_fn(value);
  R_xlen_t at = 0;

  while (at < total) {
    SEXP element = r.string_elt(value, at);
    const char *bytes = r.r_char(element);
    const int byte_count = r.length_fn(element);

    const int appended = byte_count >= 0 &&
                         buffer_append(buffer, bytes, (size_t)byte_count) &&
                         buffer_append(buffer, "\n", 1);
    if (!appended) {
      return 0;
    }
    at++;
  }

  return 1;
}

/*
 * Appends the elements of an R character vector to `buffer`, separated by
 * `separator`.
 *
 * A separator is written before an element whenever the buffer already holds
 * data or the element is not the first one, so content appended by an
 * earlier call is also separated from this vector. An empty separator is
 * never written. R NULL is accepted and appends nothing.
 *
 * Returns 1 on success; returns 0 when `value` is not a character vector,
 * when an element reports a negative length, or when buffer_append fails.
 */
static int append_character_vector_join(byte_buffer *buffer, SEXP value, const char *separator) {
  if (value == *r.nil_value) {
    return 1;
  }
  if (r.type_of(value) != STRSXP) {
    return 0;
  }

  const size_t separator_len = strlen(separator);
  const R_xlen_t total = r.xlength_fn(value);

  for (R_xlen_t at = 0; at < total; at++) {
    const bool needs_separator = separator_len > 0 && (buffer->len > 0 || at > 0);
    if (needs_separator && !buffer_append(buffer, separator, separator_len)) {
      return 0;
    }

    SEXP element = r.string_elt(value, at);
    const char *bytes = r.r_char(element);
    const int byte_count = r.length_fn(element);
    if (byte_count < 0) {
      return 0;
    }
    if (!buffer_append(buffer, bytes, (size_t)byte_count)) {
      return 0;
    }
  }

  return 1;
}

/*
 * Duplicates an R character vector into a heap array of C strings.
 *
 * *out and *out_count are cleared first. R NULL succeeds and leaves them
 * cleared. For a character vector, an array with one slot per element is
 * allocated with calloc and each element is duplicated with copy_r_string.
 * On success the caller receives the array in *out and its length in
 * *out_count.
 *
 * Returns 1 on success; returns 0 when `value` is not a character vector,
 * its length does not fit in an unsigned, or an allocation fails (the partial
 * array is released with free_string_array).
 */
static int copy_character_vector_to_array(SEXP value, char ***out, unsigned *out_count) {
  *out = NULL;
  *out_count = 0;

  if (value == *r.nil_value) {
    return 1;
  }
  if (r.type_of(value) != STRSXP) {
    return 0;
  }

  const R_xlen_t total = r.xlength_fn(value);
  if (total < 0 || (uint64_t)total > UINT_MAX) {
    return 0;
  }
  const unsigned slots = (unsigned)total;

  char **copies = calloc(slots, sizeof(char *));
  // A NULL return is only a failure when at least one slot was requested.
  if (copies == NULL && total > 0) {
    return 0;
  }

  for (unsigned at = 0; at < slots; at++) {
    size_t ignored_len = 0;
    if (!copy_r_string(r.string_elt(value, (R_xlen_t)at), &copies[at], &ignored_len)) {
      free_string_array(copies, slots);
      return 0;
    }
  }

  *out = copies;
  *out_count = slots;
  return 1;
}

// Forward declaration: fill_print_error_from_helper_result uses this helper before its definition.
static int copy_optional_character(SEXP value, char *out, size_t out_size);

/*
 * Copies a failed print helper result into the structured error fields and
 * output buffers of `work`.
 *
 * `helper_result` must be a list with at least 8 elements. The elements read
 * here are, by zero-based index: 2 message, 3 classes, 4 call, 5 stdout,
 * 6 messages, 7 warnings. An empty message is replaced by "R print failed".
 * Stdout lines are joined with "\n"; messages and warnings are concatenated
 * without a separator.
 *
 * Returns 1 with work->structured_error set on success. Returns 0 on failure.
 * If the failure happens after copying has begun, work->structured_error is
 * cleared again so that a half-filled structured error is never reported;
 * this matches fill_plot_error_from_helper_result.
 */
static int fill_print_error_from_helper_result(r_work *work, SEXP helper_result) {
  if (r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) return 0;

  work->structured_error = true;
  if (!copy_optional_character(r.vector_elt(helper_result, 2), work->error_message, sizeof(work->error_message))) {
    goto incomplete;
  }

  if (work->error_message[0] == '\0') {
    snprintf(work->error_message, sizeof(work->error_message), "R print failed");
  }

  if (!copy_character_vector_to_array(
          r.vector_elt(helper_result, 3),
          &work->error_classes,
          &work->error_class_count)) {
    goto incomplete;
  }

  if (!copy_optional_character(r.vector_elt(helper_result, 4), work->error_call, sizeof(work->error_call))) {
    goto incomplete;
  }

  if (!append_character_vector_join(&work->stdout_buffer, r.vector_elt(helper_result, 5), "\n")) goto incomplete;
  if (!append_character_vector_join(&work->messages_buffer, r.vector_elt(helper_result, 6), "")) goto incomplete;
  if (!append_character_vector_join(&work->warnings_buffer, r.vector_elt(helper_result, 7), "")) goto incomplete;
  return 1;

incomplete:
  // Withdraw the structured-error claim so callers use their plain-text fallback.
  work->structured_error = false;
  return 0;
}

/*
 * Copies the first element of an optional R character vector into a
 * fixed-size buffer, truncating as needed.
 *
 * `out` is always NUL-terminated when out_size > 0. R NULL succeeds and
 * leaves `out` empty. At most out_size - 1 bytes of the string are kept.
 *
 * Returns 1 on success; returns 0 when out_size is 0, when `value` is neither
 * NULL nor a non-empty character vector, or when copy_r_string fails.
 */
static int copy_optional_character(SEXP value, char *out, size_t out_size) {
  if (out_size == 0) {
    return 0;
  }
  out[0] = '\0';

  if (value == *r.nil_value) {
    return 1;
  }
  if (r.type_of(value) != STRSXP || r.xlength_fn(value) < 1) {
    return 0;
  }

  char *heap_copy = NULL;
  size_t full_len = 0;
  if (!copy_r_string(r.string_elt(value, 0), &heap_copy, &full_len)) {
    return 0;
  }

  // Keep room for the terminator.
  const size_t capacity = out_size - 1;
  size_t kept = full_len;
  if (kept > capacity) {
    kept = capacity;
  }

  if (kept > 0) {
    memcpy(out, heap_copy, kept);
  }
  out[kept] = '\0';

  free(heap_copy);
  return 1;
}

/*
 * Records a structured "parseError" on `work`.
 *
 * Sets the error message to "parse error: R parse failed", clears the error
 * call, and allocates a one-element class array holding "parseError".
 *
 * Returns 1 with work->structured_error set on success. Returns 0 when an
 * allocation fails; in that case work->structured_error is cleared, so the
 * caller's plain-text fallback is used instead of a structured error that has
 * no classes.
 */
static int set_parse_error_map(r_work *work) {
  work->structured_error = true;
  snprintf(work->error_message, sizeof(work->error_message), "parse error: R parse failed");
  work->error_call[0] = '\0';

  work->error_classes = calloc(1, sizeof(char *));
  if (work->error_classes == NULL) {
    work->structured_error = false;
    return 0;
  }

  work->error_classes[0] = strdup("parseError");
  if (work->error_classes[0] == NULL) {
    free(work->error_classes);
    work->error_classes = NULL;
    work->structured_error = false;
    return 0;
  }

  work->error_class_count = 1;
  return 1;
}

/*
 * Copies a failed eval helper result into the structured error fields and
 * output buffers of `work`.
 *
 * `helper_result` must be a list with at least 8 elements. The elements read
 * here are, by zero-based index: 2 message, 3 classes, 4 call, 5 stdout,
 * 6 messages, 7 warnings. An empty message is replaced by
 * "R evaluation failed". Each captured output element is appended followed by
 * a newline.
 *
 * Returns 1 with work->structured_error set on success. Returns 0 on failure.
 * If the failure happens after copying has begun, work->structured_error is
 * cleared again: do_eval only applies its plain-text error when that flag is
 * false, so leaving it set would report a half-filled structured error. This
 * matches fill_plot_error_from_helper_result.
 */
static int fill_error_from_helper_result(r_work *work, SEXP helper_result) {
  if (r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) return 0;

  work->structured_error = true;
  if (!copy_optional_character(r.vector_elt(helper_result, 2), work->error_message, sizeof(work->error_message))) {
    goto incomplete;
  }

  if (work->error_message[0] == '\0') {
    snprintf(work->error_message, sizeof(work->error_message), "R evaluation failed");
  }

  if (!copy_character_vector_to_array(
          r.vector_elt(helper_result, 3),
          &work->error_classes,
          &work->error_class_count)) {
    goto incomplete;
  }

  if (!copy_optional_character(r.vector_elt(helper_result, 4), work->error_call, sizeof(work->error_call))) {
    goto incomplete;
  }

  if (!append_character_vector(&work->stdout_buffer, r.vector_elt(helper_result, 5))) goto incomplete;
  if (!append_character_vector(&work->messages_buffer, r.vector_elt(helper_result, 6))) goto incomplete;
  if (!append_character_vector(&work->warnings_buffer, r.vector_elt(helper_result, 7))) goto incomplete;
  return 1;

incomplete:
  // Withdraw the structured-error claim so the caller's fallback message is used.
  work->structured_error = false;
  return 0;
}

/*
 * Copies a failed plot helper result into the structured error fields and
 * output buffers of `work`.
 *
 * `helper_result` must be a list with at least 8 elements. The elements read
 * here are, by zero-based index: 2 message, 3 classes, 4 call, 5 stdout,
 * 6 messages, 7 warnings. An empty message is replaced by "R plot failed".
 * Stdout is joined with "\n"; messages and warnings are concatenated without
 * a separator.
 *
 * Returns 1 with work->structured_error set on success. Returns 0 on failure;
 * once copying has begun, a failure also clears work->structured_error.
 */
static int fill_plot_error_from_helper_result(r_work *work, SEXP helper_result) {
  if (r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) {
    return 0;
  }

  work->structured_error = true;

  bool filled =
      copy_optional_character(r.vector_elt(helper_result, 2), work->error_message, sizeof(work->error_message));

  if (filled) {
    if (work->error_message[0] == '\0') {
      snprintf(work->error_message, sizeof(work->error_message), "R plot failed");
    }

    // The chain short-circuits, so the first failing step stops all later copies.
    filled =
        copy_character_vector_to_array(
            r.vector_elt(helper_result, 3),
            &work->error_classes,
            &work->error_class_count) &&
        copy_optional_character(r.vector_elt(helper_result, 4), work->error_call, sizeof(work->error_call)) &&
        append_character_vector_join(&work->stdout_buffer, r.vector_elt(helper_result, 5), "\n") &&
        append_character_vector_join(&work->messages_buffer, r.vector_elt(helper_result, 6), "") &&
        append_character_vector_join(&work->warnings_buffer, r.vector_elt(helper_result, 7), "");
  }

  if (!filled) {
    work->structured_error = false;
    return 0;
  }
  return 1;
}

/*
 * R source of the closure that eval_body calls to run parsed expressions.
 *
 * The closure takes (exprs, env). It evaluates each expression in env while
 * sinking standard output into a local text connection and recording message
 * and warning conditions, which are muffled after being recorded. It always
 * returns an 8-element list:
 *   [[1]] TRUE on success, FALSE when an error was caught
 *   [[2]] value of the last expression (NULL on error)
 *   [[3]] error message, [[4]] error classes, [[5]] deparsed error call
 *         (all NULL on success; the call is NULL when the condition has none)
 *   [[6]] captured stdout lines, [[7]] messages, [[8]] warnings
 * The C side reads these elements with zero-based indices 0..7.
 */
static const char *eval_helper_source =
    "function(exprs, env) {\n"
    "  messages <- character()\n"
    "  warnings <- character()\n"
    "  stdout <- character()\n"
    "  value <- NULL\n"
    "  con <- textConnection(\"stdout\", \"w\", local = TRUE)\n"
    "  sinking <- FALSE\n"
    "  closed <- FALSE\n"
    "  finish <- function() {\n"
    "    if (sinking) {\n"
    "      sink(type = \"output\")\n"
    "      sinking <<- FALSE\n"
    "    }\n"
    "    if (!closed) {\n"
    "      close(con)\n"
    "      closed <<- TRUE\n"
    "    }\n"
    "  }\n"
    "  sink(con, type = \"output\")\n"
    "  sinking <- TRUE\n"
    "  tryCatch(\n"
    "    {\n"
    "      result <- withCallingHandlers({\n"
    "        for (expr in exprs) value <- eval(expr, envir = env)\n"
    "        list(TRUE, value, NULL, NULL, NULL, stdout, messages, warnings)\n"
    "      },\n"
    "      message = function(m) {\n"
    "        messages <<- c(messages, conditionMessage(m))\n"
    "        invokeRestart(\"muffleMessage\")\n"
    "      },\n"
    "      warning = function(w) {\n"
    "        warnings <<- c(warnings, conditionMessage(w))\n"
    "        invokeRestart(\"muffleWarning\")\n"
    "      })\n"
    "      finish()\n"
    "      result[[6]] <- stdout\n"
    "      result[[7]] <- messages\n"
    "      result[[8]] <- warnings\n"
    "      result\n"
    "    },\n"
    "    error = function(e) {\n"
    "      finish()\n"
    "      call <- conditionCall(e)\n"
    "      call_text <- if (is.null(call)) NULL else paste(deparse(call), collapse = \"\\n\")\n"
    "      list(FALSE, NULL, conditionMessage(e), class(e), call_text, stdout, messages, warnings)\n"
    "    }\n"
    "  )\n"
    "}";

/*
 * Lazily builds the R closure described by eval_helper_source.
 *
 * Does nothing when eval_helper is already set. Otherwise the source is
 * parsed, its first expression is evaluated in the global environment, and
 * the resulting object is preserved and cached in eval_helper.
 *
 * Returns 1 when the helper is available, or 0 after recording the failure
 * on `work` with set_error.
 */
static int ensure_eval_helper(r_work *work) {
  if (eval_helper != NULL) {
    return 1;
  }

  ParseStatus status = PARSE_NULL;
  SEXP text = r.protect(r.mk_string(eval_helper_source));
  SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));
  const int held = 2;

  const char *failure = NULL;

  if (status != PARSE_OK || r.xlength_fn(parsed) < 1) {
    failure = "failed to parse native eval helper";
  } else {
    int failed = 0;
    SEXP closure = r.try_eval(r.vector_elt(parsed, 0), *r.global_env, &failed);
    if (failed) {
      failure = "failed to initialize native eval helper";
    } else {
      // Preserve before caching so the closure outlives the unprotect below.
      r.preserve_object(closure);
      eval_helper = closure;
    }
  }

  r.unprotect(held);

  if (failure != NULL) {
    set_error(work, failure);
    return 0;
  }
  return 1;
}

/*
 * R source of the closure used to capture plots as PNG pages.
 *
 * The closure takes (exprs, env, options) and:
 *   - attaches grDevices, graphics, stats, utils and datasets when they are
 *     not already on the search path;
 *   - opens a png device that writes page-%06d.png files into a private
 *     temporary directory, which is removed on exit;
 *   - replaces options(device=) with a sentinel that raises an error, and
 *     fails if the evaluated code changed that option;
 *   - evaluates each expression in env, printing visible ggplot values;
 *   - closes every device opened during the call and restores the options;
 *   - reads the produced pages, enforcing max_pages, max_bytes and the PNG
 *     signature.
 * It returns an 8-element list with the same layout as the eval helper. On
 * success [[2]] is list(width, height, pages). Captured stdout is collapsed
 * with "\n", messages and warnings with "". Producing no page is reported as
 * the error "R code produced no plot".
 */
static const char *plot_helper_source =
    "function(exprs, env, options) {\n"
    "  load_packages <- function() {\n"
    "    for (pkg in c(\"grDevices\", \"graphics\", \"stats\", \"utils\", \"datasets\")) {\n"
    "      if (!paste0(\"package:\", pkg) %in% search()) {\n"
    "        suppressPackageStartupMessages(library(pkg, character.only = TRUE, pos = length(search())))\n"
    "      }\n"
    "    }\n"
    "  }\n"
    "  is_ggplot <- function(value) {\n"
    "    inherits(value, \"ggplot\") || inherits(value, \"ggplot2::ggplot\")\n"
    "  }\n"
    "  read_pages <- function(tmpdir, max_pages, max_bytes) {\n"
    "    files <- sort(list.files(tmpdir, pattern = \"^page-[0-9]+[.]png$\", full.names = TRUE))\n"
    "    if (length(files) > max_pages) stop(\"plot produced too many pages\", call. = FALSE)\n"
    "    if (length(files) == 0L) return(list())\n"
    "    info <- file.info(files)\n"
    "    if (any(is.na(info$size) | info$size <= 0L)) {\n"
    "      stop(\"plot device produced an empty PNG file\", call. = FALSE)\n"
    "    }\n"
    "    if (sum(info$size) > max_bytes) stop(\"plot PNG output exceeds byte limit\", call. = FALSE)\n"
    "    sig <- as.raw(c(0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a))\n"
    "    lapply(seq_along(files), function(i) {\n"
    "      bytes <- readBin(files[[i]], what = \"raw\", n = info$size[[i]])\n"
    "      if (length(bytes) < 8L || !identical(bytes[1:8], sig)) {\n"
    "        stop(\"plot device did not produce a valid PNG file\", call. = FALSE)\n"
    "      }\n"
    "      bytes\n"
    "    })\n"
    "  }\n"
    "  messages <- character()\n"
    "  warnings <- character()\n"
    "  plot_error <- NULL\n"
    "  close_error <- NULL\n"
    "  read_error <- NULL\n"
    "  opts <- NULL\n"
    "  pages <- list()\n"
    "  plot_dev <- NULL\n"
    "  old_devices <- integer()\n"
    "  old_dev <- 1L\n"
    "  old_options <- NULL\n"
    "  devices_snapshot <- FALSE\n"
    "  device_sentinel <- function(...) {\n"
    "    stop(\"Rx plot capture device is not available\", call. = FALSE)\n"
    "  }\n"
    "  close_plot_device <- function() {\n"
    "    if (!devices_snapshot) return()\n"
    "    open_devices <- grDevices::dev.list()\n"
    "    if (!is.null(open_devices)) {\n"
    "      opened_devices <- setdiff(unname(as.integer(open_devices)), old_devices)\n"
    "      for (device in rev(sort(opened_devices))) {\n"
    "        tryCatch(\n"
    "          invisible(grDevices::dev.off(which = device)),\n"
    "          error = function(e) { if (is.null(close_error)) close_error <<- e }\n"
    "        )\n"
    "      }\n"
    "    }\n"
    "    open_devices <- grDevices::dev.list()\n"
    "    if (!is.null(plot_dev) && !is.null(open_devices) &&\n"
    "        plot_dev %in% unname(as.integer(open_devices))) {\n"
    "      tryCatch(\n"
    "        invisible(grDevices::dev.off(which = plot_dev)),\n"
    "        error = function(e) { if (is.null(close_error)) close_error <<- e }\n"
    "      )\n"
    "    }\n"
    "    open_devices <- grDevices::dev.list()\n"
    "    if (!is.null(open_devices) && old_dev != 1L &&\n"
    "        old_dev %in% unname(as.integer(open_devices))) {\n"
    "      try(invisible(grDevices::dev.set(which = old_dev)), silent = TRUE)\n"
    "    }\n"
    "  }\n"
    "  tmpdir <- tempfile(\"rx-plot-\")\n"
    "  dir.create(tmpdir, mode = \"0700\")\n"
    "  on.exit(unlink(tmpdir, recursive = TRUE, force = TRUE), add = TRUE)\n"
    "  stdout <- utils::capture.output({\n"
    "    invisible(withCallingHandlers(\n"
    "      tryCatch(\n"
    "        {\n"
    "          load_packages()\n"
    "          opts <- list(\n"
    "            width = as.integer(options$width),\n"
    "            height = as.integer(options$height),\n"
    "            res = as.integer(options$res),\n"
    "            pointsize = as.integer(options$pointsize),\n"
    "            max_pages = as.integer(options$max_pages),\n"
    "            max_bytes = as.numeric(options$max_bytes)\n"
    "          )\n"
    "          old_devices <- grDevices::dev.list()\n"
    "          old_devices <- if (is.null(old_devices)) integer() else unname(as.integer(old_devices))\n"
    "          old_dev <- unname(as.integer(grDevices::dev.cur()))\n"
    "          devices_snapshot <- TRUE\n"
    "          old_options <- base::options(device = device_sentinel)\n"
    "          file_pattern <- file.path(tmpdir, \"page-%06d.png\")\n"
    "          png_args <- list(\n"
    "            filename = file_pattern,\n"
    "            width = opts$width,\n"
    "            height = opts$height,\n"
    "            units = \"px\",\n"
    "            pointsize = opts$pointsize,\n"
    "            bg = \"white\",\n"
    "            res = opts$res\n"
    "          )\n"
    "          if (isTRUE(capabilities(\"cairo\"))) png_args$type <- \"cairo\"\n"
    "          do.call(grDevices::png, png_args)\n"
    "          plot_dev <- unname(as.integer(grDevices::dev.cur()))\n"
    "          for (expr in exprs) {\n"
    "            evaluated <- withVisible(eval(expr, envir = env))\n"
    "            if (isTRUE(evaluated$visible) && is_ggplot(evaluated$value)) {\n"
    "              print(evaluated$value)\n"
    "            }\n"
    "          }\n"
    "          if (!identical(base::getOption(\"device\"), device_sentinel)) {\n"
    "            stop(\"Rx plot capture does not allow changing options(device=...)\", call. = FALSE)\n"
    "          }\n"
    "          NULL\n"
    "        },\n"
    "        error = function(e) { plot_error <<- e; NULL },\n"
    "        finally = {\n"
    "          close_plot_device()\n"
    "          if (!is.null(old_options)) base::options(old_options)\n"
    "        }\n"
    "      ),\n"
    "      message = function(m) {\n"
    "        messages <<- c(messages, conditionMessage(m))\n"
    "        invokeRestart(\"muffleMessage\")\n"
    "      },\n"
    "      warning = function(w) {\n"
    "        warnings <<- c(warnings, conditionMessage(w))\n"
    "        invokeRestart(\"muffleWarning\")\n"
    "      }\n"
    "    ))\n"
    "  })\n"
    "  output_stdout <- paste(stdout, collapse = \"\\n\")\n"
    "  output_messages <- paste(messages, collapse = \"\")\n"
    "  output_warnings <- paste(warnings, collapse = \"\")\n"
    "  error_result <- function(e) {\n"
    "    call <- conditionCall(e)\n"
    "    call_text <- if (is.null(call)) NULL else paste(deparse(call), collapse = \"\\n\")\n"
    "    list(FALSE, NULL, conditionMessage(e), class(e), call_text,\n"
    "         output_stdout, output_messages, output_warnings)\n"
    "  }\n"
    "  if (!is.null(plot_error)) return(error_result(plot_error))\n"
    "  if (!is.null(close_error)) return(error_result(close_error))\n"
    "  pages <- tryCatch(\n"
    "    read_pages(tmpdir, opts$max_pages, opts$max_bytes),\n"
    "    error = function(e) { read_error <<- e; NULL }\n"
    "  )\n"
    "  if (!is.null(read_error)) return(error_result(read_error))\n"
    "  if (length(pages) == 0L) return(error_result(simpleError(\"R code produced no plot\")))\n"
    "  payload <- list(width = opts$width, height = opts$height, pages = pages)\n"
    "  list(TRUE, payload, NULL, NULL, NULL, output_stdout, output_messages, output_warnings)\n"
    "}";

/*
 * Lazily builds the R closure described by plot_helper_source.
 *
 * Does nothing when plot_helper is already set. Otherwise the source is
 * parsed, its first expression is evaluated in the global environment, and
 * the resulting object is preserved and cached in plot_helper.
 *
 * Returns 1 when the helper is available, or 0 after recording the failure
 * on `work` with set_error.
 */
static int ensure_plot_helper(r_work *work) {
  if (plot_helper != NULL) {
    return 1;
  }

  ParseStatus status = PARSE_NULL;
  SEXP text = r.protect(r.mk_string(plot_helper_source));
  SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));
  const int held = 2;

  const char *failure = NULL;

  if (status != PARSE_OK || r.xlength_fn(parsed) < 1) {
    failure = "failed to parse native plot helper";
  } else {
    int failed = 0;
    SEXP closure = r.try_eval(r.vector_elt(parsed, 0), *r.global_env, &failed);
    if (failed) {
      failure = "failed to initialize native plot helper";
    } else {
      // Preserve before caching so the closure outlives the unprotect below.
      r.preserve_object(closure);
      plot_helper = closure;
    }
  }

  r.unprotect(held);

  if (failure != NULL) {
    set_error(work, failure);
    return 0;
  }
  return 1;
}

/*
 * r.toplevel_exec callback that evaluates work->source in a fresh environment
 * and exports the outcome as preserved resources.
 *
 * `data` is an eval_ctx. Steps, in order:
 *   1. Create a new environment and bind every caller-supplied global into it.
 *   2. Parse work->source.
 *   3. Call eval_helper(exprs, env), which returns an 8-element list.
 *   4. Append captured stdout, messages and warnings to the buffers on `work`.
 *   5. Wrap the evaluation value in work->result_resource (only when at
 *      least one expression was parsed).
 *   6. Wrap every binding found in the environment afterwards in
 *      work->result_global_names / work->result_global_resources.
 *
 * ctx->ok becomes true only when all steps succeed. On failure either
 * ctx->error receives a message, or a structured error is recorded on `work`
 * (parse errors and errors reported by the helper) and ctx->error may stay
 * empty. Every exit path unprotects exactly what was protected so far.
 */
static void eval_body(void *data) {
  eval_ctx *ctx = (eval_ctx *)data;
  r_work *work = ctx->work;
  int protect_count = 0;
  ParseStatus parse_status = PARSE_NULL;

  // Evaluation environment, created with the global environment as first argument.
  // NOTE(review): presumably R_NewEnv(enclos, hash, size) -- confirm the binding.
  SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
  protect_count++;

  for (unsigned i = 0; i < work->global_count; i++) {
    // Snapshot the resource state under its mutex.
    pthread_mutex_lock(&work->global_resources[i]->mutex);
    SEXP value = work->global_resources[i]->sexp;
    bool release_enqueued = work->global_resources[i]->release_enqueued;
    pthread_mutex_unlock(&work->global_resources[i]->mutex);

    // Refuse resources that are empty or already queued for release.
    if (value == NULL || release_enqueued) {
      snprintf(ctx->error, sizeof(ctx->error), "native R object resource has already been released");
      r.unprotect(protect_count);
      return;
    }

    r.define_var(r.install(work->global_names[i]), value, env);
  }

  SEXP source = r.protect(r.mk_string(work->source));
  protect_count++;

  SEXP exprs = r.protect(r.parse_vector(source, -1, &parse_status, *r.nil_value));
  protect_count++;

  // PARSE_NULL is tolerated here; any other non-OK status is a parse error.
  if (parse_status != PARSE_OK && parse_status != PARSE_NULL) {
    // Prefer a structured parse error; fall back to plain text if it cannot be built.
    if (!set_parse_error_map(work)) {
      snprintf(ctx->error, sizeof(ctx->error), "R parse failed");
    }
    r.unprotect(protect_count);
    return;
  }

  R_xlen_t expr_count = r.xlength_fn(exprs);
  // Build the call eval_helper(exprs, env).
  SEXP call = r.protect(r.lang3(eval_helper, exprs, env));
  protect_count++;

  int error_occurred = 0;
  SEXP helper_result = r.protect(r.try_eval(call, *r.global_env, &error_occurred));
  protect_count++;

  // The helper must return a list with at least 8 elements.
  if (error_occurred || r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) {
    snprintf(ctx->error, sizeof(ctx->error), "R evaluation failed");
    r.unprotect(protect_count);
    return;
  }

  // Element 0 is the success flag; anything other than a logical equal to 1 means failure.
  SEXP ok_value = r.vector_elt(helper_result, 0);
  bool helper_ok =
      r.type_of(ok_value) == LGLSXP && r.xlength_fn(ok_value) >= 1 && r.logical_elt(ok_value, 0) == 1;

  if (helper_ok) {
    // Elements 5..7 hold captured stdout, messages and warnings.
    if (!append_character_vector(&work->stdout_buffer, r.vector_elt(helper_result, 5)) ||
        !append_character_vector(&work->messages_buffer, r.vector_elt(helper_result, 6)) ||
        !append_character_vector(&work->warnings_buffer, r.vector_elt(helper_result, 7))) {
      snprintf(ctx->error, sizeof(ctx->error), "failed to capture native eval output");
      r.unprotect(protect_count);
      return;
    }
  } else {
    // The helper caught an R error: record it as a structured error on work.
    if (!fill_error_from_helper_result(work, helper_result)) {
      snprintf(ctx->error, sizeof(ctx->error), "R evaluation failed");
    }
    r.unprotect(protect_count);
    return;
  }

  // With no parsed expressions there is no value to export.
  if (expr_count > 0) {
    SEXP result = r.vector_elt(helper_result, 1);
    work->result_resource = make_preserved_resource(result, RESOURCE_KIND_GENERIC);
    if (work->result_resource == NULL) {
      snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native eval result resource");
      r.unprotect(protect_count);
      return;
    }
  }

  // List the names bound in the evaluation environment after evaluation.
  SEXP names = r.protect(r.ls_internal(env, TRUE));
  protect_count++;

  R_xlen_t name_count = r.xlength_fn(names);
  if (name_count < 0 || (uint64_t)name_count > UINT_MAX) {
    snprintf(ctx->error, sizeof(ctx->error), "native eval produced too many globals");
    r.unprotect(protect_count);
    return;
  }

  work->result_global_count = (unsigned)name_count;
  if (work->result_global_count > 0) {
    work->result_global_names = calloc(work->result_global_count, sizeof(char *));
    work->result_global_resources = calloc(work->result_global_count, sizeof(rx_object_resource *));

    if (work->result_global_names == NULL || work->result_global_resources == NULL) {
      snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native eval globals");
      r.unprotect(protect_count);
      return;
    }
  }

  for (unsigned i = 0; i < work->result_global_count; i++) {
    SEXP name_string = r.string_elt(names, (R_xlen_t)i);
    size_t name_len = 0;
    if (!copy_r_string(name_string, &work->result_global_names[i], &name_len)) {
      snprintf(ctx->error, sizeof(ctx->error), "failed to copy native eval global name");
      r.unprotect(protect_count);
      return;
    }

    // Look the binding up by the copied name and wrap its value in a preserved resource.
    SEXP value = r.find_var_in_frame(env, r.install(work->result_global_names[i]));
    work->result_global_resources[i] = make_preserved_resource(value, RESOURCE_KIND_GENERIC);
    if (work->result_global_resources[i] == NULL) {
      snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native eval global resource");
      r.unprotect(protect_count);
      return;
    }
  }

  r.unprotect(protect_count);
  ctx->ok = true;
}

/*
 * Services an eval request.
 *
 * Clears the result fields of `work`, returns early when the runtime or the
 * eval helper is unavailable, then runs eval_body under r.toplevel_exec while
 * the console write hooks (when present) point at the capture callbacks.  The
 * previously installed hooks are put back on every path that reaches the
 * guarded call.  A failure is reported through set_error unless a structured
 * error has been flagged on `work`, in which case only work->ok is cleared.
 *
 * NOTE(review): work->structured_error is read below but not reset in this
 * function -- presumably eval_body or the caller initialises it; confirm.
 */
static void do_eval(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->result_resource = NULL;
  work->result_global_names = NULL;
  work->result_global_resources = NULL;
  work->result_global_count = 0;

  if (set_terminal_init_failure_if_present(work)) return;

  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }

  if (!load_r_api(work) || !ensure_eval_helper(work)) return;

  eval_ctx ctx = {.work = work, .ok = false, .error = ""};
  ptr_R_WriteConsole_fn saved_console = NULL;
  ptr_R_WriteConsoleEx_fn saved_console_ex = NULL;

  /* Redirect whichever console hooks the runtime exposes. */
  if (r_write_console != NULL) {
    saved_console = *r_write_console;
    current_capture_work = work;
    *r_write_console = capture_write_console;
  }

  if (r_write_console_ex != NULL) {
    saved_console_ex = *r_write_console_ex;
    current_capture_work = work;
    *r_write_console_ex = capture_write_console_ex;
  }

  const bool completed = r.toplevel_exec(eval_body, &ctx) ? true : false;
  if (!completed) {
    set_error(work, "R non-local error during eval");
  }

  /* Undo the redirection regardless of how the guarded call ended. */
  if (r_write_console != NULL) {
    *r_write_console = saved_console;
  }
  if (r_write_console_ex != NULL) {
    *r_write_console_ex = saved_console_ex;
  }
  current_capture_work = NULL;

  if (!completed || ctx.ok) return;

  if (work->structured_error) {
    work->ok = false;
    return;
  }

  set_error(work, ctx.error[0] != '\0' ? ctx.error : "R evaluation failed");
}

/* State handed from do_print to print_body through r.toplevel_exec. */
typedef struct print_ctx {
  /* Request being serviced; print_body appends captured output to its buffers. */
  r_work *work;
  /* R value to print; do_print snapshots it from work->resource. */
  SEXP sexp;
  /* Set to true by print_body only after printing and capture both succeeded. */
  bool ok;
  /* Plain failure message; when empty do_print falls back to "R print failed". */
  char error[1024];
} print_ctx;

/*
 * R code parsed by print_body and handed to the eval helper.  It expects the
 * evaluation environment to define .rx_value, .rx_width and .rx_max_print
 * (the latter two may be NULL).  When a limit is non-NULL the matching R
 * option ("width" / "max.print") is saved and overridden, .rx_value is
 * printed, and the saved options are restored in the `finally` clause.
 */
static const char *print_eval_source =
    ".rx_old_options <- list()\n"
    "tryCatch({\n"
    "  if (!is.null(.rx_width)) {\n"
    "    .rx_old_options$width <- getOption(\"width\")\n"
    "    options(width = as.integer(.rx_width))\n"
    "  }\n"
    "  if (!is.null(.rx_max_print)) {\n"
    "    .rx_old_options$max.print <- getOption(\"max.print\")\n"
    "    options(max.print = as.integer(.rx_max_print))\n"
    "  }\n"
    "  print(.rx_value)\n"
    "}, finally = {\n"
    "  if (length(.rx_old_options) > 0L) options(.rx_old_options)\n"
    "})";

/*
 * Body executed under r.toplevel_exec for a print request.
 *
 * Builds a scratch environment holding .rx_value, .rx_width and
 * .rx_max_print, parses print_eval_source and evaluates it through the eval
 * helper.  On helper success the captured stdout, messages and warnings are
 * appended to the buffers on ctx->work and ctx->ok is set; on any failure
 * ctx->ok stays false and either ctx->error or the error fields filled by
 * fill_print_error_from_helper_result describe the problem.
 *
 * data: pointer to the print_ctx prepared by do_print.
 */
static void print_body(void *data) {
  print_ctx *ctx = (print_ctx *)data;
  r_work *work = ctx->work;
  int held = 0; /* number of entries this function pushed on the protect stack */
  ParseStatus status = PARSE_NULL;

  SEXP scratch_env = r.protect(r.new_env(*r.global_env, TRUE, 29));
  held++;
  r.define_var(r.install(".rx_value"), ctx->sexp, scratch_env);

  /* Optional limits are passed as NULL when the request did not set them. */
  SEXP width_arg = *r.nil_value;
  if (work->has_print_width) {
    width_arg = r.protect(r.scalar_integer(work->print_width));
    held++;
  }
  r.define_var(r.install(".rx_width"), width_arg, scratch_env);

  SEXP max_print_arg = *r.nil_value;
  if (work->has_print_max_print) {
    max_print_arg = r.protect(r.scalar_integer(work->print_max_print));
    held++;
  }
  r.define_var(r.install(".rx_max_print"), max_print_arg, scratch_env);

  SEXP code = r.protect(r.mk_string(print_eval_source));
  held++;

  SEXP parsed = r.protect(r.parse_vector(code, -1, &status, *r.nil_value));
  held++;

  if (status != PARSE_OK || r.xlength_fn(parsed) < 1) {
    snprintf(ctx->error, sizeof(ctx->error), "R print helper parse failed");
    goto fail;
  }

  SEXP call = r.protect(r.lang3(eval_helper, parsed, scratch_env));
  held++;

  int eval_failed = 0;
  SEXP outcome = r.protect(r.try_eval(call, *r.global_env, &eval_failed));
  held++;

  if (eval_failed || r.type_of(outcome) != VECSXP || r.xlength_fn(outcome) < 8) {
    snprintf(ctx->error, sizeof(ctx->error), "R print failed");
    goto fail;
  }

  /* Element 0 of the helper result is the success flag. */
  SEXP flag = r.vector_elt(outcome, 0);
  const bool helper_ok =
      r.type_of(flag) == LGLSXP && r.xlength_fn(flag) >= 1 && r.logical_elt(flag, 0) == 1;

  if (!helper_ok) {
    if (!fill_print_error_from_helper_result(work, outcome)) {
      snprintf(ctx->error, sizeof(ctx->error), "R print failed");
    }
    goto fail;
  }

  /* Elements 5, 6 and 7 carry stdout lines, messages and warnings. */
  if (!append_character_vector_join(&work->stdout_buffer, r.vector_elt(outcome, 5), "\n") ||
      !append_character_vector_join(&work->messages_buffer, r.vector_elt(outcome, 6), "") ||
      !append_character_vector_join(&work->warnings_buffer, r.vector_elt(outcome, 7), "")) {
    snprintf(ctx->error, sizeof(ctx->error), "failed to capture native print output");
    goto fail;
  }

  r.unprotect(held);
  ctx->ok = true;
  return;

fail:
  r.unprotect(held);
}

/*
 * Services a print request for the R object held by work->resource.
 *
 * Resets the error fields on `work`, checks that the runtime and eval helper
 * are available, snapshots the resource's SEXP under its mutex and rejects
 * resources that are empty or already queued for release.  print_body is then
 * run under r.toplevel_exec; failures are reported through set_error unless a
 * structured error was recorded, in which case only work->ok is cleared.
 */
static void do_print(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->structured_error = false;
  work->error_message[0] = '\0';
  work->error_call[0] = '\0';
  work->error_classes = NULL;
  work->error_class_count = 0;

  if (set_terminal_init_failure_if_present(work)) return;

  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }

  if (!load_r_api(work) || !ensure_eval_helper(work)) return;

  /* Read both fields in one critical section so they are mutually consistent. */
  pthread_mutex_lock(&work->resource->mutex);
  SEXP target = work->resource->sexp;
  bool pending_release = work->resource->release_enqueued;
  pthread_mutex_unlock(&work->resource->mutex);

  const bool usable = target != NULL && !pending_release;
  if (!usable) {
    set_error(work, "native R object resource has already been released");
    return;
  }

  print_ctx ctx = {.work = work, .sexp = target, .ok = false, .error = ""};

  if (!r.toplevel_exec(print_body, &ctx)) {
    set_error(work, "R non-local error during print");
    return;
  }

  if (ctx.ok) return;

  if (work->structured_error) {
    work->ok = false;
    return;
  }

  set_error(work, ctx.error[0] != '\0' ? ctx.error : "R print failed");
}

/* State handed from do_plot to plot_body through r.toplevel_exec. */
typedef struct plot_ctx {
  /* Request being serviced; supplies the source, globals and plot options. */
  r_work *work;
  /* Set to true by plot_body only after the plot pages were copied out. */
  bool ok;
  /* Plain failure message; when empty do_plot falls back to "R plot failed". */
  char error[1024];
} plot_ctx;

/*
 * Stores `name` (as a UTF-8 CHARSXP) at position `index` of the character
 * vector `names`.  The freshly created string is protected only for the
 * duration of the store.
 */
static void set_plot_option_name(SEXP names, R_xlen_t index, const char *name) {
  const int name_bytes = (int)strlen(name);
  SEXP encoded = r.mk_char_len_ce(name, name_bytes, CE_UTF8);

  r.protect(encoded);
  r.set_string_elt(names, index, encoded);
  r.unprotect(1);
}

/*
 * Stores `value` as a length-one integer vector at position `index` of the
 * list `options`.  The scalar is protected only for the duration of the store.
 */
static void set_plot_option_integer(SEXP options, R_xlen_t index, int value) {
  SEXP boxed = r.scalar_integer(value);

  r.protect(boxed);
  r.set_vector_elt(options, index, boxed);
  r.unprotect(1);
}

/*
 * Builds the named list of plot options passed to the plot helper.
 *
 * The list has six integer entries -- width, height, res, pointsize,
 * max_pages, max_bytes -- taken from the matching plot_* fields of `work`.
 *
 * work:          request supplying the option values.
 * protect_count: incremented by two; the returned list and its names vector
 *                are left on the protect stack for the caller to release.
 * Returns the protected options list.
 */
static SEXP make_plot_options(r_work *work, int *protect_count) {
  static const char *const option_keys[] = {
      "width", "height", "res", "pointsize", "max_pages", "max_bytes"};
  enum { OPTION_COUNT = sizeof(option_keys) / sizeof(option_keys[0]) };

  SEXP options = r.protect(r.alloc_vector(VECSXP, OPTION_COUNT));
  *protect_count += 1;

  SEXP names = r.protect(r.alloc_vector(STRSXP, OPTION_COUNT));
  *protect_count += 1;

  for (R_xlen_t slot = 0; slot < OPTION_COUNT; slot++) {
    set_plot_option_name(names, slot, option_keys[slot]);
  }

  /* Values are listed in the same order as option_keys. */
  const int option_values[OPTION_COUNT] = {
      work->plot_width,     work->plot_height,    work->plot_res,
      work->plot_pointsize, work->plot_max_pages, work->plot_max_bytes};

  for (R_xlen_t slot = 0; slot < OPTION_COUNT; slot++) {
    set_plot_option_integer(options, slot, option_values[slot]);
  }

  r.set_attrib(options, *r.names_symbol, names);
  return options;
}

/*
 * Extracts the first element of an integer or double R vector as a C int.
 *
 * value: R object expected to be INTSXP or REALSXP with at least one element.
 * out:   receives the converted value on success; untouched on failure.
 * Returns 1 on success, 0 when the value has the wrong type, is empty, is
 * NA/NaN/infinite, or does not fit in an int.  A finite double with a
 * fractional part is truncated toward zero.
 */
static int copy_plot_integer(SEXP value, int *out) {
  if (r.type_of(value) == INTSXP && r.xlength_fn(value) >= 1) {
    int integer = r.integer_elt(value, 0);
    /* NA_integer_ is a sentinel, not a usable dimension. */
    if (integer == *r.na_int) return 0;
    *out = integer;
    return 1;
  }

  if (r.type_of(value) == REALSXP && r.xlength_fn(value) >= 1) {
    double numeric = r.real_elt(value, 0);
    /*
     * NaN compares false against both bounds, so it must be rejected
     * explicitly: converting a NaN to int is undefined behaviour.
     */
    if (!isfinite(numeric)) return 0;
    if (numeric < (double)INT_MIN || numeric > (double)INT_MAX) return 0;
    *out = (int)numeric;
    return 1;
  }

  return 0;
}

/*
 * Reports whether a buffer begins with the 8-byte PNG file signature.
 *
 * bytes: buffer to inspect; may be NULL.
 * len:   number of readable bytes in `bytes`.
 * Returns 1 when the first eight bytes match the signature, 0 otherwise
 * (including a NULL or too-short buffer).
 */
static int is_png_bytes(const char *bytes, size_t len) {
  static const unsigned char png_magic[8] = {0x89, 'P', 'N', 'G', '\r', '\n', 0x1a, '\n'};

  if (bytes == NULL) return 0;
  if (len < sizeof(png_magic)) return 0;

  const unsigned char *head = (const unsigned char *)bytes;
  for (size_t i = 0; i < sizeof(png_magic); i++) {
    if (head[i] != png_magic[i]) return 0;
  }
  return 1;
}

/*
 * Appends the output captured by the plot helper to the buffers on `work`.
 *
 * helper_result must be a list of at least eight elements; elements 5, 6 and
 * 7 are joined into the stdout, messages and warnings buffers respectively.
 * Returns 1 on success, 0 when the result has the wrong shape or an append
 * fails (later buffers are not touched after the first failure).
 */
static int append_plot_output(r_work *work, SEXP helper_result) {
  const bool well_formed =
      r.type_of(helper_result) == VECSXP && r.xlength_fn(helper_result) >= 8;
  if (!well_formed) return 0;

  const bool appended =
      append_character_vector_join(&work->stdout_buffer, r.vector_elt(helper_result, 5), "\n") &&
      append_character_vector_join(&work->messages_buffer, r.vector_elt(helper_result, 6), "") &&
      append_character_vector_join(&work->warnings_buffer, r.vector_elt(helper_result, 7), "");

  return appended ? 1 : 0;
}

/*
 * Copies a successful plot helper result out of R memory into `work`.
 *
 * The payload (element 1 of helper_result) must be a list whose first three
 * elements are the plot width, the plot height and a list of raw vectors,
 * one per rendered page.  Each page is validated (non-empty, within the
 * byte budget, starts with the PNG signature) and duplicated with malloc.
 *
 * work:       request that receives plot_width, plot_height, plot_pages,
 *             plot_page_lens and plot_page_count.
 * error:      buffer that receives a message on failure.
 * error_size: capacity of `error`.
 * Returns 1 on success and 0 on failure.  Once the page arrays have been
 * (re)allocated, every failure path calls free_plot_pages(work).
 */
static int copy_plot_success(r_work *work, SEXP helper_result, char *error, size_t error_size) {
  if (r.type_of(helper_result) != VECSXP || r.xlength_fn(helper_result) < 8) {
    snprintf(error, error_size, "R plot helper returned an invalid result");
    return 0;
  }

  SEXP payload = r.vector_elt(helper_result, 1);
  if (r.type_of(payload) != VECSXP || r.xlength_fn(payload) < 3) {
    snprintf(error, error_size, "R plot helper returned an invalid plot payload");
    return 0;
  }

  int width = 0;
  int height = 0;
  if (!copy_plot_integer(r.vector_elt(payload, 0), &width) ||
      !copy_plot_integer(r.vector_elt(payload, 1), &height)) {
    snprintf(error, error_size, "R plot helper returned invalid plot dimensions");
    return 0;
  }

  SEXP pages = r.vector_elt(payload, 2);
  if (r.type_of(pages) != VECSXP) {
    snprintf(error, error_size, "R plot helper returned invalid plot pages");
    return 0;
  }

  R_xlen_t page_count_len = r.xlength_fn(pages);
  if (page_count_len <= 0) {
    snprintf(error, error_size, "R code produced no plot");
    return 0;
  }

  /* Hard cap of 1000 pages, independent of the per-request limit below. */
  if ((uint64_t)page_count_len > UINT_MAX || page_count_len > 1000) {
    snprintf(error, error_size, "plot produced too many pages");
    return 0;
  }

  /* The cast is safe: page_count_len is at most 1000 at this point. */
  if ((int)page_count_len > work->plot_max_pages) {
    snprintf(error, error_size, "plot produced too many pages");
    return 0;
  }

  /* Drop any pages left from earlier use before overwriting the fields. */
  free_plot_pages(work);
  work->plot_width = width;
  work->plot_height = height;
  work->plot_page_count = (unsigned)page_count_len;
  work->plot_pages = calloc(work->plot_page_count, sizeof(char *));
  work->plot_page_lens = calloc(work->plot_page_count, sizeof(size_t));
  if (work->plot_pages == NULL || work->plot_page_lens == NULL) {
    snprintf(error, error_size, "failed to allocate native plot pages");
    free_plot_pages(work);
    return 0;
  }

  /* Running total across pages, checked against work->plot_max_bytes. */
  size_t total_bytes = 0;
  for (unsigned i = 0; i < work->plot_page_count; i++) {
    SEXP page = r.vector_elt(pages, (R_xlen_t)i);
    if (r.type_of(page) != RAWSXP) {
      snprintf(error, error_size, "R plot helper returned a non-raw page");
      free_plot_pages(work);
      return 0;
    }

    R_xlen_t page_len = r.xlength_fn(page);
    if (page_len <= 0) {
      snprintf(error, error_size, "plot device produced an empty PNG file");
      free_plot_pages(work);
      return 0;
    }

    if ((uint64_t)page_len > SIZE_MAX) {
      snprintf(error, error_size, "plot PNG output exceeds byte limit");
      free_plot_pages(work);
      return 0;
    }

    size_t len = (size_t)page_len;
    /*
     * The first clause guards the addition against size_t wrap-around.
     * NOTE(review): plot_max_bytes is cast to size_t; a negative value would
     * presumably turn into a huge limit -- confirm it is validated upstream.
     */
    if (total_bytes > SIZE_MAX - len || total_bytes + len > (size_t)work->plot_max_bytes) {
      snprintf(error, error_size, "plot PNG output exceeds byte limit");
      free_plot_pages(work);
      return 0;
    }
    total_bytes += len;

    char *copy = malloc(len);
    if (copy == NULL) {
      snprintf(error, error_size, "failed to allocate native plot page");
      free_plot_pages(work);
      return 0;
    }

    memcpy(copy, r.raw(page), len);
    if (!is_png_bytes(copy, len)) {
      /* `copy` is not yet stored in work->plot_pages, so free it here. */
      free(copy);
      snprintf(error, error_size, "plot device did not produce a valid PNG file");
      free_plot_pages(work);
      return 0;
    }

    work->plot_pages[i] = copy;
    work->plot_page_lens[i] = len;
  }

  return 1;
}

/*
 * Body executed under r.toplevel_exec for a plot request.
 *
 * Creates a scratch environment, binds every global supplied with the
 * request into it, parses work->source and evaluates it through the plot
 * helper together with the plot options.  On helper success the captured
 * output and the rendered pages are copied onto ctx->work and ctx->ok is
 * set; otherwise ctx->ok stays false and ctx->error, the parse error map or
 * the error fields filled by fill_plot_error_from_helper_result describe
 * the failure.
 *
 * data: pointer to the plot_ctx prepared by do_plot.
 */
static void plot_body(void *data) {
  plot_ctx *ctx = (plot_ctx *)data;
  r_work *work = ctx->work;
  int held = 0; /* number of entries this function pushed on the protect stack */
  ParseStatus status = PARSE_NULL;

  SEXP scratch_env = r.protect(r.new_env(*r.global_env, TRUE, 29));
  held++;

  unsigned slot = 0;
  while (slot < work->global_count) {
    /* Snapshot the resource under its lock before using the SEXP. */
    pthread_mutex_lock(&work->global_resources[slot]->mutex);
    SEXP bound = work->global_resources[slot]->sexp;
    bool pending_release = work->global_resources[slot]->release_enqueued;
    pthread_mutex_unlock(&work->global_resources[slot]->mutex);

    if (bound == NULL || pending_release) {
      snprintf(ctx->error, sizeof(ctx->error), "native R object resource has already been released");
      goto fail;
    }

    r.define_var(r.install(work->global_names[slot]), bound, scratch_env);
    slot++;
  }

  SEXP code = r.protect(r.mk_string(work->source));
  held++;

  SEXP parsed = r.protect(r.parse_vector(code, -1, &status, *r.nil_value));
  held++;

  if (status != PARSE_OK && status != PARSE_NULL) {
    if (!set_parse_error_map(work)) {
      snprintf(ctx->error, sizeof(ctx->error), "R parse failed");
    }
    goto fail;
  }

  /* make_plot_options leaves its allocations protected and bumps `held`. */
  SEXP options = make_plot_options(work, &held);
  SEXP call = r.protect(r.lang4(plot_helper, parsed, scratch_env, options));
  held++;

  int eval_failed = 0;
  SEXP outcome = r.protect(r.try_eval(call, *r.global_env, &eval_failed));
  held++;

  if (eval_failed || r.type_of(outcome) != VECSXP || r.xlength_fn(outcome) < 8) {
    snprintf(ctx->error, sizeof(ctx->error), "R plot failed");
    goto fail;
  }

  /* Element 0 of the helper result is the success flag. */
  SEXP flag = r.vector_elt(outcome, 0);
  const bool helper_ok =
      r.type_of(flag) == LGLSXP && r.xlength_fn(flag) >= 1 && r.logical_elt(flag, 0) == 1;

  if (!helper_ok) {
    if (!fill_plot_error_from_helper_result(work, outcome)) {
      snprintf(ctx->error, sizeof(ctx->error), "R plot failed");
    }
    goto fail;
  }

  if (!append_plot_output(work, outcome)) {
    snprintf(ctx->error, sizeof(ctx->error), "failed to capture native plot output");
    goto fail;
  }

  /* copy_plot_success writes its own message into ctx->error on failure. */
  if (!copy_plot_success(work, outcome, ctx->error, sizeof(ctx->error))) {
    goto fail;
  }

  r.unprotect(held);
  ctx->ok = true;
  return;

fail:
  r.unprotect(held);
}

/*
 * Services a plot request.
 *
 * Resets the error and plot-page fields on `work`, returns early when the
 * runtime or the plot helper is unavailable, then runs plot_body under
 * r.toplevel_exec while the console write hooks (when present) point at the
 * capture callbacks.  The previously installed hooks are put back on every
 * path that reaches the guarded call.  When the body reports failure any
 * copied pages are freed and the error is surfaced through set_error, unless
 * a structured error was recorded, in which case only work->ok is cleared.
 */
static void do_plot(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->structured_error = false;
  work->error_message[0] = '\0';
  work->error_call[0] = '\0';
  work->error_classes = NULL;
  work->error_class_count = 0;
  work->plot_pages = NULL;
  work->plot_page_lens = NULL;
  work->plot_page_count = 0;

  if (set_terminal_init_failure_if_present(work)) return;

  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }

  if (!load_r_api(work) || !ensure_plot_helper(work)) return;

  plot_ctx ctx = {.work = work, .ok = false, .error = ""};
  ptr_R_WriteConsole_fn saved_console = NULL;
  ptr_R_WriteConsoleEx_fn saved_console_ex = NULL;

  /* Redirect whichever console hooks the runtime exposes. */
  if (r_write_console != NULL) {
    saved_console = *r_write_console;
    current_capture_work = work;
    *r_write_console = capture_write_console;
  }

  if (r_write_console_ex != NULL) {
    saved_console_ex = *r_write_console_ex;
    current_capture_work = work;
    *r_write_console_ex = capture_write_console_ex;
  }

  const bool completed = r.toplevel_exec(plot_body, &ctx) ? true : false;
  if (!completed) {
    set_error(work, "R non-local error during plot");
  }

  /* Undo the redirection regardless of how the guarded call ended. */
  if (r_write_console != NULL) {
    *r_write_console = saved_console;
  }
  if (r_write_console_ex != NULL) {
    *r_write_console_ex = saved_console_ex;
  }
  current_capture_work = NULL;

  if (!completed || ctx.ok) return;

  free_plot_pages(work);
  if (work->structured_error) {
    work->ok = false;
    return;
  }

  set_error(work, ctx.error[0] != '\0' ? ctx.error : "R plot failed");
}

/* State handed from do_encode_resource to encode_resource_body. */
typedef struct encode_resource_ctx {
  /* Request being serviced; its `encode` field describes the value to build. */
  r_work *work;
  /* Set to true by encode_resource_body once the value is stored in the resource. */
  bool ok;
} encode_resource_ctx;

/*
 * Body executed under r.toplevel_exec for an encode request.
 *
 * Builds an R value from the description in work->encode (scalar, typed NA,
 * atomic vector, or named list of existing resources), registers it with
 * r.preserve_object, and stores it together with its resource kind in
 * work->resource under that resource's mutex.  ctx->ok is set only when the
 * value has been stored.
 *
 * The only failure path visible here is a named-list entry whose resource is
 * empty or already queued for release: the error is recorded on `work` via
 * set_error and the function returns with ctx->ok still false.
 *
 * data: pointer to the encode_resource_ctx prepared by do_encode_resource.
 */
static void encode_resource_body(void *data) {
  encode_resource_ctx *ctx = (encode_resource_ctx *)data;
  r_work *work = ctx->work;
  encode_value *encoded = &work->encode;
  /* Number of entries this function currently holds on the protect stack. */
  int protect_count = 0;
  SEXP value = *r.nil_value;

  switch (encoded->kind) {
    case ENCODE_KIND_NULL:
      value = *r.nil_value;
      break;

    case ENCODE_KIND_LOGICAL:
      value = r.protect(r.scalar_logical(encoded->logical_scalar ? 1 : 0));
      protect_count++;
      break;

    case ENCODE_KIND_INTEGER:
      value = r.protect(r.scalar_integer(encoded->integer_scalar));
      protect_count++;
      break;

    case ENCODE_KIND_DOUBLE:
      value = r.protect(r.scalar_real(encoded->double_scalar));
      protect_count++;
      break;

    case ENCODE_KIND_CHARACTER: {
      /* NOTE(review): the length is narrowed to int -- confirm callers bound it. */
      SEXP chars = r.protect(r.mk_char_len_ce(
          encoded->string_scalar,
          (int)encoded->string_scalar_len,
          CE_UTF8));
      protect_count++;
      value = r.protect(r.scalar_string(chars));
      protect_count++;
      break;
    }

    case ENCODE_KIND_NA:
      /* A typed NA scalar; an unrecognised na_type leaves `value` as NULL. */
      switch (encoded->na_type) {
        case DECODE_ATOMIC_LOGICAL:
          /* Logical NA is built from the integer NA value. */
          value = r.protect(r.scalar_logical(*r.na_int));
          protect_count++;
          break;

        case DECODE_ATOMIC_INTEGER:
          value = r.protect(r.scalar_integer(*r.na_int));
          protect_count++;
          break;

        case DECODE_ATOMIC_DOUBLE:
          value = r.protect(r.scalar_real(*r.na_real));
          protect_count++;
          break;

        case DECODE_ATOMIC_CHARACTER:
          value = r.protect(r.scalar_string(*r.na_string));
          protect_count++;
          break;
      }
      break;

    case ENCODE_KIND_LOGICAL_VECTOR:
      value = r.protect(r.alloc_vector(LGLSXP, encoded->length));
      protect_count++;
      for (R_xlen_t i = 0; i < encoded->length; i++) {
        r.set_logical_elt(value, i, encoded->logical_values[i] ? 1 : 0);
      }
      break;

    case ENCODE_KIND_INTEGER_VECTOR:
      value = r.protect(r.alloc_vector(INTSXP, encoded->length));
      protect_count++;
      for (R_xlen_t i = 0; i < encoded->length; i++) {
        r.set_integer_elt(value, i, encoded->integer_values[i]);
      }
      break;

    case ENCODE_KIND_DOUBLE_VECTOR:
      value = r.protect(r.alloc_vector(REALSXP, encoded->length));
      protect_count++;
      for (R_xlen_t i = 0; i < encoded->length; i++) {
        r.set_real_elt(value, i, encoded->double_values[i]);
      }
      break;

    case ENCODE_KIND_CHARACTER_VECTOR:
      value = r.protect(r.alloc_vector(STRSXP, encoded->length));
      protect_count++;
      for (R_xlen_t i = 0; i < encoded->length; i++) {
        /* Each element is protected only until it is stored in the vector. */
        SEXP chars = r.protect(r.mk_char_len_ce(
            encoded->string_values[i],
            (int)encoded->string_value_lens[i],
            CE_UTF8));
        r.set_string_elt(value, i, chars);
        r.unprotect(1);
      }
      break;

    case ENCODE_KIND_NAMED_LIST: {
      value = r.protect(r.alloc_vector(VECSXP, encoded->length));
      protect_count++;
      SEXP names = r.protect(r.alloc_vector(STRSXP, encoded->length));
      protect_count++;

      for (R_xlen_t i = 0; i < encoded->length; i++) {
        /* Snapshot the entry's SEXP and release flag under its own mutex. */
        pthread_mutex_lock(&encoded->entry_resources[i]->mutex);
        SEXP entry_value = encoded->entry_resources[i]->sexp;
        bool release_enqueued = encoded->entry_resources[i]->release_enqueued;
        pthread_mutex_unlock(&encoded->entry_resources[i]->mutex);

        if (entry_value == NULL || release_enqueued) {
          /* ctx->ok stays false; the specific message lives on `work`. */
          set_error(work, "native R object resource has already been released");
          if (protect_count > 0) r.unprotect(protect_count);
          return;
        }

        r.set_vector_elt(value, i, entry_value);

        SEXP chars = r.protect(r.mk_char_len_ce(
            encoded->entry_names[i],
            (int)strlen(encoded->entry_names[i]),
            CE_UTF8));
        r.set_string_elt(names, i, chars);
        r.unprotect(1);
      }

      r.set_attrib(value, *r.names_symbol, names);
      break;
    }
  }

  /* Preserve before the local protections are dropped further down. */
  r.preserve_object(value);

  pthread_mutex_lock(&work->resource->mutex);
  work->resource->sexp = value;
  work->resource->kind = encode_kind_to_resource_kind(encoded->kind);
  pthread_mutex_unlock(&work->resource->mutex);

  if (protect_count > 0) r.unprotect(protect_count);
  ctx->ok = true;
}

/*
 * Services an encode request: turns the value described by work->encode into
 * an R object stored in work->resource.
 *
 * Resets work->ok / work->error, returns early when the runtime is
 * unavailable, then runs encode_resource_body under r.toplevel_exec.
 *
 * When the body fails it may already have recorded a specific message on
 * `work` (for example a named-list entry whose resource was released).  That
 * message is kept; the generic "R encode failed" text is used only when the
 * body left no message behind.
 */
static void do_encode_resource(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';

  if (set_terminal_init_failure_if_present(work)) return;

  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }

  if (!load_r_api(work)) return;

  encode_resource_ctx ctx = {.work = work, .ok = false};

  if (!r.toplevel_exec(encode_resource_body, &ctx)) {
    set_error(work, "R non-local error during encode");
    return;
  }

  if (ctx.ok) return;

  if (work->error[0] == '\0') {
    set_error(work, "R encode failed");
  } else {
    /* Keep the body's more specific message; just make sure failure is flagged. */
    work->ok = false;
  }
}

/* State handed from do_decode_resource to decode_resource_body. */
typedef struct decode_resource_ctx {
  /* Request being serviced; receives the decoded result. */
  r_work *work;
  /* R value to decode, snapshotted from work->resource by do_decode_resource. */
  SEXP sexp;
  /* Resource kind snapshotted together with `sexp`. */
  resource_kind kind;
  /* Mirrors work->ok after copy_resource_result has run. */
  bool ok;
  /* Copy of work->error when decoding failed; empty otherwise. */
  char error[1024];
} decode_resource_ctx;

/*
 * Body executed under r.toplevel_exec for a decode request.
 *
 * Delegates to copy_resource_result and mirrors its outcome into the
 * context: ctx->ok takes the value of work->ok, and on failure the message
 * on `work` is copied into ctx->error.
 *
 * data: pointer to the decode_resource_ctx prepared by do_decode_resource.
 */
static void decode_resource_body(void *data) {
  decode_resource_ctx *ctx = (decode_resource_ctx *)data;
  r_work *work = ctx->work;

  copy_resource_result(work, ctx->sexp, ctx->kind);

  if (work->ok) {
    ctx->ok = true;
    return;
  }

  ctx->ok = false;
  snprintf(ctx->error, sizeof(ctx->error), "%s", work->error);
}

/*
 * Services a decode request for the R object held by work->resource.
 *
 * Resets the result fields on `work`, returns early when the runtime is
 * unavailable, snapshots the resource's SEXP and kind under its mutex and
 * rejects resources that are empty or already queued for release.
 * decode_resource_body is then run under r.toplevel_exec and any failure is
 * reported through set_error.
 */
static void do_decode_resource(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->result_kind = RESULT_NONE;
  work->string_result = NULL;
  work->unsupported_type = 0;

  if (set_terminal_init_failure_if_present(work)) return;

  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }

  if (!load_r_api(work)) return;

  /* Read all three fields in one critical section so they are consistent. */
  pthread_mutex_lock(&work->resource->mutex);
  SEXP target = work->resource->sexp;
  resource_kind target_kind = work->resource->kind;
  bool pending_release = work->resource->release_enqueued;
  pthread_mutex_unlock(&work->resource->mutex);

  const bool usable = target != NULL && !pending_release;
  if (!usable) {
    set_error(work, "native R object resource has already been released");
    return;
  }

  decode_resource_ctx ctx = {
      .work = work, .sexp = target, .kind = target_kind, .ok = false, .error = ""};

  if (!r.toplevel_exec(decode_resource_body, &ctx)) {
    set_error(work, "R non-local error during decode");
    return;
  }

  if (ctx.ok) return;

  set_error(work, ctx.error[0] != '\0' ? ctx.error : "R decode failed");
}

/*
 * Parses `source_code` and evaluates each top-level expression in `env`.
 *
 * env:         environment the expressions are evaluated in.
 * source_code: R source text.
 * out:         receives the value of the last expression (R NULL when there
 *              are none); untouched on failure.
 * error:       buffer that receives "R parse failed" or "R evaluation failed".
 * error_size:  capacity of `error`.
 * Returns 1 on success, 0 on failure.
 *
 * NOTE(review): the value written to *out is not protected by this function
 * once it returns -- callers should root it before allocating in R.
 */
static int eval_source_in_env(SEXP env, const char *source_code, SEXP *out, char *error, size_t error_size) {
  int held = 0;
  ParseStatus status = PARSE_NULL;

  NDBG("eval_source begin: %.96s", source_code);

  SEXP text = r.protect(r.mk_string(source_code));
  held++;
  SEXP parsed = r.protect(r.parse_vector(text, -1, &status, *r.nil_value));
  held++;

  const bool parse_usable = status == PARSE_OK || status == PARSE_NULL;
  if (!parse_usable) {
    snprintf(error, error_size, "R parse failed");
    r.unprotect(held);
    return 0;
  }

  SEXP last_value = *r.nil_value;
  const R_xlen_t total = r.xlength_fn(parsed);
  R_xlen_t index = 0;

  while (index < total) {
    int failed = 0;
    NDBG("eval_source before try_eval expr=%lld error=%d", (long long)index, failed);
    last_value = r.try_eval(r.vector_elt(parsed, index), env, &failed);
    NDBG("eval_source after try_eval expr=%lld error=%d", (long long)index, failed);

    if (failed) {
      snprintf(error, error_size, "R evaluation failed");
      r.unprotect(held);
      return 0;
    }

    index++;
  }

  *out = last_value;
  r.unprotect(held);
  return 1;
}

/*
 * Evaluates `source_code` in `env` and interprets the result as a logical.
 *
 * out:   receives true only when the first element of the result equals 1
 *        (so FALSE and NA both yield false).
 * error: receives a message when evaluation fails or the result is not a
 *        logical vector with at least one element.
 * Returns 1 on success, 0 on failure.
 */
static int eval_logical_in_env(SEXP env, const char *source_code, bool *out, char *error, size_t error_size) {
  SEXP answer = *r.nil_value;

  const int evaluated = eval_source_in_env(env, source_code, &answer, error, error_size);
  if (!evaluated) return 0;

  const bool is_logical = r.type_of(answer) == LGLSXP && r.xlength_fn(answer) >= 1;
  if (!is_logical) {
    snprintf(error, error_size, "R expression did not return a logical scalar");
    return 0;
  }

  *out = (r.logical_elt(answer, 0) == 1);
  return 1;
}

/*
 * Checks that the R 'arrow' package can be loaded by evaluating
 * requireNamespace("arrow", quietly = TRUE) in `env`.
 *
 * error: receives the evaluation error, or a "missing_r_package: ..."
 *        message when the namespace is not available.
 * Returns 1 when the package is available, 0 otherwise.
 */
static int require_arrow_in_env(SEXP env, char *error, size_t error_size) {
  bool has_arrow = false;

  NDBG("require_arrow requireNamespace begin");
  int status = eval_logical_in_env(env, "requireNamespace(\"arrow\", quietly = TRUE)", &has_arrow, error, error_size);
  NDBG("require_arrow requireNamespace end ok=%d available=%d", status, has_arrow ? 1 : 0);

  if (!status) return 0;
  if (has_arrow) return 1;

  snprintf(error, error_size, "missing_r_package: R 'arrow' package is not installed");
  return 0;
}

/* State handed from do_encode_dataframe to encode_dataframe_body. */
typedef struct encode_dataframe_ctx {
  /* Request being serviced; supplies ipc_bytes / ipc_len and receives result_resource. */
  r_work *work;
  /* Set to true by encode_dataframe_body once the data frame resource exists. */
  bool ok;
  /* Failure message; when empty do_encode_dataframe uses a generic message. */
  char error[1024];
} encode_dataframe_ctx;

/*
 * R code evaluated by encode_dataframe_body.  It expects .rx_ipc_raw (a raw
 * vector holding an Arrow IPC stream) in the evaluation environment, decodes
 * it with arrow::read_ipc_stream and raises an R error when the result does
 * not inherit from "data.frame".  The decoded value is the result.
 */
static const char *read_ipc_dataframe_source =
    "value <- arrow::read_ipc_stream(.rx_ipc_raw, as_data_frame = TRUE)\n"
    "if (!inherits(value, \"data.frame\")) stop(\"Arrow IPC did not decode to a data.frame\")\n"
    "value";

static void encode_dataframe_body(void *data) {
  encode_dataframe_ctx *ctx = (encode_dataframe_ctx *)data;
  r_work *work = ctx->work;
  int protect_count = 0;

  SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
  protect_count++;

  SEXP raw = r.protect(r.alloc_vector(RAWSXP, (R_xlen_t)work->ipc_len));
  protect_count++;

  if (work->ipc_len > 0) {
    memcpy(r.raw(raw), work->ipc_bytes, work->ipc_len);
  }

  r.define_var(r.install(".rx_ipc_raw"), raw, env);

  NDBG("encode_dataframe require_arrow begin");
  int arrow_ok = require_arrow_in_env(env, ctx->error, sizeof(ctx->error));
  NDBG("encode_dataframe require_arrow end");

  if (!arrow_ok) {
    r.unprotect(protect_count);
    return;
  }

  SEXP value = *r.nil_value;
  NDBG("encode_dataframe read_ipc_stream begin");
  int read_ok = eval_source_in_env(env, read_ipc_dataframe_source, &value, ctx->error, sizeof(ctx->error));
  NDBG("encode_dataframe read_ipc_stream end");

  if (!read_ok) {
    r.unprotect(protect_count);
    return;
  }

  NDBG("encode_dataframe preserve begin");
  work->result_resource = make_preserved_resource(value, RESOURCE_KIND_DATAFRAME);
  NDBG("encode_dataframe preserve end");

  if (work->result_resource == NULL) {
    snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native dataframe resource");
    r.unprotect(protect_count);
    return;
  }

  r.unprotect(protect_count);
  ctx->ok = true;
}

/*
 * Services an encode_dataframe request.
 *
 * Resets work->ok, work->error and work->result_resource, returns early when
 * the runtime is unavailable, then runs encode_dataframe_body under
 * r.toplevel_exec and reports any failure through set_error.
 */
static void do_encode_dataframe(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->result_resource = NULL;

  if (set_terminal_init_failure_if_present(work)) return;

  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }

  if (!load_r_api(work)) return;

  encode_dataframe_ctx ctx = {.work = work, .ok = false, .error = ""};

  const bool completed = r.toplevel_exec(encode_dataframe_body, &ctx) ? true : false;
  if (!completed) {
    set_error(work, "R non-local error during encode_dataframe");
    return;
  }

  if (ctx.ok) return;

  set_error(work, ctx.error[0] != '\0' ? ctx.error : "R encode_dataframe failed");
}

/*
 * Context for decoding a data frame resource.  Its users are not visible in
 * this part of the file; the field roles below are presumed from the
 * identically shaped decode_resource_ctx -- confirm against the body that
 * consumes it.
 */
typedef struct decode_data_frame_ctx {
  /* Request being serviced. */
  r_work *work;
  /* R value to decode (presumably the data frame held by the resource). */
  SEXP sexp;
  /* Resource kind that accompanies `sexp`. */
  resource_kind kind;
  /* Success flag (presumably set by the body run under r.toplevel_exec). */
  bool ok;
  /* Failure message buffer. */
  char error[1024];
} decode_data_frame_ctx;

/*
 * Context for encoding a data frame.  Its users are not visible in this part
 * of the file.  NOTE(review): this has the same fields as
 * encode_dataframe_ctx under a near-identical name -- confirm both are
 * needed.
 */
typedef struct encode_data_frame_ctx {
  /* Request being serviced. */
  r_work *work;
  /* Success flag (presumably set by the body run under r.toplevel_exec). */
  bool ok;
  /* Failure message buffer. */
  char error[1024];
} encode_data_frame_ctx;

/*
 * R expression over a variable named .rx_df.  Presumably it evaluates to
 * TRUE when the data frame has automatic (default) row names, per the
 * documented behaviour of .row_names_info(x, 1L) -- its user is not visible
 * in this part of the file; confirm there.
 */
static const char *default_row_names_source =
    ".row_names_info(.rx_df, 1L) <= 0L";

/*
 * Evaluates `source_code` in `env` and converts the first element of the
 * result to an unsigned int.
 *
 * Integer results must be non-negative.  Double results must be finite,
 * whole, and within [0, INT_MAX].  Any other type is rejected.
 *
 * out:   receives the converted value on success; untouched on failure.
 * error: receives a message describing why the conversion failed.
 * Returns 1 on success, 0 on failure.
 */
static int eval_unsigned_in_env(SEXP env, const char *source_code, unsigned *out, char *error, size_t error_size) {
  SEXP answer = *r.nil_value;
  if (!eval_source_in_env(env, source_code, &answer, error, error_size)) {
    return 0;
  }

  if (r.xlength_fn(answer) < 1) {
    snprintf(error, error_size, "R expression did not return a scalar integer");
    return 0;
  }

  int candidate = 0;
  switch (r.type_of(answer)) {
    case INTSXP:
      candidate = r.integer_elt(answer, 0);
      break;

    case REALSXP: {
      const double numeric = r.real_elt(answer, 0);
      const bool representable = isfinite(numeric) && numeric >= 0 &&
                                 numeric <= (double)INT_MAX && floor(numeric) == numeric;
      if (!representable) {
        snprintf(error, error_size, "R expression returned an invalid integer");
        return 0;
      }
      candidate = (int)numeric;
      break;
    }

    default:
      snprintf(error, error_size, "R expression did not return an integer");
      return 0;
  }

  if (candidate < 0) {
    snprintf(error, error_size, "R expression returned a negative integer");
    return 0;
  }

  *out = (unsigned)candidate;
  return 1;
}

/*
 * Copies `count` data frame names out of an R character vector.
 *
 * names:      R character vector expected to hold exactly `count` elements.
 * count:      number of names expected; zero yields *out == NULL.
 * out:        receives a newly allocated array of `count` strings on success.
 * error:      receives the reason on failure.
 * error_size: capacity of `error`.
 * Returns 1 on success and 0 on failure.  Names that are NA, empty, or
 * duplicated are rejected; on failure the partially filled array is released
 * with free_string_array and *out is left untouched.
 */
static int copy_dataframe_names(SEXP names, unsigned count, char ***out, char *error, size_t error_size) {
  if (count == 0) {
    *out = NULL;
    return 1;
  }

  const bool usable_vector =
      names != *r.nil_value && r.type_of(names) == STRSXP && r.xlength_fn(names) == (R_xlen_t)count;
  if (!usable_vector) {
    snprintf(error, error_size, "unsupported_dataframe_names: non_string");
    return 0;
  }

  char **copies = calloc(count, sizeof(char *));
  if (copies == NULL) {
    snprintf(error, error_size, "failed to allocate dataframe names");
    return 0;
  }

  const char *problem = NULL;
  for (unsigned idx = 0; idx < count; idx++) {
    SEXP element = r.string_elt(names, (R_xlen_t)idx);
    if (element == *r.na_string) {
      problem = "unsupported_dataframe_names: non_string";
      goto fail;
    }

    size_t copied_len = 0;
    if (!copy_r_string(element, &copies[idx], &copied_len)) {
      problem = "failed to copy dataframe names";
      goto fail;
    }

    if (copied_len == 0) {
      problem = "unsupported_dataframe_names: empty";
      goto fail;
    }

    /* Compare against the names copied so far (indices below idx). */
    if (dataframe_name_duplicate(copies, idx, copies[idx])) {
      problem = "unsupported_dataframe_names: duplicate";
      goto fail;
    }
  }

  *out = copies;
  return 1;

fail:
  snprintf(error, error_size, "%s", problem);
  free_string_array(copies, count);
  return 0;
}

/*
 * Classifies a data frame column as one of the supported atomic types.
 *
 * column:      R vector holding the column.
 * type:        receives the column type on success; untouched on failure.
 * reason:      on failure receives a short tag naming why the column is
 *              unsupported ("matrix", "factor", "date", "posix", "list",
 *              "class:<name>", "complex", "raw" or "unsupported_type").
 * reason_size: capacity of `reason`.
 * Returns 1 for plain logical, integer, double and character columns, and 0
 * for everything else.  Checks run in this order: dim attribute, well-known
 * classes, list type, any other class attribute, then the storage type.
 */
static int dataframe_column_type_from_r(SEXP column, dataframe_column_type *type, char *reason, size_t reason_size) {
  SEXP dims = r.get_attrib(column, *r.dim_symbol);
  if (dims != *r.nil_value && r.xlength_fn(dims) > 0) {
    snprintf(reason, reason_size, "matrix");
    return 0;
  }

  SEXP classes = r.get_attrib(column, *r.class_symbol);
  const bool classed = classes != *r.nil_value && r.xlength_fn(classes) > 0;

  if (classed) {
    static const struct {
      const char *r_class;
      const char *tag;
    } known_classes[] = {
        {"factor", "factor"},
        {"Date", "date"},
        {"POSIXt", "posix"},
    };

    for (size_t k = 0; k < sizeof(known_classes) / sizeof(known_classes[0]); k++) {
      if (r.inherits(column, known_classes[k].r_class)) {
        snprintf(reason, reason_size, "%s", known_classes[k].tag);
        return 0;
      }
    }
  }

  const int storage = r.type_of(column);

  if (storage == VECSXP) {
    snprintf(reason, reason_size, "list");
    return 0;
  }

  if (classed) {
    /* Report the first class name when it can be copied out. */
    SEXP first_class = r.string_elt(classes, 0);
    if (first_class != *r.na_string) {
      char *class_text = NULL;
      size_t class_len = 0;
      if (copy_r_string(first_class, &class_text, &class_len)) {
        snprintf(reason, reason_size, "class:%s", class_text);
        free(class_text);
        return 0;
      }
    }
    snprintf(reason, reason_size, "unsupported_type");
    return 0;
  }

  if (storage == LGLSXP) {
    *type = DATAFRAME_COLUMN_LOGICAL;
    return 1;
  }
  if (storage == INTSXP) {
    *type = DATAFRAME_COLUMN_INTEGER;
    return 1;
  }
  if (storage == REALSXP) {
    *type = DATAFRAME_COLUMN_DOUBLE;
    return 1;
  }
  if (storage == STRSXP) {
    *type = DATAFRAME_COLUMN_CHARACTER;
    return 1;
  }

  if (storage == CPLXSXP) {
    snprintf(reason, reason_size, "complex");
  } else if (storage == RAWSXP) {
    snprintf(reason, reason_size, "raw");
  } else {
    snprintf(reason, reason_size, "unsupported_type");
  }
  return 0;
}

/*
 * Reports whether two doubles have identical bit patterns.
 *
 * Unlike `a == b`, this distinguishes 0.0 from -0.0 and treats two NaNs with
 * the same payload as equal.  Only the first sizeof(uint64_t) bytes of each
 * value are compared.
 */
static bool same_double_bits(double a, double b) {
  return memcmp(&a, &b, sizeof(uint64_t)) == 0;
}

static int copy_dataframe_column_values(SEXP column, dataframe_column *out, char *error, size_t error_size) {
  R_xlen_t length = r.xlength_fn(column);
  if (length < 0 || (uint64_t)length > UINT_MAX) {
    snprintf(error, error_size, "invalid_dataframe: column_length_mismatch");
    return 0;
  }

  out->value_count = (unsigned)length;
  if (out->value_count == 0) return 1;

  out->values = calloc(out->value_count, sizeof(dataframe_cell));
  if (out->values == NULL) {
    snprintf(error, error_size, "failed to allocate dataframe column values");
    return 0;
  }

  for (unsigned i = 0; i < out->value_count; i++) {
    dataframe_cell *cell = &out->values[i];

    switch (out->type) {
      case DATAFRAME_COLUMN_LOGICAL: {
        int value = r.logical_elt(column, (R_xlen_t)i);
        if (value == *r.na_int) {
          cell->is_na = true;
        } else {
          cell->logical_value = value == 1;
        }
        break;
      }

      case DATAFRAME_COLUMN_INTEGER: {
        int value = r.integer_elt(column, (R_xlen_t)i);
        if (value == *r.na_int) {
          cell->is_na = true;
        } else {
          cell->integer_value = value;
        }
        break;
      }

      case DATAFRAME_COLUMN_DOUBLE: {
        double value = r.real_elt(column, (R_xlen_t)i);
        if (isnan(value) && same_double_bits(value, *r.na_real)) {
          cell->is_na = true;
        } else if (!isfinite(value)) {
          snprintf(error, error_size, "unsupported_dataframe_column: %s: non_finite_double", out->name);
          return 0;
        } else {
          cell->double_value = value;
        }
        break;
      }

      case DATAFRAME_COLUMN_CHARACTER: {
        SEXP string = r.string_elt(column, (R_xlen_t)i);
        if (string == *r.na_string) {
          cell->is_na = true;
        } else if (!copy_r_string(string, &cell->string_value, &cell->string_value_len)) {
          snprintf(error, error_size, "failed to copy dataframe character column");
          return 0;
        }
        break;
      }
    }
  }

  return 1;
}

static void decode_data_frame_body(void *data) {
  decode_data_frame_ctx *ctx = (decode_data_frame_ctx *)data;
  r_work *work = ctx->work;
  int protect_count = 0;

  SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
  protect_count++;
  r.define_var(r.install(".rx_df"), ctx->sexp, env);

  if (ctx->kind != RESOURCE_KIND_DATAFRAME && !r.inherits(ctx->sexp, "data.frame")) {
    snprintf(ctx->error, sizeof(ctx->error), "not_dataframe: native object is not a data.frame");
    r.unprotect(protect_count);
    return;
  }

  R_xlen_t column_count_len = r.xlength_fn(ctx->sexp);
  if (column_count_len < 0 || (uint64_t)column_count_len > UINT_MAX) {
    snprintf(ctx->error, sizeof(ctx->error), "invalid_dataframe: malformed_wire");
    r.unprotect(protect_count);
    return;
  }
  unsigned column_count = (unsigned)column_count_len;

  unsigned n_rows = 0;
  if (!eval_unsigned_in_env(env, "nrow(.rx_df)", &n_rows, ctx->error, sizeof(ctx->error))) {
    r.unprotect(protect_count);
    return;
  }

  if (work->has_max_rows && n_rows > work->max_rows) {
    snprintf(ctx->error, sizeof(ctx->error), "dataframe_too_large: %u: %u", n_rows, work->max_rows);
    r.unprotect(protect_count);
    return;
  }

  if (column_count == 0 && n_rows > 0) {
    snprintf(ctx->error, sizeof(ctx->error), "unsupported_dataframe_shape: zero_column_nonzero_row");
    r.unprotect(protect_count);
    return;
  }

  bool default_row_names = false;
  if (!eval_logical_in_env(env, default_row_names_source, &default_row_names, ctx->error, sizeof(ctx->error))) {
    r.unprotect(protect_count);
    return;
  }
  if (!default_row_names) {
    snprintf(ctx->error, sizeof(ctx->error), "unsupported_dataframe_row_names: custom");
    r.unprotect(protect_count);
    return;
  }

  dataframe_wire *wire = calloc(1, sizeof(dataframe_wire));
  if (wire == NULL) {
    snprintf(ctx->error, sizeof(ctx->error), "failed to allocate dataframe wire");
    r.unprotect(protect_count);
    return;
  }
  wire->n_rows = n_rows;
  wire->name_count = column_count;
  wire->column_count = column_count;

  SEXP names_attr = r.get_attrib(ctx->sexp, *r.names_symbol);
  if (!copy_dataframe_names(names_attr, column_count, &wire->names, ctx->error, sizeof(ctx->error))) {
    free_dataframe_wire(wire);
    r.unprotect(protect_count);
    return;
  }

  if (column_count > 0) {
    wire->columns = calloc(column_count, sizeof(dataframe_column));
    if (wire->columns == NULL) {
      snprintf(ctx->error, sizeof(ctx->error), "failed to allocate dataframe columns");
      free_dataframe_wire(wire);
      r.unprotect(protect_count);
      return;
    }
  }

  for (unsigned i = 0; i < column_count; i++) {
    SEXP column_value = r.vector_elt(ctx->sexp, (R_xlen_t)i);
    dataframe_column *column = &wire->columns[i];
    column->name = strdup(wire->names[i]);
    if (column->name == NULL) {
      snprintf(ctx->error, sizeof(ctx->error), "failed to allocate dataframe column name");
      free_dataframe_wire(wire);
      r.unprotect(protect_count);
      return;
    }

    char reason[128] = "";
    if (!dataframe_column_type_from_r(column_value, &column->type, reason, sizeof(reason))) {
      snprintf(ctx->error, sizeof(ctx->error), "unsupported_dataframe_column: %s: %s", column->name, reason);
      free_dataframe_wire(wire);
      r.unprotect(protect_count);
      return;
    }

    if (!copy_dataframe_column_values(column_value, column, ctx->error, sizeof(ctx->error))) {
      free_dataframe_wire(wire);
      r.unprotect(protect_count);
      return;
    }

    if (column->value_count != n_rows) {
      snprintf(ctx->error, sizeof(ctx->error), "invalid_dataframe: column_length_mismatch");
      free_dataframe_wire(wire);
      r.unprotect(protect_count);
      return;
    }
  }

  work->output_dataframe = wire;
  r.unprotect(protect_count);
  ctx->ok = true;
}

/*
 * Handler for WORK_DECODE_DATA_FRAME, dispatched from owner_loop.
 *
 * Resets the result fields of work, verifies that the runtime and R API are
 * usable and that the resource has not been released, then runs
 * decode_data_frame_body under r.toplevel_exec. Failures are reported through
 * set_error; on success the body has filled work->output_dataframe.
 */
static void do_decode_data_frame(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->output_dataframe = NULL;

  if (set_terminal_init_failure_if_present(work)) {
    return;
  }
  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }
  if (!load_r_api(work)) {
    return;
  }

  /* Snapshot the resource fields while holding its mutex. */
  pthread_mutex_lock(&work->resource->mutex);
  bool already_queued_for_release = work->resource->release_enqueued;
  resource_kind resource_type = work->resource->kind;
  SEXP object = work->resource->sexp;
  pthread_mutex_unlock(&work->resource->mutex);

  if (already_queued_for_release || object == NULL) {
    set_error(work, "native R object resource has already been released");
    return;
  }

  decode_data_frame_ctx ctx = {
      .work = work,
      .sexp = object,
      .kind = resource_type,
      .ok = false,
      .error = "",
  };

  bool completed = r.toplevel_exec(decode_data_frame_body, &ctx) != 0;
  if (!completed) {
    set_error(work, "R non-local error during decode_data_frame");
    return;
  }

  if (ctx.ok) {
    return;
  }

  const char *message = ctx.error[0] != '\0' ? ctx.error : "R decode_data_frame failed";
  set_error(work, message);
}

static SEXP make_dataframe_row_names(unsigned n_rows, int *protect_count) {
  if (n_rows == 0) {
    SEXP row_names = r.protect(r.alloc_vector(INTSXP, 0));
    (*protect_count)++;
    return row_names;
  }

  SEXP row_names = r.protect(r.alloc_vector(INTSXP, 2));
  (*protect_count)++;
  r.set_integer_elt(row_names, 0, *r.na_int);
  r.set_integer_elt(row_names, 1, -(int)n_rows);
  return row_names;
}

static SEXP make_dataframe_column_vector(const dataframe_column *column, int *protect_count, char *error, size_t error_size) {
  SEXP vector = *r.nil_value;
  switch (column->type) {
    case DATAFRAME_COLUMN_LOGICAL:
      vector = r.protect(r.alloc_vector(LGLSXP, column->value_count));
      (*protect_count)++;
      for (unsigned i = 0; i < column->value_count; i++) {
        r.set_logical_elt(vector, (R_xlen_t)i, column->values[i].is_na ? *r.na_int : (column->values[i].logical_value ? 1 : 0));
      }
      return vector;

    case DATAFRAME_COLUMN_INTEGER:
      vector = r.protect(r.alloc_vector(INTSXP, column->value_count));
      (*protect_count)++;
      for (unsigned i = 0; i < column->value_count; i++) {
        r.set_integer_elt(vector, (R_xlen_t)i, column->values[i].is_na ? *r.na_int : column->values[i].integer_value);
      }
      return vector;

    case DATAFRAME_COLUMN_DOUBLE:
      vector = r.protect(r.alloc_vector(REALSXP, column->value_count));
      (*protect_count)++;
      for (unsigned i = 0; i < column->value_count; i++) {
        r.set_real_elt(vector, (R_xlen_t)i, column->values[i].is_na ? *r.na_real : column->values[i].double_value);
      }
      return vector;

    case DATAFRAME_COLUMN_CHARACTER:
      vector = r.protect(r.alloc_vector(STRSXP, column->value_count));
      (*protect_count)++;
      for (unsigned i = 0; i < column->value_count; i++) {
        if (column->values[i].is_na) {
          r.set_string_elt(vector, (R_xlen_t)i, *r.na_string);
        } else {
          SEXP chars = r.protect(r.mk_char_len_ce(
              column->values[i].string_value,
              (int)column->values[i].string_value_len,
              CE_UTF8));
          r.set_string_elt(vector, (R_xlen_t)i, chars);
          r.unprotect(1);
        }
      }
      return vector;
  }

  snprintf(error, error_size, "invalid_dataframe: unsupported_type");
  return *r.nil_value;
}

static void encode_data_frame_body(void *data) {
  encode_data_frame_ctx *ctx = (encode_data_frame_ctx *)data;
  r_work *work = ctx->work;
  dataframe_wire *wire = work->input_dataframe;
  int protect_count = 0;

  SEXP df = r.protect(r.alloc_vector(VECSXP, wire->column_count));
  protect_count++;
  SEXP names = r.protect(r.alloc_vector(STRSXP, wire->name_count));
  protect_count++;

  for (unsigned i = 0; i < wire->column_count; i++) {
    SEXP column = make_dataframe_column_vector(&wire->columns[i], &protect_count, ctx->error, sizeof(ctx->error));
    if (column == *r.nil_value && ctx->error[0] != '\0') {
      r.unprotect(protect_count);
      return;
    }

    r.set_vector_elt(df, (R_xlen_t)i, column);

    SEXP name = r.protect(r.mk_char_len_ce(wire->names[i], (int)strlen(wire->names[i]), CE_UTF8));
    r.set_string_elt(names, (R_xlen_t)i, name);
    r.unprotect(1);
  }

  SEXP class_value = r.protect(r.alloc_vector(STRSXP, 1));
  protect_count++;
  SEXP class_name = r.protect(r.mk_char_len_ce("data.frame", 10, CE_UTF8));
  r.set_string_elt(class_value, 0, class_name);
  r.unprotect(1);

  SEXP row_names = make_dataframe_row_names(wire->n_rows, &protect_count);

  r.set_attrib(df, *r.names_symbol, names);
  r.set_attrib(df, *r.class_symbol, class_value);
  r.set_attrib(df, r.install("row.names"), row_names);

  work->result_resource = make_preserved_resource(df, RESOURCE_KIND_DATAFRAME);
  if (work->result_resource == NULL) {
    snprintf(ctx->error, sizeof(ctx->error), "failed to allocate native dataframe resource");
    r.unprotect(protect_count);
    return;
  }

  r.unprotect(protect_count);
  ctx->ok = true;
}

/*
 * Handler for WORK_ENCODE_DATA_FRAME, dispatched from owner_loop.
 *
 * Resets the result fields of work, checks that the runtime and R API are
 * usable, then runs encode_data_frame_body under r.toplevel_exec. Failures
 * are reported through set_error.
 */
static void do_encode_data_frame_wire(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->result_resource = NULL;

  if (set_terminal_init_failure_if_present(work)) {
    return;
  }
  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }
  if (!load_r_api(work)) {
    return;
  }

  encode_data_frame_ctx ctx = {
      .work = work,
      .ok = false,
      .error = "",
  };

  bool completed = r.toplevel_exec(encode_data_frame_body, &ctx) != 0;
  if (!completed) {
    set_error(work, "R non-local error during encode_data_frame");
    return;
  }

  if (ctx.ok) {
    return;
  }

  const char *message = ctx.error[0] != '\0' ? ctx.error : "R encode_data_frame failed";
  set_error(work, message);
}

/* Argument/result record passed to decode_arrow_body through r.toplevel_exec. */
typedef struct decode_arrow_ctx {
  /* Work item whose output_bytes / output_len receive the serialized bytes. */
  r_work *work;
  /* Object bound as .rx_df in the evaluation environment. */
  SEXP sexp;
  /* RESOURCE_KIND_DATAFRAME lets the body skip its inherits() check. */
  resource_kind kind;
  /* Set to true by the body only when it completes successfully. */
  bool ok;
  /* Failure message written by the body; empty when none was recorded. */
  char error[1024];
} decode_arrow_ctx;

/* R expression evaluated by decode_arrow_body to test whether .rx_df is a data.frame. */
static const char *inherits_dataframe_source = "inherits(.rx_df, \"data.frame\")";

/* R expression evaluated by decode_arrow_body; its result must be a raw vector. */
static const char *write_ipc_dataframe_source =
    "arrow::write_to_raw(.rx_df, format = \"stream\")";

static void decode_arrow_body(void *data) {
  decode_arrow_ctx *ctx = (decode_arrow_ctx *)data;
  r_work *work = ctx->work;
  int protect_count = 0;

  SEXP env = r.protect(r.new_env(*r.global_env, TRUE, 29));
  protect_count++;
  r.define_var(r.install(".rx_df"), ctx->sexp, env);

  NDBG("decode_arrow require_arrow begin");
  int arrow_ok = require_arrow_in_env(env, ctx->error, sizeof(ctx->error));
  NDBG("decode_arrow require_arrow end");

  if (!arrow_ok) {
    r.unprotect(protect_count);
    return;
  }

  if (ctx->kind != RESOURCE_KIND_DATAFRAME) {
    SEXP inherits = *r.nil_value;
    NDBG("decode_arrow inherits_dataframe begin");
    int inherits_ok = eval_source_in_env(env, inherits_dataframe_source, &inherits, ctx->error, sizeof(ctx->error));
    NDBG("decode_arrow inherits_dataframe end");

    if (!inherits_ok) {
      r.unprotect(protect_count);
      return;
    }

    bool is_dataframe =
        r.type_of(inherits) == LGLSXP && r.xlength_fn(inherits) >= 1 && r.logical_elt(inherits, 0) == 1;

    if (!is_dataframe) {
      snprintf(ctx->error, sizeof(ctx->error), "not_dataframe: native object is not an Arrow-compatible data frame");
      r.unprotect(protect_count);
      return;
    }
  }

  SEXP raw = *r.nil_value;
  NDBG("decode_arrow write_to_raw begin");
  int write_ok = eval_source_in_env(env, write_ipc_dataframe_source, &raw, ctx->error, sizeof(ctx->error));
  NDBG("decode_arrow write_to_raw end");

  if (!write_ok) {
    r.unprotect(protect_count);
    return;
  }

  if (r.type_of(raw) != RAWSXP) {
    snprintf(ctx->error, sizeof(ctx->error), "R arrow writer did not return raw bytes");
    r.unprotect(protect_count);
    return;
  }

  R_xlen_t output_len = r.xlength_fn(raw);
  if (output_len < 0 || (uint64_t)output_len > SIZE_MAX) {
    snprintf(ctx->error, sizeof(ctx->error), "Arrow IPC output is too large");
    r.unprotect(protect_count);
    return;
  }

  work->output_len = (size_t)output_len;
  work->output_bytes = malloc(work->output_len + 1);
  if (work->output_bytes == NULL) {
    snprintf(ctx->error, sizeof(ctx->error), "failed to allocate Arrow IPC output");
    r.unprotect(protect_count);
    return;
  }

  if (work->output_len > 0) {
    memcpy(work->output_bytes, r.raw(raw), work->output_len);
  }
  work->output_bytes[work->output_len] = '\0';

  r.unprotect(protect_count);
  ctx->ok = true;
}

/*
 * Handler for WORK_DECODE_ARROW, dispatched from owner_loop.
 *
 * Resets the output fields of work, verifies that the runtime and R API are
 * usable and that the resource has not been released, then runs
 * decode_arrow_body under r.toplevel_exec. Failures are reported through
 * set_error.
 */
static void do_decode_arrow(r_work *work) {
  work->ok = true;
  work->error[0] = '\0';
  work->output_bytes = NULL;
  work->output_len = 0;

  if (set_terminal_init_failure_if_present(work)) {
    return;
  }
  if (!runtime_initialized) {
    set_error(work, "embedded R runtime is not initialized");
    return;
  }
  if (!load_r_api(work)) {
    return;
  }

  /* Snapshot the resource fields while holding its mutex. */
  pthread_mutex_lock(&work->resource->mutex);
  bool already_queued_for_release = work->resource->release_enqueued;
  resource_kind resource_type = work->resource->kind;
  SEXP object = work->resource->sexp;
  pthread_mutex_unlock(&work->resource->mutex);

  if (already_queued_for_release || object == NULL) {
    set_error(work, "native R object resource has already been released");
    return;
  }

  decode_arrow_ctx ctx = {
      .work = work,
      .sexp = object,
      .kind = resource_type,
      .ok = false,
      .error = "",
  };

  bool completed = r.toplevel_exec(decode_arrow_body, &ctx) != 0;
  if (!completed) {
    set_error(work, "R non-local error during decode_arrow");
    return;
  }

  if (ctx.ok) {
    return;
  }

  const char *message = ctx.error[0] != '\0' ? ctx.error : "R decode_arrow failed";
  set_error(work, message);
}

/* Argument/result record passed to release_resource_body through r.toplevel_exec. */
typedef struct release_resource_ctx {
  /* Object handed to r.release_object. */
  SEXP sexp;
  /* Set to true once r.release_object has returned. */
  bool ok;
} release_resource_ctx;

/* r.toplevel_exec callback: releases the object in the context and flags completion. */
static void release_resource_body(void *data) {
  release_resource_ctx *request = data;

  r.release_object(request->sexp);
  request->ok = true;
}

/*
 * Handler for WORK_RELEASE, dispatched from owner_loop.
 *
 * Releases work->release_sexp via release_resource_body under
 * r.toplevel_exec and records the outcome through the note_release_*
 * helpers. The release is skipped (and recorded as such) when the runtime is
 * not initialized, when the R API cannot be loaded, or when one of the
 * release fault-injection switches is enabled.
 */
static void do_release_resource(r_work *work) {
  if (!runtime_initialized) {
    note_release_skipped_uninitialized(work->release_resource_id, "native runtime is not initialized");
    return;
  }

  if (!load_r_api(work)) {
    const char *why = work->error[0] != '\0' ? work->error : "failed to load native R API for release";
    note_release_failure(work->release_resource_id, why);
    return;
  }

  /* Fault-injection hooks, checked in this order. */
  if (release_fault_enabled("skip_uninitialized")) {
    note_release_skipped_uninitialized(work->release_resource_id, "fault injected release skip before R_ReleaseObject");
    return;
  }
  if (release_fault_enabled("1")) {
    note_release_failure(work->release_resource_id, "fault injected during native resource release");
    return;
  }

  release_resource_ctx ctx = {.sexp = work->release_sexp, .ok = false};
  bool released = r.toplevel_exec(release_resource_body, &ctx) && ctx.ok;

  if (!released) {
    note_release_failure(work->release_resource_id, "R non-local error during resource release");
    return;
  }

  note_release_on_owner_thread(work->release_resource_id);
}

/*
 * Thread entry point for the owner thread: records its thread id, then
 * forever dequeues work items, dispatches each to the handler for its kind,
 * and hands it to finish_work. Kinds not listed below are passed to
 * finish_work without running a handler.
 */
static void *owner_loop(void *arg) {
  (void)arg;
  set_owner_thread_id(current_thread_id());

  while (true) {
    r_work *job = dequeue_work();

    switch (job->kind) {
      case WORK_INIT: do_init(job); break;
      case WORK_EVAL_STRING: do_eval_string(job); break;
      case WORK_EVAL: do_eval(job); break;
      case WORK_PRINT: do_print(job); break;
      case WORK_PLOT: do_plot(job); break;
      case WORK_ENCODE_RESOURCE: do_encode_resource(job); break;
      case WORK_DECODE_RESOURCE: do_decode_resource(job); break;
      case WORK_ENCODE_DATAFRAME: do_encode_dataframe(job); break;
      case WORK_ENCODE_DATA_FRAME: do_encode_data_frame_wire(job); break;
      case WORK_DECODE_DATA_FRAME: do_decode_data_frame(job); break;
      case WORK_DECODE_ARROW: do_decode_arrow(job); break;
      case WORK_RELEASE: do_release_resource(job); break;
    }

    finish_work(job);
  }

  return NULL;
}

/*
 * Starts the owner thread on first use. Serialized by owner_start_mutex, so
 * concurrent callers create at most one thread.
 *
 * Returns 1 when the owner thread is running (already started or started
 * now) and 0 when it could not be created. A failed pthread_attr_init is now
 * treated as a start failure instead of passing an uninitialized attribute
 * object to pthread_create.
 */
static int ensure_owner_thread(void) {
  enum { OWNER_THREAD_STACK_BYTES = 64 * 1024 * 1024 };

  pthread_mutex_lock(&owner_start_mutex);

  if (!owner_started) {
    pthread_attr_t attr;
    if (pthread_attr_init(&attr) == 0) {
      /* Best effort: if the size is refused the thread keeps the default stack. */
      (void)pthread_attr_setstacksize(&attr, OWNER_THREAD_STACK_BYTES);

      if (pthread_create(&owner_thread, &attr, owner_loop, NULL) == 0) {
        owner_started = true;
      }
      pthread_attr_destroy(&attr);
    }
  }

  int started = owner_started ? 1 : 0;
  pthread_mutex_unlock(&owner_start_mutex);
  return started;
}

/* Builds {error, Message} with Message as a Latin-1 Erlang string (charlist). */
static ERL_NIF_TERM make_error(ErlNifEnv *env, const char *message) {
  ERL_NIF_TERM text = enif_make_string(env, message, ERL_NIF_LATIN1);
  return enif_make_tuple2(env, atom_error, text);
}

/* Builds {error, {Tag, Message}} with Tag as an atom and Message as a Latin-1 Erlang string. */
static ERL_NIF_TERM make_tagged_error(ErlNifEnv *env, const char *tag, const char *message) {
  ERL_NIF_TERM tag_atom = enif_make_atom(env, tag);
  ERL_NIF_TERM text = enif_make_string(env, message, ERL_NIF_LATIN1);
  ERL_NIF_TERM payload = enif_make_tuple2(env, tag_atom, text);
  return enif_make_tuple2(env, atom_error, payload);
}

/*
 * Creates an Erlang binary holding length bytes copied from bytes.
 * Sets *ok to true and returns the binary on success. If the binary buffer
 * is NULL for a non-empty request, sets *ok to false and returns the atom nil.
 */
static ERL_NIF_TERM make_binary_term(ErlNifEnv *env, const char *bytes, size_t length, bool *ok) {
  ERL_NIF_TERM binary;
  unsigned char *buffer = enif_make_new_binary(env, length, &binary);

  if (length == 0) {
    *ok = true;
    return binary;
  }

  if (buffer == NULL) {
    *ok = false;
    return enif_make_atom(env, "nil");
  }

  memcpy(buffer, bytes, length);
  *ok = true;
  return binary;
}

/*
 * Converts a NUL-terminated C string to an Erlang binary. A NULL value
 * yields the atom nil; if the binary cannot be created the value is returned
 * as a Latin-1 Erlang string instead.
 */
static ERL_NIF_TERM make_binary_string(ErlNifEnv *env, const char *value) {
  if (value == NULL) {
    return enif_make_atom(env, "nil");
  }

  bool created = false;
  ERL_NIF_TERM binary = make_binary_term(env, value, strlen(value), &created);
  return created ? binary : enif_make_string(env, value, ERL_NIF_LATIN1);
}

/* Inserts value into *map under key (as a binary), updating *map in place; returns enif_make_map_put's result. */
static int map_put_binary_key(ErlNifEnv *env, ERL_NIF_TERM *map, const char *key, ERL_NIF_TERM value) {
  ERL_NIF_TERM binary_key = make_binary_string(env, key);
  int inserted = enif_make_map_put(env, *map, binary_key, value, map);
  return inserted;
}

/*
 * Converts a reason token to a term: "class:<name>" becomes
 * {class, <<"name">>}; any other token becomes an atom of the same spelling.
 */
static ERL_NIF_TERM dataframe_reason_term(ErlNifEnv *env, const char *reason) {
  static const char class_prefix[] = "class:";
  const size_t prefix_len = sizeof(class_prefix) - 1;

  if (strncmp(reason, class_prefix, prefix_len) != 0) {
    return enif_make_atom(env, reason);
  }

  ERL_NIF_TERM class_atom = enif_make_atom(env, "class");
  ERL_NIF_TERM class_name = make_binary_string(env, reason + prefix_len);
  return enif_make_tuple2(env, class_atom, class_name);
}

/* Builds {error, {Tag, Reason}} where Reason is produced by dataframe_reason_term. */
static ERL_NIF_TERM make_tagged_error_atom(ErlNifEnv *env, const char *tag, const char *reason) {
  ERL_NIF_TERM tag_atom = enif_make_atom(env, tag);
  ERL_NIF_TERM reason_term = dataframe_reason_term(env, reason);
  ERL_NIF_TERM payload = enif_make_tuple2(env, tag_atom, reason_term);
  return enif_make_tuple2(env, atom_error, payload);
}

/*
 * Builds {error, {unsupported_dataframe_column, Name, Reason}} from a detail
 * string of the form "<name>: <reason>".
 *
 * Column names may themselves contain ": ", so the first occurrence is not
 * necessarily the separator. The separator is taken to be the first ": "
 * whose remainder either starts with "class:" or contains no further ": ".
 * Previously a name such as "Ratio: A" was cut at its own ": " and the rest
 * of the name was turned into the reason atom.
 *
 * Falls back to a plain {error, Detail} when no separator is present.
 */
static ERL_NIF_TERM make_dataframe_column_error(ErlNifEnv *env, const char *detail) {
  const char *separator = strstr(detail, ": ");
  if (separator == NULL) return make_error(env, detail);

  for (;;) {
    const char *candidate_reason = separator + 2;
    if (strncmp(candidate_reason, "class:", 6) == 0) break;

    const char *next = strstr(candidate_reason, ": ");
    if (next == NULL) break;
    separator = next;
  }

  size_t name_len = (size_t)(separator - detail);
  bool ok = false;
  ERL_NIF_TERM name = make_binary_term(env, detail, name_len, &ok);
  if (!ok) return make_error(env, "failed to allocate dataframe column error");

  return enif_make_tuple2(
      env,
      atom_error,
      enif_make_tuple3(
          env,
          enif_make_atom(env, "unsupported_dataframe_column"),
          name,
          dataframe_reason_term(env, separator + 2)));
}

/*
 * Builds {error, {dataframe_too_large, Rows, MaxRows}} from a detail string
 * of the form "<rows>: <max_rows>"; falls back to {error, Detail} when the
 * two numbers cannot be parsed.
 */
static ERL_NIF_TERM make_dataframe_too_large_error(ErlNifEnv *env, const char *detail) {
  unsigned parsed[2] = {0, 0};
  int fields = sscanf(detail, "%u: %u", &parsed[0], &parsed[1]);
  if (fields != 2) {
    return make_error(env, detail);
  }

  ERL_NIF_TERM payload = enif_make_tuple3(
      env,
      enif_make_atom(env, "dataframe_too_large"),
      enif_make_uint(env, parsed[0]),
      enif_make_uint(env, parsed[1]));
  return enif_make_tuple2(env, atom_error, payload);
}

/* Returns the text following "<tag>: " when error starts with it, otherwise NULL. */
static const char *dataframe_error_detail(const char *error, const char *tag) {
  size_t tag_len = strlen(tag);
  if (strncmp(error, tag, tag_len) != 0) return NULL;
  if (error[tag_len] != ':' || error[tag_len + 1] != ' ') return NULL;
  return error + tag_len + 2;
}

/*
 * Translates an internal "<tag>: <detail>" error string into the matching
 * structured {error, ...} term. Strings with an unrecognised tag become a
 * plain {error, Message}.
 */
static ERL_NIF_TERM make_dataframe_error(ErlNifEnv *env, const char *error) {
  /* Tags whose detail is a reason token handled by make_tagged_error_atom. */
  static const char *const reason_tags[] = {
      "unsupported_dataframe_names",
      "unsupported_dataframe_row_names",
      "unsupported_dataframe_shape",
      "invalid_dataframe",
  };

  const char *detail = dataframe_error_detail(error, "not_dataframe");
  if (detail != NULL) {
    return make_tagged_error(env, "not_dataframe", detail);
  }

  detail = dataframe_error_detail(error, "unsupported_dataframe_column");
  if (detail != NULL) {
    return make_dataframe_column_error(env, detail);
  }

  for (size_t k = 0; k < sizeof(reason_tags) / sizeof(reason_tags[0]); k++) {
    detail = dataframe_error_detail(error, reason_tags[k]);
    if (detail != NULL) {
      return make_tagged_error_atom(env, reason_tags[k], detail);
    }
  }

  detail = dataframe_error_detail(error, "dataframe_too_large");
  if (detail != NULL) {
    return make_dataframe_too_large_error(env, detail);
  }

  return make_error(env, error);
}

/*
 * Converts one decoded cell to an Erlang term in *out. NA cells become a map
 * with binary keys "kind" => "na" and "type" => the column type name; other
 * cells become a boolean atom, integer, float, or binary according to type.
 * Returns 1 on success and 0 on failure.
 */
static int make_dataframe_cell_term(ErlNifEnv *env, const dataframe_cell *cell, dataframe_column_type type, ERL_NIF_TERM *out) {
  if (!cell->is_na) {
    switch (type) {
      case DATAFRAME_COLUMN_LOGICAL:
        *out = enif_make_atom(env, cell->logical_value ? "true" : "false");
        return 1;

      case DATAFRAME_COLUMN_INTEGER:
        *out = enif_make_int(env, cell->integer_value);
        return 1;

      case DATAFRAME_COLUMN_DOUBLE:
        *out = enif_make_double(env, cell->double_value);
        return 1;

      case DATAFRAME_COLUMN_CHARACTER: {
        bool copied = false;
        *out = make_binary_term(env, cell->string_value, cell->string_value_len, &copied);
        return copied ? 1 : 0;
      }
    }

    return 0;
  }

  ERL_NIF_TERM marker = enif_make_new_map(env);
  if (!map_put_binary_key(env, &marker, "kind", make_binary_string(env, "na"))) {
    return 0;
  }
  if (!map_put_binary_key(env, &marker, "type", make_binary_string(env, dataframe_type_name(type)))) {
    return 0;
  }

  *out = marker;
  return 1;
}

/*
 * Builds an Erlang list of the column's cells in *out. Returns 1 on success
 * and 0 when the scratch array cannot be allocated or a cell cannot be
 * converted (in which case *out is not written).
 */
static int make_dataframe_values_list(ErlNifEnv *env, const dataframe_column *column, ERL_NIF_TERM *out) {
  const unsigned count = column->value_count;
  if (count == 0) {
    *out = enif_make_list(env, 0);
    return 1;
  }

  ERL_NIF_TERM *terms = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
  if (terms == NULL) {
    return 0;
  }

  int built = 1;
  for (unsigned idx = 0; built && idx < count; idx++) {
    built = make_dataframe_cell_term(env, &column->values[idx], column->type, &terms[idx]) ? 1 : 0;
  }

  if (built) {
    *out = enif_make_list_from_array(env, terms, count);
  }

  enif_free(terms);
  return built;
}

/*
 * Converts a dataframe_wire into an Erlang map with binary keys "kind"
 * ("data_frame"), "names", "n_rows" and "columns"; each column is a map with
 * "name", "type" and "values". Returns 1 and stores the map in *out on
 * success, 0 on failure.
 */
static int make_dataframe_wire_term(ErlNifEnv *env, const dataframe_wire *wire, ERL_NIF_TERM *out) {
  /* Lists are built back to front so each cell can be prepended. */
  ERL_NIF_TERM name_list = enif_make_list(env, 0);
  unsigned names_left = wire->name_count;
  while (names_left > 0) {
    names_left--;
    name_list = enif_make_list_cell(env, make_binary_string(env, wire->names[names_left]), name_list);
  }

  ERL_NIF_TERM column_list = enif_make_list(env, 0);
  unsigned columns_left = wire->column_count;
  while (columns_left > 0) {
    columns_left--;
    const dataframe_column *source = &wire->columns[columns_left];

    ERL_NIF_TERM value_list;
    if (!make_dataframe_values_list(env, source, &value_list)) {
      return 0;
    }

    ERL_NIF_TERM entry = enif_make_new_map(env);
    if (!map_put_binary_key(env, &entry, "name", make_binary_string(env, source->name))) {
      return 0;
    }
    if (!map_put_binary_key(env, &entry, "type", make_binary_string(env, dataframe_type_name(source->type)))) {
      return 0;
    }
    if (!map_put_binary_key(env, &entry, "values", value_list)) {
      return 0;
    }

    column_list = enif_make_list_cell(env, entry, column_list);
  }

  ERL_NIF_TERM result = enif_make_new_map(env);
  if (!map_put_binary_key(env, &result, "kind", make_binary_string(env, "data_frame"))) {
    return 0;
  }
  if (!map_put_binary_key(env, &result, "names", name_list)) {
    return 0;
  }
  if (!map_put_binary_key(env, &result, "n_rows", enif_make_uint(env, wire->n_rows))) {
    return 0;
  }
  if (!map_put_binary_key(env, &result, "columns", column_list)) {
    return 0;
  }

  *out = result;
  return 1;
}

/* Builds a map with atom keys r_home, lib_r_path and lib_paths (a list of binaries) from config. */
static ERL_NIF_TERM make_init_config_map(ErlNifEnv *env, const init_config *config) {
  /* Prepend from the last path backwards so the list keeps the original order. */
  ERL_NIF_TERM paths = enif_make_list(env, 0);
  unsigned remaining = config->lib_paths_count;
  while (remaining > 0) {
    remaining--;
    paths = enif_make_list_cell(env, make_binary_string(env, config->lib_paths[remaining]), paths);
  }

  ERL_NIF_TERM result = enif_make_new_map(env);
  ERL_NIF_TERM r_home = make_binary_string(env, config->r_home);
  ERL_NIF_TERM lib_r_path = make_binary_string(env, config->lib_r_path);

  enif_make_map_put(env, result, enif_make_atom(env, "r_home"), r_home, &result);
  enif_make_map_put(env, result, enif_make_atom(env, "lib_r_path"), lib_r_path, &result);
  enif_make_map_put(env, result, enif_make_atom(env, "lib_paths"), paths, &result);
  return result;
}

/* Returns the config map when both r_home and lib_r_path are set, otherwise the atom nil. */
static ERL_NIF_TERM make_optional_config(ErlNifEnv *env, const init_config *config) {
  if (config->r_home != NULL && config->lib_r_path != NULL) {
    return make_init_config_map(env, config);
  }
  return enif_make_atom(env, "nil");
}

/* Builds a list of the atoms r_home / lib_r_path / lib_paths for each field flagged in mismatch. */
static ERL_NIF_TERM make_mismatch_list(ErlNifEnv *env, const init_mismatch_info *mismatch) {
  const struct {
    bool flagged;
    const char *field;
  } candidates[] = {
      {mismatch->r_home ? true : false, "r_home"},
      {mismatch->lib_r_path ? true : false, "lib_r_path"},
      {mismatch->lib_paths ? true : false, "lib_paths"},
  };

  ERL_NIF_TERM selected[3];
  unsigned used = 0;
  for (size_t k = 0; k < sizeof(candidates) / sizeof(candidates[0]); k++) {
    if (candidates[k].flagged) {
      selected[used] = enif_make_atom(env, candidates[k].field);
      used++;
    }
  }

  if (used == 0) {
    return enif_make_list(env, 0);
  }
  return enif_make_list_from_array(env, selected, used);
}

/*
 * Builds {error, {native_init_mismatch, Map}} where Map carries the message,
 * the list of mismatching fields, the current and requested configurations,
 * and restart_required => true.
 */
static ERL_NIF_TERM make_native_init_mismatch_error(ErlNifEnv *env, const init_mismatch_info *mismatch) {
  ERL_NIF_TERM details = enif_make_new_map(env);

  ERL_NIF_TERM message = make_binary_string(env, mismatch->message);
  enif_make_map_put(env, details, enif_make_atom(env, "message"), message, &details);

  ERL_NIF_TERM fields = make_mismatch_list(env, mismatch);
  enif_make_map_put(env, details, enif_make_atom(env, "mismatches"), fields, &details);

  ERL_NIF_TERM current = make_init_config_map(env, &mismatch->current);
  enif_make_map_put(env, details, enif_make_atom(env, "current"), current, &details);

  ERL_NIF_TERM requested = make_init_config_map(env, &mismatch->requested);
  enif_make_map_put(env, details, enif_make_atom(env, "requested"), requested, &details);

  enif_make_map_put(env, details, enif_make_atom(env, "restart_required"), enif_make_atom(env, "true"), &details);

  ERL_NIF_TERM payload = enif_make_tuple2(env, enif_make_atom(env, "native_init_mismatch"), details);
  return enif_make_tuple2(env, atom_error, payload);
}

/* Maps an init state to the atom initialized, failed, or (for anything else) uninitialized. */
static ERL_NIF_TERM init_state_atom(ErlNifEnv *env, init_state_tag state) {
  const char *name = "uninitialized";

  if (state == INIT_STATE_INITIALIZED) {
    name = "initialized";
  } else if (state == INIT_STATE_FAILED) {
    name = "failed";
  }

  return enif_make_atom(env, name);
}

/*
 * Returns the atom nil when has_failure is false; otherwise a map with atom
 * keys stage (atom), message (binary), retryable and restart_required
 * (boolean atoms) describing the failure.
 */
static ERL_NIF_TERM make_optional_init_failure(ErlNifEnv *env, const init_failure_info *failure, bool has_failure) {
  if (!has_failure) {
    return enif_make_atom(env, "nil");
  }

  ERL_NIF_TERM details = enif_make_new_map(env);

  ERL_NIF_TERM stage = enif_make_atom(env, failure->stage);
  enif_make_map_put(env, details, enif_make_atom(env, "stage"), stage, &details);

  ERL_NIF_TERM message = make_binary_string(env, failure->message);
  enif_make_map_put(env, details, enif_make_atom(env, "message"), message, &details);

  const char *retryable = failure->retryable ? "true" : "false";
  enif_make_map_put(env, details, enif_make_atom(env, "retryable"), enif_make_atom(env, retryable), &details);

  const char *restart_required = failure->restart_required ? "true" : "false";
  enif_make_map_put(env, details, enif_make_atom(env, "restart_required"), enif_make_atom(env, restart_required), &details);

  return details;
}

/* Builds {error, {native_init_failed, FailureMap}} for the given failure record. */
static ERL_NIF_TERM make_native_init_failed_error(ErlNifEnv *env, const init_failure_info *failure) {
  ERL_NIF_TERM tag = enif_make_atom(env, "native_init_failed");
  ERL_NIF_TERM details = make_optional_init_failure(env, failure, true);
  ERL_NIF_TERM payload = enif_make_tuple2(env, tag, details);
  return enif_make_tuple2(env, atom_error, payload);
}

/* Stores value in *map under the atom named key, updating *map in place. */
static void rx_diagnostics_put(ErlNifEnv *env, ERL_NIF_TERM *map, const char *key, ERL_NIF_TERM value) {
  enif_make_map_put(env, *map, enif_make_atom(env, key), value, map);
}

/*
 * NIF taking no arguments: returns a map (atom keys) snapshotting the
 * initialization state and the resource-release counters. All values are
 * read while holding diagnostics_mutex.
 */
static ERL_NIF_TERM rx_diagnostics(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  (void)argv;
  if (argc != 0) {
    return enif_make_badarg(env);
  }

  pthread_mutex_lock(&diagnostics_mutex);

  ERL_NIF_TERM report = enif_make_new_map(env);

  /* Initialization state. */
  rx_diagnostics_put(env, &report, "init_state", init_state_atom(env, init_state));
  rx_diagnostics_put(env, &report, "init_config", make_optional_config(env, &successful_init_config));
  rx_diagnostics_put(env, &report, "attempted_init_config", make_optional_config(env, &attempted_init_config));
  rx_diagnostics_put(env, &report, "last_init_error", make_optional_init_failure(env, &last_init_error, has_last_init_error));
  rx_diagnostics_put(env, &report, "init_attempt_count", enif_make_uint64(env, init_attempt_count));
  rx_diagnostics_put(env, &report, "init_mismatch_count", enif_make_uint64(env, init_mismatch_count));
  rx_diagnostics_put(env, &report, "owner_thread_id", enif_make_uint64(env, owner_thread_id_value));

  /* Resource-release counters and the most recent release details. */
  rx_diagnostics_put(env, &report, "release_enqueued_count", enif_make_uint64(env, release_enqueued_count));
  rx_diagnostics_put(env, &report, "release_success_count", enif_make_uint64(env, release_success_count));
  rx_diagnostics_put(env, &report, "release_failure_count", enif_make_uint64(env, release_failure_count));
  rx_diagnostics_put(env, &report, "release_skipped_uninitialized_count", enif_make_uint64(env, release_skipped_uninitialized_count));
  rx_diagnostics_put(env, &report, "last_release_thread_id", enif_make_uint64(env, last_release_thread_id_value));

  ERL_NIF_TERM last_resource_id;
  if (has_last_release_resource_id) {
    last_resource_id = enif_make_uint64(env, last_release_resource_id_value);
  } else {
    last_resource_id = enif_make_atom(env, "nil");
  }
  rx_diagnostics_put(env, &report, "last_release_resource_id", last_resource_id);

  ERL_NIF_TERM last_error;
  if (last_release_error[0] == '\0') {
    last_error = enif_make_atom(env, "nil");
  } else {
    last_error = make_binary_string(env, last_release_error);
  }
  rx_diagnostics_put(env, &report, "last_release_error", last_error);

  pthread_mutex_unlock(&diagnostics_mutex);
  return report;
}

/* Stores work->result_length in *out and returns 1 when it lies in [0, UINT_MAX]; otherwise returns 0 and leaves *out alone. */
static int result_list_count(r_work *work, unsigned *out) {
  const bool fits = work->result_length >= 0 && (uint64_t)work->result_length <= UINT_MAX;

  if (fits) {
    *out = (unsigned)work->result_length;
  }
  return fits ? 1 : 0;
}

/*
 * Builds an Erlang list from the per-kind value arrays stored in `work`.
 *
 * `kind` selects which array is read (integer_values, double_values,
 * logical_values or string_values/string_value_lens). Logical elements become
 * the atoms `true` / `false`; character elements go through make_binary_term.
 *
 * Returns 1 and writes the list to *out on success. Returns 0, without
 * writing *out, when the length is not representable, the scratch array cannot
 * be allocated, an element cannot be converted, or `kind` is not a vector kind.
 */
static int make_result_list(ErlNifEnv *env, r_work *work, r_result_kind kind, ERL_NIF_TERM *out) {
  unsigned length;
  if (!result_list_count(work, &length)) {
    return 0;
  }

  if (length == 0) {
    *out = enif_make_list(env, 0);
    return 1;
  }

  ERL_NIF_TERM *terms = enif_alloc((size_t)length * sizeof(ERL_NIF_TERM));
  if (terms == NULL) {
    return 0;
  }

  /* `built` drops to 0 on the first element that cannot be converted. */
  int built = 1;
  for (unsigned idx = 0; built && idx < length; idx++) {
    if (kind == RESULT_INTEGER_VECTOR) {
      terms[idx] = enif_make_int(env, work->integer_values[idx]);
    } else if (kind == RESULT_DOUBLE_VECTOR) {
      terms[idx] = enif_make_double(env, work->double_values[idx]);
    } else if (kind == RESULT_LOGICAL_VECTOR) {
      terms[idx] = enif_make_atom(env, work->logical_values[idx] ? "true" : "false");
    } else if (kind == RESULT_CHARACTER_VECTOR) {
      bool copied = false;
      terms[idx] = make_binary_term(env, work->string_values[idx], work->string_value_lens[idx], &copied);
      if (!copied) {
        built = 0;
      }
    } else {
      built = 0;
    }
  }

  if (built) {
    *out = enif_make_list_from_array(env, terms, length);
  }

  /* The scratch array is released on both the success and failure paths. */
  enif_free(terms);
  return built;
}

/*
 * Maps a decoded atomic type to the atom naming its scalar form:
 * `logical`, `integer`, `double` or `character`. Any other value yields
 * the atom `unknown`.
 */
static ERL_NIF_TERM decoded_atomic_atom(ErlNifEnv *env, decoded_atomic_type type) {
  const char *label = "unknown";

  if (type == DECODE_ATOMIC_LOGICAL) {
    label = "logical";
  } else if (type == DECODE_ATOMIC_INTEGER) {
    label = "integer";
  } else if (type == DECODE_ATOMIC_DOUBLE) {
    label = "double";
  } else if (type == DECODE_ATOMIC_CHARACTER) {
    label = "character";
  }

  return enif_make_atom(env, label);
}

/*
 * Maps a decoded atomic type to the atom naming its vector form:
 * `logical_vector`, `integer_vector`, `double_vector` or `character_vector`.
 * Any other value yields the atom `unknown_vector`.
 */
static ERL_NIF_TERM decoded_vector_atom(ErlNifEnv *env, decoded_atomic_type type) {
  const char *label = "unknown_vector";

  if (type == DECODE_ATOMIC_LOGICAL) {
    label = "logical_vector";
  } else if (type == DECODE_ATOMIC_INTEGER) {
    label = "integer_vector";
  } else if (type == DECODE_ATOMIC_DOUBLE) {
    label = "double_vector";
  } else if (type == DECODE_ATOMIC_CHARACTER) {
    label = "character_vector";
  }

  return enif_make_atom(env, label);
}

static int make_decoded_term(ErlNifEnv *env, decoded_value *decoded, ERL_NIF_TERM *out);

/*
 * Converts `count` decoded values into an Erlang list of terms.
 *
 * Each element is converted with make_decoded_term. Returns 1 and writes the
 * list to *out on success (an empty list when count is 0). Returns 0, without
 * writing *out, if the scratch array cannot be allocated or any element fails
 * to convert.
 */
static int make_decoded_value_list(ErlNifEnv *env, decoded_value **values, unsigned count, ERL_NIF_TERM *out) {
  if (count == 0) {
    *out = enif_make_list(env, 0);
    return 1;
  }

  ERL_NIF_TERM *terms = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
  if (terms == NULL) {
    return 0;
  }

  /* Stop at the first element that fails; `converted` then stays below count. */
  unsigned converted = 0;
  while (converted < count && make_decoded_term(env, values[converted], &terms[converted])) {
    converted++;
  }

  const int complete = (converted == count);
  if (complete) {
    *out = enif_make_list_from_array(env, terms, count);
  }

  enif_free(terms);
  return complete;
}

/*
 * Converts `count` decoded entries into an Erlang list of {Name, Value} tuples.
 *
 * Name is a binary built from the entry's NUL-terminated name, or the atom
 * `nil` when the entry has no name. Value is produced by make_decoded_term.
 *
 * Returns 1 and writes the list to *out on success (an empty list when count
 * is 0). Returns 0, without writing *out, on allocation or conversion failure.
 */
static int make_decoded_entry_list(ErlNifEnv *env, decoded_entry *entries, unsigned count, ERL_NIF_TERM *out) {
  if (count == 0) {
    *out = enif_make_list(env, 0);
    return 1;
  }

  ERL_NIF_TERM *pairs = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
  if (pairs == NULL) {
    return 0;
  }

  int complete = 1;
  for (unsigned idx = 0; idx < count; idx++) {
    decoded_entry *entry = &entries[idx];
    ERL_NIF_TERM key = enif_make_atom(env, "nil");
    ERL_NIF_TERM value;

    if (entry->name != NULL) {
      bool copied = false;
      key = make_binary_term(env, entry->name, strlen(entry->name), &copied);
      if (!copied) {
        complete = 0;
        break;
      }
    }

    if (!make_decoded_term(env, entry->value, &value)) {
      complete = 0;
      break;
    }

    pairs[idx] = enif_make_tuple2(env, key, value);
  }

  if (complete) {
    *out = enif_make_list_from_array(env, pairs, count);
  }

  enif_free(pairs);
  return complete;
}

/*
 * Converts one decoded value into a tagged Erlang tuple {Tag, Payload}.
 *
 * The tag is chosen from decoded->kind:
 *   null        -> {null, nil}
 *   logical     -> {logical, true | false}
 *   integer     -> {integer, Int}
 *   double      -> {double, Float}
 *   character   -> {character, Binary}
 *   na          -> {na, AtomicTypeAtom}
 *   vector      -> {<type>_vector, [Term]}
 *   named_list  -> {named_list, [{Name, Term}]}
 *   r_list      -> {r_list, [{Name, Term}]}
 *   object      -> {object, Resource}
 *
 * Returns 1 and writes the tuple to *out on success. Returns 0 when `decoded`
 * is NULL, the kind is not one of the above, an object has no resource, or a
 * nested conversion fails.
 */
static int make_decoded_term(ErlNifEnv *env, decoded_value *decoded, ERL_NIF_TERM *out) {
  if (decoded == NULL) {
    return 0;
  }

  ERL_NIF_TERM tag;
  ERL_NIF_TERM payload;

  switch (decoded->kind) {
    case DECODE_VALUE_NULL:
      tag = enif_make_atom(env, "null");
      payload = enif_make_atom(env, "nil");
      break;

    case DECODE_VALUE_LOGICAL:
      tag = enif_make_atom(env, "logical");
      payload = enif_make_atom(env, decoded->logical_value ? "true" : "false");
      break;

    case DECODE_VALUE_INTEGER:
      tag = enif_make_atom(env, "integer");
      payload = enif_make_int(env, decoded->integer_value);
      break;

    case DECODE_VALUE_DOUBLE:
      tag = enif_make_atom(env, "double");
      payload = enif_make_double(env, decoded->double_value);
      break;

    case DECODE_VALUE_CHARACTER: {
      bool copied = false;
      payload = make_binary_term(env, decoded->string_value, decoded->string_value_len, &copied);
      if (!copied) {
        return 0;
      }
      tag = enif_make_atom(env, "character");
      break;
    }

    case DECODE_VALUE_NA:
      tag = enif_make_atom(env, "na");
      payload = decoded_atomic_atom(env, decoded->atomic_type);
      break;

    case DECODE_VALUE_VECTOR:
      if (!make_decoded_value_list(env, decoded->items, decoded->count, &payload)) {
        return 0;
      }
      tag = decoded_vector_atom(env, decoded->atomic_type);
      break;

    case DECODE_VALUE_NAMED_LIST:
      if (!make_decoded_entry_list(env, decoded->entries, decoded->count, &payload)) {
        return 0;
      }
      tag = enif_make_atom(env, "named_list");
      break;

    case DECODE_VALUE_R_LIST:
      if (!make_decoded_entry_list(env, decoded->entries, decoded->count, &payload)) {
        return 0;
      }
      tag = enif_make_atom(env, "r_list");
      break;

    case DECODE_VALUE_OBJECT:
      if (decoded->resource == NULL) {
        return 0;
      }
      tag = enif_make_atom(env, "object");
      payload = enif_make_resource(env, decoded->resource);
      break;

    default:
      /* Any kind not listed above is reported as a conversion failure. */
      return 0;
  }

  *out = enif_make_tuple2(env, tag, payload);
  return 1;
}

/*
 * Builds the reply for a completed work item that produced a value.
 *
 * When work->decoded_result is set it takes precedence and is converted with
 * make_decoded_term. Otherwise the reply is built from work->result_kind and
 * the matching scalar/vector fields. A successful reply is {ok, {Tag, Payload}};
 * RESULT_UNSUPPORTED yields a tagged `unsupported_r_value` error and
 * RESULT_NONE (or any unlisted kind) yields a plain error.
 */
static ERL_NIF_TERM make_eval_success(ErlNifEnv *env, r_work *work) {
  ERL_NIF_TERM payload;

  if (work->decoded_result != NULL) {
    if (make_decoded_term(env, work->decoded_result, &payload)) {
      return enif_make_tuple2(env, atom_ok, payload);
    }
    return make_error(env, "failed to allocate native decode result");
  }

  ERL_NIF_TERM tag;

  switch (work->result_kind) {
    case RESULT_NULL:
      tag = enif_make_atom(env, "null");
      payload = enif_make_atom(env, "nil");
      break;

    case RESULT_INTEGER:
      tag = enif_make_atom(env, "integer");
      payload = enif_make_int(env, work->integer_result);
      break;

    case RESULT_DOUBLE:
      tag = enif_make_atom(env, "double");
      payload = enif_make_double(env, work->double_result);
      break;

    case RESULT_LOGICAL:
      tag = enif_make_atom(env, "logical");
      payload = enif_make_atom(env, work->logical_result ? "true" : "false");
      break;

    case RESULT_CHARACTER: {
      bool copied = false;
      payload = make_binary_term(env, work->string_result, work->string_result_len, &copied);
      if (!copied) {
        return make_error(env, "failed to allocate character binary");
      }
      tag = enif_make_atom(env, "character");
      break;
    }

    case RESULT_INTEGER_VECTOR:
      if (!make_result_list(env, work, RESULT_INTEGER_VECTOR, &payload)) {
        return make_error(env, "failed to allocate integer vector result");
      }
      tag = enif_make_atom(env, "integer_vector");
      break;

    case RESULT_DOUBLE_VECTOR:
      if (!make_result_list(env, work, RESULT_DOUBLE_VECTOR, &payload)) {
        return make_error(env, "failed to allocate double vector result");
      }
      tag = enif_make_atom(env, "double_vector");
      break;

    case RESULT_LOGICAL_VECTOR:
      if (!make_result_list(env, work, RESULT_LOGICAL_VECTOR, &payload)) {
        return make_error(env, "failed to allocate logical vector result");
      }
      tag = enif_make_atom(env, "logical_vector");
      break;

    case RESULT_CHARACTER_VECTOR:
      if (!make_result_list(env, work, RESULT_CHARACTER_VECTOR, &payload)) {
        return make_error(env, "failed to allocate character vector result");
      }
      tag = enif_make_atom(env, "character_vector");
      break;

    case RESULT_UNSUPPORTED:
      return make_tagged_error(env, "unsupported_r_value", "R result type is not supported by the native harness");

    case RESULT_NONE:
    default:
      return make_error(env, "native eval did not produce a result");
  }

  return enif_make_tuple2(env, atom_ok, enif_make_tuple2(env, tag, payload));
}

/*
 * Builds a map with the keys `stdout`, `messages` and `warnings` from the
 * three capture buffers in `work`.
 *
 * Each buffer is converted with make_binary_term (a NULL data pointer is
 * treated as ""). If a conversion does not report success, the value falls
 * back to enif_make_string(env, "", ERL_NIF_LATIN1).
 */
static ERL_NIF_TERM make_output_map(ErlNifEnv *env, r_work *work) {
  ERL_NIF_TERM result = enif_make_new_map(env);

  /* One flag is deliberately shared by all three conversions. */
  bool converted = false;

  ERL_NIF_TERM captured_stdout = make_binary_term(
      env,
      work->stdout_buffer.data == NULL ? "" : work->stdout_buffer.data,
      work->stdout_buffer.len,
      &converted);
  if (!converted) {
    captured_stdout = enif_make_string(env, "", ERL_NIF_LATIN1);
  }

  ERL_NIF_TERM captured_messages = make_binary_term(
      env,
      work->messages_buffer.data == NULL ? "" : work->messages_buffer.data,
      work->messages_buffer.len,
      &converted);
  if (!converted) {
    captured_messages = enif_make_string(env, "", ERL_NIF_LATIN1);
  }

  ERL_NIF_TERM captured_warnings = make_binary_term(
      env,
      work->warnings_buffer.data == NULL ? "" : work->warnings_buffer.data,
      work->warnings_buffer.len,
      &converted);
  if (!converted) {
    captured_warnings = enif_make_string(env, "", ERL_NIF_LATIN1);
  }

  enif_make_map_put(env, result, enif_make_atom(env, "stdout"), captured_stdout, &result);
  enif_make_map_put(env, result, enif_make_atom(env, "messages"), captured_messages, &result);
  enif_make_map_put(env, result, enif_make_atom(env, "warnings"), captured_warnings, &result);
  return result;
}

/*
 * Converts `count` NUL-terminated strings into an Erlang list of binaries.
 *
 * Returns the atom `nil` when count is 0 or the scratch array cannot be
 * allocated. An element whose binary conversion fails falls back to
 * enif_make_string with ERL_NIF_LATIN1 for that element.
 */
static ERL_NIF_TERM make_string_list_term(ErlNifEnv *env, char **items, unsigned count) {
  ERL_NIF_TERM *elements = NULL;

  if (count != 0) {
    elements = enif_alloc((size_t)count * sizeof(ERL_NIF_TERM));
  }
  if (elements == NULL) {
    return enif_make_atom(env, "nil");
  }

  for (unsigned idx = 0; idx < count; idx++) {
    char *text = items[idx];
    bool copied = false;
    ERL_NIF_TERM element = make_binary_term(env, text, strlen(text), &copied);

    if (!copied) {
      element = enif_make_string(env, text, ERL_NIF_LATIN1);
    }
    elements[idx] = element;
  }

  ERL_NIF_TERM result = enif_make_list_from_array(env, elements, count);
  enif_free(elements);
  return result;
}

/*
 * Builds a list of {Name, Resource} tuples from work->result_global_names and
 * work->result_global_resources.
 *
 * Returns 1 and writes the list to *out on success (an empty list when there
 * are no result globals). Returns 0, without writing *out, when the scratch
 * array cannot be allocated or a name cannot be converted to a binary.
 */
static int make_eval_global_list(ErlNifEnv *env, r_work *work, ERL_NIF_TERM *out) {
  if (work->result_global_count == 0) {
    *out = enif_make_list(env, 0);
    return 1;
  }

  ERL_NIF_TERM *pairs = enif_alloc((size_t)work->result_global_count * sizeof(ERL_NIF_TERM));
  if (pairs == NULL) {
    return 0;
  }

  int complete = 1;
  for (unsigned idx = 0; idx < work->result_global_count; idx++) {
    bool copied = false;
    ERL_NIF_TERM key = make_binary_term(
        env, work->result_global_names[idx], strlen(work->result_global_names[idx]), &copied);
    if (!copied) {
      complete = 0;
      break;
    }

    ERL_NIF_TERM handle = enif_make_resource(env, work->result_global_resources[idx]);
    pairs[idx] = enif_make_tuple2(env, key, handle);
  }

  if (complete) {
    *out = enif_make_list_from_array(env, pairs, work->result_global_count);
  }

  enif_free(pairs);
  return complete;
}

/*
 * Builds the success reply {ok, {Result, Globals, Output}}.
 *
 * Result is a resource term for work->result_resource, or `nil` when there is
 * none. Globals comes from make_eval_global_list and Output from
 * make_output_map. Returns a plain error term if the globals list cannot be
 * built.
 */
static ERL_NIF_TERM make_native_eval_success(ErlNifEnv *env, r_work *work) {
  ERL_NIF_TERM result_term;
  if (work->result_resource != NULL) {
    result_term = enif_make_resource(env, work->result_resource);
  } else {
    result_term = enif_make_atom(env, "nil");
  }

  ERL_NIF_TERM globals_term;
  if (make_eval_global_list(env, work, &globals_term)) {
    ERL_NIF_TERM output_term = make_output_map(env, work);
    return enif_make_tuple2(env, atom_ok, enif_make_tuple3(env, result_term, globals_term, output_term));
  }

  return make_error(env, "failed to allocate native eval globals result");
}

/*
 * Builds a list of binaries, one per entry of work->plot_pages, using the
 * matching length from work->plot_page_lens.
 *
 * Returns 1 and writes the list to *out on success (an empty list when there
 * are no pages). Returns 0, without writing *out, when the scratch array
 * cannot be allocated or a page cannot be converted to a binary.
 */
static int make_plot_page_list(ErlNifEnv *env, r_work *work, ERL_NIF_TERM *out) {
  if (work->plot_page_count == 0) {
    *out = enif_make_list(env, 0);
    return 1;
  }

  ERL_NIF_TERM *pages = enif_alloc((size_t)work->plot_page_count * sizeof(ERL_NIF_TERM));
  if (pages == NULL) {
    return 0;
  }

  int complete = 1;
  for (unsigned idx = 0; complete && idx < work->plot_page_count; idx++) {
    bool copied = false;
    pages[idx] = make_binary_term(env, work->plot_pages[idx], work->plot_page_lens[idx], &copied);
    if (!copied) {
      complete = 0;
    }
  }

  if (complete) {
    *out = enif_make_list_from_array(env, pages, work->plot_page_count);
  }

  enif_free(pages);
  return complete;
}

/*
 * Builds the success reply {ok, {Width, Height, Pages, Output}} for a plot
 * work item. Returns a plain error term if the page list cannot be built.
 */
static ERL_NIF_TERM make_native_plot_success(ErlNifEnv *env, r_work *work) {
  ERL_NIF_TERM pages_term;
  if (make_plot_page_list(env, work, &pages_term)) {
    ERL_NIF_TERM width_term = enif_make_int(env, work->plot_width);
    ERL_NIF_TERM height_term = enif_make_int(env, work->plot_height);
    ERL_NIF_TERM output_term = make_output_map(env, work);

    return enif_make_tuple2(
        env,
        atom_ok,
        enif_make_tuple4(env, width_term, height_term, pages_term, output_term));
  }

  return make_error(env, "failed to allocate native plot pages result");
}

/*
 * Builds the structured error reply {error, Map} for a failed work item.
 *
 * Map keys:
 *   message   - work->error_message as a binary
 *   r_class   - list of class-name binaries, or `nil` (see make_string_list_term)
 *   call      - work->error_call as a binary, or `nil` when it is empty
 *   traceback - always the empty list
 *   output    - captured output map (see make_output_map)
 *
 * A string whose binary conversion does not report success falls back to
 * enif_make_string with ERL_NIF_LATIN1.
 */
static ERL_NIF_TERM make_native_eval_error(ErlNifEnv *env, r_work *work) {
  ERL_NIF_TERM report = enif_make_new_map(env);

  /* One flag is deliberately shared by both conversions. */
  bool converted = false;

  ERL_NIF_TERM message_term =
      make_binary_term(env, work->error_message, strlen(work->error_message), &converted);
  if (!converted) {
    message_term = enif_make_string(env, work->error_message, ERL_NIF_LATIN1);
  }

  ERL_NIF_TERM call_term = enif_make_atom(env, "nil");
  if (work->error_call[0] != '\0') {
    call_term = make_binary_term(env, work->error_call, strlen(work->error_call), &converted);
    if (!converted) {
      call_term = enif_make_string(env, work->error_call, ERL_NIF_LATIN1);
    }
  }

  ERL_NIF_TERM class_term = make_string_list_term(env, work->error_classes, work->error_class_count);

  enif_make_map_put(env, report, enif_make_atom(env, "message"), message_term, &report);
  enif_make_map_put(env, report, enif_make_atom(env, "r_class"), class_term, &report);
  enif_make_map_put(env, report, enif_make_atom(env, "call"), call_term, &report);
  enif_make_map_put(env, report, enif_make_atom(env, "traceback"), enif_make_list(env, 0), &report);
  enif_make_map_put(env, report, enif_make_atom(env, "output"), make_output_map(env, work), &report);
  return enif_make_tuple2(env, atom_error, report);
}

/*
 * Destructor for rx_object_resource values.
 *
 * If the resource still holds a SEXP and no release has been queued for it, a
 * heap-allocated WORK_RELEASE item carrying the SEXP and the resource id is
 * passed to enqueue_work(). The resource mutex is destroyed on every path.
 *
 * Failures (work allocation, owner thread start) are recorded through
 * note_release_failure(); in those cases no release work is queued.
 */
static void rx_object_resource_dtor(ErlNifEnv *env, void *obj) {
  (void)env;
  rx_object_resource *resource = (rx_object_resource *)obj;

  /* Snapshot the fields needed after the mutex has been destroyed. */
  pthread_mutex_lock(&resource->mutex);
  SEXP sexp = resource->sexp;
  uint64_t resource_id = resource->id;

  /* Nothing to do: a release is already queued, or no SEXP was ever attached. */
  if (resource->release_enqueued || sexp == NULL) {
    pthread_mutex_unlock(&resource->mutex);
    pthread_mutex_destroy(&resource->mutex);
    return;
  }

  r_work *work = calloc(1, sizeof(r_work));
  if (work == NULL) {
    note_release_failure(resource_id, "failed to allocate native resource release work");
    pthread_mutex_unlock(&resource->mutex);
    pthread_mutex_destroy(&resource->mutex);
    return;
  }

  resource->release_enqueued = true;
  pthread_mutex_unlock(&resource->mutex);
  pthread_mutex_destroy(&resource->mutex);

  work->kind = WORK_RELEASE;
  work->release_sexp = sexp;
  work->release_resource_id = resource_id;
  work->next = NULL;

  if (!ensure_owner_thread()) {
    note_release_failure(resource_id, "failed to start native R owner thread for release");
    free(work);
    return;
  }

  note_release_enqueued(resource_id);
  /* `work` is not freed here; presumably the queue consumer owns it from now on
   * (confirm against the WORK_RELEASE handler). */
  enqueue_work(work);
}

/*
 * NIF entry point: submits a WORK_INIT item and blocks until it is done.
 *
 * Arguments (argc must be 3): two strings stored in work.lib_r_path and
 * work.r_home, and a list of strings stored in work.lib_paths.
 *
 * Returns atom_ok when the work item reports success. Otherwise returns an
 * init-mismatch error, an init-failed error, or a plain error built from
 * work.error, depending on work.init_return. Returns badarg when the
 * arguments cannot be converted.
 */
static ERL_NIF_TERM rx_init(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 3) return enif_make_badarg(env);

  /* Fields not named here are zero-initialised, so the frees below are safe
   * even when an earlier conversion failed. */
  r_work work = {.kind = WORK_INIT, .done = false, .ok = false, .next = NULL};

  if (!get_string(env, argv[0], &work.lib_r_path) ||
      !get_string(env, argv[1], &work.r_home) ||
      !get_string_list(env, argv[2], &work.lib_paths, &work.lib_paths_count)) {
    free(work.lib_r_path);
    free(work.r_home);
    free_string_array(work.lib_paths, work.lib_paths_count);
    return enif_make_badarg(env);
  }

  if (!ensure_owner_thread()) {
    free(work.lib_r_path);
    free(work.r_home);
    free_string_array(work.lib_paths, work.lib_paths_count);
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  free(work.lib_r_path);
  free(work.r_home);
  free_string_array(work.lib_paths, work.lib_paths_count);

  ERL_NIF_TERM result;
  if (work.ok) {
    result = atom_ok;
  } else if (work.init_return == INIT_RETURN_MISMATCH) {
    result = make_native_init_mismatch_error(env, &work.init_mismatch);
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else {
    result = make_error(env, work.error);
  }

  /* The mismatch details are released only after the reply term is built. */
  free_init_mismatch(&work.init_mismatch);
  return result;
}

/*
 * NIF entry point: submits a WORK_EVAL_STRING item for the source string in
 * argv[0] and blocks until it is done.
 *
 * On success the reply comes from make_eval_success. On failure the reply is
 * an init-failed error, a tagged `r_error` / `parse_error` (selected by exact
 * match on work.error), or a plain error. Returns badarg when argc is not 1 or
 * the source cannot be converted.
 */
static ERL_NIF_TERM rx_eval_string(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 1) return enif_make_badarg(env);

  r_work work = {.kind = WORK_EVAL_STRING, .done = false, .ok = false, .next = NULL};

  if (!get_string(env, argv[0], &work.source)) {
    return enif_make_badarg(env);
  }

  if (!ensure_owner_thread()) {
    free(work.source);
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  free(work.source);

  ERL_NIF_TERM result;
  if (work.ok) {
    result = make_eval_success(env, &work);
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else if (strcmp(work.error, "R evaluation failed") == 0) {
    /* The error tag is chosen by exact match on the message text. */
    result = make_tagged_error(env, "r_error", work.error);
  } else if (strcmp(work.error, "R parse failed") == 0) {
    result = make_tagged_error(env, "parse_error", work.error);
  } else {
    result = make_error(env, work.error);
  }

  /* Result storage is released only after the reply term has been built. */
  free_result_value(&work);
  return result;
}

/*
 * NIF entry point: submits a WORK_EVAL item and blocks until it is done.
 *
 * Arguments (argc must be 2):
 *   argv[0] - source string; rejected with an error if it contains a NUL byte
 *   argv[1] - list of {Name, Resource} tuples passed to the work item as
 *             globals; each resource is kept for the duration of the call
 *
 * On success the reply comes from make_native_eval_success. On failure the
 * reply is a structured error map, an init-failed error, a tagged `r_error` /
 * `parse_error` (selected by exact match on work.error), or a plain error.
 * Returns badarg when the arguments do not have the expected shape.
 */
static ERL_NIF_TERM rx_eval(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 2) return enif_make_badarg(env);

  /* Fields not named here are zero-initialised, so work.source starts NULL. */
  r_work work = {.kind = WORK_EVAL, .done = false, .ok = false, .next = NULL};

  bool source_has_nul = false;
  if (!get_string_reject_nul(env, argv[0], &work.source, &source_has_nul)) {
    return enif_make_badarg(env);
  }
  if (source_has_nul) {
    /* Release any buffer the helper may have produced before it flagged the
     * NUL byte. free(NULL) is a no-op, so this is safe if nothing was
     * allocated; every other exit after a successful conversion frees it too. */
    free(work.source);
    return make_error(env, "source contains NUL byte");
  }

  unsigned global_count;
  if (!enif_get_list_length(env, argv[1], &global_count)) {
    free(work.source);
    return enif_make_badarg(env);
  }

  work.global_count = global_count;
  if (global_count > 0) {
    work.global_names = calloc(global_count, sizeof(char *));
    work.global_resources = calloc(global_count, sizeof(rx_object_resource *));
    if (work.global_names == NULL || work.global_resources == NULL) {
      free(work.source);
      free_string_array(work.global_names, global_count);
      free(work.global_resources);
      return make_error(env, "failed to allocate native eval globals");
    }
  }

  ERL_NIF_TERM head;
  ERL_NIF_TERM tail = argv[1];
  for (unsigned i = 0; i < global_count; i++) {
    const ERL_NIF_TERM *tuple;
    int arity;

    if (!enif_get_list_cell(env, tail, &head, &tail) ||
        !enif_get_tuple(env, head, &arity, &tuple) ||
        arity != 2 ||
        !get_global_name(env, tuple[0], &work.global_names[i]) ||
        !enif_get_resource(env, tuple[1], object_resource_type, (void **)&work.global_resources[i])) {
      free(work.source);
      free_string_array(work.global_names, global_count);
      release_resource_array(work.global_resources, global_count);
      return enif_make_badarg(env);
    }

    /* Keep the resource only once the whole entry has been validated. */
    enif_keep_resource(work.global_resources[i]);
  }

  if (!ensure_owner_thread()) {
    free(work.source);
    free_string_array(work.global_names, work.global_count);
    release_resource_array(work.global_resources, work.global_count);
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  free(work.source);
  free_string_array(work.global_names, work.global_count);
  release_resource_array(work.global_resources, work.global_count);

  ERL_NIF_TERM result;
  if (work.ok) {
    result = make_native_eval_success(env, &work);
  } else if (work.structured_error) {
    result = make_native_eval_error(env, &work);
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else if (strcmp(work.error, "R evaluation failed") == 0) {
    /* The error tag is chosen by exact match on the message text. */
    result = make_tagged_error(env, "r_error", work.error);
  } else if (strcmp(work.error, "R parse failed") == 0) {
    result = make_tagged_error(env, "parse_error", work.error);
  } else {
    result = make_error(env, work.error);
  }

  /* Drop this function's references only after the reply term is built. */
  if (work.result_resource != NULL) enif_release_resource(work.result_resource);
  free_eval_result_globals(&work);
  free_eval_capture(&work);
  return result;
}

/*
 * NIF entry point: submits a WORK_PRINT item for the object resource in
 * argv[0] and blocks until it is done.
 *
 * argv[1] and argv[2] are optional integers (parsed by get_optional_int) that
 * populate the print width and max-print settings of the work item.
 *
 * On success returns {ok, OutputMap}. On failure returns a structured error
 * map, an init-failed error, or a plain error built from work.error. Returns
 * badarg when argc is not 3 or an argument cannot be converted.
 */
static ERL_NIF_TERM rx_print(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 3) return enif_make_badarg(env);

  rx_object_resource *resource;
  if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&resource)) {
    return enif_make_badarg(env);
  }

  bool has_width = false;
  int width = 0;
  bool has_max_print = false;
  int max_print = 0;

  if (!get_optional_int(env, argv[1], &has_width, &width) ||
      !get_optional_int(env, argv[2], &has_max_print, &max_print)) {
    return enif_make_badarg(env);
  }

  r_work work = {
      .kind = WORK_PRINT,
      .done = false,
      .ok = false,
      .resource = resource,
      .has_print_width = has_width,
      .print_width = width,
      .has_print_max_print = has_max_print,
      .print_max_print = max_print,
      .next = NULL};

  if (!ensure_owner_thread()) {
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* Hold an extra reference on the resource while the work item is pending. */
  enif_keep_resource(resource);
  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  enif_release_resource(resource);

  ERL_NIF_TERM result;
  if (work.ok) {
    result = enif_make_tuple2(env, atom_ok, make_output_map(env, &work));
  } else if (work.structured_error) {
    result = make_native_eval_error(env, &work);
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else {
    result = make_error(env, work.error);
  }

  /* Capture buffers are released only after the reply term has been built. */
  free_eval_capture(&work);
  return result;
}

/*
 * Reads `term` into *out via get_int32 and accepts it only when the value is
 * strictly positive. Returns 1 on acceptance, 0 otherwise.
 */
static int get_required_positive_int(ErlNifEnv *env, ERL_NIF_TERM term, int *out) {
  return get_int32(env, term, out) && *out > 0;
}

/*
 * NIF entry point: submits a WORK_PLOT item and blocks until it is done.
 *
 * Arguments (argc must be 8):
 *   argv[0]    - source string; rejected with an error if it contains a NUL byte
 *   argv[1]    - list of {Name, Resource} tuples passed to the work item as
 *                globals; each resource is kept for the duration of the call
 *   argv[2..7] - strictly positive integers stored in plot_width, plot_height,
 *                plot_res, plot_pointsize, plot_max_pages and plot_max_bytes
 *
 * On success the reply comes from make_native_plot_success. On failure the
 * reply is a structured error map, an init-failed error, or a plain error
 * built from work.error. Returns badarg when the arguments do not have the
 * expected shape.
 */
static ERL_NIF_TERM rx_plot(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 8) return enif_make_badarg(env);

  /* Fields not named here are zero-initialised, so work.source starts NULL. */
  r_work work = {.kind = WORK_PLOT, .done = false, .ok = false, .next = NULL};

  bool source_has_nul = false;
  if (!get_string_reject_nul(env, argv[0], &work.source, &source_has_nul)) {
    return enif_make_badarg(env);
  }
  if (source_has_nul) {
    /* Release any buffer the helper may have produced before it flagged the
     * NUL byte. free(NULL) is a no-op, so this is safe if nothing was
     * allocated; every other exit after a successful conversion frees it too. */
    free(work.source);
    return make_error(env, "source contains NUL byte");
  }

  unsigned global_count;
  if (!enif_get_list_length(env, argv[1], &global_count)) {
    free(work.source);
    return enif_make_badarg(env);
  }

  work.global_count = global_count;
  if (global_count > 0) {
    work.global_names = calloc(global_count, sizeof(char *));
    work.global_resources = calloc(global_count, sizeof(rx_object_resource *));
    if (work.global_names == NULL || work.global_resources == NULL) {
      free(work.source);
      free_string_array(work.global_names, global_count);
      free(work.global_resources);
      return make_error(env, "failed to allocate native plot globals");
    }
  }

  ERL_NIF_TERM head;
  ERL_NIF_TERM tail = argv[1];
  for (unsigned i = 0; i < global_count; i++) {
    const ERL_NIF_TERM *tuple;
    int arity;

    if (!enif_get_list_cell(env, tail, &head, &tail) ||
        !enif_get_tuple(env, head, &arity, &tuple) ||
        arity != 2 ||
        !get_global_name(env, tuple[0], &work.global_names[i]) ||
        !enif_get_resource(env, tuple[1], object_resource_type, (void **)&work.global_resources[i])) {
      free(work.source);
      free_string_array(work.global_names, global_count);
      release_resource_array(work.global_resources, global_count);
      return enif_make_badarg(env);
    }

    /* Keep the resource only once the whole entry has been validated. */
    enif_keep_resource(work.global_resources[i]);
  }

  if (!get_required_positive_int(env, argv[2], &work.plot_width) ||
      !get_required_positive_int(env, argv[3], &work.plot_height) ||
      !get_required_positive_int(env, argv[4], &work.plot_res) ||
      !get_required_positive_int(env, argv[5], &work.plot_pointsize) ||
      !get_required_positive_int(env, argv[6], &work.plot_max_pages) ||
      !get_required_positive_int(env, argv[7], &work.plot_max_bytes)) {
    free(work.source);
    free_string_array(work.global_names, work.global_count);
    release_resource_array(work.global_resources, work.global_count);
    return enif_make_badarg(env);
  }

  if (!ensure_owner_thread()) {
    free(work.source);
    free_string_array(work.global_names, work.global_count);
    release_resource_array(work.global_resources, work.global_count);
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  free(work.source);
  free_string_array(work.global_names, work.global_count);
  release_resource_array(work.global_resources, work.global_count);

  ERL_NIF_TERM result;
  if (work.ok) {
    result = make_native_plot_success(env, &work);
  } else if (work.structured_error) {
    result = make_native_eval_error(env, &work);
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else {
    result = make_error(env, work.error);
  }

  /* Capture buffers and pages are released only after the reply is built. */
  free_eval_capture(&work);
  free_plot_pages(&work);
  return result;
}

/*
 * NIF entry point: parses an Erlang value description (argv[0], argv[1]) into
 * an encode_value, allocates a fresh object resource for it, submits a
 * WORK_ENCODE_RESOURCE item and blocks until it is done.
 *
 * On success returns {ok, Resource}. On failure returns an init-failed error
 * or a plain error built from work.error. Returns badarg when argc is not 2
 * or the value description cannot be parsed.
 */
static ERL_NIF_TERM rx_encode(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 2) return enif_make_badarg(env);

  encode_value encoded;
  /* Start from a known all-zero state: the failure path below passes
   * `encoded` to free_encode_value(), so it must not hold indeterminate
   * pointers if parsing bails out before filling it in. */
  memset(&encoded, 0, sizeof(encoded));
  if (!parse_encode_value(env, argv[0], argv[1], &encoded)) {
    free_encode_value(&encoded);
    return enif_make_badarg(env);
  }

  rx_object_resource *resource = enif_alloc_resource(object_resource_type, sizeof(rx_object_resource));
  if (resource == NULL) {
    free_encode_value(&encoded);
    return make_error(env, "failed to allocate native R object resource");
  }

  /* Fully initialise the resource before anything can release it: the
   * destructor locks resource->mutex and inspects sexp / release_enqueued. */
  resource->sexp = NULL;
  resource->id = reserve_object_id();
  resource->kind = encode_kind_to_resource_kind(encoded.kind);
  resource->release_enqueued = false;
  pthread_mutex_init(&resource->mutex, NULL);

  r_work work = {
      .kind = WORK_ENCODE_RESOURCE,
      .done = false,
      .ok = false,
      .encode = encoded,
      .resource = resource,
      .next = NULL};

  if (!ensure_owner_thread()) {
    enif_release_resource(resource);
    free_encode_value(&work.encode);
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);

  if (!work.ok) {
    enif_release_resource(resource);
    free_encode_value(&work.encode);
    if (work.init_return == INIT_RETURN_FAILED) {
      return make_native_init_failed_error(env, &work.init_failure);
    }
    return make_error(env, work.error);
  }

  /* The term now references the resource, so this function's own reference
   * from enif_alloc_resource can be dropped. */
  ERL_NIF_TERM resource_term = enif_make_resource(env, resource);
  enif_release_resource(resource);
  free_encode_value(&work.encode);
  return enif_make_tuple2(env, atom_ok, resource_term);
}

/*
 * NIF entry point: submits a WORK_DECODE_RESOURCE item for the object resource
 * in argv[0] and blocks until it is done.
 *
 * On success the reply comes from make_eval_success. On failure returns an
 * init-failed error or a plain error built from work.error. Returns badarg
 * when argc is not 1 or argv[0] is not an object resource.
 */
static ERL_NIF_TERM rx_decode(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 1) return enif_make_badarg(env);

  rx_object_resource *resource;
  if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&resource)) {
    return enif_make_badarg(env);
  }

  r_work work = {
      .kind = WORK_DECODE_RESOURCE,
      .done = false,
      .ok = false,
      .resource = resource,
      .next = NULL};

  if (!ensure_owner_thread()) {
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* Hold an extra reference on the resource while the work item is pending. */
  enif_keep_resource(resource);
  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  enif_release_resource(resource);

  ERL_NIF_TERM result;
  if (work.ok) {
    result = make_eval_success(env, &work);
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else {
    result = make_error(env, work.error);
  }

  /* Result storage is released only after the reply term has been built. */
  free_result_value(&work);
  return result;
}

/*
 * NIF entry point: copies the binary in argv[0] into work.ipc_bytes, submits a
 * WORK_ENCODE_DATAFRAME item and blocks until it is done.
 *
 * On success returns {ok, Resource} for work.result_resource. On failure
 * returns an init-failed error, a tagged `missing_r_package` error (when
 * work.error starts with "missing_r_package: "), or a plain error. Returns
 * badarg when argc is not 1 or the binary cannot be copied.
 */
static ERL_NIF_TERM rx_encode_dataframe(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 1) return enif_make_badarg(env);

  r_work work = {
      .kind = WORK_ENCODE_DATAFRAME,
      .done = false,
      .ok = false,
      .next = NULL};

  if (!copy_binary_bytes(env, argv[0], &work.ipc_bytes, &work.ipc_len)) {
    return enif_make_badarg(env);
  }

  if (!ensure_owner_thread()) {
    free(work.ipc_bytes);
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  free(work.ipc_bytes);

  if (!work.ok) {
    /* A failed item may still carry a resource; drop it so it is not leaked. */
    if (work.result_resource != NULL) enif_release_resource(work.result_resource);

    if (work.init_return == INIT_RETURN_FAILED) {
      return make_native_init_failed_error(env, &work.init_failure);
    }

    /* 19 is the length of the "missing_r_package: " prefix; the remainder of
     * the message is passed on as the error detail. */
    if (strncmp(work.error, "missing_r_package: ", 19) == 0) {
      return make_tagged_error(env, "missing_r_package", work.error + 19);
    }

    return make_error(env, work.error);
  }

  /* The term now references the resource, so the work item's own reference
   * can be dropped. */
  ERL_NIF_TERM resource_term = enif_make_resource(env, work.result_resource);
  enif_release_resource(work.result_resource);
  return enif_make_tuple2(env, atom_ok, resource_term);
}

/*
 * NIF entry point: parses argv[0] into a dataframe_wire, submits a
 * WORK_ENCODE_DATA_FRAME item with it and blocks until it is done.
 *
 * On success returns {ok, Resource} for work.result_resource. On failure
 * returns an init-failed error or a dataframe error built from work.error.
 * A wire term that cannot be parsed yields the dataframe error
 * "invalid_dataframe: malformed_wire" rather than badarg.
 */
static ERL_NIF_TERM rx_encode_data_frame(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 1) return enif_make_badarg(env);

  dataframe_wire *wire = NULL;
  if (!parse_dataframe_wire(env, argv[0], &wire)) {
    return make_dataframe_error(env, "invalid_dataframe: malformed_wire");
  }

  r_work work = {
      .kind = WORK_ENCODE_DATA_FRAME,
      .done = false,
      .ok = false,
      .input_dataframe = wire,
      .next = NULL};

  if (!ensure_owner_thread()) {
    free_dataframe_wire(wire);
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* `work` lives on this stack frame, so this function must not return until
   * work.done has been observed. */
  pthread_cond_init(&work.done_cond, NULL);
  enqueue_work(&work);

  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  free_dataframe_wire(wire);

  if (!work.ok) {
    /* A failed item may still carry a resource; drop it so it is not leaked. */
    if (work.result_resource != NULL) enif_release_resource(work.result_resource);

    if (work.init_return == INIT_RETURN_FAILED) {
      return make_native_init_failed_error(env, &work.init_failure);
    }

    return make_dataframe_error(env, work.error);
  }

  /* The term now references the resource, so the work item's own reference
   * can be dropped. */
  ERL_NIF_TERM resource_term = enif_make_resource(env, work.result_resource);
  enif_release_resource(work.result_resource);
  return enif_make_tuple2(env, atom_ok, resource_term);
}

/*
 * NIF entry point registered as decode_data_frame/2.
 *
 * argv[0] must be an rx_object_resource term and argv[1] the options accepted
 * by parse_dataframe_opts() (which yields has_max_rows / max_rows). A
 * WORK_DECODE_DATA_FRAME item is handed to the embedded R owner thread and
 * this function blocks until that item is flagged done.
 *
 * Returns:
 *   - {ok, WireTerm} when the work item succeeds and
 *     make_dataframe_wire_term() can build the term;
 *   - the term from make_error() when the wire term cannot be built, or when
 *     the owner thread or the completion condition variable cannot be set up;
 *   - the term from make_native_init_failed_error() when the work item
 *     reports INIT_RETURN_FAILED;
 *   - the term from make_dataframe_error() for any other work failure;
 *   - badarg when argc is not 2 or either argument is rejected.
 *
 * work.output_dataframe is freed here on every path that reaches the end of
 * the function.
 */
static ERL_NIF_TERM rx_decode_data_frame(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  if (argc != 2) return enif_make_badarg(env);

  rx_object_resource *resource;
  if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&resource)) {
    return enif_make_badarg(env);
  }

  bool has_max_rows = false;
  unsigned max_rows = 0;
  if (!parse_dataframe_opts(env, argv[1], &has_max_rows, &max_rows)) {
    return enif_make_badarg(env);
  }

  r_work work = {
      .kind = WORK_DECODE_DATA_FRAME,
      .done = false,
      .ok = false,
      .resource = resource,
      .has_max_rows = has_max_rows,
      .max_rows = max_rows,
      .next = NULL};

  if (!ensure_owner_thread()) {
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* Waiting on a condition variable whose initialization failed is undefined
   * behavior. Checked before enif_keep_resource() so that this early return
   * has nothing to release. */
  if (pthread_cond_init(&work.done_cond, NULL) != 0) {
    return make_error(env, "failed to initialize work condition variable");
  }

  /* Hold an extra reference on the resource for as long as the work item is
   * outstanding. */
  enif_keep_resource(resource);
  enqueue_work(&work);

  /* work.done is re-checked in a loop under queue_mutex so that a wakeup
   * without completion simply goes back to waiting. */
  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  enif_release_resource(resource);

  ERL_NIF_TERM result;
  if (work.ok) {
    ERL_NIF_TERM wire_term;
    result = make_dataframe_wire_term(env, work.output_dataframe, &wire_term)
                 ? enif_make_tuple2(env, atom_ok, wire_term)
                 : make_error(env, "failed to allocate dataframe wire result");
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else {
    result = make_dataframe_error(env, work.error);
  }

  free_dataframe_wire(work.output_dataframe);
  return result;
}

/*
 * NIF entry point registered as decode_arrow/1.
 *
 * argv[0] must be an rx_object_resource term. A WORK_DECODE_ARROW item is
 * handed to the embedded R owner thread and this function blocks until that
 * item is flagged done.
 *
 * Returns:
 *   - {ok, Binary} built from work.output_bytes / work.output_len when the
 *     work item succeeds and make_binary_term() reports success;
 *   - the term from make_error() when the binary cannot be built, when the
 *     owner thread or the completion condition variable cannot be set up, or
 *     for an unclassified work failure;
 *   - the term from make_native_init_failed_error() when the work item
 *     reports INIT_RETURN_FAILED;
 *   - a make_tagged_error() term tagged "missing_r_package" or
 *     "not_dataframe" when work.error starts with the matching prefix (the
 *     prefix is stripped from the message);
 *   - badarg when argc is not 1 or argv[0] is not the expected resource.
 *
 * work.output_bytes is freed here on every path that reaches the end of the
 * function.
 */
static ERL_NIF_TERM rx_decode_arrow(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  /* Error-message prefixes used to classify failures; the lengths are derived
   * from the literals so they cannot drift apart. */
  static const char missing_package_prefix[] = "missing_r_package: ";
  static const char not_dataframe_prefix[] = "not_dataframe: ";
  const size_t missing_package_len = sizeof missing_package_prefix - 1;
  const size_t not_dataframe_len = sizeof not_dataframe_prefix - 1;

  if (argc != 1) return enif_make_badarg(env);

  rx_object_resource *resource;
  if (!enif_get_resource(env, argv[0], object_resource_type, (void **)&resource)) {
    return enif_make_badarg(env);
  }

  r_work work = {
      .kind = WORK_DECODE_ARROW,
      .done = false,
      .ok = false,
      .resource = resource,
      .next = NULL};

  if (!ensure_owner_thread()) {
    return make_error(env, "failed to start embedded R owner thread");
  }

  /* Waiting on a condition variable whose initialization failed is undefined
   * behavior. Checked before enif_keep_resource() so that this early return
   * has nothing to release. */
  if (pthread_cond_init(&work.done_cond, NULL) != 0) {
    return make_error(env, "failed to initialize work condition variable");
  }

  /* Hold an extra reference on the resource for as long as the work item is
   * outstanding. */
  enif_keep_resource(resource);
  enqueue_work(&work);

  /* work.done is re-checked in a loop under queue_mutex so that a wakeup
   * without completion simply goes back to waiting. */
  pthread_mutex_lock(&queue_mutex);
  while (!work.done) pthread_cond_wait(&work.done_cond, &queue_mutex);
  pthread_mutex_unlock(&queue_mutex);

  pthread_cond_destroy(&work.done_cond);
  enif_release_resource(resource);

  ERL_NIF_TERM result;
  if (work.ok) {
    bool ok = false;
    ERL_NIF_TERM binary = make_binary_term(env, work.output_bytes, work.output_len, &ok);
    result = ok ? enif_make_tuple2(env, atom_ok, binary) : make_error(env, "failed to allocate Arrow IPC binary");
  } else if (work.init_return == INIT_RETURN_FAILED) {
    result = make_native_init_failed_error(env, &work.init_failure);
  } else if (strncmp(work.error, missing_package_prefix, missing_package_len) == 0) {
    result = make_tagged_error(env, "missing_r_package", work.error + missing_package_len);
  } else if (strncmp(work.error, not_dataframe_prefix, not_dataframe_len) == 0) {
    result = make_tagged_error(env, "not_dataframe", work.error + not_dataframe_len);
  } else {
    result = make_error(env, work.error);
  }

  free(work.output_bytes);
  return result;
}

/*
 * NIF entry point registered as owner_thread_id/0.
 *
 * Returns the value read from owner_thread_id_value via
 * read_diagnostic_uint64() as an unsigned 64-bit integer term, or badarg when
 * called with any arguments.
 */
static ERL_NIF_TERM rx_owner_thread_id(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  (void)argv; /* no arguments are consumed */

  if (argc == 0) {
    return enif_make_uint64(env, read_diagnostic_uint64(&owner_thread_id_value));
  }

  return enif_make_badarg(env);
}

/*
 * NIF entry point registered as release_count/0.
 *
 * Returns the value read from release_count via read_diagnostic_uint64() as
 * an unsigned 64-bit integer term, or badarg when called with any arguments.
 */
static ERL_NIF_TERM rx_release_count(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  (void)argv; /* no arguments are consumed */

  if (argc == 0) {
    return enif_make_uint64(env, read_diagnostic_uint64(&release_count));
  }

  return enif_make_badarg(env);
}

/*
 * NIF entry point registered as last_release_thread_id/0.
 *
 * Returns the value read from last_release_thread_id_value via
 * read_diagnostic_uint64() as an unsigned 64-bit integer term, or badarg when
 * called with any arguments.
 */
static ERL_NIF_TERM rx_last_release_thread_id(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
  (void)argv; /* no arguments are consumed */

  if (argc == 0) {
    return enif_make_uint64(env, read_diagnostic_uint64(&last_release_thread_id_value));
  }

  return enif_make_badarg(env);
}

/*
 * NIF library load callback.
 *
 * Opens (or takes over) the "rx_object_resource" resource type with
 * rx_object_resource_dtor as its destructor and, once that succeeds, creates
 * the shared `ok` / `error` atoms.
 *
 * Returns 0 on success and 1 when the resource type cannot be opened.
 * priv_data and load_info are not used.
 */
static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
  (void)load_info;
  (void)priv_data;

  object_resource_type = enif_open_resource_type(env, NULL, "rx_object_resource",
                                                 rx_object_resource_dtor,
                                                 ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER, NULL);

  if (object_resource_type != NULL) {
    atom_ok = enif_make_atom(env, "ok");
    atom_error = enif_make_atom(env, "error");
    return 0;
  }

  return 1;
}

/*
 * NIF function table. Each entry is {name, arity, C function, flags}.
 *
 * Entries that submit work to the embedded R owner thread are flagged as
 * dirty jobs (CPU- or IO-bound); the diagnostic readers at the bottom use
 * flags of 0, i.e. no dirty-scheduler flag.
 *
 * Note that "encode_dataframe" and "encode_data_frame" are two separate
 * entries bound to different C functions.
 */
static ErlNifFunc funcs[] = {
    {"init", 3, rx_init, ERL_NIF_DIRTY_JOB_IO_BOUND},
    {"eval_string", 1, rx_eval_string, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"eval", 2, rx_eval, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"print", 3, rx_print, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"plot", 8, rx_plot, ERL_NIF_DIRTY_JOB_IO_BOUND},
    {"encode", 2, rx_encode, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"decode", 1, rx_decode, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"encode_dataframe", 1, rx_encode_dataframe, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"encode_data_frame", 1, rx_encode_data_frame, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"decode_data_frame", 2, rx_decode_data_frame, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"decode_arrow", 1, rx_decode_arrow, ERL_NIF_DIRTY_JOB_CPU_BOUND},
    {"diagnostics", 0, rx_diagnostics, 0},
    {"owner_thread_id", 0, rx_owner_thread_id, 0},
    {"release_count", 0, rx_release_count, 0},
    {"last_release_thread_id", 0, rx_last_release_thread_id, 0},
};

/*
 * Registers the table above for module Elixir.Rx.Native with `load` as the
 * load callback. The three remaining callback arguments are NULL (per the
 * erl_nif documentation these are the legacy reload slot, upgrade and unload).
 */
ERL_NIF_INIT(Elixir.Rx.Native, funcs, load, NULL, NULL, NULL)