Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/cmd/vttablet/vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func main() {
DBConfigs: config.DB.Clone(),
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine()),
VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld),
VREngine: vreplication.NewEngine(config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
}
if err := tm.Start(tablet, config.Healthcheck.IntervalSeconds.Get()); err != nil {
log.Exitf("failed to parse -tablet-path or initialize DB credentials: %v", err)
Expand Down
124 changes: 98 additions & 26 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,16 @@ import (
)

var (
vc *VitessCluster
vtgate *cluster.VtgateProcess
defaultCell *Cell
vtgateConn *mysql.Conn
defaultRdonly int
defaultReplicas int
allCellNames string
httpClient = throttlebase.SetupHTTPClient(time.Second)
throttlerAppName = "vstreamer"
vc *VitessCluster
vtgate *cluster.VtgateProcess
defaultCell *Cell
vtgateConn *mysql.Conn
defaultRdonly int
defaultReplicas int
allCellNames string
httpClient = throttlebase.SetupHTTPClient(time.Second)
sourceThrottlerAppName = "vstreamer"
targetThrottlerAppName = "vreplication"
)

func init() {
Expand All @@ -63,16 +64,16 @@ func throttleResponse(tablet *cluster.VttabletProcess, path string) (resp *http.
return resp, respBody, err
}

func throttleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerAppName))
func throttleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", app))
}

func unthrottleStreamer(tablet *cluster.VttabletProcess) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerAppName))
func unthrottleStreamer(tablet *cluster.VttabletProcess, app string) (*http.Response, string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", app))
}

func throttlerCheckSelf(tablet *cluster.VttabletProcess) (resp *http.Response, respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerAppName)
func throttlerCheckSelf(tablet *cluster.VttabletProcess, app string) (resp *http.Response, respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, app)
resp, err = httpClient.Get(apiURL)
if err != nil {
return resp, respBody, err
Expand Down Expand Up @@ -107,6 +108,7 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
materializeRollup(t)

shardCustomer(t, true, []*Cell{defaultCell}, defaultCellName)

validateRollupReplicates(t)
shardOrders(t)
shardMerchant(t)
Expand Down Expand Up @@ -221,11 +223,16 @@ func insertMoreProducts(t *testing.T) {
execVtgateQuery(t, vtgateConn, "product", sql)
}

func insertMoreProductsForThrottler(t *testing.T) {
func insertMoreProductsForSourceThrottler(t *testing.T) {
sql := "insert into product(pid, description) values(103, 'new-cpu'),(104, 'new-camera'),(105, 'new-mouse');"
execVtgateQuery(t, vtgateConn, "product", sql)
}

func insertMoreProductsForTargetThrottler(t *testing.T) {
sql := "insert into product(pid, description) values(203, 'new-cpu'),(204, 'new-camera'),(205, 'new-mouse');"
execVtgateQuery(t, vtgateConn, "product", sql)
}

func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) {
t.Run("shardCustomer", func(t *testing.T) {
workflow := "p2c"
Expand All @@ -241,6 +248,7 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
if err := vtgate.WaitForStatusOfTabletInShard(fmt.Sprintf("%s.%s.master", "customer", "80-"), 1); err != nil {
t.Fatal(err)
}

tables := "customer"
moveTables(t, sourceCellOrAlias, workflow, sourceKs, targetKs, tables)

Expand Down Expand Up @@ -593,41 +601,105 @@ func materializeProduct(t *testing.T) {
}

productTablets := vc.getVttabletsInKeyspace(t, defaultCell, "product", "master")
t.Run("throttle-app", func(t *testing.T) {
t.Run("throttle-app-product", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
for _, tab := range productTablets {
_, body, err := throttleStreamer(tab)
_, body, err := throttleStreamer(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, sourceThrottlerAppName)
}
// Wait for throttling to take effect (caching will expire by this time):
time.Sleep(1 * time.Second)
for _, tab := range productTablets {
_, body, err := throttlerCheckSelf(tab)
assert.NoError(t, err)
assert.Contains(t, body, "417")
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "417")
}
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
insertMoreProductsForThrottler(t)
insertMoreProductsForSourceThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
time.Sleep(1 * time.Second)
// we expect the additional rows to **not appear** in the materialized view
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 5)
}
})
t.Run("unthrottle-app", func(t *testing.T) {
t.Run("unthrottle-app-product", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
for _, tab := range productTablets {
_, body, err := unthrottleStreamer(tab)
_, body, err := unthrottleStreamer(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerAppName)
assert.Contains(t, body, sourceThrottlerAppName)
}
// give time for unthrottling to take effect and for target to fetch data
time.Sleep(3 * time.Second)
for _, tab := range productTablets {
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 8)
}
})

t.Run("throttle-app-customer", func(t *testing.T) {
// Now, throttle the streamer on source tablets, insert some rows
for _, tab := range customerTablets {
_, body, err := throttleStreamer(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}
// Wait for throttling to take effect (caching will expire by this time):
time.Sleep(1 * time.Second)
for _, tab := range customerTablets {
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "417")
}
{
_, body, err := throttlerCheckSelf(tab, sourceThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
insertMoreProductsForTargetThrottler(t)
// To be fair to the test, we give the target time to apply the new changes. We expect it to NOT get them in the first place,
time.Sleep(1 * time.Second)
// we expect the additional rows to **not appear** in the materialized view
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 8)
}
})
t.Run("unthrottle-app-customer", func(t *testing.T) {
// unthrottle on source tablets, and expect the rows to show up
for _, tab := range customerTablets {
_, body, err := unthrottleStreamer(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, targetThrottlerAppName)
}
// give time for unthrottling to take effect and for target to fetch data
time.Sleep(3 * time.Second)
for _, tab := range customerTablets {
{
_, body, err := throttlerCheckSelf(tab, targetThrottlerAppName)
assert.NoError(t, err)
assert.Contains(t, body, "200")
}
}
for _, tab := range customerTablets {
validateCountInTablet(t, tab, keyspace, workflow, 11)
}
})
})
}

