Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go/vt/vtgate/tabletgateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target,
// if primary is serving, but we initially found no tablet, we're in an inconsistent state
// we then retry the entire loop
if primary != nil {
err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "inconsistent state detected, primary is serving but initially found no available tablet")
continue
}
}
Expand Down
80 changes: 80 additions & 0 deletions go/vt/vtgate/tabletgateway_flaky_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,3 +233,83 @@ outer:
t.Fatalf("timed out waiting for query to execute")
}
}

// TestInconsistentStateDetectedBuffering simulates the case where we have used up all our buffering retries and in the
// last attempt we are in an inconsistent state. Meaning that we initially thought that there are no available tablets
// but after a moment the primary is found to be serving.
// This is inconsistent and we want to fail properly. This scenario used to panic since no error and no results were
// returned.
func TestInconsistentStateDetectedBuffering(t *testing.T) {
bufferImplementation = "keyspace_events"
buffer.SetBufferingModeInTestingEnv(true)
defer func() {
buffer.SetBufferingModeInTestingEnv(false)
bufferImplementation = "healthcheck"
}()

keyspace := "ks1"
shard := "-80"
tabletType := topodatapb.TabletType_PRIMARY
host := "1.1.1.1"
port := int32(1001)
target := &querypb.Target{
Keyspace: keyspace,
Shard: shard,
TabletType: tabletType,
}

ts := &fakeTopoServer{}
// create a new fake health check. We want to check the buffering code which uses Subscribe, so we must also pass a channel
hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth))
// create a new tablet gateway
tg := NewTabletGateway(context.Background(), hc, ts, "cell")

tg.retryCount = 0

// add a primary tabelt which is serving
sbc := hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil)

// add a result to the sandbox connection
sqlResult1 := &sqltypes.Result{
Fields: []*querypb.Field{{
Name: "col1",
Type: sqltypes.VarChar,
}},
RowsAffected: 1,
Rows: [][]sqltypes.Value{{
sqltypes.MakeTrusted(sqltypes.VarChar, []byte("bb")),
}},
}
sbc.SetResults([]*sqltypes.Result{sqlResult1})

// get the primary and replica tablet from the fake health check
tablets := hc.GetAllTablets()
var primaryTablet *topodatapb.Tablet

for _, tablet := range tablets {
if tablet.Type == topodatapb.TabletType_PRIMARY {
primaryTablet = tablet
}
}
require.NotNil(t, primaryTablet)
hc.SetServing(primaryTablet, true)
hc.Broadcast(primaryTablet)
hc.SetServing(primaryTablet, false)

var res *sqltypes.Result
var err error
queryChan := make(chan struct{})
go func() {
res, err = tg.Execute(context.Background(), target, "query", nil, 0, 0, nil)
queryChan <- struct{}{}
}()

select {
case <-queryChan:
require.Nil(t, res)
require.Error(t, err)
require.Equal(t, "target: ks1.-80.primary: inconsistent state detected, primary is serving but initially found no available tablet", err.Error())
case <-time.After(15 * time.Second):
t.Fatalf("timed out waiting for query to execute")
}
}