Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions go/libraries/doltcore/doltdb/doltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -1693,6 +1693,20 @@ type DatabaseUpdateListener interface {
DatabaseDropped(ctx *sql.Context, databaseName string) error
}

// GCPausableListener is an optional interface that DatabaseUpdateListeners can
// implement to participate in GC safepoints. Before GC runs, Stop() is called
// to prevent new work from starting and to wait for in-flight operations (such
// as Prolly tree traversals) to complete. After GC finishes, Resume() is called
// to allow operations to continue.
type GCPausableListener interface {
// Stop signals the listener to stop accepting new work. It returns a
// channel that will be closed when all in-flight operations have completed.
Stop() chan struct{}

// Resume allows the listener to accept new work again after GC completes.
Resume()
}

var DatabaseUpdateListeners = make([]DatabaseUpdateListener, 0)

// RegisterDatabaseUpdateListener registers |listener| to receive callbacks when databases are updated.
Expand Down
32 changes: 32 additions & 0 deletions go/libraries/doltcore/sqle/binlogreplication/binlog_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,17 @@ type binlogProducer struct {
logManager *logManager
gtidSequence int64
binlogEventMeta mysql.BinlogEventMetadata

// gcMu protects against GC deleting chunks while Prolly tree traversals
// (prolly.DiffMaps) are in flight. WorkingRootUpdated holds a read lock
// for the duration of its diff traversal. Stop() acquires a write lock,
// which blocks until all in-flight traversals complete and prevents new
// ones from starting. Resume() releases the write lock.
gcMu sync.RWMutex
}

var _ doltdb.DatabaseUpdateListener = (*binlogProducer)(nil)
var _ doltdb.GCPausableListener = (*binlogProducer)(nil)

// NewBinlogProducer creates and returns a new instance of BinlogProducer. Note that callers must register the
// returned binlogProducer as a DatabaseUpdateListener before it will start receiving database updates and start
Expand Down Expand Up @@ -86,6 +94,24 @@ func (b *binlogProducer) LogManager(logManager *logManager) {
b.logManager = logManager
}

// Stop implements the doltdb.GCPausableListener interface. It prevents new WorkingRootUpdated calls from
// starting Prolly tree traversals and returns a channel that closes when all in-flight traversals have
// completed. This must be called before GC to prevent chunk deletion during active diff operations.
func (b *binlogProducer) Stop() chan struct{} {
ch := make(chan struct{})
go func() {
b.gcMu.Lock() // blocks until all in-flight RLock holders (WorkingRootUpdated calls) release
close(ch)
}()
return ch
}

// Resume implements the doltdb.GCPausableListener interface. It releases the write lock acquired by Stop(),
// allowing new WorkingRootUpdated calls to proceed.
func (b *binlogProducer) Resume() {
b.gcMu.Unlock()
}

// WorkingRootUpdated implements the doltdb.DatabaseUpdateListener interface. When a working root changes,
// this function generates events for the binary log and sends them to all connected replicas.
//
Expand All @@ -102,6 +128,12 @@ func (b *binlogProducer) WorkingRootUpdated(ctx *sql.Context, databaseName strin
return nil
}

// Acquire read lock to prevent GC from deleting chunks while we traverse Prolly trees.
// Stop() acquires a write lock, so it will block until we release this read lock, and
// new calls here will block while GC holds the write lock.
b.gcMu.RLock()
defer b.gcMu.RUnlock()

var binlogEvents []mysql.BinlogEvent
tableDeltas, err := diff.GetTableDeltas(ctx, before, after)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
// Copyright 2024 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package binlogreplication

import (
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/dolthub/dolt/go/libraries/doltcore/doltdb"
)

// TestBinlogProducerImplementsGCPausableListener verifies the compile-time interface check.
func TestBinlogProducerImplementsGCPausableListener(t *testing.T) {
var _ doltdb.GCPausableListener = (*binlogProducer)(nil)
}

// TestBinlogProducer_StopWithNoInflight tests that Stop() returns a channel that closes
// immediately when there are no in-flight WorkingRootUpdated operations.
func TestBinlogProducer_StopWithNoInflight(t *testing.T) {
bp := &binlogProducer{mu: &sync.Mutex{}}

doneCh := bp.Stop()
select {
case <-doneCh:
// Channel closed — Stop() completed successfully
case <-time.After(time.Second):
t.Fatal("Stop() channel did not close when no in-flight operations exist")
}

// While stopped, new read locks (simulating WorkingRootUpdated entry) should block
blocked := make(chan struct{})
go func() {
bp.gcMu.RLock()
close(blocked)
bp.gcMu.RUnlock()
}()

select {
case <-blocked:
t.Fatal("RLock should be blocked while GC write lock is held")
case <-time.After(100 * time.Millisecond):
// Expected: blocked because Stop() holds write lock
}

// Resume should unblock the pending reader
bp.Resume()
select {
case <-blocked:
// Good — reader unblocked after Resume
case <-time.After(time.Second):
t.Fatal("RLock should be unblocked after Resume()")
}
}

// TestBinlogProducer_StopWaitsForInflight tests that Stop() waits for in-flight
// Prolly tree traversals (simulated by holding a read lock) to complete before
// the returned channel closes.
func TestBinlogProducer_StopWaitsForInflight(t *testing.T) {
bp := &binlogProducer{mu: &sync.Mutex{}}

// Simulate an in-flight WorkingRootUpdated holding the read lock
bp.gcMu.RLock()

doneCh := bp.Stop()

// The channel should NOT close yet because the in-flight operation holds the read lock
select {
case <-doneCh:
t.Fatal("Stop() channel should not close while in-flight operation holds read lock")
case <-time.After(100 * time.Millisecond):
// Expected: Stop is waiting for in-flight to complete
}

// Release the in-flight read lock (simulating WorkingRootUpdated returning)
bp.gcMu.RUnlock()

// Now the Stop() channel should close
select {
case <-doneCh:
// Good — Stop() completed after in-flight drained
case <-time.After(time.Second):
t.Fatal("Stop() channel should close after in-flight operation releases read lock")
}

bp.Resume()
}

