Skip to content

Commit

Permalink
fix: some adds should turn into updates. add logs
Browse files Browse the repository at this point in the history
Signed-off-by: Hunter Gregory <42728408+huntergregory@users.noreply.github.com>
  • Loading branch information
huntergregory committed Aug 8, 2024
1 parent 0b08f01 commit abba400
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 16 deletions.
15 changes: 9 additions & 6 deletions npm/pkg/dataplane/ipsets/ipsetmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type IPSetManager struct {
dirtyCache dirtyCacheInterface
ioShim *common.IOShim
sync.RWMutex

// for Windows only
wp *workerPool
threadsStarted int
networkID string
Expand Down Expand Up @@ -85,12 +87,13 @@ func NewIPSetManager(iMgrCfg *IPSetManagerCfg, ioShim *common.IOShim) *IPSetMana
}

return &IPSetManager{
iMgrCfg: iMgrCfg,
emptySet: nil, // will be set if needed in calls to AddToLists
setMap: make(map[string]*IPSet),
dirtyCache: newDirtyCache(),
ioShim: ioShim,
wp: newWorkerPool(iMgrCfg.SetPolicyThreads),
iMgrCfg: iMgrCfg,
emptySet: nil, // will be set if needed in calls to AddToLists
setMap: make(map[string]*IPSet),
dirtyCache: newDirtyCache(),
ioShim: ioShim,
wp: newWorkerPool(iMgrCfg.SetPolicyThreads),
threadsStarted: 0,
}
}

Expand Down
74 changes: 64 additions & 10 deletions npm/pkg/dataplane/ipsets/ipsetmanager_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,9 +213,14 @@ func (iMgr *IPSetManager) applyIPSets() error {
// ignore the set and keep it in the dirtyCache

iMgr.dirtyCache.resetAddOrUpdateCache()
iMgr.dirtyCache.reset()
iMgr.clearDirtyCache()

blockers := iMgr.getBlockers(setPolicyBuilder)
blockers, addsThatShouldActuallyBeUpdates := iMgr.getBlockers(setPolicyBuilder)

for _, set := range addsThatShouldActuallyBeUpdates {
setPolicyBuilder.toUpdateSets[set.Name] = set
delete(setPolicyBuilder.toAddSets, set.Name)
}

createThread := iMgr.newThread(hcn.RequestTypeAdd, setPolicyBuilder.toAddSets)
updateThread := iMgr.newThread(hcn.RequestTypeUpdate, setPolicyBuilder.toUpdateSets)
Expand Down Expand Up @@ -260,49 +265,95 @@ func (iMgr *IPSetManager) applyIPSets() error {

klog.Info("[IPSetManager Windows] Done applying IPSets.")

iMgr.dirtyCache.reset()
iMgr.clearDirtyCache()

return nil
}

func (iMgr *IPSetManager) getBlockers(setPolicyBuilder *networkPolicyBuilder) []*sync.WaitGroup {
func (iMgr *IPSetManager) getBlockers(setPolicyBuilder *networkPolicyBuilder) ([]*sync.WaitGroup, []*hcn.SetPolicySetting) {
iMgr.wp.Lock()
defer iMgr.wp.Unlock()

blockers := make([]*sync.WaitGroup, 0)
addsThatShouldActuallyBeUpdates := make([]*hcn.SetPolicySetting, 0)

for setName := range setPolicyBuilder.toAddSets {
for setName, set := range setPolicyBuilder.toAddSets {
for _, th := range iMgr.wp.threads {
switch th.op {
case hcn.RequestTypeAdd:
addsThatShouldActuallyBeUpdates = append(addsThatShouldActuallyBeUpdates, set)
klog.Infof("[IPSetManager Windows] moving set from add to update: %s", setName)
case hcn.RequestTypeUpdate:
klog.Warning("[IPSetManager Windows] warning: found a thread with an update operation for a set slated to be added")
}

if _, ok := th.ipsets[setName]; ok {
blockers = append(blockers, th.wg)
break
}

if th.op == hcn.RequestTypeUpdate {
continue
}

// if the set being added is a nested set, check if any of the sets it references are being created/deleted
if set.Type == SetPolicyTypeNestedIPSet {
for _, memberSet := range strings.Split(set.Values, ",") {
if _, ok := th.ipsets[memberSet]; ok {
if th.op == hcn.RequestTypeRemove {
klog.Warning("[IPSetManager Windows] warning: found a thread with a remove operation for a child of a nested ipset slated to be added")
}

blockers = append(blockers, th.wg)
}
}
}
}
}

for setName := range setPolicyBuilder.toUpdateSets {
for setName, set := range setPolicyBuilder.toUpdateSets {
for _, th := range iMgr.wp.threads {
if th.op == hcn.RequestTypeUpdate {
continue
}

if th.op == hcn.RequestTypeRemove {
klog.Warning("[IPSetManager Windows] warning: found a thread with a remove operation for a set slated to be updated")
}

if _, ok := th.ipsets[setName]; ok {
blockers = append(blockers, th.wg)
break
}

// if the set being added is a nested set, check if any of the sets it references are being created/deleted
if set.Type == SetPolicyTypeNestedIPSet {
for _, memberSet := range strings.Split(set.Values, ",") {
if _, ok := th.ipsets[memberSet]; ok {
if th.op == hcn.RequestTypeRemove {
klog.Warning("[IPSetManager Windows] warning: found a thread with a remove operation for a child of a nested ipset slated to be updated")
}

blockers = append(blockers, th.wg)
}
}
}
}
}

for setName := range setPolicyBuilder.toDeleteSets {
for _, th := range iMgr.wp.threads {
if th.op == hcn.RequestTypeRemove {
klog.Warning("[IPSetManager Windows] warning: found a thread with a delete operation for a set slated to be deleted")
}

if _, ok := th.ipsets[setName]; ok {
blockers = append(blockers, th.wg)
break
}

// TODO for a first-level set, would also need to block on deleting/flushing nested ipsets which reference it
}
}

return blockers
return blockers, addsThatShouldActuallyBeUpdates
}

func (iMgr *IPSetManager) newThread(op hcn.RequestType, ipsets map[string]*hcn.SetPolicySetting) *thread {
Expand Down Expand Up @@ -336,8 +387,11 @@ func (iMgr *IPSetManager) submit(th *thread) {
th.wg.Done()
delete(wp.threads, th.id)
wp.Unlock()

klog.Infof("[IPSetManager Windows] workerpool finished thread. %s operation. thread: %d", th.op, th.id)
}()

klog.Infof("[IPSetManager Windows] workerpool starting thread. %s operation. thread: %d", th.op, th.id)
err := iMgr.modifySetPolicies(th.op, th.ipsets)
if err != nil {
klog.Errorf("[IPSetManager Windows] workerpool failed to run %s operation with error %s", th.op, err.Error())
Expand Down

0 comments on commit abba400

Please sign in to comment.