Commit
Evaluate: YCSB as timeseries per experiment run - code cleaning
perdelt committed Jan 29, 2025
1 parent a10b6ff commit bd69a22
Showing 3 changed files with 4,284 additions and 110 deletions.
62 changes: 33 additions & 29 deletions bexhoma/evaluators.py
@@ -839,17 +839,17 @@ def benchmarking_aggregate_by_parallel_pods(self, df):
'[CLEANUP].AverageLatency(us)':'mean',
'[CLEANUP].MinLatency(us)':'min',
'[CLEANUP].MaxLatency(us)':'max',
'[CLEANUP].95thPercentileLatency(us)':'mean',
'[CLEANUP].99thPercentileLatency(us)':'mean',
'[CLEANUP].95thPercentileLatency(us)':'max',
'[CLEANUP].99thPercentileLatency(us)':'max',
}}
if '[READ].Operations' in grp.columns:
aggregate = {**aggregate, **{
'[READ].Operations':'sum',
'[READ].AverageLatency(us)':'mean',
'[READ].MinLatency(us)':'min',
'[READ].MaxLatency(us)':'max',
'[READ].95thPercentileLatency(us)':'mean',
'[READ].99thPercentileLatency(us)':'mean',
'[READ].95thPercentileLatency(us)':'max',
'[READ].99thPercentileLatency(us)':'max',
'[READ].Return=OK': 'sum',
}}
if '[INSERT].Operations' in grp.columns:
@@ -858,8 +858,8 @@ def benchmarking_aggregate_by_parallel_pods(self, df):
'[INSERT].AverageLatency(us)':'mean',
'[INSERT].MinLatency(us)':'min',
'[INSERT].MaxLatency(us)':'max',
'[INSERT].95thPercentileLatency(us)':'mean',
'[INSERT].99thPercentileLatency(us)':'mean',
'[INSERT].95thPercentileLatency(us)':'max',
'[INSERT].99thPercentileLatency(us)':'max',
'[INSERT].Return=OK': 'sum',
}}
if '[UPDATE].Operations' in grp.columns:
@@ -868,8 +868,8 @@ def benchmarking_aggregate_by_parallel_pods(self, df):
'[UPDATE].AverageLatency(us)':'mean',
'[UPDATE].MinLatency(us)':'min',
'[UPDATE].MaxLatency(us)':'max',
'[UPDATE].95thPercentileLatency(us)':'mean',
'[UPDATE].99thPercentileLatency(us)':'mean',
'[UPDATE].95thPercentileLatency(us)':'max',
'[UPDATE].99thPercentileLatency(us)':'max',
'[UPDATE].Return=OK': 'sum',
}}
if '[SCAN].Operations' in grp.columns:
@@ -878,8 +878,8 @@ def benchmarking_aggregate_by_parallel_pods(self, df):
'[SCAN].AverageLatency(us)':'mean',
'[SCAN].MinLatency(us)':'min',
'[SCAN].MaxLatency(us)':'max',
'[SCAN].95thPercentileLatency(us)':'mean',
'[SCAN].99thPercentileLatency(us)':'mean',
'[SCAN].95thPercentileLatency(us)':'max',
'[SCAN].99thPercentileLatency(us)':'max',
'[SCAN].Return=OK':'sum',
}}
#print(grp.agg(aggregate))
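For context, a minimal sketch of how an aggregate dict like the ones changed above is applied when collapsing results from parallel benchmarker pods into one row per configuration. The column names mirror the YCSB result columns in the hunks, but the grouping key and the sample values are made up. The switch from 'mean' to 'max' means the reported tail percentile is the worst one observed across pods rather than an average of per-pod percentiles.

import pandas as pd

# Hypothetical per-pod YCSB results for two configurations (values are invented)
df = pd.DataFrame({
    'configuration': ['cfg1', 'cfg1', 'cfg2', 'cfg2'],
    '[READ].Operations': [1000, 1200, 900, 950],
    '[READ].AverageLatency(us)': [350.0, 420.0, 300.0, 310.0],
    '[READ].MinLatency(us)': [50, 60, 45, 40],
    '[READ].MaxLatency(us)': [9000, 12000, 7000, 8000],
    '[READ].95thPercentileLatency(us)': [800, 950, 700, 720],
    '[READ].99thPercentileLatency(us)': [2000, 2600, 1500, 1600],
})

# Operations add up across pods, averages are averaged,
# extremes and tail percentiles keep the worst observed value.
aggregate = {
    '[READ].Operations': 'sum',
    '[READ].AverageLatency(us)': 'mean',
    '[READ].MinLatency(us)': 'min',
    '[READ].MaxLatency(us)': 'max',
    '[READ].95thPercentileLatency(us)': 'max',
    '[READ].99thPercentileLatency(us)': 'max',
}

print(df.groupby('configuration').agg(aggregate))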
@@ -1023,15 +1023,21 @@ def get_df_loading(self):
#df#.sort_values(["configuration", "pod"])
return df
def parse_ycsb_log_file(self, file_path):
"""
Scans the lines of a YCSB log file.
Extracts relevant performance information for time series analysis.
Each line starting with a time stamp is converted into a dict of measurements (operations, sec of measurement, READ latency, ...).
:param file_path: Full path of log file
:return: List of dicts of measures, one entry per line
"""
def parse_string(log):
log = re.sub(r'Avg=�', 'Avg=0', log)
try:
# Extract the date and time
date_time_match = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}:\d{3})", log)
date_time_str = date_time_match.group(1) if date_time_match else None
date_time = datetime.strptime(date_time_str, "%Y-%m-%d %H:%M:%S:%f") if date_time_str else None
# Extract all numbers
#all_numbers = re.findall(r"\b\d+\.?\d*\b", log)
# Extract metrics from sections like [READ: ...] or [UPDATE: ...]
# Match the pattern for operations and ops/sec
match = re.search(r"(\d+)\s+operations", log)
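For illustration, a self-contained sketch of the kind of extraction parse_string performs on a single YCSB status line. The sample line and the per-operation field names (Count, Max, Min, Avg, 99, ...) are assumptions and may not match every YCSB version; the timestamp and operations patterns follow the regexes shown above, and the NaN/garbled-value handling of the real code is omitted.

import re
from datetime import datetime

# Hypothetical YCSB status line (the real log format may differ slightly)
log = ("2025-01-29 10:00:10:123 10 sec: 10000 operations; "
       "998.7 current ops/sec; "
       "[READ: Count=950, Max=4100, Min=120, Avg=380.5, 99=2100] "
       "[UPDATE: Count=50, Max=5200, Min=200, Avg=610.0, 99=3900]")

# Timestamp at the start of the line, as in parse_string above
date_time_match = re.match(r"(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}:\d{3})", log)
date_time = datetime.strptime(date_time_match.group(1), "%Y-%m-%d %H:%M:%S:%f")

# Overall counters
operations = int(re.search(r"(\d+)\s+operations", log).group(1))
sec = int(re.search(r"(\d+)\s+sec:", log).group(1))
ops_per_sec = float(re.search(r"([\d.]+)\s+current ops/sec", log).group(1))

# Per-operation sections such as [READ: ...] or [UPDATE: ...]
sections = {}
for op, body in re.findall(r"\[(\w+):\s*([^\]]*)\]", log):
    sections[op] = {k: float(v) for k, v in re.findall(r"(\S+?)=([\d.]+)", body)}

print(date_time, sec, operations, ops_per_sec)
print(sections["READ"]["Avg"], sections["UPDATE"]["99"])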
@@ -1095,6 +1101,13 @@ def flatten_dict(d, parent_key='', sep='_'):
items.append((new_key, v))
return dict(items)
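Only the tail of flatten_dict is visible in this hunk; a plausible reconstruction of the complete helper, following the standard recursive flattening recipe and matching the visible lines, with a hypothetical nested-dict input:

def flatten_dict(d, parent_key='', sep='_'):
    # Recursively flattens nested dicts: {'READ': {'Avg': 1}} -> {'READ_Avg': 1}
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten_dict(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

print(flatten_dict({'READ': {'Avg': 380.5, '99': 2100}, 'sec': 10}))
# {'READ_Avg': 380.5, 'READ_99': 2100, 'sec': 10}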
def find_matching_files(directory, pattern):
"""
Finds files in the specified directory that match the given pattern.
:param directory: The path to the directory where the search is performed.
:param pattern: The file pattern to match (e.g., "*.txt" for all text files).
:return: A list of file paths that match the pattern.
"""
# Use glob to find files matching the pattern
matching_files = glob.glob(os.path.join(directory, pattern))
return matching_files
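A short usage sketch of find_matching_files; the directory and the concrete log file name are hypothetical and only illustrate the bexhoma-benchmarker-*-<run>.log pattern built in the loop below.

import glob
import os

def find_matching_files(directory, pattern):
    # Thin wrapper around glob, mirroring the helper above
    return glob.glob(os.path.join(directory, pattern))

# Hypothetical result directory of one experiment code
files = find_matching_files('/tmp/results/1737365651', 'bexhoma-benchmarker-*-1.log')
print(files)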
@@ -1105,11 +1118,11 @@ def find_matching_files(directory, pattern):
num_logs = 0
for file_logs in list_logs:
pattern = 'bexhoma-benchmarker-*-{}.log'.format(file_logs)
#print(self.path+'/'+self.code)
#print("Scan for files in "+self.path+'/'+self.code)
matching_files = find_matching_files(self.path, pattern)
for file in matching_files:
num_logs = num_logs + 1
#print(file)
#print("Extract data from log file "+file)
parsed_results = self.parse_ycsb_log_file(file)
data = []
for result in parsed_results:
@@ -1131,38 +1144,29 @@ def find_matching_files(directory, pattern):
#print(data)
df = pd.DataFrame(data)
df = df.set_index('sec')
#print("BLA", df)
#df.fillna(0)
#df.fillna(0) # we need NaN for missing values (e.g., average computation)
df = df.groupby(df.index).last() # in case of duplicate indexes (i.e., times)
if remove_first > 0:
df = df.iloc[remove_first:]
if remove_last > 0:
df = df.iloc[:-remove_last]
#print(df)
#df['avg'] = df[column].mean()
#print(df)
#df_total = pd.concat([df_total, df], axis=1)
if not aggregate:
df_total.append(df.copy())
else:
if df_total.empty:
#df['avg'] = df[column].mean()
df_total = df.copy()
else:
if "9" in metric:
if "9" in metric or "Max" in metric:
df_total[column] = df_total[column].combine(df[column], lambda x, y: x if (x > y and pd.notna(x) and pd.notna(y)) or (pd.notna(x) and not pd.notna(y)) else y)
elif "Min" in metric:
df_total[column] = df_total[column].combine(df[column], lambda x, y: x if (x < y and pd.notna(x) and pd.notna(y)) or (pd.notna(x) and not pd.notna(y)) else y)
else:
# compute average or sum
df_total = df_total.add(df, fill_value=0)
#df_total[column] = df_total[column] + df[column]
#df.plot(ylim=(0,df['current_ops_per_sec'].max()*1.1))
if aggregate:
#print(df_total)
if not metric == "current_ops_per_sec" and not "9" in metric:
#print(df_total)
#print("divide by", num_logs)
if not metric == "current_ops_per_sec" and not "9" in metric and not "Max" in metric and not "Min" in metric:
df_total = df_total / num_logs
df_total['avg'] = int(df_total[column].mean())
#print(df_total)
return df_total
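To make the per-metric merge rules above concrete, a minimal sketch with two hypothetical pod time series: percentile and Max metrics take the element-wise worst value, Min metrics the element-wise best, and everything else is summed (averages are then divided by the number of pods, while current_ops_per_sec stays a plain sum). The NaN guards of the real combine lambdas are omitted here.

import pandas as pd

# Hypothetical per-pod time series (index = seconds into the run)
ops_pod1 = pd.Series([1000, 1100, 1050], index=[10, 20, 30])
ops_pod2 = pd.Series([900, 950, 990], index=[10, 20, 30])
p99_pod1 = pd.Series([2100, 2300, 2200], index=[10, 20, 30])
p99_pod2 = pd.Series([2500, 2000, 2400], index=[10, 20, 30])

def merge_metric(total, other, metric):
    # Tail percentiles and Max latencies: keep the worst value seen in any pod
    if "9" in metric or "Max" in metric:
        return total.combine(other, max)
    # Min latencies: keep the best (smallest) value seen in any pod
    if "Min" in metric:
        return total.combine(other, min)
    # Everything else is summed; averages are divided by num_logs afterwards
    return total.add(other, fill_value=0)

print(merge_metric(p99_pod1, p99_pod2, 'READ_99').tolist())              # [2500, 2300, 2400]
print(merge_metric(ops_pod1, ops_pod2, 'current_ops_per_sec').tolist())  # [1900, 2050, 2040]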
def get_benchmark_logs_timeseries_df_aggregated(self, metric="current_ops_per_sec", configuration="", client='1', experiment_run='1'):
#code = "1737365651"