Skip to content

Commit 6a19c69

Browse files
committed
check label
Signed-off-by: husharp <[email protected]>
1 parent 5968ce9 commit 6a19c69

File tree

3 files changed

+77
-21
lines changed

3 files changed

+77
-21
lines changed

integration_tests/pd_api_test.go

+50-2
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ import (
4444

4545
"github.com/pingcap/failpoint"
4646
"github.com/pingcap/kvproto/pkg/kvrpcpb"
47+
"github.com/pingcap/kvproto/pkg/metapb"
4748
"github.com/stretchr/testify/suite"
4849
"github.com/tikv/client-go/v2/oracle"
4950
"github.com/tikv/client-go/v2/tikv"
@@ -103,9 +104,9 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
103104
return c.Client.CloseAddr(addr)
104105
}
105106

106-
func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
107+
func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
107108
util.EnableFailpoints()
108-
// Try to get the minimum resolved timestamp of the store from PD.
109+
// Try to get the minimum resolved timestamp of the cluster from PD.
109110
require := s.Require()
110111
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
111112
mockClient := storeSafeTsMockClient{
@@ -141,6 +142,53 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
141142
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
142143
}
143144

145+
func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
146+
util.EnableFailpoints()
147+
// Try to get the minimum resolved timestamp of the cluster from PD.
148+
require := s.Require()
149+
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
150+
mockClient := storeSafeTsMockClient{
151+
Client: s.store.GetTiKVClient(),
152+
}
153+
s.store.SetTiKVClient(&mockClient)
154+
var retryCount int
155+
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
156+
time.Sleep(2 * time.Second)
157+
if retryCount > 5 {
158+
break
159+
}
160+
retryCount++
161+
}
162+
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
163+
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
164+
defer func() {
165+
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
166+
}()
167+
168+
// Set DC label for store 1.
169+
dcLabel := "testDC"
170+
labels := []*metapb.StoreLabel{
171+
{
172+
Key: tikv.DCLabelKey,
173+
Value: dcLabel,
174+
},
175+
}
176+
s.store.GetRegionCache().SetRegionCacheStore(1, tikvrpc.TiKV, 1, labels)
177+
178+
// Try to get the minimum resolved timestamp of the store from TiKV.
179+
retryCount = 0
180+
for s.store.GetMinSafeTS(dcLabel) != 150 {
181+
time.Sleep(2 * time.Second)
182+
if retryCount > 5 {
183+
break
184+
}
185+
retryCount++
186+
}
187+
188+
require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
189+
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
190+
}
191+
144192
func (s *apiTestSuite) TearDownTest() {
145193
if s.store != nil {
146194
s.Require().Nil(s.store.Close())

tikv/kv.go

+24-15
Original file line numberDiff line numberDiff line change
@@ -547,24 +547,14 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
547547
tikvClient := s.GetTiKVClient()
548548
wg := &sync.WaitGroup{}
549549
wg.Add(len(stores))
550+
safeTS, txnScopeMap, err := s.buildTxnScopeMap(ctx)
550551
for _, store := range stores {
551552
storeID := store.StoreID()
552553
storeAddr := store.GetAddr()
553554
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
554555
defer wg.Done()
555556

556-
var (
557-
safeTS uint64
558-
err error
559-
)
560557
storeIDStr := strconv.Itoa(int(storeID))
561-
// Try to get the minimum resolved timestamp of the store from PD.
562-
if s.pdHttpClient != nil {
563-
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
564-
if err != nil {
565-
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
566-
}
567-
}
568558
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
569559
if safeTS == 0 || err != nil {
570560
resp, err := tikvClient.SendRequest(
@@ -601,18 +591,37 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
601591
}(ctx, wg, storeID, storeAddr)
602592
}
603593

604-
txnScopeMap := make(map[string][]uint64)
594+
for txnScope, storeIDs := range txnScopeMap {
595+
s.updateMinSafeTS(txnScope, storeIDs)
596+
}
597+
wg.Wait()
598+
}
599+
600+
// build txnScopeMap and judge whether it is needed to get safeTS from PD.
601+
// - if stores label are global, return get cluster min resolved ts from pd.
602+
// - if contains dc label store, return try to get it from TiKV.
603+
func (s *KVStore) buildTxnScopeMap(ctx context.Context) (safeTS uint64, txnScopeMap map[string][]uint64, err error) {
604+
isGlobal := true
605+
txnScopeMap = make(map[string][]uint64)
606+
stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
605607
for _, store := range stores {
606608
txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID())
607609

608610
if label, ok := store.GetLabelValue(DCLabelKey); ok {
609611
txnScopeMap[label] = append(txnScopeMap[label], store.StoreID())
612+
isGlobal = false
610613
}
611614
}
612-
for txnScope, storeIDs := range txnScopeMap {
613-
s.updateMinSafeTS(txnScope, storeIDs)
615+
616+
// Try to get the minimum resolved timestamp of the cluster from PD.
617+
if s.pdHttpClient != nil && isGlobal {
618+
safeTS, err = s.pdHttpClient.GetClusterMinResolvedTS(ctx)
619+
if err != nil {
620+
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
621+
}
614622
}
615-
wg.Wait()
623+
624+
return safeTS, txnScopeMap, err
616625
}
617626

618627
// Variables defines the variables used by TiKV storage.

util/pd.go

+3-4
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,11 @@ func NewPDHTTPClient(
8686
}
8787
}
8888

89-
// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
90-
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
89+
// GetClusterMinResolvedTS get cluster-level min-resolved-ts from pd.
90+
func (p *PDHTTPClient) GetClusterMinResolvedTS(ctx context.Context) (uint64, error) {
9191
var err error
9292
for _, addr := range p.addrs {
93-
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
94-
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
93+
v, e := pdRequest(ctx, addr, storeMinResolvedTSPrefix, p.cli, http.MethodGet, nil)
9594
if e != nil {
9695
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
9796
err = e

0 commit comments

Comments
 (0)