diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py
index 9db3c089..24eb0d37 100644
--- a/tests/gamma/csm_observability_test.py
+++ b/tests/gamma/csm_observability_test.py
@@ -168,93 +168,6 @@ def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner:
             csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER,
         )

-    def assertAtLeastOnePointWithinRange(
-        self,
-        points: list[monitoring_v3.types.Point],
-        ref_bytes: int,
-        tolerance: float = 0.05,
-    ):
-        """
-        A helper function to check whether at least one of the "points" whose
-        mean should be within X% of ref_bytes.
-        """
-        for point in points:
-            if (
-                ref_bytes * (1 - tolerance)
-                < point.value.distribution_value.mean
-                < ref_bytes * (1 + tolerance)
-            ):
-                return
-        self.fail(
-            f"No data point with {ref_bytes}±{tolerance*100}% bytes found"
-        )
-
-    @classmethod
-    def build_histogram_query(cls, metric_type: str) -> str:
-        #
-        # The list_time_series API requires us to query one metric
-        # at a time.
-        #
-        # The 'grpc_status = "OK"' filter condition is needed because
-        # some time series data points were logged when the grpc_status
-        # was "UNAVAILABLE" when the client/server were establishing
-        # connections.
-        #
-        # The 'grpc_method' filter condition is needed because the
-        # server metrics are also serving on the Channelz requests.
-        #
-        return (
-            f'metric.type = "{metric_type}" AND '
-            'metric.labels.grpc_status = "OK" AND '
-            f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
-        )
-
-    @classmethod
-    def build_counter_query(cls, metric_type: str) -> str:
-        # For these num rpcs started counter metrics, they do not have the
-        # 'grpc_status' label
-        return (
-            f'metric.type = "{metric_type}" AND '
-            f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
-        )
-
-    def query_metrics(
-        self,
-        metric_names: Iterable[str],
-        build_query_fn: BuildQueryFn,
-        interval: monitoring_v3.TimeInterval,
-    ) -> dict[str, MetricTimeSeries]:
-        """
-        A helper function to make the cloud monitoring API call to query
-        metrics created by this test run.
-        """
-        results = {}
-        for metric in metric_names:
-            logger.info("Requesting list_time_series for metric %s", metric)
-            response = self.metric_client.list_time_series(
-                name=f"projects/{self.project}",
-                filter=build_query_fn(metric),
-                interval=interval,
-                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
-            )
-            time_series = list(response)
-
-            self.assertLen(
-                time_series,
-                1,
-                msg=f"Query for {metric} should return exactly 1 time series."
-                f" Found {len(time_series)}.",
-            )
-
-            metric_time_series = MetricTimeSeries.from_response(
-                metric, time_series[0]
-            )
-            logger.info(
-                "Metric %s:\n%s", metric, metric_time_series.pretty_print()
-            )
-            results[metric] = metric_time_series
-        return results
-
     def test_csm_observability(self):
         # TODO(sergiitk): [GAMMA] Consider moving out custom gamma
         #  resource creation out of self.startTestServers()
@@ -446,6 +359,93 @@ def test_csm_observability(self):
                 all_results[metric].points, RESPONSE_PAYLOAD_SIZE
             )

+    @classmethod
+    def build_histogram_query(cls, metric_type: str) -> str:
+        #
+        # The list_time_series API requires us to query one metric
+        # at a time.
+        #
+        # The 'grpc_status = "OK"' filter condition is needed because
+        # some time series data points were logged when the grpc_status
+        # was "UNAVAILABLE" when the client/server were establishing
+        # connections.
+        #
+        # The 'grpc_method' filter condition is needed because the
+        # server metrics are also serving on the Channelz requests.
+        #
+        return (
+            f'metric.type = "{metric_type}" AND '
+            'metric.labels.grpc_status = "OK" AND '
+            f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
+        )
+
+    @classmethod
+    def build_counter_query(cls, metric_type: str) -> str:
+        # For these num rpcs started counter metrics, they do not have the
+        # 'grpc_status' label
+        return (
+            f'metric.type = "{metric_type}" AND '
+            f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
+        )
+
+    def query_metrics(
+        self,
+        metric_names: Iterable[str],
+        build_query_fn: BuildQueryFn,
+        interval: monitoring_v3.TimeInterval,
+    ) -> dict[str, MetricTimeSeries]:
+        """
+        A helper function to make the cloud monitoring API call to query
+        metrics created by this test run.
+        """
+        results = {}
+        for metric in metric_names:
+            logger.info("Requesting list_time_series for metric %s", metric)
+            response = self.metric_client.list_time_series(
+                name=f"projects/{self.project}",
+                filter=build_query_fn(metric),
+                interval=interval,
+                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+            )
+            time_series = list(response)
+
+            self.assertLen(
+                time_series,
+                1,
+                msg=f"Query for {metric} should return exactly 1 time series."
+                f" Found {len(time_series)}.",
+            )
+
+            metric_time_series = MetricTimeSeries.from_response(
+                metric, time_series[0]
+            )
+            logger.info(
+                "Metric %s:\n%s", metric, metric_time_series.pretty_print()
+            )
+            results[metric] = metric_time_series
+        return results
+
+    def assertAtLeastOnePointWithinRange(
+        self,
+        points: list[monitoring_v3.types.Point],
+        ref_bytes: int,
+        tolerance: float = 0.05,
+    ):
+        """
+        A helper function to check whether at least one of the "points" whose
+        mean should be within X% of ref_bytes.
+        """
+        for point in points:
+            if (
+                ref_bytes * (1 - tolerance)
+                < point.value.distribution_value.mean
+                < ref_bytes * (1 + tolerance)
+            ):
+                return
+        self.fail(
+            f"No data point with {ref_bytes}±{tolerance*100}% bytes found"
+        )
+

 if __name__ == "__main__":
     absltest.main()
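Note (not part of the patch): a minimal standalone sketch of the Cloud Monitoring filter strings that the relocated build_histogram_query and build_counter_query helpers produce. The GRPC_METHOD_NAME and metric types below are hypothetical placeholders, not values taken from this test suite.

# Standalone sketch of the filter-string format used in the diff above.
# The method name and metric types are assumed examples for illustration only.
GRPC_METHOD_NAME = "grpc.testing.TestService/UnaryCall"  # assumed placeholder


def build_histogram_query(metric_type: str) -> str:
    # Histogram queries keep only OK statuses and the test RPC method.
    return (
        f'metric.type = "{metric_type}" AND '
        'metric.labels.grpc_status = "OK" AND '
        f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
    )


def build_counter_query(metric_type: str) -> str:
    # Counter metrics carry no grpc_status label, so only the method filter applies.
    return (
        f'metric.type = "{metric_type}" AND '
        f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
    )


if __name__ == "__main__":
    # Prints both filter variants for made-up metric types.
    print(build_histogram_query("example.googleapis.com/fake_histogram_metric"))
    print(build_counter_query("example.googleapis.com/fake_counter_metric"))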