Skip to content
Merged
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.6 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/eapache/queue/v2 v2.0.0-20230407133247-75960ed334e4 // indirect
github.com/ebitengine/purego v0.8.2 // indirect
github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,6 @@ github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8Yc
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y=
github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY=
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
Expand Down
20 changes: 20 additions & 0 deletions go/test/endtoend/vreplication/initial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"fmt"
"math/rand/v2"
"os"
"strings"
"testing"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)

Expand All @@ -43,6 +45,12 @@ func insertInitialData(t *testing.T) {
`[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`)

insertJSONValues(t)

insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs+":0", 50000)
log.Infof("Inserted large transaction for chunking tests")

execVtgateQuery(t, vtgateConn, defaultSourceKs, "delete from customer where cid >= 50000 and cid < 50100")
log.Infof("Cleaned up chunk testing rows from source keyspace")
})
}

Expand Down Expand Up @@ -140,3 +148,15 @@ func insertIntoBlobTable(t *testing.T) {
execVtgateQuery(t, vtgateConn, fmt.Sprintf("%s:0", defaultSourceKs), query)
}
}

// insertLargeTransactionForChunkTesting wraps a batch of customer inserts in a
// single explicit transaction whose total payload exceeds the 1KB chunking
// threshold, so that VStream transaction chunking gets exercised end to end.
func insertLargeTransactionForChunkTesting(t *testing.T, vtgateConn *mysql.Conn, keyspace string, startID int) {
	const rowCount = 15
	execVtgateQuery(t, vtgateConn, keyspace, "BEGIN")
	for row := range rowCount {
		// Each row carries ~100 bytes of padding so the transaction as a
		// whole comfortably crosses the 1KB chunk boundary.
		padding := strings.Repeat("x", 94)
		stmt := fmt.Sprintf("INSERT INTO customer (cid, name) VALUES (%d, '%s_%05d')",
			startID+row, padding, row)
		execVtgateQuery(t, vtgateConn, keyspace, stmt)
	}
	execVtgateQuery(t, vtgateConn, keyspace, "COMMIT")
}
75 changes: 51 additions & 24 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
defaultRdonly = 0

defaultCell := vc.Cells[vc.CellNames[0]]
vc.AddKeyspace(t, []*Cell{defaultCell}, "product", "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
vc.AddKeyspace(t, []*Cell{defaultCell}, defaultSourceKs, "0", initialProductVSchema, initialProductSchema, defaultReplicas, defaultRdonly, 100, nil)
verifyClusterHealth(t, vc)

ctx := context.Background()
Expand All @@ -59,7 +59,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
defer vstreamConn.Close()
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Keyspace: "product",
Keyspace: defaultSourceKs,
Shard: "0",
Gtid: "",
}}}
Expand All @@ -79,7 +79,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
},
}
flags := &vtgatepb.VStreamFlags{
TablesToCopy: []string{"product", "customer"},
TablesToCopy: []string{"product", "customer"},
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
id := 0
vtgateConn := vc.GetVTGateConn(t)
Expand All @@ -89,11 +90,13 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
// present in the filter before running the VStream.
for range 10 {
id++
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
}

insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 10000)

// Stream events from the VStream API
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
Expand Down Expand Up @@ -150,15 +153,20 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
stopInserting.Store(false)
var insertMu sync.Mutex
go func() {
insertCount := 0
for {
if stopInserting.Load() {
return
}
insertMu.Lock()
id++
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, "product", fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
insertCount++
if insertCount%5 == 0 {
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 20000+insertCount*10)
}
insertMu.Unlock()
}
}()
Expand All @@ -168,9 +176,9 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
time.Sleep(10 * time.Second) // Give the vstream plenty of time to catchup
done.Store(true)

