From ccbd2e47520cb1eefed1bc28e5a2e33b5e4ecf26 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 10 Mar 2021 20:46:37 +0100 Subject: [PATCH 1/4] Make the frequency at which heartbeats update the _vt.vreplication table a vttablet configuration Signed-off-by: Rohit Nayak --- go.mod | 1 + go.sum | 2 ++ .../tabletmanager/vreplication/vplayer.go | 24 ++++++++++++++-- .../vreplication/vplayer_flaky_test.go | 28 +++++++++++++++++++ .../tabletmanager/vreplication/vreplicator.go | 10 ++++++- 5 files changed, 61 insertions(+), 4 deletions(-) diff --git a/go.mod b/go.mod index 6840b146fca..07fc32afff5 100644 --- a/go.mod +++ b/go.mod @@ -60,6 +60,7 @@ require ( github.com/martini-contrib/render v0.0.0-20150707142108-ec18f8345a11 github.com/mattn/go-sqlite3 v1.14.0 github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 + github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect github.com/montanaflynn/stats v0.6.3 diff --git a/go.sum b/go.sum index 226248912b4..99a0751dba6 100644 --- a/go.sum +++ b/go.sum @@ -484,6 +484,8 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= +github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI= github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 77f688d9d31..b1999702c87 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -58,6 +58,9 @@ type vplayer struct { lastTimestampNs int64 // timeOffsetNs keeps track of the clock difference with respect to source tablet. timeOffsetNs int64 + // numAccumulatedHeartbeats keeps track of how many heartbeats have been received since we updated the time_updated column of _vt.vreplication + numAccumulatedHeartbeats int + // canAcceptStmtEvents is set to true if the current player can accept events in statement mode. Only true for filters that are match all. canAcceptStmtEvents bool @@ -227,6 +230,7 @@ func (vp *vplayer) applyRowEvent(ctx context.Context, rowEvent *binlogdatapb.Row } func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { + vp.numAccumulatedHeartbeats = 0 update := binlogplayer.GenerateUpdatePos(vp.vr.id, vp.pos, time.Now().Unix(), ts) if _, err := vp.vr.dbClient.Execute(update); err != nil { return false, fmt.Errorf("error %v updating position", err) @@ -246,9 +250,7 @@ func (vp *vplayer) updatePos(ts int64) (posReached bool, err error) { return posReached, nil } -func (vp *vplayer) recordHeartbeat() (err error) { - tm := time.Now().Unix() - vp.vr.stats.RecordHeartbeat(tm) +func (vp *vplayer) updateCurrentTime(tm int64) error { update, err := binlogplayer.GenerateUpdateTime(vp.vr.id, tm) if err != nil { return err @@ -259,6 +261,20 @@ func (vp *vplayer) recordHeartbeat() (err error) { return nil } +func (vp *vplayer) mustUpdateCurrentTime() bool { + return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval +} + +func (vp *vplayer) recordHeartbeat() error { + tm := time.Now().Unix() + vp.vr.stats.RecordHeartbeat(tm) + if !vp.mustUpdateCurrentTime() { + return nil + } + vp.numAccumulatedHeartbeats = 0 + return vp.updateCurrentTime(tm) +} + // applyEvents is the main thread that applies the events. It has the following use // cases to take into account: // * Normal transaction that has row mutations. In this case, the transaction @@ -606,11 +622,13 @@ func (vp *vplayer) applyEvent(ctx context.Context, event *binlogdatapb.VEvent, m return io.EOF case binlogdatapb.VEventType_HEARTBEAT: if !vp.vr.dbClient.InTransaction { + vp.numAccumulatedHeartbeats++ err := vp.recordHeartbeat() if err != nil { return err } } } + return nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index 24e2a1773c9..ab75df054d2 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -38,6 +38,34 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" ) +func TestHeartbeatFrequencyFlag(t *testing.T) { + origVReplicationHeartbeatUpdateInterval := *vreplicationHeartbeatUpdateInterval + defer func() { + *vreplicationHeartbeatUpdateInterval = origVReplicationHeartbeatUpdateInterval + }() + + stats := binlogplayer.NewStats() + vp := &vplayer{vr: &vreplicator{dbClient: newVDBClient(realDBClientFactory(), stats), stats: stats}} + + t.Run("default frequency", func(t *testing.T) { + vp.numAccumulatedHeartbeats = 0 + require.False(t, vp.mustUpdateCurrentTime()) + vp.numAccumulatedHeartbeats = 1 + require.True(t, vp.mustUpdateCurrentTime()) + }) + + *vreplicationHeartbeatUpdateInterval = 4 + + t.Run("custom frequency", func(t *testing.T) { + vp.numAccumulatedHeartbeats = 0 + require.False(t, vp.mustUpdateCurrentTime()) + vp.numAccumulatedHeartbeats = 3 + require.False(t, vp.mustUpdateCurrentTime()) + vp.numAccumulatedHeartbeats = 4 + require.True(t, vp.mustUpdateCurrentTime()) + }) +} + func TestVReplicationTimeUpdated(t *testing.T) { ctx := context.Background() defer deleteTablet(addTablet(100)) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 5cd14fd8733..62c4444bb3e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -41,12 +41,20 @@ var ( // idleTimeout is set to slightly above 1s, compared to heartbeatTime // set by VStreamer at slightly below 1s. This minimizes conflicts // between the two timeouts. - idleTimeout = 1100 * time.Millisecond + idleTimeout = 1100 * time.Millisecond + dbLockRetryDelay = 1 * time.Second relayLogMaxSize = flag.Int("relay_log_max_size", 250000, "Maximum buffer size (in bytes) for VReplication target buffering. If single rows are larger than this, a single row is buffered at a time.") relayLogMaxItems = flag.Int("relay_log_max_items", 5000, "Maximum number of rows for VReplication target buffering.") copyTimeout = 1 * time.Hour replicaLagTolerance = 10 * time.Second + + // vreplicationHeartbeatUpdateInterval determines how often the time_updated column is updated if there are no real events on the source and the source + // vstream is only sending heartbeats for this long. Keep this low if you expect high QPS and are monitoring this column to alert about potential + // outages. Keep this high if + // you have too many streams the extra write qps or cpu load due to these updates are unacceptable + // you have too many streams and/or a large source field (lot of participating tables) which generates unacceptable increase in your binlog size + vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds) at which the time_updated column of a vreplication stream when idling") ) // vreplicator provides the core logic to start vreplication streams From 18a4a6403fa428234ccb0e08eeadcb596935b7f8 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 10 Mar 2021 21:21:33 +0100 Subject: [PATCH 2/4] go mod tidy-ed Signed-off-by: Rohit Nayak --- go.mod | 1 - go.sum | 2 -- 2 files changed, 3 deletions(-) diff --git a/go.mod b/go.mod index 07fc32afff5..6840b146fca 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,6 @@ require ( github.com/martini-contrib/render v0.0.0-20150707142108-ec18f8345a11 github.com/mattn/go-sqlite3 v1.14.0 github.com/minio/minio-go v0.0.0-20190131015406-c8a261de75c1 - github.com/mitchellh/go-ps v1.0.0 // indirect github.com/mitchellh/go-testing-interface v1.14.0 // indirect github.com/mitchellh/mapstructure v1.2.3 // indirect github.com/montanaflynn/stats v0.6.3 diff --git a/go.sum b/go.sum index 99a0751dba6..226248912b4 100644 --- a/go.sum +++ b/go.sum @@ -484,8 +484,6 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= -github.com/mitchellh/go-ps v1.0.0 h1:i6ampVEEF4wQFF+bkYfwYgY+F/uYJDktmvLPf7qIgjc= -github.com/mitchellh/go-ps v1.0.0/go.mod h1:J4lOc8z8yJs6vUwklHw2XEIiT4z4C40KtWVN3nvg8Pg= github.com/mitchellh/go-testing-interface v1.0.0/go.mod h1:kRemZodwjscx+RGhAo8eIhFbs2+BFgRtFPeD/KE+zxI= github.com/mitchellh/go-testing-interface v1.14.0 h1:/x0XQ6h+3U3nAyk1yx+bHPURrKa9sVVvYbuqZ7pIAtI= github.com/mitchellh/go-testing-interface v1.14.0/go.mod h1:gfgS7OtZj6MA4U1UrDRp04twqAjfvlZyCfX3sDjEym8= From 6638c54d1ded4452fe55f7d1b94bd399cc2785cf Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 11 Mar 2021 12:08:31 +0100 Subject: [PATCH 3/4] Implement upper bound on the update interval Signed-off-by: Rohit Nayak --- .../tabletmanager/vreplication/vplayer.go | 3 +- .../vreplication/vplayer_flaky_test.go | 40 +++++++++++-------- .../tabletmanager/vreplication/vreplicator.go | 5 ++- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index b1999702c87..3e04b1d5b0e 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -262,7 +262,8 @@ func (vp *vplayer) updateCurrentTime(tm int64) error { } func (vp *vplayer) mustUpdateCurrentTime() bool { - return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval + return vp.numAccumulatedHeartbeats >= *vreplicationHeartbeatUpdateInterval || + vp.numAccumulatedHeartbeats >= vreplicationMinimumHeartbeatUpdateInterval } func (vp *vplayer) recordHeartbeat() error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go index ab75df054d2..99357e79ef4 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_flaky_test.go @@ -47,23 +47,29 @@ func TestHeartbeatFrequencyFlag(t *testing.T) { stats := binlogplayer.NewStats() vp := &vplayer{vr: &vreplicator{dbClient: newVDBClient(realDBClientFactory(), stats), stats: stats}} - t.Run("default frequency", func(t *testing.T) { - vp.numAccumulatedHeartbeats = 0 - require.False(t, vp.mustUpdateCurrentTime()) - vp.numAccumulatedHeartbeats = 1 - require.True(t, vp.mustUpdateCurrentTime()) - }) - - *vreplicationHeartbeatUpdateInterval = 4 - - t.Run("custom frequency", func(t *testing.T) { - vp.numAccumulatedHeartbeats = 0 - require.False(t, vp.mustUpdateCurrentTime()) - vp.numAccumulatedHeartbeats = 3 - require.False(t, vp.mustUpdateCurrentTime()) - vp.numAccumulatedHeartbeats = 4 - require.True(t, vp.mustUpdateCurrentTime()) - }) + type testcount struct { + count int + mustUpdate bool + } + type testcase struct { + name string + interval int + counts []testcount + } + testcases := []*testcase{ + {"default frequency", 1, []testcount{{count: 0, mustUpdate: false}, {1, true}}}, + {"custom frequency", 4, []testcount{{count: 0, mustUpdate: false}, {count: 3, mustUpdate: false}, {4, true}}}, + {"minumum frequency", 61, []testcount{{count: 59, mustUpdate: false}, {count: 60, mustUpdate: true}, {61, true}}}, + } + for _, tcase := range testcases { + t.Run(tcase.name, func(t *testing.T) { + *vreplicationHeartbeatUpdateInterval = tcase.interval + for _, tcount := range tcase.counts { + vp.numAccumulatedHeartbeats = tcount.count + require.Equal(t, tcount.mustUpdate, vp.mustUpdateCurrentTime()) + } + }) + } } func TestVReplicationTimeUpdated(t *testing.T) { diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 62c4444bb3e..47c77324d15 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -54,7 +54,10 @@ var ( // outages. Keep this high if // you have too many streams the extra write qps or cpu load due to these updates are unacceptable // you have too many streams and/or a large source field (lot of participating tables) which generates unacceptable increase in your binlog size - vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds) at which the time_updated column of a vreplication stream when idling") + vreplicationHeartbeatUpdateInterval = flag.Int("vreplication_heartbeat_update_interval", 1, "Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling") + // vreplicationMinimumHeartbeatUpdateInterval overrides vreplicationHeartbeatUpdateInterval if the latter is higher than this + // to ensure that it satisfies liveness criteria implicitly expected by internal processes like Online DDL + vreplicationMinimumHeartbeatUpdateInterval = 60 ) // vreplicator provides the core logic to start vreplication streams From 3e3617db240de4f11816f417c9704132fdb84cda Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 12 Mar 2021 19:02:15 +0100 Subject: [PATCH 4/4] Log a warning if provided update interval is greater than max allowed Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletmanager/vreplication/vreplicator.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go index 47c77324d15..ddd36ce9a02 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vreplicator.go @@ -98,6 +98,10 @@ type vreplicator struct { // More advanced constructs can be used. Please see the table plan builder // documentation for more info. func newVReplicator(id uint32, source *binlogdatapb.BinlogSource, sourceVStreamer VStreamerClient, stats *binlogplayer.Stats, dbClient binlogplayer.DBClient, mysqld mysqlctl.MysqlDaemon, vre *Engine) *vreplicator { + if *vreplicationHeartbeatUpdateInterval > vreplicationMinimumHeartbeatUpdateInterval { + log.Warningf("the supplied value for vreplication_heartbeat_update_interval:%d seconds is larger than the maximum allowed:%d seconds, vreplication will fallback to %d", + *vreplicationHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval, vreplicationMinimumHeartbeatUpdateInterval) + } return &vreplicator{ vre: vre, id: id,