Skip to content

Commit

Permalink
adjust coordinator load_submissions with ability to get from postgres
Browse files: browse the repository at this point in the history
  • Loading branch information
piotr-iohk committed Apr 26, 2024
1 parent 5c99485 commit 7a45595
Showing 1 changed file with 40 additions and 22 deletions.
62 changes: 40 additions & 22 deletions uptime_service_validation/coordinator/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
get_contact_details_from_spreadsheet,
)
from uptime_service_validation.coordinator.server import (
bool_env_var_set,
setUpValidatorPods,
setUpValidatorProcesses,
)
Expand Down Expand Up @@ -111,32 +110,47 @@ def __warn_if_work_took_longer_then_expected(self):
)


def load_submissions(time_intervals):
def load_submissions(time_intervals, db, submission_storage=Config.SUBMISSION_STORAGE):
"""
Load submissions from Cassandra:
Load submissions from Config.SUBMISSION_STORAGE:
- return validated subs as a DataFrame for further processing.
- return all subs for storing in the submissions_by_submitter table.
"""
submissions = []
submissions_verified = []
cassandra = AWSKeyspacesClient()

try:
cassandra.connect()
for time_interval in time_intervals:
submissions.extend(
cassandra.get_submissions(
submitted_at_start=time_interval[0],
submitted_at_end=time_interval[1],
start_inclusive=True,
end_inclusive=False,
if submission_storage == Config.STORAGE_CASSANDRA:
cassandra = AWSKeyspacesClient()
try:
cassandra.connect()
for time_interval in time_intervals:
submissions.extend(
cassandra.get_submissions(
submitted_at_start=time_interval[0],
submitted_at_end=time_interval[1],
start_inclusive=True,
end_inclusive=False,
)
)
)
except Exception as e:
logging.error("Error in loading submissions: %s", e)
return pd.DataFrame([])
finally:
cassandra.close()
except Exception as e:
logging.error("Error in loading submissions: %s", e)
return [pd.DataFrame([]), submissions]
finally:
cassandra.close()
elif submission_storage == Config.STORAGE_POSTGRES:
start_date = time_intervals[0][0]
end_date = time_intervals[-1][1]
try:
submissions_result = db.get_submissions(start_date, end_date)
if submissions_result is None:
logging.error("Failed to load submissions from database.")
return [pd.DataFrame([]), submissions]
submissions.extend(submissions_result)
except Exception as e:
logging.error("Error in loading submissions: %s", e)
return [pd.DataFrame([]), submissions]
else:
raise ValueError(f"Invalid submission storage: {submission_storage}")

# for further processing
# we use only submissions verified = True and validation_error = None or ""
Expand Down Expand Up @@ -354,7 +368,9 @@ def process(db, state):
logging,
)

state_hash_df, all_submissions = load_submissions(time_intervals)
state_hash_df, all_submissions = load_submissions(
time_intervals, db, Config.SUBMISSION_STORAGE
)
if not state_hash_df.empty:
try:
bot_log_id = process_statehash_df(
Expand All @@ -375,7 +391,9 @@ def process(db, state):
state.batch.end_time,
Config.UPTIME_DAYS_FOR_SCORE,
)
db.insert_submissions(all_submissions)
# we only copy submissions to Postgres if we're using Cassandra as the primary storage
if Config.SUBMISSION_STORAGE == Config.STORAGE_CASSANDRA:
db.insert_submissions(all_submissions)
except Exception as error:
db.connection.rollback()
logging.error("ERROR: %s", error)
Expand All @@ -399,7 +417,7 @@ def main():
f"Invalid storage option: {Config.SUBMISSION_STORAGE}. Valid options are {Config.VALID_STORAGE_OPTIONS}"
)
else:
logging.info("Using %s as the submission storage.", Config.SUBMISSION_STORAGE)
logging.info("Using SUBMISSION_STORAGE: %s", Config.SUBMISSION_STORAGE)

connection = psycopg2.connect(
host=Config.POSTGRES_HOST,
Expand Down

0 comments on commit 7a45595

Please sign in to comment.