Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
eugeneo committed Jul 31, 2024
1 parent fb792d0 commit 994e4a5
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 98 deletions.
91 changes: 0 additions & 91 deletions docker/go-control-plane/controlplane/callbacks.go

This file was deleted.

50 changes: 43 additions & 7 deletions docker/go-control-plane/fallback-control-plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
"fmt"
"log"
"net"
"os"
"strconv"
"sync"

v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/eugeneo/fallback-control-plane/controlplane"
pb_cs "github.com/eugeneo/fallback-control-plane/grpc/interop/grpc_testing/xdsconfig"

Expand All @@ -50,25 +52,41 @@ var (
upstream = flag.String("upstream", "localhost:3000", "upstream server")
)

type Filter struct {
ResourceType string
ResourceName string
}

// controlService provides a gRPC API to configure test-specific control plane
// behaviors.
type controlService struct {
pb_cs.UnsafeXdsConfigControlServiceServer
version uint32
clusters map[string]*pb_cluster.Cluster
listeners map[string]*pb_listener.Listener
filters map[string]map[string]bool
cache cache.SnapshotCache
Cb *controlplane.Callbacks
mu sync.Mutex
}


// StopOnRequest instructs the control plane to stop if any xDS client request
// the specific resource.
func (srv *controlService) StopOnRequest(_ context.Context, req *pb_cs.StopOnRequestRequest) (*pb_cs.StopOnRequestResponse, error) {
srv.Cb.AddFilter(req.GetResourceType(), req.GetResourceName())
srv.mu.Lock()
defer srv.mu.Unlock()
if val, ok := srv.filters[req.GetResourceType()]; ok {
val[req.GetResourceName()] = true
} else {
srv.filters[req.GetResourceType()] = map[string]bool{req.GetResourceName(): true}
}
res := pb_cs.StopOnRequestResponse{}
for _, f := range srv.Cb.GetFilters() {
res.Filters = append(res.Filters, &pb_cs.StopOnRequestResponse_ResourceFilter{ResourceType: f.ResourceType, ResourceName: f.ResourceName})
for t, names := range srv.filters {
for name, b := range names {
if b {
res.Filters = append(res.Filters, &pb_cs.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
}
}
}
return &res, nil
}
Expand Down Expand Up @@ -161,6 +179,24 @@ func (srv *controlService) MakeSnapshot() (*cache.Snapshot, error) {
return snapshot, nil
}

// Abruptly stops the server when the client requests a resource that the test
// marked as one that should trigger this behavior
func (cb *controlService) onStreamRequest(id int64, req *v3discoverypb.DiscoveryRequest) error {
cb.mu.Lock()
defer cb.mu.Unlock()
log.Printf("Received request for %s on stream %d: %v:%v\n", req.GetTypeUrl(), id, req.VersionInfo, req.ResourceNames)
filtered := cb.filters[req.GetTypeUrl()]
if filtered != nil {
for _, name := range req.ResourceNames {
if filtered[name] {
log.Printf("Self destructing: %s/%s\n", req.GetTypeUrl(), name)
os.Exit(0)
}
}
}
return nil
}

// Main entry point. Configures and starts a gRPC server that serves xDS traffic
// and provides an interface for tests to manage control plane behavior.
func main() {
Expand All @@ -173,11 +209,11 @@ func main() {
if err != nil || parsedUpstreamPort <= 0 {
log.Fatalf("Not a valid port number: %+v: %+v\n", upstreamPort, err)
}
cb := &controlplane.Callbacks{Filters: make(map[string]map[string]bool)}
// The type needs to be checked
controlService := &controlService{Cb: cb, version: 1,
controlService := &controlService{version: 1,
clusters: map[string]*pb_cluster.Cluster{controlplane.ListenerName: controlplane.MakeCluster(controlplane.ClusterName, host, uint32(parsedUpstreamPort))},
listeners: map[string]*pb_listener.Listener{controlplane.ListenerName: controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)},
filters: map[string]map[string]bool{},
cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil),
}
// Create a cache
Expand All @@ -193,7 +229,7 @@ func main() {
// Run the xDS server
ctx := context.Background()
srv := server.NewServer(ctx, controlService.cache, server.CallbackFuncs{
StreamRequestFunc: cb.OnStreamRequest,
StreamRequestFunc: controlService.onStreamRequest,
})
err = controlplane.RunServer(srv, controlService, *port)
if err != nil {
Expand Down

0 comments on commit 994e4a5

Please sign in to comment.