diff --git a/ext/scheduler/airflow/__lib.py b/ext/scheduler/airflow/__lib.py index 2444b23280..03b3f972d5 100644 --- a/ext/scheduler/airflow/__lib.py +++ b/ext/scheduler/airflow/__lib.py @@ -27,6 +27,7 @@ TIMESTAMP_MS_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ" SCHEDULER_ERR_MSG = "scheduler_error" +OPTIMUS_REQUEST_TIMEOUT_IN_SECS = int(Variable.get("optimus_request_timeout_in_secs", default_var=5 * 60)) def lookup_non_standard_cron_expression(expr: str) -> str: expr_mapping = { @@ -81,7 +82,7 @@ def get_job_run(self, optimus_project: str, optimus_job: str, startDate: str, en optimus_project=optimus_project, optimus_job=optimus_job, ) - response = requests.get(url, params={'start_date': startDate, 'end_date': endDate}) + response = requests.get(url, params={'start_date': startDate, 'end_date': endDate}, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS) self._raise_error_if_request_failed(response) return response.json() @@ -95,7 +96,7 @@ def get_task_window(self, scheduled_at: str, version: int, window_size: str, win window_offset=window_offset, window_truncate_upto=window_truncate_upto, ) - response = requests.get(url) + response = requests.get(url, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS) self._raise_error_if_request_failed(response) return response.json() @@ -104,7 +105,7 @@ def get_job_run_input(self, execution_date: str, project_name: str, job_name: st response = requests.post(url="{}/api/v1beta1/project/{}/job/{}/run_input".format(self.host, project_name, job_name), json={'scheduled_at': execution_date, 'instance_name': instance_name, - 'instance_type': "TYPE_" + job_type.upper()}) + 'instance_type': "TYPE_" + job_type.upper()}, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS) self._raise_error_if_request_failed(response) return response.json() @@ -115,7 +116,7 @@ def get_job_metadata(self, execution_date, namespace, project, job) -> dict: namespace_name=namespace, project_name=project, job_name=job) - response = requests.get(url) + response = requests.get(url, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS) self._raise_error_if_request_failed(response) return response.json() @@ -129,7 +130,7 @@ def notify_event(self, project, namespace, job, event) -> dict: request_data = { "event": event } - response = requests.post(url, data=json.dumps(request_data)) + response = requests.post(url, data=json.dumps(request_data), timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS) self._raise_error_if_request_failed(response) return response.json() @@ -637,7 +638,7 @@ def __init__( def poke(self, context: 'Context') -> bool: self.log.info('Poking: %s', self.endpoint) - r = requests.get(url=self.endpoint, headers=self.headers, params=self.request_params) + r = requests.get(url=self.endpoint, headers=self.headers, params=self.request_params, timeout=OPTIMUS_REQUEST_TIMEOUT_IN_SECS)) if (r.status_code >= 200 and r.status_code <= 300): return True return False diff --git a/ext/scheduler/airflow/dag/compiler_test.go b/ext/scheduler/airflow/dag/compiler_test.go index 9e63cdae97..80aa5fc3a8 100644 --- a/ext/scheduler/airflow/dag/compiler_test.go +++ b/ext/scheduler/airflow/dag/compiler_test.go @@ -109,7 +109,7 @@ func setupJobDetails(tnnt tenant.Tenant) *scheduler.JobWithDetails { jobMeta := &scheduler.JobMetadata{ Version: 1, Owner: "infra-team@example.com", - Description: "This job collects the billing information related to infrastructure", + Description: "This job collects the billing information related to infrastructure.\nThis job will run in a weekly basis.", Labels: map[string]string{"orchestrator": "optimus"}, } diff --git a/ext/scheduler/airflow/dag/dag.py.tmpl b/ext/scheduler/airflow/dag/dag.py.tmpl index 3db92b61a5..ecf1bdac5c 100644 --- a/ext/scheduler/airflow/dag/dag.py.tmpl +++ b/ext/scheduler/airflow/dag/dag.py.tmpl @@ -59,7 +59,9 @@ default_args = { } {{ if ne .JobDetails.JobMetadata.Description "" -}} -# {{.JobDetails.JobMetadata.Description}} +""" +{{.JobDetails.JobMetadata.Description}} +""" {{- end }} dag = DAG( dag_id={{.JobDetails.Name.String | quote}}, diff --git a/ext/scheduler/airflow/dag/expected_dag.py b/ext/scheduler/airflow/dag/expected_dag.py index 09172ee30e..c09af54e68 100644 --- a/ext/scheduler/airflow/dag/expected_dag.py +++ b/ext/scheduler/airflow/dag/expected_dag.py @@ -50,7 +50,10 @@ "on_failure_callback": operator_failure_event, } -# This job collects the billing information related to infrastructure +""" +This job collects the billing information related to infrastructure. +This job will run in a weekly basis. +""" dag = DAG( dag_id="infra.billing.weekly-status-reports", default_args=default_args,