Skip to content

Commit e5fe177

Browse files
authored
min-resolved-ts: check dc label (#944)
1 parent 1bf6400 commit e5fe177

File tree

4 files changed

+120
-40
lines changed

4 files changed

+120
-40
lines changed

integration_tests/pd_api_test.go

+56-2
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ import (
4444

4545
"github.com/pingcap/failpoint"
4646
"github.com/pingcap/kvproto/pkg/kvrpcpb"
47+
"github.com/pingcap/kvproto/pkg/metapb"
4748
"github.com/stretchr/testify/suite"
49+
"github.com/tikv/client-go/v2/config"
4850
"github.com/tikv/client-go/v2/oracle"
4951
"github.com/tikv/client-go/v2/tikv"
5052
"github.com/tikv/client-go/v2/tikvrpc"
@@ -103,9 +105,9 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
103105
return c.Client.CloseAddr(addr)
104106
}
105107

106-
func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
108+
func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
107109
util.EnableFailpoints()
108-
// Try to get the minimum resolved timestamp of the store from PD.
110+
// Try to get the minimum resolved timestamp of the cluster from PD.
109111
require := s.Require()
110112
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
111113
mockClient := storeSafeTsMockClient{
@@ -141,6 +143,58 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
141143
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
142144
}
143145

146+
func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
147+
util.EnableFailpoints()
148+
// Try to get the minimum resolved timestamp of the cluster from PD.
149+
require := s.Require()
150+
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
151+
mockClient := storeSafeTsMockClient{
152+
Client: s.store.GetTiKVClient(),
153+
}
154+
s.store.SetTiKVClient(&mockClient)
155+
var retryCount int
156+
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
157+
time.Sleep(2 * time.Second)
158+
if retryCount > 5 {
159+
break
160+
}
161+
retryCount++
162+
}
163+
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
164+
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
165+
defer func() {
166+
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
167+
}()
168+
169+
// Set DC label for store 1.
170+
dcLabel := "testDC"
171+
restore := config.UpdateGlobal(func(conf *config.Config) {
172+
conf.TxnScope = dcLabel
173+
})
174+
defer restore()
175+
176+
labels := []*metapb.StoreLabel{
177+
{
178+
Key: tikv.DCLabelKey,
179+
Value: dcLabel,
180+
},
181+
}
182+
s.store.GetRegionCache().SetRegionCacheStore(1, tikvrpc.TiKV, 1, labels)
183+
184+
// Try to get the minimum resolved timestamp of the store from TiKV.
185+
retryCount = 0
186+
for s.store.GetMinSafeTS(dcLabel) != 150 {
187+
time.Sleep(2 * time.Second)
188+
if retryCount > 5 {
189+
break
190+
}
191+
retryCount++
192+
}
193+
194+
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
195+
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
196+
}
197+
144198
func (s *apiTestSuite) TearDownTest() {
145199
if s.store != nil {
146200
s.Require().Nil(s.store.Close())

internal/locate/region_request_state_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
252252
originReloadRegionInterval := atomic.LoadInt64(&reloadRegionInterval)
253253
originBoTiKVServerBusy := retry.BoTiKVServerBusy
254254
defer func() {
255-
reloadRegionInterval = originReloadRegionInterval
255+
atomic.StoreInt64(&reloadRegionInterval, originReloadRegionInterval)
256256
retry.BoTiKVServerBusy = originBoTiKVServerBusy
257257
}()
258258
atomic.StoreInt64(&reloadRegionInterval, int64(24*time.Hour)) // disable reload region

tikv/kv.go

+57-30
Original file line numberDiff line numberDiff line change
@@ -543,49 +543,42 @@ func (s *KVStore) safeTSUpdater() {
543543
}
544544

545545
func (s *KVStore) updateSafeTS(ctx context.Context) {
546+
// Try to get the cluster-level minimum resolved timestamp from PD first.
547+
if s.setClusterMinSafeTSByPD(ctx) {
548+
return
549+
}
550+
546551
stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
547552
tikvClient := s.GetTiKVClient()
548553
wg := &sync.WaitGroup{}
549554
wg.Add(len(stores))
555+
// If getting the cluster-level minimum resolved timestamp from PD failed or returned 0,
556+
// try to get it from TiKV.
550557
for _, store := range stores {
551558
storeID := store.StoreID()
552559
storeAddr := store.GetAddr()
553560
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
554561
defer wg.Done()
555562

556-
var (
557-
safeTS uint64
558-
err error
563+
resp, err := tikvClient.SendRequest(
564+
ctx, storeAddr, tikvrpc.NewRequest(
565+
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
566+
KeyRange: &kvrpcpb.KeyRange{
567+
StartKey: []byte(""),
568+
EndKey: []byte(""),
569+
},
570+
}, kvrpcpb.Context{
571+
RequestSource: util.RequestSourceFromCtx(ctx),
572+
},
573+
), client.ReadTimeoutShort,
559574
)
560575
storeIDStr := strconv.Itoa(int(storeID))
561-
// Try to get the minimum resolved timestamp of the store from PD.
562-
if s.pdHttpClient != nil {
563-
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
564-
if err != nil {
565-
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
566-
}
567-
}
568-
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
569-
if safeTS == 0 || err != nil {
570-
resp, err := tikvClient.SendRequest(
571-
ctx, storeAddr, tikvrpc.NewRequest(
572-
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
573-
KeyRange: &kvrpcpb.KeyRange{
574-
StartKey: []byte(""),
575-
EndKey: []byte(""),
576-
},
577-
}, kvrpcpb.Context{
578-
RequestSource: util.RequestSourceFromCtx(ctx),
579-
},
580-
), client.ReadTimeoutShort,
581-
)
582-
if err != nil {
583-
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
584-
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
585-
return
586-
}
587-
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
576+
if err != nil {
577+
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
578+
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
579+
return
588580
}
581+
safeTS := resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
589582

590583
_, preSafeTS := s.getSafeTS(storeID)
591584
if preSafeTS > safeTS {
@@ -615,6 +608,40 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
615608
wg.Wait()
616609
}
617610

611+
var (
612+
skipClusterSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
613+
successClusterSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
614+
clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster")
615+
)
616+
617+
// setClusterMinSafeTSByPD try to get cluster-level's min resolved timestamp from PD when @@txn_scope is `global`.
618+
func (s *KVStore) setClusterMinSafeTSByPD(ctx context.Context) bool {
619+
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
620+
// Try to get the minimum resolved timestamp of the cluster from PD.
621+
if s.pdHttpClient != nil && isGlobal {
622+
clusterMinSafeTS, err := s.pdHttpClient.GetClusterMinResolvedTS(ctx)
623+
if err != nil {
624+
logutil.BgLogger().Debug("get cluster-level min resolved timestamp from PD failed", zap.Error(err))
625+
} else if clusterMinSafeTS != 0 {
626+
// Update metrics.
627+
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
628+
if preClusterMinSafeTS > clusterMinSafeTS {
629+
skipClusterSafeTSUpdateCounter.Inc()
630+
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
631+
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())
632+
} else {
633+
s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
634+
successClusterSafeTSUpdateCounter.Inc()
635+
safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS)
636+
clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds())
637+
}
638+
return true
639+
}
640+
}
641+
642+
return false
643+
}
644+
618645
// Variables defines the variables used by TiKV storage.
619646
type Variables = kv.Variables
620647

util/pd.go

+6-7
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ const (
5656
// pd request retry time when connection fail.
5757
pdRequestRetryTime = 10
5858

59-
storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
59+
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
6060
)
6161

6262
// PDHTTPClient is an HTTP client of pd.
@@ -86,18 +86,17 @@ func NewPDHTTPClient(
8686
}
8787
}
8888

89-
// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
90-
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
89+
// GetClusterMinResolvedTS get cluster-level min-resolved-ts from pd.
90+
func (p *PDHTTPClient) GetClusterMinResolvedTS(ctx context.Context) (uint64, error) {
9191
var err error
9292
for _, addr := range p.addrs {
93-
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
94-
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
93+
v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, nil)
9594
if e != nil {
9695
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
9796
err = e
9897
continue
9998
}
100-
logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v)))
99+
logutil.BgLogger().Debug("get cluster min resolved ts", zap.String("resp", string(v)))
101100
d := struct {
102101
IsRealTime bool `json:"is_real_time,omitempty"`
103102
MinResolvedTS uint64 `json:"min_resolved_ts"`
@@ -107,7 +106,7 @@ func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64
107106
return 0, errors.Trace(err)
108107
}
109108
if !d.IsRealTime {
110-
message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr)
109+
message := fmt.Errorf("cluster min resolved ts not enabled, addr: %s", addr)
111110
logutil.BgLogger().Debug(message.Error())
112111
return 0, errors.Trace(message)
113112
}

0 commit comments

Comments
 (0)