Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion go/stats/counters.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,22 @@ func (c *counters) Add(name string, value int64) {
atomic.AddInt64(a, value)
}

// ResetAll resets all counter values.
// ResetAll drops every counter: the backing map is swapped for a fresh, empty
// one, so both the values and their keys are gone afterwards.
func (c *counters) ResetAll() {
	c.mu.Lock()
	defer c.mu.Unlock()

	c.counts = map[string]*int64{}
}

// ZeroAll sets every existing counter back to zero. The map itself is left
// untouched, so all keys stay present.
func (c *counters) ZeroAll() {
	c.mu.Lock()
	defer c.mu.Unlock()

	for _, valueAddr := range c.counts {
		// The value is written atomically; the lock held here only covers the map.
		atomic.StoreInt64(valueAddr, 0)
	}
}

// Reset resets a specific counter value to 0.
func (c *counters) Reset(name string) {
a := c.getValueAddr(name)
Expand Down
106 changes: 72 additions & 34 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,16 +65,16 @@ type TabletRecorder interface {

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
	// listCellTablets asks the watcher's topo server for the tablet aliases
	// of the watcher's cell.
	listCellTablets := func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
		return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell)
	}

	// All arguments are forwarded unchanged; only the listing strategy is
	// specific to this constructor.
	return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, listCellTablets)
}

// NewShardReplicationWatcher returns a TopologyWatcher that
// monitors the tablets in a cell/keyspace/shard, and starts refreshing.
func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, true /* refreshKnownTablets */, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) {
sri, err := tw.topoServer.GetShardReplication(tw.ctx, tw.cell, keyspace, shard)
switch err {
case nil:
Expand All @@ -97,6 +97,7 @@ func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell
// tabletInfo is used internally by the TopologyWatcher class.
// It is the per-tablet entry stored in the watcher's tablets map.
type tabletInfo struct {
// alias is the tablet alias rendered with topoproto.TabletAliasString;
// loadTablets uses this string as the key of its tablet maps.
alias string
// key is the TabletToMapKey value computed from the tablet record.
// loadTablets compares it between refreshes to detect a tablet that is
// reachable under a different address key, or a different alias that
// took over the same address key.
key string
// tablet is the tablet record handed to the TabletRecorder when the
// tablet is added, replaced or removed.
tablet *topodatapb.Tablet
}

Expand All @@ -105,14 +106,15 @@ type tabletInfo struct {
// the TabletRecorder AddTablet / RemoveTablet interface appropriately.
type TopologyWatcher struct {
// set at construction time
topoServer *topo.Server
tr TabletRecorder
cell string
refreshInterval time.Duration
getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)
sem chan int
ctx context.Context
cancelFunc context.CancelFunc
topoServer *topo.Server
tr TabletRecorder
cell string
refreshInterval time.Duration
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)
sem chan int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
wg sync.WaitGroup

Expand All @@ -127,15 +129,16 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher {
func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
tr: tr,
cell: cell,
refreshInterval: refreshInterval,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tablets: make(map[string]*tabletInfo),
topoServer: topoServer,
tr: tr,
cell: cell,
refreshInterval: refreshInterval,
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
tw.ctx, tw.cancelFunc = context.WithCancel(context.Background())
Expand Down Expand Up @@ -163,7 +166,9 @@ func (tw *TopologyWatcher) watch() {
func (tw *TopologyWatcher) loadTablets() {
var wg sync.WaitGroup
newTablets := make(map[string]*tabletInfo)
tabletAlias, err := tw.getTablets(tw)
replacedTablets := make(map[string]*tabletInfo)

tabletAliases, err := tw.getTablets(tw)
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
Expand All @@ -175,7 +180,17 @@ func (tw *TopologyWatcher) loadTablets() {
log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err)
return
}
for _, tAlias := range tabletAlias {

tw.mu.Lock()
for _, tAlias := range tabletAliases {
if !tw.refreshKnownTablets {
aliasStr := topoproto.TabletAliasString(tAlias)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you decided to now use the alias as the key instead. But healthcheck is still using TabletToMapKey. How do the two coordinate correctly?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The TopologyWatcher now uses the alias as the key for the internal tablets map and all the temporary data structures.

When it calls into healthcheck to add/remove/replace the tablet, it passes the full tablet record. At that point HC recomputes its own hash key from the address map. I think we could (and probably should) switch that to store tablets keyed by the alias as well, but it's not necessary as part of this change.

if val, ok := tw.tablets[aliasStr]; ok {
newTablets[aliasStr] = val
continue
}
}

wg.Add(1)
go func(alias *topodatapb.TabletAlias) {
defer wg.Done()
Expand All @@ -193,32 +208,55 @@ func (tw *TopologyWatcher) loadTablets() {
log.Errorf("cannot get tablet for alias %v: %v", alias, err)
return
}
key := TabletToMapKey(tablet.Tablet)
tw.mu.Lock()
newTablets[key] = &tabletInfo{
alias: topoproto.TabletAliasString(alias),
aliasStr := topoproto.TabletAliasString(alias)
newTablets[aliasStr] = &tabletInfo{
alias: aliasStr,
key: TabletToMapKey(tablet.Tablet),
tablet: tablet.Tablet,
}
tw.mu.Unlock()
}(tAlias)
}

tw.mu.Unlock()
wg.Wait()
tw.mu.Lock()
for key, tep := range newTablets {
if val, ok := tw.tablets[key]; !ok {
tw.tr.AddTablet(tep.tablet, tep.alias)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)

} else if val.alias != tep.alias {
tw.tr.ReplaceTablet(val.tablet, tep.tablet, tep.alias)
for alias, newVal := range newTablets {
if val, ok := tw.tablets[alias]; !ok {
// Check if there's a tablet with the same address key but a
// different alias. If so, replace it and keep track of the
// replaced alias to make sure it isn't removed later.
found := false
for _, otherVal := range tw.tablets {
if newVal.key == otherVal.key {
found = true
tw.tr.ReplaceTablet(otherVal.tablet, newVal.tablet, alias)
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
replacedTablets[otherVal.alias] = newVal
}
}
if !found {
tw.tr.AddTablet(newVal.tablet, alias)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
}

} else if val.key != newVal.key {
// Handle the case where the same tablet alias is now reporting
// a different address key.
replacedTablets[alias] = newVal
tw.tr.ReplaceTablet(val.tablet, newVal.tablet, alias)
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
}
for key, tep := range tw.tablets {
if _, ok := newTablets[key]; !ok {
tw.tr.RemoveTablet(tep.tablet)
topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1)

for _, val := range tw.tablets {
if _, ok := newTablets[val.alias]; !ok {
if _, ok2 := replacedTablets[val.alias]; !ok2 {
tw.tr.RemoveTablet(val.tablet)
topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1)
}
}
}
tw.tablets = newTablets
Expand Down
Loading