diff --git a/go/sync2/semaphore.go b/go/sync2/semaphore.go index 536399824cc..c0d1fd2ce48 100644 --- a/go/sync2/semaphore.go +++ b/go/sync2/semaphore.go @@ -21,6 +21,7 @@ package sync2 // cases, you just want a familiar API. import ( + "context" "time" ) @@ -61,6 +62,17 @@ func (sem *Semaphore) Acquire() bool { } } +// AcquireContext returns true on successful acquisition, and +// false on context expiry. Timeout is ignored. +func (sem *Semaphore) AcquireContext(ctx context.Context) bool { + select { + case <-sem.slots: + return true + case <-ctx.Done(): + return false + } +} + // TryAcquire acquires a semaphore if it's immediately available. // It returns false otherwise. func (sem *Semaphore) TryAcquire() bool { diff --git a/go/sync2/semaphore_flaky_test.go b/go/sync2/semaphore_test.go similarity index 53% rename from go/sync2/semaphore_flaky_test.go rename to go/sync2/semaphore_test.go index 60b287df216..aa4f6f5cee1 100644 --- a/go/sync2/semaphore_flaky_test.go +++ b/go/sync2/semaphore_test.go @@ -17,8 +17,11 @@ limitations under the License. package sync2 import ( + "context" "testing" "time" + + "github.com/stretchr/testify/assert" ) func TestSemaNoTimeout(t *testing.T) { @@ -26,42 +29,51 @@ func TestSemaNoTimeout(t *testing.T) { s.Acquire() released := false go func() { - time.Sleep(10 * time.Millisecond) released = true s.Release() }() s.Acquire() - if !released { - t.Errorf("release: false, want true") - } + assert.True(t, released) } func TestSemaTimeout(t *testing.T) { - s := NewSemaphore(1, 5*time.Millisecond) + s := NewSemaphore(1, 1*time.Millisecond) + s.Acquire() + release := make(chan struct{}) + released := make(chan struct{}) + go func() { + <-release + s.Release() + released <- struct{}{} + }() + assert.False(t, s.Acquire()) + release <- struct{}{} + <-released + assert.True(t, s.Acquire()) +} + +func TestSemaAcquireContext(t *testing.T) { + s := NewSemaphore(1, 0) s.Acquire() + release := make(chan struct{}) + released := make(chan struct{}) go func() { - time.Sleep(10 * time.Millisecond) + <-release s.Release() + released <- struct{}{} }() - if s.Acquire() { - t.Errorf("Acquire: true, want false") - } - time.Sleep(10 * time.Millisecond) - if !s.Acquire() { - t.Errorf("Acquire: false, want true") - } + ctx, cancel := context.WithCancel(context.Background()) + cancel() + assert.False(t, s.AcquireContext(ctx)) + release <- struct{}{} + <-released + assert.True(t, s.AcquireContext(context.Background())) } func TestSemaTryAcquire(t *testing.T) { s := NewSemaphore(1, 0) - if !s.TryAcquire() { - t.Errorf("TryAcquire: false, want true") - } - if s.TryAcquire() { - t.Errorf("TryAcquire: true, want false") - } + assert.True(t, s.TryAcquire()) + assert.False(t, s.TryAcquire()) s.Release() - if !s.TryAcquire() { - t.Errorf("TryAcquire: false, want true") - } + assert.True(t, s.TryAcquire()) } diff --git a/go/vt/vttablet/tabletmanager/replmanager.go b/go/vt/vttablet/tabletmanager/replmanager.go index c80f3c3de8f..a7790e31a4c 100644 --- a/go/vt/vttablet/tabletmanager/replmanager.go +++ b/go/vt/vttablet/tabletmanager/replmanager.go @@ -91,8 +91,8 @@ func (rm *replManager) SetTabletType(tabletType topodatapb.TabletType) { func (rm *replManager) check() { // We need to obtain the action lock if we're going to fix - // replication - if err := rm.tm.lock(rm.ctx); err != nil { + // replication, but only if the lock is available to take. + if !rm.tm.tryLock() { return } defer rm.tm.unlock() diff --git a/go/vt/vttablet/tabletmanager/rpc_server.go b/go/vt/vttablet/tabletmanager/rpc_server.go index d451fc778c9..9420ca9824c 100644 --- a/go/vt/vttablet/tabletmanager/rpc_server.go +++ b/go/vt/vttablet/tabletmanager/rpc_server.go @@ -34,28 +34,23 @@ import ( // Utility functions for RPC service // -// lock is used at the beginning of an RPC call, to lock the -// action mutex. It returns ctx.Err() if <-ctx.Done() after the lock. +// lock is used at the beginning of an RPC call, to acquire the +// action semaphore. It returns ctx.Err() if the context expires. func (tm *TabletManager) lock(ctx context.Context) error { - tm.actionMutex.Lock() - tm.actionMutexLocked = true - - // After we take the lock (which could take a long time), we - // check the client is still here. - select { - case <-ctx.Done(): - tm.actionMutexLocked = false - tm.actionMutex.Unlock() - return ctx.Err() - default: + if tm.actionSema.AcquireContext(ctx) { return nil } + return ctx.Err() +} + +// tryLock will return immediately, true on success and false on failure. +func (tm *TabletManager) tryLock() bool { + return tm.actionSema.TryAcquire() } // unlock is the symmetrical action to lock. func (tm *TabletManager) unlock() { - tm.actionMutexLocked = false - tm.actionMutex.Unlock() + tm.actionSema.Release() } // HandleRPCPanic is part of the RPCTM interface. diff --git a/go/vt/vttablet/tabletmanager/tm_init.go b/go/vt/vttablet/tabletmanager/tm_init.go index 5bfae9aa2a7..6b247604ba5 100644 --- a/go/vt/vttablet/tabletmanager/tm_init.go +++ b/go/vt/vttablet/tabletmanager/tm_init.go @@ -25,10 +25,10 @@ topology server. Only 'vtctl DeleteTablet' should be run by other processes, everything else should ask the tablet server to make the change. -Most RPC calls lock the actionMutex, except the easy read-only ones. +Most RPC calls obtain the actionSema, except the easy read-only ones. RPC calls that change the tablet record will also call updateState. -See rpc_server.go for all cases, and which actions take the actionMutex, +See rpc_server.go for all cases, and which actions take the actionSema, and which run changeCallback. */ package tabletmanager @@ -42,6 +42,7 @@ import ( "time" "vitess.io/vitess/go/flagutil" + "vitess.io/vitess/go/sync2" "vitess.io/vitess/go/vt/vterrors" "golang.org/x/net/context" @@ -140,17 +141,11 @@ type TabletManager struct { // when we transition back from something like MASTER. baseTabletType topodatapb.TabletType - // actionMutex is there to run only one action at a time. - // This mutex can be held for long periods of time (hours), - // like in the case of a restore. This mutex must be obtained + // actionSema is there to run only one action at a time. + // This semaphore can be held for long periods of time (hours), + // like in the case of a restore. This semaphore must be obtained // first before other mutexes. - actionMutex sync.Mutex - - // actionMutexLocked is set to true after we acquire actionMutex, - // and reset to false when we release it. - // It is meant as a sanity check to make sure the methods that need - // to have the actionMutex have it. - actionMutexLocked bool + actionSema *sync2.Semaphore // orc is an optional client for Orchestrator HTTP API calls. // If this is nil, those calls will be skipped. @@ -238,6 +233,7 @@ func (tm *TabletManager) Start(tablet *topodatapb.Tablet, healthCheckInterval ti tm.replManager = newReplManager(tm.BatchCtx, tm, healthCheckInterval) tm.tabletAlias = tablet.Alias tm.tmState = newTMState(tm, tablet) + tm.actionSema = sync2.NewSemaphore(1, 0) demoteType, err := topoproto.ParseTabletType(*demoteMasterType) if err != nil { diff --git a/go/vt/vttablet/tabletmanager/tm_state.go b/go/vt/vttablet/tabletmanager/tm_state.go index a18fc118853..86ca17fced3 100644 --- a/go/vt/vttablet/tabletmanager/tm_state.go +++ b/go/vt/vttablet/tabletmanager/tm_state.go @@ -49,7 +49,7 @@ type tmState struct { // while changing the state of the system to match these values. // This can be held for many seconds while tmState connects to // external components to change their state. - // Obtaining tm.actionMutex before calling a tmState function is + // Obtaining tm.actionSema before calling a tmState function is // not required. // Because mu can be held for long, we publish the current state // of these variables into displayState, which can be accessed