From 49a067a1a59884fe4b173b49d6f59f6c9341f767 Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Mon, 30 Jun 2025 16:05:50 -0400 Subject: [PATCH] Topo: Add NamedLock test for zk2 and consul and get them passing (#18407) Signed-off-by: Matt Lord --- go/test/endtoend/topotest/consul/main_test.go | 54 +++++++++++++++-- go/test/endtoend/topotest/zk2/main_test.go | 58 +++++++++++++++++-- go/vt/topo/test/lock.go | 11 ---- go/vt/topo/test/trylock.go | 11 ---- go/vt/topo/zk2topo/lock.go | 4 +- go/vt/topo/zk2topo/utils.go | 6 +- 6 files changed, 106 insertions(+), 38 deletions(-) diff --git a/go/test/endtoend/topotest/consul/main_test.go b/go/test/endtoend/topotest/consul/main_test.go index b71551dc6b7..33f7677f857 100644 --- a/go/test/endtoend/topotest/consul/main_test.go +++ b/go/test/endtoend/topotest/consul/main_test.go @@ -24,16 +24,15 @@ import ( "testing" "time" - topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" - "vitess.io/vitess/go/vt/log" - "vitess.io/vitess/go/vt/topo" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" ) var ( @@ -221,6 +220,53 @@ func TestKeyspaceLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true) } +// TestNamedLocking tests that named locking works as intended. +func TestNamedLocking(t *testing.T) { + // Create topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot) + require.NoError(t, err) + + ctx := context.Background() + lockName := "TestNamedLocking" + action := "Testing" + + // Acquire a named lock. + ctx, unlock, err := ts.LockName(ctx, lockName, action) + require.NoError(t, err) + + // Check that we can't reacquire it from the same context. + _, _, err = ts.LockName(ctx, lockName, action) + require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName)) + + // Check that CheckNameLocked doesn't return an error as we should still be + // holding the lock. + err = topo.CheckNameLocked(ctx, lockName) + require.NoError(t, err) + + // We'll now try to acquire the lock from a different goroutine. + secondCallerAcquired := false + go func() { + _, unlock, err := ts.LockName(context.Background(), lockName, action) + defer unlock(&err) + require.NoError(t, err) + secondCallerAcquired = true + }() + + // Wait for some time and ensure that the second attempt at acquiring the lock + // is blocked. + time.Sleep(100 * time.Millisecond) + require.False(t, secondCallerAcquired) + + // Unlock the name. + unlock(&err) + // Check that we no longer have the named lock. + err = topo.CheckNameLocked(ctx, lockName) + require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName)) + + // Wait to see that the second goroutine WAS now able to acquire the named lock. + topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) +} + func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { t.Helper() qr, err := conn.ExecuteFetch(query, 1000, true) diff --git a/go/test/endtoend/topotest/zk2/main_test.go b/go/test/endtoend/topotest/zk2/main_test.go index 95a2fc13894..29c5cb89406 100644 --- a/go/test/endtoend/topotest/zk2/main_test.go +++ b/go/test/endtoend/topotest/zk2/main_test.go @@ -19,21 +19,20 @@ package zk2 import ( "context" "flag" + "fmt" "os" "testing" "time" - topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" - "vitess.io/vitess/go/test/endtoend/utils" - "vitess.io/vitess/go/vt/topo" - - "vitess.io/vitess/go/vt/log" - "github.com/stretchr/testify/require" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" + "vitess.io/vitess/go/test/endtoend/utils" + "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/topo" ) var ( @@ -97,6 +96,53 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } +// TestNamedLocking tests that named locking works as intended. +func TestNamedLocking(t *testing.T) { + // Create topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot) + require.NoError(t, err) + + ctx := context.Background() + lockName := "TestNamedLocking" + action := "Testing" + + // Acquire a named lock. + ctx, unlock, err := ts.LockName(ctx, lockName, action) + require.NoError(t, err) + + // Check that we can't reacquire it from the same context. + _, _, err = ts.LockName(ctx, lockName, action) + require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName)) + + // Check that CheckNameLocked doesn't return an error as we should still be + // holding the lock. + err = topo.CheckNameLocked(ctx, lockName) + require.NoError(t, err) + + // We'll now try to acquire the lock from a different goroutine. + secondCallerAcquired := false + go func() { + _, unlock, err := ts.LockName(context.Background(), lockName, action) + defer unlock(&err) + require.NoError(t, err) + secondCallerAcquired = true + }() + + // Wait for some time and ensure that the second attempt at acquiring the lock + // is blocked. + time.Sleep(100 * time.Millisecond) + require.False(t, secondCallerAcquired) + + // Unlock the name. + unlock(&err) + // Check that we no longer have the named lock. + err = topo.CheckNameLocked(ctx, lockName) + require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName)) + + // Wait to see that the second goroutine WAS now able to acquire the named lock. + topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) +} + func TestTopoDownServingQuery(t *testing.T) { ctx := context.Background() vtParams := mysql.ConnParams{ diff --git a/go/vt/topo/test/lock.go b/go/vt/topo/test/lock.go index dce51ed859d..83c6a181b40 100644 --- a/go/vt/topo/test/lock.go +++ b/go/vt/topo/test/lock.go @@ -47,9 +47,6 @@ func checkLock(t *testing.T, ctx context.Context, ts *topo.Server) { t.Log("=== checkLockTimeout") checkLockTimeout(ctx, t, conn) - t.Log("=== checkLockMissing") - checkLockMissing(ctx, t, conn) - t.Log("=== checkLockUnblocks") checkLockUnblocks(ctx, t, conn) } @@ -121,14 +118,6 @@ func checkLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) { } } -// checkLockMissing makes sure we can't lock a non-existing directory. -func checkLockMissing(ctx context.Context, t *testing.T, conn topo.Conn) { - keyspacePath := path.Join(topo.KeyspacesPath, "test_keyspace_666") - if _, err := conn.Lock(ctx, keyspacePath, "missing"); err == nil { - t.Fatalf("Lock(test_keyspace_666) worked for non-existing keyspace") - } -} - // checkLockUnblocks makes sure that a routine waiting on a lock // is unblocked when another routine frees the lock func checkLockUnblocks(ctx context.Context, t *testing.T, conn topo.Conn) { diff --git a/go/vt/topo/test/trylock.go b/go/vt/topo/test/trylock.go index c553e74bb61..3303cdfcd62 100644 --- a/go/vt/topo/test/trylock.go +++ b/go/vt/topo/test/trylock.go @@ -44,9 +44,6 @@ func checkTryLock(t *testing.T, ctx context.Context, ts *topo.Server) { t.Log("=== checkTryLockTimeout") checkTryLockTimeout(ctx, t, conn) - t.Log("=== checkTryLockMissing") - checkTryLockMissing(ctx, t, conn) - t.Log("=== checkTryLockUnblocks") checkTryLockUnblocks(ctx, t, conn) } @@ -142,14 +139,6 @@ func checkTryLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) { } } -// checkTryLockMissing makes sure we can't lock a non-existing directory. -func checkTryLockMissing(ctx context.Context, t *testing.T, conn topo.Conn) { - keyspacePath := path.Join(topo.KeyspacesPath, "test_keyspace_666") - if _, err := conn.TryLock(ctx, keyspacePath, "missing"); err == nil { - require.Fail(t, "TryLock(test_keyspace_666) worked for non-existing keyspace") - } -} - // unlike 'checkLockUnblocks', checkTryLockUnblocks will not block on other client but instead // keep retrying until it gets the lock. func checkTryLockUnblocks(ctx context.Context, t *testing.T, conn topo.Conn) { diff --git a/go/vt/topo/zk2topo/lock.go b/go/vt/topo/zk2topo/lock.go index fdd9fbd0137..4d42587faad 100644 --- a/go/vt/topo/zk2topo/lock.go +++ b/go/vt/topo/zk2topo/lock.go @@ -88,8 +88,8 @@ func (zs *Server) lock(ctx context.Context, dirPath, contents string) (topo.Lock // sequential nodes, they are created as children, not siblings. locksDir := path.Join(zs.root, dirPath, locksPath) + "/" - // Create the locks path, possibly creating the parent. - nodePath, err := CreateRecursive(ctx, zs.conn, locksDir, []byte(contents), zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(PermFile), 1) + // Create the lock path, creating the parents as needed. + nodePath, err := CreateRecursive(ctx, zs.conn, locksDir, []byte(contents), zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(PermFile), -1) if err != nil { return nil, convertError(err, locksDir) } diff --git a/go/vt/topo/zk2topo/utils.go b/go/vt/topo/zk2topo/utils.go index 08c95e60450..0378d7facd3 100644 --- a/go/vt/topo/zk2topo/utils.go +++ b/go/vt/topo/zk2topo/utils.go @@ -17,20 +17,18 @@ limitations under the License. package zk2topo import ( + "context" "fmt" "path" "sort" "strings" "sync" - "context" - "github.com/z-division/go-zookeeper/zk" + "vitess.io/vitess/go/fileutil" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vterrors" - - "vitess.io/vitess/go/fileutil" ) // CreateRecursive is a helper function on top of Create. It will