use crate::r_api::RApi;
use crate::work::{WorkItem, WorkKind, WorkResult};
use std::collections::VecDeque;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, OnceLock};
/// Most recent release record, kept behind the `LAST_RELEASE` mutex and
/// overwritten by every call to `record_last_release`.
struct LastRelease {
/// Resource id of the latest record; `None` until the first record is written.
resource_id: Option<u64>,
/// Message stored with the latest record; `None` when it was written without one.
error: Option<String>,
}
// Hashed id (see `stable_thread_id`) of the thread registered as owner; `0` until
// `owner_loop` or `mark_owner_thread_for_bootstrap` stores a value.
static OWNER_THREAD_ID: AtomicU64 = AtomicU64::new(0);
// Next id handed out by `reserve_resource_id`; ids start at 1.
static NEXT_RESOURCE_ID: AtomicU64 = AtomicU64::new(1);
// Number of releases queued through `submit_release`.
static RELEASE_ENQUEUED_COUNT: AtomicU64 = AtomicU64::new(0);
// Number of releases recorded by `record_release_success`.
static RELEASE_SUCCESS_COUNT: AtomicU64 = AtomicU64::new(0);
// Number of releases recorded by `record_release_failure`.
static RELEASE_FAILURE_COUNT: AtomicU64 = AtomicU64::new(0);
// Number of releases recorded by `record_release_skipped`.
static RELEASE_SKIPPED_UNINITIALIZED_COUNT: AtomicU64 = AtomicU64::new(0);
// Hashed id of the thread that last recorded a release success, failure or skip;
// `0` until the first such record. Enqueueing does not update it.
static LAST_RELEASE_THREAD_ID: AtomicU64 = AtomicU64::new(0);
// Latest release record; written by `record_last_release`, read by
// `release_diagnostics_snapshot`.
static LAST_RELEASE: Mutex<LastRelease> = Mutex::new(LastRelease {
resource_id: None,
error: None,
});
// Process-wide owner runtime; initialized on first use via `get_or_init(start_owner)`.
static OWNER: OnceLock<Arc<OwnerRuntime>> = OnceLock::new();
/// Queue shared between submitting threads and the owner thread.
pub struct OwnerRuntime {
/// Pending work items in FIFO order: producers `push_back`, `owner_loop` uses `pop_front`.
queue: Mutex<VecDeque<WorkItem>>,
/// Notified after each push so that `owner_loop` wakes up when the queue was empty.
condvar: Condvar,
}
/// Mutable state created by `owner_loop` and handed to every work handler.
///
/// Apart from the initial values set in `owner_loop`, the fields are updated by
/// handlers outside this file (presumably `crate::eval::do_init` — TODO confirm).
pub(crate) struct OwnerState {
/// R API table; `None` in the initial state. The release path calls its
/// `release_object` entry when present.
pub(crate) r: Option<RApi>,
/// Current initialization state; starts as `InitState::Uninitialized`.
pub(crate) init_state: crate::work::InitState,
/// Reported verbatim in diagnostics; presumably the configuration of the
/// successful init — TODO confirm.
pub(crate) init_config: Option<crate::work::InitConfig>,
/// Reported verbatim in diagnostics; presumably the configuration of the most
/// recent init attempt — TODO confirm.
pub(crate) attempted_init_config: Option<crate::work::InitConfig>,
/// Last init failure, if any; consulted by `terminal_init_failure`.
pub(crate) last_init_error: Option<crate::work::InitFailure>,
/// Starts at 0; presumably counts init attempts — TODO confirm.
pub(crate) init_attempt_count: u64,
/// Starts at 0; presumably counts init requests whose configuration did not
/// match the active one — TODO confirm.
pub(crate) init_mismatch_count: u64,
}
impl OwnerState {
    /// Reports whether `init_state` is `InitState::Initialized`.
    pub(crate) fn is_initialized(&self) -> bool {
        matches!(self.init_state, crate::work::InitState::Initialized)
    }

