diff --git a/go/test/endtoend/vreplication/fk_config_test.go b/go/test/endtoend/vreplication/fk_config_test.go index db446b78b5a..5678da7f3f6 100644 --- a/go/test/endtoend/vreplication/fk_config_test.go +++ b/go/test/endtoend/vreplication/fk_config_test.go @@ -20,7 +20,7 @@ package vreplication // (child before parent), t1,t2 are in lexical order, and t11,t12 have valid circular foreign key constraints. var ( initialFKSchema = ` -create table parent(id int, name varchar(128), primary key(id)) engine=innodb; +create table parent(id int, name varchar(128), primary key(id), key(name)) engine=innodb; create table child(id int, parent_id int, name varchar(128), primary key(id), foreign key(parent_id) references parent(id) on delete cascade) engine=innodb; create view vparent as select * from parent; create table t1(id int, name varchar(128), primary key(id)) engine=innodb; diff --git a/go/test/endtoend/vreplication/fk_test.go b/go/test/endtoend/vreplication/fk_test.go index a349a94ffa1..71a78c7f800 100644 --- a/go/test/endtoend/vreplication/fk_test.go +++ b/go/test/endtoend/vreplication/fk_test.go @@ -28,6 +28,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" @@ -135,6 +136,8 @@ func TestFKWorkflow(t *testing.T) { require.Greater(t, t11Count, 1) require.Greater(t, t12Count, 1) require.Equal(t, t11Count, t12Count) + // Check for the secondary key + confirmTablesHaveSecondaryKeys(t, []*cluster.VttabletProcess{targetTab}, targetKeyspace, "parent") } } diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index 68e523b0030..fe8fb22b60c 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -474,7 +474,8 @@ func confirmTablesHaveSecondaryKeys(t *testing.T, tablets []*cluster.VttabletPro } select { case <-timer.C: - require.FailNow(t, "The following table(s) do not have any secondary keys: %s", strings.Join(tablesWithoutSecondaryKeys, ", ")) + failureMessage := fmt.Sprintf("The following table(s) do not have any secondary keys: %s", strings.Join(tablesWithoutSecondaryKeys, ", ")) + require.FailNow(t, failureMessage) default: time.Sleep(defaultTick) } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go index aae17027bc4..32594461e6b 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go @@ -136,7 +136,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings copyWorkQueue = vc.newCopyWorkQueue(parallelism, copyWorkerFactory) if state.currentTableName != "" { log.Infof("copy of table %s is done at lastpk %+v", state.currentTableName, lastpkbv) - if err := vc.deleteCopyState(state.currentTableName); err != nil { + if err := vc.runPostCopyActionsAndDeleteCopyState(ctx, state.currentTableName); err != nil { return err } } else { @@ -294,7 +294,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings if copyWorkQueue != nil { copyWorkQueue.close() } - if err := vc.deleteCopyState(state.currentTableName); err != nil { + if err := vc.runPostCopyActionsAndDeleteCopyState(ctx, state.currentTableName); err != nil { return err } if err := vc.updatePos(ctx, gtid); err != nil { @@ -305,11 +305,21 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings return nil } -// deleteCopyState deletes the copy state entry for a table, signifying that the copy phase is complete for that table. -func (vc *vcopier) deleteCopyState(tableName string) error { - log.Infof("Deleting copy state for table %s", tableName) - delQuery := fmt.Sprintf("delete from _vt.copy_state where table_name=%s and vrepl_id = %d", encodeString(tableName), vc.vr.id) - if _, err := vc.vr.dbClient.Execute(delQuery); err != nil { +// runPostCopyActionsAndDeleteCopyState runs post copy actions and deletes the +// copy state entry for a table, signifying that the copy phase is complete for +// that table. +func (vc *vcopier) runPostCopyActionsAndDeleteCopyState(ctx context.Context, tableName string) error { + if err := vc.vr.execPostCopyActions(ctx, tableName); err != nil { + return vterrors.Wrapf(err, "failed to execute post copy actions for table %q", tableName) + } + log.Infof("Deleting copy state and post copy actions for table %s", tableName) + delQueryBuf := sqlparser.NewTrackedBuffer(nil) + delQueryBuf.Myprintf( + "delete cs, pca from _vt.%s as cs left join _vt.%s as pca on cs.vrepl_id=pca.vrepl_id and cs.table_name=pca.table_name where cs.vrepl_id=%d and cs.table_name=%s", + copyStateTableName, postCopyActionTableName, + vc.vr.id, encodeString(tableName), + ) + if _, err := vc.vr.dbClient.Execute(delQueryBuf.String()); err != nil { return err } return nil