From 0962e47ff4a7c1c1ceb92410c167118b0e04cdeb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jun 2024 10:51:53 -0400 Subject: [PATCH 1/7] Auto renew KS lock on traffic switch and vdiff init Signed-off-by: Matt Lord --- go/vt/topo/etcd2topo/lock.go | 1 - go/vt/topo/keyspace_lock.go | 28 +++++ go/vt/vtctl/workflow/server.go | 111 ++++++++++++++++++ go/vt/vtctl/workflow/utils.go | 6 +- .../tabletmanager/vdiff/table_differ.go | 72 +++++++++--- 5 files changed, 195 insertions(+), 23 deletions(-) diff --git a/go/vt/topo/etcd2topo/lock.go b/go/vt/topo/etcd2topo/lock.go index 89095156471..0153526a139 100644 --- a/go/vt/topo/etcd2topo/lock.go +++ b/go/vt/topo/etcd2topo/lock.go @@ -22,7 +22,6 @@ import ( "path" "github.com/spf13/pflag" - "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 7df1b2ee64f..52f03c4a34d 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -19,6 +19,9 @@ package topo import ( "context" "path" + "time" + + "vitess.io/vitess/go/vt/vterrors" ) type keyspaceLock struct { @@ -56,3 +59,28 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { keyspace: keyspace, }) } + +// AutoRenewKeyspaceLockLease will renew the keyspace lock lease every renewTime +// in a goroutine until the maxTime is reached -- exiting early if the context +// is cancelled, the done channel is closed, or an error is encountered. +func AutoRenewKeyspaceLockLease(ctx context.Context, keyspace string, renewTime, maxTime time.Duration, doneCh <-chan struct{}, errCh chan error) { + go func() { + ksLock := &keyspaceLock{keyspace: keyspace} + renewAttempts := int(maxTime.Seconds() / renewTime.Seconds()) + for i := 0; i < renewAttempts; i++ { + time.Sleep(renewTime) + select { + case <-ctx.Done(): + return + case <-doneCh: + return + default: + // Attempt to renew lease. 
+ if err := checkLocked(ctx, ksLock); err != nil { + errCh <- vterrors.Wrapf(err, "failed to renew keyspace %s lock lease", keyspace) + return + } + } + } + }() +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 587caff3c8c..a660e2f2801 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -80,6 +80,8 @@ const ( rdonlyTabletSuffix = "@rdonly" // Globally routable tables don't have a keyspace prefix. globalTableQualifier = "" + + maxKeyspaceLockLeaseTTL = 10 * time.Minute ) var tabletTypeSuffixes = []string{primaryTabletSuffix, replicaTabletSuffix, rdonlyTabletSuffix} @@ -3245,6 +3247,28 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } defer unlock(&err) + // Ensure that the leases are renewed and we're able to hold the locks during the + // traffic switching. + leaseDoneCh := make(chan struct{}) + leaseErrCh := make(chan error, 1) + defer func() { + close(leaseDoneCh) + for range leaseErrCh { // Drain the channel + } + close(leaseErrCh) + }() + // Renew it every 5 seconds. + topo.AutoRenewKeyspaceLockLease(ctx, ts.SourceKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh) + // Check for errors in our diff goroutine and the lock lease renewal goroutine. 
+ haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } + if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { switch { case ts.IsMultiTenantMigration(): @@ -3263,6 +3287,9 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } return sw.logs(), nil } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { return handleError("failed to switch read traffic for the shards", err) @@ -3326,6 +3353,29 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit defer targetUnlock(&err) } + // Ensure that the leases are renewed and we're able to hold the locks during the + // traffic switching. + leaseDoneCh := make(chan struct{}) + leaseErrCh := make(chan error, 2) + defer func() { + close(leaseDoneCh) + for range leaseErrCh { // Drain the channel + } + close(leaseErrCh) + }() + // Renew it every 5 seconds. + topo.AutoRenewKeyspaceLockLease(ctx, ts.SourceKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh) + topo.AutoRenewKeyspaceLockLease(ctx, ts.TargetKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh) + // Check for errors in our diff goroutine and the lock lease renewal goroutine. + haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } + // Find out if the target is using any sequence tables for auto_increment // value generation. If so, then we'll need to ensure that they are // initialized properly before allowing new writes on the target. 
@@ -3335,18 +3385,27 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if req.InitializeTargetSequences && ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && ts.SourceKeyspaceSchema() != nil && ts.SourceKeyspaceSchema().Keyspace != nil && !ts.SourceKeyspaceSchema().Keyspace.Sharded { + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } sequenceMetadata, err = ts.getTargetSequenceMetadata(ctx) if err != nil { return handleError(fmt.Sprintf("failed to get the sequence information in the %s keyspace", ts.TargetKeyspaceName()), err) } } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } // If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams. journalsExist, sourceWorkflows, err := ts.checkJournals(ctx) if err != nil { return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err) } if !journalsExist { + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("No previous journals were found. Proceeding normally.") sm, err := BuildStreamMigrator(ctx, ts, cancel, s.env.Parser()) if err != nil { @@ -3364,12 +3423,18 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // For intra-keyspace materialization streams that we migrate where the source and target are // the keyspace being resharded, we wait for those to catchup in the stopStreams path before // we actually stop them. 
+ if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Stopping source writes") if err := sw.stopSourceWrites(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Stopping streams") // Use a shorter context for this since since when doing a Reshard, if there are intra-keyspace // materializations then we have to wait for them to catchup before switching traffic for the @@ -3388,6 +3453,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles) // Doing this twice with a pause in-between to catch any writes that may have raced in between @@ -3403,24 +3471,36 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Waiting for streams to catchup") if err := sw.waitForCatchup(ctx, timeout); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to sync up replication between the source and target", err) } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to migrate the 
workflow streams", err) } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Resetting sequences") if err := sw.resetSequences(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError("failed to reset the sequences", err) } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Creating reverse streams") if err := sw.createReverseVReplication(ctx); err != nil { sw.cancelMigration(ctx, sm) @@ -3429,10 +3509,16 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Initialize any target sequences, if there are any, before allowing new writes. if req.InitializeTargetSequences && len(sequenceMetadata) > 0 { + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Initializing target sequences") // Writes are blocked so we can safely initialize the sequence tables but // we also want to use a shorter timeout than the parent context. // We use at most half of the overall timeout. + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) defer cancel() if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { @@ -3444,6 +3530,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if cancel { return handleError("invalid cancel", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel")) } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } ts.Logger().Infof("Journals were found. Completing the left over steps.") // Need to gather positions in case all journals were not created. 
if err := ts.gatherPositions(ctx); err != nil { @@ -3451,26 +3540,48 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } // This is the point of no return. Once a journal is created, // traffic can be redirected to target shards. if err := sw.createJournals(ctx, sourceWorkflows); err != nil { return handleError("failed to create the journal", err) } + + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } if err := sw.allowTargetWrites(ctx); err != nil { return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err) } + + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } if err := sw.changeRouting(ctx); err != nil { return handleError("failed to update the routing rules", err) } + + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil { return handleError("failed to finalize the traffic switch", err) } + if req.EnableReverseReplication { + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } if err := sw.startReverseVReplication(ctx); err != nil { return handleError("failed to start the reverse workflow", err) } } + if leaseError := haveLeaseError(); leaseError != nil { + return handleError("lost keyspace lock lease", err) + } if err := sw.freezeTargetVReplication(ctx); err != nil { return handleError(fmt.Sprintf("failed to freeze the workflow in the %s keyspace", ts.TargetKeyspaceName()), err) } diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index d4e8d7b4ec0..3897be6c575 100644 --- a/go/vt/vtctl/workflow/utils.go +++ 
b/go/vt/vtctl/workflow/utils.go @@ -28,10 +28,6 @@ import ( "strings" "sync" - querypb "vitess.io/vitess/go/vt/proto/query" - - "vitess.io/vitess/go/vt/vtgate/vindexes" - "google.golang.org/protobuf/encoding/prototext" "vitess.io/vitess/go/sets" @@ -46,9 +42,11 @@ import ( "vitess.io/vitess/go/vt/topo/topoproto" "vitess.io/vitess/go/vt/topotools" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" "vitess.io/vitess/go/vt/vttablet/tmclient" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + querypb "vitess.io/vitess/go/vt/proto/query" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index a98a3ce90f9..f7f4f2a1d64 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -52,14 +52,15 @@ import ( type tableDiffPhase string const ( - initializing = tableDiffPhase("initializing") - pickingTablets = tableDiffPhase("picking_streaming_tablets") - syncingSources = tableDiffPhase("syncing_source_streams") - syncingTargets = tableDiffPhase("syncing_target_streams") - startingSources = tableDiffPhase("starting_source_data_streams") - startingTargets = tableDiffPhase("starting_target_data_streams") - restartingVreplication = tableDiffPhase("restarting_vreplication_streams") - diffingTable = tableDiffPhase("diffing_table") + initializing = tableDiffPhase("initializing") + pickingTablets = tableDiffPhase("picking_streaming_tablets") + syncingSources = tableDiffPhase("syncing_source_streams") + syncingTargets = tableDiffPhase("syncing_target_streams") + startingSources = tableDiffPhase("starting_source_data_streams") + startingTargets = tableDiffPhase("starting_target_data_streams") + restartingVreplication = 
tableDiffPhase("restarting_vreplication_streams") + diffingTable = tableDiffPhase("diffing_table") + maxKeyspaceLockLeaseTTL = 10 * time.Minute ) // how long to wait for background operations to complete @@ -130,6 +131,33 @@ func (td *tableDiffer) initialize(ctx context.Context) error { } }() + // Ensure that the lease is renewed and we're able to hold the lock during the + // table diff initialization work. + done := make(chan struct{}) + errCh := make(chan error, 1) + defer func() { + close(done) + for range errCh { // Drain the channel + } + close(errCh) + }() + // Renew it every 5 seconds. + topo.AutoRenewKeyspaceLockLease(ctx, targetKeyspace, time.Second*5, maxKeyspaceLockLeaseTTL, done, errCh) + // Check for errors in our diff goroutine and the lock lease renewal goroutine. + haveError := func(inerr error) error { + if inerr != nil { + return inerr + } + select { + case <-ctx.Done(): + return ctx.Err() + case err := <-errCh: + return err + default: + return nil + } + } + if err := td.stopTargetVReplicationStreams(ctx, dbClient); err != nil { return err } @@ -147,22 +175,30 @@ func (td *tableDiffer) initialize(ctx context.Context) error { td.shardStreamsCtx, td.shardStreamsCancel = context.WithCancel(ctx) - if err := td.selectTablets(ctx); err != nil { - return err + err = td.selectTablets(ctx) + if rerr := haveError(err); rerr != nil { + return rerr } - if err := td.syncSourceStreams(ctx); err != nil { - return err + err = td.syncSourceStreams(ctx) + if rerr := haveError(err); rerr != nil { + return rerr } - if err := td.startSourceDataStreams(td.shardStreamsCtx); err != nil { - return err + err = td.startSourceDataStreams(td.shardStreamsCtx) + if rerr := haveError(err); rerr != nil { + return rerr } - if err := td.syncTargetStreams(ctx); err != nil { - return err + err = td.syncTargetStreams(ctx) + if rerr := haveError(err); rerr != nil { + return rerr } - if err := td.startTargetDataStream(td.shardStreamsCtx); err != nil { - return err + err = 
td.startTargetDataStream(td.shardStreamsCtx) + if rerr := haveError(err); rerr != nil { + return rerr } td.setupRowSorters() + if rerr := haveError(nil); rerr != nil { + return rerr + } return nil } From c5dc919031cd54a348811d849f5a5a0f29958bae Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jun 2024 12:36:21 -0400 Subject: [PATCH 2/7] Check for no shard primary tablet in buildResharder Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/resharder.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/vt/vtctl/workflow/resharder.go b/go/vt/vtctl/workflow/resharder.go index 95fcea3a2a9..4f4ed34963a 100644 --- a/go/vt/vtctl/workflow/resharder.go +++ b/go/vt/vtctl/workflow/resharder.go @@ -99,6 +99,9 @@ func (s *Server) buildResharder(ctx context.Context, keyspace, workflow string, if err != nil { return nil, vterrors.Wrapf(err, "GetShard(%s) failed", shard) } + if si.PrimaryAlias == nil { + return nil, fmt.Errorf("target shard %v has no primary tablet", shard) + } if si.IsPrimaryServing { return nil, fmt.Errorf("target shard %v is in serving state", shard) } From 3817a30eb6fd6002613c3a5b6899369175698d1a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jun 2024 14:26:47 -0400 Subject: [PATCH 3/7] Move things around Signed-off-by: Matt Lord --- go/vt/topo/keyspace_lock.go | 31 +- go/vt/vtctl/workflow/server.go | 299 +++++++++++++----- go/vt/vtctl/workflow/switcher.go | 4 +- go/vt/vtctl/workflow/switcher_dry_run.go | 2 +- go/vt/vtctl/workflow/switcher_interface.go | 2 +- .../tabletmanager/vdiff/table_differ.go | 63 ++-- 6 files changed, 278 insertions(+), 123 deletions(-) diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 52f03c4a34d..156046f7dad 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -24,6 +24,11 @@ import ( "vitess.io/vitess/go/vt/vterrors" ) +const ( + MaxKeyspaceLockLeaseTTL = 10 * time.Minute + leaseRenewalInterval = 5 * time.Second +) + type keyspaceLock struct { keyspace string } @@ 
-60,27 +65,33 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { }) } -// AutoRenewKeyspaceLockLease will renew the keyspace lock lease every renewTime -// in a goroutine until the maxTime is reached -- exiting early if the context -// is cancelled, the done channel is closed, or an error is encountered. -func AutoRenewKeyspaceLockLease(ctx context.Context, keyspace string, renewTime, maxTime time.Duration, doneCh <-chan struct{}, errCh chan error) { +// LockKeyspaceWithLeaseRenewal locks the keyspace and starts a goroutine which runs +// until the MaxKeyspaceLockLeaseTTL is reached -- exiting early if the context is +// cancelled, the done channel is closed, or an error is encountered -- refreshing +// the lock's lease every leaseRenewal until ends. +func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, action string, doneCh <-chan struct{}, errCh chan<- error) (context.Context, func(*error), error) { + ksLock := &keyspaceLock{keyspace: keyspace} + lockCtx, unLockF, err := ts.internalLock(ctx, ksLock, action, true) + if err != nil { + return nil, nil, err + } go func() { - ksLock := &keyspaceLock{keyspace: keyspace} - renewAttempts := int(maxTime.Seconds() / renewTime.Seconds()) + renewAttempts := int(MaxKeyspaceLockLeaseTTL.Seconds() / leaseRenewalInterval.Seconds()) for i := 0; i < renewAttempts; i++ { - time.Sleep(renewTime) + time.Sleep(leaseRenewalInterval) select { - case <-ctx.Done(): + case <-lockCtx.Done(): return case <-doneCh: return default: - // Attempt to renew lease. - if err := checkLocked(ctx, ksLock); err != nil { + // Attempt to renew the lease. 
+ if err := checkLocked(lockCtx, ksLock); err != nil { errCh <- vterrors.Wrapf(err, "failed to renew keyspace %s lock lease", keyspace) return } } } }() + return lockCtx, unLockF, err } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index a660e2f2801..ae042ff099e 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1462,13 +1462,32 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl return nil, err } sw := &switcher{s: s, ts: ts} - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") + + // Lock the target keyspace while we setup the workflow and ensure that we're able + // to hold the lock throughout. + leaseDoneCh := make(chan struct{}) + leaseErrCh := make(chan error, 1) + defer func() { + close(leaseDoneCh) + for range leaseErrCh { // Drain the channel + } + close(leaseErrCh) + }() + lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate", leaseDoneCh, leaseErrCh) if lockErr != nil { ts.Logger().Errorf("Locking target keyspace %s failed: %v", ts.TargetKeyspaceName(), lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = lockCtx + haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } // If we get an error after this point, where the vreplication streams/records // have been created, then we clean up the workflow's artifacts. 
@@ -1488,26 +1507,41 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl if cerr := s.ts.SaveVSchema(ctx, targetKeyspace, origVSchema); cerr != nil { err = vterrors.Wrapf(err, "failed to restore original target vschema: %v", cerr) } + if leaseErr := haveLeaseError(); leaseErr != nil { // We only check at the end as an FYI that we lost the lease + log.Warningf("keyspace lock lease was lost when cleaning up after a failed MoveTablesCreate: %v", leaseErr) + } } }() // Now that the streams have been successfully created, let's put the associated // routing rules and denied tables entries in place. if externalTopo == nil { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.setupInitialRoutingRules(ctx, req, mz, tables); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } // We added to the vschema. if err := s.ts.SaveVSchema(ctx, targetKeyspace, vschema); err != nil { return nil, err } } if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.setupInitialDeniedTables(ctx, ts); err != nil { return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards") } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.ts.RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -1518,6 +1552,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } tabletShards, err := s.collectTargetStreams(ctx, mz) if err != nil { return nil, err @@ -1529,6 +1566,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } if mz.ms.ExternalCluster == "" { + if leaseErr := haveLeaseError(); leaseErr != nil { + err = leaseErr 
+ } exists, tablets, err := s.checkIfPreviousJournalExists(ctx, mz, migrationID) if err != nil { return nil, err @@ -1544,6 +1584,9 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl } if req.AutoStart { + if leaseErr := haveLeaseError(); leaseErr != nil { + err = leaseErr + } if err := mz.startStreams(ctx); err != nil { return nil, err } @@ -2574,16 +2617,27 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, sw = &switcher{s: s, ts: ts} } var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") + + // Need to lock both source and target keyspaces. + // Ensure that the leases are renewed and we're able to hold the locks during the + // cleanup work. + leaseDoneCh := make(chan struct{}) + leaseErrCh := make(chan error, 2) + defer func() { + close(leaseDoneCh) + for range leaseErrCh { // Drain the channel + } + close(leaseErrCh) + }() + tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets", leaseDoneCh, leaseErrCh) if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) ctx = tctx - if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") + tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets", leaseDoneCh, leaseErrCh) if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr @@ -2591,27 +2645,55 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, defer targetUnlock(&err) ctx = tctx } + // Check for errors in our diff goroutine and the lock lease renewal. 
+ haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } + if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.removeTargetTables(ctx); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropTargetDeniedTables(ctx); err != nil { return nil, err } case binlogdatapb.MigrationType_SHARDS: + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropTargetShards(ctx); err != nil { return nil, err } } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.dropRelatedArtifacts(ctx, keepRoutingRules, sw); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -2766,7 +2848,19 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy sw = &switcher{ts: ts, s: s} } var tctx context.Context - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") + + // Need to lock both source and target keyspaces. + // Ensure that the leases are renewed and we're able to hold the locks during the + // cleanup work. 
+ leaseDoneCh := make(chan struct{}) + leaseErrCh := make(chan error, 2) + defer func() { + close(leaseDoneCh) + for range leaseErrCh { // Drain the channel + } + close(leaseErrCh) + }() + tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources", leaseDoneCh, leaseErrCh) if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr @@ -2774,7 +2868,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy defer sourceUnlock(&err) ctx = tctx if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") + tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources", leaseDoneCh, leaseErrCh) if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr @@ -2782,7 +2876,20 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy defer targetUnlock(&err) ctx = tctx } + // Check for errors in our diff goroutine and the lock lease renewal. 
+ haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } + if !force { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.validateWorkflowHasCompleted(ctx); err != nil { ts.Logger().Errorf("Workflow has not completed, cannot DropSources: %v", err) return nil, err @@ -2792,26 +2899,44 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: log.Infof("Deleting tables") + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.removeSourceTables(ctx, removalType); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropTargetDeniedTables(ctx); err != nil { return nil, err } case binlogdatapb.MigrationType_SHARDS: log.Infof("Removing shards") + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.dropSourceShards(ctx); err != nil { return nil, err } } } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := s.dropArtifacts(ctx, keepRoutingRules, sw); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } @@ -3000,26 +3125,56 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche sw = &switcher{s: s, ts: ts} } var tctx context.Context - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") + + // Lock the target keyspace and ensure that the lease is renewed and we're able to hold + // the lock during the finalization. 
+ leaseDoneCh := make(chan struct{}) + leaseErrCh := make(chan error, 1) + defer func() { + close(leaseDoneCh) + for range leaseErrCh { // Drain the channel + } + close(leaseErrCh) + }() + tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow", leaseDoneCh, leaseErrCh) if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx + // Check for errors in our diff goroutine and the lock lease renewal. + haveLeaseError := func() error { + select { + case err := <-leaseErrCh: + return err + default: + return nil + } + } + if err := sw.dropTargetVReplicationStreams(ctx); err != nil { return nil, err } if !cancel { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.addParticipatingTablesToKeyspace(ctx, ts.targetKeyspace, tableSpecs); err != nil { return nil, err } + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { return nil, err } } log.Infof("cancel is %t, keepData %t", cancel, keepData) if cancel && !keepData { + if leaseErr := haveLeaseError(); leaseErr != nil { + return nil, leaseErr + } if err := sw.removeTargetTables(ctx); err != nil { return nil, err } @@ -3240,14 +3395,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc return handleError("workflow validation failed", err) } - // For reads, locking the source keyspace is sufficient. - ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") - if lockErr != nil { - return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) - } - defer unlock(&err) - - // Ensure that the leases are renewed and we're able to hold the locks during the + // Ensure that the lease is renewed and we're able to hold the locks during the // traffic switching. 
leaseDoneCh := make(chan struct{}) leaseErrCh := make(chan error, 1) @@ -3257,9 +3405,13 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } close(leaseErrCh) }() - // Renew it every 5 seconds. - topo.AutoRenewKeyspaceLockLease(ctx, ts.SourceKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh) - // Check for errors in our diff goroutine and the lock lease renewal goroutine. + // For reads, only locking the source keyspace is sufficient. + ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", leaseDoneCh, leaseErrCh) + if lockErr != nil { + return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) + } + defer unlock(&err) + // Check for errors in our diff goroutine and the lock lease renewal. haveLeaseError := func() error { select { case err := <-leaseErrCh: @@ -3287,8 +3439,8 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc } return sw.logs(), nil } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction) if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil { @@ -3337,36 +3489,31 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } - // Need to lock both source and target keyspaces. - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") + // We need to lock both source and target keyspaces. + // Ensure that the leases are renewed and we're able to hold the locks during the + // traffic switching. 
+ leaseDoneCh := make(chan struct{}) + leaseErrCh := make(chan error, 2) + defer func() { + close(leaseDoneCh) + for range leaseErrCh { // Drain the channel + } + close(leaseErrCh) + }() + tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", leaseDoneCh, leaseErrCh) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } ctx = tctx defer sourceUnlock(&err) if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") + tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", leaseDoneCh, leaseErrCh) if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) } ctx = tctx defer targetUnlock(&err) } - - // Ensure that the leases are renewed and we're able to hold the locks during the - // traffic switching. - leaseDoneCh := make(chan struct{}) - leaseErrCh := make(chan error, 2) - defer func() { - close(leaseDoneCh) - for range leaseErrCh { // Drain the channel - } - close(leaseErrCh) - }() - // Renew it every 5 seconds. - topo.AutoRenewKeyspaceLockLease(ctx, ts.SourceKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh) - topo.AutoRenewKeyspaceLockLease(ctx, ts.TargetKeyspaceName(), time.Second*5, maxKeyspaceLockLeaseTTL, leaseDoneCh, leaseErrCh) - // Check for errors in our diff goroutine and the lock lease renewal goroutine. 
haveLeaseError := func() error { select { case err := <-leaseErrCh: @@ -3385,8 +3532,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if req.InitializeTargetSequences && ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && ts.SourceKeyspaceSchema() != nil && ts.SourceKeyspaceSchema().Keyspace != nil && !ts.SourceKeyspaceSchema().Keyspace.Sharded { - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } sequenceMetadata, err = ts.getTargetSequenceMetadata(ctx) if err != nil { @@ -3394,8 +3541,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } // If no journals exist, sourceWorkflows will be initialized by sm.MigrateStreams. journalsExist, sourceWorkflows, err := ts.checkJournals(ctx) @@ -3403,8 +3550,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err) } if !journalsExist { - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("No previous journals were found. 
Proceeding normally.") sm, err := BuildStreamMigrator(ctx, ts, cancel, s.env.Parser()) @@ -3423,8 +3570,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // For intra-keyspace materialization streams that we migrate where the source and target are // the keyspace being resharded, we wait for those to catchup in the stopStreams path before // we actually stop them. - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Stopping source writes") if err := sw.stopSourceWrites(ctx); err != nil { @@ -3432,8 +3579,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Stopping streams") // Use a shorter context for this since when doing a Reshard, if there are intra-keyspace @@ -3453,8 +3600,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { ts.Logger().Infof("Executing LOCK TABLES on source tables %d times", lockTablesCycles) @@ -3471,8 +3618,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } - 
if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Waiting for streams to catchup") if err := sw.waitForCatchup(ctx, timeout); err != nil { @@ -3480,8 +3627,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to sync up replication between the source and target", err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { @@ -3489,8 +3636,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to migrate the workflow streams", err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Resetting sequences") if err := sw.resetSequences(ctx); err != nil { @@ -3498,8 +3645,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to reset the sequences", err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Creating reverse streams") if err := sw.createReverseVReplication(ctx); err != nil { @@ -3509,15 +3656,15 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // Initialize any target sequences, if there are any, before allowing new writes. 
if req.InitializeTargetSequences && len(sequenceMetadata) > 0 { - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Initializing target sequences") // Writes are blocked so we can safely initialize the sequence tables but // we also want to use a shorter timeout than the parent context. // We use at most half of the overall timeout. - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) defer cancel() @@ -3530,8 +3677,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if cancel { return handleError("invalid cancel", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "traffic switching has reached the point of no return, cannot cancel")) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } ts.Logger().Infof("Journals were found. Completing the left over steps.") // Need to gather positions in case all journals were not created. @@ -3540,8 +3687,8 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } // This is the point of no return. Once a journal is created, // traffic can be redirected to target shards. 
@@ -3549,38 +3696,38 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to create the journal", err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } if err := sw.allowTargetWrites(ctx); err != nil { return handleError(fmt.Sprintf("failed to allow writes in the %s keyspace", ts.TargetKeyspaceName()), err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } if err := sw.changeRouting(ctx); err != nil { return handleError("failed to update the routing rules", err) } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } if err := sw.streamMigraterfinalize(ctx, ts, sourceWorkflows); err != nil { return handleError("failed to finalize the traffic switch", err) } if req.EnableReverseReplication { - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } if err := sw.startReverseVReplication(ctx); err != nil { return handleError("failed to start the reverse workflow", err) } } - if leaseError := haveLeaseError(); leaseError != nil { - return handleError("lost keyspace lock lease", err) + if leaseErr := haveLeaseError(); leaseErr != nil { + return handleError("lost keyspace lock lease", leaseErr) } if err := sw.freezeTargetVReplication(ctx); err != nil { return handleError(fmt.Sprintf("failed to freeze the workflow in the %s keyspace", 
ts.TargetKeyspaceName()), err) diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index aa41655aab8..01c1c98a371 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -126,8 +126,8 @@ func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { r.ts.cancelMigration(ctx, sm) } -func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { - return r.s.ts.LockKeyspace(ctx, keyspace, action) +func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, doneCh <-chan struct{}, errCh chan<- error) (context.Context, func(*error), error) { + return r.s.ts.LockKeyspaceWithLeaseRenewal(ctx, keyspace, action, doneCh, errCh) } func (r *switcher) freezeTargetVReplication(ctx context.Context) error { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index b8b1369bdf7..2445ffa83b3 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -293,7 +293,7 @@ func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrato dr.drLog.Log("Cancel migration as requested") } -func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) { +func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ <-chan struct{}, _ chan<- error) (context.Context, func(*error), error) { dr.drLog.Logf("Lock keyspace %s", keyspace) return ctx, func(e *error) { dr.drLog.Logf("Unlock keyspace %s", keyspace) diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 0780aaf484c..28a12341432 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -24,7 +24,7 @@ import ( ) type iswitcher interface { - lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, 
func(*error), error) + lockKeyspace(ctx context.Context, keyspace, action string, doneCh <-chan struct{}, errCh chan<- error) (context.Context, func(*error), error) cancelMigration(ctx context.Context, sm *StreamMigrator) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index f7f4f2a1d64..a717956b9e3 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -52,15 +52,14 @@ import ( type tableDiffPhase string const ( - initializing = tableDiffPhase("initializing") - pickingTablets = tableDiffPhase("picking_streaming_tablets") - syncingSources = tableDiffPhase("syncing_source_streams") - syncingTargets = tableDiffPhase("syncing_target_streams") - startingSources = tableDiffPhase("starting_source_data_streams") - startingTargets = tableDiffPhase("starting_target_data_streams") - restartingVreplication = tableDiffPhase("restarting_vreplication_streams") - diffingTable = tableDiffPhase("diffing_table") - maxKeyspaceLockLeaseTTL = 10 * time.Minute + initializing = tableDiffPhase("initializing") + pickingTablets = tableDiffPhase("picking_streaming_tablets") + syncingSources = tableDiffPhase("syncing_source_streams") + syncingTargets = tableDiffPhase("syncing_target_streams") + startingSources = tableDiffPhase("starting_source_data_streams") + startingTargets = tableDiffPhase("starting_target_data_streams") + restartingVreplication = tableDiffPhase("restarting_vreplication_streams") + diffingTable = tableDiffPhase("diffing_table") ) // how long to wait for background operations to complete @@ -117,12 +116,22 @@ func (td *tableDiffer) initialize(ctx context.Context) error { targetKeyspace := td.wd.ct.vde.thisTablet.Keyspace log.Infof("Locking target keyspace %s", targetKeyspace) - ctx, unlock, lockErr := 
td.wd.ct.ts.LockKeyspace(ctx, targetKeyspace, "vdiff") + // Ensure that we hold onto the lock. + // Ensure that the lease is renewed and we're able to hold the lock during the + // table diff initialization work. + done := make(chan struct{}) + errCh := make(chan error, 1) + defer func() { + close(done) + for range errCh { // Drain the channel + } + close(errCh) + }() + ctx, unlock, lockErr := td.wd.ct.ts.LockKeyspaceWithLeaseRenewal(ctx, targetKeyspace, "vdiff", done, errCh) if lockErr != nil { log.Errorf("LockKeyspace failed: %v", lockErr) return lockErr } - var err error defer func() { unlock(&err) @@ -130,20 +139,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { log.Errorf("UnlockKeyspace %s failed: %v", targetKeyspace, err) } }() - - // Ensure that the lease is renewed and we're able to hold the lock during the - // table diff initialization work. - done := make(chan struct{}) - errCh := make(chan error, 1) - defer func() { - close(done) - for range errCh { // Drain the channel - } - close(errCh) - }() - // Renew it every 5 seconds. - topo.AutoRenewKeyspaceLockLease(ctx, targetKeyspace, time.Second*5, maxKeyspaceLockLeaseTTL, done, errCh) - // Check for errors in our diff goroutine and the lock lease renewal goroutine. + // Check for errors in our diff goroutine and the lock lease renewal. 
haveError := func(inerr error) error { if inerr != nil { return inerr @@ -168,8 +164,9 @@ func (td *tableDiffer) initialize(ctx context.Context) error { td.wd.ct.workflow, targetKeyspace) restartCtx, restartCancel := context.WithTimeout(context.Background(), BackgroundOperationTimeout) defer restartCancel() - if err := td.restartTargetVReplicationStreams(restartCtx); err != nil { - log.Errorf("error restarting target streams: %v", err) + err := td.restartTargetVReplicationStreams(restartCtx) + if rerr := haveError(err); rerr != nil { + log.Errorf("error restarting target streams: %v", rerr) } }() @@ -177,27 +174,27 @@ func (td *tableDiffer) initialize(ctx context.Context) error { err = td.selectTablets(ctx) if rerr := haveError(err); rerr != nil { - return rerr + return vterrors.Wrap(rerr, "when selecting tablets") } err = td.syncSourceStreams(ctx) if rerr := haveError(err); rerr != nil { - return rerr + return vterrors.Wrap(rerr, "when syncing source streams") } err = td.startSourceDataStreams(td.shardStreamsCtx) if rerr := haveError(err); rerr != nil { - return rerr + return vterrors.Wrap(rerr, "when starting source data streams") } err = td.syncTargetStreams(ctx) if rerr := haveError(err); rerr != nil { - return rerr + return vterrors.Wrap(rerr, "when syncing target streams") } err = td.startTargetDataStream(td.shardStreamsCtx) if rerr := haveError(err); rerr != nil { - return rerr + return vterrors.Wrap(rerr, "when starting target data streams") } td.setupRowSorters() if rerr := haveError(nil); rerr != nil { - return rerr + return vterrors.Wrap(rerr, "when setting up row sorters") } return nil } From b1fa06d2ca73861f028a4cf2b651c050d0f4ef48 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jun 2024 18:04:17 -0400 Subject: [PATCH 4/7] Refactor and fix Can't range over a buffered channel to drain it w/o first closing it. Just close it. 
Signed-off-by: Matt Lord --- go/vt/topo/keyspace_lock.go | 13 +- go/vt/vtctl/workflow/server.go | 156 ++++++++---------- go/vt/vtctl/workflow/switcher.go | 4 +- go/vt/vtctl/workflow/switcher_dry_run.go | 4 +- go/vt/vtctl/workflow/switcher_interface.go | 2 +- .../tabletmanager/vdiff/table_differ.go | 12 +- 6 files changed, 89 insertions(+), 102 deletions(-) diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 156046f7dad..b4ca84c08ce 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -69,12 +69,14 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { // until the MaxKeyspaceLockLeaseTTL is reached -- exiting early if the context is // cancelled, the done channel is closed, or an error is encountered -- refreshing // the lock's lease every leaseRenewal until ends. -func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, action string, doneCh <-chan struct{}, errCh chan<- error) (context.Context, func(*error), error) { +func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) { ksLock := &keyspaceLock{keyspace: keyspace} lockCtx, unLockF, err := ts.internalLock(ctx, ksLock, action, true) if err != nil { - return nil, nil, err + return nil, nil, nil, err } + doneCh := make(chan struct{}) + errCh := make(chan error, 1) go func() { renewAttempts := int(MaxKeyspaceLockLeaseTTL.Seconds() / leaseRenewalInterval.Seconds()) for i := 0; i < renewAttempts; i++ { @@ -93,5 +95,10 @@ func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, ac } } }() - return lockCtx, unLockF, err + f := func(err *error) { + close(doneCh) + close(errCh) + unLockF(err) + } + return lockCtx, f, errCh, err } diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index ae042ff099e..36397974a0c 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ 
-1465,21 +1465,14 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl // Lock the target keyspace while we setup the workflow and ensure that we're able // to hold the lock throughout. - leaseDoneCh := make(chan struct{}) - leaseErrCh := make(chan error, 1) - defer func() { - close(leaseDoneCh) - for range leaseErrCh { // Drain the channel - } - close(leaseErrCh) - }() - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate", leaseDoneCh, leaseErrCh) + lockCtx, targetUnlock, leaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "MoveTablesCreate") if lockErr != nil { ts.Logger().Errorf("Locking target keyspace %s failed: %v", ts.TargetKeyspaceName(), lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = lockCtx + // Check for errors with the lock lease renewal. haveLeaseError := func() error { select { case err := <-leaseErrCh: @@ -2621,38 +2614,42 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData, // Need to lock both source and target keyspaces. // Ensure that the leases are renewed and we're able to hold the locks during the // cleanup work. - leaseDoneCh := make(chan struct{}) - leaseErrCh := make(chan error, 2) - defer func() { - close(leaseDoneCh) - for range leaseErrCh { // Drain the channel - } - close(leaseErrCh) - }() - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets", leaseDoneCh, leaseErrCh) + tctx, sourceUnlock, sourceLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) ctx = tctx + // Check for errors with the lock lease renewal. 
+ haveLeaseError := func() error { + select { + case err := <-sourceLeaseErrCh: + return err + default: + return nil + } + } if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets", leaseDoneCh, leaseErrCh) + tctx, targetUnlock, targetLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx - } - // Check for errors in our diff goroutine and the lock lease renewal. - haveLeaseError := func() error { - select { - case err := <-leaseErrCh: - return err - default: - return nil + // Check for errors with the lock lease renewal in both keyspaces. + haveLeaseError = func() error { + select { + case err := <-sourceLeaseErrCh: + return err + case err := <-targetLeaseErrCh: + return err + default: + return nil + } } + } if !keepData { @@ -2852,37 +2849,40 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy // Need to lock both source and target keyspaces. // Ensure that the leases are renewed and we're able to hold the locks during the // cleanup work. - leaseDoneCh := make(chan struct{}) - leaseErrCh := make(chan error, 2) - defer func() { - close(leaseDoneCh) - for range leaseErrCh { // Drain the channel - } - close(leaseErrCh) - }() - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources", leaseDoneCh, leaseErrCh) + tctx, sourceUnlock, sourceLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr) return nil, lockErr } defer sourceUnlock(&err) ctx = tctx + // Check for errors with the lock lease renewal. 
+ haveLeaseError := func() error { + select { + case err := <-sourceLeaseErrCh: + return err + default: + return nil + } + } if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources", leaseDoneCh, leaseErrCh) + tctx, targetUnlock, targetLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx - } - // Check for errors in our diff goroutine and the lock lease renewal. - haveLeaseError := func() error { - select { - case err := <-leaseErrCh: - return err - default: - return nil + // Check for errors with the lock lease renewal in both keyspaces. + haveLeaseError = func() error { + select { + case err := <-sourceLeaseErrCh: + return err + case err := <-targetLeaseErrCh: + return err + default: + return nil + } } } @@ -3128,22 +3128,14 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche // Lock the target keyspace and ensure that the lease is renewed and we're able to hold // the lock during the finalization. - leaseDoneCh := make(chan struct{}) - leaseErrCh := make(chan error, 1) - defer func() { - close(leaseDoneCh) - for range leaseErrCh { // Drain the channel - } - close(leaseErrCh) - }() - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow", leaseDoneCh, leaseErrCh) + tctx, targetUnlock, leaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow") if lockErr != nil { ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr) return nil, lockErr } defer targetUnlock(&err) ctx = tctx - // Check for errors in our diff goroutine and the lock lease renewal. + // Check for errors with the lock lease renewal. 
haveLeaseError := func() error { select { case err := <-leaseErrCh: @@ -3397,21 +3389,13 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc // Ensure that the lease is renewed and we're able to hold the locks during the // traffic switching. - leaseDoneCh := make(chan struct{}) - leaseErrCh := make(chan error, 1) - defer func() { - close(leaseDoneCh) - for range leaseErrCh { // Drain the channel - } - close(leaseErrCh) - }() // For reads, only locking the source keyspace is sufficient. - ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", leaseDoneCh, leaseErrCh) + ctx, unlock, leaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer unlock(&err) - // Check for errors in our diff goroutine and the lock lease renewal. + // Check for errors with the lock lease renewal. haveLeaseError := func() error { select { case err := <-leaseErrCh: @@ -3492,36 +3476,40 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // We need to lock both source and target keyspaces. // Ensure that the leases are renewed and we're able to hold the locks during the // traffic switching. 
- leaseDoneCh := make(chan struct{}) - leaseErrCh := make(chan error, 2) - defer func() { - close(leaseDoneCh) - for range leaseErrCh { // Drain the channel - } - close(leaseErrCh) - }() - tctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites", leaseDoneCh, leaseErrCh) + tctx, sourceUnlock, sourceLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchWrites") if lockErr != nil { return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } - ctx = tctx defer sourceUnlock(&err) - if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - tctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites", leaseDoneCh, leaseErrCh) - if lockErr != nil { - return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) - } - ctx = tctx - defer targetUnlock(&err) - } + ctx = tctx + // Check for errors with the lock lease renewal. haveLeaseError := func() error { select { - case err := <-leaseErrCh: + case err := <-sourceLeaseErrCh: return err default: return nil } } + if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { + tctx, targetUnlock, targetLeaseErrCh, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "SwitchWrites") + if lockErr != nil { + return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) + } + defer targetUnlock(&err) + ctx = tctx + // Check for errors with the lock lease renewal in both keyspaces. + haveLeaseError = func() error { + select { + case err := <-sourceLeaseErrCh: + return err + case err := <-targetLeaseErrCh: + return err + default: + return nil + } + } + } // Find out if the target is using any sequence tables for auto_increment // value generation. 
If so, then we'll need to ensure that they are diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index 01c1c98a371..0c5fcf7f615 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -126,8 +126,8 @@ func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { r.ts.cancelMigration(ctx, sm) } -func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string, doneCh <-chan struct{}, errCh chan<- error) (context.Context, func(*error), error) { - return r.s.ts.LockKeyspaceWithLeaseRenewal(ctx, keyspace, action, doneCh, errCh) +func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) { + return r.s.ts.LockKeyspaceWithLeaseRenewal(ctx, keyspace, action) } func (r *switcher) freezeTargetVReplication(ctx context.Context) error { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index 2445ffa83b3..19df89ce38c 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -293,11 +293,11 @@ func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrato dr.drLog.Log("Cancel migration as requested") } -func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string, _ <-chan struct{}, _ chan<- error) (context.Context, func(*error), error) { +func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), <-chan error, error) { dr.drLog.Logf("Lock keyspace %s", keyspace) return ctx, func(e *error) { dr.drLog.Logf("Unlock keyspace %s", keyspace) - }, nil + }, nil, nil } func (dr *switcherDryRun) removeSourceTables(ctx context.Context, removalType TableRemovalType) error { diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 28a12341432..4d18a5dec6e 100644 --- 
a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -24,7 +24,7 @@ import ( ) type iswitcher interface { - lockKeyspace(ctx context.Context, keyspace, action string, doneCh <-chan struct{}, errCh chan<- error) (context.Context, func(*error), error) + lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) cancelMigration(ctx context.Context, sm *StreamMigrator) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index a717956b9e3..f17f2a2a5ce 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -119,15 +119,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { // Ensure that we hold onto the lock. // Ensure that the lease is renewed and we're able to hold the lock during the // table diff initialization work. 
- done := make(chan struct{}) - errCh := make(chan error, 1) - defer func() { - close(done) - for range errCh { // Drain the channel - } - close(errCh) - }() - ctx, unlock, lockErr := td.wd.ct.ts.LockKeyspaceWithLeaseRenewal(ctx, targetKeyspace, "vdiff", done, errCh) + ctx, unlock, leaseErrCh, lockErr := td.wd.ct.ts.LockKeyspaceWithLeaseRenewal(ctx, targetKeyspace, "vdiff") if lockErr != nil { log.Errorf("LockKeyspace failed: %v", lockErr) return lockErr @@ -147,7 +139,7 @@ func (td *tableDiffer) initialize(ctx context.Context) error { select { case <-ctx.Done(): return ctx.Err() - case err := <-errCh: + case err := <-leaseErrCh: return err default: return nil From fee64a5ae10229ac2792c908466e78bf1862c533 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Jun 2024 18:05:48 -0400 Subject: [PATCH 5/7] WiP Signed-off-by: Matt Lord --- go/vt/topo/keyspace_lock.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index b4ca84c08ce..9b9ac45449e 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -66,9 +66,9 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { } // LockKeyspaceWithLeaseRenewal locks the keyspace and starts a goroutine which runs -// until the MaxKeyspaceLockLeaseTTL is reached -- exiting early if the context is -// cancelled, the done channel is closed, or an error is encountered -- refreshing -// the lock's lease every leaseRenewal until ends. +// until the MaxKeyspaceLockLeaseTTL is reached -- exiting if the context is +// cancelled, the unlock function is called, or an error is encountered -- refreshing +// the lock's lease every leaseRenewal until it ends. 
func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) { ksLock := &keyspaceLock{keyspace: keyspace} lockCtx, unLockF, err := ts.internalLock(ctx, ksLock, action, true) @@ -95,6 +95,7 @@ func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, ac } } }() + // Add to the unlock function, closing our related channels first. f := func(err *error) { close(doneCh) close(errCh) From 7c5c0db0519d03ccadc90a471d0f14ea69419b2c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 21 Jun 2024 10:56:34 -0400 Subject: [PATCH 6/7] Minor tweaks Signed-off-by: Matt Lord --- go/vt/topo/keyspace_lock.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index 9b9ac45449e..c382c5ce6a9 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -26,7 +26,7 @@ import ( const ( MaxKeyspaceLockLeaseTTL = 10 * time.Minute - leaseRenewalInterval = 5 * time.Second + leaseRenewalInterval = 10 * time.Second ) type keyspaceLock struct { @@ -69,9 +69,12 @@ func CheckKeyspaceLocked(ctx context.Context, keyspace string) error { // until the MaxKeyspaceLockLeaseTTL is reached -- exiting if the context is // cancelled, the unlock function is called, or an error is encountered -- refreshing // the lock's lease every leaseRenewal until it ends. +// It returns a read-only error channel that you should regularly check to ensure that +// you have not lost the lock or encountered any other non-recoverable errors related +// to your lease. 
func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, action string) (context.Context, func(*error), <-chan error, error) { ksLock := &keyspaceLock{keyspace: keyspace} - lockCtx, unLockF, err := ts.internalLock(ctx, ksLock, action, true) + lockCtx, unlockF, err := ts.internalLock(ctx, ksLock, action, true) if err != nil { return nil, nil, nil, err } @@ -96,10 +99,10 @@ func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, ac } }() // Add to the unlock function, closing our related channels first. - f := func(err *error) { + newUnlockF := func(err *error) { close(doneCh) close(errCh) - unLockF(err) + unlockF(err) } - return lockCtx, f, errCh, err + return lockCtx, newUnlockF, errCh, err } From 9f9f13f50d9245e3b41729556a8a4f7aa7513d99 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 21 Jun 2024 13:17:33 -0400 Subject: [PATCH 7/7] Improve renewal Signed-off-by: Matt Lord --- go/vt/topo/keyspace_lock.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/go/vt/topo/keyspace_lock.go b/go/vt/topo/keyspace_lock.go index c382c5ce6a9..f13e6303610 100644 --- a/go/vt/topo/keyspace_lock.go +++ b/go/vt/topo/keyspace_lock.go @@ -22,6 +22,8 @@ import ( "time" "vitess.io/vitess/go/vt/vterrors" + + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( @@ -78,16 +80,17 @@ func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, ac if err != nil { return nil, nil, nil, err } - doneCh := make(chan struct{}) - errCh := make(chan error, 1) + done := make(chan struct{}) // Our work is done and we should exit + errCh := make(chan error, 1) // Communicate any errors encountered during renewals go func() { - renewAttempts := int(MaxKeyspaceLockLeaseTTL.Seconds() / leaseRenewalInterval.Seconds()) - for i := 0; i < renewAttempts; i++ { + defer close(errCh) + maxRenewals := int((MaxKeyspaceLockLeaseTTL.Seconds() - leaseRenewalInterval.Seconds()) / leaseRenewalInterval.Seconds()) 
+		for i := 0; i < maxRenewals; i++ {
 			time.Sleep(leaseRenewalInterval)
 			select {
 			case <-lockCtx.Done():
 				return
-			case <-doneCh:
+			case <-done:
 				return
 			default:
 				// Attempt to renew the lease.
@@ -97,12 +100,18 @@ func (ts *Server) LockKeyspaceWithLeaseRenewal(ctx context.Context, keyspace, ac
 			}
 		}
 	}
+	time.Sleep(leaseRenewalInterval)
+	select {
+	case <-done:
+	default:
+		errCh <- vterrors.Errorf(vtrpcpb.Code_DEADLINE_EXCEEDED, "cannot renew keyspace %s lock lease as we've hit the time limit of %v",
+			keyspace, MaxKeyspaceLockLeaseTTL)
+	}
 	}()
-	// Add to the unlock function, closing our related channels first.
+	// Wrap the unlock function so that unlocking also ends the lease renewal work.
 	newUnlockF := func(err *error) {
-		close(doneCh)
-		close(errCh)
+		close(done)
 		unlockF(err)
 	}
-	return lockCtx, newUnlockF, errCh, err
+	return lockCtx, newUnlockF, errCh, nil
 }