Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error {

errch := make(chan error, 1)
go func() {
uvs.stopPos = replication.Position{} // reset stopPos which was potentially set during fastforward
startPos := replication.EncodePosition(uvs.pos)
vs := newVStreamer(ctx, uvs.cp, uvs.se, startPos, "", uvs.filter, uvs.getVSchema(), uvs.throttlerApp, uvs.send2, "catchup", uvs.vse, nil)
uvs.setVs(vs)
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,11 @@ func newUVStreamer(ctx context.Context, vse *Engine, cp dbconfigs.Connector, se
ev.Keyspace = vse.keyspace
ev.Shard = vse.shard
}
return send(evs)
err := send(evs)
if err != nil {
log.Infof("uvstreamer replicate send() returned with err %v", err)
}
return err
}
uvs := &uvstreamer{
ctx: ctx,
Expand Down Expand Up @@ -327,17 +331,20 @@ func (uvs *uvstreamer) send2(evs []*binlogdatapb.VEvent) error {
}
err := uvs.send(evs2)
if err != nil && err != io.EOF {
log.Infof("uvstreamer catchup/fastforward send() returning with send error %v", err)
return err
}
for _, ev := range evs2 {
if ev.Type == binlogdatapb.VEventType_GTID {
uvs.pos, _ = replication.DecodePosition(ev.Gtid)
if !uvs.stopPos.IsZero() && uvs.pos.AtLeast(uvs.stopPos) {
log.Infof("Reached stop position %v, returning io.EOF", uvs.stopPos)
err = io.EOF
}
}
}
if err != nil {
log.Infof("uvstreamer catchup/fastforward returning with EOF error %v", err)
uvs.vse.errorCounts.Add("Send", 1)
}
return err
Expand Down
22 changes: 21 additions & 1 deletion go/vt/vttablet/tabletserver/vstreamer/uvstreamer_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {

numCopyEvents := 3 /*t1,t2,t3*/ * (numInitialRows + 1 /*FieldEvent*/ + 1 /*LastPKEvent*/ + 1 /*TestEvent: Copy Start*/ + 2 /*begin,commit*/ + 3 /* LastPK Completed*/)
numCopyEvents += 2 /* GTID + Event after all copy is done */
numCatchupEvents := 3 * 5 /* 2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numCatchupEvents := 4 * 5 /* 3 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT */
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 2*5 /* insert into t1/t2 */ + 6 /* begin/field/2 inserts/gtid/commit */
Expand Down Expand Up @@ -398,6 +398,21 @@ func initTables(t *testing.T, tables []string) {
}
}
}
callbacks["LASTPK.*t2.*complete"] = func() {
ctx := context.Background()
idx := 1
id := numInitialRows + 100
table := "t1"
query1 := fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10)
queries := []string{
"begin",
query1,
"commit",
}
env.Mysqld.ExecuteSuperQueryList(ctx, queries)
log.Infof("Position after insert into t1 and t2 after t2 complete: %s", primaryPosition(t))

}
positions["afterInitialInsert"] = primaryPosition(t)
}

Expand Down Expand Up @@ -528,6 +543,11 @@ var expectedEvents = []string{
"type:BEGIN",
"type:LASTPK last_p_k_event:{table_last_p_k:{table_name:\"t2\"} completed:true}",
"type:COMMIT",
"type:BEGIN",
"type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"} enum_set_string_values:true}",
"type:ROW row_event:{table_name:\"t1\" row_changes:{after:{lengths:3 lengths:4 values:\"1101100\"}}}",
"type:GTID",
"type:COMMIT",
fmt.Sprintf("type:OTHER gtid:\"%s t3\"", copyPhaseStart),
"type:BEGIN",
"type:FIELD field_event:{table_name:\"t1\" fields:{name:\"id11\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id11\" column_length:11 charset:63 column_type:\"int(11)\"} fields:{name:\"id12\" type:INT32 table:\"t1\" org_table:\"t1\" database:\"vttest\" org_name:\"id12\" column_length:11 charset:63 column_type:\"int(11)\"} enum_set_string_values:true}",
Expand Down
Loading