diff --git a/go/libraries/doltcore/doltdb/doltdb.go b/go/libraries/doltcore/doltdb/doltdb.go
index 082a76ae69b..5d28707628b 100644
--- a/go/libraries/doltcore/doltdb/doltdb.go
+++ b/go/libraries/doltcore/doltdb/doltdb.go
@@ -1693,6 +1693,21 @@ type DatabaseUpdateListener interface {
 	DatabaseDropped(ctx *sql.Context, databaseName string) error
 }
 
+// GCPausableListener is an optional interface that DatabaseUpdateListeners can
+// implement to participate in GC safepoints. Before GC runs, Stop() is called
+// to prevent new work from starting and to wait for in-flight operations (such
+// as Prolly tree traversals) to complete. After GC finishes, Resume() is called
+// to allow operations to continue.
+type GCPausableListener interface {
+	// Stop signals the listener to stop accepting new work. It returns a
+	// receive-only channel that is closed once all in-flight operations have
+	// completed. Each Stop must be paired with a later call to Resume; callers
+	// must not call Stop again until Resume has been called.
+	Stop() <-chan struct{}
+
+	// Resume allows the listener to accept new work again after GC completes.
+	Resume()
+}
+
 var DatabaseUpdateListeners = make([]DatabaseUpdateListener, 0)
 
 // RegisterDatabaseUpdateListener registers |listener| to receive callbacks when databases are updated.
diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
index 695e08cfe6c..2eed612d70a 100644
--- a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
+++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
@@ -54,9 +54,17 @@ type binlogProducer struct {
 	logManager      *logManager
 	gtidSequence    int64
 	binlogEventMeta mysql.BinlogEventMetadata
+
+	// gcMu protects against GC deleting chunks while Prolly tree traversals
+	// (prolly.DiffMaps) are in flight. WorkingRootUpdated holds a read lock
+	// for the duration of its diff traversal. Stop() acquires a write lock,
+	// which blocks until all in-flight traversals complete and prevents new
+	// ones from starting. Resume() releases the write lock.
+	gcMu sync.RWMutex
 }
 
 var _ doltdb.DatabaseUpdateListener = (*binlogProducer)(nil)
+var _ doltdb.GCPausableListener = (*binlogProducer)(nil)
 
 // NewBinlogProducer creates and returns a new instance of BinlogProducer. Note that callers must register the
 // returned binlogProducer as a DatabaseUpdateListener before it will start receiving database updates and start
@@ -86,6 +94,24 @@ func (b *binlogProducer) LogManager(logManager *logManager) {
 	b.logManager = logManager
 }
 
+// Stop implements the doltdb.GCPausableListener interface. It prevents new WorkingRootUpdated calls from
+// starting Prolly tree traversals and returns a channel that closes when all in-flight traversals have
+// completed. This must be called before GC to prevent chunk deletion during active diff operations.
+func (b *binlogProducer) Stop() <-chan struct{} {
+	ch := make(chan struct{})
+	go func() {
+		b.gcMu.Lock() // blocks until all in-flight RLock holders (WorkingRootUpdated calls) release
+		close(ch)
+	}()
+	return ch
+}
+
+// Resume implements the doltdb.GCPausableListener interface. It releases the write lock acquired by Stop(),
+// allowing new WorkingRootUpdated calls to proceed.
+func (b *binlogProducer) Resume() {
+	b.gcMu.Unlock()
+}
+
 // WorkingRootUpdated implements the doltdb.DatabaseUpdateListener interface. When a working root changes,
 // this function generates events for the binary log and sends them to all connected replicas.
 //
@@ -102,6 +128,12 @@ func (b *binlogProducer) WorkingRootUpdated(ctx *sql.Context, databaseName strin
 		return nil
 	}
 
+	// Acquire read lock to prevent GC from deleting chunks while we traverse Prolly trees.
+	// Stop() acquires a write lock, so it will block until we release this read lock, and
+	// new calls here will block while GC holds the write lock.
+	b.gcMu.RLock()
+	defer b.gcMu.RUnlock()
+
 	var binlogEvents []mysql.BinlogEvent
 	tableDeltas, err := diff.GetTableDeltas(ctx, before, after)
 	if err != nil {
 		return nil, err
 	}
diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_producer_gc_test.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer_gc_test.go
new file mode 100644
index 00000000000..5a8b5dd7abb
--- /dev/null
+++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_producer_gc_test.go
@@ -0,0 +1,228 @@
+// Copyright 2024 Dolthub, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package binlogreplication
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
+)
+
+// TestBinlogProducerImplementsGCPausableListener verifies the compile-time interface check.
+func TestBinlogProducerImplementsGCPausableListener(t *testing.T) {
+	var _ doltdb.GCPausableListener = (*binlogProducer)(nil)
+}
+
+// TestBinlogProducer_StopWithNoInflight tests that Stop() returns a channel that closes
+// immediately when there are no in-flight WorkingRootUpdated operations.
+func TestBinlogProducer_StopWithNoInflight(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	doneCh := bp.Stop()
+	select {
+	case <-doneCh:
+		// Channel closed — Stop() completed successfully
+	case <-time.After(time.Second):
+		t.Fatal("Stop() channel did not close when no in-flight operations exist")
+	}
+
+	// While stopped, new read locks (simulating WorkingRootUpdated entry) should block
+	blocked := make(chan struct{})
+	go func() {
+		bp.gcMu.RLock()
+		close(blocked)
+		bp.gcMu.RUnlock()
+	}()
+
+	// NOTE(review): a timed wait can only prove "did not unblock within 100ms",
+	// not "can never unblock"; this is the conventional conservative check.
+	select {
+	case <-blocked:
+		t.Fatal("RLock should be blocked while GC write lock is held")
+	case <-time.After(100 * time.Millisecond):
+		// Expected: blocked because Stop() holds write lock
+	}
+
+	// Resume should unblock the pending reader
+	bp.Resume()
+	select {
+	case <-blocked:
+		// Good — reader unblocked after Resume
+	case <-time.After(time.Second):
+		t.Fatal("RLock should be unblocked after Resume()")
+	}
+}
+
+// TestBinlogProducer_StopWaitsForInflight tests that Stop() waits for in-flight
+// Prolly tree traversals (simulated by holding a read lock) to complete before
+// the returned channel closes.
+func TestBinlogProducer_StopWaitsForInflight(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Simulate an in-flight WorkingRootUpdated holding the read lock
+	bp.gcMu.RLock()
+
+	doneCh := bp.Stop()
+
+	// The channel should NOT close yet because the in-flight operation holds the read lock
+	select {
+	case <-doneCh:
+		t.Fatal("Stop() channel should not close while in-flight operation holds read lock")
+	case <-time.After(100 * time.Millisecond):
+		// Expected: Stop is waiting for in-flight to complete
+	}
+
+	// Release the in-flight read lock (simulating WorkingRootUpdated returning)
+	bp.gcMu.RUnlock()
+
+	// Now the Stop() channel should close
+	select {
+	case <-doneCh:
+		// Good — Stop() completed after in-flight drained
+	case <-time.After(time.Second):
+		t.Fatal("Stop() channel should close after in-flight operation releases read lock")
+	}
+
+	bp.Resume()
+}
+
+// TestBinlogProducer_MultipleInflightDrain tests that Stop() waits for multiple
+// concurrent in-flight operations to drain before completing.
+func TestBinlogProducer_MultipleInflightDrain(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Simulate 3 concurrent in-flight WorkingRootUpdated calls.
+	// (sync.RWMutex read locks are re-entrant-countable from one goroutine:
+	// three RLocks require three RUnlocks before a writer can proceed.)
+	bp.gcMu.RLock()
+	bp.gcMu.RLock()
+	bp.gcMu.RLock()
+
+	doneCh := bp.Stop()
+
+	// Should not close yet
+	select {
+	case <-doneCh:
+		t.Fatal("Stop() should not complete with 3 in-flight operations")
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// Release two — still one holding
+	bp.gcMu.RUnlock()
+	bp.gcMu.RUnlock()
+
+	select {
+	case <-doneCh:
+		t.Fatal("Stop() should not complete with 1 remaining in-flight operation")
+	case <-time.After(50 * time.Millisecond):
+	}
+
+	// Release the last one
+	bp.gcMu.RUnlock()
+
+	select {
+	case <-doneCh:
+		// Good
+	case <-time.After(time.Second):
+		t.Fatal("Stop() should complete after all in-flight operations drain")
+	}
+
+	bp.Resume()
+}
+
+// TestBinlogProducer_StopResumeIdempotent tests that Stop/Resume can be called
+// multiple times in sequence (as would happen with multiple GC cycles).
+func TestBinlogProducer_StopResumeIdempotent(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	for i := 0; i < 3; i++ {
+		doneCh := bp.Stop()
+		select {
+		case <-doneCh:
+		case <-time.After(time.Second):
+			t.Fatalf("Stop() cycle %d did not complete", i)
+		}
+		bp.Resume()
+	}
+
+	// After multiple cycles, read lock should still work normally
+	bp.gcMu.RLock()
+	bp.gcMu.RUnlock()
+}
+
+// TestBinlogProducer_ConcurrentStopAndWorkingRootUpdated tests the race between
+// Stop() being called and a new WorkingRootUpdated attempting to enter.
+func TestBinlogProducer_ConcurrentStopAndWorkingRootUpdated(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Run multiple goroutines trying to acquire read locks while stop/resume cycles
+	var wg sync.WaitGroup
+	done := make(chan struct{})
+
+	// Readers simulating WorkingRootUpdated calls
+	for i := 0; i < 5; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			for {
+				select {
+				case <-done:
+					return
+				default:
+					bp.gcMu.RLock()
+					// Simulate brief work
+					time.Sleep(time.Microsecond)
+					bp.gcMu.RUnlock()
+				}
+			}
+		}()
+	}
+
+	// Run a few GC stop/resume cycles concurrently with readers.
+	// Deadlock or a sync.RWMutex misuse panic would fail the test via timeout/-race.
+	for i := 0; i < 5; i++ {
+		doneCh := bp.Stop()
+		<-doneCh
+		// GC is running — all readers blocked
+		time.Sleep(time.Millisecond)
+		bp.Resume()
+		time.Sleep(time.Millisecond)
+	}
+
+	close(done)
+	wg.Wait()
+
+	// No panics or deadlocks — test passes by completing
+	assert.True(t, true)
+}
+
+// TestDatabaseUpdateListeners_GCPausable verifies that the GCPausableListener
+// type assertion works correctly against DatabaseUpdateListeners.
+func TestDatabaseUpdateListeners_GCPausable(t *testing.T) {
+	bp := &binlogProducer{mu: &sync.Mutex{}}
+
+	// Register and check type assertion
+	var listener doltdb.DatabaseUpdateListener = bp
+	pausable, ok := listener.(doltdb.GCPausableListener)
+	require.True(t, ok, "binlogProducer should implement GCPausableListener")
+
+	doneCh := pausable.Stop()
+	select {
+	case <-doneCh:
+	case <-time.After(time.Second):
+		t.Fatal("Stop() via interface did not complete")
+	}
+	pausable.Resume()
+}
diff --git a/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go b/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go
index c918bfa41fa..f2aa09042ba 100644
--- a/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go
+++ b/go/libraries/doltcore/sqle/binlogreplication/binlog_type_serialization.go
@@ -869,9 +869,16 @@ func encodeBytesFromAddress(ctx *sql.Context, addr hash.Hash, ns tree.NodeStore,
 	if ns == nil {
 		return nil, fmt.Errorf("nil NodeStore used to encode bytes from address")
 	}
-	bytes, err := ns.ReadBytes(ctx, addr)
-	if err != nil {
-		return nil, err
+
+	// A zero hash address means empty content (e.g. empty string stored without
+	// an out-of-band chunk). Skip the ChunkStore lookup and encode zero-length data.
+	var bytes []byte
+	if !addr.IsEmpty() {
+		b, err := ns.ReadBytes(ctx, addr)
+		if err != nil {
+			return nil, err
+		}
+		bytes = b
 	}
 
 	blobType := typ.(sql.StringType)
diff --git a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
index 91a457a3282..b3084406acf 100644
--- a/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
+++ b/go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
@@ -325,5 +325,29 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun
 			statsDoneCh: statsDoneCh,
 		}
 	}
+	// Stop any GC-pausable database update listeners (e.g. binlog producer) to prevent
+	// chunk deletion during in-flight Prolly tree traversals (prolly.DiffMaps). Stop
+	// every listener first and only then wait on the done channels, so multiple
+	// listeners quiesce in parallel rather than serially.
+	var pausedListeners []doltdb.GCPausableListener
+	var doneChs []<-chan struct{}
+	for _, listener := range doltdb.DatabaseUpdateListeners {
+		if pausable, ok := listener.(doltdb.GCPausableListener); ok {
+			doneChs = append(doneChs, pausable.Stop())
+			pausedListeners = append(pausedListeners, pausable)
+		}
+	}
+	for _, doneCh := range doneChs {
+		<-doneCh
+	}
+	if len(pausedListeners) > 0 {
+		// Resume listeners once GC finishes, even if it returns an error.
+		defer func() {
+			for _, p := range pausedListeners {
+				p.Resume()
+			}
+		}()
+	}
+
 	return ddb.GC(ctx, mode, cmp, sc)