lib/composite.ex

defmodule Strom.Composite do
  @moduledoc """
  Runs a set of components and is a component itself, meaning that a composite
  has the same interface - it accepts flow as input and returns a modified flow.

  ## Example

      iex> alias Strom.{Composite, Transformer, Splitter, Source, Sink}
      iex> transformer = Transformer.new(:s, &(&1 + 1))
      iex> splitter = Splitter.new(:s, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
      iex> composite = [transformer, splitter] |> Composite.new() |> Composite.start()
      iex> source = :s |> Source.new([1, 2, 3]) |> Source.start()
      iex> %{odd: odd, even: even} = %{} |> Source.call(source) |> Composite.call(composite)
      iex> {Enum.to_list(odd), Enum.to_list(even)}
      {[3], [2, 4]}

  ## Composites can be created from other composites

      iex> alias Strom.{Composite, Transformer, Splitter, Source, Sink}
      iex> transformer = Transformer.new(:s, &(&1 + 1))
      iex> splitter = Splitter.new(:s, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
      iex> c1 = Composite.new([transformer])
      iex> c2 = Composite.new([splitter])
      iex> source = Source.new(:s, [1, 2, 3])
      iex> composite = [source, c1, c2] |> Composite.new() |> Composite.start()
      iex> %{odd: odd, even: even} = %{} |> Composite.call(composite)
      iex> {Enum.to_list(odd), Enum.to_list(even)}
      {[3], [2, 4]}
  """

  defstruct pid: nil,
            components: []

  use GenServer

  @type t :: %__MODULE__{}

  # The component modules a composite can drive. `build/1`, `reduce_flow/2`
  # and `stop_components/1` call `start/1`, `call/2` and `stop/1` on them
  # respectively, so keeping the list in one place keeps the three in sync.
  @component_modules [
    Strom.Source,
    Strom.Sink,
    Strom.Mixer,
    Strom.Splitter,
    Strom.Transformer,
    Strom.Renamer
  ]

  @doc """
  Builds a (not yet started) composite from a list of components.

  Composites found in the list are replaced by their own components, so the
  resulting composite always holds a single flat sequence, in the given order.
  """
  @spec new([struct()]) :: __MODULE__.t()
  def new(components) when is_list(components) do
    # flat_map keeps the order and avoids the quadratic cost of repeatedly
    # appending to an accumulator with `++`.
    flattened =
      Enum.flat_map(components, fn
        %__MODULE__{components: inner} -> inner
        component -> [component]
      end)

    %__MODULE__{components: flattened}
  end

  @doc """
  Starts a process for the composite (linked to the caller) and returns the
  composite as held by that process, i.e. with `pid` set and `components`
  replaced by the results of starting each of them.
  """
  @spec start(__MODULE__.t()) :: __MODULE__.t()
  def start(%__MODULE__{} = composite) do
    {:ok, pid} = GenServer.start_link(__MODULE__, composite)
    __state__(pid)
  end

  @doc false
  @impl true
  def init(%__MODULE__{components: components} = composite) do
    # A struct built by hand (not via new/1) may carry nested lists, hence the
    # flatten before starting the components.
    started =
      components
      |> List.flatten()
      |> build()

    {:ok, %{composite | pid: self(), components: started}}
  end

  @doc """
  Starts every component with its module's `start/1` and returns the results
  in the same order.

  Raises `FunctionClauseError` when an element is not a supported component.
  """
  @spec build([struct()]) :: [struct()]
  def build(components) do
    Enum.map(components, fn component ->
      module = component_module(component)
      module.start(component)
    end)
  end

  @doc """
  Passes `flow` through the started composite and returns the resulting flow.

  The call is served by the composite's process and waits without a timeout.
  """
  @spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
  def call(flow, %__MODULE__{pid: pid}), do: GenServer.call(pid, {:call, flow}, :infinity)

  @doc """
  Stops all components of the composite and then the composite's own process.
  """
  @spec stop(__MODULE__.t()) :: :ok
  def stop(%__MODULE__{pid: pid}), do: GenServer.call(pid, :stop)

  @doc false
  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  @doc false
  @impl true
  def handle_call(:__state__, _from, composite), do: {:reply, composite, composite}

  def handle_call({:call, init_flow}, _from, %__MODULE__{} = composite) do
    flow = reduce_flow(composite.components, init_flow)

    {:reply, flow, composite}
  end

  def handle_call(:stop, _from, %__MODULE__{components: components} = composite) do
    stop_components(components)

    # Reply :ok to the caller and terminate the server normally.
    {:stop, :normal, :ok, composite}
  end

  @doc """
  Threads `init_flow` through `components` in order: each component's module
  `call/2` receives the flow produced by the previous one.

  Raises `FunctionClauseError` when an element is not a supported component.
  """
  @spec reduce_flow([struct()], Strom.flow()) :: Strom.flow()
  def reduce_flow(components, init_flow) do
    Enum.reduce(components, init_flow, fn component, flow ->
      module = component_module(component)
      module.call(flow, component)
    end)
  end

  @doc """
  Stops every component with its module's `stop/1`, in order. Always returns `:ok`.

  Raises `FunctionClauseError` when an element is not a supported component.
  """
  @spec stop_components([struct()]) :: :ok
  def stop_components(components) do
    Enum.each(components, fn component ->
      module = component_module(component)
      module.stop(component)
    end)
  end

  # NOTE: there is intentionally no catch-all clause; any message other than
  # the ones below crashes the server with a FunctionClauseError.
  @doc false
  @impl true
  def handle_info(:continue, composite) do
    {:noreply, composite}
  end

  def handle_info({_task_ref, :ok}, composite) do
    # do nothing for now
    {:noreply, composite}
  end

  def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, composite) do
    # do nothing for now
    {:noreply, composite}
  end

  # Returns the module of a supported component struct; anything else fails
  # to match so that unsupported elements are rejected loudly.
  defp component_module(%module{}) when module in @component_modules, do: module
end