/*
* lz4_nif — Erlang NIF wrapper around the LZ4 block-format codec.
*
* API:
* lz4_nif:compress(Binary) -> {ok, Binary} | {error, atom()}.
* lz4_nif:compress(Binary, Options) -> {ok, Binary} | {error, atom()}.
* lz4_nif:uncompress(Binary, OrigSize) -> {ok, Binary} | {error, atom()}.
*
* Implementation notes:
* - Raw NIF (no rustler / no enif wrappers). Same approach as knot — for
* small/fast operations the rustler runtime layer dominates the cost.
* For lz4 the work itself is real, but the boundary is on the same
* hot paths (Cassandra frame encode/decode in marina), so we still
* want minimal dispatch overhead.
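* - Calls whose size exceeds LZ4_NIF_DIRTY_THRESHOLD bytes (20 KB) are
* dispatched to the dirty CPU scheduler so they don't block normal
* schedulers. The size checked is the input length for compress and
* the declared OrigSize for uncompress. The threshold matches torque's
* "dispatch on >20 KB" convention.
* - Output binaries are allocated via enif_alloc_binary. Compression
* allocates LZ4_compressBound(size) bytes and shrinks the binary to the
* actual compressed size; decompression allocates exactly OrigSize
* bytes and treats any other decoded length as an error.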
*
* License: MIT. The vendored LZ4 library in c_src/lz4/ is BSD-2-Clause,
* copyright Yann Collet — see LICENSE.
*/
#include <erl_nif.h>
#include <limits.h>
#include <string.h>
#include "lz4/lz4.h"
/* Calls whose size (compress: input length; uncompress: declared OrigSize)
   exceeds this many bytes are handed to a dirty CPU scheduler. Matches
   torque's convention (20 KB). Smaller calls run inline and report their
   cost via enif_consume_timeslice instead. */
#define LZ4_NIF_DIRTY_THRESHOLD (20 * 1024)
/* Cost model for the inline path's reduction accounting.
 *
 * BYTES_PER_REDUCTION estimates how many bytes LZ4 processes per BEAM
 * reduction. LZ4_compress_default runs at roughly 500 MB/s on a modern x86
 * core. With a timeslice of ~1 ms = 4000 reductions, that is about
 * 125 bytes/reduction. We round down to 100 to slightly overstate the cost,
 * which gives more conservative scheduling and is fairer to other processes.
 * Decompression is faster (~2 GB/s) but reuses the same constant for
 * simplicity; the resulting over-estimate is acceptable.
 *
 * REDUCTION_COUNT is the BEAM timeslice budget in reductions (4000 in
 * recent OTP releases). Matches torque's REDUCTION_COUNT. */
#define LZ4_NIF_BYTES_PER_REDUCTION 100
#define LZ4_NIF_REDUCTION_COUNT 4000
/* Map a byte count to the timeslice percentage expected by
   enif_consume_timeslice. The result is always in [1, 100] and grows
   linearly with `bytes` until it saturates. Mirrors torque's
   timeslice_percent helper.

   Saturation happens before the multiplication. The intermediate value
   therefore never exceeds 100 * REDUCTION_COUNT and is never narrowed
   through `long`, which is only 32 bits on LLP64 targets such as 64-bit
   Windows. */
static inline int
timeslice_percent(size_t bytes) {
    size_t reds = bytes / LZ4_NIF_BYTES_PER_REDUCTION;
    size_t pct;

    /* A full budget or more is reported as a whole timeslice. */
    if (reds >= LZ4_NIF_REDUCTION_COUNT) {
        return 100;
    }
    /* reds < REDUCTION_COUNT here, so pct is in [0, 99]. */
    pct = (reds * 100) / LZ4_NIF_REDUCTION_COUNT;
    /* Always charge at least 1% so that tiny calls still count. */
    return pct < 1 ? 1 : (int)pct;
}
/* ---- atoms cached at load time -------------------------------------- */
/* Interned once in load() and reused by every NIF call. */
static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_error;
static ERL_NIF_TERM atom_alloc_failed;
static ERL_NIF_TERM atom_compress_failed;
static ERL_NIF_TERM atom_decompress_failed;
static ERL_NIF_TERM atom_badarg;
/* Library load callback: fills each cached atom slot above from its
   textual name. priv_data and load_info are unused. Always returns 0,
   so loading never fails here. */
static int load(ErlNifEnv *env, void **priv_data, ERL_NIF_TERM load_info) {
    /* Slot -> atom text, interned in table order. */
    static const struct {
        ERL_NIF_TERM *slot;
        const char *text;
    } atom_table[] = {
        {&atom_ok, "ok"},
        {&atom_error, "error"},
        {&atom_alloc_failed, "alloc_failed"},
        {&atom_compress_failed, "compress_failed"},
        {&atom_decompress_failed, "decompress_failed"},
        {&atom_badarg, "badarg"},
    };

    (void)priv_data;
    (void)load_info;

    for (size_t k = 0; k < sizeof atom_table / sizeof atom_table[0]; k++) {
        *atom_table[k].slot = enif_make_atom(env, atom_table[k].text);
    }
    return 0;
}
/* ---- helpers --------------------------------------------------------- */
/* Wrap `reason` into the {error, Reason} tuple used by every failure
   path in this module. */
static inline ERL_NIF_TERM
make_error(ErlNifEnv *env, ERL_NIF_TERM reason) {
    const ERL_NIF_TERM elems[2] = {atom_error, reason};
    return enif_make_tuple_from_array(env, elems, 2);
}
/* ---- compress -------------------------------------------------------- */
/* Compress the binary in argv[0] into a single LZ4 block using
   LZ4_compress_default.

   The function is called directly for small inputs and, through
   enif_schedule_nif, as the dirty-scheduler continuation for large ones.
   It therefore re-inspects argv itself.

   Returns {ok, Compressed} or {error, Reason}, where Reason is:
     badarg          - argv[0] is not a binary, or it is larger than
                       LZ4_MAX_INPUT_SIZE;
     alloc_failed    - the output binary could not be allocated or shrunk;
     compress_failed - LZ4_compress_default reported failure. */
static ERL_NIF_TERM
compress_impl(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ErlNifBinary src;
    ErlNifBinary dst;
    ERL_NIF_TERM reason;
    int capacity;
    int produced;

    (void)argc;

    if (!enif_inspect_binary(env, argv[0], &src) ||
        src.size > (size_t)LZ4_MAX_INPUT_SIZE) {
        return make_error(env, atom_badarg);
    }

    /* Worst-case compressed size. It is positive for any in-range input,
       but it is checked anyway. */
    capacity = LZ4_compressBound((int)src.size);
    if (capacity <= 0) {
        return make_error(env, atom_badarg);
    }

    if (!enif_alloc_binary((size_t)capacity, &dst)) {
        return make_error(env, atom_alloc_failed);
    }

    produced = LZ4_compress_default((const char *)src.data,
                                    (char *)dst.data,
                                    (int)src.size,
                                    capacity);
    if (produced <= 0) {
        reason = atom_compress_failed;
        goto fail;
    }
    /* Trim the worst-case buffer down to the bytes actually written. */
    if (!enif_realloc_binary(&dst, (size_t)produced)) {
        reason = atom_alloc_failed;
        goto fail;
    }

    enif_consume_timeslice(env, timeslice_percent(src.size));
    return enif_make_tuple2(env, atom_ok, enif_make_binary(env, &dst));

fail:
    enif_release_binary(&dst);
    return make_error(env, reason);
}
/* compress/1 entry point. It rejects non-binaries up front. Otherwise it
   runs compress_impl inline when the input is at most
   LZ4_NIF_DIRTY_THRESHOLD bytes, and reschedules it onto a dirty CPU
   scheduler when the input is larger. */
static ERL_NIF_TERM
nif_compress_1(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ErlNifBinary probe;
    int go_dirty;

    if (!enif_inspect_binary(env, argv[0], &probe)) {
        return make_error(env, atom_badarg);
    }

    go_dirty = probe.size > LZ4_NIF_DIRTY_THRESHOLD;
    return go_dirty
        ? enif_schedule_nif(env, "compress_dirty",
                            ERL_NIF_DIRTY_JOB_CPU_BOUND,
                            compress_impl, argc, argv)
        : compress_impl(env, argc, argv);
}
/* compress/2 behaves like compress/1 but also accepts an options list.

   The list is only checked with enif_is_list, and its contents are
   ignored for now. The slot is reserved for future options such as `hc`
   for high-compression mode. This arity exists for API parity with
   szktty/erlang-lz4, so existing callers can migrate without rewriting.
   A non-list second argument yields {error, badarg}. */
static ERL_NIF_TERM
nif_compress_2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    const ERL_NIF_TERM options = argv[1];

    return enif_is_list(env, options)
        ? nif_compress_1(env, argc, argv)
        : make_error(env, atom_badarg);
}
/* ---- uncompress ------------------------------------------------------ */
/* Upper bound on how much one LZ4 block can expand when decoded.
   - Literal bytes are copied 1:1.
   - A match costs at least 3 input bytes (token + 2-byte offset) and
     yields at most 19 output bytes, plus at most 255 more per extra
     match-length byte.
   A valid block therefore never decodes to more than 255x its own size. */
#define LZ4_NIF_MAX_EXPANSION 255
/* Decode the LZ4 block in argv[0]. The decoded size must be exactly
   argv[1] (OrigSize) bytes.

   The function is called directly for small OrigSize values and, through
   enif_schedule_nif, as the dirty-scheduler continuation for large ones
   (see nif_uncompress_2). It therefore re-validates both arguments.

   Returns {ok, Binary} of exactly OrigSize bytes, or {error, Reason},
   where Reason is:
     badarg            - argv[0] is not a binary; argv[1] is not an integer
                         accepted by enif_get_ulong; OrigSize exceeds
                         LZ4_MAX_INPUT_SIZE; or the payload is larger than
                         INT_MAX bytes (the LZ4 API takes int sizes);
     alloc_failed      - the output binary could not be allocated;
     decompress_failed - the payload is malformed, cannot possibly decode
                         to OrigSize bytes, or decodes to another length. */
static ERL_NIF_TERM
uncompress_impl(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    (void)argc;
    ErlNifBinary in;
    unsigned long orig_size;
    if (!enif_inspect_binary(env, argv[0], &in)) {
        return make_error(env, atom_badarg);
    }
    if (!enif_get_ulong(env, argv[1], &orig_size)) {
        return make_error(env, atom_badarg);
    }
    if (orig_size > (unsigned long)LZ4_MAX_INPUT_SIZE) {
        return make_error(env, atom_badarg);
    }
    /* LZ4_decompress_safe takes the compressed size as an int. A larger
       payload would not survive the (int) cast below, and LZ4 would be
       handed a wrong length. */
    if (in.size > (size_t)INT_MAX) {
        return make_error(env, atom_badarg);
    }
    /* Reject impossible size claims before allocating. Without this check,
       a few bytes of payload paired with an OrigSize near
       LZ4_MAX_INPUT_SIZE would force a ~2 GB allocation, only to fail in
       LZ4_decompress_safe. Such input can never decode successfully, so it
       gets the same decompress_failed result. Floor division makes this
       check slightly looser than the true bound, never stricter. */
    if (orig_size / LZ4_NIF_MAX_EXPANSION > in.size) {
        return make_error(env, atom_decompress_failed);
    }
    ErlNifBinary out;
    if (!enif_alloc_binary((size_t)orig_size, &out)) {
        return make_error(env, atom_alloc_failed);
    }
    /* The bounds-checked decoder never writes past orig_size bytes. */
    int written = LZ4_decompress_safe(
        (const char *)in.data,
        (char *)out.data,
        (int)in.size,
        (int)orig_size);
    /* OrigSize is a contract: decoding to fewer bytes is also an error. */
    if (written < 0 || (unsigned long)written != orig_size) {
        enif_release_binary(&out);
        return make_error(env, atom_decompress_failed);
    }
    enif_consume_timeslice(env, timeslice_percent((size_t)orig_size));
    return enif_make_tuple2(env, atom_ok, enif_make_binary(env, &out));
}
/* uncompress/2 entry point. It rejects a non-binary payload or a
   non-integer OrigSize up front. Otherwise it decodes inline when OrigSize
   is at most LZ4_NIF_DIRTY_THRESHOLD bytes, and reschedules
   uncompress_impl onto a dirty CPU scheduler when OrigSize is larger. */
static ERL_NIF_TERM
nif_uncompress_2(ErlNifEnv *env, int argc, const ERL_NIF_TERM argv[]) {
    ErlNifBinary payload;
    unsigned long declared;
    int args_ok;

    args_ok = enif_inspect_binary(env, argv[0], &payload) &&
              enif_get_ulong(env, argv[1], &declared);
    if (!args_ok) {
        return make_error(env, atom_badarg);
    }

    /* The expected output size is the measure of the work to be done. */
    if (declared <= LZ4_NIF_DIRTY_THRESHOLD) {
        return uncompress_impl(env, argc, argv);
    }
    return enif_schedule_nif(env, "uncompress_dirty",
                             ERL_NIF_DIRTY_JOB_CPU_BOUND,
                             uncompress_impl, argc, argv);
}
/* ---- entry table ---------------------------------------------------- */
static ErlNifFunc nif_functions[] = {
{"compress", 1, nif_compress_1, 0},
{"compress", 2, nif_compress_2, 0},
{"uncompress", 2, nif_uncompress_2, 0}
};
ERL_NIF_INIT(lz4_nif, nif_functions, load, NULL, NULL, NULL)