Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
80fb058
VStream: Prevent buffering entire transactions (OOM risk), instead se…
twthorn Oct 31, 2025
9b5b92f
VStream: Add large transaction test for VTGate
twthorn Nov 3, 2025
b2ff2d2
VStream: Add transaction chunk size flag for VTGate
twthorn Nov 4, 2025
840b95c
Fix lint static code checks
twthorn Nov 4, 2025
0c9ff21
Test transaction chunking in e2e tests, simplify lock logic
twthorn Nov 13, 2025
ac0f86e
Merge branch 'main' into fix_vtgate_oom
twthorn Nov 13, 2025
b6d9b5e
Add large transaction to trigger chunking in all tests in vreplicatio…
twthorn Nov 13, 2025
5e46b81
Fit data within column limits for e2e vstream tests
twthorn Nov 13, 2025
33b9bf1
Regenerate proto files
twthorn Nov 13, 2025
ee0e442
Fix unique key constraint for test
twthorn Nov 14, 2025
dc8901d
Clean up chunk testing rows
twthorn Nov 17, 2025
850e3cd
Only clean up necessary rows
twthorn Nov 17, 2025
50d5b3b
Wait for deletes to replicate
twthorn Nov 18, 2025
fd6e6b3
Clean up chunk testing rows immediately
twthorn Nov 18, 2025
97d149e
Minimize unnecessary changes
twthorn Nov 21, 2025
ccace81
Handle rollback, clean up comments used when debugging
twthorn Nov 25, 2025
a3bde55
Regenerate proto files
twthorn Nov 26, 2025
c05ae93
Prevent minimize skew with transaction chunking, add comments and logs
twthorn Dec 2, 2025
f392c83
Make VStream metrics test less brittle
twthorn Dec 3, 2025
dc91e10
Release lock after VStream ends
twthorn Dec 3, 2025
c334f00
Make transaction chunking opt-in
twthorn Dec 4, 2025
0fa4663
Clean up handling of chuking and minimize skew, add comments
twthorn Dec 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions go/test/endtoend/vreplication/initial_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import (
"fmt"
"math/rand/v2"
"os"
"strings"
"testing"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/vt/log"
)

Expand All @@ -43,6 +45,12 @@ func insertInitialData(t *testing.T) {
`[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`)

insertJSONValues(t)

insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs+":0", 50000)
log.Infof("Inserted large transaction for chunking tests")

execVtgateQuery(t, vtgateConn, defaultSourceKs, "delete from customer where cid >= 50000 and cid < 50100")
log.Infof("Cleaned up chunk testing rows from source keyspace")
})
}

Expand Down Expand Up @@ -140,3 +148,15 @@ func insertIntoBlobTable(t *testing.T) {
execVtgateQuery(t, vtgateConn, defaultSourceKs+":0", query)
}
}

// insertLargeTransactionForChunkTesting inserts a transaction large enough to exceed the 1KB chunking threshold.
func insertLargeTransactionForChunkTesting(t *testing.T, vtgateConn *mysql.Conn, keyspace string, startID int) {
execVtgateQuery(t, vtgateConn, keyspace, "BEGIN")
for i := 0; i < 15; i++ {
largeData := strings.Repeat("x", 94) + fmt.Sprintf("_%05d", i)
query := fmt.Sprintf("INSERT INTO customer (cid, name) VALUES (%d, '%s')",
startID+i, largeData)
execVtgateQuery(t, vtgateConn, keyspace, query)
}
execVtgateQuery(t, vtgateConn, keyspace, "COMMIT")
}
39 changes: 33 additions & 6 deletions go/test/endtoend/vreplication/vstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
},
}
flags := &vtgatepb.VStreamFlags{
TablesToCopy: []string{"product", "customer"},
TablesToCopy: []string{"product", "customer"},
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
id := 0
vtgateConn := vc.GetVTGateConn(t)
Expand All @@ -95,6 +96,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
}

insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 10000)

// Stream events from the VStream API
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags)
require.NoError(t, err)
Expand Down Expand Up @@ -151,6 +154,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
stopInserting.Store(false)
var insertMu sync.Mutex
go func() {
insertCount := 0
for {
if stopInserting.Load() {
return
Expand All @@ -160,6 +164,10 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) {
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id))
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id))
insertCount++
if insertCount%5 == 0 {
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 20000+insertCount*10)
}
insertMu.Unlock()
}
}()
Expand Down Expand Up @@ -239,7 +247,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) {
Filter: "select * from customer",
}},
}
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600}
flags := &vtgatepb.VStreamFlags{
HeartbeatInterval: 3600,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := atomic.Bool{}
done.Store(false)

Expand All @@ -254,13 +265,18 @@ func testVStreamWithFailover(t *testing.T, failover bool) {

// first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS
go func() {
insertCount := 0
for {
if stopInserting.Load() {
return
}
insertMu.Lock()
id++
execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id))
insertCount++
if insertCount%3 == 0 {
insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 40000+insertCount*10)
}
insertMu.Unlock()
}
}()
Expand Down Expand Up @@ -441,7 +457,11 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID
Filter: "select * from customer",
}},
}
flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600, StopOnReshard: stopOnReshard}
flags := &vtgatepb.VStreamFlags{
HeartbeatInterval: 3600,
StopOnReshard: stopOnReshard,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := false

id := 1000
Expand Down Expand Up @@ -581,7 +601,9 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven
Match: "/customer.*/",
}},
}
flags := &vtgatepb.VStreamFlags{}
flags := &vtgatepb.VStreamFlags{
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
done := false

id := 1000
Expand Down Expand Up @@ -765,6 +787,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) {
}
flags := &vtgatepb.VStreamFlags{
IncludeReshardJournalEvents: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}
journalEvents := 0

Expand Down Expand Up @@ -962,7 +985,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) {
}},
}
flags := &vtgatepb.VStreamFlags{
StopOnReshard: true,
StopOnReshard: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
}

// Stream events but stop once we have a VGTID with positions for the old/original shards.
Expand Down Expand Up @@ -1233,6 +1257,7 @@ func TestVStreamHeartbeats(t *testing.T) {
name: "With Keyspace Heartbeats On",
flags: &vtgatepb.VStreamFlags{
StreamKeyspaceHeartbeats: true,
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
},
expectedHeartbeats: numExpectedHeartbeats,
},
Expand Down Expand Up @@ -1363,7 +1388,9 @@ func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamCo
vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) {
copyPhase := true
func() {
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{})
reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{
TransactionChunkSize: 1024, // 1KB - test chunking for all transactions
})
require.NoError(t, err)
for {
evs, err := reader.Recv()
Expand Down
22 changes: 18 additions & 4 deletions go/vt/proto/vtgate/vtgate.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

28 changes: 28 additions & 0 deletions go/vt/proto/vtgate/vtgate_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading