Skip to content

Commit

Permalink
Scheduler: MAXFILLUP strategy will spread vreplicas across multiple pods
Browse files Browse the repository at this point in the history
The MAXFILLUP algorithm was using an affinity strategy, meaning that
it would prioritize adding new vreplicas to pods that already host vreplicas of the same resource.

However, the downside is that if one pod goes down or gets
re-scheduled, the entire resource would be down and would not produce
events. By spreading vreplicas across multiple real replicas (pods) we
guarantee better availability.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Oct 21, 2024
1 parent e6490c3 commit 8b054a8
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 33 deletions.
94 changes: 64 additions & 30 deletions pkg/scheduler/statefulset/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
clientappsv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
corev1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/utils/integer"
"knative.dev/pkg/logging"
"knative.dev/pkg/reconciler"

Expand Down Expand Up @@ -740,48 +740,67 @@ func (s *StatefulSetScheduler) removeReplicas(diff int32, placements []duckv1alp
}

func (s *StatefulSetScheduler) addReplicas(states *st.State, diff int32, placements []duckv1alpha1.Placement) ([]duckv1alpha1.Placement, int32) {
// Pod affinity algorithm: prefer adding replicas to existing pods before considering other replicas
if states.Replicas <= 0 {
return placements, diff
}

newPlacements := make([]duckv1alpha1.Placement, 0, len(placements))

// Add to existing
for i := 0; i < len(placements); i++ {
podName := placements[i].PodName
ordinal := st.OrdinalFromPodName(podName)
// Preserve existing placements
for _, p := range placements {
newPlacements = append(newPlacements, *p.DeepCopy())
}

// Is there space in PodName?
f := states.Free(ordinal)
if diff >= 0 && f > 0 {
allocation := integer.Int32Min(f, diff)
newPlacements = append(newPlacements, duckv1alpha1.Placement{
PodName: podName,
VReplicas: placements[i].VReplicas + allocation,
})
candidates := make([]string, states.Replicas)
lastIdx := states.Replicas - 1
existingPlacements := sets.New[string]()

diff -= allocation
states.SetFree(ordinal, f-allocation)
} else {
newPlacements = append(newPlacements, placements[i])
// De-prioritize existing placements pods, add existing placements to the tail of the candidates.
for _, placement := range placements {
// This should really never happen as placements are de-duped, however, better to handle
// edge cases in case the prerequisite doesn't hold in the future.
if existingPlacements.Has(placement.PodName) {
continue
}
candidates[lastIdx] = placement.PodName
lastIdx--
existingPlacements.Insert(placement.PodName)
}

if diff > 0 {
// Needs to allocate replicas to additional pods
for ordinal := int32(0); ordinal < s.replicas; ordinal++ {
f := states.Free(ordinal)
if f > 0 {
allocation := integer.Int32Min(f, diff)
newPlacements = append(newPlacements, duckv1alpha1.Placement{
PodName: st.PodNameFromOrdinal(s.statefulSetName, ordinal),
// Add all the ordinals to the candidates list.
// De-prioritize the last ordinals over lower ordinals so that we reduce the chances for compaction.
for ordinal := s.replicas - 1; ordinal >= 0; ordinal-- {
podName := st.PodNameFromOrdinal(states.StatefulSetName, ordinal)
if existingPlacements.Has(podName) {
continue
}
candidates[lastIdx] = podName
lastIdx--
}

// Spread replicas in as many candidates as possible.
foundFreeCandidate := true
for diff > 0 && foundFreeCandidate {
foundFreeCandidate = false
for _, podName := range candidates {
if diff <= 0 {
break
}

ordinal := st.OrdinalFromPodName(podName)
// Is there space?
if f := states.Free(ordinal); f > 0 {
foundFreeCandidate = true
allocation := int32(1)

newPlacements = upsertPlacements(newPlacements, duckv1alpha1.Placement{
PodName: st.PodNameFromOrdinal(states.StatefulSetName, ordinal),
VReplicas: allocation,
})

diff -= allocation
states.SetFree(ordinal, f-allocation)
}

if diff == 0 {
break
}
}
}

Expand Down Expand Up @@ -859,3 +878,18 @@ func (s *StatefulSetScheduler) Reserved() map[types.NamespacedName]map[string]in

return r
}

// upsertPlacements merges placement into placements, keyed by PodName.
//
// When an entry with the same PodName already exists, placement.VReplicas is
// added to the first such entry in place and the slice is returned unchanged in
// length. Otherwise placement is appended as a new entry.
//
// Callers must use the returned slice: an append may reallocate, while an
// in-place update mutates the element of the slice that was passed in.
func upsertPlacements(placements []duckv1alpha1.Placement, placement duckv1alpha1.Placement) []duckv1alpha1.Placement {
	for idx := range placements {
		existing := &placements[idx]
		if existing.PodName != placement.PodName {
			continue
		}
		// Only the first matching entry is updated.
		existing.VReplicas += placement.VReplicas
		return placements
	}
	// No entry for this pod yet: record it as a new placement.
	return append(placements, placement)
}
41 changes: 38 additions & 3 deletions pkg/scheduler/statefulset/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ func TestStatefulsetScheduler(t *testing.T) {
vreplicas: 1,
replicas: int32(0),
err: controller.NewRequeueAfter(5 * time.Second),
expected: []duckv1alpha1.Placement{},
schedulerPolicyType: scheduler.MAXFILLUP,
},
{
Expand Down Expand Up @@ -130,8 +129,44 @@ func TestStatefulsetScheduler(t *testing.T) {
vreplicas: 15,
replicas: int32(2),
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 10},
{PodName: "statefulset-name-1", VReplicas: 5},
{PodName: "statefulset-name-0", VReplicas: 8},
{PodName: "statefulset-name-1", VReplicas: 7},
},
schedulerPolicyType: scheduler.MAXFILLUP,
},
{
name: "5 replicas, 4 vreplicas spread, scheduled",
vreplicas: 4,
replicas: int32(5),
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 1},
{PodName: "statefulset-name-1", VReplicas: 1},
{PodName: "statefulset-name-2", VReplicas: 1},
{PodName: "statefulset-name-3", VReplicas: 1},
},
schedulerPolicyType: scheduler.MAXFILLUP,
},
{
name: "2 replicas, 4 vreplicas spread, scheduled",
vreplicas: 4,
replicas: int32(2),
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 2},
{PodName: "statefulset-name-1", VReplicas: 2},
},
schedulerPolicyType: scheduler.MAXFILLUP,
},
{
name: "3 replicas, 2 new vreplicas spread, scheduled",
vreplicas: 5,
replicas: int32(3),
placements: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 1},
},
expected: []duckv1alpha1.Placement{
{PodName: "statefulset-name-0", VReplicas: 2},
{PodName: "statefulset-name-1", VReplicas: 2},
{PodName: "statefulset-name-2", VReplicas: 1},
},
schedulerPolicyType: scheduler.MAXFILLUP,
},
Expand Down

0 comments on commit 8b054a8

Please sign in to comment.