@@ -173,8 +173,12 @@ func (kss *keyspaceState) beingResharded(currentShard string) bool {
173
173
}
174
174
175
175
type shardState struct {
176
- target * querypb.Target
177
- serving bool
176
+ target * querypb.Target
177
+ serving bool
178
+ // waitForReparent is used to tell the keyspace event watcher
179
+ // that this shard should be marked serving only after a reparent
180
+ // operation has succeeded.
181
+ waitForReparent bool
178
182
externallyReparented int64
179
183
currentPrimary * topodatapb.TabletAlias
180
184
}
@@ -357,8 +361,34 @@ func (kss *keyspaceState) onHealthCheck(th *TabletHealth) {
357
361
// if the shard went from serving to not serving, or the other way around, the keyspace
358
362
// is undergoing an availability event
359
363
if sstate .serving != th .Serving {
360
- sstate .serving = th .Serving
361
364
kss .consistent = false
365
+ switch {
366
+ case th .Serving && sstate .waitForReparent :
367
+ // While waiting for a reparent, if we receive a serving primary,
368
+ // we should check if the primary term start time is greater than the externally reparented time.
369
+ // We mark the shard serving only if it is. This is required so that we don't prematurely stop
370
+ // buffering for PRS, or TabletExternallyReparented, after seeing a serving healthcheck from the
371
+ // same old primary tablet that has already been turned read-only.
372
+ if th .PrimaryTermStartTime > sstate .externallyReparented {
373
+ sstate .waitForReparent = false
374
+ sstate .serving = true
375
+ }
376
+ case th .Serving && ! sstate .waitForReparent :
377
+ sstate .serving = true
378
+ case ! th .Serving :
379
+ sstate .serving = false
380
+ }
381
+ }
382
+ if ! th .Serving {
383
+ // Once we have seen a non-serving primary healthcheck, there is no need for us to explicitly wait
384
+ // for a reparent to happen. We use waitForReparent to ensure that we don't prematurely stop
385
+ // buffering when we receive a serving healthcheck from the primary that is being demoted.
386
+ // However, if we receive a non-serving check, then we know that we won't receive any more serving
387
+ // health checks until reparent finishes. Specifically, this helps us when PRS fails, but
388
+ // stops gracefully because the new candidate couldn't get caught up in time. In this case, we promote
389
+ // the previous primary back. Without turning off waitForReparent here, we wouldn't be able to stop
390
+ // buffering for that case.
391
+ sstate .waitForReparent = false
362
392
}
363
393
364
394
// if the primary for this shard has been externally reparented, we're undergoing a failover,
@@ -653,3 +683,36 @@ func (kew *KeyspaceEventWatcher) GetServingKeyspaces() []string {
653
683
}
654
684
return servingKeyspaces
655
685
}
686
+
687
+ // MarkShardNotServing marks the given shard not serving.
688
+ // We use this when we start buffering for a given shard. This helps
689
+ // coordinate between the sharding logic and the keyspace event watcher.
690
+ // We take in a boolean as well to tell us whether this error is because
691
+ // a reparent is ongoing. If it is, we also mark the shard to wait for a reparent.
692
+ // The return argument is whether the shard was found and marked not serving successfully or not.
693
+ func (kew * KeyspaceEventWatcher ) MarkShardNotServing (ctx context.Context , keyspace string , shard string , isReparentErr bool ) bool {
694
+ kss := kew .getKeyspaceStatus (ctx , keyspace )
695
+ if kss == nil {
696
+ // Only happens if the keyspace was deleted.
697
+ return false
698
+ }
699
+ kss .mu .Lock ()
700
+ defer kss .mu .Unlock ()
701
+ sstate := kss .shards [shard ]
702
+ if sstate == nil {
703
+ // This only happens if the shard is deleted, or if
704
+ // the keyspace event watcher hasn't seen the shard at all.
705
+ return false
706
+ }
707
+ // Mark the keyspace inconsistent and the shard not serving.
708
+ kss .consistent = false
709
+ sstate .serving = false
710
+ if isReparentErr {
711
+ // If the error was triggered because a reparent operation has started.
712
+ // We mark the shard to wait for a reparent to finish before marking it serving.
713
+ // This is required to prevent premature stopping of buffering if we receive
714
+ // a serving healthcheck from a primary that is being demoted.
715
+ sstate .waitForReparent = true
716
+ }
717
+ return true
718
+ }
0 commit comments