defmodule Runbox.Runtime.Stage.Timezip do
@moduledoc """
Timezip component responsible for merging messages from multiple sources using message timestamps.
"""
@default_max_demand 1000
alias __MODULE__.MultiQueue
alias Runbox.Deduplicator
alias Runbox.RunStartContext
alias Runbox.Runtime.Stage.SelectorBuilder
use GenStage
require Logger
@doc """
Returns the component name of a timezip stage for the given `subscriptions`.

The name is the tuple `{:timezip, subscriptions}`.
"""
@spec component_name(term()) :: {:timezip, term()}
def component_name(subscriptions), do: {:timezip, subscriptions}
@doc """
Starts the timezip stage as a linked `GenStage` process.

`args` and `start_ctx` are passed together to `init/1`; the second argument
is not used.
"""
def start_link(args, _runbox_ctx, start_ctx) do
  init_arg = {args, start_ctx}
  GenStage.start_link(__MODULE__, init_arg)
end
# GenStage callback that configures the stage as a `:producer_consumer`.
#
# Each `{component, subscription_properties}` entry of `subscribe_to` is turned into a
# `{pid, opts}` subscription, the logger metadata is tagged with the run and scenario
# ids, and the initial state is created with a new multi-queue and a new deduplicator
# that is given `message_timestamp/1`. Events are dispatched through
# `GenStage.BroadcastDispatcher`.
@impl true
def init(
      {%{run_id: run_id, config: %{subscribe_to: subscribe_to, scenario_id: scenario_id}},
       start_ctx}
    ) do
  producers = Enum.map(subscribe_to, &producer_subscription(&1, start_ctx))

  Logger.metadata(run_id: run_id, scenario_id: scenario_id)

  state = %{
    multi_queue: MultiQueue.new(),
    deduplicator: Deduplicator.new([], &message_timestamp/1),
    run_id: run_id
  }

  {:producer_consumer, state,
   subscribe_to: producers, dispatcher: GenStage.BroadcastDispatcher}
end

# Builds the `{pid, opts}` subscription for a single upstream component.
# No selector is passed when the `:selector` property is `:none_before_output_sink`;
# otherwise the selector is built from the subscription properties.
defp producer_subscription({component, subscription_properties}, start_ctx) do
  opts =
    if Keyword.get(subscription_properties, :selector) == :none_before_output_sink do
      []
    else
      [selector: SelectorBuilder.build(subscription_properties)]
    end

  {RunStartContext.component_pid(start_ctx, component), opts}
end
# GenStage callback: enqueues the incoming `events` under the subscription `from`,
# emits whatever the multi-queue releases (after deduplication) and stores the
# updated multi-queue and deduplicator in the state.
# Any result of zipping other than `{:ok, _, _}` stops the stage with that result
# as the reason.
@impl true
def handle_events(events, from, %{multi_queue: queues, deduplicator: deduplicator} = state) do
  with {:ok, queues, zipped} <- zip_through_multiqueue(queues, from, events) do
    {unique, deduplicator} = deduplicate(zipped, deduplicator)
    {:noreply, unique, %{state | multi_queue: queues, deduplicator: deduplicator}}
  else
    error -> {:stop, error, state}
  end
end
# GenStage callback invoked for every new subscription.
#
# For a producer subscription a queue identified by `from` is added to the
# multi-queue, the initial demand (`:max_demand` option, falling back to
# `@default_max_demand`) is requested explicitly and the subscription is switched to
# `:manual` demand. Consumer subscriptions stay `:automatic`.
@impl true
def handle_subscribe(:producer, options, from, state) do
  state = Map.update!(state, :multi_queue, &MultiQueue.add_queue(&1, from))
  GenStage.ask(from, options[:max_demand] || @default_max_demand)
  {:manual, state}
end

def handle_subscribe(:consumer, _options, _from, state), do: {:automatic, state}
# GenStage callback: drops the queue of the cancelled subscription `from` from the
# multi-queue and emits no events.
@impl true
def handle_cancel(_cancellation_reason, from, %{multi_queue: multi_queue} = state) do
  # A cancel really can't happen during a run, and when stopping it may as well be :stop.
  {:noreply, [], %{state | multi_queue: MultiQueue.remove_queue(multi_queue, from)}}
end
# Enqueues `messages` into the queue `queue_id` and then dequeues everything the
# multi-queue releases, using `message_comparator/2` for ordering.
# Returns `{:ok, multi_queue, msgs}` on success; any other value returned by
# `MultiQueue.enqueue/3` or `MultiQueue.dequeue_all/2` is passed through unchanged.
defp zip_through_multiqueue(multi_queue, queue_id, messages) do
  case MultiQueue.enqueue(multi_queue, queue_id, messages) do
    {:ok, enqueued} -> dequeue_and_ask(enqueued)
    failure -> failure
  end
end

# Dequeues the released messages and asks the producers for the demand reported by
# the multi-queue.
defp dequeue_and_ask(multi_queue) do
  case MultiQueue.dequeue_all(multi_queue, &message_comparator/2) do
    {:ok, drained, demands, released} ->
      ask_for_more(demands)
      {:ok, drained, released}

    failure ->
      failure
  end
end
# Asks every producer for more events.
#
# `demand_distribution` is an enumerable of `{subscription, count}` pairs; each pair
# results in one `GenStage.ask/2` call. The function is used purely for its side
# effects, so it iterates with `Enum.each/2` (no throwaway result list is built) and
# returns `:ok`. An entry that is not a 2-tuple raises instead of being skipped.
defp ask_for_more(demand_distribution) do
  Enum.each(demand_distribution, fn {subscription, count} ->
    GenStage.ask(subscription, count)
  end)
end
# Runs every message through the deduplicator, threading its state along.
# Returns `{kept_msgs, dedup}` where `kept_msgs` preserves the input order and
# contains the messages reported as `:new` or `:old`; duplicates are dropped.
defp deduplicate(msgs, dedup) do
  Enum.flat_map_reduce(msgs, dedup, fn msg, dedup_acc ->
    case Deduplicator.deduplicate(msg, dedup_acc) do
      {:duplicate, next_dedup} ->
        {[], next_dedup}

      {:new, next_dedup} ->
        {[msg], next_dedup}

      {:old, next_dedup} ->
        # Should not happen if there is no error in the scenario or in timezip.
        # It may indicate that a message with a timestamp lower than the timestamps
        # of previous messages was passed to timezip. The message is still kept.
        Logger.error("Deduplicator reported as old following message #{inspect(msg)}")
        {[msg], next_dedup}
    end
  end)
end
# Ordering function handed to the multi-queue: true when the first message has a
# strictly lower timestamp than the second one.
# Both arguments are either an OutputAction or a Message; both have a `timestamp` field.
defp message_comparator(%{timestamp: left}, %{timestamp: right}), do: left < right
# Extracts the timestamp of a message; this function is handed to the deduplicator.
# The argument is either an OutputAction or a Message; both have a `timestamp` field.
defp message_timestamp(%{timestamp: timestamp}), do: timestamp
end