Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xdsclient: implementation of the xdsChannel #7757

Merged
merged 5 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 288 additions & 0 deletions xds/internal/xdsclient/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,288 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package xdsclient

import (
"errors"
"fmt"
"time"

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/backoff"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/xds/bootstrap"
"google.golang.org/grpc/xds/internal/xdsclient/load"
"google.golang.org/grpc/xds/internal/xdsclient/transport"
"google.golang.org/grpc/xds/internal/xdsclient/transport/ads"
"google.golang.org/grpc/xds/internal/xdsclient/transport/lrs"
"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
)

// channelEventHandler wraps callbacks used by the xdsChannel to notify the xDS
// client about events on the channel. Methods in this interface may be invoked
// concurrently and the xDS client implementation needs to handle them in a
// thread-safe manner.
type channelEventHandler interface {
// adsStreamFailure is called when the xdsChannel channel encounters an ADS
zasweq marked this conversation as resolved.
Show resolved Hide resolved
// stream failure.
adsStreamFailure(error)

// adsResourceUpdate is called when the xdsChannel receives an ADS response
// from the xDS management server. The callback is provided with the
// following:
// - the resource type of the resources in the response
// - a map of resources in the response, keyed by resource name
// - the metadata associated with the response
// - a callback to be invoked when the updated is processed
adsResourceUpdate(xdsresource.Type, map[string]ads.DataAndErrTuple, xdsresource.UpdateMetadata, func())

// adsResourceDoesNotExist is called when the xdsChannel determines that a
// requested ADS resource does not exist.
adsResourceDoesNotExist(xdsresource.Type, string)
}

// channelOptions holds the options for creating a new xdsChannel.
type channelOptions struct {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
transport transport.Interface // Takes ownership of this transport.
serverConfig *bootstrap.ServerConfig // Configuration of the server to connect to.
bootstrapConfig *bootstrap.Config // Complete bootstrap configuration, used to decode resources.
resourceTypeGetter func(string) xdsresource.Type // Function to retrieve resource parsing functionality, based on resource type.
eventHandler channelEventHandler // Callbacks for ADS stream events.
backoff func(int) time.Duration // Backoff function to use for stream retries.
watchExpiryTimeout time.Duration // Timeout for ADS resource watch expiry.
logPrefix string // Prefix to use for logging.
}

// newChannel creates a new xdsChannel instance with the provided options.
// It performs basic validation on the provided options and initializes the
// xdsChannel with the necessary components.
func newChannel(opts channelOptions) (*xdsChannel, error) {
switch {
case opts.transport == nil:
return nil, errors.New("xdsChannel: transport is nil")
case opts.serverConfig == nil:
return nil, errors.New("xdsChannel: serverConfig is nil")
case opts.bootstrapConfig == nil:
return nil, errors.New("xdsChannel: bootstrapConfig is nil")
case opts.resourceTypeGetter == nil:
return nil, errors.New("xdsChannel: resourceTypeGetter is nil")
case opts.eventHandler == nil:
return nil, errors.New("xdsChannel: eventHandler is nil")
}

xc := &xdsChannel{
transport: opts.transport,
serverConfig: opts.serverConfig,
bootstrapConfig: opts.bootstrapConfig,
resourceTypeGetter: opts.resourceTypeGetter,
eventHandler: opts.eventHandler,
closed: grpcsync.NewEvent(),
}

l := grpclog.Component("xds")
logPrefix := opts.logPrefix + fmt.Sprintf("[xds-channel %p] ", xc)
xc.logger = igrpclog.NewPrefixLogger(l, logPrefix)

if opts.backoff == nil {
opts.backoff = backoff.DefaultExponential.Backoff
zasweq marked this conversation as resolved.
Show resolved Hide resolved
}
xc.ads = ads.NewStreamImpl(ads.StreamOpts{
Transport: xc.transport,
EventHandler: xc,
Backoff: opts.backoff,
NodeProto: xc.bootstrapConfig.Node(),
WatchExpiryTimeout: opts.watchExpiryTimeout,
LogPrefix: logPrefix,
})
xc.lrs = lrs.NewStreamImpl(lrs.StreamOpts{
Transport: xc.transport,
Backoff: opts.backoff,
NodeProto: xc.bootstrapConfig.Node(),
LogPrefix: logPrefix,
})
return xc, nil
}

