defmodule Strom.GenMix do
@moduledoc """
Generic functionality used by other components.
"""
use GenServer
# Default for the `chunk` field; `start/1` uses it when `opts` has no `:chunk` entry.
@chunk 1
# Default for the `buffer` field; `start/1` uses it when `opts` has no `:buffer` entry.
# The `:put_data` handler compares the total amount of buffered data against `buffer`
# to decide whether a task is told to continue immediately.
@buffer 1000
# Server state (also the value passed around by the client API).
# `pid` is filled in by `start/1` (for the caller) and by `init/1` (inside the server).
defstruct pid: nil,
# When non-nil, `start/1` starts the process under the supervisor named by
# `Composite.component_supervisor_name/1` instead of `Strom.ComponentSupervisor`.
composite: nil,
# Chunk-processing function; `start/1` installs `&process_chunk/4` when it is not set.
process_chunk: nil,
inputs: [],
# Passed to `Streams.new_data/4` together with incoming data.
outputs: %{},
# input name => latest accumulator reported through `:put_data`.
accs: %{},
# Keyword list; `start/1` reads `:chunk`, `:buffer` and `:no_wait` from it.
opts: [],
chunk: @chunk,
buffer: @buffer,
# Selects which `task_finished/2` clause handles a finished task.
no_wait: false,
# Last map of input streams given to `{:start_tasks, input_streams}`.
input_streams: %{},
# task pid => stream name.
tasks: %{},
# client pid => output name, registered by `{:run_tasks, output_name}` calls.
clients: %{},
tasks_started: false,
tasks_run: false,
# Once true, the server stops as soon as no tasks and no waiting tasks remain.
stopping: false,
# Optional zero-arity function invoked right before the server stops.
before_stop: nil,
# output name => list of data not yet delivered to a client.
data: %{},
# Running total of buffered data, compared against `buffer`.
data_size: 0,
# task pid => input name, for tasks that have not been told to continue yet.
waiting_tasks: %{},
# client pid => output name, for clients that asked for data when none was available.
waiting_clients: %{}
alias Strom.Composite
alias Strom.GenMix.Streams
alias Strom.GenMix.Tasks
@type t() :: %__MODULE__{}
@doc """
Starts a GenMix server for `gm` and returns the struct with `pid` set.

`chunk`, `buffer` and `no_wait` are taken from `gm.opts` (falling back to the
module defaults), and `process_chunk/4` is installed when no custom
`process_chunk` function was supplied.

The server is started as a `:temporary` child. Without a composite it goes under
a randomly chosen partition of `Strom.ComponentSupervisor`; otherwise under the
supervisor named by `Composite.component_supervisor_name/1`.

Raises a `MatchError` if the child cannot be started.
"""
@spec start(__MODULE__.t()) :: __MODULE__.t()
def start(%__MODULE__{opts: opts} = gm) when is_list(opts) do
  configured = %{
    gm
    | process_chunk: gm.process_chunk || (&process_chunk/4),
      chunk: Keyword.get(opts, :chunk, @chunk),
      buffer: Keyword.get(opts, :buffer, @buffer),
      no_wait: Keyword.get(opts, :no_wait, false)
  }

  partition_count = PartitionSupervisor.partitions(Strom.ComponentSupervisor)
  partition_key = Enum.random(1..partition_count)

  supervisor =
    if is_nil(gm.composite) do
      {:via, PartitionSupervisor, {Strom.ComponentSupervisor, partition_key}}
    else
      Composite.component_supervisor_name(gm.composite)
    end

  child_spec = %{
    id: __MODULE__,
    start: {__MODULE__, :start_link, [configured]},
    restart: :temporary
  }

  {:ok, pid} = DynamicSupervisor.start_child(supervisor, child_spec)

  %{configured | pid: pid}
end
# Entry point used by the child spec built in `start/1`; the struct becomes the
# argument of `init/1`.
def start_link(%__MODULE__{} = gm), do: GenServer.start_link(__MODULE__, gm)
# The server keeps its own pid in the state so replies can hand it back to callers.
@impl true
@spec init(__MODULE__.t()) :: {:ok, __MODULE__.t()}
def init(%__MODULE__{} = gm), do: {:ok, %{gm | pid: self()}}
@doc """
Synchronously asks the server at `gm_pid` to start tasks for `input_streams`.

Returns the server's reply: its pid together with the newly started tasks.
"""
@spec start_tasks(pid(), Strom.flow()) :: {pid(), map()}
def start_tasks(gm_pid, input_streams),
  do: GenServer.call(gm_pid, {:start_tasks, input_streams})
