diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go
index 14890a26b00..d1ab3541f1d 100644
--- a/go/vt/vtctl/workflow/traffic_switcher.go
+++ b/go/vt/vtctl/workflow/traffic_switcher.go
@@ -1101,9 +1101,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	if ctx.Err() != nil {
 		// Even though we create a new context later on we still record any context error:
 		// for forensics in case of failures.
-		ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
+		ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err())
 	}
-
+	ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())
 	// We create a new context while canceling the migration, so that we are independent of the original
 	// context being canceled prior to or during the cancel operation itself.
 	// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
@@ -1115,20 +1115,27 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	defer cmCancel()
 
 	if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
-		err = ts.switchDeniedTables(cmCtx, true /* revert */)
+		if !ts.IsMultiTenantMigration() {
+			ts.Logger().Infof("cancelMigration (%v): adding denied tables to target", ts.WorkflowName())
+			err = ts.switchDeniedTables(cmCtx, true /* revert */)
+		} else {
+			ts.Logger().Infof("cancelMigration (%v): multi-tenant, not adding denied tables to target", ts.WorkflowName())
+		}
 	} else {
+		ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
 		err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
 	}
 	if err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err)
 	}
 
 	if err := sm.CancelStreamMigrations(cmCtx); err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err)
 	}
 
+	ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName())
 	err = ts.ForAllTargets(func(target *MigrationTarget) error {
 		query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
 			encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
@@ -1137,17 +1144,21 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
 	})
 	if err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err)
 	}
 
+	ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
 	if err := ts.deleteReverseVReplication(cmCtx); err != nil {
 		cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
-		ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
+		ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)
 	}
 
 	if cancelErrs.HasErrors() {
+		ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
 		return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
 	}
+
+	ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())
 	return nil
 }
 
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go
index 581244eebb3..3f420afded5 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/controller.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go
@@ -86,7 +86,6 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
 		source: &binlogdatapb.BinlogSource{},
 	}
 	ct.sourceTablet.Store(&topodatapb.TabletAlias{})
-	log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)
 
 	id, err := strconv.ParseInt(params["id"], 10, 32)
 	if err != nil {
@@ -94,6 +93,8 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
 	}
 	ct.id = int32(id)
 	ct.workflow = params["workflow"]
+	log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr)
+
 	ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError)
 
 	state := params["state"]
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
index eb173407f1b..eb43fa70e1e 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go
@@ -251,7 +251,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
 // one. This allows for the apply thread to catch up more quickly if
 // a backlog builds up.
 func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
-	log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
+	log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 