Skip to main content

native/rx_rust_nif/src/owner.rs

use crate::r_api::RApi;
use crate::work::{WorkItem, WorkKind, WorkResult};
use std::collections::VecDeque;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex, OnceLock};

/// Bookkeeping for the most recently recorded release event.
///
/// The single instance lives in the `LAST_RELEASE` static and is overwritten
/// as a whole by `record_last_release` on every enqueue, success, failure, or
/// skip.
struct LastRelease {
    /// Resource id passed to the latest `record_last_release` call; `None`
    /// until the first call.
    resource_id: Option<u64>,
    /// Message attached to the latest recorded event; `None` when that event
    /// carried no message (enqueue and success record `None`).
    error: Option<String>,
}

/// Hashed id (see `stable_thread_id`) of the owner thread; stays 0 until
/// `owner_loop` or `mark_owner_thread_for_bootstrap` stores a value.
static OWNER_THREAD_ID: AtomicU64 = AtomicU64::new(0);
/// Next id handed out by `reserve_resource_id`; the first id is 1.
static NEXT_RESOURCE_ID: AtomicU64 = AtomicU64::new(1);
/// Number of releases enqueued through `submit_release`.
static RELEASE_ENQUEUED_COUNT: AtomicU64 = AtomicU64::new(0);
/// Number of releases recorded through `record_release_success`.
static RELEASE_SUCCESS_COUNT: AtomicU64 = AtomicU64::new(0);
/// Number of releases recorded through `record_release_failure`.
static RELEASE_FAILURE_COUNT: AtomicU64 = AtomicU64::new(0);
/// Number of releases recorded through `record_release_skipped`.
static RELEASE_SKIPPED_UNINITIALIZED_COUNT: AtomicU64 = AtomicU64::new(0);
/// Hashed id of the thread that most recently recorded a release success,
/// failure, or skip; 0 until one of those is recorded.
static LAST_RELEASE_THREAD_ID: AtomicU64 = AtomicU64::new(0);
/// Resource id and message of the most recently recorded release event.
static LAST_RELEASE: Mutex<LastRelease> = Mutex::new(LastRelease {
    resource_id: None,
    error: None,
});
/// Lazily created owner runtime; initialized by the first `submit` or
/// `submit_release` call, which also spawns the owner thread.
static OWNER: OnceLock<Arc<OwnerRuntime>> = OnceLock::new();

/// Shared hand-off point between submitting threads and the owner thread.
///
/// Submitters push work items onto `queue` and signal `condvar`; the owner
/// thread waits on `condvar` while the queue is empty and pops from the front.
pub struct OwnerRuntime {
    /// FIFO of pending work items (pushed at the back, popped at the front).
    queue: Mutex<VecDeque<WorkItem>>,
    /// Signalled once per pushed item to wake the owner thread.
    condvar: Condvar,
}

/// State created by `owner_loop` on the owner thread and passed mutably to
/// every dispatched work item.
///
/// All fields start out empty/zero (`None`, `InitState::Uninitialized`, 0).
/// The initialization fields are copied into `NativeDiagnostics` by
/// `diagnostics_snapshot`.
pub(crate) struct OwnerState {
    /// Embedded R API table; `dispatch` calls its `release_object` entry when
    /// releasing a resource. NOTE(review): presumably populated by
    /// `crate::eval::do_init` — confirm.
    pub(crate) r: Option<RApi>,
    /// Current initialization state; compared against
    /// `InitState::Initialized` and `InitState::Failed` in this file.
    pub(crate) init_state: crate::work::InitState,
    /// Reported in diagnostics. NOTE(review): presumably the configuration of
    /// the successful initialization — confirm against the init handler.
    pub(crate) init_config: Option<crate::work::InitConfig>,
    /// Reported in diagnostics. NOTE(review): presumably the configuration of
    /// the latest initialization attempt — confirm against the init handler.
    pub(crate) attempted_init_config: Option<crate::work::InitConfig>,
    /// Latest initialization failure, if any; its `retryable` flag is
    /// consulted by `terminal_init_failure`.
    pub(crate) last_init_error: Option<crate::work::InitFailure>,
    /// Reported in diagnostics; not modified in this file.
    pub(crate) init_attempt_count: u64,
    /// Reported in diagnostics; not modified in this file.
    pub(crate) init_mismatch_count: u64,
}