@doc """
Registers the calling process as the client of `output_name` and, on the first
call after tasks were started, tells the server to run them.
"""
@spec run_tasks(pid(), atom()) :: :ok
def run_tasks(gm_pid, output_name),
  do: GenServer.call(gm_pid, {:run_tasks, output_name})
@doc """
Applies the component described by `gm` to `flow`; delegates to `Streams.call/2`.
"""
@spec call(map(), map()) :: map() | no_return()
def call(flow, gm) do
  Streams.call(flow, gm)
end
@doc """
Returns the current state struct of the server at `pid`.
"""
@spec state(pid()) :: __MODULE__.t()
def state(pid) do
  GenServer.call(pid, :state)
end
@doc """
Asks the server referenced by `gm.pid` to stop.

The server replies `:ok`; it terminates immediately only when it has no tasks
left, otherwise it is marked as stopping.
"""
@spec stop(any()) :: any()
def stop(gm), do: GenServer.call(gm.pid, :stop)
@doc """
Asynchronously hands `new_tasks` over to the server at `gm_pid`.

`all_or_old` must be `:all` or `:old`; it selects which `:transfer_tasks`
cast clause the server runs.
"""
@spec transfer_tasks(pid(), map(), atom()) :: :ok
def transfer_tasks(gm_pid, new_tasks, all_or_old) when all_or_old in [:all, :old],
  do: GenServer.cast(gm_pid, {:transfer_tasks, new_tasks, all_or_old})
@doc """
Default chunk processor: distributes the events of `chunk` over `outputs`.

`outputs` maps each output name to a predicate function. Every output receives,
in order, the events of `chunk` for which its predicate returns a truthy value.

Returns `{data_per_output, any?, nil}`, where `data_per_output` maps each output
name to its list of events and `any?` is `true` when at least one output received
at least one event. Only the `nil` accumulator is handled by this clause.
"""
@spec process_chunk(atom(), list(), Strom.flow(), any()) :: {Strom.flow(), boolean(), any()}
def process_chunk(_input_stream_name, chunk, outputs, nil) do
  Enum.reduce(outputs, {%{}, false, nil}, fn {output_name, output_stream_fun}, {acc, any?, nil} ->
    data = Enum.filter(chunk, output_stream_fun)

    # Test for non-emptiness, not truthiness: `Enum.any?(data)` would report "no data"
    # for a batch made only of `nil`/`false` events, even though they were routed.
    {Map.put(acc, output_name, data), any? or data != [], nil}
  end)
end
# Starts tasks for the given input streams and merges them into the known tasks.
# Replies with the server pid and only the tasks started by this call.
@impl true
def handle_call({:start_tasks, input_streams}, _from, %__MODULE__{} = gm)
    when is_map(input_streams) do
  started = Tasks.start_tasks(input_streams, gm)

  updated = %{
    gm
    | tasks: Map.merge(gm.tasks, started),
      input_streams: input_streams,
      tasks_started: true,
      tasks_run: false
  }

  {:reply, {gm.pid, started}, updated}
end
# Registers the caller as the client of `output_name`. The tasks are kicked off only
# by the first such call after they were started; later calls just add the client.
def handle_call(
      {:run_tasks, output_name},
      {client_pid, _ref},
      %__MODULE__{tasks_started: true, tasks_run: already_running?} = gm
    )
    when is_boolean(already_running?) do
  if not already_running? do
    Tasks.run_tasks(gm.tasks, gm.accs)
  end

  registered = Map.put(gm.clients, client_pid, output_name)
  {:reply, :ok, %{gm | tasks_run: true, clients: registered}}
end
# Marks the server as stopping and replies `:ok` either way. When nothing is left
# to wait for, the optional `before_stop` hook runs and the server terminates.
def handle_call(:stop, _from, %__MODULE__{} = gm) do
  stopping_gm = %{gm | stopping: true}

  if ready_to_stop?(stopping_gm) do
    hook = stopping_gm.before_stop
    if hook, do: hook.()
    {:stop, :normal, :ok, stopping_gm}
  else
    {:reply, :ok, stopping_gm}
  end
