Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 115 additions & 1 deletion go/test/endtoend/backup/vtctlbackup/backup_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/textutil"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/mysqlctl/backupstorage"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -421,6 +422,8 @@ func primaryBackup(t *testing.T) {
backups = localCluster.VerifyBackupCount(t, shardKsName, 2)
assert.Contains(t, backups[1], primary.Alias)

verifyTabletBackupStats(t, primary.VttabletProcess.GetVars())

// Perform PRS to demote the primary tablet (primary) so that we can do a restore there and verify we don't have the
// data from after the older/first backup
err = localCluster.VtctlclientProcess.ExecuteCommand("PlannedReparentShard", "--",
Expand All @@ -439,6 +442,8 @@ func primaryBackup(t *testing.T) {
err = localCluster.VtctlclientProcess.ExecuteCommand("RestoreFromBackup", "--", "--backup_timestamp", firstBackupTimestamp, primary.Alias)
require.Nil(t, err)

verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars())

// Re-init the shard -- making the original primary tablet (primary) primary again -- for subsequent tests
err = localCluster.VtctlclientProcess.InitShardPrimary(keyspaceName, shardName, cell, primary.TabletUID)
require.Nil(t, err)
Expand All @@ -465,6 +470,8 @@ func primaryReplicaSameBackup(t *testing.T) {
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())

// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
require.Nil(t, err)
Expand Down Expand Up @@ -531,6 +538,8 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) {
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())

// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
require.Nil(t, err)
Expand Down Expand Up @@ -583,6 +592,8 @@ func primaryReplicaSameBackupModifiedCompressionEngine(t *testing.T) {
err = localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica2.Alias)
require.Nil(t, err)

verifyTabletBackupStats(t, replica2.VttabletProcess.GetVars())

// Force replica2 to restore from backup.
verifyRestoreTablet(t, replica2, "SERVING")
cluster.VerifyRowsInTablet(t, replica2, keyspaceName, 4)
Expand Down Expand Up @@ -621,6 +632,8 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) {
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())

// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
require.Nil(t, err)
Expand All @@ -641,6 +654,8 @@ func testRestoreOldPrimary(t *testing.T, method restoreMethod) {
// wait for it to catch up.
cluster.VerifyRowsInTablet(t, primary, keyspaceName, 3)

verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars())

// teardown
restartPrimaryAndReplica(t)
}
Expand Down Expand Up @@ -722,6 +737,8 @@ func terminatedRestore(t *testing.T) {
err := localCluster.VtctlclientProcess.ExecuteCommand("Backup", replica1.Alias)
require.Nil(t, err)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())

// insert more data on the primary
_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
require.Nil(t, err)
Expand Down Expand Up @@ -753,6 +770,9 @@ func terminatedRestore(t *testing.T) {
assert.True(t, os.IsNotExist(err))

cluster.VerifyRowsInTablet(t, primary, keyspaceName, 3)

verifyTabletRestoreStats(t, primary.VttabletProcess.GetVars())

stopAllTablets()
}

Expand Down Expand Up @@ -789,6 +809,8 @@ func vtctlBackup(t *testing.T, tabletType string) {

backups := localCluster.VerifyBackupCount(t, shardKsName, 1)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())

_, err = primary.VttabletProcess.QueryTablet("insert into vt_insert_test (msg) values ('test2')", keyspaceName, true)
require.Nil(t, err)

Expand All @@ -810,7 +832,6 @@ func vtctlBackup(t *testing.T, tabletType string) {
require.Nil(t, err)
_, err = primary.VttabletProcess.QueryTablet("DROP TABLE vt_insert_test", keyspaceName, true)
require.Nil(t, err)

}

