Skip to content

Commit

Permalink
Reorder CSM o11y test functions to place the main test function near the top
Browse files Browse the repository at this point in the history
  • Loading branch information
stanley-cheung committed Feb 2, 2024
1 parent a776e29 commit a12d86d
Showing 1 changed file with 103 additions and 103 deletions.
206 changes: 103 additions & 103 deletions tests/gamma/csm_observability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,109 +152,6 @@ def setUpClass(cls):
super().setUpClass()
cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3")

# These parameters pertain to the test as a whole rather than to an
# individual run(), so they are passed when the runner is initialized.
def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner:
    """Create the client runner with this test's CSM names applied.

    Args:
        **kwargs: Extra keyword arguments forwarded to the parent
            implementation. The CSM workload name and canonical service
            name set here take precedence over same-named entries.

    Returns:
        Whatever the parent ``initKubernetesClientRunner`` returns.
    """
    # Forward caller-supplied kwargs instead of dropping them; the CSM
    # names are listed last so they always win, as they did before.
    runner_kwargs = {
        **kwargs,
        "csm_workload_name": CSM_WORKLOAD_NAME_CLIENT,
        "csm_canonical_service_name": CSM_CANONICAL_SERVICE_NAME_CLIENT,
    }
    return super().initKubernetesClientRunner(**runner_kwargs)

# These parameters pertain to the test as a whole rather than to an
# individual run(), so they are passed when the runner is initialized.
def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner:
    """Create the server runner with this test's CSM names applied.

    Args:
        **kwargs: Extra keyword arguments forwarded to the parent
            implementation. The CSM workload name and canonical service
            name set here take precedence over same-named entries.

    Returns:
        Whatever the parent ``initKubernetesServerRunner`` returns.
    """
    # Forward caller-supplied kwargs instead of dropping them; the CSM
    # names are listed last so they always win, as they did before.
    runner_kwargs = {
        **kwargs,
        "csm_workload_name": CSM_WORKLOAD_NAME_SERVER,
        "csm_canonical_service_name": CSM_CANONICAL_SERVICE_NAME_SERVER,
    }
    return super().initKubernetesServerRunner(**runner_kwargs)

def assertAtLeastOnePointWithinRange(
    self,
    points: list[monitoring_v3.types.Point],
    ref_bytes: int,
    tolerance: float = 0.05,
):
    """Fail the test unless some point's mean is close to ``ref_bytes``.

    Args:
        points: Data points to inspect; each point's
            ``value.distribution_value.mean`` is compared.
        ref_bytes: The reference size the mean is compared against.
        tolerance: Allowed relative deviation from ``ref_bytes``
            (``0.05`` means within 5%).

    The bounds are inclusive, so an exact match still passes when
    ``tolerance`` or ``ref_bytes`` is zero. If no point qualifies
    (including when ``points`` is empty), ``self.fail`` is called.
    """
    # The bounds do not depend on the point, so compute them once. Sorting
    # keeps the interval well-formed even for a negative reference value.
    lower, upper = sorted(
        (ref_bytes * (1 - tolerance), ref_bytes * (1 + tolerance))
    )
    if any(
        lower <= point.value.distribution_value.mean <= upper
        for point in points
    ):
        return
    self.fail(
        f"No data point with {ref_bytes}±{tolerance*100}% bytes found"
    )

@classmethod
def build_histogram_query(cls, metric_type: str) -> str:
    """Compose the list_time_series filter for a histogram metric.

    The list_time_series API is queried one metric at a time, so the
    filter names a single ``metric_type``.
    """
    clauses = (
        f'metric.type = "{metric_type}"',
        # Some data points were logged with grpc_status "UNAVAILABLE"
        # while the client/server were still establishing connections;
        # keep only the "OK" ones.
        'metric.labels.grpc_status = "OK"',
        # The server metrics also cover Channelz requests, so restrict
        # the results to the gRPC method under test.
        f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"',
    )
    return " AND ".join(clauses)

@classmethod
def build_counter_query(cls, metric_type: str) -> str:
    """Compose the list_time_series filter for a started-RPCs counter.

    These counter metrics carry no 'grpc_status' label, so only the
    metric type and the gRPC method are matched.
    """
    type_clause = f'metric.type = "{metric_type}"'
    method_clause = f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
    return f"{type_clause} AND {method_clause}"

def query_metrics(
    self,
    metric_names: Iterable[str],
    build_query_fn: BuildQueryFn,
    interval: monitoring_v3.TimeInterval,
) -> dict[str, MetricTimeSeries]:
    """Fetch one time series per metric from the Cloud Monitoring API.

    Args:
        metric_names: Metric types to query, one API call each.
        build_query_fn: Callable that turns a metric type into the
            filter string passed to list_time_series.
        interval: Time interval passed to list_time_series.

    Returns:
        A dict mapping each metric name to the MetricTimeSeries built
        from the single time series its query returned.

    The test fails (via assertLen) if any query does not return exactly
    one time series.
    """
    collected = {}
    for metric in metric_names:
        logger.info("Requesting list_time_series for metric %s", metric)
        series_iter = self.metric_client.list_time_series(
            name=f"projects/{self.project}",
            filter=build_query_fn(metric),
            interval=interval,
            view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        )
        # Materialize the response so it can be counted and indexed.
        matches = [*series_iter]

        self.assertLen(
            matches,
            1,
            msg=(
                f"Query for {metric} should return exactly 1 time series."
                f" Found {len(matches)}."
            ),
        )

        parsed = MetricTimeSeries.from_response(metric, matches[0])
        logger.info("Metric %s:\n%s", metric, parsed.pretty_print())
        collected[metric] = parsed
    return collected

