diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index f0f86d5d99..edfbd603db 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -50,7 +50,7 @@ type LinearCache struct { deltaWatches map[int64]DeltaResponseWatch // Continuously incremented counter used to index delta watches. deltaWatchCount int64 - // versionMap holds the current hash map of all resources in the cache. + // versionMap holds the current hash map of all resources in the cache when delta watches are present. // versionMap is only to be used with delta xDS. versionMap map[string]string // Continuously incremented version. @@ -103,7 +103,7 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { watches: make(map[string]watches), watchAll: make(watches), deltaWatches: make(map[int64]DeltaResponseWatch), - versionMap: make(map[string]string), + versionMap: nil, version: 0, versionVector: make(map[string]uint64), } @@ -154,15 +154,19 @@ func (cache *LinearCache) notifyAll(modified map[string]struct{}) { } cache.watchAll = make(watches) - err := cache.updateVersionMap(modified) - if err != nil { - cache.log.Errorf("failed to update version map: %v", err) - } + // Building the version map has a very high cost when using SetResources to do full updates. + // As it is only used with delta watches, it is only maintained when applicable. + if cache.versionMap != nil { + err := cache.updateVersionMap(modified) + if err != nil { + cache.log.Errorf("failed to update version map: %v", err) + } - for id, watch := range cache.deltaWatches { - res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) - if res != nil { - delete(cache.deltaWatches, id) + for id, watch := range cache.deltaWatches { + res := cache.respondDelta(watch.Request, watch.Response, watch.StreamState) + if res != nil { + delete(cache.deltaWatches, id) + } } } } @@ -366,6 +370,18 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S cache.mu.Lock() defer cache.mu.Unlock() + if cache.versionMap == nil { + // If we had no delta watch before, we need to build the version map for the first time. + // The map will not be removed when the last delta watch is removed to avoid rebuilding it constantly when only a few delta watches are applicable. + modified := map[string]struct{}{} + for name := range cache.resources { + modified[name] = struct{}{} + } + err := cache.updateVersionMap(modified) + if err != nil && cache.log != nil { + cache.log.Errorf("failed to update version map: %v", err) + } + } response := cache.respondDelta(request, value, state) // if respondDelta returns nil this means that there is no change in any resource version @@ -386,9 +402,14 @@ func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.S } func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { - for name, r := range cache.resources { - // skip recalculating hash for the resoces that weren't modified - if _, ok := modified[name]; !ok { + if cache.versionMap == nil { + cache.versionMap = make(map[string]string, len(modified)) + } + for name := range modified { + r, ok := cache.resources[name] + if !ok { + // The resource was deleted + delete(cache.versionMap, name) continue } // hash our version in here and build the version map @@ -401,12 +422,7 @@ func (cache *LinearCache) updateVersionMap(modified map[string]struct{}) error { return errors.New("failed to build resource version") } - cache.versionMap[GetResourceName(r)] = v - } - for name := range modified { - if r, ok := cache.resources[name]; !ok { - delete(cache.versionMap, GetResourceName(r)) - } + cache.versionMap[name] = v } return nil } @@ -432,6 +448,14 @@ func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response return nil, errors.New("not implemented") } +// Number of resources currently on the cache. +// As GetResources is building a clone it is expensive to get metrics otherwise. +func (cache *LinearCache) NumResources() int { + cache.mu.RLock() + defer cache.mu.RUnlock() + return len(cache.resources) +} + // Number of active watches for a resource name. func (cache *LinearCache) NumWatches(name string) int { cache.mu.RLock() diff --git a/pkg/cache/v3/linear_test.go b/pkg/cache/v3/linear_test.go index 4e5860404c..92742bb749 100644 --- a/pkg/cache/v3/linear_test.go +++ b/pkg/cache/v3/linear_test.go @@ -142,6 +142,22 @@ func checkDeltaWatchCount(t *testing.T, c *LinearCache, count int) { } } +func checkVersionMapNotSet(t *testing.T, c *LinearCache) { + t.Helper() + if c.versionMap != nil { + t.Errorf("version map is set on the cache with %d elements", len(c.versionMap)) + } +} + +func checkVersionMapSet(t *testing.T, c *LinearCache) { + t.Helper() + if c.versionMap == nil { + t.Errorf("version map is not set on the cache") + } else if len(c.versionMap) != len(c.resources) { + t.Errorf("version map has the wrong number of elements: %d instead of %d expected", len(c.versionMap), len(c.resources)) + } +} + func mustBlock(t *testing.T, w <-chan Response) { select { case <-w: @@ -186,6 +202,7 @@ func TestLinearInitialResources(t *testing.T) { verifyResponse(t, w, "0", 1) c.CreateWatch(&Request{TypeUrl: testType}, streamState, w) verifyResponse(t, w, "0", 2) + checkVersionMapNotSet(t, c) } func TestLinearCornerCases(t *testing.T) { @@ -216,6 +233,7 @@ func TestLinearBasic(t *testing.T) { w1 := make(chan Response, 1) c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, streamState, w1) mustBlock(t, w1) + checkVersionMapNotSet(t, c) w := make(chan Response, 1) c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) @@ -243,6 +261,8 @@ func TestLinearBasic(t *testing.T) { verifyResponse(t, w, "3", 1) c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, streamState, w) verifyResponse(t, w, "3", 2) + // Ensure the version map was not created as we only ever used stow watches + checkVersionMapNotSet(t, c) } func TestLinearSetResources(t *testing.T) { @@ -520,12 +540,15 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { hashB := hashResource(t, b) err = c.UpdateResource("b", b) assert.NoError(t, err) + // There is currently no delta watch + checkVersionMapNotSet(t, c) state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""}) w := make(chan DeltaResponse, 1) c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) checkDeltaWatchCount(t, c, 0) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"a", hashA}}, nil) + checkVersionMapSet(t, c) state = stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) w = make(chan DeltaResponse, 1) @@ -540,6 +563,7 @@ func TestLinearDeltaResourceUpdate(t *testing.T) { err = c.UpdateResource("a", a) assert.NoError(t, err) verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, nil) + checkVersionMapSet(t, c) } func TestLinearDeltaResourceDelete(t *testing.T) { @@ -578,11 +602,15 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { state := stream.NewStreamState(false, map[string]string{"a": "", "b": ""}) w := make(chan DeltaResponse, 1) + checkVersionMapNotSet(t, c) + assert.Equal(t, 0, c.NumResources()) // Initial update c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, state, w) mustBlockDelta(t, w) checkDeltaWatchCount(t, c, 1) + // The version map should now be created, even if empty + checkVersionMapSet(t, c) a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} hashA := hashResource(t, a) b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} @@ -591,6 +619,8 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NoError(t, err) resp := <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) + checkVersionMapSet(t, c) + assert.Equal(t, 2, c.NumResources()) state.SetResourceVersions(resp.GetNextVersionMap()) // Multiple updates @@ -609,6 +639,8 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NoError(t, err) resp = <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}, {"b", hashB}}, nil) + checkVersionMapSet(t, c) + assert.Equal(t, 2, c.NumResources()) state.SetResourceVersions(resp.GetNextVersionMap()) // Update/add/delete @@ -626,6 +658,8 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NotContains(t, c.resources, "b", "resource with name b was found in cache") resp = <-w validateDeltaResponse(t, resp, []resourceInfo{{"a", hashA}}, []string{"b"}) + checkVersionMapSet(t, c) + assert.Equal(t, 2, c.NumResources()) state.SetResourceVersions(resp.GetNextVersionMap()) // Re-add previously deleted watched resource @@ -640,6 +674,8 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NotContains(t, c.resources, "d", "resource with name d was found in cache") resp = <-w validateDeltaResponse(t, resp, []resourceInfo{{"b", hashB}}, nil) // d is not watched and should not be returned + checkVersionMapSet(t, c) + assert.Equal(t, 2, c.NumResources()) state.SetResourceVersions(resp.GetNextVersionMap()) // Wildcard create/update @@ -655,6 +691,8 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { err = c.UpdateResources(map[string]types.Resource{"b": b, "d": d}, nil) assert.NoError(t, err) verifyDeltaResponse(t, w, []resourceInfo{{"b", hashB}, {"d", hashD}}, nil) + checkVersionMapSet(t, c) + assert.Equal(t, 3, c.NumResources()) // Wildcard update/delete createWildcardDeltaWatch(c, w) @@ -668,4 +706,57 @@ func TestLinearDeltaMultiResourceUpdates(t *testing.T) { assert.NoError(t, err) assert.NotContains(t, c.resources, "d", "resource with name d was found in cache") verifyDeltaResponse(t, w, []resourceInfo{{"a", hashA}}, []string{"d"}) + + checkDeltaWatchCount(t, c, 0) + // Confirm that the map is still set even though there is currently no watch + checkVersionMapSet(t, c) + assert.Equal(t, 2, c.NumResources()) +} + +func TestLinearMixedWatches(t *testing.T) { + c := NewLinearCache(testType) + a := &endpoint.ClusterLoadAssignment{ClusterName: "a"} + err := c.UpdateResource("a", a) + assert.NoError(t, err) + b := &endpoint.ClusterLoadAssignment{ClusterName: "b"} + hashB := hashResource(t, b) + err = c.UpdateResource("b", b) + assert.NoError(t, err) + assert.Equal(t, 2, c.NumResources()) + + sotwState := stream.NewStreamState(false, nil) + w := make(chan Response, 1) + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + mustBlock(t, w) + checkVersionMapNotSet(t, c) + + a = &endpoint.ClusterLoadAssignment{ClusterName: "a", Endpoints: []*endpoint.LocalityLbEndpoints{ //resource update + {Priority: 25}, + }} + hashA := hashResource(t, a) + err = c.UpdateResources(map[string]types.Resource{"a": a}, nil) + assert.NoError(t, err) + // This behavior is currently invalid for cds and lds, but due to a current limitation of linear cache sotw implementation + verifyResponse(t, w, c.getVersion(), 1) + checkVersionMapNotSet(t, c) + + c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: c.getVersion()}, sotwState, w) + mustBlock(t, w) + checkVersionMapNotSet(t, c) + + deltaState := stream.NewStreamState(false, map[string]string{"a": hashA, "b": hashB}) + wd := make(chan DeltaResponse, 1) + + // Initial update + c.CreateDeltaWatch(&DeltaRequest{TypeUrl: testType}, deltaState, wd) + mustBlockDelta(t, wd) + checkDeltaWatchCount(t, c, 1) + checkVersionMapSet(t, c) + + err = c.UpdateResources(nil, []string{"b"}) + assert.NoError(t, err) + checkVersionMapSet(t, c) + + verifyResponse(t, w, c.getVersion(), 0) + verifyDeltaResponse(t, wd, nil, []string{"b"}) }