-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathplugin.go
241 lines (196 loc) · 7.6 KB
/
plugin.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
package gang
import (
"context"
"fmt"
"time"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
schedulermetrics "k8s.io/kubernetes/pkg/scheduler/metrics"
"github.com/pfnet/scheduler-plugins/utils"
)
// Compile-time checks that Plugin satisfies every scheduler framework
// extension point it provides a method for. Without an assertion, a signature
// mismatch would not fail the build; the framework would simply never call the
// method.
var (
	_ framework.Plugin            = &Plugin{}
	_ framework.EnqueueExtensions = &Plugin{}
	_ framework.PreEnqueuePlugin  = &Plugin{}
	_ framework.PreFilterPlugin   = &Plugin{}
	_ framework.PostFilterPlugin  = &Plugin{}
	_ framework.PermitPlugin      = &Plugin{}
	_ framework.ReservePlugin     = &Plugin{}
)
// Plugin is the gang scheduling plugin. It bundles the decoded plugin
// configuration, the scheduler framework handle, and the cache of gang Pods
// that the Pod event handlers keep up to date.
type Plugin struct {
	// Assigned once in NewPlugin; no method in this file reassigns them.
	config    PluginConfig
	fwkHandle framework.Handle

	// Mutable runtime state. Gangs carries its own lock (per the original
	// author's note), so Plugin does not add one.
	gangs *Gangs
}
// NewPlugin constructs the gang plugin.
//
// It registers the plugin's metric once, decodes configuration into a
// PluginConfig, creates the gang cache, and subscribes to the shared Pod
// informer so that Pods accepted by filterPodEvent are mirrored into the cache.
//
// It returns an error when the configuration cannot be decoded or when the
// event handler cannot be added to the Pod informer.
func NewPlugin(configuration runtime.Object, fwkHandle framework.Handle) (framework.Plugin, error) {
	// The metric must only be registered once, even if the plugin is
	// constructed several times.
	registerMetrics.Do(func() {
		schedulermetrics.RegisterMetrics(gangSchedulingEventCounter)
	})

	cfg, err := DecodePluginConfig(configuration)
	if err != nil {
		return nil, err
	}
	klog.Infof("%s: PluginConfig=%+v", PluginName, cfg)

	p := &Plugin{
		config:    *cfg,
		fwkHandle: fwkHandle,
		gangs:     NewGangs(fwkHandle, fwkHandle.ClientSet(), cfg.TimeoutConfig(), cfg.GangAnnotationPrefix),
	}

	// Keep p.gangs in sync with gang Pods observed through the informer.
	informer := fwkHandle.SharedInformerFactory().Core().V1().Pods().Informer()
	handler := cache.FilteringResourceEventHandler{
		FilterFunc: p.filterPodEvent,
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc:    p.handlePodAdd,
			UpdateFunc: p.handlePodUpdate,
			DeleteFunc: p.handlePodDelete,
		},
	}
	_, err = informer.AddEventHandler(handler)
	if err != nil {
		return nil, fmt.Errorf("add EventHandler on podInformer: %w", err)
	}

	return p, nil
}
// Name returns PluginName, the name this plugin is identified by.
func (p *Plugin) Name() string { return PluginName }
// Note on parallelism:
// - Scheduler plugin entry point (PreFilter, Permit, Reserve, Unreserve)
// - Only one of PreFilter, Permit, Reserve is called at a time
// - For a specific pod, only one plugin entry point is called at a time
// - For different pods, Unreserve can be called in parallel with other plugin entry points
// - Event handler (handlePodAdd, handlePodUpdate, handlePodDelete)
// - Only one Pod event handler is called at a time
// - A scheduler plugin entry point and a Pod event handler can be called in parallel
// Scheduler plugin entry points
// PreFilter lets non-gang Pods through with a Success status and delegates
// gang Pods to the gang cache. The returned PreFilterResult is always nil.
// Start and end of the call are logged at verbosity 5.
func (p *Plugin) PreFilter(
	ctx context.Context, state *framework.CycleState, pod *corev1.Pod,
) (*framework.PreFilterResult, *framework.Status) {
	klog.V(5).Infof("%s: PreFilter start for pod %s/%s", p.Name(), pod.Namespace, pod.Name)

	// status is captured by the deferred log so that the final value is reported.
	var status *framework.Status
	defer func() {
		klog.V(5).Infof("%s: PreFilter end for pod %s/%s (status: %v)", p.Name(), pod.Namespace, pod.Name, status)
	}()

	if IsGang(pod, p.config.GangAnnotationPrefix) {
		status = p.gangs.PreFilter(ctx, state, pod)
	} else {
		// Only the status part of the Permit-style response is relevant here.
		status, _ = allow("")
	}
	return nil, status
}
// PreFilterExtensions returns nil: this plugin provides no PreFilter extensions.
func (p *Plugin) PreFilterExtensions() framework.PreFilterExtensions {
	return nil
}
// EventsToRegister declares the cluster events this plugin registers for:
// every action on Nodes and every action on Pods. No queueing hint function
// is attached to either event.
func (p *Plugin) EventsToRegister() []framework.ClusterEventWithHint {
	nodeEvent := framework.ClusterEvent{
		Resource:   framework.Node,
		ActionType: framework.All,
	}

	// Note: framework.Pod no longer works for events of non-scheduled Pods.
	// The original author notes that a workaround is done in handlePodAdd.
	// see: https://github.com/kubernetes/kubernetes/issues/110175
	podEvent := framework.ClusterEvent{
		Resource:   framework.Pod,
		ActionType: framework.All,
	}

	return []framework.ClusterEventWithHint{
		{Event: nodeEvent},
		{Event: podEvent},
	}
}
// Permit delegates the permit decision for pod to the gang cache and returns
// its status and wait timeout unchanged. Start and end of the call are logged
// at verbosity 5.
func (p *Plugin) Permit(
	ctx context.Context, state *framework.CycleState, pod *corev1.Pod, nodeName string,
) (*framework.Status, time.Duration) {
	// Declared up front so the deferred log reports the final values.
	var (
		status  *framework.Status
		timeout time.Duration
	)

	klog.V(5).Infof("%s: Permit start for pod %s/%s (node=%s)", p.Name(), pod.Namespace, pod.Name, nodeName)
	defer func() {
		klog.V(5).Infof("%s: Permit end for pod %s/%s (node=%s status=%v timeout=%s)",
			p.Name(), pod.Namespace, pod.Name, nodeName, status, timeout)
	}()

	status, timeout = p.gangs.Permit(state, pod)
	return status, timeout
}
// PostFilter notifies the gang cache about pod and then always reports
// Unschedulable with a nil PostFilterResult.
func (p *Plugin) PostFilter(
	ctx context.Context, state *framework.CycleState, pod *corev1.Pod, m framework.NodeToStatusMap,
) (*framework.PostFilterResult, *framework.Status) {
	p.gangs.PostFilter(ctx, pod)

	unschedulable := framework.NewStatus(framework.Unschedulable)
	return nil, unschedulable
}
// PreEnqueue delegates gang Pods to the gang cache and returns its status.
// Non-gang Pods get a nil status.
func (p *Plugin) PreEnqueue(ctx context.Context, pod *corev1.Pod) *framework.Status {
	if IsGang(pod, p.config.GangAnnotationPrefix) {
		return p.gangs.PreEnqueue(pod)
	}
	return nil
}
// Reserve does nothing and returns a nil status. It is presumably present so
// that Plugin satisfies framework.ReservePlugin alongside Unreserve.
func (p *Plugin) Reserve(
	ctx context.Context, _ *framework.CycleState, pod *corev1.Pod, nodeName string,
) *framework.Status {
	return nil
}
// Unreserve forwards gang Pods to the gang cache and is a logged no-op for
// non-gang Pods. Per the original author's note, it is called when a waiting
// gang Pod is rejected due to timeout, and the gang cache then rejects all
// waiting Pods in that gang. Start and end of the call are logged at
// verbosity 5.
func (p *Plugin) Unreserve(ctx context.Context, _ *framework.CycleState, pod *corev1.Pod, nodeName string) {
	klog.V(5).Infof("%s: Unreserve start for pod %s/%s (node=%s)", p.Name(), pod.Namespace, pod.Name, nodeName)
	defer func() {
		klog.V(5).Infof("%s: Unreserve end for pod %s/%s (node=%s)", p.Name(), pod.Namespace, pod.Name, nodeName)
	}()

	if IsGang(pod, p.config.GangAnnotationPrefix) {
		p.gangs.Unreserve(pod, p.fwkHandle.EventRecorder())
		return
	}

	klog.V(5).Infof("%s: Pod %s/%s is not a gang. Unreserve is noop.", p.Name(), pod.Namespace, pod.Name)
}
// Pod event handlers
// filterPodEvent decides whether an informer event object is passed on to the
// Pod event handlers. It accepts a *corev1.Pod or a
// cache.DeletedFinalStateUnknown tombstone wrapping one; anything else is
// reported through utilruntime.HandleError and rejected.
//
// An accepted Pod must be the responsibility of the configured scheduler, be
// a gang Pod, and be non-completed.
func (p *Plugin) filterPodEvent(obj interface{}) bool {
	pod, isPod := obj.(*corev1.Pod)
	if !isPod {
		tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
		if !isTombstone {
			utilruntime.HandleError(fmt.Errorf("%s: unable to handle object %T", p.Name(), obj))
			return false
		}
		pod, isPod = tombstone.Obj.(*corev1.Pod)
		if !isPod {
			utilruntime.HandleError(fmt.Errorf("%s: unable to convert object %T to *v1.Pod", p.Name(), tombstone.Obj))
			return false
		}
	}

	// A typed nil Pod passes the type assertions above; reject it here.
	if pod == nil {
		return false
	}
	if !utils.ResponsibleForPod(pod, p.config.SchedulerName) {
		return false
	}
	if !IsGang(pod, p.config.GangAnnotationPrefix) {
		return false
	}
	return utils.IsNonCompletedPod(pod)
}
// handlePodAdd records a newly observed gang Pod in the gang cache.
func (p *Plugin) handlePodAdd(obj interface{}) {
	// The handler is registered on the Pod informer, so the assertion is
	// expected to always succeed.
	pod := obj.(*corev1.Pod)

	// The second result is dropped: per the original note it must be true,
	// because non-gang Pods are filtered out at handler registration.
	gangName, _ := GangNameOf(pod, p.config.GangAnnotationPrefix)
	klog.V(5).Infof("%s: handlePodAdd: pod=%s/%s gang=%s", p.Name(), pod.Namespace, pod.Name, gangName)

	recorder := p.fwkHandle.EventRecorder()
	p.gangs.AddOrUpdate(pod, recorder)
}
// handlePodUpdate refreshes the gang cache with the updated Pod. Only newObj
// is used; oldObj is ignored.
func (p *Plugin) handlePodUpdate(oldObj, newObj interface{}) {
	// The handler is registered on the Pod informer, so newObj is expected
	// to be a *corev1.Pod.
	pod := newObj.(*corev1.Pod)

	// The gang name is looked up only for the log line below.
	gangName, _ := GangNameOf(pod, p.config.GangAnnotationPrefix)
	klog.V(5).Infof("%s: handlePodUpdate: pod=%s/%s gang=%s", p.Name(), pod.Namespace, pod.Name, gangName)

	recorder := p.fwkHandle.EventRecorder()
	p.gangs.AddOrUpdate(pod, recorder)
}
// handlePodDelete removes a deleted gang Pod from the gang cache.
//
// A delete notification can carry a cache.DeletedFinalStateUnknown tombstone
// instead of a *corev1.Pod. filterPodEvent accepts such tombstones, and the
// filtering handler forwards the same object here, so the tombstone must be
// unwrapped rather than blindly asserted to *corev1.Pod (which would panic).
func (p *Plugin) handlePodDelete(obj interface{}) {
	var pod *corev1.Pod
	switch t := obj.(type) {
	case *corev1.Pod:
		pod = t
	case cache.DeletedFinalStateUnknown:
		wrapped, ok := t.Obj.(*corev1.Pod)
		if !ok {
			// filterPodEvent should already have rejected this object.
			klog.Errorf("%s: handlePodDelete: unable to convert object %T to *v1.Pod", p.Name(), t.Obj)
			return
		}
		pod = wrapped
	default:
		// filterPodEvent should already have rejected this object.
		klog.Errorf("%s: handlePodDelete: unable to handle object %T", p.Name(), obj)
		return
	}

	// The gang name is looked up only for the log line below.
	gangName, _ := GangNameOf(pod, p.config.GangAnnotationPrefix)
	klog.V(5).Infof("%s: handlePodDelete: pod=%s/%s gang=%s", p.Name(), pod.Namespace, pod.Name, gangName)
	p.gangs.Delete(pod)
}
// Permit plugin responses
// allow builds a Permit-style response: a Success status carrying msg and a
// zero wait duration.
func allow(msg string) (*framework.Status, time.Duration) {
	status := framework.NewStatus(framework.Success, msg)
	return status, 0
}
// reject builds a Permit-style response: an UnschedulableAndUnresolvable
// status carrying msg and a zero wait duration.
func reject(msg string) (*framework.Status, time.Duration) {
	status := framework.NewStatus(framework.UnschedulableAndUnresolvable, msg)
	return status, 0
}
// wait builds a Permit-style response: a Wait status carrying msg, paired
// with duration as the wait timeout.
func wait(msg string, duration time.Duration) (*framework.Status, time.Duration) {
	status := framework.NewStatus(framework.Wait, msg)
	return status, duration
}
// Scheduling event messages
// msgInternalError formats an internal-error event message: the plugin name,
// the "SchedulerInternalError" marker, then format expanded with args in
// fmt.Sprintf style.
func msgInternalError(format string, args ...interface{}) string {
	detail := fmt.Sprintf(format, args...)
	return PluginName + ": SchedulerInternalError: " + detail
}