diff --git a/waku/v2/api/missing/missing_messages.go b/waku/v2/api/missing/missing_messages.go index b10a9de9a..578d1f74b 100644 --- a/waku/v2/api/missing/missing_messages.go +++ b/waku/v2/api/missing/missing_messages.go @@ -44,10 +44,12 @@ type MissingMessageVerifier struct { criteriaInterest map[string]criteriaInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages criteriaInterestMu sync.RWMutex - C <-chan *protocol.Envelope + C chan *protocol.Envelope - timesource timesource.Timesource - logger *zap.Logger + timesource timesource.Timesource + logger *zap.Logger + isRunning bool + runningMutex sync.RWMutex } // NewMissingMessageVerifier creates an instance of a MissingMessageVerifier @@ -64,6 +66,8 @@ func NewMissingMessageVerifier(storenodeRequester common.StorenodeRequestor, mes messageTracker: messageTracker, logger: logger.Named("missing-msg-verifier"), params: params, + criteriaInterest: make(map[string]criteriaInterest), + C: make(chan *protocol.Envelope, 1000), } } @@ -99,14 +103,18 @@ func (m *MissingMessageVerifier) SetCriteriaInterest(peerID peer.ID, contentFilt } func (m *MissingMessageVerifier) Start(ctx context.Context) { + m.runningMutex.RLock() + if m.isRunning { //make sure verifier only runs once. + m.runningMutex.RUnlock() + return + } ctx, cancelFunc := context.WithCancel(ctx) m.ctx = ctx m.cancel = cancelFunc - m.criteriaInterest = make(map[string]criteriaInterest) - - c := make(chan *protocol.Envelope, 1000) - m.C = c + m.runningMutex.Lock() + m.isRunning = true + m.runningMutex.Unlock() go func() { defer utils.LogOnPanic() t := time.NewTicker(m.params.interval) @@ -131,7 +139,7 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { semaphore <- struct{}{} go func(interest criteriaInterest) { defer utils.LogOnPanic() - m.fetchHistory(c, interest) + m.fetchHistory(m.C, interest) <-semaphore }(interest) } @@ -146,6 +154,9 @@ func (m *MissingMessageVerifier) Start(ctx context.Context) { func (m *MissingMessageVerifier) Stop() { m.cancel() + m.runningMutex.Lock() + defer m.runningMutex.Unlock() + m.isRunning = false } func (m *MissingMessageVerifier) fetchHistory(c chan<- *protocol.Envelope, interest criteriaInterest) {