Skip to content

Commit

Permalink
feat: support auto deps when destination job is changed (#242)
Browse files Browse the repository at this point in the history
* feat: auto dependency capability when destination is changed

* refactor: appending job to be resolved

* test: add test when add new upstream

* test: add test when update upstream

* feat: support downstream auto deps on upsert api

* fix: support downstream autodeps on replace all

* fix: test on replace all

* feat: deduplicate jobs before resolve

* test: add testcase for deduplicate + resolve deps

* test: fix ordering issue on test
  • Loading branch information
deryrahman authored and ahmadnaufal committed Jul 3, 2024
1 parent 503ae9b commit 0e5c6d7
Show file tree
Hide file tree
Showing 4 changed files with 302 additions and 11 deletions.
14 changes: 14 additions & 0 deletions core/job/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,20 @@ func (j Jobs) GetJobsWithUnresolvedStaticUpstreams() ([]*WithUpstream, error) {
return jobsWithUnresolvedUpstream, me.ToErr()
}

// Deduplicate returns the jobs in j with duplicates removed. Two jobs are
// treated as duplicates when FullName reports the same value for both.
//
// The result keeps the order in which each name first appears in j, so the
// output is deterministic (it does not depend on map iteration order). When a
// name occurs more than once, the entry that appears last in j is the one
// kept. The returned slice is always non-nil, even when j is empty.
func (j Jobs) Deduplicate() []*Job {
	positionByName := make(map[string]int, len(j))
	deduplicatedJobs := make([]*Job, 0, len(j))
	for _, subjectJob := range j {
		fullName := subjectJob.FullName()
		if position, seen := positionByName[fullName]; seen {
			// A later occurrence replaces the earlier one in place, so the
			// last entry wins while the first-seen position is retained.
			deduplicatedJobs[position] = subjectJob
			continue
		}
		positionByName[fullName] = len(deduplicatedJobs)
		deduplicatedJobs = append(deduplicatedJobs, subjectJob)
	}
	return deduplicatedJobs
}

type WithUpstream struct {
job *Job
upstreams []*Upstream
Expand Down
13 changes: 13 additions & 0 deletions core/job/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,4 +495,17 @@ func TestEntityJob(t *testing.T) {
assert.Len(t, jobsWithUpstreams, 2)
})
})
// JobsDeduplicate covers Jobs.Deduplicate.
t.Run("JobsDeduplicate", func(t *testing.T) {
t.Run("should return deduplicate jobs when there's duplication", func(t *testing.T) {
// Two jobs that differ only by spec name; jobB reuses jobA's destination and sources.
specA, _ := job.NewSpecBuilder(jobVersion, "job-A", "sample-owner", jobSchedule, jobWindow, jobTask).Build()
specB, _ := job.NewSpecBuilder(jobVersion, "job-B", "sample-owner", jobSchedule, jobWindow, jobTask).Build()
jobA := job.NewJob(sampleTenant, specA, jobADestination, jobASources, false)
jobB := job.NewJob(sampleTenant, specB, jobADestination, jobASources, false)

// Each job appears twice in the input; exactly one of each is expected back.
jobs := []*job.Job{jobA, jobB, jobB, jobA}
deduplicated := job.Jobs(jobs).Deduplicate()
assert.Len(t, deduplicated, 2)
// ElementsMatch ignores ordering, so this assertion does not pin the output order.
assert.ElementsMatch(t, deduplicated, []*job.Job{jobA, jobB})
})
})
}
98 changes: 87 additions & 11 deletions core/job/service/job_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,16 @@ func (j *JobService) Add(ctx context.Context, jobTenant tenant.Tenant, specs []*
addedJobs, err := j.jobRepo.Add(ctx, jobs)
me.Append(err)

jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), addedJobs, logWriter)
downstreamJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), jobs)
me.Append(err)

jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), append(addedJobs, downstreamJobs...), logWriter)
me.Append(err)

err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams)
me.Append(err)

err = j.uploadJobs(ctx, jobTenant, addedJobs, nil, nil)
err = j.uploadJobs(ctx, jobTenant, addedJobs, downstreamJobs, nil)
me.Append(err)

