diff --git a/doc/releasenotes/16_0_0_summary.md b/doc/releasenotes/16_0_0_summary.md
index 8347d75b8c0..34c04569fb5 100644
--- a/doc/releasenotes/16_0_0_summary.md
+++ b/doc/releasenotes/16_0_0_summary.md
@@ -2,10 +2,19 @@
- [New command line flags and behavior](#new-command-line-flags-and-behavior)
+- **[VReplication](#vreplication)**
+ - [VStream Copy Resume](#vstream-copy-resume)
+
## Known Issues
## Major Changes
+### VReplication
+
+#### VStream Copy Resume
+
+In [PR #11103](https://github.com/vitessio/vitess/pull/11103) we introduced the ability to resume a `VTGate` [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/). This is useful when a [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/) is interrupted due to e.g. a network failure or a server restart. The `VStream` copy operation can be resumed by specifying each table's last seen primary key value in the `VStream` request. Please see the [`VStream` docs](https://vitess.io/docs/16.0/reference/vreplication/vstream/) for more details.
+
### Breaking Changes
#### Orchestrator Integration Deletion
diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go
index 046af36a3dd..40582441a1f 100644
--- a/go/vt/vtgate/endtoend/main_test.go
+++ b/go/vt/vtgate/endtoend/main_test.go
@@ -46,6 +46,12 @@ create table t1(
primary key(id1)
) Engine=InnoDB;
+create table t1_copy_resume(
+ id1 bigint,
+ id2 bigint,
+ primary key(id1)
+) Engine=InnoDB;
+
create table t1_id2_idx(
id2 bigint,
keyspace_id varbinary(10),
@@ -134,6 +140,12 @@ create table t1_sharded(
Name: "t1_id2_vdx",
}},
},
+ "t1_copy_resume": {
+ ColumnVindexes: []*vschemapb.ColumnVindex{{
+ Column: "id1",
+ Name: "hash",
+ }},
+ },
"t1_sharded": {
ColumnVindexes: []*vschemapb.ColumnVindex{{
Column: "id1",
diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go
index 477bb2518b5..a13aac8291d 100644
--- a/go/vt/vtgate/endtoend/vstream_test.go
+++ b/go/vt/vtgate/endtoend/vstream_test.go
@@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
+ "sort"
"sync"
"testing"
@@ -232,6 +233,119 @@ func TestVStreamCopyBasic(t *testing.T) {
}
}
+func TestVStreamCopyResume(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ gconn, conn, mconn, closeConnections := initialize(ctx, t)
+ defer closeConnections()
+
+ _, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Any subsequent GTIDs will be part of the stream
+ mpos, err := mconn.PrimaryPosition()
+ require.NoError(t, err)
+
+ // lastPK is id1=4: only rows with id1 > 4 are copied, i.e. id1 IN(5,6,7,8) plus 9, which is inserted during catchup
+ lastPK := sqltypes.Result{
+ Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}},
+ Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}},
+ }
+ tableLastPK := []*binlogdatapb.TableLastPK{{
+ TableName: "t1_copy_resume",
+ Lastpk: sqltypes.ResultToProto3(&lastPK),
+ }}
+
+ catchupQueries := []string{
+ "insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once during catchup and once during the copy phase
+ "update t1_copy_resume set id2 = 10 where id1 = 1",
+ "insert into t1(id1, id2) values(100,100)",
+ "delete from t1_copy_resume where id1 = 1",
+ "update t1_copy_resume set id2 = 90 where id1 = 9",
+ }
+ for _, query := range catchupQueries {
+ _, err = conn.ExecuteFetch(query, 1, false)
+ require.NoError(t, err)
+ }
+
+ var shardGtids []*binlogdatapb.ShardGtid
+ var vgtid = &binlogdatapb.VGtid{}
+ shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
+ Keyspace: "ks",
+ Shard: "-80",
+ Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
+ TablePKs: tableLastPK,
+ })
+ shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{
+ Keyspace: "ks",
+ Shard: "80-",
+ Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos),
+ TablePKs: tableLastPK,
+ })
+ vgtid.ShardGtids = shardGtids
+ filter := &binlogdatapb.Filter{
+ Rules: []*binlogdatapb.Rule{{
+ Match: "t1_copy_resume",
+ Filter: "select * from t1_copy_resume",
+ }},
+ }
+ flags := &vtgatepb.VStreamFlags{}
+ reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
+ if err != nil {
+ t.Fatal(err)
+ }
+ require.NotNil(t, reader)
+
+ expectedRowCopyEvents := 5 // rows with id1 IN(5,6,7,8,9)
+ expectedCatchupEvents := len(catchupQueries) - 1 // the insert into t1 is filtered out and never reaches the client
+ rowCopyEvents, replCatchupEvents := 0, 0
+ expectedEvents := []string{
+ `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
+ `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
+ `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
+ `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
+ `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
+ `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`,
+ `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
+ `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`,
+ `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`,
+ }
+ var evs []*binlogdatapb.VEvent
+ for {
+ e, err := reader.Recv()
+ switch err {
+ case nil:
+ for _, ev := range e {
+ if ev.Type == binlogdatapb.VEventType_ROW {
+ evs = append(evs, ev)
+ if ev.Timestamp == 0 {
+ rowCopyEvents++
+ } else {
+ replCatchupEvents++
+ }
+ printEvents(evs) // for debugging ci failures
+ }
+ }
+ if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents {
+ sort.Sort(VEventSorter(evs))
+ for i, ev := range evs {
+ require.Regexp(t, expectedEvents[i], ev.String())
+ }
+ t.Logf("TestVStreamCopyResume was successful")
+ return
+ }
+ case io.EOF:
+ log.Infof("stream ended\n")
+ cancel()
+ default:
+ log.Errorf("Returned err %v", err)
+ t.Fatalf("remote error: %v\n", err)
+ }
+ }
+}
+
func TestVStreamCurrent(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@@ -396,3 +510,31 @@ func printEvents(evs []*binlogdatapb.VEvent) {
s += "===END===" + "\n"
log.Infof("%s", s)
}
+
+// Sort the VEvents by the first row change's after value bytes primarily, with
+// secondary ordering by timestamp (ASC). Note that row copy events do not have
+// a timestamp and the value will be 0.
+type VEventSorter []*binlogdatapb.VEvent
+
+func (v VEventSorter) Len() int {
+ return len(v)
+}
+func (v VEventSorter) Swap(i, j int) {
+ v[i], v[j] = v[j], v[i]
+}
+func (v VEventSorter) Less(i, j int) bool {
+ valsI := v[i].GetRowEvent().RowChanges[0].After
+ if valsI == nil {
+ valsI = v[i].GetRowEvent().RowChanges[0].Before
+ }
+ valsJ := v[j].GetRowEvent().RowChanges[0].After
+ if valsJ == nil {
+ valsJ = v[j].GetRowEvent().RowChanges[0].Before
+ }
+ valI := string(valsI.Values)
+ valJ := string(valsJ.Values)
+ if valI == valJ {
+ return v[i].Timestamp < v[j].Timestamp
+ }
+ return valI < valJ
+}
diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
index 40bf27dd0cf..ad9cc99197c 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
@@ -218,7 +218,8 @@ func getQuery(tableName string, filter string) string {
query = buf.String()
case key.IsKeyRange(filter):
buf := sqlparser.NewTrackedBuffer(nil)
- buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter))
+ buf.Myprintf("select * from %v where in_keyrange(%v)",
+ sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter))
query = buf.String()
}
return query
@@ -229,7 +230,40 @@ func (uvs *uvstreamer) Cancel() {
uvs.cancel()
}
-// during copy phase only send streaming events (during catchup/fastforward) for pks already seen
+// We have not yet implemented the logic to check if an event is for a row that has already been
+// copied, so we always return true; this sends all events for this table and ensures none are missed.
+func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool {
+ return true
+}
+
+// Only send catchup/fastforward events for tables whose copy phase is complete or in progress.
+// This ensures we fulfill the at-least-once delivery semantics for events.
+// TODO: filter out events for rows not yet copied. Note that we can only do this as a best-effort
+// for comparable PKs.
+func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool {
+ table, ok := uvs.plans[tableName]
+ // Event is for a table which is not in its copy phase.
+ if !ok {
+ return true
+ }
+
+ // If the table's copy phase has not started and no tablePK was specified, we can ignore catchup/fastforward events for it.
+ if table.tablePK == nil || table.tablePK.Lastpk == nil {
+ return false
+ }
+
+ // Table is currently in its copy phase. We have not yet implemented the logic to
+ // check if an event is for a row that is already copied, so we always return true
+ // here so that we don't miss events.
+ // We may send duplicate insert events or update/delete events for rows not yet seen
+ // to the client for the table being copied. This is ok as the client is expected to be
+ // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once).
+ // Aside: vreplication workflows handle at-least-once by adding where clauses that render
+ // DML queries, related to events for rows not yet copied, as no-ops.
+ return uvs.isRowCopied(tableName, ev)
+}
+
+// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started.
func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent {
if len(uvs.plans) == 0 {
return evs
@@ -239,25 +273,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.
var shouldSend bool
for _, ev := range evs {
- shouldSend = false
- tableName = ""
switch ev.Type {
case binlogdatapb.VEventType_ROW:
tableName = ev.RowEvent.TableName
case binlogdatapb.VEventType_FIELD:
tableName = ev.FieldEvent.TableName
+ default:
+ tableName = ""
+ }
+ switch ev.Type {
case binlogdatapb.VEventType_HEARTBEAT:
shouldSend = false
default:
- shouldSend = true
- }
- if !shouldSend && tableName != "" {
- shouldSend = true
- _, ok := uvs.plans[tableName]
- if ok {
- shouldSend = false
- }
+ shouldSend = uvs.shouldSendEventForTable(tableName, ev)
}
+
if shouldSend {
evs2 = append(evs2, ev)
}
@@ -331,7 +361,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error {
}
if !curPos.AtLeast(pos) {
uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1)
- return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
+ return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
+ "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v",
+ mysql.EncodePosition(pos), mysql.EncodePosition(curPos))
}
uvs.pos = pos
return nil
@@ -346,17 +378,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) {
return conn.PrimaryPosition()
}
+// Possible states:
+// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos
+// 2. TablePKs nil, startPos empty => full table copy of tables matching filter
+// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK)
+// 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK)
func (uvs *uvstreamer) init() error {
- if uvs.startPos != "" {
- if err := uvs.setStreamStartPosition(); err != nil {
+ if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ {
+ if err := uvs.buildTablePlan(); err != nil {
return err
}
- } else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 {
- if err := uvs.buildTablePlan(); err != nil {
+ }
+ if uvs.startPos != "" {
+ if err := uvs.setStreamStartPosition(); err != nil {
return err
}
}
-
if uvs.pos.IsZero() && (len(uvs.plans) == 0) {
return fmt.Errorf("stream needs a position or a table to copy")
}
@@ -378,7 +415,8 @@ func (uvs *uvstreamer) Stream() error {
}
uvs.sendTestEvent("Copy Done")
}
- vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
+ vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos),
+ uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse)
uvs.setVs(vs)
return vs.Stream()
diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go
index fdd60b8207f..1ed673ebf90 100644
--- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go
+++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go
@@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
uvstreamerTestMode = true
defer func() { uvstreamerTestMode = false }()
initialize(t)
+
if err := engine.se.Reload(context.Background()); err != nil {
t.Fatal("Error reloading schema")
}
@@ -190,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
var tablePKs []*binlogdatapb.TableLastPK
for i, table := range testState.tables {
rules = append(rules, getRule(table))
+
+ // For table t2, leave the tablePK nil so that we don't send events for the insert in initTables().
+ if table == "t2" {
+ continue
+ }
+
tablePKs = append(tablePKs, getTablePK(table, i+1))
}
filter := &binlogdatapb.Filter{
@@ -246,7 +253,7 @@ commit;"
numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Test event after all copy is done */
- numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
+ numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */