Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ require (
github.com/magiconair/properties v1.8.1
github.com/mattn/go-runewidth v0.0.3 // indirect
github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1
github.com/mitchellh/go-ps v1.0.0 // indirect
github.com/mitchellh/go-testing-interface v1.14.0 // indirect
github.com/mitchellh/mapstructure v1.2.3 // indirect
github.com/olekukonko/tablewriter v0.0.0-20180130162743-b8a9be070da4
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ github.com/mitchellh/cli v1.1.0 h1:tEElEatulEHDeedTxwckzyYMA5c86fbmNIUL1hBIiTg=
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc=
github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg=
github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI=
github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI=
github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8=
Expand Down
128 changes: 118 additions & 10 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,20 @@ type test struct {
output []string
}

// the schema version tests can get events related to the creation of the schema version table depending on
// whether the table already exists or not. To avoid different behaviour when tests are run together
// this function adds to events expected if table is not present
func getSchemaVersionTableCreationEvents() []string {
tableCreationEvents := []string{"gtid", "other", "gtid", "other"}
client := framework.NewClient()
_, err := client.Execute("describe _vt.schema_version", nil)
if err != nil {
log.Errorf("_vt.schema_version not found, will expect its table creation events")
return tableCreationEvents
}
return nil
}

func TestSchemaVersioning(t *testing.T) {
// Let's disable the already running tracker to prevent it from
// picking events from the previous test, and then re-enable it at the end.
Expand Down Expand Up @@ -67,14 +81,15 @@ func TestSchemaVersioning(t *testing.T) {
var cases = []test{
{
query: "create table vitess_version (id1 int, id2 int)",
output: []string{
output: append(append([]string{
`gtid`, //gtid+other => vstream current pos
`other`,
`gtid`, //gtid+ddl => actual query
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
},
),
},
{
query: "insert into vitess_version values(1, 10)",
Expand Down Expand Up @@ -168,6 +183,7 @@ func TestSchemaVersioning(t *testing.T) {
}
runCases(ctx, t, cases, eventCh)
cancel()

log.Infof("\n\n\n=============================================== PAST EVENTS WITH TRACK VERSIONS START HERE ======================\n\n\n")
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
Expand Down Expand Up @@ -197,9 +213,10 @@ func TestSchemaVersioning(t *testing.T) {
}()

// playing events from the past: same events as original since historian is providing the latest schema
output := []string{
output := append(append([]string{
`gtid`,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
Expand All @@ -224,7 +241,7 @@ func TestSchemaVersioning(t *testing.T) {
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
}
)

expectLogs(ctx, t, "Past stream", eventCh, output)

Expand Down Expand Up @@ -260,9 +277,10 @@ func TestSchemaVersioning(t *testing.T) {
}()

// playing events from the past: same as earlier except one below, see comments
output = []string{
output = append(append([]string{
`gtid`,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
Expand Down Expand Up @@ -290,7 +308,7 @@ func TestSchemaVersioning(t *testing.T) {
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > fields:<name:"id4" type:VARBINARY > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 lengths:4 values:"440FFFGGGG" > > > `,
`gtid`,
}
)

expectLogs(ctx, t, "Past stream", eventCh, output)
cancel()
Expand All @@ -302,6 +320,94 @@ func TestSchemaVersioning(t *testing.T) {
log.Info("=== END OF TEST")
}

func TestSchemaVersioningLongDDL(t *testing.T) {
// Let's disable the already running tracker to prevent it from
// picking events from the previous test, and then re-enable it at the end.
tsv := framework.Server
tsv.EnableHistorian(false)
tsv.SetTracking(false)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tsv.EnableHistorian(true)
tsv.SetTracking(true)

target := &querypb.Target{
Keyspace: "vttest",
Shard: "0",
TabletType: tabletpb.TabletType_MASTER,
Cell: "",
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*/",
}},
}
longDDL := "create table vitess_version ("
for i := 0; i < 100; i++ {
col := fmt.Sprintf("id%d_%s int", i, strings.Repeat("0", 10))
if i != 99 {
col += ", "
}
longDDL += col
}
longDDL += ")"

var cases = []test{
{
query: longDDL,
output: append(append([]string{
`gtid`, //gtid+other => vstream current pos
`other`,
`gtid`, //gtid+ddl => actual query
fmt.Sprintf(`type:DDL ddl:"%s" `, longDDL)},
getSchemaVersionTableCreationEvents()...),
`version`,
`gtid`,
),
},
}
eventCh := make(chan []*binlogdatapb.VEvent)
var startPos string
send := func(events []*binlogdatapb.VEvent) error {
var evs []*binlogdatapb.VEvent
for _, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
if startPos == "" {
startPos = event.Gtid
}
}
if event.Type == binlogdatapb.VEventType_HEARTBEAT {
continue
}
log.Infof("Received event %v", event)
evs = append(evs, event)
}
select {
case eventCh <- evs:
case <-ctx.Done():
return nil
}
return nil
}
go func() {
defer close(eventCh)
if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
}()
runCases(ctx, t, cases, eventCh)

cancel()

client := framework.NewClient()
client.Execute("drop table vitess_version", nil)
client.Execute("drop table _vt.schema_version", nil)
}

func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*binlogdatapb.VEvent) {
client := framework.NewClient()

Expand All @@ -316,6 +422,8 @@ func runCases(ctx context.Context, t *testing.T, tests []test, eventCh chan []*b
if err != nil || !ok {
t.Fatalf("Query %s never got inserted into the schema_version table", query)
}
framework.Server.SchemaEngine().Reload(ctx)

}
}
}
Expand All @@ -325,7 +433,7 @@ func expectLogs(ctx context.Context, t *testing.T, query string, eventCh chan []
timer := time.NewTimer(5 * time.Second)
defer timer.Stop()
var evs []*binlogdatapb.VEvent
log.Infof("In expectLogs for query %s, output len %s", query, len(output))
log.Infof("In expectLogs for query %s, output len %d", query, len(output))
for {
select {
case allevs, ok := <-eventCh:
Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletserver/schema/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ const createSchemaTrackingTable = `CREATE TABLE IF NOT EXISTS _vt.schema_version
schemax BLOB NOT NULL,
PRIMARY KEY (id)
) ENGINE=InnoDB`
const alterSchemaTrackingTable = "alter table _vt.schema_version modify column ddl BLOB NOT NULL"

var withDDL = withddl.New([]string{createSchemaTrackingTable})
var withDDL = withddl.New([]string{createSchemaTrackingTable, alterSchemaTrackingTable})

// VStreamer defines the functions of VStreamer
// that the replicationWatcher needs.
Expand Down
2 changes: 1 addition & 1 deletion go/vt/withddl/withddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func (wd *WithDDL) Exec(ctx context.Context, query string, f interface{}) (*sqlt
return nil, err
}

log.Info("Updating schema for %v and retrying: %v", query, err)
log.Infof("Updating schema for %v and retrying: %v", query, err)
for _, query := range wd.ddls {
_, merr := exec(query)
if merr == nil {
Expand Down