import argparse
import hashlib
import json
import re
import sys
from collections import defaultdict

import graphviz


def load_json_data(file_path):
    """
    Load and return the JSON data from the given file path.
    """
    try:
        with open(file_path, 'r') as file:
            return json.load(file)
    except FileNotFoundError:
        print(f"Error: The file {file_path} was not found.")
        sys.exit(1)
    except json.JSONDecodeError:
        print(f"Error: Failed to parse JSON from {file_path}.")
        sys.exit(1)


def format_function_call(input_str):
    """
    Split a trace event name of the form "func (path:line)" into its parts.

    Returns a (function_name, file_path, line) tuple; any part that cannot be
    extracted is returned as None.
    """
    function_name = None
    file_path = None
    line = None
    # Extract the function name, file path, and line number
    match = re.search(r"(.+) \(([^:()]+(?![^()]*[<>])):(\d+)\)", input_str)
    if match:
        function_name, file_path, line = match.groups()
        # Discard internal frames such as "<module>" or "<listcomp>"
        if any(char in function_name for char in ["<", ">"]):
            function_name = None
    if file_path:
        # Strip everything up to and including the "py-wwpdb_<package>/" prefix
        # so that paths are reported relative to the repository root
        pattern = r"py-wwpdb_[^/]+/"
        match = re.search(pattern, file_path)
        if match:
            repository = match.group(0)
            file_path = file_path.split(repository)[1]
    return function_name, file_path, line
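
# Illustrative input/output for the parser above (the path below is
# hypothetical, not taken from a real trace):
#
#   >>> format_function_call("run (/opt/py-wwpdb_utils_config/wwpdb/utils/config/ConfigInfo.py:42)")
#   ('run', 'wwpdb/utils/config/ConfigInfo.py', '42')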


def hash_string_to_rgb(input_str):
    """
    Hash a string to a hex RGB color of the form '#rrggbb'.
    """
    hash_object = hashlib.md5(input_str.encode())
    hash_hex = hash_object.hexdigest()
    # Use the first 6 hex characters of the digest as the RGB value
    rgb = tuple(int(hash_hex[i:i + 2], 16) for i in (0, 2, 4))
    return f'#{rgb[0]:02x}{rgb[1]:02x}{rgb[2]:02x}'
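
# The mapping is deterministic: the same string always hashes to the same
# color, so every node drawn from one source file gets an identical fill
# color (the exact hex value depends only on the MD5 digest of the string).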


def process_events(data):
    """
    Print an indented call tree of the trace and build a caller -> callee
    dependency graph keyed by formatted node strings.
    """
    events = data if isinstance(data, list) else data.get('traceEvents', [])
    # Keep only complete ("X" phase) events that carry a timestamp
    filtered_events = [event for event in events if 'ts' in event and event['ph'] == 'X']
    sorted_events = sorted(filtered_events, key=lambda x: x['ts'])
    event_stack = []  # Stack to keep track of the call hierarchy
    dependency_graph = defaultdict(set)  # Caller node -> set of callee nodes
    indent = ''
    for i, event in enumerate(sorted_events):
        event_end = event['ts'] + event.get('dur', 0)
        # Pop events from the stack that have already ended
        while event_stack and event_stack[-1]['end'] <= event['ts']:
            popped_event = event_stack.pop()
            if event_stack:  # If there's a caller left on the stack, record the edge
                caller, caller_path, caller_line = format_function_call(event_stack[-1]['name'])
                callee, callee_path, callee_line = format_function_call(popped_event['name'])
                if caller and callee:
                    source = f"name={caller}\nfile={caller_path}\nline={caller_line}"
                    target = f"name={callee}\nfile={callee_path}\nline={callee_line}"
                    dependency_graph[source].add(target)
        # Indentation reflects the current stack depth
        indent = ''.join(['| ' for _ in range(len(event_stack))])
        # Check whether the current event is the last one at its depth
        is_last_at_depth = True  # Assume it is the last by default
        if i + 1 < len(sorted_events):
            next_event_start = sorted_events[i + 1]['ts']
            is_last_at_depth = not (next_event_start < event_end
                                    or len(event_stack) == len(sorted_events[i + 1].get('stack', [])))
        # Pick the branch symbol according to whether this is the last event at its depth
        branch_symbol = '└─ ' if is_last_at_depth else '├─ '
        call, path, line = format_function_call(event['name'])
        formatted_str = f"{call} ({path}:{line})" if call else None
        if formatted_str:
            print(f"{indent}{branch_symbol}{formatted_str}")
        event['end'] = event_end
        event_stack.append(event)
    print(f"└─-{2 * len(indent) * '-'}-END")  # Mark the end of the call stack
    return dependency_graph  # Return the graph for visualization
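
# For reference, a minimal sketch of the kind of event record this function
# consumes (field values are illustrative, not taken from a real trace):
#
#   {"name": "run (/opt/py-wwpdb_foo/wwpdb/foo/bar.py:10)",
#    "ph": "X", "ts": 100.0, "dur": 50.0}
#
# Complete ("X") events carry both a start timestamp ("ts") and a duration
# ("dur"), which is what the nesting logic above relies on.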


def extract_file_attribute(node_string):
    """Extract the file attribute from the node string."""
    match = re.search(r'file=([^\n]+)', node_string)
    if match:
        return match.group(1)  # Return the matched file path
    return None  # Return None if no file attribute is found


def visualize_dependency_graph(dependency_graph):
    """
    Render the dependency graph with Graphviz (colored by source file) and
    export the node/edge lists to dependency_graph.json.
    """
    dot = graphviz.Digraph(comment='Dependency Graph', graph_attr={'rankdir': 'LR'})
    graph_data = {'nodes': [], 'edges': []}
    color_file_mapping = {}
    for caller, callees in dependency_graph.items():
        caller_file = extract_file_attribute(caller)
        caller_color = hash_string_to_rgb(caller_file if caller_file else caller)
        color_file_mapping[caller_color] = caller_file if caller_file else "Unknown"
        if caller not in graph_data['nodes']:
            graph_data['nodes'].append(caller)
            dot.node(caller, color=caller_color, style='filled', fillcolor=caller_color)
        for callee in callees:
            callee_file = extract_file_attribute(callee)
            callee_color = hash_string_to_rgb(callee_file if callee_file else callee)
            color_file_mapping[callee_color] = callee_file if callee_file else "Unknown"
            if callee not in graph_data['nodes']:
                graph_data['nodes'].append(callee)
                dot.node(callee, color=callee_color, style='filled', fillcolor=callee_color)
            graph_data['edges'].append({'source': caller, 'target': callee})
            dot.edge(caller, callee)
    # Add a legend mapping each color to its source file
    with dot.subgraph(name='cluster_legend') as c:
        c.attr(label='Legend', style='filled', color='lightgrey', fontsize='20',
               fontcolor='black', rank='min')
        for color, file in color_file_mapping.items():
            c.node(file, label=file, color=color, style='filled', fillcolor=color)
        # Hint to pin the legend at the origin (only honored by layout
        # engines that support fixed positions)
        c.attr(pos='0,0!')
    # Save and open the graph visualization
    dot.render('dependency_graph', view=True)
    # Export the graph data to JSON for further analysis
    with open('dependency_graph.json', 'w') as f:
        json.dump(graph_data, f, indent=4)
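
# dependency_graph.json written above has the shape below (node strings are
# illustrative):
#
#   {
#       "nodes": ["name=caller\nfile=pkg/mod.py\nline=10", ...],
#       "edges": [{"source": "name=caller\n...", "target": "name=callee\n..."}, ...]
#   }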


def parse_viztracer_output(file_path):
    """
    Parse the VizTracer output from the given file path, print the call tree,
    and visualize the resulting dependency graph.
    """
    data = load_json_data(file_path)
    dependency_graph = process_events(data)
    visualize_dependency_graph(dependency_graph)  # Visualize the graph after processing


def analyze_dependency_graph(file_path):
    """
    Load the exported dependency graph JSON, convert it into a nested
    dictionary of node metadata and per-caller adjacency lists, and write the
    result to call_stack.json.
    """
    with open(file_path, 'r') as f:
        graph_data = json.load(f)
    # Initialize the graph dictionary
    graph_dict = {'nodes': {}, 'edges': {}}
    # Process the nodes
    for node in graph_data['nodes']:
        node_id = node
        node_data = {'id': node_id, 'name': node, 'file': extract_file_attribute(node)}
        graph_dict['nodes'][node_id] = node_data
    # Process the edges into per-source adjacency lists
    for edge in graph_data['edges']:
        source = edge['source']
        target = edge['target']
        if source not in graph_dict['edges']:
            graph_dict['edges'][source] = []
        graph_dict['edges'][source].append(target)
    # Export the graph dictionary to a JSON file
    with open('call_stack.json', 'w') as f:
        json.dump(graph_dict, f, indent=4)
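
# call_stack.json produced above maps each node string to its metadata and
# each caller to a list of callees, e.g. (illustrative):
#
#   {
#       "nodes": {"name=caller\n...": {"id": "...", "name": "...", "file": "pkg/mod.py"}},
#       "edges": {"name=caller\n...": ["name=callee\n...", ...]}
#   }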


def main():
    parser = argparse.ArgumentParser(
        description="Parse VizTracer output JSON file and print call stacks.")
    parser.add_argument('file_path', type=str, help="Path to the VizTracer output JSON file")
    args = parser.parse_args()
    parse_viztracer_output(args.file_path)
    # Analyze the dependency graph exported by visualize_dependency_graph()
    analyze_dependency_graph('dependency_graph.json')


if __name__ == "__main__":
    main()
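
# Typical usage, assuming a trace produced by VizTracer (file names below are
# illustrative):
#
#   viztracer -o result.json my_script.py
#   python traceparser.py result.json
#
# Running the parser prints the call tree, opens the rendered Graphviz
# dependency graph, and writes dependency_graph.json and call_stack.json to
# the current directory.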