diff --git a/go/test/endtoend/vreplication/initial_data_test.go b/go/test/endtoend/vreplication/initial_data_test.go index d2ff6be0c63..9384c8be4b8 100644 --- a/go/test/endtoend/vreplication/initial_data_test.go +++ b/go/test/endtoend/vreplication/initial_data_test.go @@ -20,8 +20,10 @@ import ( "fmt" "math/rand/v2" "os" + "strings" "testing" + "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/log" ) @@ -43,6 +45,12 @@ func insertInitialData(t *testing.T) { `[[VARCHAR("Monoprice") VARCHAR("eléctronics")] [VARCHAR("newegg") VARCHAR("elec†ronics")]]`) insertJSONValues(t) + + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs+":0", 50000) + log.Infof("Inserted large transaction for chunking tests") + + execVtgateQuery(t, vtgateConn, defaultSourceKs, "delete from customer where cid >= 50000 and cid < 50100") + log.Infof("Cleaned up chunk testing rows from source keyspace") }) } @@ -140,3 +148,15 @@ func insertIntoBlobTable(t *testing.T) { execVtgateQuery(t, vtgateConn, defaultSourceKs+":0", query) } } + +// insertLargeTransactionForChunkTesting inserts a transaction large enough to exceed the 1KB chunking threshold. +func insertLargeTransactionForChunkTesting(t *testing.T, vtgateConn *mysql.Conn, keyspace string, startID int) { + execVtgateQuery(t, vtgateConn, keyspace, "BEGIN") + for i := 0; i < 15; i++ { + largeData := strings.Repeat("x", 94) + fmt.Sprintf("_%05d", i) + query := fmt.Sprintf("INSERT INTO customer (cid, name) VALUES (%d, '%s')", + startID+i, largeData) + execVtgateQuery(t, vtgateConn, keyspace, query) + } + execVtgateQuery(t, vtgateConn, keyspace, "COMMIT") +} diff --git a/go/test/endtoend/vreplication/vstream_test.go b/go/test/endtoend/vreplication/vstream_test.go index b7877377d77..1f83dd0925a 100644 --- a/go/test/endtoend/vreplication/vstream_test.go +++ b/go/test/endtoend/vreplication/vstream_test.go @@ -80,7 +80,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { }, } flags := &vtgatepb.VStreamFlags{ - TablesToCopy: []string{"product", "customer"}, + TablesToCopy: []string{"product", "customer"}, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions } id := 0 vtgateConn := vc.GetVTGateConn(t) @@ -95,6 +96,8 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id)) } + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 10000) + // Stream events from the VStream API reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, flags) require.NoError(t, err) @@ -151,6 +154,7 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { stopInserting.Store(false) var insertMu sync.Mutex go func() { + insertCount := 0 for { if stopInserting.Load() { return @@ -160,6 +164,10 @@ func TestVStreamWithTablesToSkipCopyFlag(t *testing.T) { execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into product (pid, description) values (%d, 'description%d')", id+100, id)) execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into merchant (mname, category) values ('mname%d', 'category%d')", id+100, id)) + insertCount++ + if insertCount%5 == 0 { + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 20000+insertCount*10) + } insertMu.Unlock() } }() @@ -239,7 +247,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) { Filter: "select * from customer", }}, } - flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600} + flags := &vtgatepb.VStreamFlags{ + HeartbeatInterval: 3600, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + } done := atomic.Bool{} done.Store(false) @@ -254,6 +265,7 @@ func testVStreamWithFailover(t *testing.T, failover bool) { // first goroutine that keeps inserting rows into table being streamed until some time elapses after second PRS go func() { + insertCount := 0 for { if stopInserting.Load() { return @@ -261,6 +273,10 @@ func testVStreamWithFailover(t *testing.T, failover bool) { insertMu.Lock() id++ execVtgateQuery(t, vtgateConn, defaultSourceKs, fmt.Sprintf("insert into customer (cid, name) values (%d, 'customer%d')", id+100, id)) + insertCount++ + if insertCount%3 == 0 { + insertLargeTransactionForChunkTesting(t, vtgateConn, defaultSourceKs, 40000+insertCount*10) + } insertMu.Unlock() } }() @@ -441,7 +457,11 @@ func testVStreamStopOnReshardFlag(t *testing.T, stopOnReshard bool, baseTabletID Filter: "select * from customer", }}, } - flags := &vtgatepb.VStreamFlags{HeartbeatInterval: 3600, StopOnReshard: stopOnReshard} + flags := &vtgatepb.VStreamFlags{ + HeartbeatInterval: 3600, + StopOnReshard: stopOnReshard, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + } done := false id := 1000 @@ -581,7 +601,9 @@ func testVStreamCopyMultiKeyspaceReshard(t *testing.T, baseTabletID int) numEven Match: "/customer.*/", }}, } - flags := &vtgatepb.VStreamFlags{} + flags := &vtgatepb.VStreamFlags{ + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + } done := false id := 1000 @@ -765,6 +787,7 @@ func TestMultiVStreamsKeyspaceReshard(t *testing.T) { } flags := &vtgatepb.VStreamFlags{ IncludeReshardJournalEvents: true, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions } journalEvents := 0 @@ -962,7 +985,8 @@ func TestMultiVStreamsKeyspaceStopOnReshard(t *testing.T) { }}, } flags := &vtgatepb.VStreamFlags{ - StopOnReshard: true, + StopOnReshard: true, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions } // Stream events but stop once we have a VGTID with positions for the old/original shards. @@ -1233,6 +1257,7 @@ func TestVStreamHeartbeats(t *testing.T) { name: "With Keyspace Heartbeats On", flags: &vtgatepb.VStreamFlags{ StreamKeyspaceHeartbeats: true, + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions }, expectedHeartbeats: numExpectedHeartbeats, }, @@ -1363,7 +1388,9 @@ func runVStreamAndGetNumOfRowEvents(t *testing.T, ctx context.Context, vstreamCo vgtid *binlogdatapb.VGtid, filter *binlogdatapb.Filter, done chan struct{}) (copyPhaseRowEvents int, runningPhaseRowEvents int) { copyPhase := true func() { - reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{}) + reader, err := vstreamConn.VStream(ctx, topodatapb.TabletType_PRIMARY, vgtid, filter, &vtgatepb.VStreamFlags{ + TransactionChunkSize: 1024, // 1KB - test chunking for all transactions + }) require.NoError(t, err) for { evs, err := reader.Recv() diff --git a/go/vt/proto/vtgate/vtgate.pb.go b/go/vt/proto/vtgate/vtgate.pb.go index b616d0aa6ad..7db2000f1da 100644 --- a/go/vt/proto/vtgate/vtgate.pb.go +++ b/go/vt/proto/vtgate/vtgate.pb.go @@ -1343,8 +1343,14 @@ type VStreamFlags struct { TablesToCopy []string `protobuf:"bytes,9,rep,name=tables_to_copy,json=tablesToCopy,proto3" json:"tables_to_copy,omitempty"` // Exclude the keyspace from the table name that is sent to the vstream client ExcludeKeyspaceFromTableName bool `protobuf:"varint,10,opt,name=exclude_keyspace_from_table_name,json=excludeKeyspaceFromTableName,proto3" json:"exclude_keyspace_from_table_name,omitempty"` - unknownFields protoimpl.UnknownFields - sizeCache protoimpl.SizeCache + // Transaction chunk threshold in bytes. When a transaction exceeds this size, + // VTGate will acquire a lock to ensure contiguous, non-interleaved delivery + // (BEGIN...ROW...COMMIT sent sequentially without mixing events from other shards). + // Events are still chunked to prevent OOM. Transactions smaller than this are sent + // without locking for better parallelism. + TransactionChunkSize int64 `protobuf:"varint,11,opt,name=transaction_chunk_size,json=transactionChunkSize,proto3" json:"transaction_chunk_size,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *VStreamFlags) Reset() { @@ -1447,6 +1453,13 @@ func (x *VStreamFlags) GetExcludeKeyspaceFromTableName() bool { return false } +func (x *VStreamFlags) GetTransactionChunkSize() int64 { + if x != nil { + return x.TransactionChunkSize + } + return 0 +} + // VStreamRequest is the payload for VStream. type VStreamRequest struct { state protoimpl.MessageState `protogen:"open.v1"` @@ -2013,7 +2026,7 @@ const file_vtgate_proto_rawDesc = "" + "\x19ResolveTransactionRequest\x12,\n" + "\tcaller_id\x18\x01 \x01(\v2\x0f.vtrpc.CallerIDR\bcallerId\x12\x12\n" + "\x04dtid\x18\x02 \x01(\tR\x04dtid\"\x1c\n" + - "\x1aResolveTransactionResponse\"\xdd\x03\n" + + "\x1aResolveTransactionResponse\"\x93\x04\n" + "\fVStreamFlags\x12#\n" + "\rminimize_skew\x18\x01 \x01(\bR\fminimizeSkew\x12-\n" + "\x12heartbeat_interval\x18\x02 \x01(\rR\x11heartbeatInterval\x12&\n" + @@ -2025,7 +2038,8 @@ const file_vtgate_proto_rawDesc = "" + "\x1einclude_reshard_journal_events\x18\b \x01(\bR\x1bincludeReshardJournalEvents\x12$\n" + "\x0etables_to_copy\x18\t \x03(\tR\ftablesToCopy\x12F\n" + " exclude_keyspace_from_table_name\x18\n" + - " \x01(\bR\x1cexcludeKeyspaceFromTableName\"\xf6\x01\n" + + " \x01(\bR\x1cexcludeKeyspaceFromTableName\x124\n" + + "\x16transaction_chunk_size\x18\v \x01(\x03R\x14transactionChunkSize\"\xf6\x01\n" + "\x0eVStreamRequest\x12,\n" + "\tcaller_id\x18\x01 \x01(\v2\x0f.vtrpc.CallerIDR\bcallerId\x125\n" + "\vtablet_type\x18\x02 \x01(\x0e2\x14.topodata.TabletTypeR\n" + diff --git a/go/vt/proto/vtgate/vtgate_vtproto.pb.go b/go/vt/proto/vtgate/vtgate_vtproto.pb.go index bc4d06c94ef..2f44eb4d9af 100644 --- a/go/vt/proto/vtgate/vtgate_vtproto.pb.go +++ b/go/vt/proto/vtgate/vtgate_vtproto.pb.go @@ -436,6 +436,7 @@ func (m *VStreamFlags) CloneVT() *VStreamFlags { r.StreamKeyspaceHeartbeats = m.StreamKeyspaceHeartbeats r.IncludeReshardJournalEvents = m.IncludeReshardJournalEvents r.ExcludeKeyspaceFromTableName = m.ExcludeKeyspaceFromTableName + r.TransactionChunkSize = m.TransactionChunkSize if rhs := m.TablesToCopy; rhs != nil { tmpContainer := make([]string, len(rhs)) copy(tmpContainer, rhs) @@ -1847,6 +1848,11 @@ func (m *VStreamFlags) MarshalToSizedBufferVT(dAtA []byte) (int, error) { i -= len(m.unknownFields) copy(dAtA[i:], m.unknownFields) } + if m.TransactionChunkSize != 0 { + i = protohelpers.EncodeVarint(dAtA, i, uint64(m.TransactionChunkSize)) + i-- + dAtA[i] = 0x58 + } if m.ExcludeKeyspaceFromTableName { i-- if m.ExcludeKeyspaceFromTableName { @@ -2794,6 +2800,9 @@ func (m *VStreamFlags) SizeVT() (n int) { if m.ExcludeKeyspaceFromTableName { n += 2 } + if m.TransactionChunkSize != 0 { + n += 1 + protohelpers.SizeOfVarint(uint64(m.TransactionChunkSize)) + } n += len(m.unknownFields) return n } @@ -6518,6 +6527,25 @@ func (m *VStreamFlags) UnmarshalVT(dAtA []byte) error { } } m.ExcludeKeyspaceFromTableName = bool(v != 0) + case 11: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field TransactionChunkSize", wireType) + } + m.TransactionChunkSize = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return protohelpers.ErrIntOverflow + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.TransactionChunkSize |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := protohelpers.Skip(dAtA[iNdEx:]) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 3e3195c885c..53b494c34ff 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -52,11 +52,12 @@ type vstreamManager struct { toposerv srvtopo.Server cell string - vstreamsCreated *stats.CountersWithMultiLabels - vstreamsLag *stats.GaugesWithMultiLabels - vstreamsCount *stats.CountersWithMultiLabels - vstreamsEventsStreamed *stats.CountersWithMultiLabels - vstreamsEndedWithErrors *stats.CountersWithMultiLabels + vstreamsCreated *stats.CountersWithMultiLabels + vstreamsLag *stats.GaugesWithMultiLabels + vstreamsCount *stats.CountersWithMultiLabels + vstreamsEventsStreamed *stats.CountersWithMultiLabels + vstreamsEndedWithErrors *stats.CountersWithMultiLabels + vstreamsTransactionsChunked *stats.CountersWithMultiLabels } // maxSkewTimeoutSeconds is the maximum allowed skew between two streams when the MinimizeSkew flag is set @@ -74,6 +75,11 @@ const stopOnReshardDelay = 500 * time.Millisecond // no events, including heartbeats, from any of the shards. var livenessTimeout = 10 * time.Minute +// defaultTransactionChunkSizeBytes is the default threshold for chunking transactions. +// 0 (the default value for protobuf int64) means disabled, clients must explicitly set a value to opt in for chunking. +// Eventually we plan to enable chunking by default, for now set to 0, which is the same as the protobuf default. +const defaultTransactionChunkSizeBytes = 0 + // vstream contains the metadata for one VStream request. type vstream struct { // mu protects parts of vgtid, the semantics of a send, and journaler. @@ -143,9 +149,17 @@ type vstream struct { // At what point, without any activity in the stream, should we consider it dead. streamLivenessTimer *time.Timer + // When a transaction exceeds this size, VStream acquires a lock to ensure contiguous, chunked delivery. + // Smaller transactions are sent without locking for better parallelism. + transactionChunkSizeBytes int + flags *vtgatepb.VStreamFlags } +func (vs *vstream) isChunkingEnabled() bool { + return vs.transactionChunkSizeBytes > 0 +} + type journalEvent struct { journal *binlogdatapb.Journal participants map[*binlogdatapb.ShardGtid]bool @@ -180,6 +194,10 @@ func newVStreamManager(resolver *srvtopo.Resolver, serv srvtopo.Server, cell str "VStreamsEndedWithErrors", "Number of vstreams that ended with errors", labels), + vstreamsTransactionsChunked: exporter.NewCountersWithMultiLabels( + "VStreamsTransactionsChunked", + "Number of transactions that exceeded TransactionChunkSize threshold and required locking for contiguous, chunked delivery", + labels), } } @@ -189,6 +207,9 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta if err != nil { return vterrors.Wrap(err, "failed to resolve vstream parameters") } + log.Infof("VStream flags: minimize_skew=%v, heartbeat_interval=%v, stop_on_reshard=%v, cells=%v, cell_preference=%v, tablet_order=%v, stream_keyspace_heartbeats=%v, include_reshard_journal_events=%v, tables_to_copy=%v, exclude_keyspace_from_table_name=%v, transaction_chunk_size=%v", + flags.GetMinimizeSkew(), flags.GetHeartbeatInterval(), flags.GetStopOnReshard(), flags.Cells, flags.CellPreference, flags.TabletOrder, + flags.GetStreamKeyspaceHeartbeats(), flags.GetIncludeReshardJournalEvents(), flags.TablesToCopy, flags.GetExcludeKeyspaceFromTableName(), flags.TransactionChunkSize) ts, err := vsm.toposerv.GetTopoServer() if err != nil { return vterrors.Wrap(err, "failed to get topology server") @@ -197,6 +218,13 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta log.Errorf("unable to get topo server in VStream()") return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unable to get topoology server") } + transactionChunkSizeBytes := defaultTransactionChunkSizeBytes + if flags.TransactionChunkSize > 0 && flags.GetMinimizeSkew() { + log.Warning("Minimize skew cannot be set with transaction chunk size (can cause deadlock), ignoring transaction chunk size.") + } else if flags.TransactionChunkSize > 0 { + transactionChunkSizeBytes = int(flags.TransactionChunkSize) + } + vs := &vstream{ vgtid: vgtid, tabletType: tabletType, @@ -215,6 +243,7 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta heartbeatInterval: flags.GetHeartbeatInterval(), ts: ts, copyCompletedShard: make(map[string]struct{}), + transactionChunkSizeBytes: transactionChunkSizeBytes, tabletPickerOptions: discovery.TabletPickerOptions{ CellPreference: flags.GetCellPreference(), TabletOrder: flags.GetTabletOrder(), @@ -687,6 +716,17 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha Options: options, } log.Infof("Starting to vstream from %s, with req %+v", tabletAliasString, req) + var txLockHeld bool + var inTransaction bool + var accumulatedSize int + + defer func() { + if txLockHeld { + vs.mu.Unlock() + txLockHeld = false + } + }() + err = tabletConn.VStream(ctx, req, func(events []*binlogdatapb.VEvent) error { // We received a valid event. Reset error count. errCount = 0 @@ -715,14 +755,22 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha sendevents := make([]*binlogdatapb.VEvent, 0, len(events)) for i, event := range events { vs.streamLivenessTimer.Reset(livenessTimeout) // Any event in the stream demonstrates liveness + accumulatedSize += event.SizeVT() switch event.Type { + case binlogdatapb.VEventType_BEGIN: + // Mark the start of a transaction. + // Also queue the events for sending to the client. + inTransaction = true + sendevents = append(sendevents, event) case binlogdatapb.VEventType_FIELD: ev := maybeUpdateTableName(event, sgtid.Keyspace, vs.flags.GetExcludeKeyspaceFromTableName(), extractFieldTableName) sendevents = append(sendevents, ev) case binlogdatapb.VEventType_ROW: ev := maybeUpdateTableName(event, sgtid.Keyspace, vs.flags.GetExcludeKeyspaceFromTableName(), extractRowTableName) sendevents = append(sendevents, ev) - case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER: + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_ROLLBACK: + inTransaction = false + accumulatedSize = 0 sendevents = append(sendevents, event) eventss = append(eventss, sendevents) @@ -730,9 +778,20 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha return vterrors.Wrap(err, aligningStreamsErr) } - if err := vs.sendAll(ctx, sgtid, eventss); err != nil { - log.Infof("vstream for %s/%s, error in sendAll: %v", sgtid.Keyspace, sgtid.Shard, err) - return vterrors.Wrap(err, sendingEventsErr) + var sendErr error + if vs.isChunkingEnabled() && txLockHeld { + // If chunking is enabled and we are holding the lock (only possible to acquire lock when chunking is enabled), then send the events. + sendErr = vs.sendEventsLocked(ctx, sgtid, eventss) + vs.mu.Unlock() + txLockHeld = false + } else { + // If chunking is not enabled or this transaction was small enough to not need chunking, + // fall back to default behavior of sending entire transaction atomically. + sendErr = vs.sendAll(ctx, sgtid, eventss) + } + if sendErr != nil { + log.Infof("vstream for %s/%s, error in sendAll: %v", sgtid.Keyspace, sgtid.Shard, sendErr) + return vterrors.Wrap(sendErr, sendingEventsErr) } eventss = nil sendevents = nil @@ -827,6 +886,41 @@ func (vs *vstream) streamFromTablet(ctx context.Context, sgtid *binlogdatapb.Sha if len(sendevents) != 0 { eventss = append(eventss, sendevents) } + + // If chunking is enabled, and we are holding the lock (only possible when enabled), and we are not in a transaction + // release the lock (this should not ever execute, acts as a safety check). + if vs.isChunkingEnabled() && txLockHeld && !inTransaction { + log.Warning("Detected held lock but not in a transaction, releasing the lock") + vs.mu.Unlock() + txLockHeld = false + } + + // If chunking is enabled, and we are holding the lock (only possible when chunking is enabled), send the events. + if vs.isChunkingEnabled() && txLockHeld && len(eventss) > 0 { + if err := vs.sendEventsLocked(ctx, sgtid, eventss); err != nil { + log.Infof("vstream for %s/%s, error in sendAll at end of callback: %v", sgtid.Keyspace, sgtid.Shard, err) + return vterrors.Wrap(err, sendingEventsErr) + } + eventss = nil + } + + // If chunking is enabled and we are in a transaction, and we do not yet hold the lock, and the accumulated size is greater than our chunk size + // then acquire the lock, so that we can send the events, and begin chunking the transaction. + if vs.isChunkingEnabled() && inTransaction && !txLockHeld && accumulatedSize > vs.transactionChunkSizeBytes { + log.Infof("vstream for %s/%s: transaction size %d bytes exceeds chunk size %d bytes, acquiring lock for contiguous, chunked delivery", + sgtid.Keyspace, sgtid.Shard, accumulatedSize, vs.transactionChunkSizeBytes) + vs.vsm.vstreamsTransactionsChunked.Add(labelValues, 1) + vs.mu.Lock() + txLockHeld = true + if len(eventss) > 0 { + if err := vs.sendEventsLocked(ctx, sgtid, eventss); err != nil { + log.Infof("vstream for %s/%s, error sending events after acquiring lock: %v", sgtid.Keyspace, sgtid.Shard, err) + return vterrors.Wrap(err, sendingEventsErr) + } + eventss = nil + } + } + return nil }) // If stream was ended (by a journal event), return nil without checking for error. @@ -942,6 +1036,11 @@ func (vs *vstream) shouldRetry(err error) (retry bool, ignoreTablet bool) { func (vs *vstream) sendAll(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { vs.mu.Lock() defer vs.mu.Unlock() + return vs.sendEventsLocked(ctx, sgtid, eventss) +} + +// sendEventsLocked sends events assuming vs.mu is already held by the caller. +func (vs *vstream) sendEventsLocked(ctx context.Context, sgtid *binlogdatapb.ShardGtid, eventss [][]*binlogdatapb.VEvent) error { labelValues := []string{sgtid.Keyspace, sgtid.Shard, vs.tabletType.String()} // Send all chunks while holding the lock. diff --git a/go/vt/vtgate/vstream_manager_test.go b/go/vt/vtgate/vstream_manager_test.go index 332e3c6aab4..04487917f79 100644 --- a/go/vt/vtgate/vstream_manager_test.go +++ b/go/vt/vtgate/vstream_manager_test.go @@ -472,6 +472,162 @@ func TestVStreamChunks(t *testing.T) { require.Equal(t, 100, ddlCount) } +// Verifies that large chunked transactions from one shard +// are not interleaved with events from other shards. +func TestVStreamChunksOverSizeThreshold(t *testing.T) { + ctx := t.Context() + ks := "TestVStream" + cell := "aa" + _ = createSandbox(ks) + hc := discovery.NewFakeHealthCheck(nil) + st := getSandboxTopo(ctx, cell, ks, []string{"-20", "20-40"}) + vsm := newTestVStreamManager(ctx, hc, st, cell) + vsm.vstreamsTransactionsChunked.ResetAll() + sbc0 := hc.AddTestTablet("aa", "1.1.1.1", 1001, ks, "-20", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "-20", sbc0.Tablet()) + sbc1 := hc.AddTestTablet("aa", "1.1.1.1", 1002, ks, "20-40", topodatapb.TabletType_PRIMARY, true, 1, nil) + addTabletToSandboxTopo(t, ctx, st, ks, "20-40", sbc1.Tablet()) + + rowData := make([]byte, 100) + for i := range rowData { + rowData[i] = byte(i % 256) + } + + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_BEGIN}}, nil) + for range 50 { + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: "shard0_table", + RowChanges: []*binlogdatapb.RowChange{{ + After: &querypb.Row{ + Lengths: []int64{int64(len(rowData))}, + Values: rowData, + }, + }}, + }, + }}, nil) + } + + sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_BEGIN}}, nil) + sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: "shard0_table", + RowChanges: []*binlogdatapb.RowChange{{ + After: &querypb.Row{ + Lengths: []int64{8}, + Values: rowData[:8], + }, + }}, + }, + }}, nil) + sbc1.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil) + + for range 50 { + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{ + Type: binlogdatapb.VEventType_ROW, + RowEvent: &binlogdatapb.RowEvent{ + TableName: "shard0_table", + RowChanges: []*binlogdatapb.RowChange{{ + After: &querypb.Row{ + Lengths: []int64{int64(len(rowData))}, + Values: rowData, + }, + }}, + }, + }}, nil) + } + sbc0.AddVStreamEvents([]*binlogdatapb.VEvent{{Type: binlogdatapb.VEventType_COMMIT}}, nil) + + vgtid := &binlogdatapb.VGtid{ + ShardGtids: []*binlogdatapb.ShardGtid{{ + Keyspace: ks, + Shard: "-20", + Gtid: "pos", + }, { + Keyspace: ks, + Shard: "20-40", + Gtid: "pos", + }}, + } + + vstreamCtx, vstreamCancel := context.WithCancel(ctx) + defer vstreamCancel() + + // Track transaction states + type txState struct { + shard string + hasBegin bool + hasCommit bool + rowCount int + } + var currentTx *txState + var completedTxs []*txState + + flags := &vtgatepb.VStreamFlags{ + TransactionChunkSize: 1024, + } + + err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, flags, func(events []*binlogdatapb.VEvent) error { + for _, event := range events { + switch event.Type { + case binlogdatapb.VEventType_VGTID: + if event.Keyspace != "" && event.Shard != "" { + shard := event.Keyspace + "/" + event.Shard + if currentTx != nil && currentTx.shard != "" && currentTx.shard != shard { + return fmt.Errorf("VGTID from shard %s while transaction from shard %s is in progress (interleaving detected)", shard, currentTx.shard) + } + if currentTx != nil && currentTx.shard == "" { + currentTx.shard = shard + } + } + case binlogdatapb.VEventType_BEGIN: + if currentTx != nil && !currentTx.hasCommit { + return fmt.Errorf("BEGIN received while transaction %s is still open (interleaving detected)", currentTx.shard) + } + currentTx = &txState{hasBegin: true} + case binlogdatapb.VEventType_ROW: + if currentTx == nil { + return errors.New("ROW event outside transaction") + } + currentTx.rowCount++ + case binlogdatapb.VEventType_COMMIT: + if currentTx == nil { + return errors.New("COMMIT without BEGIN") + } + currentTx.hasCommit = true + completedTxs = append(completedTxs, currentTx) + t.Logf("COMMIT transaction for shard %s (rows=%d, completed_txs=%d)", currentTx.shard, currentTx.rowCount, len(completedTxs)) + currentTx = nil + default: + } + } + + if len(completedTxs) == 2 { + vstreamCancel() + } + + return nil + }) + + require.Error(t, err) + require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) + require.Equal(t, 2, len(completedTxs), "Should receive both transactions") + + var rowCounts []int + for _, tx := range completedTxs { + require.True(t, tx.hasBegin, "Transaction should have BEGIN") + require.True(t, tx.hasCommit, "Transaction should have COMMIT") + rowCounts = append(rowCounts, tx.rowCount) + } + require.ElementsMatch(t, []int{1, 100}, rowCounts, "Should have one transaction with 1 row and one with 100 rows") + + chunkedCounts := vsm.vstreamsTransactionsChunked.Counts() + require.Contains(t, chunkedCounts, "TestVStream.-20.PRIMARY", "Should have chunked transaction metric for -20 shard") + require.GreaterOrEqual(t, chunkedCounts["TestVStream.-20.PRIMARY"], int64(1), "Should have at least one chunked transaction") +} + func TestVStreamMulti(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -606,14 +762,7 @@ func TestVStreamsMetrics(t *testing.T) { err := vsm.VStream(vstreamCtx, topodatapb.TabletType_PRIMARY, vgtid, nil, &vtgatepb.VStreamFlags{}, func(events []*binlogdatapb.VEvent) error { receivedResponses = append(receivedResponses, &binlogdatapb.VStreamResponse{Events: events}) - // While the VStream is running, we should see one active stream per shard. - require.Equal(t, map[string]int64{ - expectedLabels1: 1, - expectedLabels2: 1, - }, vsm.vstreamsCount.Counts()) - if len(receivedResponses) == 2 { - // Stop streaming after receiving both expected responses. vstreamCancel() } @@ -622,34 +771,37 @@ func TestVStreamsMetrics(t *testing.T) { require.Error(t, err) require.ErrorIs(t, vterrors.UnwrapAll(err), context.Canceled) - require.Equal(t, 2, len(receivedResponses)) - // After the streams end, the count should go back to zero. - require.Equal(t, map[string]int64{ - expectedLabels1: 0, - expectedLabels2: 0, - }, vsm.vstreamsCount.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 1, - expectedLabels2: 1, - }, vsm.vstreamsCreated.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 5, - expectedLabels2: 7, - }, vsm.vstreamsLag.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 2, - expectedLabels2: 2, - }, vsm.vstreamsEventsStreamed.Counts()) - - require.Equal(t, map[string]int64{ - expectedLabels1: 0, - expectedLabels2: 0, - }, vsm.vstreamsEndedWithErrors.Counts()) + counts := vsm.vstreamsCount.Counts() + require.Contains(t, counts, expectedLabels1, "Should have count for shard -20") + require.Contains(t, counts, expectedLabels2, "Should have count for shard 20-40") + require.Equal(t, int64(0), counts[expectedLabels1], "Shard -20 should have 0 active streams after completion") + require.Equal(t, int64(0), counts[expectedLabels2], "Shard 20-40 should have 0 active streams after completion") + + created := vsm.vstreamsCreated.Counts() + require.Contains(t, created, expectedLabels1, "Should have created count for shard -20") + require.Contains(t, created, expectedLabels2, "Should have created count for shard 20-40") + require.Equal(t, int64(1), created[expectedLabels1], "Shard -20 should have created 1 stream") + require.Equal(t, int64(1), created[expectedLabels2], "Shard 20-40 should have created 1 stream") + + lag := vsm.vstreamsLag.Counts() + require.Contains(t, lag, expectedLabels1, "Should have lag for shard -20") + require.Contains(t, lag, expectedLabels2, "Should have lag for shard 20-40") + require.Equal(t, int64(5), lag[expectedLabels1], "Shard -20 should have lag of 5") + require.Equal(t, int64(7), lag[expectedLabels2], "Shard 20-40 should have lag of 7") + + streamed := vsm.vstreamsEventsStreamed.Counts() + require.Contains(t, streamed, expectedLabels1, "Should have events streamed for shard -20") + require.Contains(t, streamed, expectedLabels2, "Should have events streamed for shard 20-40") + require.Equal(t, int64(2), streamed[expectedLabels1], "Shard -20 should have streamed 2 events") + require.Equal(t, int64(2), streamed[expectedLabels2], "Shard 20-40 should have streamed 2 events") + + errors := vsm.vstreamsEndedWithErrors.Counts() + require.Contains(t, errors, expectedLabels1, "Should have error count for shard -20") + require.Contains(t, errors, expectedLabels2, "Should have error count for shard 20-40") + require.Equal(t, int64(0), errors[expectedLabels1], "Shard -20 should have 0 errors") + require.Equal(t, int64(0), errors[expectedLabels2], "Shard 20-40 should have 0 errors") } func TestVStreamsMetricsErrors(t *testing.T) { diff --git a/proto/vtgate.proto b/proto/vtgate.proto index 06edb7feb62..3336298e878 100644 --- a/proto/vtgate.proto +++ b/proto/vtgate.proto @@ -374,6 +374,12 @@ message VStreamFlags { repeated string tables_to_copy = 9; // Exclude the keyspace from the table name that is sent to the vstream client bool exclude_keyspace_from_table_name = 10; + // Transaction chunk threshold in bytes. When a transaction exceeds this size, + // VTGate will acquire a lock to ensure contiguous, non-interleaved delivery + // (BEGIN...ROW...COMMIT sent sequentially without mixing events from other shards). + // Events are still chunked to prevent OOM. Transactions smaller than this are sent + // without locking for better parallelism. + int64 transaction_chunk_size = 11; } // VStreamRequest is the payload for VStream.