// native/rx_rust_nif/src/work.rs

use std::fmt;
use std::sync::{Arc, Condvar, Mutex};

/// The request carried by a [`WorkItem`]: one variant per kind of work, each
/// holding the data that request needs.
///
/// NOTE(review): the code that dispatches on these variants is not in this
/// file; the per-variant notes below describe only the payload types.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed in every build — TODO confirm and narrow if possible.
#[allow(dead_code)]
pub enum WorkKind {
    /// Initialization request; carries the paths in [`InitWork`].
    Init(InitWork),
    /// Request with no payload; presumably answered with
    /// [`WorkResult::NativeDiagnostics`] — TODO confirm.
    Diagnostics,
    /// Evaluate a source string (see [`EvalStringWork`]).
    EvalString(EvalStringWork),
    /// Evaluate a source string together with named resources (see [`EvalWork`]).
    Eval(EvalWork),
    /// Print a resource (see [`PrintWork`]).
    Print(PrintWork),
    /// Produce plot pages from a source string (see [`PlotWork`]).
    Plot(PlotWork),
    /// Encode from raw IPC bytes (see [`EncodeDataframeWork`]). Distinct from
    /// [`WorkKind::EncodeDataFrame`] despite the near-identical name.
    EncodeDataframe(EncodeDataframeWork),
    /// Encode from a [`DataFrameWire`] value (see [`EncodeDataFrameWork`]).
    EncodeDataFrame(EncodeDataFrameWork),
    /// Decode a resource as a data frame, optionally row-limited
    /// (see [`DecodeDataFrameWork`]).
    DecodeDataFrame(DecodeDataFrameWork),
    /// Decode a resource (see [`DecodeArrowWork`]); presumably yields
    /// [`WorkResult::ArrowBytes`] — TODO confirm.
    DecodeArrow(DecodeArrowWork),
    /// Turn an [`EncodeValue`] into a resource (see [`EncodeResourceWork`]).
    EncodeResource(EncodeResourceWork),
    /// Decode a resource into a value (see [`DecodeResourceWork`]).
    DecodeResource(DecodeResourceWork),
    /// Release a previously created object.
    ///
    /// NOTE(review): `sexp` is presumably a raw R `SEXP` pointer stored as an
    /// integer and `resource_id` the id reported in diagnostics — confirm.
    Release { sexp: usize, resource_id: u64 },
}

/// Paths supplied with a [`WorkKind::Init`] request.
#[derive(Debug)]
pub struct InitWork {
    /// Path to the R shared library (inferred from the name — TODO confirm).
    pub lib_r_path: String,
    /// R home directory (inferred from the name — TODO confirm).
    pub r_home: String,
    /// Additional library search paths (inferred from the name — TODO confirm).
    pub lib_paths: Vec<String>,
}

/// Comparable, cloneable copy of the three path settings of an [`InitWork`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InitConfig {
    /// Copy of [`InitWork::r_home`].
    pub r_home: String,
    /// Copy of [`InitWork::lib_r_path`].
    pub lib_r_path: String,
    /// Copy of [`InitWork::lib_paths`].
    pub lib_paths: Vec<String>,
}

impl From<&InitWork> for InitConfig {
    /// Builds a configuration by cloning every field of `work`.
    fn from(work: &InitWork) -> Self {
        // Exhaustive destructuring: adding a field to `InitWork` forces this
        // conversion to be revisited.
        let InitWork {
            lib_r_path,
            r_home,
            lib_paths,
        } = work;

        InitConfig {
            r_home: r_home.clone(),
            lib_r_path: lib_r_path.clone(),
            lib_paths: lib_paths.clone(),
        }
    }
}

/// Initialization state, as reported in [`NativeDiagnostics::init_state`].
///
/// NOTE(review): the transitions between states are driven by code outside
/// this file; the variant notes are inferred from the names.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InitState {
    /// Presumably: no initialization has succeeded or failed yet.
    Uninitialized,
    /// Presumably: initialization completed successfully.
    Initialized,
    /// Presumably: an initialization attempt failed.
    Failed,
}

