Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions go/vt/vttablet/tabletserver/vstreamer/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ func (uvs *uvstreamer) catchup(ctx context.Context) error {
errch := make(chan error, 1)
go func() {
startPos := mysql.EncodePosition(uvs.pos)
vs := newVStreamer(ctx, uvs.cp, uvs.se, uvs.sh, startPos, "", uvs.filter, uvs.vschema, uvs.send2)
vs := newVStreamer(ctx, uvs.cp, uvs.se, uvs.sh, startPos, "", uvs.filter, uvs.getVSchema(), uvs.send2)
uvs.setVs(vs)
errch <- vs.Stream()
uvs.vs = nil
uvs.setVs(nil)
log.Infof("catchup vs.stream returned with vs.pos %s", vs.pos.String())
}()

Expand Down Expand Up @@ -207,9 +208,11 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
pos, _ := mysql.DecodePosition(rows.Gtid)
if !uvs.pos.IsZero() && !uvs.pos.AtLeast(pos) {
if err := uvs.fastForward(rows.Gtid); err != nil {
uvs.setVs(nil)
log.Infof("fastForward returned error %v", err)
return err
}
uvs.setVs(nil)
if mysql.EncodePosition(uvs.pos) != rows.Gtid {
return fmt.Errorf("position after fastforward was %s but stopPos was %s", uvs.pos, rows.Gtid)
}
Expand Down Expand Up @@ -273,7 +276,7 @@ func (uvs *uvstreamer) copyTable(ctx context.Context, tableName string) error {
func (uvs *uvstreamer) fastForward(stopPos string) error {
log.Infof("starting fastForward from %s upto pos %s", mysql.EncodePosition(uvs.pos), stopPos)
uvs.stopPos, _ = mysql.DecodePosition(stopPos)
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, uvs.sh, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.vschema, uvs.send2)
uvs.vs = vs
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, uvs.sh, mysql.EncodePosition(uvs.pos), "", uvs.filter, uvs.getVSchema(), uvs.send2)
uvs.setVs(vs)
return vs.Stream()
}
33 changes: 28 additions & 5 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ type uvstreamer struct {
startPos string
filter *binlogdatapb.Filter
inTablePKs []*binlogdatapb.TableLastPK
vschema *localVSchema

vschema *localVSchema

// map holds tables remaining to be fully copied, it is depleted as each table gets completely copied
plans map[string]*tablePlan
Expand All @@ -80,7 +81,7 @@ type uvstreamer struct {

config *uvstreamerConfig

vs *vstreamer //last vstreamer created in uvstreamer: FIXME currently used only for setting vschema, find another way?
vs *vstreamer //last vstreamer created in uvstreamer
}

type uvstreamerConfig struct {
Expand Down Expand Up @@ -367,20 +368,42 @@ func (uvs *uvstreamer) Stream() error {
}
uvs.sendTestEvent("Copy Done")
}
log.V(2).Infof("Starting replicate in uvstreamer.Stream()")
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, uvs.sh, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.vschema, uvs.send)
uvs.vs = vs
vs := newVStreamer(uvs.ctx, uvs.cp, uvs.se, uvs.sh, mysql.EncodePosition(uvs.pos), mysql.EncodePosition(uvs.stopPos), uvs.filter, uvs.getVSchema(), uvs.send)

uvs.setVs(vs)
return vs.Stream()
}

func (uvs *uvstreamer) lock(msg string) {
uvs.mu.Lock()
}

func (uvs *uvstreamer) unlock(msg string) {
uvs.mu.Unlock()
}

func (uvs *uvstreamer) setVs(vs *vstreamer) {
uvs.lock("setVs")
defer uvs.unlock("setVs")
uvs.vs = vs
}

// SetVSchema updates the vstreamer against the new vschema.
func (uvs *uvstreamer) SetVSchema(vschema *localVSchema) {
uvs.lock("SetVSchema")
defer uvs.unlock("SetVSchema")
uvs.vschema = vschema
if uvs.vs != nil {
uvs.vs.SetVSchema(vschema)
}
}

func (uvs *uvstreamer) getVSchema() *localVSchema {
uvs.lock("getVSchema")
defer uvs.unlock("getVSchema")
return uvs.vschema
}

func (uvs *uvstreamer) setCopyState(tableName string, qr *querypb.QueryResult) {
uvs.plans[tableName].tablePK.Lastpk = qr
}
Expand Down
45 changes: 24 additions & 21 deletions go/vt/vttablet/tabletserver/vstreamer/uvstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,10 +236,9 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
numCopyEvents += 2 /* GTID + Test event after all copy is done */
numCatchupEvents := 3 * 5 /*2 t1, 1 t2 : BEGIN+FIELD+ROW+GTID+COMMIT*/
numFastForwardEvents := 5 /*t1:FIELD+ROW*/
numIgnored := 3 // empty events
numMisc := 1 /* t2 insert during t1 catchup that comes in t2 copy */
numReplicateEvents := 3*5 /* insert into t1/t2/t3 */ + 4 /* second insert into t3, no FieldEvent */
numExpectedEvents := numCopyEvents + numCatchupEvents + numFastForwardEvents + numIgnored + numMisc + numReplicateEvents
numExpectedEvents := numCopyEvents + numCatchupEvents + numFastForwardEvents + numMisc + numReplicateEvents

