Skip to content

Commit e1f69d8

Browse files
authored
xdsclient: delay resource cache deletion to handle immediate re-subscription of same resource (#8369)
1 parent 8adcc94 commit e1f69d8

File tree

6 files changed

+268
-21
lines changed

6 files changed

+268
-21
lines changed

xds/internal/clients/xdsclient/ads_stream.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ type adsStreamEventHandler interface {
7171
onStreamError(error) // Called when the ADS stream breaks.
7272
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
7373
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
74+
onRequest(typeURL string) // Called when a request is about to be sent on the ADS stream.
7475
}
7576

7677
// state corresponding to a resource type.
@@ -444,6 +445,11 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
444445
}
445446
}
446447

448+
// Call the event handler to remove unsubscribed cache entries. It is to
449+
// ensure the cache entries are deleted even if discovery request fails. In
450+
// case of failure when the stream restarts, nonce is reset anyways.
451+
s.eventHandler.onRequest(url)
452+
447453
msg, err := proto.Marshal(req)
448454
if err != nil {
449455
s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err)
@@ -460,6 +466,7 @@ func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string,
460466
} else if s.logger.V(2) {
461467
s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce)
462468
}
469+
463470
return nil
464471
}
465472

xds/internal/clients/xdsclient/authority.go

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,9 @@ func (a *authority) fallbackToServer(xc *xdsChannelWithConfig) bool {
293293
// Subscribe to all existing resources from the new management server.
294294
for typ, resources := range a.resources {
295295
for name, state := range resources {
296+
if len(state.watchers) == 0 {
297+
continue
298+
}
296299
if a.logger.V(2) {
297300
a.logger.Infof("Resubscribing to resource of type %q and name %q", typ.TypeName, name)
298301
}
@@ -655,6 +658,17 @@ func (a *authority) watchResource(rType ResourceType, resourceName string, watch
655658
}
656659
resources[resourceName] = state
657660
xdsChannel.channel.subscribe(rType, resourceName)
661+
} else if len(state.watchers) == 0 {
662+
if a.logger.V(2) {
663+
a.logger.Infof("Re-watch for type %q, resource name %q before unsubscription", rType.TypeName, resourceName)
664+
}
665+
// Add the active channel to the resource's channel configs if not
666+
// already present.
667+
state.xdsChannelConfigs[xdsChannel] = true
668+
// Ensure the resource is subscribed on the active channel. We do this
669+
// even if resource is present in cache as re-watches might occur
670+
// after unsubscribes or channel changes.
671+
xdsChannel.channel.subscribe(rType, resourceName)
658672
}
659673
// Always add the new watcher to the set of watchers.
660674
state.watchers[watcher] = true
@@ -732,32 +746,16 @@ func (a *authority) unwatchResource(rType ResourceType, resourceName string, wat
732746
}
733747

734748
// There are no more watchers for this resource. Unsubscribe this
735-
// resource from all channels where it was subscribed to and delete
736-
// the state associated with it.
749+
// resource from all channels where it was subscribed to but do not
750+
// delete the state associated with it in case the resource is
751+
// re-requested later before un-subscription request is completed by
752+
// the management server.
737753
if a.logger.V(2) {
738754
a.logger.Infof("Removing last watch for resource name %q", resourceName)
739755
}
740756
for xcc := range state.xdsChannelConfigs {
741757
xcc.channel.unsubscribe(rType, resourceName)
742758
}
743-
delete(resources, resourceName)
744-
745-
// If there are no more watchers for this resource type, delete the
746-
// resource type from the top-level map.
747-
if len(resources) == 0 {
748-
if a.logger.V(2) {
749-
a.logger.Infof("Removing last watch for resource type %q", rType.TypeName)
750-
}
751-
delete(a.resources, rType)
752-
}
753-
// If there are no more watchers for any resource type, release the
754-
// reference to the xdsChannels.
755-
if len(a.resources) == 0 {
756-
if a.logger.V(2) {
757-
a.logger.Infof("Removing last watch for for any resource type, releasing reference to the xdsChannel")
758-
}
759-
a.closeXDSChannels()
760-
}
761759
}, func() { close(done) })
762760
<-done
763761
})
@@ -809,7 +807,7 @@ func (a *authority) closeXDSChannels() {
809807
func (a *authority) watcherExistsForUncachedResource() bool {
810808
for _, resourceStates := range a.resources {
811809
for _, state := range resourceStates {
812-
if state.md.Status == xdsresource.ServiceStatusRequested {
810+
if len(state.watchers) > 0 && state.md.Status == xdsresource.ServiceStatusRequested {
813811
return true
814812
}
815813
}
@@ -841,6 +839,9 @@ func (a *authority) resourceConfig() []*v3statuspb.ClientConfig_GenericXdsConfig
841839
for rType, resourceStates := range a.resources {
842840
typeURL := rType.TypeURL
843841
for name, state := range resourceStates {
842+
if len(state.watchers) == 0 {
843+
continue
844+
}
844845
var raw *anypb.Any
845846
if state.cache != nil {
846847
raw = &anypb.Any{TypeUrl: typeURL, Value: state.cache.Bytes()}
@@ -874,6 +875,43 @@ func (a *authority) close() {
874875
}
875876
}
876877

878+
// removeUnsubscribedCacheEntries iterates through all resources of the given type and
879+
// removes the state for resources that have no active watchers. This is called
880+
// after sending a discovery request to ensure that resources that were
881+
// unsubscribed (and thus have no watchers) are eventually removed from the
882+
// authority's cache.
883+
//
884+
// This method is only executed in the context of a serializer callback.
885+
func (a *authority) removeUnsubscribedCacheEntries(rType ResourceType) {
886+
resources := a.resources[rType]
887+
if resources == nil {
888+
return
889+
}
890+
891+
for name, state := range resources {
892+
if len(state.watchers) == 0 {
893+
if a.logger.V(2) {
894+
a.logger.Infof("Removing resource state for %q of type %q as it has no watchers", name, rType.TypeName)
895+
}
896+
delete(resources, name)
897+
}
898+
}
899+
900+
if len(resources) == 0 {
901+
if a.logger.V(2) {
902+
a.logger.Infof("Removing resource type %q from cache as it has no more resources", rType.TypeName)
903+
}
904+
delete(a.resources, rType)
905+
}
906+
907+
if len(a.resources) == 0 {
908+
if a.logger.V(2) {
909+
a.logger.Infof("Removing last watch for any resource type, releasing reference to the xdsChannels")
910+
}
911+
a.closeXDSChannels()
912+
}
913+
}
914+
877915
func serviceStatusToProto(serviceStatus xdsresource.ServiceStatus) v3adminpb.ClientResourceStatus {
878916
switch serviceStatus {
879917
case xdsresource.ServiceStatusUnknown:

xds/internal/clients/xdsclient/channel.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ type xdsChannelEventHandler interface {
5959
// adsResourceDoesNotExist is called when the xdsChannel determines that a
6060
// requested ADS resource does not exist.
6161
adsResourceDoesNotExist(ResourceType, string)
62+
63+
// adsResourceRemoveUnsubscribedCacheEntries is called when the xdsChannel
64+
// needs to remove unsubscribed cache entries.
65+
adsResourceRemoveUnsubscribedCacheEntries(ResourceType)
6266
}
6367

6468
// xdsChannelOpts holds the options for creating a new xdsChannel.
@@ -136,8 +140,32 @@ type xdsChannel struct {
136140
}
137141

138142
func (xc *xdsChannel) close() {
143+
if xc.closed.HasFired() {
144+
return
145+
}
139146
xc.closed.Fire()
147+
148+
// Get the resource types that this specific ADS stream was handling
149+
// before stopping it.
150+
//
151+
// TODO: Revisit if we can avoid acquiring the lock of ads (another type).
152+
xc.ads.mu.Lock()
153+
typesHandledByStream := make([]ResourceType, 0, len(xc.ads.resourceTypeState))
154+
for typ := range xc.ads.resourceTypeState {
155+
typesHandledByStream = append(typesHandledByStream, typ)
156+
}
157+
xc.ads.mu.Unlock()
158+
140159
xc.ads.Stop()
160+
161+
// Schedule removeUnsubscribedCacheEntries for the types this stream was handling,
162+
// on all authorities that were interested in this channel.
163+
if _, ok := xc.eventHandler.(*channelState); ok {
164+
for _, typ := range typesHandledByStream {
165+
xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(typ)
166+
}
167+
}
168+
141169
xc.transport.Close()
142170
xc.logger.Infof("Shutdown")
143171
}
@@ -228,6 +256,26 @@ func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error)
228256
return names, err
229257
}
230258

259+
// onRequest invoked when a request is about to be sent on the ADS stream. It
260+
// removes the cache entries for the resource type that are no longer subscribed to.
261+
func (xc *xdsChannel) onRequest(typeURL string) {
262+
if xc.closed.HasFired() {
263+
if xc.logger.V(2) {
264+
xc.logger.Infof("Received an update from the ADS stream on closed ADS stream")
265+
}
266+
return
267+
}
268+
269+
// Lookup the resource parser based on the resource type.
270+
rType, ok := xc.clientConfig.ResourceTypes[typeURL]
271+
if !ok {
272+
logger.Warningf("Resource type URL %q unknown in response from server", typeURL)
273+
return
274+
}
275+
276+
xc.eventHandler.adsResourceRemoveUnsubscribedCacheEntries(rType)
277+
}
278+
231279
// decodeResponse decodes the resources in the given ADS response.
232280
//
233281
// The opts parameter provides configuration options for decoding the resources.

xds/internal/clients/xdsclient/channel_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -772,3 +772,6 @@ func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (Re
772772
}
773773
return typ, name, nil
774774
}
775+
776+
func (*testEventHandler) adsResourceRemoveUnsubscribedCacheEntries(ResourceType) {
777+
}

xds/internal/clients/xdsclient/test/misc_watchers_test.go

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,3 +527,139 @@ func (s) TestWatchErrorsContainNodeID_ChannelCreationFailure(t *testing.T) {
527527
}
528528
}
529529
}
530+
531+
// TestUnsubscribeAndResubscribe tests the scenario where the client is busy
532+
// processing a response (simulating a pending ACK at a higher level by holding
533+
// the onDone callback from watchers). During this busy state, a resource is
534+
// unsubscribed and then immediately resubscribed which causes the
535+
// unsubscription and new subscription requests to be buffered due to flow
536+
// control.
537+
//
538+
// The test verifies the following:
539+
// - The resubscribed resource is served from the cache.
540+
// - No "resource does not exist" error is generated for the resubscribed
541+
// resource.
542+
func (s) TestRaceUnsubscribeResubscribe(t *testing.T) {
543+
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
544+
defer cancel()
545+
546+
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
547+
nodeID := uuid.New().String()
548+
549+
resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType}
550+
si := clients.ServerIdentifier{
551+
ServerURI: mgmtServer.Address,
552+
Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"},
553+
}
554+
555+
configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
556+
xdsClientConfig := xdsclient.Config{
557+
Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}},
558+
Node: clients.Node{ID: nodeID},
559+
TransportBuilder: grpctransport.NewBuilder(configs),
560+
ResourceTypes: resourceTypes,
561+
// Xdstp resource names used in this test do not specify an
562+
// authority. These will end up looking up an entry with the
563+
// empty key in the authorities map. Having an entry with an
564+
// empty key and empty configuration, results in these
565+
// resources also using the top-level configuration.
566+
Authorities: map[string]xdsclient.Authority{
567+
"": {XDSServers: []xdsclient.ServerConfig{}},
568+
},
569+
}
570+
571+
// Create an xDS client with the above config.
572+
client, err := xdsclient.New(xdsClientConfig)
573+
if err != nil {
574+
t.Fatalf("Failed to create xDS client: %v", err)
575+
}
576+
defer client.Close()
577+
578+
const ldsResourceName1 = "test-listener-resource1"
579+
const ldsResourceName2 = "test-listener-resource2"
580+
const rdsName1 = "test-route-configuration-resource1"
581+
const rdsName2 = "test-route-configuration-resource2"
582+
listenerResource1 := e2e.DefaultClientListener(ldsResourceName1, rdsName1)
583+
listenerResource2 := e2e.DefaultClientListener(ldsResourceName2, rdsName2)
584+
585+
// Watch ldsResourceName1 with a regular watcher to ensure it's in cache
586+
// and ACKed.
587+
watcherInitial := newListenerWatcher()
588+
cancelInitial := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherInitial)
589+
if err := mgmtServer.Update(ctx, e2e.UpdateOptions{NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listenerResource1}, SkipValidation: true}); err != nil {
590+
t.Fatalf("mgmtServer.Update() for %s failed: %v", ldsResourceName1, err)
591+
}
592+
if err := verifyListenerUpdate(ctx, watcherInitial.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil {
593+
t.Fatalf("watcherR1Initial did not receive update for %s: %v", ldsResourceName1, err)
594+
}
595+
cancelInitial()
596+
597+
// Watch ldsResourceName1 and ldsResourceName2 using blocking watchers.
598+
// - Server sends {ldsResourceName1, ldsResourceName2}.
599+
// - Watchers for both resources get the update but we HOLD on to their
600+
// onDone callbacks.
601+
blockingWatcherR1 := newBLockingListenerWatcher()
602+
cancelR1 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, blockingWatcherR1)
603+
// defer cancelR1 later to create the race
604+
605+
blockingWatcherR2 := newBLockingListenerWatcher()
606+
cancelR2 := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName2, blockingWatcherR2)
607+
defer cancelR2()
608+
609+
// Configure the listener resources on the management server.
610+
resources := e2e.UpdateOptions{
611+
NodeID: nodeID,
612+
Listeners: []*v3listenerpb.Listener{listenerResource1, listenerResource2},
613+
SkipValidation: true}
614+
if err := mgmtServer.Update(ctx, resources); err != nil {
615+
t.Fatalf("mgmtServer.Update() for %s and %s failed: %v", ldsResourceName1, ldsResourceName2, err)
616+
}
617+
618+
var onDoneR1, onDoneR2 func()
619+
select {
620+
case <-blockingWatcherR1.updateCh:
621+
onDoneR1 = <-blockingWatcherR1.doneNotifierCh
622+
case <-ctx.Done():
623+
t.Fatalf("Timeout waiting for update for %s on blockingWatcherR1: %v", ldsResourceName1, ctx.Err())
624+
}
625+
select {
626+
case <-blockingWatcherR2.updateCh:
627+
onDoneR2 = <-blockingWatcherR2.doneNotifierCh
628+
case <-ctx.Done():
629+
t.Fatalf("Timeout waiting for update for %s on blockingWatcherR2: %v", ldsResourceName2, ctx.Err())
630+
}
631+
632+
// At this point, ACK for {listenerResource1,listenerResource2} has been
633+
// sent by the client but s.fc.pending.Load() is true because onDoneR1 and
634+
// onDoneR2 are held.
635+
//
636+
// Unsubscribe listenerResource1. This request should be buffered by
637+
// adsStreamImpl because s.fc.pending.Load() is true.
638+
cancelR1()
639+
640+
// Resubscribe listenerResource1 with a new regular watcher, which should
641+
// be served from cache.
642+
watcherR1New := newListenerWatcher()
643+
cancelR1New := client.WatchResource(xdsresource.V3ListenerURL, ldsResourceName1, watcherR1New)
644+
defer cancelR1New()
645+
646+
if err := verifyListenerUpdate(ctx, watcherR1New.updateCh, listenerUpdateErrTuple{update: listenerUpdate{RouteConfigName: rdsName1}}); err != nil {
647+
t.Fatalf("watcherR1New did not receive cached update for %s: %v", ldsResourceName1, err)
648+
}
649+
650+
// Release the onDone callbacks.
651+
if onDoneR1 != nil { // onDoneR1 might be nil if cancelR1() completed very fast.
652+
onDoneR1()
653+
}
654+
onDoneR2()
655+
656+
// Verify watcherR1New does not get a "resource does not exist" error.
657+
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout*10) // Slightly longer to catch delayed errors
658+
defer sCancel()
659+
if err := verifyNoListenerUpdate(sCtx, watcherR1New.resourceErrCh); err != nil {
660+
t.Fatalf("watcherR1New received unexpected resource error for %s: %v", ldsResourceName1, err)
661+
}
662+
if err := verifyNoListenerUpdate(sCtx, watcherR1New.ambientErrCh); err != nil {
663+
t.Fatalf("watcherR1New received unexpected ambient error for %s: %v", ldsResourceName1, err)
664+
}
665+
}

xds/internal/clients/xdsclient/xdsclient.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,21 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s
439439
}
440440
}
441441

442+
func (cs *channelState) adsResourceRemoveUnsubscribedCacheEntries(rType ResourceType) {
443+
if cs.parent.done.HasFired() {
444+
return
445+
}
446+
447+
cs.parent.channelsMu.Lock()
448+
defer cs.parent.channelsMu.Unlock()
449+
450+
for authority := range cs.interestedAuthorities {
451+
authority.xdsClientSerializer.TrySchedule(func(context.Context) {
452+
authority.removeUnsubscribedCacheEntries(rType)
453+
})
454+
}
455+
}
456+
442457
func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) {
443458
c.channelsMu.Lock()
444459
defer c.channelsMu.Unlock()

0 commit comments

Comments
 (0)