diff --git a/AGENTS.md b/AGENTS.md new file mode 120000 index 00000000000..681311eb9cf --- /dev/null +++ b/AGENTS.md @@ -0,0 +1 @@ +CLAUDE.md \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000000..a953f73de90 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,341 @@ +## :handshake: Our Partnership + +**We're building this together.** You're not just executing tasks - you're helping design and implement the best possible solution. This means: + +- Challenge my suggestions when something feels wrong +- Ask me to explain my reasoning +- Propose alternative approaches +- Take time to think through problems + +**Quality is non-negotiable.** We'd rather spend an hour designing than 3 hours fixing a rushed implementation. + +## :thought_balloon: Before We Code + +Always discuss first: +- What problem are we solving? +- What's the ideal solution? +- What tests would prove it works? +- Are we making the codebase better? + +## Strict Task Adherence + +**Only do exactly what I ask for - nothing more, nothing less.** + +- Do NOT proactively update documentation unless explicitly requested +- Do NOT add explanatory comments unless asked +- Do NOT make "improvements" or "clean up" code beyond the specific task +- Do NOT add features, optimizations, or enhancements I didn't mention +- If there is something you think should be done, suggest it, but don't do it until asked to + +**Red flags that indicate you're going beyond the task:** +- "Let me also..." +- "While I'm at it..." +- "I should also update..." +- "Let me improve..." +- "I'll also clean up..." + +**If the task is complete, STOP. Don't look for more work to do.** + +## :test_tube: Test-Driven Development + +TDD isn't optional - it's how we ensure quality: + +### The TDD Cycle +1. **Red** - Write a failing test that defines success +2. **Green** - Write minimal code to pass +3. 
**Refactor** - Make it clean and elegant + +### Example TDD Session +```go +// Step 1: Write the test first +func TestConnectionBilateralCleanup(t *testing.T) { + // Define what success looks like + client, server := testutils.CreateConnectedTCPPair() + + // Test the behavior we want + client.Close() + + // Both sides should be closed + assert.Eventually(t, func() bool { + return isConnectionClosed(server) + }, time.Second, 10*time.Millisecond) +} + +// Step 2: See it fail (confirms we're testing the right thing) +// Step 3: Implement the feature +// Step 4: See it pass +// Step 5: Refactor for clarity +``` + +To make sure tests are easy to read, we use testify assertions. Make sure to use assert.Eventually instead of using manual thread.sleep and timeouts. + +## :rotating_light: Error Handling Excellence + +Error handling is not an afterthought - it's core to reliable software. + +### Go Error Patterns +```go +// YES - Clear error context +func ProcessUser(id string) (*User, error) { + if id == "" { + return nil, fmt.Errorf("user ID cannot be empty") + } + + user, err := db.GetUser(id) + if err != nil { + return nil, fmt.Errorf("failed to get user %s: %w", id, err) + } + + return user, nil +} + +// NO - Swallowing errors +func ProcessUser(id string) *User { + user, _ := db.GetUser(id) // What if this fails? + return user +} +``` + +### Error Handling Principles +1. **Wrap errors with context** - Use `fmt.Errorf("context: %w", err)` +2. **Validate early** - Check inputs before doing work +3. **Fail fast** - Don't continue with invalid state +4. **Log appropriately** - Errors at boundaries, debug info internally +5. 
**Return structured errors** - Use error types for different handling + +### Testing Error Paths +```go +func TestProcessUser_InvalidID(t *testing.T) { + _, err := ProcessUser("") + assert.ErrorContains(t, err, "cannot be empty") +} + +func TestProcessUser_DatabaseError(t *testing.T) { + mockDB.EXPECT().GetUser("123").Return(nil, errors.New("db connection failed")) + + _, err := ProcessUser("123") + assert.ErrorContains(t, err, "failed to get user") +} +``` + +## :triangular_ruler: Design Principles + +### 1. Simple is Better Than Clever +```go +// YES - Clear and obvious +if user.NeedsMigration() { + return migrate(user) +} + +// NO - Clever but unclear +return user.NeedsMigration() && migrate(user) || user +``` + +### 2. Explicit is Better Than Implicit +- Clear function names +- Obvious parameter types +- No hidden side effects + +### 3. Performance with Clarity +- Optimize hot paths +- But keep code readable +- Document why, not what + +### 4. Fail Fast and Clearly +- Validate inputs early +- Return clear error messages +- Help future debugging + +### 5. Interfaces Define What You Need, Not What You Provide +- When you need something from another component, define the interface in your package +- Don't look at what someone else provides - define exactly what you require +- This keeps interfaces small, focused, and prevents unnecessary coupling +- Types and their methods live together. At the top of files, use a single ```type ()``` with all type declarations inside. + +### 6. 
Go-Specific Best Practices +- **Receiver naming** - Use consistent, short receiver names (e.g., `u *User`, not `user *User`) +- **Package naming** - Short, descriptive, lowercase without underscores +- **Interface naming** - Single-method interfaces end in `-er` (Reader, Writer, Handler) +- **Context first** - Always pass `context.Context` as the first parameter +- **Channels for coordination** - Use channels to coordinate goroutines, not shared memory + +## :mag: Debugging & Troubleshooting + +When things don't work as expected, we debug systematically: + +### Debugging Strategy +1. **Reproduce reliably** - Create a minimal failing case +2. **Isolate the problem** - Binary search through the system +3. **Understand the data flow** - Trace inputs and outputs +4. **Question assumptions** - What did we assume was working? +5. **Fix the root cause** - Not just the symptoms + +### Debugging Tools & Techniques +```go +// Use structured logging for debugging +log.WithFields(log.Fields{ + "user_id": userID, + "action": "process_payment", + "amount": amount, +}).Debug("Starting payment processing") + +// Add strategic debug points +func processPayment(amount float64) error { + log.Debugf("processPayment called with amount: %f", amount) + + if amount <= 0 { + return fmt.Errorf("invalid amount: %f", amount) + } + + // More processing... + log.Debug("Payment validation passed") + return nil +} +``` + +### When Stuck +- Write a test that reproduces the issue +- Add logging to understand data flow +- Use the debugger to step through code +- Rubber duck explain the problem +- Take a break and come back fresh + +## :recycle: Refactoring Legacy Code + +When improving existing code, we move carefully and systematically: + +### Refactoring Strategy +1. **Understand first** - Read and comprehend the existing code +2. **Add tests** - Create safety nets before changing anything +3. **Small steps** - Make tiny, verifiable improvements +4. 
**Preserve behavior** - Keep the same external interface +5. **Measure improvement** - Verify it's actually better + +### Safe Refactoring Process +```go +// Step 1: Add characterization tests +func TestLegacyProcessor_ExistingBehavior(t *testing.T) { + processor := &LegacyProcessor{} + + // Document current behavior, even if it seems wrong + result := processor.Process("input") + assert.Equal(t, "weird_legacy_output", result) +} + +// Step 2: Refactor with tests passing +func (p *LegacyProcessor) Process(input string) string { + // Improved implementation that maintains the same behavior + return processWithNewLogic(input) +} + +// Step 3: Now we can safely change the behavior +func TestProcessor_ImprovedBehavior(t *testing.T) { + processor := &Processor{} + + result := processor.Process("input") + assert.Equal(t, "expected_output", result) +} +``` + +## :arrows_counterclockwise: Development Workflow + +### Starting a Feature +1. **Discuss** - "I'm thinking about implementing X. Here's my approach..." +2. **Design** - Sketch out the API and key components +3. **Test** - Write tests that define the behavior +4. **Implement** - Make the tests pass +5. **Review** - "Does this make sense? Any concerns?" + +### Making Changes +1. **Small PRs** - Easier to review and less risky +2. **Incremental** - Build features piece by piece +3. **Always tested** - No exceptions +4. **Clear commits** - Each commit should have a clear purpose + +### Git and PR Workflow + +**CRITICAL: Git commands are ONLY for reading state - NEVER for modifying it.** +- **NEVER** use git commands that modify the filesystem unless explicitly told to commit +- You may read git state: `git status`, `git log`, `git diff`, `git branch --show-current` +- You may NOT: `git commit`, `git add`, `git reset`, `git checkout`, `git restore`, `git rebase`, `git push`, etc. 
+- **ONLY commit when explicitly asked to commit** +- When asked to commit, do it once and stop +- Only I can modify git state unless you've been given explicit permission to commit + +**Once a PR is created, NEVER amend commits or rewrite history.** +- Always create new commits after PR is created +- No `git commit --amend` after pushing to a PR branch +- No `git rebase` that rewrites commits in the PR +- No force pushes to PR branches +- This keeps the PR history clean and reviewable + +**When asked to write a PR description:** +1. **Use `gh` CLI** - Always use `gh pr edit ` to update PRs +2. **Update both body and title** - Use `--body` and `--title` flags +3. **Be informal, humble, and short** - Keep it conversational and to the point +4. **Credit appropriately** - If Claude Code wrote most of it, mention that +5. **Example format**: + ``` + ## What's this? + [Brief explanation of the feature/fix] + + ## How it works + [Key implementation details] + + ## Usage + [Code examples if relevant] + + --- + _Most of this was written by Claude Code - I just provided direction._ + ``` + +## :memo: Code Review Mindset + +When reviewing code (yours or mine), ask: +- Is this the simplest solution? +- Will this make sense in 6 months? +- Are edge cases handled? +- Is it well tested? +- Does it improve the codebase? + +## :dart: Common Patterns + +### Feature Implementation +``` +You: "Let's add feature X" +Me: "Sounds good! What's the API going to look like? What are the main use cases?" +[Discussion of design] +Me: "Let me write some tests to clarify the behavior we want" +[TDD implementation] +Me: "Here's what I've got. What do you think?" 
+``` + +### Bug Fixing +``` +You: "We have a bug where X happens" +Me: "Let's write a test that reproduces it first" +[Test that fails] +Me: "Great, now we know exactly what we're fixing" +[Fix implementation] +``` + +### Performance Work +``` +You: "This seems slow" +Me: "Let's benchmark it first to get a baseline" +[Benchmark results] +Me: "Now let's optimize without breaking functionality" +[Optimization with tests passing] +``` + +## :rocket: Shipping Quality + +Before considering any work "done": +- [ ] Tests pass and cover the feature +- [ ] Code is clean and readable +- [ ] Edge cases are handled +- [ ] Performance is acceptable +- [ ] Documentation is updated if needed +- [ ] We're both happy with it + +Remember: We're crafting software, not just making it work. Every line of code is an opportunity to make the system better. diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 752a0d72d5d..111d840a680 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -54,13 +54,22 @@ import ( mysqlctlpb "vitess.io/vitess/go/vt/proto/mysqlctl" tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/vtrpc" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) const ( builtinBackupEngineName = "builtin" AutoIncrementalFromPos = "auto" dataDictionaryFile = "mysql.ibd" +<<<<<<< HEAD +======= + + // How many times we will retry file operations. Note that a file operation that + // returns a vtrpc.Code_FAILED_PRECONDITION error is considered fatal and we will + // not retry. 
+ maxRetriesPerFile = 1 + maxFileCloseRetries = 20 // At this point we should consider it permanent +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) ) var ( @@ -180,7 +189,7 @@ func (fe *FileEntry) fullPath(cnf *Mycnf) (string, error) { case backupBinlogDir: root = filepath.Dir(cnf.BinLogPath) default: - return "", vterrors.Errorf(vtrpc.Code_UNKNOWN, "unknown base: %v", fe.Base) + return "", vterrors.Errorf(vtrpcpb.Code_UNKNOWN, "unknown base: %v", fe.Base) } return path.Join(fe.ParentPath, root, fe.Name), nil @@ -303,7 +312,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par } purgedGTIDSet, ok := gtidPurged.GTIDSet.(replication.Mysql56GTIDSet) if !ok { - return gtidPurged, nil, vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "failed to parse a valid MySQL GTID set from value: %v", gtidPurged) + return gtidPurged, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to parse a valid MySQL GTID set from value: %v", gtidPurged) } return gtidPurged, purgedGTIDSet, nil } @@ -366,7 +375,7 @@ func (be *BuiltinBackupEngine) executeIncrementalBackup(ctx context.Context, par return BackupUnusable, vterrors.Wrapf(err, "reading timestamps from binlog files %v", binaryLogsToBackup) } if resp.FirstTimestampBinlog == "" || resp.LastTimestampBinlog == "" { - return BackupUnusable, vterrors.Errorf(vtrpc.Code_ABORTED, "empty binlog name in response. Request=%v, Response=%v", req, resp) + return BackupUnusable, vterrors.Errorf(vtrpcpb.Code_ABORTED, "empty binlog name in response. 
Request=%v, Response=%v", req, resp) } log.Infof("ReadBinlogFilesTimestampsResponse: %+v", resp) incrDetails := &IncrementalBackupDetails{ @@ -604,6 +613,64 @@ func (be *BuiltinBackupEngine) backupFiles( sema := semaphore.NewWeighted(int64(params.Concurrency)) wg := sync.WaitGroup{} +<<<<<<< HEAD +======= + // BackupHandle supports the BackupErrorRecorder interface for tracking errors + // across any goroutines that fan out to take the backup. This means that we + // don't need a local error recorder and can put everything through the bh. + // + // This handles the scenario where bh.AddFile() encounters an error asynchronously, + // which ordinarily would be lost in the context of `be.backupFile`, i.e. if an + // error were encountered + // [here](https://github.com/vitessio/vitess/blob/d26b6c7975b12a87364e471e2e2dfa4e253c2a5b/go/vt/mysqlctl/s3backupstorage/s3.go#L139-L142). + // + // All the errors are grouped per file, if one or more files failed, we back them up + // once more concurrently, if any of the retry fail, we fail-fast by canceling the context + // and return an error. There is no reason to continue processing the other retries, if + // one of them failed. + if files := bh.GetFailedFiles(); len(files) > 0 { + newFEs := make([]FileEntry, len(fes)) + for _, file := range files { + fileNb, err := strconv.Atoi(file) + if err != nil { + return vterrors.Wrapf(err, "failed to retry file '%s'", file) + } + oldFes := fes[fileNb] + newFEs[fileNb] = FileEntry{ + Base: oldFes.Base, + Name: oldFes.Name, + ParentPath: oldFes.ParentPath, + RetryCount: 1, + } + bh.ResetErrorForFile(file) + } + err = be.backupFileEntries(ctx, newFEs, bh, params) + if err != nil { + return err + } + } + + // Backup the MANIFEST file and apply retry logic. 
+ var manifestErr error + for currentRetry := 0; currentRetry <= maxRetriesPerFile; currentRetry++ { + manifestErr = be.backupManifest(ctx, params, bh, backupPosition, purgedPosition, fromPosition, fromBackupName, serverUUID, mysqlVersion, incrDetails, fes, currentRetry) + if manifestErr == nil || vterrors.Code(manifestErr) == vtrpcpb.Code_FAILED_PRECONDITION { + break + } + bh.ResetErrorForFile(backupManifestFileName) + } + if manifestErr != nil { + return manifestErr + } + return nil +} + +// backupFileEntries iterates over a slice of FileEntry, backing them up concurrently up to the defined concurrency limit. +// This function will ignore empty FileEntry, allowing the retry mechanism to send a partially empty slice, to not +// mess up the index of retriable FileEntry. +// This function does not leave any background operation behind itself, all calls to bh.AddFile will be finished or canceled. +func (be *BuiltinBackupEngine) backupFileEntries(ctx context.Context, fes []FileEntry, bh backupstorage.BackupHandle, params BackupParams) error { +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) ctxCancel, cancel := context.WithCancel(ctx) defer func() { // We may still have operations in flight that require a valid context, such as adding files to S3. @@ -645,16 +712,31 @@ func (be *BuiltinBackupEngine) backupFiles( select { case <-ctxCancel.Done(): log.Errorf("Context canceled or timed out during %q backup", fe.Name) +<<<<<<< HEAD bh.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) return +======= + bh.RecordError(name, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context canceled")) + return nil +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) default: } // Backup the individual file. 
+<<<<<<< HEAD name := fmt.Sprintf("%v", i) if err := be.backupFile(ctxCancel, params, bh, fe, name); err != nil { bh.RecordError(err) cancel() +======= + var errBackupFile error + if errBackupFile = be.backupFile(ctxCancel, params, bh, fe, name); errBackupFile != nil { + bh.RecordError(name, vterrors.Wrapf(errBackupFile, "failed to backup file '%s'", name)) + if fe.RetryCount >= maxRetriesPerFile || vterrors.Code(errBackupFile) == vtrpcpb.Code_FAILED_PRECONDITION { + // this is the last attempt, and we have an error, we can cancel everything and fail fast. + cancel() + } +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) } }(i) } @@ -832,7 +914,10 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func() { closeSourceAt := time.Now() - source.Close() + if err := closeWithRetry(ctx, params.Logger, source, fe.Name); err != nil { + params.Logger.Infof("Failed to close %s source file during backup: %v", fe.Name, err) + return + } params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt)) }() @@ -858,10 +943,11 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func(name, fileName string) { closeDestAt := time.Now() - if rerr := dest.Close(); rerr != nil { - rerr = vterrors.Wrapf(rerr, "failed to close file %v,%v", name, fe.Name) + if rerr := closeWithRetry(ctx, params.Logger, dest, fe.Name); rerr != nil { + rerr = vterrors.Wrapf(rerr, "failed to close destination file (%v) %v", name, fe.Name) params.Logger.Error(rerr) finalErr = errors.Join(finalErr, rerr) + return } params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }(name, fe.Name) @@ -906,12 +992,20 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara closer := ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout) defer func() { // Close gzip to flush it, after that all data is 
sent to writer. +<<<<<<< HEAD closeCompressorAt := time.Now() params.Logger.Infof("closing compressor") if cerr := closer.Close(); err != nil { cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) +======= + params.Logger.Infof("Closing compressor for file: %s %s", fe.Name, retryStr) + closeCompressorAt := time.Now() + if cerr := closeWithRetry(ctx, params.Logger, closer, "compressor"); cerr != nil { + cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", fe.Name) +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) params.Logger.Error(cerr) createAndCopyErr = errors.Join(createAndCopyErr, cerr) + return } params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) }() @@ -939,6 +1033,97 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara return nil } +<<<<<<< HEAD +======= +func (be *BuiltinBackupEngine) backupManifest( + ctx context.Context, + params BackupParams, + bh backupstorage.BackupHandle, + backupPosition replication.Position, + purgedPosition replication.Position, + fromPosition replication.Position, + fromBackupName string, + serverUUID string, + mysqlVersion string, + incrDetails *IncrementalBackupDetails, + fes []FileEntry, + currentAttempt int, +) (finalErr error) { + retryStr := retryToString(currentAttempt) + params.Logger.Infof("Backing up file %s %s", backupManifestFileName, retryStr) + defer func() { + state := "Completed" + if finalErr != nil { + state = "Failed" + } + params.Logger.Infof("%s backing up %s %s", state, backupManifestFileName, retryStr) + }() + + // Creating this function allows us to ensure we always close the writer no matter what, + // and in case of success that we close it before calling bh.EndBackup. 
+ addAndWrite := func() (addAndWriteError error) { + // open the MANIFEST + wc, err := bh.AddFile(ctx, backupManifestFileName, backupstorage.FileSizeUnknown) + if err != nil { + return vterrors.Wrapf(err, "cannot add %v to backup %s", backupManifestFileName, retryStr) + } + defer func() { + if err := closeWithRetry(ctx, params.Logger, wc, backupManifestFileName); err != nil { + addAndWriteError = errors.Join(addAndWriteError, vterrors.Wrapf(err, "cannot close backup: %v", backupManifestFileName)) + } + }() + + // JSON-encode and write the MANIFEST + bm := &builtinBackupManifest{ + // Common base fields + BackupManifest: BackupManifest{ + BackupName: bh.Name(), + BackupMethod: builtinBackupEngineName, + Position: backupPosition, + PurgedPosition: purgedPosition, + FromPosition: fromPosition, + FromBackup: fromBackupName, + Incremental: !fromPosition.IsZero(), + ServerUUID: serverUUID, + TabletAlias: params.TabletAlias, + Keyspace: params.Keyspace, + Shard: params.Shard, + BackupTime: params.BackupTime.UTC().Format(time.RFC3339), + FinishedTime: time.Now().UTC().Format(time.RFC3339), + MySQLVersion: mysqlVersion, + UpgradeSafe: params.UpgradeSafe, + IncrementalDetails: incrDetails, + }, + + // Builtin-specific fields + FileEntries: fes, + SkipCompress: !backupStorageCompress, + CompressionEngine: CompressionEngineName, + ExternalDecompressor: ManifestExternalDecompressorCmd, + } + data, err := json.MarshalIndent(bm, "", " ") + if err != nil { + return vterrors.Wrapf(err, "cannot JSON encode %v %s", backupManifestFileName, retryStr) + } + if _, err := wc.Write(data); err != nil { + return vterrors.Wrapf(err, "cannot write %v %s", backupManifestFileName, retryStr) + } + return nil + } + + err := addAndWrite() + if err != nil { + return err + } + + err = bh.EndBackup(ctx) + if err != nil { + return err + } + return bh.Error() +} + +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) // executeRestoreFullBackup restores the 
files from a full backup. The underlying mysql database service is expected to be stopped. func (be *BuiltinBackupEngine) executeRestoreFullBackup(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, bm builtinBackupManifest) error { if err := prepareToRestore(ctx, params.Cnf, params.Mysqld, params.Logger, params.MysqlShutdownTimeout); err != nil { @@ -964,7 +1149,7 @@ func (be *BuiltinBackupEngine) executeRestoreIncrementalBackup(ctx context.Conte defer os.RemoveAll(createdDir) mysqld, ok := params.Mysqld.(*Mysqld) if !ok { - return vterrors.Errorf(vtrpc.Code_UNIMPLEMENTED, "expected: Mysqld") + return vterrors.Errorf(vtrpcpb.Code_UNIMPLEMENTED, "expected: Mysqld") } for _, fe := range bm.FileEntries { fe.ParentPath = createdDir @@ -1021,6 +1206,29 @@ func (be *BuiltinBackupEngine) ExecuteRestore(ctx context.Context, params Restor return &bm.BackupManifest, nil } +<<<<<<< HEAD +======= +func (be *BuiltinBackupEngine) restoreManifest(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle) (bm builtinBackupManifest, finalErr error) { + var retryCount int + defer func() { + state := "Completed" + if finalErr != nil { + state = "Failed" + } + params.Logger.Infof("%s restoring %s %s", state, backupManifestFileName, retryToString(retryCount)) + }() + + for ; retryCount <= maxRetriesPerFile; retryCount++ { + params.Logger.Infof("Restoring file %s %s", backupManifestFileName, retryToString(retryCount)) + if finalErr = getBackupManifestInto(ctx, bh, &bm); finalErr == nil || vterrors.Code(finalErr) == vtrpcpb.Code_FAILED_PRECONDITION { + break + } + params.Logger.Infof("Failed restoring %s %s", backupManifestFileName, retryToString(retryCount)) + } + return +} + +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) // restoreFiles will copy all the files from the BackupStorage to the // right place. 
func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, bm builtinBackupManifest) (createdDir string, err error) { @@ -1086,19 +1294,35 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP select { case <-ctxCancel.Done(): log.Errorf("Context canceled or timed out during %q restore", fe.Name) +<<<<<<< HEAD rec.RecordError(vterrors.Errorf(vtrpc.Code_CANCELED, "context canceled")) return +======= + bh.RecordError(name, vterrors.Errorf(vtrpcpb.Code_CANCELED, "context canceled")) + return nil +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) default: } fe.ParentPath = createdDir // And restore the file. +<<<<<<< HEAD name := fmt.Sprintf("%v", i) params.Logger.Infof("Copying file %v: %v", name, fe.Name) err := be.restoreFile(ctxCancel, params, bh, fe, bm, name) if err != nil { rec.RecordError(vterrors.Wrapf(err, "can't restore file %v to %v", name, fe.Name)) cancel() +======= + params.Logger.Infof("Copying file %v: %v %s", name, fe.Name, retryToString(fe.RetryCount)) + if errRestore := be.restoreFile(ctx, params, bh, fe, bm, name); errRestore != nil { + bh.RecordError(name, vterrors.Wrapf(errRestore, "failed to restore file %v to %v", name, fe.Name)) + if fe.RetryCount >= maxRetriesPerFile || vterrors.Code(errRestore) == vtrpcpb.Code_FAILED_PRECONDITION { + // this is the last attempt, and we have an error, we can return an error, which will let errgroup + // know it can cancel the context + return errRestore + } +>>>>>>> 1fe6c68094 (BuiltinBackupEngine: Retry file close and fail backup when we cannot (#18848)) } }(i) } @@ -1123,7 +1347,10 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa defer func() { closeSourceAt := time.Now() - source.Close() + if err := closeWithRetry(ctx, params.Logger, source, fe.Name); err != nil { + params.Logger.Errorf("Failed to close source file %s during restore: 
%v", name, err) + return + } params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt)) }() @@ -1141,8 +1368,10 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa defer func() { closeDestAt := time.Now() - if cerr := dest.Close(); cerr != nil { + if cerr := closeWithRetry(ctx, params.Logger, dest, fe.Name); cerr != nil { finalErr = errors.Join(finalErr, vterrors.Wrap(cerr, "failed to close destination file")) + params.Logger.Errorf("Failed to close destination file %s during restore: %v", dest.Name(), cerr) + return } params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }() @@ -1187,12 +1416,13 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa reader = ioutil.NewMeteredReader(decompressor, decompressStats.TimedIncrementBytes) defer func() { - closeDecompressorAt := time.Now() params.Logger.Infof("closing decompressor") - if cerr := closer.Close(); err != nil { + closeDecompressorAt := time.Now() + if cerr := closeWithRetry(ctx, params.Logger, closer, "decompressor"); cerr != nil { cerr = vterrors.Wrapf(cerr, "failed to close decompressor %v", name) params.Logger.Error(cerr) finalErr = errors.Join(finalErr, cerr) + return } params.Stats.Scope(stats.Operation("Decompressor:Close")).TimedIncrement(time.Since(closeDecompressorAt)) }() @@ -1206,7 +1436,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa // Check the hash. hash := br.HashString() if hash != fe.Hash { - return vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) } // Flush the buffer. 
@@ -1264,3 +1494,49 @@ func getPrimaryPosition(ctx context.Context, tmc tmclient.TabletManagerClient, t func init() { BackupRestoreEngineMap[builtinBackupEngineName] = &BuiltinBackupEngine{} } + +// closeWithRetry does just what it says. Retrying a close operation is important as +// an error is most likely transient/ephemeral and leaving around open file descriptors +// can lead to later problems as the file may be in a sort-of uploaded state where it +// exists but has not yet been finalized (this is true for GCS). This can cause +// unexpected behavior if you retry the file while the original request is still in this +// state. Most implementations such as GCS will automatically retry operations, but close +// is one that may be left to the caller (this is true for GCS). +// We model this retry after the GCS retry implementation described here: +// https://cloud.google.com/storage/docs/retry-strategy#go +func closeWithRetry(ctx context.Context, logger logutil.Logger, file io.Closer, name string) error { + backoff := 1 * time.Second + backoffLimit := backoff * 30 + var err error + retries := 0 + fileType := "source" + if _, ok := file.(io.Writer); ok { + fileType = "destination" + } + for { + if err = file.Close(); err == nil { + return nil + } + if retries == maxFileCloseRetries { + // Let's give up as this does not appear to be transient. We cannot know + // the full list of all transient/ephemeral errors across all backup engine + // providers so we consider it permanent at this point. We return a + // FAILED_PRECONDITION code which tells the upper layers not to retry as we + // now cannot be sure that this backup would be usable when it finishes. 
+ logger.Errorf("Failed to close %s file %s after %d attempts, giving up: %v", fileType, name, maxFileCloseRetries, err) + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "failed to close the %s file after %d attempts, giving up", fileType, maxFileCloseRetries) + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(backoff): + // Exponential backoff with 2 as a factor. + if backoff != backoffLimit { + updatedBackoff := time.Duration(float64(backoff) * 2) + backoff = min(updatedBackoff, backoffLimit) + } + } + retries++ + logger.Errorf("Failed to close %s file %s, will perform retry %d of %d in %v: %v", fileType, name, retries, maxFileCloseRetries, backoff, err) + } +} diff --git a/go/vt/mysqlctl/file_close_test.go b/go/vt/mysqlctl/file_close_test.go new file mode 100644 index 00000000000..20ca94a65f1 --- /dev/null +++ b/go/vt/mysqlctl/file_close_test.go @@ -0,0 +1,643 @@ +/* +Copyright 2025 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package mysqlctl + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "os" + "path" + "path/filepath" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/mysql/replication" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl/backupstats" +) + +// mockCloser is a mock implementation of io.Closer that can be configured +// to fail a certain number of times before succeeding or to always fail. +type mockCloser struct { + failCount int // Number of times Close should fail before succeeding + currentFails int // Current number of failures + alwaysFail bool // If true, Close will always fail + mu sync.Mutex // Protects failCount and currentFails + closeCalled int // Number of times Close was called + closed bool // Whether the closer has been successfully closed + err error // The error to return on failure +} + +func newMockCloser(failCount int, err error) *mockCloser { + return &mockCloser{ + failCount: failCount, + err: err, + } +} + +func newAlwaysFailingCloser(err error) *mockCloser { + return &mockCloser{ + alwaysFail: true, + err: err, + } +} + +func (m *mockCloser) Close() error { + m.mu.Lock() + defer m.mu.Unlock() + + m.closeCalled++ + + if m.closed { + return nil + } + + if m.alwaysFail || m.currentFails < m.failCount { + m.currentFails++ + return m.err + } + + m.closed = true + return nil +} + +func (m *mockCloser) getCloseCalled() int { + m.mu.Lock() + defer m.mu.Unlock() + return m.closeCalled +} + +func (m *mockCloser) isClosed() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.closed +} + +// mockReadOnlyCloser combines mockCloser with Read-Only capabilities for testing. +type mockReadOnlyCloser struct { + *mockCloser + io.Reader +} + +// mockReadWriteCloser combines mockCloser with Read/Write capabilities for testing. 
+type mockReadWriteCloser struct { + *mockCloser + io.Reader + io.Writer +} + +func newMockReadOnlyCloser(failCount int, err error) *mockReadOnlyCloser { + return &mockReadOnlyCloser{ + mockCloser: newMockCloser(failCount, err), + Reader: bytes.NewReader([]byte("test data")), + } +} + +func newMockReadWriteCloser(failCount int, err error) *mockReadWriteCloser { + return &mockReadWriteCloser{ + mockCloser: newMockCloser(failCount, err), + Writer: &bytes.Buffer{}, + } +} + +// mockBackupHandle is a mock implementation of backupstorage.BackupHandle for testing. +type mockBackupHandle struct { + addFileReturn io.WriteCloser + addFileErr error + readFileReturn io.ReadCloser + readFileErr error + name string + failedFiles map[string]error + mu sync.Mutex +} + +func newMockBackupHandle() *mockBackupHandle { + return &mockBackupHandle{ + failedFiles: make(map[string]error), + name: "test-backup", + } +} + +func (m *mockBackupHandle) Name() string { + return m.name +} + +func (m *mockBackupHandle) Directory() string { + return "test-directory" +} + +func (m *mockBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) { + if m.addFileErr != nil { + return nil, m.addFileErr + } + return m.addFileReturn, nil +} + +func (m *mockBackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) { + if m.readFileErr != nil { + return nil, m.readFileErr + } + return m.readFileReturn, nil +} + +func (m *mockBackupHandle) EndBackup(ctx context.Context) error { + return nil +} + +func (m *mockBackupHandle) AbortBackup(ctx context.Context) error { + return nil +} + +func (m *mockBackupHandle) RecordError(filename string, err error) { + m.mu.Lock() + defer m.mu.Unlock() + m.failedFiles[filename] = err +} + +func (m *mockBackupHandle) GetFailedFiles() []string { + m.mu.Lock() + defer m.mu.Unlock() + files := make([]string, 0, len(m.failedFiles)) + for file := range m.failedFiles { + files = append(files, file) + } + return 
files +} + +func (m *mockBackupHandle) ResetErrorForFile(filename string) { + m.mu.Lock() + defer m.mu.Unlock() + delete(m.failedFiles, filename) +} + +func (m *mockBackupHandle) Error() error { + m.mu.Lock() + defer m.mu.Unlock() + if len(m.failedFiles) > 0 { + var errs []string + for file, err := range m.failedFiles { + errs = append(errs, fmt.Sprintf("%s: %v", file, err)) + } + return fmt.Errorf("failed files: %s", strings.Join(errs, ", ")) + } + return nil +} + +func (m *mockBackupHandle) HasErrors() bool { + m.mu.Lock() + defer m.mu.Unlock() + return len(m.failedFiles) > 0 +} + +// TestCloseWithRetrySuccess tests that closeWithRetry succeeds when Close succeeds. +func TestCloseWithRetrySuccess(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + closer := newMockCloser(0, nil) + + err := closeWithRetry(ctx, logger, closer, "test-file") + + assert.NoError(t, err) + assert.Equal(t, 1, closer.getCloseCalled()) + assert.True(t, closer.isClosed()) +} + +// TestCloseWithRetryTransientFailure tests that closeWithRetry retries on transient failures. +func TestCloseWithRetryTransientFailure(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + + // Test with various failure counts that should eventually succeed. + // Note: We keep the failure count low because closeWithRetry uses exponential + // backoff (1s, 2s, 4s, 8s, 16s, 30s...) which can make tests slow. + testCases := []struct { + name string + failCount int + }{ + {"fail once then succeed", 1}, + {"fail twice then succeed", 2}, + {"fail three times then succeed", 3}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + closer := newMockCloser(tc.failCount, errors.New("transient error")) + + err := closeWithRetry(ctx, logger, closer, "test-file") + + assert.NoError(t, err) + assert.True(t, closer.isClosed()) + // Should have called Close failCount+1 times (failCount failures + 1 success). 
+ assert.Equal(t, tc.failCount+1, closer.getCloseCalled()) + }) + } +} + +// TestCloseWithRetryPermanentFailure tests that closeWithRetry gives up after maxFileCloseRetries. +// Note: This test uses context cancellation to avoid waiting for the full exponential backoff +// which can take several minutes with 20 retries. +func TestCloseWithRetryPermanentFailure(t *testing.T) { + // Use a context with a short timeout to avoid waiting for all retries. + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + logger := logutil.NewMemoryLogger() + closer := newAlwaysFailingCloser(errors.New("permanent error")) + + err := closeWithRetry(ctx, logger, closer, "test-file") + + // Should fail with context deadline exceeded. + assert.Error(t, err) + assert.Equal(t, context.DeadlineExceeded, err) + // Should have attempted multiple times before context deadline. + assert.Greater(t, closer.getCloseCalled(), 1) + assert.False(t, closer.isClosed()) +} + +// TestCloseWithRetryContextCancellation tests that closeWithRetry respects context cancellation. +func TestCloseWithRetryContextCancellation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + logger := logutil.NewMemoryLogger() + closer := newAlwaysFailingCloser(errors.New("error")) + // Cancel the context after a short delay. + go func() { + time.Sleep(50 * time.Millisecond) + cancel() + }() + + err := closeWithRetry(ctx, logger, closer, "test-file") + + assert.Error(t, err) + assert.Equal(t, context.Canceled, err) + // Should have called Close at least once but not maxFileCloseRetries times. + assert.Greater(t, closer.getCloseCalled(), 0) + assert.Less(t, closer.getCloseCalled(), maxFileCloseRetries+1) +} + +// TestBackupFileSourceCloseError tests error handling when a source file close fails during backup. 
+func TestBackupFileSourceCloseError(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + // Create a temporary directory for test files. + tmpDir := t.TempDir() + // Create test source file. + sourceFile := path.Join(tmpDir, "source.txt") + err := os.WriteFile(sourceFile, []byte("test content"), 0644) + require.NoError(t, err) + // Create Mycnf pointing to our temp directory. + cnf := &Mycnf{ + DataDir: tmpDir, + } + be := &BuiltinBackupEngine{} + // Create a mock backup handle with a source file that fails to close multiple times. + sourceCloser := newMockReadWriteCloser(2, errors.New("failed to close source file")) + bh := newMockBackupHandle() + bh.addFileReturn = sourceCloser + params := BackupParams{ + Cnf: cnf, + Logger: logger, + Stats: backupstats.NoStats(), + Concurrency: 1, + } + fe := &FileEntry{ + Base: backupData, + Name: "source.txt", + } + + // backupFile should handle the error gracefully. + err = be.backupFile(ctx, params, bh, fe, "0") + + // Should succeed after retries. + assert.NoError(t, err) + assert.Equal(t, 3, sourceCloser.getCloseCalled()) // 2 failures + 1 success + assert.True(t, sourceCloser.isClosed()) +} + +// TestBackupFileDestinationCloseError tests error handling when a destination file close fails during backup. +func TestBackupFileDestinationCloseError(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + // Create a temporary directory for test files. + tmpDir := t.TempDir() + // Create test source file. + sourceFile := path.Join(tmpDir, "source.txt") + content := []byte("test content for destination close error") + err := os.WriteFile(sourceFile, content, 0644) + require.NoError(t, err) + // Create Mycnf pointing to our temp directory. + cnf := &Mycnf{ + DataDir: tmpDir, + } + be := &BuiltinBackupEngine{} + // Create a mock backup handle with a destination that fails to close multiple times. 
+ destCloser := newMockReadWriteCloser(3, errors.New("failed to close destination file")) + bh := newMockBackupHandle() + bh.addFileReturn = destCloser + params := BackupParams{ + Cnf: cnf, + Logger: logger, + Stats: backupstats.NoStats(), + Concurrency: 1, + } + fe := &FileEntry{ + Base: backupData, + Name: "source.txt", + } + + err = be.backupFile(ctx, params, bh, fe, "0") + + // Should succeed after retries. + assert.NoError(t, err) + assert.Equal(t, 4, destCloser.getCloseCalled()) // 3 failures + 1 success + assert.True(t, destCloser.isClosed()) +} + +// TestBackupFileDestinationCloseMaxRetries tests that destination close gives up after max retries. +// Note: This test uses a short context timeout to avoid waiting for all retries with exponential +// backoff. +func TestBackupFileDestinationCloseMaxRetries(t *testing.T) { + // Use a short timeout to avoid waiting for the full exponential backoff. + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + logger := logutil.NewMemoryLogger() + // Create a temporary directory for test files. + tmpDir := t.TempDir() + // Create test destination file. + destFile := path.Join(tmpDir, "destination.txt") + content := []byte("test content for max retries") + err := os.WriteFile(destFile, content, 0644) + require.NoError(t, err) + // Create Mycnf pointing to our temp directory. + cnf := &Mycnf{ + DataDir: tmpDir, + } + be := &BuiltinBackupEngine{} + // Create a mock backup handle with a destination file that always fails to close. 
+ destCloser := &mockReadWriteCloser{ + mockCloser: newAlwaysFailingCloser(errors.New("permanent file close failure")), + Writer: &bytes.Buffer{}, + } + bh := newMockBackupHandle() + bh.addFileReturn = destCloser + params := BackupParams{ + Cnf: cnf, + Logger: logger, + Stats: backupstats.NoStats(), + Concurrency: 1, + } + fe := &FileEntry{ + Base: backupData, + Name: "destination.txt", + } + + err = be.backupFile(ctx, params, bh, fe, "0") + + // Should fail due to close error (context deadline exceeded). + assert.Error(t, err) + assert.Contains(t, err.Error(), "failed to close destination file") + // Should have attempted multiple times before timeout. + assert.Greater(t, destCloser.getCloseCalled(), 1) + assert.False(t, destCloser.isClosed()) +} + +// TestBackupManifestCloseError tests error handling when manifest writer close fails. +func TestBackupManifestCloseError(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + be := &BuiltinBackupEngine{} + + testCases := []struct { + name string + failCount int + alwaysFail bool + expectError bool + expectedCallCount int + useTimeout bool + }{ + { + name: "close succeeds immediately", + failCount: 0, + alwaysFail: false, + expectError: false, + expectedCallCount: 1, + useTimeout: false, + }, + { + name: "close fails twice then succeeds", + failCount: 2, + alwaysFail: false, + expectError: false, + expectedCallCount: 3, + useTimeout: false, + }, + { + name: "close always fails", + failCount: 0, + alwaysFail: true, + expectError: true, + expectedCallCount: 2, // Will timeout before reaching max retries + useTimeout: true, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + testCtx := ctx + if tc.useTimeout { + var cancel context.CancelFunc + testCtx, cancel = context.WithTimeout(ctx, 2*time.Second) + defer cancel() + } + var wc *mockReadWriteCloser + if tc.alwaysFail { + wc = &mockReadWriteCloser{ + mockCloser: newAlwaysFailingCloser(errors.New("write 
error")), + Writer: &bytes.Buffer{}, + } + } else { + wc = newMockReadWriteCloser(tc.failCount, errors.New("transient write error")) + } + bh := newMockBackupHandle() + bh.addFileReturn = wc + tmpDir := t.TempDir() + cnf := &Mycnf{ + DataDir: tmpDir, + } + params := BackupParams{ + Cnf: cnf, + Logger: logger, + Stats: backupstats.NoStats(), + BackupTime: time.Now(), + } + fes := []FileEntry{} + + err := be.backupManifest(testCtx, params, bh, testPosition(), testPosition(), testPosition(), "", "test-uuid", "8.0.32", nil, fes, 0) + + if tc.expectError { + assert.Error(t, err) + assert.Contains(t, err.Error(), "cannot close backup") + } else { + assert.NoError(t, err) + } + assert.GreaterOrEqual(t, wc.getCloseCalled(), tc.expectedCallCount) + }) + } +} + +// TestRestoreFileSourceCloseError tests error handling when a source file close fails during restore. +func TestRestoreFileSourceCloseError(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + tmpDir := t.TempDir() + be := &BuiltinBackupEngine{} + // Create a mock backup handle with a file source that fails to close. + sourceCloser := newMockReadOnlyCloser(2, errors.New("failed to close source file")) + bh := newMockBackupHandle() + bh.readFileReturn = sourceCloser + cnf := &Mycnf{ + DataDir: tmpDir, + } + params := RestoreParams{ + Cnf: cnf, + Logger: logger, + Stats: backupstats.NoStats(), + } + fe := &FileEntry{ + Base: backupData, + Name: "test-restore.txt", + Hash: "00000000", // Will fail hash check, but that's ok for this test + } + bm := builtinBackupManifest{ + SkipCompress: true, + } + + err := be.restoreFile(ctx, params, bh, fe, bm, "0") + + // Will fail due to hash mismatch, but we can verify close was attempted with retries. + assert.Error(t, err) + // Source should have been closed with retries. + assert.GreaterOrEqual(t, sourceCloser.getCloseCalled(), 1) +} + +// TestRestoreFileDestinationClose tests the happy path when closing a destination file during restore. 
+func TestRestoreFileDestinationClose(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + tmpDir := t.TempDir() + // We need to create a more complete test setup for this + // because restoreFile creates the destination file itself. + be := &BuiltinBackupEngine{} + content := []byte("test restore content") + br := bytes.NewReader(content) + sourceCloser := &mockReadWriteCloser{ + mockCloser: newMockCloser(0, nil), + Reader: br, + } + bh := newMockBackupHandle() + bh.readFileReturn = sourceCloser + cnf := &Mycnf{ + DataDir: tmpDir, + } + params := RestoreParams{ + Cnf: cnf, + Logger: logger, + Stats: backupstats.NoStats(), + } + // Calculate the actual hash of our content for a successful restore. + bp := newBackupReader("test", 0, bytes.NewReader(content)) + io.ReadAll(bp) + expectedHash := bp.HashString() + fe := &FileEntry{ + Base: backupData, + Name: "test-restore.txt", + Hash: expectedHash, + } + bm := builtinBackupManifest{ + SkipCompress: true, + } + + err := be.restoreFile(ctx, params, bh, fe, bm, "0") + + // The restore should succeed (destination close should work for real files). + assert.NoError(t, err) + // Verify the file was actually created + destPath := filepath.Join(tmpDir, "test-restore.txt") + _, err = os.Stat(destPath) + assert.NoError(t, err) +} + +// TestRestoreFileWithCloseRetriesIntegration is an integration test that verifies +// the full restore flow handles close retries properly. +func TestRestoreFileWithCloseRetriesIntegration(t *testing.T) { + ctx := context.Background() + logger := logutil.NewMemoryLogger() + tmpDir := t.TempDir() + be := &BuiltinBackupEngine{} + content := []byte("integration test content for restore") + // Create a source that will fail to close a few times. 
+ sourceCloser := &mockReadOnlyCloser{ + mockCloser: newMockCloser(1, errors.New("transient file close error")), + Reader: bytes.NewReader(content), + } + bh := newMockBackupHandle() + bh.readFileReturn = sourceCloser + cnf := &Mycnf{ + DataDir: tmpDir, + } + params := RestoreParams{ + Cnf: cnf, + Logger: logger, + Stats: backupstats.NoStats(), + } + // Calculate the hash. + bp := newBackupReader("test", 0, bytes.NewReader(content)) + io.ReadAll(bp) + expectedHash := bp.HashString() + fe := &FileEntry{ + Base: backupData, + Name: "integration-test.txt", + Hash: expectedHash, + } + bm := builtinBackupManifest{ + SkipCompress: true, + } + + err := be.restoreFile(ctx, params, bh, fe, bm, "0") + + // Should succeed after retries. + assert.NoError(t, err) + // Verify source was closed with retry (1 failure + 1 success) + assert.Equal(t, 2, sourceCloser.getCloseCalled()) + assert.True(t, sourceCloser.isClosed()) + // Verify the file was created with correct content. + destPath := filepath.Join(tmpDir, "integration-test.txt") + restoredContent, err := os.ReadFile(destPath) + require.NoError(t, err) + assert.Equal(t, content, restoredContent) +} + +// Helper function to create a test replication position. +func testPosition() replication.Position { + return replication.Position{} +} diff --git a/go/vt/mysqlctl/gcsbackupstorage/gcs.go b/go/vt/mysqlctl/gcsbackupstorage/gcs.go index 814395a225a..13852ac6f74 100644 --- a/go/vt/mysqlctl/gcsbackupstorage/gcs.go +++ b/go/vt/mysqlctl/gcsbackupstorage/gcs.go @@ -246,7 +246,9 @@ func (bs *GCSBackupStorage) Close() error { // so we know to create a new client the next time one // is needed. client := bs._client - bs._client = nil + defer func() { + bs._client = nil + }() if err := client.Close(); err != nil { return err }