Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions interop/xds/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,7 @@ func (x *xdsUpdateHealthServiceImpl) SetNotServing(_ context.Context, _ *testpb.
}

func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) {
logger.Infof("Serving mode for xDS server at %s changed to %s", addr.String(), args.Mode)
if args.Err != nil {
logger.Infof("ServingModeCallback returned error: %v", args.Err)
}
logger.Infof("Serving mode callback for xDS server at %q invoked with mode: %q, err: %v", addr.String(), args.Mode, args.Err)
}

func main() {
Expand Down
8 changes: 7 additions & 1 deletion xds/internal/server/listener_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
drainCallback: params.DrainCallback,
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),

mode: connectivity.ServingModeStarting,
closed: grpcsync.NewEvent(),
goodUpdate: grpcsync.NewEvent(),
ldsUpdateCh: make(chan ldsUpdateWithError, 1),
Expand Down Expand Up @@ -429,14 +430,19 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
}
}

// switchMode updates the value of serving mode and filter chains stored in the
// listenerWrapper. And if the serving mode has changed, it invokes the
// registered mode change callback.
func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMode connectivity.ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()

l.filterChains = fcs
if l.mode == newMode {
return
}
l.mode = newMode
if l.modeCallback != nil {
l.modeCallback(l.Listener.Addr(), newMode, err)
}
l.logger.Warningf("Listener %q entering mode: %q due to error: %v", l.Addr(), newMode, err)
}
14 changes: 13 additions & 1 deletion xds/internal/test/xds_server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,20 @@ func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func
t.Fatal(err)
}

// Create a server option to get notified about serving mode changes. We don't
// do anything other than throwing a log entry here. But this is required,
// since the server code emits a log entry at the default level (which is
// ERROR) if no callback is registered for serving mode changes. Our
// testLogger fails the test if there is any log entry at ERROR level. It does
// provide an ExpectError() method, but that takes a string and it would be
// painful to construct the exact error message expected here. Instead this
// works just fine.
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
})

// Initialize an xDS-enabled gRPC server and register the stubServer on it.
server := xds.NewGRPCServer(grpc.Creds(creds), xds.BootstrapContentsForTesting(bootstrapContents))
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents))
testpb.RegisterTestServiceServer(server, &testService{})

// Create a local listener and pass it to Serve().
Expand Down
24 changes: 12 additions & 12 deletions xds/internal/test/xds_server_serving_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
updateCh := make(chan connectivity.ServingMode, 1)

// Create a server option to get notified about serving mode changes.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
updateCh <- args.Mode
Expand All @@ -82,6 +80,8 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) {
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener},
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -194,7 +194,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
case lis2.Addr().String():
updateCh2 <- args.Mode
default:
t.Logf("serving mode callback invoked for unknown listener address: %q", addr.String())
t.Errorf("serving mode callback invoked for unknown listener address: %q", addr.String())
}
})

Expand Down Expand Up @@ -240,15 +240,15 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}

Expand All @@ -274,7 +274,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1},
}); err != nil {
t.Error(err)
t.Fatal(err)
}

// Wait for lis2 to move to "not-serving" mode.
Expand All @@ -283,7 +283,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}

Expand All @@ -298,7 +298,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{},
}); err != nil {
t.Error(err)
t.Fatal(err)
}

// Wait for lis1 to move to "not-serving" mode. lis2 was already removed
Expand All @@ -309,7 +309,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != connectivity.ServingModeNotServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing)
}
}

Expand All @@ -330,7 +330,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1, listener2},
}); err != nil {
t.Error(err)
t.Fatal(err)
}

// Wait for both listeners to move to "serving" mode.
Expand All @@ -339,15 +339,15 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh1:
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}
select {
case <-ctx.Done():
t.Fatalf("timed out waiting for a mode change update: %v", err)
case mode := <-updateCh2:
if mode != connectivity.ServingModeServing {
t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing)
}
}

Expand Down
40 changes: 31 additions & 9 deletions xds/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,10 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
s := &GRPCServer{
gs: newGRPCServer(newOpts...),
quit: grpcsync.NewEvent(),
opts: handleServerOptions(opts),
}
s.logger = prefixLogger(s)
s.logger.Infof("Created xds.GRPCServer")
s.handleServerOptions(opts)

// We type assert our underlying gRPC server to the real grpc.Server here
// before trying to retrieve the configured credentials. This approach
Expand All @@ -128,14 +128,32 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {

// handleServerOptions iterates through the list of server options passed in by
// the user, and handles the xDS server specific options.
func handleServerOptions(opts []grpc.ServerOption) *serverOptions {
func (s *GRPCServer) handleServerOptions(opts []grpc.ServerOption) {
so := &serverOptions{}
for _, opt := range opts {
if o, ok := opt.(*serverOption); ok {
o.apply(so)
}
}
return so

// If the application did not register a mode change callback, the XdsServer
// registers its own which simply logs any errors and each serving resumption
// after an error, all at a default-visible log level, as per A36. The
// default-visible log level for us is ERROR.
//
// Note that this means that `s.opts.modeCallback` will never be nil and can
// safely be invoked directly from `handleServiceModeChanges`.
if so.modeCallback == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we have a defaultServerOptions that has this set in it, such that if applying the options doesn't override it, we get the defaults? (e.g. how we handle DialOptions in the grpc package)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

so.modeCallback = func(addr net.Addr, args ServingModeChangeArgs) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this a standalone function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

switch args.Mode {
case connectivity.ServingModeServing:
s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode)
case connectivity.ServingModeNotServing:
s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err)
}
}
}
s.opts = so
}

// RegisterService registers a service and its implementation to the underlying
Expand Down Expand Up @@ -291,12 +309,16 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
drainServerTransports(gs, args.addr.String())
}
}
if s.opts.modeCallback != nil {
s.opts.modeCallback(args.addr, ServingModeChangeArgs{
Mode: args.mode,
Err: args.err,
})
}

// The XdsServer API will allow applications to register a "serving state"
// callback to be invoked when the server begins serving and when the
// server encounters errors that force it to be "not serving". If "not
// serving", the callback must be provided error information, for
// debugging use by developers - A36.
s.opts.modeCallback(args.addr, ServingModeChangeArgs{
Mode: args.mode,
Err: args.err,
})
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion xds/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,18 @@ func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) {
fs, clientCh, cleanup := setupOverrides()
defer cleanup()

server := NewGRPCServer()
// Create a server option to get notified about serving mode changes. We don't
// do anything other than throwing a log entry here. But this is required,
// since the server code emits a log entry at the default level (which is
// ERROR) if no callback is registered for serving mode changes. Our
// testLogger fails the test if there is any log entry at ERROR level. It does
// provide an ExpectError() method, but that takes a string and it would be
// painful to construct the exact error message expected here. Instead this
// works just fine.
modeChangeOpt := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
})
server := NewGRPCServer(modeChangeOpt)
defer server.Stop()

lis, err := testutils.LocalTCPListener()
Expand Down