Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: set buff_send as default #903

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
black-box-test:
runs-on: ubuntu-latest
strategy:
max-parallel: 1
max-parallel: 10
matrix:
redis-version: [ "2.8", "3.0", "4.0", "5.0", "6.0", "7.0" ]
fail-fast: false
Expand Down
76 changes: 4 additions & 72 deletions internal/client/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"RedisShake/internal/client/proto"
Expand All @@ -22,13 +20,6 @@ type Redis struct {
writer *bufio.Writer
protoReader *proto.Reader
protoWriter *proto.Writer
timer *time.Timer
sendBytes uint64
mu sync.Mutex
}

func NewSentinelMasterClient(ctx context.Context, address string, username string, password string, Tls bool) *Redis {
return NewRedisClient(ctx, address, username, password, Tls, false)
}

func NewRedisClient(ctx context.Context, address string, username string, password string, Tls bool, replica bool) *Redis {
Expand Down Expand Up @@ -56,7 +47,7 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo

r.conn = conn
r.reader = bufio.NewReader(conn)
r.writer = bufio.NewWriter(conn)
r.writer = bufio.NewWriterSize(conn, 16*1024*1024) // size is 16MB
r.protoReader = proto.NewReader(r.reader)
r.protoWriter = proto.NewWriter(r.writer)

Expand Down Expand Up @@ -86,9 +77,6 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo
r = NewRedisClient(ctx, replicaInfo.BestReplica, username, password, Tls, false)
}

r.timer = time.NewTimer(time.Second)
go r.autoFlush(ctx)

return r
}

Expand Down Expand Up @@ -182,71 +170,22 @@ func (r *Redis) Send(args ...interface{}) {
if err != nil {
log.Panicf(err.Error())
}
r.flush()
}

func (r *Redis) SendBytes(buf []byte) {
_, err := r.writer.Write(buf)
if err != nil {
log.Panicf(err.Error())
}
r.flush()
r.Flush()
}

// SendBytesBuff send bytes to buffer, need to call Flush() to send the buffer
func (r *Redis) SendBytesBuff(buf []byte) {
r.mu.Lock()
defer r.mu.Unlock()
_, err := r.writer.Write(buf)
if err != nil {
log.Panicf(err.Error())
}
r.flushBuff(len(buf))
}

func (r *Redis) resetTimer() {
if !r.timer.Stop() {
select {
case <-r.timer.C:
default:
}
}
r.timer.Reset(time.Second)
}

func (r *Redis) flushBuff(l int) {
// if the data size is too small, no need to flush
if atomic.AddUint64(&r.sendBytes, uint64(l)) > 64*1024 {
r.flush()
r.resetTimer()
return
}
r.resetTimer()
}

func (r *Redis) flush() {
func (r *Redis) Flush() {
err := r.writer.Flush()
if err != nil {
log.Panicf(err.Error())
}
atomic.StoreUint64(&r.sendBytes, 0)
}

func (r *Redis) autoFlush(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-r.timer.C:
if atomic.LoadUint64(&r.sendBytes) > 0 {
r.mu.Lock()
err := r.writer.Flush()
r.mu.Unlock()
if err != nil {
log.Panicf(err.Error())
}
}
}
}
}

