Skip to content

Commit

Permalink
fix: fix scan multi db subscribe pattern
Browse files Browse the repository at this point in the history
  • Loading branch information
suxb201 committed Feb 24, 2025
1 parent 0a7aad7 commit 77be8bf
Showing 1 changed file with 27 additions and 24 deletions.
51 changes: 27 additions & 24 deletions internal/reader/scan_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,20 +63,7 @@ type scanStandaloneReader struct {

func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader {
r := new(scanStandaloneReader)
// dbs
c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.TlsConfig, opts.PreferReplica)
if len(opts.DBS) != 0 {
r.dbs = opts.DBS
} else if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node
r.dbs = []int{0}
} else {
c.Send("info", "keyspace")
info, err := c.Receive()
if err != nil {
log.Panicf(err.Error())
}
r.dbs = utils.ParseDBs(info.(string))
}
r.dbs = opts.DBS
r.opts = opts
r.ch = make(chan *entry.Entry, 1024)
r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1)
Expand All @@ -101,20 +88,27 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) []chan *entry.Entr

func (r *scanStandaloneReader) subscript() {
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.TlsConfig, r.opts.PreferReplica)
log.Infof("[%s] scanStandaloneReader subscript started. dbs=[%v]", r.stat.Name, r.dbs)
if len(r.dbs) == 0 {
c.Send("psubscribe", "__keyevent@*__:*")
_, err := c.Receive()
if err != nil {
log.Panicf(err.Error())
}
} else {
strs := make([]string, len(r.dbs))
for i, v := range r.dbs {
strs[i] = strconv.Itoa(v)
args := []interface{}{"psubscribe"}
for _, db := range r.dbs {
args = append(args, fmt.Sprintf("__keyevent@%v__:*", db))
}
c.Send(args...)
for range r.dbs {
_, err := c.Receive()
if err != nil {
log.Panicf(err.Error())
}
}
s := fmt.Sprintf("__keyevent@[%v]__:*", strings.Join(strs, ","))
c.Send("psubscribe", s)
}
_, err := c.Receive()
if err != nil {
log.Panicf(err.Error())
}

regex := regexp.MustCompile(`\d+`)
for {
select {
Expand Down Expand Up @@ -151,7 +145,16 @@ func (r *scanStandaloneReader) subscript() {
func (r *scanStandaloneReader) scan() {
c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.TlsConfig, r.opts.PreferReplica)
defer c.Close()
for _, dbId := range r.dbs {
dbs := r.dbs
if len(r.dbs) == 0 {
c.Send("info", "keyspace")
info, err := c.Receive()
if err != nil {
log.Panicf(err.Error())
}
dbs = utils.ParseDBs(info.(string))
}
for _, dbId := range dbs {
if dbId != 0 {
reply := c.DoWithStringReply("SELECT", strconv.Itoa(dbId))
if reply != "OK" {
Expand Down

0 comments on commit 77be8bf

Please sign in to comment.