Skip to content

Commit

Permalink
Merge pull request volcano-sh#3680 from bibibox/fix_pipelined_nominated
Browse files Browse the repository at this point in the history
fix always update pod nominatedNodeName when pod is pipelined
  • Loading branch information
volcano-sh-bot authored Aug 20, 2024
2 parents 28569a9 + f8e7847 commit 60a7f75
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 8 deletions.
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/allocate/allocate.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ func (alloc *Action) allocateResourcesForTasks(tasks *util.PriorityQueue, job *a
if task.InitResreq.LessEqual(bestNode.FutureIdle(), api.Zero) {
klog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, bestNode.Name, task.InitResreq, bestNode.Releasing)
if err := stmt.Pipeline(task, bestNode.Name); err != nil {
if err := stmt.Pipeline(task, bestNode.Name, false); err != nil {
klog.Errorf("Failed to pipeline Task %v on %v in Session %v for %v.",
task.UID, bestNode.Name, ssn.UID, err)
} else {
Expand Down
2 changes: 1 addition & 1 deletion pkg/scheduler/actions/backfill/backfill_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func TestPickUpPendingTasks(t *testing.T) {
stmt := framework.NewStatement(ssn)
task, found := ssn.Jobs[jobID].Tasks[api.PodKey(pod)]
if found {
stmt.Pipeline(task, "node1")
stmt.Pipeline(task, "node1", false)
}
}

Expand Down
7 changes: 6 additions & 1 deletion pkg/scheduler/actions/preempt/preempt.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,18 @@ func preempt(
preempted.Add(preemptee.Resreq)
}

evictionOccurred := false
if !preempted.IsEmpty() {
evictionOccurred = true
}

metrics.RegisterPreemptionAttempts()
klog.V(3).Infof("Preempted <%v> for Task <%s/%s> requested <%v>.",
preempted, preemptor.Namespace, preemptor.Name, preemptor.InitResreq)

// If the preemptor's queue is overused, the preemptor cannot be allocated, so there is no need to check the node's idle resources.
if ssn.Allocatable(currentQueue, preemptor) && preemptor.InitResreq.LessEqual(node.FutureIdle(), api.Zero) {
if err := stmt.Pipeline(preemptor, node.Name); err != nil {
if err := stmt.Pipeline(preemptor, node.Name, evictionOccurred); err != nil {
klog.Errorf("Failed to pipeline Task <%s/%s> on Node <%s>",
preemptor.Namespace, preemptor.Name, node.Name)
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/scheduler/api/job_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,9 @@ type TaskID types.UID

// TransactionContext holds all the fields that are needed by a scheduling transaction.
type TransactionContext struct {
NodeName string
Status TaskStatus
NodeName string
EvictionOccurred bool
Status TaskStatus
}

// Clone returns a clone of TransactionContext
Expand Down Expand Up @@ -683,7 +684,10 @@ func (ji *JobInfo) TaskSchedulingReason(tid TaskID) (reason, msg, nominatedNodeN
return PodReasonSchedulable, msg, ""
case Pipelined:
msg = fmt.Sprintf("Pod %s/%s can possibly be assigned to %s, once resource is released", taskInfo.Namespace, taskInfo.Name, ctx.NodeName)
return PodReasonUnschedulable, msg, ctx.NodeName
if ctx.EvictionOccurred {
nominatedNodeName = ctx.NodeName
}
return PodReasonUnschedulable, msg, nominatedNodeName
case Pending:
if fe := ji.NodesFitErrors[tid]; fe != nil {
// Pod is unschedulable
Expand Down
8 changes: 7 additions & 1 deletion pkg/scheduler/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -993,7 +993,13 @@ func (sc *SchedulerCache) taskUnschedulable(task *schedulingapi.TaskInfo, reason
}

updateCond := podConditionHaveUpdate(&pod.Status, condition)
updateNomiNode := podNominatedNodeNameNeedUpdate(&pod.Status, nominatedNodeName)

// Only update the pod's nominatedNodeName when nominatedNodeName is not empty.
// Consider this situation:
// 1. In session 1, pod A preempts a lower-priority pod B, and we set A's nominatedNodeName.
// 2. In session 2, pod B is still terminating, so pod A is still pipelined; it preempts nothing this time, so
// the nominatedNodeName is empty, but we must not overwrite A's nominatedNodeName with an empty value.
updateNomiNode := len(nominatedNodeName) > 0 && podNominatedNodeNameNeedUpdate(&pod.Status, nominatedNodeName)

if updateCond || updateNomiNode {
pod = pod.DeepCopy()
Expand Down
3 changes: 2 additions & 1 deletion pkg/scheduler/framework/statement.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (s *Statement) unevict(reclaimee *api.TaskInfo) error {
}

// Pipeline the task for the node
func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
func (s *Statement) Pipeline(task *api.TaskInfo, hostname string, evictionOccurred bool) error {
job, found := s.ssn.Jobs[task.Job]
if found {
if err := job.UpdateTaskStatus(task, api.Pipelined); err != nil {
Expand All @@ -155,6 +155,7 @@ func (s *Statement) Pipeline(task *api.TaskInfo, hostname string) error {
}

task.NodeName = hostname
task.EvictionOccurred = evictionOccurred

if node, found := s.ssn.Nodes[hostname]; found {
if err := node.AddTask(task); err != nil {
Expand Down

0 comments on commit 60a7f75

Please sign in to comment.