diff --git a/docker/go-control-plane/controlplane/server.go b/docker/go-control-plane/controlplane/server.go deleted file mode 100644 index 3c3cb029..00000000 --- a/docker/go-control-plane/controlplane/server.go +++ /dev/null @@ -1,81 +0,0 @@ -// Copyright 2020 Envoyproxy Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package controlplane - -import ( - "context" - "fmt" - "log" - "net" - "time" - - "google.golang.org/grpc" - channelz "google.golang.org/grpc/channelz/service" - "google.golang.org/grpc/reflection" - - cs "github.com/eugeneo/fallback-control-plane/grpc/interop/grpc_testing/xdsconfig" - - pb_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/server/v3" - "github.com/envoyproxy/go-control-plane/pkg/test/v3" -) - -const ( - grpcKeepaliveTime = 30 * time.Second - grpcKeepaliveTimeout = 5 * time.Second - grpcKeepaliveMinTime = 30 * time.Second - grpcMaxConcurrentStreams = 5 -) - -// Server serves xDS traffic. -type Server struct { - xdsserver server.Server -} - -// NewServer creates a new instance of the Server struct. -func NewServer(ctx context.Context, cache cache.Cache, cb *test.Callbacks) *Server { - srv := server.NewServer(ctx, cache, cb) - return &Server{srv} -} - -// Registers gRPC services needed to serve xDS traffic. -func registerServer(grpcServer *grpc.Server, server server.Server) { - // register services - pb_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, server) -} - -// RunServer starts an xDS server at the given port. Blocks while the server is -// running -func RunServer(srv server.Server, controlService cs.XdsConfigControlServiceServer, port uint) error { - // gRPC golang library sets a very small upper bound for the number gRPC/h2 - // streams over a single TCP connection. If a proxy multiplexes requests over - // a single connection to the management server, then it might lead to - // availability problems. Keepalive timeouts based on connection_keepalive parameter https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic - grpcServer := grpc.NewServer() - reflection.Register(grpcServer) - channelz.RegisterChannelzServiceToServer(grpcServer) - cs.RegisterXdsConfigControlServiceServer(grpcServer, controlService) - lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) - if err != nil { - return err - } - registerServer(grpcServer, srv) - log.Printf("management server listening on %d\n", port) - if err = grpcServer.Serve(lis); err != nil { - log.Println(err) - } - return nil -} diff --git a/docker/go-control-plane/fallback-control-plane.go b/docker/go-control-plane/fallback-control-plane.go index 686a0f10..20737222 100644 --- a/docker/go-control-plane/fallback-control-plane.go +++ b/docker/go-control-plane/fallback-control-plane.go @@ -32,18 +32,23 @@ import ( "strconv" "sync" - v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "google.golang.org/grpc" + channelz "google.golang.org/grpc/channelz/service" + "google.golang.org/grpc/reflection" + "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/types/known/anypb" + "github.com/eugeneo/fallback-control-plane/controlplane" - pb_cs "github.com/eugeneo/fallback-control-plane/grpc/interop/grpc_testing/xdsconfig" + xdsconfigpb "github.com/eugeneo/fallback-control-plane/grpc/interop/grpc_testing/xdsconfig" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" - pb_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" - pb_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" "github.com/envoyproxy/go-control-plane/pkg/server/v3" - "google.golang.org/protobuf/encoding/prototext" - "google.golang.org/protobuf/types/known/anypb" ) var ( @@ -60,19 +65,18 @@ type Filter struct { // controlService provides a gRPC API to configure test-specific control plane // behaviors. type controlService struct { - pb_cs.UnsafeXdsConfigControlServiceServer + xdsconfigpb.UnsafeXdsConfigControlServiceServer version uint32 - clusters map[string]*pb_cluster.Cluster - listeners map[string]*pb_listener.Listener + clusters map[string]*v3clusterpb.Cluster + listeners map[string]*v3listenerpb.Listener filters map[string]map[string]bool cache cache.SnapshotCache mu sync.Mutex } - // StopOnRequest instructs the control plane to stop if any xDS client request // the specific resource. -func (srv *controlService) StopOnRequest(_ context.Context, req *pb_cs.StopOnRequestRequest) (*pb_cs.StopOnRequestResponse, error) { +func (srv *controlService) StopOnRequest(_ context.Context, req *xdsconfigpb.StopOnRequestRequest) (*xdsconfigpb.StopOnRequestResponse, error) { srv.mu.Lock() defer srv.mu.Unlock() if val, ok := srv.filters[req.GetResourceType()]; ok { @@ -80,11 +84,11 @@ func (srv *controlService) StopOnRequest(_ context.Context, req *pb_cs.StopOnReq } else { srv.filters[req.GetResourceType()] = map[string]bool{req.GetResourceName(): true} } - res := pb_cs.StopOnRequestResponse{} + res := xdsconfigpb.StopOnRequestResponse{} for t, names := range srv.filters { for name, b := range names { if b { - res.Filters = append(res.Filters, &pb_cs.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name}) + res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name}) } } } @@ -94,7 +98,7 @@ func (srv *controlService) StopOnRequest(_ context.Context, req *pb_cs.StopOnReq // UpsertResources allows the test to provide a new or replace existing xDS // resource. Notification will be sent to any control plane clients watching // the resource being updated. -func (srv *controlService) UpsertResources(_ context.Context, req *pb_cs.UpsertResourcesRequest) (*pb_cs.UpsertResourcesResponse, error) { +func (srv *controlService) UpsertResources(_ context.Context, req *xdsconfigpb.UpsertResourcesRequest) (*xdsconfigpb.UpsertResourcesResponse, error) { srv.mu.Lock() defer srv.mu.Unlock() srv.version++ @@ -110,7 +114,7 @@ func (srv *controlService) UpsertResources(_ context.Context, req *pb_cs.UpsertR return nil, err } srv.cache.SetSnapshot(context.Background(), *nodeid, snapshot) - res := &pb_cs.UpsertResourcesResponse{} + res := &xdsconfigpb.UpsertResourcesResponse{} for _, l := range srv.listeners { a, err := anypb.New(l) if err != nil { @@ -197,6 +201,23 @@ func (cb *controlService) onStreamRequest(id int64, req *v3discoverypb.Discovery return nil } +func RunServer(srv server.Server, controlService xdsconfigpb.XdsConfigControlServiceServer, port uint) error { + grpcServer := grpc.NewServer() + reflection.Register(grpcServer) + channelz.RegisterChannelzServiceToServer(grpcServer) + xdsconfigpb.RegisterXdsConfigControlServiceServer(grpcServer, controlService) + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + return err + } + v3discoverypb.RegisterAggregatedDiscoveryServiceServer(grpcServer, srv) + log.Printf("management server listening on %d\n", port) + if err = grpcServer.Serve(lis); err != nil { + log.Println(err) + } + return nil +} + // Main entry point. Configures and starts a gRPC server that serves xDS traffic // and provides an interface for tests to manage control plane behavior. func main() { @@ -211,8 +232,8 @@ func main() { } // The type needs to be checked controlService := &controlService{version: 1, - clusters: map[string]*pb_cluster.Cluster{controlplane.ListenerName: controlplane.MakeCluster(controlplane.ClusterName, host, uint32(parsedUpstreamPort))}, - listeners: map[string]*pb_listener.Listener{controlplane.ListenerName: controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)}, + clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: controlplane.MakeCluster(controlplane.ClusterName, host, uint32(parsedUpstreamPort))}, + listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)}, filters: map[string]map[string]bool{}, cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil), } @@ -231,7 +252,7 @@ func main() { srv := server.NewServer(ctx, controlService.cache, server.CallbackFuncs{ StreamRequestFunc: controlService.onStreamRequest, }) - err = controlplane.RunServer(srv, controlService, *port) + err = RunServer(srv, controlService, *port) if err != nil { log.Fatalf("Server startup failed: %q\n", err) }