Skip to content

Commit

Permalink
NETOBSERV-2077: switch to syncMap to handle concurrency issues (#537)
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Mahmoud <mmahmoud@redhat.com>
(cherry picked from commit 6feda50)
  • Loading branch information
msherif1234 authored Jan 30, 2025
1 parent c2d377a commit 85d5340
Showing 1 changed file with 46 additions and 28 deletions.
74 changes: 46 additions & 28 deletions pkg/ifaces/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ const (
netnsVolume = "/var/run/netns"
)

var log = logrus.WithField("component", "ifaces.Watcher")

// Watcher uses system's netlink to get real-time information events about network interfaces'
// addition or removal.
type Watcher struct {
Expand All @@ -31,7 +33,7 @@ type Watcher struct {
linkSubscriberAt func(ns netns.NsHandle, ch chan<- netlink.LinkUpdate, done <-chan struct{}) error
mutex *sync.Mutex
netnsWatcher *fsnotify.Watcher
nsDone map[string]chan struct{}
nsDone sync.Map
}

func NewWatcher(bufLen int) *Watcher {
Expand All @@ -42,19 +44,19 @@ func NewWatcher(bufLen int) *Watcher {
linkSubscriberAt: netlink.LinkSubscribeAt,
mutex: &sync.Mutex{},
netnsWatcher: &fsnotify.Watcher{},
nsDone: make(map[string]chan struct{}),
nsDone: sync.Map{},
}
}

func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
out := make(chan Event, w.bufLen)
netns, err := getNetNS()
if err != nil {
w.nsDone[""] = make(chan struct{})
w.nsDone.Store("", make(chan struct{}))
go w.sendUpdates(ctx, "", out)
} else {
for _, n := range netns {
w.nsDone[n] = make(chan struct{})
w.nsDone.Store(n, make(chan struct{}))
go w.sendUpdates(ctx, n, out)
}
}
Expand All @@ -66,11 +68,13 @@ func (w *Watcher) Subscribe(ctx context.Context) (<-chan Event, error) {
func (w *Watcher) sendUpdates(ctx context.Context, ns string, out chan Event) {
var netnsHandle netns.NsHandle
var err error
log := logrus.WithField("component", "ifaces.Watcher")
doneChan := w.nsDone[ns]
ch, ok := w.nsDone.Load(ns)
if !ok {
log.WithError(err).Warnf("netns %s not found in netns map", ns)
return
}
doneChan := ch.(chan struct{})
defer func() {
close(doneChan)
delete(w.nsDone, ns)
if netnsHandle.IsOpen() {
netnsHandle.Close()
}
Expand Down Expand Up @@ -181,9 +185,41 @@ func getNetNS() ([]string, error) {
return netns, nil
}

func (w *Watcher) handleEvent(ctx context.Context, event fsnotify.Event, out chan Event) {
ns := filepath.Base(event.Name)

switch {
case event.Op&fsnotify.Create == fsnotify.Create:
log.WithField("netns", ns).Debug("netns create notification")
w.createNamespace(ctx, ns, out)
case event.Op&fsnotify.Remove == fsnotify.Remove:
log.WithField("netns", ns).Debug("netns delete notification")
w.deleteNamespace(ns)
}
}

func (w *Watcher) createNamespace(ctx context.Context, ns string, out chan Event) {
if ch, ok := w.nsDone.Load(ns); ok {
log.WithField("netns", ns).Debug("netns channel already exists, deleting it")
close(ch.(chan struct{}))
w.nsDone.Delete(ns)
}

w.nsDone.Store(ns, make(chan struct{}))
go w.sendUpdates(ctx, ns, out)
}

func (w *Watcher) deleteNamespace(ns string) {
if ch, ok := w.nsDone.Load(ns); ok {
close(ch.(chan struct{}))
w.nsDone.Delete(ns)
} else {
log.WithField("netns", ns).Debug("netns delete but no channel exists")
}
}

func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
var err error
log := logrus.WithField("component", "ifaces.Watcher")

w.netnsWatcher, err = fsnotify.NewWatcher()
if err != nil {
Expand All @@ -198,25 +234,7 @@ func (w *Watcher) netnsNotify(ctx context.Context, out chan Event) {
if !ok {
return
}
if event.Op&fsnotify.Create == fsnotify.Create {
ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns create notification")
if _, ok := w.nsDone[ns]; ok {
log.WithField("netns", ns).Debug("netns channel already exists, delete it")
delete(w.nsDone, ns)
}
w.nsDone[ns] = make(chan struct{})
go w.sendUpdates(ctx, ns, out)
}
if event.Op&fsnotify.Remove == fsnotify.Remove {
ns := filepath.Base(event.Name)
log.WithField("netns", ns).Debug("netns delete notification")
if _, ok := w.nsDone[ns]; ok {
w.nsDone[ns] <- struct{}{}
} else {
log.WithField("netns", ns).Debug("netns delete but there is no channel to send events to")
}
}
w.handleEvent(ctx, event, out)
case err, ok := <-w.netnsWatcher.Errors:
if !ok {
return
Expand Down

0 comments on commit 85d5340

Please sign in to comment.