Skip to content

Commit

Permalink
feat: task config extrapolation for asset (#178)
Browse files Browse the repository at this point in the history
* feat: add task config extrapolation for asset

* fix: test compile asset after config compilation
  • Loading branch information
deryrahman committed Nov 17, 2023
1 parent de95831 commit 491bb08
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 15 deletions.
22 changes: 10 additions & 12 deletions core/scheduler/service/executor_input_compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,16 +112,21 @@ func (i InputCompiler) Compile(ctx context.Context, job *scheduler.JobWithDetail
compiler.From(systemDefinedVars).WithName(contextSystemDefined).AddToContext(),
)

// Compile asset files
fileMap, err := i.assetCompiler.CompileJobRunAssets(ctx, job.Job, systemDefinedVars, interval, taskContext)
confs, secretConfs, err := i.compileConfigs(job.Job.Task.Config, taskContext)
if err != nil {
i.logger.Error("error compiling job run assets: %s", err)
i.logger.Error("error compiling task config: %s", err)
return nil, err
}

confs, secretConfs, err := i.compileConfigs(job.Job.Task.Config, taskContext)
// Compile asset files
allTaskConfigs := compiler.PrepareContext(
compiler.From(confs, secretConfs).WithName(contextTask).WithKeyPrefix(taskConfigPrefix),
)

mergedContext := utils.MergeAnyMaps(taskContext, allTaskConfigs)
fileMap, err := i.assetCompiler.CompileJobRunAssets(ctx, job.Job, systemDefinedVars, interval, mergedContext)
if err != nil {
i.logger.Error("error compiling task config: %s", err)
i.logger.Error("error compiling job run assets: %s", err)
return nil, err
}

Expand Down Expand Up @@ -150,13 +155,6 @@ func (i InputCompiler) Compile(ctx context.Context, job *scheduler.JobWithDetail
}, nil
}

// If request for hook, add task configs to templateContext
hookContext := compiler.PrepareContext(
compiler.From(confs, secretConfs).WithName(contextTask).WithKeyPrefix(taskConfigPrefix),
)

mergedContext := utils.MergeAnyMaps(taskContext, hookContext)

hook, err := job.Job.GetHook(config.Executor.Name)
if err != nil {
i.logger.Error("error getting hook [%s]: %s", config.Executor.Name, err)
Expand Down
11 changes: 8 additions & 3 deletions core/scheduler/service/executor_input_compiler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,10 @@ func TestExecutorCompiler(t *testing.T) {
Tenant: tnnt,
WindowConfig: cw,
Assets: nil,
Task: &scheduler.Task{
Name: "bq2bq",
Config: map[string]string{},
},
}
details := scheduler.JobWithDetails{
Job: &job,
Expand Down Expand Up @@ -142,11 +146,14 @@ func TestExecutorCompiler(t *testing.T) {
}
taskContext := mock.Anything

templateCompiler := new(mockTemplateCompiler)
templateCompiler.On("Compile", mock.Anything, taskContext).Return(map[string]string{}, nil)
defer templateCompiler.AssertExpectations(t)
assetCompiler := new(mockAssetCompiler)
assetCompiler.On("CompileJobRunAssets", ctx, &job, systemDefinedVars, interval, taskContext).Return(nil, fmt.Errorf("CompileJobRunAssets error"))
defer assetCompiler.AssertExpectations(t)

inputCompiler := service.NewJobInputCompiler(tenantService, nil, assetCompiler, logger)
inputCompiler := service.NewJobInputCompiler(tenantService, templateCompiler, assetCompiler, logger)
inputExecutor, err := inputCompiler.Compile(ctx, &details, config, executedAt)

assert.NotNil(t, err)
Expand Down Expand Up @@ -212,7 +219,6 @@ func TestExecutorCompiler(t *testing.T) {
Return(nil, fmt.Errorf("some.config compilation error"))
defer templateCompiler.AssertExpectations(t)
assetCompiler := new(mockAssetCompiler)
assetCompiler.On("CompileJobRunAssets", ctx, &job, systemDefinedVars, interval, taskContext).Return(compiledFile, nil)
defer assetCompiler.AssertExpectations(t)
inputCompiler := service.NewJobInputCompiler(tenantService, templateCompiler, assetCompiler, logger)
inputExecutor, err := inputCompiler.Compile(ctx, &details, config, executedAt)
Expand All @@ -229,7 +235,6 @@ func TestExecutorCompiler(t *testing.T) {
Return(nil, fmt.Errorf("secret.config compilation error"))
defer templateCompiler.AssertExpectations(t)
assetCompiler := new(mockAssetCompiler)
assetCompiler.On("CompileJobRunAssets", ctx, &job, systemDefinedVars, interval, taskContext).Return(compiledFile, nil)
defer assetCompiler.AssertExpectations(t)
inputCompiler := service.NewJobInputCompiler(tenantService, templateCompiler, assetCompiler, logger)
inputExecutor, err := inputCompiler.Compile(ctx, &details, config, executedAt)
Expand Down

0 comments on commit 491bb08

Please sign in to comment.