var lastRowEventSeen bool

Expand Down Expand Up @@ -267,12 +266,8 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
}
muAllEvents.Lock()
defer muAllEvents.Unlock()
printAllEvents("End of test")
if len(allEvents) != numExpectedEvents {
log.Errorf("Received %d events, expected %d", len(allEvents), numExpectedEvents)
for _, ev := range allEvents {
log.Errorf("\t%s", ev)
}
printAllEvents(fmt.Sprintf("Received %d events, expected %d", len(allEvents), numExpectedEvents))
t.Fatalf("Received %d events, expected %d", len(allEvents), numExpectedEvents)
} else {
log.Infof("Successfully received %d events", numExpectedEvents)
Expand All @@ -281,14 +276,12 @@ func TestVStreamCopyCompleteFlow(t *testing.T) {
}

func validateReceivedEvents(t *testing.T) {
if len(allEvents) != len(expectedEvents) {
t.Fatalf("Received events not equal to expected events, wanted %d, got %d", len(expectedEvents), len(allEvents))
}
for i, ev := range allEvents {
ev.Timestamp = 0
got := ev.String()
want := expectedEvents[i]
if !strings.HasPrefix(got, want) {
printAllEvents("Events not received in the right order")
t.Fatalf("Event %d did not match, want %s, got %s", i, want, got)
}
}
Expand Down Expand Up @@ -321,11 +314,24 @@ func initTables(t *testing.T, tables []string) {
positions[fmt.Sprintf("%sBulkInsert", table)] = masterPosition(t)

callbacks[fmt.Sprintf("LASTPK.*%s.*%d", table, numInitialRows)] = func() {
ctx := context.Background()
if tableName == "t1" {
insertRow(t, "t1", 1, numInitialRows+1)
//should result in empty commit ignored during catchup since t2 copy has not started
insertRow(t, "t2", 2, numInitialRows+1)
log.Infof("Position after first insert into t1 (and t2/t3): %s", masterPosition(t))
idx := 1
id := numInitialRows + 1
table := "t1"
query1 := fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10)
idx = 2
table = "t2"
query2 := fmt.Sprintf(insertQuery, table, idx, idx, id, id*idx*10)

queries := []string{
"begin",
query1,
query2,
"commit",
}
env.Mysqld.ExecuteSuperQueryList(ctx, queries)
log.Infof("Position after first insert into t1 and t2: %s", masterPosition(t))
}
}
}
Expand Down Expand Up @@ -364,9 +370,9 @@ func insertRow(t *testing.T, table string, idx int, id int) {
}

func printAllEvents(msg string) {
log.Infof("%s: Received %d events", msg, len(allEvents))
for _, ev := range allEvents {
log.Infof("\t%s", ev)
log.Errorf("%s: Received %d events", msg, len(allEvents))
for i, ev := range allEvents {
log.Errorf("%d:\t%s", i, ev)
}
}

Expand Down Expand Up @@ -428,10 +434,7 @@ var expectedEvents = []string{
"type:FIELD field_event:<table_name:\"t1\" fields:<name:\"id11\" type:INT32 > fields:<name:\"id12\" type:INT32 > > ",
"type:ROW row_event:<table_name:\"t1\" row_changes:<after:<lengths:2 lengths:3 values:\"11110\" > > > ",
"type:GTID",
"type:COMMIT",
"type:BEGIN", //empty commit for insert into t3
"type:GTID",
"type:COMMIT",
"type:COMMIT", //insert for t2 done along with t1 does not generate an event since t2 is not yet copied
"type:OTHER gtid:\"Copy Start t2\"",
"type:BEGIN",
"type:FIELD field_event:<table_name:\"t1\" fields:<name:\"id11\" type:INT32 > fields:<name:\"id12\" type:INT32 > > ",
Expand Down
49 changes: 31 additions & 18 deletions go/vt/vttablet/tabletserver/vstreamer/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -519,8 +519,8 @@ func TestOther(t *testing.T) {
t.Logf("Run mode: %v", mode)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := startStream(ctx, t, nil, "", nil)

wg, ch := startStream(ctx, t, nil, "", nil)
defer wg.Wait()
want := [][]string{{
`gtid`,
`type:OTHER `,
Expand Down Expand Up @@ -623,8 +623,8 @@ func TestREKeyRange(t *testing.T) {
Filter: "-80",
}},
}
ch := startStream(ctx, t, filter, "", nil)

wg, ch := startStream(ctx, t, filter, "", nil)
defer wg.Wait()
// 1, 2, 3 and 5 are in shard -80.
// 4 and 6 are in shard 80-.
input := []string{
Expand Down Expand Up @@ -686,6 +686,7 @@ func TestREKeyRange(t *testing.T) {
`gtid`,
`commit`,
}})
cancel()
}

func TestInKeyRangeMultiColumn(t *testing.T) {
Expand Down Expand Up @@ -713,7 +714,8 @@ func TestInKeyRangeMultiColumn(t *testing.T) {
Filter: "select id, region, val, keyspace_id() from t1 where in_keyrange('-80')",
}},
}
ch := startStream(ctx, t, filter, "", nil)
wg, ch := startStream(ctx, t, filter, "", nil)
defer wg.Wait()

// 1, 2, 3 and 5 are in shard -80.
// 4 and 6 are in shard 80-.
Expand Down Expand Up @@ -741,6 +743,7 @@ func TestInKeyRangeMultiColumn(t *testing.T) {
`gtid`,
`commit`,
}})
cancel()
}

