Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: implementation of the xdsChannel #7757

Merged
merged 5 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions xds/internal/xdsclient/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@ import (
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// channelEventHandler wraps callbacks used by the xdsChannel to notify the xDS
// client about events on the channel. Methods in this interface may be invoked
// concurrently and the xDS client implementation needs to handle them in a
// thread-safe manner.
type channelEventHandler interface {
// xdsChannelEventHandler wraps callbacks used by the xdsChannel to notify the
// xDS client about events on the channel. Methods in this interface may be
// invoked concurrently and the xDS client implementation needs to handle them
// in a thread-safe manner.
type xdsChannelEventHandler interface {
// adsStreamFailure is called when the xdsChannel channel encounters an ADS
zasweq marked this conversation as resolved.
Show resolved Hide resolved
// stream failure.
adsStreamFailure(error)
Expand All @@ -57,22 +57,22 @@ type channelEventHandler interface {
adsResourceDoesNotExist(xdsresource.Type, string)
}

// channelOptions holds the options for creating a new xdsChannel.
type channelOptions struct {
// xdsChannelOpts holds the options for creating a new xdsChannel.
type xdsChannelOpts struct {
transport transport.Interface // Takes ownership of this transport.
serverConfig *bootstrap.ServerConfig // Configuration of the server to connect to.
bootstrapConfig *bootstrap.Config // Complete bootstrap configuration, used to decode resources.
resourceTypeGetter func(string) xdsresource.Type // Function to retrieve resource parsing functionality, based on resource type.
eventHandler channelEventHandler // Callbacks for ADS stream events.
backoff func(int) time.Duration // Backoff function to use for stream retries.
eventHandler xdsChannelEventHandler // Callbacks for ADS stream events.
backoff func(int) time.Duration // Backoff function to use for stream retries. Defaults to exponential backoff, if unset.
watchExpiryTimeout time.Duration // Timeout for ADS resource watch expiry.
logPrefix string // Prefix to use for logging.
}

// newChannel creates a new xdsChannel instance with the provided options.
// newXDSChannel creates a new xdsChannel instance with the provided options.
// It performs basic validation on the provided options and initializes the
// xdsChannel with the necessary components.
func newChannel(opts channelOptions) (*xdsChannel, error) {
func newXDSChannel(opts xdsChannelOpts) (*xdsChannel, error) {
switch {
case opts.transport == nil:
return nil, errors.New("xdsChannel: transport is nil")
Expand Down Expand Up @@ -119,9 +119,9 @@ func newChannel(opts channelOptions) (*xdsChannel, error) {
return xc, nil
}

// The xdsChannel represents a client channel to the management server, and is
// responsible for managing the lifecycle of the ADS and LRS streams and
// invoking callbacks on interested authorities for various stream events.
// xdsChannel represents a client channel to a management server, and is
// responsible for managing the lifecycle of the ADS and LRS streams. It invokes
// callbacks on the registered event handler for various ADS stream events.
type xdsChannel struct {
// The following fields are initialized at creation time and are read-only
// after that, and hence need not be guarded by a mutex.
Expand All @@ -131,7 +131,7 @@ type xdsChannel struct {
serverConfig *bootstrap.ServerConfig // Configuration of the server to connect to.
bootstrapConfig *bootstrap.Config // Complete bootstrap configuration, used to decode resources.
resourceTypeGetter func(string) xdsresource.Type // Function to retrieve resource parsing functionality, based on resource type.
eventHandler channelEventHandler // Callbacks for ADS stream events.
eventHandler xdsChannelEventHandler // Callbacks for ADS stream events.
logger *igrpclog.PrefixLogger // Logger to use for logging.
closed *grpcsync.Event // Fired when the channel is closed.
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (xc *xdsChannel) unsubscribe(typ xdsresource.Type, name string) {
func (xc *xdsChannel) OnADSStreamError(err error) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received error on closed ADS stream: %v", err)
xc.logger.Infof("Received ADS stream error on a closed xdsChannel: %v", err)
}
return
}
Expand All @@ -200,7 +200,7 @@ func (xc *xdsChannel) OnADSStreamError(err error) {
func (xc *xdsChannel) OnADSWatchExpiry(typ xdsresource.Type, name string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received resource watch expiry for resource %q on closed ADS stream", name)
xc.logger.Infof("Received ADS resource watch expiry for resource %q on a closed xdsChannel", name)
}
return
}
Expand All @@ -210,6 +210,9 @@ func (xc *xdsChannel) OnADSWatchExpiry(typ xdsresource.Type, name string) {
// OnADSResponse is invoked when a response is received on the ADS stream. It
// decodes the resources in the response, and propagates the updates to the xDS
// client.
//
// It returns the list of resource names in the response and any errors
// encountered during decoding.
func (xc *xdsChannel) OnADSResponse(resp ads.Response, onDone func()) ([]string, error) {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
if xc.closed.HasFired() {
if xc.logger.V(2) {
Expand Down Expand Up @@ -239,6 +242,19 @@ func (xc *xdsChannel) OnADSResponse(resp ads.Response, onDone func()) ([]string,
return names, err
}

// decodeResponse decodes the resources in the given ADS response.
//
// The opts parameter provides configuration options for decoding the resources.
// The rType parameter specifies the resource type parser to use for decoding
// the resources.
//
// The returned map contains a key for each resource in the response, with the
// value being either the decoded resource data or an error if decoding failed.
// The returned metadata includes the version of the response, the timestamp of
// the update, and the status of the update (ACKed or NACKed).
//
// If there are any errors decoding the resources, the metadata will indicate
// that the update was NACKed, and the errors will be returned as well.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: s/errors will be returned as well/something as in paragraph 2, that mentions it is the value for the key.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, resp ads.Response) (map[string]ads.DataAndErrTuple, xdsresource.UpdateMetadata, error) {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
timestamp := time.Now()
md := xdsresource.UpdateMetadata{
Expand Down
16 changes: 8 additions & 8 deletions xds/internal/xdsclient/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func xdsChannelForTest(t *testing.T, serverURI, nodeID string, watchExpiryTimeou
}

// Create an xdsChannel that uses everything set up above.
xc, err := newChannel(channelOptions{
xc, err := newXDSChannel(xdsChannelOpts{
transport: tr,
serverConfig: serverCfg,
bootstrapConfig: bootstrapCfg,
Expand Down Expand Up @@ -151,30 +151,30 @@ func (s) TestChannel_New_FailureCases(t *testing.T) {

tests := []struct {
name string
opts channelOptions
opts xdsChannelOpts
wantErrStr string
}{
{
name: "emptyTransport",
opts: channelOptions{},
opts: xdsChannelOpts{},
wantErrStr: "transport is nil",
},
{
name: "emptyServerConfig",
opts: channelOptions{transport: &fakeTransport{}},
opts: xdsChannelOpts{transport: &fakeTransport{}},
wantErrStr: "serverConfig is nil",
},
{
name: "emptyBootstrapConfig",
opts: channelOptions{
opts: xdsChannelOpts{
transport: &fakeTransport{},
serverConfig: &bootstrap.ServerConfig{},
},
wantErrStr: "bootstrapConfig is nil",
},
{
name: "emptyResourceTypeGetter",
opts: channelOptions{
opts: xdsChannelOpts{
transport: &fakeTransport{},
serverConfig: &bootstrap.ServerConfig{},
bootstrapConfig: &bootstrap.Config{},
Expand All @@ -183,7 +183,7 @@ func (s) TestChannel_New_FailureCases(t *testing.T) {
},
{
name: "emptyEventHandler",
opts: channelOptions{
opts: xdsChannelOpts{
transport: &fakeTransport{},
serverConfig: &bootstrap.ServerConfig{},
bootstrapConfig: &bootstrap.Config{},
Expand All @@ -195,7 +195,7 @@ func (s) TestChannel_New_FailureCases(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := newChannel(test.opts)
_, err := newXDSChannel(test.opts)
if err == nil || !strings.Contains(err.Error(), test.wantErrStr) {
t.Fatalf("newXDSChannel() = %v, want %q", err, test.wantErrStr)
}
Expand Down
Loading