defmodule Runbox.Runtime.Stage.Timezip do
  @moduledoc group: :internal
  @moduledoc """
  Timezip component responsible for merging multiple message streams using time for Stage runtime.

  The stage subscribes to every upstream component listed in `config.subscribe_to`, buffers the
  incoming events per producer in a `MultiQueue`, dequeues them using a `timestamp` comparator,
  drops messages that `Runbox.Deduplicator` reports as duplicates and broadcasts the result to all
  downstream consumers.
  """
  # Disable code duplication checks, since this is mostly copied to Simple. This is OK though,
  # since we aim to remove Stage later, so generalizing this would be a waste of time.
  # credo:disable-for-this-file Credo.Check.Design.DuplicatedCode

  use GenStage

  require Logger

  alias Runbox.Deduplicator
  alias Runbox.RunStartContext
  alias Runbox.Runtime.Stage.SelectorBuilder
  alias Runbox.Runtime.Timezip.MultiQueue

  # Initial demand sent to a producer whose subscription options carry no `:max_demand`.
  @default_max_demand 1000

  @doc """
  Returns the component name `{:timezip, subscriptions}` for the given `subscriptions`.
  """
  @spec component_name(term()) :: {:timezip, term()}
  def component_name(subscriptions) do
    {:timezip, subscriptions}
  end

  @doc """
  Starts the timezip stage linked to the calling process.

  `args` must be a map with `:run_id` and `:config`, where `:config` holds `:subscribe_to` and
  `:scenario_id`. `start_ctx` is used to resolve the pids of the upstream components. The
  runbox context argument is ignored.
  """
  @spec start_link(map(), term(), term()) :: GenServer.on_start()
  def start_link(args, _runbox_ctx, start_ctx) do
    GenStage.start_link(__MODULE__, {args, start_ctx})
  end

  @impl true
  def init(
        {%{run_id: run_id, config: %{subscribe_to: subscribe_to, scenario_id: scenario_id}},
         start_ctx}
      ) do
    subscribe_to = Enum.map(subscribe_to, &build_subscription(&1, start_ctx))

    # Create a multi queue, note the order is important. MultiQueue is now ordered, the order of
    # `add_queue` calls represent the order in which the messages are emitted. If multiple
    # producers produce messages in the same timestamp, they need to be sorted somehow in a
    # predictable manner. Note the `subscribe_to` is also predictably sorted, so we just use that
    # order here.
    multi_queue =
      Enum.reduce(subscribe_to, MultiQueue.new(), fn {pid, _opts}, acc ->
        MultiQueue.add_queue(acc, pid)
      end)

    Logger.metadata(run_id: run_id, scenario_id: scenario_id)

    state = %{
      multi_queue: multi_queue,
      deduplicator: Deduplicator.new([], &message_timestamp/1),
      # producer pid => GenStage `from` tuple, filled in `handle_subscribe/4`
      producer_refs: %{},
      run_id: run_id
    }

    {:producer_consumer, state,
     subscribe_to: subscribe_to, dispatcher: GenStage.BroadcastDispatcher}
  end

  @impl true
  def handle_events(
        events,
        {producer_pid, _ref},
        %{multi_queue: multi_queue, deduplicator: dedup, producer_refs: producer_refs} = state
      ) do
    case zip_through_multiqueue(multi_queue, producer_pid, events, producer_refs) do
      {:ok, multi_queue, msgs} ->
        {msgs, dedup} = deduplicate(msgs, dedup)
        {:noreply, msgs, %{state | multi_queue: multi_queue, deduplicator: dedup}}

      error ->
        # Any non-`{:ok, ...}` result from MultiQueue stops the stage with that value as reason.
        {:stop, error, state}
    end
  end

  @impl true
  def handle_subscribe(:producer, options, {pid, _ref} = from, state) do
    # Demand is managed manually: after this initial ask, more is requested only as
    # `MultiQueue.dequeue_all/2` reports demands (see `ask_for_more/2`).
    init_demand = options[:max_demand] || @default_max_demand
    GenStage.ask(from, init_demand)
    {:manual, put_in(state.producer_refs[pid], from)}
  end

  def handle_subscribe(:consumer, _options, _from, state) do
    {:automatic, state}
  end

  @impl true
  def handle_cancel(_reason, {pid, _ref}, %{multi_queue: multi_queue} = state) do
    # GenStage calls this for cancelled subscriptions on both sides of a producer_consumer.
    # Only upstream producers own a queue in the MultiQueue, so a downstream consumer
    # unsubscribing must leave the queues untouched.
    if Map.has_key?(state.producer_refs, pid) do
      # A producer cancel is not expected during a run; when the run stops, stopping the stage
      # would be equivalent.
      # NOTE(review): if MultiQueue holds messages back until every queue has data, removing a
      # queue may make buffered messages emittable, but nothing is dequeued here — confirm this
      # is acceptable should a producer ever cancel mid-run.
      {:noreply, [], %{state | multi_queue: MultiQueue.remove_queue(multi_queue, pid)}}
    else
      {:noreply, [], state}
    end
  end

  # Turns one `{component, subscription_properties}` entry into a GenStage subscription
  # `{producer_pid, opts}`; the `:none_before_output_sink` selector means "no selector".
  defp build_subscription({component, subscription_properties}, start_ctx) do
    opts =
      case Keyword.get(subscription_properties, :selector) do
        :none_before_output_sink -> []
        _ -> [selector: SelectorBuilder.build(subscription_properties)]
      end

    {RunStartContext.component_pid(start_ctx, component), opts}
  end

  # Enqueues `messages` into the queue of `queue_id`, dequeues whatever MultiQueue releases and
  # asks the producers for the demands it reports. Non-`{:ok, ...}` results are returned as-is.
  defp zip_through_multiqueue(multi_queue, queue_id, messages, producer_refs) do
    with {:ok, multi_queue} <- MultiQueue.enqueue(multi_queue, queue_id, messages),
         {:ok, multi_queue, demands, msgs} <-
           MultiQueue.dequeue_all(multi_queue, &message_comparator/2) do
      ask_for_more(demands, producer_refs)
      {:ok, multi_queue, msgs}
    end
  end

  # Sends `GenStage.ask/2` for each `{producer_pid, count}` in `demand_distribution`.
  defp ask_for_more(demand_distribution, producer_refs) do
    Enum.each(demand_distribution, fn {id, number} ->
      GenStage.ask(producer_refs[id], number)
    end)
  end

  # Runs `msgs` through the deduplicator, dropping messages reported as `:duplicate` and keeping
  # the relative order of the rest. Returns `{kept_msgs, dedup}`.
  defp deduplicate(msgs, dedup) do
    {kept_reversed, dedup} =
      Enum.reduce(msgs, {[], dedup}, fn msg, {acc, dedup} ->
        case Deduplicator.deduplicate(msg, dedup) do
          {:new, dedup} ->
            {[msg | acc], dedup}

          {:duplicate, dedup} ->
            {acc, dedup}

          {:old, dedup} ->
            # Should not happen if there is no error in the scenario or timezip. This may
            # indicate that a message with a timestamp lower than timestamps of previous
            # messages was passed to timezip. The message is still emitted.
            Logger.error("Deduplicator reported as old following message #{inspect(msg)}")
            {[msg | acc], dedup}
        end
      end)

    {Enum.reverse(kept_reversed), dedup}
  end

  # here msg is either OutputAction, or Message, both have timestamp field.
  # Strict `<`: messages with equal timestamps are not "less" than each other, so their relative
  # order is left to MultiQueue (see the queue-order comment in `init/1`).
  defp message_comparator(msg1, msg2) do
    msg1.timestamp < msg2.timestamp
  end

  # here msg is either OutputAction, or Message, both have timestamp field
  defp message_timestamp(msg) do
    msg.timestamp
  end
end