defmodule Membrane.File.Sink do
@moduledoc """
Element that creates a file and stores incoming buffers there (in binary format).
Can also be used as a pipe to standard output by setting location to :stdout,
though this requires additional configuration.
When `Membrane.File.SeekSinkEvent` is received, the element starts writing buffers starting
from `position`. By default, it overwrites previously stored bytes. You can set `insert?`
field of the event to `true` to start inserting new buffers without overwriting previous ones.
Please note that inserting requires rewriting the file, which negatively impacts performance.
For more information refer to `Membrane.File.SeekSinkEvent` moduledoc.
Pipeline logs are directed to standard output by default. To separate them from the sink's output
we recommend redirecting the logger to standard error. For simple use cases using the default logger
configuration (like stand-alone scripts) this can be achieved by simply calling `redirect_logs_to_stderr/0`.
See `examples/file_to_pipe.exs` for a working example.
"""
use Membrane.Sink
alias Membrane.File.SeekSinkEvent
# Module providing the file operations used throughout this sink (`open!`, `write!`,
# `seek!`, `split!`, `copy!`, ...). As a module attribute it is resolved once, at compile time.
@common_file Membrane.File.CommonFileBehaviour.get_impl()
# Element options: `location` selects the output destination, either a path or `:stdout`.
def_options location: [
spec: Path.t() | :stdout,
description: "Path of the output file or :stdout"
]
# Single `:input` pad: manual flow control, demand expressed in buffers, any stream format accepted.
def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any
@doc """
Redirects logs to standard error, so they do not mix with data written to `:stdout`.

Removes the `:default` logger handler, then adds `LoggerBackends.Console` and configures
it with `device: :standard_error`.

Raises a `MatchError` if the `:default` handler cannot be removed.
"""
@spec redirect_logs_to_stderr() :: :ok
def redirect_logs_to_stderr() do
  console_backend = LoggerBackends.Console

  :ok = :logger.remove_handler(:default)
  LoggerBackends.add(console_backend)
  LoggerBackends.configure(console_backend, device: :standard_error)
end
# Builds the initial element state from the options.
#
# For `:stdout` only the location is stored, since no file descriptors are tracked.
@impl true
def handle_init(_ctx, %__MODULE__{location: :stdout}) do
  {[], %{location: :stdout}}
end

# For a file location the state holds the expanded target path, the path of the
# temporary file (target path with a `.tmp` suffix) and two descriptors, both
# initially `nil`.
@impl true
def handle_init(_ctx, %__MODULE__{location: location}) do
  # `Path.t()` is chardata, so `location` may be a charlist or iodata. The binary
  # concatenation below would raise for those, so normalize to a binary first.
  # Binaries pass through unchanged.
  location = IO.chardata_to_string(location)

  {[],
   %{
     location: Path.expand(location),
     temp_location: Path.expand(location <> ".tmp"),
     fd: nil,
     temp_fd: nil
   }}
end
# Standard output needs no preparation.
@impl true
def handle_setup(_ctx, %{location: :stdout} = state), do: {[], state}

# Opens the target file for reading and writing, truncates it and stores the
# descriptor in the state under `:fd`.
@impl true
def handle_setup(_ctx, %{location: path} = state) do
  file = @common_file.open!(path, [:read, :write])
  :ok = @common_file.truncate!(file)

  opened_state = %{state | fd: file}
  {[], opened_state}
end
# Issues the first demand on the `:input` pad; the state is left untouched.
@impl true
def handle_playing(_ctx, state), do: {[demand: :input], state}
# Writes the incoming buffer to `:stdio` when the sink targets standard output.
@impl true
def handle_buffer(:input, buffer, _ctx, %{location: :stdout} = state) do
  write_and_demand(:stdio, buffer, state)
end

# Writes the incoming buffer to the descriptor stored in the state under `:fd`.
@impl true
def handle_buffer(:input, buffer, _ctx, %{fd: file} = state) do
  write_and_demand(file, buffer, state)
end

