diff --git a/go.mod b/go.mod index 0fda1348b06..0fbf618f358 100644 --- a/go.mod +++ b/go.mod @@ -48,6 +48,7 @@ require ( github.com/magiconair/properties v1.8.1 github.com/mattn/go-runewidth v0.0.3 // indirect github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 + github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4 diff --git a/go.sum b/go.sum index 388c27fd24b..c41fd8aff02 100644 --- a/go.sum +++ b/go.sum @@ -420,6 +420,8 @@ github.com/mitchellh/cli v1.1.0 h1:tEElEatulEHDeedTxwckzyYMA5c86fbmNIUL1hBIiTg= github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI= github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= diff --git a/go/vt/vttablet/endtoend/vstreamer_test.go b/go/vt/vttablet/endtoend/vstreamer_test.go index 7c1f6e39234..c7f5e30b5dc 100644 --- a/go/vt/vttablet/endtoend/vstreamer_test.go +++ b/go/vt/vttablet/endtoend/vstreamer_test.go @@ -38,6 +38,20 @@ type test struct { output []string } +// the schema version tests can get events related to the creation of the schema version table depending on +// whether the table already exists or not. To avoid different behaviour when tests are run together +// this function adds to events expected if table is not present +func getSchemaVersionTableCreationEvents() []string { + tableCreationEvents := []string{"gtid", "other", "gtid", "other"} + client := framework.NewClient() + _, err := client.Execute("describe _vt.schema_version", nil) + if err != nil { + log.Errorf("_vt.schema_version not found, will expect its table creation events") + return tableCreationEvents + } + return nil +} + func TestSchemaVersioning(t *testing.T) { // Let's disable the already running tracker to prevent it from // picking events from the previous test, and then re-enable it at the end. @@ -67,14 +81,15 @@ func TestSchemaVersioning(t *testing.T) { var cases = []test{ { query: "create table vitess_version (id1 int, id2 int)", - output: []string{ + output: append(append([]string{ `gtid`, //gtid+other => vstream current pos `other`, `gtid`, //gtid+ddl => actual query - `type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `, + `type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `}, + getSchemaVersionTableCreationEvents()...), `version`, `gtid`, - }, + ), }, { query: "insert into vitess_version values(1, 10)", @@ -168,6 +183,7 @@ func TestSchemaVersioning(t *testing.T) { } runCases(ctx, t, cases, eventCh) cancel() + log.Infof("\n\n\n=============================================== PAST EVENTS WITH TRACK VERSIONS START HERE ======================\n\n\n") ctx, cancel = context.WithCancel(context.Background()) defer cancel() @@ -197,9 +213,10 @@ func TestSchemaVersioning(t *testing.T) { }() // playing events from the past: same events as original since historian is providing the latest schema - output := []string{ + output := append(append([]string{ `gtid`, - `type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `, + `type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `}, + getSchemaVersionTableCreationEvents()...), `version`, `gtid`, `type:FIELD field_event: fields: > `, @@ -224,7 +241,7 @@ func TestSchemaVersioning(t *testing.T) { `type:FIELD field_event: fields: fields: fields: > `, `type:ROW row_event: > > `, `gtid`, - } + ) expectLogs(ctx, t, "Past stream", eventCh, output) @@ -260,9 +277,10 @@ func TestSchemaVersioning(t *testing.T) { }() // playing events from the past: same as earlier except one below, see comments - output = []string{ + output = append(append([]string{ `gtid`, - `type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `, + `type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `}, + getSchemaVersionTableCreationEvents()...), `version`, `gtid`, `type:FIELD field_event: fields: > `, @@ -290,7 +308,7 @@ func TestSchemaVersioning(t *testing.T) { `type:FIELD field_event: fields: fields: fields: > `, `type:ROW row_event: > > `, `gtid`, - } + ) expectLogs(ctx, t, "Past stream", eventCh, output) cancel() @@ -302,6 +320,94 @@ func TestSchemaVersioning(t *testing.T) { log.Info("=== END OF TEST") } +func TestSchemaVersioningLongDDL(t *testing.T) { + // Let's disable the already running tracker to prevent it from + // picking events from the previous test, and then re-enable it at the end. + tsv := framework.Server + tsv.EnableHistorian(false) + tsv.SetTracking(false) + defer tsv.EnableHistorian(true) + defer tsv.SetTracking(true) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + tsv.EnableHistorian(true) + tsv.SetTracking(true) + + target := &querypb.Target{ + Keyspace: "vttest", + Shard: "0", + TabletType: tabletpb.TabletType_MASTER, + Cell: "", + } + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "/.*/", + }}, + } + longDDL := "create table vitess_version (" + for i := 0; i < 100; i++ { + col := fmt.Sprintf("id%d_%s int", i, strings.Repeat("0", 10)) + if i != 99 { + col += ", " + } + longDDL += col + } + longDDL += ")" + + var cases = []test{ + { + query: longDDL, + output: append(append([]string{ + `gtid`, //gtid+other => vstream current pos + `other`, + `gtid`, //gtid+ddl => actual query + fmt.Sprintf(`type:DDL ddl:"%s" `, longDDL)}, + getSchemaVersionTableCreationEvents()...), + `version`, + `gtid`, + ), + }, + } + eventCh := make(chan []*binlogdatapb.VEvent) + var startPos string + send := func(events []*binlogdatapb.VEvent) error { + var evs []*binlogdatapb.VEvent + for _, event := range events { + if event.Type == binlogdatapb.VEventType_GTID { + if startPos == "" { + startPos = event.Gtid + } + } + if event.Type == binlogdatapb.VEventType_HEARTBEAT { + continue + } + log.Infof("Received event %v", event) + evs = append(evs, event) + } + select { + case eventCh <- evs: + case <-ctx.Done(): + return nil + } + return nil + } + go func() { + defer close(eventCh) + if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil { + fmt.Printf("Error in tsv.VStream: %v", err) + t.Error(err) + } + }() + runCases(ctx, t, cases, eventCh) + + cancel() + + client := framework.NewClient() + client.Execute("drop table vitess_version", nil) + client.Execute("drop table _vt.schema_version", nil) +} + func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*binlogdatapb.VEvent) { client := framework.NewClient() @@ -316,6 +422,8 @@ func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*b if err != nil || !ok { t.Fatalf("Query %s never got inserted into the schema_version table", query) } + framework.Server.SchemaEngine().Reload(ctx) + } } } @@ -325,7 +433,7 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan [] timer := time.NewTimer(5 * time.Second) defer timer.Stop() var evs []*binlogdatapb.VEvent - log.Infof("In expectLogs for query %s, output len %s", query, len(output)) + log.Infof("In expectLogs for query %s, output len %d", query, len(output)) for { select { case allevs, ok := <-eventCh: diff --git a/go/vt/vttablet/tabletserver/schema/tracker.go b/go/vt/vttablet/tabletserver/schema/tracker.go index 274eba88175..12b89cc22e2 100644 --- a/go/vt/vttablet/tabletserver/schema/tracker.go +++ b/go/vt/vttablet/tabletserver/schema/tracker.go @@ -40,8 +40,9 @@ const createSchemaTrackingTable = `CREATE TABLE IF NOT EXISTS _vt.schema_version schemax BLOB NOT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB` +const alterSchemaTrackingTable = "alter table _vt.schema_version modify column ddl BLOB NOT NULL" -var withDDL = withddl.New([]string{createSchemaTrackingTable}) +var withDDL = withddl.New([]string{createSchemaTrackingTable, alterSchemaTrackingTable}) // VStreamer defines the functions of VStreamer // that the replicationWatcher needs. diff --git a/go/vt/withddl/withddl.go b/go/vt/withddl/withddl.go index 9cfb3c3329d..95fb3fd3aaf 100644 --- a/go/vt/withddl/withddl.go +++ b/go/vt/withddl/withddl.go @@ -64,7 +64,7 @@ func (wd *WithDDL) Exec(ctx context.Context, query string, f interface{}) (*sqlt return nil, err } - log.Info("Updating schema for %v and retrying: %v", query, err) + log.Infof("Updating schema for %v and retrying: %v", query, err) for _, query := range wd.ddls { _, merr := exec(query) if merr == nil {