end
# Replies with the whole state struct, leaving it unchanged.
def handle_call(:state, _from, %__MODULE__{} = gm), do: {:reply, gm, gm}
# Flags the server as stopping; `after_action/1` decides whether it can stop now.
@impl true
def handle_cast({:gen_mix, :stopping}, gm), do: after_action(%{gm | stopping: true})
# `:all` - every task of this server is a candidate for hand-over to `new_tasks`;
# this server then goes into stopping mode.
def handle_cast({:transfer_tasks, new_tasks, :all}, gm) do
  run_new_tasks_and_halt_the_old_ones(new_tasks, gm.tasks, gm.accs)

  after_action(%{gm | stopping: true})
end

# `:old` - the pids in `new_tasks` are excluded from the hand-over candidates;
# the server keeps running with `new_tasks` as its task set.
def handle_cast({:transfer_tasks, new_tasks, :old}, gm) do
  new_pids = Map.keys(new_tasks)
  previous_tasks = Map.drop(gm.tasks, new_pids)
  run_new_tasks_and_halt_the_old_ones(new_tasks, previous_tasks, gm.accs)

  after_action(%{gm | tasks_run: true, tasks: new_tasks})
end
# A task delivers a batch of data plus its new accumulator.
# The data is merged via `Streams.new_data/4`. If the resulting total has reached
# `buffer`, the task is parked in `waiting_tasks` instead of being told to continue.
def handle_cast(
      {:put_data, {input_name, task_pid}, {new_data, new_acc}},
      %__MODULE__{} = gm
    ) do
  {all_data, waiting_clients, total_count} =
    Streams.new_data(new_data, gm.outputs, gm.data, gm.waiting_clients)

  waiting_tasks =
    if total_count >= gm.buffer do
      Map.put(gm.waiting_tasks, task_pid, input_name)
    else
      send(task_pid, {:task, input_name, :continue})
      gm.waiting_tasks
    end

  after_action(%{
    gm
    | data: all_data,
      accs: Map.put(gm.accs, input_name, new_acc),
      data_size: total_count,
      waiting_clients: waiting_clients,
      waiting_tasks: waiting_tasks
  })
end
# A client asks for the data buffered for `output_name`.
def handle_cast({:get_data, {output_name, client_pid}}, %__MODULE__{} = gm) do
  pending = Map.get(gm.data, output_name, [])
  size_before = gm.data_size

  gm =
    cond do
      # There is data: send all of it and shrink the counter accordingly.
      pending != [] ->
        send(client_pid, {:client, output_name, {:data, pending}})

        %{
          gm
          | data: Map.put(gm.data, output_name, []),
            data_size: gm.data_size - length(pending)
        }

      # No data and no tasks left: tell the client it is done.
      map_size(gm.tasks) == 0 ->
        send(client_pid, {:client, output_name, :done})
        %{gm | data: Map.put(gm.data, output_name, [])}

      # No data yet, but tasks are still registered: park the client.
      true ->
        %{gm | waiting_clients: Map.put(gm.waiting_clients, client_pid, output_name)}
    end

  # Parked tasks are resumed only if this request actually freed buffer space
  # and the total is now below the limit.
  resume_tasks? = gm.data_size < size_before and gm.data_size < gm.buffer

  waiting_tasks =
    if resume_tasks? do
      send_continue_to_tasks(gm.waiting_tasks)
      %{}
    else
      gm.waiting_tasks
    end

  after_action(%{gm | waiting_tasks: waiting_tasks})
end
# Common tail of the cast/info handlers: turns the updated state into a GenServer
# return tuple.
#
#   * tasks or waiting tasks remain -> keep going with the state as is;
#   * nothing remains and the server is stopping -> flush data and `:done` to all
#     registered clients, run the `before_stop` hook, stop normally;
#   * nothing remains otherwise -> flush data and `:done` to the waiting clients only.
defp after_action(gm) do
  cond do
    not ready_to_finish?(gm) ->
      {:noreply, gm}

    gm.stopping ->
      {data, data_size} = send_data_to_clients(gm.data, gm.data_size, gm.clients)
      send_done_to_clients(gm.clients)

      hook = gm.before_stop
      if hook, do: hook.()

      final_state = %{
        gm
        | data: data,
          data_size: data_size,
          tasks_started: false,
          tasks_run: false,
          waiting_clients: %{},
          clients: %{}
      }

      {:stop, :normal, final_state}

    true ->
      {data, data_size} = send_data_to_clients(gm.data, gm.data_size, gm.waiting_clients)
      send_done_to_clients(gm.waiting_clients)

      idle_state = %{
        gm
        | data: data,
          data_size: data_size,
          tasks_run: false,
          waiting_clients: %{}
      }

      {:noreply, idle_state}
  end
