diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 0b8aa3a9ac4..8453ae7b011 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -191,7 +191,7 @@ func testQueryPlanCache(t *testing.T, cachedPlanSize, cachePlanSize2 int) { t.Helper() //sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test - time.Sleep(1 * time.Second) + framework.Server.WaitForSchemaReset(2 * time.Second) defer framework.Server.SetQueryPlanCacheCap(framework.Server.QueryPlanCacheCap()) framework.Server.SetQueryPlanCacheCap(cachedPlanSize) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6c94dd70c23..132267e7b52 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -399,6 +399,32 @@ func (tsv *TabletServer) ReloadSchema(ctx context.Context) error { return tsv.se.Reload(ctx) } +// WaitForSchemaReset blocks the TabletServer until there's been at least `timeout` duration without +// any schema changes. This is useful for tests that need to wait for all the currently existing schema +// changes to finish being applied. +func (tsv *TabletServer) WaitForSchemaReset(timeout time.Duration) { + onSchemaChange := make(chan struct{}, 1) + tsv.se.RegisterNotifier("_tsv_wait", func(_ map[string]*schema.Table, _, _, _ []string) { + onSchemaChange <- struct{}{} + }) + defer tsv.se.UnregisterNotifier("_tsv_wait") + + after := time.NewTimer(timeout) + defer after.Stop() + + for { + select { + case <-after.C: + return + case <-onSchemaChange: + if !after.Stop() { + <-after.C + } + after.Reset(timeout) + } + } +} + // ClearQueryPlanCache clears internal query plan cache func (tsv *TabletServer) ClearQueryPlanCache() { // We should ideally bracket this with start & endErequest,