From c684f553ce880b2fae422c3f1f7fb39f6df67977 Mon Sep 17 00:00:00 2001
From: Stanley Cheung
Date: Thu, 25 Jan 2024 08:31:55 +0000
Subject: [PATCH] Add assertions to CSM Observability Test

---
 .../runners/k8s/gamma_server_runner.py      |  23 +
 .../runners/k8s/k8s_xds_client_runner.py    |   4 +
 kubernetes-manifests/client.deployment.yaml |  16 +
 kubernetes-manifests/server.deployment.yaml |  16 +
 tests/gamma/csm_observability_test.py       | 436 +++++++++++++++++-
 5 files changed, 481 insertions(+), 14 deletions(-)

diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py
index 745c6b8c..63aa4c96 100644
--- a/framework/test_app/runners/k8s/gamma_server_runner.py
+++ b/framework/test_app/runners/k8s/gamma_server_runner.py
@@ -38,6 +38,8 @@ class GammaServerRunner(KubernetesServerRunner):
     be_policy: Optional[k8s.GcpBackendPolicy] = None
     termination_grace_period_seconds: Optional[int] = None
     pre_stop_hook: bool = False
+    pod_monitoring: Optional[k8s.PodMonitoring] = None
+    pod_monitoring_name: Optional[str] = None
 
     route_name: str
     frontend_service_name: str
@@ -116,6 +118,8 @@ def run(  # pylint: disable=arguments-differ
         bootstrap_version: Optional[str] = None,
         route_template: str = "gamma/route_http.yaml",
        enable_csm_observability: bool = False,
+        csm_workload_name: str = "",
+        csm_canonical_service_name: str = "",
     ) -> List[XdsTestServer]:
         if not maintenance_port:
             maintenance_port = self._get_default_maintenance_port(secure_mode)
@@ -208,8 +212,21 @@ def run(  # pylint: disable=arguments-differ
             termination_grace_period_seconds=self.termination_grace_period_seconds,
             pre_stop_hook=self.pre_stop_hook,
             enable_csm_observability=enable_csm_observability,
+            csm_workload_name=csm_workload_name,
+            csm_canonical_service_name=csm_canonical_service_name,
         )
 
+        # Create a PodMonitoring resource if CSM Observability is enabled.
+        # This is GMP (Google Managed Prometheus).
+        if enable_csm_observability:
+            self.pod_monitoring_name = f"{self.deployment_id}-gmp"
+            self.pod_monitoring = self._create_pod_monitoring(
+                "csm/pod-monitoring.yaml",
+                namespace_name=self.k8s_namespace.name,
+                deployment_id=self.deployment_id,
+                pod_monitoring_name=self.pod_monitoring_name,
+            )
+
         servers = self._make_servers_for_deployment(
             replica_count,
             test_port=test_port,
@@ -296,6 +313,12 @@ def cleanup(self, *, force=False, force_namespace=False):
                 self._delete_service_account(self.service_account_name)
                 self.service_account = None
 
+            # Pod monitoring name is only set when CSM observability is enabled.
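+            # Delete the PodMonitoring resource before the namespace cleanup
+            # below, so the GMP resource does not outlive the test run.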
+            if self.pod_monitoring_name and (self.pod_monitoring or force):
+                self._delete_pod_monitoring(self.pod_monitoring_name)
+                self.pod_monitoring = None
+                self.pod_monitoring_name = None
+
             self._cleanup_namespace(force=(force_namespace and force))
         finally:
             self._stop()
diff --git a/framework/test_app/runners/k8s/k8s_xds_client_runner.py b/framework/test_app/runners/k8s/k8s_xds_client_runner.py
index 610a25a7..d56768dd 100644
--- a/framework/test_app/runners/k8s/k8s_xds_client_runner.py
+++ b/framework/test_app/runners/k8s/k8s_xds_client_runner.py
@@ -107,6 +107,8 @@ def run(  # pylint: disable=arguments-differ
         print_response=False,
         log_to_stdout: bool = False,
         enable_csm_observability: bool = False,
+        csm_workload_name: str = "",
+        csm_canonical_service_name: str = "",
     ) -> XdsTestClient:
         logger.info(
             (
@@ -162,6 +164,8 @@ def run(  # pylint: disable=arguments-differ
             generate_mesh_id=generate_mesh_id,
             print_response=print_response,
             enable_csm_observability=enable_csm_observability,
+            csm_workload_name=csm_workload_name,
+            csm_canonical_service_name=csm_canonical_service_name,
         )
 
         # Create a PodMonitoring resource if CSM Observability is enabled
diff --git a/kubernetes-manifests/client.deployment.yaml b/kubernetes-manifests/client.deployment.yaml
index 13fad0f8..a149454b 100644
--- a/kubernetes-manifests/client.deployment.yaml
+++ b/kubernetes-manifests/client.deployment.yaml
@@ -60,6 +60,22 @@ spec:
               value: "true"
             - name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST
               value: "true"
+            % if enable_csm_observability:
+            - name: CSM_WORKLOAD_NAME
+              value: ${csm_workload_name}
+            - name: CSM_CANONICAL_SERVICE_NAME
+              value: ${csm_canonical_service_name}
+            - name: POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+            - name: NAMESPACE_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: OTEL_RESOURCE_ATTRIBUTES
+              value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME)
+            % endif
           volumeMounts:
             - mountPath: /tmp/grpc-xds/
              name: grpc-td-conf
diff --git a/kubernetes-manifests/server.deployment.yaml b/kubernetes-manifests/server.deployment.yaml
index de823cc6..c6275317 100644
--- a/kubernetes-manifests/server.deployment.yaml
+++ b/kubernetes-manifests/server.deployment.yaml
@@ -52,6 +52,22 @@ spec:
               value: "true"
             - name: GRPC_EXPERIMENTAL_XDS_ENABLE_OVERRIDE_HOST
               value: "true"
+            % if enable_csm_observability:
+            - name: CSM_WORKLOAD_NAME
+              value: ${csm_workload_name}
+            - name: CSM_CANONICAL_SERVICE_NAME
+              value: ${csm_canonical_service_name}
+            - name: POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
+            - name: NAMESPACE_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.namespace
+            - name: OTEL_RESOURCE_ATTRIBUTES
+              value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME)
+            % endif
           volumeMounts:
            - mountPath: /tmp/grpc-xds/
              name: grpc-td-conf
diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py
index 08df75fe..62c9341a 100644
--- a/tests/gamma/csm_observability_test.py
+++ b/tests/gamma/csm_observability_test.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import time
 
 from absl import flags
 from absl.testing import absltest
@@ -19,6 +20,7 @@
 from google.cloud import monitoring_v3
 
 from framework import xds_gamma_testcase
+from framework import xds_k8s_flags
 from framework import xds_k8s_testcase
 from framework.helpers import skips
@@ -29,6 +31,59 @@
 _XdsTestClient = xds_k8s_testcase.XdsTestClient
 _Lang = skips.Lang
 
+# Testing consts
+_TEST_RUN_SECS = 90
+_REQUEST_PAYLOAD_SIZE = 271828
+_RESPONSE_PAYLOAD_SIZE = 314159
+_GRPC_METHOD_NAME = "grpc.testing.TestService/UnaryCall"
+_CSM_WORKLOAD_NAME_SERVER = "csm_workload_name_from_server"
+_CSM_WORKLOAD_NAME_CLIENT = "csm_workload_name_from_client"
+_CSM_CANONICAL_SERVICE_NAME_SERVER = "csm_canonical_service_name_from_server"
+_CSM_CANONICAL_SERVICE_NAME_CLIENT = "csm_canonical_service_name_from_client"
+_METRIC_CLIENT_ATTEMPT_SENT = "prometheus.googleapis.com/grpc_client_attempt_sent_total_compressed_message_size_bytes/histogram"
+_METRIC_CLIENT_ATTEMPT_RCVD = "prometheus.googleapis.com/grpc_client_attempt_rcvd_total_compressed_message_size_bytes/histogram"
+_METRIC_CLIENT_ATTEMPT_DURATION = (
+    "prometheus.googleapis.com/grpc_client_attempt_duration_seconds/histogram"
+)
+_METRIC_CLIENT_ATTEMPT_STARTED = (
+    "prometheus.googleapis.com/grpc_client_attempt_started_total/counter"
+)
+_METRIC_SERVER_CALL_RCVD = "prometheus.googleapis.com/grpc_server_call_rcvd_total_compressed_message_size_bytes/histogram"
+_METRIC_SERVER_CALL_SENT = "prometheus.googleapis.com/grpc_server_call_sent_total_compressed_message_size_bytes/histogram"
+_METRIC_SERVER_CALL_DURATION = (
+    "prometheus.googleapis.com/grpc_server_call_duration_seconds/histogram"
+)
+_METRIC_SERVER_CALL_STARTED = (
+    "prometheus.googleapis.com/grpc_server_call_started_total/counter"
+)
+_HISTOGRAM_CLIENT_METRICS = [
+    _METRIC_CLIENT_ATTEMPT_SENT,
+    _METRIC_CLIENT_ATTEMPT_RCVD,
+    _METRIC_CLIENT_ATTEMPT_DURATION,
+]
+_HISTOGRAM_SERVER_METRICS = [
+    _METRIC_SERVER_CALL_DURATION,
+    _METRIC_SERVER_CALL_RCVD,
+    _METRIC_SERVER_CALL_SENT,
+]
+_CLIENT_SENT_METRICS = [
+    _METRIC_CLIENT_ATTEMPT_SENT,
+    _METRIC_SERVER_CALL_RCVD,
+]
+_SERVER_SENT_METRICS = [
+    _METRIC_CLIENT_ATTEMPT_RCVD,
+    _METRIC_SERVER_CALL_SENT,
+]
+_COUNTER_CLIENT_METRICS = [
+    _METRIC_CLIENT_ATTEMPT_STARTED,
+]
+_COUNTER_SERVER_METRICS = [
+    _METRIC_SERVER_CALL_STARTED,
+]
+_HISTOGRAM_METRICS = _HISTOGRAM_CLIENT_METRICS + _HISTOGRAM_SERVER_METRICS
+_COUNTER_METRICS = _COUNTER_CLIENT_METRICS + _COUNTER_SERVER_METRICS
+_ALL_METRICS = _HISTOGRAM_METRICS + _COUNTER_METRICS
+
 
 class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase):
     metric_client: monitoring_v3.MetricServiceClient
@@ -45,36 +100,389 @@ def setUpClass(cls):
         super().setUpClass()
         cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3")
 
+    # A helper function to make the cloud monitoring API call to query
+    # metrics created by this test run.
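+    # Returns a dict mapping each queried metric name to the list of
+    # time series returned for it.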
+    def _query_metrics(self, metric_names, filter_str, interval):
+        results = {}
+        for metric in metric_names:
+            response = self.metric_client.list_time_series(
+                name=f"projects/{self.project}",
+                filter=filter_str
+                % {
+                    "metric": metric,
+                    "grpc_method": _GRPC_METHOD_NAME,
+                },
+                interval=interval,
+                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+            )
+            time_series = []
+            logger.debug("Metric: %s", metric)
+            for series in response:
+                logger.debug("  Metric Labels:")
+                for label_key, label_value in series.metric.labels.items():
+                    logger.debug(
+                        "    %(label_key)s: %(label_value)s",
+                        {
+                            "label_key": label_key,
+                            "label_value": label_value,
+                        },
+                    )
+                logger.debug("  Resource Labels:")
+                for label_key, label_value in series.resource.labels.items():
+                    logger.debug(
+                        "    %(label_key)s: %(label_value)s",
+                        {
+                            "label_key": label_key,
+                            "label_value": label_value,
+                        },
+                    )
+                time_series.append(series)
+            results[metric] = time_series
+        return results
+
+    # A helper function to check whether at least one of the "points"
+    # has a mean within 5% of ref_bytes.
+    def _at_least_one_point_within_range(self, points, ref_bytes) -> bool:
+        for point in points:
+            if point.value.distribution_value.mean > (
+                ref_bytes * 0.95
+            ) and point.value.distribution_value.mean < (ref_bytes * 1.05):
+                return True
+        return False
+
+    def _is_server_metric(self, metric_name) -> bool:
+        return "server" in metric_name
+
+    def _is_client_metric(self, metric_name) -> bool:
+        return "client" in metric_name
+
     def test_csm_observability(self):
         # TODO(sergiitk): [GAMMA] Consider moving out custom gamma
         # resource creation out of self.startTestServers()
         with self.subTest("1_run_test_server"):
+            start_secs = int(time.time())
             test_server: _XdsTestServer = self.startTestServers(
-                enable_csm_observability=True
+                enable_csm_observability=True,
+                csm_workload_name=_CSM_WORKLOAD_NAME_SERVER,
+                csm_canonical_service_name=_CSM_CANONICAL_SERVICE_NAME_SERVER,
             )[0]
 
         with self.subTest("2_start_test_client"):
             test_client: _XdsTestClient = self.startTestClient(
-                test_server, enable_csm_observability=True
+                test_server,
+                enable_csm_observability=True,
+                csm_workload_name=_CSM_WORKLOAD_NAME_CLIENT,
+                csm_canonical_service_name=_CSM_CANONICAL_SERVICE_NAME_CLIENT,
+                request_payload_size=_REQUEST_PAYLOAD_SIZE,
+                response_payload_size=_RESPONSE_PAYLOAD_SIZE,
+            )
+            logger.info(
+                "Letting test client run for %d seconds", _TEST_RUN_SECS
             )
+            time.sleep(_TEST_RUN_SECS)
 
         with self.subTest("3_test_server_received_rpcs_from_test_client"):
             self.assertSuccessfulRpcs(test_client)
 
-        # For now, this just makes a bogus call to ensure metrics client
-        # connected to the remote API service.
-        with self.subTest("4_check_monitoring_metric_client"):
-            with self.assertRaises(gapi_errors.GoogleAPICallError) as cm:
-                self.metric_client.list_metric_descriptors(
-                    request=monitoring_v3.ListMetricDescriptorsRequest(
-                        name="whatever",
+        with self.subTest("4_query_monitoring_metric_client"):
+            end_secs = int(time.time())
+            interval = monitoring_v3.TimeInterval(
+                {
+                    "end_time": {"seconds": end_secs, "nanos": 0},
+                    "start_time": {"seconds": start_secs, "nanos": 0},
+                }
+            )
+            #
+            # The list_time_series API requires us to query one metric
+            # at a time.
+            #
+            # The 'grpc_status = "OK"' filter condition is needed because
+            # some time series data points were logged when the grpc_status
+            # was "UNAVAILABLE" while the client/server were establishing
+            # connections.
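+            # Filtering on grpc_status = "OK" excludes those points.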
+            #
+            # The 'grpc_method' filter condition is needed because the
+            # server is also serving Channelz requests, which would
+            # otherwise show up in the server metrics.
+            #
+            filter_str = (
+                'metric.type = "%(metric)s" AND '
+                'metric.labels.grpc_status = "OK" AND '
+                'metric.labels.grpc_method = "%(grpc_method)s"'
+            )
+            histogram_results = self._query_metrics(
+                _HISTOGRAM_METRICS, filter_str, interval
+            )
+
+            # The RPCs-started counter metrics do not have the
+            # 'grpc_status' label.
+            filter_str = (
+                'metric.type = "%(metric)s" AND '
+                'metric.labels.grpc_method = "%(grpc_method)s"'
+            )
+            counter_results = self._query_metrics(
+                _COUNTER_METRICS, filter_str, interval
+            )
+
+            all_results = {**histogram_results, **counter_results}
+
+        with self.subTest("5_check_metrics_time_series"):
+            for metric, time_series in all_results.items():
+                # There should be exactly 1 time series per metric with
+                # the label value combinations that we are expecting.
+                self.assertEqual(1, len(time_series))
+
+        # Testing whether each metric has the right set of metric label keys.
+        with self.subTest("6_check_metrics_label_keys"):
+            for metric in _HISTOGRAM_CLIENT_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "csm_mesh_id",
+                    "csm_remote_workload_canonical_service",
+                    "csm_remote_workload_cluster_name",
+                    "csm_remote_workload_location",
+                    "csm_remote_workload_name",
+                    "csm_remote_workload_namespace_name",
+                    "csm_remote_workload_project_id",
+                    "csm_remote_workload_type",
+                    "csm_service_name",
+                    "csm_service_namespace_name",
+                    "csm_workload_canonical_service",
+                    "grpc_method",
+                    "grpc_status",
+                    "grpc_target",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+            for metric in _HISTOGRAM_SERVER_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "csm_mesh_id",
+                    "csm_remote_workload_canonical_service",
+                    "csm_remote_workload_cluster_name",
+                    "csm_remote_workload_location",
+                    "csm_remote_workload_name",
+                    "csm_remote_workload_namespace_name",
+                    "csm_remote_workload_project_id",
+                    "csm_remote_workload_type",
+                    "csm_workload_canonical_service",
+                    "grpc_method",
+                    "grpc_status",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+            for metric in _COUNTER_CLIENT_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "grpc_method",
+                    "grpc_target",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+            for metric in _COUNTER_SERVER_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "grpc_method",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+        # Testing whether each metric has the right set of monitored
+        # resource label keys.
+        with self.subTest("7_check_resource_label_keys"):
+            # All metrics should have the same set of monitored resource
+            # labels, which come from the GMP job.
+            for metric in _ALL_METRICS:
+                resource_labels = all_results[metric][0].resource.labels
+                for label_key in [
+                    "cluster",
+                    "instance",
+                    "job",
+                    "location",
+                    "namespace",
+                    "project_id",
+                ]:
+                    self.assertIn(label_key, resource_labels)
+
+        # Testing the values of metric labels.
+        with self.subTest("8_check_metrics_label_values"):
+            for metric, time_series in all_results.items():
+                series = time_series[0]
+                self.assertEqual(metric, series.metric.type)
+                for label_key, label_value in series.metric.labels.items():
+                    if label_key == "pod" and self._is_client_metric(metric):
+                        # Client pod name.
+                        self.assertEqual(test_client.hostname, label_value)
+                    elif label_key == "pod" and self._is_server_metric(metric):
+                        # Server pod name.
+                        self.assertEqual(test_server.hostname, label_value)
+                    elif label_key == "grpc_method":
+                        self.assertEqual(_GRPC_METHOD_NAME, label_value)
+                    elif label_key == "grpc_target":
+                        # The server namespace should be somewhere in the
+                        # xds server target.
+                        self.assertIn(self.server_namespace, label_value)
+                    elif (
+                        label_key == "csm_remote_workload_canonical_service"
+                        and self._is_client_metric(metric)
+                    ):
+                        # The remote workload canonical service name for
+                        # the client is the one we set on the server.
+                        self.assertEqual(
+                            _CSM_CANONICAL_SERVICE_NAME_SERVER, label_value
+                        )
+                    elif (
+                        label_key == "csm_remote_workload_canonical_service"
+                        and self._is_server_metric(metric)
+                    ):
+                        # The remote workload canonical service name for
+                        # the server is the one we set on the client.
+                        self.assertEqual(
+                            _CSM_CANONICAL_SERVICE_NAME_CLIENT, label_value
+                        )
+                    elif label_key == "csm_remote_workload_cluster_name":
+                        # The cluster name should be somewhere in the
+                        # kube context.
+                        self.assertIn(
+                            label_value, xds_k8s_flags.KUBE_CONTEXT.value
+                        )
+                    elif label_key == "csm_remote_workload_location":
+                        # The location should be somewhere in the kube
+                        # context.
+                        self.assertIn(
+                            label_value, xds_k8s_flags.KUBE_CONTEXT.value
+                        )
+                    elif (
+                        label_key == "csm_remote_workload_name"
+                        and self._is_client_metric(metric)
+                    ):
+                        # The remote workload name for the client is the
+                        # name we set on the server.
+                        self.assertEqual(_CSM_WORKLOAD_NAME_SERVER, label_value)
+                    elif (
+                        label_key == "csm_remote_workload_name"
+                        and self._is_server_metric(metric)
+                    ):
+                        # The remote workload name for the server is the
+                        # name we set on the client.
+                        self.assertEqual(_CSM_WORKLOAD_NAME_CLIENT, label_value)
+                    elif (
+                        label_key == "csm_remote_workload_namespace_name"
+                        and self._is_client_metric(metric)
+                    ):
+                        # The server namespace name.
+                        self.assertEqual(self.server_namespace, label_value)
+                    elif (
+                        label_key == "csm_remote_workload_namespace_name"
+                        and self._is_server_metric(metric)
+                    ):
+                        # The client namespace name.
+                        self.assertEqual(self.client_namespace, label_value)
+                    elif label_key == "csm_remote_workload_project_id":
+                        self.assertEqual(self.project, label_value)
+                    elif label_key == "csm_remote_workload_type":
+                        # A hardcoded value.
+                        self.assertEqual("gcp_kubernetes_engine", label_value)
+                    elif label_key == "csm_service_name":
+                        # The service name.
+                        self.assertEqual(
+                            self.server_runner.service_name, label_value
+                        )
+                    elif label_key == "csm_service_namespace_name":
+                        # The server namespace name.
+                        self.assertEqual(self.server_namespace, label_value)
+                    elif (
+                        label_key == "csm_workload_canonical_service"
+                        and self._is_client_metric(metric)
+                    ):
+                        # The client workload canonical service name.
+                        self.assertEqual(
+                            _CSM_CANONICAL_SERVICE_NAME_CLIENT, label_value
+                        )
+                    elif (
+                        label_key == "csm_workload_canonical_service"
+                        and self._is_server_metric(metric)
+                    ):
+                        # The server workload canonical service name.
+                        self.assertEqual(
+                            _CSM_CANONICAL_SERVICE_NAME_SERVER, label_value
+                        )
+
+        # Testing the values of monitored resource labels.
+        with self.subTest("9_check_resource_label_values"):
+            for metric, time_series in all_results.items():
+                series = time_series[0]
+                self.assertEqual("prometheus_target", series.resource.type)
+                for label_key, label_value in series.resource.labels.items():
+                    if label_key == "project_id":
+                        self.assertEqual(self.project, label_value)
+                    elif label_key == "namespace" and self._is_client_metric(
+                        metric
+                    ):
+                        # Client namespace.
+                        self.assertEqual(self.client_namespace, label_value)
+                    elif label_key == "namespace" and self._is_server_metric(
+                        metric
+                    ):
+                        # Server namespace.
+                        self.assertEqual(self.server_namespace, label_value)
+                    elif label_key == "job" and self._is_client_metric(metric):
+                        # The "job" label on the monitored resource refers to
+                        # the GMP PodMonitoring resource name.
+                        self.assertEqual(
+                            self.client_runner.pod_monitoring_name,
+                            label_value,
+                        )
+                    elif label_key == "job" and self._is_server_metric(metric):
+                        # The "job" label on the monitored resource refers to
+                        # the GMP PodMonitoring resource name.
+                        self.assertEqual(
+                            self.server_runner.pod_monitoring_name,
+                            label_value,
+                        )
+                    elif label_key == "cluster":
+                        self.assertIn(
+                            label_value, xds_k8s_flags.KUBE_CONTEXT.value
+                        )
+                    elif label_key == "instance" and self._is_client_metric(
+                        metric
+                    ):
+                        # The "instance" label on the monitored resource
+                        # refers to the GKE pod and port.
+                        self.assertIn(test_client.hostname, label_value)
+                    elif label_key == "instance" and self._is_server_metric(
+                        metric
+                    ):
+                        # The "instance" label on the monitored resource
+                        # refers to the GKE pod and port.
+                        self.assertIn(test_server.hostname, label_value)
+
+        # Tests whether each "bytes sent" histogram metric has at least
+        # one data point whose mean is close to the number of bytes sent
+        # by the RPCs.
+        with self.subTest("10_check_bytes_sent_vs_data_points"):
+            for metric in _CLIENT_SENT_METRICS:
+                self.assertTrue(
+                    self._at_least_one_point_within_range(
+                        all_results[metric][0].points, _REQUEST_PAYLOAD_SIZE
+                    )
+                )
+
+            for metric in _SERVER_SENT_METRICS:
+                self.assertTrue(
+                    self._at_least_one_point_within_range(
+                        all_results[metric][0].points, _RESPONSE_PAYLOAD_SIZE
                     )
                 )
-            err = cm.exception
-            self.assertIsInstance(err, gapi_errors.InvalidArgument)
-            self.assertIsNotNone(err.grpc_status_code)
-            self.assertStartsWith(err.message, "Name must begin with")
-            self.assertEndsWith(err.message, " got: whatever")
 
 
 if __name__ == "__main__":
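
Reviewer note (not part of the patch): to spot-check the exported metrics by
hand, the same Cloud Monitoring query the test performs can be run standalone.
This is a minimal sketch assuming `google-cloud-monitoring` is installed and
Application Default Credentials are configured; the project ID and the
one-hour lookback window are placeholders, while the metric type and filter
shape mirror the constants added above.

    # Standalone sketch of the test's list_time_series query.
    # PROJECT_ID and the one-hour window are placeholders.
    import time

    from google.cloud import monitoring_v3

    PROJECT_ID = "my-gcp-project"  # placeholder
    METRIC = "prometheus.googleapis.com/grpc_client_attempt_started_total/counter"

    client = monitoring_v3.MetricServiceClient()
    now = int(time.time())
    interval = monitoring_v3.TimeInterval(
        {
            "end_time": {"seconds": now, "nanos": 0},
            "start_time": {"seconds": now - 3600, "nanos": 0},
        }
    )
    response = client.list_time_series(
        name=f"projects/{PROJECT_ID}",
        filter=(
            f'metric.type = "{METRIC}" AND '
            'metric.labels.grpc_method = "grpc.testing.TestService/UnaryCall"'
        ),
        interval=interval,
        view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
    )
    for series in response:
        # Each series carries metric labels (grpc_method, pod, ...) and
        # monitored-resource labels (cluster, namespace, job, ...).
        print(series.metric.type, dict(series.metric.labels))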