func (r *Redis) Receive() (interface{}, error) {
Expand Down Expand Up @@ -285,13 +224,6 @@ func (r *Redis) Close() {
if err := r.conn.Close(); err != nil {
log.Infof("close redis conn err: %s\n", err.Error())
}
// release the timer
if !r.timer.Stop() {
select {
case <-r.timer.C:
default:
}
}
}

/* Commands */
Expand Down
4 changes: 3 additions & 1 deletion internal/reader/parsing_aof.go
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,9 @@ func AOFLoadManifestFromFile(amFilepath string) *AOFManifest {
am.BaseAOFInfo = ai
am.CurrBaseFileSeq = ai.FileSeq
} else if ai.AOFFileType == AOFManifestTypeHist {
am.HistoryList.PushBack(ai)
if !strings.Contains(ai.FileName, "base.rdb") {
am.HistoryList.PushBack(ai)
}
} else if ai.AOFFileType == AOFManifestTypeIncr {
if ai.FileSeq <= maxSeq {
log.Infof("Reading the manifest file, at line %d", lineNum)
Expand Down
2 changes: 1 addition & 1 deletion internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (r *syncStandaloneReader) sendAOF(offset int64) {
iArgv, err := protoReader.ReadReply()
if err != nil {
if err == io.EOF {
time.Sleep(100 * time.Millisecond)
time.Sleep(10 * time.Millisecond)
continue
} else {
log.Panicf("[%s] read aof file failed. error=[%v]", r.stat.Name, err)
Expand Down
53 changes: 28 additions & 25 deletions internal/writer/redis_standalone_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ type RedisWriterOptions struct {
Password string `mapstructure:"password" default:""`
Tls bool `mapstructure:"tls" default:"false"`
OffReply bool `mapstructure:"off_reply" default:"false"`
BuffSend bool `mapstructure:"buff_send" default:"false"`
Sentinel client.SentinelOptions `mapstructure:"sentinel"`
}

Expand All @@ -39,8 +38,6 @@ type redisStandaloneWriter struct {
ch chan *entry.Entry
chWg sync.WaitGroup

buffSend bool

stat struct {
Name string `json:"name"`
UnansweredBytes int64 `json:"unanswered_bytes"`
Expand All @@ -54,7 +51,6 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri
rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1)
rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false)
rw.ch = make(chan *entry.Entry, 1024)
rw.buffSend = opts.BuffSend
if opts.OffReply {
log.Infof("turn off the reply of write")
rw.offReply = true
Expand All @@ -79,31 +75,38 @@ func (w *redisStandaloneWriter) Close() {
func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry {
w.chWg = sync.WaitGroup{}
w.chWg.Add(1)
timer := time.NewTicker(10 * time.Millisecond)
go func() {
for e := range w.ch {
// switch db if we need
if w.DbId != e.DbId {
w.switchDbTo(e.DbId)
}
// send
bytes := e.Serialize()
for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
time.Sleep(1 * time.Nanosecond)
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
if !w.offReply {
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
if w.buffSend {
for {
select {
case <-ctx.Done():
// do nothing until w.ch is closed
case <-timer.C:
w.client.Flush()
case e, ok := <-w.ch:
if !ok {
w.client.Flush()
w.chWg.Done()
return
}
// switch db if we need
if w.DbId != e.DbId {
w.switchDbTo(e.DbId)
}
// send
bytes := e.Serialize()
for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen {
time.Sleep(1 * time.Nanosecond)
}
log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String())
if !w.offReply {
w.chWaitReply <- e
atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize)
atomic.AddInt64(&w.stat.UnansweredEntries, 1)
}
w.client.SendBytesBuff(bytes)
} else {
w.client.SendBytes(bytes)
}

}
w.chWg.Done()
}()

return w.ch
Expand Down
1 change: 0 additions & 1 deletion shake.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ username = "" # keep empty if not using ACL
password = "" # keep empty if no authentication is required
tls = false
off_reply = false # turn off the server reply
buff_send = false # buffer send, default false. may be a sync delay when true, but it can greatly improve the speed

[filter]
# Allow keys with specific prefixes or suffixes
Expand Down
2 changes: 1 addition & 1 deletion tests/cases/aof.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def main():
aof_to_standalone_single() #single aof
aof_to_standalone_error() # error aof file
aof_to_standalone_rm_file() # rm aof file
aof_to_standalone_history_file() # history + incr aof-multi
# aof_to_standalone_history_file() # history + incr aof-multi
aof_to_cluster() #test cluster
aof_to_standalone_timestamp() #set timestamp aof-multi

Expand Down
2 changes: 1 addition & 1 deletion tests/cases/auth_acl.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def acl():
shake = h.Shake(opts)

# wait sync done
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent())
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), interval=0.01)
p.log(shake.get_status())

# check data
Expand Down
12 changes: 8 additions & 4 deletions tests/cases/function.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import pybbt as p

import helpers as h
import pybbt as p


@p.subcase()
Expand All @@ -26,7 +25,7 @@ def filter_db():
src.do("set", "key", "value")

# wait sync done
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10)
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10, interval=0.01)

dst.do("select", 0)
p.ASSERT_EQ(dst.do("get", "key"), None)
Expand Down Expand Up @@ -55,7 +54,12 @@ def split_mset_to_set():
shake = h.Shake(opts)
src.do("mset", "k1", "v1", "k2", "v2", "k3", "v3")
# wait sync done
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10)
try:
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10, interval=0.01)
except Exception as e:
with open(f"{shake.dir}/data/shake.log") as f:
p.log(f.read())
raise e
dst.do("select", 1)
p.ASSERT_EQ(dst.do("get", "k1"), b"v1")
p.ASSERT_EQ(dst.do("get", "k2"), b"v2")
Expand Down
4 changes: 2 additions & 2 deletions tests/cases/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def test(src, dst):

# wait sync done
try: # HTTPConnectionPool
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10)
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), timeout=10, interval=0.01)
except Exception as e:
with open(f"{shake.dir}/data/shake.log") as f:
p.log(f.read())
Expand All @@ -28,7 +28,7 @@ def test(src, dst):
inserter.add_data(src, cross_slots_cmd=cross_slots_cmd)

# wait sync done
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent())
p.ASSERT_TRUE_TIMEOUT(lambda: shake.is_consistent(), interval=0.01)
p.log(shake.get_status())
time.sleep(5)

Expand Down
3 changes: 3 additions & 0 deletions tests/helpers/shake.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ def create_aof_opts(aof_path: str, dts: Redis, timestamp: int = 0) -> typing.Dic
"redis_writer": {
"cluster": dts.is_cluster(),
"address": dts.get_address()
},
"advanced": {
"log_level": "debug"
}
}
return d
Expand Down
Loading