Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
}

// GetLoadTabletsTrigger is not implemented.
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan struct{} {
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
return nil
}

Expand Down
23 changes: 13 additions & 10 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ type HealthCheck interface {
Unsubscribe(c chan *TabletHealth)

// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
GetLoadTabletsTrigger() chan struct{}
GetLoadTabletsTrigger() chan topo.KeyspaceShard
}

var _ HealthCheck = (*HealthCheckImpl)(nil)
Expand Down Expand Up @@ -302,8 +302,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// loadTabletsTrigger is used to immediately load information about tablets of a specific shard.
loadTabletsTrigger chan topo.KeyspaceShard
// healthCheckDialSem is used to limit how many healthcheck connections can be opened to tablets at once.
healthCheckDialSem *semaphore.Weighted
}
Expand Down Expand Up @@ -371,7 +371,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}, 1),
loadTabletsTrigger: make(chan topo.KeyspaceShard, 1024),
}
var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
Expand Down Expand Up @@ -543,18 +543,21 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
}

// If the previous tablet type was primary, we need to check if the next new primary has already been assigned.
// If no new primary has been assigned, we will trigger a `loadTablets` call to immediately redirect traffic to the new primary.
// If no new primary has been assigned, we will trigger loading of tablets for this keyspace shard to immediately redirect traffic to the new primary.
//
// This is to avoid a situation where a newly primary tablet for a shard has just been started and the tableRefreshInterval has not yet passed,
// causing an interruption where no primary is assigned to the shard.
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
// We want to trigger a loadTablets call, but if the channel is not empty
// then a trigger is already scheduled, we don't need to trigger another one.
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
// We want to trigger a call to load tablets for this keyspace-shard,
// but we want this to be non-blocking to prevent the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
// If the buffer is exhausted, then we'll just receive the update when all the tablets are loaded on the ticker.
select {
case hc.loadTabletsTrigger <- struct{}{}:
case hc.loadTabletsTrigger <- topo.KeyspaceShard{
Keyspace: prevTarget.Keyspace,
Shard: prevTarget.Shard,
}:
default:
}
}
Expand Down Expand Up @@ -670,7 +673,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
}

// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan struct{} {
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
return hc.loadTabletsTrigger
}

Expand Down
62 changes: 62 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1227,6 +1227,68 @@ func TestPrimaryInOtherCell(t *testing.T) {
mustMatch(t, want, a[0], "Expecting healthy primary")
}

// TestLoadTabletsTrigger tests that we send the correct information on the load tablets trigger.
func TestLoadTabletsTrigger(t *testing.T) {
ctx := utils.LeakCheckContext(t)

// create a health check instance.
hc := NewHealthCheck(ctx, time.Hour, time.Hour, nil, "", "", nil)
defer hc.Close()

ks := "keyspace"
shard := "shard"
// Add a tablet to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: "zone-1",
Uid: 100,
},
Type: topodatapb.TabletType_REPLICA,
Hostname: "host1",
PortMap: map[string]int32{
"grpc": 123,
},
Keyspace: ks,
Shard: shard,
}

// We want to run updateHealth with arguments that always
// make it trigger load Tablets.
th := &TabletHealth{
Tablet: tablet1,
Target: &querypb.Target{
Keyspace: ks,
Shard: shard,
TabletType: topodatapb.TabletType_REPLICA,
},
}
prevTarget := &querypb.Target{
Keyspace: ks,
Shard: shard,
TabletType: topodatapb.TabletType_PRIMARY,
}
hc.AddTablet(tablet1)

numTriggers := 10
for i := 0; i < numTriggers; i++ {
// Since the previous target was a primary, and there are no other
// primary tablets for the given keyspace shard, we will see the healtcheck
// send on the loadTablets trigger. We just want to verify the information
// there is correct.
hc.updateHealth(th, prevTarget, false, false)
}

ch := hc.GetLoadTabletsTrigger()
require.Len(t, ch, numTriggers)
for i := 0; i < numTriggers; i++ {
// Read from the channel and verify we indeed have the right values.
kss := <-ch
require.EqualValues(t, ks, kss.Keyspace)
require.EqualValues(t, shard, kss.Shard)
}
require.Len(t, ch, 0)
}

func TestReplicaInOtherCell(t *testing.T) {
ctx := utils.LeakCheckContext(t)

Expand Down
46 changes: 41 additions & 5 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,39 @@ func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
}

func (tw *TopologyWatcher) getTabletsByShard(keyspace string, shard string) ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByShardCell(tw.ctx, keyspace, shard, []string{tw.cell})
}

// Start starts the topology watcher.
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
// Goroutine to refresh the tablets list periodically.
go func(t *TopologyWatcher) {
defer t.wg.Done()
ticker := time.NewTicker(t.refreshInterval)
defer ticker.Stop()
t.loadTablets()
for {
t.loadTablets()
select {
case <-t.ctx.Done():
return
case <-tw.healthcheck.GetLoadTabletsTrigger():
case kss := <-t.healthcheck.GetLoadTabletsTrigger():
t.loadTabletsForKeyspaceShard(kss.Keyspace, kss.Shard)
case <-ticker.C:
// Since we are going to load all the tablets,
// we can clear out the entire list for reloading
// specific keyspace shards.
func() {
for {
select {
case <-t.healthcheck.GetLoadTabletsTrigger():
default:
return
}
}
}()
t.loadTablets()
}
}
}(tw)
Expand All @@ -136,10 +155,23 @@ func (tw *TopologyWatcher) Stop() {
tw.wg.Wait()
}

