lib/agora/agora_source.ex

defmodule Membrane.Agora.Source do
  @moduledoc """
  Membrane source that wraps Agora's Server Gateway SDK.

  """
  use Membrane.Source

  alias Membrane.Agora.Source.Native
  alias Membrane.Buffer

  def_output_pad :video,
    accepted_format: %Membrane.H264{alignment: :au},
    flow_control: :push

  def_output_pad :audio,
    accepted_format: %Membrane.RawAudio{sample_rate: 44_100, channels: 2, sample_format: :s16le},
    flow_control: :push

  def_options app_id: [
                spec: String.t(),
                description: """
                ID of an Agora application.
                """
              ],
              channel_name: [
                spec: String.t(),
                descritpion: """
                A name of a channel to which the source should connect.
                """
              ],
              token: [
                spec: String.t(),
                descritpion: """
                A temporary token used for authorization of an access to Agora's channel.
                """
              ],
              user_id: [
                spec: String.t(),
                default: "0",
                description: """
                  User ID, must contain only numbers (0-9).

                  If set to "0" (default), the user ID will be chosen automatically.
                """
              ]

  @impl true
  def handle_init(_ctx, opts) do
    state = %{
      app_id: opts.app_id,
      token: opts.token,
      channel_name: opts.channel_name,
      user_id: opts.user_id,
      native_state: nil,
      peers_ids: MapSet.new(),
      first_video_timestamp: nil,
      first_audio_timestamp: nil
    }

    {[], state}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {:ok, native_state} =
      try do
        Native.create(state.app_id, state.token, state.channel_name, state.user_id, self())
      rescue
        _e in UndefinedFunctionError ->
          raise """
          Couldn't setup NIF. Perhaps you have forgotten to set LD_LIBRARY_PATH:
          export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:#{Path.expand("#{__ENV__.file}/../../../agora_sdk")}
          """

        other_error ->
          raise other_error
      end

    {[
       stream_format: {:video, %Membrane.H264{}},
       stream_format:
         {:audio, %Membrane.RawAudio{channels: 2, sample_rate: 44_100, sample_format: :s16le}},
       notify_parent: :agora_connected
     ], %{state | native_state: native_state}}
  end

  @impl true
  def handle_info({:agora_video_payload, payload, id}, _ctx, state) do
    {dts, state} =
      if state.first_video_timestamp do
        {Membrane.Time.os_time() - state.first_video_timestamp, state}
      else
        {0, %{state | first_video_timestamp: Membrane.Time.os_time()}}
      end

    {[buffer: {:video, %Buffer{payload: payload, metadata: %{id: inspect(id)}, dts: dts}}], state}
  end

  @impl true
  def handle_info({:agora_audio_payload, payload, id_str}, _ctx, state) do
    {dts, state} =
      if state.first_audio_timestamp do
        {Membrane.Time.os_time() - state.first_audio_timestamp, state}
      else
        {0, %{state | first_audio_timestamp: Membrane.Time.os_time()}}
      end

    pts = dts

    {[buffer: {:audio, %Buffer{payload: payload, metadata: %{id: id_str}, pts: pts, dts: dts}}],
     state}
  end

  @impl true
  def handle_info({:user_joined, id_str}, _ctx, state) do
    peers_ids = MapSet.put(state.peers_ids, id_str)
    {[], %{state | peers_ids: peers_ids}}
  end

  @impl true
  def handle_info({:user_left, id_str}, _ctx, state) do
    peers_ids = MapSet.delete(state.peers_ids, id_str)
    state = %{state | peers_ids: peers_ids}

    if MapSet.size(peers_ids) == 0 do
      {[end_of_stream: :video, end_of_stream: :audio], state}
    else
      {[], state}
    end
  end
end