Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,30 @@ func (hc *HealthCheckImpl) WaitForAllServingTablets(ctx context.Context, targets
return hc.waitForTablets(ctx, targets, true)
}

// FilterTargetsByKeyspaces only returns the targets that are part of the provided keyspaces
func FilterTargetsByKeyspaces(keyspaces []string, targets []*query.Target) []*query.Target {
filteredTargets := make([]*query.Target, 0)

// Keep them all if there are no keyspaces to watch
if len(KeyspacesToWatch) == 0 {
return append(filteredTargets, targets...)
}

// Let's remove from the target shards that are not in the keyspaceToWatch list.
for _, target := range targets {
for _, keyspaceToWatch := range keyspaces {
if target.Keyspace == keyspaceToWatch {
filteredTargets = append(filteredTargets, target)
}
}
}
return filteredTargets
}

// waitForTablets is the internal method that polls for tablets.
func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.Target, requireServing bool) error {
targets = FilterTargetsByKeyspaces(KeyspacesToWatch, targets)

for {
// We nil targets as we find them.
allPresent := true
Expand Down Expand Up @@ -648,6 +670,11 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.
select {
case <-ctx.Done():
timer.Stop()
for _, target := range targets {
if target != nil {
log.Infof("couldn't find tablets for target: %v", target)
}
}
return ctx.Err()
case <-timer.C:
}
Expand Down
89 changes: 89 additions & 0 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"vitess.io/vitess/go/vt/vttablet/queryservice"
"vitess.io/vitess/go/vt/vttablet/tabletconn"

"vitess.io/vitess/go/vt/proto/query"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -425,6 +426,94 @@ func TestHealthCheckTimeout(t *testing.T) {
mustMatch(t, want, result, "Wrong TabletHealth data")
}

func TestWaitForAllServingTablets(t *testing.T) {
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
defer hc.Close()
tablet := createTestTablet(0, "cell", "a")
tablet.Type = topodatapb.TabletType_REPLICA
targets := []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}
input := make(chan *querypb.StreamHealthResponse)
createFakeConn(tablet, input)

// create a channel and subscribe to healthcheck
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// there will be a first result, get and discard it
<-resultChan
// empty
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

err := hc.WaitForAllServingTablets(ctx, targets)
assert.NotNil(t, err, "error should not be nil")

shr := &querypb.StreamHealthResponse{
TabletAlias: tablet.Alias,
Target: &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA},
Serving: true,
TabletExternallyReparentedTimestamp: 0,
RealtimeStats: &querypb.RealtimeStats{SecondsBehindMaster: 1, CpuUsage: 0.2},
}

input <- shr
<-resultChan
// // check it's there

targets = []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.Nil(t, err, "error should be nil. Targets are found")

targets = []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
{
Keyspace: "newkeyspace",
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.NotNil(t, err, "error should not be nil (there are no tablets on this keyspace")

targets = []*query.Target{
{
Keyspace: tablet.Keyspace,
Shard: tablet.Shard,
TabletType: tablet.Type,
},
{
Keyspace: "newkeyspace",
Shard: tablet.Shard,
TabletType: tablet.Type,
},
}

KeyspacesToWatch = []string{tablet.Keyspace}

err = hc.WaitForAllServingTablets(ctx, targets)
assert.Nil(t, err, "error should be nil. Keyspace with no tablets is filtered")

KeyspacesToWatch = []string{}
}

// TestGetHealthyTablets tests the functionality of GetHealthyTabletStats.
func TestGetHealthyTablets(t *testing.T) {
ts := memorytopo.NewServer("cell")
Expand Down
6 changes: 6 additions & 0 deletions go/vt/discovery/legacy_tablet_stats_cache_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"golang.org/x/net/context"

"vitess.io/vitess/go/vt/log"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
Expand Down Expand Up @@ -84,6 +85,11 @@ func (tc *LegacyTabletStatsCache) waitForTablets(ctx context.Context, targets []
timer := time.NewTimer(waitAvailableTabletInterval)
select {
case <-ctx.Done():
for _, target := range targets {
if target != nil {
log.Infof("couldn't find tablets for target: %v", target)
}
}
timer.Stop()
return ctx.Err()
case <-timer.C:
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vtgate/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,8 @@ func (dg *DiscoveryGateway) WaitForTablets(ctx context.Context, tabletTypesToWai
return err
}

return dg.tsc.WaitForAllServingTablets(ctx, targets)
filteredTargets := discovery.FilterTargetsByKeyspaces(discovery.KeyspacesToWatch, targets)
return dg.tsc.WaitForAllServingTablets(ctx, filteredTargets)
}

// Close shuts down underlying connections.
Expand Down
78 changes: 78 additions & 0 deletions go/vt/vtgate/discoverygateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package vtgate
import (
"fmt"
"testing"
"time"

"vitess.io/vitess/go/vt/log"

Expand All @@ -32,6 +33,7 @@ import (
"vitess.io/vitess/go/vt/topotools"

querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)
Expand Down Expand Up @@ -132,6 +134,82 @@ func TestDiscoveryGatewayGetTablets(t *testing.T) {
}
}

func TestDiscoveryGatewayWaitForTablets(t *testing.T) {
keyspace := "ks"
shard := "0"
cell := "local"
hc := discovery.NewFakeLegacyHealthCheck()
ts := memorytopo.NewServer("local")
srvTopo := srvtopotest.NewPassthroughSrvTopoServer()
srvTopo.TopoServer = ts
srvTopo.SrvKeyspaceNames = []string{keyspace}
srvTopo.SrvKeyspace = &topodatapb.SrvKeyspace{
Partitions: []*topodatapb.SrvKeyspace_KeyspacePartition{
{
ServedType: topodata.TabletType_MASTER,
ShardReferences: []*topodatapb.ShardReference{
{
Name: shard,
},
},
},
{
ServedType: topodata.TabletType_REPLICA,
ShardReferences: []*topodatapb.ShardReference{
{
Name: shard,
},
},
},
{
ServedType: topodata.TabletType_RDONLY,
ShardReferences: []*topodatapb.ShardReference{
{
Name: shard,
},
},
},
},
}

dg := NewDiscoveryGateway(context.Background(), hc, srvTopo, "local", 2)

// replica should only use local ones
hc.Reset()
dg.tsc.ResetForTesting()
hc.AddTestTablet(cell, "2.2.2.2", 1001, keyspace, shard, topodatapb.TabletType_REPLICA, true, 10, nil)
hc.AddTestTablet(cell, "1.1.1.1", 1001, keyspace, shard, topodatapb.TabletType_MASTER, true, 5, nil)
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
err := dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_REPLICA, topodatapb.TabletType_MASTER})
if err != nil {
t.Errorf("want %+v, got %+v", nil, err)
}

// fails if there are no available tablets for the desired TabletType
err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_RDONLY})
if err == nil {
t.Errorf("expected error, got nil")
}

