Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
492ac3c
VStreamer throttles on source based on check-self
shlomi-noach Jan 19, 2021
b2ee511
vstreamer.Engine now gets lagThrottler reference, to be used by vstre…
shlomi-noach Jan 20, 2021
2907746
removed debug messages
shlomi-noach Jan 20, 2021
0eea641
Engine owns throttling. Streamers of all types cal on Engine's thrott…
shlomi-noach Jan 20, 2021
f99fa1b
only throttle the events channel, not other channels. We do this by w…
shlomi-noach Jan 20, 2021
542fb91
name of vreplication_basic workflow
shlomi-noach Jan 20, 2021
dd14a08
name of workflow file
shlomi-noach Jan 20, 2021
1e781d4
vreplication_basic on its own shard
shlomi-noach Jan 20, 2021
138bfd5
API endpoint for throttle-app and unthrottle-app
shlomi-noach Jan 20, 2021
603838c
simplify
shlomi-noach Jan 20, 2021
5239392
rename
shlomi-noach Jan 20, 2021
67b5889
test materialization with source throttling
shlomi-noach Jan 20, 2021
209e96e
some comments, a minor Sleep
shlomi-noach Jan 21, 2021
9b3e65a
vreplication_basic moved to a new shard
shlomi-noach Jan 21, 2021
694e1f1
Merge remote-tracking branch 'upstream/master' into vstreamer-throttl…
shlomi-noach Jan 21, 2021
d101493
sub test sections
shlomi-noach Jan 21, 2021
4bf07d3
enable throttler in vreplication test tablets
shlomi-noach Jan 21, 2021
7fa0763
fix shard name
shlomi-noach Jan 21, 2021
78cb68b
exit endless loop
shlomi-noach Jan 21, 2021
ace94f6
fix replication lag query: apparently 'lag' is now introduced asa new…
shlomi-noach Jan 21, 2021
498ccf3
more granular throttling
shlomi-noach Jan 21, 2021
ac494c2
fix vstreamer app name in test, plus validate it
shlomi-noach Jan 21, 2021
6d6e06b
fix throttler path
shlomi-noach Jan 21, 2021
e4a00b3
minor refactor
shlomi-noach Jan 21, 2021
2c32c91
update api path
shlomi-noach Jan 21, 2021
f8822eb
throttle on source tablets
shlomi-noach Jan 21, 2021
fdad85c
throttle product, count customer
shlomi-noach Jan 21, 2021
a55bf2f
code comments
shlomi-noach Jan 21, 2021
c9d5447
comments
shlomi-noach Jan 21, 2021
48279f5
disable lag throttler in irrlelevant test
shlomi-noach Jan 21, 2021
0de0eb7
fix vreplication test: add required sleep just after throttling, vali…
shlomi-noach Jan 24, 2021
7c49e2d
run self checks in throttler
shlomi-noach Jan 24, 2021
bdbcf1d
run self checks in throttler
shlomi-noach Jan 24, 2021
5e24918
Merge remote-tracking branch 'upstream/master' into vstreamer-throttl…
shlomi-noach Jan 24, 2021
dcb6915
fixed dependencies in tabletserver
shlomi-noach Jan 24, 2021
166d939
comment
shlomi-noach Jan 24, 2021
0bc14fa
support 'TickNow()'
shlomi-noach Jan 25, 2021
db121c1
speed up of throttler refresh tick upon opening and upon becoming leader
shlomi-noach Jan 25, 2021
bb328fa
restoring lost endtoend test(24). Whoops
shlomi-noach Jan 25, 2021
13ca3db
merge master, resolve conflicts
shlomi-noach Jan 26, 2021
2b4c5a2
workflow endtoend tests: named shards rather than numbered shards
shlomi-noach Jan 26, 2021
7f44ebc
convert shard numbers to strings
shlomi-noach Jan 26, 2021
22088b4
shard -1 --> shard ""
shlomi-noach Jan 26, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .github/workflows/cluster_endtoend_vreplication_basic.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Cluster (vreplication_basic)
on: [push, pull_request]
jobs:

