lib/membrane_file/sink.ex

defmodule Membrane.File.Sink do
  @moduledoc """
  Element that creates a file and stores incoming buffers there (in binary format).

  When `Membrane.File.SeekEvent` is received, the element starts writing buffers starting
  from `position`. By default, it overwrites previously stored bytes. You can set `insert?`
  field of the event to `true` to start inserting new buffers without overwriting previous ones.
  Please note, that inserting requires rewriting the file, what negatively impacts performance.
  For more information refer to `Membrane.File.SeekEvent` moduledoc.
  """
  use Membrane.Sink

  alias Membrane.File.SeekEvent
  alias Membrane.ResourceGuard

  @common_file Membrane.File.CommonFileBehaviour.get_impl()

  def_options location: [
                spec: Path.t(),
                description: "Path of the output file"
              ]

  def_input_pad :input, demand_unit: :buffers, accepted_format: _any

  @impl true
  def handle_init(_ctx, %__MODULE__{location: location}) do
    {[],
     %{
       location: Path.expand(location),
       temp_location: Path.expand(location <> ".tmp"),
       fd: nil,
       temp_fd: nil
     }}
  end

  @impl true
  def handle_setup(ctx, %{location: location} = state) do
    fd = @common_file.open!(location, [:read, :write])
    :ok = @common_file.truncate!(fd)

    Membrane.ResourceGuard.register(
      ctx.resource_guard,
      fn -> @common_file.close!(fd) end,
      tag: {:fd, fd}
    )

    {[], %{state | fd: fd}}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {[demand: :input], state}
  end

  @impl true
  def handle_write(:input, buffer, _ctx, %{fd: fd} = state) do
    :ok = @common_file.write!(fd, buffer)
    {[demand: :input], state}
  end

  @impl true
  def handle_event(:input, %SeekEvent{insert?: insert?, position: position}, ctx, state) do
    state =
      if insert?,
        do: split_file(state, ctx.resource_guard, position),
        else: seek_file(state, ctx.resource_guard, position)

    {[], state}
  end

  def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

  defp seek_file(%{fd: fd} = state, resource_guard, position) do
    state = maybe_merge_temporary(state, resource_guard)
    _position = @common_file.seek!(fd, position)
    state
  end

  defp split_file(%{fd: fd} = state, resource_guard, position) do
    state =
      state
      |> seek_file(resource_guard, position)
      |> open_temporary(resource_guard)

    :ok = @common_file.split!(fd, state.temp_fd)
    state
  end

  defp maybe_merge_temporary(%{temp_fd: nil} = state, _resource_guard), do: state

  defp maybe_merge_temporary(
         %{fd: fd, temp_fd: temp_fd, temp_location: temp_location} = state,
         resource_guard
       ) do
    # TODO: Consider improving performance for multi-insertion scenarios by using
    # multiple temporary files and merging them only once on `handle_prepared_to_stopped/2`.
    ResourceGuard.unregister(resource_guard, {:temp_fd, temp_fd})
    copy_and_remove_temporary(fd, temp_fd, temp_location)
    %{state | temp_fd: nil}
  end

  defp open_temporary(
         %{temp_fd: nil, fd: fd, temp_location: temp_location} = state,
         resource_guard
       ) do
    temp_fd = @common_file.open!(temp_location, [:read, :exclusive])

    ResourceGuard.register(
      resource_guard,
      fn -> copy_and_remove_temporary(fd, temp_fd, temp_location) end,
      tag: {:temp_fd, temp_fd}
    )

    %{state | temp_fd: temp_fd}
  end

  defp copy_and_remove_temporary(fd, temp_fd, temp_location) do
    _bytes_copied = @common_file.copy!(temp_fd, fd)
    :ok = @common_file.close!(temp_fd)
    :ok = @common_file.rm!(temp_location)
  end
end