impl OwnerState {
    /// Reports whether `init_state` is `InitState::Initialized`.
    pub(crate) fn is_initialized(&self) -> bool {
        matches!(self.init_state, crate::work::InitState::Initialized)
    }

    /// Returns a clone of the last initialization failure when it is terminal.
    ///
    /// A failure is terminal when `init_state` is `InitState::Failed` and the
    /// recorded failure is not marked `retryable`. Every other combination
    /// (not failed, no recorded failure, or a retryable failure) yields `None`.
    pub(crate) fn terminal_init_failure(&self) -> Option<crate::work::InitFailure> {
        if !matches!(self.init_state, crate::work::InitState::Failed) {
            return None;
        }

        self.last_init_error
            .as_ref()
            .filter(|failure| !failure.retryable)
            .cloned()
    }
}

/// Returns the hashed thread id most recently stored in `OWNER_THREAD_ID`,
/// or 0 if nothing has been stored yet.
pub fn owner_thread_id() -> u64 {
    OWNER_THREAD_ID.load(Ordering::SeqCst)
}

/// Returns the number of releases recorded as successful.
///
/// Reads `RELEASE_SUCCESS_COUNT` only; failed and skipped releases are
/// counted separately and are not included.
pub fn release_count() -> u64 {
    RELEASE_SUCCESS_COUNT.load(Ordering::SeqCst)
}

/// Returns the hashed id of the thread that most recently recorded a release
/// success, failure, or skip, or 0 if none has been recorded.
pub fn last_release_thread_id() -> u64 {
    LAST_RELEASE_THREAD_ID.load(Ordering::SeqCst)
}

/// Hands out the next resource id.
///
/// Returns the current value of `NEXT_RESOURCE_ID` and atomically advances it
/// by one, so the first call returns 1 and concurrent callers never receive
/// the same id.
pub fn reserve_resource_id() -> u64 {
    NEXT_RESOURCE_ID.fetch_add(1, Ordering::SeqCst)
}

/// Records a failed release of `resource_id`.
///
/// Increments the failure counter, stamps the calling thread as the last
/// release thread, and stores `message` as the last release error.
pub fn record_release_failure(resource_id: u64, message: impl Into<String>) {
    RELEASE_FAILURE_COUNT.fetch_add(1, Ordering::SeqCst);

    let recording_thread = stable_thread_id();
    LAST_RELEASE_THREAD_ID.store(recording_thread, Ordering::SeqCst);

    let reason: String = message.into();
    record_last_release(resource_id, Some(reason));
}

/// Records a release of `resource_id` that was skipped rather than performed.
///
/// Increments the skipped counter, stamps the calling thread as the last
/// release thread, and stores `message` as the last release error.
pub fn record_release_skipped(resource_id: u64, message: impl Into<String>) {
    RELEASE_SKIPPED_UNINITIALIZED_COUNT.fetch_add(1, Ordering::SeqCst);

    let recording_thread = stable_thread_id();
    LAST_RELEASE_THREAD_ID.store(recording_thread, Ordering::SeqCst);

    let reason: String = message.into();
    record_last_release(resource_id, Some(reason));
}

/// Stores the calling thread's hashed id as the owner thread id.
///
/// `owner_loop` overwrites this value when the owner thread starts.
// NOTE(review): no caller is visible in this file; the lint is presumably
// silenced because the function is only used from some build configurations
// — confirm.
#[allow(dead_code)]
pub fn mark_owner_thread_for_bootstrap() {
    let caller = stable_thread_id();
    OWNER_THREAD_ID.store(caller, Ordering::SeqCst);
}

