Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions xds/internal/clients/internal/testutils/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,28 @@
}
}

// SendContext sends value on the underlying channel, or returns an error if
// the context expires.
func (c *Channel) SendContext(ctx context.Context, value any) error {
select {
case c.C <- value:
return nil
case <-ctx.Done():
return ctx.Err()

Check warning on line 69 in xds/internal/clients/internal/testutils/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/clients/internal/testutils/channel.go#L68-L69

Added lines #L68 - L69 were not covered by tests
}
}

// Drain drains the channel by repeatedly reading from it until it is empty.
func (c *Channel) Drain() {
for {
select {
case <-c.C:

Check warning on line 77 in xds/internal/clients/internal/testutils/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/clients/internal/testutils/channel.go#L77

Added line #L77 was not covered by tests
default:
return
}
}
}

// NewChannelWithSize returns a new Channel with a buffer of bufSize.
func NewChannelWithSize(bufSize int) *Channel {
return &Channel{C: make(chan any, bufSize)}
Expand Down
71 changes: 30 additions & 41 deletions xds/internal/clients/xdsclient/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,39 +73,13 @@
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
}

// watchState is a enum that describes the watch state of a particular
// resource.
type watchState int

const (
// resourceWatchStateStarted is the state where a watch for a resource was
// started, but a request asking for that resource is yet to be sent to the
// management server.
resourceWatchStateStarted watchState = iota
// resourceWatchStateRequested is the state when a request has been sent for
// the resource being watched.
resourceWatchStateRequested
// ResourceWatchStateReceived is the state when a response has been received
// for the resource being watched.
resourceWatchStateReceived
// resourceWatchStateTimeout is the state when the watch timer associated
// with the resource expired because no response was received.
resourceWatchStateTimeout
)

// resourceWatchState is the state corresponding to a resource being watched.
type resourceWatchState struct {
State watchState // Watch state of the resource.
ExpiryTimer *time.Timer // Timer for the expiry of the watch.
}

// state corresponding to a resource type.
type resourceTypeState struct {
version string // Last acked version. Should not be reset when the stream breaks.
nonce string // Last received nonce. Should be reset when the stream breaks.
bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked.
subscribedResources map[string]*resourceWatchState // Map of subscribed resource names to their state.
pendingWrite bool // True if there is a pending write for this resource type.
version string // Last acked version. Should not be reset when the stream breaks.
nonce string // Last received nonce. Should be reset when the stream breaks.
bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked.
subscribedResources map[string]*xdsresource.ResourceWatchState // Map of subscribed resource names to their state.
pendingWrite bool // True if there is a pending write for this resource type.
}

// adsStreamImpl provides the functionality associated with an ADS (Aggregated
Expand Down Expand Up @@ -198,15 +172,15 @@
// An entry in the type state map is created as part of the first
// subscription request for this type.
state = &resourceTypeState{
subscribedResources: make(map[string]*resourceWatchState),
subscribedResources: make(map[string]*xdsresource.ResourceWatchState),
bufferedRequests: make(chan struct{}, 1),
}
s.resourceTypeState[typ] = state
}

// Create state for the newly subscribed resource. The watch timer will
// be started when a request for this resource is actually sent out.
state.subscribedResources[name] = &resourceWatchState{State: resourceWatchStateStarted}
state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted}
state.pendingWrite = true

