use std::fmt;
use std::sync::{Arc, Condvar, Mutex};
/// One request that can be attached to a [`WorkItem`].
///
/// Each variant names an operation; tuple variants carry that operation's
/// payload struct.
// NOTE(review): the allow presumably exists because some variants are not
// constructed in every build — confirm before removing it.
#[allow(dead_code)]
pub enum WorkKind {
    /// Initialization request; payload is an [`InitWork`].
    Init(InitWork),
    /// Payload-free request; presumably answered with
    /// `WorkResult::NativeDiagnostics` — confirm against the dispatcher.
    Diagnostics,
    /// Request carrying an [`EvalStringWork`] (source text only).
    EvalString(EvalStringWork),
    /// Request carrying an [`EvalWork`] (source text plus named resources).
    Eval(EvalWork),
    /// Request carrying a [`PrintWork`].
    Print(PrintWork),
    /// Request carrying a [`PlotWork`].
    Plot(PlotWork),
    /// Encode request carrying raw `ipc_bytes`; a different variant from
    /// [`WorkKind::EncodeDataFrame`] despite the similar name.
    EncodeDataframe(EncodeDataframeWork),
    /// Encode request carrying a [`DataFrameWire`].
    EncodeDataFrame(EncodeDataFrameWork),
    /// Request carrying a [`DecodeDataFrameWork`].
    DecodeDataFrame(DecodeDataFrameWork),
    /// Request carrying a [`DecodeArrowWork`].
    DecodeArrow(DecodeArrowWork),
    /// Request carrying an [`EncodeResourceWork`].
    EncodeResource(EncodeResourceWork),
    /// Request carrying a [`DecodeResourceWork`].
    DecodeResource(DecodeResourceWork),
    /// Release request.
    ///
    /// `sexp` is presumably a raw pointer value stored as an integer and
    /// `resource_id` the identifier of the owning resource — confirm against
    /// the release handler.
    Release { sexp: usize, resource_id: u64 },
}
/// Payload of `WorkKind::Init`.
///
/// Field meanings are inferred from their names — confirm against the
/// initialization handler.
#[derive(Debug)]
pub struct InitWork {
    /// Presumably the path of the R shared library to load.
    pub lib_r_path: String,
    /// Presumably the `R_HOME` directory.
    pub r_home: String,
    /// Presumably extra R library search paths.
    pub lib_paths: Vec<String>,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InitConfig {
pub r_home: String,
pub lib_r_path: String,
pub lib_paths: Vec<String>,
}
impl From<&InitWork> for InitConfig {
fn from(work: &InitWork) -> Self {
Self {
r_home: work.r_home.clone(),
lib_r_path: work.lib_r_path.clone(),
lib_paths: work.lib_paths.clone(),
}
}
}
/// Coarse initialization status, as reported in
/// `NativeDiagnostics::init_state`.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum InitState {
    /// Presumably: no initialization has completed yet.
    Uninitialized,
    /// Presumably: initialization completed successfully.
    Initialized,
    /// Presumably: the most recent initialization attempt failed.
    Failed,
}
/// Description of a failed initialization, carried by
/// `WorkResult::NativeInitFailed`.
///
/// Field meanings are inferred from their names — confirm against the code
/// that builds this value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InitFailure {
    /// Static label, presumably naming the step that failed.
    pub stage: &'static str,
    /// Free-form description of the failure.
    pub message: String,
    /// Presumably whether the caller may attempt initialization again.
    pub retryable: bool,
    /// Presumably whether the host process must restart before retrying.
    pub restart_required: bool,
}
/// Pair of initialization configurations reported as conflicting, carried by
/// `WorkResult::NativeInitMismatch`.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct InitMismatch {
    /// Free-form description of the conflict.
    pub message: String,
    /// Static labels, presumably naming the settings that differ.
    pub mismatches: Vec<&'static str>,
    /// Presumably the configuration already in effect.
    pub current: InitConfig,
    /// Presumably the configuration the caller asked for.
    pub requested: InitConfig,
}
/// Snapshot of initialization and release bookkeeping, carried by
/// `WorkResult::NativeDiagnostics`.
///
/// The per-field notes below are inferred from the field names; confirm them
/// against the code that fills this struct in.
#[derive(Clone, Debug)]
pub struct NativeDiagnostics {
    /// Current initialization status.
    pub init_state: InitState,
    /// Configuration presumably in effect, if any.
    pub init_config: Option<InitConfig>,
    /// Configuration presumably used by the latest attempt, if any.
    pub attempted_init_config: Option<InitConfig>,
    /// Most recent initialization failure, if any.
    pub last_init_error: Option<InitFailure>,
    /// Counter of initialization attempts.
    pub init_attempt_count: u64,
    /// Counter of initialization mismatches.
    pub init_mismatch_count: u64,
    /// Numeric identifier of the owner thread.
    pub owner_thread_id: u64,
    /// Counter of release requests enqueued.
    pub release_enqueued_count: u64,
    /// Counter of releases that succeeded.
    pub release_success_count: u64,
    /// Counter of releases that failed.
    pub release_failure_count: u64,
    /// Counter of releases skipped while uninitialized.
    pub release_skipped_uninitialized_count: u64,
    /// Numeric identifier of the thread that ran the latest release.
    pub last_release_thread_id: u64,
    /// Resource identifier of the latest release, if any.
    pub last_release_resource_id: Option<u64>,
    /// Error text of the latest failed release, if any.
    pub last_release_error: Option<String>,
}
/// Payload of `WorkKind::EvalString`: source text only.
#[derive(Debug)]
pub struct EvalStringWork {
    /// Source text, presumably R code to evaluate.
    pub source: String,
}
/// Payload of `WorkKind::Eval`: source text plus named resource handles.
pub struct EvalWork {
    /// Source text, presumably R code to evaluate.
    pub source: String,
    /// `(name, resource)` pairs; presumably bound as variables before the
    /// source is evaluated — confirm against the eval handler.
    pub globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
}
/// Three byte buffers named after the output channels they presumably
/// capture during an evaluation.
///
/// `Default` yields the same value as [`EvalOutput::empty`], so the type can
/// be used with `..Default::default()` and `unwrap_or_default()`.
#[derive(Debug, Default)]
pub struct EvalOutput {
    /// Raw bytes of the `stdout` channel.
    pub stdout: Vec<u8>,
    /// Raw bytes of the `messages` channel.
    pub messages: Vec<u8>,
    /// Raw bytes of the `warnings` channel.
    pub warnings: Vec<u8>,
}

