diff --git a/doc/releasenotes/17_0_0_summary.md b/doc/releasenotes/17_0_0_summary.md
index 6a0554ba9f8..d96550fa3d8 100644
--- a/doc/releasenotes/17_0_0_summary.md
+++ b/doc/releasenotes/17_0_0_summary.md
@@ -3,11 +3,26 @@
### Table of Contents
- **[Major Changes](#major-changes)**
+ - **[Breaking Changes](#breaking-changes)**
+ - [Deprecated Stats](#deprecated-stats)
- **[New command line flags and behavior](#new-flag)**
- [Builtin backup: read buffering flags](#builtin-backup-read-buffering-flags)
+ - **[New stats](#new-stats)**
+ - [Detailed backup and restore stats](#detailed-backup-and-restore-stats)
## Major Changes
+### Breaking Changes
+
+#### Deprecated Stats
+
+These stats are deprecated in v17.
+
+| Deprecated stat | Supported alternatives |
+|-|-|
+| `backup_duration_seconds` | `BackupDurationNanoseconds` |
+| `restore_duration_seconds` | `RestoreDurationNanoseconds` |
+
### New command line flags and behavior
#### Backup --builtinbackup-file-read-buffer-size and --builtinbackup-file-write-buffer-size
@@ -25,3 +40,123 @@ These flags are applicable to the following programs:
- `vtctld`
- `vttablet`
- `vttestserver`
+
+### New stats
+
+#### Detailed backup and restore stats
+
+##### Backup metrics
+
+Metrics related to backup operations are available in both Vtbackup and VTTablet.
+
+**BackupBytes, BackupCount, BackupDurationNanoseconds**
+
+Depending on the Backup Engine and Backup Storage in-use, a backup may be a complex pipeline of operations, including but not limited to:
+
+ * Reading files from disk.
+ * Compressing files.
+ * Uploading compressed files to cloud object storage.
+
+These operations are counted and timed, and the number of bytes consumed or produced by each stage of the pipeline are counted as well.
+
+##### Restore metrics
+
+Metrics related to restore operations are available in both Vtbackup and VTTablet.
+
+**RestoreBytes, RestoreCount, RestoreDurationNanoseconds**
+
+Depending on the Backup Engine and Backup Storage in-use, a restore may be a complex pipeline of operations, including but not limited to:
+
+ * Downloading compressed files from cloud object storage.
+ * Decompressing files.
+ * Writing decompressed files to disk.
+
+These operations are counted and timed, and the number of bytes consumed or produced by each stage of the pipeline are counted as well.
+
+##### Vtbackup metrics
+
+Vtbackup exports some metrics which are not available elsewhere.
+
+**DurationByPhaseSeconds**
+
+Vtbackup fetches the last backup, restores it to an empty mysql installation, replicates recent changes into that installation, and then takes a backup of that installation.
+
+_DurationByPhaseSeconds_ exports timings for these individual phases.
+
+##### Example
+
+**A snippet of vtbackup metrics after running it against the local example after creating the initial cluster**
+
+(Processed with `jq` for readability.)
+
+```
+{
+ "BackupBytes": {
+ "BackupEngine.Builtin.Source:Read": 4777,
+ "BackupEngine.Builtin.Compressor:Write": 4616,
+ "BackupEngine.Builtin.Destination:Write": 162,
+ "BackupStorage.File.File:Write": 163
+ },
+ "BackupCount": {
+ "-.-.Backup": 1,
+ "BackupEngine.Builtin.Source:Open": 161,
+ "BackupEngine.Builtin.Source:Close": 322,
+ "BackupEngine.Builtin.Compressor:Close": 161,
+ "BackupEngine.Builtin.Destination:Open": 161,
+ "BackupEngine.Builtin.Destination:Close": 322
+ },
+ "BackupDurationNanoseconds": {
+ "-.-.Backup": 4188508542,
+ "BackupEngine.Builtin.Source:Open": 10649832,
+ "BackupEngine.Builtin.Source:Read": 55901067,
+ "BackupEngine.Builtin.Source:Close": 960826,
+ "BackupEngine.Builtin.Compressor:Write": 278358826,
+ "BackupEngine.Builtin.Compressor:Close": 79358372,
+ "BackupEngine.Builtin.Destination:Open": 16456627,
+ "BackupEngine.Builtin.Destination:Write": 11021043,
+ "BackupEngine.Builtin.Destination:Close": 17144630,
+ "BackupStorage.File.File:Write": 10743169
+ },
+ "DurationByPhaseSeconds": {
+ "InitMySQLd": 2,
+ "RestoreLastBackup": 6,
+ "CatchUpReplication": 1,
+ "TakeNewBackup": 4
+ },
+ "RestoreBytes": {
+ "BackupEngine.Builtin.Source:Read": 1095,
+ "BackupEngine.Builtin.Decompressor:Read": 950,
+ "BackupEngine.Builtin.Destination:Write": 209,
+ "BackupStorage.File.File:Read": 1113
+ },
+ "RestoreCount": {
+ "-.-.Restore": 1,
+ "BackupEngine.Builtin.Source:Open": 161,
+ "BackupEngine.Builtin.Source:Close": 322,
+ "BackupEngine.Builtin.Decompressor:Close": 161,
+ "BackupEngine.Builtin.Destination:Open": 161,
+ "BackupEngine.Builtin.Destination:Close": 322
+ },
+ "RestoreDurationNanoseconds": {
+ "-.-.Restore": 6204765541,
+ "BackupEngine.Builtin.Source:Open": 10542539,
+ "BackupEngine.Builtin.Source:Read": 104658370,
+ "BackupEngine.Builtin.Source:Close": 773038,
+ "BackupEngine.Builtin.Decompressor:Read": 165692120,
+ "BackupEngine.Builtin.Decompressor:Close": 51040,
+ "BackupEngine.Builtin.Destination:Open": 22715122,
+ "BackupEngine.Builtin.Destination:Write": 41679581,
+ "BackupEngine.Builtin.Destination:Close": 26954624,
+ "BackupStorage.File.File:Read": 102416075
+ },
+ "backup_duration_seconds": 4,
+ "restore_duration_seconds": 6
+}
+```
+
+Some notes to help understand these metrics:
+
+ * `BackupBytes["BackupStorage.File.File:Write"]` measures how many bytes were written to disk by the `file` Backup Storage implementation during the backup phase.
+ * `DurationByPhaseSeconds["CatchUpReplication"]` measures how long it took to catch-up replication after the restore phase.
+ * `DurationByPhaseSeconds["RestoreLastBackup"]` measures the duration of the restore phase.
+ * `RestoreDurationNanoseconds["-.-.Restore"]` also measures the duration of the restore phase.
diff --git a/go/cmd/vtbackup/vtbackup.go b/go/cmd/vtbackup/vtbackup.go
index 4e8ed55ff53..5ea15f728fd 100644
--- a/go/cmd/vtbackup/vtbackup.go
+++ b/go/cmd/vtbackup/vtbackup.go
@@ -74,10 +74,12 @@ import (
"vitess.io/vitess/go/cmd"
"vitess.io/vitess/go/exit"
"vitess.io/vitess/go/mysql"
+ "vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/servenv"
@@ -119,6 +121,11 @@ var (
detachedMode bool
keepAliveTimeout = 0 * time.Second
disableRedoLog = false
+ durationByPhase = stats.NewGaugesWithSingleLabel(
+ "DurationByPhaseSeconds",
+ "How long it took vtbackup to perform each phase (in seconds).",
+ "phase",
+ )
)
func registerFlags(fs *pflag.FlagSet) {
@@ -258,9 +265,11 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}
initCtx, initCancel := context.WithTimeout(ctx, mysqlTimeout)
defer initCancel()
+ initMysqldAt := time.Now()
if err := mysqld.Init(initCtx, mycnf, initDBSQLFile); err != nil {
return fmt.Errorf("failed to initialize mysql data dir and start mysqld: %v", err)
}
+ durationByPhase.Set("InitMySQLd", int64(time.Since(initMysqldAt).Seconds()))
// Shut down mysqld when we're done.
defer func() {
// Be careful not to use the original context, because we don't want to
@@ -291,6 +300,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
Keyspace: initKeyspace,
Shard: initShard,
TabletAlias: topoproto.TabletAliasString(tabletAlias),
+ Stats: backupstats.BackupStats(),
}
// In initial_backup mode, just take a backup of this empty database.
if initialBackup {
@@ -312,12 +322,14 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if err := mysqlctl.Backup(ctx, backupParams); err != nil {
return fmt.Errorf("backup failed: %v", err)
}
+ durationByPhase.Set("InitialBackup", int64(time.Since(backupParams.BackupTime).Seconds()))
log.Info("Initial backup successful.")
return nil
}
backupDir := mysqlctl.GetBackupDir(initKeyspace, initShard)
log.Infof("Restoring latest backup from directory %v", backupDir)
+ restoreAt := time.Now()
params := mysqlctl.RestoreParams{
Cnf: mycnf,
Mysqld: mysqld,
@@ -328,6 +340,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
DbName: dbName,
Keyspace: initKeyspace,
Shard: initShard,
+ Stats: backupstats.RestoreStats(),
}
backupManifest, err := mysqlctl.Restore(ctx, params)
var restorePos mysql.Position
@@ -345,6 +358,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
default:
return fmt.Errorf("can't restore from backup: %v", err)
}
+ durationByPhase.Set("RestoreLastBackup", int64(time.Since(restoreAt).Seconds()))
// Disable redo logging (if we can) before we start replication.
disabledRedoLog := false
@@ -413,6 +427,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
// We're caught up on replication to at least the point the primary
// was at when this vtbackup run started.
log.Infof("Replication caught up to %v after %v", status.Position, time.Since(waitStartTime))
+ durationByPhase.Set("CatchUpReplication", int64(time.Since(waitStartTime).Seconds()))
break
}
if !status.Healthy() {
@@ -446,6 +461,7 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
}
if restartBeforeBackup {
+ restartAt := time.Now()
log.Info("Proceeding with clean MySQL shutdown and startup to flush all buffers.")
// Prep for full/clean shutdown (not typically the default)
if err := mysqld.ExecuteSuperQuery(ctx, "SET GLOBAL innodb_fast_shutdown=0"); err != nil {
@@ -459,12 +475,15 @@ func takeBackup(ctx context.Context, topoServer *topo.Server, backupStorage back
if err := mysqld.Start(ctx, mycnf); err != nil {
return fmt.Errorf("Could not start MySQL after full shutdown: %v", err)
}
+ durationByPhase.Set("RestartBeforeBackup", int64(time.Since(restartAt).Seconds()))
}
// Now we can take a new backup.
+ backupAt := time.Now()
if err := mysqlctl.Backup(ctx, backupParams); err != nil {
return fmt.Errorf("error taking backup: %v", err)
}
+ durationByPhase.Set("TakeNewBackup", int64(time.Since(backupAt).Seconds()))
// Return a non-zero exit code if we didn't meet the replication position
// goal, even though we took a backup that pushes the high-water mark up.
diff --git a/go/ioutil/doc.go b/go/ioutil/doc.go
new file mode 100644
index 00000000000..85f8e7f0a9a
--- /dev/null
+++ b/go/ioutil/doc.go
@@ -0,0 +1,18 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package ioutil provides wrappers around golang IO interfaces.
+package ioutil
diff --git a/go/ioutil/meter.go b/go/ioutil/meter.go
new file mode 100644
index 00000000000..1673d829d53
--- /dev/null
+++ b/go/ioutil/meter.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/*
+The meter struct contains time-and-byte-tracking functionality used by
+MeteredReader, MeteredReadCloser, MeteredWriter, MeteredWriteCloser.
+*/
+
+package ioutil
+
+import "time"
+
+// meter contains time-and-byte-tracking functionality.
+type meter struct {
+ fs []func(b int, d time.Duration)
+ bytes int64
+ duration time.Duration
+}
+
+// Bytes reports the total bytes processed in calls to measure().
+func (mtr *meter) Bytes() int64 {
+ return mtr.bytes
+}
+
+// Duration reports the total time spent in calls to measure().
+func (mtr *meter) Duration() time.Duration {
+ return mtr.duration
+}
+
+// measure tracks the time spent and bytes processed by f. Time is accumulated into total,
+// and reported to callback fns.
+func (mtr *meter) measure(f func(p []byte) (int, error), p []byte) (b int, err error) {
+ s := time.Now()
+ b, err = f(p)
+ d := time.Since(s)
+
+ mtr.bytes += int64(b)
+ mtr.duration += d
+
+ for _, cb := range mtr.fs {
+ cb(b, d)
+ }
+
+ return
+}
diff --git a/go/ioutil/reader.go b/go/ioutil/reader.go
new file mode 100644
index 00000000000..144201416b2
--- /dev/null
+++ b/go/ioutil/reader.go
@@ -0,0 +1,98 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/*
+MeteredReadCloser and MeteredReader are time-and-byte-tracking wrappers around
+ReadCloser and Reader.
+*/
+
+package ioutil
+
+import (
+ "io"
+ "time"
+)
+
+// MeteredReadCloser tracks how much time is spent and bytes are read in Read
+// calls.
+type MeteredReadCloser interface {
+ io.ReadCloser
+ // Bytes reports the total number of bytes read in Read calls.
+ Bytes() int64
+ // Duration reports the total duration of time spent on Read calls.
+ Duration() time.Duration
+}
+
+// MeteredReader tracks how much time is spent and bytes are read in Read
+// calls.
+type MeteredReader interface {
+ io.Reader
+ // Bytes reports the total number of bytes read in Read calls.
+ Bytes() int64
+ // Duration reports the total duration of time spent on Read calls.
+ Duration() time.Duration
+}
+
+type meteredReadCloser struct {
+ io.ReadCloser
+ *meter
+}
+
+type meteredReader struct {
+ io.Reader
+ *meter
+}
+
+// NewMeteredReadCloser creates a MeteredReadCloser which tracks the amount of
+// time spent and bytes read in Read calls to the provided inner ReadCloser.
+// Optional callbacks will be called with the time spent and bytes read in each
+// Read call.
+func NewMeteredReadCloser(rc io.ReadCloser, fns ...func(int, time.Duration)) MeteredReadCloser {
+ return &meteredReadCloser{
+ ReadCloser: rc,
+ meter: &meter{fns, 0, 0},
+ }
+}
+
+// Read calls the inner ReadCloser, increments the total Duration and Bytes,
+// and calls any registered callbacks with the amount of time spent and bytes
+// read in this Read call.
+func (trc *meteredReadCloser) Read(p []byte) (n int, err error) {
+ return trc.meter.measure(trc.ReadCloser.Read, p)
+}
+
+// NewMeteredReader creates a MeteredReader which tracks the amount of time spent
+// and bytes read in Read calls to the provided inner Reader. Optional
+// callbacks will be called with the time spent and bytes read in each Read
+// call.
+func NewMeteredReader(r io.Reader, fns ...func(int, time.Duration)) MeteredReader {
+ return &meteredReader{
+ Reader: r,
+ meter: &meter{fns, 0, 0},
+ }
+}
+
+// Duration reports the total time spent on Read calls so far.
+func (tr *meteredReader) Duration() time.Duration {
+ return tr.duration
+}
+
+// Read calls the inner Reader, increments the total Duration and Bytes, and
+// calls any registered callbacks with the amount of time spent and bytes read
+// in this Read call.
+func (tr *meteredReader) Read(p []byte) (int, error) {
+ return tr.measure(tr.Reader.Read, p)
+}
diff --git a/go/ioutil/writer.go b/go/ioutil/writer.go
new file mode 100644
index 00000000000..4aac07ba501
--- /dev/null
+++ b/go/ioutil/writer.go
@@ -0,0 +1,89 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/*
+MeteredWriteCloser and MeteredWriter are respectively, time-and-byte-tracking
+wrappers around WriteCloser and Writer.
+*/
+
+package ioutil
+
+import (
+ "io"
+ "time"
+)
+
+// MeteredWriteCloser tracks how much time is spent and bytes are written in
+// Write calls.
+type MeteredWriteCloser interface {
+ io.WriteCloser
+ // Duration reports the total duration of time spent on Write calls.
+ Duration() time.Duration
+}
+
+// MeteredWriter tracks how much time is spent and bytes are written in Write
+// calls.
+type MeteredWriter interface {
+ io.Writer
+ // Duration reports the total duration of time spent on Write calls.
+ Duration() time.Duration
+}
+
+type meteredWriteCloser struct {
+ io.WriteCloser
+ *meter
+}
+
+type meteredWriter struct {
+ io.Writer
+ *meter
+}
+
+// NewMeteredWriteCloser creates a MeteredWriteCloser which tracks the amount of
+// time spent and bytes written in Write calls to the provided inner
+// WriteCloser. Optional callbacks will be called with the time spent and bytes
+// written in each Write call.
+func NewMeteredWriteCloser(wc io.WriteCloser, fns ...func(int, time.Duration)) MeteredWriteCloser {
+ return &meteredWriteCloser{
+ WriteCloser: wc,
+ meter: &meter{fns, 0, 0},
+ }
+}
+
+// Write calls the inner WriteCloser, increments the total Duration and Bytes,
+// and calls any registered callbacks with the amount of time spent and bytes
+// written in this Write call.
+func (twc *meteredWriteCloser) Write(p []byte) (int, error) {
+ return twc.meter.measure(twc.WriteCloser.Write, p)
+}
+
+// NewMeteredWriter creates a MeteredWriter which tracks the amount of time spent
+// and bytes written in Write calls to the provided inner Writer. Optional
+// callbacks will be called with the time spent and bytes written in each Write
+// call.
+func NewMeteredWriter(tw io.Writer, fns ...func(int, time.Duration)) MeteredWriter {
+ return &meteredWriter{
+ Writer: tw,
+ meter: &meter{fns, 0, 0},
+ }
+}
+
+// Write calls the inner Writer, increments the total Duration and Bytes, and
+// calls any registered callbacks with the amount of time spent and bytes
+// written in this Write call.
+func (tw *meteredWriter) Write(p []byte) (int, error) {
+ return tw.meter.measure(tw.Writer.Write, p)
+}
diff --git a/go/test/fuzzing/tablet_manager_fuzzer.go b/go/test/fuzzing/tablet_manager_fuzzer.go
index 0e6b6aaece7..316cf75fb82 100644
--- a/go/test/fuzzing/tablet_manager_fuzzer.go
+++ b/go/test/fuzzing/tablet_manager_fuzzer.go
@@ -22,7 +22,7 @@ import (
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
@@ -42,7 +42,7 @@ func FuzzTabletManagerExecuteFetchAsDba(data []byte) int {
cp := mysql.ConnParams{}
db := fakesqldb.New(t)
db.AddQueryPattern(".*", &sqltypes.Result{})
- daemon := fakemysqldaemon.NewFakeMysqlDaemon(db)
+ daemon := mysqlctl.NewFakeMysqlDaemon(db)
dbName := "dbname"
tm := &tabletmanager.TabletManager{
diff --git a/go/vt/mysqlctl/azblobbackupstorage/azblob.go b/go/vt/mysqlctl/azblobbackupstorage/azblob.go
index 660abcc5008..08fa24643b7 100644
--- a/go/vt/mysqlctl/azblobbackupstorage/azblob.go
+++ b/go/vt/mysqlctl/azblobbackupstorage/azblob.go
@@ -416,6 +416,11 @@ func (bs *AZBlobBackupStorage) Close() error {
return nil
}
+func (bs *AZBlobBackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
+ // TODO(maxeng): return a new AZBlobBackupStorage that uses params.
+ return bs
+}
+
// objName joins path parts into an object name.
// Unlike path.Join, it doesn't collapse ".." or strip trailing slashes.
// It also adds the value of the -azblob_backup_storage_root flag if set.
diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go
index 4a7781c1be9..c299cf8dc51 100644
--- a/go/vt/mysqlctl/backup.go
+++ b/go/vt/mysqlctl/backup.go
@@ -25,6 +25,9 @@ import (
"strings"
"time"
+ "golang.org/x/text/cases"
+ "golang.org/x/text/language"
+
"github.com/spf13/pflag"
"vitess.io/vitess/go/vt/servenv"
@@ -32,8 +35,9 @@ import (
"context"
"vitess.io/vitess/go/mysql"
- "vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstats"
+ stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/topoproto"
@@ -85,8 +89,7 @@ var (
// once before the writer blocks
backupCompressBlocks = 2
- backupDuration = stats.NewGauge("backup_duration_seconds", "How long it took to complete the last backup operation (in seconds)")
- restoreDuration = stats.NewGauge("restore_duration_seconds", "How long it took to complete the last restore operation (in seconds)")
+ titleCase = cases.Title(language.English).String
)
func init() {
@@ -106,6 +109,10 @@ func registerBackupFlags(fs *pflag.FlagSet) {
// - shuts down Mysqld during the backup
// - remember if we were replicating, restore the exact same state
func Backup(ctx context.Context, params BackupParams) error {
+ if params.Stats == nil {
+ params.Stats = stats.NoStats()
+ }
+
startTs := time.Now()
backupDir := GetBackupDir(params.Keyspace, params.Shard)
name := fmt.Sprintf("%v.%v", params.BackupTime.UTC().Format(BackupTimestampFormat), params.TabletAlias)
@@ -115,6 +122,19 @@ func Backup(ctx context.Context, params BackupParams) error {
return vterrors.Wrap(err, "unable to get backup storage")
}
defer bs.Close()
+
+ // Scope bsStats to selected storage engine.
+ bsStats := params.Stats.Scope(
+ stats.Component(stats.BackupStorage),
+ stats.Implementation(
+ titleCase(backupstorage.BackupStorageImplementation),
+ ),
+ )
+ bs = bs.WithParams(backupstorage.Params{
+ Logger: params.Logger,
+ Stats: bsStats,
+ })
+
bh, err := bs.StartBackup(ctx, backupDir, name)
if err != nil {
return vterrors.Wrap(err, "StartBackup failed")
@@ -124,9 +144,14 @@ func Backup(ctx context.Context, params BackupParams) error {
if err != nil {
return vterrors.Wrap(err, "failed to find backup engine")
}
-
+ // Scope stats to selected backup engine.
+ beParams := params.Copy()
+ beParams.Stats = params.Stats.Scope(
+ stats.Component(stats.BackupEngine),
+ stats.Implementation(titleCase(backupEngineImplementation)),
+ )
// Take the backup, and either AbortBackup or EndBackup.
- usable, err := be.ExecuteBackup(ctx, params, bh)
+ usable, err := be.ExecuteBackup(ctx, beParams, bh)
logger := params.Logger
var finishErr error
if usable {
@@ -146,7 +171,8 @@ func Backup(ctx context.Context, params BackupParams) error {
}
// The backup worked, so just return the finish error, if any.
- backupDuration.Set(int64(time.Since(startTs).Seconds()))
+ stats.DeprecatedBackupDurationS.Set(int64(time.Since(startTs).Seconds()))
+ params.Stats.Scope(stats.Operation("Backup")).TimedIncrement(time.Since(startTs))
return finishErr
}
@@ -274,6 +300,10 @@ func ShouldRestore(ctx context.Context, params RestoreParams) (bool, error) {
// appropriate backup on the BackupStorage, Restore logs an error
// and returns ErrNoBackup. Any other error is returned.
func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error) {
+ if params.Stats == nil {
+ params.Stats = stats.NoStats()
+ }
+
startTs := time.Now()
// find the right backup handle: most recent one, with a MANIFEST
params.Logger.Infof("Restore: looking for a suitable backup to restore")
@@ -283,6 +313,18 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error)
}
defer bs.Close()
+ // Scope bsStats to selected storage engine.
+ bsStats := params.Stats.Scope(
+ stats.Component(backupstats.BackupStorage),
+ stats.Implementation(
+ titleCase(backupstorage.BackupStorageImplementation),
+ ),
+ )
+ bs = bs.WithParams(backupstorage.Params{
+ Logger: params.Logger,
+ Stats: bsStats,
+ })
+
// Backups are stored in a directory structure that starts with
// /
backupDir := GetBackupDir(params.Keyspace, params.Shard)
@@ -325,7 +367,13 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error)
if params.DryRun {
return nil, nil
}
- manifest, err := re.ExecuteRestore(ctx, params, bh)
+ // Scope stats to selected backup engine.
+ reParams := params.Copy()
+ reParams.Stats = params.Stats.Scope(
+ stats.Component(backupstats.BackupEngine),
+ stats.Implementation(titleCase(backupEngineImplementation)),
+ )
+ manifest, err := re.ExecuteRestore(ctx, reParams, bh)
if err != nil {
return nil, err
}
@@ -393,7 +441,8 @@ func Restore(ctx context.Context, params RestoreParams) (*BackupManifest, error)
return nil, err
}
- restoreDuration.Set(int64(time.Since(startTs).Seconds()))
+ stats.DeprecatedRestoreDurationS.Set(int64(time.Since(startTs).Seconds()))
+ params.Stats.Scope(stats.Operation("Restore")).TimedIncrement(time.Since(startTs))
params.Logger.Infof("Restore: complete")
return manifest, nil
}
diff --git a/go/vt/mysqlctl/backup_test.go b/go/vt/mysqlctl/backup_test.go
index 08d5e31a116..d26ca873243 100644
--- a/go/vt/mysqlctl/backup_test.go
+++ b/go/vt/mysqlctl/backup_test.go
@@ -17,15 +17,123 @@ limitations under the License.
package mysqlctl
import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "io"
"os"
"path"
"reflect"
"sort"
"testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
"vitess.io/vitess/go/mysql"
+ "vitess.io/vitess/go/mysql/fakesqldb"
+ "vitess.io/vitess/go/vt/logutil"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstats"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)
+// TestBackupExecutesBackupWithScopedParams tests that Backup passes
+// a Scope()-ed stats to backupengine ExecuteBackup.
+func TestBackupExecutesBackupWithScopedParams(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events)
+
+ require.Equal(t, 1, len(env.backupEngine.ExecuteBackupCalls))
+ executeBackupParams := env.backupEngine.ExecuteBackupCalls[0].BackupParams
+ var executeBackupStats *backupstats.FakeStats
+ for _, sr := range env.stats.ScopeReturns {
+ if sr == executeBackupParams.Stats {
+ executeBackupStats = sr.(*backupstats.FakeStats)
+ }
+ }
+ require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeComponent)
+ require.Equal(t, backupstats.BackupEngine.String(), executeBackupStats.ScopeV[backupstats.ScopeComponent])
+ require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeImplementation)
+ require.Equal(t, "Fake", executeBackupStats.ScopeV[backupstats.ScopeImplementation])
+}
+
+// TestBackupNoStats tests that if BackupParams.Stats is nil, then Backup will
+// pass non-nil Stats to sub-components.
+func TestBackupNoStats(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ env.setStats(nil)
+
+ require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events)
+
+ // It parameterizes the backup storage with nop stats.
+ require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
+ require.Equal(t, backupstats.NoStats(), env.backupStorage.WithParamsCalls[0].Stats)
+}
+
+// TestBackupParameterizesBackupStorageWithScopedStats tests that Backup passes
+// a Scope()-ed stats to BackupStorage.WithParams.
+func TestBackupParameterizesBackupStorageWithScopedStats(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events)
+
+ require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
+ var storageStats *backupstats.FakeStats
+ for _, sr := range env.stats.ScopeReturns {
+ if sr == env.backupStorage.WithParamsCalls[0].Stats {
+ storageStats = sr.(*backupstats.FakeStats)
+ }
+ }
+ require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
+ require.Equal(t, backupstats.BackupStorage.String(), storageStats.ScopeV[backupstats.ScopeComponent])
+ require.Contains(t, storageStats.ScopeV, backupstats.ScopeImplementation)
+ require.Equal(t, "Fake", storageStats.ScopeV[backupstats.ScopeImplementation])
+}
+
+// TestBackupEmitsStats tests that Backup emits stats.
+func TestBackupEmitsStats(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ // Force ExecuteBackup to take time so we can test stats emission.
+ env.backupEngine.ExecuteBackupDuration = 1001 * time.Millisecond
+
+ require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events)
+
+ require.NotZero(t, backupstats.DeprecatedBackupDurationS.Get())
+ require.Equal(t, 0, len(env.stats.TimedIncrementCalls))
+ require.Equal(t, 0, len(env.stats.ScopeV))
+}
+
+// TestBackupTriesToParameterizeBackupStorage tests that Backup tries to pass
+// backupstorage.Params to backupstorage, but only if it responds to
+// backupstorage.WithParams.
+func TestBackupTriesToParameterizeBackupStorage(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events)
+
+ require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
+ require.Equal(t, env.logger, env.backupStorage.WithParamsCalls[0].Logger)
+ var scopedStats backupstats.Stats
+ for _, sr := range env.stats.ScopeReturns {
+ if sr != env.backupStorage.WithParamsCalls[0].Stats {
+ continue
+ }
+ if scopedStats != nil {
+ require.Fail(t, "backupstorage stats matches multiple scoped stats produced by parent stats")
+ }
+ scopedStats = sr
+ }
+ require.NotNil(t, scopedStats)
+}
+
func TestFindFilesToBackupWithoutRedoLog(t *testing.T) {
root := t.TempDir()
@@ -209,8 +317,233 @@ func TestFindFilesToBackupWithRedoLog(t *testing.T) {
}
}
+// TestRestoreEmitsStats tests that Restore emits stats.
+func TestRestoreEmitsStats(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ // Force ExecuteRestore to take time so we can test stats emission.
+ env.backupEngine.ExecuteRestoreDuration = 1001 * time.Millisecond
+
+ _, err := Restore(env.ctx, env.restoreParams)
+ require.Nil(t, err, env.logger.Events)
+
+ require.NotZero(t, backupstats.DeprecatedRestoreDurationS.Get())
+ require.Equal(t, 0, len(env.stats.TimedIncrementCalls))
+ require.Equal(t, 0, len(env.stats.ScopeV))
+}
+
+// TestRestoreExecutesRestoreWithScopedParams tests that Restore passes
+// a Scope()-ed stats to backupengine ExecuteRestore.
+func TestRestoreExecutesRestoreWithScopedParams(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ _, err := Restore(env.ctx, env.restoreParams)
+ require.Nil(t, err, env.logger.Events)
+
+ require.Equal(t, 1, len(env.backupEngine.ExecuteRestoreCalls))
+ executeRestoreParams := env.backupEngine.ExecuteRestoreCalls[0].RestoreParams
+ var executeRestoreStats *backupstats.FakeStats
+ for _, sr := range env.stats.ScopeReturns {
+ if sr == executeRestoreParams.Stats {
+ executeRestoreStats = sr.(*backupstats.FakeStats)
+ }
+ }
+ require.Contains(t, executeRestoreStats.ScopeV, backupstats.ScopeComponent)
+ require.Equal(t, backupstats.BackupEngine.String(), executeRestoreStats.ScopeV[backupstats.ScopeComponent])
+ require.Contains(t, executeRestoreStats.ScopeV, backupstats.ScopeImplementation)
+ require.Equal(t, "Fake", executeRestoreStats.ScopeV[backupstats.ScopeImplementation])
+}
+
+// TestRestoreNoStats tests that if RestoreParams.Stats is nil, then Restore will
+// pass non-nil Stats to sub-components.
+func TestRestoreNoStats(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ env.setStats(nil)
+
+ _, err := Restore(env.ctx, env.restoreParams)
+ require.Nil(t, err, env.logger.Events)
+
+ // It parameterizes the backup storage with nop stats.
+ require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
+ require.Equal(t, backupstats.NoStats(), env.backupStorage.WithParamsCalls[0].Stats)
+}
+
+// TestRestoreParameterizesBackupStorageWithScopedStats tests that Restore passes
+// a Scope()-ed stats to BackupStorage.WithParams.
+func TestRestoreParameterizesBackupStorageWithScopedStats(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ _, err := Restore(env.ctx, env.restoreParams)
+ require.Nil(t, err, env.logger.Events)
+
+ require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
+ var storageStats *backupstats.FakeStats
+ for _, sr := range env.stats.ScopeReturns {
+ if sr == env.backupStorage.WithParamsCalls[0].Stats {
+ storageStats = sr.(*backupstats.FakeStats)
+ }
+ }
+ require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
+ require.Equal(t, backupstats.BackupStorage.String(), storageStats.ScopeV[backupstats.ScopeComponent])
+ require.Contains(t, storageStats.ScopeV, backupstats.ScopeImplementation)
+ require.Equal(t, "Fake", storageStats.ScopeV[backupstats.ScopeImplementation])
+}
+
+// TestRestoreTriesToParameterizeBackupStorage tests that Restore tries to pass
+// backupstorage.Params to backupstorage, but only if it responds to
+// backupstorage.WithParams.
+func TestRestoreTriesToParameterizeBackupStorage(t *testing.T) {
+ env, closer := createFakeBackupRestoreEnv(t)
+ defer closer()
+
+ _, err := Restore(env.ctx, env.restoreParams)
+ require.Nil(t, err, env.logger.Events)
+
+ require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
+ require.Equal(t, env.logger, env.backupStorage.WithParamsCalls[0].Logger)
+ var scopedStats backupstats.Stats
+ for _, sr := range env.stats.ScopeReturns {
+ if sr != env.backupStorage.WithParamsCalls[0].Stats {
+ continue
+ }
+ if scopedStats != nil {
+ require.Fail(t, "backupstorage stats matches multiple scoped stats produced by parent stats")
+ }
+ scopedStats = sr
+ }
+ require.NotNil(t, scopedStats)
+}
+
// forTest adapts a slice of FileEntry to sort.Interface, ordering entries by
// the concatenation of Base and Name.
type forTest []FileEntry

func (f forTest) Len() int { return len(f) }

func (f forTest) Swap(i, j int) {
	f[i], f[j] = f[j], f[i]
}

func (f forTest) Less(i, j int) bool {
	return f[i].Base+f[i].Name < f[j].Base+f[j].Name
}
+
// fakeBackupRestoreEnv bundles the fakes and params shared by the Backup and
// Restore stats tests. Build one with createFakeBackupRestoreEnv.
type fakeBackupRestoreEnv struct {
	backupEngine  *FakeBackupEngine  // registered as the "fake" backup engine
	backupParams  BackupParams       // ready-to-use params for Backup()
	backupStorage *FakeBackupStorage // registered as the "fake" backup storage
	ctx           context.Context
	logger        *logutil.MemoryLogger
	restoreParams RestoreParams // ready-to-use params for Restore()
	mysqld        *FakeMysqlDaemon
	stats         *backupstats.FakeStats // parent stats wired into both params
}
+
+func createFakeBackupRestoreEnv(t *testing.T) (*fakeBackupRestoreEnv, func()) {
+ ctx := context.Background()
+ logger := logutil.NewMemoryLogger()
+
+ sqldb := fakesqldb.New(t)
+ sqldb.SetNeverFail(true)
+ mysqld := NewFakeMysqlDaemon(sqldb)
+ require.Nil(t, mysqld.Shutdown(ctx, nil, false))
+ defer mysqld.Close()
+
+ dirName, err := os.MkdirTemp("", "vt_backup_test")
+ require.Nil(t, err)
+
+ cnf := &Mycnf{
+ DataDir: dirName,
+ }
+
+ stats := backupstats.NewFakeStats()
+
+ backupParams := BackupParams{
+ Cnf: cnf,
+ Logger: logger,
+ Mysqld: mysqld,
+ Concurrency: 1,
+ HookExtraEnv: map[string]string{},
+ TopoServer: nil,
+ Keyspace: "test",
+ Shard: "-",
+ BackupTime: time.Now(),
+ IncrementalFromPos: "",
+ Stats: stats,
+ }
+
+ restoreParams := RestoreParams{
+ Cnf: cnf,
+ Logger: logger,
+ Mysqld: mysqld,
+ Concurrency: 1,
+ HookExtraEnv: map[string]string{},
+ DeleteBeforeRestore: false,
+ DbName: "test",
+ Keyspace: "test",
+ Shard: "-",
+ StartTime: time.Now(),
+ RestoreToPos: mysql.Position{},
+ DryRun: false,
+ Stats: stats,
+ }
+
+ manifest := BackupManifest{
+ BackupTime: time.Now().Add(-1 * time.Hour).Format(time.RFC3339),
+ BackupMethod: "fake",
+ Keyspace: "test",
+ Shard: "-",
+ }
+
+ manifestBytes, err := json.Marshal(manifest)
+ require.Nil(t, err)
+
+ testBackupEngine := FakeBackupEngine{}
+ testBackupEngine.ExecuteRestoreReturn = FakeBackupEngineExecuteRestoreReturn{&manifest, nil}
+
+ previousBackupEngineImplementation := backupEngineImplementation
+ BackupRestoreEngineMap["fake"] = &testBackupEngine
+ backupEngineImplementation = "fake"
+
+ testBackupStorage := FakeBackupStorage{}
+ testBackupStorage.ListBackupsReturn = FakeBackupStorageListBackupsReturn{
+ BackupHandles: []backupstorage.BackupHandle{
+ &FakeBackupHandle{
+ ReadFileReturnF: func(context.Context, string) (io.ReadCloser, error) {
+ return io.NopCloser(bytes.NewBuffer(manifestBytes)), nil
+ },
+ },
+ },
+ }
+ testBackupStorage.StartBackupReturn = FakeBackupStorageStartBackupReturn{&FakeBackupHandle{}, nil}
+ testBackupStorage.WithParamsReturn = &testBackupStorage
+
+ backupstorage.BackupStorageMap["fake"] = &testBackupStorage
+ previousBackupStorageImplementation := backupstorage.BackupStorageImplementation
+ backupstorage.BackupStorageImplementation = "fake"
+
+ closer := func() {
+ backupstats.DeprecatedBackupDurationS.Reset()
+ backupstats.DeprecatedRestoreDurationS.Reset()
+
+ delete(BackupRestoreEngineMap, "fake")
+ backupEngineImplementation = previousBackupEngineImplementation
+
+ delete(backupstorage.BackupStorageMap, "fake")
+ backupstorage.BackupStorageImplementation = previousBackupStorageImplementation
+ }
+
+ return &fakeBackupRestoreEnv{
+ backupEngine: &testBackupEngine,
+ backupParams: backupParams,
+ backupStorage: &testBackupStorage,
+ ctx: ctx,
+ logger: logger,
+ mysqld: mysqld,
+ restoreParams: restoreParams,
+ stats: stats,
+ }, closer
+}
+
+func (fbe *fakeBackupRestoreEnv) setStats(stats *backupstats.FakeStats) {
+ fbe.backupParams.Stats = nil
+ fbe.restoreParams.Stats = nil
+ fbe.stats = nil
+}
diff --git a/go/vt/mysqlctl/backupengine.go b/go/vt/mysqlctl/backupengine.go
index 7473edd5dba..d41780ca9e9 100644
--- a/go/vt/mysqlctl/backupengine.go
+++ b/go/vt/mysqlctl/backupengine.go
@@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/logutil"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
@@ -70,6 +71,25 @@ type BackupParams struct {
// Position of last known backup. If non empty, then this value indicates the backup should be incremental
// and as of this position
IncrementalFromPos string
+ // Stats lets backup engines report detailed backup timings.
+ Stats backupstats.Stats
+}
+
+func (b BackupParams) Copy() BackupParams {
+ return BackupParams{
+ b.Cnf,
+ b.Mysqld,
+ b.Logger,
+ b.Concurrency,
+ b.HookExtraEnv,
+ b.TopoServer,
+ b.Keyspace,
+ b.Shard,
+ b.TabletAlias,
+ b.BackupTime,
+ b.IncrementalFromPos,
+ b.Stats,
+ }
}
// RestoreParams is the struct that holds all params passed to ExecuteRestore
@@ -99,6 +119,26 @@ type RestoreParams struct {
RestoreToPos mysql.Position
// When DryRun is set, no restore actually takes place; but some of its steps are validated.
DryRun bool
+ // Stats lets restore engines report detailed restore timings.
+ Stats backupstats.Stats
+}
+
+func (p RestoreParams) Copy() RestoreParams {
+ return RestoreParams{
+ p.Cnf,
+ p.Mysqld,
+ p.Logger,
+ p.Concurrency,
+ p.HookExtraEnv,
+ p.DeleteBeforeRestore,
+ p.DbName,
+ p.Keyspace,
+ p.Shard,
+ p.StartTime,
+ p.RestoreToPos,
+ p.DryRun,
+ p.Stats,
+ }
}
func (p *RestoreParams) IsIncrementalRecovery() bool {
diff --git a/go/vt/mysqlctl/backupstats/component.go b/go/vt/mysqlctl/backupstats/component.go
new file mode 100644
index 00000000000..bcc781b912f
--- /dev/null
+++ b/go/vt/mysqlctl/backupstats/component.go
@@ -0,0 +1,36 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package backupstats
+
// ComponentType is used to scope Stats.
// E.g. stats.Scope(Component(BackupEngine)).
type ComponentType int

const (
	BackupEngine ComponentType = iota
	BackupStorage
)

// String returns a human-readable name for the component, or "Other" for any
// value outside the known components.
func (c ComponentType) String() string {
	switch c {
	case BackupEngine:
		return "BackupEngine"
	case BackupStorage:
		return "BackupStorage"
	default:
		return "Other"
	}
}
diff --git a/go/vt/mysqlctl/backupstats/deprecated.go b/go/vt/mysqlctl/backupstats/deprecated.go
new file mode 100644
index 00000000000..b2beb73b3c5
--- /dev/null
+++ b/go/vt/mysqlctl/backupstats/deprecated.go
@@ -0,0 +1,35 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package backupstats
+
+import "vitess.io/vitess/go/stats"
+
var (
	// DeprecatedBackupDurationS is a deprecated statistic that will be removed
	// in the next release. Use backup_duration_nanoseconds instead.
	//
	// Deprecated: use the BackupDurationNanoseconds stat instead.
	DeprecatedBackupDurationS = stats.NewGauge(
		"backup_duration_seconds",
		"[DEPRECATED] How long it took to complete the last backup operation (in seconds)",
	)

	// DeprecatedRestoreDurationS is a deprecated statistic that will be
	// removed in the next release. Use restore_duration_nanoseconds instead.
	//
	// Deprecated: use the RestoreDurationNanoseconds stat instead.
	DeprecatedRestoreDurationS = stats.NewGauge(
		"restore_duration_seconds",
		"[DEPRECATED] How long it took to complete the last restore operation (in seconds)",
	)
)
diff --git a/go/vt/mysqlctl/backupstats/doc.go b/go/vt/mysqlctl/backupstats/doc.go
new file mode 100644
index 00000000000..f8df6f750b5
--- /dev/null
+++ b/go/vt/mysqlctl/backupstats/doc.go
@@ -0,0 +1,24 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package backupstats provides a Stats interface for backup and restore
+// operations in the mysqlctl package.
+//
+// The goal is to provide a consistent reporting interface that can be
+// used by any BackupEngine, BackupStorage, or FileHandle component, so that
+// those components don't have to register their own stats or reference global
+// variables.
+package backupstats
diff --git a/go/vt/mysqlctl/backupstats/fake_stats.go b/go/vt/mysqlctl/backupstats/fake_stats.go
new file mode 100644
index 00000000000..6f487659827
--- /dev/null
+++ b/go/vt/mysqlctl/backupstats/fake_stats.go
@@ -0,0 +1,62 @@
+package backupstats
+
+import "time"
+
// FakeStats is a test double for Stats. It records every call made to it so
// unit tests can assert on scoping and increment behavior.
type FakeStats struct {
	// ScopeV holds this instance's scope values, keyed by scope type.
	ScopeV map[ScopeType]ScopeValue
	// TimedIncrementCalls records the duration of each TimedIncrement call.
	TimedIncrementCalls []time.Duration
	// TimedIncrementBytesCalls records the arguments of each
	// TimedIncrementBytes call.
	TimedIncrementBytesCalls []struct {
		Bytes    int
		Duration time.Duration
	}
	// ScopeCalls records the arguments of each Scope call.
	ScopeCalls [][]Scope
	// ScopeReturns records the Stats returned by each Scope call.
	ScopeReturns []Stats
}
+
+func NewFakeStats(scopes ...Scope) *FakeStats {
+ scopeV := make(map[ScopeType]ScopeValue)
+ for _, s := range scopes {
+ scopeV[s.Type] = s.Value
+ }
+ return &FakeStats{
+ ScopeV: scopeV,
+ }
+}
+
+// Scope returns a new FakeStats with scopes merged from the current FakeStats'
+// scopes and provided scopes. It also records the return value in
+// ScopeReturns, for use in unit test assertions.
+func (fs *FakeStats) Scope(scopes ...Scope) Stats {
+ fs.ScopeCalls = append(fs.ScopeCalls, scopes)
+ newScopeV := map[ScopeType]ScopeValue{}
+ for t, v := range fs.ScopeV {
+ newScopeV[t] = v
+ }
+ for _, s := range scopes {
+ if _, ok := newScopeV[s.Type]; !ok {
+ newScopeV[s.Type] = s.Value
+ }
+ }
+ newScopes := []Scope{}
+ for t, v := range newScopeV {
+ newScopes = append(newScopes, Scope{t, v})
+ }
+ sfs := NewFakeStats(newScopes...)
+ fs.ScopeReturns = append(fs.ScopeReturns, sfs)
+ return sfs
+}
+
// TimedIncrement does nothing except record the call in TimedIncrementCalls,
// for use in unit test assertions. No metric is actually emitted.
func (fs *FakeStats) TimedIncrement(d time.Duration) {
	fs.TimedIncrementCalls = append(fs.TimedIncrementCalls, d)
}
+
// TimedIncrementBytes does nothing except record the call in
// TimedIncrementBytesCalls, for use in unit test assertions. No metric is
// actually emitted. (The original comment mistakenly named TimedIncrement.)
func (fs *FakeStats) TimedIncrementBytes(b int, d time.Duration) {
	fs.TimedIncrementBytesCalls = append(fs.TimedIncrementBytesCalls, struct {
		Bytes    int
		Duration time.Duration
	}{b, d})
}
diff --git a/go/vt/mysqlctl/backupstats/scope.go b/go/vt/mysqlctl/backupstats/scope.go
new file mode 100644
index 00000000000..6ddb022e5ac
--- /dev/null
+++ b/go/vt/mysqlctl/backupstats/scope.go
@@ -0,0 +1,58 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package backupstats
+
type (
	// Scope is used to specify the scope of Stats. In this way the same Stats
	// can be passed down to different layers and components of backup and
	// restore operations, while allowing each component to emit stats without
	// overwriting the stats of other components.
	Scope struct {
		Type  ScopeType
		Value ScopeValue
	}

	// ScopeType is used to specify the type of scope being set.
	ScopeType int
	// ScopeValue is used to specify the value of the scope being set. It is
	// an alias for string, not a distinct type.
	ScopeValue = string
)
+
const (
	// ScopeComponent is used to specify the type of component, such as
	// "BackupEngine" or "BackupStorage".
	ScopeComponent ScopeType = iota
	// ScopeImplementation is used to specify the specific component, such as
	// "Builtin" and "XtraBackup" in the case of a "BackupEngine" component.
	ScopeImplementation
	// ScopeOperation is used to specify the type of operation. Examples of
	// high-level operations are "Backup" and "Restore"; examples of
	// low-level operations are "Read" and "Write".
	ScopeOperation
)
+
+func Component(c ComponentType) Scope {
+ return Scope{ScopeComponent, c.String()}
+}
+
+func Implementation(v ScopeValue) Scope {
+ return Scope{ScopeImplementation, v}
+}
+
+func Operation(v ScopeValue) Scope {
+ return Scope{ScopeOperation, v}
+}
diff --git a/go/vt/mysqlctl/backupstats/stats.go b/go/vt/mysqlctl/backupstats/stats.go
new file mode 100644
index 00000000000..e81bd569a97
--- /dev/null
+++ b/go/vt/mysqlctl/backupstats/stats.go
@@ -0,0 +1,199 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package backupstats
+
+import (
+ "sync"
+ "time"
+
+ "vitess.io/vitess/go/stats"
+ vtstats "vitess.io/vitess/go/stats"
+)
+
// Stats is a reporting interface meant to be shared among backup and restore
// components.
//
// This interface is meant to give those components a way to report stats
// without having to register their own stats or to import globally registered
// stats. This way out-of-tree plugins can have a mechanism for reporting
// stats, while the policies for those stats (metric names and labels, stat
// sinks) remain in the control of in-tree Vitess code and Vitess users.
type Stats interface {
	// Scope creates a new Stats which inherits this Stats' scopes plus any
	// new provided scopes.
	//
	// Once a ScopeType has been set in a Stats, it cannot be changed by
	// further calls to Scope().
	//
	// This allows parent components to prepare properly scoped Stats and pass
	// them to child components in such a way that child components cannot
	// overwrite another components' metrics.
	Scope(...Scope) Stats
	// TimedIncrement increments the count by 1 and adds the duration, under
	// the current scope.
	TimedIncrement(time.Duration)
	// TimedIncrementBytes adds the byte count and the duration, under the
	// current scope.
	TimedIncrementBytes(int, time.Duration)
}
+
// noStats is a no-op Stats implementation; see NoStats().
type noStats struct{}

// scopedStats emits to shared multi-label counters, with its label values
// fixed at Scope() time; labelValues is indexed by ScopeType.
type scopedStats struct {
	bytes       *vtstats.CountersWithMultiLabels
	count       *vtstats.CountersWithMultiLabels
	durationNs  *vtstats.CountersWithMultiLabels
	labelValues []string
}

// unscoped is the label value used for any scope dimension not yet set.
const unscoped = "-"
+
var (
	// defaultNoStats is the value returned by NoStats(). It is a nil
	// *noStats, which is safe because noStats methods never dereference
	// their receiver.
	defaultNoStats *noStats

	// labels are the metric label names, indexed by ScopeType.
	labels = []string{"component", "implementation", "operation"}

	// registerBackupStats and registerRestoreStats guard the one-time
	// registration of the counters below with the Vitess stats package.
	registerBackupStats  sync.Once
	registerRestoreStats sync.Once

	// These counters remain nil until BackupStats()/RestoreStats() is called.
	backupBytes       *stats.CountersWithMultiLabels
	backupCount       *stats.CountersWithMultiLabels
	backupDurationNs  *stats.CountersWithMultiLabels
	restoreBytes      *stats.CountersWithMultiLabels
	restoreCount      *stats.CountersWithMultiLabels
	restoreDurationNs *stats.CountersWithMultiLabels
)
+
+// BackupStats creates a new Stats for backup operations.
+//
+// It registers the following metrics with the Vitess stats package.
+//
+// - BackupBytes: number of bytes processed by an an operation for given
+// component and implementation.
+// - BackupCount: number of times an operation has happened for given
+// component and implementation.
+// - BackupDurationNanoseconds: time spent on an operation for a given
+// component and implementation.
+func BackupStats() Stats {
+ registerBackupStats.Do(func() {
+ backupBytes = stats.NewCountersWithMultiLabels(
+ "BackupBytes",
+ "How many backup bytes processed.",
+ labels,
+ )
+ backupCount = stats.NewCountersWithMultiLabels(
+ "BackupCount",
+ "How many backup operations have happened.",
+ labels,
+ )
+ backupDurationNs = stats.NewCountersWithMultiLabels(
+ "BackupDurationNanoseconds",
+ "How much time has been spent on backup operations (in nanoseconds).",
+ labels,
+ )
+ })
+ return newScopedStats(backupBytes, backupCount, backupDurationNs, nil)
+}
+
+// RestoreStats creates a new Stats for restore operations.
+//
+// It registers the following metrics with the Vitess stats package.
+//
+// - RestoreBytes: number of bytes processed by an an operation for given
+// component and implementation.
+// - RestoreCount: number of times an operation has happened for given
+// component and implementation.
+// - RestoreDurationNanoseconds: time spent on an operation for a given
+// component and implementation.
+func RestoreStats() Stats {
+ registerRestoreStats.Do(func() {
+ restoreBytes = stats.NewCountersWithMultiLabels(
+ "RestoreBytes",
+ "How many restore bytes processed.",
+ labels,
+ )
+ restoreCount = stats.NewCountersWithMultiLabels(
+ "RestoreCount",
+ "How many restore operations have happened.",
+ labels,
+ )
+ restoreDurationNs = stats.NewCountersWithMultiLabels(
+ "RestoreDurationNanoseconds",
+ "How much time has been spent on restore operations (in nanoseconds).",
+ labels,
+ )
+ })
+ return newScopedStats(restoreBytes, restoreCount, restoreDurationNs, nil)
+}
+
// NoStats returns a no-op Stats suitable for tests and for backwards
// compatibility.
func NoStats() Stats {
	return defaultNoStats
}

// NOTE(review): Lock is not part of the Stats interface and has no visible
// callers — candidate for removal; confirm nothing else in the package uses it.
func (ns *noStats) Lock(...ScopeType) Stats { return ns }
func (ns *noStats) Scope(...Scope) Stats { return ns }
func (ns *noStats) TimedIncrement(time.Duration) {}
func (ns *noStats) TimedIncrementBytes(int, time.Duration) {}
+
+func newScopedStats(
+ bytes *stats.CountersWithMultiLabels,
+ count *stats.CountersWithMultiLabels,
+ durationNs *stats.CountersWithMultiLabels,
+ labelValues []string,
+) Stats {
+ if labelValues == nil {
+ labelValues = make([]string, len(durationNs.Labels()))
+ for i := 0; i < len(labelValues); i++ {
+ labelValues[i] = unscoped
+ }
+ }
+
+ return &scopedStats{bytes, count, durationNs, labelValues}
+}
+
+// Scope returns a new Stats narrowed by the provided scopes. If a provided
+// scope is already set in this Stats, the new Stats uses that scope, and the
+// provided scope is ignored.
+func (s *scopedStats) Scope(scopes ...Scope) Stats {
+ copyOfLabelValues := make([]string, len(s.labelValues))
+ copy(copyOfLabelValues, s.labelValues)
+ for _, scope := range scopes {
+ typeIdx := int(scope.Type)
+
+ // Ignore this scope if the ScopeType is invalid.
+ if typeIdx > len(copyOfLabelValues)-1 {
+ continue
+ }
+ // Ignore this scope if it is already been set in this Stats' label values.
+ if copyOfLabelValues[typeIdx] == unscoped {
+ copyOfLabelValues[typeIdx] = scope.Value
+ }
+ }
+ return newScopedStats(s.bytes, s.count, s.durationNs, copyOfLabelValues)
+}
+
+// TimedIncrement increments the count and duration of the current scope.
+func (s *scopedStats) TimedIncrement(d time.Duration) {
+ s.count.Add(s.labelValues, 1)
+ s.durationNs.Add(s.labelValues, int64(d.Nanoseconds()))
+}
+
+// TimedIncrementBytes increments the byte-count and duration of the current scope.
+func (s *scopedStats) TimedIncrementBytes(b int, d time.Duration) {
+ s.bytes.Add(s.labelValues, 1)
+ s.durationNs.Add(s.labelValues, int64(d.Nanoseconds()))
+}
diff --git a/go/vt/mysqlctl/backupstats/stats_test.go b/go/vt/mysqlctl/backupstats/stats_test.go
new file mode 100644
index 00000000000..f88bfdf8fb6
--- /dev/null
+++ b/go/vt/mysqlctl/backupstats/stats_test.go
@@ -0,0 +1,135 @@
+package backupstats
+
+import (
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ "vitess.io/vitess/go/stats"
+)
+
+func TestBackupStats(t *testing.T) {
+ require.Nil(t, backupCount)
+ require.Nil(t, backupDurationNs)
+ require.Nil(t, restoreCount)
+ require.Nil(t, restoreDurationNs)
+
+ BackupStats()
+ defer resetStats()
+
+ require.NotNil(t, backupCount)
+ require.NotNil(t, backupDurationNs)
+ require.Nil(t, restoreCount)
+ require.Nil(t, restoreDurationNs)
+}
+
+func TestRestoreStats(t *testing.T) {
+ require.Nil(t, backupCount)
+ require.Nil(t, backupDurationNs)
+ require.Nil(t, restoreCount)
+ require.Nil(t, restoreDurationNs)
+
+ RestoreStats()
+ defer resetStats()
+
+ require.Nil(t, backupCount)
+ require.Nil(t, backupDurationNs)
+ require.NotNil(t, restoreCount)
+ require.NotNil(t, restoreDurationNs)
+}
+
// TestScope verifies that scoped stats emit under distinct label paths: a
// parent and its scoped children accumulate independently, and a ScopeType
// that has already been set cannot be rescoped by later Scope() calls.
func TestScope(t *testing.T) {
	bytes := stats.NewCountersWithMultiLabels("TestScopeBytes", "", labels)
	count := stats.NewCountersWithMultiLabels("TestScopeCount", "", labels)
	durationNs := stats.NewCountersWithMultiLabels("TestScopeDurationNs", "", labels)

	duration := 10 * time.Second

	// stats1 is fully unscoped; its label path is "-.-.-".
	stats1 := newScopedStats(bytes, count, durationNs, nil)
	path1 := strings.Join([]string{unscoped, unscoped, unscoped}, ".")

	stats2 := stats1.Scope(Component(BackupEngine), Implementation("Test"))
	path2 := strings.Join([]string{BackupEngine.String(), "Test", unscoped}, ".")

	// New stats2 with new scope, let's test:
	// - TimedIncrement on new stats1 increments stats1 scope but not stats2.
	// - TimedIncrement on new stats2 increments stats2 scope but not stats1.
	stats1.TimedIncrement(duration)

	require.Equal(t, 1, len(count.Counts()))
	require.Equal(t, int64(1), count.Counts()[path1])
	require.Equal(t, 1, len(durationNs.Counts()))
	require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path1])

	stats2.TimedIncrement(duration)

	require.Equal(t, 2, len(count.Counts()))
	require.Equal(t, int64(1), count.Counts()[path1])
	require.Equal(t, int64(1), count.Counts()[path2])
	require.Equal(t, 2, len(durationNs.Counts()))
	require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path1])
	require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path2])

	// Next let's test that:
	// - We cannot rescope a ScopeType once it's been set.
	// - We can scope a ScopeType that is not yet set.
	stats3 := stats2.Scope(
		Component(BackupStorage), /* not rescoped, because Component already set on stats2. */
		Implementation("TestChange"), /* not rescoped, because Implementation already set on stats2 */
		Operation("Test"), /* scoped, because Operation not yet set on stats2 */
	)
	path3 := strings.Join([]string{BackupEngine.String(), "Test", "Test"}, ".")
	stats3.TimedIncrement(duration)

	require.Equal(t, 3, len(count.Counts()))
	require.Equal(t, int64(1), count.Counts()[path1])
	require.Equal(t, int64(1), count.Counts()[path2])
	require.Equal(t, int64(1), count.Counts()[path3])
	require.Equal(t, 3, len(durationNs.Counts()))
	require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path1])
	require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path2])
	require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path3])
}
+
+func TestStatsAreNotInitializedByDefault(t *testing.T) {
+ require.Nil(t, backupCount)
+ require.Nil(t, backupDurationNs)
+ require.Nil(t, restoreCount)
+ require.Nil(t, restoreDurationNs)
+}
+
+func TestTimedIncrement(t *testing.T) {
+ bytes := stats.NewCountersWithMultiLabels("test_timed_increment_bytes", "", labels)
+ count := stats.NewCountersWithMultiLabels("test_timed_increment_count", "", labels)
+ durationNs := stats.NewCountersWithMultiLabels("test_timed_increment_duration_ns", "", labels)
+
+ stats := newScopedStats(bytes, count, durationNs, nil)
+
+ duration := 10 * time.Second
+ path := strings.Join([]string{unscoped, unscoped, unscoped}, ".")
+
+ stats.TimedIncrement(duration)
+
+ require.Equal(t, 1, len(count.Counts()))
+ require.Equal(t, int64(1), count.Counts()[path])
+
+ require.Equal(t, 1, len(durationNs.Counts()))
+ require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path])
+
+ stats.TimedIncrement(duration)
+
+ require.Equal(t, 1, len(count.Counts()))
+ require.Equal(t, int64(2), count.Counts()[path])
+
+ require.Equal(t, 1, len(durationNs.Counts()))
+ require.Equal(t, 2*duration.Nanoseconds(), durationNs.Counts()[path])
+}
+
+func resetStats() {
+ backupCount = nil
+ backupDurationNs = nil
+ restoreCount = nil
+ restoreDurationNs = nil
+}
diff --git a/go/vt/mysqlctl/backupstorage/interface.go b/go/vt/mysqlctl/backupstorage/interface.go
index 1a302d79a94..92bc71d63aa 100644
--- a/go/vt/mysqlctl/backupstorage/interface.go
+++ b/go/vt/mysqlctl/backupstorage/interface.go
@@ -119,6 +119,13 @@ type BackupStorage interface {
// session, such as closing connections. Implementations of
// BackupStorage must support being reused after Close() is called.
Close() error
+
+ // WithParams should return a new BackupStorage which is a shared-nothing
+ // copy of the current BackupStorage and which uses Params.
+ //
+ // This method is intended to give BackupStorage implementations logging
+ // and metrics mechanisms.
+ WithParams(Params) BackupStorage
}
// BackupStorageMap contains the registered implementations for BackupStorage
diff --git a/go/vt/mysqlctl/backupstorage/params.go b/go/vt/mysqlctl/backupstorage/params.go
new file mode 100644
index 00000000000..7126a5c6103
--- /dev/null
+++ b/go/vt/mysqlctl/backupstorage/params.go
@@ -0,0 +1,41 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package backupstorage
+
+import (
+ "vitess.io/vitess/go/vt/logutil"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstats"
+ logutilpb "vitess.io/vitess/go/vt/proto/logutil"
+)
+
// Params contains common utilities that may be used by BackupStorage and FileHandle components.
// This gives out-of-tree backup/restore plugins mechanisms to log and generate metrics,
// while keeping policies (metric names and labels, logging destination, etc.)
// in the control of in-tree components and Vitess users.
type Params struct {
	// Logger receives any messages the component wants to record.
	Logger logutil.Logger
	// Stats receives any metrics the component wants to emit.
	Stats backupstats.Stats
}
+
+// NoParams gives BackupStorage components way to log and generate stats
+// without doing nil checking.
+func NoParams() Params {
+ return Params{
+ Logger: logutil.NewCallbackLogger(func(*logutilpb.Event) {}),
+ Stats: backupstats.NoStats(),
+ }
+}
diff --git a/go/vt/mysqlctl/binlogs_gtid_test.go b/go/vt/mysqlctl/binlogs_gtid_test.go
index f09d88c6544..336b835e3bc 100644
--- a/go/vt/mysqlctl/binlogs_gtid_test.go
+++ b/go/vt/mysqlctl/binlogs_gtid_test.go
@@ -1,7 +1,19 @@
-// Package mysqlctl_test is the blackbox tests for package mysqlctl.
-// Tests that need to use fakemysqldaemon must be written as blackbox tests;
-// since fakemysqldaemon imports mysqlctl, importing fakemysqldaemon in
-// a `package mysqlctl` test would cause a circular import.
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
package mysqlctl
import (
diff --git a/go/vt/mysqlctl/builtinbackupengine.go b/go/vt/mysqlctl/builtinbackupengine.go
index a493856a486..e6e60786c22 100644
--- a/go/vt/mysqlctl/builtinbackupengine.go
+++ b/go/vt/mysqlctl/builtinbackupengine.go
@@ -34,11 +34,13 @@ import (
"github.com/spf13/pflag"
+ "vitess.io/vitess/go/ioutil"
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
+ stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/servenv"
@@ -654,25 +656,42 @@ func (bp *backupPipe) ReportProgress(period time.Duration, logger logutil.Logger
// backupFile backs up an individual file.
func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupParams, bh backupstorage.BackupHandle, fe *FileEntry, name string) (finalErr error) {
// Open the source file for reading.
+ openSourceAt := time.Now()
source, err := fe.open(params.Cnf, true)
if err != nil {
return err
}
- defer source.Close()
+ params.Stats.Scope(stats.Operation("Source:Open")).TimedIncrement(time.Since(openSourceAt))
+
+ defer func() {
+ closeSourceAt := time.Now()
+ source.Close()
+ params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
+ }()
+
+ readStats := params.Stats.Scope(stats.Operation("Source:Read"))
+ timedSource := ioutil.NewMeteredReadCloser(source, readStats.TimedIncrementBytes)
fi, err := source.Stat()
if err != nil {
return err
}
- params.Logger.Infof("Backing up file: %v", fe.Name)
+ br := newBackupReader(fe.Name, fi.Size(), timedSource)
+ go br.ReportProgress(builtinBackupProgress, params.Logger)
+
// Open the destination file for writing, and a buffer.
- wc, err := bh.AddFile(ctx, name, fi.Size())
+ params.Logger.Infof("Backing up file: %v", fe.Name)
+ openDestAt := time.Now()
+ dest, err := bh.AddFile(ctx, name, fi.Size())
if err != nil {
return vterrors.Wrapf(err, "cannot add file: %v,%v", name, fe.Name)
}
+ params.Stats.Scope(stats.Operation("Destination:Open")).TimedIncrement(time.Since(openDestAt))
+
defer func(name, fileName string) {
- if rerr := wc.Close(); rerr != nil {
+ closeDestAt := time.Now()
+ if rerr := dest.Close(); rerr != nil {
if finalErr != nil {
// We already have an error, just log this one.
params.Logger.Errorf2(rerr, "failed to close file %v,%v", name, fe.Name)
@@ -680,11 +699,13 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
finalErr = rerr
}
}
+ params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt))
}(name, fe.Name)
- bw := newBackupWriter(fe.Name, builtinBackupStorageWriteBufferSize, fi.Size(), wc)
- br := newBackupReader(fe.Name, fi.Size(), source)
- go br.ReportProgress(builtinBackupProgress, params.Logger)
+ destStats := params.Stats.Scope(stats.Operation("Destination:Write"))
+ timedDest := ioutil.NewMeteredWriteCloser(dest, destStats.TimedIncrementBytes)
+
+ bw := newBackupWriter(fe.Name, builtinBackupStorageWriteBufferSize, fi.Size(), timedDest)
var reader io.Reader = br
var writer io.Writer = bw
@@ -700,7 +721,9 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
if err != nil {
return vterrors.Wrap(err, "can't create compressor")
}
- writer = compressor
+
+ compressStats := params.Stats.Scope(stats.Operation("Compressor:Write"))
+ writer = ioutil.NewMeteredWriter(compressor, compressStats.TimedIncrementBytes)
}
if builtinBackupFileReadBufferSize > 0 {
@@ -716,19 +739,25 @@ func (be *BuiltinBackupEngine) backupFile(ctx context.Context, params BackupPara
// Close gzip to flush it, after that all data is sent to writer.
if compressor != nil {
+ closeCompressorAt := time.Now()
if err = compressor.Close(); err != nil {
return vterrors.Wrap(err, "cannot close compressor")
}
+ params.Stats.Scope(stats.Operation("Compressor:Close")).TimedIncrement(time.Since(closeCompressorAt))
}
// Close the backupPipe to finish writing on destination.
+ closeWriterAt := time.Now()
if err = bw.Close(); err != nil {
return vterrors.Wrapf(err, "cannot flush destination: %v", name)
}
+ params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeWriterAt))
+ closeReaderAt := time.Now()
if err := br.Close(); err != nil {
return vterrors.Wrap(err, "failed to close the source reader")
}
+ params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeReaderAt))
// Save the hash.
fe.Hash = bw.HashString()
@@ -867,19 +896,37 @@ func (be *BuiltinBackupEngine) restoreFiles(ctx context.Context, params RestoreP
// restoreFile restores an individual file.
func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestoreParams, bh backupstorage.BackupHandle, fe *FileEntry, bm builtinBackupManifest, name string) (finalErr error) {
// Open the source file for reading.
+ openSourceAt := time.Now()
source, err := bh.ReadFile(ctx, name)
if err != nil {
return vterrors.Wrap(err, "can't open source file for reading")
}
- defer source.Close()
+ params.Stats.Scope(stats.Operation("Source:Open")).TimedIncrement(time.Since(openSourceAt))
+
+ readStats := params.Stats.Scope(stats.Operation("Source:Read"))
+ timedSource := ioutil.NewMeteredReader(source, readStats.TimedIncrementBytes)
+
+ defer func() {
+ closeSourceAt := time.Now()
+ source.Close()
+ params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
+ }()
+
+ br := newBackupReader(name, 0, timedSource)
+ go br.ReportProgress(builtinBackupProgress, params.Logger)
+ var reader io.Reader = br
// Open the destination file for writing.
- dstFile, err := fe.open(params.Cnf, false)
+ openDestAt := time.Now()
+ dest, err := fe.open(params.Cnf, false)
if err != nil {
return vterrors.Wrap(err, "can't open destination file for writing")
}
+ params.Stats.Scope(stats.Operation("Destination:Open")).TimedIncrement(time.Since(openDestAt))
+
defer func() {
- if cerr := dstFile.Close(); cerr != nil {
+ closeDestAt := time.Now()
+ if cerr := dest.Close(); cerr != nil {
if finalErr != nil {
// We already have an error, just log this one.
log.Errorf("failed to close file %v: %v", name, cerr)
@@ -887,18 +934,19 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
finalErr = vterrors.Wrap(cerr, "failed to close destination file")
}
}
+ params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt))
}()
- bp := newBackupReader(name, 0, source)
- go bp.ReportProgress(builtinBackupProgress, params.Logger)
+ writeStats := params.Stats.Scope(stats.Operation("Destination:Write"))
+ timedDest := ioutil.NewMeteredWriter(dest, writeStats.TimedIncrementBytes)
- dst := bufio.NewWriterSize(dstFile, int(builtinBackupFileWriteBufferSize))
- var reader io.Reader = bp
+ bufferedDest := bufio.NewWriterSize(timedDest, int(builtinBackupFileWriteBufferSize))
// Create the uncompresser if needed.
if !bm.SkipCompress {
var decompressor io.ReadCloser
var deCompressionEngine = bm.CompressionEngine
+
if deCompressionEngine == "" {
// for backward compatibility
deCompressionEngine = PgzipCompressor
@@ -920,7 +968,11 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
return vterrors.Wrap(err, "can't create decompressor")
}
+ decompressStats := params.Stats.Scope(stats.Operation("Decompressor:Read"))
+ reader = ioutil.NewMeteredReader(decompressor, decompressStats.TimedIncrementBytes)
+
defer func() {
+ closeDecompressorAt := time.Now()
if cerr := decompressor.Close(); cerr != nil {
params.Logger.Errorf("failed to close decompressor: %v", cerr)
if finalErr != nil {
@@ -930,29 +982,33 @@ func (be *BuiltinBackupEngine) restoreFile(ctx context.Context, params RestorePa
finalErr = vterrors.Wrap(cerr, "failed to close decompressor")
}
}
+ params.Stats.Scope(stats.Operation("Decompressor:Close")).TimedIncrement(time.Since(closeDecompressorAt))
}()
- reader = decompressor
}
// Copy the data. Will also write to the hasher.
- if _, err = io.Copy(dst, reader); err != nil {
+ if _, err = io.Copy(bufferedDest, reader); err != nil {
return vterrors.Wrap(err, "failed to copy file contents")
}
// Check the hash.
- hash := bp.HashString()
+ hash := br.HashString()
if hash != fe.Hash {
return vterrors.Errorf(vtrpc.Code_INTERNAL, "hash mismatch for %v, got %v expected %v", fe.Name, hash, fe.Hash)
}
// Flush the buffer.
- if err := dst.Flush(); err != nil {
+ closeDestAt := time.Now()
+ if err := bufferedDest.Flush(); err != nil {
return vterrors.Wrap(err, "failed to flush destination buffer")
}
+ params.Stats.Scope(stats.Operation("Destination:Close")).TimedIncrement(time.Since(closeDestAt))
- if err := bp.Close(); err != nil {
+ closeSourceAt := time.Now()
+ if err := br.Close(); err != nil {
return vterrors.Wrap(err, "failed to close the source reader")
}
+ params.Stats.Scope(stats.Operation("Source:Close")).TimedIncrement(time.Since(closeSourceAt))
return nil
}
diff --git a/go/vt/mysqlctl/builtinbackupengine_test.go b/go/vt/mysqlctl/builtinbackupengine_test.go
index b6837380db7..c86086984f0 100644
--- a/go/vt/mysqlctl/builtinbackupengine_test.go
+++ b/go/vt/mysqlctl/builtinbackupengine_test.go
@@ -1,7 +1,20 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
// Package mysqlctl_test is the blackbox tests for package mysqlctl.
-// Tests that need to use fakemysqldaemon must be written as blackbox tests;
-// since fakemysqldaemon imports mysqlctl, importing fakemysqldaemon in
-// a `package mysqlctl` test would cause a circular import.
package mysqlctl_test
import (
@@ -19,7 +32,6 @@ import (
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/mysqlctl/filebackupstorage"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
@@ -100,11 +112,11 @@ func TestExecuteBackup(t *testing.T) {
oldDeadline := setBuiltinBackupMysqldDeadline(time.Second)
defer setBuiltinBackupMysqldDeadline(oldDeadline)
- bh := filebackupstorage.FileBackupHandle{}
+ bh := filebackupstorage.NewBackupHandle(nil, "", "", false)
// Spin up a fake daemon to be used in backups. It needs to be allowed to receive:
// "STOP SLAVE", "START SLAVE", in that order.
- mysqld := fakemysqldaemon.NewFakeMysqlDaemon(fakesqldb.New(t))
+ mysqld := mysqlctl.NewFakeMysqlDaemon(fakesqldb.New(t))
mysqld.ExpectedExecuteSuperQueryList = []string{"STOP SLAVE", "START SLAVE"}
// mysqld.ShutdownTime = time.Minute
@@ -120,7 +132,7 @@ func TestExecuteBackup(t *testing.T) {
TopoServer: ts,
Keyspace: keyspace,
Shard: shard,
- }, &bh)
+ }, bh)
require.NoError(t, err)
assert.True(t, ok)
@@ -140,7 +152,7 @@ func TestExecuteBackup(t *testing.T) {
TopoServer: ts,
Keyspace: keyspace,
Shard: shard,
- }, &bh)
+ }, bh)
assert.Error(t, err)
assert.False(t, ok)
diff --git a/go/vt/mysqlctl/cephbackupstorage/ceph.go b/go/vt/mysqlctl/cephbackupstorage/ceph.go
index b599a76845a..f8e33dbe641 100644
--- a/go/vt/mysqlctl/cephbackupstorage/ceph.go
+++ b/go/vt/mysqlctl/cephbackupstorage/ceph.go
@@ -281,6 +281,11 @@ func (bs *CephBackupStorage) Close() error {
return nil
}
+func (bs *CephBackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
+ // TODO(maxeng): return a new CephBackupStorage that uses params.
+ return bs
+}
+
// client returns the Ceph Storage client instance.
// If there isn't one yet, it tries to create one.
func (bs *CephBackupStorage) client() (*minio.Client, error) {
diff --git a/go/vt/mysqlctl/fakebackupengine.go b/go/vt/mysqlctl/fakebackupengine.go
new file mode 100644
index 00000000000..c0fce435d35
--- /dev/null
+++ b/go/vt/mysqlctl/fakebackupengine.go
@@ -0,0 +1,92 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package mysqlctl
+
+import (
+ "context"
+ "time"
+
+ "vitess.io/vitess/go/vt/mysqlctl/backupstorage"
+)
+
+type FakeBackupEngine struct {
+ ExecuteBackupCalls []FakeBackupEngineExecuteBackupCall
+ ExecuteBackupDuration time.Duration
+ ExecuteBackupReturn FakeBackupEngineExecuteBackupReturn
+ ExecuteRestoreCalls []FakeBackupEngineExecuteRestoreCall
+ ExecuteRestoreDuration time.Duration
+ ExecuteRestoreReturn FakeBackupEngineExecuteRestoreReturn
+ ShouldDrainForBackupCalls int
+ ShouldDrainForBackupReturn bool
+}
+
+type FakeBackupEngineExecuteBackupCall struct {
+ BackupParams BackupParams
+ BackupHandle backupstorage.BackupHandle
+}
+
+type FakeBackupEngineExecuteBackupReturn struct {
+ Ok bool
+ Err error
+}
+
+type FakeBackupEngineExecuteRestoreCall struct {
+ BackupHandle backupstorage.BackupHandle
+ RestoreParams RestoreParams
+}
+
+type FakeBackupEngineExecuteRestoreReturn struct {
+ Manifest *BackupManifest
+ Err error
+}
+
+func (be *FakeBackupEngine) ExecuteBackup(
+ ctx context.Context,
+ params BackupParams,
+ bh backupstorage.BackupHandle,
+) (bool, error) {
+ be.ExecuteBackupCalls = append(be.ExecuteBackupCalls, FakeBackupEngineExecuteBackupCall{params, bh})
+
+ if be.ExecuteBackupDuration > 0 {
+ time.Sleep(be.ExecuteBackupDuration)
+ }
+
+ return be.ExecuteBackupReturn.Ok, be.ExecuteBackupReturn.Err
+}
+
+func (be *FakeBackupEngine) ExecuteRestore(
+ ctx context.Context, params RestoreParams,
+ bh backupstorage.BackupHandle,
+) (*BackupManifest, error) {
+ be.ExecuteRestoreCalls = append(be.ExecuteRestoreCalls, FakeBackupEngineExecuteRestoreCall{bh, params})
+
+ // mark restore as in progress
+ if err := createStateFile(params.Cnf); err != nil {
+ return nil, err
+ }
+
+ if be.ExecuteRestoreDuration > 0 {
+ time.Sleep(be.ExecuteRestoreDuration)
+ }
+
+ return be.ExecuteRestoreReturn.Manifest, be.ExecuteRestoreReturn.Err
+}
+
+func (be *FakeBackupEngine) ShouldDrainForBackup() bool {
+ be.ShouldDrainForBackupCalls = be.ShouldDrainForBackupCalls + 1
+ return be.ShouldDrainForBackupReturn
+}
diff --git a/go/vt/mysqlctl/fakebackupstorage.go b/go/vt/mysqlctl/fakebackupstorage.go
new file mode 100644
index 00000000000..75587191157
--- /dev/null
+++ b/go/vt/mysqlctl/fakebackupstorage.go
@@ -0,0 +1,167 @@
+/*
+Copyright 2022 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package mysqlctl
+
+import (
+ "context"
+ "fmt"
+ "io"
+
+ "vitess.io/vitess/go/vt/concurrency"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstorage"
+)
+
+type FakeBackupHandle struct {
+ Dir string
+ NameV string
+ ReadOnly bool
+ Errors concurrency.AllErrorRecorder
+
+ AbortBackupCalls []context.Context
+ AbortBackupReturn error
+ AddFileCalls []FakeBackupHandleAddFileCall
+ AddFileReturn FakeBackupHandleAddFileReturn
+ EndBackupCalls []context.Context
+ EndBackupReturn error
+ ReadFileCalls []FakeBackupHandleReadFileCall
+ ReadFileReturnF func(ctx context.Context, filename string) (io.ReadCloser, error)
+}
+
+type FakeBackupHandleAddFileCall struct {
+ Ctx context.Context
+ Filename string
+ Filesize int64
+}
+
+type FakeBackupHandleAddFileReturn struct {
+ WriteCloser io.WriteCloser
+ Err error
+}
+
+type FakeBackupHandleReadFileCall struct {
+ Ctx context.Context
+ Filename string
+}
+
+func (fbh *FakeBackupHandle) RecordError(err error) {
+ fbh.Errors.RecordError(err)
+}
+
+func (fbh *FakeBackupHandle) HasErrors() bool {
+ return fbh.Errors.HasErrors()
+}
+
+func (fbh *FakeBackupHandle) Error() error {
+ return fbh.Errors.Error()
+}
+
+func (fbh *FakeBackupHandle) Directory() string {
+ return fbh.Dir
+}
+
+func (fbh *FakeBackupHandle) Name() string {
+ return fbh.NameV
+}
+
+func (fbh *FakeBackupHandle) AddFile(ctx context.Context, filename string, filesize int64) (io.WriteCloser, error) {
+ fbh.AddFileCalls = append(fbh.AddFileCalls, FakeBackupHandleAddFileCall{ctx, filename, filesize})
+ return fbh.AddFileReturn.WriteCloser, fbh.AddFileReturn.Err
+}
+
+func (fbh *FakeBackupHandle) EndBackup(ctx context.Context) error {
+ fbh.EndBackupCalls = append(fbh.EndBackupCalls, ctx)
+ return fbh.EndBackupReturn
+}
+
+func (fbh *FakeBackupHandle) AbortBackup(ctx context.Context) error {
+ fbh.AbortBackupCalls = append(fbh.AbortBackupCalls, ctx)
+ return fbh.AbortBackupReturn
+}
+
+func (fbh *FakeBackupHandle) ReadFile(ctx context.Context, filename string) (io.ReadCloser, error) {
+ fbh.ReadFileCalls = append(fbh.ReadFileCalls, FakeBackupHandleReadFileCall{ctx, filename})
+ if fbh.ReadFileReturnF == nil {
+ return nil, fmt.Errorf("FakeBackupHandle has not defined a ReadFileReturnF")
+ }
+ return fbh.ReadFileReturnF(ctx, filename)
+}
+
+type FakeBackupStorage struct {
+ CloseCalls int
+ CloseReturn error
+ ListBackupsCalls []FakeBackupStorageListBackupsCall
+ ListBackupsReturn FakeBackupStorageListBackupsReturn
+ RemoveBackupCalls []FakeBackupStorageRemoveBackupCall
+ RemoveBackupReturn error
+ RemoveBackupReturne error
+ StartBackupCalls []FakeBackupStorageStartBackupCall
+ StartBackupReturn FakeBackupStorageStartBackupReturn
+ WithParamsCalls []backupstorage.Params
+ WithParamsReturn backupstorage.BackupStorage
+}
+
+type FakeBackupStorageListBackupsCall struct {
+ Ctx context.Context
+ Dir string
+}
+
+type FakeBackupStorageListBackupsReturn struct {
+ BackupHandles []backupstorage.BackupHandle
+ Err error
+}
+
+type FakeBackupStorageRemoveBackupCall struct {
+ Ctx context.Context
+ Dir string
+ Name string
+}
+
+type FakeBackupStorageStartBackupCall struct {
+ Ctx context.Context
+ Dir string
+ Name string
+}
+
+type FakeBackupStorageStartBackupReturn struct {
+ BackupHandle backupstorage.BackupHandle
+ Err error
+}
+
+func (fbs *FakeBackupStorage) ListBackups(ctx context.Context, dir string) ([]backupstorage.BackupHandle, error) {
+ fbs.ListBackupsCalls = append(fbs.ListBackupsCalls, FakeBackupStorageListBackupsCall{ctx, dir})
+ return fbs.ListBackupsReturn.BackupHandles, fbs.ListBackupsReturn.Err
+}
+
+func (fbs *FakeBackupStorage) StartBackup(ctx context.Context, dir, name string) (backupstorage.BackupHandle, error) {
+ fbs.StartBackupCalls = append(fbs.StartBackupCalls, FakeBackupStorageStartBackupCall{ctx, dir, name})
+ return fbs.StartBackupReturn.BackupHandle, fbs.StartBackupReturn.Err
+}
+
+func (fbs *FakeBackupStorage) RemoveBackup(ctx context.Context, dir, name string) error {
+ fbs.RemoveBackupCalls = append(fbs.RemoveBackupCalls, FakeBackupStorageRemoveBackupCall{ctx, dir, name})
+ return fbs.RemoveBackupReturn
+}
+
+func (fbs *FakeBackupStorage) Close() error {
+ fbs.CloseCalls = fbs.CloseCalls + 1
+ return fbs.CloseReturn
+}
+
+func (fbs *FakeBackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
+ fbs.WithParamsCalls = append(fbs.WithParamsCalls, params)
+ return fbs.WithParamsReturn
+}
diff --git a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go b/go/vt/mysqlctl/fakemysqldaemon.go
similarity index 98%
rename from go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
rename to go/vt/mysqlctl/fakemysqldaemon.go
index 89bf656c2bf..e73fa86b87e 100644
--- a/go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go
+++ b/go/vt/mysqlctl/fakemysqldaemon.go
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
-package fakemysqldaemon
+package mysqlctl
import (
"context"
@@ -29,7 +29,6 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/dbconnpool"
- "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
querypb "vitess.io/vitess/go/vt/proto/query"
@@ -189,7 +188,7 @@ func NewFakeMysqlDaemon(db *fakesqldb.DB) *FakeMysqlDaemon {
}
// Start is part of the MysqlDaemon interface
-func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysqldArgs ...string) error {
+func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *Mycnf, mysqldArgs ...string) error {
if fmd.Running {
return fmt.Errorf("fake mysql daemon already running")
}
@@ -207,7 +206,7 @@ func (fmd *FakeMysqlDaemon) Start(ctx context.Context, cnf *mysqlctl.Mycnf, mysq
}
// Shutdown is part of the MysqlDaemon interface
-func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *mysqlctl.Mycnf, waitForMysqld bool) error {
+func (fmd *FakeMysqlDaemon) Shutdown(ctx context.Context, cnf *Mycnf, waitForMysqld bool) error {
if !fmd.Running {
return fmt.Errorf("fake mysql daemon not running")
}
@@ -230,17 +229,17 @@ func (fmd *FakeMysqlDaemon) RunMysqlUpgrade() error {
}
// ReinitConfig is part of the MysqlDaemon interface
-func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error {
+func (fmd *FakeMysqlDaemon) ReinitConfig(ctx context.Context, cnf *Mycnf) error {
return nil
}
// RefreshConfig is part of the MysqlDaemon interface
-func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *mysqlctl.Mycnf) error {
+func (fmd *FakeMysqlDaemon) RefreshConfig(ctx context.Context, cnf *Mycnf) error {
return nil
}
// Wait is part of the MysqlDaemon interface.
-func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *mysqlctl.Mycnf) error {
+func (fmd *FakeMysqlDaemon) Wait(ctx context.Context, cnf *Mycnf) error {
return nil
}
diff --git a/go/vt/mysqlctl/filebackupstorage/file.go b/go/vt/mysqlctl/filebackupstorage/file.go
index 61faef2183f..99148d9169b 100644
--- a/go/vt/mysqlctl/filebackupstorage/file.go
+++ b/go/vt/mysqlctl/filebackupstorage/file.go
@@ -27,7 +27,9 @@ import (
"github.com/spf13/pflag"
+ "vitess.io/vitess/go/ioutil"
"vitess.io/vitess/go/vt/concurrency"
+ stats "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/servenv"
)
@@ -36,6 +38,8 @@ var (
// FileBackupStorageRoot is where the backups will go.
// Exported for test purposes.
FileBackupStorageRoot string
+
+ defaultFileBackupStorage = newFileBackupStorage(backupstorage.NoParams())
)
func registerFlags(fs *pflag.FlagSet) {
@@ -58,6 +62,23 @@ type FileBackupHandle struct {
errors concurrency.AllErrorRecorder
}
+func NewBackupHandle(
+ fbs *FileBackupStorage,
+ dir,
+ name string,
+ readOnly bool,
+) backupstorage.BackupHandle {
+ if fbs == nil {
+ fbs = defaultFileBackupStorage
+ }
+ return &FileBackupHandle{
+ fbs: fbs,
+ dir: dir,
+ name: name,
+ readOnly: readOnly,
+ }
+}
+
// RecordError is part of the concurrency.ErrorRecorder interface.
func (fbh *FileBackupHandle) RecordError(err error) {
fbh.errors.RecordError(err)
@@ -89,7 +110,12 @@ func (fbh *FileBackupHandle) AddFile(ctx context.Context, filename string, files
return nil, fmt.Errorf("AddFile cannot be called on read-only backup")
}
p := path.Join(FileBackupStorageRoot, fbh.dir, fbh.name, filename)
- return os.Create(p)
+ f, err := os.Create(p)
+ if err != nil {
+ return nil, err
+ }
+ stat := fbh.fbs.params.Stats.Scope(stats.Operation("File:Write"))
+ return ioutil.NewMeteredWriteCloser(f, stat.TimedIncrementBytes), nil
}
// EndBackup is part of the BackupHandle interface
@@ -114,11 +140,22 @@ func (fbh *FileBackupHandle) ReadFile(ctx context.Context, filename string) (io.
return nil, fmt.Errorf("ReadFile cannot be called on read-write backup")
}
p := path.Join(FileBackupStorageRoot, fbh.dir, fbh.name, filename)
- return os.Open(p)
+ f, err := os.Open(p)
+ if err != nil {
+ return nil, err
+ }
+ stat := fbh.fbs.params.Stats.Scope(stats.Operation("File:Read"))
+ return ioutil.NewMeteredReadCloser(f, stat.TimedIncrementBytes), nil
}
// FileBackupStorage implements BackupStorage for local file system.
-type FileBackupStorage struct{}
+type FileBackupStorage struct {
+ params backupstorage.Params
+}
+
+func newFileBackupStorage(params backupstorage.Params) *FileBackupStorage {
+ return &FileBackupStorage{params}
+}
// ListBackups is part of the BackupStorage interface
func (fbs *FileBackupStorage) ListBackups(ctx context.Context, dir string) ([]backupstorage.BackupHandle, error) {
@@ -140,12 +177,7 @@ func (fbs *FileBackupStorage) ListBackups(ctx context.Context, dir string) ([]ba
if info.Name() == "." || info.Name() == ".." {
continue
}
- result = append(result, &FileBackupHandle{
- fbs: fbs,
- dir: dir,
- name: info.Name(),
- readOnly: true,
- })
+ result = append(result, NewBackupHandle(fbs, dir, info.Name(), true /*readOnly*/))
}
return result, nil
}
@@ -164,12 +196,7 @@ func (fbs *FileBackupStorage) StartBackup(ctx context.Context, dir, name string)
return nil, err
}
- return &FileBackupHandle{
- fbs: fbs,
- dir: dir,
- name: name,
- readOnly: false,
- }, nil
+ return NewBackupHandle(fbs, dir, name, false /*readOnly*/), nil
}
// RemoveBackup is part of the BackupStorage interface
@@ -183,6 +210,10 @@ func (fbs *FileBackupStorage) Close() error {
return nil
}
+func (fbs *FileBackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
+ return &FileBackupStorage{params}
+}
+
func init() {
- backupstorage.BackupStorageMap["file"] = &FileBackupStorage{}
+ backupstorage.BackupStorageMap["file"] = defaultFileBackupStorage
}
diff --git a/go/vt/mysqlctl/filebackupstorage/file_test.go b/go/vt/mysqlctl/filebackupstorage/file_test.go
index dd24edfb9ec..346481484f4 100644
--- a/go/vt/mysqlctl/filebackupstorage/file_test.go
+++ b/go/vt/mysqlctl/filebackupstorage/file_test.go
@@ -20,6 +20,8 @@ import (
"context"
"io"
"testing"
+
+ "vitess.io/vitess/go/vt/mysqlctl/backupstorage"
)
// This file tests the file BackupStorage engine.
@@ -31,9 +33,9 @@ import (
// setupFileBackupStorage creates a temporary directory, and
// returns a FileBackupStorage based on it
-func setupFileBackupStorage(t *testing.T) *FileBackupStorage {
+func setupFileBackupStorage(t *testing.T) backupstorage.BackupStorage {
FileBackupStorageRoot = t.TempDir()
- return &FileBackupStorage{}
+ return newFileBackupStorage(backupstorage.NoParams())
}
func TestListBackups(t *testing.T) {
diff --git a/go/vt/mysqlctl/gcsbackupstorage/gcs.go b/go/vt/mysqlctl/gcsbackupstorage/gcs.go
index d5008edd020..814395a225a 100644
--- a/go/vt/mysqlctl/gcsbackupstorage/gcs.go
+++ b/go/vt/mysqlctl/gcsbackupstorage/gcs.go
@@ -254,6 +254,11 @@ func (bs *GCSBackupStorage) Close() error {
return nil
}
+func (bs *GCSBackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
+ // TODO(maxeng): return a new GCSBackupStorage that uses params.
+ return bs
+}
+
// client returns the GCS Storage client instance.
// If there isn't one yet, it tries to create one.
func (bs *GCSBackupStorage) client(ctx context.Context) (*storage.Client, error) {
diff --git a/go/vt/mysqlctl/s3backupstorage/s3.go b/go/vt/mysqlctl/s3backupstorage/s3.go
index 75f3a5d3eb5..4d10cd7f080 100644
--- a/go/vt/mysqlctl/s3backupstorage/s3.go
+++ b/go/vt/mysqlctl/s3backupstorage/s3.go
@@ -410,6 +410,11 @@ func (bs *S3BackupStorage) Close() error {
return nil
}
+func (bs *S3BackupStorage) WithParams(params backupstorage.Params) backupstorage.BackupStorage {
+ // TODO(maxeng): return a new S3BackupStorage that uses params.
+ return bs
+}
+
var _ backupstorage.BackupStorage = (*S3BackupStorage)(nil)
// getLogLevel converts the string loglevel to an aws.LogLevelType
diff --git a/go/vt/vttablet/tabletmanager/restore.go b/go/vt/vttablet/tabletmanager/restore.go
index 9d4b3b174f3..afd0f5c0365 100644
--- a/go/vt/vttablet/tabletmanager/restore.go
+++ b/go/vt/vttablet/tabletmanager/restore.go
@@ -30,6 +30,7 @@ import (
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
@@ -188,6 +189,7 @@ func (tm *TabletManager) restoreDataLocked(ctx context.Context, logger logutil.L
Shard: tablet.Shard,
StartTime: logutil.ProtoToTime(request.BackupTime),
DryRun: request.DryRun,
+ Stats: backupstats.RestoreStats(),
}
if request.RestoreToPos != "" {
pos, err := mysql.DecodePosition(request.RestoreToPos)
diff --git a/go/vt/vttablet/tabletmanager/rpc_backup.go b/go/vt/vttablet/tabletmanager/rpc_backup.go
index c8a8d38eb58..3b1f7a35a74 100644
--- a/go/vt/vttablet/tabletmanager/rpc_backup.go
+++ b/go/vt/vttablet/tabletmanager/rpc_backup.go
@@ -23,6 +23,7 @@ import (
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/mysqlctl"
+ "vitess.io/vitess/go/vt/mysqlctl/backupstats"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
@@ -105,6 +106,7 @@ func (tm *TabletManager) Backup(ctx context.Context, logger logutil.Logger, req
Shard: tablet.Shard,
TabletAlias: topoproto.TabletAliasString(tablet.Alias),
BackupTime: time.Now(),
+ Stats: backupstats.BackupStats(),
}
returnErr := mysqlctl.Backup(ctx, backupParams)
diff --git a/go/vt/vttablet/tabletmanager/rpc_query_test.go b/go/vt/vttablet/tabletmanager/rpc_query_test.go
index f6167e24917..87a64b2d8b7 100644
--- a/go/vt/vttablet/tabletmanager/rpc_query_test.go
+++ b/go/vt/vttablet/tabletmanager/rpc_query_test.go
@@ -27,7 +27,7 @@ import (
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/vttablet/tabletservermock"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
@@ -38,7 +38,7 @@ func TestTabletManager_ExecuteFetchAsDba(t *testing.T) {
cp := mysql.ConnParams{}
db := fakesqldb.New(t)
db.AddQueryPattern(".*", &sqltypes.Result{})
- daemon := fakemysqldaemon.NewFakeMysqlDaemon(db)
+ daemon := mysqlctl.NewFakeMysqlDaemon(db)
dbName := " escap`e me "
tm := &TabletManager{
diff --git a/go/vt/vttablet/tabletmanager/tm_init_test.go b/go/vt/vttablet/tabletmanager/tm_init_test.go
index e1225abd464..36e6e175531 100644
--- a/go/vt/vttablet/tabletmanager/tm_init_test.go
+++ b/go/vt/vttablet/tabletmanager/tm_init_test.go
@@ -33,7 +33,7 @@ import (
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/logutil"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
@@ -379,7 +379,7 @@ func TestCheckPrimaryShip(t *testing.T) {
tablet.Type = topodatapb.TabletType_REPLICA
tablet.PrimaryTermStartTime = nil
// Get the fakeMySQL and set it up to expect a set replication source command
- fakeMysql := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon)
+ fakeMysql := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon)
fakeMysql.SetReplicationSourceInputs = append(fakeMysql.SetReplicationSourceInputs, fmt.Sprintf("%v:%v", otherTablet.MysqlHostname, otherTablet.MysqlPort))
fakeMysql.ExpectedExecuteSuperQueryList = []string{
"RESET SLAVE ALL",
@@ -638,7 +638,7 @@ func TestGetBuildTags(t *testing.T) {
}
}
-func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaemon {
+func newTestMysqlDaemon(t *testing.T, port int32) *mysqlctl.FakeMysqlDaemon {
t.Helper()
db := fakesqldb.New(t)
@@ -646,7 +646,7 @@ func newTestMysqlDaemon(t *testing.T, port int32) *fakemysqldaemon.FakeMysqlDaem
db.AddQueryPattern("BEGIN", &sqltypes.Result{})
db.AddQueryPattern("COMMIT", &sqltypes.Result{})
- mysqld := fakemysqldaemon.NewFakeMysqlDaemon(db)
+ mysqld := mysqlctl.NewFakeMysqlDaemon(db)
mysqld.MysqlPort = sync2.NewAtomicInt32(port)
return mysqld
diff --git a/go/vt/vttablet/tabletmanager/tm_state_test.go b/go/vt/vttablet/tabletmanager/tm_state_test.go
index 48e2123554f..537580d4853 100644
--- a/go/vt/vttablet/tabletmanager/tm_state_test.go
+++ b/go/vt/vttablet/tabletmanager/tm_state_test.go
@@ -28,7 +28,7 @@ import (
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/key"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/faketopo"
@@ -105,7 +105,7 @@ func TestStateDenyList(t *testing.T) {
tm := newTestTM(t, ts, 1, "ks", "0")
defer tm.Stop()
- fmd := tm.MysqlDaemon.(*fakemysqldaemon.FakeMysqlDaemon)
+ fmd := tm.MysqlDaemon.(*mysqlctl.FakeMysqlDaemon)
fmd.Schema = &tabletmanagerdatapb.SchemaDefinition{
TableDefinitions: []*tabletmanagerdatapb.TableDefinition{{
Name: "t1",
diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
index 11e57b36fcb..d2c0f2f04c6 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/controller_test.go
@@ -23,12 +23,12 @@ import (
"testing"
"time"
+ "vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
@@ -89,7 +89,7 @@ func TestControllerKeyRange(t *testing.T) {
dbClient.ExpectRequest("commit", nil, nil)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil)
if err != nil {
@@ -125,7 +125,7 @@ func TestControllerTables(t *testing.T) {
dbClient.ExpectRequest("commit", nil, nil)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{
+ mysqld := &mysqlctl.FakeMysqlDaemon{
MysqlPort: sync2.NewAtomicInt32(3306),
Schema: &tabletmanagerdatapb.SchemaDefinition{
DatabaseSchema: "",
@@ -218,7 +218,7 @@ func TestControllerOverrides(t *testing.T) {
dbClient.ExpectRequest("commit", nil, nil)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil)
if err != nil {
@@ -286,7 +286,7 @@ func TestControllerRetry(t *testing.T) {
dbClient.ExpectRequestRE("update _vt.vreplication set pos='MariaDB/0-1-1235', time_updated=.*", testDMLResponse, nil)
dbClient.ExpectRequest("commit", nil, nil)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "rdonly", nil, nil)
if err != nil {
@@ -346,7 +346,7 @@ func TestControllerStopPosition(t *testing.T) {
dbClient.ExpectRequest("update _vt.vreplication set state='Stopped', message='Reached stopping position, done playing logs' where id=1", testDMLResponse, nil)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
ct, err := newController(context.Background(), params, dbClientFactory, mysqld, env.TopoServ, env.Cells[0], "replica", nil, nil)
if err != nil {
diff --git a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
index b560761273b..1a8fe307fa1 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/engine_test.go
@@ -31,7 +31,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
)
func TestEngineOpen(t *testing.T) {
@@ -41,7 +41,7 @@ func TestEngineOpen(t *testing.T) {
resetBinlogClient()
dbClient := binlogplayer.NewMockDBClient(t)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
require.False(t, vre.IsOpen())
@@ -81,7 +81,7 @@ func TestEngineOpenRetry(t *testing.T) {
resetBinlogClient()
dbClient := binlogplayer.NewMockDBClient(t)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
@@ -142,7 +142,7 @@ func TestEngineExec(t *testing.T) {
resetBinlogClient()
dbClient := binlogplayer.NewMockDBClient(t)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
// Test Insert
@@ -306,7 +306,7 @@ func TestEngineBadInsert(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
@@ -334,7 +334,7 @@ func TestEngineSelect(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
@@ -367,7 +367,7 @@ func TestWaitForPos(t *testing.T) {
waitRetryTime = 10 * time.Millisecond
dbClient := binlogplayer.NewMockDBClient(t)
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
@@ -395,7 +395,7 @@ func TestWaitForPos(t *testing.T) {
func TestWaitForPosError(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
@@ -431,7 +431,7 @@ func TestWaitForPosError(t *testing.T) {
func TestWaitForPosCancel(t *testing.T) {
dbClient := binlogplayer.NewMockDBClient(t)
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
@@ -474,7 +474,7 @@ func TestGetDBClient(t *testing.T) {
dbClientFactoryDba := func() binlogplayer.DBClient { return dbClientDba }
dbClientFactoryFiltered := func() binlogplayer.DBClient { return dbClientFiltered }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
vre := NewTestEngine(env.TopoServ, env.Cells[0], mysqld, dbClientFactoryFiltered, dbClientFactoryDba, dbClientDba.DBName(), nil)
shouldBeDbaClient := vre.getDBClient(true /*runAsAdmin*/)
diff --git a/go/vt/vttablet/tabletmanager/vreplication/fuzz.go b/go/vt/vttablet/tabletmanager/vreplication/fuzz.go
index 55c1e743dad..0fcfcce9660 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/fuzz.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/fuzz.go
@@ -25,7 +25,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo/memorytopo"
fuzz "github.com/AdaLogics/go-fuzz-headers"
@@ -94,7 +94,7 @@ func FuzzEngine(data []byte) int {
resetBinlogClient()
dbClient := binlogplayer.NewMockDBClient(t)
dbClientFactory := func() binlogplayer.DBClient { return dbClient }
- mysqld := &fakemysqldaemon.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
+ mysqld := &mysqlctl.FakeMysqlDaemon{MysqlPort: sync2.NewAtomicInt32(3306)}
vre := NewTestEngine(topoServer, "cell1", mysqld, dbClientFactory, dbClientFactory, dbClient.DBName(), nil)
diff --git a/go/vt/vttablet/tabletserver/repltracker/poller_test.go b/go/vt/vttablet/tabletserver/repltracker/poller_test.go
index 3dc27c771ca..e0734118160 100644
--- a/go/vt/vttablet/tabletserver/repltracker/poller_test.go
+++ b/go/vt/vttablet/tabletserver/repltracker/poller_test.go
@@ -23,12 +23,12 @@ import (
"github.com/stretchr/testify/assert"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
)
func TestPoller(t *testing.T) {
poller := &poller{}
- mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
+ mysqld := mysqlctl.NewFakeMysqlDaemon(nil)
poller.InitDBConfig(mysqld)
mysqld.ReplicationStatusError = errors.New("err")
diff --git a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
index 362148cd3b2..67f15d44ff2 100644
--- a/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
+++ b/go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
@@ -25,7 +25,7 @@ import (
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/vt/dbconfigs"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
@@ -47,7 +47,7 @@ func TestReplTracker(t *testing.T) {
Uid: 1,
}
target := &querypb.Target{}
- mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
+ mysqld := mysqlctl.NewFakeMysqlDaemon(nil)
rt := NewReplTracker(env, alias)
rt.InitDBConfig(target, mysqld)
diff --git a/go/vt/wrangler/doc_test.md b/go/vt/wrangler/doc_test.md
index 4fd445581da..c84a3720225 100644
--- a/go/vt/wrangler/doc_test.md
+++ b/go/vt/wrangler/doc_test.md
@@ -43,7 +43,7 @@ test the workflow state machine. There is no actual data being vreplicated.
#### The fake MySQLDaemon
-`go/vt/mysqlctl/fakemysqldaemon/fakemysqldaemon.go`
+`go/vt/mysqlctl/fakemysqldaemon.go`
Used to set primary positions to provide/validate gtids.
diff --git a/go/vt/wrangler/fake_tablet_test.go b/go/vt/wrangler/fake_tablet_test.go
index 9fdb6e616a1..f0ce1d6704c 100644
--- a/go/vt/wrangler/fake_tablet_test.go
+++ b/go/vt/wrangler/fake_tablet_test.go
@@ -29,7 +29,7 @@ import (
"vitess.io/vitess/go/mysql/fakesqldb"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/dbconfigs"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/grpctmserver"
"vitess.io/vitess/go/vt/vttablet/tabletconntest"
@@ -67,7 +67,7 @@ type fakeTablet struct {
// We also create the RPCServer, so users can register more services
// before calling StartActionLoop().
Tablet *topodatapb.Tablet
- FakeMysqlDaemon *fakemysqldaemon.FakeMysqlDaemon
+ FakeMysqlDaemon *mysqlctl.FakeMysqlDaemon
RPCServer *grpc.Server
// The following fields are created when we start the event loop for
@@ -134,7 +134,7 @@ func newFakeTablet(t *testing.T, wr *Wrangler, cell string, uid uint32, tabletTy
}
// create a FakeMysqlDaemon with the right information by default
- fakeMysqlDaemon := fakemysqldaemon.NewFakeMysqlDaemon(db)
+ fakeMysqlDaemon := mysqlctl.NewFakeMysqlDaemon(db)
fakeMysqlDaemon.MysqlPort.Set(mysqlPort)
return &fakeTablet{
diff --git a/go/vt/wrangler/testlib/fake_tablet.go b/go/vt/wrangler/testlib/fake_tablet.go
index ff8d0457517..ae8f086d78d 100644
--- a/go/vt/wrangler/testlib/fake_tablet.go
+++ b/go/vt/wrangler/testlib/fake_tablet.go
@@ -34,7 +34,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
- "vitess.io/vitess/go/vt/mysqlctl/fakemysqldaemon"
+ "vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/grpctmserver"
@@ -69,7 +69,7 @@ type FakeTablet struct {
// We also create the RPCServer, so users can register more services
// before calling StartActionLoop().
Tablet *topodatapb.Tablet
- FakeMysqlDaemon *fakemysqldaemon.FakeMysqlDaemon
+ FakeMysqlDaemon *mysqlctl.FakeMysqlDaemon
RPCServer *grpc.Server
// The following fields are created when we start the event loop for
@@ -159,7 +159,7 @@ func NewFakeTablet(t *testing.T, wr *wrangler.Wrangler, cell string, uid uint32,
}
// create a FakeMysqlDaemon with the right information by default
- fakeMysqlDaemon := fakemysqldaemon.NewFakeMysqlDaemon(db)
+ fakeMysqlDaemon := mysqlctl.NewFakeMysqlDaemon(db)
fakeMysqlDaemon.MysqlPort.Set(mysqlPort)
return &FakeTablet{