Fix waiting for all pods having specified labels to be Ready #20315

Open · wants to merge 3 commits into master
21 changes: 14 additions & 7 deletions cmd/minikube/cmd/start.go
@@ -47,15 +47,14 @@ import (
"github.com/spf13/viper"
"golang.org/x/text/cases"
"golang.org/x/text/language"
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/firewall"
netutil "k8s.io/minikube/pkg/network"

"k8s.io/klog/v2"

cmdcfg "k8s.io/minikube/cmd/minikube/cmd/config"
"k8s.io/minikube/pkg/drivers/kic/oci"
"k8s.io/minikube/pkg/minikube/bootstrapper/bsutil"
"k8s.io/minikube/pkg/minikube/bootstrapper/bsutil/kverify"
"k8s.io/minikube/pkg/minikube/bootstrapper/images"
"k8s.io/minikube/pkg/minikube/command"
"k8s.io/minikube/pkg/minikube/config"
"k8s.io/minikube/pkg/minikube/constants"
"k8s.io/minikube/pkg/minikube/cruntime"
@@ -64,6 +63,7 @@ import (
"k8s.io/minikube/pkg/minikube/driver"
"k8s.io/minikube/pkg/minikube/driver/auxdriver"
"k8s.io/minikube/pkg/minikube/exit"
"k8s.io/minikube/pkg/minikube/firewall"
"k8s.io/minikube/pkg/minikube/kubeconfig"
"k8s.io/minikube/pkg/minikube/localpath"
"k8s.io/minikube/pkg/minikube/machine"
@@ -74,13 +74,14 @@ import (
"k8s.io/minikube/pkg/minikube/out/register"
"k8s.io/minikube/pkg/minikube/pause"
"k8s.io/minikube/pkg/minikube/reason"
"k8s.io/minikube/pkg/minikube/style"
pkgtrace "k8s.io/minikube/pkg/trace"

"k8s.io/minikube/pkg/minikube/registry"
"k8s.io/minikube/pkg/minikube/style"
"k8s.io/minikube/pkg/minikube/translate"
netutil "k8s.io/minikube/pkg/network"
pkgtrace "k8s.io/minikube/pkg/trace"
"k8s.io/minikube/pkg/util"
"k8s.io/minikube/pkg/version"
kconst "k8s.io/minikube/third_party/kubeadm/app/constants"
)

type versionJSON struct {
@@ -287,6 +288,12 @@ func runStart(cmd *cobra.Command, _ []string) {
exit.Error(reason.GuestStart, "failed to start node", err)
}

if starter.Cfg.VerifyComponents[kverify.ExtraKey] {
if err := kverify.WaitExtra(ClusterFlagValue(), kverify.CorePodsLabels, kconst.DefaultControlPlaneTimeout); err != nil {
exit.Message(reason.GuestStart, "extra waiting: {{.error}}", out.V{"error": err})
}
}

if err := showKubectlInfo(kubeconfig, starter.Node.KubernetesVersion, starter.Node.ContainerRuntime, starter.Cfg.Name); err != nil {
klog.Errorf("kubectl info: %v", err)
}
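The new block above only runs the extra wait when the user asked for the "extra" component. A minimal standalone sketch of that gating pattern, using a hypothetical map in place of starter.Cfg.VerifyComponents (in minikube the map is populated from the --wait flag):

package main

import (
	"fmt"
	"time"
)

func main() {
	// hypothetical stand-in for starter.Cfg.VerifyComponents,
	// normally populated from --wait (e.g. --wait=extra or --wait=all)
	verifyComponents := map[string]bool{"extra": true}

	if verifyComponents["extra"] {
		// in the real code this is kverify.WaitExtra(profile, labels, timeout)
		fmt.Printf("would wait up to %v for labeled kube-system pods\n", 10*time.Minute)
	}
}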
2 changes: 1 addition & 1 deletion pkg/minikube/bootstrapper/bsutil/kverify/kverify.go
@@ -52,7 +52,7 @@ var (
// DefaultWaitList is list of all default components to wait for. only names to be used for start flags.
DefaultWaitList = []string{APIServerWaitKey, SystemPodsWaitKey}
// AllComponentsList is the list of all valid component keys to wait for. only names to be used for start flags.
AllComponentsList = []string{APIServerWaitKey, SystemPodsWaitKey, DefaultSAWaitKey, AppsRunningKey, NodeReadyKey, KubeletKey}
AllComponentsList = []string{APIServerWaitKey, SystemPodsWaitKey, DefaultSAWaitKey, AppsRunningKey, NodeReadyKey, KubeletKey, ExtraKey}
// AppsRunningList running list are valid k8s-app components to wait for them to be running
AppsRunningList = []string{
"kube-dns", // coredns
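Since ExtraKey is now part of AllComponentsList, a --wait value naming it should pass flag validation. A sketch of that membership check; the literal string values here are assumptions matching the exported keys above, and the real flag parsing lives elsewhere in cmd/minikube:

package main

import "fmt"

// assumed string values of the exported keys in AllComponentsList
var allComponents = []string{
	"apiserver", "system_pods", "default_sa",
	"apps_running", "node_ready", "kubelet", "extra",
}

func isValidWaitComponent(name string) bool {
	for _, c := range allComponents {
		if c == name {
			return true
		}
	}
	return false
}

func main() {
	fmt.Println(isValidWaitComponent("extra")) // true after this change
}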
42 changes: 23 additions & 19 deletions pkg/minikube/bootstrapper/bsutil/kverify/node_ready.go
@@ -38,43 +38,47 @@ func WaitNodeCondition(cs *kubernetes.Clientset, name string, condition core.NodeConditionType, timeout time.Duration) error {
klog.Infof("duration metric: took %s for node %q to be %q ...", time.Since(start), name, condition)
}()

lap := time.Now()
checkCondition := func(_ context.Context) (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("timed out waiting %v for node %q to be %q (will not retry!)", timeout, name, condition)
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

status, reason := nodeConditionStatus(cs, name, condition)
lap := time.Now()
checkCondition := func(ctx context.Context) (bool, error) {
status, err := nodeConditionStatus(ctx, cs, name, condition)
// done if node has condition
if status == core.ConditionTrue {
klog.Info(reason)
klog.Infof("node %q is %q", name, condition)
return true, nil
}
// retry in all other cases, decrease log spam
if time.Since(lap) > (2 * time.Second) {
klog.Info(reason)
if err != nil {
klog.Warningf("error getting node %q condition %q status (will retry): %v", name, condition, err)
} else {
klog.Warningf("node %q has %q:%q status (will retry)", name, condition, status)
}
lap = time.Now()
}
return false, nil
}
if err := wait.PollUntilContextTimeout(context.Background(), kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, true, checkCondition); err != nil {
return fmt.Errorf("waitNodeCondition: %w", err)
if err := wait.PollUntilContextCancel(ctx, kconst.APICallRetryInterval, true, checkCondition); err != nil {
return fmt.Errorf("WaitNodeCondition: %w", err)
}

return nil
}

// nodeConditionStatus returns if node is in specified condition and verbose reason.
func nodeConditionStatus(cs *kubernetes.Clientset, name string, condition core.NodeConditionType) (status core.ConditionStatus, reason string) {
node, err := cs.CoreV1().Nodes().Get(context.Background(), name, meta.GetOptions{})
// nodeConditionStatus checks if node exists and returns condition status.
func nodeConditionStatus(ctx context.Context, cs *kubernetes.Clientset, name string, condition core.NodeConditionType) (core.ConditionStatus, error) {
node, err := cs.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{})
if err != nil {
return core.ConditionUnknown, fmt.Sprintf("error getting node %q: %v", name, err)
return core.ConditionUnknown, err
}

// check if node has the condition
for _, c := range node.Status.Conditions {
if c.Type == condition {
return c.Status, fmt.Sprintf("node %q has status %q:%q", node.Name, condition, c.Status)
return c.Status, nil
}
}

// assume transient condition
return core.ConditionFalse, fmt.Sprintf("node %q doesn't have %q status: %+v", node.Name, condition, node.Status)
// assume transient error
return core.ConditionUnknown, fmt.Errorf("node %q does not have %q condition type: %+v", name, condition, node.Status)
}
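Both rewritten helpers in this file switch from PollUntilContextTimeout plus hand-rolled time.Since checks to PollUntilContextCancel bounded by a single caller-supplied context deadline. A self-contained sketch of that pattern, with a stand-in condition in place of the KubeAPI query:

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// one context deadline bounds the whole wait
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	start := time.Now()
	check := func(ctx context.Context) (bool, error) {
		// stand-in for the KubeAPI condition query; true ends the poll
		return time.Since(start) > time.Second, nil
	}
	// immediate=true runs check once before waiting the first interval
	if err := wait.PollUntilContextCancel(ctx, 200*time.Millisecond, true, check); err != nil {
		fmt.Println("wait failed:", err)
		return
	}
	fmt.Println("condition met after", time.Since(start))
}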
148 changes: 73 additions & 75 deletions pkg/minikube/bootstrapper/bsutil/kverify/pod_ready.go
@@ -19,140 +19,138 @@ package kverify

import (
"context"
"errors"
"fmt"
"time"

core "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"k8s.io/minikube/pkg/kapi"
kconst "k8s.io/minikube/third_party/kubeadm/app/constants"
)

// WaitExtra calls waitPodCondition for all system-critical pods including those with specified labels.
func WaitExtra(cs *kubernetes.Clientset, labels []string, timeout time.Duration) error {
klog.Infof("extra waiting up to %v for all system-critical pods including labels %v to be %q ...", timeout, labels, core.PodReady)
// WaitExtra calls waitPodCondition for all (and at least one) kube-system pods having one of the specified labels to be "Ready" on the profile's cluster.
func WaitExtra(profile string, labels []string, timeout time.Duration) error {
klog.Infof("extra waiting up to %v for all %q pods having one of %v labels to be %q ...", timeout, meta.NamespaceSystem, labels, core.PodReady)
start := time.Now()
defer func() {
klog.Infof("duration metric: took %s for extra waiting for all system-critical and pods with labels %v to be %q ...", time.Since(start), labels, core.PodReady)
klog.Infof("duration metric: took %s for extra waiting for all %q pods having one of %v labels to be %q ...", time.Since(start), meta.NamespaceSystem, labels, core.PodReady)
}()

pods, err := cs.CoreV1().Pods(meta.NamespaceSystem).List(context.Background(), meta.ListOptions{})
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

cs, err := kapi.Client(profile)
if err != nil {
return fmt.Errorf("error listing pods in %q namespace: %w", meta.NamespaceSystem, err)
return fmt.Errorf("failed to get kube client: %v", err)
}

for _, pod := range pods.Items {
if time.Since(start) > timeout {
return fmt.Errorf("timed out waiting %v for all system-critical and pods with labels %v to be %q", timeout, labels, core.NodeReady)
// podsReady is a poll function that checks that all (and at least one) pods in the namespace having the label are Ready
var label string
podsReady := func(ctx context.Context) (bool, error) {
pods, err := cs.CoreV1().Pods(meta.NamespaceSystem).List(ctx, meta.ListOptions{LabelSelector: label})
if err != nil {
klog.Warningf("error listing pods in %q namespace with %q label, will retry: %v", meta.NamespaceSystem, label, err)
return false, nil
}

for k, v := range pod.Labels {
label := fmt.Sprintf("%s=%s", k, v)
match := false
for _, l := range labels {
if l == label {
match = true
break
}
}
// ignore system-critical pods' non-essential labels
if !match && pod.Namespace != meta.NamespaceSystem && k != "k8s-app" && k != "component" {
continue
}
if match || pod.Spec.PriorityClassName == "system-cluster-critical" || pod.Spec.PriorityClassName == "system-node-critical" {
if err := waitPodCondition(cs, pod.Name, pod.Namespace, core.PodReady, timeout); err != nil {
klog.Errorf("WaitExtra: %v", err)
}
break
if len(pods.Items) == 0 {
klog.Warningf("no pods in %q namespace with %q label found, will retry", meta.NamespaceSystem, label)
return false, nil
}
for _, pod := range pods.Items {
if err := waitPodCondition(ctx, cs, pod.Name, pod.Namespace, core.PodReady); err != nil {
klog.Warningf("not all pods in %q namespace with %q label are %q, will retry: %v", meta.NamespaceSystem, label, core.PodReady, err)
return false, nil
}
}
return true, nil
}
for _, l := range labels {
label = l
if err := wait.PollUntilContextCancel(ctx, kconst.APICallRetryInterval, true, podsReady); err != nil {
return fmt.Errorf("WaitExtra: %w", err)
}
}

return nil
}
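WaitExtra now takes a profile name and builds its own clientset via kapi.Client, rather than receiving a *kubernetes.Clientset. A hypothetical call site against a running cluster on the default "minikube" profile, passing the same arguments as the start.go hunk above:

package main

import (
	"k8s.io/klog/v2"
	"k8s.io/minikube/pkg/minikube/bootstrapper/bsutil/kverify"
	kconst "k8s.io/minikube/third_party/kubeadm/app/constants"
)

func main() {
	// "minikube" is the default profile name; a running cluster is assumed
	if err := kverify.WaitExtra("minikube", kverify.CorePodsLabels, kconst.DefaultControlPlaneTimeout); err != nil {
		klog.Errorf("extra waiting: %v", err)
	}
}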

// waitPodCondition waits for specified condition of podName in a namespace.
func waitPodCondition(cs *kubernetes.Clientset, name, namespace string, condition core.PodConditionType, timeout time.Duration) error {
klog.Infof("waiting up to %v for pod %q in %q namespace to be %q ...", timeout, name, namespace, condition)
// waitPodCondition waits for specified condition of pod name in namespace.
func waitPodCondition(ctx context.Context, cs *kubernetes.Clientset, name, namespace string, condition core.PodConditionType) error {
klog.Infof("waiting for pod %q in %q namespace to be %q or be gone ...", name, namespace, condition)
start := time.Now()
defer func() {
klog.Infof("duration metric: took %s for pod %q in %q namespace to be %q ...", time.Since(start), name, namespace, condition)
klog.Infof("duration metric: took %s for pod %q in %q namespace to be %q or be gone ...", time.Since(start), name, namespace, condition)
}()

lap := time.Now()
checkCondition := func(_ context.Context) (bool, error) {
if time.Since(start) > timeout {
return false, fmt.Errorf("timed out waiting %v for pod %q in %q namespace to be %q (will not retry!)", timeout, name, namespace, condition)
}

status, reason := podConditionStatus(cs, name, namespace, condition)
checkCondition := func(ctx context.Context) (bool, error) {
status, err := podConditionStatus(ctx, cs, name, namespace, condition)
// done if pod has condition
if status == core.ConditionTrue {
klog.Info(reason)
klog.Infof("pod %q is %q", name, condition)
return true, nil
}
// return immediately: status == core.ConditionUnknown
if status == core.ConditionUnknown {
klog.Info(reason)
return false, errors.New(reason)
// back off if pod or node is gone
if kerrors.IsNotFound(err) || status == core.TaintNodeUnreachable {
klog.Infof("pod %q in %q namespace is gone: %v", name, namespace, err)
return true, nil
}
// reduce log spam
// retry in all other cases, decrease log spam
if time.Since(lap) > (2 * time.Second) {
klog.Info(reason)
klog.Warningf("pod %q is not %q, error: %v", name, condition, err)
lap = time.Now()
}
// return immediately: status == core.ConditionFalse
return false, nil
}
if err := wait.PollUntilContextTimeout(context.Background(), kconst.APICallRetryInterval, kconst.DefaultControlPlaneTimeout, true, checkCondition); err != nil {
if err := wait.PollUntilContextCancel(ctx, kconst.APICallRetryInterval, true, checkCondition); err != nil {
return fmt.Errorf("waitPodCondition: %w", err)
}

return nil
}
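The "gone" branch above leans on apimachinery's typed NotFound errors to tell a deleted pod apart from a transient API failure. A small sketch of that check, constructing a synthetic error; kerrors is the same alias the diff imports:

package main

import (
	"fmt"

	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

func main() {
	// a synthetic NotFound error, as the API server returns for a deleted pod
	gone := kerrors.NewNotFound(schema.GroupResource{Resource: "pods"}, "coredns-abc123")
	fmt.Println(kerrors.IsNotFound(gone)) // true: stop waiting, the pod is gone
}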

// podConditionStatus returns if pod is in specified condition and verbose reason.
func podConditionStatus(cs *kubernetes.Clientset, name, namespace string, condition core.PodConditionType) (status core.ConditionStatus, reason string) {
pod, err := cs.CoreV1().Pods(namespace).Get(context.Background(), name, meta.GetOptions{})
// podConditionStatus returns if pod is in specified condition.
func podConditionStatus(ctx context.Context, cs *kubernetes.Clientset, name, namespace string, condition core.PodConditionType) (core.ConditionStatus, error) {
pod, err := cs.CoreV1().Pods(namespace).Get(ctx, name, meta.GetOptions{})
if err != nil {
return core.ConditionUnknown, fmt.Sprintf("error getting pod %q in %q namespace (skipping!): %v", name, namespace, err)
return core.ConditionUnknown, fmt.Errorf("getting pod %q in %q namespace (will retry): %w", name, namespace, err)
}

// check if underlying node is Ready - in case we got stale data about the pod
if pod.Spec.NodeName != "" {
if status, reason := nodeConditionStatus(cs, pod.Spec.NodeName, core.NodeReady); status != core.ConditionTrue {
return core.ConditionUnknown, fmt.Sprintf("node %q hosting pod %q in %q namespace is currently not %q (skipping!): %v", pod.Spec.NodeName, name, namespace, core.NodeReady, reason)
// check if pod is scheduled on any node
if pod.Spec.NodeName == "" {
return core.ConditionUnknown, fmt.Errorf("pod %q in %q namespace is not scheduled on any node (will retry): %+v", name, namespace, pod.Status)
}
// check if node exists and is Ready (KubeAPI)
nodeReadyStatus, err := nodeConditionStatus(ctx, cs, pod.Spec.NodeName, core.NodeReady)
if err != nil {
if kerrors.IsNotFound(err) {
return core.TaintNodeUnreachable, fmt.Errorf("node %q hosting pod %q is not found/running (skipping!): %v", pod.Spec.NodeName, name, err)
}
return core.ConditionUnknown, fmt.Errorf("node %q hosting pod %q is not %q (will retry): %v", pod.Spec.NodeName, name, core.NodeReady, err)
}

if pod.Status.Phase != core.PodRunning && pod.Status.Phase != core.PodPending {
return core.ConditionUnknown, fmt.Sprintf("pod %q in %q namespace has status phase %q (skipping!): %+v", pod.Name, pod.Namespace, pod.Status.Phase, pod.Status)
if nodeReadyStatus != core.ConditionTrue {
return core.ConditionUnknown, fmt.Errorf("node %q hosting pod %q is not %q (will retry)", pod.Spec.NodeName, name, core.NodeReady)
}

// check if pod has the condition
for _, c := range pod.Status.Conditions {
if c.Type == condition {
return c.Status, fmt.Sprintf("pod %q in %q namespace has status %q:%q", pod.Name, pod.Namespace, condition, c.Status)
return c.Status, nil
}
}

// assume transient condition
return core.ConditionFalse, fmt.Sprintf("pod %q in %q namespace doesn't have %q status: %+v", pod.Name, pod.Namespace, core.PodReady, pod.Status)
// assume transient error
return core.ConditionUnknown, fmt.Errorf("pod %q does not have %q condition type: %+v", name, condition, pod.Status)
}

// IsPodReady returns if pod is Ready and verbose reason.
func IsPodReady(pod *core.Pod) (ready bool, reason string) {
if pod.Status.Phase != core.PodRunning {
return false, fmt.Sprintf("pod %q in %q namespace is not Running: %+v", pod.Name, pod.Namespace, pod.Status)
}
// IsPodReady returns if pod is Ready.
func IsPodReady(pod *core.Pod) bool {
for _, c := range pod.Status.Conditions {
if c.Type == core.PodReady {
if c.Status != core.ConditionTrue {
return false, fmt.Sprintf("pod %q in %q namespace is not Ready: %+v", pod.Name, pod.Namespace, c)
}
return true, fmt.Sprintf("pod %q in %q namespace is Ready: %+v", pod.Name, pod.Namespace, c)
return c.Status == core.ConditionTrue
}
}
return false, fmt.Sprintf("pod %q in %q namespace does not have %q status: %+v", pod.Name, pod.Namespace, core.PodReady, pod.Status)
// assume transient error
return false
}
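IsPodReady now returns a plain bool instead of a (bool, string) pair. A minimal sketch of calling it with a synthetic pod, assuming the package import path shown in the diff headers above:

package main

import (
	"fmt"

	core "k8s.io/api/core/v1"
	"k8s.io/minikube/pkg/minikube/bootstrapper/bsutil/kverify"
)

func main() {
	pod := &core.Pod{
		Status: core.PodStatus{
			Conditions: []core.PodCondition{
				{Type: core.PodReady, Status: core.ConditionTrue},
			},
		},
	}
	fmt.Println(kverify.IsPodReady(pod)) // true
}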