Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: use triesInMemory in downloader #56

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
Checkpoint: checkpoint,
RequiredBlocks: config.RequiredBlocks,
NoTxGossip: config.RollupDisableTxPoolGossip,
TriesInMemory: config.TriesInMemory,
}); err != nil {
return nil, err
}
Expand Down
6 changes: 3 additions & 3 deletions eth/downloader/beaconsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
var localHeaders []*types.Header
if from < tail.Number.Uint64() {
count := tail.Number.Uint64() - from
if count > uint64(fsMinFullBlocks) {
if count > d.fsMinFullBlocks {
return fmt.Errorf("invalid origin (%d) of beacon sync (%d)", from, tail.Number)
}
localHeaders = d.readHeaderRange(tail, int(count))
Expand All @@ -300,10 +300,10 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error {
// move it ahead to HEAD-64
d.pivotLock.Lock()
if d.pivotHeader != nil {
if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 {
if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*d.fsMinFullBlocks-8 {
// Retrieve the next pivot header, either from skeleton chain
// or the filled chain
number := head.Number.Uint64() - uint64(fsMinFullBlocks)
number := head.Number.Uint64() - d.fsMinFullBlocks

log.Warn("Pivot seemingly stale, moving", "old", d.pivotHeader.Number, "new", number)
if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil {
Expand Down
67 changes: 35 additions & 32 deletions eth/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ var (
fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected
fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it
fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download
fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in snap sync
)

var (
Expand Down Expand Up @@ -156,9 +155,11 @@ type Downloader struct {
chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations)

// Progress reporting metrics
syncStartBlock uint64 // Head snap block when Geth was started
syncStartTime time.Time // Time instance when chain sync started
syncLogTime time.Time // Time instance when status was last reported
syncStartBlock uint64 // Head snap block when Geth was started
syncStartTime time.Time // Time instance when chain sync started
syncLogTime time.Time // Time instance when status was last reported
triesInMemory uint64 // How many tries keeps in memory
fsMinFullBlocks uint64 //Number of blocks to retrieve fully even in snap sync
}

// LightChain encapsulates functions required to synchronise a light chain.
Expand Down Expand Up @@ -219,24 +220,26 @@ type BlockChain interface {
}

// New creates a new downloader to fetch hashes and blocks from remote peers.
func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader {
func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func(), triesInMemory uint64) *Downloader {
if lightchain == nil {
lightchain = chain
}
dl := &Downloader{
stateDB: stateDb,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
stateDB: stateDb,
mux: mux,
checkpoint: checkpoint,
queue: newQueue(blockCacheMaxItems, blockCacheInitialItems),
peers: newPeerSet(),
blockchain: chain,
lightchain: lightchain,
dropPeer: dropPeer,
headerProcCh: make(chan *headerTask, 1),
quitCh: make(chan struct{}),
SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()),
stateSyncStart: make(chan *stateSync),
syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(),
triesInMemory: triesInMemory,
fsMinFullBlocks: triesInMemory / 2,
}
// Create the post-merge skeleton syncer and start the process
dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success))
Expand Down Expand Up @@ -493,8 +496,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *
if err != nil {
return err
}
if latest.Number.Uint64() > uint64(fsMinFullBlocks) {
number := latest.Number.Uint64() - uint64(fsMinFullBlocks)
if latest.Number.Uint64() > d.fsMinFullBlocks {
number := latest.Number.Uint64() - d.fsMinFullBlocks

// Retrieve the pivot header from the skeleton chain segment but
// fallback to local chain if it's not found in skeleton space.
Expand Down Expand Up @@ -550,7 +553,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd *

// Ensure our origin point is below any snap sync pivot point
if mode == SnapSync {
if height <= uint64(fsMinFullBlocks) {
if height <= d.fsMinFullBlocks {
origin = 0
} else {
pivotNumber := pivot.Number.Uint64()
Expand Down Expand Up @@ -734,7 +737,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
if mode == SnapSync {
fetch = 2 // head + pivot headers
}
headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true)
headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, int(d.fsMinFullBlocks-1), true)
if err != nil {
return nil, nil, err
}
Expand All @@ -750,7 +753,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint)
}
if len(headers) == 1 {
if mode == SnapSync && head.Number.Uint64() > uint64(fsMinFullBlocks) {
if mode == SnapSync && head.Number.Uint64() > d.fsMinFullBlocks {
return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer)
}
p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", hashes[0])
Expand All @@ -759,8 +762,8 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty
// At this point we have 2 headers in total and the first is the
// validated head of the chain. Check the pivot number and return,
pivot = headers[1]
if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) {
return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks))
if pivot.Number.Uint64() != head.Number.Uint64()-d.fsMinFullBlocks {
return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-d.fsMinFullBlocks)
}
return head, pivot, nil
}
Expand Down Expand Up @@ -1042,8 +1045,8 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e
pivot := d.pivotHeader.Number.Uint64()
d.pivotLock.RUnlock()

p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks))
headers, hashes, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep
p.log.Trace("Fetching next pivot header", "number", pivot+d.fsMinFullBlocks)
headers, hashes, err = d.fetchHeadersByNumber(p, pivot+d.fsMinFullBlocks, 2, int(d.fsMinFullBlocks-9), false) // move +64 when it's 2x64-8 deep

case skeleton:
p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from)
Expand Down Expand Up @@ -1090,11 +1093,11 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e

