Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 28 additions & 8 deletions go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ limitations under the License.
package discovery

import (
"context"
"sort"
"sync"

"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/queryservice"
Expand All @@ -30,7 +34,7 @@ import (
)

// This file contains the definitions for a FakeHealthCheck class to
// simulate a LegacyHealthCheck module. Note it is not in a sub-package because
// simulate a HealthCheck module. Note it is not in a sub-package because
// otherwise it couldn't be used in this package's tests because of
// circular dependencies.

Expand All @@ -41,7 +45,7 @@ func NewFakeHealthCheck() *FakeHealthCheck {
}
}

// FakeHealthCheck implements discovery.LegacyHealthCheck.
// FakeHealthCheck implements discovery.HealthCheck.
type FakeHealthCheck struct {
// mu protects the items map
mu sync.RWMutex
Expand All @@ -54,15 +58,30 @@ type fhcItem struct {
}

//
// discovery.LegacyHealthCheck interface methods
// discovery.HealthCheck interface methods
//

// RegisterStats is not implemented.
func (fhc *FakeHealthCheck) RegisterStats() {
}

// WaitForInitialStatsUpdates is not implemented.
func (fhc *FakeHealthCheck) WaitForInitialStatsUpdates() {
// WaitForAllServingTablets is not implemented.
func (fhc *FakeHealthCheck) WaitForAllServingTablets(ctx context.Context, targets []*querypb.Target) error {
return nil
}

// GetHealthyTabletStats is not implemented.
func (fhc *FakeHealthCheck) GetHealthyTabletStats(target *querypb.Target) []*TabletHealth {
return nil
}

// Subscribe is not implemented.
func (fhc *FakeHealthCheck) Subscribe() chan *TabletHealth {
return nil
}

// Unsubscribe is not implemented.
func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
}

// AddTablet adds the tablet.
Expand Down Expand Up @@ -112,13 +131,14 @@ func (fhc *FakeHealthCheck) ReplaceTablet(old, new *topodatapb.Tablet) {
}

// TabletConnection returns the TabletConn of the given tablet.
func (fhc *FakeHealthCheck) TabletConnection(key string) queryservice.QueryService {
func (fhc *FakeHealthCheck) TabletConnection(alias *topodatapb.TabletAlias) (queryservice.QueryService, error) {
key := topoproto.TabletAliasString(alias)
fhc.mu.RLock()
defer fhc.mu.RUnlock()
if item := fhc.items[key]; item != nil {
return item.conn
return item.conn, nil
}
return nil
return nil, vterrors.New(vtrpc.Code_NOT_FOUND, "tablet not found")
}

// CacheStatus returns the status for each tablet
Expand Down
45 changes: 40 additions & 5 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,6 @@ var (

//TODO(deepthi): change these vars back to unexported when discoveryGateway is removed

// CellsToWatch is the list of cells the healthcheck operates over. If it is empty, only the local cell is watched
CellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets")
// AllowedTabletTypes is the list of allowed tablet types. e.g. {MASTER, REPLICA}
AllowedTabletTypes []topodata.TabletType
// TabletFilters are the keyspace|shard or keyrange filters to apply to the full set of tablets
Expand Down Expand Up @@ -164,6 +162,43 @@ type TabletRecorder interface {
type keyspaceShardTabletType string
type tabletAliasString string

//HealthCheck declares what the TabletGateway needs from the HealthCheck
type HealthCheck interface {
// CacheStatus returns a displayable version of the health check cache.
CacheStatus() TabletsCacheStatusList

// Close stops the healthcheck.
Close() error

// WaitForAllServingTablets waits for at least one healthy serving tablet in
// each given target before returning.
// It will return ctx.Err() if the context is canceled.
// It will return an error if it can't read the necessary topology records.
WaitForAllServingTablets(ctx context.Context, targets []*query.Target) error

// TabletConnection returns the TabletConn of the given tablet.
TabletConnection(alias *topodata.TabletAlias) (queryservice.QueryService, error)

// RegisterStats registers the connection counts stats
RegisterStats()

// GetHealthyTabletStats returns only the healthy tablets.
// The returned array is owned by the caller.
// For TabletType_MASTER, this will only return at most one entry,
// the most recent tablet of type master.
// This returns a copy of the data so that callers can access without
// synchronization
GetHealthyTabletStats(target *query.Target) []*TabletHealth

// Subscribe adds a listener. Used by vtgate buffer to learn about master changes.
Subscribe() chan *TabletHealth

// Unsubscribe removes a listener.
Unsubscribe(c chan *TabletHealth)
}

var _ HealthCheck = (*HealthCheckImpl)(nil)

// HealthCheckImpl performs health checking and stores the results.
// The goal of this object is to maintain a StreamHealth RPC
// to a lot of tablets. Tablets are added / removed by calling the
Expand Down Expand Up @@ -220,8 +255,8 @@ type HealthCheckImpl struct {
// The localCell for this healthcheck
// callback.
// A function to call when there is a master change. Used to notify vtgate's buffer to stop buffering.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell string) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", *CellsToWatch)
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

hc := &HealthCheckImpl{
ts: topoServer,
Expand All @@ -236,7 +271,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
}
var topoWatchers []*TopologyWatcher
var filter TabletFilter
cells := strings.Split(*CellsToWatch, ",")
cells := strings.Split(cellsToWatch, ",")
if len(cells) == 0 {
cells = append(cells, localCell)
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ func tabletDialer(tablet *topodatapb.Tablet, _ grpcclient.FailFast) (queryservic
}

func createTestHc(ts *topo.Server) *HealthCheckImpl {
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell")
return NewHealthCheck(context.Background(), 1*time.Millisecond, time.Hour, ts, "cell", "")
}

type fakeConn struct {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func getItemPath(url string) string {
return parts[1]
}

func initAPI(hc HealthCheck) {
func initAPI(hc discovery.HealthCheck) {
// Healthcheck real time status per (cell, keyspace, tablet type, metric).
handleCollection("health-check", func(r *http.Request) (interface{}, error) {
cacheStatus := hc.CacheStatus()
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/discoverygateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ func NewDiscoveryGateway(ctx context.Context, hc discovery.LegacyHealthCheck, se
// We set sendDownEvents=true because it's required by LegacyTabletStatsCache.
hc.SetListener(dg, true /* sendDownEvents */)

cells := *discovery.CellsToWatch
cells := *CellsToWatch
log.Infof("loading tablets for cells: %v", cells)
for _, c := range strings.Split(cells, ",") {
if c == "" {
Expand Down
Loading