diff --git a/internal/client/redis.go b/internal/client/redis.go index 01a70661..11cec85a 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -5,7 +5,9 @@ import ( "context" "crypto/tls" "net" + "regexp" "strconv" + "strings" "time" "RedisShake/internal/client/proto" @@ -20,11 +22,11 @@ type Redis struct { protoWriter *proto.Writer } -func NewSentinelClient(ctx context.Context, address string, Tls bool) *Redis { - return NewRedisClient(ctx, address, "", "", Tls) +func NewSentinelMasterClient(ctx context.Context, address string, Tls bool) *Redis { + return NewRedisClient(ctx, address, "", "", Tls, false) } -func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis { +func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool, replica bool) *Redis { r := new(Redis) var conn net.Conn var dialer = &net.Dialer{ @@ -71,10 +73,73 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo if reply != "PONG" { panic("ping failed with reply: " + reply) } + reply = r.DoWithStringReply("info", "replication") + // get best replica + if replica { + replicaInfo := getReplicaAddr(reply) + log.Infof("best replica: %s", replicaInfo.BestReplica) + r = NewRedisClient(ctx, replicaInfo.BestReplica, username, password, Tls, false) + } return r } +type Replica struct { + Addr string + Offset string +} + +type RedisReplicaInfo struct { + Role string + BestReplica string +} + +func getReplicaAddr(info string) RedisReplicaInfo { + infoReplica := RedisReplicaInfo{} + replicas := make([]Replica, 0) + slaveInfoRegexp := regexp.MustCompile(`slave\d+:ip=.*`) + for _, line := range strings.Split(info, "\n") { + line = strings.TrimSpace(line) + switch { + case strings.HasPrefix(line, "role:slave"): + infoReplica.Role = "slave" + return infoReplica + case strings.HasPrefix(line, "role:master"): + infoReplica.Role = "master" + case slaveInfoRegexp.MatchString(line): + slaveInfo := strings.Split(line, ":") + s1 := slaveInfo[1] + slaveInfo = strings.Split(s1, ",") + replica := Replica{} + var host string + var port string + var offset string + for _, item := range slaveInfo { + if strings.HasPrefix(item, "ip=") { + host = strings.Split(item, "=")[1] + } + if strings.HasPrefix(item, "port=") { + port = strings.Split(item, "=")[1] + } + if strings.HasPrefix(item, "offset=") { + offset = strings.Split(item, "=")[1] + } + } + replica.Addr = host + ":" + port + replica.Offset = offset + replicas = append(replicas, replica) + } + } + best := replicas[0] + for _, replica := range replicas { + if replica.Offset > best.Offset { + best = replica + } + } + infoReplica.BestReplica = best.Addr + return infoReplica +} + func (r *Redis) DoWithStringReply(args ...interface{}) string { r.Send(args...) diff --git a/internal/reader/scan_standalone_reader.go b/internal/reader/scan_standalone_reader.go index a3eec7f2..f881d588 100644 --- a/internal/reader/scan_standalone_reader.go +++ b/internal/reader/scan_standalone_reader.go @@ -63,7 +63,7 @@ type scanStandaloneReader struct { func NewScanStandaloneReader(ctx context.Context, opts *ScanReaderOptions) Reader { r := new(scanStandaloneReader) // dbs - c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) + c := client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) if c.IsCluster() { // not use opts.Cluster, because user may use standalone mode to scan a cluster node r.dbs = []int{0} } else { @@ -100,7 +100,7 @@ func (r *scanStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry } func (r *scanStandaloneReader) subscript() { - c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) if len(r.dbs) == 0 { c.Send("psubscribe", "__keyevent@*__:*") } else { @@ -149,7 +149,7 @@ func (r *scanStandaloneReader) subscript() { } func (r *scanStandaloneReader) scan() { - c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + c := client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) defer c.Close() for _, dbId := range r.dbs { if dbId != 0 { @@ -194,7 +194,7 @@ func (r *scanStandaloneReader) scan() { func (r *scanStandaloneReader) dump() { nowDbId := 0 - r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls) + r.dumpClient = client.NewRedisClient(r.ctx, r.opts.Address, r.opts.Username, r.opts.Password, r.opts.Tls, r.opts.PreferReplica) // Support prefer_replica=true in both Cluster and Standalone mode if r.opts.PreferReplica { r.dumpClient.Do("READONLY") diff --git a/internal/reader/sync_standalone_reader.go b/internal/reader/sync_standalone_reader.go index df0cd590..77542f7d 100644 --- a/internal/reader/sync_standalone_reader.go +++ b/internal/reader/sync_standalone_reader.go @@ -83,7 +83,7 @@ type syncStandaloneReader struct { func NewSyncStandaloneReader(ctx context.Context, opts *SyncReaderOptions) Reader { r := new(syncStandaloneReader) r.opts = opts - r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) + r.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, opts.PreferReplica) r.rd = r.client.BufioReader() r.stat.Name = "reader_" + strings.Replace(opts.Address, ":", "_", -1) r.stat.Address = opts.Address diff --git a/internal/utils/cluster_nodes.go b/internal/utils/cluster_nodes.go index cf33b207..73d41a65 100644 --- a/internal/utils/cluster_nodes.go +++ b/internal/utils/cluster_nodes.go @@ -11,7 +11,7 @@ import ( ) func GetRedisClusterNodes(ctx context.Context, address string, username string, password string, Tls bool, perferReplica bool) (addresses []string, slots [][]int) { - c := client.NewRedisClient(ctx, address, username, password, Tls) + c := client.NewRedisClient(ctx, address, username, password, Tls, false) reply := c.DoWithStringReply("cluster", "nodes") reply = strings.TrimSpace(reply) slotsCount := 0 diff --git a/internal/writer/redis_sentinel_writer.go b/internal/writer/redis_sentinel_writer.go index 95bc65cc..0b7e7157 100644 --- a/internal/writer/redis_sentinel_writer.go +++ b/internal/writer/redis_sentinel_writer.go @@ -8,7 +8,7 @@ import ( ) func NewRedisSentinelWriter(ctx context.Context, opts *RedisWriterOptions) Writer { - sentinel := client.NewSentinelClient(ctx, opts.Address, opts.Tls) + sentinel := client.NewSentinelMasterClient(ctx, opts.Address, opts.Tls) sentinel.Send("SENTINEL", "GET-MASTER-ADDR-BY-NAME", opts.Master) addr, err := sentinel.Receive() if err != nil { diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index 9ca81a72..392a8ff3 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -48,7 +48,7 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri rw := new(redisStandaloneWriter) rw.address = opts.Address rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) - rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls) + rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false) if opts.OffReply { log.Infof("turn off the reply of write") rw.offReply = true