diff --git a/Makefile b/Makefile
index 1aa2b1e..5de2a11 100644
--- a/Makefile
+++ b/Makefile
@@ -126,7 +126,7 @@ export INTEGRATION_GOMEGA_DEFAULT_CONSISTENTLY_DURATION=2s
 INTEGRATION_PAUSE_IMAGE=k8s.gcr.io/pause:3.2
 INTEGRATION_KIND_KUBECNOFIG = $(DEV_TOOL_PREFIX)/.kubeconfig
 INTEGRATION_KIND_CONF=./hack/integration/kind.conf
-INTEGRATION_NODE_IMAGE ?= kindest/node:v1.25.3
+INTEGRATION_NODE_IMAGE ?= kindest/node:v1.30.0
 integration-setup:
 	$(KIND) get clusters | grep kube-throttler-integration 2>&1 >/dev/null \
 	|| $(KIND) create cluster --name=kube-throttler-integration \
diff --git a/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go b/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
index cdc67f1..7e0fd24 100644
--- a/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
+++ b/pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
@@ -27,7 +27,7 @@ type ClusterThrottleSpec struct {
 	Selector ClusterThrottleSelector `json:"selector,omitempty"`
 }
 
-func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
+func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, usedByGroup ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
 	threshold := thr.Spec.Threshold
 	if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
 		threshold = thr.Status.CalculatedThreshold.Threshold
@@ -46,11 +46,16 @@ func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAm
 		return CheckThrottleStatusActive
 	}
 
-	used := ResourceAmount{}.Add(thr.Status.Used).Add(ResourceAmountOfPod(pod)).Add(reservedResourceAmount)
-	if threshold.IsThrottled(used, isThrottledOnEqual).IsThrottledFor(pod) {
+	usedWithPod := alreadyUsed.Add(ResourceAmountOfPod(pod))
+	if threshold.IsThrottled(usedWithPod, isThrottledOnEqual).IsThrottledFor(pod) {
 		return CheckThrottleStatusInsufficient
 	}
 
+	usedWithGroup := usedWithPod.Add(usedByGroup)
+	if threshold.IsThrottled(usedWithGroup, isThrottledOnEqual).IsThrottledFor(pod) {
+		return CheckThrottleStatusInsufficientIncludingPodGroup
+	}
+
 	return CheckThrottleStatusNotThrottled
 }
diff --git a/pkg/apis/schedule/v1alpha1/throttle_types.go b/pkg/apis/schedule/v1alpha1/throttle_types.go
index 9a7ce47..f3d3124 100644
--- a/pkg/apis/schedule/v1alpha1/throttle_types.go
+++ b/pkg/apis/schedule/v1alpha1/throttle_types.go
@@ -119,10 +119,11 @@ type ThrottleStatus struct {
 type CheckThrottleStatus string
 
 var (
-	CheckThrottleStatusNotThrottled                CheckThrottleStatus = "not-throttled"
-	CheckThrottleStatusActive                      CheckThrottleStatus = "active"
-	CheckThrottleStatusInsufficient                CheckThrottleStatus = "insufficient"
-	CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
+	CheckThrottleStatusNotThrottled                  CheckThrottleStatus = "not-throttled"
+	CheckThrottleStatusActive                        CheckThrottleStatus = "active"
+	CheckThrottleStatusInsufficient                  CheckThrottleStatus = "insufficient"
+	CheckThrottleStatusInsufficientIncludingPodGroup CheckThrottleStatus = "insufficient-including-pod-group"
+	CheckThrottleStatusPodRequestsExceedsThreshold   CheckThrottleStatus = "pod-requests-exceeds-threshold"
 )
 
 func (thr Throttle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
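The two API changes above add a new `CheckThrottleStatus` value and teach `ClusterThrottle.CheckThrottledFor` to compare the threshold twice: once with only the scheduling pod's requests added, and once more with the requests of its still-unscheduled pod group added. A minimal sketch of that decision ladder follows; it uses plain integers instead of the project's `ResourceAmount`, and omits the `active` / `pod-requests-exceeds-threshold` branches and the `isThrottledOnEqual` flag.

```go
// Minimal sketch of the new decision ladder in CheckThrottledFor, using plain
// integers instead of ResourceAmount. The real method also handles the
// "active" and "pod-requests-exceeds-threshold" statuses, per-resource
// comparison, and the isThrottledOnEqual flag; those are omitted here.
package main

import "fmt"

type checkStatus string

const (
	notThrottled               checkStatus = "not-throttled"
	insufficient               checkStatus = "insufficient"
	insufficientIncludingGroup checkStatus = "insufficient-including-pod-group"
)

// check compares the threshold first against used+pod (reported as
// "insufficient") and only then against used+pod+group (reported with the
// new "insufficient-including-pod-group" status).
func check(threshold, used, pod, group int64) checkStatus {
	if used+pod > threshold {
		return insufficient
	}
	if used+pod+group > threshold {
		return insufficientIncludingGroup
	}
	return notThrottled
}

func main() {
	// threshold=4000m CPU, 2000m already used/reserved, the pod asks 1000m and
	// its unscheduled group members ask another 2000m: the pod alone fits,
	// the group as a whole does not.
	fmt.Println(check(4000, 2000, 1000, 2000)) // insufficient-including-pod-group
}
```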
diff --git a/pkg/controllers/clusterthrottle_controller.go b/pkg/controllers/clusterthrottle_controller.go
index 3b8f1ce..6517f6a 100644
--- a/pkg/controllers/clusterthrottle_controller.go
+++ b/pkg/controllers/clusterthrottle_controller.go
@@ -378,27 +378,81 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *corev1.Pod,
 func (c *ClusterThrottleController) CheckThrottled(
 	pod *corev1.Pod,
 	isThrottledOnEqual bool,
+	groupNameAnnotation string,
 ) (
-	[]schedulev1alpha1.ClusterThrottle,
-	[]schedulev1alpha1.ClusterThrottle,
-	[]schedulev1alpha1.ClusterThrottle,
-	[]schedulev1alpha1.ClusterThrottle,
-	error,
+	alreadyThrottled []schedulev1alpha1.ClusterThrottle,
+	insufficient []schedulev1alpha1.ClusterThrottle,
+	insufficientIncludingGroup []schedulev1alpha1.ClusterThrottle,
+	podRequestsExceedsThreshold []schedulev1alpha1.ClusterThrottle,
+	affected []schedulev1alpha1.ClusterThrottle,
+	_ error,
 ) {
 	throttles, err := c.affectedClusterThrottles(pod)
 	if err != nil {
-		return nil, nil, nil, nil, err
+		return nil, nil, nil, nil, nil, err
 	}
-	affected := []schedulev1alpha1.ClusterThrottle{}
-	alreadyThrottled := []schedulev1alpha1.ClusterThrottle{}
-	insufficient := []schedulev1alpha1.ClusterThrottle{}
-	podRequestsExceedsThreshold := []schedulev1alpha1.ClusterThrottle{}
+
+	// Fetch the not-yet-scheduled pods that carry the same group-name annotation value as this pod.
+	var podGroup []*corev1.Pod
+	if groupNameAnnotation != "" {
+		groupName, isGroup := pod.Annotations[groupNameAnnotation]
+		if isGroup {
+			candidatePods, err := c.podInformer.Lister().Pods(pod.Namespace).List(labels.Everything())
+			if err != nil {
+				return nil, nil, nil, nil, nil, err
+			}
+
+			for _, candidatePod := range candidatePods {
+				if isScheduled(candidatePod) {
+					continue
+				}
+
+				if gn, ok := candidatePod.Annotations[groupNameAnnotation]; !ok || gn != groupName {
+					continue
+				}
+
+				// Don't count the scheduling pod itself.
+				if candidatePod.UID == pod.UID {
+					continue
+				}
+
+				podGroup = append(podGroup, candidatePod)
+			}
+		}
+	}
+
 	for _, thr := range throttles {
 		affected = append(affected, *thr)
 		reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})
+
+		requestedByGroup := schedulev1alpha1.ResourceAmount{}
+		for _, groupPod := range podGroup {
+			ns, err := c.namespaceInformer.Lister().Get(groupPod.Namespace)
+			if err != nil {
+				return nil, nil, nil, nil, nil, err
+			}
+
+			// Skip group pods that already hold a reservation for this throttle; their requests are counted in the reserved resource amount instead.
+			thrnn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
+			if c.cache.exist(thrnn, groupPod) {
+				continue
+			}
+
+			match, err := thr.Spec.Selector.MatchesToPod(groupPod, ns)
+			if err != nil {
+				return nil, nil, nil, nil, nil, err
+			}
+			if !match {
+				continue
+			}
+
+			requestedByGroup = requestedByGroup.Add(schedulev1alpha1.ResourceAmountOfPod(groupPod))
+		}
+
 		checkStatus := thr.CheckThrottledFor(
 			pod,
 			reservedAmt,
+			requestedByGroup,
 			isThrottledOnEqual,
 		)
 		klog.V(3).InfoS("CheckThrottled result",
@@ -408,6 +462,7 @@ func (c *ClusterThrottleController) CheckThrottled(
 			"Threashold", thr.Status.CalculatedThreshold.Threshold,
 			"RequestedByPod", schedulev1alpha1.ResourceAmountOfPod(pod),
 			"UsedInClusterThrottle", thr.Status.Used,
+			"RequestedByPodGroup", requestedByGroup,
 			"ReservedAmountInScheduler", reservedAmt,
 			"ReservedPodsInScheduler", strings.Join(sets.List(reservedPodNNs), ","),
 			"AmountForCheck", schedulev1alpha1.ResourceAmount{}.Add(thr.Status.Used).Add(schedulev1alpha1.ResourceAmountOfPod(pod)).Add(reservedAmt),
@@ -417,11 +472,13 @@ func (c *ClusterThrottleController) CheckThrottled(
 		case schedulev1alpha1.CheckThrottleStatusActive:
 			alreadyThrottled = append(alreadyThrottled, *thr)
 		case schedulev1alpha1.CheckThrottleStatusInsufficient:
 			insufficient = append(insufficient, *thr)
+		case schedulev1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup:
+			insufficientIncludingGroup = append(insufficientIncludingGroup, *thr)
 		case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
 			podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
 		}
 	}
-	return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
+	return alreadyThrottled, insufficient, insufficientIncludingGroup, podRequestsExceedsThreshold, affected, nil
 }
 
 // mustSetupEventHandler sets up event handlers. If something wrong happens, it will panic.
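The membership rule encoded by the controller loop above can be restated as a small standalone helper. This is a hypothetical sketch, not code from the PR: `groupMembers` is an illustrative name, and `isScheduled` is approximated by a non-empty `Spec.NodeName`.

```go
// Hypothetical helper (not part of the PR) restating the rule the controller
// applies when it builds podGroup: same namespace listing, same value under
// the configured group-name annotation, not yet scheduled, and not the pod
// currently being checked.
package sketch

import corev1 "k8s.io/api/core/v1"

func groupMembers(pod *corev1.Pod, candidates []*corev1.Pod, groupNameAnnotation string) []*corev1.Pod {
	if groupNameAnnotation == "" {
		return nil
	}
	groupName, ok := pod.Annotations[groupNameAnnotation]
	if !ok {
		return nil
	}
	var members []*corev1.Pod
	for _, c := range candidates {
		if c.Spec.NodeName != "" {
			// Already scheduled: its usage is reflected in the throttle's Status.Used, not here.
			continue
		}
		if gn, found := c.Annotations[groupNameAnnotation]; !found || gn != groupName {
			continue
		}
		if c.UID == pod.UID {
			// The scheduling pod itself is added separately in the check.
			continue
		}
		members = append(members, c)
	}
	return members
}
```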
diff --git a/pkg/controllers/reserved_resource_amounts.go b/pkg/controllers/reserved_resource_amounts.go
index 41677a9..75d052f 100644
--- a/pkg/controllers/reserved_resource_amounts.go
+++ b/pkg/controllers/reserved_resource_amounts.go
@@ -63,6 +63,16 @@ func (c *reservedResourceAmounts) getPodResourceAmountMap(nn types.NamespacedNam
 	return c.cache[nn]
 }
 
+func (c *reservedResourceAmounts) exist(nn types.NamespacedName, pod *corev1.Pod) bool {
+	c.keyMutex.LockKey(nn.String())
+	defer func() {
+		_ = c.keyMutex.UnlockKey(nn.String())
+	}()
+
+	m := c.getPodResourceAmountMap(nn)
+	return m.exist(types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
+}
+
 func (c *reservedResourceAmounts) addPod(nn types.NamespacedName, pod *corev1.Pod) bool {
 	c.keyMutex.LockKey(nn.String())
 	defer func() {
@@ -145,6 +155,11 @@ func (c podResourceAmountMap) removeByNN(nn types.NamespacedName) bool {
 	return ok
 }
 
+func (c podResourceAmountMap) exist(nn types.NamespacedName) bool {
+	_, ok := c[nn]
+	return ok
+}
+
 func (c podResourceAmountMap) totalResoruceAmount() (schedulev1alpha1.ResourceAmount, sets.Set[string]) {
 	result := schedulev1alpha1.ResourceAmount{}
 	nns := sets.New[string]()
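The new `exist` accessor is what prevents double counting: a group pod that already holds a reservation for a throttle is part of `reservedAmt`, so it must not also be added to `requestedByGroup`. A minimal sketch of that guard, with a plain `int64` standing in for the project's `ResourceAmount`:

```go
// Minimal sketch of the double-counting guard enabled by the new exist()
// accessor. A plain int64 stands in for schedulev1alpha1.ResourceAmount.
package sketch

import "k8s.io/apimachinery/pkg/types"

type reservations map[types.NamespacedName]int64

func (r reservations) exist(nn types.NamespacedName) bool {
	_, ok := r[nn]
	return ok
}

// groupRequest sums only the group pods that have no reservation recorded yet;
// the others are already reflected in the reserved amount.
func groupRequest(r reservations, groupPodRequests map[types.NamespacedName]int64) int64 {
	var total int64
	for nn, req := range groupPodRequests {
		if r.exist(nn) {
			continue // already counted via the reservation cache
		}
		total += req
	}
	return total
}
```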
diff --git a/pkg/scheduler_plugin/plugin.go b/pkg/scheduler_plugin/plugin.go
index 8807980..6bfa00c 100644
--- a/pkg/scheduler_plugin/plugin.go
+++ b/pkg/scheduler_plugin/plugin.go
@@ -46,9 +46,10 @@ const (
 )
 
 type KubeThrottler struct {
-	fh                 framework.Handle
-	throttleCtr        *controllers.ThrottleController
-	clusterThrottleCtr *controllers.ClusterThrottleController
+	fh                  framework.Handle
+	throttleCtr         *controllers.ThrottleController
+	clusterThrottleCtr  *controllers.ClusterThrottleController
+	groupNameAnnotation string
 }
 
 var _ framework.PreFilterPlugin = &KubeThrottler{}
@@ -135,9 +136,10 @@ func NewPlugin(ctx context.Context, configuration runtime.Object, fh framework.H
 	}
 
 	pl := KubeThrottler{
-		fh:                 fh,
-		throttleCtr:        throttleController,
-		clusterThrottleCtr: clusterthrottleController,
+		fh:                  fh,
+		throttleCtr:         throttleController,
+		clusterThrottleCtr:  clusterthrottleController,
+		groupNameAnnotation: kubeThrottlerArgs.GroupNameAnnotation,
 	}
 
 	return &pl, nil
@@ -160,7 +162,7 @@ func (pl *KubeThrottler) PreFilter(
 		"#AffectedThrottles", len(thrAffected),
 	)
 
-	clthrActive, clthrInsufficient, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
+	clthrActive, clthrInsufficient, clthrInsufficientGroup, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false, pl.groupNameAnnotation)
 	if err != nil {
 		return nil, framework.NewStatus(framework.Error, err.Error())
 	}
@@ -168,12 +170,13 @@ func (pl *KubeThrottler) PreFilter(
 		"Pod", pod.Namespace+"/"+pod.Name,
 		"#ActiveClusterThrottles", len(clthrActive),
 		"#InsufficientClusterThrottles", len(clthrInsufficient),
+		"#InsufficientClusterThrottlesIncludingGroup", len(clthrInsufficientGroup),
 		"#PodRequestsExceedsThresholdClusterThrottles", len(clthrPodRequestsExceeds),
 		"#AffectedClusterThrottles", len(clThrAffected),
 	)
 
 	if len(thrActive)+len(thrInsufficient)+len(thrPodRequestsExceeds)+
-		len(clthrActive)+len(clthrInsufficient)+len(clthrPodRequestsExceeds) == 0 {
+		len(clthrActive)+len(clthrInsufficient)+len(clthrInsufficientGroup)+len(clthrPodRequestsExceeds) == 0 {
 		return nil, framework.NewStatus(framework.Success)
 	}
 
@@ -206,6 +209,9 @@ func (pl *KubeThrottler) PreFilter(
 	if len(clthrInsufficient) != 0 {
 		reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficient, strings.Join(clusterThrottleNames(clthrInsufficient), ",")))
 	}
+	if len(clthrInsufficientGroup) != 0 {
+		reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup, strings.Join(clusterThrottleNames(clthrInsufficientGroup), ",")))
+	}
 	if len(thrInsufficient) != 0 {
 		reasons = append(reasons, fmt.Sprintf("throttle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficient, strings.Join(throttleNames(thrInsufficient), ",")))
 	}
diff --git a/pkg/scheduler_plugin/plugin_args.go b/pkg/scheduler_plugin/plugin_args.go
index 7cdc1b3..cef2e3a 100644
--- a/pkg/scheduler_plugin/plugin_args.go
+++ b/pkg/scheduler_plugin/plugin_args.go
@@ -37,6 +37,7 @@ type KubeThrottlerPluginArgs struct {
 	TargetSchedulerName  string `json:"targetSchedulerName"`
 	ControllerThrediness int    `json:"controllerThrediness"`
 	NumKeyMutex          int    `json:"numKeyMutex"`
+	GroupNameAnnotation  string `json:"groupNameAnnotation"`
 }
 
 func DecodePluginArgs(configuration runtime.Object) (*KubeThrottlerPluginArgs, error) {
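With `groupNameAnnotation` exposed through `KubeThrottlerPluginArgs`, grouping is purely annotation-driven. The sketch below shows what a grouped pod looks like from the plugin's point of view; the annotation key is the one the integration test configures, but any key set in the plugin args would work.

```go
// Sketch of how pods opt into group-aware checking: they share a value under
// the configured annotation key. The key below matches the integration test's
// constant; in a real deployment it is whatever groupNameAnnotation is set to
// in KubeThrottlerPluginArgs.
package sketch

import (
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const exampleGroupAnnotation = "scheduling.k8s.pfn.io/group-name"

// newGroupedPod builds a bare pod carrying the group annotation; e.g.
// newGroupedPod("job-a-0", "job-a") and newGroupedPod("job-a-1", "job-a") form
// one group, so checking either pod in PreFilter also counts the other's
// requests against the clusterthrottle threshold.
func newGroupedPod(name, group string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:        name,
			Namespace:   "default",
			Annotations: map[string]string{exampleGroupAnnotation: group},
		},
	}
}
```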
diff --git a/test/integration/clusterthrottle_test.go b/test/integration/clusterthrottle_test.go
index a8aa4a8..4d8b319 100644
--- a/test/integration/clusterthrottle_test.go
+++ b/test/integration/clusterthrottle_test.go
@@ -20,11 +20,15 @@ package integration
 import (
 	"context"
 	"fmt"
+	"time"
 
 	"github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
 	. "github.com/onsi/ginkgo"
 	. "github.com/onsi/gomega"
 	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	k8stypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 )
 
 var _ = Describe("Clusterthrottle Test", func() {
@@ -163,6 +167,73 @@ var _ = Describe("Clusterthrottle Test", func() {
 		})
 	})
 
+	When("Group Pods", func() {
+		var (
+			podPassedGroup    []*corev1.Pod
+			thr               *v1alpha1.ClusterThrottle
+			podThrottledGroup []*corev1.Pod
+		)
+		BeforeEach(func() {
+			thr = MustCreateClusterThrottle(ctx,
+				MakeClusterThrottle(throttleName).Selector(DefaultNs, throttleKey, throttleName).
+					ThresholdPod(4).
+					ThresholdCpu("4").
+					Obj(),
+			)
+
+			for i := 0; i < 2; i++ {
+				podPassedGroup = append(podPassedGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("passed-pod%d", i), "100m").Annotation(groupNameAnnotation, "passed").Label(throttleKey, throttleName).Obj()))
+			}
+
+			err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, time.Second, false, func(context.Context) (bool, error) {
+				for _, pod := range podPassedGroup {
+					got, err := k8sCli.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
+					if err != nil {
+						return false, err
+					}
+					if got.Spec.NodeName == "" {
+						return false, nil
+					}
+				}
+
+				return true, nil
+			})
+			Expect(err).NotTo(HaveOccurred())
+
+			// Create all the Pods gated first so that none of them reaches PreFilter before every Pod in the group exists.
+			for i := 0; i < 3; i++ {
+				pod := MakePod(DefaultNs, fmt.Sprintf("throttled-pod%d", i), "100m").Annotation(groupNameAnnotation, "throttled").Label(throttleKey, throttleName).Obj()
+				pod.Spec.SchedulingGates = []corev1.PodSchedulingGate{{Name: "group"}}
+				pod = MustCreatePod(ctx, pod)
+				podThrottledGroup = append(podThrottledGroup, pod)
+			}
+
+			for _, pod := range podThrottledGroup {
+				_, err := k8sCli.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, k8stypes.StrategicMergePatchType, []byte(`{"spec":{"schedulingGates":null}}`), metav1.PatchOptions{})
+				Expect(err).NotTo(HaveOccurred())
+			}
+		})
+		It("should not schedule podThrottledGroup", func() {
+			for _, pod := range podPassedGroup {
+				Eventually(PodIsScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
+			}
+			for _, pod := range podThrottledGroup {
+				Eventually(MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup)).Should(Succeed())
+			}
+			Eventually(AsyncAll(
+				WakeupBackoffPod(ctx),
+				ClusterThottleHasStatus(
+					ctx, thr.Name,
+					ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
+					ClthrOpts.WithUsedPod(len(podPassedGroup)),
+					ClthrOpts.WithUsedCpuReq(fmt.Sprintf("%dm", len(podPassedGroup)*100)),
+					ClthrOpts.WithPodThrottled(false),
+					ClthrOpts.WithCpuThrottled(false),
+				),
+			)).Should(Succeed())
+		})
+	})
+
 	When("Many pods are created at once", func() {
 		var thr *v1alpha1.ClusterThrottle
 		var scheduled = make([]*corev1.Pod, 20)
diff --git a/test/integration/integration_suite_test.go b/test/integration/integration_suite_test.go
index a2b897e..68fa04b 100644
--- a/test/integration/integration_suite_test.go
+++ b/test/integration/integration_suite_test.go
@@ -40,9 +40,10 @@ import (
 )
 
 const (
-	SchedulerName = "kube-throttler-integration"
-	ThrottlerName = "kube-throttler"
-	DefaultNs     = "default"
+	SchedulerName       = "kube-throttler-integration"
+	ThrottlerName       = "kube-throttler"
+	DefaultNs           = "default"
+	groupNameAnnotation = "scheduling.k8s.pfn.io/group-name"
 )
 
 var (
@@ -111,12 +112,13 @@ func mustStartKubeThrottler() {
         name: %s
         targetSchedulerName: %s
         kubeconfig: %s
+        groupNameAnnotation: %s
         controllerThrediness: 64
         numKeyMutex: 128
 `,
-		kubeConfigPath, // clientConnection.kubeconfig
-		SchedulerName,  // prifiles[0].scedulerName
-		ThrottlerName, SchedulerName, kubeConfigPath, // profiles[0].pluginConfig[0].args
+		kubeConfigPath, // clientConnection.kubeconfig
+		SchedulerName,  // profiles[0].schedulerName
+		ThrottlerName, SchedulerName, kubeConfigPath, groupNameAnnotation, // profiles[0].pluginConfig[0].args
 	),
 	))
 	Expect(err).NotTo(HaveOccurred())