Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (vc *VitessCluster) WaitForVReplicationToCatchup(vttablet *cluster.Vttablet
results := [3]string{"[INT64(0)]", "[INT64(1)]", "[INT64(0)]"}
var lastChecked time.Time
for ind, query := range queries {
waitDuration := 100 * time.Millisecond
waitDuration := 500 * time.Millisecond
for duration > 0 {
fmt.Printf("Executing query %s on %s\n", query, vttablet.Name)
lastChecked = time.Now()
Expand Down
2 changes: 2 additions & 0 deletions go/test/endtoend/vreplication/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ func getQueryCount(url string, query string) int {
//Queries seem to include non-printable characters at times and hence equality fails unless these are removed
re := regexp.MustCompile("[[:^ascii:]]")
foundQuery := re.ReplaceAllLiteralString(row[queryIndex], "")
foundQuery = strings.ReplaceAll(foundQuery, "_", "")
cleanQuery := re.ReplaceAllLiteralString(query, "")
cleanQuery = strings.ReplaceAll(cleanQuery, "_", "")
if foundQuery == cleanQuery {
count, _ = strconv.Atoi(row[countIndex])
}
Expand Down
19 changes: 10 additions & 9 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ func TestBasicVreplicationWorkflow(t *testing.T) {
verifyClusterHealth(t)
insertInitialData(t)
shardCustomer(t, true)

shardOrders(t)
shardMerchant(t)

Expand Down Expand Up @@ -175,9 +174,11 @@ func shardCustomer(t *testing.T, testReverse bool) {
insertQuery2 := "insert into customer(name) values('tempCustomer2')"
matchInsertQuery2 := "insert into customer(name, cid) values (:vtg1, :_cid0)"
assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer3')" //ID 101, hence due to reverse_bits in shard 80-

insertQuery2 = "insert into customer(name, cid) values('tempCustomer3', 101)" //ID 101, hence due to reverse_bits in shard 80-
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer4')" //ID 102, hence due to reverse_bits in shard -80

insertQuery2 = "insert into customer(name, cid) values('tempCustomer4', 102)" //ID 102, hence due to reverse_bits in shard -80
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2))

if testReverse {
Expand Down Expand Up @@ -244,12 +245,12 @@ func shardCustomer(t *testing.T, testReverse bool) {
assert.NoError(t, err, "Customer table not deleted from zone1-200")
assert.True(t, found)

insertQuery2 = "insert into customer(name) values('tempCustomer8')" //ID 103, hence due to reverse_bits in shard 80-
insertQuery2 = "insert into customer(name, cid) values('tempCustomer8', 103)" //ID 103, hence due to reverse_bits in shard 80-
assert.False(t, validateThatQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer9')" //ID 104, hence due to reverse_bits in shard 80-
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name) values('tempCustomer10')" //ID 105, hence due to reverse_bits in shard -80
insertQuery2 = "insert into customer(name, cid) values('tempCustomer10', 104)" //ID 105, hence due to reverse_bits in shard -80
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab1, "customer", insertQuery2, matchInsertQuery2))
insertQuery2 = "insert into customer(name, cid) values('tempCustomer9', 105)" //ID 104, hence due to reverse_bits in shard 80-
assert.True(t, validateThatQueryExecutesOnTablet(t, vtgateConn, customerTab2, "customer", insertQuery2, matchInsertQuery2))

execVtgateQuery(t, vtgateConn, "customer", "delete from customer where name like 'tempCustomer%'")
assert.Empty(t, validateCountInTablet(t, customerTab1, "customer", "customer", 1))
Expand All @@ -266,7 +267,7 @@ func shardCustomer(t *testing.T, testReverse bool) {

func reshardCustomer2to4Split(t *testing.T) {
ksName := "customer"
counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 5, "zone1-900": 6}
counts := map[string]int{"zone1-600": 4, "zone1-700": 5, "zone1-800": 6, "zone1-900": 5}
reshard(t, ksName, "customer", "c2c4", "-80,80-", "-40,40-80,80-c0,c0-", 600, counts, nil)
assert.Empty(t, validateCount(t, vtgateConn, ksName, "customer", 20))
query := "insert into customer (name) values('yoko')"
Expand Down Expand Up @@ -329,7 +330,7 @@ func reshardMerchant3to1Merge(t *testing.T) {

func reshardCustomer3to2SplitMerge(t *testing.T) { //-40,40-80,80-c0 => merge/split, c0- stays the same ending up with 3
ksName := "customer"
counts := map[string]int{"zone1-600": 5, "zone1-700": 5, "zone1-800": 5, "zone1-900": 6}
counts := map[string]int{"zone1-1000": 7, "zone1-1100": 9, "zone1-1200": 5}
reshard(t, ksName, "customer", "c4c3", "-40,40-80,80-c0", "-60,60-c0", 1000, counts, nil)
}

Expand Down
1 change: 1 addition & 0 deletions go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func newTablet(opts *Options, t *topodatapb.Tablet) *explainTablet {
db := fakesqldb.New(nil)

config := tabletenv.NewCurrentConfig()
config.TrackSchemaVersions = false
if opts.ExecutionMode == ModeTwoPC {
config.TwoPCCoordinatorAddress = "XXX"
config.TwoPCAbandonAge = 1.0
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/endtoend/framework/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func StartServer(connParams, connAppDebugParams mysql.ConnParams, dbName string)
config.TwoPCAbandonAge = 1
config.TwoPCCoordinatorAddress = "fake"
config.HotRowProtection.Mode = tabletenv.Enable
config.TrackSchemaVersions = true

Target = querypb.Target{
Keyspace: "vttest",
Expand Down
94 changes: 12 additions & 82 deletions go/vt/vttablet/endtoend/vstreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,88 +22,36 @@ import (
"errors"
"fmt"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"vitess.io/vitess/go/vt/sqlparser"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
tabletpb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/srvtopo"
"vitess.io/vitess/go/vt/vttablet/endtoend/framework"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"
)

type test struct {
query string
output []string
}

func TestHistorianSchemaUpdate(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
func TestSchemaVersioning(t *testing.T) {
// Let's disable the already running tracker to prevent it from
// picking events from the previous test, and then re-enable it at the end.
tsv := framework.Server
historian := tsv.Historian()
srvTopo := srvtopo.NewResilientServer(framework.TopoServer, "SchemaVersionE2ETestTopo")

vstreamer.NewEngine(tabletenv.NewEnv(tsv.Config(), "SchemaVersionE2ETest"), srvTopo, tsv.SchemaEngine(), historian)
target := &querypb.Target{
Keyspace: "vttest",
Shard: "0",
TabletType: tabletpb.TabletType_MASTER,
Cell: "",
}
filter := &binlogdatapb.Filter{
Rules: []*binlogdatapb.Rule{{
Match: "/.*/",
}},
}
var createTableSQL = "create table historian_test1(id1 int)"
var mu sync.Mutex
mu.Lock()
send := func(events []*binlogdatapb.VEvent) error {
for _, ev := range events {
if ev.Type == binlogdatapb.VEventType_DDL && ev.Ddl == createTableSQL {
log.Info("Found DDL for table historian_test1")
mu.Unlock()
}
}
return nil
}
go func() {
if err := tsv.VStream(ctx, target, "current", nil, filter, send); err != nil {
fmt.Printf("Error in tsv.VStream: %v", err)
t.Error(err)
}
}()
tsv.EnableHistorian(false)
tsv.SetTracking(false)
defer tsv.EnableHistorian(true)
defer tsv.SetTracking(true)

require.Nil(t, historian.GetTableForPos(sqlparser.NewTableIdent("historian_test1"), ""))
require.NotNil(t, historian.GetTableForPos(sqlparser.NewTableIdent("vitess_test"), ""))
client := framework.NewClient()
client.Execute(createTableSQL, nil)

mu.Lock()
minSchema := historian.GetTableForPos(sqlparser.NewTableIdent("historian_test1"), "")
want := `name:"historian_test1" fields:<name:"id1" type:INT32 table:"historian_test1" org_table:"historian_test1" database:"vttest" org_name:"id1" column_length:11 charset:63 flags:32768 > `
require.Equal(t, fmt.Sprintf("%v", minSchema), want)

}

func TestSchemaVersioning(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tsv := framework.Server
tsv.Historian().SetTrackSchemaVersions(true)
tsv.StartTracker()
srvTopo := srvtopo.NewResilientServer(framework.TopoServer, "SchemaVersionE2ETestTopo")
tsv.EnableHistorian(true)
tsv.SetTracking(true)

vstreamer.NewEngine(tabletenv.NewEnv(tsv.Config(), "SchemaVersionE2ETest"), srvTopo, tsv.SchemaEngine(), tsv.Historian())
target := &querypb.Target{
Keyspace: "vttest",
Shard: "0",
Expand All @@ -124,8 +72,6 @@ func TestSchemaVersioning(t *testing.T) {
`other`,
`gtid`, //gtid+ddl => actual query
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `,
`gtid`, //gtid+other => insert into schema_version resulting in version+other
`other`,
`version`,
`gtid`,
},
Expand All @@ -142,8 +88,6 @@ func TestSchemaVersioning(t *testing.T) {
output: []string{
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id3 int" `,
`gtid`, //gtid+other => insert into schema_version resulting in version+other
`other`,
`version`,
`gtid`,
},
Expand All @@ -159,8 +103,6 @@ func TestSchemaVersioning(t *testing.T) {
output: []string{
`gtid`,
`type:DDL ddl:"alter table vitess_version modify column id3 varbinary(16)" `,
`gtid`, //gtid+other => insert into schema_version resulting in version+other
`other`,
`version`,
`gtid`,
},
Expand Down Expand Up @@ -192,7 +134,7 @@ func TestSchemaVersioning(t *testing.T) {
select {
case eventCh <- evs:
case <-ctx.Done():
t.Fatal("Context Done() in send")
return nil
}
return nil
}
Expand All @@ -206,7 +148,7 @@ func TestSchemaVersioning(t *testing.T) {
log.Infof("\n\n\n=============================================== CURRENT EVENTS START HERE ======================\n\n\n")
runCases(ctx, t, cases, eventCh)

tsv.StopTracker()
tsv.SetTracking(false)
cases = []test{
{
//comment prefix required so we don't look for ddl in schema_version
Expand Down Expand Up @@ -258,26 +200,20 @@ func TestSchemaVersioning(t *testing.T) {
output := []string{
`gtid`,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `,
`gtid`,
`other`,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id3 int" `,
`gtid`,
`other`,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:INT32 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 lengths:3 values:"220200" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version modify column id3 varbinary(16)" `,
`gtid`,
`other`,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > fields:<name:"id3" type:VARBINARY > > `,
Expand All @@ -295,7 +231,7 @@ func TestSchemaVersioning(t *testing.T) {
cancel()

log.Infof("\n\n\n=============================================== PAST EVENTS WITHOUT TRACK VERSIONS START HERE ======================\n\n\n")
tsv.Historian().SetTrackSchemaVersions(false)
tsv.EnableHistorian(false)
ctx, cancel = context.WithCancel(context.Background())
defer cancel()
eventCh = make(chan []*binlogdatapb.VEvent)
Expand Down Expand Up @@ -327,17 +263,13 @@ func TestSchemaVersioning(t *testing.T) {
output = []string{
`gtid`,
`type:DDL ddl:"create table vitess_version (id1 int, id2 int)" `,
`gtid`,
`other`,
`version`,
`gtid`,
`type:FIELD field_event:<table_name:"vitess_version" fields:<name:"id1" type:INT32 > fields:<name:"id2" type:INT32 > > `,
`type:ROW row_event:<table_name:"vitess_version" row_changes:<after:<lengths:1 lengths:2 values:"110" > > > `,
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version add column id3 int" `,
`gtid`,
`other`,
`version`,
`gtid`,
/*at this point we only have latest schema so we have types (int32, int32, varbinary, varbinary) so the types don't match. Hence the @ fieldnames*/
Expand All @@ -346,8 +278,6 @@ func TestSchemaVersioning(t *testing.T) {
`gtid`,
`gtid`,
`type:DDL ddl:"alter table vitess_version modify column id3 varbinary(16)" `,
`gtid`,
`other`,
`version`,
`gtid`,
/*at this point we only have latest schema so we have types (int32, int32, varbinary, varbinary),
Expand Down
Loading