From db9075c623fcf78fec8331ac4c4a68c2bc857ced Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Thu, 18 Jun 2020 23:36:39 -0700 Subject: [PATCH 1/3] vttablet: simplified tracker Signed-off-by: Sugu Sougoumarane --- go/vt/vttablet/endtoend/vstreamer_test.go | 4 +- .../tabletserver/replication_watcher.go | 29 ++-- .../tabletserver/replication_watcher_test.go | 161 ------------------ go/vt/vttablet/tabletserver/schema/tracker.go | 101 ++++++++--- .../tabletserver/schema/tracker_test.go | 72 ++++++-- go/vt/vttablet/tabletserver/tabletserver.go | 17 +- .../tabletserver/vstreamer/vstreamer_test.go | 16 -- 7 files changed, 149 insertions(+), 251 deletions(-) delete mode 100644 go/vt/vttablet/tabletserver/replication_watcher_test.go diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index dcd961cd899..43839122400 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -192,7 +192,7 @@ func TestSchemaVersioning(t *testing.T) { select { case eventCh <- evs: case <-ctx.Done(): - t.Fatal("Context Done() in send") + return nil } return nil } @@ -379,6 +379,7 @@ func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*b query := test.query client.Execute(query, nil) if len(test.output) > 0 { + t.Logf("expecting: %#v", test.output) expectLogs(ctx, t, query, eventCh, test.output) } if strings.HasPrefix(query, "create") || strings.HasPrefix(query, "alter") || strings.HasPrefix(query, "drop") { @@ -433,6 +434,7 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan [] for i, want := range output { // CurrentTime is not testable. evs[i].CurrentTime = 0 + t.Logf("checking: %v: %v: %v", i, want, evs[i]) switch want { case "begin": if evs[i].Type != binlogdatapb.VEventType_BEGIN { diff --git a/go/vt/vttablet/tabletserver/replication_watcher.go b/go/vt/vttablet/tabletserver/replication_watcher.go index 178bdeab8d6..4f425da122b 100644 --- a/go/vt/vttablet/tabletserver/replication_watcher.go +++ b/go/vt/vttablet/tabletserver/replication_watcher.go @@ -17,10 +17,10 @@ limitations under the License. package tabletserver import ( + "sync" "time" "golang.org/x/net/context" - "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -41,18 +41,17 @@ type ReplicationWatcher struct { env tabletenv.Env watchReplication bool vs VStreamer - subscriber schema.Subscriber cancel context.CancelFunc + wg sync.WaitGroup } // NewReplicationWatcher creates a new ReplicationWatcher. -func NewReplicationWatcher(env tabletenv.Env, vs VStreamer, config *tabletenv.TabletConfig, schemaTracker schema.Subscriber) *ReplicationWatcher { +func NewReplicationWatcher(env tabletenv.Env, vs VStreamer, config *tabletenv.TabletConfig) *ReplicationWatcher { return &ReplicationWatcher{ env: env, vs: vs, watchReplication: config.WatchReplication, - subscriber: schemaTracker, } } @@ -64,7 +63,8 @@ func (rpw *ReplicationWatcher) Open() { ctx, cancel := context.WithCancel(tabletenv.LocalContext()) rpw.cancel = cancel - go rpw.Process(ctx) + rpw.wg.Add(1) + go rpw.process(ctx) } // Close stops the ReplicationWatcher service. @@ -74,11 +74,12 @@ func (rpw *ReplicationWatcher) Close() { } rpw.cancel() rpw.cancel = nil + rpw.wg.Wait() } -// Process processes the replication stream. -func (rpw *ReplicationWatcher) Process(ctx context.Context) { +func (rpw *ReplicationWatcher) process(ctx context.Context) { defer rpw.env.LogError() + defer rpw.wg.Wait() filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ @@ -86,19 +87,9 @@ func (rpw *ReplicationWatcher) Process(ctx context.Context) { }}, } - var gtid string for { - // The tracker will reload the schema and save it into _vt.schema_tracking when the vstream encounters a DDL. + // VStreamer will reload the schema when it encounters a DDL. err := rpw.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { - for _, event := range events { - if event.Type == binlogdatapb.VEventType_GTID { - gtid = event.Gtid - } - if event.Type == binlogdatapb.VEventType_DDL { - log.Infof("Calling schema updated for %s %s", gtid, event.Ddl) - rpw.subscriber.SchemaUpdated(gtid, event.Ddl, event.Timestamp) - } - } return nil }) select { @@ -106,7 +97,7 @@ func (rpw *ReplicationWatcher) Process(ctx context.Context) { return case <-time.After(5 * time.Second): } - log.Infof("VStream ended: %v, retrying in 5 seconds", err) + log.Infof("ReplicatinWatcher VStream ended: %v, retrying in 5 seconds", err) time.Sleep(5 * time.Second) } } diff --git a/go/vt/vttablet/tabletserver/replication_watcher_test.go b/go/vt/vttablet/tabletserver/replication_watcher_test.go deleted file mode 100644 index 67637b9cf34..00000000000 --- a/go/vt/vttablet/tabletserver/replication_watcher_test.go +++ /dev/null @@ -1,161 +0,0 @@ -/* -Copyright 2019 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package tabletserver - -import ( - "testing" - "time" - - "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" - - "github.com/stretchr/testify/require" - "golang.org/x/net/context" - "vitess.io/vitess/go/test/utils" - "vitess.io/vitess/go/vt/dbconfigs" - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" -) - -func (t *mockSubscriber) SchemaUpdated(gtid string, ddl string, timestamp int64) error { - t.gtids = append(t.gtids, gtid) - t.ddls = append(t.ddls, ddl) - t.timestamps = append(t.timestamps, timestamp) - return nil -} - -var _ schema.Subscriber = (*mockSubscriber)(nil) -var _ VStreamer = (*fakeVstreamer)(nil) -var _ tabletenv.Env = (*fakeEnv)(nil) - -var env = &fakeEnv{} - -func TestReplicationWatcher(t *testing.T) { - testCases := []struct { - name string - input [][]*binlogdatapb.VEvent - expected []string - }{ - { - name: "empty", - input: [][]*binlogdatapb.VEvent{{}}, - expected: nil, - }, { - name: "single create table", - input: [][]*binlogdatapb.VEvent{{{ - Type: binlogdatapb.VEventType_DDL, - Timestamp: 643, - CurrentTime: 943, - Gtid: "gtid", - Ddl: "create table", - }}}, - expected: []string{"create table"}, - }, { - name: "mixed load", - input: [][]*binlogdatapb.VEvent{{{ - Type: binlogdatapb.VEventType_DDL, - Timestamp: 643, - CurrentTime: 943, - Gtid: "gtid", - Ddl: "create table", - }, { - Type: binlogdatapb.VEventType_INSERT, - Timestamp: 644, - CurrentTime: 944, - Gtid: "gtid2", - Ddl: "insert", - }, { - Type: binlogdatapb.VEventType_DDL, - Timestamp: 645, - CurrentTime: 945, - Gtid: "gtid3", - Ddl: "alter table", - }}}, - expected: []string{"create table", "alter table"}, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - subscriber := &mockSubscriber{} - streamer := &fakeVstreamer{ - events: testCase.input, - } - watcher := &ReplicationWatcher{ - env: env, - watchReplication: true, - vs: streamer, - subscriber: subscriber, - } - - // when - watcher.Open() - time.Sleep(1 * time.Millisecond) - watcher.Close() - - // then - require.True(t, streamer.called, "streamer never called") - utils.MustMatch(t, testCase.expected, subscriber.ddls, "didnt see ddls") - }) - } -} - -type mockSubscriber struct { - gtids []string - ddls []string - timestamps []int64 -} - -type fakeVstreamer struct { - called bool - events [][]*binlogdatapb.VEvent -} - -type fakeEnv struct{} - -func (f *fakeEnv) CheckMySQL() { -} - -func (f *fakeEnv) Config() *tabletenv.TabletConfig { - return nil -} - -func (f *fakeEnv) DBConfigs() *dbconfigs.DBConfigs { - return nil -} - -func (f *fakeEnv) Exporter() *servenv.Exporter { - return nil -} - -func (f *fakeEnv) Stats() *tabletenv.Stats { - return nil -} - -func (f *fakeEnv) LogError() { -} - -func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { - f.called = true - for _, events := range f.events { - err := send(events) - if err != nil { - return err - } - } - return nil -} diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 918ce42421e..10c66a4d5cf 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -20,11 +20,15 @@ import ( "bytes" "context" "fmt" + "sync" + "time" "github.com/gogo/protobuf/proto" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) const createSchemaTrackingTable = `CREATE TABLE IF NOT EXISTS _vt.schema_version ( @@ -36,53 +40,98 @@ const createSchemaTrackingTable = `CREATE TABLE IF NOT EXISTS _vt.schema_version PRIMARY KEY (id) ) ENGINE=InnoDB` -//Subscriber will get notified when the schema has been updated -type Subscriber interface { - SchemaUpdated(gtid string, ddl string, timestamp int64) error +// VStreamer defines the functions of VStreamer +// that the replicationWatcher needs. +type VStreamer interface { + Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error } -var _ Subscriber = (*Tracker)(nil) - -// Tracker implements Subscriber and persists versions into the ddb +// Tracker watches the replication and saves the latest schema into _vt.schema_version when a DDL is encountered. type Tracker struct { - engine *Engine - enabled bool + env tabletenv.Env + vs VStreamer + engine *Engine + cancel context.CancelFunc + wg sync.WaitGroup } // NewTracker creates a Tracker, needs an Open SchemaEngine (which implements the trackerEngine interface) -func NewTracker(engine *Engine) *Tracker { - return &Tracker{engine: engine} +func NewTracker(env tabletenv.Env, vs VStreamer, engine *Engine) *Tracker { + return &Tracker{ + env: env, + vs: vs, + engine: engine, + } } // Open enables the tracker functionality -func (t *Tracker) Open() { - t.enabled = true +func (tr *Tracker) Open() { + ctx, cancel := context.WithCancel(tabletenv.LocalContext()) + tr.cancel = cancel + tr.wg.Add(1) + go tr.process(ctx) } // Close disables the tracker functionality -func (t *Tracker) Close() { - t.enabled = false +func (tr *Tracker) Close() { + if tr.cancel == nil { + return + } + tr.cancel() + tr.cancel = nil + tr.wg.Wait() } -// SchemaUpdated is called by a vstream when it encounters a DDL -func (t *Tracker) SchemaUpdated(gtid string, ddl string, timestamp int64) error { - if !t.enabled { - log.Infof("Tracker not enabled, ignoring SchemaUpdated event") - return nil +func (tr *Tracker) process(ctx context.Context) { + defer tr.env.LogError() + defer tr.wg.Done() + + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*", + }}, } - log.Infof("Processing SchemaUpdated event for gtid %s, ddl %s", gtid, ddl) + + var gtid string + for { + err := tr.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error { + for _, event := range events { + if event.Type == binlogdatapb.VEventType_GTID { + gtid = event.Gtid + } + if event.Type == binlogdatapb.VEventType_DDL { + if err := tr.schemaUpdated(gtid, event.Ddl, event.Timestamp); err != nil { + tr.env.Stats().ErrorCounters.Add(vtrpcpb.Code_INTERNAL.String(), 1) + log.Errorf("Error updating schema: %v", err) + } + } + } + return nil + }) + select { + case <-ctx.Done(): + return + case <-time.After(5 * time.Second): + } + log.Infof("Tracker's vStream ended: %v, retrying in 5 seconds", err) + time.Sleep(5 * time.Second) + } +} + +func (tr *Tracker) schemaUpdated(gtid string, ddl string, timestamp int64) error { + log.Infof("Processing schemaUpdated event for gtid %s, ddl %s", gtid, ddl) if gtid == "" || ddl == "" { - return fmt.Errorf("got invalid gtid or ddl in SchemaUpdated") + return fmt.Errorf("got invalid gtid or ddl in schemaUpdated") } ctx := context.Background() // Engine will have reloaded the schema because vstream will reload it on a DDL - tables := t.engine.GetSchema() + tables := tr.engine.GetSchema() dbSchema := &binlogdatapb.MinimalSchema{ Tables: []*binlogdatapb.MinimalTable{}, } for name, table := range tables { - t := &binlogdatapb.MinimalTable{ + tr := &binlogdatapb.MinimalTable{ Name: name, Fields: table.Fields, } @@ -90,12 +139,12 @@ func (t *Tracker) SchemaUpdated(gtid string, ddl string, timestamp int64) error for _, pk := range table.PKColumns { pks = append(pks, int64(pk)) } - t.PKColumns = pks - dbSchema.Tables = append(dbSchema.Tables, t) + tr.PKColumns = pks + dbSchema.Tables = append(dbSchema.Tables, tr) } blob, _ := proto.Marshal(dbSchema) - conn, err := t.engine.GetConnection(ctx) + conn, err := tr.engine.GetConnection(ctx) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 08631f0720a..9472585ffb2 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -17,32 +17,76 @@ limitations under the License. package schema import ( - "fmt" - "regexp" "testing" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) func TestTracker(t *testing.T) { se, db, cancel := getTestSchemaEngine(t) defer cancel() - tracker := NewTracker(se) - tracker.Open() - gtid1 := "MySQL56/7b04699f-f5e9-11e9-bf88-9cb6d089e1c3:1-10" ddl1 := "create table tracker_test (id int)" - ts1 := int64(1427325876) - var query string - query = "CREATE TABLE IF NOT EXISTS _vt.schema_version.*" + query := "CREATE TABLE IF NOT EXISTS _vt.schema_version.*" db.AddQueryPattern(query, &sqltypes.Result{}) - query = fmt.Sprintf("insert into _vt.schema_version.*%s.*%s.*%d.*", gtid1, regexp.QuoteMeta(ddl1), ts1) - db.AddQueryPattern(query, &sqltypes.Result{}) + db.AddQueryPattern("insert into _vt.schema_version.*", &sqltypes.Result{}) + + vs := &fakeVstreamer{ + done: make(chan struct{}), + events: [][]*binlogdatapb.VEvent{{ + { + Type: binlogdatapb.VEventType_GTID, + Gtid: gtid1, + }, { + Type: binlogdatapb.VEventType_DDL, + Ddl: ddl1, + }, + { + Type: binlogdatapb.VEventType_GTID, + Gtid: "", + }, { + Type: binlogdatapb.VEventType_DDL, + Ddl: ddl1, + }, + { + Type: binlogdatapb.VEventType_GTID, + Gtid: gtid1, + }, { + Type: binlogdatapb.VEventType_DDL, + Ddl: "", + }, + }}, + } + initial := se.env.Stats().ErrorCounters.Counts()["INTERNAL"] + tracker := NewTracker(se.env, vs, se) + tracker.Open() + <-vs.done + cancel() + tracker.Close() + final := se.env.Stats().ErrorCounters.Counts()["INTERNAL"] + assert.Equal(t, initial+2, final) +} + +var _ VStreamer = (*fakeVstreamer)(nil) + +type fakeVstreamer struct { + done chan struct{} + events [][]*binlogdatapb.VEvent +} - require.NoError(t, tracker.SchemaUpdated(gtid1, ddl1, ts1)) - require.Error(t, tracker.SchemaUpdated("", ddl1, ts1)) - require.Error(t, tracker.SchemaUpdated(gtid1, "", ts1)) +func (f *fakeVstreamer) Stream(ctx context.Context, startPos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, send func([]*binlogdatapb.VEvent) error) error { + for _, events := range f.events { + err := send(events) + if err != nil { + return err + } + } + close(f.done) + <-ctx.Done() + return nil } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 34246108c87..9e2b0f1ca3e 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -240,8 +240,8 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer) tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(topoServer, "TabletSrvTopo") }) tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, tsv.sh) - tsv.tracker = schema.NewTracker(tsv.se) - tsv.watcher = NewReplicationWatcher(tsv, tsv.vstreamer, tsv.config, tsv.tracker) + tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se) + tsv.watcher = NewReplicationWatcher(tsv, tsv.vstreamer, tsv.config) tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { tsv.mu.Lock() @@ -267,19 +267,12 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to // StartTracker starts a new replication watcher // Only to be used for testing func (tsv *TabletServer) StartTracker() { - tsv.config.TrackSchemaVersions = true - tsv.config.WatchReplication = true tsv.tracker.Open() - //need to close and reopen watcher since it is already opened in the tsv init and is idempotent ... - tsv.watcher.Close() - tsv.watcher.watchReplication = true - tsv.watcher.Open() } // StopTracker turns the watcher off // Only to be used for testing func (tsv *TabletServer) StopTracker() { - tsv.config.TrackSchemaVersions = false tsv.tracker.Close() } @@ -572,12 +565,8 @@ func (tsv *TabletServer) serveNewType() (err error) { tsv.messager.Open() tsv.hr.Close() tsv.hw.Open() - log.Info("Opening tracker, trackschemaversions is %t", tsv.config.TrackSchemaVersions) tsv.tracker.Open() - if tsv.config.TrackSchemaVersions { - log.Info("Starting watcher") - tsv.watcher.Open() - } + tsv.watcher.Close() } else { tsv.txController.AcceptReadOnly() tsv.messager.Close() diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index 83ecf289db8..8431469a786 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -26,8 +26,6 @@ import ( "testing" "time" - "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" - "vitess.io/vitess/go/vt/log" "github.com/stretchr/testify/assert" @@ -1631,8 +1629,6 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [ } } -var lastPos string - func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) <-chan []*binlogdatapb.VEvent { if position == "" { position = masterPosition(t) @@ -1661,18 +1657,6 @@ func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogda } } return engine.Stream(ctx, pos, tablePKs, filter, func(evs []*binlogdatapb.VEvent) error { - if t.Name() == "TestVersion" { // emulate tracker only for the version test - for _, ev := range evs { - log.Infof("Original stream: %s event found %v", ev.Type, ev) - if ev.Type == binlogdatapb.VEventType_GTID { - lastPos = ev.Gtid - } - if ev.Type == binlogdatapb.VEventType_DDL { - schemaTracker := schema.NewTracker(env.SchemaEngine) - schemaTracker.SchemaUpdated(lastPos, ev.Ddl, ev.Timestamp) - } - } - } t.Logf("Received events: %v", evs) select { case ch <- evs: From 7ce4df48ca1fdfc60f3b9896208556d9a7e365e6 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 19 Jun 2020 16:44:54 +0200 Subject: [PATCH 2/3] Make tests more repeatable Signed-off-by: Rohit Nayak --- go/test/endtoend/vreplication/cluster.go | 2 +- go/test/endtoend/vreplication/helper.go | 2 ++ .../vreplication/vreplication_test.go | 19 ++++++++++--------- 3 files changed, 13 insertions(+), 10 deletions(-) diff --git a/go/test/endtoend/vreplication/cluster.go b/go/test/endtoend/vreplication/cluster.go index 4f838506538..30ad922e341 100644 --- a/go/test/endtoend/vreplication/cluster.go +++ b/go/test/endtoend/vreplication/cluster.go @@ -374,7 +374,7 @@ func (vc *VitessCluster) WaitForVReplicationToCatchup(vttablet *cluster.Vttablet results := [3]string{"[INT64(0)]", "[INT64(1)]", "[INT64(0)]"} var lastChecked time.Time for ind, query := range queries { - waitDuration := 100 * time.Millisecond + waitDuration := 500 * time.Millisecond for duration > 0 { fmt.Printf("Executing query %s on %s\n", query, vttablet.Name) lastChecked = time.Now() diff --git a/go/test/endtoend/vreplication/helper.go b/go/test/endtoend/vreplication/helper.go index c774efeb7ab..e32f9afd043 100644 --- a/go/test/endtoend/vreplication/helper.go +++ b/go/test/endtoend/vreplication/helper.go @@ -159,7 +159,9 @@ func getQueryCount(url string, query string) int { //Queries seem to include non-printable characters at times and hence equality fails unless these are removed re := regexp.MustCompile("[[:^ascii:]]") foundQuery := re.ReplaceAllLiteralString(row[queryIndex], "") + foundQuery = strings.ReplaceAll(foundQuery, "_", "") cleanQuery := re.ReplaceAllLiteralString(query, "") + cleanQuery = strings.ReplaceAll(cleanQuery, "_", "") if foundQuery == cleanQuery { count, _ = strconv.Atoi(row[countIndex]) } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index c7a9a6a843e..6e625ba338d 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -63,7 +63,6 @@ func TestBasicVreplicationWorkflow(t *testing.T) { verifyClusterHealth(t) insertInitialData(t) shardCustomer(t, true) - shardOrders(t) shardMerchant(t) @@ -175,9 +174,11 @@ func shardCustomer(t *testing.T, testReverse bool) { insertQuery2 := "insert into customer(name) values('tempCustomer2')" matchInsertQuery2 := "insert into customer(name, cid) values (:vtg1, :_cid0)" assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2)) - insertQuery2 = "insert into customer(name) values('tempCustomer3')" //ID 101, hence due to reverse_bits in shard 80- + + insertQuery2 = "insert into customer(name, cid) values('tempCustomer3', 101)" //ID 101, hence due to reverse_bits in shard 80- assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2)) - insertQuery2 = "insert into customer(name) values('tempCustomer4')" //ID 102, hence due to reverse_bits in shard -80 + + insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" //ID 102, hence due to reverse_bits in shard -80 assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2)) if testReverse { @@ -244,12 +245,12 @@ func shardCustomer(t *testing.T, testReverse bool) { assert.NoError(t, err, "Customer table not deleted from zone1-200") assert.True(t, found) - insertQuery2 = "insert into customer(name) values('tempCustomer8')" //ID 103, hence due to reverse_bits in shard 80- + insertQuery2 = "insert into customer(name, cid) values('tempCustomer8', 103)" //ID 103, hence due to reverse_bits in shard 80- assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2)) - insertQuery2 = "insert into customer(name) values('tempCustomer9')" //ID 104, hence due to reverse_bits in shard 80- - assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2)) - insertQuery2 = "insert into customer(name) values('tempCustomer10')" //ID 105, hence due to reverse_bits in shard -80 + insertQuery2 = "insert into customer(name, cid) values('tempCustomer10', 104)" //ID 105, hence due to reverse_bits in shard -80 assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2)) + insertQuery2 = "insert into customer(name, cid) values('tempCustomer9', 105)" //ID 104, hence due to reverse_bits in shard 80- + assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2)) execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'") assert.Empty(t, validateCountInTablet(t, customerTab1, "customer", "customer", 1)) @@ -266,7 +267,7 @@ func shardCustomer(t *testing.T, testReverse bool) { func reshardCustomer2to4Split(t *testing.T) { ksName := "customer" - counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 5, "zone1-900": 6} + counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5} reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil) assert.Empty(t, validateCount(t, vtgateConn, ksName, "customer", 20)) query := "insert into customer (name) values('yoko')" @@ -329,7 +330,7 @@ func reshardMerchant3to1Merge(t *testing.T) { func reshardCustomer3to2SplitMerge(t *testing.T) { //-40,40-80,80-c0 => merge/split, c0- stays the same ending up with 3 ksName := "customer" - counts := map[string]int{"zone1-600": 5, "zone1-700": 5, "zone1-800": 5, "zone1-900": 6} + counts := map[string]int{"zone1-1000": 7, "zone1-1100": 9, "zone1-1200": 5} reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil) } From 9dd5ed46231c80fdd6d727bae9e7cedcb239fdb2 Mon Sep 17 00:00:00 2001 From: Sugu Sougoumarane Date: Fri, 19 Jun 2020 15:18:14 -0700 Subject: [PATCH 3/3] vttablet: fix tracker endtoend test Signed-off-by: Sugu Sougoumarane --- go/vt/vtexplain/vtexplain_vttablet.go | 1 + go/vt/vttablet/endtoend/framework/server.go | 1 + go/vt/vttablet/endtoend/vstreamer_test.go | 18 +++++------ .../tabletserver/replication_watcher.go | 2 +- go/vt/vttablet/tabletserver/schema/tracker.go | 31 ++++++++++++++++--- .../tabletserver/schema/tracker_test.go | 11 +++++-- .../tabletserver/vstreamer/vstreamer.go | 10 ------ 7 files changed, 45 insertions(+), 29 deletions(-) diff --git a/go/vt/vtexplain/vtexplain_vttablet.go b/go/vt/vtexplain/vtexplain_vttablet.go index 26724bc0684..d592414f282 100644 --- a/go/vt/vtexplain/vtexplain_vttablet.go +++ b/go/vt/vtexplain/vtexplain_vttablet.go @@ -77,6 +77,7 @@ func newTablet(opts *Options, t *topodatapb.Tablet) *explainTablet { db := fakesqldb.New(nil) config := tabletenv.NewCurrentConfig() + config.TrackSchemaVersions = false if opts.ExecutionMode == ModeTwoPC { config.TwoPCCoordinatorAddress = "XXX" config.TwoPCAbandonAge = 1.0 diff --git a/go/vt/vttablet/endtoend/framework/server.go b/go/vt/vttablet/endtoend/framework/server.go index 1383b0a0d1e..a14ded01fc3 100644 --- a/go/vt/vttablet/endtoend/framework/server.go +++ b/go/vt/vttablet/endtoend/framework/server.go @@ -74,6 +74,7 @@ func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string) config.TwoPCAbandonAge = 1 config.TwoPCCoordinatorAddress = "fake" config.HotRowProtection.Mode = tabletenv.Enable + config.TrackSchemaVersions = true Target = querypb.Target{ Keyspace: "vttest", diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 43839122400..032f544b8e3 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -34,10 +34,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" tabletpb "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/vttablet/endtoend/framework" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" ) type test struct { @@ -50,9 +47,7 @@ func TestHistorianSchemaUpdate(t *testing.T) { defer cancel() tsv := framework.Server historian := tsv.Historian() - srvTopo := srvtopo.NewResilientServer(framework.TopoServer, "SchemaVersionE2ETestTopo") - vstreamer.NewEngine(tabletenv.NewEnv(tsv.Config(), "SchemaVersionE2ETest"), srvTopo, tsv.SchemaEngine(), historian) target := &querypb.Target{ Keyspace: "vttest", Shard: "0", @@ -96,14 +91,19 @@ func TestHistorianSchemaUpdate(t *testing.T) { } func TestSchemaVersioning(t *testing.T) { + // Let's disable the already running tracker to prevent it from + // picking events from the previous test, and then re-enable it at the end. + tsv := framework.Server + tsv.StopTracker() + tsv.Historian().SetTrackSchemaVersions(false) + defer tsv.StartTracker() + defer tsv.Historian().SetTrackSchemaVersions(true) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - tsv := framework.Server tsv.Historian().SetTrackSchemaVersions(true) tsv.StartTracker() - srvTopo := srvtopo.NewResilientServer(framework.TopoServer, "SchemaVersionE2ETestTopo") - vstreamer.NewEngine(tabletenv.NewEnv(tsv.Config(), "SchemaVersionE2ETest"), srvTopo, tsv.SchemaEngine(), tsv.Historian()) target := &querypb.Target{ Keyspace: "vttest", Shard: "0", @@ -379,7 +379,6 @@ func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*b query := test.query client.Execute(query, nil) if len(test.output) > 0 { - t.Logf("expecting: %#v", test.output) expectLogs(ctx, t, query, eventCh, test.output) } if strings.HasPrefix(query, "create") || strings.HasPrefix(query, "alter") || strings.HasPrefix(query, "drop") { @@ -434,7 +433,6 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan [] for i, want := range output { // CurrentTime is not testable. evs[i].CurrentTime = 0 - t.Logf("checking: %v: %v: %v", i, want, evs[i]) switch want { case "begin": if evs[i].Type != binlogdatapb.VEventType_BEGIN { diff --git a/go/vt/vttablet/tabletserver/replication_watcher.go b/go/vt/vttablet/tabletserver/replication_watcher.go index 4f425da122b..06b7f829550 100644 --- a/go/vt/vttablet/tabletserver/replication_watcher.go +++ b/go/vt/vttablet/tabletserver/replication_watcher.go @@ -79,7 +79,7 @@ func (rpw *ReplicationWatcher) Close() { func (rpw *ReplicationWatcher) process(ctx context.Context) { defer rpw.env.LogError() - defer rpw.wg.Wait() + defer rpw.wg.Done() filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 10c66a4d5cf..4b57d210d63 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -48,35 +48,56 @@ type VStreamer interface { // Tracker watches the replication and saves the latest schema into _vt.schema_version when a DDL is encountered. type Tracker struct { + enabled bool + + mu sync.Mutex + cancel context.CancelFunc + wg sync.WaitGroup + env tabletenv.Env vs VStreamer engine *Engine - cancel context.CancelFunc - wg sync.WaitGroup } // NewTracker creates a Tracker, needs an Open SchemaEngine (which implements the trackerEngine interface) func NewTracker(env tabletenv.Env, vs VStreamer, engine *Engine) *Tracker { return &Tracker{ - env: env, - vs: vs, - engine: engine, + enabled: env.Config().TrackSchemaVersions, + env: env, + vs: vs, + engine: engine, } } // Open enables the tracker functionality func (tr *Tracker) Open() { + if !tr.enabled { + log.Info("Schema tracker is not enabled.") + return + } + + tr.mu.Lock() + defer tr.mu.Unlock() + if tr.cancel != nil { + return + } + ctx, cancel := context.WithCancel(tabletenv.LocalContext()) tr.cancel = cancel tr.wg.Add(1) + log.Info("Schema tracker enabled.") go tr.process(ctx) } // Close disables the tracker functionality func (tr *Tracker) Close() { + tr.mu.Lock() + defer tr.mu.Unlock() if tr.cancel == nil { return } + + log.Info("Schema tracker stopped.") tr.cancel() tr.cancel = nil tr.wg.Wait() diff --git a/go/vt/vttablet/tabletserver/schema/tracker_test.go b/go/vt/vttablet/tabletserver/schema/tracker_test.go index 9472585ffb2..3bbe1d63ba4 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker_test.go +++ b/go/vt/vttablet/tabletserver/schema/tracker_test.go @@ -23,6 +23,7 @@ import ( "golang.org/x/net/context" "vitess.io/vitess/go/sqltypes" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) func TestTracker(t *testing.T) { @@ -62,13 +63,17 @@ func TestTracker(t *testing.T) { }, }}, } - initial := se.env.Stats().ErrorCounters.Counts()["INTERNAL"] - tracker := NewTracker(se.env, vs, se) + config := se.env.Config() + config.TrackSchemaVersions = true + env := tabletenv.NewEnv(config, "TrackerTest") + initial := env.Stats().ErrorCounters.Counts()["INTERNAL"] + tracker := NewTracker(env, vs, se) tracker.Open() <-vs.done cancel() tracker.Close() - final := se.env.Stats().ErrorCounters.Counts()["INTERNAL"] + // Two of those events should have caused an error. + final := env.Stats().ErrorCounters.Counts()["INTERNAL"] assert.Equal(t, initial+2, final) } diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 96c603fa634..ee298b8ff34 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -131,15 +131,12 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) { // that thread, which helps us avoid mutexes to update the plans. select { case vs.vevents <- vschema: - log.Infof("VSchema sent to vs.vevents") case <-vs.ctx.Done(): - log.Infof("ctx.Done() in setVSchema") } } // Cancel stops the streaming. func (vs *vstreamer) Cancel() { - log.Infof("vstreamer context is being cancelled") vs.cancel() } @@ -159,7 +156,6 @@ func (vs *vstreamer) Stream() error { // Stream streams binlog events. func (vs *vstreamer) replicate(ctx context.Context) error { - log.Infof("In replicate with pos %s", vs.pos) // Ensure sh is Open. If vttablet came up in a non_serving role, // the historian may not have been initialized. if err := vs.sh.Open(); err != nil { @@ -182,7 +178,6 @@ func (vs *vstreamer) replicate(ctx context.Context) error { // parseEvents parses and sends events. func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.BinlogEvent) error { - log.Infof("In parse events") // bufferAndTransmit uses bufferedEvents and curSize to buffer events. var ( bufferedEvents []*binlogdatapb.VEvent @@ -279,7 +274,6 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } vevents, err := vs.parseEvent(ev) if err != nil { - log.Infof("parseEvent returned error %v", err) return err } for _, vevent := range vevents { @@ -291,9 +285,7 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } } case vs.vschema = <-vs.vevents: - log.Infof("Received vschema in vs.vevents") if err := vs.rebuildPlans(); err != nil { - log.Infof("Error rebuilding plans %v", err) return err } // Increment this counter for testing. @@ -436,7 +428,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e }) } else { // If the DDL need not be sent, send a dummy OTHER event. - log.Infof("Not sending DDL for %s", q.SQL) vevents = append(vevents, &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_GTID, Gtid: mysql.EncodePosition(vs.pos), @@ -513,7 +504,6 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e if id == vs.journalTableID { vevents, err = vs.processJournalEvent(vevents, plan, rows) } else if id == vs.versionTableID { - log.Infof("In vstreamer registering version event") vs.sh.RegisterVersionEvent() vevent := &binlogdatapb.VEvent{ Type: binlogdatapb.VEventType_VERSION,