Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,30 +203,34 @@ func mustSendDDL(query mysql.Query, dbname string, filter *binlogdatapb.Filter)
return true
}

// tableMatches is similar to buildPlan below and MatchTable in vreplication/table_plan_builder.go.
func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb.Filter) bool {
if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname {
return false
}
func ruleMatches(tableName string, filter *binlogdatapb.Filter) bool {
for _, rule := range filter.Rules {
switch {
case strings.HasPrefix(rule.Match, "/"):
expr := strings.Trim(rule.Match, "/")
result, err := regexp.MatchString(expr, table.Name.String())
result, err := regexp.MatchString(expr, tableName)
if err != nil {
return false
}
if !result {
continue
}
return true
case table.Name.String() == rule.Match:
case tableName == rule.Match:
return true
}
}
return false
}

// tableMatches is similar to buildPlan below and MatchTable in vreplication/table_plan_builder.go.
func tableMatches(table sqlparser.TableName, dbname string, filter *binlogdatapb.Filter) bool {
if !table.Qualifier.IsEmpty() && table.Qualifier.String() != dbname {
return false
}
return ruleMatches(table.Name.String(), filter)
}

func buildPlan(ti *Table, vschema *localVSchema, filter *binlogdatapb.Filter) (*Plan, error) {
for _, rule := range filter.Rules {
switch {
Expand Down
6 changes: 5 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/vstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e
vs.plans[id] = nil
return nil, nil
}
if !ruleMatches(tm.Name, vs.filter) {
return nil, nil
}

vevent, err := vs.buildTablePlan(id, tm)
if err != nil {
Expand Down Expand Up @@ -652,17 +655,18 @@ func (vs *vstreamer) buildTableColumns(tm *mysql.TableMap) ([]*querypb.Field, er
Type: t,
})
}

st, err := vs.se.GetTableForPos(sqlparser.NewTableIdent(tm.Name), mysql.EncodePosition(vs.pos))
if err != nil {
if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH {
log.Infof("No schema found for table %s", tm.Name)
return nil, fmt.Errorf("unknown table %v in schema", tm.Name)
}
return fields, nil
}

if len(st.Fields) < len(tm.Types) {
if vs.filter.FieldEventMode == binlogdatapb.Filter_ERR_ON_MISMATCH {
log.Infof("Cannot determine columns for table %s", tm.Name)
return nil, fmt.Errorf("cannot determine table columns for %s: event has %v, schema as %v", tm.Name, tm.Types, st.Fields)
}
return fields, nil
Expand Down
58 changes: 58 additions & 0 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,63 @@ func insertLotsOfData(t *testing.T, numRows int) {
})
}

func TestMissingTables(t *testing.T) {
if testing.Short() {
t.Skip()
}
execStatements(t, []string{
"create table t1(id11 int, id12 int, primary key(id11))",
"create table shortlived(id31 int, id32 int, primary key(id31))",
})
defer execStatements(t, []string{
"drop table t1",
"drop table _shortlived",
})
startPos := masterPosition(t)
execStatements(t, []string{
"insert into shortlived values (1,1), (2,2)",
"alter table shortlived rename to _shortlived",
})
engine.se.Reload(context.Background())
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "t1",
Filter: "select * from t1",
}},
}
testcases := []testcase{
{
input: []string{},
output: [][]string{},
},

{
input: []string{
"insert into t1 values (101, 1010)",
},
output: [][]string{
{
"begin",
"gtid",
"commit",
},
{
"gtid",
"type:OTHER ",
},
{
"begin",
"type:FIELD field_event:<table_name:\"t1\" fields:<name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 > fields:<name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 > > ",
"type:ROW row_event:<table_name:\"t1\" row_changes:<after:<lengths:3 lengths:4 values:\"1011010\" > > > ",
"gtid",
"commit",
},
},
},
}
runCases(t, filter, testcases, startPos, nil)
}

func TestVStreamCopySimpleFlow(t *testing.T) {
if testing.Short() {
t.Skip()
Expand Down Expand Up @@ -1657,6 +1714,7 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [
}
}
if got := fmt.Sprintf("%v", evs[i]); got != want {
log.Errorf("%v (%d): event:\n%q, want\n%q", input, i, got, want)
t.Fatalf("%v (%d): event:\n%q, want\n%q", input, i, got, want)
}
}
Expand Down