def test_csm_observability(self):
# TODO(sergiitk): [GAMMA] Consider moving out custom gamma
# resource creation out of self.startTestServers()
Expand Down Expand Up @@ -446,6 +343,109 @@ def test_csm_observability(self):
all_results[metric].points, RESPONSE_PAYLOAD_SIZE
)

# These parameters pertain to the test as a whole rather than to an
# individual run(), so they are passed when the runner is initialized.
def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner:
    """Create the client runner with this test's CSM names applied.

    Args:
        **kwargs: Extra keyword arguments forwarded to the parent
            implementation. The CSM workload name and canonical service
            name set here take precedence over same-named entries.

    Returns:
        Whatever the parent ``initKubernetesClientRunner`` returns.
    """
    # Forward caller-supplied kwargs instead of dropping them; the CSM
    # names are listed last so they always win, as they did before.
    runner_kwargs = {
        **kwargs,
        "csm_workload_name": CSM_WORKLOAD_NAME_CLIENT,
        "csm_canonical_service_name": CSM_CANONICAL_SERVICE_NAME_CLIENT,
    }
    return super().initKubernetesClientRunner(**runner_kwargs)

# These parameters pertain to the test as a whole rather than to an
# individual run(), so they are passed when the runner is initialized.
def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner:
    """Create the server runner with this test's CSM names applied.

    Args:
        **kwargs: Extra keyword arguments forwarded to the parent
            implementation. The CSM workload name and canonical service
            name set here take precedence over same-named entries.

    Returns:
        Whatever the parent ``initKubernetesServerRunner`` returns.
    """
    # Forward caller-supplied kwargs instead of dropping them; the CSM
    # names are listed last so they always win, as they did before.
    runner_kwargs = {
        **kwargs,
        "csm_workload_name": CSM_WORKLOAD_NAME_SERVER,
        "csm_canonical_service_name": CSM_CANONICAL_SERVICE_NAME_SERVER,
    }
    return super().initKubernetesServerRunner(**runner_kwargs)

@classmethod
def build_histogram_query(cls, metric_type: str) -> str:
    """Compose the list_time_series filter for a histogram metric.

    The list_time_series API is queried one metric at a time, so the
    filter names a single ``metric_type``.
    """
    clauses = (
        f'metric.type = "{metric_type}"',
        # Some data points were logged with grpc_status "UNAVAILABLE"
        # while the client/server were still establishing connections;
        # keep only the "OK" ones.
        'metric.labels.grpc_status = "OK"',
        # The server metrics also cover Channelz requests, so restrict
        # the results to the gRPC method under test.
        f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"',
    )
    return " AND ".join(clauses)

@classmethod
def build_counter_query(cls, metric_type: str) -> str:
    """Compose the list_time_series filter for a started-RPCs counter.

    These counter metrics carry no 'grpc_status' label, so only the
    metric type and the gRPC method are matched.
    """
    type_clause = f'metric.type = "{metric_type}"'
    method_clause = f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
    return f"{type_clause} AND {method_clause}"

def query_metrics(
    self,
    metric_names: Iterable[str],
    build_query_fn: BuildQueryFn,
    interval: monitoring_v3.TimeInterval,
) -> dict[str, MetricTimeSeries]:
    """Fetch one time series per metric from the Cloud Monitoring API.

    Args:
        metric_names: Metric types to query, one API call each.
        build_query_fn: Callable that turns a metric type into the
            filter string passed to list_time_series.
        interval: Time interval passed to list_time_series.

    Returns:
        A dict mapping each metric name to the MetricTimeSeries built
        from the single time series its query returned.

    The test fails (via assertLen) if any query does not return exactly
    one time series.
    """
    collected = {}
    for metric in metric_names:
        logger.info("Requesting list_time_series for metric %s", metric)
        series_iter = self.metric_client.list_time_series(
            name=f"projects/{self.project}",
            filter=build_query_fn(metric),
            interval=interval,
            view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
        )
        # Materialize the response so it can be counted and indexed.
        matches = [*series_iter]

        self.assertLen(
            matches,
            1,
            msg=(
                f"Query for {metric} should return exactly 1 time series."
                f" Found {len(matches)}."
            ),
        )

        parsed = MetricTimeSeries.from_response(metric, matches[0])
        logger.info("Metric %s:\n%s", metric, parsed.pretty_print())
        collected[metric] = parsed
    return collected

def assertAtLeastOnePointWithinRange(
    self,
    points: list[monitoring_v3.types.Point],
    ref_bytes: int,
    tolerance: float = 0.05,
):
    """Fail the test unless some point's mean is close to ``ref_bytes``.

    Args:
        points: Data points to inspect; each point's
            ``value.distribution_value.mean`` is compared.
        ref_bytes: The reference size the mean is compared against.
        tolerance: Allowed relative deviation from ``ref_bytes``
            (``0.05`` means within 5%).

    The bounds are inclusive, so an exact match still passes when
    ``tolerance`` or ``ref_bytes`` is zero. If no point qualifies
    (including when ``points`` is empty), ``self.fail`` is called.
    """
    # The bounds do not depend on the point, so compute them once. Sorting
    # keeps the interval well-formed even for a negative reference value.
    lower, upper = sorted(
        (ref_bytes * (1 - tolerance), ref_bytes * (1 + tolerance))
    )
    if any(
        lower <= point.value.distribution_value.mean <= upper
        for point in points
    ):
        return
    self.fail(
        f"No data point with {ref_bytes}±{tolerance*100}% bytes found"
    )


# Run the tests through absltest when this file is executed directly.
if __name__ == "__main__":
    absltest.main()

0 comments on commit a12d86d

Please sign in to comment.