Skip to content

Commit

Permalink
syncer: exit broadcast as soon as possible (#9018) (#9074)
Browse files Browse the repository at this point in the history
close #9017

Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
ti-chi-bot and rleungx authored Feb 12, 2025
1 parent 5036cc2 commit 20dd6ba
Showing 1 changed file with 35 additions and 18 deletions.
53 changes: 35 additions & 18 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/syncutil"
"go.uber.org/zap"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -160,13 +161,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor
RegionLeaders: leaders,
Buckets: buckets,
}
s.broadcast(regions)
s.broadcast(ctx, regions)
case <-ticker.C:
alive := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
StartIndex: s.history.getNextIndex(),
}
s.broadcast(alive)
s.broadcast(ctx, alive)
}
requests = requests[:0]
stats = stats[:0]
Expand Down Expand Up @@ -344,23 +345,39 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) {
s.mu.streams[name] = stream
}

func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) {
var failed []string
s.mu.RLock()
for name, sender := range s.mu.streams {
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
failed = append(failed, name)
func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) {
broadcastDone := make(chan struct{}, 1)
go func() {
defer logutil.LogPanic()
var failed []string
s.mu.RLock()
for name, sender := range s.mu.streams {
select {
case <-ctx.Done():
s.mu.RUnlock()
close(broadcastDone)
return
default:
}
err := sender.Send(regions)
if err != nil {
log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err))
failed = append(failed, name)
}
}
}
s.mu.RUnlock()
if len(failed) > 0 {
s.mu.Lock()
for _, name := range failed {
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
s.mu.RUnlock()
if len(failed) > 0 {
s.mu.Lock()
for _, name := range failed {
delete(s.mu.streams, name)
log.Info("region syncer delete the stream", zap.String("stream", name))
}
s.mu.Unlock()
}
s.mu.Unlock()
close(broadcastDone)
}()
select {
case <-broadcastDone:
case <-ctx.Done():
}
}

0 comments on commit 20dd6ba

Please sign in to comment.