Expand Down
38 changes: 22 additions & 16 deletions go/vt/vttablet/tabletmanager/vreplication/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,30 @@ limitations under the License.
package vreplication

import (
"context"
"errors"
"flag"
"fmt"
"sort"
"sync"
"time"

"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/withddl"

"context"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/sync2"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/evalengine"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/withddl"
)

const (
Expand All @@ -65,6 +64,10 @@ const (

var withDDL *withddl.WithDDL

const (
throttlerAppName = "vreplication"
)

func init() {
allddls := append([]string{}, binlogplayer.CreateVReplicationTable()...)
allddls = append(allddls, binlogplayer.AlterVReplicationTable...)
Expand Down Expand Up @@ -111,6 +114,8 @@ type Engine struct {

journaler map[string]*journalEvent
ec *externalConnector

throttlerClient *throttle.Client
}

type journalEvent struct {
Expand All @@ -121,14 +126,15 @@ type journalEvent struct {

// NewEngine creates a new Engine.
// A nil ts means that the Engine is disabled.
func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon) *Engine {
func NewEngine(config *tabletenv.TabletConfig, ts *topo.Server, cell string, mysqld mysqlctl.MysqlDaemon, lagThrottler *throttle.Throttler) *Engine {
vre := &Engine{
controllers: make(map[int]*controller),
ts: ts,
cell: cell,
mysqld: mysqld,
journaler: make(map[string]*journalEvent),
ec: newExternalConnector(config.ExternalConnections),
controllers: make(map[int]*controller),
ts: ts,
cell: cell,
mysqld: mysqld,
journaler: make(map[string]*journalEvent),
ec: newExternalConnector(config.ExternalConnections),
throttlerClient: throttle.NewBackgroundClient(lagThrottler, throttlerAppName, throttle.ThrottleCheckSelf),
}
return vre
}
Expand Down
16 changes: 12 additions & 4 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,11 +221,18 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
var updateCopyState *sqlparser.ParsedQuery
var bv map[string]*querypb.BindVariable
err = vc.vr.sourceVStreamer.VStreamRows(ctx, initialPlan.SendRule.Filter, lastpkpb, func(rows *binlogdatapb.VStreamRowsResponse) error {
select {
case <-ctx.Done():
return io.EOF
default:
for {
select {
case <-ctx.Done():
return io.EOF
default:
}
// verify throttler is happy, otherwise keep looping
if vc.vr.vre.throttlerClient.ThrottleCheckOKOrWait(ctx) {
break
}
}

if vc.tablePlan == nil {
if len(rows.Fields) == 0 {
return fmt.Errorf("expecting field event first, got: %v", rows)
Expand All @@ -249,6 +256,7 @@ func (vc *vcopier) copyTable(ctx context.Context, tableName string, copyState ma
if len(rows.Rows) == 0 {
return nil
}

// The number of rows we receive depends on the packet size set
// for the row streamer. Since the packet size is roughly equivalent
// to data size, this should map to a uniform amount of pages affected
Expand Down
5 changes: 5 additions & 0 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ func (vp *vplayer) applyEvents(ctx context.Context, relay *relayLog) error {
defer vp.vr.stats.SecondsBehindMaster.Set(math.MaxInt64)
var sbm int64 = -1
for {
// check throttler.
if !vp.vr.vre.throttlerClient.ThrottleCheckOKOrWait(ctx) {
continue
}

items, err := relay.Fetch()
if err != nil {
return err
Expand Down
Loading