Skip to content

Commit

Permalink
node/object: Serve SearchV2 RPC
Browse files Browse the repository at this point in the history
WIP

Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 11, 2025
1 parent 9e2e544 commit d89f165
Show file tree
Hide file tree
Showing 11 changed files with 704 additions and 27 deletions.
112 changes: 102 additions & 10 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
containercore "github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
morphClient "github.com/nspcc-dev/neofs-node/pkg/morph/client"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/network"
objectService "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl"
v2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2"
Expand All @@ -33,7 +35,9 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/services/replicator"
truststorage "github.com/nspcc-dev/neofs-node/pkg/services/reputation/local/storage"
"github.com/nspcc-dev/neofs-sdk-go/client"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto"
eaclSDK "github.com/nspcc-dev/neofs-sdk-go/eacl"
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
objectSDK "github.com/nspcc-dev/neofs-sdk-go/object"
Expand All @@ -42,6 +46,7 @@ import (
apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type objectSvc struct {
Expand Down Expand Up @@ -131,6 +136,90 @@ func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) {

type coreClientConstructor reputationClientConstructor

func (x *coreClientConstructor) SendSearchRequest(ctx context.Context, node netmapsdk.NodeInfo, req *protoobject.SearchV2Request) ([]client.SearchResultItem, bool, error) {
// TODO: copy-pasted from old search implementation, consider deduplicating in
// the client constructor
var endpoints network.AddressGroup
if err := endpoints.FromIterator(network.NodeEndpointsIterator(node)); err != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control.
return nil, false, fmt.Errorf("failed to decode network endpoints of the storage node from the network map: %w", err)
}
var info coreclient.NodeInfo
info.SetAddressGroup(endpoints)
info.SetPublicKey(node.PublicKey())
c, err := x.Get(info)
if err != nil {
return nil, false, fmt.Errorf("get node client: %w", err)
}

var resp *protoobject.SearchV2Response
var firstErr error
for i := range endpoints {
if err = c.RawForAddress(endpoints[i], func(conn *grpc.ClientConn) error {
var err error
resp, err = protoobject.NewObjectServiceClient(conn).SearchV2(ctx, req)
return err
}); err == nil {
break
}
if firstErr == nil {
firstErr = fmt.Errorf("send request over gRPC: %w", err)
}
}
if firstErr != nil {
return nil, false, firstErr
}

if !bytes.Equal(resp.GetVerifyHeader().GetBodySignature().GetKey(), node.PublicKey()) {
return nil, false, coreclient.ErrWrongPublicKey
}
if err := neofscrypto.VerifyResponseWithBuffer(resp, nil); err != nil {
return nil, false, fmt.Errorf("response verification failed: %w", err)
}
if err := apistatus.ToError(resp.GetMetaHeader().GetStatus()); err != nil {
return nil, false, err
}
// TODO: copy-pasted from SDK (*) client code, consider sharing
// (*) added check that cursor is set only when result has count items
if resp.Body == nil {
return nil, false, nil
}
n := uint32(len(resp.Body.Result))
if n == 0 {
if resp.Body.Cursor != "" {
return nil, false, errors.New("invalid response body: cursor is set with empty result")
}
return nil, false, nil
}
if reqCursor := req.Body.Cursor; reqCursor != "" && resp.Body.Cursor == reqCursor {
return nil, false, errors.New("invalid response body: cursor repeats the initial one")
}
if n > req.Body.Count {
return nil, false, errors.New("invalid response body: more items than requested")
}
if resp.Body.Cursor != "" && n < req.Body.Count {
return nil, false, fmt.Errorf("invalid response body: cursor is set with less items than requested %d < %d", n, req.Body.Count)
}

res := make([]client.SearchResultItem, n)
for i, r := range resp.Body.Result {
switch {
case r == nil:
return nil, false, fmt.Errorf("invalid response body: nil element #%d", i)
case r.Id == nil:
return nil, false, fmt.Errorf("invalid response body: invalid element #%d: missing ID", i)
case len(r.Attributes) != len(req.Body.Attributes):
return nil, false, fmt.Errorf("invalid response body: invalid element #%d: wrong attribute count %d", i, len(r.Attributes))
}
if err = res[i].ID.FromProtoMessage(r.Id); err != nil {
return nil, false, fmt.Errorf("invalid response body: invalid element #%d: invalid ID: %w", i, err)
}
res[i].Attributes = r.Attributes
}
return res, resp.Body.Cursor != "", nil
}

func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.MultiAddressClient, error) {
c, err := (*reputationClientConstructor)(x).Get(info)
if err != nil {
Expand Down Expand Up @@ -303,10 +392,11 @@ func initObjectService(c *cfg) {
)

storage := storageForObjectService{
local: ls,
putSvc: sPut,
keys: keyStorage,
}
server := objectService.New(objSvc, mNumber, fsChain, storage, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc)
server := objectService.New(objSvc, mNumber, fsChain, storage, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor)

for _, srv := range c.cfgGRPC.servers {
protoobject.RegisterObjectServiceServer(srv, server)
Expand Down Expand Up @@ -594,13 +684,9 @@ func newFSChainForObjects(cnrNodes *containerNodes, isLocalPubKey func([]byte) b
}
}

// ForEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
//
// Implements [object.Node] interface.
func (x *fsChainForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
// ForEachContainerNodeInLastTwoEpochs implements [objectService.FSChain] interface.
func (x *fsChainForObjects) ForEachContainerNodeInLastTwoEpochs(id cid.ID, f func(netmapsdk.NodeInfo) bool) error {
return x.containerNodes.forEachContainerNode(id, true, f)
}

// IsOwnPublicKey checks whether given binary-encoded public key is assigned to
Expand All @@ -616,10 +702,16 @@ func (x *fsChainForObjects) IsOwnPublicKey(pubKey []byte) bool {
func (x *fsChainForObjects) LocalNodeUnderMaintenance() bool { return x.isMaintenance.Load() }

type storageForObjectService struct {
local *engine.StorageEngine
putSvc *putsvc.Service
keys *util.KeyStorage
}

// SearchObjects implements [objectService.Storage] interface.
func (x storageForObjectService) SearchObjects(cnr cid.ID, fs objectSDK.SearchFilters, attrs []string, cursor *meta.SearchCursor, count uint16) ([]client.SearchResultItem, *meta.SearchCursor, error) {
return x.local.Search(cnr, fs, attrs, cursor, count)
}

func (x storageForObjectService) VerifyAndStoreObjectLocally(obj objectSDK.Object) error {
return x.putSvc.ValidateAndStoreObjectLocally(obj)
}
Expand Down Expand Up @@ -699,8 +791,8 @@ type netmapSourceWithNodes struct {

func (n netmapSourceWithNodes) ServerInContainer(cID cid.ID) (bool, error) {
var serverInContainer bool
err := n.fsChain.ForEachContainerNodePublicKeyInLastTwoEpochs(cID, func(pubKey []byte) bool {
if n.fsChain.isLocalPubKey(pubKey) {
err := n.fsChain.ForEachContainerNodeInLastTwoEpochs(cID, func(node netmapsdk.NodeInfo) bool {
if n.fsChain.isLocalPubKey(node.PublicKey()) {
serverInContainer = true
return false
}
Expand Down
9 changes: 0 additions & 9 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con
}, nil
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error {
return x.forEachContainerNode(cnrID, true, func(node netmapsdk.NodeInfo) bool {
return f(node.PublicKey())
})
}

func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, f func(netmapsdk.NodeInfo) bool) error {
curEpoch, err := x.network.Epoch()
if err != nil {
Expand Down
111 changes: 111 additions & 0 deletions pkg/core/object/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package object

import (
"bytes"
"fmt"
"math/big"
"slices"
"strings"

"github.com/nspcc-dev/neofs-sdk-go/client"
)

// TODO: docs.
type SearchResultMerger struct {
limit, n uint16
more bool
buf []client.SearchResultItem
iBuf, iNext *big.Int
}

// TODO: docs.
func NewSearchResultMerger(limit uint16) *SearchResultMerger {
return &SearchResultMerger{limit: limit}
}

// TODO: docs.
func (x *SearchResultMerger) Add(withCursor bool, next []client.SearchResultItem) {
if len(next) > int(x.limit) {
panic(fmt.Sprintf("limit of items exceeded %d > %d", len(next), x.limit))
}
if withCursor && len(next) < int(x.limit) {
panic(fmt.Sprintf("cursor is set with fewer items %d < %d", len(next), x.limit))
}
if len(next) == 0 {
return
}
if x.buf == nil {
x.buf = slices.Grow(next, int(x.limit))[:x.limit]
x.n = uint16(len(next))
x.more = withCursor
if len(next[0].Attributes) > 0 {
x.iBuf, x.iNext = new(big.Int), new(big.Int)
}
return
}
buf := x.buf
n := x.n
next:
for len(next) > 0 {
for i := range n {
if next[0].ID == buf[i].ID {
if len(next) == 1 {
x.more = x.more || withCursor
return
}
buf, next = buf[i+1:], next[1:]
n -= uint16(i) + 1
continue next
}
}
var cmpInt bool
if x.iBuf != nil {
_, cmpInt = x.iNext.SetString(next[0].Attributes[0], 10)
}
var i uint16
for i = 0; i < n; i++ { // do not use range: if next[0] is the biggest, i=n is desired, following condition catches
if x.iBuf != nil {
if cmpInt {
_, cmpInt = x.iBuf.SetString(buf[i].Attributes[0], 10)
}
if cmpInt {
if c := x.iNext.Cmp(x.iBuf); c < 0 {
break
} else if c > 0 {
continue
}
} else {
if c := strings.Compare(next[0].Attributes[0], buf[i].Attributes[0]); c < 0 {
break
} else if c > 0 {
continue
}
}
}
if c := bytes.Compare(next[0].ID[:], buf[i].ID[:]); c < 0 { // == 0 caught above
break
}
}
if i == uint16(len(buf)) {
x.more = true
return
}
if i == n {
cpd := copy(buf[n:], next)
x.n += uint16(cpd)
x.more = withCursor || cpd < len(next)
return
}
x.more = x.more || n == uint16(len(buf))
copy(buf[i+1:], buf[i:n])
buf[i] = next[0]
x.n = min(x.n+1, x.limit)
buf, next = buf[i+1:], next[1:]
n -= i
}
}

// TODO: docs.
func (x *SearchResultMerger) Fin() ([]client.SearchResultItem, bool) {
return x.buf[:x.n], x.more
}
Loading

0 comments on commit d89f165

Please sign in to comment.