/// Runs `kind` on the owner thread and blocks until its result is available.
///
/// Starts the owner thread on first use, enqueues a work item, wakes the
/// owner, and then waits on the item's completion slot until a result has
/// been stored there.
///
/// # Panics
///
/// Panics if the queue mutex or the completion mutex is poisoned, or if the
/// completion slot is empty after the wait ends.
pub fn submit(kind: WorkKind) -> WorkResult {
    let runtime = OWNER.get_or_init(start_owner);
    let (item, completion) = WorkItem::new(kind);

    {
        // Notify while still holding the queue lock, so the push and the
        // wake-up are published together.
        let mut pending = runtime.queue.lock().expect("owner queue mutex poisoned");
        pending.push_back(item);
        runtime.condvar.notify_one();
    }

    let (slot, ready) = &*completion;
    let guard = slot.lock().expect("work result mutex poisoned");

    // `wait_while` re-checks the predicate after every wake-up, so spurious
    // wake-ups are absorbed.
    let mut finished = ready
        .wait_while(guard, |outcome| outcome.is_none())
        .expect("work result mutex poisoned");

    finished
        .take()
        .expect("work result missing after notification")
}

/// Creates the shared runtime and spawns the owner thread that serves it.
///
/// The thread is named `rx-rust-nif-r-owner`, gets a 64 MiB stack, and runs
/// `owner_loop` with its own handle to the runtime. The join handle is
/// dropped, so the thread is never joined.
///
/// # Panics
///
/// Panics if the operating system refuses to spawn the thread.
fn start_owner() -> Arc<OwnerRuntime> {
    const OWNER_STACK_BYTES: usize = 64 * 1024 * 1024;

    let shared = Arc::new(OwnerRuntime {
        queue: Mutex::new(VecDeque::new()),
        condvar: Condvar::new(),
    });
    let for_thread = Arc::clone(&shared);

    let builder = std::thread::Builder::new()
        .name(String::from("rx-rust-nif-r-owner"))
        .stack_size(OWNER_STACK_BYTES);

    builder
        .spawn(move || owner_loop(for_thread))
        .expect("failed to start embedded R owner thread");

    shared
}

fn owner_loop(runtime: Arc<OwnerRuntime>) {
    OWNER_THREAD_ID.store(stable_thread_id(), Ordering::SeqCst);
    let mut state = OwnerState {
        r: None,
        init_state: crate::work::InitState::Uninitialized,
        init_config: None,
        attempted_init_config: None,
        last_init_error: None,
        init_attempt_count: 0,
        init_mismatch_count: 0,
    };

    loop {
        let item = {
            let mut queue = runtime.queue.lock().expect("owner queue mutex poisoned");

            while queue.is_empty() {
                queue = runtime
                    .condvar
                    .wait(queue)
                    .expect("owner queue mutex poisoned");
            }

            queue.pop_front().expect("owner queue item missing")
        };

        let result = catch_unwind(AssertUnwindSafe(|| dispatch(&mut state, &item.kind)))
            .unwrap_or_else(|_| {
                WorkResult::Error("Rust native owner thread panic was contained".to_owned())
            });

        item.finish(result);
    }
}

/// Routes one work item to its handler and returns the handler's result.
///
/// `Diagnostics` and `Release` are handled in this file; every other variant
/// is forwarded, together with the mutable owner state, to a handler in
/// another module. `Release` always yields `WorkResult::Ok`, whatever the
/// bookkeeping outcome.
fn dispatch(state: &mut OwnerState, kind: &WorkKind) -> WorkResult {
    match kind {
        WorkKind::Init(request) => crate::eval::do_init(state, request),
        WorkKind::Diagnostics => WorkResult::NativeDiagnostics(diagnostics_snapshot(state)),
        WorkKind::EvalString(request) => crate::eval::do_eval_string(state, request),
        WorkKind::Eval(request) => crate::eval::do_eval(state, request),
        WorkKind::Print(request) => crate::print::do_print(state, request),
        WorkKind::Plot(request) => crate::plot::do_plot(state, request),
        WorkKind::EncodeDataframe(request) => crate::arrow::do_encode_dataframe(state, request),
        WorkKind::EncodeDataFrame(request) => {
            crate::data_frame::do_encode_data_frame(state, request)
        }
        WorkKind::DecodeDataFrame(request) => {
            crate::data_frame::do_decode_data_frame(state, request)
        }
        WorkKind::DecodeArrow(request) => crate::arrow::do_decode_arrow(state, request),
        WorkKind::EncodeResource(request) => crate::codec::do_encode_resource(state, request),
        WorkKind::DecodeResource(request) => crate::codec::do_decode_resource(state, request),
        WorkKind::Release { sexp, resource_id } => {
            release_on_owner(state, *sexp, *resource_id);
            WorkResult::Ok
        }
    }
}

