From dde7fb7a405be08b4cd4cff14fc3c6d9a5c3dcf6 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 30 Jul 2023 21:21:15 +0300 Subject: [PATCH 01/16] Backup: graceful handling of compressor/decompressor closure Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine.go | 71 +++++++++++++++------------ 1 file changed, 40 insertions(+), 31 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 6f43255f9c1..4b3ab1a10d7 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -21,6 +21,7 @@ import ( "context" "encoding/hex" "encoding/json" + "errors" "fmt" "hash" "hash/crc32" @@ -56,6 +57,8 @@ const ( builtinBackupEngineName = "builtin" autoIncrementalFromPos = "auto" dataDictionaryFile = "mysql.ibd" + + compressorTimeout = 3 * time.Second ) var ( @@ -756,6 +759,27 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger } } +// closeWithTimeout attempts to close an unreliable Closer. The Close() function may time out. Unfortunately Close() does not accept a +// context, and so we wrap it with an external context.WithTimeout. +// We only wait very briefly as we don't want to hold up anything else. +func closeWithTimeout(ctx context.Context, closer io.Closer, timeout time.Duration) error { + done := make(chan error) + defer close(done) + + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + go func() { + done <- closer.Close() + }() + select { + case err := <-done: + return err + case <-ctx.Done(): + return vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timeout waiting for compressor/decompressor Close()") + } +} + // backupFile backs up an individual file. 
func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { // Open the source file for reading. @@ -795,12 +819,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func(name, fileName string) { closeDestAt := time.Now() if rerr := dest.Close(); rerr != nil { - if finalErr != nil { - // We already have an error, just log this one. - params.Logger.Errorf2(rerr, "failed to close file %v,%v", name, fe.Name) - } else { - finalErr = rerr - } + finalErr = errors.Join(finalErr, vterrors.Wrapf(rerr, "failed to close file %v,%v", name, fe.Name)) } params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }(name, fe.Name) @@ -827,6 +846,16 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) + + defer func() { + // Close gzip to flush it, after that all data is sent to writer. + closeCompressorAt := time.Now() + if cerr := closeWithTimeout(ctx, compressor, compressorTimeout); err != nil { + finalErr = errors.Join(finalErr, vterrors.Wrapf(cerr, "failed to close compressor %v", name)) + } + params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) + }() + } if builtinBackupFileReadBufferSize > 0 { @@ -840,15 +869,6 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara return vterrors.Wrap(err, "cannot copy data") } - // Close gzip to flush it, after that all data is sent to writer. 
- if compressor != nil { - closeCompressorAt := time.Now() - if err = compressor.Close(); err != nil { - return vterrors.Wrap(err, "cannot close compressor") - } - params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) - } - // Close the backupPipe to finish writing on destination. if err = bw.Close(); err != nil { return vterrors.Wrapf(err, "cannot flush destination: %v", name) @@ -860,7 +880,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara // Save the hash. fe.Hash = bw.HashString() - return nil + return finalErr } // executeRestoreFullBackup restores the files from a full backup. The underlying mysql database service is expected to be stopped. @@ -1047,12 +1067,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa defer func() { closeDestAt := time.Now() if cerr := dest.Close(); cerr != nil { - if finalErr != nil { - // We already have an error, just log this one. - log.Errorf("failed to close file %v: %v", name, cerr) - } else { - finalErr = vterrors.Wrap(cerr, "failed to close destination file") - } + finalErr = errors.Join(finalErr, vterrors.Wrap(cerr, "failed to close destination file")) } params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }() @@ -1097,14 +1112,8 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa defer func() { closeDecompressorAt := time.Now() - if cerr := decompressor.Close(); cerr != nil { - params.Logger.Errorf("failed to close decompressor: %v", cerr) - if finalErr != nil { - // We already have an error, just log this one. 
- log.Errorf("failed to close decompressor %v: %v", name, cerr) - } else { - finalErr = vterrors.Wrap(cerr, "failed to close decompressor") - } + if cerr := closeWithTimeout(ctx, decompressor, compressorTimeout); err != nil { + finalErr = errors.Join(finalErr, vterrors.Wrapf(cerr, "failed to close decompressor %v", name)) } params.Stats.Scope(stats.Operation("Decompressor:Close")).TimedIncrement(time.Since(closeDecompressorAt)) }() @@ -1130,7 +1139,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa return vterrors.Wrap(err, "failed to close the source reader") } - return nil + return finalErr } // ShouldDrainForBackup satisfies the BackupEngine interface From ddcee65be447791387687e4a6f8122574676fd77 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 31 Jul 2023 08:10:28 +0300 Subject: [PATCH 02/16] ensure to never lose an error where finalErr is a named return value with defer function Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine.go | 64 +++++++++++++++++++++------ 1 file changed, 51 insertions(+), 13 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 4b3ab1a10d7..f96e5cf0e73 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -819,11 +819,25 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func(name, fileName string) { closeDestAt := time.Now() if rerr := dest.Close(); rerr != nil { - finalErr = errors.Join(finalErr, vterrors.Wrapf(rerr, "failed to close file %v,%v", name, fe.Name)) + rerr = vterrors.Wrapf(rerr, "failed to close file %v,%v", name, fe.Name) + params.Logger.Error(rerr) + finalErr = errors.Join(finalErr, rerr) } params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }(name, fe.Name) + // Note about `finalErr`: 
it's a named return value and we have a deferred function that sets it. That means + // this function will ALWAYS return `finalErr`, overriding any other returned value (ie returned error). + // See for example this snippet: + // // explain_this_comment() returns "surprise" rather than : + // func explain_this_comment() (finalErr error) { + // defer func() { + // finalErr = fmt.Errorf("surprise") + // }() + // return nil + // } + // This is why from this point on we take care to always assign any error into `finalErr`. + destStats := params.Stats.Scope(stats.Operation("Destination:Write")) timedDest := ioutil.NewMeteredWriteCloser(dest, destStats.TimedIncrementBytes) @@ -841,7 +855,8 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) } if err != nil { - return vterrors.Wrap(err, "can't create compressor") + finalErr = vterrors.Wrap(err, "can't create compressor") + return finalErr } compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) @@ -851,7 +866,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara // Close gzip to flush it, after that all data is sent to writer. closeCompressorAt := time.Now() if cerr := closeWithTimeout(ctx, compressor, compressorTimeout); err != nil { - finalErr = errors.Join(finalErr, vterrors.Wrapf(cerr, "failed to close compressor %v", name)) + cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) + params.Logger.Error(cerr) + finalErr = errors.Join(finalErr, cerr) } params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) }() @@ -866,16 +883,19 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara // optional pipe, tee, output file and hasher). 
_, err = io.Copy(writer, reader) if err != nil { - return vterrors.Wrap(err, "cannot copy data") + finalErr = vterrors.Wrap(err, "cannot copy data") + return finalErr } // Close the backupPipe to finish writing on destination. - if err = bw.Close(); err != nil { - return vterrors.Wrapf(err, "cannot flush destination: %v", name) + if err := bw.Close(); err != nil { + finalErr = vterrors.Wrapf(err, "cannot flush destination: %v", name) + return finalErr } if err := br.Close(); err != nil { - return vterrors.Wrap(err, "failed to close the source reader") + finalErr = vterrors.Wrap(err, "failed to close the source reader") + return finalErr } // Save the hash. @@ -1113,30 +1133,48 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa defer func() { closeDecompressorAt := time.Now() if cerr := closeWithTimeout(ctx, decompressor, compressorTimeout); err != nil { - finalErr = errors.Join(finalErr, vterrors.Wrapf(cerr, "failed to close decompressor %v", name)) + cerr = vterrors.Wrapf(cerr, "failed to close decompressor %v", name) + params.Logger.Error(cerr) + finalErr = errors.Join(finalErr, cerr) } params.Stats.Scope(stats.Operation("Decompressor:Close")).TimedIncrement(time.Since(closeDecompressorAt)) }() } + // Note about `finalErr`: it's a named return value and we have a deferred function that sets it. That means + // this function will ALWAYS return `finalErr`, overriding any other returned value (ie returned error). + // See for example this snippet: + // // explain_this_comment() returns "surprise" rather than : + // func explain_this_comment() (finalErr error) { + // defer func() { + // finalErr = fmt.Errorf("surprise") + // }() + // return nil + // } + // This is why from this point on we take care to always assign any error into `finalErr`. + // Copy the data. Will also write to the hasher. 
- if _, err = io.Copy(bufferedDest, reader); err != nil { - return vterrors.Wrap(err, "failed to copy file contents") + if _, err := io.Copy(bufferedDest, reader); err != nil { + finalErr = vterrors.Wrap(err, "failed to copy file contents") + return finalErr } // Check the hash. hash := br.HashString() if hash != fe.Hash { - return vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) + finalErr = vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) + return finalErr } // Flush the buffer. if err := bufferedDest.Flush(); err != nil { - return vterrors.Wrap(err, "failed to flush destination buffer") + finalErr = vterrors.Wrap(err, "failed to flush destination buffer") + return finalErr } if err := br.Close(); err != nil { - return vterrors.Wrap(err, "failed to close the source reader") + finalErr = vterrors.Wrap(err, "failed to close the source reader") + return finalErr } return finalErr From c8a1963c2122161e2654a4637aa415c4f640c068 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 31 Jul 2023 10:53:13 +0300 Subject: [PATCH 03/16] simplify return path, as the named finalErr return value is auto assigned on return Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine.go | 52 ++++++--------------------- 1 file changed, 10 insertions(+), 42 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index f96e5cf0e73..4712232abc2 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -826,18 +826,6 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt)) }(name, fe.Name) - // Note about `finalErr`: it's a named return value and we have a deferred 
function that sets it. That means - // this function will ALWAYS return `finalErr`, overriding any other returned value (ie returned error). - // See for example this snippet: - // // explain_this_comment() returns "surprise" rather than : - // func explain_this_comment() (finalErr error) { - // defer func() { - // finalErr = fmt.Errorf("surprise") - // }() - // return nil - // } - // This is why from this point on we take care to always assign any error into `finalErr`. - destStats := params.Stats.Scope(stats.Operation("Destination:Write")) timedDest := ioutil.NewMeteredWriteCloser(dest, destStats.TimedIncrementBytes) @@ -855,8 +843,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) } if err != nil { - finalErr = vterrors.Wrap(err, "can't create compressor") - return finalErr + return vterrors.Wrap(err, "can't create compressor") } compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) @@ -883,24 +870,21 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara // optional pipe, tee, output file and hasher). _, err = io.Copy(writer, reader) if err != nil { - finalErr = vterrors.Wrap(err, "cannot copy data") - return finalErr + return vterrors.Wrap(err, "cannot copy data") } // Close the backupPipe to finish writing on destination. if err := bw.Close(); err != nil { - finalErr = vterrors.Wrapf(err, "cannot flush destination: %v", name) - return finalErr + return vterrors.Wrapf(err, "cannot flush destination: %v", name) } if err := br.Close(); err != nil { - finalErr = vterrors.Wrap(err, "failed to close the source reader") - return finalErr + return vterrors.Wrap(err, "failed to close the source reader") } // Save the hash. fe.Hash = bw.HashString() - return finalErr + return nil } // executeRestoreFullBackup restores the files from a full backup. 
The underlying mysql database service is expected to be stopped. @@ -1141,43 +1125,27 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa }() } - // Note about `finalErr`: it's a named return value and we have a deferred function that sets it. That means - // this function will ALWAYS return `finalErr`, overriding any other returned value (ie returned error). - // See for example this snippet: - // // explain_this_comment() returns "surprise" rather than : - // func explain_this_comment() (finalErr error) { - // defer func() { - // finalErr = fmt.Errorf("surprise") - // }() - // return nil - // } - // This is why from this point on we take care to always assign any error into `finalErr`. - // Copy the data. Will also write to the hasher. if _, err := io.Copy(bufferedDest, reader); err != nil { - finalErr = vterrors.Wrap(err, "failed to copy file contents") - return finalErr + return vterrors.Wrap(err, "failed to copy file contents") } // Check the hash. hash := br.HashString() if hash != fe.Hash { - finalErr = vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) - return finalErr + return vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash) } // Flush the buffer. 
if err := bufferedDest.Flush(); err != nil { - finalErr = vterrors.Wrap(err, "failed to flush destination buffer") - return finalErr + return vterrors.Wrap(err, "failed to flush destination buffer") } if err := br.Close(); err != nil { - finalErr = vterrors.Wrap(err, "failed to close the source reader") - return finalErr + return vterrors.Wrap(err, "failed to close the source reader") } - return finalErr + return nil } // ShouldDrainForBackup satisfies the BackupEngine interface From 3cac18571765ce52829217c57622467b30123c3f Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 31 Jul 2023 11:28:59 +0300 Subject: [PATCH 04/16] add unit test for closeWithTimeout Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine2_test.go | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/go/vt/mysqlctl/builtinbackupengine2_test.go b/go/vt/mysqlctl/builtinbackupengine2_test.go index 29252392c7f..237f8be79f8 100644 --- a/go/vt/mysqlctl/builtinbackupengine2_test.go +++ b/go/vt/mysqlctl/builtinbackupengine2_test.go @@ -18,9 +18,12 @@ limitations under the License. 
package mysqlctl import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestGetIncrementalFromPosGTIDSet(t *testing.T) { @@ -67,3 +70,20 @@ func TestGetIncrementalFromPosGTIDSet(t *testing.T) { }) } } + +type hangCloser struct { +} + +func (c hangCloser) Close() error { + ch := make(chan bool) + ch <- true // hang forever + return nil +} + +func TestCloseWithTimeout(t *testing.T) { + closer := hangCloser{} + + err := closeWithTimeout(context.Background(), closer, time.Second) + require.Error(t, err) + assert.Contains(t, err.Error(), "timeout waiting") +} From aabdc1800844aec70a7eead2b021ed56d62b5235 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 1 Aug 2023 10:42:36 +0300 Subject: [PATCH 05/16] refactor: created TimeoutCloser wrapper around io.Closer Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/ioutil/timeout_closer.go | 60 +++++++++++++++++++++ go/ioutil/timeout_closer_test.go | 46 ++++++++++++++++ go/vt/mysqlctl/builtinbackupengine.go | 30 ++--------- go/vt/mysqlctl/builtinbackupengine2_test.go | 20 ------- 4 files changed, 111 insertions(+), 45 deletions(-) create mode 100644 go/ioutil/timeout_closer.go create mode 100644 go/ioutil/timeout_closer_test.go diff --git a/go/ioutil/timeout_closer.go b/go/ioutil/timeout_closer.go new file mode 100644 index 00000000000..7c93e823846 --- /dev/null +++ b/go/ioutil/timeout_closer.go @@ -0,0 +1,60 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +MeteredReadCloser and MeteredReader are time-and-byte-tracking wrappers around +ReadCloser and Reader. +*/ + +package ioutil + +import ( + "context" + "io" + "time" +) + +// TimeoutCloser is an io.Closer that has a timeout for executing the Close() function. +type TimeoutCloser struct { + io.Closer + closer io.Closer + timeout time.Duration +} + +func NewTimeoutCloser(closer io.Closer, timeout time.Duration) *TimeoutCloser { + return &TimeoutCloser{ + closer: closer, + timeout: timeout, + } +} + +func (c *TimeoutCloser) Close() error { + done := make(chan error) + defer close(done) + + ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + defer cancel() + + go func() { + done <- c.closer.Close() + }() + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/go/ioutil/timeout_closer_test.go b/go/ioutil/timeout_closer_test.go new file mode 100644 index 00000000000..8bdca06d5bc --- /dev/null +++ b/go/ioutil/timeout_closer_test.go @@ -0,0 +1,46 @@ +/* +Copyright 2023 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +MeteredReadCloser and MeteredReader are time-and-byte-tracking wrappers around +ReadCloser and Reader. 
+*/ + +package ioutil + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type hangCloser struct { +} + +func (c hangCloser) Close() error { + ch := make(chan bool) + ch <- true // hang forever + return nil +} + +func TestTimeoutCloser(t *testing.T) { + closer := NewTimeoutCloser(&hangCloser{}, time.Second) + err := closer.Close() + require.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") +} diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 4712232abc2..458adf41107 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -759,27 +759,6 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger } } -// closeWithTimeout attempts to close an unreliable Closer. The Close() function may time out. Unfortunately Close() does not accept a -// context, and so we wrap it with an external context.WithTimeout. -// We only wait very briefly as we don't want to hold up anything else. -func closeWithTimeout(ctx context.Context, closer io.Closer, timeout time.Duration) error { - done := make(chan error) - defer close(done) - - ctx, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - go func() { - done <- closer.Close() - }() - select { - case err := <-done: - return err - case <-ctx.Done(): - return vterrors.Errorf(vtrpc.Code_DEADLINE_EXCEEDED, "timeout waiting for compressor/decompressor Close()") - } -} - // backupFile backs up an individual file. func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { // Open the source file for reading. @@ -835,8 +814,8 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara var writer io.Writer = bw // Create the gzip compression pipe, if necessary. 
- var compressor io.WriteCloser if backupStorageCompress { + var compressor io.WriteCloser if ExternalCompressorCmd != "" { compressor, err = newExternalCompressor(ctx, ExternalCompressorCmd, writer, params.Logger) } else { @@ -845,6 +824,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara if err != nil { return vterrors.Wrap(err, "can't create compressor") } + closer := ioutil.NewTimeoutCloser(compressor, closeTimeout) compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) @@ -852,14 +832,13 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func() { // Close gzip to flush it, after that all data is sent to writer. closeCompressorAt := time.Now() - if cerr := closeWithTimeout(ctx, compressor, compressorTimeout); err != nil { + if cerr := closer.Close(); err != nil { cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) params.Logger.Error(cerr) finalErr = errors.Join(finalErr, cerr) } params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) }() - } if builtinBackupFileReadBufferSize > 0 { @@ -1110,13 +1089,14 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa if err != nil { return vterrors.Wrap(err, "can't create decompressor") } + closer := ioutil.NewTimeoutCloser(decompressor, closeTimeout) decompressStats := params.Stats.Scope(stats.Operation("Decompressor:Read")) reader = ioutil.NewMeteredReader(decompressor, decompressStats.TimedIncrementBytes) defer func() { closeDecompressorAt := time.Now() - if cerr := closeWithTimeout(ctx, decompressor, compressorTimeout); err != nil { + if cerr := closer.Close(); err != nil { cerr = vterrors.Wrapf(cerr, "failed to close decompressor %v", name) params.Logger.Error(cerr) finalErr = errors.Join(finalErr, cerr) diff --git a/go/vt/mysqlctl/builtinbackupengine2_test.go 
b/go/vt/mysqlctl/builtinbackupengine2_test.go index 237f8be79f8..29252392c7f 100644 --- a/go/vt/mysqlctl/builtinbackupengine2_test.go +++ b/go/vt/mysqlctl/builtinbackupengine2_test.go @@ -18,12 +18,9 @@ limitations under the License. package mysqlctl import ( - "context" "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestGetIncrementalFromPosGTIDSet(t *testing.T) { @@ -70,20 +67,3 @@ func TestGetIncrementalFromPosGTIDSet(t *testing.T) { }) } } - -type hangCloser struct { -} - -func (c hangCloser) Close() error { - ch := make(chan bool) - ch <- true // hang forever - return nil -} - -func TestCloseWithTimeout(t *testing.T) { - closer := hangCloser{} - - err := closeWithTimeout(context.Background(), closer, time.Second) - require.Error(t, err) - assert.Contains(t, err.Error(), "timeout waiting") -} From b71f6df3130eb3e1269109391adf72b627c1709a Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 1 Aug 2023 10:58:10 +0300 Subject: [PATCH 06/16] removed irrelevant comment Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/ioutil/timeout_closer.go | 5 ----- go/ioutil/timeout_closer_test.go | 5 ----- 2 files changed, 10 deletions(-) diff --git a/go/ioutil/timeout_closer.go b/go/ioutil/timeout_closer.go index 7c93e823846..52c93c12c90 100644 --- a/go/ioutil/timeout_closer.go +++ b/go/ioutil/timeout_closer.go @@ -14,11 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -/* -MeteredReadCloser and MeteredReader are time-and-byte-tracking wrappers around -ReadCloser and Reader. 
-*/ - package ioutil import ( diff --git a/go/ioutil/timeout_closer_test.go b/go/ioutil/timeout_closer_test.go index 8bdca06d5bc..6b46ff40897 100644 --- a/go/ioutil/timeout_closer_test.go +++ b/go/ioutil/timeout_closer_test.go @@ -14,11 +14,6 @@ See the License for the specific language governing permissions and limitations under the License. */ -/* -MeteredReadCloser and MeteredReader are time-and-byte-tracking wrappers around -ReadCloser and Reader. -*/ - package ioutil import ( From 7cc44e6207ca076cea0352137f227a4fa4e0c959 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 1 Aug 2023 11:01:05 +0300 Subject: [PATCH 07/16] add unit test Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/ioutil/timeout_closer_test.go | 22 ++++++++++++++++------ 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/go/ioutil/timeout_closer_test.go b/go/ioutil/timeout_closer_test.go index 6b46ff40897..8685dec7fec 100644 --- a/go/ioutil/timeout_closer_test.go +++ b/go/ioutil/timeout_closer_test.go @@ -25,17 +25,27 @@ import ( ) type hangCloser struct { + hang bool } func (c hangCloser) Close() error { - ch := make(chan bool) - ch <- true // hang forever + if c.hang { + ch := make(chan bool) + ch <- true // hang forever + } return nil } func TestTimeoutCloser(t *testing.T) { - closer := NewTimeoutCloser(&hangCloser{}, time.Second) - err := closer.Close() - require.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") + { + closer := NewTimeoutCloser(&hangCloser{hang: false}, time.Second) + err := closer.Close() + require.NoError(t, err) + } + { + closer := NewTimeoutCloser(&hangCloser{hang: true}, time.Second) + err := closer.Close() + require.Error(t, err) + assert.Contains(t, err.Error(), "context deadline exceeded") + } } From 5c52d6db0c4b6ca37b479176788b84454b4b7452 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 1 
Aug 2023 16:04:06 +0300 Subject: [PATCH 08/16] defer cancel Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index 458adf41107..e3689122186 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -57,8 +57,6 @@ const ( builtinBackupEngineName = "builtin" autoIncrementalFromPos = "auto" dataDictionaryFile = "mysql.ibd" - - compressorTimeout = 3 * time.Second ) var ( @@ -761,6 +759,8 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger // backupFile backs up an individual file. func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // Open the source file for reading. openSourceAt := time.Now() source, err := fe.open(params.Cnf, true) @@ -832,6 +832,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara defer func() { // Close gzip to flush it, after that all data is sent to writer. closeCompressorAt := time.Now() + params.Logger.Infof("closing compressor") if cerr := closer.Close(); err != nil { cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) params.Logger.Error(cerr) @@ -1018,6 +1019,8 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP // restoreFile restores an individual file. func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, bm builtinBackupManifest, name string) (finalErr error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() // Open the source file for reading. 
openSourceAt := time.Now() source, err := bh.ReadFile(ctx, name) @@ -1096,6 +1099,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa defer func() { closeDecompressorAt := time.Now() + params.Logger.Infof("closing decompressor") if cerr := closer.Close(); err != nil { cerr = vterrors.Wrapf(cerr, "failed to close decompressor %v", name) params.Logger.Error(cerr) From 59cfbcf86590543d43e0f0939107123f18e457bb Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Tue, 1 Aug 2023 16:05:18 +0300 Subject: [PATCH 09/16] refactor closeTimeout to neutral file Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/backup.go | 3 +++ go/vt/mysqlctl/xtrabackupengine.go | 3 --- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index c07c15d16b5..ab5a42c2323 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -57,6 +57,9 @@ const ( RestoreState = "restore_in_progress" // BackupTimestampFormat is the format in which we save BackupTime and FinishedTime BackupTimestampFormat = "2006-01-02.150405" + + // closeTimeout is the timeout for closing backup files after writing. + closeTimeout = 10 * time.Minute ) const ( diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index eb7d228a6fa..7423d35a6eb 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -70,9 +70,6 @@ const ( xtrabackupBinaryName = "xtrabackup" xtrabackupEngineName = "xtrabackup" xbstream = "xbstream" - - // closeTimeout is the timeout for closing backup files after writing. - closeTimeout = 10 * time.Minute ) // xtraBackupManifest represents a backup. 
From 0769be531c5488c7d368d5324fb1f26199f25248 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 2 Aug 2023 08:16:33 +0300 Subject: [PATCH 10/16] use integral context Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/ioutil/timeout_closer.go | 7 ++++--- go/ioutil/timeout_closer_test.go | 6 ++++-- go/vt/mysqlctl/builtinbackupengine.go | 4 ++-- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/go/ioutil/timeout_closer.go b/go/ioutil/timeout_closer.go index 52c93c12c90..c10e7269cc3 100644 --- a/go/ioutil/timeout_closer.go +++ b/go/ioutil/timeout_closer.go @@ -24,13 +24,14 @@ import ( // TimeoutCloser is an io.Closer that has a timeout for executing the Close() function. type TimeoutCloser struct { - io.Closer + ctx context.Context closer io.Closer timeout time.Duration } -func NewTimeoutCloser(closer io.Closer, timeout time.Duration) *TimeoutCloser { +func NewTimeoutCloser(ctx context.Context, closer io.Closer, timeout time.Duration) *TimeoutCloser { return &TimeoutCloser{ + ctx: ctx, closer: closer, timeout: timeout, } @@ -40,7 +41,7 @@ func (c *TimeoutCloser) Close() error { done := make(chan error) defer close(done) - ctx, cancel := context.WithTimeout(context.Background(), c.timeout) + ctx, cancel := context.WithTimeout(c.ctx, c.timeout) defer cancel() go func() { diff --git a/go/ioutil/timeout_closer_test.go b/go/ioutil/timeout_closer_test.go index 8685dec7fec..023b4b64a79 100644 --- a/go/ioutil/timeout_closer_test.go +++ b/go/ioutil/timeout_closer_test.go @@ -17,6 +17,7 @@ limitations under the License. 
package ioutil import ( + "context" "testing" "time" @@ -37,13 +38,14 @@ func (c hangCloser) Close() error { } func TestTimeoutCloser(t *testing.T) { + ctx := context.Background() { - closer := NewTimeoutCloser(&hangCloser{hang: false}, time.Second) + closer := NewTimeoutCloser(ctx, &hangCloser{hang: false}, time.Second) err := closer.Close() require.NoError(t, err) } { - closer := NewTimeoutCloser(&hangCloser{hang: true}, time.Second) + closer := NewTimeoutCloser(ctx, &hangCloser{hang: true}, time.Second) err := closer.Close() require.Error(t, err) assert.Contains(t, err.Error(), "context deadline exceeded") diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index e3689122186..e4b9121e23a 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -824,7 +824,7 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara if err != nil { return vterrors.Wrap(err, "can't create compressor") } - closer := ioutil.NewTimeoutCloser(compressor, closeTimeout) + closer := ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout) compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) @@ -1092,7 +1092,7 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa if err != nil { return vterrors.Wrap(err, "can't create decompressor") } - closer := ioutil.NewTimeoutCloser(decompressor, closeTimeout) + closer := ioutil.NewTimeoutCloser(ctx, decompressor, closeTimeout) decompressStats := params.Stats.Scope(stats.Operation("Decompressor:Read")) reader = ioutil.NewMeteredReader(decompressor, decompressStats.TimedIncrementBytes) From ebe4cd7b747b89c0a1cd56c854ce3b6fcabf8c71 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 2 Aug 2023 08:18:52 +0300 Subject: [PATCH 11/16] improved test Signed-off-by: Shlomi Noach 
<2607934+shlomi-noach@users.noreply.github.com> --- go/ioutil/timeout_closer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/ioutil/timeout_closer_test.go b/go/ioutil/timeout_closer_test.go index 023b4b64a79..9aabe307c85 100644 --- a/go/ioutil/timeout_closer_test.go +++ b/go/ioutil/timeout_closer_test.go @@ -48,6 +48,6 @@ func TestTimeoutCloser(t *testing.T) { closer := NewTimeoutCloser(ctx, &hangCloser{hang: true}, time.Second) err := closer.Close() require.Error(t, err) - assert.Contains(t, err.Error(), "context deadline exceeded") + assert.ErrorIs(t, err, context.DeadlineExceeded) } } From 0a6ee15fd9ab959106a8c2f1141172b34eba6704 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 2 Aug 2023 08:19:02 +0300 Subject: [PATCH 12/16] fix defer calling point Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/ioutil/timeout_closer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go/ioutil/timeout_closer.go b/go/ioutil/timeout_closer.go index c10e7269cc3..0545d81cd00 100644 --- a/go/ioutil/timeout_closer.go +++ b/go/ioutil/timeout_closer.go @@ -39,12 +39,12 @@ func NewTimeoutCloser(ctx context.Context, closer io.Closer, timeout time.Durati func (c *TimeoutCloser) Close() error { done := make(chan error) - defer close(done) ctx, cancel := context.WithTimeout(c.ctx, c.timeout) defer cancel() go func() { + defer close(done) done <- c.closer.Close() }() select { From 968432a3c6a5df5a0d3903c3928bb39800df621d Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 2 Aug 2023 08:22:09 +0300 Subject: [PATCH 13/16] rename test files Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/backup_blackbox_test.go | 577 +++++++++++++++++++ go/vt/mysqlctl/builtinbackupengine2_test.go | 69 --- go/vt/mysqlctl/builtinbackupengine_test.go | 584 ++------------------ 3 files 
changed, 615 insertions(+), 615 deletions(-) create mode 100644 go/vt/mysqlctl/backup_blackbox_test.go delete mode 100644 go/vt/mysqlctl/builtinbackupengine2_test.go diff --git a/go/vt/mysqlctl/backup_blackbox_test.go b/go/vt/mysqlctl/backup_blackbox_test.go new file mode 100644 index 00000000000..906e250ecaf --- /dev/null +++ b/go/vt/mysqlctl/backup_blackbox_test.go @@ -0,0 +1,577 @@ +/* +Copyright 2022 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package mysqlctl_test is the blackbox tests for package mysqlctl. 
+package mysqlctl_test + +import ( + "context" + "fmt" + "os" + "path" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqltypes" + + "vitess.io/vitess/go/mysql" + "vitess.io/vitess/go/mysql/fakesqldb" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/mysqlctl/backupstats" + "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/proto/vttime" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/memorytopo" +) + +func setBuiltinBackupMysqldDeadline(t time.Duration) time.Duration { + old := mysqlctl.BuiltinBackupMysqldTimeout + mysqlctl.BuiltinBackupMysqldTimeout = t + + return old +} + +func createBackupDir(root string, dirs ...string) error { + for _, dir := range dirs { + if err := os.MkdirAll(path.Join(root, dir), 0755); err != nil { + return err + } + } + + return nil +} + +func createBackupFiles(root string, fileCount int, ext string) error { + for i := 0; i < fileCount; i++ { + f, err := os.Create(path.Join(root, fmt.Sprintf("%d.%s", i, ext))) + if err != nil { + return err + } + if _, err := f.Write([]byte("hello, world!")); err != nil { + return err + } + defer f.Close() + } + + return nil +} + +func TestExecuteBackup(t *testing.T) { + // Set up local backup directory + backupRoot := "testdata/builtinbackup_test" + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to actually backup files. 
+ require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + ctx := context.Background() + + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + + // Configure a tight deadline to force a timeout + oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) + defer setBuiltinBackupMysqldDeadline(oldDeadline) + + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. 
+ mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + // mysqld.ShutdownTime = time.Minute + + fakeStats := backupstats.NewFakeStats() + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Concurrency: 2, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + Stats: fakeStats, + }, bh) + + require.NoError(t, err) + assert.True(t, ok) + + var destinationCloseStats int + var destinationOpenStats int + var destinationWriteStats int + var sourceCloseStats int + var sourceOpenStats int + var sourceReadStats int + + for _, sr := range fakeStats.ScopeReturns { + sfs := sr.(*backupstats.FakeStats) + switch sfs.ScopeV[backupstats.ScopeOperation] { + case "Destination:Close": + destinationCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Open": + destinationOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Write": + destinationWriteStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + case "Source:Close": + sourceCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Open": + sourceOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Read": + sourceReadStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + } + } + + require.Equal(t, 4, destinationCloseStats) + require.Equal(t, 4, destinationOpenStats) + require.Equal(t, 4, destinationWriteStats) + require.Equal(t, 4, sourceCloseStats) + require.Equal(t, 4, sourceOpenStats) + require.Equal(t, 4, sourceReadStats) + + mysqld.ExpectedExecuteSuperQueryCurrent = 0 // resest the index of what queries we've run + mysqld.ShutdownTime = 
time.Minute // reminder that shutdownDeadline is 1s + + ok, err = be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + }, bh) + + assert.Error(t, err) + assert.False(t, ok) +} + +func TestExecuteBackupWithSafeUpgrade(t *testing.T) { + // Set up local backup directory + backupRoot := "testdata/builtinbackup_test" + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to actually backup files. + require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + ctx := context.Background() + + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, 
shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + + // Configure a tight deadline to force a timeout + oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) + defer setBuiltinBackupMysqldDeadline(oldDeadline) + + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. + // It also needs to be allowed to receive the query to disable the innodb_fast_shutdown flag. + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + mysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ + "SET GLOBAL innodb_fast_shutdown=0": {}, + } + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Concurrency: 2, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + Stats: backupstats.NewFakeStats(), + UpgradeSafe: true, + }, bh) + + require.NoError(t, err) + assert.True(t, ok) +} + +// TestExecuteBackupWithCanceledContext tests the ability of the backup function to gracefully handle cases where errors +// occur due to various reasons, such as context time cancel. The process should not panic in these situations. 
+func TestExecuteBackupWithCanceledContext(t *testing.T) { + // Set up local backup directory + id := fmt.Sprintf("%d", time.Now().UnixNano()) + backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to execute semaphore acquire inside + // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). + require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + // Cancel the context deliberately + ctx, cancel := context.WithCancel(context.Background()) + cancel() + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: 
int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. + mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 2, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + }, bh) + + require.Error(t, err) + // all four files will fail + require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled") + assert.False(t, ok) +} + +// TestExecuteRestoreWithCanceledContext tests the ability of the restore function to gracefully handle cases where errors +// occur due to various reasons, such as context timed-out. The process should not panic in these situations. +func TestExecuteRestoreWithTimedOutContext(t *testing.T) { + // Set up local backup directory + id := fmt.Sprintf("%d", time.Now().UnixNano()) + backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) + filebackupstorage.FileBackupStorageRoot = backupRoot + require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) + dataDir := path.Join(backupRoot, "datadir") + // Add some files under data directory to force backup to execute semaphore acquire inside + // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). 
+ require.NoError(t, createBackupDir(dataDir, "test1")) + require.NoError(t, createBackupDir(dataDir, "test2")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) + require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) + defer os.RemoveAll(backupRoot) + + ctx := context.Background() + needIt, err := needInnoDBRedoLogSubdir() + require.NoError(t, err) + if needIt { + fpath := path.Join("log", mysql.DynamicRedoLogSubdir) + if err := createBackupDir(backupRoot, fpath); err != nil { + require.Failf(t, err.Error(), "failed to create directory: %s", fpath) + } + } + + // Set up topo + keyspace, shard := "mykeyspace", "-80" + ts := memorytopo.NewServer("cell1") + defer ts.Close() + + require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) + require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) + + tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") + tablet.Keyspace = keyspace + tablet.Shard = shard + + require.NoError(t, ts.CreateTablet(ctx, tablet)) + + _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} + + now := time.Now() + si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} + + return nil + }) + require.NoError(t, err) + + be := &mysqlctl.BuiltinBackupEngine{} + bh := filebackupstorage.NewBackupHandle(nil, "", "", false) + // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: + // "STOP SLAVE", "START SLAVE", in that order. 
+ mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + + ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + }, + Stats: backupstats.NewFakeStats(), + Concurrency: 2, + HookExtraEnv: map[string]string{}, + TopoServer: ts, + Keyspace: keyspace, + Shard: shard, + }, bh) + + require.NoError(t, err) + assert.True(t, ok) + + // Now try to restore the above backup. + bh = filebackupstorage.NewBackupHandle(nil, "", "", true) + mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + + fakeStats := backupstats.NewFakeStats() + + restoreParams := mysqlctl.RestoreParams{ + Cnf: &mysqlctl.Mycnf{ + InnodbDataHomeDir: path.Join(backupRoot, "innodb"), + InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), + DataDir: path.Join(backupRoot, "datadir"), + BinLogPath: path.Join(backupRoot, "binlog"), + RelayLogPath: path.Join(backupRoot, "relaylog"), + RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), + RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + }, + Logger: logutil.NewConsoleLogger(), + Mysqld: mysqld, + Concurrency: 2, + HookExtraEnv: map[string]string{}, + DeleteBeforeRestore: false, + DbName: "test", + Keyspace: "test", + Shard: "-", + StartTime: time.Now(), + RestoreToPos: mysql.Position{}, + RestoreToTimestamp: time.Time{}, + DryRun: false, + Stats: fakeStats, + } + + // Successful restore. 
+ bm, err := be.ExecuteRestore(ctx, restoreParams, bh) + assert.NoError(t, err) + assert.NotNil(t, bm) + + var destinationCloseStats int + var destinationOpenStats int + var destinationWriteStats int + var sourceCloseStats int + var sourceOpenStats int + var sourceReadStats int + + for _, sr := range fakeStats.ScopeReturns { + sfs := sr.(*backupstats.FakeStats) + switch sfs.ScopeV[backupstats.ScopeOperation] { + case "Destination:Close": + destinationCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Open": + destinationOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Destination:Write": + destinationWriteStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + case "Source:Close": + sourceCloseStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Open": + sourceOpenStats++ + require.Len(t, sfs.TimedIncrementCalls, 1) + case "Source:Read": + sourceReadStats++ + require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) + } + } + + require.Equal(t, 4, destinationCloseStats) + require.Equal(t, 4, destinationOpenStats) + require.Equal(t, 4, destinationWriteStats) + require.Equal(t, 4, sourceCloseStats) + require.Equal(t, 4, sourceOpenStats) + require.Equal(t, 4, sourceReadStats) + + // Restore using timed-out context + mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) + mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} + restoreParams.Mysqld = mysqld + timedOutCtx, cancel := context.WithTimeout(ctx, 1*time.Second) + defer cancel() + // Let the context time out. + time.Sleep(1 * time.Second) + bm, err = be.ExecuteRestore(timedOutCtx, restoreParams, bh) + // ExecuteRestore should fail. 
+ assert.Error(t, err) + assert.Nil(t, bm) + // error message can contain any combination of "context deadline exceeded" or "context canceled" + if !strings.Contains(err.Error(), "context canceled") && !strings.Contains(err.Error(), "context deadline exceeded") { + assert.Fail(t, "Test should fail with either `context canceled` or `context deadline exceeded`") + } +} + +// needInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory. +// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the +// (/. by default) called "#innodb_redo". See: +// +// https://dev.mysql.com/doc/refman/8.0/en/innodb-redo-log.html#innodb-modifying-redo-log-capacity +func needInnoDBRedoLogSubdir() (needIt bool, err error) { + mysqldVersionStr, err := mysqlctl.GetVersionString() + if err != nil { + return needIt, err + } + _, sv, err := mysqlctl.ParseVersionString(mysqldVersionStr) + if err != nil { + return needIt, err + } + versionStr := fmt.Sprintf("%d.%d.%d", sv.Major, sv.Minor, sv.Patch) + _, capableOf, _ := mysql.GetFlavor(versionStr, nil) + if capableOf == nil { + return needIt, fmt.Errorf("cannot determine database flavor details for version %s", versionStr) + } + return capableOf(mysql.DynamicRedoLogCapacityFlavorCapability) +} diff --git a/go/vt/mysqlctl/builtinbackupengine2_test.go b/go/vt/mysqlctl/builtinbackupengine2_test.go deleted file mode 100644 index 29252392c7f..00000000000 --- a/go/vt/mysqlctl/builtinbackupengine2_test.go +++ /dev/null @@ -1,69 +0,0 @@ -/* -Copyright 2023 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package mysqlctl_test is the blackbox tests for package mysqlctl. -package mysqlctl - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestGetIncrementalFromPosGTIDSet(t *testing.T) { - tcases := []struct { - incrementalFromPos string - gtidSet string - expctError bool - }{ - { - "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - false, - }, - { - "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", - false, - }, - { - "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3", - "", - true, - }, - { - "MySQL56/invalid", - "", - true, - }, - { - "16b1039f-22b6-11ed-b765-0a43f95f28a3", - "", - true, - }, - } - for _, tcase := range tcases { - t.Run(tcase.incrementalFromPos, func(t *testing.T) { - gtidSet, err := getIncrementalFromPosGTIDSet(tcase.incrementalFromPos) - if tcase.expctError { - assert.Error(t, err) - } else { - assert.NoError(t, err) - assert.Equal(t, tcase.gtidSet, gtidSet.String()) - } - }) - } -} diff --git a/go/vt/mysqlctl/builtinbackupengine_test.go b/go/vt/mysqlctl/builtinbackupengine_test.go index 906e250ecaf..29252392c7f 100644 --- a/go/vt/mysqlctl/builtinbackupengine_test.go +++ b/go/vt/mysqlctl/builtinbackupengine_test.go @@ -1,5 +1,5 @@ /* -Copyright 2022 The Vitess Authors. +Copyright 2023 The Vitess Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -15,563 +15,55 @@ limitations under the License. */ // Package mysqlctl_test is the blackbox tests for package mysqlctl. 
-package mysqlctl_test +package mysqlctl import ( - "context" - "fmt" - "os" - "path" - "strings" "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "vitess.io/vitess/go/sqltypes" - - "vitess.io/vitess/go/mysql" - "vitess.io/vitess/go/mysql/fakesqldb" - "vitess.io/vitess/go/vt/logutil" - "vitess.io/vitess/go/vt/mysqlctl" - "vitess.io/vitess/go/vt/mysqlctl/backupstats" - "vitess.io/vitess/go/vt/mysqlctl/filebackupstorage" - "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/proto/vttime" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/memorytopo" ) -func setBuiltinBackupMysqldDeadline(t time.Duration) time.Duration { - old := mysqlctl.BuiltinBackupMysqldTimeout - mysqlctl.BuiltinBackupMysqldTimeout = t - - return old -} - -func createBackupDir(root string, dirs ...string) error { - for _, dir := range dirs { - if err := os.MkdirAll(path.Join(root, dir), 0755); err != nil { - return err - } - } - - return nil -} - -func createBackupFiles(root string, fileCount int, ext string) error { - for i := 0; i < fileCount; i++ { - f, err := os.Create(path.Join(root, fmt.Sprintf("%d.%s", i, ext))) - if err != nil { - return err - } - if _, err := f.Write([]byte("hello, world!")); err != nil { - return err - } - defer f.Close() - } - - return nil -} - -func TestExecuteBackup(t *testing.T) { - // Set up local backup directory - backupRoot := "testdata/builtinbackup_test" - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to actually backup files. 
- require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - ctx := context.Background() - - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - - // Configure a tight deadline to force a timeout - oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) - defer setBuiltinBackupMysqldDeadline(oldDeadline) - - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. 
- mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - // mysqld.ShutdownTime = time.Minute - - fakeStats := backupstats.NewFakeStats() - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), +func TestGetIncrementalFromPosGTIDSet(t *testing.T) { + tcases := []struct { + incrementalFromPos string + gtidSet string + expctError bool + }{ + { + "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + false, }, - Concurrency: 2, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - Stats: fakeStats, - }, bh) - - require.NoError(t, err) - assert.True(t, ok) - - var destinationCloseStats int - var destinationOpenStats int - var destinationWriteStats int - var sourceCloseStats int - var sourceOpenStats int - var sourceReadStats int - - for _, sr := range fakeStats.ScopeReturns { - sfs := sr.(*backupstats.FakeStats) - switch sfs.ScopeV[backupstats.ScopeOperation] { - case "Destination:Close": - destinationCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Open": - destinationOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Write": - destinationWriteStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - case "Source:Close": - sourceCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Open": - sourceOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Read": - sourceReadStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - } - } - - require.Equal(t, 4, destinationCloseStats) - require.Equal(t, 4, destinationOpenStats) - require.Equal(t, 
4, destinationWriteStats) - require.Equal(t, 4, sourceCloseStats) - require.Equal(t, 4, sourceOpenStats) - require.Equal(t, 4, sourceReadStats) - - mysqld.ExpectedExecuteSuperQueryCurrent = 0 // resest the index of what queries we've run - mysqld.ShutdownTime = time.Minute // reminder that shutdownDeadline is 1s - - ok, err = be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), + { + "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + "16b1039f-22b6-11ed-b765-0a43f95f28a3:1-615", + false, }, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - }, bh) - - assert.Error(t, err) - assert.False(t, ok) -} - -func TestExecuteBackupWithSafeUpgrade(t *testing.T) { - // Set up local backup directory - backupRoot := "testdata/builtinbackup_test" - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to actually backup files. 
- require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - ctx := context.Background() - - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - - // Configure a tight deadline to force a timeout - oldDeadline := setBuiltinBackupMysqldDeadline(time.Second) - defer setBuiltinBackupMysqldDeadline(oldDeadline) - - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. - // It also needs to be allowed to receive the query to disable the innodb_fast_shutdown flag. 
- mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - mysqld.FetchSuperQueryMap = map[string]*sqltypes.Result{ - "SET GLOBAL innodb_fast_shutdown=0": {}, - } - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), + { + "MySQL56/16b1039f-22b6-11ed-b765-0a43f95f28a3", + "", + true, }, - Concurrency: 2, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - Stats: backupstats.NewFakeStats(), - UpgradeSafe: true, - }, bh) - - require.NoError(t, err) - assert.True(t, ok) -} - -// TestExecuteBackupWithCanceledContext tests the ability of the backup function to gracefully handle cases where errors -// occur due to various reasons, such as context time cancel. The process should not panic in these situations. -func TestExecuteBackupWithCanceledContext(t *testing.T) { - // Set up local backup directory - id := fmt.Sprintf("%d", time.Now().UnixNano()) - backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to execute semaphore acquire inside - // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). 
- require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - // Cancel the context deliberately - ctx, cancel := context.WithCancel(context.Background()) - cancel() - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. 
- mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), + { + "MySQL56/invalid", + "", + true, }, - Stats: backupstats.NewFakeStats(), - Concurrency: 2, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - }, bh) - - require.Error(t, err) - // all four files will fail - require.ErrorContains(t, err, "context canceled;context canceled;context canceled;context canceled") - assert.False(t, ok) -} - -// TestExecuteRestoreWithCanceledContext tests the ability of the restore function to gracefully handle cases where errors -// occur due to various reasons, such as context timed-out. The process should not panic in these situations. -func TestExecuteRestoreWithTimedOutContext(t *testing.T) { - // Set up local backup directory - id := fmt.Sprintf("%d", time.Now().UnixNano()) - backupRoot := fmt.Sprintf("testdata/builtinbackup_test_%s", id) - filebackupstorage.FileBackupStorageRoot = backupRoot - require.NoError(t, createBackupDir(backupRoot, "innodb", "log", "datadir")) - dataDir := path.Join(backupRoot, "datadir") - // Add some files under data directory to force backup to execute semaphore acquire inside - // backupFiles() method (https://github.com/vitessio/vitess/blob/main/go/vt/mysqlctl/builtinbackupengine.go#L483). 
- require.NoError(t, createBackupDir(dataDir, "test1")) - require.NoError(t, createBackupDir(dataDir, "test2")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test1"), 2, "ibd")) - require.NoError(t, createBackupFiles(path.Join(dataDir, "test2"), 2, "ibd")) - defer os.RemoveAll(backupRoot) - - ctx := context.Background() - needIt, err := needInnoDBRedoLogSubdir() - require.NoError(t, err) - if needIt { - fpath := path.Join("log", mysql.DynamicRedoLogSubdir) - if err := createBackupDir(backupRoot, fpath); err != nil { - require.Failf(t, err.Error(), "failed to create directory: %s", fpath) - } - } - - // Set up topo - keyspace, shard := "mykeyspace", "-80" - ts := memorytopo.NewServer("cell1") - defer ts.Close() - - require.NoError(t, ts.CreateKeyspace(ctx, keyspace, &topodata.Keyspace{})) - require.NoError(t, ts.CreateShard(ctx, keyspace, shard)) - - tablet := topo.NewTablet(100, "cell1", "mykeyspace-00-80-0100") - tablet.Keyspace = keyspace - tablet.Shard = shard - - require.NoError(t, ts.CreateTablet(ctx, tablet)) - - _, err = ts.UpdateShardFields(ctx, keyspace, shard, func(si *topo.ShardInfo) error { - si.PrimaryAlias = &topodata.TabletAlias{Uid: 100, Cell: "cell1"} - - now := time.Now() - si.PrimaryTermStartTime = &vttime.Time{Seconds: int64(now.Second()), Nanoseconds: int32(now.Nanosecond())} - - return nil - }) - require.NoError(t, err) - - be := &mysqlctl.BuiltinBackupEngine{} - bh := filebackupstorage.NewBackupHandle(nil, "", "", false) - // Spin up a fake daemon to be used in backups. It needs to be allowed to receive: - // "STOP SLAVE", "START SLAVE", in that order. 
- mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - - ok, err := be.ExecuteBackup(ctx, mysqlctl.BackupParams{ - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), - }, - Stats: backupstats.NewFakeStats(), - Concurrency: 2, - HookExtraEnv: map[string]string{}, - TopoServer: ts, - Keyspace: keyspace, - Shard: shard, - }, bh) - - require.NoError(t, err) - assert.True(t, ok) - - // Now try to restore the above backup. - bh = filebackupstorage.NewBackupHandle(nil, "", "", true) - mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - - fakeStats := backupstats.NewFakeStats() - - restoreParams := mysqlctl.RestoreParams{ - Cnf: &mysqlctl.Mycnf{ - InnodbDataHomeDir: path.Join(backupRoot, "innodb"), - InnodbLogGroupHomeDir: path.Join(backupRoot, "log"), - DataDir: path.Join(backupRoot, "datadir"), - BinLogPath: path.Join(backupRoot, "binlog"), - RelayLogPath: path.Join(backupRoot, "relaylog"), - RelayLogIndexPath: path.Join(backupRoot, "relaylogindex"), - RelayLogInfoPath: path.Join(backupRoot, "relayloginfo"), + { + "16b1039f-22b6-11ed-b765-0a43f95f28a3", + "", + true, }, - Logger: logutil.NewConsoleLogger(), - Mysqld: mysqld, - Concurrency: 2, - HookExtraEnv: map[string]string{}, - DeleteBeforeRestore: false, - DbName: "test", - Keyspace: "test", - Shard: "-", - StartTime: time.Now(), - RestoreToPos: mysql.Position{}, - RestoreToTimestamp: time.Time{}, - DryRun: false, - Stats: fakeStats, - } - - // Successful restore. 
- bm, err := be.ExecuteRestore(ctx, restoreParams, bh) - assert.NoError(t, err) - assert.NotNil(t, bm) - - var destinationCloseStats int - var destinationOpenStats int - var destinationWriteStats int - var sourceCloseStats int - var sourceOpenStats int - var sourceReadStats int - - for _, sr := range fakeStats.ScopeReturns { - sfs := sr.(*backupstats.FakeStats) - switch sfs.ScopeV[backupstats.ScopeOperation] { - case "Destination:Close": - destinationCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Open": - destinationOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Destination:Write": - destinationWriteStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - case "Source:Close": - sourceCloseStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Open": - sourceOpenStats++ - require.Len(t, sfs.TimedIncrementCalls, 1) - case "Source:Read": - sourceReadStats++ - require.GreaterOrEqual(t, len(sfs.TimedIncrementBytesCalls), 1) - } - } - - require.Equal(t, 4, destinationCloseStats) - require.Equal(t, 4, destinationOpenStats) - require.Equal(t, 4, destinationWriteStats) - require.Equal(t, 4, sourceCloseStats) - require.Equal(t, 4, sourceOpenStats) - require.Equal(t, 4, sourceReadStats) - - // Restore using timed-out context - mysqld = mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t)) - mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"} - restoreParams.Mysqld = mysqld - timedOutCtx, cancel := context.WithTimeout(ctx, 1*time.Second) - defer cancel() - // Let the context time out. - time.Sleep(1 * time.Second) - bm, err = be.ExecuteRestore(timedOutCtx, restoreParams, bh) - // ExecuteRestore should fail. 
- assert.Error(t, err) - assert.Nil(t, bm) - // error message can contain any combination of "context deadline exceeded" or "context canceled" - if !strings.Contains(err.Error(), "context canceled") && !strings.Contains(err.Error(), "context deadline exceeded") { - assert.Fail(t, "Test should fail with either `context canceled` or `context deadline exceeded`") - } -} - -// needInnoDBRedoLogSubdir indicates whether we need to create a redo log subdirectory. -// Starting with MySQL 8.0.30, the InnoDB redo logs are stored in a subdirectory of the -// (/. by default) called "#innodb_redo". See: -// -// https://dev.mysql.com/doc/refman/8.0/en/innodb-redo-log.html#innodb-modifying-redo-log-capacity -func needInnoDBRedoLogSubdir() (needIt bool, err error) { - mysqldVersionStr, err := mysqlctl.GetVersionString() - if err != nil { - return needIt, err - } - _, sv, err := mysqlctl.ParseVersionString(mysqldVersionStr) - if err != nil { - return needIt, err } - versionStr := fmt.Sprintf("%d.%d.%d", sv.Major, sv.Minor, sv.Patch) - _, capableOf, _ := mysql.GetFlavor(versionStr, nil) - if capableOf == nil { - return needIt, fmt.Errorf("cannot determine database flavor details for version %s", versionStr) + for _, tcase := range tcases { + t.Run(tcase.incrementalFromPos, func(t *testing.T) { + gtidSet, err := getIncrementalFromPosGTIDSet(tcase.incrementalFromPos) + if tcase.expctError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tcase.gtidSet, gtidSet.String()) + } + }) } - return capableOf(mysql.DynamicRedoLogCapacityFlavorCapability) } From 1ed3788ae8ed7eeaed5ce8245d7e7e41ad0a2663 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 2 Aug 2023 08:27:16 +0300 Subject: [PATCH 14/16] use TimeoutCloser for xtrabackupengine compressors/decompressors Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/xtrabackupengine.go | 9 +++++---- 1 file changed, 5 
insertions(+), 4 deletions(-) diff --git a/go/vt/mysqlctl/xtrabackupengine.go b/go/vt/mysqlctl/xtrabackupengine.go index 7423d35a6eb..0a1f32c3ecd 100644 --- a/go/vt/mysqlctl/xtrabackupengine.go +++ b/go/vt/mysqlctl/xtrabackupengine.go @@ -32,6 +32,7 @@ import ( "github.com/spf13/pflag" + "vitess.io/vitess/go/ioutil" "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/logutil" "vitess.io/vitess/go/vt/mysqlctl/backupstorage" @@ -360,7 +361,7 @@ func (be *XtrabackupEngine) backupFiles( destWriters := []io.Writer{} destBuffers := []*bufio.Writer{} - destCompressors := []io.WriteCloser{} + destCompressors := []io.Closer{} for _, file := range destFiles { buffer := bufio.NewWriterSize(file, writerBufferSize) destBuffers = append(destBuffers, buffer) @@ -380,7 +381,7 @@ func (be *XtrabackupEngine) backupFiles( } writer = compressor - destCompressors = append(destCompressors, compressor) + destCompressors = append(destCompressors, ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout)) } destWriters = append(destWriters, writer) @@ -628,7 +629,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log }() srcReaders := []io.Reader{} - srcDecompressors := []io.ReadCloser{} + srcDecompressors := []io.Closer{} for _, file := range srcFiles { reader := io.Reader(file) @@ -661,7 +662,7 @@ func (be *XtrabackupEngine) extractFiles(ctx context.Context, logger logutil.Log if err != nil { return vterrors.Wrap(err, "can't create decompressor") } - srcDecompressors = append(srcDecompressors, decompressor) + srcDecompressors = append(srcDecompressors, ioutil.NewTimeoutCloser(ctx, decompressor, closeTimeout)) reader = decompressor } From 88717293f9d0170b7ae3557c36cdb32cfccba977 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 2 Aug 2023 08:32:22 +0300 Subject: [PATCH 15/16] reduced closeTimeout to 1 minute Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- 
go/vt/mysqlctl/backup.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index ab5a42c2323..1f619d57344 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -59,7 +59,9 @@ const ( BackupTimestampFormat = "2006-01-02.150405" // closeTimeout is the timeout for closing backup files after writing. - closeTimeout = 10 * time.Minute + // The value is a bit arbitrary. How long does it make sense to wait for a Close()? With a cloud-based implementation, + // network might be an issue. _Seconds_ are probably too short. The whereabouts of a minute is a reasonable value. + closeTimeout = 1 * time.Minute ) const ( From 6009acf0c5e3f1c9f10c67e4051646eb0aa5a604 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Thu, 3 Aug 2023 10:40:28 +0300 Subject: [PATCH 16/16] fix ordering of flow Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/mysqlctl/builtinbackupengine.go | 82 +++++++++++++++------------ 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go index e4b9121e23a..77c8ad386e3 100644 --- a/go/vt/mysqlctl/builtinbackupengine.go +++ b/go/vt/mysqlctl/builtinbackupengine.go @@ -810,51 +810,61 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara bw := newBackupWriter(fe.Name, builtinBackupStorageWriteBufferSize, fi.Size(), timedDest) - var reader io.Reader = br - var writer io.Writer = bw + // We create the following inner function because: + // - we must `defer` the compressor's Close() function + // - but it must take place before we close the pipe reader&writer + createAndCopy := func() (createAndCopyErr error) { + var reader io.Reader = br + var writer io.Writer = bw + + // Create the gzip compression pipe, if necessary. 
+ if backupStorageCompress { + var compressor io.WriteCloser + if ExternalCompressorCmd != "" { + compressor, err = newExternalCompressor(ctx, ExternalCompressorCmd, writer, params.Logger) + } else { + compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) + } + if err != nil { + return vterrors.Wrap(err, "can't create compressor") + } - // Create the gzip compression pipe, if necessary. - if backupStorageCompress { - var compressor io.WriteCloser - if ExternalCompressorCmd != "" { - compressor, err = newExternalCompressor(ctx, ExternalCompressorCmd, writer, params.Logger) - } else { - compressor, err = newBuiltinCompressor(CompressionEngineName, writer, params.Logger) - } - if err != nil { - return vterrors.Wrap(err, "can't create compressor") - } - closer := ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout) + compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) + writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) - compressStats := params.Stats.Scope(stats.Operation("Compressor:Write")) - writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes) + closer := ioutil.NewTimeoutCloser(ctx, compressor, closeTimeout) + defer func() { + // Close gzip to flush it, after that all data is sent to writer. + closeCompressorAt := time.Now() + params.Logger.Infof("closing compressor") + if cerr := closer.Close(); cerr != nil { + cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) + params.Logger.Error(cerr) + createAndCopyErr = errors.Join(createAndCopyErr, cerr) + } + params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) + }() + } - defer func() { - // Close gzip to flush it, after that all data is sent to writer. 
- closeCompressorAt := time.Now() - params.Logger.Infof("closing compressor") - if cerr := closer.Close(); err != nil { - cerr = vterrors.Wrapf(cerr, "failed to close compressor %v", name) - params.Logger.Error(cerr) - finalErr = errors.Join(finalErr, cerr) - } - params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt)) - }() - } + if builtinBackupFileReadBufferSize > 0 { + reader = bufio.NewReaderSize(br, int(builtinBackupFileReadBufferSize)) + } - if builtinBackupFileReadBufferSize > 0 { - reader = bufio.NewReaderSize(br, int(builtinBackupFileReadBufferSize)) + // Copy from the source file to writer (optional gzip, + // optional pipe, tee, output file and hasher). + _, err = io.Copy(writer, reader) + if err != nil { + return vterrors.Wrap(err, "cannot copy data") + } + return nil } - // Copy from the source file to writer (optional gzip, - // optional pipe, tee, output file and hasher). - _, err = io.Copy(writer, reader) - if err != nil { - return vterrors.Wrap(err, "cannot copy data") + if err := createAndCopy(); err != nil { + return err } // Close the backupPipe to finish writing on destination. - if err := bw.Close(); err != nil { + if err = bw.Close(); err != nil { return vterrors.Wrapf(err, "cannot flush destination: %v", name) }