Skip to content

Commit

Permalink
Merge branch 'develop' into fastNode
Browse files Browse the repository at this point in the history
  • Loading branch information
yutianwu authored Apr 8, 2024
2 parents 97dd248 + c5a0092 commit a96c695
Show file tree
Hide file tree
Showing 14 changed files with 218 additions and 9 deletions.
42 changes: 35 additions & 7 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,14 @@ var (
blockExecutionTimer = metrics.NewRegisteredTimer("chain/execution", nil)
blockWriteTimer = metrics.NewRegisteredTimer("chain/write", nil)

blockWriteExternalTimer = metrics.NewRegisteredTimer("chain/block/write/external", nil)
stateCommitExternalTimer = metrics.NewRegisteredTimer("chain/state/commit/external", nil)
triedbCommitExternalTimer = metrics.NewRegisteredTimer("chain/triedb/commit/external", nil)
innerExecutionTimer = metrics.NewRegisteredTimer("chain/inner/execution", nil)

blockGasUsedGauge = metrics.NewRegisteredGauge("chain/block/gas/used", nil)
mgaspsGauge = metrics.NewRegisteredGauge("chain/mgas/ps", nil)

blockReorgMeter = metrics.NewRegisteredMeter("chain/reorg/executes", nil)
blockReorgAddMeter = metrics.NewRegisteredMeter("chain/reorg/add", nil)
blockReorgDropMeter = metrics.NewRegisteredMeter("chain/reorg/drop", nil)
Expand Down Expand Up @@ -1445,6 +1453,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
//
// Note all the components of block(td, hash->number map, header, body, receipts)
// should be written atomically. BlockBatch is used for containing all components.
start := time.Now()
blockBatch := bc.db.NewBatch()
rawdb.WriteTd(blockBatch, block.Hash(), block.NumberU64(), externTd)
rawdb.WriteBlock(blockBatch, block)
Expand All @@ -1453,17 +1462,29 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
if err := blockBatch.Write(); err != nil {
log.Crit("Failed to write block into disk", "err", err)
}
blockWriteExternalTimer.UpdateSince(start)
log.Debug("blockWriteExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())

// Commit all cached state changes into underlying memory database.
start = time.Now()
root, err := state.Commit(block.NumberU64(), bc.chainConfig.IsEIP158(block.Number()))
if err != nil {
return err
}
stateCommitExternalTimer.UpdateSince(start)
log.Debug("stateCommitExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())

// If node is running in path mode, skip explicit gc operation
// which is unnecessary in this mode.
if bc.triedb.Scheme() == rawdb.PathScheme {
return nil
}
// If we're running an archive node, always flush
start = time.Now()
defer func () {
triedbCommitExternalTimer.UpdateSince(start)
log.Debug("triedbCommitExternalTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", block.Hash())
} ()
if bc.cacheConfig.TrieDirtyDisabled {
return bc.triedb.Commit(root, false)
}
Expand Down Expand Up @@ -1768,7 +1789,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
}()

defer func () {
DebugInnerExecutionDuration = 0
}()
for ; block != nil && err == nil || errors.Is(err, ErrKnownBlock); block, err = it.next() {
DebugInnerExecutionDuration = 0
// If the chain is terminating, stop processing blocks
if bc.insertStopped() {
log.Debug("Abort during block processing")
Expand Down Expand Up @@ -1897,12 +1922,12 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
storageUpdateTimer.Update(statedb.StorageUpdates) // Storage updates are complete(in validation)
accountHashTimer.Update(statedb.AccountHashes) // Account hashes are complete(in validation)
storageHashTimer.Update(statedb.StorageHashes) // Storage hashes are complete(in validation)
triehash := statedb.AccountHashes + statedb.StorageHashes // The time spent on tries hashing
trieUpdate := statedb.AccountUpdates + statedb.StorageUpdates // The time spent on tries update
trieRead := statedb.SnapshotAccountReads + statedb.AccountReads // The time spent on account read
trieRead += statedb.SnapshotStorageReads + statedb.StorageReads // The time spent on storage read
blockExecutionTimer.Update(ptime - trieRead) // The time spent on EVM processing
blockValidationTimer.Update(vtime - (triehash + trieUpdate)) // The time spent on block validation
blockExecutionTimer.Update(ptime) // The time spent on block execution
blockValidationTimer.Update(vtime) // The time spent on block validation

innerExecutionTimer.Update(DebugInnerExecutionDuration)

log.Debug("New payload execution and validation metrics", "hash", block.Hash(), "execution", common.PrettyDuration(ptime), "validation", common.PrettyDuration(vtime), "accountReads", common.PrettyDuration(statedb.AccountReads), "storageReads", common.PrettyDuration(statedb.StorageReads), "snapshotAccountReads", common.PrettyDuration(statedb.SnapshotAccountReads), "snapshotStorageReads", common.PrettyDuration(statedb.SnapshotStorageReads), "accountUpdates", common.PrettyDuration(statedb.AccountUpdates), "storageUpdates", common.PrettyDuration(statedb.StorageUpdates), "accountHashes", common.PrettyDuration(statedb.AccountHashes), "storageHashes", common.PrettyDuration(statedb.StorageHashes))

// Write the block to the chain and get the status.
var (
Expand All @@ -1925,9 +1950,11 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
snapshotCommitTimer.Update(statedb.SnapshotCommits) // Snapshot commits are complete, we can mark them
triedbCommitTimer.Update(statedb.TrieDBCommits) // Trie database commits are complete, we can mark them

blockWriteTimer.Update(time.Since(wstart) - statedb.AccountCommits - statedb.StorageCommits - statedb.SnapshotCommits - statedb.TrieDBCommits)
blockWriteTimer.UpdateSince(wstart)
blockInsertTimer.UpdateSince(start)

log.Debug("New payload db write metrics", "hash", block.Hash(), "insert", common.PrettyDuration(time.Since(start)), "writeDB", common.PrettyDuration(time.Since(wstart)), "writeBlock", common.PrettyDuration(time.Since(wstart)), "accountCommit", common.PrettyDuration(statedb.AccountCommits), "storageCommit", common.PrettyDuration(statedb.StorageCommits), "snapshotCommits", common.PrettyDuration(statedb.SnapshotCommits), "triedbCommit", common.PrettyDuration(statedb.TrieDBCommits))

// Report the import stats before returning the various results
stats.processed++
stats.usedGas += usedGas
Expand All @@ -1938,6 +1965,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
}
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)
blockGasUsedGauge.Update(int64(block.GasUsed())/1000000)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
1 change: 1 addition & 0 deletions core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
"blocks", st.processed, "txs", txs, "mgas", float64(st.usedGas) / 1000000,
"elapsed", common.PrettyDuration(elapsed), "mgasps", float64(st.usedGas) * 1000 / float64(elapsed),
}
mgaspsGauge.Update(int64(st.usedGas)*1000/int64(elapsed))
if timestamp := time.Unix(int64(end.Time()), 0); time.Since(timestamp) > time.Minute {
context = append(context, []interface{}{"age", common.PrettyAge(timestamp)}...)
}
Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/accessors_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package rawdb

import (
"encoding/binary"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
)

// ReadSnapshotDisabled retrieves if the snapshot maintenance is disabled.
Expand Down Expand Up @@ -74,6 +76,10 @@ func DeleteSnapshotRoot(db ethdb.KeyValueWriter) {

// ReadAccountSnapshot retrieves the snapshot entry of an account trie leaf.
func ReadAccountSnapshot(db ethdb.KeyValueReader, hash common.Hash) []byte {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { rawdbGetAccountSnapNodeTimer.UpdateSince(start) }()
}
data, _ := db.Get(accountSnapshotKey(hash))
return data
}
Expand All @@ -94,6 +100,10 @@ func DeleteAccountSnapshot(db ethdb.KeyValueWriter, hash common.Hash) {

// ReadStorageSnapshot retrieves the snapshot entry of an storage trie leaf.
func ReadStorageSnapshot(db ethdb.KeyValueReader, accountHash, storageHash common.Hash) []byte {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { rawdbGetStorageSnapNodeTimer.UpdateSince(start) }()
}
data, _ := db.Get(storageSnapshotKey(accountHash, storageHash))
return data
}
Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/accessors_trie.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package rawdb
import (
"fmt"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"golang.org/x/crypto/sha3"
)

Expand Down Expand Up @@ -68,6 +70,10 @@ func (h *hasher) release() {
// ReadAccountTrieNode retrieves the account trie node and the associated node
// hash with the specified node path.
func ReadAccountTrieNode(db ethdb.KeyValueReader, path []byte) ([]byte, common.Hash) {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { rawdbGetAccountTrieNodeTimer.UpdateSince(start) }()
}
data, err := db.Get(accountTrieNodeKey(path))
if err != nil {
return nil, common.Hash{}
Expand Down Expand Up @@ -116,6 +122,10 @@ func DeleteAccountTrieNode(db ethdb.KeyValueWriter, path []byte) {
// ReadStorageTrieNode retrieves the storage trie node and the associated node
// hash with the specified node path.
func ReadStorageTrieNode(db ethdb.KeyValueReader, accountHash common.Hash, path []byte) ([]byte, common.Hash) {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { rawdbGetStorageTrieNodeTimer.UpdateSince(start) }()
}
data, err := db.Get(storageTrieNodeKey(accountHash, path))
if err != nil {
return nil, common.Hash{}
Expand Down
10 changes: 10 additions & 0 deletions core/rawdb/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package rawdb

import "github.com/ethereum/go-ethereum/metrics"

var (
rawdbGetAccountTrieNodeTimer = metrics.NewRegisteredTimer("rawdb/get/account/trienode/time", nil)
rawdbGetStorageTrieNodeTimer = metrics.NewRegisteredTimer("rawdb/get/storage/trienode/time", nil)
rawdbGetAccountSnapNodeTimer = metrics.NewRegisteredTimer("rawdb/get/account/snapnode/time", nil)
rawdbGetStorageSnapNodeTimer = metrics.NewRegisteredTimer("rawdb/get/storage/snapnode/time", nil)
)
8 changes: 8 additions & 0 deletions core/state_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus"
Expand All @@ -28,9 +29,12 @@ import (
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/params"
)

var processTxTimer = metrics.NewRegisteredTimer("process/tx/time", nil)

// StateProcessor is a basic Processor, which takes care of transitioning
// state from one point to another.
//
Expand Down Expand Up @@ -88,6 +92,7 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg
}
// Iterate over and process the individual transactions
for i, tx := range block.Transactions() {
start := time.Now()
msg, err := TransactionToMessage(tx, signer, header.BaseFee)
if err != nil {
return nil, nil, 0, fmt.Errorf("could not apply tx %d [%v]: %w", i, tx.Hash().Hex(), err)
Expand All @@ -100,6 +105,9 @@ func (p *StateProcessor) Process(block *types.Block, statedb *state.StateDB, cfg

receipts = append(receipts, receipt)
allLogs = append(allLogs, receipt.Logs...)
if metrics.EnabledExpensive {
processTxTimer.UpdateSince(start)
}
}
// Fail if Shanghai not enabled and len(withdrawals) is non-zero.
withdrawals := block.Withdrawals()
Expand Down
6 changes: 6 additions & 0 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"math"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
cmath "github.com/ethereum/go-ethereum/common/math"
Expand All @@ -29,6 +30,9 @@ import (
"github.com/ethereum/go-ethereum/params"
)

// TODO delete after debug performance metrics
var DebugInnerExecutionDuration time.Duration

// ExecutionResult includes all output after executing given evm
// message no matter the execution itself is successful or not.
type ExecutionResult struct {
Expand Down Expand Up @@ -473,13 +477,15 @@ func (st *StateTransition) innerTransitionDb() (*ExecutionResult, error) {
ret []byte
vmerr error // vm errors do not effect consensus and are therefore not assigned to err
)
start := time.Now()
if contractCreation {
ret, _, st.gasRemaining, vmerr = st.evm.Create(sender, msg.Data, st.gasRemaining, msg.Value)
} else {
// Increment the nonce for the next transaction
st.state.SetNonce(msg.From, st.state.GetNonce(sender.Address())+1)
ret, st.gasRemaining, vmerr = st.evm.Call(sender, st.to(), msg.Data, st.gasRemaining, msg.Value)
}
DebugInnerExecutionDuration += time.Since(start)

// if deposit: skip refunds, skip tipping coinbase
// Regolith changes this behaviour to report the actual gasUsed instead of always reporting all gas used.
Expand Down
25 changes: 25 additions & 0 deletions eth/catalyst/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,19 @@ import (
"github.com/ethereum/go-ethereum/eth"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/metrics"
"github.com/ethereum/go-ethereum/miner"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/rpc"
)

var (
forkchoiceUpdateAttributesTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/attributes", nil)
forkchoiceUpdateHeadsTimer = metrics.NewRegisteredTimer("api/engine/forkchoiceUpdate/heads", nil)
getPayloadTimer = metrics.NewRegisteredTimer("api/engine/get/payload", nil)
newPayloadTimer = metrics.NewRegisteredTimer("api/engine/new/payload", nil)
)

// Register adds the engine API to the full node.
func Register(stack *node.Node, backend *eth.Ethereum) error {
log.Warn("Engine API enabled", "protocol", "eth")
Expand Down Expand Up @@ -228,6 +236,8 @@ func checkAttribute(active func(*big.Int, uint64) bool, exists bool, block *big.
}

func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payloadAttributes *engine.PayloadAttributes) (engine.ForkChoiceResponse, error) {
start := time.Now()

api.forkchoiceLock.Lock()
defer api.forkchoiceLock.Unlock()

Expand Down Expand Up @@ -398,8 +408,12 @@ func (api *ConsensusAPI) forkchoiceUpdated(update engine.ForkchoiceStateV1, payl
return valid(nil), engine.InvalidPayloadAttributes.With(err)
}
api.localBlocks.put(id, payload)
forkchoiceUpdateAttributesTimer.UpdateSince(start)
log.Debug("forkchoiceUpdateAttributesTimer", "duration", common.PrettyDuration(time.Since(start)), "id", id)
return valid(&id), nil
}
forkchoiceUpdateHeadsTimer.UpdateSince(start)
log.Debug("forkchoiceUpdateAttributesTimer", "duration", common.PrettyDuration(time.Since(start)), "hash", update.HeadBlockHash)
return valid(nil), nil
}

Expand Down Expand Up @@ -453,6 +467,11 @@ func (api *ConsensusAPI) GetPayloadV3(payloadID engine.PayloadID) (*engine.Execu
}

func (api *ConsensusAPI) getPayload(payloadID engine.PayloadID, full bool) (*engine.ExecutionPayloadEnvelope, error) {
start := time.Now()
defer func () {
getPayloadTimer.UpdateSince(start)
log.Debug("getPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "id", payloadID)
} ()
log.Trace("Engine API request received", "method", "GetPayload", "id", payloadID)
data := api.localBlocks.get(payloadID, full)
if data == nil {
Expand Down Expand Up @@ -507,6 +526,12 @@ func (api *ConsensusAPI) NewPayloadV3(params engine.ExecutableData, versionedHas
}

func (api *ConsensusAPI) newPayload(params engine.ExecutableData, versionedHashes []common.Hash, beaconRoot *common.Hash) (engine.PayloadStatusV1, error) {
start := time.Now()
defer func () {
newPayloadTimer.UpdateSince(start)
log.Debug("newPayloadTimer", "duration", common.PrettyDuration(time.Since(start)), "parentHash", params.ParentHash)
} ()

// The locking here is, strictly, not required. Without these locks, this can happen:
//
// 1. NewPayload( execdata-N ) is invoked from the CL. It goes all the way down to
Expand Down
10 changes: 10 additions & 0 deletions ethdb/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package ethdb

import "github.com/ethereum/go-ethereum/metrics"

var (
EthdbGetTimer = metrics.NewRegisteredTimer("ethdb/get/time", nil)
EthdbPutTimer = metrics.NewRegisteredTimer("ethdb/put/time", nil)
EthdbDeleteTimer = metrics.NewRegisteredTimer("ethdb/delete/time", nil)
EthdbBatchWriteTimer = metrics.NewRegisteredTimer("ethdb/batch/write/time", nil)
)
17 changes: 17 additions & 0 deletions ethdb/pebble/pebble.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,10 @@ func (d *Database) Has(key []byte) (bool, error) {

// Get retrieves the given key if it's present in the key-value store.
func (d *Database) Get(key []byte) ([]byte, error) {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { ethdb.EthdbGetTimer.UpdateSince(start) }()
}
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
Expand All @@ -316,6 +320,10 @@ func (d *Database) Get(key []byte) ([]byte, error) {

// Put inserts the given value into the key-value store.
func (d *Database) Put(key []byte, value []byte) error {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { ethdb.EthdbPutTimer.UpdateSince(start) }()
}
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
Expand All @@ -326,6 +334,10 @@ func (d *Database) Put(key []byte, value []byte) error {

// Delete removes the key from the key-value store.
func (d *Database) Delete(key []byte) error {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { ethdb.EthdbDeleteTimer.UpdateSince(start) }()
}
d.quitLock.RLock()
defer d.quitLock.RUnlock()
if d.closed {
Expand Down Expand Up @@ -482,6 +494,7 @@ func (d *Database) meter(refresh time.Duration, namespace string) {
nonLevel0CompCount = int64(d.nonLevel0Comp.Load())
level0CompCount = int64(d.level0Comp.Load())
)
d.log.Info("loop print db stats", "comp_time", compTime, "write_delay_count", writeDelayCount, "write_delay_time", writeDelayTime, "non_level0_comp_count", nonLevel0CompCount, "level0_comp_count", level0CompCount)
writeDelayTimes[i%2] = writeDelayTime
writeDelayCounts[i%2] = writeDelayCount
compTimes[i%2] = compTime
Expand Down Expand Up @@ -580,6 +593,10 @@ func (b *batch) ValueSize() int {

// Write flushes any accumulated data to disk.
func (b *batch) Write() error {
if metrics.EnabledExpensive {
start := time.Now()
defer func() { ethdb.EthdbBatchWriteTimer.UpdateSince(start) }()
}
b.db.quitLock.RLock()
defer b.db.quitLock.RUnlock()
if b.db.closed {
Expand Down
Loading

0 comments on commit a96c695

Please sign in to comment.