lib/runbox/runtime/stage/component_network.ex

defmodule Runbox.Runtime.Stage.ComponentNetwork do
  @moduledoc """
  Component network builds and validates template dependency network.

  To build this network, module uses `c:Toolbox.Scenario.Template.StageBased.subscriptions/0`.
  Only templates are defined at first; the logic then derives input-topic nodes, a timezip
  wherever one template subscribes to two or more streams, and an output sink at the end.
  """

  alias Runbox.ScenarioTemplate

  @typedoc "Kind of a component; the first element of a component identifier."
  @type component_type ::
          :input_topic | :load_topic | :tick_timezip | :template | :output_sink | :timezip

  @typedoc """
  Full identifier of a particular component: `{component_type, id}`.

  The second element depends on the type as used in this module: a template module for
  `:template`, a topic name for `:input_topic`/`:load_topic`, the wrapped topic component
  for `:tick_timezip`, the list of zipped subscriptions for `:timezip`, and the atom
  `:output_sink` for the single output sink.
  """
  @type component :: {component_type, term}

  @typedoc "Additional properties of subscription/edge. E.g. selector for Broadcast dispatcher."
  @type subscription_properties :: term

  @typedoc "Subscription edge: the component being subscribed to, plus the edge properties."
  @type subscription :: {component, subscription_properties}

  @typedoc """
  Component network: an adjacency map from each component to the list of subscriptions
  (outgoing edges) it holds.
  """
  @type t :: %{component => [subscription]}

  @doc """
  Builds the complete component network for the given scenario templates.

  The templates are first converted into a template-only network, which is then
  expanded with topic, timezip and output sink components and validated.

  Returns `{:ok, network}` or the first `{:error, reason}` encountered, where the reason is
  one of:

    * `{:orphan_template, node}` - a template has no subscriptions
    * `:component_network_does_not_contain_sink_nodes` - every component has a subscriber
    * `{:missing_component, component}` - a subscription targets an unknown component
    * `{:topic_both_input_and_load, topic_name}` - a topic is used as both input and load topic

  ## Options

    * `:direct_ticking` - if truthy (default `true`), topic subscriptions are routed through
      `:tick_timezip` components; otherwise templates subscribe to topic components directly
  """
  @spec create([ScenarioTemplate.t()], Keyword.t()) :: {:ok, t} | {:error, term}
  def create(scenario_templates, opts \\ []) do
    direct_ticking? = Keyword.get(opts, :direct_ticking, true)

    with templates_only = convert_to_network(scenario_templates),
         :ok <- validate_orphan_templates(templates_only),
         {:ok, network} <- expand(templates_only, direct_ticking?),
         :ok <- validate_missing_components(network),
         :ok <- validate_input_topic_not_load_as_well(network) do
      {:ok, network}
    end
  end

  @doc """
  Creates the base network, which contains template components only.

  Every scenario template becomes a `{:template, module}` key whose value is the list of
  subscriptions declared in the template's `info.subscriptions`.
  """
  @spec convert_to_network([ScenarioTemplate.t()]) :: t
  def convert_to_network(scenario_templates) do
    Map.new(scenario_templates, fn %ScenarioTemplate{module: template_module, info: info} ->
      # only the message type part of the subscription is actually used
      # for message filtering (see Runtime.Run.Runtime.Stage.SelectorBuilder)
      edges = for declared <- info.subscriptions, do: template_subscription_to_subscription(declared)
      {{:template, template_module}, edges}
    end)
  end

  @doc """
  Returns the names of all topics present in the component network.

  Both `:input_topic` and `:load_topic` components are included; the names are returned
  in the iteration order of the network.
  """
  @spec input_topics(t) :: [String.t()]
  def input_topics(component_network) do
    for {{kind, topic_name}, _subscriptions} <- component_network,
        kind in [:input_topic, :load_topic],
        do: topic_name
  end

  # Ensures every template is subscribed to something. Returns `:ok`, or
  # `{:error, {:orphan_template, node}}` for the first node with no subscriptions,
  # where `node` is the whole `{component, []}` network entry.
  defp validate_orphan_templates(base_network) do
    orphan =
      Enum.find(base_network, fn
        {_component, []} -> true
        {_component, _subscriptions} -> false
      end)

    if orphan, do: {:error, {:orphan_template, orphan}}, else: :ok
  end

  # Rejects networks in which one topic name is used both as an input topic and as a
  # load topic. This combination is not supported at the moment and would cause
  # problems down the road.
  #
  # Returns `:ok` or `{:error, {:topic_both_input_and_load, topic_name}}` for the first
  # conflicting topic met while walking the network.
  defp validate_input_topic_not_load_as_well(base_network) do
    # `seen` collects the topic names met so far, per topic kind; each topic is
    # checked against the names already seen for the opposite kind.
    outcome =
      Enum.reduce_while(base_network, %{input_topic: [], load_topic: []}, fn
        {{kind, topic_name}, _subscriptions}, seen when kind in [:input_topic, :load_topic] ->
          opposite_kind = if kind == :input_topic, do: :load_topic, else: :input_topic

          if topic_name in Map.fetch!(seen, opposite_kind) do
            {:halt, {:error, {:topic_both_input_and_load, topic_name}}}
          else
            {:cont, Map.update!(seen, kind, &[topic_name | &1])}
          end

        _other_node, seen ->
          {:cont, seen}
      end)

    case outcome do
      {:error, _reason} -> outcome
      _seen -> :ok
    end
  end

  # Ensures that every component referenced as a subscription target is itself a node
  # of the network.
  #
  # Returns `:ok`, or `{:error, {:missing_component, component}}` for the first target
  # that has no entry in `network`.
  defp validate_missing_components(network) do
    missing =
      network
      |> subscription_targets_set()
      # direct key lookup; avoids rebuilding and scanning the key list for every target
      |> Enum.find(fn target -> not Map.has_key?(network, target) end)

    case missing do
      nil -> :ok
      component -> {:error, {:missing_component, component}}
    end
  end

  # Converts a subscription triple `{kind, target, properties}` declared by a template
  # into a network edge `{{kind, target}, properties}`. Only `:input_topic`, `:load_topic`
  # and `:template` kinds are accepted; anything else raises `CaseClauseError`.
  defp template_subscription_to_subscription(template_subscription) do
    case template_subscription do
      {kind, target, properties} when kind in [:input_topic, :load_topic, :template] ->
        {{kind, target}, properties}
    end
  end

  # Expands a template-only network with the remaining component types:
  # topic (and, with direct ticking, tick_timezip) nodes, timezip nodes and the output sink.
  @spec expand(t, boolean) :: {:ok, t} | {:error, term}
  defp expand(network, use_direct_ticking?) do
    # add input_topic, load_topic and tick_timezip components
    with_topic_nodes =
      Enum.flat_map(network, &maybe_with_input_topic_nodes(&1, use_direct_ticking?))

    # add timezip components
    with_timezip_nodes = Enum.flat_map(with_topic_nodes, &maybe_with_timezip_node/1)

    # nodes generated more than once collapse into a single map entry here
    with_timezip_nodes
    |> Map.new()
    |> add_output_sink()
  end

  # Dispatches to the direct-ticking or topic-ticking expansion of a single node,
  # depending on the truthiness of `use_direct_ticking?`.
  @spec maybe_with_input_topic_nodes({component, [subscription]}, boolean) :: [
          {component, [subscription]}
        ]
  defp maybe_with_input_topic_nodes(node, use_direct_ticking?) do
    expand_node =
      if use_direct_ticking? do
        &maybe_with_input_topic_nodes_direct_ticking/1
      else
        &maybe_with_input_topic_nodes_topic_ticking/1
      end

    expand_node.(node)
  end

  # Topic-ticking expansion: keeps the node as it is and appends one subscription-less
  # node for every input/load topic the node subscribes to.
  @spec maybe_with_input_topic_nodes_topic_ticking({component, [subscription]}) :: [
          {component, [subscription]}
        ]
  defp maybe_with_input_topic_nodes_topic_ticking({_component, subscriptions} = node) do
    topic_nodes =
      for {{kind, _topic_name} = topic_component, _sub_props} <- subscriptions,
          kind in [:input_topic, :load_topic],
          do: {topic_component, []}

    [node | topic_nodes]
  end

  # Direct-ticking expansion: every subscription to an input/load topic is redirected to
  # a tick_timezip component placed in front of that topic, and both the topic node and
  # the tick_timezip node are added to the result.
  #
  # Before:
  #
  #            sub props
  #           /--------> (not yet existing) topic A node
  #   template
  #           \--------> (not yet existing) topic B node
  #
  # After:
  #
  #            sub props                no sub props
  #           /--------> tick_timezip A -----------> topic A
  #   template
  #           \--------> tick_timezip B -----------> topic B
  #
  # Subscription property is typically a message type selector; it stays on the edge
  # leaving the template. Each topic has only one tick_timezip instance, no matter how
  # many message types it has.
  @spec maybe_with_input_topic_nodes_direct_ticking({component, [subscription]}) :: [
          {component, [subscription]}
        ]
  defp maybe_with_input_topic_nodes_direct_ticking({component, subscriptions}) do
    # topic subscriptions now point at the tick_timezip; all others stay unchanged
    rewired_subscriptions =
      Enum.map(subscriptions, fn
        {{kind, _topic_name} = topic, sub_props} when kind in [:input_topic, :load_topic] ->
          {{:tick_timezip, topic}, sub_props}

        other_subscription ->
          other_subscription
      end)

    # The same topic/tick_timezip pair may be produced several times; that is harmless
    # because duplicate keys collapse when the node list is turned into a map later on.
    added_nodes =
      for {{kind, _topic_name} = topic, _sub_props} <- subscriptions,
          kind in [:input_topic, :load_topic],
          added <- [{topic, []}, {{:tick_timezip, topic}, [{topic, []}]}],
          do: added

    [{component, rewired_subscriptions} | added_nodes]
  end

  # Puts a timezip node in front of any node that holds more than one subscription.
  #
  # Before:
  #
  #         sub props
  #        /--------> node B
  #   node A
  #        \--------> node C
  #
  # After:
  #
  #          no original sub prop             sub props
  #                                          /--------> node B
  #   node A --------------------> timezip B,C
  #                                          \--------> node C
  #
  # The timezip is identified by the subscriptions it takes over. Nodes with zero or one
  # subscription are returned untouched.
  @spec maybe_with_timezip_node({component, [subscription]}) :: [{component, [subscription]}]
  defp maybe_with_timezip_node({component, [_, _ | _] = subscriptions}) do
    timezip_component = {:timezip, subscriptions}

    [
      {component, [{timezip_component, [selector: :none_after_timezip]}]},
      {timezip_component, subscriptions}
    ]
  end

  defp maybe_with_timezip_node({_component, _subscriptions} = node), do: [node]

  # Appends the output sink to the network. The sink subscribes to the components
  # nobody else subscribes to: directly when there is exactly one such component,
  # through an extra timezip when there are several.
  #
  # Returns `{:ok, network}`, or
  # `{:error, :component_network_does_not_contain_sink_nodes}` when every component
  # already has a subscriber.
  defp add_output_sink(network) do
    subscribed? = has_subscriber_fun(network)

    case Enum.reject(Map.keys(network), subscribed?) do
      [] ->
        {:error, :component_network_does_not_contain_sink_nodes}

      [only_component] ->
        {:ok, Map.put(network, {:output_sink, :output_sink}, [{only_component, []}])}

      several_components ->
        add_output_sink_with_timezip(network, several_components)
    end
  end

  # Adds a timezip that subscribes to all `sink_components` and an output sink that
  # subscribes to that timezip. Always returns `{:ok, network}`.
  defp add_output_sink_with_timezip(network, sink_components) do
    zipped_subscriptions =
      for component <- sink_components do
        {component, [selector: :none_before_output_sink]}
      end

    timezip = {:timezip, zipped_subscriptions}

    sink_nodes = %{
      timezip => zipped_subscriptions,
      {:output_sink, :output_sink} => [{timezip, []}]
    }

    {:ok, Map.merge(network, sink_nodes)}
  end

  # Collects every component that is the target of at least one subscription
  # in the network into a `MapSet`.
  defp subscription_targets_set(network) do
    Enum.reduce(network, MapSet.new(), fn {_component, subscriptions}, targets ->
      Enum.reduce(subscriptions, targets, fn {target, _props}, acc ->
        MapSet.put(acc, target)
      end)
    end)
  end

  # Builds a predicate telling whether the given component is subscribed to by any
  # node of `network`. The set of targets is computed once, when the predicate is built.
  defp has_subscriber_fun(network) do
    targets = subscription_targets_set(network)
    &MapSet.member?(targets, &1)
  end

  @doc """
  Sorts the components (keys) of the network topologically.

  An edge denotes a subscription, so a component must come after everything it subscribes
  to. The sort therefore follows the edges backwards: it is a variation of Kahn's algorithm
  that tracks outdegrees instead of indegrees. Components without subscriptions come first.

  A component whose outdegree never drops to zero (e.g. one on a subscription cycle) is
  not included in the result.
  """
  @spec topology_sort(t) :: [component]
  def topology_sort(network) do
    outdegrees =
      Map.new(network, fn {component, subscriptions} ->
        {component, length(subscriptions)}
      end)

    ready = for {component, degree} <- outdegrees, degree == 0, do: component

    topsort(network, outdegrees, ready, [])
  end

  # Main loop of the sort. `sorted` is accumulated in reverse and flipped once no
  # candidates are left.
  defp topsort(_network, _outdegrees, [], sorted), do: Enum.reverse(sorted)

  # Emits the first candidate, then releases its subscribers: each subscriber loses its
  # edge to the emitted component and may become a candidate itself.
  defp topsort(network, outdegrees, [current | pending], sorted) do
    {network, outdegrees, pending} =
      current
      |> subscribers_of(network)
      |> Enum.reduce({network, outdegrees, pending}, &inner_kahn(current, &1, &2))

    topsort(network, outdegrees, pending, [current | sorted])
  end

  # One relaxation step of the sort: `subject` has just been emitted, so every
  # subscription edge leading from `subscriber` to it is dropped and the subscriber's
  # outdegree is refreshed.
  #
  # A subscriber may hold several subscriptions to the same subject (presumably one per
  # message type). All of them are removed at once, so the outdegree is recomputed from
  # the remaining edges instead of being decremented by one. Otherwise such a subscriber
  # would never reach zero and would be missing from the sorted result, together with
  # everything that depends on it.
  #
  # Returns the updated `{network, outdegrees, candidates}`; the subscriber is pushed
  # onto `candidates` once it has no subscriptions left.
  defp inner_kahn(subject, subscriber, {network, outdegrees, candidates}) do
    remaining =
      network
      |> Map.fetch!(subscriber)
      |> Enum.reject(fn {target, _props} -> target == subject end)

    network = %{network | subscriber => remaining}
    outdegrees = %{outdegrees | subscriber => length(remaining)}

    if remaining == [] do
      {network, outdegrees, [subscriber | candidates]}
    else
      {network, outdegrees, candidates}
    end
  end

  # Lists the components that hold at least one subscription to `subject_component`.
  defp subscribers_of(subject_component, network) do
    Enum.flat_map(network, fn {component, subscriptions} ->
      targets = Enum.map(subscriptions, fn {target, _props} -> target end)

      if subject_component in targets, do: [component], else: []
    end)
  end
end