# Writes `buffer` to `device` and then requests more data on the `:input` pad.
defp write_and_demand(device, buffer, state) do
  :ok = @common_file.write!(device, buffer)
  {[demand: :input], state}
end
# Seek events are rejected with an exception when the sink writes to `:stdout`.
@impl true
def handle_event(:input, %SeekSinkEvent{}, _ctx, %{location: :stdout}) do
  raise "Seek event not supported for :stdout sink"
end

# Handles a seek request on a file. When `insert?` is truthy the file is split at
# `position`; otherwise the descriptor is only moved to `position`.
@impl true
def handle_event(:input, %SeekSinkEvent{} = seek, _ctx, state) do
  new_state =
    if seek.insert? do
      split_file(state, seek.position)
    else
      seek_file(state, seek.position)
    end

  {[], new_state}
end

# Any other event is delegated to the default implementation.
def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)
# Nothing to finalize when the sink writes to standard output.
@impl true
def handle_end_of_stream(:input, _ctx, %{location: :stdout} = state), do: {[], state}

# For file output, merges any pending temporary file and closes the target file.
@impl true
def handle_end_of_stream(:input, _ctx, state) do
  closed_state = do_merge_and_close(state)
  {[], closed_state}
end
# With standard output there is nothing to clean up, so terminate right away.
@impl true
def handle_terminate_request(_ctx, %{location: :stdout} = state) do
  {[terminate: :normal], state}
end

# For file output, merges any pending temporary file and closes the target file
# before terminating.
@impl true
def handle_terminate_request(_ctx, state) do
  closed_state = do_merge_and_close(state)
  {[terminate: :normal], closed_state}
end
# Finalizes the output file: merges the temporary file (if one is open), closes the
# main descriptor and clears `:fd` in the state.
# When `:fd` is already `nil` the state is returned unchanged.
defp do_merge_and_close(%{fd: nil} = state), do: state

defp do_merge_and_close(state) do
  merged = maybe_merge_temporary(state)
  @common_file.close!(merged.fd)
  %{merged | fd: nil}
end
# Merges any pending temporary file and then moves the main descriptor to `position`.
# Returns the state after the merge.
defp seek_file(%{fd: file} = state, position) do
  merged = maybe_merge_temporary(state)
  _new_position = @common_file.seek!(file, position)
  merged
end
# Prepares the file for insertion at `position`: seeks there, opens the temporary
# file and calls `split!` with the main and temporary descriptors.
# Returns the state holding the open temporary descriptor.
defp split_file(%{fd: file} = state, position) do
  with_temp = state |> seek_file(position) |> open_temporary()

  :ok = @common_file.split!(file, with_temp.temp_fd)
  with_temp
end
# Merges the temporary file back into the main file, if a temporary file is open.
# With no temporary descriptor (`temp_fd: nil`) the state is returned unchanged.
defp maybe_merge_temporary(%{temp_fd: nil} = state), do: state

defp maybe_merge_temporary(%{fd: main_fd, temp_fd: tmp_fd, temp_location: tmp_path} = state) do
  # TODO: Consider improving performance for multi-insertion scenarios by using
  # multiple temporary files and merging them only once on `handle_terminate_request/2`.
  copy_and_remove_temporary(main_fd, tmp_fd, tmp_path)

  # The temporary file is closed and removed by now, so forget its descriptor.
  %{state | temp_fd: nil}
end
# Opens the temporary file with the `[:read, :exclusive]` modes and stores its
# descriptor in the state. Matches only when no temporary descriptor is open.
defp open_temporary(%{temp_fd: nil, temp_location: tmp_path} = state) do
  tmp_fd = @common_file.open!(tmp_path, [:read, :exclusive])
  %{state | temp_fd: tmp_fd}
end
# Copies the temporary file into the main descriptor, then closes and deletes the
# temporary file. The order matters: the descriptor is closed before the file is removed.
defp copy_and_remove_temporary(main_fd, tmp_fd, tmp_path) do
  _copied = @common_file.copy!(tmp_fd, main_fd)
  :ok = @common_file.close!(tmp_fd)
  :ok = @common_file.rm!(tmp_path)
end
end