Skip to content

Commit

Permalink
cloudrun server test
Browse files Browse the repository at this point in the history
  • Loading branch information
sourabhsinghs committed Jan 28, 2025
1 parent a7ffef9 commit ad3d5ca
Show file tree
Hide file tree
Showing 17 changed files with 932 additions and 31 deletions.
19 changes: 18 additions & 1 deletion bin/lib/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@

from framework import xds_flags
from framework import xds_k8s_flags
from framework.infrastructure import c6n
from framework.infrastructure import gcp
from framework.infrastructure import k8s
from framework.test_app import client_app
from framework.test_app import server_app
from framework.test_app.runners.c6n import c6n_xds_server_runner
from framework.test_app.runners.k8s import gamma_server_runner
from framework.test_app.runners.k8s import k8s_xds_client_runner
from framework.test_app.runners.k8s import k8s_xds_server_runner
Expand All @@ -45,6 +47,7 @@
KubernetesClientRunner = k8s_xds_client_runner.KubernetesClientRunner
KubernetesServerRunner = k8s_xds_server_runner.KubernetesServerRunner
GammaServerRunner = gamma_server_runner.GammaServerRunner
CloudRunServerRunner=c6n_xds_server_runner.CloudRunServerRunner
_XdsTestServer = server_app.XdsTestServer
_XdsTestClient = client_app.XdsTestClient

Expand All @@ -59,6 +62,10 @@ def gcp_api_manager():
return gcp.api.GcpApiManager()


@functools.cache
def c6n_api_manager():
return c6n.CloudRunApiManager()

