From db62f041e726a235ab35c35fd1fece8587919483 Mon Sep 17 00:00:00 2001 From: yoheimuta Date: Fri, 26 Aug 2022 11:52:52 +0900 Subject: [PATCH 01/18] VSCopy: Demonstrate to fail a test case on which the vstream API is supposed to resume the copy phase consistently Signed-off-by: yoheimuta --- go/vt/vtgate/endtoend/vstream_test.go | 89 +++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 477bb2518b5..c9ba0e98b2a 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -232,6 +232,95 @@ func TestVStreamCopyBasic(t *testing.T) { } } +func TestVStreamCopyResume(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() + + _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + if err != nil { + t.Fatal(err) + } + + mpos, err := mconn.PrimaryPosition() + if err != nil { + t.Fatal(err) + } + + _, err = conn.ExecuteFetch("update t1 set id2 = 10 where id1 = 1", 1, false) + if err != nil { + t.Fatal(err) + } + + _, err = conn.ExecuteFetch("insert into t1(id1,id2) values(9,9)", 1, false) + if err != nil { + t.Fatal(err) + } + + lastPK := sqltypes.Result{ + Fields: []*query.Field{{Name: "id1", Type: query.Type_INT32}}, + Rows: [][]sqltypes.Value{{sqltypes.NewInt32(4)}}, + } + qr := sqltypes.ResultToProto3(&lastPK) + tablePKs := []*binlogdatapb.TableLastPK{{ + TableName: "t1", + Lastpk: qr, + }} + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "-80", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tablePKs, + }) + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "80-", + Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), + TablePKs: tablePKs, + }) + vgtid.ShardGtids = shardGtids + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + _, _ = conn, mconn + if err != nil { + t.Fatal(err) + } + numExpectedRowEvents := 2 /* catchup events */ + 5 /* copy events */ + require.NotNil(t, reader) + var evs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_ROW { + evs = append(evs, ev) + printEvents(evs) // for debugging ci failures + } + } + if len(evs) == numExpectedRowEvents { + t.Logf("TestVStreamCopyResume was successful") + return + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } + } +} + func TestVStreamCurrent(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 0556705788ee94d4ac9c1f1c2afa71b6363dd09d Mon Sep 17 00:00:00 2001 From: yoheimuta Date: Fri, 26 Aug 2022 11:54:00 +0900 Subject: [PATCH 02/18] VSCopy: Resume the copy phase consistently from given GTID and lastpk Signed-off-by: yoheimuta --- .../vttablet/tabletserver/vstreamer/uvstreamer.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 40bf27dd0cf..4eec1e3689d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -253,9 +253,15 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. } if !shouldSend && tableName != "" { shouldSend = true - _, ok := uvs.plans[tableName] + plan, ok := uvs.plans[tableName] if ok { - shouldSend = false + // Ideally we should compare the PKs and never send the rows whose PK has already been copied. + // For now, we send all the changes by accepting the duplicate events. + if plan.tablePK != nil && plan.tablePK.Lastpk != nil { + shouldSend = true + } else { + shouldSend = false + } } } if shouldSend { @@ -351,7 +357,9 @@ func (uvs *uvstreamer) init() error { if err := uvs.setStreamStartPosition(); err != nil { return err } - } else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { + } + + if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { if err := uvs.buildTablePlan(); err != nil { return err } From bdcf51b17b57bc04b605f10cc9170da5f2c410d9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Aug 2022 16:20:55 -0400 Subject: [PATCH 03/18] Build out the unit test some more Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 76 +++++++++++++++++++++------ 1 file changed, 61 insertions(+), 15 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index c9ba0e98b2a..497e65b3ecd 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "sort" "sync" "testing" @@ -243,43 +244,48 @@ func TestVStreamCopyResume(t *testing.T) { t.Fatal(err) } + // Any subsequent GTIDs will be part of the stream mpos, err := mconn.PrimaryPosition() if err != nil { t.Fatal(err) } - _, err = conn.ExecuteFetch("update t1 set id2 = 10 where id1 = 1", 1, false) - if err != nil { - t.Fatal(err) - } - + // This GTID should end up as a no-op because we should have copied the + // existing row _, err = conn.ExecuteFetch("insert into t1(id1,id2) values(9,9)", 1, false) if err != nil { t.Fatal(err) } + // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) lastPK := sqltypes.Result{ - Fields: []*query.Field{{Name: "id1", Type: query.Type_INT32}}, - Rows: [][]sqltypes.Value{{sqltypes.NewInt32(4)}}, + Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}}, + Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}}, } - qr := sqltypes.ResultToProto3(&lastPK) - tablePKs := []*binlogdatapb.TableLastPK{{ + tableLastPK := []*binlogdatapb.TableLastPK{{ TableName: "t1", - Lastpk: qr, + Lastpk: sqltypes.ResultToProto3(&lastPK), }} + + // This GTID must have a before and after value + _, err = conn.ExecuteFetch("update t1 set id2 = 10 where id1 = 1", 1, false) + if err != nil { + t.Fatal(err) + } + var shardGtids []*binlogdatapb.ShardGtid var vgtid = &binlogdatapb.VGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "-80", Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), - TablePKs: tablePKs, + TablePKs: tableLastPK, }) shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "80-", Gtid: fmt.Sprintf("%s/%s", mpos.GTIDSet.Flavor(), mpos), - TablePKs: tablePKs, + TablePKs: tableLastPK, }) vgtid.ShardGtids = shardGtids filter := &binlogdatapb.Filter{ @@ -290,12 +296,23 @@ func TestVStreamCopyResume(t *testing.T) { } flags := &vtgatepb.VStreamFlags{} reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) - _, _ = conn, mconn if err != nil { t.Fatal(err) } - numExpectedRowEvents := 2 /* catchup events */ + 5 /* copy events */ require.NotNil(t, reader) + + expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) + expectedCatchupEvents := 2 // id1=9 and id2=9; id2=10 where id1=1 + rowCopyEvents, replCatchupEvents := 0, 0 + expectedEvents := []string{ + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + } var evs []*binlogdatapb.VEvent for { e, err := reader.Recv() @@ -304,10 +321,19 @@ func TestVStreamCopyResume(t *testing.T) { for _, ev := range e { if ev.Type == binlogdatapb.VEventType_ROW { evs = append(evs, ev) + if ev.Timestamp == 0 { + rowCopyEvents++ + } else { + replCatchupEvents++ + } printEvents(evs) // for debugging ci failures } } - if len(evs) == numExpectedRowEvents { + if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { + sort.Sort(VEventSorter(evs)) + for i, ev := range evs { + require.Regexp(t, expectedEvents[i], ev.String()) + } t.Logf("TestVStreamCopyResume was successful") return } @@ -485,3 +511,23 @@ func printEvents(evs []*binlogdatapb.VEvent) { s += "===END===" + "\n" log.Infof("%s", s) } + +// Sort the VEvents by the first row change's after value bytes primarily, with +// secondary ordering by timestamp (ASC). Note that row copy events do not have +// a timestamp and the value will be 0. +type VEventSorter []*binlogdatapb.VEvent + +func (v VEventSorter) Len() int { + return len(v) +} +func (v VEventSorter) Swap(i, j int) { + v[i], v[j] = v[j], v[i] +} +func (v VEventSorter) Less(i, j int) bool { + valI := string(v[i].GetRowEvent().RowChanges[0].After.Values) + valJ := string(v[j].GetRowEvent().RowChanges[0].After.Values) + if valI == valJ { + return v[i].Timestamp < v[j].Timestamp + } + return valI < valJ +} From fc4743d1423cd5f02a33e299df1b992ca303d448 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Aug 2022 20:54:58 -0400 Subject: [PATCH 04/18] Update tests for new behavior Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go | 12 ++++++++---- .../tabletserver/vstreamer/uvstreamer_flaky_test.go | 3 +++ 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 4eec1e3689d..258d2e768e4 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -253,11 +253,15 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. } if !shouldSend && tableName != "" { shouldSend = true - plan, ok := uvs.plans[tableName] - if ok { - // Ideally we should compare the PKs and never send the rows whose PK has already been copied. - // For now, we send all the changes by accepting the duplicate events. + // If the event is on a table we haven't yet fully copied... + if plan, ok := uvs.plans[tableName]; ok { + // If there's a lastPK value then we're in the middle of a copy phase if plan.tablePK != nil && plan.tablePK.Lastpk != nil { + // Ideally we should compare the PKs and never send the rows whose PK has already been copied. + // For now, we send all the changes by allowing for any duplicate events -- meaning that we + // apply events in the stream for an inserted row, e.g. table:t2 pk:9, even though we will + // later copy the t2 table contents and copy that same row event again -- which should become + // harmless no-ops. shouldSend = true } else { shouldSend = false diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index fdd60b8207f..94387720f2e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -245,6 +245,7 @@ commit;" } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) + numCopyEvents += 2 /* Repeated no-op events -- 1 field, 1 row -- from t2 row insert in stream before t2 table copy starts */ numCopyEvents += 2 /* GTID + Test event after all copy is done */ numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ @@ -478,6 +479,8 @@ var expectedEvents = []string{ "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"11110\"}}}", + "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}", + "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", "type:GTID", "type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied "type:OTHER gtid:\"Copy Start t2\"", From 2c5042a1a41f88aadb5d4327a9ec0e6892ab3e88 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 29 Aug 2022 21:48:29 -0400 Subject: [PATCH 05/18] Improve comments Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 258d2e768e4..8efb8863341 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -255,13 +255,13 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. shouldSend = true // If the event is on a table we haven't yet fully copied... if plan, ok := uvs.plans[tableName]; ok { - // If there's a lastPK value then we're in the middle of a copy phase + // If there's a lastPK value then we're in the middle of a table's copy phase if plan.tablePK != nil && plan.tablePK.Lastpk != nil { - // Ideally we should compare the PKs and never send the rows whose PK has already been copied. - // For now, we send all the changes by allowing for any duplicate events -- meaning that we - // apply events in the stream for an inserted row, e.g. table:t2 pk:9, even though we will - // later copy the t2 table contents and copy that same row event again -- which should become - // harmless no-ops. + // Ideally we should compare the PKs and only send events for rows which have been copied. + // For now, we send all changes and allow for any duplicate events -- meaning that e.g. + // we apply events in the stream for a row insert, table:t2 pk:9, even though we will + // later copy the t2 table contents and copy that same row event again -- which should + // become harmless no-ops if the row did not change in the interim. shouldSend = true } else { shouldSend = false From 9971669e28b63e6c8fecdea106dd07c935cf1844 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 30 Aug 2022 00:57:20 -0400 Subject: [PATCH 06/18] Limit uvstreamer changes and update test Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 182 ++++++++++++------ .../tabletserver/vstreamer/uvstreamer.go | 26 +-- 2 files changed, 125 insertions(+), 83 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 497e65b3ecd..a523d5e1eb5 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -236,28 +236,63 @@ func TestVStreamCopyBasic(t *testing.T) { func TestVStreamCopyResume(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - gconn, conn, mconn, closeConnections := initialize(ctx, t) - defer closeConnections() + gconn, conn, mconn, _ := initialize(ctx, t) + defer conn.Close() + defer mconn.Close() - _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, + } + flags := &vtgatepb.VStreamFlags{} + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} + + // Start with an empty position + mpos := mysql.Position{} + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "-80", + Gtid: "", + }) + shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ + Keyspace: "ks", + Shard: "80-", + Gtid: "", + }) + vgtid.ShardGtids = shardGtids + + _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4)", 1, false) if err != nil { t.Fatal(err) } - // Any subsequent GTIDs will be part of the stream - mpos, err := mconn.PrimaryPosition() + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) if err != nil { t.Fatal(err) } + require.NotNil(t, reader) + expectedRowCopyEvents := 4 // id1 and id2 IN(1,2,3,4) + expectedCatchupEvents := 0 + expectedEvents := []string{ + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"22"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"33"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + } + confirmVStreamEvents(t, cancel, reader, expectedEvents, expectedRowCopyEvents, expectedCatchupEvents) - // This GTID should end up as a no-op because we should have copied the - // existing row - _, err = conn.ExecuteFetch("insert into t1(id1,id2) values(9,9)", 1, false) + // close the connection and the vstream with it + gconn.Close() + + // Now we want to pick up where we left off, so we get the current position and set + // the lastPK to what it was, id1=4, so we should only copy rows for id1 IN(5,6,7,8,9) + mpos, err = mconn.PrimaryPosition() if err != nil { t.Fatal(err) } - - // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) lastPK := sqltypes.Result{ Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}}, Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}}, @@ -266,15 +301,7 @@ func TestVStreamCopyResume(t *testing.T) { TableName: "t1", Lastpk: sqltypes.ResultToProto3(&lastPK), }} - - // This GTID must have a before and after value - _, err = conn.ExecuteFetch("update t1 set id2 = 10 where id1 = 1", 1, false) - if err != nil { - t.Fatal(err) - } - - var shardGtids []*binlogdatapb.ShardGtid - var vgtid = &binlogdatapb.VGtid{} + shardGtids = []*binlogdatapb.ShardGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "-80", @@ -288,63 +315,45 @@ func TestVStreamCopyResume(t *testing.T) { TablePKs: tableLastPK, }) vgtid.ShardGtids = shardGtids - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1", - }}, + + // Open a new connection (as we closed the original) + gconn, err = vtgateconn.Dial(ctx, grpcAddress) + if err != nil { + t.Fatal(err) } - flags := &vtgatepb.VStreamFlags{} - reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + defer gconn.Close() + + reader, err = gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) if err != nil { t.Fatal(err) } require.NotNil(t, reader) - expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) - expectedCatchupEvents := 2 // id1=9 and id2=9; id2=10 where id1=1 - rowCopyEvents, replCatchupEvents := 0, 0 - expectedEvents := []string{ - `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + _, err = conn.ExecuteFetch("insert into t1(id1,id2) values (5,5), (6,6), (7,7), (8,8), (9,9)", 1, false) + if err != nil { + t.Fatal(err) + } + _, err = conn.ExecuteFetch("update t1 set id2 = 119 where id1 = 2", 1, false) + if err != nil { + t.Fatal(err) + } + _, err = conn.ExecuteFetch("update t1 set id2 = 219 where id1 = 7", 1, false) + if err != nil { + t.Fatal(err) + } + + expectedRowCopyEvents = 5 // id1 and id2 IN(5,6,7,8,9) + expectedCatchupEvents = 2 + expectedEvents = []string{ + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"22"} after:{lengths:1 lengths:3 values:"2119"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"77"} after:{lengths:1 lengths:3 values:"7219"}} keyspace:"ks" shard:"80-"} current_time:[0-9]+ keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, - `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, - } - var evs []*binlogdatapb.VEvent - for { - e, err := reader.Recv() - switch err { - case nil: - for _, ev := range e { - if ev.Type == binlogdatapb.VEventType_ROW { - evs = append(evs, ev) - if ev.Timestamp == 0 { - rowCopyEvents++ - } else { - replCatchupEvents++ - } - printEvents(evs) // for debugging ci failures - } - } - if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { - sort.Sort(VEventSorter(evs)) - for i, ev := range evs { - require.Regexp(t, expectedEvents[i], ev.String()) - } - t.Logf("TestVStreamCopyResume was successful") - return - } - case io.EOF: - log.Infof("stream ended\n") - cancel() - default: - log.Errorf("Returned err %v", err) - t.Fatalf("remote error: %v\n", err) - } } + confirmVStreamEvents(t, cancel, reader, expectedEvents, expectedRowCopyEvents, expectedCatchupEvents) } func TestVStreamCurrent(t *testing.T) { @@ -496,6 +505,51 @@ func TestVStreamSharded(t *testing.T) { } +// confirmVStreamEvents confirms that you get the expected number and +// type of events from a vtgate vstream. It also confirms the specific +// events using expectedEvents, which is a string of valid regexes to +// match against that should be sorted in lexicographical order by the +// AFTER value, with secondary ordering by the timestamp of the event +// with no timestamp being 0. This is needed as we do not guarantee +// a strict ordering of events but only eventual consistency and we +// want to have determinstism for the test to avoid flakes. +func confirmVStreamEvents(t *testing.T, cancel context.CancelFunc, reader vtgateconn.VStreamReader, expectedEvents []string, expectedRowCopyEventCount, expectedCatchupEventCount int) { + var evs []*binlogdatapb.VEvent + var rowCopyEvents, replCatchupEvents int + for { + e, err := reader.Recv() + switch err { + case nil: + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_ROW { + evs = append(evs, ev) + t.Logf("Event: %v", ev) + if ev.Timestamp == 0 { + rowCopyEvents++ + } else { + replCatchupEvents++ + } + printEvents(evs) // for debugging ci failures + } + } + if expectedCatchupEventCount == replCatchupEvents && expectedRowCopyEventCount == rowCopyEvents { + sort.Sort(VEventSorter(evs)) + for i, ev := range evs { + require.Regexp(t, expectedEvents[i], ev.String()) + } + t.Logf("TestVStreamCopyResume was successful") + return + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } + } +} + var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 8efb8863341..78842e4a446 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -253,19 +253,9 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. } if !shouldSend && tableName != "" { shouldSend = true - // If the event is on a table we haven't yet fully copied... - if plan, ok := uvs.plans[tableName]; ok { - // If there's a lastPK value then we're in the middle of a table's copy phase - if plan.tablePK != nil && plan.tablePK.Lastpk != nil { - // Ideally we should compare the PKs and only send events for rows which have been copied. - // For now, we send all changes and allow for any duplicate events -- meaning that e.g. - // we apply events in the stream for a row insert, table:t2 pk:9, even though we will - // later copy the t2 table contents and copy that same row event again -- which should - // become harmless no-ops if the row did not change in the interim. - shouldSend = true - } else { - shouldSend = false - } + _, ok := uvs.plans[tableName] + if ok { + shouldSend = false } } if shouldSend { @@ -357,16 +347,14 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { } func (uvs *uvstreamer) init() error { - if uvs.startPos != "" { - if err := uvs.setStreamStartPosition(); err != nil { - return err - } - } - if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { if err := uvs.buildTablePlan(); err != nil { return err } + } else if uvs.startPos != "" { + if err := uvs.setStreamStartPosition(); err != nil { + return err + } } if uvs.pos.IsZero() && (len(uvs.plans) == 0) { From 2a8dd96171e00f66e544f9c4735607670dd5ba20 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 30 Aug 2022 01:01:02 -0400 Subject: [PATCH 07/18] Revert uvstreamer test changes Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 94387720f2e..fdd60b8207f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -245,7 +245,6 @@ commit;" } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) - numCopyEvents += 2 /* Repeated no-op events -- 1 field, 1 row -- from t2 row insert in stream before t2 table copy starts */ numCopyEvents += 2 /* GTID + Test event after all copy is done */ numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ @@ -479,8 +478,6 @@ var expectedEvents = []string{ "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"11110\"}}}", - "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", "type:GTID", "type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied "type:OTHER gtid:\"Copy Start t2\"", From ef896bbb65564ec6bacf97fe2a7c367d67662227 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 30 Aug 2022 11:25:49 -0400 Subject: [PATCH 08/18] Revert all uvstream changes Signed-off-by: Matt Lord --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 78842e4a446..40bf27dd0cf 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -347,12 +347,12 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { } func (uvs *uvstreamer) init() error { - if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { - if err := uvs.buildTablePlan(); err != nil { + if uvs.startPos != "" { + if err := uvs.setStreamStartPosition(); err != nil { return err } - } else if uvs.startPos != "" { - if err := uvs.setStreamStartPosition(); err != nil { + } else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { + if err := uvs.buildTablePlan(); err != nil { return err } } From 6d99dba39e5d987edc2a2ee64aee58545f1f5e85 Mon Sep 17 00:00:00 2001 From: yoheimuta Date: Wed, 31 Aug 2022 16:24:41 +0900 Subject: [PATCH 09/18] VCopy: Revert the last three commits Signed-off-by: yoheimuta --- go/vt/vtgate/endtoend/vstream_test.go | 182 ++++++------------ .../tabletserver/vstreamer/uvstreamer.go | 20 +- .../vstreamer/uvstreamer_flaky_test.go | 3 + 3 files changed, 83 insertions(+), 122 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index a523d5e1eb5..497e65b3ecd 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -236,63 +236,28 @@ func TestVStreamCopyBasic(t *testing.T) { func TestVStreamCopyResume(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - gconn, conn, mconn, _ := initialize(ctx, t) - defer conn.Close() - defer mconn.Close() - - filter := &binlogdatapb.Filter{ - Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1", - }}, - } - flags := &vtgatepb.VStreamFlags{} - var shardGtids []*binlogdatapb.ShardGtid - var vgtid = &binlogdatapb.VGtid{} - - // Start with an empty position - mpos := mysql.Position{} - shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ - Keyspace: "ks", - Shard: "-80", - Gtid: "", - }) - shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ - Keyspace: "ks", - Shard: "80-", - Gtid: "", - }) - vgtid.ShardGtids = shardGtids + gconn, conn, mconn, closeConnections := initialize(ctx, t) + defer closeConnections() - _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4)", 1, false) + _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) if err != nil { t.Fatal(err) } - reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + // Any subsequent GTIDs will be part of the stream + mpos, err := mconn.PrimaryPosition() if err != nil { t.Fatal(err) } - require.NotNil(t, reader) - expectedRowCopyEvents := 4 // id1 and id2 IN(1,2,3,4) - expectedCatchupEvents := 0 - expectedEvents := []string{ - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"11"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"22"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"33"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"44"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, - } - confirmVStreamEvents(t, cancel, reader, expectedEvents, expectedRowCopyEvents, expectedCatchupEvents) - // close the connection and the vstream with it - gconn.Close() - - // Now we want to pick up where we left off, so we get the current position and set - // the lastPK to what it was, id1=4, so we should only copy rows for id1 IN(5,6,7,8,9) - mpos, err = mconn.PrimaryPosition() + // This GTID should end up as a no-op because we should have copied the + // existing row + _, err = conn.ExecuteFetch("insert into t1(id1,id2) values(9,9)", 1, false) if err != nil { t.Fatal(err) } + + // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) lastPK := sqltypes.Result{ Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}}, Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}}, @@ -301,7 +266,15 @@ func TestVStreamCopyResume(t *testing.T) { TableName: "t1", Lastpk: sqltypes.ResultToProto3(&lastPK), }} - shardGtids = []*binlogdatapb.ShardGtid{} + + // This GTID must have a before and after value + _, err = conn.ExecuteFetch("update t1 set id2 = 10 where id1 = 1", 1, false) + if err != nil { + t.Fatal(err) + } + + var shardGtids []*binlogdatapb.ShardGtid + var vgtid = &binlogdatapb.VGtid{} shardGtids = append(shardGtids, &binlogdatapb.ShardGtid{ Keyspace: "ks", Shard: "-80", @@ -315,45 +288,63 @@ func TestVStreamCopyResume(t *testing.T) { TablePKs: tableLastPK, }) vgtid.ShardGtids = shardGtids - - // Open a new connection (as we closed the original) - gconn, err = vtgateconn.Dial(ctx, grpcAddress) - if err != nil { - t.Fatal(err) + filter := &binlogdatapb.Filter{ + Rules: []*binlogdatapb.Rule{{ + Match: "t1", + Filter: "select * from t1", + }}, } - defer gconn.Close() - - reader, err = gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) + flags := &vtgatepb.VStreamFlags{} + reader, err := gconn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) if err != nil { t.Fatal(err) } require.NotNil(t, reader) - _, err = conn.ExecuteFetch("insert into t1(id1,id2) values (5,5), (6,6), (7,7), (8,8), (9,9)", 1, false) - if err != nil { - t.Fatal(err) - } - _, err = conn.ExecuteFetch("update t1 set id2 = 119 where id1 = 2", 1, false) - if err != nil { - t.Fatal(err) - } - _, err = conn.ExecuteFetch("update t1 set id2 = 219 where id1 = 7", 1, false) - if err != nil { - t.Fatal(err) - } - - expectedRowCopyEvents = 5 // id1 and id2 IN(5,6,7,8,9) - expectedCatchupEvents = 2 - expectedEvents = []string{ - `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"22"} after:{lengths:1 lengths:3 values:"2119"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) + expectedCatchupEvents := 2 // id1=9 and id2=9; id2=10 where id1=1 + rowCopyEvents, replCatchupEvents := 0, 0 + expectedEvents := []string{ + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, - `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"77"} after:{lengths:1 lengths:3 values:"7219"}} keyspace:"ks" shard:"80-"} current_time:[0-9]+ keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + } + var evs []*binlogdatapb.VEvent + for { + e, err := reader.Recv() + switch err { + case nil: + for _, ev := range e { + if ev.Type == binlogdatapb.VEventType_ROW { + evs = append(evs, ev) + if ev.Timestamp == 0 { + rowCopyEvents++ + } else { + replCatchupEvents++ + } + printEvents(evs) // for debugging ci failures + } + } + if expectedCatchupEvents == replCatchupEvents && expectedRowCopyEvents == rowCopyEvents { + sort.Sort(VEventSorter(evs)) + for i, ev := range evs { + require.Regexp(t, expectedEvents[i], ev.String()) + } + t.Logf("TestVStreamCopyResume was successful") + return + } + case io.EOF: + log.Infof("stream ended\n") + cancel() + default: + log.Errorf("Returned err %v", err) + t.Fatalf("remote error: %v\n", err) + } } - confirmVStreamEvents(t, cancel, reader, expectedEvents, expectedRowCopyEvents, expectedCatchupEvents) } func TestVStreamCurrent(t *testing.T) { @@ -505,51 +496,6 @@ func TestVStreamSharded(t *testing.T) { } -// confirmVStreamEvents confirms that you get the expected number and -// type of events from a vtgate vstream. It also confirms the specific -// events using expectedEvents, which is a string of valid regexes to -// match against that should be sorted in lexicographical order by the -// AFTER value, with secondary ordering by the timestamp of the event -// with no timestamp being 0. This is needed as we do not guarantee -// a strict ordering of events but only eventual consistency and we -// want to have determinstism for the test to avoid flakes. -func confirmVStreamEvents(t *testing.T, cancel context.CancelFunc, reader vtgateconn.VStreamReader, expectedEvents []string, expectedRowCopyEventCount, expectedCatchupEventCount int) { - var evs []*binlogdatapb.VEvent - var rowCopyEvents, replCatchupEvents int - for { - e, err := reader.Recv() - switch err { - case nil: - for _, ev := range e { - if ev.Type == binlogdatapb.VEventType_ROW { - evs = append(evs, ev) - t.Logf("Event: %v", ev) - if ev.Timestamp == 0 { - rowCopyEvents++ - } else { - replCatchupEvents++ - } - printEvents(evs) // for debugging ci failures - } - } - if expectedCatchupEventCount == replCatchupEvents && expectedRowCopyEventCount == rowCopyEvents { - sort.Sort(VEventSorter(evs)) - for i, ev := range evs { - require.Regexp(t, expectedEvents[i], ev.String()) - } - t.Logf("TestVStreamCopyResume was successful") - return - } - case io.EOF: - log.Infof("stream ended\n") - cancel() - default: - log.Errorf("Returned err %v", err) - t.Fatalf("remote error: %v\n", err) - } - } -} - var printMu sync.Mutex func printEvents(evs []*binlogdatapb.VEvent) { diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 40bf27dd0cf..8efb8863341 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -253,9 +253,19 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. } if !shouldSend && tableName != "" { shouldSend = true - _, ok := uvs.plans[tableName] - if ok { - shouldSend = false + // If the event is on a table we haven't yet fully copied... + if plan, ok := uvs.plans[tableName]; ok { + // If there's a lastPK value then we're in the middle of a table's copy phase + if plan.tablePK != nil && plan.tablePK.Lastpk != nil { + // Ideally we should compare the PKs and only send events for rows which have been copied. + // For now, we send all changes and allow for any duplicate events -- meaning that e.g. + // we apply events in the stream for a row insert, table:t2 pk:9, even though we will + // later copy the t2 table contents and copy that same row event again -- which should + // become harmless no-ops if the row did not change in the interim. + shouldSend = true + } else { + shouldSend = false + } } } if shouldSend { @@ -351,7 +361,9 @@ func (uvs *uvstreamer) init() error { if err := uvs.setStreamStartPosition(); err != nil { return err } - } else if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { + } + + if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { if err := uvs.buildTablePlan(); err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index fdd60b8207f..94387720f2e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -245,6 +245,7 @@ commit;" } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) + numCopyEvents += 2 /* Repeated no-op events -- 1 field, 1 row -- from t2 row insert in stream before t2 table copy starts */ numCopyEvents += 2 /* GTID + Test event after all copy is done */ numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ @@ -478,6 +479,8 @@ var expectedEvents = []string{ "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"11110\"}}}", + "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}", + "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", "type:GTID", "type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied "type:OTHER gtid:\"Copy Start t2\"", From 60550f1168842899c9df805e36199a17eb28c8af Mon Sep 17 00:00:00 2001 From: yoheimuta Date: Wed, 31 Aug 2022 18:18:46 +0900 Subject: [PATCH 10/18] VCopy: Add a new vstream type that allows picking up where we left off Signed-off-by: yoheimuta --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 8efb8863341..2c4ab8b87e9 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -65,6 +65,10 @@ type uvstreamer struct { plans map[string]*tablePlan tablesToCopy []string + // particular type for the purpose that the client wants to pick up where it left off during a previous copy phase + // It turns on only if the input parameters of startPos and inTablePKs are given simultaneously. + pickingUpInputOffset bool + // changes for each table being copied fields []*querypb.Field pkfields []*querypb.Field @@ -255,8 +259,9 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. shouldSend = true // If the event is on a table we haven't yet fully copied... if plan, ok := uvs.plans[tableName]; ok { - // If there's a lastPK value then we're in the middle of a table's copy phase - if plan.tablePK != nil && plan.tablePK.Lastpk != nil { + // If the client means to pick up where it left off and there's a lastPK value + // then we're in the middle of a table's copy phase + if uvs.pickingUpInputOffset && plan.tablePK != nil && plan.tablePK.Lastpk != nil { // Ideally we should compare the PKs and only send events for rows which have been copied. // For now, we send all changes and allow for any duplicate events -- meaning that e.g. // we apply events in the stream for a row insert, table:t2 pk:9, even though we will @@ -369,6 +374,10 @@ func (uvs *uvstreamer) init() error { } } + if uvs.startPos != "" && len(uvs.inTablePKs) > 0 { + uvs.pickingUpInputOffset = true + } + if uvs.pos.IsZero() && (len(uvs.plans) == 0) { return fmt.Errorf("stream needs a position or a table to copy") } From a1410c47d52f1fe09333f180190ff81a9abb60d1 Mon Sep 17 00:00:00 2001 From: yoheimuta Date: Wed, 31 Aug 2022 21:37:43 +0900 Subject: [PATCH 11/18] VCopy: Revert the unit test change Signed-off-by: yoheimuta --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 94387720f2e..fdd60b8207f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -245,7 +245,6 @@ commit;" } numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) - numCopyEvents += 2 /* Repeated no-op events -- 1 field, 1 row -- from t2 row insert in stream before t2 table copy starts */ numCopyEvents += 2 /* GTID + Test event after all copy is done */ numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ @@ -479,8 +478,6 @@ var expectedEvents = []string{ "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"11110\"}}}", - "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", "type:GTID", "type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied "type:OTHER gtid:\"Copy Start t2\"", From 7cbbd1cadb79a40c1f2d6875d8ea9b11de0a747f Mon Sep 17 00:00:00 2001 From: yoheimuta Date: Thu, 1 Sep 2022 13:17:42 +0900 Subject: [PATCH 12/18] VCopy: Fix the end-to-end CI test Signed-off-by: yoheimuta --- go/vt/vtgate/endtoend/main_test.go | 12 ++++++++++++ go/vt/vtgate/endtoend/vstream_test.go | 26 +++++++++++++------------- 2 files changed, 25 insertions(+), 13 deletions(-) diff --git a/go/vt/vtgate/endtoend/main_test.go b/go/vt/vtgate/endtoend/main_test.go index df604c003cc..17cf3e6dd01 100644 --- a/go/vt/vtgate/endtoend/main_test.go +++ b/go/vt/vtgate/endtoend/main_test.go @@ -45,6 +45,12 @@ create table t1( primary key(id1) ) Engine=InnoDB; +create table t1_copy_resume( + id1 bigint, + id2 bigint, + primary key(id1) +) Engine=InnoDB; + create table t1_id2_idx( id2 bigint, keyspace_id varbinary(10), @@ -133,6 +139,12 @@ create table t1_sharded( Name: "t1_id2_vdx", }}, }, + "t1_copy_resume": { + ColumnVindexes: []*vschemapb.ColumnVindex{{ + Column: "id1", + Name: "hash", + }}, + }, "t1_sharded": { ColumnVindexes: []*vschemapb.ColumnVindex{{ Column: "id1", diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 497e65b3ecd..ebd4819ef98 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -239,7 +239,7 @@ func TestVStreamCopyResume(t *testing.T) { gconn, conn, mconn, closeConnections := initialize(ctx, t) defer closeConnections() - _, err := conn.ExecuteFetch("insert into t1(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) + _, err := conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(1,1), (2,2), (3,3), (4,4), (5,5), (6,6), (7,7), (8,8)", 1, false) if err != nil { t.Fatal(err) } @@ -252,7 +252,7 @@ func TestVStreamCopyResume(t *testing.T) { // This GTID should end up as a no-op because we should have copied the // existing row - _, err = conn.ExecuteFetch("insert into t1(id1,id2) values(9,9)", 1, false) + _, err = conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(9,9)", 1, false) if err != nil { t.Fatal(err) } @@ -263,12 +263,12 @@ func TestVStreamCopyResume(t *testing.T) { Rows: [][]sqltypes.Value{{sqltypes.NewInt64(4)}}, } tableLastPK := []*binlogdatapb.TableLastPK{{ - TableName: "t1", + TableName: "t1_copy_resume", Lastpk: sqltypes.ResultToProto3(&lastPK), }} // This GTID must have a before and after value - _, err = conn.ExecuteFetch("update t1 set id2 = 10 where id1 = 1", 1, false) + _, err = conn.ExecuteFetch("update t1_copy_resume set id2 = 10 where id1 = 1", 1, false) if err != nil { t.Fatal(err) } @@ -290,8 +290,8 @@ func TestVStreamCopyResume(t *testing.T) { vgtid.ShardGtids = shardGtids filter := &binlogdatapb.Filter{ Rules: []*binlogdatapb.Rule{{ - Match: "t1", - Filter: "select * from t1", + Match: "t1_copy_resume", + Filter: "select * from t1_copy_resume", }}, } flags := &vtgatepb.VStreamFlags{} @@ -305,13 +305,13 @@ func TestVStreamCopyResume(t *testing.T) { expectedCatchupEvents := 2 // id1=9 and id2=9; id2=10 where id1=1 rowCopyEvents, replCatchupEvents := 0, 0 expectedEvents := []string{ - `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, - `type:ROW row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, - `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, } var evs []*binlogdatapb.VEvent for { From ab242da903e98e048c540f6b9f74b85f4eb705cf Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 1 Nov 2022 13:30:43 +0100 Subject: [PATCH 13/18] Update logic for setting up uvstreamer based on input vgtid/tablepks. Add more catchup events to test Signed-off-by: Rohit Nayak --- go/vt/vtgate/endtoend/vstream_test.go | 40 +++++---- .../tabletserver/vstreamer/uvstreamer.go | 83 ++++++++++--------- 2 files changed, 69 insertions(+), 54 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index ebd4819ef98..88f9eb5487d 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -250,13 +250,6 @@ func TestVStreamCopyResume(t *testing.T) { t.Fatal(err) } - // This GTID should end up as a no-op because we should have copied the - // existing row - _, err = conn.ExecuteFetch("insert into t1_copy_resume(id1,id2) values(9,9)", 1, false) - if err != nil { - t.Fatal(err) - } - // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) lastPK := sqltypes.Result{ Fields: []*query.Field{{Name: "id1", Type: query.Type_INT64}}, @@ -267,10 +260,17 @@ func TestVStreamCopyResume(t *testing.T) { Lastpk: sqltypes.ResultToProto3(&lastPK), }} - // This GTID must have a before and after value - _, err = conn.ExecuteFetch("update t1_copy_resume set id2 = 10 where id1 = 1", 1, false) - if err != nil { - t.Fatal(err) + catchupQueries := []string{ + "insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy + "update t1_copy_resume set id2 = 10 where id1 = 1", + "delete from t1_copy_resume where id1 = 1", + "update t1_copy_resume set id2 = 90 where id1 = 9", + } + for _, query := range catchupQueries { + _, err = conn.ExecuteFetch(query, 1, false) + if err != nil { + require.NoError(t, err) + } } var shardGtids []*binlogdatapb.ShardGtid @@ -302,16 +302,18 @@ func TestVStreamCopyResume(t *testing.T) { require.NotNil(t, reader) expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) - expectedCatchupEvents := 2 // id1=9 and id2=9; id2=10 where id1=1 + expectedCatchupEvents := len(catchupQueries) rowCopyEvents, replCatchupEvents := 0, 0 expectedEvents := []string{ `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"55"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"66"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"77"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"88"}} keyspace:"ks" shard:"80-"} keyspace:"ks" shard:"80-"`, - `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:1 values:"99"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, + `type:ROW row_event:{table_name:"ks.t1_copy_resume" row_changes:{after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} keyspace:"ks" shard:"-80"`, + `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"99"} after:{lengths:1 lengths:2 values:"990"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, } var evs []*binlogdatapb.VEvent for { @@ -524,8 +526,16 @@ func (v VEventSorter) Swap(i, j int) { v[i], v[j] = v[j], v[i] } func (v VEventSorter) Less(i, j int) bool { - valI := string(v[i].GetRowEvent().RowChanges[0].After.Values) - valJ := string(v[j].GetRowEvent().RowChanges[0].After.Values) + valsI := v[i].GetRowEvent().RowChanges[0].After + if valsI == nil { + valsI = v[i].GetRowEvent().RowChanges[0].Before + } + valsJ := v[j].GetRowEvent().RowChanges[0].After + if valsJ == nil { + valsJ = v[j].GetRowEvent().RowChanges[0].Before + } + valI := string(valsI.Values) + valJ := string(valsJ.Values) if valI == valJ { return v[i].Timestamp < v[j].Timestamp } diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 2c4ab8b87e9..978a3ea29f0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -65,10 +65,6 @@ type uvstreamer struct { plans map[string]*tablePlan tablesToCopy []string - // particular type for the purpose that the client wants to pick up where it left off during a previous copy phase - // It turns on only if the input parameters of startPos and inTablePKs are given simultaneously. - pickingUpInputOffset bool - // changes for each table being copied fields []*querypb.Field pkfields []*querypb.Field @@ -222,7 +218,8 @@ func getQuery(tableName string, filter string) string { query = buf.String() case key.IsKeyRange(filter): buf := sqlparser.NewTrackedBuffer(nil) - buf.Myprintf("select * from %v where in_keyrange(%v)", sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter)) + buf.Myprintf("select * from %v where in_keyrange(%v)", + sqlparser.NewIdentifierCS(tableName), sqlparser.NewStrLiteral(filter)) query = buf.String() } return query @@ -233,7 +230,28 @@ func (uvs *uvstreamer) Cancel() { uvs.cancel() } -// during copy phase only send streaming events (during catchup/fastforward) for pks already seen +// Only send events for tables whose copy phase is complete or in progress +// Todo: filter out events for rows not yet copied. we can only do this as a best-effort for comparable PKs. +func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { + plan, ok := uvs.plans[tableName] + // Event is for a table which is not in its copy phase. + if !ok { + return true + } + // Event is for a table whose copy phase is yet to be started + if plan.tablePK == nil || plan.tablePK.Lastpk == nil { + return false + } + // Table is currently in its copy phase. + // We may send duplicate insert events or update/delete events for rows not yet seen to the client + // for the table being copied. This is ok as the client is expected to be + // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once). + // Aside: vreplication workflows handle at-least-once by adding where clauses that render DML queries, related to + // events for rows not yet copied, as no-ops. + return true +} + +// Do not send internal heartbeat events. Filter out events for tables whose copy has not been started. func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb.VEvent { if len(uvs.plans) == 0 { return evs @@ -243,36 +261,21 @@ func (uvs *uvstreamer) filterEvents(evs []*binlogdatapb.VEvent) []*binlogdatapb. var shouldSend bool for _, ev := range evs { - shouldSend = false - tableName = "" switch ev.Type { case binlogdatapb.VEventType_ROW: tableName = ev.RowEvent.TableName case binlogdatapb.VEventType_FIELD: tableName = ev.FieldEvent.TableName + default: + tableName = "" + } + switch ev.Type { case binlogdatapb.VEventType_HEARTBEAT: shouldSend = false default: - shouldSend = true - } - if !shouldSend && tableName != "" { - shouldSend = true - // If the event is on a table we haven't yet fully copied... - if plan, ok := uvs.plans[tableName]; ok { - // If the client means to pick up where it left off and there's a lastPK value - // then we're in the middle of a table's copy phase - if uvs.pickingUpInputOffset && plan.tablePK != nil && plan.tablePK.Lastpk != nil { - // Ideally we should compare the PKs and only send events for rows which have been copied. - // For now, we send all changes and allow for any duplicate events -- meaning that e.g. - // we apply events in the stream for a row insert, table:t2 pk:9, even though we will - // later copy the t2 table contents and copy that same row event again -- which should - // become harmless no-ops if the row did not change in the interim. - shouldSend = true - } else { - shouldSend = false - } - } + shouldSend = uvs.shouldSendEventForTable(tableName, ev) } + if shouldSend { evs2 = append(evs2, ev) } @@ -346,7 +349,9 @@ func (uvs *uvstreamer) setStreamStartPosition() error { } if !curPos.AtLeast(pos) { uvs.vse.errorCounts.Add("GTIDSet Mismatch", 1) - return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) + return vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, + "GTIDSet Mismatch: requested source position:%v, current target vrep position: %v", + mysql.EncodePosition(pos), mysql.EncodePosition(curPos)) } uvs.pos = pos return nil @@ -361,23 +366,22 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { return conn.PrimaryPosition() } +// Possible states: +// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos +// 2. TablePKs nil, startPos empty => full table copy of tables matching filter +// 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK) +// 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK) func (uvs *uvstreamer) init() error { - if uvs.startPos != "" { - if err := uvs.setStreamStartPosition(); err != nil { + if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* copy specified tables */ { + if err := uvs.buildTablePlan(); err != nil { return err } } - - if uvs.startPos == "" || len(uvs.inTablePKs) > 0 { - if err := uvs.buildTablePlan(); err != nil { + if uvs.startPos != "" { + if err := uvs.setStreamStartPosition(); err != nil { return err } } - - if uvs.startPos != "" && len(uvs.inTablePKs) > 0 { - uvs.pickingUpInputOffset = true - } - if uvs.pos.IsZero() && (len(uvs.plans) == 0) { return fmt.Errorf("stream needs a position or a table to copy") } @@ -399,7 +403,8 @@ func (uvs *uvstreamer) Stream() error { } uvs.sendTestEvent("Copy Done") } - vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) + vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), + uvs.filter, uvs.getVSchema(), uvs.send, "replicate", uvs.vse) uvs.setVs(vs) return vs.Stream() From 6d7d5165345b71d164acf1416bac201a00cad915 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Tue, 1 Nov 2022 23:36:55 +0100 Subject: [PATCH 14/18] Refactor logic to decide if event is to be sent. Enhance unit and e2e tests. Signed-off-by: Rohit Nayak --- go/vt/vtgate/endtoend/vstream_test.go | 5 +++-- .../tabletserver/vstreamer/uvstreamer.go | 18 +++++++++++------- .../vstreamer/uvstreamer_flaky_test.go | 11 +++++++++-- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index 88f9eb5487d..b13527ecdcc 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -263,6 +263,7 @@ func TestVStreamCopyResume(t *testing.T) { catchupQueries := []string{ "insert into t1_copy_resume(id1,id2) values(9,9)", // this row will show up twice: once in catchup and copy "update t1_copy_resume set id2 = 10 where id1 = 1", + "insert into t1(id1, id2) values(100,100)", "delete from t1_copy_resume where id1 = 1", "update t1_copy_resume set id2 = 90 where id1 = 9", } @@ -301,8 +302,8 @@ func TestVStreamCopyResume(t *testing.T) { } require.NotNil(t, reader) - expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) - expectedCatchupEvents := len(catchupQueries) + expectedRowCopyEvents := 5 // id1 and id2 IN(5,6,7,8,9) + expectedCatchupEvents := len(catchupQueries) - 1 // insert into t1 should never reach rowCopyEvents, replCatchupEvents := 0, 0 expectedEvents := []string{ `type:ROW timestamp:[0-9]+ row_event:{table_name:"ks.t1_copy_resume" row_changes:{before:{lengths:1 lengths:1 values:"11"} after:{lengths:1 lengths:2 values:"110"}} keyspace:"ks" shard:"-80"} current_time:[0-9]+ keyspace:"ks" shard:"-80"`, diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 978a3ea29f0..d5d09942c8a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -230,25 +230,29 @@ func (uvs *uvstreamer) Cancel() { uvs.cancel() } +// We have not yet implemented the logic to check if an event for a row that is already copied, +// so we always return true there so that we don't miss events +func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool { + return true +} + // Only send events for tables whose copy phase is complete or in progress // Todo: filter out events for rows not yet copied. we can only do this as a best-effort for comparable PKs. func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { - plan, ok := uvs.plans[tableName] + _, ok := uvs.plans[tableName] // Event is for a table which is not in its copy phase. if !ok { return true } - // Event is for a table whose copy phase is yet to be started - if plan.tablePK == nil || plan.tablePK.Lastpk == nil { - return false - } - // Table is currently in its copy phase. + + // Table is currently in its copy phase. We have not yet implemented the logic to check if + // an event for a row that is already copied, so we always return true there so that we don't miss events // We may send duplicate insert events or update/delete events for rows not yet seen to the client // for the table being copied. This is ok as the client is expected to be // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once). // Aside: vreplication workflows handle at-least-once by adding where clauses that render DML queries, related to // events for rows not yet copied, as no-ops. - return true + return uvs.isRowCopied(tableName, ev) } // Do not send internal heartbeat events. Filter out events for tables whose copy has not been started. diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index fdd60b8207f..d803dbe9255 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -182,6 +182,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { uvstreamerTestMode = true defer func() { uvstreamerTestMode = false }() initialize(t) + if err := engine.se.Reload(context.Background()); err != nil { t.Fatal("Error reloading schema") } @@ -246,7 +247,7 @@ commit;" numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) numCopyEvents += 2 /* GTID + Test event after all copy is done */ - numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/ + numCatchupEvents := 2*5 + 1*7 /*1 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT, 1 t1+t2 BEGIN+FIELD+ROW+GTID+FIELD+ROW+COMMIT*/ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */ @@ -408,7 +409,11 @@ func getTablePK(table string, idx int) *binlogdatapb.TableLastPK { } func insertRow(t *testing.T, table string, idx int, id int) { - execStatement(t, fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10)) + query := fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10) + if table == "t2" { + log.Infof("&&&&&&&&&&&&&&& %s", query) + } + execStatement(t, query) } func printAllEvents(msg string) { @@ -478,6 +483,8 @@ var expectedEvents = []string{ "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"11110\"}}}", + "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}", + "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", "type:GTID", "type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied "type:OTHER gtid:\"Copy Start t2\"", From 3e24e87fccf54cf1169ebe413610248eb7ea53d9 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Wed, 2 Nov 2022 13:50:47 +0100 Subject: [PATCH 15/18] Don't send events for tables which we can identify as ones we haven't started copy for Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go | 9 +++++++-- .../tabletserver/vstreamer/uvstreamer_flaky_test.go | 10 +++++++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index d5d09942c8a..7b07d4afc53 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -236,15 +236,20 @@ func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bo return true } -// Only send events for tables whose copy phase is complete or in progress +// Only send catchup/fastforward events for tables whose copy phase is complete or in progress // Todo: filter out events for rows not yet copied. we can only do this as a best-effort for comparable PKs. func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { - _, ok := uvs.plans[tableName] + table, ok := uvs.plans[tableName] // Event is for a table which is not in its copy phase. if !ok { return true } + // if table copy was not started and no tablePK was specified we can ignore catchup/fastforward events for it + if table.tablePK == nil || table.tablePK.Lastpk == nil { + return false + } + // Table is currently in its copy phase. We have not yet implemented the logic to check if // an event for a row that is already copied, so we always return true there so that we don't miss events // We may send duplicate insert events or update/delete events for rows not yet seen to the client diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index d803dbe9255..5afcc2de9f1 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -191,6 +191,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { var tablePKs []*binlogdatapb.TableLastPK for i, table := range testState.tables { rules = append(rules, getRule(table)) + + // for table t2, let tablepk be nil, so that we don't send events + if table == "t2" { + continue + } + tablePKs = append(tablePKs, getTablePK(table, i+1)) } filter := &binlogdatapb.Filter{ @@ -247,7 +253,7 @@ commit;" numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) numCopyEvents += 2 /* GTID + Test event after all copy is done */ - numCatchupEvents := 2*5 + 1*7 /*1 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT, 1 t1+t2 BEGIN+FIELD+ROW+GTID+FIELD+ROW+COMMIT*/ + numCatchupEvents := 3 * 5 /*1 t1, 2 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */ @@ -483,8 +489,6 @@ var expectedEvents = []string{ "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"}}", "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:2 lengths:3 values:\"11110\"}}}", - "type:FIELD field_event:{table_name:\"t2\" fields:{name:\"id21\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id21\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id22\" type:INT32 table:\"t2\" org_table:\"t2\" database:\"vttest\" org_name:\"id22\" column_length:11 charset:63 column_type:\"int(11)\"}}", - "type:ROW row_event:{table_name:\"t2\" row_changes:{after:{lengths:2 lengths:3 values:\"11220\"}}}", "type:GTID", "type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied "type:OTHER gtid:\"Copy Start t2\"", From 431717261d698680c21c55ee3229f4627b146303 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Thu, 3 Nov 2022 21:22:21 +0100 Subject: [PATCH 16/18] Minor changes after self-review Signed-off-by: Rohit Nayak --- go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go | 12 ++++++------ .../tabletserver/vstreamer/uvstreamer_flaky_test.go | 10 +++------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 7b07d4afc53..435e1b3044e 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -230,14 +230,14 @@ func (uvs *uvstreamer) Cancel() { uvs.cancel() } -// We have not yet implemented the logic to check if an event for a row that is already copied, -// so we always return true there so that we don't miss events +// We have not yet implemented the logic to check if an event is for a row that is already copied, +// so we always return true so that we send all events for this table and so we don't miss events. func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bool { return true } -// Only send catchup/fastforward events for tables whose copy phase is complete or in progress -// Todo: filter out events for rows not yet copied. we can only do this as a best-effort for comparable PKs. +// Only send catchup/fastforward events for tables whose copy phase is complete or in progress. +// todo: filter out events for rows not yet copied. Note that we can only do this as a best-effort for comparable PKs. func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { table, ok := uvs.plans[tableName] // Event is for a table which is not in its copy phase. @@ -251,7 +251,7 @@ func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatap } // Table is currently in its copy phase. We have not yet implemented the logic to check if - // an event for a row that is already copied, so we always return true there so that we don't miss events + // an event for a row that is already copied, so we always return true there so that we don't miss events. // We may send duplicate insert events or update/delete events for rows not yet seen to the client // for the table being copied. This is ok as the client is expected to be // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once). @@ -376,7 +376,7 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { } // Possible states: -// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos +// 1. TablePKs nil, startPos set to gtid or "current" => start replicating from pos // 2. TablePKs nil, startPos empty => full table copy of tables matching filter // 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK) // 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index 5afcc2de9f1..1ed673ebf90 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -192,7 +192,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { for i, table := range testState.tables { rules = append(rules, getRule(table)) - // for table t2, let tablepk be nil, so that we don't send events + // for table t2, let tablepk be nil, so that we don't send events for the insert in initTables() if table == "t2" { continue } @@ -253,7 +253,7 @@ commit;" numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) numCopyEvents += 2 /* GTID + Test event after all copy is done */ - numCatchupEvents := 3 * 5 /*1 t1, 2 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ + numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */ @@ -415,11 +415,7 @@ func getTablePK(table string, idx int) *binlogdatapb.TableLastPK { } func insertRow(t *testing.T, table string, idx int, id int) { - query := fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10) - if table == "t2" { - log.Infof("&&&&&&&&&&&&&&& %s", query) - } - execStatement(t, query) + execStatement(t, fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10)) } func printAllEvents(msg string) { From 1edd07cecc04a7976a910112684f81f48fb03cf8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 7 Nov 2022 18:47:26 -0500 Subject: [PATCH 17/18] Add vstream copy resume to release notes Signed-off-by: Matt Lord --- doc/releasenotes/16_0_0_summary.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/doc/releasenotes/16_0_0_summary.md b/doc/releasenotes/16_0_0_summary.md index 8347d75b8c0..34c04569fb5 100644 --- a/doc/releasenotes/16_0_0_summary.md +++ b/doc/releasenotes/16_0_0_summary.md @@ -2,10 +2,19 @@ - [New command line flags and behavior](#new-command-line-flags-and-behavior) +- **[VReplication](#vreplication)** + - [VStream Copy Resume](#vstream-copy-resume) + ## Known Issues ## Major Changes +### VReplication + +#### VStream Copy Resume + +In [PR #11103](https://github.com/vitessio/vitess/pull/11103) we introduced the ability to resume a `VTGate` [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/). This is useful when a [`VStream` copy operation](https://vitess.io/docs/design-docs/vreplication/vstream/vscopy/) is interrupted due to e.g. a network failure or a server restart. The `VStream` copy operation can be resumed by specifying each table's last seen primary key value in the `VStream` request. Please see the [`VStream` docs](https://vitess.io/docs/16.0/reference/vreplication/vstream/) for more details. + ### Breaking Changes #### Orchestrator Integration Deletion From 03dcf6d0aa59aaaccf0407ed21d92d6bd09b428a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 10 Nov 2022 10:04:47 -0500 Subject: [PATCH 18/18] Address review comments Signed-off-by: Matt Lord --- go/vt/vtgate/endtoend/vstream_test.go | 8 ++------ .../tabletserver/vstreamer/uvstreamer.go | 19 +++++++++++-------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/go/vt/vtgate/endtoend/vstream_test.go b/go/vt/vtgate/endtoend/vstream_test.go index b13527ecdcc..a13aac8291d 100644 --- a/go/vt/vtgate/endtoend/vstream_test.go +++ b/go/vt/vtgate/endtoend/vstream_test.go @@ -246,9 +246,7 @@ func TestVStreamCopyResume(t *testing.T) { // Any subsequent GTIDs will be part of the stream mpos, err := mconn.PrimaryPosition() - if err != nil { - t.Fatal(err) - } + require.NoError(t, err) // lastPK is id1=4, meaning we should only copy rows for id1 IN(5,6,7,8,9) lastPK := sqltypes.Result{ @@ -269,9 +267,7 @@ func TestVStreamCopyResume(t *testing.T) { } for _, query := range catchupQueries { _, err = conn.ExecuteFetch(query, 1, false) - if err != nil { - require.NoError(t, err) - } + require.NoError(t, err) } var shardGtids []*binlogdatapb.ShardGtid diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index 435e1b3044e..ad9cc99197c 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -237,7 +237,9 @@ func (uvs *uvstreamer) isRowCopied(tableName string, ev *binlogdatapb.VEvent) bo } // Only send catchup/fastforward events for tables whose copy phase is complete or in progress. -// todo: filter out events for rows not yet copied. Note that we can only do this as a best-effort for comparable PKs. +// This ensures we fulfill the at-least-once delivery semantics for events. +// TODO: filter out events for rows not yet copied. Note that we can only do this as a best-effort +// for comparable PKs. func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatapb.VEvent) bool { table, ok := uvs.plans[tableName] // Event is for a table which is not in its copy phase. @@ -250,13 +252,14 @@ func (uvs *uvstreamer) shouldSendEventForTable(tableName string, ev *binlogdatap return false } - // Table is currently in its copy phase. We have not yet implemented the logic to check if - // an event for a row that is already copied, so we always return true there so that we don't miss events. - // We may send duplicate insert events or update/delete events for rows not yet seen to the client - // for the table being copied. This is ok as the client is expected to be + // Table is currently in its copy phase. We have not yet implemented the logic to + // check if an event is for a row that is already copied, so we always return true + // there so that we don't miss events. + // We may send duplicate insert events or update/delete events for rows not yet seen + // to the client for the table being copied. This is ok as the client is expected to be // idempotent: we only promise at-least-once semantics for VStream API (not exactly-once). - // Aside: vreplication workflows handle at-least-once by adding where clauses that render DML queries, related to - // events for rows not yet copied, as no-ops. + // Aside: vreplication workflows handle at-least-once by adding where clauses that render + // DML queries, related to events for rows not yet copied, as no-ops. return uvs.isRowCopied(tableName, ev) } @@ -381,7 +384,7 @@ func (uvs *uvstreamer) currentPosition() (mysql.Position, error) { // 3. TablePKs not nil, startPos empty => table copy (for pks > lastPK) // 4. TablePKs not nil, startPos set => run catchup from startPos, then table copy (for pks > lastPK) func (uvs *uvstreamer) init() error { - if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* copy specified tables */ { + if uvs.startPos == "" /* full copy */ || len(uvs.inTablePKs) > 0 /* resume copy */ { if err := uvs.buildTablePlan(); err != nil { return err }