lib/membrane/testing/pipeline.ex

defmodule Membrane.Testing.Pipeline do
  @moduledoc """
  This Pipeline was created to reduce testing boilerplate and ease communication
  with its children. It also provides a utility for informing testing process about
  playback changes and received notifications.

  When you want a build Pipeline to test your children you need three things:
   - Pipeline Module
   - List of children
   - Links between those children

  To start a testing pipeline you need to build
  a keyword list representing the options used to determine the pipeline's behaviour and then
  pass that options list to the `Membrane.Testing.Pipeline.start_link_supervised!/1`.
  The testing pipeline can be started in one of two modes - either with its `:default` behaviour, or by
  injecting a custom module behaviour. The usage of a `:default` pipeline implementation is presented below:

  ```
  links = [
      child(:el1, MembraneElement1) |>
      child(:el2, MembraneElement2)
      ...
  ]
  options =  [
    module: :default # :default is the default value for this parameter, so you do not need to pass it here
    spec: links
  ]
  pipeline = Membrane.Testing.Pipeline.start_link_supervised!(options)
  ```

  You can also pass your custom pipeline's module as a `:module` option of
  the options list. Every callback of the module
  will be executed before the callbacks of Testing.Pipeline.
  Passed module has to return a proper spec. There should be no children
  nor links specified in options passed to test pipeline as that would
  result in a failure.

  ```
  options = [
    module: Your.Module
  ]
  pipeline = Membrane.Testing.Pipeline.start_link_supervised!(options)
  ```

  See `t:Membrane.Testing.Pipeline.options/0` for available options.

  ## Assertions

  This pipeline is designed to work with `Membrane.Testing.Assertions`. Check
  them out or see example below for more details.

  ## Messaging children

  You can send messages to children using their names specified in the children
  list. Please check `notify_child/3` for more details.

  ## Example usage

  Firstly, we can start the pipeline providing its options as a keyword list:
    import Membrane.ChildrenSpec
    children = [
        child(source, %Membrane.Testing.Source{} ) |>
        child(:tested_element, TestedElement) |>
        child(sink, %Membrane.Testing.Sink{})
    ]
    {:ok, pipeline} = Membrane.Testing.Pipeline.start_link(spec: links)

  We can now wait till the end of the stream reaches the sink element (don't forget
  to import `Membrane.Testing.Assertions`):

      assert_end_of_stream(pipeline, :sink)

  We can also assert that the `Membrane.Testing.Sink` processed a specific
  buffer:

      assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: 1})

  """

  use Membrane.Pipeline

  alias Membrane.Child
  alias Membrane.ChildrenSpec
  alias Membrane.Core.Message
  alias Membrane.{Element, Pipeline}
  alias Membrane.Testing.Notification

  require Membrane.Core.Message

  defmodule State do
    @moduledoc false

    @enforce_keys [:test_process, :module]
    defstruct @enforce_keys ++ [:custom_pipeline_state, :raise_on_child_pad_removed?]

    @typedoc """
    Structure for holding state

    ##  Test Process
    `pid` of process that shall receive messages from the pipeline

    ## Module
    Pipeline Module with custom callbacks.

    ## Custom Pipeline State
    State of the pipeline defined by Module.
    """
    @type t :: %__MODULE__{
            test_process: pid() | nil,
            module: module() | nil,
            custom_pipeline_state: Pipeline.state(),
            raise_on_child_pad_removed?: boolean() | nil
          }
  end

  @type options ::
          [
            module: :default,
            spec: [ChildrenSpec.builder()],
            test_process: pid(),
            name: Pipeline.name(),
            raise_on_child_pad_removed?: boolean()
          ]
          | [
              module: module(),
              custom_args: Pipeline.pipeline_options(),
              test_process: pid(),
              name: Pipeline.name()
            ]

  @spec child_spec(options) :: Supervisor.child_spec()
  def child_spec(options) do
    id = Keyword.get(options, :name, make_ref())
    super(options) |> Map.merge(%{restart: :transient, id: id})
  end

  @spec start_link(options) :: Pipeline.on_start()
  def start_link(options) do
    do_start(:start_link, options)
  end

  @spec start(options) :: Pipeline.on_start()
  def start(options) do
    do_start(:start, options)
  end

  defp do_start(type, options) do
    :ok = validate_options!(options)

    {process_options, options} = Keyword.split(options, [:name])
    options = Keyword.put_new(options, :test_process, self())
    apply(Pipeline, type, [__MODULE__, options, process_options])
  end

  @doc """
  Starts the pipeline under the ExUnit test supervisor and links it to the current process.

  Can be used only in tests.
  """
  @spec start_link_supervised(options) :: Pipeline.on_start()
  def start_link_supervised(pipeline_options \\ []) do
    pipeline_options = Keyword.put_new(pipeline_options, :test_process, self())

    # TODO use start_link_supervised when added
    with {:ok, supervisor, pipeline} <- ex_unit_start_supervised({__MODULE__, pipeline_options}) do
      Process.link(pipeline)
      {:ok, supervisor, pipeline}
    else
      {:error, {error, _child_info}} -> {:error, error}
    end
  end

  @spec start_link_supervised!(options) :: pipeline_pid :: pid
  def start_link_supervised!(pipeline_options \\ []) do
    {:ok, _supervisor, pipeline} = start_link_supervised(pipeline_options)
    pipeline
  end

  @doc """
  Starts the pipeline under the ExUnit test supervisor.

  Can be used only in tests.
  """
  @spec start_supervised(options) :: Pipeline.on_start()
  def start_supervised(pipeline_options \\ []) do
    pipeline_options = Keyword.put_new(pipeline_options, :test_process, self())
    ex_unit_start_supervised({__MODULE__, pipeline_options})
  end

  @spec start_supervised!(options) :: pipeline_pid :: pid
  def start_supervised!(pipeline_options \\ []) do
    {:ok, _supervisor, pipeline} = start_supervised(pipeline_options)
    pipeline
  end

  defp ex_unit_start_supervised(child_spec) do
    # It's not a 'normal' call to keep dialyzer quiet
    apply(ExUnit.Callbacks, :start_supervised, [child_spec])
  end

  @spec terminate(pid, Keyword.t()) :: :ok | {:ok, pid()} | {:error, :timeout}
  defdelegate terminate(pipeline, opts \\ []), to: Pipeline

  @doc """
  Sends notification to a child by Element name.

  ## Example

  Knowing that `pipeline` has child named `sink`, notification can be sent as follows:

      notify_child(pipeline, :sink, {:notification, "to handle"})
  """
  @spec notify_child(pid(), Element.name(), any()) :: :ok
  def notify_child(pipeline, child, notification) do
    send(pipeline, {:for_element, child, notification})
    :ok
  end

  @doc """
  Deprecated since `v1.1.0-rc0`, use `notify_child/3` instead.

  Sends message to a child by Element name.

  ## Example

  Knowing that `pipeline` has child named `sink`, message can be sent as follows:

      message_child(pipeline, :sink, {:message, "to handle"})
  """
  @deprecated "Use #{inspect(__MODULE__)}.notify_child/3 instead"
  @spec message_child(pid(), Element.name(), any()) :: :ok
  def message_child(pipeline, child, message) do
    notify_child(pipeline, child, message)
  end

  @doc """
  Executes specified actions in the pipeline.

  The actions are returned from the `handle_info` callback.
  """
  @spec execute_actions(pid(), Keyword.t()) :: :ok
  def execute_actions(pipeline, actions) do
    send(pipeline, {__MODULE__, :__execute_actions__, actions})
    :ok
  end

  @doc """
  Returns the pid of the children process.

  Accepts pipeline pid as a first argument and a child reference or a list
  of child references representing a path as a second argument.

  If second argument is a child reference, function gets pid of this child
  from pipeline.

  If second argument is a path of child references, function gets pid of
  last a component pointed by this path.

  Returns
   * `{:ok, child_pid}`, if a child was succesfully found
   * `{:error, reason}`, if, for example, pipeline is not alive or children path is invalid
  """
  @spec get_child_pid(pid(), child_name_path :: Child.name() | [Child.name()]) ::
          {:ok, pid()} | {:error, reason :: term()}
  def get_child_pid(pipeline, [_head | _tail] = child_name_path) do
    do_get_child_pid(pipeline, child_name_path)
  end

  def get_child_pid(pipeline, child_name) when not is_list(child_name) do
    do_get_child_pid(pipeline, [child_name])
  end

  @doc """
  Returns the pid of the children process.

  Works as get_child_pid/2, but raises an error instead of returning
  `{:error, reason}` tuple.
  """
  @spec get_child_pid!(pid(), child_name_path :: Child.name() | [Child.name()]) :: pid()
  def get_child_pid!(parent_pid, child_name_path) do
    {:ok, child_pid} = get_child_pid(parent_pid, child_name_path)
    child_pid
  end

  defp do_get_child_pid(component_pid, child_name_path, is_pipeline? \\ true)

  defp do_get_child_pid(component_pid, [], _is_pipeline?) do
    {:ok, component_pid}
  end

  defp do_get_child_pid(component_pid, [child_name | child_name_path_tail], is_pipeline?) do
    case Message.call(component_pid, :get_child_pid, child_name) do
      {:ok, child_pid} ->
        do_get_child_pid(child_pid, child_name_path_tail, false)

      {:error, {:call_failure, {:noproc, _call_info}}} ->
        if is_pipeline?,
          do: {:error, :pipeline_not_alive},
          else: {:error, :component_not_alive}

      {:error, _reason} = error ->
        error
    end
  end

  @impl true
  def handle_init(ctx, options) do
    case Keyword.get(options, :module, :default) do
      :default ->
        spec = Bunch.listify(Keyword.get(options, :spec, []))
        test_process = Keyword.fetch!(options, :test_process)
        raise? = Keyword.get(options, :raise_on_child_pad_removed?, true)

        new_state = %State{
          test_process: test_process,
          raise_on_child_pad_removed?: raise?,
          module: nil
        }

        {[spec: spec], new_state}

      module when is_atom(module) ->
        case Code.ensure_compiled(module) do
          {:module, _} -> :ok
          {:error, _} -> raise "Unknown module: #{inspect(module)}"
        end

        new_state = %State{
          module: module,
          custom_pipeline_state: Keyword.get(options, :custom_args),
          test_process: Keyword.fetch!(options, :test_process)
        }

        injected_module_result = eval_injected_module_callback(:handle_init, [ctx], new_state)
        testing_pipeline_result = {[], new_state}
        combine_results(injected_module_result, testing_pipeline_result)

      not_a_module ->
        raise "Not a module: #{inspect(not_a_module)}"
    end
  end

  @impl true
  def handle_setup(ctx, %State{} = state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_setup,
        [ctx],
        state
      )

    :ok = notify_test_process(state.test_process, :setup)

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_playing(ctx, %State{} = state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_playing,
        [ctx],
        state
      )

    :ok = notify_test_process(state.test_process, :play)

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_child_notification(
        %Notification{payload: notification},
        from,
        _ctx,
        %State{} = state
      ) do
    :ok =
      notify_test_process(state.test_process, {:handle_child_notification, {notification, from}})

    {[], state}
  end

  @impl true
  def handle_child_notification(notification, from, ctx, %State{} = state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_child_notification,
        [notification, from, ctx],
        state
      )

    :ok =
      notify_test_process(state.test_process, {:handle_child_notification, {notification, from}})

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_spec_started(children, ctx, %State{} = state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_spec_started,
        [children, ctx],
        state
      )

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  [:handle_child_setup_completed, :handle_child_playing]
  |> Enum.map(fn callback ->
    @impl true
    def unquote(callback)(child, ctx, %State{} = state) do
      {custom_actions, custom_state} =
        eval_injected_module_callback(
          unquote(callback),
          [child, ctx],
          state
        )

      {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
    end
  end)

  @impl true
  def handle_info({__MODULE__, :__execute_actions__, actions}, _ctx, %State{} = state) do
    {actions, state}
  end

  @impl true
  def handle_info({:for_element, element, message}, ctx, %State{} = state) do
    injected_module_result =
      eval_injected_module_callback(
        :handle_info,
        [{:for_element, element, message}, ctx],
        state
      )

    testing_pipeline_result = {[notify_child: {element, message}], state}

    combine_results(injected_module_result, testing_pipeline_result)
  end

  @impl true
  def handle_info(message, ctx, %State{} = state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_info,
        [message, ctx],
        state
      )

    :ok = notify_test_process(state.test_process, {:handle_info, message})

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_call(message, ctx, state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_call,
        [message, ctx],
        state
      )

    :ok = notify_test_process(state.test_process, {:handle_call, message})

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_element_start_of_stream(element, pad, ctx, state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_element_start_of_stream,
        [element, pad, ctx],
        state
      )

    :ok =
      notify_test_process(state.test_process, {:handle_element_start_of_stream, {element, pad}})

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_element_end_of_stream(element, pad, ctx, state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_element_end_of_stream,
        [element, pad, ctx],
        state
      )

    :ok = notify_test_process(state.test_process, {:handle_element_end_of_stream, {element, pad}})

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_tick(timer, ctx, state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_tick,
        [timer, ctx],
        state
      )

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_child_pad_removed(child, pad, ctx, state) do
    if state.raise_on_child_pad_removed? do
      raise """
      Child #{inspect(child)} removed it's pad #{inspect(pad)}. If you want to
      handle such a scenario, pass `raise_on_child_pad_removed?: false` option to
      `Membrane.Testing.Pipeline.start_*/2` or pass there a pipeline module
      implementing this callback via `:module` option.
      """
    end

    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_child_pad_removed,
        [child, pad, ctx],
        state
      )

    :ok = notify_test_process(state.test_process, {:handle_child_pad_removed, {child, pad}})

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  @impl true
  def handle_crash_group_down(group_name, ctx, state) do
    {custom_actions, custom_state} =
      eval_injected_module_callback(
        :handle_crash_group_down,
        [group_name, ctx],
        state
      )

    :ok = notify_test_process(state.test_process, {:handle_crash_group_down, group_name})

    {custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
  end

  defp eval_injected_module_callback(callback, args, state)

  defp eval_injected_module_callback(_callback, _args, %State{module: nil}),
    do: {[], nil}

  defp eval_injected_module_callback(callback, args, state) do
    if callback != :handle_spec_started or
         function_exported?(state.module, :handle_spec_started, 3) do
      apply(state.module, callback, args ++ [state.custom_pipeline_state])
    else
      {[], state.custom_pipeline_state}
    end
  end

  defp notify_test_process(test_process, message) do
    send(test_process, {__MODULE__, self(), message})
    :ok
  end

  defp combine_results({custom_actions, custom_state}, {actions, state}) do
    {Enum.concat(custom_actions, actions), Map.put(state, :custom_pipeline_state, custom_state)}
  end

  defp validate_options!(options) do
    allowed_keys_1 = [:module, :mode, :spec, :test_process, :name, :raise_on_child_pad_removed?]
    allowed_keys_2 = [:module, :custom_args, :test_process, :name]

    with {:error, _keys} <- Keyword.validate(options, allowed_keys_1),
         {:error, _keys} <- Keyword.validate(options, allowed_keys_2) do
      raise """
      Options passed to #{inspect(__MODULE__)} start function has to fulfill type
      #{inspect(__MODULE__)}.options, while they are #{inspect(options)}
      """
    else
      {:ok, _keyword} -> :ok
    end
  end
end