Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Flags:
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
--consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough")
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
--consul_auth_static_file string JSON File to read the topos/tokens from.
Expand Down
1 change: 1 addition & 0 deletions go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ Flags:
--config-persistence-min-interval duration minimum interval between persisting dynamic config changes back to disk (if no change has occurred, nothing is done). (default 1s)
--config-type string Config file type (omit to infer config type from file extension).
--consolidator-query-waiter-cap int Configure the maximum number of clients allowed to wait on the consolidator.
--consolidator-query-waiter-cap-method string Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject. (default "fallthrough")
--consolidator-stream-query-size int Configure the stream consolidator query size in bytes. Setting to 0 disables the stream consolidator. (default 2097152)
--consolidator-stream-total-size int Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator. (default 134217728)
--consul_auth_static_file string JSON File to read the topos/tokens from.
Expand Down
16 changes: 11 additions & 5 deletions go/sync2/fake_consolidator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,12 @@ type FakePendingResult struct {
BroadcastCalls int
// WaitCalls can be used to inspect Wait calls.
WaitCalls int
err error
result *sqltypes.Result
// AddWaiterCounterCalls can be used to inspect AddWaiterCounter calls.
AddWaiterCounterCalls []int64
// WaiterCount simulates the current waiter count
WaiterCount int64
err error
result *sqltypes.Result
}