func InitTestTable(t *testing.T) {
Expand Down Expand Up @@ -905,6 +926,9 @@ func terminateRestore(t *testing.T) {
if useXtrabackup {
stopRestoreMsg = "Restore: Preparing"
useXtrabackup = false
defer func() {
useXtrabackup = true
}()
}

args := append([]string{"--server", localCluster.VtctlclientProcess.Server, "--alsologtostderr"}, "RestoreFromBackup", "--", primary.Alias)
Expand Down Expand Up @@ -943,6 +967,8 @@ func vtctlBackupReplicaNoDestroyNoWrites(t *testing.T, tabletType string) (backu

backups = localCluster.VerifyBackupCount(t, shardKsName, 1)

verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())

err = replica2.VttabletProcess.WaitForTabletStatusesForTimeout([]string{"SERVING"}, 25*time.Second)
require.Nil(t, err)

Expand Down Expand Up @@ -1036,6 +1062,7 @@ func TestReplicaIncrementalBackup(t *testing.T, incrementalFromPos mysql.Positio

backups, err := localCluster.ListBackups(shardKsName)
require.NoError(t, err)
verifyTabletBackupStats(t, replica1.VttabletProcess.GetVars())
backupName = backups[len(backups)-1]
backupLocation := localCluster.CurrentVTDATAROOT + "/backups/" + shardKsName + "/" + backupName
return readManifestFile(t, backupLocation), backupName
Expand All @@ -1051,4 +1078,91 @@ func TestReplicaRestoreToPos(t *testing.T, restoreToPos mysql.Position, expectEr
return
}
require.NoErrorf(t, err, "output: %v", output)
verifyTabletRestoreStats(t, replica1.VttabletProcess.GetVars())
}

func verifyTabletBackupStats(t *testing.T, vars map[string]any) {
// Currently only the builtin backup engine instruments bytes-processed
// counts.
if !useXtrabackup {
require.Contains(t, vars, "BackupBytes")
bb := vars["BackupBytes"].(map[string]any)
require.Contains(t, bb, "BackupEngine.Builtin.Compressor:Write")
require.Contains(t, bb, "BackupEngine.Builtin.Destination:Write")
require.Contains(t, bb, "BackupEngine.Builtin.Source:Read")
if backupstorage.BackupStorageImplementation == "file" {
require.Contains(t, bb, "BackupStorage.File.File:Write")
}
}

require.Contains(t, vars, "BackupCount")
bc := vars["BackupCount"].(map[string]any)
require.Contains(t, bc, "-.-.Backup")
// Currently only the builtin backup engine implements operation counts.
if !useXtrabackup {
require.Contains(t, bc, "BackupEngine.Builtin.Compressor:Close")
require.Contains(t, bc, "BackupEngine.Builtin.Destination:Close")
require.Contains(t, bc, "BackupEngine.Builtin.Destination:Open")
require.Contains(t, bc, "BackupEngine.Builtin.Source:Close")
require.Contains(t, bc, "BackupEngine.Builtin.Source:Open")
}

require.Contains(t, vars, "BackupDurationNanoseconds")
bd := vars["BackupDurationNanoseconds"]
require.Contains(t, bd, "-.-.Backup")
// Currently only the builtin backup engine emits timings.
if !useXtrabackup {
require.Contains(t, bd, "BackupEngine.Builtin.Compressor:Close")
require.Contains(t, bd, "BackupEngine.Builtin.Compressor:Write")
require.Contains(t, bd, "BackupEngine.Builtin.Destination:Close")
require.Contains(t, bd, "BackupEngine.Builtin.Destination:Open")
require.Contains(t, bd, "BackupEngine.Builtin.Destination:Write")
require.Contains(t, bd, "BackupEngine.Builtin.Source:Close")
require.Contains(t, bd, "BackupEngine.Builtin.Source:Open")
require.Contains(t, bd, "BackupEngine.Builtin.Source:Read")
}
if backupstorage.BackupStorageImplementation == "file" {
require.Contains(t, bd, "BackupStorage.File.File:Write")
}
}

func verifyTabletRestoreStats(t *testing.T, vars map[string]any) {
// Currently only the builtin backup engine instruments bytes-processed
// counts.
if !useXtrabackup {
require.Contains(t, vars, "RestoreBytes")
bb := vars["RestoreBytes"].(map[string]any)
require.Contains(t, bb, "BackupEngine.Builtin.Decompressor:Read")
require.Contains(t, bb, "BackupEngine.Builtin.Destination:Write")
require.Contains(t, bb, "BackupEngine.Builtin.Source:Read")
require.Contains(t, bb, "BackupStorage.File.File:Read")
}

require.Contains(t, vars, "RestoreCount")
bc := vars["RestoreCount"].(map[string]any)
require.Contains(t, bc, "-.-.Restore")
// Currently only the builtin backup engine emits operation counts.
if !useXtrabackup {
require.Contains(t, bc, "BackupEngine.Builtin.Decompressor:Close")
require.Contains(t, bc, "BackupEngine.Builtin.Destination:Close")
require.Contains(t, bc, "BackupEngine.Builtin.Destination:Open")
require.Contains(t, bc, "BackupEngine.Builtin.Source:Close")
require.Contains(t, bc, "BackupEngine.Builtin.Source:Open")
}

require.Contains(t, vars, "RestoreDurationNanoseconds")
bd := vars["RestoreDurationNanoseconds"]
require.Contains(t, bd, "-.-.Restore")
// Currently only the builtin backup engine emits timings.
if !useXtrabackup {
require.Contains(t, bd, "BackupEngine.Builtin.Decompressor:Close")
require.Contains(t, bd, "BackupEngine.Builtin.Decompressor:Read")
require.Contains(t, bd, "BackupEngine.Builtin.Destination:Close")
require.Contains(t, bd, "BackupEngine.Builtin.Destination:Open")
require.Contains(t, bd, "BackupEngine.Builtin.Destination:Write")
require.Contains(t, bd, "BackupEngine.Builtin.Source:Close")
require.Contains(t, bd, "BackupEngine.Builtin.Source:Open")
require.Contains(t, bd, "BackupEngine.Builtin.Source:Read")
}
require.Contains(t, bd, "BackupStorage.File.File:Read")
}
2 changes: 1 addition & 1 deletion go/vt/mysqlctl/backupstats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,6 @@ func (s *scopedStats) TimedIncrement(d time.Duration) {

// TimedIncrementBytes increments the byte-count and duration of the current scope.
func (s *scopedStats) TimedIncrementBytes(b int, d time.Duration) {
s.bytes.Add(s.labelValues, 1)
s.bytes.Add(s.labelValues, int64(b))
s.durationNs.Add(s.labelValues, int64(d.Nanoseconds()))
}
53 changes: 50 additions & 3 deletions go/vt/mysqlctl/backupstats/stats_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package backupstats

import (
"fmt"
"strings"
"testing"
"time"
Expand All @@ -11,6 +12,7 @@ import (
)

func TestBackupStats(t *testing.T) {
require.Nil(t, backupBytes)
require.Nil(t, backupCount)
require.Nil(t, backupDurationNs)
require.Nil(t, restoreCount)
Expand All @@ -19,13 +21,16 @@ func TestBackupStats(t *testing.T) {
BackupStats()
defer resetStats()

require.NotNil(t, backupBytes)
require.NotNil(t, backupCount)
require.NotNil(t, backupDurationNs)
require.Nil(t, restoreBytes)
require.Nil(t, restoreCount)
require.Nil(t, restoreDurationNs)
}

func TestRestoreStats(t *testing.T) {
require.Nil(t, backupBytes)
require.Nil(t, backupCount)
require.Nil(t, backupDurationNs)
require.Nil(t, restoreCount)
Expand All @@ -34,8 +39,10 @@ func TestRestoreStats(t *testing.T) {
RestoreStats()
defer resetStats()

require.Nil(t, backupBytes)
require.Nil(t, backupCount)
require.Nil(t, backupDurationNs)
require.NotNil(t, restoreBytes)
require.NotNil(t, restoreCount)
require.NotNil(t, restoreDurationNs)
}
Expand Down Expand Up @@ -94,16 +101,18 @@ func TestScope(t *testing.T) {
}

func TestStatsAreNotInitializedByDefault(t *testing.T) {
require.Nil(t, backupBytes)
require.Nil(t, backupCount)
require.Nil(t, backupDurationNs)
require.Nil(t, restoreBytes)
require.Nil(t, restoreCount)
require.Nil(t, restoreDurationNs)
}

func TestTimedIncrement(t *testing.T) {
bytes := stats.NewCountersWithMultiLabels("test_timed_increment_bytes", "", labels)
count := stats.NewCountersWithMultiLabels("test_timed_increment_count", "", labels)
durationNs := stats.NewCountersWithMultiLabels("test_timed_increment_duration_ns", "", labels)
bytes := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_bytes", t.Name()), "", labels)
count := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_count", t.Name()), "", labels)
durationNs := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_duration_ns", t.Name()), "", labels)

stats := newScopedStats(bytes, count, durationNs, nil)

Expand All @@ -112,6 +121,8 @@ func TestTimedIncrement(t *testing.T) {

stats.TimedIncrement(duration)

require.Equal(t, 0, len(bytes.Counts()))

require.Equal(t, 1, len(count.Counts()))
require.Equal(t, int64(1), count.Counts()[path])

Expand All @@ -120,16 +131,52 @@ func TestTimedIncrement(t *testing.T) {

stats.TimedIncrement(duration)

require.Equal(t, 0, len(bytes.Counts()))

require.Equal(t, 1, len(count.Counts()))
require.Equal(t, int64(2), count.Counts()[path])

require.Equal(t, 1, len(durationNs.Counts()))
require.Equal(t, 2*duration.Nanoseconds(), durationNs.Counts()[path])
}

func TestTimedIncrementBytes(t *testing.T) {
bytes := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_bytes", t.Name()), "", labels)
count := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_count", t.Name()), "", labels)
durationNs := stats.NewCountersWithMultiLabels(fmt.Sprintf("%s_test_timed_increment_duration_ns", t.Name()), "", labels)

stats := newScopedStats(bytes, count, durationNs, nil)

incBytes := 1024
duration := 10 * time.Second
path := strings.Join([]string{unscoped, unscoped, unscoped}, ".")

stats.TimedIncrementBytes(incBytes, duration)

require.Equal(t, 1, len(bytes.Counts()))
require.Equal(t, int64(incBytes), bytes.Counts()[path])

require.Equal(t, 0, len(count.Counts()))

require.Equal(t, 1, len(durationNs.Counts()))
require.Equal(t, duration.Nanoseconds(), durationNs.Counts()[path])

stats.TimedIncrementBytes(incBytes, duration)

require.Equal(t, 1, len(bytes.Counts()))
require.Equal(t, int64(2*incBytes), bytes.Counts()[path])

require.Equal(t, 0, len(count.Counts()))

require.Equal(t, 1, len(durationNs.Counts()))
require.Equal(t, 2*duration.Nanoseconds(), durationNs.Counts()[path])
}

func resetStats() {
backupBytes = nil
backupCount = nil
backupDurationNs = nil
restoreBytes = nil
restoreCount = nil
restoreDurationNs = nil
}