Skip to main content

native/dcb_layer_nif/src/lib.rs

use std::sync::Once;

use dcb_layer::{AppendCondition, Event, FdbStore, Query, QueryItem, ReadOptions, Versionstamp};
use once_cell::sync::Lazy;
use rustler::types::tuple::make_tuple;
use rustler::{Binary, Encoder, Env, LocalPid, NifResult, OwnedBinary, OwnedEnv, ResourceArc, Term};
use tokio::runtime::Runtime;

mod atoms {
    rustler::atoms! {
        ok,
        error,
        // event / stored-event field keys
        type_name,
        tags,
        data,
        position,
        // query field keys
        items,
        types,
        // condition / opts field keys
        query,
        after,
        limit,
        reverse,
        // error atoms
        empty_events,
        append_condition_failed,
        invalid_query,
        missing_event_type,
        batch_too_large,
        too_many_tags,
        reserved_tag,
        all_ff_key,
        tuple_encode,
        tuple_decode,
        event_not_found,
        fdb_error,
        // nil sentinel
        nil,
        // subscription
        fdb_watch_fired,
    }
}

static RUNTIME: Lazy<Runtime> = Lazy::new(|| Runtime::new().expect("tokio runtime"));
static FDB_BOOT: Once = Once::new();

pub struct FdbStoreResource(FdbStore);

// FDB's C library is thread-safe; FdbStore wraps Database (Arc-backed) + String.
unsafe impl Send for FdbStoreResource {}
unsafe impl Sync for FdbStoreResource {}

#[allow(non_local_definitions)]
fn load(env: Env, _info: Term) -> bool {
    let _ = rustler::resource!(FdbStoreResource, env);

    // Boot the FDB network thread once per process; forget the guard so it
    // is never dropped and the network stays alive for the process lifetime.
    FDB_BOOT.call_once(|| {
        let guard = unsafe { foundationdb::boot() };
        std::mem::forget(guard);
    });

    Lazy::force(&RUNTIME);
    true
}

// ---------------------------------------------------------------------------
// Decode helpers (Elixir term → Rust type)
// ---------------------------------------------------------------------------

fn is_nil(term: Term) -> bool {
    term.is_atom() && term.atom_to_string().map(|s| s == "nil").unwrap_or(false)
}

fn decode_vs(term: Term) -> NifResult<Versionstamp> {
    let bin: Binary = term.decode()?;
    let slice = bin.as_slice();
    if slice.len() != 12 {
        return Err(rustler::Error::BadArg);
    }
    let mut vs = [0u8; 12];
    vs.copy_from_slice(slice);
    Ok(vs)
}

fn decode_opt_vs(term: Term) -> NifResult<Option<Versionstamp>> {
    if is_nil(term) { Ok(None) } else { Ok(Some(decode_vs(term)?)) }
}

fn decode_event(term: Term) -> NifResult<Event> {
    let env = term.get_env();
    let type_name: String = term.map_get(atoms::type_name().encode(env))?.decode()?;
    let tags: Vec<String> = term.map_get(atoms::tags().encode(env))?.decode()?;
    // Copy binary out of Erlang heap before returning owned Event
    let data_bin: Binary = term.map_get(atoms::data().encode(env))?.decode()?;
    let data = data_bin.as_slice().to_vec();
    Ok(Event::new(type_name, tags, data))
}

fn decode_query_item(term: Term) -> NifResult<QueryItem> {
    let env = term.get_env();
    let types: Vec<String> = term.map_get(atoms::types().encode(env))?.decode()?;
    let tags: Vec<String> = term.map_get(atoms::tags().encode(env))?.decode()?;
    Ok(QueryItem { types, tags })
}

fn decode_query(term: Term) -> NifResult<Query> {
    let env = term.get_env();
    let raw: Vec<Term> = term.map_get(atoms::items().encode(env))?.decode()?;
    let items = raw.into_iter().map(decode_query_item).collect::<NifResult<_>>()?;
    Ok(Query { items })
}

fn decode_condition(term: Term) -> NifResult<AppendCondition> {
    let env = term.get_env();
    let query = decode_query(term.map_get(atoms::query().encode(env))?)?;
    let after = decode_opt_vs(term.map_get(atoms::after().encode(env))?)?;
    Ok(AppendCondition { query, after })
}

fn decode_read_opts(term: Term) -> NifResult<ReadOptions> {
    let env = term.get_env();
    let limit: usize = term.map_get(atoms::limit().encode(env))?.decode()?;
    let after = decode_opt_vs(term.map_get(atoms::after().encode(env))?)?;
    let reverse: bool = term.map_get(atoms::reverse().encode(env))?.decode()?;
    Ok(ReadOptions { limit, after, reverse })
}

