From 7c993e723b46f2d89c95bc60716dfbd71d663269 Mon Sep 17 00:00:00 2001 From: "Ahmad N. F" Date: Wed, 3 Jul 2024 13:37:24 +0700 Subject: [PATCH] refactor: only show direct downstream when deleting jobs (#246) * refactor: only show direct downstream when deleting jobs * refactor: add 2 case unit test * refactor: use list of list of downstreams * refactor: skip lint --- core/job/service/job_service.go | 79 ++++++++++++----- core/job/service/job_service_test.go | 128 +++++++++++++++++++++++++++ 2 files changed, 183 insertions(+), 24 deletions(-) diff --git a/core/job/service/job_service.go b/core/job/service/job_service.go index 75b37904bf..cf150ee527 100644 --- a/core/job/service/job_service.go +++ b/core/job/service/job_service.go @@ -845,12 +845,20 @@ func (j *JobService) RefreshResourceDownstream(ctx context.Context, resourceURNs return me.ToErr() } -func validateDeleteJob(jobTenant tenant.Tenant, downstreams []*job.Downstream, toDeleteMap map[job.FullName]*job.Spec, jobToDelete *job.Spec, logWriter writer.LogWriter, me *errors.MultiError) bool { - notDeleted, safeToDelete := isJobSafeToDelete(toDeleteMap, job.DownstreamList(downstreams).GetDownstreamFullNames()) +func validateDeleteJob(jobTenant tenant.Tenant, downstreamsPerLevel [][]*job.Downstream, toDeleteMap map[job.FullName]*job.Spec, jobToDelete *job.Spec, logWriter writer.LogWriter, me *errors.MultiError) bool { + notDeleted, safeToDelete := isJobSafeToDelete(toDeleteMap, downstreamsPerLevel) if !safeToDelete { + // only show direct downstreams that is not deleted in the error. + // if all direct downstreams are to be deleted, show all of them + directDownstreams := notDeleted[0] + if len(directDownstreams) == 0 { + directDownstreams = downstreamsPerLevel[0] + } + // TODO: refactor to put the log writer outside - errorMsg := fmt.Sprintf("deletion of job %s will fail. job is being used by %s", jobToDelete.Name().String(), job.FullNames(notDeleted).String()) + jobFullNames := job.DownstreamList(directDownstreams).GetDownstreamFullNames() + errorMsg := fmt.Sprintf("deletion of job %s will fail. job is being used by %s", jobToDelete.Name().String(), jobFullNames.String()) logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] %s", jobTenant.NamespaceName().String(), errorMsg)) me.Append(errors.NewError(errors.ErrFailedPrecond, job.EntityJob, errorMsg)) return false @@ -859,39 +867,54 @@ func validateDeleteJob(jobTenant tenant.Tenant, downstreams []*job.Downstream, t return true } -func isJobSafeToDelete(toDeleteMap map[job.FullName]*job.Spec, downstreamFullNames []job.FullName) ([]job.FullName, bool) { - notDeleted := []job.FullName{} - for _, downstreamFullName := range downstreamFullNames { - if _, ok := toDeleteMap[downstreamFullName]; !ok { - notDeleted = append(notDeleted, downstreamFullName) +func isJobSafeToDelete(toDeleteMap map[job.FullName]*job.Spec, downstreamsPerLevel [][]*job.Downstream) ([][]*job.Downstream, bool) { + notDeleted := make([][]*job.Downstream, len(downstreamsPerLevel)) + isSafeToDelete := true + + for i, downstreams := range downstreamsPerLevel { + for _, downstream := range downstreams { + downstreamFullName := downstream.FullName() + if _, ok := toDeleteMap[downstreamFullName]; !ok { + notDeleted[i] = append(notDeleted[i], downstream) + isSafeToDelete = false + } } } - return notDeleted, len(notDeleted) == 0 + return notDeleted, isSafeToDelete } -func (j *JobService) getAllDownstreams(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, visited map[job.FullName]bool) ([]*job.Downstream, error) { +func (j *JobService) getAllDownstreams(ctx context.Context, projectName tenant.ProjectName, jobName job.Name, visited map[job.FullName]bool, level int) ([][]*job.Downstream, error) { currentJobFullName := job.FullNameFrom(projectName, jobName) - downstreams := []*job.Downstream{} visited[currentJobFullName] = true + downstreamsPerLevel := make([][]*job.Downstream, level+1) + childJobs, err := j.downstreamRepo.GetDownstreamByJobName(ctx, projectName, jobName) if err != nil { j.logger.Error("error getting downstream jobs for job [%s]: %s", jobName, err) return nil, err } + if len(childJobs) > 0 { + downstreamsPerLevel[level] = append(downstreamsPerLevel[level], childJobs...) + } + for _, childJob := range childJobs { - downstreams = append(downstreams, childJob) if visited[childJob.FullName()] { continue } - childDownstreams, err := j.getAllDownstreams(ctx, childJob.ProjectName(), childJob.Name(), visited) + childDownstreamsPerLevel, err := j.getAllDownstreams(ctx, childJob.ProjectName(), childJob.Name(), visited, level+1) if err != nil { j.logger.Error("error getting all downstreams for job [%s]: %s", childJob.Name(), err) return nil, err } - downstreams = append(downstreams, childDownstreams...) + for i, lr := range childDownstreamsPerLevel { + if len(downstreamsPerLevel) <= i { + downstreamsPerLevel = append(downstreamsPerLevel, []*job.Downstream{}) //nolint:makezero + } + downstreamsPerLevel[i] = append(downstreamsPerLevel[i], lr...) + } } - return downstreams, nil + return downstreamsPerLevel, nil } func (*JobService) getIdentifierToJobsMap(jobsToValidateMap map[job.Name]*job.WithUpstream) map[string][]*job.WithUpstream { @@ -999,7 +1022,7 @@ func (j *JobService) bulkDelete(ctx context.Context, jobTenant tenant.Tenant, to for _, spec := range toDelete { // TODO: reuse Delete method and pass forceFlag as false fullName := job.FullNameFrom(jobTenant.ProjectName(), spec.Name()) - downstreams, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), map[job.FullName]bool{}) + downstreamsPerLevel, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), map[job.FullName]bool{}, 0) if err != nil { j.logger.Error("error getting downstreams for job [%s]: %s", spec.Name(), err) logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] pre-delete check for job %s failed: %s", jobTenant.NamespaceName().String(), spec.Name().String(), err.Error())) @@ -1007,7 +1030,7 @@ func (j *JobService) bulkDelete(ctx context.Context, jobTenant tenant.Tenant, to continue } - isSafeToDelete := validateDeleteJob(jobTenant, downstreams, toDeleteMap, spec, logWriter, me) + isSafeToDelete := validateDeleteJob(jobTenant, downstreamsPerLevel, toDeleteMap, spec, logWriter, me) if !isSafeToDelete { j.logger.Warn("job [%s] is not safe to be deleted", spec.Name()) continue @@ -1015,21 +1038,29 @@ func (j *JobService) bulkDelete(ctx context.Context, jobTenant tenant.Tenant, to logWriter.Write(writer.LogLevelDebug, fmt.Sprintf("[%s] deleting job %s", jobTenant.NamespaceName().String(), spec.Name().String())) + // flatten downstreams per level into a single list, + // with direct downstreams will be put first & the leaf downstreams at the end + downstreams := []*job.Downstream{} + for _, dss := range downstreamsPerLevel { + downstreams = append(downstreams, dss...) + } + isDeletionFail := false for i := len(downstreams) - 1; i >= 0 && !isDeletionFail; i-- { - if alreadyDeleted[downstreams[i].FullName()] { + downstream := downstreams[i] + if alreadyDeleted[downstream.FullName()] { continue } - if err = j.jobRepo.Delete(ctx, downstreams[i].ProjectName(), downstreams[i].Name(), false); err != nil { - j.logger.Error("error deleting [%s] as downstream of [%s]", downstreams[i].Name(), spec.Name()) - logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] deleting job %s failed: %s", downstreams[i].NamespaceName().String(), downstreams[i].Name().String(), err.Error())) + if err = j.jobRepo.Delete(ctx, downstream.ProjectName(), downstream.Name(), false); err != nil { + j.logger.Error("error deleting [%s] as downstream of [%s]", downstream.Name(), spec.Name()) + logWriter.Write(writer.LogLevelError, fmt.Sprintf("[%s] deleting job %s failed: %s", downstream.NamespaceName().String(), downstream.Name().String(), err.Error())) me.Append(err) isDeletionFail = true } else { - alreadyDeleted[downstreams[i].FullName()] = true + alreadyDeleted[downstream.FullName()] = true j.raiseDeleteEvent(jobTenant, spec.Name()) raiseJobEventMetric(jobTenant, job.MetricJobEventStateDeleted, 1) - deletedJobNames = append(deletedJobNames, downstreams[i].Name()) + deletedJobNames = append(deletedJobNames, downstream.Name()) } } @@ -1487,7 +1518,7 @@ func (j *JobService) validateOneJobForDeletion( jobTenant tenant.Tenant, spec *job.Spec, specByFullName map[job.FullName]*job.Spec, ) []dto.ValidateResult { - downstreams, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), make(map[job.FullName]bool)) + downstreams, err := j.getAllDownstreams(ctx, jobTenant.ProjectName(), spec.Name(), make(map[job.FullName]bool), 0) if err != nil { result := dto.ValidateResult{ Stage: dto.StageDeletionValidation, diff --git a/core/job/service/job_service_test.go b/core/job/service/job_service_test.go index 82845ced3d..2a6cc46257 100644 --- a/core/job/service/job_service_test.go +++ b/core/job/service/job_service_test.go @@ -3843,6 +3843,134 @@ func TestJobService(t *testing.T) { assert.EqualValues(t, expectedResult["job2"], actualResult["job2"]) assert.NoError(t, actualError) }) + + t.Run("show only job's direct dependencies when the job has multiple-level dependencies", func(t *testing.T) { + // testcase for case A -> B -> C, and A & B are the jobs to be deleted + // when showing direct downstream, + tenantDetailsGetter := new(TenantDetailsGetter) + defer tenantDetailsGetter.AssertExpectations(t) + + jobRepo := new(JobRepository) + defer jobRepo.AssertExpectations(t) + + downstreamRepo := new(DownstreamRepository) + defer downstreamRepo.AssertExpectations(t) + + jobRunInputCompiler := NewJobRunInputCompiler(t) + resourceExistenceChecker := NewResourceExistenceChecker(t) + + jobService := service.NewJobService(jobRepo, nil, downstreamRepo, nil, nil, tenantDetailsGetter, nil, log, nil, compiler.NewEngine(), jobRunInputCompiler, resourceExistenceChecker) + + jobSpec1, err := job.NewSpecBuilder(1, "job1", "optimus@goto", jobSchedule, jobWindow, jobTask).Build() + assert.NoError(t, err) + jobSpec2, err := job.NewSpecBuilder(1, "job2", "optimus@goto", jobSchedule, jobWindow, jobTask).Build() + assert.NoError(t, err) + jobSpec3, err := job.NewSpecBuilder(1, "job3", "optimus@goto", jobSchedule, jobWindow, jobTask).Build() + assert.NoError(t, err) + + job1 := job.NewJob(sampleTenant, jobSpec1, resource.ZeroURN(), nil, false) + job1Downstream := job.NewDownstream(jobSpec2.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName) + job2Downstream := job.NewDownstream(jobSpec3.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName) + + jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job1")).Return(job1, nil) + + tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil) + + downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec1.Name()).Return([]*job.Downstream{job1Downstream}, nil) + downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec2.Name()).Return([]*job.Downstream{job2Downstream}, nil) + downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec3.Name()).Return([]*job.Downstream{}, nil) + + request := dto.ValidateRequest{ + Tenant: sampleTenant, + JobSpecs: nil, + JobNames: []string{"job1"}, + DeletionMode: true, + } + + expectedResult := map[job.Name][]dto.ValidateResult{ + "job1": { + { + Stage: "validation for deletion", + Messages: []string{"job is not safe for deletion", "validating job for deletion errors:\n failed precondition for entity job: deletion of job job1 will fail. job is being used by test-proj/job2"}, + Success: false, + }, + }, + } + + actualResult, actualError := jobService.Validate(ctx, request) + + assert.EqualValues(t, expectedResult["job1"], actualResult["job1"]) + assert.NoError(t, actualError) + }) + + t.Run("show each deleted jobs' direct downstream if 2 jobs to be deleted has a common remaining dependency", func(t *testing.T) { + // testcase for case A -> B -> C, and A & B are the jobs to be deleted + // when showing direct downstream, + tenantDetailsGetter := new(TenantDetailsGetter) + defer tenantDetailsGetter.AssertExpectations(t) + + jobRepo := new(JobRepository) + defer jobRepo.AssertExpectations(t) + + downstreamRepo := new(DownstreamRepository) + defer downstreamRepo.AssertExpectations(t) + + jobRunInputCompiler := NewJobRunInputCompiler(t) + resourceExistenceChecker := NewResourceExistenceChecker(t) + + jobService := service.NewJobService(jobRepo, nil, downstreamRepo, nil, nil, tenantDetailsGetter, nil, log, nil, compiler.NewEngine(), jobRunInputCompiler, resourceExistenceChecker) + + jobSpec1, err := job.NewSpecBuilder(1, "job1", "optimus@goto", jobSchedule, jobWindow, jobTask).Build() + assert.NoError(t, err) + jobSpec2, err := job.NewSpecBuilder(1, "job2", "optimus@goto", jobSchedule, jobWindow, jobTask).Build() + assert.NoError(t, err) + jobSpec3, err := job.NewSpecBuilder(1, "job3", "optimus@goto", jobSchedule, jobWindow, jobTask).Build() + assert.NoError(t, err) + + job1 := job.NewJob(sampleTenant, jobSpec1, resource.ZeroURN(), nil, false) + job2 := job.NewJob(sampleTenant, jobSpec2, resource.ZeroURN(), nil, false) + job1Downstream := job.NewDownstream(jobSpec2.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName) + job2Downstream := job.NewDownstream(jobSpec3.Name(), sampleTenant.ProjectName(), sampleTenant.NamespaceName(), taskName) + + jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job1")).Return(job1, nil) + jobRepo.On("GetByJobName", ctx, sampleTenant.ProjectName(), job.Name("job2")).Return(job2, nil) + + tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil) + + downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec1.Name()).Return([]*job.Downstream{job1Downstream}, nil) + downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec2.Name()).Return([]*job.Downstream{job2Downstream}, nil) + downstreamRepo.On("GetDownstreamByJobName", ctx, sampleTenant.ProjectName(), jobSpec3.Name()).Return([]*job.Downstream{}, nil) + + request := dto.ValidateRequest{ + Tenant: sampleTenant, + JobSpecs: nil, + JobNames: []string{"job1", "job2"}, + DeletionMode: true, + } + + expectedResult := map[job.Name][]dto.ValidateResult{ + "job1": { + { + Stage: "validation for deletion", + Messages: []string{"job is not safe for deletion", "validating job for deletion errors:\n failed precondition for entity job: deletion of job job1 will fail. job is being used by test-proj/job2"}, + Success: false, + }, + }, + "job2": { + { + Stage: "validation for deletion", + Messages: []string{"job is not safe for deletion", "validating job for deletion errors:\n failed precondition for entity job: deletion of job job2 will fail. job is being used by test-proj/job3"}, + Success: false, + }, + }, + } + + actualResult, actualError := jobService.Validate(ctx, request) + + assert.EqualValues(t, expectedResult["job1"], actualResult["job1"]) + assert.EqualValues(t, expectedResult["job2"], actualResult["job2"]) + assert.NoError(t, actualError) + }) }) t.Run("non-deletion mode validation", func(t *testing.T) {