-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgang.go
344 lines (286 loc) · 10 KB
/
gang.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
package gang
import (
"fmt"
"strings"
"sync"
"time"
"k8s.io/apimachinery/pkg/util/sets"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/pfnet/scheduler-plugins/utils"
)
// Gang is a group of Pods that this plugin schedules together (gang scheduling).
type Gang interface {
	fmt.Stringer
	// NameAndSpec returns the GangNameAndSpec of this Gang.
	NameAndSpec() *GangNameAndSpec
	// AddOrUpdate adds a given Pod to this Gang, or updates a Pod in this Gang.
	AddOrUpdate(*corev1.Pod)
	// Delete deletes a given Pod from this Gang. If the Pod is not in this Gang, Delete is a no-op.
	Delete(*corev1.Pod)
	// Pods returns all Pods in the gang. The order is unspecified.
	Pods() []*corev1.Pod
	// CountPod returns the number of Pods in this Gang.
	CountPod() int
	// CountPodIf returns the number of Pods in this Gang that satisfy a given predicate.
	CountPodIf(predicate func(*corev1.Pod) bool) int
	// SatisfiesInvariantForScheduling checks that this Gang satisfies an invariant for gang
	// scheduling. If this Gang does not satisfy the invariant, this plugin does not start
	// scheduling of this Gang and rejects it immediately; the returned GangSchedulingEvent
	// tells which part of the invariant was violated (empty when the invariant holds).
	SatisfiesInvariantForScheduling(ScheduleTimeoutConfig) (bool, GangSchedulingEvent)
	// IterateOverPods iterates over the Pods in this Gang while applying a given function.
	// The function can mutate the Pods.
	IterateOverPods(func(pod *corev1.Pod))
	// IsAllNonCompletedSpecIdenticalTo checks that the gang specs of all non-completed Pods in
	// this Gang are identical to a given gang spec.
	IsAllNonCompletedSpecIdenticalTo(GangSpec, ScheduleTimeoutConfig) bool
	// SetDeleting marks (or unmarks) this Gang as "being deleted now".
	SetDeleting(deleting bool)
	// IsDeleting reports whether this Gang is currently marked as being deleted.
	IsDeleting() bool
	// EventMessage returns a message to be notified as a plugin response or Kubernetes event.
	EventMessage(event GangSchedulingEvent, pod *corev1.Pod) string
	// EventMessageForPodFunc returns a function that builds EventMessage(event, pod) for the
	// Pod it is given, with event fixed.
	EventMessageForPodFunc(event GangSchedulingEvent) func(*corev1.Pod) string
	// GetPosition returns the PodPosition recorded for the Pod with the given UID, or
	// PodPositionUnknown when nothing is recorded.
	GetPosition(podUID types.UID) PodPosition
	// PutPosition records the PodPosition of the given Pod, overwriting any previous value.
	PutPosition(pod *corev1.Pod, position PodPosition)
	// ReadyToGetSchedule reports whether no Pod of this Gang is currently recorded as
	// unready to be scheduled.
	ReadyToGetSchedule() bool
	// UnreadyToSchedulePodNames returns the names of the Pods currently recorded as
	// unready to be scheduled.
	UnreadyToSchedulePodNames() []string
}
// NewGang returns an empty Gang for the given name/spec. gangAnnotationPrefix is the
// prefix used when reading gang specs from Pod annotations.
//
// The returned Gang is meant to be used through Gangs, which holds its own lock while
// accessing its gangs map, so calls arriving that way are serialized. Not every method
// of the implementation takes an internal lock (e.g. SetDeleting/IsDeleting), so any
// other caller must provide its own synchronization.
func NewGang(nameSpec GangNameAndSpec, gangAnnotationPrefix string) Gang {
	g := &gangImpl{
		GangNameAndSpec:      nameSpec,
		gangAnnotationPrefix: gangAnnotationPrefix,
	}
	// A fresh gang is not being deleted (zero value) and tracks no Pods or positions yet.
	g.pods = make(map[types.UID]*corev1.Pod)
	g.positions = make(map[types.UID]PodPosition)
	g.unreadyToSchedulePods = sets.New[string]()
	return g
}
// PodPosition represents where a Pod is (or is expected to be) inside the scheduler.
// Values are assigned contiguously from 0 via iota, so the zero value is PodPositionUnknown.
type PodPosition int

