lib/mixer.ex

defmodule Strom.Mixer do
  @moduledoc """
  Mix several streams into one. Use Strom.GenMix under the hood

      ## Example
      iex> alias Strom.Mixer
      iex> mixer = [:s1, :s2] |> Mixer.new(:stream) |> Mixer.start()
      iex> flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
      iex> %{stream: stream} = Mixer.call(flow, mixer)
      iex> stream |> Enum.to_list() |> Enum.sort()
      [1, 2, 3, 4, 5, 6]

      ## Can also accept a map with functions as values. Works like "filter".
      iex> alias Strom.Mixer
      iex> inputs = %{s1: &(rem(&1, 2) == 0), s2: &(rem(&1, 2) == 1)}
      iex> mixer = inputs |> Mixer.new(:stream) |> Mixer.start()
      iex> flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
      iex> %{stream: stream} = Mixer.call(flow, mixer)
      iex> stream |> Enum.to_list() |> Enum.sort()
      [2, 5]
  """
  alias Strom.GenMix

  defstruct pid: nil,
            inputs: [],
            output: nil,
            opts: []

  @type t() :: %__MODULE__{}
  @type event() :: any()

  @spec new(
          [Strom.stream_name()] | %{Strom.stream_name() => (event() -> as_boolean(any))},
          Strom.stream_name(),
          list()
        ) :: __MODULE__.t()
  def new(inputs, output, opts \\ [])
      when is_list(inputs) or (is_map(inputs) and map_size(inputs) > 0 and is_list(opts)) do
    %__MODULE__{inputs: inputs, output: output}
  end

  @spec start(__MODULE__.t()) :: __MODULE__.t()
  def start(%__MODULE__{inputs: inputs, output: output, opts: opts} = mixer) do
    inputs =
      if is_list(inputs) do
        Enum.reduce(inputs, %{}, fn name, acc ->
          Map.put(acc, name, fn _el -> true end)
        end)
      else
        inputs
      end

    outputs = %{output => fn _el -> true end}

    gen_mix = %GenMix{
      inputs: inputs,
      outputs: outputs,
      opts: opts
    }

    {:ok, pid} = GenMix.start(gen_mix)
    %{mixer | pid: pid}
  end

  @spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
  def call(flow, %__MODULE__{pid: pid}) do
    GenMix.call(flow, pid)
  end

  @spec stop(__MODULE__.t()) :: :ok
  def stop(%__MODULE__{pid: pid}), do: GenMix.stop(pid)
end