Skip to main content

native/ertmp/src/lib.rs

use bytes::Bytes;
use rtmp::{
    AudioChannels, AudioConfig, AudioData, RtmpAudioCodec, RtmpClient, RtmpClientConfig,
    RtmpVideoCodec, TrackId, VideoConfig, VideoData,
};
use rustler::{Atom, Binary, Env, Error, NifResult, ResourceArc, Term};
use std::sync::Mutex;
use std::time::Duration;

mod atoms {
    rustler::atoms! {
        ok,
        error,
        h264,
        vp8,
        vp9,
        aac,
        opus,
        mono,
        stereo,
    }
}

pub struct ClientResource(Mutex<Option<RtmpClient>>);

#[allow(non_local_definitions)]
fn on_load(env: Env, _: Term) -> bool {
    rustler::resource!(ClientResource, env)
}

rustler::init!("Elixir.Membrane.ERTMP.Native", load = on_load);

#[rustler::nif(schedule = "DirtyIo")]
fn connect(
    host: String,
    port: u16,
    app: String,
    stream_key: String,
    use_tls: bool,
) -> NifResult<ResourceArc<ClientResource>> {
    let config = RtmpClientConfig::new(host, app, stream_key)
        .with_port(port)
        .with_tls(use_tls);
    RtmpClient::connect(config)
        .map(|client| ResourceArc::new(ClientResource(Mutex::new(Some(client)))))
        .map_err(|e| Error::Term(Box::new(e.to_string())))
}

#[rustler::nif(schedule = "DirtyIo")]
fn send_video_config(
    resource: ResourceArc<ClientResource>,
    track_id: u8,
    codec: Atom,
    data: Binary,
) -> NifResult<Atom> {
    let codec = parse_video_codec(codec)?;
    let event = VideoConfig {
        track_id: TrackId(track_id),
        codec,
        data: Bytes::copy_from_slice(data.as_slice()),
    };
    with_client(&resource, |c| c.send(event))
}

#[rustler::nif(schedule = "DirtyIo")]
fn send_video(
    resource: ResourceArc<ClientResource>,
    track_id: u8,
    codec: Atom,
    pts_ns: u64,
    dts_ns: u64,
    data: Binary,
    is_keyframe: bool,
) -> NifResult<Atom> {
    let codec = parse_video_codec(codec)?;
    let event = VideoData {
        track_id: TrackId(track_id),
        codec,
        pts: Duration::from_nanos(pts_ns),
        dts: Duration::from_nanos(dts_ns),
        data: Bytes::copy_from_slice(data.as_slice()),
        is_keyframe,
    };
    with_client(&resource, |c| c.send(event))
}

#[rustler::nif(schedule = "DirtyIo")]
fn send_audio_config(
    resource: ResourceArc<ClientResource>,
    track_id: u8,
    codec: Atom,
    data: Binary,
    channels: Atom,
) -> NifResult<Atom> {
    let codec = parse_audio_codec(codec)?;
    let channels = parse_audio_channels(channels)?;
    let event = AudioConfig {
        track_id: TrackId(track_id),
        codec,
        data: Bytes::copy_from_slice(data.as_slice()),
        channels,
    };
    with_client(&resource, |c| c.send(event))
}

#[rustler::nif(schedule = "DirtyIo")]
fn send_audio(
    resource: ResourceArc<ClientResource>,
    track_id: u8,
    codec: Atom,
    pts_ns: u64,
    data: Binary,
) -> NifResult<Atom> {
    let codec = parse_audio_codec(codec)?;
    let event = AudioData {
        track_id: TrackId(track_id),
        codec,
        pts: Duration::from_nanos(pts_ns),
        data: Bytes::copy_from_slice(data.as_slice()),
    };
    with_client(&resource, |c| c.send(event))
}

fn with_client<F>(resource: &ResourceArc<ClientResource>, f: F) -> NifResult<Atom>
where
    F: FnOnce(&mut RtmpClient) -> Result<(), rtmp::RtmpStreamError>,
{
    let mut guard = resource.0.lock().map_err(|_| Error::BadArg)?;
    let client = guard.as_mut().ok_or(Error::BadArg)?;
    f(client).map_err(|e| Error::Term(Box::new(e.to_string())))?;
    Ok(atoms::ok())
}

fn parse_video_codec(atom: Atom) -> NifResult<RtmpVideoCodec> {
    if atom == atoms::h264() {
        Ok(RtmpVideoCodec::H264)
    } else if atom == atoms::vp8() {
        Ok(RtmpVideoCodec::Vp8)
    } else if atom == atoms::vp9() {
        Ok(RtmpVideoCodec::Vp9)
    } else {
        Err(Error::BadArg)
    }
}

fn parse_audio_codec(atom: Atom) -> NifResult<RtmpAudioCodec> {
    if atom == atoms::aac() {
        Ok(RtmpAudioCodec::Aac)
    } else if atom == atoms::opus() {
        Ok(RtmpAudioCodec::Opus)
    } else {
        Err(Error::BadArg)
    }
}

fn parse_audio_channels(atom: Atom) -> NifResult<AudioChannels> {
    if atom == atoms::mono() {
        Ok(AudioChannels::Mono)
    } else if atom == atoms::stereo() {
        Ok(AudioChannels::Stereo)
    } else {
        Err(Error::BadArg)
    }
}