From 2c4165323a9b0e674389af78fe9f65279f4d0fb9 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Thu, 24 Oct 2024 09:23:31 -0700 Subject: [PATCH] Add error handling in various to_json() funcs. Break JSON write out from pickle write so that we maintain writing pickles if JSON fails --- .../runstate/dynamodb_state_store_test.py | 1 - tron/actioncommand.py | 22 +++-- tron/core/action.py | 61 ++++++++------ tron/core/actionrun.py | 82 +++++++++++-------- tron/core/job.py | 8 +- tron/core/jobrun.py | 31 ++++--- .../runstate/dynamodb_state_store.py | 32 ++++---- tron/utils/persistable.py | 3 +- 8 files changed, 142 insertions(+), 98 deletions(-) diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index 5a54e4cfe..a6d282455 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -125,7 +125,6 @@ def large_object(): } -# TODO: Add better test for to_json? @pytest.mark.usefixtures("store", "small_object", "large_object") class TestDynamoDBStateStore: def test_save(self, store, small_object, large_object): diff --git a/tron/actioncommand.py b/tron/actioncommand.py index 7a49fc1cf..cea3cfb85 100644 --- a/tron/actioncommand.py +++ b/tron/actioncommand.py @@ -3,6 +3,7 @@ import os from io import StringIO from shlex import quote +from typing import Optional from tron.config import schema from tron.serialize import filehandler @@ -203,13 +204,20 @@ def __ne__(self, other): return not self == other @staticmethod - def to_json(state_data: dict) -> str: - return json.dumps( - { - "status_path": state_data["status_path"], - "exec_path": state_data["exec_path"], - } - ) + def to_json(state_data: dict) -> Optional[str]: + try: + return json.dumps( + { + "status_path": state_data["status_path"], + "exec_path": state_data["exec_path"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing SubprocessActionRunnerFactory to JSON: {e}") + return None def create_action_runner_factory_from_config(config): diff --git a/tron/core/action.py b/tron/core/action.py index 67ec7ad62..6640c64ca 100644 --- a/tron/core/action.py +++ b/tron/core/action.py @@ -53,7 +53,7 @@ def copy(self): return ActionCommandConfig(**self.state_data) @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the ActionCommandConfig instance to a JSON string.""" def serialize_namedtuple(obj): @@ -61,31 +61,40 @@ def serialize_namedtuple(obj): return obj._asdict() return obj - return json.dumps( - { - "command": state_data["command"], - "cpus": state_data["cpus"], - "mem": state_data["mem"], - "disk": state_data["disk"], - "cap_add": state_data["cap_add"], - "cap_drop": state_data["cap_drop"], - "constraints": list(state_data["constraints"]), - "docker_image": state_data["docker_image"], - "docker_parameters": list(state_data["docker_parameters"]), - "env": state_data["env"], - "secret_env": state_data["secret_env"], - "secret_volumes": [serialize_namedtuple(volume) for volume in state_data["secret_volumes"]], - "projected_sa_volumes": [serialize_namedtuple(volume) for volume in state_data["projected_sa_volumes"]], - "field_selector_env": state_data["field_selector_env"], - "extra_volumes": list(state_data["extra_volumes"]), - "node_selectors": state_data["node_selectors"], - "node_affinities": [serialize_namedtuple(affinity) for affinity in state_data["node_affinities"]], - "labels": state_data["labels"], - "annotations": state_data["annotations"], - "service_account_name": state_data["service_account_name"], - "ports": state_data["ports"], - } - ) + try: + return json.dumps( + { + "command": state_data["command"], + "cpus": state_data["cpus"], + "mem": state_data["mem"], + "disk": state_data["disk"], + "cap_add": state_data["cap_add"], + "cap_drop": state_data["cap_drop"], + "constraints": list(state_data["constraints"]), + "docker_image": state_data["docker_image"], + "docker_parameters": list(state_data["docker_parameters"]), + "env": state_data["env"], + "secret_env": state_data["secret_env"], + "secret_volumes": [serialize_namedtuple(volume) for volume in state_data["secret_volumes"]], + "projected_sa_volumes": [ + serialize_namedtuple(volume) for volume in state_data["projected_sa_volumes"] + ], + "field_selector_env": state_data["field_selector_env"], + "extra_volumes": list(state_data["extra_volumes"]), + "node_selectors": state_data["node_selectors"], + "node_affinities": [serialize_namedtuple(affinity) for affinity in state_data["node_affinities"]], + "labels": state_data["labels"], + "annotations": state_data["annotations"], + "service_account_name": state_data["service_account_name"], + "ports": state_data["ports"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing ActionCommandConfig to JSON: {e}") + return None @dataclass diff --git a/tron/core/actionrun.py b/tron/core/actionrun.py index f3c06da5a..5fd9b926f 100644 --- a/tron/core/actionrun.py +++ b/tron/core/actionrun.py @@ -175,19 +175,26 @@ def state_data(self): return state_data @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the ActionRunAttempt instance to a JSON string.""" - return json.dumps( - { - "command_config": ActionCommandConfig.to_json(state_data["command_config"]), - "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, - "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, - "rendered_command": state_data["rendered_command"], - "exit_status": state_data["exit_status"], - "mesos_task_id": state_data["mesos_task_id"], - "kubernetes_task_id": state_data["kubernetes_task_id"], - } - ) + try: + return json.dumps( + { + "command_config": ActionCommandConfig.to_json(state_data["command_config"]), + "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, + "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, + "rendered_command": state_data["rendered_command"], + "exit_status": state_data["exit_status"], + "mesos_task_id": state_data["mesos_task_id"], + "kubernetes_task_id": state_data["kubernetes_task_id"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing ActionRunAttempt to JSON: {e}") + return None @classmethod def from_state(cls, state_data): @@ -731,7 +738,7 @@ def state_data(self): } @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the ActionRun instance to a JSON string.""" action_runner = state_data.get("action_runner") if action_runner is None: @@ -739,27 +746,34 @@ def to_json(state_data: dict) -> str: else: action_runner_json = SubprocessActionRunnerFactory.to_json(action_runner) - return json.dumps( - { - "job_run_id": state_data["job_run_id"], - "action_name": state_data["action_name"], - "state": state_data["state"], - "original_command": state_data["original_command"], - "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, - "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, - "node_name": state_data["node_name"], - "exit_status": state_data["exit_status"], - "attempts": [ActionRunAttempt.to_json(attempt) for attempt in state_data["attempts"]], - "retries_remaining": state_data["retries_remaining"], - "retries_delay": state_data["retries_delay"], - "action_runner": action_runner_json, - "executor": state_data["executor"], - "trigger_downstreams": state_data["trigger_downstreams"], - "triggered_by": state_data["triggered_by"], - "on_upstream_rerun": state_data["on_upstream_rerun"], - "trigger_timeout_timestamp": state_data["trigger_timeout_timestamp"], - } - ) + try: + return json.dumps( + { + "job_run_id": state_data["job_run_id"], + "action_name": state_data["action_name"], + "state": state_data["state"], + "original_command": state_data["original_command"], + "start_time": state_data["start_time"].isoformat() if state_data["start_time"] else None, + "end_time": state_data["end_time"].isoformat() if state_data["end_time"] else None, + "node_name": state_data["node_name"], + "exit_status": state_data["exit_status"], + "attempts": [ActionRunAttempt.to_json(attempt) for attempt in state_data["attempts"]], + "retries_remaining": state_data["retries_remaining"], + "retries_delay": state_data["retries_delay"], + "action_runner": action_runner_json, + "executor": state_data["executor"], + "trigger_downstreams": state_data["trigger_downstreams"], + "triggered_by": state_data["triggered_by"], + "on_upstream_rerun": state_data["on_upstream_rerun"], + "trigger_timeout_timestamp": state_data["trigger_timeout_timestamp"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing ActionRun to JSON: {e}") + return None def render_template(self, template): """Render our configured command using the command context.""" diff --git a/tron/core/job.py b/tron/core/job.py index 93cb31226..d2a28e951 100644 --- a/tron/core/job.py +++ b/tron/core/job.py @@ -123,9 +123,13 @@ def __init__( log.info(f"{self} created") @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the Job instance to a JSON string.""" - return json.dumps(state_data) + try: + return json.dumps(state_data) + except Exception as e: + log.error(f"Error serializing Job to JSON: {e}") + return None @classmethod def from_config( diff --git a/tron/core/jobrun.py b/tron/core/jobrun.py index 4d3306d19..f3479b498 100644 --- a/tron/core/jobrun.py +++ b/tron/core/jobrun.py @@ -81,19 +81,26 @@ def __init__( self.context = command_context.build_context(self, base_context) @staticmethod - def to_json(state_data: dict) -> str: + def to_json(state_data: dict) -> Optional[str]: """Serialize the JobRun instance to a JSON string.""" - return json.dumps( - { - "job_name": state_data["job_name"], - "run_num": state_data["run_num"], - "run_time": state_data["run_time"].isoformat() if state_data["run_time"] else None, - "node_name": state_data["node_name"], - "runs": [ActionRun.to_json(run) for run in state_data["runs"]], - "cleanup_run": ActionRun.to_json(state_data["cleanup_run"]) if state_data["cleanup_run"] else None, - "manual": state_data["manual"], - } - ) + try: + return json.dumps( + { + "job_name": state_data["job_name"], + "run_num": state_data["run_num"], + "run_time": state_data["run_time"].isoformat() if state_data["run_time"] else None, + "node_name": state_data["node_name"], + "runs": [ActionRun.to_json(run) for run in state_data["runs"]], + "cleanup_run": ActionRun.to_json(state_data["cleanup_run"]) if state_data["cleanup_run"] else None, + "manual": state_data["manual"], + } + ) + except KeyError as e: + log.error(f"Missing key in state_data: {e}") + return None + except Exception as e: + log.error(f"Error serializing JobRun to JSON: {e}") + return None @property def id(self): diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 4244d4769..3d297611d 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -13,6 +13,7 @@ from typing import Dict from typing import List from typing import Literal +from typing import Optional from typing import Sequence from typing import Tuple from typing import TypeVar @@ -161,11 +162,12 @@ def save(self, key_value_pairs) -> None: self.save_queue[key] = (val, None) else: state_type = self.get_type_from_key(key) - json_val = self._serialize_item(state_type, val) - self.save_queue[key] = ( - val, - json_val, - ) + try: + json_val = self._serialize_item(state_type, val) + except Exception as e: + log.error(f"Failed to serialize JSON for key {key}: {e}") + json_val = None # Proceed without JSON if serialization fails + self.save_queue[key] = (val, json_val) break def _consume_save_queue(self): @@ -201,7 +203,7 @@ def get_type_from_key(self, key: str) -> str: return key.split()[0] # TODO: TRON-2305 - In an ideal world, we wouldn't be passing around state/state_data dicts. It would be a lot nicer to have regular objects here - def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: Dict[str, Any]) -> str: # type: ignore + def _serialize_item(self, key: Literal[runstate.JOB_STATE, runstate.JOB_RUN_STATE], state: Dict[str, Any]) -> Optional[str]: # type: ignore if key == runstate.JOB_STATE: return Job.to_json(state) elif key == runstate.JOB_RUN_STATE: @@ -239,11 +241,9 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: pickled_val, json_val = value num_partitions = math.ceil(len(pickled_val) / OBJECT_SIZE) - num_json_val_partitions = math.ceil(len(json_val) / OBJECT_SIZE) + num_json_val_partitions = math.ceil(len(json_val) / OBJECT_SIZE) if json_val else 0 items = [] - # Use the maximum number of partitions (JSON can be larger - # than pickled value so this makes sure we save the entire item) max_partitions = max(num_partitions, num_json_val_partitions) for index in range(max_partitions): item = { @@ -263,17 +263,19 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: "num_partitions": { "N": str(num_partitions), }, - "json_val": { - "S": json_val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(json_val))] - }, - "num_json_val_partitions": { - "N": str(num_json_val_partitions), - }, }, "TableName": self.name, }, } + if json_val: + item["Put"]["Item"]["json_val"] = { + "S": json_val[index * OBJECT_SIZE : min(index * OBJECT_SIZE + OBJECT_SIZE, len(json_val))] + } + item["Put"]["Item"]["num_json_val_partitions"] = { + "N": str(num_json_val_partitions), + } + count = 0 items.append(item) diff --git a/tron/utils/persistable.py b/tron/utils/persistable.py index 04c7437cc..620956a2a 100644 --- a/tron/utils/persistable.py +++ b/tron/utils/persistable.py @@ -2,10 +2,11 @@ from abc import abstractmethod from typing import Any from typing import Dict +from typing import Optional class Persistable(ABC): @staticmethod @abstractmethod - def to_json(state_data: Dict[Any, Any]) -> str: + def to_json(state_data: Dict[Any, Any]) -> Optional[str]: pass