/// Performs (or deliberately skips) the release of one resource and records
/// the outcome in the release diagnostics.
///
/// Checks run in this order: owner state not initialized → skip; fault
/// injection `skip_uninitialized` → skip; fault injection `1` → recorded
/// failure; otherwise the R API's `release_object` is called, or a failure is
/// recorded when no API table is present.
fn release_on_owner(state: &OwnerState, sexp: usize, resource_id: u64) {
    if !state.is_initialized() {
        record_release_skipped(
            resource_id,
            "embedded R runtime is not initialized during release",
        );
        return;
    }

    if release_fault_enabled("skip_uninitialized") {
        record_release_skipped(
            resource_id,
            "fault injected release skip before R_ReleaseObject",
        );
        return;
    }

    if release_fault_enabled("1") {
        record_release_failure(resource_id, "fault injected during native resource release");
        return;
    }

    match state.r.as_ref() {
        Some(api) => {
            // SAFETY: NOTE(review) — assumes `sexp` is the address of an R
            // object this crate previously preserved and has not yet released,
            // and that this runs on the thread that owns the embedded R
            // runtime; confirm against the code that creates resources.
            unsafe { (api.release_object)(sexp as crate::r_api::Sexp) };
            record_release_success(resource_id);
        }
        None => record_release_failure(
            resource_id,
            "embedded R API is not initialized during release",
        ),
    }
}

fn diagnostics_snapshot(state: &OwnerState) -> crate::work::NativeDiagnostics {
    let release = release_diagnostics_snapshot();

    crate::work::NativeDiagnostics {
        init_state: state.init_state,
        init_config: state.init_config.clone(),
        attempted_init_config: state.attempted_init_config.clone(),
        last_init_error: state.last_init_error.clone(),
        init_attempt_count: state.init_attempt_count,
        init_mismatch_count: state.init_mismatch_count,
        owner_thread_id: owner_thread_id(),
        release_enqueued_count: release.enqueued,
        release_success_count: release.success,
        release_failure_count: release.failure,
        release_skipped_uninitialized_count: release.skipped_uninitialized,
        last_release_thread_id: release.last_thread_id,
        last_release_resource_id: release.last_resource_id,
        last_release_error: release.last_error,
    }
}

pub fn diagnostics() -> Result<crate::work::NativeDiagnostics, String> {
    if OWNER.get().is_none() {
        let release = release_diagnostics_snapshot();

        return Ok(crate::work::NativeDiagnostics {
            init_state: crate::work::InitState::Uninitialized,
            init_config: None,
            attempted_init_config: None,
            last_init_error: None,
            init_attempt_count: 0,
            init_mismatch_count: 0,
            owner_thread_id: owner_thread_id(),
            release_enqueued_count: release.enqueued,
            release_success_count: release.success,
            release_failure_count: release.failure,
            release_skipped_uninitialized_count: release.skipped_uninitialized,
            last_release_thread_id: release.last_thread_id,
            last_release_resource_id: release.last_resource_id,
            last_release_error: release.last_error,
        });
    }

    match submit(crate::work::WorkKind::Diagnostics) {
        crate::work::WorkResult::NativeDiagnostics(diagnostics) => Ok(diagnostics),
        other => Err(format!("unexpected diagnostics result: {other:?}")),
    }
}

/// Enqueues a release of `sexp` for the owner thread without waiting for it.
///
/// Starts the owner thread on first use, records the enqueue in the release
/// diagnostics, pushes a `Release` work item, and wakes the owner. The outcome
/// is only observable through the release diagnostics.
///
/// # Panics
///
/// Panics if the queue mutex is poisoned.
pub fn submit_release(sexp: usize, resource_id: u64) {
    let runtime = OWNER.get_or_init(start_owner);
    // Nobody waits on the completion handle; it is only kept alive until this
    // function returns.
    let (item, _completion) = WorkItem::new(WorkKind::Release { sexp, resource_id });

    record_release_enqueued(resource_id);

    {
        let mut pending = runtime.queue.lock().expect("owner queue mutex poisoned");
        pending.push_back(item);
        runtime.condvar.notify_one();
    }
}

