# lib/flow.ex

defmodule Strom.Flow do
  # Descriptor of a flow; the same struct is kept as the GenServer state.
  defstruct [
    # pid of the flow process itself (filled in by init/1)
    :pid,
    # name the flow process is registered under
    :name,
    # module whose topology/1 describes the components
    :module,
    # pid of the flow's own supervisor (filled in by init/1)
    :sup_pid,
    # options handed to module.topology/1
    opts: [],
    # started components, in topology order (filled in by init/1)
    topology: []
  ]

  use GenServer
  alias Strom.DSL
  alias Strom.FlowSupervisor

  @type t :: %__MODULE__{}

  @doc """
  Starts the flow described by `flow_module` under `Strom.DynamicSupervisor`
  and returns the state of the flow process.

  The module is used both as the registered process name and as the source of
  the topology (`flow_module.topology(opts)`). When a flow with that name is
  already running, the existing process is reused and its current state is
  returned.
  """
  @spec start(atom(), term()) :: t()
  def start(flow_module, opts \\ []) when is_atom(flow_module) do
    dynamic_sup = Process.whereis(Strom.DynamicSupervisor)

    flow = %__MODULE__{name: flow_module, module: flow_module, opts: opts}

    child_spec = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [flow]},
      restart: :transient
    }

    flow_pid =
      case DynamicSupervisor.start_child(dynamic_sup, child_spec) do
        {:ok, started} -> started
        {:error, {:already_started, running}} -> running
      end

    __state__(flow_pid)
  end

  @doc """
  Starts the flow GenServer linked to the caller and registers it under the
  flow's `name`.
  """
  def start_link(%__MODULE__{} = flow) do
    registration = [name: flow.name]
    GenServer.start_link(__MODULE__, flow, registration)
  end

  # Boots the flow: starts the flow's supervisor, asks the flow module for its
  # topology, flattens it and starts every component. The resulting struct
  # (with pid, sup_pid and topology filled in) becomes the server state.
  @impl true
  def init(%__MODULE__{module: module} = flow) do
    flow_pid = self()
    supervisor = start_flow_supervisor(flow.name)

    components = List.flatten(module.topology(flow.opts))
    started = build(components, flow_pid, supervisor)

    {:ok, %{flow | pid: flow_pid, sup_pid: supervisor, topology: started}}
  end

  # Starts (or reuses) the supervisor named "<name>_Supervisor" and returns its
  # pid. The link to the supervisor is removed and a monitor is set up
  # instead, so this process gets a :DOWN message when the supervisor exits.
  defp start_flow_supervisor(name) do
    start_result = FlowSupervisor.start_link(%{name: :"#{name}_Supervisor"})

    supervisor =
      case start_result do
        {:ok, started} -> started
        {:error, {:already_started, running}} -> running
      end

    Process.unlink(supervisor)
    Process.monitor(supervisor)

    supervisor
  end

  # Starts the runtime counterpart of every DSL component and stores whatever
  # the corresponding `start` function returns back into the DSL struct.
  # The order of `components` is preserved. A component that matches none of
  # the clauses raises CaseClauseError.
  defp build(components, flow_pid, sup_pid) do
    for component <- components do
      case component do
        %DSL.Source{origin: origin} ->
          started =
            Strom.Source.start(%Strom.Source{
              origin: origin,
              flow_pid: flow_pid,
              sup_pid: sup_pid
            })

          %{component | source: started}

        %DSL.Sink{origin: origin} ->
          started =
            Strom.Sink.start(%Strom.Sink{
              origin: origin,
              flow_pid: flow_pid,
              sup_pid: sup_pid
            })

          %{component | sink: started}

        %DSL.Mix{opts: opts} ->
          started =
            Strom.Mixer.start(%Strom.Mixer{
              opts: opts,
              flow_pid: flow_pid,
              sup_pid: sup_pid
            })

          %{component | mixer: started}

        %DSL.Split{opts: opts} ->
          started =
            Strom.Splitter.start(%Strom.Splitter{
              opts: opts,
              flow_pid: flow_pid,
              sup_pid: sup_pid
            })

          %{component | splitter: started}

        # Only transforms whose opts are a list are accepted.
        %DSL.Transform{opts: opts} when is_list(opts) ->
          started =
            Strom.Transformer.start(%Strom.Transformer{
              opts: opts,
              flow_pid: flow_pid,
              sup_pid: sup_pid
            })

          %{component | transformer: started}

        # Renames are started from their names alone (no flow pid / supervisor).
        %DSL.Rename{names: names} ->
          %{component | rename: Strom.Renamer.start(names)}
      end
    end
  end

  @doc """
  Returns the started topology of the flow registered as `flow_module`.
  """
  @spec info(GenServer.server()) :: list()
  def info(flow_module) do
    GenServer.call(flow_module, :info)
  end

  @doc """
  Passes `flow` through every component of the running flow's topology, in
  order, and returns the result.

  The call waits without a timeout (`:infinity`).
  """
  def call(flow_module, flow) do
    GenServer.call(flow_module, {:call, flow}, :infinity)
  end

  @doc """
  Stops the flow registered as `flow_module`: its components, its supervisor
  and the flow process itself. Returns `:ok`.
  """
  @spec stop(atom()) :: :ok
  def stop(flow_module) when is_atom(flow_module) do
    GenServer.call(flow_module, :stop)
  end

  # Returns the full state struct of the flow process `pid`.
  # Used by start/2 to hand the freshly started flow back to the caller.
  def __state__(pid) when is_pid(pid) do
    GenServer.call(pid, :__state__)
  end

  # :info - replies with the started topology; the state is left untouched.
  @impl true
  def handle_call(:info, _from, state) do
    {:reply, state.topology, state}
  end

  # :__state__ - replies with the whole state struct.
  def handle_call(:__state__, _from, state) do
    {:reply, state, state}
  end

  # {:call, init_flow} - threads `init_flow` through every component of the
  # topology, in order, each component's `call` receiving the result of the
  # previous one. Replies with the final value; the state is unchanged.
  def handle_call({:call, init_flow}, _from, %__MODULE__{} = state) do
    result =
      Enum.reduce(state.topology, init_flow, fn component, acc_flow ->
        case component do
          %DSL.Source{source: source, names: names} ->
            Strom.Source.call(acc_flow, source, names)

          %DSL.Sink{sink: sink, names: names, sync: sync} ->
            Strom.Sink.call(acc_flow, sink, names, sync)

          %DSL.Mix{mixer: mixer, inputs: inputs, output: output} ->
            Strom.Mixer.call(acc_flow, mixer, inputs, output)

          %DSL.Split{splitter: splitter, input: input, partitions: partitions} ->
            Strom.Splitter.call(acc_flow, splitter, input, partitions)

          %DSL.Transform{transformer: transformer, function: function, acc: acc, inputs: inputs} ->
            # A 1-arity function is passed as is; anything else is paired
            # with the component's accumulator.
            fun_arg = if is_function(function, 1), do: function, else: {function, acc}
            Strom.Transformer.call(acc_flow, transformer, inputs, fun_arg)

          %DSL.Rename{rename: rename, names: names} ->
            Strom.Renamer.call(acc_flow, rename, names)
        end
      end)

    {:reply, result, state}
  end

  # :stop - stops every started component, then the flow's supervisor, and
  # finally the flow process itself (reason :normal), replying :ok.
  def handle_call(:stop, _from, %__MODULE__{} = flow) do
    Enum.each(flow.topology, fn component ->
      case component do
        %DSL.Source{source: source} ->
          Strom.Source.stop(source)

        %DSL.Sink{sink: sink} ->
          Strom.Sink.stop(sink)

        %DSL.Mix{mixer: mixer} ->
          Strom.Mixer.stop(mixer)

        %DSL.Split{splitter: splitter} ->
          Strom.Splitter.stop(splitter)

        %DSL.Transform{transformer: transformer} ->
          Strom.Transformer.stop(transformer)

        # build/3 and the {:call, _} handler both accept renames, so they can
        # be part of the topology. Without this clause, stopping such a flow
        # raised CaseClauseError before the supervisor was stopped. Renames
        # are started without a flow pid or supervisor, so nothing is torn
        # down here.
        # NOTE(review): if Strom.Renamer exposes a stop function, call it here.
        %DSL.Rename{} ->
          :ok
      end
    end)

    Supervisor.stop(flow.sup_pid)
    {:stop, :normal, :ok, flow}
  end

  # All handle_info/2 clauses are deliberate no-ops for now: the message is
  # dropped and the state is returned unchanged.

  # :continue message.
  @impl true
  def handle_info(:continue, flow), do: {:noreply, flow}

  # {ref, :ok} tuple - presumably a task reply; TODO confirm the sender.
  def handle_info({_ref, :ok}, flow), do: {:noreply, flow}

  # :DOWN notification for a monitored process that exited with reason :normal.
  def handle_info({:DOWN, _ref, :process, _pid, :normal}, flow), do: {:noreply, flow}
end