    /// Returns a clone of the stored init failure when it is terminal.
    ///
    /// A failure is terminal when `init_state` is `InitState::Failed` and the
    /// stored `last_init_error` is not flagged `retryable`. Every other
    /// combination yields `None`.
    pub(crate) fn terminal_init_failure(&self) -> Option<crate::work::InitFailure> {
        if !matches!(self.init_state, crate::work::InitState::Failed) {
            return None;
        }
        self.last_init_error
            .as_ref()
            .filter(|failure| !failure.retryable)
            .cloned()
    }
}
/// Returns the hashed thread id currently stored in `OWNER_THREAD_ID`.
///
/// The value is `0` until `owner_loop` or `mark_owner_thread_for_bootstrap`
/// has stored an id.
pub fn owner_thread_id() -> u64 {
OWNER_THREAD_ID.load(Ordering::SeqCst)
}
/// Returns the number of releases recorded as successful.
///
/// Only `RELEASE_SUCCESS_COUNT` is read; enqueued, failed and skipped releases
/// are tracked by separate counters.
pub fn release_count() -> u64 {
RELEASE_SUCCESS_COUNT.load(Ordering::SeqCst)
}
/// Returns the hashed id of the thread that last recorded a release success,
/// failure or skip, or `0` when no such record exists yet.
pub fn last_release_thread_id() -> u64 {
LAST_RELEASE_THREAD_ID.load(Ordering::SeqCst)
}
/// Hands out the next resource id.
///
/// Returns the counter's value before the increment, so the first call
/// returns `1` and each later call returns the next integer.
pub fn reserve_resource_id() -> u64 {
NEXT_RESOURCE_ID.fetch_add(1, Ordering::SeqCst)
}
/// Records that releasing `resource_id` failed.
///
/// Increments the failure counter, stores the calling thread's hashed id as
/// the last release thread, and replaces the last-release record with
/// `resource_id` and `message`.
pub fn record_release_failure(resource_id: u64, message: impl Into<String>) {
    RELEASE_FAILURE_COUNT.fetch_add(1, Ordering::SeqCst);

    let recording_thread = stable_thread_id();
    LAST_RELEASE_THREAD_ID.store(recording_thread, Ordering::SeqCst);

    let reason: String = message.into();
    record_last_release(resource_id, Some(reason));
}
/// Records that the release of `resource_id` was skipped.
///
/// Increments the skipped counter, stores the calling thread's hashed id as
/// the last release thread, and replaces the last-release record with
/// `resource_id` and `message`.
pub fn record_release_skipped(resource_id: u64, message: impl Into<String>) {
    RELEASE_SKIPPED_UNINITIALIZED_COUNT.fetch_add(1, Ordering::SeqCst);

    let recording_thread = stable_thread_id();
    LAST_RELEASE_THREAD_ID.store(recording_thread, Ordering::SeqCst);

    let reason: String = message.into();
    record_last_release(resource_id, Some(reason));
}
/// Registers the calling thread as the owner thread by storing its hashed id
/// in `OWNER_THREAD_ID`.
// No caller is visible in this file, which is presumably why the dead-code
// lint is silenced — TODO confirm where this is used.
#[allow(dead_code)]
pub fn mark_owner_thread_for_bootstrap() {
    let caller = stable_thread_id();
    OWNER_THREAD_ID.store(caller, Ordering::SeqCst);
}
pub fn submit(kind: WorkKind) -> WorkResult {
let runtime = OWNER.get_or_init(start_owner).clone();
let (item, state) = WorkItem::new(kind);
{
let mut queue = runtime.queue.lock().expect("owner queue mutex poisoned");
queue.push_back(item);
runtime.condvar.notify_one();
}
let (lock, condvar) = &*state;
let mut result = lock.lock().expect("work result mutex poisoned");
while result.is_none() {
result = condvar.wait(result).expect("work result mutex poisoned");
}
result
.take()
.expect("work result missing after notification")
}
/// Creates the shared runtime and spawns the owner thread that drains its queue.
///
/// The thread is named `rx-rust-nif-r-owner` and gets a 64 MiB stack. Its join
/// handle is dropped, so the thread runs detached.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
fn start_owner() -> Arc<OwnerRuntime> {
    const OWNER_STACK_BYTES: usize = 64 * 1024 * 1024;

    let shared = Arc::new(OwnerRuntime {
        queue: Mutex::new(VecDeque::new()),
        condvar: Condvar::new(),
    });
    let for_thread = Arc::clone(&shared);

    let builder = std::thread::Builder::new()
        .name("rx-rust-nif-r-owner".to_owned())
        .stack_size(OWNER_STACK_BYTES);
    builder
        .spawn(move || owner_loop(for_thread))
        .expect("failed to start embedded R owner thread");

    shared
}
/// Body of the owner thread: registers the thread id, then processes queued
/// work items one at a time, forever.
///
/// Each item is dispatched inside `catch_unwind`; a panic in a handler is
/// turned into a `WorkResult::Error` so the loop keeps running.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
fn owner_loop(runtime: Arc<OwnerRuntime>) {
    OWNER_THREAD_ID.store(stable_thread_id(), Ordering::SeqCst);

    let mut state = OwnerState {
        r: None,
        init_state: crate::work::InitState::Uninitialized,
        init_config: None,
        attempted_init_config: None,
        last_init_error: None,
        init_attempt_count: 0,
        init_mismatch_count: 0,
    };

    loop {
        // Hold the queue lock only while fetching the next item; the guard is
        // dropped at the end of this block, before the item is dispatched.
        let item = {
            let guard = runtime.queue.lock().expect("owner queue mutex poisoned");
            let mut pending = runtime
                .condvar
                .wait_while(guard, |pending| pending.is_empty())
                .expect("owner queue mutex poisoned");
            pending.pop_front().expect("owner queue item missing")
        };

        // `state` may be left partially updated by a handler that panicked;
        // `AssertUnwindSafe` accepts that in exchange for keeping the thread alive.
        let result = match catch_unwind(AssertUnwindSafe(|| dispatch(&mut state, &item.kind))) {
            Ok(result) => result,
            Err(_payload) => {
                WorkResult::Error("Rust native owner thread panic was contained".to_owned())
            }
        };
        item.finish(result);
    }
}
/// Routes one work item to its handler and returns the handler's result.
///
/// Every kind except `Diagnostics` and `Release` is forwarded to a handler in
/// a sibling module. `Diagnostics` is answered from `state` and the release
/// counters; `Release` is handled by `release_native_resource`.
fn dispatch(state: &mut OwnerState, kind: &WorkKind) -> WorkResult {
    match kind {
        WorkKind::Init(request) => crate::eval::do_init(state, request),
        WorkKind::Diagnostics => WorkResult::NativeDiagnostics(diagnostics_snapshot(state)),
        WorkKind::EvalString(request) => crate::eval::do_eval_string(state, request),
        WorkKind::Eval(request) => crate::eval::do_eval(state, request),
        WorkKind::Print(request) => crate::print::do_print(state, request),
        WorkKind::Plot(request) => crate::plot::do_plot(state, request),
        WorkKind::EncodeDataframe(request) => crate::arrow::do_encode_dataframe(state, request),
        WorkKind::EncodeDataFrame(request) => {
            crate::data_frame::do_encode_data_frame(state, request)
        }
        WorkKind::DecodeDataFrame(request) => {
            crate::data_frame::do_decode_data_frame(state, request)
        }
        WorkKind::DecodeArrow(request) => crate::arrow::do_decode_arrow(state, request),
        WorkKind::EncodeResource(request) => crate::codec::do_encode_resource(state, request),
        WorkKind::DecodeResource(request) => crate::codec::do_decode_resource(state, request),
        WorkKind::Release { sexp, resource_id } => {
            release_native_resource(state, *sexp, *resource_id)
        }
    }
}

