Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions go/sync2/fake_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ type FakePendingResult struct {
BroadcastCalls int
// WaitCalls can be used to inspect Wait calls.
WaitCalls int
err error
result *sqltypes.Result
// AddWaiterCounterCalls can be used to inspect AddWaiterCounter calls.
AddWaiterCounterCalls []int64
// WaiterCount simulates the current waiter count
WaiterCount int64
err error
result *sqltypes.Result
}

var (
Expand Down Expand Up @@ -113,7 +117,9 @@ func (fr *FakePendingResult) Wait() {
fr.WaitCalls++
}

// AddWaiterCounter is currently a no-op.
func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 {
return new(int64)
// AddWaiterCounter records the call and simulates waiter count changes.
func (fr *FakePendingResult) AddWaiterCounter(delta int64) *int64 {
fr.AddWaiterCounterCalls = append(fr.AddWaiterCounterCalls, delta)
fr.WaiterCount += delta
return &fr.WaiterCount
}
15 changes: 12 additions & 3 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
// Check tablet type.
if qre.shouldConsolidate() {
q, original := qre.tsv.qe.consolidator.Create(sqlWithoutComments)
waiterCapExceeded := false
if original {
defer q.Broadcast()
conn, err := qre.getConn()
Expand All @@ -729,13 +730,21 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
startTime := time.Now()
q.Wait()
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
} else {
// Waiter cap exceeded, fall back to independent query execution
waiterCapExceeded = true
}
q.AddWaiterCounter(-1)
}
if q.Err() != nil {
return nil, q.Err()

// Return consolidation results unless waiter cap was exceeded
if !waiterCapExceeded {
if q.Err() != nil {
return nil, q.Err()
}
return q.Result(), nil
}
return q.Result(), nil
// If waiter cap exceeded, fall through to independent execution
}
conn, err := qre.getConn()
if err != nil {
Expand Down
73 changes: 73 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1466,6 +1466,79 @@ func TestQueryExecutorShouldConsolidate(t *testing.T) {
}
}

func TestQueryExecutorConsolidatorWaiterCapFallback(t *testing.T) {
// Test that when the consolidator waiter cap is reached, queries fall back
// to independent execution instead of returning empty results.

db := setUpQueryExecutorTest(t)
defer db.Close()

ctx := context.Background()
tsv := newTestTabletServer(ctx, enableConsolidator, db)
defer tsv.StopService()

// Set a waiter cap of 1
tsv.config.ConsolidatorQueryWaiterCap = 1

fakeConsolidator := sync2.NewFakeConsolidator()
tsv.qe.consolidator = fakeConsolidator

input := "select * from t limit 10001"
result := &sqltypes.Result{
Fields: getTestTableFields(),
Rows: [][]sqltypes.Value{{
sqltypes.NewInt32(1), // pk
sqltypes.NewInt32(100), // name
sqltypes.NewInt32(200), // addr
}},
}

// Set up consolidator to simulate an identical query already running (Created=false)
fakePendingResult := &sync2.FakePendingResult{}
fakePendingResult.SetResult(result)
// Start with waiter count above the cap (2 > 1), so the condition fails
fakePendingResult.WaiterCount = 2

fakeConsolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{
Created: false, // Simulate identical query already running
PendingResult: fakePendingResult,
}

// Set up database query/response for fallback execution
db.AddQuery(input, result)

qre := newTestQueryExecutor(context.Background(), tsv, input, 0)
qre.options = &querypb.ExecuteOptions{Consolidator: querypb.ExecuteOptions_CONSOLIDATOR_ENABLED}

// Execute query
actualResult, err := qre.Execute()
require.NoError(t, err)
require.NotNil(t, actualResult)

// Verify we got the correct result (not empty)
require.Equal(t, result.Fields, actualResult.Fields)
require.Equal(t, result.Rows, actualResult.Rows)

// Verify consolidator was attempted
require.Len(t, fakeConsolidator.CreateCalls, 1)

// Verify we did NOT wait (because waiter cap was exceeded)
require.Equal(t, 0, fakePendingResult.WaitCalls)

// Verify we did NOT broadcast (because we're not the original)
require.Equal(t, 0, fakePendingResult.BroadcastCalls)

// Verify AddWaiterCounter was called: once with 0 (to check count), once with -1 (cleanup)
require.Len(t, fakePendingResult.AddWaiterCounterCalls, 2)
require.Equal(t, int64(0), fakePendingResult.AddWaiterCounterCalls[0]) // Check current count
require.Equal(t, int64(-1), fakePendingResult.AddWaiterCounterCalls[1]) // Decrement

// Verify fallback executed the query independently
require.Equal(t, 1, db.GetQueryCalledNum(input))

db.VerifyAllExecutedOrFail()
}

func TestGetConnectionLogStats(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
Expand Down
Loading