defmodule K8s.Client.Runner.Wait do
@moduledoc """
Waiting functionality for `K8s.Client`.
Note: This is built using repeated GET operations rather than using a [watch](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.13/#watch-list-deployment-v1-apps) operation w/ `fieldSelector`.
"""
alias K8s.{Conn, Operation}
alias K8s.Client.Runner.{Base, Wait}
alias K8s.Operation.Error
@typedoc "A wait configuration"
@type t :: %__MODULE__{
timeout: pos_integer,
sleep: pos_integer,
eval: any | (any -> any),
find: list(binary) | (any -> any),
timeout_after: NaiveDateTime.t(),
processor: (map(), map() -> {:ok, map} | {:error, Error.t()})
}
defstruct [:timeout, :sleep, :eval, :find, :timeout_after, :processor]
@doc """
Continually perform a GET based operation until a condition is met.
## Example
This follow example will wait 60 seconds for the field `status.succeeded` to equal `1`.
```elixir
op = K8s.Client.get("batch/v1", :job, namespace: "default", name: "sleep")
opts = [find: ["status", "succeeded"], eval: 1, timeout: 60]
{:ok, conn} = K8s.Conn.from_file("test/support/kube-config.yaml")
resp = K8s.Client.Runner.Wait.run(conn, op, opts)
```
"""
@spec run(Conn.t(), Operation.t(), keyword()) ::
{:ok, map()} | {:error, :timeout | Error.t()}
def run(%Conn{} = conn, %Operation{method: :get} = op, opts) do
conditions =
Wait
|> struct(opts)
|> process_opts()
case conditions do
{:ok, opts} -> run_operation(conn, op, opts)
error -> error
end
end
def run(op, _, _),
do: {:error, %Error{message: "Only HTTP GET operations are supported. #{inspect(op)}"}}
@spec process_opts(Wait.t() | map) :: {:error, Error.t()} | {:ok, map}
defp process_opts(%Wait{eval: nil}), do: {:error, %Error{message: ":eval is required"}}
defp process_opts(%Wait{find: nil}), do: {:error, %Error{message: ":find is required"}}
defp process_opts(opts) when is_map(opts) do
timeout = Map.get(opts, :timeout) || 30
sleep = Map.get(opts, :sleep) || 1
now = NaiveDateTime.utc_now()
timeout_after = NaiveDateTime.add(now, timeout, :second)
processor = Map.get(opts, :processor) || (&Base.run/2)
processed =
opts
|> Map.put(:timeout, timeout)
|> Map.put(:sleep, sleep * 1000)
|> Map.put(:timeout_after, timeout_after)
|> Map.put(:processor, processor)
{:ok, processed}
end
@spec run_operation(Conn.t(), Operation.t(), Wait.t()) :: {:error, :timeout} | {:ok, any}
defp run_operation(
%Conn{} = conn,
%Operation{} = op,
%Wait{timeout_after: timeout_after} = opts
) do
case timed_out?(timeout_after) do
true -> {:error, :timeout}
false -> evaluate_operation(conn, op, opts)
end
end
@spec evaluate_operation(Conn.t(), Operation.t(), Wait.t()) :: {:error, :timeout} | {:ok, any}
defp evaluate_operation(
%Conn{} = conn,
%Operation{} = op,
%Wait{processor: processor, sleep: sleep, eval: eval, find: find} = opts
) do
with {:ok, resp} <- processor.(conn, op),
true <- satisfied?(resp, find, eval) do
{:ok, resp}
else
_not_satisfied ->
Process.sleep(sleep)
run_operation(conn, op, opts)
end
end
@spec satisfied?(map, function | list, any) :: boolean
defp satisfied?(resp = %{}, find, eval) when is_list(find) do
value = get_in(resp, find)
compare(value, eval)
end
defp satisfied?(resp = %{}, find, eval) when is_function(find) do
value = find.(resp)
compare(value, eval)
end
@spec compare(any, any) :: boolean
defp compare(value, eval) when not is_function(eval), do: value == eval
defp compare(value, eval) when is_function(eval), do: eval.(value)
@spec timed_out?(NaiveDateTime.t()) :: boolean
defp timed_out?(timeout_after) do
case NaiveDateTime.compare(NaiveDateTime.utc_now(), timeout_after) do
:gt -> true
_ -> false
end
end
end