// errors because there is no primary on ks2
ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
srvTopo.SrvKeyspaceNames = []string{keyspace, "ks2"}
err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER})
if err == nil {
t.Errorf("expected error, got nil")
}

discovery.KeyspacesToWatch = []string{keyspace}
// does not wait for ks2 if it's not part of the filter
ctx, _ = context.WithTimeout(context.Background(), 1*time.Second)
err = dg.WaitForTablets(ctx, []topodatapb.TabletType{topodatapb.TabletType_MASTER})
if err != nil {
t.Errorf("want %+v, got %+v", nil, err)
}

discovery.KeyspacesToWatch = []string{}
}

func TestShuffleTablets(t *testing.T) {
ts1 := discovery.LegacyTabletStats{
Key: "t1",
Expand Down
8 changes: 8 additions & 0 deletions go/vt/vttablet/tabletconn/tablet_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package tabletconn

import (
"flag"
"sync"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
Expand Down Expand Up @@ -50,9 +51,14 @@ type TabletDialer func(tablet *topodatapb.Tablet, failFast grpcclient.FailFast)

var dialers = make(map[string]TabletDialer)

// mu This mutex helps us prevent data races when registering / getting dialers
var mu sync.Mutex

// RegisterDialer is meant to be used by TabletDialer implementations
// to self register.
func RegisterDialer(name string, dialer TabletDialer) {
mu.Lock()
defer mu.Unlock()
if _, ok := dialers[name]; ok {
log.Fatalf("Dialer %s already exists", name)
}
Expand All @@ -61,6 +67,8 @@ func RegisterDialer(name string, dialer TabletDialer) {

// GetDialer returns the dialer to use, described by the command line flag
func GetDialer() TabletDialer {
mu.Lock()
defer mu.Unlock()
td, ok := dialers[*TabletProtocol]
if !ok {
log.Exitf("No dialer registered for tablet protocol %s", *TabletProtocol)
Expand Down