WIP: Libevm upstream sync with pivot change #830

Draft
wants to merge 21 commits into base: libevm-upstream-sync
Changes from all commits (21 commits)
d47c875
Prototyping block queueing
alarso16 Feb 20, 2025
4ec6e3b
Prototyped mechanism to switch pivot
alarso16 Feb 21, 2025
2d4cfdb
Added more logging to see results
alarso16 Feb 21, 2025
706962d
Fixed hash passed to Syncer, will flush on pivot change
alarso16 Feb 21, 2025
1f2fddb
Added fixes from draft PR, changed incoming verification and acceptan…
alarso16 Feb 24, 2025
f2c8035
Commented out idea how to fix issue
alarso16 Feb 25, 2025
58a03c8
Added logging used on test node
alarso16 Feb 25, 2025
370167f
Updated logging, ignore SetPreference, only update atomic state on re…
alarso16 Feb 25, 2025
89822ab
Added simple queue to downloader for incoming blocks, flushing on piv…
alarso16 Feb 26, 2025
b6d29d1
Adjusted reindexing for buffer, adding additional error checking
alarso16 Feb 26, 2025
344bec9
Fixed buffer len error, added close channel, refactored for readability
alarso16 Feb 26, 2025
a5c1b0c
Adjusted logging to be more informative
alarso16 Feb 27, 2025
e2e5e6e
Added additional logging, changed to pivot on multiple of interval
alarso16 Feb 27, 2025
b1f59bd
Big refactor of dynamic sync stuff - likely has bugs
alarso16 Feb 27, 2025
4f914b5
Fixes error where dl may be nil if not initialized
alarso16 Feb 27, 2025
23cd38c
Reduce diff for unnecesary logs
alarso16 Feb 27, 2025
2213833
Fixed some logging
alarso16 Feb 28, 2025
ed8bef6
Added a probably superfluous lock and logging for debugging
alarso16 Feb 28, 2025
0dbbd8f
Changed atomic lock to use channel, adjusted logging
alarso16 Feb 28, 2025
9338942
Added additional logging for debugging
alarso16 Mar 3, 2025
bd29ed8
Fixed bug in finding evmBlock for specific hash
alarso16 Mar 3, 2025
core/blockchain.go (23 additions, 0 deletions)

@@ -663,6 +663,7 @@ func (bc *BlockChain) SenderCacher() *TxSenderCacher {
// assumes that the chain manager mutex is held.
func (bc *BlockChain) loadLastState(lastAcceptedHash common.Hash) error {
// Initialize genesis state
log.Debug("Called loadLastState with", "hash", lastAcceptedHash)
if lastAcceptedHash == (common.Hash{}) {
return bc.loadGenesisState()
}
@@ -677,6 +678,7 @@ func (bc *BlockChain) loadLastState(lastAcceptedHash common.Hash) error {
if headBlock == nil {
return fmt.Errorf("could not load head block %s", head.Hex())
}
log.Debug("Put head block in loadLastState as", "hash", headBlock.Hash(), "height", headBlock.NumberU64())
// Everything seems to be fine, set as the head block
bc.currentBlock.Store(headBlock.Header())

@@ -1142,6 +1144,26 @@ func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, parentRoot common
return nil
}

// InsertBlockDuringSync writes the block to the database without receipts or state.
// skipping pruning management for simplicity
func (bc *BlockChain) InsertBlockDuringSync(block *types.Block) error {
// Write the block to the database
batch := bc.db.NewBatch()
rawdb.WriteBlock(batch, block)
if err := batch.Write(); err != nil {
return fmt.Errorf("failed to write block during sync: %w", err)
}

// If [block] represents a new tip of the canonical chain, we optimistically add it before
// setPreference is called. Otherwise, we consider it a side chain block.
if bc.newTip(block) {
bc.writeCanonicalBlockWithLogs(block, nil)
} else {
bc.chainSideFeed.Send(ChainSideEvent{Block: block})
}
return nil
}

// writeBlockWithState writes the block and all associated state to the database,
// but it expects the chain mutex to be held.
func (bc *BlockChain) writeBlockWithState(block *types.Block, parentRoot common.Hash, receipts []*types.Receipt, state *state.StateDB) error {
@@ -2039,6 +2061,7 @@ func (bc *BlockChain) gatherBlockRootsAboveLastAccepted() map[common.Hash]struct
// in-memory and on disk current block pointers to [block].
// Only should be called after state sync has completed.
func (bc *BlockChain) ResetToStateSyncedBlock(block *types.Block) error {
log.Debug("Called ResetToStateSyncedBlock with", "hash", block.Hash(), "height", block.NumberU64())
bc.chainmu.Lock()
defer bc.chainmu.Unlock()

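For context on how the new InsertBlockDuringSync helper might be driven, here is a minimal sketch of a buffer that persists blocks arriving while dynamic state sync is still running and hands them back once the pivot settles. Only BlockChain.InsertBlockDuringSync comes from the diff above; the blockQueue type, its methods, and the import paths are illustrative assumptions, not coreth APIs.

// Illustrative sketch only (not part of this PR): buffer blocks received
// during dynamic state sync, writing them without receipts or state via
// InsertBlockDuringSync and deferring execution until the pivot is final.
// blockQueue and its methods are assumed names; import paths follow coreth's
// layout and may differ on the libevm-upstream-sync branch.
package sketch

import (
	"github.com/ava-labs/coreth/core"
	"github.com/ava-labs/coreth/core/types"
)

type blockQueue struct {
	bc     *core.BlockChain
	blocks []*types.Block
}

// Enqueue writes the block without receipts or state and remembers it so the
// caller can re-process it through the normal verify path after sync finishes.
func (q *blockQueue) Enqueue(block *types.Block) error {
	if err := q.bc.InsertBlockDuringSync(block); err != nil {
		return err
	}
	q.blocks = append(q.blocks, block)
	return nil
}

// Flush drains the buffer, e.g. on a pivot change or once sync completes.
func (q *blockQueue) Flush() []*types.Block {
	out := q.blocks
	q.blocks = nil
	return out
}

On a pivot change, the buffered blocks would be flushed and reprocessed against the new pivot, which appears to be what the downloader-side queue in this PR does; the actual flush ordering lives in the downloader changes and is not shown here.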
eth/protocols/snap/sync.go (1 addition, 1 deletion)

@@ -3070,7 +3070,7 @@ func (s *Syncer) reportSyncProgress(force bool) {
storage = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.storageSynced), s.storageBytes.TerminalString())
bytecode = fmt.Sprintf("%v@%v", log.FormatLogfmtUint64(s.bytecodeSynced), s.bytecodeBytes.TerminalString())
)
log.Info("Syncing: state download in progress", "synced", progress, "state", synced,
log.Info("Syncing: state download in progress", "synced", progress, "root", s.root, "state", synced,
"accounts", accounts, "slots", storage, "codes", bytecode, "eta", common.PrettyDuration(estTime-elapsed))
}

plugin/evm/block.go (176 additions, 32 deletions)

@@ -20,6 +20,7 @@ import (
"github.com/ava-labs/coreth/params"
"github.com/ava-labs/coreth/params/extras"
"github.com/ava-labs/coreth/plugin/evm/atomic"
"github.com/ava-labs/coreth/plugin/evm/statesync"
"github.com/ava-labs/coreth/precompile/precompileconfig"
"github.com/ava-labs/coreth/predicate"

@@ -140,6 +141,13 @@ func (b *Block) AtomicTxs() []*atomic.Tx { return b.atomicTxs }

// Accept implements the snowman.Block interface
func (b *Block) Accept(context.Context) error {
if b.vm.StateSyncClient.AsyncReceive() {
return b.vm.StateSyncClient.QueueBlockOrPivot(b, statesync.AcceptSyncBlockRequest)
}
return b.accept()
}

func (b *Block) accept() error {
vm := b.vm

// Although returning an error from Accept is considered fatal, it is good
@@ -187,6 +195,45 @@ func (b *Block) Accept(context.Context) error {
return atomicState.Accept(vdbBatch, nil)
}

func (b *Block) acceptDuringSync(final bool) error {
if final {
return b.accept()
}

vm := b.vm

// Although returning an error from Accept is considered fatal, it is good
// practice to cleanup the batch we were modifying in the case of an error.
defer vm.versiondb.Abort()
if err := vm.acceptedBlockDB.Put(lastAcceptedKey, b.id[:]); err != nil {
return fmt.Errorf("failed to put %s as the last accepted block: %w", b.ID(), err)
}

// Update VM state for atomic txs in this block. This includes updating the
// atomic tx repo, atomic trie, and shared memory.
atomicState, err := b.vm.atomicBackend.GetVerifiedAtomicState(common.Hash(b.ID()))
if err != nil {
// should never occur since [b] must be verified before calling Accept
return err
}
// Get pending operations on the vm's versionDB so we can apply them atomically
// with the shared memory changes.
vdbBatch, err := b.vm.versiondb.CommitBatch()
if err != nil {
return fmt.Errorf("could not create commit batch processing block[%s]: %w", b.ID(), err)
}

// Apply any shared memory changes atomically with other pending changes to
// the vm's versionDB.
if err := atomicState.Accept(vdbBatch, nil); err != nil {
return err
}

log.Debug("Returning from accept without error", "block", b.ID(), "height", b.Height())

return nil
}

// handlePrecompileAccept calls Accept on any logs generated with an active precompile address that implements
// contract.Accepter
func (b *Block) handlePrecompileAccept(rules extras.Rules) error {
@@ -224,6 +271,13 @@ func (b *Block) handlePrecompileAccept(rules extras.Rules) error {
// Reject implements the snowman.Block interface
// If [b] contains an atomic transaction, attempt to re-issue it
func (b *Block) Reject(context.Context) error {
if b.vm.StateSyncClient.AsyncReceive() {
return b.vm.StateSyncClient.QueueBlockOrPivot(b, statesync.RejectSyncBlockRequest)
}
return b.reject()
}

func (b *Block) reject() error {
log.Debug(fmt.Sprintf("Rejecting block %s (%s) at height %d", b.ID().Hex(), b.ID(), b.Height()))
for _, tx := range b.atomicTxs {
// Re-issue the transaction in the mempool, continue even if it fails
@@ -243,6 +297,25 @@ func (b *Block) Reject(context.Context) error {
return b.vm.blockChain.Reject(b.ethBlock)
}

func (b *Block) rejectDuringSync(final bool) error {
if final {
return b.reject()
}

atomicState, err := b.vm.atomicBackend.GetVerifiedAtomicState(common.Hash(b.ID()))
if err != nil {
// should never occur since [b] must be verified before calling Reject
log.Error("Should never happen because block must be verified before calling Reject", "block", b.ID(), "height", b.Height())
return err
}
if err := atomicState.Reject(); err != nil {
return err
}

log.Debug("Returning from reject without error", "block", b.ID(), "height", b.Height())
return nil
}

// Parent implements the snowman.Block interface
func (b *Block) Parent() ids.ID {
return ids.ID(b.ethBlock.ParentHash())
@@ -258,23 +331,33 @@ func (b *Block) Timestamp() time.Time {
return time.Unix(int64(b.ethBlock.Time()), 0)
}

// syntacticVerify verifies that a *Block is well-formed.
func (b *Block) syntacticVerify() error {
if b == nil || b.ethBlock == nil {
return errInvalidBlock
}

header := b.ethBlock.Header()
rules := b.vm.chainConfig.Rules(header.Number, params.IsMergeTODO, header.Time)
return b.vm.syntacticBlockValidator.SyntacticVerify(b, rules)
}

// Verify implements the snowman.Block interface
func (b *Block) Verify(context.Context) error {
return b.verify(&precompileconfig.PredicateContext{
log.Debug("Verifying block without context", "block", b.ID(), "height", b.Height())
if err := b.syntacticVerify(); err != nil {
return fmt.Errorf("syntactic block verification failed: %w", err)
}

predicateContext := &precompileconfig.PredicateContext{
SnowCtx: b.vm.ctx,
ProposerVMBlockCtx: nil,
}, true)
}
// Only enforce predicates if the chain has already bootstrapped.
// If the chain is still bootstrapping, we can assume that all blocks we are verifying have
// been accepted by the network (so the predicate was validated by the network when the
// block was originally verified).
if b.vm.bootstrapped.Get() {
if err := b.verifyPredicates(predicateContext); err != nil {
return fmt.Errorf("failed to verify predicates: %w", err)
}
}

// If currently dynamically syncing, we should simply postpone execution
if b.vm.StateSyncClient.AsyncReceive() {
return b.vm.StateSyncClient.QueueBlockOrPivot(b, statesync.VerifySyncBlockRequest)
}

return b.verify(true)
}

// ShouldVerifyWithContext implements the block.WithVerifyContext interface
@@ -303,30 +386,15 @@ func (b *Block) ShouldVerifyWithContext(context.Context) (bool, error) {

// VerifyWithContext implements the block.WithVerifyContext interface
func (b *Block) VerifyWithContext(ctx context.Context, proposerVMBlockCtx *block.Context) error {
return b.verify(&precompileconfig.PredicateContext{
SnowCtx: b.vm.ctx,
ProposerVMBlockCtx: proposerVMBlockCtx,
}, true)
}

// Verify the block is valid.
// Enforces that the predicates are valid within [predicateContext].
// Writes the block details to disk and the state to the trie manager iff writes=true.
func (b *Block) verify(predicateContext *precompileconfig.PredicateContext, writes bool) error {
if predicateContext.ProposerVMBlockCtx != nil {
log.Debug("Verifying block with context", "block", b.ID(), "height", b.Height())
} else {
log.Debug("Verifying block without context", "block", b.ID(), "height", b.Height())
}
log.Debug("Verifying block with context", "block", b.ID(), "height", b.Height())
if err := b.syntacticVerify(); err != nil {
return fmt.Errorf("syntactic block verification failed: %w", err)
}

// verify UTXOs named in import txs are present in shared memory.
if err := b.verifyUTXOsPresent(); err != nil {
return err
predicateContext := &precompileconfig.PredicateContext{
SnowCtx: b.vm.ctx,
ProposerVMBlockCtx: proposerVMBlockCtx,
}

// Only enforce predicates if the chain has already bootstrapped.
// If the chain is still bootstrapping, we can assume that all blocks we are verifying have
// been accepted by the network (so the predicate was validated by the network when the
@@ -337,6 +405,23 @@ func (b *Block) verify(predicateContext *precompileconfig.PredicateContext, writ
}
}

// If currently dynamically syncing, we should postpone execution
if b.vm.StateSyncClient.AsyncReceive() {
return b.vm.StateSyncClient.QueueBlockOrPivot(b, statesync.VerifySyncBlockRequest)
}

return b.verify(true)
}

// Verify the block is valid.
// Enforces that the predicates are valid within [predicateContext].
// Writes the block details to disk and the state to the trie manager iff writes=true.
func (b *Block) verify(writes bool) error {
// verify UTXOs named in import txs are present in shared memory.
if err := b.verifyUTXOsPresent(); err != nil {
return err
}

// The engine may call VerifyWithContext multiple times on the same block with different contexts.
// Since the engine will only call Accept/Reject once, we should only call InsertBlockManual once.
// Additionally, if a block is already in processing, then it has already passed verification and
@@ -358,6 +443,65 @@ func (b *Block) verify(predicateContext *precompileconfig.PredicateContext, writ
return err
}

func (b *Block) verifyDuringSync(final bool) error {
if final {
return b.verify(true)
}

log.Debug("Verifying block during sync", "block", b.ID(), "height", b.Height())
var (
block = b.ethBlock
header = block.Header()
vm = b.vm
rules = vm.chainConfig.Rules(header.Number, params.IsMergeTODO, header.Time)
rulesExtra = *params.GetRulesExtra(rules)
)

txs, err := atomic.ExtractAtomicTxs(block.ExtData(), rulesExtra.IsApricotPhase5, atomic.Codec)
if err != nil {
return err
}

if err := vm.blockChain.InsertBlockDuringSync(block); err != nil {
return err
}

// If [atomicBackend] is nil, the VM is still initializing and is reprocessing accepted blocks.
if vm.atomicBackend != nil {
if vm.atomicBackend.IsBonus(block.NumberU64(), block.Hash()) {
log.Info("skipping atomic tx verification on bonus block", "block", block.Hash())
} else {
// Verify [txs] do not conflict with themselves or ancestor blocks.
if err := vm.verifyTxs(txs, block.ParentHash(), block.BaseFee(), block.NumberU64(), rulesExtra); err != nil {
return err
}
}
// Update the atomic backend with [txs] from this block.
//
// Note: The atomic trie canonically contains the duplicate operations
// from any bonus blocks.
_, err := vm.atomicBackend.InsertTxs(block.Hash(), block.NumberU64(), block.ParentHash(), txs)
if err != nil {
return err
}
}

log.Debug("Returning from verify without error", "block", b.ID(), "height", b.Height())

return nil
}

// syntacticVerify verifies that a *Block is well-formed.
func (b *Block) syntacticVerify() error {
if b == nil || b.ethBlock == nil {
return errInvalidBlock
}

header := b.ethBlock.Header()
rules := b.vm.chainConfig.Rules(header.Number, params.IsMergeTODO, header.Time)
return b.vm.syntacticBlockValidator.SyntacticVerify(b, rules)
}

// verifyPredicates verifies the predicates in the block are valid according to predicateContext.
func (b *Block) verifyPredicates(predicateContext *precompileconfig.PredicateContext) error {
rules := b.vm.chainConfig.Rules(b.ethBlock.Number(), params.IsMergeTODO, b.ethBlock.Timestamp())
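To make the queued-request flow above easier to follow, here is a rough sketch of how a state sync client could drain verify/accept/reject requests after syncing, routing each one to the new verifyDuringSync, acceptDuringSync, and rejectDuringSync helpers and letting the final replay fall through to the full paths. The queuedRequest struct, drainQueue function, and local request constants are assumptions modeled on the statesync.VerifySyncBlockRequest/AcceptSyncBlockRequest/RejectSyncBlockRequest identifiers referenced in this diff; the real queue lives in the statesync package and is not shown in this PR view.

// Illustrative sketch only (not part of this PR): draining consensus events
// that were queued while dynamic state sync was in progress. It assumes it
// lives alongside plugin/evm.Block so the unexported *DuringSync methods are
// reachable; every other name here is invented for illustration.
package evm

type syncBlockRequest int

const (
	verifySyncBlockRequest syncBlockRequest = iota
	acceptSyncBlockRequest
	rejectSyncBlockRequest
)

type queuedRequest struct {
	block   *Block
	request syncBlockRequest
}

// drainQueue replays queued requests in order. Requests before finalFrom use
// the lightweight *DuringSync variants (final == false); from finalFrom on,
// final == true makes them fall through to the full verify/accept/reject
// logic so state and receipts are written normally.
func drainQueue(queue []queuedRequest, finalFrom int) error {
	for i, q := range queue {
		final := i >= finalFrom
		var err error
		switch q.request {
		case verifySyncBlockRequest:
			err = q.block.verifyDuringSync(final)
		case acceptSyncBlockRequest:
			err = q.block.acceptDuringSync(final)
		case rejectSyncBlockRequest:
			err = q.block.rejectDuringSync(final)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

This is only meant to illustrate why the *DuringSync helpers take a final flag; the actual flush trigger and pivot handling are defined by the downloader changes elsewhere in this PR.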