defmodule Strom.Splitter do
@moduledoc """
Split a stream into several streams by applying given functions on events
## Example
iex> alias Strom.Splitter
iex> outputs = %{s1: &(rem(&1, 2) == 0), s2: &(rem(&1, 2) == 1)}
iex> splitter = :stream |> Splitter.new(outputs) |> Splitter.start()
iex> %{s1: s1, s2: s2} = Splitter.call(%{stream: [1, 2, 3]}, splitter)
iex> {Enum.to_list(s1), Enum.to_list(s2)}
{[2], [1, 3]}
## Can also just duplicate a stream
iex> alias Strom.Splitter
iex> splitter = :stream |> Splitter.new([:s1, :s2]) |> Splitter.start()
iex> %{s1: s1, s2: s2} = Splitter.call(%{stream: [1, 2, 3]}, splitter)
iex> {Enum.to_list(s1), Enum.to_list(s2)}
{[1, 2, 3], [1, 2, 3]}
"""
alias Strom.GenMix
defstruct pid: nil,
composite: nil,
inputs: %{},
outputs: %{},
opts: []
@type t() :: %__MODULE__{}
@type event() :: any()
@spec new(
Strom.stream_name(),
[Strom.stream_name()] | %{Strom.stream_name() => (event() -> as_boolean(any))},
list()
) :: __MODULE__.t()
def new(input, outputs, opts \\ [])
def new(input, outputs, opts) when is_list(outputs) and is_list(opts) do
outputs =
Enum.reduce(outputs, %{}, fn name, acc ->
Map.put(acc, name, fn _el -> true end)
end)
%__MODULE__{inputs: [input], outputs: outputs, opts: opts}
end
def new(input, outputs, opts)
when is_map(outputs) and map_size(outputs) > 0 and is_list(opts) do
%__MODULE__{inputs: [input], outputs: outputs, opts: opts}
end
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(
%__MODULE__{inputs: inputs, outputs: outputs, opts: opts, composite: composite} = splitter
) do
gen_mix =
GenMix.start(%GenMix{
inputs: inputs,
outputs: outputs,
opts: opts,
process_chunk: &process_chunk/4,
composite: composite
})
%{splitter | pid: gen_mix.pid}
end
@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{} = splitter) do
GenMix.call(flow, splitter)
end
@spec process_chunk(atom(), list(), Strom.flow(), nil) :: {Strom.flow(), boolean(), nil}
def process_chunk(_input_stream_name, chunk, outputs, nil) do
outputs
|> Enum.reduce({%{}, false, nil}, fn {stream_name, fun}, {acc, any?, nil} ->
{data, _} = Enum.split_with(chunk, fun)
{Map.put(acc, stream_name, data), any? || Enum.any?(data), nil}
end)
end
@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{} = splitter), do: GenMix.stop(splitter)
end