From 69afb859804e8155e43795e08cd9e7f8b465cfc3 Mon Sep 17 00:00:00 2001 From: andyzhang2023 <147463846+andyzhang2023@users.noreply.github.com> Date: Fri, 5 Jan 2024 14:55:19 +0800 Subject: [PATCH] txpool: enhance some logs and metrics on broadcasting and annoucing (#41) * txpool: enhance some logs and metrics on broadcasting and annoucing * add transactions hashes detail in warning logs * feature: add tx send logs * enhance logs for announcement * enhance logs for announcement * fix metric txpool/valid * fix comment --------- Co-authored-by: andyzhang2023 Co-authored-by: redhdx Co-authored-by: Owen <103096885+owen-reorg@users.noreply.github.com> --- core/txpool/txpool.go | 3 ++- eth/fetcher/tx_fetcher.go | 24 ++++++++++++++++++++++++ eth/handler.go | 2 ++ eth/protocols/eth/broadcast.go | 21 +++++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index c279b1a599..27f42d43a5 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -1113,10 +1113,10 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) ([]error, replaced, err := pool.add(tx, local) errs[i] = err if err == nil && !replaced { + validTxMeter.Mark(1) dirty.addTx(tx) } } - validTxMeter.Mark(int64(len(dirty.accounts))) return errs, dirty } @@ -1539,6 +1539,7 @@ func (pool *TxPool) promoteExecutables(accounts []common.Address) []*types.Trans if pool.promoteTx(addr, hash, tx) { promoted = append(promoted, tx) } + log.Trace("Promoted queued transaction", "hash", hash) } log.Trace("Promoted queued transactions", "count", len(promoted)) queuedGauge.Dec(int64(len(readies))) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go index 4e016c833a..82aba676f1 100644 --- a/eth/fetcher/tx_fetcher.go +++ b/eth/fetcher/tx_fetcher.go @@ -22,6 +22,7 @@ import ( "fmt" mrand "math/rand" "sort" + "strings" "time" "github.com/ethereum/go-ethereum/common" @@ -237,6 +238,8 @@ func (f *TxFetcher) Notify(peer string, hashes []common.Hash) error { duplicate++ case f.isKnownUnderpriced(hash): underpriced++ + log.Info("announced transaction is underpriced", "hash", hash.String()) + default: unknowns = append(unknowns, hash) } @@ -323,6 +326,7 @@ func (f *TxFetcher) Enqueue(peer string, txs []*types.Transaction, direct bool) default: otherreject++ + log.Warn("Peer's transaction rejected", "peer", peer, "txHash", batch[j].Hash().String(), "err", err.Error()) } added = append(added, batch[j].Hash()) } @@ -389,11 +393,13 @@ func (f *TxFetcher) loop() { // check. Should be fine as the limit is in the thousands and the // request size in the hundreds. txAnnounceDOSMeter.Mark(int64(len(ann.hashes))) + log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes), "num", len(ann.hashes)) break } want := used + len(ann.hashes) if want > maxTxAnnounces { txAnnounceDOSMeter.Mark(int64(want - maxTxAnnounces)) + log.Info("announced transaction DOS overflow", "hashes", joinHashes(ann.hashes[want-maxTxAnnounces:]), "num", len(ann.hashes)) ann.hashes = ann.hashes[:want-maxTxAnnounces] } // All is well, schedule the remainder of the transactions @@ -505,6 +511,7 @@ func (f *TxFetcher) loop() { for peer, req := range f.requests { if time.Duration(f.clock.Now()-req.time)+txGatherSlack > txFetchTimeout { txRequestTimeoutMeter.Mark(int64(len(req.hashes))) + log.Info("announced transaction request timeout", "hashes", joinHashes(req.hashes), "num", len(req.hashes)) // Reschedule all the not-yet-delivered fetches to alternate peers for _, hash := range req.hashes { @@ -824,6 +831,7 @@ func (f *TxFetcher) scheduleFetches(timer *mclock.Timer, timeout chan struct{}, // failure (e.g. peer disconnected), reschedule the hashes. if err := f.fetchTxs(peer, hashes); err != nil { txRequestFailMeter.Mark(int64(len(hashes))) + log.Info("announced transaction request failed", "hashes", joinHashes(hashes), "num", len(hashes)) f.Drop(peer) } }) @@ -916,3 +924,19 @@ func rotateHashes(slice []common.Hash, n int) { slice[i] = orig[(i+n)%len(orig)] } } + +// joinHashes concat hashes into string, for debugging logs; 1024 hashes at most, to avoid +// too much cost of logging +func joinHashes(hashes []common.Hash) string { + num := len(hashes) + if num > 1024 { + num = 1024 + } + strs := make([]string, num) + for i, h := range hashes { + if i < num { + strs[i] = h.String() + } + } + return strings.Join(strs, ",") +} diff --git a/eth/handler.go b/eth/handler.go index 5c4709564c..0c19ad8388 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -662,10 +662,12 @@ func (h *handler) BroadcastTransactions(txs types.Transactions) { numDirect := int(math.Sqrt(float64(len(peers)))) for _, peer := range peers[:numDirect] { txset[peer] = append(txset[peer], tx.Hash()) + log.Trace("Broadcast transaction", "peer", peer.ID(), "hash", tx.Hash()) } // For the remaining peers, send announcement only for _, peer := range peers[numDirect:] { annos[peer] = append(annos[peer], tx.Hash()) + log.Trace("Announce transaction", "peer", peer.ID(), "hash", tx.Hash()) } } for peer, hashes := range txset { diff --git a/eth/protocols/eth/broadcast.go b/eth/protocols/eth/broadcast.go index fbfc9b5448..d9a35c9b81 100644 --- a/eth/protocols/eth/broadcast.go +++ b/eth/protocols/eth/broadcast.go @@ -23,6 +23,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/gopool" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/metrics" +) + +var ( + txAnnounceAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/announces/abandon", nil) + txBroadcastAbandonMeter = metrics.NewRegisteredMeter("eth/fetcher/transaction/broadcasts/abandon", nil) ) const ( @@ -62,6 +68,14 @@ func (p *Peer) broadcastBlocks() { } } +// safeGetPeerIP +var safeGetPeerIP = func(p *Peer) string { + if p.Node() != nil && p.Node().IP() != nil { + return p.Node().IP().String() + } + return "UNKNOWN" +} + func collectHashes(txs []*types.Transaction) []common.Hash { hashes := make([]common.Hash, len(txs)) for i, tx := range txs { @@ -111,6 +125,7 @@ func (p *Peer) broadcastTransactions() { done = make(chan struct{}) gopool.Submit(func() { if err := p.SendTransactions(txs); err != nil { + p.Log().Warn("Broadcast transactions failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(txs), "hashes", concat(collectHashes(txs)), "err", err.Error()) fail <- err return } @@ -130,6 +145,8 @@ func (p *Peer) broadcastTransactions() { queue = append(queue, hashes...) if len(queue) > maxQueuedTxs { // Fancy copy and resize to ensure buffer doesn't grow indefinitely + p.Log().Warn("Broadcast hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxs, "hashes", concat(queue[:len(queue)-maxQueuedTxs])) + txBroadcastAbandonMeter.Mark(int64(len(queue) - maxQueuedTxs)) queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxs:])] } @@ -183,11 +200,13 @@ func (p *Peer) announceTransactions() { gopool.Submit(func() { if p.version >= ETH68 { if err := p.sendPooledTransactionHashes68(pending, pendingTypes, pendingSizes); err != nil { + p.Log().Warn("Announce hashes68 failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(pending), "hashes", concat(pending), "err", err.Error()) fail <- err return } } else { if err := p.sendPooledTransactionHashes66(pending); err != nil { + p.Log().Warn("Announce hashes66 failed", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "lost", len(pending), "hashes", concat(pending), "err", err.Error()) fail <- err return } @@ -207,6 +226,8 @@ func (p *Peer) announceTransactions() { // New batch of transactions to be broadcast, queue them (with cap) queue = append(queue, hashes...) if len(queue) > maxQueuedTxAnns { + p.Log().Warn("Announce hashes abandon", "peerId", p.ID(), "peerIP", safeGetPeerIP(p), "abandon", len(queue)-maxQueuedTxAnns, "hashes", concat(queue[:len(queue)-maxQueuedTxAnns])) + txAnnounceAbandonMeter.Mark(int64(len(queue) - maxQueuedTxAnns)) // Fancy copy and resize to ensure buffer doesn't grow indefinitely queue = queue[:copy(queue, queue[len(queue)-maxQueuedTxAnns:])] }