From 5271edd9e2c259c78926581c167446209c1300ec Mon Sep 17 00:00:00 2001 From: Michelle Au Date: Thu, 15 Feb 2018 17:12:32 -0800 Subject: [PATCH] Index PVs by StorageClass in assume cache --- .../scheduler_assume_cache.go | 55 +++++++-- .../scheduler_assume_cache_test.go | 105 ++++++++++++++---- .../persistentvolume/scheduler_binder.go | 9 +- 3 files changed, 134 insertions(+), 35 deletions(-) diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go index 28884004d7cc8..c5217be0e7e40 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go +++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache.go @@ -43,7 +43,7 @@ type AssumeCache interface { Get(objName string) (interface{}, error) // List all the objects in the cache - List() []interface{} + List(indexObj interface{}) []interface{} } type errWrongType struct { @@ -89,7 +89,11 @@ type assumeCache struct { description string // Stores objInfo pointers - store cache.Store + store cache.Indexer + + // Index function for object + indexFunc cache.IndexFunc + indexName string } type objInfo struct { @@ -111,9 +115,21 @@ func objInfoKeyFunc(obj interface{}) (string, error) { return objInfo.name, nil } -func NewAssumeCache(informer cache.SharedIndexInformer, description string) *assumeCache { - // TODO: index by storageclass - c := &assumeCache{store: cache.NewStore(objInfoKeyFunc), description: description} +func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) { + objInfo, ok := obj.(*objInfo) + if !ok { + return []string{""}, &errWrongType{"objInfo", obj} + } + return c.indexFunc(objInfo.latestObj) +} + +func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) *assumeCache { + c := &assumeCache{ + description: description, + indexFunc: indexFunc, + indexName: indexName, + } + c.store = cache.NewIndexer(objInfoKeyFunc, cache.Indexers{indexName: c.objInfoIndexFunc}) // Unit tests don't use informers if informer != nil { @@ -211,12 +227,18 @@ func (c *assumeCache) Get(objName string) (interface{}, error) { return objInfo.latestObj, nil } -func (c *assumeCache) List() []interface{} { +func (c *assumeCache) List(indexObj interface{}) []interface{} { c.mutex.Lock() defer c.mutex.Unlock() allObjs := []interface{}{} - for _, obj := range c.store.List() { + objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj}) + if err != nil { + glog.Errorf("list index error: %v", err) + return nil + } + + for _, obj := range objs { objInfo, ok := obj.(*objInfo) if !ok { glog.Errorf("list error: %v", &errWrongType{"objInfo", obj}) @@ -280,15 +302,22 @@ type PVAssumeCache interface { AssumeCache GetPV(pvName string) (*v1.PersistentVolume, error) - ListPVs() []*v1.PersistentVolume + ListPVs(storageClassName string) []*v1.PersistentVolume } type pvAssumeCache struct { *assumeCache } +func pvStorageClassIndexFunc(obj interface{}) ([]string, error) { + if pv, ok := obj.(*v1.PersistentVolume); ok { + return []string{pv.Spec.StorageClassName}, nil + } + return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj) +} + func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache { - return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume")} + return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)} } func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { @@ -304,8 +333,12 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) { return pv, nil } -func (c *pvAssumeCache) ListPVs() []*v1.PersistentVolume { - objs := c.List() +func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume { + objs := c.List(&v1.PersistentVolume{ + Spec: v1.PersistentVolumeSpec{ + StorageClassName: storageClassName, + }, + }) pvs := []*v1.PersistentVolume{} for _, obj := range objs { pv, ok := obj.(*v1.PersistentVolume) diff --git a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go index 7332c4d474afd..2664e4c41e00d 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go +++ b/pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go @@ -24,8 +24,16 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func makePV(name, version string) *v1.PersistentVolume { - return &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: version}} +func makePV(name, version, storageClass string) *v1.PersistentVolume { + return &v1.PersistentVolume{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + ResourceVersion: version, + }, + Spec: v1.PersistentVolumeSpec{ + StorageClassName: storageClass, + }, + } } func TestAssumePV(t *testing.T) { @@ -35,33 +43,38 @@ func TestAssumePV(t *testing.T) { shouldSucceed bool }{ "success-same-version": { - oldPV: makePV("pv1", "5"), - newPV: makePV("pv1", "5"), + oldPV: makePV("pv1", "5", ""), + newPV: makePV("pv1", "5", ""), + shouldSucceed: true, + }, + "success-storageclass-same-version": { + oldPV: makePV("pv1", "5", "class1"), + newPV: makePV("pv1", "5", "class1"), shouldSucceed: true, }, "success-new-higher-version": { - oldPV: makePV("pv1", "5"), - newPV: makePV("pv1", "6"), + oldPV: makePV("pv1", "5", ""), + newPV: makePV("pv1", "6", ""), shouldSucceed: true, }, "fail-old-not-found": { - oldPV: makePV("pv2", "5"), - newPV: makePV("pv1", "5"), + oldPV: makePV("pv2", "5", ""), + newPV: makePV("pv1", "5", ""), shouldSucceed: false, }, "fail-new-lower-version": { - oldPV: makePV("pv1", "5"), - newPV: makePV("pv1", "4"), + oldPV: makePV("pv1", "5", ""), + newPV: makePV("pv1", "4", ""), shouldSucceed: false, }, "fail-new-bad-version": { - oldPV: makePV("pv1", "5"), - newPV: makePV("pv1", "a"), + oldPV: makePV("pv1", "5", ""), + newPV: makePV("pv1", "a", ""), shouldSucceed: false, }, "fail-old-bad-version": { - oldPV: makePV("pv1", "a"), - newPV: makePV("pv1", "5"), + oldPV: makePV("pv1", "a", ""), + newPV: makePV("pv1", "5", ""), shouldSucceed: false, }, } @@ -107,8 +120,8 @@ func TestRestorePV(t *testing.T) { t.Fatalf("Failed to get internal cache") } - oldPV := makePV("pv1", "5") - newPV := makePV("pv1", "5") + oldPV := makePV("pv1", "5", "") + newPV := makePV("pv1", "5", "") // Restore PV that doesn't exist cache.Restore("nothing") @@ -159,21 +172,21 @@ func TestBasicPVCache(t *testing.T) { // Add a bunch of PVs pvs := map[string]*v1.PersistentVolume{} for i := 0; i < 10; i++ { - pv := makePV(fmt.Sprintf("test-pv%v", i), "1") + pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "") pvs[pv.Name] = pv internal_cache.add(pv) } // List them - verifyListPVs(t, cache, pvs) + verifyListPVs(t, cache, pvs, "") // Update a PV - updatedPV := makePV("test-pv3", "2") + updatedPV := makePV("test-pv3", "2", "") pvs[updatedPV.Name] = updatedPV internal_cache.update(nil, updatedPV) // List them - verifyListPVs(t, cache, pvs) + verifyListPVs(t, cache, pvs, "") // Delete a PV deletedPV := pvs["test-pv7"] @@ -181,11 +194,57 @@ func TestBasicPVCache(t *testing.T) { internal_cache.delete(deletedPV) // List them - verifyListPVs(t, cache, pvs) + verifyListPVs(t, cache, pvs, "") +} + +func TestPVCacheWithStorageClasses(t *testing.T) { + cache := NewPVAssumeCache(nil) + internal_cache, ok := cache.(*pvAssumeCache) + if !ok { + t.Fatalf("Failed to get internal cache") + } + + // Add a bunch of PVs + pvs1 := map[string]*v1.PersistentVolume{} + for i := 0; i < 10; i++ { + pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "class1") + pvs1[pv.Name] = pv + internal_cache.add(pv) + } + + // Add a bunch of PVs + pvs2 := map[string]*v1.PersistentVolume{} + for i := 0; i < 10; i++ { + pv := makePV(fmt.Sprintf("test2-pv%v", i), "1", "class2") + pvs2[pv.Name] = pv + internal_cache.add(pv) + } + + // List them + verifyListPVs(t, cache, pvs1, "class1") + verifyListPVs(t, cache, pvs2, "class2") + + // Update a PV + updatedPV := makePV("test-pv3", "2", "class1") + pvs1[updatedPV.Name] = updatedPV + internal_cache.update(nil, updatedPV) + + // List them + verifyListPVs(t, cache, pvs1, "class1") + verifyListPVs(t, cache, pvs2, "class2") + + // Delete a PV + deletedPV := pvs1["test-pv7"] + delete(pvs1, deletedPV.Name) + internal_cache.delete(deletedPV) + + // List them + verifyListPVs(t, cache, pvs1, "class1") + verifyListPVs(t, cache, pvs2, "class2") } -func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume) { - pvList := cache.ListPVs() +func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) { + pvList := cache.ListPVs(storageClassName) if len(pvList) != len(expectedPVs) { t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs)) } diff --git a/pkg/controller/volume/persistentvolume/scheduler_binder.go b/pkg/controller/volume/persistentvolume/scheduler_binder.go index 7edd1ef459d28..f30f4cd6db5c7 100644 --- a/pkg/controller/volume/persistentvolume/scheduler_binder.go +++ b/pkg/controller/volume/persistentvolume/scheduler_binder.go @@ -350,10 +350,17 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI // Sort all the claims by increasing size request to get the smallest fits sort.Sort(byPVCSize(claimsToBind)) - allPVs := b.pvCache.ListPVs() chosenPVs := map[string]*v1.PersistentVolume{} for _, bindingInfo := range claimsToBind { + // Get storage class name from each PVC + storageClassName := "" + storageClass := bindingInfo.pvc.Spec.StorageClassName + if storageClass != nil { + storageClassName = *storageClass + } + allPVs := b.pvCache.ListPVs(storageClassName) + // Find a matching PV bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true) if err != nil {