Skip to content

Commit

Permalink
membuffer: implement snapshot get and iterator for ART (#1467)
Browse files Browse the repository at this point in the history
ref pingcap/tidb#55287

Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 authored Sep 25, 2024
1 parent 527f80a commit 58f3322
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 47 deletions.
51 changes: 45 additions & 6 deletions internal/unionstore/art/art.go
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,11 @@ func (t *ART) InspectNode(addr arena.MemdbArenaAddr) (*artLeaf, arena.MemdbArena
return lf, lf.vAddr
}

// IsStaging returns whether the MemBuffer is in staging status.
func (t *ART) IsStaging() bool {
return len(t.stages) > 0
}

// Checkpoint returns a checkpoint of ART.
func (t *ART) Checkpoint() *arena.MemDBCheckpoint {
cp := t.allocator.vlogAllocator.Checkpoint()
Expand Down Expand Up @@ -471,7 +476,7 @@ func (t *ART) Cleanup(h int) {
return
}
if h < len(t.stages) {
panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(db.stages)=%v", h, len(t.stages)))
panic(fmt.Sprintf("cannot cleanup staging buffer, h=%v, len(tree.stages)=%v", h, len(t.stages)))
}

cp := &t.stages[h-1]
Expand Down Expand Up @@ -501,7 +506,8 @@ func (t *ART) Reset() {
// DiscardValues releases the memory used by all values.
// NOTE: any operation need value will panic after this function.
func (t *ART) DiscardValues() {
panic("unimplemented")
t.vlogInvalid = true
t.allocator.vlogAllocator.Reset()
}

// InspectStage used to inspect the value updates in the given stage.
Expand All @@ -514,16 +520,35 @@ func (t *ART) InspectStage(handle int, f func([]byte, kv.KeyFlags, []byte)) {

// SelectValueHistory select the latest value which makes `predicate` returns true from the modification history.
func (t *ART) SelectValueHistory(key []byte, predicate func(value []byte) bool) ([]byte, error) {
panic("unimplemented")
_, x := t.search(key)
if x == nil {
return nil, tikverr.ErrNotExist
}
if x.vAddr.IsNull() {
// A flags only key, act as value not exists
return nil, tikverr.ErrNotExist
}
result := t.allocator.vlogAllocator.SelectValueHistory(x.vAddr, func(addr arena.MemdbArenaAddr) bool {
return predicate(t.allocator.vlogAllocator.GetValue(addr))
})
if result.IsNull() {
return nil, nil
}
return t.allocator.vlogAllocator.GetValue(result), nil

}

func (t *ART) SetMemoryFootprintChangeHook(fn func(uint64)) {
panic("unimplemented")
func (t *ART) SetMemoryFootprintChangeHook(hook func(uint64)) {
innerHook := func() {
hook(t.allocator.nodeAllocator.Capacity() + t.allocator.vlogAllocator.Capacity())
}
t.allocator.nodeAllocator.SetMemChangeHook(innerHook)
t.allocator.vlogAllocator.SetMemChangeHook(innerHook)
}

// MemHookSet implements the MemBuffer interface.
func (t *ART) MemHookSet() bool {
panic("unimplemented")
return t.allocator.nodeAllocator.MemHookSet()
}

// GetKeyByHandle returns key by handle.
Expand All @@ -544,10 +569,24 @@ func (t *ART) GetValueByHandle(handle arena.MemKeyHandle) ([]byte, bool) {
return t.allocator.vlogAllocator.GetValue(lf.vAddr), true
}

// GetEntrySizeLimit gets the size limit for each entry and total buffer.
func (t *ART) GetEntrySizeLimit() (uint64, uint64) {
return t.entrySizeLimit, t.bufferSizeLimit
}

func (t *ART) SetEntrySizeLimit(entryLimit, bufferLimit uint64) {
t.entrySizeLimit, t.bufferSizeLimit = entryLimit, bufferLimit
}

// RemoveFromBuffer is a test function, not support yet.
func (t *ART) RemoveFromBuffer(key []byte) {
panic("unimplemented")
}

func (t *ART) GetCacheHitCount() uint64 {
return 0
}

func (t *ART) GetCacheMissCount() uint64 {
return 0
}
14 changes: 13 additions & 1 deletion internal/unionstore/art/art_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,10 @@ func (t *ART) iter(lowerBound, upperBound []byte, reverse, includeFlags bool) (*
inner: &baseIter{
allocator: &t.allocator,
},
currAddr: arena.BadAddr, // the default value of currAddr is not equal to any valid address
// the default value of currAddr is not equal to any valid address
// arena.BadAddr's idx is maxuint32 - 1, which is impossible in common cases,
// this avoids the initial value of currAddr equals to endAddr.
currAddr: arena.BadAddr,
endAddr: arena.NullAddr,
}
it.init(lowerBound, upperBound)
Expand Down Expand Up @@ -84,6 +87,15 @@ func (it *Iterator) Value() []byte {
return it.tree.allocator.vlogAllocator.GetValue(it.currLeaf.vAddr)
}

// HasValue returns false if it is flags only.
func (it *Iterator) HasValue() bool {
return !it.isFlagsOnly()
}

func (it *Iterator) isFlagsOnly() bool {
return it.currLeaf != nil && it.currLeaf.vAddr.IsNull()
}

func (it *Iterator) Next() error {
if !it.valid {
// iterate is finished
Expand Down
113 changes: 97 additions & 16 deletions internal/unionstore/art/art_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,111 @@

package art

import "context"
import (
"context"

func (*ART) SnapshotGetter() *SnapshotGetter {
panic("unimplemented")
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/unionstore/arena"
)

func (t *ART) getSnapshot() arena.MemDBCheckpoint {
if len(t.stages) > 0 {
return t.stages[0]
}
return t.checkpoint()
}

// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
func (t *ART) SnapshotGetter() *SnapGetter {
return &SnapGetter{
tree: t,
cp: t.getSnapshot(),
}
}

// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
func (t *ART) SnapshotIter(start, end []byte) *SnapIter {
inner, err := t.Iter(start, end)
if err != nil {
panic(err)
}
it := &SnapIter{
Iterator: inner,
cp: t.getSnapshot(),
}
for !it.setValue() && it.Valid() {
_ = it.Next()
}
return it
}

func (*ART) SnapshotIter([]byte, []byte) *SnapshotIter {
panic("unimplemented")
// SnapshotIterReverse returns a reverse Iterator for a snapshot of MemBuffer.
func (t *ART) SnapshotIterReverse(k, lowerBound []byte) *SnapIter {
inner, err := t.IterReverse(k, lowerBound)
if err != nil {
panic(err)
}
it := &SnapIter{
Iterator: inner,
cp: t.getSnapshot(),
}
for !it.setValue() && it.valid {
_ = it.Next()
}
return it
}

func (*ART) SnapshotIterReverse([]byte, []byte) *SnapshotIter {
panic("unimplemented")
type SnapGetter struct {
tree *ART
cp arena.MemDBCheckpoint
}

type SnapshotGetter struct{}
func (snap *SnapGetter) Get(ctx context.Context, key []byte) ([]byte, error) {
addr, lf := snap.tree.search(key)
if addr.IsNull() {
return nil, tikverr.ErrNotExist
}
if lf.vAddr.IsNull() {
// A flags only key, act as value not exists
return nil, tikverr.ErrNotExist
}
v, ok := snap.tree.allocator.vlogAllocator.GetSnapshotValue(lf.vAddr, &snap.cp)
if !ok {
return nil, tikverr.ErrNotExist
}
return v, nil
}

func (s *SnapshotGetter) Get(context.Context, []byte) ([]byte, error) {
panic("unimplemented")
type SnapIter struct {
*Iterator
value []byte
cp arena.MemDBCheckpoint
}

type SnapshotIter struct{}
func (i *SnapIter) Value() []byte {
return i.value
}

func (i *SnapshotIter) Valid() bool { panic("unimplemented") }
func (i *SnapshotIter) Key() []byte { panic("unimplemented") }
func (i *SnapshotIter) Value() []byte { panic("unimplemented") }
func (i *SnapshotIter) Next() error { panic("unimplemented") }
func (i *SnapshotIter) Close() { panic("unimplemented") }
func (i *SnapIter) Next() error {
i.value = nil
for i.Valid() {
if err := i.Iterator.Next(); err != nil {
return err
}
if i.setValue() {
return nil
}
}
return nil
}

func (i *SnapIter) setValue() bool {
if !i.Valid() {
return false
}
if v, ok := i.tree.allocator.vlogAllocator.GetSnapshotValue(i.currLeaf.vAddr, &i.cp); ok {
i.value = v
return true
}
return false
}
6 changes: 3 additions & 3 deletions internal/unionstore/memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type MemDBCheckpoint = arena.MemDBCheckpoint

type MemKeyHandle = arena.MemKeyHandle

type MemDB = rbtDBWithContext
type MemDB = artDBWithContext

var NewMemDB = newRbtDBWithContext
var NewMemDBWithContext = newRbtDBWithContext
var NewMemDB = newArtDBWithContext
var NewMemDBWithContext = newArtDBWithContext
4 changes: 1 addition & 3 deletions internal/unionstore/memdb_art.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//nolint:unused
package unionstore

import (
Expand All @@ -36,7 +35,6 @@ type artDBWithContext struct {
skipMutex bool
}

//nolint:unused
func newArtDBWithContext() *artDBWithContext {
return &artDBWithContext{ART: art.New()}
}
Expand Down Expand Up @@ -118,7 +116,7 @@ func (db *artDBWithContext) FlushWait() error { return nil }

// GetMemDB implements the MemBuffer interface.
func (db *artDBWithContext) GetMemDB() *MemDB {
panic("unimplemented")
return db
}

// BatchGet returns the values for given keys from the MemBuffer.
Expand Down
3 changes: 2 additions & 1 deletion internal/unionstore/memdb_rbt.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func newRbtDBWithContext() *rbtDBWithContext {
}
}

//nolint:unused
func (db *rbtDBWithContext) setSkipMutex(skip bool) {
db.skipMutex = skip
}
Expand Down Expand Up @@ -125,7 +126,7 @@ func (db *rbtDBWithContext) FlushWait() error { return nil }

// GetMemDB implements the MemBuffer interface.
func (db *rbtDBWithContext) GetMemDB() *MemDB {
return db
return nil
}

// BatchGet returns the values for given keys from the MemBuffer.
Expand Down
33 changes: 16 additions & 17 deletions internal/unionstore/memdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ func testUnsetTemporaryFlag(t *testing.T, buffer MemBuffer) {

func TestSnapshotGetIter(t *testing.T) {
testSnapshotGetIter(t, newRbtDBWithContext())
testSnapshotGetIter(t, newArtDBWithContext())
}

func testSnapshotGetIter(t *testing.T, db MemBuffer) {
Expand Down Expand Up @@ -1089,21 +1090,19 @@ func testIterNoResult(t *testing.T, buffer MemBuffer) {
assert := assert.New(t)

assert.Nil(buffer.Set([]byte{1, 1}, []byte{1, 1}))
// Test lower bound and upper bound seek same position
iter, err := buffer.Iter([]byte{1, 0, 0}, []byte{1, 0, 1})
assert.Nil(err)
assert.False(iter.Valid())
iter, err = buffer.IterReverse([]byte{1, 0, 1}, []byte{1, 0, 0})
assert.Nil(err)
assert.False(iter.Valid())
// Test lower bound >= upper bound
iter, err = buffer.Iter([]byte{1, 0, 1}, []byte{1, 0, 0})
assert.Nil(err)
assert.False(iter.Valid())
iter, err = buffer.IterReverse([]byte{1, 0, 0}, []byte{1, 0, 1})
assert.Nil(err)
assert.False(iter.Valid())
iter, err = buffer.Iter([]byte{1, 1}, []byte{1, 1})
assert.Nil(err)
assert.False(iter.Valid())

checkFn := func(lowerBound, upperBound []byte) {
iter, err := buffer.Iter(lowerBound, upperBound)
assert.Nil(err)
assert.False(iter.Valid())
iter, err = buffer.IterReverse(upperBound, lowerBound)
assert.Nil(err)
assert.False(iter.Valid())
}

// Test lower bound and upper bound seek to the same position
checkFn([]byte{1, 1}, []byte{1, 1})
checkFn([]byte{1, 0, 0}, []byte{1, 0, 1})
// Test lower bound > upper bound
checkFn([]byte{1, 0, 1}, []byte{1, 0, 0})
}

0 comments on commit 58f3322

Please sign in to comment.