lib/topology.ex

defmodule Strom.Topology do
  @moduledoc """
  Runs a set of components. Restarts all of them in case of a crash.

  `use Strom.Topology` defines `start/1`, `call/1`, `stop/0`, `info/0` and
  `components/0` on the using module. The using module must implement
  `topology/1`, which receives the start options and returns the (possibly
  nested) list of components.

  The topology is started as a process registered under the topology module
  name, so only one topology with the given name can exist.

  ## Example

      iex>  defmodule OddEvenTopology do
      ...>    use Strom.Topology
      ...>    alias Strom.Sink.Null
      ...>
      ...>    def topology(_opts) do
      ...>      [
      ...>        Source.new(:s1, [1, 2, 3]),
      ...>        Source.new(:s2, [4, 5, 6]),
      ...>        Mixer.new([:s1, :s2], :s),
      ...>        Transformer.new(:s, &(&1 + 1)),
      ...>        Splitter.new(:s, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)}),
      ...>        Sink.new(:odd, Null.new(), true)
      ...>      ]
      ...>    end
      ...>  end
      iex> OddEvenTopology.start()
      iex> %{even: even} = OddEvenTopology.call(%{})
      iex> Enum.sort(Enum.to_list(even))
      [2, 4, 6]
  """

  use GenServer

  require Logger

  alias Strom.Composite

  defstruct pid: nil,
            module: nil,
            opts: [],
            components: []

  @type t :: %__MODULE__{
          pid: pid() | nil,
          module: module() | nil,
          opts: term(),
          components: Enumerable.t()
        }

  # ── Client API ────────────────────────────────────────────────────────────

  @doc """
  Starts the topology defined by `topology_module` under
  `Strom.DynamicSupervisor` and returns the topology state.

  `opts` is passed unchanged to `topology_module.topology/1`.

  If a topology for `topology_module` is already running, its current state is
  returned and `opts` is ignored. Any other start error raises a
  `CaseClauseError`.
  """
  @spec start(module(), term()) :: t()
  def start(topology_module, opts \\ []) when is_atom(topology_module) do
    child_spec = %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [%__MODULE__{module: topology_module, opts: opts}]},
      # :transient - restart after a crash, but not after a normal `stop/1`.
      restart: :transient
    }

    # The supervisor is addressed by its registered name; there is no need to
    # resolve the pid first, and a missing supervisor is reported by name.
    pid =
      case DynamicSupervisor.start_child(Strom.DynamicSupervisor, child_spec) do
        {:ok, pid} -> pid
        {:error, {:already_started, pid}} -> pid
      end

    __state__(pid)
  end

  @doc """
  Starts the topology process linked to the caller and registers it under
  `topology.module`.
  """
  @spec start_link(t()) :: GenServer.on_start()
  def start_link(%__MODULE__{} = topology) do
    GenServer.start_link(__MODULE__, topology, name: topology.module)
  end

  @doc false
  @spec __state__(pid()) :: t()
  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  @doc "Returns the state (`t:t/0`) of the running topology registered as `module`."
  @spec info(GenServer.server()) :: t()
  def info(module), do: GenServer.call(module, :__state__)

  @doc "Returns the components built for the running topology registered as `module`."
  @spec components(GenServer.server()) :: Enumerable.t()
  def components(module), do: GenServer.call(module, :components)

  @doc """
  Passes `flow` through the components of the topology registered as `module`
  and returns the resulting flow. Waits for the reply without a timeout.
  """
  @spec call(term(), GenServer.server()) :: term()
  def call(flow, module), do: GenServer.call(module, {:call, flow}, :infinity)

  @doc "Stops the components of the topology registered as `module` and then the topology itself."
  @spec stop(module()) :: :ok
  def stop(module) when is_atom(module), do: GenServer.call(module, :stop)

  # ── Server callbacks ──────────────────────────────────────────────────────

  @impl true
  def init(%__MODULE__{module: module} = topology) do
    components =
      topology.opts
      |> module.topology()
      |> List.flatten()
      |> Composite.build()

    # Link every component process to the topology process so that (unless
    # exits are trapped) a crash on either side brings the other one down.
    Enum.each(components, &Process.link(&1.pid))

    {:ok, %{topology | pid: self(), components: components}}
  end

  @impl true
  def handle_call(:__state__, _from, topology), do: {:reply, topology, topology}

  def handle_call(:components, _from, topology), do: {:reply, topology.components, topology}

  def handle_call({:call, init_flow}, _from, %__MODULE__{} = topology) do
    flow = Composite.reduce_flow(topology.components, init_flow)

    {:reply, flow, topology}
  end

  def handle_call(:stop, _from, %__MODULE__{} = topology) do
    Composite.stop_components(topology.components)

    # Exit with :normal so the :transient child spec does not restart us.
    {:stop, :normal, :ok, topology}
  end

  @impl true
  def handle_info(:continue, topology) do
    {:noreply, topology}
  end

  # Presumably the reply of a task started from this process that returned :ok.
  def handle_info({_task_ref, :ok}, topology) do
    {:noreply, topology}
  end

  # Presumably the monitor notification of such a task exiting normally.
  def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, topology) do
    {:noreply, topology}
  end

  # Catch-all: defining handle_info/2 replaces the default implementation that
  # `use GenServer` provides, so without this clause any other message would
  # crash the topology with a FunctionClauseError.
  def handle_info(message, topology) do
    Logger.warning(
      "#{inspect(topology.module)} received unexpected message: #{inspect(message)}"
    )

    {:noreply, topology}
  end

  # Injects the module-level API (start/call/stop/info/components) into the
  # using module; each function delegates to Strom.Topology with that module's
  # own name.
  defmacro __using__(_opts) do
    quote do
      import Strom.DSL
      alias Strom.{Composite, Mixer, Sink, Source, Splitter, Transformer, Topology}

      def start(opts \\ []), do: Topology.start(__MODULE__, opts)

      def call(flow) when is_map(flow), do: Topology.call(flow, __MODULE__)

      def stop, do: Topology.stop(__MODULE__)

      def info, do: Topology.info(__MODULE__)

      def components, do: Topology.components(__MODULE__)
    end
  end
end