Commit 5968ce9
add region cache state test & fix some issues of replica selector (#910)
Signed-off-by: you06 <[email protected]>
you06 authored Aug 7, 2023
1 parent 7bb81ba commit 5968ce9
Showing 5 changed files with 911 additions and 22 deletions.
4 changes: 3 additions & 1 deletion internal/locate/region_cache.go
@@ -452,10 +452,12 @@ func (c *RegionCache) Close() {
     c.cancelFunc()
 }
 
+var reloadRegionInterval = int64(10 * time.Second)
+
 // asyncCheckAndResolveLoop checks and resolves stores, and reloads marked regions, each on its own ticker.
 func (c *RegionCache) asyncCheckAndResolveLoop(interval time.Duration) {
     ticker := time.NewTicker(interval)
-    reloadRegionTicker := time.NewTicker(10 * time.Second)
+    reloadRegionTicker := time.NewTicker(time.Duration(atomic.LoadInt64(&reloadRegionInterval)))
     defer func() {
         ticker.Stop()
         reloadRegionTicker.Stop()
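Pulling the interval out into a package-level variable read with atomic.LoadInt64 is what lets the new region cache state test tick region reloads quickly. A minimal sketch of how a test in the same package might use it (the helper is ours, not part of the commit; it assumes the sync/atomic and time imports). Note the ticker reads the value once at loop start, so the override must happen before the loop is spawned:

// Hypothetical test helper: temporarily shorten the reload interval so
// asyncCheckAndResolveLoop reloads scheduled regions quickly. Must be
// called before the loop starts, since the ticker captures the value once.
func withReloadRegionInterval(d time.Duration, f func()) {
    old := atomic.LoadInt64(&reloadRegionInterval)
    atomic.StoreInt64(&reloadRegionInterval, int64(d))
    defer atomic.StoreInt64(&reloadRegionInterval, old)
    f()
}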
61 changes: 42 additions & 19 deletions internal/locate/region_request.go
@@ -427,6 +427,8 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
         return rpcCtx, err
     }
     if state.fallbackFromLeader {
+        staleRead := false
+        rpcCtx.contextPatcher.staleRead = &staleRead
         replicaRead := true
         rpcCtx.contextPatcher.replicaRead = &replicaRead
     }
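When tryFollower is entered as a fallback from the leader, the two added lines patch the outgoing request to turn stale read off and replica read on, so the retried read cannot serve from a stale snapshot again. Roughly what such a patcher does, as a sketch with assumed names (the real contextPatcher is defined elsewhere in this package; kvrpcpb.Context does carry StaleRead and ReplicaRead flags):

// Sketch of a contextPatcher-style override: a nil field means "leave the
// flag as the caller set it"; a non-nil field forces the value just before
// the request is sent.
type patchSketch struct {
    staleRead   *bool
    replicaRead *bool
}

func (p patchSketch) apply(ctx *kvrpcpb.Context) {
    if p.staleRead != nil {
        ctx.StaleRead = *p.staleRead
    }
    if p.replicaRead != nil {
        ctx.ReplicaRead = *p.replicaRead
    }
}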
@@ -562,6 +564,10 @@ type accessFollower struct {
     lastIdx AccessIndex
 }
 
+// Follower read tries followers first; if no follower is available, it falls back to the leader.
+// In particular, stale read tries the local peer first (which can be either the leader or a follower),
+// then uses a snapshot read on the leader. If the leader read hits server-is-busy or connection errors
+// while the region cache is still valid, the state changes to tryFollower, which reads with replica read.
 func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
     resetStaleRead := false
     if state.lastIdx < 0 {
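The comment above compresses a three-step retry chain. Spelled out as a toy transition function (names are ours; the real code drives this through the accessFollower and tryFollower selector states rather than an enum):

type staleReadStep int

const (
    stepLocalStaleRead      staleReadStep = iota // stale read on the label-matched local peer
    stepLeaderRead                               // leader read with the stale-read flag reset
    stepFollowerReplicaRead                      // tryFollower: replica read, stale read off
)

// nextStep returns the fallback step and whether another retry is possible.
func nextStep(cur staleReadStep) (staleReadStep, bool) {
    switch cur {
    case stepLocalStaleRead:
        return stepLeaderRead, true // local peer failed; go to the leader
    case stepLeaderRead:
        return stepFollowerReplicaRead, true // leader busy/unreachable, cache still valid
    default:
        return cur, false // exhausted; the region gets invalidated instead
    }
}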
@@ -609,14 +615,30 @@ func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector
     // If there is no candidate, fallback to the leader.
     if selector.targetIdx < 0 {
         leader := selector.replicas[state.leaderIdx]
-        leaderInvalid := leader.isEpochStale() || state.IsLeaderExhausted(leader)
+        leaderEpochStale := leader.isEpochStale()
+        leaderInvalid := leaderEpochStale || state.IsLeaderExhausted(leader)
         if len(state.option.labels) > 0 {
             logutil.BgLogger().Warn("unable to find stores with given labels",
                 zap.Uint64("region", selector.region.GetID()),
                 zap.Bool("leader-invalid", leaderInvalid),
                 zap.Any("labels", state.option.labels))
         }
         if leaderInvalid {
+            // In stale read, the request falls back to the leader after the local follower fails.
+            // If the leader is also unavailable, we can fall back to the followers with the replica-read flag:
+            // the remote followers have not been tried yet, and the local follower can be retried without the stale-read flag.
+            if state.isStaleRead {
+                selector.state = &tryFollower{
+                    fallbackFromLeader: true,
+                    leaderIdx:          state.leaderIdx,
+                    lastIdx:            state.leaderIdx,
+                    labels:             state.option.labels,
+                }
+                if leaderEpochStale {
+                    selector.regionCache.scheduleReloadRegion(selector.region)
+                }
+                return nil, stateChanged{}
+            }
             metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
             selector.invalidateRegion()
             return nil, nil
@@ -655,13 +677,17 @@ func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replic
 }
 
 func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
-    return !replica.isEpochStale() && !replica.isExhausted(1) &&
-        // The request can only be sent to the leader.
-        ((state.option.leaderOnly && idx == state.leaderIdx) ||
-            // Choose a replica with matched labels.
-            (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) &&
-        // Make sure the replica is not unreachable.
-        replica.store.getLivenessState() != unreachable
+    if replica.isEpochStale() || replica.isExhausted(1) || replica.store.getLivenessState() == unreachable {
+        return false
+    }
+    if state.option.leaderOnly && idx == state.leaderIdx {
+        // The request can only be sent to the leader.
+        return true
+    } else if !state.tryLeader && idx == state.leaderIdx {
+        // The request cannot be sent to the leader.
+        return false
+    }
+    return replica.store.IsLabelsMatch(state.option.labels)
 }
 
 type invalidStore struct {
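The rewrite unrolls the old single boolean expression into guard clauses without changing the selection rules, folding the liveness check into the first guard. The same predicate restated over a flattened, hypothetical struct (the real code reads these fields from replica and store):

// candidateInfo is a hypothetical flattening of the state isCandidate
// actually inspects; it exists only for this sketch.
type candidateInfo struct {
    epochStale  bool
    exhausted   bool // already attempted the allowed number of times
    unreachable bool
    isLeader    bool
    labelsMatch bool
}

func isCandidateSketch(c candidateInfo, leaderOnly, tryLeader bool) bool {
    if c.epochStale || c.exhausted || c.unreachable {
        return false
    }
    if leaderOnly && c.isLeader {
        return true // the request may only go to the leader
    } else if !tryLeader && c.isLeader {
        return false // the leader is explicitly excluded
    }
    return c.labelsMatch
}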
@@ -930,25 +956,21 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
         s.region.invalidate(StoreNotFound)
     }
 }
 
-// For some reason, the leader is unreachable by now; try followers instead.
-func (s *replicaSelector) fallback2Follower(ctx *RPCContext) bool {
-    if ctx == nil || s == nil || s.state == nil {
+// For some reason, the leader is unreachable by now; try followers instead.
+// The state is changed in accessFollower.next when the leader is unavailable.
+func (s *replicaSelector) canFallback2Follower() bool {
+    if s == nil || s.state == nil {
         return false
     }
     state, ok := s.state.(*accessFollower)
     if !ok {
         return false
     }
-    if state.lastIdx != state.leaderIdx {
+    if !state.isStaleRead {
         return false
     }
-    s.state = &tryFollower{
-        fallbackFromLeader: true,
-        leaderIdx:          state.leaderIdx,
-        lastIdx:            state.leaderIdx,
-        labels:             state.option.labels,
-    }
-    return true
+    // Can fall back to a follower only when the leader is exhausted.
+    return state.lastIdx == state.leaderIdx && state.IsLeaderExhausted(s.replicas[state.leaderIdx])
 }
 
 func (s *replicaSelector) invalidateRegion() {
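fallback2Follower used to mutate the selector state itself; after this change the caller only asks whether fallback is legal, and the state switch happens inside accessFollower.next. The new predicate boils down to the following hypothetical free-function restatement (AccessIndex is the package's replica index type):

// canFallback2Follower, condensed: only a stale read whose last attempt
// just exhausted the leader may retry as a follower replica read.
func canFallbackSketch(isStaleRead bool, lastIdx, leaderIdx AccessIndex, leaderExhausted bool) bool {
    return isStaleRead && lastIdx == leaderIdx && leaderExhausted
}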
@@ -1680,6 +1702,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
     }
 
     // This peer is removed from the region. Invalidate the region since it's too stale.
+    // If the region error comes from a follower, could we mark the peer unavailable and reload the region asynchronously?
     if regionErr.GetRegionNotFound() != nil {
         s.regionCache.InvalidateCachedRegion(ctx.Region)
         return false, nil
@@ -1706,7 +1729,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
         logutil.BgLogger().Warn("tikv reports `ServerIsBusy` retry later",
             zap.String("reason", regionErr.GetServerIsBusy().GetReason()),
             zap.Stringer("ctx", ctx))
-        if s.replicaSelector.fallback2Follower(ctx) {
+        if s.replicaSelector.canFallback2Follower() {
             // immediately retry on followers.
             return true, nil
         }