Skip to content

Commit

Permalink
Review changes
Browse files Browse the repository at this point in the history
- import unittest.mock and typing
- add BuildQueryFn to templatize query filter string
- bag of expected_metric_labels only need to defined once per group of metrics
- tuple instead of list for hardcoded list
  • Loading branch information
stanley-cheung committed Jan 31, 2024
1 parent dc956ba commit c25bfb2
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 113 deletions.
2 changes: 2 additions & 0 deletions kubernetes-manifests/client.deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ spec:
- name: CSM_CANONICAL_SERVICE_NAME
value: ${csm_canonical_service_name}
% endif
% if enable_csm_observability:
- name: POD_NAME
valueFrom:
fieldRef:
Expand All @@ -84,6 +85,7 @@ spec:
fieldPath: metadata.namespace
- name: OTEL_RESOURCE_ATTRIBUTES
value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME)
% endif
volumeMounts:
- mountPath: /tmp/grpc-xds/
name: grpc-td-conf
Expand Down
2 changes: 2 additions & 0 deletions kubernetes-manifests/server.deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ spec:
- name: CSM_CANONICAL_SERVICE_NAME
value: ${csm_canonical_service_name}
% endif
% if enable_csm_observability:
- name: POD_NAME
valueFrom:
fieldRef:
Expand All @@ -70,6 +71,7 @@ spec:
fieldPath: metadata.namespace
- name: OTEL_RESOURCE_ATTRIBUTES
value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME)
% endif
volumeMounts:
- mountPath: /tmp/grpc-xds/
name: grpc-td-conf
Expand Down
226 changes: 113 additions & 113 deletions tests/gamma/csm_observability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
# limitations under the License.
import logging
import time
import unittest
from typing import Callable, Iterable
import unittest.mock

from absl import flags
from absl.testing import absltest
Expand Down Expand Up @@ -103,21 +104,51 @@ def setUpClass(cls):
super().setUpClass()
cls.metric_client = cls.gcp_api_manager.monitoring_metric_service("v3")

# query filter string to be passed to the Monitoring API
def build_filter_str(self, filter_str_template, replacements):
return filter_str_template % replacements
BuildQueryFn = Callable[[str], str]

@classmethod
def _build_histogram_query(cls, metric_type: str) -> str:
#
# The list_time_series API requires us to query one metric
# at a time.
#
# The 'grpc_status = "OK"' filter condition is needed because
# some time series data points were logged when the grpc_status
# was "UNAVAILABLE" when the client/server were establishing
# connections.
#
# The 'grpc_method' filter condition is needed because the
# server metrics are also serving on the Channelz requests.
#
return (
f'metric.type = "{metric_type}" AND '
'metric.labels.grpc_status = "OK" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
)

@classmethod
def _build_counter_query(cls, metric_type: str) -> str:
# For these num rpcs started counter metrics, they do not have the
# 'grpc_status' label
return (
f'metric.type = "{metric_type}" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
)

# A helper function to make the cloud monitoring API call to query metrics
# created by this test run.
def query_metrics(self, metric_names, filter_str_template, interval):
def query_metrics(
self,
metric_names: Iterable[str],
build_query_fn: BuildQueryFn,
interval: monitoring_v3.TimeInterval,
):
results = {}
metric_log_lines = [""]
for metric in metric_names:
response = self.metric_client.list_time_series(
name=f"projects/{self.project}",
filter=self.build_filter_str(
filter_str_template, {"metric": metric}
),
filter=build_query_fn(metric),
interval=interval,
view=monitoring_v3.ListTimeSeriesRequest.TimeSeriesView.FULL,
)
Expand Down Expand Up @@ -185,37 +216,12 @@ def test_csm_observability(self):
"start_time": {"seconds": start_secs, "nanos": 0},
}
)
#
# The list_time_series API requires us to query one metric
# at a time.
#
# The 'grpc_status = "OK"' filter condition is needed because
# some time series data points were logged when the grpc_status
# was "UNAVAILABLE" when the client/server were establishing
# connections.
#
# The 'grpc_method' filter condition is needed because the
# server metrics are also serving on the Channelz requests.
#
filter_str_template = (
'metric.type = "%(metric)s" AND '
'metric.labels.grpc_status = "OK" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
)
histogram_results = self.query_metrics(
HISTOGRAM_METRICS, filter_str_template, interval
)

