diff --git a/go/stats/counters.go b/go/stats/counters.go index bc22dd7a2ca..07e7880d581 100644 --- a/go/stats/counters.go +++ b/go/stats/counters.go @@ -88,13 +88,22 @@ func (c *counters) Add(name string, value int64) { atomic.AddInt64(a, value) } -// ResetAll resets all counter values. +// ResetAll resets all counter values and clears all keys. func (c *counters) ResetAll() { c.mu.Lock() defer c.mu.Unlock() c.counts = make(map[string]*int64) } +// ZeroAll resets all counter values to zero but keeps all keys. +func (c *counters) ZeroAll() { + c.mu.Lock() + defer c.mu.Unlock() + for _, a := range c.counts { + atomic.StoreInt64(a, int64(0)) + } +} + // Reset resets a specific counter value to 0. func (c *counters) Reset(name string) { a := c.getValueAddr(name) diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go index 020251d4d25..67ca271d5ed 100644 --- a/go/vt/discovery/topology_watcher.go +++ b/go/vt/discovery/topology_watcher.go @@ -65,8 +65,8 @@ type TabletRecorder interface { // NewCellTabletsWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. 
-func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { +func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher { + return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell) }) } @@ -74,7 +74,7 @@ func NewCellTabletsWatcher(topoServer *topo.Server, tr TabletRecorder, cell stri // NewShardReplicationWatcher returns a TopologyWatcher that // monitors the tablets in a cell/keyspace/shard, and starts refreshing. func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) *TopologyWatcher { - return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { + return NewTopologyWatcher(topoServer, tr, cell, refreshInterval, true /* refreshKnownTablets */, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) { sri, err := tw.topoServer.GetShardReplication(tw.ctx, tw.cell, keyspace, shard) switch err { case nil: @@ -97,6 +97,7 @@ func NewShardReplicationWatcher(topoServer *topo.Server, tr TabletRecorder, cell // tabletInfo is used internally by the TopologyWatcher class type tabletInfo struct { alias string + key string tablet *topodatapb.Tablet } @@ -105,14 +106,15 @@ type tabletInfo struct { // the TabletRecorder AddTablet / RemoveTablet interface appropriately. 
type TopologyWatcher struct { // set at construction time - topoServer *topo.Server - tr TabletRecorder - cell string - refreshInterval time.Duration - getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) - sem chan int - ctx context.Context - cancelFunc context.CancelFunc + topoServer *topo.Server + tr TabletRecorder + cell string + refreshInterval time.Duration + refreshKnownTablets bool + getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error) + sem chan int + ctx context.Context + cancelFunc context.CancelFunc // wg keeps track of all launched Go routines. wg sync.WaitGroup @@ -127,15 +129,16 @@ type TopologyWatcher struct { // NewTopologyWatcher returns a TopologyWatcher that monitors all // the tablets in a cell, and starts refreshing. -func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher { +func NewTopologyWatcher(topoServer *topo.Server, tr TabletRecorder, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodatapb.TabletAlias, error)) *TopologyWatcher { tw := &TopologyWatcher{ - topoServer: topoServer, - tr: tr, - cell: cell, - refreshInterval: refreshInterval, - getTablets: getTablets, - sem: make(chan int, topoReadConcurrency), - tablets: make(map[string]*tabletInfo), + topoServer: topoServer, + tr: tr, + cell: cell, + refreshInterval: refreshInterval, + refreshKnownTablets: refreshKnownTablets, + getTablets: getTablets, + sem: make(chan int, topoReadConcurrency), + tablets: make(map[string]*tabletInfo), } tw.firstLoadChan = make(chan struct{}) tw.ctx, tw.cancelFunc = context.WithCancel(context.Background()) @@ -163,7 +166,9 @@ func (tw *TopologyWatcher) watch() { func (tw *TopologyWatcher) loadTablets() { var wg sync.WaitGroup newTablets := 
make(map[string]*tabletInfo) - tabletAlias, err := tw.getTablets(tw) + replacedTablets := make(map[string]*tabletInfo) + + tabletAliases, err := tw.getTablets(tw) topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1) if err != nil { topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1) @@ -175,7 +180,17 @@ func (tw *TopologyWatcher) loadTablets() { log.Errorf("cannot get tablets for cell: %v: %v", tw.cell, err) return } - for _, tAlias := range tabletAlias { + + tw.mu.Lock() + for _, tAlias := range tabletAliases { + if !tw.refreshKnownTablets { + aliasStr := topoproto.TabletAliasString(tAlias) + if val, ok := tw.tablets[aliasStr]; ok { + newTablets[aliasStr] = val + continue + } + } + wg.Add(1) go func(alias *topodatapb.TabletAlias) { defer wg.Done() @@ -193,32 +208,55 @@ func (tw *TopologyWatcher) loadTablets() { log.Errorf("cannot get tablet for alias %v: %v", alias, err) return } - key := TabletToMapKey(tablet.Tablet) tw.mu.Lock() - newTablets[key] = &tabletInfo{ - alias: topoproto.TabletAliasString(alias), + aliasStr := topoproto.TabletAliasString(alias) + newTablets[aliasStr] = &tabletInfo{ + alias: aliasStr, + key: TabletToMapKey(tablet.Tablet), tablet: tablet.Tablet, } tw.mu.Unlock() }(tAlias) } + tw.mu.Unlock() wg.Wait() tw.mu.Lock() - for key, tep := range newTablets { - if val, ok := tw.tablets[key]; !ok { - tw.tr.AddTablet(tep.tablet, tep.alias) - topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) - } else if val.alias != tep.alias { - tw.tr.ReplaceTablet(val.tablet, tep.tablet, tep.alias) + for alias, newVal := range newTablets { + if val, ok := tw.tablets[alias]; !ok { + // Check if there's a tablet with the same address key but a + // different alias. If so, replace it and keep track of the + // replaced alias to make sure it isn't removed later. 
+ found := false + for _, otherVal := range tw.tablets { + if newVal.key == otherVal.key { + found = true + tw.tr.ReplaceTablet(otherVal.tablet, newVal.tablet, alias) + topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) + replacedTablets[otherVal.alias] = newVal + } + } + if !found { + tw.tr.AddTablet(newVal.tablet, alias) + topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1) + } + + } else if val.key != newVal.key { + // Handle the case where the same tablet alias is now reporting + // a different address key. + replacedTablets[alias] = newVal + tw.tr.ReplaceTablet(val.tablet, newVal.tablet, alias) topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1) } } - for key, tep := range tw.tablets { - if _, ok := newTablets[key]; !ok { - tw.tr.RemoveTablet(tep.tablet) - topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1) + + for _, val := range tw.tablets { + if _, ok := newTablets[val.alias]; !ok { + if _, ok2 := replacedTablets[val.alias]; !ok2 { + tw.tr.RemoveTablet(val.tablet) + topologyWatcherOperations.Add(topologyWatcherOpRemoveTablet, 1) + } } } tw.tablets = newTablets diff --git a/go/vt/discovery/topology_watcher_test.go b/go/vt/discovery/topology_watcher_test.go index 0dce5659f3f..56ae63d7279 100644 --- a/go/vt/discovery/topology_watcher_test.go +++ b/go/vt/discovery/topology_watcher_test.go @@ -22,24 +22,53 @@ import ( "github.com/golang/protobuf/proto" "golang.org/x/net/context" + "vitess.io/vitess/go/vt/logutil" topodatapb "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" ) +func checkOpCounts(t *testing.T, tw *TopologyWatcher, prevCounts, deltas map[string]int64) map[string]int64 { + t.Helper() + newCounts := topologyWatcherOperations.Counts() + for key, prevVal := range prevCounts { + delta, ok := deltas[key] + if !ok { + delta = 0 + } + newVal, ok := newCounts[key] + if !ok { + newVal = 0 + } + + if newVal != prevVal+delta { + t.Errorf("expected %v 
to increase by %v, got %v -> %v", key, delta, prevVal, newVal) + } + } + return newCounts +} + func TestCellTabletsWatcher(t *testing.T) { - checkWatcher(t, true) + checkWatcher(t, true, true) +} + +func TestCellTabletsWatcherNoRefreshKnown(t *testing.T) { + checkWatcher(t, true, false) } func TestShardReplicationWatcher(t *testing.T) { - checkWatcher(t, false) + checkWatcher(t, false, true) } -func checkWatcher(t *testing.T, cellTablets bool) { +func checkWatcher(t *testing.T, cellTablets, refreshKnownTablets bool) { ts := memorytopo.NewServer("aa") fhc := NewFakeHealthCheck() + logger := logutil.NewMemoryLogger() + topologyWatcherOperations.ZeroAll() + counts := topologyWatcherOperations.Counts() var tw *TopologyWatcher if cellTablets { - tw = NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, 5) + tw = NewCellTabletsWatcher(ts, fhc, "aa", 10*time.Minute, refreshKnownTablets, 5) } else { tw = NewShardReplicationWatcher(ts, fhc, "aa", "keyspace", "shard", 10*time.Minute, 5) } @@ -50,6 +79,7 @@ func checkWatcher(t *testing.T, cellTablets bool) { if err := tw.WaitForInitialTopology(); err != nil { t.Fatalf("initial WaitForInitialTopology failed") } + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1}) // Add a tablet to the topology. tablet := &topodatapb.Tablet{ @@ -68,6 +98,7 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Fatalf("CreateTablet failed: %v", err) } tw.loadTablets() + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) // Check the tablet is returned by GetAllTablets(). allTablets := fhc.GetAllTablets() @@ -76,8 +107,56 @@ func checkWatcher(t *testing.T, cellTablets bool) { t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) } + // Add a second tablet to the topology. 
+ tablet2 := &topodatapb.Tablet{ + Alias: &topodatapb.TabletAlias{ + Cell: "aa", + Uid: 2, + }, + Hostname: "host2", + PortMap: map[string]int32{ + "vt": 789, + }, + Keyspace: "keyspace", + Shard: "shard", + } + if err := ts.CreateTablet(context.Background(), tablet2); err != nil { + t.Fatalf("CreateTablet failed: %v", err) + } + tw.loadTablets() + + // If refreshKnownTablets is disabled, only the new tablet is read + // from the topo + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "AddTablet": 1}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "AddTablet": 1}) + } + + // Check the new tablet is returned by GetAllTablets(). + allTablets = fhc.GetAllTablets() + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) + } + + // Load the tablets again to show that when refreshKnownTablets is disabled, + // only the list is read from the topo + tw.loadTablets() + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1}) + } + // same tablet, different port, should update (previous - // one should go away, new one be added). 
+ // one should go away, new one be added) + // + // if refreshKnownTablets is disabled, this case is *not* + // detected and the tablet remains in the health check under + // its old key + origTablet := proto.Clone(tablet).(*topodatapb.Tablet) + origKey := TabletToMapKey(tablet) tablet.PortMap["vt"] = 456 if _, err := ts.UpdateTabletFields(context.Background(), tablet.Alias, func(t *topodatapb.Tablet) error { t.PortMap["vt"] = 456 @@ -88,25 +167,99 @@ func checkWatcher(t *testing.T, cellTablets bool) { tw.loadTablets() allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) + + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + } + if _, ok := allTablets[origKey]; ok { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, origKey) + } + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1}) + + if _, ok := allTablets[origKey]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[origKey], origTablet) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, origTablet) + } + if _, ok := allTablets[key]; ok { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) + } } - // Remove and re-add with a new uid. This should trigger a ReplaceTablet in loadTablets, - // because the uid does not match. - if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { + // Remove the second tablet and re-add with a new uid. This should + // trigger a ReplaceTablet in loadTablets because the uid does not + // match. 
+ // + // This case *is* detected even if refreshKnownTablets is false + // because the delete tablet / create tablet sequence causes the + // list of tablets to change and therefore the change is detected. + if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil { t.Fatalf("DeleteTablet failed: %v", err) } - tablet.Alias.Uid = 1 - if err := ts.CreateTablet(context.Background(), tablet); err != nil { + tablet2.Alias.Uid = 3 + if err := ts.CreateTablet(context.Background(), tablet2); err != nil { t.Fatalf("CreateTablet failed: %v", err) } + if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { + t.Fatalf("FixShardReplication failed: %v", err) + } tw.loadTablets() + allTablets = fhc.GetAllTablets() + + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 2, "ReplaceTablet": 1}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "ReplaceTablet": 1}) + } + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; !ok || len(allTablets) != 2 || !proto.Equal(allTablets[key], tablet2) { + t.Errorf("fhc.GetAllTablets() = %+v; want %v => %+v", allTablets, key, tablet2) + } + + // Remove the tablet and check that it is detected as being gone. 
+ if err := ts.DeleteTablet(context.Background(), tablet.Alias); err != nil { + t.Fatalf("DeleteTablet failed: %v", err) + } + if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { + t.Fatalf("FixShardReplication failed: %v", err) + } + tw.loadTablets() + if refreshKnownTablets { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 1, "RemoveTablet": 1}) + } else { + counts = checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "RemoveTablet": 1}) + } allTablets = fhc.GetAllTablets() key = TabletToMapKey(tablet) - if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet) { - t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet) + if _, ok := allTablets[key]; ok || len(allTablets) != 1 { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) + } + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; !ok || len(allTablets) != 1 || !proto.Equal(allTablets[key], tablet2) { + t.Errorf("fhc.GetAllTablets() = %+v; want %+v", allTablets, tablet2) + } + + // Remove the other and check that it is detected as being gone. 
+ if err := ts.DeleteTablet(context.Background(), tablet2.Alias); err != nil { + t.Fatalf("DeleteTablet failed: %v", err) + } + if err := topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard"); err != nil { + t.Fatalf("FixShardReplication failed: %v", err) + } + tw.loadTablets() + checkOpCounts(t, tw, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1}) + + allTablets = fhc.GetAllTablets() + key = TabletToMapKey(tablet) + if _, ok := allTablets[key]; ok || len(allTablets) != 0 { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) + } + key = TabletToMapKey(tablet2) + if _, ok := allTablets[key]; ok || len(allTablets) != 0 { + t.Errorf("fhc.GetAllTablets() = %+v; don't want %v", allTablets, key) } tw.Stop() diff --git a/go/vt/vtctld/realtime_status.go b/go/vt/vtctld/realtime_status.go index bfbdfb824ae..bff54db1fea 100644 --- a/go/vt/vtctld/realtime_status.go +++ b/go/vt/vtctld/realtime_status.go @@ -51,7 +51,7 @@ func newRealtimeStats(ts *topo.Server) (*realtimeStats, error) { } var watchers []*discovery.TopologyWatcher for _, cell := range cells { - watcher := discovery.NewCellTabletsWatcher(ts, hc, cell, *vtctl.HealthCheckTopologyRefresh, discovery.DefaultTopoReadConcurrency) + watcher := discovery.NewCellTabletsWatcher(ts, hc, cell, *vtctl.HealthCheckTopologyRefresh, true /* refreshKnownTablets */, discovery.DefaultTopoReadConcurrency) watchers = append(watchers, watcher) } r.cellWatchers = watchers diff --git a/go/vt/vtgate/gateway/discoverygateway.go b/go/vt/vtgate/gateway/discoverygateway.go index d870dccc459..40a8c15d15c 100644 --- a/go/vt/vtgate/gateway/discoverygateway.go +++ b/go/vt/vtgate/gateway/discoverygateway.go @@ -46,6 +46,7 @@ var ( cellsToWatch = flag.String("cells_to_watch", "", "comma-separated list of cells for watching tablets") tabletFilters flagutil.StringListValue refreshInterval = flag.Duration("tablet_refresh_interval", 1*time.Minute, "tablet refresh interval") 
+ refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes") topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads") allowedTabletTypes []topodatapb.TabletType ) @@ -116,7 +117,7 @@ func createDiscoveryGateway(hc discovery.HealthCheck, serv srvtopo.Server, cell tr = fbs } - ctw := discovery.NewCellTabletsWatcher(topoServer, tr, c, *refreshInterval, *topoReadConcurrency) + ctw := discovery.NewCellTabletsWatcher(topoServer, tr, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency) dg.tabletsWatchers = append(dg.tabletsWatchers, ctw) } dg.QueryService = queryservice.Wrap(nil, dg.withRetry)