From 461c16a4b84f8a52eda4fdcf6028934ac2e2abd6 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Tue, 11 Feb 2025 16:56:49 +0800 Subject: [PATCH] This is an automated cherry-pick of #9018 close tikv/pd#9017 Signed-off-by: ti-chi-bot --- server/region_syncer/server.go | 61 ++++++++++++++++++++++++---------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/server/region_syncer/server.go b/server/region_syncer/server.go index 723cef879b5..d95e1f33ad0 100644 --- a/server/region_syncer/server.go +++ b/server/region_syncer/server.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/grpcutil" "github.com/tikv/pd/pkg/ratelimit" +<<<<<<< HEAD:server/region_syncer/server.go "github.com/tikv/pd/pkg/syncutil" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/storage" @@ -36,6 +37,14 @@ import ( "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" +======= + "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/keypath" + "github.com/tikv/pd/pkg/utils/logutil" + "github.com/tikv/pd/pkg/utils/syncutil" +>>>>>>> cbb4dfeb7 (syncer: exit broadcast as soon as possible (#9018)):pkg/syncer/server.go ) const ( @@ -163,13 +172,13 @@ func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *cor RegionLeaders: leaders, Buckets: buckets, } - s.broadcast(regions) + s.broadcast(ctx, regions) case <-ticker.C: alive := &pdpb.SyncRegionResponse{ Header: &pdpb.ResponseHeader{ClusterId: s.server.ClusterID()}, StartIndex: s.history.GetNextIndex(), } - s.broadcast(alive) + s.broadcast(ctx, alive) } requests = requests[:0] stats = stats[:0] @@ -335,23 +344,39 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) { s.mu.streams[name] = stream } -func (s *RegionSyncer) broadcast(regions *pdpb.SyncRegionResponse) { - var failed []string - s.mu.RLock() - for name, sender := range s.mu.streams { - err := sender.Send(regions) - if err != nil { - log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err)) - failed = append(failed, name) +func (s *RegionSyncer) broadcast(ctx context.Context, regions *pdpb.SyncRegionResponse) { + broadcastDone := make(chan struct{}, 1) + go func() { + defer logutil.LogPanic() + var failed []string + s.mu.RLock() + for name, sender := range s.mu.streams { + select { + case <-ctx.Done(): + s.mu.RUnlock() + close(broadcastDone) + return + default: + } + err := sender.Send(regions) + if err != nil { + log.Error("region syncer send data meet error", errs.ZapError(errs.ErrGRPCSend, err)) + failed = append(failed, name) + } } - } - s.mu.RUnlock() - if len(failed) > 0 { - s.mu.Lock() - for _, name := range failed { - delete(s.mu.streams, name) - log.Info("region syncer delete the stream", zap.String("stream", name)) + s.mu.RUnlock() + if len(failed) > 0 { + s.mu.Lock() + for _, name := range failed { + delete(s.mu.streams, name) + log.Info("region syncer delete the stream", zap.String("stream", name)) + } + s.mu.Unlock() } - s.mu.Unlock() + close(broadcastDone) + }() + select { + case <-broadcastDone: + case <-ctx.Done(): } }