lib/flow.ex

defmodule Strom.Flow do
  defstruct pid: nil,
            name: nil,
            streams: [],
            pipelines: [],
            sources: [],
            sinks: [],
            mixers: [],
            splitters: []

  use GenServer

  @type t :: %__MODULE__{}

  # TODO Supervisor
  def start(flow_module, _opts \\ []) when is_atom(flow_module) do
    state = %__MODULE__{name: flow_module}

    {:ok, pid} = GenServer.start_link(__MODULE__, state, name: flow_module)

    Strom.Builder.build(flow_module.topology(), pid)
    __state__(pid)
  end

  @impl true
  def init(%__MODULE__{} = state) do
    {:ok, %{state | pid: self()}}
  end

  def add_stream(pid, stream), do: GenServer.call(pid, {:add_stream, stream})

  def add_component(pid, {type, component}),
    do: GenServer.call(pid, {:add_component, {type, component}})

  def run(flow_module), do: GenServer.call(flow_module, :run)

  def stop(flow_module) when is_atom(flow_module), do: GenServer.call(flow_module, :stop)

  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  @impl true
  def handle_call(:__state__, _from, state), do: {:reply, state, state}

  def handle_call({:add_stream, stream}, _from, %__MODULE__{streams: streams} = state) do
    state = %{state | streams: [stream | streams]}
    {:reply, :ok, state}
  end

  def handle_call({:add_component, {type, component}}, _from, %__MODULE__{} = state) do
    state =
      case type do
        :mixer ->
          %{state | mixers: [component | state.mixers]}

        :splitter ->
          %{state | splitters: [component | state.splitters]}

        :source ->
          %{state | sources: [component | state.sources]}

        :sink ->
          %{state | sinks: [component | state.sinks]}

        :pipeline ->
          %{state | pipelines: [component | state.pipelines]}
      end

    {:reply, :ok, state}
  end

  def handle_call(:run, _from, %__MODULE__{streams: streams} = state) do
    streams
    |> Enum.map(fn stream ->
      Task.async(fn ->
        Stream.run(stream)
      end)
    end)
    |> Enum.map(&Task.await(&1, :infinity))

    {:reply, :ok, state}
  end

  def handle_call(:stop, _from, %__MODULE__{} = state) do
    Enum.each(state.sources, & &1.__struct__.stop(&1))
    Enum.each(state.sinks, & &1.__struct__.stop(&1))
    Enum.each(state.pipelines, & &1.stop/0)
    Enum.each(state.mixers, & &1.__struct__.stop(&1))
    Enum.each(state.splitters, & &1.__struct__.stop(&1))

    {:stop, :normal, :ok, state}
  end
end