Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 27 additions & 10 deletions go/vt/mysqlctl/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,10 @@ var (
backupCompressBlocks = 2

titleCase = cases.Title(language.English).String

// backupStatsEmitInterval is the maximum frequency at which backup stats
// are emitted.
backupStatsEmitInterval = 10 * time.Second
)

func init() {
Expand Down Expand Up @@ -118,13 +122,18 @@ func Backup(ctx context.Context, params BackupParams) error {
}
defer bs.Close()

// Scope bsStats to selected storage engine.
bsStats := params.Stats.Scope(
stats.Component(stats.BackupStorage),
stats.Implementation(
titleCase(backupstorage.BackupStorageImplementation),
// Scope bsStats to selected storage engine and throttle stat emission.
bsStats := backupstats.Throttle(
params.Stats.Scope(
stats.Component(stats.BackupStorage),
stats.Implementation(
titleCase(backupstorage.BackupStorageImplementation),
),
),
backupStatsEmitInterval,
)
defer bsStats.Flush()

bs = bs.WithParams(backupstorage.Params{
Logger: params.Logger,
Stats: bsStats,
Expand All @@ -139,12 +148,20 @@ func Backup(ctx context.Context, params BackupParams) error {
if err != nil {
return vterrors.Wrap(err, "failed to find backup engine")
}
// Scope stats to selected backup engine.
beParams := params.Copy()
beParams.Stats = params.Stats.Scope(
stats.Component(stats.BackupEngine),
stats.Implementation(titleCase(backupEngineImplementation)),

// Scope stats to selected backup engine and throttle stat emission.
beStats := backupstats.Throttle(
params.Stats.Scope(
stats.Component(stats.BackupEngine),
stats.Implementation(titleCase(backupEngineImplementation)),
),
backupStatsEmitInterval,
)
defer beStats.Flush()

beParams := params.Copy()
beParams.Stats = beStats

// Take the backup, and either AbortBackup or EndBackup.
usable, err := be.ExecuteBackup(ctx, beParams, bh)
logger := params.Logger
Expand Down
72 changes: 55 additions & 17 deletions go/vt/mysqlctl/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,29 @@ func TestBackupExecutesBackupWithScopedParams(t *testing.T) {
require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events)

require.Equal(t, 1, len(env.backupEngine.ExecuteBackupCalls))

// Get the backup params passed to the backup engine.
executeBackupParams := env.backupEngine.ExecuteBackupCalls[0].BackupParams
var executeBackupStats *backupstats.FakeStats

// Get the stats that were passed along with the backup params.
executeBackupParamsStats := unwrapStats(executeBackupParams.Stats)

// Look through the return value of stats.Scope(). Find the one that is
// equal to the stats that were passed to the backup engine.
var scopedStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == executeBackupParams.Stats {
executeBackupStats = sr.(*backupstats.FakeStats)
fs := unwrapStats(sr)
if fs == executeBackupParamsStats {
scopedStats = fs.(*backupstats.FakeStats)
}
}
require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeComponent)
require.Equal(t, backupstats.BackupEngine.String(), executeBackupStats.ScopeV[backupstats.ScopeComponent])
require.Contains(t, executeBackupStats.ScopeV, backupstats.ScopeImplementation)
require.Equal(t, "Fake", executeBackupStats.ScopeV[backupstats.ScopeImplementation])

// Validate the scope of the stats.
require.NotNil(t, scopedStats, executeBackupParams.Stats)
require.Contains(t, scopedStats.ScopeV, backupstats.ScopeComponent)
require.Equal(t, backupstats.BackupEngine.String(), scopedStats.ScopeV[backupstats.ScopeComponent])
require.Contains(t, scopedStats.ScopeV, backupstats.ScopeImplementation)
require.Equal(t, "Fake", scopedStats.ScopeV[backupstats.ScopeImplementation])
}

// TestBackupNoStats tests that if BackupParams.Stats is nil, then Backup will
Expand All @@ -71,7 +83,8 @@ func TestBackupNoStats(t *testing.T) {

// It parameterizes the backup storage with nop stats.
require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
require.Equal(t, backupstats.NoStats(), env.backupStorage.WithParamsCalls[0].Stats)
fs := unwrapStats(env.backupStorage.WithParamsCalls[0].Stats)
require.Equal(t, backupstats.NoStats(), fs)
}

// TestBackupParameterizesBackupStorageWithScopedStats tests that Backup passes
Expand All @@ -83,16 +96,29 @@ func TestBackupParameterizesBackupStorageWithScopedStats(t *testing.T) {
require.Nil(t, Backup(env.ctx, env.backupParams), env.logger.Events)

require.Equal(t, 1, len(env.backupStorage.WithParamsCalls))
var storageStats *backupstats.FakeStats

// Get the params passed in to backup storage WithParams().
backupStorageParams := env.backupStorage.WithParamsCalls[0]

// Get the stats attached to those params.
backupStorageParamsStats := unwrapStats(backupStorageParams.Stats)

// Look through the return value of stats.Scope(). Find the one that is
// equal to the stats that were passed to the backup storage WithParams().
var scopedStats *backupstats.FakeStats
for _, sr := range env.stats.ScopeReturns {
if sr == env.backupStorage.WithParamsCalls[0].Stats {
storageStats = sr.(*backupstats.FakeStats)
fs := unwrapStats(sr)
if fs == backupStorageParamsStats {
scopedStats = fs.(*backupstats.FakeStats)
}
}
require.Contains(t, storageStats.ScopeV, backupstats.ScopeComponent)
require.Equal(t, backupstats.BackupStorage.String(), storageStats.ScopeV[backupstats.ScopeComponent])
require.Contains(t, storageStats.ScopeV, backupstats.ScopeImplementation)
require.Equal(t, "Fake", storageStats.ScopeV[backupstats.ScopeImplementation])

// Validate the stats scope.
require.NotNil(t, scopedStats)
require.Contains(t, scopedStats.ScopeV, backupstats.ScopeComponent)
require.Equal(t, backupstats.BackupStorage.String(), scopedStats.ScopeV[backupstats.ScopeComponent])
require.Contains(t, scopedStats.ScopeV, backupstats.ScopeImplementation)
require.Equal(t, "Fake", scopedStats.ScopeV[backupstats.ScopeImplementation])
}

// TestBackupEmitsStats tests that Backup emits stats.
Expand Down Expand Up @@ -123,13 +149,14 @@ func TestBackupTriesToParameterizeBackupStorage(t *testing.T) {
require.Equal(t, env.logger, env.backupStorage.WithParamsCalls[0].Logger)
var scopedStats backupstats.Stats
for _, sr := range env.stats.ScopeReturns {
if sr != env.backupStorage.WithParamsCalls[0].Stats {
fs := unwrapStats(sr)
if fs != unwrapStats(env.backupStorage.WithParamsCalls[0].Stats) {
continue
}
if scopedStats != nil {
require.Fail(t, "backupstorage stats matches multiple scoped stats produced by parent stats")
}
scopedStats = sr
scopedStats = fs
}
require.NotNil(t, scopedStats)
}
Expand Down Expand Up @@ -547,3 +574,14 @@ func (fbe *fakeBackupRestoreEnv) setStats(stats *backupstats.FakeStats) {
fbe.restoreParams.Stats = nil
fbe.stats = nil
}

// unwrapStats recursively unwraps a non-throttled Stats object from inside
// ThrottledStats. That's useful in the context of this test suite so we can
// validate interactions with FakeStats and NoStats.
func unwrapStats(stats backupstats.Stats) backupstats.Stats {
ts, ok := stats.(*backupstats.ThrottledStats)
if ok {
return unwrapStats(ts.Stats())
}
return stats
}
44 changes: 44 additions & 0 deletions go/vt/mysqlctl/backupstats/memory_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package backupstats

import "time"

type memoryStats struct {
// bytes stores the sum of bytes passed to TimedIncrementBytes since the last
// call to Reset.
bytes int
// calls stores the number of calls to TimedIncrement or
// TimedIncrementBytesCalls since the last call to Reset.
calls int
// count stores the number of times TimedIncrement was called since the
// last call to Reset.
count int64
// duration stores the sum of durations passed to TimedIncrement and
// TimedIncrementBytes since the the last call to Reset.
duration time.Duration
}

func newMemoryStats() *memoryStats {
return &memoryStats{}
}

// TimedIncrement increments Count by 1 and Duration by d.
func (ms *memoryStats) TimedIncrement(d time.Duration) {
ms.calls++
ms.count++
ms.duration += d
}

// TimedIncrementBytes increments Bytes by b and Duration by d.
func (ms *memoryStats) TimedIncrementBytes(b int, d time.Duration) {
ms.bytes += b
ms.calls++
ms.duration += d
}

// Reset sets Bytes, Count, and Duration to zero.
func (ms *memoryStats) Reset() {
ms.bytes = 0
ms.count = 0
ms.duration = 0
ms.calls = 0
}
57 changes: 57 additions & 0 deletions go/vt/mysqlctl/backupstats/memory_stats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package backupstats

import (
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestNewMemoryStats(t *testing.T) {
ms := newMemoryStats()

require.Equal(t, 0, ms.bytes)
require.Equal(t, int64(0), ms.count)
require.Equal(t, time.Duration(0), ms.duration)
}

func TestIncrement(t *testing.T) {
ms := newMemoryStats()

ms.TimedIncrement(5 * time.Minute)
require.Equal(t, 0, ms.bytes)
require.Equal(t, int64(1), ms.count)
require.Equal(t, 5*time.Minute, ms.duration)

ms.TimedIncrement(10 * time.Minute)
require.Equal(t, 0, ms.bytes)
require.Equal(t, int64(2), ms.count)
require.Equal(t, 15*time.Minute, ms.duration)
}

func TestIncrementBytes(t *testing.T) {
ms := newMemoryStats()

ms.TimedIncrementBytes(5, 5*time.Minute)
require.Equal(t, 5, ms.bytes)
require.Equal(t, int64(0), ms.count)
require.Equal(t, 5*time.Minute, ms.duration)

ms.TimedIncrementBytes(10, 10*time.Minute)
require.Equal(t, 15, ms.bytes)
require.Equal(t, int64(0), ms.count)
require.Equal(t, 15*time.Minute, ms.duration)
}

func TestReset(t *testing.T) {
ms := newMemoryStats()

ms.TimedIncrement(5 * time.Minute)
ms.TimedIncrementBytes(5, 5*time.Minute)

ms.Reset()

require.Equal(t, 0, ms.bytes)
require.Equal(t, int64(0), ms.count)
require.Equal(t, time.Duration(0), ms.duration)
}
101 changes: 101 additions & 0 deletions go/vt/mysqlctl/backupstats/throttled_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package backupstats

import (
"sync"
"time"
)

// ThrottledStats is a Stats object that throttles and batches calls to
// TimedIncrement and TimedIncrementBytes when they occur more than once within
// a specified time interval. Unflushed calls are flushed when a subsequent call
// is made that is at least the specified time interval after the last flush
// time.
type ThrottledStats struct {
timedCount *memoryStats
timedBytes *memoryStats

children []*ThrottledStats
lastFlushTime time.Time
maxInterval time.Duration
mu *sync.Mutex
stats Stats
}

// Throttle wraps the provided stats object so that calls to TimedIncrement and
// TimedIncrementBytes are passed to the provided stats object no more than
// once per the specified maxInterval.
func Throttle(stats Stats, maxInterval time.Duration) *ThrottledStats {
return &ThrottledStats{
timedCount: newMemoryStats(),
timedBytes: newMemoryStats(),

children: make([]*ThrottledStats, 0),
maxInterval: maxInterval,
mu: &sync.Mutex{},
stats: stats,
}
}

// Flush flushes any unflushed TimedIncrement or TimedIncrementBytes calls to
// the wrapped Stats object. It also flushes any children created with Scope.
func (ts *ThrottledStats) Flush() {
ts.flush()
for _, c := range ts.children {
c.Flush()
}
}

// Scope scopes the wrapped Stats object with the provided scopes, and returns
// a new ThrottledStats object wrapping that new Stats object.
//
// Scope is safe for use across goroutines.
func (ts *ThrottledStats) Scope(scopes ...Scope) Stats {
ts.mu.Lock()
defer ts.mu.Unlock()

stats := ts.stats.Scope(scopes...)
newTs := Throttle(stats, ts.maxInterval)
ts.children = append(ts.children, newTs)
return newTs
}

// Stats returns the wrapped Stats object.
func (ts *ThrottledStats) Stats() Stats {
return ts.stats
}

// TimedIncrement batches the provided duration d with any previous, unflushed
// calls to TimedIncrement, and flushes all unflushed calls if enough time has
// elapsed since the last flush.
func (ts *ThrottledStats) TimedIncrement(d time.Duration) {
ts.timedCount.TimedIncrement(d)
ts.maybeFlush()
}

// TimedIncrementBytes batches the provided bytes b and duration d with any
// previous, unflushed calls to TimedIncrementBytes, and flushes all unflushed
// calls if enough time has elapsed since the last flush.
func (ts *ThrottledStats) TimedIncrementBytes(b int, d time.Duration) {
ts.timedBytes.TimedIncrementBytes(b, d)
ts.maybeFlush()
}

func (ts *ThrottledStats) flush() {
if ts.timedCount.calls > 0 {
ts.stats.TimedIncrement(ts.timedCount.duration)
ts.timedCount.Reset()
}
if ts.timedBytes.calls > 0 {
ts.stats.TimedIncrementBytes(ts.timedBytes.bytes, ts.timedBytes.duration)
ts.timedBytes.Reset()
}
}

func (ts *ThrottledStats) maybeFlush() {
now := time.Now()
emitWaitTime := ts.maxInterval - now.Sub(ts.lastFlushTime)
if emitWaitTime < 0 {
ts.flush()
ts.lastFlushTime = now
}
}
Loading