/// Point-in-time copy of the global release counters and the last recorded
/// release event, produced by `release_diagnostics_snapshot`.
struct ReleaseDiagnostics {
    /// Value of `RELEASE_ENQUEUED_COUNT`.
    enqueued: u64,
    /// Value of `RELEASE_SUCCESS_COUNT`.
    success: u64,
    /// Value of `RELEASE_FAILURE_COUNT`.
    failure: u64,
    /// Value of `RELEASE_SKIPPED_UNINITIALIZED_COUNT`.
    skipped_uninitialized: u64,
    /// Value of `LAST_RELEASE_THREAD_ID` (0 when nothing has been recorded).
    last_thread_id: u64,
    /// Resource id of the last recorded release event, if any.
    last_resource_id: Option<u64>,
    /// Message of the last recorded release event, if it carried one.
    last_error: Option<String>,
}

fn release_diagnostics_snapshot() -> ReleaseDiagnostics {
    let last_release = LAST_RELEASE
        .lock()
        .expect("last release diagnostics mutex poisoned");

    ReleaseDiagnostics {
        enqueued: RELEASE_ENQUEUED_COUNT.load(Ordering::SeqCst),
        success: RELEASE_SUCCESS_COUNT.load(Ordering::SeqCst),
        failure: RELEASE_FAILURE_COUNT.load(Ordering::SeqCst),
        skipped_uninitialized: RELEASE_SKIPPED_UNINITIALIZED_COUNT.load(Ordering::SeqCst),
        last_thread_id: LAST_RELEASE_THREAD_ID.load(Ordering::SeqCst),
        last_resource_id: last_release.resource_id,
        last_error: last_release.error.clone(),
    }
}

/// Records that a release of `resource_id` has been enqueued.
///
/// Increments the enqueued counter and makes `resource_id` the last recorded
/// release with no error message.
// NOTE(review): this clears any error left by an earlier failed or skipped
// release before the newly enqueued release has run — confirm that is the
// intended meaning of "last release".
fn record_release_enqueued(resource_id: u64) {
    RELEASE_ENQUEUED_COUNT.fetch_add(1, Ordering::SeqCst);

    let no_error: Option<String> = None;
    record_last_release(resource_id, no_error);
}

/// Records a successful release of `resource_id`.
///
/// Increments the success counter, stamps the calling thread as the last
/// release thread, and clears the last release error.
fn record_release_success(resource_id: u64) {
    RELEASE_SUCCESS_COUNT.fetch_add(1, Ordering::SeqCst);

    let recording_thread = stable_thread_id();
    LAST_RELEASE_THREAD_ID.store(recording_thread, Ordering::SeqCst);

    let no_error: Option<String> = None;
    record_last_release(resource_id, no_error);
}

fn record_last_release(resource_id: u64, error: Option<String>) {
    let mut last_release = LAST_RELEASE
        .lock()
        .expect("last release diagnostics mutex poisoned");
    last_release.resource_id = Some(resource_id);
    last_release.error = error;
}

/// Reports whether release fault injection is switched on for `mode`.
///
/// True only when the environment variable `RX_CRASH_REPRO` is exactly `1`
/// and `RX_NATIVE_FAULT_RELEASE` is exactly `mode`. Unset or non-Unicode
/// values count as no match, and the second variable is not read when the
/// first does not match.
fn release_fault_enabled(mode: &str) -> bool {
    let env_is = |name: &str, expected: &str| {
        matches!(std::env::var(name), Ok(value) if value == expected)
    };

    env_is("RX_CRASH_REPRO", "1") && env_is("RX_NATIVE_FAULT_RELEASE", mode)
}

/// Maps the current thread to a `u64` by hashing its `ThreadId`.
///
/// A fresh `DefaultHasher::new()` is used on every call, so repeated calls on
/// the same thread within one process yield the same value.
fn stable_thread_id() -> u64 {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let current = std::thread::current().id();

    let mut digest = DefaultHasher::new();
    current.hash(&mut digest);
    digest.finish()
}