diff --git a/framework/infrastructure/k8s.py b/framework/infrastructure/k8s.py index 7643a3ef..a71b34c1 100644 --- a/framework/infrastructure/k8s.py +++ b/framework/infrastructure/k8s.py @@ -59,6 +59,7 @@ GcpSessionAffinityPolicy = DynResourceInstance GcpSessionAffinityFilter = DynResourceInstance GcpBackendPolicy = DynResourceInstance +PodMonitoring = DynResourceInstance _timedelta = datetime.timedelta _datetime = datetime.datetime @@ -326,6 +327,13 @@ def api_session_affinity_policy(self) -> dynamic_res.Resource: "GCPSessionAffinityPolicy", ) + @functools.cached_property # pylint: disable=no-member + def pod_monitoring(self) -> dynamic_res.Resource: + return self._get_dynamic_api( + "monitoring.googleapis.com/v1", + "PodMonitoring", + ) + @functools.cached_property # pylint: disable=no-member def api_backend_policy(self) -> dynamic_res.Resource: return self._get_dynamic_api( @@ -648,6 +656,19 @@ def delete_session_affinity_filter( grace_period_seconds=grace_period_seconds, ) + def delete_pod_monitoring( + self, + name: str, + grace_period_seconds=DELETE_GRACE_PERIOD_SEC, + ) -> None: + self._execute( + self.pod_monitoring.delete, # pylint: disable=no-member + name=name, + namespace=self.name, + propagation_policy="Foreground", + grace_period_seconds=grace_period_seconds, + ) + def delete_backend_policy( self, name: str, diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index b50c6aa4..745c6b8c 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -115,6 +115,7 @@ def run( # pylint: disable=arguments-differ log_to_stdout: bool = False, bootstrap_version: Optional[str] = None, route_template: str = "gamma/route_http.yaml", + enable_csm_observability: bool = False, ) -> List[XdsTestServer]: if not maintenance_port: maintenance_port = self._get_default_maintenance_port(secure_mode) @@ -206,6 +207,7 @@ def run( # pylint: disable=arguments-differ bootstrap_version=bootstrap_version, termination_grace_period_seconds=self.termination_grace_period_seconds, pre_stop_hook=self.pre_stop_hook, + enable_csm_observability=enable_csm_observability, ) servers = self._make_servers_for_deployment( diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py b/framework/test_app/runners/k8s/k8s_base_runner.py index 3df9bd19..056f9ae5 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -299,6 +299,38 @@ def _reuse_namespace(self) -> k8s.V1Namespace: logger.info("Reusing namespace: %s", self.k8s_namespace.name) return self.k8s_namespace.get() + def _create_pod_monitoring( + self, + template, + *, + namespace_name: str, + deployment_id: str, + pod_monitoring_name: str, + **kwargs, + ) -> k8s.PodMonitoring: + pod_monitoring = self._create_from_template( + template, + custom_object=True, + namespace_name=namespace_name, + deployment_id=deployment_id, + pod_monitoring_name=pod_monitoring_name, + **kwargs, + ) + if not ( + isinstance(pod_monitoring, k8s.PodMonitoring) + and pod_monitoring.kind == "PodMonitoring" + ): + raise _RunnerError( + f"Expected ResourceInstance[PodMonitoring] to be created from" + f" manifest {template}" + ) + logger.debug( + "PodMonitoring %s created at %s", + pod_monitoring.metadata.name, + pod_monitoring.metadata.creation_timestamp, + ) + return pod_monitoring + def _create_namespace(self, template, **kwargs) -> k8s.V1Namespace: namespace = self._create_from_template(template, **kwargs) if not isinstance(namespace, k8s.V1Namespace): @@ -641,6 +673,20 @@ def _delete_session_affinity_filter(self, name, wait_for_deletion=True): ) logger.info("GCPSessionAffinityFilter %s deleted", name) + def _delete_pod_monitoring(self, name): + logger.info("Deleting PodMonitoring %s", name) + try: + self.k8s_namespace.delete_pod_monitoring(name) + except k8s.NotFound: + logger.debug( + "PodMonitoring %s not deleted since it doesn't exist", name + ) + return + except retryers.RetryError as e: + logger.warning("PodMonitoring %s deletion failed: %s", name, e) + return + logger.info("PodMonitoring %s deleted", name) + def _delete_backend_policy(self, name, wait_for_deletion=True): logger.info("Deleting GCPBackendPolicy %s", name) try: diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index 21579541..610a25a7 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -39,6 +39,8 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): service_account_name: Optional[str] = None service_account_template: Optional[str] = None gcp_iam: Optional[gcp.iam.IamV1] = None + pod_monitoring: Optional[k8s.PodMonitoring] = None + pod_monitoring_name: Optional[str] = None def __init__( # pylint: disable=too-many-locals self, @@ -104,6 +106,7 @@ def run( # pylint: disable=arguments-differ generate_mesh_id=False, print_response=False, log_to_stdout: bool = False, + enable_csm_observability: bool = False, ) -> XdsTestClient: logger.info( ( @@ -158,8 +161,20 @@ def run( # pylint: disable=arguments-differ config_mesh=config_mesh, generate_mesh_id=generate_mesh_id, print_response=print_response, + enable_csm_observability=enable_csm_observability, ) + # Create a PodMonitoring resource if CSM Observability is enabled + # This is GMP (Google Managed Prometheus) + if enable_csm_observability: + self.pod_monitoring_name = f"{self.deployment_id}-gmp" + self.pod_monitoring = self._create_pod_monitoring( + "csm/pod-monitoring.yaml", + namespace_name=self.k8s_namespace.name, + deployment_id=self.deployment_id, + pod_monitoring_name=self.pod_monitoring_name, + ) + # Load test client pod. We need only one client at the moment pod_name = self._wait_deployment_pod_count(self.deployment)[0] pod: k8s.V1Pod = self._wait_pod_started(pod_name) @@ -206,6 +221,11 @@ def cleanup(self, *, force=False, force_namespace=False): ) self._delete_service_account(self.service_account_name) self.service_account = None + # Pod monitoring name is only set when CSM observability is enabled. + if self.pod_monitoring_name and (self.pod_monitoring or force): + self._delete_pod_monitoring(self.pod_monitoring_name) + self.pod_monitoring = None + self.pod_monitoring_name = None self._cleanup_namespace(force=force_namespace and force) finally: self._stop() diff --git a/kubernetes-manifests/client.deployment.yaml b/kubernetes-manifests/client.deployment.yaml index 2b52c2cb..13fad0f8 100644 --- a/kubernetes-manifests/client.deployment.yaml +++ b/kubernetes-manifests/client.deployment.yaml @@ -42,6 +42,9 @@ spec: - "--rpc=${rpc}" - "--metadata=${metadata}" - "--print_response=${print_response}" + % if enable_csm_observability: + - "--enable_csm_observability" + % endif ports: - containerPort: ${stats_port} env: diff --git a/kubernetes-manifests/csm/pod-monitoring.yaml b/kubernetes-manifests/csm/pod-monitoring.yaml index bc5e53b3..3204f0f9 100644 --- a/kubernetes-manifests/csm/pod-monitoring.yaml +++ b/kubernetes-manifests/csm/pod-monitoring.yaml @@ -3,7 +3,9 @@ apiVersion: monitoring.googleapis.com/v1 kind: PodMonitoring metadata: namespace: ${namespace_name} - name: ${deployment_id}-gmp + name: ${pod_monitoring_name} + labels: + owner: xds-k8s-interop-test spec: selector: matchLabels: diff --git a/kubernetes-manifests/server.deployment.yaml b/kubernetes-manifests/server.deployment.yaml index c8ed4a6b..de823cc6 100644 --- a/kubernetes-manifests/server.deployment.yaml +++ b/kubernetes-manifests/server.deployment.yaml @@ -40,6 +40,9 @@ spec: failureThreshold: 1000 args: - "--port=${test_port}" + % if enable_csm_observability: + - "--enable_csm_observability" + % endif ports: - containerPort: ${test_port} env: diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py new file mode 100644 index 00000000..0d738f51 --- /dev/null +++ b/tests/gamma/csm_observability_test.py @@ -0,0 +1,48 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from absl import flags +from absl.testing import absltest + +from framework import xds_gamma_testcase +from framework import xds_k8s_testcase + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient + + +class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): + def test_csm_observability(self): + # TODO(sergiitk): [GAMMA] Consider moving out custom gamma + # resource creation out of self.startTestServers() + with self.subTest("1_run_test_server"): + test_server: _XdsTestServer = self.startTestServers( + enable_csm_observability=True + )[0] + + with self.subTest("2_start_test_client"): + test_client: _XdsTestClient = self.startTestClient( + test_server, enable_csm_observability=True + ) + + with self.subTest("3_test_server_received_rpcs_from_test_client"): + self.assertSuccessfulRpcs(test_client) + + +if __name__ == "__main__": + absltest.main()