diff --git a/go/vt/vttablet/tabletserver/vstreamer/copy.go b/go/vt/vttablet/tabletserver/vstreamer/copy.go index 135984844ef..83d54f37da5 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/copy.go +++ b/go/vt/vttablet/tabletserver/vstreamer/copy.go @@ -73,6 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error { errch := make(chan error, 1) go func() { + uvs.stopPos = replication.Position{} // reset stopPos which was potentially set during fastforward startPos := replication.EncodePosition(uvs.pos) vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send2, "catchup", uvs.vse, nil) uvs.setVs(vs) diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go index bf8d4330831..dbdce6d8f22 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go @@ -112,7 +112,11 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se ev.Keyspace = vse.keyspace ev.Shard = vse.shard } - return send(evs) + err := send(evs) + if err != nil { + log.Infof("uvstreamer replicate send() returned with err %v", err) + } + return err } uvs := &uvstreamer{ ctx: ctx, @@ -327,17 +331,20 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error { } err := uvs.send(evs2) if err != nil && err != io.EOF { + log.Infof("uvstreamer catchup/fastforward send() returning with send error %v", err) return err } for _, ev := range evs2 { if ev.Type == binlogdatapb.VEventType_GTID { uvs.pos, _ = replication.DecodePosition(ev.Gtid) if !uvs.stopPos.IsZero() && uvs.pos.AtLeast(uvs.stopPos) { + log.Infof("Reached stop position %v, returning io.EOF", uvs.stopPos) err = io.EOF } } } if err != nil { + log.Infof("uvstreamer catchup/fastforward returning with EOF error %v", err) uvs.vse.errorCounts.Add("Send", 1) } return err diff --git a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go index e2972bb7071..e29c5fd5a82 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go +++ b/go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go @@ -255,7 +255,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) { numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/) numCopyEvents += 2 /* GTID + Event after all copy is done */ - numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ + numCatchupEvents := 4 * 5 /* 3 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */ numFastForwardEvents := 5 /*t1:FIELD+ROW*/ numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */ numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */ @@ -398,6 +398,21 @@ func initTables(t *testing.T, tables []string) { } } } + callbacks["LASTPK.*t2.*complete"] = func() { + ctx := context.Background() + idx := 1 + id := numInitialRows + 100 + table := "t1" + query1 := fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10) + queries := []string{ + "begin", + query1, + "commit", + } + env.Mysqld.ExecuteSuperQueryList(ctx, queries) + log.Infof("Position after insert into t1 and t2 after t2 complete: %s", primaryPosition(t)) + + } positions["afterInitialInsert"] = primaryPosition(t) } @@ -528,6 +543,11 @@ var expectedEvents = []string{ "type:BEGIN", "type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\"} completed:true}", "type:COMMIT", + "type:BEGIN", + "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"} enum_set_string_values:true}", + "type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:3 lengths:4 values:\"1101100\"}}}", + "type:GTID", + "type:COMMIT", fmt.Sprintf("type:OTHER gtid:\"%s t3\"", copyPhaseStart), "type:BEGIN", "type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"} enum_set_string_values:true}",