c_src/lz4_nif.c

/*
 * lz4_nif — Erlang NIF wrapper around the LZ4 block-format codec.
 *
 * API:
 *   lz4_nif:compress(Binary)             -> {ok, Binary} | {error, atom()}.
 *   lz4_nif:compress(Binary, Options)    -> {ok, Binary} | {error, atom()}.
 *   lz4_nif:uncompress(Binary, OrigSize) -> {ok, Binary} | {error, atom()}.
 *
 * Implementation notes:
 * - Plain C NIF written directly against erl_nif.h (no rustler or other
 *   binding layer). Same approach as knot — for small/fast operations
 *   the rustler runtime layer dominates the cost.
 *   For lz4 the work itself is real, but the boundary is on the same
 *   hot paths (Cassandra frame encode/decode in marina), so we still
 *   want minimal dispatch overhead.
 * - Calls whose work exceeds LZ4_NIF_DIRTY_THRESHOLD bytes (default 20 KB)
 *   — measured as the input size for compress and as the declared OrigSize
 *   for uncompress — are dispatched to the dirty CPU scheduler so they
 *   don't block normal schedulers. The threshold matches torque's
 *   "dispatch on >20 KB" convention.
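 * - Output binaries are allocated via enif_alloc_binary. compress
 *   allocates the LZ4_compressBound worst case and shrinks it to the
 *   actual compressed size before returning; uncompress allocates exactly
 *   OrigSize bytes and fails unless the block decodes to exactly that size.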
 *
 * License: MIT. The vendored LZ4 library in c_src/lz4/ is BSD-2-Clause,
 * copyright Yann Collet — see LICENSE.
 */

#include <erl_nif.h>
#include <limits.h>
#include <string.h>

#include "lz4/lz4.h"

/* Tunables. Each one can be overridden at build time, e.g.
   -DLZ4_NIF_DIRTY_THRESHOLD=65536; otherwise the defaults below apply. */

/* Inputs above this are handed to a dirty CPU scheduler. Matches torque's
   convention (20 KB). Smaller inputs run inline and report their cost
   via enif_consume_timeslice instead. */
#ifndef LZ4_NIF_DIRTY_THRESHOLD
#define LZ4_NIF_DIRTY_THRESHOLD (20 * 1024)
#endif

/* Cost model for the inline path's reduction accounting.
 *
 * BYTES_PER_REDUCTION estimates how many input bytes LZ4 processes per
 * BEAM reduction. LZ4_compress_default is ~500 MB/s on a modern x86
 * core; with a default timeslice of ~1ms = 4000 reductions that's
 * roughly 125 bytes/reduction. We round down to 100 to overstate the
 * cost slightly (more conservative scheduling, fairer to other
 * processes). Decompression is faster (~2 GB/s) but reuses the same
 * constant to keep things simple -- the upper bound on slack is
 * acceptable.
 *
 * REDUCTION_COUNT is the BEAM timeslice budget (4000 reductions on
 * current OTP releases). Matches torque's REDUCTION_COUNT. */
#ifndef LZ4_NIF_BYTES_PER_REDUCTION
#define LZ4_NIF_BYTES_PER_REDUCTION 100
#endif
#ifndef LZ4_NIF_REDUCTION_COUNT
#define LZ4_NIF_REDUCTION_COUNT     4000
#endif

/* Now that the values are overridable, reject settings that would break
   the arithmetic that uses them: both cost constants are divisors, and a
   negative threshold would wrap to a huge value when compared against
   unsigned sizes (silently disabling dirty dispatch). */
#if (LZ4_NIF_BYTES_PER_REDUCTION) <= 0 || (LZ4_NIF_REDUCTION_COUNT) <= 0
#error "LZ4_NIF_BYTES_PER_REDUCTION and LZ4_NIF_REDUCTION_COUNT must be positive"
#endif
#if (LZ4_NIF_DIRTY_THRESHOLD) < 0
#error "LZ4_NIF_DIRTY_THRESHOLD must not be negative"
#endif

/* Convert a byte count into the percentage argument that
   enif_consume_timeslice expects. The estimate is bytes divided by
   LZ4_NIF_BYTES_PER_REDUCTION, expressed as a share of
   LZ4_NIF_REDUCTION_COUNT and clamped to the closed range [1, 100].
   Mirrors torque's timeslice_percent helper. */
static inline int
timeslice_percent(size_t bytes) {
    const size_t est_reductions = bytes / LZ4_NIF_BYTES_PER_REDUCTION;
    const long share =
        (long)((est_reductions * 100) / LZ4_NIF_REDUCTION_COUNT);

    if (share > 100) {
        return 100;
    }
    return share < 1 ? 1 : (int)share;
}

/* ---- atoms cached at load time -------------------------------------- */

static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_error;
static ERL_NIF_TERM atom_alloc_failed;
static ERL_NIF_TERM atom_compress_failed;
static ERL_NIF_TERM atom_decompress_failed;
static ERL_NIF_TERM atom_badarg;

/* Library load callback.
 *
 * Interns every atom the NIFs return into the globals above, so the call
 * paths only reuse cached terms. No private data is stored and load_info
 * is ignored. Always returns 0 (success). */
static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
    /* (global slot, atom text) pairs, interned in this order. */
    static const struct {
        ERL_NIF_TERM *slot;
        const char   *text;
    } atom_table[] = {
        { &atom_ok,                "ok" },
        { &atom_error,             "error" },
        { &atom_alloc_failed,      "alloc_failed" },
        { &atom_compress_failed,   "compress_failed" },
        { &atom_decompress_failed, "decompress_failed" },
        { &atom_badarg,            "badarg" },
    };

    (void)priv_data;
    (void)load_info;

    for (size_t i = 0; i < sizeof atom_table / sizeof atom_table[0]; i++) {
        *atom_table[i].slot = enif_make_atom(env, atom_table[i].text);
    }
    return 0;
}

/* ---- helpers --------------------------------------------------------- */

/* Wrap a reason term as the two-element {error, Reason} tuple that every
   failure path in this module returns. */
static inline ERL_NIF_TERM
make_error(ErlNifEnv *env, ERL_NIF_TERM reason) {
    ERL_NIF_TERM elems[2];

    elems[0] = atom_error;
    elems[1] = reason;
    return enif_make_tuple_from_array(env, elems, 2);
}

/* ---- compress -------------------------------------------------------- */

/* compress/1,2 worker: encode argv[0] as a single raw LZ4 block.
 *
 * Returns {ok, Compressed} on success, or
 *   {error, badarg}          - argv[0] is not a binary, is larger than
 *                              LZ4_MAX_INPUT_SIZE, or LZ4_compressBound
 *                              rejected its size;
 *   {error, alloc_failed}    - the output binary could not be allocated,
 *                              or could not be shrunk to the final size;
 *   {error, compress_failed} - LZ4_compress_default reported failure.
 *
 * argc is unused, and any arguments after argv[0] (such as compress/2's
 * options) are ignored. */
static ERL_NIF_TERM
compress_impl(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ErlNifBinary src;
    ErlNifBinary dst;
    ERL_NIF_TERM reason;

    (void)argc;

    /* LZ4 takes int lengths. Rejecting anything above LZ4_MAX_INPUT_SIZE
       up front keeps the (int) casts below lossless. */
    if (!enif_inspect_binary(env, argv[0], &src)
        || src.size > (size_t)LZ4_MAX_INPUT_SIZE) {
        return make_error(env, atom_badarg);
    }

    const int capacity = LZ4_compressBound((int)src.size);
    if (capacity <= 0) {
        return make_error(env, atom_badarg);
    }

    /* Allocate the worst case; the buffer is trimmed to the real output
       length once LZ4 reports it. */
    if (!enif_alloc_binary((size_t)capacity, &dst)) {
        return make_error(env, atom_alloc_failed);
    }

    const int produced = LZ4_compress_default((const char *)src.data,
                                              (char *)dst.data,
                                              (int)src.size,
                                              capacity);
    if (produced <= 0) {
        reason = atom_compress_failed;
        goto release_and_fail;
    }
    if (!enif_realloc_binary(&dst, (size_t)produced)) {
        reason = atom_alloc_failed;
        goto release_and_fail;
    }

    enif_consume_timeslice(env, timeslice_percent(src.size));
    return enif_make_tuple2(env, atom_ok, enif_make_binary(env, &dst));

release_and_fail:
    /* dst is still owned by us on both failure paths. */
    enif_release_binary(&dst);
    return make_error(env, reason);
}

/* compress/1 entry point, which compress/2 also forwards to.
   Checks that argv[0] is a binary. Inputs of at most
   LZ4_NIF_DIRTY_THRESHOLD bytes are compressed inline; larger ones are
   rescheduled onto a dirty CPU scheduler via enif_schedule_nif. */
static ERL_NIF_TERM
nif_compress_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ErlNifBinary probe;

    if (!enif_inspect_binary(env, argv[0], &probe)) {
        return make_error(env, atom_badarg);
    }

    if (probe.size <= LZ4_NIF_DIRTY_THRESHOLD) {
        return compress_impl(env, argc, argv);
    }

    return enif_schedule_nif(env, "compress_dirty",
                             ERL_NIF_DIRTY_JOB_CPU_BOUND,
                             compress_impl, argc, argv);
}

/* compress/2 — compress/1 plus an options list. It exists for API parity
   with szktty/erlang-lz4, so existing callers can migrate without
   rewriting.

   Only the shape of Options is checked: anything that is not a list
   yields {error, badarg}. The list's contents are accepted and ignored
   for forward compatibility; they are reserved for future options such
   as `hc` (high-compression mode). argv is forwarded unchanged to the
   compress/1 path. */
static ERL_NIF_TERM
nif_compress_2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    const int options_are_list = enif_is_list(env, argv[1]);

    return options_are_list ? nif_compress_1(env, argc, argv)
                            : make_error(env, atom_badarg);
}

/* ---- uncompress ------------------------------------------------------ */

/* uncompress/2 worker: decode argv[0] as a single raw LZ4 block whose
 * decompressed length must be exactly argv[1] (OrigSize) bytes.
 *
 * Returns {ok, Binary} of exactly OrigSize bytes, or
 *   {error, badarg}            - argv[0] is not a binary or is longer
 *                                than INT_MAX bytes; argv[1] is not an
 *                                integer enif_get_ulong accepts; or
 *                                OrigSize exceeds LZ4_MAX_INPUT_SIZE;
 *   {error, alloc_failed}      - the OrigSize-byte output binary could
 *                                not be allocated;
 *   {error, decompress_failed} - the input is malformed, or it decoded
 *                                to a length other than OrigSize.
 *
 * The output buffer is allocated from the caller-supplied OrigSize before
 * any decoding happens, so OrigSize alone determines that allocation. */
static ERL_NIF_TERM
uncompress_impl(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    (void)argc;
    ErlNifBinary in;
    unsigned long orig_size;

    if (!enif_inspect_binary(env, argv[0], &in)) {
        return make_error(env, atom_badarg);
    }
    /* LZ4_decompress_safe takes the compressed length as an int. Without
       this check, a binary longer than INT_MAX would be silently truncated
       by the (int) cast below, possibly to a negative length, instead of
       being rejected. */
    if (in.size > (size_t)INT_MAX) {
        return make_error(env, atom_badarg);
    }
    if (!enif_get_ulong(env, argv[1], &orig_size)) {
        return make_error(env, atom_badarg);
    }
    /* Also keeps the (int) cast of orig_size below lossless. */
    if (orig_size > (unsigned long)LZ4_MAX_INPUT_SIZE) {
        return make_error(env, atom_badarg);
    }

    ErlNifBinary out;
    if (!enif_alloc_binary((size_t)orig_size, &out)) {
        return make_error(env, atom_alloc_failed);
    }

    int written = LZ4_decompress_safe(
        (const char *)in.data,
        (char *)out.data,
        (int)in.size,
        (int)orig_size);

    /* A negative result means malformed input. A short result means the
       block is valid but does not match the size the caller declared. */
    if (written < 0 || (unsigned long)written != orig_size) {
        enif_release_binary(&out);
        return make_error(env, atom_decompress_failed);
    }

    enif_consume_timeslice(env, timeslice_percent((size_t)orig_size));
    return enif_make_tuple2(env, atom_ok, enif_make_binary(env, &out));
}

/* uncompress/2 entry point.
   Checks that argv[0] is a binary and argv[1] is an integer that
   enif_get_ulong accepts. The dispatch decision uses the declared output
   size, not the compressed size: up to LZ4_NIF_DIRTY_THRESHOLD bytes the
   block is decoded inline, otherwise on a dirty CPU scheduler. */
static ERL_NIF_TERM
nif_uncompress_2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ErlNifBinary compressed;
    unsigned long declared_size;

    if (!enif_inspect_binary(env, argv[0], &compressed)
        || !enif_get_ulong(env, argv[1], &declared_size)) {
        return make_error(env, atom_badarg);
    }

    if (declared_size <= LZ4_NIF_DIRTY_THRESHOLD) {
        return uncompress_impl(env, argc, argv);
    }

    return enif_schedule_nif(env, "uncompress_dirty",
                             ERL_NIF_DIRTY_JOB_CPU_BOUND,
                             uncompress_impl, argc, argv);
}

/* ---- entry table ---------------------------------------------------- */

static ErlNifFunc nif_functions[] = {
    {"compress",   1, nif_compress_1,   0},
    {"compress",   2, nif_compress_2,   0},
    {"uncompress", 2, nif_uncompress_2, 0}
};

ERL_NIF_INIT(lz4_nif, nif_functions, load, NULL, NULL, NULL)