def td_attrs():
return dict(
gcp_api_manager=gcp_api_manager(),
Expand Down Expand Up @@ -117,7 +124,6 @@ def make_server_namespace(
)
return k8s.KubernetesNamespace(k8s_api_manager(), namespace_name)


def make_server_runner(
namespace: k8s.KubernetesNamespace,
*,
Expand Down Expand Up @@ -155,6 +161,17 @@ def make_server_runner(

return server_runner(namespace, **runner_kwargs)

def make_c6n_server_runner() -> CloudRunServerRunner:
# CloudRunServerRunner arguments.
runner_kwargs = dict(
project = xds_flags.PROJECT.value,
service_name = xds_flags.SERVER_NAME.value,
image_name = xds_k8s_flags.SERVER_IMAGE.value,
network = xds_flags.NETWORK.value,
region = xds_flags.REGION.value,
)
server_runner = CloudRunServerRunner
return server_runner(**runner_kwargs)

def _ensure_atexit(signum, frame):
"""Needed to handle signals or atexit handler won't be called."""
Expand Down
66 changes: 66 additions & 0 deletions bin/run_test_server_c6n.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Copyright 2020 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Run test xds server.
Typical usage examples:
# Help.
./run.sh ./bin/run_test_server_c6n.py --help
# Run modes.
./run.sh ./bin/run_test_server_c6n.py --mode=app_net
./run.sh ./bin/run_test_server_c6n.py --mode=secure
# Gamma run mode: uses HTTPRoute by default.
./run.sh ./bin/run_test_server_c6n.py --mode=gamma
# Gamma run mode: use GRPCRoute.
./run.sh ./bin/run_test_server_c6n.py --mode=gamma --gamma_route_kind=grpc
# Running multipler server replicas.
./run.sh ./bin/run_test_server_c6n.py --server_replica_count=3
# Cleanup: make sure to set the same mode used to create.
./run.sh ./bin/run_test_server_c6n.py --mode=gamma --cmd=cleanup
"""
import logging

from absl import app
from absl import flags

from bin.lib import common
from framework import xds_flags
from framework import xds_k8s_flags

logger = logging.getLogger(__name__)

flags.adopt_module_key_flags(xds_flags)
flags.adopt_module_key_flags(xds_k8s_flags)
flags.adopt_module_key_flags(common)


def main(argv):
if len(argv) > 1:
raise app.UsageError("Too many command-line arguments.")

xds_flags.set_socket_default_timeout_from_flag()

run_kwargs = dict()
server_runner = common.make_c6n_server_runner()
server_runner.run(**run_kwargs)


if __name__ == "__main__":
app.run(main)
93 changes: 93 additions & 0 deletions framework/infrastructure/c6n.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# Copyright 2024 gRPC authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging

from google.cloud import run_v2

logger = logging.getLogger(__name__)


class CloudRunApiManager:
project: str
region: str
_parent: str
_client: run_v2.ServicesClient
_service: run_v2.Service

def __init__(self, project: str, region: str):
if not project:
raise ValueError("Project ID cannot be empty or None.")
if not region:
raise ValueError("Region cannot be empty or None.")

self.project = project
self.region = region
client_options = {"api_endpoint": f"{self.region}-run.googleapis.com"}
self._client = run_v2.ServicesClient(client_options=client_options)
self._parent = f"projects/{self.project}/locations/{self.region}"
self._service = None

def deploy_service(self, service_name: str, image_name: str):
if not service_name:
raise ValueError("service_name cannot be empty or None")
if not image_name:
raise ValueError("image_name cannot be empty or None")
service_name = service_name[:49]

service = run_v2.Service(
template=run_v2.RevisionTemplate(
containers=[
run_v2.Container(
image=image_name,
ports=[
run_v2.ContainerPort(
name="http1",
container_port=50051,
),
],
)
]
),
)

request = run_v2.CreateServiceRequest(
parent=self._parent, service=service, service_id=service_name
)

try:
operation = self._client.create_service(request=request)
self._service = operation.result(timeout=300)
logger.info("Deployed service: %s", self._service.uri)
return self._service.uri
except Exception as e:
logger.exception("Error deploying service: %s", e)
raise

def get_service_url(self):
if self._service is None:
raise RuntimeError("Cloud Run service not deployed yet.")
return self._service.uri

def delete_service(self, service_name: str):
try:
request = run_v2.DeleteServiceRequest(
name=f"{self._parent}/services/{service_name}"
)
operation = self._client.delete_service(request=request)
operation.result(timeout=300)
logger.info("Deleted service: %s", service_name)
except Exception as e:
logger.exception("Error deleting service: %s", e)
raise
87 changes: 84 additions & 3 deletions framework/infrastructure/gcp/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ def create_backend_service_traffic_director(
body = {
"name": name,
"loadBalancingScheme": "INTERNAL_SELF_MANAGED", # Traffic Director
"healthChecks": [health_check.url],
# "healthChecks": [health_check.url],
"protocol": protocol.name,
}
# If add dualstack support is specified True, config the backend service
Expand Down Expand Up @@ -210,8 +210,8 @@ def backend_service_patch_backends(
backend_list = [
{
"group": backend.url,
"balancingMode": "RATE",
"maxRatePerEndpoint": max_rate_per_endpoint,
# "balancingMode": "CONNECTION",
# "maxRatePerEndpoint": max_rate_per_endpoint,
}
for backend in backends
]
Expand Down Expand Up @@ -558,6 +558,85 @@ def get_backend_service_backend_health(self, backend_service, backend):
.execute()
)

def create_serverless_neg(
self, name: str, region: str, service_name: str, network: str
):
"""Creates a serverless NEG.
Args:
name: The name of the NEG.
region: The region in which to create the NEG.
service_name: The name of the Cloud Run service. Format: "namespaces/{namespace}/services/{service}"
network: The network of the NEG. Format: "projects/{project}/global/networks/{network}"
Returns:
The NEG selfLink URL
"""
name = name + "-neg"
neg_body = {
"name": name,
"networkEndpointType": "SERVERLESS",
"cloudRun": {"service": service_name},
}

try:
logger.info("Creating serverless NEG %s in %s", name, region)
operation = (
self.api.regionNetworkEndpointGroups()
.insert(project=self.project, region=region, body=neg_body)
.execute()
)
neg = self.get_serverless_network_endpoint_group(name, region)
print(neg)
return neg

except Exception as e:
logger.exception("Error creating serverless NEG: %s", e)
raise

def delete_serverless_neg(self, name: str, zone: str):
"""Deletes a serverless NEG.
Args:
name: The name of the NEG to delete.
zone: The zone of the NEG.
"""
try:
logger.info("Deleting serverless NEG %s in %s", name, zone)
operation = (
self.api.networkEndpointGroups()
.delete(
project=self.project, zone=zone, networkEndpointGroup=name
)
.execute()
)
self._wait(
operation["name"], self._WAIT_FOR_OPERATION_SEC
) # Wait for operation completion

except googleapiclient.errors.HttpError as error:
if error.resp.status == 404: # NEG not found
logger.debug(
"NEG %s not found in zone %s. Skipping deletion.",
name,
zone,
)
return
logger.exception("Error deleting serverless NEG: %s", error)
raise
except Exception as e:
logger.exception("Error deleting serverless NEG: %s", e)
raise

def get_serverless_network_endpoint_group(self, name, region):
neg = (
self.api.regionNetworkEndpointGroups()
.get(project=self.project, networkEndpointGroup=name, region=region)
.execute()
)
# TODO(sergiitk): dataclass
return neg

def _get_resource(
self, collection: discovery.Resource, **kwargs
) -> "GcpResource":
Expand Down Expand Up @@ -651,6 +730,8 @@ def _execute( # pylint: disable=arguments-differ
)
request.headers[DEBUG_HEADER_KEY] = self.gfe_debug_header
request.add_response_callback(self._log_debug_header)
logger.info("Executing request: %s", request)
logger.info(request.to_json())
operation = request.execute(num_retries=self._GCP_API_RETRIES)
logger.debug("Operation %s", operation)
return self._wait(operation["name"], timeout_sec)
Expand Down
42 changes: 26 additions & 16 deletions framework/infrastructure/gcp/network_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,19 @@ class RouteMatch:
@classmethod
def from_response(cls, d: Dict[str, Any]) -> "GrpcRoute.RouteMatch":
return cls(
method=GrpcRoute.MethodMatch.from_response(d["method"])
if "method" in d
else None,
headers=tuple(
GrpcRoute.HeaderMatch.from_response(h) for h in d["headers"]
)
if "headers" in d
else (),
method=(
GrpcRoute.MethodMatch.from_response(d["method"])
if "method" in d
else None
),
headers=(
tuple(
GrpcRoute.HeaderMatch.from_response(h)
for h in d["headers"]
)
if "headers" in d
else ()
),
)

@dataclasses.dataclass(frozen=True)
Expand Down Expand Up @@ -224,14 +229,19 @@ class RouteMatch:
@classmethod
def from_response(cls, d: Dict[str, Any]) -> "HttpRoute.RouteMatch":
return cls(
method=HttpRoute.MethodMatch.from_response(d["method"])
if "method" in d
else None,
headers=tuple(
HttpRoute.HeaderMatch.from_response(h) for h in d["headers"]
)
if "headers" in d
else (),
method=(
HttpRoute.MethodMatch.from_response(d["method"])
if "method" in d
else None
),
headers=(
tuple(
HttpRoute.HeaderMatch.from_response(h)
for h in d["headers"]
)
if "headers" in d
else ()
),
)

@dataclasses.dataclass(frozen=True)
Expand Down
Loading

0 comments on commit ad3d5ca

Please sign in to comment.