# lib/runbox/runtime/stage/timezip.ex

defmodule Runbox.Runtime.Stage.Timezip do
  @moduledoc group: :internal
  @moduledoc """
  Timezip component of the Stage runtime; merges multiple message streams by message timestamp.
  """

  # Disable code duplication checks, since this is mostly copied to Simple. This is OK though,
  # since we aim to remove Stage later, so generalizing this would be a waste of time.
  # credo:disable-for-this-file Credo.Check.Design.DuplicatedCode

  @default_max_demand 1000

  alias Runbox.Deduplicator
  alias Runbox.RunStartContext
  alias Runbox.Runtime.Stage.SelectorBuilder
  alias Runbox.Runtime.Timezip.MultiQueue

  use GenStage
  require Logger

  @doc """
  Builds the component name of a timezip for the given `subscriptions`.

  The name is the tuple `{:timezip, subscriptions}`.
  """
  def component_name(subscriptions), do: {:timezip, subscriptions}

  @doc """
  Starts the timezip stage via `GenStage.start_link/2`.

  `args` and `start_ctx` are passed to `init/1` together as a single `{args, start_ctx}`
  tuple. The second argument (the runbox context) is not used by this component.
  """
  def start_link(args, _runbox_ctx, start_ctx) do
    init_arg = {args, start_ctx}
    GenStage.start_link(__MODULE__, init_arg)
  end

  @impl true
  def init(
        {%{run_id: run_id, config: %{subscribe_to: subscribe_to, scenario_id: scenario_id}},
         start_ctx}
      ) do
    # Turn every configured subscription into a `{producer_pid, subscription_opts}` pair.
    # A subscription whose selector is `:none_before_output_sink` is subscribed without any
    # selector; every other subscription gets a selector built from its properties.
    producers =
      Enum.map(subscribe_to, fn {component, properties} ->
        subscription_opts =
          if Keyword.get(properties, :selector) == :none_before_output_sink do
            []
          else
            [selector: SelectorBuilder.build(properties)]
          end

        {RunStartContext.component_pid(start_ctx, component), subscription_opts}
      end)

    # One queue per producer, registered in the order of `producers`. The MultiQueue is
    # ordered: the order of the `add_queue` calls is the order in which messages are emitted,
    # which makes the output predictable when several producers emit messages with the same
    # timestamp. `subscribe_to` is itself predictably sorted, so its order is reused here.
    queues =
      producers
      |> Enum.map(fn {pid, _opts} -> pid end)
      |> Enum.reduce(MultiQueue.new(), fn pid, acc -> MultiQueue.add_queue(acc, pid) end)

    Logger.metadata(run_id: run_id, scenario_id: scenario_id)

    state = %{
      multi_queue: queues,
      deduplicator: Deduplicator.new([], &message_timestamp/1),
      # Filled in by `handle_subscribe/4` as the producers get subscribed.
      producer_refs: %{},
      run_id: run_id
    }

    stage_opts = [subscribe_to: producers, dispatcher: GenStage.BroadcastDispatcher]
    {:producer_consumer, state, stage_opts}
  end

  # Feeds the events received from one producer into the multi queue and emits whatever the
  # queue releases, after deduplication. Anything other than an `{:ok, _, _}` result from
  # `zip_through_multiqueue/4` stops the stage, with that result as the stop reason.
  @impl true
  def handle_events(
        events,
        {from, _},
        %{multi_queue: queues, deduplicator: seen, producer_refs: refs} = state
      ) do
    case zip_through_multiqueue(queues, from, events, refs) do
      {:ok, queues, zipped} ->
        {to_emit, seen} = deduplicate(zipped, seen)
        next_state = %{state | multi_queue: queues, deduplicator: seen}
        {:noreply, to_emit, next_state}

      failure ->
        {:stop, failure, state}
    end
  end

  # Producer subscriptions run in `:manual` demand mode: the initial demand is asked for
  # right away (the subscription's `:max_demand`, or the module default when it is not set),
  # and the subscription reference is remembered under the producer pid so that
  # `ask_for_more/2` can request further events later.
  @impl true
  def handle_subscribe(:producer, options, {pid, _} = from, state) do
    initial_demand = options[:max_demand] || @default_max_demand
    GenStage.ask(from, initial_demand)
    {:manual, update_in(state.producer_refs, &Map.put(&1, pid, from))}
  end

  # Consumer subscriptions keep the automatic demand handling.
  def handle_subscribe(:consumer, _options, _from, state), do: {:automatic, state}

  # Drops the queue of the cancelled subscription from the multi queue and emits nothing.
  #
  # A cancel is not expected while the run is in progress; when the run is being stopped,
  # returning `:stop` here would do just as well.
  @impl true
  def handle_cancel(_reason, {producer_pid, _ref}, %{multi_queue: queues} = state) do
    {:noreply, [], %{state | multi_queue: MultiQueue.remove_queue(queues, producer_pid)}}
  end

  # Enqueues `messages` into the queue identified by `queue_id`, then dequeues everything the
  # multi queue releases under `message_comparator/2`. The demand distribution reported by
  # the dequeue is forwarded to the producers through `ask_for_more/2`.
  #
  # Returns `{:ok, multi_queue, msgs}` on success. A result of `MultiQueue.enqueue/3` or
  # `MultiQueue.dequeue_all/2` that does not match the success shape is returned unchanged.
  defp zip_through_multiqueue(multi_queue, queue_id, messages, producer_refs) do
    by_timestamp = &message_comparator/2

    with {:ok, filled} <- MultiQueue.enqueue(multi_queue, queue_id, messages),
         {:ok, drained, demands, ready} <- MultiQueue.dequeue_all(filled, by_timestamp) do
      ask_for_more(demands, producer_refs)
      {:ok, drained, ready}
    end
  end

  # Asks the producers for more events.
  #
  # `demand_distribution` is an enumerable of `{id, number}` pairs; `id` is looked up in
  # `producer_refs` to get the subscription that is asked for `number` events.
  #
  # Called only for its side effects, so `Enum.each/2` is used instead of a `for`
  # comprehension, which would build a list of results that nobody reads. Returns `:ok`.
  defp ask_for_more(demand_distribution, producer_refs) do
    Enum.each(demand_distribution, fn {id, number} ->
      GenStage.ask(producer_refs[id], number)
    end)
  end

  # Runs every message through the deduplicator, threading the deduplicator state along.
  #
  # Returns `{msgs, dedup}` where `msgs` keeps the input order and contains the messages
  # reported as `:new` or `:old`; messages reported as `:duplicate` are dropped.
  defp deduplicate(msgs, dedup) do
    Enum.flat_map_reduce(msgs, dedup, fn msg, dedup ->
      case Deduplicator.deduplicate(msg, dedup) do
        {:new, dedup} ->
          {[msg], dedup}

        {:duplicate, dedup} ->
          {[], dedup}

        {:old, dedup} ->
          # Not expected unless there is an error in the scenario or in timezip itself. It may
          # mean that timezip was handed a message whose timestamp is lower than the
          # timestamps of messages it received earlier. The message is still passed on.
          Logger.error("Deduplicator reported as old following message #{inspect(msg)}")

          {[msg], dedup}
      end
    end)
  end

  # Ordering predicate handed to the multi queue: true when `msg1` is strictly earlier than
  # `msg2`. A message is either an OutputAction or a Message; both have a `timestamp` field.
  # NOTE(review): `<` is plain term ordering - fine for numeric timestamps; confirm the
  # timestamps are never structs such as `DateTime`.
  defp message_comparator(msg1, msg2) do
    left = msg1.timestamp
    right = msg2.timestamp
    left < right
  end

  # Timestamp extractor given to the deduplicator. A message is either an OutputAction or a
  # Message; both have a `timestamp` field.
  defp message_timestamp(msg) do
    timestamp = msg.timestamp
    timestamp
  end
end