diff --git a/go/vt/srvtopo/watch.go b/go/vt/srvtopo/watch.go index 4a0ccda2d59..e81fa2f5b76 100644 --- a/go/vt/srvtopo/watch.go +++ b/go/vt/srvtopo/watch.go @@ -161,12 +161,18 @@ func (entry *watchEntry) update(ctx context.Context, value any, err error, init entry.onValueLocked(value) } - listeners := entry.listeners - entry.listeners = entry.listeners[:0] - - for _, callback := range listeners { - if callback(entry.value, entry.lastError) { - entry.listeners = append(entry.listeners, callback) + // Only notify listeners on success or when no cached value exists after error processing. + // This prevents unnecessary notifications during topo outages when cached data is available. + shouldNotifyListeners := err == nil || entry.value == nil + + if shouldNotifyListeners { + listeners := entry.listeners + entry.listeners = entry.listeners[:0] + + for _, callback := range listeners { + if callback(entry.value, entry.lastError) { + entry.listeners = append(entry.listeners, callback) + } } } } diff --git a/go/vt/srvtopo/watch_test.go b/go/vt/srvtopo/watch_test.go new file mode 100644 index 00000000000..cc320a005ae --- /dev/null +++ b/go/vt/srvtopo/watch_test.go @@ -0,0 +1,243 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package srvtopo
+
+import (
+	"context"
+	"fmt"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"vitess.io/vitess/go/stats"
+	"vitess.io/vitess/go/vt/topo"
+	"vitess.io/vitess/go/vt/topo/memorytopo"
+
+	vschemapb "vitess.io/vitess/go/vt/proto/vschema"
+)
+
+// TestWatcherOutageBehavior tests that watchers remain silent during topo outages while Get keeps serving the cache, and that they fire again once the topo recovers.
+func TestWatcherOutageBehavior(t *testing.T) {
+	originalCacheTTL := srvTopoCacheTTL
+	originalCacheRefresh := srvTopoCacheRefresh
+	srvTopoCacheTTL = 100 * time.Millisecond
+	srvTopoCacheRefresh = 50 * time.Millisecond
+	defer func() {
+		srvTopoCacheTTL = originalCacheTTL
+		srvTopoCacheRefresh = originalCacheRefresh
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	ts, factory := memorytopo.NewServerAndFactory(ctx, "test_cell")
+	counts := stats.NewCountersWithSingleLabel("", "Watcher outage test", "type")
+	rs := NewResilientServer(ctx, ts, counts)
+
+	initialVSchema := &vschemapb.SrvVSchema{
+		Keyspaces: map[string]*vschemapb.Keyspace{
+			"ks1": {Sharded: false},
+		},
+	}
+	err := ts.UpdateSrvVSchema(ctx, "test_cell", initialVSchema)
+	require.NoError(t, err)
+
+	var watcherCallCount atomic.Int32
+	// atomic.Pointer[error] always stores the same concrete type; atomic.Value
+	// panics when successive Store calls use different dynamic types.
+	var lastWatcherError atomic.Pointer[error]
+
+	rs.WatchSrvVSchema(ctx, "test_cell", func(v *vschemapb.SrvVSchema, e error) bool {
+		// Publish the error before bumping the counter so that a reader which
+		// observes the new count also observes the matching error.
+		lastWatcherError.Store(&e)
+		watcherCallCount.Add(1)
+		return true
+	})
+
+	// Wait for initial callback
+	assert.Eventually(t, func() bool {
+		return watcherCallCount.Load() >= 1
+	}, 2*time.Second, 10*time.Millisecond)
+
+	initialWatcherCalls := watcherCallCount.Load()
+	require.GreaterOrEqual(t, initialWatcherCalls, int32(1))
+	// The callback stores its error before bumping the counter, so a stored
+	// pointer must be visible once at least one call has been counted.
+	lastErr := lastWatcherError.Load()
+	require.NotNil(t, lastErr, "watcher error should have been recorded")
+	require.NoError(t, *lastErr, "initial watcher call should not have error")
+
+	// Verify Get operations work normally
+	vschema, err := rs.GetSrvVSchema(ctx, "test_cell")
+	require.NoError(t, err)
+	require.NotNil(t, vschema)
+
+	// Simulate topo outage
+	factory.SetError(fmt.Errorf("simulated topo error"))
+
+	// Get should still work from cache during outage
+	vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
+	assert.NoError(t, err)
+	assert.NotNil(t, vschema)
+
+	// Wait during outage period
+	outageDuration := 500 * time.Millisecond
+	time.Sleep(outageDuration)
+
+	// Watchers should remain silent during outage
+	watcherCallsDuringOutage := watcherCallCount.Load() - initialWatcherCalls
+	assert.Equal(t, int32(0), watcherCallsDuringOutage, "watchers should be silent during outage")
+
+	// Get operations should continue working from cache
+	vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
+	assert.NoError(t, err)
+	assert.NotNil(t, vschema)
+
+	// Snapshot the call count before ending the outage so a fast recovery callback is not missed.
+	watcherCallsBeforeRecovery := watcherCallCount.Load()
+	factory.SetError(nil)
+	updatedVSchema := &vschemapb.SrvVSchema{
+		Keyspaces: map[string]*vschemapb.Keyspace{
+			"ks2": {Sharded: false},
+		},
+	}
+	err = ts.UpdateSrvVSchema(ctx, "test_cell", updatedVSchema)
+	require.NoError(t, err)
+
+	// Verify recovery callback occurs
+	assert.Eventually(t, func() bool {
+		recorded := lastWatcherError.Load()
+		isNoError := recorded == nil || *recorded == nil
+		return watcherCallCount.Load() > watcherCallsBeforeRecovery && isNoError
+	}, 2*time.Second, 10*time.Millisecond)
+
+	// Verify recovery worked
+	vschema, err = rs.GetSrvVSchema(ctx, "test_cell")
+	assert.NoError(t, err)
+	assert.NotNil(t, vschema)
+}
+
+// TestVSchemaWatcherCacheExpiryBehavior tests cache behavior during different error types.
+func TestVSchemaWatcherCacheExpiryBehavior(t *testing.T) {
+	// Shrink the cache timings so expiry happens quickly; restore them on exit.
+	savedTTL, savedRefresh := srvTopoCacheTTL, srvTopoCacheRefresh
+	srvTopoCacheTTL, srvTopoCacheRefresh = 100*time.Millisecond, 50*time.Millisecond
+	defer func() {
+		srvTopoCacheTTL, srvTopoCacheRefresh = savedTTL, savedRefresh
+	}()
+
+	const cell = "test_cell"
+	// Sleeping for this long puts the cached entry just past its TTL.
+	pastTTL := srvTopoCacheTTL + 10*time.Millisecond
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	topoServer, topoFactory := memorytopo.NewServerAndFactory(ctx, cell)
+	resilient := NewResilientServer(
+		ctx,
+		topoServer,
+		stats.NewCountersWithSingleLabel("", "Cache expiry test", "type"),
+	)
+
+	// Seed the topo with a single unsharded keyspace.
+	seed := &vschemapb.SrvVSchema{
+		Keyspaces: map[string]*vschemapb.Keyspace{
+			"ks1": {Sharded: false},
+		},
+	}
+	require.NoError(t, topoServer.UpdateSrvVSchema(ctx, cell, seed))
+
+	// Read once so the resilient server's cache holds a value.
+	cached, err := resilient.GetSrvVSchema(ctx, cell)
+	require.NoError(t, err)
+	require.NotNil(t, cached)
+
+	// Phase 1: a plain (non-topo) error injected after the TTL has passed.
+	time.Sleep(pastTTL)
+	topoFactory.SetError(fmt.Errorf("HTTP 500 internal server error"))
+
+	// The cached value is expected to be served instead of the error.
+	cached, err = resilient.GetSrvVSchema(ctx, cell)
+	assert.NoError(t, err, "Should serve cached value for non-topo errors even after TTL")
+	assert.NotNil(t, cached, "Should return cached VSchema")
+
+	// Phase 2: a topo-typed timeout error, followed by another TTL expiry.
+	topoFactory.SetError(topo.NewError(topo.Timeout, "topo timeout error"))
+	time.Sleep(pastTTL)
+
+	// This time the error itself is expected, with no vschema returned.
+	cached, err = resilient.GetSrvVSchema(ctx, cell)
+	assert.Error(t, err, "Should return error for topo errors after TTL expiry")
+	assert.True(t, topo.IsErrType(err, topo.Timeout), "Should return the topo error")
+	assert.Nil(t, cached, "Should not return vschema when error occurs")
+}
+
+// TestWatcherShouldOnlyNotifyOnActualChanges checks that a watcher is notified after the SrvVSchema content changes; the same-content update is only logged.
+func TestWatcherShouldOnlyNotifyOnActualChanges(t *testing.T) {
+	const cell = "test_cell"
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	topoServer := memorytopo.NewServer(ctx, cell)
+	statCounters := stats.NewCountersWithSingleLabel("", "Change detection test", "type")
+	resilient := NewResilientServer(ctx, topoServer, statCounters)
+
+	// publish writes the given SrvVSchema for the cell and fails the test on error.
+	publish := func(sv *vschemapb.SrvVSchema) {
+		require.NoError(t, topoServer.UpdateSrvVSchema(ctx, cell, sv))
+	}
+
+	first := &vschemapb.SrvVSchema{
+		Keyspaces: map[string]*vschemapb.Keyspace{
+			"ks1": {Sharded: false},
+		},
+	}
+	publish(first)
+
+	// Count every callback invocation; the callback always returns true.
+	var notifications atomic.Int32
+	resilient.WatchSrvVSchema(ctx, cell, func(_ *vschemapb.SrvVSchema, _ error) bool {
+		notifications.Add(1)
+		return true
+	})
+
+	// Wait (up to one second) for the first notification.
+	assert.Eventually(t, func() bool {
+		return notifications.Load() >= 1
+	}, 1*time.Second, 10*time.Millisecond)
+	baseline := notifications.Load()
+
+	// Re-publish identical content, give the watcher 100ms, and log what it did.
+	publish(first)
+	time.Sleep(100 * time.Millisecond)
+	afterSame := notifications.Load()
+	t.Logf("Calls after same content update: %d", afterSame-baseline)
+
+	// Publish different content; at least one more notification is expected.
+	second := &vschemapb.SrvVSchema{
+		Keyspaces: map[string]*vschemapb.Keyspace{
+			"ks2": {Sharded: true},
+		},
+	}
+	publish(second)
+	assert.Eventually(t, func() bool {
+		return notifications.Load() > afterSame
+	}, 1*time.Second, 10*time.Millisecond)
+}
diff --git a/go/vt/topo/consultopo/server_flaky_test.go b/go/vt/topo/consultopo/server_flaky_test.go
index a987336dd01..4e2526e710b 100644
--- a/go/vt/topo/consultopo/server_flaky_test.go
+++ b/go/vt/topo/consultopo/server_flaky_test.go
@@ -23,18 +23,22
@@ import (
 	"os"
 	"os/exec"
 	"path"
+	"sync/atomic"
 	"testing"
 	"time"
 
-	"vitess.io/vitess/go/vt/log"
-
 	"github.com/hashicorp/consul/api"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/testfiles"
+	"vitess.io/vitess/go/vt/log"
+	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
+	vschemapb "vitess.io/vitess/go/vt/proto/vschema"
+	"vitess.io/vitess/go/vt/srvtopo"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/test"
-
-	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 )
 
 // startConsul starts a consul subprocess, and waits for it to be ready.
@@ -319,3 +323,129 @@ func TestConsulTopoWithAuthFailure(t *testing.T) {
 		t.Errorf("Expected CreateCellInfo to fail: got %v, want %s", err, want)
 	}
 }
+
+// TestConsulWatcherStormPrevention tests that resilient watchers don't storm subscribers during Consul outages.
+// This test validates the fix for the specific Consul storm scenario reported by the team.
+// It starts a real consul subprocess via startConsul and kills it mid-test to simulate the outage.
+func TestConsulWatcherStormPrevention(t *testing.T) {
+	// Save original values and restore them after the test
+	originalWatchPollDuration := watchPollDuration
+	originalAuthFile := consulAuthClientStaticFile
+	defer func() {
+		watchPollDuration = originalWatchPollDuration
+		consulAuthClientStaticFile = originalAuthFile
+	}()
+
+	// Configure test settings - using direct assignment since flag parsing in tests is complex
+	watchPollDuration = 100 * time.Millisecond // Faster polling for test
+	consulAuthClientStaticFile = ""            // Clear auth file to avoid conflicts
+
+	// Start Consul server
+	cmd, configFilename, serverAddr := startConsul(t, "")
+	defer func() {
+		if err := cmd.Process.Kill(); err != nil {
+			log.Errorf("cmd process kill has an error: %v", err)
+		}
+		if err := cmd.Wait(); err != nil {
+			log.Errorf("cmd wait has an error: %v", err)
+		}
+		os.Remove(configFilename)
+	}()
+
+	// Cancel the context on exit (as the srvtopo tests do) so that watch work bound to it
+	// is signalled to stop and is not left logging through t after the test has returned.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	testRoot := "storm-test"
+
+	// Create the topo server
+	ts, err := topo.OpenServer("consul", serverAddr, path.Join(testRoot, topo.GlobalCell))
+	require.NoError(t, err, "OpenServer() failed")
+
+	// Create the CellInfo
+	cellName := "test_cell"
+	err = ts.CreateCellInfo(ctx, cellName, &topodatapb.CellInfo{
+		ServerAddress: serverAddr,
+		Root:          path.Join(testRoot, cellName),
+	})
+	require.NoError(t, err, "CreateCellInfo() failed")
+
+	// Create resilient server
+	counts := stats.NewCountersWithSingleLabel("", "Consul storm test", "type")
+	rs := srvtopo.NewResilientServer(ctx, ts, counts)
+
+	// Set initial VSchema
+	initialVSchema := &vschemapb.SrvVSchema{
+		Keyspaces: map[string]*vschemapb.Keyspace{
+			"test_keyspace": {Sharded: false},
+		},
+	}
+	err = ts.UpdateSrvVSchema(ctx, cellName, initialVSchema)
+	require.NoError(t, err, "UpdateSrvVSchema() failed")
+
+	// Set up watcher with call counter. The callback may run on a different goroutine
+	// than the assertions below, so the last error is shared through an atomic pointer.
+	var watcherCallCount atomic.Int32
+	var lastWatcherError atomic.Pointer[error]
+
+	rs.WatchSrvVSchema(ctx, cellName, func(v *vschemapb.SrvVSchema, e error) bool {
+		// Store the error before bumping the counter so that a reader which sees
+		// the new count also sees the matching error.
+		lastWatcherError.Store(&e)
+		count := watcherCallCount.Add(1)
+		if e != nil {
+			t.Logf("Watcher callback #%d - error: %v", count, e)
+		} else {
+			t.Logf("Watcher callback #%d - success", count)
+		}
+		return true
+	})
+
+	// Wait for initial callback
+	assert.Eventually(t, func() bool {
+		return watcherCallCount.Load() >= 1
+	}, 10*time.Second, 10*time.Millisecond)
+
+	initialWatcherCalls := watcherCallCount.Load()
+	require.GreaterOrEqual(t, initialWatcherCalls, int32(1), "Expected at least 1 initial watcher call")
+	lastErr := lastWatcherError.Load()
+	require.NotNil(t, lastErr, "Watcher callback should have recorded its error value")
+	require.NoError(t, *lastErr, "Initial watcher call should not have error")
+
+	// Verify Get operations work normally
+	vschema, err := rs.GetSrvVSchema(ctx, cellName)
+	require.NoError(t, err, "GetSrvVSchema() failed")
+	require.NotNil(t, vschema, "GetSrvVSchema() returned nil")
+
+	t.Logf("Setup complete. Initial watcher calls: %d", initialWatcherCalls)
+
+	// Simulate Consul outage by killing the Consul process
+	// This will cause watch errors which previously triggered storms
+	err = cmd.Process.Kill()
+	require.NoError(t, err, "Failed to kill consul process")
+
+	// Get should still work from cache during outage
+	vschema, err = rs.GetSrvVSchema(ctx, cellName)
+	assert.NoError(t, err, "GetSrvVSchema() should work from cache during outage")
+	assert.NotNil(t, vschema, "GetSrvVSchema() should return cached value during outage")
+
+	// Wait during outage period - this is when storms would occur without our fix
+	outageDuration := 2 * time.Second
+	t.Logf("Waiting %v during Consul outage to check for watcher storms...", outageDuration)
+	time.Sleep(outageDuration)
+
+	// Check watcher calls during outage - key assertion for storm prevention
+	watcherCallsDuringOutage := watcherCallCount.Load() - initialWatcherCalls
+	t.Logf("Watcher calls during outage: %d", watcherCallsDuringOutage)
+
+	// With our fix, watchers should remain silent during outage when cached data is available
+	// This is the core validation: no storm of subscriber calls during Consul outages
+	assert.Equal(t, int32(0), watcherCallsDuringOutage, "Watchers should remain completely silent during Consul outage")
+
+	// Get operations should continue working from cache
+	vschema, err = rs.GetSrvVSchema(ctx, cellName)
+	assert.NoError(t, err, "GetSrvVSchema() should continue working from cache")
+	assert.NotNil(t, vschema, "GetSrvVSchema() should continue returning cached value")
+
+	t.Log("Consul storm prevention test completed - watchers remained quiet during outage")
+}