From c2212831c83c94b5bbc62ab409bb1a26a82f3cd3 Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Wed, 6 Mar 2024 10:46:51 +0800 Subject: [PATCH 01/11] update else if to switch --- cmd/redis-shake/main.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 0c98de9a..5b5b4e35 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -34,7 +34,8 @@ func main() { // create reader var theReader reader.Reader - if v.IsSet("sync_reader") { + switch { + case v.IsSet("sync_reader"): opts := new(reader.SyncReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("sync_reader", opts) @@ -48,7 +49,7 @@ func main() { theReader = reader.NewSyncStandaloneReader(ctx, opts) log.Infof("create SyncStandaloneReader: %v", opts.Address) } - } else if v.IsSet("scan_reader") { + case v.IsSet("scan_reader"): opts := new(reader.ScanReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("scan_reader", opts) @@ -62,7 +63,7 @@ func main() { theReader = reader.NewScanStandaloneReader(ctx, opts) log.Infof("create ScanStandaloneReader: %v", opts.Address) } - } else if v.IsSet("rdb_reader") { + case v.IsSet("rdb_reader"): opts := new(reader.RdbReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("rdb_reader", opts) @@ -71,7 +72,7 @@ func main() { } theReader = reader.NewRDBReader(opts) log.Infof("create RdbReader: %v", opts.Filepath) - } else if v.IsSet("aof_reader") { + case v.IsSet("aof_reader"): opts := new(reader.AOFReaderOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("aof_reader", opts) @@ -80,10 +81,9 @@ func main() { } theReader = reader.NewAOFReader(opts) log.Infof("create AOFReader: %v", opts.Filepath) - } else { + default: log.Panicf("no reader config entry found") } - // create writer var theWriter writer.Writer if v.IsSet("redis_writer") { From d80c68becff7a0aa646f6e09d2252251ad4c8564 Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Wed, 6 Mar 2024 15:45:07 +0800 Subject: [PATCH 02/11] update else if to switch --- cmd/redis-shake/main.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 5b5b4e35..91ab37bc 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -86,7 +86,8 @@ func main() { } // create writer var theWriter writer.Writer - if v.IsSet("redis_writer") { + switch { + case v.IsSet("redis_writer"): opts := new(writer.RedisWriterOptions) defaults.SetDefaults(opts) err := v.UnmarshalKey("redis_writer", opts) @@ -109,10 +110,9 @@ func main() { entry.Argv = []string{"FLUSHALL"} theWriter.Write(entry) } - } else { + default: log.Panicf("no writer config entry found") } - // create status status.Init(theReader, theWriter) From d84f6ffe088dfd7c92641dd0a886df0d831868e8 Mon Sep 17 00:00:00 2001 From: blight Date: Tue, 19 Mar 2024 09:26:35 +0800 Subject: [PATCH 03/11] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=8A=A8=E6=80=81?= =?UTF-8?q?=E8=AF=86=E5=88=AB=E5=BA=93=E5=8F=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/reader/scan_standalone_reader.go | 7 ++++++- internal/utils/parse.go | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) create mode 100644 internal/utils/parse.go diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 2b131594..43178991 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -58,7 +58,12 @@ func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reade r.dbs = []int{0} } else { if len(opts.DBS) == 0 { - r.dbs = []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15} + c.Send("info", "keyspace") + info, err := c.Receive() + if err != nil { + log.Panicf(err.Error()) + } + r.dbs = utils.ParseDBs(info.(string)) } else { r.dbs = opts.DBS } diff --git a/internal/utils/parse.go b/internal/utils/parse.go new file mode 100644 index 00000000..3109cfa8 --- /dev/null +++ b/internal/utils/parse.go @@ -0,0 +1,19 @@ +package utils + +import ( + "regexp" + "strconv" +) + +func ParseDBs(s string) []int { + dbsString := regexp.MustCompile(`db(\d+):`).FindAllStringSubmatch(s, -1) + if dbsString == nil { + return []int{} + } + dbs := make([]int, len(dbsString)) + for i, dbString := range dbsString { + db, _ := strconv.Atoi(dbString[1]) + dbs[i] = db + } + return dbs +} From 36915f0f2984e73e26be4e2392b38b85fe5a1f02 Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 09:20:38 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E6=80=A7=E8=83=BD=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/reader/scan_standalone_reader.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 43178991..e18121a3 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -106,8 +106,9 @@ func (r *scanStandaloneReader) subscript() { if err != nil { log.Panicf(err.Error()) } - key := resp.([]interface{})[3].(string) - dbId := regex.FindString(resp.([]interface{})[2].(string)) + respSlice := resp.([]interface{}) + key := respSlice[3].(string) + dbId := regex.FindString(respSlice[2].(string)) dbIdInt, err := strconv.Atoi(dbId) if err != nil { log.Panicf(err.Error()) From f286da4ea73329404b3b91d354d137f93e0053ca Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 09:22:58 +0800 Subject: [PATCH 05/11] =?UTF-8?q?=E6=B7=BB=E5=8A=A0scan=E6=97=B6=E5=80=99?= =?UTF-8?q?=E7=9A=84=E4=B8=80=E6=AC=A1=E6=89=AB=E6=8F=8F=E6=AC=A1=E6=95=B0?= =?UTF-8?q?=E6=8E=A7=E5=88=B6=E9=85=8D=E7=BD=AE=EF=BC=8C=E9=98=B2=E6=AD=A2?= =?UTF-8?q?=E6=BA=90=E7=AB=AFcpu=E9=AB=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- internal/client/redis.go | 4 ++-- internal/reader/scan_standalone_reader.go | 4 +++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index c884deeb..2d43ff48 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -148,8 +148,8 @@ func (r *Redis) Close() { /* Commands */ -func (r *Redis) Scan(cursor uint64) (newCursor uint64, keys []string) { - r.Send("scan", strconv.FormatUint(cursor, 10), "count", "2048") +func (r *Redis) Scan(cursor uint64, batch string) (newCursor uint64, keys []string) { + r.Send("scan", strconv.FormatUint(cursor, 10), "count", batch) reply, err := r.Receive() if err != nil { log.Panicf(err.Error()) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index e18121a3..55d66988 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -26,6 +26,7 @@ type ScanReaderOptions struct { KSN bool `mapstructure:"ksn" default:"false"` DBS []int `mapstructure:"dbs"` PreferReplica bool `mapstructure:"prefer_replica" default:"false"` + Batch int `mapstructure:"batch" default:"2048"` } type dbKey struct { @@ -131,9 +132,10 @@ func (r *scanStandaloneReader) scan() { } var cursor uint64 = 0 + batch := strconv.Itoa(r.opts.Batch) for { var keys []string - cursor, keys = c.Scan(cursor) + cursor, keys = c.Scan(cursor, batch) for _, key := range keys { r.keyQueue.Put(dbKey{dbId, key}) // pass value not pointer } From 55b41f51f1c81fe5b4209f58b687d5bf145d41e3 Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 15:35:57 +0800 Subject: [PATCH 06/11] rename batch to count --- internal/client/redis.go | 4 ++-- internal/reader/scan_standalone_reader.go | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index 2d43ff48..583897ad 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -148,8 +148,8 @@ func (r *Redis) Close() { /* Commands */ -func (r *Redis) Scan(cursor uint64, batch string) (newCursor uint64, keys []string) { - r.Send("scan", strconv.FormatUint(cursor, 10), "count", batch) +func (r *Redis) Scan(cursor uint64, count string) (newCursor uint64, keys []string) { + r.Send("scan", strconv.FormatUint(cursor, 10), "count", count) reply, err := r.Receive() if err != nil { log.Panicf(err.Error()) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index 55d66988..c9c14a75 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -26,7 +26,7 @@ type ScanReaderOptions struct { KSN bool `mapstructure:"ksn" default:"false"` DBS []int `mapstructure:"dbs"` PreferReplica bool `mapstructure:"prefer_replica" default:"false"` - Batch int `mapstructure:"batch" default:"2048"` + Count int `mapstructure:"count" default:"2048"` } type dbKey struct { @@ -132,10 +132,10 @@ func (r *scanStandaloneReader) scan() { } var cursor uint64 = 0 - batch := strconv.Itoa(r.opts.Batch) + count := strconv.Itoa(r.opts.Count) for { var keys []string - cursor, keys = c.Scan(cursor, batch) + cursor, keys = c.Scan(cursor, count) for _, key := range keys { r.keyQueue.Put(dbKey{dbId, key}) // pass value not pointer } From 1ef449dfc99b8895e576d21658b284e6254fe5fa Mon Sep 17 00:00:00 2001 From: blight Date: Wed, 20 Mar 2024 15:57:34 +0800 Subject: [PATCH 07/11] edit ...string to ...interface --- internal/client/redis.go | 8 ++++---- internal/reader/scan_standalone_reader.go | 2 +- internal/reader/sync_standalone_reader.go | 6 +++--- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index 583897ad..7d75dc2c 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -71,7 +71,7 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo return r } -func (r *Redis) DoWithStringReply(args ...string) string { +func (r *Redis) DoWithStringReply(args ...interface{}) string { r.Send(args...) replyInterface, err := r.Receive() @@ -82,7 +82,7 @@ func (r *Redis) DoWithStringReply(args ...string) string { return reply } -func (r *Redis) Do(args ...string) interface{} { +func (r *Redis) Do(args ...interface{}) interface{} { r.Send(args...) reply, err := r.Receive() @@ -92,7 +92,7 @@ func (r *Redis) Do(args ...string) interface{} { return reply } -func (r *Redis) Send(args ...string) { +func (r *Redis) Send(args ...interface{}) { argsInterface := make([]interface{}, len(args)) for inx, item := range args { argsInterface[inx] = item @@ -148,7 +148,7 @@ func (r *Redis) Close() { /* Commands */ -func (r *Redis) Scan(cursor uint64, count string) (newCursor uint64, keys []string) { +func (r *Redis) Scan(cursor uint64, count int) (newCursor uint64, keys []string) { r.Send("scan", strconv.FormatUint(cursor, 10), "count", count) reply, err := r.Receive() if err != nil { diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index c9c14a75..0f04792d 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -132,7 +132,7 @@ func (r *scanStandaloneReader) scan() { } var cursor uint64 = 0 - count := strconv.Itoa(r.opts.Count) + count := r.opts.Count for { var keys []string cursor, keys = c.Scan(cursor, count) diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index 9dc7a8c4..bab36105 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -116,7 +116,7 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry func (r *syncStandaloneReader) sendReplconfListenPort() { // use status_port as redis-shake port - argv := []string{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)} + argv := []interface{}{"replconf", "listening-port", strconv.Itoa(config.Opt.Advanced.StatusPort)} r.client.Send(argv...) _, err := r.client.Receive() if err != nil { @@ -126,9 +126,9 @@ func (r *syncStandaloneReader) sendReplconfListenPort() { func (r *syncStandaloneReader) sendPSync() { // send PSync - argv := []string{"PSYNC", "?", "-1"} + argv := []interface{}{"PSYNC", "?", "-1"} if config.Opt.Advanced.AwsPSync != "" { - argv = []string{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} + argv = []interface{}{config.Opt.Advanced.GetPSyncCommand(r.stat.Address), "?", "-1"} } r.client.Send(argv...) From ca2d6bd586762df39c7fe0b22fac655c685f7e45 Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Wed, 12 Jun 2024 13:51:28 +0800 Subject: [PATCH 08/11] performance optimization --- internal/reader/scan_standalone_reader.go | 34 ++++++++++------------- 1 file changed, 15 insertions(+), 19 deletions(-) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index c7c5a453..a12d3a1a 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -101,12 +101,12 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry func (r *scanStandaloneReader) subscript() { c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) - c.Send("psubscribe", "__keyevent@*__:*") - // filter dbs - dbIDmap := make(map[int]struct{}) - for _, db := range r.dbs { - dbIDmap[db] = struct{}{} + strs := make([]string, len(r.dbs)) + for i, v := range r.dbs { + strs[i] = strconv.Itoa(v) } + s := fmt.Sprintf("__keyevent@[%v]__:*", strings.Join(strs, ",")) + c.Send("psubscribe", s) _, err := c.Receive() if err != nil { log.Panicf(err.Error()) @@ -130,20 +130,16 @@ func (r *scanStandaloneReader) subscript() { if err != nil { log.Panicf(err.Error()) } - // if the db is not in the dbs, ignore it - if _, ok := dbIDmap[dbIdInt]; ok { - // handle del action - eventSlice := strings.Split(respSlice[2].(string), ":") - if eventSlice[1] == "del" { - e := entry.NewEntry() - e.DbId = dbIdInt - e.Argv = []string{"DEL", key} - r.ch <- e - continue - } - - r.needDumpQueue.Put(dbKey{db: dbIdInt, key: key}) + // handle del action + eventSlice := strings.Split(respSlice[2].(string), ":") + if eventSlice[1] == "del" { + e := entry.NewEntry() + e.DbId = dbIdInt + e.Argv = []string{"DEL", key} + r.ch <- e + continue } + r.needDumpQueue.Put(dbKey{db: dbIdInt, key: key}) } } } @@ -200,7 +196,7 @@ func (r *scanStandaloneReader) dump() { r.dumpClient.Do("READONLY") log.Infof("running dump() in read-only mode") } - + for item := range r.needDumpQueue.Ch { r.stat.NeedUpdateCount = int64(r.needDumpQueue.Len()) dbId := item.(dbKey).db From a7efa0e4be3959d28b04d85840e48403e49fbcfe Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Thu, 13 Jun 2024 09:31:39 +0800 Subject: [PATCH 09/11] fix r.db length is 0 --- internal/reader/scan_standalone_reader.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index a12d3a1a..a3eec7f2 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -101,12 +101,16 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry func (r *scanStandaloneReader) subscript() { c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) - strs := make([]string, len(r.dbs)) - for i, v := range r.dbs { - strs[i] = strconv.Itoa(v) + if len(r.dbs) == 0 { + c.Send("psubscribe", "__keyevent@*__:*") + } else { + strs := make([]string, len(r.dbs)) + for i, v := range r.dbs { + strs[i] = strconv.Itoa(v) + } + s := fmt.Sprintf("__keyevent@[%v]__:*", strings.Join(strs, ",")) + c.Send("psubscribe", s) } - s := fmt.Sprintf("__keyevent@[%v]__:*", strings.Join(strs, ",")) - c.Send("psubscribe", s) _, err := c.Receive() if err != nil { log.Panicf(err.Error()) From 5a14fd71d3acce304c7a04489a3f0f42c1106250 Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Fri, 14 Jun 2024 08:52:40 +0800 Subject: [PATCH 10/11] fix prefer replica bug --- internal/client/redis.go | 71 +++++++++++++++++++++- internal/reader/scan_standalone_reader.go | 8 +-- internal/reader/sync_standalone_reader.go | 2 +- internal/utils/cluster_nodes.go | 2 +- internal/writer/redis_sentinel_writer.go | 2 +- internal/writer/redis_standalone_writer.go | 2 +- 6 files changed, 76 insertions(+), 11 deletions(-) diff --git a/internal/client/redis.go b/internal/client/redis.go index 01a70661..11cec85a 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -5,7 +5,9 @@ import ( "context" "crypto/tls" "net" + "regexp" "strconv" + "strings" "time" "RedisShake/internal/client/proto" @@ -20,11 +22,11 @@ type Redis struct { protoWriter *proto.Writer } -func NewSentinelClient(ctx context.Context, address string, Tls bool) *Redis { - return NewRedisClient(ctx, address, "", "", Tls) +func NewSentinelMasterClient(ctx context.Context, address string, Tls bool) *Redis { + return NewRedisClient(ctx, address, "", "", Tls, false) } -func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis { +func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool, replica bool) *Redis { r := new(Redis) var conn net.Conn var dialer = &net.Dialer{ @@ -71,10 +73,73 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo if reply != "PONG" { panic("ping failed with reply: " + reply) } + reply = r.DoWithStringReply("info", "replication") + // get best replica + if replica { + replicaInfo := getReplicaAddr(reply) + log.Infof("best replica: %s", replicaInfo.BestReplica) + r = NewRedisClient(ctx, replicaInfo.BestReplica, username, password, Tls, false) + } return r } +type Replica struct { + Addr string + Offset string +} + +type RedisReplicaInfo struct { + Role string + BestReplica string +} + +func getReplicaAddr(info string) RedisReplicaInfo { + infoReplica := RedisReplicaInfo{} + replicas := make([]Replica, 0) + slaveInfoRegexp := regexp.MustCompile(`slave\d+:ip=.*`) + for _, line := range strings.Split(info, "\n") { + line = strings.TrimSpace(line) + switch { + case strings.HasPrefix(line, "role:slave"): + infoReplica.Role = "slave" + return infoReplica + case strings.HasPrefix(line, "role:master"): + infoReplica.Role = "master" + case slaveInfoRegexp.MatchString(line): + slaveInfo := strings.Split(line, ":") + s1 := slaveInfo[1] + slaveInfo = strings.Split(s1, ",") + replica := Replica{} + var host string + var port string + var offset string + for _, item := range slaveInfo { + if strings.HasPrefix(item, "ip=") { + host = strings.Split(item, "=")[1] + } + if strings.HasPrefix(item, "port=") { + port = strings.Split(item, "=")[1] + } + if strings.HasPrefix(item, "offset=") { + offset = strings.Split(item, "=")[1] + } + } + replica.Addr = host + ":" + port + replica.Offset = offset + replicas = append(replicas, replica) + } + } + best := replicas[0] + for _, replica := range replicas { + if replica.Offset > best.Offset { + best = replica + } + } + infoReplica.BestReplica = best.Addr + return infoReplica +} + func (r *Redis) DoWithStringReply(args ...interface{}) string { r.Send(args...) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index a3eec7f2..f881d588 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -63,7 +63,7 @@ type scanStandaloneReader struct { func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader { r := new(scanStandaloneReader) // dbs - c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) + c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node r.dbs = []int{0} } else { @@ -100,7 +100,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry } func (r *scanStandaloneReader) subscript() { - c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) if len(r.dbs) == 0 { c.Send("psubscribe", "__keyevent@*__:*") } else { @@ -149,7 +149,7 @@ func (r *scanStandaloneReader) subscript() { } func (r *scanStandaloneReader) scan() { - c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) defer c.Close() for _, dbId := range r.dbs { if dbId != 0 { @@ -194,7 +194,7 @@ func (r *scanStandaloneReader) scan() { func (r *scanStandaloneReader) dump() { nowDbId := 0 - r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) // Support prefer_replica=true in both Cluster and Standalone mode if r.opts.PreferReplica { r.dumpClient.Do("READONLY") diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index df0cd590..77542f7d 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -83,7 +83,7 @@ type syncStandaloneReader struct { func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader { r := new(syncStandaloneReader) r.opts = opts - r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) + r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) r.rd = r.client.BufioReader() r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) r.stat.Address = opts.Address diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index cf33b207..73d41a65 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -11,7 +11,7 @@ import ( ) func GetRedisClusterNodes(ctx context.Context, address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) { - c := client.NewRedisClient(ctx, address, username, password, Tls) + c := client.NewRedisClient(ctx, address, username, password, Tls, false) reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) slotsCount := 0 diff --git a/internal/writer/redis_sentinel_writer.go b/internal/writer/redis_sentinel_writer.go index 95bc65cc..0b7e7157 100644 --- a/internal/writer/redis_sentinel_writer.go +++ b/internal/writer/redis_sentinel_writer.go @@ -8,7 +8,7 @@ import ( ) func NewRedisSentinelWriter(ctx context.Context, opts *RedisWriterOptions) Writer { - sentinel := client.NewSentinelClient(ctx, opts.Address, opts.Tls) + sentinel := client.NewSentinelMasterClient(ctx, opts.Address, opts.Tls) sentinel.Send("SENTINEL", "GET-MASTER-ADDR-BY-NAME", opts.Master) addr, err := sentinel.Receive() if err != nil { diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index 9ca81a72..392a8ff3 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -48,7 +48,7 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri rw := new(redisStandaloneWriter) rw.address = opts.Address rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) - rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) + rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false) if opts.OffReply { log.Infof("turn off the reply of write") rw.offReply = true From 908c575c80e65331acf8e10a3cb41e0370611c75 Mon Sep 17 00:00:00 2001 From: yanshushuang Date: Fri, 14 Jun 2024 09:16:52 +0800 Subject: [PATCH 11/11] fix prefer replica bug --- internal/reader/scan_standalone_reader.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index a3eec7f2..f881d588 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -63,7 +63,7 @@ type scanStandaloneReader struct { func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader { r := new(scanStandaloneReader) // dbs - c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) + c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node r.dbs = []int{0} } else { @@ -100,7 +100,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry } func (r *scanStandaloneReader) subscript() { - c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) if len(r.dbs) == 0 { c.Send("psubscribe", "__keyevent@*__:*") } else { @@ -149,7 +149,7 @@ func (r *scanStandaloneReader) subscript() { } func (r *scanStandaloneReader) scan() { - c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) defer c.Close() for _, dbId := range r.dbs { if dbId != 0 { @@ -194,7 +194,7 @@ func (r *scanStandaloneReader) scan() { func (r *scanStandaloneReader) dump() { nowDbId := 0 - r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) // Support prefer_replica=true in both Cluster and Standalone mode if r.opts.PreferReplica { r.dumpClient.Do("READONLY")