for _, addedJob := range addedJobs {
Expand Down Expand Up @@ -213,34 +216,48 @@ func (j *JobService) Update(ctx context.Context, jobTenant tenant.Tenant, specs
j.logger.Error("error getting tenant details: %s", err)
return []job.Name{}, err
}
existingJobs := make(map[job.Name]*job.Job)
existingJobs := []*job.Job{}
existingJobsMap := make(map[job.Name]*job.Job)
for _, spec := range specs {
existingJob, err := j.jobRepo.GetByJobName(ctx, jobTenant.ProjectName(), spec.Name())
if err != nil {
me.Append(err)
continue
}
existingJobs[spec.Name()] = existingJob
existingJobsMap[spec.Name()] = existingJob
existingJobs = append(existingJobs, existingJob)
}

downstreamExistingJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), existingJobs)
me.Append(err)

jobs, err := j.generateJobs(ctx, tenantWithDetails, specs, logWriter)
me.Append(err)

updatedJobs, err := j.jobRepo.Update(ctx, jobs)
me.Append(err)

jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), updatedJobs, logWriter)
downstreamUpdatedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), updatedJobs)
me.Append(err)

jobsToBeResolved := []*job.Job{}
jobsToBeResolved = append(jobsToBeResolved, updatedJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamExistingJobs...)
jobsToBeResolved = append(jobsToBeResolved, downstreamUpdatedJobs...)
jobsToBeResolved = job.Jobs(jobsToBeResolved).Deduplicate()

jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), jobsToBeResolved, logWriter)
me.Append(err)

err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams)
me.Append(err)

err = j.uploadJobs(ctx, jobTenant, nil, updatedJobs, nil)
err = j.uploadJobs(ctx, jobTenant, nil, jobsToBeResolved, nil)
me.Append(err)

if len(updatedJobs) > 0 {
for _, updatedJob := range updatedJobs {
j.raiseUpdateEvent(updatedJob, getUpdateImpactType(existingJobs[updatedJob.Spec().Name()], updatedJob))
j.raiseUpdateEvent(updatedJob, getUpdateImpactType(existingJobsMap[updatedJob.Spec().Name()], updatedJob))
}
raiseJobEventMetric(jobTenant, job.MetricJobEventStateUpdated, len(updatedJobs))
}
Expand Down Expand Up @@ -278,6 +295,8 @@ func (j *JobService) Upsert(ctx context.Context, jobTenant tenant.Tenant, specs
}
existingJobs = append(existingJobs, existingJob)
}
downstreamExistingJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), existingJobs)
me.Append(err)

specsToAdd, specsToUpdate, _, specsUnmodified, specsDirty := j.differentiateSpecs(tenantWithDetails.ToTenant(), existingJobs, specs, nil)
specsToUpdate = append(specsToUpdate, specsDirty...)
Expand All @@ -286,18 +305,28 @@ func (j *JobService) Upsert(ctx context.Context, jobTenant tenant.Tenant, specs
me.Append(err)
j.raiseUpdateEvents(existingJobs, addedJobs, updatedJobs)

downstreamUpdatedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), updatedJobs)
me.Append(err)
downstreamAddedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), addedJobs)
me.Append(err)

var upsertedJobs []*job.Job
upsertedJobs = append(upsertedJobs, addedJobs...)
upsertedJobs = append(upsertedJobs, updatedJobs...)
downstreamToBeResolved := []*job.Job{}
downstreamToBeResolved = append(downstreamToBeResolved, downstreamExistingJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamUpdatedJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamAddedJobs...)
downstreamToBeResolved = job.Jobs(downstreamToBeResolved).Deduplicate()

if len(upsertedJobs) > 0 {
jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), upsertedJobs, logWriter)
jobsWithUpstreams, err := j.upstreamResolver.BulkResolve(ctx, jobTenant.ProjectName(), append(upsertedJobs, downstreamToBeResolved...), logWriter)
me.Append(err)

err = j.upstreamRepo.ReplaceUpstreams(ctx, jobsWithUpstreams)
me.Append(err)

err = j.uploadJobs(ctx, jobTenant, addedJobs, updatedJobs, nil)
err = j.uploadJobs(ctx, jobTenant, addedJobs, append(updatedJobs, downstreamToBeResolved...), nil)
me.Append(err)
}

Expand Down Expand Up @@ -648,6 +677,10 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp
j.logger.Error("error getting all jobs for tenant: %s/%s, details: %s", jobTenant.ProjectName(), jobTenant.NamespaceName(), err)
return err
}
existingJobsMap := make(map[job.Name]*job.Job, len(existingJobs))
for _, existingJob := range existingJobs {
existingJobsMap[existingJob.Spec().Name()] = existingJob
}

