From e20f436844210a18a11ba3db33caae245184e755 Mon Sep 17 00:00:00 2001 From: Pavel Karpy Date: Thu, 29 Feb 2024 23:02:15 +0300 Subject: [PATCH] *: Adopt new big objects split On reading: Support both split chain versions a new and an old one. On writing: Split objects only with the V2 version: 1. Link objects do not have child objects in their headers, payload is used instead 2. Link objects contain child objects' sizes along with the IDs 3. SplitID is not attached to the objects anymore 4. The First part's ID of a big object is attached to every split part (a link has it too, but obviously the first part itself does not have its ID in its split header) 5. Receiving link object triggers verification of its children: they should have the same first object and have the same payload size as the link declares 6. The First object part now has an original object's paylaod (without payload-dependent fields set). Closes #2667. Signed-off-by: Pavel Karpy --- CHANGELOG.md | 1 + cmd/neofs-node/object.go | 102 ++++++++--- pkg/core/object/fmt.go | 87 +++++++++- pkg/core/object/fmt_test.go | 57 +++++++ pkg/local_object_storage/metabase/VERSION.md | 4 + .../metabase/containers.go | 6 + pkg/local_object_storage/metabase/delete.go | 2 + pkg/local_object_storage/metabase/get.go | 16 ++ pkg/local_object_storage/metabase/put.go | 59 +++++-- pkg/local_object_storage/metabase/select.go | 3 + .../metabase/select_test.go | 11 ++ pkg/local_object_storage/metabase/util.go | 13 +- pkg/local_object_storage/util/splitinfo.go | 4 + pkg/services/object/acl/acl.go | 10 ++ pkg/services/object/acl/acl_test.go | 11 +- pkg/services/object/acl/eacl/v2/eacl_test.go | 99 +++++++++++ pkg/services/object/acl/eacl/v2/headers.go | 45 ++++- pkg/services/object/acl/eacl/v2/opts.go | 6 + pkg/services/object/acl/v2/service.go | 4 +- pkg/services/object/get/assemble.go | 8 + pkg/services/object/get/assembly_v2.go | 161 ++++++++++++++++++ pkg/services/object/get/assembly_v2_test.go | 87 ++++++++++ pkg/services/object/get/exec.go | 4 + pkg/services/object/get/get_test.go | 64 ++++--- pkg/services/object/put/service.go | 6 + pkg/services/object/split/verify.go | 95 +++++++++++ 26 files changed, 901 insertions(+), 64 deletions(-) create mode 100644 pkg/services/object/get/assembly_v2.go create mode 100644 pkg/services/object/get/assembly_v2_test.go create mode 100644 pkg/services/object/split/verify.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 77a9c8ec9d..9dfc0dcc50 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ Changelog for NeoFS Node - IR now checks format of NULL and numeric eACL filters specified in the protocol (#2742) - Empty filter value is now treated as `NOT_PRESENT` op by CLI `acl extended create` cmd (#2742) - Storage nodes no longer accept objects with header larger than 16KB (#2749) +- Big objects are split with the new split scheme (#2667) ### Removed - Object notifications incl. NATS (#2750) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index b141352acb..c2f13e39c2 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" + lru "github.com/hashicorp/golang-lru/v2" "github.com/nspcc-dev/neofs-api-go/v2/object" objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc" replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator" @@ -28,6 +29,7 @@ import ( putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2" searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search" searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2" + "github.com/nspcc-dev/neofs-node/pkg/services/object/split" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement" "github.com/nspcc-dev/neofs-node/pkg/services/policer" @@ -224,6 +226,26 @@ func initObjectService(c *cfg) { c.workers = append(c.workers, c.shared.policer) + sGet := getsvc.New( + getsvc.WithLogger(c.log), + getsvc.WithLocalStorageEngine(ls), + getsvc.WithClientConstructor(coreConstructor), + getsvc.WithTraverserGenerator( + traverseGen.WithTraverseOptions( + placement.SuccessAfter(1), + ), + ), + getsvc.WithNetMapSource(c.netMapSource), + getsvc.WithKeyStorage(keyStorage), + ) + + *c.cfgObject.getSvc = *sGet // need smth better + + sGetV2 := getsvcV2.NewService( + getsvcV2.WithInternalService(sGet), + getsvcV2.WithKeyStorage(keyStorage), + ) + sPut := putsvc.NewService( putsvc.WithKeyStorage(keyStorage), putsvc.WithClientConstructor(putConstructor), @@ -235,6 +257,7 @@ func initObjectService(c *cfg) { putsvc.WithNetworkState(c.cfgNetmap.state), putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal), putsvc.WithLogger(c.log), + putsvc.WithSplitChainVerifier(split.NewVerifier(sGet)), ) sPutV2 := putsvcV2.NewService( @@ -260,26 +283,6 @@ func initObjectService(c *cfg) { searchsvcV2.WithKeyStorage(keyStorage), ) - sGet := getsvc.New( - getsvc.WithLogger(c.log), - getsvc.WithLocalStorageEngine(ls), - getsvc.WithClientConstructor(coreConstructor), - getsvc.WithTraverserGenerator( - traverseGen.WithTraverseOptions( - placement.SuccessAfter(1), - ), - ), - getsvc.WithNetMapSource(c.netMapSource), - getsvc.WithKeyStorage(keyStorage), - ) - - *c.cfgObject.getSvc = *sGet // need smth better - - sGetV2 := getsvcV2.NewService( - getsvcV2.WithInternalService(sGet), - getsvcV2.WithKeyStorage(keyStorage), - ) - sDelete := deletesvc.New( deletesvc.WithLogger(c.log), deletesvc.WithHeadService(sGet), @@ -312,6 +315,13 @@ func initObjectService(c *cfg) { }, ) + // cachedFirstObjectsNumber is a total cached objects number; the V2 split scheme + // expects the first part of the chain to hold a user-defined header of the original + // object which should be treated as a header to use for the eACL rules check; so + // every object part in every chain will try to refer to the first part, so caching + // should help a lot here + const cachedFirstObjectsNumber = 1000 + aclSvc := v2.New( v2.WithLogger(c.log), v2.WithIRFetcher(newCachedIRFetcher(irFetcher)), @@ -325,7 +335,8 @@ func initObjectService(c *cfg) { SetNetmapState(c.cfgNetmap.state). SetEACLSource(c.cfgObject.eaclSource). SetValidator(eaclSDK.NewValidator()). - SetLocalStorage(ls), + SetLocalStorage(ls). + SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber)), ), ), ) @@ -535,3 +546,52 @@ func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error { func (e storageEngine) Put(o *objectSDK.Object) error { return engine.Put(e.engine, o) } + +func cachedHeaderSource(getSvc *getsvc.Service, cacheSize int) headerSource { + hs := headerSource{getsvc: getSvc} + + if cacheSize > 0 { + hs.cache, _ = lru.New[oid.Address, *objectSDK.Object](cacheSize) + } + + return hs +} + +type headerSource struct { + getsvc *getsvc.Service + cache *lru.Cache[oid.Address, *objectSDK.Object] +} + +type headerWriter struct { + h *objectSDK.Object +} + +func (h *headerWriter) WriteHeader(o *objectSDK.Object) error { + h.h = o + return nil +} + +func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) { + if h.cache != nil { + head, ok := h.cache.Get(address) + if ok { + return head, nil + } + } + + var hw headerWriter + + // no custom common prms since a caller is expected to be a container + // participant so no additional headers, access tokens, etc + var prm getsvc.HeadPrm + prm.SetHeaderWriter(&hw) + prm.WithAddress(address) + prm.WithRawFlag(true) + + err := h.getsvc.Head(context.Background(), prm) + if err != nil { + return nil, fmt.Errorf("reading header: %w", err) + } + + return hw.h, nil +} diff --git a/pkg/core/object/fmt.go b/pkg/core/object/fmt.go index c533e0c040..7e6ce736d5 100644 --- a/pkg/core/object/fmt.go +++ b/pkg/core/object/fmt.go @@ -1,6 +1,7 @@ package object import ( + "context" "errors" "fmt" "strconv" @@ -23,6 +24,7 @@ type FormatValidatorOption func(*cfg) type cfg struct { netState netmap.State e LockSource + sv SplitVerifier } // DeleteHandler is an interface of delete queue processor. @@ -49,6 +51,18 @@ type Locker interface { Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error } +// SplitVerifier represent split validation unit. It verifies V2 split +// chains based on the link object's payload (list of ID+ObjectSize pairs). +type SplitVerifier interface { + // VerifySplit must verify split hierarchy and return any error that did + // not allow processing the chain. Must break (if possible) any internal + // computations if context is done. The second and the third args are the + // first part's address used as a chain unique identifier that also must + // be checked. The fourth arg is guaranteed to be the full list from the + // link's payload without item order change. + VerifySplit(context.Context, cid.ID, oid.ID, []object.MeasuredObject) error +} + var errNilObject = errors.New("object is nil") var errNilID = errors.New("missing identifier") @@ -103,6 +117,41 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error { return err } + _, firstSet := obj.FirstID() + splitID := obj.SplitID() + par := obj.Parent() + + if obj.HasParent() { + if splitID != nil { + // V1 split + if firstSet { + return errors.New("v1 split: first ID object is set") + } + } else { + // V2 split + + if !firstSet { + // first part only + if obj.Parent() == nil { + return errors.New("v2 split: first object part does not have parent header") + } + } else { + // 2nd+ parts + + typ := obj.Type() + + // link object only + if typ == object.TypeLink && (par == nil || par.Signature() == nil) { + return errors.New("v2 split: incorrect link object's parent header") + } + + if _, hasPrevious := obj.PreviousID(); typ != object.TypeLink && !hasPrevious { + return errors.New("v2 split: middle part does not have previous object ID") + } + } + } + } + if err := v.checkAttributes(obj); err != nil { return fmt.Errorf("invalid attributes: %w", err) } @@ -121,9 +170,9 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error { } } - if obj = obj.Parent(); obj != nil { + if par != nil && (firstSet || splitID != nil) { // Parent object already exists. - return v.Validate(obj, false) + return v.Validate(par, false) } return nil @@ -161,6 +210,7 @@ func (v *FormatValidator) validateSignatureKey(obj *object.Object) error { // is one of: // - object.TypeTombstone; // - object.TypeStorageGroup; +// - object.TypeLink; // - object.TypeLock. type ContentMeta struct { typ object.Type @@ -191,6 +241,32 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error) switch o.Type() { case object.TypeRegular: // ignore regular objects, they do not need payload formatting + case object.TypeLink: + if len(o.Payload()) == 0 { + return ContentMeta{}, fmt.Errorf("(%T) empty payload in the link object", v) + } + + firstObjID, set := o.FirstID() + if !set { + return ContentMeta{}, errors.New("link object does not have first object ID") + } + + cnr, set := o.ContainerID() + if !set { + return ContentMeta{}, errors.New("link object does not have container ID") + } + + var testLink object.Link + + err := o.ReadLink(&testLink) + if err != nil { + return ContentMeta{}, fmt.Errorf("reading link object's payload: %w", err) + } + + err = v.sv.VerifySplit(context.Background(), cnr, firstObjID, testLink.Objects()) + if err != nil { + return ContentMeta{}, fmt.Errorf("link object's split chain verification: %w", err) + } case object.TypeTombstone: if len(o.Payload()) == 0 { return ContentMeta{}, fmt.Errorf("(%T) empty payload in tombstone", v) @@ -392,3 +468,10 @@ func WithLockSource(e LockSource) FormatValidatorOption { c.e = e } } + +// WithSplitVerifier returns option to set a SplitVerifier. +func WithSplitVerifier(sv SplitVerifier) FormatValidatorOption { + return func(c *cfg) { + c.sv = sv + } +} diff --git a/pkg/core/object/fmt_test.go b/pkg/core/object/fmt_test.go index 88116d3d1b..9e4c6d9043 100644 --- a/pkg/core/object/fmt_test.go +++ b/pkg/core/object/fmt_test.go @@ -1,10 +1,12 @@ package object import ( + "context" "strconv" "testing" "github.com/nspcc-dev/neo-go/pkg/crypto/keys" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa" @@ -344,3 +346,58 @@ func TestFormatValidator_Validate(t *testing.T) { }) }) } + +type testSplitVerifier struct { +} + +func (t *testSplitVerifier) VerifySplit(ctx context.Context, cID cid.ID, firstID oid.ID, children []object.MeasuredObject) error { + return nil +} + +func TestLinkObjectSplitV2(t *testing.T) { + verifier := new(testSplitVerifier) + + v := NewFormatValidator( + WithSplitVerifier(verifier), + ) + + ownerKey, err := keys.NewPrivateKey() + require.NoError(t, err) + + signer := user.NewAutoIDSigner(ownerKey.PrivateKey) + obj := blankValidObject(signer) + obj.SetParent(object.New()) + obj.SetSplitID(object.NewSplitID()) + obj.SetFirstID(oidtest.ID()) + + t.Run("V1 split, first is set", func(t *testing.T) { + require.ErrorContains(t, v.Validate(obj, true), "first ID object is set") + }) + + t.Run("V2 split", func(t *testing.T) { + obj.ResetRelations() + obj.SetParent(object.New()) + + t.Run("first object is not set", func(t *testing.T) { + require.ErrorContains(t, v.Validate(obj, true), "first object part does not have parent header") + }) + + t.Run("link object without finished parent", func(t *testing.T) { + obj.ResetRelations() + obj.SetParent(object.New()) + obj.SetFirstID(oidtest.ID()) + obj.SetType(object.TypeLink) + + require.ErrorContains(t, v.Validate(obj, true), "incorrect link object's parent header") + }) + + t.Run("middle child does not have previous object ID", func(t *testing.T) { + obj.ResetRelations() + obj.SetParent(object.New()) + obj.SetFirstID(oidtest.ID()) + obj.SetType(object.TypeRegular) + + require.ErrorContains(t, v.Validate(obj, true), "middle part does not have previous object ID") + }) + }) +} diff --git a/pkg/local_object_storage/metabase/VERSION.md b/pkg/local_object_storage/metabase/VERSION.md index 6750900ce3..971033a41e 100644 --- a/pkg/local_object_storage/metabase/VERSION.md +++ b/pkg/local_object_storage/metabase/VERSION.md @@ -57,6 +57,10 @@ Numbers stand for a single byte value. - Name: container ID + `9` - Key: object ID - Value: marshaled object +- Buckets containing object or LINK type + - Name: container ID + `18` + - Key: object ID + - Value: marshaled object - Buckets mapping objects to the storage ID they are stored in - Name: container ID + `10` - Key: object ID diff --git a/pkg/local_object_storage/metabase/containers.go b/pkg/local_object_storage/metabase/containers.go index b037e8b7a3..2c95b0cfa1 100644 --- a/pkg/local_object_storage/metabase/containers.go +++ b/pkg/local_object_storage/metabase/containers.go @@ -184,6 +184,12 @@ func (db *DB) DeleteContainer(cID cid.ID) error { return fmt.Errorf("root object's bucket cleanup: %w", err) } + // Link objects + err = tx.DeleteBucket(linkObjectsBucketName(cID, buff)) + if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) { + return fmt.Errorf("link objects' bucket cleanup: %w", err) + } + // indexes err = tx.DeleteBucket(ownerBucketName(cID, buff)) diff --git a/pkg/local_object_storage/metabase/delete.go b/pkg/local_object_storage/metabase/delete.go index 75a4a55ad9..fed46acab3 100644 --- a/pkg/local_object_storage/metabase/delete.go +++ b/pkg/local_object_storage/metabase/delete.go @@ -336,6 +336,8 @@ func delUniqueIndexes(tx *bbolt.Tx, obj *objectSDK.Object, isParent bool) error bucketName = storageGroupBucketName(cnr, bucketName) case objectSDK.TypeLock: bucketName = bucketNameLockers(cnr, bucketName) + case objectSDK.TypeLink: + bucketName = linkObjectsBucketName(cnr, bucketName) default: return ErrUnknownObjectType } diff --git a/pkg/local_object_storage/metabase/get.go b/pkg/local_object_storage/metabase/get.go index d41d971b9e..f1a8c25128 100644 --- a/pkg/local_object_storage/metabase/get.go +++ b/pkg/local_object_storage/metabase/get.go @@ -109,6 +109,12 @@ func (db *DB) get(tx *bbolt.Tx, addr oid.Address, key []byte, checkStatus, raw b return obj, obj.Unmarshal(data) } + // if not found then check in link objects index + data = getFromBucket(tx, linkObjectsBucketName(cnr, bucketName), key) + if len(data) != 0 { + return obj, obj.Unmarshal(data) + } + // if not found then check if object is a virtual return getVirtualObject(tx, cnr, key, raw) } @@ -246,6 +252,16 @@ func listContainerObjects(tx *bbolt.Tx, cID cid.ID, unique map[oid.ID]struct{}, return nil } + // link objects + bktInit := tx.Bucket(linkObjectsBucketName(cID, buff)) + err = expandObjectsFromBucket(bktInit, unique, limit) + if err != nil { + return fmt.Errorf("link objects iteration: %w", err) + } + if len(unique) >= limit { + return nil + } + bktSmall := tx.Bucket(smallBucketName(cID, buff)) err = expandObjectsFromBucket(bktSmall, unique, limit) if err != nil { diff --git a/pkg/local_object_storage/metabase/put.go b/pkg/local_object_storage/metabase/put.go index ad8999e31e..4148d46b12 100644 --- a/pkg/local_object_storage/metabase/put.go +++ b/pkg/local_object_storage/metabase/put.go @@ -112,15 +112,18 @@ func (db *DB) put( return nil } - if par := obj.Parent(); par != nil && !isParent { // limit depth by two - parentSI, err := splitInfoFromObject(obj) - if err != nil { - return err - } + par := obj.Parent() + if par != nil && !isParent { // limit depth by two + if _, set := par.ID(); set { // skip the first object without useful info + parentSI, err := splitInfoFromObject(obj) + if err != nil { + return err + } - err = db.put(tx, par, id, parentSI, currEpoch) - if err != nil { - return err + err = db.put(tx, par, id, parentSI, currEpoch) + if err != nil { + return err + } } } @@ -187,6 +190,8 @@ func putUniqueIndexes( bucketName = storageGroupBucketName(cnr, bucketName) case objectSDK.TypeLock: bucketName = bucketNameLockers(cnr, bucketName) + case objectSDK.TypeLink: + bucketName = linkObjectsBucketName(cnr, bucketName) default: return ErrUnknownObjectType } @@ -498,6 +503,10 @@ func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) { info := objectSDK.NewSplitInfo() info.SetSplitID(obj.SplitID()) + if firstID, set := obj.FirstID(); set { + info.SetFirstPart(firstID) + } + switch { case isLinkObject(obj): id, ok := obj.ID() @@ -520,14 +529,38 @@ func splitInfoFromObject(obj *objectSDK.Object) (*objectSDK.SplitInfo, error) { return info, nil } -// isLinkObject returns true if object contains parent header and list -// of children. +// isLinkObject returns true if +// V1: object contains parent header and list +// +// of children +// +// V2: object is LINK typed. func isLinkObject(obj *objectSDK.Object) bool { + // V2 split + if obj.Type() == objectSDK.TypeLink { + return true + } + + // V1 split return len(obj.Children()) > 0 && obj.Parent() != nil } -// isLastObject returns true if object contains only parent header without list -// of children. +// isLastObject returns true if an object has parent and +// V1: object has children in the object's header +// V2: there is no split ID, object's type is LINK, and it has first part's ID. func isLastObject(obj *objectSDK.Object) bool { - return len(obj.Children()) == 0 && obj.Parent() != nil + par := obj.Parent() + if par == nil { + return false + } + + _, hasFirstObjID := obj.FirstID() + + // V2 split + if obj.SplitID() == nil && (obj.Type() != objectSDK.TypeLink && hasFirstObjID) { + return true + } + + // V1 split + return len(obj.Children()) == 0 } diff --git a/pkg/local_object_storage/metabase/select.go b/pkg/local_object_storage/metabase/select.go index c4e2c6aa12..00c74aa312 100644 --- a/pkg/local_object_storage/metabase/select.go +++ b/pkg/local_object_storage/metabase/select.go @@ -152,6 +152,7 @@ func (db *DB) selectAll(tx *bbolt.Tx, cnr cid.ID, to map[string]int) { selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, 0) selectAllFromBucket(tx, parentBucketName(cnr, bucketName), to, 0) selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, 0) + selectAllFromBucket(tx, linkObjectsBucketName(cnr, bucketName), to, 0) } // selectAllFromBucket goes through all keys in bucket and adds them in a @@ -206,6 +207,7 @@ func (db *DB) selectFastFilter( selectAllFromBucket(tx, tombstoneBucketName(cnr, bucketName), to, fNum) selectAllFromBucket(tx, storageGroupBucketName(cnr, bucketName), to, fNum) selectAllFromBucket(tx, bucketNameLockers(cnr, bucketName), to, fNum) + selectAllFromBucket(tx, linkObjectsBucketName(cnr, bucketName), to, fNum) default: // user attribute bucketName := attributeBucketName(cnr, f.Header(), bucketName) @@ -222,6 +224,7 @@ var mBucketNaming = map[string][]func(cid.ID, []byte) []byte{ object.TypeTombstone.EncodeToString(): {tombstoneBucketName}, object.TypeStorageGroup.EncodeToString(): {storageGroupBucketName}, object.TypeLock.EncodeToString(): {bucketNameLockers}, + object.TypeLink.EncodeToString(): {linkObjectsBucketName}, } func allBucketNames(cnr cid.ID) (names [][]byte) { diff --git a/pkg/local_object_storage/metabase/select_test.go b/pkg/local_object_storage/metabase/select_test.go index 5327238be8..2e482e5c3d 100644 --- a/pkg/local_object_storage/metabase/select_test.go +++ b/pkg/local_object_storage/metabase/select_test.go @@ -550,6 +550,17 @@ func TestDB_SelectObjectID(t *testing.T) { fs = objectSDK.SearchFilters{} fs.AddObjectIDFilter(objectSDK.MatchStringNotEqual, id) + _, err = metaGet(db, object.AddressOf(regular), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(parent), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(sg), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(ts), false) + require.NoError(t, err) + _, err = metaGet(db, object.AddressOf(lock), false) + require.NoError(t, err) + testSelect(t, db, cnr, fs, object.AddressOf(regular), object.AddressOf(parent), diff --git a/pkg/local_object_storage/metabase/util.go b/pkg/local_object_storage/metabase/util.go index ff77fe0020..4c0822dfab 100644 --- a/pkg/local_object_storage/metabase/util.go +++ b/pkg/local_object_storage/metabase/util.go @@ -120,6 +120,11 @@ const ( // Key: container ID // Value: dummy value garbageContainersPrefix + + // linkObjectsPrefix is used for prefixing buckets containing objects of LINK type. + // Key: object ID + // Value: marshaled object + linkObjectsPrefix ) const ( @@ -157,6 +162,11 @@ func smallBucketName(cnr cid.ID, key []byte) []byte { return bucketName(cnr, smallPrefix, key) } +// linkObjectsBucketName returns link objects bucket key (`17`). +func linkObjectsBucketName(cnr cid.ID, key []byte) []byte { + return bucketName(cnr, linkObjectsPrefix, key) +} + // attributeBucketName returns _attr_. func attributeBucketName(cnr cid.ID, attributeKey string, key []byte) []byte { key[0] = userAttributePrefix @@ -240,7 +250,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object panic("empty object list in firstIrregularObjectType") } - var keys [3][1 + cidSize]byte + var keys [4][1 + cidSize]byte irregularTypeBuckets := [...]struct { typ object.Type @@ -249,6 +259,7 @@ func firstIrregularObjectType(tx *bbolt.Tx, idCnr cid.ID, objs ...[]byte) object {object.TypeTombstone, tombstoneBucketName(idCnr, keys[0][:])}, {object.TypeStorageGroup, storageGroupBucketName(idCnr, keys[1][:])}, {object.TypeLock, bucketNameLockers(idCnr, keys[2][:])}, + {object.TypeLink, linkObjectsBucketName(idCnr, keys[3][:])}, } for i := range objs { diff --git a/pkg/local_object_storage/util/splitinfo.go b/pkg/local_object_storage/util/splitinfo.go index fcb97af1ef..5311cf2950 100644 --- a/pkg/local_object_storage/util/splitinfo.go +++ b/pkg/local_object_storage/util/splitinfo.go @@ -17,5 +17,9 @@ func MergeSplitInfo(from, to *object.SplitInfo) *object.SplitInfo { to.SetLink(link) } + if init, ok := from.FirstPart(); ok { + to.SetFirstPart(init) + } + return to } diff --git a/pkg/services/object/acl/acl.go b/pkg/services/object/acl/acl.go index 5360b06228..423f7360ac 100644 --- a/pkg/services/object/acl/acl.go +++ b/pkg/services/object/acl/acl.go @@ -25,6 +25,7 @@ type CheckerPrm struct { validator *eaclSDK.Validator localStorage *engine.StorageEngine state netmap.State + headerSource eaclV2.HeaderSource } func (c *CheckerPrm) SetEACLSource(v container.EACLSource) *CheckerPrm { @@ -47,6 +48,11 @@ func (c *CheckerPrm) SetNetmapState(v netmap.State) *CheckerPrm { return c } +func (c *CheckerPrm) SetHeaderSource(hs eaclV2.HeaderSource) *CheckerPrm { + c.headerSource = hs + return c +} + // Checker implements v2.ACLChecker interfaces and provides // ACL/eACL validation functionality. type Checker struct { @@ -54,6 +60,7 @@ type Checker struct { validator *eaclSDK.Validator localStorage *engine.StorageEngine state netmap.State + headerSource eaclV2.HeaderSource } // Various EACL check errors. @@ -79,12 +86,14 @@ func NewChecker(prm *CheckerPrm) *Checker { panicOnNil("EACLValidator", prm.validator) panicOnNil("LocalStorageEngine", prm.localStorage) panicOnNil("NetmapState", prm.state) + panicOnNil("HeaderSource", prm.headerSource) return &Checker{ eaclSrc: prm.eaclSrc, validator: prm.validator, localStorage: prm.localStorage, state: prm.state, + headerSource: prm.headerSource, } } @@ -154,6 +163,7 @@ func (c *Checker) CheckEACL(msg any, reqInfo v2.RequestInfo) error { eaclV2.WithLocalObjectStorage(c.localStorage), eaclV2.WithCID(cnr), eaclV2.WithOID(reqInfo.ObjectID()), + eaclV2.WithHeaderSource(c.headerSource), ) if req, ok := msg.(eaclV2.Request); ok { diff --git a/pkg/services/object/acl/acl_test.go b/pkg/services/object/acl/acl_test.go index 0052af2eb1..3062f9acff 100644 --- a/pkg/services/object/acl/acl_test.go +++ b/pkg/services/object/acl/acl_test.go @@ -9,6 +9,8 @@ import ( "github.com/nspcc-dev/neofs-sdk-go/container/acl" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" usertest "github.com/nspcc-dev/neofs-sdk-go/user/test" "github.com/stretchr/testify/require" @@ -22,6 +24,12 @@ func (e emptyEACLSource) GetEACL(_ cid.ID) (*container.EACL, error) { type emptyNetmapState struct{} +type emptyHeaderSource struct{} + +func (e emptyHeaderSource) Head(address oid.Address) (*object.Object, error) { + return nil, nil +} + func (e emptyNetmapState) CurrentEpoch() uint64 { return 0 } @@ -31,7 +39,8 @@ func TestStickyCheck(t *testing.T) { SetLocalStorage(&engine.StorageEngine{}). SetValidator(eaclSDK.NewValidator()). SetEACLSource(emptyEACLSource{}). - SetNetmapState(emptyNetmapState{}), + SetNetmapState(emptyNetmapState{}). + SetHeaderSource(emptyHeaderSource{}), ) t.Run("system role", func(t *testing.T) { diff --git a/pkg/services/object/acl/eacl/v2/eacl_test.go b/pkg/services/object/acl/eacl/v2/eacl_test.go index 1105b3bdc2..1b419b196d 100644 --- a/pkg/services/object/acl/eacl/v2/eacl_test.go +++ b/pkg/services/object/acl/eacl/v2/eacl_test.go @@ -9,10 +9,12 @@ import ( objectV2 "github.com/nspcc-dev/neofs-api-go/v2/object" "github.com/nspcc-dev/neofs-api-go/v2/refs" "github.com/nspcc-dev/neofs-api-go/v2/session" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" + objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/stretchr/testify/require" ) @@ -26,6 +28,14 @@ type testLocalStorage struct { err error } +type testHeaderSource struct { + header object.Object +} + +func (t *testHeaderSource) Head(_ oid.Address) (*object.Object, error) { + return &t.header, nil +} + func (s *testLocalStorage) Head(addr oid.Address) (*object.Object, error) { require.True(s.t, addr.Container().Equals(s.expAddr.Container())) require.True(s.t, addr.Object().Equals(s.expAddr.Object())) @@ -163,3 +173,92 @@ func checkDefaultAction(t *testing.T, v *eaclSDK.Validator, u *eaclSDK.Validatio require.False(t, fromRule) require.Equal(t, eaclSDK.ActionAllow, actual) } + +func TestV2Split(t *testing.T) { + attrKey := "allow" + attrVal := "me" + + var restrictedAttr object.Attribute + restrictedAttr.SetKey(attrKey) + restrictedAttr.SetValue(attrVal) + + originalObject := objecttest.Object(t) + originalObject.SetAttributes(restrictedAttr) + originalObject.SetID(oid.ID{}) // no object ID for an original object in the first object + originalObject.SetSignature(&neofscrypto.Signature{}) + + firstObject := objecttest.Object(t) + firstObject.SetSplitID(nil) // not V1 split + firstObject.SetParent(&originalObject) + require.NoError(t, firstObject.CalculateAndSetID()) + + var firstIDV2 refs.ObjectID + firstID, _ := firstObject.ID() + firstID.WriteToV2(&firstIDV2) + + splitV2 := new(objectV2.SplitHeader) + splitV2.SetFirst(&firstIDV2) + headerV2 := new(objectV2.Header) + headerV2.SetSplit(splitV2) + + objPart := new(objectV2.PutObjectPartInit) + objPart.SetHeader(headerV2) + + body := new(objectV2.PutRequestBody) + body.SetObjectPart(objPart) + + meta := new(session.RequestMetaHeader) + + req := new(objectV2.PutRequest) + req.SetMetaHeader(meta) + req.SetBody(body) + + priv, err := keys.NewPrivateKey() + require.NoError(t, err) + senderKey := priv.PublicKey() + + r := eaclSDK.NewRecord() + r.SetOperation(eaclSDK.OperationPut) + r.SetAction(eaclSDK.ActionDeny) + r.AddFilter(eaclSDK.HeaderFromObject, eaclSDK.MatchStringEqual, attrKey, attrVal) + eaclSDK.AddFormedTarget(r, eaclSDK.RoleUnknown, (ecdsa.PublicKey)(*senderKey)) + + table := new(eaclSDK.Table) + table.AddRecord(r) + + hdrSrc := &testHeaderSource{header: firstObject} + + newSource := func(t *testing.T) eaclSDK.TypedHeaderSource { + hdrSrc, err := NewMessageHeaderSource( + WithHeaderSource(hdrSrc), + WithServiceRequest(req), + ) + require.NoError(t, err) + return hdrSrc + } + + unit := new(eaclSDK.ValidationUnit). + WithOperation(eaclSDK.OperationPut). + WithEACLTable(table). + WithSenderKey(senderKey.Bytes()) + + validator := eaclSDK.NewValidator() + + t.Run("denied by parent's attribute", func(t *testing.T) { + checkAction(t, eaclSDK.ActionDeny, validator, unit.WithHeaderSource(newSource(t))) + }) + + t.Run("allow cause no restricted attribute found", func(t *testing.T) { + originalObjectNoRestrictedAttr := objecttest.Object(t) + originalObject.SetID(oid.ID{}) // no object ID for an original object in the first object + originalObject.SetSignature(&neofscrypto.Signature{}) + + firstObject.SetParent(&originalObjectNoRestrictedAttr) + require.NoError(t, firstObject.CalculateAndSetID()) + + hdrSrc.header = firstObject + + // allow an object whose first obj does not have the restricted attribute + checkDefaultAction(t, validator, unit.WithHeaderSource(newSource(t))) + }) +} diff --git a/pkg/services/object/acl/eacl/v2/headers.go b/pkg/services/object/acl/eacl/v2/headers.go index 54d3569ac5..672c830309 100644 --- a/pkg/services/object/acl/eacl/v2/headers.go +++ b/pkg/services/object/acl/eacl/v2/headers.go @@ -18,7 +18,8 @@ import ( type Option func(*cfg) type cfg struct { - storage ObjectStorage + storage ObjectStorage + headerSource HeaderSource msg xHeaderSource @@ -30,6 +31,12 @@ type ObjectStorage interface { Head(oid.Address) (*object.Object, error) } +// HeaderSource represents a source of the object headers. +type HeaderSource interface { + // Head returns object (may be with or be without payload) by its address. + Head(oid.Address) (*object.Object, error) +} + type Request interface { GetMetaHeader() *session.RequestMetaHeader } @@ -129,11 +136,39 @@ func (h *cfg) readObjectHeaders(dst *headerSource) error { dst.objectHeaders = addressHeaders(h.cnr, h.obj) case *objectV2.PutRequest: if v, ok := req.GetBody().GetObjectPart().(*objectV2.PutObjectPartInit); ok { - oV2 := new(objectV2.Object) - oV2.SetObjectID(v.GetObjectID()) - oV2.SetHeader(v.GetHeader()) + if v.GetHeader().GetSplit().GetSplitID() != nil { + // V1 split scheme, only the received object's header + // can be checked + oV2 := new(objectV2.Object) + oV2.SetObjectID(v.GetObjectID()) + oV2.SetHeader(v.GetHeader()) - dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj) + dst.objectHeaders = headersFromObject(object.NewFromV2(oV2), h.cnr, h.obj) + + break + } + + if first := v.GetHeader().GetSplit().GetFirst(); first != nil { + // that is an object part from the V2 split scheme, check + // the original object header instead + + var firstID oid.ID + err := firstID.ReadFromV2(*first) + if err != nil { + return fmt.Errorf("converting first object ID: %w", err) + } + + var addr oid.Address + addr.SetObject(firstID) + addr.SetContainer(h.cnr) + + firstObject, err := h.headerSource.Head(addr) + if err != nil { + return fmt.Errorf("fetching first object header: %w", err) + } + + dst.objectHeaders = headersFromObject(firstObject.Parent(), h.cnr, h.obj) + } } case *objectV2.SearchRequest: cnrV2 := req.GetBody().GetContainerID() diff --git a/pkg/services/object/acl/eacl/v2/opts.go b/pkg/services/object/acl/eacl/v2/opts.go index 4a653757fe..5e6c286f04 100644 --- a/pkg/services/object/acl/eacl/v2/opts.go +++ b/pkg/services/object/acl/eacl/v2/opts.go @@ -20,6 +20,12 @@ func WithLocalObjectStorage(v *engine.StorageEngine) Option { } } +func WithHeaderSource(hs HeaderSource) Option { + return func(c *cfg) { + c.headerSource = hs + } +} + func WithServiceRequest(v Request) Option { return func(c *cfg) { c.msg = requestXHeaderSource{ diff --git a/pkg/services/object/acl/v2/service.go b/pkg/services/object/acl/v2/service.go index 33b3ee1c61..2ed3ef880c 100644 --- a/pkg/services/object/acl/v2/service.go +++ b/pkg/services/object/acl/v2/service.go @@ -456,7 +456,9 @@ func (p putStreamBasicChecker) Send(request *objectV2.PutRequest) error { return err } - idV2 := part.GetHeader().GetOwnerID() + header := part.GetHeader() + + idV2 := header.GetOwnerID() if idV2 == nil { return errors.New("missing object owner") } diff --git a/pkg/services/object/get/assemble.go b/pkg/services/object/get/assemble.go index b627846d94..2a9f155968 100644 --- a/pkg/services/object/get/assemble.go +++ b/pkg/services/object/get/assemble.go @@ -44,6 +44,14 @@ func (exec *execCtx) assemble() { } } + if splitInfo.SplitID() == nil { + exec.log.Debug("handling V2 split") + exec.processV2Split(splitInfo) + return + } + + exec.log.Debug("handling V1 split") + prev, children := exec.initFromChild(childID) if len(children) > 0 { diff --git a/pkg/services/object/get/assembly_v2.go b/pkg/services/object/get/assembly_v2.go new file mode 100644 index 0000000000..c3ba5fdd35 --- /dev/null +++ b/pkg/services/object/get/assembly_v2.go @@ -0,0 +1,161 @@ +package getsvc + +import ( + "errors" + + objectSDK "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "go.uber.org/zap" +) + +func (exec *execCtx) processV2Split(si *objectSDK.SplitInfo) { + if _, set := si.FirstPart(); !set { + exec.log.Debug("no first ID found in V2 split") + exec.err = errors.New("v2 split without first object ID") + + return + } + + linkID, set := si.Link() + if set && exec.processV2Link(linkID) { + return + } + + // fallback to the full chain assembly from the last part + prev, set := si.LastPart() + if set { + exec.processV2Last(prev) + } +} + +func (exec *execCtx) processV2Last(lastID oid.ID) { + lastObj, ok := exec.getChild(lastID, nil, true) + if !ok { + exec.log.Debug("failed to read last object") + return + } + + exec.collectedObject = lastObj.Parent() + + // copied from V1, and it has the same problems as V1; + // see it for comments and optimization suggestions + if ok := exec.writeCollectedHeader(); ok { + if ok := exec.overtakePayloadInReverse(lastID); ok { + exec.writeObjectPayload(exec.collectedObject) + } + } +} + +func (exec *execCtx) processV2Link(linkID oid.ID) bool { + linkObj, ok := exec.getChild(linkID, nil, true) + if !ok { + exec.log.Debug("failed to read link object") + return false + } + + exec.collectedObject = linkObj.Parent() + + var link objectSDK.Link + err := linkObj.ReadLink(&link) + if err != nil { + exec.log.Debug("failed to parse link object", zap.Error(err)) + return false + } + + if exec.ctxRange() == nil { + // GET case + + if exec.writeCollectedHeader() { + exec.overtakePayloadDirectly(measuredObjsToIDs(link.Objects()), nil, true) + return true + } + + exec.log.Debug("failed to write parent header") + + return false + } + + // RANGE case + return exec.rangeFromLink(link) +} + +func (exec *execCtx) rangeFromLink(link objectSDK.Link) bool { + children := link.Objects() + first, firstOffset, last, lastBound := requiredChildren(exec.ctxRange(), children) + + for i := first; i <= last; i++ { + child := children[i] + + var rngPerChild *objectSDK.Range + if i == first || i == last { + rngPerChild = new(objectSDK.Range) + + if i == first { + rngPerChild.SetOffset(uint64(firstOffset)) + } + if i == last { + rngPerChild.SetLength(uint64(child.ObjectSize()) - uint64(lastBound+firstOffset)) + } + } + + part, ok := exec.getChild(child.ObjectID(), rngPerChild, false) + if !ok { + return false + } + + if exec.writeObjectPayload(part) { + return false + } + } + + exec.status = statusOK + exec.err = nil + + return true +} + +// it is required for ranges to be in the bounds of the all objects' payload; +// it must be checked on higher levels; returns (firstObject, firstObjectOffset, +// lastObject, lastObjectRightBound). +func requiredChildren(rng *objectSDK.Range, children []objectSDK.MeasuredObject) (int, int, int, int) { + var firstChildIndex = -1 + var firstChildOffset int + var lastChildIndex int + var lastChildRightBound int + + leftBound := rng.GetOffset() + rightBound := leftBound + rng.GetLength() + + var bytesSeen uint64 + + for i, child := range children { + size := uint64(child.ObjectSize()) + bytesSeen += size + + if bytesSeen < leftBound { + continue + } + + if firstChildIndex == -1 { + firstChildIndex = i + firstChildOffset = int(size - (bytesSeen - leftBound)) + } + + if rightBound <= bytesSeen { + lastChildIndex = i + lastChildRightBound = int(size - (bytesSeen - rightBound)) + break + } + } + + return firstChildIndex, firstChildOffset, lastChildIndex, lastChildRightBound +} + +func measuredObjsToIDs(mm []objectSDK.MeasuredObject) []oid.ID { + res := make([]oid.ID, 0, len(mm)) + for i := range mm { + res = append(res, mm[i].ObjectID()) + } + + return res +} diff --git a/pkg/services/object/get/assembly_v2_test.go b/pkg/services/object/get/assembly_v2_test.go new file mode 100644 index 0000000000..e5a0e6f07b --- /dev/null +++ b/pkg/services/object/get/assembly_v2_test.go @@ -0,0 +1,87 @@ +package getsvc + +import ( + "testing" + + "github.com/nspcc-dev/neofs-sdk-go/object" + "github.com/stretchr/testify/require" +) + +func Test_RequiredChildren(t *testing.T) { + cases := []struct { + name string + childPayloads [][]byte + rngFrom uint64 + rngLength uint64 + expectRes []byte + }{ + { + name: "normal, same length", + childPayloads: [][]byte{{0, 1, 2}, {3, 4, 3}, {2, 1, 0}}, + rngFrom: 4, + rngLength: 3, + expectRes: []byte{4, 3, 2}, + }, + { + name: "normal, same length, range equals obj's bounds", + childPayloads: [][]byte{{0, 1, 2}, {3, 4, 3}, {2, 1, 0}}, + rngFrom: 3, + rngLength: 3, + expectRes: []byte{3, 4, 3}, + }, + { + name: "strange split, different length", + childPayloads: [][]byte{{0, 1, 2, 4, 5, 6}, {5}, {4, 3, 2}, {1, 0}}, + rngFrom: 4, + rngLength: 4, + expectRes: []byte{5, 6, 5, 4}, + }, + { + name: "strange split, obj with empty payload", + childPayloads: [][]byte{{0, 1, 2}, {}, {}, {1, 0}}, + rngFrom: 2, + rngLength: 2, + expectRes: []byte{2, 1}, + }, + } + + for _, test := range cases { + t.Run(test.name, func(t *testing.T) { + payloads := test.childPayloads + + children := make([]object.MeasuredObject, 0, len(payloads)) + for _, payload := range payloads { + var child object.MeasuredObject + child.SetObjectSize(uint32(len(payload))) + + children = append(children, child) + } + + var rng object.Range + rng.SetOffset(test.rngFrom) + rng.SetLength(test.rngLength) + + firstChild, firstChildOffset, lastChild, lastChildBound := requiredChildren(&rng, children) + + // collect payload + var res []byte + for i := firstChild; i <= lastChild; i++ { + var leftBound int + var rightBound = int(children[i].ObjectSize()) + + if i == firstChild { + leftBound = firstChildOffset + } + + if i == lastChild { + rightBound = lastChildBound + } + + payloadFromChild := payloads[i][leftBound:rightBound] + res = append(res, payloadFromChild...) + } + + require.Equal(t, test.expectRes, res) + }) + } +} diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index cb55a5fbbd..ad00474628 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -264,6 +264,10 @@ func mergeSplitInfo(dst, src *objectSDK.SplitInfo) { dst.SetLink(link) } + if first, ok := src.FirstPart(); ok { + dst.SetFirstPart(first) + } + if splitID := src.SplitID(); splitID != nil { dst.SetSplitID(splitID) } diff --git a/pkg/services/object/get/get_test.go b/pkg/services/object/get/get_test.go index baf5b61ab2..16f7fcec29 100644 --- a/pkg/services/object/get/get_test.go +++ b/pkg/services/object/get/get_test.go @@ -193,6 +193,11 @@ func (s *testStorage) inhume(addr oid.Address) { s.inhumed[addr.EncodeToString()] = struct{}{} } +const ( + splitV1 = iota + splitV2 +) + func generateObject(addr oid.Address, prev *oid.ID, payload []byte, children ...oid.ID) *objectSDK.Object { obj := objectSDK.New() obj.SetContainerID(addr.Container()) @@ -359,38 +364,51 @@ func TestGetLocalOnly(t *testing.T) { p := newPrm(true, nil) - addr := oidtest.Address() + testSplit := func(addr oid.Address, si *objectSDK.SplitInfo) { + p.WithAddress(addr) - splitInfo := objectSDK.NewSplitInfo() - splitInfo.SetSplitID(objectSDK.NewSplitID()) - splitInfo.SetLink(oidtest.ID()) - splitInfo.SetLastPart(oidtest.ID()) + storage.addVirtual(addr, si) - p.WithAddress(addr) + err := svc.Get(ctx, p) - storage.addVirtual(addr, splitInfo) + errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo()) - err := svc.Get(ctx, p) + require.True(t, errors.As(err, &errSplit)) - errSplit := objectSDK.NewSplitInfoError(objectSDK.NewSplitInfo()) + require.Equal(t, si, errSplit.SplitInfo()) - require.True(t, errors.As(err, &errSplit)) + rngPrm := newRngPrm(true, nil, 0, 0) + rngPrm.WithAddress(addr) - require.Equal(t, splitInfo, errSplit.SplitInfo()) + err = svc.Get(ctx, p) - rngPrm := newRngPrm(true, nil, 0, 0) - rngPrm.WithAddress(addr) + require.True(t, errors.As(err, &errSplit)) - err = svc.Get(ctx, p) + headPrm := newHeadPrm(true, nil) + headPrm.WithAddress(addr) - require.True(t, errors.As(err, &errSplit)) + err = svc.Head(ctx, headPrm) + require.True(t, errors.As(err, &errSplit)) + require.Equal(t, si, errSplit.SplitInfo()) + } - headPrm := newHeadPrm(true, nil) - headPrm.WithAddress(addr) + t.Run("V1 split", func(t *testing.T) { + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetSplitID(objectSDK.NewSplitID()) + splitInfo.SetLink(oidtest.ID()) + splitInfo.SetLastPart(oidtest.ID()) - err = svc.Head(ctx, headPrm) - require.True(t, errors.As(err, &errSplit)) - require.Equal(t, splitInfo, errSplit.SplitInfo()) + testSplit(oidtest.Address(), splitInfo) + }) + + t.Run("V2 split", func(t *testing.T) { + splitInfo := objectSDK.NewSplitInfo() + splitInfo.SetLink(oidtest.ID()) + splitInfo.SetLastPart(oidtest.ID()) + splitInfo.SetFirstPart(oidtest.ID()) + + testSplit(oidtest.Address(), splitInfo) + }) }) } @@ -698,6 +716,7 @@ func TestGetRemoteSmall(t *testing.T) { splitInfo := objectSDK.NewSplitInfo() splitInfo.SetLink(oidtest.ID()) + splitInfo.SetSplitID(objectSDK.NewSplitID()) var splitAddr oid.Address splitAddr.SetContainer(idCnr) @@ -753,6 +772,7 @@ func TestGetRemoteSmall(t *testing.T) { splitInfo := objectSDK.NewSplitInfo() splitInfo.SetLink(oidtest.ID()) + splitInfo.SetSplitID(objectSDK.NewSplitID()) children, childIDs, _ := generateChain(2, idCnr) @@ -827,6 +847,7 @@ func TestGetRemoteSmall(t *testing.T) { splitInfo := objectSDK.NewSplitInfo() splitInfo.SetLink(oidtest.ID()) + splitInfo.SetSplitID(objectSDK.NewSplitID()) children, childIDs, payload := generateChain(2, idCnr) srcObj.SetPayload(payload) @@ -914,6 +935,7 @@ func TestGetRemoteSmall(t *testing.T) { splitInfo := objectSDK.NewSplitInfo() splitInfo.SetLastPart(oidtest.ID()) + splitInfo.SetSplitID(objectSDK.NewSplitID()) var splitAddr oid.Address splitAddr.SetContainer(idCnr) @@ -969,6 +991,7 @@ func TestGetRemoteSmall(t *testing.T) { splitInfo := objectSDK.NewSplitInfo() splitInfo.SetLastPart(oidtest.ID()) + splitInfo.SetSplitID(objectSDK.NewSplitID()) children, _, _ := generateChain(2, idCnr) @@ -1036,6 +1059,7 @@ func TestGetRemoteSmall(t *testing.T) { splitInfo := objectSDK.NewSplitInfo() splitInfo.SetLastPart(oidtest.ID()) + splitInfo.SetSplitID(objectSDK.NewSplitID()) children, _, payload := generateChain(2, idCnr) srcObj.SetPayloadSize(uint64(len(payload))) diff --git a/pkg/services/object/put/service.go b/pkg/services/object/put/service.go index 48067f6524..f6d14ccef1 100644 --- a/pkg/services/object/put/service.go +++ b/pkg/services/object/put/service.go @@ -136,6 +136,12 @@ func WithNetworkState(v netmap.State) Option { } } +func WithSplitChainVerifier(sv object.SplitVerifier) Option { + return func(c *cfg) { + c.fmtValidatorOpts = append(c.fmtValidatorOpts, object.WithSplitVerifier(sv)) + } +} + func WithClientConstructor(v ClientConstructor) Option { return func(c *cfg) { c.clientConstructor = v diff --git a/pkg/services/object/split/verify.go b/pkg/services/object/split/verify.go new file mode 100644 index 0000000000..28292b4408 --- /dev/null +++ b/pkg/services/object/split/verify.go @@ -0,0 +1,95 @@ +package split + +import ( + "context" + "fmt" + + getsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/get" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "golang.org/x/sync/errgroup" +) + +// NewVerifier returns Verifier that ready to Verifier.VerifySplit. +// Get service must be non-nil, otherwise stable work is not +// guaranteed. +func NewVerifier(get *getsvc.Service) *Verifier { + return &Verifier{ + get: get, + } +} + +// Verifier implements [object.SplitVerifier] interface. +type Verifier struct { + get *getsvc.Service +} + +// VerifySplit verifies split chain. +func (v *Verifier) VerifySplit(ctx context.Context, cnr cid.ID, firstID oid.ID, childrenFromLink []object.MeasuredObject) error { + // can be limited, depends on the expected big objects payload length + var wg errgroup.Group + + for i := range childrenFromLink { + iCopy := i + wg.Go(func() error { + var shouldHaveFirstObject *oid.ID + if iCopy != 0 { + shouldHaveFirstObject = &firstID + } + + return v.verifySinglePart(ctx, cnr, shouldHaveFirstObject, childrenFromLink[iCopy]) + }) + } + + return wg.Wait() +} + +type headerWriter struct { + h *object.Object +} + +func (w *headerWriter) WriteHeader(o *object.Object) error { + w.h = o + return nil +} + +func (v *Verifier) verifySinglePart(ctx context.Context, cnr cid.ID, firstID *oid.ID, objToCheck object.MeasuredObject) error { + var childAddr oid.Address + childAddr.SetContainer(cnr) + childAddr.SetObject(objToCheck.ObjectID()) + + var hw headerWriter + + // no custom common prms since a caller is expected to be a container + // participant so no additional headers, access tokens, etc + var prm getsvc.HeadPrm + prm.SetHeaderWriter(&hw) + prm.WithAddress(childAddr) + prm.WithRawFlag(true) + + err := v.get.Head(ctx, prm) + if err != nil { + return fmt.Errorf("reading %s header: %w", childAddr, err) + } + + if firstID != nil { + idRead, has := hw.h.FirstID() + if !has { + return readObjectErr(childAddr, "object that does not have first object's ID") + } + if idRead != *firstID { + return readObjectErr(childAddr, fmt.Sprintf("its first object is unknown: got: %s, want: %s", idRead, firstID)) + } + } + + if sizeRead := uint32(hw.h.PayloadSize()); sizeRead != objToCheck.ObjectSize() { + return readObjectErr(childAddr, fmt.Sprintf("its size differs: got: %d, want: %d", sizeRead, objToCheck.ObjectSize())) + } + + return nil +} + +func readObjectErr(a oid.Address, text string) error { + return fmt.Errorf("read %s object: %s", a, text) +}