From c36a3ce8591686c9a1580fcbf5b6c29a4b401d0d Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Thu, 23 Nov 2023 17:11:54 +0800 Subject: [PATCH 01/13] fix: wrong event log value (#16) Co-authored-by: Welkin --- core/state/pruner/pruner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/state/pruner/pruner.go b/core/state/pruner/pruner.go index 6d4b5c373f..af971c722c 100644 --- a/core/state/pruner/pruner.go +++ b/core/state/pruner/pruner.go @@ -325,7 +325,7 @@ func (p *Pruner) Prune(root common.Hash) error { } } else { if len(layers) > 0 { - log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.chainHeader.Number.Uint64()-127) + log.Info("Selecting bottom-most difflayer as the pruning target", "root", root, "height", p.chainHeader.Number.Uint64()-(p.triesInMemory-1)) } else { log.Info("Selecting user-specified state as the pruning target", "root", root) } From feb5339c65b950fe13a40fb6a4ff9a485f02e296 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Fri, 24 Nov 2023 11:38:15 +0800 Subject: [PATCH 02/13] add option to reannounce local transactions (#33) Co-authored-by: andyzhang2023 Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> --- cmd/geth/main.go | 1 + cmd/utils/flags.go | 9 +++++++++ core/txpool/txpool.go | 7 ++++--- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 3dc1bb9b40..b5ad595a84 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -89,6 +89,7 @@ var ( utils.TxPoolGlobalQueueFlag, utils.TxPoolLifetimeFlag, utils.TxPoolReannounceTimeFlag, + utils.TxPoolReannounceRemotesFlag, utils.SyncModeFlag, utils.SyncTargetFlag, utils.ExitWhenSyncedFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index e10cef3480..7994e671d4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -465,6 +465,12 @@ var ( Value: ethconfig.Defaults.TxPool.ReannounceTime, Category: flags.TxPoolCategory, } + TxPoolReannounceRemotesFlag = &cli.BoolFlag{ + Name: "txpool.reannounceremotes", + Usage: "Wether reannnounce remote transactions or not(default = false)", + Value: ethconfig.Defaults.TxPool.ReannounceRemotes, + Category: flags.TxPoolCategory, + } // Performance tuning settings CacheFlag = &cli.IntFlag{ @@ -1669,6 +1675,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) { if ctx.IsSet(TxPoolReannounceTimeFlag.Name) { cfg.ReannounceTime = ctx.Duration(TxPoolReannounceTimeFlag.Name) } + if ctx.IsSet(TxPoolReannounceRemotesFlag.Name) { + cfg.ReannounceRemotes = ctx.Bool(TxPoolReannounceRemotesFlag.Name) + } } func setEthash(ctx *cli.Context, cfg *ethconfig.Config) { diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 03e679a08e..6ec4f63a48 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -183,8 +183,9 @@ type Config struct { AccountQueue uint64 // Maximum number of non-executable transaction slots permitted per account GlobalQueue uint64 // Maximum number of non-executable transaction slots for all accounts - Lifetime time.Duration // Maximum amount of time non-executable transaction are queued - ReannounceTime time.Duration // Duration for announcing local pending transactions again + Lifetime time.Duration // Maximum amount of time non-executable transaction are queued + ReannounceTime time.Duration // Duration for announcing local pending transactions again + ReannounceRemotes bool // Wether reannounce remote transactions or not } // DefaultConfig contains the default configurations for the transaction @@ -433,7 +434,7 @@ func (pool *TxPool) loop() { reannoTxs := func() []*types.Transaction { txs := make([]*types.Transaction, 0) for addr, list := range pool.pending { - if !pool.locals.contains(addr) { + if !pool.config.ReannounceRemotes && !pool.locals.contains(addr) { continue } From c5d0d0c2de7d48d454779e1b710c1b43d2ca852d Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Fri, 24 Nov 2023 11:38:49 +0800 Subject: [PATCH 03/13] fix: clear underpriced buffer (#34) * eth/fetcher: allow underpriced transactions in after timeout * eth/fetcher: fix fetcher timeout (#28220) This changes fixes a bug in the fetcher, where the timeout for how long to remember underpriced transaction was erroneously compared, and the timeout never hit. --------- Co-authored-by: Martin Holst Swende (cherry picked from commit 667966c5c10e7bf1e38a0439c62b11d5b26a132a) * fix test case: TestTransactionForgotten --------- Co-authored-by: Marius van der Wijden Co-authored-by: andyzhang2023 Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> --- core/types/transaction.go | 7 +++++++ eth/fetcher/tx_fetcher.go | 38 ++++++++++++++++++++-------------- eth/fetcher/tx_fetcher_test.go | 37 +++++++++++++++++++++++++++++++-- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/core/types/transaction.go b/core/types/transaction.go index 6d199e81be..827d7bf016 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -449,6 +449,13 @@ func (tx *Transaction) EffectiveGasTipIntCmp(other *big.Int, baseFee *big.Int) i return tx.EffectiveGasTipValue(baseFee).Cmp(other) } +// SetTime sets the decoding time of a transaction. This is used by tests to set +// arbitrary times and by persistent transaction pools when loading old txs from +// disk. +func (tx *Transaction) SetTime(t time.Time) { + tx.time = t +} + // Hash returns the transaction hash. func (tx *Transaction) Hash() common.Hash { if hash := tx.hash.Load(); hash != nil { diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index f01e683ff2..4e016c833a 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -24,9 +24,9 @@ import ( "sort" "time" - mapset "github.com/deckarep/golang-set/v2" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/core/txpool" "github.com/ethereum/go-ethereum/core/types" @@ -54,6 +54,9 @@ const ( // re-request them. maxTxUnderpricedSetSize = 32768 + // maxTxUnderpricedTimeout is the max time a transaction should be stuck in the underpriced set. + maxTxUnderpricedTimeout = 5 * time.Minute + // txArriveTimeout is the time allowance before an announced transaction is // explicitly requested. txArriveTimeout = 500 * time.Millisecond @@ -149,7 +152,7 @@ type TxFetcher struct { drop chan *txDrop quit chan struct{} - underpriced mapset.Set[common.Hash] // Transactions discarded as too cheap (don't re-fetch) + underpriced *lru.Cache[common.Hash, time.Time] // Transactions discarded as too cheap (don't re-fetch) // Stage 1: Waiting lists for newly discovered transactions that might be // broadcast without needing explicit request/reply round trips. @@ -203,7 +206,7 @@ func NewTxFetcherForTests( fetching: make(map[common.Hash]string), requests: make(map[string]*txRequest), alternates: make(map[common.Hash]map[string]struct{}), - underpriced: mapset.NewSet[common.Hash](), + underpriced: lru.NewCache[common.Hash, time.Time](maxTxUnderpricedSetSize), hasTx: hasTx, addTxs: addTxs, fetchTxs: fetchTxs, @@ -224,17 +227,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { // still valuable to check here because it runs concurrent to the internal // loop, so anything caught here is time saved internally. var ( - unknowns = make([]common.Hash, 0, len(hashes)) - duplicate, underpriced int64 + unknowns = make([]common.Hash, 0, len(hashes)) + duplicate int64 + underpriced int64 ) for _, hash := range hashes { switch { case f.hasTx(hash): duplicate++ - - case f.underpriced.Contains(hash): + case f.isKnownUnderpriced(hash): underpriced++ - default: unknowns = append(unknowns, hash) } @@ -246,10 +248,7 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { if len(unknowns) == 0 { return nil } - announce := &txAnnounce{ - origin: peer, - hashes: unknowns, - } + announce := &txAnnounce{origin: peer, hashes: unknowns} select { case f.notify <- announce: return nil @@ -258,6 +257,16 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { } } +// isKnownUnderpriced reports whether a transaction hash was recently found to be underpriced. +func (f *TxFetcher) isKnownUnderpriced(hash common.Hash) bool { + prevTime, ok := f.underpriced.Peek(hash) + if ok && prevTime.Before(time.Now().Add(-maxTxUnderpricedTimeout)) { + f.underpriced.Remove(hash) + return false + } + return ok +} + // Enqueue imports a batch of received transaction into the transaction pool // and the fetcher. This method may be called by both transaction broadcasts and // direct request replies. The differentiation is important so the fetcher can @@ -300,10 +309,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) // Avoid re-request this transaction when we receive another // announcement. if errors.Is(err, txpool.ErrUnderpriced) || errors.Is(err, txpool.ErrReplaceUnderpriced) { - for f.underpriced.Cardinality() >= maxTxUnderpricedSetSize { - f.underpriced.Pop() - } - f.underpriced.Add(batch[j].Hash()) + f.underpriced.Add(batch[j].Hash(), batch[j].Time()) } // Track a few interesting failure types switch { diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go index 1715def99c..59a57f56a6 100644 --- a/eth/fetcher/tx_fetcher_test.go +++ b/eth/fetcher/tx_fetcher_test.go @@ -1509,8 +1509,8 @@ func testTransactionFetcher(t *testing.T, tt txFetcherTest) { } case isUnderpriced: - if fetcher.underpriced.Cardinality() != int(step) { - t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Cardinality(), step) + if fetcher.underpriced.Len() != int(step) { + t.Errorf("step %d: underpriced set size mismatch: have %d, want %d", i, fetcher.underpriced.Len(), step) } default: @@ -1535,3 +1535,36 @@ func containsHash(slice []common.Hash, hash common.Hash) bool { } return false } + +// Tests that a transaction is forgotten after the timeout. +func TestTransactionForgotten(t *testing.T) { + fetcher := NewTxFetcher( + func(common.Hash) bool { return false }, + func(txs []*types.Transaction) []error { + errs := make([]error, len(txs)) + for i := 0; i < len(errs); i++ { + errs[i] = txpool.ErrUnderpriced + } + return errs + }, + func(string, []common.Hash) error { return nil }) + fetcher.Start() + defer fetcher.Stop() + // Create one TX which is 5 minutes old, and one which is recent + tx1 := types.NewTx(&types.LegacyTx{Nonce: 0}) + tx1.SetTime(time.Now().Add(-maxTxUnderpricedTimeout - 1*time.Second)) + tx2 := types.NewTx(&types.LegacyTx{Nonce: 1}) + + // Enqueue both in the fetcher. They will be immediately tagged as underpriced + if err := fetcher.Enqueue("asdf", []*types.Transaction{tx1, tx2}, false); err != nil { + t.Fatal(err) + } + // isKnownUnderpriced should trigger removal of the first tx (no longer be known underpriced) + if fetcher.isKnownUnderpriced(tx1.Hash()) { + t.Fatal("transaction should be forgotten by now") + } + // isKnownUnderpriced should not trigger removal of the second + if !fetcher.isKnownUnderpriced(tx2.Hash()) { + t.Fatal("transaction should be known underpriced") + } +} From 9cfc55bf26c8c8f9d0e81bbd280d0b8c6fcb425b Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Tue, 28 Nov 2023 18:13:58 +0800 Subject: [PATCH 04/13] add metrics for invalid transactions detail --- core/txpool/invalid.go | 68 ++++++++++++++++++++++++++++++++++ core/txpool/txpool.go | 27 ++++++++++++++ eth/handler.go | 6 +++ eth/protocols/eth/broadcast.go | 4 +- 4 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 core/txpool/invalid.go diff --git a/core/txpool/invalid.go b/core/txpool/invalid.go new file mode 100644 index 0000000000..637919d13c --- /dev/null +++ b/core/txpool/invalid.go @@ -0,0 +1,68 @@ +package txpool + +import ( + "github.com/ethereum/go-ethereum/metrics" +) + +const ( + AlreadyKnown = "AlreadyKnown" + TypeNotSupportDeposit = "TypeNotSupportDeposit" + TypeNotSupport1559 = "TypeNotSupport1559" + TypeNotSupport2718 = "TypeNotSupport2718" + MissingTransaction = "MissingTransaction" + OversizedData = "OversizedData" + MaxInitCodeSizeExceeded = "MaxInitCodeSizeExceeded" + NegativeValue = "NegativeValue" + GasLimit = "GasLimit" + FeeCapVeryHigh = "FeeCapVeryHigh" + TipVeryHigh = "TipVeryHigh" + TipAboveFeeCap = "TipAboveFeeCap" + InvalidSender = "InvalidSender" + Underpriced = "Underpriced" + NonceTooLow = "NonceTooLow" + InsufficientFunds = "InsufficientFunds" + Overdraft = "Overdraft" + IntrinsicGas = "IntrinsicGas" + Throttle = "Throttle" + Overflow = "Overflow" + FutureReplacePending = "FutureReplacePending" + ReplaceUnderpriced = "ReplaceUnderpriced" + QueuedDiscard = "QueueDiscard" + GasUnitOverflow = "GasUnitOverflow" +) + +func meter(err string) metrics.Meter { + return metrics.GetOrRegisterMeter("txpool/invalid/"+err, nil) +} + +func init() { + // init the metrics + for _, err := range []string{ + AlreadyKnown, + TypeNotSupportDeposit, + TypeNotSupport1559, + TypeNotSupport2718, + MissingTransaction, + OversizedData, + MaxInitCodeSizeExceeded, + NegativeValue, + GasLimit, + FeeCapVeryHigh, + TipVeryHigh, + TipAboveFeeCap, + InvalidSender, + Underpriced, + NonceTooLow, + InsufficientFunds, + Overdraft, + IntrinsicGas, + Throttle, + Overflow, + FutureReplacePending, + ReplaceUnderpriced, + QueuedDiscard, + GasUnitOverflow, + } { + meter(err).Mark(0) + } +} diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 03e679a08e..376e9805a9 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -652,55 +652,68 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // This is for spam protection, not consensus, // as the external engine-API user authenticates deposits. if tx.Type() == types.DepositTxType { + meter(TypeNotSupportDeposit).Mark(1) return core.ErrTxTypeNotSupported } // Accept only legacy transactions until EIP-2718/2930 activates. if !pool.eip2718 && tx.Type() != types.LegacyTxType { + meter(TypeNotSupport2718).Mark(1) return core.ErrTxTypeNotSupported } // Reject dynamic fee transactions until EIP-1559 activates. if !pool.eip1559 && tx.Type() == types.DynamicFeeTxType { + meter(TypeNotSupport1559).Mark(1) return core.ErrTxTypeNotSupported } // Reject transactions over defined size to prevent DOS attacks if tx.Size() > txMaxSize { + meter(OversizedData).Mark(1) return ErrOversizedData } // Check whether the init code size has been exceeded. if pool.shanghai && tx.To() == nil && len(tx.Data()) > params.MaxInitCodeSize { + meter(MaxInitCodeSizeExceeded).Mark(1) return fmt.Errorf("%w: code size %v limit %v", core.ErrMaxInitCodeSizeExceeded, len(tx.Data()), params.MaxInitCodeSize) } // Transactions can't be negative. This may never happen using RLP decoded // transactions but may occur if you create a transaction using the RPC. if tx.Value().Sign() < 0 { + meter(NegativeValue).Mark(1) return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. if pool.currentMaxGas < tx.Gas() { + meter(GasLimit).Mark(1) return ErrGasLimit } // Sanity check for extremely large numbers if tx.GasFeeCap().BitLen() > 256 { + meter(FeeCapVeryHigh).Mark(1) return core.ErrFeeCapVeryHigh } if tx.GasTipCap().BitLen() > 256 { + meter(TipVeryHigh).Mark(1) return core.ErrTipVeryHigh } // Ensure gasFeeCap is greater than or equal to gasTipCap. if tx.GasFeeCapIntCmp(tx.GasTipCap()) < 0 { + meter(TipAboveFeeCap).Mark(1) return core.ErrTipAboveFeeCap } // Make sure the transaction is signed properly. from, err := types.Sender(pool.signer, tx) if err != nil { + meter(InvalidSender).Mark(1) return ErrInvalidSender } // Drop non-local transactions under our own minimal accepted gas price or tip if !local && tx.GasTipCapIntCmp(pool.gasPrice) < 0 { + meter(Underpriced).Mark(1) return ErrUnderpriced } // Ensure the transaction adheres to nonce ordering if pool.currentState.GetNonce(from) > tx.Nonce() { + meter(NonceTooLow).Mark(1) return core.ErrNonceTooLow } // Transactor should have enough funds to cover the costs @@ -711,6 +724,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } balance := pool.currentState.GetBalance(from) if balance.Cmp(cost) < 0 { + meter(InsufficientFunds).Mark(1) return core.ErrInsufficientFunds } @@ -728,6 +742,7 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { } if balance.Cmp(sum) < 0 { log.Trace("Replacing transactions would overdraft", "sender", from, "balance", pool.currentState.GetBalance(from), "required", sum) + meter(Overdraft).Mark(1) return ErrOverdraft } } @@ -735,9 +750,11 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Ensure the transaction has more gas than the basic tx fee. intrGas, err := core.IntrinsicGas(tx.Data(), tx.AccessList(), tx.To() == nil, true, pool.istanbul, pool.shanghai) if err != nil { + meter(GasUnitOverflow).Mark(1) return err } if tx.Gas() < intrGas { + meter(IntrinsicGas).Mark(1) return core.ErrIntrinsicGas } return nil @@ -756,6 +773,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e if pool.all.Get(hash) != nil { log.Trace("Discarding already known transaction", "hash", hash) knownTxMeter.Mark(1) + meter(AlreadyKnown).Mark(1) return false, ErrAlreadyKnown } // Make the local flag. If it's from local source or it's from the network but @@ -778,6 +796,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e if !isLocal && pool.priced.Underpriced(tx) { log.Trace("Discarding underpriced transaction", "hash", hash, "gasTipCap", tx.GasTipCap(), "gasFeeCap", tx.GasFeeCap()) underpricedTxMeter.Mark(1) + meter(Underpriced).Mark(1) return false, ErrUnderpriced } @@ -787,6 +806,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e // replacements to 25% of the slots if pool.changesSinceReorg > int(pool.config.GlobalSlots/4) { throttleTxMeter.Mark(1) + meter(Throttle).Mark(1) return false, ErrTxPoolOverflow } @@ -799,6 +819,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e if !isLocal && !success { log.Trace("Discarding overflown transaction", "hash", hash) overflowedTxMeter.Mark(1) + meter(Overflow).Mark(1) return false, ErrTxPoolOverflow } @@ -818,6 +839,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e pool.priced.Put(dropTx, false) } log.Trace("Discarding future transaction replacing pending tx", "hash", hash) + meter(FutureReplacePending).Mark(1) return false, ErrFutureReplacePending } } @@ -837,6 +859,7 @@ func (pool *TxPool) add(tx *types.Transaction, local bool) (replaced bool, err e inserted, old := list.Add(tx, pool.config.PriceBump) if !inserted { pendingDiscardMeter.Mark(1) + meter(ReplaceUnderpriced).Mark(1) return false, ErrReplaceUnderpriced } // New transaction is better, replace old one @@ -902,6 +925,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo if !inserted { // An older transaction was better, discard this queuedDiscardMeter.Mark(1) + meter(QueuedDiscard).Mark(1) return false, ErrReplaceUnderpriced } // Discard any previous transaction and mark this @@ -916,6 +940,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo // If the transaction isn't in lookup set but it's expected to be there, // show the error log. if pool.all.Get(hash) == nil && !addAll { + meter(MissingTransaction).Mark(1) log.Error("Missing transaction in lookup set, please report the issue", "hash", hash) } if addAll { @@ -1034,6 +1059,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { if pool.all.Get(tx.Hash()) != nil { errs[i] = ErrAlreadyKnown knownTxMeter.Mark(1) + meter(AlreadyKnown).Mark(1) continue } // Exclude transactions with invalid signatures as soon as @@ -1043,6 +1069,7 @@ func (pool *TxPool) addTxs(txs []*types.Transaction, local, sync bool) []error { if err != nil { errs[i] = ErrInvalidSender invalidTxMeter.Mark(1) + meter(InvalidSender).Mark(1) continue } // Accumulate all unknown transactions for deeper processing diff --git a/eth/handler.go b/eth/handler.go index bcca1ff3b1..5c4709564c 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -672,11 +672,17 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { directPeers++ directCount += len(hashes) peer.AsyncSendTransactions(hashes) + log.Trace("Transaction broadcast bodies", "txs", len(hashes), + "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), + ) } for peer, hashes := range annos { annoPeers++ annoCount += len(hashes) peer.AsyncSendPooledTransactionHashes(hashes) + log.Trace("Transaction broadcast hashes", "txs", len(hashes), + "peer.id", peer.Node().ID().String(), "peer.IP", peer.Node().IP().String(), + ) } log.Debug("Transaction broadcast", "txs", len(txs), "announce packs", annoPeers, "announced hashes", annoCount, diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index bd07a9ba60..4aff122360 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -98,7 +98,7 @@ func (p *Peer) broadcastTransactions() { return } close(done) - p.Log().Trace("Sent transactions", "count", len(txs)) + p.Log().Trace("Sent transactions bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String()) }) } } @@ -176,7 +176,7 @@ func (p *Peer) announceTransactions() { } } close(done) - p.Log().Trace("Sent transaction announcements", "count", len(pending)) + p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String()) }) } } From adc05230c49f021e6c807e96412bb911e9511d21 Mon Sep 17 00:00:00 2001 From: andyzhang2023 Date: Thu, 30 Nov 2023 16:57:56 +0800 Subject: [PATCH 05/13] fix: transaction broadcasting trace logs --- eth/protocols/eth/broadcast.go | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index 4aff122360..fbfc9b5448 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -18,6 +18,7 @@ package eth import ( "math/big" + "strings" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" @@ -61,6 +62,22 @@ func (p *Peer) broadcastBlocks() { } } +func collectHashes(txs []*types.Transaction) []common.Hash { + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + return hashes +} + +func concat(hashes []common.Hash) string { + strslice := make([]string, len(hashes)) + for i, hash := range hashes { + strslice[i] = hash.String() + } + return strings.Join(strslice, ",") +} + // broadcastTransactions is a write loop that schedules transaction broadcasts // to the remote peer. The goal is to have an async writer that does not lock up // node internals and at the same time rate limits queued data. @@ -98,7 +115,7 @@ func (p *Peer) broadcastTransactions() { return } close(done) - p.Log().Trace("Sent transactions bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String()) + p.Log().Trace("Sent transaction bodies", "count", len(txs), "peer.id", p.Node().ID().String(), "peer.ip", p.Node().IP().String(), "hashes", concat(collectHashes(txs))) }) } } @@ -176,7 +193,7 @@ func (p *Peer) announceTransactions() { } } close(done) - p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String()) + p.Log().Trace("Sent transaction announcements", "count", len(pending), "peer.Id", p.ID(), "peer.IP", p.Node().IP().String(), "hashes", concat(pending)) }) } } From 23d7cb56aecf05f5f765d50f79e71a26062d84a7 Mon Sep 17 00:00:00 2001 From: redhdx <136775144+redhdx@users.noreply.github.com> Date: Wed, 13 Dec 2023 17:40:22 +0800 Subject: [PATCH 06/13] chore: add changelog doc (#37) * chore: add changelog doc * Update CHANGELOG.md * Update CHANGELOG.md --------- Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> --- CHANGELOG.md | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c8672f5ec1..c6998d3629 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,32 @@ # Changelog +## v0.2.2 + +This is a minor release for opBNB Mainnet and Testnet. +It primarily optimizes op-geth and introduces an option to re-announce remote transactions. +Upgrading is optional. + +### User Facing Changes + +- The startup node will default to using the bootnodes of the opBNB mainnet. If the `--networkid=` is configured as testnet, the testnet bootnodes will be used. If `--bootnodes=` is configured, the specified bootnodes will be used. The configured `--bootnodes=` take precedence over other options.[#32](https://github.com/bnb-chain/op-geth/pull/32) +- Enable re-announce remote transactions by using the flag `--txpool.reannounceremotes=true`.[#33](https://github.com/bnb-chain/op-geth/pull/33) + +### Partial Changelog + +- [#16](https://github.com/bnb-chain/op-geth/pull/16): fix: wrong event log value +- [#31](https://github.com/bnb-chain/op-geth/pull/31): ci: fix blst error and unknown architecture +- [#32](https://github.com/bnb-chain/op-geth/pull/32): feature: add opBNB bootnodes +- [#33](https://github.com/bnb-chain/op-geth/pull/33): feat: add option to reannounce remote transactions +- [#34](https://github.com/bnb-chain/op-geth/pull/34): fix: clear underpriced buffer + +### Docker Images + +- ghcr.io/bnb-chain/op-geth:v0.2.2 + +### Full Changelog + +https://github.com/bnb-chain/op-geth/compare/v0.2.1...v0.2.2 + ## v0.2.1 This is the Fermat Hardfork release for opBNB Mainnet. From 99f82891d24b6ebdab1928bbc021fc952e499cef Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Fri, 5 Jan 2024 10:24:07 +0800 Subject: [PATCH 07/13] feat: add TrieCommitInterval configuration, commit trie every TrieCommitInterval blocks. (#45) * try * add TrieCommitInterval into ethconfig --------- Co-authored-by: Welkin --- core/blockchain.go | 4 +- eth/backend.go | 1 + eth/ethconfig/config.go | 2 + eth/ethconfig/gen_config.go | 225 ++++++++++++++++++++++-------------- 4 files changed, 143 insertions(+), 89 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 2b51883f81..cd0952e61c 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -143,6 +143,8 @@ type CacheConfig struct { SnapshotNoBuild bool // Whether the background generation is allowed SnapshotWait bool // Wait for snapshot construction on startup. TODO(karalabe): This is a dirty hack for testing, nuke it + + TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height. } // defaultCacheConfig are the default caching values if none are specified by the @@ -1424,7 +1426,7 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. chosen := current - bc.cacheConfig.TriesInMemory flushInterval := time.Duration(atomic.LoadInt64(&bc.flushInterval)) // If we exceeded time allowance, flush an entire trie to disk - if bc.gcproc > flushInterval { + if bc.gcproc > flushInterval || (bc.cacheConfig.TrieCommitInterval != 0 && chosen%bc.cacheConfig.TrieCommitInterval == 0) { // If the header is missing (canonical chain behind), we're reorging a low // diff sidechain. Suspend committing until this operation is completed. header := bc.GetHeaderByNumber(chosen) diff --git a/eth/backend.go b/eth/backend.go index d55c80e4b4..a286227da0 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -199,6 +199,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) { TriesInMemory: config.TriesInMemory, SnapshotLimit: config.SnapshotCache, Preimages: config.Preimages, + TrieCommitInterval: config.TrieCommitInterval, } ) // Override the chain config with provided settings. diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index d188fe1413..8db7894d09 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -84,6 +84,7 @@ var Defaults = Config{ TrieDirtyCache: 256, TrieTimeout: 60 * time.Minute, TriesInMemory: 128, + TrieCommitInterval: 0, SnapshotCache: 102, FilterLogCacheSize: 32, Miner: miner.DefaultConfig, @@ -168,6 +169,7 @@ type Config struct { TrieDirtyCache int TrieTimeout time.Duration TriesInMemory uint64 // How many tries keeps in memory + TrieCommitInterval uint64 // Define a block height interval, commit trie every TrieCommitInterval block height. SnapshotCache int Preimages bool diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index 6dbcf0fac9..3c029e2606 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -3,6 +3,7 @@ package ethconfig import ( + "math/big" "time" "github.com/ethereum/go-ethereum/common" @@ -18,50 +19,58 @@ import ( // MarshalTOML marshals as TOML. func (c Config) MarshalTOML() (interface{}, error) { type Config struct { - Genesis *core.Genesis `toml:",omitempty"` - NetworkId uint64 - SyncMode downloader.SyncMode - EthDiscoveryURLs []string - SnapDiscoveryURLs []string - NoPruning bool - NoPrefetch bool - TxLookupLimit uint64 `toml:",omitempty"` - RequiredBlocks map[uint64]common.Hash `toml:"-"` - LightServ int `toml:",omitempty"` - LightIngress int `toml:",omitempty"` - LightEgress int `toml:",omitempty"` - LightPeers int `toml:",omitempty"` - LightNoPrune bool `toml:",omitempty"` - LightNoSyncServe bool `toml:",omitempty"` - SyncFromCheckpoint bool `toml:",omitempty"` - UltraLightServers []string `toml:",omitempty"` - UltraLightFraction int `toml:",omitempty"` - UltraLightOnlyAnnounce bool `toml:",omitempty"` - SkipBcVersionCheck bool `toml:"-"` - DatabaseHandles int `toml:"-"` - DatabaseCache int - DatabaseFreezer string - TrieCleanCache int - TrieCleanCacheJournal string `toml:",omitempty"` - TrieCleanCacheRejournal time.Duration `toml:",omitempty"` - TrieDirtyCache int - TrieTimeout time.Duration - TriesInMemory uint64 `toml:",omitempty"` - SnapshotCache int - Preimages bool - FilterLogCacheSize int - Miner miner.Config - Ethash ethash.Config - TxPool txpool.Config - GPO gasprice.Config - EnablePreimageRecording bool - DocRoot string `toml:"-"` - RPCGasCap uint64 - RPCEVMTimeout time.Duration - RPCTxFeeCap float64 - Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` - CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` - OverrideShanghai *uint64 `toml:",omitempty"` + Genesis *core.Genesis `toml:",omitempty"` + NetworkId uint64 + SyncMode downloader.SyncMode + EthDiscoveryURLs []string + SnapDiscoveryURLs []string + NoPruning bool + NoPrefetch bool + TxLookupLimit uint64 `toml:",omitempty"` + RequiredBlocks map[uint64]common.Hash `toml:"-"` + LightServ int `toml:",omitempty"` + LightIngress int `toml:",omitempty"` + LightEgress int `toml:",omitempty"` + LightPeers int `toml:",omitempty"` + LightNoPrune bool `toml:",omitempty"` + LightNoSyncServe bool `toml:",omitempty"` + SyncFromCheckpoint bool `toml:",omitempty"` + UltraLightServers []string `toml:",omitempty"` + UltraLightFraction int `toml:",omitempty"` + UltraLightOnlyAnnounce bool `toml:",omitempty"` + SkipBcVersionCheck bool `toml:"-"` + DatabaseHandles int `toml:"-"` + DatabaseCache int + DatabaseFreezer string + TrieCleanCache int + TrieCleanCacheJournal string `toml:",omitempty"` + TrieCleanCacheRejournal time.Duration `toml:",omitempty"` + TrieDirtyCache int + TrieTimeout time.Duration + TriesInMemory uint64 + TrieCommitInterval uint64 + SnapshotCache int + Preimages bool + FilterLogCacheSize int + Miner miner.Config + Ethash ethash.Config + TxPool txpool.Config + GPO gasprice.Config + EnablePreimageRecording bool + DocRoot string `toml:"-"` + RPCGasCap uint64 + RPCEVMTimeout time.Duration + RPCTxFeeCap float64 + Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` + CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` + OverrideShanghai *uint64 `toml:",omitempty"` + OverrideOptimismBedrock *big.Int + OverrideOptimismRegolith *uint64 `toml:",omitempty"` + OverrideOptimism *bool + RollupSequencerHTTP string + RollupHistoricalRPC string + RollupHistoricalRPCTimeout time.Duration + RollupDisableTxPoolGossip bool } var enc Config enc.Genesis = c.Genesis @@ -93,6 +102,7 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.TrieDirtyCache = c.TrieDirtyCache enc.TrieTimeout = c.TrieTimeout enc.TriesInMemory = c.TriesInMemory + enc.TrieCommitInterval = c.TrieCommitInterval enc.SnapshotCache = c.SnapshotCache enc.Preimages = c.Preimages enc.FilterLogCacheSize = c.FilterLogCacheSize @@ -108,56 +118,71 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.Checkpoint = c.Checkpoint enc.CheckpointOracle = c.CheckpointOracle enc.OverrideShanghai = c.OverrideShanghai + enc.OverrideOptimismBedrock = c.OverrideOptimismBedrock + enc.OverrideOptimismRegolith = c.OverrideOptimismRegolith + enc.OverrideOptimism = c.OverrideOptimism + enc.RollupSequencerHTTP = c.RollupSequencerHTTP + enc.RollupHistoricalRPC = c.RollupHistoricalRPC + enc.RollupHistoricalRPCTimeout = c.RollupHistoricalRPCTimeout + enc.RollupDisableTxPoolGossip = c.RollupDisableTxPoolGossip return &enc, nil } // UnmarshalTOML unmarshals from TOML. func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { type Config struct { - Genesis *core.Genesis `toml:",omitempty"` - NetworkId *uint64 - SyncMode *downloader.SyncMode - EthDiscoveryURLs []string - SnapDiscoveryURLs []string - NoPruning *bool - NoPrefetch *bool - TxLookupLimit *uint64 `toml:",omitempty"` - RequiredBlocks map[uint64]common.Hash `toml:"-"` - LightServ *int `toml:",omitempty"` - LightIngress *int `toml:",omitempty"` - LightEgress *int `toml:",omitempty"` - LightPeers *int `toml:",omitempty"` - LightNoPrune *bool `toml:",omitempty"` - LightNoSyncServe *bool `toml:",omitempty"` - SyncFromCheckpoint *bool `toml:",omitempty"` - UltraLightServers []string `toml:",omitempty"` - UltraLightFraction *int `toml:",omitempty"` - UltraLightOnlyAnnounce *bool `toml:",omitempty"` - SkipBcVersionCheck *bool `toml:"-"` - DatabaseHandles *int `toml:"-"` - DatabaseCache *int - DatabaseFreezer *string - TrieCleanCache *int - TrieCleanCacheJournal *string `toml:",omitempty"` - TrieCleanCacheRejournal *time.Duration `toml:",omitempty"` - TrieDirtyCache *int - TrieTimeout *time.Duration - TriesInMemory *uint64 `toml:",omitempty"` - SnapshotCache *int - Preimages *bool - FilterLogCacheSize *int - Miner *miner.Config - Ethash *ethash.Config - TxPool *txpool.Config - GPO *gasprice.Config - EnablePreimageRecording *bool - DocRoot *string `toml:"-"` - RPCGasCap *uint64 - RPCEVMTimeout *time.Duration - RPCTxFeeCap *float64 - Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` - CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` - OverrideShanghai *uint64 `toml:",omitempty"` + Genesis *core.Genesis `toml:",omitempty"` + NetworkId *uint64 + SyncMode *downloader.SyncMode + EthDiscoveryURLs []string + SnapDiscoveryURLs []string + NoPruning *bool + NoPrefetch *bool + TxLookupLimit *uint64 `toml:",omitempty"` + RequiredBlocks map[uint64]common.Hash `toml:"-"` + LightServ *int `toml:",omitempty"` + LightIngress *int `toml:",omitempty"` + LightEgress *int `toml:",omitempty"` + LightPeers *int `toml:",omitempty"` + LightNoPrune *bool `toml:",omitempty"` + LightNoSyncServe *bool `toml:",omitempty"` + SyncFromCheckpoint *bool `toml:",omitempty"` + UltraLightServers []string `toml:",omitempty"` + UltraLightFraction *int `toml:",omitempty"` + UltraLightOnlyAnnounce *bool `toml:",omitempty"` + SkipBcVersionCheck *bool `toml:"-"` + DatabaseHandles *int `toml:"-"` + DatabaseCache *int + DatabaseFreezer *string + TrieCleanCache *int + TrieCleanCacheJournal *string `toml:",omitempty"` + TrieCleanCacheRejournal *time.Duration `toml:",omitempty"` + TrieDirtyCache *int + TrieTimeout *time.Duration + TriesInMemory *uint64 + TrieCommitInterval *uint64 + SnapshotCache *int + Preimages *bool + FilterLogCacheSize *int + Miner *miner.Config + Ethash *ethash.Config + TxPool *txpool.Config + GPO *gasprice.Config + EnablePreimageRecording *bool + DocRoot *string `toml:"-"` + RPCGasCap *uint64 + RPCEVMTimeout *time.Duration + RPCTxFeeCap *float64 + Checkpoint *params.TrustedCheckpoint `toml:",omitempty"` + CheckpointOracle *params.CheckpointOracleConfig `toml:",omitempty"` + OverrideShanghai *uint64 `toml:",omitempty"` + OverrideOptimismBedrock *big.Int + OverrideOptimismRegolith *uint64 `toml:",omitempty"` + OverrideOptimism *bool + RollupSequencerHTTP *string + RollupHistoricalRPC *string + RollupHistoricalRPCTimeout *time.Duration + RollupDisableTxPoolGossip *bool } var dec Config if err := unmarshal(&dec); err != nil { @@ -250,6 +275,9 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.TriesInMemory != nil { c.TriesInMemory = *dec.TriesInMemory } + if dec.TrieCommitInterval != nil { + c.TrieCommitInterval = *dec.TrieCommitInterval + } if dec.SnapshotCache != nil { c.SnapshotCache = *dec.SnapshotCache } @@ -295,5 +323,26 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.OverrideShanghai != nil { c.OverrideShanghai = dec.OverrideShanghai } + if dec.OverrideOptimismBedrock != nil { + c.OverrideOptimismBedrock = dec.OverrideOptimismBedrock + } + if dec.OverrideOptimismRegolith != nil { + c.OverrideOptimismRegolith = dec.OverrideOptimismRegolith + } + if dec.OverrideOptimism != nil { + c.OverrideOptimism = dec.OverrideOptimism + } + if dec.RollupSequencerHTTP != nil { + c.RollupSequencerHTTP = *dec.RollupSequencerHTTP + } + if dec.RollupHistoricalRPC != nil { + c.RollupHistoricalRPC = *dec.RollupHistoricalRPC + } + if dec.RollupHistoricalRPCTimeout != nil { + c.RollupHistoricalRPCTimeout = *dec.RollupHistoricalRPCTimeout + } + if dec.RollupDisableTxPoolGossip != nil { + c.RollupDisableTxPoolGossip = *dec.RollupDisableTxPoolGossip + } return nil } From edd4395310ae2c9019d3fbe56a59bc7382f7f03d Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Fri, 5 Jan 2024 10:26:54 +0800 Subject: [PATCH 08/13] fix: handle error (#19) Co-authored-by: Welkin Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> --- common/gopool/pool.go | 8 ++++++-- core/blockchain.go | 10 ++++++++-- core/state_prefetcher.go | 7 ++++++- 3 files changed, 20 insertions(+), 5 deletions(-) diff --git a/common/gopool/pool.go b/common/gopool/pool.go index 2c6647aa76..990db4313b 100644 --- a/common/gopool/pool.go +++ b/common/gopool/pool.go @@ -3,6 +3,7 @@ package gopool import ( "time" + "github.com/ethereum/go-ethereum/log" "github.com/panjf2000/ants/v2" ) @@ -30,8 +31,11 @@ func init() { } // Submit submits a task to pool. -func Submit(task func()) error { - return defaultPool.Submit(task) +func Submit(task func()) { + err := defaultPool.Submit(task) + if err != nil { + log.Error("pool submit task fail", "err", err, "task", task) + } } // Running returns the number of the currently running goroutines. diff --git a/core/blockchain.go b/core/blockchain.go index cd0952e61c..9d21cb8e92 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1420,7 +1420,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. limit = common.StorageSize(bc.cacheConfig.TrieDirtyLimit) * 1024 * 1024 ) if nodes > limit || imgs > 4*1024*1024 { - bc.triedb.Cap(limit - ethdb.IdealBatchSize) + err := bc.triedb.Cap(limit - ethdb.IdealBatchSize) + if err != nil { + return err + } } // Find the next state trie we need to commit chosen := current - bc.cacheConfig.TriesInMemory @@ -1439,7 +1442,10 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. log.Info("State in memory for too long, committing", "time", bc.gcproc, "allowance", flushInterval, "optimum", float64(chosen-bc.lastWrite)/TriesInMemory) } // Flush an entire trie and restart the counters - bc.triedb.Commit(header.Root, true) + err := bc.triedb.Commit(header.Root, true) + if err != nil { + return err + } bc.lastWrite = chosen bc.gcproc = 0 } diff --git a/core/state_prefetcher.go b/core/state_prefetcher.go index 5e7f52c701..cf929a13b6 100644 --- a/core/state_prefetcher.go +++ b/core/state_prefetcher.go @@ -22,6 +22,7 @@ import ( "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/params" ) @@ -77,7 +78,11 @@ func (p *statePrefetcher) Prefetch(block *types.Block, statedb *state.StateDB, c return // Also invalid block, bail out } newStatedb.SetTxContext(tx.Hash(), i) - precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) + preCacheErr := precacheTransaction(msg, p.config, gaspool, newStatedb, header, evm) + if preCacheErr != nil { + log.Warn("precacheTransaction fail", "err", preCacheErr) + return + } case <-interruptCh: // If block precaching was interrupted, abort From 9d35ab5924b1e1595883ef59c6d7338038274b9e Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Fri, 5 Jan 2024 10:27:29 +0800 Subject: [PATCH 09/13] fix: remove redundant lock (#21) Co-authored-by: Welkin --- core/blockchain.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/core/blockchain.go b/core/blockchain.go index 9d21cb8e92..9808e48b8a 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -229,7 +229,6 @@ type BlockChain struct { quit chan struct{} // shutdown signal, closed in Stop. running int32 // 0 if chain is running, 1 when stopped procInterrupt int32 // interrupt signaler for block processing - commitLock sync.Mutex // CommitLock is used to protect above field from being modified concurrently engine consensus.Engine validator Validator // Block and state validator interface @@ -1397,8 +1396,6 @@ func (bc *BlockChain) writeBlockWithState(block *types.Block, receipts []*types. postCommitFuncs := []func() error{ func() error { - bc.commitLock.Lock() - defer bc.commitLock.Unlock() root := block.Root() // If we're running an archive node, always flush From e13d451e5429b514b84afa748faf3c254aff9735 Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Fri, 5 Jan 2024 10:28:05 +0800 Subject: [PATCH 10/13] fix: refraining from using gopool for long-running tasks (#20) Co-authored-by: Welkin --- eth/bloombits.go | 5 ++--- p2p/dial.go | 4 ++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/eth/bloombits.go b/eth/bloombits.go index 314317ae4f..0cb7050d23 100644 --- a/eth/bloombits.go +++ b/eth/bloombits.go @@ -20,7 +20,6 @@ import ( "time" "github.com/ethereum/go-ethereum/common/bitutil" - "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/rawdb" ) @@ -46,7 +45,7 @@ const ( // retrievals from possibly a range of filters and serving the data to satisfy. func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { for i := 0; i < bloomServiceThreads; i++ { - gopool.Submit(func() { + go func() { for { select { case <-eth.closeBloomHandler: @@ -70,6 +69,6 @@ func (eth *Ethereum) startBloomHandlers(sectionSize uint64) { request <- task } } - }) + }() } } diff --git a/p2p/dial.go b/p2p/dial.go index 49bf6d543a..c5dc3c41fa 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -178,8 +178,8 @@ func newDialScheduler(config dialConfig, it enode.Iterator, setupFunc dialSetupF d.lastStatsLog = d.clock.Now() d.ctx, d.cancel = context.WithCancel(context.Background()) d.wg.Add(2) - gopool.Submit(func() { d.readNodes(it) }) - gopool.Submit(func() { d.loop(it) }) + go func() { d.readNodes(it) }() + go func() { d.loop(it) }() return d } From 1f69107650d3b74055cb600107cd940c0412ef96 Mon Sep 17 00:00:00 2001 From: welkin22 <136572398+welkin22@users.noreply.github.com> Date: Fri, 5 Jan 2024 10:28:41 +0800 Subject: [PATCH 11/13] fix: bubbling up the error when DeriveField() fails (#18) Co-authored-by: Welkin --- core/blockchain.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/blockchain.go b/core/blockchain.go index 9808e48b8a..ed2e451979 100644 --- a/core/blockchain.go +++ b/core/blockchain.go @@ -1874,11 +1874,14 @@ func (bc *BlockChain) insertChain(chain types.Blocks, verifySeals, setHead bool) proctime := time.Since(start) // processing + validation // pre-cache the block and receipts, so that it can be retrieved quickly by rcp - bc.CacheBlock(block.Hash(), block) err = types.Receipts(receipts).DeriveFields(bc.chainConfig, block.Hash(), block.NumberU64(), block.Time(), block.BaseFee(), block.Transactions()) if err != nil { log.Warn("Failed to derive receipt fields", "block", block.Hash(), "err", err) + bc.reportBlock(block, receipts, err) + close(interruptCh) + return it.index, err } + bc.CacheBlock(block.Hash(), block) bc.CacheReceipts(block.Hash(), receipts) // Update the metrics touched during block processing and validation From 7a5b85ea4c5f93ed14c1b2632cd8ecf5f7f3a585 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Fri, 5 Jan 2024 12:24:29 +0800 Subject: [PATCH 12/13] add reannounce metric for txpool (#43) Co-authored-by: andyzhang2023 --- core/txpool/txpool.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 52d5584d2b..c279b1a599 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -111,6 +111,8 @@ var ( ) var ( + staledMeter = metrics.NewRegisteredMeter("txpool/staled/count", nil) // staled transactions + // Metrics for the pending pool pendingDiscardMeter = metrics.NewRegisteredMeter("txpool/pending/discard", nil) pendingReplaceMeter = metrics.NewRegisteredMeter("txpool/pending/replace", nil) @@ -452,6 +454,7 @@ func (pool *TxPool) loop() { return txs }() pool.mu.RUnlock() + staledMeter.Mark(int64(len(reannoTxs))) if len(reannoTxs) > 0 { pool.reannoTxFeed.Send(core.ReannoTxsEvent{reannoTxs}) } From 69afb859804e8155e43795e08cd9e7f8b465cfc3 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Fri, 5 Jan 2024 14:55:19 +0800 Subject: [PATCH 13/13] txpool: enhance some logs and metrics on broadcasting and annoucing (#41) * txpool: enhance some logs and metrics on broadcasting and annoucing * add transactions hashes detail in warning logs * feature: add tx send logs * enhance logs for announcement * enhance logs for announcement * fix metric txpool/valid * fix comment --------- Co-authored-by: andyzhang2023 Co-authored-by: redhdx Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> --- core/txpool/txpool.go | 3 ++- eth/fetcher/tx_fetcher.go | 24 ++++++++++++++++++++++++ eth/handler.go | 2 ++ eth/protocols/eth/broadcast.go | 21 +++++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index c279b1a599..27f42d43a5 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -1113,10 +1113,10 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, replaced, err := pool.add(tx, local) errs[i] = err if err == nil && !replaced { + validTxMeter.Mark(1) dirty.addTx(tx) } } - validTxMeter.Mark(int64(len(dirty.accounts))) return errs, dirty } @@ -1539,6 +1539,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans if pool.promoteTx(addr, hash, tx) { promoted = append(promoted, tx) } + log.Trace("Promoted queued transaction", "hash", hash) } log.Trace("Promoted queued transactions", "count", len(promoted)) queuedGauge.Dec(int64(len(readies))) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 4e016c833a..82aba676f1 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -22,6 +22,7 @@ import ( "fmt" mrand "math/rand" "sort" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -237,6 +238,8 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { duplicate++ case f.isKnownUnderpriced(hash): underpriced++ + log.Info("announced transaction is underpriced", "hash", hash.String()) + default: unknowns = append(unknowns, hash) } @@ -323,6 +326,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) default: otherreject++ + log.Warn("Peer's transaction rejected", "peer", peer, "txHash", batch[j].Hash().String(), "err", err.Error()) } added = append(added, batch[j].Hash()) } @@ -389,11 +393,13 @@ func (f *TxFetcher) loop() { // check. Should be fine as the limit is in the thousands and the // request size in the hundreds. txAnnounceDOSMeter.Mark(int64(len(ann.hashes))) + log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes)) break } want := used + len(ann.hashes) if want > maxTxAnnounces { txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces)) + log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes)) ann.hashes = ann.hashes[:want-maxTxAnnounces] } // All is well, schedule the remainder of the transactions @@ -505,6 +511,7 @@ func (f *TxFetcher) loop() { for peer, req := range f.requests { if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout { txRequestTimeoutMeter.Mark(int64(len(req.hashes))) + log.Info("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes)) // Reschedule all the not-yet-delivered fetches to alternate peers for _, hash := range req.hashes { @@ -824,6 +831,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, // failure (e.g. peer disconnected), reschedule the hashes. if err := f.fetchTxs(peer, hashes); err != nil { txRequestFailMeter.Mark(int64(len(hashes))) + log.Info("announced transaction request failed", "hashes", joinHashes(hashes), "num", len(hashes)) f.Drop(peer) } }) @@ -916,3 +924,19 @@ func rotateHashes(slice []common.Hash, n int) { slice[i] = orig[(i+n)%len(orig)] } } + +// joinHashes concat hashes into string, for debugging logs; 1024 hashes at most, to avoid +// too much cost of logging +func joinHashes(hashes []common.Hash) string { + num := len(hashes) + if num > 1024 { + num = 1024 + } + strs := make([]string, num) + for i, h := range hashes { + if i < num { + strs[i] = h.String() + } + } + return strings.Join(strs, ",") +} diff --git a/eth/handler.go b/eth/handler.go index 5c4709564c..0c19ad8388 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -662,10 +662,12 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { numDirect := int(math.Sqrt(float64(len(peers)))) for _, peer := range peers[:numDirect] { txset[peer] = append(txset[peer], tx.Hash()) + log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only for _, peer := range peers[numDirect:] { annos[peer] = append(annos[peer], tx.Hash()) + log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) } } for peer, hashes := range txset { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index fbfc9b5448..d9a35c9b81 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -23,6 +23,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + txAnnounceAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil) + txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil) ) const ( @@ -62,6 +68,14 @@ func (p *Peer) broadcastBlocks() { } } +// safeGetPeerIP +var safeGetPeerIP = func(p *Peer) string { + if p.Node() != nil && p.Node().IP() != nil { + return p.Node().IP().String() + } + return "UNKNOWN" +} + func collectHashes(txs []*types.Transaction) []common.Hash { hashes := make([]common.Hash, len(txs)) for i, tx := range txs { @@ -111,6 +125,7 @@ func (p *Peer) broadcastTransactions() { done = make(chan struct{}) gopool.Submit(func() { if err := p.SendTransactions(txs); err != nil { + p.Log().Warn("Broadcast transactions failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(txs), "hashes", concat(collectHashes(txs)), "err", err.Error()) fail <- err return } @@ -130,6 +145,8 @@ func (p *Peer) broadcastTransactions() { queue = append(queue, hashes...) if len(queue) > maxQueuedTxs { // Fancy copy and resize to ensure buffer doesn't grow indefinitely + p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs, "hashes", concat(queue[:len(queue)-maxQueuedTxs])) + txBroadcastAbandonMeter.Mark(int64(len(queue) - maxQueuedTxs)) queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] } @@ -183,11 +200,13 @@ func (p *Peer) announceTransactions() { gopool.Submit(func() { if p.version >= ETH68 { if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { + p.Log().Warn("Announce hashes68 failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(pending), "hashes", concat(pending), "err", err.Error()) fail <- err return } } else { if err := p.sendPooledTransactionHashes66(pending); err != nil { + p.Log().Warn("Announce hashes66 failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(pending), "hashes", concat(pending), "err", err.Error()) fail <- err return } @@ -207,6 +226,8 @@ func (p *Peer) announceTransactions() { // New batch of transactions to be broadcast, queue them (with cap) queue = append(queue, hashes...) if len(queue) > maxQueuedTxAnns { + p.Log().Warn("Announce hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxAnns, "hashes", concat(queue[:len(queue)-maxQueuedTxAnns])) + txAnnounceAbandonMeter.Mark(int64(len(queue) - maxQueuedTxAnns)) // Fancy copy and resize to ensure buffer doesn't grow indefinitely queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])] }