diff --git a/framework/test_app/runners/k8s/gamma_server_runner.py b/framework/test_app/runners/k8s/gamma_server_runner.py
index 745c6b8c..716a500c 100644
--- a/framework/test_app/runners/k8s/gamma_server_runner.py
+++ b/framework/test_app/runners/k8s/gamma_server_runner.py
@@ -38,6 +38,8 @@ class GammaServerRunner(KubernetesServerRunner):
     be_policy: Optional[k8s.GcpBackendPolicy] = None
     termination_grace_period_seconds: Optional[int] = None
     pre_stop_hook: bool = False
+    pod_monitoring: Optional[k8s.PodMonitoring] = None
+    pod_monitoring_name: Optional[str] = None
 
     route_name: str
     frontend_service_name: str
@@ -210,6 +212,17 @@ def run(  # pylint: disable=arguments-differ
             enable_csm_observability=enable_csm_observability,
         )
 
+        # Create a PodMonitoring resource if CSM Observability is enabled.
+        # This is GMP (Google Managed Prometheus).
+        if enable_csm_observability:
+            self.pod_monitoring_name = f"{self.deployment_id}-gmp"
+            self.pod_monitoring = self._create_pod_monitoring(
+                "csm/pod-monitoring.yaml",
+                namespace_name=self.k8s_namespace.name,
+                deployment_id=self.deployment_id,
+                pod_monitoring_name=self.pod_monitoring_name,
+            )
+
         servers = self._make_servers_for_deployment(
             replica_count,
             test_port=test_port,
@@ -296,6 +309,12 @@ def cleanup(self, *, force=False, force_namespace=False):
                 self._delete_service_account(self.service_account_name)
                 self.service_account = None
 
+            # Pod monitoring name is only set when CSM observability is enabled.
+            if self.pod_monitoring_name and (self.pod_monitoring or force):
+                self._delete_pod_monitoring(self.pod_monitoring_name)
+                self.pod_monitoring = None
+                self.pod_monitoring_name = None
+
             self._cleanup_namespace(force=(force_namespace and force))
         finally:
            self._stop()
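
Note: the PodMonitoring object created above is a Google Managed Prometheus (GMP)
custom resource rendered from the csm/pod-monitoring.yaml template. As a rough
illustration of what the runner ends up applying, here is a minimal sketch using
the raw Kubernetes Python client; the label selector, port name, and scrape
interval are assumptions for illustration, not values taken from the framework's
template.

# Illustrative sketch only: apply a GMP PodMonitoring resource similar to the
# one GammaServerRunner._create_pod_monitoring() renders from
# csm/pod-monitoring.yaml. Field values below are assumed, not authoritative.
from kubernetes import client, config


def apply_pod_monitoring(namespace: str, name: str, deployment_id: str) -> None:
    config.load_kube_config()
    manifest = {
        "apiVersion": "monitoring.googleapis.com/v1",
        "kind": "PodMonitoring",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            # Assumption: the server pods are labeled with the deployment id.
            "selector": {"matchLabels": {"deployment_id": deployment_id}},
            # Assumption: the workload exposes Prometheus metrics on a named
            # "monitoring" port; GMP scrapes it every 30 seconds.
            "endpoints": [{"port": "monitoring", "interval": "30s"}],
        },
    }
    client.CustomObjectsApi().create_namespaced_custom_object(
        group="monitoring.googleapis.com",
        version="v1",
        namespace=namespace,
        plural="podmonitorings",
        body=manifest,
    )
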
diff --git a/tests/gamma/csm_observability_test.py b/tests/gamma/csm_observability_test.py
index 08df75fe..29fd3737 100644
--- a/tests/gamma/csm_observability_test.py
+++ b/tests/gamma/csm_observability_test.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+import time
 
 from absl import flags
 from absl.testing import absltest
@@ -19,6 +20,7 @@
 from google.cloud import monitoring_v3
 
 from framework import xds_gamma_testcase
+from framework import xds_k8s_flags
 from framework import xds_k8s_testcase
 from framework.helpers import skips
 
@@ -29,6 +31,47 @@
 _XdsTestClient = xds_k8s_testcase.XdsTestClient
 _Lang = skips.Lang
 
+# Testing consts
+_TEST_RUN_SECS = 90
+_CLIENT_SENT_BYTES = 271828
+_SERVER_SENT_BYTES = 314159
+_GRPC_METHOD_NAME = "grpc.testing.TestService/UnaryCall"
+_HISTOGRAM_METRICS = [
+    "prometheus.googleapis.com/grpc_client_attempt_sent_total_compressed_message_size_bytes/histogram",
+    "prometheus.googleapis.com/grpc_client_attempt_rcvd_total_compressed_message_size_bytes/histogram",
+    "prometheus.googleapis.com/grpc_client_attempt_duration_seconds/histogram",
+    "prometheus.googleapis.com/grpc_server_call_duration_seconds/histogram",
+    "prometheus.googleapis.com/grpc_server_call_rcvd_total_compressed_message_size_bytes/histogram",
+    "prometheus.googleapis.com/grpc_server_call_sent_total_compressed_message_size_bytes/histogram",
+]
+_COUNTER_METRICS = [
+    "prometheus.googleapis.com/grpc_client_attempt_started_total/counter",
+    "prometheus.googleapis.com/grpc_server_call_started_total/counter",
+]
+_ALL_METRICS = _HISTOGRAM_METRICS + _COUNTER_METRICS
+_HISTOGRAM_CLIENT_METRICS = [
+    metric for metric in _HISTOGRAM_METRICS if "client" in metric
+]
+_HISTOGRAM_SERVER_METRICS = [
+    metric for metric in _HISTOGRAM_METRICS if "server" in metric
+]
+_COUNTER_CLIENT_METRICS = [
+    metric for metric in _COUNTER_METRICS if "client" in metric
+]
+_COUNTER_SERVER_METRICS = [
+    metric for metric in _COUNTER_METRICS if "server" in metric
+]
+_CLIENT_SENT_METRICS = [
+    metric
+    for metric in _HISTOGRAM_METRICS
+    if "client_attempt_sent" in metric or "server_call_rcvd" in metric
+]
+_SERVER_SENT_METRICS = [
+    metric
+    for metric in _HISTOGRAM_METRICS
+    if "client_attempt_rcvd" in metric or "server_call_sent" in metric
+]
+
 
 class CsmObservabilityTest(xds_gamma_testcase.GammaXdsKubernetesTestCase):
     metric_client: monitoring_v3.MetricServiceClient
@@ -45,36 +88,249 @@ def setUpClass(cls):
         super().setUpClass()
         cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3")
 
+    def _query_metrics(self, metric_names, filter_str, interval):
+        results = {}
+        for metric in metric_names:
+            response = self.metric_client.list_time_series(
+                name=f"projects/{self.project}",
+                filter=filter_str % (metric, _GRPC_METHOD_NAME),
+                interval=interval,
+                view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
+            )
+            time_series = []
+            for series in response:
+                time_series.append(series)
+            results[metric] = time_series
+        return results
+
+    def _at_least_one_point_within_range(self, points, ref_bytes):
+        for point in points:
+            if point.value.distribution_value.mean > (
+                ref_bytes * 0.95
+            ) and point.value.distribution_value.mean < (ref_bytes * 1.05):
+                return True
+        return False
+
     def test_csm_observability(self):
         # TODO(sergiitk): [GAMMA] Consider moving out custom gamma
         # resource creation out of self.startTestServers()
         with self.subTest("1_run_test_server"):
+            start_secs = int(time.time())
             test_server: _XdsTestServer = self.startTestServers(
                 enable_csm_observability=True
             )[0]
 
         with self.subTest("2_start_test_client"):
             test_client: _XdsTestClient = self.startTestClient(
-                test_server, enable_csm_observability=True
+                test_server,
+                enable_csm_observability=True,
+                request_payload_size=_CLIENT_SENT_BYTES,
+                response_payload_size=_SERVER_SENT_BYTES,
+            )
+            logger.info(
+                "Letting test client run for %d seconds", _TEST_RUN_SECS
             )
+            time.sleep(_TEST_RUN_SECS)
 
         with self.subTest("3_test_server_received_rpcs_from_test_client"):
             self.assertSuccessfulRpcs(test_client)
 
-        # For now, this just makes a bogus call to ensure metrics client
-        # connected to the remote API service.
-        with self.subTest("4_check_monitoring_metric_client"):
-            with self.assertRaises(gapi_errors.GoogleAPICallError) as cm:
-                self.metric_client.list_metric_descriptors(
-                    request=monitoring_v3.ListMetricDescriptorsRequest(
-                        name="whatever",
+        with self.subTest("4_query_monitoring_metric_client"):
+            end_secs = int(time.time())
+            interval = monitoring_v3.TimeInterval(
+                {
+                    "end_time": {"seconds": end_secs, "nanos": 0},
+                    "start_time": {"seconds": start_secs, "nanos": 0},
+                }
+            )
+            #
+            # The list_time_series API requires us to query one metric
+            # at a time.
+            #
+            # The 'grpc_status = "OK"' filter condition is needed because
+            # some data points are logged with grpc_status "UNAVAILABLE"
+            # while the client and server are still establishing
+            # connections.
+            #
+            # The 'grpc_method' filter condition is needed because the
+            # server metrics also count the Channelz requests being served.
+            #
+            filter_str = (
+                'metric.type = "%s" AND '
+                'metric.labels.grpc_status = "OK" AND '
+                'metric.labels.grpc_method = "%s"'
+            )
+            histogram_results = self._query_metrics(
+                _HISTOGRAM_METRICS, filter_str, interval
+            )
+
+            # The counter metrics do not have the 'grpc_status' label
+            filter_str = (
+                'metric.type = "%s" AND ' 'metric.labels.grpc_method = "%s"'
+            )
+            counter_results = self._query_metrics(
+                _COUNTER_METRICS, filter_str, interval
+            )
+
+            all_results = {**histogram_results, **counter_results}
+
+        with self.subTest("5_check_metrics_time_series"):
+            for metric, time_series in all_results.items():
+                # There should be exactly 1 time series with the label value
+                # combinations we expect.
+                self.assertEqual(1, len(time_series))
+
+        # Testing whether the metrics have the right set of metric label keys
+        with self.subTest("6_check_metrics_labels"):
+            for metric in _HISTOGRAM_CLIENT_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "csm_mesh_id",
+                    "csm_remote_workload_canonical_service",
+                    "csm_remote_workload_type",
+                    "csm_service_name",
+                    "csm_service_namespace_name",
+                    "csm_workload_canonical_service",
+                    "grpc_method",
+                    "grpc_status",
+                    "grpc_target",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+            for metric in _HISTOGRAM_SERVER_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "grpc_method",
+                    "grpc_status",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+            for metric in _COUNTER_CLIENT_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "grpc_method",
+                    "grpc_target",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+            for metric in _COUNTER_SERVER_METRICS:
+                metric_labels = all_results[metric][0].metric.labels
+                for label_key in [
+                    "grpc_method",
+                    "otel_scope_name",
+                    "otel_scope_version",
+                    "pod",
+                ]:
+                    self.assertIn(label_key, metric_labels)
+
+        # Testing whether the metrics have the right set of monitored resource
+        # label keys
+        with self.subTest("7_check_resource_labels"):
+            # all metrics have the same set of monitored resource labels
+            for metric in _ALL_METRICS:
+                resource_labels = all_results[metric][0].resource.labels
+                for label_key in [
+                    "cluster",
+                    "instance",
+                    "job",
+                    "location",
+                    "namespace",
+                    "project_id",
+                ]:
+                    self.assertIn(label_key, resource_labels)
+
+        # Testing the values of metric labels
+        with self.subTest("8_check_metrics_label_values"):
+            for metric, time_series in all_results.items():
+                series = time_series[0]
+                self.assertEqual(metric, series.metric.type)
+                for label_key, label_value in series.metric.labels.items():
+                    if label_key == "pod" and "client" in metric:
+                        # client pod name
+                        self.assertEqual(test_client.hostname, label_value)
+                    elif label_key == "pod" and "server" in metric:
+                        # server pod name
+                        self.assertEqual(test_server.hostname, label_value)
+                    elif label_key == "grpc_method":
+                        self.assertEqual(_GRPC_METHOD_NAME, label_value)
+                    elif label_key == "grpc_target":
+                        # server namespace should be somewhere in the xds
+                        # server target
+                        self.assertIn(self.server_namespace, label_value)
+                    elif label_key == "csm_service_name":
+                        self.assertEqual(
+                            self.server_runner.service_name, label_value
+                        )
+                    elif label_key == "csm_service_namespace_name":
+                        self.assertEqual(self.server_namespace, label_value)
+
+        # Testing the values of monitored resource labels
+        with self.subTest("9_check_resource_label_values"):
+            for metric, time_series in all_results.items():
+                series = time_series[0]
+                self.assertEqual("prometheus_target", series.resource.type)
+                for label_key, label_value in series.resource.labels.items():
+                    if label_key == "project_id":
+                        self.assertEqual(self.project, label_value)
+                    elif label_key == "namespace" and "client" in metric:
+                        # client namespace
+                        self.assertEqual(self.client_namespace, label_value)
+                    elif label_key == "namespace" and "server" in metric:
+                        # server namespace
+                        self.assertEqual(self.server_namespace, label_value)
+                    elif label_key == "job" and "client" in metric:
+                        # the "job" label on the monitored resource refers to
+                        # the GMP PodMonitoring resource
+                        self.assertEqual(
+                            self.client_runner.pod_monitoring_name,
+                            label_value,
+                        )
+                    elif label_key == "job" and "server" in metric:
+                        # the "job" label on the monitored resource refers to
+                        # the GMP PodMonitoring resource
+                        self.assertEqual(
+                            self.server_runner.pod_monitoring_name,
+                            label_value,
+                        )
+                    elif label_key == "cluster":
+                        self.assertIn(
+                            label_value, xds_k8s_flags.KUBE_CONTEXT.value
+                        )
+                    elif label_key == "instance" and "client" in metric:
+                        # the "instance" label on the monitored resource refers
+                        # to the GKE pod and port
+                        self.assertIn(test_client.hostname, label_value)
+                    elif label_key == "instance" and "server" in metric:
+                        # the "instance" label on the monitored resource refers
+                        # to the GKE pod and port
+                        self.assertIn(test_server.hostname, label_value)
+
+        # Each metric should have at least 1 data point whose mean
+        # converges to a value close to the number of bytes being
+        # sent by the RPCs.
+        with self.subTest("10_check_bytes_sent_vs_data_points"):
+            for metric in _CLIENT_SENT_METRICS:
+                self.assertTrue(
+                    self._at_least_one_point_within_range(
+                        all_results[metric][0].points, _CLIENT_SENT_BYTES
+                    )
+                )
+
+            for metric in _SERVER_SENT_METRICS:
+                self.assertTrue(
+                    self._at_least_one_point_within_range(
+                        all_results[metric][0].points, _SERVER_SENT_BYTES
                     )
                 )
-            err = cm.exception
-            self.assertIsInstance(err, gapi_errors.InvalidArgument)
-            self.assertIsNotNone(err.grpc_status_code)
-            self.assertStartsWith(err.message, "Name must begin with")
-            self.assertEndsWith(err.message, " got: whatever")
 
 
 if __name__ == "__main__":
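
Note: the query pattern used by _query_metrics() can be reproduced outside the
test framework to inspect the same GMP-ingested time series. Below is a minimal
standalone sketch; PROJECT_ID, the 30-minute window, and the chosen metric type
are placeholders, and the filter mirrors the constants defined in this test.

# Standalone sketch of the Cloud Monitoring query used in the test above.
# Assumes PROJECT_ID points at a project where CSM metrics were recently
# exported via GMP.
import time

from google.cloud import monitoring_v3

PROJECT_ID = "my-project"  # assumption: replace with a real project id
METRIC = "prometheus.googleapis.com/grpc_client_attempt_duration_seconds/histogram"
METHOD = "grpc.testing.TestService/UnaryCall"

client = monitoring_v3.MetricServiceClient()
now = int(time.time())
interval = monitoring_v3.TimeInterval(
    {
        "end_time": {"seconds": now, "nanos": 0},
        "start_time": {"seconds": now - 30 * 60, "nanos": 0},
    }
)
series = client.list_time_series(
    name=f"projects/{PROJECT_ID}",
    filter=(
        f'metric.type = "{METRIC}" AND '
        'metric.labels.grpc_status = "OK" AND '
        f'metric.labels.grpc_method = "{METHOD}"'
    ),
    interval=interval,
    view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
)
for ts in series:
    # On the prometheus_target resource, "job" is the PodMonitoring name and
    # "instance" is the scraped pod and port.
    print(ts.resource.labels["job"], ts.resource.labels["instance"])
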