/// Releases the R object at address `sexp` and records the outcome for `resource_id`.
///
/// Always returns `WorkResult::Ok`; the outcome is visible only through the
/// release counters and the last-release record. The checks run in this order:
/// runtime not initialized (skip), injected skip fault, injected failure
/// fault, then the real release through the R API.
fn release_native_resource(state: &OwnerState, sexp: usize, resource_id: u64) -> WorkResult {
    if !state.is_initialized() {
        record_release_skipped(
            resource_id,
            "embedded R runtime is not initialized during release",
        );
    } else if release_fault_enabled("skip_uninitialized") {
        // Fault injection: counted as a skip even though the runtime is initialized.
        record_release_skipped(
            resource_id,
            "fault injected release skip before R_ReleaseObject",
        );
    } else if release_fault_enabled("1") {
        // Fault injection: the object is deliberately not released.
        record_release_failure(
            resource_id,
            "fault injected during native resource release",
        );
    } else if let Some(api) = state.r.as_ref() {
        // SAFETY (assumed — TODO confirm against the code that creates resources):
        // `sexp` is the address of an R object that is still preserved and has
        // not been released before, and this runs on the thread that owns R
        // (in this file `dispatch` is only called from `owner_loop`).
        unsafe { (api.release_object)(sexp as crate::r_api::Sexp) };
        record_release_success(resource_id);
    } else {
        record_release_failure(
            resource_id,
            "embedded R API is not initialized during release",
        );
    }
    WorkResult::Ok
}
/// Builds a diagnostics report from the owner's `state`, the registered owner
/// thread id and the current release counters.
fn diagnostics_snapshot(state: &OwnerState) -> crate::work::NativeDiagnostics {
    let release = release_diagnostics_snapshot();

    let init_config = state.init_config.clone();
    let attempted_init_config = state.attempted_init_config.clone();
    let last_init_error = state.last_init_error.clone();
    let owner_thread_id = owner_thread_id();

    crate::work::NativeDiagnostics {
        init_state: state.init_state,
        init_config,
        attempted_init_config,
        last_init_error,
        init_attempt_count: state.init_attempt_count,
        init_mismatch_count: state.init_mismatch_count,
        owner_thread_id,
        release_enqueued_count: release.enqueued,
        release_success_count: release.success,
        release_failure_count: release.failure,
        release_skipped_uninitialized_count: release.skipped_uninitialized,
        last_release_thread_id: release.last_thread_id,
        last_release_resource_id: release.last_resource_id,
        last_release_error: release.last_error,
    }
}
/// Returns the current native diagnostics.
///
/// When the owner thread has been started, the report is produced on that
/// thread through `submit`. Otherwise it is assembled locally with an
/// uninitialized init state and the current release counters, so that asking
/// for diagnostics never starts the owner thread.
///
/// # Errors
///
/// Returns an error message if the owner thread answers with anything other
/// than a diagnostics result.
pub fn diagnostics() -> Result<crate::work::NativeDiagnostics, String> {
    if OWNER.get().is_some() {
        return match submit(crate::work::WorkKind::Diagnostics) {
            crate::work::WorkResult::NativeDiagnostics(diagnostics) => Ok(diagnostics),
            other => Err(format!("unexpected diagnostics result: {other:?}")),
        };
    }

    // Owner thread never started: report the pristine init state.
    let release = release_diagnostics_snapshot();
    let owner_thread_id = owner_thread_id();
    let report = crate::work::NativeDiagnostics {
        init_state: crate::work::InitState::Uninitialized,
        init_config: None,
        attempted_init_config: None,
        last_init_error: None,
        init_attempt_count: 0,
        init_mismatch_count: 0,
        owner_thread_id,
        release_enqueued_count: release.enqueued,
        release_success_count: release.success,
        release_failure_count: release.failure,
        release_skipped_uninitialized_count: release.skipped_uninitialized,
        last_release_thread_id: release.last_thread_id,
        last_release_resource_id: release.last_resource_id,
        last_release_error: release.last_error,
    };
    Ok(report)
}
pub fn submit_release(sexp: usize, resource_id: u64) {
let runtime = OWNER.get_or_init(start_owner).clone();
let (item, _state) = WorkItem::new(WorkKind::Release { sexp, resource_id });
record_release_enqueued(resource_id);
let mut queue = runtime.queue.lock().expect("owner queue mutex poisoned");
queue.push_back(item);
runtime.condvar.notify_one();
}
/// Copy of the release counters and the last-release record, as produced by
/// `release_diagnostics_snapshot`.
struct ReleaseDiagnostics {
/// Value of `RELEASE_ENQUEUED_COUNT`.
enqueued: u64,
/// Value of `RELEASE_SUCCESS_COUNT`.
success: u64,
/// Value of `RELEASE_FAILURE_COUNT`.
failure: u64,
/// Value of `RELEASE_SKIPPED_UNINITIALIZED_COUNT`.
skipped_uninitialized: u64,
/// Value of `LAST_RELEASE_THREAD_ID`.
last_thread_id: u64,
/// Resource id from the last-release record.
last_resource_id: Option<u64>,
/// Message from the last-release record.
last_error: Option<String>,
}
/// Reads all release counters and the last-release record into one value.
///
/// The last-release mutex is held while the counters are read. The counters
/// are independent atomics, so the values are not guaranteed to form a single
/// consistent point in time.
///
/// # Panics
///
/// Panics if the last-release mutex is poisoned.
fn release_diagnostics_snapshot() -> ReleaseDiagnostics {
    let record = LAST_RELEASE
        .lock()
        .expect("last release diagnostics mutex poisoned");

    let enqueued = RELEASE_ENQUEUED_COUNT.load(Ordering::SeqCst);
    let success = RELEASE_SUCCESS_COUNT.load(Ordering::SeqCst);
    let failure = RELEASE_FAILURE_COUNT.load(Ordering::SeqCst);
    let skipped_uninitialized = RELEASE_SKIPPED_UNINITIALIZED_COUNT.load(Ordering::SeqCst);
    let last_thread_id = LAST_RELEASE_THREAD_ID.load(Ordering::SeqCst);

    ReleaseDiagnostics {
        enqueued,
        success,
        failure,
        skipped_uninitialized,
        last_thread_id,
        last_resource_id: record.resource_id,
        last_error: record.error.clone(),
    }
}
/// Counts a queued release and makes `resource_id` the last-release record.
///
/// The record is written without a message, so a message stored by an earlier
/// failure or skip is cleared. The last release thread id is left untouched.
fn record_release_enqueued(resource_id: u64) {
RELEASE_ENQUEUED_COUNT.fetch_add(1, Ordering::SeqCst);
record_last_release(resource_id, None);
}
/// Records that `resource_id` was released successfully.
///
/// Increments the success counter, stores the calling thread's hashed id as
/// the last release thread, and replaces the last-release record with
/// `resource_id` and no message.
fn record_release_success(resource_id: u64) {
    RELEASE_SUCCESS_COUNT.fetch_add(1, Ordering::SeqCst);

    let recording_thread = stable_thread_id();
    LAST_RELEASE_THREAD_ID.store(recording_thread, Ordering::SeqCst);

    record_last_release(resource_id, None);
}
/// Overwrites the last-release record with `resource_id` and `error`.
///
/// # Panics
///
/// Panics if the last-release mutex is poisoned.
fn record_last_release(resource_id: u64, error: Option<String>) {
    let mut guard = LAST_RELEASE
        .lock()
        .expect("last release diagnostics mutex poisoned");
    let record = &mut *guard;
    record.error = error;
    record.resource_id = Some(resource_id);
}
/// Reports whether release fault injection is switched on for `mode`.
///
/// True only when the environment variable `RX_CRASH_REPRO` is exactly `1`
/// and `RX_NATIVE_FAULT_RELEASE` is exactly `mode`. Unset or non-Unicode
/// values count as "not enabled". The second variable is read only when the
/// first one matches.
fn release_fault_enabled(mode: &str) -> bool {
    let var_equals = |name: &str, expected: &str| {
        matches!(std::env::var(name), Ok(value) if value == expected)
    };
    var_equals("RX_CRASH_REPRO", "1") && var_equals("RX_NATIVE_FAULT_RELEASE", mode)
}
/// Returns a `u64` derived from the current thread's `ThreadId`.
///
/// The id is hashed with a freshly created `DefaultHasher`, so repeated calls
/// on the same thread within one process return the same value.
fn stable_thread_id() -> u64 {
    use std::hash::{Hash, Hasher};

    let current = std::thread::current().id();
    let mut digest = std::collections::hash_map::DefaultHasher::default();
    current.hash(&mut digest);
    digest.finish()
}