Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

min-resolved-ts: check dc label #944

Merged
merged 6 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions integration_tests/pd_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/config"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikv"
"github.com/tikv/client-go/v2/tikvrpc"
Expand Down Expand Up @@ -103,9 +105,9 @@ func (c *storeSafeTsMockClient) CloseAddr(addr string) error {
return c.Client.CloseAddr(addr)
}

func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
func (s *apiTestSuite) TestGetClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the store from PD.
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Expand Down Expand Up @@ -141,6 +143,58 @@ func (s *apiTestSuite) TestGetStoreMinResolvedTS() {
require.Equal(uint64(150), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
}

func (s *apiTestSuite) TestDCLabelClusterMinResolvedTS() {
util.EnableFailpoints()
// Try to get the minimum resolved timestamp of the cluster from PD.
require := s.Require()
require.Nil(failpoint.Enable("tikvclient/InjectMinResolvedTS", `return(100)`))
mockClient := storeSafeTsMockClient{
Client: s.store.GetTiKVClient(),
}
s.store.SetTiKVClient(&mockClient)
var retryCount int
for s.store.GetMinSafeTS(oracle.GlobalTxnScope) != 100 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}
require.Equal(atomic.LoadInt32(&mockClient.requestCount), int32(0))
require.Equal(uint64(100), s.store.GetMinSafeTS(oracle.GlobalTxnScope))
defer func() {
s.Require().Nil(failpoint.Disable("tikvclient/InjectMinResolvedTS"))
}()

// Set DC label for store 1.
dcLabel := "testDC"
restore := config.UpdateGlobal(func(conf *config.Config) {
conf.TxnScope = dcLabel
})
defer restore()

labels := []*metapb.StoreLabel{
{
Key: tikv.DCLabelKey,
Value: dcLabel,
},
}
s.store.GetRegionCache().SetRegionCacheStore(1, tikvrpc.TiKV, 1, labels)

// Try to get the minimum resolved timestamp of the store from TiKV.
retryCount = 0
for s.store.GetMinSafeTS(dcLabel) != 150 {
time.Sleep(2 * time.Second)
if retryCount > 5 {
break
}
retryCount++
}

require.GreaterOrEqual(atomic.LoadInt32(&mockClient.requestCount), int32(1))
require.Equal(uint64(150), s.store.GetMinSafeTS(dcLabel))
}

