ClusterThrottles support PodGroup aka Gang (#12)
* ClusterThrottles support PodGroup aka Gang

Signed-off-by: utam0k <utam0k@preferred.jp>

* Fix CI of goreleaser

Signed-off-by: utam0k <utam0k@preferred.jp>

* fixup! ClusterThrottles support PodGroup aka Gang

Co-authored-by: Hidehito Yabuuchi <hyab@preferred.jp>
Signed-off-by: utam0k <utam0k@preferred.jp>

* Don't schedule until all group pods are created in the integration test

Signed-off-by: utam0k <utam0k@preferred.jp>

* fixup! ClusterThrottles support PodGroup aka Gang

Signed-off-by: utam0k <utam0k@preferred.jp>

* Add the comment

Signed-off-by: utam0k <utam0k@preferred.jp>

* Update test/integration/clusterthrottle_test.go

Co-authored-by: Hidehito Yabuuchi <hdht.ybuc+github@gmail.com>

* Update test/integration/clusterthrottle_test.go

* fixup! ClusterThrottles support PodGroup aka Gang

Signed-off-by: utam0k <utam0k@preferred.jp>

---------

Signed-off-by: utam0k <utam0k@preferred.jp>
Co-authored-by: Hidehito Yabuuchi <hyab@preferred.jp>
utam0k and ordovicia authored Sep 6, 2024
1 parent 6a0f27f commit 00aba44
Showing 9 changed files with 191 additions and 33 deletions.
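Before the diffs, a minimal sketch of what a "gang" looks like from the plugin's point of view: pods carrying the same value under the configured group-name annotation are accounted for together when a ClusterThrottle decides whether they fit under its threshold. The annotation key below matches the one used in the integration suite at the bottom of this commit; everything else (package layout, pod names, namespace) is illustrative only.

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// groupNameAnnotation is the annotation key the plugin is assumed to be
// configured with (see groupNameAnnotation in integration_suite_test.go below).
const groupNameAnnotation = "scheduling.k8s.pfn.io/group-name"

// gangMember builds a pod that belongs to the gang named group. All pods
// carrying the same annotation value are counted together by the throttle.
func gangMember(name, group string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Namespace:   "default",
			Name:        name,
			Annotations: map[string]string{groupNameAnnotation: group},
		},
	}
}

func main() {
	for i := 0; i < 3; i++ {
		p := gangMember(fmt.Sprintf("worker-%d", i), "job-a")
		fmt.Println(p.Name, p.Annotations[groupNameAnnotation])
	}
}
```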
2 changes: 1 addition & 1 deletion Makefile
@@ -126,7 +126,7 @@ export INTEGRATION_GOMEGA_DEFAULT_CONSISTENTLY_DURATION=2s
INTEGRATION_PAUSE_IMAGE=k8s.gcr.io/pause:3.2
INTEGRATION_KIND_KUBECNOFIG = $(DEV_TOOL_PREFIX)/.kubeconfig
INTEGRATION_KIND_CONF=./hack/integration/kind.conf
INTEGRATION_NODE_IMAGE ?= kindest/node:v1.25.3
INTEGRATION_NODE_IMAGE ?= kindest/node:v1.30.0
integration-setup:
$(KIND) get clusters | grep kube-throttler-integration 2>&1 >/dev/null \
|| $(KIND) create cluster --name=kube-throttler-integration \
11 changes: 8 additions & 3 deletions pkg/apis/schedule/v1alpha1/clusterthrottle_types.go
@@ -27,7 +27,7 @@ type ClusterThrottleSpec struct {
Selector ClusterThrottleSelector `json:"selector,omitempty"`
}

func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, usedByGroup ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
threshold := thr.Spec.Threshold
if !thr.Status.CalculatedThreshold.CalculatedAt.Time.IsZero() {
threshold = thr.Status.CalculatedThreshold.Threshold
@@ -46,11 +46,16 @@ func (thr ClusterThrottle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAm
return CheckThrottleStatusActive
}

used := ResourceAmount{}.Add(thr.Status.Used).Add(ResourceAmountOfPod(pod)).Add(reservedResourceAmount)
if threshold.IsThrottled(used, isThrottledOnEqual).IsThrottledFor(pod) {
usedWithPod := alreadyUsed.Add(ResourceAmountOfPod(pod))
if threshold.IsThrottled(usedWithPod, isThrottledOnEqual).IsThrottledFor(pod) {
return CheckThrottleStatusInsufficient
}

usedWithGroup := usedWithPod.Add(usedByGroup)
if threshold.IsThrottled(usedWithGroup, isThrottledOnEqual).IsThrottledFor(pod) {
return CheckThrottleStatusInsufficientIncludingPodGroup
}

return CheckThrottleStatusNotThrottled
}

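The extra usedByGroup parameter turns the check above into a two-stage comparison: first the pod alone on top of current usage and reservations, then the pod plus its not-yet-counted group members, which is what produces the new insufficient-including-pod-group status. A toy, single-resource sketch of that ordering (plain integers instead of ResourceAmount; not the library API):

```go
package main

import "fmt"

// Status strings mirror the CheckThrottleStatus values in the diff above.
const (
	notThrottled               = "not-throttled"
	insufficient               = "insufficient"
	insufficientIncludingGroup = "insufficient-including-pod-group"
)

// check is a toy, single-resource version of CheckThrottledFor: it first
// tests the pod alone against the threshold, then the pod plus the rest of
// its group. Only the second test can yield the group-aware status.
func check(threshold, used, reserved, pod, group int64) string {
	usedWithPod := used + reserved + pod
	if usedWithPod > threshold {
		return insufficient
	}
	usedWithGroup := usedWithPod + group
	if usedWithGroup > threshold {
		return insufficientIncludingGroup
	}
	return notThrottled
}

func main() {
	// threshold=4 pods, 2 already used, 0 reserved, scheduling 1 pod that
	// belongs to a gang with 2 more unscheduled members.
	fmt.Println(check(4, 2, 0, 1, 2)) // insufficient-including-pod-group
}
```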
9 changes: 5 additions & 4 deletions pkg/apis/schedule/v1alpha1/throttle_types.go
@@ -119,10 +119,11 @@ type ThrottleStatus struct {
type CheckThrottleStatus string

var (
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
CheckThrottleStatusNotThrottled CheckThrottleStatus = "not-throttled"
CheckThrottleStatusActive CheckThrottleStatus = "active"
CheckThrottleStatusInsufficient CheckThrottleStatus = "insufficient"
CheckThrottleStatusInsufficientIncludingPodGroup CheckThrottleStatus = "insufficient-including-pod-group"
CheckThrottleStatusPodRequestsExceedsThreshold CheckThrottleStatus = "pod-requests-exceeds-threshold"
)

func (thr Throttle) CheckThrottledFor(pod *corev1.Pod, reservedResourceAmount ResourceAmount, isThrottledOnEqual bool) CheckThrottleStatus {
79 changes: 68 additions & 11 deletions pkg/controllers/clusterthrottle_controller.go
@@ -378,27 +378,81 @@ func (c *ClusterThrottleController) UnReserveOnClusterThrottle(pod *corev1.Pod,
func (c *ClusterThrottleController) CheckThrottled(
pod *corev1.Pod,
isThrottledOnEqual bool,
groupNameAnnotation string,
) (
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
[]schedulev1alpha1.ClusterThrottle,
error,
alreadyThrottled []schedulev1alpha1.ClusterThrottle,
insufficient []schedulev1alpha1.ClusterThrottle,
insufficientIncludingGroup []schedulev1alpha1.ClusterThrottle,
podRequestsExceedsThreshold []schedulev1alpha1.ClusterThrottle,
affected []schedulev1alpha1.ClusterThrottle,
_ error,
) {
throttles, err := c.affectedClusterThrottles(pod)
if err != nil {
return nil, nil, nil, nil, err
return nil, nil, nil, nil, nil, err
}
affected := []schedulev1alpha1.ClusterThrottle{}
alreadyThrottled := []schedulev1alpha1.ClusterThrottle{}
insufficient := []schedulev1alpha1.ClusterThrottle{}
podRequestsExceedsThreshold := []schedulev1alpha1.ClusterThrottle{}

// Fetch the pods which have group name's annotation
var podGroup []*corev1.Pod
if groupNameAnnotation != "" {
groupName, isGroup := pod.Annotations[groupNameAnnotation]
if isGroup {
candidatePods, err := c.podInformer.Lister().Pods(pod.Namespace).List(labels.Everything())
if err != nil {
return nil, nil, nil, nil, nil, err
}

for _, candidatePod := range candidatePods {
if isScheduled(candidatePod) {
continue
}

if gn, ok := candidatePod.Annotations[groupNameAnnotation]; !ok || gn != groupName {
continue
}

// Don't count the scheduling pod itself.
if candidatePod.UID == pod.UID {
continue
}

podGroup = append(podGroup, candidatePod)
}
}
}

for _, thr := range throttles {
affected = append(affected, *thr)
reservedAmt, reservedPodNNs := c.cache.reservedResourceAmount(types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name})

requestedByGroup := schedulev1alpha1.ResourceAmount{}
for _, groupPod := range podGroup {
ns, err := c.namespaceInformer.Lister().Get(groupPod.Namespace)
if err != nil {
return nil, nil, nil, nil, nil, err
}

// If a pod of a group is already counted, skip it because it'll be counted as a reserved resource amount.
thrnn := types.NamespacedName{Namespace: thr.Namespace, Name: thr.Name}
if c.cache.exist(thrnn, groupPod) {
continue
}

match, err := thr.Spec.Selector.MatchesToPod(groupPod, ns)
if err != nil {
return nil, nil, nil, nil, nil, err
}
if !match {
continue
}

requestedByGroup = requestedByGroup.Add(schedulev1alpha1.ResourceAmountOfPod(groupPod))
}

checkStatus := thr.CheckThrottledFor(
pod,
reservedAmt,
requestedByGroup,
isThrottledOnEqual,
)
klog.V(3).InfoS("CheckThrottled result",
@@ -408,6 +462,7 @@ func (c *ClusterThrottleController) CheckThrottled(
"Threashold", thr.Status.CalculatedThreshold.Threshold,
"RequestedByPod", schedulev1alpha1.ResourceAmountOfPod(pod),
"UsedInClusterThrottle", thr.Status.Used,
"ReqeustedByPodGroup", requestedByGroup,
"ReservedAmountInScheduler", reservedAmt,
"ReservedPodsInScheduler", strings.Join(sets.List(reservedPodNNs), ","),
"AmountForCheck", schedulev1alpha1.ResourceAmount{}.Add(thr.Status.Used).Add(schedulev1alpha1.ResourceAmountOfPod(pod)).Add(reservedAmt),
@@ -417,11 +472,13 @@ func (c *ClusterThrottleController) CheckThrottled(
alreadyThrottled = append(alreadyThrottled, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficient:
insufficient = append(insufficient, *thr)
case schedulev1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup:
insufficientIncludingGroup = append(insufficientIncludingGroup, *thr)
case schedulev1alpha1.CheckThrottleStatusPodRequestsExceedsThreshold:
podRequestsExceedsThreshold = append(podRequestsExceedsThreshold, *thr)
}
}
return alreadyThrottled, insufficient, podRequestsExceedsThreshold, affected, nil
return alreadyThrottled, insufficient, insufficientIncludingGroup, podRequestsExceedsThreshold, affected, nil
}

// mustSetupEventHandler sets up event handlers. If something wrong happens, it will panic.
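The controller's group handling boils down to: list the unscheduled pods in the same namespace, keep the ones sharing the group annotation (excluding the pod being scheduled and anything already reserved or not matching the throttle's selector), and sum their requests. A stripped-down sketch of that filter, with a simplified pod type standing in for the informer/lister machinery:

```go
package main

import "fmt"

// pod is a simplified stand-in for *corev1.Pod: just the fields the group
// filter looks at.
type pod struct {
	uid         string
	nodeName    string // non-empty means already scheduled
	annotations map[string]string
	cpuMilli    int64 // requested CPU, in millicores
}

// groupRequest sums the CPU requests of the unscheduled members of self's
// group, excluding self. Pods already counted as reserved (cache.exist) or
// not matching the throttle selector would also be skipped in the real
// controller; both checks are omitted here for brevity.
func groupRequest(self pod, all []pod, annotationKey string) int64 {
	group, ok := self.annotations[annotationKey]
	if !ok {
		return 0
	}
	var total int64
	for _, p := range all {
		if p.uid == self.uid || p.nodeName != "" {
			continue // skip the scheduling pod itself and scheduled pods
		}
		if p.annotations[annotationKey] != group {
			continue // not in the same gang
		}
		total += p.cpuMilli
	}
	return total
}

func main() {
	key := "scheduling.k8s.pfn.io/group-name"
	self := pod{uid: "a", annotations: map[string]string{key: "job-a"}, cpuMilli: 100}
	others := []pod{
		self,
		{uid: "b", annotations: map[string]string{key: "job-a"}, cpuMilli: 100},
		{uid: "c", annotations: map[string]string{key: "job-b"}, cpuMilli: 100},
	}
	fmt.Println(groupRequest(self, others, key)) // 100
}
```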
15 changes: 15 additions & 0 deletions pkg/controllers/reserved_resource_amounts.go
@@ -63,6 +63,16 @@ func (c *reservedResourceAmounts) getPodResourceAmountMap(nn types.NamespacedNam
return c.cache[nn]
}

func (c *reservedResourceAmounts) exist(nn types.NamespacedName, pod *corev1.Pod) bool {
c.keyMutex.LockKey(nn.String())
defer func() {
_ = c.keyMutex.UnlockKey(nn.String())
}()

m := c.getPodResourceAmountMap(nn)
return m.exist(types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name})
}

func (c *reservedResourceAmounts) addPod(nn types.NamespacedName, pod *corev1.Pod) bool {
c.keyMutex.LockKey(nn.String())
defer func() {
@@ -145,6 +155,11 @@ func (c podResourceAmountMap) removeByNN(nn types.NamespacedName) bool {
return ok
}

func (c podResourceAmountMap) exist(nn types.NamespacedName) bool {
_, ok := c[nn]
return ok
}

func (c podResourceAmountMap) totalResoruceAmount() (schedulev1alpha1.ResourceAmount, sets.Set[string]) {
result := schedulev1alpha1.ResourceAmount{}
nns := sets.New[string]()
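The new exist helpers let the group accounting in CheckThrottled skip pods that are already counted as reserved, so a gang member is never added twice. A stripped-down sketch of that bookkeeping with a single mutex (the real code locks per throttle key via keyMutex):

```go
package main

import (
	"fmt"
	"sync"
)

// reserved is a toy version of reservedResourceAmounts: it only tracks which
// pods are reserved under each throttle, guarded by one mutex.
type reserved struct {
	mu    sync.Mutex
	byThr map[string]map[string]struct{} // throttle name -> pod names
}

func (r *reserved) add(thr, pod string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if r.byThr[thr] == nil {
		r.byThr[thr] = map[string]struct{}{}
	}
	r.byThr[thr][pod] = struct{}{}
}

// exist reports whether pod is already reserved under thr, so the group
// accounting does not count its requests a second time.
func (r *reserved) exist(thr, pod string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	_, ok := r.byThr[thr][pod]
	return ok
}

func main() {
	r := &reserved{byThr: map[string]map[string]struct{}{}}
	r.add("clusterthrottle-a", "default/pod-1")
	fmt.Println(r.exist("clusterthrottle-a", "default/pod-1")) // true
	fmt.Println(r.exist("clusterthrottle-a", "default/pod-2")) // false
}
```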
22 changes: 14 additions & 8 deletions pkg/scheduler_plugin/plugin.go
@@ -46,9 +46,10 @@ const (
)

type KubeThrottler struct {
fh framework.Handle
throttleCtr *controllers.ThrottleController
clusterThrottleCtr *controllers.ClusterThrottleController
fh framework.Handle
throttleCtr *controllers.ThrottleController
clusterThrottleCtr *controllers.ClusterThrottleController
groupNameAnnotation string
}

var _ framework.PreFilterPlugin = &KubeThrottler{}
@@ -135,9 +136,10 @@ func NewPlugin(ctx context.Context, configuration runtime.Object, fh framework.H
}

pl := KubeThrottler{
fh: fh,
throttleCtr: throttleController,
clusterThrottleCtr: clusterthrottleController,
fh: fh,
throttleCtr: throttleController,
clusterThrottleCtr: clusterthrottleController,
groupNameAnnotation: kubeThrottlerArgs.GroupNameAnnotation,
}

return &pl, nil
Expand All @@ -160,20 +162,21 @@ func (pl *KubeThrottler) PreFilter(
"#AffectedThrottles", len(thrAffected),
)

clthrActive, clthrInsufficient, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false)
clthrActive, clthrInsufficient, clthrInsufficientGroup, clthrPodRequestsExceeds, clThrAffected, err := pl.clusterThrottleCtr.CheckThrottled(pod, false, pl.groupNameAnnotation)
if err != nil {
return nil, framework.NewStatus(framework.Error, err.Error())
}
klog.V(2).InfoS("PreFilter: clusterthrottle check result",
"Pod", pod.Namespace+"/"+pod.Name,
"#ActiveClusterThrottles", len(clthrActive),
"#InsufficientClusterThrottles", len(clthrInsufficient),
"#InsufficientClusterThrottlesIncludingGroup", len(clthrInsufficientGroup),
"#PodRequestsExceedsThresholdClusterThrottles", len(clthrPodRequestsExceeds),
"#AffectedClusterThrottles", len(clThrAffected),
)

if len(thrActive)+len(thrInsufficient)+len(thrPodRequestsExceeds)+
len(clthrActive)+len(clthrInsufficient)+len(clthrPodRequestsExceeds) == 0 {
len(clthrActive)+len(clthrInsufficient)+len(clthrInsufficientGroup)+len(clthrPodRequestsExceeds) == 0 {
return nil, framework.NewStatus(framework.Success)
}

@@ -206,6 +209,9 @@ func (pl *KubeThrottler) PreFilter(
if len(clthrInsufficient) != 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficient, strings.Join(clusterThrottleNames(clthrInsufficient), ",")))
}
if len(clthrInsufficientGroup) != 0 {
reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup, strings.Join(clusterThrottleNames(clthrInsufficientGroup), ",")))
}
if len(thrInsufficient) != 0 {
reasons = append(reasons, fmt.Sprintf("throttle[%s]=%s", schedulev1alpha1.CheckThrottleStatusInsufficient, strings.Join(throttleNames(thrInsufficient), ",")))
}
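With the new status, an Unschedulable result from PreFilter can now report clusterthrottle[insufficient-including-pod-group]=<names> alongside the existing reasons. The sketch below only shows how such reason strings could be assembled from the format strings above; how the plugin actually joins and returns them is not visible in this hunk, so the final joining is an assumption.

```go
package main

import (
	"fmt"
	"strings"
)

// buildReasons mimics the reason assembly in PreFilter: one entry per
// non-empty category, joined into a single unschedulable message.
// The " " separator between entries is illustrative.
func buildReasons(insufficient, insufficientGroup []string) string {
	var reasons []string
	if len(insufficient) != 0 {
		reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", "insufficient", strings.Join(insufficient, ",")))
	}
	if len(insufficientGroup) != 0 {
		reasons = append(reasons, fmt.Sprintf("clusterthrottle[%s]=%s", "insufficient-including-pod-group", strings.Join(insufficientGroup, ",")))
	}
	return strings.Join(reasons, " ")
}

func main() {
	fmt.Println(buildReasons(nil, []string{"clusterthrottle-gpu"}))
	// clusterthrottle[insufficient-including-pod-group]=clusterthrottle-gpu
}
```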
1 change: 1 addition & 0 deletions pkg/scheduler_plugin/plugin_args.go
@@ -37,6 +37,7 @@ type KubeThrottlerPluginArgs struct {
TargetSchedulerName string `json:"targetSchedulerName"`
ControllerThrediness int `json:"controllerThrediness"`
NumKeyMutex int `json:"numKeyMutex"`
GroupNameAnnotation string `json:"groupNameAnnotation"`
}

func DecodePluginArgs(configuration runtime.Object) (*KubeThrottlerPluginArgs, error) {
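The new groupNameAnnotation field is what makes the feature opt-in: when it is left empty, CheckThrottled skips the group lookup entirely. A minimal sketch of decoding an args payload that sets it (only two of the fields from the struct above; JSON is used for brevity, while the real configuration is YAML as shown in the integration suite below):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// KubeThrottlerPluginArgs mirrors two of the fields shown in plugin_args.go,
// including the new groupNameAnnotation knob; the remaining fields are omitted.
type KubeThrottlerPluginArgs struct {
	TargetSchedulerName string `json:"targetSchedulerName"`
	GroupNameAnnotation string `json:"groupNameAnnotation"`
}

func main() {
	raw := []byte(`{
  "targetSchedulerName": "kube-throttler-integration",
  "groupNameAnnotation": "scheduling.k8s.pfn.io/group-name"
}`)
	var args KubeThrottlerPluginArgs
	if err := json.Unmarshal(raw, &args); err != nil {
		panic(err)
	}
	// An empty GroupNameAnnotation disables gang-aware accounting.
	fmt.Println(args.GroupNameAnnotation != "")
}
```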
71 changes: 71 additions & 0 deletions test/integration/clusterthrottle_test.go
@@ -20,11 +20,15 @@ package integration
import (
"context"
"fmt"
"time"

"github.com/everpeace/kube-throttler/pkg/apis/schedule/v1alpha1"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
)

var _ = Describe("Clusterthrottle Test", func() {
@@ -163,6 +167,73 @@ var _ = Describe("Clusterthrottle Test", func() {
})
})

When("Group Pods", func() {
var (
podPassedGroup []*corev1.Pod
thr *v1alpha1.ClusterThrottle
podThrottledGroup []*corev1.Pod
)
BeforeEach(func() {
thr = MustCreateClusterThrottle(ctx,
MakeClusterThrottle(throttleName).Selector(DefaultNs, throttleKey, throttleName).
ThresholdPod(4).
ThresholdCpu("4").
Obj(),
)

for i := 0; i < 2; i++ {
podPassedGroup = append(podPassedGroup, MustCreatePod(ctx, MakePod(DefaultNs, fmt.Sprintf("passed-pod%d", i), "100m").Annotation(groupNameAnnotation, "passed").Label(throttleKey, throttleName).Obj()))
}

err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, time.Second, false, func(context.Context) (bool, error) {
for _, pod := range podPassedGroup {
got, err := k8sCli.CoreV1().Pods(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
if err != nil {
return false, err
}
if got.Spec.NodeName == "" {
return false, nil
}
}

return true, nil
})
Expect(err).NotTo(HaveOccurred())

// Make all the Pods once to prevent them from reaching PreFilter one by one before all the Pods in the PodGroup are created.
for i := 0; i < 3; i++ {
pod := MakePod(DefaultNs, fmt.Sprintf("throttled-pod%d", i), "100m").Annotation(groupNameAnnotation, "throttled").Label(throttleKey, throttleName).Obj()
pod.Spec.SchedulingGates = []corev1.PodSchedulingGate{{Name: "group"}}
pod = MustCreatePod(ctx, pod)
podThrottledGroup = append(podThrottledGroup, pod)
}

for _, pod := range podThrottledGroup {
_, err := k8sCli.CoreV1().Pods(pod.Namespace).Patch(ctx, pod.Name, k8stypes.StrategicMergePatchType, []byte(`{"spec":{"schedulingGates":null}}`), metav1.PatchOptions{})
Expect(err).NotTo(HaveOccurred())
}
})
It("should not schedule podThrottledGroup", func() {
for _, pod := range podPassedGroup {
Eventually(PodIsScheduled(ctx, DefaultNs, pod.Name)).Should(Succeed())
}
for _, pod := range podThrottledGroup {
Eventually(MustPodFailedScheduling(ctx, DefaultNs, pod.Name, v1alpha1.CheckThrottleStatusInsufficientIncludingPodGroup)).Should(Succeed())
}
Eventually(AsyncAll(
WakeupBackoffPod(ctx),
ClusterThottleHasStatus(
ctx, thr.Name,
ClthrOpts.WithCalculatedThreshold(thr.Spec.Threshold),
ClthrOpts.WithUsedPod(len(podPassedGroup)),
ClthrOpts.WithUsedCpuReq(fmt.Sprintf("%dm", len(podPassedGroup)*100)),
ClthrOpts.WithPodThrottled(false),
ClthrOpts.WithCpuThrottled(false),
),
)).Should(Succeed())
})
})

When("Many pods are created at once", func() {
var thr *v1alpha1.ClusterThrottle
var scheduled = make([]*corev1.Pod, 20)
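The test relies on pod scheduling gates to make sure the whole gang exists before any member reaches PreFilter: every member is created gated, then the gates are cleared with the strategic-merge patch {"spec":{"schedulingGates":null}}. A minimal sketch of such a gated pod, assuming a Kubernetes version with PodSchedulingGates available (the Makefile change above bumps the kind node image to v1.30.0):

```go
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// gatedPod returns a pod held back by a scheduling gate, so the scheduler
// (and therefore PreFilter) does not consider it until the gate is removed.
func gatedPod(name string) *corev1.Pod {
	return &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: name},
		Spec: corev1.PodSpec{
			SchedulingGates: []corev1.PodSchedulingGate{{Name: "group"}},
		},
	}
}

func main() {
	p := gatedPod("throttled-pod0")
	fmt.Println(len(p.Spec.SchedulingGates) > 0) // true: not yet schedulable
}
```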
14 changes: 8 additions & 6 deletions test/integration/integration_suite_test.go
@@ -40,9 +40,10 @@ import (
)

const (
SchedulerName = "kube-throttler-integration"
ThrottlerName = "kube-throttler"
DefaultNs = "default"
SchedulerName = "kube-throttler-integration"
ThrottlerName = "kube-throttler"
DefaultNs = "default"
groupNameAnnotation = "scheduling.k8s.pfn.io/group-name"
)

var (
@@ -111,12 +112,13 @@ func mustStartKubeThrottler() {
name: %s
targetSchedulerName: %s
kubeconfig: %s
groupNameAnnotation: %s
controllerThrediness: 64
numKeyMutex: 128
`,
kubeConfigPath, // clientConnection.kubeconfig
SchedulerName, // prifiles[0].scedulerName
ThrottlerName, SchedulerName, kubeConfigPath, // profiles[0].pluginConfig[0].args
kubeConfigPath, // clientConnection.kubeconfig
SchedulerName, // prifiles[0].scedulerName
ThrottlerName, SchedulerName, kubeConfigPath, groupNameAnnotation, // profiles[0].pluginConfig[0].args
),
))
Expect(err).NotTo(HaveOccurred())
