diff --git a/api/data/tree.go b/api/data/tree.go index 5c1e8cb0..9cb3ded4 100644 --- a/api/data/tree.go +++ b/api/data/tree.go @@ -90,6 +90,13 @@ type PartInfo struct { Created time.Time // Server creation time. ServerCreated time.Time + + // MultipartHash contains internal state of the [hash.Hash] to calculate whole object payload hash. + MultipartHash []byte + // HomoHash contains internal state of the [hash.Hash] to calculate whole object homomorphic payload hash. + HomoHash []byte + // Elements contain [oid.ID] object list for the current part. + Elements []string } // ToHeaderString form short part representation to use in S3-Completed-Parts header. diff --git a/api/layer/layer.go b/api/layer/layer.go index 16149098..1cae32ea 100644 --- a/api/layer/layer.go +++ b/api/layer/layer.go @@ -9,6 +9,7 @@ import ( "net/url" "strconv" "strings" + "sync" "time" "github.com/nats-io/nats.go" @@ -50,6 +51,7 @@ type ( ncontroller EventListener cache *Cache treeService TreeService + buffers *sync.Pool } Config struct { @@ -266,6 +268,12 @@ func (f MsgHandlerFunc) HandleMessage(ctx context.Context, msg *nats.Msg) error // NewLayer creates an instance of a layer. It checks credentials // and establishes gRPC connection with the node. func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client { + buffers := sync.Pool{} + buffers.New = func() any { + b := make([]byte, neoFS.MaxObjectSize()) + return &b + } + return &layer{ neoFS: neoFS, log: log, @@ -273,6 +281,7 @@ func NewLayer(log *zap.Logger, neoFS NeoFS, config *Config) Client { resolver: config.Resolver, cache: NewCache(config.Caches), treeService: config.TreeService, + buffers: &buffers, } } diff --git a/api/layer/multipart_upload.go b/api/layer/multipart_upload.go index 96e05d93..3466f94c 100644 --- a/api/layer/multipart_upload.go +++ b/api/layer/multipart_upload.go @@ -1,10 +1,14 @@ package layer import ( + "bytes" "context" + "crypto/sha256" + "encoding" "encoding/hex" "errors" "fmt" + "hash" "io" "sort" "strconv" @@ -19,14 +23,13 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "github.com/nspcc-dev/tzhash/tz" "go.uber.org/zap" "golang.org/x/exp/slices" ) const ( - UploadIDAttributeName = "S3-Upload-Id" - UploadPartNumberAttributeName = "S3-Upload-Part-Number" - UploadCompletedParts = "S3-Completed-Parts" + UploadCompletedParts = "S3-Completed-Parts" metaPrefix = "meta-" aclPrefix = "acl-" @@ -203,34 +206,119 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf return nil, s3errors.GetAPIError(s3errors.ErrInvalidEncryptionParameters) } - bktInfo := p.Info.Bkt - prm := PrmObjectCreate{ - Container: bktInfo.CID, - Creator: bktInfo.Owner, - Attributes: make([][2]string, 2), - Payload: p.Reader, - CreationTime: TimeNow(ctx), - CopiesNumber: multipartInfo.CopiesNumber, - } + var ( + bktInfo = p.Info.Bkt + payloadReader = p.Reader + decSize = p.Size + attributes [][2]string + ) - decSize := p.Size if p.Info.Encryption.Enabled() { r, encSize, err := encryptionReader(p.Reader, uint64(p.Size), p.Info.Encryption.Key()) if err != nil { return nil, fmt.Errorf("failed to create ecnrypted reader: %w", err) } - prm.Attributes = append(prm.Attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)}) - prm.Payload = r + attributes = append(attributes, [2]string{AttributeDecryptedSize, strconv.FormatInt(p.Size, 10)}) + payloadReader = r p.Size = int64(encSize) } - prm.Attributes[0][0], prm.Attributes[0][1] = UploadIDAttributeName, p.Info.UploadID - prm.Attributes[1][0], prm.Attributes[1][1] = UploadPartNumberAttributeName, strconv.Itoa(p.PartNumber) + var ( + splitPreviousID oid.ID + isSetSplitPreviousID bool + multipartHash = sha256.New() + tzHash hash.Hash + ) - id, hash, err := n.objectPutAndHash(ctx, prm, bktInfo) + if n.neoFS.IsHomomorphicEnabled() { + tzHash = tz.New() + } + + lastPart, err := n.treeService.GetLastPart(ctx, bktInfo, multipartInfo.ID) if err != nil { - return nil, err + // if ErrPartListIsEmpty, there is the first part of multipart. + if !errors.Is(err, ErrPartListIsEmpty) { + return nil, fmt.Errorf("getLastPart: %w", err) + } + } else { + // try to restore hash state from the last part. + if binaryUnmarshaler, ok := multipartHash.(encoding.BinaryUnmarshaler); ok { + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { + return nil, fmt.Errorf("unmarshalBinary: %w", err) + } + } + + if n.neoFS.IsHomomorphicEnabled() { + if binaryUnmarshaler, ok := tzHash.(encoding.BinaryUnmarshaler); ok { + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { + return nil, fmt.Errorf("unmarshalBinary: %w", err) + } + } + } + + isSetSplitPreviousID = true + splitPreviousID = lastPart.OID + } + + var ( + id oid.ID + elements []string + creationTime = TimeNow(ctx) + // User may upload part large maxObjectSize in NeoFS. From users point of view it is a single object. + // We have to calculate the hash from this object separately. + currentPartHash = sha256.New() + ) + + multipartHashes := []hash.Hash{multipartHash, currentPartHash} + if tzHash != nil { + multipartHashes = append(multipartHashes, tzHash) + } + + prm := PrmObjectCreate{ + Container: bktInfo.CID, + Creator: bktInfo.Owner, + Attributes: attributes, + CreationTime: creationTime, + CopiesNumber: multipartInfo.CopiesNumber, + SplitID: multipartInfo.SplitID, + MultipartHashes: multipartHashes, + } + + dt := n.buffers.Get() + chunk := dt.(*[]byte) + + // slice part manually. Simultaneously considering the part is a single object for user. + for { + if isSetSplitPreviousID { + prm.SplitPreviousID = &splitPreviousID + } + + nBts, readErr := payloadReader.Read(*chunk) + if nBts > 0 { + prm.Payload = bytes.NewReader((*chunk)[:nBts]) + prm.PayloadSize = uint64(nBts) + + id, _, err = n.objectPutAndHash(ctx, prm, bktInfo) + if err != nil { + return nil, err + } + + isSetSplitPreviousID = true + splitPreviousID = id + elements = append(elements, id.String()) + } + + if readErr == nil { + continue + } + + if !errors.Is(readErr, io.EOF) { + return nil, fmt.Errorf("read payload chunk: %w", err) + } + + break } + n.buffers.Put(chunk) reqInfo := api.GetReqInfo(ctx) n.log.Debug("upload part", @@ -245,8 +333,28 @@ func (n *layer) uploadPart(ctx context.Context, multipartInfo *data.MultipartInf Number: p.PartNumber, OID: id, Size: decSize, - ETag: hex.EncodeToString(hash), + ETag: hex.EncodeToString(currentPartHash.Sum(nil)), Created: prm.CreationTime, + Elements: elements, + } + + // encoding hash.Hash state to save it in tree service. + if binaryMarshaler, ok := multipartHash.(encoding.BinaryMarshaler); ok { + partInfo.MultipartHash, err = binaryMarshaler.MarshalBinary() + + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } + } + + if n.neoFS.IsHomomorphicEnabled() { + if binaryMarshaler, ok := tzHash.(encoding.BinaryMarshaler); ok { + partInfo.HomoHash, err = binaryMarshaler.MarshalBinary() + + if err != nil { + return nil, fmt.Errorf("marshalBinary: %w", err) + } + } } oldPartID, err := n.treeService.AddPart(ctx, bktInfo, multipartInfo.ID, partInfo) @@ -380,8 +488,8 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar var multipartObjetSize int64 var encMultipartObjectSize uint64 - parts := make([]*data.PartInfo, 0, len(p.Parts)) - + var lastPartID int + var children []oid.ID var completedPartsHeader strings.Builder for i, part := range p.Parts { partInfo := partsInfo[part.PartNumber] @@ -392,7 +500,6 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar if i != len(p.Parts)-1 && partInfo.Size < uploadMinSize { return nil, nil, s3errors.GetAPIError(s3errors.ErrEntityTooSmall) } - parts = append(parts, partInfo) multipartObjetSize += partInfo.Size // even if encryption is enabled size is actual (decrypted) if encInfo.Enabled { @@ -410,6 +517,49 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar if _, err = completedPartsHeader.WriteString(partInfoStr); err != nil { return nil, nil, err } + + if part.PartNumber > lastPartID { + lastPartID = part.PartNumber + } + + var o oid.ID + for _, c := range partInfo.Elements { + if err = o.DecodeString(c); err != nil { + return nil, nil, fmt.Errorf("decode oid: %w", err) + } + + children = append(children, o) + } + } + + multipartHash := sha256.New() + var homoHash hash.Hash + var splitPreviousID oid.ID + + if lastPartID > 0 { + lastPart := partsInfo[lastPartID] + + if lastPart != nil { + if len(lastPart.MultipartHash) > 0 { + splitPreviousID = lastPart.OID + + if binaryUnmarshaler, ok := multipartHash.(encoding.BinaryUnmarshaler); ok && len(lastPart.MultipartHash) > 0 { + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.MultipartHash); err != nil { + return nil, nil, fmt.Errorf("unmarshalBinary: %w", err) + } + } + } + + if n.neoFS.IsHomomorphicEnabled() && len(lastPart.HomoHash) > 0 { + homoHash = tz.New() + + if binaryUnmarshaler, ok := homoHash.(encoding.BinaryUnmarshaler); ok && len(lastPart.MultipartHash) > 0 { + if err = binaryUnmarshaler.UnmarshalBinary(lastPart.HomoHash); err != nil { + return nil, nil, fmt.Errorf("unmarshalBinary: %w", err) + } + } + } + } } initMetadata := make(map[string]string, len(multipartInfo.Meta)+1) @@ -437,45 +587,104 @@ func (n *layer) CompleteMultipartUpload(ctx context.Context, p *CompleteMultipar multipartObjetSize = int64(encMultipartObjectSize) } - r := &multiObjectReader{ - ctx: ctx, - layer: n, - parts: parts, - } - - r.prm.bktInfo = p.Info.Bkt - - extObjInfo, err := n.PutObject(ctx, &PutObjectParams{ + // This is our "big object". It doesn't have any payload. + prmHeaderObject := &PutObjectParams{ BktInfo: p.Info.Bkt, Object: p.Info.Key, - Reader: r, + Reader: bytes.NewBuffer(nil), Header: initMetadata, Size: multipartObjetSize, Encryption: p.Info.Encryption, CopiesNumber: multipartInfo.CopiesNumber, - }) + } + + header, err := n.prepareMultipartHeadObject(ctx, prmHeaderObject, multipartHash, homoHash, uint64(multipartObjetSize)) if err != nil { - n.log.Error("could not put a completed object (multipart upload)", - zap.String("uploadID", p.Info.UploadID), - zap.String("uploadKey", p.Info.Key), - zap.Error(err)) + return nil, nil, err + } - return nil, nil, s3errors.GetAPIError(s3errors.ErrInternalError) + // last part + prm := PrmObjectCreate{ + Container: p.Info.Bkt.CID, + Creator: p.Info.Bkt.Owner, + Filepath: p.Info.Key, + CreationTime: TimeNow(ctx), + CopiesNumber: multipartInfo.CopiesNumber, + SplitID: multipartInfo.SplitID, + SplitPreviousID: &splitPreviousID, + HeaderObject: header, + Payload: bytes.NewBuffer(nil), + } + + lastPartObjID, _, err := n.objectPutAndHash(ctx, prm, p.Info.Bkt) + if err != nil { + return nil, nil, err } - var addr oid.Address - addr.SetContainer(p.Info.Bkt.CID) - for _, partInfo := range partsInfo { - if err = n.objectDelete(ctx, p.Info.Bkt, partInfo.OID); err != nil { - n.log.Warn("could not delete upload part", - zap.Stringer("object id", &partInfo.OID), - zap.Stringer("bucket id", p.Info.Bkt.CID), - zap.Error(err)) - } - addr.SetObject(partInfo.OID) - n.cache.DeleteObject(addr) + children = append(children, lastPartObjID) + + // linking object + prm = PrmObjectCreate{ + Container: p.Info.Bkt.CID, + Creator: p.Info.Bkt.Owner, + CreationTime: TimeNow(ctx), + CopiesNumber: multipartInfo.CopiesNumber, + SplitID: multipartInfo.SplitID, + HeaderObject: header, + Children: children, + Payload: bytes.NewBuffer(nil), + } + + _, _, err = n.objectPutAndHash(ctx, prm, p.Info.Bkt) + if err != nil { + return nil, nil, err + } + + bktSettings, err := n.GetBucketSettings(ctx, p.Info.Bkt) + if err != nil { + return nil, nil, fmt.Errorf("couldn't get versioning settings object: %w", err) + } + + headerObjectID, _ := header.ID() + + // the "big object" is not presented in system, but we have to put correct info about it and its version. + + newVersion := &data.NodeVersion{ + BaseNodeVersion: data.BaseNodeVersion{ + FilePath: p.Info.Key, + Size: multipartObjetSize, + OID: headerObjectID, + ETag: hex.EncodeToString(multipartHash.Sum(nil)), + }, + IsUnversioned: !bktSettings.VersioningEnabled(), + } + + if newVersion.ID, err = n.treeService.AddVersion(ctx, p.Info.Bkt, newVersion); err != nil { + return nil, nil, fmt.Errorf("couldn't add multipart new verion to tree service: %w", err) + } + + n.cache.CleanListCacheEntriesContainingObject(p.Info.Key, p.Info.Bkt.CID) + + objInfo := &data.ObjectInfo{ + ID: headerObjectID, + CID: p.Info.Bkt.CID, + Owner: p.Info.Bkt.Owner, + Bucket: p.Info.Bkt.Name, + Name: p.Info.Key, + Size: multipartObjetSize, + Created: prm.CreationTime, + Headers: initMetadata, + ContentType: initMetadata[api.ContentType], + HashSum: newVersion.ETag, } + extObjInfo := &data.ExtendedObjectInfo{ + ObjectInfo: objInfo, + NodeVersion: newVersion, + } + + n.cache.PutObjectWithName(p.Info.Bkt.Owner, extObjInfo) + return uploadData, extObjInfo, n.treeService.DeleteMultipartUpload(ctx, p.Info.Bkt, multipartInfo.ID) } diff --git a/api/layer/neofs.go b/api/layer/neofs.go index 402f6860..8d69c242 100644 --- a/api/layer/neofs.go +++ b/api/layer/neofs.go @@ -3,6 +3,7 @@ package layer import ( "context" "errors" + "hash" "io" "time" @@ -114,6 +115,19 @@ type PrmObjectCreate struct { // Number of object copies that is enough to consider put successful. CopiesNumber uint32 + + // Multipart fields should be used together. + // MultipartHashes contains hashes for the multipart object payload calculation (optional). + MultipartHashes []hash.Hash + // SplitID contains splitID for multipart object (optional). + SplitID string + // SplitPreviousID contains [oid.ID] of previous object in chain (optional). + SplitPreviousID *oid.ID + // Children contains all objects in multipart chain, for linking object (optional). + Children []oid.ID + // HeaderObject is a virtual representation of complete multipart object (optional). It is used to set Parent in + // linking object. + HeaderObject *object.Object } // PrmObjectDelete groups parameters of NeoFS.DeleteObject operation. @@ -208,6 +222,9 @@ type NeoFS interface { // prevented the container from being created. CreateObject(context.Context, PrmObjectCreate) (oid.ID, error) + // PrepareMulptipartHeader fills and signs header object for complete multipart object. + PrepareMulptipartHeader(context.Context, object.Object, hash.Hash, hash.Hash, uint64) (*object.Object, error) + // DeleteObject marks the object to be removed from the NeoFS container by identifier. // Successful return does not guarantee actual removal. // @@ -223,4 +240,10 @@ type NeoFS interface { // // It returns any error encountered which prevented computing epochs. TimeToEpoch(ctx context.Context, now time.Time, future time.Time) (uint64, uint64, error) + + // MaxObjectSize returns max single object size in bytes in NeoFS. + MaxObjectSize() int64 + + // IsHomomorphicEnabled shows if homomorphic hashing is enabled in NeoFS. + IsHomomorphicEnabled() bool } diff --git a/api/layer/neofs_mock.go b/api/layer/neofs_mock.go index 868e5717..b90ac335 100644 --- a/api/layer/neofs_mock.go +++ b/api/layer/neofs_mock.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash" "io" "time" @@ -220,6 +221,10 @@ func (t *TestNeoFS) CreateObject(_ context.Context, prm PrmObjectCreate) (oid.ID return objID, nil } +func (t *TestNeoFS) PrepareMulptipartHeader(_ context.Context, header object.Object, _ hash.Hash, _ hash.Hash, _ uint64) (*object.Object, error) { + return &header, nil +} + func (t *TestNeoFS) DeleteObject(ctx context.Context, prm PrmObjectDelete) error { var addr oid.Address addr.SetContainer(prm.Container) @@ -241,6 +246,15 @@ func (t *TestNeoFS) TimeToEpoch(_ context.Context, now, futureTime time.Time) (u return t.currentEpoch, t.currentEpoch + uint64(futureTime.Sub(now).Seconds()), nil } +func (t *TestNeoFS) MaxObjectSize() int64 { + // 64 MB + return 67108864 +} + +func (t *TestNeoFS) IsHomomorphicEnabled() bool { + return false +} + func (t *TestNeoFS) AllObjects(cnrID cid.ID) []oid.ID { result := make([]oid.ID, 0, len(t.objects)) diff --git a/api/layer/object.go b/api/layer/object.go index 173d9188..473e9e3c 100644 --- a/api/layer/object.go +++ b/api/layer/object.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "hash" "io" "mime" "path/filepath" @@ -13,6 +14,7 @@ import ( "strconv" "strings" "sync" + "time" "github.com/minio/sio" "github.com/nspcc-dev/neofs-s3-gw/api" @@ -23,6 +25,7 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/nspcc-dev/neofs-sdk-go/version" "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) @@ -309,6 +312,62 @@ func (n *layer) PutObject(ctx context.Context, p *PutObjectParams) (*data.Extend return extendedObjInfo, nil } +func (n *layer) prepareMultipartHeadObject(ctx context.Context, p *PutObjectParams, payloadHash hash.Hash, homoHash hash.Hash, payloadLength uint64) (*object.Object, error) { + var ( + err error + owner = n.Owner(ctx) + ) + + if p.Encryption.Enabled() { + p.Header[AttributeDecryptedSize] = strconv.FormatInt(p.Size, 10) + if err = addEncryptionHeaders(p.Header, p.Encryption); err != nil { + return nil, fmt.Errorf("add encryption header: %w", err) + } + + var encSize uint64 + if _, encSize, err = encryptionReader(p.Reader, uint64(p.Size), p.Encryption.Key()); err != nil { + return nil, fmt.Errorf("create encrypter: %w", err) + } + p.Size = int64(encSize) + } + + var headerObject object.Object + headerObject.SetContainerID(p.BktInfo.CID) + headerObject.SetType(object.TypeRegular) + headerObject.SetOwnerID(&owner) + + currentVersion := version.Current() + headerObject.SetVersion(¤tVersion) + + attributes := make([]object.Attribute, 0, len(p.Header)) + for k, v := range p.Header { + if v == "" { + return nil, ErrMetaEmptyParameterValue + } + + attributes = append(attributes, *object.NewAttribute(k, v)) + } + + creationTime := TimeNow(ctx) + if creationTime.IsZero() { + creationTime = time.Now() + } + attributes = append(attributes, *object.NewAttribute(object.AttributeTimestamp, strconv.FormatInt(creationTime.Unix(), 10))) + + if p.Object != "" { + attributes = append(attributes, *object.NewAttribute(object.AttributeFilePath, p.Object)) + } + + headerObject.SetAttributes(attributes...) + + multipartHeader, err := n.neoFS.PrepareMulptipartHeader(ctx, headerObject, payloadHash, homoHash, payloadLength) + if err != nil { + return nil, fmt.Errorf("PrepareMulptipartHeader: %w", err) + } + + return multipartHeader, nil +} + func (n *layer) headLastVersionIfNotDeleted(ctx context.Context, bkt *data.BucketInfo, objectName string) (*data.ExtendedObjectInfo, error) { owner := n.Owner(ctx) if extObjInfo := n.cache.GetLastObject(owner, bkt.Name, objectName); extObjInfo != nil { @@ -416,6 +475,9 @@ func (n *layer) objectPutAndHash(ctx context.Context, prm PrmObjectCreate, bktIn hash := sha256.New() prm.Payload = wrapReader(prm.Payload, 64*1024, func(buf []byte) { hash.Write(buf) + for _, h := range prm.MultipartHashes { + h.Write(buf) + } }) id, err := n.neoFS.CreateObject(ctx, prm) if err != nil { diff --git a/api/layer/tree_mock.go b/api/layer/tree_mock.go index e871c415..b68e11b6 100644 --- a/api/layer/tree_mock.go +++ b/api/layer/tree_mock.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/data" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "golang.org/x/exp/slices" ) type TreeServiceMock struct { @@ -362,6 +363,36 @@ LOOP: return result, nil } +func (t *TreeServiceMock) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { + parts, err := t.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, fmt.Errorf("getParts: %w", err) + } + + if len(parts) == 0 { + return nil, ErrPartListIsEmpty + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(parts, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return parts[len(parts)-1], nil +} + func (t *TreeServiceMock) DeleteMultipartUpload(_ context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { cnrMultipartsMap := t.multiparts[bktInfo.CID.EncodeToString()] diff --git a/api/layer/tree_service.go b/api/layer/tree_service.go index 4a30910d..a39ac8f3 100644 --- a/api/layer/tree_service.go +++ b/api/layer/tree_service.go @@ -74,6 +74,7 @@ type TreeService interface { // If object id to remove is not found returns ErrNoNodeToRemove error. AddPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64, info *data.PartInfo) (oldObjIDToDelete oid.ID, err error) GetParts(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) ([]*data.PartInfo, error) + GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) // Compound methods for optimizations @@ -90,4 +91,7 @@ var ( // ErrNoNodeToRemove is returned from Tree service in case of the lack of node with OID to remove. ErrNoNodeToRemove = errors.New("no node to remove") + + // ErrPartListIsEmpty is returned if no parts available for the upload. + ErrPartListIsEmpty = errors.New("part list is empty") ) diff --git a/go.mod b/go.mod index 31c3f3e7..31575f3b 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,7 @@ require ( github.com/nspcc-dev/neofs-api-go/v2 v2.14.1-0.20240125143754-70b1ffbd8141 github.com/nspcc-dev/neofs-contract v0.19.1 github.com/nspcc-dev/neofs-sdk-go v1.0.0-rc.11.0.20240130135633-cb11d035a4ed + github.com/nspcc-dev/tzhash v1.8.0 github.com/panjf2000/ants/v2 v2.5.0 github.com/prometheus/client_golang v1.13.0 github.com/spf13/pflag v1.0.5 @@ -35,7 +36,6 @@ require ( github.com/klauspost/compress v1.17.0 // indirect github.com/nspcc-dev/go-ordered-json v0.0.0-20240112074137-296698a162ae // indirect github.com/nspcc-dev/hrw/v2 v2.0.0-20231115095647-bf62f4ad0a43 // indirect - github.com/nspcc-dev/tzhash v1.8.0 // indirect github.com/pelletier/go-toml/v2 v2.1.1 // indirect github.com/sagikazarmark/locafero v0.4.0 // indirect github.com/sagikazarmark/slog-shim v0.1.0 // indirect diff --git a/internal/neofs/neofs.go b/internal/neofs/neofs.go index 44a2c20f..a5a4a85b 100644 --- a/internal/neofs/neofs.go +++ b/internal/neofs/neofs.go @@ -3,9 +3,11 @@ package neofs import ( "bytes" "context" + "crypto/sha256" "encoding/hex" "errors" "fmt" + "hash" "io" "math" "strconv" @@ -16,11 +18,13 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/api/layer" "github.com/nspcc-dev/neofs-s3-gw/authmate" "github.com/nspcc-dev/neofs-s3-gw/creds/tokens" + "github.com/nspcc-dev/neofs-sdk-go/checksum" "github.com/nspcc-dev/neofs-sdk-go/client" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/container" "github.com/nspcc-dev/neofs-sdk-go/container/acl" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -30,6 +34,7 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/stat" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/waiter" + "github.com/nspcc-dev/tzhash/tz" ) // Config allows to configure some [NeoFS] parameters. @@ -270,6 +275,32 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi obj.SetAttributes(attrs...) obj.SetPayloadSize(prm.PayloadSize) + if prm.SplitID != "" { + var split object.SplitID + if err := split.Parse(prm.SplitID); err != nil { + return oid.ID{}, fmt.Errorf("split parse: %w", err) + } + obj.SetSplitID(&split) + + if prm.SplitPreviousID != nil { + obj.SetPreviousID(*prm.SplitPreviousID) + } + + if len(prm.Children) > 0 { + obj.SetChildren(prm.Children...) + } + + if prm.HeaderObject != nil { + id, isSet := prm.HeaderObject.ID() + if !isSet { + return oid.ID{}, errors.New("HeaderObject id is not set") + } + + obj.SetParentID(id) + obj.SetParent(prm.HeaderObject) + } + } + if len(prm.Locks) > 0 { var lock object.Lock lock.WriteMembers(prm.Locks) @@ -345,6 +376,52 @@ func (x *NeoFS) CreateObject(ctx context.Context, prm layer.PrmObjectCreate) (oi return writer.GetResult().StoredObjectID(), nil } +// PrepareMulptipartHeader implements neofs.NeoFS interface method. +func (x *NeoFS) PrepareMulptipartHeader(ctx context.Context, header object.Object, metaChecksum hash.Hash, homomorphicChecksum hash.Hash, payloadLength uint64) (*object.Object, error) { + header.SetCreationEpoch(x.epochGetter.CurrentEpoch()) + + var cs checksum.Checksum + + var csBytes [sha256.Size]byte + copy(csBytes[:], metaChecksum.Sum(nil)) + + cs.SetSHA256(csBytes) + header.SetPayloadChecksum(cs) + + if homomorphicChecksum != nil { + var csHomoBytes [tz.Size]byte + copy(csHomoBytes[:], homomorphicChecksum.Sum(nil)) + + cs.SetTillichZemor(csHomoBytes) + header.SetPayloadHomomorphicHash(cs) + } + + header.SetPayloadSize(payloadLength) + + id, err := header.CalculateID() + if err != nil { + return nil, fmt.Errorf("calculate ID: %w", err) + } + + header.SetID(id) + + bID, err := id.Marshal() + if err != nil { + return nil, fmt.Errorf("marshal object ID: %w", err) + } + + var sig neofscrypto.Signature + + err = sig.Calculate(x.signer(ctx), bID) + if err != nil { + return nil, fmt.Errorf("sign object ID: %w", err) + } + + header.SetSignature(&sig) + + return &header, nil +} + // wraps io.ReadCloser and transforms Read errors related to access violation // to neofs.ErrAccessDenied. type payloadReader struct { @@ -468,6 +545,16 @@ func (x *NeoFS) DeleteObject(ctx context.Context, prm layer.PrmObjectDelete) err return nil } +// MaxObjectSize returns max single object size in bytes in NeoFS. +func (x *NeoFS) MaxObjectSize() int64 { + return x.cfg.MaxObjectSize +} + +// IsHomomorphicEnabled shows if homomorphic hashing is enabled in NeoFS. +func (x *NeoFS) IsHomomorphicEnabled() bool { + return x.cfg.IsHomomorphicEnabled +} + func isErrAccessDenied(err error) (string, bool) { unwrappedErr := errors.Unwrap(err) for unwrappedErr != nil { diff --git a/internal/neofs/tree.go b/internal/neofs/tree.go index 80d7ee49..39569568 100644 --- a/internal/neofs/tree.go +++ b/internal/neofs/tree.go @@ -17,6 +17,7 @@ import ( "github.com/nspcc-dev/neofs-s3-gw/internal/neofs/services/tree" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" + "golang.org/x/exp/slices" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -59,6 +60,9 @@ const ( partNumberKV = "Number" sizeKV = "Size" etagKV = "ETag" + multipartHashKV = "MultipartHashes" + homoHashKV = "HomoHash" + elementsKV = "Elements" // keys for lock. isLockKV = "IsLock" @@ -269,6 +273,12 @@ func newPartInfo(node NodeResponse) (*data.PartInfo, error) { return nil, fmt.Errorf("invalid server created timestamp: %w", err) } partInfo.ServerCreated = time.UnixMilli(utcMilli) + case multipartHashKV: + partInfo.MultipartHash = []byte(value) + case homoHashKV: + partInfo.HomoHash = []byte(value) + case elementsKV: + partInfo.Elements = strings.Split(value, ",") } } @@ -919,6 +929,9 @@ func (c *TreeClient) AddPart(ctx context.Context, bktInfo *data.BucketInfo, mult createdKV: strconv.FormatInt(info.Created.UTC().UnixMilli(), 10), serverCreatedKV: strconv.FormatInt(time.Now().UTC().UnixMilli(), 10), etagKV: info.ETag, + multipartHashKV: string(info.MultipartHash), + homoHashKV: string(info.HomoHash), + elementsKV: strings.Join(info.Elements, ","), } var foundPartID uint64 @@ -968,6 +981,36 @@ func (c *TreeClient) GetParts(ctx context.Context, bktInfo *data.BucketInfo, mul return result, nil } +func (c *TreeClient) GetLastPart(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) (*data.PartInfo, error) { + parts, err := c.GetParts(ctx, bktInfo, multipartNodeID) + if err != nil { + return nil, fmt.Errorf("getParts: %w", err) + } + + if len(parts) == 0 { + return nil, layer.ErrPartListIsEmpty + } + + // Sort parts by part number, then by server creation time to make actual last uploaded parts with the same number. + slices.SortFunc(parts, func(a, b *data.PartInfo) int { + if a.Number < b.Number { + return -1 + } + + if a.ServerCreated.Before(b.ServerCreated) { + return -1 + } + + if a.ServerCreated.Equal(b.ServerCreated) { + return 0 + } + + return 1 + }) + + return parts[len(parts)-1], nil +} + func (c *TreeClient) DeleteMultipartUpload(ctx context.Context, bktInfo *data.BucketInfo, multipartNodeID uint64) error { return c.removeNode(ctx, bktInfo, systemTree, multipartNodeID) }