lib/membrane_rtc_engine/endpoints/hls_endpoint.ex

if Enum.all?(
     [
       Membrane.H264.FFmpeg.Parser,
       Membrane.HTTPAdaptiveStream.SinkBin,
       Membrane.Opus.Decoder,
       Membrane.AAC.Parser,
       Membrane.AAC.FDK.Encoder
     ],
     &Code.ensure_loaded?/1
   ) do
  defmodule Membrane.RTC.Engine.Endpoint.HLS do
    @moduledoc """
    An Endpoint responsible for converting incoming tracks to HLS playlist.
    This module requires the following plugins to be present in your `mix.exs` for H264 & OPUS input:
    ```
    [
      :membrane_h264_ffmpeg_plugin,
      :membrane_http_adaptive_stream_plugin,
      :membrane_opus_plugin,
      :membrane_aac_plugin,
      :membrane_aac_fdk_plugin
    ]
    ```
    It can perform mixing audio and composing video (see `Membrane.RTC.Engine.Endpoint.HLS.MixerConfig`),
    in such case these plugins are also needed:
    ```
    [
      :membrane_video_compositor_plugin,
      :membrane_audio_mix_plugin,
      :membrane_generator_plugin,
      :membrane_realtimer_plugin.
      :membrane_audio_filler_plugin
    ]
    ```
    """
    use Membrane.Bin

    require Membrane.Logger

    alias Membrane.RTC.Engine

    alias Membrane.RTC.Engine.Endpoint.HLS.{
      HLSConfig,
      MixerConfig,
      StreamFormatUpdater
    }

    alias Membrane.RTC.Engine.Endpoint.WebRTC.TrackReceiver
    alias Membrane.RTC.Engine.Track
    alias Membrane.VideoCompositor.RustStructs.BaseVideoPlacement

    @compositor_deps [
      Membrane.H264.FFmpeg.Decoder,
      Membrane.H264.FFmpeg.Encoder,
      Membrane.BlankVideoGenerator,
      Membrane.Realtimer
    ]
    @audio_mixer_deps [
      Membrane.AudioMixer,
      Membrane.AAC.Parser,
      Membrane.AAC.FDK.Encoder,
      Membrane.SilenceGenerator,
      Membrane.Realtimer
    ]

    @initial_placement %BaseVideoPlacement{
      position: {0, 0},
      size: {100, 100},
      z_value: 0.0
    }

    @track_children [
      :opus_decoder,
      :aac_encoder,
      :aac_parser,
      :video_parser,
      :decoder,
      :framerate_converter,
      :track_receiver,
      :depayloader,
      :blank,
      :realtimer,
      :audio_filler,
      :stream_format_updater
    ]

    @common_children [
      :fake_source,
      :video_realtimer,
      :silence_generator,
      :audio_realtimer,
      :audio_mixer,
      :aac_encoder,
      :aac_parser,
      :compositor,
      :encoder,
      :video_parser_out,
      {:hls_sink_bin, :muxed}
    ]

    def_input_pad :input,
      demand_unit: :buffers,
      accepted_format: _any,
      availability: :on_request

    def_options rtc_engine: [
                  spec: pid(),
                  description: "Pid of parent Engine"
                ],
                owner: [
                  spec: pid(),
                  description: """
                  Pid of parent all notifications will be send to.
                  These notifications are:
                  * `{:playlist_playable, content_type}`
                  * `{:cleanup, clean_function}`
                  """
                ],
                output_directory: [
                  spec: Path.t(),
                  description: "Path to directory under which HLS output will be saved",
                  default: "hls_output"
                ],
                synchronize_tracks?: [
                  spec: boolean(),
                  default: true,
                  description: """
                  Set to false if source is different than webrtc.
                  If set to true HLS Endpoint will calculate track offset based on `handle_pad_added` call.
                  """
                ],
                mixer_config: [
                  spec: MixerConfig.t() | nil,
                  default: nil,
                  description: """
                  Audio and video mixer configuration. If you don't want to use compositor pass nil.
                  """
                ],
                hls_config: [
                  spec: HLSConfig.t(),
                  default: %HLSConfig{},
                  description: """
                  HLS stream and playlist configuration.
                  """
                ]

    @impl true
    def handle_init(_context, options) do
      state =
        options
        |> Map.from_struct()
        |> Map.merge(%{
          tracks: %{},
          stream_beginning: nil,
          video_layout_state: nil,
          video_layout_tracks_added: %{}
        })

      video_layout_state =
        if is_nil(options.mixer_config),
          do: nil,
          else:
            state.mixer_config.video.layout_module.init(options.mixer_config.video.stream_format)

      {[], %{state | video_layout_state: video_layout_state}}
    end

    @impl true
    def handle_playing(context, %{mixer_config: %{persist?: true}} = state) do
      spec =
        generate_audio_mixer(state, context) ++
          generate_compositor(state, context) ++
          get_hls_sink_spec(state)

      {[spec: spec], state}
    end

    @impl true
    def handle_playing(_context, state), do: {[], state}

    @impl true
    def handle_pad_removed(Pad.ref(:input, track_id), ctx, state) do
      track_children =
        @track_children
        |> Enum.map(&{&1, track_id})
        |> Enum.filter(&Map.has_key?(ctx.children, &1))

      {removed_track, tracks} = Map.pop!(state.tracks, track_id)
      state = %{state | tracks: tracks}

      sink_bin_used? =
        Enum.any?(tracks, fn {_id, track} ->
          track.stream_id == removed_track.stream_id
        end)

      {children_to_remove, state} =
        case {state.mixer_config, sink_bin_used?} do
          {nil, false} ->
            {[{:hls_sink_bin, removed_track.stream_id}], state}

          {%{persist?: false}, _sink_bin_used?} when map_size(tracks) == 0 ->
            {@common_children, %{state | stream_beginning: nil}}

          _else ->
            {[], state}
        end

      children_to_remove = track_children ++ children_to_remove

      {update_layout_actions, state} = compositor_update_layout(:remove, removed_track, state)

      result_actions = [remove_child: children_to_remove] ++ update_layout_actions

      state = %{
        state
        | video_layout_tracks_added: Map.delete(state.video_layout_tracks_added, track_id)
      }

      {result_actions, state}
    end

    @impl true
    def handle_pad_added(Pad.ref(:input, track_id) = pad, ctx, state) do
      {offset, state} = get_track_offset(state)

      track = Map.get(state.tracks, track_id)
      track_spec = get_track_spec(offset, bin_input(pad), track, state, ctx)

      {spec, state} =
        if hls_sink_bin_exists?(track, ctx, state) do
          {track_spec, state}
        else
          hls_sink_spec = get_hls_sink_spec(state, track.stream_id)
          {track_spec ++ hls_sink_spec, state}
        end

      {[spec: spec], state}
    end

    @impl true
    def handle_child_notification(
          :end_of_stream,
          {:hls_sink_bin, stream},
          _ctx,
          state
        ) do
      {[notify_parent: {:forward_to_parent, {:end_of_stream, stream}}], state}
    end

    @impl true
    def handle_child_notification(
          {:update_layout, stream_format},
          {:stream_format_updater, track_id} = child,
          _ctx,
          state
        ) do
      track = Map.get(state.tracks, track_id)

      action = if is_map_key(state.video_layout_tracks_added, track_id), do: :update, else: :add

      {update_layout_action, state} =
        compositor_update_layout(action, track, state, stream_format)

      result_actions = update_layout_action ++ [notify_child: {child, :layout_updated}]
      state = put_in(state, [:video_layout_tracks_added, track_id], true)

      {result_actions, state}
    end

    def handle_child_notification(
          {:track_playable, data},
          {:hls_sink_bin, stream_id},
          _ctx,
          state
        ) do
      content_type =
        case data do
          {content_type, _track_id} -> content_type
          content_type -> content_type
        end

      output_dir = get_hls_stream_directory(state, stream_id)
      send(state.owner, {:playlist_playable, content_type, output_dir})
      {[], state}
    end

    @impl true
    def handle_child_notification(notification, _element, _context, state) do
      Membrane.Logger.warn("Unexpected notification: #{inspect(notification)}. Ignoring.")
      {[], state}
    end

    @impl true
    def handle_parent_notification({:new_tracks, tracks}, ctx, state) do
      {:endpoint, endpoint_id} = ctx.name

      state =
        Enum.reduce(tracks, state, fn track, state ->
          case Engine.subscribe(state.rtc_engine, endpoint_id, track.id) do
            :ok ->
              put_in(state, [:tracks, track.id], track)

            {:error, :invalid_track_id} ->
              Membrane.Logger.debug("""
              Couldn't subscribe to the track: #{inspect(track.id)}. No such track.
              It had to be removed just after publishing it. Ignoring.
              """)

              state

            {:error, reason} ->
              raise "Couldn't subscribe to the track: #{inspect(track.id)}. Reason: #{inspect(reason)}"
          end
        end)

      {[], state}
    end

    @impl true
    def handle_parent_notification(msg, _ctx, state) do
      Membrane.Logger.warn("Unexpected message: #{inspect(msg)}. Ignoring.")
      {[], state}
    end

    defp get_hls_sink_spec(state, stream_id \\ nil) do
      directory = get_hls_stream_directory(state, stream_id)

      File.rm_rf(directory)
      File.mkdir_p!(directory)

      config =
        state.hls_config
        |> Map.update!(:storage, fn storage -> storage.(directory) end)
        |> Map.put(:mp4_parameters_in_band?, is_nil(state.mixer_config))

      hls_sink = struct(Membrane.HTTPAdaptiveStream.SinkBin, Map.from_struct(config))

      child_name =
        if is_nil(state.mixer_config),
          do: {:hls_sink_bin, stream_id},
          else: {:hls_sink_bin, :muxed}

      [child(child_name, hls_sink)]
    end

    defp get_track_spec(
           offset,
           link_builder,
           track,
           state,
           ctx
         ) do
      get_depayloading_track_spec(link_builder, track) ++
        attach_track_spec(offset, track, state) ++
        generate_blank_spec(state, ctx) ++
        generate_silence_spec(state, ctx) ++
        generate_audio_mixer(state, ctx) ++
        generate_compositor(state, ctx)
    end

    defp get_depayloading_track_spec(link_builder, track),
      do: [
        link_builder
        |> child({:track_receiver, track.id}, %TrackReceiver{
          track: track,
          initial_target_variant: :high
        })
        |> child({:depayloader, track.id}, get_depayloader(track))
      ]

    defp attach_track_spec(offset, %{type: :audio} = track, state),
      do: attach_audio_track_spec(offset, track, state)

    defp attach_track_spec(offset, %{type: :video} = track, state),
      do: attach_video_track_spec(offset, track, state)

    defp attach_audio_track_spec(_offset, track, %{mixer_config: nil} = state),
      do: [
        get_child({:depayloader, track.id})
        |> child({:opus_decoder, track.id}, Membrane.Opus.Decoder)
        |> child({:aac_encoder, track.id}, Membrane.AAC.FDK.Encoder)
        |> child({:aac_parser, track.id}, %Membrane.AAC.Parser{out_encapsulation: :none})
        |> via_in(Pad.ref(:input, {:audio, track.id}),
          options: [
            encoding: :AAC,
            segment_duration: state.hls_config.segment_duration,
            partial_segment_duration: state.hls_config.partial_segment_duration
          ]
        )
        |> get_child({:hls_sink_bin, track.stream_id})
      ]

    if Enum.all?(@audio_mixer_deps, &Code.ensure_loaded?/1) do
      defp attach_audio_track_spec(offset, track, _state),
        do: [
          get_child({:depayloader, track.id})
          |> child({:opus_decoder, track.id}, Membrane.Opus.Decoder)
          |> child({:audio_filler, track.id}, Membrane.AudioFiller)
          |> via_in(Pad.ref(:input, {:extra, track.id}), options: [offset: offset])
          |> get_child(:audio_mixer)
        ]
    else
      defp attach_audio_track_spec(_offset, _track, _state),
        do: raise_missing_deps(:audio, @audio_mixer_deps)
    end

    defp attach_video_track_spec(_offset, track, %{mixer_config: nil} = state),
      do: [
        get_child({:depayloader, track.id})
        |> child({:video_parser, track.id}, %Membrane.H264.FFmpeg.Parser{
          alignment: :au,
          attach_nalus?: true
        })
        |> via_in(Pad.ref(:input, {:video, track.id}),
          options: [
            encoding: :H264,
            segment_duration: state.hls_config.segment_duration,
            partial_segment_duration: state.hls_config.partial_segment_duration
          ]
        )
        |> get_child({:hls_sink_bin, track.stream_id})
      ]

    if Enum.all?(@compositor_deps, &Code.ensure_loaded?/1) do
      defp attach_video_track_spec(offset, track, _state),
        do: [
          get_child({:depayloader, track.id})
          |> child({:video_parser, track.id}, %Membrane.H264.FFmpeg.Parser{
            attach_nalus?: true,
            alignment: :au
          })
          |> child({:stream_format_updater, track.id}, StreamFormatUpdater)
          |> child({:decoder, track.id}, Membrane.H264.FFmpeg.Decoder)
          |> via_in(Pad.ref(:input, track.id),
            options: [initial_placement: @initial_placement, timestamp_offset: offset]
          )
          |> get_child(:compositor)
        ]
    else
      defp attach_video_track_spec(_offset, track, _state),
        do: raise_missing_deps(:video, @compositor_deps)
    end

    defp generate_silence_spec(%{mixer_config: nil}, _ctx), do: []

    if Enum.all?(@audio_mixer_deps, &Code.ensure_loaded?/1) do
      defp generate_silence_spec(_state, ctx) when is_map_key(ctx.children, :silence_generator),
        do: []

      defp generate_silence_spec(state, _ctx) do
        silence_generator =
          if is_nil(state.mixer_config.audio.background),
            do: get_silence_generator(state.mixer_config.audio.stream_format),
            else: state.mixer_config.audio.background

        [
          child(:silence_generator, silence_generator)
          |> child(:audio_realtimer, Membrane.Realtimer)
          |> get_child(:audio_mixer)
        ]
      end

      defp get_silence_generator(stream_format),
        do: %Membrane.SilenceGenerator{
          stream_format: stream_format,
          duration: :infinity,
          # Value 960 is consistent with what we get from browser also with addition of default stream_format
          # it generates total values so we don't lose data becouse of rounding error
          frames_per_buffer: 960
        }
    else
      defp generate_silence_spec(_state, _ctx), do: raise_missing_deps(:audio, @audio_mixer_deps)
    end

    defp generate_blank_spec(%{mixer_config: nil}, _ctx), do: []

    if Enum.all?(@compositor_deps, &Code.ensure_loaded?/1) do
      defp generate_blank_spec(_state, ctx) when is_map_key(ctx.children, :fake_source), do: []

      defp generate_blank_spec(state, _ctx) do
        background_generator =
          if is_nil(state.mixer_config.video.background),
            do: get_blank_generator(state.mixer_config.video),
            else: state.mixer_config.video.background

        [
          child(:fake_source, background_generator)
          |> child(:video_realtimer, Membrane.Realtimer)
          |> via_in(:input, options: [initial_placement: @initial_placement])
          |> get_child(:compositor)
        ]
      end

      defp get_blank_generator(%{stream_format: stream_format}),
        do: %Membrane.BlankVideoGenerator{
          stream_format: stream_format,
          duration: :infinity
        }
    else
      defp generate_blank_spec(_state, _ctx), do: raise_missing_deps(:video, @compositor_deps)
    end

    defp generate_compositor(_state, ctx) when is_map_key(ctx.children, :compositor), do: []

    if Enum.all?(@compositor_deps, &Code.ensure_loaded?/1) do
      defp generate_compositor(%{mixer_config: nil}, _ctx), do: []

      defp generate_compositor(state, _ctx) do
        compositor = %Membrane.VideoCompositor{
          stream_format: state.mixer_config.video.stream_format
        }

        video_parser_out = %Membrane.H264.FFmpeg.Parser{
          alignment: :au,
          attach_nalus?: true
        }

        {frames_per_second, 1} = state.mixer_config.video.stream_format.framerate
        seconds_number = Membrane.Time.as_seconds(state.hls_config.segment_duration.target)

        [
          child(:compositor, compositor)
          |> child(:encoder, %Membrane.H264.FFmpeg.Encoder{
            profile: :baseline,
            gop_size: frames_per_second * seconds_number
          })
          |> child(:video_parser_out, video_parser_out)
          |> via_in(Pad.ref(:input, :video),
            options: [
              encoding: :H264,
              segment_duration: state.hls_config.segment_duration,
              partial_segment_duration: state.hls_config.partial_segment_duration
            ]
          )
          |> get_child({:hls_sink_bin, :muxed})
        ]
      end
    else
      defp generate_compositor(_state, _ctx), do: raise_missing_deps(:video, @compositor_deps)
    end

    defp generate_audio_mixer(%{mixer_config: nil}, _ctx), do: []

    if Enum.all?(@audio_mixer_deps, &Code.ensure_loaded?/1) do
      defp generate_audio_mixer(_state, ctx) when is_map_key(ctx.children, :audio_mixer), do: []

      defp generate_audio_mixer(state, _ctx) do
        audio_mixer = %Membrane.AudioMixer{
          stream_format: state.mixer_config.audio.stream_format,
          synchronize_buffers?: true
        }

        [
          child(:audio_mixer, audio_mixer)
          |> child(:aac_encoder, Membrane.AAC.FDK.Encoder)
          |> child(:aac_parser, %Membrane.AAC.Parser{out_encapsulation: :none})
          |> via_in(Pad.ref(:input, :audio),
            options: [
              encoding: :AAC,
              segment_duration: state.hls_config.segment_duration,
              partial_segment_duration: state.hls_config.partial_segment_duration
            ]
          )
          |> get_child({:hls_sink_bin, :muxed})
        ]
      end
    else
      defp generate_audio_mixer(_state, _ctx), do: raise_missing_deps(:audio, @audio_mixer_deps)
    end

    defp get_track_offset(%{synchronize_tracks?: false} = state), do: {0, state}

    defp get_track_offset(%{stream_beginning: nil} = state),
      do: {0, %{state | stream_beginning: System.monotonic_time()}}

    defp get_track_offset(state), do: {System.monotonic_time() - state.stream_beginning, state}

    defp hls_sink_bin_exists?(track, ctx, %{mixer_config: nil}),
      do: Map.has_key?(ctx.children, {:hls_sink_bin, track.stream_id})

    defp hls_sink_bin_exists?(_track, ctx, _state),
      do: Map.has_key?(ctx.children, {:hls_sink_bin, :muxed})

    defp get_depayloader(track) do
      track
      |> Track.get_depayloader()
      |> tap(&unless &1, do: raise("Couldn't find depayloader for track #{inspect(track)}"))
    end

    defp get_hls_stream_directory(%{mixer_config: nil} = state, stream_id),
      do: Path.join(state.output_directory, stream_id)

    defp get_hls_stream_directory(state, _stream_id), do: state.output_directory

    defp compositor_update_layout(_action, _track, _state, _stream_format \\ nil)

    defp compositor_update_layout(_action, %{type: :audio}, state, _stream_format),
      do: {[], state}

    defp compositor_update_layout(_action, _track, %{mixer_config: nil} = state, _stream_format),
      do: {[], state}

    defp compositor_update_layout(
           action,
           track,
           %{video_layout_state: video_layout_state} = state,
           stream_format
         ) do
      {updated_layout, video_layout_state} =
        case action do
          :add ->
            state.mixer_config.video.layout_module.track_added(
              track,
              stream_format,
              video_layout_state
            )

          :update ->
            state.mixer_config.video.layout_module.track_updated(
              track,
              stream_format,
              video_layout_state
            )

          :remove ->
            state.mixer_config.video.layout_module.track_removed(track, video_layout_state)
        end

      {layouts, transformations} =
        updated_layout
        |> Enum.map(fn {track_id, layout, transformations} ->
          {{Pad.ref(:input, track_id), layout}, {Pad.ref(:input, track_id), transformations}}
        end)
        |> Enum.unzip()

      update_layout_actions = [
        notify_child: {:compositor, {:update_transformations, transformations}},
        notify_child: {:compositor, {:update_placement, layouts}}
      ]

      {update_layout_actions, %{state | video_layout_state: video_layout_state}}
    end

    unless Enum.all?(@compositor_deps ++ @audio_mixer_deps, &Code.ensure_loaded?/1) do
      defp merge_strings(strings), do: Enum.join(strings, ", ")

      defp raise_missing_deps(type, deps) do
        raise """
        Cannot find some of the modules required to use the #{type} mixer.
        Ensure that the following dependencies are added to the deps.
        #{merge_strings(deps)}
        """
      end
    end
  end
end