lib/membrane_http_adaptive_stream/storage.ex

defmodule Membrane.HTTPAdaptiveStream.Storage do
  @moduledoc """
  Behaviour for storing manifests and stream segments.
  """
  use Bunch
  use Bunch.Access

  alias Membrane.HTTPAdaptiveStream.Manifest
  alias Membrane.HTTPAdaptiveStream.Manifest.Changeset

  # Configuration struct of a storage implementation. `new/2` takes the implementation
  # module from the struct name and passes the struct to that module's `init/1`.
  @type config_t :: struct
  # Implementation state: returned by `init/1` and threaded through `store/6` and `remove/4`.
  @type state_t :: any
  # Result part of the `{result, state}` tuples returned by the callbacks.
  @type callback_result_t :: :ok | {:error, reason :: any}

  # NOTE(review): `apply_track_changeset/3` passes, for every added segment, a metadata map
  # with the keys `:partial_name`, `:duration`, `:sequence_number`, `:independent?` and
  # `:byte_offset`. That differs from the shapes declared below (e.g. `independent` vs
  # `independent?`) — confirm which one is intended.
  @type segment_metadata_t :: %{duration: Membrane.Time.t()}
  @type partial_segment_metadata :: %{
          duration: Membrane.Time.t(),
          independent: boolean(),
          byte_offset: non_neg_integer()
        }
  # The empty map is what this module passes when storing manifests and headers.
  @type metadata_t :: segment_metadata_t() | partial_segment_metadata() | %{}

  @typedoc """
  The identifier of a parent that the resource belongs to (the track identifier).

  It can either be a master or secondary playlist (a track playlist).
  In case of the master playlist the identifier will be `:master`, while for tracks it can be an arbitrary value.
  """
  @type parent_t :: any()

  @doc """
  Generates the storage state based on the configuration struct.
  """
  # Invoked once by `new/2`; the returned value is kept as `impl_state` and handed to
  # every later `store/6` and `remove/4` call.
  @callback init(config_t) :: state_t

  @doc """
  Stores the resource on a storage.

  The `context` map carries the write mode (`:text` or `:binary`) and the type of the resource.
  """
  # Returns the operation result together with the (possibly updated) state.
  # Within this module the callback is invoked with:
  #   * manifests - `mode: :text`, empty metadata,
  #   * headers - `mode: :binary`, empty metadata,
  #   * changeset entries - `mode: :binary`, the type taken from the entry itself.
  @callback store(
              parent_id :: parent_t(),
              resource_name :: String.t(),
              content :: String.t() | binary,
              metadata :: metadata_t(),
              context :: %{
                type: :manifest | :header | :segment | :partial_segment,
                mode: :text | :binary
              },
              state_t
            ) :: {callback_result_t, state :: state_t}

  @doc """
  Removes the resource.

  In case of removing a segment the storage should make sure to remove all
  previous partial segments with the same name. It is the user's responsibility to remember
  and distinguish between the partial segments.
  """
  # Returns the operation result together with the (possibly updated) state.
  # Within this module it is invoked from `apply_track_changeset/3` (segments and headers)
  # and from `cleanup/4` (manifest, segments and header).
  @callback remove(
              parent_id :: parent_t(),
              resource_name :: String.t(),
              context :: %{type: :manifest | :header | :segment},
              state_t
            ) :: {callback_result_t, state :: state_t}

  # The enforced keys have no defaults and must be given when the struct is built
  # (see `new/2`); `cache` and `stored_manifests` start empty.
  @enforce_keys [:storage_impl, :impl_state, :cache_enabled?]
  defstruct @enforce_keys ++ [cache: %{}, stored_manifests: MapSet.new()]

  # Fields:
  #   * `storage_impl` - module implementing this behaviour,
  #   * `impl_state` - state owned by `storage_impl`,
  #   * `cache_enabled?` - whether stored manifests are remembered in `cache`,
  #   * `cache` - last stored manifest content, keyed by manifest name,
  #   * `stored_manifests` - set of `{parent_id, name}` of manifests stored so far.
  #
  # `boolean` is the built-in type; `bool` is not defined in Elixir typespecs.
  @opaque t :: %__MODULE__{
            storage_impl: module,
            impl_state: any,
            cache_enabled?: boolean,
            cache: map,
            stored_manifests: MapSet.t()
          }

  @doc """
  Initializes the storage.

  The storage implementation is the module of the given configuration struct; its
  `init/1` callback is called with the struct and the result becomes the
  implementation state.

  Accepts the following options:
  - `enable_cache` - if true (default), manifests will be stored only if they've been changed
  """
  @spec new(config_t, [{:enable_cache, boolean}]) :: t
  def new(%storage_impl{} = storage_config, opts \\ []) do
    # `:enable_cache` is the documented option. `:enable_cache?` is the key that used to be
    # read here, so it is still honoured as a fallback for existing callers.
    cache_enabled? =
      Keyword.get_lazy(opts, :enable_cache, fn -> Keyword.get(opts, :enable_cache?, true) end)

    %__MODULE__{
      storage_impl: storage_impl,
      impl_state: storage_impl.init(storage_config),
      cache_enabled?: cache_enabled?
    }
  end

  @doc """
  Stores serialized manifest files.

  The master manifest is stored first, under the `:master` parent id, followed by the
  manifest of every track. Processing stops at the first error.
  """
  @spec store_manifests(t, Manifest.serialized_manifests_t()) ::
          {callback_result_t, t}
  def store_manifests(storage, %{master_manifest: master, manifest_per_track: per_track}) do
    track_entries = Map.to_list(per_track)

    Bunch.Enum.try_reduce([{:master, master} | track_entries], storage, fn entry, acc ->
      store_manifest(entry, acc)
    end)
  end

  # Stores a single manifest, unless identical content is already cached under its name.
  # After a successful store the manifest is recorded in `stored_manifests` and, when
  # caching is enabled, remembered in `cache`.
  defp store_manifest({id, {name, manifest}}, storage) do
    %__MODULE__{
      storage_impl: backend,
      impl_state: backend_state,
      cache: cache,
      cache_enabled?: cache_enabled?,
      stored_manifests: stored
    } = storage

    if cache[name] == manifest do
      # Nothing changed since the last store - skip the write.
      {:ok, storage}
    else
      context = %{mode: :text, type: :manifest}

      case backend.store(id, name, manifest, %{}, context, backend_state) do
        {:ok, backend_state} ->
          storage = %{
            storage
            | stored_manifests: MapSet.put(stored, {id, name}),
              impl_state: backend_state
          }

          case cache_enabled? do
            true -> {:ok, put_in(storage, [:cache, name], manifest)}
            false -> {:ok, storage}
          end

        {{:error, reason}, backend_state} ->
          {{:error, reason}, %{storage | impl_state: backend_state}}
      end
    end
  end

  @doc """
  Stores stream header file.

  The payload is written in binary mode with empty metadata. The returned storage
  carries the implementation state produced by the store call, whatever the result.
  """
  @spec store_header(t, track_id :: term(), name :: String.t(), payload :: binary) ::
          {callback_result_t, t}
  def store_header(storage, track_id, name, payload) do
    %__MODULE__{storage_impl: backend, impl_state: backend_state} = storage
    context = %{mode: :binary, type: :header}

    {result, backend_state} =
      backend.store(track_id, name, payload, %{}, context, backend_state)

    {result, %{storage | impl_state: backend_state}}
  end

  @doc """
  Stores a new segment and removes stale ones.

  The resources listed in `to_remove` of the changeset are removed first (segments,
  then headers). Only when every removal succeeds are the entries from `to_add` stored.
  Processing stops at the first error.

  The returned storage always carries the latest implementation state, also when
  an error is returned.
  """
  @spec apply_track_changeset(
          t,
          track_id :: term(),
          Changeset.t()
        ) :: {callback_result_t, t}
  def apply_track_changeset(storage, track_id, changeset) do
    %__MODULE__{storage_impl: storage_impl, impl_state: impl_state} = storage
    %Changeset{to_add: to_add, to_remove: to_remove} = changeset

    grouped =
      Enum.group_by(
        to_remove,
        fn {type, _value} -> type end,
        fn {_type, value} -> value end
      )

    segment_names = Map.get(grouped, :segment, [])
    header_names = Map.get(grouped, :header, [])

    # Every step yields `{result, impl_state}`. A failed removal falls through `with`
    # unchanged, so the tuple is matched here and wrapped back into the storage struct
    # instead of leaking the bare implementation state to the caller.
    {result, impl_state} =
      with {:ok, impl_state} <-
             Bunch.Enum.try_reduce(
               segment_names,
               impl_state,
               &storage_impl.remove(track_id, &1, %{type: :segment}, &2)
             ),
           {:ok, impl_state} <-
             Bunch.Enum.try_reduce(
               header_names,
               impl_state,
               &storage_impl.remove(track_id, &1, %{type: :header}, &2)
             ) do
        Bunch.Enum.try_reduce(to_add, impl_state, fn segment, impl_state ->
          # Strict match: every entry to add must provide all of these keys.
          %{
            type: type,
            name: name,
            partial_name: partial_name,
            duration: duration,
            sequence_number: sequence_number,
            independent?: independent?,
            byte_offset: byte_offset,
            payload: payload
          } = segment

          metadata = %{
            partial_name: partial_name,
            duration: duration,
            sequence_number: sequence_number,
            independent?: independent?,
            byte_offset: byte_offset
          }

          storage_impl.store(
            track_id,
            name,
            payload,
            metadata,
            %{mode: :binary, type: type},
            impl_state
          )
        end)
      end

    {result, %{storage | impl_state: impl_state}}
  end

  @doc """
  Removes all segments grouped by track.

  For every track in `segments_per_track` the matching header is looked up in
  `header_per_track` and `cleanup/4` is run. Processing stops at the first error.
  """
  @type segments :: [String.t()]
  @type header :: String.t()

  @spec clean_all_tracks(t(), %{(id :: any()) => segments()}, %{(id :: any()) => header()}) ::
          {callback_result_t, t}
  def clean_all_tracks(storage, segments_per_track, header_per_track) do
    # Headers are resolved for all tracks up front, so a track without a header entry
    # raises before anything gets removed.
    cleanup_jobs =
      Enum.map(segments_per_track, fn {track_id, track_segments} ->
        {track_id, track_segments, Map.fetch!(header_per_track, track_id)}
      end)

    Bunch.Enum.try_reduce(cleanup_jobs, storage, fn {track_id, track_segments, track_header},
                                                    acc ->
      cleanup(acc, track_id, track_segments, track_header)
    end)
  end

  @doc """
  Removes all the saved segments and manifest for given id.

  The manifest stored for `id` (if any) is removed first, then the given segments and
  finally the header, unless `header` is `nil`. Processing stops at the first error.

  On success the manifest cache is cleared and the manifest is no longer tracked as
  stored. The returned storage always carries the latest implementation state.
  """
  @spec cleanup(t, id :: any(), segments :: segments(), header :: header() | nil) ::
          {callback_result_t, t}
  def cleanup(storage, id, segments, header) do
    %__MODULE__{storage_impl: storage_impl, impl_state: impl_state, stored_manifests: manifests} =
      storage

    manifest_to_delete = Enum.find(manifests, fn {manifest_id, _name} -> manifest_id == id end)

    # Always a `{result, impl_state}` tuple, so that both "nothing to remove" and a failed
    # removal go through the same `with`/`else` handling below.
    manifest_removal =
      case manifest_to_delete do
        nil ->
          {:ok, impl_state}

        {manifest_id, manifest_name} ->
          storage_impl.remove(manifest_id, manifest_name, %{type: :manifest}, impl_state)
      end

    with {:ok, impl_state} <- manifest_removal,
         {:ok, impl_state} <-
           Bunch.Enum.try_reduce(
             segments,
             impl_state,
             &storage_impl.remove(id, &1, %{type: :segment}, &2)
           ),
         {:ok, impl_state} <- maybe_remove_header(id, header, impl_state, storage_impl) do
      {:ok,
       %{
         storage
         | impl_state: impl_state,
           cache: %{},
           # Deleting `nil` (no manifest found) leaves the set unchanged.
           stored_manifests: MapSet.delete(manifests, manifest_to_delete)
       }}
    else
      {error, impl_state} -> {error, %{storage | impl_state: impl_state}}
    end
  end

  @doc """
  Clears the manifest cache.

  After that, the next call to `store_manifests/2` writes every manifest again, even
  if its content has not changed.
  """
  @spec clear_cache(t) :: t
  def clear_cache(storage), do: %__MODULE__{storage | cache: %{}}

  # Removes the header of the given track; a `nil` header means there is nothing to
  # remove, so the implementation state is handed back untouched.
  defp maybe_remove_header(track_id, header, impl_state, storage_impl) do
    case header do
      nil -> {:ok, impl_state}
      header_name -> storage_impl.remove(track_id, header_name, %{type: :header}, impl_state)
    end
  end
end