diff --git a/framework/test_app/client_app.py b/framework/test_app/client_app.py index bd0d8b7d..7af242ff 100644 --- a/framework/test_app/client_app.py +++ b/framework/test_app/client_app.py @@ -67,6 +67,7 @@ def __init__( hostname: str, rpc_host: Optional[str] = None, maintenance_port: Optional[int] = None, + monitoring_port: Optional[int] = None, ): super().__init__(rpc_host=(rpc_host or ip)) self.ip = ip @@ -74,6 +75,7 @@ def __init__( self.server_target = server_target self.maintenance_port = maintenance_port or rpc_port self.hostname = hostname + self.monitoring_port = monitoring_port @property @functools.lru_cache(None) diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py index b1a5b7c4..913e13df 100644 --- a/framework/test_app/runners/k8s/gamma_server_runner.py +++ b/framework/test_app/runners/k8s/gamma_server_runner.py @@ -40,13 +40,9 @@ class GammaServerRunner(KubernetesServerRunner): session_affinity_filter: Optional[k8s.GcpSessionAffinityFilter] = None session_affinity_policy: Optional[k8s.GcpSessionAffinityPolicy] = None backend_policy: Optional[k8s.GcpBackendPolicy] = None - pod_monitoring: Optional[k8s.PodMonitoring] = None - pod_monitoring_name: Optional[str] = None route_name: str frontend_service_name: str - csm_workload_name: str - csm_canonical_service_name: str SESSION_AFFINITY_FILTER_NAME: Final[str] = "ssa-filter" SESSION_AFFINITY_POLICY_NAME: Final[str] = "ssa-policy" @@ -77,6 +73,7 @@ def __init__( namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, + enable_csm_observability: bool = False, csm_workload_name: str = "", csm_canonical_service_name: str = "", deployment_args: Optional[ServerDeploymentArgs] = None, @@ -103,13 +100,14 @@ def __init__( namespace_template=namespace_template, debug_use_port_forwarding=debug_use_port_forwarding, enable_workload_identity=enable_workload_identity, + enable_csm_observability=enable_csm_observability, + csm_workload_name=csm_workload_name, + csm_canonical_service_name=csm_canonical_service_name, deployment_args=deployment_args, ) self.frontend_service_name = frontend_service_name self.route_name = route_name or f"route-{deployment_name}" - self.csm_workload_name = csm_workload_name - self.csm_canonical_service_name = csm_canonical_service_name @override def run( # pylint: disable=arguments-differ @@ -122,7 +120,6 @@ def run( # pylint: disable=arguments-differ log_to_stdout: bool = False, bootstrap_version: Optional[str] = None, route_template: str = "gamma/route_http.yaml", - enable_csm_observability: bool = False, generate_mesh_id: bool = False, ) -> list[XdsTestServer]: if not maintenance_port: @@ -209,7 +206,7 @@ def run( # pylint: disable=arguments-differ maintenance_port=maintenance_port, secure_mode=secure_mode, bootstrap_version=bootstrap_version, - enable_csm_observability=enable_csm_observability, + enable_csm_observability=self.enable_csm_observability, generate_mesh_id=generate_mesh_id, csm_workload_name=self.csm_workload_name, csm_canonical_service_name=self.csm_canonical_service_name, @@ -218,13 +215,14 @@ def run( # pylint: disable=arguments-differ # Create a PodMonitoring resource if CSM Observability is enabled # This is GMP (Google Managed Prometheus) - if enable_csm_observability: + if self.enable_csm_observability: self.pod_monitoring_name = f"{self.deployment_id}-gmp" self.pod_monitoring = self._create_pod_monitoring( "csm/pod-monitoring.yaml", 
namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, ) servers = self._make_servers_for_deployment( @@ -290,6 +288,32 @@ def create_backend_policy( draining_timeout_sec=draining_timeout_sec, ) + def _xds_test_server_for_pod( + self, + pod: k8s.V1Pod, + *, + test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT, + maintenance_port: Optional[int] = None, + secure_mode: bool = False, + monitoring_port: Optional[int] = None, + ) -> XdsTestServer: + if self.enable_csm_observability: + if self.debug_use_port_forwarding: + pf = self._start_port_forwarding_pod( + pod, self.DEFAULT_MONITORING_PORT + ) + monitoring_port = pf.local_port + else: + monitoring_port = self.DEFAULT_MONITORING_PORT + + return super()._xds_test_server_for_pod( + pod=pod, + test_port=test_port, + maintenance_port=maintenance_port, + secure_mode=secure_mode, + monitoring_port=monitoring_port, + ) + @override def cleanup(self, *, force=False, force_namespace=False): try: diff --git a/framework/test_app/runners/k8s/k8s_base_runner.py b/framework/test_app/runners/k8s/k8s_base_runner.py index a25e1518..9b35dd3e 100644 --- a/framework/test_app/runners/k8s/k8s_base_runner.py +++ b/framework/test_app/runners/k8s/k8s_base_runner.py @@ -76,6 +76,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): # Pylint wants abstract classes to override abstract methods. # pylint: disable=abstract-method + DEFAULT_MONITORING_PORT = 9464 TEMPLATE_DIR_NAME = "kubernetes-manifests" TEMPLATE_DIR_RELATIVE_PATH = f"../../../../{TEMPLATE_DIR_NAME}" ROLE_WORKLOAD_IDENTITY_USER = "roles/iam.workloadIdentityUser" @@ -384,6 +385,7 @@ def _create_pod_monitoring( namespace_name: str, deployment_id: str, pod_monitoring_name: str, + pod_monitoring_port: int, **kwargs, ) -> k8s.PodMonitoring: pod_monitoring = self._create_from_template( @@ -392,6 +394,7 @@ def _create_pod_monitoring( namespace_name=namespace_name, deployment_id=deployment_id, pod_monitoring_name=pod_monitoring_name, + pod_monitoring_port=pod_monitoring_port, **kwargs, ) if not ( diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py index c1eaa9d4..759d9289 100644 --- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py @@ -34,6 +34,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner): debug_use_port_forwarding: bool td_bootstrap_image: str network: str + enable_csm_observability: bool csm_workload_name: str csm_canonical_service_name: str @@ -64,6 +65,7 @@ def __init__( # pylint: disable=too-many-locals namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, + enable_csm_observability: bool = False, csm_workload_name: str = "", csm_canonical_service_name: str = "", ): @@ -83,6 +85,7 @@ def __init__( # pylint: disable=too-many-locals self.deployment_template = deployment_template self.enable_workload_identity = enable_workload_identity self.debug_use_port_forwarding = debug_use_port_forwarding + self.enable_csm_observability = enable_csm_observability self.csm_workload_name = csm_workload_name self.csm_canonical_service_name = csm_canonical_service_name @@ -112,7 +115,6 @@ def run( # pylint: disable=arguments-differ generate_mesh_id=False, print_response=False, log_to_stdout: bool = False, - enable_csm_observability: bool = False, 
request_payload_size: int = 0, response_payload_size: int = 0, ) -> client_app.XdsTestClient: @@ -174,20 +176,21 @@ def run( # pylint: disable=arguments-differ config_mesh=config_mesh, generate_mesh_id=generate_mesh_id, print_response=print_response, - enable_csm_observability=enable_csm_observability, + enable_csm_observability=self.enable_csm_observability, csm_workload_name=self.csm_workload_name, csm_canonical_service_name=self.csm_canonical_service_name, ) # Create a PodMonitoring resource if CSM Observability is enabled # This is GMP (Google Managed Prometheus) - if enable_csm_observability: + if self.enable_csm_observability: self.pod_monitoring_name = f"{self.deployment_id}-gmp" self.pod_monitoring = self._create_pod_monitoring( "csm/pod-monitoring.yaml", namespace_name=self.k8s_namespace.name, deployment_id=self.deployment_id, pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, ) # We don't support for multiple client replicas at the moment. @@ -218,11 +221,19 @@ def _make_clients_for_deployment( def _xds_test_client_for_pod( self, pod: k8s.V1Pod, *, server_target: str ) -> client_app.XdsTestClient: + monitoring_port = None if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, self.stats_port) rpc_port, rpc_host = pf.local_port, pf.local_address + if self.enable_csm_observability: + pf = self._start_port_forwarding_pod( + pod, self.DEFAULT_MONITORING_PORT + ) + monitoring_port = pf.local_port else: rpc_port, rpc_host = self.stats_port, None + if self.enable_csm_observability: + monitoring_port = self.DEFAULT_MONITORING_PORT return client_app.XdsTestClient( ip=pod.status.pod_ip, @@ -230,6 +241,7 @@ def _xds_test_client_for_pod( server_target=server_target, hostname=pod.metadata.name, rpc_host=rpc_host, + monitoring_port=monitoring_port, ) # pylint: disable=arguments-differ diff --git a/framework/test_app/runners/k8s/k8s_xds_server_runner.py b/framework/test_app/runners/k8s/k8s_xds_server_runner.py index 2cd0caf0..12b7a3f6 100644 --- a/framework/test_app/runners/k8s/k8s_xds_server_runner.py +++ b/framework/test_app/runners/k8s/k8s_xds_server_runner.py @@ -60,6 +60,11 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner): td_bootstrap_image: str xds_server_uri: str network: str + enable_csm_observability: bool + csm_workload_name: str + csm_canonical_service_name: str + pod_monitoring: Optional[k8s.PodMonitoring] = None + pod_monitoring_name: Optional[str] = None # Server Deployment args deployment_args: ServerDeploymentArgs @@ -98,6 +103,9 @@ def __init__( # pylint: disable=too-many-locals namespace_template: Optional[str] = None, debug_use_port_forwarding: bool = False, enable_workload_identity: bool = True, + enable_csm_observability: bool = False, + csm_workload_name: str = "", + csm_canonical_service_name: str = "", deployment_args: Optional[ServerDeploymentArgs] = None, ): super().__init__( @@ -143,6 +151,10 @@ def __init__( # pylint: disable=too-many-locals # permission to use GCP service account identity. self.gcp_iam = gcp.iam.IamV1(gcp_api_manager, gcp_project) + self.enable_csm_observability = enable_csm_observability + self.csm_workload_name = csm_workload_name + self.csm_canonical_service_name = csm_canonical_service_name + # Mutable state associated with each run. 
self._reset_state() @@ -248,9 +260,24 @@ def run( # pylint: disable=arguments-differ,too-many-branches maintenance_port=maintenance_port, secure_mode=secure_mode, bootstrap_version=bootstrap_version, + enable_csm_observability=self.enable_csm_observability, + csm_workload_name=self.csm_workload_name, + csm_canonical_service_name=self.csm_canonical_service_name, **self.deployment_args.as_dict(), ) + # Create a PodMonitoring resource if CSM Observability is enabled + # This is GMP (Google Managed Prometheus) + if self.enable_csm_observability: + self.pod_monitoring_name = f"{self.deployment_id}-gmp" + self.pod_monitoring = self._create_pod_monitoring( + "csm/pod-monitoring.yaml", + namespace_name=self.k8s_namespace.name, + deployment_id=self.deployment_id, + pod_monitoring_name=self.pod_monitoring_name, + pod_monitoring_port=self.DEFAULT_MONITORING_PORT, + ) + return self._make_servers_for_deployment( replica_count, test_port=test_port, @@ -305,6 +332,7 @@ def _xds_test_server_for_pod( test_port: int = DEFAULT_TEST_PORT, maintenance_port: Optional[int] = None, secure_mode: bool = False, + monitoring_port: Optional[int] = None, ) -> XdsTestServer: if maintenance_port is None: maintenance_port = self._get_default_maintenance_port(secure_mode) @@ -312,8 +340,15 @@ def _xds_test_server_for_pod( if self.debug_use_port_forwarding: pf = self._start_port_forwarding_pod(pod, maintenance_port) rpc_port, rpc_host = pf.local_port, pf.local_address + if self.enable_csm_observability: + pf = self._start_port_forwarding_pod( + pod, self.DEFAULT_MONITORING_PORT + ) + monitoring_port = pf.local_port else: rpc_port, rpc_host = maintenance_port, None + if self.enable_csm_observability: + monitoring_port = self.DEFAULT_MONITORING_PORT server = XdsTestServer( ip=pod.status.pod_ip, @@ -322,6 +357,7 @@ def _xds_test_server_for_pod( maintenance_port=rpc_port, secure_mode=secure_mode, rpc_host=rpc_host, + monitoring_port=monitoring_port, ) self.pods_to_servers[pod.metadata.name] = server return server diff --git a/framework/test_app/server_app.py b/framework/test_app/server_app.py index d6d1c4ce..d6926359 100644 --- a/framework/test_app/server_app.py +++ b/framework/test_app/server_app.py @@ -52,6 +52,7 @@ def __init__( xds_host: Optional[str] = None, xds_port: Optional[int] = None, rpc_host: Optional[str] = None, + monitoring_port: Optional[int] = None, ): super().__init__(rpc_host=(rpc_host or ip)) self.ip = ip @@ -60,6 +61,7 @@ def __init__( self.maintenance_port = maintenance_port or rpc_port self.secure_mode = secure_mode self.xds_host, self.xds_port = xds_host, xds_port + self.monitoring_port = monitoring_port @property @functools.lru_cache(None) diff --git a/kubernetes-manifests/csm/pod-monitoring.yaml b/kubernetes-manifests/csm/pod-monitoring.yaml index 3204f0f9..398f07b5 100644 --- a/kubernetes-manifests/csm/pod-monitoring.yaml +++ b/kubernetes-manifests/csm/pod-monitoring.yaml @@ -11,5 +11,5 @@ spec: matchLabels: deployment_id: ${deployment_id} endpoints: - - port: 9464 + - port: ${pod_monitoring_port} interval: 10s diff --git a/requirements.lock b/requirements.lock index c0c4821a..1eefc01f 100644 --- a/requirements.lock +++ b/requirements.lock @@ -16,6 +16,7 @@ packaging==23.1 Pygments==2.14.0 python-dateutil==2.8.2 protobuf==4.24.1 +requests==2.31.0 xds-protos==1.58.0rc1 ## The following requirements were added by pip freeze: cachetools==5.3.1 @@ -35,7 +36,6 @@ proto-plus==1.22.3 pyasn1==0.5.0 pyasn1-modules==0.3.0 pyparsing==3.1.1 -requests==2.31.0 requests-oauthlib==1.3.1 rsa==4.9 uritemplate==3.0.1
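The pod-monitoring.yaml change above replaces the hard-coded scrape port 9464 with a ${pod_monitoring_port} template variable, which the runners now fill in with KubernetesBaseRunner.DEFAULT_MONITORING_PORT. A minimal sketch of that substitution contract, assuming string.Template-style ${var} expansion (the framework's actual template renderer may differ, and the deployment_id value below is hypothetical):

```python
from string import Template

# Same variables as the manifest above; the endpoints port is now templated.
manifest = Template(
    "spec:\n"
    "  selector:\n"
    "    matchLabels:\n"
    "      deployment_id: ${deployment_id}\n"
    "  endpoints:\n"
    "  - port: ${pod_monitoring_port}\n"
    "    interval: 10s\n"
)
rendered = manifest.substitute(
    deployment_id="psm-grpc-server",  # hypothetical deployment id
    pod_monitoring_port=9464,  # KubernetesBaseRunner.DEFAULT_MONITORING_PORT
)
print(rendered)
```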
diff --git a/requirements.txt b/requirements.txt index c4f1d362..f8e865a8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,4 +16,5 @@ packaging~=23.1 Pygments~=2.9 python-dateutil~=2.8 protobuf~=4.24 +requests~=2.31.0 xds-protos==1.58.0rc1 diff --git a/tests/app_net_csm_observability_test.py b/tests/app_net_csm_observability_test.py new file mode 100644 index 00000000..93984f08 --- /dev/null +++ b/tests/app_net_csm_observability_test.py @@ -0,0 +1,598 @@ +# Copyright 2023 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import dataclasses +import logging +import time +from typing import Callable, Iterable, TextIO +import unittest.mock + +from absl import flags +from absl.testing import absltest +from google.api_core import exceptions as gapi_errors +from google.api_core import retry as gapi_retries +from google.cloud import monitoring_v3 +import requests +from requests.exceptions import RequestException +import yaml + +from framework import xds_k8s_testcase +from framework.helpers import skips +from framework.test_app.runners.k8s import k8s_base_runner +from framework.test_app.runners.k8s import k8s_xds_client_runner +from framework.test_app.runners.k8s import k8s_xds_server_runner + +logger = logging.getLogger(__name__) +flags.adopt_module_key_flags(xds_k8s_testcase) + +# Type aliases +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient +_Lang = skips.Lang + +# Testing consts +TEST_RUN_SECS = 90 +REQUEST_PAYLOAD_SIZE = 271828 +RESPONSE_PAYLOAD_SIZE = 314159 +GRPC_METHOD_NAME = "grpc.testing.TestService/UnaryCall" +CSM_WORKLOAD_NAME_SERVER = "csm_workload_name_from_server" +CSM_WORKLOAD_NAME_CLIENT = "csm_workload_name_from_client" +CSM_CANONICAL_SERVICE_NAME_SERVER = "csm_canonical_service_name_from_server" +CSM_CANONICAL_SERVICE_NAME_CLIENT = "csm_canonical_service_name_from_client" +PROMETHEUS_HOST = "prometheus.googleapis.com" +METRIC_CLIENT_ATTEMPT_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_client_attempt_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_CLIENT_ATTEMPT_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_duration_seconds/histogram" +) +METRIC_CLIENT_ATTEMPT_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_client_attempt_started_total/counter" +) +METRIC_SERVER_CALL_RCVD = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_rcvd_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_SENT = ( + f"{PROMETHEUS_HOST}/" + "grpc_server_call_sent_total_compressed_message_size_bytes/histogram" +) +METRIC_SERVER_CALL_DURATION = ( + f"{PROMETHEUS_HOST}/grpc_server_call_duration_seconds/histogram" +) +METRIC_SERVER_CALL_STARTED = ( + f"{PROMETHEUS_HOST}/grpc_server_call_started_total/counter" +) +HISTOGRAM_CLIENT_METRICS = ( + METRIC_CLIENT_ATTEMPT_SENT, + METRIC_CLIENT_ATTEMPT_RCVD, + METRIC_CLIENT_ATTEMPT_DURATION, +) +HISTOGRAM_SERVER_METRICS = ( + 
METRIC_SERVER_CALL_DURATION, + METRIC_SERVER_CALL_RCVD, + METRIC_SERVER_CALL_SENT, +) +COUNTER_CLIENT_METRICS = (METRIC_CLIENT_ATTEMPT_STARTED,) +COUNTER_SERVER_METRICS = (METRIC_SERVER_CALL_STARTED,) +HISTOGRAM_METRICS = HISTOGRAM_CLIENT_METRICS + HISTOGRAM_SERVER_METRICS +COUNTER_METRICS = COUNTER_CLIENT_METRICS + COUNTER_SERVER_METRICS +CLIENT_METRICS = HISTOGRAM_CLIENT_METRICS + COUNTER_CLIENT_METRICS +SERVER_METRICS = HISTOGRAM_SERVER_METRICS + COUNTER_SERVER_METRICS +ALL_METRICS = HISTOGRAM_METRICS + COUNTER_METRICS + +KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner +KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner +BuildQueryFn = Callable[[str, str], str] +ANY = unittest.mock.ANY + + +@dataclasses.dataclass(eq=False) +class MetricTimeSeries: + """ + This class represents one TimeSeries object + from monitoring_v3.ListTimeSeriesResponse. + """ + + # the metric name + name: str + # each time series has a monitored resource + resource_type: str + # each time series has a set of metric labels + metric_labels: dict[str, str] + # each time series has a set of monitored resource labels + resource_labels: dict[str, str] + # each time series has a set of data points + points: list[monitoring_v3.types.Point] + + @classmethod + def from_response( + cls, + name: str, + response: monitoring_v3.types.TimeSeries, + ) -> "MetricTimeSeries": + return cls( + name=name, + resource_type=response.resource.type, + metric_labels=dict(sorted(response.metric.labels.items())), + resource_labels=dict(sorted(response.resource.labels.items())), + points=list(response.points), + ) + + def pretty_print(self) -> str: + metric = dataclasses.asdict(self) + # too much noise to print all data points from a time series + metric.pop("points") + return yaml.dump(metric, sort_keys=False) + + +# This class is purely for debugging purposes. We want to log what we see +# from the Prometheus endpoint before it is sent to Cloud Monitoring. +# Once we've determined the root cause of b/323596669, we can remove this +# class. +class PrometheusLogger: + def __init__( + self, k8s_runner: k8s_base_runner.KubernetesBaseRunner, pod_name: str + ): + logfile_name = ( + f"{k8s_runner.k8s_namespace.name}_{pod_name}_prometheus.log" + ) + log_path = k8s_runner.logs_subdir / logfile_name + self.log_stream: TextIO = open( + log_path, "w", errors="ignore", encoding="utf-8" + ) + + def write(self, line): + self.log_stream.write(line) + self.log_stream.write("\n") + self.log_stream.flush() + + def close(self): + self.log_stream.close() + + +class AppNetCsmObservabilityTest(xds_k8s_testcase.AppNetXdsKubernetesTestCase): + metric_client: monitoring_v3.MetricServiceClient + + @staticmethod + def is_supported(config: skips.TestConfig) -> bool: + if config.client_lang == _Lang.CPP and config.server_lang == _Lang.CPP: + # CSM Observability Test is only supported for CPP for now. + return config.version_gte("v1.62.x") + return False + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3") + + # These parameters pertain more to the test itself than to + # each run(). + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: + return super().initKubernetesClientRunner( + enable_csm_observability=True, + csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, + ) + + # These parameters pertain more to the test itself than to + # each run().
+ def initKubernetesServerRunner(self, **kwargs) -> KubernetesServerRunner: + return super().initKubernetesServerRunner( + enable_csm_observability=True, + csm_workload_name=CSM_WORKLOAD_NAME_SERVER, + csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER, + ) + + def test_csm_observability(self): + with self.subTest("0_create_health_check"): + self.td.create_health_check() + + with self.subTest("1_create_backend_service"): + self.td.create_backend_service() + + with self.subTest("2_create_mesh"): + self.td.create_mesh() + + with self.subTest("3_create_grpc_route"): + self.td.create_grpc_route( + self.server_xds_host, self.server_xds_port + ) + + with self.subTest("4_run_test_server"): + start_secs = int(time.time()) + test_server: _XdsTestServer = self.startTestServers( + replica_count=1 + )[0] + + with self.subTest("5_setup_server_backends"): + self.setupServerBackends() + + with self.subTest("6_start_test_client"): + test_client: _XdsTestClient = self.startTestClient( + test_server, + config_mesh=self.td.mesh.name, + request_payload_size=REQUEST_PAYLOAD_SIZE, + response_payload_size=RESPONSE_PAYLOAD_SIZE, + ) + + with self.subTest("7_assert_xds_config_exists"): + self.assertXdsConfigExists(test_client) + + with self.subTest("8_test_server_received_rpcs_from_test_client"): + self.assertSuccessfulRpcs(test_client) + + with self.subTest("9_export_prometheus_metrics_data"): + logger.info( + "Letting test client run for %d seconds to produce metric data", + TEST_RUN_SECS, + ) + server_prometheus_logger = PrometheusLogger( + self.server_runner, test_server.hostname + ) + client_prometheus_logger = PrometheusLogger( + self.client_runner, test_client.hostname + ) + try: + for i in range(0, TEST_RUN_SECS // 10): + time.sleep(10) + curr_secs = int(time.time()) + server_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + server_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_server.rpc_host, + test_server.monitoring_port, + ) + ) + client_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + client_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_client.rpc_host, + test_client.monitoring_port, + ) + ) + finally: + server_prometheus_logger.close() + client_prometheus_logger.close() + + with self.subTest("10_query_cloud_monitoring_metrics"): + end_secs = int(time.time()) + interval = monitoring_v3.TimeInterval( + start_time={"seconds": start_secs}, + end_time={"seconds": end_secs}, + ) + server_histogram_results = self.query_metrics( + HISTOGRAM_SERVER_METRICS, + self.build_histogram_query, + self.server_namespace, + interval, + ) + client_histogram_results = self.query_metrics( + HISTOGRAM_CLIENT_METRICS, + self.build_histogram_query, + self.client_namespace, + interval, + ) + server_counter_results = self.query_metrics( + COUNTER_SERVER_METRICS, + self.build_counter_query, + self.server_namespace, + interval, + ) + client_counter_results = self.query_metrics( + COUNTER_CLIENT_METRICS, + self.build_counter_query, + self.client_namespace, + interval, + ) + all_results = { + **server_histogram_results, + **client_histogram_results, + **server_counter_results, + **client_counter_results, + } + self.assertNotEmpty(all_results, msg="No query metrics results") + + with self.subTest("11_check_metrics_time_series"): + for metric in ALL_METRICS: + # Every metric needs to exist in the query results + self.assertIn(metric, all_results) + + # Testing whether each metric has the correct set of metric keys and + # values +
with self.subTest("7_check_metrics_labels_histogram_client"): + expected_metric_labels = { + "csm_mesh_id": ANY, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_SERVER, + "csm_remote_workload_namespace_name": self.server_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_service_name": self.td.backend_service.name, + "csm_service_namespace_name": "unknown", + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + for metric in HISTOGRAM_CLIENT_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("8_check_metrics_labels_histogram_server"): + expected_metric_labels = { + "csm_mesh_id": ANY, + "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, + "csm_remote_workload_cluster_name": ANY, + "csm_remote_workload_location": ANY, + "csm_remote_workload_name": CSM_WORKLOAD_NAME_CLIENT, + "csm_remote_workload_namespace_name": self.client_namespace, + "csm_remote_workload_project_id": self.project, + "csm_remote_workload_type": "gcp_kubernetes_engine", + "csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, + "grpc_method": GRPC_METHOD_NAME, + "grpc_status": "OK", + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + for metric in HISTOGRAM_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("9_check_metrics_labels_counter_client"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "grpc_target": ANY, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_client.hostname, + } + for metric in COUNTER_CLIENT_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the correct set of metric keys and + # values + with self.subTest("10_check_metrics_labels_counter_server"): + expected_metric_labels = { + "grpc_method": GRPC_METHOD_NAME, + "otel_scope_name": ANY, + "otel_scope_version": ANY, + "pod": test_server.hostname, + } + for metric in COUNTER_SERVER_METRICS: + actual_metric_labels = all_results[metric].metric_labels + self.assertDictEqual( + expected_metric_labels, actual_metric_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("11_check_client_resource_labels_client"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job + expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.client_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.client_namespace, + "project_id": self.project, + } + for metric in CLIENT_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", 
metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # Testing whether each metric has the right set of monitored resource + # label keys and values + with self.subTest("17_check_server_resource_labels_server"): + # all metrics should have the same set of monitored resource labels + # keys, which come from the GMP job + expected_resource_labels = { + "cluster": ANY, + "instance": ANY, + "job": self.server_runner.pod_monitoring_name, + "location": ANY, + "namespace": self.server_namespace, + "project_id": self.project, + } + for metric in SERVER_METRICS: + metric_time_series = all_results[metric] + self.assertEqual( + "prometheus_target", metric_time_series.resource_type + ) + + actual_resource_labels = metric_time_series.resource_labels + self.assertDictEqual( + expected_resource_labels, actual_resource_labels + ) + + # This tests whether each of the "bytes sent" histogram-type metrics + # has at least 1 data point whose mean converges close to the number + # of bytes being sent by the RPCs. + with self.subTest("18_check_bytes_sent_vs_data_points"): + for metric in (METRIC_CLIENT_ATTEMPT_SENT, METRIC_SERVER_CALL_RCVD): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, REQUEST_PAYLOAD_SIZE + ) + + for metric in (METRIC_CLIENT_ATTEMPT_RCVD, METRIC_SERVER_CALL_SENT): + self.assertAtLeastOnePointWithinRange( + all_results[metric].points, RESPONSE_PAYLOAD_SIZE + ) + + @classmethod + def build_histogram_query(cls, metric_type: str, namespace: str) -> str: + # + # The list_time_series API requires us to query one metric + # at a time. + # + # The 'grpc_status = "OK"' filter condition is needed because + # some time series data points were logged when the grpc_status + # was "UNAVAILABLE" when the client/server were establishing + # connections. + # + # The 'grpc_method' filter condition is needed because the + # server metrics also include the Channelz requests. + # + # The 'resource.labels.namespace' filter condition allows us to + # filter metrics just for the current test run. + return ( + f'metric.type = "{metric_type}" AND ' + 'metric.labels.grpc_status = "OK" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}" AND ' + f'resource.labels.namespace = "{namespace}"' + ) + + @classmethod + def build_counter_query(cls, metric_type: str, namespace: str) -> str: + # The "RPCs started" counter metrics do not have the + # 'grpc_status' label + return ( + f'metric.type = "{metric_type}" AND ' + f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}" AND ' + f'resource.labels.namespace = "{namespace}"' + ) + + def query_metrics( + self, + metric_names: Iterable[str], + build_query_fn: BuildQueryFn, + namespace: str, + interval: monitoring_v3.TimeInterval, + ) -> dict[str, MetricTimeSeries]: + """ + A helper function to make the Cloud Monitoring API call to query + metrics created by this test run. + """ + # Based on default retry settings for list_time_series method: + # https://github.com/googleapis/google-cloud-python/blob/google-cloud-monitoring-v2.18.0/packages/google-cloud-monitoring/google/cloud/monitoring_v3/services/metric_service/transports/base.py#L210-L218 + # Modified: predicate extended to retry on a wider range of error types.
+ retry_settings = gapi_retries.Retry( + initial=0.1, + maximum=30.0, + multiplier=1.3, + predicate=gapi_retries.if_exception_type( + # Retry on 5xx, not just 503 ServiceUnavailable. This also + # covers gRPC Unknown, DataLoss, and DeadlineExceeded statuses. + # 501 MethodNotImplemented is not excluded because the most + # likely reason we'd see this error is server misconfiguration, + # so we want to give it a chance to recover from this situation + # too. + gapi_errors.ServerError, + # Retry on 429/ResourceExhausted: recoverable rate limiting. + gapi_errors.TooManyRequests, + ), + deadline=90.0, + ) + results = {} + for metric in metric_names: + logger.info("Requesting list_time_series for metric %s", metric) + response = self.metric_client.list_time_series( + name=f"projects/{self.project}", + filter=build_query_fn(metric, namespace), + interval=interval, + view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, + retry=retry_settings, + ) + time_series = list(response) + + self.assertLen( + time_series, + 1, + msg=f"Query for {metric} should return exactly 1 time series." + f" Found {len(time_series)}.", + ) + + metric_time_series = MetricTimeSeries.from_response( + metric, time_series[0] + ) + logger.info( + "Metric %s:\n%s", metric, metric_time_series.pretty_print() + ) + results[metric] = metric_time_series + return results + + def assertAtLeastOnePointWithinRange( + self, + points: list[monitoring_v3.types.Point], + ref_bytes: int, + tolerance: float = 0.05, + ): + """ + A helper function to check whether at least one of the "points" + has a mean within tolerance (X%) of ref_bytes. + """ + for point in points: + if ( + ref_bytes * (1 - tolerance) + < point.value.distribution_value.mean + < ref_bytes * (1 + tolerance) + ): + return + self.fail( + f"No data point with {ref_bytes}±{tolerance*100}% bytes found" + ) + + def ping_prometheus_endpoint( + self, monitoring_host: str, monitoring_port: int + ) -> str: + """ + A helper function to ping the pod's Prometheus endpoint to get what GMP + sees from the OTel exporter before passing metrics to Cloud Monitoring. + """ + try: + prometheus_log = requests.get( + f"http://{monitoring_host}:{monitoring_port}/metrics" + ) + return "\n".join(prometheus_log.text.splitlines()) + except RequestException as e: + logger.warning("HTTP request to Prometheus endpoint failed: %r", e) + # It's OK if the caller receives nothing in case of an exception; + # the caller can continue.
+ return "" + + +if __name__ == "__main__": + absltest.main() diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py index f1518a95..1a6ea5f8 100644 --- a/tests/gamma/csm_observability_test.py +++ b/tests/gamma/csm_observability_test.py @@ -14,7 +14,7 @@ import dataclasses import logging import time -from typing import Callable, Iterable +from typing import Callable, Iterable, TextIO import unittest.mock from absl import flags @@ -22,12 +22,15 @@ from google.api_core import exceptions as gapi_errors from google.api_core import retry as gapi_retries from google.cloud import monitoring_v3 +import requests +from requests.exceptions import RequestException import yaml from framework import xds_gamma_testcase from framework import xds_k8s_testcase from framework.helpers import skips from framework.test_app.runners.k8s import gamma_server_runner +from framework.test_app.runners.k8s import k8s_base_runner from framework.test_app.runners.k8s import k8s_xds_client_runner logger = logging.getLogger(__name__) @@ -139,6 +142,31 @@ def pretty_print(self) -> str: return yaml.dump(metric, sort_keys=False) +# This class is purely for debugging purposes. We want to log what we see +# from the Prometheus endpoint before being sent to Cloud Monitoring. +# Once we determined the root cause of b/323596669 we can remove this +# class. +class PrometheusLogger: + def __init__( + self, k8s_runner: k8s_base_runner.KubernetesBaseRunner, pod_name: str + ): + logfile_name = ( + f"{k8s_runner.k8s_namespace.name}_{pod_name}_prometheus.log" + ) + log_path = k8s_runner.logs_subdir / logfile_name + self.log_stream: TextIO = open( + log_path, "w", errors="ignore", encoding="utf-8" + ) + + def write(self, line): + self.log_stream.write(line) + self.log_stream.write("\n") + self.log_stream.flush() + + def close(self): + self.log_stream.close() + + class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase): metric_client: monitoring_v3.MetricServiceClient @@ -158,6 +186,7 @@ def setUpClass(cls): # each run(). def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: return super().initKubernetesClientRunner( + enable_csm_observability=True, csm_workload_name=CSM_WORKLOAD_NAME_CLIENT, csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT, ) @@ -166,6 +195,7 @@ def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: # each run(). 
def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner: return super().initKubernetesServerRunner( + enable_csm_observability=True, csm_workload_name=CSM_WORKLOAD_NAME_SERVER, csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER, ) @@ -175,14 +205,11 @@ def test_csm_observability(self): # resource creation out of self.startTestServers() with self.subTest("1_run_test_server"): start_secs = int(time.time()) - test_server: _XdsTestServer = self.startTestServers( - enable_csm_observability=True, - )[0] + test_server: _XdsTestServer = self.startTestServers()[0] with self.subTest("2_start_test_client"): test_client: _XdsTestClient = self.startTestClient( test_server, - enable_csm_observability=True, request_payload_size=REQUEST_PAYLOAD_SIZE, response_payload_size=RESPONSE_PAYLOAD_SIZE, ) @@ -190,12 +217,44 @@ def test_csm_observability(self): with self.subTest("3_test_server_received_rpcs_from_test_client"): self.assertSuccessfulRpcs(test_client) - with self.subTest("4_query_cloud_monitoring_metrics"): + with self.subTest("4_export_prometheus_metrics_data"): logger.info( "Letting test client run for %d seconds to produce metric data", TEST_RUN_SECS, ) - time.sleep(TEST_RUN_SECS) + server_prometheus_logger = PrometheusLogger( + self.server_runner, test_server.hostname + ) + client_prometheus_logger = PrometheusLogger( + self.client_runner, test_client.hostname + ) + try: + for i in range(0, TEST_RUN_SECS // 10): + time.sleep(10) + curr_secs = int(time.time()) + server_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + server_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_server.rpc_host, + test_server.monitoring_port, + ) + ) + client_prometheus_logger.write( + f"Prometheus endpoint content at {curr_secs}" + ) + client_prometheus_logger.write( + self.ping_prometheus_endpoint( + test_client.rpc_host, + test_client.monitoring_port, + ) + ) + finally: + server_prometheus_logger.close() + client_prometheus_logger.close() + + with self.subTest("5_query_cloud_monitoring_metrics"): end_secs = int(time.time()) interval = monitoring_v3.TimeInterval( start_time={"seconds": start_secs}, @@ -233,14 +292,14 @@ def test_csm_observability(self): } self.assertNotEmpty(all_results, msg="No query metrics results") - with self.subTest("5_check_metrics_time_series"): + with self.subTest("6_check_metrics_time_series"): for metric in ALL_METRICS: # Every metric needs to exist in the query results self.assertIn(metric, all_results) # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("6_check_metrics_labels_histogram_client"): + with self.subTest("7_check_metrics_labels_histogram_client"): expected_metric_labels = { "csm_mesh_id": ANY, "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER, @@ -268,7 +327,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("7_check_metrics_labels_histogram_server"): + with self.subTest("8_check_metrics_labels_histogram_server"): expected_metric_labels = { "csm_mesh_id": ANY, "csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT, @@ -293,7 +352,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("8_check_metrics_labels_counter_client"): + with self.subTest("9_check_metrics_labels_counter_client"): expected_metric_labels = { "grpc_method": GRPC_METHOD_NAME, 
"grpc_target": ANY, @@ -309,7 +368,7 @@ def test_csm_observability(self): # Testing whether each metric has the correct set of metric keys and # values - with self.subTest("9_check_metrics_labels_counter_server"): + with self.subTest("10_check_metrics_labels_counter_server"): expected_metric_labels = { "grpc_method": GRPC_METHOD_NAME, "otel_scope_name": ANY, @@ -324,7 +383,7 @@ def test_csm_observability(self): # Testing whether each metric has the right set of monitored resource # label keys and values - with self.subTest("10_check_client_resource_labels_client"): + with self.subTest("11_check_client_resource_labels_client"): # all metrics should have the same set of monitored resource labels # keys, which come from the GMP job expected_resource_labels = { @@ -348,7 +407,7 @@ def test_csm_observability(self): # Testing whether each metric has the right set of monitored resource # label keys and values - with self.subTest("11_check_server_resource_labels_server"): + with self.subTest("12_check_server_resource_labels_server"): # all metrics should have the same set of monitored resource labels # keys, which come from the GMP job expected_resource_labels = { @@ -373,7 +432,7 @@ def test_csm_observability(self): # This tests whether each of the "bytes sent" histogram type metric # should have at least 1 data point whose mean should converge to be # close to the number of bytes being sent by the RPCs. - with self.subTest("12_check_bytes_sent_vs_data_points"): + with self.subTest("13_check_bytes_sent_vs_data_points"): for metric in (METRIC_CLIENT_ATTEMPT_SENT, METRIC_SERVER_CALL_RCVD): self.assertAtLeastOnePointWithinRange( all_results[metric].points, REQUEST_PAYLOAD_SIZE @@ -457,7 +516,7 @@ def query_metrics( view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL, retry=retry_settings, ) - time_series = list(filter(self.is_legit_time_series, response)) + time_series = list(response) self.assertLen( time_series, @@ -475,21 +534,6 @@ def query_metrics( results[metric] = metric_time_series return results - @classmethod - def is_legit_time_series( - cls, series: monitoring_v3.types.TimeSeries - ) -> bool: - for point in series.points: - # Test flake: we found some time series with no actual data point - # in prod test runs. - # Here we will only include time series with actual data in it. - if point.value.distribution_value.count or point.value.double_value: - return True - logger.warning( - "Warning: found time_series with no valid data point\n%s", series - ) - return False - def assertAtLeastOnePointWithinRange( self, points: list[monitoring_v3.types.Point], @@ -511,6 +555,24 @@ def assertAtLeastOnePointWithinRange( f"No data point with {ref_bytes}±{tolerance*100}% bytes found" ) + def ping_prometheus_endpoint( + self, monitoring_host: str, monitoring_port: int + ) -> str: + """ + A helper function to ping the pod's Prometheus endpoint to get what GMP + sees from the OTel exporter before passing metrics to Cloud Monitoring. + """ + try: + prometheus_log = requests.get( + f"http://{monitoring_host}:{monitoring_port}/metrics" + ) + return "\n".join(prometheus_log.text.splitlines()) + except RequestException as e: + logger.warning("Http request to Prometheus endpoint failed: %r", e) + # It's OK the caller will receive nothing in case of an exception. + # Caller can continue. + return "" + if __name__ == "__main__": absltest.main()