Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/topotools/mirror_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func GetMirrorRules(ctx context.Context, ts *topo.Server) (map[string]map[string
// SaveMirrorRules converts a mapping of fromTable=>[]toTables into a
// vschemapb.MirrorRules protobuf message and saves it in the topology.
func SaveMirrorRules(ctx context.Context, ts *topo.Server, rules map[string]map[string]float32) error {
log.Infof("Saving mirror rules %v\n", rules)
log.V(2).Infof("Saving mirror rules %v\n", rules)

rrs := &vschemapb.MirrorRules{Rules: make([]*vschemapb.MirrorRule, 0)}
for fromTable, mrs := range rules {
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2977,7 +2977,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
time.Sleep(lockTablesCycleDelay)
}
}

// Get the source positions now that writes are stopped, the streams were stopped (e.g.
// intra-keyspace materializations that write on the source), and we know for certain
// that any in progress writes are done.
Expand Down
25 changes: 18 additions & 7 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1154,9 +1154,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err())
}

ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())
// We create a new context while canceling the migration, so that we are independent of the original
// context being canceled prior to or during the cancel operation itself.
// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
Expand All @@ -1168,20 +1168,27 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
err = ts.switchDeniedTables(cmCtx, true /* revert */)
if !ts.IsMultiTenantMigration() {
ts.Logger().Infof("cancelMigration (%v): adding denied tables to target", ts.WorkflowName())
err = ts.switchDeniedTables(cmCtx, true /* revert */)
} else {
ts.Logger().Infof("cancelMigration (%v): multi-tenant, not adding denied tables to target", ts.WorkflowName())
}
} else {
ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err)
}

if err := sm.CancelStreamMigrations(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName())
err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
Expand All @@ -1190,17 +1197,21 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
})
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
if err := ts.deleteReverseVReplication(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)
}

if cancelErrs.HasErrors() {
ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
}

ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
}
blpStats.WorkflowConfig = workflowConfig.String()
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

id, err := strconv.ParseInt(params["id"], 10, 32)
if err != nil {
return nil, err
}
ct.id = int32(id)
ct.workflow = params["workflow"]
log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr)

ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), workflowConfig.MaxTimeToRetryError)

state := params["state"]
Expand Down
5 changes: 3 additions & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
settings.StopPos = pausePos
saveStop = false
}
log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v",
log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vr.id, vr.WorkflowName, settings.StartPos, settings.StopPos)
log.V(2).Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v",
vr.id, settings.StartPos, settings.StopPos, vr.source.Filter)
queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
Expand Down Expand Up @@ -266,7 +267,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
// one. This allows for the apply thread to catch up more quickly if
// a backlog builds up.
func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down
Loading