From 0099ee36f39b43b255c03e157d311bfb9f87ed69 Mon Sep 17 00:00:00 2001
From: Steve Brown
Date: Sun, 1 Mar 2026 19:35:50 -0500
Subject: [PATCH 1/2] fix(binlog): add GC safepoint for binlog producer to
 prevent chunk deletion during Prolly tree traversal

The binlog producer's WorkingRootUpdated method traverses Prolly trees via
prolly.DiffMaps to generate row events for connected replicas. If DOLT_GC
runs concurrently, it can delete chunks that the binlog producer is actively
traversing, causing "empty chunk returned from ChunkStore" panics.

This adds a GCPausableListener interface that the binlog producer implements.
Before GC runs, it calls Stop() which acquires a write lock, blocking until
all in-flight WorkingRootUpdated calls (which hold read locks) complete.
After GC finishes, Resume() releases the write lock, allowing new binlog
events to be generated.

Changes:
- Add GCPausableListener interface to doltdb package
- Implement Stop()/Resume() on binlogProducer using sync.RWMutex
- Add RLock/RUnlock around WorkingRootUpdated's diff traversal
- Pause GCPausableListeners in RunDoltGC before running GC
- Comprehensive test suite for Stop/Resume/drain semantics
---
 go/libraries/doltcore/doltdb/doltdb.go        |  14 ++
 .../sqle/binlogreplication/binlog_producer.go |  32 +++
 .../binlog_producer_gc_test.go                | 228 ++++++++++++++++++
 .../doltcore/sqle/dprocedures/dolt_gc.go      |  18 ++
 4 files changed, 292 insertions(+)
 create mode 100644 go/libraries/doltcore/sqle/binlogreplication/binlog_producer_gc_test.go

diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go
index 082a76ae69b..5d28707628b 100644
--- a/go/libraries/doltcore/doltdb/doltdb.go
+++ b/go/libraries/doltcore/doltdb/doltdb.go
@@ -1693,6 +1693,20 @@ type DatabaseUpdateListener interface {
 	DatabaseDropped(ctx *sql.Context, databaseName string) error
 }
 
+// GCPausableListener is an optional interface that DatabaseUpdateListeners can
+// implement to participate in GC safepoints. Before GC runs, Stop() is called
+// to prevent new work from starting and to wait for in-flight operations (such
+// as Prolly tree traversals) to complete. After GC finishes, Resume() is called
+// to allow operations to continue.
+type GCPausableListener interface {
+	// Stop signals the listener to stop accepting new work. It returns a
+	// channel that will be closed when all in-flight operations have completed.
+	Stop() chan struct{}
+
+	// Resume allows the listener to accept new work again after GC completes.
+	Resume()
+}
+
 var DatabaseUpdateListeners = make([]DatabaseUpdateListener, 0)
 
 // RegisterDatabaseUpdateListener registers |listener| to receive callbacks when databases are updated.
diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
index 695e08cfe6c..2eed612d70a 100644
--- a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
+++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
@@ -54,9 +54,17 @@ type binlogProducer struct {
 	logManager      *logManager
 	gtidSequence    int64
 	binlogEventMeta mysql.BinlogEventMetadata
+
+	// gcMu protects against GC deleting chunks while Prolly tree traversals
+	// (prolly.DiffMaps) are in flight. WorkingRootUpdated holds a read lock
+	// for the duration of its diff traversal. Stop() acquires a write lock,
+	// which blocks until all in-flight traversals complete and prevents new
+	// ones from starting. Resume() releases the write lock.
+	gcMu sync.RWMutex
 }
 
 var _ doltdb.DatabaseUpdateListener = (*binlogProducer)(nil)
+var _ doltdb.GCPausableListener = (*binlogProducer)(nil)
 
 // NewBinlogProducer creates and returns a new instance of BinlogProducer. Note that callers must register the
 // returned binlogProducer as a DatabaseUpdateListener before it will start receiving database updates and start
@@ -86,6 +94,24 @@ func (b *binlogProducer) LogManager(logManager *logManager) {
 	b.logManager = logManager
 }
 
+// Stop implements the doltdb.GCPausableListener interface. It prevents new WorkingRootUpdated calls from
+// starting Prolly tree traversals and returns a channel that closes when all in-flight traversals have
+// completed. This must be called before GC to prevent chunk deletion during active diff operations.
+func (b *binlogProducer) Stop() chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+		b.gcMu.Lock() // blocks until all in-flight RLock holders (WorkingRootUpdated calls) release
+		close(ch)
+	}()
+	return ch
+}
+
+// Resume implements the doltdb.GCPausableListener interface. It releases the write lock acquired by Stop(),
+// allowing new WorkingRootUpdated calls to proceed.
+func (b *binlogProducer) Resume() {
+	b.gcMu.Unlock()
+}
+
 // WorkingRootUpdated implements the doltdb.DatabaseUpdateListener interface. When a working root changes,
 // this function generates events for the binary log and sends them to all connected replicas.
 //
