Skip to content

Commit

Permalink
services: retry failed Nomad service deregistrations from client
Browse files Browse the repository at this point in the history
When the allocation is stopped, we deregister the service in the alloc runner's
`PreKill` hook. This ensures we delete the service registration and wait for the
shutdown delay before shutting down the tasks, so that workloads can drain their
connections. However, the call to remove the workload only logs errors and never
retries them.

Add a short retry loop to the `RemoveWorkload` method for Nomad services, so
that transient errors give us an extra opportunity to deregister the service
before the tasks are stopped and before we need to fall back on the data
integrity improvements implemented in #20590.

Ref: #16616
  • Loading branch information
dduzgun-security authored and tgross committed May 15, 2024
1 parent 1cc99cc commit 30ae71a
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 30 deletions.
78 changes: 60 additions & 18 deletions client/serviceregistration/nsd/nsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ import (
"fmt"
"strings"
"sync"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/nomad/structs"
"oss.indeed.com/go/libtime/decay"
)

type ServiceRegistrationHandler struct {
Expand All @@ -34,6 +36,9 @@ type ServiceRegistrationHandler struct {
// shutDownCh coordinates shutting down the handler and any long-running
// processes, such as the RPC retry.
shutDownCh chan struct{}

backoffMax time.Duration
backoffInitial time.Duration
}

// ServiceRegistrationHandlerCfg holds critical information used during the
Expand Down Expand Up @@ -62,20 +67,39 @@ type ServiceRegistrationHandlerCfg struct {
// CheckWatcher watches checks of services in the Nomad service provider,
// and restarts associated tasks in accordance with their check_restart block.
CheckWatcher serviceregistration.CheckWatcher

// BackoffMax is the maximum amount of time failed RemoveWorkload RPCs will
// be retried, defaults to 1s
BackoffMax time.Duration

// BackoffInitial is the initial gap before retrying failed RemoveWorkload
// RPCs, defaults to 100ms. This will double each attempt until BackoffMax
// is reached
BackoffInitial time.Duration
}

// NewServiceRegistrationHandler returns a ready to use
// ServiceRegistrationHandler which implements the serviceregistration.Handler
// interface.
func NewServiceRegistrationHandler(log hclog.Logger, cfg *ServiceRegistrationHandlerCfg) serviceregistration.Handler {
go cfg.CheckWatcher.Run(context.TODO())
return &ServiceRegistrationHandler{

s := &ServiceRegistrationHandler{
cfg: cfg,
log: log.Named("service_registration.nomad"),
registrationEnabled: cfg.Enabled,
checkWatcher: cfg.CheckWatcher,
shutDownCh: make(chan struct{}),
backoffMax: cfg.BackoffMax,
backoffInitial: cfg.BackoffInitial,
}
if s.backoffInitial == 0 {
s.backoffInitial = 100 * time.Millisecond
}
if s.backoffMax == 0 {
s.backoffMax = time.Second
}
return s
}

func (s *ServiceRegistrationHandler) RegisterWorkload(workload *serviceregistration.WorkloadServices) error {
Expand Down Expand Up @@ -183,26 +207,44 @@ func (s *ServiceRegistrationHandler) removeWorkload(

var deleteResp structs.ServiceRegistrationDeleteByIDResponse

err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod, &deleteArgs, &deleteResp)
if err == nil {
return
backoffOpts := decay.BackoffOptions{
MaxSleepTime: s.backoffMax,
InitialGapSize: s.backoffInitial,
}
backoffErr := decay.Backoff(func() (bool, error) {

// The Nomad API exposes service registration deletion to handle
// orphaned service registrations. In the event a service is removed
// accidentally that is still running, we will hit this error when we
// eventually want to remove it. We therefore want to handle this,
// while ensuring the operator can see.
if strings.Contains(err.Error(), "service registration not found") {
s.log.Info("attempted to delete non-existent service registration",
"service_id", id, "namespace", workload.ProviderNamespace)
return
}
select {
case <-s.shutDownCh:
return true, nil
default:
}

err := s.cfg.RPCFn(structs.ServiceRegistrationDeleteByIDRPCMethod,
&deleteArgs, &deleteResp)
if err == nil {
return false, nil
}

// Log the error as there is nothing left to do, so the operator can see it
// and identify any problems.
s.log.Error("failed to delete service registration",
"error", err, "service_id", id, "namespace", workload.ProviderNamespace)
// The Nomad API exposes service registration deletion to handle
// orphaned service registrations. In the event a service is removed
// accidentally that is still running, we will hit this error when we
// eventually want to remove it. We therefore want to handle this,
// while ensuring the operator can see.
if strings.Contains(err.Error(), "service registration not found") {
s.log.Info("attempted to delete non-existent service registration",
"service_id", id, "namespace", workload.ProviderNamespace)
return false, nil
}

return true, err
}, backoffOpts)

if backoffErr != nil {
// Log the error as there is nothing left to do, so the operator can see
// it and identify any problems.
s.log.Error("failed to delete service registration",
"error", backoffErr, "service_id", id, "namespace", workload.ProviderNamespace)
}
}

func (s *ServiceRegistrationHandler) UpdateWorkload(old, new *serviceregistration.WorkloadServices) error {
Expand Down
48 changes: 36 additions & 12 deletions client/serviceregistration/nsd/nsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@ import (

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/serviceregistration"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -110,6 +112,7 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) {
name string
inputCfg *ServiceRegistrationHandlerCfg
inputWorkload *serviceregistration.WorkloadServices
returnedDeleteErr error
expectedRPCs map[string]int
expectedError error
expWatch, expUnWatch int
Expand Down Expand Up @@ -138,26 +141,39 @@ func TestServiceRegistrationHandler_RemoveWorkload(t *testing.T) {
expWatch: 0,
expUnWatch: 2,
},
{
name: "failed deregister",
inputCfg: &ServiceRegistrationHandlerCfg{
Enabled: true,
CheckWatcher: new(mockCheckWatcher),
BackoffMax: 75 * time.Millisecond,
BackoffInitial: 50 * time.Millisecond,
},
inputWorkload: mockWorkload(),
returnedDeleteErr: errors.New("unrecoverable error"),
expectedRPCs: map[string]int{structs.ServiceRegistrationDeleteByIDRPCMethod: 4},
expectedError: nil,
expWatch: 0,
expUnWatch: 2,
},
}

// Create a logger we can use for all tests.
log := hclog.NewNullLogger()

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {

// Add the mock RPC functionality.
mockRPC := mockRPC{callCounts: map[string]int{}}
mockRPC := mockRPC{
callCounts: map[string]int{},
deleteResponseErr: tc.returnedDeleteErr,
}
tc.inputCfg.RPCFn = mockRPC.RPC

// Create the handler and run the tests.
h := NewServiceRegistrationHandler(log, tc.inputCfg)
h := NewServiceRegistrationHandler(testlog.HCLogger(t), tc.inputCfg)

h.RemoveWorkload(tc.inputWorkload)

require.Eventually(t, func() bool {
return assert.Equal(t, tc.expectedRPCs, mockRPC.calls())
}, 100*time.Millisecond, 10*time.Millisecond)
must.Eq(t, tc.expectedRPCs, mockRPC.calls())
tc.inputCfg.CheckWatcher.(*mockCheckWatcher).assert(t, tc.expWatch, tc.expUnWatch)
})
}
Expand Down Expand Up @@ -647,6 +663,9 @@ type mockRPC struct {
// lock should be used to access this.
callCounts map[string]int
l sync.RWMutex

deleteResponseErr error
upsertResponseErr error
}

// calls returns the mapping counting the number of calls made to each RPC
Expand All @@ -659,12 +678,17 @@ func (mr *mockRPC) calls() map[string]int {

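// RPC mocks the server RPCs: it counts each upsert or delete call and returns
// the error configured for that method (nil unless a test sets one). Any other
// method results in an error.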
func (mr *mockRPC) RPC(method string, _, _ interface{}) error {
mr.l.Lock()
defer mr.l.Unlock()

switch method {
case structs.ServiceRegistrationUpsertRPCMethod, structs.ServiceRegistrationDeleteByIDRPCMethod:
mr.l.Lock()
case structs.ServiceRegistrationUpsertRPCMethod:
mr.callCounts[method]++
return mr.upsertResponseErr

case structs.ServiceRegistrationDeleteByIDRPCMethod:
mr.callCounts[method]++
mr.l.Unlock()
return nil
return mr.deleteResponseErr
default:
return fmt.Errorf("unexpected RPC method: %v", method)
}
Expand Down

0 comments on commit 30ae71a

Please sign in to comment.