lib/transformer.ex

defmodule Strom.Transformer do
  @moduledoc """
  Transforms a stream or several streams.

  It works like `Stream.map/2` when given a 1-arity function, or like
  `Stream.transform/3` when given a 2-arity function and an initial
  accumulator. Each input stream is read by its own task; transformed events
  are buffered in the transformer process until the matching output stream
  pulls them.

  ## Options

    * `:buffer` - number of input events read and transformed per batch,
      read by `start/1` (default: `1000`).

  ## `map` example

      iex> alias Strom.Transformer
      iex> transformer = :numbers |> Transformer.new(&(&1*2)) |> Transformer.start()
      iex> flow = %{numbers: [1, 2, 3]}
      iex> %{numbers: stream} = Transformer.call(flow, transformer)
      iex> Enum.to_list(stream)
      [2, 4, 6]

  ## `transform` example

      iex> alias Strom.Transformer
      iex> fun = fn el, acc -> {[el, acc], acc + 10} end
      iex> transformer = :numbers |> Transformer.new(fun, 10) |> Transformer.start()
      iex> flow = %{numbers: [1, 2, 3]}
      iex> %{numbers: stream} = Transformer.call(flow, transformer)
      iex> Enum.to_list(stream)
      [1, 10, 2, 20, 3, 30]

  ## Several streams at once

      iex> alias Strom.Transformer
      iex> transformer = [:s1, :s2] |> Transformer.new(&(&1*2)) |> Transformer.start()
      iex> flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
      iex> %{s1: s1, s2: s2} = Transformer.call(flow, transformer)
      iex> {Enum.to_list(s1), Enum.to_list(s2)}
      {[2, 4, 6], [8, 10, 12]}
  """

  use GenServer

  @buffer 1000

  defstruct pid: nil,
            running: false,
            opts: [],
            buffer: @buffer,
            function: nil,
            acc: nil,
            names: [],
            tasks: %{},
            data: %{},
            waiting: %{}

  @type event() :: any()
  @type acc() :: any()

  @type func() ::
          (event() -> event())
          | (event(), acc() -> {[event()], acc()})

  @typedoc """
  Transformer state. `tasks`, `data` and `waiting` are keyed by stream name:
  the producer task, the transformed events not yet pulled, and the consumers
  parked until events (or completion) arrive.
  """
  @type t() :: %__MODULE__{
          pid: pid() | nil,
          running: boolean(),
          opts: list(),
          buffer: pos_integer(),
          function: func() | nil,
          acc: acc(),
          names: Strom.stream_name() | [Strom.stream_name()],
          tasks: %{optional(Strom.stream_name()) => Task.t()},
          data: %{optional(Strom.stream_name()) => [event()]},
          waiting: %{optional(Strom.stream_name()) => [GenServer.from()]}
        }

  @doc """
  Builds a (not yet started) transformer for one stream name or a list of names.

  `function` is either a 1-arity map function, or a 2-arity function
  `(event, acc -> {[event], acc})` used together with the initial `acc`.
  """
  @spec new(Strom.stream_name() | [Strom.stream_name()], func(), acc(), list()) ::
          __MODULE__.t()
  def new(names, function, acc \\ nil, opts \\ []) when is_function(function) and is_list(opts) do
    %__MODULE__{
      function: function,
      acc: acc,
      names: names,
      opts: opts
    }
  end

  @doc """
  Starts the transformer process (linked to the caller) and returns its state,
  with `pid` set. The `:buffer` option is applied here.
  """
  @spec start(__MODULE__.t()) :: __MODULE__.t()
  def start(%__MODULE__{opts: opts} = transformer) do
    transformer = %{transformer | buffer: Keyword.get(opts, :buffer, @buffer)}

    {:ok, pid} = start_link(transformer)
    __state__(pid)
  end

  @spec start_link(__MODULE__.t()) :: GenServer.on_start()
  def start_link(%__MODULE__{} = transformer) do
    GenServer.start_link(__MODULE__, transformer)
  end

  @impl true
  def init(%__MODULE__{} = transformer) do
    {:ok, %{transformer | pid: self()}}
  end

  @doc """
  Replaces the named streams of `flow` with their transformed versions.

  Input streams are consumed eagerly by background tasks as soon as this is
  called; the returned streams pull the transformed events from the
  transformer process. Other entries of `flow` are returned unchanged.
  Raises `KeyError` if a name is missing from `flow`.
  """
  @spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
  def call(flow, %__MODULE__{pid: pid, names: names, function: function, acc: acc})
      when is_map(flow) and is_function(function, 2) do
    names = List.wrap(names)

    inputs = Map.new(names, fn name -> {{name, function, acc}, Map.fetch!(flow, name)} end)
    :ok = GenServer.call(pid, {:run_inputs, inputs})

    outputs = Map.new(names, fn name -> {name, output_stream(pid, name)} end)

    # Map.merge/2 overwrites the original input streams with the outputs.
    Map.merge(flow, outputs)
  end

  def call(flow, %__MODULE__{function: function} = transformer)
      when is_map(flow) and is_function(function, 1) do
    # Lift a map-style function into the transform shape. The accumulator is
    # threaded through untouched, so a non-nil `acc` no longer crashes it.
    lifted = fn event, acc -> {[function.(event)], acc} end
    call(flow, %{transformer | function: lifted})
  end

  @doc """
  Stops the transformer process and the producer tasks it still owns.
  """
  @spec stop(__MODULE__.t()) :: :ok
  def stop(%__MODULE__{pid: pid}) do
    GenServer.call(pid, :stop)
  end

  @doc false
  def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)

  @impl true
  def handle_call({:run_inputs, streams_to_call}, _from, %__MODULE__{} = transformer) do
    tasks = run_inputs(streams_to_call, transformer.pid, transformer.buffer)

    {:reply, :ok, %{transformer | running: true, tasks: tasks}}
  end

  # A producer task hands over one transformed batch. Replying only after the
  # batch is stored makes the task wait before reading its next batch.
  def handle_call({:new_data, name, events}, _from, %__MODULE__{} = transformer) do
    buffered = Map.get(transformer.data, name, []) ++ events
    {:reply, :ok, store_events(transformer, name, buffered)}
  end

  # A consumer asks for the next events of `name`.
  def handle_call({:get_data, name}, from, %__MODULE__{} = transformer) do
    case Map.get(transformer.data, name, []) do
      [_ | _] = events ->
        {:reply, {:ok, events}, %{transformer | data: Map.put(transformer.data, name, [])}}

      [] ->
        if Map.has_key?(transformer.tasks, name) do
          # The producer is still running: park the consumer instead of
          # replying with an empty batch, so it blocks rather than spinning.
          waiting = Map.update(transformer.waiting, name, [from], &(&1 ++ [from]))
          {:noreply, %{transformer | waiting: waiting}}
        else
          {:reply, {:error, :done}, transformer}
        end
    end
  end

  def handle_call(:stop, _from, %__MODULE__{} = transformer) do
    # Tasks were started from this process, so it owns them and may shut them
    # down. A linked task would otherwise survive our :normal exit.
    Enum.each(transformer.tasks, fn {_name, task} -> Task.shutdown(task, :brutal_kill) end)

    {:stop, :normal, :ok, %{transformer | running: false, tasks: %{}}}
  end

  def handle_call(:__state__, _from, transformer), do: {:reply, transformer, transformer}

  @impl true
  def handle_cast({:done, name}, %__MODULE__{} = transformer) do
    # Consumers can only be parked while nothing is buffered for `name`, and
    # every batch of this producer has been stored already (synchronous
    # :new_data calls), so parked consumers are told the stream is finished.
    {parked, waiting} = Map.pop(transformer.waiting, name, [])
    Enum.each(parked, &GenServer.reply(&1, {:error, :done}))

    tasks = Map.delete(transformer.tasks, name)

    {:noreply, %{transformer | tasks: tasks, waiting: waiting, running: map_size(tasks) > 0}}
  end

  @impl true
  def handle_info({_task_ref, :ok}, transformer) do
    # Task.async result message of a finished producer task; nothing to do.
    {:noreply, transformer}
  end

  def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, transformer) do
    # Monitor message of a producer task that exited normally; nothing to do.
    {:noreply, transformer}
  end

  # Output stream for `name`: repeatedly pulls batches from the transformer
  # process until it reports the stream as done. Touches no other messages in
  # the consuming process's mailbox.
  defp output_stream(pid, name) do
    Stream.resource(
      fn -> nil end,
      fn nil ->
        case GenServer.call(pid, {:get_data, name}, :infinity) do
          {:ok, events} -> {events, nil}
          {:error, :done} -> {:halt, nil}
        end
      end,
      fn nil -> nil end
    )
  end

  # Starts one producer task per input stream, keyed by stream name.
  defp run_inputs(streams, pid, buffer) do
    Map.new(streams, fn {{name, _fun, _acc} = spec, stream} ->
      {name, async_run_stream(spec, stream, buffer, pid)}
    end)
  end

  # Reads `stream` in batches of `buffer` events, transforms each batch with
  # `fun` while threading the accumulator, and ships each batch to `pid`.
  defp async_run_stream({name, fun, acc}, stream, buffer, pid) do
    Task.async(fn ->
      stream
      |> Stream.chunk_every(buffer)
      |> Enum.reduce(acc, fn chunk, chunk_acc ->
        # flat_map_reduce concatenates per-event outputs in O(n), replacing the
        # former `events ++ new_events` accumulation.
        {events, next_acc} = Enum.flat_map_reduce(chunk, chunk_acc, fun)
        :ok = GenServer.call(pid, {:new_data, name, events}, :infinity)
        next_acc
      end)

      GenServer.cast(pid, {:done, name})
    end)
  end

  # Stores `events` as the buffer for `name`, or hands them directly to the
  # oldest parked consumer when one is waiting and there is something to send.
  defp store_events(%__MODULE__{data: data, waiting: waiting} = transformer, name, events) do
    case {events, Map.get(waiting, name, [])} do
      {[_ | _], [consumer | rest]} ->
        GenServer.reply(consumer, {:ok, events})
        %{transformer | data: Map.put(data, name, []), waiting: Map.put(waiting, name, rest)}

      _ ->
        %{transformer | data: Map.put(data, name, events)}
    end
  end
end