// The xdsChannel represents a client channel to the management server, and is
zasweq marked this conversation as resolved.
Show resolved Hide resolved
// responsible for managing the lifecycle of the ADS and LRS streams and
// invoking callbacks on interested authorities for various stream events.
zasweq marked this conversation as resolved.
Show resolved Hide resolved
type xdsChannel struct {
// The following fields are initialized at creation time and are read-only
// after that, and hence need not be guarded by a mutex.
transport transport.Interface // Takes ownership of this transport (used to make streaming calls).
ads *ads.StreamImpl // An ADS stream to the management server.
lrs *lrs.StreamImpl // An LRS stream to the management server.
serverConfig *bootstrap.ServerConfig // Configuration of the server to connect to.
bootstrapConfig *bootstrap.Config // Complete bootstrap configuration, used to decode resources.
resourceTypeGetter func(string) xdsresource.Type // Function to retrieve resource parsing functionality, based on resource type.
eventHandler channelEventHandler // Callbacks for ADS stream events.
logger *igrpclog.PrefixLogger // Logger to use for logging.
closed *grpcsync.Event // Fired when the channel is closed.
}

func (xc *xdsChannel) close() {
xc.closed.Fire()
xc.ads.Stop()
xc.lrs.Stop()
xc.transport.Close()
xc.logger.Infof("Shutdown")
}

// reportLoad returns a load.Store that can be used to report load to the LRS, and a
// function that can be called to stop reporting load.
func (xc *xdsChannel) reportLoad() (*load.Store, func()) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Attempt to start load reporting on closed channel")
}
return nil, func() {}

Check warning on line 154 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L151-L154

Added lines #L151 - L154 were not covered by tests
zasweq marked this conversation as resolved.
Show resolved Hide resolved
}
return xc.lrs.ReportLoad()
}

// subscribe adds a subscription for the given resource name of the given
// resource type on the ADS stream.
func (xc *xdsChannel) subscribe(typ xdsresource.Type, name string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Attempt to subscribe to an xDS resource of type %s and name %q on a closed channel", typ.TypeName(), name)
}
return

Check warning on line 166 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L163-L166

Added lines #L163 - L166 were not covered by tests
}
xc.ads.Subscribe(typ, name)
}

// unsubscribe removes the subscription for the given resource name of the given
// resource type from the ADS stream.
func (xc *xdsChannel) unsubscribe(typ xdsresource.Type, name string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Attempt to unsubscribe to an xDS resource of type %s and name %q on a closed channel", typ.TypeName(), name)
}
return

Check warning on line 178 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L175-L178

Added lines #L175 - L178 were not covered by tests
}
xc.ads.Unsubscribe(typ, name)
}

// The following OnADSXxx() methods implement the ads.StreamEventHandler interface
// and are invoked by the ADS stream implementation.

// OnADSStreamError is invoked when an error occurs on the ADS stream. It
// propagates the update to the xDS client.
func (xc *xdsChannel) OnADSStreamError(err error) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received error on closed ADS stream: %v", err)
zasweq marked this conversation as resolved.
Show resolved Hide resolved
}

Check warning on line 192 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L191-L192

Added lines #L191 - L192 were not covered by tests
return
}
xc.eventHandler.adsStreamFailure(err)
}

// OnADSWatchExpiry is invoked when a watch for a resource expires. It
// propagates the update to the xDS client.
func (xc *xdsChannel) OnADSWatchExpiry(typ xdsresource.Type, name string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received resource watch expiry for resource %q on closed ADS stream", name)
}
return

Check warning on line 205 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L202-L205

Added lines #L202 - L205 were not covered by tests
}
xc.eventHandler.adsResourceDoesNotExist(typ, name)
}

