Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

min-resolved-ts: check dc label #944

Merged
merged 6 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
Expand Down Expand Up @@ -103,9 +104,9 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the store from PD.
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Expand Down Expand Up @@ -141,6 +142,53 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()

// Set DC label for store 1.
dcLabel := "testDC"
labels := []*metapb.StoreLabel{
{
Key: tikv.DCLabelKey,
Value: dcLabel,
},
}
s.store.GetRegionCache().SetRegionCacheStore(1, tikvrpc.TiKV, 1, labels)

// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}

require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
}

func (s *apiTestSuite) TearDownTest() {
if s.store != nil {
s.Require().Nil(s.store.Close())
Expand Down
47 changes: 30 additions & 17 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,27 +547,18 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
wg.Add(len(stores))
clusterTS, txnScopeMap, err := s.buildTxnScopeMap(ctx, stores)
for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()

var (
safeTS uint64
err error
)
storeIDStr := strconv.Itoa(int(storeID))
// Try to get the minimum resolved timestamp of the store from PD.
if s.pdHttpClient != nil {
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
}
}
safeTS := clusterTS
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if safeTS == 0 || err != nil {
resp, err := tikvClient.SendRequest(
resp, e := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
Expand All @@ -579,7 +570,7 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
},
), client.ReadTimeoutShort,
)
if err != nil {
if e != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
Expand All @@ -601,18 +592,40 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
}(ctx, wg, storeID, storeAddr)
}

txnScopeMap := make(map[string][]uint64)
if clusterTS != 0 && err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Its better improve the readable like:

clusterMinSafeTS, txnScopeMap, err := s.buildTxnScopeMap(ctx, stores)
// fast path get cluster scope MinSafeTs
if clusterMinSafeTS!= 0 && err == nil {
     s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
     return
}

// Get SafeTS from TiKV by Store
for _, store := range stores {
...
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea! fixed.

s.minSafeTS.Store(oracle.GlobalTxnScope, clusterTS)
} else {
for txnScope, storeIDs := range txnScopeMap {
s.updateMinSafeTS(txnScope, storeIDs)
}
}
wg.Wait()
}

// build txnScopeMap and judge whether it is needed to get safeTS from PD.
// - if stores label are global, return get cluster min resolved ts from pd.
// - if contains dc label store, return try to get it from TiKV.
func (s *KVStore) buildTxnScopeMap(ctx context.Context, stores []*Store) (safeTS uint64, txnScopeMap map[string][]uint64, err error) {
isGlobal := true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not need judge from the tikv labels, I think should judge from the config like the usage in tidb:
https://github.com/pingcap/tidb/blob/88225787f3c3da18323415c72208bce20876009a/expression/builtin_time.go#L6617-L6632

Copy link
Member Author

@HuSharp HuSharp Aug 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

got it

txnScopeMap = make(map[string][]uint64)
for _, store := range stores {
txnScopeMap[oracle.GlobalTxnScope] = append(txnScopeMap[oracle.GlobalTxnScope], store.StoreID())

if label, ok := store.GetLabelValue(DCLabelKey); ok {
txnScopeMap[label] = append(txnScopeMap[label], store.StoreID())
isGlobal = false
}
}
for txnScope, storeIDs := range txnScopeMap {
s.updateMinSafeTS(txnScope, storeIDs)

// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
safeTS, err = s.pdHttpClient.GetClusterMinResolvedTS(ctx)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err))
}
}
wg.Wait()

return safeTS, txnScopeMap, err
}

// Variables defines the variables used by TiKV storage.
Expand Down
7 changes: 3 additions & 4 deletions util/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,11 @@ func NewPDHTTPClient(
}
}

// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
// GetClusterMinResolvedTS get cluster-level min-resolved-ts from pd.
func (p *PDHTTPClient) GetClusterMinResolvedTS(ctx context.Context) (uint64, error) {
var err error
for _, addr := range p.addrs {
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
v, e := pdRequest(ctx, addr, storeMinResolvedTSPrefix, p.cli, http.MethodGet, nil)
if e != nil {
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
err = e
Expand Down
Loading