# Copyright 2018 - 2022, Mathijs Saey, Vrije Universiteit Brussel
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
defmodule Skitter.Runtime do
  @moduledoc """
  Skitter runtime system API.

  This module defines various functions which can be used to query the Skitter runtime system.
  Many of these functions are only useful to obtain additional information based on data returned
  by [telemetry events](telemetry.md).
  """
  alias Skitter.{Config, Remote, Workflow, Strategy}

  alias Skitter.Runtime.{
    Worker,
    ConstantStore,
    NodeStore,
    WorkflowManager,
    WorkerSupervisor,
    WorkflowWorkerSupervisor,
    WorkflowManagerSupervisor
  }

  use Skitter.Telemetry
  require ConstantStore
  require NodeStore

  @typedoc "Reference to a deployed workflow."
  @type ref :: reference()

  @doc """
  Get the current runtime mode.

  This function returns the mode of the current runtime. The available modes and their goal are
  documented in the [configuration documentation](configuration.html#modes). This function may
  also return `:test`, which is only used for testing.
  """
  @spec mode :: :worker | :master | :local | :test
  def mode, do: Config.get(:mode, :local)

  @doc """
  Get the workflow for a reference or context.

  The workflow will not have any in or out ports and only contain operation nodes, since
  `deploy/1` flattens workflows before it deploys them.

  Both the contexts passed to the deploy hook of a strategy and the contexts created after
  deployment are accepted.
  """
  @spec get_workflow(ref() | Strategy.context()) :: Workflow.t()
  def get_workflow(ref) when is_reference(ref) do
    %Workflow{nodes: ConstantStore.get(:wf_nodes, ref)}
  end

  def get_workflow(%Strategy.Context{} = context), do: get_workflow(ref_for_context(context))

  @doc """
  Get the name of the workflow node based on a context.

  Both the contexts passed to the deploy hook of a strategy and the contexts created after
  deployment are accepted.
  """
  @spec node_name_for_context(Strategy.context()) :: Workflow.name()
  def node_name_for_context(%Strategy.Context{} = context) do
    {ref, idx} = context_location(context)
    NodeStore.get(:wf_node_names, ref, idx)
  end

  @doc """
  Get the workflow node based on a context.

  The workflow node will always be an operation node.
  """
  @spec node_for_context(Strategy.context()) :: Workflow.operation_node()
  def node_for_context(context), do: get_workflow(context).nodes[node_name_for_context(context)]

  @doc """
  Get the reference based on a context.

  This can be used to link a telemetry event which contained a context to a deployed workflow.
  """
  @spec ref_for_context(Strategy.context()) :: ref()
  def ref_for_context(%Strategy.Context{} = context) do
    {ref, _idx} = context_location(context)
    ref
  end

  # Extract the workflow reference and node index stored in the `_skr` field of a context.
  # Contexts passed to the deploy hook (built in `deploy_nodes/2`) store `{:deploy, ref, idx}`,
  # while the contexts built afterwards (in `expand_links/2`) store `{ref, idx}`.
  defp context_location(%Strategy.Context{_skr: {:deploy, ref, idx}}), do: {ref, idx}
  defp context_location(%Strategy.Context{_skr: {ref, idx}}), do: {ref, idx}

  @doc """
  Get a list with references to every spawned workflow.

  This function communicates with the master node when called from a worker runtime.
  """
  @spec spawned_workflows :: [ref()]
  def spawned_workflows do
    case mode() do
      :worker ->
        Remote.on(Remote.master(), WorkflowManagerSupervisor, :spawned_workflow_references, [])

      _ ->
        WorkflowManagerSupervisor.spawned_workflow_references()
    end
  end

  @doc """
  Deploy a workflow over the cluster.

  Starts a Skitter application (i.e. a `t:Skitter.Workflow.t/0`) by deploying it over the cluster.
  The workflow is flattened (using `Skitter.Workflow.flatten/1`) before it is deployed. After
  deployment, this function returns a `t:ref/0`, which can be used by various functions in this
  module.
  """
  @spec deploy(Workflow.t()) :: ref()
  def deploy(workflow) do
    ref = make_ref()
    nodes = Workflow.flatten(workflow).nodes

    # Node indices are positions in the enumeration order of `nodes`. Every step below enumerates
    # the same map value, so the indices they produce agree with each other.

    # Store information to extract workflow information from the context. This happens before the
    # deploy hooks run, so the hooks can already query it through their context.
    ConstantStore.put_everywhere(nodes, :wf_nodes, ref)
    nodes |> Map.keys() |> NodeStore.put_everywhere(:wf_node_names, ref)

    # Create supervisors on all workers for every node in the workflow
    Remote.on_all_workers(WorkflowWorkerSupervisor, :spawn_local_workflow, [ref, map_size(nodes)])

    # Store deployment information and links on all nodes
    deploy_nodes(nodes, ref)
    expand_links(nodes, ref)

    # Create manager
    WorkflowManagerSupervisor.add_manager(ref)
    notify_workers(ref)

    Telemetry.emit([:runtime, :deploy], %{}, %{ref: ref})
    ref
  end

  # Run the deploy hook of every node and store the returned values as `:deployment` on every
  # node of the cluster (one entry per node index).
  defp deploy_nodes(nodes, ref) do
    nodes
    |> Enum.with_index()
    |> Enum.map(fn {{_name, node}, idx} ->
      context = %Strategy.Context{
        operation: node.operation,
        strategy: node.strategy,
        _skr: {:deploy, ref, idx}
      }

      Telemetry.wrap [:hook, :deploy], %{context: context} do
        node.strategy.deploy(context, node.args)
      end
    end)
    |> NodeStore.put_everywhere(:deployment, ref)
  end

  # Lookup link destinations and create contexts in advance to avoid doing this at runtime.
  # For every node, a map `out_port => [{destination_context, in_port}]` is stored as `:links`.
  defp expand_links(nodes, ref) do
    contexts =
      nodes
      |> Enum.with_index()
      |> Map.new(fn {{name, node}, idx} ->
        {name,
         %Strategy.Context{
           operation: node.operation,
           strategy: node.strategy,
           deployment: NodeStore.get(:deployment, ref, idx),
           _skr: {ref, idx}
         }}
      end)

    nodes
    |> Enum.map(fn {_name, node} ->
      Map.new(node.links, fn {out_port, destinations} ->
        # Every destination must be a node of the flattened workflow: fail here with a KeyError
        # instead of storing a `nil` context which would only crash later on.
        dsts = Enum.map(destinations, fn {name, port} -> {Map.fetch!(contexts, name), port} end)
        {out_port, dsts}
      end)
    end)
    |> NodeStore.put_everywhere(:links, ref)
  end

  # We notify workers to finish deploying in reverse topological order of the application DAG.
  # This avoids race conditions where nodes can send data to other nodes which did not finish
  # deploying yet.
  defp notify_workers(ref) do
    notify = &Worker.deploy_complete/1

    ref
    |> topological_indices()
    |> Enum.reverse()
    |> Enum.each(fn idx ->
      Remote.on_all_workers(WorkerSupervisor, :all_children, [ref, idx, notify])
    end)
  end

  @doc "Stop the workflow with reference `ref`."
  @spec stop(ref()) :: :ok
  def stop(ref) do
    Telemetry.emit([:runtime, :stop], %{}, %{ref: ref})
    WorkflowManager.stop(ref)
    # TODO: stop hook if introduced
    stop_workers(ref)
    Remote.on_all_workers(WorkflowWorkerSupervisor, :stop_local_workflow, [ref])
    remove_constants(ref)
    :ok
  end

  # Stop the workers of every node in topological order: upstream nodes are stopped before the
  # nodes they send data to.
  defp stop_workers(ref) do
    ref
    |> topological_indices()
    |> Enum.each(fn idx ->
      Remote.on_all_workers(fn ->
        WorkerSupervisor.stop(NodeStore.get(:local_supervisors, ref, idx))
      end)
    end)
  end

  # Remove the stored workflow data, first locally and then on every worker.
  # NOTE(review): keys written through NodeStore (e.g. `:links`) are removed with
  # `ConstantStore.remove/2`; presumably NodeStore is backed by ConstantStore — confirm.
  # `:manager`, `:operation_worker_supervisors` and `:local_supervisors` are not written in this
  # module and are presumably stored elsewhere in the runtime.
  defp remove_constants(ref) do
    Enum.each(
      [:manager, :wf_nodes, :wf_node_names, :deployment, :links],
      &ConstantStore.remove(&1, ref)
    )

    Remote.on_all_workers(fn ->
      Enum.each(
        [
          :wf_nodes,
          :wf_node_names,
          :operation_worker_supervisors,
          :deployment,
          :links,
          :local_supervisors
        ],
        &ConstantStore.remove(&1, ref)
      )
    end)
  end

  # Compute the node indices of the workflow in topological order: every node index appears
  # before the indices of the nodes it sends data to.
  defp topological_indices(ref) do
    NodeStore.get_all(:links, ref)
    |> Enum.map(&destination_indices/1)
    |> Enum.with_index()
    |> build_topological()
  end

  # Set of node indices a node sends data to, based on its `out_port => destinations` map.
  defp destination_indices(links) do
    links
    |> Map.values()
    |> Enum.concat()
    |> MapSet.new(fn {%Strategy.Context{_skr: {_ref, idx}}, _in_port} -> idx end)
  end

  # `pending` is a list of `{destinations, idx}` pairs, where `destinations` contains the indices
  # `idx` still sends data to. Nodes without remaining destinations are placed at the end of the
  # result and removed from the destination sets of the other nodes.
  defp build_topological([]), do: []

  defp build_topological(pending) do
    {done, rest} = Enum.split_with(pending, fn {dsts, _idx} -> MapSet.size(dsts) == 0 end)

    # Without any node free of destinations, the links contain a cycle; recursing further would
    # never terminate.
    if done == [] do
      raise ArgumentError,
            "cannot order workflow nodes #{inspect(Enum.map(pending, &elem(&1, 1)))}: " <>
              "the workflow links contain a cycle"
    end

    done_idxs = Enum.map(done, fn {_dsts, idx} -> idx end)
    done_set = MapSet.new(done_idxs)
    rest = Enum.map(rest, fn {dsts, idx} -> {MapSet.difference(dsts, done_set), idx} end)
    build_topological(rest) ++ done_idxs
  end
end