[fixup] Codereview
eugeneo committed Aug 1, 2024
1 parent 35fd043 commit 88fb135
Showing 2 changed files with 81 additions and 112 deletions.
15 changes: 0 additions & 15 deletions docker/go-control-plane/controlplane/resource.go
@@ -15,16 +15,11 @@
package controlplane

import (
"strconv"
"time"

"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"

pb_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
pb_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
pb_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
@@ -127,13 +122,3 @@ func MakeHTTPListener(listenerName, cluster string) *pb_listener.Listener {
},
}
}

// GenerateSnapshot prepares a new xDS config snapshot for serving to clients.
func GenerateSnapshot(upstreamHost string, upstreamPort uint32) *cache.Snapshot {
snap, _ := cache.NewSnapshot(strconv.Itoa(snapshot_version), map[resource.Type][]types.Resource{
resource.ClusterType: {MakeCluster(ClusterName, upstreamHost, upstreamPort)},
resource.ListenerType: {MakeHTTPListener(ListenerName, ClusterName)},
})
snapshot_version++
return snap
}
178 changes: 81 additions & 97 deletions docker/go-control-plane/fallback-control-plane.go
@@ -35,7 +35,7 @@ import (
"google.golang.org/grpc"
channelz "google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/types/known/anypb"

"github.com/eugeneo/fallback-control-plane/controlplane"
@@ -67,11 +67,11 @@ type Filter struct {
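// controlService implements the test control RPCs and manages the xDS
// resources served to clients.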
type controlService struct {
xdsconfigpb.UnsafeXdsConfigControlServiceServer
version uint32
mu sync.Mutex // Guards access to all fields listed below
clusters map[string]*v3clusterpb.Cluster
listeners map[string]*v3listenerpb.Listener
filters map[string]map[string]bool
cache cache.SnapshotCache
mu sync.Mutex
}

// StopOnRequest instructs the control plane to stop if any xDS client request
@@ -86,10 +86,8 @@ func (srv *controlService) StopOnRequest(_ context.Context, req *xdsconfigpb.Sto
}
res := xdsconfigpb.StopOnRequestResponse{}
for t, names := range srv.filters {
for name, b := range names {
if b {
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
}
for name := range names {
res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name})
}
}
return &res, nil
@@ -108,152 +106,138 @@ func (srv *controlService) UpsertResources(_ context.Context, req *xdsconfigpb.U
}
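// Create (or replace) the cluster and the HTTP listener that routes to it.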
srv.clusters[req.Cluster] = controlplane.MakeCluster(req.Cluster, req.UpstreamHost, req.UpstreamPort)
srv.listeners[listener] = controlplane.MakeHTTPListener(listener, req.Cluster)
snapshot, err := srv.MakeSnapshot()
if err != nil {
log.Printf("Snapshot inconsistency: %+v\n", err)
if err := srv.RefreshSnapshot(); err != nil {
return nil, err
}
srv.cache.SetSnapshot(context.Background(), *nodeid, snapshot)
res := &xdsconfigpb.UpsertResourcesResponse{}
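// Report back every resource the control plane is now serving.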
for _, l := range srv.listeners {
a, err := anypb.New(l)
if err != nil {
panic(err)
log.Fatalf("Failed to convert listener %v to pb: %v\n", l, err)
}
res.Resource = append(res.Resource, a)
}
for _, c := range srv.clusters {
a, err := anypb.New(c)
if err != nil {
panic(err)
log.Fatalf("Failed to convert cluster %v to pb: %v\n", c, err)
}
res.Resource = append(res.Resource, a)
}
return res, nil
}

// MakeSnapshot builds xDS configuration snapshot that will be served
// to clients.
func (srv *controlService) MakeSnapshot() (*cache.Snapshot, error) {
listeners := make([]types.Resource, len(srv.listeners))
i := 0
// Abruptly stops the server when the client requests a resource that the test
// marked as one that should trigger this behavior
func (srv *controlService) onStreamRequest(id int64, req *v3discoverypb.DiscoveryRequest) error {
srv.mu.Lock()
defer srv.mu.Unlock()
log.Printf("Received request for %s on stream %d: %v:%v\n", req.GetTypeUrl(), id, req.VersionInfo, req.ResourceNames)
filtered := srv.filters[req.GetTypeUrl()]
if filtered != nil {
for _, name := range req.ResourceNames {
if filtered[name] {
log.Printf("Self destructing: %s/%s\n", req.GetTypeUrl(), name)
os.Exit(0)
}
}
}
return nil
}

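// RefreshSnapshot rebuilds the xDS snapshot from the currently configured
// listeners and clusters and publishes it to the snapshot cache.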
func (srv *controlService) RefreshSnapshot() error {
var listeners []types.Resource
for _, l := range srv.listeners {
listeners[i] = l
i++
listeners = append(listeners, l)
}
clusters := make([]types.Resource, len(srv.clusters))
i = 0
var clusters []types.Resource
for _, c := range srv.clusters {
clusters[i] = c
i++
clusters = append(clusters, c)
}
resources := map[resource.Type][]types.Resource{resource.ListenerType: listeners, resource.ClusterType: clusters}
// Create the snapshot that we'll serve to Envoy
snapshot, error := cache.NewSnapshot(fmt.Sprint(srv.version), resources)
if error != nil {
return nil, error
}
if err := snapshot.Consistent(); err != nil {
log.Printf("Snapshot inconsistency: %+v\n", err)
for _, r := range snapshot.Resources {
for name, resource := range r.Items {
bytes, err := prototext.MarshalOptions{Multiline: true}.Marshal(resource.Resource)
if err != nil {
log.Printf("Can't marshal %s\n", name)
} else {
log.Printf("Resource: %s\n%s\n",
resource.Resource,
string(bytes))
}
}
}
return nil, err
snapshot, err := cache.NewSnapshot(fmt.Sprint(srv.version), resources)
if err != nil {
return err
}
log.Printf("Will serve snapshot:\n")
log.Printf("Snapshot contents:\n")
for _, values := range snapshot.Resources {
for name, item := range values.Items {
text, err := prototext.MarshalOptions{Multiline: true}.Marshal(item.Resource)
text, err := protojson.MarshalOptions{Multiline: true}.Marshal(item.Resource)
if err != nil {
log.Printf("Resource %+v, error: %+v\n", name, err)
} else {
log.Printf("%+v => %+v\n", name, string(text))
log.Printf("Resource %v, error: %v\n", name, err)
continue
}
log.Printf("%v => %v\n", name, string(text))
}
}
return snapshot, nil
}

// Abruptly stops the server when the client requests a resource that the test
// marked as one that should trigger this behavior
func (cb *controlService) onStreamRequest(id int64, req *v3discoverypb.DiscoveryRequest) error {
cb.mu.Lock()
defer cb.mu.Unlock()
log.Printf("Received request for %s on stream %d: %v:%v\n", req.GetTypeUrl(), id, req.VersionInfo, req.ResourceNames)
filtered := cb.filters[req.GetTypeUrl()]
if filtered != nil {
for _, name := range req.ResourceNames {
if filtered[name] {
log.Printf("Self destructing: %s/%s\n", req.GetTypeUrl(), name)
os.Exit(0)
}
}
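// Verify the snapshot is internally consistent before publishing it.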
if err := snapshot.Consistent(); err != nil {
log.Printf("Snapshot inconsistency: %v\n", err)
return err
}
// Add the snapshot to the cache
if err := srv.cache.SetSnapshot(context.Background(), *nodeid, snapshot); err != nil {
log.Printf("Snapshot error %v for %v\n", err, snapshot)
return err
}
return nil
}

func RunServer(srv server.Server, controlService xdsconfigpb.XdsConfigControlServiceServer, port uint) error {
grpcServer := grpc.NewServer()
reflection.Register(grpcServer)
channelz.RegisterChannelzServiceToServer(grpcServer)
xdsconfigpb.RegisterXdsConfigControlServiceServer(grpcServer, controlService)

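// RunServer starts a gRPC server that serves both the aggregated xDS
// discovery service and the test control service on the given port.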
func (srv *controlService) RunServer(port uint) error {
if err := srv.RefreshSnapshot(); err != nil {
log.Fatalf("Failed to refresh snapshot: %v\n", err)
}
// Run the xDS server
server := server.NewServer(context.Background(), srv.cache, server.CallbackFuncs{
StreamRequestFunc: srv.onStreamRequest,
})
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
return err
}
v3discoverypb.RegisterAggregatedDiscoveryServiceServer(grpcServer, srv)
grpcServer := grpc.NewServer()
reflection.Register(grpcServer)
channelz.RegisterChannelzServiceToServer(grpcServer)
xdsconfigpb.RegisterXdsConfigControlServiceServer(grpcServer, srv)
v3discoverypb.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
log.Printf("Management server listening on %d\n", port)
if err = grpcServer.Serve(lis); err != nil {
log.Println(err)
}
return nil
}

// Main entry point. Configures and starts a gRPC server that serves xDS traffic
// and provides an interface for tests to manage control plane behavior.
func main() {
flag.Parse()
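// parseHostPort splits a host:port string and returns the host and the
// port as a uint32, or an error if either part is invalid.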
func parseHostPort(host_port string) (string, uint32, error) {
host, upstreamPort, err := net.SplitHostPort(host_port)
if err != nil {
log.Fatalf("Incorrect upstream host name: %+v: %+v\n", upstream, err)
return "", 0, fmt.Errorf("Incorrect upstream host name: %s: %v\n", host_port, err)
}
parsedUpstreamPort, err := strconv.Atoi(upstreamPort)
if err != nil || parsedUpstreamPort <= 0 {
log.Fatalf("Not a valid port number: %+v: %+v\n", upstreamPort, err)
return "", 0, fmt.Errorf("Not a valid port number: %d: %v\n", upstreamPort, err)
}
return host, uint32(parsedUpstreamPort), nil
}


// Main entry point. Configures and starts a gRPC server that serves xDS traffic
// and provides an interface for tests to manage control plane behavior.
func main() {
flag.Parse()
host, upstreamPort, err := parseHostPort(*upstream)
if err != nil {
log.Fatalf("Incorrect upstream host name: %s: %v\n", upstream, err)
}
// The type needs to be checked
initial_cds := controlplane.MakeCluster(controlplane.ClusterName, host, upstreamPort)
initial_lds := controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)
controlService := &controlService{version: 1,
clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: controlplane.MakeCluster(controlplane.ClusterName, host, uint32(parsedUpstreamPort))},
listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)},
clusters: map[string]*v3clusterpb.Cluster{controlplane.ClusterName: initial_cds},
listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: initial_lds},
filters: map[string]map[string]bool{},
cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil),
}
// Create a cache
snapshot, err := controlService.MakeSnapshot()
if err != nil {
log.Fatalf("Snapshot error %q for %+v\n", err, snapshot)
}
// Add the snapshot to the cache
if err := controlService.cache.SetSnapshot(context.Background(), *nodeid, snapshot); err != nil {
log.Fatalf("Snapshot error %q for %+v\n", err, snapshot)
}

// Run the xDS server
ctx := context.Background()
srv := server.NewServer(ctx, controlService.cache, server.CallbackFuncs{
StreamRequestFunc: controlService.onStreamRequest,
})
err = RunServer(srv, controlService, *port)
if err != nil {
log.Fatalf("Server startup failed: %q\n", err)
if err := controlService.RunServer(*port); err != nil {
log.Fatalf("Server startup failed: %v\n", err)
}
}