@@ -102,6 +128,12 @@ func (b *binlogProducer) WorkingRootUpdated(ctx *sql.Context, databaseName strin
 		return nil
 	}
 
+	// Acquire read lock to prevent GC from deleting chunks while we traverse Prolly trees.
+	// Stop() acquires a write lock, so it will block until we release this read lock, and
+	// new calls here will block while GC holds the write lock.
+	b.gcMu.RLock()
+	defer b.gcMu.RUnlock()
+
 	var binlogEvents []mysql.BinlogEvent
 	tableDeltas, err := diff.GetTableDeltas(ctx, before, after)
 	if err != nil {
diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer_gc_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer_gc_test.go
new file mode 100644
index 00000000000..5a8b5dd7abb
--- /dev/null
+++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer_gc_test.go
@@ -0,0 +1,228 @@
+// Copyright 2024 Dolthub, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package binlogreplication
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
+)
+
+// TestBinlogProducerImplementsGCPausableListener verifies the compile-time interface check.
+func TestBinlogProducerImplementsGCPausableListener(t *testing.T) {
+	var _ doltdb.GCPausableListener = (*binlogProducer)(nil)
+}
+
+// TestBinlogProducer_StopWithNoInflight tests that Stop() returns a channel that closes
+// immediately when there are no in-flight WorkingRootUpdated operations.
+func TestBinlogProducer_StopWithNoInflight(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	doneCh := bp.Stop()
+	select {
+	case <-doneCh:
+		// Channel closed — Stop() completed successfully
+	case <-time.After(time.Second):
+		t.Fatal("Stop() channel did not close when no in-flight operations exist")
+	}
+
+	// While stopped, new read locks (simulating WorkingRootUpdated entry) should block
+	blocked := make(chan struct{})
+	go func() {
+		bp.gcMu.RLock()
+		close(blocked)
+		bp.gcMu.RUnlock()
+	}()
+
+	select {
+	case <-blocked:
+		t.Fatal("RLock should be blocked while GC write lock is held")
+	case <-time.After(100 * time.Millisecond):
+		// Expected: blocked because Stop() holds write lock
+	}
+
+	// Resume should unblock the pending reader
+	bp.Resume()
+	select {
+	case <-blocked:
+		// Good — reader unblocked after Resume
+	case <-time.After(time.Second):
+		t.Fatal("RLock should be unblocked after Resume()")
+	}
+}
+
+// TestBinlogProducer_StopWaitsForInflight tests that Stop() waits for in-flight
+// Prolly tree traversals (simulated by holding a read lock) to complete before
+// the returned channel closes.
+func TestBinlogProducer_StopWaitsForInflight(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Simulate an in-flight WorkingRootUpdated holding the read lock
+	bp.gcMu.RLock()
+
+	doneCh := bp.Stop()
+
+	// The channel should NOT close yet because the in-flight operation holds the read lock
+	select {
+	case <-doneCh:
+		t.Fatal("Stop() channel should not close while in-flight operation holds read lock")
+	case <-time.After(100 * time.Millisecond):
+		// Expected: Stop is waiting for in-flight to complete
+	}
+
+	// Release the in-flight read lock (simulating WorkingRootUpdated returning)
+	bp.gcMu.RUnlock()
+
+	// Now the Stop() channel should close
+	select {
+	case <-doneCh:
+		// Good — Stop() completed after in-flight drained
+	case <-time.After(time.Second):
+		t.Fatal("Stop() channel should close after in-flight operation releases read lock")
+	}
+
+	bp.Resume()
+}
+
+// TestBinlogProducer_MultipleInflightDrain tests that Stop() waits for multiple
+// concurrent in-flight operations to drain before completing.
+func TestBinlogProducer_MultipleInflightDrain(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Simulate 3 concurrent in-flight WorkingRootUpdated calls
+	bp.gcMu.RLock()
+	bp.gcMu.RLock()
+	bp.gcMu.RLock()
+
+	doneCh := bp.Stop()
+
+	// Should not close yet
+	select {
+	case <-doneCh:
+		t.Fatal("Stop() should not complete with 3 in-flight operations")
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// Release two — still one holding
+	bp.gcMu.RUnlock()
+	bp.gcMu.RUnlock()
+
+	select {
+	case <-doneCh:
+		t.Fatal("Stop() should not complete with 1 remaining in-flight operation")
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// Release the last one
+	bp.gcMu.RUnlock()
+
+	select {
+	case <-doneCh:
+		// Good
+	case <-time.After(time.Second):
+		t.Fatal("Stop() should complete after all in-flight operations drain")
+	}
+
+	bp.Resume()
+}
+
+// TestBinlogProducer_StopResumeIdempotent tests that Stop/Resume can be called
+// multiple times in sequence (as would happen with multiple GC cycles).
+func TestBinlogProducer_StopResumeIdempotent(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	for i := 0; i < 3; i++ {
+		doneCh := bp.Stop()
+		select {
+		case <-doneCh:
+		case <-time.After(time.Second):
+			t.Fatalf("Stop() cycle %d did not complete", i)
+		}
+		bp.Resume()
+	}
+
+	// After multiple cycles, read lock should still work normally
+	bp.gcMu.RLock()
+	bp.gcMu.RUnlock()
+}
+
+// TestBinlogProducer_ConcurrentStopAndWorkingRootUpdated tests the race between
+// Stop() being called and a new WorkingRootUpdated attempting to enter.
+func TestBinlogProducer_ConcurrentStopAndWorkingRootUpdated(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Run multiple goroutines trying to acquire read locks while stop/resume cycles
+	var wg sync.WaitGroup
+	done := make(chan struct{})
+
+	// Readers simulating WorkingRootUpdated calls
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-done:
+					return
+				default:
+					bp.gcMu.RLock()
+					// Simulate brief work
+					time.Sleep(time.Microsecond)
+					bp.gcMu.RUnlock()
+				}
+			}
+		}()
+	}
+
+	// Run a few GC stop/resume cycles concurrently with readers
+	for i := 0; i < 5; i++ {
+		doneCh := bp.Stop()
+		<-doneCh
+		// GC is running — all readers blocked
+		time.Sleep(time.Millisecond)
+		bp.Resume()
+		time.Sleep(time.Millisecond)
+	}
+
+	close(done)
+	wg.Wait()
+
+	// No panics or deadlocks — test passes by completing
+	assert.True(t, true)
+}
+
+// TestDatabaseUpdateListeners_GCPausable verifies that the GCPausableListener
+// type assertion works correctly against DatabaseUpdateListeners.
+func TestDatabaseUpdateListeners_GCPausable(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Register and check type assertion
+	var listener doltdb.DatabaseUpdateListener = bp
+	pausable, ok := listener.(doltdb.GCPausableListener)
+	require.True(t, ok, "binlogProducer should implement GCPausableListener")
+
+	doneCh := pausable.Stop()
+	select {
+	case <-doneCh:
+	case <-time.After(time.Second):
+		t.Fatal("Stop() via interface did not complete")
+	}
+	pausable.Resume()
+}
diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
index 91a457a3282..b3084406acf 100644
--- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
+++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
@@ -325,5 +325,23 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun
 			statsDoneCh: statsDoneCh,
 		}
 	}
