// native/rx_rust_nif/src/lib.rs

use rustler::{Encoder, Env, Term};

mod arrow;
mod atoms;
mod codec;
mod data_frame;
mod eval;
mod owner;
mod plot;
mod print;
mod r_api;
mod resource;
mod terms;
mod work;

#[rustler::nif(schedule = "DirtyIo")]
fn init<'a>(env: Env<'a>, lib_r_path: String, r_home: String, lib_paths: Vec<String>) -> Term<'a> {
    match owner::submit(work::WorkKind::Init(work::InitWork {
        lib_r_path,
        r_home,
        lib_paths,
    })) {
        work::WorkResult::Ok => atoms::ok().encode(env),
        work::WorkResult::NativeInitMismatch(mismatch) => (
            atoms::error(),
            (
                atoms::native_init_mismatch(),
                crate::terms::encode_init_mismatch(env, &mismatch),
            ),
        )
            .encode(env),
        work::WorkResult::NativeInitFailed(failure) => (
            atoms::error(),
            (
                atoms::native_init_failed(),
                crate::terms::encode_init_failure(env, &failure),
            ),
        )
            .encode(env),
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (atoms::error(), format!("unexpected init result: {other:?}")).encode(env),
    }
}

#[rustler::nif(schedule = "DirtyCpu")]
fn eval_string<'a>(env: Env<'a>, source: String) -> Term<'a> {
    match owner::submit(work::WorkKind::EvalString(work::EvalStringWork { source })) {
        work::WorkResult::EvalString(value) => (atoms::ok(), value.encode(env)).encode(env),
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::TaggedError(tag, message) => {
            let tag_atom = match tag {
                "r_error" => atoms::r_error(),
                "parse_error" => atoms::parse_error(),
                "unsupported_r_value" => atoms::unsupported_r_value(),
                _ => atoms::error(),
            };

            (atoms::error(), (tag_atom, message)).encode(env)
        }
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (
            atoms::error(),
            format!("unexpected eval_string result: {other:?}"),
        )
            .encode(env),
    }
}

#[rustler::nif(schedule = "DirtyCpu")]
fn eval<'a>(
    env: Env<'a>,
    source: String,
    globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
) -> Result<Term<'a>, rustler::Error> {
    if globals
        .iter()
        .any(|(name, _resource)| name.as_bytes().contains(&0))
    {
        return Err(rustler::Error::BadArg);
    }

    let result = match owner::submit(work::WorkKind::Eval(work::EvalWork { source, globals })) {
        work::WorkResult::EvalSuccess(success) => crate::terms::encode_eval_success(env, success),
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::StructuredError(error) => {
            crate::terms::encode_structured_error(env, error)
        }
        work::WorkResult::TaggedError("parse_error", message) => {
            (atoms::error(), (atoms::parse_error(), message)).encode(env)
        }
        work::WorkResult::TaggedError("r_error", message) => {
            (atoms::error(), (atoms::r_error(), message)).encode(env)
        }
        work::WorkResult::TaggedError(_tag, message) => (atoms::error(), message).encode(env),
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (atoms::error(), format!("unexpected eval result: {other:?}")).encode(env),
    };

    Ok(result)
}

#[rustler::nif(schedule = "DirtyCpu")]
fn print<'a>(
    env: Env<'a>,
    resource: rustler::ResourceArc<crate::resource::RxResource>,
    width: Option<i32>,
    max_print: Option<i32>,
) -> Term<'a> {
    match owner::submit(work::WorkKind::Print(work::PrintWork {
        resource,
        width,
        max_print,
    })) {
        work::WorkResult::PrintSuccess(output) => {
            (atoms::ok(), crate::terms::encode_output_map(env, &output)).encode(env)
        }
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::StructuredError(error) => {
            crate::terms::encode_structured_error(env, error)
        }
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (
            atoms::error(),
            format!("unexpected print result: {other:?}"),
        )
            .encode(env),
    }
}

