defmodule Strom.Transformer do
@moduledoc """
Transforms a stream or several streams.
It works as Stream.map/2 or Stream.transform/3.
## `map` example:
iex> alias Strom.Transformer
iex> transformer = :numbers |> Transformer.new(&(&1*2)) |> Transformer.start()
iex> flow = %{numbers: [1, 2, 3]}
iex> %{numbers: stream} = Transformer.call(flow, transformer)
iex> Enum.to_list(stream)
[2, 4, 6]
## `transform` example:
iex> alias Strom.Transformer
iex> fun = fn el, acc -> {[el, acc], acc + 10} end
iex> transformer = :numbers |> Transformer.new(fun, 10) |> Transformer.start()
iex> flow = %{numbers: [1, 2, 3]}
iex> %{numbers: stream} = Transformer.call(flow, transformer)
iex> Enum.to_list(stream)
[1, 10, 2, 20, 3, 30]
## it can be applied to several streams:
iex> alias Strom.Transformer
iex> transformer = [:s1, :s2] |> Transformer.new(&(&1*2)) |> Transformer.start()
iex> flow = %{s1: [1, 2, 3], s2: [4, 5, 6]}
iex> %{s1: s1, s2: s2} = Transformer.call(flow, transformer)
iex> {Enum.to_list(s1), Enum.to_list(s2)}
{[2, 4, 6], [8, 10, 12]}
"""
use GenServer
@chunk 1
@buffer 1000
defstruct pid: nil,
opts: [],
chunk: @chunk,
buffer: @buffer,
input_streams: %{},
function: nil,
acc: nil,
names: [],
tasks: %{},
data: %{},
waiting_clients: %{}
@type t() :: %__MODULE__{}
@type event() :: any()
@type acc() :: any()
@type func() ::
(event() -> event())
| (event(), acc() -> {[event()], acc()})
@spec new(Strom.stream_name(), func(), acc(), list()) :: __MODULE__.t()
def new(names, function, acc \\ nil, opts \\ []) when is_function(function) and is_list(opts) do
%__MODULE__{
function: function,
acc: acc,
names: names,
opts: opts
}
end
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{opts: opts} = transformer) do
transformer = %{
transformer
| chunk: Keyword.get(opts, :chunk, @chunk),
buffer: Keyword.get(opts, :buffer, @buffer)
}
{:ok, pid} = start_link(transformer)
__state__(pid)
end
def start_link(%__MODULE__{} = transformer) do
GenServer.start_link(__MODULE__, transformer)
end
@impl true
def init(%__MODULE__{} = transformer) do
function =
if is_function(transformer.function, 1) do
fn el, nil -> {[transformer.function.(el)], nil} end
else
transformer.function
end
{:ok, %{transformer | pid: self(), function: function}}
end
@spec call(Strom.flow(), __MODULE__.t()) :: Strom.flow()
def call(flow, %__MODULE__{names: names, function: function} = transformer)
when is_map(flow) and is_function(function, 2) do
names = if is_list(names), do: names, else: [names]
input_streams =
Enum.reduce(names, %{}, fn name, streams ->
Map.put(streams, name, Map.fetch!(flow, name))
end)
:ok = GenServer.call(transformer.pid, {:run_inputs, input_streams})
sub_flow =
names
|> Enum.reduce(%{}, fn name, flow ->
stream =
Stream.resource(
fn ->
nil
end,
fn nil ->
case GenServer.call(transformer.pid, {:get_data, name}, :infinity) do
{:data, data} ->
{data, nil}
:done ->
{:halt, nil}
:pause ->
receive do
:continue_client ->
{[], nil}
end
end
end,
fn nil -> nil end
)
Map.put(flow, name, stream)
end)
flow
|> Map.drop(names)
|> Map.merge(sub_flow)
end
@spec stop(__MODULE__.t()) :: :ok
def stop(%__MODULE__{pid: pid}) do
GenServer.call(pid, :stop)
end
def __state__(pid) when is_pid(pid), do: GenServer.call(pid, :__state__)
defp run_inputs(streams, transformer) do
Enum.reduce(streams, %{}, fn {name, stream}, streams_acc ->
task = async_run_stream({name, stream}, transformer)
Map.put(streams_acc, name, task)
end)
end
defp async_run_stream({name, stream}, transformer) do
Task.Supervisor.async_nolink(Strom.TaskSupervisor, fn ->
stream
|> Stream.chunk_every(transformer.chunk)
|> Stream.transform(transformer.acc, fn chunk, acc ->
{chunk, new_acc} =
Enum.reduce(chunk, {[], acc}, fn el, {events, acc} ->
{new_events, acc} = transformer.function.(el, acc)
{events ++ new_events, acc}
end)
GenServer.cast(transformer.pid, {:new_data, name, chunk})
receive do
:continue_task ->
flush(:continue_task)
end
{[], new_acc}
end)
|> Stream.run()
{:task_done, name}
end)
end
defp flush(message) do
receive do
^message ->
flush(message)
after
0 -> :ok
end
end
@impl true
def handle_call({:run_inputs, streams_to_call}, _from, %__MODULE__{} = transformer) do
tasks = run_inputs(streams_to_call, transformer)
{:reply, :ok, %{transformer | tasks: tasks, input_streams: streams_to_call}}
end
def handle_call({:get_data, name}, {pid, _ref}, transformer) do
if task = transformer.tasks[name] do
send(task.pid, :continue_task)
end
data = Map.get(transformer.data, name, [])
cond do
length(data) == 0 and is_nil(transformer.tasks[name]) ->
{:reply, :done, transformer}
length(data) == 0 ->
waiting_clients = Map.put(transformer.waiting_clients, name, pid)
{:reply, :pause, %{transformer | waiting_clients: waiting_clients}}
true ->
transformer = %{transformer | data: Map.put(transformer.data, name, [])}
{:reply, {:data, data}, transformer}
end
end
def handle_call(:stop, _from, %__MODULE__{} = transformer) do
{:stop, :normal, :ok, transformer}
end
def handle_call(:__state__, _from, transformer), do: {:reply, transformer, transformer}
@impl true
def handle_cast({:new_data, name, chunk}, %__MODULE__{} = transformer) do
prev_data = Map.get(transformer.data, name, [])
all_data = prev_data ++ chunk
waiting_clients = continue_waiting_client(transformer.waiting_clients, name)
if length(all_data) < transformer.buffer do
task = Map.fetch!(transformer.tasks, name)
send(task.pid, :continue_task)
end
new_data = Map.put(transformer.data, name, all_data)
transformer = %{transformer | data: new_data, waiting_clients: waiting_clients}
{:noreply, transformer}
end
@impl true
def handle_info({_task_ref, {:task_done, name}}, transformer) do
tasks = Map.delete(transformer.tasks, name)
waiting_clients = continue_waiting_client(transformer.waiting_clients, name)
{:noreply, %{transformer | waiting_clients: waiting_clients, tasks: tasks}}
end
def handle_info({:DOWN, _task_ref, :process, _task_pid, :normal}, transformer) do
# do nothing for now
{:noreply, transformer}
end
def handle_info({:DOWN, _task_ref, :process, task_pid, _not_normal}, transformer) do
{name, _task} = Enum.find(transformer.tasks, fn {_name, task} -> task.pid == task_pid end)
stream = Map.fetch!(transformer.input_streams, name)
new_task = async_run_stream({name, stream}, transformer)
tasks = Map.put(transformer.tasks, name, new_task)
{:noreply, %{transformer | tasks: tasks}}
end
defp continue_waiting_client(waiting_clients, name) do
case waiting_clients[name] do
nil ->
waiting_clients
client_pid when is_pid(client_pid) ->
send(client_pid, :continue_client)
Map.delete(waiting_clients, name)
end
end
end