+	// Stop any GC-pausable database update listeners (e.g. binlog producer) to prevent
+	// chunk deletion during in-flight Prolly tree traversals (prolly.DiffMaps).
+	var pausedListeners []doltdb.GCPausableListener
+	for _, listener := range doltdb.DatabaseUpdateListeners {
+		if pausable, ok := listener.(doltdb.GCPausableListener); ok {
+			doneCh := pausable.Stop()
+			<-doneCh
+			pausedListeners = append(pausedListeners, pausable)
+		}
+	}
+	if len(pausedListeners) > 0 {
+		defer func() {
+			for _, p := range pausedListeners {
+				p.Resume()
+			}
+		}()
+	}
+
 	return ddb.GC(ctx, mode, cmp, sc)
 }

From b5c8599408bc51e7728dcfb9383df30ca5d1b311 Mon Sep 17 00:00:00 2001
From: aegis/crew/ellie
Date: Mon, 2 Mar 2026 13:41:45 -0500
Subject: [PATCH 2/2] fix: handle zero-hash address in binlog TEXT/BLOB
 serialization

Empty strings in TEXT/BLOB columns are stored with a zero hash address
(no out-of-band chunk). The binlog serializer unconditionally called
ReadBytes on this address, causing "empty chunk returned from
ChunkStore" panic.
Skip the ChunkStore lookup when the address is the zero hash and encode
zero-length data instead.

Co-Authored-By: Claude Opus 4.6
---
 .../binlogreplication/binlog_type_serialization.go | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go
index c918bfa41fa..f2aa09042ba 100644
--- a/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go
+++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go
@@ -869,9 +869,15 @@ func encodeBytesFromAddress(ctx *sql.Context, addr hash.Hash, ns tree.NodeStore,
 	if ns == nil {
 		return nil, fmt.Errorf("nil NodeStore used to encode bytes from address")
 	}
-	bytes, err := ns.ReadBytes(ctx, addr)
-	if err != nil {
-		return nil, err
-	}
+
+	// A zero hash address means empty content (e.g. empty string stored without
+	// an out-of-band chunk). Skip the ChunkStore lookup and encode zero-length data.
+	var bytes []byte
+	if !addr.IsEmpty() {
+		bytes, err = ns.ReadBytes(ctx, addr)
+		if err != nil {
+			return nil, err
+		}
+	}
 	blobType := typ.(sql.StringType)