/// Description of a failed initialization; carried by
/// [`WorkResult::NativeInitFailed`] and kept in
/// [`NativeDiagnostics::last_init_error`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InitFailure {
    /// Static label for the step that failed; the set of labels is chosen by
    /// the code that constructs this value, not defined here.
    pub stage: &'static str,
    /// Error text.
    pub message: String,
    /// NOTE(review): presumably whether a later init attempt may succeed — confirm.
    pub retryable: bool,
    /// NOTE(review): presumably whether recovery needs a restart — confirm what
    /// must be restarted (process, VM, ...).
    pub restart_required: bool,
}

/// Pairs two [`InitConfig`] values with a message; carried by
/// [`WorkResult::NativeInitMismatch`].
///
/// NOTE(review): presumably produced when initialization is requested with a
/// configuration that differs from the one already in effect — confirm.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InitMismatch {
    /// Error text.
    pub message: String,
    /// Static labels — presumably the names of the settings that differ
    /// between `current` and `requested`; TODO confirm.
    pub mismatches: Vec<&'static str>,
    /// Configuration labelled as current.
    pub current: InitConfig,
    /// Configuration labelled as requested.
    pub requested: InitConfig,
}

/// Snapshot of initialization and release bookkeeping; carried by
/// [`WorkResult::NativeDiagnostics`].
///
/// NOTE(review): the counters are maintained outside this file; field notes
/// are inferred from names and types and should be confirmed there.
#[derive(Clone, Debug)]
pub struct NativeDiagnostics {
    /// Current initialization state.
    pub init_state: InitState,
    /// Presumably the configuration in effect, if initialization succeeded.
    pub init_config: Option<InitConfig>,
    /// Presumably the configuration of the most recent attempt, if any.
    pub attempted_init_config: Option<InitConfig>,
    /// Most recent initialization failure, if any.
    pub last_init_error: Option<InitFailure>,
    /// Number of initialization attempts.
    pub init_attempt_count: u64,
    /// Number of init requests that ended in a mismatch (see [`InitMismatch`]).
    pub init_mismatch_count: u64,
    /// Numeric id of the owner thread — how the id is derived is not shown here.
    pub owner_thread_id: u64,
    /// Number of release requests enqueued.
    pub release_enqueued_count: u64,
    /// Number of releases that succeeded.
    pub release_success_count: u64,
    /// Number of releases that failed.
    pub release_failure_count: u64,
    /// Presumably releases skipped because the runtime was not initialized.
    pub release_skipped_uninitialized_count: u64,
    /// Numeric id of the thread that handled the last release.
    pub last_release_thread_id: u64,
    /// Resource id of the last release, if any.
    pub last_release_resource_id: Option<u64>,
    /// Error text of the last failed release, if any.
    pub last_release_error: Option<String>,
}

/// Payload of [`WorkKind::EvalString`].
#[derive(Debug)]
pub struct EvalStringWork {
    /// Source text to evaluate (presumably R code — TODO confirm).
    pub source: String,
}

/// Payload of [`WorkKind::Eval`]. Not `Debug`: it holds resource handles.
pub struct EvalWork {
    /// Source text to evaluate (presumably R code — TODO confirm).
    pub source: String,
    /// `(name, resource)` pairs — presumably bound as variables visible to
    /// `source` before evaluation; TODO confirm against the evaluator.
    pub globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
}

/// Bytes captured alongside a result, split into three independent buffers.
///
/// NOTE(review): the buffers are filled outside this file; presumably they hold
/// captured standard output, messages and warnings respectively — confirm the
/// encoding and separators with the producer.
///
/// `Default` yields the same value as [`EvalOutput::empty`].
#[derive(Debug, Default)]
pub struct EvalOutput {
    /// Captured standard-output bytes.
    pub stdout: Vec<u8>,
    /// Captured message bytes.
    pub messages: Vec<u8>,
    /// Captured warning bytes.
    pub warnings: Vec<u8>,
}

impl EvalOutput {
    /// Returns an output whose three buffers are all empty.
    ///
    /// Kept as a named constructor for existing callers; it is exactly
    /// `EvalOutput::default()`.
    pub fn empty() -> Self {
        Self::default()
    }
}

