use time_intervals when loading submissions
piotr-iohk committed Mar 5, 2024
1 parent d08b620 commit 3104fb1
Showing 1 changed file with 19 additions and 27 deletions.
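The gist of the change: load_submissions no longer receives the batch itself, only the list of time intervals produced by splitting the batch into MINI_BATCH_NUMBER mini-batches. Below is a minimal, self-contained sketch of that call shape; split and load_submissions here are illustrative stand-ins, not the project's real implementations (which live in uptime_service_validation/coordinator and query Cassandra).

    from datetime import datetime, timedelta

    # Illustrative stand-in for Batch.split(): yield n equal sub-intervals of [start, end).
    def split(start, end, n):
        step = (end - start) / n
        return [(start + i * step, start + (i + 1) * step) for i in range(n)]

    # Illustrative stand-in for the loader: it now takes only the pre-split intervals.
    def load_submissions(time_intervals):
        return [f"submissions in {a.isoformat()} - {b.isoformat()}" for a, b in time_intervals]

    batch_start = datetime(2024, 3, 5, 12, 0)
    batch_end = batch_start + timedelta(minutes=15)
    time_intervals = list(split(batch_start, batch_end, 5))  # e.g. MINI_BATCH_NUMBER=5
    submissions = load_submissions(time_intervals)           # was: load_submissions(batch, time_intervals)
    print(submissions)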
46 changes: 19 additions & 27 deletions uptime_service_validation/coordinator/coordinator.py
@@ -104,7 +104,8 @@ def __warn_if_work_took_longer_then_expected(self):
             self.current_timestamp,
         )

-def load_submissions(batch, time_intervals):
+
+def load_submissions(time_intervals):
     """Load submissions from Cassandra and return them as a DataFrame."""
     submissions = []
     submissions_verified = []
@@ -142,9 +143,8 @@ def load_submissions(batch, time_intervals):
             "some submissions were not processed, because they were not \
             verified or had validation errors"
         )
-    return pd.DataFrame(
-        [asdict(submission) for submission in submissions_verified]
-    )
+    return pd.DataFrame([asdict(submission) for submission in submissions_verified])
+


 def process_statehash_df(db, batch, state_hash_df, verification_time):
     """Process the state hash dataframe and return the master dataframe."""
@@ -179,8 +179,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
     nodes_in_cur_batch = pd.DataFrame(
         master_df["submitter"].unique(), columns=["block_producer_key"]
     )
-    node_to_insert = find_new_values_to_insert(
-        existing_nodes, nodes_in_cur_batch)
+    node_to_insert = find_new_values_to_insert(existing_nodes, nodes_in_cur_batch)

     if not node_to_insert.empty:
         node_to_insert["updated_at"] = datetime.now(timezone.utc)
@@ -191,15 +190,13 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
             columns={
                 "file_updated": "file_timestamps",
                 "submitter": "block_producer_key",
-            }
+            },
         )

-    relation_df, p_selected_node_df = db.get_previous_statehash(
-        batch.bot_log_id)
+    relation_df, p_selected_node_df = db.get_previous_statehash(batch.bot_log_id)
     p_map = list(get_relations(relation_df))
     c_selected_node = filter_state_hash_percentage(master_df)
-    batch_graph = create_graph(
-        master_df, p_selected_node_df, c_selected_node, p_map)
+    batch_graph = create_graph(master_df, p_selected_node_df, c_selected_node, p_map)
     weighted_graph = apply_weights(
         batch_graph=batch_graph,
         c_selected_node=c_selected_node,
@@ -218,8 +215,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
         # but it's not used anywhere inside the function)
     )
     point_record_df = master_df[
-        master_df["state_hash"].isin(
-            shortlisted_state_hash_df["state_hash"].values)
+        master_df["state_hash"].isin(shortlisted_state_hash_df["state_hash"].values)
     ]

     for index, row in shortlisted_state_hash_df.iterrows():
@@ -228,15 +224,13 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
     p_selected_node_df = shortlisted_state_hash_df.copy()
     parent_hash = []
     for s in shortlisted_state_hash_df["state_hash"].values:
-        p_hash = master_df[master_df["state_hash"] == s][
-            "parent_state_hash"
-        ].values[0]
+        p_hash = master_df[master_df["state_hash"] == s]["parent_state_hash"].values[0]
         parent_hash.append(p_hash)
     shortlisted_state_hash_df["parent_state_hash"] = parent_hash

-    p_map = list(get_relations(
-        shortlisted_state_hash_df[["parent_state_hash", "state_hash"]]
-    ))
+    p_map = list(
+        get_relations(shortlisted_state_hash_df[["parent_state_hash", "state_hash"]])
+    )
     if not point_record_df.empty:
         file_timestamp = master_df.iloc[-1]["file_timestamps"]
     else:
@@ -252,7 +246,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):
         file_timestamp,
         batch.start_time.timestamp(),
         batch.end_time.timestamp(),
-        verification_time.total_seconds()
+        verification_time.total_seconds(),
     )
     bot_log_id = db.create_bot_log(values)

@@ -261,8 +255,7 @@ def process_statehash_df(db, batch, state_hash_df, verification_time):

     if not point_record_df.empty:
         point_record_df.loc[:, "amount"] = 1
-        point_record_df.loc[:, "created_at"] = datetime.now(
-            timezone.utc)
+        point_record_df.loc[:, "created_at"] = datetime.now(timezone.utc)
         point_record_df.loc[:, "bot_log_id"] = bot_log_id
         point_record_df = point_record_df[
             [
@@ -288,16 +281,15 @@ def process(db, state):
     logging.info(
         "iteration start at: %s, cur_timestamp: %s",
         state.batch.start_time,
-        state.current_timestamp
+        state.current_timestamp,
     )
     logging.info(
         "running for batch: %s - %s.", state.batch.start_time, state.batch.end_time
     )

     # sleep until batch ends, update the state accordingly, then continue.
     state.wait_until_batch_ends()
-    time_intervals = list(state.batch.split(
-        int(os.environ["MINI_BATCH_NUMBER"])))
+    time_intervals = list(state.batch.split(int(os.environ["MINI_BATCH_NUMBER"])))

     worker_image = os.environ["WORKER_IMAGE"]
     worker_tag = os.environ["WORKER_TAG"]
@@ -313,7 +305,7 @@ def process(db, state):
     logging.info(
         "reading ZKValidator results from a db between the time range: %s - %s",
         state.batch.start_time,
-        state.batch.end_time
+        state.batch.end_time,
     )

     logging.info("ZKValidator results read from a db in %s.", timer.duration)
@@ -332,7 +324,7 @@ def process(db, state):
         logging,
     )

-    state_hash_df = load_submissions(state.batch, time_intervals)
+    state_hash_df = load_submissions(time_intervals)
     if not state_hash_df.empty:
         try:
             bot_log_id = process_statehash_df(
