Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/services/local/presence.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type backendItemToResourceFunc func(item backend.Item) (types.ResourceWithLabels
func NewPresenceService(b backend.Backend) *PresenceService {
return &PresenceService{
log: logrus.WithFields(logrus.Fields{trace.Component: "Presence"}),
jitter: utils.NewJitter(),
jitter: utils.NewFullJitter(),
Backend: b,
}
}
Expand Down Expand Up @@ -799,10 +799,10 @@ func (s *PresenceService) DeleteAllRemoteClusters() error {
}

// this combination of backoff parameters leads to worst-case total time spent
// in backoff between 1200ms and 2400ms depending on jitter. tests are in
// in backoff between 1ms and 2000ms depending on jitter. tests are in
// place to verify that this is sufficient to resolve a 20-lease contention
// event, which is worse than should ever occur in practice.
const baseBackoff = time.Millisecond * 300
const baseBackoff = time.Millisecond * 400
const leaseRetryAttempts int64 = 6

// AcquireSemaphore attempts to acquire the specified semaphore. AcquireSemaphore will automatically handle
Expand Down
10 changes: 4 additions & 6 deletions lib/services/local/services_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ import (
"context"
"os"
"testing"
"time"

"github.com/gravitational/teleport/lib/backend"
"github.com/gravitational/teleport/lib/backend/lite"
"github.com/gravitational/teleport/lib/backend/memory"
"github.com/gravitational/teleport/lib/services/suite"
"github.com/gravitational/teleport/lib/utils"
"github.com/jonboulle/clockwork"
Expand All @@ -48,10 +47,9 @@ func (s *ServicesSuite) SetUpTest(c *check.C) {

clock := clockwork.NewFakeClock()

s.bk, err = lite.NewWithConfig(ctx, lite.Config{
Path: c.MkDir(),
PollStreamPeriod: 200 * time.Millisecond,
Clock: clock,
s.bk, err = memory.New(memory.Config{
Context: ctx,
Clock: clock,
})
c.Assert(err, check.IsNil)

Expand Down
13 changes: 7 additions & 6 deletions lib/services/suite/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"crypto/x509"
"crypto/x509/pkix"
"encoding/base64"
"fmt"
"sort"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -1280,24 +1281,24 @@ func (s *ServicesTestSuite) SemaphoreContention(c *check.C) {
Expiry: time.Hour,
Params: types.AcquireSemaphoreRequest{
SemaphoreKind: types.SemaphoreKindConnection,
SemaphoreName: "alice",
SemaphoreName: fmt.Sprintf("sem-%d", i), // avoid overlap between iterations
MaxLeases: locks,
},
}
// we leak lock handles in the spawned goroutines, so
// context-based cancellation is needed to cleanup the
// background keepalive activity.
cancelCtx, cancel := context.WithCancel(ctx)
var wg sync.WaitGroup
acquireErrs := make(chan error, locks)
for i := int64(0); i < locks; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_, err := services.AcquireSemaphoreLock(cancelCtx, cfg)
c.Assert(err, check.IsNil)
acquireErrs <- err
}()
}
wg.Wait()
for i := int64(0); i < locks; i++ {
c.Assert(<-acquireErrs, check.IsNil)
}
cancel()
c.Assert(s.PresenceS.DeleteSemaphore(ctx, types.SemaphoreFilter{
SemaphoreKind: cfg.Params.SemaphoreKind,
Expand Down
20 changes: 20 additions & 0 deletions lib/utils/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,26 @@ func NewSeventhJitter() Jitter {
}
}

// NewFullJitter builds a new jitter on the range (0,n]. Most use-cases
// are better served by a jitter with a meaningful minimum value, but if
// the *only* purpose of the jitter is to spread out retries to the greatest
// extent possible (e.g. when retrying a CompareAndSwap operation), a full jitter
// may be appropriate.
func NewFullJitter() Jitter {
var mu sync.Mutex
rng := rand.New(rand.NewSource(time.Now().UnixNano()))
return func(d time.Duration) time.Duration {
// values less than 1 cause rng to panic, and some logic
// relies on treating zero duration as non-blocking case.
if d < 1 {
return 0
}
mu.Lock()
defer mu.Unlock()
return time.Duration(1) + time.Duration(rng.Int63n(int64(d)))
}
}

// Retry is an interface that provides retry logic
type Retry interface {
// Reset resets retry state
Expand Down