Skip to content

Commit ae8e89f

Browse files
authored
feat: avoid compressing block data in replicator (#99)
Avoid compressing block data by reusing the downloaded bytes.
1 parent 1be7f22 commit ae8e89f

File tree

6 files changed

+350
-116
lines changed

6 files changed

+350
-116
lines changed

internal/storage/blobstorage/gcs/blob_storage.go

+85-53
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/coinbase/chainstorage/internal/utils/fxparams"
2323
"github.com/coinbase/chainstorage/internal/utils/instrument"
2424
"github.com/coinbase/chainstorage/internal/utils/log"
25+
"github.com/coinbase/chainstorage/protos/coinbase/c3/common"
2526
api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage"
2627
)
2728

@@ -44,6 +45,7 @@ type (
4445
presignedUrlExpiration time.Duration
4546
blobStorageMetrics *blobStorageMetrics
4647
instrumentUpload instrument.InstrumentWithResult[string]
48+
instrumentUploadRaw instrument.InstrumentWithResult[string]
4749
instrumentDownload instrument.InstrumentWithResult[*api.Block]
4850
}
4951

@@ -101,81 +103,111 @@ func New(params BlobStorageParams) (internal.BlobStorage, error) {
101103
presignedUrlExpiration: params.Config.GCP.PresignedUrlExpiration,
102104
blobStorageMetrics: blobStorageMetrics,
103105
instrumentUpload: instrument.NewWithResult[string](metrics, "upload"),
106+
instrumentUploadRaw: instrument.NewWithResult[string](metrics, "upload_raw"),
104107
instrumentDownload: instrument.NewWithResult[*api.Block](metrics, "download"),
105108
}, nil
106109
}
107110

