lib/runtime.ex

# Copyright 2018 - 2022, Mathijs Saey, Vrije Universiteit Brussel

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

defmodule Skitter.Runtime do
  @moduledoc """
  Interface to the skitter runtime system.
  """
  alias Skitter.{Config, Remote, Workflow, Operation, Strategy}

  alias Skitter.Runtime.{
    Worker,
    ConstantStore,
    NodeStore,
    WorkflowManager,
    WorkerSupervisor,
    WorkflowWorkerSupervisor,
    WorkflowManagerSupervisor
  }

  use Skitter.Telemetry
  require ConstantStore
  require NodeStore

  @typedoc "Reference to a deployed workflow."
  @type ref :: reference()

  @doc """
  Get the current runtime mode.

  This function returns the mode of the current runtime. The available modes and their goal are
  documented in the [configuration documentation](configuration.html#modes). This function may
  also return `:test`, which is only used for testing.
  """
  @spec mode :: :worker | :master | :local | :test
  def mode, do: Config.get(:mode, :local)

  @doc """
  Get the workflow for a reference or context.

  The workflow will not have any in or out ports and only contain operation nodes, since
  `deploy/1` flattens workflows before it deploys them.
  """
  @spec get_workflow(ref() | Strategy.context()) :: Workflow.t()
  def get_workflow(r) when is_reference(r), do: %Workflow{nodes: ConstantStore.get(:wf_nodes, r)}
  def get_workflow(%Strategy.Context{_skr: {ref, _}}), do: get_workflow(ref)

  @doc """
  Get the name of the workflow node based on a context.
  """
  @spec node_name_for_context(Strategy.context()) :: Workflow.name()
  def node_name_for_context(%Strategy.Context{_skr: {ref, idx}}) do
    NodeStore.get(:wf_node_names, ref, idx)
  end

  @doc """
  Get the workflow node based on a context.

  The workflow node will always be an operation node.
  """
  @spec node_for_context(Strategy.context()) :: Workflow.operation_node()
  def node_for_context(context), do: get_workflow(context).nodes[node_name_for_context(context)]

  @doc """
  Get the reference based on a context.

  This can be used to link a telemetry event which contained a context to a deployed workflow.
  """
  @spec ref_for_context(Strategy.context()) :: ref()
  def ref_for_context(%Strategy.Context{_skr: {ref, _}}), do: ref
  def ref_for_context(%Strategy.Context{_skr: {:deploy, ref, _}}), do: ref

  @doc """
  Get a list with references to every spawned workflow.

  This function communicates with the master node when called from a worker runtime.
  """
  @spec spawned_workflows :: [ref()]
  def spawned_workflows do
    case mode() do
      :worker ->
        Remote.on(Remote.master(), WorkflowManagerSupervisor, :spawned_workflow_references, [])

      _ ->
        WorkflowManagerSupervisor.spawned_workflow_references()
    end
  end

  @doc """
  Deploy a workflow.

  Starts a Skitter application (i.e. a workflow) by deploying it over the cluster. Returns a
  reference to the deployed workflow.
  """
  @spec deploy(Workflow.t()) :: ref()
  def deploy(workflow) do
    ref = make_ref()
    nodes = Workflow.flatten(workflow).nodes

    # Store information to extract workflow information from the context
    ConstantStore.put_everywhere(nodes, :wf_nodes, ref)
    nodes |> Map.keys() |> NodeStore.put_everywhere(:wf_node_names, ref)

    # Create supervisors on all workers for every node in the workflow
    Remote.on_all_workers(WorkflowWorkerSupervisor, :spawn_local_workflow, [ref, map_size(nodes)])

    # Store deployment information and links on all nodes
    deploy_nodes(nodes, ref)
    expand_links(nodes, ref)

    # Create manager
    WorkflowManagerSupervisor.add_manager(ref)
    notify_workers(nodes, ref)

    Telemetry.emit([:runtime, :deploy], %{}, %{ref: ref})
    ref
  end

  # Deploy all nodes
  defp deploy_nodes(nodes, ref) do
    nodes
    |> Enum.with_index()
    |> Enum.map(fn {{_, node}, i} ->
      context = %Strategy.Context{
        operation: node.operation,
        strategy: node.strategy,
        args: node.args,
        _skr: {:deploy, ref, i}
      }

      Telemetry.wrap [:hook, :deploy], %{context: context} do
        node.strategy.deploy(context)
      end
    end)
    |> NodeStore.put_everywhere(:deployment, ref)
  end

  # Lookup link destinations and create contexts in advance to avoid doing this at runtime.
  defp expand_links(nodes, ref) do
    lookup =
      nodes
      |> Enum.with_index()
      |> Map.new(fn {{name, node}, i} ->
        {name,
         %Strategy.Context{
           operation: node.operation,
           strategy: node.strategy,
           args: node.args,
           deployment: NodeStore.get(:deployment, ref, i),
           _skr: {ref, i}
         }}
      end)

    nodes
    |> Enum.map(fn {_, node} ->
      Map.new(node.links, fn {out_port, destinations} ->
        {out_port,
         Enum.map(destinations, fn {name, in_port} ->
           context = lookup[name]
           {context, Operation.in_port_to_index(context.operation, in_port)}
         end)}
      end)
    end)
    |> NodeStore.put_everywhere(:links, ref)
  end

  # We notify workers to finish deploying in reverse topological order of the application DAG.
  # This avoids race conditions where nodes can send data to other nodes which did not finish
  # deploying yet.
  defp notify_workers(_, ref) do
    ref
    |> topological_indices()
    |> Enum.reverse()
    |> Enum.each(fn idx ->
      Remote.on_all_workers(WorkerSupervisor, :all_children, [ref, idx, &Worker.deploy_complete/1])
    end)
  end

  @doc "Stop the workflow with reference `ref`."
  @spec stop(ref()) :: :ok
  def stop(ref) do
    Telemetry.emit([:runtime, :stop], %{}, %{ref: ref})

    WorkflowManager.stop(ref)
    # TODO: stop hook if introduced
    stop_workers(ref)
    Remote.on_all_workers(WorkflowWorkerSupervisor, :stop_local_workflow, [ref])
    remove_constants(ref)
    :ok
  end

  defp stop_workers(ref) do
    ref
    |> topological_indices()
    |> Enum.each(fn idx ->
      Remote.on_all_workers(fn ->
        WorkerSupervisor.stop(NodeStore.get(:local_supervisors, ref, idx))
      end)
    end)
  end

  defp remove_constants(ref) do
    [:manager, :wf_nodes, :wf_node_names, :deployment, :links]
    |> Enum.each(&ConstantStore.remove(&1, ref))

    Remote.on_all_workers(fn ->
      [
        :wf_nodes,
        :wf_node_names,
        :operation_worker_supervisors,
        :deployment,
        :links,
        :local_supervisors
      ]
      |> Enum.each(&ConstantStore.remove(&1, ref))
    end)
  end

  defp topological_indices(ref) do
    NodeStore.get_all(:links, ref)
    |> Enum.map(&Map.values/1)
    |> Enum.map(&Enum.concat/1)
    |> Enum.map(&Enum.map(&1, fn {%Strategy.Context{_skr: {_, i}}, _} -> i end))
    |> Enum.map(&MapSet.new/1)
    |> Enum.with_index()
    |> build_topological()
  end

  defp build_topological([]), do: []

  defp build_topological(lst) do
    {tail, rem} = Enum.split_with(lst, &(MapSet.size(elem(&1, 0)) == 0))

    tail = Enum.map(tail, &elem(&1, 1))
    rem = Enum.map(rem, fn {set, idx} -> {MapSet.difference(set, MapSet.new(tail)), idx} end)

    build_topological(rem) ++ tail
  end
end