#[rustler::nif(schedule = "DirtyIo")]
fn plot<'a>(
    env: Env<'a>,
    source: String,
    globals: Vec<(String, rustler::ResourceArc<crate::resource::RxResource>)>,
    width: i32,
    height: i32,
    res: i32,
    pointsize: i32,
    max_pages: i32,
    max_bytes: i32,
) -> Result<Term<'a>, rustler::Error> {
    if globals
        .iter()
        .any(|(name, _resource)| name.as_bytes().contains(&0))
    {
        return Err(rustler::Error::BadArg);
    }

    let result = match owner::submit(work::WorkKind::Plot(work::PlotWork {
        source,
        globals,
        width,
        height,
        res,
        pointsize,
        max_pages,
        max_bytes,
    })) {
        work::WorkResult::PlotSuccess(success) => {
            match crate::terms::encode_plot_success(env, success) {
                Ok(term) => term,
                Err(message) => (atoms::error(), message).encode(env),
            }
        }
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::StructuredError(error) => {
            crate::terms::encode_structured_error(env, error)
        }
        work::WorkResult::TaggedError("parse_error", message) => {
            (atoms::error(), (atoms::parse_error(), message)).encode(env)
        }
        work::WorkResult::TaggedError("r_error", message) => {
            (atoms::error(), (atoms::r_error(), message)).encode(env)
        }
        work::WorkResult::TaggedError(_tag, message) => (atoms::error(), message).encode(env),
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (atoms::error(), format!("unexpected plot result: {other:?}")).encode(env),
    };

    Ok(result)
}

#[rustler::nif(schedule = "DirtyCpu")]
fn encode<'a>(env: Env<'a>, kind: Term<'a>, value: Term<'a>) -> Result<Term<'a>, rustler::Error> {
    let parsed = crate::codec::parse_encode_value(kind, value)?;

    match owner::submit(work::WorkKind::EncodeResource(work::EncodeResourceWork {
        value: parsed,
    })) {
        work::WorkResult::Resource(resource) => Ok((atoms::ok(), resource).encode(env)),
        work::WorkResult::NativeInitFailed(failure) => Ok(encode_native_init_failed(env, &failure)),
        work::WorkResult::Error(message) => Ok((atoms::error(), message).encode(env)),
        other => Ok((
            atoms::error(),
            format!("unexpected encode result: {other:?}"),
        )
            .encode(env)),
    }
}

#[rustler::nif(schedule = "DirtyCpu")]
fn decode<'a>(
    env: Env<'a>,
    resource: rustler::ResourceArc<crate::resource::RxResource>,
) -> Term<'a> {
    match owner::submit(work::WorkKind::DecodeResource(work::DecodeResourceWork {
        resource,
    })) {
        work::WorkResult::Decoded(value) => (atoms::ok(), value.encode(env)).encode(env),
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::TaggedError("unsupported_r_value", message) => {
            (atoms::error(), (atoms::unsupported_r_value(), message)).encode(env)
        }
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (
            atoms::error(),
            format!("unexpected decode result: {other:?}"),
        )
            .encode(env),
    }
}

#[rustler::nif(schedule = "DirtyCpu")]
fn encode_dataframe<'a>(env: Env<'a>, ipc: rustler::Binary<'a>) -> Term<'a> {
    match owner::submit(work::WorkKind::EncodeDataframe(work::EncodeDataframeWork {
        ipc_bytes: ipc.as_slice().to_vec(),
    })) {
        work::WorkResult::Resource(resource) => (atoms::ok(), resource).encode(env),
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::TaggedError("missing_r_package", message) => {
            (atoms::error(), (atoms::missing_r_package(), message)).encode(env)
        }
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (
            atoms::error(),
            format!("unexpected encode_dataframe result: {other:?}"),
        )
            .encode(env),
    }
}

#[rustler::nif(schedule = "DirtyCpu")]
fn encode_data_frame<'a>(env: Env<'a>, wire: Term<'a>) -> Result<Term<'a>, rustler::Error> {
    let parsed = crate::data_frame::parse_wire(wire)?;

    let result = match owner::submit(work::WorkKind::EncodeDataFrame(work::EncodeDataFrameWork {
        wire: parsed,
    })) {
        work::WorkResult::Resource(resource) => (atoms::ok(), resource).encode(env),
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::TaggedError(tag, message) => {
            crate::data_frame::encode_tagged_error(env, tag, &message)
        }
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (
            atoms::error(),
            format!("unexpected encode_data_frame result: {other:?}"),
        )
            .encode(env),
    };

    Ok(result)
}