end
# For every new task: if an old task handles the same stream name, the old task is
# asked to start the new one and halt; otherwise the new task is started directly.
# Either way the accumulator stored for that stream name is passed along.
defp run_new_tasks_and_halt_the_old_ones(new_tasks, old_tasks, accs) do
  for {new_task_pid, stream_name} <- new_tasks do
    predecessor =
      Enum.find_value(old_tasks, fn {old_pid, name} -> name == stream_name && old_pid end)

    if predecessor do
      send(predecessor, {:task, :run_new_tasks_and_halt, {new_task_pid, accs[stream_name]}})
    else
      send(new_task_pid, {:task, :run, accs[stream_name]})
    end
  end

  :ok
end
# Tells every task in the `task pid => input name` map to continue.
defp send_continue_to_tasks(tasks) do
  for {task_pid, input_name} <- tasks do
    send(task_pid, {:task, input_name, :continue})
  end

  :ok
end
# Notifies every client in the `client pid => output name` map that its output is done.
defp send_done_to_clients(clients) do
  for {client_pid, output_name} <- clients do
    send(client_pid, {:client, output_name, :done})
  end

  :ok
end
# Delivers the buffered data of each client's output to that client.
# Returns `{data, data_size}` with the delivered outputs emptied and the size
# reduced by the number of delivered items; outputs with nothing buffered are skipped.
defp send_data_to_clients(data, data_size, clients) do
  Enum.reduce(clients, {data, data_size}, fn {client_pid, output_name}, {remaining, size} ->
    batch = Map.get(remaining, output_name, [])

    if batch == [] do
      {remaining, size}
    else
      send(client_pid, {:client, output_name, {:data, batch}})
      {Map.put(remaining, output_name, []), size - length(batch)}
    end
  end)
end
@doc """
Returns `true` when the state has neither registered tasks nor waiting tasks.
"""
def ready_to_finish?(gm) do
  no_tasks? = map_size(gm.tasks) == 0
  no_tasks? and map_size(gm.waiting_tasks) == 0
end
# The server may stop once nothing is left to wait for and stopping was requested.
defp ready_to_stop?(gm), do: ready_to_finish?(gm) and gm.stopping
# Task finished when started with TaskSupervisor: the message carries the monitor
# reference and the task pid.
@impl true
def handle_info({monitor_ref, task_pid}, gm) do
  Process.demonitor(monitor_ref, [:flush])
  task_finished(task_pid, gm)
end

# Monitor notifications. A `:normal` exit is a task that finished when started in
# Composite; any other reason is a failed task, which is replaced in `tasks` by the
# `{pid, name}` pair returned from `Tasks.handle_task_error/2`.
def handle_info({:DOWN, monitor_ref, :process, pid, reason}, gm) do
  case reason do
    :normal ->
      Process.demonitor(monitor_ref, [:flush])
      task_finished(pid, gm)

    _failure ->
      {replacement_pid, name} = Tasks.handle_task_error(gm, pid)
      without_failed = Map.delete(gm.tasks, pid)
      {:noreply, %{gm | tasks: Map.put(without_failed, replacement_pid, name)}}
  end
end
# Regular mode: forget the finished task (also if it was parked) and let
# `after_action/1` decide whether everything is done.
defp task_finished(
       pid,
       %__MODULE__{no_wait: false, tasks: tasks, waiting_tasks: waiting_tasks} = gm
     ) do
  after_action(%{
    gm
    | tasks: Map.delete(tasks, pid),
      waiting_tasks: Map.delete(waiting_tasks, pid)
  })
end
# `no_wait` mode: as soon as any task finishes, all registered clients receive the
# buffered data for their output followed by `:done`, and the task set is cleared.
#
# The `{data, data_size}` returned by `send_data_to_clients/3` is stored back into
# the state (as `after_action/1` does), so batches that were just delivered are not
# kept around and `data_size` stays in sync with what is actually buffered.
defp task_finished(_pid, %__MODULE__{no_wait: true} = gm) do
  {data, data_size} = send_data_to_clients(gm.data, gm.data_size, gm.clients)
  send_done_to_clients(gm.clients)

  {:noreply,
   %{
     gm
     | data: data,
       data_size: data_size,
       tasks_run: false,
       tasks: %{}
   }}
end
end