@@ -547,17 +547,18 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
547
547
tikvClient := s .GetTiKVClient ()
548
548
wg := & sync.WaitGroup {}
549
549
wg .Add (len (stores ))
550
- safeTS , txnScopeMap , err := s .buildTxnScopeMap (ctx )
550
+ clusterTS , txnScopeMap , err := s .buildTxnScopeMap (ctx , stores )
551
551
for _ , store := range stores {
552
552
storeID := store .StoreID ()
553
553
storeAddr := store .GetAddr ()
554
554
go func (ctx context.Context , wg * sync.WaitGroup , storeID uint64 , storeAddr string ) {
555
555
defer wg .Done ()
556
556
557
557
storeIDStr := strconv .Itoa (int (storeID ))
558
+ safeTS := clusterTS
558
559
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
559
560
if safeTS == 0 || err != nil {
560
- resp , err := tikvClient .SendRequest (
561
+ resp , e := tikvClient .SendRequest (
561
562
ctx , storeAddr , tikvrpc .NewRequest (
562
563
tikvrpc .CmdStoreSafeTS , & kvrpcpb.StoreSafeTSRequest {
563
564
KeyRange : & kvrpcpb.KeyRange {
@@ -569,7 +570,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
569
570
},
570
571
), client .ReadTimeoutShort ,
571
572
)
572
- if err != nil {
573
+ if e != nil {
573
574
metrics .TiKVSafeTSUpdateCounter .WithLabelValues ("fail" , storeIDStr ).Inc ()
574
575
logutil .BgLogger ().Debug ("update safeTS failed" , zap .Error (err ), zap .Uint64 ("store-id" , storeID ))
575
576
return
@@ -591,19 +592,22 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
591
592
}(ctx , wg , storeID , storeAddr )
592
593
}
593
594
594
- for txnScope , storeIDs := range txnScopeMap {
595
- s .updateMinSafeTS (txnScope , storeIDs )
595
+ if clusterTS != 0 && err == nil {
596
+ s .minSafeTS .Store (oracle .GlobalTxnScope , clusterTS )
597
+ } else {
598
+ for txnScope , storeIDs := range txnScopeMap {
599
+ s .updateMinSafeTS (txnScope , storeIDs )
600
+ }
596
601
}
597
602
wg .Wait ()
598
603
}
599
604
600
605
// build txnScopeMap and judge whether it is needed to get safeTS from PD.
601
606
// - if stores label are global, return get cluster min resolved ts from pd.
602
607
// - if contains dc label store, return try to get it from TiKV.
603
- func (s * KVStore ) buildTxnScopeMap (ctx context.Context ) (safeTS uint64 , txnScopeMap map [string ][]uint64 , err error ) {
608
+ func (s * KVStore ) buildTxnScopeMap (ctx context.Context , stores [] * Store ) (safeTS uint64 , txnScopeMap map [string ][]uint64 , err error ) {
604
609
isGlobal := true
605
610
txnScopeMap = make (map [string ][]uint64 )
606
- stores := s .regionCache .GetStoresByType (tikvrpc .TiKV )
607
611
for _ , store := range stores {
608
612
txnScopeMap [oracle .GlobalTxnScope ] = append (txnScopeMap [oracle .GlobalTxnScope ], store .StoreID ())
609
613
0 commit comments