lib/membrane_http_adaptive_stream/storage.ex

defmodule Membrane.HTTPAdaptiveStream.Storage do
  @moduledoc """
  Behaviour for storing manifests and stream segments.
  """
  use Bunch
  use Bunch.Access

  # Configuration struct of a concrete storage; its module is the storage implementation.
  @type config_t :: struct
  # Implementation-defined state returned by `init/1` and passed back to every callback.
  @type state_t :: any
  # Result every storing/removing callback has to return.
  @type callback_result_t :: :ok | {:error, reason :: any}

  @doc """
  Generates the storage state based on the configuration struct.

  The returned state is passed as the last argument to `c:store/4` and `c:remove/3`.
  """
  @callback init(config_t) :: state_t

  @doc """
  Stores the resource on a storage.

  Besides the resource name and its content, the callback receives a `context` map
  holding the type of the resource (`:manifest`, `:header` or `:segment`) and the mode
  (`:text` or `:binary`) that should be used when writing the content.
  """
  @callback store(
              resource_name :: String.t(),
              content :: String.t() | binary,
              context :: %{type: :manifest | :header | :segment, mode: :text | :binary},
              state_t
            ) :: callback_result_t

  @doc """
  Removes the resource.

  The `context` map holds the type of the resource being removed
  (`:manifest`, `:header` or `:segment`).
  """
  @callback remove(
              resource_name :: String.t(),
              context :: %{type: :manifest | :header | :segment},
              state_t
            ) :: callback_result_t

  # State kept by this module around a concrete storage implementation:
  #   * `storage_impl` - module of the configuration struct passed to `new/2`
  #   * `impl_state` - value returned by that module's `init/1`
  #   * `cache_enabled?` - whether stored manifests are remembered in `cache`
  #   * `cache` - manifest name => content most recently stored under that name
  #   * `stored_manifests` - names of the manifests stored so far (removed by `cleanup/2`)
  @enforce_keys [:storage_impl, :impl_state, :cache_enabled?]
  defstruct @enforce_keys ++ [cache: %{}, stored_manifests: MapSet.new()]

  # Opaque: callers are expected to go through the functions of this module only.
  @opaque t :: %__MODULE__{
            storage_impl: module,
            impl_state: any,
            cache_enabled?: bool,
            cache: map,
            stored_manifests: MapSet.t()
          }

  @doc """
  Initializes the storage.

  The module of `storage_config` is used as the storage implementation and its `init/1`
  is called with the config to produce the implementation state.

  Accepts the following options:
  - `enable_cache` - if true (default), manifests will be stored only if they've been changed

  For backward compatibility the `enable_cache?` spelling is still honoured when
  `enable_cache` is not given.
  """
  @spec new(config_t, [{:enable_cache, boolean}]) :: t
  def new(%storage_impl{} = storage_config, opts \\ []) do
    # `:enable_cache` is the documented and spec'ed key. Previously only `:enable_cache?`
    # was read, so the documented option was silently ignored; keep the old key as a
    # fallback so existing callers relying on it do not change behaviour.
    cache_enabled? =
      Keyword.get_lazy(opts, :enable_cache, fn -> Keyword.get(opts, :enable_cache?, true) end)

    %__MODULE__{
      storage_impl: storage_impl,
      impl_state: storage_impl.init(storage_config),
      cache_enabled?: cache_enabled?
    }
  end

  @doc """
  Stores the given serialized manifests.

  Every `{name, content}` entry is passed to the storage in turn, with the storage
  threaded through as the accumulator.
  """
  @spec store_manifests(t, [{name :: String.t(), content :: String.t()}]) ::
          {callback_result_t, t}
  def store_manifests(storage, manifests) do
    Bunch.Enum.try_reduce(manifests, storage, fn entry, acc -> store_manifest(entry, acc) end)
  end

  # Stores a single `{name, manifest}` pair through the storage implementation.
  #
  # Steps of the labelled `withl` chain below:
  #   * `cache:` - if the cached content for `name` equals `manifest`, nothing is written
  #     and the storage is returned unchanged,
  #   * `store:` - otherwise the implementation's `store/4` is called with text mode and
  #     the `:manifest` type; an `{:error, reason}` result is returned along with the
  #     storage as it was before the call,
  #   * after a successful write, `name` is added to `stored_manifests`,
  #   * `update_cache?:` - the cache entry for `name` is set only when caching is enabled.
  #
  # Returns `{:ok, storage}` or `{{:error, reason}, storage}`.
  defp store_manifest({name, manifest}, storage) do
    %__MODULE__{
      storage_impl: storage_impl,
      impl_state: impl_state,
      cache: cache,
      cache_enabled?: cache_enabled?,
      stored_manifests: stored_manifests
    } = storage

    # NOTE(review): the mid-chain `do:` clause rebinds `storage`; the `update_cache?: false`
    # branch in `else` looks like it relies on that rebinding being visible there, so that
    # the updated `stored_manifests` is returned - confirm against Bunch's `withl` docs.
    withl cache: false <- cache[name] == manifest,
          store:
            :ok <-
              storage_impl.store(name, manifest, %{mode: :text, type: :manifest}, impl_state),
          do: storage = %{storage | stored_manifests: MapSet.put(stored_manifests, name)},
          update_cache?: true <- cache_enabled? do
      storage = put_in(storage, [:cache, name], manifest)
      {:ok, storage}
    else
      cache: true -> {:ok, storage}
      store: {:error, reason} -> {{:error, reason}, storage}
      update_cache?: false -> {:ok, storage}
    end
  end

  @doc """
  Stores stream header file.

  The payload is passed to the storage implementation in binary mode with the `:header`
  resource type. The storage itself is returned unchanged next to the result.
  """
  @spec store_header(t, name :: String.t(), payload :: binary) :: {callback_result_t, t}
  def store_header(storage, name, payload) do
    %__MODULE__{impl_state: backend_state, storage_impl: backend} = storage
    context = %{type: :header, mode: :binary}

    name
    |> backend.store(payload, context, backend_state)
    |> then(&{&1, storage})
  end

  @doc """
  Stores a new segment and removes stale ones.

  `to_remove` is looked up under the `:segment_names` and `:header_names` keys; a missing
  key means there is nothing of that kind to remove. Stale segments are removed first,
  then stale headers, and only then `payload` is stored under `to_add` in binary mode.
  If a removal step does not return `:ok`, the new segment is not stored and that result
  is returned instead.

  The storage is returned unchanged next to the result.
  """
  @spec apply_segment_changeset(
          t,
          {to_add :: String.t(),
           to_remove :: [segment_names: [String.t()], header_names: [String.t()]]},
          payload :: binary
        ) :: {callback_result_t, t}
  def apply_segment_changeset(storage, {to_add, to_remove}, payload) do
    %__MODULE__{storage_impl: storage_impl, impl_state: impl_state} = storage

    # Bracket access yields `nil` for an absent key, which would crash the iteration
    # below; treat "no such key" as "nothing to remove".
    segment_names = to_remove[:segment_names] || []
    header_names = to_remove[:header_names] || []

    result =
      with :ok <-
             Bunch.Enum.try_each(
               segment_names,
               &storage_impl.remove(&1, %{type: :segment}, impl_state)
             ),
           :ok <-
             Bunch.Enum.try_each(
               header_names,
               &storage_impl.remove(&1, %{type: :header}, impl_state)
             ) do
        storage_impl.store(to_add, payload, %{mode: :binary, type: :segment}, impl_state)
      end

    {result, storage}
  end

  @doc """
  Removes all the saved segments and manifests.

  Manifests recorded as stored are removed first, then the given `segments`. On success
  the manifest cache and the set of stored manifests are reset; otherwise the failing
  result is returned together with the unchanged storage.
  """
  @spec cleanup(t, segments :: [String.t()]) :: {callback_result_t, t}
  def cleanup(storage, segments) do
    %__MODULE__{stored_manifests: manifests, impl_state: backend_state, storage_impl: backend} =
      storage

    # Removes every named resource of the given type via the storage implementation.
    remove_each = fn names, type ->
      Bunch.Enum.try_each(names, fn name ->
        backend.remove(name, %{type: type}, backend_state)
      end)
    end

    with :ok <- remove_each.(manifests, :manifest),
         :ok <- remove_each.(segments, :segment) do
      {:ok, %__MODULE__{storage | stored_manifests: MapSet.new(), cache: %{}}}
    else
      failure -> {failure, storage}
    end
  end

  @doc """
  Clears the manifest cache.

  Returns the storage with an empty cache; all other fields are left as they were.
  """
  @spec clear_cache(t) :: t
  def clear_cache(storage), do: %__MODULE__{storage | cache: Map.new()}
end