// ---------------------------------------------------------------------------
// Encode helpers (Rust type → Elixir term)
// ---------------------------------------------------------------------------

fn vs_to_term<'a>(env: Env<'a>, vs: &Versionstamp) -> Term<'a> {
    let mut bin = OwnedBinary::new(12).unwrap();
    bin.as_mut_slice().copy_from_slice(vs);
    bin.release(env).encode(env)
}

fn bytes_to_term<'a>(env: Env<'a>, data: &[u8]) -> Term<'a> {
    let mut bin = OwnedBinary::new(data.len()).unwrap();
    bin.as_mut_slice().copy_from_slice(data);
    bin.release(env).encode(env)
}

fn encode_stored_event<'a>(env: Env<'a>, se: dcb_layer::StoredEvent) -> Term<'a> {
    let tags: Vec<Term> = se.event.tags.iter().map(|t| t.encode(env)).collect();
    Term::map_new(env)
        .map_put(atoms::type_name().encode(env), se.event.type_name.encode(env)).unwrap()
        .map_put(atoms::tags().encode(env), tags.encode(env)).unwrap()
        .map_put(atoms::data().encode(env), bytes_to_term(env, &se.event.data)).unwrap()
        .map_put(atoms::position().encode(env), vs_to_term(env, &se.position)).unwrap()
}

fn encode_dcb_error<'a>(env: Env<'a>, e: dcb_layer::Error) -> Term<'a> {
    use dcb_layer::Error::*;
    match e {
        EmptyEvents => atoms::empty_events().encode(env),
        AppendConditionFailed => atoms::append_condition_failed().encode(env),
        InvalidQuery => atoms::invalid_query().encode(env),
        MissingEventType => atoms::missing_event_type().encode(env),
        BatchTooLarge => atoms::batch_too_large().encode(env),
        TooManyTags => atoms::too_many_tags().encode(env),
        ReservedTag => atoms::reserved_tag().encode(env),
        AllFfKey => atoms::all_ff_key().encode(env),
        Fdb(e) => (atoms::fdb_error(), e.code()).encode(env),
        TupleEncode(s) => (atoms::tuple_encode(), s).encode(env),
        TupleDecode(s) => (atoms::tuple_decode(), s).encode(env),
        EventNotFound(s) => (atoms::event_not_found(), s).encode(env),
    }
}

fn ok<'a>(env: Env<'a>, val: Term<'a>) -> Term<'a> {
    (atoms::ok(), val).encode(env)
}

fn err<'a>(env: Env<'a>, reason: Term<'a>) -> Term<'a> {
    (atoms::error(), reason).encode(env)
}

// ---------------------------------------------------------------------------
// NIFs
// ---------------------------------------------------------------------------

/// Open a FoundationDB-backed store.
///
/// `cluster_file` — path to the cluster file, or `nil` to use the default.
/// Returns `{:ok, store_ref}` or `{:error, {:fdb_error, code}}`.
#[rustler::nif(schedule = "DirtyIo")]
fn dcb_store_open<'a>(env: Env<'a>, cluster_file: Term<'a>, namespace: String) -> Term<'a> {
    let db = if is_nil(cluster_file) {
        foundationdb::Database::default()
    } else {
        match cluster_file.decode::<String>() {
            Ok(path) => foundationdb::Database::from_path(&path),
            Err(_) => return err(env, "invalid_cluster_file".encode(env)),
        }
    };
    match db {
        Ok(db) => ok(env, ResourceArc::new(FdbStoreResource(FdbStore::new(db, namespace))).encode(env)),
        Err(e) => err(env, (atoms::fdb_error(), e.code()).encode(env)),
    }
}

/// Append events atomically.
///
/// `events` — list of `%{type_name: binary, tags: [binary], data: binary}`.
/// `conditions` — list of `%{query: query, after: nil | <<12 bytes>>}`.
/// Returns `{:ok, <<12 bytes>>}` (versionstamp of the last event) or `{:error, reason}`.
#[rustler::nif(schedule = "DirtyIo")]
fn dcb_store_append<'a>(
    env: Env<'a>,
    store: ResourceArc<FdbStoreResource>,
    events_term: Term<'a>,
    conditions_term: Term<'a>,
) -> Term<'a> {
    let events = match events_term
        .decode::<Vec<Term>>()
        .and_then(|ts| ts.into_iter().map(decode_event).collect::<NifResult<Vec<_>>>())
    {
        Ok(v) => v,
        Err(_) => return err(env, "invalid_events".encode(env)),
    };
    let conditions = match conditions_term
        .decode::<Vec<Term>>()
        .and_then(|ts| ts.into_iter().map(decode_condition).collect::<NifResult<Vec<_>>>())
    {
        Ok(v) => v,
        Err(_) => return err(env, "invalid_conditions".encode(env)),
    };

    match RUNTIME.block_on(store.0.append(events, conditions)) {
        Ok(vs) => ok(env, vs_to_term(env, &vs)),
        Err(e) => err(env, encode_dcb_error(env, e)),
    }
}

