diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 6fda1a50..8cea1e32 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -28,6 +28,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/nspcc-dev/tzhash/tz" "go.uber.org/zap" + "golang.org/x/sync/errgroup" ) const ( @@ -285,6 +286,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf if err != nil { return nil, fmt.Errorf("getLastPart: %w", err) } + reqInfo := api.GetReqInfo(ctx) // The previous part is not uploaded yet. if lastPart == nil { @@ -304,6 +306,12 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf return nil, err } + n.log.Debug("upload part as slot", + zap.String("reqId", reqInfo.RequestID), + zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID), + zap.String("multipart upload", p.Info.UploadID), + zap.Int("part number", p.PartNumber), zap.String("object", p.Info.Key), zap.Stringer("oid", objInfo.ID), zap.String("ETag", objInfo.HashSum), zap.Int64("decSize", decSize)) + return objInfo, nil } @@ -372,13 +380,6 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf n.buffers.Put(chunk) } - reqInfo := api.GetReqInfo(ctx) - n.log.Debug("upload part", - zap.String("reqId", reqInfo.RequestID), - zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID), - zap.String("multipart upload", p.Info.UploadID), - zap.Int("part number", p.PartNumber), zap.String("object", p.Info.Key), zap.Stringer("oid", id)) - partInfo := &data.PartInfo{ Key: p.Info.Key, UploadID: p.Info.UploadID, @@ -390,6 +391,12 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf Elements: elements, } + n.log.Debug("upload part", + zap.String("reqId", reqInfo.RequestID), + zap.String("bucket", bktInfo.Name), zap.Stringer("cid", bktInfo.CID), + zap.String("multipart upload", p.Info.UploadID), + zap.Int("part number", 
p.PartNumber), zap.String("object", p.Info.Key), zap.Stringer("oid", id), zap.String("ETag", partInfo.ETag), zap.Int64("decSize", decSize)) + // encoding hash.Hash state to save it in tree service. // the required interface is guaranteed according to the docs, so just cast without checks. binaryMarshaler := multipartHash.(encoding.BinaryMarshaler) @@ -589,9 +596,96 @@ func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadP for _, part := range parts { uploadParams.PartNumber = part.Number - if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil { - return fmt.Errorf("reupload number=%d: %w", part.Number, err) + if len(part.Elements) > 0 { + if err = n.reUploadSegmentedPart(ctx, uploadParams, part, bktInfo, multipartInfo); err != nil { + return fmt.Errorf("reupload number=%d: %w", part.Number, err) + } + } else { + if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil { + return fmt.Errorf("reupload number=%d: %w", part.Number, err) + } + } + } + + return nil +} + +func (n *layer) reUploadSegmentedPart(ctx context.Context, uploadParams UploadPartParams, part *data.PartInfo, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { + var ( + eg errgroup.Group + pipeReader, pipeWriter = io.Pipe() + ) + + eg.Go(func() error { + var ( + err error + elementObj *object.Object + ) + + for _, element := range part.Elements { + elementObj, err = n.objectGet(ctx, bktInfo, element.OID) + if err != nil { + err = fmt.Errorf("get part oid=%s, element oid=%s: %w", part.OID.String(), element.OID.String(), err) + break + } + + if _, err = pipeWriter.Write(elementObj.Payload()); err != nil { + err = fmt.Errorf("write part oid=%s, element oid=%s: %w", part.OID.String(), element.OID.String(), err) + break + } + + // The part contains all elements for Split chain and contains itself as well. + // We mustn't remove it here, it will be removed on MultipartComplete. 
+ if part.OID == element.OID { + continue + } + + if deleteErr := n.objectDelete(ctx, bktInfo, element.OID); deleteErr != nil { + n.log.Error( + "couldn't delete object", + zap.Error(deleteErr), + zap.String("cnrID", bktInfo.CID.EncodeToString()), + zap.String("uploadID", multipartInfo.UploadID), + zap.Int("partNumber", part.Number), + zap.String("part.OID", part.OID.String()), + zap.String("part element OID", element.OID.String()), + ) + // no return intentionally. + } + } + + pipeCloseErr := pipeWriter.Close() + + if err != nil { + return fmt.Errorf("pipe: %w", err) + } + + if pipeCloseErr != nil { + return fmt.Errorf("close writer part oid=%s: %w", part.OID.String(), pipeCloseErr) } + + return nil + }) + + eg.Go(func() error { + uploadParams.Size = part.Size + uploadParams.Reader = pipeReader + + n.log.Debug("reUploadPart", zap.String("oid", part.OID.String()), zap.Int64("payload size", uploadParams.Size)) + if _, err := n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil { + pipeReader.CloseWithError(err); return fmt.Errorf("upload id=%s: %w", part.OID.String(), err) + } + + return nil + }) + + if err := eg.Wait(); err != nil { + return fmt.Errorf("upload part oid=%s: %w", part.OID.String(), err) + } + + // remove old object, we just re-uploaded a new one. + if err := n.objectDelete(ctx, bktInfo, part.OID); err != nil { + return fmt.Errorf("delete old id=%s: %w", part.OID.String(), err) } return nil @@ -606,6 +700,7 @@ func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, uploadParams.Size = int64(obj.PayloadSize()) uploadParams.Reader = bytes.NewReader(obj.Payload()) + n.log.Debug("reUploadPart", zap.String("oid", id.String()), zap.Uint64("payload size", obj.PayloadSize())) if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil { return fmt.Errorf("upload id=%s: %w", id.String(), err) } @@ -692,6 +787,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar // There are no parts which were uploaded in arbitrary order.
if partNumber == 0 { + n.log.Debug("no arbitrary order parts", zap.String("uploadID", p.Info.UploadID)) + // In case of all parts were uploaded subsequently, but some of them were re-uploaded. partNumber, err = n.getMinDuplicatedPartNumber(ctx, p.Info, multipartInfo) if err != nil { @@ -701,6 +798,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar // We need to fix Split. if partNumber > 0 { + n.log.Debug("split fix required", zap.String("uploadID", p.Info.UploadID)) + var uploadPartParams = UploadPartParams{Info: p.Info} // We should take the part which broke the multipart upload sequence and re-upload all parts including this one. @@ -718,6 +817,11 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar encInfo := FormEncryptionInfo(multipartInfo.Meta) if len(partsInfo) < len(p.Parts) { + n.log.Debug( + "parts amount mismatch", + zap.Int("partsInfo", len(partsInfo)), + zap.Int("p.Parts", len(p.Parts)), + ) return nil, nil, s3errors.GetAPIError(s3errors.ErrInvalidPart) } @@ -1238,27 +1342,12 @@ func (n *layer) manualSlice(ctx context.Context, bktInfo *data.BucketInfo, prm P // uploadPartAsSlot uploads multipart part, but without correct link to previous part because we don't have it. // It uses zero part as pivot. Actual link will be set on CompleteMultipart. 
func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotParams) (*data.ObjectInfo, error) { - zeroPart, err := n.treeService.GetPartByNumber(ctx, params.bktInfo, params.multipartInfo.ID, 0) - if err != nil { - return nil, fmt.Errorf("get part by number: %w", err) - } - var ( - id oid.ID - chunk *[]byte - elements []data.LinkObjectPayload - isReturnToPool bool - splitFirstID = zeroPart.OID - splitPreviousID = zeroPart.OID - multipartHash = sha256.New() - currentPartHash = sha256.New() + id oid.ID + elements []data.LinkObjectPayload + multipartHash = sha256.New() ) - objHashes := []hash.Hash{multipartHash, currentPartHash} - if params.tzHash != nil { - objHashes = append(objHashes, params.tzHash) - } - params.attributes = append(params.attributes, [2]string{headerS3MultipartUpload, params.multipartInfo.UploadID}, [2]string{headerS3MultipartNumber, strconv.FormatInt(int64(params.uploadPartParams.PartNumber), 10)}, @@ -1271,26 +1360,13 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, params uploadPartAsSlotPar Attributes: params.attributes, CreationTime: params.creationTime, CopiesNumber: params.multipartInfo.CopiesNumber, - Multipart: &Multipart{ - MultipartHashes: objHashes, - }, - } - - if params.uploadPartParams.Size > n.neoFS.MaxObjectSize()/2 { - chunk = n.buffers.Get().(*[]byte) - isReturnToPool = true - } else { - smallChunk := make([]byte, params.uploadPartParams.Size) - chunk = &smallChunk - } - - id, elements, err = n.manualSlice(ctx, params.bktInfo, prm, splitFirstID, splitPreviousID, *chunk, params.payloadReader) - if isReturnToPool { - n.buffers.Put(chunk) + Payload: params.payloadReader, + PayloadSize: uint64(params.decSize), } + id, objHashBts, err := n.objectPutAndHash(ctx, prm, params.bktInfo) if err != nil { - return nil, fmt.Errorf("manual slice: %w", err) + return nil, fmt.Errorf("object put and hash: %w", err) } partInfo := &data.PartInfo{ @@ -1299,7 +1375,7 @@ func (n *layer) uploadPartAsSlot(ctx context.Context, 
params uploadPartAsSlotPar Number: params.uploadPartParams.PartNumber, OID: id, Size: params.decSize, - ETag: hex.EncodeToString(currentPartHash.Sum(nil)), + ETag: hex.EncodeToString(objHashBts), Created: prm.CreationTime, Elements: elements, } diff --git a/go.mod b/go.mod index fbf71238..ac991b9e 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/urfave/cli/v2 v2.27.4 go.uber.org/zap v1.27.0 golang.org/x/crypto v0.26.0 + golang.org/x/sync v0.8.0 google.golang.org/grpc v1.66.0 google.golang.org/protobuf v1.34.2 ) @@ -47,7 +48,6 @@ require ( github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect go.etcd.io/bbolt v1.3.11 // indirect golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect - golang.org/x/sync v0.8.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240827150818-7e3bb234dfed // indirect ) diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 5671de59..f8aafa9c 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -284,15 +284,17 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { case homoHashKV: partInfo.HomoHash = []byte(value) case elementsKV: - elements := strings.Split(value, ",") - partInfo.Elements = make([]data.LinkObjectPayload, len(elements)) - for i, e := range elements { - var element data.LinkObjectPayload - if err = element.Unmarshal(e); err != nil { - return nil, fmt.Errorf("invalid element: %w", err) + if value != "" { + elements := strings.Split(value, ",") + partInfo.Elements = make([]data.LinkObjectPayload, len(elements)) + for i, e := range elements { + var element data.LinkObjectPayload + if err = element.Unmarshal(e); err != nil { + return nil, fmt.Errorf("invalid element: %w", err) + } + + partInfo.Elements[i] = element } - - partInfo.Elements[i] = element } } }