diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 319d6c7e0d3..901ea7bc17e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -203,16 +203,12 @@ func mustSendDDL(query mysql.Query, dbname string, filter *binlogdatapb.Filter) return true } -// tableMatches is similar to buildPlan below and MatchTable in vreplication/table_plan_builder.go. -func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb.Filter) bool { - if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname { - return false - } +func ruleMatches(tableName string, filter *binlogdatapb.Filter) bool { for _, rule := range filter.Rules { switch { case strings.HasPrefix(rule.Match, "/"): expr := strings.Trim(rule.Match, "/") - result, err := regexp.MatchString(expr, table.Name.String()) + result, err := regexp.MatchString(expr, tableName) if err != nil { return false } @@ -220,13 +216,21 @@ func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb continue } return true - case table.Name.String() == rule.Match: + case tableName == rule.Match: return true } } return false } +// tableMatches is similar to buildPlan below and MatchTable in vreplication/table_plan_builder.go. +func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb.Filter) bool { + if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname { + return false + } + return ruleMatches(table.Name.String(), filter) +} + func buildPlan(ti *Table, vschema *localVSchema, filter *binlogdatapb.Filter) (*Plan, error) { for _, rule := range filter.Rules { switch { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 58c29c3fd05..d9b2b3d788f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -497,6 +497,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e vs.plans[id] = nil return nil, nil } + if !ruleMatches(tm.Name, vs.filter) { + return nil, nil + } vevent, err := vs.buildTablePlan(id, tm) if err != nil { @@ -652,10 +655,10 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er Type: t, }) } - st, err := vs.se.GetTableForPos(sqlparser.NewTableIdent(tm.Name), mysql.EncodePosition(vs.pos)) if err != nil { if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH { + log.Infof("No schema found for table %s", tm.Name) return nil, fmt.Errorf("unknown table %v in schema", tm.Name) } return fields, nil @@ -663,6 +666,7 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er if len(st.Fields) < len(tm.Types) { if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH { + log.Infof("Cannot determine columns for table %s", tm.Name) return nil, fmt.Errorf("cannot determine table columns for %s: event has %v, schema as %v", tm.Name, tm.Types, st.Fields) } return fields, nil diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go index d10e757a7cc..96561f8e6b0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go @@ -118,6 +118,63 @@ func insertLotsOfData(t *testing.T, numRows int) { }) } +func TestMissingTables(t *testing.T) { + if testing.Short() { + t.Skip() + } + execStatements(t, []string{ + "create table t1(id11 int, id12 int, primary key(id11))", + "create table shortlived(id31 int, id32 int, primary key(id31))", + }) + defer execStatements(t, []string{ + "drop table t1", + "drop table _shortlived", + }) + startPos := masterPosition(t) + execStatements(t, []string{ + "insert into shortlived values (1,1), (2,2)", + "alter table shortlived rename to _shortlived", + }) + engine.se.Reload(context.Background()) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + testcases := []testcase{ + { + input: []string{}, + output: [][]string{}, + }, + + { + input: []string{ + "insert into t1 values (101, 1010)", + }, + output: [][]string{ + { + "begin", + "gtid", + "commit", + }, + { + "gtid", + "type:OTHER ", + }, + { + "begin", + "type:FIELD field_event: fields: > ", + "type:ROW row_event: > > ", + "gtid", + "commit", + }, + }, + }, + } + runCases(t, filter, testcases, startPos, nil) +} + func TestVStreamCopySimpleFlow(t *testing.T) { if testing.Short() { t.Skip() @@ -1657,6 +1714,7 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [ } } if got := fmt.Sprintf("%v", evs[i]); got != want { + log.Errorf("%v (%d): event:\n%q, want\n%q", input, i, got, want) t.Fatalf("%v (%d): event:\n%q, want\n%q", input, i, got, want) } }