/// Read events matching a query.
///
/// `query` — `%{items: [%{types: [binary], tags: [binary]}]}`.
/// `opts` — `%{limit: non_neg_integer, after: nil | <<12 bytes>>, reverse: boolean}`.
/// Returns `{:ok, [stored_event]}` or `{:error, reason}`.
#[rustler::nif(schedule = "DirtyIo")]
fn dcb_store_read<'a>(
    env: Env<'a>,
    store: ResourceArc<FdbStoreResource>,
    query_term: Term<'a>,
    opts_term: Term<'a>,
) -> Term<'a> {
    let query = match decode_query(query_term) {
        Ok(q) => q,
        Err(_) => return err(env, "invalid_query".encode(env)),
    };
    let opts = match decode_read_opts(opts_term) {
        Ok(o) => o,
        Err(_) => return err(env, "invalid_opts".encode(env)),
    };
    match RUNTIME.block_on(store.0.read(query, Some(opts))) {
        Ok(events) => {
            let terms: Vec<Term> = events.into_iter().map(|se| encode_stored_event(env, se)).collect();
            ok(env, terms.encode(env))
        }
        Err(e) => err(env, encode_dcb_error(env, e)),
    }
}

/// Read all events in insertion order (no index used).
///
/// Returns `{:ok, [stored_event]}` or `{:error, reason}`.
#[rustler::nif(schedule = "DirtyIo")]
fn dcb_store_read_all<'a>(env: Env<'a>, store: ResourceArc<FdbStoreResource>) -> Term<'a> {
    match RUNTIME.block_on(store.0.read_all()) {
        Ok(events) => {
            let terms: Vec<Term> = events.into_iter().map(|se| encode_stored_event(env, se)).collect();
            ok(env, terms.encode(env))
        }
        Err(e) => err(env, encode_dcb_error(env, e)),
    }
}

/// Register a one-shot watch on the namespace sentinel key.
///
/// Spawns a tokio task that parks until the sentinel changes (i.e. any append in this
/// namespace), then sends `{:fdb_watch_fired}` to `pid`. Returns `:ok` immediately.
/// If `pid` is dead when the watch fires, the message is silently discarded.
#[rustler::nif(schedule = "DirtyIo")]
fn dcb_store_watch<'a>(
    env: Env<'a>,
    store: ResourceArc<FdbStoreResource>,
    pid_term: Term<'a>,
) -> Term<'a> {
    let pid: LocalPid = match pid_term.decode() {
        Ok(p) => p,
        Err(_) => return err(env, "invalid_pid".encode(env)),
    };
    let store_clone = store.0.clone();
    RUNTIME.spawn(async move {
        store_clone.wait_for_sentinel_change().await;
        let mut msg_env = OwnedEnv::new();
        let _ = msg_env.send_and_clear(&pid, |env| {
            make_tuple(env, &[atoms::fdb_watch_fired().encode(env)])
        });
    });
    atoms::ok().encode(env)
}

/// Read the durable cursor for a named subscription.
///
/// Returns `{:ok, <<12 bytes>>}` or `{:ok, nil}` if no cursor exists yet.
#[rustler::nif(schedule = "DirtyIo")]
fn dcb_store_get_cursor<'a>(
    env: Env<'a>,
    store: ResourceArc<FdbStoreResource>,
    name: String,
) -> Term<'a> {
    match RUNTIME.block_on(store.0.get_cursor(&name)) {
        Ok(None) => ok(env, atoms::nil().encode(env)),
        Ok(Some(vs)) => ok(env, vs_to_term(env, &vs)),
        Err(e) => err(env, encode_dcb_error(env, e)),
    }
}

/// Persist the cursor for a named subscription.
///
/// `position` must be `<<12 bytes>>`. Returns `:ok` or `{:error, reason}`.
#[rustler::nif(schedule = "DirtyIo")]
fn dcb_store_set_cursor<'a>(
    env: Env<'a>,
    store: ResourceArc<FdbStoreResource>,
    name: String,
    position_term: Term<'a>,
) -> Term<'a> {
    let position = match decode_vs(position_term) {
        Ok(vs) => vs,
        Err(_) => return err(env, "invalid_position".encode(env)),
    };
    match RUNTIME.block_on(store.0.set_cursor(&name, position)) {
        Ok(()) => atoms::ok().encode(env),
        Err(e) => err(env, encode_dcb_error(env, e)),
    }
}

rustler::init!( "Elixir.Dcb.Native", load = load);