Skip to content
Merged
2 changes: 1 addition & 1 deletion go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
}

// GetLoadTabletsTrigger is not implemented.
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan struct{} {
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
return nil
}

Expand Down
23 changes: 13 additions & 10 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ type HealthCheck interface {
Unsubscribe(c chan *TabletHealth)

// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
GetLoadTabletsTrigger() chan struct{}
GetLoadTabletsTrigger() chan topo.KeyspaceShard
}

var _ HealthCheck = (*HealthCheckImpl)(nil)
Expand Down Expand Up @@ -297,8 +297,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// loadTabletsTrigger is used to immediately load information about tablets of a specific shard.
loadTabletsTrigger chan topo.KeyspaceShard
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
Expand Down Expand Up @@ -363,7 +363,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}, 1),
loadTabletsTrigger: make(chan topo.KeyspaceShard, 1024),
}
var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
Expand Down Expand Up @@ -535,18 +535,21 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
}

// If the previous tablet type was primary, we need to check if the next new primary has already been assigned.
// If no new primary has been assigned, we will trigger a `loadTablets` call to immediately redirect traffic to the new primary.
// If no new primary has been assigned, we will trigger loading of tablets for this keyspace shard to immediately redirect traffic to the new primary.
//
// This is to avoid a situation where a new primary tablet for a shard has just been started and the tablet refresh interval has not yet passed,
// causing an interruption where no primary is assigned to the shard.
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
// We want to trigger a loadTablets call, but if the channel is not empty
// then a trigger is already scheduled, we don't need to trigger another one.
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
// We want to trigger a call to load tablets for this keyspace-shard,
// but we want this to be non-blocking to prevent the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
// If the buffer is exhausted, then we'll just receive the update when all the tablets are loaded on the ticker.
select {
case hc.loadTabletsTrigger <- struct{}{}:
case hc.loadTabletsTrigger <- topo.KeyspaceShard{
Keyspace: prevTarget.Keyspace,
Shard: prevTarget.Shard,
}:
default:
}
}
Expand Down Expand Up @@ -662,7 +665,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
}

// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan struct{} {
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
return hc.loadTabletsTrigger
}

Expand Down
62 changes: 62 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,68 @@ func TestPrimaryInOtherCell(t *testing.T) {
mustMatch(t, want, a[0], "Expecting healthy primary")
}

// TestLoadTabletsTrigger verifies that every send on the load tablets trigger
// carries the keyspace and shard of the tablet whose health was updated.
func TestLoadTabletsTrigger(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	// Health check instance under test.
	hc := NewHealthCheck(ctx, time.Hour, time.Hour, nil, "", "", nil)
	defer hc.Close()

	const (
		ks          = "keyspace"
		shard       = "shard"
		numTriggers = 10
	)

	// The single tablet that gets registered with the health check.
	tablet1 := &topodatapb.Tablet{
		Alias:    &topodatapb.TabletAlias{Cell: "zone-1", Uid: 100},
		Type:     topodatapb.TabletType_REPLICA,
		Hostname: "host1",
		PortMap:  map[string]int32{"grpc": 123},
		Keyspace: ks,
		Shard:    shard,
	}

	// The previous target says PRIMARY while the current health says REPLICA,
	// so each updateHealth call below is expected to fire the trigger.
	prevTarget := &querypb.Target{
		Keyspace:   ks,
		Shard:      shard,
		TabletType: topodatapb.TabletType_PRIMARY,
	}
	th := &TabletHealth{
		Tablet: tablet1,
		Target: &querypb.Target{
			Keyspace:   ks,
			Shard:      shard,
			TabletType: topodatapb.TabletType_REPLICA,
		},
	}
	hc.AddTablet(tablet1)

	for sent := 0; sent < numTriggers; sent++ {
		// The previous target was a primary and no other primary tablet
		// exists for this keyspace shard, so the healthcheck sends on the
		// loadTablets trigger. Only the payload is verified further down.
		hc.updateHealth(th, prevTarget, false, false)
	}

	ch := hc.GetLoadTabletsTrigger()
	require.Len(t, ch, numTriggers)
	for remaining := numTriggers; remaining > 0; remaining-- {
		// Drain one entry and check that it names the expected keyspace shard.
		kss := <-ch
		require.EqualValues(t, ks, kss.Keyspace)
		require.EqualValues(t, shard, kss.Shard)
	}
	require.Len(t, ch, 0)
}

