# lib/flow.ex

defmodule Strom.Flow do
  @moduledoc """
  A GenServer that holds the runtime state of a flow.

  The state is a `%Strom.Flow{}` struct that accumulates the streams and the
  components (sources, sinks, mixers, splitters and DSL modules) registered
  through `add_stream/2` and `add_component/2`.

  `start/2` registers the server under the flow module's name, so `run/1`
  and `stop/1` can be called with that module.
  """

  use GenServer

  defstruct pid: nil,
            name: nil,
            streams: [],
            modules: [],
            sources: [],
            sinks: [],
            mixers: [],
            splitters: []

  @type t :: %__MODULE__{}

  # ---------------------------------------------------------------------------
  # Client API
  # ---------------------------------------------------------------------------

  # TODO Supervisor
  @doc """
  Starts the flow server for `flow_module` and builds its topology.

  The server is linked to the caller and registered under the name
  `flow_module`. After it is up, `flow_module.topology()` is passed to
  `Strom.Builder.build/2` together with the server pid (presumably so the
  builder can register streams and components on this server).

  The second argument is currently ignored.

  Returns the server state as it is after the build. Raises `MatchError` if
  the server cannot be started, e.g. when a process is already registered
  under `flow_module`.
  """
  @spec start(module(), keyword()) :: t()
  def start(flow_module, _opts \\ []) when is_atom(flow_module) do
    state = %__MODULE__{name: flow_module}

    {:ok, pid} = GenServer.start_link(__MODULE__, state, name: flow_module)

    Strom.Builder.build(flow_module.topology(), pid)
    __state__(pid)
  end

  @doc """
  Registers `stream` on the flow server so that `run/1` will execute it.
  """
  @spec add_stream(GenServer.server(), Enumerable.t()) :: :ok
  def add_stream(pid, stream), do: GenServer.call(pid, {:add_stream, stream})

  @doc """
  Registers `component` on the flow server so that `stop/1` can stop it.

  `component` must be a `Strom.Mixer`, `Strom.Splitter`, `Strom.Source`,
  `Strom.Sink` or `Strom.DSL.Module` struct; any other value crashes the
  server.
  """
  @spec add_component(GenServer.server(), struct()) :: :ok
  def add_component(pid, component),
    do: GenServer.call(pid, {:add_component, component})

  @doc """
  Runs every registered stream, each in its own task, and returns `:ok` once
  all of them have finished.

  The call waits without a timeout. The server awaits the tasks inside the
  call handler, so it does not process any other request (including
  `stop/1`) until all streams have completed.
  """
  @spec run(GenServer.server()) :: :ok
  # The server side awaits its tasks with :infinity, so the client call must
  # not fall back to the default 5 second timeout.
  def run(flow_module), do: GenServer.call(flow_module, :run, :infinity)

  @doc """
  Stops all registered components and then terminates the flow server.
  """
  @spec stop(atom()) :: :ok
  def stop(flow_module) when is_atom(flow_module), do: GenServer.call(flow_module, :stop)

  # Introspection helper: returns the server's current state struct.
  @doc false
  @spec __state__(pid()) :: t()
  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  # ---------------------------------------------------------------------------
  # Server callbacks
  # ---------------------------------------------------------------------------

  @impl true
  def init(%__MODULE__{} = state) do
    # Store the server's own pid in the state.
    {:ok, %{state | pid: self()}}
  end

  @impl true
  def handle_call(:__state__, _from, state), do: {:reply, state, state}

  def handle_call({:add_stream, stream}, _from, %__MODULE__{streams: streams} = state) do
    {:reply, :ok, %{state | streams: [stream | streams]}}
  end

  def handle_call({:add_component, component}, _from, %__MODULE__{} = state) do
    # Each component kind is prepended to its own list. There is deliberately
    # no catch-all clause: an unknown component crashes the server.
    state =
      case component do
        %Strom.Mixer{} ->
          %{state | mixers: [component | state.mixers]}

        %Strom.Splitter{} ->
          %{state | splitters: [component | state.splitters]}

        %Strom.Source{} ->
          %{state | sources: [component | state.sources]}

        %Strom.Sink{} ->
          %{state | sinks: [component | state.sinks]}

        %Strom.DSL.Module{} ->
          %{state | modules: [component | state.modules]}
      end

    {:reply, :ok, state}
  end

  def handle_call(:run, _from, %__MODULE__{streams: streams} = state) do
    # Start one task per stream first, then wait for all of them, so the
    # streams are consumed concurrently.
    streams
    |> Enum.map(fn stream -> Task.async(fn -> Stream.run(stream) end) end)
    |> Enum.each(&Task.await(&1, :infinity))

    {:reply, :ok, state}
  end

  def handle_call(:stop, _from, %__MODULE__{} = state) do
    # Same order as before: sources, sinks, mixers, splitters, then modules.
    Enum.each(
      state.sources ++ state.sinks ++ state.mixers ++ state.splitters,
      &stop_component/1
    )

    Enum.each(state.modules, &stop_module/1)

    {:stop, :normal, :ok, state}
  end

  # Calls `stop/1` on the module that defines the component's struct.
  defp stop_component(%component_module{} = component),
    do: component_module.stop(component)

  # Stops a DSL module: modules recognised by
  # `Strom.DSL.Module.is_pipeline_module?/1` are stopped with `stop/0`,
  # all others with `stop/1` and their stored state.
  defp stop_module(%{module: module, state: module_state}) do
    if Strom.DSL.Module.is_pipeline_module?(module) do
      module.stop()
    else
      module.stop(module_state)
    end
  end
end