Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backup_blackbox_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func TestExecuteBackupWithCanceledContext(t *testing.T) {

require.Error(t, err)
// all four files will fail
require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled")
require.ErrorContains(t, err, "context canceled")
assert.Equal(t, mysqlctl.BackupUnusable, backupResult)
}

Expand Down
64 changes: 45 additions & 19 deletions go/vt/mysqlctl/builtinbackupengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,39 +611,52 @@ func (be *BuiltinBackupEngine) backupFiles(
// Backup with the provided concurrency.
sema := semaphore.NewWeighted(int64(params.Concurrency))
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to backup file: %s, err: %s", fe.Name, acqErr.Error())
bh.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying backing up this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if bh.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q backup", fe.Name)
bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if bh.HasErrors() {
params.Logger.Infof("failed to backup files due to error.")
return
}

// Backup the individual file.
name := fmt.Sprintf("%v", i)
bh.RecordError(be.backupFile(ctx, params, bh, fe, name))
err := be.backupFile(ctxCancel, params, bh, fe, name)
if err != nil {
bh.RecordError(acqErr)
cancel()
}
}(i)
}

Expand Down Expand Up @@ -775,11 +788,14 @@ func (bp *backupPipe) HashString() string {
return hex.EncodeToString(bp.crc32.Sum(nil))
}

func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger) {
func (bp *backupPipe) ReportProgress(ctx context.Context, period time.Duration, logger logutil.Logger) {
tick := time.NewTicker(period)
defer tick.Stop()
for {
select {
case <-ctx.Done():
logger.Infof("Canceled %q file", bp.filename)
return
case <-bp.done:
logger.Infof("Done taking Backup %q", bp.filename)
return
Expand Down Expand Up @@ -822,7 +838,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
}

br := newBackupReader(fe.Name, fi.Size(), timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)

// Open the destination file for writing, and a buffer.
params.Logger.Infof("Backing up file: %v", fe.Name)
Expand Down Expand Up @@ -1021,43 +1037,53 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
sema := semaphore.NewWeighted(int64(params.Concurrency))
rec := concurrency.AllErrorRecorder{}
wg := sync.WaitGroup{}

ctxCancel, cancel := context.WithCancel(ctx)
defer cancel()

for i := range fes {
wg.Add(1)
go func(i int) {
defer wg.Done()
fe := &fes[i]
// Wait until we are ready to go, return if we encounter an error
acqErr := sema.Acquire(ctx, 1)
acqErr := sema.Acquire(ctxCancel, 1)
if acqErr != nil {
log.Errorf("Unable to acquire semaphore needed to restore file: %s, err: %s", fe.Name, acqErr.Error())
rec.RecordError(acqErr)
cancel()
return
}
defer sema.Release(1)

// First check if we have any error, if we have, there is no point trying to restore this file.
// We check for errors before checking if the context is canceled on purpose, if there was an
// error, the context would have been canceled already.
if rec.HasErrors() {
params.Logger.Errorf("Failed to restore files due to error: %v", bh.Error())
return
}

// Check for context cancellation explicitly because, the way semaphore code is written, theoretically we might
// end up not throwing an error even after cancellation. Please see https://cs.opensource.google/go/x/sync/+/refs/tags/v0.1.0:semaphore/semaphore.go;l=66,
// which suggests that if the context is already done, `Acquire()` may still succeed without blocking. This introduces
// unpredictability in my test cases, so in order to avoid that, I am adding this cancellation check.
select {
case <-ctx.Done():
case <-ctxCancel.Done():
log.Errorf("Context canceled or timed out during %q restore", fe.Name)
rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled"))
return
default:
}

if rec.HasErrors() {
params.Logger.Infof("Failed to restore files due to error.")
return
}

fe.ParentPath = createdDir
// And restore the file.
name := fmt.Sprintf("%v", i)
params.Logger.Infof("Copying file %v: %v", name, fe.Name)
err := be.restoreFile(ctx, params, bh, fe, bm, name)
err := be.restoreFile(ctxCancel, params, bh, fe, bm, name)
if err != nil {
rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name))
cancel()
}
}(i)
}
Expand Down Expand Up @@ -1087,7 +1113,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
}()

br := newBackupReader(name, 0, timedSource)
go br.ReportProgress(builtinBackupProgress, params.Logger)
go br.ReportProgress(ctx, builtinBackupProgress, params.Logger)
var reader io.Reader = br

// Open the destination file for writing.
Expand Down