# The num rpcs started counter metrics do not have the
# 'grpc_status' label
filter_str_template = (
'metric.type = "%(metric)s" AND '
f'metric.labels.grpc_method = "{GRPC_METHOD_NAME}"'
HISTOGRAM_METRICS, self._build_histogram_query, interval
)
counter_results = self.query_metrics(
COUNTER_METRICS, filter_str_template, interval
COUNTER_METRICS, self._build_counter_query, interval
)

all_results = {**histogram_results, **counter_results}
self.assertNotEmpty(all_results, msg="No query metrics results")

Expand All @@ -232,27 +238,26 @@ def test_csm_observability(self):
# Testing whether each metric has the correct set of metric keys and
# values
with self.subTest("6_check_metrics_labels"):
expected_metric_labels = {
"csm_mesh_id": unittest.mock.ANY,
"csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER,
"csm_remote_workload_cluster_name": unittest.mock.ANY,
"csm_remote_workload_location": unittest.mock.ANY,
"csm_remote_workload_name": CSM_WORKLOAD_NAME_SERVER,
"csm_remote_workload_namespace_name": self.server_namespace,
"csm_remote_workload_project_id": self.project,
"csm_remote_workload_type": "gcp_kubernetes_engine",
"csm_service_name": self.server_runner.service_name,
"csm_service_namespace_name": self.server_namespace,
"csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT,
"grpc_method": GRPC_METHOD_NAME,
"grpc_status": "OK",
"grpc_target": unittest.mock.ANY,
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_client.hostname,
}
for metric in HISTOGRAM_CLIENT_METRICS:
expected_metric_labels = {
"csm_mesh_id": unittest.mock.ANY,
"csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER,
"csm_remote_workload_cluster_name": unittest.mock.ANY,
"csm_remote_workload_location": unittest.mock.ANY,
"csm_remote_workload_name": CSM_WORKLOAD_NAME_SERVER,
"csm_remote_workload_namespace_name": self.server_namespace,
"csm_remote_workload_project_id": self.project,
"csm_remote_workload_type": "gcp_kubernetes_engine",
"csm_service_name": self.server_runner.service_name,
"csm_service_namespace_name": self.server_namespace,
"csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT,
"grpc_method": GRPC_METHOD_NAME,
"grpc_status": "OK",
"grpc_target": unittest.mock.ANY,
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_client.hostname,
}

