From 78ed0fe99fa95fcfe6c6cf7fb1819832eda89a9f Mon Sep 17 00:00:00 2001 From: Stanley Cheung Date: Fri, 31 May 2024 18:41:31 +0000 Subject: [PATCH] PR review comment changes --- .../runners/k8s/gamma_server_runner.py | 45 +++---------------- .../test_app/runners/k8s/k8s_base_runner.py | 2 +- .../runners/k8s/k8s_xds_client_runner.py | 6 +-- .../runners/k8s/k8s_xds_server_runner.py | 43 ++++++++++++------ tests/app_net_csm_observability_test.py | 2 +- tests/gamma/csm_observability_test.py | 3 +- 6 files changed, 42 insertions(+), 59 deletions(-) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index 18a8c9d0..253dd96e 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -14,6 +14,7 @@ """ Run xDS Test Client on Kubernetes using Gamma """ +import dataclasses import datetime import logging from typing import Final, Optional @@ -30,10 +31,14 @@ logger = logging.getLogger(__name__) -ServerDeploymentArgs = k8s_xds_server_runner.ServerDeploymentArgs KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +@dataclasses.dataclass(frozen=True) +class ServerDeploymentArgs(k8s_xds_server_runner.ServerDeploymentArgs): + """Gamma Server Deployment Args""" + + class GammaServerRunner(KubernetesServerRunner): # Mutable state. route: Optional[k8s.GammaHttpRoute] = None @@ -207,17 +212,7 @@ def run( # pylint: disable=arguments-differ **self.deployment_args.as_dict(), ) - # Create a PodMonitoring resource if CSM Observability is enabled - # This is GMP (Google Managed Prometheus) - if self.deployment_args.enable_csm_observability: - self.pod_monitoring_name = f"{self.deployment_id}-gmp" - self.pod_monitoring = self._create_pod_monitoring( - "csm/pod-monitoring.yaml", - namespace_name=self.k8s_namespace.name, - deployment_id=self.deployment_id, - pod_monitoring_name=self.pod_monitoring_name, - pod_monitoring_port=self.DEFAULT_MONITORING_PORT, - ) + self._setup_csm_observability() servers = self._make_servers_for_deployment( replica_count, @@ -282,32 +277,6 @@ def create_backend_policy( draining_timeout_sec=draining_timeout_sec, ) - def _xds_test_server_for_pod( - self, - pod: k8s.V1Pod, - *, - test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT, - maintenance_port: Optional[int] = None, - secure_mode: bool = False, - monitoring_port: Optional[int] = None, - ) -> XdsTestServer: - if self.deployment_args.enable_csm_observability: - if self.debug_use_port_forwarding: - pf = self._start_port_forwarding_pod( - pod, self.DEFAULT_MONITORING_PORT - ) - monitoring_port = pf.local_port - else: - monitoring_port = self.DEFAULT_MONITORING_PORT - - return super()._xds_test_server_for_pod( - pod=pod, - test_port=test_port, - maintenance_port=maintenance_port, - secure_mode=secure_mode, - monitoring_port=monitoring_port, - ) - @override def cleanup(self, *, force=False, force_namespace=False): try: diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py b/framework/test_app/runners/k8s/k8s_base_runner.py index 2e1075cc..867f50c7 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -76,7 +76,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): # Pylint wants abstract classes to override abstract methods. # pylint: disable=abstract-method - DEFAULT_MONITORING_PORT = 9464 + DEFAULT_POD_MONITORING_PORT = 9464 TEMPLATE_DIR_NAME = "kubernetes-manifests" TEMPLATE_DIR_RELATIVE_PATH = f"../../../../{TEMPLATE_DIR_NAME}" ROLE_WORKLOAD_IDENTITY_USER = "roles/iam.workloadIdentityUser" diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index f3a7d48c..9c5110af 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -199,7 +199,7 @@ def run( # pylint: disable=arguments-differ namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, - pod_monitoring_port=self.DEFAULT_MONITORING_PORT, + pod_monitoring_port=self.DEFAULT_POD_MONITORING_PORT, ) # We don't support for multiple client replicas at the moment. @@ -236,13 +236,13 @@ def _xds_test_client_for_pod( rpc_port, rpc_host = pf.local_port, pf.local_address if self.deployment_args.enable_csm_observability: pf = self._start_port_forwarding_pod( - pod, self.DEFAULT_MONITORING_PORT + pod, self.DEFAULT_POD_MONITORING_PORT ) monitoring_port = pf.local_port else: rpc_port, rpc_host = self.stats_port, None if self.deployment_args.enable_csm_observability: - monitoring_port = self.DEFAULT_MONITORING_PORT + monitoring_port = self.DEFAULT_POD_MONITORING_PORT return client_app.XdsTestClient( ip=pod.status.pod_ip, diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 65c26db2..ffe60c39 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -66,8 +66,6 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): td_bootstrap_image: str xds_server_uri: str network: str - pod_monitoring: Optional[k8s.PodMonitoring] = None - pod_monitoring_name: Optional[str] = None # Server Deployment args deployment_args: ServerDeploymentArgs @@ -80,6 +78,8 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): # Below is mutable state associated with the current run. service: Optional[k8s.V1Service] = None replica_count: int = 0 + pod_monitoring: Optional[k8s.PodMonitoring] = None + pod_monitoring_name: Optional[str] = None # A map from pod names to the server app. pods_to_servers: dict[str, XdsTestServer] @@ -261,6 +261,16 @@ def run( # pylint: disable=arguments-differ,too-many-branches **self.deployment_args.as_dict(), ) + self._setup_csm_observability() + + return self._make_servers_for_deployment( + replica_count, + test_port=test_port, + maintenance_port=maintenance_port, + secure_mode=secure_mode, + ) + + def _setup_csm_observability(self) -> None: # Create a PodMonitoring resource if CSM Observability is enabled # This is GMP (Google Managed Prometheus) if self.deployment_args.enable_csm_observability: @@ -270,16 +280,9 @@ def run( # pylint: disable=arguments-differ,too-many-branches namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, - pod_monitoring_port=self.DEFAULT_MONITORING_PORT, + pod_monitoring_port=self.DEFAULT_POD_MONITORING_PORT, ) - return self._make_servers_for_deployment( - replica_count, - test_port=test_port, - maintenance_port=maintenance_port, - secure_mode=secure_mode, - ) - def _make_servers_for_deployment( self, replica_count, @@ -337,15 +340,15 @@ def _xds_test_server_for_pod( if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, maintenance_port) rpc_port, rpc_host = pf.local_port, pf.local_address - if self.deployment_args.enable_csm_observability: + if self.should_collect_logs_prometheus: pf = self._start_port_forwarding_pod( - pod, self.DEFAULT_MONITORING_PORT + pod, self.DEFAULT_POD_MONITORING_PORT ) monitoring_port = pf.local_port else: rpc_port, rpc_host = maintenance_port, None - if self.deployment_args.enable_csm_observability: - monitoring_port = self.DEFAULT_MONITORING_PORT + if self.should_collect_logs_prometheus: + monitoring_port = self.DEFAULT_POD_MONITORING_PORT server = XdsTestServer( ip=pod.status.pod_ip, @@ -359,6 +362,13 @@ def _xds_test_server_for_pod( self.pods_to_servers[pod.metadata.name] = server return server + @property + def should_collect_logs_prometheus(self): + return ( + self.deployment_args.enable_csm_observability + and self.should_collect_logs + ) + @override def stop_pod_dependencies(self, *, log_drain_sec: int = 0): # Call pre-stop hook release if exists. @@ -424,6 +434,11 @@ def cleanup(self, *, force=False, force_namespace=False): ) self._delete_service_account(self.service_account_name) self.service_account = None + # Pod monitoring name is only set when CSM observability is enabled. + if self.pod_monitoring_name and (self.pod_monitoring or force): + self._delete_pod_monitoring(self.pod_monitoring_name) + self.pod_monitoring = None + self.pod_monitoring_name = None self._cleanup_namespace(force=(force_namespace and force)) finally: self._stop() diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py index f9089d5a..51b8f98a 100644 --- a/tests/app_net_csm_observability_test.py +++ b/tests/app_net_csm_observability_test.py @@ -248,7 +248,7 @@ def test_csm_observability(self): "Letting test client run for %d seconds to produce metric data", TEST_RUN_SECS, ) - if self.server_runner.should_collect_logs: + if self.server_runner.should_collect_logs_prometheus: self._sleep_and_ping_prometheus_endpoint( test_server, test_client ) diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index 102b9857..9cbbbb8e 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -102,7 +102,6 @@ ClientDeploymentArgs = k8s_xds_client_runner.ClientDeploymentArgs KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner ServerDeploymentArgs = k8s_xds_server_runner.ServerDeploymentArgs -KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner BuildQueryFn = Callable[[str, str], str] ANY = unittest.mock.ANY @@ -231,7 +230,7 @@ def test_csm_observability(self): "Letting test client run for %d seconds to produce metric data", TEST_RUN_SECS, ) - if self.server_runner.should_collect_logs: + if self.server_runner.should_collect_logs_prometheus: self._sleep_and_ping_prometheus_endpoint( test_server, test_client )