diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 4ceb9c9581..15f6bd5f93 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -96,6 +96,8 @@ var ( utils.StateHistoryFlag, utils.ProposeBlockIntervalFlag, utils.PathDBNodeBufferTypeFlag, + utils.EnableProofKeeperFlag, + utils.KeepProofBlockSpanFlag, utils.LightServeFlag, utils.LightIngressFlag, utils.LightEgressFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index fc9ae0f96a..b41e802126 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -23,7 +23,6 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" "math" "math/big" "net" @@ -35,6 +34,8 @@ import ( "strings" "time" + "github.com/ethereum/go-ethereum/core/opcodeCompiler/compiler" + "github.com/ethereum/go-ethereum/accounts" "github.com/ethereum/go-ethereum/accounts/keystore" "github.com/ethereum/go-ethereum/common" @@ -313,6 +314,18 @@ var ( Value: pathdb.DefaultProposeBlockInterval, Category: flags.StateCategory, } + EnableProofKeeperFlag = &cli.BoolFlag{ + Name: "pathdb.enableproofkeeper", + Usage: "Enable the path db proof keeper to store proposed proofs", + Value: false, + Category: flags.StateCategory, + } + KeepProofBlockSpanFlag = &cli.Uint64Flag{ + Name: "pathdb.keepproofblockspan", + Usage: "Number of recent blocks to retain proposed proofs for (default = 90,000 blocks)", + Value: params.FullImmutabilityThreshold, + Category: flags.StateCategory, + } StateHistoryFlag = &cli.Uint64Flag{ Name: "history.state", Usage: "Number of recent blocks to retain state history for (default = 90,000 blocks, 0 = entire chain)", @@ -1881,6 +1894,12 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { if ctx.IsSet(ProposeBlockIntervalFlag.Name) { cfg.ProposeBlockInterval = ctx.Uint64(ProposeBlockIntervalFlag.Name) } + if ctx.IsSet(EnableProofKeeperFlag.Name) { + cfg.EnableProofKeeper = ctx.Bool(EnableProofKeeperFlag.Name) + } + if ctx.IsSet(KeepProofBlockSpanFlag.Name) { + cfg.KeepProofBlockSpan = ctx.Uint64(KeepProofBlockSpanFlag.Name) + } if ctx.String(GCModeFlag.Name) == "archive" && cfg.TransactionHistory != 0 { cfg.TransactionHistory = 0 log.Warn("Disabled transaction unindexing for archive node") diff --git a/common/types.go b/common/types.go index 79df1560e4..37a21f276b 100644 --- a/common/types.go +++ b/common/types.go @@ -480,11 +480,13 @@ func (d *Decimal) UnmarshalJSON(input []byte) error { // hex-strings for delivery to rpc-caller. type ProofList []string +// Put implements the ethdb.KeyValueWriter interface. func (n *ProofList) Put(key []byte, value []byte) error { *n = append(*n, hexutil.Encode(value)) return nil } +// Delete implements the ethdb.KeyValueWriter interface. func (n *ProofList) Delete(key []byte) error { panic("not supported") } @@ -497,7 +499,7 @@ type AccountResult struct { CodeHash Hash `json:"codeHash"` Nonce uint64 `json:"nonce"` StorageHash Hash `json:"storageHash"` - StorageProof []StorageResult `json:"storageProof"` // optional + StorageProof []StorageResult `json:"storageProof"` } // StorageResult provides a proof for a key-value pair.
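For context on how the two new flags are meant to be used: the keeper hooks into the path db node buffer list, so it is only effective with the path state scheme. A minimal invocation sketch, assuming the existing `--state.scheme` flag and a 90,000-block proposer window (adjust to your deployment):

```shell
geth --state.scheme path \
  --pathdb.enableproofkeeper \
  --pathdb.keepproofblockspan 90000
```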
diff --git a/core/blockchain.go b/core/blockchain.go index 532dcce9ed..10dbc96b67 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -47,7 +47,6 @@ import ( "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" - "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/triedb/hashdb" "github.com/ethereum/go-ethereum/trie/triedb/pathdb" @@ -157,11 +156,12 @@ type CacheConfig struct { StateScheme string // Scheme used to store ethereum states and merkle tree nodes on top PathNodeBuffer pathdb.NodeBufferType // Type of trienodebuffer to cache trie nodes in disklayer ProposeBlockInterval uint64 // Propose block to L1 block interval. + EnableProofKeeper bool // Whether to enable the proof keeper + KeepProofBlockSpan uint64 // Number of blocks to retain proposed proofs for SnapshotNoBuild bool // Whether the background generation is allowed SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height. - RpcClient *rpc.Client } // triedbConfig derives the configures for trie database. @@ -230,6 +230,7 @@ type BlockChain struct { flushInterval atomic.Int64 // Time interval (processing time) after which to flush a state triedb *trie.Database // The database handler for maintaining trie nodes. stateCache state.Database // State database to reuse between imports (contains state cache) + proofKeeper *ProofKeeper // Stores/queries the op-proposal proof to ensure consistency. // txLookupLimit is the maximum number of blocks from head whose tx indices // are reserved: @@ -281,8 +282,6 @@ type BlockChain struct { processor Processor // Block transaction processor interface forker *ForkChoice vmConfig vm.Config - - ProofKeeper *ProofKeeper } // NewBlockChain returns a fully initialised block chain using information @@ -293,7 +292,8 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis cacheConfig = defaultCacheConfig } opts := &proofKeeperOptions{ - enable: true, // todo + enable: cacheConfig.EnableProofKeeper, + keepProofBlockSpan: cacheConfig.KeepProofBlockSpan, watchStartKeepCh: make(chan *pathdb.KeepRecord), notifyFinishKeepCh: make(chan struct{}), } @@ -352,7 +352,7 @@ func NewBlockChain(db ethdb.Database, cacheConfig *CacheConfig, genesis *Genesis if err != nil { return nil, err } - bc.ProofKeeper = proofKeeper + bc.proofKeeper = proofKeeper bc.hc, err = NewHeaderChain(db, chainConfig, engine, bc.insertStopped) if err != nil { @@ -1048,6 +1048,9 @@ func (bc *BlockChain) Stop() { if err := bc.triedb.Journal(bc.CurrentBlock().Root); err != nil { log.Info("Failed to journal in-memory trie nodes", "err", err) } + if err := bc.proofKeeper.Stop(); err != nil { + log.Info("Failed to stop proof keeper", "err", err) + } } else { // Ensure the state of a recent block is also stored to disk before exiting.
// We're writing three different states to catch different restart scenarios: diff --git a/core/blockchain_reader.go b/core/blockchain_reader.go index 30fbcb883b..d52a422aaa 100644 --- a/core/blockchain_reader.go +++ b/core/blockchain_reader.go @@ -425,3 +425,6 @@ func (bc *BlockChain) SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscript func (bc *BlockChain) SubscribeBlockProcessingEvent(ch chan<- bool) event.Subscription { return bc.scope.Track(bc.blockProcFeed.Subscribe(ch)) } + +// ProofKeeper returns the blockchain's proof keeper. +func (bc *BlockChain) ProofKeeper() *ProofKeeper { return bc.proofKeeper } diff --git a/core/proof_keeper.go b/core/proof_keeper.go index 8a812308a2..39c2f07c1e 100644 --- a/core/proof_keeper.go +++ b/core/proof_keeper.go @@ -14,20 +14,33 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" + "github.com/ethereum/go-ethereum/params" trie2 "github.com/ethereum/go-ethereum/trie" "github.com/ethereum/go-ethereum/trie/triedb/pathdb" ) const ( + // l2ToL1MessagePasser is the address of the pre-deployed L2ToL1MessagePasser contract. l2ToL1MessagePasser = "0x4200000000000000000000000000000000000016" + + // gcProofIntervalSecond controls the gc loop interval, in seconds. + gcProofIntervalSecond = 3600 + + // maxKeeperMetaNumber is used to gc keeper meta; the gc workflow is triggered + // when meta number > maxKeeperMetaNumber && meta.block_id < latest_block_id - keep_block_span. + maxKeeperMetaNumber = 100 ) var ( l2ToL1MessagePasserAddr = common.HexToAddress(l2ToL1MessagePasser) + addProofTimer = metrics.NewRegisteredTimer("proofkeeper/addproof/time", nil) + getInnerProofTimer = metrics.NewRegisteredTimer("proofkeeper/getinnerproof/time", nil) + queryProofTimer = metrics.NewRegisteredTimer("proofkeeper/queryproof/time", nil) ) // keeperMetaRecord is used to ensure proof continuous in scenarios such as enable/disable keeper, interval changes, reorg, etc. -// which is stored in kv db. +// which is stored in kv db, indexed by prefix+block-id. type keeperMetaRecord struct { BlockID uint64 `json:"blockID"` ProofID uint64 `json:"proofID"` @@ -35,7 +48,7 @@ type keeperMetaRecord struct { } // proofDataRecord is used to store proposed proof data. -// which is stored in ancient db. +// which is stored in ancient db, indexed by proof-id. type proofDataRecord struct { ProofID uint64 `json:"proofID"` BlockID uint64 `json:"blockID"` @@ -50,18 +63,16 @@ type proofDataRecord struct { StorageProof []common.StorageResult `json:"storageProof"` } -// todo: move metadb to opts. +// proofKeeperOptions defines proof keeper options. type proofKeeperOptions struct { enable bool + keepProofBlockSpan uint64 + gcInterval uint64 watchStartKeepCh chan *pathdb.KeepRecord notifyFinishKeepCh chan struct{} } -// todo: ensure ancient sync write?? -// add metris -// add ut -// polish log -// todo gc +// ProofKeeper is used to store proposed proofs so that the op-proposer can query them. type ProofKeeper struct { opts *proofKeeperOptions blockChain *BlockChain @@ -70,19 +81,35 @@ type ProofKeeper struct { queryProofCh chan uint64 waitQueryProofCh chan *proofDataRecord + stopCh chan struct{} + waitStopCh chan error + latestBlockID uint64 } +// newProofKeeper returns a proof keeper instance.
func newProofKeeper(opts *proofKeeperOptions) *ProofKeeper { + if opts.keepProofBlockSpan == 0 { + opts.keepProofBlockSpan = params.FullImmutabilityThreshold + } + if opts.gcInterval == 0 { + opts.gcInterval = gcProofIntervalSecond + } keeper := &ProofKeeper{ opts: opts, queryProofCh: make(chan uint64), waitQueryProofCh: make(chan *proofDataRecord), + stopCh: make(chan struct{}), + waitStopCh: make(chan error), } log.Info("Succeed to init proof keeper", "options", opts) return keeper } +// Start starts the event loop. func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Database) error { + if !keeper.opts.enable { + return nil + } var ( err error ancientDir string @@ -104,10 +131,19 @@ func (keeper *ProofKeeper) Start(blockChain *BlockChain, keeperMetaDB ethdb.Data return nil } +// Stop syncs the ancient db and stops the event loop. func (keeper *ProofKeeper) Stop() error { - return nil + if !keeper.opts.enable { + return nil + } + + close(keeper.stopCh) + err := <-keeper.waitStopCh + log.Info("Succeed to stop proof keeper", "error", err) + return err } +// GetKeepRecordWatchFunc returns a keeper callback func which is used by the path db node buffer list. func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc { return func(keepRecord *pathdb.KeepRecord) { if keeper == nil { @@ -125,21 +161,35 @@ func (keeper *ProofKeeper) GetKeepRecordWatchFunc() pathdb.KeepRecordWatchFunc { if keepRecord.BlockID%keepRecord.KeepInterval != 0 { return } + + startTimestamp := time.Now() + defer func() { + addProofTimer.UpdateSince(startTimestamp) + log.Info("Succeed to keep proof", "record", keepRecord, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + }() + keeper.opts.watchStartKeepCh <- keepRecord <-keeper.opts.notifyFinishKeepCh - log.Info("Succeed to keep proof in stop", "record", keepRecord) } } +// getInnerProof generates the proof through the state db interface. func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofDataRecord, error) { var ( - err error - header *types.Header - stateDB *state.StateDB - worldTrie *trie2.StateTrie - pRecord *proofDataRecord + err error + header *types.Header + stateDB *state.StateDB + worldTrie *trie2.StateTrie + accountProof common.ProofList + pRecord *proofDataRecord ) + startTimestamp := time.Now() + defer func() { + getInnerProofTimer.UpdateSince(startTimestamp) + // log.Info("Succeed to get proof", "proof_record", pRecord, "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + }() + if header = keeper.blockChain.GetHeaderByNumber(kRecord.BlockID); header == nil { return nil, fmt.Errorf("block is not found, block_id=%d", kRecord.BlockID) } @@ -149,7 +199,6 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData if worldTrie, err = trie2.NewStateTrie(trie2.StateTrieID(header.Root), stateDB.Database().TrieDB()); err != nil { return nil, err } - var accountProof common.ProofList if err = worldTrie.Prove(crypto.Keccak256(l2ToL1MessagePasserAddr.Bytes()), &accountProof); err != nil { return nil, err } @@ -165,27 +214,33 @@ func (keeper *ProofKeeper) getInnerProof(kRecord *pathdb.KeepRecord) (*proofData StorageProof: make([]common.StorageResult, 0), } err = stateDB.Error() - log.Info("Succeed to get proof", "proof_record", pRecord) return pRecord, err } +// eventLoop updates/queries keeper meta and proof data in a single event loop, which ensures thread safety.
func (keeper *ProofKeeper) eventLoop() { + if !keeper.opts.enable { + return + } var ( - putKeeperMetaRecordOnce bool // default = false - ancientInitSequenceID uint64 // default = 0 + err error + putKeeperMetaRecordOnce bool + ancientInitSequenceID uint64 ) + + gcProofTicker := time.NewTicker(time.Second * time.Duration(keeper.opts.gcInterval)) + defer gcProofTicker.Stop() + for { select { case keepRecord := <-keeper.opts.watchStartKeepCh: - // log.Info("keep proof", "record", keepRecord) var ( hasTruncatedMeta bool curProofID uint64 - startTimestamp time.Time + proofRecord *proofDataRecord ) - startTimestamp = time.Now() - proofRecord, err := keeper.getInnerProof(keepRecord) + proofRecord, err = keeper.getInnerProof(keepRecord) if err == nil { hasTruncatedMeta = keeper.truncateKeeperMetaRecordHeadIfNeeded(keepRecord.BlockID) metaList := keeper.getKeeperMetaRecordList() @@ -211,17 +266,19 @@ func (keeper *ProofKeeper) eventLoop() { }) } proofRecord.ProofID = curProofID - keeper.putProofDataRecord(proofRecord) + err = keeper.putProofDataRecord(proofRecord) + keeper.latestBlockID = keepRecord.BlockID } - log.Info("Keep a new proof", + log.Info("Succeed to keep a new proof", "block_id", keepRecord.BlockID, "state_root", keepRecord.StateRoot.String(), - "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) + "error", err) keeper.opts.notifyFinishKeepCh <- struct{}{} + case queryBlockID := <-keeper.queryProofCh: var resultProofRecord *proofDataRecord metaList := keeper.getKeeperMetaRecordList() - if len(metaList) != 0 { + if len(metaList) != 0 && (queryBlockID+keeper.opts.keepProofBlockSpan > keeper.latestBlockID) { proofID := uint64(0) index := len(metaList) - 1 for index >= 0 { @@ -230,6 +287,7 @@ func (keeper *ProofKeeper) eventLoop() { if m.KeepInterval == 0 || queryBlockID%m.KeepInterval != 0 { // check break } + proofID = m.ProofID + (queryBlockID-m.BlockID)/m.KeepInterval resultProofRecord = keeper.getProofDataRecord(proofID) break @@ -238,21 +296,54 @@ func (keeper *ProofKeeper) eventLoop() { } } keeper.waitQueryProofCh <- resultProofRecord + + case <-keeper.stopCh: + err = keeper.proofDataDB.Sync() + if err == nil { + err = keeper.proofDataDB.Close() + } + keeper.waitStopCh <- err + return + + case <-gcProofTicker.C: + log.Info("Start to gc proof", "latest_block_id", keeper.latestBlockID, "keep_block_span", keeper.opts.keepProofBlockSpan) + if keeper.latestBlockID > keeper.opts.keepProofBlockSpan { + gcBeforeBlockID := keeper.latestBlockID - keeper.opts.keepProofBlockSpan + var gcBeforeKeepMetaRecord *keeperMetaRecord + var gcBeforeProofDataRecord *proofDataRecord + metaList := keeper.getKeeperMetaRecordList() + proofID := uint64(0) + if len(metaList) != 0 { + index := len(metaList) - 1 + for index >= 0 { + m := metaList[index] + if gcBeforeBlockID >= m.BlockID { + gcBeforeKeepMetaRecord = m + proofID = m.ProofID + (gcBeforeBlockID-m.BlockID)/m.KeepInterval + gcBeforeProofDataRecord = keeper.getProofDataRecord(proofID) + break + } + index = index - 1 + } + } + keeper.gcKeeperMetaRecordIfNeeded(gcBeforeKeepMetaRecord) + keeper.gcProofDataRecordIfNeeded(gcBeforeProofDataRecord) + + } } } } -// inner util func list -// keeper meta func +// getKeeperMetaRecordList returns keeper meta list. 
func (keeper *ProofKeeper) getKeeperMetaRecordList() []*keeperMetaRecord { var ( metaList []*keeperMetaRecord err error iter ethdb.Iterator ) + iter = rawdb.IterateKeeperMeta(keeper.keeperMetaDB) defer iter.Release() - for iter.Next() { keyBlockID := binary.BigEndian.Uint64(iter.Key()[1:]) m := keeperMetaRecord{} @@ -264,13 +355,15 @@ func (keeper *ProofKeeper) getKeeperMetaRecordList() []*keeperMetaRecord { log.Error("Failed to check consistency between key and value", "key_block_id", keyBlockID, "value_block_id", m.BlockID) continue } - log.Info("Keep meta", "key_block_id", keyBlockID, "meta_record", m) + // log.Info("Keep meta", "key_block_id", keyBlockID, "meta_record", m) metaList = append(metaList, &m) } - log.Info("Succeed to get meta list", "list", metaList) + // log.Info("Succeed to get meta list", "list", metaList) return metaList } +// truncateKeeperMetaRecordHeadIfNeeded truncates the keeper meta record head, +// which is needed on reorg. func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64) bool { var ( err error @@ -278,11 +371,10 @@ func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64) batch ethdb.Batch hasTruncated bool ) + iter = rawdb.IterateKeeperMeta(keeper.keeperMetaDB) defer iter.Release() - batch = keeper.keeperMetaDB.NewBatch() - for iter.Next() { m := keeperMetaRecord{} if err = json.Unmarshal(iter.Value(), &m); err != nil { @@ -297,10 +389,11 @@ func (keeper *ProofKeeper) truncateKeeperMetaRecordHeadIfNeeded(blockID uint64) if err != nil { log.Crit("Failed to truncate keeper meta head", "err", err) } - log.Info("Succeed to truncate keeper meta", "block_id", blockID, "has_truncated", hasTruncated) + // log.Info("Succeed to truncate keeper meta", "block_id", blockID, "has_truncated", hasTruncated) return hasTruncated } +// putKeeperMetaRecord puts a new keeper meta record. func (keeper *ProofKeeper) putKeeperMetaRecord(m *keeperMetaRecord) { meta, err := json.Marshal(*m) if err != nil { @@ -308,10 +401,50 @@ func (keeper *ProofKeeper) putKeeperMetaRecord(m *keeperMetaRecord) { } rawdb.PutKeeperMeta(keeper.keeperMetaDB, m.BlockID, meta) log.Info("Succeed to put keeper meta", "record", m) +} + +// gcKeeperMetaRecordIfNeeded is used to gc the older keeper meta records. +func (keeper *ProofKeeper) gcKeeperMetaRecordIfNeeded(meta *keeperMetaRecord) { + if !keeper.opts.enable { + return + } + if meta == nil { + return + } + metaList := keeper.getKeeperMetaRecordList() + if len(metaList) < maxKeeperMetaNumber { + return + } + var ( + err error + iter ethdb.Iterator + batch ethdb.Batch + gcCounter uint64 + ) + + iter = rawdb.IterateKeeperMeta(keeper.keeperMetaDB) + defer iter.Release() + batch = keeper.keeperMetaDB.NewBatch() + for iter.Next() { + m := keeperMetaRecord{} + if err = json.Unmarshal(iter.Value(), &m); err != nil { + continue + } + if m.BlockID < meta.BlockID { + rawdb.DeleteKeeperMeta(batch, m.BlockID) + gcCounter++ + } + } + err = batch.Write() + if err != nil { + log.Crit("Failed to gc keeper meta", "err", err) + } + log.Info("Succeed to gc keeper meta", "gc_before_keep_meta", meta, "gc_counter", gcCounter) } -// proof data func +// truncateProofDataRecordHeadIfNeeded truncates the proof data record head, +// which is needed on reorg.
func (keeper *ProofKeeper) truncateProofDataRecordHeadIfNeeded(blockID uint64) { latestProofDataRecord := keeper.getLatestProofDataRecord() if latestProofDataRecord == nil { @@ -319,7 +452,7 @@ func (keeper *ProofKeeper) truncateProofDataRecordHeadIfNeeded(blockID uint64) { return } if blockID > latestProofDataRecord.BlockID { - log.Info("Skip to truncate proof data due to block id is newer") + // log.Info("Skip to truncate proof data due to block id is newer") return } @@ -341,6 +474,7 @@ func (keeper *ProofKeeper) truncateProofDataRecordHeadIfNeeded(blockID uint64) { log.Info("Succeed to truncate proof data", "block_id", blockID, "truncate_proof_id", truncateProofID) } +// getLatestProofDataRecord returns the latest proof data record. func (keeper *ProofKeeper) getLatestProofDataRecord() *proofDataRecord { latestProofData := rawdb.GetLatestProofData(keeper.proofDataDB) if latestProofData == nil { @@ -352,10 +486,11 @@ func (keeper *ProofKeeper) getLatestProofDataRecord() *proofDataRecord { if err != nil { log.Crit("Failed to unmarshal proof data", "err", err) } - log.Info("Succeed to get latest proof data", "record", data) + // log.Info("Succeed to get latest proof data", "record", data) return &data } +// getProofDataRecord returns the proof record for the given proof id. func (keeper *ProofKeeper) getProofDataRecord(proofID uint64) *proofDataRecord { latestProofData := rawdb.GetProofData(keeper.proofDataDB, proofID) if latestProofData == nil { @@ -367,17 +502,36 @@ func (keeper *ProofKeeper) getProofDataRecord(proofID uint64) *proofDataRecord { if err != nil { log.Crit("Failed to unmarshal proof data", "err", err) } - log.Info("Succeed to get proof data", "record", data) + // log.Info("Succeed to get proof data", "record", data) return &data } -func (keeper *ProofKeeper) putProofDataRecord(p *proofDataRecord) { +// putProofDataRecord puts a new proof data record. +func (keeper *ProofKeeper) putProofDataRecord(p *proofDataRecord) error { proof, err := json.Marshal(*p) if err != nil { - log.Crit("Failed to marshal proof data", "err", err) + log.Error("Failed to marshal proof data", "error", err) + return err } - rawdb.PutProofData(keeper.proofDataDB, p.ProofID, proof) - log.Info("Succeed to put proof data", "record", p) + err = rawdb.PutProofData(keeper.proofDataDB, p.ProofID, proof) + // log.Info("Succeed to put proof data", "record", p, "error", err) + return err +} + +// gcProofDataRecordIfNeeded is used to gc the older proof data records. +func (keeper *ProofKeeper) gcProofDataRecordIfNeeded(data *proofDataRecord) { + if !keeper.opts.enable { + return + } + if data == nil { + return + } + if data.ProofID == 0 { + return + } + + rawdb.TruncateProofDataTail(keeper.proofDataDB, data.ProofID) + log.Info("Succeed to gc proof data", "gc_before_proof_data", data) } // IsProposeProofQuery is used to determine whether it is proposed proof. @@ -391,7 +545,7 @@ func (keeper *ProofKeeper) IsProposeProofQuery(address common.Address, storageKe if len(storageKeys) != 0 { return false } - // blockID%keeper.opts.keepInterval == 0 is not checked because keepInterval may have been adjusted before. + // blockID%keepInterval == 0 is not checked because keepInterval may have been adjusted before.
_ = blockID return true } @@ -403,13 +557,16 @@ func (keeper *ProofKeeper) QueryProposeProof(blockID uint64, stateRoot common.Ha err error startTimestamp time.Time ) + startTimestamp = time.Now() defer func() { + queryProofTimer.UpdateSince(startTimestamp) log.Info("Query propose proof", "block_id", blockID, "state_root", stateRoot.String(), "error", err, "elapsed", common.PrettyDuration(time.Since(startTimestamp))) }() + keeper.queryProofCh <- blockID resultProofRecord := <-keeper.waitQueryProofCh if resultProofRecord == nil { diff --git a/core/proof_keeper_test.go b/core/proof_keeper_test.go index 9a8bc9592b..9293f4b710 100644 --- a/core/proof_keeper_test.go +++ b/core/proof_keeper_test.go @@ -1 +1,198 @@ package core + +import ( + "os" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/rawdb" + "github.com/ethereum/go-ethereum/ethdb" + "github.com/ethereum/go-ethereum/trie/triedb/pathdb" + "github.com/stretchr/testify/assert" +) + +const ( + testProofKeeperDBDir = "./test_proof_keeper_db" +) + +var ( + mockBlockChain *BlockChain + mockKeeperMetaDB ethdb.Database +) + +func setupTestEnv() { + mockKeeperMetaDB, _ = rawdb.Open(rawdb.OpenOptions{ + Type: "pebble", + Directory: testProofKeeperDBDir, + AncientsDirectory: testProofKeeperDBDir + "/ancient", + Namespace: "test_proof_keeper", + Cache: 10, + Handles: 10, + ReadOnly: false, + }) +} + +func cleanupTestEnv() { + mockKeeperMetaDB.Close() + os.RemoveAll(testProofKeeperDBDir) +} + +func TestProofKeeperStartAndStop(t *testing.T) { + setupTestEnv() + + keeperOpts := &proofKeeperOptions{ + enable: true, + keepProofBlockSpan: 100, + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan struct{}), + } + keeper := newProofKeeper(keeperOpts) + assert.NotNil(t, keeper) + + err := keeper.Start(mockBlockChain, mockKeeperMetaDB) + assert.Nil(t, err) + + err = keeper.Stop() + assert.Nil(t, err) + + cleanupTestEnv() +} + +func TestProofKeeperGC(t *testing.T) { + setupTestEnv() + keeperOpts := &proofKeeperOptions{ + enable: true, + keepProofBlockSpan: 100, + gcInterval: 1, + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan struct{}), + } + keeper := newProofKeeper(keeperOpts) + assert.NotNil(t, keeper) + + err := keeper.Start(mockBlockChain, mockKeeperMetaDB) + assert.Nil(t, err) + + for i := uint64(1); i <= 100; i++ { + keeper.putKeeperMetaRecord(&keeperMetaRecord{ + BlockID: i, + ProofID: i - 1, + KeepInterval: 1, + }) + keeper.putProofDataRecord(&proofDataRecord{ + ProofID: i - 1, + BlockID: i, + StateRoot: common.Hash{}, + Address: common.Address{}, + AccountProof: nil, + Balance: nil, + CodeHash: common.Hash{}, + Nonce: 0, + StorageHash: common.Hash{}, + StorageProof: nil, + }) + } + keeper.latestBlockID = 100 + time.Sleep(2 * time.Second) // wait gc loop + + // no gc, becase keeper.latestBlockID <= keeper.opts.keepProofBlockSpan + metaList := keeper.getKeeperMetaRecordList() + assert.Equal(t, 100, len(metaList)) + + for i := uint64(101); i <= 105; i++ { + keeper.putKeeperMetaRecord(&keeperMetaRecord{ + BlockID: i, + ProofID: i - 1, + KeepInterval: 1, + }) + keeper.putProofDataRecord(&proofDataRecord{ + ProofID: i - 1, + BlockID: i, + StateRoot: common.Hash{}, + Address: common.Address{}, + AccountProof: nil, + Balance: nil, + CodeHash: common.Hash{}, + Nonce: 0, + StorageHash: common.Hash{}, + StorageProof: nil, + }) + } + + keeper.latestBlockID = 105 + time.Sleep(2 * time.Second) // wait gc loop + + // gc keep meta which block_id < 
5(latestBlockID - keepProofBlockSpan), and 1/2/3/4 blockid keeper meta is truncated. + metaList = keeper.getKeeperMetaRecordList() + assert.Equal(t, 101, len(metaList)) + + // gc proof data, truncate proof id = 4, and 0/1/2/3 proofid proof data is truncated. + assert.NotNil(t, keeper.getProofDataRecord(4)) + assert.Nil(t, keeper.getProofDataRecord(3)) + + err = keeper.Stop() + assert.Nil(t, err) + + cleanupTestEnv() +} + +func TestProofKeeperQuery(t *testing.T) { + setupTestEnv() + + keeperOpts := &proofKeeperOptions{ + enable: true, + watchStartKeepCh: make(chan *pathdb.KeepRecord), + notifyFinishKeepCh: make(chan struct{}), + } + keeper := newProofKeeper(keeperOpts) + assert.NotNil(t, keeper) + + err := keeper.Start(mockBlockChain, mockKeeperMetaDB) + assert.Nil(t, err) + + for i := uint64(1); i <= 100; i++ { + if i%15 == 0 { + keeper.putKeeperMetaRecord(&keeperMetaRecord{ + BlockID: i, + ProofID: i - 1, + KeepInterval: 1, + }) + } + keeper.putProofDataRecord(&proofDataRecord{ + ProofID: i - 1, + BlockID: i, + StateRoot: common.Hash{}, + Address: common.Address{}, + AccountProof: nil, + Balance: nil, + CodeHash: common.Hash{}, + Nonce: 0, + StorageHash: common.Hash{}, + StorageProof: nil, + }) + + } + + keeper.latestBlockID = 100 + result, err := keeper.QueryProposeProof(45, common.Hash{}) + assert.Nil(t, err) + assert.NotNil(t, result) + result, err = keeper.QueryProposeProof(46, common.Hash{}) + assert.Nil(t, err) + assert.NotNil(t, result) + result, err = keeper.QueryProposeProof(1, common.Hash{}) // should >= 15 + assert.NotNil(t, err) + assert.Nil(t, result) + result, err = keeper.QueryProposeProof(100, common.Hash{}) + assert.Nil(t, err) + assert.NotNil(t, result) + result, err = keeper.QueryProposeProof(101, common.Hash{}) // should <= 100 + assert.NotNil(t, err) + assert.Nil(t, result) + + err = keeper.Stop() + assert.Nil(t, err) + + cleanupTestEnv() +} diff --git a/core/rawdb/accessors_proof.go b/core/rawdb/accessors_proof.go index fb97d3fc1f..36f2c9d72b 100644 --- a/core/rawdb/accessors_proof.go +++ b/core/rawdb/accessors_proof.go @@ -6,23 +6,22 @@ import ( ) const ( - blockNumberLength = 8 // uint64 is 8bytes + blockNumberLength = 8 // uint64 is 8 bytes. ) -// todo: cannot panic -// todo: more tips -// todo: inspect proof ancient -// Keeper Meta +// IterateKeeperMeta returns keep meta iterator. func IterateKeeperMeta(db ethdb.Iteratee) ethdb.Iterator { return NewKeyLengthIterator(db.NewIterator(proofKeeperMetaPrefix, nil), len(proofKeeperMetaPrefix)+blockNumberLength) } +// DeleteKeeperMeta is used to remove the specified keeper meta. func DeleteKeeperMeta(db ethdb.KeyValueWriter, blockID uint64) { if err := db.Delete(proofKeeperMetaKey(blockID)); err != nil { log.Crit("Failed to delete keeper meta", "err", err) } } +// PutKeeperMeta add a new keeper meta. func PutKeeperMeta(db ethdb.KeyValueWriter, blockID uint64, meta []byte) { key := proofKeeperMetaKey(blockID) if err := db.Put(key, meta); err != nil { @@ -30,7 +29,7 @@ func PutKeeperMeta(db ethdb.KeyValueWriter, blockID uint64, meta []byte) { } } -// Proof Data +// GetLatestProofData returns the latest head proof data. func GetLatestProofData(f *ResettableFreezer) []byte { proofTable := f.freezer.tables[proposeProofTable] if proofTable == nil { @@ -44,6 +43,7 @@ func GetLatestProofData(f *ResettableFreezer) []byte { return blob } +// GetProofData returns the specified proof data. 
func GetProofData(f *ResettableFreezer, proofID uint64) []byte { proofTable := f.freezer.tables[proposeProofTable] if proofTable == nil { @@ -56,17 +56,20 @@ func GetProofData(f *ResettableFreezer, proofID uint64) []byte { return blob } +// TruncateProofDataHead truncates [proofID, end...]. func TruncateProofDataHead(f *ResettableFreezer, proofID uint64) { f.freezer.TruncateHead(proofID) } -func PutProofData(db ethdb.AncientWriter, proofID uint64, proof []byte) { - db.ModifyAncients(func(op ethdb.AncientWriteOp) error { - err := op.AppendRaw(proposeProofTable, proofID, proof) - if err != nil { - // todo: panic - log.Error("Failed to put proof data", "proof_id", proofID, "error", err) - } - return nil +// TruncateProofDataTail truncates [start..., proofID). +func TruncateProofDataTail(f *ResettableFreezer, proofID uint64) { + f.freezer.TruncateTail(proofID) +} + +// PutProofData appends a new proof to ancient proof db, the proofID should be continuous. +func PutProofData(db ethdb.AncientWriter, proofID uint64, proof []byte) error { + _, err := db.ModifyAncients(func(op ethdb.AncientWriteOp) error { + return op.AppendRaw(proposeProofTable, proofID, proof) }) + return err } diff --git a/core/rawdb/accessors_proof_test.go b/core/rawdb/accessors_proof_test.go index 04d8429483..601b9b90bd 100644 --- a/core/rawdb/accessors_proof_test.go +++ b/core/rawdb/accessors_proof_test.go @@ -24,7 +24,6 @@ func setupTestEnv() { func cleanupTestEnv() { testAncientProofDB.Close() os.RemoveAll(testAncientProofDir) - } func TestProofDataAPI(t *testing.T) { @@ -37,14 +36,17 @@ func TestProofDataAPI(t *testing.T) { // case2: mismatch sequence put failed mismatchProofID := uint64(2) // should=0 - PutProofData(testAncientProofDB, mismatchProofID, mockData1) + err := PutProofData(testAncientProofDB, mismatchProofID, mockData1) + assert.NotNil(t, err) proofData = GetLatestProofData(testAncientProofDB) assert.Nil(t, proofData) // case3: put/get succeed matchProofID := uint64(0) - PutProofData(testAncientProofDB, matchProofID, mockData1) - PutProofData(testAncientProofDB, matchProofID+1, mockData2) + err = PutProofData(testAncientProofDB, matchProofID, mockData1) + assert.Nil(t, err) + err = PutProofData(testAncientProofDB, matchProofID+1, mockData2) + assert.Nil(t, err) proofData = GetLatestProofData(testAncientProofDB) assert.Equal(t, proofData, mockData2) proofData = GetProofData(testAncientProofDB, 0) @@ -52,7 +54,7 @@ func TestProofDataAPI(t *testing.T) { proofData = GetProofData(testAncientProofDB, 1) assert.Equal(t, proofData, mockData2) - // case4: truncate + // case4: truncate head TruncateProofDataHead(testAncientProofDB, 1) proofData = GetProofData(testAncientProofDB, 1) assert.Nil(t, proofData) @@ -67,5 +69,19 @@ func TestProofDataAPI(t *testing.T) { proofData = GetLatestProofData(testAncientProofDB) assert.Equal(t, proofData, mockData1) + // case6: truncate tail + PutProofData(testAncientProofDB, matchProofID+1, mockData2) + proofData = GetProofData(testAncientProofDB, matchProofID) + assert.Equal(t, proofData, mockData1) + PutProofData(testAncientProofDB, matchProofID+2, mockData2) + TruncateProofDataTail(testAncientProofDB, matchProofID+1) + proofData = GetProofData(testAncientProofDB, matchProofID) + assert.Nil(t, proofData) + proofData = GetProofData(testAncientProofDB, matchProofID+1) + assert.Equal(t, proofData, mockData2) + TruncateProofDataTail(testAncientProofDB, matchProofID+2) + proofData = GetProofData(testAncientProofDB, matchProofID+1) + assert.Nil(t, proofData) + cleanupTestEnv() } diff --git 
a/core/rawdb/ancient_scheme.go b/core/rawdb/ancient_scheme.go index 736667a22f..fc07a59172 100644 --- a/core/rawdb/ancient_scheme.go +++ b/core/rawdb/ancient_scheme.go @@ -67,7 +67,7 @@ var stateFreezerNoSnappy = map[string]bool{ } const ( - proposeProofTable = "propose_proof" + proposeProofTable = "propose.proof" ) var proofFreezerNoSnappy = map[string]bool{ @@ -82,7 +82,7 @@ var ( ) // freezers the collections of all builtin freezers. -var freezers = []string{chainFreezerName, stateFreezerName} +var freezers = []string{chainFreezerName, stateFreezerName, proofFreezerName} // NewStateFreezer initializes the freezer for state history. func NewStateFreezer(ancientDir string, readOnly bool) (*ResettableFreezer, error) { diff --git a/core/rawdb/ancient_utils.go b/core/rawdb/ancient_utils.go index dfb2fdfb67..2152b480f7 100644 --- a/core/rawdb/ancient_utils.go +++ b/core/rawdb/ancient_utils.go @@ -107,6 +107,26 @@ func inspectFreezers(db ethdb.Database) ([]freezerInfo, error) { } infos = append(infos, info) + case proofFreezerName: + if ReadStateScheme(db) != PathScheme { + continue + } + datadir, err := db.AncientDatadir() + if err != nil { + return nil, err + } + f, err := NewProofFreezer(datadir, true) + if err != nil { + return nil, err + } + defer f.Close() + + info, err := inspect(proofFreezerName, proofFreezerNoSnappy, f) + if err != nil { + return nil, err + } + infos = append(infos, info) + default: return nil, fmt.Errorf("unknown freezer, supported ones: %v", freezers) } diff --git a/eth/api_backend.go b/eth/api_backend.go index b34601c581..c9348052a3 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -54,8 +54,8 @@ type EthAPIBackend struct { gpo *gasprice.Oracle } -func (b *EthAPIBackend) GetProofKeeper() *core.ProofKeeper { - return b.eth.blockchain.ProofKeeper +func (b *EthAPIBackend) ProofKeeper() *core.ProofKeeper { + return b.eth.blockchain.ProofKeeper() } // ChainConfig returns the active chain configuration. diff --git a/eth/backend.go b/eth/backend.go index 765b96caa5..3d3a493817 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -209,6 +209,8 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TrieCommitInterval: config.TrieCommitInterval, PathNodeBuffer: config.PathNodeBuffer, ProposeBlockInterval: config.ProposeBlockInterval, + EnableProofKeeper: config.EnableProofKeeper, + KeepProofBlockSpan: config.KeepProofBlockSpan, } ) // Override the chain config with provided settings. diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 30b2b848af..ecd6347927 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -114,6 +114,8 @@ type Config struct { StateScheme string `toml:",omitempty"` PathNodeBuffer pathdb.NodeBufferType `toml:",omitempty"` // Type of trienodebuffer to cache trie nodes in disklayer ProposeBlockInterval uint64 `toml:",omitempty"` // Keep the same with op-proposer propose block interval + EnableProofKeeper bool `toml:",omitempty"` // Whether to enable proof keeper + KeepProofBlockSpan uint64 `toml:",omitempty"` // Span block of keep proof // RequiredBlocks is a set of block number -> hash mappings which must be in the // canonical chain of all remote peers. 
Setting the option makes geth verify the diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 448c6ec45e..f4c5d82512 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -696,7 +696,7 @@ func (s *BlockChainAPI) GetProof(ctx context.Context, address common.Address, st } defer func() { - if proofKeeper := s.b.GetProofKeeper(); err != nil && proofKeeper != nil && header.Number != nil { + if proofKeeper := s.b.ProofKeeper(); err != nil && proofKeeper != nil && header.Number != nil { if proofKeeper.IsProposeProofQuery(address, storageKeys, header.Number.Uint64()) { if innerResult, innerError := proofKeeper.QueryProposeProof(header.Number.Uint64(), header.Root); innerError == nil { result = &AccountResult{ diff --git a/internal/ethapi/api_test.go b/internal/ethapi/api_test.go index 7f7b0f1495..2976c6367f 100644 --- a/internal/ethapi/api_test.go +++ b/internal/ethapi/api_test.go @@ -593,6 +593,10 @@ func newTestBackend(t *testing.T, n int, gspec *core.Genesis, engine consensus.E return backend } +func (b testBackend) ProofKeeper() *core.ProofKeeper { + return nil +} + func (b *testBackend) setPendingBlock(block *types.Block) { b.pending = block } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index b84db7a5dc..e05715b31f 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -73,7 +73,7 @@ type Backend interface { SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription SubscribeChainSideEvent(ch chan<- core.ChainSideEvent) event.Subscription - GetProofKeeper() *core.ProofKeeper + ProofKeeper() *core.ProofKeeper // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction) error diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index b743064625..386cbcf4f5 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -243,6 +243,10 @@ func newBackendMock() *backendMock { } } +func (b *backendMock) ProofKeeper() *core.ProofKeeper { + return nil +} + func (b *backendMock) activateLondon() { b.current.Number = big.NewInt(1100) } diff --git a/les/api_backend.go b/les/api_backend.go index c792f118b9..c7f9105895 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -48,7 +48,7 @@ type LesApiBackend struct { gpo *gasprice.Oracle } -func (b *LesApiBackend) GetProofKeeper() *core.ProofKeeper { +func (b *LesApiBackend) ProofKeeper() *core.ProofKeeper { return nil } diff --git a/trie/triedb/pathdb/database.go b/trie/triedb/pathdb/database.go index 2a54fd161f..3e9022e055 100644 --- a/trie/triedb/pathdb/database.go +++ b/trie/triedb/pathdb/database.go @@ -94,13 +94,13 @@ type layer interface { // Config contains the settings for database. type Config struct { - TrieNodeBufferType NodeBufferType // Type of trienodebuffer to cache trie nodes in disklayer - StateHistory uint64 // Number of recent blocks to maintain state history for - CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes - DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes - ReadOnly bool // Flag whether the database is opened in read only mode. - ProposeBlockInterval uint64 // Propose block to L1 block interval. 
- KeepFunc KeepRecordWatchFunc + TrieNodeBufferType NodeBufferType // Type of trienodebuffer to cache trie nodes in disklayer + StateHistory uint64 // Number of recent blocks to maintain state history for + CleanCacheSize int // Maximum memory allowance (in bytes) for caching clean nodes + DirtyCacheSize int // Maximum memory allowance (in bytes) for caching dirty nodes + ReadOnly bool // Flag whether the database is opened in read only mode. + ProposeBlockInterval uint64 // Propose block to L1 block interval. + KeepFunc KeepRecordWatchFunc // KeepFunc is used to keep the proofs which may be queried by the op-proposer. } // sanitize checks the provided user configurations and changes anything that's diff --git a/trie/triedb/pathdb/nodebufferlist.go b/trie/triedb/pathdb/nodebufferlist.go index d78e9bf45c..c09cf86a72 100644 --- a/trie/triedb/pathdb/nodebufferlist.go +++ b/trie/triedb/pathdb/nodebufferlist.go @@ -65,13 +65,11 @@ type nodebufferlist struct { baseMux sync.RWMutex // The mutex of base multiDifflayer and persistID. flushMux sync.RWMutex // The mutex of flushing base multiDifflayer for reorg corner case. - isFlushing atomic.Bool // Flag indicates writing disk under background. - stopFlushing atomic.Bool // Flag stops writing disk under background. - stopCh chan struct{} - waitStopCh chan struct{} - //notifyKeepCh chan *KeepRecord - //waitKeepCh chan struct{} - keepFunc KeepRecordWatchFunc + isFlushing atomic.Bool // Flag indicates writing disk under background. + stopFlushing atomic.Bool // Flag stops writing disk under background. + stopCh chan struct{} // Triggers the background event loop to stop. + waitStopCh chan struct{} // Waits for the background event loop to stop. + keepFunc KeepRecordWatchFunc // Used to keep the proofs needed by the op-proposer. } // newNodeBufferList initializes the node buffer list with the provided nodes @@ -136,6 +134,7 @@ func newNodeBufferList( // node retrieves the trie node with given node info.
func (nf *nodebufferlist) node(owner common.Hash, path []byte, hash common.Hash) (node *trienode.Node, err error) { nf.mux.RLock() + defer nf.mux.RUnlock() find := func(nc *multiDifflayer) bool { subset, ok := nc.nodes[owner] if !ok { @@ -155,14 +154,11 @@ func (nf *nodebufferlist) node(owner common.Hash, path []byte, hash common.Hash) } nf.traverse(find) if err != nil { - nf.mux.RUnlock() return nil, err } if node != nil { - nf.mux.RUnlock() return node, nil } - nf.mux.RUnlock() nf.baseMux.RLock() node, err = nf.base.node(owner, path, hash) @@ -244,6 +240,16 @@ func (nf *nodebufferlist) flush(db ethdb.KeyValueStore, clean *fastcache.Cache, return nil } + if nf.keepFunc != nil { + nf.mux.RLock() + traverseKeepFunc := func(buffer *multiDifflayer) bool { + nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + return true + } + nf.traverseReverse(traverseKeepFunc) + nf.mux.RUnlock() + } + // hang user read/write and background write nf.mux.Lock() nf.baseMux.Lock() @@ -471,7 +477,9 @@ func (nf *nodebufferlist) diffToBase() { log.Crit("committed block number misaligned", "block", buffer.block) } - nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + if nf.keepFunc != nil { + nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + } nf.baseMux.Lock() err := nf.base.commit(buffer.root, buffer.id, buffer.block, buffer.layers, buffer.nodes) @@ -530,14 +538,15 @@ func (nf *nodebufferlist) loop() { for { select { case <-nf.stopCh: - // force flush to ensure all proposed-block can be kept by proof keeper - nf.mux.RLock() - notifyKeeperFunc := func(buffer *multiDifflayer) bool { - nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) - return true + if nf.keepFunc != nil { + nf.mux.RLock() + traverseKeepFunc := func(buffer *multiDifflayer) bool { + nf.keepFunc(&KeepRecord{BlockID: buffer.block, StateRoot: buffer.root, KeepInterval: nf.wpBlocks}) + return true + } + nf.traverseReverse(traverseKeepFunc) + nf.mux.RUnlock() } - nf.traverseReverse(notifyKeeperFunc) - nf.mux.RUnlock() nf.waitStopCh <- struct{}{} return case <-mergeTicker.C: @@ -628,7 +637,9 @@ func (w *proposedBlockReader) Node(owner common.Hash, path []byte, hash common.H current = current.next } + w.nf.baseMux.RLock() node, err := w.nf.base.node(owner, path, hash) + w.nf.baseMux.RUnlock() if err != nil { return nil, err }
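For context on how the kept proofs are consumed: with the proof keeper enabled, they are served through the existing eth_getProof endpoint, as wired up in the internal/ethapi/api.go hunk above. A minimal consumer sketch, assuming a local RPC endpoint and a hypothetical proposed block number:

```go
package main

import (
	"context"
	"fmt"
	"log"
	"math/big"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/ethclient/gethclient"
	"github.com/ethereum/go-ethereum/rpc"
)

func main() {
	// Assumed local RPC endpoint of a node running with --pathdb.enableproofkeeper.
	rpcClient, err := rpc.Dial("http://127.0.0.1:8545")
	if err != nil {
		log.Fatal(err)
	}
	client := gethclient.New(rpcClient)

	// The pre-deployed L2ToL1MessagePasser account tracked by the proof keeper.
	account := common.HexToAddress("0x4200000000000000000000000000000000000016")

	// No storage keys and the message-passer address: this matches
	// ProofKeeper.IsProposeProofQuery, so the node can answer from the kept
	// proof even after the block's state has been pruned from pathdb.
	proposedBlock := big.NewInt(90_000) // hypothetical proposed block number
	result, err := client.GetProof(context.Background(), account, []string{}, proposedBlock)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("storage hash:", result.StorageHash)
}
```

Note that the fallback in GetProof only fires when the regular state lookup fails, the queried account is the l2ToL1MessagePasser pre-deploy, and no storage keys are supplied; any other query follows the normal proof path.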