Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add App Net based CSM Observability Test #86

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .kokoro/psm_interop_kokoro_lib.sh
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ psm::csm::get_tests() {
"gamma.affinity_session_drain_test"
"gamma.csm_observability_test"
"app_net_ssa_test"
"app_net_csm_observability_test"
)
}

Expand Down
59 changes: 7 additions & 52 deletions framework/test_app/runners/k8s/gamma_server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""
Run xDS Test Server on Kubernetes using Gamma
"""
import dataclasses
import datetime
import logging
from typing import Final, Optional
Expand All @@ -30,26 +31,25 @@
logger = logging.getLogger(__name__)


ServerDeploymentArgs = k8s_xds_server_runner.ServerDeploymentArgs
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner


@dataclasses.dataclass(frozen=True)
class ServerDeploymentArgs(k8s_xds_server_runner.ServerDeploymentArgs):
    """Gamma Server Deployment Args.

    Frozen dataclass that subclasses the base xDS server runner's
    ``ServerDeploymentArgs``; it declares no additional fields of its own.
    """


class GammaServerRunner(KubernetesServerRunner):
# Mutable state.
route: Optional[k8s.GammaHttpRoute] = None
frontend_service: Optional[k8s.V1Service] = None
session_affinity_filter: Optional[k8s.GcpSessionAffinityFilter] = None
session_affinity_policy: Optional[k8s.GcpSessionAffinityPolicy] = None
backend_policy: Optional[k8s.GcpBackendPolicy] = None
pod_monitoring: Optional[k8s.PodMonitoring] = None
pod_monitoring_name: Optional[str] = None

route_kind: Final[RouteKind]
route_name: Final[str]
frontend_service_name: str
enable_csm_observability: bool
csm_workload_name: str
csm_canonical_service_name: str

SESSION_AFFINITY_FILTER_NAME: Final[str] = "ssa-filter"
SESSION_AFFINITY_POLICY_NAME: Final[str] = "ssa-policy"
Expand Down Expand Up @@ -80,9 +80,6 @@ def __init__(
namespace_template: Optional[str] = None,
debug_use_port_forwarding: bool = False,
enable_workload_identity: bool = True,
enable_csm_observability: bool = False,
csm_workload_name: str = "",
csm_canonical_service_name: str = "",
deployment_args: Optional[ServerDeploymentArgs] = None,
):
# pylint: disable=too-many-locals
Expand Down Expand Up @@ -111,9 +108,6 @@ def __init__(
)

self.frontend_service_name = frontend_service_name
self.enable_csm_observability = enable_csm_observability
self.csm_workload_name = csm_workload_name
self.csm_canonical_service_name = csm_canonical_service_name
self.route_kind = route_kind
self.route_name = f"route-{route_kind.value.lower()}-{deployment_name}"

Expand Down Expand Up @@ -214,24 +208,11 @@ def run( # pylint: disable=arguments-differ
maintenance_port=maintenance_port,
secure_mode=secure_mode,
bootstrap_version=bootstrap_version,
enable_csm_observability=self.enable_csm_observability,
generate_mesh_id=generate_mesh_id,
csm_workload_name=self.csm_workload_name,
csm_canonical_service_name=self.csm_canonical_service_name,
**self.deployment_args.as_dict(),
)

# Create a PodMonitoring resource if CSM Observability is enabled
# This is GMP (Google Managed Prometheus)
if self.enable_csm_observability:
self.pod_monitoring_name = f"{self.deployment_id}-gmp"
self.pod_monitoring = self._create_pod_monitoring(
"csm/pod-monitoring.yaml",
namespace_name=self.k8s_namespace.name,
deployment_id=self.deployment_id,
pod_monitoring_name=self.pod_monitoring_name,
pod_monitoring_port=self.DEFAULT_MONITORING_PORT,
)
self._setup_csm_observability()

servers = self._make_servers_for_deployment(
replica_count,
Expand Down Expand Up @@ -296,32 +277,6 @@ def create_backend_policy(
draining_timeout_sec=draining_timeout_sec,
)

def _xds_test_server_for_pod(
    self,
    pod: k8s.V1Pod,
    *,
    test_port: int = KubernetesServerRunner.DEFAULT_TEST_PORT,
    maintenance_port: Optional[int] = None,
    secure_mode: bool = False,
    monitoring_port: Optional[int] = None,
) -> XdsTestServer:
    """Build the XdsTestServer for a pod, picking the monitoring port.

    When CSM Observability is enabled, the ``monitoring_port`` argument is
    overridden: with port forwarding it becomes the local port of a
    forwarding session to the pod's default monitoring port, otherwise it
    is the default monitoring port itself. All arguments are then passed
    on to the parent implementation.
    """
    if self.enable_csm_observability:
        if self.debug_use_port_forwarding:
            # Reach the pod's monitoring port through a local forward.
            pf = self._start_port_forwarding_pod(
                pod, self.DEFAULT_MONITORING_PORT
            )
            monitoring_port = pf.local_port
        else:
            monitoring_port = self.DEFAULT_MONITORING_PORT

    return super()._xds_test_server_for_pod(
        pod=pod,
        test_port=test_port,
        maintenance_port=maintenance_port,
        secure_mode=secure_mode,
        monitoring_port=monitoring_port,
    )

@override
def cleanup(self, *, force=False, force_namespace=False):
try:
Expand Down
2 changes: 1 addition & 1 deletion framework/test_app/runners/k8s/k8s_base_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class KubernetesBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta):
# Pylint wants abstract classes to override abstract methods.
# pylint: disable=abstract-method

DEFAULT_MONITORING_PORT = 9464
DEFAULT_POD_MONITORING_PORT = 9464
TEMPLATE_DIR_NAME = "kubernetes-manifests"
TEMPLATE_DIR_RELATIVE_PATH = f"../../../../{TEMPLATE_DIR_NAME}"
ROLE_WORKLOAD_IDENTITY_USER = "roles/iam.workloadIdentityUser"
Expand Down
6 changes: 3 additions & 3 deletions framework/test_app/runners/k8s/k8s_xds_client_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def run( # pylint: disable=arguments-differ
namespace_name=self.k8s_namespace.name,
deployment_id=self.deployment_id,
pod_monitoring_name=self.pod_monitoring_name,
pod_monitoring_port=self.DEFAULT_MONITORING_PORT,
pod_monitoring_port=self.DEFAULT_POD_MONITORING_PORT,
)

# We don't support multiple client replicas at the moment.
Expand Down Expand Up @@ -236,13 +236,13 @@ def _xds_test_client_for_pod(
rpc_port, rpc_host = pf.local_port, pf.local_address
if self.deployment_args.enable_csm_observability:
pf = self._start_port_forwarding_pod(
pod, self.DEFAULT_MONITORING_PORT
pod, self.DEFAULT_POD_MONITORING_PORT
)
monitoring_port = pf.local_port
else:
rpc_port, rpc_host = self.stats_port, None
if self.deployment_args.enable_csm_observability:
monitoring_port = self.DEFAULT_MONITORING_PORT
monitoring_port = self.DEFAULT_POD_MONITORING_PORT

return client_app.XdsTestClient(
ip=pod.status.pod_ip,
Expand Down
42 changes: 42 additions & 0 deletions framework/test_app/runners/k8s/k8s_xds_server_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,19 @@
class ServerDeploymentArgs:
pre_stop_hook: bool = False
termination_grace_period: dt.timedelta = dt.timedelta()
enable_csm_observability: bool = False
csm_workload_name: str = ""
csm_canonical_service_name: str = ""

def as_dict(self):
    """Return the deployment args as a plain dict.

    The grace period is exported as whole seconds under the
    ``termination_grace_period_seconds`` key; every other entry is keyed
    by its attribute name.
    """
    result = {}
    result["pre_stop_hook"] = self.pre_stop_hook
    # timedelta -> integer seconds (fractional part is truncated by int()).
    grace_period_sec = self.termination_grace_period.total_seconds()
    result["termination_grace_period_seconds"] = int(grace_period_sec)
    result["enable_csm_observability"] = self.enable_csm_observability
    result["csm_workload_name"] = self.csm_workload_name
    result["csm_canonical_service_name"] = self.csm_canonical_service_name
    return result


Expand Down Expand Up @@ -72,6 +78,8 @@ class KubernetesServerRunner(k8s_base_runner.KubernetesBaseRunner):
# Below is mutable state associated with the current run.
service: Optional[k8s.V1Service] = None
replica_count: int = 0
pod_monitoring: Optional[k8s.PodMonitoring] = None
pod_monitoring_name: Optional[str] = None

# A map from pod names to the server app.
pods_to_servers: dict[str, XdsTestServer]
Expand Down Expand Up @@ -253,13 +261,28 @@ def run( # pylint: disable=arguments-differ,too-many-branches
**self.deployment_args.as_dict(),
)

self._setup_csm_observability()

return self._make_servers_for_deployment(
replica_count,
test_port=test_port,
maintenance_port=maintenance_port,
secure_mode=secure_mode,
)

def _setup_csm_observability(self) -> None:
    """Create the PodMonitoring resource when CSM Observability is on.

    PodMonitoring is a GMP (Google Managed Prometheus) resource. When the
    deployment args do not enable CSM Observability, nothing is created.
    """
    if not self.deployment_args.enable_csm_observability:
        return

    # Record the name before creating the resource, as the original does.
    monitoring_name = f"{self.deployment_id}-gmp"
    self.pod_monitoring_name = monitoring_name
    self.pod_monitoring = self._create_pod_monitoring(
        "csm/pod-monitoring.yaml",
        namespace_name=self.k8s_namespace.name,
        deployment_id=self.deployment_id,
        pod_monitoring_name=monitoring_name,
        pod_monitoring_port=self.DEFAULT_POD_MONITORING_PORT,
    )

def _make_servers_for_deployment(
self,
replica_count,
Expand Down Expand Up @@ -317,8 +340,15 @@ def _xds_test_server_for_pod(
if self.debug_use_port_forwarding:
pf = self._start_port_forwarding_pod(pod, maintenance_port)
rpc_port, rpc_host = pf.local_port, pf.local_address
if self.should_collect_logs_prometheus:
pf = self._start_port_forwarding_pod(
pod, self.DEFAULT_POD_MONITORING_PORT
)
monitoring_port = pf.local_port
else:
rpc_port, rpc_host = maintenance_port, None
if self.should_collect_logs_prometheus:
monitoring_port = self.DEFAULT_POD_MONITORING_PORT

server = XdsTestServer(
ip=pod.status.pod_ip,
Expand All @@ -332,6 +362,13 @@ def _xds_test_server_for_pod(
self.pods_to_servers[pod.metadata.name] = server
return server

@property
def should_collect_logs_prometheus(self):
    """Combined flag: CSM Observability enabled and log collection on.

    ``should_collect_logs`` is only consulted when the deployment args
    enable CSM Observability.
    """
    csm_enabled = self.deployment_args.enable_csm_observability
    return csm_enabled and self.should_collect_logs

@override
def stop_pod_dependencies(self, *, log_drain_sec: int = 0):
# Call pre-stop hook release if exists.
Expand Down Expand Up @@ -397,6 +434,11 @@ def cleanup(self, *, force=False, force_namespace=False):
)
self._delete_service_account(self.service_account_name)
self.service_account = None
# Pod monitoring name is only set when CSM observability is enabled.
if self.pod_monitoring_name and (self.pod_monitoring or force):
self._delete_pod_monitoring(self.pod_monitoring_name)
self.pod_monitoring = None
self.pod_monitoring_name = None
self._cleanup_namespace(force=(force_namespace and force))
finally:
self._stop()
Expand Down
Loading
Loading