# lib/membrane/filter_aggregator.ex

defmodule Membrane.FilterAggregator do
  @moduledoc """
  An element that aggregates multiple filters within a single Elixir process.

  Warning: This element is still in an experimental phase.

  This element supports only filters with one input and one output,
  with the following restrictions:
  * not using timers
  * not relying on received messages
  * not expecting any events coming from downstream elements
  * their pads have to be named `:input` and `:output`
  * their pads cannot use manual demands
  * the first filter must make demands in buffers
  """
  use Membrane.Filter

  alias Membrane.Core.FilterAggregator.Context

  require Membrane.Core.FilterAggregator.InternalAction, as: InternalAction
  require Membrane.Logger

  # Element options. `filters` is a list of `{child_name, module_or_options_struct}` pairs;
  # the filters are initialized and run in the order in which they are listed.
  def_options filters: [
                spec: [{Membrane.Child.name(), module() | struct()}],
                description: "A list of filters applied to incoming stream"
              ]

  # Input pad of the aggregator itself: any stream format, automatic flow control.
  def_input_pad :input,
    accepted_format: _any,
    flow_control: :auto

  # Output pad of the aggregator itself: any stream format, automatic flow control.
  def_output_pad :output,
    flow_control: :auto,
    accepted_format: _any

  @impl true
  def handle_init(agg_ctx, %__MODULE__{filters: filter_specs}) do
    if filter_specs == [] do
      Membrane.Logger.warning("No filters provided, #{inspect(__MODULE__)} will be a no-op")
    end

    # First pass: validate and normalize every spec before any filter gets initialized.
    normalized_specs = Enum.map(filter_specs, &normalize_filter_spec/1)

    # Second pass: build a context for each filter and run its `handle_init/2`.
    states =
      Enum.map(normalized_specs, fn {name, module, options} ->
        context = Context.build_context!(name, module, agg_ctx)
        # The match asserts that the filter returns no actions from `handle_init/2`.
        {[], state} = module.handle_init(context, options)
        {name, module, context, state}
      end)

    {[], %{states: states}}
  end

  # Turns a single entry of the `filters` option into `{name, module, options}`.
  # Accepts `{name, options_struct}` and `{name, module}`; raises `ArgumentError` otherwise.
  defp normalize_filter_spec({name, %module{} = options}), do: {name, module, options}

  defp normalize_filter_spec({name, module}) do
    if is_atom(module) and Membrane.Element.element?(module) do
      Code.ensure_loaded!(module)

      # A module that defines a struct is initialized with the struct's defaults,
      # any other module with an empty map.
      options =
        if function_exported?(module, :__struct__, 1),
          do: struct!(module, []),
          else: %{}

      {name, module, options}
    else
      raise ArgumentError, """
      The module "#{inspect(module)}" is not a Membrane Element.
      Make sure that given module is the right one, implements proper behaviour
      and all needed dependencies are properly specified in the Mixfile.
      """
    end
  end

  defp normalize_filter_spec(invalid_spec) do
    raise ArgumentError, "Invalid filter spec: `#{inspect(invalid_spec)}`"
  end

  @impl true
  def handle_setup(_ctx, %{states: filter_states}) do
    # Run the setup of every filter, first to last, and keep only the actions
    # that are meant to leave the aggregator.
    {emitted, filter_states} = pipe_downstream([InternalAction.setup()], filter_states)

    {reject_internal_actions(emitted), %{states: filter_states}}
  end

  @impl true
  def handle_playing(agg_ctx, %{states: states}) do
    contexts = Enum.map(states, fn {_name, _module, context, _state} -> context end)

    # NOTE(review): with these two lists the i-th filter is zipped with its own context as
    # `prev_context` and with the context of the preceding filter (`agg_ctx` for the first
    # one) as `next_context`; the surplus last element of each list is dropped by the zip.
    # Confirm this is what `Context.link_contexts/3` expects.
    prev_contexts = contexts ++ [agg_ctx]
    next_contexts = [agg_ctx | contexts]

    linked_states =
      [prev_contexts, states, next_contexts]
      |> Enum.zip()
      |> Enum.map(fn {prev_context, {name, module, context, state}, next_context} ->
        {name, module, Context.link_contexts(context, prev_context, next_context), state}
      end)

    # Once the contexts are linked, run `handle_playing/2` of every filter, first to last.
    {emitted, linked_states} = pipe_downstream([InternalAction.playing()], linked_states)

    {reject_internal_actions(emitted), %{states: linked_states}}
  end

  @impl true
  def handle_start_of_stream(:input, _ctx, %{states: filter_states}) do
    # Start of stream is signalled to the filters with an internal action,
    # which is stripped again before the result leaves the aggregator.
    {emitted, filter_states} =
      pipe_downstream([InternalAction.start_of_stream(:output)], filter_states)

    {reject_internal_actions(emitted), %{states: filter_states}}
  end

  @impl true
  def handle_end_of_stream(:input, _ctx, %{states: filter_states}) do
    # End of stream reaches the first filter as if an upstream element had sent it on `:output`.
    {emitted, filter_states} = pipe_downstream([{:end_of_stream, :output}], filter_states)

    {reject_internal_actions(emitted), %{states: filter_states}}
  end

  @impl true
  def handle_event(:input, event, _ctx, %{states: filter_states}) do
    # The event is handed to the first filter and travels on through the whole chain.
    {emitted, filter_states} = pipe_downstream([{:event, {:output, event}}], filter_states)

    {reject_internal_actions(emitted), %{states: filter_states}}
  end

  @impl true
  def handle_stream_format(:input, stream_format, _ctx, %{states: filter_states}) do
    # The stream format is handed to the first filter and travels on through the whole chain.
    incoming = [{:stream_format, {:output, stream_format}}]
    {emitted, filter_states} = pipe_downstream(incoming, filter_states)

    {reject_internal_actions(emitted), %{states: filter_states}}
  end

  @impl true
  def handle_buffer(:input, buffer, _ctx, %{states: filter_states}) do
    # The buffer is handed to the first filter; whatever the last filter emits is returned.
    {emitted, filter_states} = pipe_downstream([{:buffer, {:output, buffer}}], filter_states)

    {reject_internal_actions(emitted), %{states: filter_states}}
  end

  # Feeds `initial_actions` into the first filter, then hands the actions emitted by each
  # filter to the one after it, i.e. walks the chain in the direction of data flow.
  # Returns `{actions_emitted_by_the_last_filter, updated_states}`.
  defp pipe_downstream(initial_actions, states) do
    {reversed_states, outgoing_actions} =
      Enum.reduce(states, {[], initial_actions}, fn
        {name, module, context, state}, {processed, incoming_actions} ->
          {emitted, new_context, new_state} =
            perform_actions(incoming_actions, module, context, state, [])

          {[{name, module, new_context, new_state} | processed], emitted}
      end)

    {outgoing_actions, Enum.reverse(reversed_states)}
  end

  # Performs `actions`, one by one, on a single filter (`module` with its `context` and `state`).
  #
  # Returns `{out_actions, context, state}`, where `out_actions` are the actions emitted by
  # the filter, in the order of emission. `next_actions_acc` gathers the per-action results
  # in reverse order; it is reversed and flattened once, when no actions are left.
  #
  # Raises `RuntimeError` when a callback returns anything other than `{actions_list, state}`.
  defp perform_actions([], _module, context, state, next_actions_acc) do
    {next_actions_acc |> Enum.reverse() |> List.flatten(), context, state}
  end

  defp perform_actions([action | actions], module, context, state, next_actions_acc) do
    context = Context.before_incoming_action(context, action)
    result = perform_action(action, module, context, state)
    context = Context.after_incoming_action(context, action)

    case result do
      # Nothing was emitted. This clause has to precede the generic list clause below,
      # which matches `[]` as well and would otherwise make this one unreachable.
      {[], next_state} ->
        perform_actions(actions, module, context, next_state, next_actions_acc)

      # Perform split actions within the same element
      {[{:split, _action} | _tail] = next_actions, next_state} ->
        perform_actions(
          next_actions ++ actions,
          module,
          context,
          next_state,
          next_actions_acc
        )

      {next_actions, next_state} when is_list(next_actions) ->
        next_actions = transform_out_actions(next_actions)
        next_context = Context.after_out_actions(context, next_actions)

        perform_actions(actions, module, next_context, next_state, [
          next_actions | next_actions_acc
        ])

      term ->
        raise "Invalid return from callback: #{inspect(term)}"
    end
  end

  # Delivers buffers sent by the previous filter to the `:input` pad of `module`.
  #
  # A `:buffer` action may carry a single buffer or a list of buffers (a list is what
  # `resolve_forward_action/1` produces for a forwarded list), while `handle_buffer/4` takes
  # exactly one buffer. A list is therefore rewritten into a `:split` action, which
  # `perform_actions/5` executes within the same element, one `handle_buffer/4` call per
  # buffer. An empty list becomes an empty split, i.e. a no-op.
  defp perform_action({:buffer, {:output, buffers}}, _module, _context, state)
       when is_list(buffers) do
    {[split: {:handle_buffer, Enum.map(buffers, &[:input, &1])}], state}
  end

  defp perform_action({:buffer, {:output, buffer}}, module, context, state) do
    module.handle_buffer(:input, buffer, context, state)
  end

  # Delivers a stream format to `module`. The callback context additionally carries, under
  # `:old_stream_format`, the format stored on the input pad at the time of the call.
  defp perform_action({:stream_format, {:output, stream_format}}, module, context, state) do
    previous_format = context.pads.input.stream_format
    callback_context = Map.put(context, :old_stream_format, previous_format)

    module.handle_stream_format(:input, stream_format, callback_context, state)
  end

  # Delivers an event sent by the previous filter to the `:input` pad of `module`.
  defp perform_action({:event, {:output, event}}, module, context, state),
    do: module.handle_event(:input, event, context, state)

  # Internal FilterAggregator action that triggers `handle_start_of_stream/3`.
  # The internal action is re-emitted in front of the filter's own actions, so that
  # the next filter receives it before anything else this filter has emitted.
  defp perform_action(InternalAction.start_of_stream(:output), module, context, state) do
    {filter_actions, next_state} = module.handle_start_of_stream(:input, context, state)

    {[InternalAction.start_of_stream(:output) | filter_actions], next_state}
  end

  # Signals the end of stream on the `:input` pad of `module`.
  defp perform_action({:end_of_stream, :output}, module, context, state),
    do: module.handle_end_of_stream(:input, context, state)

  # Demand-related actions cannot be performed inside the aggregator; a filter
  # returning one of them makes the aggregator raise.
  defp perform_action({:demand, {:input, _size}}, _module, _context, _state),
    do: raise("Manual demands are not supported by #{inspect(__MODULE__)}")

  defp perform_action({:redemand, :output}, _module, _context, _state),
    do: raise("Demands are not supported by #{inspect(__MODULE__)}")

  # Notifications are not consumed by any filter: they are passed downstream unchanged,
  # until they leave the aggregator together with the actions of the last filter.
  defp perform_action({:notify, message}, _module, _context, state) do
    # Pass the action downstream
    {[notify: message], state}
  end

  # `:notify_parent` is the parent-notification action of the current Membrane element API.
  # Without this clause, such an action emitted by any filter other than the last one
  # matches no `perform_action/4` clause in the next filter and raises `FunctionClauseError`.
  defp perform_action({:notify_parent, message}, _module, _context, state) do
    # Pass the action downstream
    {[notify_parent: message], state}
  end

  # Internal action used to manipulate the context after an action has been performed.
  # As an incoming action it emits nothing and leaves the filter state untouched.
  defp perform_action(InternalAction.merge_context(_ctx_data), _module, _context, state),
    do: {[], state}

  # Performs a `:split` action targeting `handle_buffer/4`: the callback is invoked once
  # per `[:input, buffer]` entry of `args_lists`, threading the context and the state
  # from one call to the next.
  #
  # Returns the concatenated actions of all the calls, followed by an internal
  # `merge_context` action carrying the context left after the last buffer.
  defp perform_action({:split, {:handle_buffer, []}}, _module, _context, state) do
    # No buffers: nothing to call and nothing to emit.
    {[], state}
  end

  defp perform_action({:split, {:handle_buffer, args_lists}}, module, context, state) do
    {actions, {context, state}} =
      Enum.flat_map_reduce(args_lists, {context, state}, fn
        [:input, buffer], {acc_context, acc_state} ->
          incoming_action = {:buffer, {:output, buffer}}
          acc_context = Context.before_incoming_action(acc_context, incoming_action)

          # The callback has to receive the context threaded through the reduction
          # (`acc_context`), not the context the whole split started with.
          {actions, acc_state} = module.handle_buffer(:input, buffer, acc_context, acc_state)

          acc_context = Context.after_incoming_action(acc_context, incoming_action)

          {actions, {acc_context, acc_state}}
      end)

    {actions ++ [InternalAction.merge_context(context)], state}
  end

  # Internal action that triggers `handle_setup/2` of `module`. The internal action is
  # re-emitted after the filter's own actions, so that it reaches the next filter as well.
  defp perform_action(InternalAction.setup(), module, context, state) do
    {filter_actions, next_state} = module.handle_setup(context, state)

    {filter_actions ++ [InternalAction.setup()], next_state}
  end

  # Internal action that triggers `handle_playing/2` of `module`. The internal action is
  # re-emitted after the filter's own actions, so that it reaches the next filter as well.
  defp perform_action(InternalAction.playing(), module, context, state) do
    {filter_actions, next_state} = module.handle_playing(context, state)

    {filter_actions ++ [InternalAction.playing()], next_state}
  end

  # The `:latency` action cannot be performed inside the aggregator.
  defp perform_action({:latency, _latency}, _module, _context, _state),
    do: raise("latency action not supported in #{inspect(__MODULE__)}")

  # Drops the aggregator's internal actions, keeping only those to be returned to the framework.
  defp reject_internal_actions(actions) do
    Enum.reject(actions, fn action -> InternalAction.is_internal_action(action) end)
  end

  # Rewrites every `:forward` action into the explicit action it stands for;
  # all other actions are passed through untouched.
  defp transform_out_actions(actions) do
    for action <- actions do
      case action do
        {:forward, data} -> resolve_forward_action(data)
        other -> other
      end
    end
  end

  # Maps the payload of a `:forward` action onto the equivalent action on the `:output` pad:
  # a buffer or a non-empty list starting with a buffer becomes a `:buffer` action,
  # `:end_of_stream` an `:end_of_stream` action, and any other struct either an `:event`
  # (when `Membrane.Event.event?/1` accepts it) or a `:stream_format` action.
  defp resolve_forward_action(%Membrane.Buffer{} = buffer),
    do: {:buffer, {:output, buffer}}

  defp resolve_forward_action([%Membrane.Buffer{} | _tail] = buffers),
    do: {:buffer, {:output, buffers}}

  defp resolve_forward_action(:end_of_stream),
    do: {:end_of_stream, :output}

  defp resolve_forward_action(%_struct{} = data) do
    if Membrane.Event.event?(data) do
      {:event, {:output, data}}
    else
      {:stream_format, {:output, data}}
    end
  end
end