From ad3d5caa39942aaf5711ac982d57fadffe655106 Mon Sep 17 00:00:00 2001 From: Sourabh Singh Date: Mon, 25 Nov 2024 11:02:22 +0000 Subject: [PATCH] cloudrun server test --- bin/lib/common.py | 19 ++- bin/run_test_server_c6n.py | 66 ++++++++ framework/infrastructure/c6n.py | 93 +++++++++++ framework/infrastructure/gcp/compute.py | 87 ++++++++++- .../infrastructure/gcp/network_services.py | 42 +++-- framework/infrastructure/traffic_director.py | 127 ++++++++++++++- .../test_app/runners/c6n/c6n_base_runner.py | 146 ++++++++++++++++++ .../runners/c6n/c6n_xds_server_runner.py | 106 +++++++++++++ framework/xds_c6n_testcase.py | 0 framework/xds_flags.py | 6 + framework/xds_k8s_flags.py | 12 ++ framework/xds_k8s_testcase.py | 102 ++++++++++++ .../client.cloudrun.deployment.yaml | 72 +++++++++ kubernetes-manifests/client.deployment.yaml | 2 +- tests/baseline_test_c6n_server.py | 65 ++++++++ tests/bootstrap_generator_test.py | 6 +- tests/url_map/affinity_test.py | 12 +- 17 files changed, 932 insertions(+), 31 deletions(-) create mode 100644 bin/run_test_server_c6n.py create mode 100644 framework/infrastructure/c6n.py create mode 100644 framework/test_app/runners/c6n/c6n_base_runner.py create mode 100644 framework/test_app/runners/c6n/c6n_xds_server_runner.py create mode 100644 framework/xds_c6n_testcase.py create mode 100644 kubernetes-manifests/client.cloudrun.deployment.yaml create mode 100644 tests/baseline_test_c6n_server.py diff --git a/bin/lib/common.py b/bin/lib/common.py index ebe6681e..6f755b5d 100644 --- a/bin/lib/common.py +++ b/bin/lib/common.py @@ -22,10 +22,12 @@ from framework import xds_flags from framework import xds_k8s_flags +from framework.infrastructure import c6n from framework.infrastructure import gcp from framework.infrastructure import k8s from framework.test_app import client_app from framework.test_app import server_app +from framework.test_app.runners.c6n import c6n_xds_server_runner from framework.test_app.runners.k8s import gamma_server_runner from framework.test_app.runners.k8s import k8s_xds_client_runner from framework.test_app.runners.k8s import k8s_xds_server_runner @@ -45,6 +47,7 @@ KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner GammaServerRunner = gamma_server_runner.GammaServerRunner +CloudRunServerRunner=c6n_xds_server_runner.CloudRunServerRunner _XdsTestServer = server_app.XdsTestServer _XdsTestClient = client_app.XdsTestClient @@ -59,6 +62,10 @@ def gcp_api_manager(): return gcp.api.GcpApiManager() +@functools.cache +def c6n_api_manager(): + return c6n.CloudRunApiManager() + def td_attrs(): return dict( gcp_api_manager=gcp_api_manager(), @@ -117,7 +124,6 @@ def make_server_namespace( ) return k8s.KubernetesNamespace(k8s_api_manager(), namespace_name) - def make_server_runner( namespace: k8s.KubernetesNamespace, *, @@ -155,6 +161,17 @@ def make_server_runner( return server_runner(namespace, **runner_kwargs) +def make_c6n_server_runner() -> CloudRunServerRunner: + # CloudRunServerRunner arguments. + runner_kwargs = dict( + project = xds_flags.PROJECT.value, + service_name = xds_flags.SERVER_NAME.value, + image_name = xds_k8s_flags.SERVER_IMAGE.value, + network = xds_flags.NETWORK.value, + region = xds_flags.REGION.value, + ) + server_runner = CloudRunServerRunner + return server_runner(**runner_kwargs) def _ensure_atexit(signum, frame): """Needed to handle signals or atexit handler won't be called.""" diff --git a/bin/run_test_server_c6n.py b/bin/run_test_server_c6n.py new file mode 100644 index 00000000..91d4243b --- /dev/null +++ b/bin/run_test_server_c6n.py @@ -0,0 +1,66 @@ +# Copyright 2020 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Run test xds server. + +Typical usage examples: + + # Help. + ./run.sh ./bin/run_test_server_c6n.py --help + + # Run modes. + ./run.sh ./bin/run_test_server_c6n.py --mode=app_net + ./run.sh ./bin/run_test_server_c6n.py --mode=secure + + # Gamma run mode: uses HTTPRoute by default. + ./run.sh ./bin/run_test_server_c6n.py --mode=gamma + + # Gamma run mode: use GRPCRoute. + ./run.sh ./bin/run_test_server_c6n.py --mode=gamma --gamma_route_kind=grpc + + # Running multipler server replicas. + ./run.sh ./bin/run_test_server_c6n.py --server_replica_count=3 + + # Cleanup: make sure to set the same mode used to create. + ./run.sh ./bin/run_test_server_c6n.py --mode=gamma --cmd=cleanup +""" +import logging + +from absl import app +from absl import flags + +from bin.lib import common +from framework import xds_flags +from framework import xds_k8s_flags + +logger = logging.getLogger(__name__) + +flags.adopt_module_key_flags(xds_flags) +flags.adopt_module_key_flags(xds_k8s_flags) +flags.adopt_module_key_flags(common) + + +def main(argv): + if len(argv) > 1: + raise app.UsageError("Too many command-line arguments.") + + xds_flags.set_socket_default_timeout_from_flag() + + run_kwargs = dict() + server_runner = common.make_c6n_server_runner() + server_runner.run(**run_kwargs) + + +if __name__ == "__main__": + app.run(main) diff --git a/framework/infrastructure/c6n.py b/framework/infrastructure/c6n.py new file mode 100644 index 00000000..cd063cdb --- /dev/null +++ b/framework/infrastructure/c6n.py @@ -0,0 +1,93 @@ +# Copyright 2024 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from google.cloud import run_v2 + +logger = logging.getLogger(__name__) + + +class CloudRunApiManager: + project: str + region: str + _parent: str + _client: run_v2.ServicesClient + _service: run_v2.Service + + def __init__(self, project: str, region: str): + if not project: + raise ValueError("Project ID cannot be empty or None.") + if not region: + raise ValueError("Region cannot be empty or None.") + + self.project = project + self.region = region + client_options = {"api_endpoint": f"{self.region}-run.googleapis.com"} + self._client = run_v2.ServicesClient(client_options=client_options) + self._parent = f"projects/{self.project}/locations/{self.region}" + self._service = None + + def deploy_service(self, service_name: str, image_name: str): + if not service_name: + raise ValueError("service_name cannot be empty or None") + if not image_name: + raise ValueError("image_name cannot be empty or None") + service_name = service_name[:49] + + service = run_v2.Service( + template=run_v2.RevisionTemplate( + containers=[ + run_v2.Container( + image=image_name, + ports=[ + run_v2.ContainerPort( + name="http1", + container_port=50051, + ), + ], + ) + ] + ), + ) + + request = run_v2.CreateServiceRequest( + parent=self._parent, service=service, service_id=service_name + ) + + try: + operation = self._client.create_service(request=request) + self._service = operation.result(timeout=300) + logger.info("Deployed service: %s", self._service.uri) + return self._service.uri + except Exception as e: + logger.exception("Error deploying service: %s", e) + raise + + def get_service_url(self): + if self._service is None: + raise RuntimeError("Cloud Run service not deployed yet.") + return self._service.uri + + def delete_service(self, service_name: str): + try: + request = run_v2.DeleteServiceRequest( + name=f"{self._parent}/services/{service_name}" + ) + operation = self._client.delete_service(request=request) + operation.result(timeout=300) + logger.info("Deleted service: %s", service_name) + except Exception as e: + logger.exception("Error deleting service: %s", e) + raise diff --git a/framework/infrastructure/gcp/compute.py b/framework/infrastructure/gcp/compute.py index d1f4f96c..643a81a9 100644 --- a/framework/infrastructure/gcp/compute.py +++ b/framework/infrastructure/gcp/compute.py @@ -158,7 +158,7 @@ def create_backend_service_traffic_director( body = { "name": name, "loadBalancingScheme": "INTERNAL_SELF_MANAGED", # Traffic Director - "healthChecks": [health_check.url], + # "healthChecks": [health_check.url], "protocol": protocol.name, } # If add dualstack support is specified True, config the backend service @@ -210,8 +210,8 @@ def backend_service_patch_backends( backend_list = [ { "group": backend.url, - "balancingMode": "RATE", - "maxRatePerEndpoint": max_rate_per_endpoint, + # "balancingMode": "CONNECTION", + # "maxRatePerEndpoint": max_rate_per_endpoint, } for backend in backends ] @@ -558,6 +558,85 @@ def get_backend_service_backend_health(self, backend_service, backend): .execute() ) + def create_serverless_neg( + self, name: str, region: str, service_name: str, network: str + ): + """Creates a serverless NEG. + + Args: + name: The name of the NEG. + region: The region in which to create the NEG. + service_name: The name of the Cloud Run service. Format: "namespaces/{namespace}/services/{service}" + network: The network of the NEG. Format: "projects/{project}/global/networks/{network}" + + Returns: + The NEG selfLink URL + """ + name = name + "-neg" + neg_body = { + "name": name, + "networkEndpointType": "SERVERLESS", + "cloudRun": {"service": service_name}, + } + + try: + logger.info("Creating serverless NEG %s in %s", name, region) + operation = ( + self.api.regionNetworkEndpointGroups() + .insert(project=self.project, region=region, body=neg_body) + .execute() + ) + neg = self.get_serverless_network_endpoint_group(name, region) + print(neg) + return neg + + except Exception as e: + logger.exception("Error creating serverless NEG: %s", e) + raise + + def delete_serverless_neg(self, name: str, zone: str): + """Deletes a serverless NEG. + + Args: + name: The name of the NEG to delete. + zone: The zone of the NEG. + """ + try: + logger.info("Deleting serverless NEG %s in %s", name, zone) + operation = ( + self.api.networkEndpointGroups() + .delete( + project=self.project, zone=zone, networkEndpointGroup=name + ) + .execute() + ) + self._wait( + operation["name"], self._WAIT_FOR_OPERATION_SEC + ) # Wait for operation completion + + except googleapiclient.errors.HttpError as error: + if error.resp.status == 404: # NEG not found + logger.debug( + "NEG %s not found in zone %s. Skipping deletion.", + name, + zone, + ) + return + logger.exception("Error deleting serverless NEG: %s", error) + raise + except Exception as e: + logger.exception("Error deleting serverless NEG: %s", e) + raise + + def get_serverless_network_endpoint_group(self, name, region): + neg = ( + self.api.regionNetworkEndpointGroups() + .get(project=self.project, networkEndpointGroup=name, region=region) + .execute() + ) + # TODO(sergiitk): dataclass + return neg + def _get_resource( self, collection: discovery.Resource, **kwargs ) -> "GcpResource": @@ -651,6 +730,8 @@ def _execute( # pylint: disable=arguments-differ ) request.headers[DEBUG_HEADER_KEY] = self.gfe_debug_header request.add_response_callback(self._log_debug_header) + logger.info("Executing request: %s", request) + logger.info(request.to_json()) operation = request.execute(num_retries=self._GCP_API_RETRIES) logger.debug("Operation %s", operation) return self._wait(operation["name"], timeout_sec) diff --git a/framework/infrastructure/gcp/network_services.py b/framework/infrastructure/gcp/network_services.py index 86d4a97a..09132536 100644 --- a/framework/infrastructure/gcp/network_services.py +++ b/framework/infrastructure/gcp/network_services.py @@ -112,14 +112,19 @@ class RouteMatch: @classmethod def from_response(cls, d: Dict[str, Any]) -> "GrpcRoute.RouteMatch": return cls( - method=GrpcRoute.MethodMatch.from_response(d["method"]) - if "method" in d - else None, - headers=tuple( - GrpcRoute.HeaderMatch.from_response(h) for h in d["headers"] - ) - if "headers" in d - else (), + method=( + GrpcRoute.MethodMatch.from_response(d["method"]) + if "method" in d + else None + ), + headers=( + tuple( + GrpcRoute.HeaderMatch.from_response(h) + for h in d["headers"] + ) + if "headers" in d + else () + ), ) @dataclasses.dataclass(frozen=True) @@ -224,14 +229,19 @@ class RouteMatch: @classmethod def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.RouteMatch": return cls( - method=HttpRoute.MethodMatch.from_response(d["method"]) - if "method" in d - else None, - headers=tuple( - HttpRoute.HeaderMatch.from_response(h) for h in d["headers"] - ) - if "headers" in d - else (), + method=( + HttpRoute.MethodMatch.from_response(d["method"]) + if "method" in d + else None + ), + headers=( + tuple( + HttpRoute.HeaderMatch.from_response(h) + for h in d["headers"] + ) + if "headers" in d + else () + ), ) @dataclasses.dataclass(frozen=True) diff --git a/framework/infrastructure/traffic_director.py b/framework/infrastructure/traffic_director.py index 0004ad9d..74b76bcb 100644 --- a/framework/infrastructure/traffic_director.py +++ b/framework/infrastructure/traffic_director.py @@ -14,7 +14,8 @@ import functools import logging import random -from typing import Any, Dict, Final, List, Optional +import re +from typing import Any, Dict, Final, List, Optional, Sequence import googleapiclient.errors from typing_extensions import TypeAlias @@ -263,6 +264,8 @@ def create_backend_service( ) self.backend_service = resource self.backend_service_protocol = protocol + logger.info("backend service protocal") + logger.info(self.backend_service_protocol) def load_backend_service(self): name = self.make_resource_name(self.BACKEND_SERVICE_NAME) @@ -1282,3 +1285,125 @@ def _get_certificate_provider(cls): "pluginInstance": cls.CERTIFICATE_PROVIDER_INSTANCE, }, } + + +class TrafficDirectorCloudRunManager(TrafficDirectorAppNetManager): + MESH_NAME = "grpc-mesh" + SERVER_TLS_POLICY_NAME = "server-tls-policy" + CLIENT_TLS_POLICY_NAME = "client-tls-policy" + AUTHZ_POLICY_NAME = "authz-policy" + ENDPOINT_POLICY = "endpoint-policy" + CERTIFICATE_PROVIDER_INSTANCE = "google_cloud_private_spiffe" + + netsec: _NetworkSecurityV1Beta1 + netsvc: _NetworkServicesV1Beta1 + + def __init__( + self, + gcp_api_manager: gcp.api.GcpApiManager, + project: str, + *, + resource_prefix: str, + resource_suffix: Optional[str] = None, + network: str = "default", + compute_api_version: str = "v1", + enable_dualstack: bool = False, + ): + super().__init__( + gcp_api_manager, + project, + resource_prefix=resource_prefix, + resource_suffix=resource_suffix, + network=network, + compute_api_version=compute_api_version, + enable_dualstack=enable_dualstack, + ) + + # API + self.netsvc = gcp.network_services.NetworkServicesV1( + gcp_api_manager, project + ) + + # Managed resources + # TODO(gnossen) PTAL at the pylint error + self.grpc_route: Optional[GrpcRoute] = None + self.http_route: Optional[HttpRoute] = None + self.mesh: Optional[Mesh] = None + + # Managed resources + self.server_tls_policy: Optional[ServerTlsPolicy] = None + self.client_tls_policy: Optional[ClientTlsPolicy] = None + self.authz_policy: Optional[AuthorizationPolicy] = None + self.endpoint_policy: Optional[EndpointPolicy] = None + + def backend_service_add_backends( + self, + backends: Sequence[str], + region: Optional[str] = None, + balancing_mode: str = "CONNECTION", + max_rate_per_endpoint: Optional[int] = None, + capacity_scaler: float = 1.0, + *, + circuit_breakers: Optional[dict[str, int]] = None, + ): + + new_backends = [] + for backend in backends: + new_backend = { + "group": backend, + "balancingMode": balancing_mode, + "maxRatePerEndpoint": max_rate_per_endpoint, + "capacityScaler": capacity_scaler, + } + + if circuit_breakers is not None: + new_backend["circuitBreakers"] = circuit_breakers + + new_backends.append(new_backend) + + # self.backends.update(new_backends) + backend_service = self.backend_service + + logging.info( + "Adding backends to Backend Service %s: %r", + backend_service.name, + new_backends, + ) + + self.compute.backend_service_patch_backends( + backend_service, + backends, + ) + + def create_grpc_route(self, src_host: str, src_port: int): + hostname = src_host + route_name = f"{self.resource_prefix}-grpc-route-{self.resource_suffix}" + logger.info("Creating gRPC route: %s", route_name) + + try: + backend_service_url = self.backend_service.url.split("v1/")[-1] + # URL of the backend service + route_body = { + "name": route_name, + "hostnames": [hostname], + "rules": [ + { + "matches": [], # Empty matches means any rpc + "action": { + "destinations": [ + {"serviceName": backend_service_url} + ] + }, + } + ], + } + if hasattr(self, "mesh") and self.mesh: + route_body["meshes"] = self.mesh.url + + resource = self.netsvc.create_grpc_route(route_name, route_body) + self.grpc_route = self.netsvc.get_grpc_route(route_name) + logger.info("gRPC Route created successfully: %s", self.grpc_route) + + except Exception as e: # Catching generic exceptions for now + logger.exception("Error creating gRPC route: %s", e) + raise diff --git a/framework/test_app/runners/c6n/c6n_base_runner.py b/framework/test_app/runners/c6n/c6n_base_runner.py new file mode 100644 index 00000000..95bb2cc2 --- /dev/null +++ b/framework/test_app/runners/c6n/c6n_base_runner.py @@ -0,0 +1,146 @@ +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Common functionality for running xDS Test Client and Server on CloudRun. +""" +from abc import ABCMeta +import collections +import dataclasses +import datetime as dt +import logging +from typing import Optional + +import framework +from framework.infrastructure import c6n +from framework.test_app.runners import base_runner + +logger = logging.getLogger(__name__) + +_RunnerError = base_runner.RunnerError +_HighlighterYaml = framework.helpers.highlighter.HighlighterYaml +_helper_datetime = framework.helpers.datetime +_datetime = dt.datetime +_timedelta = dt.timedelta + + +@dataclasses.dataclass(frozen=True) +class RunHistory: + revision_id: str + time_start_requested: _datetime + time_start_completed: Optional[_datetime] + time_stopped: _datetime + + +class CloudRunBaseRunner(base_runner.BaseRunner, metaclass=ABCMeta): + """Runs xDS Test Client/Server on Cloud Run.""" + + project: str + service_name: str + image_name: str + network: Optional[str] = None + tag: str = "latest" + region: Optional[str] = None + _current_revision: Optional[str] = None + gcp_project: Optional[str] = None + gcp_ui_url: Optional[str] = None + + run_history: collections.deque[RunHistory] + + time_start_requested: Optional[dt.datetime] = None + time_start_completed: Optional[dt.datetime] = None + time_stopped: Optional[dt.datetime] = None + + def __init__( + self, + project: str, + service_name: str, + image_name: str, + network: Optional[str] = None, + region: Optional[str] = None, + ) -> None: + super().__init__() + + self.project = project + self.service_name = service_name + self.image_name = image_name + self.network = network + self.region = region + self.current_revision = None + self.gcp_project = None + self.gcp_ui_url = None + self.run_history = collections.deque() + + # Persistent across many runs. + self.run_history = collections.deque() + + # Mutable state associated with each run. + self._reset_state() + + # Highlighter. + self._highlighter = _HighlighterYaml() + + self._initalize_cloudrun_api_manager() + + def _initalize_cloudrun_api_manager(self): + """Initializes the CloudRunApiManager.""" + self.cloudrun_api_manager = c6n.CloudRunApiManager( + project=self.project, region=self.region + ) + + def run(self, **kwargs): + if self.time_start_requested and not self.time_stopped: + if self.time_start_completed: + raise RuntimeError( + f"Service {self.service_name}: has already been started " + f"at {self.time_start_completed.isoformat()}" + ) + raise RuntimeError( + f"Service {self.service_name}: start has already been " + f"requested at {self.time_start_requested.isoformat()}" + ) + + self._reset_state() + self.time_start_requested = dt.datetime.now(tz=dt.timezone.utc) + self._current_revision = self.cloudrun_api_manager.deploy_service( + self.service_name, self.image_name + ) + + def _start_completed(self): + self.time_start_completed = dt.datetime.now(tz=dt.timezone.utc) + + def _stop(self): + self.time_stopped = dt.datetime.now(tz=dt.timezone.utc) + if self.time_start_requested: + run_history = RunHistory( + revision_id=self._current_revision, + time_start_requested=self.time_start_requested, + time_start_completed=self.time_start_completed, + time_stopped=self.time_stopped, + ) + self.run_history.append(run_history) + + @classmethod + def _get_workload_identity_member_name( + cls, project, namespace_name, service_account_name + ): + """ + Returns workload identity member name used to authenticate Kubernetes + service accounts. + + https://cloud.google.com/kubernetes-engine/docs/how-to/workload-identity + """ + return ( + f"serviceAccount:{project}.svc.id.goog" + f"[{namespace_name}/{service_account_name}]" + ) diff --git a/framework/test_app/runners/c6n/c6n_xds_server_runner.py b/framework/test_app/runners/c6n/c6n_xds_server_runner.py new file mode 100644 index 00000000..5fece4ce --- /dev/null +++ b/framework/test_app/runners/c6n/c6n_xds_server_runner.py @@ -0,0 +1,106 @@ +# Copyright 2022 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Run xDS Test Client on Kubernetes. +""" +import dataclasses +import logging +from typing import List, Optional +import uuid + +from absl import flags +from absl import logging +from typing_extensions import override + +from framework import xds_flags +from framework import xds_k8s_flags +from framework.infrastructure import c6n +from framework.test_app.runners.c6n import c6n_base_runner +from framework.test_app.server_app import XdsTestServer + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass(frozen=True) +class CloudRunDeploymentArgs: + """Arguments for deploying a server to Cloud Run.""" + + env_vars: dict = dataclasses.field(default_factory=dict) + max_instances: int = 10 # Example: Maximum number of instances + min_instances: int = 0 # Example: Minimum number of instances + service_account_email: str = "" # Email address of the service account + timeout_seconds: int = 300 # Timeout for requests + revision_suffix: Optional[str] = None + + def as_dict(self): + return { + "env_vars": self.env_vars, + "max_instances": self.max_instances, + "min_instances": self.min_instances, + "service_account_email": self.service_account_email, + "timeout_seconds": self.timeout_seconds, + } + + +class CloudRunServerRunner(c6n_base_runner.CloudRunBaseRunner): + """Manages xDS Test Servers running on Cloud Run.""" + + def __init__( + self, + project: str, + service_name: str, + image_name: str, + network: str, + region: str, + ): + super().__init__( + project, + service_name, + image_name, + network=network, + region=region, + ) + # Mutable state associated with each run. + self._reset_state() + + @override + def _reset_state(self): + super()._reset_state() + self.service = None + self.pods_to_servers = {} + self.replica_count = 0 + + @override + def run(self, **kwargs) -> List[XdsTestServer]: + """Deploys and manages the xDS Test Server on Cloud Run.""" + logger.info(self.service_name) + logger.info(self.image_name) + deployed_service_url = self.cloudrun_api_manager.deploy_service( + service_name=self.service_name, + image_name=self.image_name, + ) + servers = [ + XdsTestServer( + ip="0.0.0.0", rpc_port=0, hostname=deployed_service_url + ) + ] + self.servers = servers # Add servers to the list + return servers + + def get_service_url(self): + return self.cloudrun_api_manager.get_service_url() + + @override + def cleanup(self, *, force=False): + return super().cleanup(force=force) diff --git a/framework/xds_c6n_testcase.py b/framework/xds_c6n_testcase.py new file mode 100644 index 00000000..e69de29b diff --git a/framework/xds_flags.py b/framework/xds_flags.py index 3960b27f..4656593d 100644 --- a/framework/xds_flags.py +++ b/framework/xds_flags.py @@ -196,6 +196,12 @@ help="Enable support for Dual Stack resources to the framework.", ) +REGION = flags.DEFINE_string( + "region", + default="us-central1", + help="The region for deployment", +) + def set_socket_default_timeout_from_flag() -> None: """A helper to configure default socket timeout from a flag. diff --git a/framework/xds_k8s_flags.py b/framework/xds_k8s_flags.py index 8070642b..ef729c37 100644 --- a/framework/xds_k8s_flags.py +++ b/framework/xds_k8s_flags.py @@ -33,6 +33,18 @@ help="Traffic Director gRPC Bootstrap Docker image", ) +CLOUDRUN_SERVER_IMAGE = flags.DEFINE_string( + "cloudrun_server_image", + default=None, + help="Cloudrun Server Docker image name", +) + +CLOUDRUN_CLIENT_IMAGE = flags.DEFINE_string( + "cloudrun_client_image", + default=None, + help="Cloudrun Client Docker image name", +) + # Test app SERVER_IMAGE = flags.DEFINE_string( "server_image", default=None, help="Server Docker image name" diff --git a/framework/xds_k8s_testcase.py b/framework/xds_k8s_testcase.py index c302af89..0a9688c4 100644 --- a/framework/xds_k8s_testcase.py +++ b/framework/xds_k8s_testcase.py @@ -39,11 +39,13 @@ from framework.infrastructure import gcp from framework.infrastructure import k8s from framework.infrastructure import traffic_director +from framework.infrastructure.gcp.compute import ComputeV1 from framework.rpc import grpc_channelz from framework.rpc import grpc_csds from framework.rpc import grpc_testing from framework.test_app import client_app from framework.test_app import server_app +from framework.test_app.runners.c6n import c6n_xds_server_runner from framework.test_app.runners.k8s import k8s_xds_client_runner from framework.test_app.runners.k8s import k8s_xds_server_runner from framework.test_cases import base_testcase @@ -63,6 +65,8 @@ TrafficDirectorManager = traffic_director.TrafficDirectorManager TrafficDirectorAppNetManager = traffic_director.TrafficDirectorAppNetManager TrafficDirectorSecureManager = traffic_director.TrafficDirectorSecureManager +CloudRunServerRunner = c6n_xds_server_runner.CloudRunServerRunner +TrafficDirectorCloudRunManager = traffic_director.TrafficDirectorCloudRunManager XdsTestServer = server_app.XdsTestServer XdsTestClient = client_app.XdsTestClient ClientDeploymentArgs = k8s_xds_client_runner.ClientDeploymentArgs @@ -198,6 +202,7 @@ def setUpClass(cls): cls.resource_suffix = xds_flags.RESOURCE_SUFFIX.value # Test server + cls.cloudrun_server_image = xds_k8s_flags.CLOUDRUN_SERVER_IMAGE.value cls.server_image = xds_k8s_flags.SERVER_IMAGE.value cls.server_name = xds_flags.SERVER_NAME.value cls.server_port = xds_flags.SERVER_PORT.value @@ -206,6 +211,7 @@ def setUpClass(cls): cls.server_xds_port = xds_flags.SERVER_XDS_PORT.value # Test client + cls.cloudrun_client_image = xds_k8s_flags.CLOUDRUN_CLIENT_IMAGE.value cls.client_image = xds_k8s_flags.CLIENT_IMAGE.value cls.client_name = xds_flags.CLIENT_NAME.value cls.client_port = xds_flags.CLIENT_PORT.value @@ -1380,3 +1386,99 @@ def debug_cert(cert): return "missing" sha1 = hashlib.sha1(cert) return f"sha1={sha1.hexdigest()}, len={len(cert)}" + + +class CloudRunXdsKubernetesTestCase(RegularXdsKubernetesTestCase): + server_runner: CloudRunServerRunner + td: TrafficDirectorCloudRunManager + region: str + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.compute = cls.gcp_api_manager.compute(cls.compute_api_version) + cls.compute_v1 = ComputeV1( + cls.gcp_api_manager, cls.project, version=cls.compute_api_version + ) + cls.region = xds_flags.REGION.value + + def initTrafficDirectorManager(self) -> TrafficDirectorManager: + return TrafficDirectorCloudRunManager( + self.gcp_api_manager, + project=self.project, + resource_prefix=self.resource_prefix, + resource_suffix=self.resource_suffix, + network=self.network, + compute_api_version=self.compute_api_version, + enable_dualstack=self.enable_dualstack, + ) + + def initKubernetesClientRunner(self, **kwargs) -> KubernetesClientRunner: + return KubernetesClientRunner( + k8s.KubernetesNamespace( + self.k8s_api_manager, self.client_namespace + ), + deployment_name=self.client_name, + deployment_template="client.cloudrun.deployment.yaml", + image_name=self.cloudrun_client_image, + td_bootstrap_image=self.td_bootstrap_image, + gcp_project=self.project, + gcp_api_manager=self.gcp_api_manager, + gcp_service_account=self.gcp_service_account, + xds_server_uri=self.xds_server_uri, + network=self.network, + debug_use_port_forwarding=self.debug_use_port_forwarding, + enable_workload_identity=self.enable_workload_identity, + stats_port=self.client_port, + reuse_namespace=self.server_namespace == self.client_namespace, + **kwargs, + ) + + def setupServerBackends( + self, + *, + wait_for_healthy_status=True, + server_runner=None, + max_rate_per_endpoint: Optional[int] = None, + ): + if server_runner is None: + server_runner = self.server_runner + service_url = server_runner.get_service_url() + self.td.backend_service_add_backends([service_url]) + + def startTestServers( + self, server_runner=None, **kwargs + ) -> List[XdsTestServer]: + logger.info(self.cloudrun_server_image) + if server_runner is None: + self.server_runner = CloudRunServerRunner( + project=self.project, + service_name=self.server_namespace, + image_name=self.cloudrun_server_image, + network=self.network, + region=self.region, + ) + test_servers = self.server_runner.run() + for test_server in test_servers: + test_server.set_xds_address( + self.server_xds_host, self.server_xds_port + ) + return test_servers + + def startTestClient( + self, test_server: XdsTestServer, **kwargs + ) -> XdsTestClient: + return self._start_test_client(test_server.xds_uri, **kwargs) + + def backend_service_add_serverless_neg_backends(self): + logger.info("Creating serverless NEG") + neg = self.compute_v1.create_serverless_neg( + self.server_namespace, + self.region, + self.server_namespace, + "serverless", + ) + return neg + + def tearDown(self): + logger.info("----- TestMethod %s teardown -----", self.test_name) diff --git a/kubernetes-manifests/client.cloudrun.deployment.yaml b/kubernetes-manifests/client.cloudrun.deployment.yaml new file mode 100644 index 00000000..87a28c95 --- /dev/null +++ b/kubernetes-manifests/client.cloudrun.deployment.yaml @@ -0,0 +1,72 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: psm-grpc-client + labels: + app: psm-grpc-client +spec: + replicas: 1 + selector: + matchLabels: + app: psm-grpc-client + template: + metadata: + labels: + app: psm-grpc-client + spec: + containers: + - name: psm-grpc-client + image: docker.io/grpc/xds-example-cpp-client:v1.69.x + imagePullPolicy: Always + args: + - "--xds_creds" + - "--target=xds:///psm-grpc-server" + env: + - name: GRPC_TRACE + value: "http1" + - name: GRPC_XDS_BOOTSTRAP + value: "/tmp/grpc-xds/td-grpc-bootstrap.json" + - name: GRPC_EXPERIMENTAL_XDS_AUTHORITY_REWRITE + value: "true" + - name: GRPC_EXPERIMENTAL_XDS_SYSTEM_ROOT_CERTS + value: "true" + - name: GRPC_EXPERIMENTAL_XDS_GCP_AUTHENTICATION_FILTER + value: "true" + + - name: CSM_WORKLOAD_NAME + value: psm-grpc-client + - name: POD_NAME + valueFrom: + fieldRef: + fieldPath: metadata.name + - name: NAMESPACE_NAME + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: CONTAINER_NAME + value: psm-grpc-client + - name: OTEL_RESOURCE_ATTRIBUTES + value: k8s.pod.name=$(POD_NAME),k8s.namespace.name=$(NAMESPACE_NAME),k8s.container.name=$(CONTAINER_NAME) + volumeMounts: + - mountPath: /tmp/grpc-xds/ + name: grpc-td-conf + readOnly: true + initContainers: + - name: grpc-td-init + image: gcr.io/trafficdirector-prod/td-grpc-bootstrap:0.18.0 + imagePullPolicy: Always + args: + - "--output=/tmp/bootstrap/td-grpc-bootstrap.json" + - "--vpc-network-name=default" + - "--xds-server-uri=trafficdirector.googleapis.com:443" + - "--config-mesh=${config_mesh}" + - "--is-trusted-xds-server-experimental=true" + volumeMounts: + - mountPath: /tmp/bootstrap/ + name: grpc-td-conf + volumes: + - name: grpc-td-conf + emptyDir: + medium: Memory +... \ No newline at end of file diff --git a/kubernetes-manifests/client.deployment.yaml b/kubernetes-manifests/client.deployment.yaml index 09a18e43..67ef56d2 100644 --- a/kubernetes-manifests/client.deployment.yaml +++ b/kubernetes-manifests/client.deployment.yaml @@ -36,7 +36,7 @@ spec: ## driver waiting for the container to start. failureThreshold: 1000 args: - - "--server=${server_target}" + - "--server=psm-grpc-server:8080" - "--stats_port=${stats_port}" - "--qps=${qps}" - "--rpc=${rpc}" diff --git a/tests/baseline_test_c6n_server.py b/tests/baseline_test_c6n_server.py new file mode 100644 index 00000000..466101c5 --- /dev/null +++ b/tests/baseline_test_c6n_server.py @@ -0,0 +1,65 @@ +# Copyright 2025 gRPC authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from absl.testing import absltest + +from framework import xds_k8s_testcase + +logger = logging.getLogger(__name__) + +_XdsTestServer = xds_k8s_testcase.XdsTestServer +_XdsTestClient = xds_k8s_testcase.XdsTestClient + + +class BaselineTest(xds_k8s_testcase.CloudRunXdsKubernetesTestCase): + def test_traffic_director_grpc_setup(self): + with self.subTest("0_create_mesh"): + self.td.create_mesh() + + with self.subTest("1_start_cloudrun_test_server"): + test_server: _XdsTestServer = self.startTestServers()[0] + + with self.subTest("2_create_serverless_neg"): + neg = self.backend_service_add_serverless_neg_backends() + + with self.subTest("3_create_backend_service"): + self.td.create_backend_service( + protocol=self.compute_v1.BackendServiceProtocol.HTTP2 + ) + + with self.subTest("4_add_server_backends_to_backend_service"): + neg_resource = self.compute_v1.GcpResource( + name=neg["name"], + url=neg["selfLink"], + ) + self.td.backend_service_add_backends([neg_resource], self.region) + + with self.subTest("5_create_grpc_route"): + self.td.create_grpc_route(self.server_xds_host, self.server_port) + + with self.subTest("7_start_test_client"): + test_client: _XdsTestClient = self.startTestClient( + test_server, config_mesh=self.td.mesh.name + ) + + with self.subTest("8_test_client_xds_config_exists"): + self.assertXdsConfigExists(test_client) + + with self.subTest("9_test_server_received_rpcs_from_test_client"): + self.assertSuccessfulRpcs(test_client) + + +if __name__ == "__main__": + absltest.main(failfast=True) diff --git a/tests/bootstrap_generator_test.py b/tests/bootstrap_generator_test.py index 70fe4c4b..471dde8f 100644 --- a/tests/bootstrap_generator_test.py +++ b/tests/bootstrap_generator_test.py @@ -38,9 +38,9 @@ # Constants GCR_PROD: Final[str] = "gcr.io/trafficdirector-prod/td-grpc-bootstrap" -GCR_TESTING: Final[ - str -] = "us-docker.pkg.dev/grpc-testing/trafficdirector/td-grpc-bootstrap" +GCR_TESTING: Final[str] = ( + "us-docker.pkg.dev/grpc-testing/trafficdirector/td-grpc-bootstrap" +) # Returns a list of bootstrap generator versions to be tested along with their diff --git a/tests/url_map/affinity_test.py b/tests/url_map/affinity_test.py index 2bd93405..e3e2102c 100644 --- a/tests/url_map/affinity_test.py +++ b/tests/url_map/affinity_test.py @@ -101,9 +101,9 @@ def url_map_change( host_rule: HostRule, path_matcher: PathMatcher ) -> Tuple[HostRule, PathMatcher]: # Update default service to the affinity service. - path_matcher[ - "defaultService" - ] = GcpResourceManager().affinity_backend_service() + path_matcher["defaultService"] = ( + GcpResourceManager().affinity_backend_service() + ) return host_rule, path_matcher def xds_config_validate(self, xds_config: grpc_csds.DumpedXdsConfig): @@ -176,9 +176,9 @@ def url_map_change( host_rule: HostRule, path_matcher: PathMatcher ) -> Tuple[HostRule, PathMatcher]: # Update default service to the affinity service. - path_matcher[ - "defaultService" - ] = GcpResourceManager().affinity_backend_service() + path_matcher["defaultService"] = ( + GcpResourceManager().affinity_backend_service() + ) return host_rule, path_matcher def xds_config_validate(self, xds_config: grpc_csds.DumpedXdsConfig):