defmodule Bonny.Server.Scheduler do
@moduledoc """
Kubernetes custom scheduler interface. Built on top of `Reconciler`.
The only function that needs to be implemented is `select_node_for_pod/2`. All other callbacks defined by the behaviour have default implementations.
## Examples
Schedules each unscheduled pod with `spec.schedulerName=cheap-node` to a node with the label `cheap=true`.
`nodes` is an enumerable, so it can be filtered lazily with `Stream`:

```elixir
defmodule CheapNodeScheduler do
  use Bonny.Server.Scheduler, name: "cheap-node"

  @impl Bonny.Server.Scheduler
  def select_node_for_pod(_pod, nodes) do
    nodes
    |> Stream.filter(fn node ->
      is_cheap = K8s.Resource.label(node, "cheap")
      is_cheap == "true"
    end)
    |> Enum.take(1)
    |> List.first()
  end
end
```

`use Bonny.Server.Scheduler` defines `child_spec/1`, so start the scheduler by adding it to a supervision tree:

```elixir
Supervisor.start_link([CheapNodeScheduler], strategy: :one_for_one)
```
Schedules each unscheduled pod with `spec.schedulerName=random-node` to a random node:

```elixir
defmodule RandomNodeScheduler do
  use Bonny.Server.Scheduler, name: "random-node"

  @impl Bonny.Server.Scheduler
  def select_node_for_pod(_pod, nodes) do
    Enum.random(nodes)
  end
end
```

Start it by adding `RandomNodeScheduler` to a supervision tree.
Override the default `nodes/1` implementation (`field_selector/0` and `conn/0` can be overridden too).
Schedules pods on a random GPU node:

```elixir
defmodule GpuScheduler do
  use Bonny.Server.Scheduler, name: "gpu-node"

  @impl Bonny.Server.Scheduler
  def select_node_for_pod(_pod, nodes) do
    Enum.random(nodes)
  end

  @impl Bonny.Server.Scheduler
  def nodes(conn) do
    label = "my.label.on.gpu.instances"
    op = K8s.Client.list("v1", :nodes)
    K8s.Client.stream(conn, op, params: %{labelSelector: label})
  end
end
```

Start it by adding `GpuScheduler` to a supervision tree.
"""
require Logger
@doc """
Name of the scheduler.

The default `field_selector/0` matches pods whose `spec.schedulerName` equals this value.
"""
@callback name() :: binary()

@doc """
List of nodes available to this scheduler.

Default implementation is all nodes in the cluster.
"""
@callback nodes(K8s.Conn.t()) :: {:ok, Enumerable.t()} | {:error, any()}

@doc """
Field selector for selecting unscheduled pods waiting to be scheduled by this scheduler.

Default implementation is all unscheduled pods assigned to this scheduler.
"""
@callback field_selector() :: binary()

@doc """
Kubernetes connection used by this scheduler.

Default implementation delegates to `Bonny.Config.conn/0`.
"""
@callback conn() :: K8s.Conn.t()

@doc """
Selects the best node for the current `pod`.

Takes the current unscheduled pod and an enumerable of nodes (a list, or a
`Stream` when `nodes/1` returns one). `pod` is provided in the event that
`taints` or `affinities` need to be respected by the scheduler.

Returns the node to schedule on.
"""
# The nodes argument is whatever `nodes/1` yields (`Enumerable.t()`), not necessarily a list.
@callback select_node_for_pod(map(), Enumerable.t()) :: map()
# Injects a scheduler implementation into the calling module.
#
# Options:
#   * `:name` - scheduler name used in the pod field selector
#     (`spec.schedulerName=<name>`); defaults to `Macro.to_string(__MODULE__)`
#     of the using module.
#
# The generated module declares both the `Bonny.Server.Scheduler` and
# `Bonny.Server.Reconciler` behaviours. `nodes/1`, `field_selector/0` and
# `conn/0` are overridable; `select_node_for_pod/2` must be provided by the user.
defmacro __using__(opts) do
  # `bind_quoted` binds `opts` as a plain variable inside the quoted code
  # (and disables `unquote` there).
  quote bind_quoted: [opts: opts] do
    @behaviour Bonny.Server.Scheduler
    @behaviour Bonny.Server.Reconciler
    @name opts[:name] || Macro.to_string(__MODULE__)
    @doc "Scheduler name"
    @impl Bonny.Server.Scheduler
    def name(), do: @name
    @doc "Kubernetes HTTP API `fieldSelector`."
    @impl Bonny.Server.Scheduler
    def field_selector(), do: Bonny.Server.Scheduler.field_selector(@name)
    @doc "List of nodes available to this scheduler."
    @impl Bonny.Server.Scheduler
    def nodes(conn), do: Bonny.Server.Scheduler.nodes(conn)
    # Child spec for supervising the scheduler: builds a pod list operation over
    # all namespaces restricted by `field_selector/0`, obtains a stream for it
    # from `Bonny.Server.Reconciler.get_stream/3`, and hands that stream to
    # `Bonny.Server.AsyncStreamRunner.child_spec/1`.
    @spec child_spec(keyword()) :: Supervisor.child_spec()
    def child_spec(args \\ []) do
      list_operation =
        K8s.Client.list("v1", :pods, namespace: :all)
        |> Map.put(:query_params, fieldSelector: field_selector())
      # Local call, so an overridden `conn/0` is used; resolved when the child
      # spec is built.
      conn = conn()
      # `Keyword.put/3` replaces any `:stream` or `:termination_delay` the caller
      # passed in `args`. NOTE(review): 5_000 is presumably milliseconds — confirm
      # against Bonny.Server.AsyncStreamRunner.
      args
      |> Keyword.put(
        :stream,
        Bonny.Server.Reconciler.get_stream(__MODULE__, conn, list_operation)
      )
      |> Keyword.put(:termination_delay, 5_000)
      |> Bonny.Server.AsyncStreamRunner.child_spec()
    end
    defdelegate conn(), to: Bonny.Config
    defoverridable nodes: 1, field_selector: 0, conn: 0
    # Reconciler callback; presumably invoked for each pod in the stream.
    # Scheduling itself lives in `Bonny.Server.Scheduler.reconcile/2`.
    @impl Bonny.Server.Reconciler
    def reconcile(pod), do: Bonny.Server.Scheduler.reconcile(__MODULE__, pod)
  end
