diff --git a/go/vt/vtgate/tabletgateway.go b/go/vt/vtgate/tabletgateway.go index 5499716c4da..d50e8936ae9 100644 --- a/go/vt/vtgate/tabletgateway.go +++ b/go/vt/vtgate/tabletgateway.go @@ -309,6 +309,7 @@ func (gw *TabletGateway) withRetry(ctx context.Context, target *querypb.Target, // if primary is serving, but we initially found no tablet, we're in an inconsistent state // we then retry the entire loop if primary != nil { + err = vterrors.Errorf(vtrpcpb.Code_UNAVAILABLE, "inconsistent state detected, primary is serving but initially found no available tablet") continue } } diff --git a/go/vt/vtgate/tabletgateway_flaky_test.go b/go/vt/vtgate/tabletgateway_flaky_test.go index 67140e47910..b9f4acae64b 100644 --- a/go/vt/vtgate/tabletgateway_flaky_test.go +++ b/go/vt/vtgate/tabletgateway_flaky_test.go @@ -233,3 +233,83 @@ outer: t.Fatalf("timed out waiting for query to execute") } } + +// TestInconsistentStateDetectedBuffering simulates the case where we have used up all our buffering retries and in the +// last attempt we are in an inconsistent state. Meaning that we initially thought that there are no available tablets +// but after a moment the primary is found to be serving. +// This is inconsistent and we want to fail properly. This scenario used to panic since no error and no results were +// returned. +func TestInconsistentStateDetectedBuffering(t *testing.T) { + bufferImplementation = "keyspace_events" + buffer.SetBufferingModeInTestingEnv(true) + defer func() { + buffer.SetBufferingModeInTestingEnv(false) + bufferImplementation = "healthcheck" + }() + + keyspace := "ks1" + shard := "-80" + tabletType := topodatapb.TabletType_PRIMARY + host := "1.1.1.1" + port := int32(1001) + target := &querypb.Target{ + Keyspace: keyspace, + Shard: shard, + TabletType: tabletType, + } + + ts := &fakeTopoServer{} + // create a new fake health check. 
We want to check the buffering code which uses Subscribe, so we must also pass a channel + hc := discovery.NewFakeHealthCheck(make(chan *discovery.TabletHealth)) + // create a new tablet gateway + tg := NewTabletGateway(context.Background(), hc, ts, "cell") + + tg.retryCount = 0 + + // add a primary tablet which is serving + sbc := hc.AddTestTablet("cell", host, port, keyspace, shard, tabletType, true, 10, nil) + + // add a result to the sandbox connection + sqlResult1 := &sqltypes.Result{ + Fields: []*querypb.Field{{ + Name: "col1", + Type: sqltypes.VarChar, + }}, + RowsAffected: 1, + Rows: [][]sqltypes.Value{{ + sqltypes.MakeTrusted(sqltypes.VarChar, []byte("bb")), + }}, + } + sbc.SetResults([]*sqltypes.Result{sqlResult1}) + + // get the primary tablet from the fake health check + tablets := hc.GetAllTablets() + var primaryTablet *topodatapb.Tablet + + for _, tablet := range tablets { + if tablet.Type == topodatapb.TabletType_PRIMARY { + primaryTablet = tablet + } + } + require.NotNil(t, primaryTablet) + hc.SetServing(primaryTablet, true) + hc.Broadcast(primaryTablet) + hc.SetServing(primaryTablet, false) + + var res *sqltypes.Result + var err error + queryChan := make(chan struct{}) + go func() { + res, err = tg.Execute(context.Background(), target, "query", nil, 0, 0, nil) + queryChan <- struct{}{} + }() + + select { + case <-queryChan: + require.Nil(t, res) + require.Error(t, err) + require.Equal(t, "target: ks1.-80.primary: inconsistent state detected, primary is serving but initially found no available tablet", err.Error()) + case <-time.After(15 * time.Second): + t.Fatalf("timed out waiting for query to execute") + } +}