defmodule ALF.Manager do
use GenServer
defstruct name: nil,
pipeline_module: nil,
pid: nil,
pipeline: nil,
components: [],
stages_to_be_deleted: [],
pipeline_sup_pid: nil,
sup_pid: nil,
producer_pid: nil,
registry: %{},
registry_dump: %{},
autoscaling_enabled: nil,
telemetry_enabled: nil
alias ALF.AutoScaler
alias ALF.Manager.{Components, Streamer, ProcessingOptions, StreamRegistry}
alias ALF.Components.{Goto, Producer}
alias ALF.{Builder, Introspection, PipelineDynamicSupervisor, Pipeline}
@available_options [:autoscaling_enabled, :telemetry_enabled]
@max_producer_load 100
def max_producer_load, do: @max_producer_load
def start_link(%__MODULE__{} = state) do
GenServer.start_link(__MODULE__, state, name: state.name)
end
def init(%__MODULE__{} = state) do
state = %{state | pid: self()}
{:ok, state, {:continue, :init_pipeline}}
end
@spec start(atom) :: :ok
def start(module) when is_atom(module) do
start(module, module, [])
end
@spec start(atom, atom) :: :ok
def start(module, name) when is_atom(module) and is_atom(name) do
start(module, name, [])
end
@spec start(atom, list) :: :ok
def start(module, opts) when is_atom(module) and is_list(opts) do
start(module, module, opts)
end
@spec start(atom, atom, list) :: :ok
def start(module, name, opts) when is_atom(module) and is_atom(name) and is_list(opts) do
unless is_pipeline_module?(module) do
raise "The #{module} doesn't implement any pipeline"
end
wrong_options = Keyword.keys(opts) -- @available_options
if Enum.any?(wrong_options) do
raise "Wrong options for the '#{name}' pipeline: #{inspect(wrong_options)}. " <>
"Available options are #{inspect(@available_options)}"
end
sup_pid = Process.whereis(ALF.DynamicSupervisor)
name = if name, do: name, else: module
case DynamicSupervisor.start_child(
sup_pid,
%{
id: __MODULE__,
start:
{__MODULE__, :start_link,
[
%__MODULE__{
sup_pid: sup_pid,
name: name,
pipeline_module: module,
autoscaling_enabled: Keyword.get(opts, :autoscaling_enabled, false),
telemetry_enabled:
Keyword.get(opts, :telemetry_enabled, nil) ||
telemetry_enabled_in_configs?()
}
]},
restart: :transient
}
) do
{:ok, _manager_pid} ->
Introspection.add(module)
:ok
{:error, {:already_started, _pid}} ->
:ok
end
end
def stop(module) when is_atom(module) do
AutoScaler.unregister_pipeline(module)
result = GenServer.call(module, :stop, :infinity)
Introspection.remove(module)
result
catch
:exit, {reason, details} ->
{:exit, {reason, details}}
end
@spec stream_to(Enumerable.t(), atom(), map() | keyword()) :: Enumerable.t()
def stream_to(stream, name, opts \\ []) when is_atom(name) do
GenServer.call(name, {:stream_to, stream, ProcessingOptions.new(opts), false})
end
@spec steam_with_ids_to(Enumerable.t({term, term}), atom(), map() | keyword()) ::
Enumerable.t()
def steam_with_ids_to(stream, name, opts \\ []) when is_atom(name) do
GenServer.call(name, {:stream_to, stream, ProcessingOptions.new(opts), true})
end
@spec components(atom) :: list(map())
def components(name) when is_atom(name) do
GenServer.call(name, :components)
end
@spec reload_components_states(atom()) :: list(map())
def reload_components_states(name) when is_atom(name) do
GenServer.call(name, :reload_components_states)
end
@spec producer_ips_count(atom) :: integer()
def producer_ips_count(name) when is_atom(name) do
GenServer.call(name, :producer_ips_count)
end
def add_component(name, stage_set_ref) do
GenServer.call(name, {:add_component, stage_set_ref})
end
def remove_component(name, stage_set_ref) do
GenServer.call(name, {:remove_component, stage_set_ref})
end
def delete_marked_to_be_deleted(name) when is_atom(name) do
GenServer.call(name, :delete_marked_to_be_deleted)
end
def terminate(:normal, state) do
Supervisor.stop(state.pipeline_sup_pid)
end
def __state__(name_or_pid) when is_atom(name_or_pid) or is_pid(name_or_pid) do
GenServer.call(name_or_pid, :__state__)
end
def __set_state__(name_or_pid, new_state) when is_atom(name_or_pid) or is_pid(name_or_pid) do
GenServer.call(name_or_pid, {:__set_state__, new_state})
end
def handle_continue(:init_pipeline, %__MODULE__{} = state) do
{:noreply, start_pipeline(state), {:continue, :register_auto_scaling}}
end
def handle_continue(:register_auto_scaling, %__MODULE__{} = state) do
if state.autoscaling_enabled do
AutoScaler.register_pipeline(state.pipeline_module)
end
{:noreply, state}
end
defp start_pipeline(%__MODULE__{} = state) do
state
|> start_pipeline_supervisor()
|> build_pipeline()
|> save_stages_states()
|> prepare_gotos()
end
defp start_pipeline_supervisor(%__MODULE__{} = state) do
pipeline_sup_pid =
case PipelineDynamicSupervisor.start_link(%{name: :"#{state.name}_DynamicSupervisor"}) do
{:ok, pid} -> pid
{:error, {:already_started, pid}} -> pid
end
Process.unlink(pipeline_sup_pid)
Process.monitor(pipeline_sup_pid)
%{state | pipeline_sup_pid: pipeline_sup_pid}
end
defp build_pipeline(%__MODULE__{} = state) do
{:ok, pipeline} =
Builder.build(
state.pipeline_module.alf_components,
state.pipeline_sup_pid,
state.name,
state.pipeline_module,
state.telemetry_enabled
)
%{state | pipeline: pipeline, producer_pid: pipeline.producer.pid}
end
defp save_stages_states(%__MODULE__{} = state) do
components =
[state.pipeline.producer | Pipeline.stages_to_list(state.pipeline.components)] ++
[state.pipeline.consumer]
components =
components
|> Enum.map(fn stage ->
stage.__struct__.__state__(stage.pid)
end)
%{state | components: components}
end
defp prepare_gotos(%__MODULE__{} = state) do
components =
state.components
|> Enum.map(fn component ->
case component do
%Goto{} ->
Goto.find_where_to_go(component.pid, state.components)
stage ->
stage
end
end)
%{state | components: components}
end
def handle_call(:__state__, _from, state), do: {:reply, state, state}
def handle_call({:__set_state__, new_state}, _from, _state) do
{:reply, new_state, new_state}
end
def handle_call(:stop, _from, state) do
{:stop, :normal, state, state}
end
def handle_call({:stream_to, stream, opts, custom_ids?}, _from, %__MODULE__{} = state) do
{stream, state} = Streamer.prepare_streams(state, stream, opts, custom_ids?)
{:reply, stream, state}
end
def handle_call({:flush_queue, stream_ref}, _from, state) do
registry = state.registry[stream_ref]
if registry do
events =
case :queue.to_list(registry.queue) do
[] ->
if StreamRegistry.empty?(registry), do: :done, else: {:ok, []}
events when is_list(events) ->
{:ok, events}
end
new_registry = Map.put(state.registry, stream_ref, %{registry | queue: :queue.new()})
{:reply, events, %{state | registry: new_registry}}
else
{:reply, {:ok, []}, state}
end
end
def handle_call(:components, _from, state) do
{:reply, state.components, state}
end
def handle_call(:reload_components_states, _from, state) do
components =
state.components
|> Enum.map(fn stage ->
stage.__struct__.__state__(stage.pid)
end)
{:reply, components, %{state | components: components}}
end
def handle_call(:producer_ips_count, _from, state) do
count = Producer.ips_count(state.producer_pid)
{:reply, count, state}
end
def handle_call({:add_component, stage_set_ref}, _from, state) do
{new_stage, new_components} =
Components.add_component(state.components, stage_set_ref, state.pipeline_sup_pid)
{:reply, new_stage, %{state | components: new_components}}
end
def handle_call({:remove_component, stage_set_ref}, _from, state) do
case Components.remove_component(state.components, stage_set_ref) do
{:ok, {stage_to_delete, new_components}} ->
stages_to_be_deleted = [stage_to_delete | state.stages_to_be_deleted]
{:reply, stage_to_delete,
%{state | components: new_components, stages_to_be_deleted: stages_to_be_deleted}}
{:error, :only_one_left} ->
{:reply, {:error, :only_one_left}, state}
end
end
def handle_call(:delete_marked_to_be_deleted, _from, state) do
state.stages_to_be_deleted
|> Enum.each(fn stage ->
DynamicSupervisor.terminate_child(state.pipeline_sup_pid, stage.pid)
end)
{:reply, state.stages_to_be_deleted, state}
end
def handle_cast({:add_to_registry, ips, stream_ref}, state) do
new_registry = Streamer.add_to_registry(state.registry, stream_ref, ips)
{:noreply, %{state | registry: new_registry}}
end
def handle_cast({:remove_from_registry, ips, stream_ref}, state) do
new_registry = Streamer.remove_from_registry(state.registry, stream_ref, ips)
{:noreply, %{state | registry: new_registry}}
end
def handle_cast({:result_ready, ip}, state) do
new_registry = Streamer.rebuild_registry_on_result_ready(state.registry, ip)
{:noreply, %{state | registry: new_registry}}
end
def handle_info({:DOWN, _ref, :process, _pid, :shutdown}, %__MODULE__{} = state) do
state =
state
|> start_pipeline()
|> copy_registry_to_dump()
|> Streamer.resend_packets()
{:noreply, state}
end
defp copy_registry_to_dump(state) do
%{state | registry_dump: state.registry}
end
defp is_pipeline_module?(module) when is_atom(module) do
is_list(module.alf_components())
rescue
_error -> false
end
defp telemetry_enabled_in_configs? do
Application.get_env(:alf, :telemetry_enabled, false)
end
end