diff --git a/eth/backend.go b/eth/backend.go index a286227da0..9f1c170974 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -255,6 +255,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { Checkpoint: checkpoint, RequiredBlocks: config.RequiredBlocks, NoTxGossip: config.RollupDisableTxPoolGossip, + TriesInMemory: config.TriesInMemory, }); err != nil { return nil, err } diff --git a/eth/downloader/beaconsync.go b/eth/downloader/beaconsync.go index ff985e6b03..741e898ac3 100644 --- a/eth/downloader/beaconsync.go +++ b/eth/downloader/beaconsync.go @@ -283,7 +283,7 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { var localHeaders []*types.Header if from < tail.Number.Uint64() { count := tail.Number.Uint64() - from - if count > uint64(fsMinFullBlocks) { + if count > d.fsMinFullBlocks { return fmt.Errorf("invalid origin (%d) of beacon sync (%d)", from, tail.Number) } localHeaders = d.readHeaderRange(tail, int(count)) @@ -300,10 +300,10 @@ func (d *Downloader) fetchBeaconHeaders(from uint64) error { // move it ahead to HEAD-64 d.pivotLock.Lock() if d.pivotHeader != nil { - if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*uint64(fsMinFullBlocks)-8 { + if head.Number.Uint64() > d.pivotHeader.Number.Uint64()+2*d.fsMinFullBlocks-8 { // Retrieve the next pivot header, either from skeleton chain // or the filled chain - number := head.Number.Uint64() - uint64(fsMinFullBlocks) + number := head.Number.Uint64() - d.fsMinFullBlocks log.Warn("Pivot seemingly stale, moving", "old", d.pivotHeader.Number, "new", number) if d.pivotHeader = d.skeleton.Header(number); d.pivotHeader == nil { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index fb9de79912..bfa61e6af3 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -57,7 +57,6 @@ var ( fsHeaderSafetyNet = 2048 // Number of headers to discard in case a chain violation is detected fsHeaderForceVerify = 24 // Number of headers to verify before and after the pivot to accept it fsHeaderContCheck = 3 * time.Second // Time interval to check for header continuations during state download - fsMinFullBlocks = 64 // Number of blocks to retrieve fully even in snap sync ) var ( @@ -156,9 +155,11 @@ type Downloader struct { chainInsertHook func([]*fetchResult) // Method to call upon inserting a chain of blocks (possibly in multiple invocations) // Progress reporting metrics - syncStartBlock uint64 // Head snap block when Geth was started - syncStartTime time.Time // Time instance when chain sync started - syncLogTime time.Time // Time instance when status was last reported + syncStartBlock uint64 // Head snap block when Geth was started + syncStartTime time.Time // Time instance when chain sync started + syncLogTime time.Time // Time instance when status was last reported + triesInMemory uint64 // How many tries keeps in memory + fsMinFullBlocks uint64 //Number of blocks to retrieve fully even in snap sync } // LightChain encapsulates functions required to synchronise a light chain. @@ -219,24 +220,26 @@ type BlockChain interface { } // New creates a new downloader to fetch hashes and blocks from remote peers. -func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func()) *Downloader { +func New(checkpoint uint64, stateDb ethdb.Database, mux *event.TypeMux, chain BlockChain, lightchain LightChain, dropPeer peerDropFn, success func(), triesInMemory uint64) *Downloader { if lightchain == nil { lightchain = chain } dl := &Downloader{ - stateDB: stateDb, - mux: mux, - checkpoint: checkpoint, - queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), - peers: newPeerSet(), - blockchain: chain, - lightchain: lightchain, - dropPeer: dropPeer, - headerProcCh: make(chan *headerTask, 1), - quitCh: make(chan struct{}), - SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()), - stateSyncStart: make(chan *stateSync), - syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), + stateDB: stateDb, + mux: mux, + checkpoint: checkpoint, + queue: newQueue(blockCacheMaxItems, blockCacheInitialItems), + peers: newPeerSet(), + blockchain: chain, + lightchain: lightchain, + dropPeer: dropPeer, + headerProcCh: make(chan *headerTask, 1), + quitCh: make(chan struct{}), + SnapSyncer: snap.NewSyncer(stateDb, chain.TrieDB().Scheme()), + stateSyncStart: make(chan *stateSync), + syncStartBlock: chain.CurrentSnapBlock().Number.Uint64(), + triesInMemory: triesInMemory, + fsMinFullBlocks: triesInMemory / 2, } // Create the post-merge skeleton syncer and start the process dl.skeleton = newSkeleton(stateDb, dl.peers, dropPeer, newBeaconBackfiller(dl, success)) @@ -493,8 +496,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * if err != nil { return err } - if latest.Number.Uint64() > uint64(fsMinFullBlocks) { - number := latest.Number.Uint64() - uint64(fsMinFullBlocks) + if latest.Number.Uint64() > d.fsMinFullBlocks { + number := latest.Number.Uint64() - d.fsMinFullBlocks // Retrieve the pivot header from the skeleton chain segment but // fallback to local chain if it's not found in skeleton space. @@ -550,7 +553,7 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td, ttd * // Ensure our origin point is below any snap sync pivot point if mode == SnapSync { - if height <= uint64(fsMinFullBlocks) { + if height <= d.fsMinFullBlocks { origin = 0 } else { pivotNumber := pivot.Number.Uint64() @@ -734,7 +737,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty if mode == SnapSync { fetch = 2 // head + pivot headers } - headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, fsMinFullBlocks-1, true) + headers, hashes, err := d.fetchHeadersByHash(p, latest, fetch, int(d.fsMinFullBlocks-1), true) if err != nil { return nil, nil, err } @@ -750,7 +753,7 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty return nil, nil, fmt.Errorf("%w: remote head %d below checkpoint %d", errUnsyncedPeer, head.Number, d.checkpoint) } if len(headers) == 1 { - if mode == SnapSync && head.Number.Uint64() > uint64(fsMinFullBlocks) { + if mode == SnapSync && head.Number.Uint64() > d.fsMinFullBlocks { return nil, nil, fmt.Errorf("%w: no pivot included along head header", errBadPeer) } p.log.Debug("Remote head identified, no pivot", "number", head.Number, "hash", hashes[0]) @@ -759,8 +762,8 @@ func (d *Downloader) fetchHead(p *peerConnection) (head *types.Header, pivot *ty // At this point we have 2 headers in total and the first is the // validated head of the chain. Check the pivot number and return, pivot = headers[1] - if pivot.Number.Uint64() != head.Number.Uint64()-uint64(fsMinFullBlocks) { - return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-uint64(fsMinFullBlocks)) + if pivot.Number.Uint64() != head.Number.Uint64()-d.fsMinFullBlocks { + return nil, nil, fmt.Errorf("%w: remote pivot %d != requested %d", errInvalidChain, pivot.Number, head.Number.Uint64()-d.fsMinFullBlocks) } return head, pivot, nil } @@ -1042,8 +1045,8 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e pivot := d.pivotHeader.Number.Uint64() d.pivotLock.RUnlock() - p.log.Trace("Fetching next pivot header", "number", pivot+uint64(fsMinFullBlocks)) - headers, hashes, err = d.fetchHeadersByNumber(p, pivot+uint64(fsMinFullBlocks), 2, fsMinFullBlocks-9, false) // move +64 when it's 2x64-8 deep + p.log.Trace("Fetching next pivot header", "number", pivot+d.fsMinFullBlocks) + headers, hashes, err = d.fetchHeadersByNumber(p, pivot+d.fsMinFullBlocks, 2, int(d.fsMinFullBlocks-9), false) // move +64 when it's 2x64-8 deep case skeleton: p.log.Trace("Fetching skeleton headers", "count", MaxHeaderFetch, "from", from) @@ -1090,11 +1093,11 @@ func (d *Downloader) fetchHeaders(p *peerConnection, from uint64, head uint64) e if pivoting { if len(headers) == 2 { - if have, want := headers[0].Number.Uint64(), pivot+uint64(fsMinFullBlocks); have != want { + if have, want := headers[0].Number.Uint64(), pivot+d.fsMinFullBlocks; have != want { log.Warn("Peer sent invalid next pivot", "have", have, "want", want) return fmt.Errorf("%w: next pivot number %d != requested %d", errInvalidChain, have, want) } - if have, want := headers[1].Number.Uint64(), pivot+2*uint64(fsMinFullBlocks)-8; have != want { + if have, want := headers[1].Number.Uint64(), pivot+2*d.fsMinFullBlocks-8; have != want { log.Warn("Peer sent invalid pivot confirmer", "have", have, "want", want) return fmt.Errorf("%w: next pivot confirmer number %d != requested %d", errInvalidChain, have, want) } @@ -1678,9 +1681,9 @@ func (d *Downloader) processSnapSyncContent() error { // Note, we have `reorgProtHeaderDelay` number of blocks withheld, Those // need to be taken into account, otherwise we're detecting the pivot move // late and will drop peers due to unavailable state!!! - if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*uint64(fsMinFullBlocks)-uint64(reorgProtHeaderDelay) { - log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-uint64(fsMinFullBlocks)+uint64(reorgProtHeaderDelay)) - pivot = results[len(results)-1-fsMinFullBlocks+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted + if height := latest.Number.Uint64(); height >= pivot.Number.Uint64()+2*d.fsMinFullBlocks-uint64(reorgProtHeaderDelay) { + log.Warn("Pivot became stale, moving", "old", pivot.Number.Uint64(), "new", height-d.fsMinFullBlocks+uint64(reorgProtHeaderDelay)) + pivot = results[len(results)-1-int(d.fsMinFullBlocks)+reorgProtHeaderDelay].Header // must exist as lower old pivot is uncommitted d.pivotLock.Lock() d.pivotHeader = pivot diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index a884c1e950..0d0ca57812 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -82,7 +82,7 @@ func newTesterWithNotification(t *testing.T, success func()) *downloadTester { chain: chain, peers: make(map[string]*downloadTesterPeer), } - tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success) + tester.downloader = New(0, db, new(event.TypeMux), tester.chain, nil, tester.dropPeer, success, 128) return tester } @@ -889,7 +889,7 @@ func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) { defer tester.terminate() // Create a small enough block chain to download - targetBlocks := 3*fsHeaderSafetyNet + 256 + fsMinFullBlocks + targetBlocks := 3*fsHeaderSafetyNet + 256 + int(tester.downloader.fsMinFullBlocks) chain := testChainBase.shorten(targetBlocks) // Attempt to sync with an attacker that feeds junk during the fast sync phase. @@ -1426,7 +1426,7 @@ func testCheckpointEnforcement(t *testing.T, protocol uint, mode SyncMode) { tester := newTester(t) defer tester.terminate() - tester.downloader.checkpoint = uint64(fsMinFullBlocks) + 256 + tester.downloader.checkpoint = uint64(tester.downloader.fsMinFullBlocks) + 256 chain := testChainBase.shorten(int(tester.downloader.checkpoint) - 1) // Attempt to sync with the peer and validate the result @@ -1460,7 +1460,7 @@ func testBeaconSync(t *testing.T, protocol uint, mode SyncMode) { }{ {name: "Beacon sync since genesis", local: 0}, {name: "Beacon sync with short local chain", local: 1}, - {name: "Beacon sync with long local chain", local: blockCacheMaxItems - 15 - fsMinFullBlocks/2}, + {name: "Beacon sync with long local chain", local: blockCacheMaxItems - 15 - 64/2}, {name: "Beacon sync with full local chain", local: blockCacheMaxItems - 15 - 1}, } for _, c := range cases { diff --git a/eth/downloader/testchain_test.go b/eth/downloader/testchain_test.go index 01f81a7b1c..a9d30c0285 100644 --- a/eth/downloader/testchain_test.go +++ b/eth/downloader/testchain_test.go @@ -95,8 +95,8 @@ func init() { testChainBase.shorten(800 / 6), testChainBase.shorten(800 / 7), testChainBase.shorten(800 / 8), - testChainBase.shorten(3*fsHeaderSafetyNet + 256 + fsMinFullBlocks), - testChainBase.shorten(fsMinFullBlocks + 256 - 1), + testChainBase.shorten(3*fsHeaderSafetyNet + 256 + 64), + testChainBase.shorten(64 + 256 - 1), testChainForkLightA.shorten(len(testChainBase.blocks) + 80), testChainForkLightB.shorten(len(testChainBase.blocks) + 81), testChainForkLightA.shorten(len(testChainBase.blocks) + MaxHeaderFetch), diff --git a/eth/handler.go b/eth/handler.go index 0c19ad8388..d517fdd141 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -92,6 +92,7 @@ type handlerConfig struct { Checkpoint *params.TrustedCheckpoint // Hard coded checkpoint for sync challenges RequiredBlocks map[uint64]common.Hash // Hard coded map of required block hashes for sync challenges NoTxGossip bool // Disable P2P transaction gossip + TriesInMemory uint64 // How many tries keeps in memory } type handler struct { @@ -204,7 +205,7 @@ func newHandler(config *handlerConfig) (*handler, error) { } } // Construct the downloader (long sync) - h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer, success) + h.downloader = downloader.New(h.checkpointNumber, config.Database, h.eventMux, h.chain, nil, h.removePeer, success, config.TriesInMemory) if ttd := h.chain.Config().TerminalTotalDifficulty; ttd != nil { if h.chain.Config().TerminalTotalDifficultyPassed { log.Info("Chain post-merge, sync via beacon client") diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 0fc68b0189..ae54c4776b 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -115,22 +115,24 @@ func testForkIDSplit(t *testing.T, protocol uint) { _, blocksProFork, _ = core.GenerateChainWithGenesis(gspecProFork, engine, 2, nil) ethNoFork, _ = newHandler(&handlerConfig{ - Database: dbNoFork, - Chain: chainNoFork, - TxPool: newTestTxPool(), - Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()), - Network: 1, - Sync: downloader.FullSync, - BloomCache: 1, + Database: dbNoFork, + Chain: chainNoFork, + TxPool: newTestTxPool(), + Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()), + Network: 1, + Sync: downloader.FullSync, + BloomCache: 1, + TriesInMemory: 128, }) ethProFork, _ = newHandler(&handlerConfig{ - Database: dbProFork, - Chain: chainProFork, - TxPool: newTestTxPool(), - Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()), - Network: 1, - Sync: downloader.FullSync, - BloomCache: 1, + Database: dbProFork, + Chain: chainProFork, + TxPool: newTestTxPool(), + Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()), + Network: 1, + Sync: downloader.FullSync, + BloomCache: 1, + TriesInMemory: 128, }) ) ethNoFork.Start(1000) diff --git a/eth/handler_test.go b/eth/handler_test.go index 1c25d19284..f71273991c 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -165,13 +165,14 @@ func newTestHandlerWithBlocks(blocks int) *testHandler { txpool := newTestTxPool() handler, _ := newHandler(&handlerConfig{ - Database: db, - Chain: chain, - TxPool: txpool, - Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()), - Network: 1, - Sync: downloader.SnapSync, - BloomCache: 1, + Database: db, + Chain: chain, + TxPool: txpool, + Merger: consensus.NewMerger(rawdb.NewMemoryDatabase()), + Network: 1, + Sync: downloader.SnapSync, + BloomCache: 1, + TriesInMemory: 128, }) handler.Start(1000)