diff --git a/core/state/trie_prefetcher.go b/core/state/trie_prefetcher.go
index f142c86bbf..929df9c502 100644
--- a/core/state/trie_prefetcher.go
+++ b/core/state/trie_prefetcher.go
@@ -40,6 +40,8 @@ type triePrefetcher struct {
 	fetches  map[string]Trie        // Partially or fully fetcher tries
 	fetchers map[string]*subfetcher // Subfetchers for each trie
 
+	fetchersLock sync.RWMutex // Lock protecting concurrent access to fetchers
+
 	deliveryMissMeter metrics.Meter
 	accountLoadMeter  metrics.Meter
 	accountDupMeter   metrics.Meter
@@ -71,12 +73,43 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
 	return p
 }
 
+// copyFetchers returns a copy of the current fetchers map.
+func (p *triePrefetcher) copyFetchers() map[string]*subfetcher {
+	fetchers := make(map[string]*subfetcher)
+	p.fetchersLock.RLock()
+	defer p.fetchersLock.RUnlock()
+	if p.fetchers == nil {
+		return fetchers
+	}
+	for key, f := range p.fetchers {
+		fetchers[key] = f
+	}
+	return fetchers
+}
+
+// popFetchers returns a copy of the current fetchers map and then clears it.
+func (p *triePrefetcher) popFetchers() map[string]*subfetcher {
+	fetchers := make(map[string]*subfetcher)
+	p.fetchersLock.Lock()
+	defer p.fetchersLock.Unlock()
+	if p.fetchers == nil {
+		return fetchers
+	}
+	for key, f := range p.fetchers {
+		fetchers[key] = f
+	}
+	// clear them all
+	p.fetchers = nil
+	return fetchers
+}
+
 // close iterates over all the subfetchers, aborts any that were left spinning
 // and reports the stats to the metrics subsystem.
 func (p *triePrefetcher) close() {
-	for _, fetcher := range p.fetchers {
+	for _, fetcher := range p.popFetchers() {
 		fetcher.abort() // safe to do multiple times
+		// It is now safe to read the fetcher's data without locking, because the fetcher has already aborted.
 
 		if metrics.Enabled {
 			if fetcher.root == p.root {
 				p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
@@ -99,8 +132,6 @@ func (p *triePrefetcher) close() {
 			}
 		}
 	}
-	// Clear out all fetchers (will crash on a second call, deliberate)
-	p.fetchers = nil
 }
 
 // copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
@@ -134,7 +165,7 @@ func (p *triePrefetcher) copy() *triePrefetcher {
 		return copy
 	}
 	// Otherwise we're copying an active fetcher, retrieve the current states
-	for id, fetcher := range p.fetchers {
+	for id, fetcher := range p.copyFetchers() {
 		copy.fetches[id] = fetcher.peek()
 	}
 	return copy
@@ -148,11 +179,13 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, keys [][]
 	}
 	// Active fetcher, schedule the retrievals
 	id := p.trieID(owner, root)
+	p.fetchersLock.Lock()
 	fetcher := p.fetchers[id]
 	if fetcher == nil {
 		fetcher = newSubfetcher(p.db, p.root, owner, root)
 		p.fetchers[id] = fetcher
 	}
+	p.fetchersLock.Unlock()
 	fetcher.schedule(keys)
 }
 
@@ -170,7 +203,9 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
 		return p.db.CopyTrie(trie)
 	}
 	// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
+	p.fetchersLock.RLock()
 	fetcher := p.fetchers[id]
+	p.fetchersLock.RUnlock()
 	if fetcher == nil {
 		p.deliveryMissMeter.Mark(1)
 		return nil
@@ -190,7 +225,10 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
 // used marks a batch of state items used to allow creating statistics as to
 // how useful or wasteful the prefetcher is.
 func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
-	if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
+	p.fetchersLock.RLock()
+	fetcher := p.fetchers[p.trieID(owner, root)]
+	p.fetchersLock.RUnlock()
+	if fetcher != nil {
 		fetcher.used = used
 	}
 }
@@ -277,11 +315,14 @@ func (sf *subfetcher) peek() Trie {
 // abort interrupts the subfetcher immediately. It is safe to call abort multiple
 // times but it is not thread safe.
 func (sf *subfetcher) abort() {
+	// abort() may be called concurrently; make sure the stop channel is closed only once.
+	sf.lock.Lock()
 	select {
 	case <-sf.stop:
 	default:
 		close(sf.stop)
 	}
+	sf.lock.Unlock()
 	<-sf.term
 }
 
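For context, here is a minimal standalone sketch (not part of the patch, and not go-ethereum API) of the two synchronization patterns the diff applies: a `sync.RWMutex` guarding a shared map so it can be read, lazily populated, and snapshot-and-cleared from multiple goroutines (as `copyFetchers`/`popFetchers` do for `p.fetchers`), and a per-worker mutex around the `select`/`close` so an abort can be invoked concurrently without risking a double close. The `prefetcher`, `worker`, `get`, `snapshot`, and `pop` names are hypothetical stand-ins, not the actual types.

```go
package main

import (
	"fmt"
	"sync"
)

// worker is a stand-in for a subfetcher: it owns a stop channel that may be
// closed from several goroutines at once.
type worker struct {
	lock sync.Mutex
	stop chan struct{}
}

// abort closes the stop channel exactly once; the mutex makes concurrent
// calls safe, mirroring the locked select/close added to subfetcher.abort().
func (w *worker) abort() {
	w.lock.Lock()
	defer w.lock.Unlock()
	select {
	case <-w.stop: // already closed
	default:
		close(w.stop)
	}
}

// prefetcher is a stand-in for triePrefetcher: a worker map guarded by an
// RWMutex, following the copyFetchers/popFetchers pattern in the diff.
type prefetcher struct {
	lock    sync.RWMutex
	workers map[string]*worker
}

// get returns (or lazily creates) a worker under the write lock, as the
// locked section added to prefetch() does for p.fetchers[id].
func (p *prefetcher) get(id string) *worker {
	p.lock.Lock()
	defer p.lock.Unlock()
	w := p.workers[id]
	if w == nil {
		w = &worker{stop: make(chan struct{})}
		p.workers[id] = w
	}
	return w
}

// snapshot copies the map under the read lock (cf. copyFetchers), so callers
// can iterate without holding the lock.
func (p *prefetcher) snapshot() map[string]*worker {
	p.lock.RLock()
	defer p.lock.RUnlock()
	out := make(map[string]*worker, len(p.workers))
	for id, w := range p.workers {
		out[id] = w
	}
	return out
}

// pop snapshots the map and clears it under the write lock (cf. popFetchers),
// so a close()-style loop cannot race against concurrent get() calls.
func (p *prefetcher) pop() map[string]*worker {
	p.lock.Lock()
	defer p.lock.Unlock()
	snapshot := p.workers
	p.workers = make(map[string]*worker)
	return snapshot
}

func main() {
	p := &prefetcher{workers: make(map[string]*worker)}

	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			w := p.get(fmt.Sprintf("trie-%d", i%3)) // concurrent map writes
			w.abort()                               // concurrent channel close
		}(i)
	}
	wg.Wait()

	fmt.Println("active workers:", len(p.snapshot()))
	for id, w := range p.pop() {
		w.abort() // safe to call again
		fmt.Println("closed", id)
	}
}
```

With both locks in place `go run -race` stays quiet; removing the map lock makes the race detector flag the concurrent map writes in `get`, and removing the per-worker lock reopens the window for a double `close` panic, which is the class of race this patch addresses.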