diff --git a/uptime_service_validation/coordinator/coordinator.py b/uptime_service_validation/coordinator/coordinator.py
index 8a5096f..603bbfd 100644
--- a/uptime_service_validation/coordinator/coordinator.py
+++ b/uptime_service_validation/coordinator/coordinator.py
@@ -105,7 +105,48 @@ def __warn_if_work_took_longer_then_expected(self):
             self.current_timestamp,
         )
 
+def load_submissions(batch):
+    """Load submissions from Cassandra and return them as a DataFrame."""
+    submissions = []
+    submissions_verified = []
+    cassandra = AWSKeyspacesClient()
+    try:
+        cassandra.connect()
+        submissions = cassandra.get_submissions(
+            submitted_at_start=batch.start_time,
+            submitted_at_end=batch.end_time,
+            start_inclusive=True,
+            end_inclusive=False,
+        )
+        # for further processing
+        # we use only submissions verified = True and validation_error = None
+        for submission in submissions:
+            if submission.verified and submission.validation_error is None:
+                submissions_verified.append(submission)
+    except Exception as e:
+        logging.error("Error in loading submissions: %s", e)
+        return pd.DataFrame([])
+    finally:
+        cassandra.close()
+
+    all_submissions_count = len(submissions)
+    submissions_to_process_count = len(submissions_verified)
+    logging.info("number of all submissions: %s", all_submissions_count)
+    logging.info(
+        "number of submissions to process: %s",
+        submissions_to_process_count
+    )
+    if submissions_to_process_count < all_submissions_count:
+        logging.warning(
+            "some submissions were not processed, because they were not \
+            verified or had validation errors"
+        )
+    return pd.DataFrame(
+        [asdict(submission) for submission in submissions_verified]
+    )
+
 def process_statehash_df(db, batch, state_hash_df, verification_time):
+    """Process the state hash dataframe and return the master dataframe."""
     all_files_count = state_hash_df.shape[0]
     master_df = pd.DataFrame()
     master_df["state_hash"] = state_hash_df["state_hash"]
@@ -254,11 +295,9 @@ def process(db, state):
     # sleep until batch ends, update the state accordingly, then continue.
     state.wait_until_batch_ends()
 
-    # Step 2 Create time ranges:
     time_intervals = list(state.batch.split(
         int(os.environ["MINI_BATCH_NUMBER"])))
 
-    # Step 3 Create Kubernetes ZKValidators and pass mini-batches.
     worker_image = os.environ["WORKER_IMAGE"]
     worker_tag = os.environ["WORKER_TAG"]
     timer = Timer()
@@ -270,7 +309,6 @@ def process(db, state):
     with timer.measure():
         setUpValidatorPods(time_intervals, logging, worker_image, worker_tag)
 
-    # Step 4 We need to read the ZKValidator results from a db.
     logging.info(
         "reading ZKValidator results from a db between the time range: %s - %s",
         state.batch.start_time,
@@ -293,54 +331,18 @@ def process(db, state):
         logging,
     )
 
-    submissions = []
-    submissions_verified = []
-    cassandra = AWSKeyspacesClient()
-    try:
-        cassandra.connect()
-        submissions = cassandra.get_submissions(
-            submitted_at_start=state.batch.start_time,
-            submitted_at_end=state.batch.end_time,
-            start_inclusive=True,
-            end_inclusive=False,
-        )
-        # for further processing
-        # we use only submissions verified = True and validation_error = None
-        for submission in submissions:
-            if submission.verified and submission.validation_error is None:
-                submissions_verified.append(submission)
-    finally:
-        cassandra.close()
-
-    all_submissions_count = len(submissions)
-    submissions_to_process_count = len(submissions_verified)
-    logging.info("number of all submissions: %s", all_submissions_count)
-    logging.info(
-        "number of submissions to process: %s",
-        submissions_to_process_count
-    )
-    if submissions_to_process_count < all_submissions_count:
-        logging.warning(
-            "some submissions were not processed, because they were not \
-            verified or had validation errors"
-        )
-
-    # Step 5 checks for forks and writes to the db.
-    state_hash_df = pd.DataFrame(
-        [asdict(submission) for submission in submissions_verified]
-    )
+    state_hash_df = load_submissions(state.batch)
     if not state_hash_df.empty:
         try:
             bot_log_id = process_statehash_df(
                 db, state.batch, state_hash_df, timer.duration
            )
+            db.connection.commit()
         except Exception as error:
             db.connection.rollback()
             logging.error("ERROR: %s", error)
             state.retry_batch()
             return
-        else:
-            db.connection.commit()
     else:
         # new bot log id hasn't been created, so proceed with the old one
         bot_log_id = state.batch.bot_log_id
@@ -376,7 +378,6 @@ def main():
         password=os.environ["POSTGRES_PASSWORD"],
     )
 
-    # Step 1 Get previous record and build relations list
     interval = int(os.environ["SURVEY_INTERVAL_MINUTES"])
     db = DB(connection, logging)
     batch = db.get_batch_timings(timedelta(minutes=interval))
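
Review note: a minimal sketch of how the extracted load_submissions helper could be unit-tested in isolation. The test module below is hypothetical and not part of this diff; it assumes the module path from the diff header, patches AWSKeyspacesClient where coordinator.py references it, and uses a FakeSubmission dataclass as a stand-in for real submission rows (asdict() in load_submissions requires dataclass instances).

# Hypothetical test sketch, not included in this change.
from dataclasses import dataclass
from typing import Optional
from unittest.mock import MagicMock, patch

from uptime_service_validation.coordinator.coordinator import load_submissions


@dataclass
class FakeSubmission:
    # Minimal stand-in for a Cassandra submission row.
    state_hash: str
    verified: bool
    validation_error: Optional[str]


def test_keeps_only_verified_submissions_without_errors():
    client = MagicMock()
    client.get_submissions.return_value = [
        FakeSubmission("hash_a", True, None),    # kept
        FakeSubmission("hash_b", False, None),   # dropped: not verified
        FakeSubmission("hash_c", True, "oops"),  # dropped: validation error
    ]
    with patch(
        "uptime_service_validation.coordinator.coordinator.AWSKeyspacesClient",
        return_value=client,
    ):
        df = load_submissions(MagicMock())
    assert list(df["state_hash"]) == ["hash_a"]
    client.close.assert_called_once()  # finally block must always close


def test_returns_empty_frame_when_cassandra_fails():
    client = MagicMock()
    client.get_submissions.side_effect = RuntimeError("connection lost")
    with patch(
        "uptime_service_validation.coordinator.coordinator.AWSKeyspacesClient",
        return_value=client,
    ):
        df = load_submissions(MagicMock())
    assert df.empty  # error path degrades to an empty DataFrame
    client.close.assert_called_once()

These two cases cover the behavioural change introduced by the refactor: the filtering of unverified or error-bearing submissions, and the new except branch that returns an empty DataFrame instead of letting a Cassandra failure propagate out of process().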