end
@doc """
Schedules a single unscheduled `pod` with `scheduler`.

Fetches candidate nodes through `scheduler.nodes/1` (so an overridden
`c:nodes/1` is honoured), asks `c:select_node_for_pod/2` for a node and binds
the pod to it.

Returns `:ok` on success, `{:error, :no_node_selected}` when the scheduler
selects `nil`, or the `{:error, reason}` produced by fetching nodes or binding.
"""
@spec reconcile(module(), map()) :: :ok | {:error, any()}
def reconcile(scheduler, pod) do
  # Resolve the connection once and reuse it for both the node lookup and the bind.
  conn = scheduler.conn()

  with {:ok, nodes} <- scheduler.nodes(conn),
       {:ok, node} <- select_node(scheduler, pod, nodes),
       {:ok, _binding} <- Bonny.Server.Scheduler.bind(conn, pod, node) do
    :ok
  end
end

# Tags the scheduler's choice so an empty selection (e.g. `List.first([])`)
# halts the `with` chain instead of attempting to bind the pod to `nil`.
@spec select_node(module(), map(), Enumerable.t()) ::
        {:ok, map()} | {:error, :no_node_selected}
defp select_node(scheduler, pod, nodes) do
  case scheduler.select_node_for_pod(pod, nodes) do
    nil -> {:error, :no_node_selected}
    node -> {:ok, node}
  end
end
@doc """
Builds the Kubernetes API `fieldSelector` value for pods that name
`scheduler_name` in `spec.schedulerName` and have an empty `spec.nodeName`,
i.e. unbound pods waiting on that scheduler.
"""
@spec field_selector(binary) :: binary
def field_selector(scheduler_name) do
  # Each term is one `field=value` requirement; Kubernetes ANDs comma-separated terms.
  ["spec.schedulerName=#{scheduler_name}", "spec.nodeName="]
  |> Enum.join(",")
end
@doc """
Binds `pod` to `node`.

`apiVersion` and `kind` on the pod are overwritten with `"v1"` and `"pod"`
before delegating to `Bonny.Server.Scheduler.Binding.create/3`.
"""
# NOTE(review): presumably needed because pod items from a list response omit
# `apiVersion`/`kind` — confirm.
@spec bind(K8s.Conn.t(), map(), map()) :: {:ok, map} | {:error, atom}
def bind(conn, pod, node) do
  typed_pod = Map.merge(pod, %{"apiVersion" => "v1", "kind" => "pod"})
  Bonny.Server.Scheduler.Binding.create(conn, typed_pod, node)
end
@doc """
Returns a list of all nodes in the cluster.

Lists `v1` nodes via `K8s.Client.stream/2` and materializes the result into a
list. Success is logged at debug level; on failure the error is logged and
returned unchanged as `{:error, error}`.
"""
@spec nodes(K8s.Conn.t()) :: {:ok, list(map())} | {:error, any()}
def nodes(conn) do
  op = K8s.Client.list("v1", :nodes)
  # Logger metadata must be a keyword list; passing a map makes Logger raise
  # (it merges metadata with `Keyword.merge/2`).
  metadata = [operation: op, library: :bonny]

  case K8s.Client.stream(conn, op) do
    {:ok, stream} ->
      # Materialize before logging success: the stream may fetch pages lazily.
      nodes = Enum.to_list(stream)
      Logger.debug("Scheduler fetching nodes succeeded", metadata)
      {:ok, nodes}

    {:error, error} ->
      Logger.error("Scheduler fetching nodes failed", Keyword.put(metadata, :error, error))
      {:error, error}
  end
end
end