Skip to content

Commit

Permalink
node/object: Serve SearchV2 RPC
Browse files Browse the repository at this point in the history
Refs #3058.

Signed-off-by: Leonard Lyubich <leonard@morphbits.io>
  • Loading branch information
cthulhu-rider committed Feb 11, 2025
1 parent 64dac4c commit 1c3f5fe
Show file tree
Hide file tree
Showing 13 changed files with 776 additions and 62 deletions.
71 changes: 61 additions & 10 deletions cmd/neofs-node/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
containercore "github.com/nspcc-dev/neofs-node/pkg/core/container"
"github.com/nspcc-dev/neofs-node/pkg/core/netmap"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase"
morphClient "github.com/nspcc-dev/neofs-node/pkg/morph/client"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/network"
objectService "github.com/nspcc-dev/neofs-node/pkg/services/object"
"github.com/nspcc-dev/neofs-node/pkg/services/object/acl"
v2 "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2"
Expand All @@ -42,6 +44,7 @@ import (
apireputation "github.com/nspcc-dev/neofs-sdk-go/reputation"
"github.com/nspcc-dev/neofs-sdk-go/user"
"go.uber.org/zap"
"google.golang.org/grpc"
)

type objectSvc struct {
Expand Down Expand Up @@ -131,6 +134,51 @@ func (fn *innerRingFetcherWithNotary) InnerRingKeys() ([][]byte, error) {

type coreClientConstructor reputationClientConstructor

func (x *coreClientConstructor) Search(ctx context.Context, node netmapsdk.NodeInfo, req *protoobject.SearchV2Request) (*protoobject.SearchV2Response, error) {
// TODO: copy-pasted from old search implementation, consider deduplicating in
// the client constructor
var endpoints network.AddressGroup
if err := endpoints.FromIterator(network.NodeEndpointsIterator(node)); err != nil {
// critical error that may ultimately block the storage service. Normally it
// should not appear because entry into the network map under strict control.
return nil, fmt.Errorf("failed to decode network endpoints of the storage node from the network map: %w", err)
}
var info coreclient.NodeInfo
info.SetAddressGroup(endpoints)
info.SetPublicKey(node.PublicKey())
if ext := node.ExternalAddresses(); len(ext) > 0 {
var externalEndpoints network.AddressGroup
if err := externalEndpoints.FromStringSlice(ext); err != nil {
// less critical since the main ones must work, but also important
x.log.Warn("failed to decode external network endpoints of the storage node from the network map, ignore them",
zap.Strings("endpoints", ext), zap.Error(err))
} else {
info.SetExternalAddressGroup(externalEndpoints)
}

Check warning on line 157 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L137-L157

Added lines #L137 - L157 were not covered by tests
}
c, err := x.Get(info)
if err != nil {
return nil, fmt.Errorf("get node client: %w", err)
}

Check warning on line 162 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L159-L162

Added lines #L159 - L162 were not covered by tests

var resp *protoobject.SearchV2Response
var firstErr error
for i := range endpoints {
if err = c.RawForAddress(endpoints[i], func(conn *grpc.ClientConn) error {
var err error
resp, err = protoobject.NewObjectServiceClient(conn).SearchV2(ctx, req)
return err
}); err == nil {
firstErr = nil
break

Check warning on line 173 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L164-L173

Added lines #L164 - L173 were not covered by tests
}
if firstErr == nil {
firstErr = fmt.Errorf("send request over gRPC: %w", err)
}

Check warning on line 177 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L175-L177

Added lines #L175 - L177 were not covered by tests
}
return resp, firstErr

Check warning on line 179 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L179

Added line #L179 was not covered by tests
}

func (x *coreClientConstructor) Get(info coreclient.NodeInfo) (coreclient.MultiAddressClient, error) {
c, err := (*reputationClientConstructor)(x).Get(info)
if err != nil {
Expand Down Expand Up @@ -303,10 +351,11 @@ func initObjectService(c *cfg) {
)

storage := storageForObjectService{
local: ls,

Check warning on line 354 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L354

Added line #L354 was not covered by tests
putSvc: sPut,
keys: keyStorage,
}
server := objectService.New(objSvc, mNumber, fsChain, storage, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc)
server := objectService.New(objSvc, mNumber, fsChain, storage, c.shared.basics.key.PrivateKey, c.metricsCollector, aclChecker, aclSvc, coreConstructor)

Check warning on line 358 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L358

Added line #L358 was not covered by tests

for _, srv := range c.cfgGRPC.servers {
protoobject.RegisterObjectServiceServer(srv, server)
Expand Down Expand Up @@ -594,13 +643,9 @@ func newFSChainForObjects(cnrNodes *containerNodes, isLocalPubKey func([]byte) b
}
}

// ForEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
//
// Implements [object.Node] interface.
func (x *fsChainForObjects) ForEachContainerNodePublicKeyInLastTwoEpochs(id cid.ID, f func(pubKey []byte) bool) error {
return x.containerNodes.forEachContainerNodePublicKeyInLastTwoEpochs(id, f)
// ForEachContainerNodeInLastTwoEpochs implements [objectService.FSChain] interface.
func (x *fsChainForObjects) ForEachContainerNodeInLastTwoEpochs(id cid.ID, f func(netmapsdk.NodeInfo) bool) error {
return x.containerNodes.forEachContainerNode(id, true, f)

Check warning on line 648 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L647-L648

Added lines #L647 - L648 were not covered by tests
}

// IsOwnPublicKey checks whether given binary-encoded public key is assigned to
Expand All @@ -616,10 +661,16 @@ func (x *fsChainForObjects) IsOwnPublicKey(pubKey []byte) bool {
func (x *fsChainForObjects) LocalNodeUnderMaintenance() bool { return x.isMaintenance.Load() }

type storageForObjectService struct {
local *engine.StorageEngine
putSvc *putsvc.Service
keys *util.KeyStorage
}

// SearchObjects implements [objectService.Storage] interface.
func (x storageForObjectService) SearchObjects(cnr cid.ID, fs objectSDK.SearchFilters, attrs []string, cursor *meta.SearchCursor, count uint16) ([]client.SearchResultItem, *meta.SearchCursor, error) {
return x.local.Search(cnr, fs, attrs, cursor, count)

Check warning on line 671 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L670-L671

Added lines #L670 - L671 were not covered by tests
}

func (x storageForObjectService) VerifyAndStoreObjectLocally(obj objectSDK.Object) error {
return x.putSvc.ValidateAndStoreObjectLocally(obj)
}
Expand Down Expand Up @@ -699,8 +750,8 @@ type netmapSourceWithNodes struct {

func (n netmapSourceWithNodes) ServerInContainer(cID cid.ID) (bool, error) {
var serverInContainer bool
err := n.fsChain.ForEachContainerNodePublicKeyInLastTwoEpochs(cID, func(pubKey []byte) bool {
if n.fsChain.isLocalPubKey(pubKey) {
err := n.fsChain.ForEachContainerNodeInLastTwoEpochs(cID, func(node netmapsdk.NodeInfo) bool {
if n.fsChain.isLocalPubKey(node.PublicKey()) {

Check warning on line 754 in cmd/neofs-node/object.go

View check run for this annotation

Codecov / codecov/patch

cmd/neofs-node/object.go#L753-L754

Added lines #L753 - L754 were not covered by tests
serverInContainer = true
return false
}
Expand Down
9 changes: 0 additions & 9 deletions cmd/neofs-node/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,15 +80,6 @@ func newContainerNodes(containers container.Source, network netmap.Source) (*con
}, nil
}

// forEachContainerNodePublicKeyInLastTwoEpochs passes binary-encoded public key
// of each node match the referenced container's storage policy at two latest
// epochs into f. When f returns false, nil is returned instantly.
func (x *containerNodes) forEachContainerNodePublicKeyInLastTwoEpochs(cnrID cid.ID, f func(pubKey []byte) bool) error {
return x.forEachContainerNode(cnrID, true, func(node netmapsdk.NodeInfo) bool {
return f(node.PublicKey())
})
}

func (x *containerNodes) forEachContainerNode(cnrID cid.ID, withPrevEpoch bool, f func(netmapsdk.NodeInfo) bool) error {
curEpoch, err := x.network.Epoch()
if err != nil {
Expand Down
46 changes: 23 additions & 23 deletions cmd/neofs-node/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ func newNetmapWithContainer(tb testing.TB, nodeNum int, selected ...[]int) ([]ne
func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.T) {
const anyEpoch = 42
anyCnr := cidtest.ID()
failOnCall := func(tb testing.TB) func([]byte) bool {
return func([]byte) bool {
failOnCall := func(tb testing.TB) func(netmap.NodeInfo) bool {
return func(netmap.NodeInfo) bool {
tb.Fatal("must not be called")
return false
}
Expand All @@ -155,7 +155,7 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.
require.NoError(t, err)

for n := 1; n < 10; n++ {
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t))
err = ns.forEachContainerNode(anyCnr, true, failOnCall(t))
require.ErrorIs(t, err, epochErr)
require.EqualError(t, err, "read current NeoFS epoch: any epoch error")
// such error must not be cached
Expand All @@ -170,7 +170,7 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.
require.NoError(t, err)

for n := 1; n < 10; n++ {
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t))
err = ns.forEachContainerNode(anyCnr, true, failOnCall(t))
require.ErrorIs(t, err, cnrErr)
require.EqualError(t, err, "select container nodes for current epoch #42: read container by ID: any container error")
// such error must not be cached
Expand All @@ -185,7 +185,7 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.
require.NoError(t, err)

for n := 1; n <= 10; n++ {
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, failOnCall(t))
err = ns.forEachContainerNode(anyCnr, true, failOnCall(t))
require.ErrorIs(t, err, curNetmapErr)
require.EqualError(t, err, "select container nodes for current epoch #42: read network map by epoch: any current netmap error")
network.assertEpochCallCount(t, n)
Expand All @@ -203,8 +203,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for n := 1; n < 10; n++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return true
})
require.NoError(t, err)
Expand All @@ -229,8 +229,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for n := 1; n <= 10; n++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return true
})
require.ErrorIs(t, err, prevNetmapErr)
Expand Down Expand Up @@ -264,8 +264,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for n := 1; n <= 10; n++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return true
})
require.EqualError(t, err, fmt.Sprintf("select container nodes for previous epoch #41: %v", policyErr))
Expand Down Expand Up @@ -308,8 +308,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for n := 1; n <= 10; n++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return true
})
require.EqualError(t, err, fmt.Sprintf("select container nodes for current epoch #42: %v", policyErr))
Expand All @@ -335,8 +335,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for n := 1; n <= 10; n++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return true
})
require.EqualError(t, err,
Expand All @@ -358,8 +358,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for n := 1; n <= 10; n++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return true
})
require.EqualError(t, err,
Expand All @@ -385,8 +385,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for n := 1; n <= 10; n++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return true
})
require.NoError(t, err)
Expand Down Expand Up @@ -414,8 +414,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.

for limit := 1; limit <= 4; limit++ {
var calledKeys [][]byte
err = ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
calledKeys = append(calledKeys, pubKey)
err = ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
calledKeys = append(calledKeys, node.PublicKey())
return len(calledKeys) < limit
})
require.NoError(t, err)
Expand All @@ -439,8 +439,8 @@ func TestContainerNodes_ForEachContainerNodePublicKeyInLastTwoEpochs(t *testing.
newNodes1, newNetmap1, _ := newNetmapWithContainer(t, 6, []int{2, 5})
newNodes2, newNetmap2, _ := newNetmapWithContainer(t, 6, []int{3, 4})
call := func(ns *containerNodes) (res [][]byte) {
err := ns.forEachContainerNodePublicKeyInLastTwoEpochs(anyCnr, func(pubKey []byte) bool {
res = append(res, pubKey)
err := ns.forEachContainerNode(anyCnr, true, func(node netmap.NodeInfo) bool {
res = append(res, node.PublicKey())
return true
})
require.NoError(t, err)
Expand Down
111 changes: 111 additions & 0 deletions pkg/core/object/metadata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package object

import (
"bytes"
"fmt"
"math/big"
"slices"
"strings"

"github.com/nspcc-dev/neofs-sdk-go/client"
)

// SearchResultMerger merges SearchV2 results from several sources. Must not be
// used concurrently.
type SearchResultMerger struct {
limit, n uint16
more bool
buf []client.SearchResultItem
iBuf, iNext *big.Int
}

// NewSearchResultMerger constructs SearchResultMerger merging up to limit
// results.
func NewSearchResultMerger(limit uint16) *SearchResultMerger {
return &SearchResultMerger{limit: limit}
}

// Add accumulates the next result.
func (x *SearchResultMerger) Add(withCursor bool, next []client.SearchResultItem) {
if len(next) > int(x.limit) {
panic(fmt.Sprintf("limit of items exceeded %d > %d", len(next), x.limit))
}
if withCursor && len(next) < int(x.limit) {
panic(fmt.Sprintf("cursor is set with fewer items %d < %d", len(next), x.limit))
}
if len(next) == 0 {
return
}

Check warning on line 38 in pkg/core/object/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/object/metadata.go#L37-L38

Added lines #L37 - L38 were not covered by tests
if x.buf == nil {
x.buf = slices.Grow(next, int(x.limit))[:x.limit]
x.n = uint16(len(next))
x.more = withCursor
if len(next[0].Attributes) > 0 {
x.iBuf, x.iNext = new(big.Int), new(big.Int)
}
return
}
buf := x.buf
n := x.n
next:
for len(next) > 0 {
for i := range n {
if next[0].ID == buf[i].ID {
if len(next) == 1 {
x.more = x.more || withCursor
return
}
buf, next = buf[i+1:], next[1:]
n -= uint16(i) + 1
continue next
}
}
var cmpInt bool
if x.iBuf != nil {
_, cmpInt = x.iNext.SetString(next[0].Attributes[0], 10)
}
var i uint16
for i = 0; i < n; i++ { //nolint:intrange // do not use range: if next[0] is the biggest, i=n is desired, following condition catches
if x.iBuf != nil {
if cmpInt {
_, cmpInt = x.iBuf.SetString(buf[i].Attributes[0], 10)
}
if cmpInt {
if c := x.iNext.Cmp(x.iBuf); c < 0 {
break
} else if c > 0 {
continue

Check warning on line 77 in pkg/core/object/metadata.go

View check run for this annotation

Codecov / codecov/patch

pkg/core/object/metadata.go#L76-L77

Added lines #L76 - L77 were not covered by tests
}
} else {
if c := strings.Compare(next[0].Attributes[0], buf[i].Attributes[0]); c < 0 {
break
} else if c > 0 {
continue
}
}
}
if c := bytes.Compare(next[0].ID[:], buf[i].ID[:]); c < 0 { // == 0 caught above
break
}
}
if i == uint16(len(buf)) {
x.more = true
return
}
if i == n {
cpd := copy(buf[n:], next)
x.n += uint16(cpd)
x.more = withCursor || cpd < len(next)
return
}
x.more = x.more || n == uint16(len(buf))
copy(buf[i+1:], buf[i:n])
buf[i] = next[0]
x.n = min(x.n+1, x.limit)
buf, next = buf[i+1:], next[1:]
n -= i
}
}

// Fin returns final result merged from the accumulated ones.
func (x *SearchResultMerger) Fin() ([]client.SearchResultItem, bool) { return x.buf[:x.n], x.more }
Loading

0 comments on commit 1c3f5fe

Please sign in to comment.