diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 26d6a494..55cc27d5 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -196,6 +196,10 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er return "", err } + if err = n.reUploadFollowingParts(ctx, *p, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil { + return "", fmt.Errorf("reuploading parts: %w", err) + } + return objInfo.HashSum, nil } @@ -385,6 +389,49 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf return objInfo, nil } +func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadPartParams, partID int, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { + parts, err := n.treeService.GetPartsAfter(ctx, bktInfo, multipartInfo.ID, partID) + if err != nil { + // nothing to re-upload. + if errors.Is(err, ErrPartListIsEmpty) { + return nil + } + + return fmt.Errorf("get parts after: %w", err) + } + + for _, part := range parts { + uploadParams.PartNumber = part.Number + + if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil { + return fmt.Errorf("reupload number=%d: %w", part.Number, err) + } + } + + return nil +} + +func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, id oid.ID, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { + obj, err := n.objectGet(ctx, bktInfo, id) + if err != nil { + return fmt.Errorf("get id=%s: %w", id.String(), err) + } + + uploadParams.Size = int64(obj.PayloadSize()) + uploadParams.Reader = bytes.NewReader(obj.Payload()) + + if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil { + return fmt.Errorf("upload id=%s: %w", id.String(), err) + } + + // remove old object, we just re-uploaded a new one. + if err = n.objectDelete(ctx, bktInfo, id); err != nil { + return fmt.Errorf("delete old id=%s: %w", id.String(), err) + } + + return nil +} + func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID) if err != nil { @@ -427,7 +474,13 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. Reader: pr, } - return n.uploadPart(ctx, multipartInfo, params) + objInfo, err := n.uploadPart(ctx, multipartInfo, params) + + if err = n.reUploadFollowingParts(ctx, *params, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil { + return nil, fmt.Errorf("reuploading parts: %w", err) + } + + return objInfo, nil } func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 3700a39e..59ab5fdd 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -393,6 +393,60 @@ func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketI return parts[len(parts)-1], nil } +func (t *TreeServiceMock) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) { + parts, err := t.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, err + } + + mp := make(map[int]*data.PartInfo) + for _, partInfo := range parts { + if partInfo.Number <= partID { + continue + } + + mapped, ok := mp[partInfo.Number] + if !ok { + mp[partInfo.Number] = partInfo + continue + } + + if mapped.ServerCreated.After(partInfo.ServerCreated) { + continue + } + + mp[partInfo.Number] = partInfo + } + + if len(mp) == 0 { + return nil, ErrPartListIsEmpty + } + + result := make([]*data.PartInfo, 0, len(mp)) + for _, p := range mp { + result = append(result, p) + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(result, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return result, nil +} + func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index a39ac8f3..c3e77294 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -75,6 +75,7 @@ type TreeService interface { AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) + GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) // Compound methods for optimizations diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 14eac6c6..76fa7aba 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -1025,6 +1025,75 @@ func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, return parts[len(parts)-1], nil } +// GetPartsAfter returns parts uploaded after partID. These parts are sorted and filtered by creation time. +// It means, if any upload had a re-uploaded data (few part versions), the list contains only the latest version of the upload. +func (c *TreeClient) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) { + parts, err := c.getSubTree(ctx, bktInfo, systemTree, multipartNodeID, 2) + if err != nil { + return nil, err + } + + if len(parts) == 0 { + return nil, layer.ErrPartListIsEmpty + } + + mp := make(map[int]*data.PartInfo) + for _, part := range parts { + if part.GetNodeId() == multipartNodeID { + continue + } + + partInfo, err := newPartInfo(part) + if err != nil { + continue + } + + if partInfo.Number <= partID { + continue + } + + mapped, ok := mp[partInfo.Number] + if !ok { + mp[partInfo.Number] = partInfo + continue + } + + if mapped.ServerCreated.After(partInfo.ServerCreated) { + continue + } + + mp[partInfo.Number] = partInfo + } + + if len(mp) == 0 { + return nil, layer.ErrPartListIsEmpty + } + + result := make([]*data.PartInfo, 0, len(mp)) + for _, p := range mp { + result = append(result, p) + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(result, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return result, nil +} + func (c *TreeClient) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { return c.removeNode(ctx, bktInfo, systemTree, multipartNodeID) }