From 77a20d3fc6ef4b96f910dad61b7c3cb2213d6bbf Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 1 Feb 2024 17:02:45 -0800 Subject: [PATCH] Add assertions to CSM Observability Test (#23) This PR adds some assertions to test whether the metrics generated by the CSM Observability Test match what we expect. We use the `list_time_series` cloud monitoring API to query metrics we just generated by the test run, and then verify whether the metrics data (label keys and values, and data points) is what we expected. --------- Co-authored-by: Sergii Tkachenko --- .../runners/k8s/gamma_server_runner.py | 27 ++ .../runners/k8s/k8s_xds_client_runner.py | 8 + framework/xds_gamma_testcase.py | 3 +- framework/xds_k8s_testcase.py | 16 +- kubernetes-manifests/client.deployment.yaml | 20 + kubernetes-manifests/server.deployment.yaml | 20 + tests/gamma/csm_observability_test.py | 401 +++++++++++++++++- 7 files changed, 471 insertions(+), 24 deletions(-) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index 4a547152..e9a3ea20 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -39,9 +39,13 @@ class GammaServerRunner(KubernetesServerRunner): be_policy: Optional[k8s.GcpBackendPolicy] = None termination_grace_period_seconds: Optional[int] = None pre_stop_hook: bool = False + pod_monitoring: Optional[k8s.PodMonitoring] = None + pod_monitoring_name: Optional[str] = None route_name: str frontend_service_name: str + csm_workload_name: str + csm_canonical_service_name: str def __init__( self, @@ -73,6 +77,8 @@ def __init__( bepolicy_name: str = "backend-policy", termination_grace_period_seconds: int = 0, pre_stop_hook: bool = False, + csm_workload_name: str = "", + csm_canonical_service_name: str = "", ): # pylint: disable=too-many-locals super().__init__( @@ -105,6 +111,8 @@ def __init__( self.bepolicy_name = bepolicy_name self.termination_grace_period_seconds = termination_grace_period_seconds self.pre_stop_hook = pre_stop_hook + self.csm_workload_name = csm_workload_name + self.csm_canonical_service_name = csm_canonical_service_name def run( # pylint: disable=arguments-differ self, @@ -204,8 +212,21 @@ def run( # pylint: disable=arguments-differ pre_stop_hook=self.pre_stop_hook, enable_csm_observability=enable_csm_observability, generate_mesh_id=generate_mesh_id, + csm_workload_name=self.csm_workload_name, + csm_canonical_service_name=self.csm_canonical_service_name, ) + # Create a PodMonitoring resource if CSM Observability is enabled + # This is GMP (Google Managed Prometheus) + if enable_csm_observability: + self.pod_monitoring_name = f"{self.deployment_id}-gmp" + self.pod_monitoring = self._create_pod_monitoring( + "csm/pod-monitoring.yaml", + namespace_name=self.k8s_namespace.name, + deployment_id=self.deployment_id, + pod_monitoring_name=self.pod_monitoring_name, + ) + servers = self._make_servers_for_deployment( replica_count, test_port=test_port, @@ -292,6 +313,12 @@ def cleanup(self, *, force=False, force_namespace=False): self._delete_service_account(self.service_account_name) self.service_account = None + # Pod monitoring name is only set when CSM observability is enabled. 
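For reference, a GMP PodMonitoring object is just a `monitoring.googleapis.com/v1` custom resource. A minimal standalone sketch of applying one with the official Kubernetes Python client is below; the resource name, label selector, and scrape port are illustrative assumptions, not the contents of the framework's `csm/pod-monitoring.yaml` template.

```python
# Illustrative sketch only: the framework's _create_pod_monitoring() renders
# csm/pod-monitoring.yaml; the resource body below (name, selector, port) is
# an assumption for demonstration, not that template.
from kubernetes import client, config

config.load_kube_config()
custom_api = client.CustomObjectsApi()

pod_monitoring = {
    "apiVersion": "monitoring.googleapis.com/v1",
    "kind": "PodMonitoring",
    "metadata": {"name": "psm-grpc-server-gmp"},
    "spec": {
        # Scrape pods of one test deployment on the Prometheus exporter port.
        "selector": {"matchLabels": {"deployment_id": "psm-grpc-server"}},
        "endpoints": [{"port": 9464, "interval": "10s"}],
    },
}

custom_api.create_namespaced_custom_object(
    group="monitoring.googleapis.com",
    version="v1",
    namespace="psm-grpc-server",
    plural="podmonitorings",
    body=pod_monitoring,
)
```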
+ if self.pod_monitoring_name and (self.pod_monitoring or force): + self._delete_pod_monitoring(self.pod_monitoring_name) + self.pod_monitoring = None + self.pod_monitoring_name = None + self._cleanup_namespace(force=(force_namespace and force)) finally: self._stop() diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index def640f1..ead7fd79 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -34,6 +34,8 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): debug_use_port_forwarding: bool td_bootstrap_image: str network: str + csm_workload_name: str + csm_canonical_service_name: str # Optional fields. service_account_name: Optional[str] = None @@ -62,6 +64,8 @@ def __init__( # pylint: disable=too-many-locals namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, + csm_workload_name: str = "", + csm_canonical_service_name: str = "", ): super().__init__( k8s_namespace, @@ -79,6 +83,8 @@ def __init__( # pylint: disable=too-many-locals self.deployment_template = deployment_template self.enable_workload_identity = enable_workload_identity self.debug_use_port_forwarding = debug_use_port_forwarding + self.csm_workload_name = csm_workload_name + self.csm_canonical_service_name = csm_canonical_service_name # Used by the TD bootstrap generator. self.td_bootstrap_image = td_bootstrap_image @@ -166,6 +172,8 @@ def run( # pylint: disable=arguments-differ generate_mesh_id=generate_mesh_id, print_response=print_response, enable_csm_observability=enable_csm_observability, + csm_workload_name=self.csm_workload_name, + csm_canonical_service_name=self.csm_canonical_service_name, ) # Create a PodMonitoring resource if CSM Observability is enabled diff --git a/framework/xds_gamma_testcase.py b/framework/xds_gamma_testcase.py index b535b9ff..fc8d9273 100644 --- a/framework/xds_gamma_testcase.py +++ b/framework/xds_gamma_testcase.py @@ -94,7 +94,7 @@ def initTrafficDirectorManager( compute_api_version=self.compute_api_version, ) - def initKubernetesServerRunner(self) -> GammaServerRunner: + def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner: return GammaServerRunner( k8s.KubernetesNamespace( self.k8s_api_manager, self.server_namespace @@ -112,6 +112,7 @@ def initKubernetesServerRunner(self) -> GammaServerRunner: enable_workload_identity=self.enable_workload_identity, termination_grace_period_seconds=self.termination_grace_period_seconds, pre_stop_hook=self.pre_stop_hook, + **kwargs, ) def startTestClient( diff --git a/framework/xds_k8s_testcase.py b/framework/xds_k8s_testcase.py index 00fc125e..57389a91 100644 --- a/framework/xds_k8s_testcase.py +++ b/framework/xds_k8s_testcase.py @@ -683,11 +683,11 @@ def initTrafficDirectorManager(self) -> TrafficDirectorManager: raise NotImplementedError @abc.abstractmethod - def initKubernetesServerRunner(self) -> KubernetesServerRunner: + def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: raise NotImplementedError @abc.abstractmethod - def initKubernetesClientRunner(self) -> KubernetesClientRunner: + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: raise NotImplementedError def tearDown(self): @@ -795,7 +795,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorManager: compute_api_version=self.compute_api_version, ) - def initKubernetesServerRunner(self) -> 
KubernetesServerRunner: + def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: return KubernetesServerRunner( k8s.KubernetesNamespace( self.k8s_api_manager, self.server_namespace @@ -810,9 +810,10 @@ def initKubernetesServerRunner(self) -> KubernetesServerRunner: network=self.network, debug_use_port_forwarding=self.debug_use_port_forwarding, enable_workload_identity=self.enable_workload_identity, + **kwargs, ) - def initKubernetesClientRunner(self) -> KubernetesClientRunner: + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: return KubernetesClientRunner( k8s.KubernetesNamespace( self.k8s_api_manager, self.client_namespace @@ -829,6 +830,7 @@ def initKubernetesClientRunner(self) -> KubernetesClientRunner: enable_workload_identity=self.enable_workload_identity, stats_port=self.client_port, reuse_namespace=self.server_namespace == self.client_namespace, + **kwargs, ) def startTestServers( @@ -904,7 +906,7 @@ def initTrafficDirectorManager(self) -> TrafficDirectorSecureManager: compute_api_version=self.compute_api_version, ) - def initKubernetesServerRunner(self) -> KubernetesServerRunner: + def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: return KubernetesServerRunner( k8s.KubernetesNamespace( self.k8s_api_manager, self.server_namespace @@ -919,9 +921,10 @@ def initKubernetesServerRunner(self) -> KubernetesServerRunner: xds_server_uri=self.xds_server_uri, deployment_template="server-secure.deployment.yaml", debug_use_port_forwarding=self.debug_use_port_forwarding, + **kwargs, ) - def initKubernetesClientRunner(self) -> KubernetesClientRunner: + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: return KubernetesClientRunner( k8s.KubernetesNamespace( self.k8s_api_manager, self.client_namespace @@ -938,6 +941,7 @@ def initKubernetesClientRunner(self) -> KubernetesClientRunner: stats_port=self.client_port, reuse_namespace=self.server_namespace == self.client_namespace, debug_use_port_forwarding=self.debug_use_port_forwarding, + **kwargs, ) def startSecureTestServer(self, replica_count=1, **kwargs) -> XdsTestServer: diff --git a/kubernetes-manifests/client.deployment.yaml b/kubernetes-manifests/client.deployment.yaml index 1e7eeff7..2709722c 100644 --- a/kubernetes-manifests/client.deployment.yaml +++ b/kubernetes-manifests/client.deployment.yaml @@ -66,6 +66,26 @@ spec: value: "true" - name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST value: "true" + % if csm_workload_name: + - name: CSM_WORKLOAD_NAME + value: ${csm_workload_name} + % endif + % if csm_canonical_service_name: + - name: CSM_CANONICAL_SERVICE_NAME + value: ${csm_canonical_service_name} + % endif + % if enable_csm_observability: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OTEL_RESOURCE_ATTRIBUTES + value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME) + % endif volumeMounts: - mountPath: /tmp/grpc-xds/ name: grpc-td-conf diff --git a/kubernetes-manifests/server.deployment.yaml b/kubernetes-manifests/server.deployment.yaml index 4389c70b..8c755fd0 100644 --- a/kubernetes-manifests/server.deployment.yaml +++ b/kubernetes-manifests/server.deployment.yaml @@ -52,6 +52,26 @@ spec: value: "true" - name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST value: "true" + % if csm_workload_name: + - name: CSM_WORKLOAD_NAME + value: ${csm_workload_name} + % endif + % if csm_canonical_service_name: + - name: 
CSM_CANONICAL_SERVICE_NAME + value: ${csm_canonical_service_name} + % endif + % if enable_csm_observability: + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: OTEL_RESOURCE_ATTRIBUTES + value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME) + % endif volumeMounts: - mountPath: /tmp/grpc-xds/ name: grpc-td-conf diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index 55523d71..9db3c089 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -11,24 +11,131 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import dataclasses import logging +import time +from typing import Callable, Iterable +import unittest.mock from absl import flags from absl.testing import absltest -from google.api_core import exceptions as gapi_errors from google.cloud import monitoring_v3 +import yaml from framework import xds_gamma_testcase from framework import xds_k8s_testcase from framework.helpers import skips +from framework.test_app.runners.k8s import gamma_server_runner +from framework.test_app.runners.k8s import k8s_xds_client_runner logger = logging.getLogger(__name__) flags.adopt_module_key_flags(xds_k8s_testcase) +# Type aliases _XdsTestServer = xds_k8s_testcase.XdsTestServer _XdsTestClient = xds_k8s_testcase.XdsTestClient _Lang = skips.Lang +# Testing consts +TEST_RUN_SECS = 90 +REQUEST_PAYLOAD_SIZE = 271828 +RESPONSE_PAYLOAD_SIZE = 314159 +GRPC_METHOD_NAME = "grpc.testing.TestService/UnaryCall" +CSM_WORKLOAD_NAME_SERVER = "csm_workload_name_from_server" +CSM_WORKLOAD_NAME_CLIENT = "csm_workload_name_from_client" +CSM_CANONICAL_SERVICE_NAME_SERVER = "csm_canonical_service_name_from_server" +CSM_CANONICAL_SERVICE_NAME_CLIENT = "csm_canonical_service_name_from_client" +PROMETHEUS_HOST = "prometheus.googleapis.com" +METRIC_CLIENT_ATTEMPT_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_duration_seconds/histogram" +) +METRIC_CLIENT_ATTEMPT_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_started_total/counter" +) +METRIC_SERVER_CALL_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_server_call_duration_seconds/histogram" +) +METRIC_SERVER_CALL_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_server_call_started_total/counter" +) +HISTOGRAM_CLIENT_METRICS = ( + METRIC_CLIENT_ATTEMPT_SENT, + METRIC_CLIENT_ATTEMPT_RCVD, + METRIC_CLIENT_ATTEMPT_DURATION, +) +HISTOGRAM_SERVER_METRICS = ( + METRIC_SERVER_CALL_DURATION, + METRIC_SERVER_CALL_RCVD, + METRIC_SERVER_CALL_SENT, +) +COUNTER_CLIENT_METRICS = (METRIC_CLIENT_ATTEMPT_STARTED,) +COUNTER_SERVER_METRICS = (METRIC_SERVER_CALL_STARTED,) +HISTOGRAM_METRICS = HISTOGRAM_CLIENT_METRICS + HISTOGRAM_SERVER_METRICS +COUNTER_METRICS = COUNTER_CLIENT_METRICS + COUNTER_SERVER_METRICS +CLIENT_METRICS = HISTOGRAM_CLIENT_METRICS + 
COUNTER_CLIENT_METRICS +SERVER_METRICS = HISTOGRAM_SERVER_METRICS + COUNTER_SERVER_METRICS +ALL_METRICS = HISTOGRAM_METRICS + COUNTER_METRICS + +GammaServerRunner = gamma_server_runner.GammaServerRunner +KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +BuildQueryFn = Callable[[str], str] +ANY = unittest.mock.ANY + + +@dataclasses.dataclass(eq=False) +class MetricTimeSeries: + """ + This class represents one TimeSeries object + from monitoring_v3.ListTimeSeriesResponse. + """ + + # the metric name + name: str + # each time series has a monitored resource + resource_type: str + # each time series has a set of metric labels + metric_labels: dict[str, str] + # each time series has a set of monitored resource labels + resource_labels: dict[str, str] + # each time series has a set of data points + points: list[monitoring_v3.types.Point] + + @classmethod + def from_response( + cls, + name: str, + response: monitoring_v3.types.TimeSeries, + ) -> "MetricTimeSeries": + return cls( + name=name, + resource_type=response.resource.type, + metric_labels=dict(sorted(response.metric.labels.items())), + resource_labels=dict(sorted(response.resource.labels.items())), + points=list(response.points), + ) + + def pretty_print(self) -> str: + metric = dataclasses.asdict(self) + # too much noise to print all data points from a time series + metric.pop("points") + return yaml.dump(metric, sort_keys=False) + class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): metric_client: monitoring_v3.MetricServiceClient @@ -45,39 +152,299 @@ def setUpClass(cls): super().setUpClass() cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3") + # These parameters are more pertaining to the test itself, not to + # each run(). + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: + return super().initKubernetesClientRunner( + csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, + ) + + # These parameters are more pertaining to the test itself, not to + # each run(). + def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner: + return super().initKubernetesServerRunner( + csm_workload_name=CSM_WORKLOAD_NAME_SERVER, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER, + ) + + def assertAtLeastOnePointWithinRange( + self, + points: list[monitoring_v3.types.Point], + ref_bytes: int, + tolerance: float = 0.05, + ): + """ + A helper function to check whether at least one of the "points" whose + mean should be within X% of ref_bytes. + """ + for point in points: + if ( + ref_bytes * (1 - tolerance) + < point.value.distribution_value.mean + < ref_bytes * (1 + tolerance) + ): + return + self.fail( + f"No data point with {ref_bytes}±{tolerance*100}% bytes found" + ) + + @classmethod + def build_histogram_query(cls, metric_type: str) -> str: + # + # The list_time_series API requires us to query one metric + # at a time. + # + # The 'grpc_status = "OK"' filter condition is needed because + # some time series data points were logged when the grpc_status + # was "UNAVAILABLE" when the client/server were establishing + # connections. + # + # The 'grpc_method' filter condition is needed because the + # server metrics are also serving on the Channelz requests. 
+ # + return ( + f'metric.type = "{metric_type}" AND ' + 'metric.labels.grpc_status = "OK" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"' + ) + + @classmethod + def build_counter_query(cls, metric_type: str) -> str: + # For these num rpcs started counter metrics, they do not have the + # 'grpc_status' label + return ( + f'metric.type = "{metric_type}" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"' + ) + + def query_metrics( + self, + metric_names: Iterable[str], + build_query_fn: BuildQueryFn, + interval: monitoring_v3.TimeInterval, + ) -> dict[str, MetricTimeSeries]: + """ + A helper function to make the cloud monitoring API call to query + metrics created by this test run. + """ + results = {} + for metric in metric_names: + logger.info("Requesting list_time_series for metric %s", metric) + response = self.metric_client.list_time_series( + name=f"projects/{self.project}", + filter=build_query_fn(metric), + interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + ) + time_series = list(response) + + self.assertLen( + time_series, + 1, + msg=f"Query for {metric} should return exactly 1 time series." + f" Found {len(time_series)}.", + ) + + metric_time_series = MetricTimeSeries.from_response( + metric, time_series[0] + ) + logger.info( + "Metric %s:\n%s", metric, metric_time_series.pretty_print() + ) + results[metric] = metric_time_series + return results + def test_csm_observability(self): # TODO(sergiitk): [GAMMA] Consider moving out custom gamma # resource creation out of self.startTestServers() with self.subTest("1_run_test_server"): + start_secs = int(time.time()) test_server: _XdsTestServer = self.startTestServers( - enable_csm_observability=True + enable_csm_observability=True, )[0] with self.subTest("2_start_test_client"): test_client: _XdsTestClient = self.startTestClient( test_server, enable_csm_observability=True, - request_payload_size=271828, - response_payload_size=314159, + request_payload_size=REQUEST_PAYLOAD_SIZE, + response_payload_size=RESPONSE_PAYLOAD_SIZE, ) with self.subTest("3_test_server_received_rpcs_from_test_client"): self.assertSuccessfulRpcs(test_client) - # For now, this just makes a bogus call to ensure metrics client - # connected to the remote API service. 
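`query_metrics()` above is a thin wrapper over the Cloud Monitoring `list_time_series` API. A standalone sketch of the same query is below; the project id and 30-minute lookback window are placeholder assumptions, while the metric type and filter mirror `build_histogram_query()`.

```python
# Minimal sketch of the list_time_series query issued by query_metrics().
import time

from google.cloud import monitoring_v3

PROJECT_ID = "my-gcp-project"  # assumption: placeholder project id
METRIC_TYPE = (
    "prometheus.googleapis.com/grpc_client_attempt_duration_seconds/histogram"
)

end_secs = int(time.time())
start_secs = end_secs - 30 * 60  # assumption: 30-minute lookback

client = monitoring_v3.MetricServiceClient()
response = client.list_time_series(
    name=f"projects/{PROJECT_ID}",
    # Same filter shape as build_histogram_query(): one metric type,
    # OK status only, and only the UnaryCall method.
    filter=(
        f'metric.type = "{METRIC_TYPE}" AND '
        'metric.labels.grpc_status = "OK" AND '
        'metric.labels.grpc_method = "grpc.testing.TestService/UnaryCall"'
    ),
    interval=monitoring_v3.TimeInterval(
        start_time={"seconds": start_secs},
        end_time={"seconds": end_secs},
    ),
    view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
)
for series in response:
    print(series.metric.type, dict(series.metric.labels))
```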
- with self.subTest("4_check_monitoring_metric_client"): - with self.assertRaises(gapi_errors.GoogleAPICallError) as cm: - self.metric_client.list_metric_descriptors( - request=monitoring_v3.ListMetricDescriptorsRequest( - name="whatever", - ) + with self.subTest("4_query_cloud_monitoring_metrics"): + logger.info( + "Letting test client run for %d seconds to produce metric data", + TEST_RUN_SECS, + ) + time.sleep(TEST_RUN_SECS) + end_secs = int(time.time()) + interval = monitoring_v3.TimeInterval( + start_time={"seconds": start_secs}, + end_time={"seconds": end_secs}, + ) + histogram_results = self.query_metrics( + HISTOGRAM_METRICS, self.build_histogram_query, interval + ) + counter_results = self.query_metrics( + COUNTER_METRICS, self.build_counter_query, interval + ) + all_results = {**histogram_results, **counter_results} + self.assertNotEmpty(all_results, msg="No query metrics results") + + with self.subTest("5_check_metrics_time_series"): + for metric in ALL_METRICS: + # Every metric needs to exist in the query results + self.assertIn(metric, all_results) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("6_check_metrics_labels_histogram_client"): + expected_metric_labels = { + "csm_mesh_id": ANY, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_SERVER, + "csm_remote_workload_namespace_name": self.server_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_service_name": self.server_runner.service_name, + "csm_service_namespace_name": self.server_namespace, + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + for metric in HISTOGRAM_CLIENT_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("7_check_metrics_labels_histogram_server"): + expected_metric_labels = { + "csm_mesh_id": ANY, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_CLIENT, + "csm_remote_workload_namespace_name": self.client_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + for metric in HISTOGRAM_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("8_check_metrics_labels_counter_client"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + for metric in COUNTER_CLIENT_METRICS: + actual_metric_labels = 
all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("9_check_metrics_labels_counter_server"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + for metric in COUNTER_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("10_check_client_resource_labels_client"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job + expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.client_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.client_namespace, + "project_id": self.project, + } + for metric in CLIENT_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("11_check_server_resource_labels_server"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job + expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.server_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.server_namespace, + "project_id": self.project, + } + for metric in SERVER_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # This tests whether each of the "bytes sent" histogram type metric + # should have at least 1 data point whose mean should converge to be + # close to the number of bytes being sent by the RPCs. + with self.subTest("12_check_bytes_sent_vs_data_points"): + for metric in (METRIC_CLIENT_ATTEMPT_SENT, METRIC_SERVER_CALL_RCVD): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, REQUEST_PAYLOAD_SIZE + ) + + for metric in (METRIC_CLIENT_ATTEMPT_RCVD, METRIC_SERVER_CALL_SENT): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, RESPONSE_PAYLOAD_SIZE ) - err = cm.exception - self.assertIsInstance(err, gapi_errors.InvalidArgument) - self.assertIsNotNone(err.grpc_status_code) - self.assertStartsWith(err.message, "Name must begin with") - self.assertEndsWith(err.message, " got: whatever") if __name__ == "__main__":
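Subtest 12 relies on the "bytes sent/received" histogram means converging to the fixed request and response payload sizes. A minimal standalone sketch of that ±5% tolerance check is below, with plain floats standing in for `point.value.distribution_value.mean` on real `monitoring_v3` points.

```python
# Standalone sketch of the ±5% mean check in subtest 12. Plain floats stand
# in for point.value.distribution_value.mean on real monitoring_v3 points.
REQUEST_PAYLOAD_SIZE = 271828  # bytes sent per UnaryCall request


def has_point_within_range(means, ref_bytes, tolerance=0.05):
    """True if any observed mean falls inside ref_bytes * (1 +/- tolerance)."""
    lower = ref_bytes * (1 - tolerance)  # 258_236.6 for 271_828
    upper = ref_bytes * (1 + tolerance)  # 285_419.4 for 271_828
    return any(lower < mean < upper for mean in means)


# Early points may be dominated by small warm-up RPCs; a single later point
# near the configured payload size is enough for the assertion to pass.
assert has_point_within_range([1024.0, 265000.0], REQUEST_PAYLOAD_SIZE)
assert not has_point_within_range([1024.0], REQUEST_PAYLOAD_SIZE)
```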