Skip to content

Commit

Permalink
Merge pull request #1319 from DanielXiao/fix-node-not-update
Browse files Browse the repository at this point in the history
Fix the stale Node issue in cache
  • Loading branch information
k8s-ci-robot authored Nov 20, 2024
2 parents 2e2657e + ab4d29e commit 5866fd9
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 122 deletions.
5 changes: 3 additions & 2 deletions hack/tools/go.mod
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
module tools

go 1.22
toolchain go1.22.5
go 1.22.0

toolchain go1.23.3

require (
github.com/onsi/ginkgo/v2 v2.21.0
Expand Down
33 changes: 1 addition & 32 deletions pkg/cloudprovider/vsphereparavirtual/cloud.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"fmt"
"io"

v1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

Expand Down Expand Up @@ -134,14 +133,12 @@ func (cp *VSphereParavirtual) Initialize(clientBuilder cloudprovider.ControllerC
klog.Fatalf("Failed to get cluster namespace: %v", err)
}

routes, err := NewRoutes(clusterNS, kcfg, *cp.ownerReference, vpcModeEnabled)
routes, err := NewRoutes(clusterNS, kcfg, *cp.ownerReference, vpcModeEnabled, cp.informMgr.GetNodeLister())
if err != nil {
klog.Errorf("Failed to init Route: %v", err)
}
cp.routes = routes

cp.informMgr.AddNodeListener(cp.nodeAdded, cp.nodeDeleted, nil)

lb, err := NewLoadBalancer(clusterNS, kcfg, cp.ownerReference)
if err != nil {
klog.Errorf("Failed to init LoadBalancer: %v", err)
Expand Down Expand Up @@ -226,31 +223,3 @@ func (cp *VSphereParavirtual) ProviderName() string {
// HasClusterID reports whether a cluster ID is present. This provider
// unconditionally returns true.
func (cp *VSphereParavirtual) HasClusterID() bool {
return true
}

// nodeAdded is the node-listener callback for newly added Nodes.
//
// Anything that is not a non-nil *v1.Node is logged and ignored. A valid node
// is forwarded to the routes provider, provided one has been set on cp.
func (cp *VSphereParavirtual) nodeAdded(obj interface{}) {
	added, isNode := obj.(*v1.Node)
	if !isNode || added == nil {
		klog.Warningf("nodeAdded: unrecognized object %+v", obj)
		return
	}

	// No routes provider means there is nothing to keep in sync.
	if cp.routes == nil {
		return
	}

	klog.V(6).Infof("adding node: %s", added.Name)
	cp.routes.AddNode(added)
}

// nodeDeleted is the node-listener callback for removed Nodes.
//
// Anything that is not a non-nil *v1.Node is logged and ignored. A valid node
// is handed to the routes provider's DeleteNode, provided a routes provider
// has been set on cp.
func (cp *VSphereParavirtual) nodeDeleted(obj interface{}) {
	removed, isNode := obj.(*v1.Node)
	if !isNode || removed == nil {
		klog.Warningf("nodeDeleted: unrecognized object %+v", obj)
		return
	}

	// No routes provider means there is nothing to clean up.
	if cp.routes == nil {
		return
	}

	klog.V(6).Infof("deleting node: %s", removed.Name)
	cp.routes.DeleteNode(removed)
}
44 changes: 5 additions & 39 deletions pkg/cloudprovider/vsphereparavirtual/route.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ import (
"context"
"fmt"
"net"
"sync"
"time"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
Expand All @@ -39,21 +39,18 @@ import (
// RoutesProvider is the interface definition for Routes functionality.
// It embeds cloudprovider.Routes and adds hooks for node add/delete events.
type RoutesProvider interface {
cloudprovider.Routes
// AddNode registers the given Node with the provider.
AddNode(*v1.Node)
// DeleteNode unregisters the given Node from the provider.
DeleteNode(*v1.Node)
}

// routesProvider is the RoutesProvider implementation in this package.
type routesProvider struct {
// routeManager performs the Route CR operations (e.g. DeleteRouteCR).
routeManager routemanager.RouteManager
// nodeMap holds known Nodes keyed by node name.
nodeMap map[string]*v1.Node
// nodeMapLock guards access to nodeMap.
nodeMapLock sync.RWMutex
// ownerRefs is populated by NewRoutes; its consumers are outside this view.
ownerRefs []metav1.OwnerReference
// nodeLister is used to look Nodes up by name when resolving node IPs.
nodeLister listerv1.NodeLister
}

var _ RoutesProvider = &routesProvider{}

// NewRoutes returns an implementation of RoutesProvider
func NewRoutes(clusterNS string, kcfg *rest.Config, ownerRef metav1.OwnerReference, vpcModeEnabled bool) (RoutesProvider, error) {
func NewRoutes(clusterNS string, kcfg *rest.Config, ownerRef metav1.OwnerReference, vpcModeEnabled bool, nodeLister listerv1.NodeLister) (RoutesProvider, error) {
routeManager, err := routemanager.GetRouteManager(vpcModeEnabled, kcfg, clusterNS)
if err != nil {
return nil, err
Expand All @@ -65,7 +62,7 @@ func NewRoutes(clusterNS string, kcfg *rest.Config, ownerRef metav1.OwnerReferen

return &routesProvider{
routeManager: routeManager,
nodeMap: make(map[string]*v1.Node),
nodeLister: nodeLister,
ownerRefs: ownerRefs,
}, nil
}
Expand Down Expand Up @@ -173,7 +170,7 @@ func (r *routesProvider) DeleteRoute(ctx context.Context, clusterName string, ro
// The order is to choose node internal IP first, then external IP
// Return the first IP address as node IP
func (r *routesProvider) getNodeIPAddress(nodeName string, isIPv4 bool) (string, error) {
node, err := r.getNode(nodeName)
node, err := r.nodeLister.Get(nodeName)
if err != nil {
klog.Errorf("getting node %s failed: %v", nodeName, err)
return "", err
Expand Down Expand Up @@ -208,34 +205,3 @@ func (r *routesProvider) getNodeIPAddress(nodeName string, isIPv4 bool) (string,

return "", fmt.Errorf("node %s does not have the same IP family with podCIDR", nodeName)
}

// AddNode stores node in nodeMap under its name, replacing any entry that is
// already present for that name.
//
// The mutex is released through defer so that a panic while touching the map
// (for example a nil node) cannot leave nodeMapLock held and block every
// later caller.
func (r *routesProvider) AddNode(node *v1.Node) {
	r.nodeMapLock.Lock()
	defer r.nodeMapLock.Unlock()

	r.nodeMap[node.Name] = node
	klog.V(6).Infof("Added node %s into nodeMap", node.Name)
}

// DeleteNode drops node from nodeMap and then asks the route manager to
// delete the Route CR named after the node. A failure to delete the CR is
// logged; it is not returned to the caller.
func (r *routesProvider) DeleteNode(node *v1.Node) {
	r.nodeMapLock.Lock()
	name := node.Name
	delete(r.nodeMap, name)
	klog.V(6).Infof("Deleted node %s from nodeMap", name)
	// Release the lock before calling the route manager so the map is not
	// held locked for the duration of that call.
	r.nodeMapLock.Unlock()

	if err := r.routeManager.DeleteRouteCR(name); err != nil {
		klog.Errorf("failed to delete Route CR for node %s: %v", name, err)
	}
}

// getNode returns the Node stored in nodeMap under nodeName.
//
// It returns an error when no entry exists for nodeName or when the stored
// entry is nil. Only a read lock is taken: the map is not modified here, so
// concurrent lookups do not need to serialize behind one another.
func (r *routesProvider) getNode(nodeName string) (*v1.Node, error) {
	r.nodeMapLock.RLock()
	defer r.nodeMapLock.RUnlock()

	// A missing key yields a nil *v1.Node, so one lookup covers both cases.
	if node := r.nodeMap[nodeName]; node != nil {
		return node, nil
	}
	return nil, fmt.Errorf("node %s not found", nodeName)
}
68 changes: 19 additions & 49 deletions pkg/cloudprovider/vsphereparavirtual/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,15 @@ import (
"testing"

"github.com/stretchr/testify/assert"
k8sinformers "k8s.io/client-go/informers"
informerv1 "k8s.io/client-go/informers/core/v1"
k8sfake "k8s.io/client-go/kubernetes/fake"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"

t1networkingapis "k8s.io/cloud-provider-vsphere/pkg/cloudprovider/vsphereparavirtual/apis/nsxnetworking/v1alpha1"
faket1networkingclients "k8s.io/cloud-provider-vsphere/pkg/cloudprovider/vsphereparavirtual/client/clientset/versioned/fake"
"k8s.io/cloud-provider-vsphere/pkg/cloudprovider/vsphereparavirtual/routemanager/helper"
Expand All @@ -41,20 +45,22 @@ const (
testNameHint = "62d347a4-1b70-435e-b92a-9a61453843ee"
)

func initRouteTest() (*routesProvider, *util.FakeRouteSetClientWrapper, *faket1networkingclients.Clientset) {
func initRouteTest() (*routesProvider, *util.FakeRouteSetClientWrapper, *faket1networkingclients.Clientset, informerv1.NodeInformer) {
// create the fake client
// test with non-vpc mode
fc := faket1networkingclients.NewSimpleClientset()
fcw := util.NewFakeRouteSetClientWrapper(fc)

informer := k8sinformers.NewSharedInformerFactory(k8sfake.NewClientset(), 0)

routeManager, _ := routeset.NewRouteManagerWithClients(fc, testClusterNameSpace)

routesProvider := &routesProvider{
routeManager: routeManager,
nodeMap: make(map[string]*v1.Node),
nodeLister: informer.Core().V1().Nodes().Lister(),
ownerRefs: []metav1.OwnerReference{},
}
return routesProvider, fcw, fc
return routesProvider, fcw, fc, informer.Core().V1().Nodes()
}

func buildFakeNode(nodeName string) *v1.Node {
Expand Down Expand Up @@ -136,22 +142,19 @@ func createFakeRouteSetCR(fc *faket1networkingclients.Clientset, clusterName str
}

func TestListRoutes(t *testing.T) {
r, _, fc := initRouteTest()
r, _, fc, _ := initRouteTest()

// create 3 fake Routes
fakeNode1 := buildFakeNode("fakeNode1")
r.AddNode(fakeNode1)
fakeRouteInfo1 := buildFakeRouteInfo(testClustername, testNameHint, "100.96.0.0/24", "fakeNode1", testNodeIP)
routeSet1, err := r.routeManager.CreateRouteCR(context.TODO(), fakeRouteInfo1)
assert.NoError(t, err)
assert.NotEqual(t, routeSet1, nil)
fakeNode2 := buildFakeNode("fakeNode2")
r.AddNode(fakeNode2)
routeSet2, err := createFakeRouteSetCR(fc, testClustername, testNameHint, "fakeNode2", "100.96.1.0/24", testNodeIP)
assert.NoError(t, err)
assert.NotEqual(t, routeSet2, nil)
fakeNode3 := buildFakeNode("fakeNode3")
r.AddNode(fakeNode3)
fakeRouteInfo3 := buildFakeRouteInfo("another-cluster-name", testNameHint, "100.96.2.0/24", "fakeNode3", testNodeIP)
routeSet3, err := r.routeManager.CreateRouteCR(context.TODO(), fakeRouteInfo3)
assert.NoError(t, err)
Expand All @@ -171,7 +174,7 @@ func TestListRoutes(t *testing.T) {
}

func TestListRoutesFailed(t *testing.T) {
r, fcw, _ := initRouteTest()
r, fcw, _, _ := initRouteTest()
fcw.ListFunc = func(ctx context.Context, opts metav1.ListOptions) (result *t1networkingapis.RouteSetList, err error) {
return nil, errors.New(helper.ErrListRouteCR.Error())
}
Expand All @@ -184,9 +187,10 @@ func TestListRoutesFailed(t *testing.T) {
}

func TestCreateRouteFailed(t *testing.T) {
r, _, _ := initRouteTest()
r, _, _, i := initRouteTest()
node := buildFakeNode(testNodeName)
r.nodeMap[testNodeName] = node
_ = i.Informer().GetIndexer().Add(node)

route := cloudprovider.Route{
Name: helper.GetRouteName(testNodeName, testCIDR, testClustername),
TargetNode: types.NodeName(testNodeName),
Expand All @@ -204,9 +208,9 @@ func TestCreateRouteFailed(t *testing.T) {
}

func TestCreateRouteFailedWithAlreadyExisting(t *testing.T) {
r, _, fc := initRouteTest()
r, _, fc, i := initRouteTest()
node := buildFakeNode(testNodeName)
r.nodeMap[testNodeName] = node
_ = i.Informer().GetIndexer().Add(node)
routeSetCR, err := createFakeRouteSetCR(fc, testClustername, testNameHint, testNodeName, testCIDR, testNodeIP)
assert.NoError(t, err)
assert.NotEqual(t, routeSetCR, nil)
Expand All @@ -228,9 +232,7 @@ func TestCreateRouteFailedWithAlreadyExisting(t *testing.T) {
}

func TestDeleteRoute(t *testing.T) {
r, _, _ := initRouteTest()
node := buildFakeNode(testNodeName)
r.nodeMap[testNodeName] = node
r, _, _, _ := initRouteTest()
route := cloudprovider.Route{
Name: helper.GetRouteName(testNodeName, testCIDR, testClustername),
TargetNode: types.NodeName(testNodeName),
Expand All @@ -247,9 +249,7 @@ func TestDeleteRoute(t *testing.T) {
}

func TestDeleteRouteFailed(t *testing.T) {
r, fcw, _ := initRouteTest()
node := buildFakeNode(testNodeName)
r.nodeMap[testNodeName] = node
r, fcw, _, _ := initRouteTest()
route := cloudprovider.Route{
Name: helper.GetRouteName(testNodeName, testCIDR, testClustername),
TargetNode: types.NodeName(testNodeName),
Expand All @@ -270,38 +270,9 @@ func TestDeleteRouteFailed(t *testing.T) {
}
}

// TestAddNode verifies that AddNode stores the node in nodeMap under the
// node's name.
func TestAddNode(t *testing.T) {
	provider := &routesProvider{nodeMap: map[string]*v1.Node{}}
	want := buildFakeNode(testNodeName)

	provider.AddNode(want)

	assert.Equal(t, want, provider.nodeMap[testNodeName])
}

// TestDeleteNode verifies that DeleteNode removes a previously stored node
// from nodeMap.
func TestDeleteNode(t *testing.T) {
	provider, _, _ := initRouteTest()
	fake := buildFakeNode(testNodeName)
	provider.nodeMap[testNodeName] = fake

	provider.DeleteNode(fake)

	// After deletion the lookup must yield the zero value for *v1.Node.
	var none *v1.Node
	assert.Equal(t, none, provider.nodeMap[testNodeName])
}

func TestGetNode(t *testing.T) {
r := &routesProvider{
nodeMap: make(map[string]*v1.Node),
}
node := buildFakeNode(testNodeName)
r.nodeMap[testNodeName] = node
nodeInMap, err := r.getNode(testNodeName)
assert.Equal(t, node, nodeInMap)
assert.NoError(t, err)
}

func TestCheckStaticRouteRealizedState(t *testing.T) {
r, _, fc := initRouteTest()
r, _, fc, _ := initRouteTest()
node := buildFakeNode(testNodeName)
r.nodeMap[testNodeName] = node
fakeRouteInfo := buildFakeRouteInfo(testClustername, testNameHint, testCIDR, testNodeName, testNodeIP)
routeSet1, err := r.routeManager.CreateRouteCR(context.TODO(), fakeRouteInfo)
assert.NoError(t, err)
Expand All @@ -312,7 +283,6 @@ func TestCheckStaticRouteRealizedState(t *testing.T) {
assert.Equal(t, "timed out waiting for static route fakeNode1", err.Error())

fakeNode2 := buildFakeNode("fakeNode2")
r.AddNode(fakeNode2)
routeSet2, err := createFakeRouteSetCR(fc, testClustername, testNameHint, "fakeNode2", "100.96.1.0/24", testNodeIP)
assert.NoError(t, err)
assert.NotEqual(t, routeSet2, nil)
Expand Down

0 comments on commit 5866fd9

Please sign in to comment.