build:
name: Run endtoend tests on Cluster (vreplication_basic)
runs-on: ubuntu-latest

steps:
- name: Set up Go
uses: actions/setup-go@v1
with:
go-version: 1.15

- name: Check out code
uses: actions/checkout@v2

- name: Get dependencies
run: |
sudo apt-get update
sudo apt-get install -y mysql-server mysql-client make unzip g++ etcd curl git wget eatmydata
sudo service mysql stop
sudo service etcd stop
sudo ln -s /etc/apparmor.d/usr.sbin.mysqld /etc/apparmor.d/disable/
sudo apparmor_parser -R /etc/apparmor.d/usr.sbin.mysqld
go mod download

wget https://repo.percona.com/apt/percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get install -y gnupg2
sudo dpkg -i percona-release_latest.$(lsb_release -sc)_all.deb
sudo apt-get update
sudo apt-get install percona-xtrabackup-24

- name: Run cluster endtoend test
timeout-minutes: 30
run: |
source build.env
eatmydata -- go run test.go -docker=false -print-log -follow -shard vreplication_basic
8 changes: 7 additions & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,14 @@ func (vc *VitessCluster) AddTablet(t *testing.T, cell *Cell, keyspace *Keyspace,
vc.Topo.Port,
globalConfig.hostname,
globalConfig.tmpDir,
[]string{"-queryserver-config-schema-reload-time", "5"}, //FIXME: for multi-cell initial schema doesn't seem to load without this
[]string{
"-queryserver-config-schema-reload-time", "5",
"-enable-lag-throttler",
"-heartbeat_enable",
"-heartbeat_interval", "250ms",
}, //FIXME: for multi-cell initial schema doesn't seem to load without "-queryserver-config-schema-reload-time"
false)

require.NotNil(t, vttablet)
vttablet.SupportsBackup = false

Expand Down
786 changes: 450 additions & 336 deletions go/test/endtoend/vreplication/vreplication_test.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func TestSchemaVersioning(t *testing.T) {
tsv.EnableHistorian(false)
tsv.SetTracking(false)
tsv.EnableHeartbeat(false)
tsv.EnableThrottler(false)
defer tsv.EnableThrottler(true)
defer tsv.EnableHeartbeat(true)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (ec *externalConnector) Get(name string) (*mysqlConnector, error) {
c := &mysqlConnector{}
c.env = tabletenv.NewEnv(config, name)
c.se = schema.NewEngine(c.env)
c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, "")
c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, nil, "")
c.vstreamer.InitDBConfig("")
c.se.InitDBConfig(c.env.Config().DB.AllPrivsWithDB())

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestMain(m *testing.M) {

// engines cannot be initialized in testenv because it introduces
// circular dependencies.
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, env.Cells[0])
streamerEngine = vstreamer.NewEngine(env.TabletEnv, env.SrvTopo, env.SchemaEngine, nil, env.Cells[0])
streamerEngine.InitDBConfig(env.KeyspaceName)
streamerEngine.Open()
defer streamerEngine.Close()
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/relaylog.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func newRelayLog(ctx context.Context, maxItems, maxSize int) *relayLog {
return rl
}

// Send writes events to the relay log
func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
rl.mu.Lock()
defer rl.mu.Unlock()
Expand All @@ -83,6 +84,7 @@ func (rl *relayLog) Send(events []*binlogdatapb.VEvent) error {
return nil
}

// Fetch returns all existing items in the relay log, and empties the log
func (rl *relayLog) Fetch() ([][]*binlogdatapb.VEvent, error) {
rl.mu.Lock()
defer rl.mu.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewReplicaConnector(connParams *mysql.ConnParams) *replicaConnector {
env := tabletenv.NewEnv(config, "source")
c.se = schema.NewEngine(env)
c.se.SkipMetaCheck = true
c.vstreamer = vstreamer.NewEngine(env, nil, c.se, "")
c.vstreamer = vstreamer.NewEngine(env, nil, c.se, nil, "")
c.se.InitDBConfig(dbconfigs.New(connParams))

// Open
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
}
}

