rls: update picker synchronously upon receipt of configuration update
aranjans committed Jul 15, 2024
1 parent 59954c8 commit b500c57
Showing 2 changed files with 207 additions and 5 deletions.
17 changes: 12 additions & 5 deletions balancer/rls/balancer.go
@@ -74,9 +74,10 @@ var (

// Following functions are no-ops in actual code, but can be overridden in
// tests to give tests visibility into exactly when certain events happen.
-	clientConnUpdateHook = func() {}
-	dataCachePurgeHook   = func() {}
-	resetBackoffHook     = func() {}
+	clientConnUpdateHook         = func() {}
+	dataCachePurgeHook           = func() {}
+	resetBackoffHook             = func() {}
+	entryWithValidBackoffEvicted = func() {}

)

func init() {
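
These hooks follow a simple pattern used throughout this package: production code calls a package-level no-op function, and a test temporarily swaps in a closure that signals a channel, restoring the original on exit. A minimal self-contained sketch of that pattern, using hypothetical names (somethingHappened, doWork, TestHookFires) rather than the actual RLS hooks:

package hooks

import (
	"testing"
	"time"
)

// somethingHappened is a no-op in production code; tests override it to
// observe exactly when the event fires.
var somethingHappened = func() {}

func doWork() {
	// ... real work elided ...
	somethingHappened()
}

func TestHookFires(t *testing.T) {
	done := make(chan struct{}, 1)
	orig := somethingHappened
	somethingHappened = func() { done <- struct{}{} }
	defer func() { somethingHappened = orig }() // restore for other tests

	go doWork()
	select {
	case <-done:
	case <-time.After(time.Second):
		t.Fatal("hook was not invoked")
	}
}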
@@ -297,13 +298,19 @@ func (b *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error
if resizeCache {
	// If the new config change reduces the size of the data cache, we
	// might have to evict entries to get the cache size down to the newly
-	// specified size.
+	// specified size. If we do evict an entry with a valid backoff timer,
+	// the new picker needs to be sent to the channel to re-process any
+	// RPCs queued as a result of this backoff timer.
	//
	// And we cannot do this operation above (where we compute the
	// `resizeCache` boolean) because `cacheMu` needs to be grabbed before
	// `stateMu` if we are to hold both locks at the same time.
	b.cacheMu.Lock()
-	b.dataCache.resize(newCfg.cacheSizeBytes)
+	evicted := b.dataCache.resize(newCfg.cacheSizeBytes)
+	if evicted {
+		b.sendNewPickerLocked()
+		entryWithValidBackoffEvicted()
+	}
	b.cacheMu.Unlock()
}
return nil
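
The dataCache itself is not shown in this diff; the only contract change is that resize now reports whether it evicted an entry whose backoff timer was still running. A rough, illustrative sketch of that contract — the types and fields here (entry, backoffState, entries) are assumptions for illustration, not the actual balancer/rls cache implementation:

package cache

import "time"

type backoffState struct{ timer *time.Timer }

type entry struct {
	size    int64
	backoff *backoffState
}

type dataCache struct {
	maxSize     int64
	currentSize int64
	entries     []*entry // oldest first; a stand-in for real LRU bookkeeping
}

// resize updates the cache budget and reports whether any evicted entry
// still had a live backoff timer -- the condition under which the
// balancer above must send a new picker.
func (dc *dataCache) resize(newSize int64) (backoffCancelled bool) {
	dc.maxSize = newSize
	for dc.currentSize > dc.maxSize && len(dc.entries) > 0 {
		e := dc.entries[0]
		dc.entries = dc.entries[1:]
		dc.currentSize -= e.size
		// timer.Stop() returns true if the timer had not yet fired, i.e.
		// the entry was evicted while still in backoff.
		if e.backoff != nil && e.backoff.timer != nil && e.backoff.timer.Stop() {
			backoffCancelled = true
		}
	}
	return backoffCancelled
}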
195 changes: 195 additions & 0 deletions balancer/rls/balancer_test.go
@@ -650,6 +650,201 @@ func (s) TestConfigUpdate_DataCacheSizeDecrease(t *testing.T) {
verifyRLSRequest(t, rlsReqCh, true)
}

// Test that when a data cache entry with a valid backoff timer is evicted
// due to a config change reducing the cache size, the picker is updated
// synchronously.
func (s) TestPickerUpdateOnDataCacheSizeDecrease(t *testing.T) {
// Create a restartable listener which can close existing connections.
l, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)

// Override the clientConn update hook to get notified.
clientConnUpdateDone := make(chan struct{}, 1)
origClientConnUpdateHook := clientConnUpdateHook
clientConnUpdateHook = func() { clientConnUpdateDone <- struct{}{} }
defer func() { clientConnUpdateHook = origClientConnUpdateHook }()

// Override the cache entry size func, and always return 1.
origEntrySizeFunc := computeDataCacheEntrySize
computeDataCacheEntrySize = func(cacheKey, *cacheEntry) int64 { return 1 }
defer func() { computeDataCacheEntrySize = origEntrySizeFunc }()

// Override the backoff strategy to return a large backoff, ensuring that
// the data cache entry remains in backoff for the duration of the test.
origBackoffStrategy := defaultBackoffStrategy
defaultBackoffStrategy = &fakeBackoffStrategy{backoff: defaultTestTimeout}
defer func() { defaultBackoffStrategy = origBackoffStrategy }()

// Override minEvictDuration to ensure that when the config update
// reduces the cache size, the resize operation is not stopped because
// we find an entry whose minimum eviction duration has not elapsed.
origMinEvictDuration := minEvictDuration
minEvictDuration = time.Duration(0)
defer func() { minEvictDuration = origMinEvictDuration }()

// Override entryWithValidBackoffEvicted to get notified when an entry
// with a valid backoff timer is evicted.
backOffItemEvicted := make(chan struct{}, 1)
origBackoffItemEvicted := entryWithValidBackoffEvicted
entryWithValidBackoffEvicted = func() { backOffItemEvicted <- struct{}{} }
defer func() { entryWithValidBackoffEvicted = origBackoffItemEvicted }()

// Register the top-level wrapping balancer which forwards calls to RLS.
topLevelBalancerName := t.Name() + "top-level"
var ccWrapper *testCCWrapper
stub.Register(topLevelBalancerName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
ccWrapper = &testCCWrapper{ClientConn: bd.ClientConn}
bd.Data = balancer.Get(Name).Build(ccWrapper, bd.BuildOptions)
},
ParseConfig: func(sc json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
parser := balancer.Get(Name).(balancer.ConfigParser)
return parser.ParseConfig(sc)
},
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
bal := bd.Data.(balancer.Balancer)
return bal.UpdateClientConnState(ccs)
},
Close: func(bd *stub.BalancerData) {
bal := bd.Data.(balancer.Balancer)
bal.Close()
},
})

// Start an RLS server and set the throttler to never throttle requests.
rlsServer, rlsReqCh := rlstest.SetupFakeRLSServer(t, lis)
overrideAdaptiveThrottler(t, neverThrottlingThrottler())

// Register an LB policy to act as the child policy for RLS LB policy.
childPolicyName := "test-child-policy" + t.Name()
e2e.RegisterRLSChildPolicy(childPolicyName, nil)
t.Logf("Registered child policy with name %q", childPolicyName)

// Start a couple of test backends, and set up the fake RLS server to return
// these as targets in the RLS response, based on request keys.
_, backendAddress1 := startBackend(t)
backendCh2, backendAddress2 := startBackend(t)
backendCh3, backendAddress3 := startBackend(t)
rlsServer.SetResponseCallback(func(ctx context.Context, req *rlspb.RouteLookupRequest) *rlstest.RouteLookupResponse {
if req.KeyMap["k1"] == "v1" {
return &rlstest.RouteLookupResponse{Err: errors.New("dummy error to set this entry in backoff"), Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress1}}}
}
if req.KeyMap["k2"] == "v2" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress2}}}
}
if req.KeyMap["k3"] == "v3" {
return &rlstest.RouteLookupResponse{Resp: &rlspb.RouteLookupResponse{Targets: []string{backendAddress3}}}
}
return &rlstest.RouteLookupResponse{Err: errors.New("no keys in request metadata")}
})
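// With this callback, a lookup that carries k1 (or no matching keys at
// all) fails and leaves its cache entry in backoff, while k2 and k3
// resolve to distinct backends. This populates the cache with several
// entries ahead of the resize further below.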

// Register a manual resolver and push the RLS service config through it.
r := manual.NewBuilderWithScheme("rls-e2e")
headers := `
[
{
"key": "k1",
"names": [
"n1"
]
},
{
"key": "k2",
"names": [
"n2"
]
},
{
"key": "k3",
"names": [
"n3"
]
}
]
`
scJSON := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"%s": {
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "grpc.testing.TestService"}],
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": 1000
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
}
}
]
}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)
sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON)
r.InitialState(resolver.State{ServiceConfig: sc})

cc, err := grpc.Dial(r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("create grpc.Dial() failed: %v", err)
}
defer cc.Close()

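// Wait for the initial service config to be processed by the RLS
// balancer before issuing RPCs.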
<-clientConnUpdateDone

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
makeTestRPCAndVerifyError(ctx, t, cc, codes.Unavailable, nil)
t.Logf("Verifying if RPC failed when listener is stopped.")

ctxOutgoing := metadata.AppendToOutgoingContext(ctx, "n2", "v2")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh2)
verifyRLSRequest(t, rlsReqCh, true)

ctxOutgoing = metadata.AppendToOutgoingContext(ctx, "n3", "v3")
makeTestRPCAndExpectItToReachBackend(ctxOutgoing, t, cc, backendCh3)
verifyRLSRequest(t, rlsReqCh, true)

// Push an updated service config that sets cacheSizeBytes to 1, forcing
// eviction of cached entries (including the one still in backoff).
scJSON1 := fmt.Sprintf(`
{
"loadBalancingConfig": [
{
"%s": {
"routeLookupConfig": {
"grpcKeybuilders": [{
"names": [{"service": "grpc.testing.TestService"}],
"headers": %s
}],
"lookupService": "%s",
"cacheSizeBytes": 1
},
"childPolicy": [{"%s": {}}],
"childPolicyConfigTargetFieldName": "Backend"
}
}
]
}`, topLevelBalancerName, headers, rlsServer.Address, childPolicyName)
sc1 := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(scJSON1)
r.UpdateState(resolver.State{ServiceConfig: sc1})
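// Wait for the new config to be processed; the cache resize (and any
// resulting picker update) happens synchronously within this update.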
<-clientConnUpdateDone

// Stop the RLS server's listener.
lis.Stop()

// Wait for backOffItemEvicted to verify that the picker was updated
// synchronously while handling the cache resize triggered by the config
// update.
select {
case <-backOffItemEvicted:
case <-ctx.Done():
t.Error("Error sending picker update on eviction of cache entry with valid backoff: context timed out.")
}
}
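
The select-with-deadline above could be factored into a small helper if more hooks need awaiting. A hypothetical sketch — awaitEvent is not part of this package, and the names are illustrative:

// awaitEvent blocks until eventCh fires or ctx expires; name is used
// only to produce a descriptive failure message.
func awaitEvent(ctx context.Context, t *testing.T, eventCh <-chan struct{}, name string) {
	t.Helper()
	select {
	case <-eventCh:
	case <-ctx.Done():
		t.Fatalf("timeout waiting for %s", name)
	}
}

With such a helper, the final wait would read awaitEvent(ctx, t, backOffItemEvicted, "picker update on eviction").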

// TestDataCachePurging verifies that the LB policy periodically evicts expired
// entries from the data cache.
func (s) TestDataCachePurging(t *testing.T) {
Expand Down