impl EvalOutput {
    /// Returns an output whose three buffers are all empty.
    ///
    /// Equivalent to `EvalOutput::default()`; no allocation takes place.
    pub fn empty() -> Self {
        Self::default()
    }
}
/// Payload of `WorkResult::EvalSuccess`.
pub struct EvalSuccess {
    /// Resource handle for the evaluation result, when there is one.
    pub result: Option<rustler::ResourceArc<crate::resource::RxResource>>,
    /// `(name bytes, resource)` pairs; presumably the globals handed back
    /// after evaluation — confirm against the eval handler.
    pub globals: Vec<(Vec<u8>, rustler::ResourceArc<crate::resource::RxResource>)>,
    /// Output buffers collected alongside the result.
    pub output: EvalOutput,
}
/// Payload of `WorkResult::StructuredError`: an error described by raw byte
/// fields plus the output buffers that accompany it.
#[derive(Debug)]
pub struct StructuredError {
    /// Error message bytes.
    pub message: Vec<u8>,
    /// Class names as byte strings; presumably the R condition's class
    /// vector — confirm against the producer.
    pub r_class: Vec<Vec<u8>>,
    /// Bytes describing the failing call, when available.
    pub call: Option<Vec<u8>>,
    /// Output buffers attached to the error.
    pub output: EvalOutput,
}
/// Payload of `WorkKind::Print`.
pub struct PrintWork {
    /// Handle of the resource to print.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
    /// Optional width; presumably an override for R's `width` option —
    /// confirm against the print handler.
    pub width: Option<i32>,
    /// Optional limit; presumably an override for R's `max.print` option —
    /// confirm against the print handler.
    pub max_print: Option<i32>,
}
/// Payload of `WorkKind::Plot`.
///
/// The meaning and units of the numeric fields are inferred from their names —
/// confirm against the plot handler.
pub struct PlotWork {
    /// Source text, presumably the plotting code to evaluate.
    pub source: String,
    /// `(name, resource)` pairs supplied alongside the source.
    pub globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
    /// Requested width.
    pub width: i32,
    /// Requested height.
    pub height: i32,
    /// Presumably the device resolution.
    pub res: i32,
    /// Presumably the device point size.
    pub pointsize: i32,
    /// Presumably the maximum number of pages to return.
    pub max_pages: i32,
    /// Presumably the maximum number of bytes to return.
    pub max_bytes: i32,
}
/// Payload of `WorkResult::PlotSuccess`.
pub struct PlotSuccess {
    /// Width reported for the plot.
    pub width: i32,
    /// Height reported for the plot.
    pub height: i32,
    /// One byte buffer per page; presumably encoded image data.
    pub pages: Vec<Vec<u8>>,
    /// Output buffers collected while plotting.
    pub output: EvalOutput,
}
/// Payload of `WorkKind::EncodeDataframe`: a single raw byte buffer.
///
/// Not to be confused with [`EncodeDataFrameWork`], which carries a
/// [`DataFrameWire`] instead.
#[derive(Debug)]
pub struct EncodeDataframeWork {
    /// Raw bytes; presumably an Arrow IPC stream — confirm against the
    /// encode handler.
    pub ipc_bytes: Vec<u8>,
}
/// Payload of `WorkKind::DecodeArrow`.
pub struct DecodeArrowWork {
    /// Handle of the resource to decode.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
}
/// Plain-data representation of a data frame, used by
/// [`EncodeDataFrameWork`] and `WorkResult::DataFrameWire`.
#[derive(Clone, Debug, PartialEq)]
pub struct DataFrameWire {
    /// Column names as byte strings.
    pub names: Vec<Vec<u8>>,
    /// Row count.
    pub n_rows: usize,
    /// Column contents; presumably parallel to `names` — confirm against the
    /// encoder/decoder.
    pub columns: Vec<DataFrameColumn>,
}
/// One column of a [`DataFrameWire`].
#[derive(Clone, Debug, PartialEq)]
pub struct DataFrameColumn {
    /// Column name as a byte string.
    pub name: Vec<u8>,
    /// Type tag of the column, expressed with [`NaType`].
    pub column_type: NaType,
    /// Cell values; presumably one per row — confirm against the producer.
    pub values: Vec<DataFrameValue>,
}
/// A single cell of a [`DataFrameColumn`].
///
/// Equality is derived, so `Double` follows `f64` comparison rules (a NaN
/// payload never compares equal to itself).
#[derive(Clone, Debug, PartialEq)]
pub enum DataFrameValue {
    /// Boolean cell.
    Logical(bool),
    /// 32-bit integer cell.
    Integer(i32),
    /// 64-bit floating-point cell.
    Double(f64),
    /// Byte-string cell.
    Character(Vec<u8>),
    /// Missing cell, tagged with the type it is missing from.
    Na(NaType),
}
/// Payload of `WorkKind::EncodeDataFrame`: a data frame in wire form.
///
/// Not to be confused with [`EncodeDataframeWork`], which carries raw bytes.
#[derive(Debug)]
pub struct EncodeDataFrameWork {
    /// The data frame to encode.
    pub wire: DataFrameWire,
}
/// Payload of `WorkKind::DecodeDataFrame`.
pub struct DecodeDataFrameWork {
    /// Handle of the resource to decode.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
    /// Optional row limit; how an over-limit frame is treated is decided by
    /// the decode handler (not visible here).
    pub max_rows: Option<usize>,
}
/// Value to be encoded, carried by [`EncodeResourceWork`].
pub enum EncodeValue {
    /// No value.
    Null,
    /// Single boolean.
    Logical(bool),
    /// Single 32-bit integer.
    Integer(i32),
    /// Single 64-bit float.
    Double(f64),
    /// Single byte string.
    Character(Vec<u8>),
    /// Sequence of booleans.
    LogicalVector(Vec<bool>),
    /// Sequence of 32-bit integers.
    IntegerVector(Vec<i32>),
    /// Sequence of 64-bit floats.
    DoubleVector(Vec<f64>),
    /// Sequence of byte strings.
    CharacterVector(Vec<Vec<u8>>),
    /// Missing value of the given type.
    Na(NaType),
    /// `(name bytes, resource)` pairs referring to already-existing resources.
    NamedList(Vec<(Vec<u8>, rustler::ResourceArc<crate::resource::RxResource>)>),
}
/// Type tag with four variants; used both for missing values
/// (`DataFrameValue::Na`, `EncodeValue::Na`) and as
/// `DataFrameColumn::column_type`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NaType {
    /// Boolean type.
    Logical,
    /// Integer type.
    Integer,
    /// Floating-point type.
    Double,
    /// Byte-string type.
    Character,
}
/// Payload of `WorkKind::EncodeResource`.
pub struct EncodeResourceWork {
    /// The value to encode.
    pub value: EncodeValue,
}
/// Payload of `WorkKind::DecodeResource`.
pub struct DecodeResourceWork {
    /// Handle of the resource to decode.
    pub resource: rustler::ResourceArc<crate::resource::RxResource>,
}
/// Outcome delivered through [`WorkItem::finish`].
///
/// Which request produces which variant is decided by the dispatcher, which
/// is not part of this file; the notes below only describe the payloads.
// NOTE(review): the allow presumably exists because some variants are not
// constructed in every build — confirm before removing it.
#[allow(dead_code)]
pub enum WorkResult {
    /// Success without a payload.
    Ok,
    /// Tagged value; presumably the answer to an `EvalString` request.
    EvalString(crate::codec::TaggedValue),
    /// Evaluation result with resources and captured output.
    EvalSuccess(EvalSuccess),
    /// Error described by raw message/class/call bytes.
    StructuredError(StructuredError),
    /// Output buffers; presumably produced by a print request.
    PrintSuccess(EvalOutput),
    /// Rendered plot pages with their dimensions.
    PlotSuccess(PlotSuccess),
    /// Raw bytes; presumably an Arrow-encoded payload.
    ArrowBytes(Vec<u8>),
    /// Data frame in wire form.
    DataFrameWire(DataFrameWire),
    /// Handle to a resource.
    Resource(rustler::ResourceArc<crate::resource::RxResource>),
    /// Tagged value; presumably the answer to a decode request.
    Decoded(crate::codec::TaggedValue),
    /// Error carrying only a message.
    Error(String),
    /// Error carrying a static tag and a message.
    TaggedError(&'static str, String),
    /// Initialization was requested with a conflicting configuration.
    NativeInitMismatch(InitMismatch),
    /// Initialization failed.
    NativeInitFailed(InitFailure),
    /// Diagnostics snapshot.
    NativeDiagnostics(NativeDiagnostics),
}
impl fmt::Debug for WorkResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::Ok => f.write_str("Ok"),
Self::EvalString(value) => f.debug_tuple("EvalString").field(value).finish(),
Self::EvalSuccess(_success) => f.write_str("EvalSuccess(..)"),
Self::StructuredError(error) => f.debug_tuple("StructuredError").field(error).finish(),
Self::PrintSuccess(output) => f.debug_tuple("PrintSuccess").field(output).finish(),
Self::PlotSuccess(success) => f
.debug_struct("PlotSuccess")
.field("width", &success.width)
.field("height", &success.height)
.field("pages", &success.pages.len())
.finish(),
Self::ArrowBytes(bytes) => f.debug_tuple("ArrowBytes").field(&bytes.len()).finish(),
Self::DataFrameWire(wire) => f.debug_tuple("DataFrameWire").field(wire).finish(),
Self::Resource(_resource) => f.write_str("Resource(..)"),
Self::Decoded(value) => f.debug_tuple("Decoded").field(value).finish(),
Self::Error(message) => f.debug_tuple("Error").field(message).finish(),
Self::TaggedError(tag, message) => f
.debug_tuple("TaggedError")
.field(tag)
.field(message)
.finish(),
Self::NativeInitMismatch(mismatch) => {
f.debug_tuple("NativeInitMismatch").field(mismatch).finish()
}
Self::NativeInitFailed(failure) => {
f.debug_tuple("NativeInitFailed").field(failure).finish()
}
Self::NativeDiagnostics(_diagnostics) => f.write_str("NativeDiagnostics(..)"),
}
}
}
/// A [`WorkKind`] paired with the shared slot through which its
/// [`WorkResult`] is delivered.
pub struct WorkItem {
    /// The operation to perform.
    pub kind: WorkKind,
    /// Result slot plus the condition variable signalled once it is filled.
    state: Arc<(Mutex<Option<WorkResult>>, Condvar)>,
}

impl WorkItem {
    /// Creates a work item for `kind`.
    ///
    /// Returns the item together with a second handle to the same
    /// slot/condvar pair. The slot starts out as `None` and becomes
    /// `Some(result)` when [`WorkItem::finish`] is called.
    pub fn new(kind: WorkKind) -> (Self, Arc<(Mutex<Option<WorkResult>>, Condvar)>) {
        let state = Arc::new((Mutex::new(None), Condvar::new()));
        let item = Self {
            kind,
            state: Arc::clone(&state),
        };
        (item, state)
    }

    /// Stores `result` in the shared slot and wakes one waiter.
    ///
    /// A poisoned mutex is tolerated rather than treated as fatal: the slot
    /// is a plain `Option` that is overwritten wholesale, so a panic on
    /// another thread cannot have left it in a half-updated state. Recovering
    /// the guard keeps the thread that delivers results alive.
    pub fn finish(self, result: WorkResult) {
        let (lock, condvar) = &*self.state;
        {
            let mut slot = lock
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            *slot = Some(result);
        } // Guard dropped here, so the woken waiter can take the lock at once.
        condvar.notify_one();
    }
}