/// Successful evaluation outcome; carried by [`WorkResult::EvalSuccess`].
pub struct EvalSuccess {
    /// Resulting resource, or `None` when there is none to return.
    pub result: Option<rustler::ResourceArc<crate::resource::RxResource>>,
    /// `(name bytes, resource)` pairs. Note the names are raw bytes here,
    /// unlike the `String` names in [`EvalWork::globals`].
    pub globals: Vec<(Vec<u8>, rustler::ResourceArc<crate::resource::RxResource>)>,
    /// Output captured during the evaluation.
    pub output: EvalOutput,
}

/// Error with structured detail; carried by [`WorkResult::StructuredError`].
#[derive(Debug)]
pub struct StructuredError {
    /// Error message as raw bytes (encoding not established in this file).
    pub message: Vec<u8>,
    /// Class names as raw byte strings — presumably the R condition's class
    /// vector; TODO confirm.
    pub r_class: Vec<Vec<u8>>,
    /// Optional call text as raw bytes — presumably the call that raised the
    /// error; TODO confirm.
    pub call: Option<Vec<u8>>,
    /// Output captured before the error occurred.
    pub output: EvalOutput,
}

/// Payload of [`WorkKind::Print`].
pub struct PrintWork {
    /// Resource to print.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
    /// Optional width setting; `None` presumably keeps the current default —
    /// TODO confirm units and fallback.
    pub width: Option<i32>,
    /// Optional print limit; `None` presumably keeps the current default —
    /// TODO confirm.
    pub max_print: Option<i32>,
}

/// Payload of [`WorkKind::Plot`].
///
/// NOTE(review): units and valid ranges of the numeric settings are decided by
/// the plotting code, which is not in this file.
pub struct PlotWork {
    /// Source text to evaluate (presumably R plotting code — TODO confirm).
    pub source: String,
    /// `(name, resource)` pairs made available to `source` — TODO confirm how.
    pub globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
    /// Requested width.
    pub width: i32,
    /// Requested height.
    pub height: i32,
    /// Requested resolution (presumably pixels per inch — TODO confirm).
    pub res: i32,
    /// Requested point size.
    pub pointsize: i32,
    /// Upper bound on the number of pages (enforcement not shown here).
    pub max_pages: i32,
    /// Upper bound on output size in bytes (enforcement not shown here).
    pub max_bytes: i32,
}

/// Successful plot outcome; carried by [`WorkResult::PlotSuccess`].
pub struct PlotSuccess {
    /// Width associated with the produced pages.
    pub width: i32,
    /// Height associated with the produced pages.
    pub height: i32,
    /// One byte buffer per page (image format not established in this file).
    pub pages: Vec<Vec<u8>>,
    /// Output captured while plotting.
    pub output: EvalOutput,
}

/// Payload of [`WorkKind::EncodeDataframe`] (byte-based variant; compare
/// [`EncodeDataFrameWork`], which carries a [`DataFrameWire`]).
#[derive(Debug)]
pub struct EncodeDataframeWork {
    /// Serialized input bytes — presumably an Arrow IPC stream; TODO confirm.
    pub ipc_bytes: Vec<u8>,
}

/// Payload of [`WorkKind::DecodeArrow`].
pub struct DecodeArrowWork {
    /// Resource to decode.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
}

/// Column-oriented data-frame representation; carried by
/// [`WorkResult::DataFrameWire`] and [`EncodeDataFrameWork`].
///
/// NOTE(review): nothing in this file enforces that `names` matches the
/// per-column `name` fields or that every column holds `n_rows` values —
/// presumably an invariant upheld by producers; confirm.
#[derive(Clone, Debug, PartialEq)]
pub struct DataFrameWire {
    /// Column names as raw bytes.
    pub names: Vec<Vec<u8>>,
    /// Number of rows.
    pub n_rows: usize,
    /// The columns themselves.
    pub columns: Vec<DataFrameColumn>,
}

/// One column of a [`DataFrameWire`].
#[derive(Clone, Debug, PartialEq)]
pub struct DataFrameColumn {
    /// Column name as raw bytes.
    pub name: Vec<u8>,
    /// Element type of the column, expressed with [`NaType`] (which here acts
    /// as a type tag, not only as a missing-value marker).
    pub column_type: NaType,
    /// Cell values; presumably each is either the variant matching
    /// `column_type` or `DataFrameValue::Na` — TODO confirm.
    pub values: Vec<DataFrameValue>,
}

