From 39ecba3ea4254ef907d254709674ef5364b98cd9 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Tue, 1 Mar 2022 11:36:37 -0800 Subject: [PATCH 1/3] xds: avoid log spam during server mode switches --- interop/xds/server/server.go | 5 +--- xds/internal/server/listener_wrapper.go | 8 ++++++- .../test/xds_server_integration_test.go | 14 ++++++++++- .../test/xds_server_serving_mode_test.go | 24 +++++++++---------- xds/server.go | 18 ++++++++++++++ xds/server_test.go | 13 +++++++++- 6 files changed, 63 insertions(+), 19 deletions(-) diff --git a/interop/xds/server/server.go b/interop/xds/server/server.go index afbbc56af89e..5932199de6c6 100644 --- a/interop/xds/server/server.go +++ b/interop/xds/server/server.go @@ -100,10 +100,7 @@ func (x *xdsUpdateHealthServiceImpl) SetNotServing(_ context.Context, _ *testpb. } func xdsServingModeCallback(addr net.Addr, args xds.ServingModeChangeArgs) { - logger.Infof("Serving mode for xDS server at %s changed to %s", addr.String(), args.Mode) - if args.Err != nil { - logger.Infof("ServingModeCallback returned error: %v", args.Err) - } + logger.Infof("Serving mode callback for xDS server at %q invoked with mode: %q, err: %v", addr.String(), args.Mode, args.Err) } func main() { diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index c90f9672ea32..421ed7533633 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -111,6 +111,7 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru drainCallback: params.DrainCallback, isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(), + mode: connectivity.ServingModeStarting, closed: grpcsync.NewEvent(), goodUpdate: grpcsync.NewEvent(), ldsUpdateCh: make(chan ldsUpdateWithError, 1), @@ -429,14 +430,19 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) { } } +// switchMode updates the value of serving mode and filter chains stored in the +// listenerWrapper. And if the serving mode has changed, it invokes the +// registered mode change callback. func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMode connectivity.ServingMode, err error) { l.mu.Lock() defer l.mu.Unlock() l.filterChains = fcs + if l.mode == newMode { + return + } l.mode = newMode if l.modeCallback != nil { l.modeCallback(l.Listener.Addr(), newMode, err) } - l.logger.Warningf("Listener %q entering mode: %q due to error: %v", l.Addr(), newMode, err) } diff --git a/xds/internal/test/xds_server_integration_test.go b/xds/internal/test/xds_server_integration_test.go index 6904e0e88d5c..b362926905b6 100644 --- a/xds/internal/test/xds_server_integration_test.go +++ b/xds/internal/test/xds_server_integration_test.go @@ -80,8 +80,20 @@ func setupGRPCServer(t *testing.T, bootstrapContents []byte) (net.Listener, func t.Fatal(err) } + // Create a server option to get notified about serving mode changes. We don't + // do anything other than throwing a log entry here. But this is required, + // since the server code emits a log entry at the default level (which is + // ERROR) if no callback is registered for serving mode changes. Our + // testLogger fails the test if there is any log entry at ERROR level. It does + // provide an ExpectError() method, but that takes a string and it would be + // painful to construct the exact error message expected here. Instead this + // works just fine. + modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { + t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) + }) + // Initialize an xDS-enabled gRPC server and register the stubServer on it. - server := xds.NewGRPCServer(grpc.Creds(creds), xds.BootstrapContentsForTesting(bootstrapContents)) + server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt, xds.BootstrapContentsForTesting(bootstrapContents)) testpb.RegisterTestServiceServer(server, &testService{}) // Create a local listener and pass it to Serve(). diff --git a/xds/internal/test/xds_server_serving_mode_test.go b/xds/internal/test/xds_server_serving_mode_test.go index 5fa17546e81d..236a831c642b 100644 --- a/xds/internal/test/xds_server_serving_mode_test.go +++ b/xds/internal/test/xds_server_serving_mode_test.go @@ -60,8 +60,6 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { updateCh := make(chan connectivity.ServingMode, 1) // Create a server option to get notified about serving mode changes. - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) - defer cancel() modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) { t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) updateCh <- args.Mode @@ -82,6 +80,8 @@ func (s) TestServerSideXDS_RedundantUpdateSuppression(t *testing.T) { NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listener}, } + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() if err := managementServer.Update(ctx, resources); err != nil { t.Fatal(err) } @@ -194,7 +194,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { case lis2.Addr().String(): updateCh2 <- args.Mode default: - t.Logf("serving mode callback invoked for unknown listener address: %q", addr.String()) + t.Errorf("serving mode callback invoked for unknown listener address: %q", addr.String()) } }) @@ -240,7 +240,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh1: if mode != connectivity.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) + t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } select { @@ -248,7 +248,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh2: if mode != connectivity.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) + t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } @@ -274,7 +274,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listener1}, }); err != nil { - t.Error(err) + t.Fatal(err) } // Wait for lis2 to move to "not-serving" mode. @@ -283,7 +283,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh2: if mode != connectivity.ServingModeNotServing { - t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) + t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) } } @@ -298,7 +298,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { NodeID: nodeID, Listeners: []*v3listenerpb.Listener{}, }); err != nil { - t.Error(err) + t.Fatal(err) } // Wait for lis1 to move to "not-serving" mode. lis2 was already removed @@ -309,7 +309,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh1: if mode != connectivity.ServingModeNotServing { - t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) + t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeNotServing) } } @@ -330,7 +330,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { NodeID: nodeID, Listeners: []*v3listenerpb.Listener{listener1, listener2}, }); err != nil { - t.Error(err) + t.Fatal(err) } // Wait for both listeners to move to "serving" mode. @@ -339,7 +339,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh1: if mode != connectivity.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) + t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } select { @@ -347,7 +347,7 @@ func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) { t.Fatalf("timed out waiting for a mode change update: %v", err) case mode := <-updateCh2: if mode != connectivity.ServingModeServing { - t.Errorf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) + t.Fatalf("listener received new mode %v, want %v", mode, connectivity.ServingModeServing) } } diff --git a/xds/server.go b/xds/server.go index 0319ddcaf533..8e2bde3477ee 100644 --- a/xds/server.go +++ b/xds/server.go @@ -291,11 +291,29 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) { drainServerTransports(gs, args.addr.String()) } } + if s.opts.modeCallback != nil { + // The XdsServer API will allow applications to register a "serving state" + // callback to be invoked when the server begins serving and when the + // server encounters errors that force it to be "not serving". If "not + // serving", the callback must be provided error information, for + // debugging use by developers - A36. s.opts.modeCallback(args.addr, ServingModeChangeArgs{ Mode: args.mode, Err: args.err, }) + } else { + // If the application does not register the callback, XdsServer should log + // any errors and each serving resumption after an error, all at a + // default-visible log level - A36. + // + // The default-visible log level for us is ERROR. + switch args.mode { + case connectivity.ServingModeServing: + s.logger.Errorf("Listener %q entering mode: %q", args.addr.String(), args.mode) + case connectivity.ServingModeNotServing: + s.logger.Errorf("Listener %q entering mode: %q due to error: %v", args.addr.String(), args.mode, args.err) + } } } } diff --git a/xds/server_test.go b/xds/server_test.go index e307beee754d..ac0c573fdef7 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -711,7 +711,18 @@ func (s) TestHandleListenerUpdate_NoXDSCreds(t *testing.T) { fs, clientCh, cleanup := setupOverrides() defer cleanup() - server := NewGRPCServer() + // Create a server option to get notified about serving mode changes. We don't + // do anything other than throwing a log entry here. But this is required, + // since the server code emits a log entry at the default level (which is + // ERROR) if no callback is registered for serving mode changes. Our + // testLogger fails the test if there is any log entry at ERROR level. It does + // provide an ExpectError() method, but that takes a string and it would be + // painful to construct the exact error message expected here. Instead this + // works just fine. + modeChangeOpt := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) { + t.Logf("Serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err) + }) + server := NewGRPCServer(modeChangeOpt) defer server.Stop() lis, err := testutils.LocalTCPListener() From f563be37c263eb710eef4a49c25cc336d2e4d7e8 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Thu, 10 Mar 2022 15:51:09 -0800 Subject: [PATCH 2/3] register a logging callback when user does not register mode change callback --- xds/server.go | 56 +++++++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 26 deletions(-) diff --git a/xds/server.go b/xds/server.go index 8e2bde3477ee..33c0fba0155d 100644 --- a/xds/server.go +++ b/xds/server.go @@ -105,10 +105,10 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer { s := &GRPCServer{ gs: newGRPCServer(newOpts...), quit: grpcsync.NewEvent(), - opts: handleServerOptions(opts), } s.logger = prefixLogger(s) s.logger.Infof("Created xds.GRPCServer") + s.handleServerOptions(opts) // We type assert our underlying gRPC server to the real grpc.Server here // before trying to retrieve the configured credentials. This approach @@ -128,14 +128,32 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer { // handleServerOptions iterates through the list of server options passed in by // the user, and handles the xDS server specific options. -func handleServerOptions(opts []grpc.ServerOption) *serverOptions { +func (s *GRPCServer) handleServerOptions(opts []grpc.ServerOption) { so := &serverOptions{} for _, opt := range opts { if o, ok := opt.(*serverOption); ok { o.apply(so) } } - return so + + // If the application did not register a mode change callback, the XdsServer + // registers its own which simply logs any errors and each serving resumption + // after an error, all at a default-visible log level, as per A36. The + // default-visible log level for us is ERROR. + // + // Note that this means that `s.opts.modeCallback` will never be nil and can + // safely be invoked directly from `handleServiceModeChanges`. + if so.modeCallback == nil { + so.modeCallback = func(addr net.Addr, args ServingModeChangeArgs) { + switch args.Mode { + case connectivity.ServingModeServing: + s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode) + case connectivity.ServingModeNotServing: + s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err) + } + } + } + s.opts = so } // RegisterService registers a service and its implementation to the underlying @@ -292,29 +310,15 @@ func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) { } } - if s.opts.modeCallback != nil { - // The XdsServer API will allow applications to register a "serving state" - // callback to be invoked when the server begins serving and when the - // server encounters errors that force it to be "not serving". If "not - // serving", the callback must be provided error information, for - // debugging use by developers - A36. - s.opts.modeCallback(args.addr, ServingModeChangeArgs{ - Mode: args.mode, - Err: args.err, - }) - } else { - // If the application does not register the callback, XdsServer should log - // any errors and each serving resumption after an error, all at a - // default-visible log level - A36. - // - // The default-visible log level for us is ERROR. - switch args.mode { - case connectivity.ServingModeServing: - s.logger.Errorf("Listener %q entering mode: %q", args.addr.String(), args.mode) - case connectivity.ServingModeNotServing: - s.logger.Errorf("Listener %q entering mode: %q due to error: %v", args.addr.String(), args.mode, args.err) - } - } + // The XdsServer API will allow applications to register a "serving state" + // callback to be invoked when the server begins serving and when the + // server encounters errors that force it to be "not serving". If "not + // serving", the callback must be provided error information, for + // debugging use by developers - A36. + s.opts.modeCallback(args.addr, ServingModeChangeArgs{ + Mode: args.mode, + Err: args.err, + }) } } } From 925063f29398503e121a7c8bbba64aa775e12ead Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Wed, 16 Mar 2022 12:02:38 -0700 Subject: [PATCH 3/3] support default server options and make the logging callback a standalone function --- xds/server.go | 39 +++++++++++++++++++++------------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/xds/server.go b/xds/server.go index 33c0fba0155d..5ab8a5a98008 100644 --- a/xds/server.go +++ b/xds/server.go @@ -129,31 +129,34 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer { // handleServerOptions iterates through the list of server options passed in by // the user, and handles the xDS server specific options. func (s *GRPCServer) handleServerOptions(opts []grpc.ServerOption) { - so := &serverOptions{} + so := s.defaultServerOptions() for _, opt := range opts { if o, ok := opt.(*serverOption); ok { o.apply(so) } } + s.opts = so +} - // If the application did not register a mode change callback, the XdsServer - // registers its own which simply logs any errors and each serving resumption - // after an error, all at a default-visible log level, as per A36. The - // default-visible log level for us is ERROR. - // - // Note that this means that `s.opts.modeCallback` will never be nil and can - // safely be invoked directly from `handleServiceModeChanges`. - if so.modeCallback == nil { - so.modeCallback = func(addr net.Addr, args ServingModeChangeArgs) { - switch args.Mode { - case connectivity.ServingModeServing: - s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode) - case connectivity.ServingModeNotServing: - s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err) - } - } +func (s *GRPCServer) defaultServerOptions() *serverOptions { + return &serverOptions{ + // A default serving mode change callback which simply logs at the + // default-visible log level. This will be used if the application does not + // register a mode change callback. + // + // Note that this means that `s.opts.modeCallback` will never be nil and can + // safely be invoked directly from `handleServingModeChanges`. + modeCallback: s.loggingServerModeChangeCallback, + } +} + +func (s *GRPCServer) loggingServerModeChangeCallback(addr net.Addr, args ServingModeChangeArgs) { + switch args.Mode { + case connectivity.ServingModeServing: + s.logger.Errorf("Listener %q entering mode: %q", addr.String(), args.Mode) + case connectivity.ServingModeNotServing: + s.logger.Errorf("Listener %q entering mode: %q due to error: %v", addr.String(), args.Mode, args.Err) } - s.opts = so } // RegisterService registers a service and its implementation to the underlying