lib/membrane_http_adaptive_stream/sink.ex

defmodule Membrane.HTTPAdaptiveStream.Sink do
  @moduledoc """
  Sink for generating HTTP streaming manifests.

  Uses `Membrane.HTTPAdaptiveStream.Manifest` for manifest serialization
  and `Membrane.HTTPAdaptiveStream.Storage` for saving files.

  ## Notifications

  - `{:track_playable, input_pad_id}` - sent when the first segment of a track is
    stored, and thus the track is ready to be played

  ## Examples

  The following configuration:

      %#{inspect(__MODULE__)}{
        manifest_config: %ManifestConfig{name: "manifest", module: Membrane.HTTPAdaptiveStream.HLS},
        storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{directory: "output"}
      }

  will generate an HLS manifest in the `output` directory, playable from
  `output/manifest.m3u8` file.
  """

  use Membrane.Sink

  require Membrane.HTTPAdaptiveStream.Manifest.SegmentAttribute

  alias Membrane.CMAF
  alias Membrane.HTTPAdaptiveStream.Manifest
  alias Membrane.HTTPAdaptiveStream.Storage

  defmodule TrackConfig do
    @moduledoc """
    Configuration shared by the tracks handled by the sink.

    The fields of this struct are merged into the parameters used to build
    `Membrane.HTTPAdaptiveStream.Manifest.Track.Config` for every new track.
    For more information check out `Membrane.HTTPAdaptiveStream.Manifest.Track.Config`.
    """

    @type t() :: %__MODULE__{
            target_window_duration: Membrane.Time.t() | :infinity,
            mode: :live | :vod,
            header_naming_fun: (Manifest.Track.t(), counter :: non_neg_integer -> String.t()),
            segment_naming_fun: (Manifest.Track.t() -> String.t()),
            partial_naming_fun: (String.t(), Keyword.t() -> String.t()),
            persist?: boolean()
          }

    # Every field has a default, so `%TrackConfig{}` is a valid configuration.
    defstruct [
      target_window_duration: Membrane.Time.seconds(40),
      mode: :vod,
      header_naming_fun: &Manifest.Track.default_header_naming_fun/2,
      segment_naming_fun: &Manifest.Track.default_segment_naming_fun/1,
      partial_naming_fun: &Manifest.Track.default_partial_naming_fun/2,
      persist?: false
    ]
  end

  defmodule ManifestConfig do
    @moduledoc """
    Configuration of the `Membrane.HTTPAdaptiveStream.Manifest` produced by the sink.
    """

    @typedoc """
    The configuration is made up of the following fields:
    - `name` - name of the main manifest file (`"index"` when not given).
    - `module` - implementation of the `Membrane.HTTPAdaptiveStream.Manifest` behaviour (required).
    """
    @type t() :: %__MODULE__{
            name: String.t(),
            module: module()
          }

    # `:module` has no sensible default, so it must always be provided.
    @enforce_keys [:module]
    defstruct [:module, name: "index"]
  end

  # Input pad definition. Each track is linked through its own pad instance,
  # which the callbacks below match as `Pad.ref(:input, track_id)`.
  # With `flow_control: :manual` the sink issues `:demand` actions itself
  # (see `handle_playing/2`, `handle_pad_added/3` and `handle_buffer/4`).
  # Only `Membrane.CMAF.Track` stream format is accepted.
  def_input_pad :input,
    availability: :on_request,
    flow_control: :manual,
    demand_unit: :buffers,
    accepted_format: CMAF.Track,
    options: [
      # Optional; when nil, the pad's track id is serialized and used instead
      # (see `handle_stream_format/4`).
      track_name: [
        spec: String.t() | nil,
        default: nil,
        description: """
        Name that will be used to name the media playlist for the given track, as well as its header and segments files.
        It must not contain any URI reserved characters.
        """
      ],
      # No default is declared for this option.
      segment_duration: [
        spec: Membrane.Time.t(),
        description: """
        The minimal duration of media segments produced by this particular track.

        In case of regular paced streams the parameter may not have any impact, but when
        partial segments gets used it may decide when regular segments gets finalized and new gets started.
        """
      ],
      partial_segment_duration: [
        spec: Membrane.Time.t() | nil,
        default: nil,
        description: """
        The target duration of partial segments.

        When set to nil then the track is not supposed to emit partial segments.
        """
      ],
      max_framerate: [
        spec: float() | nil,
        default: nil,
        description: """
        The maximal framerate of video variant. This information is used in master playlist.

        When set to nil then this information won't be added to master playlist. For audio it should be set to nil.
        """
      ]
    ]

  # Element options. `manifest_config`, `track_config` and `storage` declare no
  # default value; `cleanup_after` defaults to `nil` (no cleanup scheduled).
  def_options manifest_config: [
                spec: ManifestConfig.t(),
                description: """
                Configuration of the generated manifest: the name of the main manifest file
                and the module implementing the `Membrane.HTTPAdaptiveStream.Manifest` behaviour.
                See `Membrane.HTTPAdaptiveStream.Sink.ManifestConfig`.
                """
              ],
              track_config: [
                spec: TrackConfig.t(),
                description: """
                Configuration applied to every track added to the manifest. Its fields are used
                to build `Membrane.HTTPAdaptiveStream.Manifest.Track.Config` when a track
                receives its first stream format.
                See `Membrane.HTTPAdaptiveStream.Sink.TrackConfig`.
                """
              ],
              storage: [
                spec: Storage.config_t(),
                description: """
                Storage configuration. May be one of `Membrane.HTTPAdaptiveStream.Storages.*`.
                See `Membrane.HTTPAdaptiveStream.Storage` behaviour.
                """
              ],
              cleanup_after: [
                spec: nil | Membrane.Time.t(),
                default: nil,
                description: """
                If not `nil`, the time after which the storage cleanup function should run,
                counted from the moment the sink is requested to terminate.

                The function will remove all manifests and segments stored during the stream.
                """
              ]

  # Builds the initial state: all element options as a plain map, with the
  # storage config replaced by an initialized storage, plus an empty manifest
  # and the set of tracks for which `:track_playable` was already sent.
  @impl true
  def handle_init(_ctx, options) do
    initial_state = Map.from_struct(options)

    overrides = %{
      storage: Storage.new(options.storage),
      manifest: %Manifest{
        name: options.manifest_config.name,
        module: options.manifest_config.module
      },
      playlist_playable_sent: MapSet.new()
    }

    {[], Map.merge(initial_state, overrides)}
  end

  # Registers a new track in the manifest (or marks a discontinuity for an
  # already known one) and stores the header carried by the stream format.
  # Raises when the header cannot be stored.
  @impl true
  def handle_stream_format(
        Pad.ref(:input, track_id) = pad_ref,
        %CMAF.Track{} = stream_format,
        ctx,
        state
      ) do
    {header_name, manifest} =
      if Manifest.has_track?(state.manifest, track_id) do
        # Arrival of a new stream format for an already existing track indicates that
        # the stream parameters have changed. According to section 4.3.2.3 of RFC 8216,
        # a discontinuity needs to be signaled and a new header supplied.
        Manifest.discontinue_track(state.manifest, track_id)
      else
        pad_options = ctx.pads[pad_ref].options
        track_config = build_track_config(track_id, stream_format, pad_options, state.track_config)

        Manifest.add_track(state.manifest, track_config)
      end

    store_result =
      Storage.store_header(state.storage, track_id, header_name, stream_format.header)

    case store_result do
      {:ok, storage} ->
        {[], %{state | storage: storage, manifest: manifest}}

      {{:error, reason}, _storage} ->
        raise "Failed to store the header for track #{inspect(track_id)} due to #{inspect(reason)}"
    end
  end

  # Combines the sink-wide track configuration with the pad options and the
  # stream format parameters into a `Manifest.Track.Config` struct.
  defp build_track_config(track_id, stream_format, pad_options, sink_track_config) do
    # Fall back to the pad's track id when no explicit track name was given.
    track_name = serialize_track_name(pad_options[:track_name] || track_id)
    shared_params = Map.from_struct(sink_track_config)

    per_track_params = %{
      id: track_id,
      track_name: track_name,
      content_type: stream_format.content_type,
      header_extension: ".mp4",
      segment_extension: ".m4s",
      segment_duration: pad_options.segment_duration,
      partial_segment_duration: pad_options.partial_segment_duration,
      encoding: stream_format.codecs,
      resolution: stream_format.resolution,
      max_framerate: pad_options.max_framerate
    }

    struct!(Manifest.Track.Config, Map.merge(shared_params, per_track_params))
  end

  # Issues a `:demand` action for every pad known at the moment of entering
  # the playing state.
  @impl true
  def handle_playing(ctx, state) do
    demand_actions = for pad <- Map.keys(ctx.pads), do: {:demand, pad}
    {demand_actions, state}
  end

  # A pad added while the sink is already playing needs its own demand, as
  # `handle_playing/2` only covers the pads that existed at that time.
  @impl true
  def handle_pad_added(pad, ctx, state) do
    case ctx do
      %{playback: :playing} -> {[demand: pad], state}
      _not_playing -> {[], state}
    end
  end

  # Adds the buffer to the track in the manifest, applies the resulting
  # changeset to the storage and stores the re-serialized manifest.
  # On success demands on the pad again and, the first time for a given
  # track, notifies the parent with `{:track_playable, track_id}`.
  # Raises when the storage reports an error.
  @impl true
  def handle_buffer(Pad.ref(:input, track_id) = pad, buffer, _ctx, %{storage: storage} = state) do
    {changeset, updated_manifest} = Manifest.add_chunk(state.manifest, track_id, buffer)

    with {:ok, storage} <- Storage.apply_track_changeset(storage, track_id, changeset),
         {:ok, storage} <- serialize_and_store_manifest(updated_manifest, storage) do
      {notifications, state} = maybe_notify_playable(track_id, state)
      new_state = %{state | manifest: updated_manifest, storage: storage}

      {notifications ++ [demand: pad], new_state}
    else
      {{:error, reason}, _storage} ->
        raise "Failed to store a buffer for track #{inspect(track_id)} due to #{inspect(reason)}"
    end
  end

  # Finishes the track in the manifest, applies the resulting changeset to the
  # storage, stores the re-serialized manifest and clears the storage cache.
  # Raises when the storage reports an error.
  @impl true
  def handle_end_of_stream(
        Pad.ref(:input, track_id),
        _ctx,
        %{manifest: manifest, storage: storage} = state
      ) do
    {changeset, finished_manifest} = Manifest.finish(manifest, track_id)

    store_result =
      with {:ok, storage} <- Storage.apply_track_changeset(storage, track_id, changeset) do
        serialize_and_store_manifest(finished_manifest, storage)
      end

    case store_result do
      {:ok, storage} ->
        {[], %{state | storage: Storage.clear_cache(storage), manifest: finished_manifest}}

      {{:error, reason}, _storage} ->
        raise "Failed to store the finalized manifest for track #{inspect(track_id)} due to #{inspect(reason)}"
    end
  end

  # On termination request: when at least one track is persisted, stores the
  # manifest rebuilt from the beginning, then optionally schedules the storage
  # cleanup and terminates the element. Raises when the manifest cannot be stored.
  @impl true
  def handle_terminate_request(ctx, state) do
    %{manifest: manifest, storage: storage} = state

    track_ids = Enum.map(Map.keys(ctx.pads), fn Pad.ref(:input, track_id) -> track_id end)

    # Prevent storing an empty manifest. Such a situation can happen
    # when the sink goes from prepared -> playing -> prepared -> stopped
    # and in the meantime no media has flown through the input pads.
    any_track_persisted? =
      Enum.any?(
        track_ids,
        &(Manifest.has_track?(manifest, &1) and Manifest.persisted?(manifest, &1))
      )

    {store_result, state} =
      if any_track_persisted? do
        {store_result, storage} =
          manifest
          |> Manifest.from_beginning()
          |> serialize_and_store_manifest(storage)

        {store_result, %{state | storage: storage}}
      else
        {:ok, state}
      end

    case store_result do
      :ok ->
        :ok = maybe_schedule_cleanup_task(state)

        {[terminate: :normal], state}

      {:error, reason} ->
        raise "Failed to persist the manifest due to #{inspect(reason)}"
    end
  end

  # Turns a track identifier into a name usable in file names.
  #
  # A binary is returned unchanged, provided it matches the filename regex
  # below; otherwise a RuntimeError is raised. Any other term is serialized
  # with `:erlang.term_to_binary/1` and encoded as unpadded URL-safe Base64.
  defp serialize_track_name(track_id) when is_binary(track_id) do
    if not Regex.match?(~r/^[^\/:*?"<>|]+$/, track_id) do
      raise "The provided track identifier #{inspect(track_id)} is not a valid filename"
    end

    track_id
  end

  defp serialize_track_name(track_id) do
    Base.url_encode64(:erlang.term_to_binary(track_id), padding: false)
  end

  # Returns the actions and the state to use after a chunk of `track_id` got
  # stored: a `{:track_playable, track_id}` parent notification the first time
  # it is called for the track, no actions on any later call.
  defp maybe_notify_playable(track_id, %{playlist_playable_sent: already_notified} = state) do
    case MapSet.member?(already_notified, track_id) do
      true ->
        {[], state}

      false ->
        notified = MapSet.put(already_notified, track_id)
        {[notify_parent: {:track_playable, track_id}], %{state | playlist_playable_sent: notified}}
    end
  end

  # Serializes the manifest and hands the result over to the storage.
  # Returns whatever `Storage.store_manifests/2` returns; callers in this
  # module match it as `{:ok, storage}` or `{{:error, reason}, storage}`.
  defp serialize_and_store_manifest(manifest, storage) do
    Storage.store_manifests(storage, Manifest.serialize(manifest))
  end

  # Schedules the removal of everything stored during the stream, unless
  # `cleanup_after` is nil. The cleanup runs in a separate process started
  # with `Task.start/1`, so this function returns `:ok` right away.
  defp maybe_schedule_cleanup_task(%{cleanup_after: nil}), do: :ok

  defp maybe_schedule_cleanup_task(%{
         manifest: manifest,
         storage: storage,
         cleanup_after: cleanup_after
       }) do
    {:ok, _pid} = Task.start(fn -> cleanup_after_delay(manifest, storage, cleanup_after) end)

    :ok
  end

  # Body of the cleanup task: waits for `delay` and then removes the segments
  # and headers of all tracks, followed by the data of the master playlist.
  # Raises when the storage reports an error.
  defp cleanup_after_delay(manifest, storage, delay) do
    # What is to be removed gets collected before going to sleep.
    segments_to_remove = Manifest.segments_per_track(manifest)
    headers_to_remove = Manifest.header_per_track(manifest)

    delay
    |> Membrane.Time.as_milliseconds(:round)
    |> Process.sleep()

    # cleanup all data of the secondary playlist and the master one
    with {:ok, storage} <-
           Storage.clean_all_tracks(storage, segments_to_remove, headers_to_remove),
         {:ok, _storage} <- Storage.cleanup(storage, :master, [], nil) do
      :ok
    else
      {{:error, reason}, _storage} ->
        raise "Failed to cleanup the storage due to #{inspect(reason)}"
    end
  end
end