defmodule Membrane.Testing.Pipeline do
@moduledoc """
This Pipeline was created to reduce testing boilerplate and ease communication
with its children. It also provides a utility for informing testing process about
playback state changes and received notifications.
When you want a build Pipeline to test your children you need three things:
- Pipeline Module
- List of children
- Links between those children
To start a testing pipeline you need to build
a keyword list representing the options used to determine the pipeline's behaviour and then
pass that options list to the `Membrane.Testing.Pipeline.start_link/2`.
The testing pipeline can be started in one of two modes - either with its `:default` behaviour, or by
injecting a custom module behaviour. The usage of a `:default` pipeline implementation is presented below:
```
children = [
el1: MembraneElement1,
el2: MembraneElement2,
...
]
options = [
module: :default # :default is the default value for this parameter, so you do not need to pass it here
links: Membrane.ParentSpec.link_linear(children)
]
{:ok, pipeline} = Membrane.Testing.Pipeline.start_link(options)
```
Note, that we have used `Membrane.Testing.ParentSpec.link_linear/1` function, that creates the list of links
for the given list of children, linking them in linear manner (that means - children are linked in a way that
`:output` pad of a given child is linked to `:input` pad of subsequent child). That is the case
which is often used while creating testing pipelines. Be aware, that `Membrane.Testing.ParentSpec.link_linear/1`
creates also a children specification itself, which means, that you cannot pass that children specification
as another option's argument (adding `children: children` option would lead to a duplication of children specifications).
If you need to link children in a different manner, you can of course do it by passing an appropriate list
of links as a `:links` option, just as you would do with a regular pipeline.
You can also pass your custom pipeline's module as a `:module` option of
the options list. Every callback of the module
will be executed before the callbacks of Testing.Pipeline.
Passed module has to return a proper spec. There should be no children
nor links specified in options passed to test pipeline as that would
result in a failure.
```
options = [
module: Your.Module
]
{:ok, pipeline} = Membrane.Testing.Pipeline.start_link(options)
```
See `Membrane.Testing.Pipeline.pipeline_keyword_list_t()` for available options.
## Assertions
This pipeline is designed to work with `Membrane.Testing.Assertions`. Check
them out or see example below for more details.
## Messaging children
You can send messages to children using their names specified in the children
list. Please check `message_child/3` for more details.
## Example usage
Firstly, we can start the pipeline providing its options as a keyword list:
children = [
source: %Membrane.Testing.Source{},
tested_element: TestedElement,
sink: %Membrane.Testing.Sink{}
]
{:ok, pipeline} = Membrane.Testing.Pipeline.start_link(links: Membrane.ParentSpec.link_linear(children))
We can now wait till the end of the stream reaches the sink element (don't forget
to import `Membrane.Testing.Assertions`):
assert_end_of_stream(pipeline, :sink)
We can also assert that the `Membrane.Testing.Sink` processed a specific
buffer:
assert_sink_buffer(pipeline, :sink, %Membrane.Buffer{payload: 1})
"""
use Membrane.Pipeline
alias Membrane.{Element, Pipeline}
alias Membrane.ParentSpec
alias Membrane.Testing.Notification
require Membrane.Logger
defmodule Options do
@moduledoc """
@deprecated
Structure representing `options` passed to testing pipeline.
## Struct fields
- `:test_process` - `pid` of process that shall receive messages from testing pipeline, e.g. when pipeline's playback state changes.
This allows using `Membrane.Testing.Assertions`
- `:elements` - a list of element specs. Allows to create a simple pipeline without defining a module for it.
- `:links` - a list describing the links between children. If ommited (or set to `nil`), they will be populated automatically
based on the children order using default pad names.
- `:module` - pipeline module with custom callbacks - useful if a simple list of children is not enough.
- `:custom_args`- arguments for the module's `handle_init` callback.
"""
defstruct [:elements, :links, :test_process, :module, :custom_args]
@type t :: %__MODULE__{
test_process: pid() | nil,
elements: ParentSpec.children_spec_t() | nil,
links: ParentSpec.links_spec_t() | nil,
module: module() | nil,
custom_args: Pipeline.pipeline_options_t() | nil
}
end
defmodule State do
@moduledoc """
Structure representing `state`.
## Test Process
`pid` of process that shall receive messages when Pipeline invokes playback
state change callback and receives notification.
## Module
Pipeline Module with custom callbacks.
## Custom Pipeline State
State of the pipeline defined by Module.
"""
@enforce_keys [:test_process, :module]
defstruct @enforce_keys ++ [:custom_pipeline_state]
@type t :: %__MODULE__{
test_process: pid() | nil,
module: module() | nil,
custom_pipeline_state: any
}
end
@type default_pipeline_keyword_list_t :: [
module: :default,
children: ParentSpec.children_spec_t(),
links: ParentSpec.links_spec_t(),
test_process: pid()
]
@type custom_pipeline_keyword_list_t :: [
module: module(),
custom_args: Pipeline.pipeline_options_t(),
test_process: pid()
]
@type pipeline_keyword_list_t ::
default_pipeline_keyword_list_t() | custom_pipeline_keyword_list_t()
@spec start_link(Options.t() | pipeline_keyword_list_t(), GenServer.options()) ::
GenServer.on_start()
def start_link(pipeline_options, process_options \\ [])
def start_link(pipeline_options, process_options) when is_struct(pipeline_options, Options) do
Membrane.Logger.warn(
"Please pass options to Membrane.Testing.Pipeline.start_link/2 as keyword list, instead of using Membrane.Testing.Options"
)
do_start(:start_link, pipeline_options, process_options)
end
def start_link(pipeline_options, process_options) do
pipeline_options = transform_pipeline_options(pipeline_options)
do_start(:start_link, pipeline_options, process_options)
end
@spec start(Options.t() | pipeline_keyword_list_t(), GenServer.options()) ::
GenServer.on_start()
def start(pipeline_options, process_options \\ [])
def start(pipeline_options, process_options) when is_struct(pipeline_options, Options) do
Membrane.Logger.warn(
"Please pass options to Membrane.Testing.Pipeline.start/2 as keyword list, instead of using Membrane.Testing.Options"
)
do_start(:start, pipeline_options, process_options)
end
def start(pipeline_options, process_options) do
pipeline_options = transform_pipeline_options(pipeline_options)
do_start(:start, pipeline_options, process_options)
end
defp transform_pipeline_options(pipeline_options) do
module = Keyword.get(pipeline_options, :module, :default)
case module do
:default ->
children = Keyword.get(pipeline_options, :children, [])
links = Keyword.get(pipeline_options, :links, [])
test_process = Keyword.get(pipeline_options, :test_process)
%{module: :default, children: children, links: links, test_process: test_process}
module when is_atom(module) ->
case Code.ensure_compiled(module) do
{:module, _} ->
custom_args = Keyword.get(pipeline_options, :custom_args)
test_process = Keyword.get(pipeline_options, :test_process)
%{module: module, custom_args: custom_args, test_process: test_process}
{:error, _} ->
raise "Unknown module: #{inspect(module)}"
end
not_a_module ->
raise "Not a module: #{inspect(not_a_module)}"
end
end
defp do_start(_type, %Options{elements: nil, module: nil}, _process_options) do
raise """
You provided no information about pipeline contents. Please provide either:
- list of children via `:children` field of Options struct with optional links between
them via `:links` field of `Options` struct
- module that implements `Membrane.Pipeline` callbacks via `module` field of `Options`
struct
"""
end
defp do_start(_type, %Options{elements: children, module: module}, _process_options)
when is_atom(module) and module != nil and children != nil do
raise """
When working with Membrane.Testing.Pipeline you can't provide both
override module and children list in the Membrane.Testing.Pipeline.Options
struct.
"""
end
defp do_start(type, options, process_options) do
pipeline_options = default_options(options)
args = [__MODULE__, pipeline_options, process_options]
apply(Pipeline, type, args)
end
@doc """
Sends message to a child by Element name.
## Example
Knowing that `pipeline` has child named `sink`, message can be sent as follows:
message_child(pipeline, :sink, {:message, "to handle"})
"""
@spec message_child(pid(), Element.name_t(), any()) :: :ok
def message_child(pipeline, child, message) do
send(pipeline, {:for_element, child, message})
:ok
end
@doc """
Executes specified actions in the pipeline.
The actions are returned from the `handle_other` callback.
"""
@spec execute_actions(pid(), Keyword.t()) :: :ok
def execute_actions(pipeline, actions) do
send(pipeline, {__MODULE__, :__execute_actions__, actions})
:ok
end
@impl true
def handle_init(%Options{links: nil, module: nil} = options) do
{links, children} =
if length(options.elements) <= 1 do
{[], options.elements}
else
{ParentSpec.link_linear(options.elements), []}
end
options_map = %{children: children, links: links, test_process: options.test_process}
do_handle_init_for_default_implementation(options_map)
end
@impl true
def handle_init(%Options{module: nil} = options) do
options_map = %{
children: options.elements,
links: options.links,
test_process: options.test_process
}
do_handle_init_for_default_implementation(options_map)
end
@impl true
def handle_init(%Options{links: nil, elements: nil} = options) do
options_map = %{
test_process: options.test_process,
module: options.module,
custom_args: options.custom_args
}
do_handle_init_with_custom_module(options_map)
end
@impl true
def handle_init(%{module: :default} = options) do
do_handle_init_for_default_implementation(options)
end
@impl true
def handle_init(%{module: _module} = options) do
do_handle_init_with_custom_module(options)
end
defp do_handle_init_for_default_implementation(options) do
spec = %Membrane.ParentSpec{
children: options.children,
links: options.links
}
new_state = %State{test_process: options.test_process, module: nil}
{{:ok, [spec: spec, playback: :playing]}, new_state}
end
defp do_handle_init_with_custom_module(options) do
new_state = %State{
test_process: options.test_process,
module: options.module,
custom_pipeline_state: options.custom_args
}
injected_module_result = eval_injected_module_callback(:handle_init, [], new_state)
testing_pipeline_result = {:ok, new_state}
combine_results(injected_module_result, testing_pipeline_result)
end
@impl true
def handle_stopped_to_prepared(ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_stopped_to_prepared,
[ctx],
state
)
:ok = notify_playback_state_changed(state.test_process, :stopped, :prepared)
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_prepared_to_playing(ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_prepared_to_playing,
[ctx],
state
)
:ok = notify_playback_state_changed(state.test_process, :prepared, :playing)
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_playing_to_prepared(ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_playing_to_prepared,
[ctx],
state
)
:ok = notify_playback_state_changed(state.test_process, :playing, :prepared)
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_prepared_to_stopped(ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_prepared_to_stopped,
[ctx],
state
)
:ok = notify_playback_state_changed(state.test_process, :prepared, :stopped)
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_notification(
%Notification{payload: notification},
from,
_ctx,
%State{} = state
) do
:ok = notify_test_process(state.test_process, {:handle_notification, {notification, from}})
{:ok, state}
end
@impl true
def handle_notification(notification, from, ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_notification,
[notification, from, ctx],
state
)
:ok = notify_test_process(state.test_process, {:handle_notification, {notification, from}})
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_spec_started(children, ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_spec_started,
[children, ctx],
state
)
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_other({__MODULE__, :__execute_actions__, actions}, _ctx, %State{} = state) do
{{:ok, actions}, state}
end
@impl true
def handle_other({:for_element, element, message}, ctx, %State{} = state) do
injected_module_result =
eval_injected_module_callback(
:handle_other,
[{:for_element, element, message}, ctx],
state
)
testing_pipeline_result = {{:ok, forward: {element, message}}, state}
combine_results(injected_module_result, testing_pipeline_result)
end
@impl true
def handle_other(message, ctx, %State{} = state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_other,
[message, ctx],
state
)
:ok = notify_test_process(state.test_process, {:handle_other, message})
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_element_start_of_stream(endpoint, ctx, state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_element_start_of_stream,
[endpoint, ctx],
state
)
:ok = notify_test_process(state.test_process, {:handle_element_start_of_stream, endpoint})
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_element_end_of_stream(endpoint, ctx, state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_element_end_of_stream,
[endpoint, ctx],
state
)
:ok = notify_test_process(state.test_process, {:handle_element_end_of_stream, endpoint})
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_tick(timer, ctx, state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_tick,
[timer, ctx],
state
)
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
@impl true
def handle_crash_group_down(group_name, ctx, state) do
{custom_actions, custom_state} =
eval_injected_module_callback(
:handle_crash_group_down,
[group_name, ctx],
state
)
:ok = notify_test_process(state.test_process, {:handle_crash_group_down, group_name})
{custom_actions, Map.put(state, :custom_pipeline_state, custom_state)}
end
defp default_options(%Options{test_process: nil} = options),
do: %Options{options | test_process: self()}
defp default_options(%{test_process: nil} = options),
do: %{options | test_process: self()}
defp default_options(options), do: options
defp eval_injected_module_callback(callback, args, state)
defp eval_injected_module_callback(_callback, _args, %State{module: nil} = state),
do: {:ok, state} |> unify_result()
defp eval_injected_module_callback(callback, args, state) do
apply(state.module, callback, args ++ [state.custom_pipeline_state]) |> unify_result()
end
defp notify_playback_state_changed(test_process, previous, current) do
notify_test_process(test_process, {:playback_state_changed, previous, current})
end
defp notify_test_process(test_process, message) do
send(test_process, {__MODULE__, self(), message})
:ok
end
defp unify_result({:ok, state}),
do: {{:ok, []}, state}
defp unify_result({{_, _}, _} = result),
do: result
defp combine_results({custom_actions, custom_state}, {actions, state}) do
{combine_actions(custom_actions, actions),
Map.put(state, :custom_pipeline_state, custom_state)}
end
defp combine_actions(l, r) do
case {l, r} do
{l, :ok} -> l
{{:ok, actions_l}, {:ok, actions_r}} -> {:ok, actions_l ++ actions_r}
{_l, r} -> r
end
end
end