lib/transformer.ex

defmodule Strom.Transformer do
  @moduledoc """
  Transforms a stream or several streams.
  It works as Stream.map/2 or Stream.transform/3.

      ## `map` example:
      iex> alias Strom.Transformer
      iex> transformer = :numbers |> Transformer.new(&(&1*2)) |> Transformer.start()
      iex> flow = %{numbers: [1, 2, 3]}
      iex> %{numbers: stream} = Transformer.call(flow, transformer)
      iex> Enum.to_list(stream)
      [2, 4, 6]

      ## `reduce` example:
      iex> alias Strom.Transformer
      iex> fun = fn el, acc -> {[el, acc], acc + 10} end
      iex> transformer = :numbers |> Transformer.new(fun, 10) |> Transformer.start()
      iex> flow = %{numbers: [1, 2, 3]}
      iex> %{numbers: stream} = Transformer.call(flow, transformer)
      iex> Enum.to_list(stream)
      [1, 10, 2, 20, 3, 30]

      ## it can be applied to several streams:
      iex> alias Strom.Transformer
      iex> transformer = [:s1, :s2] |> Transformer.new(&(&1*2)) |> Transformer.start()
      iex> flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
      iex> %{s1: s1, s2: s2} = Transformer.call(flow, transformer)
      iex> {Enum.to_list(s1), Enum.to_list(s2)}
      {[2, 4, 6], [8, 10, 12]}
  """

  alias Strom.GenMix

  defstruct pid: nil,
            composite: nil,
            inputs: [],
            outputs: %{},
            acc: nil,
            opts: []

  @type t() :: %__MODULE__{}
  @type event() :: any()
  @type acc() :: any()
  @type func() ::
          (event() -> event())
          | (event(), acc() -> {[event()], acc()})

  @spec new(Strom.stream_name(), func(), acc(), list()) :: __MODULE__.t()
  def new(names, function, acc \\ nil, opts \\ [])
      when (is_atom(names) or is_list(names)) and is_function(function) and is_list(opts) do
    inputs = if is_list(names), do: names, else: [names]

    function =
      if is_function(function, 1) do
        fn el, nil -> {[function.(el)], nil} end
      else
        function
      end

    outputs =
      Enum.reduce(inputs, %{}, fn name, out ->
        Map.put(out, name, fn el, acc -> function.(el, acc) end)
      end)

    %__MODULE__{inputs: inputs, outputs: outputs, opts: opts, acc: acc}
  end

  @spec start(__MODULE__.t()) :: __MODULE__.t()
  def start(
        %__MODULE__{inputs: inputs, outputs: outputs, acc: acc, opts: opts, composite: composite} =
          transformer
      ) do
    gen_mix =
      GenMix.start(%GenMix{
        inputs: inputs,
        outputs: outputs,
        accs: Enum.reduce(inputs, %{}, fn name, accs -> Map.put(accs, name, acc) end),
        opts: opts,
        process_chunk: &process_chunk/4,
        composite: composite
      })

    %{transformer | pid: gen_mix.pid}
  end

  @spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
  def call(flow, %__MODULE__{} = transformer) do
    GenMix.call(flow, transformer)
  end

  @spec process_chunk(atom(), list(), Strom.flow(), any()) :: {Strom.flow(), boolean(), any()}
  def process_chunk(input_stream_name, chunk, outputs, acc) do
    output_function = Map.get(outputs, input_stream_name)

    {chunk, new_acc} =
      Enum.flat_map_reduce(chunk, acc, fn el, acc ->
        output_function.(el, acc)
      end)

    {%{input_stream_name => chunk}, Enum.any?(chunk), new_acc}
  end

  @spec stop(__MODULE__.t()) :: :ok
  def stop(%__MODULE__{} = transformer), do: GenMix.stop(transformer)
end