// applyStmtEvent applies an actual DML statement received from the source, directly onto the backend database
func (vp *vplayer) applyStmtEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
sql := event.Statement
if sql == "" {
Expand Down
27 changes: 18 additions & 9 deletions go/vt/vttablet/tabletserver/gc/tablegc.go
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,21 @@ func (collector *TableGC) checkTables(ctx context.Context) error {
return nil
}

func (collector *TableGC) throttleStatusOK(ctx context.Context) bool {
if time.Since(collector.lastSuccessfulThrottleCheck) <= throttleCheckDuration {
// if last check was OK just very recently there is no need to check again
return true
}
// It's time to run a throttler check
checkResult := collector.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags)
if checkResult.StatusCode != http.StatusOK {
// sorry, we got throttled.
return false
}
collector.lastSuccessfulThrottleCheck = time.Now()
return true
}

// purge continuously purges rows from a table.
// This function is non-reentrant: there's only one instance of this function running at any given time.
// A timer keeps calling this function, so if it bails out (e.g. on error) it will later resume work
Expand Down Expand Up @@ -451,15 +466,9 @@ func (collector *TableGC) purge(ctx context.Context) (tableName string, err erro

log.Infof("TableGC: purge begin for %s", tableName)
for {
if time.Since(collector.lastSuccessfulThrottleCheck) > throttleCheckDuration {
// It's time to run a throttler check
checkResult := collector.lagThrottler.Check(ctx, throttlerAppName, "", throttleFlags)
if checkResult.StatusCode != http.StatusOK {
// sorry, we got throttled. Back off, sleep, try again
time.Sleep(throttleCheckDuration)
continue
}
collector.lastSuccessfulThrottleCheck = time.Now()
for !collector.throttleStatusOK(ctx) {
// Sorry, got throttled. Sleep some time, then check again
time.Sleep(throttleCheckDuration)
}
// OK, we're clear to go!

Expand Down
48 changes: 40 additions & 8 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,29 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to

tsOnce.Do(func() { srvTopoServer = srvtopo.NewResilientServer(topoServer, "TabletSrvTopo") })

tabletTypeFunc := func() topodatapb.TabletType {
if tsv.sm == nil {
return topodatapb.TabletType_UNKNOWN
}
return tsv.sm.Target().TabletType
}

tsv.statelessql = NewQueryList("oltp-stateless")
tsv.statefulql = NewQueryList("oltp-stateful")
tsv.olapql = NewQueryList("olap")
tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc)
tsv.hs = newHealthStreamer(tsv, alias)
tsv.se = schema.NewEngine(tsv)
tsv.rt = repltracker.NewReplTracker(tsv, alias)
tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, alias.Cell)
tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, tsv.lagThrottler, alias.Cell)
tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se)
tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config)
tsv.qe = NewQueryEngine(tsv, tsv.se)
tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer)
tsv.te = NewTxEngine(tsv)
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)

tabletTypeFunc := func() topodatapb.TabletType {
if tsv.sm == nil {
return topodatapb.TabletType_UNKNOWN
}
return tsv.sm.Target().TabletType
}
tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tabletTypeFunc)
tsv.lagThrottler = throttle.NewThrottler(tsv, topoServer, tabletTypeFunc)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tabletTypeFunc, tsv.lagThrottler)

tsv.sm = &stateManager{
Expand Down Expand Up @@ -1621,10 +1622,34 @@ func (tsv *TabletServer) registerThrottlerStatusHandler() {
})
}

