lib/membrane_rtc_engine/endpoints/hls_endpoint.ex

if Enum.all?(
     [
       Membrane.H264.FFmpeg.Parser,
       Membrane.HTTPAdaptiveStream.SinkBin,
       Membrane.AAC.FDK.Encoder,
       Membrane.AAC.Parser
     ],
     &Code.ensure_loaded?/1
   ) do
  defmodule Membrane.RTC.Engine.Endpoint.HLS do
    @moduledoc """
    An Endpoint responsible for converting incoming tracks to HLS playlist.

    This module requires the following plugins to be present in your `mix.exs`:
    * membrane_http_adaptive_stream_plugin,
    * membrane_mp4_plugin,
    * membrane_aac_plugin,
    * membrane_aac_fdk_plugin,
    * membrane_h264_ffmpeg_plugin,
    * membrane_opus_plugin.
    """
    use Membrane.Bin
    alias Membrane.RTC.Engine
    require Membrane.Logger

    # Dynamic (`:on_request`) input pad accepting any caps. Pad instances are
    # handled in `handle_pad_added/3`, where the pad's ref id is used as the
    # track id to look the track up in `state.tracks`.
    def_input_pad :input,
      demand_unit: :buffers,
      caps: :any,
      availability: :on_request

    def_options rtc_engine: [
                  spec: pid(),
                  description: "Pid of parent Engine"
                ],
                output_directory: [
                  spec: Path.t(),
                  description: "Path to directory under which HLS output will be saved",
                  default: "hls_output"
                ],
                owner: [
                  spec: pid(),
                  description: """
                  Pid of the parent process all notifications will be sent to.

                  These notifications are:
                    * `{:playlist_playable, content_type, stream_id}`
                    * `{:cleanup, clean_function, stream_id}`
                  """
                ],
                framerate: [
                  spec: {integer(), integer()},
                  description: """
                  Framerate of tracks
                  """,
                  default: {30, 1}
                ],
                hls_mode: [
                  spec: :separate_av | :muxed_av,
                  default: :separate_av,
                  description: """
                  Defines output mode for `Membrane.HTTPAdaptiveStream.SinkBin`.

                  - `:separate_av` - audio and video tracks will be separated
                  - `:muxed_av` - audio will be attached to every video track
                  """
                ]

    @impl true
    def handle_init(opts) do
      # Values taken over unchanged from the endpoint options. `Map.fetch!/2`
      # raises `KeyError` for a missing option, like dot access would.
      configured =
        Map.new(
          [:rtc_engine, :output_directory, :owner, :framerate, :hls_mode],
          fn key -> {key, Map.fetch!(opts, key)} end
        )

      # Runtime bookkeeping: both collections start empty.
      #   * `tracks`     - map keyed by track id
      #   * `stream_ids` - set of stream ids
      bookkeeping = %{tracks: %{}, stream_ids: MapSet.new()}

      {:ok, Map.merge(configured, bookkeeping)}
    end

    @impl true
    def handle_other({:new_tracks, tracks}, ctx, state) do
      # Handles the `{:new_tracks, tracks}` message: subscribes to every track that
      # offers the `:raw` format and records successfully subscribed tracks in
      # `state.tracks` (keyed by track id). Returns `{:ok, state}`.
      #
      # Raises `RuntimeError` when subscribing fails for any reason other than
      # `:invalid_track_id`.
      {:endpoint, endpoint_id} = ctx.name

      # The state has to be threaded through the iteration (reduce, not each):
      # otherwise the updated maps are thrown away and the callback would return
      # the bare `:ok` produced by `Enum.each/2` instead of `{:ok, state}`.
      state =
        tracks
        |> Enum.filter(fn track -> :raw in track.format end)
        |> Enum.reduce(state, fn track, acc ->
          case Engine.subscribe(acc.rtc_engine, endpoint_id, track.id, :raw) do
            :ok ->
              put_in(acc, [:tracks, track.id], track)

            {:error, :invalid_track_id} ->
              Membrane.Logger.debug("""
              Couldn't subscribe to track: #{inspect(track.id)}. No such track.
              It had to be removed just after publishing it. Ignoring.
              """)

              # Nothing to remember for a track that no longer exists.
              acc

            {:error, reason} ->
              raise "Couldn't subscribe for track: #{inspect(track.id)}. Reason: #{inspect(reason)}"
          end
        end)

      {:ok, state}
    end

    # Catch-all clause: any message not matched by an earlier `handle_other/3`
    # clause is logged as a warning and dropped; the state is returned unchanged.
    @impl true
    def handle_other(msg, _ctx, state) do
      Membrane.Logger.warn("Unexpected message: #{inspect(msg)}. Ignoring.")
      {:ok, state}
    end

    @impl true
    def handle_notification(
          {:track_playable, {content_type, _track_id}},
          {:hls_sink_bin, stream_id},
          _ctx,
          state
        ) do
      # Forward the sink bin's `:track_playable` notification to the owner as
      # `{:playlist_playable, content_type, stream_id}`. This is done for every
      # content type; no filtering on the content type happens here.
      send(state.owner, {:playlist_playable, content_type, stream_id})
      {:ok, state}
    end

    @impl true
    def handle_notification(
          {:cleanup, clean_function},
          {:hls_sink_bin, stream_id},
          _ctx,
          state
        ) do
      # Forward the sink bin's `:cleanup` notification to the owner as
      # `{:cleanup, clean_function, stream_id}`. The function itself is not
      # invoked here; it is only passed on to the owner.
      send(state.owner, {:cleanup, clean_function, stream_id})
      {:ok, state}
    end

    # Catch-all clause: any notification not matched by an earlier
    # `handle_notification/4` clause is logged as a warning and ignored;
    # the state is returned unchanged.
    @impl true
    def handle_notification(notification, _element, _context, state) do
      Membrane.Logger.warn("Unexpected notification: #{inspect(notification)}. Ignoring.")
      {:ok, state}
    end

    @impl true
    def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
      # Links a newly added input pad to the HLS sink bin of the track's stream.
      #
      # The pad's ref id is the track id. The track is expected to be present in
      # `state.tracks`; `Map.fetch!/2` raises a `KeyError` naming the missing id
      # instead of failing later on a `nil` track.
      #
      # Returns `{{:ok, spec: spec}, state}` where `spec` holds the processing
      # children and links for the track and, for the first track of a stream,
      # the stream's sink bin as well.
      track = Map.fetch!(state.tracks, track_id)

      spec =
        hls_links_and_children(
          link_bin_input(pad),
          track.encoding,
          track_id,
          track.stream_id,
          state.framerate
        )

      # All tracks sharing a stream id are linked to one `{:hls_sink_bin, stream_id}`
      # child, which is spawned together with the first of those tracks.
      if MapSet.member?(state.stream_ids, track.stream_id) do
        {{:ok, spec: spec}, state}
      else
        directory = Path.join(state.output_directory, track.stream_id)

        # Remove the directory if it already exists, then recreate it. This is done
        # only when the sink bin is created: doing it for later tracks of the same
        # stream would delete the directory the existing sink bin is storing into.
        File.rm_rf(directory)
        File.mkdir_p!(directory)

        hls_sink_bin = %Membrane.HTTPAdaptiveStream.SinkBin{
          manifest_module: Membrane.HTTPAdaptiveStream.HLS,
          target_window_duration: 20 |> Membrane.Time.seconds(),
          target_segment_duration: 2 |> Membrane.Time.seconds(),
          persist?: false,
          storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
            directory: directory
          },
          hls_mode: state.hls_mode
        }

        spec = %{
          spec
          | children: Map.put(spec.children, {:hls_sink_bin, track.stream_id}, hls_sink_bin)
        }

        state = %{state | stream_ids: MapSet.put(state.stream_ids, track.stream_id)}

        {{:ok, spec: spec}, state}
      end
    end

    # Builds the spec for an `:OPUS` track: the input goes through an Opus decoder,
    # an AAC encoder and an AAC parser (all named after `track_id`) and is then
    # linked to the `{:audio, track_id}` input pad of the stream's HLS sink bin.
    # The framerate argument is not used for audio.
    defp hls_links_and_children(link_builder, :OPUS, track_id, stream_id, _framerate) do
      decoder = {:opus_decoder, track_id}
      encoder = {:aac_encoder, track_id}
      parser = {:aac_parser, track_id}

      audio_link =
        link_builder
        |> to(decoder)
        |> to(encoder)
        |> to(parser)
        |> via_in(Pad.ref(:input, {:audio, track_id}), options: [encoding: :AAC])
        |> to({:hls_sink_bin, stream_id})

      %ParentSpec{
        children: %{
          decoder => Membrane.Opus.Decoder,
          encoder => Membrane.AAC.FDK.Encoder,
          parser => %Membrane.AAC.Parser{out_encapsulation: :none}
        },
        links: [audio_link]
      }
    end

    # Builds the spec for an `:AAC` track: no extra children are spawned, the input
    # is linked straight to the `{:audio, track_id}` input pad of the stream's HLS
    # sink bin. The framerate argument is not used for audio.
    defp hls_links_and_children(link_builder, :AAC, track_id, stream_id, _framerate) do
      sink_input = Pad.ref(:input, {:audio, track_id})

      audio_link =
        link_builder
        |> via_in(sink_input, options: [encoding: :AAC])
        |> to({:hls_sink_bin, stream_id})

      %ParentSpec{children: %{}, links: [audio_link]}
    end

    # Builds the spec for an `:H264` track: the input goes through an H264 parser
    # (configured with the given framerate) and is then linked to the
    # `{:video, track_id}` input pad of the stream's HLS sink bin.
    defp hls_links_and_children(link_builder, :H264, track_id, stream_id, framerate) do
      parser_name = {:video_parser, track_id}

      parser = %Membrane.H264.FFmpeg.Parser{
        framerate: framerate,
        alignment: :au,
        attach_nalus?: true
      }

      video_link =
        link_builder
        |> to(parser_name)
        |> via_in(Pad.ref(:input, {:video, track_id}), options: [encoding: :H264])
        |> to({:hls_sink_bin, stream_id})

      %ParentSpec{
        children: %{parser_name => parser},
        links: [video_link]
      }
    end
  end
end