func (s *apiTestSuite) TearDownTest() {
if s.store != nil {
s.Require().Nil(s.store.Close())
Expand Down
2 changes: 1 addition & 1 deletion internal/locate/region_request_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func TestRegionCacheStaleRead(t *testing.T) {
originReloadRegionInterval := atomic.LoadInt64(&reloadRegionInterval)
originBoTiKVServerBusy := retry.BoTiKVServerBusy
defer func() {
reloadRegionInterval = originReloadRegionInterval
atomic.StoreInt64(&reloadRegionInterval, originReloadRegionInterval)
retry.BoTiKVServerBusy = originBoTiKVServerBusy
}()
atomic.StoreInt64(&reloadRegionInterval, int64(24*time.Hour)) // disable reload region
Expand Down
87 changes: 57 additions & 30 deletions tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,49 +543,42 @@ func (s *KVStore) safeTSUpdater() {
}

func (s *KVStore) updateSafeTS(ctx context.Context) {
// Try to get the cluster-level minimum resolved timestamp from PD first.
if s.setClusterMinSafeTSByPD(ctx) {
return
}

stores := s.regionCache.GetStoresByType(tikvrpc.TiKV)
tikvClient := s.GetTiKVClient()
wg := &sync.WaitGroup{}
wg.Add(len(stores))
// If getting the cluster-level minimum resolved timestamp from PD failed or returned 0,
// try to get it from TiKV.
for _, store := range stores {
storeID := store.StoreID()
storeAddr := store.GetAddr()
go func(ctx context.Context, wg *sync.WaitGroup, storeID uint64, storeAddr string) {
defer wg.Done()

var (
safeTS uint64
err error
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)
storeIDStr := strconv.Itoa(int(storeID))
// Try to get the minimum resolved timestamp of the store from PD.
if s.pdHttpClient != nil {
safeTS, err = s.pdHttpClient.GetStoreMinResolvedTS(ctx, storeID)
if err != nil {
logutil.BgLogger().Debug("get resolved TS from PD failed", zap.Error(err), zap.Uint64("store-id", storeID))
}
}
// If getting the minimum resolved timestamp from PD failed or returned 0, try to get it from TiKV.
if safeTS == 0 || err != nil {
resp, err := tikvClient.SendRequest(
ctx, storeAddr, tikvrpc.NewRequest(
tikvrpc.CmdStoreSafeTS, &kvrpcpb.StoreSafeTSRequest{
KeyRange: &kvrpcpb.KeyRange{
StartKey: []byte(""),
EndKey: []byte(""),
},
}, kvrpcpb.Context{
RequestSource: util.RequestSourceFromCtx(ctx),
},
), client.ReadTimeoutShort,
)
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
}
safeTS = resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()
if err != nil {
metrics.TiKVSafeTSUpdateCounter.WithLabelValues("fail", storeIDStr).Inc()
logutil.BgLogger().Debug("update safeTS failed", zap.Error(err), zap.Uint64("store-id", storeID))
return
}
safeTS := resp.Resp.(*kvrpcpb.StoreSafeTSResponse).GetSafeTs()

_, preSafeTS := s.getSafeTS(storeID)
if preSafeTS > safeTS {
Expand Down Expand Up @@ -615,6 +608,40 @@ func (s *KVStore) updateSafeTS(ctx context.Context) {
wg.Wait()
}

var (
skipClusterSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("skip", "cluster")
successClusterSafeTSUpdateCounter = metrics.TiKVSafeTSUpdateCounter.WithLabelValues("success", "cluster")
clusterMinSafeTSGap = metrics.TiKVMinSafeTSGapSeconds.WithLabelValues("cluster")
)

// setClusterMinSafeTSByPD try to get cluster-level's min resolved timestamp from PD when @@txn_scope is `global`.
func (s *KVStore) setClusterMinSafeTSByPD(ctx context.Context) bool {
isGlobal := config.GetTxnScopeFromConfig() == oracle.GlobalTxnScope
// Try to get the minimum resolved timestamp of the cluster from PD.
if s.pdHttpClient != nil && isGlobal {
clusterMinSafeTS, err := s.pdHttpClient.GetClusterMinResolvedTS(ctx)
if err != nil {
logutil.BgLogger().Debug("get cluster-level min resolved timestamp from PD failed", zap.Error(err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can warn it, it is called every 2 secs.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For clusters upgraded from v6.0.0~v6.2.0 will meet this error, does it make sense?
https://docs.pingcap.com/tidb/stable/pd-configuration-file#min-resolved-ts-persistence-interval-new-in-v600

} else if clusterMinSafeTS != 0 {
// Update metrics.
preClusterMinSafeTS := s.GetMinSafeTS(oracle.GlobalTxnScope)
if preClusterMinSafeTS > clusterMinSafeTS {
skipClusterSafeTSUpdateCounter.Inc()
preSafeTSTime := oracle.GetTimeFromTS(preClusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(preSafeTSTime).Seconds())
} else {
s.minSafeTS.Store(oracle.GlobalTxnScope, clusterMinSafeTS)
successClusterSafeTSUpdateCounter.Inc()
safeTSTime := oracle.GetTimeFromTS(clusterMinSafeTS)
clusterMinSafeTSGap.Set(time.Since(safeTSTime).Seconds())
}
return true
}
}

return false
}

// Variables defines the variables used by TiKV storage.
type Variables = kv.Variables

Expand Down
13 changes: 6 additions & 7 deletions util/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (
// pd request retry time when connection fail.
pdRequestRetryTime = 10

storeMinResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
minResolvedTSPrefix = "pd/api/v1/min-resolved-ts"
)

// PDHTTPClient is an HTTP client of pd.
Expand Down Expand Up @@ -86,18 +86,17 @@ func NewPDHTTPClient(
}
}

// GetStoreMinResolvedTS get store-level min-resolved-ts from pd.
func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64) (uint64, error) {
// GetClusterMinResolvedTS get cluster-level min-resolved-ts from pd.
func (p *PDHTTPClient) GetClusterMinResolvedTS(ctx context.Context) (uint64, error) {
var err error
for _, addr := range p.addrs {
query := fmt.Sprintf("%s/%d", storeMinResolvedTSPrefix, storeID)
v, e := pdRequest(ctx, addr, query, p.cli, http.MethodGet, nil)
v, e := pdRequest(ctx, addr, minResolvedTSPrefix, p.cli, http.MethodGet, nil)
if e != nil {
logutil.BgLogger().Debug("failed to get min resolved ts", zap.String("addr", addr), zap.Error(e))
err = e
continue
}
logutil.BgLogger().Debug("store min resolved ts", zap.String("resp", string(v)))
logutil.BgLogger().Debug("get cluster min resolved ts", zap.String("resp", string(v)))
d := struct {
IsRealTime bool `json:"is_real_time,omitempty"`
MinResolvedTS uint64 `json:"min_resolved_ts"`
Expand All @@ -107,7 +106,7 @@ func (p *PDHTTPClient) GetStoreMinResolvedTS(ctx context.Context, storeID uint64
return 0, errors.Trace(err)
}
if !d.IsRealTime {
message := fmt.Errorf("store min resolved ts not enabled, addr: %s", addr)
message := fmt.Errorf("cluster min resolved ts not enabled, addr: %s", addr)
logutil.BgLogger().Debug(message.Error())
return 0, errors.Trace(message)
}
Expand Down
Loading