From 3ff4c6283e1dacd7b4915140438d052331f6d3af Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 5 Dec 2025 16:22:48 +0000 Subject: [PATCH 1/5] VDiff: wait for workflow lock with exponential backoff Signed-off-by: Matt Lord --- .../tabletmanager/vdiff/table_differ.go | 43 ++++++++++++++----- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 06c8db52225..fc77f1bf826 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -121,11 +121,34 @@ func (td *tableDiffer) initialize(ctx context.Context) error { targetKeyspace := td.wd.ct.vde.thisTablet.Keyspace lockName := fmt.Sprintf("%s/%s", targetKeyspace, td.wd.ct.workflow) - log.Infof("Locking workflow %s", lockName) - ctx, unlock, lockErr := td.wd.ct.ts.LockName(ctx, lockName, "vdiff") - if lockErr != nil { - log.Errorf("Locking workfkow %s failed: %v", lockName, lockErr) - return lockErr + log.Infof("Locking workflow %s for VDiff %s", lockName, td.wd.ct.uuid) + // We attempt to get the lock until we can, using an exponential backoff. + var ( + vctx context.Context + unlock func(*error) + lockErr error + retryDelay = 100 * time.Millisecond + maxRetryDelay = topo.LockTimeout + backoffFactor = 1.5 + ) + for { + vctx, unlock, lockErr = td.wd.ct.ts.LockName(ctx, lockName, "vdiff") + if lockErr == nil { + break + } + log.Warningf("Locking workfkow %s for VDiff %s initialization (stream ID: %d) failed, will wait %v before retrying: %v", + lockName, td.wd.ct.uuid, td.wd.ct.id, retryDelay, lockErr) + select { + case <-ctx.Done(): + return vterrors.Errorf(vtrpcpb.Code_CANCELED, "engine is shutting down") + case <-td.wd.ct.done: + return ErrVDiffStoppedByUser + case <-time.After(retryDelay): + if retryDelay < maxRetryDelay { + retryDelay = min(time.Duration(float64(retryDelay)*backoffFactor), maxRetryDelay) + } + continue + } } var err error @@ -136,7 +159,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() - if err := td.stopTargetVReplicationStreams(ctx, dbClient); err != nil { + if err := td.stopTargetVReplicationStreams(vctx, dbClient); err != nil { return err } defer func() { @@ -151,18 +174,18 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() - td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx) + td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(vctx) - if err := td.selectTablets(ctx); err != nil { + if err := td.selectTablets(vctx); err != nil { return err } - if err := td.syncSourceStreams(ctx); err != nil { + if err := td.syncSourceStreams(vctx); err != nil { return err } if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil { return err } - if err := td.syncTargetStreams(ctx); err != nil { + if err := td.syncTargetStreams(vctx); err != nil { return err } if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil { From b1100bba2113daff745a4821dfc4e9750d688d9a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 8 Jan 2026 13:48:52 +0000 Subject: [PATCH 2/5] Add tests Signed-off-by: Matt Lord --- .../vdiff/table_differ_lock_test.go | 158 ++++++++++++++++++ 1 file changed, 158 insertions(+) create mode 100644 go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go new file mode 100644 index 00000000000..2e5614d754b --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go @@ -0,0 +1,158 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +// TestWorkflowLockRetriesWithMultipleShards tests that multiple concurrent +// VDiff operations (simulating one per shard) can all successfully acquire +// the workflow lock using exponential backoff retry logic. +func TestWorkflowLockRetriesWithMultipleShards(t *testing.T) { + numShards := 64 + ctx, cancel := context.WithTimeout(t.Context(), 60*time.Second) + defer cancel() + ts := memorytopo.NewServer(ctx, "zone1") + defer ts.Close() + keyspace := "testkeyspace" + workflow := "testwf" + lockName := fmt.Sprintf("%s/%s", keyspace, workflow) + var successCount atomic.Int32 + var wg sync.WaitGroup + + // Simulate multiple concurrent table diffs (one per shard) all trying to acquire the same named lock. + for shard := range numShards { + wg.Add(1) + go func(shardID int) { + defer wg.Done() + retryDelay := 10 * time.Millisecond + maxRetryDelay := 100 * time.Millisecond + backoffFactor := 1.5 + + for { + attemptCtx, attemptCancel := context.WithTimeout(ctx, topo.LockTimeout) + _, unlock, lockErr := ts.LockName(attemptCtx, lockName, "vdiff") + if lockErr == nil { + time.Sleep(1 * time.Millisecond) // Simulate initialization work + var unlockErr error + unlock(&unlockErr) + attemptCancel() + successCount.Add(1) + return + } + + // Retry with exponential backoff. + attemptCancel() + + select { + case <-ctx.Done(): + return + case <-time.After(retryDelay): + if retryDelay < maxRetryDelay { + retryDelay = min(time.Duration(float64(retryDelay)*backoffFactor), maxRetryDelay) + } + continue + } + } + }(shard) + } + wg.Wait() + + // All shards should have successfully acquired the lock. + require.Equal(t, int32(numShards), successCount.Load(), + "all shards should successfully acquire the lock with retry logic") +} + +// TestWorkflowLockExponentialBackoff tests that the exponential backoff +// retry logic works correctly when acquiring workflow locks. +func TestWorkflowLockExponentialBackoff(t *testing.T) { + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + ts := memorytopo.NewServer(ctx, "zone1") + defer ts.Close() + keyspace := "testkeyspace" + workflow := "testwf" + lockName := fmt.Sprintf("%s/%s", keyspace, workflow) + var ( + retryCount atomic.Int32 + gotLock atomic.Bool + wg sync.WaitGroup + ) + + // First, acquire the lock and hold it. + _, unlock, err := ts.LockName(ctx, lockName, "test") + require.NoError(t, err) + + // Start a goroutine that will try to acquire the lock that is held. + wg.Go(func() { + retryDelay := 10 * time.Millisecond + maxRetryDelay := 100 * time.Millisecond + backoffFactor := 1.5 + + for { + attemptCtx, attemptCancel := context.WithTimeout(ctx, maxRetryDelay) + _, attemptUnlock, attemptErr := ts.LockName(attemptCtx, lockName, "test") + if attemptErr == nil { + gotLock.Store(true) + defer attemptUnlock(&attemptErr) + defer attemptCancel() + return + } + + attemptCancel() + retryCount.Add(1) + + select { + case <-ctx.Done(): + return + case <-time.After(retryDelay): + if retryDelay < maxRetryDelay { + retryDelay = min(time.Duration(float64(retryDelay)*backoffFactor), maxRetryDelay) + } + continue + } + } + }) + + // Wait for multiple retries. + require.Eventually(t, func() bool { + return retryCount.Load() > 1 + }, 5*time.Second, 50*time.Millisecond) + + // Release the lock. + var unlockErr error + unlock(&unlockErr) + require.NoError(t, unlockErr) + + // Wait for the goroutine to get the lock now and finish. + wg.Wait() + + // Verify the lock was eventually acquired and there were retries. + require.True(t, gotLock.Load(), "the lock should have been acquired after retries") + require.Greater(t, retryCount.Load(), int32(1), "we should have retried at least twice") +} From 24d7d18fad0591d7d1489bf14ad6f1e97f1fc93b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 9 Jan 2026 04:35:41 +0000 Subject: [PATCH 3/5] Add some jitter to the retry delay Signed-off-by: Matt Lord --- .../tabletmanager/vdiff/table_differ.go | 8 ++- .../vdiff/table_differ_lock_test.go | 70 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 6b94d1b780e..007f56cc5d5 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -21,6 +21,7 @@ import ( "encoding/json" "errors" "fmt" + "math/rand/v2" "slices" "strings" "sync" @@ -136,7 +137,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { if lockErr == nil { break } - log.Warningf("Locking workfkow %s for VDiff %s initialization (stream ID: %d) failed, will wait %v before retrying: %v", + log.Warningf("Locking workflow %s for VDiff %s initialization (stream ID: %d) failed, will wait %v before retrying: %v", lockName, td.wd.ct.uuid, td.wd.ct.id, retryDelay, lockErr) select { case <-ctx.Done(): @@ -147,6 +148,11 @@ func (td *tableDiffer) initialize(ctx context.Context) error { if retryDelay < maxRetryDelay { retryDelay = min(time.Duration(float64(retryDelay)*backoffFactor), maxRetryDelay) } + // Add jitter to prevent thundering herds: ±25% of original retryDelay. + // This means that we may wait up to maxRetryDelay * 1.25, but it prevents all of + // the waiters from eventually waiting for the fixed maxRetryDelay period. + jitter := time.Duration(rand.IntN(int(retryDelay) / 2)) + retryDelay = retryDelay - (retryDelay / 4) + jitter continue } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go index 2e5614d754b..2ad5de6f7d7 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go @@ -19,6 +19,7 @@ package vdiff import ( "context" "fmt" + "math/rand/v2" "sync" "sync/atomic" "testing" @@ -156,3 +157,72 @@ func TestWorkflowLockExponentialBackoff(t *testing.T) { require.True(t, gotLock.Load(), "the lock should have been acquired after retries") require.Greater(t, retryCount.Load(), int32(1), "we should have retried at least twice") } + +// TestWorkflowLockJitter tests that jitter is correctly applied to retry delays +// to prevent thundering herd issues. +func TestWorkflowLockJitter(t *testing.T) { + testCases := []struct { + name string + retryDelay time.Duration + expectedMin time.Duration + expectedMax time.Duration + iterations int + uniqueDelays int // minimum number of unique delays we expect + }{ + { + name: "10ms delay", + retryDelay: 10 * time.Millisecond, + expectedMin: 7 * time.Millisecond, // 10 - (10/4) = 7.5ms + expectedMax: 13 * time.Millisecond, // 10 - (10/4) + (10/2) = 12.5ms (+ tolerance) + iterations: 100, + uniqueDelays: 3, // Should see multiple different delays + }, + { + name: "100ms delay", + retryDelay: 100 * time.Millisecond, + expectedMin: 75 * time.Millisecond, // 100 - 25 = 75ms + expectedMax: 125 * time.Millisecond, // 100 - 25 + 50 = 125ms + iterations: 100, + uniqueDelays: 10, // Should see many different delays + }, + { + name: "1s delay", + retryDelay: 1 * time.Second, + expectedMin: 750 * time.Millisecond, // 1000 - 250 = 750ms + expectedMax: 1250 * time.Millisecond, // 1000 - 250 + 500 = 1250ms + iterations: 100, + uniqueDelays: 20, // Should see many different delays + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + seenDelays := make(map[time.Duration]bool) + var minSeen, maxSeen time.Duration = tc.retryDelay, 0 + + for range tc.iterations { + retryDelay := tc.retryDelay + jitter := time.Duration(rand.IntN(int(retryDelay) / 2)) + jitteredDelay := retryDelay - (retryDelay / 4) + jitter + + // Track min/max. + if jitteredDelay < minSeen { + minSeen = jitteredDelay + } + if jitteredDelay > maxSeen { + maxSeen = jitteredDelay + } + + seenDelays[jitteredDelay] = true + + // Verify the jittered delay is within expected bounds. + require.GreaterOrEqual(t, jitteredDelay, tc.expectedMin, "jittered delay should be >= %v", tc.expectedMin) + require.LessOrEqual(t, jitteredDelay, tc.expectedMax, "jittered delay should be <= %v", tc.expectedMax) + } + + // Verify we're seeing variety in the delays (not always the same value). + require.GreaterOrEqual(t, len(seenDelays), tc.uniqueDelays, + "should see at least %d unique delays, got %d", tc.uniqueDelays, len(seenDelays)) + }) + } +} From 93f8d38c12a14332d69a072efc19b1984223d94e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 9 Jan 2026 14:36:48 +0000 Subject: [PATCH 4/5] Tiny nit Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go index 2ad5de6f7d7..26b52ebe850 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go @@ -198,7 +198,7 @@ func TestWorkflowLockJitter(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { seenDelays := make(map[time.Duration]bool) - var minSeen, maxSeen time.Duration = tc.retryDelay, 0 + var minSeen, maxSeen time.Duration for range tc.iterations { retryDelay := tc.retryDelay From 9014534fa596856a493a5643f36020646755addf Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 10 Jan 2026 05:14:01 +0000 Subject: [PATCH 5/5] Correct copyright year :-) Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go index 26b52ebe850..963f80c2773 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ_lock_test.go @@ -1,5 +1,5 @@ /* -Copyright 2025 The Vitess Authors. +Copyright 2026 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.