Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 50 additions & 4 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,15 @@ import (
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)

var (
Expand Down Expand Up @@ -221,6 +220,53 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestNamedLocking tests that named locking works as intended.
func TestNamedLocking(t *testing.T) {
// Create topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

ctx := context.Background()
lockName := "TestNamedLocking"
action := "Testing"

// Acquire a named lock.
ctx, unlock, err := ts.LockName(ctx, lockName, action)
require.NoError(t, err)

// Check that we can't reacquire it from the same context.
_, _, err = ts.LockName(ctx, lockName, action)
require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName))

// Check that CheckNameLocked doesn't return an error as we should still be
// holding the lock.
err = topo.CheckNameLocked(ctx, lockName)
require.NoError(t, err)

// We'll now try to acquire the lock from a different goroutine.
secondCallerAcquired := false
go func() {
_, unlock, err := ts.LockName(context.Background(), lockName, action)
defer unlock(&err)
require.NoError(t, err)
secondCallerAcquired = true
}()

// Wait for some time and ensure that the second attempt at acquiring the lock
// is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondCallerAcquired)

// Unlock the name.
unlock(&err)
// Check that we no longer have the named lock.
err = topo.CheckNameLocked(ctx, lockName)
require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName))

// Wait to see that the second goroutine WAS now able to acquire the named lock.
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}

func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
58 changes: 52 additions & 6 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,20 @@ package zk2
import (
"context"
"flag"
"fmt"
"os"
"testing"
"time"

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/topo"

"vitess.io/vitess/go/vt/log"

"github.com/stretchr/testify/require"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo"
)

var (
Expand Down Expand Up @@ -97,6 +96,53 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

// TestNamedLocking tests that named locking works as intended.
func TestNamedLocking(t *testing.T) {
// Create topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldClientProcess.TopoGlobalAddress, clusterInstance.VtctldClientProcess.TopoGlobalRoot)
require.NoError(t, err)

ctx := context.Background()
lockName := "TestNamedLocking"
action := "Testing"

// Acquire a named lock.
ctx, unlock, err := ts.LockName(ctx, lockName, action)
require.NoError(t, err)

// Check that we can't reacquire it from the same context.
_, _, err = ts.LockName(ctx, lockName, action)
require.ErrorContains(t, err, fmt.Sprintf("lock for named %s is already held", lockName))

// Check that CheckNameLocked doesn't return an error as we should still be
// holding the lock.
err = topo.CheckNameLocked(ctx, lockName)
require.NoError(t, err)

// We'll now try to acquire the lock from a different goroutine.
secondCallerAcquired := false
go func() {
_, unlock, err := ts.LockName(context.Background(), lockName, action)
defer unlock(&err)
require.NoError(t, err)
secondCallerAcquired = true
}()

// Wait for some time and ensure that the second attempt at acquiring the lock
// is blocked.
time.Sleep(100 * time.Millisecond)
require.False(t, secondCallerAcquired)

// Unlock the name.
unlock(&err)
// Check that we no longer have the named lock.
err = topo.CheckNameLocked(ctx, lockName)
require.ErrorContains(t, err, fmt.Sprintf("named %s is not locked (no lockInfo in map)", lockName))

// Wait to see that the second goroutine WAS now able to acquire the named lock.
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}

func TestTopoDownServingQuery(t *testing.T) {
ctx := context.Background()
vtParams := mysql.ConnParams{
Expand Down
11 changes: 0 additions & 11 deletions go/vt/topo/test/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ func checkLock(t *testing.T, ctx context.Context, ts *topo.Server) {
t.Log("=== checkLockTimeout")
checkLockTimeout(ctx, t, conn)

t.Log("=== checkLockMissing")
checkLockMissing(ctx, t, conn)

t.Log("=== checkLockUnblocks")
checkLockUnblocks(ctx, t, conn)
}
Expand Down Expand Up @@ -121,14 +118,6 @@ func checkLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) {
}
}

// checkLockMissing makes sure we can't lock a non-existing directory.
func checkLockMissing(ctx context.Context, t *testing.T, conn topo.Conn) {
keyspacePath := path.Join(topo.KeyspacesPath, "test_keyspace_666")
if _, err := conn.Lock(ctx, keyspacePath, "missing"); err == nil {
t.Fatalf("Lock(test_keyspace_666) worked for non-existing keyspace")
}
}

// checkLockUnblocks makes sure that a routine waiting on a lock
// is unblocked when another routine frees the lock
func checkLockUnblocks(ctx context.Context, t *testing.T, conn topo.Conn) {
Expand Down
11 changes: 0 additions & 11 deletions go/vt/topo/test/trylock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,6 @@ func checkTryLock(t *testing.T, ctx context.Context, ts *topo.Server) {
t.Log("=== checkTryLockTimeout")
checkTryLockTimeout(ctx, t, conn)

t.Log("=== checkTryLockMissing")
checkTryLockMissing(ctx, t, conn)

t.Log("=== checkTryLockUnblocks")
checkTryLockUnblocks(ctx, t, conn)
}
Expand Down Expand Up @@ -142,14 +139,6 @@ func checkTryLockTimeout(ctx context.Context, t *testing.T, conn topo.Conn) {
}
}

// checkTryLockMissing makes sure we can't lock a non-existing directory.
func checkTryLockMissing(ctx context.Context, t *testing.T, conn topo.Conn) {
keyspacePath := path.Join(topo.KeyspacesPath, "test_keyspace_666")
if _, err := conn.TryLock(ctx, keyspacePath, "missing"); err == nil {
require.Fail(t, "TryLock(test_keyspace_666) worked for non-existing keyspace")
}
}

// unlike 'checkLockUnblocks', checkTryLockUnblocks will not block on other client but instead
// keep retrying until it gets the lock.
func checkTryLockUnblocks(ctx context.Context, t *testing.T, conn topo.Conn) {
Expand Down
4 changes: 2 additions & 2 deletions go/vt/topo/zk2topo/lock.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ func (zs *Server) lock(ctx context.Context, dirPath, contents string) (topo.Lock
// sequential nodes, they are created as children, not siblings.
locksDir := path.Join(zs.root, dirPath, locksPath) + "/"

// Create the locks path, possibly creating the parent.
nodePath, err := CreateRecursive(ctx, zs.conn, locksDir, []byte(contents), zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(PermFile), 1)
// Create the lock path, creating the parents as needed.
nodePath, err := CreateRecursive(ctx, zs.conn, locksDir, []byte(contents), zk.FlagSequence|zk.FlagEphemeral, zk.WorldACL(PermFile), -1)
if err != nil {
return nil, convertError(err, locksDir)
}
Expand Down
6 changes: 2 additions & 4 deletions go/vt/topo/zk2topo/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,18 @@ limitations under the License.
package zk2topo

import (
"context"
"fmt"
"path"
"sort"
"strings"
"sync"

"context"

"github.com/z-division/go-zookeeper/zk"

"vitess.io/vitess/go/fileutil"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/fileutil"
)

// CreateRecursive is a helper function on top of Create. It will
Expand Down
Loading