Skip to content

Commit

Permalink
feat: add async buffer (#69)
Browse files Browse the repository at this point in the history
Co-authored-by: joeylichang <joeycli0919@gmail.com>
Co-authored-by: will@2012 <xibaow2020@qq.com>
  • Loading branch information
3 people authored Mar 19, 2024
1 parent ebb5060 commit 6c6833a
Show file tree
Hide file tree
Showing 19 changed files with 737 additions and 48 deletions.
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ var (
Usage: "Scheme to use for storing ethereum state ('hash' or 'path')",
Category: flags.StateCategory,
}
PathDBSyncFlag = &cli.BoolFlag{
Name: "pathdb.sync",
Usage: "sync flush nodes cache to disk in path schema",
Value: false,
Category: flags.StateCategory,
}
StateHistoryFlag = &cli.Uint64Flag{
Name: "history.state",
Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
Expand Down Expand Up @@ -1856,6 +1862,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions")
cfg.TransactionHistory = ctx.Uint64(TxLookupLimitFlag.Name)
}
if ctx.IsSet(PathDBSyncFlag.Name) {
cfg.PathSyncFlush = true
}
if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
cfg.TransactionHistory = 0
log.Warn("Disabled transaction unindexing for archive node")
Expand Down
18 changes: 13 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type CacheConfig struct {
Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether sync flush the trienodebuffer of pathdb to disk.

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -163,6 +164,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
SyncFlush: c.PathSyncFlush,
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
Expand Down Expand Up @@ -372,6 +374,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if bc.triedb.Scheme() == rawdb.PathScheme {
recoverable, _ := bc.triedb.Recoverable(diskRoot)
if !bc.HasState(diskRoot) && !recoverable {
diskRoot = bc.triedb.Head()
}
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)

Expand Down Expand Up @@ -1042,7 +1050,7 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
if _, nodes, _, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
log.Error("Dangling trie nodes after full cleanup")
}
}
Expand Down Expand Up @@ -1465,8 +1473,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
_, nodes, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
_, nodes, _, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
Expand Down Expand Up @@ -1921,8 +1929,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if bc.snaps != nil {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
trieDiffNodes, trieBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead)
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
3 changes: 2 additions & 1 deletion core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool) {
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand Down Expand Up @@ -73,6 +73,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
}
context = append(context, []interface{}{"triedirty", triebufNodes}...)
context = append(context, []interface{}{"trieimutabledirty", trieImmutableBufNodes}...)

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
Expand Down
9 changes: 8 additions & 1 deletion core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,13 @@ func TestNoCommitCrashWithNewSnapshot(t *testing.T) {
// Expected head block : G
// Expected snapshot disk : C4
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
// in hash scheme ,rewind use snapshot root and the snapshot root block number = 4.
// Last committed disk layer, wait recovery
snapshotBottom := uint64(4)
if scheme == rawdb.PathScheme {
// in path scheme, rewind use trie head as disk root and trie head block number = 0.
snapshotBottom = 0
}
test := &crashSnapshotTest{
snapshotTestBasic{
scheme: scheme,
Expand All @@ -493,7 +500,7 @@ func TestNoCommitCrashWithNewSnapshot(t *testing.T) {
expHeadHeader: 8,
expHeadFastBlock: 8,
expHeadBlock: 0,
expSnapshotBottom: 4, // Last committed disk layer, wait recovery
expSnapshotBottom: snapshotBottom,
},
}
test.test(t)
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1844,7 +1844,7 @@ func TestTrieForkGC(t *testing.T) {
chain.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
chain.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
}
if _, nodes, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
t.Fatalf("stale tries still alive after garbase collection")
}
}
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
StateHistory: config.StateHistory,
StateScheme: scheme,
TrieCommitInterval: config.TrieCommitInterval,
PathSyncFlush: config.PathSyncFlush,
}
)
// Override the chain config with provided settings.
Expand Down
3 changes: 2 additions & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ type Config struct {
// State scheme represents the scheme used to store ethereum states and trie
// nodes on top. It can be 'hash', 'path', or none which means use the scheme
// consistent with persistent state.
StateScheme string `toml:",omitempty"`
StateScheme string `toml:",omitempty"`
PathSyncFlush bool `toml:",omitempty"` // State scheme used to store ethereum state and merkle trie nodes on top

// RequiredBlocks is a set of block number -> hash mappings which must be in the
// canonical chain of all remote peers. Setting the option makes geth verify the
Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions eth/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
parent = root
}
if report {
_, nodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
_, nodes, immutablenodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs)
}
return statedb, func() { triedb.Dereference(block.Root()) }, nil
}
Expand Down
4 changes: 2 additions & 2 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
// if the relevant state is available in disk.
var preferDisk bool
if statedb != nil {
s1, s2, s3 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3 > defaultTracechainMemLimit
s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
}
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
if err != nil {
Expand Down
67 changes: 56 additions & 11 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package trie

import (
"errors"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
Expand Down Expand Up @@ -57,7 +59,7 @@ type backend interface {
//
// For hash scheme, there is no differentiation between diff layer nodes
// and dirty disk layer nodes, so both are merged into the second return.
Size() (common.StorageSize, common.StorageSize)
Size() (common.StorageSize, common.StorageSize, common.StorageSize)

// Update performs a state transition by committing dirty nodes contained
// in the given set in order to update state from the specified parent to
Expand Down Expand Up @@ -89,8 +91,22 @@ type Database struct {
// the legacy hash-based scheme is used by default.
func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
// Sanitize the config and use the default one if it's not specified.
dbScheme := rawdb.ReadStateScheme(diskdb)
if config == nil {
config = HashDefaults
if dbScheme == rawdb.PathScheme {
config = &Config{
PathDB: pathdb.Defaults,
}
} else {
config = HashDefaults
}
}
if config.PathDB == nil && config.HashDB == nil {
if dbScheme == rawdb.PathScheme {
config.PathDB = pathdb.Defaults
} else {
config.HashDB = hashdb.Defaults
}
}
var preimages *preimageStore
if config.Preimages {
Expand All @@ -101,12 +117,30 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
diskdb: diskdb,
preimages: preimages,
}
if config.HashDB != nil && config.PathDB != nil {
log.Crit("Both 'hash' and 'path' mode are configured")
}
if config.PathDB != nil {
/*
* 1. First, initialize db according to the user config
* 2. Second, initialize the db according to the scheme already used by db
* 3. Last, use the default scheme, namely hash scheme
*/
if config.HashDB != nil {
if rawdb.ReadStateScheme(diskdb) == rawdb.PathScheme {
log.Warn("incompatible state scheme", "old", rawdb.PathScheme, "new", rawdb.HashScheme)
}
db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{})
} else if config.PathDB != nil {
if rawdb.ReadStateScheme(diskdb) == rawdb.HashScheme {
log.Warn("incompatible state scheme", "old", rawdb.HashScheme, "new", rawdb.PathScheme)
}
db.backend = pathdb.New(diskdb, config.PathDB)
} else if strings.Compare(dbScheme, rawdb.PathScheme) == 0 {
if config.PathDB == nil {
config.PathDB = pathdb.Defaults
}
db.backend = pathdb.New(diskdb, config.PathDB)
} else {
if config.HashDB == nil {
config.HashDB = hashdb.Defaults
}
db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{})
}
return db
Expand Down Expand Up @@ -151,16 +185,16 @@ func (db *Database) Commit(root common.Hash, report bool) error {
// Size returns the storage size of diff layer nodes above the persistent disk
// layer, the dirty nodes buffered within the disk layer, and the size of cached
// preimages.
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
var (
diffs, nodes common.StorageSize
preimages common.StorageSize
diffs, nodes, immutablenodes common.StorageSize
preimages common.StorageSize
)
diffs, nodes = db.backend.Size()
diffs, nodes, immutablenodes = db.backend.Size()
if db.preimages != nil {
preimages = db.preimages.size()
}
return diffs, nodes, preimages
return diffs, nodes, immutablenodes, preimages
}

// Initialized returns an indicator if the state data is already initialized
Expand Down Expand Up @@ -318,3 +352,14 @@ func (db *Database) SetBufferSize(size int) error {
}
return pdb.SetBufferSize(size)
}

// Head return the top non-fork difflayer/disklayer root hash for rewinding.
// It's only supported by path-based database and will return empty hash for
// others.
func (db *Database) Head() common.Hash {
pdb, ok := db.backend.(*pathdb.Database)
if !ok {
return common.Hash{}
}
return pdb.Head()
}
4 changes: 2 additions & 2 deletions trie/triedb/hashdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,15 +627,15 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n
//
// The first return will always be 0, representing the memory stored in unbounded
// diff layers above the dirty cache. This is only available in pathdb.
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
db.lock.RLock()
defer db.lock.RUnlock()

// db.dirtiesSize only contains the useful data in the cache, but when reporting
// the total memory consumption, the maintenance metadata is also needed to be
// counted.
var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
return 0, db.dirtiesSize + db.childrenSize + metadataSize
return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
}

// Close closes the trie database and releases all held resources.
Expand Down
Loading

0 comments on commit 6c6833a

Please sign in to comment.