lib/runbox/runtime/stage/timezip.ex

defmodule Runbox.Runtime.Stage.Timezip do
  @moduledoc """
  Timezip component responsible for merging messages from multiple sources using message timestamps.

  The stage subscribes to every upstream component listed in the `:subscribe_to` config and
  buffers incoming events per producer in a `MultiQueue`. It dequeues them using a comparator
  on the messages' `timestamp` field. Dequeued messages are passed through a
  `Runbox.Deduplicator` and broadcast to all downstream consumers via
  `GenStage.BroadcastDispatcher`.

  Upstream demand is handled manually:
  - each producer is asked for its initial demand when it subscribes;
  - after each dequeue, producers are asked again for the amounts returned by
    `MultiQueue.dequeue_all/2`.
  """

  use GenStage
  require Logger

  alias __MODULE__.MultiQueue
  alias Runbox.Deduplicator
  alias Runbox.RunStartContext
  alias Runbox.Runtime.Stage.SelectorBuilder

  # Initial demand sent to a producer whose subscription options carry no `:max_demand`.
  @default_max_demand 1000

  @doc """
  Returns the component name identifying a timezip stage for the given `subscriptions`.
  """
  @spec component_name(term()) :: {:timezip, term()}
  def component_name(subscriptions), do: {:timezip, subscriptions}

  @doc """
  Starts the timezip stage linked to the caller.

  `args` must contain `:run_id` and a `:config` map with `:subscribe_to` and `:scenario_id`.
  `start_ctx` is used to resolve upstream component pids. The runbox context is ignored.
  """
  @spec start_link(map(), term(), term()) :: GenServer.on_start()
  def start_link(args, _runbox_ctx, start_ctx) do
    GenStage.start_link(__MODULE__, {args, start_ctx})
  end

  # Builds the upstream subscriptions, tags the process logger metadata with the run/scenario,
  # and starts as a producer_consumer that broadcasts every emitted message to all consumers.
  @impl true
  def init(
        {%{run_id: run_id, config: %{subscribe_to: subscribe_to, scenario_id: scenario_id}},
         start_ctx}
      ) do
    subscriptions = Enum.map(subscribe_to, &build_subscription(&1, start_ctx))

    Logger.metadata(run_id: run_id, scenario_id: scenario_id)

    state = %{
      multi_queue: MultiQueue.new(),
      deduplicator: Deduplicator.new([], &message_timestamp/1),
      run_id: run_id
    }

    {:producer_consumer, state,
     subscribe_to: subscriptions, dispatcher: GenStage.BroadcastDispatcher}
  end

  # Enqueues `events` under the producer they came from, emits whatever can be dequeued
  # (after deduplication) and stops the stage if the MultiQueue reports a failure.
  @impl true
  def handle_events(events, from, %{multi_queue: multi_queue, deduplicator: dedup} = state) do
    case zip_through_multiqueue(multi_queue, from, events) do
      {:ok, multi_queue, msgs} ->
        {msgs, dedup} = deduplicate(msgs, dedup)
        {:noreply, msgs, %{state | multi_queue: multi_queue, deduplicator: dedup}}

      # Any non-matching result (presumably `{:error, reason}` from MultiQueue — TODO confirm)
      # becomes the stop reason verbatim.
      error ->
        {:stop, error, state}
    end
  end

  # Upstream producers get their own queue and are switched to manual demand, so this stage
  # decides how much each producer may send. Downstream consumers use automatic demand.
  @impl true
  def handle_subscribe(:producer, options, from, state) do
    new_multi_queue = MultiQueue.add_queue(state.multi_queue, from)
    init_demand = options[:max_demand] || @default_max_demand
    GenStage.ask(from, init_demand)
    {:manual, %{state | multi_queue: new_multi_queue}}
  end

  def handle_subscribe(:consumer, _options, _from, state) do
    {:automatic, state}
  end

  # Drops the cancelled producer's queue.
  # Cancellation is not expected while a run is in progress; during shutdown, returning
  # `:stop` here would be equally acceptable.
  # NOTE(review): nothing is dequeued here, so messages held back in the remaining queues
  # (if MultiQueue was waiting on the removed one) are not emitted until the next event —
  # confirm this is intended.
  @impl true
  def handle_cancel(_, from, %{multi_queue: multi_queue} = state) do
    new_multi_queue = MultiQueue.remove_queue(multi_queue, from)
    {:noreply, [], %{state | multi_queue: new_multi_queue}}
  end

  # Turns one `subscribe_to` entry into a `{pid, opts}` GenStage subscription.
  # With `selector: :none_before_output_sink` no selector is installed. Any other value,
  # including a missing `:selector` key, goes through `SelectorBuilder.build/1`.
  defp build_subscription({component, subscription_properties}, start_ctx) do
    opts =
      case Keyword.get(subscription_properties, :selector) do
        :none_before_output_sink -> []
        _ -> [selector: SelectorBuilder.build(subscription_properties)]
      end

    {RunStartContext.component_pid(start_ctx, component), opts}
  end

  # Enqueues `messages` for `queue_id` and dequeues everything the MultiQueue releases. It then
  # asks producers for more. Returns `{:ok, multi_queue, msgs}`; any failure from MultiQueue
  # is returned unchanged.
  defp zip_through_multiqueue(multi_queue, queue_id, messages) do
    with {:ok, multi_queue} <- MultiQueue.enqueue(multi_queue, queue_id, messages),
         {:ok, multi_queue, demands, msgs} <-
           MultiQueue.dequeue_all(multi_queue, &message_comparator/2) do
      ask_for_more(demands)
      {:ok, multi_queue, msgs}
    end
  end

  # Sends the `{producer, count}` demands back upstream. Only the side effect matters.
  defp ask_for_more(demand_distribution) do
    Enum.each(demand_distribution, fn {producer, count} -> GenStage.ask(producer, count) end)
  end

  # Runs `msgs` through the deduplicator in order, returning `{kept_msgs, dedup}`.
  # Duplicates are dropped. "Old" messages are logged as an error but still kept.
  defp deduplicate(msgs, dedup) do
    Enum.flat_map_reduce(msgs, dedup, fn msg, dedup ->
      case Deduplicator.deduplicate(msg, dedup) do
        {:new, dedup} ->
          {[msg], dedup}

        {:duplicate, dedup} ->
          {[], dedup}

        {:old, dedup} ->
          # Should not happen unless there is an error in the scenario or in timezip:
          # a message with a timestamp lower than those of already processed messages
          # was passed to timezip.
          Logger.error("Deduplicator reported as old following message #{inspect(msg)}")
          {[msg], dedup}
      end
    end)
  end

  # Messages are either OutputAction or Message structs; both carry a `timestamp` field.
  defp message_comparator(msg1, msg2) do
    message_timestamp(msg1) < message_timestamp(msg2)
  end

  # Messages are either OutputAction or Message structs; both carry a `timestamp` field.
  defp message_timestamp(msg) do
    msg.timestamp
  end
end