diff --git a/framework/test_app/client_app.py b/framework/test_app/client_app.py index bd0d8b7d..7af242ff 100644 --- a/framework/test_app/client_app.py +++ b/framework/test_app/client_app.py @@ -67,6 +67,7 @@ def __init__( hostname: str, rpc_host: Optional[str] = None, maintenance_port: Optional[int] = None, + monitoring_port: Optional[int] = None, ): super().__init__(rpc_host=(rpc_host or ip)) self.ip = ip @@ -74,6 +75,7 @@ def __init__( self.server_target = server_target self.maintenance_port = maintenance_port or rpc_port self.hostname = hostname + self.monitoring_port = monitoring_port @property @functools.lru_cache(None) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index 9e6b2ac5..bb1ec425 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -45,6 +45,7 @@ class GammaServerRunner(KubernetesServerRunner): route_name: str frontend_service_name: str + enable_csm_observability: bool csm_workload_name: str csm_canonical_service_name: str @@ -77,6 +78,7 @@ def __init__( namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, + enable_csm_observability: bool = False, csm_workload_name: str = "", csm_canonical_service_name: str = "", deployment_args: Optional[ServerDeploymentArgs] = None, @@ -108,6 +110,7 @@ def __init__( self.frontend_service_name = frontend_service_name self.route_name = route_name or f"route-{deployment_name}" + self.enable_csm_observability = enable_csm_observability self.csm_workload_name = csm_workload_name self.csm_canonical_service_name = csm_canonical_service_name @@ -122,7 +125,6 @@ def run( # pylint: disable=arguments-differ log_to_stdout: bool = False, bootstrap_version: Optional[str] = None, route_template: str = "gamma/route_http.yaml", - enable_csm_observability: bool = False, generate_mesh_id: bool = False, ) -> 
list[XdsTestServer]: if not maintenance_port: @@ -209,7 +211,7 @@ def run( # pylint: disable=arguments-differ maintenance_port=maintenance_port, secure_mode=secure_mode, bootstrap_version=bootstrap_version, - enable_csm_observability=enable_csm_observability, + enable_csm_observability=self.enable_csm_observability, generate_mesh_id=generate_mesh_id, csm_workload_name=self.csm_workload_name, csm_canonical_service_name=self.csm_canonical_service_name, @@ -218,13 +220,14 @@ def run( # pylint: disable=arguments-differ # Create a PodMonitoring resource if CSM Observability is enabled # This is GMP (Google Managed Prometheus) - if enable_csm_observability: + if self.enable_csm_observability: self.pod_monitoring_name = f"{self.deployment_id}-gmp" self.pod_monitoring = self._create_pod_monitoring( "csm/pod-monitoring.yaml", namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, ) servers = self._make_servers_for_deployment( @@ -290,6 +293,32 @@ def create_backend_policy( draining_timeout_sec=draining_timeout_sec, ) + def _xds_test_server_for_pod( + self, + pod: k8s.V1Pod, + *, + test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT, + maintenance_port: Optional[int] = None, + secure_mode: bool = False, + monitoring_port: Optional[int] = None, + ) -> XdsTestServer: + if self.enable_csm_observability: + if self.debug_use_port_forwarding: + pf = self._start_port_forwarding_pod( + pod, self.DEFAULT_MONITORING_PORT + ) + monitoring_port = pf.local_port + else: + monitoring_port = self.DEFAULT_MONITORING_PORT + + return super()._xds_test_server_for_pod( + pod=pod, + test_port=test_port, + maintenance_port=maintenance_port, + secure_mode=secure_mode, + monitoring_port=monitoring_port, + ) + @override def cleanup(self, *, force=False, force_namespace=False): try: diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py 
b/framework/test_app/runners/k8s/k8s_base_runner.py index a25e1518..9b35dd3e 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -76,6 +76,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): # Pylint wants abstract classes to override abstract methods. # pylint: disable=abstract-method + DEFAULT_MONITORING_PORT = 9464 TEMPLATE_DIR_NAME = "kubernetes-manifests" TEMPLATE_DIR_RELATIVE_PATH = f"../../../../{TEMPLATE_DIR_NAME}" ROLE_WORKLOAD_IDENTITY_USER = "roles/iam.workloadIdentityUser" @@ -384,6 +385,7 @@ def _create_pod_monitoring( namespace_name: str, deployment_id: str, pod_monitoring_name: str, + pod_monitoring_port: int, **kwargs, ) -> k8s.PodMonitoring: pod_monitoring = self._create_from_template( @@ -392,6 +394,7 @@ def _create_pod_monitoring( namespace_name=namespace_name, deployment_id=deployment_id, pod_monitoring_name=pod_monitoring_name, + pod_monitoring_port=pod_monitoring_port, **kwargs, ) if not ( diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index e566a065..f3a7d48c 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -28,6 +28,7 @@ @dataclasses.dataclass(frozen=True) class ClientDeploymentArgs: + enable_csm_observability: bool = False csm_workload_name: str = "" csm_canonical_service_name: str = "" @@ -75,8 +76,6 @@ def __init__( # pylint: disable=too-many-locals namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, - csm_workload_name: str = "", - csm_canonical_service_name: str = "", deployment_args: Optional[ClientDeploymentArgs] = None, ): super().__init__( @@ -98,12 +97,7 @@ def __init__( # pylint: disable=too-many-locals # Client deployment arguments. 
if not deployment_args: - deployment_args = ClientDeploymentArgs( - # TODO(stanleycheung): remove once https://github.com/grpc/psm-interop/pull/33 - # is merged and removed self.csm_* removed as class args. - csm_workload_name=csm_workload_name, - csm_canonical_service_name=csm_canonical_service_name, - ) + deployment_args = ClientDeploymentArgs() self.deployment_args = deployment_args # Used by the TD bootstrap generator. @@ -132,7 +126,6 @@ def run( # pylint: disable=arguments-differ generate_mesh_id=False, print_response=False, log_to_stdout: bool = False, - enable_csm_observability: bool = False, request_payload_size: int = 0, response_payload_size: int = 0, ) -> client_app.XdsTestClient: @@ -194,19 +187,19 @@ def run( # pylint: disable=arguments-differ config_mesh=config_mesh, generate_mesh_id=generate_mesh_id, print_response=print_response, - enable_csm_observability=enable_csm_observability, **self.deployment_args.as_dict(), ) # Create a PodMonitoring resource if CSM Observability is enabled # This is GMP (Google Managed Prometheus) - if enable_csm_observability: + if self.deployment_args.enable_csm_observability: self.pod_monitoring_name = f"{self.deployment_id}-gmp" self.pod_monitoring = self._create_pod_monitoring( "csm/pod-monitoring.yaml", namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, ) # We don't support for multiple client replicas at the moment. 
@@ -237,11 +230,19 @@ def _make_clients_for_deployment( def _xds_test_client_for_pod( self, pod: k8s.V1Pod, *, server_target: str ) -> client_app.XdsTestClient: + monitoring_port = None if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, self.stats_port) rpc_port, rpc_host = pf.local_port, pf.local_address + if self.deployment_args.enable_csm_observability: + pf = self._start_port_forwarding_pod( + pod, self.DEFAULT_MONITORING_PORT + ) + monitoring_port = pf.local_port else: rpc_port, rpc_host = self.stats_port, None + if self.deployment_args.enable_csm_observability: + monitoring_port = self.DEFAULT_MONITORING_PORT return client_app.XdsTestClient( ip=pod.status.pod_ip, @@ -249,6 +250,7 @@ def _xds_test_client_for_pod( server_target=server_target, hostname=pod.metadata.name, rpc_host=rpc_host, + monitoring_port=monitoring_port, ) # pylint: disable=arguments-differ diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 2cd0caf0..0e7885d4 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -305,6 +305,7 @@ def _xds_test_server_for_pod( test_port: int = DEFAULT_TEST_PORT, maintenance_port: Optional[int] = None, secure_mode: bool = False, + monitoring_port: Optional[int] = None, ) -> XdsTestServer: if maintenance_port is None: maintenance_port = self._get_default_maintenance_port(secure_mode) @@ -322,6 +323,7 @@ def _xds_test_server_for_pod( maintenance_port=rpc_port, secure_mode=secure_mode, rpc_host=rpc_host, + monitoring_port=monitoring_port, ) self.pods_to_servers[pod.metadata.name] = server return server diff --git a/framework/test_app/server_app.py b/framework/test_app/server_app.py index d6d1c4ce..d6926359 100644 --- a/framework/test_app/server_app.py +++ b/framework/test_app/server_app.py @@ -52,6 +52,7 @@ def __init__( xds_host: Optional[str] = None, xds_port: Optional[int] = 
None, rpc_host: Optional[str] = None, + monitoring_port: Optional[int] = None, ): super().__init__(rpc_host=(rpc_host or ip)) self.ip = ip @@ -60,6 +61,7 @@ def __init__( self.maintenance_port = maintenance_port or rpc_port self.secure_mode = secure_mode self.xds_host, self.xds_port = xds_host, xds_port + self.monitoring_port = monitoring_port @property @functools.lru_cache(None) diff --git a/kubernetes-manifests/csm/pod-monitoring.yaml b/kubernetes-manifests/csm/pod-monitoring.yaml index 3204f0f9..398f07b5 100644 --- a/kubernetes-manifests/csm/pod-monitoring.yaml +++ b/kubernetes-manifests/csm/pod-monitoring.yaml @@ -11,5 +11,5 @@ spec: matchLabels: deployment_id: ${deployment_id} endpoints: - - port: 9464 + - port: ${pod_monitoring_port} interval: 10s diff --git a/requirements.lock b/requirements.lock index c0c4821a..1eefc01f 100644 --- a/requirements.lock +++ b/requirements.lock @@ -16,6 +16,7 @@ packaging==23.1 Pygments==2.14.0 python-dateutil==2.8.2 protobuf==4.24.1 +requests==2.31.0 xds-protos==1.58.0rc1 ## The following requirements were added by pip freeze: cachetools==5.3.1 @@ -35,7 +36,6 @@ proto-plus==1.22.3 pyasn1==0.5.0 pyasn1-modules==0.3.0 pyparsing==3.1.1 -requests==2.31.0 requests-oauthlib==1.3.1 rsa==4.9 uritemplate==3.0.1 diff --git a/requirements.txt b/requirements.txt index c4f1d362..f8e865a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,4 +16,5 @@ packaging~=23.1 Pygments~=2.9 python-dateutil~=2.8 protobuf~=4.24 +requests~=2.31.0 xds-protos==1.58.0rc1 diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index f1518a95..c95900c9 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -14,7 +14,7 @@ import dataclasses import logging import time -from typing import Callable, Iterable +from typing import Callable, Iterable, TextIO import unittest.mock from absl import flags @@ -22,12 +22,15 @@ from google.api_core import exceptions as gapi_errors from
google.api_core import retry as gapi_retries from google.cloud import monitoring_v3 +import requests +from requests.exceptions import RequestException import yaml from framework import xds_gamma_testcase from framework import xds_k8s_testcase from framework.helpers import skips from framework.test_app.runners.k8s import gamma_server_runner +from framework.test_app.runners.k8s import k8s_base_runner from framework.test_app.runners.k8s import k8s_xds_client_runner logger = logging.getLogger(__name__) @@ -96,6 +99,7 @@ GammaServerRunner = gamma_server_runner.GammaServerRunner KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +ClientDeploymentArgs = k8s_xds_client_runner.ClientDeploymentArgs BuildQueryFn = Callable[[str, str], str] ANY = unittest.mock.ANY @@ -139,6 +143,31 @@ def pretty_print(self) -> str: return yaml.dump(metric, sort_keys=False) +# This class is purely for debugging purposes. We want to log what we see +# from the Prometheus endpoint before being sent to Cloud Monitoring. +# Once we determined the root cause of b/323596669 we can remove this +# class. +class PrometheusLogger: + def __init__( + self, k8s_runner: k8s_base_runner.KubernetesBaseRunner, pod_name: str + ): + logfile_name = ( + f"{k8s_runner.k8s_namespace.name}_{pod_name}_prometheus.log" + ) + log_path = k8s_runner.logs_subdir / logfile_name + self.log_stream: TextIO = open( + log_path, "w", errors="ignore", encoding="utf-8" + ) + + def write(self, line): + self.log_stream.write(line) + self.log_stream.write("\n") + self.log_stream.flush() + + def close(self): + self.log_stream.close() + + class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): metric_client: monitoring_v3.MetricServiceClient @@ -158,14 +187,18 @@ def setUpClass(cls): # each run(). 
def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: return super().initKubernetesClientRunner( - csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, - csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, + deployment_args=ClientDeploymentArgs( + enable_csm_observability=True, + csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, + ) ) # These parameters are more pertaining to the test itself, not to # each run(). def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner: return super().initKubernetesServerRunner( + enable_csm_observability=True, csm_workload_name=CSM_WORKLOAD_NAME_SERVER, csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER, ) @@ -175,14 +208,11 @@ def test_csm_observability(self): # resource creation out of self.startTestServers() with self.subTest("1_run_test_server"): start_secs = int(time.time()) - test_server: _XdsTestServer = self.startTestServers( - enable_csm_observability=True, - )[0] + test_server: _XdsTestServer = self.startTestServers()[0] with self.subTest("2_start_test_client"): test_client: _XdsTestClient = self.startTestClient( test_server, - enable_csm_observability=True, request_payload_size=REQUEST_PAYLOAD_SIZE, response_payload_size=RESPONSE_PAYLOAD_SIZE, ) @@ -190,12 +220,19 @@ def test_csm_observability(self): with self.subTest("3_test_server_received_rpcs_from_test_client"): self.assertSuccessfulRpcs(test_client) - with self.subTest("4_query_cloud_monitoring_metrics"): + with self.subTest("4_export_prometheus_metrics_data"): logger.info( "Letting test client run for %d seconds to produce metric data", TEST_RUN_SECS, ) - time.sleep(TEST_RUN_SECS) + if self.server_runner.should_collect_logs: + self._sleep_and_ping_prometheus_endpoint( + test_server, test_client + ) + else: + time.sleep(TEST_RUN_SECS) + + with self.subTest("5_query_cloud_monitoring_metrics"): end_secs = int(time.time()) interval = monitoring_v3.TimeInterval( 
start_time={"seconds": start_secs}, @@ -233,14 +270,14 @@ def test_csm_observability(self): } self.assertNotEmpty(all_results, msg="No query metrics results") - with self.subTest("5_check_metrics_time_series"): + with self.subTest("6_check_metrics_time_series"): for metric in ALL_METRICS: # Every metric needs to exist in the query results self.assertIn(metric, all_results) # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("6_check_metrics_labels_histogram_client"): + with self.subTest("7_check_metrics_labels_histogram_client"): expected_metric_labels = { "csm_mesh_id": ANY, "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, @@ -268,7 +305,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("7_check_metrics_labels_histogram_server"): + with self.subTest("8_check_metrics_labels_histogram_server"): expected_metric_labels = { "csm_mesh_id": ANY, "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, @@ -293,7 +330,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("8_check_metrics_labels_counter_client"): + with self.subTest("9_check_metrics_labels_counter_client"): expected_metric_labels = { "grpc_method": GRPC_METHOD_NAME, "grpc_target": ANY, @@ -309,7 +346,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("9_check_metrics_labels_counter_server"): + with self.subTest("10_check_metrics_labels_counter_server"): expected_metric_labels = { "grpc_method": GRPC_METHOD_NAME, "otel_scope_name": ANY, @@ -324,7 +361,7 @@ def test_csm_observability(self): # Testing whether each metric has the right set of monitored resource # label keys and values - with self.subTest("10_check_client_resource_labels_client"): + with 
self.subTest("11_check_client_resource_labels_client"): # all metrics should have the same set of monitored resource labels # keys, which come from the GMP job expected_resource_labels = { @@ -348,7 +385,7 @@ def test_csm_observability(self): # Testing whether each metric has the right set of monitored resource # label keys and values - with self.subTest("11_check_server_resource_labels_server"): + with self.subTest("12_check_server_resource_labels_server"): # all metrics should have the same set of monitored resource labels # keys, which come from the GMP job expected_resource_labels = { @@ -373,7 +410,7 @@ def test_csm_observability(self): # This tests whether each of the "bytes sent" histogram type metric # should have at least 1 data point whose mean should converge to be # close to the number of bytes being sent by the RPCs. - with self.subTest("12_check_bytes_sent_vs_data_points"): + with self.subTest("13_check_bytes_sent_vs_data_points"): for metric in (METRIC_CLIENT_ATTEMPT_SENT, METRIC_SERVER_CALL_RCVD): self.assertAtLeastOnePointWithinRange( all_results[metric].points, REQUEST_PAYLOAD_SIZE @@ -457,7 +494,7 @@ def query_metrics( view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, retry=retry_settings, ) - time_series = list(filter(self.is_legit_time_series, response)) + time_series = list(response) self.assertLen( time_series, @@ -475,21 +512,6 @@ def query_metrics( results[metric] = metric_time_series return results - @classmethod - def is_legit_time_series( - cls, series: monitoring_v3.types.TimeSeries - ) -> bool: - for point in series.points: - # Test flake: we found some time series with no actual data point - # in prod test runs. - # Here we will only include time series with actual data in it. 
- if point.value.distribution_value.count or point.value.double_value: - return True - logger.warning( - "Warning: found time_series with no valid data point\n%s", series - ) - return False - def assertAtLeastOnePointWithinRange( self, points: list[monitoring_v3.types.Point], @@ -511,6 +533,60 @@ def assertAtLeastOnePointWithinRange( f"No data point with {ref_bytes}±{tolerance*100}% bytes found" ) + def _sleep_and_ping_prometheus_endpoint( + self, test_server: _XdsTestServer, test_client: _XdsTestClient + ): + server_prometheus_logger = PrometheusLogger( + self.server_runner, test_server.hostname + ) + client_prometheus_logger = PrometheusLogger( + self.client_runner, test_client.hostname + ) + try: + for i in range(0, TEST_RUN_SECS // 10): + time.sleep(10) + curr_secs = int(time.time()) + server_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + server_prometheus_logger.write( + self._ping_prometheus_endpoint( + test_server.rpc_host, + test_server.monitoring_port, + ) + ) + client_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + client_prometheus_logger.write( + self._ping_prometheus_endpoint( + test_client.rpc_host, + test_client.monitoring_port, + ) + ) + finally: + server_prometheus_logger.close() + client_prometheus_logger.close() + + @classmethod + def _ping_prometheus_endpoint( + cls, monitoring_host: str, monitoring_port: int + ) -> str: + """ + A helper function to ping the pod's Prometheus endpoint to get what GMP + sees from the OTel exporter before passing metrics to Cloud Monitoring. + """ + try: + prometheus_log = requests.get( + f"http://{monitoring_host}:{monitoring_port}/metrics" + ) + return "\n".join(prometheus_log.text.splitlines()) + except RequestException as e: + logger.warning("Http request to Prometheus endpoint failed: %r", e) + # It's OK the caller will receive nothing in case of an exception. + # Caller can continue. + return "" + if __name__ == "__main__": absltest.main()