func TestReplicaInOtherCell(t *testing.T) {
ctx := utils.LeakCheckContext(t)

Expand Down
46 changes: 41 additions & 5 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,39 @@ func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
}

// getTabletsByShard asks the topo server for the tablets of the given
// keyspace and shard, restricted to the cell this watcher is configured for.
func (tw *TopologyWatcher) getTabletsByShard(keyspace string, shard string) ([]*topo.TabletInfo, error) {
	watchedCells := []string{tw.cell}
	return tw.topoServer.GetTabletsByShardCell(tw.ctx, keyspace, shard, watchedCells)
}

// Start starts the topology watcher.
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
// Goroutine to refresh the tablets list periodically.
go func(t *TopologyWatcher) {
defer t.wg.Done()
ticker := time.NewTicker(t.refreshInterval)
defer ticker.Stop()
t.loadTablets()
for {
t.loadTablets()
select {
case <-t.ctx.Done():
return
case <-tw.healthcheck.GetLoadTabletsTrigger():
case kss := <-t.healthcheck.GetLoadTabletsTrigger():
t.loadTabletsForKeyspaceShard(kss.Keyspace, kss.Shard)
case <-ticker.C:
// Since we are going to load all the tablets,
// we can clear out the entire list for reloading
// specific keyspace shards.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the review! 💕

func() {
for {
select {
case <-t.healthcheck.GetLoadTabletsTrigger():
default:
return
}
}
}()
t.loadTablets()
}
}
}(tw)
Expand All @@ -136,10 +155,23 @@ func (tw *TopologyWatcher) Stop() {
tw.wg.Wait()
}

// loadTabletsForKeyspaceShard fetches the tablets of a single keyspace-shard
// and hands them to storeTabletInfos. It logs an error and stores nothing when
// either argument is empty or when the tablet lookup fails.
func (tw *TopologyWatcher) loadTabletsForKeyspaceShard(keyspace string, shard string) {
	// A read scoped to one keyspace shard never covers the whole cell, so the
	// result is always flagged as partial.
	const partialResult = true

	if len(keyspace) == 0 || len(shard) == 0 {
		log.Errorf("topologyWatcher: loadTabletsForKeyspaceShard: keyspace and shard are required")
		return
	}

	shardTablets, lookupErr := tw.getTabletsByShard(keyspace, shard)
	if lookupErr != nil {
		log.Errorf("error getting tablets for keyspace-shard: %v:%v: %v", keyspace, shard, lookupErr)
		return
	}
	tw.storeTabletInfos(shardTablets, partialResult)
}

func (tw *TopologyWatcher) loadTablets() {
newTablets := make(map[string]*tabletInfo)
var partialResult bool

// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
Expand All @@ -155,6 +187,11 @@ func (tw *TopologyWatcher) loadTablets() {
}
}

tw.storeTabletInfos(tabletInfos, partialResult)
}

func (tw *TopologyWatcher) storeTabletInfos(tabletInfos []*topo.TabletInfo, partialResult bool) {
newTablets := make(map[string]*tabletInfo)
// Accumulate a list of all known alias strings to use later
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletInfos))
Expand Down Expand Up @@ -243,7 +280,6 @@ func (tw *TopologyWatcher) loadTablets() {
}
tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes())
tw.lastRefresh = time.Now()

}

// RefreshLag returns the time since the last refresh.
Expand Down
65 changes: 65 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,71 @@ func TestFilterByKeyspace(t *testing.T) {
}
}

