Skip to content

Commit

Permalink
Add metric for the count of validators who signed a block
Browse files Browse the repository at this point in the history
  • Loading branch information
Solovyov1796 committed Dec 25, 2024
1 parent 78a74bf commit 1cd85c6
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 15 deletions.
25 changes: 25 additions & 0 deletions blockchain/cometbft.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,28 @@ func rpcGetUncommitTxCnt(url string) (int, error) {

return int(cnt), nil
}

// rpcGetBlockValidatorCnt queries the CometBFT RPC `validators` endpoint for the
// given block height and returns the number of entries in the validator list of
// the response.
//
// It returns -1 together with a non-nil error when the HTTP request fails, when
// the server answers with a status other than 200, or when the response body
// does not contain the expected `result.validators` array.
//
// NOTE(review): the count is the length of the list carried by this single
// response. If the endpoint paginates its output, validator sets larger than
// one page would be under-counted — confirm against the CometBFT RPC docs.
func rpcGetBlockValidatorCnt(url string, height uint64) (int, error) {
	client := resty.New()
	var result map[string]interface{}
	resp, err := client.R().SetResult(&result).Get(fmt.Sprintf("%s/validators?height=%d", url, height))
	if err != nil {
		logrus.WithError(err).WithField("url", url).WithField("height", height).Error("failed to get validator list")
		return -1, err
	}
	if resp.StatusCode() != 200 {
		// err is always nil on this path, so it is not attached to the log entry.
		logrus.WithField("url", url).WithField("height", height).WithField("status_code", resp.StatusCode()).Error("failed to get validator list")
		return -1, fmt.Errorf("failed to get validator list, status code: %d", resp.StatusCode())
	}

	if logrus.IsLevelEnabled(logrus.DebugLevel) {
		// The marshal error is ignored on purpose: the value only feeds a debug log line.
		jsonStr, _ := json.Marshal(result)
		logrus.WithFields(logrus.Fields{
			"response": string(jsonStr),
		}).Debug("response of cometbft rpc: validators?height=")
	}

	// Use checked type assertions: an unexpected payload (error envelope, missing
	// or null fields) must surface as an error instead of panicking the monitor.
	// Indexing a nil map is safe and simply yields a failed assertion.
	body, ok := result["result"].(map[string]interface{})
	if !ok {
		return -1, fmt.Errorf("unexpected validators response at height %d: missing or malformed \"result\"", height)
	}
	validators, ok := body["validators"].([]interface{})
	if !ok {
		return -1, fmt.Errorf("unexpected validators response at height %d: missing or malformed \"validators\"", height)
	}

	return len(validators), nil
}
64 changes: 56 additions & 8 deletions blockchain/mempool.go → blockchain/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,25 @@ import (
"github.com/sirupsen/logrus"
)

