Skip to content

Commit

Permalink
no cache on iterator, remove unused snapshot (#389)
Browse files Browse the repository at this point in the history
  • Loading branch information
frairon authored Jul 1, 2022
1 parent b02505d commit bbfecd8
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 27 deletions.
8 changes: 2 additions & 6 deletions partition_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,8 @@ func newPartitionTable(topic string,
builder storage.Builder,
log logger,
backoff Backoff,
backoffResetTimeout time.Duration) *PartitionTable {

backoffResetTimeout time.Duration,
) *PartitionTable {
pt := &PartitionTable{
partition: partition,
state: newPartitionTableState(),
Expand All @@ -107,7 +107,6 @@ func newPartitionTable(topic string,

// SetupAndRecover sets up the partition storage and recovers to HWM
func (p *PartitionTable) SetupAndRecover(ctx context.Context, restartOnError bool) error {

err := p.setup(ctx)
if err != nil {
return err
Expand Down Expand Up @@ -405,7 +404,6 @@ func (p *PartitionTable) markRecovered(ctx context.Context) error {
}

func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {

timeoutCtx, cancel := context.WithTimeout(context.Background(), consumerDrainTimeout)
defer cancel()

Expand Down Expand Up @@ -448,7 +446,6 @@ func (p *PartitionTable) drainConsumer(cons sarama.PartitionConsumer) error {
}

func (p *PartitionTable) loadMessages(ctx context.Context, cons sarama.PartitionConsumer, partitionHwm int64, stopAfterCatchup bool) error {

stallTicker := time.NewTicker(p.stallPeriod)
defer stallTicker.Stop()

Expand Down Expand Up @@ -527,7 +524,6 @@ func (p *PartitionTable) enqueueStatsUpdate(ctx context.Context, updater func())
// recover/catchup mechanism so clients can always request stats even if the partition table is not
// running (like a processor table after it's recovered).
func (p *PartitionTable) RunStatsLoop(ctx context.Context) {

updateHwmStatsTicker := time.NewTicker(statsHwmUpdateInterval)
defer updateHwmStatsTicker.Stop()
for {
Expand Down
3 changes: 0 additions & 3 deletions storage/iterator.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package storage

import (
"github.com/syndtr/goleveldb/leveldb"
ldbiter "github.com/syndtr/goleveldb/leveldb/iterator"
)

// iterator wraps an Iterator implementation and handles the value decoding and
// offset key skipping.
type iterator struct {
iter ldbiter.Iterator
snap *leveldb.Snapshot
}

func (i *iterator) Next() bool {
Expand Down Expand Up @@ -40,7 +38,6 @@ func (i *iterator) Value() ([]byte, error) {

func (i *iterator) Release() {
i.iter.Release()
i.snap.Release()
}

func (i *iterator) Seek(key []byte) bool {
Expand Down
23 changes: 5 additions & 18 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)

Expand Down Expand Up @@ -44,7 +45,6 @@ type Iterator interface {
// Implementations of this interface must be safe for any number of concurrent
// readers with one writer.
type Storage interface {

// Opens/Initialize the storage
Open() error

Expand Down Expand Up @@ -95,7 +95,6 @@ type storage struct {

// New creates a new Storage backed by LevelDB.
func New(db *leveldb.DB) (Storage, error) {

return &storage{
db: db,
recovered: make(chan struct{}),
Expand All @@ -106,35 +105,23 @@ func New(db *leveldb.DB) (Storage, error) {

// Iterator returns an iterator that traverses over a snapshot of the storage.
func (s *storage) Iterator() (Iterator, error) {
snap, err := s.db.GetSnapshot()
if err != nil {
return nil, err
}

return &iterator{
iter: s.db.NewIterator(nil, nil),
snap: snap,
iter: s.db.NewIterator(nil, &opt.ReadOptions{
DontFillCache: true,
}),
}, nil
}

// Iterator returns an iterator that traverses over a snapshot of the storage.
func (s *storage) IteratorWithRange(start, limit []byte) (Iterator, error) {
snap, err := s.db.GetSnapshot()
if err != nil {
return nil, err
}

if limit != nil && len(limit) > 0 {
if len(limit) > 0 {
return &iterator{
iter: s.db.NewIterator(&util.Range{Start: start, Limit: limit}, nil),
snap: snap,
}, nil
}
return &iterator{
iter: s.db.NewIterator(util.BytesPrefix(start), nil),
snap: snap,
}, nil

}

func (s *storage) Has(key string) (bool, error) {
Expand Down

0 comments on commit bbfecd8

Please sign in to comment.