From 82e5c38ba5d2495820a79596b5124ed05f8fe58f Mon Sep 17 00:00:00 2001 From: Sventimir Date: Wed, 28 Feb 2024 08:39:23 +0100 Subject: [PATCH 01/10] Reorganise the imports to fix linter's complaints. --- .../coordinator/coordinator.py | 35 ++++++++++++++----- 1 file changed, 27 insertions(+), 8 deletions(-) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index 95bbc5b..a90248b 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -1,14 +1,33 @@ -from dataclasses import dataclass -import sys -import os +from dataclasses import asdict +from datetime import datetime, timedelta, timezone import logging +import os +import sys +from time import sleep, time + +from dotenv import load_dotenv import pandas as pd import psycopg2 -from datetime import datetime, timedelta, timezone -from dotenv import load_dotenv -from time import sleep, time -from dataclasses import asdict -from uptime_service_validation.coordinator.helper import * +from uptime_service_validation.coordinator.helper import ( + getTimeBatches, + getBatchTimings, + getPreviousStatehash, + getRelationList, + getStatehashDF, + findNewValuesToInsert, + createStatehash, + createNodeRecord, + filterStateHashPercentage, + createGraph, + applyWeights, + bfs, + createBotLog, + insertStatehashResults, + createPointRecord, + updateScoreboard, + getExistingNodes, + sendSlackMessage +) from uptime_service_validation.coordinator.server import ( bool_env_var_set, setUpValidatorPods, From ca427fc7811e248e013576b8613c3bd9e60667ff Mon Sep 17 00:00:00 2001 From: Sventimir Date: Wed, 28 Feb 2024 09:10:40 +0100 Subject: [PATCH 02/10] Add docstrings for State and rearrange its methods a bit. --- .../coordinator/coordinator.py | 50 ++++++++++++------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index a90248b..428eb2c 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -1,3 +1,7 @@ +"""The coordinator script of the uptime service. Its job is to manage +the validator processes, distribute work them and, when they're done, +collect their results, compute scores for the delegation program and +put the results in the database.""" from dataclasses import asdict from datetime import datetime, timedelta, timezone import logging @@ -43,6 +47,11 @@ class State: + """The state aggregates all the data that remains constant while processing + a single batch, but changes between batches. It also takes care of valid + state transitions. It should likely be split into smaller chunks, but at + this stage it's sufficient.""" + def __init__(self, connection, bot_log_id, prev_batch_end, current_batch_end): self.bot_log_id = bot_log_id self.conn = connection @@ -54,49 +63,56 @@ def __init__(self, connection, bot_log_id, prev_batch_end, current_batch_end): self.loop_count = 0 self.stop = False - def update_timestamp(self): - self.current_timestamp = datetime.now(timezone.utc) - - def next_loop(self): - self.loop_count += 1 - logging.info(f"Processed it loop count : {self.loop_count}") - def wait_until_batch_ends(self): + "If the time window if the current batch is not yet over, sleep until it is." 
if self.current_batch_end > self.current_timestamp: delta = timedelta(minutes=2) sleep_interval = (self.current_batch_end - self.current_timestamp) + delta time_until = self.current_timestamp + sleep_interval logging.info( - "all submissions are processed till date. Will wait %s (until %s) before starting next batch...", + "All submissions are processed till date. " + "Will wait %s (until %s) before starting next batch...", sleep_interval, time_until, ) sleep(sleep_interval.total_seconds()) - self.update_timestamp() + self.__update_timestamp() def advance_to_next_batch(self, next_bot_log_id): + """Update the state so that it describes the next batch in line; + transitioning the state to the next loop pass.""" self.retrials_left = os.environ["RETRY_COUNT"] self.bot_log_id = next_bot_log_id self.prev_batch_end = self.current_batch_end self.current_batch_end = self.prev_batch_end + timedelta(minutes=self.interval) - self.warn_if_work_took_longer_then_expected() - self.next_loop() - self.update_timestamp() + self.__warn_if_work_took_longer_then_expected() + self.__next_loop() + self.__update_timestamp() def retry_batch(self): + "Transition the state for retrial of the current (failed) batch." if self.retrials_left > 0: self.retrials_left -= 1 logging.error("Error in processing, retrying the batch...") logging.error("Error in processing, retry count exceeded... Exitting!") self.stop = True - self.warn_if_work_took_longer_then_expected() - self.next_loop() - self.update_timestamp() + self.__warn_if_work_took_longer_then_expected() + self.__next_loop() + self.__update_timestamp() + + def __update_timestamp(self): + self.current_timestamp = datetime.now(timezone.utc) + + def __next_loop(self): + self.loop_count += 1 + logging.info("Processed it loop count : %s.", self.loop_count) - def warn_if_work_took_longer_then_expected(self): + def __warn_if_work_took_longer_then_expected(self): if self.prev_batch_end >= self.current_timestamp: logging.warning( - "It seems that batch processing took a bit too long than expected as prev_batch_end: %s >= cur_timestamp: %s... progressing to the next batch anyway...", + "It seems that batch processing took a bit too long than \ + expected as prev_batch_end: %s >= cur_timestamp: %s... \ + progressing to the next batch anyway...", self.prev_batch_end, self.current_timestamp, ) From 8f2859facd3d0f90d45f183ccfb758672dfd54a6 Mon Sep 17 00:00:00 2001 From: Sventimir Date: Wed, 28 Feb 2024 09:27:14 +0100 Subject: [PATCH 03/10] Fix linter's complaints (logging, formatting and style). --- .../coordinator/coordinator.py | 48 +++++++++++-------- 1 file changed, 27 insertions(+), 21 deletions(-) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index 428eb2c..a086d01 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -119,15 +119,18 @@ def __warn_if_work_took_longer_then_expected(self): def process(state): + """Perform a signle iteration of the coordinator loop, processing exactly + one batch of submissions. 
Launch verifiers to process submissions, then + compute scores and store them in the database.""" logging.info( - "iteration start at: {0}, cur_timestamp: {1}".format( - state.prev_batch_end, state.current_timestamp - ) + "iteration start at: %s, cur_timestamp: %s", + state.prev_batch_end, + state.current_timestamp ) existing_state_df = getStatehashDF(state.conn, logging) existing_nodes = getExistingNodes(state.conn, logging) logging.info( - f"running for batch: {state.prev_batch_end} - {state.current_batch_end}" + "running for batch: %s - %s.", state.prev_batch_end, state.current_batch_end ) # sleep until batch ends, update the state accordingly, then continue. @@ -151,23 +154,23 @@ def process(state): end = time() # Step 4 We need to read the ZKValidator results from a db. logging.info( - "reading ZKValidator results from a db between the time range: {0} - {1}".format( - state.prev_batch_end, state.current_batch_end - ) + "reading ZKValidator results from a db between the time range: %s - %s", + state.prev_batch_end, + state.current_batch_end ) - webhookURL = os.environ.get("WEBHOOK_URL") - if webhookURL != None: + webhook_url = os.environ.get("WEBHOOK_URL") + if webhook_url is not None: if end - start < float(os.environ["ALARM_ZK_LOWER_LIMIT_SEC"]): sendSlackMessage( - webhookURL, - f"ZkApp Validation took {end- start} seconds, which is too quick", + webhook_url, + f"ZkApp Validation took {end - start} seconds, which is too quick", logging, ) if end - start > float(os.environ["ALARM_ZK_UPPER_LIMIT_SEC"]): sendSlackMessage( - webhookURL, - f"ZkApp Validation took {end- start} seconds, which is too long", + webhook_url, + f"ZkApp Validation took {end - start} seconds, which is too long", logging, ) @@ -192,13 +195,15 @@ def process(state): all_submissions_count = len(submissions) submissions_to_process_count = len(submissions_verified) - logging.info("number of all submissions: {0}".format(all_submissions_count)) + logging.info("number of all submissions: %s", all_submissions_count) logging.info( - "number of submissions to process: {0}".format(submissions_to_process_count) + "number of submissions to process: %s", + submissions_to_process_count ) if submissions_to_process_count < all_submissions_count: logging.warning( - "some submissions were not processed, because they were not verified or had validation errors" + "some submissions were not processed, because they were not \ + verified or had validation errors" ) # Step 5 checks for forks and writes to the db. 
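For reference, the timing alarm above can be read as a small standalone check. A minimal sketch, assuming the same ALARM_ZK_LOWER_LIMIT_SEC and ALARM_ZK_UPPER_LIMIT_SEC environment variables and a hypothetical helper name check_validation_timing that is not part of the patch:

import os

def check_validation_timing(elapsed_seconds, alert):
    """Call `alert` when ZkApp validation finished suspiciously fast or slow."""
    lower = float(os.environ["ALARM_ZK_LOWER_LIMIT_SEC"])
    upper = float(os.environ["ALARM_ZK_UPPER_LIMIT_SEC"])
    if elapsed_seconds < lower:
        alert(f"ZkApp Validation took {elapsed_seconds} seconds, which is too quick")
    elif elapsed_seconds > upper:
        alert(f"ZkApp Validation took {elapsed_seconds} seconds, which is too long")

Passing alert=lambda msg: sendSlackMessage(webhook_url, msg, logging) mirrors the behaviour of the block above.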
@@ -267,7 +272,8 @@ def process(state): graph=weighted_graph, queue_list=queue_list, node=queue_list[0], - # batch_statehash=batch_state_hash, (this used to be here in old code, but it's not used anywhere inside the function) + # batch_statehash=batch_state_hash, (this used to be here in old code, + # but it's not used anywhere inside the function) ) point_record_df = master_df[ master_df["state_hash"].isin(shortlisted_state_hash_df["state_hash"].values) @@ -294,10 +300,9 @@ def process(state): else: file_timestamp = state.current_batch_end logging.info( - "empty point record for start epoch {0} end epoch {1} ".format( + "empty point record for start epoch %s end epoch %s", state.prev_batch_end.timestamp(), state.current_batch_end.timestamp(), - ) ) values = ( @@ -333,7 +338,7 @@ def process(state): createPointRecord(state.conn, logging, point_record_df) except Exception as error: state.conn.rollback() - logging.error(ERROR.format(error)) + logging.error("ERROR: %s", error) state.retry_batch() else: state.conn.commit() @@ -351,13 +356,14 @@ def process(state): ) except Exception as error: state.conn.rollback() - logging.error(ERROR.format(error)) + logging.error("ERROR: %s", error) else: state.conn.commit() state.advance_to_next_batch(bot_log_id) def main(): + "The entrypoint to the program." load_dotenv() logging.basicConfig( From 4e1e21bb510e7f3b914d8284320c6dd9d96a7fea Mon Sep 17 00:00:00 2001 From: Sventimir Date: Wed, 28 Feb 2024 10:05:29 +0100 Subject: [PATCH 04/10] Rename columns in master_df in place. The old version gets thrown away anyway. --- uptime_service_validation/coordinator/coordinator.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index a086d01..b7383ff 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -245,7 +245,8 @@ def process(state): node_to_insert["updated_at"] = datetime.now(timezone.utc) createNodeRecord(state.conn, logging, node_to_insert, 100) - master_df = master_df.rename( + master_df.rename( + inplace=True, columns={ "file_updated": "file_timestamps", "submitter": "block_producer_key", From ff03d100e067f9975aec03fe6bcf59d798289a0c Mon Sep 17 00:00:00 2001 From: Sventimir Date: Thu, 29 Feb 2024 09:38:21 +0100 Subject: [PATCH 05/10] Extract batch timeframe into a separate object. --- .../coordinator/coordinator.py | 92 +++++++++---------- .../coordinator/helper.py | 50 +++++++--- 2 files changed, 77 insertions(+), 65 deletions(-) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index b7383ff..c372964 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -13,7 +13,6 @@ import pandas as pd import psycopg2 from uptime_service_validation.coordinator.helper import ( - getTimeBatches, getBatchTimings, getPreviousStatehash, getRelationList, @@ -52,11 +51,8 @@ class State: state transitions. 
It should likely be split into smaller chunks, but at this stage it's sufficient.""" - def __init__(self, connection, bot_log_id, prev_batch_end, current_batch_end): - self.bot_log_id = bot_log_id - self.conn = connection - self.prev_batch_end = prev_batch_end - self.current_batch_end = current_batch_end + def __init__(self, batch): + self.batch = batch self.current_timestamp = datetime.now(timezone.utc) self.retrials_left = os.environ["RETRY_COUNT"] self.interval = int(os.environ["SURVEY_INTERVAL_MINUTES"]) @@ -65,9 +61,9 @@ def __init__(self, connection, bot_log_id, prev_batch_end, current_batch_end): def wait_until_batch_ends(self): "If the time window if the current batch is not yet over, sleep until it is." - if self.current_batch_end > self.current_timestamp: + if self.batch.end_time > self.current_timestamp: delta = timedelta(minutes=2) - sleep_interval = (self.current_batch_end - self.current_timestamp) + delta + sleep_interval = (self.batch.end_time - self.current_timestamp) + delta time_until = self.current_timestamp + sleep_interval logging.info( "All submissions are processed till date. " @@ -82,9 +78,7 @@ def advance_to_next_batch(self, next_bot_log_id): """Update the state so that it describes the next batch in line; transitioning the state to the next loop pass.""" self.retrials_left = os.environ["RETRY_COUNT"] - self.bot_log_id = next_bot_log_id - self.prev_batch_end = self.current_batch_end - self.current_batch_end = self.prev_batch_end + timedelta(minutes=self.interval) + self.batch = self.batch.next(next_bot_log_id) self.__warn_if_work_took_longer_then_expected() self.__next_loop() self.__update_timestamp() @@ -108,40 +102,36 @@ def __next_loop(self): logging.info("Processed it loop count : %s.", self.loop_count) def __warn_if_work_took_longer_then_expected(self): - if self.prev_batch_end >= self.current_timestamp: + if self.batch.start_time >= self.current_timestamp: logging.warning( "It seems that batch processing took a bit too long than \ expected as prev_batch_end: %s >= cur_timestamp: %s... \ progressing to the next batch anyway...", - self.prev_batch_end, + self.batch.start_time, self.current_timestamp, ) -def process(state): +def process(conn, state): """Perform a signle iteration of the coordinator loop, processing exactly one batch of submissions. Launch verifiers to process submissions, then compute scores and store them in the database.""" logging.info( "iteration start at: %s, cur_timestamp: %s", - state.prev_batch_end, + state.batch.start_time, state.current_timestamp ) - existing_state_df = getStatehashDF(state.conn, logging) - existing_nodes = getExistingNodes(state.conn, logging) + existing_state_df = getStatehashDF(conn, logging) + existing_nodes = getExistingNodes(conn, logging) logging.info( - "running for batch: %s - %s.", state.prev_batch_end, state.current_batch_end + "running for batch: %s - %s.", state.batch.start_time, state.batch.end_time ) # sleep until batch ends, update the state accordingly, then continue. state.wait_until_batch_ends() master_df = pd.DataFrame() # Step 2 Create time ranges: - time_intervals = getTimeBatches( - state.prev_batch_end, - state.current_batch_end, - int(os.environ["MINI_BATCH_NUMBER"]), - ) + time_intervals = list(state.batch.split(int(os.environ["MINI_BATCH_NUMBER"]))) # Step 3 Create Kubernetes ZKValidators and pass mini-batches. worker_image = os.environ["WORKER_IMAGE"] worker_tag = os.environ["WORKER_TAG"] @@ -155,8 +145,8 @@ def process(state): # Step 4 We need to read the ZKValidator results from a db. 
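    # The results read back in step 4 are the submissions checked by the
    # validators launched above; each record carries, among other fields,
    # the submitter key, state_hash, parent, height, slot and any
    # validation_error, and is fetched for the half-open window
    # [batch.start_time, batch.end_time).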
logging.info( "reading ZKValidator results from a db between the time range: %s - %s", - state.prev_batch_end, - state.current_batch_end + state.batch.start_time, + state.batch.end_time ) webhook_url = os.environ.get("WEBHOOK_URL") @@ -180,8 +170,8 @@ def process(state): try: cassandra.connect() submissions = cassandra.get_submissions( - submitted_at_start=state.prev_batch_end, - submitted_at_end=state.current_batch_end, + submitted_at_start=state.batch.start_time, + submitted_at_end=state.batch.end_time, start_inclusive=True, end_inclusive=False, ) @@ -234,7 +224,7 @@ def process(state): existing_state_df, pd.DataFrame(state_hash, columns=["statehash"]) ) if not state_hash_to_insert.empty: - createStatehash(state.conn, logging, state_hash_to_insert) + createStatehash(conn, logging, state_hash_to_insert) nodes_in_cur_batch = pd.DataFrame( master_df["submitter"].unique(), columns=["block_producer_key"] @@ -243,7 +233,7 @@ def process(state): if not node_to_insert.empty: node_to_insert["updated_at"] = datetime.now(timezone.utc) - createNodeRecord(state.conn, logging, node_to_insert, 100) + createNodeRecord(conn, logging, node_to_insert, 100) master_df.rename( inplace=True, @@ -254,7 +244,7 @@ def process(state): ) relation_df, p_selected_node_df = getPreviousStatehash( - state.conn, logging, state.bot_log_id + conn, logging, state.batch.bot_log_id ) p_map = getRelationList(relation_df) c_selected_node = filterStateHashPercentage(master_df) @@ -299,29 +289,29 @@ def process(state): if not point_record_df.empty: file_timestamp = master_df.iloc[-1]["file_timestamps"] else: - file_timestamp = state.current_batch_end + file_timestamp = state.batch.end_time logging.info( "empty point record for start epoch %s end epoch %s", - state.prev_batch_end.timestamp(), - state.current_batch_end.timestamp(), + state.batch.start_time.timestamp(), + state.batch.end_time.timestamp(), ) values = ( all_files_count, file_timestamp, - state.prev_batch_end.timestamp(), - state.current_batch_end.timestamp(), + state.batch.start_time.timestamp(), + state.batch.end_time.timestamp(), end - start, ) - bot_log_id = createBotLog(state.conn, logging, values) + bot_log_id = createBotLog(conn, logging, values) - shortlisted_state_hash_df["bot_log_id"] = state.bot_log_id - insertStatehashResults(state.conn, logging, shortlisted_state_hash_df) + shortlisted_state_hash_df["bot_log_id"] = bot_log_id + insertStatehashResults(conn, logging, shortlisted_state_hash_df) if not point_record_df.empty: point_record_df.loc[:, "amount"] = 1 point_record_df.loc[:, "created_at"] = datetime.now(timezone.utc) - point_record_df.loc[:, "bot_log_id"] = state.bot_log_id + point_record_df.loc[:, "bot_log_id"] = bot_log_id point_record_df = point_record_df[ [ "file_name", @@ -336,30 +326,30 @@ def process(state): ] ] - createPointRecord(state.conn, logging, point_record_df) + createPointRecord(conn, logging, point_record_df) except Exception as error: - state.conn.rollback() + conn.rollback() logging.error("ERROR: %s", error) state.retry_batch() else: - state.conn.commit() + conn.commit() else: # new bot log id hasn't been created, so proceed with the old one - bot_log_id = state.bot_log_id + bot_log_id = state.batch.bot_log_id logging.info("Finished processing data from table.") try: updateScoreboard( - state.conn, + conn, logging, - state.current_batch_end, + state.batch.end_time, int(os.environ["UPTIME_DAYS_FOR_SCORE"]), ) except Exception as error: - state.conn.rollback() + conn.rollback() logging.error("ERROR: %s", error) else: - 
state.conn.commit() + conn.commit() state.advance_to_next_batch(bot_log_id) @@ -383,12 +373,12 @@ def main(): # Step 1 Get previous record and build relations list interval = int(os.environ["SURVEY_INTERVAL_MINUTES"]) - prev_batch_end, cur_batch_end, bot_log_id = getBatchTimings( - connection, logging, interval + batch = getBatchTimings( + connection, logging, timedelta(minutes=interval) ) - state = State(connection, bot_log_id, prev_batch_end, cur_batch_end) + state = State(batch) while not state.stop: - process(state) + process(connection, state) if __name__ == "__main__": diff --git a/uptime_service_validation/coordinator/helper.py b/uptime_service_validation/coordinator/helper.py index d6555fb..4382cb0 100644 --- a/uptime_service_validation/coordinator/helper.py +++ b/uptime_service_validation/coordinator/helper.py @@ -1,21 +1,38 @@ -import psycopg2 -import psycopg2.extras as extras -import matplotlib.pyplot as plt +from dataclasses import dataclass from datetime import datetime, timedelta, timezone -import pandas as pd +import matplotlib.pyplot as plt import networkx as nx +import pandas as pd +import psycopg2 +from psycopg2 import extras import requests + ERROR = "Error: {0}" +@dataclass +class Batch: + """Represents the timeframe of the current batch and the database + identifier of the previous batch for reference.""" + start_time: datetime + end_time: datetime + bot_log_id: int + interval: timedelta + + def next(self, bot_log_id): + "Return an object representing the next batch." + return self.__class__( + start_time=self.end_time, + end_time=self.end_time + self.interval, + interval=self.interval, + bot_log_id =- bot_log_id + ) -def getTimeBatches(start_time: datetime, end_time: datetime, range_number: int): - diff = (end_time - start_time) / range_number - print(diff) - time_intervals = [] - for i in range(range_number): - time_intervals.append((start_time + diff * i, start_time + diff * (i + 1))) - return time_intervals + def split(self, parts_number): + "Splits the batch time window into equal parts for parallel proecessing." + diff = (self.end_time - self.start_time) / parts_number + return ((self.start_time + diff * i, self.start_time + diff * (i + 1)) \ + for i in range(parts_number)) def getBatchTimings(conn, logger, interval): @@ -29,13 +46,18 @@ def getBatchTimings(conn, logger, interval): prev_epoch = result[1] prev_batch_end = datetime.fromtimestamp(prev_epoch, timezone.utc) - cur_batch_end = prev_batch_end + timedelta(minutes=interval) + cur_batch_end = prev_batch_end + interval except (Exception, psycopg2.DatabaseError) as error: logger.error(ERROR.format(error)) - return -1 + raise RuntimeError("Could not load the latest batch.") finally: cursor.close() - return prev_batch_end, cur_batch_end, bot_log_id + return Batch( + start_time = prev_batch_end, + end_time = cur_batch_end, + bot_log_id = bot_log_id, + interval = interval + ) def getPreviousStatehash(conn, logger, bot_log_id): From 01d694336e0954c4bffadf24e86c369d8383c1ad Mon Sep 17 00:00:00 2001 From: Sventimir Date: Thu, 29 Feb 2024 09:39:06 +0100 Subject: [PATCH 06/10] Make test-environment log an info rather than warning. 
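The TEST_ENV switch that guards this log line is read through bool_env_var_set from coordinator.server. Its implementation is not part of this patch; a minimal sketch of such a boolean flag check, assuming common truthy spellings should count as set, could look like:

import os

def bool_env_var_set(name):
    """Treat "1", "true" and "yes" (case-insensitive) as an enabled flag."""
    return os.environ.get(name, "").strip().lower() in ("1", "true", "yes")

When the flag is set, the coordinator runs setUpValidatorProcesses locally instead of spawning Kubernetes pods via setUpValidatorPods, so an informational message fits the situation better than a warning.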
--- uptime_service_validation/coordinator/coordinator.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index c372964..1528a2c 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -137,7 +137,7 @@ def process(conn, state): worker_tag = os.environ["WORKER_TAG"] start = time() if bool_env_var_set("TEST_ENV"): - logging.warning("running in test environment") + logging.info("running in test environment") setUpValidatorProcesses(time_intervals, logging, worker_image, worker_tag) else: setUpValidatorPods(time_intervals, logging, worker_image, worker_tag) From 2164d94c4ce7bfc101e413e6662202dd4f9f5acb Mon Sep 17 00:00:00 2001 From: Sventimir Date: Thu, 29 Feb 2024 14:11:24 +0100 Subject: [PATCH 07/10] Extract database-interacting code into a separate class. Make the connection state internal to the class and expose high-level queries and update methods. This also allows us to substitute a mock for this object in the future. --- .../coordinator/coordinator.py | 58 +-- .../coordinator/helper.py | 463 +++++++++--------- 2 files changed, 259 insertions(+), 262 deletions(-) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index 1528a2c..c3f503a 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -13,22 +13,13 @@ import pandas as pd import psycopg2 from uptime_service_validation.coordinator.helper import ( - getBatchTimings, - getPreviousStatehash, - getRelationList, - getStatehashDF, + DB, + getRelations, findNewValuesToInsert, - createStatehash, - createNodeRecord, filterStateHashPercentage, createGraph, applyWeights, bfs, - createBotLog, - insertStatehashResults, - createPointRecord, - updateScoreboard, - getExistingNodes, sendSlackMessage ) from uptime_service_validation.coordinator.server import ( @@ -112,7 +103,7 @@ def __warn_if_work_took_longer_then_expected(self): ) -def process(conn, state): +def process(db, state): """Perform a signle iteration of the coordinator loop, processing exactly one batch of submissions. 
Launch verifiers to process submissions, then compute scores and store them in the database.""" @@ -121,8 +112,8 @@ def process(conn, state): state.batch.start_time, state.current_timestamp ) - existing_state_df = getStatehashDF(conn, logging) - existing_nodes = getExistingNodes(conn, logging) + existing_state_df = db.get_statehash_df() + existing_nodes = db.get_existing_nodes() logging.info( "running for batch: %s - %s.", state.batch.start_time, state.batch.end_time ) @@ -224,7 +215,7 @@ def process(conn, state): existing_state_df, pd.DataFrame(state_hash, columns=["statehash"]) ) if not state_hash_to_insert.empty: - createStatehash(conn, logging, state_hash_to_insert) + db.create_statehash(state_hash_to_insert) nodes_in_cur_batch = pd.DataFrame( master_df["submitter"].unique(), columns=["block_producer_key"] @@ -233,7 +224,7 @@ def process(conn, state): if not node_to_insert.empty: node_to_insert["updated_at"] = datetime.now(timezone.utc) - createNodeRecord(conn, logging, node_to_insert, 100) + db.create_node_record(node_to_insert, 100) master_df.rename( inplace=True, @@ -243,10 +234,8 @@ def process(conn, state): } ) - relation_df, p_selected_node_df = getPreviousStatehash( - conn, logging, state.batch.bot_log_id - ) - p_map = getRelationList(relation_df) + relation_df, p_selected_node_df = db.get_previous_statehash(state.batch.bot_log_id) + p_map = list(getRelations(relation_df)) c_selected_node = filterStateHashPercentage(master_df) batch_graph = createGraph(master_df, p_selected_node_df, c_selected_node, p_map) weighted_graph = applyWeights( @@ -282,9 +271,7 @@ def process(conn, state): parent_hash.append(p_hash) shortlisted_state_hash_df["parent_state_hash"] = parent_hash - p_map = getRelationList( - shortlisted_state_hash_df[["parent_state_hash", "state_hash"]] - ) + p_map = list(getRelations(shortlisted_state_hash_df[["parent_state_hash", "state_hash"]])) try: if not point_record_df.empty: file_timestamp = master_df.iloc[-1]["file_timestamps"] @@ -303,10 +290,10 @@ def process(conn, state): state.batch.end_time.timestamp(), end - start, ) - bot_log_id = createBotLog(conn, logging, values) + bot_log_id = db.create_bot_log(values) shortlisted_state_hash_df["bot_log_id"] = bot_log_id - insertStatehashResults(conn, logging, shortlisted_state_hash_df) + db.insert_statehash_results(shortlisted_state_hash_df) if not point_record_df.empty: point_record_df.loc[:, "amount"] = 1 @@ -326,30 +313,28 @@ def process(conn, state): ] ] - createPointRecord(conn, logging, point_record_df) + db.create_point_record(point_record_df) except Exception as error: - conn.rollback() + db.connection.rollback() logging.error("ERROR: %s", error) state.retry_batch() else: - conn.commit() + db.connection.commit() else: # new bot log id hasn't been created, so proceed with the old one bot_log_id = state.batch.bot_log_id logging.info("Finished processing data from table.") try: - updateScoreboard( - conn, - logging, + db.update_scoreboard( state.batch.end_time, int(os.environ["UPTIME_DAYS_FOR_SCORE"]), ) except Exception as error: - conn.rollback() + db.connection.rollback() logging.error("ERROR: %s", error) else: - conn.commit() + db.connection.commit() state.advance_to_next_batch(bot_log_id) @@ -373,12 +358,11 @@ def main(): # Step 1 Get previous record and build relations list interval = int(os.environ["SURVEY_INTERVAL_MINUTES"]) - batch = getBatchTimings( - connection, logging, timedelta(minutes=interval) - ) + db = DB(connection, logging) + batch = db.get_batch_timings(timedelta(minutes=interval)) state = 
State(batch) while not state.stop: - process(connection, state) + process(db, state) if __name__ == "__main__": diff --git a/uptime_service_validation/coordinator/helper.py b/uptime_service_validation/coordinator/helper.py index 4382cb0..e5f1a04 100644 --- a/uptime_service_validation/coordinator/helper.py +++ b/uptime_service_validation/coordinator/helper.py @@ -34,73 +34,246 @@ def split(self, parts_number): return ((self.start_time + diff * i, self.start_time + diff * (i + 1)) \ for i in range(parts_number)) - -def getBatchTimings(conn, logger, interval): - try: - cursor = conn.cursor() - cursor.execute( - "SELECT id, batch_end_epoch FROM bot_logs ORDER BY batch_end_epoch DESC limit 1 " +class DB: + """A wrapper around the database connection, providing high-level methods + for querying and updating the database.""" + + def __init__(self, connection, logger): + self.connection = connection + self.logger = logger + + def get_batch_timings(self, interval): + "Get the time-frame for the next batch and previous batch's id." + try: + cursor = self.connection.cursor() + cursor.execute( + "SELECT id, batch_end_epoch FROM bot_logs ORDER BY batch_end_epoch DESC limit 1 " + ) + result = cursor.fetchone() + bot_log_id = result[0] + prev_epoch = result[1] + + prev_batch_end = datetime.fromtimestamp(prev_epoch, timezone.utc) + cur_batch_end = prev_batch_end + interval + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + raise RuntimeError("Could not load the latest batch.") from error + finally: + cursor.close() + return Batch( + start_time = prev_batch_end, + end_time = cur_batch_end, + bot_log_id = bot_log_id, + interval = interval ) - result = cursor.fetchone() - bot_log_id = result[0] - prev_epoch = result[1] - - prev_batch_end = datetime.fromtimestamp(prev_epoch, timezone.utc) - cur_batch_end = prev_batch_end + interval - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - raise RuntimeError("Could not load the latest batch.") - finally: - cursor.close() - return Batch( - start_time = prev_batch_end, - end_time = cur_batch_end, - bot_log_id = bot_log_id, - interval = interval - ) + def get_previous_statehash(self, bot_log_id): + "Get the statehash of the latest batch." + cursor = self.connection.cursor() + try: + sql_query = """select ps.value parent_statehash, s1.value statehash, b.weight + from bot_logs_statehash b join statehash s1 ON s1.id = b.statehash_id + join statehash ps on b.parent_statehash_id = ps.id where bot_log_id =%s""" + cursor.execute(sql_query, (bot_log_id,)) + result = cursor.fetchall() + + df = pd.DataFrame(result, columns=["parent_state_hash", "state_hash", "weight"]) + previous_result_df = df[["parent_state_hash", "state_hash"]] + p_selected_node_df = df[["state_hash", "weight"]] + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + cursor.close() + return -1 + finally: + cursor.close() + return previous_result_df, p_selected_node_df + + def get_statehash_df(self): + "Get the list of all known statehashes as a data frame." 
+ cursor = self.connection.cursor() + try: + cursor.execute("select value from statehash") + result = cursor.fetchall() + state_hash = pd.DataFrame(result, columns=["statehash"]) + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + return -1 + finally: + cursor.close() + return state_hash + + def create_statehash(self, statehash_df, page_size=100): + "Add a new statehashto the database." + tuples = [tuple(x) for x in statehash_df.to_numpy()] + self.logger.info("create_statehash: %s", tuples) + query = """INSERT INTO statehash ( value) + VALUES ( %s) """ + cursor = self.connection.cursor() + try: + extras.execute_batch(cursor, query, tuples, page_size) + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + cursor.close() + return -1 + finally: + cursor.close() + self.logger.info("create_statehash end ") + return 0 + + def create_node_record(self, df, page_size=100): + "Add new block producers to the database." + self.logger.info("create_point_record start ") + tuples = [tuple(x) for x in df.to_numpy()] + query = """INSERT INTO nodes ( block_producer_key, updated_at) + VALUES ( %s, %s ) """ + cursor = self.connection.cursor() + try: + extras.execute_batch(cursor, query, tuples, page_size) + except (Exception, psycopg2.DatabaseError) as error: + logger.error(ERROR.format(error)) + cursor.close() + return 1 + finally: + cursor.close() + self.logger.info("create_point_record end ") + return 0 + + def create_bot_log(self, values): + "Add a new batch to the database." + self.logger.info("create_bot_log start ") + query = """INSERT INTO bot_logs(files_processed, file_timestamps, batch_start_epoch, batch_end_epoch, + processing_time) values ( %s, %s, %s, %s, %s) RETURNING id """ + try: + cursor = self.connection.cursor() + cursor.execute(query, values) + result = cursor.fetchone() + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + cursor.close() + return -1 + finally: + cursor.close() + self.logger.info("create_bot_log end ") + return result[0] + + + def insert_statehash_results(self, df, page_size=100): + "Relate statehashes to the batches they were observed in." + self.logger.info("create_botlogs_statehash start ") + temp_df = df[["parent_state_hash", "state_hash", "weight", "bot_log_id"]] + tuples = [tuple(x) for x in temp_df.to_numpy()] + query = """INSERT INTO bot_logs_statehash(parent_statehash_id, statehash_id, weight, bot_log_id ) + VALUES ( + (SELECT id FROM statehash WHERE value= %s), + (SELECT id FROM statehash WHERE value= %s), + %s, + %s ) """ + cursor = self.connection.cursor() + + try: + extras.execute_batch(cursor, query, tuples, page_size) + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + cursor.close() + return 1 + finally: + cursor.close() + self.logger.info("create_botlogs_statehash end ") + return 0 + + + def create_point_record(self, df, page_size=100): + "Add a new scoring submission to the database." 
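        # The INSERT below resolves node and statehash foreign keys with
        # sub-selects on block_producer_key and the statehash value, so the
        # incoming data frame only needs the human-readable identifiers.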
+ self.logger.info("create_point_record start ") + tuples = [tuple(x) for x in df.to_numpy()] + query = """INSERT INTO points + (file_name, file_timestamps, blockchain_epoch, node_id, + blockchain_height, amount, created_at, bot_log_id, statehash_id) + VALUES ( %s, %s, %s, (SELECT id FROM nodes WHERE block_producer_key= %s), + %s, %s, %s, %s, (SELECT id FROM statehash WHERE value= %s) )""" + try: + cursor = self.connection.cursor() + extras.execute_batch(cursor, query, tuples, page_size) + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + cursor.close() + return 1 + finally: + cursor.close() + self.logger.info("create_point_record end ") + return 0 + + def update_scoreboard(self, score_till_time, uptime_days=30): + "Update the block producer scores." + self.logger.info("updateScoreboard start ") + sql = """with vars (snapshot_date, start_date) as( values (%s AT TIME ZONE 'UTC', + (%s - interval '%s' day) AT TIME ZONE 'UTC') + ) + , epochs as( + select extract('epoch' from snapshot_date) as end_epoch, + extract('epoch' from start_date) as start_epoch from vars + ) + , b_logs as( + select (count(1) ) as surveys + from bot_logs b , epochs e + where b.batch_start_epoch >= start_epoch and b.batch_end_epoch <= end_epoch + ) + , scores as ( + select p.node_id, count(p.bot_log_id) bp_points + from points p join bot_logs b on p.bot_log_id =b.id, epochs + where b.batch_start_epoch >= start_epoch and b.batch_end_epoch <= end_epoch + group by 1 + ) + , final_scores as ( + select node_id, bp_points, + surveys, trunc( ((bp_points::decimal*100) / surveys),2) as score_perc + from scores l join nodes n on l.node_id=n.id, b_logs t + ) + update nodes nrt set score = s.bp_points, score_percent=s.score_perc + from final_scores s where nrt.id=s.node_id """ + + history_sql = """insert into score_history (node_id, score_at, score, score_percent) + SELECT id as node_id, %s, score, score_percent from nodes where score is not null """ + try: + cursor = self.connection.cursor() + cursor.execute( + sql, + ( + score_till_time, + score_till_time, + uptime_days, + ), + ) + cursor.execute(history_sql, (score_till_time,)) + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + cursor.close() + return -1 + finally: + cursor.close() + self.logger.info("updateScoreboard end ") + return 0 + + def get_existing_nodes(self): + "Get the list of all known block producers." 
+ cursor = self.connection.cursor() + try: + cursor.execute("select block_producer_key from nodes") + result = cursor.fetchall() + nodes = pd.DataFrame(result, columns=["block_producer_key"]) + except (Exception, psycopg2.DatabaseError) as error: + self.logger.error(ERROR.format(error)) + return -1 + finally: + cursor.close() + return nodes + + +def getRelations(df): + return ((parent, child) for child, parent \ + in df[["state_hash", "parent_state_hash"]].values \ + if parent in df["state_hash"].values) -def getPreviousStatehash(conn, logger, bot_log_id): - cursor = conn.cursor() - try: - sql_query = """select ps.value parent_statehash, s1.value statehash, b.weight - from bot_logs_statehash b join statehash s1 ON s1.id = b.statehash_id - join statehash ps on b.parent_statehash_id = ps.id where bot_log_id =%s""" - cursor.execute(sql_query, (bot_log_id,)) - result = cursor.fetchall() - - df = pd.DataFrame(result, columns=["parent_state_hash", "state_hash", "weight"]) - previous_result_df = df[["parent_state_hash", "state_hash"]] - p_selected_node_df = df[["state_hash", "weight"]] - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return -1 - finally: - cursor.close() - return previous_result_df, p_selected_node_df - - -def getRelationList(df): - relation_list = [] - for child, parent in df[["state_hash", "parent_state_hash"]].values: - if parent in df["state_hash"].values: - relation_list.append((parent, child)) - return relation_list - - -def getStatehashDF(conn, logger): - cursor = conn.cursor() - try: - cursor.execute("select value from statehash") - result = cursor.fetchall() - state_hash = pd.DataFrame(result, columns=["statehash"]) - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - return -1 - finally: - cursor.close() - return state_hash def findNewValuesToInsert(existing_values, new_values): @@ -112,43 +285,6 @@ def findNewValuesToInsert(existing_values, new_values): ) -def createStatehash(conn, logger, statehash_df, page_size=100): - tuples = [tuple(x) for x in statehash_df.to_numpy()] - logger.info("create_statehash: {0}".format(tuples)) - query = """INSERT INTO statehash ( value) - VALUES ( %s) """ - cursor = conn.cursor() - try: - cursor = conn.cursor() - extras.execute_batch(cursor, query, tuples, page_size) - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return -1 - finally: - cursor.close() - logger.info("create_statehash end ") - return 0 - - -def createNodeRecord(conn, logger, df, page_size=100): - logger.info("create_point_record start ") - tuples = [tuple(x) for x in df.to_numpy()] - query = """INSERT INTO nodes ( block_producer_key, updated_at) - VALUES ( %s, %s ) """ - cursor = conn.cursor() - try: - extras.execute_batch(cursor, query, tuples, page_size) - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return 1 - finally: - cursor.close() - logger.info("create_point_record end ") - return 0 - - def filterStateHashPercentage(df, p=0.34): state_hash_list = ( df["state_hash"].value_counts().sort_values(ascending=False).index.to_list() @@ -273,132 +409,9 @@ def bfs(graph, queue_list, node, max_depth=2): return shortlisted_state_hash_df -def createBotLog(conn, logger, values): - logger.info("create_bot_log start ") - query = """INSERT INTO bot_logs(files_processed, file_timestamps, batch_start_epoch, batch_end_epoch, - processing_time) values ( %s, %s, 
%s, %s, %s) RETURNING id """ - try: - cursor = conn.cursor() - cursor.execute(query, values) - result = cursor.fetchone() - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return -1 - finally: - cursor.close() - logger.info("create_bot_log end ") - return result[0] - - -def insertStatehashResults(conn, logger, df, page_size=100): - logger.info("create_botlogs_statehash start ") - temp_df = df[["parent_state_hash", "state_hash", "weight", "bot_log_id"]] - tuples = [tuple(x) for x in temp_df.to_numpy()] - query = """INSERT INTO bot_logs_statehash(parent_statehash_id, statehash_id, weight, bot_log_id ) - VALUES ( (SELECT id FROM statehash WHERE value= %s), (SELECT id FROM statehash WHERE value= %s), %s, %s ) """ - cursor = conn.cursor() - - try: - extras.execute_batch(cursor, query, tuples, page_size) - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return 1 - finally: - cursor.close() - logger.info("create_botlogs_statehash end ") - return 0 - - -def createPointRecord(conn, logger, df, page_size=100): - logger.info("create_point_record start ") - tuples = [tuple(x) for x in df.to_numpy()] - query = """INSERT INTO points (file_name, file_timestamps, blockchain_epoch, node_id, blockchain_height, - amount, created_at, bot_log_id, statehash_id) - VALUES ( %s, %s, %s, (SELECT id FROM nodes WHERE block_producer_key= %s), %s, %s, - %s, %s, (SELECT id FROM statehash WHERE value= %s) )""" - try: - cursor = conn.cursor() - extras.execute_batch(cursor, query, tuples, page_size) - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return 1 - finally: - cursor.close() - logger.info("create_point_record end ") - return 0 - - -def updateScoreboard(conn, logger, score_till_time, uptime_days=30): - logger.info("updateScoreboard start ") - sql = """with vars (snapshot_date, start_date) as( values (%s AT TIME ZONE 'UTC', - (%s - interval '%s' day) AT TIME ZONE 'UTC') - ) - , epochs as( - select extract('epoch' from snapshot_date) as end_epoch, - extract('epoch' from start_date) as start_epoch from vars - ) - , b_logs as( - select (count(1) ) as surveys - from bot_logs b , epochs e - where b.batch_start_epoch >= start_epoch and b.batch_end_epoch <= end_epoch - ) - , scores as ( - select p.node_id, count(p.bot_log_id) bp_points - from points p join bot_logs b on p.bot_log_id =b.id, epochs - where b.batch_start_epoch >= start_epoch and b.batch_end_epoch <= end_epoch - group by 1 - ) - , final_scores as ( - select node_id, bp_points, - surveys, trunc( ((bp_points::decimal*100) / surveys),2) as score_perc - from scores l join nodes n on l.node_id=n.id, b_logs t - ) - update nodes nrt set score = s.bp_points, score_percent=s.score_perc - from final_scores s where nrt.id=s.node_id """ - - history_sql = """insert into score_history (node_id, score_at, score, score_percent) - SELECT id as node_id, %s, score, score_percent from nodes where score is not null """ - try: - cursor = conn.cursor() - cursor.execute( - sql, - ( - score_till_time, - score_till_time, - uptime_days, - ), - ) - cursor.execute(history_sql, (score_till_time,)) - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return -1 - finally: - cursor.close() - logger.info("updateScoreboard end ") - return 0 - - -def getExistingNodes(conn, logger): - cursor = conn.cursor() - try: - cursor.execute("select block_producer_key from 
nodes") - result = cursor.fetchall() - nodes = pd.DataFrame(result, columns=["block_producer_key"]) - except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) - cursor.close() - return -1 - finally: - cursor.close() - return nodes - def sendSlackMessage(url, message, logger): payload='{"text": "%s" }' % message response = requests.post(url, data=payload) logger.info(response) - + From 8a23e90849b47321901835f699bda3d086fc0a52 Mon Sep 17 00:00:00 2001 From: Sventimir Date: Thu, 29 Feb 2024 14:13:48 +0100 Subject: [PATCH 08/10] Add a docstring for the helper module. --- uptime_service_validation/coordinator/helper.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/uptime_service_validation/coordinator/helper.py b/uptime_service_validation/coordinator/helper.py index e5f1a04..ee6f489 100644 --- a/uptime_service_validation/coordinator/helper.py +++ b/uptime_service_validation/coordinator/helper.py @@ -1,3 +1,5 @@ +"""This module contains various helper functions and classes for the +coordinator.""" from dataclasses import dataclass from datetime import datetime, timedelta, timezone import matplotlib.pyplot as plt @@ -414,4 +416,3 @@ def sendSlackMessage(url, message, logger): payload='{"text": "%s" }' % message response = requests.post(url, data=payload) logger.info(response) - From e047e60b5193ea6bcba5881167611fb266996121 Mon Sep 17 00:00:00 2001 From: Sventimir Date: Thu, 29 Feb 2024 16:44:11 +0100 Subject: [PATCH 09/10] Rename other functions in helper.py as well. --- tests/test_helper.py | 251 +++++++++++------- .../coordinator/coordinator.py | 61 +++-- .../coordinator/helper.py | 70 ++--- 3 files changed, 224 insertions(+), 158 deletions(-) diff --git a/tests/test_helper.py b/tests/test_helper.py index 9756260..697cb8e 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -2,9 +2,11 @@ from uptime_service_validation.coordinator.aws_keyspaces_client import Submission from uptime_service_validation.coordinator.helper import (getTimeBatches) import pandas as pd -from uptime_service_validation.coordinator.helper import (filterStateHashPercentage, createGraph, applyWeights, bfs) +from uptime_service_validation.coordinator.helper import ( + filter_state_hash_percentage, create_graph, apply_weights, bfs) import calendar + def test_get_time_batches(): a = datetime(2023, 11, 6, 15, 35, 47, 630499) b = a + timedelta(minutes=5) @@ -46,11 +48,12 @@ def test_get_time_batches(): ) assert result[9] == (datetime(2023, 11, 6, 15, 40, 17, 630499), b) + def test_array_dataframe(): submissions = [ Submission( "submitted_at_date_1", - "submitted_at_1" , + "submitted_at_1", "submitter_1", "created_at_1", "block_hash_1", @@ -59,14 +62,14 @@ def test_array_dataframe(): "snark_work_1", "graphql_control_port_1", "built_with_commit_sha_1", - "state_hash_1", + "state_hash_1", "parent_1", "height_1", "slot_1", "validation_error_1"), Submission( "submitted_at_date_2", - "submitted_at_2" , + "submitted_at_2", "submitter_2", "created_at_2", "block_hash_2", @@ -75,7 +78,7 @@ def test_array_dataframe(): "snark_work_2", "graphql_control_port_2", "built_with_commit_sha_2", - "state_hash_2", + "state_hash_2", "parent_2", "height_2", "slot_2", @@ -83,34 +86,51 @@ def test_array_dataframe(): ] state_hash_df = pd.DataFrame(submissions) - pd.testing.assert_frame_equal(state_hash_df[["submitted_at_date"]], pd.DataFrame(["submitted_at_date_1", "submitted_at_date_2"], columns=["submitted_at_date"])) - pd.testing.assert_frame_equal(state_hash_df[["submitted_at"]], 
pd.DataFrame(["submitted_at_1", "submitted_at_2"], columns=["submitted_at"])) - pd.testing.assert_frame_equal(state_hash_df[["submitter"]], pd.DataFrame(["submitter_1", "submitter_2"], columns=["submitter"])) - pd.testing.assert_frame_equal(state_hash_df[["created_at"]], pd.DataFrame(["created_at_1", "created_at_2"], columns=["created_at"])) - pd.testing.assert_frame_equal(state_hash_df[["block_hash"]], pd.DataFrame(["block_hash_1", "block_hash_2"], columns=["block_hash"])) - pd.testing.assert_frame_equal(state_hash_df[["remote_addr"]], pd.DataFrame(["remote_addr_1", "remote_addr_2"], columns=["remote_addr"])) - pd.testing.assert_frame_equal(state_hash_df[["peer_id"]], pd.DataFrame(["peer_id_1", "peer_id_2"], columns=["peer_id"])) - pd.testing.assert_frame_equal(state_hash_df[["snark_work"]], pd.DataFrame(["snark_work_1", "snark_work_2"], columns=["snark_work"])) - pd.testing.assert_frame_equal(state_hash_df[["graphql_control_port"]], pd.DataFrame(["graphql_control_port_1", "graphql_control_port_2"], columns=["graphql_control_port"])) - pd.testing.assert_frame_equal(state_hash_df[["built_with_commit_sha"]], pd.DataFrame(["built_with_commit_sha_1", "built_with_commit_sha_2"], columns=["built_with_commit_sha"])) - pd.testing.assert_frame_equal(state_hash_df[["state_hash"]], pd.DataFrame(["state_hash_1", "state_hash_2"], columns=["state_hash"])) - pd.testing.assert_frame_equal(state_hash_df[["parent"]], pd.DataFrame(["parent_1", "parent_2"], columns=["parent"])) - pd.testing.assert_frame_equal(state_hash_df[["height"]], pd.DataFrame(["height_1", "height_2"], columns=["height"])) - pd.testing.assert_frame_equal(state_hash_df[["slot"]], pd.DataFrame(["slot_1", "slot_2"], columns=["slot"])) - pd.testing.assert_frame_equal(state_hash_df[["validation_error"]], pd.DataFrame(["validation_error_1", "validation_error_2"], columns=["validation_error"])) + pd.testing.assert_frame_equal(state_hash_df[["submitted_at_date"]], pd.DataFrame( + ["submitted_at_date_1", "submitted_at_date_2"], columns=["submitted_at_date"])) + pd.testing.assert_frame_equal(state_hash_df[["submitted_at"]], pd.DataFrame( + ["submitted_at_1", "submitted_at_2"], columns=["submitted_at"])) + pd.testing.assert_frame_equal(state_hash_df[["submitter"]], pd.DataFrame( + ["submitter_1", "submitter_2"], columns=["submitter"])) + pd.testing.assert_frame_equal(state_hash_df[["created_at"]], pd.DataFrame( + ["created_at_1", "created_at_2"], columns=["created_at"])) + pd.testing.assert_frame_equal(state_hash_df[["block_hash"]], pd.DataFrame( + ["block_hash_1", "block_hash_2"], columns=["block_hash"])) + pd.testing.assert_frame_equal(state_hash_df[["remote_addr"]], pd.DataFrame( + ["remote_addr_1", "remote_addr_2"], columns=["remote_addr"])) + pd.testing.assert_frame_equal(state_hash_df[["peer_id"]], pd.DataFrame( + ["peer_id_1", "peer_id_2"], columns=["peer_id"])) + pd.testing.assert_frame_equal(state_hash_df[["snark_work"]], pd.DataFrame( + ["snark_work_1", "snark_work_2"], columns=["snark_work"])) + pd.testing.assert_frame_equal(state_hash_df[["graphql_control_port"]], pd.DataFrame( + ["graphql_control_port_1", "graphql_control_port_2"], columns=["graphql_control_port"])) + pd.testing.assert_frame_equal(state_hash_df[["built_with_commit_sha"]], pd.DataFrame( + ["built_with_commit_sha_1", "built_with_commit_sha_2"], columns=["built_with_commit_sha"])) + pd.testing.assert_frame_equal(state_hash_df[["state_hash"]], pd.DataFrame( + ["state_hash_1", "state_hash_2"], columns=["state_hash"])) + 
pd.testing.assert_frame_equal(state_hash_df[["parent"]], pd.DataFrame( + ["parent_1", "parent_2"], columns=["parent"])) + pd.testing.assert_frame_equal(state_hash_df[["height"]], pd.DataFrame( + ["height_1", "height_2"], columns=["height"])) + pd.testing.assert_frame_equal(state_hash_df[["slot"]], pd.DataFrame([ + "slot_1", "slot_2"], columns=["slot"])) + pd.testing.assert_frame_equal(state_hash_df[["validation_error"]], pd.DataFrame( + ["validation_error_1", "validation_error_2"], columns=["validation_error"])) + def test_filter_state_hash_single(): - master_state_hash = pd.DataFrame([['state_hash_1', 'block_producer_key_1']], - columns=['state_hash', 'block_producer_key']) - output = filterStateHashPercentage(master_state_hash) + master_state_hash = pd.DataFrame([['state_hash_1', 'block_producer_key_1']], + columns=['state_hash', 'block_producer_key']) + output = filter_state_hash_percentage(master_state_hash) assert output == ['state_hash_1'] + def test_filter_state_hash_multi(): - master_state_hash = pd.DataFrame([['state_hash_1', 'block_producer_key_1'], - ['state_hash_1', 'block_producer_key_2'], - ['state_hash_2', 'block_producer_key_3']], - columns=['state_hash', 'block_producer_key']) - output = filterStateHashPercentage(master_state_hash) + master_state_hash = pd.DataFrame([['state_hash_1', 'block_producer_key_1'], + ['state_hash_1', 'block_producer_key_2'], + ['state_hash_2', 'block_producer_key_3']], + columns=['state_hash', 'block_producer_key']) + output = filter_state_hash_percentage(master_state_hash) assert output == ['state_hash_1'] # The create_graph function creates a graph and adds all the state_hashes that appear in the batch as nodes, as well as the hashes from the previous batch. @@ -121,38 +141,43 @@ def test_filter_state_hash_multi(): # --c_selected_node: these are the hashes from the current batch above 34% threshold # --p_map: this lists the parent-child relationships in the previous batch. + def test_create_graph_count_number_of_nodes_and_edges(): # current batch that was downloaded - batch_df = pd.DataFrame([['state_hash_1', 'parent_state_hash_1'], ['state_hash_2', 'parent_state_hash_2']], columns=['state_hash', 'parent_state_hash']) + batch_df = pd.DataFrame([['state_hash_1', 'parent_state_hash_1'], [ + 'state_hash_2', 'parent_state_hash_2']], columns=['state_hash', 'parent_state_hash']) # previous_state_hashes with weight - p_selected_node_df = pd.DataFrame(['parent_state_hash_1'], columns=['state_hash']) - # filtered_state_hashes - c_selected_node = ['state_hash_1', 'state_hash_2'] + p_selected_node_df = pd.DataFrame( + ['parent_state_hash_1'], columns=['state_hash']) + # filtered_state_hashes + c_selected_node = ['state_hash_1', 'state_hash_2'] # relations between parents and children, i.e. those previous stte hashes that are parents in this batch. 
p_map = [['parent_state_hash_1', 'state_hash_1']] - output = createGraph(batch_df, p_selected_node_df, c_selected_node, p_map) + output = create_graph(batch_df, p_selected_node_df, c_selected_node, p_map) # total number of nodes is always those in the current batch + those from previous assert len(output.nodes) == len(batch_df) + len(p_selected_node_df) # there are no nodes in the current batch that are also parents of later nodes in the batch (see next test) assert len(output.edges) == len(p_map) + def test_create_graph_count_number_of_nodes_and_edges_nested(): # current batch that was downloaded batch_df = pd.DataFrame([ - ['state_hash_1', 'parent_state_hash_1'], - ['state_hash_2', 'state_hash_1'], - ['state_hash_3', 'state_hash_2']], + ['state_hash_1', 'parent_state_hash_1'], + ['state_hash_2', 'state_hash_1'], + ['state_hash_3', 'state_hash_2']], columns=['state_hash', 'parent_state_hash']) # previous_state_hashes with weight - p_selected_node_df = pd.DataFrame([['parent_state_hash_1'] ,['parent_state_hash_2']], columns=['state_hash']) - # filtered_state_hashes - c_selected_node = ['state_hash_1', 'state_hash_2'] + p_selected_node_df = pd.DataFrame( + [['parent_state_hash_1'], ['parent_state_hash_2']], columns=['state_hash']) + # filtered_state_hashes + c_selected_node = ['state_hash_1', 'state_hash_2'] # relations between parents and children, i.e. those previous stte hashes that are parents in this batch. p_map = [['parent_state_hash_2', 'parent_state_hash_1']] - output = createGraph(batch_df, p_selected_node_df, c_selected_node, p_map) + output = create_graph(batch_df, p_selected_node_df, c_selected_node, p_map) # total number of nodes is the same assert len(output.nodes) == len(batch_df) + len(p_selected_node_df) - # total number of edges is the parent-child relations in p_map, but plus also the parent-child relationships in the batch (i.e. 2) and between the two batches (i.e. 1). + # total number of edges is the parent-child relations in p_map, but plus also the parent-child relationships in the batch (i.e. 2) and between the two batches (i.e. 1). assert len(output.edges) == len(p_map) + 3 # The apply_weights function sets the weights to 0 for any node above the 34% threshold and if a parent_hash to the weight computed form last time. @@ -160,38 +185,46 @@ def test_create_graph_count_number_of_nodes_and_edges_nested(): # --batch_df: state-hashes of current batch. # --p_selected_node_df: these are all the (short-listed) state-hashes from the previous batch (as well as their weights). 
# --c_selected_node: these are the hashes from the current batch above 34% threshold + + def test_apply_weights_sum_weights_empty_parents_and_empty_selected_node(): - batch_df = pd.DataFrame([['state_hash_1', 'parent_state_hash_1'], - ['state_hash_2', 'state_hash_1'], - ['state_hash_3', 'state_hash_2']], - columns=['state_hash', 'parent_state_hash']) + batch_df = pd.DataFrame([['state_hash_1', 'parent_state_hash_1'], + ['state_hash_2', 'state_hash_1'], + ['state_hash_3', 'state_hash_2']], + columns=['state_hash', 'parent_state_hash']) p_selected_node_df = pd.DataFrame([['parent_state_hash_1', 123], - ['parent_state_hash_2', 345]], - columns=['state_hash', 'weight']) - c_selected_node = ['state_hash_1', 'state_hash_2'] + ['parent_state_hash_2', 345]], + columns=['state_hash', 'weight']) + c_selected_node = ['state_hash_1', 'state_hash_2'] p_map = [['parent_state_hash_2', 'parent_state_hash_1']] - batch_graph = createGraph(batch_df, p_selected_node_df, c_selected_node, p_map) + batch_graph = create_graph( + batch_df, p_selected_node_df, c_selected_node, p_map) # pass in empty short-lists and parent nodes to the weight function and ensure every node has infinite weighting. c_selected_node_empty = [] - p_selected_node_df_empty = pd.DataFrame([], columns=['state_hash', 'weight']) - weighted_graph = applyWeights(batch_graph, c_selected_node_empty, p_selected_node_df_empty) + p_selected_node_df_empty = pd.DataFrame( + [], columns=['state_hash', 'weight']) + weighted_graph = apply_weights( + batch_graph, c_selected_node_empty, p_selected_node_df_empty) assert len(list(weighted_graph.nodes)) == 5 for node in list(weighted_graph.nodes): assert weighted_graph.nodes[node]['weight'] == 9999 + def test_apply_weights_sum_weights_nested(): - batch_df = pd.DataFrame([['state_hash_1', 'parent_state_hash_1'], - ['state_hash_2', 'state_hash_1'], - ['state_hash_3', 'state_hash_2']], - columns=['state_hash', 'parent_state_hash']) - p_selected_node_df = pd.DataFrame([['parent_state_hash_1', 123], - ['parent_state_hash_2', 345]], - columns=['state_hash', 'weight']) - c_selected_node = ['state_hash_1', 'state_hash_2'] + batch_df = pd.DataFrame([['state_hash_1', 'parent_state_hash_1'], + ['state_hash_2', 'state_hash_1'], + ['state_hash_3', 'state_hash_2']], + columns=['state_hash', 'parent_state_hash']) + p_selected_node_df = pd.DataFrame([['parent_state_hash_1', 123], + ['parent_state_hash_2', 345]], + columns=['state_hash', 'weight']) + c_selected_node = ['state_hash_1', 'state_hash_2'] p_map = [['parent_state_hash_2', 'parent_state_hash_1']] - batch_graph = createGraph(batch_df, p_selected_node_df, c_selected_node, p_map) - weighted_graph = applyWeights(batch_graph, c_selected_node, p_selected_node_df) - assert len(list(weighted_graph.nodes))== 5 + batch_graph = create_graph( + batch_df, p_selected_node_df, c_selected_node, p_map) + weighted_graph = apply_weights( + batch_graph, c_selected_node, p_selected_node_df) + assert len(list(weighted_graph.nodes)) == 5 for node in list(weighted_graph.nodes): if node == 'state_hash_1': assert weighted_graph.nodes[node]['weight'] == 0 @@ -200,9 +233,9 @@ def test_apply_weights_sum_weights_nested(): if node == 'state_hash_3': assert weighted_graph.nodes[node]['weight'] == 9999 if node == 'parent_state_hash_1': - assert weighted_graph.nodes[node]['weight']== 123 + assert weighted_graph.nodes[node]['weight'] == 123 if node == 'parent_state_hash_2': - assert weighted_graph.nodes[node]['weight']== 345 + assert weighted_graph.nodes[node]['weight'] == 345 # The bfs is what 
computes the weight for nodes that aren't previous hashes or above the 34% threshold (which automatically have weight 0). # The bfs output actually includes the parent-hashes, as well, and all those hashes from the current batch with computed weight <= 2. @@ -210,63 +243,79 @@ def test_apply_weights_sum_weights_nested(): # --graph: weighted graph computed from create_graph and apply_weights function # --queue_list: these are the parent_hashes and the theshold hashes from the current batch. # --node: first element of the queue + + def test_bfs_easy(): batch_df = pd.DataFrame([['state_hash_1', 'parent_state_hash_1'], - ['state_hash_2', 'state_hash_1'], - ['state_hash_3', 'state_hash_2']], + ['state_hash_2', 'state_hash_1'], + ['state_hash_3', 'state_hash_2']], columns=['state_hash', 'parent_state_hash']) p_selected_node_df = pd.DataFrame([['parent_state_hash_1', 123], - ['parent_state_hash_2', 345]], - columns=['state_hash', 'weight']) + ['parent_state_hash_2', 345]], + columns=['state_hash', 'weight']) # empty short-list - c_selected_node = ['state_hash_1', 'state_hash_2'] + c_selected_node = ['state_hash_1', 'state_hash_2'] p_map = [['parent_state_hash_2', 'parent_state_hash_1']] - batch_graph = createGraph(batch_df, p_selected_node_df, c_selected_node, p_map) - weighted_graph = applyWeights(batch_graph, c_selected_node, p_selected_node_df) - queue_list = list(p_selected_node_df['state_hash'].values) + c_selected_node - shortlist = bfs(graph=weighted_graph, queue_list=queue_list, node=queue_list[0]) - - expected = pd.DataFrame([['state_hash_1', 0], ['state_hash_2', 0], ['state_hash_3', 1]], columns=['state_hash','weight']) + batch_graph = create_graph( + batch_df, p_selected_node_df, c_selected_node, p_map) + weighted_graph = apply_weights( + batch_graph, c_selected_node, p_selected_node_df) + queue_list = list( + p_selected_node_df['state_hash'].values) + c_selected_node + shortlist = bfs(graph=weighted_graph, + queue_list=queue_list, node=queue_list[0]) + + expected = pd.DataFrame([['state_hash_1', 0], ['state_hash_2', 0], [ + 'state_hash_3', 1]], columns=['state_hash', 'weight']) pd.testing.assert_frame_equal(shortlist, expected) + def test_bfs_hard(): batch_df = pd.DataFrame([ - ['state_hash_11', 'parent_state_hash_1'], - ['state_hash_12', 'state_hash_11'], - ['state_hash_13', 'state_hash_12'], + ['state_hash_11', 'parent_state_hash_1'], + ['state_hash_12', 'state_hash_11'], + ['state_hash_13', 'state_hash_12'], ['state_hash_14', 'state_hash_13'], ['state_hash_15', 'state_hash_14'], ['state_hash_16', 'state_hash_15'], - ['state_hash_21', 'parent_state_hash_2'], - ['state_hash_22', 'state_hash_21'], - ['state_hash_23', 'state_hash_22'], + ['state_hash_21', 'parent_state_hash_2'], + ['state_hash_22', 'state_hash_21'], + ['state_hash_23', 'state_hash_22'], ['state_hash_24', 'state_hash_23'], ['state_hash_25', 'state_hash_24'], ['state_hash_26', 'state_hash_25'] - ], + ], columns=['state_hash', 'parent_state_hash']) - p_selected_node_df = pd.DataFrame([['parent_state_hash_1', 1], - ['parent_state_hash_2', 1]], - columns=['state_hash', 'weight']) - c_selected_node = ['state_hash_11', 'state_hash_21'] + p_selected_node_df = pd.DataFrame([['parent_state_hash_1', 1], + ['parent_state_hash_2', 1]], + columns=['state_hash', 'weight']) + c_selected_node = ['state_hash_11', 'state_hash_21'] p_map = [['parent_state_hash_2', 'parent_state_hash_1']] - batch_graph = createGraph(batch_df, p_selected_node_df, c_selected_node, p_map) - weighted_graph = applyWeights(batch_graph, c_selected_node, 
p_selected_node_df) - queue_list = list(p_selected_node_df['state_hash'].values) + c_selected_node + batch_graph = create_graph( + batch_df, p_selected_node_df, c_selected_node, p_map) + weighted_graph = apply_weights( + batch_graph, c_selected_node, p_selected_node_df) + queue_list = list( + p_selected_node_df['state_hash'].values) + c_selected_node shortlist = bfs(weighted_graph, queue_list, queue_list[0]) expected = pd.DataFrame([['parent_state_hash_1', 1], - ['parent_state_hash_2', 1], - ['state_hash_11', 0], - ['state_hash_21', 0], - ['state_hash_12', 1], - ['state_hash_22', 1], - ['state_hash_13', 2], - ['state_hash_23', 2]], - columns=['state_hash','weight']) + ['parent_state_hash_2', 1], + ['state_hash_11', 0], + ['state_hash_21', 0], + ['state_hash_12', 1], + ['state_hash_22', 1], + ['state_hash_13', 2], + ['state_hash_23', 2]], + columns=['state_hash', 'weight']) assert set(shortlist['state_hash']) == set(expected['state_hash']) + def test_blockchain_epoch(): - state_hash_df = pd.DataFrame(['2021-12-21T10:15:30Z', '2021-12-31T10:15:30Z'], columns=['created_at']) - state_hash_df['blockchain_epoch'] = state_hash_df['created_at'].apply(lambda row: int(calendar.timegm(datetime.strptime(row, "%Y-%m-%dT%H:%M:%SZ").timetuple()) * 1000)) - expected = pd.DataFrame([1640081730000, 1640945730000], columns=['blockchain_epoch']) - pd.testing.assert_frame_equal(state_hash_df[['blockchain_epoch']], expected) \ No newline at end of file + state_hash_df = pd.DataFrame( + ['2021-12-21T10:15:30Z', '2021-12-31T10:15:30Z'], columns=['created_at']) + state_hash_df['blockchain_epoch'] = state_hash_df['created_at'].apply(lambda row: int( + calendar.timegm(datetime.strptime(row, "%Y-%m-%dT%H:%M:%SZ").timetuple()) * 1000)) + expected = pd.DataFrame( + [1640081730000, 1640945730000], columns=['blockchain_epoch']) + pd.testing.assert_frame_equal( + state_hash_df[['blockchain_epoch']], expected) diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py index c3f503a..d2ee866 100644 --- a/uptime_service_validation/coordinator/coordinator.py +++ b/uptime_service_validation/coordinator/coordinator.py @@ -14,13 +14,13 @@ import psycopg2 from uptime_service_validation.coordinator.helper import ( DB, - getRelations, - findNewValuesToInsert, - filterStateHashPercentage, - createGraph, - applyWeights, + get_relations, + find_new_values_to_insert, + filter_state_hash_percentage, + create_graph, + apply_weights, bfs, - sendSlackMessage + send_slack_message ) from uptime_service_validation.coordinator.server import ( bool_env_var_set, @@ -32,7 +32,8 @@ ) # Add project root to python path -project_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +project_root = os.path.abspath(os.path.join( + os.path.dirname(__file__), "..", "..")) sys.path.insert(0, project_root) @@ -54,7 +55,8 @@ def wait_until_batch_ends(self): "If the time window if the current batch is not yet over, sleep until it is." if self.batch.end_time > self.current_timestamp: delta = timedelta(minutes=2) - sleep_interval = (self.batch.end_time - self.current_timestamp) + delta + sleep_interval = (self.batch.end_time - + self.current_timestamp) + delta time_until = self.current_timestamp + sleep_interval logging.info( "All submissions are processed till date. 
" @@ -122,14 +124,16 @@ def process(db, state): state.wait_until_batch_ends() master_df = pd.DataFrame() # Step 2 Create time ranges: - time_intervals = list(state.batch.split(int(os.environ["MINI_BATCH_NUMBER"]))) + time_intervals = list(state.batch.split( + int(os.environ["MINI_BATCH_NUMBER"]))) # Step 3 Create Kubernetes ZKValidators and pass mini-batches. worker_image = os.environ["WORKER_IMAGE"] worker_tag = os.environ["WORKER_TAG"] start = time() if bool_env_var_set("TEST_ENV"): logging.info("running in test environment") - setUpValidatorProcesses(time_intervals, logging, worker_image, worker_tag) + setUpValidatorProcesses(time_intervals, logging, + worker_image, worker_tag) else: setUpValidatorPods(time_intervals, logging, worker_image, worker_tag) end = time() @@ -143,13 +147,13 @@ def process(db, state): webhook_url = os.environ.get("WEBHOOK_URL") if webhook_url is not None: if end - start < float(os.environ["ALARM_ZK_LOWER_LIMIT_SEC"]): - sendSlackMessage( + send_slack_message( webhook_url, f"ZkApp Validation took {end - start} seconds, which is too quick", logging, ) if end - start > float(os.environ["ALARM_ZK_UPPER_LIMIT_SEC"]): - sendSlackMessage( + send_slack_message( webhook_url, f"ZkApp Validation took {end - start} seconds, which is too long", logging, @@ -211,7 +215,7 @@ def process(db, state): state_hash = pd.unique( master_df[["state_hash", "parent_state_hash"]].values.ravel("k") ) - state_hash_to_insert = findNewValuesToInsert( + state_hash_to_insert = find_new_values_to_insert( existing_state_df, pd.DataFrame(state_hash, columns=["statehash"]) ) if not state_hash_to_insert.empty: @@ -220,7 +224,8 @@ def process(db, state): nodes_in_cur_batch = pd.DataFrame( master_df["submitter"].unique(), columns=["block_producer_key"] ) - node_to_insert = findNewValuesToInsert(existing_nodes, nodes_in_cur_batch) + node_to_insert = find_new_values_to_insert( + existing_nodes, nodes_in_cur_batch) if not node_to_insert.empty: node_to_insert["updated_at"] = datetime.now(timezone.utc) @@ -234,17 +239,20 @@ def process(db, state): } ) - relation_df, p_selected_node_df = db.get_previous_statehash(state.batch.bot_log_id) - p_map = list(getRelations(relation_df)) - c_selected_node = filterStateHashPercentage(master_df) - batch_graph = createGraph(master_df, p_selected_node_df, c_selected_node, p_map) - weighted_graph = applyWeights( + relation_df, p_selected_node_df = db.get_previous_statehash( + state.batch.bot_log_id) + p_map = list(get_relations(relation_df)) + c_selected_node = filter_state_hash_percentage(master_df) + batch_graph = create_graph( + master_df, p_selected_node_df, c_selected_node, p_map) + weighted_graph = apply_weights( batch_graph=batch_graph, c_selected_node=c_selected_node, p_selected_node=p_selected_node_df, ) - queue_list = list(p_selected_node_df["state_hash"].values) + c_selected_node + queue_list = list( + p_selected_node_df["state_hash"].values) + c_selected_node batch_state_hash = list(master_df["state_hash"].unique()) @@ -256,7 +264,8 @@ def process(db, state): # but it's not used anywhere inside the function) ) point_record_df = master_df[ - master_df["state_hash"].isin(shortlisted_state_hash_df["state_hash"].values) + master_df["state_hash"].isin( + shortlisted_state_hash_df["state_hash"].values) ] for index, row in shortlisted_state_hash_df.iterrows(): @@ -271,7 +280,8 @@ def process(db, state): parent_hash.append(p_hash) shortlisted_state_hash_df["parent_state_hash"] = parent_hash - p_map = list(getRelations(shortlisted_state_hash_df[["parent_state_hash", 
"state_hash"]])) + p_map = list(get_relations( + shortlisted_state_hash_df[["parent_state_hash", "state_hash"]])) try: if not point_record_df.empty: file_timestamp = master_df.iloc[-1]["file_timestamps"] @@ -279,8 +289,8 @@ def process(db, state): file_timestamp = state.batch.end_time logging.info( "empty point record for start epoch %s end epoch %s", - state.batch.start_time.timestamp(), - state.batch.end_time.timestamp(), + state.batch.start_time.timestamp(), + state.batch.end_time.timestamp(), ) values = ( @@ -297,7 +307,8 @@ def process(db, state): if not point_record_df.empty: point_record_df.loc[:, "amount"] = 1 - point_record_df.loc[:, "created_at"] = datetime.now(timezone.utc) + point_record_df.loc[:, "created_at"] = datetime.now( + timezone.utc) point_record_df.loc[:, "bot_log_id"] = bot_log_id point_record_df = point_record_df[ [ diff --git a/uptime_service_validation/coordinator/helper.py b/uptime_service_validation/coordinator/helper.py index ee6f489..5d54585 100644 --- a/uptime_service_validation/coordinator/helper.py +++ b/uptime_service_validation/coordinator/helper.py @@ -12,6 +12,7 @@ ERROR = "Error: {0}" + @dataclass class Batch: """Represents the timeframe of the current batch and the database @@ -27,15 +28,16 @@ def next(self, bot_log_id): start_time=self.end_time, end_time=self.end_time + self.interval, interval=self.interval, - bot_log_id =- bot_log_id + bot_log_id=- bot_log_id ) def split(self, parts_number): "Splits the batch time window into equal parts for parallel proecessing." diff = (self.end_time - self.start_time) / parts_number - return ((self.start_time + diff * i, self.start_time + diff * (i + 1)) \ + return ((self.start_time + diff * i, self.start_time + diff * (i + 1)) for i in range(parts_number)) + class DB: """A wrapper around the database connection, providing high-level methods for querying and updating the database.""" @@ -63,10 +65,10 @@ def get_batch_timings(self, interval): finally: cursor.close() return Batch( - start_time = prev_batch_end, - end_time = cur_batch_end, - bot_log_id = bot_log_id, - interval = interval + start_time=prev_batch_end, + end_time=cur_batch_end, + bot_log_id=bot_log_id, + interval=interval ) def get_previous_statehash(self, bot_log_id): @@ -79,7 +81,8 @@ def get_previous_statehash(self, bot_log_id): cursor.execute(sql_query, (bot_log_id,)) result = cursor.fetchall() - df = pd.DataFrame(result, columns=["parent_state_hash", "state_hash", "weight"]) + df = pd.DataFrame( + result, columns=["parent_state_hash", "state_hash", "weight"]) previous_result_df = df[["parent_state_hash", "state_hash"]] p_selected_node_df = df[["state_hash", "weight"]] except (Exception, psycopg2.DatabaseError) as error: @@ -132,7 +135,7 @@ def create_node_record(self, df, page_size=100): try: extras.execute_batch(cursor, query, tuples, page_size) except (Exception, psycopg2.DatabaseError) as error: - logger.error(ERROR.format(error)) + self.logger.error(ERROR.format(error)) cursor.close() return 1 finally: @@ -158,11 +161,11 @@ def create_bot_log(self, values): self.logger.info("create_bot_log end ") return result[0] - def insert_statehash_results(self, df, page_size=100): "Relate statehashes to the batches they were observed in." 
         self.logger.info("create_botlogs_statehash start ")
-        temp_df = df[["parent_state_hash", "state_hash", "weight", "bot_log_id"]]
+        temp_df = df[["parent_state_hash",
+                      "state_hash", "weight", "bot_log_id"]]
         tuples = [tuple(x) for x in temp_df.to_numpy()]
         query = """INSERT INTO bot_logs_statehash(parent_statehash_id,
                 statehash_id, weight, bot_log_id ) VALUES (
@@ -183,7 +186,6 @@ def insert_statehash_results(self, df, page_size=100):
         self.logger.info("create_botlogs_statehash end ")
         return 0

-
     def create_point_record(self, df, page_size=100):
         "Add a new scoring submission to the database."
         self.logger.info("create_point_record start ")
@@ -271,14 +273,15 @@ def get_existing_nodes(self):
         return nodes


-def getRelations(df):
-    return ((parent, child) for child, parent \
-            in df[["state_hash", "parent_state_hash"]].values \
+def get_relations(df):
+    "Extract parent-child relations between statehashes in a dataframe."
+    return ((parent, child) for child, parent
+            in df[["state_hash", "parent_state_hash"]].values
             if parent in df["state_hash"].values)


-def findNewValuesToInsert(existing_values, new_values):
+def find_new_values_to_insert(existing_values, new_values):
+    "Find the new values to insert into the database."
     return (
         existing_values.merge(new_values, how="outer", indicator=True)
         .loc[lambda x: x["_merge"] == "right_only"]
@@ -287,9 +290,11 @@ def findNewValuesToInsert(existing_values, new_values):
     )


-def filterStateHashPercentage(df, p=0.34):
+def filter_state_hash_percentage(df, p=0.34):
+    "Filter statehashes by percentage of block producers who submitted them."
     state_hash_list = (
-        df["state_hash"].value_counts().sort_values(ascending=False).index.to_list()
+        df["state_hash"].value_counts().sort_values(
+            ascending=False).index.to_list()
     )
     # get 34% number of blk in given batch
     total_unique_blk = df["block_producer_key"].nunique()
@@ -303,7 +308,9 @@ def filterStateHashPercentage(df, p=0.34):
     return good_state_hash_list


-def createGraph(batch_df, p_selected_node_df, c_selected_node, p_map):
+def create_graph(batch_df, p_selected_node_df, c_selected_node, p_map):
+    """Create a directed graph of parent-child relations between blocks
+    in the batch dataframe."""
     batch_graph = nx.DiGraph()
     parent_hash_list = batch_df["parent_state_hash"].unique()
     state_hash_list = set(
@@ -313,12 +320,6 @@ def createGraph(batch_df, p_selected_node_df, c_selected_node, p_map):
     selected_parent = [
         parent for parent in parent_hash_list if parent in state_hash_list
     ]
-    """ t1=[w[42:] for w in list(p_selected_node_df['state_hash'].values)]
-    t2=[w[42:] for w in c_selected_node]
-    t3=[w[42:] for w in state_hash_list]
-    batch_graph.add_nodes_from(t1)
-    batch_graph.add_nodes_from( t2)
-    batch_graph.add_nodes_from(t3) """

     batch_graph.add_nodes_from(list(p_selected_node_df["state_hash"].values))
     batch_graph.add_nodes_from(c_selected_node)
@@ -337,7 +338,8 @@ def createGraph(batch_df, p_selected_node_df, c_selected_node, p_map):
     return batch_graph


-def applyWeights(batch_graph, c_selected_node, p_selected_node):
+def apply_weights(batch_graph, c_selected_node, p_selected_node):
+    "Apply weights to the statehashes in the graph."
     for node in list(batch_graph.nodes()):
         if node in c_selected_node:
             batch_graph.nodes[node]["weight"] = 0
@@ -351,7 +353,8 @@ def applyWeights(batch_graph, c_selected_node, p_selected_node):
     return batch_graph


-def plotGraph(batch_graph, g_pos, title):
+def plot_graph(batch_graph, g_pos, title):
+    "Plot the graph of parent-child relations between state hashes."
# plot the graph plt.figure(figsize=(8, 8)) plt.title(title) @@ -375,7 +378,8 @@ def plotGraph(batch_graph, g_pos, title): return g_pos -def getMinimumWeight(graph, child_node): +def get_minimum_weight(graph, child_node): + "Find the statehash with the minimum weight." child_node_weight = graph.nodes[child_node]["weight"] for parent in list(graph.predecessors(child_node)): lower = min(graph.nodes[parent]["weight"] + 1, child_node_weight) @@ -384,6 +388,7 @@ def getMinimumWeight(graph, child_node): def bfs(graph, queue_list, node, max_depth=2): + "Breadth-first search through the graph." visited = list() visited.append(node) cnt = 2 @@ -391,7 +396,8 @@ def bfs(graph, queue_list, node, max_depth=2): m = queue_list.pop(0) for neighbour in list(graph.neighbors(m)): if neighbour not in visited: - graph.nodes[neighbour]["weight"] = getMinimumWeight(graph, neighbour) + graph.nodes[neighbour]["weight"] = get_minimum_weight( + graph, neighbour) visited.append(neighbour) # if not neighbour in visited: queue_list.append(neighbour) @@ -411,8 +417,8 @@ def bfs(graph, queue_list, node, max_depth=2): return shortlisted_state_hash_df - -def sendSlackMessage(url, message, logger): - payload='{"text": "%s" }' % message +def send_slack_message(url, message, logger): + "Send a slack message to the specified URL." + payload = '{"text": "%s" }' % message response = requests.post(url, data=payload) logger.info(response) From 25f54ba7711db3f2646163b978854085e284023e Mon Sep 17 00:00:00 2001 From: Sventimir Date: Thu, 29 Feb 2024 17:07:25 +0100 Subject: [PATCH 10/10] Fix broken test. --- tests/test_helper.py | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/tests/test_helper.py b/tests/test_helper.py index 697cb8e..e746693 100644 --- a/tests/test_helper.py +++ b/tests/test_helper.py @@ -1,19 +1,24 @@ from datetime import datetime, timedelta from uptime_service_validation.coordinator.aws_keyspaces_client import Submission -from uptime_service_validation.coordinator.helper import (getTimeBatches) import pandas as pd from uptime_service_validation.coordinator.helper import ( - filter_state_hash_percentage, create_graph, apply_weights, bfs) + Batch, filter_state_hash_percentage, create_graph, apply_weights, bfs) import calendar def test_get_time_batches(): - a = datetime(2023, 11, 6, 15, 35, 47, 630499) - b = a + timedelta(minutes=5) - result = getTimeBatches(a, b, 10) + start_time = datetime(2023, 11, 6, 15, 35, 47, 630499) + interval = timedelta(minutes=5) + batch = Batch( + start_time = start_time, + end_time = start_time + interval, + interval = interval, + bot_log_id = 1 + ) + result = list(batch.split(10)) assert len(result) == 10 - assert result[0] == (a, datetime(2023, 11, 6, 15, 36, 17, 630499)) + assert result[0] == (batch.start_time, datetime(2023, 11, 6, 15, 36, 17, 630499)) assert result[1] == ( datetime(2023, 11, 6, 15, 36, 17, 630499), datetime(2023, 11, 6, 15, 36, 47, 630499), @@ -46,7 +51,7 @@ def test_get_time_batches(): datetime(2023, 11, 6, 15, 39, 47, 630499), datetime(2023, 11, 6, 15, 40, 17, 630499), ) - assert result[9] == (datetime(2023, 11, 6, 15, 40, 17, 630499), b) + assert result[9] == (datetime(2023, 11, 6, 15, 40, 17, 630499), batch.end_time) def test_array_dataframe():
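
For readers following the Batch refactor, a minimal usage sketch of Batch.split as the rewritten test exercises it. This is not part of any commit in the series; the timestamps and the part count are illustrative only (in the coordinator the count comes from the MINI_BATCH_NUMBER environment variable).

from datetime import datetime, timedelta

from uptime_service_validation.coordinator.helper import Batch

interval = timedelta(minutes=10)
start = datetime(2024, 1, 1, 12, 0, 0)
batch = Batch(start_time=start, end_time=start + interval,
              interval=interval, bot_log_id=1)

# Each element is one (start, end) mini-batch window handed to the validators;
# here: five consecutive two-minute windows covering the whole batch.
for window_start, window_end in batch.split(5):
    print(window_start, "->", window_end)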
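
Similarly, a sketch of how the renamed scoring helpers compose, mirroring the unit tests above. The toy hashes are illustrative, and the 34% short-list is hard-coded here instead of being computed with filter_state_hash_percentage.

import pandas as pd

from uptime_service_validation.coordinator.helper import (
    create_graph, apply_weights, bfs)

# Current batch: three blocks chained onto a block from the previous batch.
batch_df = pd.DataFrame(
    [['state_hash_1', 'parent_state_hash_1'],
     ['state_hash_2', 'state_hash_1'],
     ['state_hash_3', 'state_hash_2']],
    columns=['state_hash', 'parent_state_hash'])
# Short-listed hashes from the previous batch together with their weights.
p_selected_node_df = pd.DataFrame(
    [['parent_state_hash_1', 123], ['parent_state_hash_2', 345]],
    columns=['state_hash', 'weight'])
# Hashes in this batch reported by at least 34% of block producers.
c_selected_node = ['state_hash_1', 'state_hash_2']
# Parent/child pairs among the previous batch's hashes.
p_map = [['parent_state_hash_2', 'parent_state_hash_1']]

graph = create_graph(batch_df, p_selected_node_df, c_selected_node, p_map)
weighted = apply_weights(graph, c_selected_node, p_selected_node_df)
queue_list = list(p_selected_node_df['state_hash'].values) + c_selected_node
# Hashes that come out of the search with weight <= 2 earn points; as in
# test_bfs_easy, that is state_hash_1 and state_hash_2 (weight 0) and
# state_hash_3 (weight 1).
shortlist = bfs(weighted, queue_list, queue_list[0])
print(shortlist)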