Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Go control plane #103

Merged
merged 17 commits into from
Aug 2, 2024
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,6 @@ venv/
venv-*/
out/
protos/**/*_pb2*

### Intermitten build result
fallback-control-plane
32 changes: 32 additions & 0 deletions docker/go-control-plane/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#[eostroukhov] TODO migrate to Alpine to decrease image size
FROM golang:1.22.3-bookworm

WORKDIR /usr/src/app

RUN apt-get update -y && apt-get dist-upgrade -y && apt-get upgrade -y
RUN apt install -y unzip
RUN go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
RUN go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

ENV PATH="$PATH:$(go env GOPATH)/bin"

# Download proto zip
ENV PROTOC_ZIP=protoc-27.2-linux-x86_64.zip
RUN curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v27.2/${PROTOC_ZIP}
RUN unzip -o ${PROTOC_ZIP} -d ./proto
RUN chmod 755 -R ./proto/bin
ENV BASE=/usr
# Copy into path
RUN cp ./proto/bin/protoc ${BASE}/bin/
RUN cp -R ./proto/include/* ${BASE}/include/

# pre-copy/cache go.mod for pre-downloading dependencies and only redownloading them in subsequent builds if they change
COPY docker/go-control-plane/go.mod docker/go-control-plane/go.sum ./
RUN go mod download && go mod verify

COPY protos ./protos
RUN protoc -I=. --go_out=. protos/grpc/testing/xdsconfig/*.proto --go-grpc_out=.
COPY docker/go-control-plane .
RUN go build -v -o /usr/local/bin/fallback-control-plane .

ENTRYPOINT ["fallback-control-plane"]
163 changes: 163 additions & 0 deletions docker/go-control-plane/controlplane/callbacks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// Package controlplane provides components for the common control plane used
// for testing
package controlplane

import (
"context"
"log"
"os"
"sync"

pb_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
pb_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

// Implementation of the server.Callbacks interface that implements the behavior
// required by our tests.
type Callbacks struct {
Signal chan struct{}
Fetches int
Requests int
Responses int
DeltaRequests int
DeltaResponses int
Filters map[string]map[string]bool
mu sync.Mutex
}

var _ server.Callbacks = &Callbacks{}

func (cb *Callbacks) Report() {
cb.mu.Lock()
defer cb.mu.Unlock()
log.Printf("Server callbacks fetches=%d requests=%d responses=%d\n", cb.Fetches, cb.Requests, cb.Responses)
}

func (cb *Callbacks) OnStreamOpen(_ context.Context, id int64, typ string) error {
log.Printf("Stream %d open for %s\n", id, typ)
return nil
}

func (cb *Callbacks) OnStreamClosed(id int64, node *pb_core.Node) {
log.Printf("Stream %d of node %s closed\n", id, node.Id)
}

func (cb *Callbacks) OnDeltaStreamOpen(_ context.Context, id int64, typ string) error {
log.Printf("Delta stream %d open for %s\n", id, typ)
return nil
}

func (cb *Callbacks) OnDeltaStreamClosed(id int64, node *pb_core.Node) {
log.Printf("Delta stream %d of node %s closed\n", id, node.Id)
}

// Abruptly stops the server when the client requests a resource that the test
// marked as one that should trigger this behavior
func (cb *Callbacks) OnStreamRequest(id int64, req *pb_discovery.DiscoveryRequest) error {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.Requests++
if cb.Signal != nil {
close(cb.Signal)
cb.Signal = nil
}
log.Printf("Received request for %s on stream %d: %v:%v\n", req.GetTypeUrl(), id, req.VersionInfo, req.ResourceNames)
filtered := cb.Filters[req.GetTypeUrl()]
if filtered != nil {
for _, name := range req.ResourceNames {
if filtered[name] {
log.Printf("Self destructing: %s/%s\n", req.GetTypeUrl(), name)
os.Exit(0)
}
}
}
return nil
}

func (cb *Callbacks) OnStreamResponse(ctx context.Context, id int64, req *pb_discovery.DiscoveryRequest, res *pb_discovery.DiscoveryResponse) {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.Responses++
log.Printf("Responding to request for %s on stream %d\n", req.GetTypeUrl(), id)
}

func (cb *Callbacks) OnStreamDeltaResponse(id int64, req *pb_discovery.DeltaDiscoveryRequest, res *pb_discovery.DeltaDiscoveryResponse) {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.DeltaResponses++
}

func (cb *Callbacks) OnStreamDeltaRequest(int64, *pb_discovery.DeltaDiscoveryRequest) error {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.DeltaRequests++
if cb.Signal != nil {
close(cb.Signal)
cb.Signal = nil
}
return nil
}

func (cb *Callbacks) OnFetchRequest(context.Context, *pb_discovery.DiscoveryRequest) error {
cb.mu.Lock()
defer cb.mu.Unlock()
cb.Fetches++
if cb.Signal != nil {
close(cb.Signal)
cb.Signal = nil
}
return nil
}

func (cb *Callbacks) OnFetchResponse(*pb_discovery.DiscoveryRequest, *pb_discovery.DiscoveryResponse) {}

// Adds a resource type/name to the list of resources that stop the server
// if a client requests them
func (cb *Callbacks) AddFilter(resource_type string, resource_name string) {
cb.mu.Lock()
defer cb.mu.Unlock()
if val, ok := cb.Filters[resource_type]; ok {
val[resource_name] = true
} else {
cb.Filters[resource_type] = map[string]bool{resource_name: true}
}
}

type Filter struct {
ResourceType string
ResourceName string
}

// Returns a list of resource name/types that stop the server when requested
func (cb *Callbacks) GetFilters() []Filter {
cb.mu.Lock()
defer cb.mu.Unlock()
result := []Filter{}
for t, names := range cb.Filters {
for name, b := range names {
if b {
result = append(result, Filter{ResourceType: t, ResourceName: name})
}
}
}
return result
}
139 changes: 139 additions & 0 deletions docker/go-control-plane/controlplane/resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
// Copyright 2020 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controlplane

import (
"strconv"
"time"

"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/durationpb"

"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"

pb_cluster "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
pb_core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
pb_endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
pb_listener "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
pb_route "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
pb_router "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
pb_hcm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
)

const (
ClusterName = "example_proxy_cluster"
ListenerName = "listener_0"
)

var snapshot_version int = 1;

// MakeCluster builds a CDS resource with a given clusterName that points
// the users to upstreamHost:upstreamPort
func MakeCluster(clusterName, upstreamHost string, upstreamPort uint32) *pb_cluster.Cluster {
return &pb_cluster.Cluster{
Name: clusterName,
ConnectTimeout: durationpb.New(5 * time.Second),
ClusterDiscoveryType: &pb_cluster.Cluster_Type{Type: pb_cluster.Cluster_LOGICAL_DNS},
LbPolicy: pb_cluster.Cluster_ROUND_ROBIN,
LoadAssignment: makeEndpoint(clusterName, upstreamHost, upstreamPort),
DnsLookupFamily: pb_cluster.Cluster_V4_ONLY,
}
}

func makeEndpoint(clusterName, upstreamHost string, upstreamPort uint32) *pb_endpoint.ClusterLoadAssignment {
return &pb_endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*pb_endpoint.LocalityLbEndpoints{{
LbEndpoints: []*pb_endpoint.LbEndpoint{{
HostIdentifier: &pb_endpoint.LbEndpoint_Endpoint{
Endpoint: &pb_endpoint.Endpoint{
Address: &pb_core.Address{
Address: &pb_core.Address_SocketAddress{
SocketAddress: &pb_core.SocketAddress{
Protocol: pb_core.SocketAddress_TCP,
Address: upstreamHost,
PortSpecifier: &pb_core.SocketAddress_PortValue{
PortValue: upstreamPort,
},
},
},
},
},
},
}},
}},
}
}

// MakeHTTPListener builds a LDS resource that routes traffic to a given
// cluster.
func MakeHTTPListener(listenerName, cluster string) *pb_listener.Listener {
any_route, _ := anypb.New(&pb_router.Router{})
httpcm := &pb_hcm.HttpConnectionManager{
RouteSpecifier: &pb_hcm.HttpConnectionManager_RouteConfig{
RouteConfig: &pb_route.RouteConfiguration{
VirtualHosts: []*pb_route.VirtualHost{
{
Domains: []string{"*"},
Routes: []*pb_route.Route{
{
Match: &pb_route.RouteMatch{
PathSpecifier: &pb_route.RouteMatch_Prefix{},
},
Action: &pb_route.Route_Route{
Route: &pb_route.RouteAction{
ClusterSpecifier: &pb_route.RouteAction_Cluster{
Cluster: cluster,
},
},
},
},
},
},
},
},
},
HttpFilters: []*pb_hcm.HttpFilter{
{
Name: "router",
ConfigType: &pb_hcm.HttpFilter_TypedConfig{
TypedConfig: any_route,
},
},
},
}
any_hcm, err := anypb.New(httpcm)
if err != nil {
panic(err)
}
return &pb_listener.Listener{
Name: listenerName,
ApiListener: &pb_listener.ApiListener{
ApiListener: any_hcm,
},
}
}

// GenerateSnapshot prepares a new xDS config snapshot for serving to clients.
func GenerateSnapshot(upstreamHost string, upstreamPort uint32) *cache.Snapshot {
snap, _ := cache.NewSnapshot(strconv.Itoa(snapshot_version), map[resource.Type][]types.Resource{
resource.ClusterType: {MakeCluster(ClusterName, upstreamHost, upstreamPort)},
resource.ListenerType: {MakeHTTPListener(ListenerName, ClusterName)},
})
snapshot_version++
return snap
}
Loading
Loading