// TestLoadTablets checks which tablets the healthcheck knows about after
// shard-scoped loads and after a full load of the cell.
func TestLoadTablets(t *testing.T) {
	ctx := utils.LeakCheckContext(t)

	hc := NewFakeHealthCheck(nil)
	filters := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
	ts := memorytopo.NewServer(ctx, testCell)
	defer ts.Close()
	tw := NewTopologyWatcher(context.Background(), ts, hc, filters, testCell, 10*time.Minute, true)

	// Two tablets, each living in a different tracked keyspace.
	tablet1 := &topodatapb.Tablet{
		Alias:    &topodatapb.TabletAlias{Cell: testCell, Uid: 0},
		Hostname: "host1",
		PortMap:  map[string]int32{"vt": 123},
		Keyspace: "ks1",
		Shard:    "shard",
	}
	tablet2 := &topodatapb.Tablet{
		Alias:    &topodatapb.TabletAlias{Cell: testCell, Uid: 10},
		Hostname: "host2",
		PortMap:  map[string]int32{"vt": 124},
		Keyspace: "ks4",
		Shard:    "shard",
	}

	// Every watched keyspace gets a shard record before the tablets are created.
	for _, keyspace := range testKeyspacesToWatch {
		_, err := ts.GetOrCreateShard(ctx, keyspace, "shard")
		require.NoError(t, err)
	}
	for _, tablet := range []*topodatapb.Tablet{tablet1, tablet2} {
		require.NoError(t, ts.CreateTablet(ctx, tablet))
	}

	key1 := TabletToMapKey(tablet1)
	key2 := TabletToMapKey(tablet2)

	// Loading an unrelated keyspace shard must not surface either tablet.
	tw.loadTabletsForKeyspaceShard("ks2", "shard")
	known := hc.GetAllTablets()
	assert.NotContains(t, known, key1)
	assert.NotContains(t, known, key2)

	// Loading the first tablet's shard surfaces that tablet only.
	tw.loadTabletsForKeyspaceShard("ks1", "shard")
	known = hc.GetAllTablets()
	assert.Contains(t, known, key1)
	assert.NotContains(t, known, key2)

	// A full load makes both tablets visible.
	tw.loadTablets()
	known = hc.GetAllTablets()
	assert.Contains(t, known, key1)
	assert.Contains(t, known, key2)
}

// TestFilterByKeyspaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher
// has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want
// to ensure that the TopologyWatcher:
Expand Down
51 changes: 11 additions & 40 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/protoutil"
Expand Down Expand Up @@ -648,47 +646,20 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
defer span.Finish()
var err error

if len(cells) == 0 {
cells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(cells) == 0 { // Nothing to do
return nil, nil
}
// if we get a partial result, we keep going. It most likely means
// a cell is out of commission.
aliases, err := ts.FindAllTabletAliasesInShardByCell(ctx, keyspace, shard, cells)
if err != nil && !IsErrType(err, PartialResult) {
return nil, err
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)

tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
if keyspace != "" {
kss = &KeyspaceShard{
Keyspace: keyspace,
Shard: shard,
}
}
options := &GetTabletsByCellOptions{
KeyspaceShard: kss,
}
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, options)
if err != nil {
return vterrors.Wrapf(err, "GetTabletsByCell for %v failed.", cell)
}
mu.Lock()
defer mu.Unlock()
tablets = append(tablets, t...)
return nil
})
}
if err := eg.Wait(); err != nil {
log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err)
return tablets, NewError(PartialResult, shard)
// Get the tablets for the cells we were able to reach, and forward the
// PartialResult error from FindAllTabletAliasesInShardByCell, if any.
result, gerr := ts.GetTabletList(ctx, aliases, nil)
if gerr == nil && err != nil {
gerr = err
}
return tablets, nil
return result, gerr
}

// GetTabletMapForShard returns the tablets for a shard. It can return
Expand Down
Loading
Loading