Skip to content

Commit

Permalink
enhancement and add metric
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Apr 22, 2024
1 parent e4f3997 commit e1386e3
Show file tree
Hide file tree
Showing 14 changed files with 221 additions and 63 deletions.
35 changes: 24 additions & 11 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,18 @@ import (
)

type SyncerConfig struct {
BucketName string `json:"bucket_name"` // BucketName is the identifier of bucket on Greenfield that store blob
StartSlot uint64 `json:"start_slot"` // StartSlot is used to init the syncer which slot of beacon chain to synced from, only need to provide once.
CreateBundleSlotInterval uint64 `json:"create_bundle_slot_interval"` // CreateBundleSlotInterval defines the number of slot that syncer would assemble blobs and upload to bundle service
BundleServiceEndpoints []string `json:"bundle_service_endpoints"` // BundleServiceEndpoints is a list of bundle service address
BeaconRPCAddrs []string `json:"beacon_rpc_addrs"` // BeaconRPCAddrs is a list of beacon chain RPC address
ETHRPCAddrs []string `json:"eth_rpc_addrs"`
TempDir string `json:"temp_dir"` // TempDir is used to store blobs and created bundle
PrivateKey string `json:"private_key"` // PrivateKey is the key of bucket owner, request to bundle service will be signed by it as well.
DBConfig DBConfig `json:"db_config"`
MetricsConfig MetricsConfig `json:"metrics_config"`
LogConfig LogConfig `json:"log_config"`
BucketName string `json:"bucket_name"` // BucketName is the identifier of bucket on Greenfield that store blob
StartSlot uint64 `json:"start_slot"` // StartSlot is used to init the syncer which slot of beacon chain to synced from, only need to provide once.
CreateBundleSlotInterval uint64 `json:"create_bundle_slot_interval"` // CreateBundleSlotInterval defines the number of slot that syncer would assemble blobs and upload to bundle service
BundleServiceEndpoints []string `json:"bundle_service_endpoints"` // BundleServiceEndpoints is a list of bundle service address
BeaconRPCAddrs []string `json:"beacon_rpc_addrs"` // BeaconRPCAddrs is a list of beacon chain RPC address
ETHRPCAddrs []string `json:"eth_rpc_addrs"`
TempDir string `json:"temp_dir"` // TempDir is used to store blobs and created bundle
PrivateKey string `json:"private_key"` // PrivateKey is the key of bucket owner, request to bundle service will be signed by it as well.
BundleNotSealedReuploadThreshold int64 `json:"bundle_not_sealed_reupload_threshold"` // BundleNotSealedReuploadThreshold for re-uploading a bundle if it cant be sealed within the time threshold.
DBConfig DBConfig `json:"db_config"`
MetricsConfig MetricsConfig `json:"metrics_config"`
LogConfig LogConfig `json:"log_config"`
}

