Skip to content

Commit

Permalink
enable fast integrity check on bundle
Browse files Browse the repository at this point in the history
  • Loading branch information
alexgao001 committed Jul 3, 2024
1 parent 873152e commit 0b40d8b
Show file tree
Hide file tree
Showing 9 changed files with 441 additions and 31 deletions.
2 changes: 2 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ type SyncerConfig struct {
BundleServiceEndpoints []string `json:"bundle_service_endpoints"` // BundleServiceEndpoints is a list of bundle service address
BeaconRPCAddrs []string `json:"beacon_rpc_addrs"` // BeaconRPCAddrs is a list of beacon chain RPC address
RPCAddrs []string `json:"rpc_addrs"` // RPCAddrs ETH or BSC RPC addr
GnfdRpcAddr string `json:"gnfd_rpc_addr"` // GnfdRpcAddr is the Greenfield RPC address
TempDir string `json:"temp_dir"` // TempDir is used to store blobs and created bundle
PrivateKey string `json:"private_key"` // PrivateKey is the key of bucket owner, request to bundle service will be signed by it as well.
BundleNotSealedReuploadThreshold int64 `json:"bundle_not_sealed_reupload_threshold"` // BundleNotSealedReuploadThreshold for re-uploading a bundle if it cant be sealed within the time threshold.
EnableIndivBlobVerification bool `json:"enable_indiv_blob_verification"` // EnableIndivBlobVerification is used to enable individual blob verification, otherwise only bundle level verification is enabled.
DBConfig DBConfig `json:"db_config"`
MetricsConfig MetricsConfig `json:"metrics_config"`
LogConfig LogConfig `json:"log_config"`
Expand Down
5 changes: 3 additions & 2 deletions external/cmn/bundle_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c *BundleClient) DeleteBundle(bundleName, bucketName string) error {

func (c *BundleClient) UploadAndFinalizeBundle(bundleName, bucketName, bundleDir, bundlePath string) error {

bundleObject, _, err := bundleDirectory(bundleDir)
bundleObject, _, err := BundleObjectFromDirectory(bundleDir)
if err != nil {
return err
}
Expand Down Expand Up @@ -364,7 +364,7 @@ func (c *BundleClient) signMessage(message []byte) ([]byte, error) {
return signature, err
}

func bundleDirectory(dir string) (io.ReadSeekCloser, int64, error) {
func BundleObjectFromDirectory(dir string) (io.ReadSeekCloser, int64, error) {
b, err := bundlesdk.NewBundle()
if err != nil {
return nil, 0, err
Expand All @@ -376,6 +376,7 @@ func bundleDirectory(dir string) (io.ReadSeekCloser, int64, error) {
}
return b.FinalizeBundle()
}

func visit(root string, b *bundlesdk.Bundle) filepath.WalkFunc {
return func(path string, f os.FileInfo, err error) error {
if err != nil {
Expand Down
84 changes: 84 additions & 0 deletions external/cmn/chain_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package cmn

import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)

const getObjectPath = "/greenfield/storage/head_object/%s/%s" // bucketName, objectName

type ChainClient struct {
hc *http.Client
host string
}

func NewChainClient(host string) (*ChainClient, error) {
transport := &http.Transport{
DisableCompression: true,
MaxIdleConnsPerHost: 1000,
MaxConnsPerHost: 1000,
IdleConnTimeout: 90 * time.Second,
}
client := &http.Client{
Timeout: 10 * time.Minute,
Transport: transport,
}
return &ChainClient{hc: client, host: host}, nil
}

func (c *ChainClient) GetObjectMeta(ctx context.Context, bucketName, objectName string) (*ObjectInfo, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.host+fmt.Sprintf(getObjectPath, bucketName, objectName), nil)
if err != nil {
return nil, err
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("received non-OK response status: %s", resp.Status)
}

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

getObjectResp := GetObjectInfoResponse{}
err = json.Unmarshal(body, &getObjectResp)
if err != nil {
return nil, err
}
return &getObjectResp.ObjectInfo, nil
}

func (c *ChainClient) GetParams(ctx context.Context) (*VersionedParams, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.host+"/greenfield/storage/params", nil)
if err != nil {
return nil, err
}
resp, err := c.hc.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

getParamsResp := GetParamsResponse{}
err = json.Unmarshal(body, &getParamsResp)
if err != nil {
return nil, err
}

return &getParamsResp.Params.VersionedParams, nil
}
22 changes: 22 additions & 0 deletions external/cmn/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,25 @@ type QuotaInfo struct {
MonthlyFreeQuota uint64 `xml:"MonthlyFreeQuota"` // MonthlyFreeQuota defines the consumed monthly free quota
MonthlyFreeConsumedSize uint64 `xml:"MonthlyQuotaConsumedSize"` // MonthlyFreeConsumedSize defines the consumed monthly free quota
}

type ObjectInfo struct {
Checksums []string `json:"checksums"`
}

type GetObjectInfoResponse struct {
ObjectInfo ObjectInfo `json:"object_info"`
}

type VersionedParams struct {
MaxSegmentSize string `json:"max_segment_size"` // MaxSegmentSize defines the max segment size of the Object
RedundantDataChunkNum int `json:"redundant_data_chunk_num"` // RedundantDataChunkNum defines the redundant data chunk num of the Object
RedundantParityChunkNum int `json:"redundant_parity_chunk_num"` // RedundantParityChunkNum defines the redundant parity chunk num of the Object
}

type Params struct {
VersionedParams VersionedParams `json:"versioned_params"`
}

type GetParamsResponse struct {
Params Params `json:"params"`
}
9 changes: 5 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,16 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d
github.com/jessevdk/go-flags v1.5.0
github.com/klauspost/reedsolomon v1.11.8
github.com/node-real/greenfield-bundle-service v0.0.1-beta
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7
github.com/prometheus/client_golang v1.17.0
github.com/prysmaticlabs/prysm/v5 v5.0.2
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.15.0
golang.org/x/net v0.21.0
google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a
google.golang.org/grpc v1.59.0
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gorm.io/driver/mysql v1.5.1
Expand Down Expand Up @@ -104,8 +105,8 @@ require (
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.16.0 // indirect
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
22 changes: 12 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,8 @@ github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69
github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg=
github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo=
github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ=
github.com/golang/glog v1.1.0 h1:/d3pCKDPWNnvIWe0vVUpNP32qc8U3PDVxySP/y360qE=
github.com/golang/glog v1.1.0/go.mod h1:pfYeQZ3JWZoXTV5sFc986z3HTpwQs9At6P4ImfuP3NQ=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down Expand Up @@ -329,6 +329,8 @@ github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
github.com/klauspost/cpuid/v2 v2.2.5/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws=
github.com/klauspost/reedsolomon v1.11.8 h1:s8RpUW5TK4hjr+djiOpbZJB4ksx+TdYbRH7vHQpwPOY=
github.com/klauspost/reedsolomon v1.11.8/go.mod h1:4bXRN+cVzMdml6ti7qLouuYi32KHJ5MGv0Qd8a47h6A=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
Expand Down Expand Up @@ -799,12 +801,12 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b h1:+YaDE2r2OG8t/z5qmsh7Y+XXwCbvadxxZ0YY6mTdrVA=
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:CgAqfJo+Xmu0GwA0411Ht3OU3OntXwsGmrmjI8ioGXI=
google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a h1:myvhA4is3vrit1a6NZCWBIwN0kNEnX21DJOJX/NvIfI=
google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a/go.mod h1:SUBoKXbI1Efip18FClrQVGjWcyd0QZd8KkvdP34t7ww=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo=
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc=
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98 h1:Z0hjGZePRE0ZBWotvtrwxFNrNE9CUAGtplaDK5NNI/g=
google.golang.org/genproto v0.0.0-20230711160842-782d3b101e98/go.mod h1:S7mY02OqCJTD0E1OiQy1F72PWFB4bZJ87cAtLPYgDR0=
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 h1:FmF5cCW94Ij59cfpoLiwTgodWmm60eEV0CjlsVg2fuw=
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98/go.mod h1:rsr7RhLuwsDKL7RmgDDCUc6yaGr1iqceVb5Wv6f6YvQ=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 h1:bVf09lpb+OJbByTj913DRJioFFAjf/ZGxEz7MajTp2U=
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98/go.mod h1:TUfxEVdsvPg18p6AslUXFoLdpED4oBnGwyqk3dV1XzM=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand All @@ -822,8 +824,8 @@ google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTp
google.golang.org/grpc v1.33.2/go.mod h1:JMHMWHQWaTccqQQlmk3MJZS+GWXOdAesneDmEnv2fbc=
google.golang.org/grpc v1.34.0/go.mod h1:WotjhfgOW/POjDeRt8vscBtXq+2VjORFy659qA51WJ8=
google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.59.0 h1:Z5Iec2pjwb+LEOqzpB2MR12/eKFhDPhuqW91O+4bwUk=
google.golang.org/grpc v1.59.0/go.mod h1:aUPDwccQo6OTjy7Hct4AfBPD1GptF4fyUjIkQ9YtF98=
google.golang.org/grpc v1.58.3 h1:BjnpXut1btbtgN/6sp+brB2Kbm2LjNXnidYujAVbSoQ=
google.golang.org/grpc v1.58.3/go.mod h1:tgX3ZQDlNJGU96V6yHh1T/JeoBQ2TXdr43YbYSsCJk0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
Expand Down
23 changes: 22 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@ type BlobSyncer struct {
blobDao db.BlobDao
client external.IClient
bundleClient *cmn.BundleClient
chainClient *cmn.ChainClient
config *config.SyncerConfig
bundleDetail *curBundleDetail
spClient *cmn.SPClient
params *cmn.VersionedParams
}

func NewBlobSyncer(
Expand All @@ -71,9 +73,15 @@ func NewBlobSyncer(
if err != nil {
panic(err)
}
chainClient, err := cmn.NewChainClient(cfg.GnfdRpcAddr)
if err != nil {
panic(err)
}

bs := &BlobSyncer{
blobDao: blobDao,
bundleClient: bundleClient,
chainClient: chainClient,
config: cfg,
}
bs.client = external.NewClient(cfg)
Expand Down Expand Up @@ -468,7 +476,7 @@ func (s *BlobSyncer) toBlockAndBlobs(blockResp *structs.GetBlockV2Response, side

rootBz, err := hexutil.Decode(header.Data.Root)
if err != nil {
logging.Logger.Errorf("failed to decode header.Data.Root=%s, err=%s", header.Data.Root, err.Error())
logging.Logger.Errorf("failed to decode header.Data.GetObjectInfoResponse=%s, err=%s", header.Data.Root, err.Error())
return nil, nil, err
}
sigBz, err := hexutil.Decode(header.Data.Header.Signature)
Expand Down Expand Up @@ -525,3 +533,16 @@ func (s *BlobSyncer) BSCChain() bool {
func (s *BlobSyncer) ETHChain() bool {
return s.config.Chain == config.ETH
}

func (s *BlobSyncer) GetParams() (*cmn.VersionedParams, error) {
if s.params == nil {
params, err := s.chainClient.GetParams(context.Background())
if err != nil {
logging.Logger.Errorf("failed to get params, err=%s", err.Error())
return nil, err
}
s.params = params
}
return s.params, nil

}
Loading

0 comments on commit 0b40d8b

Please sign in to comment.