Skip to content

Commit

Permalink
layer: Re-upload all parts after current one
Browse files Browse the repository at this point in the history
It is required, because we have to re-evaluate object payload hash.

Signed-off-by: Evgenii Baidakov <evgenii@nspcc.io>
  • Loading branch information
smallhive committed Mar 19, 2024
1 parent 2548e50 commit 127dd3e
Show file tree
Hide file tree
Showing 4 changed files with 181 additions and 1 deletion.
58 changes: 57 additions & 1 deletion api/layer/multipart_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er
return "", err
}

if err = n.reUploadFollowingParts(ctx, *p, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil {
return "", fmt.Errorf("reuploading parts: %w", err)
}

Check warning on line 201 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L199-L201

Added lines #L199 - L201 were not covered by tests

return objInfo.HashSum, nil
}

Expand Down Expand Up @@ -385,6 +389,49 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf
return objInfo, nil
}

func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadPartParams, partID int, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
parts, err := n.treeService.GetPartsAfter(ctx, bktInfo, multipartInfo.ID, partID)
if err != nil {
// nothing to re-upload.
if errors.Is(err, ErrPartListIsEmpty) {
return nil
}

Check warning on line 398 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L392-L398

Added lines #L392 - L398 were not covered by tests

return fmt.Errorf("get parts after: %w", err)

Check warning on line 400 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L400

Added line #L400 was not covered by tests
}

for _, part := range parts {
uploadParams.PartNumber = part.Number

if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil {
return fmt.Errorf("reupload number=%d: %w", part.Number, err)
}

Check warning on line 408 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L403-L408

Added lines #L403 - L408 were not covered by tests
}

return nil

Check warning on line 411 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L411

Added line #L411 was not covered by tests
}

func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, id oid.ID, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error {
obj, err := n.objectGet(ctx, bktInfo, id)
if err != nil {
return fmt.Errorf("get id=%s: %w", id.String(), err)
}

Check warning on line 418 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L414-L418

Added lines #L414 - L418 were not covered by tests

uploadParams.Size = int64(obj.PayloadSize())
uploadParams.Reader = bytes.NewReader(obj.Payload())

if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil {
return fmt.Errorf("upload id=%s: %w", id.String(), err)
}

Check warning on line 425 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L420-L425

Added lines #L420 - L425 were not covered by tests

// remove old object, we just re-uploaded a new one.
if err = n.objectDelete(ctx, bktInfo, id); err != nil {
return fmt.Errorf("delete old id=%s: %w", id.String(), err)
}

Check warning on line 430 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L428-L430

Added lines #L428 - L430 were not covered by tests

return nil

Check warning on line 432 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L432

Added line #L432 was not covered by tests
}

func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) {
multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID)
if err != nil {
Expand Down Expand Up @@ -427,7 +474,16 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.
Reader: pr,
}

return n.uploadPart(ctx, multipartInfo, params)
objInfo, err := n.uploadPart(ctx, multipartInfo, params)

Check warning on line 477 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L477

Added line #L477 was not covered by tests
if err != nil {
return nil, fmt.Errorf("upload part: %w", err)

Check warning on line 479 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L479

Added line #L479 was not covered by tests
}

if err = n.reUploadFollowingParts(ctx, *params, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil {
return nil, fmt.Errorf("reuploading parts: %w", err)
}

Check warning on line 484 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L482-L484

Added lines #L482 - L484 were not covered by tests

return objInfo, nil

Check warning on line 486 in api/layer/multipart_upload.go

View check run for this annotation

Codecov / codecov/patch

api/layer/multipart_upload.go#L486

Added line #L486 was not covered by tests
}

func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) {
Expand Down
54 changes: 54 additions & 0 deletions api/layer/tree_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,60 @@ func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketI
return parts[len(parts)-1], nil

Check warning on line 393 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L393

Added line #L393 was not covered by tests
}