var (
Expand Down Expand Up @@ -113,7 +117,9 @@ func (fr *FakePendingResult) Wait() {
fr.WaitCalls++
}

// AddWaiterCounter is currently a no-op.
func (fr *FakePendingResult) AddWaiterCounter(int64) *int64 {
return new(int64)
// AddWaiterCounter applies delta to the simulated WaiterCount, appends delta
// to AddWaiterCounterCalls so tests can inspect the call history, and returns
// a pointer to the updated WaiterCount field.
func (fr *FakePendingResult) AddWaiterCounter(delta int64) *int64 {
	fr.WaiterCount += delta
	fr.AddWaiterCounterCalls = append(fr.AddWaiterCounterCalls, delta)
	return &fr.WaiterCount
}
23 changes: 19 additions & 4 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,8 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
// Check tablet type.
if qre.shouldConsolidate() {
q, original := qre.tsv.qe.consolidator.Create(sqlWithoutComments)
waiterCapExceeded := false

if original {
defer q.Broadcast()
conn, err := qre.getConn()
Expand All @@ -717,13 +719,26 @@ func (qre *QueryExecutor) execSelect() (*sqltypes.Result, error) {
startTime := time.Now()
q.Wait()
qre.tsv.stats.WaitTimings.Record("Consolidations", startTime)
q.AddWaiterCounter(-1)
} else {
// Waiter cap exceeded, handle based on configured method
q.AddWaiterCounter(-1)
if qre.tsv.config.ConsolidatorQueryWaiterCapMethod == "reject" {
return nil, vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "consolidator waiter cap (%d) exceeded", waiterCap)
}
// Default method (fallthrough): execute the query independently.
waiterCapExceeded = true
}
q.AddWaiterCounter(-1)
}
if q.Err() != nil {
return nil, q.Err()

// Return consolidation results unless waiter cap was exceeded
if !waiterCapExceeded {
if q.Err() != nil {
return nil, q.Err()
}
return q.Result(), nil
}
return q.Result(), nil
// If waiter cap exceeded, fall through to independent execution
}
conn, err := qre.getConn()
if err != nil {
Expand Down
144 changes: 144 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1437,6 +1437,150 @@ func TestQueryExecutorShouldConsolidate(t *testing.T) {
}
}

// TestQueryExecutorConsolidatorWaiterCapFallback checks the default handling
// of an exceeded consolidator waiter cap: instead of waiting on the pending
// result, the query must be executed independently and return real rows
// rather than an empty result.
func TestQueryExecutorConsolidatorWaiterCapFallback(t *testing.T) {
	const query = "select * from t limit 10001"

	db := setUpQueryExecutorTest(t)
	defer db.Close()

	tsv := newTestTabletServer(context.Background(), enableConsolidator, db)
	defer tsv.StopService()

	// Allow at most one waiter on the consolidator.
	tsv.config.ConsolidatorQueryWaiterCap = 1

	consolidator := sync2.NewFakeConsolidator()
	tsv.qe.consolidator = consolidator

	want := &sqltypes.Result{Fields: getTestTableFields()}
	want.Rows = append(want.Rows, []sqltypes.Value{
		sqltypes.NewInt32(1),   // pk
		sqltypes.NewInt32(100), // name
		sqltypes.NewInt32(200), // addr
	})

	// The pending result stands in for an identical query that is already
	// in flight. Its waiter count starts above the cap (2 > 1), so this
	// query is not allowed to join the waiters.
	pending := &sync2.FakePendingResult{}
	pending.SetResult(want)
	pending.WaiterCount = 2

	consolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{
		Created:       false, // not the original: an identical query is running
		PendingResult: pending,
	}

	// The independent (fallback) execution is served from the fake database.
	db.AddQuery(query, want)

	qre := newTestQueryExecutor(context.Background(), tsv, query, 0)
	qre.options = &querypb.ExecuteOptions{
		Consolidator: querypb.ExecuteOptions_CONSOLIDATOR_ENABLED,
	}

	got, err := qre.Execute()
	require.NoError(t, err)
	require.NotNil(t, got)

	// The rows must come through intact, not as an empty result.
	require.Equal(t, want.Fields, got.Fields)
	require.Equal(t, want.Rows, got.Rows)

	// The consolidator was consulted exactly once...
	require.Len(t, consolidator.CreateCalls, 1)
	// ...but we neither waited (cap exceeded) nor broadcast (not the original).
	require.Equal(t, 0, pending.WaitCalls)
	require.Equal(t, 0, pending.BroadcastCalls)

	// Exactly two counter calls: 0 to read the current count, then -1 to clean up.
	require.Equal(t, []int64{0, -1}, pending.AddWaiterCounterCalls)

	// The fallback path ran the query against the database exactly once.
	require.Equal(t, 1, db.GetQueryCalledNum(query))

	db.VerifyAllExecutedOrFail()
}

// TestQueryExecutorConsolidatorWaiterCapReject checks the "reject" handling of
// an exceeded consolidator waiter cap: the query must fail with a
// RESOURCE_EXHAUSTED error, without waiting on the pending result and without
// executing the query against the database.
func TestQueryExecutorConsolidatorWaiterCapReject(t *testing.T) {
	const query = "select * from t limit 10001"

	db := setUpQueryExecutorTest(t)
	defer db.Close()

	tsv := newTestTabletServer(context.Background(), enableConsolidator, db)
	defer tsv.StopService()

	// Allow at most one waiter and reject queries once the cap is exceeded.
	tsv.config.ConsolidatorQueryWaiterCap = 1
	tsv.config.ConsolidatorQueryWaiterCapMethod = "reject"

	consolidator := sync2.NewFakeConsolidator()
	tsv.qe.consolidator = consolidator

	pendingRows := &sqltypes.Result{Fields: getTestTableFields()}
	pendingRows.Rows = append(pendingRows.Rows, []sqltypes.Value{
		sqltypes.NewInt32(1),   // pk
		sqltypes.NewInt32(100), // name
		sqltypes.NewInt32(200), // addr
	})

	// The pending result stands in for an identical query that is already
	// in flight. Its waiter count starts above the cap (2 > 1), so this
	// query is not allowed to join the waiters.
	pending := &sync2.FakePendingResult{}
	pending.SetResult(pendingRows)
	pending.WaiterCount = 2

	consolidator.CreateReturn = &sync2.FakeConsolidatorCreateReturn{
		Created:       false, // not the original: an identical query is running
		PendingResult: pending,
	}

	qre := newTestQueryExecutor(context.Background(), tsv, query, 0)
	qre.options = &querypb.ExecuteOptions{
		Consolidator: querypb.ExecuteOptions_CONSOLIDATOR_ENABLED,
	}

	// Execution must be refused outright.
	got, err := qre.Execute()
	require.Error(t, err)
	require.Nil(t, got)

	// The error names the waiter cap and carries the RESOURCE_EXHAUSTED code.
	require.Contains(t, err.Error(), "consolidator waiter cap")
	require.Contains(t, err.Error(), "exceeded")
	require.Equal(t, vtrpcpb.Code_RESOURCE_EXHAUSTED, vterrors.Code(err))

	// The consolidator was consulted exactly once...
	require.Len(t, consolidator.CreateCalls, 1)
	// ...but we neither waited (rejected instead) nor broadcast (not the original).
	require.Equal(t, 0, pending.WaitCalls)
	require.Equal(t, 0, pending.BroadcastCalls)

	// Exactly two counter calls: 0 to read the current count, then -1 to clean up.
	require.Equal(t, []int64{0, -1}, pending.AddWaiterCounterCalls)

	// Rejection happens before any fallback, so the database saw no query.
	require.Equal(t, 0, db.GetQueryCalledNum(query))
}

func TestGetConnectionLogStats(t *testing.T) {
db := setUpQueryExecutorTest(t)
defer db.Close()
Expand Down
19 changes: 16 additions & 3 deletions go/vt/vttablet/tabletserver/tabletenv/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ func registerTabletEnvFlags(fs *pflag.FlagSet) {
fs.Int64Var(&currentConfig.ConsolidatorStreamTotalSize, "consolidator-stream-total-size", defaultConfig.ConsolidatorStreamTotalSize, "Configure the stream consolidator total size in bytes. Setting to 0 disables the stream consolidator.")

fs.Int64Var(&currentConfig.ConsolidatorQueryWaiterCap, "consolidator-query-waiter-cap", 0, "Configure the maximum number of clients allowed to wait on the consolidator.")
fs.StringVar(&currentConfig.ConsolidatorQueryWaiterCapMethod, "consolidator-query-waiter-cap-method", "fallthrough", "Configure the method when consolidator waiter cap is exceeded. Options: fallthrough, reject.")
fs.DurationVar(&healthCheckInterval, "health_check_interval", defaultConfig.Healthcheck.Interval, "Interval between health checks")
fs.DurationVar(&degradedThreshold, "degraded_threshold", defaultConfig.Healthcheck.DegradedThreshold, "replication lag after which a replica is considered degraded")
fs.DurationVar(&unhealthyThreshold, "unhealthy_threshold", defaultConfig.Healthcheck.UnhealthyThreshold, "replication lag after which a replica is considered unhealthy")
Expand Down Expand Up @@ -249,6 +250,16 @@ func Init() {
currentConfig.Consolidator = Disable
}

switch currentConfig.ConsolidatorQueryWaiterCapMethod {
case "fallthrough", "reject":
// Valid options
case "":
// Empty string defaults to fallthrough
currentConfig.ConsolidatorQueryWaiterCapMethod = "fallthrough"
default:
log.Exitf("Invalid consolidator-query-waiter-cap-method value %v: must be either 'fallthrough' or 'reject'", currentConfig.ConsolidatorQueryWaiterCapMethod)
}

if heartbeatInterval == 0 {
heartbeatInterval = defaultConfig.ReplicationTracker.HeartbeatInterval
}
Expand Down Expand Up @@ -319,6 +330,7 @@ type TabletConfig struct {
ConsolidatorStreamTotalSize int64 `json:"consolidatorStreamTotalSize,omitempty"`
ConsolidatorStreamQuerySize int64 `json:"consolidatorStreamQuerySize,omitempty"`
ConsolidatorQueryWaiterCap int64 `json:"consolidatorMaxQueryWait,omitempty"`
ConsolidatorQueryWaiterCapMethod string `json:"consolidatorQueryWaiterCapMethod,omitempty"`
QueryCacheMemory int64 `json:"queryCacheMemory,omitempty"`
QueryCacheDoorkeeper bool `json:"queryCacheDoorkeeper,omitempty"`
SchemaReloadInterval time.Duration `json:"schemaReloadIntervalSeconds,omitempty"`
Expand Down Expand Up @@ -982,9 +994,10 @@ var defaultConfig = TabletConfig{
// of them ready in MySQL and profit from a pipelining effect.
MaxConcurrency: 5,
},
Consolidator: Enable,
ConsolidatorStreamTotalSize: 128 * 1024 * 1024,
ConsolidatorStreamQuerySize: 2 * 1024 * 1024,
Consolidator: Enable,
ConsolidatorStreamTotalSize: 128 * 1024 * 1024,
ConsolidatorStreamQuerySize: 2 * 1024 * 1024,
ConsolidatorQueryWaiterCapMethod: "fallthrough",
// The value for StreamBufferSize was chosen after trying out a few of
// them. Too small buffers force too many packets to be sent. Too big
// buffers force the clients to read them in multiple chunks and make
Expand Down
1 change: 1 addition & 0 deletions go/vt/vttablet/tabletserver/tabletenv/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func TestDefaultConfig(t *testing.T) {
gotBytes, err := yaml2.Marshal(NewDefaultConfig())
require.NoError(t, err)
want := `consolidator: enable
consolidatorQueryWaiterCapMethod: fallthrough
consolidatorStreamQuerySize: 2097152
consolidatorStreamTotalSize: 134217728
gracePeriods: {}
Expand Down
Loading