// registerThrottlerThrottleAppHandler registers a throttler "throttle-app" request
func (tsv *TabletServer) registerThrottlerThrottleAppHandler() {
tsv.exporter.HandleFunc("/throttler/throttle-app", func(w http.ResponseWriter, r *http.Request) {
appName := r.URL.Query().Get("app")
d, err := time.ParseDuration(r.URL.Query().Get("duration"))
if err != nil {
http.Error(w, fmt.Sprintf("not ok: %v", err), http.StatusInternalServerError)
return
}
appThrottle := tsv.lagThrottler.ThrottleApp(appName, time.Now().Add(d), 1)

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(appThrottle)
})
tsv.exporter.HandleFunc("/throttler/unthrottle-app", func(w http.ResponseWriter, r *http.Request) {
appName := r.URL.Query().Get("app")
appThrottle := tsv.lagThrottler.UnthrottleApp(appName)

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(appThrottle)
})
}

// registerThrottlerHandlers registers all throttler handlers
func (tsv *TabletServer) registerThrottlerHandlers() {
tsv.registerThrottlerCheckHandlers()
tsv.registerThrottlerStatusHandler()
tsv.registerThrottlerThrottleAppHandler()
}

func (tsv *TabletServer) registerDebugEnvHandler() {
Expand All @@ -1639,6 +1664,13 @@ func (tsv *TabletServer) EnableHeartbeat(enabled bool) {
tsv.rt.EnableHeartbeat(enabled)
}

// EnableThrottler forces throttler to be on or off.
// When throttler is off, it responds to all check requests with HTTP 200 OK
// Only to be used for testing.
func (tsv *TabletServer) EnableThrottler(enabled bool) {
tsv.Config().EnableLagThrottler = enabled
}

// SetTracking forces tracking to be on or off.
// Only to be used for testing.
func (tsv *TabletServer) SetTracking(enabled bool) {
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletserver/throttle/base/app_throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
// AppThrottle is the definition for an app throttling instruction
// - Ratio: [0..1], 0 == no throttle, 1 == fully throttle
type AppThrottle struct {
AppName string
ExpireAt time.Time
Ratio float64
}

// NewAppThrottle creates an AppThrottle struct
func NewAppThrottle(expireAt time.Time, ratio float64) *AppThrottle {
func NewAppThrottle(appName string, expireAt time.Time, ratio float64) *AppThrottle {
result := &AppThrottle{
AppName: appName,
ExpireAt: expireAt,
Ratio: ratio,
}
Expand Down
9 changes: 5 additions & 4 deletions go/vt/vttablet/tabletserver/throttle/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,11 +645,10 @@ func (throttler *Throttler) expireThrottledApps() {
}

// ThrottleApp instructs the throttler to begin throttling an app, to som eperiod and with some ratio.
func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, ratio float64) {
func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, ratio float64) (appThrottle *base.AppThrottle) {
throttler.throttledAppsMutex.Lock()
defer throttler.throttledAppsMutex.Unlock()

var appThrottle *base.AppThrottle
now := time.Now()
if object, found := throttler.throttledApps.Get(appName); found {
appThrottle = object.(*base.AppThrottle)
Expand All @@ -666,18 +665,20 @@ func (throttler *Throttler) ThrottleApp(appName string, expireAt time.Time, rati
if ratio < 0 {
ratio = defaultThrottleRatio
}
appThrottle = base.NewAppThrottle(expireAt, ratio)
appThrottle = base.NewAppThrottle(appName, expireAt, ratio)
}
if now.Before(appThrottle.ExpireAt) {
throttler.throttledApps.Set(appName, appThrottle, cache.DefaultExpiration)
} else {
throttler.UnthrottleApp(appName)
}
return appThrottle
}

// UnthrottleApp cancels any throttling, if any, for a given app
func (throttler *Throttler) UnthrottleApp(appName string) {
func (throttler *Throttler) UnthrottleApp(appName string) (appThrottle *base.AppThrottle) {
throttler.throttledApps.Delete(appName)
return base.NewAppThrottle(appName, time.Now(), 0)
}

// IsAppThrottled tells whether some app should be throttled.
Expand Down
Loading