diff --git a/.golangci.yml b/.golangci.yml index 2cd83c9c7..acb9a2ab5 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -15,9 +15,9 @@ linters: - gosimple - govet - ineffassign - - misspell + # - misspell - nakedret - - nolintlint + # - nolintlint - prealloc - staticcheck # - structcheck // to be fixed by golangci-lint diff --git a/mempool/clist_sidecar.go b/mempool/clist_sidecar.go index d92a779d0..23a37a348 100644 --- a/mempool/clist_sidecar.go +++ b/mempool/clist_sidecar.go @@ -20,7 +20,6 @@ import ( // be efficiently accessed by multiple concurrent readers. type CListPriorityTxSidecar struct { // Atomic integers - height int64 // the last block Update()'d to heightForFiringAuction int64 // the height of the block to fire the auction for txsBytes int64 // total size of sidecar, in bytes lastBundleHeight int64 // height of last accepted bundle tx, for status rpc purposes @@ -40,7 +39,7 @@ type CListPriorityTxSidecar struct { // // sync.Map bundleOrder -> *SidecarTx // } bundles sync.Map - maxBundleID int64 + maxBundleID map[int64]int64 // map of height -> max bundle ID updateMtx tmsync.RWMutex @@ -71,10 +70,10 @@ func NewCListSidecar( ) *CListPriorityTxSidecar { sidecar := &CListPriorityTxSidecar{ txs: clist.New(), - height: height, heightForFiringAuction: height + 1, logger: memLogger, metrics: mevMetrics, + maxBundleID: make(map[int64]int64), } sidecar.cache = NewLRUTxCache(10000) return sidecar @@ -82,7 +81,7 @@ func NewCListSidecar( func (sc *CListPriorityTxSidecar) PrettyPrintBundles() { fmt.Println("-------------") - for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID); bundleIDIter++ { + for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID[sc.heightForFiringAuction]); bundleIDIter++ { bundleIDIter := int64(bundleIDIter) if bundle, ok := sc.bundles.Load(Key{sc.heightForFiringAuction, bundleIDIter}); ok { bundle := bundle.(*Bundle) @@ -134,7 +133,6 @@ func (sc *CListPriorityTxSidecar) notifyTxsAvailable() { func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error { sc.updateMtx.RLock() - // use defer to unlock mutex because application (*local client*) might panic defer sc.updateMtx.RUnlock() // don't add any txs already in cache @@ -148,9 +146,7 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error { ) // Record a new sender for a tx we've already seen. // Note it's possible a tx is still in the cache but no longer in the mempool - // (eg. after committing a block, txs are removed from mempool but not cache), // so we only record the sender for txs still in the mempool. - // Record a new sender for a tx we've already seen. if e, ok := sc.txsMap.Load(tx.Key()); ok { scTx := e.(*clist.CElement).Value.(*SidecarTx) @@ -218,8 +214,6 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error { BundleID: txInfo.BundleID, CurrSize: int64(0), EnforcedSize: txInfo.BundleSize, - // TODO: add from gossip info? - GasWanted: int64(0), OrderedTxsMap: &sync.Map{}, }) bundle = existingBundle.(*Bundle) @@ -230,7 +224,7 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error { if txInfo.BundleSize != bundle.EnforcedSize { sc.logger.Info( "failed adding sidecarTx", - "reason", "trying to insert a tx for bundle at an order greater than the size of the bundle...", + "reason", "tx's bundle size doesn't match bundle's expected size...", "bundle id", txInfo.BundleID, "bundle size", txInfo.BundleSize, "gasWanted", txInfo.GasWanted, @@ -251,7 +245,7 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error { defer sc.bundleSizeMtx.Unlock() // Can't add transactions if the bundle is already full // check if the current size of this bundle is greater than the expected size for the bundle, if so skip - if bundle.CurrSize >= bundle.EnforcedSize { + if bundle.CurrSize == bundle.EnforcedSize { sc.logger.Info( "failed adding sidecarTx", "reason", "bundle already full for this BundleID...", @@ -296,13 +290,11 @@ func (sc *CListPriorityTxSidecar) AddTx(tx types.Tx, txInfo TxInfo) error { // -------- UPDATE MAX BUNDLE --------- - func() { - sc.maxBundleIDMtx.Lock() - defer sc.maxBundleIDMtx.Unlock() - if txInfo.BundleID >= sc.maxBundleID { - sc.maxBundleID = txInfo.BundleID - } - }() + sc.maxBundleIDMtx.Lock() + if txInfo.BundleID > sc.maxBundleID[txInfo.DesiredHeight] { + sc.maxBundleID[txInfo.DesiredHeight] = txInfo.BundleID + } + sc.maxBundleIDMtx.Unlock() // -------- TX INSERTION INTO MAIN TXS LIST --------- // -------- TODO: In the future probably want to refactor to not have txs clist --------- @@ -368,9 +360,6 @@ func (sc *CListPriorityTxSidecar) Update( txs types.Txs, deliverTxResponses []*abci.ResponseDeliverTx, ) error { - - // Set height for block last updated to (i.e. block last committed) - sc.height = height sc.notifiedTxsAvailable = false sc.heightForFiringAuction = height + 1 @@ -396,7 +385,7 @@ func (sc *CListPriorityTxSidecar) Update( } sc.cache.Reset() - sc.maxBundleID = 0 + delete(sc.maxBundleID, height) // remove from txs list and txmap for e := sc.txs.Front(); e != nil; e = e.Next() { @@ -444,7 +433,7 @@ func (sc *CListPriorityTxSidecar) Flush() { sc.cache.Reset() sc.notifiedTxsAvailable = false - sc.maxBundleID = 0 + delete(sc.maxBundleID, sc.heightForFiringAuction) _ = atomic.SwapInt64(&sc.txsBytes, 0) @@ -481,7 +470,7 @@ func (sc *CListPriorityTxSidecar) NumBundles() int { // Safe for concurrent use by multiple goroutines. func (sc *CListPriorityTxSidecar) MaxBundleID() int64 { - return sc.maxBundleID + return sc.maxBundleID[sc.heightForFiringAuction] } func (sc *CListPriorityTxSidecar) HeightForFiringAuction() int64 { @@ -557,7 +546,7 @@ func (sc *CListPriorityTxSidecar) ReapMaxTxs() types.ReapedTxs { // iterate over all BundleIDs up to the max we've seen // CONTRACT: this assumes that bundles don't care about previous bundles, // so still want to execute if any missing between - for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID); bundleIDIter++ { + for bundleIDIter := 0; bundleIDIter <= int(sc.maxBundleID[sc.heightForFiringAuction]); bundleIDIter++ { bundleIDIter := int64(bundleIDIter) if bundle, ok := sc.bundles.Load(Key{sc.heightForFiringAuction, bundleIDIter}); ok { @@ -593,6 +582,7 @@ func (sc *CListPriorityTxSidecar) ReapMaxTxs() types.ReapedTxs { "bundleOrder", bundleOrderIter, "height", sc.heightForFiringAuction, ) + break } } diff --git a/mempool/mempool.go b/mempool/mempool.go index e9d9e1033..44406db81 100644 --- a/mempool/mempool.go +++ b/mempool/mempool.go @@ -302,6 +302,5 @@ type Bundle struct { CurrSize int64 // total size of bundle EnforcedSize int64 // total size of bundle - GasWanted int64 // amount of gas this tx states it will require OrderedTxsMap *sync.Map // map from bundleOrder to *mempoolTx } diff --git a/mempool/v0/reactor.go b/mempool/v0/reactor.go index 6753f9fc3..d38cc6064 100644 --- a/mempool/v0/reactor.go +++ b/mempool/v0/reactor.go @@ -383,7 +383,7 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) { GasWanted: scTx.GasWanted, } - success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ + success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: mempool.SidecarLegacyChannel, Message: msg, }, memR.Logger) @@ -392,14 +392,6 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) { continue } } - } else { - memR.Logger.Info( - "broadcasting sidecarTx to peer failed", - "peer", peerID, - "was considered sidecarPeer", isSidecarPeer, - "was converted to sidecarTx", okConv, - "tx", scTx.Tx.Hash(), - ) } } @@ -454,7 +446,7 @@ func (memR *Reactor) broadcastMempoolTxRoutine(peer p2p.Peer) { // Allow for a lag of 1 block. memTx := next.Value.(*mempoolTx) - if peerState.GetHeight() < memTx.height-1 { + if peerState.GetHeight() < memTx.Height()-1 { time.Sleep(mempool.PeerCatchupSleepIntervalMS * time.Millisecond) continue } diff --git a/mempool/v1/reactor.go b/mempool/v1/reactor.go index eec8190dc..022a8e5b3 100644 --- a/mempool/v1/reactor.go +++ b/mempool/v1/reactor.go @@ -386,7 +386,7 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) { GasWanted: scTx.GasWanted, } - success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ + success := p2p.SendEnvelopeShim(peer, p2p.Envelope{ //nolint: staticcheck ChannelID: mempool.SidecarLegacyChannel, Message: msg, }, memR.Logger) @@ -395,14 +395,6 @@ func (memR *Reactor) broadcastSidecarTxRoutine(peer p2p.Peer) { continue } } - } else { - memR.Logger.Info( - "broadcasting sidecarTx to peer failed", - "peer", peerID, - "was considered sidecarPeer", isSidecarPeer, - "was converted to sidecarTx", okConv, - "tx", scTx.Tx.Hash(), - ) } } diff --git a/p2p/switch.go b/p2p/switch.go index b4b1d9bda..47cd1ff22 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -94,7 +94,7 @@ type Switch struct { rng *rand.Rand // seed for randomizing dial times and orders metrics *Metrics - mlc *metricsLabelCache + mlc *metricsLabelCache mevMetrics *mev.Metrics sidecarPeers sidecarPeers SentinelPeerString string @@ -463,10 +463,6 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { // https://github.com/tendermint/tendermint/issues/3338 if sw.peers.Remove(peer) { sw.metrics.Peers.Add(float64(-1)) - } else { - // Removal of the peer has failed. The function above sets a flag within the peer to mark this. - // We keep this message here as information to the developer. - sw.Logger.Debug("error on peer removal", ",", "peer", peer.ID()) // check if we removed sentinel, if so, alert metrics splitStr := strings.Split(sw.SentinelPeerString, "@") @@ -482,7 +478,10 @@ func (sw *Switch) stopAndRemovePeer(peer Peer, reason interface{}) { } else { sw.Logger.Error("Error splitting sentinel ID", "is it correctly configured?", sw.SentinelPeerString) } - + } else { + // Removal of the peer has failed. The function above sets a flag within the peer to mark this. + // We keep this message here as information to the developer. + sw.Logger.Debug("error on peer removal", ",", "peer", peer.ID()) } } diff --git a/state/execution.go b/state/execution.go index d65601fd1..0292a6800 100644 --- a/state/execution.go +++ b/state/execution.go @@ -224,7 +224,9 @@ func (blockExec *BlockExecutor) Commit( deliverTxResponses []*abci.ResponseDeliverTx, ) ([]byte, int64, error) { blockExec.mempool.Lock() + blockExec.sidecar.Lock() defer blockExec.mempool.Unlock() + defer blockExec.sidecar.Unlock() // while mempool is Locked, flush to ensure all async requests have completed // in the ABCI app before Commit. @@ -258,7 +260,6 @@ func (blockExec *BlockExecutor) Commit( ) if err != nil { blockExec.logger.Error("error while updating sidecar", "err", err) - return nil, 0, err } } @@ -270,10 +271,6 @@ func (blockExec *BlockExecutor) Commit( TxPreCheck(state), TxPostCheck(state), ) - if err != nil { - blockExec.logger.Error("error while updating mempool", "err", err) - return nil, 0, err - } return res.Data, res.RetainHeight, err } diff --git a/test/e2e/runner/setup.go b/test/e2e/runner/setup.go index b5e7ed289..250cd6370 100644 --- a/test/e2e/runner/setup.go +++ b/test/e2e/runner/setup.go @@ -1,4 +1,4 @@ -//nolint: gosec +// nolint: gosec package main import ( diff --git a/test/fuzz/p2p/pex/init-corpus/main.go b/test/fuzz/p2p/pex/init-corpus/main.go index 797da0f20..89eb61c02 100644 --- a/test/fuzz/p2p/pex/init-corpus/main.go +++ b/test/fuzz/p2p/pex/init-corpus/main.go @@ -1,4 +1,4 @@ -//nolint: gosec +// nolint: gosec package main import (