Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,11 +313,16 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
// It does not block on making connection.
// name is an optional tag for the tablet, e.g. an alternative address.
func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
log.Infof("Calling AddTablet for tablet: %v", tablet)
// check whether we should really add this tablet
if !hc.isIncluded(tablet) {
return
}
// check whether grpc port is present on tablet, if not return
if tablet.PortMap["grpc"] == 0 {
return
}

log.Infof("Adding tablet to healthcheck: %v", tablet)
hc.mu.Lock()
defer hc.mu.Unlock()
if hc.healthByAlias == nil {
Expand Down Expand Up @@ -371,11 +376,12 @@ func (hc *HealthCheckImpl) RemoveTablet(tablet *topodata.Tablet) {

// ReplaceTablet removes the old tablet and adds the new tablet.
func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) {
hc.deleteTablet(old)
hc.RemoveTablet(old)
hc.AddTablet(new)
}

func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
log.Infof("Removing tablet from healthcheck: %v", tablet)
hc.mu.Lock()
defer hc.mu.Unlock()

Expand Down
78 changes: 42 additions & 36 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ func TestHealthCheck(t *testing.T) {
hc := createTestHc(ts)
// close healthcheck
defer hc.Close()
tablet := topo.NewTablet(0, "cell", "a")
tablet.Keyspace = "k"
tablet.Shard = "s"
tablet.PortMap["vt"] = 1
tablet := createTestTablet(0, "cell", "a")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
conn := createFakeConn(tablet, input)
Expand Down Expand Up @@ -203,8 +200,7 @@ func TestHealthCheckStreamError(t *testing.T) {
hc := createTestHc(ts)
defer hc.Close()

tablet := topo.NewTablet(0, "cell", "a")
tablet.PortMap["vt"] = 1
tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse)
resultChan := hc.Subscribe()
fc := createFakeConn(tablet, input)
Expand All @@ -214,7 +210,7 @@ func TestHealthCheckStreamError(t *testing.T) {
// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{},
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
MasterTermStartTime: 0,
}
Expand Down Expand Up @@ -260,8 +256,7 @@ func TestHealthCheckVerifiesTabletAlias(t *testing.T) {
hc := createTestHc(ts)
defer hc.Close()

tablet := topo.NewTablet(1, "cell", "a")
tablet.PortMap["vt"] = 1
tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse, 1)
fc := createFakeConn(tablet, input)
resultChan := hc.Subscribe()
Expand All @@ -271,7 +266,7 @@ func TestHealthCheckVerifiesTabletAlias(t *testing.T) {
// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{},
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
MasterTermStartTime: 0,
}
Expand Down Expand Up @@ -302,8 +297,7 @@ func TestHealthCheckVerifiesTabletAlias(t *testing.T) {
func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
tablet := topo.NewTablet(0, "cell", "a")
tablet.PortMap["vt"] = 1
tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse, 1)
createFakeConn(tablet, input)
resultChan := hc.Subscribe()
Expand All @@ -313,7 +307,7 @@ func TestHealthCheckCloseWaitsForGoRoutines(t *testing.T) {
// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{},
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
MasterTermStartTime: 0,
}
Expand Down Expand Up @@ -364,17 +358,15 @@ func TestHealthCheckTimeout(t *testing.T) {
hc := createTestHc(ts)
hc.healthCheckTimeout = 500 * time.Millisecond
defer hc.Close()

tablet := topo.NewTablet(0, "cell", "a")
tablet.PortMap["vt"] = 1
tablet := createTestTablet(0, "cell", "a")
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
resultChan := hc.Subscribe()
hc.AddTablet(tablet)
// Immediately after AddTablet() there will be the first notification.
want := &TabletHealth{
Tablet: tablet,
Target: &querypb.Target{},
Target: &querypb.Target{Keyspace: "k", Shard: "s"},
Serving: false,
MasterTermStartTime: 0,
}
Expand Down Expand Up @@ -432,10 +424,7 @@ func TestGetHealthyTablets(t *testing.T) {
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
defer hc.Close()
tablet := topo.NewTablet(0, "cell", "a")
tablet.Keyspace = "k"
tablet.Shard = "s"
tablet.PortMap["vt"] = 1
tablet := createTestTablet(0, "cell", "a")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
createFakeConn(tablet, input)
Expand Down Expand Up @@ -507,16 +496,11 @@ func TestGetHealthyTablets(t *testing.T) {
mustMatch(t, want, a, "unexpected result")

// add a second tablet
tablet2 := topo.NewTablet(11, "cell", "host2")
tablet2.Keyspace = "k"
tablet2.Shard = "s"
tablet2.PortMap["vt"] = 2
tablet2 := createTestTablet(11, "cell", "host2")
tablet2.Type = topodatapb.TabletType_REPLICA
input2 := make(chan *querypb.StreamHealthResponse)
createFakeConn(tablet2, input2)
t.Logf(`createFakeConn({Host: "a", PortMap: {"vt": 2}}, c)`)
hc.AddTablet(tablet2)
t.Logf(`hc = HealthCheck(); hc.AddTablet({Host: "a", PortMap: {"vt": 1}}, "")`)
// there will be a first result, get and discard it
<-resultChan

Expand Down Expand Up @@ -637,10 +621,7 @@ func TestAliases(t *testing.T) {
defer ts.DeleteCellsAlias(context.Background(), "region2")

// add a tablet as replica in diff cell, same region
tablet := topo.NewTablet(1, "cell1", "host3")
tablet.Keyspace = "k"
tablet.Shard = "s"
tablet.PortMap["vt"] = 1
tablet := createTestTablet(1, "cell1", "host3")
tablet.Type = topodatapb.TabletType_REPLICA
input := make(chan *querypb.StreamHealthResponse)
fc := createFakeConn(tablet, input)
Expand Down Expand Up @@ -688,10 +669,7 @@ func TestAliases(t *testing.T) {
mustMatch(t, want, a, "Wrong TabletHealth data")

// add another tablet in a diff cell, diff region
tablet2 := topo.NewTablet(2, "cell2", "host4")
tablet2.Keyspace = "k"
tablet2.Shard = "s"
tablet2.PortMap["vt"] = 2
tablet2 := createTestTablet(2, "cell2", "host4")
tablet2.Type = topodatapb.TabletType_REPLICA
input2 := make(chan *querypb.StreamHealthResponse)
fc = createFakeConn(tablet2, input2)
Expand All @@ -711,6 +689,26 @@ func TestAliases(t *testing.T) {
mustMatch(t, want, a, "Wrong TabletHealth data")
}

func TestHealthCheckChecksGrpcPort(t *testing.T) {
ts := memorytopo.NewServer("cell")
hc := createTestHc(ts)
defer hc.Close()

tablet := createTestTablet(0, "cell", "a")
tablet.PortMap["grpc"] = 0
resultChan := hc.Subscribe()

// AddTablet should not add the tablet because port is 0
hc.AddTablet(tablet)

select {
case result := <-resultChan:
assert.Nil(t, result, "healthCheck received result: %v", result)
case <-time.After(2 * time.Millisecond):
// No response after timeout. Success.
}
}

func TestTemplate(t *testing.T) {
tablet := topo.NewTablet(0, "cell", "a")
ts := []*TabletHealth{
Expand All @@ -736,7 +734,6 @@ func TestTemplate(t *testing.T) {
}

func TestDebugURLFormatting(t *testing.T) {

//log error
if err2 := flag.Set("tablet_url_template", "https://{{.GetHostNameLevel 0}}.bastion.{{.Tablet.Alias.Cell}}.corp"); err2 != nil {
log.Errorf("flag.Set(\"tablet_url_template\", \"https://{{.GetHostNameLevel 0}}.bastion.{{.Tablet.Alias.Cell}}.corp\") failed : %v", err2)
Expand Down Expand Up @@ -873,6 +870,15 @@ func createFixedHealthConn(tablet *topodatapb.Tablet, fixedResult *querypb.Strea
return conn
}

func createTestTablet(uid uint32, cell, host string) *topodatapb.Tablet {
tablet := topo.NewTablet(uid, cell, host)
tablet.PortMap["vt"] = 1
tablet.PortMap["grpc"] = 2
tablet.Keyspace = "k"
tablet.Shard = "s"
return tablet
}

var mustMatch = utils.MustMatchFn(
[]interface{}{ // types with unexported fields
TabletHealth{},
Expand Down
3 changes: 2 additions & 1 deletion go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,9 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
if !thc.loggedServingState || (serving != thc.Serving) {
// Emit the log from a separate goroutine to avoid holding
// the th lock while logging is happening
go log.Infof("HealthCheckUpdate(Serving State): tablet: %v serving => %v for %v/%v (%v) reason: %s",
log.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s",
topotools.TabletIdent(thc.Tablet),
thc.Serving,
serving,
thc.Tablet.GetKeyspace(),
thc.Tablet.GetShard(),
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vtgate/vschema_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (vm *VSchemaManager) watchSrvVSchema(ctx context.Context, cell string) {
// we don't know the real value. In this case, we want
// to use the previous value if it was set, or an
// empty vschema if it wasn't.
log.Infof("Received vschema update")
switch {
case err == nil:
// Good case, we can try to save that value.
Expand All @@ -79,6 +80,7 @@ func (vm *VSchemaManager) watchSrvVSchema(ctx context.Context, cell string) {
// Otherwise, keep what we already had before.
v = nil
default:
log.Errorf("SrvVschema watch error: %v", err)
// Watch error, increment our counters.
if vschemaCounters != nil {
vschemaCounters.Add("WatchError", 1)
Expand Down