From 352322c360e56174acd123de761c683041bf59f5 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 4 Jun 2020 12:14:58 +0200 Subject: [PATCH] Schema Engine Notifier: fix race between schema change broadcasts and tabletserver shutdown Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/schema/engine.go | 61 +++++++++++++------- 1 file changed, 41 insertions(+), 20 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index ec99ab48293..b304fdb6541 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -59,6 +59,7 @@ type Engine struct { reloadTime time.Duration //the position at which the schema was last loaded. it is only used in conjunction with ReloadAt reloadAtPos mysql.Position + notifierMu sync.Mutex notifiers map[string]notifier // The following fields have their own synchronization @@ -67,6 +68,18 @@ type Engine struct { ticks *timer.Timer } +// Lock acquires the SE mutex with optional logging (useful for debugging deadlocks) +func (se *Engine) Lock(msg string) { + log.V(2).Infof("SE: acquiring Lock %s", msg) + se.mu.Lock() +} + +// Unlock releases the SE mutex with optional logging (useful for debugging deadlocks) +func (se *Engine) Unlock(msg string) { + log.V(2).Infof("SE: releasing Lock %s", msg) + se.mu.Unlock() +} + // NewEngine creates a new Engine. func NewEngine(env tabletenv.Env) *Engine { reloadTime := time.Duration(env.Config().SchemaReloadIntervalSeconds * 1e9) @@ -106,8 +119,8 @@ func (se *Engine) InitDBConfig(cp dbconfigs.Connector) { // Open initializes the Engine. Calling Open on an already // open engine is a no-op. func (se *Engine) Open() error { - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("Open") + defer se.Unlock("Open") if se.isOpen { return nil } @@ -134,8 +147,8 @@ func (se *Engine) Open() error { // IsOpen checks if engine is open func (se *Engine) IsOpen() bool { - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("IsOpen") + defer se.Unlock("IsOpen") return se.isOpen } @@ -147,8 +160,8 @@ func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) { // Close shuts down Engine and is idempotent. // It can be re-opened after Close. func (se *Engine) Close() { - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("Close") + defer se.Unlock("Close") if !se.isOpen { return } @@ -164,8 +177,8 @@ func (se *Engine) Close() { // they don't get accidentally reused after losing mastership. func (se *Engine) MakeNonMaster() { // This function is tested through endtoend test. - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("MakeNonMaster") + defer se.Unlock("MakeNonMaster") for _, t := range se.tables { if t.SequenceInfo != nil { t.SequenceInfo.Lock() @@ -187,8 +200,8 @@ func (se *Engine) Reload(ctx context.Context) error { // It maintains the position at which the schema was reloaded and if the same position is provided // (say by multiple vstreams) it returns the cached schema. In case of a newer or empty pos it always reloads the schema func (se *Engine) ReloadAt(ctx context.Context, pos mysql.Position) error { - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("ReloadAt") + defer se.Unlock("ReloadAt") if !se.isOpen { log.Warning("Schema reload called for an engine that is not yet open") return nil @@ -324,12 +337,13 @@ func (se *Engine) populatePrimaryKeys(ctx context.Context, conn *connpool.DBConn // function must not change the map or its contents. The only exception // is the sequence table where the values can be changed using the lock. func (se *Engine) RegisterNotifier(name string, f notifier) { - se.mu.Lock() - defer se.mu.Unlock() if !se.isOpen { return } + se.notifierMu.Lock() + defer se.notifierMu.Unlock() + se.notifiers[name] = f var created []string for tableName := range se.tables { @@ -340,17 +354,24 @@ func (se *Engine) RegisterNotifier(name string, f notifier) { // UnregisterNotifier unregisters the notifier function. func (se *Engine) UnregisterNotifier(name string) { - se.mu.Lock() - defer se.mu.Unlock() if !se.isOpen { return } + se.notifierMu.Lock() + defer se.notifierMu.Unlock() + delete(se.notifiers, name) } // broadcast must be called while holding a lock on se.mu. func (se *Engine) broadcast(created, altered, dropped []string) { + if !se.isOpen { + return + } + + se.notifierMu.Lock() + defer se.notifierMu.Unlock() s := make(map[string]*Table, len(se.tables)) for k, v := range se.tables { s[k] = v @@ -362,16 +383,16 @@ func (se *Engine) broadcast(created, altered, dropped []string) { // GetTable returns the info for a table. func (se *Engine) GetTable(tableName sqlparser.TableIdent) *Table { - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("GetTable") + defer se.Unlock("GetTable") return se.tables[tableName.String()] } // GetSchema returns the current The Tables are a shared // data structure and must be treated as read-only. func (se *Engine) GetSchema() map[string]*Table { - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("GetSchema") + defer se.Unlock("GetSchema") tables := make(map[string]*Table, len(se.tables)) for k, v := range se.tables { tables[k] = v @@ -422,7 +443,7 @@ func NewEngineForTests() *Engine { // SetTableForTests puts a Table in the map directly. func (se *Engine) SetTableForTests(table *Table) { - se.mu.Lock() - defer se.mu.Unlock() + se.Lock("SetTableForTests") + defer se.Unlock("SetTableForTests") se.tables[table.Name.String()] = table }