lib/runbox/runtime/simple/timezip.ex

defmodule Runbox.Runtime.Simple.Timezip do
  @moduledoc group: :internal
  @moduledoc """
  Timezip component responsible for merging multiple message streams for Simple runtime.

  In a Simple scenario, a Timezip is needed only when reading from multiple input topics. In that
  case the input streams must be merged into a single stream for the template.
  """

  # Disable code duplication checks, since this is mostly copied from Stage. This is OK though,
  # since we aim to remove Stage later, so generalizing this would be a waste of time.
  # credo:disable-for-this-file Credo.Check.Design.DuplicatedCode

  alias Runbox.Deduplicator
  alias Runbox.Message
  alias Runbox.RunStartContext
  alias Runbox.Runtime.Timezip.MultiQueue

  use GenStage
  require Logger

  # Initial demand sent to each producer when it does not specify `:max_demand` itself.
  @default_max_demand 1000

  @doc """
  Returns component name.

  There are no parameters, since in Simple scenario runtime there is at most one Timezip.
  """
  def component_name do
    :timezip
  end

  @doc """
  Starts the GenStage.

  `args` is expected to contain `:run_id` and a `:config` map with `:subscribe_to` (the upstream
  components) and `:scenario_id`; `start_ctx` is used to resolve those components to pids.
  """
  def start_link(args, _runbox_ctx, start_ctx) do
    GenStage.start_link(__MODULE__, {args, start_ctx})
  end

  @impl true
  def init(
        {%{run_id: run_id, config: %{subscribe_to: subscribe_to, scenario_id: scenario_id}},
         start_ctx}
      ) do
    subscribe_to =
      Enum.map(subscribe_to, fn component ->
        {RunStartContext.component_pid(start_ctx, component), []}
      end)

    # Create a multi queue, note the order is important. MultiQueue is now ordered, the order of
    # `add_queue` calls represent the order in which the messages are emitted. If multiple producers
    # produce messages in the same timestamp, they need to be sorted somehow in a predictable
    # manner. Note the `subscribe_to` is also predictably sorted, so we just use that order here.
    multi_queue =
      Enum.reduce(subscribe_to, MultiQueue.new(), fn {pid, _}, multi_queue ->
        MultiQueue.add_queue(multi_queue, pid)
      end)

    Logger.metadata(run_id: run_id, scenario_id: scenario_id)

    {:producer_consumer,
     %{
       multi_queue: multi_queue,
       # Messages are deduplicated keyed by their timestamp (see `message_timestamp/1`).
       deduplicator: Deduplicator.new([], &message_timestamp/1),
       # Producer pid => GenStage subscription (`from`), used to ask for more demand manually.
       producer_refs: %{},
       run_id: run_id
     }, subscribe_to: subscribe_to}
  end

  @impl true
  def handle_events(
        events,
        {from, _},
        %{multi_queue: multi_queue, deduplicator: dedup, producer_refs: producer_refs} = state
      ) do
    case zip_through_multiqueue(multi_queue, from, events, producer_refs) do
      {:ok, multi_queue, msgs} ->
        {msgs, dedup} = deduplicate(msgs, dedup)
        {:noreply, msgs, %{state | multi_queue: multi_queue, deduplicator: dedup}}

      # Any non-`:ok` result from `MultiQueue.enqueue/3` or `MultiQueue.dequeue_all/2` is used
      # verbatim as the stop reason.
      error ->
        {:stop, error, state}
    end
  end

  @impl true
  def handle_subscribe(:producer, options, {pid, _} = from, state) do
    # Demand is managed manually: after the initial ask, further demand is issued per producer
    # from the demand distribution returned by `MultiQueue.dequeue_all/2`.
    init_demand = options[:max_demand] || @default_max_demand
    GenStage.ask(from, init_demand)
    {:manual, put_in(state.producer_refs[pid], from)}
  end

  @impl true
  def handle_subscribe(:consumer, _options, _from, state) do
    {:automatic, state}
  end

  @impl true
  def handle_cancel(_reason, {pid, _}, %{producer_refs: producer_refs} = state) do
    # GenStage invokes this callback for cancelled subscriptions in both directions. Only upstream
    # producers own a queue in the multi queue, so a downstream consumer cancelling must not touch
    # it. Cancellation really can't happen during a run; when stopping, the process may as well
    # be stopped.
    case Map.pop(producer_refs, pid) do
      {nil, _producer_refs} ->
        {:noreply, [], state}

      {_from, producer_refs} ->
        multi_queue = MultiQueue.remove_queue(state.multi_queue, pid)
        {:noreply, [], %{state | multi_queue: multi_queue, producer_refs: producer_refs}}
    end
  end

  # Enqueues `messages` from the producer `queue_id`, dequeues everything MultiQueue can release
  # in order, and asks the producers for the demand MultiQueue reports. Returns
  # `{:ok, multi_queue, msgs}` or passes through the first non-matching MultiQueue result.
  defp zip_through_multiqueue(multi_queue, queue_id, messages, producer_refs) do
    with {:ok, multi_queue} <- MultiQueue.enqueue(multi_queue, queue_id, messages),
         {:ok, multi_queue, demands, msgs} <-
           MultiQueue.dequeue_all(multi_queue, &message_comparator/2) do
      ask_for_more(demands, producer_refs)
      {:ok, multi_queue, msgs}
    end
  end

  # Sends `GenStage.ask/2` to each producer listed in the demand distribution. Raises `KeyError`
  # if a producer id has no known subscription, which would indicate an internal inconsistency.
  defp ask_for_more(demand_distribution, producer_refs) do
    Enum.each(demand_distribution, fn {id, number} ->
      GenStage.ask(Map.fetch!(producer_refs, id), number)
    end)
  end

  # Drops messages the deduplicator reports as duplicates, preserving the order of the rest.
  # Messages reported as `:old` are kept but logged as an error.
  defp deduplicate(msgs, dedup) do
    {msgs, dedup} =
      Enum.reduce(msgs, {[], dedup}, fn msg, {msgs, dedup} ->
        case Deduplicator.deduplicate(msg, dedup) do
          {:new, dedup} ->
            {[msg | msgs], dedup}

          {:duplicate, dedup} ->
            {msgs, dedup}

          {:old, dedup} ->
            # Should not happen if there is no error in the scenario or the timezip.
            # This may indicate that a message with a timestamp lower than the
            # timestamps of previous messages was passed to the timezip.
            Logger.error("Deduplicator reported as old following message #{inspect(msg)}")

            {[msg | msgs], dedup}
        end
      end)

    {Enum.reverse(msgs), dedup}
  end

  # Returns true when the first message's timestamp is strictly lower than the second's.
  defp message_comparator(%Message{timestamp: ts1}, %Message{timestamp: ts2}) do
    ts1 < ts2
  end

  defp message_timestamp(%Message{timestamp: ts}) do
    ts
  end
end