// TestBinlogProducer_MultipleInflightDrain tests that Stop() waits for multiple
// concurrent in-flight operations to drain before completing.
func TestBinlogProducer_MultipleInflightDrain(t *testing.T) {
bp := &binlogProducer{mu: &sync.Mutex{}}

// Simulate 3 concurrent in-flight WorkingRootUpdated calls
bp.gcMu.RLock()
bp.gcMu.RLock()
bp.gcMu.RLock()

doneCh := bp.Stop()

// Should not close yet
select {
case <-doneCh:
t.Fatal("Stop() should not complete with 3 in-flight operations")
case <-time.After(50 * time.Millisecond):
}

// Release two — still one holding
bp.gcMu.RUnlock()
bp.gcMu.RUnlock()

select {
case <-doneCh:
t.Fatal("Stop() should not complete with 1 remaining in-flight operation")
case <-time.After(50 * time.Millisecond):
}

// Release the last one
bp.gcMu.RUnlock()

select {
case <-doneCh:
// Good
case <-time.After(time.Second):
t.Fatal("Stop() should complete after all in-flight operations drain")
}

bp.Resume()
}

// TestBinlogProducer_StopResumeIdempotent tests that Stop/Resume can be called
// multiple times in sequence (as would happen with multiple GC cycles).
func TestBinlogProducer_StopResumeIdempotent(t *testing.T) {
bp := &binlogProducer{mu: &sync.Mutex{}}

for i := 0; i < 3; i++ {
doneCh := bp.Stop()
select {
case <-doneCh:
case <-time.After(time.Second):
t.Fatalf("Stop() cycle %d did not complete", i)
}
bp.Resume()
}

// After multiple cycles, read lock should still work normally
bp.gcMu.RLock()
bp.gcMu.RUnlock()
}

// TestBinlogProducer_ConcurrentStopAndWorkingRootUpdated tests the race between
// Stop() being called and a new WorkingRootUpdated attempting to enter.
func TestBinlogProducer_ConcurrentStopAndWorkingRootUpdated(t *testing.T) {
bp := &binlogProducer{mu: &sync.Mutex{}}

// Run multiple goroutines trying to acquire read locks while stop/resume cycles
var wg sync.WaitGroup
done := make(chan struct{})

// Readers simulating WorkingRootUpdated calls
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-done:
return
default:
bp.gcMu.RLock()
// Simulate brief work
time.Sleep(time.Microsecond)
bp.gcMu.RUnlock()
}
}
}()
}

// Run a few GC stop/resume cycles concurrently with readers
for i := 0; i < 5; i++ {
doneCh := bp.Stop()
<-doneCh
// GC is running — all readers blocked
time.Sleep(time.Millisecond)
bp.Resume()
time.Sleep(time.Millisecond)
}

close(done)
wg.Wait()

// No panics or deadlocks — test passes by completing
assert.True(t, true)
}

// TestDatabaseUpdateListeners_GCPausable verifies that the GCPausableListener
// type assertion works correctly against DatabaseUpdateListeners.
func TestDatabaseUpdateListeners_GCPausable(t *testing.T) {
bp := &binlogProducer{mu: &sync.Mutex{}}

// Register and check type assertion
var listener doltdb.DatabaseUpdateListener = bp
pausable, ok := listener.(doltdb.GCPausableListener)
require.True(t, ok, "binlogProducer should implement GCPausableListener")

doneCh := pausable.Stop()
select {
case <-doneCh:
case <-time.After(time.Second):
t.Fatal("Stop() via interface did not complete")
}
pausable.Resume()
}
Original file line number Diff line number Diff line change
Expand Up @@ -869,9 +869,15 @@ func encodeBytesFromAddress(ctx *sql.Context, addr hash.Hash, ns tree.NodeStore,
if ns == nil {
return nil, fmt.Errorf("nil NodeStore used to encode bytes from address")
}
bytes, err := ns.ReadBytes(ctx, addr)
if err != nil {
return nil, err

// A zero hash address means empty content (e.g. empty string stored without
// an out-of-band chunk). Skip the ChunkStore lookup and encode zero-length data.
var bytes []byte
if !addr.IsEmpty() {
bytes, err = ns.ReadBytes(ctx, addr)
if err != nil {
return nil, err
}
}

blobType := typ.(sql.StringType)
Expand Down
18 changes: 18 additions & 0 deletions go/libraries/doltcore/sqle/dprocedures/dolt_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,5 +325,23 @@ func RunDoltGC(ctx *sql.Context, ddb *doltdb.DoltDB, mode types.GCMode, cmp chun
statsDoneCh: statsDoneCh,
}
}
// Stop any GC-pausable database update listeners (e.g. binlog producer) to prevent
// chunk deletion during in-flight Prolly tree traversals (prolly.DiffMaps).
var pausedListeners []doltdb.GCPausableListener
for _, listener := range doltdb.DatabaseUpdateListeners {
if pausable, ok := listener.(doltdb.GCPausableListener); ok {
doneCh := pausable.Stop()
<-doneCh
pausedListeners = append(pausedListeners, pausable)
}
}
if len(pausedListeners) > 0 {
defer func() {
for _, p := range pausedListeners {
p.Resume()
}
}()
}

return ddb.GC(ctx, mode, cmp, sc)
}
Loading