lib/membrane_file/sink_multi.ex

defmodule Membrane.File.Sink.Multi do
  @moduledoc """
  Element that writes buffers to a set of files. File is switched on event.

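  Files are named according to the `naming_fun` passed in options.
  This function receives the base path (`location`), the sequential number
  of the file and the `extension`, and should return the file path as a string.
  The default one simply concatenates them: path/to/file0.ext, path/to/file1.ext, ...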

  The event type, which starts writing to a next file,
  is passed as a module in the `split_event` option.
  It defaults to `Membrane.File.SplitEvent`.
  """
  use Membrane.Sink
  alias Membrane.Buffer
  alias Membrane.File.CommonFile

  import Mockery.Macro

  # Options accepted by the element. `handle_init/1` reads all four of them:
  # `location` and `extension` are forwarded to `naming_fun`, and `split_event`
  # is the event struct module that triggers switching to the next file.
  def_options location: [
                type: :string,
                description: "Base path to the file, will be passed to the naming function"
              ],
              extension: [
                type: :string,
                default: "",
                description: """
                Extension of the file, should be preceded by a dot (.). It is
                passed to the naming function.
                """
              ],
              naming_fun: [
                type: :function,
                spec: (String.t(), non_neg_integer, String.t() -> String.t()),
                default: &__MODULE__.default_naming_fun/3,
                description: """
                Function accepting base path, sequential number and file extension,
                and returning file path as a string. Default one generates
                path/to/file0.ext, path/to/file1.ext, ...
                """
              ],
              split_event: [
                type: :module,
                default: Membrane.File.SplitEvent,
                description: "Event causing switching to a new file"
              ]

  @doc """
  Default naming function.

  Concatenates the base `path`, the sequential file number `i` and the
  extension `ext`, in that order and without any separator, e.g.
  `default_naming_fun("path/to/file", 0, ".ext")` gives `"path/to/file0.ext"`.
  """
  @spec default_naming_fun(String.t(), non_neg_integer, String.t()) :: String.t()
  def default_naming_fun(path, i, ext) do
    Enum.join([path, i, ext])
  end

  # The element's only pad: an input named `:input`, declared with
  # `demand_unit: :buffers` and `caps: :any`. The callbacks in this module
  # request more data from it with the `demand: :input` action.
  def_input_pad :input, demand_unit: :buffers, caps: :any

  # Private API

  # Builds the initial element state from the options struct.
  #
  # The state keeps a one-argument naming function (index -> path), the event
  # struct module that triggers a file switch (`split_on`), the file
  # descriptor (`fd`, initially `nil`) and the index of the current file
  # (starting at 0).
  @impl true
  def handle_init(%__MODULE__{} = options) do
    # Bind the base path and the extension into the user-supplied naming
    # function, so the rest of the element only has to pass the file index.
    index_to_path = fn index ->
      options.naming_fun.(options.location, index, options.extension)
    end

    initial_state = %{
      naming_fun: index_to_path,
      split_on: options.split_event,
      fd: nil,
      index: 0
    }

    {:ok, initial_state}
  end

  # Opens, in `:write` mode, the file whose path the naming function yields
  # for the current index. The result of `CommonFile.open/3` is returned
  # unchanged.
  @impl true
  def handle_stopped_to_prepared(_ctx, state) do
    path = state.naming_fun.(state.index)
    mockable(CommonFile).open(path, :write, state)
  end

  # Issues a demand on the `:input` pad when the element starts playing.
  # The state is passed through untouched.
  @impl true
  def handle_prepared_to_playing(_ctx, state) do
    actions = [demand: :input]
    {{:ok, actions}, state}
  end

  # Handles events arriving on the `:input` pad.
  #
  # First clause: `%split_on{}` binds the struct module of the incoming event,
  # and the same variable is matched against `state.split_on`, so the clause
  # only applies to events whose struct module equals the configured one.
  # It closes the current file, bumps the index and opens the next file.
  # If closing does not return `{:ok, state}`, that result is returned as-is
  # and no new file is opened.
  @impl true
  def handle_event(:input, %split_on{}, _ctx, %{split_on: split_on} = state) do
    with {:ok, closed_state} <- mockable(CommonFile).close(state) do
      next_state = Map.update!(closed_state, :index, &(&1 + 1))
      next_path = next_state.naming_fun.(next_state.index)
      mockable(CommonFile).open(next_path, :write, next_state)
    end
  end

  # Every other event is handed to the overridden implementation via `super`.
  def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

  # Writes the payload of a buffer received on `:input` to the currently open
  # file descriptor.
  #
  # Returns `{{:ok, demand: :input}, state}` after a successful write, so the
  # next buffer is requested, or `{{:error, {:write, reason}}, state}` when
  # `CommonFile.binwrite/2` returns `{:error, reason}`.
  @impl true
  def handle_write(:input, %Buffer{payload: payload}, _ctx, %{fd: fd} = state) do
    bin_payload = Membrane.Payload.to_binary(payload)

    # A single-clause `with` plus `else` is just a `case` in disguise.
    case mockable(CommonFile).binwrite(fd, bin_payload) do
      :ok -> {{:ok, demand: :input}, state}
      {:error, reason} -> {{:error, {:write, reason}}, state}
    end
  end

  # Closes the current file when the element is stopped.
  #
  # The index is advanced *before* closing, so the state handed to
  # `CommonFile.close/1` already refers to the next file number.
  # NOTE(review): presumably this prevents a later stopped -> prepared
  # transition from reopening the file that was just written; confirm.
  @impl true
  def handle_prepared_to_stopped(_ctx, state) do
    advanced = Map.update!(state, :index, &(&1 + 1))
    mockable(CommonFile).close(advanced)
  end
end