qr1 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from customer")
qr2 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from product")
qr3 := execVtgateQuery(t, vtgateConn, "product", "select count(*) from merchant")
qr1 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from customer")
qr2 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from product")
qr3 := execVtgateQuery(t, vtgateConn, defaultSourceKs, "select count(*) from merchant")
require.NotNil(t, qr1)
require.NotNil(t, qr2)
require.NotNil(t, qr3)
Expand Down Expand Up @@ -238,7 +246,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
Filter: "select * from customer",
}},
}
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600}
flags := &vtgatepb.VStreamFlags{
HeartbeatInterval: 3600,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := atomic.Bool{}
done.Store(false)

Expand All @@ -253,13 +264,18 @@ func testVStreamWithFailover(t *testing.T, failover bool) {

// first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS
go func() {
insertCount := 0
for {
if stopInserting.Load() {
return
}
insertMu.Lock()
id++
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
insertCount++
if insertCount%3 == 0 {
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 40000+insertCount*10)
}
insertMu.Unlock()
}
}()
Expand Down Expand Up @@ -304,15 +320,15 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
case 1:
if failover {
insertMu.Lock()
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", fmt.Sprintf("%s/0", defaultSourceKs), "--new-primary=zone1-101")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", defaultSourceKs+"/0", "--new-primary=zone1-101")
insertMu.Unlock()
log.Infof("output of first PRS is %s", output)
require.NoError(t, err)
}
case 2:
if failover {
insertMu.Lock()
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", fmt.Sprintf("%s/0", defaultSourceKs), "--new-primary=zone1-100")
output, err := vc.VtctldClient.ExecuteCommandWithOutput("PlannedReparentShard", defaultSourceKs+"/0", "--new-primary=zone1-100")
insertMu.Unlock()
log.Infof("output of second PRS is %s", output)
require.NoError(t, err)
Expand Down Expand Up @@ -383,7 +399,7 @@ func insertRow(keyspace, table string, id int) {
if vtgateConn == nil {
return
}
vtgateConn.ExecuteFetch(fmt.Sprintf("use %s", keyspace), 1000, false)
vtgateConn.ExecuteFetch("use "+keyspace, 1000, false)
vtgateConn.ExecuteFetch("begin", 1000, false)
_, err := vtgateConn.ExecuteFetch(fmt.Sprintf("insert into %s (name) values ('%s%d')", table, table, id), 1000, false)
if err != nil {
Expand Down Expand Up @@ -440,7 +456,11 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
Filter: "select * from customer",
}},
}
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600, StopOnReshard: stopOnReshard}
flags := &vtgatepb.VStreamFlags{
HeartbeatInterval: 3600,
StopOnReshard: stopOnReshard,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := false

id := 1000
Expand Down Expand Up @@ -580,7 +600,9 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
Match: "/customer.*/",
}},
}
flags := &vtgatepb.VStreamFlags{}
flags := &vtgatepb.VStreamFlags{
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := false

id := 1000
Expand Down Expand Up @@ -764,6 +786,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
}
flags := &vtgatepb.VStreamFlags{
IncludeReshardJournalEvents: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
journalEvents := 0

Expand All @@ -787,7 +810,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
case "0":
// We expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
Expand Down Expand Up @@ -845,7 +868,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
case "0":
// Again, we expect some for the sequence backing table, but don't care.
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_JOURNAL:
require.True(t, ev.Journal.MigrationType == binlogdatapb.MigrationType_SHARDS)
Expand Down Expand Up @@ -961,7 +984,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
}},
}
flags := &vtgatepb.VStreamFlags{
StopOnReshard: true,
StopOnReshard: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}

// Stream events but stop once we have a VGTID with positions for the old/original shards.
Expand All @@ -982,7 +1006,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
case "-80", "80-":
oldShardRowEvents++
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_VGTID:
newVGTID = ev.GetVgtid()
Expand Down Expand Up @@ -1039,7 +1063,7 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
switch shard {
case "-80", "80-":
default:
require.FailNow(t, fmt.Sprintf("received event for unexpected shard: %s", shard))
require.FailNow(t, "received event for unexpected shard: "+shard)
}
case binlogdatapb.VEventType_JOURNAL:
t.Logf("Journal event: %+v", ev)
Expand Down Expand Up @@ -1232,6 +1256,7 @@ func TestVStreamHeartbeats(t *testing.T) {
name: "With Keyspace Heartbeats On",
flags: &vtgatepb.VStreamFlags{
StreamKeyspaceHeartbeats: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
},
expectedHeartbeats: numExpectedHeartbeats,
},
Expand Down Expand Up @@ -1362,7 +1387,9 @@ func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamCo
vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) {
copyPhase := true
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
})
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down
Loading
Loading