diff --git a/docker/go-control-plane/controlplane/resource.go b/docker/go-control-plane/controlplane/resource.go index f3edcb35..17d18057 100644 --- a/docker/go-control-plane/controlplane/resource.go +++ b/docker/go-control-plane/controlplane/resource.go @@ -15,16 +15,11 @@ package controlplane import ( - "strconv" "time" "google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/durationpb" - "github.com/envoyproxy/go-control-plane/pkg/cache/types" - "github.com/envoyproxy/go-control-plane/pkg/cache/v3" - "github.com/envoyproxy/go-control-plane/pkg/resource/v3" - pb_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" pb_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" pb_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" @@ -127,13 +122,3 @@ func MakeHTTPListener(listenerName, cluster string) *pb_listener.Listener { }, } } - -// GenerateSnapshot prepares a new xDS config snapshot for serving to clients. -func GenerateSnapshot(upstreamHost string, upstreamPort uint32) *cache.Snapshot { - snap, _ := cache.NewSnapshot(strconv.Itoa(snapshot_version), map[resource.Type][]types.Resource{ - resource.ClusterType: {MakeCluster(ClusterName, upstreamHost, upstreamPort)}, - resource.ListenerType: {MakeHTTPListener(ListenerName, ClusterName)}, - }) - snapshot_version++ - return snap -} diff --git a/docker/go-control-plane/fallback-control-plane.go b/docker/go-control-plane/fallback-control-plane.go index 49b7cc80..27605d3a 100644 --- a/docker/go-control-plane/fallback-control-plane.go +++ b/docker/go-control-plane/fallback-control-plane.go @@ -35,7 +35,7 @@ import ( "google.golang.org/grpc" channelz "google.golang.org/grpc/channelz/service" "google.golang.org/grpc/reflection" - "google.golang.org/protobuf/encoding/prototext" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/anypb" "github.com/eugeneo/fallback-control-plane/controlplane" @@ -67,11 +67,11 @@ type Filter struct { type controlService struct { xdsconfigpb.UnsafeXdsConfigControlServiceServer version uint32 + mu sync.Mutex // Guards access to all fields listed below clusters map[string]*v3clusterpb.Cluster listeners map[string]*v3listenerpb.Listener filters map[string]map[string]bool cache cache.SnapshotCache - mu sync.Mutex } // StopOnRequest instructs the control plane to stop if any xDS client request @@ -86,10 +86,8 @@ func (srv *controlService) StopOnRequest(_ context.Context, req *xdsconfigpb.Sto } res := xdsconfigpb.StopOnRequestResponse{} for t, names := range srv.filters { - for name, b := range names { - if b { - res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name}) - } + for name, _ := range names { + res.Filters = append(res.Filters, &xdsconfigpb.StopOnRequestResponse_ResourceFilter{ResourceType: t, ResourceName: name}) } } return &res, nil @@ -108,109 +106,101 @@ func (srv *controlService) UpsertResources(_ context.Context, req *xdsconfigpb.U } srv.clusters[req.Cluster] = controlplane.MakeCluster(req.Cluster, req.UpstreamHost, req.UpstreamPort) srv.listeners[listener] = controlplane.MakeHTTPListener(listener, req.Cluster) - snapshot, err := srv.MakeSnapshot() - if err != nil { - log.Printf("Snapshot inconsistency: %+v\n", err) + if err := srv.RefreshSnapshot(); err != nil { return nil, err } - srv.cache.SetSnapshot(context.Background(), *nodeid, snapshot) res := &xdsconfigpb.UpsertResourcesResponse{} for _, l := range srv.listeners { a, err := anypb.New(l) if err != nil { - panic(err) + log.Fatalf("Failed to convert listener %v to pb: %v\n", l, err) } res.Resource = append(res.Resource, a) } for _, c := range srv.clusters { a, err := anypb.New(c) if err != nil { - panic(err) + log.Fatalf("Failed to convert cluster %v to pb: %v\n", c, err) } res.Resource = append(res.Resource, a) } return res, nil } -// MakeSnapshot builds xDS configuration snapshot that will be served -// to clients. -func (srv *controlService) MakeSnapshot() (*cache.Snapshot, error) { - listeners := make([]types.Resource, len(srv.listeners)) - i := 0 +// Abruptly stops the server when the client requests a resource that the test +// marked as one that should trigger this behavior +func (srv *controlService) onStreamRequest(id int64, req *v3discoverypb.DiscoveryRequest) error { + srv.mu.Lock() + defer srv.mu.Unlock() + log.Printf("Received request for %s on stream %d: %v:%v\n", req.GetTypeUrl(), id, req.VersionInfo, req.ResourceNames) + filtered := srv.filters[req.GetTypeUrl()] + if filtered != nil { + for _, name := range req.ResourceNames { + if filtered[name] { + log.Printf("Self destructing: %s/%s\n", req.GetTypeUrl(), name) + os.Exit(0) + } + } + } + return nil +} + +func (srv *controlService) RefreshSnapshot() error { + var listeners []types.Resource for _, l := range srv.listeners { - listeners[i] = l - i++ + listeners = append(listeners, l) } - clusters := make([]types.Resource, len(srv.clusters)) - i = 0 + var clusters []types.Resource for _, c := range srv.clusters { - clusters[i] = c - i++ + clusters = append(clusters, c) } resources := map[resource.Type][]types.Resource{resource.ListenerType: listeners, resource.ClusterType: clusters} // Create the snapshot that we'll serve to Envoy - snapshot, error := cache.NewSnapshot(fmt.Sprint(srv.version), resources) - if error != nil { - return nil, error - } - if err := snapshot.Consistent(); err != nil { - log.Printf("Snapshot inconsistency: %+v\n", err) - for _, r := range snapshot.Resources { - for name, resource := range r.Items { - bytes, err := prototext.MarshalOptions{Multiline: true}.Marshal(resource.Resource) - if err != nil { - log.Printf("Can't marshal %s\n", name) - } else { - log.Printf("Resource: %s\n%s\n", - resource.Resource, - string(bytes)) - } - } - } - return nil, err + snapshot, err := cache.NewSnapshot(fmt.Sprint(srv.version), resources) + if err != nil { + return err } - log.Printf("Will serve snapshot:\n") + log.Printf("Snapshot contents:\n") for _, values := range snapshot.Resources { for name, item := range values.Items { - text, err := prototext.MarshalOptions{Multiline: true}.Marshal(item.Resource) + text, err := protojson.MarshalOptions{Multiline: true}.Marshal(item.Resource) if err != nil { - log.Printf("Resource %+v, error: %+v\n", name, err) - } else { - log.Printf("%+v => %+v\n", name, string(text)) + log.Printf("Resource %v, error: %v\n", name, err) + continue } + log.Printf("%v => %v\n", name, string(text)) } } - return snapshot, nil -} - -// Abruptly stops the server when the client requests a resource that the test -// marked as one that should trigger this behavior -func (cb *controlService) onStreamRequest(id int64, req *v3discoverypb.DiscoveryRequest) error { - cb.mu.Lock() - defer cb.mu.Unlock() - log.Printf("Received request for %s on stream %d: %v:%v\n", req.GetTypeUrl(), id, req.VersionInfo, req.ResourceNames) - filtered := cb.filters[req.GetTypeUrl()] - if filtered != nil { - for _, name := range req.ResourceNames { - if filtered[name] { - log.Printf("Self destructing: %s/%s\n", req.GetTypeUrl(), name) - os.Exit(0) - } - } + if err := snapshot.Consistent(); err != nil { + log.Printf("Snapshot inconsistency: %v\n", err) + return err + } + // Add the snapshot to the cache + if err := srv.cache.SetSnapshot(context.Background(), *nodeid, snapshot); err != nil { + log.Printf("Snapshot error %v for %v\n", err, snapshot) + return err } return nil } -func RunServer(srv server.Server, controlService xdsconfigpb.XdsConfigControlServiceServer, port uint) error { - grpcServer := grpc.NewServer() - reflection.Register(grpcServer) - channelz.RegisterChannelzServiceToServer(grpcServer) - xdsconfigpb.RegisterXdsConfigControlServiceServer(grpcServer, controlService) + +func (srv *controlService) RunServer(port uint) error { + if err := srv.RefreshSnapshot(); err != nil { + log.Fatalf("Failed to refresh snapshot: %v\n", err) + } + // Run the xDS server + server := server.NewServer(context.Background(), srv.cache, server.CallbackFuncs{ + StreamRequestFunc: srv.onStreamRequest, + }) lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) if err != nil { return err } - v3discoverypb.RegisterAggregatedDiscoveryServiceServer(grpcServer, srv) + grpcServer := grpc.NewServer() + reflection.Register(grpcServer) + channelz.RegisterChannelzServiceToServer(grpcServer) + xdsconfigpb.RegisterXdsConfigControlServiceServer(grpcServer, srv) + v3discoverypb.RegisterAggregatedDiscoveryServiceServer(grpcServer, server) log.Printf("Management server listening on %d\n", port) if err = grpcServer.Serve(lis); err != nil { log.Println(err) @@ -218,42 +208,36 @@ func RunServer(srv server.Server, controlService xdsconfigpb.XdsConfigControlSer return nil } -// Main entry point. Configures and starts a gRPC server that serves xDS traffic -// and provides an interface for tests to manage control plane behavior. -func main() { - flag.Parse() +func parseHostPort(host_port string) (string, uint32, error) { host, upstreamPort, err := net.SplitHostPort(*upstream) if err != nil { - log.Fatalf("Incorrect upstream host name: %+v: %+v\n", upstream, err) + return "", 0, fmt.Errorf("Incorrect upstream host name: %s: %v\n", host_port, err) } parsedUpstreamPort, err := strconv.Atoi(upstreamPort) if err != nil || parsedUpstreamPort <= 0 { - log.Fatalf("Not a valid port number: %+v: %+v\n", upstreamPort, err) + return "", 0, fmt.Errorf("Not a valid port number: %d: %v\n", upstreamPort, err) + } + return host, uint32(parsedUpstreamPort), nil +} + + +// Main entry point. Configures and starts a gRPC server that serves xDS traffic +// and provides an interface for tests to manage control plane behavior. +func main() { + flag.Parse() + host, upstreamPort, err := parseHostPort(*upstream) + if err != nil { + log.Fatalf("Incorrect upstream host name: %s: %v\n", upstream, err) } - // The type needs to be checked + initial_cds := controlplane.MakeCluster(controlplane.ClusterName, host, upstreamPort) + initial_lds := controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName) controlService := &controlService{version: 1, - clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: controlplane.MakeCluster(controlplane.ClusterName, host, uint32(parsedUpstreamPort))}, - listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: controlplane.MakeHTTPListener(controlplane.ListenerName, controlplane.ClusterName)}, + clusters: map[string]*v3clusterpb.Cluster{controlplane.ListenerName: initial_cds}, + listeners: map[string]*v3listenerpb.Listener{controlplane.ListenerName: initial_lds}, filters: map[string]map[string]bool{}, cache: cache.NewSnapshotCache(false, cache.IDHash{}, nil), } - // Create a cache - snapshot, err := controlService.MakeSnapshot() - if err != nil { - log.Fatalf("Snapshot error %q for %+v\n", err, snapshot) - } - // Add the snapshot to the cache - if err := controlService.cache.SetSnapshot(context.Background(), *nodeid, snapshot); err != nil { - log.Fatalf("Snapshot error %q for %+v\n", err, snapshot) - } - - // Run the xDS server - ctx := context.Background() - srv := server.NewServer(ctx, controlService.cache, server.CallbackFuncs{ - StreamRequestFunc: controlService.onStreamRequest, - }) - err = RunServer(srv, controlService, *port) - if err != nil { - log.Fatalf("Server startup failed: %q\n", err) + if err := controlService.RunServer(*port); err != nil { + log.Fatalf("Server startup failed: %v\n", err) } }