Skip to content

Commit e23033f

Browse files
committed
use config instead
Signed-off-by: husharp <[email protected]>
1 parent 2359b66 commit e23033f

File tree

3 files changed

+47
-45
lines changed

3 files changed

+47
-45
lines changed

integration_tests/pd_api_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ import (
4646
"github.com/pingcap/kvproto/pkg/kvrpcpb"
4747
"github.com/pingcap/kvproto/pkg/metapb"
4848
"github.com/stretchr/testify/suite"
49+
"github.com/tikv/client-go/v2/config"
4950
"github.com/tikv/client-go/v2/oracle"
5051
"github.com/tikv/client-go/v2/tikv"
5152
"github.com/tikv/client-go/v2/tikvrpc"
@@ -167,6 +168,11 @@ func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
167168

168169
// Set DC label for store 1.
169170
dcLabel := "testDC"
171+
restore := config.UpdateGlobal(func(conf *config.Config) {
172+
conf.TxnScope = dcLabel
173+
})
174+
defer restore()
175+
170176
labels := []*metapb.StoreLabel{
171177
{
172178
Key: tikv.DCLabelKey,

tikv/kv.go

+37-41
Original file line numberDiff line numberDiff line change
@@ -542,33 +542,13 @@ func (s *KVStore) safeTSUpdater() {
542542
}
543543
}
544544

545-
var (
546-
skipSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
547-
successSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
548-
clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster")
549-
)
550-
551545
func (s *KVStore) updateSafeTS(ctx context.Context) {
552-
stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
553-
// Try getting the cluster-level minimum resolved timestamp from PD first.
554-
clusterMinSafeTS, txnScopeMap, err := s.buildTxnScopeMap(ctx, stores)
555-
if clusterMinSafeTS != 0 && err == nil {
556-
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
557-
if preClusterMinSafeTS > clusterMinSafeTS {
558-
skipSafeTSUpdateCounter.Inc()
559-
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
560-
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())
561-
return
562-
} else {
563-
s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
564-
successSafeTSUpdateCounter.Inc()
565-
safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS)
566-
clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds())
567-
}
568-
546+
// Try to get the cluster-level minimum resolved timestamp from PD first.
547+
if s.setClusterMinSafeTSByPD(ctx) {
569548
return
570549
}
571550

551+
stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
572552
tikvClient := s.GetTiKVClient()
573553
wg := &sync.WaitGroup{}
574554
wg.Add(len(stores))
@@ -580,7 +560,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
580560
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
581561
defer wg.Done()
582562

583-
resp, e := tikvClient.SendRequest(
563+
resp, err := tikvClient.SendRequest(
584564
ctx, storeAddr, tikvrpc.NewRequest(
585565
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
586566
KeyRange: &kvrpcpb.KeyRange{
@@ -593,7 +573,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
593573
), client.ReadTimeoutShort,
594574
)
595575
storeIDStr := strconv.Itoa(int(storeID))
596-
if e != nil {
576+
if err != nil {
597577
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
598578
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
599579
return
@@ -614,36 +594,52 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
614594
}(ctx, wg, storeID, storeAddr)
615595
}
616596

617-
for txnScope, storeIDs := range txnScopeMap {
618-
s.updateMinSafeTS(txnScope, storeIDs)
619-
}
620-
wg.Wait()
621-
}
622-
623-
// build txnScopeMap and judge whether it is needed to get safeTS from PD.
624-
// - if stores label are global, return get cluster min resolved ts from pd.
625-
// - if contains dc label store, return try to get it from TiKV.
626-
func (s *KVStore) buildTxnScopeMap(ctx context.Context, stores []*Store) (safeTS uint64, txnScopeMap map[string][]uint64, err error) {
627-
isGlobal := true
628-
txnScopeMap = make(map[string][]uint64)
597+
txnScopeMap := make(map[string][]uint64)
629598
for _, store := range stores {
630599
txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID())
631600

632601
if label, ok := store.GetLabelValue(DCLabelKey); ok {
633602
txnScopeMap[label] = append(txnScopeMap[label], store.StoreID())
634-
isGlobal = false
635603
}
636604
}
605+
for txnScope, storeIDs := range txnScopeMap {
606+
s.updateMinSafeTS(txnScope, storeIDs)
607+
}
608+
wg.Wait()
609+
}
610+
611+
var (
612+
skipClusterSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
613+
successClusterSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
614+
clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster")
615+
)
637616

617+
// setClusterMinSafeTSByPD try to get cluster-level's min resolved timestamp from PD when @@txn_scope is `global`.
618+
func (s *KVStore) setClusterMinSafeTSByPD(ctx context.Context) bool {
619+
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
638620
// Try to get the minimum resolved timestamp of the cluster from PD.
639621
if s.pdHttpClient != nil && isGlobal {
640-
safeTS, err = s.pdHttpClient.GetClusterMinResolvedTS(ctx)
622+
clusterMinSafeTS, err := s.pdHttpClient.GetClusterMinResolvedTS(ctx)
641623
if err != nil {
642-
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
624+
logutil.BgLogger().Debug("get cluster-level min resolved timestamp from PD failed", zap.Error(err))
625+
} else if clusterMinSafeTS != 0 {
626+
// Update metrics.
627+
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
628+
if preClusterMinSafeTS > clusterMinSafeTS {
629+
skipClusterSafeTSUpdateCounter.Inc()
630+
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
631+
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())
632+
} else {
633+
s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
634+
successClusterSafeTSUpdateCounter.Inc()
635+
safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS)
636+
clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds())
637+
}
638+
return true
643639
}
644640
}
645641

646-
return safeTS, txnScopeMap, err
642+
return false
647643
}
648644

649645
// Variables defines the variables used by TiKV storage.

util/pd.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ const (
5656
// pd request retry time when connection fail.
5757
pdRequestRetryTime = 10
5858

59-
storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
59+
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
6060
)
6161

6262
// PDHTTPClient is an HTTP client of pd.
@@ -90,13 +90,13 @@ func NewPDHTTPClient(
9090
func (p *PDHTTPClient) GetClusterMinResolvedTS(ctx context.Context) (uint64, error) {
9191
var err error
9292
for _, addr := range p.addrs {
93-
v, e := pdRequest(ctx, addr, storeMinResolvedTSPrefix, p.cli, http.MethodGet, nil)
93+
v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, nil)
9494
if e != nil {
9595
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
9696
err = e
9797
continue
9898
}
99-
logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v)))
99+
logutil.BgLogger().Debug("get cluster min resolved ts", zap.String("resp", string(v)))
100100
d := struct {
101101
IsRealTime bool `json:"is_real_time,omitempty"`
102102
MinResolvedTS uint64 `json:"min_resolved_ts"`
@@ -106,7 +106,7 @@ func (p *PDHTTPClient) GetClusterMinResolvedTS(ctx context.Context) (uint64, err
106106
return 0, errors.Trace(err)
107107
}
108108
if !d.IsRealTime {
109-
message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr)
109+
message := fmt.Errorf("cluster min resolved ts not enabled, addr: %s", addr)
110110
logutil.BgLogger().Debug(message.Error())
111111
return 0, errors.Trace(message)
112112
}

0 commit comments

Comments
 (0)