Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ var (
vrepldb = "vrepl"
globalDBQueries = make(chan string, 1000)
testForeignKeyQueries = false
doNotLogDBQueries = false
)

type LogExpectation struct {
Expand Down Expand Up @@ -392,6 +393,9 @@ func (dbc *realDBClient) ExecuteFetch(query string, maxrows int) (*sqltypes.Resu
return nil, nil
}
qr, err := dbc.conn.ExecuteFetch(query, 10000, true)
if doNotLogDBQueries {
return qr, err
}
if !strings.HasPrefix(query, "select") && !strings.HasPrefix(query, "set") && !dbc.nolog {
globalDBQueries <- query
} else if testForeignKeyQueries && strings.Contains(query, "foreign_key_checks") { //allow select/set for foreign_key_checks
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type ReplicatorPlan struct {
VStreamFilter *binlogdatapb.Filter
TargetTables map[string]*TablePlan
TablePlans map[string]*TablePlan
tableKeys map[string][]string
PKInfoMap map[string][]*PrimaryKeyInfo
}

// buildExecution plan uses the field info as input and the partially built
Expand Down Expand Up @@ -86,8 +86,9 @@ func (rp *ReplicatorPlan) buildExecutionPlan(fieldEvent *binlogdatapb.FieldEvent
// requires us to wait for the field info sent by the source.
func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Result, fields []*querypb.Field) (*TablePlan, error) {
tpb := &tablePlanBuilder{
name: sqlparser.NewTableIdent(tableName),
lastpk: lastpk,
name: sqlparser.NewTableIdent(tableName),
lastpk: lastpk,
pkInfos: rp.PKInfoMap[tableName],
}
for _, field := range fields {
colName := sqlparser.NewColIdent(field.Name)
Expand All @@ -103,10 +104,10 @@ func (rp *ReplicatorPlan) buildFromFields(tableName string, lastpk *sqltypes.Res
tpb.colExprs = append(tpb.colExprs, cexpr)
}
// The following actions are a subset of buildTablePlan.
if err := tpb.analyzePK(rp.tableKeys); err != nil {
if err := tpb.analyzePK(rp.PKInfoMap); err != nil {
return nil, err
}
return tpb.generate(rp.tableKeys), nil
return tpb.generate(), nil
}

// MarshalJSON performs a custom JSON Marshalling.
Expand Down
24 changes: 12 additions & 12 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ func TestBuildPlayerPlan(t *testing.T) {
err: "group by expression is not allowed to reference an aggregate expression: a",
}}

tableKeys := map[string][]string{
"t1": {"c1"},
PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{
"t1": {&PrimaryKeyInfo{Name: "c1"}},
}

copyState := map[string]*sqltypes.Result{
Expand All @@ -681,7 +681,7 @@ func TestBuildPlayerPlan(t *testing.T) {
}

for _, tcase := range testcases {
plan, err := buildReplicatorPlan(tcase.input, tableKeys, nil)
plan, err := buildReplicatorPlan(tcase.input, PrimaryKeyInfos, nil)
gotPlan, _ := json.Marshal(plan)
wantPlan, _ := json.Marshal(tcase.plan)
if string(gotPlan) != string(wantPlan) {
Expand All @@ -695,7 +695,7 @@ func TestBuildPlayerPlan(t *testing.T) {
t.Errorf("Filter err(%v): %s, want %v", tcase.input, gotErr, tcase.err)
}

plan, err = buildReplicatorPlan(tcase.input, tableKeys, copyState)
plan, err = buildReplicatorPlan(tcase.input, PrimaryKeyInfos, copyState)
if err != nil {
continue
}
Expand All @@ -708,9 +708,9 @@ func TestBuildPlayerPlan(t *testing.T) {
}

func TestBuildPlayerPlanNoDup(t *testing.T) {
tableKeys := map[string][]string{
"t1": {"c1"},
"t2": {"c2"},
PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{
"t1": {&PrimaryKeyInfo{Name: "c1"}},
"t2": {&PrimaryKeyInfo{Name: "c2"}},
}
input := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand All @@ -721,17 +721,17 @@ func TestBuildPlayerPlanNoDup(t *testing.T) {
Filter: "select * from t",
}},
}
_, err := buildReplicatorPlan(input, tableKeys, nil)
_, err := buildReplicatorPlan(input, PrimaryKeyInfos, nil)
want := "more than one target for source table t"
if err == nil || !strings.Contains(err.Error(), want) {
t.Errorf("buildReplicatorPlan err: %v, must contain: %v", err, want)
}
}

func TestBuildPlayerPlanExclude(t *testing.T) {
tableKeys := map[string][]string{
"t1": {"c1"},
"t2": {"c2"},
PrimaryKeyInfos := map[string][]*PrimaryKeyInfo{
"t1": {&PrimaryKeyInfo{Name: "c1"}},
"t2": {&PrimaryKeyInfo{Name: "c2"}},
}
input := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Expand All @@ -742,7 +742,7 @@ func TestBuildPlayerPlanExclude(t *testing.T) {
Filter: "",
}},
}
plan, err := buildReplicatorPlan(input, tableKeys, nil)
plan, err := buildReplicatorPlan(input, PrimaryKeyInfos, nil)
assert.NoError(t, err)

want := &TestReplicatorPlan{
Expand Down
53 changes: 39 additions & 14 deletions go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type tablePlanBuilder struct {
onInsert insertType
pkCols []*colExpr
lastpk *sqltypes.Result
pkInfos []*PrimaryKeyInfo
}

// colExpr describes the processing to be performed to
Expand Down Expand Up @@ -103,7 +104,7 @@ const (
// a table-specific rule is built to be sent to the source. We don't send the
// original rule to the source because it may not match the same tables as the
// target.
// tableKeys specifies the list of primary key columns for each table.
// pkInfoMap specifies the list of primary key columns for each table.
// copyState is a map of tables that have not been fully copied yet.
// If a table is not present in copyState, then it has been fully copied. If so,
// all replication events are applied. The table still has to match a Filter.Rule.
Expand All @@ -114,14 +115,14 @@ const (
// The TablePlan built is a partial plan. The full plan for a table is built
// when we receive field information from events or rows sent by the source.
// buildExecutionPlan is the function that builds the full plan.
func buildReplicatorPlan(filter *binlogdatapb.Filter, tableKeys map[string][]string, copyState map[string]*sqltypes.Result) (*ReplicatorPlan, error) {
func buildReplicatorPlan(filter *binlogdatapb.Filter, pkInfoMap map[string][]*PrimaryKeyInfo, copyState map[string]*sqltypes.Result) (*ReplicatorPlan, error) {
plan := &ReplicatorPlan{
VStreamFilter: &binlogdatapb.Filter{FieldEventMode: filter.FieldEventMode},
TargetTables: make(map[string]*TablePlan),
TablePlans: make(map[string]*TablePlan),
tableKeys: tableKeys,
PKInfoMap: pkInfoMap,
}
for tableName := range tableKeys {
for tableName := range pkInfoMap {
lastpk, ok := copyState[tableName]
if ok && lastpk == nil {
// Don't replicate uncopied tables.
Expand All @@ -134,7 +135,7 @@ func buildReplicatorPlan(filter *binlogdatapb.Filter, tableKeys map[string][]str
if rule == nil {
continue
}
tablePlan, err := buildTablePlan(tableName, rule.Filter, tableKeys, lastpk)
tablePlan, err := buildTablePlan(tableName, rule.Filter, pkInfoMap, lastpk)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -173,7 +174,7 @@ func MatchTable(tableName string, filter *binlogdatapb.Filter) (*binlogdatapb.Ru
return nil, nil
}

func buildTablePlan(tableName, filter string, tableKeys map[string][]string, lastpk *sqltypes.Result) (*TablePlan, error) {
func buildTablePlan(tableName, filter string, pkInfoMap map[string][]*PrimaryKeyInfo, lastpk *sqltypes.Result) (*TablePlan, error) {
query := filter
// generate equivalent select statement if filter is empty or a keyrange.
switch {
Expand Down Expand Up @@ -222,6 +223,7 @@ func buildTablePlan(tableName, filter string, tableKeys map[string][]string, las
},
selColumns: make(map[string]bool),
lastpk: lastpk,
pkInfos: pkInfoMap[tableName],
}

if err := tpb.analyzeExprs(sel.SelectExprs); err != nil {
Expand All @@ -242,17 +244,17 @@ func buildTablePlan(tableName, filter string, tableKeys map[string][]string, las
if err := tpb.analyzeGroupBy(sel.GroupBy); err != nil {
return nil, err
}
if err := tpb.analyzePK(tableKeys); err != nil {
if err := tpb.analyzePK(pkInfoMap); err != nil {
return nil, err
}

sendRule.Filter = sqlparser.String(tpb.sendSelect)
tablePlan := tpb.generate(tableKeys)
tablePlan := tpb.generate()
tablePlan.SendRule = sendRule
return tablePlan, nil
}

func (tpb *tablePlanBuilder) generate(tableKeys map[string][]string) *TablePlan {
func (tpb *tablePlanBuilder) generate() *TablePlan {
refmap := make(map[string]bool)
for _, cexpr := range tpb.pkCols {
for k := range cexpr.references {
Expand Down Expand Up @@ -450,13 +452,13 @@ func (tpb *tablePlanBuilder) analyzeGroupBy(groupBy sqlparser.GroupBy) error {
}

// analyzePK builds tpb.pkCols.
func (tpb *tablePlanBuilder) analyzePK(tableKeys map[string][]string) error {
pkcols, ok := tableKeys[tpb.name.String()]
func (tpb *tablePlanBuilder) analyzePK(pkInfoMap map[string][]*PrimaryKeyInfo) error {
pkcols, ok := pkInfoMap[tpb.name.String()]
if !ok {
return fmt.Errorf("table %s not found in schema", tpb.name)
}
for _, pkcol := range pkcols {
cexpr := tpb.findCol(sqlparser.NewColIdent(pkcol))
cexpr := tpb.findCol(sqlparser.NewColIdent(pkcol.Name))
if cexpr == nil {
return fmt.Errorf("primary key column %s not found in select list", pkcol)
}
Expand Down Expand Up @@ -669,17 +671,40 @@ func (tpb *tablePlanBuilder) generateWhere(buf *sqlparser.TrackedBuffer, bvf *bi
}
}

// getCharsetAndCollation looks up primary key column pkname in the builder's
// pkInfos and returns SQL fragments ready to be spliced around a value in a
// comparison expression:
//   - charSet:   a character-set introducer such as " _utf8mb4 ", or "" when
//     the column carries no charset info
//   - collation: a " COLLATE <name> " clause, or "" when none is recorded
//
// The name comparison is case-insensitive (strings.EqualFold), matching
// MySQL's case-insensitive column naming. Both results are empty when the
// column has no matching pkInfo entry.
func (tpb *tablePlanBuilder) getCharsetAndCollation(pkname string) (charSet string, collation string) {
	for _, pkInfo := range tpb.pkInfos {
		if !strings.EqualFold(pkInfo.Name, pkname) {
			continue
		}
		if pkInfo.CharSet != "" {
			charSet = fmt.Sprintf(" _%s ", pkInfo.CharSet)
		}
		if pkInfo.Collation != "" {
			collation = fmt.Sprintf(" COLLATE %s ", pkInfo.Collation)
		}
		// Primary key column names are unique within a table, so the first
		// match is the only match; no need to scan the rest.
		return charSet, collation
	}
	return charSet, collation
}

// generatePKConstraint writes a "(col1, col2, ...) <= (val1, val2, ...)"
// constraint over the table's primary key columns into buf, bounding applied
// rows by the last-copied PK stored in tpb.lastpk. Each column and each value
// is decorated with its charset introducer and COLLATE clause (when known via
// getCharsetAndCollation) so the comparison uses the column's own collation.
//
// NOTE(review): this span is rendered from a unified diff with the +/- markers
// stripped; two lines appear in BOTH their old and new form below (the first
// Myprintf call, and the header of the row-values loop). The new variants are
// the ones that thread charset/collation through; confirm against the merged
// file before treating this text as compilable source.
func (tpb *tablePlanBuilder) generatePKConstraint(buf *sqlparser.TrackedBuffer, bvf *bindvarFormatter) {
// Per-column charset/collation pair, captured while emitting the column
// list so the same decoration can be replayed on the value list.
type charSetCollation struct {
charSet string
collation string
}
var charSetCollations []*charSetCollation
separator := "("
for _, pkname := range tpb.lastpk.Fields {
// (old diff line — superseded by the decorated Myprintf two lines below)
buf.Myprintf("%s%v", separator, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)})
charSet, collation := tpb.getCharsetAndCollation(pkname.Name)
charSetCollations = append(charSetCollations, &charSetCollation{charSet: charSet, collation: collation})
buf.Myprintf("%s%s%v%s", separator, charSet, &sqlparser.ColName{Name: sqlparser.NewColIdent(pkname.Name)}, collation)
separator = ","
}
separator = ") <= ("
// (old diff line — superseded by the indexed loop header below)
for _, val := range tpb.lastpk.Rows[0] {
// Index i pairs each value with the charset/collation recorded for the
// i-th PK column above; Fields and Rows[0] are assumed parallel.
for i, val := range tpb.lastpk.Rows[0] {
buf.WriteString(separator)
buf.WriteString(charSetCollations[i].charSet)
separator = ","
val.EncodeSQL(buf)
buf.WriteString(charSetCollations[i].collation)
}
buf.WriteString(")")
}
Expand Down
7 changes: 5 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func newVCopier(vr *vreplicator) *vcopier {
func (vc *vcopier) initTablesForCopy(ctx context.Context) error {
defer vc.vr.dbClient.Rollback()

plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.tableKeys, nil)
plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.pkInfoMap, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -198,7 +198,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma

log.Infof("Copying table %s, lastpk: %v", tableName, copyState[tableName])

plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.tableKeys, nil)
plan, err := buildReplicatorPlan(vc.vr.source.Filter, vc.vr.pkInfoMap, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -259,6 +259,9 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
_, err = vc.tablePlan.applyBulkInsert(rows, func(sql string) (*sqltypes.Result, error) {
start := time.Now()
qr, err := vc.vr.dbClient.ExecuteWithRetry(ctx, sql)
if err != nil {
return nil, err
}
vc.vr.stats.QueryTimings.Record("copy", start)

vc.vr.stats.CopyRowCount.Add(int64(qr.RowsAffected))
Expand Down
Loading