From 6104252873adb30736f8d352ebd1a30434e5805e Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 31 Aug 2025 12:42:45 +0300 Subject: [PATCH 1/2] experimental: mutex protection for AppendGTIDInPlace Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/schema/engine.go | 6 ++++++ go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | 4 +++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 31646744c24..5bc93b21120 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -413,6 +413,12 @@ func (se *Engine) ReloadAtEx(ctx context.Context, pos replication.Position, incl return nil } +func (se *Engine) MutexProtected(f func()) { + se.mu.Lock() + defer se.mu.Unlock() + f() +} + func populateInnoDBStats(ctx context.Context, conn *connpool.Conn) (map[string]*Table, error) { innodbTableSizesQuery := conn.BaseShowInnodbTableSizes() if innodbTableSizesQuery == "" { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index 9a9b2a169ad..e8424ed95a0 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -491,7 +491,9 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev SequenceNumber: sequenceNumber, }) } - vs.pos = replication.AppendGTIDInPlace(vs.pos, gtid) + vs.se.MutexProtected(func() { + vs.pos = replication.AppendGTIDInPlace(vs.pos, gtid) + }) vs.commitParent = commitParent vs.sequenceNumber = sequenceNumber vs.eventGTID = gtid From 24e2a2e824af87331e23827018b0a7f6897c1903 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Sun, 31 Aug 2025 14:05:13 +0300 Subject: [PATCH 2/2] use AppendGTID Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- go/vt/vttablet/tabletserver/schema/engine.go | 6 ------ go/vt/vttablet/tabletserver/vstreamer/vstreamer.go | 4 +--- 2 files changed, 1 insertion(+), 9 deletions(-) diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 5bc93b21120..31646744c24 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -413,12 +413,6 @@ func (se *Engine) ReloadAtEx(ctx context.Context, pos replication.Position, incl return nil } -func (se *Engine) MutexProtected(f func()) { - se.mu.Lock() - defer se.mu.Unlock() - f() -} - func populateInnoDBStats(ctx context.Context, conn *connpool.Conn) (map[string]*Table, error) { innodbTableSizesQuery := conn.BaseShowInnodbTableSizes() if innodbTableSizesQuery == "" { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index e8424ed95a0..2602c4e4f6a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -491,9 +491,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent, bufferAndTransmit func(vev SequenceNumber: sequenceNumber, }) } - vs.se.MutexProtected(func() { - vs.pos = replication.AppendGTIDInPlace(vs.pos, gtid) - }) + vs.pos = replication.AppendGTID(vs.pos, gtid) vs.commitParent = commitParent vs.sequenceNumber = sequenceNumber vs.eventGTID = gtid