use rustler::{Encoder, Env, Term};
mod arrow;
mod atoms;
mod codec;
mod data_frame;
mod eval;
mod owner;
mod plot;
mod print;
mod r_api;
mod resource;
mod terms;
mod work;
#[rustler::nif(schedule = "DirtyIo")]
fn init<'a>(env: Env<'a>, lib_r_path: String, r_home: String, lib_paths: Vec<String>) -> Term<'a> {
match owner::submit(work::WorkKind::Init(work::InitWork {
lib_r_path,
r_home,
lib_paths,
})) {
work::WorkResult::Ok => atoms::ok().encode(env),
work::WorkResult::NativeInitMismatch(mismatch) => (
atoms::error(),
(
atoms::native_init_mismatch(),
crate::terms::encode_init_mismatch(env, &mismatch),
),
)
.encode(env),
work::WorkResult::NativeInitFailed(failure) => (
atoms::error(),
(
atoms::native_init_failed(),
crate::terms::encode_init_failure(env, &failure),
),
)
.encode(env),
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (atoms::error(), format!("unexpected init result: {other:?}")).encode(env),
}
}
#[rustler::nif(schedule = "DirtyCpu")]
fn eval_string<'a>(env: Env<'a>, source: String) -> Term<'a> {
match owner::submit(work::WorkKind::EvalString(work::EvalStringWork { source })) {
work::WorkResult::EvalString(value) => (atoms::ok(), value.encode(env)).encode(env),
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::TaggedError(tag, message) => {
let tag_atom = match tag {
"r_error" => atoms::r_error(),
"parse_error" => atoms::parse_error(),
"unsupported_r_value" => atoms::unsupported_r_value(),
_ => atoms::error(),
};
(atoms::error(), (tag_atom, message)).encode(env)
}
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (
atoms::error(),
format!("unexpected eval_string result: {other:?}"),
)
.encode(env),
}
}
#[rustler::nif(schedule = "DirtyCpu")]
fn eval<'a>(
env: Env<'a>,
source: String,
globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
) -> Result<Term<'a>, rustler::Error> {
if globals
.iter()
.any(|(name, _resource)| name.as_bytes().contains(&0))
{
return Err(rustler::Error::BadArg);
}
let result = match owner::submit(work::WorkKind::Eval(work::EvalWork { source, globals })) {
work::WorkResult::EvalSuccess(success) => crate::terms::encode_eval_success(env, success),
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::StructuredError(error) => {
crate::terms::encode_structured_error(env, error)
}
work::WorkResult::TaggedError("parse_error", message) => {
(atoms::error(), (atoms::parse_error(), message)).encode(env)
}
work::WorkResult::TaggedError("r_error", message) => {
(atoms::error(), (atoms::r_error(), message)).encode(env)
}
work::WorkResult::TaggedError(_tag, message) => (atoms::error(), message).encode(env),
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (atoms::error(), format!("unexpected eval result: {other:?}")).encode(env),
};
Ok(result)
}
#[rustler::nif(schedule = "DirtyCpu")]
fn print<'a>(
env: Env<'a>,
resource: rustler::ResourceArc<crate::resource::RxResource>,
width: Option<i32>,
max_print: Option<i32>,
) -> Term<'a> {
match owner::submit(work::WorkKind::Print(work::PrintWork {
resource,
width,
max_print,
})) {
work::WorkResult::PrintSuccess(output) => {
(atoms::ok(), crate::terms::encode_output_map(env, &output)).encode(env)
}
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::StructuredError(error) => {
crate::terms::encode_structured_error(env, error)
}
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (
atoms::error(),
format!("unexpected print result: {other:?}"),
)
.encode(env),
}
}
#[rustler::nif(schedule = "DirtyIo")]
fn plot<'a>(
env: Env<'a>,
source: String,
globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
width: i32,
height: i32,
res: i32,
pointsize: i32,
max_pages: i32,
max_bytes: i32,
) -> Result<Term<'a>, rustler::Error> {
if globals
.iter()
.any(|(name, _resource)| name.as_bytes().contains(&0))
{
return Err(rustler::Error::BadArg);
}
let result = match owner::submit(work::WorkKind::Plot(work::PlotWork {
source,
globals,
width,
height,
res,
pointsize,
max_pages,
max_bytes,
})) {
work::WorkResult::PlotSuccess(success) => {
match crate::terms::encode_plot_success(env, success) {
Ok(term) => term,
Err(message) => (atoms::error(), message).encode(env),
}
}
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::StructuredError(error) => {
crate::terms::encode_structured_error(env, error)
}
work::WorkResult::TaggedError("parse_error", message) => {
(atoms::error(), (atoms::parse_error(), message)).encode(env)
}
work::WorkResult::TaggedError("r_error", message) => {
(atoms::error(), (atoms::r_error(), message)).encode(env)
}
work::WorkResult::TaggedError(_tag, message) => (atoms::error(), message).encode(env),
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (atoms::error(), format!("unexpected plot result: {other:?}")).encode(env),
};
Ok(result)
}
#[rustler::nif(schedule = "DirtyCpu")]
fn encode<'a>(env: Env<'a>, kind: Term<'a>, value: Term<'a>) -> Result<Term<'a>, rustler::Error> {
let parsed = crate::codec::parse_encode_value(kind, value)?;
match owner::submit(work::WorkKind::EncodeResource(work::EncodeResourceWork {
value: parsed,
})) {
work::WorkResult::Resource(resource) => Ok((atoms::ok(), resource).encode(env)),
work::WorkResult::NativeInitFailed(failure) => Ok(encode_native_init_failed(env, &failure)),
work::WorkResult::Error(message) => Ok((atoms::error(), message).encode(env)),
other => Ok((
atoms::error(),
format!("unexpected encode result: {other:?}"),
)
.encode(env)),
}
}
#[rustler::nif(schedule = "DirtyCpu")]
fn decode<'a>(
env: Env<'a>,
resource: rustler::ResourceArc<crate::resource::RxResource>,
) -> Term<'a> {
match owner::submit(work::WorkKind::DecodeResource(work::DecodeResourceWork {
resource,
})) {
work::WorkResult::Decoded(value) => (atoms::ok(), value.encode(env)).encode(env),
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::TaggedError("unsupported_r_value", message) => {
(atoms::error(), (atoms::unsupported_r_value(), message)).encode(env)
}
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (
atoms::error(),
format!("unexpected decode result: {other:?}"),
)
.encode(env),
}
}
#[rustler::nif(schedule = "DirtyCpu")]
fn encode_dataframe<'a>(env: Env<'a>, ipc: rustler::Binary<'a>) -> Term<'a> {
match owner::submit(work::WorkKind::EncodeDataframe(work::EncodeDataframeWork {
ipc_bytes: ipc.as_slice().to_vec(),
})) {
work::WorkResult::Resource(resource) => (atoms::ok(), resource).encode(env),
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::TaggedError("missing_r_package", message) => {
(atoms::error(), (atoms::missing_r_package(), message)).encode(env)
}
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (
atoms::error(),
format!("unexpected encode_dataframe result: {other:?}"),
)
.encode(env),
}
}
#[rustler::nif(schedule = "DirtyCpu")]
fn encode_data_frame<'a>(env: Env<'a>, wire: Term<'a>) -> Result<Term<'a>, rustler::Error> {
let parsed = crate::data_frame::parse_wire(wire)?;
let result = match owner::submit(work::WorkKind::EncodeDataFrame(work::EncodeDataFrameWork {
wire: parsed,
})) {
work::WorkResult::Resource(resource) => (atoms::ok(), resource).encode(env),
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::TaggedError(tag, message) => {
crate::data_frame::encode_tagged_error(env, tag, &message)
}
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (
atoms::error(),
format!("unexpected encode_data_frame result: {other:?}"),
)
.encode(env),
};
Ok(result)
}
#[rustler::nif(schedule = "DirtyCpu")]
fn decode_data_frame<'a>(
env: Env<'a>,
resource: rustler::ResourceArc<crate::resource::RxResource>,
opts: Term<'a>,
) -> Result<Term<'a>, rustler::Error> {
let max_rows = crate::data_frame::parse_max_rows(opts)?;
let result = match owner::submit(work::WorkKind::DecodeDataFrame(work::DecodeDataFrameWork {
resource,
max_rows,
})) {
work::WorkResult::DataFrameWire(wire) => {
(atoms::ok(), crate::data_frame::encode_wire(env, &wire)).encode(env)
}
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::TaggedError(tag, message) => {
crate::data_frame::encode_tagged_error(env, tag, &message)
}
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (
atoms::error(),
format!("unexpected decode_data_frame result: {other:?}"),
)
.encode(env),
};
Ok(result)
}
#[rustler::nif(schedule = "DirtyCpu")]
fn decode_arrow<'a>(
env: Env<'a>,
resource: rustler::ResourceArc<crate::resource::RxResource>,
) -> Term<'a> {
match owner::submit(work::WorkKind::DecodeArrow(work::DecodeArrowWork {
resource,
})) {
work::WorkResult::ArrowBytes(bytes) => {
(atoms::ok(), crate::terms::binary(env, &bytes)).encode(env)
}
work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
work::WorkResult::TaggedError("missing_r_package", message) => {
(atoms::error(), (atoms::missing_r_package(), message)).encode(env)
}
work::WorkResult::TaggedError("not_dataframe", message) => {
(atoms::error(), (atoms::not_dataframe(), message)).encode(env)
}
work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
other => (
atoms::error(),
format!("unexpected decode_arrow result: {other:?}"),
)
.encode(env),
}
}
#[rustler::nif]
fn owner_thread_id() -> u64 {
owner::owner_thread_id()
}
#[rustler::nif]
fn release_count() -> u64 {
owner::release_count()
}
#[rustler::nif]
fn last_release_thread_id() -> u64 {
owner::last_release_thread_id()
}
#[rustler::nif(schedule = "DirtyIo")]
fn diagnostics<'a>(env: Env<'a>) -> Term<'a> {
match owner::diagnostics() {
Ok(diagnostics) => crate::terms::encode_native_diagnostics(env, diagnostics),
Err(message) => (atoms::error(), message).encode(env),
}
}
fn encode_native_init_failed<'a>(env: Env<'a>, failure: &work::InitFailure) -> Term<'a> {
(
atoms::error(),
(
atoms::native_init_failed(),
crate::terms::encode_init_failure(env, failure),
),
)
.encode(env)
}
fn load(env: rustler::Env, _info: rustler::Term) -> bool {
env.register::<resource::RxResource>().is_ok()
}
rustler::init!("Elixir.Rx.Native", load = load);