Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add async buffer #69

Merged
merged 4 commits into from
Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,12 @@ var (
Usage: "Scheme to use for storing ethereum state ('hash' or 'path')",
Category: flags.StateCategory,
}
// PathDBSyncFlag is the boolean "pathdb.sync" command-line flag in the state
// category. It defaults to false; per its usage text it requests that the
// node cache be flushed to disk synchronously when the path scheme is used.
PathDBSyncFlag = &cli.BoolFlag{
Name: "pathdb.sync",
Usage: "sync flush nodes cache to disk in path schema",
Value: false,
Category: flags.StateCategory,
}
StateHistoryFlag = &cli.Uint64Flag{
Name: "history.state",
Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)",
Expand Down Expand Up @@ -1856,6 +1862,9 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
log.Warn("The flag --txlookuplimit is deprecated and will be removed, please use --history.transactions")
cfg.TransactionHistory = ctx.Uint64(TxLookupLimitFlag.Name)
}
// Honour the explicit value of --pathdb.sync rather than its mere presence:
// IsSet only reports that the flag was supplied, so unconditionally assigning
// true would make --pathdb.sync=false enable synchronous flushing.
if ctx.IsSet(PathDBSyncFlag.Name) {
	cfg.PathSyncFlush = ctx.Bool(PathDBSyncFlag.Name)
}
if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 {
cfg.TransactionHistory = 0
log.Warn("Disabled transaction unindexing for archive node")
Expand Down
18 changes: 13 additions & 5 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type CacheConfig struct {
Preimages bool // Whether to store preimage of trie key to the disk
StateHistory uint64 // Number of blocks from head whose state histories are reserved.
StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top
PathSyncFlush bool // Whether to flush the pathdb trie node buffer to disk synchronously.

SnapshotNoBuild bool // Whether the background generation is allowed
SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it
Expand All @@ -163,6 +164,7 @@ func (c *CacheConfig) triedbConfig() *trie.Config {
}
if c.StateScheme == rawdb.PathScheme {
config.PathDB = &pathdb.Config{
SyncFlush: c.PathSyncFlush,
StateHistory: c.StateHistory,
CleanCacheSize: c.TrieCleanLimit * 1024 * 1024,
DirtyCacheSize: c.TrieDirtyLimit * 1024 * 1024,
Expand Down Expand Up @@ -372,6 +374,12 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis
if bc.cacheConfig.SnapshotLimit > 0 {
diskRoot = rawdb.ReadSnapshotRoot(bc.db)
}
if bc.triedb.Scheme() == rawdb.PathScheme {
recoverable, _ := bc.triedb.Recoverable(diskRoot)
if !bc.HasState(diskRoot) && !recoverable {
diskRoot = bc.triedb.Head()
}
}
if diskRoot != (common.Hash{}) {
log.Warn("Head state missing, repairing", "number", head.Number, "hash", head.Hash(), "snaproot", diskRoot)

Expand Down Expand Up @@ -1042,7 +1050,7 @@ func (bc *BlockChain) Stop() {
for !bc.triegc.Empty() {
triedb.Dereference(bc.triegc.PopItem())
}
if _, nodes, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
if _, nodes, _, _ := triedb.Size(); nodes != 0 { // all memory is contained within the nodes return for hashdb
log.Error("Dangling trie nodes after full cleanup")
}
}
Expand Down Expand Up @@ -1465,8 +1473,8 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types.
}
// If we exceeded our memory allowance, flush matured singleton nodes to disk
var (
_, nodes, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
_, nodes, _, imgs = bc.triedb.Size() // all memory is contained within the nodes return for hashdb
limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024
)
if nodes > limit || imgs > 4*1024*1024 {
bc.triedb.Cap(limit - ethdb.IdealBatchSize)
Expand Down Expand Up @@ -1921,8 +1929,8 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
if bc.snaps != nil {
snapDiffItems, snapBufItems = bc.snaps.Size()
}
trieDiffNodes, trieBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, setHead)
trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ := bc.triedb.Size()
stats.report(chain, it.index, snapDiffItems, snapBufItems, trieDiffNodes, trieBufNodes, trieImmutableBufNodes, setHead)

if !setHead {
// After merge we expect few side chains. Simply count
Expand Down
3 changes: 2 additions & 1 deletion core/blockchain_insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ const statsReportLimit = 8 * time.Second

// report prints statistics if some number of blocks have been processed
// or more than a few seconds have passed since the last message.
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes common.StorageSize, setHead bool) {
func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, snapBufItems, trieDiffNodes, triebufNodes, trieImmutableBufNodes common.StorageSize, setHead bool) {
// Fetch the timings for the batch
var (
now = mclock.Now()
Expand Down Expand Up @@ -73,6 +73,7 @@ func (st *insertStats) report(chain []*types.Block, index int, snapDiffItems, sn
context = append(context, []interface{}{"triediffs", trieDiffNodes}...)
}
context = append(context, []interface{}{"triedirty", triebufNodes}...)
context = append(context, []interface{}{"trieimutabledirty", trieImmutableBufNodes}...)

if st.queued > 0 {
context = append(context, []interface{}{"queued", st.queued}...)
Expand Down
9 changes: 8 additions & 1 deletion core/blockchain_snapshot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,13 @@ func TestNoCommitCrashWithNewSnapshot(t *testing.T) {
// Expected head block : G
// Expected snapshot disk : C4
for _, scheme := range []string{rawdb.HashScheme, rawdb.PathScheme} {
// In hash scheme, rewinding uses the snapshot root, whose block number is 4
// (the last committed disk layer, awaiting recovery).
snapshotBottom := uint64(4)
if scheme == rawdb.PathScheme {
// In path scheme, rewinding uses the trie head as the disk root, whose block number is 0.
snapshotBottom = 0
}
test := &crashSnapshotTest{
snapshotTestBasic{
scheme: scheme,
Expand All @@ -493,7 +500,7 @@ func TestNoCommitCrashWithNewSnapshot(t *testing.T) {
expHeadHeader: 8,
expHeadFastBlock: 8,
expHeadBlock: 0,
expSnapshotBottom: 4, // Last committed disk layer, wait recovery
expSnapshotBottom: snapshotBottom,
},
}
test.test(t)
Expand Down
2 changes: 1 addition & 1 deletion core/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1844,7 +1844,7 @@ func TestTrieForkGC(t *testing.T) {
chain.TrieDB().Dereference(blocks[len(blocks)-1-i].Root())
chain.TrieDB().Dereference(forks[len(blocks)-1-i].Root())
}
if _, nodes, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
if _, nodes, _, _ := chain.TrieDB().Size(); nodes > 0 { // all memory is returned in the nodes return for hashdb
t.Fatalf("stale tries still alive after garbase collection")
}
}
Expand Down
1 change: 1 addition & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
StateHistory: config.StateHistory,
StateScheme: scheme,
TrieCommitInterval: config.TrieCommitInterval,
PathSyncFlush: config.PathSyncFlush,
}
)
// Override the chain config with provided settings.
Expand Down
3 changes: 2 additions & 1 deletion eth/ethconfig/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ type Config struct {
// State scheme represents the scheme used to store ethereum states and trie
// nodes on top. It can be 'hash', 'path', or none which means use the scheme
// consistent with persistent state.
StateScheme string `toml:",omitempty"`
StateScheme string `toml:",omitempty"`
PathSyncFlush bool `toml:",omitempty"` // Whether to flush the pathdb trie node buffer to disk synchronously

// RequiredBlocks is a set of block number -> hash mappings which must be in the
// canonical chain of all remote peers. Setting the option makes geth verify the
Expand Down
6 changes: 6 additions & 0 deletions eth/ethconfig/gen_config.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions eth/state_accessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ func (eth *Ethereum) hashState(ctx context.Context, block *types.Block, reexec u
parent = root
}
if report {
_, nodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "preimages", imgs)
_, nodes, immutablenodes, imgs := triedb.Size() // all memory is contained within the nodes return in hashdb
log.Info("Historical state regenerated", "block", current.NumberU64(), "elapsed", time.Since(start), "nodes", nodes, "immutablenodes", immutablenodes, "preimages", imgs)
}
return statedb, func() { triedb.Dereference(block.Root()) }, nil
}
Expand Down
4 changes: 2 additions & 2 deletions eth/tracers/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ func (api *API) traceChain(start, end *types.Block, config *TraceConfig, closed
// if the relevant state is available in disk.
var preferDisk bool
if statedb != nil {
s1, s2, s3 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3 > defaultTracechainMemLimit
s1, s2, s3, s4 := statedb.Database().TrieDB().Size()
preferDisk = s1+s2+s3+s4 > defaultTracechainMemLimit
}
statedb, release, err = api.backend.StateAtBlock(ctx, block, reexec, statedb, false, preferDisk)
if err != nil {
Expand Down
67 changes: 56 additions & 11 deletions trie/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package trie

import (
"errors"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/trie/triedb/hashdb"
Expand Down Expand Up @@ -57,7 +59,7 @@ type backend interface {
//
// For hash scheme, there is no differentiation between diff layer nodes
// and dirty disk layer nodes, so both are merged into the second return.
Size() (common.StorageSize, common.StorageSize)
Size() (common.StorageSize, common.StorageSize, common.StorageSize)

// Update performs a state transition by committing dirty nodes contained
// in the given set in order to update state from the specified parent to
Expand Down Expand Up @@ -89,8 +91,22 @@ type Database struct {
// the legacy hash-based scheme is used by default.
func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
// Sanitize the config and use the default one if it's not specified.
dbScheme := rawdb.ReadStateScheme(diskdb)
if config == nil {
config = HashDefaults
if dbScheme == rawdb.PathScheme {
config = &Config{
PathDB: pathdb.Defaults,
}
} else {
config = HashDefaults
}
}
if config.PathDB == nil && config.HashDB == nil {
if dbScheme == rawdb.PathScheme {
config.PathDB = pathdb.Defaults
} else {
config.HashDB = hashdb.Defaults
}
}
var preimages *preimageStore
if config.Preimages {
Expand All @@ -101,12 +117,30 @@ func NewDatabase(diskdb ethdb.Database, config *Config) *Database {
diskdb: diskdb,
preimages: preimages,
}
if config.HashDB != nil && config.PathDB != nil {
log.Crit("Both 'hash' and 'path' mode are configured")
}
if config.PathDB != nil {
/*
* 1. First, initialize db according to the user config
* 2. Second, initialize the db according to the scheme already used by db
* 3. Last, use the default scheme, namely hash scheme
*/
if config.HashDB != nil {
if rawdb.ReadStateScheme(diskdb) == rawdb.PathScheme {
log.Warn("incompatible state scheme", "old", rawdb.PathScheme, "new", rawdb.HashScheme)
}
db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{})
} else if config.PathDB != nil {
if rawdb.ReadStateScheme(diskdb) == rawdb.HashScheme {
log.Warn("incompatible state scheme", "old", rawdb.HashScheme, "new", rawdb.PathScheme)
}
db.backend = pathdb.New(diskdb, config.PathDB)
} else if strings.Compare(dbScheme, rawdb.PathScheme) == 0 {
if config.PathDB == nil {
config.PathDB = pathdb.Defaults
}
db.backend = pathdb.New(diskdb, config.PathDB)
} else {
if config.HashDB == nil {
config.HashDB = hashdb.Defaults
}
db.backend = hashdb.New(diskdb, config.HashDB, mptResolver{})
}
return db
Expand Down Expand Up @@ -151,16 +185,16 @@ func (db *Database) Commit(root common.Hash, report bool) error {
// Size returns the storage size of diff layer nodes above the persistent disk
// layer, the dirty nodes buffered within the disk layer, and the size of cached
// preimages.
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize, common.StorageSize) {
var (
diffs, nodes common.StorageSize
preimages common.StorageSize
diffs, nodes, immutablenodes common.StorageSize
preimages common.StorageSize
)
diffs, nodes = db.backend.Size()
diffs, nodes, immutablenodes = db.backend.Size()
if db.preimages != nil {
preimages = db.preimages.size()
}
return diffs, nodes, preimages
return diffs, nodes, immutablenodes, preimages
}

// Initialized returns an indicator if the state data is already initialized
Expand Down Expand Up @@ -318,3 +352,14 @@ func (db *Database) SetBufferSize(size int) error {
}
return pdb.SetBufferSize(size)
}

// Head returns the root hash reported by the path-based backend, described
// as the top non-fork diff layer/disk layer root and used as the target when
// rewinding. Only the path-based database supports this query; for any other
// backend the empty hash is returned.
func (db *Database) Head() common.Hash {
	if pathBackend, isPath := db.backend.(*pathdb.Database); isPath {
		return pathBackend.Head()
	}
	// Not a path-based backend: there is no head to report.
	return common.Hash{}
}
4 changes: 2 additions & 2 deletions trie/triedb/hashdb/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,15 +627,15 @@ func (db *Database) Update(root common.Hash, parent common.Hash, block uint64, n
//
// The first return will always be 0, representing the memory stored in unbounded
// diff layers above the dirty cache. This is only available in pathdb.
func (db *Database) Size() (common.StorageSize, common.StorageSize) {
func (db *Database) Size() (common.StorageSize, common.StorageSize, common.StorageSize) {
db.lock.RLock()
defer db.lock.RUnlock()

// db.dirtiesSize only contains the useful data in the cache, but when reporting
// the total memory consumption, the maintenance metadata is also needed to be
// counted.
var metadataSize = common.StorageSize(len(db.dirties) * cachedNodeSize)
return 0, db.dirtiesSize + db.childrenSize + metadataSize
return 0, db.dirtiesSize + db.childrenSize + metadataSize, 0
}

// Close closes the trie database and releases all held resources.
Expand Down
Loading
Loading