func (tw *TopologyWatcher) loadTabletsForKeyspaceShard(keyspace string, shard string) {
if keyspace == "" || shard == "" {
log.Errorf("topologyWatcher: loadTabletsForKeyspaceShard: keyspace and shard are required")
return
}
tabletInfos, err := tw.getTabletsByShard(keyspace, shard)
if err != nil {
log.Errorf("error getting tablets for keyspace-shard: %v:%v: %v", keyspace, shard, err)
return
}
// Since we are only reading tablets for a keyspace shard,
// this is by default a partial result.
tw.storeTabletInfos(tabletInfos /* partialResults */, true)
}

func (tw *TopologyWatcher) loadTablets() {
newTablets := make(map[string]*tabletInfo)
var partialResult bool

// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
Expand All @@ -155,6 +187,11 @@ func (tw *TopologyWatcher) loadTablets() {
}
}

tw.storeTabletInfos(tabletInfos, partialResult)
}

func (tw *TopologyWatcher) storeTabletInfos(tabletInfos []*topo.TabletInfo, partialResult bool) {
newTablets := make(map[string]*tabletInfo)
// Accumulate a list of all known alias strings to use later
// when sorting.
tabletAliasStrs := make([]string, 0, len(tabletInfos))
Expand Down Expand Up @@ -243,7 +280,6 @@ func (tw *TopologyWatcher) loadTablets() {
}
tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes())
tw.lastRefresh = time.Now()

}

// RefreshLag returns the time since the last refresh.
Expand Down
65 changes: 65 additions & 0 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,71 @@ func TestFilterByKeyspace(t *testing.T) {
}
}

// TestLoadTablets tests that loadTablets works as intended for the given inputs.
func TestLoadTablets(t *testing.T) {
ctx := utils.LeakCheckContext(t)

hc := NewFakeHealthCheck(nil)
f := TabletFilters{NewFilterByKeyspace(testKeyspacesToWatch)}
ts := memorytopo.NewServer(ctx, testCell)
defer ts.Close()
tw := NewTopologyWatcher(context.Background(), ts, hc, f, testCell, 10*time.Minute, true)

// Add 2 tablets from 2 different tracked keyspaces to the topology.
tablet1 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: testCell,
Uid: 0,
},
Hostname: "host1",
PortMap: map[string]int32{
"vt": 123,
},
Keyspace: "ks1",
Shard: "shard",
}
tablet2 := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: testCell,
Uid: 10,
},
Hostname: "host2",
PortMap: map[string]int32{
"vt": 124,
},
Keyspace: "ks4",
Shard: "shard",
}
for _, ks := range testKeyspacesToWatch {
_, err := ts.GetOrCreateShard(ctx, ks, "shard")
require.NoError(t, err)
}
require.NoError(t, ts.CreateTablet(ctx, tablet1))
require.NoError(t, ts.CreateTablet(ctx, tablet2))

// Let's refresh the information for a different keyspace shard. We shouldn't
// reload either tablet's information.
tw.loadTabletsForKeyspaceShard("ks2", "shard")
key1 := TabletToMapKey(tablet1)
key2 := TabletToMapKey(tablet2)
allTablets := hc.GetAllTablets()
assert.NotContains(t, allTablets, key1)
assert.NotContains(t, allTablets, key2)

// Now, if we reload the first tablet's shard, we should see this tablet
// but not the other.
tw.loadTabletsForKeyspaceShard("ks1", "shard")
allTablets = hc.GetAllTablets()
assert.Contains(t, allTablets, key1)
assert.NotContains(t, allTablets, key2)

// Finally, if we load all the tablets, both the tablets should be visible.
tw.loadTablets()
allTablets = hc.GetAllTablets()
assert.Contains(t, allTablets, key1)
assert.Contains(t, allTablets, key2)
}

// TestFilterByKeyspaceSkipsIgnoredTablets confirms a bug fix for the case when a TopologyWatcher
// has a FilterByKeyspace TabletFilter configured along with refreshKnownTablets turned off. We want
// to ensure that the TopologyWatcher:
Expand Down
51 changes: 11 additions & 40 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ import (
"sync"
"time"

"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/protoutil"
Expand Down Expand Up @@ -661,47 +659,20 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
defer span.Finish()
var err error

if len(cells) == 0 {
cells, err = ts.GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(cells) == 0 { // Nothing to do
return nil, nil
}
// if we get a partial result, we keep going. It most likely means
// a cell is out of commission.
aliases, err := ts.FindAllTabletAliasesInShardByCell(ctx, keyspace, shard, cells)
if err != nil && !IsErrType(err, PartialResult) {
return nil, err
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)

tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
if keyspace != "" {
kss = &KeyspaceShard{
Keyspace: keyspace,
Shard: shard,
}
}
options := &GetTabletsByCellOptions{
KeyspaceShard: kss,
}
for _, cell := range cells {
eg.Go(func() error {
t, err := ts.GetTabletsByCell(ctx, cell, options)
if err != nil {
return vterrors.Wrapf(err, "GetTabletsByCell for %v failed.", cell)
}
mu.Lock()
defer mu.Unlock()
tablets = append(tablets, t...)
return nil
})
}
if err := eg.Wait(); err != nil {
log.Warningf("GetTabletsByShardCell(%v,%v): got partial result: %v", keyspace, shard, err)
return tablets, NewError(PartialResult, shard)
// get the tablets for the cells we were able to reach, forward
// ErrPartialResult from FindAllTabletAliasesInShard
result, gerr := ts.GetTabletList(ctx, aliases, nil)
if gerr == nil && err != nil {
gerr = err
}
return tablets, nil
return result, gerr
}

// GetTabletMapForShard returns the tablets for a shard. It can return
Expand Down
Loading
Loading