toAdd, toUpdate, toDelete, _, unmodifiedDirtySpecs := j.differentiateSpecs(jobTenant, existingJobs, specs, jobNamesWithInvalidSpec)
logWriter.Write(writer.LogLevelInfo, fmt.Sprintf("[%s] found %d new, %d modified, and %d deleted job specs", jobTenant.NamespaceName().String(), len(toAdd), len(toUpdate), len(toDelete)))
Expand All @@ -658,6 +691,14 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp
me := errors.NewMultiError("persist job error")

toUpdate = append(toUpdate, unmodifiedDirtySpecs...)
toUpdateJobs := []*job.Job{}
for _, spec := range toUpdate {
if currentJob, ok := existingJobsMap[spec.Name()]; ok {
toUpdateJobs = append(toUpdateJobs, currentJob)
}
}
downstreamExistingJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), toUpdateJobs)
me.Append(err)

addedJobs, updatedJobs, err := j.bulkJobPersist(ctx, tenantWithDetails, toAdd, toUpdate, logWriter)
j.raiseUpdateEvents(existingJobs, addedJobs, updatedJobs)
Expand All @@ -673,11 +714,22 @@ func (j *JobService) ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, sp
return err
}

if err := j.resolveAndSaveUpstreams(ctx, jobTenant, logWriter, addedJobs, updatedJobs); err != nil {
downstreamUpdatedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), updatedJobs)
me.Append(err)
downstreamAddedJobs, err := j.getDownstreamJobs(ctx, jobTenant.ProjectName(), addedJobs)
me.Append(err)

downstreamToBeResolved := []*job.Job{}
downstreamToBeResolved = append(downstreamToBeResolved, downstreamExistingJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamUpdatedJobs...)
downstreamToBeResolved = append(downstreamToBeResolved, downstreamAddedJobs...)
downstreamToBeResolved = job.Jobs(downstreamToBeResolved).Deduplicate()

if err := j.resolveAndSaveUpstreams(ctx, jobTenant, logWriter, addedJobs, updatedJobs, downstreamToBeResolved); err != nil {
return errors.Wrap(job.EntityJob, "failed resolving job upstreams", err)
}

if err := j.uploadJobs(ctx, jobTenant, addedJobs, updatedJobs, nil); err != nil {
if err := j.uploadJobs(ctx, jobTenant, addedJobs, append(updatedJobs, downstreamToBeResolved...), nil); err != nil {
return errors.Wrap(job.EntityJob, "failed uploading compiled dags", err)
}

Expand Down Expand Up @@ -1915,3 +1967,27 @@ func (j *JobService) GetDownstreamByResourceURN(ctx context.Context, tnnt tenant

return dependentJobs, nil
}

// getDownstreamJobs collects the jobs that sit downstream of the given jobs.
//
// For every job whose destination is not the zero URN, the downstream entries
// for that destination are looked up through downstreamRepo, and each entry is
// then loaded by name through jobRepo. Lookup failures are accumulated rather
// than aborting the walk, so the returned slice may be partial when the
// returned error is non-nil. The result is not deduplicated: a job reachable
// through several inputs appears once per match.
func (j *JobService) getDownstreamJobs(ctx context.Context, projectName tenant.ProjectName, jobs []*job.Job) ([]*job.Job, error) {
	collectedErrs := errors.NewMultiError("get downstream jobs errors")
	collected := []*job.Job{}

	for _, upstreamJob := range jobs {
		destination := upstreamJob.Destination()
		// Jobs without a destination cannot have destination-based downstreams.
		if destination == resource.ZeroURN() {
			continue
		}

		dependents, lookupErr := j.downstreamRepo.GetDownstreamByDestination(ctx, projectName, destination)
		if lookupErr != nil {
			collectedErrs.Append(lookupErr)
			continue
		}

		for _, dependent := range dependents {
			dependentJob, fetchErr := j.jobRepo.GetByJobName(ctx, projectName, dependent.Name())
			if fetchErr != nil {
				collectedErrs.Append(fetchErr)
				continue
			}
			collected = append(collected, dependentJob)
		}
	}

	return collected, collectedErrs.ToErr()
}
Loading

0 comments on commit 0e5c6d7

Please sign in to comment.