func (t *TreeServiceMock) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) {
parts, err := t.GetParts(ctx, bktInfo, multipartNodeID)
if err != nil {
return nil, err
}

Check warning on line 400 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L396-L400

Added lines #L396 - L400 were not covered by tests

mp := make(map[int]*data.PartInfo)
for _, partInfo := range parts {
if partInfo.Number <= partID {
continue

Check warning on line 405 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L402-L405

Added lines #L402 - L405 were not covered by tests
}

mapped, ok := mp[partInfo.Number]
if !ok {
mp[partInfo.Number] = partInfo
continue

Check warning on line 411 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L408-L411

Added lines #L408 - L411 were not covered by tests
}

if mapped.ServerCreated.After(partInfo.ServerCreated) {
continue

Check warning on line 415 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L414-L415

Added lines #L414 - L415 were not covered by tests
}

mp[partInfo.Number] = partInfo

Check warning on line 418 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L418

Added line #L418 was not covered by tests
}

if len(mp) == 0 {
return nil, ErrPartListIsEmpty
}

Check warning on line 423 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L421-L423

Added lines #L421 - L423 were not covered by tests

result := make([]*data.PartInfo, 0, len(mp))
for _, p := range mp {
result = append(result, p)
}

Check warning on line 428 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L425-L428

Added lines #L425 - L428 were not covered by tests

// Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number.
slices.SortFunc(result, func(a, b *data.PartInfo) int {
if a.Number < b.Number {
return -1
}

Check warning on line 434 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L431-L434

Added lines #L431 - L434 were not covered by tests

if a.ServerCreated.Before(b.ServerCreated) {
return -1
}

Check warning on line 438 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L436-L438

Added lines #L436 - L438 were not covered by tests

if a.ServerCreated.Equal(b.ServerCreated) {
return 0
}

Check warning on line 442 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L440-L442

Added lines #L440 - L442 were not covered by tests

return 1

Check warning on line 444 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L444

Added line #L444 was not covered by tests
})

return result, nil

Check warning on line 447 in api/layer/tree_mock.go

View check run for this annotation

Codecov / codecov/patch

api/layer/tree_mock.go#L447

Added line #L447 was not covered by tests
}

func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()]

Expand Down
1 change: 1 addition & 0 deletions api/layer/tree_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type TreeService interface {
// Return errors:
// - [ErrPartListIsEmpty] if there is no parts in the upload id.
GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error)
GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error)

// Compound methods for optimizations

Expand Down
69 changes: 69 additions & 0 deletions internal/neofs/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -1025,6 +1025,75 @@ func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo,
return parts[len(parts)-1], nil

Check warning on line 1025 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1025

Added line #L1025 was not covered by tests
}

// GetPartsAfter returns parts uploaded after partID. These parts are sorted and filtered by creation time.
// It means, if any upload had a re-uploaded data (few part versions), the list contains only the latest version of the upload.
func (c *TreeClient) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) {
parts, err := c.getSubTree(ctx, bktInfo, systemTree, multipartNodeID, 2)
if err != nil {
return nil, err
}

Check warning on line 1034 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1030-L1034

Added lines #L1030 - L1034 were not covered by tests

if len(parts) == 0 {
return nil, layer.ErrPartListIsEmpty
}

Check warning on line 1038 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1036-L1038

Added lines #L1036 - L1038 were not covered by tests

mp := make(map[int]*data.PartInfo)
for _, part := range parts {
if part.GetNodeId() == multipartNodeID {
continue

Check warning on line 1043 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1040-L1043

Added lines #L1040 - L1043 were not covered by tests
}

partInfo, err := newPartInfo(part)
if err != nil {
continue

Check warning on line 1048 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1046-L1048

Added lines #L1046 - L1048 were not covered by tests
}

if partInfo.Number <= partID {
continue

Check warning on line 1052 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1051-L1052

Added lines #L1051 - L1052 were not covered by tests
}

mapped, ok := mp[partInfo.Number]
if !ok {
mp[partInfo.Number] = partInfo
continue

Check warning on line 1058 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1055-L1058

Added lines #L1055 - L1058 were not covered by tests
}

if mapped.ServerCreated.After(partInfo.ServerCreated) {
continue

Check warning on line 1062 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1061-L1062

Added lines #L1061 - L1062 were not covered by tests
}

mp[partInfo.Number] = partInfo

Check warning on line 1065 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1065

Added line #L1065 was not covered by tests
}

if len(mp) == 0 {
return nil, layer.ErrPartListIsEmpty
}

Check warning on line 1070 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1068-L1070

Added lines #L1068 - L1070 were not covered by tests

result := make([]*data.PartInfo, 0, len(mp))
for _, p := range mp {
result = append(result, p)
}

Check warning on line 1075 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1072-L1075

Added lines #L1072 - L1075 were not covered by tests

// Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number.
slices.SortFunc(result, func(a, b *data.PartInfo) int {
if a.Number < b.Number {
return -1
}

Check warning on line 1081 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1078-L1081

Added lines #L1078 - L1081 were not covered by tests

if a.ServerCreated.Before(b.ServerCreated) {
return -1
}

Check warning on line 1085 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1083-L1085

Added lines #L1083 - L1085 were not covered by tests

if a.ServerCreated.Equal(b.ServerCreated) {
return 0
}

Check warning on line 1089 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1087-L1089

Added lines #L1087 - L1089 were not covered by tests

return 1

Check warning on line 1091 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1091

Added line #L1091 was not covered by tests
})

return result, nil

Check warning on line 1094 in internal/neofs/tree.go

View check run for this annotation

Codecov / codecov/patch

internal/neofs/tree.go#L1094

Added line #L1094 was not covered by tests
}

func (c *TreeClient) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error {
return c.removeNode(ctx, bktInfo, systemTree, multipartNodeID)
}
Expand Down

0 comments on commit 127dd3e

Please sign in to comment.