diff --git a/core/scheduler/job.go b/core/scheduler/job.go index 411192f206..ee30eb5f7d 100644 --- a/core/scheduler/job.go +++ b/core/scheduler/job.go @@ -154,6 +154,23 @@ func (j *JobWithDetails) GetUniqueLabelValues() []string { return labelValues } +func (j *JobWithDetails) GetSafeLabels() map[string]string { + emptyOutput := make(map[string]string) + if j == nil { + return emptyOutput + } + + if j.JobMetadata == nil { + return emptyOutput + } + + if j.JobMetadata.Labels == nil { + return emptyOutput + } + + return j.JobMetadata.Labels +} + type Retry struct { ExponentialBackoff bool Count int diff --git a/core/scheduler/job_test.go b/core/scheduler/job_test.go index e1fea88020..f302c84d25 100644 --- a/core/scheduler/job_test.go +++ b/core/scheduler/job_test.go @@ -213,6 +213,53 @@ func TestJob(t *testing.T) { labels := jobWithDetails.GetUniqueLabelValues() assert.ElementsMatch(t, labels, []string{"someVale", "another"}) }) + t.Run("GetSafeLabels", func(t *testing.T) { + t.Run("should return empty if job with details is nil", func(t *testing.T) { + var jobWithDetails *scheduler.JobWithDetails + + actualLabels := jobWithDetails.GetSafeLabels() + assert.NotNil(t, actualLabels) + assert.Empty(t, actualLabels) + }) + t.Run("should return empty if job metadata is nil", func(t *testing.T) { + jobWithDetails := scheduler.JobWithDetails{ + Name: "jobName", + JobMetadata: nil, + } + + actualLabels := jobWithDetails.GetSafeLabels() + assert.NotNil(t, actualLabels) + assert.Empty(t, actualLabels) + }) + t.Run("should return empty if job metadata label is nil", func(t *testing.T) { + jobWithDetails := scheduler.JobWithDetails{ + Name: "jobName", + JobMetadata: &scheduler.JobMetadata{ + Labels: nil, + }, + } + + actualLabels := jobWithDetails.GetSafeLabels() + assert.NotNil(t, actualLabels) + assert.Empty(t, actualLabels) + }) + t.Run("should return labels if job metadata label is not nil", func(t *testing.T) { + labels := map[string]string{ + "label1": "someVale", + "label2": "someVale", + "label3": "another", + } + + jobWithDetails := scheduler.JobWithDetails{ + Name: "jobName", + JobMetadata: &scheduler.JobMetadata{Labels: labels}, + } + + actualLabels := jobWithDetails.GetSafeLabels() + assert.NotNil(t, actualLabels) + assert.Equal(t, labels, actualLabels) + }) + }) t.Run("GroupJobsByTenant", func(t *testing.T) { t1, _ := tenant.NewTenant("proj", "ns1") t2, _ := tenant.NewTenant("proj", "ns1") diff --git a/ext/scheduler/airflow/dag/expected_dag.2.1.py b/ext/scheduler/airflow/dag/expected_dag.2.1.py index 5646e6b2cd..64c1285900 100644 --- a/ext/scheduler/airflow/dag/expected_dag.2.1.py +++ b/ext/scheduler/airflow/dag/expected_dag.2.1.py @@ -46,6 +46,9 @@ "on_success_callback": operator_success_event, "on_retry_callback" : operator_retry_event, "on_failure_callback": operator_failure_event, + "labels": { + "orchestrator": "optimus", + }, } """ diff --git a/ext/scheduler/airflow/dag/expected_dag.2.4.py b/ext/scheduler/airflow/dag/expected_dag.2.4.py index 5646e6b2cd..64c1285900 100644 --- a/ext/scheduler/airflow/dag/expected_dag.2.4.py +++ b/ext/scheduler/airflow/dag/expected_dag.2.4.py @@ -46,6 +46,9 @@ "on_success_callback": operator_success_event, "on_retry_callback" : operator_retry_event, "on_failure_callback": operator_failure_event, + "labels": { + "orchestrator": "optimus", + }, } """ diff --git a/ext/scheduler/airflow/dag/expected_dag.2.6.py b/ext/scheduler/airflow/dag/expected_dag.2.6.py index 5ed7f7fbc5..489a3adc72 100644 --- a/ext/scheduler/airflow/dag/expected_dag.2.6.py +++ b/ext/scheduler/airflow/dag/expected_dag.2.6.py @@ -46,6 +46,9 @@ "on_success_callback": operator_success_event, "on_retry_callback" : operator_retry_event, "on_failure_callback": operator_failure_event, + "labels": { + "orchestrator": "optimus", + }, } """ diff --git a/ext/scheduler/airflow/dag/template/dag.2.1.py.tmpl b/ext/scheduler/airflow/dag/template/dag.2.1.py.tmpl index 6650901af8..883092a602 100644 --- a/ext/scheduler/airflow/dag/template/dag.2.1.py.tmpl +++ b/ext/scheduler/airflow/dag/template/dag.2.1.py.tmpl @@ -54,6 +54,11 @@ default_args = { "on_success_callback": operator_success_event, "on_retry_callback" : operator_retry_event, "on_failure_callback": operator_failure_event, + "labels": { + {{- range $key, $value := $.JobDetails.GetSafeLabels }} + "{{ $key }}": "{{ $value }}", + {{- end }} + }, } {{ if ne .JobDetails.JobMetadata.Description "" -}} diff --git a/ext/scheduler/airflow/dag/template/dag.2.4.py.tmpl b/ext/scheduler/airflow/dag/template/dag.2.4.py.tmpl index 6650901af8..883092a602 100644 --- a/ext/scheduler/airflow/dag/template/dag.2.4.py.tmpl +++ b/ext/scheduler/airflow/dag/template/dag.2.4.py.tmpl @@ -54,6 +54,11 @@ default_args = { "on_success_callback": operator_success_event, "on_retry_callback" : operator_retry_event, "on_failure_callback": operator_failure_event, + "labels": { + {{- range $key, $value := $.JobDetails.GetSafeLabels }} + "{{ $key }}": "{{ $value }}", + {{- end }} + }, } {{ if ne .JobDetails.JobMetadata.Description "" -}} diff --git a/ext/scheduler/airflow/dag/template/dag.2.6.py.tmpl b/ext/scheduler/airflow/dag/template/dag.2.6.py.tmpl index a76d14d425..372ad28780 100644 --- a/ext/scheduler/airflow/dag/template/dag.2.6.py.tmpl +++ b/ext/scheduler/airflow/dag/template/dag.2.6.py.tmpl @@ -54,6 +54,11 @@ default_args = { "on_success_callback": operator_success_event, "on_retry_callback" : operator_retry_event, "on_failure_callback": operator_failure_event, + "labels": { + {{- range $key, $value := $.JobDetails.GetSafeLabels }} + "{{ $key }}": "{{ $value }}", + {{- end }} + }, } {{ if ne .JobDetails.JobMetadata.Description "" -}}