111+
func (s *blobStorageImpl) getObjectKey(blockchain common.Blockchain, sidechain api.SideChain, network common.Network, metadata *api.BlockMetadata, compression api.Compression) (string, error) {
112+
var key string
113+
var err error
114+
blockchainNetwork := fmt.Sprintf("%s/%s", blockchain, network)
115+
tagHeightHash := fmt.Sprintf("%d/%d/%s", metadata.Tag, metadata.Height, metadata.Hash)
116+
if s.config.Chain.Sidechain != api.SideChain_SIDECHAIN_NONE {
117+
key = fmt.Sprintf(
118+
"%s/%s/%s", blockchainNetwork, sidechain, tagHeightHash,
119+
)
120+
} else {
121+
key = fmt.Sprintf(
122+
"%s/%s", blockchainNetwork, tagHeightHash,
123+
)
124+
}
125+
key, err = storage_utils.GetObjectKey(key, compression)
126+
if err != nil {
127+
return "", xerrors.Errorf("failed to get object key: %w", err)
128+
}
129+
return key, nil
130+
}
131+
132+
func (s *blobStorageImpl) uploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) {
133+
key, err := s.getObjectKey(rawBlockData.Blockchain, rawBlockData.SideChain, rawBlockData.Network, rawBlockData.BlockMetadata, rawBlockData.BlockDataCompression)
134+
if err != nil {
135+
return "", err
136+
}
137+
138+
// #nosec G401
139+
h := md5.New()
140+
size, err := h.Write(rawBlockData.BlockData)
141+
if err != nil {
142+
return "", xerrors.Errorf("failed to compute checksum: %w", err)
143+
}
144+
145+
checksum := h.Sum(nil)
146+
147+
object := s.client.Bucket(s.bucket).Object(key)
148+
w := object.NewWriter(ctx)
149+
finalizer := finalizer.WithCloser(w)
150+
defer finalizer.Finalize()
151+
152+
_, err = w.Write(rawBlockData.BlockData)
153+
if err != nil {
154+
return "", xerrors.Errorf("failed to upload block data: %w", err)
155+
}
156+
err = finalizer.Close()
157+
if err != nil {
158+
return "", xerrors.Errorf("failed to upload block data: %w", err)
159+
}
160+
161+
attrs := w.Attrs()
162+
if !bytes.Equal(checksum, attrs.MD5) {
163+
return "", xerrors.Errorf("uploaded block md5 checksum %x is different from expected %x", attrs.MD5, checksum)
164+
}
165+
166+
// a workaround to use timer
167+
s.blobStorageMetrics.blobUploadedSize.Record(time.Duration(size) * time.Millisecond)
168+
169+
return key, nil
170+
}
171+
172+
func (s *blobStorageImpl) UploadRaw(ctx context.Context, rawBlockData *internal.RawBlockData) (string, error) {
173+
return s.instrumentUploadRaw.Instrument(ctx, func(ctx context.Context) (string, error) {
174+
defer s.logDuration("upload", time.Now())
175+
176+
// Skip the upload if the block itself is skipped.
177+
if rawBlockData.BlockMetadata.Skipped {
178+
return "", nil
179+
}
180+
181+
return s.uploadRaw(ctx, rawBlockData)
182+
})
183+
}
184+
108185
func (s *blobStorageImpl) Upload(ctx context.Context, block *api.Block, compression api.Compression) (string, error) {
109186
return s.instrumentUpload.Instrument(ctx, func(ctx context.Context) (string, error) {
110-
var key string
111187
defer s.logDuration("upload", time.Now())
112188

113189
// Skip the upload if the block itself is skipped.
114190
if block.Metadata.Skipped {
115191
return "", nil
116192
}
117-
118193
data, err := proto.Marshal(block)
119194
if err != nil {
120195
return "", xerrors.Errorf("failed to marshal block: %w", err)
121196
}
122197

123-
blockchainNetwork := fmt.Sprintf("%s/%s", block.Blockchain, block.Network)
124-
tagHeightHash := fmt.Sprintf("%d/%d/%s", block.Metadata.Tag, block.Metadata.Height, block.Metadata.Hash)
125-
if s.config.Chain.Sidechain != api.SideChain_SIDECHAIN_NONE {
126-
key = fmt.Sprintf(
127-
"%s/%s/%s", blockchainNetwork, block.SideChain, tagHeightHash,
128-
)
129-
} else {
130-
key = fmt.Sprintf(
131-
"%s/%s", blockchainNetwork, tagHeightHash,
132-
)
133-
}
134-
135198
data, err = storage_utils.Compress(data, compression)
136199
if err != nil {
137200
return "", xerrors.Errorf("failed to compress data with type %v: %w", compression.String(), err)
138201
}
139-
key, err = storage_utils.GetObjectKey(key, compression)
140-
if err != nil {
141-
return "", xerrors.Errorf("failed to get object key: %w", err)
142-
}
143-
144-
// #nosec G401
145-
h := md5.New()
146-
size, err := h.Write(data)
147-
if err != nil {
148-
return "", xerrors.Errorf("failed to compute checksum: %w", err)
149-
}
150-
151-
checksum := h.Sum(nil)
152-
153-
object := s.client.Bucket(s.bucket).Object(key)
154-
w := object.NewWriter(ctx)
155-
finalizer := finalizer.WithCloser(w)
156-
defer finalizer.Finalize()
157-
158-
_, err = w.Write(data)
159-
if err != nil {
160-
return "", xerrors.Errorf("failed to upload block data: %w", err)
161-
}
162-
err = finalizer.Close()
163-
if err != nil {
164-
return "", xerrors.Errorf("failed to upload block data: %w", err)
165-
}
166-
167-
attrs := w.Attrs()
168-
if err != nil {
169-
return "", xerrors.Errorf("failed to load attributes for uploaded block data: %w", err)
170-
}
171-
if !bytes.Equal(checksum, attrs.MD5) {
172-
return "", xerrors.Errorf("uploaded block md5 checksum %x is different from expected %x", attrs.MD5, checksum)
173-
}
174-
175-
// a workaround to use timer
176-
s.blobStorageMetrics.blobUploadedSize.Record(time.Duration(size) * time.Millisecond)
177202

178-
return key, nil
203+
return s.uploadRaw(ctx, &internal.RawBlockData{
204+
Blockchain: block.Blockchain,
205+
SideChain: block.SideChain,
206+
Network: block.Network,
207+
BlockMetadata: block.Metadata,
208+
BlockData: data,
209+
BlockDataCompression: compression,
210+
})
179211
})
180212
}
181213

internal/storage/blobstorage/internal/blobstorage.go

+11
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,23 @@ import (
88

99
"github.com/coinbase/chainstorage/internal/config"
1010
"github.com/coinbase/chainstorage/internal/utils/fxparams"
11+
"github.com/coinbase/chainstorage/protos/coinbase/c3/common"
1112
api "github.com/coinbase/chainstorage/protos/coinbase/chainstorage"
1213
)
1314

1415
type (
16+
RawBlockData struct {
17+
Blockchain common.Blockchain
18+
SideChain api.SideChain
19+
Network common.Network
20+
BlockMetadata *api.BlockMetadata
21+
BlockData []byte
22+
BlockDataCompression api.Compression
23+
}
24+
1525
BlobStorage interface {
1626
Upload(ctx context.Context, block *api.Block, compression api.Compression) (string, error)
27+
UploadRaw(ctx context.Context, rawBlockData *RawBlockData) (string, error)
1728
Download(ctx context.Context, metadata *api.BlockMetadata) (*api.Block, error)
1829
PreSign(ctx context.Context, objectKey string) (string, error)
1930
}

internal/storage/blobstorage/mocks/mocks.go

+16
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

internal/storage/blobstorage/module.go

+1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ type (
1212
BlobStorage = internal.BlobStorage
1313
BlobStorageFactory = internal.BlobStorageFactory
1414
BlobStorageFactoryParams = internal.BlobStorageFactoryParams
15+
RawBlockData = internal.RawBlockData
1516
)
1617

1718
var Module = fx.Options(

0 commit comments

Comments
 (0)