diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 89095156471..0153526a139 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -22,7 +22,6 @@ import ( "path" "github.com/spf13/pflag" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 7df1b2ee64f..f13e6303610 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -19,6 +19,16 @@ package topo import ( "context" "path" + "time" + + "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" +) + +const ( + MaxKeyspaceLockLeaseTTL = 10 * time.Minute + leaseRenewalInterval = 10 * time.Second ) type keyspaceLock struct { @@ -56,3 +66,52 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { keyspace: keyspace, }) } + +// LockKeyspaceWithLeaseRenewal locks the keyspace and starts a goroutine which runs +// until the MaxKeyspaceLockLeaseTTL is reached -- exiting if the context is +// cancelled, the unlock function is called, or an error is encountered -- refreshing +// the lock's lease every leaseRenewal until it ends. +// It returns a read-only error channel that you should regularly check to ensure that +// you have not lost the lock or encountered any other non-recoverable errors related +// to your lease. 
+func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) {
+	ksLock := &keyspaceLock{keyspace: keyspace}
+	lockCtx, unlockF, err := ts.internalLock(ctx, ksLock, action, true)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+	done := make(chan struct{}) // Our work is done and we should exit
+	errCh := make(chan error, 1) // Communicate any errors encountered during renewals
+	go func() {
+		defer close(errCh)
+		maxRenewals := int((MaxKeyspaceLockLeaseTTL.Seconds() - leaseRenewalInterval.Seconds()) / leaseRenewalInterval.Seconds())
+		for i := 0; i < maxRenewals; i++ {
+			// Wait out the renewal interval, but exit promptly on cancellation/unlock.
+			select {
+			case <-lockCtx.Done():
+				return
+			case <-done:
+				return
+			case <-time.After(leaseRenewalInterval):
+			}
+			// Attempt to renew the lease.
+			if err := checkLocked(lockCtx, ksLock); err != nil {
+				errCh <- vterrors.Wrapf(err, "failed to renew keyspace %s lock lease", keyspace)
+				return
+			}
+		}
+		select {
+		case <-lockCtx.Done(): // Lock context ended; the lease error no longer matters
+		case <-done: // Unlocked before the TTL expired; nothing to report
+		case <-time.After(leaseRenewalInterval):
+			errCh <- vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "cannot renew keyspace %s lock lease as we've hit the time limit of %v",
+				keyspace, MaxKeyspaceLockLeaseTTL)
+		}
+	}()
+	// Add to the unlock function to end the lease renewal work.
+ newUnlockF := func(err *error) { + close(done) + unlockF(err) + } + return lockCtx, newUnlockF, errCh, nil +} diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index 95fcea3a2a9..4f4ed34963a 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -99,6 +99,9 @@ func (s *Server) buildResharder(ctx context.Context, keyspace, workflow string, if err != nil { return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } + if si.PrimaryAlias == nil { + return nil, fmt.Errorf("target shard %v has no primary tablet", shard) + } if si.IsPrimaryServing { return nil, fmt.Errorf("target shard %v is in serving state", shard) } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 587caff3c8c..36397974a0c 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -80,6 +80,8 @@ const ( rdonlyTabletSuffix = "@rdonly" // Globally routable tables don't have a keyspace prefix. globalTableQualifier = "" + + maxKeyspaceLockLeaseTTL = 10 * time.Minute ) var tabletTypeSuffixes = []string{primaryTabletSuffix, replicaTabletSuffix, rdonlyTabletSuffix} @@ -1460,13 +1462,25 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } sw := &switcher{s: s, ts: ts} - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") + + // Lock the target keyspace while we setup the workflow and ensure that we're able + // to hold the lock throughout. + lockCtx, targetUnlock, leaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") if lockErr != nil { ts.Logger().Errorf("Locking target keyspace %s failed: %v", ts.TargetKeyspaceName(), lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = lockCtx + // Check for errors with the lock lease renewal. 
+ haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } // If we get an error after this point, where the vreplication streams/records // have been created, then we clean up the workflow's artifacts. @@ -1486,26 +1500,41 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) } + if leaseErr := haveLeaseError(); leaseErr != nil { // We only check at the end as an FYI that we lost the lease + log.Warningf("keyspace lock lease was lost when cleaning up after a failed MoveTablesCreate: %v", leaseErr) + } } }() // Now that the streams have been successfully created, let's put the associated // routing rules and denied tables entries in place. if externalTopo == nil { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.setupInitialRoutingRules(ctx, req, mz, tables); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } // We added to the vschema. 
if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { return nil, err } } if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.setupInitialDeniedTables(ctx, ts); err != nil { return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards") } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -1516,6 +1545,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } tabletShards, err := s.collectTargetStreams(ctx, mz) if err != nil { return nil, err @@ -1527,6 +1559,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } if mz.ms.ExternalCluster == "" { + if leaseErr := haveLeaseError(); leaseErr != nil { + err = leaseErr + } exists, tablets, err := s.checkIfPreviousJournalExists(ctx, mz, migrationID) if err != nil { return nil, err @@ -1542,6 +1577,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } if req.AutoStart { + if leaseErr := haveLeaseError(); leaseErr != nil { + err = leaseErr + } if err := mz.startStreams(ctx); err != nil { return nil, err } @@ -2572,44 +2610,87 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, sw = &switcher{s: s, ts: ts} } var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") + + // Need to lock both source and target keyspaces. + // Ensure that the leases are renewed and we're able to hold the locks during the + // cleanup work. 
+ tctx, sourceUnlock, sourceLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) ctx = tctx - + // Check for errors with the lock lease renewal. + haveLeaseError := func() error { + select { + case err := <-sourceLeaseErrCh: + return err + default: + return nil + } + } if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") + tctx, targetUnlock, targetLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx + // Check for errors with the lock lease renewal in both keyspaces. + haveLeaseError = func() error { + select { + case err := <-sourceLeaseErrCh: + return err + case err := <-targetLeaseErrCh: + return err + default: + return nil + } + } + } + if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.removeTargetTables(ctx); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropTargetDeniedTables(ctx); err != nil { return nil, err } case binlogdatapb.MigrationType_SHARDS: + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropTargetShards(ctx); err != nil { return nil, err } } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.dropRelatedArtifacts(ctx, keepRoutingRules, sw); err != nil { return nil, err } + if leaseErr := 
haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -2764,23 +2845,51 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy sw = &switcher{ts: ts, s: s} } var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") + + // Need to lock both source and target keyspaces. + // Ensure that the leases are renewed and we're able to hold the locks during the + // cleanup work. + tctx, sourceUnlock, sourceLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) ctx = tctx + // Check for errors with the lock lease renewal. + haveLeaseError := func() error { + select { + case err := <-sourceLeaseErrCh: + return err + default: + return nil + } + } if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") + tctx, targetUnlock, targetLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx + // Check for errors with the lock lease renewal in both keyspaces. 
+ haveLeaseError = func() error { + select { + case err := <-sourceLeaseErrCh: + return err + case err := <-targetLeaseErrCh: + return err + default: + return nil + } + } } + if !force { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.validateWorkflowHasCompleted(ctx); err != nil { ts.Logger().Errorf("Workflow has not completed, cannot DropSources: %v", err) return nil, err @@ -2790,26 +2899,44 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: log.Infof("Deleting tables") + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.removeSourceTables(ctx, removalType); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropTargetDeniedTables(ctx); err != nil { return nil, err } case binlogdatapb.MigrationType_SHARDS: log.Infof("Removing shards") + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropSourceShards(ctx); err != nil { return nil, err } } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.dropArtifacts(ctx, keepRoutingRules, sw); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -2998,26 +3125,48 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche sw = &switcher{s: s, ts: ts} } var tctx context.Context - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") + + // Lock the target keyspace and ensure that the lease is renewed and we're able to 
hold + // the lock during the finalization. + tctx, targetUnlock, leaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx + // Check for errors with the lock lease renewal. + haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } + if err := sw.dropTargetVReplicationStreams(ctx); err != nil { return nil, err } if !cancel { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.addParticipatingTablesToKeyspace(ctx, ts.targetKeyspace, tableSpecs); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } } log.Infof("cancel is %t, keepData %t", cancel, keepData) if cancel && !keepData { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.removeTargetTables(ctx); err != nil { return nil, err } @@ -3238,12 +3387,23 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("workflow validation failed", err) } - // For reads, locking the source keyspace is sufficient. - ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") + // Ensure that the lease is renewed and we're able to hold the locks during the + // traffic switching. + // For reads, only locking the source keyspace is sufficient. + ctx, unlock, leaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer unlock(&err) + // Check for errors with the lock lease renewal. 
+ haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { switch { @@ -3263,6 +3423,9 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } return sw.logs(), nil } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) @@ -3310,20 +3473,42 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } - // Need to lock both source and target keyspaces. - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") + // We need to lock both source and target keyspaces. + // Ensure that the leases are renewed and we're able to hold the locks during the + // traffic switching. + tctx, sourceUnlock, sourceLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } - ctx = tctx defer sourceUnlock(&err) + ctx = tctx + // Check for errors with the lock lease renewal. 
+ haveLeaseError := func() error { + select { + case err := <-sourceLeaseErrCh: + return err + default: + return nil + } + } if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") + tctx, targetUnlock, targetLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } - ctx = tctx defer targetUnlock(&err) + ctx = tctx + // Check for errors with the lock lease renewal in both keyspaces. + haveLeaseError = func() error { + select { + case err := <-sourceLeaseErrCh: + return err + case err := <-targetLeaseErrCh: + return err + default: + return nil + } + } } // Find out if the target is using any sequence tables for auto_increment @@ -3335,18 +3520,27 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if req.InitializeTargetSequences && ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && ts.SourceKeyspaceSchema() != nil && ts.SourceKeyspaceSchema().Keyspace != nil && !ts.SourceKeyspaceSchema().Keyspace.Sharded { + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } sequenceMetadata, err = ts.getTargetSequenceMetadata(ctx) if err != nil { return handleError(fmt.Sprintf("failed to get the sequence information in the %s keyspace", ts.TargetKeyspaceName()), err) } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } // If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams. 
journalsExist, sourceWorkflows, err := ts.checkJournals(ctx) if err != nil { return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err) } if !journalsExist { + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("No previous journals were found. Proceeding normally.") sm, err := BuildStreamMigrator(ctx, ts, cancel, s.env.Parser()) if err != nil { @@ -3364,12 +3558,18 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // For intra-keyspace materialization streams that we migrate where the source and target are // the keyspace being resharded, we wait for those to catchup in the stopStreams path before // we actually stop them. + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Stopping source writes") if err := sw.stopSourceWrites(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Stopping streams") // Use a shorter context for this since since when doing a Reshard, if there are intra-keyspace // materializations then we have to wait for them to catchup before switching traffic for the @@ -3388,6 +3588,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles) // Doing this twice with a 
pause in-between to catch any writes that may have raced in between @@ -3403,24 +3606,36 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Waiting for streams to catchup") if err := sw.waitForCatchup(ctx, timeout); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to sync up replication between the source and target", err) } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to migrate the workflow streams", err) } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Resetting sequences") if err := sw.resetSequences(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to reset the sequences", err) } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Creating reverse streams") if err := sw.createReverseVReplication(ctx); err != nil { sw.cancelMigration(ctx, sm) @@ -3429,10 +3644,16 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Initialize any target sequences, if there are any, before allowing new writes. if req.InitializeTargetSequences && len(sequenceMetadata) > 0 { + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Initializing target sequences") // Writes are blocked so we can safely initialize the sequence tables but // we also want to use a shorter timeout than the parent context. // We use at most half of the overall timeout. 
+ if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) defer cancel() if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { @@ -3444,6 +3665,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if cancel { return handleError("invalid cancel", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel")) } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } ts.Logger().Infof("Journals were found. Completing the left over steps.") // Need to gather positions in case all journals were not created. if err := ts.gatherPositions(ctx); err != nil { @@ -3451,26 +3675,48 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } // This is the point of no return. Once a journal is created, // traffic can be redirected to target shards. 
if err := sw.createJournals(ctx, sourceWorkflows); err != nil { return handleError("failed to create the journal", err) } + + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } if err := sw.allowTargetWrites(ctx); err != nil { return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err) } + + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } if err := sw.changeRouting(ctx); err != nil { return handleError("failed to update the routing rules", err) } + + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil { return handleError("failed to finalize the traffic switch", err) } + if req.EnableReverseReplication { + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } if err := sw.startReverseVReplication(ctx); err != nil { return handleError("failed to start the reverse workflow", err) } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) + } if err := sw.freezeTargetVReplication(ctx); err != nil { return handleError(fmt.Sprintf("failed to freeze the workflow in the %s keyspace", ts.TargetKeyspaceName()), err) } diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index aa41655aab8..0c5fcf7f615 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -126,8 +126,8 @@ func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { r.ts.cancelMigration(ctx, sm) } -func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { - return r.s.ts.LockKeyspace(ctx, keyspace, action) +func (r *switcher) lockKeyspace(ctx 
context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) { + return r.s.ts.LockKeyspaceWithLeaseRenewal(ctx, keyspace, action) } func (r *switcher) freezeTargetVReplication(ctx context.Context) error { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index b8b1369bdf7..19df89ce38c 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -293,11 +293,11 @@ func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrato dr.drLog.Log("Cancel migration as requested") } -func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) { +func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), <-chan error, error) { dr.drLog.Logf("Lock keyspace %s", keyspace) return ctx, func(e *error) { dr.drLog.Logf("Unlock keyspace %s", keyspace) - }, nil + }, nil, nil } func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType TableRemovalType) error { diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 0780aaf484c..4d18a5dec6e 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -24,7 +24,7 @@ import ( ) type iswitcher interface { - lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) + lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) cancelMigration(ctx context.Context, sm *StreamMigrator) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index d4e8d7b4ec0..3897be6c575 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -28,10 
+28,6 @@ import ( "strings" "sync" - querypb "vitess.io/vitess/go/vt/proto/query" - - "vitess.io/vitess/go/vt/vtgate/vindexes" - "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/sets" @@ -46,9 +42,11 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index a98a3ce90f9..f17f2a2a5ce 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -116,12 +116,14 @@ func (td *tableDiffer) initialize(ctx context.Context) error { targetKeyspace := td.wd.ct.vde.thisTablet.Keyspace log.Infof("Locking target keyspace %s", targetKeyspace) - ctx, unlock, lockErr := td.wd.ct.ts.LockKeyspace(ctx, targetKeyspace, "vdiff") + // Ensure that we hold onto the lock. + // Ensure that the lease is renewed and we're able to hold the lock during the + // table diff initialization work. + ctx, unlock, leaseErrCh, lockErr := td.wd.ct.ts.LockKeyspaceWithLeaseRenewal(ctx, targetKeyspace, "vdiff") if lockErr != nil { log.Errorf("LockKeyspace failed: %v", lockErr) return lockErr } - var err error defer func() { unlock(&err) @@ -129,6 +131,20 @@ func (td *tableDiffer) initialize(ctx context.Context) error { log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, err) } }() + // Check for errors in our diff goroutine and the lock lease renewal. 
+ haveError := func(inerr error) error { + if inerr != nil { + return inerr + } + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-leaseErrCh: + return err + default: + return nil + } + } if err := td.stopTargetVReplicationStreams(ctx, dbClient); err != nil { return err @@ -140,29 +156,38 @@ func (td *tableDiffer) initialize(ctx context.Context) error { td.wd.ct.workflow, targetKeyspace) restartCtx, restartCancel := context.WithTimeout(context.Background(), BackgroundOperationTimeout) defer restartCancel() - if err := td.restartTargetVReplicationStreams(restartCtx); err != nil { - log.Errorf("error restarting target streams: %v", err) + err := td.restartTargetVReplicationStreams(restartCtx) + if rerr := haveError(err); rerr != nil { + log.Errorf("error restarting target streams: %v", rerr) } }() td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx) - if err := td.selectTablets(ctx); err != nil { - return err + err = td.selectTablets(ctx) + if rerr := haveError(err); rerr != nil { + return vterrors.Wrap(rerr, "when selecting tablets") } - if err := td.syncSourceStreams(ctx); err != nil { - return err + err = td.syncSourceStreams(ctx) + if rerr := haveError(err); rerr != nil { + return vterrors.Wrap(rerr, "when syncing source streams") } - if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil { - return err + err = td.startSourceDataStreams(td.shardStreamsCtx) + if rerr := haveError(err); rerr != nil { + return vterrors.Wrap(rerr, "when starting source data streams") } - if err := td.syncTargetStreams(ctx); err != nil { - return err + err = td.syncTargetStreams(ctx) + if rerr := haveError(err); rerr != nil { + return vterrors.Wrap(rerr, "when syncing target streams") } - if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil { - return err + err = td.startTargetDataStream(td.shardStreamsCtx) + if rerr := haveError(err); rerr != nil { + return vterrors.Wrap(rerr, "when starting target data streams") } 
td.setupRowSorters() + if rerr := haveError(nil); rerr != nil { + return vterrors.Wrap(rerr, "when setting up row sorters") + } return nil }