Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Jul 31, 2024
1 parent 994e4a5 commit 9e75005
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 99 deletions.
81 changes: 0 additions & 81 deletions docker/go-control-plane/controlplane/server.go

This file was deleted.

57 changes: 39 additions & 18 deletions docker/go-control-plane/fallback-control-plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,23 @@ import (
"strconv"
"sync"

v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"google.golang.org/grpc"
channelz "google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/types/known/anypb"

"github.com/eugeneo/fallback-control-plane/controlplane"
pb_cs "github.com/eugeneo/fallback-control-plane/grpc/interop/grpc_testing/xdsconfig"
xdsconfigpb "github.com/eugeneo/fallback-control-plane/grpc/interop/grpc_testing/xdsconfig"

v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"

pb_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
pb_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/types/known/anypb"
)

var (
Expand All @@ -60,31 +65,30 @@ type Filter struct {
// controlService provides a gRPC API to configure test-specific control plane
// behaviors.
type controlService struct {
pb_cs.UnsafeXdsConfigControlServiceServer
xdsconfigpb.UnsafeXdsConfigControlServiceServer
version uint32
clusters map[string]*pb_cluster.Cluster
listeners map[string]*pb_listener.Listener
clusters map[string]*v3clusterpb.Cluster
listeners map[string]*v3listenerpb.Listener
filters map[string]map[string]bool
cache cache.SnapshotCache
mu sync.Mutex
}


// StopOnRequest instructs the control plane to stop if any xDS client request
// the specific resource.
func (srv *controlService) StopOnRequest(_ context.Context, req *pb_cs.StopOnRequestRequest) (*pb_cs.StopOnRequestResponse, error) {
func (srv *controlService) StopOnRequest(_ context.Context, req *xdsconfigpb.StopOnRequestRequest) (*xdsconfigpb.StopOnRequestResponse, error) {
srv.mu.Lock()
defer srv.mu.Unlock()
if val, ok := srv.filters[req.GetResourceType()]; ok {
val[req.GetResourceName()] = true
} else {
srv.filters[req.GetResourceType()] = map[string]bool{req.GetResourceName(): true}
}
res := pb_cs.StopOnRequestResponse{}
res := xdsconfigpb.StopOnRequestResponse{}
for t, names := range srv.filters {
for name, b := range names {
if b {
res.Filters = append(res.Filters, &pb_cs.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
}
}
}
Expand All @@ -94,7 +98,7 @@ func (srv *controlService) StopOnRequest(_ context.Context, req *pb_cs.StopOnReq
// UpsertResources allows the test to provide a new or replace existing xDS
// resource. Notification will be sent to any control plane clients watching
// the resource being updated.
func (srv *controlService) UpsertResources(_ context.Context, req *pb_cs.UpsertResourcesRequest) (*pb_cs.UpsertResourcesResponse, error) {
func (srv *controlService) UpsertResources(_ context.Context, req *xdsconfigpb.UpsertResourcesRequest) (*xdsconfigpb.UpsertResourcesResponse, error) {
srv.mu.Lock()
defer srv.mu.Unlock()
srv.version++
Expand All @@ -110,7 +114,7 @@ func (srv *controlService) UpsertResources(_ context.Context, req *pb_cs.UpsertR
return nil, err
}
srv.cache.SetSnapshot(context.Background(), *nodeid, snapshot)
res := &pb_cs.UpsertResourcesResponse{}
res := &xdsconfigpb.UpsertResourcesResponse{}
for _, l := range srv.listeners {
a, err := anypb.New(l)
if err != nil {
Expand Down Expand Up @@ -197,6 +201,23 @@ func (cb *controlService) onStreamRequest(id int64, req *v3discoverypb.Discovery
return nil
}

func RunServer(srv server.Server, controlService xdsconfigpb.XdsConfigControlServiceServer, port uint) error {
grpcServer := grpc.NewServer()
reflection.Register(grpcServer)
channelz.RegisterChannelzServiceToServer(grpcServer)
xdsconfigpb.RegisterXdsConfigControlServiceServer(grpcServer, controlService)
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return err
}
v3discoverypb.RegisterAggregatedDiscoveryServiceServer(grpcServer, srv)
log.Printf("management server listening on %d\n", port)
if err = grpcServer.Serve(lis); err != nil {
log.Println(err)
}
return nil
}

// Main entry point. Configures and starts a gRPC server that serves xDS traffic
// and provides an interface for tests to manage control plane behavior.
func main() {
Expand All @@ -211,8 +232,8 @@ func main() {
}
// The type needs to be checked
controlService := &controlService{version: 1,
clusters: map[string]*pb_cluster.Cluster{controlplane.ListenerName: controlplane.MakeCluster(controlplane.ClusterName, host, uint32(parsedUpstreamPort))},
listeners: map[string]*pb_listener.Listener{controlplane.ListenerName: controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)},
clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: controlplane.MakeCluster(controlplane.ClusterName, host, uint32(parsedUpstreamPort))},
listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)},
filters: map[string]map[string]bool{},
cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil),
}
Expand All @@ -231,7 +252,7 @@ func main() {
srv := server.NewServer(ctx, controlService.cache, server.CallbackFuncs{
StreamRequestFunc: controlService.onStreamRequest,
})
err = controlplane.RunServer(srv, controlService, *port)
err = RunServer(srv, controlService, *port)
if err != nil {
log.Fatalf("Server startup failed: %q\n", err)
}
Expand Down

0 comments on commit 9e75005

Please sign in to comment.