Skip to content

Commit

Permalink
*: Adopt new big objects split
Browse files Browse the repository at this point in the history
On reading:
Support both split chain versions a new and an old one.
On writing:
Split objects only with the V2 version:
 1. Link objects do not have child objects in their headers, payload is used
 instead
 2. Link objects contain child objects' sizes along with the IDs
 3. SplitID is not attached to the objects anymore
 4. The First part's ID of a big object is attached to every split part (a link
 has it too, but obviously the first part itself does not have its ID in
 its split header)
 5. Receiving link object triggers verification of its children: they should
 have the same first object and have the same payload size as the link
 declares
 6. The First object part now has an original object's paylaod (without
 payload-dependent fields set).

 Closes #2667.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Feb 29, 2024
1 parent f87a375 commit e20f436
Show file tree
Hide file tree
Showing 26 changed files with 901 additions and 64 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Changelog for NeoFS Node
- IR now checks format of NULL and numeric eACL filters specified in the protocol (#2742)
- Empty filter value is now treated as `NOT_PRESENT` op by CLI `acl extended create` cmd (#2742)
- Storage nodes no longer accept objects with header larger than 16KB (#2749)
- Big objects are split with the new split scheme (#2667)

### Removed
- Object notifications incl. NATS (#2750)
Expand Down
102 changes: 81 additions & 21 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"

lru "github.com/hashicorp/golang-lru/v2"
"github.com/nspcc-dev/neofs-api-go/v2/object"
objectGRPC "github.com/nspcc-dev/neofs-api-go/v2/object/grpc"
replicatorconfig "github.com/nspcc-dev/neofs-node/cmd/neofs-node/config/replicator"
Expand All @@ -28,6 +29,7 @@ import (
putsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/put/v2"
searchsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/search"
searchsvcV2 "github.com/nspcc-dev/neofs-node/pkg/services/object/search/v2"
"github.com/nspcc-dev/neofs-node/pkg/services/object/split"
"github.com/nspcc-dev/neofs-node/pkg/services/object/util"
"github.com/nspcc-dev/neofs-node/pkg/services/object_manager/placement"
"github.com/nspcc-dev/neofs-node/pkg/services/policer"
Expand Down Expand Up @@ -224,6 +226,26 @@ func initObjectService(c *cfg) {

c.workers = append(c.workers, c.shared.policer)

sGet := getsvc.New(
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(coreConstructor),
getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.netMapSource),
getsvc.WithKeyStorage(keyStorage),
)

*c.cfgObject.getSvc = *sGet // need smth better

sGetV2 := getsvcV2.NewService(
getsvcV2.WithInternalService(sGet),
getsvcV2.WithKeyStorage(keyStorage),
)

sPut := putsvc.NewService(
putsvc.WithKeyStorage(keyStorage),
putsvc.WithClientConstructor(putConstructor),
Expand All @@ -235,6 +257,7 @@ func initObjectService(c *cfg) {
putsvc.WithNetworkState(c.cfgNetmap.state),
putsvc.WithWorkerPools(c.cfgObject.pool.putRemote, c.cfgObject.pool.putLocal),
putsvc.WithLogger(c.log),
putsvc.WithSplitChainVerifier(split.NewVerifier(sGet)),
)

sPutV2 := putsvcV2.NewService(
Expand All @@ -260,26 +283,6 @@ func initObjectService(c *cfg) {
searchsvcV2.WithKeyStorage(keyStorage),
)

sGet := getsvc.New(
getsvc.WithLogger(c.log),
getsvc.WithLocalStorageEngine(ls),
getsvc.WithClientConstructor(coreConstructor),
getsvc.WithTraverserGenerator(
traverseGen.WithTraverseOptions(
placement.SuccessAfter(1),
),
),
getsvc.WithNetMapSource(c.netMapSource),
getsvc.WithKeyStorage(keyStorage),
)

*c.cfgObject.getSvc = *sGet // need smth better

sGetV2 := getsvcV2.NewService(
getsvcV2.WithInternalService(sGet),
getsvcV2.WithKeyStorage(keyStorage),
)

sDelete := deletesvc.New(
deletesvc.WithLogger(c.log),
deletesvc.WithHeadService(sGet),
Expand Down Expand Up @@ -312,6 +315,13 @@ func initObjectService(c *cfg) {
},
)

// cachedFirstObjectsNumber is a total cached objects number; the V2 split scheme
// expects the first part of the chain to hold a user-defined header of the original
// object which should be treated as a header to use for the eACL rules check; so
// every object part in every chain will try to refer to the first part, so caching
// should help a lot here
const cachedFirstObjectsNumber = 1000

aclSvc := v2.New(
v2.WithLogger(c.log),
v2.WithIRFetcher(newCachedIRFetcher(irFetcher)),
Expand All @@ -325,7 +335,8 @@ func initObjectService(c *cfg) {
SetNetmapState(c.cfgNetmap.state).
SetEACLSource(c.cfgObject.eaclSource).
SetValidator(eaclSDK.NewValidator()).
SetLocalStorage(ls),
SetLocalStorage(ls).
SetHeaderSource(cachedHeaderSource(sGet, cachedFirstObjectsNumber)),
),
),
)
Expand Down Expand Up @@ -535,3 +546,52 @@ func (e storageEngine) Lock(locker oid.Address, toLock []oid.ID) error {
func (e storageEngine) Put(o *objectSDK.Object) error {
return engine.Put(e.engine, o)
}

func cachedHeaderSource(getSvc *getsvc.Service, cacheSize int) headerSource {
hs := headerSource{getsvc: getSvc}

if cacheSize > 0 {
hs.cache, _ = lru.New[oid.Address, *objectSDK.Object](cacheSize)
}

return hs
}

type headerSource struct {
getsvc *getsvc.Service
cache *lru.Cache[oid.Address, *objectSDK.Object]
}

type headerWriter struct {
h *objectSDK.Object
}

func (h *headerWriter) WriteHeader(o *objectSDK.Object) error {
h.h = o
return nil
}

func (h headerSource) Head(address oid.Address) (*objectSDK.Object, error) {
if h.cache != nil {
head, ok := h.cache.Get(address)
if ok {
return head, nil
}
}

var hw headerWriter

// no custom common prms since a caller is expected to be a container
// participant so no additional headers, access tokens, etc
var prm getsvc.HeadPrm
prm.SetHeaderWriter(&hw)
prm.WithAddress(address)
prm.WithRawFlag(true)

err := h.getsvc.Head(context.Background(), prm)
if err != nil {
return nil, fmt.Errorf("reading header: %w", err)
}

return hw.h, nil
}
87 changes: 85 additions & 2 deletions pkg/core/object/fmt.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package object

import (
"context"
"errors"
"fmt"
"strconv"
Expand All @@ -23,6 +24,7 @@ type FormatValidatorOption func(*cfg)
type cfg struct {
netState netmap.State
e LockSource
sv SplitVerifier
}

// DeleteHandler is an interface of delete queue processor.
Expand All @@ -49,6 +51,18 @@ type Locker interface {
Lock(idCnr cid.ID, locker oid.ID, locked []oid.ID) error
}

// SplitVerifier represent split validation unit. It verifies V2 split
// chains based on the link object's payload (list of ID+ObjectSize pairs).
type SplitVerifier interface {
// VerifySplit must verify split hierarchy and return any error that did
// not allow processing the chain. Must break (if possible) any internal
// computations if context is done. The second and the third args are the
// first part's address used as a chain unique identifier that also must
// be checked. The fourth arg is guaranteed to be the full list from the
// link's payload without item order change.
VerifySplit(context.Context, cid.ID, oid.ID, []object.MeasuredObject) error
}

var errNilObject = errors.New("object is nil")

var errNilID = errors.New("missing identifier")
Expand Down Expand Up @@ -103,6 +117,41 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error {
return err
}

_, firstSet := obj.FirstID()
splitID := obj.SplitID()
par := obj.Parent()

if obj.HasParent() {
if splitID != nil {
// V1 split
if firstSet {
return errors.New("v1 split: first ID object is set")
}
} else {
// V2 split

if !firstSet {
// first part only
if obj.Parent() == nil {
return errors.New("v2 split: first object part does not have parent header")
}
} else {
// 2nd+ parts

typ := obj.Type()

// link object only
if typ == object.TypeLink && (par == nil || par.Signature() == nil) {
return errors.New("v2 split: incorrect link object's parent header")
}

if _, hasPrevious := obj.PreviousID(); typ != object.TypeLink && !hasPrevious {
return errors.New("v2 split: middle part does not have previous object ID")
}
}
}
}

if err := v.checkAttributes(obj); err != nil {
return fmt.Errorf("invalid attributes: %w", err)
}
Expand All @@ -121,9 +170,9 @@ func (v *FormatValidator) Validate(obj *object.Object, unprepared bool) error {
}
}

if obj = obj.Parent(); obj != nil {
if par != nil && (firstSet || splitID != nil) {
// Parent object already exists.
return v.Validate(obj, false)
return v.Validate(par, false)
}

return nil
Expand Down Expand Up @@ -161,6 +210,7 @@ func (v *FormatValidator) validateSignatureKey(obj *object.Object) error {
// is one of:
// - object.TypeTombstone;
// - object.TypeStorageGroup;
// - object.TypeLink;
// - object.TypeLock.
type ContentMeta struct {
typ object.Type
Expand Down Expand Up @@ -191,6 +241,32 @@ func (v *FormatValidator) ValidateContent(o *object.Object) (ContentMeta, error)
switch o.Type() {
case object.TypeRegular:
// ignore regular objects, they do not need payload formatting
case object.TypeLink:
if len(o.Payload()) == 0 {
return ContentMeta{}, fmt.Errorf("(%T) empty payload in the link object", v)
}

firstObjID, set := o.FirstID()
if !set {
return ContentMeta{}, errors.New("link object does not have first object ID")
}

cnr, set := o.ContainerID()
if !set {
return ContentMeta{}, errors.New("link object does not have container ID")
}

var testLink object.Link

err := o.ReadLink(&testLink)
if err != nil {
return ContentMeta{}, fmt.Errorf("reading link object's payload: %w", err)
}

err = v.sv.VerifySplit(context.Background(), cnr, firstObjID, testLink.Objects())
if err != nil {
return ContentMeta{}, fmt.Errorf("link object's split chain verification: %w", err)
}
case object.TypeTombstone:
if len(o.Payload()) == 0 {
return ContentMeta{}, fmt.Errorf("(%T) empty payload in tombstone", v)
Expand Down Expand Up @@ -392,3 +468,10 @@ func WithLockSource(e LockSource) FormatValidatorOption {
c.e = e
}
}

// WithSplitVerifier returns option to set a SplitVerifier.
func WithSplitVerifier(sv SplitVerifier) FormatValidatorOption {
return func(c *cfg) {
c.sv = sv
}
}
57 changes: 57 additions & 0 deletions pkg/core/object/fmt_test.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package object

import (
"context"
"strconv"
"testing"

"github.com/nspcc-dev/neo-go/pkg/crypto/keys"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
neofsecdsa "github.com/nspcc-dev/neofs-sdk-go/crypto/ecdsa"
Expand Down Expand Up @@ -344,3 +346,58 @@ func TestFormatValidator_Validate(t *testing.T) {
})
})
}

type testSplitVerifier struct {
}

func (t *testSplitVerifier) VerifySplit(ctx context.Context, cID cid.ID, firstID oid.ID, children []object.MeasuredObject) error {
return nil
}

func TestLinkObjectSplitV2(t *testing.T) {
verifier := new(testSplitVerifier)

v := NewFormatValidator(
WithSplitVerifier(verifier),
)

ownerKey, err := keys.NewPrivateKey()
require.NoError(t, err)

signer := user.NewAutoIDSigner(ownerKey.PrivateKey)
obj := blankValidObject(signer)
obj.SetParent(object.New())
obj.SetSplitID(object.NewSplitID())
obj.SetFirstID(oidtest.ID())

t.Run("V1 split, first is set", func(t *testing.T) {
require.ErrorContains(t, v.Validate(obj, true), "first ID object is set")
})

t.Run("V2 split", func(t *testing.T) {
obj.ResetRelations()
obj.SetParent(object.New())

t.Run("first object is not set", func(t *testing.T) {
require.ErrorContains(t, v.Validate(obj, true), "first object part does not have parent header")
})

t.Run("link object without finished parent", func(t *testing.T) {
obj.ResetRelations()
obj.SetParent(object.New())
obj.SetFirstID(oidtest.ID())
obj.SetType(object.TypeLink)

require.ErrorContains(t, v.Validate(obj, true), "incorrect link object's parent header")
})

t.Run("middle child does not have previous object ID", func(t *testing.T) {
obj.ResetRelations()
obj.SetParent(object.New())
obj.SetFirstID(oidtest.ID())
obj.SetType(object.TypeRegular)

require.ErrorContains(t, v.Validate(obj, true), "middle part does not have previous object ID")
})
})
}
4 changes: 4 additions & 0 deletions pkg/local_object_storage/metabase/VERSION.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ Numbers stand for a single byte value.
- Name: container ID + `9`
- Key: object ID
- Value: marshaled object
- Buckets containing object or LINK type
- Name: container ID + `18`
- Key: object ID
- Value: marshaled object
- Buckets mapping objects to the storage ID they are stored in
- Name: container ID + `10`
- Key: object ID
Expand Down
6 changes: 6 additions & 0 deletions pkg/local_object_storage/metabase/containers.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ func (db *DB) DeleteContainer(cID cid.ID) error {
return fmt.Errorf("root object's bucket cleanup: %w", err)
}

// Link objects
err = tx.DeleteBucket(linkObjectsBucketName(cID, buff))
if err != nil && !errors.Is(err, bbolt.ErrBucketNotFound) {
return fmt.Errorf("link objects' bucket cleanup: %w", err)
}

// indexes

err = tx.DeleteBucket(ownerBucketName(cID, buff))
Expand Down
Loading

0 comments on commit e20f436

Please sign in to comment.