From e382156279f9bcf01f499a814a2512f511c8f7d5 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 15 Jan 2025 08:54:23 -0800 Subject: [PATCH 1/8] Add dynamodb retry config for throttling and other errors. Add exponential backoff and jitter for unprocessed keys. Fix edge case where we succesfully process keys on our last attempt but still fail --- .../runstate/dynamodb_state_store.py | 85 ++++++++++++------- 1 file changed, 55 insertions(+), 30 deletions(-) diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 35ea52da3..6fbac8d10 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -4,6 +4,7 @@ import math import os import pickle +import random import threading import time from collections import defaultdict @@ -20,6 +21,7 @@ from typing import TypeVar import boto3 # type: ignore +from botocore.config import Config import tron.prom_metrics as prom_metrics from tron.core.job import Job @@ -35,7 +37,10 @@ # to contain other attributes like object name and number of partitions. OBJECT_SIZE = 200_000 # TODO: TRON-2240 - consider swapping back to 400_000 now that we've removed pickles MAX_SAVE_QUEUE = 500 -MAX_ATTEMPTS = 10 +# This is distinct from the number of retries in the retry_config as this is used for handling unprocessed +# keys outside the bounds of something like retrying on a ThrottlingException. We need this limit to avoid +# infinite loops in the case where a key is truly unprocessable. +MAX_UNPROCESSED_KEYS_RETRIES = 10 MAX_TRANSACT_WRITE_ITEMS = 100 log = logging.getLogger(__name__) T = TypeVar("T") @@ -43,8 +48,22 @@ class DynamoDBStateStore: def __init__(self, name, dynamodb_region, stopping=False) -> None: - self.dynamodb = boto3.resource("dynamodb", region_name=dynamodb_region) - self.client = boto3.client("dynamodb", region_name=dynamodb_region) + # Standard mode includes an exponential backoff by a base factor of 2 for a + # maximum backoff time of 20 seconds (min(b*r^i, MAX_BACKOFF) where b is a + # random number between 0 and 1 and r is the base factor of 2). This might + # look like: + # + # seconds_to_sleep = min(1 × 2^1, 20) = min(2, 20) = 2 seconds + # + # By our 5th retry (2^5 is 32) we will be sleeping *up to* 20 seconds, depending + # on the random jitter. + # + # It handles transient errors like RequestTimeout and ConnectionError, as well + # as Service-side errors like Throttling, SlowDown, and LimitExceeded. + retry_config = Config(retries={"max_attempts": 5, "mode": "standard"}) + + self.dynamodb = boto3.resource("dynamodb", region_name=dynamodb_region, config=retry_config) + self.client = boto3.client("dynamodb", region_name=dynamodb_region, config=retry_config) self.name = name self.dynamodb_region = dynamodb_region self.table = self.dynamodb.Table(name) @@ -63,11 +82,11 @@ def build_key(self, type, iden) -> str: def restore(self, keys, read_json: bool = False) -> dict: """ - Fetch all under the same parition key(s). + Fetch all under the same partition key(s). ret: """ # format of the keys always passed here is - # job_state job_name --> high level info about the job: enabled, run_nums + # job_state job_name --> high level info about the job: enabled, run_nums # job_run_state job_run_name --> high level info about the job run first_items = self._get_first_partitions(keys) remaining_items = self._get_remaining_partitions(first_items, read_json) @@ -87,8 +106,11 @@ def _get_items(self, table_keys: list) -> object: items = [] # let's avoid potentially mutating our input :) cand_keys_list = copy.copy(table_keys) - attempts_to_retrieve_keys = 0 - while len(cand_keys_list) != 0: + attempts = 0 + base_delay = 0.5 + max_delay = 10 + + while len(cand_keys_list) != 0 and attempts < MAX_UNPROCESSED_KEYS_RETRIES: with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: responses = [ executor.submit( @@ -106,20 +128,33 @@ def _get_items(self, table_keys: list) -> object: cand_keys_list = [] for resp in concurrent.futures.as_completed(responses): try: - items.extend(resp.result()["Responses"][self.name]) - # add any potential unprocessed keys to the thread pool - if resp.result()["UnprocessedKeys"].get(self.name) and attempts_to_retrieve_keys < MAX_ATTEMPTS: - cand_keys_list.extend(resp.result()["UnprocessedKeys"][self.name]["Keys"]) - elif attempts_to_retrieve_keys >= MAX_ATTEMPTS: - failed_keys = resp.result()["UnprocessedKeys"][self.name]["Keys"] - error = Exception( - f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{failed_keys}\n from dynamodb\n{resp.result()}" - ) - raise error + result = resp.result() + items.extend(result.get("Responses", {}).get(self.name, [])) + + # If DynamoDB returns unprocessed keys, we need to collect them and retry + unprocessed_keys = result.get("UnprocessedKeys", {}).get(self.name, {}).get("Keys", []) + if unprocessed_keys: + cand_keys_list.extend(unprocessed_keys) except Exception as e: log.exception("Encountered issues retrieving data from DynamoDB") raise e - attempts_to_retrieve_keys += 1 + if cand_keys_list: + attempts += 1 + # Exponential backoff for retrying unprocessed keys + exponential_delay = min(base_delay * (2 ** (attempts - 1)), max_delay) + # Full jitter (i.e. from 0 to exponential_delay) will help minimize the number and length of calls + jitter = random.uniform(0, exponential_delay) + delay = jitter + log.warning( + f"Attempt {attempts}/{MAX_UNPROCESSED_KEYS_RETRIES} - Retrying {len(cand_keys_list)} unprocessed keys after {delay:.2f}s delay." + ) + time.sleep(delay) + if cand_keys_list: + error = Exception( + f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{cand_keys_list}\n from dynamodb after {MAX_UNPROCESSED_KEYS_RETRIES} retries." + ) + log.error(repr(error)) + raise error return items def _get_first_partitions(self, keys: list): @@ -337,25 +372,15 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: "N": str(num_json_val_partitions), } - count = 0 items.append(item) while len(items) == MAX_TRANSACT_WRITE_ITEMS or index == max_partitions - 1: try: self.client.transact_write_items(TransactItems=items) items = [] - break # exit the while loop on successful writing except Exception as e: - count += 1 - if count > 3: - timer( - name="tron.dynamodb.setitem", - delta=time.time() - start, - ) - log.error(f"Failed to save partition for key: {key}, error: {repr(e)}") - raise e - else: - log.warning(f"Got error while saving {key}, trying again: {repr(e)}") + log.error(f"Failed to save partition for key: {key}, error: {repr(e)}") + raise e timer( name="tron.dynamodb.setitem", delta=time.time() - start, From 3e74d75288ebc38d1253e814ce622a3f11f89287 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Mon, 20 Jan 2025 07:41:30 -0800 Subject: [PATCH 2/8] Ignore botocore --- tron/serialize/runstate/dynamodb_state_store.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 6fbac8d10..929a7249b 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -21,7 +21,7 @@ from typing import TypeVar import boto3 # type: ignore -from botocore.config import Config +from botocore.config import Config # type: ignore import tron.prom_metrics as prom_metrics from tron.core.job import Job From f88d7f9e7746c66989b834d0ce814bb4b3205b75 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Mon, 20 Jan 2025 08:24:27 -0800 Subject: [PATCH 3/8] Fix setitem loop now that we use Dynamo retry config. Update number for retries in test assertion --- tests/serialize/runstate/dynamodb_state_store_test.py | 2 +- tron/serialize/runstate/dynamodb_state_store.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index 687b331bd..d2cbd3475 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -345,7 +345,7 @@ def test_retry_reading(self, store, small_object, large_object): ): store.restore(keys) except Exception: - assert_equal(mock_failed_read.call_count, 11) + assert_equal(mock_failed_read.call_count, 10) def test_restore_exception_propagation(self, store, small_object): # This test is to ensure that restore propagates exceptions upwards: see DAR-2328 diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 929a7249b..6d69afe08 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -374,7 +374,9 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: items.append(item) - while len(items) == MAX_TRANSACT_WRITE_ITEMS or index == max_partitions - 1: + # We want to write the items when we've either reached the max number of items + # for a transaction, or when we're done processing all partitions + if len(items) == MAX_TRANSACT_WRITE_ITEMS or index == max_partitions - 1: try: self.client.transact_write_items(TransactItems=items) items = [] From 6902ab0cf5ca798085c3631c1089ef392afe0260 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Mon, 20 Jan 2025 08:44:21 -0800 Subject: [PATCH 4/8] Add timer back to exception block to capture failures --- tron/serialize/runstate/dynamodb_state_store.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 6d69afe08..fd0234346 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -381,6 +381,10 @@ def __setitem__(self, key: str, value: Tuple[bytes, str]) -> None: self.client.transact_write_items(TransactItems=items) items = [] except Exception as e: + timer( + name="tron.dynamodb.setitem", + delta=time.time() - start, + ) log.error(f"Failed to save partition for key: {key}, error: {repr(e)}") raise e timer( From b4e423d177fe7d0a4d6628d262349cdc9ee1ee4c Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Mon, 20 Jan 2025 11:13:10 -0800 Subject: [PATCH 5/8] Add unit of measurement to base_delay and max_delay. Expand the retry_reading test --- .../runstate/dynamodb_state_store_test.py | 58 ++++++++++++------- .../runstate/dynamodb_state_store.py | 6 +- 2 files changed, 39 insertions(+), 25 deletions(-) diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index d2cbd3475..e727eb98d 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -8,7 +8,9 @@ from moto.dynamodb2.responses import dynamo_json_dump from testifycompat import assert_equal +from testifycompat.assertions import assert_in from tron.serialize.runstate.dynamodb_state_store import DynamoDBStateStore +from tron.serialize.runstate.dynamodb_state_store import MAX_UNPROCESSED_KEYS_RETRIES def mock_transact_write_items(self): @@ -294,7 +296,8 @@ def test_delete_item_with_json_partitions(self, store, small_object, large_objec vals = store.restore([key]) assert key not in vals - def test_retry_saving(self, store, small_object, large_object): + @mock.patch("time.sleep", return_value=None) + def test_retry_saving(self, mock_sleep, store, small_object, large_object): with mock.patch( "moto.dynamodb2.responses.DynamoHandler.transact_write_items", side_effect=KeyError("foo"), @@ -307,45 +310,56 @@ def test_retry_saving(self, store, small_object, large_object): except Exception: assert_equal(mock_failed_write.call_count, 3) - def test_retry_reading(self, store, small_object, large_object): + @mock.patch("time.sleep") + @mock.patch("random.uniform") + def test_retry_reading(self, mock_random_uniform, mock_sleep, store, small_object, large_object): unprocessed_value = { - "Responses": { - store.name: [ - { - "index": {"N": "0"}, - "key": {"S": "job_state 0"}, - }, - ], - }, + "Responses": {}, "UnprocessedKeys": { store.name: { - "ConsistentRead": True, "Keys": [ { - "index": {"N": "0"}, "key": {"S": "job_state 0"}, + "index": {"N": "0"}, } ], - }, + "ConsistentRead": True, + } }, - "ResponseMetadata": {}, } keys = [store.build_key("job_state", i) for i in range(1)] value = small_object - pairs = zip(keys, (value for i in range(len(keys)))) + pairs = zip(keys, [value] * len(keys)) store.save(pairs) + store._consume_save_queue() + + # Mock random.uniform to return the upper limit of the range so that we are simulating max jitter + def side_effect_random_uniform(a, b): + return b + + mock_random_uniform.side_effect = side_effect_random_uniform + with mock.patch.object( store.client, "batch_get_item", return_value=unprocessed_value, ) as mock_failed_read: - try: - with mock.patch("tron.config.static_config.load_yaml_file", autospec=True), mock.patch( - "tron.config.static_config.build_configuration_watcher", autospec=True - ): - store.restore(keys) - except Exception: - assert_equal(mock_failed_read.call_count, 10) + with pytest.raises(Exception) as exec_info, mock.patch( + "tron.config.static_config.load_yaml_file", autospec=True + ), mock.patch("tron.config.static_config.build_configuration_watcher", autospec=True): + store.restore(keys) + assert_in("failed to retrieve items with keys", str(exec_info.value)) + assert_equal(mock_failed_read.call_count, MAX_UNPROCESSED_KEYS_RETRIES) + + # We also need to verify that sleep was called with expected delays + expected_delays = [] + base_delay_seconds = 0.5 + max_delay_seconds = 10 + for attempt in range(1, MAX_UNPROCESSED_KEYS_RETRIES + 1): + expected_delay = min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds) + expected_delays.append(expected_delay) + actual_delays = [call.args[0] for call in mock_sleep.call_args_list] + assert_equal(actual_delays, expected_delays) def test_restore_exception_propagation(self, store, small_object): # This test is to ensure that restore propagates exceptions upwards: see DAR-2328 diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index fd0234346..2eec708b2 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -107,8 +107,8 @@ def _get_items(self, table_keys: list) -> object: # let's avoid potentially mutating our input :) cand_keys_list = copy.copy(table_keys) attempts = 0 - base_delay = 0.5 - max_delay = 10 + base_delay_seconds = 0.5 + max_delay_seconds = 10 while len(cand_keys_list) != 0 and attempts < MAX_UNPROCESSED_KEYS_RETRIES: with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: @@ -141,7 +141,7 @@ def _get_items(self, table_keys: list) -> object: if cand_keys_list: attempts += 1 # Exponential backoff for retrying unprocessed keys - exponential_delay = min(base_delay * (2 ** (attempts - 1)), max_delay) + exponential_delay = min(base_delay_seconds * (2 ** (attempts - 1)), max_delay_seconds) # Full jitter (i.e. from 0 to exponential_delay) will help minimize the number and length of calls jitter = random.uniform(0, exponential_delay) delay = jitter From b0e890b7f02c62789cfd997d46068b9a31f4c8b2 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 22 Jan 2025 09:03:45 -0800 Subject: [PATCH 6/8] Move backoff logic for UnprocessedKeys to new function. Remove jitter for UnprocessedKeys retry --- .../serialize/runstate/dynamodb_state_store.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index 2eec708b2..bfc9f8592 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -4,7 +4,6 @@ import math import os import pickle -import random import threading import time from collections import defaultdict @@ -102,13 +101,17 @@ def chunk_keys(self, keys: Sequence[T]) -> List[Sequence[T]]: cand_keys_chunks.append(keys[i : min(len(keys), i + 100)]) return cand_keys_chunks + def _calculate_backoff_delay(self, attempt: int) -> float: + base_delay_seconds = 0.5 + max_delay_seconds = 10 + delay = min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds) + return delay + def _get_items(self, table_keys: list) -> object: items = [] # let's avoid potentially mutating our input :) cand_keys_list = copy.copy(table_keys) attempts = 0 - base_delay_seconds = 0.5 - max_delay_seconds = 10 while len(cand_keys_list) != 0 and attempts < MAX_UNPROCESSED_KEYS_RETRIES: with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: @@ -140,13 +143,10 @@ def _get_items(self, table_keys: list) -> object: raise e if cand_keys_list: attempts += 1 - # Exponential backoff for retrying unprocessed keys - exponential_delay = min(base_delay_seconds * (2 ** (attempts - 1)), max_delay_seconds) - # Full jitter (i.e. from 0 to exponential_delay) will help minimize the number and length of calls - jitter = random.uniform(0, exponential_delay) - delay = jitter + delay = self._calculate_backoff_delay(attempts) log.warning( - f"Attempt {attempts}/{MAX_UNPROCESSED_KEYS_RETRIES} - Retrying {len(cand_keys_list)} unprocessed keys after {delay:.2f}s delay." + f"Attempt {attempts}/{MAX_UNPROCESSED_KEYS_RETRIES} - " + f"Retrying {len(cand_keys_list)} unprocessed keys after {delay:.2f}s delay." ) time.sleep(delay) if cand_keys_list: From de2e2f93faf753db7088ccc909d1f5d6604516e1 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Wed, 22 Jan 2025 09:06:14 -0800 Subject: [PATCH 7/8] Add test for backoff, Update test_retry_saving to actually test assertions and requeue of failed items. Remove backoff from test_retry_reading --- .../runstate/dynamodb_state_store_test.py | 111 ++++++++++-------- 1 file changed, 60 insertions(+), 51 deletions(-) diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index e727eb98d..9ef083659 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -8,7 +8,6 @@ from moto.dynamodb2.responses import dynamo_json_dump from testifycompat import assert_equal -from testifycompat.assertions import assert_in from tron.serialize.runstate.dynamodb_state_store import DynamoDBStateStore from tron.serialize.runstate.dynamodb_state_store import MAX_UNPROCESSED_KEYS_RETRIES @@ -296,70 +295,80 @@ def test_delete_item_with_json_partitions(self, store, small_object, large_objec vals = store.restore([key]) assert key not in vals - @mock.patch("time.sleep", return_value=None) - def test_retry_saving(self, mock_sleep, store, small_object, large_object): - with mock.patch( - "moto.dynamodb2.responses.DynamoHandler.transact_write_items", - side_effect=KeyError("foo"), - ) as mock_failed_write: - keys = [store.build_key("job_state", i) for i in range(1)] - value = small_object - pairs = zip(keys, (value for i in range(len(keys)))) - try: - store.save(pairs) - except Exception: - assert_equal(mock_failed_write.call_count, 3) - - @mock.patch("time.sleep") - @mock.patch("random.uniform") - def test_retry_reading(self, mock_random_uniform, mock_sleep, store, small_object, large_object): + @pytest.mark.parametrize( + "test_object, side_effects, expected_save_errors, expected_queue_length", + [ + # All attempts fail + ("small_object", [KeyError("foo")] * 3, 3, 1), + ("large_object", [KeyError("foo")] * 3, 3, 1), + # Failure followed by success + ("small_object", [KeyError("foo"), {}], 0, 0), + ("large_object", [KeyError("foo"), {}], 0, 0), + ], + ) + def test_retry_saving( + self, test_object, side_effects, expected_save_errors, expected_queue_length, store, small_object, large_object + ): + object_mapping = { + "small_object": small_object, + "large_object": large_object, + } + value = object_mapping[test_object] + + with mock.patch.object( + store.client, + "transact_write_items", + side_effect=side_effects, + ) as mock_transact_write: + keys = [store.build_key("job_state", 0)] + pairs = zip(keys, [value]) + store.save(pairs) + + for _ in side_effects: + store._consume_save_queue() + + assert mock_transact_write.call_count == len(side_effects) + assert store.save_errors == expected_save_errors + assert len(store.save_queue) == expected_queue_length + + @pytest.mark.parametrize( + "attempt, expected_delay", + [ + (1, 0.5), + (2, 1.0), + (3, 2.0), + (4, 4.0), + (5, 8.0), + (6, 10.0), + (7, 10.0), + ], + ) + def test_calculate_backoff_delay(self, store, attempt, expected_delay): + delay = store._calculate_backoff_delay(attempt) + assert_equal(delay, expected_delay) + + def test_retry_reading(self, store): unprocessed_value = { "Responses": {}, "UnprocessedKeys": { store.name: { - "Keys": [ - { - "key": {"S": "job_state 0"}, - "index": {"N": "0"}, - } - ], + "Keys": [{"key": {"S": store.build_key("job_state", 0)}, "index": {"N": "0"}}], "ConsistentRead": True, } }, } - keys = [store.build_key("job_state", i) for i in range(1)] - value = small_object - pairs = zip(keys, [value] * len(keys)) - store.save(pairs) - store._consume_save_queue() - - # Mock random.uniform to return the upper limit of the range so that we are simulating max jitter - def side_effect_random_uniform(a, b): - return b - mock_random_uniform.side_effect = side_effect_random_uniform + keys = [store.build_key("job_state", 0)] with mock.patch.object( store.client, "batch_get_item", return_value=unprocessed_value, - ) as mock_failed_read: - with pytest.raises(Exception) as exec_info, mock.patch( - "tron.config.static_config.load_yaml_file", autospec=True - ), mock.patch("tron.config.static_config.build_configuration_watcher", autospec=True): - store.restore(keys) - assert_in("failed to retrieve items with keys", str(exec_info.value)) - assert_equal(mock_failed_read.call_count, MAX_UNPROCESSED_KEYS_RETRIES) - - # We also need to verify that sleep was called with expected delays - expected_delays = [] - base_delay_seconds = 0.5 - max_delay_seconds = 10 - for attempt in range(1, MAX_UNPROCESSED_KEYS_RETRIES + 1): - expected_delay = min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds) - expected_delays.append(expected_delay) - actual_delays = [call.args[0] for call in mock_sleep.call_args_list] - assert_equal(actual_delays, expected_delays) + ) as mock_batch_get_item, mock.patch("time.sleep") as mock_sleep, pytest.raises(Exception) as exec_info: + store.restore(keys) + assert "failed to retrieve items with keys" in str(exec_info.value) + assert mock_batch_get_item.call_count == MAX_UNPROCESSED_KEYS_RETRIES + assert mock_sleep.call_count == MAX_UNPROCESSED_KEYS_RETRIES def test_restore_exception_propagation(self, store, small_object): # This test is to ensure that restore propagates exceptions upwards: see DAR-2328 From be7e063a06a8efc763df7199903778ecff478925 Mon Sep 17 00:00:00 2001 From: Kevin Kaspari Date: Fri, 24 Jan 2025 09:46:00 -0800 Subject: [PATCH 8/8] Catch ClientError and log ResponseMetadata. Make mypy happy about _calculate_backoff_delay --- .../runstate/dynamodb_state_store_test.py | 14 +++++++------- .../serialize/runstate/dynamodb_state_store.py | 18 +++++++++++------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/tests/serialize/runstate/dynamodb_state_store_test.py b/tests/serialize/runstate/dynamodb_state_store_test.py index 9ef083659..e7ad7a1ec 100644 --- a/tests/serialize/runstate/dynamodb_state_store_test.py +++ b/tests/serialize/runstate/dynamodb_state_store_test.py @@ -334,13 +334,13 @@ def test_retry_saving( @pytest.mark.parametrize( "attempt, expected_delay", [ - (1, 0.5), - (2, 1.0), - (3, 2.0), - (4, 4.0), - (5, 8.0), - (6, 10.0), - (7, 10.0), + (1, 1), + (2, 2), + (3, 4), + (4, 8), + (5, 10), + (6, 10), + (7, 10), ], ) def test_calculate_backoff_delay(self, store, attempt, expected_delay): diff --git a/tron/serialize/runstate/dynamodb_state_store.py b/tron/serialize/runstate/dynamodb_state_store.py index bfc9f8592..ffaee7e47 100644 --- a/tron/serialize/runstate/dynamodb_state_store.py +++ b/tron/serialize/runstate/dynamodb_state_store.py @@ -20,6 +20,7 @@ from typing import TypeVar import boto3 # type: ignore +import botocore # type: ignore from botocore.config import Config # type: ignore import tron.prom_metrics as prom_metrics @@ -101,10 +102,10 @@ def chunk_keys(self, keys: Sequence[T]) -> List[Sequence[T]]: cand_keys_chunks.append(keys[i : min(len(keys), i + 100)]) return cand_keys_chunks - def _calculate_backoff_delay(self, attempt: int) -> float: - base_delay_seconds = 0.5 + def _calculate_backoff_delay(self, attempt: int) -> int: + base_delay_seconds = 1 max_delay_seconds = 10 - delay = min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds) + delay: int = min(base_delay_seconds * (2 ** (attempt - 1)), max_delay_seconds) return delay def _get_items(self, table_keys: list) -> object: @@ -138,22 +139,25 @@ def _get_items(self, table_keys: list) -> object: unprocessed_keys = result.get("UnprocessedKeys", {}).get(self.name, {}).get("Keys", []) if unprocessed_keys: cand_keys_list.extend(unprocessed_keys) - except Exception as e: + except botocore.exceptions.ClientError as e: + log.exception(f"ClientError during batch_get_item: {e.response}") + raise + except Exception: log.exception("Encountered issues retrieving data from DynamoDB") - raise e + raise if cand_keys_list: attempts += 1 delay = self._calculate_backoff_delay(attempts) log.warning( f"Attempt {attempts}/{MAX_UNPROCESSED_KEYS_RETRIES} - " - f"Retrying {len(cand_keys_list)} unprocessed keys after {delay:.2f}s delay." + f"Retrying {len(cand_keys_list)} unprocessed keys after {delay}s delay." ) time.sleep(delay) if cand_keys_list: error = Exception( f"tron_dynamodb_restore_failure: failed to retrieve items with keys \n{cand_keys_list}\n from dynamodb after {MAX_UNPROCESSED_KEYS_RETRIES} retries." ) - log.error(repr(error)) + log.error(error) raise error return items