From d23747861ec5fa8b9ec63bb2e43d73fb32febf2b Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Tue, 27 Feb 2024 12:09:47 +0100
Subject: [PATCH 01/17] sharding poc

---
 .../coordinator/aws_keyspaces_client.py       | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/uptime_service_validation/coordinator/aws_keyspaces_client.py b/uptime_service_validation/coordinator/aws_keyspaces_client.py
index f651576..83bf8e3 100644
--- a/uptime_service_validation/coordinator/aws_keyspaces_client.py
+++ b/uptime_service_validation/coordinator/aws_keyspaces_client.py
@@ -208,6 +208,8 @@ def get_submissions(
             submitted_at_start, submitted_at_end
         )
 
+        shard_condition = "shard in (" + ",".join(map(str, range(1, 60))) + ")"
+
         if len(submitted_at_date_list) == 1:
             submitted_at_date = submitted_at_date_list[0]
         else:
@@ -224,6 +226,10 @@ def get_submissions(
             parameters.append(submitted_at_date)
         elif submitted_at_dates:
            conditions.append(f"submitted_at_date IN ({submitted_at_dates})")
+
+        # Add shard condition here since we have a submitted_at_date or submitted_at_dates
+        conditions.append(shard_condition)
+
         if submitted_at_start:
             start_operator = ">=" if start_inclusive else ">"
             conditions.append(f"submitted_at {start_operator} %s")

From c0a9790d95602ec1e4e296f78e0c660d5c127386 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Tue, 27 Feb 2024 15:32:34 +0100
Subject: [PATCH 02/17] log job start and end

---
 uptime_service_validation/coordinator/server.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/uptime_service_validation/coordinator/server.py b/uptime_service_validation/coordinator/server.py
index 72fdd8c..1410fd0 100644
--- a/uptime_service_validation/coordinator/server.py
+++ b/uptime_service_validation/coordinator/server.py
@@ -245,7 +245,9 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
         # Create the job and configmap in Kubernetes
         try:
             api_batch.create_namespaced_job(namespace, job)
-            logging.info(f"Job {job_name} created in namespace {namespace}")
+            logging.info(
+                f"Job {job_name} created in namespace {namespace}; start: {datetime_formatter(mini_batch[0])}, end: {datetime_formatter(mini_batch[1])}."
+            )
             jobs.append(job_name)
         except Exception as e:
             logging.error(f"Error creating job {job_name}: {e}")
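
Aside (not part of the patch series): the proof-of-concept condition in PATCH 01 hardcodes the shard list, and range(1, 60) yields 1 through 59 only, so shard 0 is never queried; later patches replace this with a computed range. A minimal standalone sketch of what the expression evaluates to:

    # Sketch only: reproduces the PoC expression from PATCH 01.
    # range(1, 60) covers 1..59 inclusive, so shard 0 is missing here.
    shard_condition = "shard in (" + ",".join(map(str, range(1, 60))) + ")"
    print(shard_condition)  # shard in (1,2,3,...,59)
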
From 3894c1b6c26828afef5822d1ecdf3e8c71299ce1 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Tue, 27 Feb 2024 15:32:50 +0100
Subject: [PATCH 03/17] longer numeric score_percent

---
 uptime_service_validation/database/createDB.sql | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/uptime_service_validation/database/createDB.sql b/uptime_service_validation/database/createDB.sql
index a28ca08..599a63b 100644
--- a/uptime_service_validation/database/createDB.sql
+++ b/uptime_service_validation/database/createDB.sql
@@ -40,7 +40,7 @@ CREATE TABLE nodes (
     block_producer_key TEXT,
     updated_at TIMESTAMPTZ(6),
     score INT,
-    score_percent NUMERIC(6,2),
+    score_percent NUMERIC(10,2),
     discord_id TEXT,
     email_id TEXT,
     application_status BOOLEAN
@@ -75,7 +75,7 @@ CREATE TABLE score_history (
     node_id INT,
     score_at TIMESTAMP(6),
     score INT,
-    score_percent NUMERIC(6,2),
+    score_percent NUMERIC(10,2),
     CONSTRAINT fk_nodes
         FOREIGN KEY(node_id)
             REFERENCES nodes(id)

From 27ab3a2872f58f2402197ec60305157f8fb6061c Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Fri, 1 Mar 2024 09:09:27 +0100
Subject: [PATCH 04/17] add ShardCalculator

---
 tests/test_aws_keyspaces_client.py            | 40 +++++++++++++++++++
 .../coordinator/aws_keyspaces_client.py       | 31 +++++++++++++-
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/tests/test_aws_keyspaces_client.py b/tests/test_aws_keyspaces_client.py
index 1a62dd5..16a1cd3 100644
--- a/tests/test_aws_keyspaces_client.py
+++ b/tests/test_aws_keyspaces_client.py
@@ -1,6 +1,7 @@
 from datetime import datetime
 from uptime_service_validation.coordinator.aws_keyspaces_client import (
     AWSKeyspacesClient,
+    ShardCalculator,
 )
 
 
@@ -19,3 +20,42 @@ def test_get_submitted_at_date_list():
     end = datetime(2023, 11, 8, 0, 0, 0)
     result = AWSKeyspacesClient.get_submitted_at_date_list(start, end)
     assert result == ["2023-11-06", "2023-11-07", "2023-11-08"]
+
+
+def test_calculate_shard():
+    matrix = [
+        [0, 0, 0],
+        [0, 1, 0],
+        [0, 59, 9],
+        [1, 3, 10],
+        [10, 0, 100],
+        [22, 23, 223],
+        [44, 45, 447],
+        [59, 54, 599],
+        [59, 59, 599],
+    ]
+    for i in matrix:
+        minute = i[0]
+        second = i[1]
+        expected = i[2]
+        assert ShardCalculator.calculate_shard(minute, second) == expected
+
+
+def test_calculate_shards_in_range():
+    # 1 between days
+    start_time = datetime(2024, 2, 29, 23, 58, 29)
+    end_time = datetime(2024, 3, 1, 0, 3, 29)
+    expected_cql_statement = "shard in (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599)"
+    result_cql_statement = ShardCalculator.calculate_shards_in_range(
+        start_time, end_time
+    )
+    assert result_cql_statement == expected_cql_statement
+
+    # 2 within the same day
+    start_time = datetime(2024, 2, 29, 12, 58, 1)
+    end_time = datetime(2024, 2, 29, 12, 59, 59)
+    expected_cql_statement = "shard in (580,581,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599)"
+    result_cql_statement = ShardCalculator.calculate_shards_in_range(
+        start_time, end_time
+    )
+    assert result_cql_statement == expected_cql_statement
diff --git a/uptime_service_validation/coordinator/aws_keyspaces_client.py b/uptime_service_validation/coordinator/aws_keyspaces_client.py
index 83bf8e3..ddf1c44 100644
--- a/uptime_service_validation/coordinator/aws_keyspaces_client.py
+++ b/uptime_service_validation/coordinator/aws_keyspaces_client.py
@@ -7,7 +7,7 @@
 from cassandra.policies import DCAwareRoundRobinPolicy
 from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_TLS_CLIENT
 from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timedelta
 from typing import Optional, ByteString, List
 import pandas as pd
 
@@ -208,7 +208,9 @@ def get_submissions(
             submitted_at_start, submitted_at_end
         )
 
-        shard_condition = "shard in (" + ",".join(map(str, range(1, 60))) + ")"
+        shard_condition = ShardCalculator.calculate_shards_in_range(
+            submitted_at_start, submitted_at_end
+        )
 
         if len(submitted_at_date_list) == 1:
             submitted_at_date = submitted_at_date_list[0]
         else:
@@ -279,6 +281,31 @@ def close(self):
         self.cluster.shutdown()
 
 
+class ShardCalculator:
+    @classmethod
+    def calculate_shard(cls, minute, second):
+        base_shard = minute * 10
+        additional_value = second // 6  # Integer division to find the segment
+        return base_shard + additional_value
+
+    @classmethod
+    def calculate_shards_in_range(cls, start_time, end_time):
+        shards = set()
+        current_time = start_time
+
+        while current_time < end_time:
+            shard = cls.calculate_shard(current_time.minute, current_time.second)
+            shards.add(shard)
+            # Move to the next second
+            current_time += timedelta(seconds=1)
+
+        # Format the shards into a CQL statement string
+        shards_list = sorted(list(shards))  # Sort the shards for readability
+        shards_str = ",".join(map(str, shards_list))
+        cql_statement = f"shard in ({shards_str})"
+        return cql_statement
+
+
 # Usage Example
 if __name__ == "__main__":
     client = AWSKeyspacesClient()
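
Aside (not part of the patch series): the per-hour scheme introduced by ShardCalculator in PATCH 04 maps each timestamp to one of 600 shards per hour, ten 6-second segments per minute across 60 minutes. A standalone sketch, checked against values from the test matrix above:

    # Sketch only: the PATCH 04 shard formula, verified against the test matrix.
    def calculate_shard(minute, second):
        # 10 segments per minute, each 6 seconds wide -> shards 0..599 per hour
        return minute * 10 + second // 6

    assert calculate_shard(0, 0) == 0      # start of the hour
    assert calculate_shard(22, 23) == 223  # 22*10 + 23//6 == 220 + 3
    assert calculate_shard(59, 59) == 599  # last 6-second segment
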
From 1c295ad18f9d4562191993fe34bae1ff21ca894b Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Fri, 1 Mar 2024 14:36:39 +0100
Subject: [PATCH 05/17] query from coordinator using shards

---
 .../coordinator/coordinator.py | 45 ++++++++++---------
 1 file changed, 23 insertions(+), 22 deletions(-)

diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py
index 603bbfd..e9e4291 100644
--- a/uptime_service_validation/coordinator/coordinator.py
+++ b/uptime_service_validation/coordinator/coordinator.py
@@ -2,6 +2,7 @@
 the validator processes, distribute work them and, when they're done, collect
 their results, compute scores for the delegation program and put the results
 in the database."""
+
 from dataclasses import asdict
 from datetime import datetime, timedelta, timezone
 import logging
@@ -21,7 +22,7 @@
     create_graph,
     apply_weights,
     bfs,
-    send_slack_message
+    send_slack_message,
 )
 from uptime_service_validation.coordinator.server import (
     bool_env_var_set,
@@ -33,8 +34,7 @@
 )
 
 # Add project root to python path
-project_root = os.path.abspath(os.path.join(
-    os.path.dirname(__file__), "..", ".."))
+project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", ".."))
 sys.path.insert(0, project_root)
 
 
@@ -56,8 +56,7 @@ def wait_until_batch_ends(self):
         "If the time window if the current batch is not yet over, sleep until it is."
         if self.batch.end_time > self.current_timestamp:
             delta = timedelta(minutes=2)
-            sleep_interval = (self.batch.end_time -
-                              self.current_timestamp) + delta
+            sleep_interval = (self.batch.end_time - self.current_timestamp) + delta
             time_until = self.current_timestamp + sleep_interval
             logging.info(
                 "All submissions are processed till date. "
@@ -105,37 +104,39 @@ def __warn_if_work_took_longer_then_expected(self):
             self.current_timestamp,
         )
 
-def load_submissions(batch):
+def load_submissions(batch, time_intervals):
     """Load submissions from Cassandra and return them as a DataFrame."""
     submissions = []
     submissions_verified = []
     cassandra = AWSKeyspacesClient()
+
     try:
         cassandra.connect()
-        submissions = cassandra.get_submissions(
-            submitted_at_start=batch.start_time,
-            submitted_at_end=batch.end_time,
-            start_inclusive=True,
-            end_inclusive=False,
-        )
-        # for further processing
-        # we use only submissions verified = True and validation_error = None
-        for submission in submissions:
-            if submission.verified and submission.validation_error is None:
-                submissions_verified.append(submission)
+        for time_interval in time_intervals:
+            submissions.extend(
+                cassandra.get_submissions(
+                    submitted_at_start=time_interval[0],
+                    submitted_at_end=time_interval[1],
+                    start_inclusive=True,
+                    end_inclusive=False,
+                )
+            )
     except Exception as e:
         logging.error("Error in loading submissions: %s", e)
         return pd.DataFrame([])
     finally:
         cassandra.close()
 
+    # for further processing
+    # we use only submissions verified = True and validation_error = None
+    for submission in submissions:
+        if submission.verified and submission.validation_error is None:
+            submissions_verified.append(submission)
+
     all_submissions_count = len(submissions)
     submissions_to_process_count = len(submissions_verified)
     logging.info("number of all submissions: %s", all_submissions_count)
-    logging.info(
-        "number of submissions to process: %s",
-        submissions_to_process_count
-    )
+    logging.info("number of submissions to process: %s", submissions_to_process_count)
     if submissions_to_process_count < all_submissions_count:
         logging.warning(
             "some submissions were not processed, because they were not \
@@ -331,7 +332,7 @@ def process(db, state):
         logging,
     )
 
-    state_hash_df = load_submissions(state.batch)
+    state_hash_df = load_submissions(state.batch, time_intervals)
     if not state_hash_df.empty:
         try:
             bot_log_id = process_statehash_df(

From ed2eabf8d040e2ede2b3a80ae233056ab9710462 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Mon, 4 Mar 2024 14:56:37 +0100
Subject: [PATCH 06/17] setUpValidatorPods update

---
 uptime_service_validation/coordinator/server.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/uptime_service_validation/coordinator/server.py b/uptime_service_validation/coordinator/server.py
index 1410fd0..4f5c66d 100644
--- a/uptime_service_validation/coordinator/server.py
+++ b/uptime_service_validation/coordinator/server.py
@@ -153,6 +153,14 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
             name="NO_CHECKS",
             value=os.environ.get("NO_CHECKS"),
         ),
+        client.V1EnvVar(
+            name="AWS_ACCESS_KEY_ID",
+            value=os.environ.get("AWS_ACCESS_KEY_ID"),
+        ),
+        client.V1EnvVar(
+            name="AWS_SECRET_ACCESS_KEY",
+            value=os.environ.get("AWS_SECRET_ACCESS_KEY"),
+        ),
     ]
 
     # Entrypoint configmap name
@@ -217,7 +225,8 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
     init_container = client.V1Container(
         name="delegation-verify-init",
         image=f"{worker_image}:{worker_tag}",
-        command=["/bin/authenticate.sh"],
+        # command=["/bin/authenticate.sh"],
+        command=["ls"],
         env=env_vars,
         image_pull_policy=os.environ.get("IMAGE_PULL_POLICY", "IfNotPresent"),
         volume_mounts=[auth_volume_mount],
From 6d40c74889c85f31a8852775005b4bb4dd3ab5dc Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Tue, 5 Mar 2024 09:29:15 +0100
Subject: [PATCH 07/17] use time_intervals when loading submissions

---
 .../coordinator/coordinator.py | 46 ++++++++----------
 1 file changed, 19 insertions(+), 27 deletions(-)

diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py
index e9e4291..1ada58c 100644
--- a/uptime_service_validation/coordinator/coordinator.py
+++ b/uptime_service_validation/coordinator/coordinator.py
@@ -104,7 +104,8 @@ def __warn_if_work_took_longer_then_expected(self):
         self.current_timestamp,
     )
 
-def load_submissions(batch, time_intervals):
+
+def load_submissions(time_intervals):
     """Load submissions from Cassandra and return them as a DataFrame."""
     submissions = []
     submissions_verified = []
@@ -142,9 +143,8 @@ def load_submissions(time_intervals):
         "some submissions were not processed, because they were not \
             verified or had validation errors"
         )
-    return pd.DataFrame(
-        [asdict(submission) for submission in submissions_verified]
-    )
+    return pd.DataFrame([asdict(submission) for submission in submissions_verified])
+
 
 def process_statehash_df(db, batch, state_hash_df, verification_time):
     """Process the state hash dataframe and return the master dataframe."""
@@ -179,8 +179,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
     nodes_in_cur_batch = pd.DataFrame(
         master_df["submitter"].unique(), columns=["block_producer_key"]
     )
-    node_to_insert = find_new_values_to_insert(
-        existing_nodes, nodes_in_cur_batch)
+    node_to_insert = find_new_values_to_insert(existing_nodes, nodes_in_cur_batch)
 
     if not node_to_insert.empty:
         node_to_insert["updated_at"] = datetime.now(timezone.utc)
@@ -191,15 +190,13 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
         columns={
             "file_updated": "file_timestamps",
             "submitter": "block_producer_key",
-        }
+        },
     )
 
-    relation_df, p_selected_node_df = db.get_previous_statehash(
-        batch.bot_log_id)
+    relation_df, p_selected_node_df = db.get_previous_statehash(batch.bot_log_id)
     p_map = list(get_relations(relation_df))
     c_selected_node = filter_state_hash_percentage(master_df)
-    batch_graph = create_graph(
-        master_df, p_selected_node_df, c_selected_node, p_map)
+    batch_graph = create_graph(master_df, p_selected_node_df, c_selected_node, p_map)
     weighted_graph = apply_weights(
         batch_graph=batch_graph,
         c_selected_node=c_selected_node,
@@ -218,8 +215,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
         # but it's not used anywhere inside the function)
     )
     point_record_df = master_df[
-        master_df["state_hash"].isin(
-            shortlisted_state_hash_df["state_hash"].values)
+        master_df["state_hash"].isin(shortlisted_state_hash_df["state_hash"].values)
     ]
 
     for index, row in shortlisted_state_hash_df.iterrows():
@@ -228,15 +224,13 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
     p_selected_node_df = shortlisted_state_hash_df.copy()
     parent_hash = []
     for s in shortlisted_state_hash_df["state_hash"].values:
-        p_hash = master_df[master_df["state_hash"] == s][
-            "parent_state_hash"
-        ].values[0]
+        p_hash = master_df[master_df["state_hash"] == s]["parent_state_hash"].values[0]
         parent_hash.append(p_hash)
     shortlisted_state_hash_df["parent_state_hash"] = parent_hash
 
-    p_map = list(get_relations(
-        shortlisted_state_hash_df[["parent_state_hash", "state_hash"]]
-    ))
+    p_map = list(
+        get_relations(shortlisted_state_hash_df[["parent_state_hash", "state_hash"]])
+    )
     if not point_record_df.empty:
         file_timestamp = master_df.iloc[-1]["file_timestamps"]
     else:
@@ -252,7 +246,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
             file_timestamp,
             batch.start_time.timestamp(),
             batch.end_time.timestamp(),
-            verification_time.total_seconds()
+            verification_time.total_seconds(),
        )
 
     bot_log_id = db.create_bot_log(values)
@@ -261,8 +255,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
 
     if not point_record_df.empty:
         point_record_df.loc[:, "amount"] = 1
-        point_record_df.loc[:, "created_at"] = datetime.now(
-            timezone.utc)
+        point_record_df.loc[:, "created_at"] = datetime.now(timezone.utc)
         point_record_df.loc[:, "bot_log_id"] = bot_log_id
         point_record_df = point_record_df[
             [
@@ -288,7 +281,7 @@ def process(db, state):
     logging.info(
         "iteration start at: %s, cur_timestamp: %s",
         state.batch.start_time,
-        state.current_timestamp
+        state.current_timestamp,
     )
     logging.info(
         "running for batch: %s - %s.", state.batch.start_time, state.batch.end_time
@@ -296,8 +289,7 @@ def process(db, state):
     # sleep until batch ends, update the state accordingly, then continue.
     state.wait_until_batch_ends()
 
-    time_intervals = list(state.batch.split(
-        int(os.environ["MINI_BATCH_NUMBER"])))
+    time_intervals = list(state.batch.split(int(os.environ["MINI_BATCH_NUMBER"])))
 
     worker_image = os.environ["WORKER_IMAGE"]
     worker_tag = os.environ["WORKER_TAG"]
@@ -313,7 +305,7 @@ def process(db, state):
     logging.info(
         "reading ZKValidator results from a db between the time range: %s - %s",
         state.batch.start_time,
-        state.batch.end_time
+        state.batch.end_time,
     )
     logging.info("ZKValidator results read from a db in %s.", timer.duration)
@@ -332,7 +324,7 @@ def process(db, state):
         logging,
     )
 
-    state_hash_df = load_submissions(state.batch, time_intervals)
+    state_hash_df = load_submissions(time_intervals)
     if not state_hash_df.empty:
         try:
             bot_log_id = process_statehash_df(

From 4734ce72ce6c269e5b603f6f334dd2338ed50344 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Thu, 7 Mar 2024 16:00:12 +0100
Subject: [PATCH 08/17] update condition in load_submissions to include
 validation_error ==""

---
 uptime_service_validation/coordinator/coordinator.py | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py
index 1ada58c..b13ff04 100644
--- a/uptime_service_validation/coordinator/coordinator.py
+++ b/uptime_service_validation/coordinator/coordinator.py
@@ -129,9 +129,11 @@ def load_submissions(time_intervals):
         cassandra.close()
 
     # for further processing
-    # we use only submissions verified = True and validation_error = None
+    # we use only submissions verified = True and validation_error = None or ""
     for submission in submissions:
-        if submission.verified and submission.validation_error is None:
+        if submission.verified and (
+            submission.validation_error is None or submission.validation_error == ""
+        ):
             submissions_verified.append(submission)
 
     all_submissions_count = len(submissions)

From 6dd5c3607a34ca11ec0d3a98caab04a207632230 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Thu, 7 Mar 2024 16:03:49 +0100
Subject: [PATCH 09/17] add protocol version when connecting with Cassandra
 creds

---
 uptime_service_validation/coordinator/aws_keyspaces_client.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/uptime_service_validation/coordinator/aws_keyspaces_client.py b/uptime_service_validation/coordinator/aws_keyspaces_client.py
index ddf1c44..afa3bf7 100644
--- a/uptime_service_validation/coordinator/aws_keyspaces_client.py
+++ b/uptime_service_validation/coordinator/aws_keyspaces_client.py
@@ -68,6 +68,7 @@ def __init__(self):
                 ssl_context=self.ssl_context,
                 auth_provider=self.auth_provider,
                 port=int(self.cassandra_port),
+                protocol_version=ProtocolVersion.V4,
             )
         else:
             self.auth_provider = self._create_sigv4auth_provider()
From 992c3f1b8785e45f93c7ec46503f2b05cecbcf8f Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Thu, 7 Mar 2024 16:10:09 +0100
Subject: [PATCH 10/17] update local setUpValidatorProcesses

---
 uptime_service_validation/coordinator/server.py | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/uptime_service_validation/coordinator/server.py b/uptime_service_validation/coordinator/server.py
index 4f5c66d..69a6663 100644
--- a/uptime_service_validation/coordinator/server.py
+++ b/uptime_service_validation/coordinator/server.py
@@ -218,7 +218,11 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
         resources=resource_requirements_container,
         env=env_vars,
         image_pull_policy=os.environ.get("IMAGE_PULL_POLICY", "IfNotPresent"),
-        volume_mounts=[auth_volume_mount, entrypoint_volume_mount, cassandra_ssl_volume_mount],
+        volume_mounts=[
+            auth_volume_mount,
+            entrypoint_volume_mount,
+            cassandra_ssl_volume_mount,
+        ],
     )
 
     # Define the init container
@@ -308,6 +312,8 @@ def setUpValidatorProcesses(time_intervals, logging, worker_image, worker_tag):
             "-e",
             "CASSANDRA_PASSWORD",
             "-e",
+            "AWS_KEYSPACE",
+            "-e",
             "AWS_ACCESS_KEY_ID",
             "-e",
             "AWS_SECRET_ACCESS_KEY",
@@ -324,9 +330,9 @@ def setUpValidatorProcesses(time_intervals, logging, worker_image, worker_tag):
             "-e",
             "CQLSH=/bin/cqlsh-expansion",
             image,
-            "cassandra",
-            "--keyspace",
-            os.environ.get("AWS_KEYSPACE"),
+            # "cassandra",
+            # "--keyspace",
+            # os.environ.get("AWS_KEYSPACE"),
             f"{datetime_formatter(mini_batch[0])}",
             f"{datetime_formatter(mini_batch[1])}",
         ]

From 15be7c3493e7a1708e28a6a9d0dd261e1aff74d2 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Mon, 11 Mar 2024 14:54:28 +0100
Subject: [PATCH 11/17] sharding per 24h

---
 tests/test_aws_keyspaces_client.py            | 38 +++++++++++--------
 .../coordinator/aws_keyspaces_client.py       | 22 ++++++++---
 2 files changed, 40 insertions(+), 20 deletions(-)

diff --git a/tests/test_aws_keyspaces_client.py b/tests/test_aws_keyspaces_client.py
index 16a1cd3..e2a85b8 100644
--- a/tests/test_aws_keyspaces_client.py
+++ b/tests/test_aws_keyspaces_client.py
@@ -24,28 +24,27 @@ def test_get_submitted_at_date_list():
 
 def test_calculate_shard():
     matrix = [
-        [0, 0, 0],
-        [0, 1, 0],
-        [0, 59, 9],
-        [1, 3, 10],
-        [10, 0, 100],
-        [22, 23, 223],
-        [44, 45, 447],
-        [59, 54, 599],
-        [59, 59, 599],
+        [0, 0, 0, 0],
+        [0, 0, 1, 0],
+        [0, 2, 59, 1],
+        [12, 1, 3, 300],
+        [15, 10, 0, 379],
+        [23, 22, 23, 584],
+        [23, 59, 59, 599],
     ]
     for i in matrix:
-        minute = i[0]
-        second = i[1]
-        expected = i[2]
-        assert ShardCalculator.calculate_shard(minute, second) == expected
+        hour = i[0]
+        minute = i[1]
+        second = i[2]
+        expected = i[3]
+        assert ShardCalculator.calculate_shard(hour, minute, second) == expected
 
 
 def test_calculate_shards_in_range():
     # 1 between days
     start_time = datetime(2024, 2, 29, 23, 58, 29)
     end_time = datetime(2024, 3, 1, 0, 3, 29)
-    expected_cql_statement = "shard in (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599)"
+    expected_cql_statement = "shard in (0,1,599)"
     result_cql_statement = ShardCalculator.calculate_shards_in_range(
         start_time, end_time
     )
@@ -54,7 +53,16 @@ def test_calculate_shards_in_range():
     # 2 within the same day
     start_time = datetime(2024, 2, 29, 12, 58, 1)
     end_time = datetime(2024, 2, 29, 12, 59, 59)
-    expected_cql_statement = "shard in (580,581,582,583,584,585,586,587,588,589,590,591,592,593,594,595,596,597,598,599)"
+    expected_cql_statement = "shard in (324)"
+    result_cql_statement = ShardCalculator.calculate_shards_in_range(
+        start_time, end_time
+    )
+    assert result_cql_statement == expected_cql_statement
+
+    # 2 shard boundary
+    start_time = datetime(2024, 2, 29, 0, 0, 0)
+    end_time = datetime(2024, 2, 29, 0, 2, 24)
+    expected_cql_statement = "shard in (0,1)"
     result_cql_statement = ShardCalculator.calculate_shards_in_range(
         start_time, end_time
     )
     assert result_cql_statement == expected_cql_statement
diff --git a/uptime_service_validation/coordinator/aws_keyspaces_client.py b/uptime_service_validation/coordinator/aws_keyspaces_client.py
index afa3bf7..1324fb9 100644
--- a/uptime_service_validation/coordinator/aws_keyspaces_client.py
+++ b/uptime_service_validation/coordinator/aws_keyspaces_client.py
@@ -284,10 +284,8 @@ def close(self):
 
 class ShardCalculator:
     @classmethod
-    def calculate_shard(cls, minute, second):
-        base_shard = minute * 10
-        additional_value = second // 6  # Integer division to find the segment
-        return base_shard + additional_value
+    def calculate_shard(cls, hour, minute, second):
+        return (3600 * hour + 60 * minute + second) // 144
 
     @classmethod
     def calculate_shards_in_range(cls, start_time, end_time):
@@ -295,11 +293,25 @@ def calculate_shards_in_range(cls, start_time, end_time):
         current_time = start_time
 
         while current_time < end_time:
-            shard = cls.calculate_shard(current_time.minute, current_time.second)
+            shard = cls.calculate_shard(
+                current_time.hour, current_time.minute, current_time.second
+            )
             shards.add(shard)
             # Move to the next second
             current_time += timedelta(seconds=1)
 
+        # Check if endTime falls exactly on a new shard boundary and add it if necessary
+        end_shard = cls.calculate_shard(end_time.hour, end_time.minute, end_time.second)
+        if end_shard not in shards:
+            # Check if end_time is exactly on the boundary of a new shard
+            total_seconds_end = (
+                (end_time.hour * 3600) + (end_time.minute * 60) + end_time.second
+            )
+            if total_seconds_end % 144 == 0:
+                shards.add(end_shard)
+
+        # Convert the set of unique shards into a sorted list for readability
+        shards_list = sorted(list(shards))
         # Format the shards into a CQL statement string
         shards_list = sorted(list(shards))  # Sort the shards for readability
         shards_str = ",".join(map(str, shards_list))
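
Aside (not part of the patch series): PATCH 11 switches from per-hour to per-day sharding. A day has 86400 seconds, and 86400 // 144 == 600, so the shard space stays 0..599 but each shard now covers a 144-second window. A standalone sketch of the arithmetic:

    # Sketch only: the PATCH 11 shard formula over a 24h day.
    def calculate_shard(hour, minute, second):
        return (3600 * hour + 60 * minute + second) // 144

    assert 86400 // 144 == 600             # 600 shards per day, 144 s each
    assert calculate_shard(0, 2, 23) == 0  # second 143, still in shard 0
    assert calculate_shard(0, 2, 24) == 1  # second 144, first shard boundary
    assert calculate_shard(23, 59, 59) == 599
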
From 1a325b8bf93b76569b6a3d89da77a31c3b33ea8a Mon Sep 17 00:00:00 2001
From: Simonas Narbutas
Date: Tue, 12 Mar 2024 10:16:10 +0200
Subject: [PATCH 12/17] Remove hardcoded cassandra ssl mounts

---
 uptime_service_validation/coordinator/server.py | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/uptime_service_validation/coordinator/server.py b/uptime_service_validation/coordinator/server.py
index 69a6663..2b924c4 100644
--- a/uptime_service_validation/coordinator/server.py
+++ b/uptime_service_validation/coordinator/server.py
@@ -179,24 +179,12 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
         ),  # 0777 permission in octal as int
     )
 
-    cassandra_ssl_volume = client.V1Volume(
-        name="cassandra-crt",
-        secret=client.V1SecretVolumeSource(
-            secret_name="uptime-service-cassandra-crt"
-        ),
-    )
-
     # Define the volumeMounts
     auth_volume_mount = client.V1VolumeMount(
         name="auth-volume",
         mount_path=os.environ.get("AUTH_VOLUME_MOUNT_PATH"),
     )
 
-    cassandra_ssl_volume_mount = client.V1VolumeMount(
-        name="cassandra-crt",
-        mount_path="/certs",
-    )
-
     entrypoint_volume_mount = client.V1VolumeMount(
         name="entrypoint-volume",
         mount_path="/bin/entrypoint",
@@ -217,7 +205,6 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
         volume_mounts=[
             auth_volume_mount,
             entrypoint_volume_mount,
-            cassandra_ssl_volume_mount,
         ],
     )
 
@@ -249,7 +236,7 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
             containers=[container],
             restart_policy="Never",
             service_account_name=service_account_name,
-            volumes=[auth_volume, entrypoint_volume, cassandra_ssl_volume],
+            volumes=[auth_volume, entrypoint_volume],
         )
     ),
 ),
From 07ea3c726bfa165c0d0a92c392f6dd253cfe59ac Mon Sep 17 00:00:00 2001
From: Simonas Narbutas
Date: Tue, 12 Mar 2024 10:35:22 +0200
Subject: [PATCH 13/17] ssl is hardcoded, so should be its var

---
 uptime_service_validation/coordinator/server.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/uptime_service_validation/coordinator/server.py b/uptime_service_validation/coordinator/server.py
index 2b924c4..b71aebd 100644
--- a/uptime_service_validation/coordinator/server.py
+++ b/uptime_service_validation/coordinator/server.py
@@ -127,7 +127,7 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
         client.V1EnvVar(name="CASSANDRA_USE_SSL", value="1"),
         client.V1EnvVar(
             name="SSL_CERTFILE",
-            value=os.environ.get("SSL_CERTFILE"),
+            value="/root/.cassandra/sf-class2-root.crt",
         ),
         client.V1EnvVar(
             name="CQLSH",

From c579a62e167c0a151e50dfc6ac107dd879ff8904 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Wed, 13 Mar 2024 11:48:00 +0100
Subject: [PATCH 14/17] use RetryPolicy

---
 .../coordinator/aws_keyspaces_client.py       | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/uptime_service_validation/coordinator/aws_keyspaces_client.py b/uptime_service_validation/coordinator/aws_keyspaces_client.py
index 1324fb9..b46c3bc 100644
--- a/uptime_service_validation/coordinator/aws_keyspaces_client.py
+++ b/uptime_service_validation/coordinator/aws_keyspaces_client.py
@@ -4,7 +4,7 @@
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
 from cassandra_sigv4.auth import SigV4AuthProvider
-from cassandra.policies import DCAwareRoundRobinPolicy
+from cassandra.policies import DCAwareRoundRobinPolicy, RetryPolicy
 from ssl import SSLContext, CERT_REQUIRED, PROTOCOL_TLS_CLIENT
 from dataclasses import dataclass
 from datetime import datetime, timedelta
@@ -63,11 +63,16 @@ def __init__(self):
             self.auth_provider = PlainTextAuthProvider(
                 username=self.cassandra_user, password=self.cassandra_pass
             )
+            profile = ExecutionProfile(
+                # load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region),
+                retry_policy=RetryPolicy(),
+            )
             self.cluster = Cluster(
                 [self.cassandra_host],
                 ssl_context=self.ssl_context,
                 auth_provider=self.auth_provider,
                 port=int(self.cassandra_port),
+                execution_profiles={EXEC_PROFILE_DEFAULT: profile},
                 protocol_version=ProtocolVersion.V4,
             )
         else:
@@ -121,7 +126,8 @@ def _create_sigv4auth_provider(self):
 
     def _create_cluster(self):
         profile = ExecutionProfile(
-            load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region)
+            load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region),
+            retry_policy=RetryPolicy(),
         )
         return Cluster(
             [self.cassandra_host],
From 5bcc4b3877529687d998fba4f642c1dc06464478 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Wed, 13 Mar 2024 11:51:20 +0100
Subject: [PATCH 15/17] slight refactor of creating a cluster session

---
 .../coordinator/aws_keyspaces_client.py       | 28 +++++++++----------
 1 file changed, 13 insertions(+), 15 deletions(-)

diff --git a/uptime_service_validation/coordinator/aws_keyspaces_client.py b/uptime_service_validation/coordinator/aws_keyspaces_client.py
index b46c3bc..b04eb05 100644
--- a/uptime_service_validation/coordinator/aws_keyspaces_client.py
+++ b/uptime_service_validation/coordinator/aws_keyspaces_client.py
@@ -64,6 +64,7 @@ def __init__(self):
                 username=self.cassandra_user, password=self.cassandra_pass
             )
             profile = ExecutionProfile(
+                # assuming this is for hosted Cassandra, load balancing policy to be determined
                 # load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region),
                 retry_policy=RetryPolicy(),
             )
@@ -77,7 +78,18 @@ def __init__(self):
             )
         else:
             self.auth_provider = self._create_sigv4auth_provider()
-            self.cluster = self._create_cluster()
+            profile = ExecutionProfile(
+                load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region),
+                retry_policy=RetryPolicy(),
+            )
+            self.cluster = Cluster(
+                [self.cassandra_host],
+                ssl_context=self.ssl_context,
+                auth_provider=self.auth_provider,
+                port=int(self.cassandra_port),
+                execution_profiles={EXEC_PROFILE_DEFAULT: profile},
+                protocol_version=ProtocolVersion.V4,
+            )
 
     def _create_ssl_context(self):
         ssl_context = SSLContext(PROTOCOL_TLS_CLIENT)
@@ -124,20 +136,6 @@ def _create_sigv4auth_provider(self):
         )
         return SigV4AuthProvider(boto_session)
 
-    def _create_cluster(self):
-        profile = ExecutionProfile(
-            load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region),
-            retry_policy=RetryPolicy(),
-        )
-        return Cluster(
-            [self.cassandra_host],
-            ssl_context=self.ssl_context,
-            auth_provider=self.auth_provider,
-            port=int(self.cassandra_port),
-            execution_profiles={EXEC_PROFILE_DEFAULT: profile},
-            protocol_version=ProtocolVersion.V4,
-        )
-
     def connect(self):
         self.session = self.cluster.connect()
From 42337229b8fce4cab73e7562ffb4d4539cf17f31 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Wed, 13 Mar 2024 14:36:58 +0100
Subject: [PATCH 16/17] add request_timeout and custom
 ExponentialBackOffRetryPolicy

---
 .../coordinator/aws_keyspaces_client.py       | 59 ++++++++++++++++++-
 1 file changed, 57 insertions(+), 2 deletions(-)

diff --git a/uptime_service_validation/coordinator/aws_keyspaces_client.py b/uptime_service_validation/coordinator/aws_keyspaces_client.py
index b04eb05..4db7674 100644
--- a/uptime_service_validation/coordinator/aws_keyspaces_client.py
+++ b/uptime_service_validation/coordinator/aws_keyspaces_client.py
@@ -1,5 +1,7 @@
 import os
 import boto3
+import time
+import random
 from cassandra import ProtocolVersion
 from cassandra.auth import PlainTextAuthProvider
 from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
@@ -58,6 +60,7 @@ def __init__(self):
         self.aws_ssl_certificate_path = os.environ.get("SSL_CERTFILE")
         self.aws_region = self.cassandra_host.split(".")[1]
         self.ssl_context = self._create_ssl_context()
+        self.request_timeout = 20.0
 
         if self.cassandra_user and self.cassandra_pass:
             self.auth_provider = PlainTextAuthProvider(
@@ -66,7 +69,8 @@ def __init__(self):
             profile = ExecutionProfile(
                 # assuming this is for hosted Cassandra, load balancing policy to be determined
                 # load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region),
-                retry_policy=RetryPolicy(),
+                retry_policy=ExponentialBackOffRetryPolicy(),
+                request_timeout=self.request_timeout,
             )
             self.cluster = Cluster(
                 [self.cassandra_host],
@@ -80,7 +84,8 @@ def __init__(self):
             self.auth_provider = self._create_sigv4auth_provider()
             profile = ExecutionProfile(
                 load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=self.aws_region),
-                retry_policy=RetryPolicy(),
+                retry_policy=ExponentialBackOffRetryPolicy(),
+                request_timeout=self.request_timeout,
             )
             self.cluster = Cluster(
                 [self.cassandra_host],
@@ -286,6 +291,56 @@ def close(self):
         self.cluster.shutdown()
 
 
+class ExponentialBackOffRetryPolicy(RetryPolicy):
+    def __init__(self, base_delay=0.1, max_delay=10, max_retries=10):
+        self.base_delay = base_delay  # seconds
+        self.max_delay = max_delay  # seconds
+        self.max_retries = max_retries
+
+    def get_backoff_time(self, retry_num):
+        # Calculate exponential backoff time
+        delay = min(self.max_delay, self.base_delay * (2**retry_num))
+        # Add some randomness to avoid thundering herd problem
+        jitter = random.uniform(0, 0.1) * delay
+        return delay + jitter
+
+    def on_read_timeout(
+        self,
+        query,
+        consistency,
+        required_responses,
+        received_responses,
+        data_retrieved,
+        retry_num,
+    ):
+        if retry_num >= self.max_retries:
+            return (self.RETHROW, None)
+        time.sleep(self.get_backoff_time(retry_num))
+        return (self.RETRY, consistency)
+
+    def on_write_timeout(
+        self,
+        query,
+        consistency,
+        write_type,
+        required_responses,
+        received_responses,
+        retry_num,
+    ):
+        if retry_num >= self.max_retries:
+            return (self.RETHROW, None)
+        time.sleep(self.get_backoff_time(retry_num))
+        return (self.RETRY, consistency)
+
+    def on_unavailable(
+        self, query, consistency, required_replica, alive_replica, retry_num
+    ):
+        if retry_num >= self.max_retries:
+            return (self.RETHROW, None)
+        time.sleep(self.get_backoff_time(retry_num))
+        return (self.RETRY_NEXT_HOST, None)
+
+
 class ShardCalculator:
     @classmethod
     def calculate_shard(cls, hour, minute, second):

From 77b8b3681bc051d8c4f35c02bf3d8890ff2eda71 Mon Sep 17 00:00:00 2001
From: Piotr Stachyra
Date: Thu, 14 Mar 2024 11:57:41 +0100
Subject: [PATCH 17/17] remove CQLSH env var

---
 uptime_service_validation/coordinator/server.py | 6 ------
 1 file changed, 6 deletions(-)

diff --git a/uptime_service_validation/coordinator/server.py b/uptime_service_validation/coordinator/server.py
index b71aebd..197be64 100644
--- a/uptime_service_validation/coordinator/server.py
+++ b/uptime_service_validation/coordinator/server.py
@@ -129,10 +129,6 @@ def setUpValidatorPods(time_intervals, logging, worker_image, worker_tag):
             name="SSL_CERTFILE",
             value="/root/.cassandra/sf-class2-root.crt",
         ),
-        client.V1EnvVar(
-            name="CQLSH",
-            value=os.environ.get("CQLSH"),
-        ),
         client.V1EnvVar(
             name="AUTH_VOLUME_MOUNT_PATH",
             value=os.environ.get("AUTH_VOLUME_MOUNT_PATH"),
@@ -314,8 +310,6 @@ def setUpValidatorProcesses(time_intervals, logging, worker_image, worker_tag):
             "SSL_CERTFILE=/var/ssl/ssl-cert.crt",
             "-e",
             "CASSANDRA_USE_SSL=1",
-            "-e",
-            "CQLSH=/bin/cqlsh-expansion",
             image,
             # "cassandra",
             # "--keyspace",
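
Aside (not part of the patch series): the retry policy added in PATCH 16 doubles the delay on every attempt, caps it at max_delay, and adds up to 10% random jitter so concurrent clients do not retry in lockstep. A standalone sketch of the resulting backoff curve under the default parameters (base_delay=0.1, max_delay=10):

    # Sketch only: backoff schedule implied by ExponentialBackOffRetryPolicy defaults.
    base_delay, max_delay = 0.1, 10
    for retry_num in range(8):
        delay = min(max_delay, base_delay * (2 ** retry_num))
        print(retry_num, delay)
    # 0.1, 0.2, 0.4, 0.8, 1.6, 3.2, 6.4, then capped at 10 seconds,
    # each inflated by up to 10% jitter before time.sleep() is called.
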