From e7d03c32cca5af32d3a618f2e64e62f6550ede91 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Tue, 20 Feb 2024 14:32:20 +0400 Subject: [PATCH 1/7] layer: Remove stderrors package alias Signed-off-by: Evgenii Baidakov --- api/handler/acl.go | 3 +-- api/layer/multipart_upload.go | 12 ++++++------ 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/api/handler/acl.go b/api/handler/acl.go index bc1cd39e..0c39e7d0 100644 --- a/api/handler/acl.go +++ b/api/handler/acl.go @@ -8,7 +8,6 @@ import ( "encoding/json" "encoding/xml" "errors" - stderrors "errors" "fmt" "net/http" "sort" @@ -1464,7 +1463,7 @@ func bucketACLToTable(acp *AccessControlPolicy) (*eacl.Table, error) { for _, grant := range acp.AccessControlList { if !isValidGrant(grant) { - return nil, stderrors.New("unsupported grantee") + return nil, errors.New("unsupported grantee") } if grant.Grantee.ID == acp.Owner.ID { found = true diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 7a51aa92..46d51071 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -3,7 +3,7 @@ package layer import ( "context" "encoding/hex" - stderrors "errors" + "errors" "fmt" "io" "sort" @@ -176,7 +176,7 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID) if err != nil { - if stderrors.Is(err, ErrNodeNotFound) { + if errors.Is(err, ErrNodeNotFound) { return "", s3errors.GetAPIError(s3errors.ErrNoSuchUpload) } return "", err @@ -248,7 +248,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf } oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) - oldPartIDNotFound := stderrors.Is(err, ErrNoNodeToRemove) + oldPartIDNotFound := errors.Is(err, ErrNoNodeToRemove) if err != nil && !oldPartIDNotFound { return nil, err } @@ -278,7 +278,7 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID) if err != nil { - if stderrors.Is(err, ErrNodeNotFound) { + if errors.Is(err, ErrNodeNotFound) { return nil, s3errors.GetAPIError(s3errors.ErrNoSuchUpload) } return nil, err @@ -336,7 +336,7 @@ type multiObjectReader struct { func (x *multiObjectReader) Read(p []byte) (n int, err error) { if x.curReader != nil { n, err = x.curReader.Read(p) - if !stderrors.Is(err, io.EOF) { + if !errors.Is(err, io.EOF) { return n, err } } @@ -606,7 +606,7 @@ func (n *layer) ListParts(ctx context.Context, p *ListPartsParams) (*ListPartsIn func (n *layer) getUploadParts(ctx context.Context, p *UploadInfoParams) (*data.MultipartInfo, map[int]*data.PartInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Bkt, p.Key, p.UploadID) if err != nil { - if stderrors.Is(err, ErrNodeNotFound) { + if errors.Is(err, ErrNodeNotFound) { return nil, nil, s3errors.GetAPIError(s3errors.ErrNoSuchUpload) } return nil, nil, err From a079f9c42bb01aba3fbcff9e6c67548b2d56ffe1 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Fri, 16 Feb 2024 13:02:42 +0400 Subject: [PATCH 2/7] api: Store SplitID in multipart meta info Signed-off-by: Evgenii Baidakov --- api/data/tree.go | 1 + api/layer/multipart_upload.go | 2 ++ internal/neofs/tree.go | 4 ++++ 3 files changed, 7 insertions(+) diff --git a/api/data/tree.go b/api/data/tree.go index 7809c98e..5c1e8cb0 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -75,6 +75,7 @@ type MultipartInfo struct { Created time.Time Meta map[string]string CopiesNumber uint32 + SplitID string } // PartInfo is upload information about part. diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 46d51071..96e05d93 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -16,6 +16,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" "github.com/nspcc-dev/neofs-s3-gw/api/layer/encryption" "github.com/nspcc-dev/neofs-s3-gw/api/s3errors" + "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" "go.uber.org/zap" @@ -148,6 +149,7 @@ func (n *layer) CreateMultipartUpload(ctx context.Context, p *CreateMultipartPar Created: TimeNow(ctx), Meta: make(map[string]string, metaSize), CopiesNumber: p.CopiesNumber, + SplitID: object.NewSplitID().String(), } for key, val := range p.Header { diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 5534daff..80d7ee49 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -55,6 +55,7 @@ const ( isUnversionedKV = "IsUnversioned" isTagKV = "IsTag" uploadIDKV = "UploadId" + splitIDKV = "SplitId" partNumberKV = "Number" sizeKV = "Size" etagKV = "ETag" @@ -221,6 +222,8 @@ func newMultipartInfo(node NodeResponse) (*data.MultipartInfo, error) { } case ownerKV: _ = multipartInfo.Owner.DecodeString(string(kv.GetValue())) + case splitIDKV: + multipartInfo.SplitID = string(kv.GetValue()) default: multipartInfo.Meta[kv.GetKey()] = string(kv.GetValue()) } @@ -1191,6 +1194,7 @@ func metaFromMultipart(info *data.MultipartInfo, fileName string) map[string]str info.Meta[uploadIDKV] = info.UploadID info.Meta[ownerKV] = info.Owner.EncodeToString() info.Meta[createdKV] = strconv.FormatInt(info.Created.UTC().UnixMilli(), 10) + info.Meta[splitIDKV] = info.SplitID return info.Meta } From ef9a5baaa7e6e717f7434f34d79a30ebd4545d3c Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Wed, 28 Feb 2024 11:32:13 +0400 Subject: [PATCH 3/7] go.mod: Update github.com/nspcc-dev/tzhash to the v1.8.0 Signed-off-by: Evgenii Baidakov --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 024391ed..def8e9e8 100644 --- a/go.mod +++ b/go.mod @@ -35,6 +35,7 @@ require ( github.com/klauspost/compress v1.17.0 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20240112074137-296698a162ae // indirect github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43 // indirect + github.com/nspcc-dev/tzhash v1.8.0 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect @@ -65,7 +66,6 @@ require ( github.com/nats-io/nuid v1.0.1 // indirect github.com/nspcc-dev/neofs-crypto v0.4.0 // indirect github.com/nspcc-dev/rfc6979 v0.2.0 // indirect - github.com/nspcc-dev/tzhash v1.7.1 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect diff --git a/go.sum b/go.sum index bd24db96..421f6ad6 100644 --- a/go.sum +++ b/go.sum @@ -248,8 +248,8 @@ github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed h1: github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed/go.mod h1:dQzqPhx+7TUeEXCpOThNHxJqNgSoJzCfTcfstTlEQEA= github.com/nspcc-dev/rfc6979 v0.2.0 h1:3e1WNxrN60/6N0DW7+UYisLeZJyfqZTNOjeV/toYvOE= github.com/nspcc-dev/rfc6979 v0.2.0/go.mod h1:exhIh1PdpDC5vQmyEsGvc4YDM/lyQp/452QxGq/UEso= -github.com/nspcc-dev/tzhash v1.7.1 h1:6zmexLqdTF/ssbUAh7XJS7RxgKWaw28kdNpE/4UFdEU= -github.com/nspcc-dev/tzhash v1.7.1/go.mod h1:cIZAGSF8wA9Q8I9Yj9Ytc/IFpsdA54ZAQ5dLXijq178= +github.com/nspcc-dev/tzhash v1.8.0 h1:pJvzME2mZzP/h5rcy/Wb6amT9FJBFeKbJ3HEnWEeUpY= +github.com/nspcc-dev/tzhash v1.8.0/go.mod h1:oHiH0qwmTsZkeVs7pvCS5cVXUaLhXxSFvnmnZ++ijm4= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= From 2732c93b2fc2cfb328a4b7553b54752db2cc6738 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Mon, 19 Feb 2024 11:30:26 +0400 Subject: [PATCH 4/7] *: Remove multipart object re-slicing Closes #843. Before, multipartComplete read all parts of Big object to the memory, combine them and generate final Big object. These step consume time and memory, eventually any system will fail to load all parts in mem or timeout during the process. After, object slicing process works from the first uploaded part. Calculating each part hash and whole object hash during whole process. Storing object hash state to each part metadata in tree service. Signed-off-by: Evgenii Baidakov --- api/data/tree.go | 7 + api/layer/layer.go | 9 + api/layer/multipart_upload.go | 308 ++++++++++++++++++++++++++++------ api/layer/neofs.go | 27 +++ api/layer/neofs_mock.go | 14 ++ api/layer/object.go | 64 +++++++ api/layer/tree_mock.go | 31 ++++ api/layer/tree_service.go | 8 + go.mod | 2 +- internal/neofs/neofs.go | 68 ++++++++ internal/neofs/tree.go | 57 +++++++ 11 files changed, 544 insertions(+), 51 deletions(-) diff --git a/api/data/tree.go b/api/data/tree.go index 5c1e8cb0..7796a69b 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -90,6 +90,13 @@ type PartInfo struct { Created time.Time // Server creation time. ServerCreated time.Time + + // MultipartHash contains internal state of the [hash.Hash] to calculate whole object payload hash. + MultipartHash []byte + // HomoHash contains internal state of the [hash.Hash] to calculate whole object homomorphic payload hash. + HomoHash []byte + // Elements contain [oid.ID] object list for the current part. + Elements []oid.ID } // ToHeaderString form short part representation to use in S3-Completed-Parts header. diff --git a/api/layer/layer.go b/api/layer/layer.go index 16149098..1cae32ea 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/nats-io/nats.go" @@ -50,6 +51,7 @@ type ( ncontroller EventListener cache *Cache treeService TreeService + buffers *sync.Pool } Config struct { @@ -266,6 +268,12 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error // NewLayer creates an instance of a layer. It checks credentials // and establishes gRPC connection with the node. func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client { + buffers := sync.Pool{} + buffers.New = func() any { + b := make([]byte, neoFS.MaxObjectSize()) + return &b + } + return &layer{ neoFS: neoFS, log: log, @@ -273,6 +281,7 @@ func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client { resolver: config.Resolver, cache: NewCache(config.Caches), treeService: config.TreeService, + buffers: &buffers, } } diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 96e05d93..ca53c13b 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -1,10 +1,14 @@ package layer import ( + "bytes" "context" + "crypto/sha256" + "encoding" "encoding/hex" "errors" "fmt" + "hash" "io" "sort" "strconv" @@ -19,14 +23,13 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/tzhash/tz" "go.uber.org/zap" "golang.org/x/exp/slices" ) const ( - UploadIDAttributeName = "S3-Upload-Id" - UploadPartNumberAttributeName = "S3-Upload-Part-Number" - UploadCompletedParts = "S3-Completed-Parts" + UploadCompletedParts = "S3-Completed-Parts" metaPrefix = "meta-" aclPrefix = "acl-" @@ -203,34 +206,121 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters) } - bktInfo := p.Info.Bkt - prm := PrmObjectCreate{ - Container: bktInfo.CID, - Creator: bktInfo.Owner, - Attributes: make([][2]string, 2), - Payload: p.Reader, - CreationTime: TimeNow(ctx), - CopiesNumber: multipartInfo.CopiesNumber, - } + var ( + bktInfo = p.Info.Bkt + payloadReader = p.Reader + decSize = p.Size + attributes [][2]string + ) - decSize := p.Size if p.Info.Encryption.Enabled() { r, encSize, err := encryptionReader(p.Reader, uint64(p.Size), p.Info.Encryption.Key()) if err != nil { return nil, fmt.Errorf("failed to create ecnrypted reader: %w", err) } - prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)}) - prm.Payload = r + attributes = append(attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)}) + payloadReader = r p.Size = int64(encSize) } - prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID - prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) + var ( + splitPreviousID oid.ID + isSetSplitPreviousID bool + multipartHash = sha256.New() + tzHash hash.Hash + ) - id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo) + if n.neoFS.IsHomomorphicHashingEnabled() { + tzHash = tz.New() + } + + lastPart, err := n.treeService.GetLastPart(ctx, bktInfo, multipartInfo.ID) if err != nil { - return nil, err + // if ErrPartListIsEmpty, there is the first part of multipart. + if !errors.Is(err, ErrPartListIsEmpty) { + return nil, fmt.Errorf("getLastPart: %w", err) + } + } else { + // try to restore hash state from the last part. + // the required interface is guaranteed according to the docs, so just cast without checks. + binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { + return nil, fmt.Errorf("unmarshal previous part hash: %w", err) + } + + if tzHash != nil { + binaryUnmarshaler = tzHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { + return nil, fmt.Errorf("unmarshal previous part homo hash: %w", err) + } + } + + isSetSplitPreviousID = true + splitPreviousID = lastPart.OID + } + + var ( + id oid.ID + elements []oid.ID + creationTime = TimeNow(ctx) + // User may upload part large maxObjectSize in NeoFS. From users point of view it is a single object. + // We have to calculate the hash from this object separately. + currentPartHash = sha256.New() + ) + + objHashes := []hash.Hash{multipartHash, currentPartHash} + if tzHash != nil { + objHashes = append(objHashes, tzHash) + } + + prm := PrmObjectCreate{ + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Attributes: attributes, + CreationTime: creationTime, + CopiesNumber: multipartInfo.CopiesNumber, + Multipart: &Multipart{ + SplitID: multipartInfo.SplitID, + MultipartHashes: objHashes, + }, + } + + chunk := n.buffers.Get().(*[]byte) + + // slice part manually. Simultaneously considering the part is a single object for user. + for { + if isSetSplitPreviousID { + prm.Multipart.SplitPreviousID = &splitPreviousID + } + + nBts, readErr := io.ReadAtLeast(payloadReader, *chunk, len(*chunk)) + if nBts > 0 { + prm.Payload = bytes.NewReader((*chunk)[:nBts]) + prm.PayloadSize = uint64(nBts) + + id, _, err = n.objectPutAndHash(ctx, prm, bktInfo) + if err != nil { + return nil, err + } + + isSetSplitPreviousID = true + splitPreviousID = id + elements = append(elements, id) + } + + if readErr == nil { + continue + } + + // If an EOF happens after reading fewer than min bytes, ReadAtLeast returns ErrUnexpectedEOF. + // We have the whole payload. + if !errors.Is(readErr, io.EOF) && !errors.Is(readErr, io.ErrUnexpectedEOF) { + return nil, fmt.Errorf("read payload chunk: %w", err) + } + + break } + n.buffers.Put(chunk) reqInfo := api.GetReqInfo(ctx) n.log.Debug("upload part", @@ -245,8 +335,26 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf Number: p.PartNumber, OID: id, Size: decSize, - ETag: hex.EncodeToString(hash), + ETag: hex.EncodeToString(currentPartHash.Sum(nil)), Created: prm.CreationTime, + Elements: elements, + } + + // encoding hash.Hash state to save it in tree service. + // the required interface is guaranteed according to the docs, so just cast without checks. + binaryMarshaler := multipartHash.(encoding.BinaryMarshaler) + partInfo.MultipartHash, err = binaryMarshaler.MarshalBinary() + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } + + if tzHash != nil { + binaryMarshaler = tzHash.(encoding.BinaryMarshaler) + partInfo.HomoHash, err = binaryMarshaler.MarshalBinary() + + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } } oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) @@ -380,8 +488,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var multipartObjetSize int64 var encMultipartObjectSize uint64 - parts := make([]*data.PartInfo, 0, len(p.Parts)) - + var lastPartID int + var children []oid.ID var completedPartsHeader strings.Builder for i, part := range p.Parts { partInfo := partsInfo[part.PartNumber] @@ -392,7 +500,6 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize { return nil, nil, s3errors.GetAPIError(s3errors.ErrEntityTooSmall) } - parts = append(parts, partInfo) multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted) if encInfo.Enabled { @@ -410,6 +517,44 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil { return nil, nil, err } + + if part.PartNumber > lastPartID { + lastPartID = part.PartNumber + } + + children = append(children, partInfo.Elements...) + } + + multipartHash := sha256.New() + var homoHash hash.Hash + var splitPreviousID oid.ID + + if lastPartID > 0 { + lastPart := partsInfo[lastPartID] + + if lastPart != nil { + if len(lastPart.MultipartHash) > 0 { + splitPreviousID = lastPart.OID + + if len(lastPart.MultipartHash) > 0 { + binaryUnmarshaler := multipartHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { + return nil, nil, fmt.Errorf("unmarshal last part hash: %w", err) + } + } + } + + if n.neoFS.IsHomomorphicHashingEnabled() && len(lastPart.HomoHash) > 0 { + homoHash = tz.New() + + if len(lastPart.MultipartHash) > 0 { + binaryUnmarshaler := homoHash.(encoding.BinaryUnmarshaler) + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { + return nil, nil, fmt.Errorf("unmarshal last part homo hash: %w", err) + } + } + } + } } initMetadata := make(map[string]string, len(multipartInfo.Meta)+1) @@ -437,45 +582,108 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar multipartObjetSize = int64(encMultipartObjectSize) } - r := &multiObjectReader{ - ctx: ctx, - layer: n, - parts: parts, - } - - r.prm.bktInfo = p.Info.Bkt - - extObjInfo, err := n.PutObject(ctx, &PutObjectParams{ + // This is our "big object". It doesn't have any payload. + prmHeaderObject := &PutObjectParams{ BktInfo: p.Info.Bkt, Object: p.Info.Key, - Reader: r, + Reader: bytes.NewBuffer(nil), Header: initMetadata, Size: multipartObjetSize, Encryption: p.Info.Encryption, CopiesNumber: multipartInfo.CopiesNumber, - }) + } + + header, err := n.prepareMultipartHeadObject(ctx, prmHeaderObject, multipartHash, homoHash, uint64(multipartObjetSize)) if err != nil { - n.log.Error("could not put a completed object (multipart upload)", - zap.String("uploadID", p.Info.UploadID), - zap.String("uploadKey", p.Info.Key), - zap.Error(err)) + return nil, nil, err + } - return nil, nil, s3errors.GetAPIError(s3errors.ErrInternalError) + // last part + prm := PrmObjectCreate{ + Container: p.Info.Bkt.CID, + Creator: p.Info.Bkt.Owner, + Filepath: p.Info.Key, + CreationTime: TimeNow(ctx), + CopiesNumber: multipartInfo.CopiesNumber, + Multipart: &Multipart{ + SplitID: multipartInfo.SplitID, + SplitPreviousID: &splitPreviousID, + HeaderObject: header, + }, + Payload: bytes.NewBuffer(nil), } - var addr oid.Address - addr.SetContainer(p.Info.Bkt.CID) - for _, partInfo := range partsInfo { - if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { - n.log.Warn("could not delete upload part", - zap.Stringer("object id", &partInfo.OID), - zap.Stringer("bucket id", p.Info.Bkt.CID), - zap.Error(err)) - } - addr.SetObject(partInfo.OID) - n.cache.DeleteObject(addr) + lastPartObjID, _, err := n.objectPutAndHash(ctx, prm, p.Info.Bkt) + if err != nil { + return nil, nil, err } + children = append(children, lastPartObjID) + + // linking object + prm = PrmObjectCreate{ + Container: p.Info.Bkt.CID, + Creator: p.Info.Bkt.Owner, + CreationTime: TimeNow(ctx), + CopiesNumber: multipartInfo.CopiesNumber, + Multipart: &Multipart{ + SplitID: multipartInfo.SplitID, + HeaderObject: header, + Children: children, + }, + Payload: bytes.NewBuffer(nil), + } + + _, _, err = n.objectPutAndHash(ctx, prm, p.Info.Bkt) + if err != nil { + return nil, nil, err + } + + bktSettings, err := n.GetBucketSettings(ctx, p.Info.Bkt) + if err != nil { + return nil, nil, fmt.Errorf("couldn't get versioning settings object: %w", err) + } + + headerObjectID, _ := header.ID() + + // the "big object" is not presented in system, but we have to put correct info about it and its version. + + newVersion := &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + FilePath: p.Info.Key, + Size: multipartObjetSize, + OID: headerObjectID, + ETag: hex.EncodeToString(multipartHash.Sum(nil)), + }, + IsUnversioned: !bktSettings.VersioningEnabled(), + } + + if newVersion.ID, err = n.treeService.AddVersion(ctx, p.Info.Bkt, newVersion); err != nil { + return nil, nil, fmt.Errorf("couldn't add multipart new verion to tree service: %w", err) + } + + n.cache.CleanListCacheEntriesContainingObject(p.Info.Key, p.Info.Bkt.CID) + + objInfo := &data.ObjectInfo{ + ID: headerObjectID, + CID: p.Info.Bkt.CID, + Owner: p.Info.Bkt.Owner, + Bucket: p.Info.Bkt.Name, + Name: p.Info.Key, + Size: multipartObjetSize, + Created: prm.CreationTime, + Headers: initMetadata, + ContentType: initMetadata[api.ContentType], + HashSum: newVersion.ETag, + } + + extObjInfo := &data.ExtendedObjectInfo{ + ObjectInfo: objInfo, + NodeVersion: newVersion, + } + + n.cache.PutObjectWithName(p.Info.Bkt.Owner, extObjInfo) + return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID) } diff --git a/api/layer/neofs.go b/api/layer/neofs.go index 402f6860..dd1fee0c 100644 --- a/api/layer/neofs.go +++ b/api/layer/neofs.go @@ -3,6 +3,7 @@ package layer import ( "context" "errors" + "hash" "io" "time" @@ -114,6 +115,23 @@ type PrmObjectCreate struct { // Number of object copies that is enough to consider put successful. CopiesNumber uint32 + + Multipart *Multipart +} + +// Multipart contains info for local object slicing inside s3-gate during multipart upload operation. +type Multipart struct { + // MultipartHashes contains hashes for the multipart object payload calculation (optional). + MultipartHashes []hash.Hash + // SplitID contains splitID for multipart object (optional). + SplitID string + // SplitPreviousID contains [oid.ID] of previous object in chain (optional). + SplitPreviousID *oid.ID + // Children contains all objects in multipart chain, for linking object (optional). + Children []oid.ID + // HeaderObject is a virtual representation of complete multipart object (optional). It is used to set Parent in + // linking object. + HeaderObject *object.Object } // PrmObjectDelete groups parameters of NeoFS.DeleteObject operation. @@ -208,6 +226,9 @@ type NeoFS interface { // prevented the container from being created. CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) + // FinalizeObjectWithPayloadChecksums fills and signs header object for complete multipart object. + FinalizeObjectWithPayloadChecksums(context.Context, object.Object, hash.Hash, hash.Hash, uint64) (*object.Object, error) + // DeleteObject marks the object to be removed from the NeoFS container by identifier. // Successful return does not guarantee actual removal. // @@ -223,4 +244,10 @@ type NeoFS interface { // // It returns any error encountered which prevented computing epochs. TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error) + + // MaxObjectSize returns configured payload size limit for object slicing when enabled. + MaxObjectSize() int64 + + // IsHomomorphicHashingEnabled shows if homomorphic hashing is enabled in config. + IsHomomorphicHashingEnabled() bool } diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index 868e5717..7c24e105 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash" "io" "time" @@ -220,6 +221,10 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID return objID, nil } +func (t *TestNeoFS) FinalizeObjectWithPayloadChecksums(_ context.Context, header object.Object, _ hash.Hash, _ hash.Hash, _ uint64) (*object.Object, error) { + return &header, nil +} + func (t *TestNeoFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { var addr oid.Address addr.SetContainer(prm.Container) @@ -241,6 +246,15 @@ func (t *TestNeoFS) TimeToEpoch(_ context.Context, now, futureTime time.Time) (u return t.currentEpoch, t.currentEpoch + uint64(futureTime.Sub(now).Seconds()), nil } +func (t *TestNeoFS) MaxObjectSize() int64 { + // 64 MB + return 67108864 +} + +func (t *TestNeoFS) IsHomomorphicHashingEnabled() bool { + return false +} + func (t *TestNeoFS) AllObjects(cnrID cid.ID) []oid.ID { result := make([]oid.ID, 0, len(t.objects)) diff --git a/api/layer/object.go b/api/layer/object.go index 173d9188..7ec1ecf2 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash" "io" "mime" "path/filepath" @@ -13,6 +14,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/minio/sio" "github.com/nspcc-dev/neofs-s3-gw/api" @@ -23,6 +25,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -309,6 +312,62 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend return extendedObjInfo, nil } +func (n *layer) prepareMultipartHeadObject(ctx context.Context, p *PutObjectParams, payloadHash hash.Hash, homoHash hash.Hash, payloadLength uint64) (*object.Object, error) { + var ( + err error + owner = n.Owner(ctx) + ) + + if p.Encryption.Enabled() { + p.Header[AttributeDecryptedSize] = strconv.FormatInt(p.Size, 10) + if err = addEncryptionHeaders(p.Header, p.Encryption); err != nil { + return nil, fmt.Errorf("add encryption header: %w", err) + } + + var encSize uint64 + if _, encSize, err = encryptionReader(p.Reader, uint64(p.Size), p.Encryption.Key()); err != nil { + return nil, fmt.Errorf("create encrypter: %w", err) + } + p.Size = int64(encSize) + } + + var headerObject object.Object + headerObject.SetContainerID(p.BktInfo.CID) + headerObject.SetType(object.TypeRegular) + headerObject.SetOwnerID(&owner) + + currentVersion := version.Current() + headerObject.SetVersion(¤tVersion) + + attributes := make([]object.Attribute, 0, len(p.Header)) + for k, v := range p.Header { + if v == "" { + return nil, ErrMetaEmptyParameterValue + } + + attributes = append(attributes, *object.NewAttribute(k, v)) + } + + creationTime := TimeNow(ctx) + if creationTime.IsZero() { + creationTime = time.Now() + } + attributes = append(attributes, *object.NewAttribute(object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10))) + + if p.Object != "" { + attributes = append(attributes, *object.NewAttribute(object.AttributeFilePath, p.Object)) + } + + headerObject.SetAttributes(attributes...) + + multipartHeader, err := n.neoFS.FinalizeObjectWithPayloadChecksums(ctx, headerObject, payloadHash, homoHash, payloadLength) + if err != nil { + return nil, fmt.Errorf("FinalizeObjectWithPayloadChecksums: %w", err) + } + + return multipartHeader, nil +} + func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) { owner := n.Owner(ctx) if extObjInfo := n.cache.GetLastObject(owner, bkt.Name, objectName); extObjInfo != nil { @@ -416,6 +475,11 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn hash := sha256.New() prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) { hash.Write(buf) + if prm.Multipart != nil { + for _, h := range prm.Multipart.MultipartHashes { + h.Write(buf) + } + } }) id, err := n.neoFS.CreateObject(ctx, prm) if err != nil { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index e871c415..3700a39e 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "golang.org/x/exp/slices" ) type TreeServiceMock struct { @@ -362,6 +363,36 @@ LOOP: return result, nil } +func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { + parts, err := t.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, fmt.Errorf("get parts: %w", err) + } + + if len(parts) == 0 { + return nil, ErrPartListIsEmpty + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(parts, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return parts[len(parts)-1], nil +} + func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 4a30910d..8db3ffb5 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -74,6 +74,11 @@ type TreeService interface { // If object id to remove is not found returns ErrNoNodeToRemove error. AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) + // GetLastPart returns the latest uploaded part. + // + // Return errors: + // - [ErrPartListIsEmpty] if there is no parts in the upload id. + GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) // Compound methods for optimizations @@ -90,4 +95,7 @@ var ( // ErrNoNodeToRemove is returned from Tree service in case of the lack of node with OID to remove. ErrNoNodeToRemove = errors.New("no node to remove") + + // ErrPartListIsEmpty is returned if no parts available for the upload. + ErrPartListIsEmpty = errors.New("part list is empty") ) diff --git a/go.mod b/go.mod index def8e9e8..8910d79c 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240125143754-70b1ffbd8141 github.com/nspcc-dev/neofs-contract v0.19.1 github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed + github.com/nspcc-dev/tzhash v1.8.0 github.com/panjf2000/ants/v2 v2.5.0 github.com/prometheus/client_golang v1.13.0 github.com/spf13/pflag v1.0.5 @@ -35,7 +36,6 @@ require ( github.com/klauspost/compress v1.17.0 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20240112074137-296698a162ae // indirect github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43 // indirect - github.com/nspcc-dev/tzhash v1.8.0 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 44a2c20f..40993933 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -3,9 +3,11 @@ package neofs import ( "bytes" "context" + "crypto/sha256" "encoding/hex" "errors" "fmt" + "hash" "io" "math" "strconv" @@ -16,6 +18,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/authmate" "github.com/nspcc-dev/neofs-s3-gw/creds/tokens" + "github.com/nspcc-dev/neofs-sdk-go/checksum" "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container" @@ -30,6 +33,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/waiter" + "github.com/nspcc-dev/tzhash/tz" ) // Config allows to configure some [NeoFS] parameters. @@ -270,6 +274,32 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi obj.SetAttributes(attrs...) obj.SetPayloadSize(prm.PayloadSize) + if prm.Multipart != nil && prm.Multipart.SplitID != "" { + var split object.SplitID + if err := split.Parse(prm.Multipart.SplitID); err != nil { + return oid.ID{}, fmt.Errorf("parse split ID: %w", err) + } + obj.SetSplitID(&split) + + if prm.Multipart.SplitPreviousID != nil { + obj.SetPreviousID(*prm.Multipart.SplitPreviousID) + } + + if len(prm.Multipart.Children) > 0 { + obj.SetChildren(prm.Multipart.Children...) + } + + if prm.Multipart.HeaderObject != nil { + id, isSet := prm.Multipart.HeaderObject.ID() + if !isSet { + return oid.ID{}, errors.New("HeaderObject id is not set") + } + + obj.SetParentID(id) + obj.SetParent(prm.Multipart.HeaderObject) + } + } + if len(prm.Locks) > 0 { var lock object.Lock lock.WriteMembers(prm.Locks) @@ -345,6 +375,34 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi return writer.GetResult().StoredObjectID(), nil } +// FinalizeObjectWithPayloadChecksums implements neofs.NeoFS interface method. +func (x *NeoFS) FinalizeObjectWithPayloadChecksums(ctx context.Context, header object.Object, metaChecksum hash.Hash, homomorphicChecksum hash.Hash, payloadLength uint64) (*object.Object, error) { + header.SetCreationEpoch(x.epochGetter.CurrentEpoch()) + + var cs checksum.Checksum + + var csBytes [sha256.Size]byte + copy(csBytes[:], metaChecksum.Sum(nil)) + + cs.SetSHA256(csBytes) + header.SetPayloadChecksum(cs) + + if homomorphicChecksum != nil { + var csHomoBytes [tz.Size]byte + copy(csHomoBytes[:], homomorphicChecksum.Sum(nil)) + + cs.SetTillichZemor(csHomoBytes) + header.SetPayloadHomomorphicHash(cs) + } + + header.SetPayloadSize(payloadLength) + if err := header.SetIDWithSignature(x.signer(ctx)); err != nil { + return nil, fmt.Errorf("setIDWithSignature: %w", err) + } + + return &header, nil +} + // wraps io.ReadCloser and transforms Read errors related to access violation // to neofs.ErrAccessDenied. type payloadReader struct { @@ -468,6 +526,16 @@ func (x *NeoFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) err return nil } +// MaxObjectSize returns configured payload size limit for object slicing when enabled. +func (x *NeoFS) MaxObjectSize() int64 { + return x.cfg.MaxObjectSize +} + +// IsHomomorphicHashingEnabled shows if homomorphic hashing is enabled in config. +func (x *NeoFS) IsHomomorphicHashingEnabled() bool { + return x.cfg.IsHomomorphicEnabled +} + func isErrAccessDenied(err error) (string, bool) { unwrappedErr := errors.Unwrap(err) for unwrappedErr != nil { diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 80d7ee49..14eac6c6 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -17,6 +17,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/internal/neofs/services/tree" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -59,6 +60,9 @@ const ( partNumberKV = "Number" sizeKV = "Size" etagKV = "ETag" + multipartHashKV = "MultipartHashes" + homoHashKV = "HomoHash" + elementsKV = "Elements" // keys for lock. isLockKV = "IsLock" @@ -269,6 +273,21 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { return nil, fmt.Errorf("invalid server created timestamp: %w", err) } partInfo.ServerCreated = time.UnixMilli(utcMilli) + case multipartHashKV: + partInfo.MultipartHash = []byte(value) + case homoHashKV: + partInfo.HomoHash = []byte(value) + case elementsKV: + elements := strings.Split(value, ",") + partInfo.Elements = make([]oid.ID, len(elements)) + for i, e := range elements { + var id oid.ID + if err = id.DecodeString(e); err != nil { + return nil, fmt.Errorf("invalid oid: %w", err) + } + + partInfo.Elements[i] = id + } } } @@ -912,6 +931,11 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult return oid.ID{}, err } + elements := make([]string, len(info.Elements)) + for i, e := range info.Elements { + elements[i] = e.String() + } + meta := map[string]string{ partNumberKV: strconv.Itoa(info.Number), oidKV: info.OID.EncodeToString(), @@ -919,6 +943,9 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10), serverCreatedKV: strconv.FormatInt(time.Now().UTC().UnixMilli(), 10), etagKV: info.ETag, + multipartHashKV: string(info.MultipartHash), + homoHashKV: string(info.HomoHash), + elementsKV: strings.Join(elements, ","), } var foundPartID uint64 @@ -968,6 +995,36 @@ func (c *TreeClient) GetParts(ctx context.Context, bktInfo *data.BucketInfo, mul return result, nil } +func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { + parts, err := c.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, fmt.Errorf("get parts: %w", err) + } + + if len(parts) == 0 { + return nil, layer.ErrPartListIsEmpty + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(parts, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return parts[len(parts)-1], nil +} + func (c *TreeClient) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { return c.removeNode(ctx, bktInfo, systemTree, multipartNodeID) } From a748b727d957c8c1e610cdbda8b256fbb7849e71 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Thu, 22 Feb 2024 14:36:26 +0400 Subject: [PATCH 5/7] layer: Remove unused reader Signed-off-by: Evgenii Baidakov --- api/layer/multipart_upload.go | 39 ----------------------------------- 1 file changed, 39 deletions(-) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index ca53c13b..26d6a494 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -430,45 +430,6 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. return n.uploadPart(ctx, multipartInfo, params) } -// implements io.Reader of payloads of the object list stored in the NeoFS network. -type multiObjectReader struct { - ctx context.Context - - layer *layer - - prm getParams - - curReader io.Reader - - parts []*data.PartInfo -} - -func (x *multiObjectReader) Read(p []byte) (n int, err error) { - if x.curReader != nil { - n, err = x.curReader.Read(p) - if !errors.Is(err, io.EOF) { - return n, err - } - } - - if len(x.parts) == 0 { - return n, io.EOF - } - - x.prm.oid = x.parts[0].OID - - x.curReader, err = x.layer.initObjectPayloadReader(x.ctx, x.prm) - if err != nil { - return n, fmt.Errorf("init payload reader for the next part: %w", err) - } - - x.parts = x.parts[1:] - - next, err := x.Read(p[n:]) - - return n + next, err -} - func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) { for i := 1; i < len(p.Parts); i++ { if p.Parts[i].PartNumber <= p.Parts[i-1].PartNumber { From 2548e50618f628b13573a54e7264e512eb72f964 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Thu, 22 Feb 2024 13:58:44 +0400 Subject: [PATCH 6/7] layer: Actualize multipart tests Signed-off-by: Evgenii Baidakov --- api/handler/handlers_test.go | 8 +- api/layer/neofs_mock.go | 153 +++++++++++++++++++++++++++++++---- api/layer/versioning_test.go | 2 +- 3 files changed, 143 insertions(+), 20 deletions(-) diff --git a/api/handler/handlers_test.go b/api/handler/handlers_test.go index 3cfa13d3..285e18a6 100644 --- a/api/handler/handlers_test.go +++ b/api/handler/handlers_test.go @@ -78,14 +78,14 @@ func prepareHandlerContext(t *testing.T) *handlerContext { require.NoError(t, err) anonSigner := user.NewAutoIDSignerRFC6979(anonKey.PrivateKey) + signer := user.NewAutoIDSignerRFC6979(key.PrivateKey) + owner := signer.UserID() + l := zap.NewExample() - tp := layer.NewTestNeoFS() + tp := layer.NewTestNeoFS(signer) testResolver := &contResolver{layer: tp} - signer := user.NewAutoIDSignerRFC6979(key.PrivateKey) - owner := signer.UserID() - layerCfg := &layer.Config{ Caches: layer.DefaultCachesConfigs(zap.NewExample()), GateKey: key, diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index 7c24e105..6ac92f23 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -17,11 +17,13 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/checksum" "github.com/nspcc-dev/neofs-sdk-go/container" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/session" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/tzhash/tz" ) type TestNeoFS struct { @@ -31,13 +33,15 @@ type TestNeoFS struct { containers map[string]*container.Container eaclTables map[string]*eacl.Table currentEpoch uint64 + signer neofscrypto.Signer } -func NewTestNeoFS() *TestNeoFS { +func NewTestNeoFS(signer neofscrypto.Signer) *TestNeoFS { return &TestNeoFS{ objects: make(map[string]*object.Object), containers: make(map[string]*container.Container), eaclTables: make(map[string]*eacl.Table), + signer: signer, } } @@ -144,26 +148,97 @@ func (t *TestNeoFS) ReadObject(ctx context.Context, prm PrmObjectRead) (*ObjectP sAddr := addr.EncodeToString() - if obj, ok := t.objects[sAddr]; ok { - owner := getOwner(ctx) - if !obj.OwnerID().Equals(owner) { - return nil, ErrAccessDenied + obj, ok := t.objects[sAddr] + if !ok { + // trying to find linking object. + for _, o := range t.objects { + parentID, isSet := o.ParentID() + if !isSet { + continue + } + + if !parentID.Equals(prm.Object) { + continue + } + + if len(o.Children()) == 0 { + continue + } + + // linking object is found. + objPart, err := t.constructMupltipartObject(ctx, prm.Container, o) + if err != nil { + return nil, err + } + + obj = objPart.Head + + pl, err := io.ReadAll(objPart.Payload) + if err != nil { + return nil, err + } + + obj.SetPayload(pl) + ok = true + break } + } + + if !ok { + return nil, fmt.Errorf("object not found %s", addr) + } + + owner := getOwner(ctx) + if !obj.OwnerID().Equals(owner) { + return nil, ErrAccessDenied + } + + payload := obj.Payload() + + if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 { + off := prm.PayloadRange[0] + payload = payload[off : off+prm.PayloadRange[1]] + } + + return &ObjectPart{ + Head: obj, + Payload: io.NopCloser(bytes.NewReader(payload)), + }, nil +} + +func (t *TestNeoFS) constructMupltipartObject(ctx context.Context, containerID cid.ID, linkingObject *object.Object) (*ObjectPart, error) { + if _, isSet := linkingObject.ParentID(); !isSet { + return nil, fmt.Errorf("linking object is invalid") + } + + var ( + addr oid.Address + headObject = linkingObject.Parent() + payloadReaders = make([]io.Reader, 0, len(linkingObject.Children())) + childList = linkingObject.Children() + ) + + addr.SetContainer(containerID) + + for _, c := range childList { + addr.SetObject(c) - payload := obj.Payload() + objPart, err := t.ReadObject(ctx, PrmObjectRead{ + Container: containerID, + Object: c, + }) - if prm.PayloadRange[0]+prm.PayloadRange[1] > 0 { - off := prm.PayloadRange[0] - payload = payload[off : off+prm.PayloadRange[1]] + if err != nil { + return nil, fmt.Errorf("child read: %w", err) } - return &ObjectPart{ - Head: obj, - Payload: io.NopCloser(bytes.NewReader(payload)), - }, nil + payloadReaders = append(payloadReaders, objPart.Payload) } - return nil, fmt.Errorf("object not found %s", addr) + return &ObjectPart{ + Head: headObject, + Payload: io.NopCloser(io.MultiReader(payloadReaders...)), + }, nil } func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID, error) { @@ -195,6 +270,32 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID obj.SetOwnerID(&prm.Creator) t.currentEpoch++ + if prm.Multipart != nil && prm.Multipart.SplitID != "" { + var split object.SplitID + if err := split.Parse(prm.Multipart.SplitID); err != nil { + return oid.ID{}, fmt.Errorf("split parse: %w", err) + } + obj.SetSplitID(&split) + + if prm.Multipart.SplitPreviousID != nil { + obj.SetPreviousID(*prm.Multipart.SplitPreviousID) + } + + if len(prm.Multipart.Children) > 0 { + obj.SetChildren(prm.Multipart.Children...) + } + + if prm.Multipart.HeaderObject != nil { + id, isSet := prm.Multipart.HeaderObject.ID() + if !isSet { + return oid.ID{}, errors.New("HeaderObject id is not set") + } + + obj.SetParentID(id) + obj.SetParent(prm.Multipart.HeaderObject) + } + } + if len(prm.Locks) > 0 { var lock object.Lock lock.WriteMembers(prm.Locks) @@ -221,7 +322,29 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID return objID, nil } -func (t *TestNeoFS) FinalizeObjectWithPayloadChecksums(_ context.Context, header object.Object, _ hash.Hash, _ hash.Hash, _ uint64) (*object.Object, error) { +func (t *TestNeoFS) FinalizeObjectWithPayloadChecksums(_ context.Context, header object.Object, metaChecksum hash.Hash, homomorphicChecksum hash.Hash, payloadLength uint64) (*object.Object, error) { + header.SetCreationEpoch(t.currentEpoch) + + var cs checksum.Checksum + + var csBytes [sha256.Size]byte + copy(csBytes[:], metaChecksum.Sum(nil)) + + cs.SetSHA256(csBytes) + header.SetPayloadChecksum(cs) + + if homomorphicChecksum != nil { + var csHomoBytes [tz.Size]byte + copy(csHomoBytes[:], homomorphicChecksum.Sum(nil)) + + cs.SetTillichZemor(csHomoBytes) + header.SetPayloadHomomorphicHash(cs) + } + + header.SetPayloadSize(payloadLength) + if err := header.SetIDWithSignature(t.signer); err != nil { + return nil, fmt.Errorf("setIDWithSignature: %w", err) + } return &header, nil } diff --git a/api/layer/versioning_test.go b/api/layer/versioning_test.go index fc79ccb3..147a94e2 100644 --- a/api/layer/versioning_test.go +++ b/api/layer/versioning_test.go @@ -152,7 +152,7 @@ func prepareContext(t *testing.T, cachesConfig ...*CachesConfig) *testContext { GateKey: key.PublicKey(), }, }) - tp := NewTestNeoFS() + tp := NewTestNeoFS(signer) bktName := "testbucket1" bktID, err := tp.CreateContainer(ctx, PrmContainerCreate{ From 127dd3e7fc7bce49c865eb5a3f577eb846a463c2 Mon Sep 17 00:00:00 2001 From: Evgenii Baidakov Date: Tue, 19 Mar 2024 10:38:56 +0400 Subject: [PATCH 7/7] layer: Re-upload all parts after current one It is required, because we have to re-evaluate object payload hash. Signed-off-by: Evgenii Baidakov --- api/layer/multipart_upload.go | 58 ++++++++++++++++++++++++++++- api/layer/tree_mock.go | 54 +++++++++++++++++++++++++++ api/layer/tree_service.go | 1 + internal/neofs/tree.go | 69 +++++++++++++++++++++++++++++++++++ 4 files changed, 181 insertions(+), 1 deletion(-) diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 26d6a494..141caf0b 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -196,6 +196,10 @@ func (n *layer) UploadPart(ctx context.Context, p *UploadPartParams) (string, er return "", err } + if err = n.reUploadFollowingParts(ctx, *p, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil { + return "", fmt.Errorf("reuploading parts: %w", err) + } + return objInfo.HashSum, nil } @@ -385,6 +389,49 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf return objInfo, nil } +func (n *layer) reUploadFollowingParts(ctx context.Context, uploadParams UploadPartParams, partID int, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { + parts, err := n.treeService.GetPartsAfter(ctx, bktInfo, multipartInfo.ID, partID) + if err != nil { + // nothing to re-upload. + if errors.Is(err, ErrPartListIsEmpty) { + return nil + } + + return fmt.Errorf("get parts after: %w", err) + } + + for _, part := range parts { + uploadParams.PartNumber = part.Number + + if err = n.reUploadPart(ctx, uploadParams, part.OID, bktInfo, multipartInfo); err != nil { + return fmt.Errorf("reupload number=%d: %w", part.Number, err) + } + } + + return nil +} + +func (n *layer) reUploadPart(ctx context.Context, uploadParams UploadPartParams, id oid.ID, bktInfo *data.BucketInfo, multipartInfo *data.MultipartInfo) error { + obj, err := n.objectGet(ctx, bktInfo, id) + if err != nil { + return fmt.Errorf("get id=%s: %w", id.String(), err) + } + + uploadParams.Size = int64(obj.PayloadSize()) + uploadParams.Reader = bytes.NewReader(obj.Payload()) + + if _, err = n.uploadPart(ctx, multipartInfo, &uploadParams); err != nil { + return fmt.Errorf("upload id=%s: %w", id.String(), err) + } + + // remove old object, we just re-uploaded a new one. + if err = n.objectDelete(ctx, bktInfo, id); err != nil { + return fmt.Errorf("delete old id=%s: %w", id.String(), err) + } + + return nil +} + func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data.ObjectInfo, error) { multipartInfo, err := n.treeService.GetMultipartUpload(ctx, p.Info.Bkt, p.Info.Key, p.Info.UploadID) if err != nil { @@ -427,7 +474,16 @@ func (n *layer) UploadPartCopy(ctx context.Context, p *UploadCopyParams) (*data. Reader: pr, } - return n.uploadPart(ctx, multipartInfo, params) + objInfo, err := n.uploadPart(ctx, multipartInfo, params) + if err != nil { + return nil, fmt.Errorf("upload part: %w", err) + } + + if err = n.reUploadFollowingParts(ctx, *params, p.PartNumber, p.Info.Bkt, multipartInfo); err != nil { + return nil, fmt.Errorf("reuploading parts: %w", err) + } + + return objInfo, nil } func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipartParams) (*UploadData, *data.ExtendedObjectInfo, error) { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index 3700a39e..59ab5fdd 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -393,6 +393,60 @@ func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketI return parts[len(parts)-1], nil } +func (t *TreeServiceMock) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) { + parts, err := t.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, err + } + + mp := make(map[int]*data.PartInfo) + for _, partInfo := range parts { + if partInfo.Number <= partID { + continue + } + + mapped, ok := mp[partInfo.Number] + if !ok { + mp[partInfo.Number] = partInfo + continue + } + + if mapped.ServerCreated.After(partInfo.ServerCreated) { + continue + } + + mp[partInfo.Number] = partInfo + } + + if len(mp) == 0 { + return nil, ErrPartListIsEmpty + } + + result := make([]*data.PartInfo, 0, len(mp)) + for _, p := range mp { + result = append(result, p) + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(result, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return result, nil +} + func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 8db3ffb5..4494cf3b 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -79,6 +79,7 @@ type TreeService interface { // Return errors: // - [ErrPartListIsEmpty] if there is no parts in the upload id. GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) + GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) // Compound methods for optimizations diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 14eac6c6..76fa7aba 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -1025,6 +1025,75 @@ func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, return parts[len(parts)-1], nil } +// GetPartsAfter returns parts uploaded after partID. These parts are sorted and filtered by creation time. +// It means, if any upload had a re-uploaded data (few part versions), the list contains only the latest version of the upload. +func (c *TreeClient) GetPartsAfter(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, partID int) ([]*data.PartInfo, error) { + parts, err := c.getSubTree(ctx, bktInfo, systemTree, multipartNodeID, 2) + if err != nil { + return nil, err + } + + if len(parts) == 0 { + return nil, layer.ErrPartListIsEmpty + } + + mp := make(map[int]*data.PartInfo) + for _, part := range parts { + if part.GetNodeId() == multipartNodeID { + continue + } + + partInfo, err := newPartInfo(part) + if err != nil { + continue + } + + if partInfo.Number <= partID { + continue + } + + mapped, ok := mp[partInfo.Number] + if !ok { + mp[partInfo.Number] = partInfo + continue + } + + if mapped.ServerCreated.After(partInfo.ServerCreated) { + continue + } + + mp[partInfo.Number] = partInfo + } + + if len(mp) == 0 { + return nil, layer.ErrPartListIsEmpty + } + + result := make([]*data.PartInfo, 0, len(mp)) + for _, p := range mp { + result = append(result, p) + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(result, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return result, nil +} + func (c *TreeClient) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { return c.removeNode(ctx, bktInfo, systemTree, multipartNodeID) }