Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1252,9 +1252,9 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
// be in another unsharded keyspace.
smMu := sync.Mutex{}
tableCount := len(sequencesByBackingTable)
tablesFound := 0 // Used to short circuit the search
searchCompleted := make(chan struct{}) // The search has completed
searchKeyspace := func(sctx context.Context, keyspace string) error { // The function used to search each keyspace
tablesFound := 0 // Used to short circuit the search
// Define the function used to search each keyspace.
searchKeyspace := func(sctx context.Context, done chan struct{}, keyspace string) error {
kvs, kerr := ts.TopoServer().GetVSchema(sctx, keyspace)
if kerr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for keyspace %s: %v",
Expand All @@ -1267,7 +1267,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
select {
case <-sctx.Done():
return sctx.Err()
case <-searchCompleted:
case <-done: // We've found everything we need in other goroutines
return nil
default:
}
Expand All @@ -1279,13 +1279,16 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
sm != nil && tableName == sm.backingTableName {
tablesFound++ // This is also protected by the mutex
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
if tablesFound == tableCount { // Short circuit the search
select {
case <-searchCompleted: // It's already been closed
case <-done: // It's already been closed
return true
default:
close(searchCompleted) // Mark the search as completed
close(done) // Mark the search as completed
return true
}
}
Expand All @@ -1302,10 +1305,11 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get keyspaces: %v", err)
}
searchGroup, gctx := errgroup.WithContext(ctx)
searchCompleted := make(chan struct{})
for _, keyspace := range keyspaces {
keyspace := keyspace // https://golang.org/doc/faq#closures_and_goroutines
searchGroup.Go(func() error {
return searchKeyspace(gctx, keyspace)
return searchKeyspace(gctx, searchCompleted, keyspace)
})
}
if err := searchGroup.Wait(); err != nil {
Expand Down Expand Up @@ -1355,6 +1359,9 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa
}
sm.backingTableName = tableName
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
} else {
allFullyQualified = false
Expand Down
21 changes: 14 additions & 7 deletions go/vt/wrangler/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1974,9 +1974,9 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
// be in another unsharded keyspace.
smMu := sync.Mutex{}
tableCount := len(sequencesByBackingTable)
tablesFound := 0 // Used to short circuit the search
searchCompleted := make(chan struct{}) // The search has completed
searchKeyspace := func(sctx context.Context, keyspace string) error { // The function used to search each keyspace
tablesFound := 0 // Used to short circuit the search
// Define the function used to search each keyspace.
searchKeyspace := func(sctx context.Context, done chan struct{}, keyspace string) error {
kvs, kerr := ts.TopoServer().GetVSchema(sctx, keyspace)
if kerr != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for keyspace %s: %v",
Expand All @@ -1989,7 +1989,7 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
select {
case <-sctx.Done():
return sctx.Err()
case <-searchCompleted:
case <-done: // We've found everything we need in other goroutines
return nil
default:
}
Expand All @@ -2001,13 +2001,16 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
sm != nil && tableName == sm.backingTableName {
tablesFound++ // This is also protected by the mutex
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
if tablesFound == tableCount { // Short circuit the search
select {
case <-searchCompleted: // It's already been closed
case <-done: // It's already been closed
return true
default:
close(searchCompleted) // Mark the search as completed
close(done) // Mark the search as completed
return true
}
}
Expand All @@ -2024,10 +2027,11 @@ func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[s
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get keyspaces: %v", err)
}
searchGroup, gctx := errgroup.WithContext(ctx)
searchCompleted := make(chan struct{})
for _, keyspace := range keyspaces {
keyspace := keyspace // https://golang.org/doc/faq#closures_and_goroutines
searchGroup.Go(func() error {
return searchKeyspace(gctx, keyspace)
return searchKeyspace(gctx, searchCompleted, keyspace)
})
}
if err := searchGroup.Wait(); err != nil {
Expand Down Expand Up @@ -2077,6 +2081,9 @@ func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspa
}
sm.backingTableName = tableName
sm.backingTableKeyspace = keyspace
// Set the default keyspace name. We will later check to
// see if the tablet we send requests to is using a dbname
// override and use that if it is.
sm.backingTableDBName = "vt_" + keyspace
} else {
allFullyQualified = false
Expand Down