lib/membrane/testing/pipeline.ex

defmodule Membrane.Testing.Pipeline do
  @moduledoc """
  This Pipeline was created to reduce testing boilerplate and ease communication
  with its elements. It also provides a utility for informing testing process about
  playback state changes and received notifications.

  When you want to build a Pipeline to test your elements you need three things:
   - Pipeline Module
   - List of elements
   - Links between those elements

  When creating pipelines for tests the only essential part is the list of
   elements. In most cases during the tests, elements are linked in a way that
  `:output` pad is linked to `:input` pad of subsequent element. So we only need
   to pass a list of elements and links can be generated automatically.

  To start a testing pipeline you need to build a
  `Membrane.Testing.Pipeline.Options` struct and pass it to
  `Membrane.Testing.Pipeline.start_link/2`. Links are generated by
  `populate_links/1`.

  ```
  options = %Membrane.Testing.Pipeline.Options {
    elements: [
      el1: MembraneElement1,
      el2: MembraneElement2,
      ...
    ]
  }
  {:ok, pipeline} = Membrane.Testing.Pipeline.start_link(options)
  ```

  If you need to pass custom links, you can always do it using `:links` field of
  `Membrane.Testing.Pipeline.Options` struct.

  ```
  options = %Membrane.Testing.Pipeline.Options {
    elements: [
      el1: MembraneElement1,
      el2: MembraneElement2,
    ],
    links: [
      link(:el1) |> to(:el2)
    ]
  }
  ```

  You can also pass a custom pipeline module, by using `:module` field of
  `Membrane.Testing.Pipeline.Options` struct. Every callback of the module
  will be executed before the callbacks of Testing.Pipeline.
  Passed module has to return a proper spec. There should be no elements
  nor links specified in options passed to test pipeline as that would
  result in a failure.

  ```
  options = %Membrane.Testing.Pipeline.Options {
    module: Your.Module
  }
  ```

  See `Membrane.Testing.Pipeline.Options` for available options.

  ## Assertions

  This pipeline is designed to work with `Membrane.Testing.Assertions`. Check
  them out or see example below for more details.

  ## Messaging children

  You can send messages to children using their names specified in the elements
  list. Please check `message_child/3` for more details.

  ## Example usage

  Firstly, we can start the pipeline providing its options:

      options = %Membrane.Testing.Pipeline.Options {
        elements: [
          source: %Membrane.Testing.Source{},
          tested_element: TestedElement,
          sink: %Membrane.Testing.Sink{}
        ]
      }
      {:ok, pipeline} = Membrane.Testing.Pipeline.start_link(options)

  We can now wait till the end of the stream reaches the sink element (don't forget
  to import `Membrane.Testing.Assertions`):

      assert_end_of_stream(pipeline, :sink)

  We can also assert that the `Membrane.Testing.Sink` processed a specific
  buffer:

      assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: 1})

  """

  use Membrane.Pipeline

  alias Membrane.{Element, Pipeline}
  alias Membrane.ParentSpec

  defmodule Options do
    @moduledoc """
    Options accepted by `Membrane.Testing.Pipeline` when it is started.

    ## Fields

      * `:test_process` - `pid` of the process that receives messages about
        playback state changes, notifications and other pipeline events. When
        left as `nil`, the process that starts the pipeline is used.
      * `:elements` - list of element specs.
      * `:links` - list describing the links between elements. When left as
        `nil` (and no `:module` is given) the links are populated automatically
        based on the order of `:elements`, using default pad names.
      * `:module` - pipeline module with custom callbacks.
      * `:custom_args` - arguments for the `handle_init` callback of `:module`.
    """

    defstruct elements: nil,
              links: nil,
              test_process: nil,
              module: nil,
              custom_args: nil

    @type t :: %__MODULE__{
            elements: ParentSpec.children_spec_t() | nil,
            links: ParentSpec.links_spec_t() | nil,
            test_process: pid() | nil,
            module: module() | nil,
            custom_args: Pipeline.pipeline_options_t() | nil
          }
  end

  defmodule State do
    @moduledoc """
    Internal state kept by `Membrane.Testing.Pipeline`.

    ## Fields

      * `:test_process` (required) - `pid` of the process that receives
        messages about playback state changes, notifications and other
        pipeline events.
      * `:module` (required) - pipeline module with custom callbacks, or `nil`
        when no custom module is used.
      * `:custom_pipeline_state` - state of the pipeline defined by `:module`.
    """

    @enforce_keys [:test_process, :module]
    defstruct [:test_process, :module, :custom_pipeline_state]

    @type t :: %__MODULE__{
            test_process: pid() | nil,
            module: module() | nil,
            custom_pipeline_state: any
          }
  end

  @doc """
  Starts the testing pipeline by delegating to `Membrane.Pipeline.start_link/3`.

  `pipeline_options` must contain either `:elements` or `:module` (but not
  both), otherwise an error is raised. When `:test_process` is `nil`, the
  calling process is used instead.
  """
  @spec start_link(Options.t(), GenServer.options()) :: GenServer.on_start()
  def start_link(pipeline_options, process_options \\ []),
    do: do_start(:start_link, pipeline_options, process_options)

  @doc """
  Starts the testing pipeline by delegating to `Membrane.Pipeline.start/3`.

  Accepts the same arguments and performs the same option validation as
  `start_link/2`.
  """
  @spec start(Options.t(), GenServer.options()) :: GenServer.on_start()
  def start(pipeline_options, process_options \\ []),
    do: do_start(:start, pipeline_options, process_options)

  # Validates the options and starts the pipeline using the `Membrane.Pipeline`
  # function named by `type` (`:start` or `:start_link`).

  # Neither elements nor a custom module were given - nothing to run.
  defp do_start(_type, %Options{elements: nil, module: nil}, _process_options) do
    message = """

    You provided no information about pipeline contents. Please provide either:
     - list of elemenst via `elements` field of Options struct with optional links between
     them via `links` field of `Options` struct
     - module that implements `Membrane.Pipeline` callbacks via `module` field of `Options`
     struct
    """

    raise message
  end

  # Both a custom module and an elements list were given - ambiguous setup.
  defp do_start(_type, %Options{elements: elements, module: module}, _process_options)
       when is_atom(module) and not is_nil(module) and not is_nil(elements) do
    message = """

    When working with Membrane.Testing.Pipeline you can't provide both
    override module and elements list in the Membrane.Testing.Pipeline.Options
    struct.
    """

    raise message
  end

  # Valid options: fill in the default test process and hand over to Membrane.
  defp do_start(type, options, process_options) do
    apply(Pipeline, type, [__MODULE__, default_options(options), process_options])
  end

  @doc """
  Links subsequent elements using default pads (linking `:input` to `:output` of
  previous element).

  Returns an empty list when fewer than two elements are given, as there is
  nothing to link. Otherwise returns a single-element list holding one link
  chain that goes through all the element names in order.

  ## Example

      Pipeline.populate_links([el1: MembraneElement1, el2: MembraneElement2])
  """
  @spec populate_links(elements :: ParentSpec.children_spec_t()) :: ParentSpec.links_spec_t()
  # Match on the list shape instead of calling `length/1`, which would walk the
  # whole list only to find out whether it has fewer than two entries.
  def populate_links([]), do: []
  def populate_links([_single_element]), do: []

  def populate_links(elements) when is_list(elements) do
    import ParentSpec
    [first | rest] = Keyword.keys(elements)
    # Start the chain at the first element and append each next one with `to/2`.
    [Enum.reduce(rest, link(first), &to(&2, &1))]
  end

  @doc """
  Sends a message to a child of the pipeline, addressed by the child's name.

  The message is wrapped and sent to the `pipeline` process, which forwards it
  to the child. Always returns `:ok`.

  ## Example

  Knowing that `pipeline` has child named `sink`, message can be sent as follows:

      message_child(pipeline, :sink, {:message, "to handle"})
  """
  @spec message_child(pid(), Element.name_t(), any()) :: :ok
  def message_child(pipeline, child, message) do
    envelope = {:for_element, child, message}
    _sent = send(pipeline, envelope)
    :ok
  end

  # No custom module: build the spec from the given elements. Links are taken
  # from the options or, when absent, generated from the order of elements.
  @impl true
  def handle_init(%Options{module: nil} = options) do
    links =
      case options.links do
        nil -> populate_links(options.elements)
        custom_links -> custom_links
      end

    spec = %Membrane.ParentSpec{children: options.elements, links: links}
    initial_state = %State{test_process: options.test_process, module: nil}

    {{:ok, spec: spec}, initial_state}
  end

  # Custom module: let its `handle_init` (called with `custom_args`) produce
  # the actions and the custom pipeline state.
  @impl true
  def handle_init(%Options{links: nil, elements: nil} = options) do
    initial_state = %State{
      test_process: options.test_process,
      module: options.module,
      custom_pipeline_state: options.custom_args
    }

    eval(:handle_init, [], fn -> {:ok, initial_state} end, initial_state)
  end

  # Runs the custom module's callback (if any) and informs the test process
  # about the `:stopped` -> `:prepared` transition.
  @impl true
  def handle_stopped_to_prepared(ctx, %State{} = state) do
    notify = fn -> notify_playback_state_changed(:stopped, :prepared, state) end
    eval(:handle_stopped_to_prepared, [ctx], notify, state)
  end

  # Runs the custom module's callback (if any) and informs the test process
  # about the `:prepared` -> `:playing` transition.
  @impl true
  def handle_prepared_to_playing(ctx, %State{} = state) do
    notify = fn -> notify_playback_state_changed(:prepared, :playing, state) end
    eval(:handle_prepared_to_playing, [ctx], notify, state)
  end

  # Runs the custom module's callback (if any) and informs the test process
  # about the `:playing` -> `:prepared` transition.
  @impl true
  def handle_playing_to_prepared(ctx, %State{} = state) do
    notify = fn -> notify_playback_state_changed(:playing, :prepared, state) end
    eval(:handle_playing_to_prepared, [ctx], notify, state)
  end

  # Runs the custom module's callback (if any) and informs the test process
  # about the `:prepared` -> `:stopped` transition.
  @impl true
  def handle_prepared_to_stopped(ctx, %State{} = state) do
    notify = fn -> notify_playback_state_changed(:prepared, :stopped, state) end
    eval(:handle_prepared_to_stopped, [ctx], notify, state)
  end

  # Runs the custom module's callback (if any) and passes the notification,
  # together with its sender, to the test process.
  @impl true
  def handle_notification(notification, from, ctx, %State{} = state) do
    report = fn ->
      notify_test_process({:handle_notification, {notification, from}}, state)
    end

    eval(:handle_notification, [notification, from, ctx], report, state)
  end

  # Only delegates to the custom module (if any); the testing pipeline itself
  # does nothing when a spec is started.
  @impl true
  def handle_spec_started(elements, ctx, %State{} = state) do
    noop = fn -> {:ok, state} end
    eval(:handle_spec_started, [elements, ctx], noop, state)
  end

  # Messages produced by `message_child/3`: forward the payload to the child
  # after the custom module (if any) has seen the whole request.
  @impl true
  def handle_other({:for_element, element, message} = request, ctx, %State{} = state) do
    forward = fn -> {{:ok, forward: {element, message}}, state} end
    eval(:handle_other, [request, ctx], forward, state)
  end

  # Any other message is reported to the test process.
  @impl true
  def handle_other(message, ctx, %State{} = state) do
    report = fn -> notify_test_process({:handle_other, message}, state) end
    eval(:handle_other, [message, ctx], report, state)
  end

  # Runs the custom module's callback (if any) and reports the start of stream
  # on the given endpoint to the test process.
  @impl true
  def handle_element_start_of_stream(endpoint, ctx, state) do
    report = fn ->
      notify_test_process({:handle_element_start_of_stream, endpoint}, state)
    end

    eval(:handle_element_start_of_stream, [endpoint, ctx], report, state)
  end

  # Runs the custom module's callback (if any) and reports the end of stream
  # on the given endpoint to the test process.
  @impl true
  def handle_element_end_of_stream(endpoint, ctx, state) do
    report = fn ->
      notify_test_process({:handle_element_end_of_stream, endpoint}, state)
    end

    eval(:handle_element_end_of_stream, [endpoint, ctx], report, state)
  end

  # Only delegates to the custom module (if any); the testing pipeline itself
  # does nothing on timer ticks.
  @impl true
  def handle_tick(timer, ctx, state) do
    noop = fn -> {:ok, state} end
    eval(:handle_tick, [timer, ctx], noop, state)
  end

  # Only delegates to the custom module (if any); the testing pipeline itself
  # does nothing when a crash group goes down.
  @impl true
  def handle_crash_group_down(group_name, ctx, state) do
    noop = fn -> {:ok, state} end
    eval(:handle_crash_group_down, [group_name, ctx], noop, state)
  end

  # Fills in the calling process as the test process when none was specified;
  # any other options are returned untouched.
  defp default_options(options) do
    case options do
      %Options{test_process: nil} -> %Options{options | test_process: self()}
      _already_set -> options
    end
  end

  # Runs the callback of the injected custom module (if any) and then the
  # testing pipeline's own logic given as `function`.
  #
  #   * `custom_function` - name of the callback to call on the custom module,
  #   * `custom_args` - callback arguments; the custom pipeline state is
  #     appended as the last one,
  #   * `function` - zero-arity function with the testing pipeline's own logic,
  #   * `state` - current `%State{}`.
  #
  # Returns a callback result whose state is always a `%State{}`.
  defp eval(custom_function, custom_args, function, state)

  # No custom module - only the testing pipeline's own logic applies.
  defp eval(_custom_function, _custom_args, function, %State{module: nil}),
    do: function.()

  defp eval(custom_function, custom_args, function, %State{module: module} = state) do
    custom_result =
      module
      |> apply(custom_function, custom_args ++ [state.custom_pipeline_state])
      |> unify_result()

    case custom_result do
      {{:ok, _actions}, _custom_state} ->
        combine_results(custom_result, function.())

      {failure, custom_state} ->
        # The custom callback did not succeed: skip our own logic and pass the
        # failure on, but keep the custom state wrapped in `%State{}`. Returning
        # the bare custom state would replace the pipeline state with a term
        # that the callbacks of this module (matching on `%State{}`) reject.
        {failure, %State{state | custom_pipeline_state: custom_state}}
    end
  end

  # Reports a playback state transition (`previous` -> `current`) to the test
  # process.
  defp notify_playback_state_changed(previous, current, %State{} = state),
    do: notify_test_process({:playback_state_changed, previous, current}, state)

  # Sends `message` to the test process, tagged with this module and the pid
  # of the current process. Returns `{:ok, state}` so it can be used directly
  # as a callback result.
  defp notify_test_process(message, %State{} = state) do
    envelope = {__MODULE__, self(), message}
    send(state.test_process, envelope)
    {:ok, state}
  end

  # Normalizes a callback result to the `{{status, actions}, state}` shape.

  # Short form without actions - expand it with an empty action list.
  defp unify_result({:ok, state}) do
    {{:ok, []}, state}
  end

  # Already in the full form - pass it through untouched.
  defp unify_result({{_status, _payload}, _state} = unified) do
    unified
  end

  # Merges the result of the custom module's callback with the result of the
  # testing pipeline's own logic: actions are combined and the custom state is
  # stored under `:custom_pipeline_state` of the testing pipeline's state.
  defp combine_results({custom_actions, custom_state}, {actions, state}) do
    merged_actions = combine_actions(custom_actions, actions)
    updated_state = Map.put(state, :custom_pipeline_state, custom_state)
    {merged_actions, updated_state}
  end

  # Combines the custom module's actions (left) with the testing pipeline's
  # own actions (right).

  # Nothing on the right - keep the left side as it is.
  defp combine_actions(left, :ok), do: left

  # Both sides succeeded - custom actions go first.
  defp combine_actions({:ok, left_actions}, {:ok, right_actions}),
    do: {:ok, left_actions ++ right_actions}

  # Anything else - the right side wins.
  defp combine_actions(_left, right), do: right
end