defmodule Strom.Composite do
@moduledoc """
Runs a set of components and is a component itself, meaning that a composite has the same interface - it accepts flow as input and returns a modified flow.
## Example
iex> alias Strom.{Composite, Transformer, Splitter, Source, Sink}
iex> transformer = Transformer.new(:s, &(&1 + 1))
iex> splitter = Splitter.new(:s, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
iex> composite = [transformer, splitter] |> Composite.new() |> Composite.start()
iex> source = :s |> Source.new([1, 2, 3]) |> Source.start()
iex> %{odd: odd, even: even} = %{} |> Source.call(source) |> Composite.call(composite)
iex> {Enum.to_list(odd), Enum.to_list(even)}
{[3], [2, 4]}
## Composites can be created from other composites
iex> alias Strom.{Composite, Transformer, Splitter, Source, Sink}
iex> transformer = Transformer.new(:s, &(&1 + 1))
iex> splitter = Splitter.new(:s, %{odd: &(rem(&1, 2) == 1), even: &(rem(&1, 2) == 0)})
iex> c1 = Composite.new([transformer])
iex> c2 = Composite.new([splitter])
iex> source = Source.new(:s, [1, 2, 3])
iex> composite = [source, c1, c2] |> Composite.new() |> Composite.start()
iex> %{odd: odd, even: even} = %{} |> Composite.call(composite)
iex> {Enum.to_list(odd), Enum.to_list(even)}
{[3], [2, 4]}
"""
defstruct pid: nil,
name: nil,
components: []
use GenServer
alias Strom.Composite.Manipulations
alias Strom.Composite.StartStop
@type t :: %__MODULE__{}
@spec new([struct()]) :: __MODULE__.t()
def new(components, name \\ nil) when is_list(components) do
components =
components
|> List.flatten()
|> Enum.flat_map(fn
%__MODULE__{components: components} -> components
component -> [component]
end)
name = if name, do: name, else: StartStop.generate_name(components)
%__MODULE__{name: name, components: components}
end
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{} = composite) do
%{composite | pid: StartStop.start(composite)}
end
def supervisor_name(name), do: :"Supervisor_#{name}"
def component_supervisor_name(name), do: :"ComponentSupervisor_#{name}"
def task_supervisor_name(name), do: :"TaskSupervisor_#{name}"
def start_link(%__MODULE__{name: name} = composite) do
GenServer.start_link(__MODULE__, composite, name: name)
end
@impl true
def init(%__MODULE__{} = composite) do
{:ok, %{composite | pid: self()}, {:continue, :start_components}}
end
@impl true
def handle_continue(
:start_components,
%__MODULE__{name: name, components: components} = composite
) do
{:noreply, %{composite | components: StartStop.start_components(components, name)}}
end
def components(%__MODULE__{name: name}) do
GenServer.call(name, :components)
end
@spec call(Strom.flow(), __MODULE__.t() | atom()) :: Strom.flow()
def call(flow, %__MODULE__{name: name}),
do: GenServer.call(name, {:call, flow}, :infinity)
def call(flow, name) when is_atom(name),
do: GenServer.call(name, {:call, flow}, :infinity)
def call_flow(components, init_flow) do
Enum.reduce(components, init_flow, fn %{__struct__: module} = component, flow ->
module.call(flow, component)
end)
end
@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{} = composite), do: StartStop.stop(composite)
@spec delete(__MODULE__.t(), {integer(), integer()}) :: __MODULE__.t()
def delete(composite, {index_from, index_to}) do
GenServer.call(composite.name, {:delete, index_from, index_to})
end
@spec delete(__MODULE__.t(), integer()) :: __MODULE__.t()
def delete(composite, index) do
delete(composite, {index, index})
end
@spec insert(__MODULE__.t(), integer(), Strom.component()) :: {__MODULE__.t(), Strom.flow()}
def insert(composite, index, new_component) when is_struct(new_component) do
insert(composite, index, [new_component])
end
@spec insert(__MODULE__.t(), integer(), list(Strom.component())) ::
{__MODULE__.t(), Strom.flow()}
def insert(composite, index, new_components) when is_list(new_components) do
GenServer.call(composite.name, {:insert, index, new_components})
end
@spec replace(__MODULE__.t(), integer(), Strom.component()) :: {__MODULE__.t(), Strom.flow()}
def replace(composite, index, new_component)
when is_integer(index) and is_struct(new_component) do
replace(composite, {index, index}, [new_component])
end
@spec replace(__MODULE__.t(), {integer(), integer()}, Strom.component()) ::
{__MODULE__.t(), Strom.flow()}
def replace(composite, {index_from, index_to}, new_component) when is_struct(new_component) do
replace(composite, {index_from, index_to}, [new_component])
end
@spec replace(__MODULE__.t(), {integer(), integer()}, list(Strom.component())) ::
{__MODULE__.t(), Strom.flow()}
def replace(composite, {index_from, index_to}, new_components) when is_list(new_components) do
GenServer.call(composite.name, {:replace, {index_from, index_to}, new_components})
end
@spec replace(__MODULE__.t(), integer(), list(Strom.component())) ::
{__MODULE__.t(), Strom.flow()}
def replace(composite, index, new_components)
when is_integer(index) and is_list(new_components) do
replace(composite, {index, index}, new_components)
end
@impl true
def handle_call({:call, init_flow}, _from, %__MODULE__{} = composite) do
flow = call_flow(composite.components, init_flow)
{:reply, flow, composite}
end
def handle_call(:components, _from, %__MODULE__{components: components} = composite) do
{:reply, components, composite}
end
def handle_call(:stop_components, _from, %__MODULE__{components: components} = composite) do
stop_components(components)
{:reply, :ok, composite}
end
def handle_call(:stop, _from, %__MODULE__{} = composite) do
{:stop, :normal, :ok, composite}
end
def handle_call(
{:delete, index_from, index_to},
_from,
%__MODULE__{components: components, name: name} = composite
) do
{components, _deleted_components, %{}} =
Manipulations.replace(components, index_from, index_to, [], name)
composite = %{composite | components: components}
{:reply, composite, composite}
end
def handle_call(
{:insert, index, new_components},
_from,
%__MODULE__{components: components, name: name} = composite
)
when is_list(new_components) do
{components, [], subflow} = Manipulations.insert(components, index, new_components, name)
composite = %{composite | components: components}
{:reply, {composite, subflow}, composite}
end
def handle_call(
{:replace, {index_from, index_to}, new_components},
_from,
%__MODULE__{components: components, name: name} = composite
)
when is_list(new_components) do
{components, _deleted_components, subflow} =
Manipulations.replace(components, index_from, index_to, new_components, name)
composite = %{composite | components: components}
{:reply, {composite, subflow}, composite}
end
@impl true
def handle_info({:DOWN, ref, :process, _pid, :normal}, composite) do
# component stopped normally
Process.demonitor(ref, [:flush])
{:noreply, composite}
end
def handle_info(
{:DOWN, _ref, :process, pid, _not_normal},
%__MODULE__{components: components} = composite
) do
component = Enum.find(components, fn %{pid: ^pid} -> true end)
Enum.each(components, & &1.__struct__.stop(&1))
{:stop, {:component_crashed, component}, composite}
end
defp stop_components(components) do
Enum.each(components, fn %{__struct__: module} = component ->
module.stop(component)
end)
end
end