// native/exmpeg_native/src/progress.rs

//! Throttled progress reporting from inside a long-running NIF call.
//!
//! Each operation creates a `ProgressEmitter` once; the hot loops call
//! `tick(stats)` after every meaningful step. The emitter coalesces
//! ticks so the caller receives at most one message every
//! `MIN_INTERVAL` (a fixed 100 ms). The final tick at end-of-stream is
//! sent unconditionally via `finish` so the caller sees the closing
//! counts.
//!
//! Implementation note: `Env::send` from inside a NIF requires the
//! calling NIF's env (a process-bound, scheduler-thread-managed env).
//! `OwnedEnv::send_and_clear` panics on managed threads. We capture the
//! raw `NIF_ENV` pointer from the entry-point env at construction time
//! and reconstruct an `Env<'_>` for each emission. This is sound: the
//! pointer is valid for the duration of the NIF call (Rustler
//! guarantees this), and the emitter cannot outlive the call because
//! the NIF function consumes ownership of it before returning.

use std::time::{Duration, Instant};

use rsmpeg::ffi;
use rustler::types::LocalPid;
use rustler::wrapper::NIF_ENV;
use rustler::{Encoder, Env, NifMap};

/// Atoms used by this module. `exmpeg_progress` is the tag placed in
/// the first element of every progress message tuple.
mod atoms {
    rustler::atoms! {
        exmpeg_progress,
    }
}

/// Minimum spacing between two throttled progress messages. `tick`
/// drops any update that arrives sooner than this after the previous
/// emission; `finish` ignores it.
const MIN_INTERVAL: Duration = Duration::from_millis(100);

/// Snapshot sent to the caller. Field naming mirrors the per-operation
/// stats so subscribers can render the same UI for any op.
///
/// Derives `NifMap`, so the value is encoded as a map keyed by the
/// field names below; renaming a field changes the message shape seen
/// by subscribers. It is delivered as the second element of an
/// `{exmpeg_progress, update}` tuple.
#[derive(Debug, Clone, NifMap)]
pub(crate) struct ProgressUpdate {
    /// What this NIF is doing - `"probe"`, `"remux"`, `"transcode"`,
    /// `"extract_audio"`, `"extract_frame"`, `"concat"`.
    pub(crate) op: String,
    /// Packets handed to the muxer so far (or decoded for ops without
    /// a writable output, e.g. probe).
    pub(crate) packets_written: u64,
    /// Current output PTS in seconds. `0.0` when unknown.
    pub(crate) current_pts_s: f64,
    /// Container duration in seconds, if known up front (input.duration
    /// from the demuxer). `0.0` when unknown.
    pub(crate) total_duration_s: f64,
}

/// Throttled sender of [`ProgressUpdate`] messages for one NIF call.
///
/// Built once per operation and driven through `tick` / `finish`.
pub(crate) struct ProgressEmitter {
    /// Subscriber state. `None` when no pid was supplied at
    /// construction, in which case `tick` and `finish` do nothing.
    inner: Option<Inner>,
}

/// State held only when a subscriber pid was provided.
struct Inner {
    /// Process that receives the progress messages.
    pid: LocalPid,
    /// Raw `NIF_ENV` pointer for the calling process. Valid for the
    /// lifetime of the NIF call that constructed this emitter.
    env_ptr: NIF_ENV,
    /// Operation label copied into the `op` field of every update.
    op: &'static str,
    /// Total duration in seconds reported in every update.
    total_duration_s: f64,
    /// When the most recent message was sent; `None` until the first
    /// emission, so the first `tick` is never throttled.
    last_emit: Option<Instant>,
}

impl ProgressEmitter {
    /// Create an emitter tied to the NIF call that owns `env`.
    ///
    /// * `env` - the calling NIF's environment; its raw pointer is
    ///   captured so messages can be sent from the dirty-scheduler
    ///   thread later on.
    /// * `pid` - the BEAM pid that receives messages. With `None` the
    ///   emitter holds no state and every method is a no-op.
    /// * `op` - operation label attached to every update.
    /// * `total_duration_s` - total duration attached to every update.
    pub(crate) fn new(
        env: Env<'_>,
        pid: Option<LocalPid>,
        op: &'static str,
        total_duration_s: f64,
    ) -> Self {
        let env_ptr = env.as_c_arg();
        let inner = pid.map(|pid| Inner {
            pid,
            env_ptr,
            op,
            total_duration_s,
            last_emit: None,
        });
        Self { inner }
    }

    /// Like [`ProgressEmitter::new`], but takes the duration as a tick
    /// count in `AV_TIME_BASE` units and converts it to seconds.
    /// Zero or negative tick counts are reported as `0.0`.
    pub(crate) fn from_av_duration(
        env: Env<'_>,
        pid: Option<LocalPid>,
        op: &'static str,
        av_duration_ticks: i64,
    ) -> Self {
        let total_duration_s = match av_duration_ticks {
            ticks if ticks > 0 => ticks as f64 / f64::from(ffi::AV_TIME_BASE),
            _ => 0.0,
        };
        Self::new(env, pid, op, total_duration_s)
    }

    /// Emit a progress message, unless the previous one went out less
    /// than `MIN_INTERVAL` ago. Returns immediately without reading
    /// the clock when there is no subscriber.
    pub(crate) fn tick(&mut self, packets_written: u64, current_pts_s: f64) {
        if let Some(state) = self.inner.as_mut() {
            let now = Instant::now();
            // No previous emission means the very first tick always goes out.
            let throttled = match state.last_emit {
                Some(previous) => now.duration_since(previous) < MIN_INTERVAL,
                None => false,
            };
            if !throttled {
                state.last_emit = Some(now);
                send(state, packets_written, current_pts_s);
            }
        }
    }

    /// Emit the closing message regardless of the throttle window, so
    /// the subscriber always sees the final counters.
    pub(crate) fn finish(&mut self, packets_written: u64, current_pts_s: f64) {
        if let Some(state) = self.inner.as_mut() {
            state.last_emit = Some(Instant::now());
            send(state, packets_written, current_pts_s);
        }
    }
}

/// Encode one progress snapshot and deliver it to the subscriber.
///
/// The message is the tuple `{exmpeg_progress, update}`, where `update`
/// is a [`ProgressUpdate`]. It is sent through an `Env` rebuilt from
/// the raw pointer captured when the emitter was constructed.
///
/// Non-finite floats (NaN, ±infinity) are replaced with `0.0` - the
/// documented "unknown" value - before encoding. BEAM floats cannot
/// represent them (`enif_make_double` is documented to raise `badarg`
/// for such inputs), and an advisory progress message must never be
/// able to fail the surrounding NIF call.
fn send(inner: &Inner, packets_written: u64, current_pts_s: f64) {
    let update = ProgressUpdate {
        op: inner.op.to_owned(),
        packets_written,
        current_pts_s: finite_or_zero(current_pts_s),
        total_duration_s: finite_or_zero(inner.total_duration_s),
    };
    let env = crate::ffi_helpers::reconstruct_env(inner.env_ptr);
    // Errors here are advisory: process gone, mailbox full, etc. Never
    // block a transcode on a slow subscriber.
    let _ = env.send(&inner.pid, (atoms::exmpeg_progress(), update).encode(env));
}

/// Pass finite values through unchanged; map NaN and ±infinity to `0.0`.
fn finite_or_zero(value: f64) -> f64 {
    if value.is_finite() {
        value
    } else {
        0.0
    }
}