Skip to content

Commit

Permalink
PR review comment changes
Browse files Browse the repository at this point in the history
  • Loading branch information
stanley-cheung authored and sergiitk committed Jun 5, 2024
1 parent 0fed5a6 commit 78ed0fe
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 59 deletions.
45 changes: 7 additions & 38 deletions framework/test_app/runners/k8s/gamma_server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""
Run xDS Test Server on Kubernetes using Gamma
"""
import dataclasses
import datetime
import logging
from typing import Final, Optional
Expand All @@ -30,10 +31,14 @@
logger = logging.getLogger(__name__)


ServerDeploymentArgs = k8s_xds_server_runner.ServerDeploymentArgs
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner


# Gamma-specific server deployment arguments.
# A frozen dataclass that subclasses k8s_xds_server_runner.ServerDeploymentArgs;
# the lines shown here declare no additional fields, so it inherits the parent's.
@dataclasses.dataclass(frozen=True)
class ServerDeploymentArgs(k8s_xds_server_runner.ServerDeploymentArgs):
"""Gamma Server Deployment Args"""


class GammaServerRunner(KubernetesServerRunner):
# Mutable state.
route: Optional[k8s.GammaHttpRoute] = None
Expand Down Expand Up @@ -207,17 +212,7 @@ def run( # pylint: disable=arguments-differ
**self.deployment_args.as_dict(),
)

# Create a PodMonitoring resource if CSM Observability is enabled
# This is GMP (Google Managed Prometheus)
if self.deployment_args.enable_csm_observability:
self.pod_monitoring_name = f"{self.deployment_id}-gmp"
self.pod_monitoring = self._create_pod_monitoring(
"csm/pod-monitoring.yaml",
namespace_name=self.k8s_namespace.name,
deployment_id=self.deployment_id,
pod_monitoring_name=self.pod_monitoring_name,
pod_monitoring_port=self.DEFAULT_MONITORING_PORT,
)
self._setup_csm_observability()

servers = self._make_servers_for_deployment(
replica_count,
Expand Down Expand Up @@ -282,32 +277,6 @@ def create_backend_policy(
draining_timeout_sec=draining_timeout_sec,
)

# Build the XdsTestServer for a pod.
# When CSM observability is enabled in the deployment args, the monitoring
# port is chosen here (overriding any caller-supplied value) before the call
# is delegated to the parent class implementation.
def _xds_test_server_for_pod(
self,
pod: k8s.V1Pod,
*,
test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT,
maintenance_port: Optional[int] = None,
secure_mode: bool = False,
monitoring_port: Optional[int] = None,
) -> XdsTestServer:
# With CSM observability on, monitoring_port is always reassigned below.
if self.deployment_args.enable_csm_observability:
if self.debug_use_port_forwarding:
# Port forwarding: forward the pod's default monitoring port and use
# the local port of that forward.
pf = self._start_port_forwarding_pod(
pod, self.DEFAULT_MONITORING_PORT
)
monitoring_port = pf.local_port
else:
# No port forwarding: use the default monitoring port directly.
monitoring_port = self.DEFAULT_MONITORING_PORT

# Everything else is handled by the parent implementation.
return super()._xds_test_server_for_pod(
pod=pod,
test_port=test_port,
maintenance_port=maintenance_port,
secure_mode=secure_mode,
monitoring_port=monitoring_port,
)

@override
def cleanup(self, *, force=False, force_namespace=False):
try:
Expand Down
2 changes: 1 addition & 1 deletion framework/test_app/runners/k8s/k8s_base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
# Pylint wants abstract classes to override abstract methods.
# pylint: disable=abstract-method

DEFAULT_MONITORING_PORT = 9464
DEFAULT_POD_MONITORING_PORT = 9464
TEMPLATE_DIR_NAME = "kubernetes-manifests"
TEMPLATE_DIR_RELATIVE_PATH = f"../../../../{TEMPLATE_DIR_NAME}"
ROLE_WORKLOAD_IDENTITY_USER = "roles/iam.workloadIdentityUser"
Expand Down
6 changes: 3 additions & 3 deletions framework/test_app/runners/k8s/k8s_xds_client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def run( # pylint: disable=arguments-differ
namespace_name=self.k8s_namespace.name,
deployment_id=self.deployment_id,
pod_monitoring_name=self.pod_monitoring_name,
pod_monitoring_port=self.DEFAULT_MONITORING_PORT,
pod_monitoring_port=self.DEFAULT_POD_MONITORING_PORT,
)

# We don't support multiple client replicas at the moment.
Expand Down Expand Up @@ -236,13 +236,13 @@ def _xds_test_client_for_pod(
rpc_port, rpc_host = pf.local_port, pf.local_address
if self.deployment_args.enable_csm_observability:
pf = self._start_port_forwarding_pod(
pod, self.DEFAULT_MONITORING_PORT
pod, self.DEFAULT_POD_MONITORING_PORT
)
monitoring_port = pf.local_port
else:
rpc_port, rpc_host = self.stats_port, None
if self.deployment_args.enable_csm_observability:
monitoring_port = self.DEFAULT_MONITORING_PORT
monitoring_port = self.DEFAULT_POD_MONITORING_PORT

return client_app.XdsTestClient(
ip=pod.status.pod_ip,
Expand Down
43 changes: 29 additions & 14 deletions framework/test_app/runners/k8s/k8s_xds_server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
td_bootstrap_image: str
xds_server_uri: str
network: str
pod_monitoring: Optional[k8s.PodMonitoring] = None
pod_monitoring_name: Optional[str] = None

# Server Deployment args
deployment_args: ServerDeploymentArgs
Expand All @@ -80,6 +78,8 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
# Below is mutable state associated with the current run.
service: Optional[k8s.V1Service] = None
replica_count: int = 0
pod_monitoring: Optional[k8s.PodMonitoring] = None
pod_monitoring_name: Optional[str] = None

# A map from pod names to the server app.
pods_to_servers: dict[str, XdsTestServer]
Expand Down Expand Up @@ -261,6 +261,16 @@ def run( # pylint: disable=arguments-differ,too-many-branches
**self.deployment_args.as_dict(),
)

self._setup_csm_observability()

return self._make_servers_for_deployment(
replica_count,
test_port=test_port,
maintenance_port=maintenance_port,
secure_mode=secure_mode,
)

def _setup_csm_observability(self) -> None:
# Create a PodMonitoring resource if CSM Observability is enabled
# This is GMP (Google Managed Prometheus)
if self.deployment_args.enable_csm_observability:
Expand All @@ -270,16 +280,9 @@ def run( # pylint: disable=arguments-differ,too-many-branches
namespace_name=self.k8s_namespace.name,
deployment_id=self.deployment_id,
pod_monitoring_name=self.pod_monitoring_name,
pod_monitoring_port=self.DEFAULT_MONITORING_PORT,
pod_monitoring_port=self.DEFAULT_POD_MONITORING_PORT,
)

return self._make_servers_for_deployment(
replica_count,
test_port=test_port,
maintenance_port=maintenance_port,
secure_mode=secure_mode,
)

def _make_servers_for_deployment(
self,
replica_count,
Expand Down Expand Up @@ -337,15 +340,15 @@ def _xds_test_server_for_pod(
if self.debug_use_port_forwarding:
pf = self._start_port_forwarding_pod(pod, maintenance_port)
rpc_port, rpc_host = pf.local_port, pf.local_address
if self.deployment_args.enable_csm_observability:
if self.should_collect_logs_prometheus:
pf = self._start_port_forwarding_pod(
pod, self.DEFAULT_MONITORING_PORT
pod, self.DEFAULT_POD_MONITORING_PORT
)
monitoring_port = pf.local_port
else:
rpc_port, rpc_host = maintenance_port, None
if self.deployment_args.enable_csm_observability:
monitoring_port = self.DEFAULT_MONITORING_PORT
if self.should_collect_logs_prometheus:
monitoring_port = self.DEFAULT_POD_MONITORING_PORT

server = XdsTestServer(
ip=pod.status.pod_ip,
Expand All @@ -359,6 +362,13 @@ def _xds_test_server_for_pod(
self.pods_to_servers[pod.metadata.name] = server
return server

# Truthy only when CSM observability is enabled in the deployment args AND
# self.should_collect_logs is truthy.
# The value is the raw result of `and` (not coerced to bool), and
# should_collect_logs is only read when the first operand is truthy.
@property
def should_collect_logs_prometheus(self):
return (
self.deployment_args.enable_csm_observability
and self.should_collect_logs
)

@override
def stop_pod_dependencies(self, *, log_drain_sec: int = 0):
# Call pre-stop hook release if exists.
Expand Down Expand Up @@ -424,6 +434,11 @@ def cleanup(self, *, force=False, force_namespace=False):
)
self._delete_service_account(self.service_account_name)
self.service_account = None
# Pod monitoring name is only set when CSM observability is enabled.
if self.pod_monitoring_name and (self.pod_monitoring or force):
self._delete_pod_monitoring(self.pod_monitoring_name)
self.pod_monitoring = None
self.pod_monitoring_name = None
self._cleanup_namespace(force=(force_namespace and force))
finally:
self._stop()
Expand Down
2 changes: 1 addition & 1 deletion tests/app_net_csm_observability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ def test_csm_observability(self):
"Letting test client run for %d seconds to produce metric data",
TEST_RUN_SECS,
)
if self.server_runner.should_collect_logs:
if self.server_runner.should_collect_logs_prometheus:
self._sleep_and_ping_prometheus_endpoint(
test_server, test_client
)
Expand Down
3 changes: 1 addition & 2 deletions tests/gamma/csm_observability_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
ClientDeploymentArgs = k8s_xds_client_runner.ClientDeploymentArgs
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
ServerDeploymentArgs = k8s_xds_server_runner.ServerDeploymentArgs
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
BuildQueryFn = Callable[[str, str], str]
ANY = unittest.mock.ANY

Expand Down Expand Up @@ -231,7 +230,7 @@ def test_csm_observability(self):
"Letting test client run for %d seconds to produce metric data",
TEST_RUN_SECS,
)
if self.server_runner.should_collect_logs:
if self.server_runner.should_collect_logs_prometheus:
self._sleep_and_ping_prometheus_endpoint(
test_server, test_client
)
Expand Down

0 comments on commit 78ed0fe

Please sign in to comment.