From f8e7847dcfcbcc67bd5ae363b2b3c472afa9dc8a Mon Sep 17 00:00:00 2001 From: Wenbo Zhang Date: Mon, 19 Aug 2024 19:55:46 +0800 Subject: [PATCH] fix always update pod nominatedNodeName when pod is pipelined Signed-off-by: Wenbo Zhang --- pkg/scheduler/actions/allocate/allocate.go | 2 +- pkg/scheduler/actions/backfill/backfill_test.go | 2 +- pkg/scheduler/actions/preempt/preempt.go | 7 ++++++- pkg/scheduler/api/job_info.go | 10 +++++++--- pkg/scheduler/cache/cache.go | 8 +++++++- pkg/scheduler/framework/statement.go | 3 ++- 6 files changed, 24 insertions(+), 8 deletions(-) diff --git a/pkg/scheduler/actions/allocate/allocate.go b/pkg/scheduler/actions/allocate/allocate.go index ec21a1b570..23807b4b2a 100644 --- a/pkg/scheduler/actions/allocate/allocate.go +++ b/pkg/scheduler/actions/allocate/allocate.go @@ -278,7 +278,7 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) { klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>", task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing) - if err := stmt.Pipeline(task, bestNode.Name); err != nil { + if err := stmt.Pipeline(task, bestNode.Name, false); err != nil { klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.", task.UID, bestNode.Name, ssn.UID, err) } else { diff --git a/pkg/scheduler/actions/backfill/backfill_test.go b/pkg/scheduler/actions/backfill/backfill_test.go index 8afd67e93e..827793213e 100644 --- a/pkg/scheduler/actions/backfill/backfill_test.go +++ b/pkg/scheduler/actions/backfill/backfill_test.go @@ -145,7 +145,7 @@ func TestPickUpPendingTasks(t *testing.T) { stmt := framework.NewStatement(ssn) task, found := ssn.Jobs[jobID].Tasks[api.PodKey(pod)] if found { - stmt.Pipeline(task, "node1") + stmt.Pipeline(task, "node1", false) } } diff --git a/pkg/scheduler/actions/preempt/preempt.go b/pkg/scheduler/actions/preempt/preempt.go index 308efef6d4..c3dd90657b 100644 --- a/pkg/scheduler/actions/preempt/preempt.go +++ b/pkg/scheduler/actions/preempt/preempt.go @@ -278,13 +278,18 @@ func preempt( preempted.Add(preemptee.Resreq) } + evictionOccurred := false + if !preempted.IsEmpty() { + evictionOccurred = true + } + metrics.RegisterPreemptionAttempts() klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.", preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq) // If preemptor's queue is overused, it means preemptor can not be allocated. So no need care about the node idle resource if ssn.Allocatable(currentQueue, preemptor) && preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) { - if err := stmt.Pipeline(preemptor, node.Name); err != nil { + if err := stmt.Pipeline(preemptor, node.Name, evictionOccurred); err != nil { klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>", preemptor.Namespace, preemptor.Name, node.Name) } diff --git a/pkg/scheduler/api/job_info.go b/pkg/scheduler/api/job_info.go index 001383f664..d6ce83747c 100644 --- a/pkg/scheduler/api/job_info.go +++ b/pkg/scheduler/api/job_info.go @@ -69,8 +69,9 @@ type TaskID types.UID // TransactionContext holds all the fields that needed by scheduling transaction type TransactionContext struct { - NodeName string - Status TaskStatus + NodeName string + EvictionOccurred bool + Status TaskStatus } // Clone returns a clone of TransactionContext @@ -683,7 +684,10 @@ func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason, msg, nominatedNodeN return PodReasonSchedulable, msg, "" case Pipelined: msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s, once resource is released", taskInfo.Namespace, taskInfo.Name, ctx.NodeName) - return PodReasonUnschedulable, msg, ctx.NodeName + if ctx.EvictionOccurred { + nominatedNodeName = ctx.NodeName + } + return PodReasonUnschedulable, msg, nominatedNodeName case Pending: if fe := ji.NodesFitErrors[tid]; fe != nil { // Pod is unschedulable diff --git a/pkg/scheduler/cache/cache.go b/pkg/scheduler/cache/cache.go index d03ad6f162..f0caf61cb1 100644 --- a/pkg/scheduler/cache/cache.go +++ b/pkg/scheduler/cache/cache.go @@ -993,7 +993,13 @@ func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason } updateCond := podConditionHaveUpdate(&pod.Status, condition) - updateNomiNode := podNominatedNodeNameNeedUpdate(&pod.Status, nominatedNodeName) + + // only update pod's nominatedNodeName when nominatedNodeName is not empty + // consider this situation: + // 1. at session 1, the pod A preempt another lower priority pod B, and we updated A's nominatedNodeName + // 2. at session 2, the pod B is still terminating, so the pod A is still pipelined, but it preempt none, so + // the nominatedNodeName is empty, but we should not override the A's nominatedNodeName to empty + updateNomiNode := len(nominatedNodeName) > 0 && podNominatedNodeNameNeedUpdate(&pod.Status, nominatedNodeName) if updateCond || updateNomiNode { pod = pod.DeepCopy() diff --git a/pkg/scheduler/framework/statement.go b/pkg/scheduler/framework/statement.go index 22bd7ebb92..1e951caa13 100644 --- a/pkg/scheduler/framework/statement.go +++ b/pkg/scheduler/framework/statement.go @@ -142,7 +142,7 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo) error { } // Pipeline the task for the node -func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { +func (s *Statement) Pipeline(task *api.TaskInfo, hostname string, evictionOccurred bool) error { job, found := s.ssn.Jobs[task.Job] if found { if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil { @@ -155,6 +155,7 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error { } task.NodeName = hostname + task.EvictionOccurred = evictionOccurred if node, found := s.ssn.Nodes[hostname]; found { if err := node.AddTask(task); err != nil {