Skip to content

Commit

Permalink
Merge pull request #21 from facebookresearch/fix_issues_18and20
Browse files Browse the repository at this point in the history
[hta] Fix queue length summary to handle missing data, and avoid -ve …
  • Loading branch information
anupambhatnagar authored Feb 9, 2023
2 parents 1e70b8a + d3a3c78 commit b84eef8
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 38 deletions.
127 changes: 95 additions & 32 deletions hta/analyzers/trace_counters.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,30 @@ def __init__(self):
pass

@classmethod
def _get_queue_length_time_series_for_rank(cls, t: "Trace", rank: int) -> pd.DataFrame:
def _get_queue_length_time_series_for_rank(cls, t: "Trace", rank: int) -> Optional[pd.DataFrame]:
"""
Returns an (optional) dataframe with time series for the queue length
on CUDA streams within the requested rank.
Queue length is defined as the number of outstanding CUDA operations on a stream
The value of the queue length is:
1. Incremented when a CUDA runtime operation enqueues a kernel on a stream.
2. Decremented when a CUDA kernel/memcopy operation executes on a stream.
Args:
t (Trace): Input trace data structure.
rank (int): rank to generate the time series for.
Returns:
Optional[pd.DataFrame]
Returns an (optional) dataframe containing time series with the following
columns: ts (timestamp), pid, tid (of corresponding GPU, stream), stream and
queue_length.
Note that each row or timestamp denotes a change in the value of the
time series. The value remains constant until the next timestamp.
In essence, it can be thought of as a step function.
"""
# get trace for a rank
trace_df: pd.DataFrame = t.get_trace(rank)

Expand Down Expand Up @@ -62,7 +85,11 @@ def _get_queue_length_time_series_for_rank(cls, t: "Trace", rank: int) -> pd.Dat
stream_df["queue_length"] = stream_df["queue"].cumsum()
result_df_list.append(stream_df)

return pd.concat(result_df_list)[["ts", "pid", "tid", "stream", "queue_length"]]
return (
pd.concat(result_df_list)[["ts", "pid", "tid", "stream", "queue_length"]]
if len(result_df_list) > 0
else None
)

