From 4bb5abfc7cd3c89697612fdb7c39bb080dc46dc0 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 21 Jan 2025 20:16:56 +0300 Subject: [PATCH] WIP filters Signed-off-by: Leonard Lyubich --- pkg/local_object_storage/metabase/metadata.go | 556 +++++++++++++++-- .../metabase/metadata_test.go | 581 ++++++++++++++---- 2 files changed, 949 insertions(+), 188 deletions(-) diff --git a/pkg/local_object_storage/metabase/metadata.go b/pkg/local_object_storage/metabase/metadata.go index e4e4c6bee1..302656a253 100644 --- a/pkg/local_object_storage/metabase/metadata.go +++ b/pkg/local_object_storage/metabase/metadata.go @@ -2,25 +2,35 @@ package meta import ( "bytes" + "crypto/sha256" "encoding/base64" "encoding/hex" "errors" "fmt" "math/big" + "slices" + "github.com/google/uuid" + "github.com/mr-tron/base58" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/nspcc-dev/neofs-sdk-go/user" "github.com/nspcc-dev/neofs-sdk-go/version" + "github.com/nspcc-dev/tzhash/tz" "go.etcd.io/bbolt" ) const ( - metaPrefixA = byte(iota) - metaPrefixBI // integer attributes - metaPrefixBS // all other attributes + intLen = 32 + intValLen = 1 + intLen // prefix byte for sign +) + +const ( + metaPrefixA = byte(iota) // TODO: replace A/B/C with meaning + metaPrefixBI // integer attributes + metaPrefixBS // all other attributes metaPrefixC ) @@ -31,7 +41,8 @@ var ( ) // TODO: fill on migration -// TODO: sort integers in buckets naturally +// TODO: ROOT and PHY props +// TODO: cleaning on obj removal func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner user.ID, typ object.Type, creationEpoch uint64, payloadLen uint64, pldHash, pldHmmHash, splitID []byte, parentID, firstID oid.ID, attrs []object.Attribute) error { mb, err := tx.CreateBucketIfNotExists(metaBucketKey(cnr)) @@ -86,25 +97,13 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner return nil } putInt := func(attr string, n *big.Int) error { - kLn, vOff := makeKeyB(metaPrefixBI, attr, 1+32) // sign + 256-bit. Sign makes some sensible order - if n.Sign() >= 0 { - k[vOff] = 1 - } else { - k[vOff] = 0 - } - vOff++ - n.FillBytes(k[vOff : vOff+32]) + kLn, vOff := makeKeyB(metaPrefixBI, attr, intValLen) // sign + 256-bit. Sign preserves key order + putInt(k[vOff:vOff+intValLen], n) if err := mb.Put(k[:kLn], nil); err != nil { return fmt.Errorf("put integer object attribute %q to container's meta bucket: %w", attr, err) } - kLn, vOff = makeKeyC(attr, 1+32) - if n.Sign() >= 0 { - k[vOff] = 1 - } else { - k[vOff] = 0 - } - vOff++ - n.FillBytes(k[vOff : vOff+32]) + kLn, vOff = makeKeyC(attr, intValLen) + putInt(k[vOff:vOff+intValLen], n) if err := mb.Put(k[:kLn], nil); err != nil { return fmt.Errorf("put integer object attribute %q to container's meta bucket: %w", attr, err) // TODO: distinguishable context } @@ -114,7 +113,7 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner if err := putPlain(object.FilterVersion, ver.String()); err != nil { return err } - if err := putPlain(object.FilterOwnerID, owner.String()); err != nil { + if err := putPlain(object.FilterOwnerID, string(owner[:])); err != nil { return err } if err := putPlain(object.FilterType, typ.String()); err != nil { @@ -126,20 +125,27 @@ func putMetadata(tx *bbolt.Tx, cnr cid.ID, id oid.ID, ver version.Version, owner if err := putInt(object.FilterPayloadSize, new(big.Int).SetUint64(payloadLen)); err != nil { return err } - if err := putPlain(object.FilterPayloadChecksum, hex.EncodeToString(pldHash)); err != nil { + if err := putPlain(object.FilterPayloadChecksum, string(pldHash)); err != nil { return err } - if err := putPlain(object.FilterPayloadHomomorphicHash, hex.EncodeToString(pldHmmHash)); err != nil { + if err := putPlain(object.FilterPayloadHomomorphicHash, string(pldHmmHash)); err != nil { return err } - if err := putPlain(object.FilterSplitID, string(splitID)); err != nil { - return err + if len(splitID) > 0 { + // TODO: deprecated, maybe not index? + if err := putPlain(object.FilterSplitID, string(splitID)); err != nil { + return err + } } - if err := putPlain(object.FilterFirstSplitObject, firstID.String()); err != nil { - return err + if !firstID.IsZero() { + if err := putPlain(object.FilterFirstSplitObject, string(firstID[:])); err != nil { + return err + } } - if err := putPlain(object.FilterParentID, parentID.String()); err != nil { - return err + if !parentID.IsZero() { + if err := putPlain(object.FilterParentID, string(parentID[:])); err != nil { + return err + } } for i := range attrs { @@ -167,28 +173,320 @@ func (db *DB) Search(cnr cid.ID, fs object.SearchFilters, attrs []string, cursor return nil, "", errors.New("zero count") } + if blindlyProcess(fs) { + return nil, "", nil + } + for i := range fs { + attr := fs[i].Header() + if attr == "" { + return nil, "", fmt.Errorf("invalid filter #%d: missing key", i) + } + // assuming https://github.com/nspcc-dev/neofs-api/issues/318 and https://github.com/nspcc-dev/neofs-api/issues/319 + if attr == object.FilterContainerID || attr == object.FilterID { + return nil, "", fmt.Errorf("prohibited filter %s", attr) + } + } + + var res []objectcore.SearchResultItem + var nextCursor []byte + var err error if len(fs) == 0 { if len(attrs) > 0 { return nil, "", errors.New("attributes are set without filters") } - return db.searchUnfiltered(cnr, cursor, count) + res, nextCursor, err = db.searchUnfiltered(cnr, []byte(cursor), count) + } else { + res, nextCursor, err = db.search(cnr, fs, attrs, []byte(cursor), count) } + if err != nil { + return nil, "", err + } + return res, base64.StdEncoding.EncodeToString(nextCursor), nil +} - return nil, "", errors.New("filters are not supported yet") // TODO +func (db *DB) search(cnr cid.ID, fs object.SearchFilters, attrs []string, cursor []byte, count uint32) ([]objectcore.SearchResultItem, []byte, error) { + seekKey := make([]byte, 1+base64.StdEncoding.DecodedLen(len(cursor))) // TODO: limit + n, err := base64.StdEncoding.Decode(seekKey[1:], cursor) + if err != nil { + return nil, nil, fmt.Errorf("decode cursor from Base64: %w", err) + } + seekKey = seekKey[:1+n] + + var res []objectcore.SearchResultItem + err = db.boltDB.View(func(tx *bbolt.Tx) error { + mb := tx.Bucket(metaBucketKey(cnr)) + if mb == nil { + return nil + } + var err error + res, cursor, err = db.searchInMetaBucket(mb, fs, attrs, seekKey, count) + if err == nil { + cursor = slices.Clone(cursor) + } + return err + }) + if err != nil { + return nil, nil, fmt.Errorf("view BoltDB: %w", err) + } + return res, cursor, nil } -func (db *DB) searchUnfiltered(cnr cid.ID, cursor string, count uint32) ([]objectcore.SearchResultItem, string, error) { - var cursorKey []byte - var err error - if cursor != "" { - cursorKey = make([]byte, 1+base64.StdEncoding.DecodedLen(len(cursor))) - n, err := base64.StdEncoding.Decode(cursorKey[1:], []byte(cursor)) - if err != nil { - return nil, "", fmt.Errorf("decode cursor from Base64: %w", err) +func (db *DB) searchInMetaBucket(mb *bbolt.Bucket, fs object.SearchFilters, attrs []string, + primSeekKey []byte, count uint32) ([]objectcore.SearchResultItem, []byte, error) { + // 1. build primary key to seek PK. It's either BX_KEY_DELIM_VAL_DELIM_OID or A_OID (for NOT_PRESENT). + // Also specify primary bucket. X can be N for NUM filters or both N|S for others. + // 2. seek primary key in primary bucket. If PK exists, switch to next. If nil key received - result is empty. + // 3. build primary key prefix PP. It's PREFIXN_KEY_DELIM for B or PREFIXA for A + // 4. iterate over primary bucket while prefix is PP, for each element do following. + // + // if not NOT_PRESENT: + // 1. parse VAL and OID from the current key. + // 2. apply all primary attribute filters (can be several) to the VAL. On mismatch, go NEXT. Otherwise, this is a result candidate. + // 3. for each not processed yet filter, seek C_OID_KEY_DELIM_VAL element by C_OID_KEY_DELIM key. + // 4. If matcher is NOT_PRESENT, go NEXT. + // 5. parse VAL from the current key. + // 6. apply all current attribute's filters (can be several). On mismatch, go NEXT. + // 7. for each requested attribute: + // + // 1. seek C_OID_KEY_DELIM_VAL by C_OID_KEY_DELIM key + // 2. get VAL or "" if missing + // 3. iterate over already collected resulting elements and compare VAL with them. If we find a place + // within the result by this attribute (P). If attribute is last, continue to + // cmp by ID to get the final place and go NEXT. Otherwise, continue with the + // next attribute, but starting from P. + // + // NOTE: integers should be compared properly. If some element is an integer and + // current one is not, consider it >. + // + // NOTE: if attribute is missing, VAL is "" which is < than non-empty values (mb + // except integers mentioned above). For better UX, we can consider this + // particular case as > so attributed objects go first in the result. + primMatcher := fs[0].Operation() + primMatcherInt := isNumericOp(primMatcher) + indexByID := primMatcher == object.MatchNotPresent + if indexByID { + primSeekKey[0] = metaPrefixA + } else if primMatcherInt { + primSeekKey[0] = metaPrefixBI + } else { + primSeekKey[0] = metaPrefixBS + } + + primAttr := fs[0].Header() // non-empty, already checked + cursorBytes := primSeekKey[1:] + var primPrefix []byte + if indexByID { + primPrefix = primSeekKey[:1] + } else { + primAttrBytes := []byte(primAttr) + if len(cursorBytes) > 0 { + ind := bytes.Index(cursorBytes, utf8Delimiter) + if ind < 0 { + return nil, nil, errors.New("invalid cursor: missing delimiter") + } + if !bytes.Equal(cursorBytes[:ind], primAttrBytes) { + return nil, nil, errors.New("invalid cursor: wrong primary attribute") + } + primPrefix = primSeekKey[:1+ind+len(utf8Delimiter)] + } else { + primSeekKey = slices.Concat(primSeekKey[:1], primAttrBytes, utf8Delimiter) + primPrefix = primSeekKey + } + } + + primCursor := mb.Cursor() + primKey, _ := primCursor.Seek(primSeekKey) // TODO: replace A/B/C with meaning + if bytes.Equal(primKey, primSeekKey) { // points to the last response element, so go next + primKey, _ = primCursor.Next() + } + if primKey == nil { + return nil, nil, nil + } + + res := make([]objectcore.SearchResultItem, count) + primKeys := make([][]byte, len(res)) + var n uint32 + var dbVal, id []byte + var moreObjs bool + var cSeekKey, biKey []byte // TODO: replace A/B/C with meaning + var cCursor *bbolt.Cursor +nextPrimKey: + for ; bytes.HasPrefix(primKey, primPrefix); primKey, _ = primCursor.Next() { + if indexByID { + if id = primKey[1:]; len(id) != oid.Size { + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid OID len %d", len(id)) + } + } else { + // apply primary filter + dbValID := primKey[len(primPrefix):] // VAL_DELIM_OID + ind := bytes.LastIndex(dbValID, utf8Delimiter) // some attributes like payload hashes can contain delimiter + if ind < 0 { + return nil, nil, errors.New("invalid key in meta bucket: missing 2nd delimiter") + } + dbVal, id = dbValID[:ind], dbValID[ind+len(utf8Delimiter):] + switch { + case len(dbVal) == 0: + return nil, nil, errors.New("invalid key in meta bucket: missing attribute value") + case len(id) != oid.Size: + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid OID len %d", len(id)) + } + if !primMatcherInt && primKey[0] == metaPrefixBI { + n, err := intFromBytes(dbVal) + if err != nil { + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid integer value: %w", err) + } + dbVal = []byte(n.String()) + } + for i := range fs { + // there may be several filters by primary key, e.g. N >= 10 && N <= 20. We + // check them immediately before moving through the DB. + attr := fs[i].Header() + if i > 0 && attr != primAttr { + continue + } + checkedDBVal, fltVal, err := combineValues(attr, dbVal, fs[i].Value()) + if err != nil { + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute %s value: %w", attr, err) + } + if ok, err := matchValues(checkedDBVal, fs[i].Operation(), fltVal); err != nil { + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute value: %w", err) + } else if !ok { + continue nextPrimKey + } + } + } + // apply other filters + for i := range fs { + if !indexByID && i == 0 { // 1st already checked + continue + } + attr := fs[i].Header() // non-empty, checked above + for j := 1; j < i; j++ { + if fs[j].Header() == attr { // has already been checked and matches, skip + continue + } + } + + attrBytes := []byte(attr) + kln := 1 + oid.Size + len(attrBytes) + len(utf8Delimiter) // TODO: consts + if len(cSeekKey) < kln { + cSeekKey = make([]byte, kln) + } + cSeekKey[0] = metaPrefixC + off := 1 + copy(cSeekKey[1:], id) + off += copy(cSeekKey[off:], attrBytes) + copy(cSeekKey[off:], utf8Delimiter) + if cCursor == nil { + cCursor = mb.Cursor() + } + key, _ := cCursor.Seek(cSeekKey[:kln]) + found := key != nil && bytes.HasPrefix(key, cSeekKey[:kln]) + if found { + if dbVal = key[kln:]; len(dbVal) == 0 { + return nil, nil, errors.New("invalid key in meta bucket: missing attribute value") + } + } + + var isDBInt *bool + var dbValParsedInt []byte // set iff isDBInt is true + for j := i; j < len(fs); j++ { + if j > 0 && fs[j].Header() != attr { + continue + } + matcher := fs[j].Operation() + if !found { + if matcher == object.MatchNotPresent { + continue + } + continue nextPrimKey + } + if matcher == object.MatchNotPresent { + continue nextPrimKey + } + if isDBInt == nil { + if len(dbVal) != intValLen { + isDBInt = new(bool) + } else { + // do the same as for primary attribute, but unlike there, here we don't know + // whether the attribute is expected to be integer or not. + kln := 1 + oid.Size + len(attr) + len(dbVal) + len(utf8Delimiter)*2 + if len(biKey) < kln { + biKey = make([]byte, kln) + } + biKey[0] = metaPrefixBI + off := 1 + copy(biKey[1:], attr) + off += copy(biKey[off:], utf8Delimiter) + off += copy(biKey[off:], dbVal) + off += copy(biKey[off:], utf8Delimiter) + copy(biKey[off:], id[:]) + isDBInt = new(bool) + if *isDBInt = mb.Get(biKey) != nil; *isDBInt { + n, err := intFromBytes(dbVal) + if err != nil { + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid integer value: %w", err) + } + dbValParsedInt = []byte(n.String()) + } + } + } + intMatcher := isNumericOp(matcher) + var checkedVal []byte + if intMatcher { + if !*isDBInt { + continue nextPrimKey + } + checkedVal = dbVal + } else if *isDBInt { + checkedVal = dbValParsedInt + } else { + checkedVal = dbVal + } + checkedDBVal, fltVal, err := combineValues(attr, checkedVal, fs[j].Value()) + if err != nil { + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute %s value: %w", attr, err) + } + if ok, err := matchValues(checkedDBVal, matcher, fltVal); err != nil { + return nil, nil, fmt.Errorf("invalid key in meta bucket: invalid attribute value: %w", err) + } else if !ok { + continue nextPrimKey + } + } + } + // object matches + // TODO: gather attributes + for i := range n { + if bytes.Compare(id, res[i].ID[:]) < 0 { + copy(res[i+1:], res[i:n]) + copy(primKeys[i+1:], primKeys[i:n]) + res[i].ID = oid.ID(id) + primKeys[i] = primKey + n++ + continue nextPrimKey + } + } + if n == count { + moreObjs = true + continue nextPrimKey } - cursorKey[0] = metaPrefixA - cursorKey = cursorKey[:1+n] + res[n].ID = oid.ID(id) + primKeys[n] = primKey + n++ + } + if moreObjs { + return res[:n], primKeys[n-1], nil + } + return res[:n], nil, nil +} + +// TODO: can be merged with filtered code? +func (db *DB) searchUnfiltered(cnr cid.ID, cursor []byte, count uint32) ([]objectcore.SearchResultItem, []byte, error) { + seekKey := make([]byte, 1+base64.StdEncoding.DecodedLen(len(cursor))) // TODO: limit + ln, err := base64.StdEncoding.Decode(seekKey[1:], cursor) + if err != nil { + return nil, nil, fmt.Errorf("decode cursor from Base64: %w", err) } + seekKey[0] = metaPrefixA + seekKey = seekKey[:1+ln] res := make([]objectcore.SearchResultItem, count) var n uint32 @@ -199,38 +497,190 @@ func (db *DB) searchUnfiltered(cnr cid.ID, cursor string, count uint32) ([]objec } mbc := mb.Cursor() - k, _ := mbc.Seek(cursorKey) - if cursor != "" && bytes.Equal(k, cursorKey) { // cursor is the last response element, so go next + k, _ := mbc.Seek(seekKey) + if len(cursor) > 0 && bytes.Equal(k, seekKey) { // cursor is the last response element, so go next k, _ = mbc.Next() } - for ; k != nil; k, _ = mbc.Next() { - if len(k) == 0 || k[0] != metaPrefixA { // empty key is not expected, but better to be safe - continue - } + for ; len(k) > 0 && k[0] == metaPrefixA; k, _ = mbc.Next() { if n == count { // there are still elements - // TODO: better to replace proto encoding to upper level and accept/return - // DB-detailed cursor at this level - cursor = base64.StdEncoding.EncodeToString(res[n-1].ID[:]) + cursor = res[n-1].ID[:] return nil } k = k[1:] if len(k) != oid.Size { return fmt.Errorf("unexpected object key len %d, expected 33", len(k)) } - copy(res[n].ID[:], k) + res[n].ID = oid.ID(k) n++ } - cursor = "" + cursor = nil return nil }) if err != nil { - return nil, "", fmt.Errorf("view BoltDB: %w", err) + return nil, nil, fmt.Errorf("view BoltDB: %w", err) } return res[:n], cursor, nil } +// combines formats attribute's DB and NeoFS API SearchV2 values to the +// matchable one. Returns DB errors only. +func combineValues(attr string, dbVal []byte, fltVal string) ([]byte, []byte, error) { + switch attr { + case object.FilterOwnerID: + if len(dbVal) != user.IDSize { + return nil, nil, fmt.Errorf("invalid owner len %d != %d", len(dbVal), user.IDSize) + } + if b, _ := base58.Decode(fltVal); len(b) == user.IDSize { + return dbVal, b, nil + } + // consider filter 'owner PREFIX N': + // - any object matches it + // - decoded filter byte is always 21 while the DB one is always 53 + // so we'd get false mismatch. To avoid this, we have to decode each DB val. + dbVal = []byte(base58.Encode(dbVal)) + case object.FilterFirstSplitObject, object.FilterParentID: + if len(dbVal) != oid.Size { + return nil, nil, fmt.Errorf("invalid OID len %d != %d", len(dbVal), oid.Size) + } + if b, _ := base58.Decode(fltVal); len(b) == oid.Size { + return dbVal, b, nil + } + // same as owner + dbVal = []byte(base58.Encode(dbVal)) + case object.FilterPayloadChecksum: + if len(dbVal) != sha256.Size { + return nil, nil, fmt.Errorf("invalid payload checksum len %d != %d", len(dbVal), sha256.Size) + } + if b, err := hex.DecodeString(fltVal); err == nil { + return dbVal, b, nil + } + dbVal = []byte(hex.EncodeToString(dbVal)) + case object.FilterPayloadHomomorphicHash: + if len(dbVal) != tz.Size { + return nil, nil, fmt.Errorf("invalid payload homomorphic hash len %d != %d", len(dbVal), tz.Size) + } + if b, err := hex.DecodeString(fltVal); err == nil { + return dbVal, b, nil + } + dbVal = []byte(hex.EncodeToString(dbVal)) + case object.FilterSplitID: + if len(dbVal) != 16 { + return nil, nil, fmt.Errorf("invalid split ID len %d != 16", len(dbVal)) + } + uid, err := uuid.Parse(fltVal) + if err == nil { + return dbVal, uid[:], nil + } + copy(uid[:], dbVal) + dbVal = []byte(uid.String()) + } + return dbVal, []byte(fltVal), nil +} + func metaBucketKey(cnr cid.ID) []byte { k := [1 + cid.Size]byte{metadataPrefix} copy(k[1:], cnr[:]) return k[:] } + +func intBytes(n *big.Int) []byte { + b := make([]byte, intValLen) + putInt(b, n) + return b +} + +func putInt(b []byte, n *big.Int) { + if len(b) < intValLen { + panic(fmt.Errorf("insufficient buffer len %d", len(b))) + } + neg := n.Sign() < 0 + if neg { + b[0] = 0 + } else { + b[0] = 1 + } + n.FillBytes(b[1:intValLen]) + if neg { + for i := range b[1:] { + b[1+i] = ^b[1+i] + } + } +} + +func intFromBytes(b []byte) (*big.Int, error) { + if len(b) != intValLen { + return nil, fmt.Errorf("invalid len %d", len(b)) + } + switch b[0] { + default: + return nil, fmt.Errorf("invalid sign byte %d", b[0]) + case 1: + return new(big.Int).SetBytes(b[1:]), nil + case 0: + cp := slices.Clone(b[1:]) + for i := range cp { + cp[i] = ^cp[i] + } + n := new(big.Int).SetBytes(cp) + return n.Neg(n), nil + } +} + +// matches object attribute's search query value to the DB-stored one. Matcher +// must be supported but not [object.MatchNotPresent]. Returns errors related to +// DB values only signaling storage problems. +func matchValues(dbVal []byte, matcher object.SearchMatchType, fltVal []byte) (bool, error) { + switch { + default: + return false, nil // TODO: check whether supported in blindlyProcess. Then panic here + case matcher == object.MatchNotPresent: + panic(errors.New("unexpected matcher NOT_PRESENT")) + case matcher == object.MatchStringEqual: + return bytes.Equal(dbVal, fltVal), nil + case matcher == object.MatchStringNotEqual: + return !bytes.Equal(dbVal, fltVal), nil + case matcher == object.MatchCommonPrefix: + return bytes.HasPrefix(dbVal, fltVal), nil + case isNumericOp(matcher): + if len(dbVal) != intValLen { // TODO: const + return false, fmt.Errorf("invalid integer len %d", len(dbVal)) + } + if dbVal[0] != 0 && dbVal[0] != 1 { + return false, fmt.Errorf("invalid integer sign byte %d", dbVal[0]) + } + var n big.Int + return n.UnmarshalText(fltVal) == nil && intMatches(dbVal, matcher, &n), nil + } +} + +func intMatches(dbVal []byte, matcher object.SearchMatchType, fltVal *big.Int) bool { + if c := fltVal.Cmp(maxUint256); c >= 0 { + if matcher == object.MatchNumGT || c > 0 && matcher == object.MatchNumGE { + return false + } + if matcher == object.MatchNumLE || c > 0 && matcher == object.MatchNumLT { + return true + } + } + if c := fltVal.Cmp(maxUint256Neg); c <= 0 { + if matcher == object.MatchNumLT || c < 0 && matcher == object.MatchNumLE { + return false + } + if matcher == object.MatchNumGE || c < 0 && matcher == object.MatchNumGT { + return true + } + } + fltValBytes := intBytes(fltVal) // TODO: buffer can be useful for other filters + switch matcher { + default: + panic(fmt.Errorf("unexpected integer matcher %d", matcher)) + case object.MatchNumGT: + return bytes.Compare(dbVal, fltValBytes) > 0 + case object.MatchNumGE: + return bytes.Compare(dbVal, fltValBytes) >= 0 + case object.MatchNumLT: + return bytes.Compare(dbVal, fltValBytes) < 0 + case object.MatchNumLE: + return bytes.Compare(dbVal, fltValBytes) <= 0 + } +} diff --git a/pkg/local_object_storage/metabase/metadata_test.go b/pkg/local_object_storage/metabase/metadata_test.go index 1b76b8d972..eb10a5b488 100644 --- a/pkg/local_object_storage/metabase/metadata_test.go +++ b/pkg/local_object_storage/metabase/metadata_test.go @@ -3,6 +3,8 @@ package meta import ( "bytes" "encoding/base64" + "math" + "math/big" "math/rand" "slices" "strconv" @@ -27,6 +29,10 @@ func sortObjectIDs(ids []oid.ID) []oid.ID { return s } +func appendAttribute(obj *object.Object, k, v string) { + obj.SetAttributes(append(obj.Attributes(), *object.NewAttribute(k, v))...) +} + func assertAttrPrefixed[T string | []byte](t testing.TB, mb *bbolt.Bucket, id oid.ID, prefix byte, attr string, val T) { k := []byte{prefix} k = append(k, attr...) @@ -112,13 +118,12 @@ func TestPutMetadata(t *testing.T) { require.Equal(t, []byte{}, mb.Get(append([]byte{0x00}, id[:]...))) assertAttr(t, mb, id, "$Object:version", "v2138538449.1476143219") - assertAttr(t, mb, id, "$Object:ownerID", "NTASyD5E5yCGHXbBd5hQXyoJWxjpW6D4QB") + assertAttr(t, mb, id, "$Object:ownerID", owner[:]) assertAttr(t, mb, id, "$Object:objectType", "LINK") - assertAttr(t, mb, id, "$Object:payloadHash", "5fa5624a3a436dc3e2eefdf14007f1f0f12ef3b68211c20b0799ab4f834c9a5b") - assertAttr(t, mb, id, "$Object:homomorphicHash", - "7c7f43ecbaa696ca0473a33af2499523995d04f73e120d96358d83accfa4bbf0101e121e8800c5d5b93e99df2ad5cf568390797ffbf8fdb09165454b0c611b13") - assertAttr(t, mb, id, "$Object:split.parent", "5RpvbXarudwzD6eRneJeokTmKua5VRpJC167yqENCJb8") - assertAttr(t, mb, id, "$Object:split.first", "ExF4Td91XaBJnXqvKUuijeJsTrTxovbaqksU6qy5n8AE") + assertAttr(t, mb, id, "$Object:payloadHash", pldHashBytes[:]) + assertAttr(t, mb, id, "$Object:homomorphicHash", pldHmmHashBytes[:]) + assertAttr(t, mb, id, "$Object:split.parent", parentID[:]) + assertAttr(t, mb, id, "$Object:split.first", firstID[:]) assertIntAttr(t, mb, id, "$Object:creationEpoch", []byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 101, 118, 30, 154, 145, 227, 159, 231}) assertIntAttr(t, mb, id, "$Object:payloadLength", []byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -126,12 +131,12 @@ func TestPutMetadata(t *testing.T) { assertAttr(t, mb, id, "attr_1", "val_1") assertAttr(t, mb, id, "attr_2", "val_2") assertAttr(t, mb, id, "num_negative_overflow", "-115792089237316195423570985008687907853269984665640564039457584007913129639936") - assertIntAttr(t, mb, id, "num_negative_min", []byte{0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, - 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255}) - assertIntAttr(t, mb, id, "num_negative_min64", []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 128, 0, 0, 0, 0, 0, 0, 0}) - assertIntAttr(t, mb, id, "num_negative_max", []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, - 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1}) + assertIntAttr(t, mb, id, "num_negative_min", []byte{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) + assertIntAttr(t, mb, id, "num_negative_min64", []byte{0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 127, 255, 255, 255, 255, 255, 255, 255}) + assertIntAttr(t, mb, id, "num_negative_max", []byte{0, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, + 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 255, 254}) assertIntAttr(t, mb, id, "num_zero", []byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}) assertIntAttr(t, mb, id, "num_positive_max64", []byte{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, @@ -145,8 +150,205 @@ func TestPutMetadata(t *testing.T) { require.NoError(t, err) } -func appendAttribute(obj *object.Object, k, v string) { - obj.SetAttributes(append(obj.Attributes(), *object.NewAttribute(k, v))...) +func TestApplyFilter(t *testing.T) { + t.Run("unsupported matcher", func(t *testing.T) { + ok, err := matchValues(nil, 9, nil) + require.NoError(t, err) + require.False(t, ok) + }) + t.Run("not present", func(t *testing.T) { + require.Panics(t, func() { _, _ = matchValues(nil, object.MatchNotPresent, nil) }) + }) + check := func(dbVal []byte, m object.SearchMatchType, fltVal []byte, exp bool) { + ok, err := matchValues(dbVal, m, fltVal) + require.NoError(t, err) + require.Equal(t, exp, ok) + } + anyData := []byte("Hello, world!") + t.Run("EQ", func(t *testing.T) { + check := func(dbVal, fltVal []byte, exp bool) { check(dbVal, object.MatchStringEqual, fltVal, exp) } + check(nil, nil, true) + check([]byte{}, nil, true) + check(anyData, anyData, true) + check(anyData, anyData[:len(anyData)-1], false) + check(anyData, append(anyData, 1), false) + for i := range anyData { + dbVal := slices.Clone(anyData) + dbVal[i]++ + check(dbVal, anyData, false) + } + }) + t.Run("NE", func(t *testing.T) { + check := func(dbVal, fltVal []byte, exp bool) { check(dbVal, object.MatchStringNotEqual, fltVal, exp) } + check(nil, nil, false) + check([]byte{}, nil, false) + check(anyData, anyData, false) + check(anyData, anyData[:len(anyData)-1], true) + check(anyData, append(anyData, 1), true) + for i := range anyData { + dbVal := slices.Clone(anyData) + dbVal[i]++ + check(dbVal, anyData, true) + } + }) + t.Run("has prefix", func(t *testing.T) { + check := func(dbVal, fltVal []byte, exp bool) { check(dbVal, object.MatchCommonPrefix, fltVal, exp) } + check(nil, nil, true) + check([]byte{}, nil, true) + check(anyData, anyData, true) + check(anyData, anyData[:len(anyData)-1], true) + check(anyData, append(anyData, 1), false) + for i := range len(anyData) { + check(anyData, anyData[:i], true) + changed := slices.Concat(anyData[:i], []byte{anyData[i] + 1}, anyData[i+1:]) + check(anyData, changed[:i+1], false) + } + }) + t.Run("int", func(t *testing.T) { + newBytes := func(ln int, prefix byte) []byte { + b := make([]byte, ln) + b[0] = prefix + return b + } + for _, tc := range []struct { + name, err string + v []byte + }{ + {name: "empty", err: "invalid integer len 0", v: []byte{}}, + {name: "undersize", err: "invalid integer len 16", v: newBytes(intValLen/2, 1)}, + {name: "oversize", err: "invalid integer len 34", v: newBytes(intValLen+1, 1)}, + {name: "sign", err: "invalid integer sign byte 13", v: newBytes(intValLen, 13)}, + } { + t.Run("invalid DB value/"+tc.name, func(t *testing.T) { + for _, matcher := range []object.SearchMatchType{ + object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, + } { + _, err := matchValues(tc.v, matcher, []byte("10")) + require.EqualError(t, err, tc.err) + } + }) + } + t.Run("non-int filter value", func(t *testing.T) { + for _, matcher := range []object.SearchMatchType{ + object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, + } { + ok, err := matchValues(make([]byte, intValLen), matcher, []byte("1.5")) + require.NoError(t, err) + require.False(t, ok) + } + }) + check := func(dbVal *big.Int, matcher object.SearchMatchType, fltVal *big.Int, exp bool) { + check(intBytes(dbVal), matcher, []byte(fltVal.String()), exp) + } + one := big.NewInt(1) + max64 := new(big.Int).SetUint64(math.MaxUint64) + ltMin := new(big.Int).Sub(maxUint256Neg, one) + gtMax := new(big.Int).Add(maxUint256, one) + ns := []*big.Int{ + maxUint256Neg, + new(big.Int).Add(maxUint256Neg, big.NewInt(1)), + new(big.Int).Neg(max64), + big.NewInt(-1), + big.NewInt(0), + one, + max64, + new(big.Int).Sub(maxUint256, big.NewInt(1)), + maxUint256, + } + for i, n := range ns { + check(n, object.MatchNumGT, ltMin, true) + check(n, object.MatchNumGE, ltMin, true) + check(n, object.MatchNumLT, ltMin, false) + check(n, object.MatchNumLE, ltMin, false) + + check(n, object.MatchNumGT, gtMax, false) + check(n, object.MatchNumGE, gtMax, false) + check(n, object.MatchNumLT, gtMax, true) + check(n, object.MatchNumLE, gtMax, true) + + check(n, object.MatchNumGT, n, false) + check(n, object.MatchNumGE, n, true) + check(n, object.MatchNumLT, n, false) + check(n, object.MatchNumLE, n, true) + + for j := range i { + check(n, object.MatchNumGT, ns[j], true) + check(n, object.MatchNumGE, ns[j], true) + check(n, object.MatchNumLT, ns[j], false) + check(n, object.MatchNumLE, ns[j], false) + } + for j := i + 1; j < len(ns); j++ { + check(n, object.MatchNumGT, ns[j], false) + check(n, object.MatchNumGE, ns[j], false) + check(n, object.MatchNumLT, ns[j], true) + check(n, object.MatchNumLE, ns[j], true) + } + + minusOne := new(big.Int).Sub(n, one) + check(n, object.MatchNumGT, minusOne, true) + check(n, object.MatchNumGE, minusOne, true) + check(n, object.MatchNumLT, minusOne, false) + check(n, object.MatchNumLE, minusOne, false) + plusOne := new(big.Int).Add(n, one) + check(n, object.MatchNumGT, plusOne, false) + check(n, object.MatchNumGE, plusOne, false) + check(n, object.MatchNumLT, plusOne, true) + check(n, object.MatchNumLE, plusOne, true) + } + }) +} + +func TestIntBucketOrder(t *testing.T) { + db := newDB(t) + ns := []*big.Int{ + maxUint256Neg, + new(big.Int).Add(maxUint256Neg, big.NewInt(1)), + big.NewInt(math.MinInt64), + big.NewInt(-1), + big.NewInt(0), + big.NewInt(1), + new(big.Int).SetUint64(math.MaxUint64), + new(big.Int).Sub(maxUint256, big.NewInt(1)), + maxUint256, + } + rand.Shuffle(len(ns), func(i, j int) { ns[i], ns[j] = ns[j], ns[i] }) + + err := db.boltDB.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists([]byte("any")) + if err != nil { + return err + } + for _, n := range ns { + if err := b.Put(intBytes(n), nil); err != nil { + return err + } + } + return nil + }) + require.NoError(t, err) + + var collected []string + err = db.boltDB.View(func(tx *bbolt.Tx) error { + c := tx.Bucket([]byte("any")).Cursor() + for k, _ := c.First(); k != nil; k, _ = c.Next() { + n, err := intFromBytes(k) + require.NoError(t, err) + collected = append(collected, n.String()) + } + return nil + }) + + require.Equal(t, []string{ + "-115792089237316195423570985008687907853269984665640564039457584007913129639935", + "-115792089237316195423570985008687907853269984665640564039457584007913129639934", + "-9223372036854775808", + "-1", + "0", + "1", + "18446744073709551615", + "115792089237316195423570985008687907853269984665640564039457584007913129639934", + "115792089237316195423570985008687907853269984665640564039457584007913129639935", + }, collected) } func TestDB_SearchObjects(t *testing.T) { @@ -164,14 +366,26 @@ func TestDB_SearchObjects(t *testing.T) { _, _, err := db.Search(cidtest.ID(), nil, nil, "not_base64", 1) require.ErrorContains(t, err, "decode cursor from Base64") }) - }) - t.Run("no filters", func(t *testing.T) { - t.Run("invalid input", func(t *testing.T) { - t.Run("with attributes", func(t *testing.T) { - _, _, err := db.Search(cidtest.ID(), nil, []string{"any"}, "", 1) - require.EqualError(t, err, "attributes are set without filters") + t.Run("attributes without filters", func(t *testing.T) { + _, _, err := db.Search(cidtest.ID(), nil, []string{"any"}, "", 1) + require.EqualError(t, err, "attributes are set without filters") + }) + t.Run("filters", func(t *testing.T) { + t.Run("container", func(t *testing.T) { + var fs object.SearchFilters + fs.AddObjectContainerIDFilter(object.SearchMatchType(rand.Int31()), cidtest.ID()) + _, _, err := db.Search(cidtest.ID(), fs, nil, "", 1) + require.EqualError(t, err, "prohibited filter $Object:containerID") + }) + t.Run("ID", func(t *testing.T) { + var fs object.SearchFilters + fs.AddObjectIDFilter(object.SearchMatchType(rand.Int31()), oidtest.ID()) + _, _, err := db.Search(cidtest.ID(), fs, nil, "", 1) + require.EqualError(t, err, "prohibited filter $Object:objectID") }) }) + }) + t.Run("no filters", func(t *testing.T) { t.Run("BoltDB failure", func(t *testing.T) { db := newDB(t) require.NoError(t, db.boltDB.Close()) @@ -190,7 +404,6 @@ func TestDB_SearchObjects(t *testing.T) { }) const n = 10 - ids := oidtest.IDs(n) objs := make([]object.Object, n) for i := range objs { @@ -275,7 +488,7 @@ func TestDB_SearchObjects(t *testing.T) { const nRoot = 2 const nPhy = 2 * nRoot const nAll = nRoot + nPhy - all := []uint{0, 1, 2, 3, 4, 5, 6} + all := []uint{0, 1, 2, 3, 4, 5} group1 := []uint{0, 2, 4} group2 := []uint{1, 3, 5} ids := [nAll]oid.ID{ @@ -363,7 +576,7 @@ func TestDB_SearchObjects(t *testing.T) { *object.NewAttribute("attr_common", "val_common"), *object.NewAttribute("unique_attr_"+si, "unique_val_"+si), groupAttrs[nGroup], - *object.NewAttribute("global_non_integer", "non an integer"), + *object.NewAttribute("global_non_integer", "not an integer"), ) } @@ -376,11 +589,9 @@ func TestDB_SearchObjects(t *testing.T) { for i := range phys { nGroup := i % nRoot initObj(&phys[i], nRoot+i, nGroup) - if nGroup == 1 { - phys[i].SetSplitID(object.NewSplitIDFromV2(splitIDs[nGroup])) - phys[i].SetFirstID(firstIDs[nGroup]) - phys[i].SetParent(&pars[nGroup]) - } + phys[i].SetSplitID(object.NewSplitIDFromV2(splitIDs[nGroup])) + phys[i].SetFirstID(firstIDs[nGroup]) + phys[i].SetParent(&pars[nGroup]) } appendAttribute(&pars[0], "attr_int", "-115792089237316195423570985008687907853269984665640564039457584007913129639935") @@ -395,61 +606,44 @@ func TestDB_SearchObjects(t *testing.T) { check := func(k string, m object.SearchMatchType, v string, matchInds []uint) { var fs object.SearchFilters - fs.AddFilter("attr_common", "val_common", object.MatchStringEqual) fs.AddFilter(k, v, m) - res, cursor, err := db.Search(cnr, fs, []string{"attr_common"}, "", nAll) + res, cursor, err := db.Search(cnr, fs, nil, "", nAll) require.NoError(t, err) require.Empty(t, cursor) require.Len(t, res, len(matchInds)) for i, ind := range matchInds { require.Equal(t, ids[ind], res[i].ID) - require.Len(t, res[i].Attributes, 1) + require.Empty(t, res[i].Attributes) } } t.Run("all", func(t *testing.T) { check("attr_common", object.MatchStringEqual, "val_common", all) }) - t.Run("container", func(t *testing.T) { - // Since container ID is a selection parameter itself, only EQ and PREFIX with - // CID prefix match. For other prefixes, result is always empty, and it can be - // obtained without touching the BoltDB. - check("$Object:containerID", object.MatchStringEqual, "HYFTEXkzpDWkXU6anQByuSPvV3imjzTKJBaAyD4VYg23", all) - check("$Object:containerID", object.MatchStringEqual, "EiyzAP7fv1DwtBycvZXQRvLfuxy6GFZm8mErhyR2KhKy", nil) - check("$Object:containerID", object.MatchStringEqual, "", nil) - check("$Object:containerID", object.MatchStringNotEqual, "HYFTEXkzpDWkXU6anQByuSPvV3imjzTKJBaAyD4VYg23", nil) - check("$Object:containerID", object.MatchStringNotEqual, "EiyzAP7fv1DwtBycvZXQRvLfuxy6GFZm8mErhyR2KhKy", all) - check("$Object:containerID", object.MatchStringNotEqual, "", all) - for _, m := range []object.SearchMatchType{ - object.MatchNotPresent, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, - } { - check("$Object:containerID", m, "HYFTEXkzpDWkXU6anQByuSPvV3imjzTKJBaAyD4VYg23", nil) - } - check("$Object:containerID", object.MatchCommonPrefix, "", all) - check("$Object:containerID", object.MatchCommonPrefix, "H", all) - check("$Object:containerID", object.MatchCommonPrefix, "HYFTEXkzpDWkXU6anQByuSPv", all) - check("$Object:containerID", object.MatchCommonPrefix, "HYFTEXkzpDWkXU6anQByuSPvV3imjzTKJBaAyD4VYg23", all) - check("$Object:containerID", object.MatchCommonPrefix, "HYFTEXkzpDWkXU6anQByuSPvV3imjzTKJBaAyD4VYg234", nil) - check("$Object:containerID", object.MatchCommonPrefix, "X", nil) - // TODO: also check that BoltDB is untouched on mismatch - }) t.Run("user attributes", func(t *testing.T) { // unique for i := range all { si := strconv.Itoa(i) key := "unique_attr_" + si - val := "val_" + si + val := "unique_val_" + si check(key, object.MatchStringEqual, val, []uint{uint(i)}) check(key, object.MatchStringNotEqual, "other_val", []uint{uint(i)}) for j := range val { check(key, object.MatchCommonPrefix, val[:j], []uint{uint(i)}) } - for _, op := range []object.SearchMatchType{ + for _, matcher := range []object.SearchMatchType{ object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, } { - check(key, op, val, nil) + check(key, matcher, val, nil) } + var others []uint + for j := range all { + if j != i { + others = append(others, uint(j)) + } + } + check(key, object.MatchNotPresent, "", others) } // group const val1 = "group_val_1" @@ -466,61 +660,32 @@ func TestDB_SearchObjects(t *testing.T) { for i := range val1 { check("group_attr_2", object.MatchCommonPrefix, val2[:i], group2) } - for _, op := range []object.SearchMatchType{ + for _, matcher := range []object.SearchMatchType{ object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, } { - check("group_attr_1", op, val1, nil) - check("group_attr_2", op, val2, nil) + check("group_attr_1", matcher, val1, nil) + check("group_attr_2", matcher, val2, nil) } }) t.Run("ROOT", func(t *testing.T) { + t.Skip("not supported yet") check("$Object:ROOT", 0, "", []uint{0, 1}) - for _, op := range []object.SearchMatchType{ + for _, matcher := range []object.SearchMatchType{ object.MatchStringEqual, object.MatchStringNotEqual, object.MatchNotPresent, object.MatchCommonPrefix, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, } { - check("$Object:ROOT", op, "", nil) + check("$Object:ROOT", matcher, "", nil) } }) t.Run("PHY", func(t *testing.T) { + t.Skip("not supported yet") check("$Object;PHY", 0, "", []uint{0, 1, 2, 3}) - for _, op := range []object.SearchMatchType{ + for _, matcher := range []object.SearchMatchType{ object.MatchStringEqual, object.MatchStringNotEqual, object.MatchNotPresent, object.MatchCommonPrefix, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, } { - check("$Object:PHY", op, "", nil) - } - }) - t.Run("ID", func(t *testing.T) { - check := func(m object.SearchMatchType, v string, matchInds []uint) { - check("$Object:objectID", m, v, matchInds) - } - check(object.MatchStringEqual, "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S", []uint{0}) - check(object.MatchStringEqual, "6dMvfyLF7HZ1WsBRgrLUDZP4pLkvNRjB6HWGeNXP4fJp", []uint{1}) - check(object.MatchStringEqual, "6hkrsFBPpAKTAKHeC5gycCZsz2BQdKtAn9ADriNdWf4E", []uint{2}) - check(object.MatchStringEqual, "BQY3VShN1BmU6XDKiQaDo2tk7s7rkYuaGeVgmcHcWsRY", []uint{3}) - check(object.MatchStringEqual, "DsKLie7U2BVph5XkZttG8EERxt9DFQXkrowr6LFkxp8h", []uint{4}) - check(object.MatchStringEqual, "Gv9XcEW7KREB8cnjFbW8HBdJesMnbNKknfGdBNsVtQmB", []uint{5}) - check(object.MatchStringNotEqual, "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S", []uint{1, 2, 3, 4, 5}) - check(object.MatchStringNotEqual, "6dMvfyLF7HZ1WsBRgrLUDZP4pLkvNRjB6HWGeNXP4fJp", []uint{0, 2, 3, 4, 5}) - check(object.MatchStringNotEqual, "6hkrsFBPpAKTAKHeC5gycCZsz2BQdKtAn9ADriNdWf4E", []uint{0, 1, 3, 4, 5}) - check(object.MatchStringNotEqual, "BQY3VShN1BmU6XDKiQaDo2tk7s7rkYuaGeVgmcHcWsRY", []uint{0, 1, 2, 4, 5}) - check(object.MatchStringNotEqual, "DsKLie7U2BVph5XkZttG8EERxt9DFQXkrowr6LFkxp8h", []uint{0, 1, 2, 3, 5}) - check(object.MatchStringNotEqual, "Gv9XcEW7KREB8cnjFbW8HBdJesMnbNKknfGdBNsVtQmB", []uint{0, 1, 2, 3, 4}) - check(object.MatchStringEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", nil) // other - check(object.MatchStringNotEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", all) - for _, m := range []object.SearchMatchType{ - object.MatchNotPresent, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, - } { - check(m, "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S", nil) + check("$Object:PHY", matcher, "", nil) } - check(object.MatchCommonPrefix, "", all) - check(object.MatchCommonPrefix, "R", []uint{0}) - check(object.MatchCommonPrefix, "6", []uint{1, 2}) - check(object.MatchCommonPrefix, "6h", []uint{2}) - check(object.MatchCommonPrefix, "6h1", nil) - check(object.MatchCommonPrefix, "6hkrsFBPpAKTAKHeC5gycCZsz2BQdKtAn9ADriNdWf4E", []uint{2}) - check(object.MatchCommonPrefix, "6hkrsFBPpAKTAKHeC5gycCZsz2BQdKtAn9ADriNdWf4E1", nil) }) t.Run("version", func(t *testing.T) { check := func(m object.SearchMatchType, v string, matchInds []uint) { @@ -542,7 +707,7 @@ func TestDB_SearchObjects(t *testing.T) { check(object.MatchCommonPrefix, "v1", all) check(object.MatchCommonPrefix, "v10", all) check(object.MatchCommonPrefix, "v100", group1) - check(object.MatchCommonPrefix, "v100.200", all) + check(object.MatchCommonPrefix, "v100.200", group1) check(object.MatchCommonPrefix, "v100.2001", nil) check(object.MatchCommonPrefix, "v101", group2) check(object.MatchCommonPrefix, "v101.201", group2) @@ -550,7 +715,7 @@ func TestDB_SearchObjects(t *testing.T) { }) t.Run("owner", func(t *testing.T) { check := func(m object.SearchMatchType, v string, matchInds []uint) { - check("$Object:owner", m, v, matchInds) + check("$Object:ownerID", m, v, matchInds) } check(object.MatchStringEqual, "NfzJyPrn1hRGuVJNvMYLTfWZGW2ZVR9Qmj", group1) check(object.MatchStringNotEqual, "NfzJyPrn1hRGuVJNvMYLTfWZGW2ZVR9Qmj", group2) @@ -582,18 +747,19 @@ func TestDB_SearchObjects(t *testing.T) { check(object.MatchStringNotEqual, "TOMBSTONE", all) check(object.MatchStringEqual, "0", nil) // numeric enum value check(object.MatchStringEqual, "2", nil) - for _, op := range []object.SearchMatchType{ + for _, matcher := range []object.SearchMatchType{ object.MatchNotPresent, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, } { - check(op, "", nil) - check(op, "TOMBSTONE", nil) - check(op, "LOCK", nil) - check(op, "1", nil) - check(op, "3", nil) - } - for _, prefix := range []string{"", "T", "L"} { - check(object.MatchCommonPrefix, prefix, nil) + check(matcher, "", nil) + check(matcher, "TOMBSTONE", nil) + check(matcher, "LOCK", nil) + check(matcher, "1", nil) + check(matcher, "3", nil) } + check(object.MatchCommonPrefix, "", all) + check(object.MatchCommonPrefix, "R", group1) + check(object.MatchCommonPrefix, "S", group2) + check(object.MatchCommonPrefix, "L", nil) }) t.Run("payload checksum", func(t *testing.T) { check := func(m object.SearchMatchType, v string, matchInds []uint) { @@ -657,6 +823,7 @@ func TestDB_SearchObjects(t *testing.T) { check := func(m object.SearchMatchType, v string, matchInds []uint) { check("$Object:split.splitID", m, v, matchInds) } + group1, group2, all := []uint{2, 4}, []uint{3, 5}, []uint{2, 3, 4, 5} check(object.MatchStringEqual, "8b69e76d-5e95-4639-8213-46786c41ab73", group1) check(object.MatchStringNotEqual, "8b69e76d-5e95-4639-8213-46786c41ab73", group2) check(object.MatchStringEqual, "60c6b1ff-5e6d-4c0f-8699-15d54bf8a2e1", group2) @@ -671,13 +838,14 @@ func TestDB_SearchObjects(t *testing.T) { check(object.MatchCommonPrefix, "", all) check(object.MatchCommonPrefix, "8", group1) check(object.MatchCommonPrefix, "60", group2) - check(object.MatchCommonPrefix, "8b69e76d-5e95-4639-8213-46786c41ab73", group2) + check(object.MatchCommonPrefix, "8b69e76d-5e95-4639-8213-46786c41ab73", group1) check(object.MatchCommonPrefix, "8b69e76d-5e95-4639-8213-46786c41ab731", nil) }) t.Run("first ID", func(t *testing.T) { check := func(m object.SearchMatchType, v string, matchInds []uint) { check("$Object:split.first", m, v, matchInds) } + group1, group2, all := []uint{2, 4}, []uint{3, 5}, []uint{2, 3, 4, 5} check(object.MatchStringEqual, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", group1) check(object.MatchStringEqual, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group2) check(object.MatchStringNotEqual, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", group2) @@ -695,43 +863,94 @@ func TestDB_SearchObjects(t *testing.T) { check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group2) check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH1", nil) }) + t.Run("first ID", func(t *testing.T) { + check := func(m object.SearchMatchType, v string, matchInds []uint) { + check("$Object:split.first", m, v, matchInds) + } + group1, group2, all := []uint{2, 4}, []uint{3, 5}, []uint{2, 3, 4, 5} + check(object.MatchStringEqual, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", group1) + check(object.MatchStringEqual, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group2) + check(object.MatchStringNotEqual, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", group2) + check(object.MatchStringNotEqual, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group1) + check(object.MatchStringEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", nil) // other + check(object.MatchStringNotEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", all) + for _, m := range []object.SearchMatchType{ + object.MatchNotPresent, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, + } { + check(m, "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC", nil) + } + check(object.MatchCommonPrefix, "", all) + check(object.MatchCommonPrefix, "6", group1) + check(object.MatchCommonPrefix, "C", group2) + check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH", group2) + check(object.MatchCommonPrefix, "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH1", nil) + }) + t.Run("parent ID", func(t *testing.T) { + check := func(m object.SearchMatchType, v string, matchInds []uint) { + check("$Object:split.parent", m, v, matchInds) + } + group1, group2, all := []uint{2, 4}, []uint{3, 5}, []uint{2, 3, 4, 5} + check(object.MatchStringEqual, "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S", group1) + check(object.MatchStringEqual, "6dMvfyLF7HZ1WsBRgrLUDZP4pLkvNRjB6HWGeNXP4fJp", group2) + check(object.MatchStringNotEqual, "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S", group2) + check(object.MatchStringNotEqual, "6dMvfyLF7HZ1WsBRgrLUDZP4pLkvNRjB6HWGeNXP4fJp", group1) + check(object.MatchStringEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", nil) // other + check(object.MatchStringNotEqual, "Dfot9FnhkJy9m8pXrF1fL5fmKmbHK8wL8PqExoQFNTrz", all) + for _, m := range []object.SearchMatchType{ + object.MatchNotPresent, object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, + } { + check(m, "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S", nil) + } + check(object.MatchCommonPrefix, "", all) + check(object.MatchCommonPrefix, "R", group1) + check(object.MatchCommonPrefix, "6", group2) + check(object.MatchCommonPrefix, "6dMvfyLF7HZ1WsBRgrLUDZP4pLkvNRjB6HWGeNXP4fJp", group2) + check(object.MatchCommonPrefix, "6dMvfyLF7HZ1WsBRgrLUDZP4pLkvNRjB6HWGeNXP4fJp1", nil) + }) t.Run("integers", func(t *testing.T) { - for _, op := range []object.SearchMatchType{ + allInt := []uint{0, 1, 2, 3, 4} + for _, matcher := range []object.SearchMatchType{ object.MatchNumGT, object.MatchNumGE, object.MatchNumLT, object.MatchNumLE, } { - check("global_non_integer", op, "123", nil) + check("global_non_integer", matcher, "123", nil) // TODO: also check that BoltDB is untouched in following cases - check("attr_common", op, "text", nil) - check("attr_common", op, "1.5", nil) - check("attr_common", op, "115792089237316195423570985008687907853269984665640564039457584007913129639936", nil) - check("attr_common", op, "-115792089237316195423570985008687907853269984665640564039457584007913129639936", nil) - } - check("attr_common", object.MatchNumLT, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", nil) - check("attr_common", object.MatchNumLT, "-18446744073709551615", []uint{0}) - check("attr_common", object.MatchNumLT, "0", []uint{0, 2}) - check("attr_common", object.MatchNumLT, "18446744073709551615", []uint{0, 2, 3}) - check("attr_common", object.MatchNumLT, "115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0, 2, 3}) - check("attr_common", object.MatchNumLE, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0}) - check("attr_common", object.MatchNumLE, "-18446744073709551615", []uint{0, 2}) - check("attr_common", object.MatchNumLE, "0", []uint{0, 2, 3}) - check("attr_common", object.MatchNumLE, "18446744073709551615", []uint{0, 2, 3, 4}) - check("attr_common", object.MatchNumLE, "115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0, 1, 2, 3, 4}) - check("attr_common", object.MatchNumGT, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{1, 2, 3, 4}) - check("attr_common", object.MatchNumGT, "-18446744073709551615", []uint{1, 3, 4}) - check("attr_common", object.MatchNumGT, "0", []uint{1, 4}) - check("attr_common", object.MatchNumGT, "18446744073709551615", []uint{1}) - check("attr_common", object.MatchNumGT, "115792089237316195423570985008687907853269984665640564039457584007913129639935", nil) - check("attr_common", object.MatchNumGE, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0, 1, 2, 3, 4}) - check("attr_common", object.MatchNumGE, "-18446744073709551615", []uint{1, 2, 3, 4}) - check("attr_common", object.MatchNumGE, "0", []uint{1, 3, 4}) - check("attr_common", object.MatchNumGE, "18446744073709551615", []uint{1, 2}) - check("attr_common", object.MatchNumGE, "115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0}) + check("attr_int", matcher, "text", nil) + check("attr_int", matcher, "1.5", nil) + } + check("attr_int", object.MatchNumLT, "-115792089237316195423570985008687907853269984665640564039457584007913129639936", nil) + check("attr_int", object.MatchNumLE, "-115792089237316195423570985008687907853269984665640564039457584007913129639936", nil) + check("attr_int", object.MatchNumGT, "-115792089237316195423570985008687907853269984665640564039457584007913129639936", allInt) + check("attr_int", object.MatchNumGE, "-115792089237316195423570985008687907853269984665640564039457584007913129639936", allInt) + check("attr_int", object.MatchNumLT, "115792089237316195423570985008687907853269984665640564039457584007913129639936", allInt) + check("attr_int", object.MatchNumLE, "115792089237316195423570985008687907853269984665640564039457584007913129639936", allInt) + check("attr_int", object.MatchNumGT, "115792089237316195423570985008687907853269984665640564039457584007913129639936", nil) + check("attr_int", object.MatchNumGE, "115792089237316195423570985008687907853269984665640564039457584007913129639936", nil) + check("attr_int", object.MatchNumLT, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", nil) + check("attr_int", object.MatchNumLT, "-18446744073709551615", []uint{0}) + check("attr_int", object.MatchNumLT, "0", []uint{0, 2}) + check("attr_int", object.MatchNumLT, "18446744073709551615", []uint{0, 2, 3}) + check("attr_int", object.MatchNumLT, "115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0, 2, 3, 4}) + check("attr_int", object.MatchNumLE, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0}) + check("attr_int", object.MatchNumLE, "-18446744073709551615", []uint{0, 2}) + check("attr_int", object.MatchNumLE, "0", []uint{0, 2, 3}) + check("attr_int", object.MatchNumLE, "18446744073709551615", []uint{0, 2, 3, 4}) + check("attr_int", object.MatchNumLE, "115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0, 1, 2, 3, 4}) + check("attr_int", object.MatchNumGT, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{1, 2, 3, 4}) + check("attr_int", object.MatchNumGT, "-18446744073709551615", []uint{1, 3, 4}) + check("attr_int", object.MatchNumGT, "0", []uint{1, 4}) + check("attr_int", object.MatchNumGT, "18446744073709551615", []uint{1}) + check("attr_int", object.MatchNumGT, "115792089237316195423570985008687907853269984665640564039457584007913129639935", nil) + check("attr_int", object.MatchNumGE, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{0, 1, 2, 3, 4}) + check("attr_int", object.MatchNumGE, "-18446744073709551615", []uint{1, 2, 3, 4}) + check("attr_int", object.MatchNumGE, "0", []uint{1, 3, 4}) + check("attr_int", object.MatchNumGE, "18446744073709551615", []uint{1, 4}) + check("attr_int", object.MatchNumGE, "115792089237316195423570985008687907853269984665640564039457584007913129639935", []uint{1}) for _, tc := range []struct { name, key string val1, val2 string }{ {name: "creation epoch", key: "$Object:creationEpoch", val1: "10", val2: "11"}, - {name: "creation epoch", key: "$Object:payloadLength", val1: "20", val2: "21"}, + {name: "payload length", key: "$Object:payloadLength", val1: "20", val2: "21"}, } { t.Run(tc.name, func(t *testing.T) { check(tc.key, object.MatchNumLT, "-115792089237316195423570985008687907853269984665640564039457584007913129639935", nil) @@ -761,5 +980,97 @@ func TestDB_SearchObjects(t *testing.T) { }) } }) + t.Run("complex", func(t *testing.T) { + type filter struct { + k string + m object.SearchMatchType + v string + } + for _, tc := range []struct { + is []uint + fs []filter + }{ + {is: group1, fs: []filter{ + {k: "group_attr_1", m: object.MatchStringEqual, v: "group_val_1"}, + {k: "attr_int", m: object.MatchNumGE, v: "-115792089237316195423570985008687907853269984665640564039457584007913129639935"}, + }}, + {is: []uint{1, 3}, fs: []filter{ + {k: "group_attr_2", m: object.MatchStringNotEqual, v: "group_val_1"}, + {k: "attr_int", m: object.MatchNumLT, v: "115792089237316195423570985008687907853269984665640564039457584007913129639936"}, + }}, + {is: nil, fs: []filter{ + {k: "attr_common", m: object.MatchCommonPrefix, v: "val_c"}, + {k: "attr_int", m: object.MatchNumLT, v: "0"}, + {k: "attr_int", m: object.MatchNumGT, v: "0"}, + }}, + {is: []uint{0, 1, 2, 3, 4}, fs: []filter{ + {k: "attr_common", m: object.MatchStringEqual, v: "val_common"}, + {k: "attr_int", m: object.MatchNumGE, v: "-115792089237316195423570985008687907853269984665640564039457584007913129639935"}, + {k: "attr_int", m: object.MatchNumLE, v: "115792089237316195423570985008687907853269984665640564039457584007913129639935"}, + }}, + {is: []uint{0, 2, 3, 4, 5}, fs: []filter{ + {k: "unique_attr_1", m: object.MatchNotPresent}, + {k: "attr_common", m: object.MatchStringNotEqual, v: "wrong text"}, + }}, + {is: []uint{0, 2, 3, 4, 5}, fs: []filter{ + {k: "unique_attr_1", m: object.MatchNotPresent}, + {k: "attr_common", m: object.MatchStringNotEqual, v: "wrong text"}, + }}, + {is: []uint{4}, fs: []filter{ + {k: "attr_int", m: object.MatchNumGT, v: "-18446744073709551615"}, + {k: "group_attr_1", m: object.MatchStringNotEqual, v: "random"}, + {k: "global_non_integer", m: object.MatchCommonPrefix, v: "not"}, + {k: "random", m: object.MatchNotPresent}, + {k: "attr_int", m: object.MatchNumGE, v: "18446744073709551615"}, + }}, + {is: nil, fs: []filter{ // like previous but > instead of >= + {k: "attr_int", m: object.MatchNumGT, v: "-18446744073709551615"}, + {k: "group_attr_1", m: object.MatchStringNotEqual, v: "random"}, + {k: "global_non_integer", m: object.MatchCommonPrefix, v: "not"}, + {k: "random", m: object.MatchNotPresent}, + {k: "attr_int", m: object.MatchNumGT, v: "18446744073709551615"}, + }}, + {is: group2, fs: []filter{ + {k: "$Object:payloadLength", m: object.MatchNumGT, v: "20"}, + {k: "$Object:creationEpoch", m: object.MatchNumLT, v: "30"}, + }}, + {is: all, fs: []filter{ + {k: "$Object:payloadLength", m: object.MatchNumGT, v: "19"}, + {k: "$Object:creationEpoch", m: object.MatchNumLE, v: "21"}, + }}, + {is: []uint{2, 4}, fs: []filter{ + {k: "$Object:split.first", m: object.MatchStringEqual, v: "61hnJaKip8c1QxvC2iT4Txfpxf37QBNRaw1XCeq72DbC"}, + {k: "$Object:split.parent", m: object.MatchStringEqual, v: "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S"}, + }}, + {is: []uint{3, 5}, fs: []filter{ + {k: "$Object:split.parent", m: object.MatchStringNotEqual, v: "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S"}, + {k: "$Object:split.first", m: object.MatchStringEqual, v: "Cdf8vnK5xTxmkdc1GcjkxaEQFtEmwHPRky4KRQik6rQH"}, + }}, + {is: []uint{3, 5}, fs: []filter{ + {k: "random", m: object.MatchNotPresent}, + {k: "$Object:split.parent", m: object.MatchStringNotEqual, v: "RSYscGLzKw1nkeVRGpowYTGgtgodXJrMyyiHTGGJW3S"}, + }}, + {is: []uint{2, 4}, fs: []filter{ + {k: "$Object:split.splitID", m: object.MatchCommonPrefix, v: "8b69e76d-5e95-4639"}, + {k: "random", m: object.MatchNotPresent}, + {k: "attr_common", m: object.MatchStringNotEqual, v: "random"}, + }}, + } { + t.Run("complex", func(t *testing.T) { + var fs object.SearchFilters + for _, f := range tc.fs { + fs.AddFilter(f.k, f.v, f.m) + } + res, cursor, err := db.Search(cnr, fs, nil, "", nAll) + require.NoError(t, err) + require.Empty(t, cursor) + require.Len(t, res, len(tc.is)) + for i, ind := range tc.is { + require.Equal(t, ids[ind], res[i].ID) + require.Empty(t, res[i].Attributes) + } + }) + } + }) }) }