From 81b567ee9ffe7cebc1005700d593a65b5ef6dc5f Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 7 Feb 2024 08:11:35 +0000 Subject: [PATCH 01/16] [CSM O11y test] Ping GMP endpoint regularly for debug purpose - Ping GMP endpoint in client and server pod curl localhost:9464/metrics during test - This will provide insight into what GMP gets from the OTel plugin before sending metrics to Cloud Monitoring --- tests/gamma/csm_observability_test.py | 33 ++++++++++++++++++++++++++- 1 file changed, 32 insertions(+), 1 deletion(-) diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index bd5a03bc..5828cc59 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -22,6 +22,7 @@ from google.api_core import exceptions as gapi_errors from google.api_core import retry as gapi_retries from google.cloud import monitoring_v3 +from kubernetes.stream import stream import yaml from framework import xds_gamma_testcase @@ -195,7 +196,22 @@ def test_csm_observability(self): "Letting test client run for %d seconds to produce metric data", TEST_RUN_SECS, ) - time.sleep(TEST_RUN_SECS) + for i in range(0, TEST_RUN_SECS // 10): + time.sleep(10) + logger.info( + self.ping_gmp_endpoint( + test_client.hostname, + self.client_namespace, + self.client_runner.deployment_name, + ) + ) + logger.info( + self.ping_gmp_endpoint( + test_server.hostname, + self.server_namespace, + self.server_runner.deployment_name, + ) + ) end_secs = int(time.time()) interval = monitoring_v3.TimeInterval( start_time={"seconds": start_secs}, @@ -468,6 +484,21 @@ def assertAtLeastOnePointWithinRange( f"No data point with {ref_bytes}±{tolerance*100}% bytes found" ) + def ping_gmp_endpoint( + self, pod_name: str, namespace_name: str, container_name: str + ) -> str: + return stream( + self.k8s_api_manager.core.connect_get_namespaced_pod_exec, + pod_name, + namespace_name, + container=container_name, + command=["/bin/sh", "-c", "curl -s localhost:9464/metrics"], + stderr=True, + stdin=False, + stdout=True, + tty=False, + ) + if __name__ == "__main__": absltest.main() From d2c1e976cd70b42c9b4e112d4fa14c86f571e4fb Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 7 Feb 2024 20:53:05 +0000 Subject: [PATCH 02/16] Use requests instead of pod exec - But this doesn't work yet. Can't quite figure out what's the exact request URL to use. Tried quite a few combinations and none of them works. --- tests/gamma/csm_observability_test.py | 32 +++++---------------------- 1 file changed, 5 insertions(+), 27 deletions(-) diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index 5828cc59..c2f0526c 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -22,7 +22,7 @@ from google.api_core import exceptions as gapi_errors from google.api_core import retry as gapi_retries from google.cloud import monitoring_v3 -from kubernetes.stream import stream +import requests import yaml from framework import xds_gamma_testcase @@ -199,17 +199,10 @@ def test_csm_observability(self): for i in range(0, TEST_RUN_SECS // 10): time.sleep(10) logger.info( - self.ping_gmp_endpoint( - test_client.hostname, - self.client_namespace, - self.client_runner.deployment_name, - ) - ) - logger.info( - self.ping_gmp_endpoint( - test_server.hostname, - self.server_namespace, - self.server_runner.deployment_name, + requests.get( + f"http://{test_server.hostname}" + f".{self.server_namespace}.pod.cluster.local" + ":9464/metrics" ) ) end_secs = int(time.time()) @@ -484,21 +477,6 @@ def assertAtLeastOnePointWithinRange( f"No data point with {ref_bytes}±{tolerance*100}% bytes found" ) - def ping_gmp_endpoint( - self, pod_name: str, namespace_name: str, container_name: str - ) -> str: - return stream( - self.k8s_api_manager.core.connect_get_namespaced_pod_exec, - pod_name, - namespace_name, - container=container_name, - command=["/bin/sh", "-c", "curl -s localhost:9464/metrics"], - stderr=True, - stdin=False, - stdout=True, - tty=False, - ) - if __name__ == "__main__": absltest.main() From 916a6c9757cad02288a900fb4121fa25390ad329 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 8 Feb 2024 00:18:55 +0000 Subject: [PATCH 03/16] Use port forwarding to ping GMP endpoint - `enable_csm_observability` is now a constructor parameter for the client runner and the server runner, instead of a `run()` parameter. - If `enable_csm_observability`, we want to do a port forwarding for the `pod_monitoring_port = 9464`, such that we can issue a GET request against it in the CSM test - Added `monitoring_port` and `monitoring_host` as class variable to the client runner and server runner. - Override `_xds_test_server_for_pod` in the `GammaServerRunner` class - Minimize the places where we need to hardcode the port `9464` --- .../runners/k8s/gamma_server_runner.py | 38 +++++++++++++++++-- .../test_app/runners/k8s/k8s_base_runner.py | 2 + .../runners/k8s/k8s_xds_client_runner.py | 21 ++++++++-- kubernetes-manifests/csm/pod-monitoring.yaml | 2 +- tests/gamma/csm_observability_test.py | 32 ++++++++++++---- 5 files changed, 80 insertions(+), 15 deletions(-) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index e9a3ea20..34198509 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -41,9 +41,13 @@ class GammaServerRunner(KubernetesServerRunner): pre_stop_hook: bool = False pod_monitoring: Optional[k8s.PodMonitoring] = None pod_monitoring_name: Optional[str] = None + pod_monitoring_port: int = 9464 + monitoring_port: Optional[int] = None + monitoring_host: Optional[str] = None route_name: str frontend_service_name: str + enable_csm_observability: bool csm_workload_name: str csm_canonical_service_name: str @@ -77,6 +81,7 @@ def __init__( bepolicy_name: str = "backend-policy", termination_grace_period_seconds: int = 0, pre_stop_hook: bool = False, + enable_csm_observability: bool = False, csm_workload_name: str = "", csm_canonical_service_name: str = "", ): @@ -111,6 +116,7 @@ def __init__( self.bepolicy_name = bepolicy_name self.termination_grace_period_seconds = termination_grace_period_seconds self.pre_stop_hook = pre_stop_hook + self.enable_csm_observability = enable_csm_observability self.csm_workload_name = csm_workload_name self.csm_canonical_service_name = csm_canonical_service_name @@ -124,7 +130,6 @@ def run( # pylint: disable=arguments-differ log_to_stdout: bool = False, bootstrap_version: Optional[str] = None, route_template: str = "gamma/route_http.yaml", - enable_csm_observability: bool = False, generate_mesh_id: bool = False, ) -> List[XdsTestServer]: if not maintenance_port: @@ -210,7 +215,7 @@ def run( # pylint: disable=arguments-differ bootstrap_version=bootstrap_version, termination_grace_period_seconds=self.termination_grace_period_seconds, pre_stop_hook=self.pre_stop_hook, - enable_csm_observability=enable_csm_observability, + enable_csm_observability=self.enable_csm_observability, generate_mesh_id=generate_mesh_id, csm_workload_name=self.csm_workload_name, csm_canonical_service_name=self.csm_canonical_service_name, @@ -218,13 +223,14 @@ def run( # pylint: disable=arguments-differ # Create a PodMonitoring resource if CSM Observability is enabled # This is GMP (Google Managed Prometheus) - if enable_csm_observability: + if self.enable_csm_observability: self.pod_monitoring_name = f"{self.deployment_id}-gmp" self.pod_monitoring = self._create_pod_monitoring( "csm/pod-monitoring.yaml", namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.pod_monitoring_port, ) servers = self._make_servers_for_deployment( @@ -271,6 +277,32 @@ def createBackendPolicy(self): service_name=self.service_name, ) + def _xds_test_server_for_pod( + self, + pod: k8s.V1Pod, + *, + test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT, + maintenance_port: Optional[int] = None, + secure_mode: bool = False, + ) -> XdsTestServer: + if self.enable_csm_observability: + if self.debug_use_port_forwarding: + pf = self._start_port_forwarding_pod( + pod, self.pod_monitoring_port + ) + self.monitoring_port = pf.local_port + self.monitoring_host = pf.local_address + else: + self.monitoring_port = self.pod_monitoring_port + self.monitoring_host = pod.status.pod_ip + + return super()._xds_test_server_for_pod( + pod=pod, + test_port=test_port, + maintenance_port=maintenance_port, + secure_mode=secure_mode, + ) + # pylint: disable=arguments-differ def cleanup(self, *, force=False, force_namespace=False): try: diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py b/framework/test_app/runners/k8s/k8s_base_runner.py index b3d6462e..2f993cbf 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -306,6 +306,7 @@ def _create_pod_monitoring( namespace_name: str, deployment_id: str, pod_monitoring_name: str, + pod_monitoring_port: int, **kwargs, ) -> k8s.PodMonitoring: pod_monitoring = self._create_from_template( @@ -314,6 +315,7 @@ def _create_pod_monitoring( namespace_name=namespace_name, deployment_id=deployment_id, pod_monitoring_name=pod_monitoring_name, + pod_monitoring_port=pod_monitoring_port, **kwargs, ) if not ( diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index ead7fd79..7fbe9e8c 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -34,6 +34,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): debug_use_port_forwarding: bool td_bootstrap_image: str network: str + enable_csm_observability: bool csm_workload_name: str csm_canonical_service_name: str @@ -43,6 +44,9 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): gcp_iam: Optional[gcp.iam.IamV1] = None pod_monitoring: Optional[k8s.PodMonitoring] = None pod_monitoring_name: Optional[str] = None + pod_monitoring_port: int = 9464 + monitoring_port: Optional[int] = None + monitoring_host: Optional[str] = None def __init__( # pylint: disable=too-many-locals self, @@ -64,6 +68,7 @@ def __init__( # pylint: disable=too-many-locals namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, + enable_csm_observability: bool = False, csm_workload_name: str = "", csm_canonical_service_name: str = "", ): @@ -83,6 +88,7 @@ def __init__( # pylint: disable=too-many-locals self.deployment_template = deployment_template self.enable_workload_identity = enable_workload_identity self.debug_use_port_forwarding = debug_use_port_forwarding + self.enable_csm_observability = enable_csm_observability self.csm_workload_name = csm_workload_name self.csm_canonical_service_name = csm_canonical_service_name @@ -112,7 +118,6 @@ def run( # pylint: disable=arguments-differ generate_mesh_id=False, print_response=False, log_to_stdout: bool = False, - enable_csm_observability: bool = False, request_payload_size: int = 0, response_payload_size: int = 0, ) -> XdsTestClient: @@ -171,20 +176,21 @@ def run( # pylint: disable=arguments-differ config_mesh=config_mesh, generate_mesh_id=generate_mesh_id, print_response=print_response, - enable_csm_observability=enable_csm_observability, + enable_csm_observability=self.enable_csm_observability, csm_workload_name=self.csm_workload_name, csm_canonical_service_name=self.csm_canonical_service_name, ) # Create a PodMonitoring resource if CSM Observability is enabled # This is GMP (Google Managed Prometheus) - if enable_csm_observability: + if self.enable_csm_observability: self.pod_monitoring_name = f"{self.deployment_id}-gmp" self.pod_monitoring = self._create_pod_monitoring( "csm/pod-monitoring.yaml", namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.pod_monitoring_port, ) # Load test client pod. We need only one client at the moment @@ -205,8 +211,17 @@ def _xds_test_client_for_pod( if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, self.stats_port) rpc_port, rpc_host = pf.local_port, pf.local_address + if self.enable_csm_observability: + pf = self._start_port_forwarding_pod( + pod, self.pod_monitoring_port + ) + self.monitoring_port = pf.local_port + self.monitoring_host = pf.local_address else: rpc_port, rpc_host = self.stats_port, None + if self.enable_csm_observability: + self.monitoring_port = self.pod_monitoring_port + self.monitoring_host = pod.status.pod_ip return XdsTestClient( ip=pod.status.pod_ip, diff --git a/kubernetes-manifests/csm/pod-monitoring.yaml b/kubernetes-manifests/csm/pod-monitoring.yaml index 3204f0f9..398f07b5 100644 --- a/kubernetes-manifests/csm/pod-monitoring.yaml +++ b/kubernetes-manifests/csm/pod-monitoring.yaml @@ -11,5 +11,5 @@ spec: matchLabels: deployment_id: ${deployment_id} endpoints: - - port: 9464 + - port: ${pod_monitoring_port} interval: 10s diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index c2f0526c..c68ced11 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -159,6 +159,7 @@ def setUpClass(cls): # each run(). def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: return super().initKubernetesClientRunner( + enable_csm_observability=True, csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, ) @@ -167,6 +168,7 @@ def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: # each run(). def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner: return super().initKubernetesServerRunner( + enable_csm_observability=True, csm_workload_name=CSM_WORKLOAD_NAME_SERVER, csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER, ) @@ -176,14 +178,11 @@ def test_csm_observability(self): # resource creation out of self.startTestServers() with self.subTest("1_run_test_server"): start_secs = int(time.time()) - test_server: _XdsTestServer = self.startTestServers( - enable_csm_observability=True, - )[0] + test_server: _XdsTestServer = self.startTestServers()[0] with self.subTest("2_start_test_client"): test_client: _XdsTestClient = self.startTestClient( test_server, - enable_csm_observability=True, request_payload_size=REQUEST_PAYLOAD_SIZE, response_payload_size=RESPONSE_PAYLOAD_SIZE, ) @@ -199,10 +198,15 @@ def test_csm_observability(self): for i in range(0, TEST_RUN_SECS // 10): time.sleep(10) logger.info( - requests.get( - f"http://{test_server.hostname}" - f".{self.server_namespace}.pod.cluster.local" - ":9464/metrics" + self.ping_gmp_endpoint( + self.server_runner.monitoring_host, + self.server_runner.monitoring_port, + ) + ) + logger.info( + self.ping_gmp_endpoint( + self.client_runner.monitoring_host, + self.client_runner.monitoring_port, ) ) end_secs = int(time.time()) @@ -477,6 +481,18 @@ def assertAtLeastOnePointWithinRange( f"No data point with {ref_bytes}±{tolerance*100}% bytes found" ) + def ping_gmp_endpoint( + self, monitoring_host: str, monitoring_port: int + ) -> str: + """ + A helper function to ping the GMP endpoint to get what GMP sees + from the OTel exporter before passing metrics to Cloud Monitoring. + """ + gmp_log = requests.get( + f"http://{monitoring_host}:{monitoring_port}/metrics" + ) + return "\n".join(gmp_log.text.splitlines()) + if __name__ == "__main__": absltest.main() From b9caa3327e4c1819c4e045662e2be442f46fbe8b Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 8 Feb 2024 00:28:00 +0000 Subject: [PATCH 04/16] Add requests to requirements.txt --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 3be913f9..11600948 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,5 @@ packaging~=23.1 Pygments~=2.9 python-dateutil~=2.8 protobuf~=4.24 +requests==2.31.0 xds-protos==1.58.0rc1 From 3ab0f3182177bfd5591bba09748d822ea0e7259b Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 8 Feb 2024 00:43:17 +0000 Subject: [PATCH 05/16] try catch RequestException --- tests/gamma/csm_observability_test.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index c68ced11..638795db 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -23,6 +23,7 @@ from google.api_core import retry as gapi_retries from google.cloud import monitoring_v3 import requests +from requests.exceptions import RequestException import yaml from framework import xds_gamma_testcase @@ -488,10 +489,16 @@ def ping_gmp_endpoint( A helper function to ping the GMP endpoint to get what GMP sees from the OTel exporter before passing metrics to Cloud Monitoring. """ - gmp_log = requests.get( - f"http://{monitoring_host}:{monitoring_port}/metrics" - ) - return "\n".join(gmp_log.text.splitlines()) + try: + gmp_log = requests.get( + f"http://{monitoring_host}:{monitoring_port}/metrics" + ) + return "\n".join(gmp_log.text.splitlines()) + except RequestException as e: + logger.error("Http request to GMP endpoint failed: %r", e) + # It's OK the caller will receive nothing in case of an exception. + # Caller can continue. + return "" if __name__ == "__main__": From 54a4e73786309476ffbdea5bfba8cb5eb0ccd093 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 21 Feb 2024 03:21:04 +0000 Subject: [PATCH 06/16] Review changes - Added a PrometheusLogger class to write the prometheus endpoint log to a separate log file related to logs_subdir - monitoring_port is now stored in XdsTestClient and XdsTestServer rather than the runner - use rpc_host instead of a separate monitoring_host - put the constant DEFAULT_MONITORING_PORT in the KubernetesBaseRunner - fixed requirements.txt and requirements.lock - removed a previous hack is_legit_time_series() - renamed function to ping_prometheus_endpoint instead of referring it as GMP --- framework/test_app/client_app.py | 3 + .../runners/k8s/gamma_server_runner.py | 14 +-- .../test_app/runners/k8s/k8s_base_runner.py | 1 + .../runners/k8s/k8s_xds_client_runner.py | 14 +-- .../runners/k8s/k8s_xds_server_runner.py | 2 + framework/test_app/server_app.py | 3 + requirements.lock | 2 +- requirements.txt | 2 +- tests/gamma/csm_observability_test.py | 107 +++++++++++------- 9 files changed, 84 insertions(+), 64 deletions(-) diff --git a/framework/test_app/client_app.py b/framework/test_app/client_app.py index 81b6384b..5d2db0df 100644 --- a/framework/test_app/client_app.py +++ b/framework/test_app/client_app.py @@ -67,6 +67,7 @@ def __init__( hostname: str, rpc_host: Optional[str] = None, maintenance_port: Optional[int] = None, + monitoring_port: Optional[int] = None, ): super().__init__(rpc_host=(rpc_host or ip)) self.ip = ip @@ -74,6 +75,8 @@ def __init__( self.server_target = server_target self.maintenance_port = maintenance_port or rpc_port self.hostname = hostname + self.rpc_host = rpc_host + self.monitoring_port = monitoring_port @property @functools.lru_cache(None) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index 34198509..bfced0b4 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -41,9 +41,6 @@ class GammaServerRunner(KubernetesServerRunner): pre_stop_hook: bool = False pod_monitoring: Optional[k8s.PodMonitoring] = None pod_monitoring_name: Optional[str] = None - pod_monitoring_port: int = 9464 - monitoring_port: Optional[int] = None - monitoring_host: Optional[str] = None route_name: str frontend_service_name: str @@ -230,7 +227,7 @@ def run( # pylint: disable=arguments-differ namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, - pod_monitoring_port=self.pod_monitoring_port, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, ) servers = self._make_servers_for_deployment( @@ -288,19 +285,18 @@ def _xds_test_server_for_pod( if self.enable_csm_observability: if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod( - pod, self.pod_monitoring_port + pod, self.DEFAULT_MONITORING_PORT ) - self.monitoring_port = pf.local_port - self.monitoring_host = pf.local_address + monitoring_port = pf.local_port else: - self.monitoring_port = self.pod_monitoring_port - self.monitoring_host = pod.status.pod_ip + monitoring_port = self.DEFAULT_MONITORING_PORT return super()._xds_test_server_for_pod( pod=pod, test_port=test_port, maintenance_port=maintenance_port, secure_mode=secure_mode, + monitoring_port=monitoring_port, ) # pylint: disable=arguments-differ diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py b/framework/test_app/runners/k8s/k8s_base_runner.py index 2f993cbf..a5eaaa40 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -55,6 +55,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): # Pylint wants abstract classes to override abstract methods. # pylint: disable=abstract-method + DEFAULT_MONITORING_PORT = 9464 TEMPLATE_DIR_NAME = "kubernetes-manifests" TEMPLATE_DIR_RELATIVE_PATH = f"../../../../{TEMPLATE_DIR_NAME}" ROLE_WORKLOAD_IDENTITY_USER = "roles/iam.workloadIdentityUser" diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index 7fbe9e8c..f9eeb3ab 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -44,9 +44,6 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): gcp_iam: Optional[gcp.iam.IamV1] = None pod_monitoring: Optional[k8s.PodMonitoring] = None pod_monitoring_name: Optional[str] = None - pod_monitoring_port: int = 9464 - monitoring_port: Optional[int] = None - monitoring_host: Optional[str] = None def __init__( # pylint: disable=too-many-locals self, @@ -190,7 +187,7 @@ def run( # pylint: disable=arguments-differ namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, - pod_monitoring_port=self.pod_monitoring_port, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, ) # Load test client pod. We need only one client at the moment @@ -213,15 +210,13 @@ def _xds_test_client_for_pod( rpc_port, rpc_host = pf.local_port, pf.local_address if self.enable_csm_observability: pf = self._start_port_forwarding_pod( - pod, self.pod_monitoring_port + pod, self.DEFAULT_MONITORING_PORT ) - self.monitoring_port = pf.local_port - self.monitoring_host = pf.local_address + monitoring_port = pf.local_port else: rpc_port, rpc_host = self.stats_port, None if self.enable_csm_observability: - self.monitoring_port = self.pod_monitoring_port - self.monitoring_host = pod.status.pod_ip + monitoring_port = self.DEFAULT_MONITORING_PORT return XdsTestClient( ip=pod.status.pod_ip, @@ -229,6 +224,7 @@ def _xds_test_client_for_pod( server_target=server_target, hostname=pod.metadata.name, rpc_host=rpc_host, + monitoring_port=monitoring_port, ) # pylint: disable=arguments-differ diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 90615fd9..7e0edf8b 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -265,6 +265,7 @@ def _xds_test_server_for_pod( test_port: int = DEFAULT_TEST_PORT, maintenance_port: Optional[int] = None, secure_mode: bool = False, + monitoring_port: Optional[int] = None, ) -> XdsTestServer: if maintenance_port is None: maintenance_port = self._get_default_maintenance_port(secure_mode) @@ -282,6 +283,7 @@ def _xds_test_server_for_pod( maintenance_port=rpc_port, secure_mode=secure_mode, rpc_host=rpc_host, + monitoring_port=monitoring_port, ) # pylint: disable=arguments-differ diff --git a/framework/test_app/server_app.py b/framework/test_app/server_app.py index 57444b73..18d33bc1 100644 --- a/framework/test_app/server_app.py +++ b/framework/test_app/server_app.py @@ -52,6 +52,7 @@ def __init__( xds_host: Optional[str] = None, xds_port: Optional[int] = None, rpc_host: Optional[str] = None, + monitoring_port: Optional[str] = None, ): super().__init__(rpc_host=(rpc_host or ip)) self.ip = ip @@ -60,6 +61,8 @@ def __init__( self.maintenance_port = maintenance_port or rpc_port self.secure_mode = secure_mode self.xds_host, self.xds_port = xds_host, xds_port + self.rpc_host = rpc_host + self.monitoring_port = monitoring_port @property @functools.lru_cache(None) diff --git a/requirements.lock b/requirements.lock index 06b22cf5..b64268d3 100644 --- a/requirements.lock +++ b/requirements.lock @@ -15,6 +15,7 @@ packaging==23.1 Pygments==2.14.0 python-dateutil==2.8.2 protobuf==4.24.1 +requests==2.31.0 xds-protos==1.58.0rc1 ## The following requirements were added by pip freeze: cachetools==5.3.1 @@ -34,7 +35,6 @@ proto-plus==1.22.3 pyasn1==0.5.0 pyasn1-modules==0.3.0 pyparsing==3.1.1 -requests==2.31.0 requests-oauthlib==1.3.1 rsa==4.9 uritemplate==3.0.1 diff --git a/requirements.txt b/requirements.txt index 3349f081..5fbd991a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,5 +15,5 @@ packaging~=23.1 Pygments~=2.9 python-dateutil~=2.8 protobuf~=4.24 -requests==2.31.0 +requests~=2.31.0 xds-protos==1.58.0rc1 diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index 00d87447..16ccdb43 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -14,7 +14,7 @@ import dataclasses import logging import time -from typing import Callable, Iterable +from typing import Callable, Iterable, TextIO import unittest.mock from absl import flags @@ -30,6 +30,7 @@ from framework import xds_k8s_testcase from framework.helpers import skips from framework.test_app.runners.k8s import gamma_server_runner +from framework.test_app.runners.k8s import k8s_base_runner from framework.test_app.runners.k8s import k8s_xds_client_runner logger = logging.getLogger(__name__) @@ -141,6 +142,27 @@ def pretty_print(self) -> str: return yaml.dump(metric, sort_keys=False) +class PrometheusLogger: + def __init__( + self, k8s_runner: k8s_base_runner.KubernetesBaseRunner, pod_name: str + ): + logfile_name = ( + f"{k8s_runner.k8s_namespace.name}_{pod_name}_prometheus.log" + ) + log_path = k8s_runner.logs_subdir / logfile_name + self.log_stream: TextIO = open( + log_path, "w", errors="ignore", encoding="utf-8" + ) + + def write(self, line): + self.log_stream.write(line) + self.log_stream.write("\n") + self.log_stream.flush() + + def close(self): + self.log_stream.close() + + class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): metric_client: monitoring_v3.MetricServiceClient @@ -191,25 +213,37 @@ def test_csm_observability(self): with self.subTest("3_test_server_received_rpcs_from_test_client"): self.assertSuccessfulRpcs(test_client) - with self.subTest("4_query_cloud_monitoring_metrics"): + with self.subTest("4_export_prometheus_metrics_data"): logger.info( "Letting test client run for %d seconds to produce metric data", TEST_RUN_SECS, ) - for i in range(0, TEST_RUN_SECS // 10): - time.sleep(10) - logger.info( - self.ping_gmp_endpoint( - self.server_runner.monitoring_host, - self.server_runner.monitoring_port, + server_prometheus_logger = PrometheusLogger( + self.server_runner, test_server.hostname + ) + client_prometheus_logger = PrometheusLogger( + self.client_runner, test_client.hostname + ) + try: + for i in range(0, TEST_RUN_SECS // 10): + time.sleep(10) + server_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_server.rpc_host, + test_server.monitoring_port, + ) ) - ) - logger.info( - self.ping_gmp_endpoint( - self.client_runner.monitoring_host, - self.client_runner.monitoring_port, + client_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_client.rpc_host, + test_client.monitoring_port, + ) ) - ) + finally: + server_prometheus_logger.close() + client_prometheus_logger.close() + + with self.subTest("5_query_cloud_monitoring_metrics"): end_secs = int(time.time()) interval = monitoring_v3.TimeInterval( start_time={"seconds": start_secs}, @@ -247,14 +281,14 @@ def test_csm_observability(self): } self.assertNotEmpty(all_results, msg="No query metrics results") - with self.subTest("5_check_metrics_time_series"): + with self.subTest("6_check_metrics_time_series"): for metric in ALL_METRICS: # Every metric needs to exist in the query results self.assertIn(metric, all_results) # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("6_check_metrics_labels_histogram_client"): + with self.subTest("7_check_metrics_labels_histogram_client"): expected_metric_labels = { "csm_mesh_id": ANY, "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, @@ -282,7 +316,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("7_check_metrics_labels_histogram_server"): + with self.subTest("8_check_metrics_labels_histogram_server"): expected_metric_labels = { "csm_mesh_id": ANY, "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, @@ -307,7 +341,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("8_check_metrics_labels_counter_client"): + with self.subTest("9_check_metrics_labels_counter_client"): expected_metric_labels = { "grpc_method": GRPC_METHOD_NAME, "grpc_target": ANY, @@ -323,7 +357,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("9_check_metrics_labels_counter_server"): + with self.subTest("10_check_metrics_labels_counter_server"): expected_metric_labels = { "grpc_method": GRPC_METHOD_NAME, "otel_scope_name": ANY, @@ -338,7 +372,7 @@ def test_csm_observability(self): # Testing whether each metric has the right set of monitored resource # label keys and values - with self.subTest("10_check_client_resource_labels_client"): + with self.subTest("11_check_client_resource_labels_client"): # all metrics should have the same set of monitored resource labels # keys, which come from the GMP job expected_resource_labels = { @@ -362,7 +396,7 @@ def test_csm_observability(self): # Testing whether each metric has the right set of monitored resource # label keys and values - with self.subTest("11_check_server_resource_labels_server"): + with self.subTest("12_check_server_resource_labels_server"): # all metrics should have the same set of monitored resource labels # keys, which come from the GMP job expected_resource_labels = { @@ -387,7 +421,7 @@ def test_csm_observability(self): # This tests whether each of the "bytes sent" histogram type metric # should have at least 1 data point whose mean should converge to be # close to the number of bytes being sent by the RPCs. - with self.subTest("12_check_bytes_sent_vs_data_points"): + with self.subTest("13_check_bytes_sent_vs_data_points"): for metric in (METRIC_CLIENT_ATTEMPT_SENT, METRIC_SERVER_CALL_RCVD): self.assertAtLeastOnePointWithinRange( all_results[metric].points, REQUEST_PAYLOAD_SIZE @@ -471,7 +505,7 @@ def query_metrics( view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, retry=retry_settings, ) - time_series = list(filter(self.is_legit_time_series, response)) + time_series = list(response) self.assertLen( time_series, @@ -489,21 +523,6 @@ def query_metrics( results[metric] = metric_time_series return results - @classmethod - def is_legit_time_series( - cls, series: monitoring_v3.types.TimeSeries - ) -> bool: - for point in series.points: - # Test flake: we found some time series with no actual data point - # in prod test runs. - # Here we will only include time series with actual data in it. - if point.value.distribution_value.count or point.value.double_value: - return True - logger.warning( - "Warning: found time_series with no valid data point\n%s", series - ) - return False - def assertAtLeastOnePointWithinRange( self, points: list[monitoring_v3.types.Point], @@ -525,20 +544,20 @@ def assertAtLeastOnePointWithinRange( f"No data point with {ref_bytes}±{tolerance*100}% bytes found" ) - def ping_gmp_endpoint( + def ping_prometheus_endpoint( self, monitoring_host: str, monitoring_port: int ) -> str: """ - A helper function to ping the GMP endpoint to get what GMP sees - from the OTel exporter before passing metrics to Cloud Monitoring. + A helper function to ping the pod's Prometheus endpoint to get what GMP + sees from the OTel exporter before passing metrics to Cloud Monitoring. """ try: - gmp_log = requests.get( + prometheus_log = requests.get( f"http://{monitoring_host}:{monitoring_port}/metrics" ) - return "\n".join(gmp_log.text.splitlines()) + return "\n".join(prometheus_log.text.splitlines()) except RequestException as e: - logger.error("Http request to GMP endpoint failed: %r", e) + logger.warning("Http request to Prometheus endpoint failed: %r", e) # It's OK the caller will receive nothing in case of an exception. # Caller can continue. return "" From d02508326776bdca45fde2a1a840e6f584f44807 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 21 Feb 2024 03:39:14 +0000 Subject: [PATCH 07/16] remove unnecessary variable which broke unit test --- framework/test_app/client_app.py | 1 - framework/test_app/server_app.py | 1 - 2 files changed, 2 deletions(-) diff --git a/framework/test_app/client_app.py b/framework/test_app/client_app.py index 5d2db0df..cad955de 100644 --- a/framework/test_app/client_app.py +++ b/framework/test_app/client_app.py @@ -75,7 +75,6 @@ def __init__( self.server_target = server_target self.maintenance_port = maintenance_port or rpc_port self.hostname = hostname - self.rpc_host = rpc_host self.monitoring_port = monitoring_port @property diff --git a/framework/test_app/server_app.py b/framework/test_app/server_app.py index 18d33bc1..ae9d3140 100644 --- a/framework/test_app/server_app.py +++ b/framework/test_app/server_app.py @@ -61,7 +61,6 @@ def __init__( self.maintenance_port = maintenance_port or rpc_port self.secure_mode = secure_mode self.xds_host, self.xds_port = xds_host, xds_port - self.rpc_host = rpc_host self.monitoring_port = monitoring_port @property From 1c0f3dd43f05b015d7a48f88bf67d773aacade0f Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 21 Feb 2024 03:43:11 +0000 Subject: [PATCH 08/16] fix pylint for function signature --- framework/test_app/runners/k8s/gamma_server_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index bfced0b4..a7401a42 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -281,6 +281,7 @@ def _xds_test_server_for_pod( test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT, maintenance_port: Optional[int] = None, secure_mode: bool = False, + monitoring_port: Optional[int] = None, ) -> XdsTestServer: if self.enable_csm_observability: if self.debug_use_port_forwarding: From 90c533ce8d70f6f032b7dc4a8996fab912e03ed2 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 21 Feb 2024 04:33:36 +0000 Subject: [PATCH 09/16] fixed variable initialization --- framework/test_app/runners/k8s/k8s_xds_client_runner.py | 1 + 1 file changed, 1 insertion(+) diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index f9eeb3ab..4611ba22 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -205,6 +205,7 @@ def run( # pylint: disable=arguments-differ def _xds_test_client_for_pod( self, pod: k8s.V1Pod, *, server_target: str ) -> XdsTestClient: + monitoring_port = None if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, self.stats_port) rpc_port, rpc_host = pf.local_port, pf.local_address From 01f4c4d894d669768cdcd246a72b72f15c435584 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 21 Feb 2024 18:31:45 +0000 Subject: [PATCH 10/16] add comment for temp debugging class --- tests/gamma/csm_observability_test.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index 16ccdb43..1f663403 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -142,6 +142,10 @@ def pretty_print(self) -> str: return yaml.dump(metric, sort_keys=False) +# This class is purely for debugging purposes. We want to log what we see +# from the Prometheus endpoint before being sent to Cloud Monitoring. +# Once we determined the root cause of b/323596669 we can remove this +# class. class PrometheusLogger: def __init__( self, k8s_runner: k8s_base_runner.KubernetesBaseRunner, pod_name: str From bcf43a0d7cb2930ea3e97aebb0a59a42d7cfc834 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 22 Feb 2024 23:37:02 +0000 Subject: [PATCH 11/16] Add a marker in the prometheus log files --- tests/gamma/csm_observability_test.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index 1f663403..1a6ea5f8 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -231,12 +231,19 @@ def test_csm_observability(self): try: for i in range(0, TEST_RUN_SECS // 10): time.sleep(10) + curr_secs = int(time.time()) + server_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) server_prometheus_logger.write( self.ping_prometheus_endpoint( test_server.rpc_host, test_server.monitoring_port, ) ) + client_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) client_prometheus_logger.write( self.ping_prometheus_endpoint( test_client.rpc_host, From 2f509c4851e964a0f9f597edcecc0125d3b300ab Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Wed, 17 Apr 2024 23:59:44 +0000 Subject: [PATCH 12/16] Add App Net based CSM Observability Test - Added app_net_csm_observability_test.py that's largely a copy of gamma/csm_observability_test.py - Refactor some flags to be passed to the k8s_xds_server_runner --- .../runners/k8s/gamma_server_runner.py | 9 +- .../runners/k8s/k8s_xds_server_runner.py | 32 + tests/app_net_csm_observability_test.py | 596 ++++++++++++++++++ 3 files changed, 631 insertions(+), 6 deletions(-) create mode 100644 tests/app_net_csm_observability_test.py diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index 4c5014e2..3a059ce8 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -45,9 +45,6 @@ class GammaServerRunner(KubernetesServerRunner): route_name: str frontend_service_name: str - enable_csm_observability: bool - csm_workload_name: str - csm_canonical_service_name: str SESSION_AFFINITY_FILTER_NAME: Final[str] = "ssa-filter" SESSION_AFFINITY_POLICY_NAME: Final[str] = "ssa-policy" @@ -105,14 +102,14 @@ def __init__( namespace_template=namespace_template, debug_use_port_forwarding=debug_use_port_forwarding, enable_workload_identity=enable_workload_identity, + enable_csm_observability=enable_csm_observability, + csm_workload_name=csm_workload_name, + csm_canonical_service_name=csm_canonical_service_name, deployment_args=deployment_args, ) self.frontend_service_name = frontend_service_name self.route_name = route_name or f"route-{deployment_name}" - self.enable_csm_observability = enable_csm_observability - self.csm_workload_name = csm_workload_name - self.csm_canonical_service_name = csm_canonical_service_name @override def run( # pylint: disable=arguments-differ diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 0e7885d4..c0feec0f 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -60,6 +60,9 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): td_bootstrap_image: str xds_server_uri: str network: str + enable_csm_observability: bool + csm_workload_name: str + csm_canonical_service_name: str # Server Deployment args deployment_args: ServerDeploymentArgs @@ -98,6 +101,9 @@ def __init__( # pylint: disable=too-many-locals namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, + enable_csm_observability: bool = False, + csm_workload_name: str = "", + csm_canonical_service_name: str = "", deployment_args: Optional[ServerDeploymentArgs] = None, ): super().__init__( @@ -143,6 +149,10 @@ def __init__( # pylint: disable=too-many-locals # permission to use GCP service account identity. self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project) + self.enable_csm_observability = enable_csm_observability + self.csm_workload_name = csm_workload_name + self.csm_canonical_service_name = csm_canonical_service_name + # Mutable state associated with each run. self._reset_state() @@ -248,9 +258,24 @@ def run( # pylint: disable=arguments-differ,too-many-branches maintenance_port=maintenance_port, secure_mode=secure_mode, bootstrap_version=bootstrap_version, + enable_csm_observability=self.enable_csm_observability, + csm_workload_name=self.csm_workload_name, + csm_canonical_service_name=self.csm_canonical_service_name, **self.deployment_args.as_dict(), ) + # Create a PodMonitoring resource if CSM Observability is enabled + # This is GMP (Google Managed Prometheus) + if self.enable_csm_observability: + self.pod_monitoring_name = f"{self.deployment_id}-gmp" + self.pod_monitoring = self._create_pod_monitoring( + "csm/pod-monitoring.yaml", + namespace_name=self.k8s_namespace.name, + deployment_id=self.deployment_id, + pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, + ) + return self._make_servers_for_deployment( replica_count, test_port=test_port, @@ -313,8 +338,15 @@ def _xds_test_server_for_pod( if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, maintenance_port) rpc_port, rpc_host = pf.local_port, pf.local_address + if self.enable_csm_observability: + pf = self._start_port_forwarding_pod( + pod, self.DEFAULT_MONITORING_PORT + ) + monitoring_port = pf.local_port else: rpc_port, rpc_host = maintenance_port, None + if self.enable_csm_observability: + monitoring_port = self.DEFAULT_MONITORING_PORT server = XdsTestServer( ip=pod.status.pod_ip, diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py new file mode 100644 index 00000000..8cdbf6a9 --- /dev/null +++ b/tests/app_net_csm_observability_test.py @@ -0,0 +1,596 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import dataclasses +import logging +import time +from typing import Callable, Iterable, TextIO +import unittest.mock + +from absl import flags +from absl.testing import absltest +from google.api_core import exceptions as gapi_errors +from google.api_core import retry as gapi_retries +from google.cloud import monitoring_v3 +import requests +from requests.exceptions import RequestException +import yaml + +from framework import xds_k8s_testcase +from framework.helpers import skips +from framework.test_app.runners.k8s import k8s_base_runner +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +# Type aliases +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient +_Lang = skips.Lang + +# Testing consts +TEST_RUN_SECS = 90 +REQUEST_PAYLOAD_SIZE = 271828 +RESPONSE_PAYLOAD_SIZE = 314159 +GRPC_METHOD_NAME = "grpc.testing.TestService/UnaryCall" +CSM_WORKLOAD_NAME_SERVER = "csm_workload_name_from_server" +CSM_WORKLOAD_NAME_CLIENT = "csm_workload_name_from_client" +CSM_CANONICAL_SERVICE_NAME_SERVER = "csm_canonical_service_name_from_server" +CSM_CANONICAL_SERVICE_NAME_CLIENT = "csm_canonical_service_name_from_client" +PROMETHEUS_HOST = "prometheus.googleapis.com" +METRIC_CLIENT_ATTEMPT_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_duration_seconds/histogram" +) +METRIC_CLIENT_ATTEMPT_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_started_total/counter" +) +METRIC_SERVER_CALL_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_server_call_duration_seconds/histogram" +) +METRIC_SERVER_CALL_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_server_call_started_total/counter" +) +HISTOGRAM_CLIENT_METRICS = ( + METRIC_CLIENT_ATTEMPT_SENT, + METRIC_CLIENT_ATTEMPT_RCVD, + METRIC_CLIENT_ATTEMPT_DURATION, +) +HISTOGRAM_SERVER_METRICS = ( + METRIC_SERVER_CALL_DURATION, + METRIC_SERVER_CALL_RCVD, + METRIC_SERVER_CALL_SENT, +) +COUNTER_CLIENT_METRICS = (METRIC_CLIENT_ATTEMPT_STARTED,) +COUNTER_SERVER_METRICS = (METRIC_SERVER_CALL_STARTED,) +HISTOGRAM_METRICS = HISTOGRAM_CLIENT_METRICS + HISTOGRAM_SERVER_METRICS +COUNTER_METRICS = COUNTER_CLIENT_METRICS + COUNTER_SERVER_METRICS +CLIENT_METRICS = HISTOGRAM_CLIENT_METRICS + COUNTER_CLIENT_METRICS +SERVER_METRICS = HISTOGRAM_SERVER_METRICS + COUNTER_SERVER_METRICS +ALL_METRICS = HISTOGRAM_METRICS + COUNTER_METRICS + +KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +BuildQueryFn = Callable[[str, str], str] +ANY = unittest.mock.ANY + + +@dataclasses.dataclass(eq=False) +class MetricTimeSeries: + """ + This class represents one TimeSeries object + from monitoring_v3.ListTimeSeriesResponse. + """ + + # the metric name + name: str + # each time series has a monitored resource + resource_type: str + # each time series has a set of metric labels + metric_labels: dict[str, str] + # each time series has a set of monitored resource labels + resource_labels: dict[str, str] + # each time series has a set of data points + points: list[monitoring_v3.types.Point] + + @classmethod + def from_response( + cls, + name: str, + response: monitoring_v3.types.TimeSeries, + ) -> "MetricTimeSeries": + return cls( + name=name, + resource_type=response.resource.type, + metric_labels=dict(sorted(response.metric.labels.items())), + resource_labels=dict(sorted(response.resource.labels.items())), + points=list(response.points), + ) + + def pretty_print(self) -> str: + metric = dataclasses.asdict(self) + # too much noise to print all data points from a time series + metric.pop("points") + return yaml.dump(metric, sort_keys=False) + + +# This class is purely for debugging purposes. We want to log what we see +# from the Prometheus endpoint before being sent to Cloud Monitoring. +# Once we determined the root cause of b/323596669 we can remove this +# class. +class PrometheusLogger: + def __init__( + self, k8s_runner: k8s_base_runner.KubernetesBaseRunner, pod_name: str + ): + logfile_name = ( + f"{k8s_runner.k8s_namespace.name}_{pod_name}_prometheus.log" + ) + log_path = k8s_runner.logs_subdir / logfile_name + self.log_stream: TextIO = open( + log_path, "w", errors="ignore", encoding="utf-8" + ) + + def write(self, line): + self.log_stream.write(line) + self.log_stream.write("\n") + self.log_stream.flush() + + def close(self): + self.log_stream.close() + + +class AppNetCsmObservabilityTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase): + metric_client: monitoring_v3.MetricServiceClient + + @staticmethod + def is_supported(config: skips.TestConfig) -> bool: + if config.client_lang == _Lang.CPP and config.server_lang == _Lang.CPP: + # CSM Observability Test is only supported for CPP for now. + return config.version_gte("v1.62.x") + return False + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3") + + # These parameters are more pertaining to the test itself, not to + # each run(). + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: + return super().initKubernetesClientRunner( + enable_csm_observability=True, + csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, + ) + + # These parameters are more pertaining to the test itself, not to + # each run(). + def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: + return super().initKubernetesServerRunner( + enable_csm_observability=True, + csm_workload_name=CSM_WORKLOAD_NAME_SERVER, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER, + ) + + def test_csm_observability(self): + with self.subTest("0_create_health_check"): + self.td.create_health_check() + + with self.subTest("1_create_backend_service"): + self.td.create_backend_service() + + with self.subTest("2_create_mesh"): + self.td.create_mesh() + + with self.subTest("3_create_grpc_route"): + self.td.create_grpc_route( + self.server_xds_host, self.server_xds_port + ) + + with self.subTest("1_run_test_server"): + start_secs = int(time.time()) + test_server: _XdsTestServer = self.startTestServers(replica_count=1)[0] + + with self.subTest("5_setup_server_backends"): + self.setupServerBackends() + + with self.subTest("2_start_test_client"): + test_client: _XdsTestClient = self.startTestClient( + test_server, + config_mesh=self.td.mesh.name, + request_payload_size=REQUEST_PAYLOAD_SIZE, + response_payload_size=RESPONSE_PAYLOAD_SIZE, + ) + + with self.subTest("7_assert_xds_config_exists"): + self.assertXdsConfigExists(test_client) + + with self.subTest("3_test_server_received_rpcs_from_test_client"): + self.assertSuccessfulRpcs(test_client) + + with self.subTest("4_export_prometheus_metrics_data"): + logger.info( + "Letting test client run for %d seconds to produce metric data", + TEST_RUN_SECS, + ) + server_prometheus_logger = PrometheusLogger( + self.server_runner, test_server.hostname + ) + client_prometheus_logger = PrometheusLogger( + self.client_runner, test_client.hostname + ) + try: + for i in range(0, TEST_RUN_SECS // 10): + time.sleep(10) + curr_secs = int(time.time()) + server_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + server_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_server.rpc_host, + test_server.monitoring_port, + ) + ) + client_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + client_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_client.rpc_host, + test_client.monitoring_port, + ) + ) + finally: + server_prometheus_logger.close() + client_prometheus_logger.close() + + with self.subTest("5_query_cloud_monitoring_metrics"): + end_secs = int(time.time()) + interval = monitoring_v3.TimeInterval( + start_time={"seconds": start_secs}, + end_time={"seconds": end_secs}, + ) + server_histogram_results = self.query_metrics( + HISTOGRAM_SERVER_METRICS, + self.build_histogram_query, + self.server_namespace, + interval, + ) + client_histogram_results = self.query_metrics( + HISTOGRAM_CLIENT_METRICS, + self.build_histogram_query, + self.client_namespace, + interval, + ) + server_counter_results = self.query_metrics( + COUNTER_SERVER_METRICS, + self.build_counter_query, + self.server_namespace, + interval, + ) + client_counter_results = self.query_metrics( + COUNTER_CLIENT_METRICS, + self.build_counter_query, + self.client_namespace, + interval, + ) + all_results = { + **server_histogram_results, + **client_histogram_results, + **server_counter_results, + **client_counter_results, + } + self.assertNotEmpty(all_results, msg="No query metrics results") + + with self.subTest("6_check_metrics_time_series"): + for metric in ALL_METRICS: + # Every metric needs to exist in the query results + self.assertIn(metric, all_results) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("7_check_metrics_labels_histogram_client"): + expected_metric_labels = { + "csm_mesh_id": ANY, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_SERVER, + "csm_remote_workload_namespace_name": self.server_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_service_name": ANY, # TODO(yashykt) + "csm_service_namespace_name": ANY, # TODO(yashykt) + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + for metric in HISTOGRAM_CLIENT_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("8_check_metrics_labels_histogram_server"): + expected_metric_labels = { + "csm_mesh_id": ANY, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_CLIENT, + "csm_remote_workload_namespace_name": self.client_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + for metric in HISTOGRAM_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("9_check_metrics_labels_counter_client"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + for metric in COUNTER_CLIENT_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("10_check_metrics_labels_counter_server"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + for metric in COUNTER_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("11_check_client_resource_labels_client"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job + expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.client_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.client_namespace, + "project_id": self.project, + } + for metric in CLIENT_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("12_check_server_resource_labels_server"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job + expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.server_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.server_namespace, + "project_id": self.project, + } + for metric in SERVER_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # This tests whether each of the "bytes sent" histogram type metric + # should have at least 1 data point whose mean should converge to be + # close to the number of bytes being sent by the RPCs. + with self.subTest("13_check_bytes_sent_vs_data_points"): + for metric in (METRIC_CLIENT_ATTEMPT_SENT, METRIC_SERVER_CALL_RCVD): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, REQUEST_PAYLOAD_SIZE + ) + + for metric in (METRIC_CLIENT_ATTEMPT_RCVD, METRIC_SERVER_CALL_SENT): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, RESPONSE_PAYLOAD_SIZE + ) + + @classmethod + def build_histogram_query(cls, metric_type: str, namespace: str) -> str: + # + # The list_time_series API requires us to query one metric + # at a time. + # + # The 'grpc_status = "OK"' filter condition is needed because + # some time series data points were logged when the grpc_status + # was "UNAVAILABLE" when the client/server were establishing + # connections. + # + # The 'grpc_method' filter condition is needed because the + # server metrics are also serving on the Channelz requests. + # + # The 'resource.labels.namespace' filter condition allows us to + # filter metrics just for the current test run. + return ( + f'metric.type = "{metric_type}" AND ' + 'metric.labels.grpc_status = "OK" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}" AND ' + f'resource.labels.namespace = "{namespace}"' + ) + + @classmethod + def build_counter_query(cls, metric_type: str, namespace: str) -> str: + # For these num rpcs started counter metrics, they do not have the + # 'grpc_status' label + return ( + f'metric.type = "{metric_type}" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}" AND ' + f'resource.labels.namespace = "{namespace}"' + ) + + def query_metrics( + self, + metric_names: Iterable[str], + build_query_fn: BuildQueryFn, + namespace: str, + interval: monitoring_v3.TimeInterval, + ) -> dict[str, MetricTimeSeries]: + """ + A helper function to make the cloud monitoring API call to query + metrics created by this test run. + """ + # Based on default retry settings for list_time_series method: + # https://github.com/googleapis/google-cloud-python/blob/google-cloud-monitoring-v2.18.0/packages/google-cloud-monitoring/google/cloud/monitoring_v3/services/metric_service/transports/base.py#L210-L218 + # Modified: predicate extended to retry on a wider range of error types. + retry_settings = gapi_retries.Retry( + initial=0.1, + maximum=30.0, + multiplier=1.3, + predicate=gapi_retries.if_exception_type( + # Retry on 5xx, not just 503 ServiceUnavailable. This also + # covers gRPC Unknown, DataLoss, and DeadlineExceeded statuses. + # 501 MethodNotImplemented not excluded because most likely + # reason we'd see this error is server misconfiguration, so we + # want to give it a chance to recovering this situation too. + gapi_errors.ServerError, + # Retry on 429/ResourceExhausted: recoverable rate limiting. + gapi_errors.TooManyRequests, + ), + deadline=90.0, + ) + results = {} + for metric in metric_names: + logger.info("Requesting list_time_series for metric %s", metric) + response = self.metric_client.list_time_series( + name=f"projects/{self.project}", + filter=build_query_fn(metric, namespace), + interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + retry=retry_settings, + ) + time_series = list(response) + + self.assertLen( + time_series, + 1, + msg=f"Query for {metric} should return exactly 1 time series." + f" Found {len(time_series)}.", + ) + + metric_time_series = MetricTimeSeries.from_response( + metric, time_series[0] + ) + logger.info( + "Metric %s:\n%s", metric, metric_time_series.pretty_print() + ) + results[metric] = metric_time_series + return results + + def assertAtLeastOnePointWithinRange( + self, + points: list[monitoring_v3.types.Point], + ref_bytes: int, + tolerance: float = 0.05, + ): + """ + A helper function to check whether at least one of the "points" whose + mean should be within X% of ref_bytes. + """ + for point in points: + if ( + ref_bytes * (1 - tolerance) + < point.value.distribution_value.mean + < ref_bytes * (1 + tolerance) + ): + return + self.fail( + f"No data point with {ref_bytes}±{tolerance*100}% bytes found" + ) + + def ping_prometheus_endpoint( + self, monitoring_host: str, monitoring_port: int + ) -> str: + """ + A helper function to ping the pod's Prometheus endpoint to get what GMP + sees from the OTel exporter before passing metrics to Cloud Monitoring. + """ + try: + prometheus_log = requests.get( + f"http://{monitoring_host}:{monitoring_port}/metrics" + ) + return "\n".join(prometheus_log.text.splitlines()) + except RequestException as e: + logger.warning("Http request to Prometheus endpoint failed: %r", e) + # It's OK the caller will receive nothing in case of an exception. + # Caller can continue. + return "" + + +if __name__ == "__main__": + absltest.main() From ee19c4ee8c4cd895b246ca420aa6d97eeae44200 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 18 Apr 2024 20:38:20 +0000 Subject: [PATCH 13/16] Fix black and pylin errors --- framework/test_app/runners/k8s/gamma_server_runner.py | 2 -- framework/test_app/runners/k8s/k8s_xds_server_runner.py | 2 ++ tests/app_net_csm_observability_test.py | 8 +++++--- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index 3a059ce8..913e13df 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -40,8 +40,6 @@ class GammaServerRunner(KubernetesServerRunner): session_affinity_filter: Optional[k8s.GcpSessionAffinityFilter] = None session_affinity_policy: Optional[k8s.GcpSessionAffinityPolicy] = None backend_policy: Optional[k8s.GcpBackendPolicy] = None - pod_monitoring: Optional[k8s.PodMonitoring] = None - pod_monitoring_name: Optional[str] = None route_name: str frontend_service_name: str diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index c0feec0f..12b7a3f6 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -63,6 +63,8 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): enable_csm_observability: bool csm_workload_name: str csm_canonical_service_name: str + pod_monitoring: Optional[k8s.PodMonitoring] = None + pod_monitoring_name: Optional[str] = None # Server Deployment args deployment_args: ServerDeploymentArgs diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py index 8cdbf6a9..c9c4a0df 100644 --- a/tests/app_net_csm_observability_test.py +++ b/tests/app_net_csm_observability_test.py @@ -216,7 +216,9 @@ def test_csm_observability(self): with self.subTest("1_run_test_server"): start_secs = int(time.time()) - test_server: _XdsTestServer = self.startTestServers(replica_count=1)[0] + test_server: _XdsTestServer = self.startTestServers( + replica_count=1 + )[0] with self.subTest("5_setup_server_backends"): self.setupServerBackends() @@ -327,8 +329,8 @@ def test_csm_observability(self): "csm_remote_workload_namespace_name": self.server_namespace, "csm_remote_workload_project_id": self.project, "csm_remote_workload_type": "gcp_kubernetes_engine", - "csm_service_name": ANY, # TODO(yashykt) - "csm_service_namespace_name": ANY, # TODO(yashykt) + "csm_service_name": ANY, # TODO(yashykt) + "csm_service_namespace_name": ANY, # TODO(yashykt) "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, "grpc_method": GRPC_METHOD_NAME, "grpc_status": "OK", From 11e6eeb2b294369d64b74b80457fc5c3465960ef Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 18 Apr 2024 23:23:30 +0000 Subject: [PATCH 14/16] Confirmed csm_service_namespace_name is supposed to be unknown in this app net test --- tests/app_net_csm_observability_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py index c9c4a0df..4c56ecf6 100644 --- a/tests/app_net_csm_observability_test.py +++ b/tests/app_net_csm_observability_test.py @@ -330,7 +330,7 @@ def test_csm_observability(self): "csm_remote_workload_project_id": self.project, "csm_remote_workload_type": "gcp_kubernetes_engine", "csm_service_name": ANY, # TODO(yashykt) - "csm_service_namespace_name": ANY, # TODO(yashykt) + "csm_service_namespace_name": 'unknown', "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, "grpc_method": GRPC_METHOD_NAME, "grpc_status": "OK", From 6ac09d9d0b52ecab36983f2d5c27914606165356 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Thu, 18 Apr 2024 23:24:27 +0000 Subject: [PATCH 15/16] reran black.sh --- tests/app_net_csm_observability_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py index 4c56ecf6..a9c680e8 100644 --- a/tests/app_net_csm_observability_test.py +++ b/tests/app_net_csm_observability_test.py @@ -330,7 +330,7 @@ def test_csm_observability(self): "csm_remote_workload_project_id": self.project, "csm_remote_workload_type": "gcp_kubernetes_engine", "csm_service_name": ANY, # TODO(yashykt) - "csm_service_namespace_name": 'unknown', + "csm_service_namespace_name": "unknown", "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, "grpc_method": GRPC_METHOD_NAME, "grpc_status": "OK", From 42a1ae67bf75899a8db50b02b89de672ef4eb728 Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Fri, 19 Apr 2024 20:03:35 +0000 Subject: [PATCH 16/16] Update value of csm_service_name label --- tests/app_net_csm_observability_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py index a9c680e8..93984f08 100644 --- a/tests/app_net_csm_observability_test.py +++ b/tests/app_net_csm_observability_test.py @@ -329,7 +329,7 @@ def test_csm_observability(self): "csm_remote_workload_namespace_name": self.server_namespace, "csm_remote_workload_project_id": self.project, "csm_remote_workload_type": "gcp_kubernetes_engine", - "csm_service_name": ANY, # TODO(yashykt) + "csm_service_name": self.td.backend_service.name, "csm_service_namespace_name": "unknown", "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, "grpc_method": GRPC_METHOD_NAME,