Skip to content

Commit

Permalink
revert: rework job validation (#215) (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mryashbhardwaj authored May 9, 2024
1 parent 8113b57 commit 995884a
Show file tree
Hide file tree
Showing 61 changed files with 1,873 additions and 4,318 deletions.
328 changes: 56 additions & 272 deletions client/cmd/job/validate.go

Large diffs are not rendered by default.

19 changes: 0 additions & 19 deletions core/job/dto/stage.go

This file was deleted.

19 changes: 0 additions & 19 deletions core/job/dto/validation.go

This file was deleted.

71 changes: 22 additions & 49 deletions core/job/handler/v1beta1/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/job/dto"
"github.com/goto/optimus/core/job/service/filter"
"github.com/goto/optimus/core/tenant"
"github.com/goto/optimus/internal/errors"
Expand All @@ -25,6 +24,7 @@ import (
const (
metricReplaceAllDuration = "job_replace_all_duration_seconds"
metricRefreshDuration = "job_refresh_duration_seconds"
metricValidationDuration = "job_validation_duration_seconds"
)

type JobHandler struct {
Expand Down Expand Up @@ -53,7 +53,7 @@ type JobService interface {
GetByFilter(ctx context.Context, filters ...filter.FilterOpt) (jobSpecs []*job.Job, err error)
ReplaceAll(ctx context.Context, jobTenant tenant.Tenant, jobs []*job.Spec, jobNamesWithInvalidSpec []job.Name, logWriter writer.LogWriter) error
Refresh(ctx context.Context, projectName tenant.ProjectName, namespaceNames, jobNames []string, logWriter writer.LogWriter) error
Validate(context.Context, dto.ValidateRequest) (map[job.Name][]dto.ValidateResult, error)
Validate(ctx context.Context, jobTenant tenant.Tenant, jobSpecs []*job.Spec, jobNamesWithInvalidSpec []job.Name, logWriter writer.LogWriter) error

GetJobBasicInfo(ctx context.Context, jobTenant tenant.Tenant, jobName job.Name, spec *job.Spec) (*job.Job, writer.BufferedLogger)
GetUpstreamsToInspect(ctx context.Context, subjectJob *job.Job, localJob bool) ([]*job.Upstream, error)
Expand Down Expand Up @@ -419,42 +419,35 @@ func (jh *JobHandler) RefreshJobs(request *pb.RefreshJobsRequest, stream pb.JobS
return nil
}

// Deprecated: Do not use.
func (*JobHandler) CheckJobSpecifications(_ *pb.CheckJobSpecificationsRequest, _ pb.JobSpecificationService_CheckJobSpecificationsServer) error {
panic("deprecated, use validate endpoint instead")
}
func (jh *JobHandler) CheckJobSpecifications(req *pb.CheckJobSpecificationsRequest, stream pb.JobSpecificationService_CheckJobSpecificationsServer) error {
startTime := time.Now()

func (jh *JobHandler) Validate(ctx context.Context, request *pb.ValidateRequest) (*pb.ValidateResponse, error) {
tnnt, err := tenant.NewTenant(request.GetProjectName(), request.GetNamespaceName())
responseWriter := writer.NewCheckJobSpecificationResponseWriter(stream)
jobTenant, err := tenant.NewTenant(req.ProjectName, req.NamespaceName)
if err != nil {
return nil, err
jh.l.Error("invalid tenant information request project [%s] namespace [%s]: %s", req.GetProjectName(), req.GetNamespaceName(), err)
return err
}

jobSpecs := make([]*job.Spec, len(request.GetFromOutside().GetJobs()))
for i, job := range request.GetFromOutside().GetJobs() {
spec, err := fromJobProto(job)
if err != nil {
return nil, err
}

jobSpecs[i] = spec
me := errors.NewMultiError("check / validate job spec errors")
jobSpecs, jobNamesWithInvalidSpec, err := fromJobProtos(req.Jobs)
if err != nil {
jh.l.Error("error when adapting job specifications: %s", err)
me.Append(err)
}

validateRequest := dto.ValidateRequest{
Tenant: tnnt,
JobSpecs: jobSpecs,
JobNames: request.GetFromServer().GetJobNames(),
DeletionMode: request.GetFromServer().GetDeletionMode(),
if err := jh.jobService.Validate(stream.Context(), jobTenant, jobSpecs, jobNamesWithInvalidSpec, responseWriter); err != nil {
jh.l.Error("error validating job: %s", err)
me.Append(err)
}

result, err := jh.jobService.Validate(ctx, validateRequest)
if err != nil {
return nil, err
}
processDuration := time.Since(startTime)
telemetry.NewGauge(metricValidationDuration, map[string]string{
"project": jobTenant.ProjectName().String(),
"namespace": jobTenant.NamespaceName().String(),
}).Add(processDuration.Seconds())

return &pb.ValidateResponse{
ResultsByJobName: toValidateResultProto(result),
}, nil
return me.ToErr()
}

func (jh *JobHandler) GetJobTask(ctx context.Context, req *pb.GetJobTaskRequest) (*pb.GetJobTaskResponse, error) {
Expand Down Expand Up @@ -645,23 +638,3 @@ func raiseJobEventMetric(jobTenant tenant.Tenant, state string, metricValue int)
"status": state,
}).Add(float64(metricValue))
}

func toValidateResultProto(result map[job.Name][]dto.ValidateResult) map[string]*pb.ValidateResponse_ResultList {
output := make(map[string]*pb.ValidateResponse_ResultList)
for jobName, validateResults := range result {
resultsProto := make([]*pb.ValidateResponse_Result, len(validateResults))
for i, rst := range validateResults {
resultsProto[i] = &pb.ValidateResponse_Result{
Name: rst.Stage.String(),
Messages: rst.Messages,
Success: rst.Success,
}
}

output[jobName.String()] = &pb.ValidateResponse_ResultList{
Results: resultsProto,
}
}

return output
}
3 changes: 1 addition & 2 deletions core/job/handler/v1beta1/job_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"

"github.com/goto/optimus/core/job"
"github.com/goto/optimus/core/resource"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/lib/labels"
"github.com/goto/optimus/internal/lib/window"
Expand Down Expand Up @@ -200,7 +199,7 @@ func fromJobProto(js *pb.JobSpecification) (*job.Spec, error) {
return jobSpecBuilder.Build()
}

func fromResourceURNs(resourceURNs []resource.URN) []string {
func fromResourceURNs(resourceURNs []job.ResourceURN) []string {
var resources []string
for _, resourceURN := range resourceURNs {
resources = append(resources, resourceURN.String())
Expand Down
Loading

0 comments on commit 995884a

Please sign in to comment.