Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 18 additions & 7 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1101,9 +1101,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err())
}

ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())
// We create a new context while canceling the migration, so that we are independent of the original
// context being canceled prior to or during the cancel operation itself.
// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
Expand All @@ -1115,20 +1115,27 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
err = ts.switchDeniedTables(cmCtx, true /* revert */)
if !ts.IsMultiTenantMigration() {
ts.Logger().Infof("cancelMigration (%v): adding denied tables to target", ts.WorkflowName())
err = ts.switchDeniedTables(cmCtx, true /* revert */)
} else {
ts.Logger().Infof("cancelMigration (%v): multi-tenant, not adding denied tables to target", ts.WorkflowName())
}
} else {
ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err)
}

if err := sm.CancelStreamMigrations(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName())
err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
Expand All @@ -1137,17 +1144,21 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
})
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
if err := ts.deleteReverseVReplication(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)
}

if cancelErrs.HasErrors() {
ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
}

ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
source: &binlogdatapb.BinlogSource{},
}
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

id, err := strconv.ParseInt(params["id"], 10, 32)
if err != nil {
return nil, err
}
ct.id = int32(id)
ct.workflow = params["workflow"]
log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr)

ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError)

state := params["state"]
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
// one. This allows for the apply thread to catch up more quickly if
// a backlog builds up.
func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
Loading