diff --git a/go/test/endtoend/backup/vtctlbackup/backup_utils.go b/go/test/endtoend/backup/vtctlbackup/backup_utils.go index 87e854b7d64..2b241feaeb5 100644 --- a/go/test/endtoend/backup/vtctlbackup/backup_utils.go +++ b/go/test/endtoend/backup/vtctlbackup/backup_utils.go @@ -32,6 +32,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/textutil" "vitess.io/vitess/go/vt/mysqlctl" + "vitess.io/vitess/go/vt/mysqlctl/backupstorage" "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" @@ -421,6 +422,8 @@ func primaryBackup(t *testing.T) { backups = localCluster.VerifyBackupCount(t, shardKsName, 2) assert.Contains(t, backups[1], primary.Alias) + verifyTabletBackupStats(t, primary.VttabletProcess.GetVars()) + // Perform PRS to demote the primary tablet (primary) so that we can do a restore there and verify we don't have the // data from after the older/first backup err = localCluster.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--", @@ -439,6 +442,8 @@ func primaryBackup(t *testing.T) { err = localCluster.VtctlclientProcess.ExecuteCommand("RestoreFromBackup", "--", "--backup_timestamp", firstBackupTimestamp, primary.Alias) require.Nil(t, err) + verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars()) + // Re-init the shard -- making the original primary tablet (primary) primary again -- for subsequent tests err = localCluster.VtctlclientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID) require.Nil(t, err) @@ -465,6 +470,8 @@ func primaryReplicaSameBackup(t *testing.T) { err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) require.Nil(t, err) + verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) @@ -531,6 +538,8 @@ func 
primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) require.Nil(t, err) + verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) @@ -583,6 +592,8 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) { err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica2.Alias) require.Nil(t, err) + verifyTabletBackupStats(t, replica2.VttabletProcess.GetVars()) + // Force replica2 to restore from backup. verifyRestoreTablet(t, replica2, "SERVING") cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4) @@ -621,6 +632,8 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) { err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) require.Nil(t, err) + verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) @@ -641,6 +654,8 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) { // wait for it to catch up. 
cluster.VerifyRowsInTablet(t, primary, keyspaceName, 3) + verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars()) + // teardown restartPrimaryAndReplica(t) } @@ -722,6 +737,8 @@ func terminatedRestore(t *testing.T) { err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias) require.Nil(t, err) + verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + // insert more data on the primary _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) @@ -753,6 +770,9 @@ func terminatedRestore(t *testing.T) { assert.True(t, os.IsNotExist(err)) cluster.VerifyRowsInTablet(t, primary, keyspaceName, 3) + + verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars()) + stopAllTablets() } @@ -789,6 +809,8 @@ func vtctlBackup(t *testing.T, tabletType string) { backups := localCluster.VerifyBackupCount(t, shardKsName, 1) + verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + _, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true) require.Nil(t, err) @@ -810,7 +832,6 @@ func vtctlBackup(t *testing.T, tabletType string) { require.Nil(t, err) _, err = primary.VttabletProcess.QueryTablet("DROP TABLE vt_insert_test", keyspaceName, true) require.Nil(t, err) - } func InitTestTable(t *testing.T) { @@ -905,6 +926,9 @@ func terminateRestore(t *testing.T) { if useXtrabackup { stopRestoreMsg = "Restore: Preparing" useXtrabackup = false + defer func() { + useXtrabackup = true + }() } args := append([]string{"--server", localCluster.VtctlclientProcess.Server, "--alsologtostderr"}, "RestoreFromBackup", "--", primary.Alias) @@ -943,6 +967,8 @@ func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, tabletType string) (backu backups = localCluster.VerifyBackupCount(t, shardKsName, 1) + verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) + err = 
replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second) require.Nil(t, err) @@ -1036,6 +1062,7 @@ func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos mysql.Positio backups, err := localCluster.ListBackups(shardKsName) require.NoError(t, err) + verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars()) backupName = backups[len(backups)-1] backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName return readManifestFile(t, backupLocation), backupName @@ -1051,4 +1078,91 @@ func TestReplicaRestoreToPos(t *testing.T, restoreToPos mysql.Position, expectEr return } require.NoErrorf(t, err, "output: %v", output) + verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars()) +} + +func verifyTabletBackupStats(t *testing.T, vars map[string]any) { + // Currently only the builtin backup engine instruments bytes-processed + // counts. + if !useXtrabackup { + require.Contains(t, vars, "BackupBytes") + bb := vars["BackupBytes"].(map[string]any) + require.Contains(t, bb, "BackupEngine.Builtin.Compressor:Write") + require.Contains(t, bb, "BackupEngine.Builtin.Destination:Write") + require.Contains(t, bb, "BackupEngine.Builtin.Source:Read") + if backupstorage.BackupStorageImplementation == "file" { + require.Contains(t, bb, "BackupStorage.File.File:Write") + } + } + + require.Contains(t, vars, "BackupCount") + bc := vars["BackupCount"].(map[string]any) + require.Contains(t, bc, "-.-.Backup") + // Currently only the builtin backup engine emits operation counts.
+ if !useXtrabackup { + require.Contains(t, bc, "BackupEngine.Builtin.Compressor:Close") + require.Contains(t, bc, "BackupEngine.Builtin.Destination:Close") + require.Contains(t, bc, "BackupEngine.Builtin.Destination:Open") + require.Contains(t, bc, "BackupEngine.Builtin.Source:Close") + require.Contains(t, bc, "BackupEngine.Builtin.Source:Open") + } + + require.Contains(t, vars, "BackupDurationNanoseconds") + bd := vars["BackupDurationNanoseconds"] + require.Contains(t, bd, "-.-.Backup") + // Currently only the builtin backup engine emits timings. + if !useXtrabackup { + require.Contains(t, bd, "BackupEngine.Builtin.Compressor:Close") + require.Contains(t, bd, "BackupEngine.Builtin.Compressor:Write") + require.Contains(t, bd, "BackupEngine.Builtin.Destination:Close") + require.Contains(t, bd, "BackupEngine.Builtin.Destination:Open") + require.Contains(t, bd, "BackupEngine.Builtin.Destination:Write") + require.Contains(t, bd, "BackupEngine.Builtin.Source:Close") + require.Contains(t, bd, "BackupEngine.Builtin.Source:Open") + require.Contains(t, bd, "BackupEngine.Builtin.Source:Read") + } + if backupstorage.BackupStorageImplementation == "file" { + require.Contains(t, bd, "BackupStorage.File.File:Write") + } +} + +func verifyTabletRestoreStats(t *testing.T, vars map[string]any) { + // Currently only the builtin backup engine instruments bytes-processed + // counts. + if !useXtrabackup { + require.Contains(t, vars, "RestoreBytes") + bb := vars["RestoreBytes"].(map[string]any) + require.Contains(t, bb, "BackupEngine.Builtin.Decompressor:Read") + require.Contains(t, bb, "BackupEngine.Builtin.Destination:Write") + require.Contains(t, bb, "BackupEngine.Builtin.Source:Read") + require.Contains(t, bb, "BackupStorage.File.File:Read") + } + + require.Contains(t, vars, "RestoreCount") + bc := vars["RestoreCount"].(map[string]any) + require.Contains(t, bc, "-.-.Restore") + // Currently only the builtin backup engine emits operation counts. 
+ if !useXtrabackup { + require.Contains(t, bc, "BackupEngine.Builtin.Decompressor:Close") + require.Contains(t, bc, "BackupEngine.Builtin.Destination:Close") + require.Contains(t, bc, "BackupEngine.Builtin.Destination:Open") + require.Contains(t, bc, "BackupEngine.Builtin.Source:Close") + require.Contains(t, bc, "BackupEngine.Builtin.Source:Open") + } + + require.Contains(t, vars, "RestoreDurationNanoseconds") + bd := vars["RestoreDurationNanoseconds"] + require.Contains(t, bd, "-.-.Restore") + // Currently only the builtin backup engine emits timings. + if !useXtrabackup { + require.Contains(t, bd, "BackupEngine.Builtin.Decompressor:Close") + require.Contains(t, bd, "BackupEngine.Builtin.Decompressor:Read") + require.Contains(t, bd, "BackupEngine.Builtin.Destination:Close") + require.Contains(t, bd, "BackupEngine.Builtin.Destination:Open") + require.Contains(t, bd, "BackupEngine.Builtin.Destination:Write") + require.Contains(t, bd, "BackupEngine.Builtin.Source:Close") + require.Contains(t, bd, "BackupEngine.Builtin.Source:Open") + require.Contains(t, bd, "BackupEngine.Builtin.Source:Read") + } + require.Contains(t, bd, "BackupStorage.File.File:Read") } diff --git a/go/vt/mysqlctl/backupstats/stats.go b/go/vt/mysqlctl/backupstats/stats.go index e81bd569a97..1d4b643a0e9 100644 --- a/go/vt/mysqlctl/backupstats/stats.go +++ b/go/vt/mysqlctl/backupstats/stats.go @@ -194,6 +194,6 @@ func (s *scopedStats) TimedIncrement(d time.Duration) { // TimedIncrementBytes increments the byte-count and duration of the current scope. 
func (s *scopedStats) TimedIncrementBytes(b int, d time.Duration) { - s.bytes.Add(s.labelValues, 1) + s.bytes.Add(s.labelValues, int64(b)) s.durationNs.Add(s.labelValues, int64(d.Nanoseconds())) } diff --git a/go/vt/mysqlctl/backupstats/stats_test.go b/go/vt/mysqlctl/backupstats/stats_test.go index f88bfdf8fb6..7fe61f0be60 100644 --- a/go/vt/mysqlctl/backupstats/stats_test.go +++ b/go/vt/mysqlctl/backupstats/stats_test.go @@ -1,6 +1,7 @@ package backupstats import ( + "fmt" "strings" "testing" "time" @@ -11,6 +12,7 @@ import ( ) func TestBackupStats(t *testing.T) { + require.Nil(t, backupBytes) require.Nil(t, backupCount) require.Nil(t, backupDurationNs) require.Nil(t, restoreCount) @@ -19,13 +21,16 @@ func TestBackupStats(t *testing.T) { BackupStats() defer resetStats() + require.NotNil(t, backupBytes) require.NotNil(t, backupCount) require.NotNil(t, backupDurationNs) + require.Nil(t, restoreBytes) require.Nil(t, restoreCount) require.Nil(t, restoreDurationNs) } func TestRestoreStats(t *testing.T) { + require.Nil(t, backupBytes) require.Nil(t, backupCount) require.Nil(t, backupDurationNs) require.Nil(t, restoreCount) @@ -34,8 +39,10 @@ func TestRestoreStats(t *testing.T) { RestoreStats() defer resetStats() + require.Nil(t, backupBytes) require.Nil(t, backupCount) require.Nil(t, backupDurationNs) + require.NotNil(t, restoreBytes) require.NotNil(t, restoreCount) require.NotNil(t, restoreDurationNs) } @@ -94,16 +101,18 @@ func TestScope(t *testing.T) { } func TestStatsAreNotInitializedByDefault(t *testing.T) { + require.Nil(t, backupBytes) require.Nil(t, backupCount) require.Nil(t, backupDurationNs) + require.Nil(t, restoreBytes) require.Nil(t, restoreCount) require.Nil(t, restoreDurationNs) } func TestTimedIncrement(t *testing.T) { - bytes := stats.NewCountersWithMultiLabels("test_timed_increment_bytes", "", labels) - count := stats.NewCountersWithMultiLabels("test_timed_increment_count", "", labels) - durationNs := 
stats.NewCountersWithMultiLabels("test_timed_increment_duration_ns", "", labels) + bytes := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_bytes", t.Name()), "", labels) + count := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_count", t.Name()), "", labels) + durationNs := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_duration_ns", t.Name()), "", labels) stats := newScopedStats(bytes, count, durationNs, nil) @@ -112,6 +121,8 @@ func TestTimedIncrement(t *testing.T) { stats.TimedIncrement(duration) + require.Equal(t, 0, len(bytes.Counts())) + require.Equal(t, 1, len(count.Counts())) require.Equal(t, int64(1), count.Counts()[path]) @@ -120,6 +131,8 @@ func TestTimedIncrement(t *testing.T) { stats.TimedIncrement(duration) + require.Equal(t, 0, len(bytes.Counts())) + require.Equal(t, 1, len(count.Counts())) require.Equal(t, int64(2), count.Counts()[path]) @@ -127,9 +140,43 @@ func TestTimedIncrement(t *testing.T) { require.Equal(t, 2*duration.Nanoseconds(), durationNs.Counts()[path]) } +func TestTimedIncrementBytes(t *testing.T) { + bytes := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_bytes", t.Name()), "", labels) + count := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_count", t.Name()), "", labels) + durationNs := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_duration_ns", t.Name()), "", labels) + + stats := newScopedStats(bytes, count, durationNs, nil) + + incBytes := 1024 + duration := 10 * time.Second + path := strings.Join([]string{unscoped, unscoped, unscoped}, ".") + + stats.TimedIncrementBytes(incBytes, duration) + + require.Equal(t, 1, len(bytes.Counts())) + require.Equal(t, int64(incBytes), bytes.Counts()[path]) + + require.Equal(t, 0, len(count.Counts())) + + require.Equal(t, 1, len(durationNs.Counts())) + require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path]) + + stats.TimedIncrementBytes(incBytes, 
duration) + + require.Equal(t, 1, len(bytes.Counts())) + require.Equal(t, int64(2*incBytes), bytes.Counts()[path]) + + require.Equal(t, 0, len(count.Counts())) + + require.Equal(t, 1, len(durationNs.Counts())) + require.Equal(t, 2*duration.Nanoseconds(), durationNs.Counts()[path]) +} + func resetStats() { + backupBytes = nil backupCount = nil backupDurationNs = nil + restoreBytes = nil restoreCount = nil restoreDurationNs = nil }