Extract Cassandra interaction as a separate function.
Sventimir committed Mar 4, 2024
1 parent a731689 commit 0315ead
Showing 1 changed file with 43 additions and 42 deletions.
85 changes: 43 additions & 42 deletions uptime_service_validation/coordinator/coordinator.py
@@ -105,7 +105,48 @@ def __warn_if_work_took_longer_then_expected(self):
self.current_timestamp,
)

def load_submissions(batch):
    """Load submissions from Cassandra and return them as a DataFrame."""
    submissions = []
    submissions_verified = []
    cassandra = AWSKeyspacesClient()
    try:
        cassandra.connect()
        submissions = cassandra.get_submissions(
            submitted_at_start=batch.start_time,
            submitted_at_end=batch.end_time,
            start_inclusive=True,
            end_inclusive=False,
        )
        # for further processing
        # we use only submissions verified = True and validation_error = None
        for submission in submissions:
            if submission.verified and submission.validation_error is None:
                submissions_verified.append(submission)
    except Exception as e:
        logging.error("Error in loading submissions: %s", e)
        return pd.DataFrame([])
    finally:
        cassandra.close()

    all_submissions_count = len(submissions)
    submissions_to_process_count = len(submissions_verified)
    logging.info("number of all submissions: %s", all_submissions_count)
    logging.info(
        "number of submissions to process: %s",
        submissions_to_process_count
    )
    if submissions_to_process_count < all_submissions_count:
        logging.warning(
            "some submissions were not processed, because they were not \
            verified or had validation errors"
        )
    return pd.DataFrame(
        [asdict(submission) for submission in submissions_verified]
    )

def process_statehash_df(db, batch, state_hash_df, verification_time):
    """Process the state hash dataframe and return the master dataframe."""
    all_files_count = state_hash_df.shape[0]
    master_df = pd.DataFrame()
    master_df["state_hash"] = state_hash_df["state_hash"]
@@ -254,11 +295,9 @@ def process(db, state):

    # sleep until batch ends, update the state accordingly, then continue.
    state.wait_until_batch_ends()
    # Step 2 Create time ranges:
    time_intervals = list(state.batch.split(
        int(os.environ["MINI_BATCH_NUMBER"])))

    # Step 3 Create Kubernetes ZKValidators and pass mini-batches.
    worker_image = os.environ["WORKER_IMAGE"]
    worker_tag = os.environ["WORKER_TAG"]
    timer = Timer()
@@ -270,7 +309,6 @@ def process(db, state):
    with timer.measure():
        setUpValidatorPods(time_intervals, logging, worker_image, worker_tag)

    # Step 4 We need to read the ZKValidator results from a db.
    logging.info(
        "reading ZKValidator results from a db between the time range: %s - %s",
        state.batch.start_time,
@@ -293,54 +331,18 @@ def process(db, state):
        logging,
    )

    submissions = []
    submissions_verified = []
    cassandra = AWSKeyspacesClient()
    try:
        cassandra.connect()
        submissions = cassandra.get_submissions(
            submitted_at_start=state.batch.start_time,
            submitted_at_end=state.batch.end_time,
            start_inclusive=True,
            end_inclusive=False,
        )
        # for further processing
        # we use only submissions verified = True and validation_error = None
        for submission in submissions:
            if submission.verified and submission.validation_error is None:
                submissions_verified.append(submission)
    finally:
        cassandra.close()

    all_submissions_count = len(submissions)
    submissions_to_process_count = len(submissions_verified)
    logging.info("number of all submissions: %s", all_submissions_count)
    logging.info(
        "number of submissions to process: %s",
        submissions_to_process_count
    )
    if submissions_to_process_count < all_submissions_count:
        logging.warning(
            "some submissions were not processed, because they were not \
            verified or had validation errors"
        )

    # Step 5 checks for forks and writes to the db.
    state_hash_df = pd.DataFrame(
        [asdict(submission) for submission in submissions_verified]
    )
    state_hash_df = load_submissions(state.batch)
    if not state_hash_df.empty:
        try:
            bot_log_id = process_statehash_df(
                db, state.batch, state_hash_df, timer.duration
            )
            db.connection.commit()
        except Exception as error:
            db.connection.rollback()
            logging.error("ERROR: %s", error)
            state.retry_batch()
            return
        else:
            db.connection.commit()
    else:
        # new bot log id hasn't been created, so proceed with the old one
        bot_log_id = state.batch.bot_log_id
@@ -376,7 +378,6 @@ def main():
        password=os.environ["POSTGRES_PASSWORD"],
    )

    # Step 1 Get previous record and build relations list
    interval = int(os.environ["SURVEY_INTERVAL_MINUTES"])
    db = DB(connection, logging)
    batch = db.get_batch_timings(timedelta(minutes=interval))
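For reference, a minimal sketch of exercising the extracted load_submissions on its own. The Batch dataclass below is a hypothetical stand-in (in the coordinator the batch comes from db.get_batch_timings(...) and carries additional fields such as bot_log_id), the import path assumes the repository's package layout, and actually fetching data requires the Cassandra/AWS Keyspaces credentials that AWSKeyspacesClient expects; on a connection error the helper logs and returns an empty DataFrame.

# Hypothetical usage sketch; not part of the commit.
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

from uptime_service_validation.coordinator.coordinator import load_submissions


@dataclass
class Batch:
    # stand-in exposing only the fields load_submissions reads
    start_time: datetime
    end_time: datetime


end = datetime.now(timezone.utc)
batch = Batch(start_time=end - timedelta(minutes=15), end_time=end)

# DataFrame of verified, error-free submissions in the window;
# empty if nothing qualifies or Cassandra is unreachable.
df = load_submissions(batch)
print(f"{len(df)} verified submissions between {batch.start_time} and {batch.end_time}")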