// OnADSResponse is invoked when a response is received on the ADS stream. It
// decodes the resources in the response, and propagates the updates to the xDS
// client.
func (xc *xdsChannel) OnADSResponse(resp ads.Response, onDone func()) ([]string, error) {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received an update from the ADS stream on closed ADS stream")
}
return nil, errors.New("xdsChannel is closed")

Check warning on line 218 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L215-L218

Added lines #L215 - L218 were not covered by tests
}

// Lookup the resource parser based on the resource type.
rType := xc.resourceTypeGetter(resp.TypeURL)
if rType == nil {
return nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource type URL %q unknown in response from server", resp.TypeURL)
}

Check warning on line 225 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L224-L225

Added lines #L224 - L225 were not covered by tests

// Decode the resources and build the list of resource names to return.
opts := &xdsresource.DecodeOptions{
BootstrapConfig: xc.bootstrapConfig,
ServerConfig: xc.serverConfig,
}
updates, md, err := decodeResponse(opts, rType, resp)
var names []string
for name := range updates {
names = append(names, name)
}

xc.eventHandler.adsResourceUpdate(rType, updates, md, onDone)
return names, err
}

func decodeResponse(opts *xdsresource.DecodeOptions, rType xdsresource.Type, resp ads.Response) (map[string]ads.DataAndErrTuple, xdsresource.UpdateMetadata, error) {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
timestamp := time.Now()
md := xdsresource.UpdateMetadata{
Version: resp.Version,
Timestamp: timestamp,
}

topLevelErrors := make([]error, 0) // Tracks deserialization errors, where we don't have a resource name.
perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name.
ret := make(map[string]ads.DataAndErrTuple) // Return result, a map from resource name to either resource data or error.
for _, r := range resp.Resources {
result, err := rType.Decode(opts, r)

// Name field of the result is left unpopulated only when resource
// deserialization fails.
name := ""
if result != nil {
name = xdsresource.ParseName(result.Name).String()
}
if err == nil {
ret[name] = ads.DataAndErrTuple{Resource: result.Resource}
continue
}
if name == "" {
zasweq marked this conversation as resolved.
Show resolved Hide resolved
topLevelErrors = append(topLevelErrors, err)
continue
}
perResourceErrors[name] = err
// Add place holder in the map so we know this resource name was in
// the response.
ret[name] = ads.DataAndErrTuple{Err: err}

Check warning on line 272 in xds/internal/xdsclient/channel.go

View check run for this annotation

Codecov / codecov/patch

xds/internal/xdsclient/channel.go#L269-L272

Added lines #L269 - L272 were not covered by tests
}

if len(topLevelErrors) == 0 && len(perResourceErrors) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarification: why status is ServiceStatusNACKed if only some resources have errors and some not? If i remember correctly, the server will send us all the resources by default whether we need it or not. So, we should not consider those we don't need right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the xDS transport protocol level, an ACK or a NACK is not for a particular resource, but for a version of a particular resource type.

What this means is that if we receive resources [A, B, C] of type Listener at version 1, and we like all of them, we would ACK version 1 for resource type Listener.

Later on, if we receive an update for the same resource type and we receive resource [A, B, C] at version 2, but this time we don't like resource C for some reason, we would send a NACK for version 2 for resource type Listener. But we will go ahead and use resources A and B at version 2, but we will use resource C at version 1. This information is captured in the update metadata and when a CSDS client queries us for a resource dump, we will return [A at 2], [B at 2], [C at 1, but NACKed 2]. This is taken care of at the authority level, which is the one that maintains the resource cache.

Hope this helps.

md.Status = xdsresource.ServiceStatusACKed
return ret, md, nil
}

md.Status = xdsresource.ServiceStatusNACKed
errRet := combineErrors(rType.TypeName(), topLevelErrors, perResourceErrors)
md.ErrState = &xdsresource.UpdateErrorMetadata{
Version: resp.Version,
Err: errRet,
Timestamp: timestamp,
zasweq marked this conversation as resolved.
Show resolved Hide resolved
}
return ret, md, errRet
}
Loading