Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix prefetcher concurrent map read and write issue #63

Closed
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions core/state/trie_prefetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type triePrefetcher struct {
fetches map[string]Trie // Partially or fully fetcher tries
fetchers map[string]*subfetcher // Subfetchers for each trie

fetchersLock sync.RWMutex // Lock to ensure thread-safe when read/write fetchers

deliveryMissMeter metrics.Meter
accountLoadMeter metrics.Meter
accountDupMeter metrics.Meter
Expand Down Expand Up @@ -71,12 +73,43 @@ func newTriePrefetcher(db Database, root common.Hash, namespace string) *triePre
return p
}

// copy all fetchers
func (p *triePrefetcher) copyFetchers() map[string]*subfetcher {
fetchers := make(map[string]*subfetcher)
p.fetchersLock.RLock()
defer p.fetchersLock.RUnlock()
if p.fetchers == nil {
return fetchers
}
for key, f := range p.fetchers {
fetchers[key] = f
}
return fetchers
}

// copy all fethcers and then clear them
func (p *triePrefetcher) popFetchers() map[string]*subfetcher {
fetchers := make(map[string]*subfetcher)
p.fetchersLock.Lock()
defer p.fetchersLock.Unlock()
if p.fetchers == nil {
return fetchers
}
for key, f := range p.fetchers {
fetchers[key] = f
}
// clear them all
p.fetchers = nil
return fetchers
}

// close iterates over all the subfetchers, aborts any that were left spinning
// and reports the stats to the metrics subsystem.
func (p *triePrefetcher) close() {
for _, fetcher := range p.fetchers {
for _, fetcher := range p.popFetchers() {
fetcher.abort() // safe to do multiple times

// it's thread-safe to read all data of fetcher now, because the fetcher already aborted.
if metrics.Enabled {
if fetcher.root == p.root {
p.accountLoadMeter.Mark(int64(len(fetcher.seen)))
Expand All @@ -99,8 +132,6 @@ func (p *triePrefetcher) close() {
}
}
}
// Clear out all fetchers (will crash on a second call, deliberate)
p.fetchers = nil
}

// copy creates a deep-but-inactive copy of the trie prefetcher. Any trie data
Expand Down Expand Up @@ -134,7 +165,7 @@ func (p *triePrefetcher) copy() *triePrefetcher {
return copy
}
// Otherwise we're copying an active fetcher, retrieve the current states
for id, fetcher := range p.fetchers {
for id, fetcher := range p.copyFetchers() {
copy.fetches[id] = fetcher.peek()
}
return copy
Expand All @@ -148,11 +179,13 @@ func (p *triePrefetcher) prefetch(owner common.Hash, root common.Hash, keys [][]
}
// Active fetcher, schedule the retrievals
id := p.trieID(owner, root)
p.fetchersLock.Lock()
fetcher := p.fetchers[id]
if fetcher == nil {
fetcher = newSubfetcher(p.db, p.root, owner, root)
p.fetchers[id] = fetcher
}
p.fetchersLock.Unlock()
fetcher.schedule(keys)
}

Expand All @@ -170,7 +203,9 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
return p.db.CopyTrie(trie)
}
// Otherwise the prefetcher is active, bail if no trie was prefetched for this root
p.fetchersLock.RLock()
fetcher := p.fetchers[id]
p.fetchersLock.RUnlock()
if fetcher == nil {
p.deliveryMissMeter.Mark(1)
return nil
Expand All @@ -190,7 +225,10 @@ func (p *triePrefetcher) trie(owner common.Hash, root common.Hash) Trie {
// used marks a batch of state items used to allow creating statistics as to
// how useful or wasteful the prefetcher is.
func (p *triePrefetcher) used(owner common.Hash, root common.Hash, used [][]byte) {
if fetcher := p.fetchers[p.trieID(owner, root)]; fetcher != nil {
p.fetchersLock.RLock()
fetcher := p.fetchers[p.trieID(owner, root)]
p.fetchersLock.RUnlock()
if fetcher != nil {
fetcher.used = used
}
}
Expand Down
Loading