#[rustler::nif(schedule = "DirtyCpu")]
fn decode_data_frame<'a>(
    env: Env<'a>,
    resource: rustler::ResourceArc<crate::resource::RxResource>,
    opts: Term<'a>,
) -> Result<Term<'a>, rustler::Error> {
    let max_rows = crate::data_frame::parse_max_rows(opts)?;

    let result = match owner::submit(work::WorkKind::DecodeDataFrame(work::DecodeDataFrameWork {
        resource,
        max_rows,
    })) {
        work::WorkResult::DataFrameWire(wire) => {
            (atoms::ok(), crate::data_frame::encode_wire(env, &wire)).encode(env)
        }
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::TaggedError(tag, message) => {
            crate::data_frame::encode_tagged_error(env, tag, &message)
        }
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (
            atoms::error(),
            format!("unexpected decode_data_frame result: {other:?}"),
        )
            .encode(env),
    };

    Ok(result)
}

#[rustler::nif(schedule = "DirtyCpu")]
fn decode_arrow<'a>(
    env: Env<'a>,
    resource: rustler::ResourceArc<crate::resource::RxResource>,
) -> Term<'a> {
    match owner::submit(work::WorkKind::DecodeArrow(work::DecodeArrowWork {
        resource,
    })) {
        work::WorkResult::ArrowBytes(bytes) => {
            (atoms::ok(), crate::terms::binary(env, &bytes)).encode(env)
        }
        work::WorkResult::NativeInitFailed(failure) => encode_native_init_failed(env, &failure),
        work::WorkResult::TaggedError("missing_r_package", message) => {
            (atoms::error(), (atoms::missing_r_package(), message)).encode(env)
        }
        work::WorkResult::TaggedError("not_dataframe", message) => {
            (atoms::error(), (atoms::not_dataframe(), message)).encode(env)
        }
        work::WorkResult::Error(message) => (atoms::error(), message).encode(env),
        other => (
            atoms::error(),
            format!("unexpected decode_arrow result: {other:?}"),
        )
            .encode(env),
    }
}

/// Returns the value reported by `owner::owner_thread_id()`.
///
/// NOTE(review): presumably the id of the thread that owns the embedded
/// runtime — confirm against the `owner` module.
#[rustler::nif]
fn owner_thread_id() -> u64 {
    owner::owner_thread_id()
}

/// Returns the value reported by `owner::release_count()`.
///
/// NOTE(review): presumably the number of resources released so far —
/// confirm against the `owner` module.
#[rustler::nif]
fn release_count() -> u64 {
    owner::release_count()
}

/// Returns the value reported by `owner::last_release_thread_id()`.
///
/// NOTE(review): presumably the id of the thread that performed the most
/// recent release — confirm against the `owner` module.
#[rustler::nif]
fn last_release_thread_id() -> u64 {
    owner::last_release_thread_id()
}

#[rustler::nif(schedule = "DirtyIo")]
fn diagnostics<'a>(env: Env<'a>) -> Term<'a> {
    match owner::diagnostics() {
        Ok(diagnostics) => crate::terms::encode_native_diagnostics(env, diagnostics),
        Err(message) => (atoms::error(), message).encode(env),
    }
}

fn encode_native_init_failed<'a>(env: Env<'a>, failure: &work::InitFailure) -> Term<'a> {
    (
        atoms::error(),
        (
            atoms::native_init_failed(),
            crate::terms::encode_init_failure(env, failure),
        ),
    )
        .encode(env)
}

/// Load callback passed to `rustler::init!`.
///
/// Registers the `RxResource` resource type with `env` and returns `true`
/// when registration succeeded, `false` otherwise. The `_info` term is
/// ignored.
fn load(env: rustler::Env, _info: rustler::Term) -> bool {
    let registration = env.register::<resource::RxResource>();
    registration.is_ok()
}

// Declares the NIF library for the `Elixir.Rx.Native` module and wires `load`
// in as its load callback.
rustler::init!("Elixir.Rx.Native", load = load);