lib/membrane_rtmp_plugin/rtmp/sink/sink.ex

defmodule Membrane.RTMP.Sink do
  @moduledoc """
  Membrane element acting as the client side of an RTMP stream.
  To work successfully, it requires both audio and video streams, in AAC and H264 format respectively. Currently it supports only:
    - RTMP proper - "plain" RTMP protocol
    - RTMPS - RTMP over TLS/SSL
  Other RTMP variants - RTMPT, RTMPE, RTMFP - are not supported.
  Implementation based on FFmpeg.
  """
  use Membrane.Sink

  require Membrane.Logger

  alias __MODULE__.Native
  alias Membrane.{AAC, MP4}

  # URL prefixes accepted by `handle_init/1`; any other destination is rejected.
  @supported_protocols ["rtmp://", "rtmps://"]
  # Delay, in milliseconds, between consecutive connection attempts.
  @connection_attempt_interval 500
  # Runtime part of the element state. It is merged with the options in
  # `handle_init/1` and merged over the state again in
  # `handle_playing_to_prepared/2`, which resets every field below.
  @default_state %{
    # number of connection attempts made so far
    attempts: 0,
    # native handle returned by `Native.create/1`
    native: nil,
    # `{pad, buffer}` received while `ready` was still false
    buffered_frame: nil,
    # taken from the `ready` flag returned by the native stream initializers
    ready: false,
    # last written timestamp per pad; used to pick the pad to demand from next
    current_timestamps: %{}
  }

  # Audio input: always present, accepts `Membrane.AAC` caps, pull mode with
  # demands expressed in buffers.
  def_input_pad :audio,
    availability: :always,
    caps: AAC,
    mode: :pull,
    demand_unit: :buffers

  # Video input: always present, accepts `Membrane.MP4.Payload` caps
  # (`handle_caps/4` only matches payloads with AVC1 content), pull mode with
  # demands expressed in buffers.
  def_input_pad :video,
    availability: :always,
    caps: MP4.Payload,
    mode: :pull,
    demand_unit: :buffers

  # Element options. Both are validated in `handle_init/1`: `rtmp_url` must
  # start with one of `@supported_protocols`, and `max_attempts` must be an
  # integer greater than or equal to 1.
  def_options rtmp_url: [
                type: :string,
                spec: String.t(),
                description:
                  "Destination URL of the stream. It needs to start with rtmp:// or rtmps:// depending on the protocol variant.
                This URL should be provided by your streaming service."
              ],
              max_attempts: [
                type: :integer,
                spec: pos_integer(),
                default: 1,
                description: """
                Maximum number of connection attempts before failing with an error.
                The attempts will happen every #{@connection_attempt_interval} ms
                """
              ]

  # Validates the element options and builds the initial state.
  #
  # `options` is the options struct declared with `def_options`.
  #
  # Raises `ArgumentError` when `rtmp_url` is not a string starting with one of
  # `@supported_protocols`, or when `max_attempts` is not an integer >= 1.
  #
  # Returns `{:ok, state}`, where `state` is the options converted to a plain
  # map and merged with `@default_state`.
  @impl true
  def handle_init(options) do
    # The `is_binary/1` check keeps a non-string URL from escaping as a
    # `FunctionClauseError` raised inside `String.starts_with?/2`.
    valid_url? =
      is_binary(options.rtmp_url) and
        String.starts_with?(options.rtmp_url, @supported_protocols)

    if not valid_url? do
      raise ArgumentError, "Invalid destination URL provided"
    end

    valid_attempts? = is_integer(options.max_attempts) and options.max_attempts >= 1

    if not valid_attempts? do
      # `inspect/1` renders any term, so an invalid value such as a tuple or a
      # map cannot mask this error with a `Protocol.UndefinedError`.
      raise ArgumentError,
            "Invalid max_attempts option value: #{inspect(options.max_attempts)}"
    end

    {:ok, options |> Map.from_struct() |> Map.merge(@default_state)}
  end

  # Creates the native handle for the configured URL and schedules the first
  # connection attempt. The playback change is suspended here; it is resumed
  # by the `:try_connect` handler once the connection succeeds.
  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    {:ok, native_ref} = Native.create(state.rtmp_url)
    send(self(), :try_connect)

    suspended_state = %{state | native: native_ref}
    {{:ok, [playback_change: :suspend]}, suspended_state}
  end

  # Resets every runtime field (attempt counter, native handle, buffered frame,
  # readiness flag and timestamps) back to `@default_state`, keeping the
  # options stored in the state untouched.
  @impl true
  def handle_playing_to_prepared(_ctx, state) do
    reset_state = Map.merge(state, @default_state)
    Membrane.Logger.debug("Stream correctly closed")

    {:ok, reset_state}
  end

  # Initializes the native video stream from the caps received on `:video`.
  #
  # Only `MP4.Payload` caps carrying AVC1 content are matched; the width,
  # height and the `avcc` configuration are handed to the native layer.
  # Returns `{:ok, state}` with the updated native handle and `ready` flag, or
  # with the unchanged state when the caps were sent again.
  @impl true
  def handle_caps(
        :video,
        %MP4.Payload{content: %MP4.Payload.AVC1{avcc: avc_config}} = caps,
        _ctx,
        state
      ) do
    state.native
    |> Native.init_video_stream(caps.width, caps.height, avc_config)
    |> finish_stream_init(:video, state)
  end

  # Initializes the native audio stream from the AAC caps received on `:audio`.
  #
  # Returns `{:ok, state}` with the updated native handle and `ready` flag, or
  # with the unchanged state when the caps were sent again.
  @impl true
  def handle_caps(:audio, %Membrane.AAC{} = caps, _ctx, state) do
    profile = AAC.profile_to_aot_id(caps.profile)
    sr_index = AAC.sample_rate_to_sampling_frequency_id(caps.sample_rate)
    channel_configuration = AAC.channels_to_channel_config_id(caps.channels)
    frame_length_id = AAC.samples_per_frame_to_frame_length_id(caps.samples_per_frame)

    # 16-bit AAC configuration: 5-bit object type id, 4-bit sampling frequency
    # id, 4-bit channel configuration id, 1-bit frame length id and two zero
    # bits. NOTE(review): presumably the MPEG-4 AudioSpecificConfig layout - confirm.
    aac_config =
      <<profile::5, sr_index::4, channel_configuration::4, frame_length_id::1, 0::1, 0::1>>

    state.native
    |> Native.init_audio_stream(caps.channels, caps.sample_rate, aac_config)
    |> finish_stream_init(:audio, state)
  end

  # Shared handling of the result returned by the native stream initializers,
  # so both pads log consistently and update the state the same way.
  defp finish_stream_init({:ok, ready, native}, pad, state) do
    Membrane.Logger.debug("Correctly initialized #{pad} stream.")
    {:ok, Map.merge(state, %{native: native, ready: ready})}
  end

  defp finish_stream_init({:error, :caps_resent}, pad, state) do
    # Changing stream parameters is not supported, so the new caps are only
    # reported and the previous stream configuration stays in effect.
    Membrane.Logger.error(
      "Input caps redefined on pad #{inspect(pad)}. RTMP Sink does not support changing stream parameters"
    )

    {:ok, state}
  end

  # Buffer handling in three cases:
  #   * native streams not ready - the frame is stored and nothing is demanded,
  #   * ready with a stored frame - the stored frame is written first, then the
  #     incoming one, and the stored frame is cleared,
  #   * otherwise - the incoming frame is written directly.
  # After every write one more buffer is demanded from the pad chosen by
  # `get_demand/1`.
  @impl true
  def handle_write(pad, buffer, _ctx, %{ready: false} = state) do
    {:ok, Map.put(state, :buffered_frame, {pad, buffer})}
  end

  @impl true
  def handle_write(
        pad,
        buffer,
        _ctx,
        %{ready: true, buffered_frame: {pending_pad, pending_buffer}} = state
      ) do
    flushed = write_frame(state, pending_pad, pending_buffer)
    written = write_frame(flushed, pad, buffer)

    {{:ok, [demand: get_demand(written)]}, Map.put(written, :buffered_frame, nil)}
  end

  @impl true
  def handle_write(pad, buffer, _ctx, state) do
    written = write_frame(state, pad, buffer)
    next_demand = get_demand(written)

    {{:ok, [demand: next_demand]}, written}
  end

  # Handles end of stream on `pad`.
  #
  # When every pad has reached end of stream the native stream is finalized.
  # Otherwise the finished pad is removed from `current_timestamps`, so it is
  # never chosen for a demand again, and one buffer is demanded from the pad
  # picked by `get_demand/1`.
  @impl true
  def handle_end_of_stream(pad, ctx, state) do
    if ctx.pads |> Map.values() |> Enum.all?(& &1.end_of_stream?) do
      Native.finalize_stream(state.native)
      {:ok, state}
    else
      state = Map.update!(state, :current_timestamps, &Map.delete(&1, pad))

      if map_size(state.current_timestamps) == 0 do
        # No frame from any other pad has been written yet, so there is no
        # timestamp to compare and `get_demand/1` would raise
        # `Enum.EmptyError`. The demand made for every pad on connection (see
        # the `:try_connect` handler) has not been replaced for those pads, so
        # nothing new needs to be requested here.
        {:ok, state}
      else
        {{:ok, demand: get_demand(state)}, state}
      end
    end
  end

  # Connection loop driven by the `:try_connect` message.
  #
  # The first clause aborts once the number of attempts made reaches
  # `max_attempts`. The second clause counts the attempt and tries to connect:
  #   * on success the playback change is resumed and a demand is made on
  #     every pad,
  #   * on `:econnrefused` another attempt is scheduled after
  #     `@connection_attempt_interval` ms,
  #   * any other error raises.
  @impl true
  def handle_other(:try_connect, _ctx, %{attempts: made, max_attempts: limit} = state)
      when made >= limit do
    raise "Failed to connect to '#{state.rtmp_url}' #{made} times, aborting"
  end

  def handle_other(:try_connect, ctx, state) do
    state = Map.update!(state, :attempts, &(&1 + 1))
    connection_result = Native.try_connect(state.native)

    case connection_result do
      :ok ->
        Membrane.Logger.debug("Correctly initialized connection with: #{state.rtmp_url}")
        pad_demands = for pad_ref <- Map.keys(ctx.pads), do: {:demand, pad_ref}
        {{:ok, [playback_change: :resume] ++ pad_demands}, state}

      {:error, :econnrefused} ->
        Process.send_after(self(), :try_connect, @connection_attempt_interval)

        Membrane.Logger.warn(
          "Connection to #{state.rtmp_url} refused, retrying in #{@connection_attempt_interval}ms"
        )

        {:ok, state}

      {:error, reason} ->
        raise "Failed to connect to '#{state.rtmp_url}': #{reason}"
    end
  end

  # Writes a single frame through the native layer and records its timestamp
  # under the pad name in `current_timestamps`. Returns the updated state and
  # raises when the native write reports an error.
  #
  # Audio frames are stamped with the buffer PTS rounded up with `Ratio.ceil/1`.
  defp write_frame(state, :audio, buffer) do
    pts = Ratio.ceil(buffer.pts)

    case Native.write_audio_frame(state.native, buffer.payload, pts) do
      {:ok, native} ->
        state
        |> Map.put(:native, native)
        |> Map.update!(:current_timestamps, &Map.put(&1, :audio, pts))

      {:error, reason} ->
        raise("Writing audio frame failed with reason: #{reason}")
    end
  end

  # Video frames are stamped with the buffer DTS and carry the key-frame flag
  # read from the buffer's H264 metadata.
  defp write_frame(state, :video, buffer) do
    dts = buffer.dts
    key_frame? = buffer.metadata.h264.key_frame?

    case Native.write_video_frame(state.native, buffer.payload, dts, key_frame?) do
      {:ok, native} ->
        state
        |> Map.put(:native, native)
        |> Map.update!(:current_timestamps, &Map.put(&1, :video, dts))

      {:error, reason} ->
        raise("Writing video frame failed with reason: #{reason}")
    end
  end

  # Picks the pad with the lowest recorded timestamp and returns `{pad, 1}`,
  # i.e. a demand for one more buffer from the stream that is furthest behind.
  # Raises `Enum.EmptyError` when no timestamp has been recorded.
  defp get_demand(%{current_timestamps: timestamps}) do
    {lagging_pad, _timestamp} = Enum.min_by(timestamps, &elem(&1, 1))
    {lagging_pad, 1}
  end
end