diff --git a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go index 2336641132d..50032ba4e37 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go +++ b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go @@ -94,7 +94,6 @@ func Init() (*Env, error) { os.RemoveAll(te.cluster.Config.SchemaDir) return nil, fmt.Errorf("could not launch mysql: %v", err) } - te.Dbcfgs = dbconfigs.NewTestDBConfigs(te.cluster.MySQLConnParams(), te.cluster.MySQLAppDebugConnParams(), te.cluster.DbName()) config := tabletenv.NewDefaultConfig() config.DB = te.Dbcfgs diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 58c29c3fd05..2c9f578f9fd 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -316,7 +316,6 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog // parseEvent parses an event from the binlog and converts it to a list of VEvents. func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { - if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } @@ -383,6 +382,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } // Insert/Delete/Update are supported only to be used in the context of external mysql streams where source databases // could be using SBR. Vitess itself will never run into cases where it needs to consume non rbr statements. + switch cat := sqlparser.Preview(q.SQL); cat { case sqlparser.StmtInsert: mustSend := mustSendStmt(q, params.DbName) @@ -455,7 +455,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e Statement: q.SQL, }) } - case sqlparser.StmtOther, sqlparser.StmtPriv: + case sqlparser.StmtOther, sqlparser.StmtPriv, sqlparser.StmtSet: // These are either: // 1) DBA statements like REPAIR that can be ignored. // 2) Privilege-altering statements like GRANT/REVOKE @@ -535,6 +535,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e if err != nil { return nil, err } + } for _, vevent := range vevents { vevent.Timestamp = int64(ev.Timestamp()) diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d10e757a7cc..e2675de7814 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -43,6 +43,57 @@ type testcase struct { output [][]string } +func checkIfOptionIsSupported(t *testing.T, variable string) bool { + qr, err := env.Mysqld.FetchSuperQuery(context.Background(), fmt.Sprintf("show variables like '%s'", variable)) + require.NoError(t, err) + require.NotNil(t, qr) + if qr.Rows != nil && len(qr.Rows) == 1 { + return true + } + return false +} + +func TestSetStatement(t *testing.T) { + + if testing.Short() { + t.Skip() + } + if !checkIfOptionIsSupported(t, "log_builtin_as_identified_by_password") { + // the combination of setting this option and support for "set password" only works on a few flavors + log.Info("Cannot test SetStatement on this flavor") + return + } + + execStatements(t, []string{ + "create table t1(id int, val varbinary(128), primary key(id))", + }) + defer execStatements(t, []string{ + "drop table t1", + }) + engine.se.Reload(context.Background()) + queries := []string{ + "begin", + "insert into t1 values (1, 'aaa')", + "commit", + "set global log_builtin_as_identified_by_password=1", + "SET PASSWORD FOR 'vt_appdebug'@'localhost'='*AA17DA66C7C714557F5485E84BCAFF2C209F2F53'", //select password('vtappdebug_password'); + } + testcases := []testcase{{ + input: queries, + output: [][]string{{ + `begin`, + `type:FIELD field_event: fields: > `, + `type:ROW row_event: > > `, + `gtid`, + `commit`, + }, { + `gtid`, + `other`, + }}, + }} + runCases(t, nil, testcases, "current", nil) +} + func TestVersion(t *testing.T) { if testing.Short() { t.Skip() @@ -1649,6 +1700,14 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [ if evs[i].Type != binlogdatapb.VEventType_COMMIT { t.Fatalf("%v (%d): event: %v, want commit", input, i, evs[i]) } + case "other": + if evs[i].Type != binlogdatapb.VEventType_OTHER { + t.Fatalf("%v (%d): event: %v, want other", input, i, evs[i]) + } + case "ddl": + if evs[i].Type != binlogdatapb.VEventType_DDL { + t.Fatalf("%v (%d): event: %v, want ddl", input, i, evs[i]) + } default: evs[i].Timestamp = 0 if evs[i].Type == binlogdatapb.VEventType_FIELD {