From 74b147e69db09d07a44b87bc00f68eaf85e0450a Mon Sep 17 00:00:00 2001 From: Vicent Marti Date: Mon, 15 Mar 2021 18:07:36 +0100 Subject: [PATCH] tabletserver: more resilient wait for schema changes Signed-off-by: Vicent Marti --- go/vt/vttablet/endtoend/config_test.go | 2 +- go/vt/vttablet/tabletserver/tabletserver.go | 26 +++++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/endtoend/config_test.go b/go/vt/vttablet/endtoend/config_test.go index 0b8aa3a9ac4..8453ae7b011 100644 --- a/go/vt/vttablet/endtoend/config_test.go +++ b/go/vt/vttablet/endtoend/config_test.go @@ -191,7 +191,7 @@ func testQueryPlanCache(t *testing.T, cachedPlanSize, cachePlanSize2 int) { t.Helper() //sleep to avoid race between SchemaChanged event clearing out the plans cache which breaks this test - time.Sleep(1 * time.Second) + framework.Server.WaitForSchemaReset(2 * time.Second) defer framework.Server.SetQueryPlanCacheCap(framework.Server.QueryPlanCacheCap()) framework.Server.SetQueryPlanCacheCap(cachedPlanSize) diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 6c94dd70c23..132267e7b52 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -399,6 +399,32 @@ func (tsv *TabletServer) ReloadSchema(ctx context.Context) error { return tsv.se.Reload(ctx) } +// WaitForSchemaReset blocks the TabletServer until there's been at least `timeout` duration without +// any schema changes. This is useful for tests that need to wait for all the currently existing schema +// changes to finish being applied. +func (tsv *TabletServer) WaitForSchemaReset(timeout time.Duration) { + onSchemaChange := make(chan struct{}, 1) + tsv.se.RegisterNotifier("_tsv_wait", func(_ map[string]*schema.Table, _, _, _ []string) { + onSchemaChange <- struct{}{} + }) + defer tsv.se.UnregisterNotifier("_tsv_wait") + + after := time.NewTimer(timeout) + defer after.Stop() + + for { + select { + case <-after.C: + return + case <-onSchemaChange: + if !after.Stop() { + <-after.C + } + after.Reset(timeout) + } + } +} + // ClearQueryPlanCache clears internal query plan cache func (tsv *TabletServer) ClearQueryPlanCache() { // We should ideally bracket this with start & endErequest,