const (
	// PodPositionUnknown represents that we don't know where a Pod is in the scheduler.
	//
	// The gang PreEnqueue is responsible for registering a position for a newly created Pod.
	// Until then, a newly created Pod's position is Unknown.
	PodPositionUnknown PodPosition = iota
	// PodPositionUnschedulablePodPool represents that a Pod is, or should be, in the Unschedulable Pod Pool.
	//
	// The gang PostFilter is responsible for changing the position of a rejected Pod to PodPositionUnschedulablePodPool.
	PodPositionUnschedulablePodPool
	// PodPositionReadyToSchedule represents that a Pod is in the Unschedulable Pod Pool and ready to be scheduled,
	// but PodsToActivate has not been issued for it yet.
	// When all Pods in the gang reach PodPositionReadyToSchedule, PodsToActivate is expected to be issued soon
	// from the gang Permit.
	//
	// There are multiple scenarios in which a Pod can get PodPositionReadyToSchedule; the most common one is
	// PreEnqueue changing the given Pod's position to PodPositionReadyToSchedule.
	// The scheduler tries to move a Pod from the Unschedulable Pod Pool to activeQ when an event that may make
	// the Pod schedulable happens, so a Pod arriving at PreEnqueue can be regarded as ready to schedule.
	PodPositionReadyToSchedule
	// PodPositionActiveQ represents that a Pod is in activeQ, or is in the Unschedulable Pod Pool but
	// PodsToActivate has already been issued for it.
	//
	// The gang Permit is responsible for changing the position to PodPositionActiveQ when it issues
	// PodsToActivate for the Pod. Those Pods will soon reach the gang PreEnqueue and be accepted into activeQ.
	PodPositionActiveQ
	// PodPositionSchedulingCycle represents that a Pod is in a scheduling cycle.
	//
	// The gang PreFilter is responsible for changing the position to PodPositionSchedulingCycle when it
	// accepts the Pod.
	PodPositionSchedulingCycle
	// PodPositionWaitingOnPermit represents that a Pod is waiting on Permit.
	//
	// The gang Permit is responsible for changing the position to PodPositionWaitingOnPermit when the Pod
	// goes through the Permit plugin with a Wait status.
	PodPositionWaitingOnPermit
)
// String implements fmt.Stringer. It returns the short name of a known PodPosition
// (e.g. "ActiveQ") and "nil" for any value outside the declared constants.
func (p PodPosition) String() string {
	names := [...]string{
		PodPositionUnknown:              "Unknown",
		PodPositionUnschedulablePodPool: "UnschedulablePodPool",
		PodPositionReadyToSchedule:      "ReadyToSchedule",
		PodPositionActiveQ:              "ActiveQ",
		PodPositionSchedulingCycle:      "SchedulingCycle",
		PodPositionWaitingOnPermit:      "WaitingOnPermit",
	}
	if p < 0 || int(p) >= len(names) {
		return "nil"
	}
	return names[p]
}
// gangImpl is the Gang implementation returned by NewGang.
type gangImpl struct {
	GangNameAndSpec
	// gangAnnotationPrefix is passed to gangSpecOf when reading a Pod's gang spec.
	gangAnnotationPrefix string

	// podsLock protects accesses to the `pods` map.
	podsLock sync.RWMutex
	// pods holds the Pods of this gang, keyed by Pod UID.
	pods map[types.UID]*corev1.Pod
	// deleting marks this gang as being deleted. It is not guarded by any lock;
	// SetDeleting and IsDeleting access it directly.
	deleting bool

	// positionsLock protects accesses to the `positions` map and `unreadyToSchedulePods`.
	positionsLock sync.RWMutex
	// positions records where each not-yet-scheduled Pod is expected to be, keyed by Pod UID.
	positions map[types.UID]PodPosition
	// unreadyToSchedulePods holds the namespaced names of Pods that are not ready to be scheduled.
	unreadyToSchedulePods sets.Set[string]
}
// GetPosition looks up the PodPosition recorded for podUID. When no position has been
// recorded for that UID, it reports PodPositionUnknown.
func (g *gangImpl) GetPosition(podUID types.UID) PodPosition {
	g.positionsLock.RLock()
	defer g.positionsLock.RUnlock()
	if position, found := g.positions[podUID]; found {
		return position
	}
	return PodPositionUnknown
}
// ReadyToGetSchedule reports whether the set of Pods recorded as unready to be scheduled
// (maintained by PutPosition) is currently empty.
func (g *gangImpl) ReadyToGetSchedule() bool {
	// unreadyToSchedulePods is guarded by positionsLock; reading it without the lock
	// races with concurrent PutPosition calls.
	g.positionsLock.RLock()
	defer g.positionsLock.RUnlock()
	return g.unreadyToSchedulePods.Len() == 0
}
// UnreadyToSchedulePodNames returns the namespaced names of the Pods currently recorded
// as unready to be scheduled. The result is a fresh slice sorted in ascending order, so
// messages built from it are deterministic (map iteration order is random). It is never
// nil; an empty gang yields an empty slice.
func (g *gangImpl) UnreadyToSchedulePodNames() []string {
	g.positionsLock.RLock()
	defer g.positionsLock.RUnlock()
	return sets.List(g.unreadyToSchedulePods)
}
// PutPosition records position as the current PodPosition of pod. Any previously
// recorded position for the same Pod UID is overwritten.
//
// It also keeps unreadyToSchedulePods in sync. The Pod's namespaced name is added when
// position is PodPositionUnknown or PodPositionUnschedulablePodPool, and removed for
// every other position.
func (g *gangImpl) PutPosition(pod *corev1.Pod, position PodPosition) {
	name := getNamespacedName(pod)

	g.positionsLock.Lock()
	defer g.positionsLock.Unlock()

	g.positions[pod.UID] = position
	switch position {
	case PodPositionUnknown, PodPositionUnschedulablePodPool:
		g.unreadyToSchedulePods.Insert(name)
	default:
		g.unreadyToSchedulePods.Delete(name)
	}
}
// String implements fmt.Stringer. The output has the form
// "<gang name>: [pod-a(Phase), pod-b(Phase), ...]"; the Pod order follows map iteration
// and is therefore unspecified.
func (g *gangImpl) String() string {
	g.podsLock.RLock()
	defer g.podsLock.RUnlock()
	entries := make([]string, 0, len(g.pods))
	for _, pod := range g.pods {
		entries = append(entries, pod.Name+"("+string(pod.Status.Phase)+")")
	}
	return fmt.Sprintf("%s: [%s]", g.Name.String(), strings.Join(entries, ", "))
}
// NameAndSpec returns a pointer to the GangNameAndSpec embedded in this gang (not a
// copy), so the caller sees this gang's own value.
func (g *gangImpl) NameAndSpec() *GangNameAndSpec {
	embedded := &g.GangNameAndSpec
	return embedded
}
// AddOrUpdate stores pod under its UID, replacing any Pod already stored with that UID.
// The pointer itself is stored; no copy is made.
func (g *gangImpl) AddOrUpdate(pod *corev1.Pod) {
	uid := pod.UID
	g.podsLock.Lock()
	defer g.podsLock.Unlock()
	g.pods[uid] = pod
}
// Delete removes pod (matched by UID) from this gang. Deleting a Pod that is not stored
// in the gang leaves the pods map unchanged.
//
// It also drops any PodPosition recorded for the Pod and removes its name from
// unreadyToSchedulePods. Within this file PutPosition is otherwise the only writer of
// that state. Without this cleanup, a deleted Pod last recorded as unready would keep
// ReadyToGetSchedule returning false and leave a stale entry in positions.
func (g *gangImpl) Delete(pod *corev1.Pod) {
	g.podsLock.Lock()
	delete(g.pods, pod.UID)
	g.podsLock.Unlock()

	// The two locks are taken one after the other, never nested, to avoid lock-ordering issues.
	g.positionsLock.Lock()
	defer g.positionsLock.Unlock()
	if _, recorded := g.positions[pod.UID]; recorded {
		delete(g.positions, pod.UID)
		g.unreadyToSchedulePods.Delete(getNamespacedName(pod))
	}
}
// CountPod returns the number of Pods currently stored in this gang.
func (g *gangImpl) CountPod() int {
	// pods is guarded by podsLock, like every other accessor of the map; reading it
	// unlocked races with AddOrUpdate/Delete.
	g.podsLock.RLock()
	defer g.podsLock.RUnlock()
	return len(g.pods)
}
// Pods returns deep copies of every Pod in the gang, so callers may modify the result
// without affecting the gang's stored Pods. The order is unspecified.
func (g *gangImpl) Pods() []*corev1.Pod {
	g.podsLock.RLock()
	defer g.podsLock.RUnlock()
	copies := make([]*corev1.Pod, len(g.pods))
	i := 0
	for _, pod := range g.pods {
		copies[i] = pod.DeepCopy()
		i++
	}
	return copies
}
// CountPodIf returns how many stored Pods satisfy pred. pred is called while podsLock is
// read-held, so it must not call back into methods of this gang that lock podsLock.
func (g *gangImpl) CountPodIf(pred func(pod *corev1.Pod) bool) int {
	g.podsLock.RLock()
	defer g.podsLock.RUnlock()
	var matched int
	for _, p := range g.pods {
		if !pred(p) {
			continue
		}
		matched++
	}
	return matched
}
// SatisfiesInvariantForScheduling checks, in order, that:
//  1. all non-completed Pods carry the same gang spec (else GangSpecInvalid),
//  2. at least Spec.Size Pods are non-completed (else GangNotReady),
//  3. no Pod is terminating (else GangWaitForTerminating),
//  4. fewer than Spec.Size non-completed Pods are already assigned (else GangFullyScheduled).
//
// It returns (true, "") only when every check passes. Each check locks the Pods
// separately, so the checks do not see a single atomic snapshot.
func (g *gangImpl) SatisfiesInvariantForScheduling(
	timeoutConfig ScheduleTimeoutConfig,
) (bool, GangSchedulingEvent) {
	switch {
	case !g.isAllNonCompletedSpecIdentical(timeoutConfig):
		return false, GangSpecInvalid
	case g.CountPodIf(utils.IsNonCompletedPod) < g.Spec.Size:
		return false, GangNotReady
	case g.CountPodIf(utils.IsTerminatingPod) > 0:
		return false, GangWaitForTerminating
	case g.CountPodIf(utils.IsAssignedAndNonCompletedPod) >= g.Spec.Size:
		return false, GangFullyScheduled
	default:
		return true, ""
	}
}
// IterateOverPods calls f once for every stored Pod, in unspecified order. f receives
// the stored pointer (not a copy), so it can mutate the Pods.
//
// f runs while podsLock is read-held. It must not call AddOrUpdate or Delete on this
// gang, which would deadlock. It should also avoid other methods that re-acquire
// podsLock, because sync.RWMutex does not support recursive read locking.
func (g *gangImpl) IterateOverPods(f func(pod *corev1.Pod)) {
	g.podsLock.RLock()
	defer g.podsLock.RUnlock()
	for uid := range g.pods {
		f(g.pods[uid])
	}
}
// IsAllNonCompletedSpecIdenticalTo reports whether every non-completed Pod's gang spec
// (as read by gangSpecOf with timeoutConfig) equals target. Completed Pods are ignored,
// and a gang with no non-completed Pods trivially matches.
func (g *gangImpl) IsAllNonCompletedSpecIdenticalTo(
	target GangSpec, timeoutConfig ScheduleTimeoutConfig,
) bool {
	g.podsLock.RLock()
	defer g.podsLock.RUnlock()
	for _, pod := range g.pods {
		if !utils.IsNonCompletedPod(pod) {
			continue
		}
		if gangSpecOf(pod, timeoutConfig, g.gangAnnotationPrefix) != target {
			return false
		}
	}
	return true
}
// SetDeleting stores whether this gang is being deleted.
// NOTE: the flag is written without any lock; callers must serialize access
// (NewGang's contract relies on Gangs' lock for this).
func (g *gangImpl) SetDeleting(deleting bool) {
	g.deleting = deleting
}
// IsDeleting returns the value most recently stored by SetDeleting (false for a new gang).
// NOTE: the flag is read without any lock; callers must serialize access.
func (g *gangImpl) IsDeleting() bool {
	deleting := g.deleting
	return deleting
}
// EventMessage formats a human-readable message about event for pod. It includes the
// gang name, size and timeout (base plus a jitter in [0, TimeoutJitterSeconds)). An
// empty event is reported as "SchedulerInternalError".
func (g *gangImpl) EventMessage(event GangSchedulingEvent, pod *corev1.Pod) string {
	if event == "" {
		event = "SchedulerInternalError"
	}
	maxJitter := time.Second * time.Duration(g.Spec.TimeoutJitterSeconds)
	return fmt.Sprintf(
		"%s: %s for pod %s/%s (gang=%s size=%d timeout=%s+[0,%s))",
		PluginName, event, pod.Namespace, pod.Name, g.Name, g.Spec.Size, g.Spec.TimeoutBase, maxJitter,
	)
}
// EventMessageForPodFunc binds event and returns a function that produces
// EventMessage(event, pod) for whichever Pod it is called with.
func (g *gangImpl) EventMessageForPodFunc(event GangSchedulingEvent) func(*corev1.Pod) string {
	messageFor := func(pod *corev1.Pod) string {
		return g.EventMessage(event, pod)
	}
	return messageFor
}
// isAllNonCompletedSpecIdentical reports whether all non-completed Pods in the gang share
// one gang spec (as read by gangSpecOf). The first non-completed Pod seen provides the
// reference spec. Zero or one non-completed Pods trivially satisfy the check.
func (g *gangImpl) isAllNonCompletedSpecIdentical(timeoutConfig ScheduleTimeoutConfig) bool {
	g.podsLock.RLock()
	defer g.podsLock.RUnlock()
	var (
		reference     GangSpec
		haveReference bool
	)
	for _, pod := range g.pods {
		if !utils.IsNonCompletedPod(pod) {
			continue
		}
		current := gangSpecOf(pod, timeoutConfig, g.gangAnnotationPrefix)
		if !haveReference {
			reference, haveReference = current, true
			continue
		}
		if current != reference {
			return false
		}
	}
	return true
}