Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CSM O11y test] Ping GMP endpoint during test for debugging purpose #33

Merged
merged 29 commits into from
May 21, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
81b567e
[CSM O11y test] Ping GMP endpoint regularly for debug purpose
stanley-cheung Feb 7, 2024
d2c1e97
Use requests instead of pod exec
stanley-cheung Feb 7, 2024
916a6c9
Use port forwarding to ping GMP endpoint
stanley-cheung Feb 8, 2024
b9caa33
Add requests to requirements.txt
stanley-cheung Feb 8, 2024
3ab0f31
try catch RequestException
stanley-cheung Feb 8, 2024
73765f0
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Feb 8, 2024
dc6c854
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Feb 20, 2024
f11a723
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Feb 20, 2024
54a4e73
Review changes
stanley-cheung Feb 21, 2024
d025083
remove unnecessary variable which broke unit test
stanley-cheung Feb 21, 2024
1c0f3dd
fix pylint for function signature
stanley-cheung Feb 21, 2024
90c533c
fixed variable initialization
stanley-cheung Feb 21, 2024
01f4c4d
add comment for temp debugging class
stanley-cheung Feb 21, 2024
a3e52dd
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Feb 22, 2024
bcf43a0
Add a marker in the prometheus log files
stanley-cheung Feb 22, 2024
02ea1cc
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Feb 29, 2024
d4fabf2
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Mar 4, 2024
9d117f4
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Mar 5, 2024
5a49e3a
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Mar 5, 2024
bae1c81
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Mar 5, 2024
e5d0270
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Mar 5, 2024
c71a050
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung Mar 7, 2024
32bfb74
Merge branch 'main' into ping-gmp-endpoint
sergiitk May 10, 2024
3d0437f
Review feedback: check should_collect_logs flags first
stanley-cheung May 16, 2024
3020499
Merge branch 'main' into ping-gmp-endpoint
stanley-cheung May 16, 2024
19a9a2b
Merge branch 'ping-gmp-endpoint' of github.com:stanley-cheung/psm-int…
stanley-cheung May 16, 2024
2a4c603
Fully use ClientDeploymentArgs
stanley-cheung May 17, 2024
95edebd
ran black.sh
stanley-cheung May 17, 2024
0518a7d
Fix ClientDeploymentArgs initialization
stanley-cheung May 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions framework/test_app/runners/k8s/gamma_server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,13 @@ class GammaServerRunner(KubernetesServerRunner):
pre_stop_hook: bool = False
pod_monitoring: Optional[k8s.PodMonitoring] = None
pod_monitoring_name: Optional[str] = None
pod_monitoring_port: int = 9464
monitoring_port: Optional[int] = None
monitoring_host: Optional[str] = None

route_name: str
frontend_service_name: str
enable_csm_observability: bool
csm_workload_name: str
csm_canonical_service_name: str

