diff --git a/core/job/job.go b/core/job/job.go index 8c35893b40..095ba63ed0 100644 --- a/core/job/job.go +++ b/core/job/job.go @@ -252,6 +252,20 @@ func (j Jobs) GetJobsWithUnresolvedStaticUpstreams() ([]*WithUpstream, error) { return jobsWithUnresolvedUpstream, me.ToErr() } +func (j Jobs) Deduplicate() []*Job { + jobByName := map[string]*Job{} + for _, subjectJob := range j { + jobByName[subjectJob.FullName()] = subjectJob + } + deduplicatedJobs := make([]*Job, len(jobByName)) + i := 0 + for _, subjectJob := range jobByName { + deduplicatedJobs[i] = subjectJob + i++ + } + return deduplicatedJobs +} + type WithUpstream struct { job *Job upstreams []*Upstream diff --git a/core/job/job_test.go b/core/job/job_test.go index e04823c775..dbb7c8b016 100644 --- a/core/job/job_test.go +++ b/core/job/job_test.go @@ -495,4 +495,17 @@ func TestEntityJob(t *testing.T) { assert.Len(t, jobsWithUpstreams, 2) }) }) + t.Run("JobsDeduplicate", func(t *testing.T) { + t.Run("should return deduplicate jobs when there's duplication", func(t *testing.T) { + specA, _ := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build() + specB, _ := job.NewSpecBuilder(jobVersion, "job-B", "sample-owner", jobSchedule, jobWindow, jobTask).Build() + jobA := job.NewJob(sampleTenant, specA, jobADestination, jobASources, false) + jobB := job.NewJob(sampleTenant, specB, jobADestination, jobASources, false) + + jobs := []*job.Job{jobA, jobB, jobB, jobA} + deduplicated := job.Jobs(jobs).Deduplicate() + assert.Len(t, deduplicated, 2) + assert.ElementsMatch(t, deduplicated, []*job.Job{jobA, jobB}) + }) + }) } diff --git a/core/job/service/job_service.go b/core/job/service/job_service.go index 56f2d38b4d..75b37904bf 100644 --- a/core/job/service/job_service.go +++ b/core/job/service/job_service.go @@ -178,13 +178,16 @@ func (j *JobService) Add(ctx context.Context, jobTenant tenant.Tenant, specs []* addedJobs, err := j.jobRepo.Add(ctx, jobs) me.Append(err) - jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), addedJobs, logWriter) + downstreamJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), jobs) + me.Append(err) + + jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), append(addedJobs, downstreamJobs...), logWriter) me.Append(err) err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams) me.Append(err) - err = j.uploadJobs(ctx, jobTenant, addedJobs, nil, nil) + err = j.uploadJobs(ctx, jobTenant, addedJobs, downstreamJobs, nil) me.Append(err) for _, addedJob := range addedJobs { @@ -213,34 +216,48 @@ func (j *JobService) Update(ctx context.Context, jobTenant tenant.Tenant, specs j.logger.Error("error getting tenant details: %s", err) return []job.Name{}, err } - existingJobs := make(map[job.Name]*job.Job) + existingJobs := []*job.Job{} + existingJobsMap := make(map[job.Name]*job.Job) for _, spec := range specs { existingJob, err := j.jobRepo.GetByJobName(ctx, jobTenant.ProjectName(), spec.Name()) if err != nil { me.Append(err) continue } - existingJobs[spec.Name()] = existingJob + existingJobsMap[spec.Name()] = existingJob + existingJobs = append(existingJobs, existingJob) } + downstreamExistingJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), existingJobs) + me.Append(err) + jobs, err := j.generateJobs(ctx, tenantWithDetails, specs, logWriter) me.Append(err) updatedJobs, err := j.jobRepo.Update(ctx, jobs) me.Append(err) - jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), updatedJobs, logWriter) + downstreamUpdatedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), updatedJobs) + me.Append(err) + + jobsToBeResolved := []*job.Job{} + jobsToBeResolved = append(jobsToBeResolved, updatedJobs...) + jobsToBeResolved = append(jobsToBeResolved, downstreamExistingJobs...) + jobsToBeResolved = append(jobsToBeResolved, downstreamUpdatedJobs...) + jobsToBeResolved = job.Jobs(jobsToBeResolved).Deduplicate() + + jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), jobsToBeResolved, logWriter) me.Append(err) err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams) me.Append(err) - err = j.uploadJobs(ctx, jobTenant, nil, updatedJobs, nil) + err = j.uploadJobs(ctx, jobTenant, nil, jobsToBeResolved, nil) me.Append(err) if len(updatedJobs) > 0 { for _, updatedJob := range updatedJobs { - j.raiseUpdateEvent(updatedJob, getUpdateImpactType(existingJobs[updatedJob.Spec().Name()], updatedJob)) + j.raiseUpdateEvent(updatedJob, getUpdateImpactType(existingJobsMap[updatedJob.Spec().Name()], updatedJob)) } raiseJobEventMetric(jobTenant, job.MetricJobEventStateUpdated, len(updatedJobs)) } @@ -278,6 +295,8 @@ func (j *JobService) Upsert(ctx context.Context, jobTenant tenant.Tenant, specs } existingJobs = append(existingJobs, existingJob) } + downstreamExistingJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), existingJobs) + me.Append(err) specsToAdd, specsToUpdate, _, specsUnmodified, specsDirty := j.differentiateSpecs(tenantWithDetails.ToTenant(), existingJobs, specs, nil) specsToUpdate = append(specsToUpdate, specsDirty...) @@ -286,18 +305,28 @@ func (j *JobService) Upsert(ctx context.Context, jobTenant tenant.Tenant, specs me.Append(err) j.raiseUpdateEvents(existingJobs, addedJobs, updatedJobs) + downstreamUpdatedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), updatedJobs) + me.Append(err) + downstreamAddedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), addedJobs) + me.Append(err) + var upsertedJobs []*job.Job upsertedJobs = append(upsertedJobs, addedJobs...) upsertedJobs = append(upsertedJobs, updatedJobs...) + downstreamToBeResolved := []*job.Job{} + downstreamToBeResolved = append(downstreamToBeResolved, downstreamExistingJobs...) + downstreamToBeResolved = append(downstreamToBeResolved, downstreamUpdatedJobs...) + downstreamToBeResolved = append(downstreamToBeResolved, downstreamAddedJobs...) + downstreamToBeResolved = job.Jobs(downstreamToBeResolved).Deduplicate() if len(upsertedJobs) > 0 { - jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), upsertedJobs, logWriter) + jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), append(upsertedJobs, downstreamToBeResolved...), logWriter) me.Append(err) err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams) me.Append(err) - err = j.uploadJobs(ctx, jobTenant, addedJobs, updatedJobs, nil) + err = j.uploadJobs(ctx, jobTenant, addedJobs, append(updatedJobs, downstreamToBeResolved...), nil) me.Append(err) } @@ -648,6 +677,10 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp j.logger.Error("error getting all jobs for tenant: %s/%s, details: %s", jobTenant.ProjectName(), jobTenant.NamespaceName(), err) return err } + existingJobsMap := make(map[job.Name]*job.Job, len(existingJobs)) + for _, existingJob := range existingJobs { + existingJobsMap[existingJob.Spec().Name()] = existingJob + } toAdd, toUpdate, toDelete, _, unmodifiedDirtySpecs := j.differentiateSpecs(jobTenant, existingJobs, specs, jobNamesWithInvalidSpec) logWriter.Write(writer.LogLevelInfo, fmt.Sprintf("[%s] found %d new, %d modified, and %d deleted job specs", jobTenant.NamespaceName().String(), len(toAdd), len(toUpdate), len(toDelete))) @@ -658,6 +691,14 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp me := errors.NewMultiError("persist job error") toUpdate = append(toUpdate, unmodifiedDirtySpecs...) + toUpdateJobs := []*job.Job{} + for _, spec := range toUpdate { + if currentJob, ok := existingJobsMap[spec.Name()]; ok { + toUpdateJobs = append(toUpdateJobs, currentJob) + } + } + downstreamExistingJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), toUpdateJobs) + me.Append(err) addedJobs, updatedJobs, err := j.bulkJobPersist(ctx, tenantWithDetails, toAdd, toUpdate, logWriter) j.raiseUpdateEvents(existingJobs, addedJobs, updatedJobs) @@ -673,11 +714,22 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp return err } - if err := j.resolveAndSaveUpstreams(ctx, jobTenant, logWriter, addedJobs, updatedJobs); err != nil { + downstreamUpdatedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), updatedJobs) + me.Append(err) + downstreamAddedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), addedJobs) + me.Append(err) + + downstreamToBeResolved := []*job.Job{} + downstreamToBeResolved = append(downstreamToBeResolved, downstreamExistingJobs...) + downstreamToBeResolved = append(downstreamToBeResolved, downstreamUpdatedJobs...) + downstreamToBeResolved = append(downstreamToBeResolved, downstreamAddedJobs...) + downstreamToBeResolved = job.Jobs(downstreamToBeResolved).Deduplicate() + + if err := j.resolveAndSaveUpstreams(ctx, jobTenant, logWriter, addedJobs, updatedJobs, downstreamToBeResolved); err != nil { return errors.Wrap(job.EntityJob, "failed resolving job upstreams", err) } - if err := j.uploadJobs(ctx, jobTenant, addedJobs, updatedJobs, nil); err != nil { + if err := j.uploadJobs(ctx, jobTenant, addedJobs, append(updatedJobs, downstreamToBeResolved...), nil); err != nil { return errors.Wrap(job.EntityJob, "failed uploading compiled dags", err) } @@ -1915,3 +1967,27 @@ func (j *JobService) GetDownstreamByResourceURN(ctx context.Context, tnnt tenant return dependentJobs, nil } + +func (j *JobService) getDownstreamJobs(ctx context.Context, projectName tenant.ProjectName, jobs []*job.Job) ([]*job.Job, error) { + me := errors.NewMultiError("get downstream jobs errors") + downstreamJobs := []*job.Job{} + for _, currentJob := range jobs { + if currentJob.Destination() == resource.ZeroURN() { + continue + } + downstreams, err := j.downstreamRepo.GetDownstreamByDestination(ctx, projectName, currentJob.Destination()) + if err != nil { + me.Append(err) + continue + } + for _, d := range downstreams { + downstreamJob, err := j.jobRepo.GetByJobName(ctx, projectName, d.Name()) + if err != nil { + me.Append(err) + continue + } + downstreamJobs = append(downstreamJobs, downstreamJob) + } + } + return downstreamJobs, me.ToErr() +} diff --git a/core/job/service/job_service_test.go b/core/job/service/job_service_test.go index 450134ca37..82845ced3d 100644 --- a/core/job/service/job_service_test.go +++ b/core/job/service/job_service_test.go @@ -125,6 +125,8 @@ func TestJobService(t *testing.T) { jobs := []*job.Job{jobA} jobRepo.On("Add", ctx, mock.Anything).Return(jobs, nil, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, jobADestination).Return([]*job.Downstream{}, nil) + upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), jobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -141,6 +143,73 @@ func TestJobService(t *testing.T) { assert.NoError(t, err) assert.Len(t, addedJobs, len(specs)) }) + t.Run("add jobs as a new upstream for existing job", func(t *testing.T) { + jobRepo := new(JobRepository) + defer jobRepo.AssertExpectations(t) + + upstreamRepo := new(UpstreamRepository) + defer upstreamRepo.AssertExpectations(t) + + downstreamRepo := new(DownstreamRepository) + defer downstreamRepo.AssertExpectations(t) + + pluginService := NewPluginService(t) + + upstreamResolver := new(UpstreamResolver) + defer upstreamResolver.AssertExpectations(t) + + tenantDetailsGetter := new(TenantDetailsGetter) + defer tenantDetailsGetter.AssertExpectations(t) + + jobDeploymentService := new(JobDeploymentService) + defer jobDeploymentService.AssertExpectations(t) + + eventHandler := newEventHandler(t) + + specA, _ := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).WithAsset(jobAsset).Build() + specB, _ := job.NewSpecBuilder(jobVersion, "job-B", "sample-owner", jobSchedule, jobWindow, jobTask).WithAsset(jobAsset).Build() + specs := []*job.Spec{specA} + + tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil) + + jobADestination := resourceURNA + jobBDestination := resourceURNB + pluginService.On("ConstructDestinationURN", ctx, specA.Task().Name().String(), mock.Anything).Return(jobADestination, nil) + + jobAUpstreamName := []resource.URN{} + jobBUpstreamName := []resource.URN{resourceURNA} + pluginService.On("IdentifyUpstreams", ctx, specA.Task().Name().String(), mock.Anything, mock.Anything).Return(jobAUpstreamName, nil) + + jobA := job.NewJob(sampleTenant, specA, jobADestination, jobAUpstreamName, false) + jobB := job.NewJob(sampleTenant, specB, jobBDestination, jobBUpstreamName, false) + jobs := []*job.Job{jobA} + jobRepo.On("Add", ctx, mock.Anything).Return(jobs, nil, nil) + + jobBDownstream := job.NewDownstream("job-B", sampleTenant.ProjectName(), sampleTenant.NamespaceName(), jobTask.Name()) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, jobADestination).Return([]*job.Downstream{jobBDownstream}, nil) + jobRepo.On("GetByJobName", ctx, mock.Anything, jobBDownstream.Name()).Return(jobB, nil) + + upstream := job.NewUpstreamResolved("job-A", "", resourceURNA, sampleTenant, "static", taskName, false) + jobAWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{}) + jobBWithUpstream := job.NewWithUpstream(jobB, []*job.Upstream{upstream}) + upstreamResolver.On("BulkResolve", ctx, project.Name(), mock.MatchedBy(func(elems []*job.Job) bool { + return assert.ElementsMatch(t, elems, append(jobs, jobB)) + }), mock.Anything).Return([]*job.WithUpstream{jobAWithUpstream, jobBWithUpstream}, nil, nil) + + upstreamRepo.On("ReplaceUpstreams", ctx, []*job.WithUpstream{jobAWithUpstream, jobBWithUpstream}).Return(nil) + + jobNamesToUpload := []string{jobA.GetName(), jobB.GetName()} + jobDeploymentService.On("UploadJobs", ctx, sampleTenant, mock.MatchedBy(func(elems []string) bool { + return assert.ElementsMatch(t, elems, jobNamesToUpload) + }), emptyJobNames).Return(nil) + + eventHandler.On("HandleEvent", mock.Anything).Times(1) + + jobService := service.NewJobService(jobRepo, upstreamRepo, downstreamRepo, pluginService, upstreamResolver, tenantDetailsGetter, eventHandler, log, jobDeploymentService, compiler.NewEngine(), nil, nil) + addedJobs, err := jobService.Add(ctx, sampleTenant, specs) + assert.NoError(t, err) + assert.Len(t, addedJobs, len(specs)) + }) t.Run("return error if unable to get detailed tenant", func(t *testing.T) { jobRepo := new(JobRepository) defer jobRepo.AssertExpectations(t) @@ -362,6 +431,8 @@ func TestJobService(t *testing.T) { savedJobs := []*job.Job{jobB} jobRepo.On("Add", ctx, mock.Anything).Return(savedJobs, errors.New("unable to save job A")) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobWithUpstreamB := job.NewWithUpstream(jobB, nil) upstreamResolver.On("BulkResolve", ctx, project.Name(), savedJobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstreamB}, nil, nil) @@ -399,6 +470,8 @@ func TestJobService(t *testing.T) { tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstreamResolver.On("BulkResolve", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) upstreamRepo.On("ReplaceUpstreams", ctx, mock.Anything).Return(nil) @@ -454,6 +527,8 @@ func TestJobService(t *testing.T) { jobRepo.On("Add", ctx, mock.Anything).Return(jobs, nil, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobWithUpstreamA := job.NewWithUpstream(jobA, nil) upstreamResolver.On("BulkResolve", ctx, project.Name(), jobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstreamA}, nil, nil) @@ -506,6 +581,8 @@ func TestJobService(t *testing.T) { jobs := []*job.Job{jobA} jobRepo.On("Add", ctx, mock.Anything).Return(jobs, nil, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), jobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -557,6 +634,8 @@ func TestJobService(t *testing.T) { jobAUpstreamName := []resource.URN{resourceURNB} pluginService.On("IdentifyUpstreams", ctx, specA.Task().Name().String(), mock.Anything, mock.Anything).Return(jobAUpstreamName, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobA := job.NewJob(sampleTenant, specA, jobADestination, jobAUpstreamName, false) jobs := []*job.Job{jobA} jobRepo.On("Update", ctx, mock.Anything).Return(jobs, nil, nil) @@ -578,6 +657,85 @@ func TestJobService(t *testing.T) { assert.NoError(t, err) assert.Len(t, updateJobs, 1) }) + + t.Run("update jobs as a new upstream for another job", func(t *testing.T) { + jobRepo := new(JobRepository) + defer jobRepo.AssertExpectations(t) + + upstreamRepo := new(UpstreamRepository) + defer upstreamRepo.AssertExpectations(t) + + downstreamRepo := new(DownstreamRepository) + defer downstreamRepo.AssertExpectations(t) + + pluginService := NewPluginService(t) + + upstreamResolver := new(UpstreamResolver) + defer upstreamResolver.AssertExpectations(t) + + tenantDetailsGetter := new(TenantDetailsGetter) + defer tenantDetailsGetter.AssertExpectations(t) + + jobDeploymentService := new(JobDeploymentService) + defer jobDeploymentService.AssertExpectations(t) + + eventHandler := newEventHandler(t) + + specA, _ := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).WithAsset(jobAsset).Build() + specB, _ := job.NewSpecBuilder(jobVersion, "job-B", "sample-owner", jobSchedule, jobWindow, jobTask).WithAsset(jobAsset).Build() + specC, _ := job.NewSpecBuilder(jobVersion, "job-C", "sample-owner", jobSchedule, jobWindow, jobTask).WithAsset(jobAsset).Build() + specs := []*job.Spec{specA} + + tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil) + + jobADestination := resourceURNA + jobBDestination := resourceURNB + pluginService.On("ConstructDestinationURN", ctx, specA.Task().Name().String(), mock.Anything).Return(jobADestination, nil) + + jobAUpstreamName := []resource.URN{} + jobBUpstreamName := []resource.URN{resourceURNA} + jobCUpstreamName := []resource.URN{resourceURNA} + pluginService.On("IdentifyUpstreams", ctx, specA.Task().Name().String(), mock.Anything, mock.Anything).Return(jobAUpstreamName, nil) + + jobA := job.NewJob(sampleTenant, specA, jobADestination, jobAUpstreamName, false) + jobB := job.NewJob(sampleTenant, specB, jobBDestination, jobBUpstreamName, false) + jobC := job.NewJob(sampleTenant, specC, jobBDestination, jobCUpstreamName, false) + jobs := []*job.Job{jobA} + jobRepo.On("GetByJobName", ctx, project.Name(), specA.Name()).Return(jobA, nil) + + jobBDownstream := job.NewDownstream("job-B", sampleTenant.ProjectName(), sampleTenant.NamespaceName(), jobTask.Name()) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, jobADestination).Return([]*job.Downstream{jobBDownstream}, nil).Once() + jobRepo.On("GetByJobName", ctx, mock.Anything, jobBDownstream.Name()).Return(jobB, nil) + + jobRepo.On("Update", ctx, mock.Anything).Return(jobs, nil, nil) + + jobCDownstream := job.NewDownstream("job-C", sampleTenant.ProjectName(), sampleTenant.NamespaceName(), jobTask.Name()) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, jobADestination).Return([]*job.Downstream{jobCDownstream}, nil).Once() + jobRepo.On("GetByJobName", ctx, mock.Anything, jobCDownstream.Name()).Return(jobC, nil) + + upstream := job.NewUpstreamResolved("job-A", "", resourceURNA, sampleTenant, "static", taskName, false) + jobAWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{}) + jobBWithUpstream := job.NewWithUpstream(jobB, []*job.Upstream{}) + jobCWithUpstream := job.NewWithUpstream(jobC, []*job.Upstream{upstream}) + upstreamResolver.On("BulkResolve", ctx, project.Name(), mock.MatchedBy(func(elems []*job.Job) bool { + return assert.ElementsMatch(t, elems, append(jobs, jobB, jobC)) + }), mock.Anything).Return([]*job.WithUpstream{jobAWithUpstream, jobBWithUpstream, jobCWithUpstream}, nil, nil) + + upstreamRepo.On("ReplaceUpstreams", ctx, []*job.WithUpstream{jobAWithUpstream, jobBWithUpstream, jobCWithUpstream}).Return(nil) + + jobNamesToUpload := []string{jobA.GetName(), jobB.GetName(), jobC.GetName()} + jobDeploymentService.On("UploadJobs", ctx, sampleTenant, mock.MatchedBy(func(elems []string) bool { + return assert.ElementsMatch(t, elems, jobNamesToUpload) + }), emptyJobNames).Return(nil) + + eventHandler.On("HandleEvent", mock.Anything).Times(1) + + jobService := service.NewJobService(jobRepo, upstreamRepo, downstreamRepo, pluginService, upstreamResolver, tenantDetailsGetter, eventHandler, log, jobDeploymentService, compiler.NewEngine(), nil, nil) + updateJobs, err := jobService.Update(ctx, sampleTenant, specs) + assert.NoError(t, err) + assert.Len(t, updateJobs, 1) + }) + t.Run("return error if unable to get detailed tenant", func(t *testing.T) { jobRepo := new(JobRepository) defer jobRepo.AssertExpectations(t) @@ -659,6 +817,8 @@ func TestJobService(t *testing.T) { jobRepo.On("GetByJobName", ctx, project.Name(), specB.Name()).Return(jobB, nil) jobRepo.On("GetByJobName", ctx, project.Name(), specC.Name()).Return(jobC, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), jobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -703,6 +863,8 @@ func TestJobService(t *testing.T) { jobRepo.On("GetByJobName", ctx, project.Name(), specA.Name()).Return(jobA, nil) jobRepo.On("GetByJobName", ctx, project.Name(), specB.Name()).Return(jobB, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstreamResolver.On("BulkResolve", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) upstreamRepo.On("ReplaceUpstreams", ctx, mock.Anything).Return(nil) @@ -819,6 +981,8 @@ func TestJobService(t *testing.T) { jobRepo.On("GetByJobName", ctx, project.Name(), specA.Name()).Return(jobA, nil) jobRepo.On("GetByJobName", ctx, project.Name(), specB.Name()).Return(jobB, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobWithUpstreamB := job.NewWithUpstream(jobB, nil) upstreamResolver.On("BulkResolve", ctx, project.Name(), savedJobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstreamB}, nil, nil) @@ -857,6 +1021,8 @@ func TestJobService(t *testing.T) { tenantDetailsGetter.On("GetDetails", ctx, sampleTenant).Return(detailedTenant, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstreamResolver.On("BulkResolve", ctx, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil) upstreamRepo.On("ReplaceUpstreams", ctx, mock.Anything).Return(nil) @@ -917,6 +1083,8 @@ func TestJobService(t *testing.T) { jobRepo.On("Update", ctx, mock.Anything).Return(jobs, nil, nil) jobRepo.On("GetByJobName", ctx, project.Name(), specA.Name()).Return(jobA, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobWithUpstreamA := job.NewWithUpstream(jobA, nil) upstreamResolver.On("BulkResolve", ctx, project.Name(), jobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstreamA}, nil, nil) @@ -971,6 +1139,8 @@ func TestJobService(t *testing.T) { jobRepo.On("Update", ctx, mock.Anything).Return(jobs, nil, nil) jobRepo.On("GetByJobName", ctx, project.Name(), specA.Name()).Return(jobA, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), jobs, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -1053,6 +1223,8 @@ func TestJobService(t *testing.T) { upstreamC := job.NewUpstreamResolved("job-C", "", resourceURNC, sampleTenant, "static", taskName, false) jobBWithUpstream := job.NewWithUpstream(jobBToadd, []*job.Upstream{upstreamC}) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobsToUpsert := []*job.Job{jobBToadd, jobAToUpdate} upstreamResolver.On("BulkResolve", ctx, project.Name(), jobsToUpsert, mock.Anything).Return([]*job.WithUpstream{jobAWithUpstream, jobBWithUpstream}, nil, nil) @@ -1246,6 +1418,8 @@ func TestJobService(t *testing.T) { jobRepo.On("GetByJobName", ctx, project.Name(), specB.Name()).Return(nil, errors.New("internal error")) jobRepo.On("GetByJobName", ctx, project.Name(), specC.Name()).Return(nil, errors.New("internal error")) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobs := []*job.Job{jobA} jobRepo.On("Add", ctx, mock.Anything).Return(jobs, nil) @@ -1324,6 +1498,8 @@ func TestJobService(t *testing.T) { jobRepo.On("GetByJobName", ctx, project.Name(), specAToUpdate.Name()).Return(jobAExisting, nil).Once() jobRepo.On("GetByJobName", ctx, project.Name(), specBToUpdate.Name()).Return(jobBToUpdate, nil).Once() + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstreamB := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) jobAWithUpstream := job.NewWithUpstream(jobAToUpdate, []*job.Upstream{upstreamB}) @@ -1644,6 +1820,8 @@ func TestJobService(t *testing.T) { upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), []*job.Job{jobA}, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -1713,6 +1891,8 @@ func TestJobService(t *testing.T) { upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), []*job.Job{jobA}, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -1913,6 +2093,8 @@ func TestJobService(t *testing.T) { downstreamRepo.On("GetDownstreamByJobName", ctx, project.Name(), existingSpecC.Name()).Return(nil, nil) jobRepo.On("Delete", ctx, project.Name(), existingSpecC.Name(), false).Return(nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), []*job.Job{jobA, jobB}, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -1997,6 +2179,8 @@ func TestJobService(t *testing.T) { downstreamRepo.On("GetDownstreamByJobName", ctx, project.Name(), existingSpecC.Name()).Return(nil, nil) jobRepo.On("Delete", ctx, project.Name(), existingSpecC.Name(), false).Return(nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), []*job.Job{jobA, jobB}, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil) @@ -2309,6 +2493,8 @@ func TestJobService(t *testing.T) { pluginService.On("ConstructDestinationURN", ctx, specA.Task().Name().String(), mock.Anything).Return(jobADestination, nil).Once() pluginService.On("IdentifyUpstreams", ctx, specA.Task().Name().String(), mock.Anything, mock.Anything).Return(jobAUpstreamName, nil) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobRepo.On("Update", ctx, mock.Anything).Return([]*job.Job{}, errors.New("internal error")) logWriter.On("Write", mock.Anything, mock.Anything).Return(nil) @@ -2564,6 +2750,8 @@ func TestJobService(t *testing.T) { upstream := job.NewUpstreamResolved("job-B", "", resourceURNB, sampleTenant, "static", taskName, false) + downstreamRepo.On("GetDownstreamByDestination", ctx, mock.Anything, mock.Anything).Return([]*job.Downstream{}, nil) + jobWithUpstream := job.NewWithUpstream(jobA, []*job.Upstream{upstream}) upstreamResolver.On("BulkResolve", ctx, project.Name(), []*job.Job{jobA}, mock.Anything).Return([]*job.WithUpstream{jobWithUpstream}, nil, nil)