Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (vc *VitessCluster) WaitForVReplicationToCatchup(vttablet *cluster.Vttablet
results := [3]string{"[INT64(0)]", "[INT64(1)]", "[INT64(0)]"}
var lastChecked time.Time
for ind, query := range queries {
waitDuration := 100 * time.Millisecond
waitDuration := 500 * time.Millisecond
for duration > 0 {
fmt.Printf("Executing query %s on %s\n", query, vttablet.Name)
lastChecked = time.Now()
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ func getQueryCount(url string, query string) int {
//Queries seem to include non-printable characters at times and hence equality fails unless these are removed
re := regexp.MustCompile("[[:^ascii:]]")
foundQuery := re.ReplaceAllLiteralString(row[queryIndex], "")
foundQuery = strings.ReplaceAll(foundQuery, "_", "")
cleanQuery := re.ReplaceAllLiteralString(query, "")
cleanQuery = strings.ReplaceAll(cleanQuery, "_", "")
if foundQuery == cleanQuery {
count, _ = strconv.Atoi(row[countIndex])
}
Expand Down
19 changes: 10 additions & 9 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
verifyClusterHealth(t)
insertInitialData(t)
shardCustomer(t, true)

shardOrders(t)
shardMerchant(t)

Expand Down Expand Up @@ -175,9 +174,11 @@ func shardCustomer(t *testing.T, testReverse bool) {
insertQuery2 := "insert into customer(name) values('tempCustomer2')"
matchInsertQuery2 := "insert into customer(name, cid) values (:vtg1, :_cid0)"
assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer3')" //ID 101, hence due to reverse_bits in shard 80-

insertQuery2 = "insert into customer(name, cid) values('tempCustomer3', 101)" //ID 101, hence due to reverse_bits in shard 80-
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer4')" //ID 102, hence due to reverse_bits in shard -80

insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" //ID 102, hence due to reverse_bits in shard -80
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2))

if testReverse {
Expand Down Expand Up @@ -244,12 +245,12 @@ func shardCustomer(t *testing.T, testReverse bool) {
assert.NoError(t, err, "Customer table not deleted from zone1-200")
assert.True(t, found)

insertQuery2 = "insert into customer(name) values('tempCustomer8')" //ID 103, hence due to reverse_bits in shard 80-
insertQuery2 = "insert into customer(name, cid) values('tempCustomer8', 103)" //ID 103, hence due to reverse_bits in shard 80-
assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer9')" //ID 104, hence due to reverse_bits in shard 80-
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer10')" //ID 105, hence due to reverse_bits in shard -80
insertQuery2 = "insert into customer(name, cid) values('tempCustomer10', 104)" //ID 105, hence due to reverse_bits in shard -80
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name, cid) values('tempCustomer9', 105)" //ID 104, hence due to reverse_bits in shard 80-
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2))

execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'")
assert.Empty(t, validateCountInTablet(t, customerTab1, "customer", "customer", 1))
Expand All @@ -266,7 +267,7 @@ func shardCustomer(t *testing.T, testReverse bool) {

func reshardCustomer2to4Split(t *testing.T) {
ksName := "customer"
counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 5, "zone1-900": 6}
counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5}
reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil)
assert.Empty(t, validateCount(t, vtgateConn, ksName, "customer", 20))
query := "insert into customer (name) values('yoko')"
Expand Down Expand Up @@ -329,7 +330,7 @@ func reshardMerchant3to1Merge(t *testing.T) {

func reshardCustomer3to2SplitMerge(t *testing.T) { //-40,40-80,80-c0 => merge/split, c0- stays the same ending up with 3
ksName := "customer"
counts := map[string]int{"zone1-600": 5, "zone1-700": 5, "zone1-800": 5, "zone1-900": 6}
counts := map[string]int{"zone1-1000": 7, "zone1-1100": 9, "zone1-1200": 5}
reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil)
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newTablet(opts *Options, t *topodatapb.Tablet) *explainTablet {
db := fakesqldb.New(nil)

config := tabletenv.NewCurrentConfig()
config.TrackSchemaVersions = false
if opts.ExecutionMode == ModeTwoPC {
config.TwoPCCoordinatorAddress = "XXX"
config.TwoPCAbandonAge = 1.0
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/endtoend/framework/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string)
config.TwoPCAbandonAge = 1
config.TwoPCCoordinatorAddress = "fake"
config.HotRowProtection.Mode = tabletenv.Enable
config.TrackSchemaVersions = true

Target = querypb.Target{
Keyspace: "vttest",
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,7 @@ import (
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletpb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)

type test struct {
Expand All @@ -50,9 +47,7 @@ func TestHistorianSchemaUpdate(t *testing.T) {
defer cancel()
tsv := framework.Server
historian := tsv.Historian()
srvTopo := srvtopo.NewResilientServer(framework.TopoServer, "SchemaVersionE2ETestTopo")

vstreamer.NewEngine(tabletenv.NewEnv(tsv.Config(), "SchemaVersionE2ETest"), srvTopo, tsv.SchemaEngine(), historian)
target := &querypb.Target{
Keyspace: "vttest",
Shard: "0",
Expand Down Expand Up @@ -96,14 +91,19 @@ func TestHistorianSchemaUpdate(t *testing.T) {
}

func TestSchemaVersioning(t *testing.T) {
// Let's disable the already running tracker to prevent it from
// picking events from the previous test, and then re-enable it at the end.
tsv := framework.Server
tsv.StopTracker()
tsv.Historian().SetTrackSchemaVersions(false)
defer tsv.StartTracker()
defer tsv.Historian().SetTrackSchemaVersions(true)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tsv := framework.Server
tsv.Historian().SetTrackSchemaVersions(true)
tsv.StartTracker()
srvTopo := srvtopo.NewResilientServer(framework.TopoServer, "SchemaVersionE2ETestTopo")

vstreamer.NewEngine(tabletenv.NewEnv(tsv.Config(), "SchemaVersionE2ETest"), srvTopo, tsv.SchemaEngine(), tsv.Historian())
target := &querypb.Target{
Keyspace: "vttest",
Shard: "0",
Expand Down Expand Up @@ -192,7 +192,7 @@ func TestSchemaVersioning(t *testing.T) {
select {
case eventCh <- evs:
case <-ctx.Done():
t.Fatal("Context Done() in send")
return nil
}
return nil
}
Expand Down
29 changes: 10 additions & 19 deletions go/vt/vttablet/tabletserver/replication_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
package tabletserver

import (
"sync"
"time"

"golang.org/x/net/context"
"vitess.io/vitess/go/vt/vttablet/tabletserver/schema"

"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
Expand All @@ -41,18 +41,17 @@ type ReplicationWatcher struct {
env tabletenv.Env
watchReplication bool
vs VStreamer
subscriber schema.Subscriber

cancel context.CancelFunc
wg sync.WaitGroup
}

// NewReplicationWatcher creates a new ReplicationWatcher.
func NewReplicationWatcher(env tabletenv.Env, vs VStreamer, config *tabletenv.TabletConfig, schemaTracker schema.Subscriber) *ReplicationWatcher {
func NewReplicationWatcher(env tabletenv.Env, vs VStreamer, config *tabletenv.TabletConfig) *ReplicationWatcher {
return &ReplicationWatcher{
env: env,
vs: vs,
watchReplication: config.WatchReplication,
subscriber: schemaTracker,
}
}

Expand All @@ -64,7 +63,8 @@ func (rpw *ReplicationWatcher) Open() {

ctx, cancel := context.WithCancel(tabletenv.LocalContext())
rpw.cancel = cancel
go rpw.Process(ctx)
rpw.wg.Add(1)
go rpw.process(ctx)
}

// Close stops the ReplicationWatcher service.
Expand All @@ -74,39 +74,30 @@ func (rpw *ReplicationWatcher) Close() {
}
rpw.cancel()
rpw.cancel = nil
rpw.wg.Wait()
}

// Process processes the replication stream.
func (rpw *ReplicationWatcher) Process(ctx context.Context) {
func (rpw *ReplicationWatcher) process(ctx context.Context) {
defer rpw.env.LogError()
defer rpw.wg.Done()

filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*",
}},
}

var gtid string
for {
// The tracker will reload the schema and save it into _vt.schema_tracking when the vstream encounters a DDL.
// VStreamer will reload the schema when it encounters a DDL.
err := rpw.vs.Stream(ctx, "current", nil, filter, func(events []*binlogdatapb.VEvent) error {
for _, event := range events {
if event.Type == binlogdatapb.VEventType_GTID {
gtid = event.Gtid
}
if event.Type == binlogdatapb.VEventType_DDL {
log.Infof("Calling schema updated for %s %s", gtid, event.Ddl)
rpw.subscriber.SchemaUpdated(gtid, event.Ddl, event.Timestamp)
}
}
return nil
})
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Second):
}
log.Infof("VStream ended: %v, retrying in 5 seconds", err)
log.Infof("ReplicatinWatcher VStream ended: %v, retrying in 5 seconds", err)
time.Sleep(5 * time.Second)
}
}
161 changes: 0 additions & 161 deletions go/vt/vttablet/tabletserver/replication_watcher_test.go

This file was deleted.

Loading