/// A single cell of a [`DataFrameColumn`].
///
/// Derives `PartialEq` only (not `Eq`) because of the `f64` payload.
#[derive(Clone, Debug, PartialEq)]
pub enum DataFrameValue {
    /// Boolean value.
    Logical(bool),
    /// 32-bit signed integer value.
    Integer(i32),
    /// 64-bit floating-point value.
    Double(f64),
    /// Text value as raw bytes (encoding not established in this file).
    Character(Vec<u8>),
    /// Missing value, tagged with the type it stands in for.
    Na(NaType),
}

/// Payload of [`WorkKind::EncodeDataFrame`] (wire-based variant; compare
/// [`EncodeDataframeWork`], which carries raw bytes).
#[derive(Debug)]
pub struct EncodeDataFrameWork {
    /// Data frame to encode.
    pub wire: DataFrameWire,
}

/// Payload of [`WorkKind::DecodeDataFrame`].
pub struct DecodeDataFrameWork {
    /// Resource to decode.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
    /// Optional row limit. NOTE(review): whether exceeding it truncates or
    /// fails is decided by the decoder, not shown here.
    pub max_rows: Option<usize>,
}

/// Value to be turned into a resource; payload of [`EncodeResourceWork`].
///
/// Scalar and vector forms exist for each of the four basic types. Not
/// `Debug`/`Clone`: `NamedList` holds resource handles.
pub enum EncodeValue {
    /// No value.
    Null,
    /// Single boolean.
    Logical(bool),
    /// Single 32-bit signed integer.
    Integer(i32),
    /// Single 64-bit float.
    Double(f64),
    /// Single text value as raw bytes.
    Character(Vec<u8>),
    /// Sequence of booleans. NOTE(review): no slot for missing elements in
    /// this or the other vector variants — confirm that is intended.
    LogicalVector(Vec<bool>),
    /// Sequence of 32-bit signed integers.
    IntegerVector(Vec<i32>),
    /// Sequence of 64-bit floats.
    DoubleVector(Vec<f64>),
    /// Sequence of text values, each as raw bytes.
    CharacterVector(Vec<Vec<u8>>),
    /// Single missing value of the given type.
    Na(NaType),
    /// Ordered `(name bytes, resource)` pairs referring to existing resources.
    NamedList(Vec<(Vec<u8>, rustler::ResourceArc<crate::resource::RxResource>)>),
}

/// Tag naming one of the four basic value types.
///
/// Used as the payload of the `Na` variants of [`DataFrameValue`] and
/// [`EncodeValue`], and as [`DataFrameColumn::column_type`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NaType {
    /// Boolean type.
    Logical,
    /// Integer type.
    Integer,
    /// Floating-point type.
    Double,
    /// Text type.
    Character,
}

/// Payload of [`WorkKind::EncodeResource`].
pub struct EncodeResourceWork {
    /// Value to encode.
    pub value: EncodeValue,
}

/// Payload of [`WorkKind::DecodeResource`].
pub struct DecodeResourceWork {
    /// Resource to decode.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
}

