Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/fk_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package vreplication
// (child before parent), t1,t2 are in lexical order, and t11,t12 have valid circular foreign key constraints.
var (
initialFKSchema = `
create table parent(id int, name varchar(128), primary key(id)) engine=innodb;
create table parent(id int, name varchar(128), primary key(id), key(name)) engine=innodb;
create table child(id int, parent_id int, name varchar(128), primary key(id), foreign key(parent_id) references parent(id) on delete cascade) engine=innodb;
create view vparent as select * from parent;
create table t1(id int, name varchar(128), primary key(id)) engine=innodb;
Expand Down
3 changes: 3 additions & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
Expand Down Expand Up @@ -135,6 +136,8 @@ func TestFKWorkflow(t *testing.T) {
require.Greater(t, t11Count, 1)
require.Greater(t, t12Count, 1)
require.Equal(t, t11Count, t12Count)
// Check for the secondary key
confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab}, targetKeyspace, "parent")
}

}
Expand Down
3 changes: 2 additions & 1 deletion go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,7 +474,8 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro
}
select {
case <-timer.C:
require.FailNow(t, "The following table(s) do not have any secondary keys: %s", strings.Join(tablesWithoutSecondaryKeys, ", "))
failureMessage := fmt.Sprintf("The following table(s) do not have any secondary keys: %s", strings.Join(tablesWithoutSecondaryKeys, ", "))
require.FailNow(t, failureMessage)
default:
time.Sleep(defaultTick)
}
Expand Down
24 changes: 17 additions & 7 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
copyWorkQueue = vc.newCopyWorkQueue(parallelism, copyWorkerFactory)
if state.currentTableName != "" {
log.Infof("copy of table %s is done at lastpk %+v", state.currentTableName, lastpkbv)
if err := vc.deleteCopyState(state.currentTableName); err != nil {
if err := vc.runPostCopyActionsAndDeleteCopyState(ctx, state.currentTableName); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -294,7 +294,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
if copyWorkQueue != nil {
copyWorkQueue.close()
}
if err := vc.deleteCopyState(state.currentTableName); err != nil {
if err := vc.runPostCopyActionsAndDeleteCopyState(ctx, state.currentTableName); err != nil {
return err
}
if err := vc.updatePos(ctx, gtid); err != nil {
Expand All @@ -305,11 +305,21 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
return nil
}

// deleteCopyState deletes the copy state entry for a table, signifying that the copy phase is complete for that table.
func (vc *vcopier) deleteCopyState(tableName string) error {
log.Infof("Deleting copy state for table %s", tableName)
delQuery := fmt.Sprintf("delete from _vt.copy_state where table_name=%s and vrepl_id = %d", encodeString(tableName), vc.vr.id)
if _, err := vc.vr.dbClient.Execute(delQuery); err != nil {
// runPostCopyActionsAndDeleteCopyState runs post copy actions and deletes the
// copy state entry for a table, signifying that the copy phase is complete for
// that table.
func (vc *vcopier) runPostCopyActionsAndDeleteCopyState(ctx context.Context, tableName string) error {
if err := vc.vr.execPostCopyActions(ctx, tableName); err != nil {
return vterrors.Wrapf(err, "failed to execute post copy actions for table %q", tableName)
}
log.Infof("Deleting copy state and post copy actions for table %s", tableName)
delQueryBuf := sqlparser.NewTrackedBuffer(nil)
delQueryBuf.Myprintf(
"delete cs, pca from _vt.%s as cs left join _vt.%s as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name where cs.vrepl_id=%d and cs.table_name=%s",
copyStateTableName, postCopyActionTableName,
vc.vr.id, encodeString(tableName),
)
if _, err := vc.vr.dbClient.Execute(delQueryBuf.String()); err != nil {
return err
}
return nil
Expand Down
Loading