actual_metric_labels = all_results[metric][0].metric.labels
self.assertDictEqual(
expected_metric_labels, dict(actual_metric_labels)
Expand All @@ -261,24 +266,23 @@ def test_csm_observability(self):
# Testing whether each metric has the correct set of metric keys and
# values
with self.subTest("7_check_metrics_labels"):
expected_metric_labels = {
"csm_mesh_id": unittest.mock.ANY,
"csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT,
"csm_remote_workload_cluster_name": unittest.mock.ANY,
"csm_remote_workload_location": unittest.mock.ANY,
"csm_remote_workload_name": CSM_WORKLOAD_NAME_CLIENT,
"csm_remote_workload_namespace_name": self.client_namespace,
"csm_remote_workload_project_id": self.project,
"csm_remote_workload_type": "gcp_kubernetes_engine",
"csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER,
"grpc_method": GRPC_METHOD_NAME,
"grpc_status": "OK",
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_server.hostname,
}
for metric in HISTOGRAM_SERVER_METRICS:
expected_metric_labels = {
"csm_mesh_id": unittest.mock.ANY,
"csm_remote_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_CLIENT,
"csm_remote_workload_cluster_name": unittest.mock.ANY,
"csm_remote_workload_location": unittest.mock.ANY,
"csm_remote_workload_name": CSM_WORKLOAD_NAME_CLIENT,
"csm_remote_workload_namespace_name": self.client_namespace,
"csm_remote_workload_project_id": self.project,
"csm_remote_workload_type": "gcp_kubernetes_engine",
"csm_workload_canonical_service": CSM_CANONICAL_SERVICE_NAME_SERVER,
"grpc_method": GRPC_METHOD_NAME,
"grpc_status": "OK",
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_server.hostname,
}

actual_metric_labels = all_results[metric][0].metric.labels
self.assertDictEqual(
expected_metric_labels, dict(actual_metric_labels)
Expand All @@ -287,15 +291,14 @@ def test_csm_observability(self):
# Testing whether each metric has the correct set of metric keys and
# values
with self.subTest("8_check_metrics_labels"):
expected_metric_labels = {
"grpc_method": GRPC_METHOD_NAME,
"grpc_target": unittest.mock.ANY,
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_client.hostname,
}
for metric in COUNTER_CLIENT_METRICS:
expected_metric_labels = {
"grpc_method": GRPC_METHOD_NAME,
"grpc_target": unittest.mock.ANY,
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_client.hostname,
}

actual_metric_labels = all_results[metric][0].metric.labels
self.assertDictEqual(
expected_metric_labels, dict(actual_metric_labels)
Expand All @@ -304,14 +307,13 @@ def test_csm_observability(self):
# Testing whether each metric has the correct set of metric keys and
# values
with self.subTest("9_check_metrics_labels"):
expected_metric_labels = {
"grpc_method": GRPC_METHOD_NAME,
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_server.hostname,
}
for metric in COUNTER_SERVER_METRICS:
expected_metric_labels = {
"grpc_method": GRPC_METHOD_NAME,
"otel_scope_name": unittest.mock.ANY,
"otel_scope_version": unittest.mock.ANY,
"pod": test_server.hostname,
}

actual_metric_labels = all_results[metric][0].metric.labels
self.assertDictEqual(
expected_metric_labels, dict(actual_metric_labels)
Expand All @@ -320,21 +322,20 @@ def test_csm_observability(self):
# Testing whether each metric has the right set of monitored resource
# label keys and values
with self.subTest("10_check_client_resource_labels"):
# all metrics should have the same set of monitored resource labels
# keys, which come from the GMP job
expected_resource_labels = {
"cluster": unittest.mock.ANY,
"instance": unittest.mock.ANY,
"job": self.client_runner.pod_monitoring_name,
"location": unittest.mock.ANY,
"namespace": self.client_namespace,
"project_id": self.project,
}
for metric in CLIENT_METRICS:
time_series = all_results[metric][0]
self.assertEqual("prometheus_target", time_series.resource.type)

# all metrics should have the same set of monitored resource labels
# keys, which come from the GMP job
expected_resource_labels = {
"cluster": unittest.mock.ANY,
"instance": unittest.mock.ANY,
"job": self.client_runner.pod_monitoring_name,
"location": unittest.mock.ANY,
"namespace": self.client_namespace,
"project_id": self.project,
}

actual_resource_labels = time_series.resource.labels
self.assertDictEqual(
expected_resource_labels, dict(actual_resource_labels)
Expand All @@ -343,21 +344,20 @@ def test_csm_observability(self):
# Testing whether each metric has the right set of monitored resource
# label keys and values
with self.subTest("11_check_server_resource_labels"):
# all metrics should have the same set of monitored resource labels
# keys, which come from the GMP job
expected_resource_labels = {
"cluster": unittest.mock.ANY,
"instance": unittest.mock.ANY,
"job": self.server_runner.pod_monitoring_name,
"location": unittest.mock.ANY,
"namespace": self.server_namespace,
"project_id": self.project,
}
for metric in SERVER_METRICS:
time_series = all_results[metric][0]
self.assertEqual("prometheus_target", time_series.resource.type)

# all metrics should have the same set of monitored resource labels
# keys, which come from the GMP job
expected_resource_labels = {
"cluster": unittest.mock.ANY,
"instance": unittest.mock.ANY,
"job": self.server_runner.pod_monitoring_name,
"location": unittest.mock.ANY,
"namespace": self.server_namespace,
"project_id": self.project,
}

actual_resource_labels = time_series.resource.labels
self.assertDictEqual(
expected_resource_labels, dict(actual_resource_labels)
Expand All @@ -367,18 +367,18 @@ def test_csm_observability(self):
# should have at least 1 data point whose mean should converge to be
# close to the number of bytes being sent by the RPCs.
with self.subTest("12_check_bytes_sent_vs_data_points"):
for metric in [
for metric in (
METRIC_CLIENT_ATTEMPT_SENT,
METRIC_SERVER_CALL_RCVD,
]:
):
self.assertAtLeastOnePointWithinRange(
all_results[metric][0].points, REQUEST_PAYLOAD_SIZE
)

for metric in [
for metric in (
METRIC_CLIENT_ATTEMPT_RCVD,
METRIC_SERVER_CALL_SENT,
]:
):
self.assertAtLeastOnePointWithinRange(
all_results[metric][0].points, RESPONSE_PAYLOAD_SIZE
)
Expand Down

0 comments on commit c25bfb2

Please sign in to comment.