Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ func Init() (*Env, error) {
os.RemoveAll(te.cluster.Config.SchemaDir)
return nil, fmt.Errorf("could not launch mysql: %v", err)
}

te.Dbcfgs = dbconfigs.NewTestDBConfigs(te.cluster.MySQLConnParams(), te.cluster.MySQLAppDebugConnParams(), te.cluster.DbName())
config := tabletenv.NewDefaultConfig()
config.DB = te.Dbcfgs
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,6 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog

// parseEvent parses an event from the binlog and converts it to a list of VEvents.
func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) {

if !ev.IsValid() {
return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev)
}
Expand Down Expand Up @@ -383,6 +382,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
}
// Insert/Delete/Update are supported only to be used in the context of external mysql streams where source databases
// could be using SBR. Vitess itself will never run into cases where it needs to consume non rbr statements.

switch cat := sqlparser.Preview(q.SQL); cat {
case sqlparser.StmtInsert:
mustSend := mustSendStmt(q, params.DbName)
Expand Down Expand Up @@ -455,7 +455,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
Statement: q.SQL,
})
}
case sqlparser.StmtOther, sqlparser.StmtPriv:
case sqlparser.StmtOther, sqlparser.StmtPriv, sqlparser.StmtSet:
// These are either:
// 1) DBA statements like REPAIR that can be ignored.
// 2) Privilege-altering statements like GRANT/REVOKE
Expand Down Expand Up @@ -535,6 +535,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
if err != nil {
return nil, err
}

}
for _, vevent := range vevents {
vevent.Timestamp = int64(ev.Timestamp())
Expand Down
59 changes: 59 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,57 @@ type testcase struct {
output [][]string
}

func checkIfOptionIsSupported(t *testing.T, variable string) bool {
qr, err := env.Mysqld.FetchSuperQuery(context.Background(), fmt.Sprintf("show variables like '%s'", variable))
require.NoError(t, err)
require.NotNil(t, qr)
if qr.Rows != nil && len(qr.Rows) == 1 {
return true
}
return false
}

func TestSetStatement(t *testing.T) {

if testing.Short() {
t.Skip()
}
if !checkIfOptionIsSupported(t, "log_builtin_as_identified_by_password") {
// the combination of setting this option and support for "set password" only works on a few flavors
log.Info("Cannot test SetStatement on this flavor")
return
}

execStatements(t, []string{
"create table t1(id int, val varbinary(128), primary key(id))",
})
defer execStatements(t, []string{
"drop table t1",
})
engine.se.Reload(context.Background())
queries := []string{
"begin",
"insert into t1 values (1, 'aaa')",
"commit",
"set global log_builtin_as_identified_by_password=1",
"SET PASSWORD FOR 'vt_appdebug'@'localhost'='*AA17DA66C7C714557F5485E84BCAFF2C209F2F53'", //select password('vtappdebug_password');
}
testcases := []testcase{{
input: queries,
output: [][]string{{
`begin`,
`type:FIELD field_event:<table_name:"t1" fields:<name:"id" type:INT32 table:"t1" org_table:"t1" database:"vttest" org_name:"id" column_length:11 charset:63 > fields:<name:"val" type:VARBINARY table:"t1" org_table:"t1" database:"vttest" org_name:"val" column_length:128 charset:63 > > `,
`type:ROW row_event:<table_name:"t1" row_changes:<after:<lengths:1 lengths:3 values:"1aaa" > > > `,
`gtid`,
`commit`,
}, {
`gtid`,
`other`,
}},
}}
runCases(t, nil, testcases, "current", nil)
}

func TestVersion(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -1649,6 +1700,14 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
if evs[i].Type != binlogdatapb.VEventType_COMMIT {
t.Fatalf("%v (%d): event: %v, want commit", input, i, evs[i])
}
case "other":
if evs[i].Type != binlogdatapb.VEventType_OTHER {
t.Fatalf("%v (%d): event: %v, want other", input, i, evs[i])
}
case "ddl":
if evs[i].Type != binlogdatapb.VEventType_DDL {
t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i])
}
default:
evs[i].Timestamp = 0
if evs[i].Type == binlogdatapb.VEventType_FIELD {
Expand Down