func (s *SyncerConfig) Validate() {
Expand Down Expand Up @@ -54,6 +55,10 @@ func (s *SyncerConfig) Validate() {
if s.CreateBundleSlotInterval > 30 {
panic("create_bundle_slot_interval is supposed less than 20")
}
if s.BundleNotSealedReuploadThreshold <= 60 {
panic("Bundle_not_sealed_reupload_threshold is supposed larger than 20")
}

s.DBConfig.Validate()
}

Expand All @@ -64,6 +69,13 @@ func (s *SyncerConfig) GetCreateBundleSlotInterval() uint64 {
return s.CreateBundleSlotInterval
}

func (s *SyncerConfig) GetReUploadBundleThresh() int64 {
if s.BundleNotSealedReuploadThreshold == 0 {
return DefaultReUploadBundleThresh
}
return s.BundleNotSealedReuploadThreshold
}

type ServerConfig struct {
BucketName string `json:"bucket_name"`
BundleServiceEndpoints []string `json:"bundle_service_endpoints"` // BundleServiceEndpoints is a list of bundle service address
Expand Down Expand Up @@ -118,6 +130,7 @@ func (cfg *DBConfig) Validate() {
type MetricsConfig struct {
Enable bool `json:"enable"`
HttpAddress string `json:"http_address"`
SPEndpoint string `json:"sp_endpoint"`
}

type LogConfig struct {
Expand Down
3 changes: 3 additions & 0 deletions config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,7 @@ const (
EnvVarPrivateKey = "PRIVATE_KEY"

DefaultCreateBundleSlotInterval = 20

DefaultReUploadBundleThresh = 3600 // in second

)
2 changes: 1 addition & 1 deletion db/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type Block struct {
BlobCount int

BundleName string `gorm:"NOT NULL"`
Status Status
Status Status `gorm:"index:idx_block_status"`
}

func (*Block) TableName() string {
Expand Down
9 changes: 5 additions & 4 deletions db/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ const (
)

type Bundle struct {
Id int64
Name string `gorm:"NOT NULL;uniqueIndex:idx_bundle_name;size:64"`
Status InnerBundleStatus `gorm:"NOT NULL"`
Calibrated bool
Id int64
Name string `gorm:"NOT NULL;uniqueIndex:idx_bundle_name;size:64"`
Status InnerBundleStatus `gorm:"NOT NULL"`
Calibrated bool
CreatedTime int64 `gorm:"NOT NULL"`
}

func (*Bundle) TableName() string {
Expand Down
5 changes: 2 additions & 3 deletions external/beacon_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type BeaconClient struct {
}

// NewBeaconClient returns a new beacon client.
func NewBeaconClient(host string, timeout time.Duration) (*BeaconClient, error) {
func NewBeaconClient(host string) (*BeaconClient, error) {
transport := &http.Transport{
DisableCompression: true,
MaxIdleConnsPerHost: 1000,
Expand All @@ -41,8 +41,7 @@ func NewBeaconClient(host string, timeout time.Duration) (*BeaconClient, error)
Timeout: 10 * time.Minute,
Transport: transport,
}
return &BeaconClient{hc: client,
timeout: timeout, host: host}, nil
return &BeaconClient{hc: client, host: host}, nil
}

func (c *BeaconClient) GetBlob(ctx context.Context, slotNumber uint64) ([]*structs.Sidecar, error) {
Expand Down
2 changes: 1 addition & 1 deletion external/eth_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func NewETHClient(rpcAddrs, beaconRPCAddrs string) *ETHClient {
if err != nil {
panic("new eth client error")
}
beaconClient, err := NewBeaconClient(beaconRPCAddrs, time.Second*3)
beaconClient, err := NewBeaconClient(beaconRPCAddrs)
if err != nil {
panic("new eth client error")
}
Expand Down
64 changes: 64 additions & 0 deletions external/sp_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package external

import (
"context"
"encoding/xml"
"net/http"
"strconv"
"strings"
"time"
)

type SPClient struct {
hc *http.Client
host string
address string

Check failure on line 15 in external/sp_client.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.20.x, ubuntu-latest)

field `address` is unused (unused)
}

func NewSPClient(host string) (*SPClient, error) {
transport := &http.Transport{
DisableCompression: true,
MaxIdleConnsPerHost: 1000,
MaxConnsPerHost: 1000,
IdleConnTimeout: 90 * time.Second,
}
client := &http.Client{
Timeout: 10 * time.Minute,
Transport: transport,
}
return &SPClient{hc: client, host: host}, nil
}

func (c *SPClient) GetBucketReadQuota(ctx context.Context, bucketName string) (QuotaInfo, error) {
year, month, _ := time.Now().Date()
var date string
if int(month) < 10 {
date = strconv.Itoa(year) + "-" + "0" + strconv.Itoa(int(month))
} else {
date = strconv.Itoa(year) + "-" + strconv.Itoa(int(month))
}
var urlStr string
parts := strings.Split(c.host, "//")
urlStr = parts[0] + "//" + bucketName + "." + parts[1] + "/"

req, err := http.NewRequestWithContext(ctx, http.MethodGet, urlStr, nil)
if err != nil {
return QuotaInfo{}, err
}
// set query parameters
q := req.URL.Query()
q.Add("read-quota", "")
q.Add("year-month", date)
req.URL.RawQuery = q.Encode()
resp, err := c.hc.Do(req)
if err != nil {
return QuotaInfo{}, err
}
defer resp.Body.Close()
QuotaResult := QuotaInfo{}
err = xml.NewDecoder(resp.Body).Decode(&QuotaResult)
if err != nil {
return QuotaInfo{}, err
}
return QuotaResult, nil
}
17 changes: 17 additions & 0 deletions external/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package external

import "encoding/xml"

// QuotaInfo indicates the quota info of bucket
type QuotaInfo struct {
XMLName xml.Name `xml:"GetReadQuotaResult"`
Version string `xml:"version,attr"`
BucketName string `xml:"BucketName"`
BucketID string `xml:"BucketID"` // BucketID defines the bucket read quota value on chain
ReadQuotaSize uint64 `xml:"ReadQuotaSize"` // ReadQuotaSize defines the bucket read quota value on chain
SPFreeReadQuotaSize uint64 `xml:"SPFreeReadQuotaSize"` // SPFreeReadQuotaSize defines the free quota of this month
ReadConsumedSize uint64 `xml:"ReadConsumedSize"` // ReadConsumedSize defines the consumed total read quota of this month
FreeConsumedSize uint64 `xml:"FreeConsumedSize"` // FreeConsumedSize defines the consumed free quota
MonthlyFreeQuota uint64 `xml:"MonthlyFreeQuota"` // MonthlyFreeQuota defines the consumed monthly free quota
MonthlyFreeConsumedSize uint64 `xml:"MonthlyQuotaConsumedSize"` // MonthlyFreeConsumedSize defines the consumed monthly free quota
}
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/prysmaticlabs/prysm/v5 v5.0.2
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
github.com/stretchr/testify v1.8.4
golang.org/x/net v0.21.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/driver/mysql v1.5.1
Expand All @@ -40,6 +41,7 @@ require (
github.com/consensys/bavard v0.1.13 // indirect
github.com/consensys/gnark-crypto v0.12.1 // indirect
github.com/crate-crypto/go-kzg-4844 v0.7.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.5.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
Expand Down Expand Up @@ -72,6 +74,7 @@ require (
github.com/oklog/ulid v1.3.1 // indirect
github.com/pelletier/go-toml/v2 v2.0.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
Expand Down
3 changes: 1 addition & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bits-and-blooms/bitset v1.11.0 h1:RMyy2mBBShArUAhfVRZJ2xyBO58KCBCtZFShw3umo6k=
github.com/bits-and-blooms/bitset v1.11.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bnb-chain/blob-syncer v0.0.1 h1:GImArcAzx23ntV2l8dOrPJX11YiAoMWXsRh3JLfHQKc=
github.com/bnb-chain/blob-syncer v0.0.1/go.mod h1:7JAEkO3ID/Kj56eyRV4SpiIf2vi1/czOYrlv3Z7XMBQ=
github.com/bnb-chain/greenfield-bundle-sdk v1.1.0 h1:0BWQsV+c32wHxEEpJY9igBSBg5N1Fm3KoSLC+Yef2n0=
github.com/bnb-chain/greenfield-bundle-sdk v1.1.0/go.mod h1:NCjQp0sniAbBR5yR5pYiXpYwYd1okSIBLj+31sTpmXA=
github.com/btcsuite/btcd v0.23.3 h1:4KH/JKy9WiCd+iUS9Mu0Zp7Dnj17TGdKrg9xc/FGj24=
Expand Down Expand Up @@ -415,6 +413,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/subosito/gotenv v1.4.2 h1:X1TuBLAMDFbaTAChgCBLu3DU3UPyELpnF2jjJ2cz/S8=
github.com/subosito/gotenv v1.4.2/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0=
github.com/supranational/blst v0.3.11 h1:LyU6FolezeWAhvQk0k6O/d49jqgO52MSDDfYgbeoEm4=
Expand Down
6 changes: 6 additions & 0 deletions metrics/metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ var (
Help: "Verified slot number, all blobs have been verified against the bundle service.",
})

BucketRemainingQuotaGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bucket_remaining_quota",
Help: "Remaining read quota of bucket in bytes",
})

MetricsItems = []prometheus.Collector{
SyncedSlotGauge,
VerifiedSlotGauge,
BucketRemainingQuotaGauge,
}
)

Expand Down
28 changes: 28 additions & 0 deletions syncer/monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package syncer

import (
"context"
"time"

"github.com/bnb-chain/blob-hub/logging"
"github.com/bnb-chain/blob-hub/metrics"
)

func (s *BlobSyncer) monitorQuota() {
if s.spClient == nil {
return
}
monitorTicket := time.NewTicker(MonitorQuotaInterval)
for range monitorTicket.C {
ctx, cancel := context.WithTimeout(context.Background(), RPCTimeout)
defer cancel()

Check failure on line 18 in syncer/monitor.go

View workflow job for this annotation

GitHub Actions / golangci-lint (1.20.x, ubuntu-latest)

SA9001: defers in this range loop won't run unless the channel gets closed (staticcheck)
quota, err := s.spClient.GetBucketReadQuota(ctx, s.getBucketName())
if err != nil {
logging.Logger.Errorf("failed to get bucket info from SP, err=%s", err.Error())
continue
}
remaining := quota.ReadQuotaSize + quota.SPFreeReadQuotaSize - quota.ReadConsumedSize - quota.FreeConsumedSize
metrics.BucketRemainingQuotaGauge.Set(float64(remaining))
logging.Logger.Infof("remaining quota in bytes is %d", remaining)
}
}
28 changes: 20 additions & 8 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ import (
const (
BundleStatusFinalized = 1
BundleStatusCreatedOnChain = 2
BundleStatusSealedOnChain = 3 // todo The post verification process should check if a bundle is indeed sealed onchain
BundleStatusSealedOnChain = 3

LoopSleepTime = 10 * time.Millisecond
PauseTime = 90 * time.Second
RPCTimeout = 10 * time.Second
LoopSleepTime = 10 * time.Millisecond
PauseTime = 90 * time.Second
RPCTimeout = 20 * time.Second
MonitorQuotaInterval = 5 * time.Minute
)

type curBundleDetail struct {
Expand All @@ -51,6 +52,7 @@ type BlobSyncer struct {
bundleClient *external.BundleClient
config *config.SyncerConfig
bundleDetail *curBundleDetail
spClient *external.SPClient
}

func NewBlobSyncer(
Expand All @@ -66,12 +68,20 @@ func NewBlobSyncer(
panic(err)
}
clients := external.NewETHClient(config.ETHRPCAddrs[0], config.BeaconRPCAddrs[0])
return &BlobSyncer{
bs := &BlobSyncer{
blobDao: blobDao,
ethClients: clients,
bundleClient: bundleClient,
config: config,
}
if config.MetricsConfig.Enable && len(config.MetricsConfig.SPEndpoint) > 0 {
spClient, err := external.NewSPClient(config.MetricsConfig.SPEndpoint)
if err != nil {
panic(err)
}
bs.spClient = spClient
}
return bs
}

func (s *BlobSyncer) StartLoop() {
Expand Down Expand Up @@ -101,6 +111,7 @@ func (s *BlobSyncer) StartLoop() {
}
}
}()
go s.monitorQuota()
}

func (s *BlobSyncer) sync() error {
Expand Down Expand Up @@ -238,8 +249,9 @@ func (s *BlobSyncer) createLocalBundleDir() error {
}
return s.blobDao.CreateBundle(
&db.Bundle{
Name: s.bundleDetail.name,
Status: db.Finalizing,
Name: s.bundleDetail.name,
Status: db.Finalizing,
CreatedTime: time.Now().Unix(),
})
}
func (s *BlobSyncer) finalizeBundle(bundleName, bundleDir, bundleFilePath string) error {
Expand Down Expand Up @@ -366,7 +378,7 @@ func (s *BlobSyncer) ToBlockAndBlobs(blockResp *structs.GetBlockV2Response, blob
defer cancel()
header, err := s.ethClients.BeaconClient.GetHeader(ctx, slot)
if err != nil {
logging.Logger.Errorf("failed to get header, err=%s", header.Data.Root, err.Error())
logging.Logger.Errorf("failed to get header, slot=%d, err=%s", slot, err.Error())
return nil, nil, err
}
rootBz, err := hexutil.Decode(header.Data.Root)
Expand Down
Loading

0 comments on commit e1386e3

Please sign in to comment.