pevm: fall back to sequential processor when the TxDAG is too deep #251

Merged
1 change: 1 addition & 0 deletions cmd/geth/main.go
@@ -177,6 +177,7 @@ var (
utils.ParallelTxDAGFlag,
utils.ParallelTxDAGFileFlag,
utils.ParallelTxDAGSenderPrivFlag,
utils.ParallelTxDAGMaxDepthRatioFlag,
configFileFlag,
utils.LogDebugFlag,
utils.LogBacktraceAtFlag,
11 changes: 11 additions & 0 deletions cmd/utils/flags.go
@@ -1136,6 +1136,13 @@ Please note that --` + MetricsHTTPFlag.Name + ` must be set to start the server.
Category: flags.VMCategory,
}

ParallelTxDAGMaxDepthRatioFlag = &cli.Float64Flag{
Name: "parallel.txdag-max-depth-ratio",
Usage: "Ratio used to decide whether to execute a block's transactions in parallel; falls back to the sequential processor when the TxDAG depth divided by the transaction count exceeds this value (default = 0.9)",
Value: 0.9,
Category: flags.VMCategory,
}

VMOpcodeOptimizeFlag = &cli.BoolFlag{
Name: "vm.opcode.optimize",
Usage: "enable opcode optimization",
@@ -2057,6 +2064,10 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) {
cfg.ParallelTxDAGFile = ctx.String(ParallelTxDAGFileFlag.Name)
}

if ctx.IsSet(ParallelTxDAGMaxDepthRatioFlag.Name) {
cfg.ParallelTxDAGMaxDepthRatio = ctx.Float64(ParallelTxDAGMaxDepthRatioFlag.Name)
}

if ctx.IsSet(ParallelTxDAGSenderPrivFlag.Name) {
priHex := ctx.String(ParallelTxDAGSenderPrivFlag.Name)
if cfg.Miner.ParallelTxDAGSenderPriv, err = crypto.HexToECDSA(priHex); err != nil {
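The new flag encodes a simple heuristic: when the TxDAG's level count (its depth) approaches the number of transactions, the levels are nearly serial anyway and parallel execution buys little. Below is a minimal standalone sketch of the rule; the function name and the worked numbers are illustrative, not taken from the codebase:

```go
package main

import "fmt"

// shouldFallBackToSerial is a hypothetical extraction of the depth-ratio
// check: fall back to serial execution when depth/txCount > maxDepthRatio.
func shouldFallBackToSerial(depth, txCount int, maxDepthRatio float64) bool {
	if txCount == 0 {
		return false // an empty block has nothing to parallelize anyway
	}
	return float64(depth)/float64(txCount) > maxDepthRatio
}

func main() {
	// 100 txs collapsing into 95 dependency levels: 0.95 > 0.9, go serial.
	fmt.Println(shouldFallBackToSerial(95, 100, 0.9)) // true
	// 100 txs in 40 levels: plenty of parallelism left, stay parallel.
	fmt.Println(shouldFallBackToSerial(40, 100, 0.9)) // false
}
```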
27 changes: 22 additions & 5 deletions core/blockchain.go
@@ -1760,6 +1760,19 @@ func (bc *BlockChain) InsertChain(chain types.Blocks) (int, error) {
return bc.insertChain(chain, true)
}

func (bc *BlockChain) useSerialProcessor(block *types.Block) (bool, bool, int) {
// Find out whether the block's TxDAG is too deep to be worth parallel
// execution; if it is, fall back to serial processing.
txCount := len(block.Transactions())
_, depth := BuildTxLevels(txCount, bc.vmConfig.TxDAG)
tooDeep := txCount > 0 && float64(depth)/float64(txCount) > bc.vmConfig.TxDAGMaxDepthRatio
isByzantium := bc.chainConfig.IsByzantium(block.Number())

txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary || tooDeep || !isByzantium
return useSerialProcessor, tooDeep, depth
}

// insertChain is the internal implementation of InsertChain, which assumes that
// 1) chains are contiguous, and 2) The chain mutex is held.
//
@@ -1971,6 +1984,10 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
)

blockProcessedInParallel := false
var (
tooDeep, useSerialProcessor bool
depth int
)
// skip block process if we already have the state, receipts and logs from mining work
if !(receiptExist && logExist && stateExist) {
// Retrieve the parent block and it's state to execute on top
@@ -1982,9 +1999,9 @@
if bc.vmConfig.EnableParallelExec {
bc.parseTxDAG(block)
}
isByzantium := bc.chainConfig.IsByzantium(block.Number())

if bc.vmConfig.EnableParallelExec && bc.vmConfig.TxDAG != nil && bc.vmConfig.EnableTxParallelMerge && isByzantium {
useSerialProcessor, tooDeep, depth = bc.useSerialProcessor(block)
if !useSerialProcessor {
statedb, err = state.NewParallel(parent.Root, bc.stateCache, bc.snaps)
} else {
statedb, err = state.New(parent.Root, bc.stateCache, bc.snaps)
@@ -2018,8 +2035,6 @@

// Process block using the parent state as reference point
pstart = time.Now()
txDAGMissButNecessary := bc.vmConfig.TxDAG == nil && (bc.vmConfig.EnableParallelUnorderedMerge || bc.vmConfig.EnableTxParallelMerge)
useSerialProcessor := !bc.vmConfig.EnableParallelExec || txDAGMissButNecessary
if useSerialProcessor {
receipts, logs, usedGas, err = bc.serialProcessor.Process(block, statedb, bc.vmConfig)
blockProcessedInParallel = false
@@ -2143,7 +2158,9 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
"accountUpdates", common.PrettyDuration(timers.AccountUpdates),
"storageUpdates", common.PrettyDuration(timers.StorageUpdates),
"accountHashes", common.PrettyDuration(timers.AccountHashes),
"storageHashes", common.PrettyDuration(timers.StorageHashes))
"storageHashes", common.PrettyDuration(timers.StorageHashes),
"tooDeep", tooDeep, "depth", depth,
)

// Write the block to the chain and get the status.
var (
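Taken together, useSerialProcessor now gates the parallel path on four conditions: parallel execution is enabled, a TxDAG is present when a merge mode requires one, the DAG is not too deep, and the block is post-Byzantium. Here is a self-contained sketch of the combined predicate, with the TxDAG reduced to a precomputed depth and the fork check to a plain bool (both simplifications are mine, not the code's):

```go
// useSerial mirrors the decision in useSerialProcessor as a pure function;
// any single failing condition forces the serial processor.
func useSerial(enableParallel, dagPresent, mergeEnabled, isByzantium bool,
	depth, txCount int, maxRatio float64) bool {
	tooDeep := txCount > 0 && float64(depth)/float64(txCount) > maxRatio
	dagMissingButNeeded := !dagPresent && mergeEnabled
	return !enableParallel || dagMissingButNeeded || tooDeep || !isByzantium
}
```

For example, useSerial(true, true, true, true, 95, 100, 0.9) is true: a 100-tx block whose DAG yields 95 levels takes the serial path even with every parallel feature switched on.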
62 changes: 44 additions & 18 deletions core/parallel_state_scheduler.go
@@ -350,8 +350,6 @@ func (tl TxLevel) predictTxDAG(dag types.TxDAG) {

func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
var levels TxLevels = make(TxLevels, 0, 8)
var currLevel int = 0

var enlargeLevelsIfNeeded = func(currLevel int, levels *TxLevels) {
if len(*levels) <= currLevel {
for i := len(*levels); i <= currLevel; i++ {
@@ -367,22 +365,47 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
return TxLevels{all}
}

marked := make(map[int]int, len(all))
for _, tx := range all {
dep := dag.TxDep(tx.txIndex)
// build the levels from the DAG
marked, _ := BuildTxLevels(len(all), dag)
// put the transactions into the levels
for txIndex, tx := range all {
level := marked[txIndex]
enlargeLevelsIfNeeded(level, &levels)
levels[level] = append(levels[level], tx)
}
return levels
}

func BuildTxLevels(txCount int, dag types.TxDAG) (marked map[int]int, depth int) {
if dag == nil {
return make(map[int]int), 0
}
// marked records the level each transaction should be put in
marked = make(map[int]int, txCount)
var (
// currLevelHasTx marks if the current level has any transaction
currLevelHasTx bool
)

depth, currLevelHasTx = 0, false
for txIndex := 0; txIndex < txCount; txIndex++ {
dep := dag.TxDep(txIndex)
switch true {
case dep != nil && dep.CheckFlag(types.ExcludedTxFlag),
dep != nil && dep.CheckFlag(types.NonDependentRelFlag):
// excluded tx, occupies a whole level
// or depend-on-all tx, which occupies a whole level, too
levels = append(levels, TxLevel{tx})
marked[tx.txIndex], currLevel = len(levels)-1, len(levels)
if currLevelHasTx {
// shift to next level if there are transactions in the current level
depth++
}
marked[txIndex] = depth
// occupy the current level
depth, currLevelHasTx = depth+1, false

case dep == nil || len(dep.TxIndexes) == 0:
// dependent on none
enlargeLevelsIfNeeded(currLevel, &levels)
levels[currLevel] = append(levels[currLevel], tx)
marked[tx.txIndex] = currLevel
// dependent on none, just put it in the current level
marked[txIndex], currLevelHasTx = depth, true

case dep != nil && len(dep.TxIndexes) > 0:
// dependent on others
Expand All @@ -395,19 +418,22 @@ func NewTxLevels(all []*PEVMTxRequest, dag types.TxDAG) TxLevels {
}
if prevLevel < 0 {
// broken DAG, just ignore it
enlargeLevelsIfNeeded(currLevel, &levels)
levels[currLevel] = append(levels[currLevel], tx)
marked[tx.txIndex] = currLevel
marked[txIndex], currLevelHasTx = depth, true
continue
}
enlargeLevelsIfNeeded(prevLevel+1, &levels)
levels[prevLevel+1] = append(levels[prevLevel+1], tx)
// record the level of this tx
marked[tx.txIndex] = prevLevel + 1
marked[txIndex] = prevLevel + 1
if marked[txIndex] >= depth {
// this tx either starts a new deepest level or fills the
// still-empty frontier level
depth, currLevelHasTx = marked[txIndex], true
}

default:
panic("unexpected case")
}
}
return levels
// drop the last level if it ended up empty, so it is not counted
if !currLevelHasTx {
depth--
}
return marked, depth + 1
}
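To see what BuildTxLevels computes, here is a self-contained model written against the integer encoding used by the tests below (an empty slice means no dependencies, {-1} an excluded tx, {-2} a depend-on-all tx). It reproduces the marks and depth of those test cases but skips the real types.TxDAG plumbing, so treat it as a sketch:

```go
package main

import "fmt"

// modelLevels assigns each tx a level: flagged txs ({-1}/{-2}) occupy a whole
// level alone, dependency-free txs join the current frontier level, and
// dependent txs go one level below their deepest dependency.
func modelLevels(deps [][]int) (marks map[int]int, depth int) {
	marks = make(map[int]int, len(deps))
	frontierHasTx := false
	for i, d := range deps {
		switch {
		case len(d) == 1 && (d[0] == -1 || d[0] == -2):
			if frontierHasTx {
				depth++ // close the partially filled frontier level
			}
			marks[i] = depth
			depth, frontierHasTx = depth+1, false
		case len(d) == 0:
			marks[i], frontierHasTx = depth, true
		default:
			prev := -1
			for _, p := range d {
				if p >= 0 && p < i && marks[p] > prev {
					prev = marks[p]
				}
			}
			if prev < 0 {
				// broken DAG entry: treat it as dependency-free
				marks[i], frontierHasTx = depth, true
				continue
			}
			marks[i] = prev + 1
			if marks[i] >= depth {
				depth, frontierHasTx = marks[i], true
			}
		}
	}
	if !frontierHasTx {
		depth-- // the last level ended up empty
	}
	return marks, depth + 1
}

func main() {
	// mirrors case 1 of TestBuildLevels: marks {0:0 1:1 2:1 3:1 4:2 5:2 6:3}, depth 4
	marks, depth := modelLevels([][]int{{-1}, {}, {}, {}, {0, 1}, {2}, {-2}})
	fmt.Println(marks, depth)
}
```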
47 changes: 45 additions & 2 deletions core/parallel_state_scheduler_test.go
@@ -601,12 +601,44 @@ func TestNewTxLevels(t *testing.T) {
assertEqual(levels([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, nil, {-2}, {-2}}), [][]uint64{{1, 2, 3}, {4}, {5}}, t)

// case 9: loop-back txdag
assertEqual(levels([]uint64{1, 2, 3, 4}, [][]int{{1}, nil, {0}, nil}), [][]uint64{{1, 2, 4}, {3}}, t)
assertEqual(levels([]uint64{1, 2, 3, 4}, [][]int{{1}, nil, {0}, nil}), [][]uint64{{1, 2}, {3, 4}}, t)

// case 10: non-dependent txs + excluded tx + non-dependent txs
assertEqual(levels([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, {-2}, nil, nil}), [][]uint64{{1, 2}, {3}, {4, 5}}, t)
}

func TestBuildLevels(t *testing.T) {
var (
marks map[int]int
depth int
)
// case 1: 1 excluded tx + n dependency-free txs + n dependent txs + 1 depend-on-all tx
marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5, 6, 7}, [][]int{{-1}, nil, nil, nil, {0, 1}, {2}, {-2}})
assertEqualMarks(marks, map[int]int{0: 0, 1: 1, 2: 1, 3: 1, 4: 2, 5: 2, 6: 3}, t)
if depth != 4 {
t.Fatalf("expected depth: 4, got depth: %d", depth)
}
// case 2: non-dependent txs + excluded tx + non-dependent txs
marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5}, [][]int{nil, nil, {-2}, nil, nil})
assertEqualMarks(marks, map[int]int{0: 0, 1: 0, 2: 1, 3: 2, 4: 2}, t)
if depth != 3 {
t.Fatalf("expected depth: 3, got depth: %d", depth)
}
// case 3: (broken TxDAG) n dependent txs + 1 excluded tx + non-dependent txs
marks, depth = levelMarks([]uint64{1, 2, 3, 4, 5}, [][]int{{1}, {2}, {-1}, nil, nil})
assertEqualMarks(marks, map[int]int{0: 0, 1: 0, 2: 1, 3: 2, 4: 2}, t)
if depth != 3 {
t.Fatalf("expected depth: 3, got depth: %d", depth)
}
}

func TestMultiLevel(t *testing.T) {
// dependency-free txs interleaved with dependent txs collapse into two levels
assertEqual(levels([]uint64{1, 2, 3, 4, 5, 6, 7, 8}, [][]int{nil, nil, nil, {0}, nil, {1}, nil, {2}}), [][]uint64{{1, 2, 3, 5, 7}, {4, 6, 8}}, t)
assertEqual(levels([]uint64{1, 2, 3, 4, 5, 6, 7, 8}, [][]int{nil, nil, nil, {0}, nil, {1}, nil, {2}}), [][]uint64{{1, 2, 3}, {4, 5, 6, 7, 8}}, t)
}

func levelMarks(nonces []uint64, txdag [][]int) (map[int]int, int) {
return BuildTxLevels(len(nonces), int2txdag(txdag))
}

func levels(nonces []uint64, txdag [][]int) TxLevels {
@@ -650,6 +682,17 @@ func int2txdag(txdag [][]int) types.TxDAG {
return &dag
}

func assertEqualMarks(actual map[int]int, expected map[int]int, t *testing.T) {
if len(actual) != len(expected) {
t.Fatalf("expected %d marks, got %d marks", len(expected), len(actual))
}
for i, mark := range actual {
if expected[i] != mark {
t.Fatalf("expected mark[%d]: %d, got mark[%d]: %d", i, expected[i], i, mark)
}
}
}

func assertEqual(actual TxLevels, expected [][]uint64, t *testing.T) {
if len(actual) != len(expected) {
t.Fatalf("expected %d levels, got %d levels", len(expected), len(actual))
1 change: 1 addition & 0 deletions core/vm/interpreter.go
@@ -41,6 +41,7 @@ type Config struct {
TxDAG types.TxDAG
EnableParallelUnorderedMerge bool // Whether to enable unordered merge in parallel mode
EnableTxParallelMerge bool // Whether to enable parallel merge in parallel mode
TxDAGMaxDepthRatio float64
}

// ScopeContext contains the things that are per-call, such as stack and memory,
1 change: 1 addition & 0 deletions eth/backend.go
@@ -242,6 +242,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
EnableTxParallelMerge: config.ParallelTxParallelMerge,
ParallelTxNum: config.ParallelTxNum,
EnableOpcodeOptimizations: config.EnableOpcodeOptimizing,
TxDAGMaxDepthRatio: config.ParallelTxDAGMaxDepthRatio,
}
cacheConfig = &core.CacheConfig{
TrieCleanLimit: config.TrieCleanCache,
16 changes: 8 additions & 8 deletions eth/ethconfig/config.go
@@ -219,14 +219,14 @@ type Config struct {
RollupDisableTxPoolAdmission bool
RollupHaltOnIncompatibleProtocolVersion string

ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
ParallelTxParallelMerge bool

ParallelTxMode bool // Whether to execute transaction in parallel mode when do full sync
ParallelTxNum int // Number of slot for transaction execution
EnableOpcodeOptimizing bool
EnableParallelTxDAG bool
ParallelTxDAGFile string
ParallelTxUnorderedMerge bool // Whether to enable unordered merge in parallel mode
ParallelTxParallelMerge bool
ParallelTxDAGMaxDepthRatio float64
}

// CreateConsensusEngine creates a consensus engine for the given chain config.