Skip to content

Commit

Permalink
bugfix: sync reader adaptive gracefully shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
zgg2001 committed Jan 17, 2024
1 parent 871c038 commit f3c0d7d
Showing 1 changed file with 38 additions and 6 deletions.
44 changes: 38 additions & 6 deletions internal/reader/sync_standalone_reader.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
package reader

import (
"context"
"bufio"
"context"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"

"RedisShake/internal/client"
Expand Down Expand Up @@ -94,21 +95,48 @@ func (r *syncStandaloneReader) StartRead(ctx context.Context) chan *entry.Entry
r.ctx = ctx
r.ch = make(chan *entry.Entry, 1024)
go func() {

readerStopCh := make(chan struct{})
readerStopDone := make(chan struct{})
var wg sync.WaitGroup

r.sendReplconfListenPort()
r.sendPSync()
go r.sendReplconfAck() // start sent replconf ack
wg.Add(1)
go func() {
defer wg.Done()
go r.sendReplconfAck(readerStopCh) // start sent replconf ack
}()
rdbFilePath := r.receiveRDB()
startOffset := r.stat.AofReceivedOffset
go r.receiveAOF(r.rd)
wg.Add(1)
go func() {
defer wg.Done()
r.receiveAOF(r.rd, readerStopCh)
}()
go func() {
wg.Wait()
readerStopDone <- struct{}{}
}()

if r.opts.SyncRdb {
r.sendRDB(rdbFilePath)
}
if r.opts.SyncAof {
r.stat.Status = kSyncAof
r.sendAOF(startOffset)
}

close(readerStopCh)
timeout := time.Duration(10) * time.Second
select {
case <-readerStopDone:
r.client.Close()
log.Debugf("[%s] close reader client", r.stat.Name)
case <-time.After(timeout):
log.Debugf("[%s] The risk of Goroutine leaks is present", r.stat.Name)
}
close(r.ch)
r.client.Close()
}()

return r.ch
Expand Down Expand Up @@ -225,7 +253,7 @@ func (r *syncStandaloneReader) receiveRDB() string {
return rdbFilePath
}

func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {
func (r *syncStandaloneReader) receiveAOF(rd io.Reader, stopCh chan struct{}) {
log.Debugf("[%s] start receiving aof data, and save to file", r.stat.Name)
aofWriter := rotate.NewAOFWriter(r.stat.Name, r.stat.Dir, r.stat.AofReceivedOffset)
defer aofWriter.Close()
Expand All @@ -234,6 +262,8 @@ func (r *syncStandaloneReader) receiveAOF(rd io.Reader) {
select {
case <-r.ctx.Done():
return
case <-stopCh:
return
default:
n, err := rd.Read(buf)
if err != nil {
Expand Down Expand Up @@ -310,13 +340,15 @@ func (r *syncStandaloneReader) sendAOF(offset int64) {
}

// sendReplconfAck send replconf ack to master to keep heartbeat between redis-shake and source redis.
func (r *syncStandaloneReader) sendReplconfAck() {
func (r *syncStandaloneReader) sendReplconfAck(stopCh chan struct{}) {
ticker := time.NewTicker(time.Millisecond * 100)
defer ticker.Stop()
for range ticker.C {
select {
case <-r.ctx.Done():
return
case <-stopCh:
return
default:
if r.stat.AofReceivedOffset != 0 {
r.client.Send("replconf", "ack", strconv.FormatInt(r.stat.AofReceivedOffset, 10))
Expand Down

0 comments on commit f3c0d7d

Please sign in to comment.