-
Notifications
You must be signed in to change notification settings - Fork 7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Modify compound ingest operation to wait for table build completion #970
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
__version__ = "3.10.0" | ||
__version__ = "3.11.0" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,9 @@ | ||
from gemd.enumeration.base_enumeration import BaseEnumeration | ||
from logging import getLogger | ||
from time import time, sleep | ||
from typing import Union | ||
from uuid import UUID | ||
from warnings import warn | ||
|
||
from citrine._rest.resource import Resource | ||
from citrine._serialization.properties import Set as PropertySet, String, Object | ||
|
@@ -23,6 +25,16 @@ class JobSubmissionResponse(Resource['JobSubmissionResponse']): | |
""":UUID: job id of the job submission request""" | ||
|
||
|
||
class JobStatus(BaseEnumeration): | ||
"""The valid status codes for a job.""" | ||
|
||
SUBMITTED = "Submitted" | ||
PENDING = "Pending" | ||
RUNNING = "Running" | ||
SUCCESS = "Success" | ||
FAILURE = "Failure" | ||
|
||
|
||
class TaskNode(Resource['TaskNode']): | ||
"""Individual task status. | ||
|
||
|
@@ -33,14 +45,29 @@ class TaskNode(Resource['TaskNode']): | |
""":str: unique identification number for the job task""" | ||
task_type = properties.String("task_type") | ||
""":str: the type of task running""" | ||
status = properties.String("status") | ||
""":str: The last reported status of this particular task. | ||
One of "Submitted", "Pending", "Running", "Success", or "Failure".""" | ||
_status = properties.String("status") | ||
dependencies = PropertySet(String(), "dependencies") | ||
""":Set[str]: all the tasks that this task is dependent on""" | ||
failure_reason = properties.Optional(String(), "failure_reason") | ||
""":str: if a task has failed, the failure reason will be in this parameter""" | ||
|
||
@property | ||
def status(self) -> Union[JobStatus, str]: | ||
"""The last reported status of this particular task.""" | ||
if resolved := JobStatus.from_str(self._status, exception=False): | ||
return resolved | ||
else: | ||
return self._status | ||
|
||
@status.setter | ||
def status(self, value: Union[JobStatus, str]) -> None: | ||
if JobStatus.from_str(value, exception=False) is None: | ||
warn( | ||
f"{value} is not a recognized JobStatus; this will become an error as of v4.0.0.", | ||
DeprecationWarning | ||
) | ||
self._status = value | ||
|
||
|
||
class JobStatusResponse(Resource['JobStatusResponse']): | ||
"""A response to a job status check. | ||
|
@@ -50,13 +77,37 @@ class JobStatusResponse(Resource['JobStatusResponse']): | |
|
||
job_type = properties.String("job_type") | ||
""":str: the type of job for this status report""" | ||
status = properties.String("status") | ||
_status = properties.String("status") | ||
""":str: The status of the job. One of "Running", "Success", or "Failure".""" | ||
tasks = properties.List(Object(TaskNode), "tasks") | ||
""":List[TaskNode]: all of the constituent task required to complete this job""" | ||
output = properties.Optional(properties.Mapping(String, String), 'output') | ||
""":Optional[dict[str, str]]: job output properties and results""" | ||
|
||
@property | ||
def status(self) -> Union[JobStatus, str]: | ||
"""The last reported status of this particular task.""" | ||
if resolved := JobStatus.from_str(self._status, exception=False): | ||
return resolved | ||
else: | ||
return self._status | ||
|
||
@status.setter | ||
def status(self, value: Union[JobStatus, str]) -> None: | ||
if resolved := JobStatus.from_str(value, exception=False): | ||
if resolved not in [JobStatus.RUNNING, JobStatus.SUCCESS, JobStatus.FAILURE]: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't this support any valid There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see. That's strange, that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's what's documented. This ought to just be a read only object, so maybe we don't care about the fact that the far end will only ever return a subset. |
||
warn( | ||
f"{value} is not a valid JobStatus for a JobStatusResponse; " | ||
f"this will become an error as of v4.0.0.", | ||
DeprecationWarning | ||
) | ||
else: | ||
warn( | ||
f"{value} is not a recognized JobStatus; this will become an error as of v4.0.0.", | ||
DeprecationWarning | ||
) | ||
self._status = value | ||
|
||
|
||
def _poll_for_job_completion(session: Session, | ||
job: Union[JobSubmissionResponse, UUID, str], | ||
|
@@ -102,7 +153,7 @@ def _poll_for_job_completion(session: Session, | |
while True: | ||
response = session.get_resource(path=path, params=params) | ||
status: JobStatusResponse = JobStatusResponse.build(response) | ||
if status.status in ['Success', 'Failure']: | ||
if status.status in [JobStatus.SUCCESS, JobStatus.FAILURE]: | ||
break | ||
elif time() - start_time < timeout: | ||
logger.info( | ||
|
@@ -115,12 +166,12 @@ def _poll_for_job_completion(session: Session, | |
f'Note job on server is unaffected by this timeout.') | ||
logger.debug('Last status: {}'.format(status.dump())) | ||
raise PollingTimeoutError('Job {} timed out.'.format(job_id)) | ||
if status.status == 'Failure': | ||
if status.status == JobStatus.FAILURE: | ||
logger.debug(f'Job terminated with Failure status: {status.dump()}') | ||
if raise_errors: | ||
failure_reasons = [] | ||
for task in status.tasks: | ||
if task.status == 'Failure': | ||
if task.status == JobStatus.FAILURE: | ||
logger.error(f'Task {task.id} failed with reason "{task.failure_reason}"') | ||
failure_reasons.append(task.failure_reason) | ||
raise JobFailureError( | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
from citrine.jobs.job import JobStatus, JobStatusResponse, TaskNode | ||
import pytest | ||
import warnings | ||
|
||
from tests.utils.factories import TaskNodeDataFactory, JobStatusResponseDataFactory | ||
|
||
def test_status_response_status(): | ||
status_response = JobStatusResponse.build(JobStatusResponseDataFactory(failure=True)) | ||
assert status_response.status == JobStatus.FAILURE | ||
|
||
with pytest.deprecated_call(): | ||
status_response.status = 'Failed' | ||
with warnings.catch_warnings(): | ||
warnings.simplefilter("error") | ||
assert not isinstance(status_response.status, JobStatus) | ||
|
||
with pytest.deprecated_call(): | ||
status_response.status = JobStatus.PENDING | ||
with warnings.catch_warnings(): | ||
warnings.simplefilter("error") | ||
assert status_response.status == JobStatus.PENDING | ||
|
||
with warnings.catch_warnings(): | ||
warnings.simplefilter("error") | ||
status_response.status = JobStatus.SUCCESS | ||
assert status_response.status == JobStatus.SUCCESS | ||
|
||
def test_task_node_status(): | ||
status_response = TaskNode.build(TaskNodeDataFactory(failure=True)) | ||
assert status_response.status == JobStatus.FAILURE | ||
|
||
with pytest.deprecated_call(): | ||
status_response.status = 'Failed' | ||
assert not isinstance(status_response.status, JobStatus) | ||
|
||
with warnings.catch_warnings(): | ||
warnings.simplefilter("error") | ||
status_response.status = JobStatus.SUCCESS | ||
assert status_response.status == JobStatus.SUCCESS |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since
BaseEnumeration
hasstr
as a superclass, returning aJobStatus
would not lead to a change in behavior, so this is not an API break.