if pivoting {
if len(headers) == 2 {
if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want {
if have, want := headers[0].Number.Uint64(), pivot+d.fsMinFullBlocks; have != want {
log.Warn("Peer sent invalid next pivot", "have", have, "want", want)
return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want)
}
if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want {
if have, want := headers[1].Number.Uint64(), pivot+2*d.fsMinFullBlocks-8; have != want {
log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want)
return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want)
}
Expand Down Expand Up @@ -1678,9 +1681,9 @@ func (d *Downloader) processSnapSyncContent() error {
// Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those
// need to be taken into account, otherwise we're detecting the pivot move
// late and will drop peers due to unavailable state!!!
if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) {
log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay))
pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted
if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*d.fsMinFullBlocks-uint64(reorgProtHeaderDelay) {
log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-d.fsMinFullBlocks+uint64(reorgProtHeaderDelay))
pivot = results[len(results)-1-int(d.fsMinFullBlocks)+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted

d.pivotLock.Lock()
d.pivotHeader = pivot
Expand Down
8 changes: 4 additions & 4 deletions eth/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func newTesterWithNotification(t *testing.T, success func()) *downloadTester {
chain: chain,
peers: make(map[string]*downloadTesterPeer),
}
tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success)
tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success, 128)
return tester
}

Expand Down Expand Up @@ -889,7 +889,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) {
defer tester.terminate()

// Create a small enough block chain to download
targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks
targetBlocks := 3*fsHeaderSafetyNet + 256 + int(tester.downloader.fsMinFullBlocks)
chain := testChainBase.shorten(targetBlocks)

// Attempt to sync with an attacker that feeds junk during the fast sync phase.
Expand Down Expand Up @@ -1426,7 +1426,7 @@ func testCheckpointEnforcement(t *testing.T, protocol uint, mode SyncMode) {
tester := newTester(t)
defer tester.terminate()

tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256
tester.downloader.checkpoint = uint64(tester.downloader.fsMinFullBlocks) + 256
chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1)

// Attempt to sync with the peer and validate the result
Expand Down Expand Up @@ -1460,7 +1460,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) {
}{
{name: "Beacon sync since genesis", local: 0},
{name: "Beacon sync with short local chain", local: 1},
{name: "Beacon sync with long local chain", local: blockCacheMaxItems - 15 - fsMinFullBlocks/2},
{name: "Beacon sync with long local chain", local: blockCacheMaxItems - 15 - 64/2},
{name: "Beacon sync with full local chain", local: blockCacheMaxItems - 15 - 1},
}
for _, c := range cases {
Expand Down
4 changes: 2 additions & 2 deletions eth/downloader/testchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func init() {
testChainBase.shorten(800 / 6),
testChainBase.shorten(800 / 7),
testChainBase.shorten(800 / 8),
testChainBase.shorten(3*fsHeaderSafetyNet + 256 + fsMinFullBlocks),
testChainBase.shorten(fsMinFullBlocks + 256 - 1),
testChainBase.shorten(3*fsHeaderSafetyNet + 256 + 64),
testChainBase.shorten(64 + 256 - 1),
testChainForkLightA.shorten(len(testChainBase.blocks) + 80),
testChainForkLightB.shorten(len(testChainBase.blocks) + 81),
testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch),
Expand Down
3 changes: 2 additions & 1 deletion eth/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type handlerConfig struct {
Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges
RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges
NoTxGossip bool // Disable P2P transaction gossip
TriesInMemory uint64 // How many tries keeps in memory
}

type handler struct {
Expand Down Expand Up @@ -204,7 +205,7 @@ func newHandler(config *handlerConfig) (*handler, error) {
}
}
// Construct the downloader (long sync)
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer, success)
h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer, success, config.TriesInMemory)
if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil {
if h.chain.Config().TerminalTotalDifficultyPassed {
log.Info("Chain post-merge, sync via beacon client")
Expand Down
30 changes: 16 additions & 14 deletions eth/handler_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,22 +115,24 @@ func testForkIDSplit(t *testing.T, protocol uint) {
_, blocksProFork, _ = core.GenerateChainWithGenesis(gspecProFork, engine, 2, nil)

ethNoFork, _ = newHandler(&handlerConfig{
Database: dbNoFork,
Chain: chainNoFork,
TxPool: newTestTxPool(),
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
Network: 1,
Sync: downloader.FullSync,
BloomCache: 1,
Database: dbNoFork,
Chain: chainNoFork,
TxPool: newTestTxPool(),
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
Network: 1,
Sync: downloader.FullSync,
BloomCache: 1,
TriesInMemory: 128,
})
ethProFork, _ = newHandler(&handlerConfig{
Database: dbProFork,
Chain: chainProFork,
TxPool: newTestTxPool(),
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
Network: 1,
Sync: downloader.FullSync,
BloomCache: 1,
Database: dbProFork,
Chain: chainProFork,
TxPool: newTestTxPool(),
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
Network: 1,
Sync: downloader.FullSync,
BloomCache: 1,
TriesInMemory: 128,
})
)
ethNoFork.Start(1000)
Expand Down
15 changes: 8 additions & 7 deletions eth/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,14 @@ func newTestHandlerWithBlocks(blocks int) *testHandler {
txpool := newTestTxPool()

handler, _ := newHandler(&handlerConfig{
Database: db,
Chain: chain,
TxPool: txpool,
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
Network: 1,
Sync: downloader.SnapSync,
BloomCache: 1,
Database: db,
Chain: chain,
TxPool: txpool,
Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()),
Network: 1,
Sync: downloader.SnapSync,
BloomCache: 1,
TriesInMemory: 128,
})
handler.Start(1000)

Expand Down
Loading