defmodule Runbox.Runtime.Simple.Timezip do
  @moduledoc group: :internal
  @moduledoc """
  Timezip component responsible for merging multiple message streams for Simple runtime.

  In a Simple scenario a Timezip is only needed when reading from multiple input topics; it then
  merges these streams into a single stream for the template.
  """
  # Disable code duplication checks, since this is mostly copied from Stage. This is OK though,
  # since we aim to remove Stage later, so generalizing this would be a waste of time.
  # credo:disable-for-this-file Credo.Check.Design.DuplicatedCode

  alias Runbox.Deduplicator
  alias Runbox.Message
  alias Runbox.RunStartContext
  alias Runbox.Runtime.Timezip.MultiQueue

  use GenStage

  require Logger

  # Initial demand sent to a producer on subscription when the subscription options do not
  # specify `:max_demand` (the subscriptions created in `init/1` pass no options).
  @default_max_demand 1000

  @doc """
  Returns component name.

  There are no parameters, since in Simple scenario runtime there is at most one Timezip.
  """
  @spec component_name() :: :timezip
  def component_name do
    :timezip
  end

  @doc """
  Starts the GenStage.

  `args` must be a map with `:run_id` and a `:config` map containing `:subscribe_to` (the
  components whose output is merged) and `:scenario_id`. `start_ctx` is used to resolve the pids
  of the `:subscribe_to` components. The runbox context argument is not used.
  """
  def start_link(args, _runbox_ctx, start_ctx) do
    GenStage.start_link(__MODULE__, {args, start_ctx})
  end

  @impl true
  def init(
        {%{run_id: run_id, config: %{subscribe_to: subscribe_to, scenario_id: scenario_id}},
         start_ctx}
      ) do
    subscribe_to =
      Enum.map(subscribe_to, fn component ->
        {RunStartContext.component_pid(start_ctx, component), []}
      end)

    # Create a multi queue; the order is important. MultiQueue is ordered: the order of the
    # `add_queue` calls determines the order in which messages are emitted. If multiple producers
    # produce messages with the same timestamp, they must be sorted in a predictable manner.
    # `subscribe_to` is also predictably sorted, so we simply reuse its order here.
    multi_queue =
      Enum.reduce(subscribe_to, MultiQueue.new(), fn {pid, _opts}, multi_queue ->
        MultiQueue.add_queue(multi_queue, pid)
      end)

    Logger.metadata(run_id: run_id, scenario_id: scenario_id)

    {:producer_consumer,
     %{
       multi_queue: multi_queue,
       deduplicator: Deduplicator.new([], &message_timestamp/1),
       # producer pid => GenStage subscription `from` tuple, used to ask for more demand
       producer_refs: %{},
       run_id: run_id
     }, subscribe_to: subscribe_to}
  end

  # Enqueues the events received from one producer, emits everything the multi queue is able to
  # release (deduplicated) and asks the producers for the demand reported by the multi queue.
  # Any non-`{:ok, _, _}` result from the multi queue stops the stage with that result as reason.
  @impl true
  def handle_events(
        events,
        {producer_pid, _ref},
        %{multi_queue: multi_queue, deduplicator: dedup, producer_refs: producer_refs} = state
      ) do
    case zip_through_multiqueue(multi_queue, producer_pid, events, producer_refs) do
      {:ok, multi_queue, msgs} ->
        {msgs, dedup} = deduplicate(msgs, dedup)
        {:noreply, msgs, %{state | multi_queue: multi_queue, deduplicator: dedup}}

      error ->
        {:stop, error, state}
    end
  end

  # Upstream subscriptions use manual demand: the initial demand is sent here and further demand
  # is driven by the multi queue (see `ask_for_more/2`). Downstream subscriptions are automatic.
  @impl true
  def handle_subscribe(:producer, options, {pid, _ref} = from, state) do
    init_demand = options[:max_demand] || @default_max_demand
    GenStage.ask(from, init_demand)
    {:manual, put_in(state.producer_refs[pid], from)}
  end

  @impl true
  def handle_subscribe(:consumer, _options, _from, state) do
    {:automatic, state}
  end

  # GenStage invokes this callback for cancelled upstream (producer) subscriptions as well as for
  # cancelled downstream (consumer) subscriptions. Only producers own a queue in the multi queue,
  # so a consumer cancellation must not touch it.
  # Producer cancellation is not expected during a run; with the default `cancel: :permanent`
  # subscription option (no options are passed in `init/1`) GenStage stops the stage afterwards.
  @impl true
  def handle_cancel(_reason, {pid, _ref}, state) do
    case Map.pop(state.producer_refs, pid) do
      {nil, _producer_refs} ->
        # Not one of our producers, i.e. a downstream consumer went away.
        {:noreply, [], state}

      {_from, producer_refs} ->
        multi_queue = MultiQueue.remove_queue(state.multi_queue, pid)
        {:noreply, [], %{state | multi_queue: multi_queue, producer_refs: producer_refs}}
    end
  end

  # Adds `messages` to the queue of `queue_id`, drains whatever the multi queue releases and asks
  # producers for more. Returns `{:ok, multi_queue, msgs}` or the first non-matching result of
  # the multi queue operations.
  defp zip_through_multiqueue(multi_queue, queue_id, messages, producer_refs) do
    with {:ok, multi_queue} <- MultiQueue.enqueue(multi_queue, queue_id, messages),
         {:ok, multi_queue, demands, msgs} <-
           MultiQueue.dequeue_all(multi_queue, &message_comparator/2) do
      ask_for_more(demands, producer_refs)
      {:ok, multi_queue, msgs}
    end
  end

  # Sends `count` demand to every producer listed in `demand_distribution` (`{pid, count}`
  # pairs). Every pid must have a subscription registered in `producer_refs`.
  defp ask_for_more(demand_distribution, producer_refs) do
    Enum.each(demand_distribution, fn {pid, count} ->
      producer_refs
      |> Map.fetch!(pid)
      |> GenStage.ask(count)
    end)
  end

  # Drops messages the deduplicator reports as duplicates, keeping the order of the rest.
  # Returns the kept messages together with the updated deduplicator.
  defp deduplicate(msgs, dedup) do
    Enum.flat_map_reduce(msgs, dedup, fn msg, dedup ->
      case Deduplicator.deduplicate(msg, dedup) do
        {:new, dedup} ->
          {[msg], dedup}

        {:duplicate, dedup} ->
          {[], dedup}

        {:old, dedup} ->
          # Should not happen unless there is an error in the scenario or the timezip. It may
          # indicate that a message with a timestamp lower than the timestamps of previous
          # messages was passed to the timezip. The message is still emitted.
          Logger.error("Deduplicator reported as old following message #{inspect(msg)}")
          {[msg], dedup}
      end
    end)
  end

  # Strict "less than" ordering of messages by timestamp, passed to `MultiQueue.dequeue_all/2`.
  defp message_comparator(%Message{timestamp: ts1}, %Message{timestamp: ts2}) do
    ts1 < ts2
  end

  # Key function the deduplicator uses to read a message's timestamp.
  defp message_timestamp(%Message{timestamp: ts}) do
    ts
  end
end