@classmethod
def get_queue_length_time_series(
Expand All @@ -76,18 +103,21 @@ def get_queue_length_time_series(
Queue length is defined as the number of outstanding CUDA operations on a stream
The value of the queue length is:
1. Incremented when a CUDA runtime operation enqueues a kernel on a stream.
3. Decremented when a CUDA kernel/memcopy operation executes on a stream.
The dataframe returned contains time series points with columns
- ts (timestamp), pid, tid (of corresponding GPU,stream), stream,
and queue_length.
Note that each row or time point shows a changes in the value of the time series.
The value remains constant until the next time point. In essence, you can think
of it like a step function that keeps changing.
2. Decremented when a CUDA kernel/memcopy operation executes on a stream.
Args:
t (Trace): Input trace data structure.
ranks (list of int): ranks to perform this analysis for.
rank (int): rank to perform this analysis for.
Returns:
Dict[int, pd.DataFrame]:
A dictionary of rank -> time series with the queue length of each CUDA stream.
Each dataframe contains a time series consisting of the following columns:
ts (timestamp), pid, tid (of corresponding GPU, stream), stream and queue_length.
Note that each row or timestamp shows a change in the value of the
time series. The value remains constant until the next timestamp.
In essence, it can be thought of as a step function.
"""
if ranks is None or len(ranks) == 0:
ranks = [0]
Expand All @@ -98,23 +128,26 @@ def get_queue_length_time_series(
"stays constant until the next update."
)

return {rank: TraceCounters._get_queue_length_time_series_for_rank(t, rank) for rank in ranks}
result = {rank: TraceCounters._get_queue_length_time_series_for_rank(t, rank) for rank in ranks}
return dict(filter(lambda x: x[1] is not None, result.items()))

@classmethod
def get_queue_length_summary(
cls,
t: "Trace",
ranks: Optional[List[int]] = None,
) -> pd.DataFrame:
) -> Optional[pd.DataFrame]:
"""
Returns a dataframe with queue length statistics per CUDA stream and rank.
We summarize queue length per stream and rank using-
count, min, max, std-deviation, 25, 50th and 75th percentiles.
The summary uses the pandas describe() function.
Returns an (optional) dataframe with queue length statistics per CUDA stream and rank.
Args:
t (Trace): Input trace data structure.
ranks (list of int): ranks to perform this analysis for.
ranks (list of int): ranks to perform this analysis.
Returns:
Optional[pd.DataFrame]
An (optional) dataframe containing the summary statistics of queue length per
stream and rank.
"""
if ranks is None or len(ranks) == 0:
ranks = [0]
Expand All @@ -125,10 +158,25 @@ def get_queue_length_summary(
rank_df["rank"] = rank
result = rank_df[["rank", "stream", "queue_length"]].groupby(["rank", "stream"]).describe()
results_list.append(result)
return pd.concat(results_list)
return pd.concat(results_list) if len(results_list) > 0 else None

@classmethod
def _get_memory_bw_time_series_for_rank(cls, t: "Trace", rank: int) -> pd.DataFrame:
def _get_memory_bw_time_series_for_rank(cls, t: "Trace", rank: int) -> Optional[pd.DataFrame]:
"""
Returns time series for the memory bandwidth of memory copy and memory set operations
for specified rank.
Args:
t (Trace): Input trace data structure.
rank (int): rank to generate the time series for.
Returns:
Optional[pd.DataFrame]
Returns an (optional) dataframe with time series for the memory bandwidth.
The dataframe returned contains time series with columns:
ts (timestamp), pid (of corresponding GPU), name of memory copy type
and memory_bw_gbps (memory bandwidth in GB/sec).
"""
# get trace for a rank
trace_df: pd.DataFrame = t.get_trace(rank)
sym_table = t.symbol_table.get_sym_table()
Expand All @@ -143,6 +191,10 @@ def _get_memory_bw_time_series_for_rank(cls, t: "Trace", rank: int) -> pd.DataFr
lambda x: get_memory_kernel_type(sym_table[x["name"]]), axis=1
)

# In case of 0 us duration events round it up to 1 us to avoid -ve values
# see https://github.com/facebookresearch/HolisticTraceAnalysis/issues/20
memcpy_kernels.loc[memcpy_kernels.dur == 0, ["dur"]] = 1

membw_time_series_a = memcpy_kernels[["ts", "name", "pid", "memory_bw_gbps"]]
membw_time_series_b = memcpy_kernels[["ts", "name", "dur", "pid", "memory_bw_gbps"]].copy()

Expand All @@ -162,8 +214,11 @@ def _get_memory_bw_time_series_for_rank(cls, t: "Trace", rank: int) -> pd.DataFr
for _, membw_df in membw_time_series.groupby("name"):
membw_df.memory_bw_gbps = membw_df.memory_bw_gbps.cumsum()
result_df_list.append(membw_df)

if len(result_df_list) == 0:
return None

result_df = pd.concat(result_df_list)[["ts", "pid", "name", "memory_bw_gbps"]]
result_df["tid"] = 0
return result_df

@classmethod
Expand All @@ -175,12 +230,16 @@ def get_memory_bw_time_series(
"""
Returns a dictionary of rank -> time series for the memory bandwidth.
The dataframe returned contains time series points with columns
- ts (timestamp), pid (of corresponding GPU), name of memory copy type
and memory_bw_gbps - memory bandwidth in GB/sec
Args:
t (Trace): Input trace data structure.
ranks (list of int): ranks to perform this analysis for.
Returns:
Dict[int, pd.DataFrame]
Returns a dictionary of rank -> time series for the memory bandwidth.
The dataframe returned contains time series along with the following columns:
ts (timestamp), pid (of corresponding GPU), name of memory copy type
and memory_bw_gbps (memory bandwidth in GB/sec).
"""
if ranks is None or len(ranks) == 0:
ranks = [0]
Expand All @@ -190,23 +249,27 @@ def get_memory_bw_time_series(
"when the value changes. Once a values is observed the time series "
"stays constant until the next update."
)
return {rank: TraceCounters._get_memory_bw_time_series_for_rank(t, rank) for rank in ranks}
result = {rank: TraceCounters._get_memory_bw_time_series_for_rank(t, rank) for rank in ranks}
return dict(filter(lambda x: x[1] is not None, result.items()))

@classmethod
def get_memory_bw_summary(
cls,
t: "Trace",
ranks: Optional[List[int]] = None,
) -> pd.DataFrame:
) -> Optional[pd.DataFrame]:
"""
Returns a dataframe with memory copy bandwidth statistic per rank and
memory/memset copy type.
We summarize memory bandwidth by
count, min, max, std-deviation, 25, 50th and 75th percentiles.
The summary uses the pandas describe() function.
Returns an (optional) dataframe containing the summary statistics of memory ops. The
tracked memory ops are MemcpyDtoH, MemcpyHtoD, MemcpyDtoD and MemSet.
Args:
t (Trace): Input trace data structure.
ranks (list of int): ranks to perform this analysis for.
Returns:
Optional[pd.DataFrame]
An (optional) dataframe containing the summary statistics of the following memory ops:
MemcpyDtoH, MemcpyHtoD, MemcpyDtoD, MemSet.
"""
if ranks is None or len(ranks) == 0:
ranks = [0]
Expand All @@ -220,4 +283,4 @@ def get_memory_bw_summary(

result = rank_df[["rank", "name", "memory_bw_gbps"]].groupby(["rank", "name"]).describe()
results_list.append(result)
return pd.concat(results_list)
return pd.concat(results_list) if len(results_list) > 0 else None
4 changes: 2 additions & 2 deletions hta/common/trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -637,10 +637,10 @@ def convert_time_series_to_events(
Returns a list of json events that can be appended to the trace.
"""
required_columns = ["tid", "pid", "ts", counter_col]
required_columns = ["pid", "ts", counter_col]
if not set(required_columns).issubset(series.columns):
logger.warning(
"Time seried dataframe does NOT contain required columns "
"Time series dataframe does NOT contain required columns "
f"{required_columns}, columns contained = {series.columns}"
)
return []
Expand Down
8 changes: 5 additions & 3 deletions hta/trace_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def add_time_series(series_dict: Dict[int, pd.DataFrame], counter_name: str, cou
def get_queue_length_summary(
self,
ranks: Optional[List[int]] = None,
) -> pd.DataFrame:
) -> Optional[pd.DataFrame]:
r"""
Queue length is defined as the number of outstanding CUDA operations on a stream. This
functions calculates the summary statistics for the queue length on each CUDA stream for
Expand All @@ -334,9 +334,10 @@ def get_queue_length_summary(
ranks (List[int]): List of ranks for which the queue length summary is calculated. Default = [0].
Returns:
pd.DataFrame
pd.DataFrame or None
A dataframe summarizing the queue length statistics. The dataframe contains count,
min, max, standard deviation, 25th, 50th and 75th percentiles.
The function returns None when the dataframe is empty.
"""
return TraceCounters.get_queue_length_summary(self.t, ranks)

Expand Down Expand Up @@ -373,9 +374,10 @@ def get_memory_bw_summary(
ranks (List[int]): List of ranks for which memory bandwidth is calculated. Default = [0].
Returns:
pd.DataFrame
pd.DataFrame or None
A dataframe containing the summary statistics. The dataframe includes count, min, max, standard deviation,
25th, 50th and 75th percentiles of memory copy/memset operations.
The function returns None when the dataframe is empty.
"""
return TraceCounters.get_memory_bw_summary(self.t, ranks)

Expand Down
Binary file added tests/data/rank_non_gpu/rank_non_gpu.json.gz
Binary file not shown.
15 changes: 14 additions & 1 deletion tests/test_trace_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ class TraceAnalysisTestCase(unittest.TestCase):
vision_transformer_t: TraceAnalysis
inference_t: TraceAnalysis
df_index_resolver_t: TraceAnalysis
rank_non_gpu_t: TraceAnalysis

@classmethod
def setUpClass(cls):
super(TraceAnalysisTestCase, cls).setUpClass()
vision_transformer_trace_dir: str = "tests/data/vision_transformer"
inference_trace_dir: str = "tests/data/inference_single_rank"
df_index_resolver_trace_dir: str = "tests/data/df_index_resolver"
rank_non_gpu_trace_dir: str = "tests/data/rank_non_gpu/"
cls.vision_transformer_t = TraceAnalysis(trace_dir=vision_transformer_trace_dir)
cls.inference_t = TraceAnalysis(trace_dir=inference_trace_dir)
cls.df_index_resolver_t = TraceAnalysis(trace_dir=df_index_resolver_trace_dir)
cls.rank_non_gpu_t = TraceAnalysis(trace_dir=rank_non_gpu_trace_dir)

def setUp(self):
self.overlaid_trace_dir = "tests/data"
Expand Down Expand Up @@ -203,7 +206,7 @@ def test_generate_trace_with_counters(self, mock_write_trace):

counter_events = [ev for ev in trace_json["traceEvents"] if ev["ph"] == PHASE_COUNTER]
print(f"Trace has {len(counter_events)} counter events")
self.assertGreaterEqual(len(counter_events), 23000)
self.assertGreaterEqual(len(counter_events), 21000)

counter_names = {ev["name"] for ev in counter_events}
self.assertEqual(
Expand All @@ -225,6 +228,16 @@ def test_generate_trace_with_counters(self, mock_write_trace):
# 2 ranks x 6 streams
self.assertEqual(len(queue_len_summary_df), 12)

# Test traces without GPU kernels, these should return empty dicts or dataframes
queue_len_ts_dict = self.rank_non_gpu_t.get_queue_length_time_series()
self.assertEqual(len(queue_len_ts_dict), 0)

queue_len_summary_df = self.rank_non_gpu_t.get_queue_length_summary(ranks=[0])
self.assertIsNone(queue_len_summary_df)

mem_bw_summary_df = self.rank_non_gpu_t.get_memory_bw_summary(ranks=[0])
self.assertIsNone(mem_bw_summary_df)

def test_get_idle_time_breakdown(self):
(idle_time_df, idle_interval_df,) = self.vision_transformer_t.get_idle_time_breakdown(
ranks=[0, 1], visualize=False, show_idle_interval_stats=True
Expand Down

0 comments on commit b84eef8

Please sign in to comment.