defmodule ALF.Components.Producer do
use ALF.Components.Basic
alias ALF.Manager.Streamer
defstruct Basic.common_attributes() ++
[
type: :producer,
demand: 0,
manager_name: nil,
source_code: nil,
ips: []
]
def start_link(%__MODULE__{} = state) do
GenStage.start_link(__MODULE__, state)
end
def init(state) do
{:producer,
%{
state
| pid: self(),
name: :producer,
source_code: read_source_code(state.pipeline_module)
}}
end
def load_ips(pid, ips) do
GenServer.cast(pid, {:load_ips, ips})
end
def ips_count(pid), do: GenServer.call(pid, :ips_count)
def handle_call(:ips_count, _from, state) do
{:reply, length(state.ips), [], state}
end
def handle_demand(1, %__MODULE__{ips: [_ip | _], demand: demand} = state) do
{ips, new_state} = prepare_state_and_ips(%{state | demand: demand + 1})
{:noreply, ips, new_state}
end
def handle_demand(1, %__MODULE__{ips: [], demand: demand} = state) do
state = %{state | demand: demand + 1}
{:noreply, [], state}
end
def handle_cast({:load_ips, new_ips}, %__MODULE__{ips: ips, demand: demand} = state) do
{ips, new_state} = prepare_state_and_ips(%{state | ips: ips ++ new_ips, demand: demand + 0})
{:noreply, ips, new_state}
end
defp prepare_state_and_ips(
%__MODULE__{ips: ips, manager_name: manager_name, demand: demand} = state
) do
case Enum.split(ips, demand) do
{[], ips_to_store} ->
{[], %{state | demand: demand, ips: ips_to_store}}
{ips_to_send, ips_to_store} ->
ips_to_send = add_ips_to_in_progress_registry(ips_to_send, manager_name)
if state.telemetry_enabled do
send_simple_telemetry_events(ips_to_send, state)
end
{ips_to_send, %{state | demand: demand - length(ips_to_send), ips: ips_to_store}}
end
end
defp add_ips_to_in_progress_registry([ip | _] = ips, manager_name) do
Streamer.cast_remove_from_registry(manager_name, ips, ip.stream_ref)
ips = Enum.map(ips, fn ip -> %{ip | in_progress: true} end)
Streamer.cast_add_to_registry(manager_name, ips, ip.stream_ref)
ips
end
defp send_simple_telemetry_events(ips_to_send, state) do
ips_to_send
|> Enum.map(fn ip ->
telemetry_data = telemetry_data(ip, state)
:telemetry.span([:alf, :component], telemetry_data, fn -> {:ok, telemetry_data} end)
end)
end
end