Skip to content

Commit

Permalink
feat: [WIN-NPM] parallelize HNS SetPolicy calls
Browse files Browse the repository at this point in the history
Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com>
  • Loading branch information
huntergregory committed Aug 8, 2024
1 parent ae690d2 commit 15205ad
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 8 deletions.
4 changes: 4 additions & 0 deletions npm/cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,10 @@ func start(config npmconfig.Config, flags npmconfig.Flags) error {
npmV2DataplaneCfg.IPSetMode = ipsets.ApplyAllIPSets
}

if config.SetPolicyThreads == 0 {
config.SetPolicyThreads = npmconfig.DefaultConfig.SetPolicyThreads
}

var nodeIP string
if util.IsWindowsDP() {
nodeIP, err = util.NodeIP()
Expand Down
10 changes: 8 additions & 2 deletions npm/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ const (
defaultMaxBatchedACLsPerPod = 30
defaultMaxPendingNetPols = 100
defaultNetPolInterval = 500
defaultSetPolicyThreads = 10
defaultListeningPort = 10091
defaultGrpcPort = 10092
defaultGrpcServicePort = 9002
Expand Down Expand Up @@ -40,6 +41,8 @@ var DefaultConfig = Config{
MaxPendingNetPols: defaultMaxPendingNetPols,
NetPolInvervalInMilliseconds: defaultNetPolInterval,

SetPolicyThreads: defaultSetPolicyThreads,

Toggles: Toggles{
EnablePrometheusMetrics: true,
EnablePprof: true,
Expand All @@ -50,7 +53,8 @@ var DefaultConfig = Config{
// ApplyInBackground is currently used in Windows to apply the following in background: IPSets and NetPols for new/updated Pods
ApplyInBackground: true,
// NetPolInBackground is currently used in Linux to apply NetPol controller Add events in the background
NetPolInBackground: true,
NetPolInBackground: true,
ParallelizeSetPolicyCalls: true,
},
}

Expand Down Expand Up @@ -80,6 +84,7 @@ type Config struct {
MaxBatchedACLsPerPod int `json:"MaxBatchedACLsPerPod,omitempty"`
MaxPendingNetPols int `json:"MaxPendingNetPols,omitempty"`
NetPolInvervalInMilliseconds int `json:"NetPolInvervalInMilliseconds,omitempty"`
SetPolicyThreads int `json:"SetPolicyThreads,omitempty"`
Toggles Toggles `json:"Toggles,omitempty"`
}

Expand All @@ -93,7 +98,8 @@ type Toggles struct {
// ApplyInBackground applies for Windows only
ApplyInBackground bool
// NetPolInBackground
NetPolInBackground bool
NetPolInBackground bool
ParallelizeSetPolicyCalls bool
}

type Flags struct {
Expand Down
1 change: 1 addition & 0 deletions npm/pkg/dataplane/dataplane_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (dp *DataPlane) getNetworkInfo() error {
for ; true; <-ticker.C {
err = dp.setNetworkIDByName(dp.NetworkName)
if err == nil || !isNetworkNotFoundErr(err) {
dp.ipsetMgr.SetNetworkID(dp.networkID)
return err
}
retryNumber++
Expand Down
8 changes: 8 additions & 0 deletions npm/pkg/dataplane/ipsets/ipsetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type IPSetManager struct {
dirtyCache dirtyCacheInterface
ioShim *common.IOShim
sync.RWMutex
wp *workerPool
threadsStarted int

Check failure on line 56 in npm/pkg/dataplane/ipsets/ipsetmanager.go

View workflow job for this annotation

GitHub Actions / Lint (1.21.x, ubuntu-latest)

field `threadsStarted` is unused (unused)
networkID string
}

type IPSetManagerCfg struct {
Expand All @@ -71,9 +74,14 @@ func NewIPSetManager(iMgrCfg *IPSetManagerCfg, ioShim *common.IOShim) *IPSetMana
setMap: make(map[string]*IPSet),
dirtyCache: newDirtyCache(),
ioShim: ioShim,
wp: newWorkerPool(),
}
}

// SetNetworkID records the ID of the HNS network on the manager.
// The Windows modifySetPolicies reads this field to build the HostComputeNetwork
// it passes to ModifyNetworkSettings.
// NOTE(review): the field is written without taking the manager's lock — confirm
// that callers set it before any concurrent SetPolicy work starts.
func (iMgr *IPSetManager) SetNetworkID(networkID string) {
iMgr.networkID = networkID
}

/*
Reconcile removes empty/unreferenced sets from the cache.
For ApplyAllIPSets mode, those sets are added to the toDeleteCache.
Expand Down
122 changes: 116 additions & 6 deletions npm/pkg/dataplane/ipsets/ipsetmanager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"strings"
"sync"

"github.com/Azure/azure-container-networking/npm/metrics"
"github.com/Azure/azure-container-networking/npm/util"
Expand All @@ -19,6 +20,8 @@ const (
SetPolicyTypeNestedIPSet hcn.SetPolicyType = "NESTEDIPSET"
resetIPSetsTrue = true
donotResetIPSets = false

parallelizeSetPolicyCalls = true
)

var errUnsupportedNetwork = errors.New("only 'azure' and 'Calico' networks are supported")
Expand Down Expand Up @@ -186,7 +189,7 @@ func (iMgr *IPSetManager) resetIPSets() error {
}

klog.Infof("[IPSetManager Windows] Deleting %d Set Policies", len(toDeleteSets))
err = iMgr.modifySetPolicies(network, hcn.RequestTypeRemove, toDeleteSets)
err = iMgr.modifySetPolicies(hcn.RequestTypeRemove, toDeleteSets)
if err != nil {
klog.Infof("[IPSetManager Windows] Update set policies failed with error %s", err.Error())
return err
Expand All @@ -206,16 +209,41 @@ func (iMgr *IPSetManager) applyIPSets() error {
return err
}

if parallelizeSetPolicyCalls {
// if a set is being updated, we can update it in parallel
// otherwise, hold off on operations to a set until the other operation is complete
// ignore the set and keep it in the dirtyCache

iMgr.dirtyCache.resetAddOrUpdateCache()
iMgr.dirtyCache.reset()

blockers := iMgr.getBlockers(setPolicyBuilder)

createThread := iMgr.newThread(hcn.RequestTypeAdd, setPolicyBuilder.toAddSets)
updateThread := iMgr.newThread(hcn.RequestTypeUpdate, setPolicyBuilder.toUpdateSets)
deleteThread := iMgr.newThread(hcn.RequestTypeRemove, setPolicyBuilder.toDeleteSets)

for _, b := range blockers {
b.Wait()
}

iMgr.submit(createThread)
iMgr.submit(updateThread)
iMgr.submit(deleteThread)

return nil
}

if len(setPolicyBuilder.toAddSets) > 0 {
err = iMgr.modifySetPolicies(network, hcn.RequestTypeAdd, setPolicyBuilder.toAddSets)
err = iMgr.modifySetPolicies(hcn.RequestTypeAdd, setPolicyBuilder.toAddSets)
if err != nil {
klog.Infof("[IPSetManager Windows] Add set policies failed with error %s", err.Error())
return err
}
}

if len(setPolicyBuilder.toUpdateSets) > 0 {
err = iMgr.modifySetPolicies(network, hcn.RequestTypeUpdate, setPolicyBuilder.toUpdateSets)
err = iMgr.modifySetPolicies(hcn.RequestTypeUpdate, setPolicyBuilder.toUpdateSets)
if err != nil {
klog.Infof("[IPSetManager Windows] Update set policies failed with error %s", err.Error())
return err
Expand All @@ -225,7 +253,7 @@ func (iMgr *IPSetManager) applyIPSets() error {
iMgr.dirtyCache.resetAddOrUpdateCache()

if len(setPolicyBuilder.toDeleteSets) > 0 {
err = iMgr.modifySetPolicies(network, hcn.RequestTypeRemove, setPolicyBuilder.toDeleteSets)
err = iMgr.modifySetPolicies(hcn.RequestTypeRemove, setPolicyBuilder.toDeleteSets)
if err != nil {
klog.Infof("[IPSetManager Windows] Delete set policies failed with error %s", err.Error())
return err
Expand All @@ -234,11 +262,91 @@ func (iMgr *IPSetManager) applyIPSets() error {

klog.Info("[IPSetManager Windows] Done applying IPSets.")

iMgr.clearDirtyCache()
iMgr.dirtyCache.reset()

return nil
}

// getBlockers returns the WaitGroups of every in-flight worker-pool thread that
// conflicts with the operations described by setPolicyBuilder. The caller is
// expected to Wait on each returned WaitGroup before submitting the new work.
//
// Conflict rules:
//   - adding or deleting a set conflicts with any in-flight thread that contains that set;
//   - updating a set conflicts only with in-flight non-update threads that contain
//     that set, so updates to the same set may overlap.
//
// All conflicting threads are reported (not just the first one found per set),
// because several update threads may hold the same set at once and a later
// add/delete must wait for all of them. Each thread's WaitGroup appears at most
// once in the result.
//
// The worker pool lock is held while the in-flight threads are inspected.
func (iMgr *IPSetManager) getBlockers(setPolicyBuilder *networkPolicyBuilder) []*sync.WaitGroup {
	iMgr.wp.Lock()
	defer iMgr.wp.Unlock()

	blockers := make([]*sync.WaitGroup, 0)

	// overlaps reports whether the thread operates on at least one of the given sets.
	overlaps := func(th *thread, sets map[string]*hcn.SetPolicySetting) bool {
		for setName := range sets {
			if _, ok := th.ipsets[setName]; ok {
				return true
			}
		}
		return false
	}

	// Iterating per thread (rather than per set) guarantees each WaitGroup is
	// collected once, no matter how many sets the thread shares with the builder.
	for _, th := range iMgr.wp.threads {
		switch {
		case overlaps(th, setPolicyBuilder.toAddSets),
			overlaps(th, setPolicyBuilder.toDeleteSets):
			blockers = append(blockers, th.wg)
		case th.op != hcn.RequestTypeUpdate && overlaps(th, setPolicyBuilder.toUpdateSets):
			// concurrent updates to the same set are allowed, so only
			// add/remove threads block an update
			blockers = append(blockers, th.wg)
		}
	}

	return blockers
}

// newThread builds a thread describing one SetPolicy operation (op) over the given
// ipsets. It bumps the manager's threadsStarted counter and uses the new value as
// the thread id, and gives the thread its own fresh WaitGroup.
// The thread is only constructed here; it is not registered or started.
// NOTE(review): the counter is incremented without any locking in this function —
// presumably callers already hold the IPSetManager lock; confirm.
func (iMgr *IPSetManager) newThread(op hcn.RequestType, ipsets map[string]*hcn.SetPolicySetting) *thread {
	iMgr.threadsStarted++

	th := new(thread)
	th.id = iMgr.threadsStarted
	th.op = op
	th.wg = new(sync.WaitGroup)
	th.ipsets = ipsets
	return th
}

// submit registers th with the worker pool and runs its SetPolicy operation in a
// new goroutine. Threads with no ipsets are ignored.
//
// The call blocks until a slot in the pool's semaphore is free. Errors from
// modifySetPolicies are logged, not returned, since the work runs asynchronously.
//
// When the goroutine finishes it releases its semaphore slot, marks the thread's
// WaitGroup as done, and removes the thread from the pool's registry.
func (iMgr *IPSetManager) submit(th *thread) {
	if len(th.ipsets) == 0 {
		return
	}

	wp := iMgr.wp

	// Increment the WaitGroup BEFORE the thread becomes visible in wp.threads.
	// Otherwise a caller that obtains th.wg via getBlockers while we are still
	// waiting on the semaphore could call Wait on a zero counter and return
	// immediately, without actually waiting for this operation.
	th.wg.Add(1)

	wp.Lock()
	wp.threads[th.id] = th
	wp.Unlock()

	// block until there is a free thread
	wp.semaphore <- struct{}{}

	go func() {
		defer func() {
			<-wp.semaphore
			wp.Lock()
			th.wg.Done()
			delete(wp.threads, th.id)
			wp.Unlock()
		}()

		if err := iMgr.modifySetPolicies(th.op, th.ipsets); err != nil {
			klog.Errorf("[IPSetManager Windows] workerpool failed to run %s operation with error %s", th.op, err.Error())
		}
	}()
}

// calculateNewSetPolicies takes the existing setPolicies on the network in HNS and the dirty cache, and returns
// a networkPolicyBuilder containing the new setPolicies to be added, updated, and deleted.
// Assumes that the dirty cache is locked (or equivalently, the ipsetmanager itself).
Expand Down Expand Up @@ -327,7 +435,7 @@ func (iMgr *IPSetManager) getHCnNetwork() (*hcn.HostComputeNetwork, error) {
return network, nil
}

func (iMgr *IPSetManager) modifySetPolicies(network *hcn.HostComputeNetwork, operation hcn.RequestType, setPolicies map[string]*hcn.SetPolicySetting) error {
func (iMgr *IPSetManager) modifySetPolicies(operation hcn.RequestType, setPolicies map[string]*hcn.SetPolicySetting) error {
klog.Infof("[IPSetManager Windows] %s operation on set policies is called", operation)
/*
Due to complexities in HNS, we need to do the following:
Expand Down Expand Up @@ -375,6 +483,8 @@ func (iMgr *IPSetManager) modifySetPolicies(network *hcn.HostComputeNetwork, ope
}

timer := metrics.StartNewTimer()
// Assumption: hnsv2wrapper.ModifyNetworkSettings(network, request) -> network.ModifyNetworkSettings(request) only uses the network.Id field, so a network object carrying just the ID is sufficient.
network := &hcn.HostComputeNetwork{Id: iMgr.networkID}
err = iMgr.ioShim.Hns.ModifyNetworkSettings(network, requestMessage)
metrics.RecordSetPolicyLatency(timer, op, isNested)
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions npm/pkg/dataplane/ipsets/workerpool_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ipsets

// workerPool is an empty placeholder type in the Linux build of this package.
type workerPool struct{}

// newWorkerPool returns a pointer to a new, empty workerPool.
func newWorkerPool() *workerPool {
	return new(workerPool)
}
29 changes: 29 additions & 0 deletions npm/pkg/dataplane/ipsets/workerpool_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ipsets

import (
"sync"

"github.com/Microsoft/hcsshim/hcn"
)

// maxThreads is the buffer size of the worker pool's semaphore channel, i.e. the
// maximum number of semaphore slots that can be held at the same time.
// NOTE(review): this is hard-coded; the SetPolicyThreads config value does not
// appear to be wired in here — confirm whether that is intended.
const maxThreads = 10

// thread describes one unit of SetPolicy work tracked by the worker pool.
type thread struct {
// id is the key under which the thread is stored in workerPool.threads.
id int
// op is the HNS request type (e.g. add, update, remove) this thread performs.
op hcn.RequestType
// wg lets other code wait for this thread's operation to finish.
wg *sync.WaitGroup
// ipsets holds the set policies this thread operates on, keyed by set name.
ipsets map[string]*hcn.SetPolicySetting
}

// workerPool keeps a registry of threads and a semaphore used to bound them.
type workerPool struct {
// Mutex is embedded so callers can Lock/Unlock the pool directly;
// it is used to guard access to threads.
sync.Mutex
// semaphore is a buffered channel of empty structs used as a counting semaphore.
semaphore chan struct{}
// threads maps a thread's id to the thread itself.
threads map[int]*thread
}

// newWorkerPool creates a workerPool with an empty thread registry and a
// semaphore channel whose capacity is maxThreads.
func newWorkerPool() *workerPool {
	wp := new(workerPool)
	wp.semaphore = make(chan struct{}, maxThreads)
	wp.threads = map[int]*thread{}
	return wp
}

0 comments on commit 15205ad

Please sign in to comment.