Skip to content
This repository has been archived by the owner on Aug 30, 2024. It is now read-only.

Commit

Permalink
Merge pull request kubernetes#59954 from msau42/index-sc
Browse files Browse the repository at this point in the history
Automatic merge from submit-queue (batch tested with PRs 57700, 59954). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Index PVs by StorageClass in assume cache

**What this PR does / why we need it**:
Performance optimization for delayed binding in the scheduler to only search for PVs with a matching StorageClass name.  This means that if you prebind the PV to a PVC, the PV must have a matching StorageClass name.  This behavior is different from when you prebind with immediate binding, which doesn't care about StorageClass.

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Fixes kubernetes#56102

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
  • Loading branch information
Kubernetes Submit Queue authored Feb 16, 2018
2 parents f9c3a0a + 5271edd commit d594a13
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 35 deletions.
55 changes: 44 additions & 11 deletions pkg/controller/volume/persistentvolume/scheduler_assume_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ type AssumeCache interface {
Get(objName string) (interface{}, error)

// List all the objects in the cache
List() []interface{}
List(indexObj interface{}) []interface{}
}

type errWrongType struct {
Expand Down Expand Up @@ -89,7 +89,11 @@ type assumeCache struct {
description string

// Stores objInfo pointers
store cache.Store
store cache.Indexer

// Index function for object
indexFunc cache.IndexFunc
indexName string
}

type objInfo struct {
Expand All @@ -111,9 +115,21 @@ func objInfoKeyFunc(obj interface{}) (string, error) {
return objInfo.name, nil
}

func NewAssumeCache(informer cache.SharedIndexInformer, description string) *assumeCache {
// TODO: index by storageclass
c := &assumeCache{store: cache.NewStore(objInfoKeyFunc), description: description}
func (c *assumeCache) objInfoIndexFunc(obj interface{}) ([]string, error) {
objInfo, ok := obj.(*objInfo)
if !ok {
return []string{""}, &errWrongType{"objInfo", obj}
}
return c.indexFunc(objInfo.latestObj)
}

func NewAssumeCache(informer cache.SharedIndexInformer, description, indexName string, indexFunc cache.IndexFunc) *assumeCache {
c := &assumeCache{
description: description,
indexFunc: indexFunc,
indexName: indexName,
}
c.store = cache.NewIndexer(objInfoKeyFunc, cache.Indexers{indexName: c.objInfoIndexFunc})

// Unit tests don't use informers
if informer != nil {
Expand Down Expand Up @@ -211,12 +227,18 @@ func (c *assumeCache) Get(objName string) (interface{}, error) {
return objInfo.latestObj, nil
}

func (c *assumeCache) List() []interface{} {
func (c *assumeCache) List(indexObj interface{}) []interface{} {
c.mutex.Lock()
defer c.mutex.Unlock()

allObjs := []interface{}{}
for _, obj := range c.store.List() {
objs, err := c.store.Index(c.indexName, &objInfo{latestObj: indexObj})
if err != nil {
glog.Errorf("list index error: %v", err)
return nil
}

for _, obj := range objs {
objInfo, ok := obj.(*objInfo)
if !ok {
glog.Errorf("list error: %v", &errWrongType{"objInfo", obj})
Expand Down Expand Up @@ -280,15 +302,22 @@ type PVAssumeCache interface {
AssumeCache

GetPV(pvName string) (*v1.PersistentVolume, error)
ListPVs() []*v1.PersistentVolume
ListPVs(storageClassName string) []*v1.PersistentVolume
}

type pvAssumeCache struct {
*assumeCache
}

func pvStorageClassIndexFunc(obj interface{}) ([]string, error) {
if pv, ok := obj.(*v1.PersistentVolume); ok {
return []string{pv.Spec.StorageClassName}, nil
}
return []string{""}, fmt.Errorf("object is not a v1.PersistentVolume: %v", obj)
}

func NewPVAssumeCache(informer cache.SharedIndexInformer) PVAssumeCache {
return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume")}
return &pvAssumeCache{assumeCache: NewAssumeCache(informer, "v1.PersistentVolume", "storageclass", pvStorageClassIndexFunc)}
}

func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
Expand All @@ -304,8 +333,12 @@ func (c *pvAssumeCache) GetPV(pvName string) (*v1.PersistentVolume, error) {
return pv, nil
}

func (c *pvAssumeCache) ListPVs() []*v1.PersistentVolume {
objs := c.List()
func (c *pvAssumeCache) ListPVs(storageClassName string) []*v1.PersistentVolume {
objs := c.List(&v1.PersistentVolume{
Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClassName,
},
})
pvs := []*v1.PersistentVolume{}
for _, obj := range objs {
pv, ok := obj.(*v1.PersistentVolume)
Expand Down
105 changes: 82 additions & 23 deletions pkg/controller/volume/persistentvolume/scheduler_assume_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,16 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func makePV(name, version string) *v1.PersistentVolume {
return &v1.PersistentVolume{ObjectMeta: metav1.ObjectMeta{Name: name, ResourceVersion: version}}
func makePV(name, version, storageClass string) *v1.PersistentVolume {
return &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: name,
ResourceVersion: version,
},
Spec: v1.PersistentVolumeSpec{
StorageClassName: storageClass,
},
}
}

func TestAssumePV(t *testing.T) {
Expand All @@ -35,33 +43,38 @@ func TestAssumePV(t *testing.T) {
shouldSucceed bool
}{
"success-same-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "5"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "5", ""),
shouldSucceed: true,
},
"success-storageclass-same-version": {
oldPV: makePV("pv1", "5", "class1"),
newPV: makePV("pv1", "5", "class1"),
shouldSucceed: true,
},
"success-new-higher-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "6"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "6", ""),
shouldSucceed: true,
},
"fail-old-not-found": {
oldPV: makePV("pv2", "5"),
newPV: makePV("pv1", "5"),
oldPV: makePV("pv2", "5", ""),
newPV: makePV("pv1", "5", ""),
shouldSucceed: false,
},
"fail-new-lower-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "4"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "4", ""),
shouldSucceed: false,
},
"fail-new-bad-version": {
oldPV: makePV("pv1", "5"),
newPV: makePV("pv1", "a"),
oldPV: makePV("pv1", "5", ""),
newPV: makePV("pv1", "a", ""),
shouldSucceed: false,
},
"fail-old-bad-version": {
oldPV: makePV("pv1", "a"),
newPV: makePV("pv1", "5"),
oldPV: makePV("pv1", "a", ""),
newPV: makePV("pv1", "5", ""),
shouldSucceed: false,
},
}
Expand Down Expand Up @@ -107,8 +120,8 @@ func TestRestorePV(t *testing.T) {
t.Fatalf("Failed to get internal cache")
}

oldPV := makePV("pv1", "5")
newPV := makePV("pv1", "5")
oldPV := makePV("pv1", "5", "")
newPV := makePV("pv1", "5", "")

// Restore PV that doesn't exist
cache.Restore("nothing")
Expand Down Expand Up @@ -159,33 +172,79 @@ func TestBasicPVCache(t *testing.T) {
// Add a bunch of PVs
pvs := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "1")
pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "")
pvs[pv.Name] = pv
internal_cache.add(pv)
}

// List them
verifyListPVs(t, cache, pvs)
verifyListPVs(t, cache, pvs, "")

// Update a PV
updatedPV := makePV("test-pv3", "2")
updatedPV := makePV("test-pv3", "2", "")
pvs[updatedPV.Name] = updatedPV
internal_cache.update(nil, updatedPV)

// List them
verifyListPVs(t, cache, pvs)
verifyListPVs(t, cache, pvs, "")

// Delete a PV
deletedPV := pvs["test-pv7"]
delete(pvs, deletedPV.Name)
internal_cache.delete(deletedPV)

// List them
verifyListPVs(t, cache, pvs)
verifyListPVs(t, cache, pvs, "")
}

func TestPVCacheWithStorageClasses(t *testing.T) {
cache := NewPVAssumeCache(nil)
internal_cache, ok := cache.(*pvAssumeCache)
if !ok {
t.Fatalf("Failed to get internal cache")
}

// Add a bunch of PVs
pvs1 := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test-pv%v", i), "1", "class1")
pvs1[pv.Name] = pv
internal_cache.add(pv)
}

// Add a bunch of PVs
pvs2 := map[string]*v1.PersistentVolume{}
for i := 0; i < 10; i++ {
pv := makePV(fmt.Sprintf("test2-pv%v", i), "1", "class2")
pvs2[pv.Name] = pv
internal_cache.add(pv)
}

// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")

// Update a PV
updatedPV := makePV("test-pv3", "2", "class1")
pvs1[updatedPV.Name] = updatedPV
internal_cache.update(nil, updatedPV)

// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")

// Delete a PV
deletedPV := pvs1["test-pv7"]
delete(pvs1, deletedPV.Name)
internal_cache.delete(deletedPV)

// List them
verifyListPVs(t, cache, pvs1, "class1")
verifyListPVs(t, cache, pvs2, "class2")
}

func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume) {
pvList := cache.ListPVs()
func verifyListPVs(t *testing.T, cache PVAssumeCache, expectedPVs map[string]*v1.PersistentVolume, storageClassName string) {
pvList := cache.ListPVs(storageClassName)
if len(pvList) != len(expectedPVs) {
t.Errorf("ListPVs() returned %v PVs, expected %v", len(pvList), len(expectedPVs))
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/controller/volume/persistentvolume/scheduler_binder.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,10 +350,17 @@ func (b *volumeBinder) findMatchingVolumes(pod *v1.Pod, claimsToBind []*bindingI
// Sort all the claims by increasing size request to get the smallest fits
sort.Sort(byPVCSize(claimsToBind))

allPVs := b.pvCache.ListPVs()
chosenPVs := map[string]*v1.PersistentVolume{}

for _, bindingInfo := range claimsToBind {
// Get storage class name from each PVC
storageClassName := ""
storageClass := bindingInfo.pvc.Spec.StorageClassName
if storageClass != nil {
storageClassName = *storageClass
}
allPVs := b.pvCache.ListPVs(storageClassName)

// Find a matching PV
bindingInfo.pv, err = findMatchingVolume(bindingInfo.pvc, allPVs, node, chosenPVs, true)
if err != nil {
Expand Down

0 comments on commit d594a13

Please sign in to comment.