From 0904251883723f5beefce150dd8a02c1fa6c54ea Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 4 Feb 2025 15:02:20 +0300 Subject: [PATCH] primary attribute sort only, improvements Untested Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/metadata.go | 435 +++++++-------- .../metabase/metadata_test.go | 511 ++++++++++++++++-- 2 files changed, 678 insertions(+), 268 deletions(-) diff --git a/pkg/local_object_storage/metabase/metadata.go b/pkg/local_object_storage/metabase/metadata.go index c271ff7a9d..f09c508dc7 100644 --- a/pkg/local_object_storage/metabase/metadata.go +++ b/pkg/local_object_storage/metabase/metadata.go @@ -40,6 +40,12 @@ var ( maxUint256Neg = new(big.Int).Neg(maxUint256) ) +var errInvalidCursor = errors.New("invalid cursor") + +func invalidMetaBucketKeyErr(key []byte, cause error) error { + return fmt.Errorf("invalid meta bucket key (prefix 0x%X): %w", key[0], cause) +} + // TODO: fill on migration // TODO: ROOT and PHY props // TODO: cleaning on obj removal @@ -59,9 +65,7 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner // TODO: move to global funcs makeKeyB := func(prefix byte, attr string, valLen int) (int, int) { ln := 1 + oid.Size + len(attr) + valLen + len(utf8Delimiter)*2 // TODO: constantize some stuff - if len(k) < ln { - k = make([]byte, ln) - } + k = growLen(k, ln) k[0] = prefix off := 1 + copy(k[1:], attr) off += copy(k[off:], utf8Delimiter) @@ -73,9 +77,7 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner } makeKeyC := func(attr string, valLen int) (int, int) { ln := 1 + oid.Size + len(attr) + valLen + len(utf8Delimiter) // TODO: constantize some stuff - if len(k) < ln { - k = make([]byte, ln) - } + k = growLen(k, ln) k[0] = metaPrefixC off := 1 + copy(k[1:], id[:]) off += copy(k[off:], attr) @@ -241,37 +243,44 @@ func (db *DB) search(cnr cid.ID, fs object.SearchFilters, attrs []string, cursor func (db *DB) searchInMetaBucket(mb *bbolt.Bucket, fs object.SearchFilters, attrs []string, primSeekKey []byte, count uint32) ([]objectcore.SearchResultItem, []byte, error) { + // TODO: make as much as possible outside the Bolt tx primMatcher := fs[0].Operation() - primMatcherInt := isNumericOp(primMatcher) - indexByID := primMatcher == object.MatchNotPresent - if indexByID { + intPrimMatcher := isNumericOp(primMatcher) + notPresentPrimMatcher := primMatcher == object.MatchNotPresent + if notPresentPrimMatcher { primSeekKey[0] = metaPrefixA - } else if primMatcherInt { + } else if intPrimMatcher { primSeekKey[0] = metaPrefixBI } else { primSeekKey[0] = metaPrefixBS } - primAttr := fs[0].Header() // non-empty, already checked - cursorBytes := primSeekKey[1:] - var primPrefix []byte - if indexByID { - primPrefix = primSeekKey[:1] - } else { - primAttrBytes := []byte(primAttr) - if len(cursorBytes) > 0 { - ind := bytes.Index(cursorBytes, utf8Delimiter) - if ind < 0 { - return nil, nil, errors.New("invalid cursor: missing delimiter") - } - if !bytes.Equal(cursorBytes[:ind], primAttrBytes) { - return nil, nil, errors.New("invalid cursor: wrong primary attribute") - } - primPrefix = primSeekKey[:1+ind+len(utf8Delimiter)] - } else { - primSeekKey = slices.Concat(primSeekKey[:1], primAttrBytes, utf8Delimiter) - primPrefix = primSeekKey + primAttr := fs[0].Header() // attribute emptiness already prevented + repeated := len(primSeekKey) > 1 // 1st is prefix + var primSeekPrefix []byte + var prevResOID, prevResPrimVal []byte + if notPresentPrimMatcher { + primSeekPrefix = primSeekKey[:1] + } else if repeated { + ind := bytes.Index(primSeekKey[1:], utf8Delimiter) // 1st is prefix + if ind < 0 { + return nil, nil, fmt.Errorf("%w: missing delimiter", errInvalidCursor) + } + if !bytes.Equal(primSeekKey[1:1+ind], []byte(primAttr)) { + return nil, nil, fmt.Errorf("%w: wrong primary attribute", errInvalidCursor) + } + primSeekPrefix = primSeekKey[:1+len(primAttr)+len(utf8Delimiter)] + var ok bool + if prevResPrimVal, prevResOID, ok = bytes.Cut(primSeekKey[len(primSeekPrefix):], utf8Delimiter); !ok { + return nil, nil, fmt.Errorf("%w: missing 2nd delimiter", errInvalidCursor) + } else if len(prevResPrimVal) == 0 { + return nil, nil, fmt.Errorf("%w: missing primary attribute value", errInvalidCursor) + } else if len(prevResOID) != oid.Size { + return nil, nil, fmt.Errorf("%w: wrong OID len %d", errInvalidCursor, len(prevResOID)) } + } else { + primSeekKey = slices.Concat(primSeekKey[:1], []byte(primAttr), utf8Delimiter) + primSeekPrefix = primSeekKey } primCursor := mb.Cursor() @@ -283,60 +292,68 @@ func (db *DB) searchInMetaBucket(mb *bbolt.Bucket, fs object.SearchFilters, attr return nil, nil, nil } - res := make([]objectcore.SearchResultItem, count+1) - if len(cursorBytes) > 0 { // VAL1_DELIM_VAL2_DELIM_..._VALN_OID - // FIXME: VALX can contain delimiter, e.g. payload hashes. - parts := bytes.Split(cursorBytes, utf8Delimiter) - if len(parts) != len(attrs)+1 { // + OID - // we could restore attributes by OID from DB (if it hasn't been deleted yet), - // but now we just fail for simplicity - return nil, nil, fmt.Errorf("invalid cursor: wrong number of parts %d, expected %d", len(parts), len(attrs)+1) - } - lastInd := len(parts) - 1 - if ln := len(parts[lastInd]); ln != oid.Size { - return nil, nil, fmt.Errorf("invalid cursor: non-OID last part len %d", ln) - } - res[0].ID = oid.ID(parts[lastInd]) - res[0].Attributes = make([]string, lastInd) // TODO: replace pre-split with iterator over cursorBytes - for i := range res[0].Attributes { - res[0].Attributes[i] = string(parts[i]) - } - } else { - // keep 1st element with zeros, any matching element is "greater" than it - res[0].Attributes = make([]string, len(attrs)) + res := make([]objectcore.SearchResultItem, count) + collectedPrimVals := make([][]byte, count) + collectedPrimKeys := make([][]byte, count) // TODO: can be done w/o slice + var n uint32 + var more bool + var id, seekKeyBuf, dbVal []byte + var secCursor *bbolt.Cursor + + // TODO: to helper struct + getAttributeVal := func(attr string) ([]byte, error) { + kln := 1 + oid.Size + len(attr) + len(utf8Delimiter) + seekKeyBuf = growLen(seekKeyBuf, kln) + seekKeyBuf[0] = metaPrefixC + off := 1 + copy(seekKeyBuf[1:], id) + off += copy(seekKeyBuf[off:], attr) + copy(seekKeyBuf[off:], utf8Delimiter) + if secCursor == nil { + secCursor = mb.Cursor() + } + key, _ := secCursor.Seek(seekKeyBuf[:kln]) + if !bytes.HasPrefix(key, seekKeyBuf[:kln]) { + return nil, nil + } + if len(key[kln:]) == 0 { + return nil, invalidMetaBucketKeyErr(primKey, errors.New("missing attribute value")) + } + return key[kln:], nil + } + isIntInDB := func(attr string, val []byte) bool { + kln := 1 + oid.Size + len(attr) + len(val) + len(utf8Delimiter)*2 + seekKeyBuf = growLen(seekKeyBuf, kln) + seekKeyBuf[0] = metaPrefixBI + off := 1 + copy(seekKeyBuf[1:], attr) + off += copy(seekKeyBuf[off:], utf8Delimiter) + off += copy(seekKeyBuf[off:], val) + off += copy(seekKeyBuf[off:], utf8Delimiter) + copy(seekKeyBuf[off:], id[:]) + return mb.Get(seekKeyBuf[:kln]) != nil } - var dbVal, id []byte - var moreObjs bool - var cFilterKey, biKey, cCollectorKey []byte // TODO: replace A/B/C with meaning - var cCursorFilter, cCursorCollector *bbolt.Cursor - nextI := uint32(1) nextPrimKey: - for ; bytes.HasPrefix(primKey, primPrefix); primKey, _ = primCursor.Next() { - if indexByID { + for ; bytes.HasPrefix(primKey, primSeekPrefix); primKey, _ = primCursor.Next() { + if notPresentPrimMatcher { if id = primKey[1:]; len(id) != oid.Size { - return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid OID len %d", len(id)) + return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid OID len %d", len(id))) } - } else { - // apply primary filter - dbValID := primKey[len(primPrefix):] // VAL_DELIM_OID - ind := bytes.LastIndex(dbValID, utf8Delimiter) // some attributes like payload hashes can contain delimiter + } else { // apply primary filter + valID := primKey[len(primSeekPrefix):] // VAL_DELIM_OID + ind := bytes.LastIndex(valID, utf8Delimiter) // some attributes like payload hashes can contain delimiter if ind < 0 { - return nil, nil, errors.New("invalid key in meta bucket: missing 2nd delimiter") + return nil, nil, invalidMetaBucketKeyErr(primKey, errors.New("missing 2nd delimiter")) } - dbVal, id = dbValID[:ind], dbValID[ind+len(utf8Delimiter):] - switch { - case len(dbVal) == 0: - return nil, nil, errors.New("invalid key in meta bucket: missing attribute value") - case len(id) != oid.Size: - return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid OID len %d", len(id)) + if dbVal, id = valID[:ind], valID[ind+len(utf8Delimiter):]; len(dbVal) == 0 { + return nil, nil, invalidMetaBucketKeyErr(primKey, errors.New("missing attribute value")) + } else if len(id) != oid.Size { + return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid OID len %d", len(id))) } - if !primMatcherInt && primKey[0] == metaPrefixBI { - n, err := intFromBytes(dbVal) - if err != nil { - return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid integer value: %w", err) + if !intPrimMatcher && primKey[0] == metaPrefixBI { + var err error + if dbVal, err = restoreIntAttributeVal(dbVal); err != nil { + return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid integer value: %w", err)) } - dbVal = []byte(n.String()) } for i := range fs { // there may be several filters by primary key, e.g. N >= 10 && N <= 20. We @@ -345,184 +362,169 @@ nextPrimKey: if i > 0 && attr != primAttr { continue } - checkedDBVal, fltVal, err := combineValues(attr, dbVal, fs[i].Value()) + checkedDBVal, fltVal, err := combineValues(attr, dbVal, fs[i].Value()) // TODO: deduplicate DB value preparation if err != nil { return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute %s value: %w", attr, err) } - if ok, err := matchValues(checkedDBVal, fs[i].Operation(), fltVal); err != nil { - return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute value: %w", err) - } else if !ok { + if !matchValues(checkedDBVal, fs[i].Operation(), fltVal) { continue nextPrimKey } // TODO: attribute value can be requested, it can be collected here, or we can - // detect earlier when an object goes beyond the current search range. The - // code can become even more complex. NOTE that we apply filters below + // detect earlier when an object goes beyond the already collected result. The + // code can become even more complex. Same below } } // apply other filters for i := range fs { - if !indexByID && i == 0 { // 1st already checked + if !notPresentPrimMatcher && i == 0 { // 1st already checked continue } - attr := fs[i].Header() // non-empty, checked above + attr := fs[i].Header() // emptiness already prevented for j := 1; j < i; j++ { - if fs[j].Header() == attr { // has already been checked and matches, skip + if fs[j].Header() == attr { // already match, checked in loop below continue } } - attrBytes := []byte(attr) - kln := 1 + oid.Size + len(attrBytes) + len(utf8Delimiter) // TODO: consts - if len(cFilterKey) < kln { - cFilterKey = make([]byte, kln) - } - cFilterKey[0] = metaPrefixC - off := 1 + copy(cFilterKey[1:], id) - off += copy(cFilterKey[off:], attrBytes) - copy(cFilterKey[off:], utf8Delimiter) - if cCursorFilter == nil { - cCursorFilter = mb.Cursor() - } - key, _ := cCursorFilter.Seek(cFilterKey[:kln]) - found := key != nil && bytes.HasPrefix(key, cFilterKey[:kln]) - if found { - if dbVal = key[kln:]; len(dbVal) == 0 { - return nil, nil, errors.New("invalid key in meta bucket: missing attribute value") - } + dbVal, err := getAttributeVal(attr) + if err != nil { + return nil, nil, err } - var isDBInt *bool - var dbValParsedInt []byte // set iff isDBInt is true + var dbValInt *[]byte // nil means not yet checked, pointer to nil means non-int for j := i; j < len(fs); j++ { if j > 0 && fs[j].Header() != attr { continue } - matcher := fs[j].Operation() - if !found { - if matcher == object.MatchNotPresent { + m := fs[j].Operation() + if dbVal == nil { + if m == object.MatchNotPresent { continue } continue nextPrimKey } - if matcher == object.MatchNotPresent { + if m == object.MatchNotPresent { continue nextPrimKey } - if isDBInt == nil { + if dbValInt == nil { if len(dbVal) != intValLen { - isDBInt = new(bool) + dbValInt = new([]byte) } else { // do the same as for primary attribute, but unlike there, here we don't know // whether the attribute is expected to be integer or not. - kln := 1 + oid.Size + len(attr) + len(dbVal) + len(utf8Delimiter)*2 - if len(biKey) < kln { - biKey = make([]byte, kln) - } - biKey[0] = metaPrefixBI - off := 1 + copy(biKey[1:], attr) - off += copy(biKey[off:], utf8Delimiter) - off += copy(biKey[off:], dbVal) - off += copy(biKey[off:], utf8Delimiter) - copy(biKey[off:], id[:]) - isDBInt = new(bool) - if *isDBInt = mb.Get(biKey) != nil; *isDBInt { - n, err := intFromBytes(dbVal) - if err != nil { - return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid integer value: %w", err) + dbValInt = new([]byte) + if isIntInDB(attr, dbVal) { + var err error + if *dbValInt, err = restoreIntAttributeVal(dbVal); err != nil { + return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid integer value: %w", err)) } - dbValParsedInt = []byte(n.String()) } } } - intMatcher := isNumericOp(matcher) - var checkedVal []byte - if intMatcher { - if !*isDBInt { + var checkedDBVal []byte + if isNumericOp(m) { + if *dbValInt == nil { continue nextPrimKey } - checkedVal = dbVal - } else if *isDBInt { - checkedVal = dbValParsedInt + checkedDBVal = dbVal + } else if *dbValInt != nil { + checkedDBVal = *dbValInt } else { - checkedVal = dbVal + checkedDBVal = dbVal } - checkedDBVal, fltVal, err := combineValues(attr, checkedVal, fs[j].Value()) + checkedDBVal, fltVal, err := combineValues(attr, checkedDBVal, fs[j].Value()) // TODO: deduplicate DB value preparation if err != nil { - return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute %s value: %w", attr, err) + return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid attribute %s value: %w", attr, err)) } - if ok, err := matchValues(checkedDBVal, matcher, fltVal); err != nil { - return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute value: %w", err) - } else if !ok { + if !matchValues(checkedDBVal, m, fltVal) { continue nextPrimKey } - // TODO: attribute value can be requested, it can be collected here. And the - // code can become even more complex } } - // object matches - // TODO: if attribute is missing, VAL is "" which is < than non-empty values (mb - // except integers mentioned above). For better UX, we can consider this - // particular case as > so attributed objects go first in the result. This can - // affect comparison with res[0] - collectedAttrs := make([]string, len(attrs)) - var attrI int + // object matches, collect attributes + collected := make([]string, len(attrs)) + var primDBVal []byte var insertI uint32 - nextInd: - for i := range nextI { - for ; attrI < len(attrs); attrI++ { - // TODO: share code with filtering above - kln := 1 + oid.Size + len(attrs[attrI]) + len(utf8Delimiter) - if len(cCollectorKey) < kln { - cCollectorKey = make([]byte, kln) + if len(attrs) > 0 { + var err error + if primDBVal, err = getAttributeVal(attrs[0]); err != nil { + return nil, nil, err + } + if repeated { // can be < than previous response chunk + if c := bytes.Compare(primDBVal, prevResPrimVal); c < 0 || c == 0 && bytes.Compare(id, prevResOID) <= 0 { + continue nextPrimKey + } + // note that if both values are integers, they are already sorted. Otherwise, the order is undefined. + // We could treat non-int values as < then the int ones, but the code would have grown huge + } + for i := range n { + if c := bytes.Compare(primDBVal, collectedPrimVals[i]); c < 0 || c == 0 && bytes.Compare(id, res[i].ID[:]) < 0 { + break } - cCollectorKey[0] = metaPrefixC - off := 1 + copy(cCollectorKey[1:], id) - off += copy(cCollectorKey[off:], attrs[attrI]) - copy(cCollectorKey[off:], utf8Delimiter) - if cCursorCollector == nil { - cCursorCollector = mb.Cursor() + if insertI++; insertI == count { + more = true + continue nextPrimKey } - if key, _ := cCursorCollector.Seek(cCollectorKey[:kln]); bytes.HasPrefix(key, cCollectorKey[:kln]) { - collectedAttrs[attrI] = string(key[kln:]) // FIXME: parse integers + } + if len(primDBVal) == intValLen && isIntInDB(attrs[0], primDBVal) { + n, err := restoreIntAttribute(primDBVal) + if err != nil { + return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid integer value: %w", err)) + } + collected[0] = n.String() + } else { + collected[0] = string(primDBVal) + } + } else { + if repeated { // can be < than previous response chunk + if bytes.Compare(id, prevResOID) <= 0 { + continue nextPrimKey } - if collectedAttrs[attrI] >= res[i].Attributes[attrI] { - insertI++ - continue nextInd // FIXME: do not calc val again + } + for i := insertI; i < n; i++ { + if bytes.Compare(id, res[i].ID[:]) >= 0 { + if insertI++; insertI == count { + more = true + continue nextPrimKey + } } } - if (len(attrs) == 0 || collectedAttrs[attrI-1] == res[i].Attributes[attrI-1]) && bytes.Compare(id, res[i].ID[:]) >= 0 { - insertI++ - continue + } + for i := 1; i < len(attrs); i++ { + // TODO: similar code above, share? + val, err := getAttributeVal(attrs[i]) + if err != nil { + return nil, nil, err } - if i == 0 { // "less" than last item from previous response - continue nextPrimKey + if len(val) == intValLen && isIntInDB(attrs[i], val) { + n, err := restoreIntAttribute(val) + if err != nil { + return nil, nil, invalidMetaBucketKeyErr(primKey, fmt.Errorf("invalid integer value: %w", err)) + } + collected[i] = n.String() + } else { + collected[i] = string(val) } - break } - if insertI == count+1 { - moreObjs = true - continue nextPrimKey + if n == count { + more = true } copy(res[insertI+1:], res[insertI:]) res[insertI].ID = oid.ID(id) - res[insertI].Attributes = collectedAttrs - nextI++ - } - res = res[1:nextI] - if !moreObjs { - return res, nil, nil - } - cursorLn := oid.Size - lastI := len(res) - 1 - for i := range res[lastI].Attributes { - cursorLn += len(res[lastI].Attributes[i]) + len(utf8Delimiter) + res[insertI].Attributes = collected + copy(collectedPrimVals[insertI+1:], collectedPrimVals[insertI:]) + collectedPrimVals[insertI] = primDBVal + copy(collectedPrimKeys[insertI+1:], collectedPrimKeys[insertI:]) + collectedPrimKeys[insertI] = primKey + if n < count { + n++ + } } - newCursor := make([]byte, 0, cursorLn) - for i := range res[lastI].Attributes { - newCursor = append(newCursor, res[lastI].Attributes[i]...) // FIXME: see FIXME in parsing above - newCursor = append(newCursor, utf8Delimiter...) + var resCursor []byte + if more { + resCursor = collectedPrimKeys[n-1][1:] } - newCursor = append(newCursor, res[lastI].ID[:]...) - return res, newCursor, nil + return res[:n], resCursor, nil } // TODO: can be merged with filtered code? @@ -548,16 +550,15 @@ func (db *DB) searchUnfiltered(cnr cid.ID, cursor []byte, count uint32) ([]objec if len(cursor) > 0 && bytes.Equal(k, seekKey) { // cursor is the last response element, so go next k, _ = mbc.Next() } - for ; len(k) > 0 && k[0] == metaPrefixA; k, _ = mbc.Next() { + for ; k[0] == metaPrefixA; k, _ = mbc.Next() { if n == count { // there are still elements cursor = res[n-1].ID[:] return nil } - k = k[1:] - if len(k) != oid.Size { - return fmt.Errorf("unexpected object key len %d, expected 33", len(k)) + if len(k) != oid.Size+1 { + return invalidMetaBucketKeyErr(k, fmt.Errorf("unexpected object key len %d", len(k))) } - res[n].ID = oid.ID(k) + res[n].ID = oid.ID(k[1:]) n++ } cursor = nil @@ -569,8 +570,8 @@ func (db *DB) searchUnfiltered(cnr cid.ID, cursor []byte, count uint32) ([]objec return res[:n], cursor, nil } -// combines formats attribute's DB and NeoFS API SearchV2 values to the -// matchable one. Returns DB errors only. +// combines attribute's DB and NeoFS API SearchV2 values to the matchable +// format. Returns DB errors only. func combineValues(attr string, dbVal []byte, fltVal string) ([]byte, []byte, error) { switch attr { case object.FilterOwnerID: @@ -654,7 +655,15 @@ func putInt(b []byte, n *big.Int) { } } -func intFromBytes(b []byte) (*big.Int, error) { +func restoreIntAttributeVal(b []byte) ([]byte, error) { + n, err := restoreIntAttribute(b) + if err != nil { + return nil, err + } + return []byte(n.String()), nil +} + +func restoreIntAttribute(b []byte) (*big.Int, error) { if len(b) != intValLen { return nil, fmt.Errorf("invalid len %d", len(b)) } @@ -674,29 +683,22 @@ func intFromBytes(b []byte) (*big.Int, error) { } // matches object attribute's search query value to the DB-stored one. Matcher -// must be supported but not [object.MatchNotPresent]. Returns errors related to -// DB values only signaling storage problems. -func matchValues(dbVal []byte, matcher object.SearchMatchType, fltVal []byte) (bool, error) { +// must be supported but not [object.MatchNotPresent]. +func matchValues(dbVal []byte, matcher object.SearchMatchType, fltVal []byte) bool { switch { default: - return false, nil // TODO: check whether supported in blindlyProcess. Then panic here + return false // TODO: check whether supported in blindlyProcess. Then panic here case matcher == object.MatchNotPresent: panic(errors.New("unexpected matcher NOT_PRESENT")) case matcher == object.MatchStringEqual: - return bytes.Equal(dbVal, fltVal), nil + return bytes.Equal(dbVal, fltVal) case matcher == object.MatchStringNotEqual: - return !bytes.Equal(dbVal, fltVal), nil + return !bytes.Equal(dbVal, fltVal) case matcher == object.MatchCommonPrefix: - return bytes.HasPrefix(dbVal, fltVal), nil + return bytes.HasPrefix(dbVal, fltVal) case isNumericOp(matcher): - if len(dbVal) != intValLen { // TODO: const - return false, fmt.Errorf("invalid integer len %d", len(dbVal)) - } - if dbVal[0] != 0 && dbVal[0] != 1 { - return false, fmt.Errorf("invalid integer sign byte %d", dbVal[0]) - } var n big.Int - return n.UnmarshalText(fltVal) == nil && intMatches(dbVal, matcher, &n), nil + return n.UnmarshalText(fltVal) == nil && intMatches(dbVal, matcher, &n) } } @@ -731,3 +733,10 @@ func intMatches(dbVal []byte, matcher object.SearchMatchType, fltVal *big.Int) b return bytes.Compare(dbVal, fltValBytes) <= 0 } } + +func growLen(b []byte, ln int) []byte { + if len(b) >= ln { + return b + } + return make([]byte, ln) +} diff --git a/pkg/local_object_storage/metabase/metadata_test.go b/pkg/local_object_storage/metabase/metadata_test.go index 395e35fb77..69f48f99fa 100644 --- a/pkg/local_object_storage/metabase/metadata_test.go +++ b/pkg/local_object_storage/metabase/metadata_test.go @@ -10,6 +10,7 @@ import ( "strconv" "testing" + objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" "github.com/nspcc-dev/neofs-sdk-go/checksum" checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" @@ -152,16 +153,14 @@ func TestPutMetadata(t *testing.T) { func TestApplyFilter(t *testing.T) { t.Run("unsupported matcher", func(t *testing.T) { - ok, err := matchValues(nil, 9, nil) - require.NoError(t, err) + ok := matchValues(nil, 9, nil) require.False(t, ok) }) t.Run("not present", func(t *testing.T) { - require.Panics(t, func() { _, _ = matchValues(nil, object.MatchNotPresent, nil) }) + require.Panics(t, func() { _ = matchValues(nil, object.MatchNotPresent, nil) }) }) check := func(dbVal []byte, m object.SearchMatchType, fltVal []byte, exp bool) { - ok, err := matchValues(dbVal, m, fltVal) - require.NoError(t, err) + ok := matchValues(dbVal, m, fltVal) require.Equal(t, exp, ok) } anyData := []byte("Hello, world!") @@ -205,35 +204,11 @@ func TestApplyFilter(t *testing.T) { } }) t.Run("int", func(t *testing.T) { - newBytes := func(ln int, prefix byte) []byte { - b := make([]byte, ln) - b[0] = prefix - return b - } - for _, tc := range []struct { - name, err string - v []byte - }{ - {name: "empty", err: "invalid integer len 0", v: []byte{}}, - {name: "undersize", err: "invalid integer len 16", v: newBytes(intValLen/2, 1)}, - {name: "oversize", err: "invalid integer len 34", v: newBytes(intValLen+1, 1)}, - {name: "sign", err: "invalid integer sign byte 13", v: newBytes(intValLen, 13)}, - } { - t.Run("invalid DB value/"+tc.name, func(t *testing.T) { - for _, matcher := range []object.SearchMatchType{ - object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, - } { - _, err := matchValues(tc.v, matcher, []byte("10")) - require.EqualError(t, err, tc.err) - } - }) - } t.Run("non-int filter value", func(t *testing.T) { for _, matcher := range []object.SearchMatchType{ object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, } { - ok, err := matchValues(make([]byte, intValLen), matcher, []byte("1.5")) - require.NoError(t, err) + ok := matchValues(make([]byte, intValLen), matcher, []byte("1.5")) require.False(t, ok) } }) @@ -331,7 +306,7 @@ func TestIntBucketOrder(t *testing.T) { err = db.boltDB.View(func(tx *bbolt.Tx) error { c := tx.Bucket([]byte("any")).Cursor() for k, _ := c.First(); k != nil; k, _ = c.Next() { - n, err := intFromBytes(k) + n, err := restoreIntAttribute(k) require.NoError(t, err) collected = append(collected, n.String()) } @@ -488,7 +463,7 @@ func TestDB_SearchObjects(t *testing.T) { require.NoError(t, err) _, _, err = db.Search(cnr, nil, nil, "", n) - require.EqualError(t, err, "view BoltDB: unexpected object key len 31, expected 33") + require.EqualError(t, err, "view BoltDB: invalid meta bucket key (prefix 0x0): unexpected object key len 32") }) }) t.Run("filters", func(t *testing.T) { @@ -872,28 +847,6 @@ func TestDB_SearchObjects(t *testing.T) { check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group2) check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH1", nil) }) - t.Run("first ID", func(t *testing.T) { - check := func(m object.SearchMatchType, v string, matchInds []uint) { - check("$Object:split.first", m, v, matchInds) - } - group1, group2, all := []uint{2, 4}, []uint{3, 5}, []uint{2, 3, 4, 5} - check(object.MatchStringEqual, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", group1) - check(object.MatchStringEqual, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group2) - check(object.MatchStringNotEqual, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", group2) - check(object.MatchStringNotEqual, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group1) - check(object.MatchStringEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", nil) // other - check(object.MatchStringNotEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", all) - for _, m := range []object.SearchMatchType{ - object.MatchNotPresent, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, - } { - check(m, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", nil) - } - check(object.MatchCommonPrefix, "", all) - check(object.MatchCommonPrefix, "6", group1) - check(object.MatchCommonPrefix, "C", group2) - check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group2) - check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH1", nil) - }) t.Run("parent ID", func(t *testing.T) { check := func(m object.SearchMatchType, v string, matchInds []uint) { check("$Object:split.parent", m, v, matchInds) @@ -1083,6 +1036,454 @@ func TestDB_SearchObjects(t *testing.T) { }) }) t.Run("attributes", func(t *testing.T) { - // TODO + t.Run("range over integer attribute", func(t *testing.T) { + // Similar scenario is used by NeoGo block fetcher storing blocks in the NeoFS. + // Note that the test does not copy the approach to constructing objects, only + // imitates. + // + // Let node store objects corresponding to the block with heights: 0, 1, 50, + // 100-102, 150, 4294967295 (max). Block#101 is presented thrice (undesired but + // possible). Additional integer attribute is also presented for testing. + const heightAttr = "Height" + const otherAttr = "SomeHash" + otherAttrs := []string{ // sorted + "0de1d9f050abdfb0fd8a4ff061aaa305dbbc63bf03d0ae2b8c93fbb8954b0201", + "13912c19601cc2aa2c35347bc734c469907bcebe5f81812de77a4cc192f3892c", + "1c3cd7853cfb53a134101205b6d894355ccb02ad454ac33a5ced5771a6f6dd14", + "1c80940a5099c05680035f3fcfee6a1dc36335622428bcf40635bf86a75d512b", + "1db039d30914eacfdf71780961e4957d512cfae597969c892ed1b59d258968e8", + "80dbfec78f2d4b5128d8fe51c95f3bc42be741832c77c127d53ab32f4f341505", + "9811b76c7a2b6b0020c30d4a9895fad8f2edab60037139e2a2b01761e137fb1a", + "b7d56b41e13a4502dca18420816bb1ba1a0bc10644c5e3f2bc5c511026df5aef", + "d6c059ae6852e04826419b0381690a1d76906721f195644863931f32f2d23842", + "d6dc85d4c2bab1bbd6da3ebcd4c2c56f12c5c369b685cc301e9b61449abe390b", + } + ids := []oid.ID{ // sorted + {5, 254, 154, 170, 83, 237, 109, 56, 68, 68, 97, 248, 50, 161, 183, 217, 28, 94, 162, 37, 79, 45, 175, 120, 104, 7, 87, 127, 92, 17, 218, 117}, + {41, 204, 35, 189, 128, 42, 229, 31, 7, 157, 117, 193, 98, 150, 30, 172, 103, 253, 100, 69, 223, 91, 232, 120, 70, 86, 242, 110, 88, 161, 62, 182}, + {54, 88, 178, 234, 172, 94, 155, 197, 69, 215, 33, 181, 122, 70, 178, 21, 158, 201, 54, 74, 21, 250, 193, 135, 123, 236, 137, 8, 81, 250, 21, 201}, + {92, 89, 108, 190, 140, 175, 71, 21, 243, 27, 88, 40, 156, 231, 102, 194, 230, 6, 109, 91, 135, 25, 190, 62, 246, 144, 137, 45, 90, 87, 186, 140}, + {116, 181, 195, 91, 211, 242, 145, 117, 174, 58, 195, 47, 208, 182, 46, 246, 18, 85, 0, 40, 129, 154, 68, 97, 225, 189, 89, 187, 194, 109, 201, 95}, + {162, 20, 218, 85, 5, 146, 98, 157, 137, 168, 59, 54, 102, 59, 86, 136, 160, 217, 143, 195, 200, 186, 192, 175, 235, 211, 101, 210, 147, 14, 141, 162}, + {178, 29, 204, 231, 34, 173, 251, 163, 135, 160, 94, 96, 171, 183, 2, 198, 53, 69, 84, 160, 76, 213, 208, 32, 247, 144, 230, 167, 70, 91, 158, 136}, + {199, 65, 97, 53, 71, 144, 40, 246, 194, 114, 139, 109, 213, 129, 253, 106, 141, 36, 249, 20, 130, 126, 245, 11, 110, 113, 50, 171, 153, 210, 119, 245}, + {237, 43, 4, 240, 144, 194, 224, 217, 7, 63, 14, 22, 147, 70, 8, 191, 226, 199, 69, 43, 131, 32, 37, 79, 151, 212, 149, 94, 172, 17, 137, 148}, + {245, 142, 55, 147, 121, 184, 29, 75, 74, 192, 85, 213, 243, 183, 80, 108, 181, 57, 119, 15, 84, 220, 143, 72, 202, 247, 28, 220, 245, 116, 128, 110}, + } + objs := make([]object.Object, len(ids)) // 2 more objects for #101 + appendAttribute(&objs[0], heightAttr, "0") + appendAttribute(&objs[1], heightAttr, "1") + appendAttribute(&objs[2], heightAttr, "50") + appendAttribute(&objs[3], heightAttr, "100") + appendAttribute(&objs[4], heightAttr, "101") + appendAttribute(&objs[5], heightAttr, "101") + appendAttribute(&objs[6], heightAttr, "101") + appendAttribute(&objs[7], heightAttr, "102") + appendAttribute(&objs[8], heightAttr, "150") + appendAttribute(&objs[9], heightAttr, "4294967295") + for i := range ids { + objs[i].SetID(ids[len(ids)-1-i]) // reverse order + } + for i := range otherAttrs { + appendAttribute(&objs[i], otherAttr, otherAttrs[i]) + } + heightSorted := []objectcore.SearchResultItem{ + // attribute takes 1st order priority + {ids[9], []string{"0", otherAttrs[0]}}, + {ids[8], []string{"1", otherAttrs[1]}}, + {ids[7], []string{"50", otherAttrs[2]}}, + {ids[6], []string{"100", otherAttrs[3]}}, + // but if attribute equals, items are sorted by IDs. Secondary attributes have + // no effect, otherwise the order would not be reversed + {ids[3], []string{"101", otherAttrs[6]}}, + {ids[4], []string{"101", otherAttrs[5]}}, + {ids[5], []string{"101", otherAttrs[4]}}, + // attribute takes power again + {ids[2], []string{"102", otherAttrs[7]}}, + {ids[1], []string{"150", otherAttrs[8]}}, + {ids[0], []string{"4294967295", otherAttrs[9]}}, + } + // store + cnr := cidtest.ID() + for i := range objs { + objs[i].SetContainerID(cnr) + objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires + require.NoError(t, db.Put(&objs[i], nil, nil)) + } + t.Run("none", func(t *testing.T) { + for _, set := range []func(*object.SearchFilters){ + func(fs *object.SearchFilters) { fs.AddFilter(heightAttr, "", object.MatchNotPresent) }, + func(fs *object.SearchFilters) { fs.AddFilter(otherAttr, "", object.MatchNotPresent) }, + func(fs *object.SearchFilters) { fs.AddFilter(heightAttr, "0", object.MatchNumLT) }, + func(fs *object.SearchFilters) { fs.AddFilter(heightAttr, "4294967295", object.MatchNumGT) }, + func(fs *object.SearchFilters) { + fs.AddFilter(heightAttr, "0", object.MatchNumGE) + fs.AddFilter(heightAttr, "151", object.MatchStringEqual) + }, + } { + var fs object.SearchFilters + set(&fs) + res, cursor, err := db.Search(cnr, fs, nil, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Empty(t, res) + } + }) + t.Run("all", func(t *testing.T) { + t.Run("unfiltered", func(t *testing.T) { + res, cursor, err := db.Search(cnr, nil, nil, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(objs)) + for i := range res { + require.Equal(t, ids[i], res[i].ID) + require.Empty(t, res[i].Attributes) + } + }) + var fs object.SearchFilters + fs.AddFilter(heightAttr, "0", object.MatchNumGE) + t.Run("w/o attributes", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, nil, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(heightSorted)) + for i := range res { + // expected order by IDs + require.Equal(t, ids[i], res[i].ID) + require.Empty(t, res[i].Attributes) + } + t.Run("paging", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, nil, "", 2) + require.NoError(t, err) + require.Len(t, res, 2) + for i := range 2 { + require.Equal(t, ids[i], res[i].ID) + require.Empty(t, res[i].Attributes) + } + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, nil, cursor, 6) + require.NoError(t, err) + // TODO: issue + t.Skip("paging is broken when prim attribute is not requested, see also https://github.com/nspcc-dev/neofs-node/issues/3058#issuecomment-2553193094") + require.Len(t, res, 6) + for i := range 6 { + require.Equal(t, ids[2+i], res[i].ID, i) + require.Empty(t, res[i].Attributes) + } + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, nil, cursor, 3) + require.NoError(t, err) + require.Len(t, res, 2) + for i := range 2 { + require.Equal(t, ids[8+i], res[i].ID) + require.Empty(t, res[i].Attributes) + } + require.Empty(t, cursor) + }) + }) + t.Run("single attribute", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{heightAttr}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(heightSorted)) + for i, r := range heightSorted { + require.Equal(t, r.ID, res[i].ID) + require.Equal(t, []string{r.Attributes[0]}, res[i].Attributes) + } + res, cursor, err = db.Search(cnr, fs, []string{otherAttr}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(heightSorted)) + for i := range ids { + require.Equal(t, ids[len(ids)-i-1], res[i].ID) + require.Equal(t, []string{otherAttrs[i]}, res[i].Attributes) + } + }) + t.Run("two attributes", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{heightAttr, otherAttr}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Equal(t, heightSorted, res) + t.Run("paging", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{heightAttr, otherAttr}, "", 2) + require.NoError(t, err) + require.Equal(t, heightSorted[:2], res) + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, []string{heightAttr, otherAttr}, cursor, 6) + require.NoError(t, err) + require.Equal(t, heightSorted[2:8], res) + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, []string{heightAttr, otherAttr}, cursor, 3) + require.NoError(t, err) + require.Equal(t, heightSorted[8:], res) + require.Empty(t, cursor) + }) + }) + }) + t.Run("partial", func(t *testing.T) { + var fs object.SearchFilters + fs.AddFilter(heightAttr, "50", object.MatchNumGE) + fs.AddFilter(heightAttr, "150", object.MatchNumLE) + heightSorted := heightSorted[2:9] + ids := ids[1:8] + otherAttrs := otherAttrs[2:9] + t.Run("w/o attributes", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, nil, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(ids)) + for i := range res { + // expected order by IDs + require.Equal(t, ids[i], res[i].ID) + require.Empty(t, res[i].Attributes) + } + t.Run("paging", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, nil, "", 2) + require.NoError(t, err) + require.Len(t, res, 2) + for i := range 2 { + require.Equal(t, ids[i], res[i].ID) + require.Empty(t, res[i].Attributes) + } + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, nil, cursor, 6) + require.NoError(t, err) + t.Skip("paging is broken when prim attribute is not requested, see also https://github.com/nspcc-dev/neofs-node/issues/3058#issuecomment-2553193094") + require.Len(t, res, 6) + for i := range 6 { + require.Equal(t, ids[2+i], res[i].ID, i) + require.Empty(t, res[i].Attributes) + } + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, nil, cursor, 3) + require.NoError(t, err) + require.Len(t, res, 2) + for i := range 2 { + require.Equal(t, ids[8+i], res[i].ID) + require.Empty(t, res[i].Attributes) + } + require.Empty(t, cursor) + }) + }) + t.Run("single attribute", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{heightAttr}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(heightSorted)) + for i, r := range heightSorted { + require.Equal(t, r.ID, res[i].ID) + require.Equal(t, []string{r.Attributes[0]}, res[i].Attributes) + } + res, cursor, err = db.Search(cnr, fs, []string{otherAttr}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(heightSorted)) + for i := range ids { + require.Equal(t, ids[len(ids)-i-1], res[i].ID) + require.Equal(t, []string{otherAttrs[i]}, res[i].Attributes) + } + }) + t.Run("two attributes", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{heightAttr, otherAttr}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Equal(t, heightSorted, res) + t.Run("paging", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{heightAttr, otherAttr}, "", 2) + require.NoError(t, err) + require.Equal(t, heightSorted[:2], res) + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, []string{heightAttr, otherAttr}, cursor, 3) + require.NoError(t, err) + require.Equal(t, heightSorted[2:5], res) + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, []string{heightAttr, otherAttr}, cursor, 3) + require.NoError(t, err) + require.Equal(t, heightSorted[5:], res) + require.Empty(t, cursor) + }) + }) + }) + }) + t.Run("FilePath+Timestamp", func(t *testing.T) { + // REST GW use-case + ids := []oid.ID{ // sorted + {5, 254, 154, 170, 83, 237, 109, 56, 68, 68, 97, 248, 50, 161, 183, 217, 28, 94, 162, 37, 79, 45, 175, 120, 104, 7, 87, 127, 92, 17, 218, 117}, + {41, 204, 35, 189, 128, 42, 229, 31, 7, 157, 117, 193, 98, 150, 30, 172, 103, 253, 100, 69, 223, 91, 232, 120, 70, 86, 242, 110, 88, 161, 62, 182}, + {54, 88, 178, 234, 172, 94, 155, 197, 69, 215, 33, 181, 122, 70, 178, 21, 158, 201, 54, 74, 21, 250, 193, 135, 123, 236, 137, 8, 81, 250, 21, 201}, + {92, 89, 108, 190, 140, 175, 71, 21, 243, 27, 88, 40, 156, 231, 102, 194, 230, 6, 109, 91, 135, 25, 190, 62, 246, 144, 137, 45, 90, 87, 186, 140}, + } + objs := make([]object.Object, len(ids)) + appendAttribute(&objs[0], object.AttributeFilePath, "cat1.jpg") + appendAttribute(&objs[0], object.AttributeTimestamp, "1738760790") + appendAttribute(&objs[1], object.AttributeFilePath, "cat2.jpg") + appendAttribute(&objs[1], object.AttributeTimestamp, "1738760792") + appendAttribute(&objs[2], object.AttributeFilePath, "cat2.jpg") + appendAttribute(&objs[2], object.AttributeTimestamp, "1738760791") + appendAttribute(&objs[3], object.AttributeFilePath, "cat2.jpg") + appendAttribute(&objs[3], object.AttributeTimestamp, "1738760793") + // store + cnr := cidtest.ID() + for i := range objs { + objs[i].SetID(ids[i]) + objs[i].SetContainerID(cnr) + objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires + require.NoError(t, db.Put(&objs[i], nil, nil)) + } + t.Run("none", func(t *testing.T) { + var fs object.SearchFilters + fs.AddFilter(object.AttributeFilePath, "cat4.jpg", object.MatchStringEqual) + res, cursor, err := db.Search(cnr, fs, []string{object.AttributeTimestamp}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Empty(t, res) + }) + t.Run("single", func(t *testing.T) { + var fs object.SearchFilters + fs.AddFilter(object.AttributeFilePath, "cat1.jpg", object.MatchStringEqual) + res, cursor, err := db.Search(cnr, fs, []string{object.AttributeFilePath, object.AttributeTimestamp}, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Equal(t, []objectcore.SearchResultItem{ + {ids[0], []string{"cat1.jpg", "1738760790"}}, + }, res) + }) + t.Run("multiple", func(t *testing.T) { + t.Run("both attributes", func(t *testing.T) { + fullRes := []objectcore.SearchResultItem{ + {ids[1], []string{"cat2.jpg", "1738760792"}}, + {ids[2], []string{"cat2.jpg", "1738760791"}}, + {ids[3], []string{"cat2.jpg", "1738760793"}}, + } + var fs object.SearchFilters + fs.AddFilter(object.AttributeFilePath, "cat2.jpg", object.MatchStringEqual) + res, cursor, err := db.Search(cnr, fs, []string{object.AttributeFilePath, object.AttributeTimestamp}, "", 1000) + require.NoError(t, err) + require.Equal(t, fullRes, res) + require.Empty(t, cursor) + t.Run("paging", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{object.AttributeFilePath, object.AttributeTimestamp}, "", 2) + require.NoError(t, err) + require.Equal(t, fullRes[:2], res) + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, []string{object.AttributeFilePath, object.AttributeTimestamp}, cursor, 1000) + require.NoError(t, err) + require.Equal(t, fullRes[2:], res) + require.Empty(t, cursor) + }) + }) + t.Run("timestamp only", func(t *testing.T) { + fullRes := []objectcore.SearchResultItem{ + {ids[2], []string{"1738760791"}}, + {ids[1], []string{"1738760792"}}, + {ids[3], []string{"1738760793"}}, + } + var fs object.SearchFilters + fs.AddFilter(object.AttributeFilePath, "cat2.jpg", object.MatchStringEqual) + res, cursor, err := db.Search(cnr, fs, []string{object.AttributeTimestamp}, "", 1000) + require.NoError(t, err) + require.Equal(t, fullRes, res) + require.Empty(t, cursor) + t.Run("paging", func(t *testing.T) { + res, cursor, err := db.Search(cnr, fs, []string{object.AttributeTimestamp}, "", 2) + require.NoError(t, err) + require.Equal(t, fullRes[:2], res) + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, []string{object.AttributeTimestamp}, cursor, 1000) + require.NoError(t, err) + t.Skip("paging is broken when prim attribute is not requested, see also https://github.com/nspcc-dev/neofs-node/issues/3058#issuecomment-2553193094") + require.Equal(t, fullRes[2:], res) + require.Empty(t, cursor) + }) + }) + }) + }) + t.Run("precise select with many attributes", func(t *testing.T) { + // S3 GW use-case + ids := []oid.ID{ // sorted + {5, 254, 154, 170, 83, 237, 109, 56, 68, 68, 97, 248, 50, 161, 183, 217, 28, 94, 162, 37, 79, 45, 175, 120, 104, 7, 87, 127, 92, 17, 218, 117}, + {41, 204, 35, 189, 128, 42, 229, 31, 7, 157, 117, 193, 98, 150, 30, 172, 103, 253, 100, 69, 223, 91, 232, 120, 70, 86, 242, 110, 88, 161, 62, 182}, + {54, 88, 178, 234, 172, 94, 155, 197, 69, 215, 33, 181, 122, 70, 178, 21, 158, 201, 54, 74, 21, 250, 193, 135, 123, 236, 137, 8, 81, 250, 21, 201}, + {92, 89, 108, 190, 140, 175, 71, 21, 243, 27, 88, 40, 156, 231, 102, 194, 230, 6, 109, 91, 135, 25, 190, 62, 246, 144, 137, 45, 90, 87, 186, 140}, + } + objs := make([]object.Object, len(ids)) + appendAttribute(&objs[0], object.AttributeFilePath, "/home/Downloads/dog.jpg") + appendAttribute(&objs[0], "Type", "JPEG") + appendAttribute(&objs[0], "attr1", "val1_1") + appendAttribute(&objs[0], "attr2", "val2_1") + appendAttribute(&objs[1], object.AttributeFilePath, "/usr/local/bin/go") + appendAttribute(&objs[1], "Type", "BIN") + appendAttribute(&objs[1], "attr1", "val1_2") + appendAttribute(&objs[1], "attr2", "val2_2") + appendAttribute(&objs[2], object.AttributeFilePath, "/home/Downloads/cat.jpg") + appendAttribute(&objs[2], "Type", "JPEG") + appendAttribute(&objs[2], "attr1", "val1_3") + appendAttribute(&objs[2], "attr2", "val2_3") + appendAttribute(&objs[3], object.AttributeFilePath, "/var/log/neofs/node") + appendAttribute(&objs[3], "Type", "TEXT") + appendAttribute(&objs[3], "attr1", "val1_4") + appendAttribute(&objs[3], "attr2", "val2_4") + // store + cnr := cidtest.ID() + for i := range objs { + objs[i].SetID(ids[len(ids)-i-1]) + objs[i].SetContainerID(cnr) + objs[i].SetPayloadChecksum(checksumtest.Checksum()) // Put requires + require.NoError(t, db.Put(&objs[i], nil, nil)) + } + + attrs := []string{object.AttributeFilePath, "attr1", "attr2"} + + var fs object.SearchFilters + fs.AddFilter(object.AttributeFilePath, "/home/Downloads/", object.MatchCommonPrefix) + fs.AddFilter("Type", "JPEG", object.MatchStringEqual) + res, cursor, err := db.Search(cnr, fs, attrs, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Equal(t, []objectcore.SearchResultItem{ + {ID: ids[1], Attributes: []string{"/home/Downloads/cat.jpg", "val1_3", "val2_3"}}, + {ID: ids[3], Attributes: []string{"/home/Downloads/dog.jpg", "val1_1", "val2_1"}}, + }, res) + + fs = fs[:0] + fs.AddFilter(object.AttributeFilePath, "/usr", object.MatchCommonPrefix) + fs.AddFilter("Type", "BIN", object.MatchStringEqual) + res, cursor, err = db.Search(cnr, fs, attrs, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Equal(t, []objectcore.SearchResultItem{ + {ID: ids[2], Attributes: []string{"/usr/local/bin/go", "val1_2", "val2_2"}}, + }, res) + + fs = fs[:0] + fs.AddFilter(object.AttributeFilePath, "/", object.MatchCommonPrefix) + fs.AddFilter("Type", "BIN", object.MatchStringNotEqual) + res, cursor, err = db.Search(cnr, fs, attrs, "", 1000) + require.NoError(t, err) + require.Empty(t, cursor) + require.Equal(t, []objectcore.SearchResultItem{ + {ID: ids[1], Attributes: []string{"/home/Downloads/cat.jpg", "val1_3", "val2_3"}}, + {ID: ids[3], Attributes: []string{"/home/Downloads/dog.jpg", "val1_1", "val2_1"}}, + {ID: ids[0], Attributes: []string{"/var/log/neofs/node", "val1_4", "val2_4"}}, + }, res) + + t.Run("paging", func(t *testing.T) { + fs = fs[:0] + fs.AddFilter(object.AttributeFilePath, "/home/", object.MatchCommonPrefix) + fs.AddFilter("Type", "TEXT", object.MatchStringNotEqual) + res, cursor, err := db.Search(cnr, fs, attrs, "", 1) + require.NoError(t, err) + require.Equal(t, []objectcore.SearchResultItem{ + {ID: ids[1], Attributes: []string{"/home/Downloads/cat.jpg", "val1_3", "val2_3"}}, + }, res) + require.NotEmpty(t, cursor) + res, cursor, err = db.Search(cnr, fs, attrs, cursor, 1) + require.NoError(t, err) + require.Equal(t, []objectcore.SearchResultItem{ + {ID: ids[3], Attributes: []string{"/home/Downloads/dog.jpg", "val1_1", "val2_1"}}, + }, res) + require.Empty(t, cursor) + }) + }) }) }