lib/topology.ex

defmodule Strom.Topology do
  defstruct pid: nil,
            module: nil,
            opts: [],
            components: []

  use GenServer

  alias Strom.Composite

  def start(topology_module, opts \\ []) when is_atom(topology_module) do
    strom_sup_pid = Process.whereis(Strom.DynamicSupervisor)

    pid =
      case DynamicSupervisor.start_child(
             strom_sup_pid,
             %{
               id: __MODULE__,
               start:
                 {__MODULE__, :start_link, [%__MODULE__{module: topology_module, opts: opts}]},
               restart: :transient
             }
           ) do
        {:ok, pid} ->
          pid

        {:error, {:already_started, pid}} ->
          pid
      end

    __state__(pid)
  end

  def start_link(%__MODULE__{} = topology) do
    GenServer.start_link(__MODULE__, topology, name: topology.module)
  end

  @impl true
  def init(%__MODULE__{module: module} = topology) do
    components =
      topology.opts
      |> module.topology()
      |> List.flatten()
      |> Composite.build()

    Enum.each(components, &Process.link(&1.pid))

    {:ok, %{topology | pid: self(), components: components}}
  end

  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  def info(module), do: GenServer.call(module, :__state__)

  def components(module), do: GenServer.call(module, :components)

  def call(flow, module), do: GenServer.call(module, {:call, flow}, :infinity)

  def stop(module) when is_atom(module), do: GenServer.call(module, :stop)

  @impl true
  def handle_call(:__state__, _from, topology), do: {:reply, topology, topology}

  def handle_call(:components, _from, topology), do: {:reply, topology.components, topology}

  def handle_call({:call, init_flow}, _from, %__MODULE__{} = topology) do
    flow = Composite.reduce_flow(topology.components, init_flow)

    {:reply, flow, topology}
  end

  def handle_call(:stop, _from, %__MODULE__{} = topology) do
    Composite.stop_components(topology.components)

    {:stop, :normal, :ok, topology}
  end

  @impl true
  def handle_info(:continue, topology) do
    {:noreply, topology}
  end

  def handle_info({_task_ref, :ok}, topology) do
    # do nothing for now
    {:noreply, topology}
  end

  def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, topology) do
    # do nothing for now
    {:noreply, topology}
  end

  defmacro __using__(_opts) do
    quote do
      import Strom.DSL

      def start(opts \\ []) do
        Strom.Topology.start(__MODULE__, opts)
      end

      def call(flow) when is_map(flow) do
        Strom.Topology.call(flow, __MODULE__)
      end

      def stop do
        Strom.Topology.stop(__MODULE__)
      end

      def info do
        Strom.Topology.info(__MODULE__)
      end

      def components do
        Strom.Topology.components(__MODULE__)
      end
    end
  end
end