diff --git a/go/vt/mysqlctl/backup.go b/go/vt/mysqlctl/backup.go index b1751ba3bd9..6fc23396160 100644 --- a/go/vt/mysqlctl/backup.go +++ b/go/vt/mysqlctl/backup.go @@ -85,6 +85,10 @@ var ( backupCompressBlocks = 2 titleCase = cases.Title(language.English).String + + // backupStatsEmitInterval is the maximum frequency at which backup stats + // are emitted. + backupStatsEmitInterval = 10 * time.Second ) func init() { @@ -118,13 +122,18 @@ func Backup(ctx context.Context, params BackupParams) error { } defer bs.Close() - // Scope bsStats to selected storage engine. - bsStats := params.Stats.Scope( - stats.Component(stats.BackupStorage), - stats.Implementation( - titleCase(backupstorage.BackupStorageImplementation), + // Scope bsStats to selected storage engine and throttle stat emission. + bsStats := backupstats.Throttle( + params.Stats.Scope( + stats.Component(stats.BackupStorage), + stats.Implementation( + titleCase(backupstorage.BackupStorageImplementation), + ), ), + backupStatsEmitInterval, ) + defer bsStats.Flush() + bs = bs.WithParams(backupstorage.Params{ Logger: params.Logger, Stats: bsStats, @@ -139,12 +148,20 @@ func Backup(ctx context.Context, params BackupParams) error { if err != nil { return vterrors.Wrap(err, "failed to find backup engine") } - // Scope stats to selected backup engine. - beParams := params.Copy() - beParams.Stats = params.Stats.Scope( - stats.Component(stats.BackupEngine), - stats.Implementation(titleCase(backupEngineImplementation)), + + // Scope stats to selected backup engine and throttle stat emission. + beStats := backupstats.Throttle( + params.Stats.Scope( + stats.Component(stats.BackupEngine), + stats.Implementation(titleCase(backupEngineImplementation)), + ), + backupStatsEmitInterval, ) + defer beStats.Flush() + + beParams := params.Copy() + beParams.Stats = beStats + // Take the backup, and either AbortBackup or EndBackup. usable, err := be.ExecuteBackup(ctx, beParams, bh) logger := params.Logger diff --git a/go/vt/mysqlctl/backup_test.go b/go/vt/mysqlctl/backup_test.go index d26ca873243..84925251ba4 100644 --- a/go/vt/mysqlctl/backup_test.go +++ b/go/vt/mysqlctl/backup_test.go @@ -46,17 +46,29 @@ func TestBackupExecutesBackupWithScopedParams(t *testing.T) { require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events) require.Equal(t, 1, len(env.backupEngine.ExecuteBackupCalls)) + + // Get the backup params passed to the backup engine. executeBackupParams := env.backupEngine.ExecuteBackupCalls[0].BackupParams - var executeBackupStats *backupstats.FakeStats + + // Get the stats that were passed along with the backup params. + executeBackupParamsStats := unwrapStats(executeBackupParams.Stats) + + // Look through the return value of stats.Scope(). Find the one that is + // equal to the stats that were passed to the backup engine. + var scopedStats *backupstats.FakeStats for _, sr := range env.stats.ScopeReturns { - if sr == executeBackupParams.Stats { - executeBackupStats = sr.(*backupstats.FakeStats) + fs := unwrapStats(sr) + if fs == executeBackupParamsStats { + scopedStats = fs.(*backupstats.FakeStats) } } - require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeComponent) - require.Equal(t, backupstats.BackupEngine.String(), executeBackupStats.ScopeV[backupstats.ScopeComponent]) - require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeImplementation) - require.Equal(t, "Fake", executeBackupStats.ScopeV[backupstats.ScopeImplementation]) + + // Validate the scope of the stats. + require.NotNil(t, scopedStats, executeBackupParams.Stats) + require.Contains(t, scopedStats.ScopeV, backupstats.ScopeComponent) + require.Equal(t, backupstats.BackupEngine.String(), scopedStats.ScopeV[backupstats.ScopeComponent]) + require.Contains(t, scopedStats.ScopeV, backupstats.ScopeImplementation) + require.Equal(t, "Fake", scopedStats.ScopeV[backupstats.ScopeImplementation]) } // TestBackupNoStats tests that if BackupParams.Stats is nil, then Backup will @@ -71,7 +83,8 @@ func TestBackupNoStats(t *testing.T) { // It parameterizes the backup storage with nop stats. require.Equal(t, 1, len(env.backupStorage.WithParamsCalls)) - require.Equal(t, backupstats.NoStats(), env.backupStorage.WithParamsCalls[0].Stats) + fs := unwrapStats(env.backupStorage.WithParamsCalls[0].Stats) + require.Equal(t, backupstats.NoStats(), fs) } // TestBackupParameterizesBackupStorageWithScopedStats tests that Backup passes @@ -83,16 +96,29 @@ func TestBackupParameterizesBackupStorageWithScopedStats(t *testing.T) { require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events) require.Equal(t, 1, len(env.backupStorage.WithParamsCalls)) - var storageStats *backupstats.FakeStats + + // Get the params passed in to backup storage WithParams(). + backupStorageParams := env.backupStorage.WithParamsCalls[0] + + // Get the stats attached to those params. + backupStorageParamsStats := unwrapStats(backupStorageParams.Stats) + + // Look through the return value of stats.Scope(). Find the one that is + // equal to the stats that were passed to the backup storage WithParams(). + var scopedStats *backupstats.FakeStats for _, sr := range env.stats.ScopeReturns { - if sr == env.backupStorage.WithParamsCalls[0].Stats { - storageStats = sr.(*backupstats.FakeStats) + fs := unwrapStats(sr) + if fs == backupStorageParamsStats { + scopedStats = fs.(*backupstats.FakeStats) } } - require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent) - require.Equal(t, backupstats.BackupStorage.String(), storageStats.ScopeV[backupstats.ScopeComponent]) - require.Contains(t, storageStats.ScopeV, backupstats.ScopeImplementation) - require.Equal(t, "Fake", storageStats.ScopeV[backupstats.ScopeImplementation]) + + // Validate the stats scope. + require.NotNil(t, scopedStats) + require.Contains(t, scopedStats.ScopeV, backupstats.ScopeComponent) + require.Equal(t, backupstats.BackupStorage.String(), scopedStats.ScopeV[backupstats.ScopeComponent]) + require.Contains(t, scopedStats.ScopeV, backupstats.ScopeImplementation) + require.Equal(t, "Fake", scopedStats.ScopeV[backupstats.ScopeImplementation]) } // TestBackupEmitsStats tests that Backup emits stats. @@ -123,13 +149,14 @@ func TestBackupTriesToParameterizeBackupStorage(t *testing.T) { require.Equal(t, env.logger, env.backupStorage.WithParamsCalls[0].Logger) var scopedStats backupstats.Stats for _, sr := range env.stats.ScopeReturns { - if sr != env.backupStorage.WithParamsCalls[0].Stats { + fs := unwrapStats(sr) + if fs != unwrapStats(env.backupStorage.WithParamsCalls[0].Stats) { continue } if scopedStats != nil { require.Fail(t, "backupstorage stats matches multiple scoped stats produced by parent stats") } - scopedStats = sr + scopedStats = fs } require.NotNil(t, scopedStats) } @@ -547,3 +574,14 @@ func (fbe *fakeBackupRestoreEnv) setStats(stats *backupstats.FakeStats) { fbe.restoreParams.Stats = nil fbe.stats = nil } + +// unwrapStats recursively unwraps a non-throttled Stats object from inside +// ThrottledStats. That's useful in the context of this test suite so we can +// validate interactions with FakeStats and NoStats. +func unwrapStats(stats backupstats.Stats) backupstats.Stats { + ts, ok := stats.(*backupstats.ThrottledStats) + if ok { + return unwrapStats(ts.Stats()) + } + return stats +} diff --git a/go/vt/mysqlctl/backupstats/memory_stats.go b/go/vt/mysqlctl/backupstats/memory_stats.go new file mode 100644 index 00000000000..5912533cc21 --- /dev/null +++ b/go/vt/mysqlctl/backupstats/memory_stats.go @@ -0,0 +1,44 @@ +package backupstats + +import "time" + +type memoryStats struct { + // bytes stores the sum of bytes passed to TimedIncrementBytes since the last + // call to Reset. + bytes int + // calls stores the number of calls to TimedIncrement or + // TimedIncrementBytesCalls since the last call to Reset. + calls int + // count stores the number of times TimedIncrement was called since the + // last call to Reset. + count int64 + // duration stores the sum of durations passed to TimedIncrement and + // TimedIncrementBytes since the the last call to Reset. + duration time.Duration +} + +func newMemoryStats() *memoryStats { + return &memoryStats{} +} + +// TimedIncrement increments Count by 1 and Duration by d. +func (ms *memoryStats) TimedIncrement(d time.Duration) { + ms.calls++ + ms.count++ + ms.duration += d +} + +// TimedIncrementBytes increments Bytes by b and Duration by d. +func (ms *memoryStats) TimedIncrementBytes(b int, d time.Duration) { + ms.bytes += b + ms.calls++ + ms.duration += d +} + +// Reset sets Bytes, Count, and Duration to zero. +func (ms *memoryStats) Reset() { + ms.bytes = 0 + ms.count = 0 + ms.duration = 0 + ms.calls = 0 +} diff --git a/go/vt/mysqlctl/backupstats/memory_stats_test.go b/go/vt/mysqlctl/backupstats/memory_stats_test.go new file mode 100644 index 00000000000..0dfd5b14131 --- /dev/null +++ b/go/vt/mysqlctl/backupstats/memory_stats_test.go @@ -0,0 +1,57 @@ +package backupstats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestNewMemoryStats(t *testing.T) { + ms := newMemoryStats() + + require.Equal(t, 0, ms.bytes) + require.Equal(t, int64(0), ms.count) + require.Equal(t, time.Duration(0), ms.duration) +} + +func TestIncrement(t *testing.T) { + ms := newMemoryStats() + + ms.TimedIncrement(5 * time.Minute) + require.Equal(t, 0, ms.bytes) + require.Equal(t, int64(1), ms.count) + require.Equal(t, 5*time.Minute, ms.duration) + + ms.TimedIncrement(10 * time.Minute) + require.Equal(t, 0, ms.bytes) + require.Equal(t, int64(2), ms.count) + require.Equal(t, 15*time.Minute, ms.duration) +} + +func TestIncrementBytes(t *testing.T) { + ms := newMemoryStats() + + ms.TimedIncrementBytes(5, 5*time.Minute) + require.Equal(t, 5, ms.bytes) + require.Equal(t, int64(0), ms.count) + require.Equal(t, 5*time.Minute, ms.duration) + + ms.TimedIncrementBytes(10, 10*time.Minute) + require.Equal(t, 15, ms.bytes) + require.Equal(t, int64(0), ms.count) + require.Equal(t, 15*time.Minute, ms.duration) +} + +func TestReset(t *testing.T) { + ms := newMemoryStats() + + ms.TimedIncrement(5 * time.Minute) + ms.TimedIncrementBytes(5, 5*time.Minute) + + ms.Reset() + + require.Equal(t, 0, ms.bytes) + require.Equal(t, int64(0), ms.count) + require.Equal(t, time.Duration(0), ms.duration) +} diff --git a/go/vt/mysqlctl/backupstats/throttled_stats.go b/go/vt/mysqlctl/backupstats/throttled_stats.go new file mode 100644 index 00000000000..fb7134d6bd2 --- /dev/null +++ b/go/vt/mysqlctl/backupstats/throttled_stats.go @@ -0,0 +1,101 @@ +package backupstats + +import ( + "sync" + "time" +) + +// ThrottledStats is a Stats object that throttles and batches calls to +// TimedIncrement and TimedIncrementBytes when they occur more than once within +// a specified time interval. Unflushed calls are flushed when a subsequent call +// is made that is at least the specified time interval after the last flush +// time. +type ThrottledStats struct { + timedCount *memoryStats + timedBytes *memoryStats + + children []*ThrottledStats + lastFlushTime time.Time + maxInterval time.Duration + mu *sync.Mutex + stats Stats +} + +// Throttle wraps the provided stats object so that calls to TimedIncrement and +// TimedIncrementBytes are passed to the provided stats object no more than +// once per the specified maxInterval. +func Throttle(stats Stats, maxInterval time.Duration) *ThrottledStats { + return &ThrottledStats{ + timedCount: newMemoryStats(), + timedBytes: newMemoryStats(), + + children: make([]*ThrottledStats, 0), + maxInterval: maxInterval, + mu: &sync.Mutex{}, + stats: stats, + } +} + +// Flush flushes any unflushed TimedIncrement or TimedIncrementBytes calls to +// the wrapped Stats object. It also flushes any children created with Scope. +func (ts *ThrottledStats) Flush() { + ts.flush() + for _, c := range ts.children { + c.Flush() + } +} + +// Scope scopes the wrapped Stats object with the provided scopes, and returns +// a new ThrottledStats object wrapping that new Stats object. +// +// Scope is safe for use across goroutines. +func (ts *ThrottledStats) Scope(scopes ...Scope) Stats { + ts.mu.Lock() + defer ts.mu.Unlock() + + stats := ts.stats.Scope(scopes...) + newTs := Throttle(stats, ts.maxInterval) + ts.children = append(ts.children, newTs) + return newTs +} + +// Stats returns the wrapped Stats object. +func (ts *ThrottledStats) Stats() Stats { + return ts.stats +} + +// TimedIncrement batches the provided duration d with any previous, unflushed +// calls to TimedIncrement, and flushes all unflushed calls if enough time has +// elapsed since the last flush. +func (ts *ThrottledStats) TimedIncrement(d time.Duration) { + ts.timedCount.TimedIncrement(d) + ts.maybeFlush() +} + +// TimedIncrementBytes batches the provided bytes b and duration d with any +// previous, unflushed calls to TimedIncrementBytes, and flushes all unflushed +// calls if enough time has elapsed since the last flush. +func (ts *ThrottledStats) TimedIncrementBytes(b int, d time.Duration) { + ts.timedBytes.TimedIncrementBytes(b, d) + ts.maybeFlush() +} + +func (ts *ThrottledStats) flush() { + if ts.timedCount.calls > 0 { + ts.stats.TimedIncrement(ts.timedCount.duration) + ts.timedCount.Reset() + } + if ts.timedBytes.calls > 0 { + ts.stats.TimedIncrementBytes(ts.timedBytes.bytes, ts.timedBytes.duration) + ts.timedBytes.Reset() + } +} + +func (ts *ThrottledStats) maybeFlush() { + now := time.Now() + emitWaitTime := ts.maxInterval - now.Sub(ts.lastFlushTime) + if emitWaitTime < 0 { + ts.flush() + ts.lastFlushTime = now + } +} diff --git a/go/vt/mysqlctl/backupstats/throttled_stats_test.go b/go/vt/mysqlctl/backupstats/throttled_stats_test.go new file mode 100644 index 00000000000..574f21ef8c0 --- /dev/null +++ b/go/vt/mysqlctl/backupstats/throttled_stats_test.go @@ -0,0 +1,138 @@ +package backupstats + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestThrottledStatsFlush(t *testing.T) { + fakeStats := NewFakeStats() + throttled := Throttle(fakeStats, 5*time.Second) + + // First call to TimedIncrement is not throttled. + throttled.TimedIncrement(5 * time.Minute) + + require.Len(t, fakeStats.TimedIncrementCalls, 1) + require.Equal(t, 5*time.Minute, fakeStats.TimedIncrementCalls[0]) + + // Subsequent calls are throttled, batched and combined... + throttled.TimedIncrement(5 * time.Minute) + throttled.TimedIncrement(5 * time.Minute) + + require.Len(t, fakeStats.TimedIncrementCalls, 1) + + // ...and are emitted when Flush() is called. + throttled.Flush() + + require.Len(t, fakeStats.TimedIncrementCalls, 2) + require.Equal(t, 10*time.Minute, fakeStats.TimedIncrementCalls[1]) + + // The next call to Flush() is a no-op because there is nothing to flush. + throttled.Flush() + + require.Len(t, fakeStats.TimedIncrementCalls, 2) + require.Equal(t, 10*time.Minute, fakeStats.TimedIncrementCalls[1]) + + // Children are also flushed. + childThrottled1 := throttled.Scope(Component(BackupEngine)).(*ThrottledStats) + childStats1 := childThrottled1.Stats().(*FakeStats) + childThrottled2 := throttled.Scope(Component(BackupEngine)).(*ThrottledStats) + childStats2 := childThrottled2.Stats().(*FakeStats) + + require.Len(t, childStats1.TimedIncrementCalls, 0) + require.Len(t, childStats2.TimedIncrementCalls, 0) + + childThrottled1.TimedIncrement(5 * time.Minute) + childThrottled2.TimedIncrement(5 * time.Minute) + + require.Len(t, childStats1.TimedIncrementCalls, 1) + require.Len(t, childStats2.TimedIncrementCalls, 1) + + childThrottled1.TimedIncrement(5 * time.Minute) + childThrottled2.TimedIncrement(5 * time.Minute) + + require.Len(t, childStats1.TimedIncrementCalls, 1) + require.Len(t, childStats2.TimedIncrementCalls, 1) + + throttled.Flush() + + require.Len(t, childStats1.TimedIncrementCalls, 2) + require.Len(t, childStats2.TimedIncrementCalls, 2) +} + +func TestThrottledStatsScope(t *testing.T) { + fakeStats := NewFakeStats() + throttled := Throttle(fakeStats, 5*time.Second) + + // Get a scoped throttled stats, and the underlying scoped fake stats. + scopedThrottled := throttled.Scope(Component(BackupEngine)) + require.Len(t, fakeStats.ScopeCalls, 1) + require.Len(t, fakeStats.ScopeReturns, 1) + + scopedFakeStats := fakeStats.ScopeReturns[0].(*FakeStats) + require.Equal(t, BackupEngine.String(), scopedFakeStats.ScopeV[ScopeComponent]) + + // Validate that changes to the parent stats don't change the child, and + // vice versa. + throttled.TimedIncrement(5 * time.Second) + require.Len(t, fakeStats.TimedIncrementCalls, 1) + require.Len(t, scopedFakeStats.TimedIncrementCalls, 0) + + scopedThrottled.TimedIncrement(5 * time.Second) + require.Len(t, fakeStats.TimedIncrementCalls, 1) + require.Len(t, scopedFakeStats.TimedIncrementCalls, 1) +} + +func TestThrottledStatsTimedIncrement(t *testing.T) { + fakeStats := NewFakeStats() + throttled := Throttle(fakeStats, 1*time.Second) + + throttled.TimedIncrement(5 * time.Minute) + + require.Len(t, fakeStats.TimedIncrementCalls, 1) + require.Equal(t, 5*time.Minute, fakeStats.TimedIncrementCalls[0]) + + throttled.TimedIncrement(5 * time.Minute) + + require.Len(t, fakeStats.TimedIncrementCalls, 1) + + time.Sleep(500 * time.Millisecond) + + require.Len(t, fakeStats.TimedIncrementCalls, 1) + + time.Sleep(501 * time.Millisecond) + + throttled.TimedIncrement(5 * time.Minute) + + require.Len(t, fakeStats.TimedIncrementCalls, 2) + require.Equal(t, 10*time.Minute, fakeStats.TimedIncrementCalls[1]) +} + +func TestThrottledStatsTimedIncrementBytes(t *testing.T) { + fakeStats := NewFakeStats() + throttled := Throttle(fakeStats, 1*time.Second) + + throttled.TimedIncrementBytes(5, 5*time.Minute) + + require.Len(t, fakeStats.TimedIncrementBytesCalls, 1) + require.Equal(t, 5, fakeStats.TimedIncrementBytesCalls[0].Bytes) + require.Equal(t, 5*time.Minute, fakeStats.TimedIncrementBytesCalls[0].Duration) + + throttled.TimedIncrementBytes(5, 5*time.Minute) + + require.Len(t, fakeStats.TimedIncrementBytesCalls, 1) + + time.Sleep(500 * time.Millisecond) + + require.Len(t, fakeStats.TimedIncrementBytesCalls, 1) + + time.Sleep(501 * time.Millisecond) + + throttled.TimedIncrementBytes(5, 5*time.Minute) + + require.Len(t, fakeStats.TimedIncrementBytesCalls, 2) + require.Equal(t, 10, fakeStats.TimedIncrementBytesCalls[1].Bytes) + require.Equal(t, 10*time.Minute, fakeStats.TimedIncrementBytesCalls[1].Duration) +}