/// Outcome of a [`WorkItem`], delivered through [`WorkItem::finish`].
///
/// `Debug` is implemented by hand for this type because several payloads hold
/// resource handles or large buffers.
///
/// NOTE(review): which [`WorkKind`] produces which variant is decided outside
/// this file; the pairings suggested by the names should be confirmed there.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed in every build — TODO confirm and narrow if possible.
#[allow(dead_code)]
pub enum WorkResult {
    /// Success with nothing to return.
    Ok,
    /// Tagged value (presumably from [`WorkKind::EvalString`]).
    EvalString(crate::codec::TaggedValue),
    /// Successful evaluation with optional result, globals and captured output.
    EvalSuccess(EvalSuccess),
    /// Failure with message, class list, optional call and captured output.
    StructuredError(StructuredError),
    /// Captured output (presumably from [`WorkKind::Print`]).
    PrintSuccess(EvalOutput),
    /// Plot pages with dimensions and captured output.
    PlotSuccess(PlotSuccess),
    /// Serialized bytes (presumably Arrow IPC — TODO confirm).
    ArrowBytes(Vec<u8>),
    /// Decoded data frame.
    DataFrameWire(DataFrameWire),
    /// Handle to a resource.
    Resource(rustler::ResourceArc<crate::resource::RxResource>),
    /// Tagged value (presumably from [`WorkKind::DecodeResource`]).
    Decoded(crate::codec::TaggedValue),
    /// Failure described only by a message.
    Error(String),
    /// Failure described by a static tag plus a message.
    TaggedError(&'static str, String),
    /// Initialization was refused because of a configuration mismatch.
    NativeInitMismatch(InitMismatch),
    /// Initialization failed.
    NativeInitFailed(InitFailure),
    /// Diagnostics snapshot.
    NativeDiagnostics(NativeDiagnostics),
}

impl fmt::Debug for WorkResult {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Ok => f.write_str("Ok"),
            Self::EvalString(value) => f.debug_tuple("EvalString").field(value).finish(),
            Self::EvalSuccess(_success) => f.write_str("EvalSuccess(..)"),
            Self::StructuredError(error) => f.debug_tuple("StructuredError").field(error).finish(),
            Self::PrintSuccess(output) => f.debug_tuple("PrintSuccess").field(output).finish(),
            Self::PlotSuccess(success) => f
                .debug_struct("PlotSuccess")
                .field("width", &success.width)
                .field("height", &success.height)
                .field("pages", &success.pages.len())
                .finish(),
            Self::ArrowBytes(bytes) => f.debug_tuple("ArrowBytes").field(&bytes.len()).finish(),
            Self::DataFrameWire(wire) => f.debug_tuple("DataFrameWire").field(wire).finish(),
            Self::Resource(_resource) => f.write_str("Resource(..)"),
            Self::Decoded(value) => f.debug_tuple("Decoded").field(value).finish(),
            Self::Error(message) => f.debug_tuple("Error").field(message).finish(),
            Self::TaggedError(tag, message) => f
                .debug_tuple("TaggedError")
                .field(tag)
                .field(message)
                .finish(),
            Self::NativeInitMismatch(mismatch) => {
                f.debug_tuple("NativeInitMismatch").field(mismatch).finish()
            }
            Self::NativeInitFailed(failure) => {
                f.debug_tuple("NativeInitFailed").field(failure).finish()
            }
            Self::NativeDiagnostics(_diagnostics) => f.write_str("NativeDiagnostics(..)"),
        }
    }
}

/// A request ([`WorkKind`]) bundled with the shared slot through which its
/// [`WorkResult`] is handed back.
pub struct WorkItem {
    /// What is being requested.
    pub kind: WorkKind,
    /// Result slot plus condition variable. The same `Arc` is returned to the
    /// creator by `WorkItem::new`; `WorkItem::finish` stores the result in the
    /// mutex and then signals the condvar.
    state: Arc<(Mutex<Option<WorkResult>>, Condvar)>,
}

impl WorkItem {
    /// Creates a work item for `kind` together with a handle to its result slot.
    ///
    /// The slot starts out as `None`. The returned `Arc` and the item share the
    /// same mutex/condvar pair, so a caller holding the handle can observe the
    /// value stored by [`WorkItem::finish`].
    pub fn new(kind: WorkKind) -> (Self, Arc<(Mutex<Option<WorkResult>>, Condvar)>) {
        let shared = Arc::new((Mutex::new(None), Condvar::new()));
        let item = Self {
            kind,
            state: Arc::clone(&shared),
        };
        (item, shared)
    }

    /// Consumes the item, stores `result` in the shared slot and wakes one
    /// thread waiting on the condition variable.
    ///
    /// # Panics
    ///
    /// Panics if the result mutex is poisoned.
    pub fn finish(self, result: WorkResult) {
        let (slot, ready) = &*self.state;
        {
            // Scope the guard so the lock is released before notifying.
            let mut pending = slot.lock().expect("work result mutex poisoned");
            *pending = Some(result);
        }
        ready.notify_one();
    }
}