From 30ae71a5adb229e0fad7785f632600763e986391 Mon Sep 17 00:00:00 2001
From: Deniz Onur Duzgun <59659739+dduzgun-security@users.noreply.github.com>
Date: Wed, 15 May 2024 13:22:10 -0400
Subject: [PATCH] services: retry failed Nomad service deregistrations from
 client

When the allocation is stopped, we deregister the service in the alloc
runner's `PreKill` hook. This ensures we delete the service registration
and wait for the shutdown delay before shutting down the tasks, so that
workloads can drain their connections. However, the call to remove the
workload only logs errors and never retries them.

Add a short retry loop to the `RemoveWorkload` method for Nomad services,
so that transient errors give us an extra opportunity to deregister the
service before the tasks are stopped and before we need to fall back to
the data integrity improvements implemented in #20590.

Ref: https://github.com/hashicorp/nomad/issues/16616
---
 client/serviceregistration/nsd/nsd.go      | 78 +++++++++++++++++-----
 client/serviceregistration/nsd/nsd_test.go | 48 +++++++++----
 2 files changed, 96 insertions(+), 30 deletions(-)

diff --git a/client/serviceregistration/nsd/nsd.go b/client/serviceregistration/nsd/nsd.go
index 1ec34902caa0..45a5aaf14bc0 100644
--- a/client/serviceregistration/nsd/nsd.go
+++ b/client/serviceregistration/nsd/nsd.go
@@ -9,11 +9,13 @@ import (
     "fmt"
     "strings"
     "sync"
+    "time"
 
     "github.com/hashicorp/go-hclog"
     "github.com/hashicorp/go-multierror"
     "github.com/hashicorp/nomad/client/serviceregistration"
     "github.com/hashicorp/nomad/nomad/structs"
+    "oss.indeed.com/go/libtime/decay"
 )
 
 type ServiceRegistrationHandler struct {
@@ -34,6 +36,9 @@ type ServiceRegistrationHandler struct {
     // shutDownCh coordinates shutting down the handler and any long-running
     // processes, such as the RPC retry.
     shutDownCh chan struct{}
+
+    backoffMax     time.Duration
+    backoffInitial time.Duration
 }
 
 // ServiceRegistrationHandlerCfg holds critical information used during the
@@ -62,6 +67,15 @@ type ServiceRegistrationHandlerCfg struct {
     // CheckWatcher watches checks of services in the Nomad service provider,
     // and restarts associated tasks in accordance with their check_restart block.
     CheckWatcher serviceregistration.CheckWatcher
+
+    // BackoffMax is the maximum amount of time failed RemoveWorkload RPCs will
+    // be retried, defaults to 1s.
+    BackoffMax time.Duration
+
+    // BackoffInitial is the initial gap before retrying failed RemoveWorkload
+    // RPCs, defaults to 100ms. This will double each attempt until BackoffMax
+    // is reached.
+    BackoffInitial time.Duration
 }
 
 // NewServiceRegistrationHandler returns a ready to use
@@ -69,13 +83,23 @@ type ServiceRegistrationHandlerCfg struct {
 // interface.
 func NewServiceRegistrationHandler(log hclog.Logger, cfg *ServiceRegistrationHandlerCfg) serviceregistration.Handler {
     go cfg.CheckWatcher.Run(context.TODO())
-    return &ServiceRegistrationHandler{
+
+    s := &ServiceRegistrationHandler{
         cfg:                 cfg,
         log:                 log.Named("service_registration.nomad"),
         registrationEnabled: cfg.Enabled,
         checkWatcher:        cfg.CheckWatcher,
         shutDownCh:          make(chan struct{}),
+        backoffMax:          cfg.BackoffMax,
+        backoffInitial:      cfg.BackoffInitial,
     }
+    if s.backoffInitial == 0 {
+        s.backoffInitial = 100 * time.Millisecond
+    }
+    if s.backoffMax == 0 {
+        s.backoffMax = time.Second
+    }
+    return s
 }
 
 func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistration.WorkloadServices) error {
@@ -183,26 +207,44 @@ func (s *ServiceRegistrationHandler) removeWorkload(
     var deleteResp structs.ServiceRegistrationDeleteByIDResponse
 
-    err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod, &deleteArgs, &deleteResp)
-    if err == nil {
-        return
+    backoffOpts := decay.BackoffOptions{
+        MaxSleepTime:   s.backoffMax,
+        InitialGapSize: s.backoffInitial,
     }
+    backoffErr := decay.Backoff(func() (bool, error) {
 
-    // The Nomad API exposes service registration deletion to handle
-    // orphaned service registrations. In the event a service is removed
-    // accidentally that is still running, we will hit this error when we
-    // eventually want to remove it. We therefore want to handle this,
-    // while ensuring the operator can see.
-    if strings.Contains(err.Error(), "service registration not found") {
-        s.log.Info("attempted to delete non-existent service registration",
-            "service_id", id, "namespace", workload.ProviderNamespace)
-        return
-    }
+        select {
+        case <-s.shutDownCh:
+            return true, nil
+        default:
+        }
+
+        err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod,
+            &deleteArgs, &deleteResp)
+        if err == nil {
+            return false, nil
+        }
 
-    // Log the error as there is nothing left to do, so the operator can see it
-    // and identify any problems.
-    s.log.Error("failed to delete service registration",
-        "error", err, "service_id", id, "namespace", workload.ProviderNamespace)
+        // The Nomad API exposes service registration deletion to handle
+        // orphaned service registrations. In the event a service is removed
+        // accidentally that is still running, we will hit this error when we
+        // eventually want to remove it. We therefore want to handle this,
+        // while ensuring the operator can see.
+        if strings.Contains(err.Error(), "service registration not found") {
+            s.log.Info("attempted to delete non-existent service registration",
+                "service_id", id, "namespace", workload.ProviderNamespace)
+            return false, nil
+        }
+
+        return true, err
+    }, backoffOpts)
+
+    if backoffErr != nil {
+        // Log the error as there is nothing left to do, so the operator can see
+        // it and identify any problems.
+ s.log.Error("failed to delete service registration", + "error", backoffErr, "service_id", id, "namespace", workload.ProviderNamespace) + } } func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error { diff --git a/client/serviceregistration/nsd/nsd_test.go b/client/serviceregistration/nsd/nsd_test.go index 7210eb913afb..e76d11fb4b94 100644 --- a/client/serviceregistration/nsd/nsd_test.go +++ b/client/serviceregistration/nsd/nsd_test.go @@ -13,8 +13,10 @@ import ( "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/client/serviceregistration" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/shoenig/test" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -110,6 +112,7 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) { name string inputCfg *ServiceRegistrationHandlerCfg inputWorkload *serviceregistration.WorkloadServices + returnedDeleteErr error expectedRPCs map[string]int expectedError error expWatch, expUnWatch int @@ -138,26 +141,39 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) { expWatch: 0, expUnWatch: 2, }, + { + name: "failed deregister", + inputCfg: &ServiceRegistrationHandlerCfg{ + Enabled: true, + CheckWatcher: new(mockCheckWatcher), + BackoffMax: 75 * time.Millisecond, + BackoffInitial: 50 * time.Millisecond, + }, + inputWorkload: mockWorkload(), + returnedDeleteErr: errors.New("unrecoverable error"), + expectedRPCs: map[string]int{structs.ServiceRegistrationDeleteByIDRPCMethod: 4}, + expectedError: nil, + expWatch: 0, + expUnWatch: 2, + }, } - // Create a logger we can use for all tests. - log := hclog.NewNullLogger() - for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { // Add the mock RPC functionality. - mockRPC := mockRPC{callCounts: map[string]int{}} + mockRPC := mockRPC{ + callCounts: map[string]int{}, + deleteResponseErr: tc.returnedDeleteErr, + } tc.inputCfg.RPCFn = mockRPC.RPC // Create the handler and run the tests. - h := NewServiceRegistrationHandler(log, tc.inputCfg) + h := NewServiceRegistrationHandler(testlog.HCLogger(t), tc.inputCfg) h.RemoveWorkload(tc.inputWorkload) - require.Eventually(t, func() bool { - return assert.Equal(t, tc.expectedRPCs, mockRPC.calls()) - }, 100*time.Millisecond, 10*time.Millisecond) + must.Eq(t, tc.expectedRPCs, mockRPC.calls()) tc.inputCfg.CheckWatcher.(*mockCheckWatcher).assert(t, tc.expWatch, tc.expUnWatch) }) } @@ -647,6 +663,9 @@ type mockRPC struct { // lock should be used to access this. callCounts map[string]int l sync.RWMutex + + deleteResponseErr error + upsertResponseErr error } // calls returns the mapping counting the number of calls made to each RPC @@ -659,12 +678,17 @@ func (mr *mockRPC) calls() map[string]int { // RPC mocks the server RPCs, acting as though any request succeeds. func (mr *mockRPC) RPC(method string, _, _ interface{}) error { + mr.l.Lock() + defer mr.l.Unlock() + switch method { - case structs.ServiceRegistrationUpsertRPCMethod, structs.ServiceRegistrationDeleteByIDRPCMethod: - mr.l.Lock() + case structs.ServiceRegistrationUpsertRPCMethod: + mr.callCounts[method]++ + return mr.upsertResponseErr + + case structs.ServiceRegistrationDeleteByIDRPCMethod: mr.callCounts[method]++ - mr.l.Unlock() - return nil + return mr.deleteResponseErr default: return fmt.Errorf("unexpected RPC method: %v", method) }