diff --git a/go/test/endtoend/vreplication/helper.go b/go/test/endtoend/vreplication/helper.go index e32f9afd043..2dfc3467a93 100644 --- a/go/test/endtoend/vreplication/helper.go +++ b/go/test/endtoend/vreplication/helper.go @@ -241,3 +241,14 @@ func expectNumberOfStreams(t *testing.T, vtgateConn *mysql.Conn, name string, wo t.Fatalf("Incorrect streams found for %s: %s\n", name, result) } } + +func printShardPositions(vc *VitessCluster, ksShards []string) { + for _, ksShard := range ksShards { + output, err := vc.VtctlClient.ExecuteCommandWithOutput("ShardReplicationPositions", ksShard) + if err != nil { + fmt.Printf("Error in ShardReplicationPositions: %v, output %v", err, output) + } else { + fmt.Printf("Position of %s: %s", ksShard, output) + } + } +} diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 2c07a8541f9..58867889ad6 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -186,6 +186,7 @@ func insertMoreCustomers(t *testing.T, numCustomers int) { execVtgateQuery(t, vtgateConn, "customer", sql) } +// FIXME: if testReverse if false we don't dropsources and that creates a problem later on in the test due to existence of blacklisted tables func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAlias string) { if _, err := vc.AddKeyspace(t, cells, "customer", "-80,80-", customerVSchema, customerSchema, defaultReplicas, defaultRdonly, 200); err != nil { t.Fatal(err) @@ -250,6 +251,8 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "customer.p2c"); err != nil { t.Fatalf("SwitchWrites error: %s\n", output) } + ksShards := []string{"product/0", "customer/-80", "customer/80-"} + printShardPositions(vc, ksShards) insertQuery2 := "insert into customer(name) values('tempCustomer2')" matchInsertQuery2 := "insert into customer(name, cid) values (:vtg1, :_cid0)" assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2)) @@ -268,11 +271,13 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchReads", "-cells="+allCellNames, "-tablet_type=replica", "product.p2c_reverse"); err != nil { t.Fatalf("SwitchReads error: %s\n", output) } + printShardPositions(vc, ksShards) if output, err := vc.VtctlClient.ExecuteCommandWithOutput("SwitchWrites", "product.p2c_reverse"); err != nil { t.Fatalf("SwitchWrites error: %s\n", output) } insertQuery1 = "insert into customer(cid, name) values(1002, 'tempCustomer5')" assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "product", insertQuery1, matchInsertQuery1)) + // both inserts go into 80-, this tests the edge-case where a stream (-80) has no relevant new events after the previous switch insertQuery1 = "insert into customer(cid, name) values(1003, 'tempCustomer6')" assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery1, matchInsertQuery1)) insertQuery1 = "insert into customer(cid, name) values(1004, 'tempCustomer7')" diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 93f3356d69a..ad3ef6f388c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -251,8 +251,6 @@ func (vp *vplayer) updateTime(ts int64) (err error) { if _, err := vp.vr.dbClient.Execute(update); err != nil { return fmt.Errorf("error %v updating time", err) } - vp.unsavedEvent = nil - vp.timeLastSaved = time.Now() return nil } diff --git a/go/vt/wrangler/traffic_switcher.go b/go/vt/wrangler/traffic_switcher.go index 14ef269932e..7693ffd8e0e 100644 --- a/go/vt/wrangler/traffic_switcher.go +++ b/go/vt/wrangler/traffic_switcher.go @@ -741,14 +741,18 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati var mu sync.Mutex return ts.forAllUids(func(target *tsTarget, uid uint32) error { + ts.wr.Logger().Infof("uid: %d, target master %s, target position %s, shard %s", uid, + target.master.AliasString(), target.position, target.si.String()) bls := target.sources[uid] source := ts.sources[bls.Shard] - ts.wr.Logger().Infof("waiting for keyspace:shard: %v:%v, position %v", ts.targetKeyspace, target.si.ShardName(), source.position) + ts.wr.Logger().Infof("waiting for keyspace:shard: %v:%v, source position %v, uid %d", + ts.targetKeyspace, target.si.ShardName(), source.position, uid) if err := ts.wr.tmc.VReplicationWaitForPos(ctx, target.master.Tablet, int(uid), source.position); err != nil { return err } - ts.wr.Logger().Infof("position for keyspace:shard: %v:%v reached", ts.targetKeyspace, target.si.ShardName()) + ts.wr.Logger().Infof("position for keyspace:shard: %v:%v reached, uid %d", ts.targetKeyspace, target.si.ShardName(), uid) if _, err := ts.wr.tmc.VReplicationExec(ctx, target.master.Tablet, binlogplayer.StopVReplication(uid, "stopped for cutover")); err != nil { + log.Infof("error marking stopped for cutover on %s, uid %d", target.master.AliasString(), uid) return err } @@ -760,7 +764,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati } var err error target.position, err = ts.wr.tmc.MasterPosition(ctx, target.master.Tablet) - ts.wr.Logger().Infof("Position for uid %v: %v", uid, target.position) + ts.wr.Logger().Infof("Position for target master %s, uid %v: %v", target.master.AliasString(), uid, target.position) return err }) }