diff --git a/internal/client/redis.go b/internal/client/redis.go index 2d05e22a..f7e4c4b4 100644 --- a/internal/client/redis.go +++ b/internal/client/redis.go @@ -46,8 +46,9 @@ func NewRedisClient(ctx context.Context, address string, username string, passwo } r.conn = conn + // Increase the size of the underlying TCP send cache to avoid short-write errors r.reader = bufio.NewReader(conn) - r.writer = bufio.NewWriterSize(conn, 16*1024*1024) // size is 16MB + r.writer = bufio.NewWriterSize(conn, 32*1024) // size is 32KiB r.protoReader = proto.NewReader(r.reader) r.protoWriter = proto.NewWriter(r.writer) diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index b51a29c1..518452b8 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -50,13 +50,13 @@ func NewRedisStandaloneWriter(ctx context.Context, opts *RedisWriterOptions) Wri rw.address = opts.Address rw.stat.Name = "writer_" + strings.Replace(opts.Address, ":", "_", -1) rw.client = client.NewRedisClient(ctx, opts.Address, opts.Username, opts.Password, opts.Tls, false) - rw.ch = make(chan *entry.Entry, 1024) + rw.ch = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit) if opts.OffReply { log.Infof("turn off the reply of write") rw.offReply = true rw.client.Send("CLIENT", "REPLY", "OFF") } else { - rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit) + rw.chWaitReply = make(chan *entry.Entry, config.Opt.Advanced.PipelineCountLimit*2) rw.chWaitWg.Add(1) go rw.processReply() } @@ -75,40 +75,7 @@ func (w *redisStandaloneWriter) Close() { func (w *redisStandaloneWriter) StartWrite(ctx context.Context) chan *entry.Entry { w.chWg = sync.WaitGroup{} w.chWg.Add(1) - timer := time.NewTicker(10 * time.Millisecond) - go func() { - for { - select { - case <-ctx.Done(): - // do nothing until w.ch is closed - case <-timer.C: - w.client.Flush() - case e, ok := <-w.ch: - if !ok { - w.client.Flush() - w.chWg.Done() - return - } - // switch db if we need - if w.DbId != e.DbId { - w.switchDbTo(e.DbId) - } - // send - bytes := e.Serialize() - for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen { - time.Sleep(1 * time.Nanosecond) - } - log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String()) - if !w.offReply { - w.chWaitReply <- e - atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) - atomic.AddInt64(&w.stat.UnansweredEntries, 1) - } - w.client.SendBytesBuff(bytes) - } - } - }() - + go w.processWrite(ctx) return w.ch } @@ -128,6 +95,47 @@ func (w *redisStandaloneWriter) switchDbTo(newDbId int) { } } +func (w *redisStandaloneWriter) processWrite(ctx context.Context) { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + // do nothing until w.ch is closed + case <-ticker.C: + w.client.Flush() + case e, ok := <-w.ch: + if !ok { + // clean up and exit + w.client.Flush() + w.chWg.Done() + return + } + // switch db if we need + if w.DbId != e.DbId { + w.switchDbTo(e.DbId) + } + // send + bytes := e.Serialize() + for e.SerializedSize+atomic.LoadInt64(&w.stat.UnansweredBytes) > config.Opt.Advanced.TargetRedisClientMaxQuerybufLen { + time.Sleep(1 * time.Nanosecond) + } + log.Debugf("[%s] send cmd. cmd=[%s]", w.stat.Name, e.String()) + if !w.offReply { + select { + case w.chWaitReply <- e: + default: + w.client.Flush() + w.chWaitReply <- e + } + atomic.AddInt64(&w.stat.UnansweredBytes, e.SerializedSize) + atomic.AddInt64(&w.stat.UnansweredEntries, 1) + } + w.client.SendBytesBuff(bytes) + } + } +} + func (w *redisStandaloneWriter) processReply() { for e := range w.chWaitReply { reply, err := w.client.Receive()