func TestREMultiColumnVindex(t *testing.T) {
Expand Down Expand Up @@ -768,7 +771,8 @@ func TestREMultiColumnVindex(t *testing.T) {
Filter: "-80",
}},
}
ch := startStream(ctx, t, filter, "", nil)
wg, ch := startStream(ctx, t, filter, "", nil)
defer wg.Wait()

// 1, 2, 3 and 5 are in shard -80.
// 4 and 6 are in shard 80-.
Expand All @@ -795,6 +799,7 @@ func TestREMultiColumnVindex(t *testing.T) {
`gtid`,
`commit`,
}})
cancel()
}

func TestSelectFilter(t *testing.T) {
Expand Down Expand Up @@ -1448,10 +1453,12 @@ func TestHeartbeat(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

ch := startStream(ctx, t, nil, "", nil)
wg, ch := startStream(ctx, t, nil, "", nil)
defer wg.Wait()
evs := <-ch
require.Equal(t, 1, len(evs))
assert.Equal(t, binlogdatapb.VEventType_HEARTBEAT, evs[0].Type)
cancel()
}

func TestNoFutureGTID(t *testing.T) {
Expand Down Expand Up @@ -1544,15 +1551,16 @@ func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, p
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := startStream(ctx, t, filter, position, tablePK)

wg, ch := startStream(ctx, t, filter, position, tablePK)
defer wg.Wait()
// If position is 'current', we wait for a heartbeat to be
// sure the vstreamer has started.
if position == "current" {
log.Infof("Starting stream with current position")
expectLog(ctx, t, "current pos", ch, [][]string{{`gtid`, `type:OTHER `}})
}

log.Infof("Starting to run test cases")
for _, tcase := range testcases {
switch input := tcase.input.(type) {
case []string:
Expand All @@ -1564,10 +1572,12 @@ func runCases(t *testing.T, filter *binlogdatapb.Filter, testcases []testcase, p
}
expectLog(ctx, t, tcase.input, ch, tcase.output)
}

cancel()
if evs, ok := <-ch; ok {
t.Fatalf("unexpected evs: %v", evs)
}
log.Infof("Last line of runCases")
}

func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan []*binlogdatapb.VEvent, output [][]string) {
Expand Down Expand Up @@ -1633,23 +1643,26 @@ func expectLog(ctx context.Context, t *testing.T, input interface{}, ch <-chan [

var lastPos string

func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) <-chan []*binlogdatapb.VEvent {
if position == "" {
func startStream(ctx context.Context, t *testing.T, filter *binlogdatapb.Filter, position string, tablePKs []*binlogdatapb.TableLastPK) (*sync.WaitGroup, <-chan []*binlogdatapb.VEvent) {
switch position {
case "":
position = masterPosition(t)
}
if position == "vscopy" {
case "vscopy":
position = ""
}

wg := sync.WaitGroup{}
wg.Add(1)
ch := make(chan []*binlogdatapb.VEvent)

go func() {
defer close(ch)
err := vstream(ctx, t, position, tablePKs, filter, ch)
if len(tablePKs) == 0 {
require.Nil(t, err)
}
defer wg.Done()
log.Infof(">>>>>>>>>>> before vstream")
vstream(ctx, t, position, tablePKs, filter, ch)
log.Infof(">>>>>>>>>> after vstream")
}()
return ch
return &wg, ch
}

func vstream(ctx context.Context, t *testing.T, pos string, tablePKs []*binlogdatapb.TableLastPK, filter *binlogdatapb.Filter, ch chan []*binlogdatapb.VEvent) error {
Expand Down