Skip to content

Commit

Permalink
fix: use mutex lock for tx removal (#51)
Browse files Browse the repository at this point in the history
Acquire a mutex lock and unlock while removing transactions from the
mempool to avoid concurrent errors on the underlying maps.

Changes made:
1. Acquire the pool mutex lock in `ClearAstriaOrdered`
2. Add a test in `legacypool_test.go` called `TestRemoveTxSanity` which
adds a bunch of pending txs and removes them using `removeTx`. We use
`ValidatePoolInternals` method to ensure that the pool state is not
corrupted. This test is a sanity check for us to validate that
`removeTx` works as intended and does not have adverse side effects.
3. Add a test in for the ExecutionAPI where we add an invalid tx which
will have to be removed. We verify that the mempool is cleared
completely after it

---------

Co-authored-by: Bharath <vedabharath12345@gmail.com>
  • Loading branch information
joroshiba and bharath-123 committed Oct 11, 2024
1 parent 401c3d5 commit c3cfc16
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 0 deletions.
2 changes: 2 additions & 0 deletions core/txpool/legacypool/legacypool.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,8 @@ func (pool *LegacyPool) ClearAstriaOrdered() {
if pool.astria == nil {
return
}
pool.mu.Lock()
defer pool.mu.Unlock()

astriaExcludedFromBlockMeter.Mark(int64(len(pool.astria.excludedFromBlock)))
for _, tx := range pool.astria.excludedFromBlock {
Expand Down
61 changes: 61 additions & 0 deletions core/txpool/legacypool/legacypool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,67 @@ func TestChainFork(t *testing.T) {
}
}

func TestRemoveTxSanity(t *testing.T) {
t.Parallel()

pool, key := setupPool()
defer pool.Close()

addr := crypto.PubkeyToAddress(key.PublicKey)
resetState := func() {
statedb, _ := state.New(types.EmptyRootHash, state.NewDatabase(rawdb.NewMemoryDatabase()), nil)
statedb.AddBalance(addr, uint256.NewInt(100000000000000), tracing.BalanceChangeUnspecified)

pool.chain = newTestBlockChain(pool.chainconfig, 1000000, statedb, new(event.Feed))
<-pool.requestReset(nil, nil)
}
resetState()

tx1 := transaction(0, 100000, key)
tx2 := transaction(1, 100000, key)
tx3 := transaction(2, 100000, key)

if err := pool.addLocal(tx1); err != nil {
t.Error("didn't expect error", err)
}
if err := pool.addLocal(tx2); err != nil {
t.Error("didn't expect error", err)
}
if err := pool.addLocal(tx3); err != nil {
t.Error("didn't expect error", err)
}

pendingTxs := pool.pending[addr]
if pendingTxs.Len() != 3 {
t.Error("expected 3 pending transactions, got", pendingTxs.Len())
}

if err := validatePoolInternals(pool); err != nil {
t.Errorf("pool internals validation failed: %v", err)
}

n := pool.removeTx(tx1.Hash(), false, true)
if n != 3 {
t.Error("expected 3 transactions to be removed, got", n)
}
n = pool.removeTx(tx2.Hash(), false, true)
if n != 0 {
t.Error("expected 0 transactions to be removed, got", n)
}
n = pool.removeTx(tx3.Hash(), false, true)
if n != 0 {
t.Error("expected 0 transactions to be removed, got", n)
}

if len(pool.pending) != 0 {
t.Error("expected 0 pending transactions, got", pendingTxs.Len())
}

if err := validatePoolInternals(pool); err != nil {
t.Errorf("pool internals validation failed: %v", err)
}
}

func TestDoubleNonce(t *testing.T) {
t.Parallel()

Expand Down
137 changes: 137 additions & 0 deletions grpc/execution/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"google.golang.org/protobuf/types/known/timestamppb"
"math/big"
"testing"
"time"
)

func TestExecutionService_GetGenesisInfo(t *testing.T) {
Expand Down Expand Up @@ -475,3 +476,139 @@ func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitment(t *testi
balanceDiff := new(uint256.Int).Sub(chainDestinationAddressBalanceAfter, chainDestinationAddressBalanceBefore)
require.True(t, balanceDiff.Cmp(uint256.NewInt(1000000000000000000)) == 0, "Chain destination address balance is not correct")
}

// Check that invalid transactions are not added into a block and are removed from the mempool
func TestExecutionServiceServerV1Alpha2_ExecuteBlockAndUpdateCommitmentWithInvalidTransactions(t *testing.T) {
ethservice, serviceV1Alpha1 := setupExecutionService(t, 10)

// call genesis info
genesisInfo, err := serviceV1Alpha1.GetGenesisInfo(context.Background(), &astriaPb.GetGenesisInfoRequest{})
require.Nil(t, err, "GetGenesisInfo failed")
require.NotNil(t, genesisInfo, "GenesisInfo is nil")

// call get commitment state
commitmentState, err := serviceV1Alpha1.GetCommitmentState(context.Background(), &astriaPb.GetCommitmentStateRequest{})
require.Nil(t, err, "GetCommitmentState failed")
require.NotNil(t, commitmentState, "CommitmentState is nil")

ethservice.BlockChain().SetSafe(ethservice.BlockChain().CurrentBlock())

// get previous block hash
previousBlock := ethservice.BlockChain().CurrentSafeBlock()
require.NotNil(t, previousBlock, "Previous block not found")

gasLimit := ethservice.BlockChain().GasLimit()

stateDb, err := ethservice.BlockChain().StateAt(previousBlock.Root)
require.Nil(t, err, "Failed to get state db")

latestNonce := stateDb.GetNonce(testAddr)

// create 5 txs
txs := []*types.Transaction{}
marshalledTxs := []*sequencerblockv1alpha1.RollupData{}
for i := 0; i < 5; i++ {
unsignedTx := types.NewTransaction(latestNonce+uint64(i), testToAddress, big.NewInt(1), params.TxGas, big.NewInt(params.InitialBaseFee*2), nil)
tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey)
require.Nil(t, err, "Failed to sign tx")
txs = append(txs, tx)

marshalledTx, err := tx.MarshalBinary()
require.Nil(t, err, "Failed to marshal tx")
marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{
Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx},
})
}

// add a tx with lesser gas than the base gas
unsignedTx := types.NewTransaction(latestNonce+uint64(5), testToAddress, big.NewInt(1), gasLimit, big.NewInt(params.InitialBaseFee*2), nil)
tx, err := types.SignTx(unsignedTx, types.LatestSigner(ethservice.BlockChain().Config()), testKey)
require.Nil(t, err, "Failed to sign tx")
txs = append(txs, tx)

marshalledTx, err := tx.MarshalBinary()
require.Nil(t, err, "Failed to marshal tx")
marshalledTxs = append(marshalledTxs, &sequencerblockv1alpha1.RollupData{
Value: &sequencerblockv1alpha1.RollupData_SequencedData{SequencedData: marshalledTx},
})

errors := ethservice.TxPool().Add(txs, true, false)
for _, err := range errors {
require.Nil(t, err, "Failed to add tx to pool")
}

pending, queued := ethservice.TxPool().Stats()
require.Equal(t, 6, pending, "Pending txs should be 6")
require.Equal(t, 0, queued, "Queued txs should be 0")

executeBlockReq := &astriaPb.ExecuteBlockRequest{
PrevBlockHash: previousBlock.Hash().Bytes(),
Timestamp: &timestamppb.Timestamp{
Seconds: int64(previousBlock.Time + 2),
},
Transactions: marshalledTxs,
}

executeBlockRes, err := serviceV1Alpha1.ExecuteBlock(context.Background(), executeBlockReq)
require.Nil(t, err, "ExecuteBlock failed")

require.NotNil(t, executeBlockRes, "ExecuteBlock response is nil")

// check if astria ordered txs are cleared
astriaOrdered := ethservice.TxPool().AstriaOrdered()
require.Equal(t, 0, astriaOrdered.Len(), "AstriaOrdered should be empty")

// call update commitment state to set the block we executed as soft and firm
updateCommitmentStateReq := &astriaPb.UpdateCommitmentStateRequest{
CommitmentState: &astriaPb.CommitmentState{
Soft: &astriaPb.Block{
Hash: executeBlockRes.Hash,
ParentBlockHash: executeBlockRes.ParentBlockHash,
Number: executeBlockRes.Number,
Timestamp: executeBlockRes.Timestamp,
},
Firm: &astriaPb.Block{
Hash: executeBlockRes.Hash,
ParentBlockHash: executeBlockRes.ParentBlockHash,
Number: executeBlockRes.Number,
Timestamp: executeBlockRes.Timestamp,
},
BaseCelestiaHeight: commitmentState.BaseCelestiaHeight + 1,
},
}

updateCommitmentStateRes, err := serviceV1Alpha1.UpdateCommitmentState(context.Background(), updateCommitmentStateReq)
require.Nil(t, err, "UpdateCommitmentState failed")
require.NotNil(t, updateCommitmentStateRes, "UpdateCommitmentState response should not be nil")
require.Equal(t, updateCommitmentStateRes, updateCommitmentStateReq.CommitmentState, "CommitmentState response should match request")

// get the soft and firm block
softBlock := ethservice.BlockChain().CurrentSafeBlock()
require.NotNil(t, softBlock, "SoftBlock is nil")
firmBlock := ethservice.BlockChain().CurrentFinalBlock()
require.NotNil(t, firmBlock, "FirmBlock is nil")

block := ethservice.BlockChain().GetBlockByNumber(softBlock.Number.Uint64())
require.NotNil(t, block, "Soft Block not found")
require.Equal(t, block.Transactions().Len(), 5, "Soft Block should have 5 txs")

// give the tx loop time to run
time.Sleep(1 * time.Millisecond)

// after the tx loop is run, all pending txs should be removed
pending, queued = ethservice.TxPool().Stats()
require.Equal(t, 0, pending, "Pending txs should be 0")
require.Equal(t, 0, queued, "Queued txs should be 0")

// check if the soft and firm block are set correctly
require.True(t, bytes.Equal(softBlock.Hash().Bytes(), updateCommitmentStateRes.Soft.Hash), "Soft Block Hashes do not match")
require.True(t, bytes.Equal(softBlock.ParentHash.Bytes(), updateCommitmentStateRes.Soft.ParentBlockHash), "Soft Block Parent Hash do not match")
require.Equal(t, softBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Soft.Number), "Soft Block Number do not match")

require.True(t, bytes.Equal(firmBlock.Hash().Bytes(), updateCommitmentStateRes.Firm.Hash), "Firm Block Hashes do not match")
require.True(t, bytes.Equal(firmBlock.ParentHash.Bytes(), updateCommitmentStateRes.Firm.ParentBlockHash), "Firm Block Parent Hash do not match")
require.Equal(t, firmBlock.Number.Uint64(), uint64(updateCommitmentStateRes.Firm.Number), "Firm Block Number do not match")

celestiaBaseHeight := ethservice.BlockChain().CurrentBaseCelestiaHeight()
require.Equal(t, celestiaBaseHeight, updateCommitmentStateRes.BaseCelestiaHeight, "BaseCelestiaHeight should be updated in db")
}

0 comments on commit c3cfc16

Please sign in to comment.