type Mempool struct {
type Consensus struct {
url string

cometbftRpcHealth health.TimedCounter
cometbftRpcError string // last rpc error message
}

func MustNewMempool(urlstr string) *Mempool {
func MustNewConsensus(urlstr string) *Consensus {
url, _ := url.Parse(urlstr)

metrics.GetOrRegisterHistogram(mempoolUncommitTxCntPattern).Update(0)
metrics.GetOrRegisterGauge(mempoolHighLoadPattern).Update(0)

return &Mempool{
return &Consensus{
url: url.String(),
}
}

func (m *Mempool) UpdateUncommitTxCnt(config health.TimedCounterConfig) int {
func (m *Consensus) UpdateUncommitTxCnt(config health.TimedCounterConfig) int {
var unconfirmedTxCnt int
executeRequest(
func() error {
Expand All @@ -45,7 +45,7 @@ func (m *Mempool) UpdateUncommitTxCnt(config health.TimedCounterConfig) int {
// report unhealthy
if unhealthy {
logrus.WithFields(logrus.Fields{
"node": "mempool",
"node": "consensus",
"elapsed": utils.PrettyElapsed(elapsed),
"error": err,
}).Error("Node cometbft RPC became unhealthy")
Expand All @@ -54,7 +54,7 @@ func (m *Mempool) UpdateUncommitTxCnt(config health.TimedCounterConfig) int {
// remind unhealthy
if unrecovered {
logrus.WithFields(logrus.Fields{
"node": "mempool",
"node": "consensus",
"elapsed": utils.PrettyElapsed(elapsed),
}).Error("Node cometbft RPC not recovered yet")
}
Expand All @@ -63,15 +63,63 @@ func (m *Mempool) UpdateUncommitTxCnt(config health.TimedCounterConfig) int {
m.cometbftRpcError = ""
if recovered {
logrus.WithFields(logrus.Fields{
"node": "mempool",
"node": "consensus",
"elapsed": utils.PrettyElapsed(elapsed),
}).Warn("Node cometbft RPC is healthy now")
}
},
nodeCometbftRpcLatencyPattern, nodeCometbftRpcUnhealthPattern, "mempool",
nodeCometbftRpcLatencyPattern, nodeCometbftRpcUnhealthPattern, "consensus",
&m.cometbftRpcHealth,
config,
)

return unconfirmedTxCnt
}

// GetBlockValidatorCnt asks the CometBFT RPC at m.url for the validator list of
// the block at height and returns its length, routing the call through
// executeRequest so that the shared cometbftRpcHealth counter is maintained.
//
// config is passed unchanged to executeRequest together with the latency and
// unhealth metric patterns and the "consensus" label.
//
// The returned value is whatever the request closure stored; since
// rpcGetBlockValidatorCnt returns -1 alongside any error, a failed request
// yields -1 (assuming executeRequest ran the closure — its implementation is
// not visible here).
func (m *Consensus) GetBlockValidatorCnt(config health.TimedCounterConfig, height uint64) int {
	var validatorCnt int
	executeRequest(
		// Request: perform the RPC and capture the count for the caller.
		func() error {
			var err error
			validatorCnt, err = rpcGetBlockValidatorCnt(m.url, height)
			return err
		},
		// Error callback: remember the last RPC error message and log the
		// state reported through the unhealthy/unrecovered flags.
		func(err error, unhealthy, unrecovered bool, elapsed time.Duration) {
			m.cometbftRpcError = err.Error()
			// report unhealthy
			if unhealthy {
				logrus.WithFields(logrus.Fields{
					"node":    "consensus",
					"elapsed": utils.PrettyElapsed(elapsed),
					"error":   err,
				}).Error("Node cometbft RPC became unhealthy")
			}

			// remind unhealthy
			if unrecovered {
				logrus.WithFields(logrus.Fields{
					"node":    "consensus",
					"elapsed": utils.PrettyElapsed(elapsed),
				}).Error("Node cometbft RPC not recovered yet")
			}
		},
		// Success callback: clear the stored error and log when the recovered
		// flag is set.
		func(recovered bool, elapsed time.Duration) {
			m.cometbftRpcError = ""
			if recovered {
				logrus.WithFields(logrus.Fields{
					"node":    "consensus",
					"elapsed": utils.PrettyElapsed(elapsed),
				}).Warn("Node cometbft RPC is healthy now")
			}
		},
		nodeCometbftRpcLatencyPattern, nodeCometbftRpcUnhealthPattern, "consensus",
		&m.cometbftRpcHealth,
		config,
	)

	return validatorCnt
}
2 changes: 2 additions & 0 deletions blockchain/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ const (
// count of tx in block
blockTxCountPattern = "monitor/blockchain/block/tx/count"

blockValidatorCountPattern = "monitor/blockchain/block/validator/count"

// chain
chainForkPattern = "monitor/blockchain/fork/%v"
// ethermint rpc
Expand Down
22 changes: 15 additions & 7 deletions blockchain/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,20 @@ func Monitor(config Config) {
validators = append(validators, MustNewValidator(url, name, address))
}

mempool := MustNewMempool(config.CometbftRPC)
consensus := MustNewConsensus(config.CometbftRPC)

blockTxCntRecord = make(map[uint64]int, config.BlockTxCntLimit)
blockFailedTxCntRecord = make(map[uint64]int, config.BlockTxCntLimit)

// Monitor once immediately
monitorOnce(&config, nodes, validators, mempool)
monitorOnce(&config, nodes, validators, consensus)

// Monitor node status periodically
ticker := time.NewTicker(config.Interval)
defer ticker.Stop()

for range ticker.C {
monitorOnce(&config, nodes, validators, mempool)
monitorOnce(&config, nodes, validators, consensus)
}
}

Expand All @@ -74,7 +74,7 @@ func createMetricsForChain() {
metrics.GetOrRegisterHistogram(blockTxCountPattern).Update(0)
}

func monitorOnce(config *Config, nodes []*Node, validators []*Validator, mempool *Mempool) {
func monitorOnce(config *Config, nodes []*Node, validators []*Validator, consensus *Consensus) {
blockSwitched := false
var blockTxInfo *BlockTxInfo
for _, v := range nodes {
Expand Down Expand Up @@ -114,12 +114,14 @@ func monitorOnce(config *Config, nodes []*Node, validators []*Validator, mempool
for _, v := range nodes {
v.CheckFork(recordor)
}

monitorBlockValidator(config, consensus, blockTxInfo.Height)
}

// update validator status
monitorValidator(config, validators)

monitorMempool(config, mempool)
monitorMempool(config, consensus)
}

func countFailedTx(statusMap map[string]bool) int {
Expand Down Expand Up @@ -208,8 +210,8 @@ func monitorValidator(config *Config, validators []*Validator) {
logrus.WithField("active", activeValidatorCount).WithField("jailed", jailedCnt).Debug("Validators status report")
}

func monitorMempool(config *Config, mempool *Mempool) {
unconfirmedTxCnt := mempool.UpdateUncommitTxCnt(config.MempoolReport.TimedCounterConfig)
func monitorMempool(config *Config, consensus *Consensus) {
unconfirmedTxCnt := consensus.UpdateUncommitTxCnt(config.MempoolReport.TimedCounterConfig)

metrics.GetOrRegisterHistogram(mempoolUncommitTxCntPattern).Update(int64(unconfirmedTxCnt))
percentage := float64(unconfirmedTxCnt*100) / float64(config.MempoolReport.PoolSize)
Expand All @@ -221,6 +223,12 @@ func monitorMempool(config *Config, mempool *Mempool) {
}
}

// monitorBlockValidator fetches the validator count for the block at
// blockHeight through consensus and records it in the
// blockValidatorCountPattern histogram.
//
// A negative count is the failure sentinel produced by the underlying RPC
// helper; it is logged but never written to the histogram, so failed lookups
// do not distort the metric.
func monitorBlockValidator(config *Config, consensus *Consensus, blockHeight uint64) {
	// NOTE(review): this reuses the mempool report's timed-counter config for
	// the validator RPC — confirm that sharing it is intended.
	blkValidatorCnt := consensus.GetBlockValidatorCnt(config.MempoolReport.TimedCounterConfig, blockHeight)
	logrus.Debugf("count of validator who signed block %d = %d", blockHeight, blkValidatorCnt)
	if blkValidatorCnt < 0 {
		// The RPC failed; the error has already been reported by the callee.
		return
	}
	metrics.GetOrRegisterHistogram(blockValidatorCountPattern).Update(int64(blkValidatorCnt))
}

func FindMaxBlockHeight(nodes []*Node) uint64 {
max := uint64(0)

Expand Down

0 comments on commit 1cd85c6

Please sign in to comment.