lib/membrane/rc_pipeline.ex

defmodule Membrane.RCPipeline do
  @moduledoc """
  Remote controlled pipeline - a basic `Membrane.Pipeline` implementation that can be remotely controlled from an external process.

  The easiest way to start this pipeline is to use `start_link!/1`
  ```
    pipeline = Membrane.RCPipeline.start_link!()
  ```

  The controlling process can request the execution of arbitrary
  valid `Membrane.Pipeline.Action`:
  ```
    children = ...
    links = ...
    actions = [{:spec, children++links}]
    Pipeline.exec_actions(pipeline, actions)
  ```

  The controlling process can also subscribe to the messages
  sent by the pipeline and later on synchronously await for these messages:
  ```
  # subscribes to message which is sent when the pipeline enters `playing`
  Membrane.RCPipeline.subscribe(pipeline, %Message.Playing{})
  ...
  # awaits for the message sent when the pipeline enters :playing playback
  Membrane.RCPipeline.await_playing(pipeline)
  ...
  ```

  `Membrane.RCPipeline` can be used when there is no need for introducing a custom
  logic in the `Membrane.Pipeline` callbacks implementation. An example of usage could be running a
  pipeline from the elixir script. `Membrane.RCPipeline` sends the following messages:
  * `t:Membrane.RCMessage.Playing.t/0` sent when pipeline enters `playing` playback,
  * `t:Membrane.RCMessage.StartOfStream.t/0` sent
  when one of direct pipeline children informs the pipeline about start of a stream,
  * `t:Membrane.RCMessage.EndOfStream.t/0` sent
  when one of direct pipeline children informs the pipeline about end of a stream,
  * `t:Membrane.RCMessage.Notification.t/0` sent when pipeline
  receives notification from one of its children,
  * `t:Membrane.RCMessage.Terminated.t/0` sent when the pipeline gracefully terminates.
  """

  use Membrane.Pipeline

  alias Membrane.Pipeline

  alias Membrane.RCMessage.{
    EndOfStream,
    Notification,
    Playing,
    StartOfStream,
    Terminated
  }

  defmodule State do
    @moduledoc false

    @enforce_keys [:controller_pid]
    defstruct @enforce_keys ++ [matching_functions: []]
  end

  @doc """
  Starts the `Membrane.RCPipeline` and links it to the current process. The process
  that makes the call to the `start_link/1` automatically becomes the controller process.
  """
  @spec start_link([Pipeline.config_entry() | {:controller_pid, pid()}]) :: Pipeline.on_start()
  def start_link(options \\ []) do
    {controller_pid, config} = Keyword.pop(options, :controller_pid, self())
    Pipeline.start_link(__MODULE__, %{controller_pid: controller_pid}, config)
  end

  @spec start_link!([Pipeline.config_entry() | {:controller_pid, pid()}]) :: pid
  def start_link!(options \\ []) do
    {:ok, _supervisor, pipeline} = start_link(options)
    pipeline
  end

  @doc """
  Does the same as the `start_link/1` but starts the process outside of the current supervision tree.
  """
  @spec start([Pipeline.config_entry() | {:controller_pid, pid()}]) :: Pipeline.on_start()
  def start(options \\ []) do
    {controller_pid, config} = Keyword.pop(options, :controller_pid, self())
    Pipeline.start(__MODULE__, %{controller_pid: controller_pid}, config)
  end

  @spec start!([Pipeline.config_entry() | {:controller_pid, pid()}]) :: pid
  def start!(options \\ []) do
    {:ok, _supervisor, pipeline} = start(options)
    pipeline
  end

  @spec terminate(pid, Keyword.t()) :: :ok | {:ok, pid()} | {:error, :timeout}
  defdelegate terminate(pipeline, opts \\ []), to: Pipeline

  defmacrop pin_leaf_nodes(ast) do
    quote do
      Macro.postwalk(unquote(ast), fn node ->
        if not Macro.quoted_literal?(node) and match?({_name, _ctx, _args}, node) do
          {_name, ctx, args} = node

          case args do
            nil -> {:^, ctx, [node]}
            _not_nil -> node
          end
        else
          node
        end
      end)
    end
  end

  defmacrop do_await(pipeline, message_type, keywords \\ []) do
    keywords = pin_leaf_nodes(keywords)

    quote do
      receive do
        %unquote(message_type){
          unquote_splicing(Macro.expand(keywords, __ENV__)),
          from: ^unquote(pipeline)
        } = msg ->
          msg
      end
    end
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.Playing.t/0`
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting until the pipeline starts playing:
    ```
    Pipeline.await_playing(pipeline)
    ```
  """
  @spec await_playing(pid()) :: Membrane.RCMessage.Playing.t()
  def await_playing(pipeline) do
    do_await(pipeline, Playing)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.StartOfStream.t/0` message
  with no further constraints, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first `start_of_stream` occuring on any pad of any element in the pipeline:
    ```
    Pipeline.await_start_of_stream(pipeline)
    ```
  """
  @spec await_start_of_stream(pid) :: Membrane.RCMessage.StartOfStream.t()
  def await_start_of_stream(pipeline) do
    do_await(pipeline, StartOfStream)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.StartOfStream.t/0` message
  concerning the given `element`, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first `start_of_stream` occuring on any pad of the `:element_id` element in the pipeline:
    ```
    Pipeline.await_start_of_stream(pipeline, :element_id)
    ```
  """
  @spec await_start_of_stream(pid(), Membrane.Element.name()) ::
          Membrane.RCMessage.StartOfStream.t()
  def await_start_of_stream(pipeline, element) do
    do_await(pipeline, StartOfStream, element: element)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.StartOfStream.t/0` message
  concerning the given `element` and the `pad`, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first `start_of_stream` occuring on the `:pad_id` pad of the `:element_id` element in the pipeline:
    ```
    Pipeline.await_start_of_stream(pipeline, :element_id, :pad_id)
    ```
  """
  @spec await_start_of_stream(pid(), Membrane.Element.name(), Membrane.Pad.name()) ::
          Membrane.RCMessage.StartOfStream.t()
  def await_start_of_stream(pipeline, element, pad) do
    do_await(pipeline, StartOfStream, element: element, pad: pad)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.EndOfStream.t/0` message
  with no further constraints, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first `end_of_stream` occuring on any pad of any element in the pipeline:
    ```
    Pipeline.await_end_of_stream(pipeline)
    ```
  """
  @spec await_end_of_stream(pid()) :: Membrane.RCMessage.EndOfStream.t()
  def await_end_of_stream(pipeline) do
    do_await(pipeline, EndOfStream)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.EndOfStream.t/0` message
  concerning the given `element`, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first `end_of_stream` occuring on any pad of the `:element_id` element in the pipeline:
    ```
    Pipeline.await_end_of_stream(pipeline, :element_id)
    ```
  """
  @spec await_end_of_stream(pid(), Membrane.Element.name()) ::
          Membrane.RCMessage.EndOfStream.t()
  def await_end_of_stream(pipeline, element) do
    do_await(pipeline, EndOfStream, element: element)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.EndOfStream.t/0` message
  concerning the given `element` and the `pad`, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first `end_of_stream` occuring on the `:pad_id` of the `:element_id` element in the pipeline:
    ```
    Pipeline.await_end_of_stream(pipeline, :element_id, :pad_id)
    ```
  """
  @spec await_end_of_stream(pid(), Membrane.Element.name(), Membrane.Pad.name()) ::
          Membrane.RCMessage.EndOfStream.t()
  def await_end_of_stream(pipeline, element, pad) do
    do_await(pipeline, EndOfStream, element: element, pad: pad)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.Notification.t/0`
  message with no further constraints, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first notification send to any element in the pipeline:
    ```
    Pipeline.await_notification(pipeline)
    ```
  """
  @spec await_notification(pid()) :: Membrane.RCMessage.Notification.t()
  def await_notification(pipeline) do
    do_await(pipeline, Notification)
  end

  @doc """
  Awaits for the first `t:Membrane.RCMessage.t/0` wrapping the `t:Membrane.RCMessage.Notification.t/0` message
  concerning the given `element`, sent by the process with `pipeline` pid.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the first notification send to the `:element_id` element in the pipeline:
    ```
    Pipeline.await_notification(pipeline, :element_id)
    ```
  """
  @spec await_notification(pid(), Membrane.ParentNotification.t()) ::
          Membrane.RCMessage.Notification.t()
  def await_notification(pipeline, element) do
    do_await(pipeline, Notification, element: element)
  end

  @doc """
  Awaits for the `t:Membrane.RCMessage.t/0` wrapping the `Membrane.RCMessage.Terminated` message,
  which is send when the pipeline gracefully terminates.
  It is required to firstly use the `subscribe/2` to subscribe to a given message before awaiting
  for that message.

  Usage example:
    1) awaiting for the pipeline termination:
    ```
    Pipeline.await_termination(pipeline)
    ```
  """
  @spec await_termination(pid()) :: Membrane.RCMessage.Terminated.t()
  def await_termination(pipeline) do
    do_await(pipeline, Terminated)
  end

  @doc """
  Subscribes to a given `subscription_pattern`. The `subscription_pattern` should describe some subset
  of elements of `t:Membrane.RCPipeline.Message.t/0` type. The `subscription_pattern`
  must be a match pattern.


  Usage examples:
  1) making the `Membrane.RCPipeline` send to the controlling process `Message.StartOfStream` message
    when any pad of the `:element_id` receives `:start_of_stream` event.

    ```
    subscribe(pipeline, %Message.StartOfStream{element: :element_id, pad: _})
    ```

  2) making the `Membrane.RCPipeline` send to the controlling process `Message.Playing` message when the pipeline playback changes to `:playing`

    ```
    subscribe(pipeline, %Message.Playing{})
    ```
  """
  defmacro subscribe(pipeline, subscription_pattern) do
    quote do
      send(
        unquote(pipeline),
        {:subscription, fn message -> match?(unquote(subscription_pattern), message) end}
      )
    end
  end

  @doc """
  Sends a list of `t:Pipeline.Action.t/0` to the given `Membrane.RCPipeline` for execution.

  Usage example:
    1) making the `Membrane.RCPipeline` start the `Membrane.ChildrenSpec`
       specified in the action.
    ```
    children = ...
    links = ...
    actions = [{:spec, children++links}]
    Pipeline.exec_actions(pipeline, actions)
    ```
  """
  @spec exec_actions(pid(), [Pipeline.Action.t()]) :: :ok
  def exec_actions(pipeline, actions) do
    send(pipeline, {:exec_actions, actions})
    :ok
  end

  @impl true
  def handle_init(_ctx, opts) do
    %{controller_pid: controller_pid} = opts
    state = %State{controller_pid: controller_pid}
    {[], state}
  end

  @impl true
  def handle_playing(_ctx, state) do
    pipeline_event = %Playing{from: self()}
    send_event_to_controller_if_subscribed(pipeline_event, state)
    {[], state}
  end

  @impl true
  def handle_element_end_of_stream(element_name, pad_ref, _ctx, state) do
    pipeline_event = %EndOfStream{from: self(), element: element_name, pad: pad_ref}
    send_event_to_controller_if_subscribed(pipeline_event, state)
    {[], state}
  end

  @impl true
  def handle_element_start_of_stream(element_name, pad_ref, _ctx, state) do
    pipeline_event = %StartOfStream{from: self(), element: element_name, pad: pad_ref}
    send_event_to_controller_if_subscribed(pipeline_event, state)
    {[], state}
  end

  @impl true
  def handle_child_notification(notification, element, _ctx, state) do
    pipeline_event = %Notification{from: self(), data: notification, element: element}
    send_event_to_controller_if_subscribed(pipeline_event, state)
    {[], state}
  end

  @impl true
  def handle_info({:exec_actions, actions}, _ctx, state) do
    {actions, state}
  end

  @impl true
  def handle_info({:subscription, pattern}, _ctx, state) do
    {[], %{state | matching_functions: [pattern | state.matching_functions]}}
  end

  @impl true
  def handle_terminate_request(_ctx, state) do
    pipeline_event = %Terminated{from: self()}
    send_event_to_controller_if_subscribed(pipeline_event, state)
    {[terminate: :normal], state}
  end

  defp send_event_to_controller_if_subscribed(message, state) do
    if Enum.any?(state.matching_functions, & &1.(message)) do
      send(state.controller_pid, message)
    end
  end
end