diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index c7c5a453..a3eec7f2 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -101,11 +101,15 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry func (r *scanStandaloneReader) subscript() { c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) - c.Send("psubscribe", "__keyevent@*__:*") - // filter dbs - dbIDmap := make(map[int]struct{}) - for _, db := range r.dbs { - dbIDmap[db] = struct{}{} + if len(r.dbs) == 0 { + c.Send("psubscribe", "__keyevent@*__:*") + } else { + strs := make([]string, len(r.dbs)) + for i, v := range r.dbs { + strs[i] = strconv.Itoa(v) + } + s := fmt.Sprintf("__keyevent@[%v]__:*", strings.Join(strs, ",")) + c.Send("psubscribe", s) } _, err := c.Receive() if err != nil { @@ -130,20 +134,16 @@ func (r *scanStandaloneReader) subscript() { if err != nil { log.Panicf(err.Error()) } - // if the db is not in the dbs, ignore it - if _, ok := dbIDmap[dbIdInt]; ok { - // handle del action - eventSlice := strings.Split(respSlice[2].(string), ":") - if eventSlice[1] == "del" { - e := entry.NewEntry() - e.DbId = dbIdInt - e.Argv = []string{"DEL", key} - r.ch <- e - continue - } - - r.needDumpQueue.Put(dbKey{db: dbIdInt, key: key}) + // handle del action + eventSlice := strings.Split(respSlice[2].(string), ":") + if eventSlice[1] == "del" { + e := entry.NewEntry() + e.DbId = dbIdInt + e.Argv = []string{"DEL", key} + r.ch <- e + continue } + r.needDumpQueue.Put(dbKey{db: dbIdInt, key: key}) } } } @@ -200,7 +200,7 @@ func (r *scanStandaloneReader) dump() { r.dumpClient.Do("READONLY") log.Infof("running dump() in read-only mode") } - + for item := range r.needDumpQueue.Ch { r.stat.NeedUpdateCount = int64(r.needDumpQueue.Len()) dbId := item.(dbKey).db