Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/14908.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
cache: prevent goroutine leak in agent cache
```
81 changes: 73 additions & 8 deletions agent/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,10 @@ type Cache struct {
entries map[string]cacheEntry
entriesExpiryHeap *ttlcache.ExpiryHeap

fetchLock sync.Mutex
lastFetchID uint64
fetchHandles map[string]fetchHandle

// stopped is used as an atomic flag to signal that the Cache has been
// discarded so background fetches and expiry processing should stop.
stopped uint32
Expand All @@ -130,6 +134,11 @@ type Cache struct {
rateLimitCancel context.CancelFunc
}

// fetchHandle identifies one in-flight background fetch goroutine for a
// single cache key. The id is unique per handle (see Cache.lastFetchID) and
// is compared on cleanup so a goroutine only ever removes its own handle;
// stopCh is closed to signal a superseded goroutine that it should exit.
type fetchHandle struct {
	// id is the monotonically increasing identifier assigned when the
	// handle was created under Cache.fetchLock.
	id uint64
	// stopCh is closed by getOrReplaceFetchHandle when a replacement
	// fetcher takes over this key.
	stopCh chan struct{}
}

// typeEntry is a single type that is registered with a Cache.
type typeEntry struct {
// Name that was used to register the Type
Expand Down Expand Up @@ -205,6 +214,7 @@ func New(options Options) *Cache {
types: make(map[string]typeEntry),
entries: make(map[string]cacheEntry),
entriesExpiryHeap: ttlcache.NewExpiryHeap(),
fetchHandles: make(map[string]fetchHandle),
stopCh: make(chan struct{}),
options: options,
rateLimitContext: ctx,
Expand Down Expand Up @@ -590,8 +600,18 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
metrics.SetGauge([]string{"consul", "cache", "entries_count"}, float32(len(c.entries)))

tEntry := r.TypeEntry
// The actual Fetch must be performed in a goroutine.
go func() {

// The actual Fetch must be performed in a goroutine. Ensure that we only
// have one in-flight at a time, but don't use a deferred
// context.WithCancel style termination so that these things outlive their
// requester.
//
// By the time we get here the system WANTS to make a replacement fetcher, so
// we terminate the prior one and replace it.
handle := c.getOrReplaceFetchHandle(key)
go func(handle fetchHandle) {
defer c.deleteFetchHandle(key, handle.id)

// If we have background refresh and currently are in "disconnected" state,
// waiting for a response might mean we mark our results as stale for up to
// 10 minutes (max blocking timeout) after connection is restored. To reduce
Expand Down Expand Up @@ -642,6 +662,14 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
connectedTimer.Stop()
}

// If we were stopped while waiting on a blocking query now would be a
// good time to detect that.
select {
case <-handle.stopCh:
return
default:
}

// Copy the existing entry to start.
newEntry := entry
newEntry.Fetching = false
Expand Down Expand Up @@ -792,13 +820,15 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
}

// If we're over the attempt minimum, start an exponential backoff.
if wait := backOffWait(attempt); wait > 0 {
time.Sleep(wait)
}
wait := backOffWait(attempt)

// If we have a timer, wait for it
if tEntry.Opts.RefreshTimer > 0 {
time.Sleep(tEntry.Opts.RefreshTimer)
wait += tEntry.Opts.RefreshTimer

select {
case <-time.After(wait):
case <-handle.stopCh:
return
}

// Trigger. The "allowNew" field is false because in the time we were
Expand All @@ -808,11 +838,46 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
r.Info.MinIndex = 0
c.fetch(key, r, false, attempt, true)
}
}()
}(handle)

return entry.Waiter
}

// getOrReplaceFetchHandle registers a new fetch handle for key and returns it.
// If a prior handle exists for the same key, its stop channel is closed first
// so the superseded fetch goroutine knows to terminate instead of leaking.
// The returned handle carries a process-unique id so the owning goroutine can
// later remove exactly its own registration via deleteFetchHandle.
func (c *Cache) getOrReplaceFetchHandle(key string) fetchHandle {
	c.fetchLock.Lock()
	defer c.fetchLock.Unlock()

	// Tell any previously registered fetcher for this key to stop.
	if prior, exists := c.fetchHandles[key]; exists {
		close(prior.stopCh)
	}

	// Mint a fresh unique id and stop channel for the replacement fetcher.
	c.lastFetchID++
	next := fetchHandle{
		id:     c.lastFetchID,
		stopCh: make(chan struct{}),
	}
	c.fetchHandles[key] = next

	return next
}

// deleteFetchHandle removes the fetch handle registered for key, but only if
// the stored handle's id matches fetchID. This guarantees a fetch goroutine
// can only deregister its OWN handle: a superseded goroutine that races with
// its replacement must not delete the replacement's registration.
func (c *Cache) deleteFetchHandle(key string, fetchID uint64) {
	c.fetchLock.Lock()
	defer c.fetchLock.Unlock()

	if current, ok := c.fetchHandles[key]; ok && current.id == fetchID {
		delete(c.fetchHandles, key)
	}
}

func backOffWait(failures uint) time.Duration {
if failures > CacheRefreshBackoffMin {
shift := failures - CacheRefreshBackoffMin
Expand Down
33 changes: 18 additions & 15 deletions agent/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/lib/ttlcache"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
)

// Test a basic Get with no indexes (and therefore no blocking queries).
Expand Down Expand Up @@ -1750,12 +1751,22 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
require.NoError(t, err)
require.Equal(t, true, result)

waitUntilFetching := func(expectValue bool) {
retry.Run(t, func(t *retry.R) {
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
entry, ok := c.entries[key]
require.True(t, ok)
if expectValue {
require.True(t, entry.Fetching)
} else {
require.False(t, entry.Fetching)
}
})
}

// ensure that the entry is fetching again
c.entriesLock.Lock()
entry, ok := c.entries[key]
require.True(t, ok)
require.True(t, entry.Fetching)
c.entriesLock.Unlock()
waitUntilFetching(true)

requestChan := make(chan error)

Expand Down Expand Up @@ -1789,11 +1800,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {
}

// ensure that the entry is fetching again
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.True(t, entry.Fetching)
c.entriesLock.Unlock()
waitUntilFetching(true)

// background a call that will wait for a newer version - will result in an acl not found error
go getError(5)
Expand All @@ -1814,11 +1821,7 @@ func TestCache_RefreshLifeCycle(t *testing.T) {

// ensure that the ACL not found error killed off the background refresh
// but didn't remove it from the cache
c.entriesLock.Lock()
entry, ok = c.entries[key]
require.True(t, ok)
require.False(t, entry.Fetching)
c.entriesLock.Unlock()
waitUntilFetching(false)
}

type fakeType struct {
Expand Down