// Send a request for the resource type with updated subscriptions.
Expand Down Expand Up @@ -616,8 +590,8 @@
s.logger.Warningf("ADS stream received a response for resource %q, but no state exists for it", name)
continue
}
if ws := rs.State; ws == resourceWatchStateStarted || ws == resourceWatchStateRequested {
rs.State = resourceWatchStateReceived
if ws := rs.State; ws == xdsresource.ResourceWatchStateStarted || ws == xdsresource.ResourceWatchStateRequested {
rs.State = xdsresource.ResourceWatchStateReceived
if rs.ExpiryTimer != nil {
rs.ExpiryTimer.Stop()
rs.ExpiryTimer = nil
Expand Down Expand Up @@ -652,14 +626,14 @@
s.mu.Lock()
for _, state := range s.resourceTypeState {
for _, rs := range state.subscribedResources {
if rs.State != resourceWatchStateRequested {
if rs.State != xdsresource.ResourceWatchStateRequested {
continue
}
if rs.ExpiryTimer != nil {
rs.ExpiryTimer.Stop()
rs.ExpiryTimer = nil
}
rs.State = resourceWatchStateStarted
rs.State = xdsresource.ResourceWatchStateStarted
}
}
s.mu.Unlock()
Expand Down Expand Up @@ -691,23 +665,38 @@
if !ok {
continue
}
if resourceState.State != resourceWatchStateStarted {
if resourceState.State != xdsresource.ResourceWatchStateStarted {
continue
}
resourceState.State = resourceWatchStateRequested
resourceState.State = xdsresource.ResourceWatchStateRequested

rs := resourceState
resourceState.ExpiryTimer = time.AfterFunc(s.watchExpiryTimeout, func() {
s.mu.Lock()
rs.State = resourceWatchStateTimeout
rs.State = xdsresource.ResourceWatchStateTimeout
rs.ExpiryTimer = nil
s.mu.Unlock()
s.eventHandler.onWatchExpiry(typ, name)
})
}
}

func resourceNames(m map[string]*resourceWatchState) []string {
func (s *adsStreamImpl) adsResourceWatchStateForTesting(rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) {
s.mu.Lock()
defer s.mu.Unlock()

state, ok := s.resourceTypeState[rType]
if !ok {
return xdsresource.ResourceWatchState{}, fmt.Errorf("unknown resource type: %v", rType)
}

Check warning on line 691 in xds/internal/clients/xdsclient/ads_stream.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/clients/xdsclient/ads_stream.go#L690-L691

Added lines #L690 - L691 were not covered by tests
resourceState, ok := state.subscribedResources[resourceName]
if !ok {
return xdsresource.ResourceWatchState{}, fmt.Errorf("unknown resource name: %v", resourceName)
}

Check warning on line 695 in xds/internal/clients/xdsclient/ads_stream.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/clients/xdsclient/ads_stream.go#L694-L695

Added lines #L694 - L695 were not covered by tests
return *resourceState, nil
}

func resourceNames(m map[string]*xdsresource.ResourceWatchState) []string {
ret := make([]string, len(m))
idx := 0
for name := range m {
Expand Down
36 changes: 36 additions & 0 deletions xds/internal/clients/xdsclient/internal/internal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Package internal contains functionality internal to the xdsclient package.
package internal

import "time"

var (
// WatchExpiryTimeout is the watch expiry timeout for xDS client. It can be
// overridden by tests to change the default watch expiry timeout.
WatchExpiryTimeout time.Duration

// StreamBackoff is the stream backoff for xDS client. It can be overridden
// by tests to change the default backoff strategy.
StreamBackoff func(int) time.Duration

// ResourceWatchStateForTesting gets the watch state for the resource
// identified by the given resource type and resource name. Returns a
// non-nil error if there is no such resource being watched.
ResourceWatchStateForTesting any // func(*xdsclient.XDSClient, xdsclient.ResourceType, string) error
)
46 changes: 46 additions & 0 deletions xds/internal/clients/xdsclient/internal/xdsresource/ads_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdsresource

import "time"

// WatchState is a enum that describes the watch state of a particular
// resource.
type WatchState int

const (
// ResourceWatchStateStarted is the state where a watch for a resource was
// started, but a request asking for that resource is yet to be sent to the
// management server.
ResourceWatchStateStarted WatchState = iota
// ResourceWatchStateRequested is the state when a request has been sent for
// the resource being watched.
ResourceWatchStateRequested
// ResourceWatchStateReceived is the state when a response has been received
// for the resource being watched.
ResourceWatchStateReceived
// ResourceWatchStateTimeout is the state when the watch timer associated
// with the resource expired because no response was received.
ResourceWatchStateTimeout
)

// ResourceWatchState is the state corresponding to a resource being watched.
type ResourceWatchState struct {
State WatchState // Watch state of the resource.
ExpiryTimer *time.Timer // Timer for the expiry of the watch.
}
Loading