Expand Down Expand Up @@ -77,6 +81,7 @@ def __init__(
bepolicy_name: str = "backend-policy",
termination_grace_period_seconds: int = 0,
pre_stop_hook: bool = False,
enable_csm_observability: bool = False,
csm_workload_name: str = "",
csm_canonical_service_name: str = "",
):
Expand Down Expand Up @@ -111,6 +116,7 @@ def __init__(
self.bepolicy_name = bepolicy_name
self.termination_grace_period_seconds = termination_grace_period_seconds
self.pre_stop_hook = pre_stop_hook
self.enable_csm_observability = enable_csm_observability
self.csm_workload_name = csm_workload_name
self.csm_canonical_service_name = csm_canonical_service_name

Expand All @@ -124,7 +130,6 @@ def run( # pylint: disable=arguments-differ
log_to_stdout: bool = False,
bootstrap_version: Optional[str] = None,
route_template: str = "gamma/route_http.yaml",
enable_csm_observability: bool = False,
generate_mesh_id: bool = False,
) -> List[XdsTestServer]:
if not maintenance_port:
Expand Down Expand Up @@ -210,21 +215,22 @@ def run( # pylint: disable=arguments-differ
bootstrap_version=bootstrap_version,
termination_grace_period_seconds=self.termination_grace_period_seconds,
pre_stop_hook=self.pre_stop_hook,
enable_csm_observability=enable_csm_observability,
enable_csm_observability=self.enable_csm_observability,
generate_mesh_id=generate_mesh_id,
csm_workload_name=self.csm_workload_name,
csm_canonical_service_name=self.csm_canonical_service_name,
)

# Create a PodMonitoring resource if CSM Observability is enabled
# This is GMP (Google Managed Prometheus)
if enable_csm_observability:
if self.enable_csm_observability:
self.pod_monitoring_name = f"{self.deployment_id}-gmp"
self.pod_monitoring = self._create_pod_monitoring(
"csm/pod-monitoring.yaml",
namespace_name=self.k8s_namespace.name,
deployment_id=self.deployment_id,
pod_monitoring_name=self.pod_monitoring_name,
pod_monitoring_port=self.pod_monitoring_port,
)

servers = self._make_servers_for_deployment(
Expand Down Expand Up @@ -271,6 +277,32 @@ def createBackendPolicy(self):
service_name=self.service_name,
)

def _xds_test_server_for_pod(
    self,
    pod: k8s.V1Pod,
    *,
    test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT,
    maintenance_port: Optional[int] = None,
    secure_mode: bool = False,
) -> XdsTestServer:
    """Record where the pod's monitoring endpoint is reachable, then defer
    to the parent implementation to build the XdsTestServer.

    When CSM Observability is enabled, monitoring_port and monitoring_host
    are updated on this runner: with port forwarding they point at the
    local end of a forward to pod_monitoring_port, otherwise at the pod IP
    and pod_monitoring_port directly. When it is disabled, neither
    attribute is touched.
    """
    if self.enable_csm_observability:
        if self.debug_use_port_forwarding:
            forwarder = self._start_port_forwarding_pod(
                pod, self.pod_monitoring_port
            )
            port, host = forwarder.local_port, forwarder.local_address
        else:
            port, host = self.pod_monitoring_port, pod.status.pod_ip
        self.monitoring_port = port
        self.monitoring_host = host

    return super()._xds_test_server_for_pod(
        pod=pod,
        test_port=test_port,
        maintenance_port=maintenance_port,
        secure_mode=secure_mode,
    )

# pylint: disable=arguments-differ
def cleanup(self, *, force=False, force_namespace=False):
try:
Expand Down
2 changes: 2 additions & 0 deletions framework/test_app/runners/k8s/k8s_base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def _create_pod_monitoring(
namespace_name: str,
deployment_id: str,
pod_monitoring_name: str,
pod_monitoring_port: int,
**kwargs,
) -> k8s.PodMonitoring:
pod_monitoring = self._create_from_template(
Expand All @@ -314,6 +315,7 @@ def _create_pod_monitoring(
namespace_name=namespace_name,
deployment_id=deployment_id,
pod_monitoring_name=pod_monitoring_name,
pod_monitoring_port=pod_monitoring_port,
**kwargs,
)
if not (
Expand Down
21 changes: 18 additions & 3 deletions framework/test_app/runners/k8s/k8s_xds_client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
debug_use_port_forwarding: bool
td_bootstrap_image: str
network: str
enable_csm_observability: bool
csm_workload_name: str
csm_canonical_service_name: str

Expand All @@ -43,6 +44,9 @@ class KubernetesClientRunner(k8s_base_runner.KubernetesBaseRunner):
gcp_iam: Optional[gcp.iam.IamV1] = None
pod_monitoring: Optional[k8s.PodMonitoring] = None
pod_monitoring_name: Optional[str] = None
pod_monitoring_port: int = 9464
monitoring_port: Optional[int] = None
monitoring_host: Optional[str] = None

def __init__( # pylint: disable=too-many-locals
self,
Expand All @@ -64,6 +68,7 @@ def __init__( # pylint: disable=too-many-locals
namespace_template: Optional[str] = None,
debug_use_port_forwarding: bool = False,
enable_workload_identity: bool = True,
enable_csm_observability: bool = False,
csm_workload_name: str = "",
csm_canonical_service_name: str = "",
):
Expand All @@ -83,6 +88,7 @@ def __init__( # pylint: disable=too-many-locals
self.deployment_template = deployment_template
self.enable_workload_identity = enable_workload_identity
self.debug_use_port_forwarding = debug_use_port_forwarding
self.enable_csm_observability = enable_csm_observability
self.csm_workload_name = csm_workload_name
self.csm_canonical_service_name = csm_canonical_service_name

Expand Down Expand Up @@ -112,7 +118,6 @@ def run( # pylint: disable=arguments-differ
generate_mesh_id=False,
print_response=False,
log_to_stdout: bool = False,
enable_csm_observability: bool = False,
request_payload_size: int = 0,
response_payload_size: int = 0,
) -> XdsTestClient:
Expand Down Expand Up @@ -171,20 +176,21 @@ def run( # pylint: disable=arguments-differ
config_mesh=config_mesh,
generate_mesh_id=generate_mesh_id,
print_response=print_response,
enable_csm_observability=enable_csm_observability,
enable_csm_observability=self.enable_csm_observability,
csm_workload_name=self.csm_workload_name,
csm_canonical_service_name=self.csm_canonical_service_name,
)

# Create a PodMonitoring resource if CSM Observability is enabled
# This is GMP (Google Managed Prometheus)
if enable_csm_observability:
if self.enable_csm_observability:
self.pod_monitoring_name = f"{self.deployment_id}-gmp"
self.pod_monitoring = self._create_pod_monitoring(
"csm/pod-monitoring.yaml",
namespace_name=self.k8s_namespace.name,
deployment_id=self.deployment_id,
pod_monitoring_name=self.pod_monitoring_name,
pod_monitoring_port=self.pod_monitoring_port,
)

# Load test client pod. We need only one client at the moment
Expand All @@ -205,8 +211,17 @@ def _xds_test_client_for_pod(
if self.debug_use_port_forwarding:
pf = self._start_port_forwarding_pod(pod, self.stats_port)
rpc_port, rpc_host = pf.local_port, pf.local_address
if self.enable_csm_observability:
pf = self._start_port_forwarding_pod(
pod, self.pod_monitoring_port
)
self.monitoring_port = pf.local_port
self.monitoring_host = pf.local_address
else:
rpc_port, rpc_host = self.stats_port, None
if self.enable_csm_observability:
self.monitoring_port = self.pod_monitoring_port
self.monitoring_host = pod.status.pod_ip

return XdsTestClient(
ip=pod.status.pod_ip,
Expand Down
2 changes: 1 addition & 1 deletion kubernetes-manifests/csm/pod-monitoring.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ spec:
matchLabels:
deployment_id: ${deployment_id}
endpoints:
- port: 9464
- port: ${pod_monitoring_port}
interval: 10s
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ packaging~=23.1
Pygments~=2.9
python-dateutil~=2.8
protobuf~=4.24
requests==2.31.0
xds-protos==1.58.0rc1
42 changes: 37 additions & 5 deletions tests/gamma/csm_observability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
from google.api_core import exceptions as gapi_errors
from google.api_core import retry as gapi_retries
from google.cloud import monitoring_v3
import requests
from requests.exceptions import RequestException
import yaml

from framework import xds_gamma_testcase
Expand Down Expand Up @@ -158,6 +160,7 @@ def setUpClass(cls):
# each run().
def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner:
return super().initKubernetesClientRunner(
enable_csm_observability=True,
csm_workload_name=CSM_WORKLOAD_NAME_CLIENT,
csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_CLIENT,
)
Expand All @@ -166,6 +169,7 @@ def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner:
# each run().
def initKubernetesServerRunner(self, **kwargs) -> GammaServerRunner:
return super().initKubernetesServerRunner(
enable_csm_observability=True,
csm_workload_name=CSM_WORKLOAD_NAME_SERVER,
csm_canonical_service_name=CSM_CANONICAL_SERVICE_NAME_SERVER,
)
Expand All @@ -175,14 +179,11 @@ def test_csm_observability(self):
# resource creation out of self.startTestServers()
with self.subTest("1_run_test_server"):
start_secs = int(time.time())
test_server: _XdsTestServer = self.startTestServers(
enable_csm_observability=True,
)[0]
test_server: _XdsTestServer = self.startTestServers()[0]

with self.subTest("2_start_test_client"):
test_client: _XdsTestClient = self.startTestClient(
test_server,
enable_csm_observability=True,
request_payload_size=REQUEST_PAYLOAD_SIZE,
response_payload_size=RESPONSE_PAYLOAD_SIZE,
)
Expand All @@ -195,7 +196,20 @@ def test_csm_observability(self):
"Letting test client run for %d seconds to produce metric data",
TEST_RUN_SECS,
)
time.sleep(TEST_RUN_SECS)
for i in range(0, TEST_RUN_SECS // 10):
time.sleep(10)
logger.info(
self.ping_gmp_endpoint(
self.server_runner.monitoring_host,
self.server_runner.monitoring_port,
)
)
logger.info(
self.ping_gmp_endpoint(
self.client_runner.monitoring_host,
self.client_runner.monitoring_port,
)
)
end_secs = int(time.time())
interval = monitoring_v3.TimeInterval(
start_time={"seconds": start_secs},
Expand Down Expand Up @@ -483,6 +497,24 @@ def assertAtLeastOnePointWithinRange(
f"No data point with {ref_bytes}±{tolerance*100}% bytes found"
)

def ping_gmp_endpoint(
    self,
    monitoring_host: str,
    monitoring_port: int,
    timeout_sec: float = 5.0,
) -> str:
    """
    A helper function to ping the GMP endpoint to get what GMP sees
    from the OTel exporter before passing metrics to Cloud Monitoring.

    This is a debugging aid only: a failed request is logged and never
    propagated to the caller.

    Args:
        monitoring_host: Host or IP address serving the /metrics endpoint.
        monitoring_port: Port serving the /metrics endpoint.
        timeout_sec: Seconds to wait for the connection and for response
            data before giving up.

    Returns:
        The response body with line endings normalized to newlines, or an
        empty string if the request failed.
    """
    try:
        gmp_log = requests.get(
            f"http://{monitoring_host}:{monitoring_port}/metrics",
            # requests applies no timeout by default; without one an
            # unresponsive endpoint would stall the whole test run.
            timeout=timeout_sec,
        )
        return "\n".join(gmp_log.text.splitlines())
    except RequestException as e:
        # Timeouts are RequestException subclasses, so they land here too.
        logger.error("Http request to GMP endpoint failed: %r", e)
        # It's OK the caller will receive nothing in case of an exception.
        # Caller can continue.
        return ""


if __name__ == "__main__":
absltest.main()
Loading