Skip to content

Commit 5da3c8f

Browse files
committed
Implement tracking of known resource names by caller stream. Fixes envoyproxy#399
The change is implemented simillary to Java implementation. The control plane keeps track of resource names that a caller knows already. Signed-off-by: Konstantin Kalin <[email protected]>
1 parent 151bc0c commit 5da3c8f

File tree

8 files changed

+176
-53
lines changed

8 files changed

+176
-53
lines changed

pkg/cache/v3/cache.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,10 +48,11 @@ type ConfigWatcher interface {
4848
// An individual consumer normally issues a single open watch by each type URL.
4949
//
5050
// The provided channel produces requested resources as responses, once they are available.
51+
// The provided map contains resources already known to the caller
5152
//
5253
// Cancel is an optional function to release resources in the producer. If
5354
// provided, the consumer may call this function multiple times.
54-
CreateWatch(*Request, chan Response) (cancel func())
55+
CreateWatch(*Request, chan Response, map[string]struct{}) (cancel func())
5556

5657
// CreateDeltaWatch returns a new open incremental xDS watch.
5758
//

pkg/cache/v3/linear.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ func (cache *LinearCache) GetResources() map[string]types.Resource {
253253
return cache.resources
254254
}
255255

256-
func (cache *LinearCache) CreateWatch(request *Request, value chan Response) func() {
256+
func (cache *LinearCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() {
257257
if request.TypeUrl != cache.typeURL {
258258
value <- nil
259259
return nil

pkg/cache/v3/linear_test.go

+30-30
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,9 @@ func hashResource(t *testing.T, resource types.Resource) string {
160160
func TestLinearInitialResources(t *testing.T) {
161161
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
162162
w := make(chan Response, 1)
163-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w)
163+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType}, w, nil)
164164
verifyResponse(t, w, "0", 1)
165-
c.CreateWatch(&Request{TypeUrl: testType}, w)
165+
c.CreateWatch(&Request{TypeUrl: testType}, w, nil)
166166
verifyResponse(t, w, "0", 2)
167167
}
168168

@@ -174,7 +174,7 @@ func TestLinearCornerCases(t *testing.T) {
174174
}
175175
// create an incorrect type URL request
176176
w := make(chan Response, 1)
177-
c.CreateWatch(&Request{TypeUrl: "test"}, w)
177+
c.CreateWatch(&Request{TypeUrl: "test"}, w, nil)
178178
select {
179179
case r := <-w:
180180
if r != nil {
@@ -190,11 +190,11 @@ func TestLinearBasic(t *testing.T) {
190190

191191
// Create watches before a resource is ready
192192
w1 := make(chan Response, 1)
193-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1)
193+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1, nil)
194194
mustBlock(t, w1)
195195

196196
w := make(chan Response, 1)
197-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
197+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
198198
mustBlock(t, w)
199199
checkWatchCount(t, c, "a", 2)
200200
checkWatchCount(t, c, "b", 1)
@@ -205,19 +205,19 @@ func TestLinearBasic(t *testing.T) {
205205
verifyResponse(t, w, "1", 1)
206206

207207
// Request again, should get same response
208-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
208+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
209209
checkWatchCount(t, c, "a", 0)
210210
verifyResponse(t, w, "1", 1)
211-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
211+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
212212
checkWatchCount(t, c, "a", 0)
213213
verifyResponse(t, w, "1", 1)
214214

215215
// Add another element and update the first, response should be different
216216
require.NoError(t, c.UpdateResource("b", testResource("b")))
217217
require.NoError(t, c.UpdateResource("a", testResource("aa")))
218-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
218+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
219219
verifyResponse(t, w, "3", 1)
220-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
220+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
221221
verifyResponse(t, w, "3", 2)
222222
}
223223

@@ -226,10 +226,10 @@ func TestLinearSetResources(t *testing.T) {
226226

227227
// Create new resources
228228
w1 := make(chan Response, 1)
229-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1)
229+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w1, nil)
230230
mustBlock(t, w1)
231231
w2 := make(chan Response, 1)
232-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2)
232+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w2, nil)
233233
mustBlock(t, w2)
234234
c.SetResources(map[string]types.Resource{
235235
"a": testResource("a"),
@@ -239,9 +239,9 @@ func TestLinearSetResources(t *testing.T) {
239239
verifyResponse(t, w2, "1", 2) // the version was only incremented once for all resources
240240

241241
// Add another element and update the first, response should be different
242-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1)
242+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w1, nil)
243243
mustBlock(t, w1)
244-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2)
244+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w2, nil)
245245
mustBlock(t, w2)
246246
c.SetResources(map[string]types.Resource{
247247
"a": testResource("aa"),
@@ -252,9 +252,9 @@ func TestLinearSetResources(t *testing.T) {
252252
verifyResponse(t, w2, "2", 3)
253253

254254
// Delete resource
255-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1)
255+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "2"}, w1, nil)
256256
mustBlock(t, w1)
257-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2)
257+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "2"}, w2, nil)
258258
mustBlock(t, w2)
259259
c.SetResources(map[string]types.Resource{
260260
"b": testResource("b"),
@@ -285,43 +285,43 @@ func TestLinearVersionPrefix(t *testing.T) {
285285
c := NewLinearCache(testType, WithVersionPrefix("instance1-"))
286286

287287
w := make(chan Response, 1)
288-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
288+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
289289
verifyResponse(t, w, "instance1-0", 0)
290290

291291
require.NoError(t, c.UpdateResource("a", testResource("a")))
292-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
292+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
293293
verifyResponse(t, w, "instance1-1", 1)
294294

295-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w)
295+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "instance1-1"}, w, nil)
296296
mustBlock(t, w)
297297
checkWatchCount(t, c, "a", 1)
298298
}
299299

300300
func TestLinearDeletion(t *testing.T) {
301301
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
302302
w := make(chan Response, 1)
303-
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w)
303+
c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
304304
mustBlock(t, w)
305305
checkWatchCount(t, c, "a", 1)
306306
require.NoError(t, c.DeleteResource("a"))
307307
verifyResponse(t, w, "1", 0)
308308
checkWatchCount(t, c, "a", 0)
309-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w)
309+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w, nil)
310310
verifyResponse(t, w, "1", 1)
311311
checkWatchCount(t, c, "b", 0)
312312
require.NoError(t, c.DeleteResource("b"))
313-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
313+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w, nil)
314314
verifyResponse(t, w, "2", 0)
315315
checkWatchCount(t, c, "b", 0)
316316
}
317317

318318
func TestLinearWatchTwo(t *testing.T) {
319319
c := NewLinearCache(testType, WithInitialResources(map[string]types.Resource{"a": testResource("a"), "b": testResource("b")}))
320320
w := make(chan Response, 1)
321-
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w)
321+
c.CreateWatch(&Request{ResourceNames: []string{"a", "b"}, TypeUrl: testType, VersionInfo: "0"}, w, nil)
322322
mustBlock(t, w)
323323
w1 := make(chan Response, 1)
324-
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1)
324+
c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "0"}, w1, nil)
325325
mustBlock(t, w1)
326326
require.NoError(t, c.UpdateResource("a", testResource("aa")))
327327
// should only get the modified resource
@@ -335,14 +335,14 @@ func TestLinearCancel(t *testing.T) {
335335

336336
// cancel watch-all
337337
w := make(chan Response, 1)
338-
cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w)
338+
cancel := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w, nil)
339339
mustBlock(t, w)
340340
checkWatchCount(t, c, "a", 1)
341341
cancel()
342342
checkWatchCount(t, c, "a", 0)
343343

344344
// cancel watch for "a"
345-
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
345+
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w, nil)
346346
mustBlock(t, w)
347347
checkWatchCount(t, c, "a", 1)
348348
cancel()
@@ -352,10 +352,10 @@ func TestLinearCancel(t *testing.T) {
352352
w2 := make(chan Response, 1)
353353
w3 := make(chan Response, 1)
354354
w4 := make(chan Response, 1)
355-
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w)
356-
cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2)
357-
cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3)
358-
cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4)
355+
cancel = c.CreateWatch(&Request{ResourceNames: []string{"a"}, TypeUrl: testType, VersionInfo: "1"}, w, nil)
356+
cancel2 := c.CreateWatch(&Request{ResourceNames: []string{"b"}, TypeUrl: testType, VersionInfo: "1"}, w2, nil)
357+
cancel3 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w3, nil)
358+
cancel4 := c.CreateWatch(&Request{TypeUrl: testType, VersionInfo: "1"}, w4, nil)
359359
mustBlock(t, w)
360360
mustBlock(t, w2)
361361
mustBlock(t, w3)
@@ -396,7 +396,7 @@ func TestLinearConcurrentSetWatch(t *testing.T) {
396396
ResourceNames: []string{id, id2},
397397
VersionInfo: "0",
398398
TypeUrl: testType,
399-
}, value)
399+
}, value, nil)
400400
// wait until all updates apply
401401
verifyResponse(t, value, "", 1)
402402
}

pkg/cache/v3/mux.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ type MuxCache struct {
3737

3838
var _ Cache = &MuxCache{}
3939

40-
func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() {
40+
func (mux *MuxCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() {
4141
key := mux.Classify(request)
4242
cache, exists := mux.Caches[key]
4343
if !exists {
4444
value <- nil
4545
return nil
4646
}
47-
return cache.CreateWatch(request, value)
47+
return cache.CreateWatch(request, value, knownResourceNames)
4848
}
4949

5050
func (mux *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {

pkg/cache/v3/simple.go

+34-1
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ func superset(names map[string]bool, resources map[string]types.ResourceWithTTL)
286286
}
287287

288288
// CreateWatch returns a watch for an xDS request.
289-
func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) func() {
289+
func (cache *snapshotCache) CreateWatch(request *Request, value chan Response, knownResourceNames map[string]struct{}) func() {
290290
nodeID := cache.hash.ID(request.Node)
291291

292292
cache.mu.Lock()
@@ -306,6 +306,39 @@ func (cache *snapshotCache) CreateWatch(request *Request, value chan Response) f
306306
snapshot, exists := cache.snapshots[nodeID]
307307
version := snapshot.GetVersion(request.TypeUrl)
308308

309+
if exists && knownResourceNames != nil {
310+
diff := make([]string, len(request.ResourceNames))
311+
for _, r := range request.ResourceNames {
312+
if _, ok := knownResourceNames[r]; !ok {
313+
diff = append(diff, r)
314+
}
315+
}
316+
if cache.log != nil {
317+
cache.log.Debugf("nodeID %q requested %s%v and known %v. Diff %v", nodeID,
318+
request.TypeUrl, request.ResourceNames, knownResourceNames, diff)
319+
}
320+
if len(diff) > 0 {
321+
found := false
322+
if cache.log != nil {
323+
cache.log.Debugf("nodeID %q still needs %v", nodeID, diff)
324+
}
325+
326+
resources := snapshot.GetResourcesAndTTL(request.TypeUrl)
327+
for _, name := range diff {
328+
if _, exists := resources[name]; exists {
329+
found = true
330+
break
331+
}
332+
}
333+
334+
// cache contains resources already, the watch may be responded immediately
335+
if found {
336+
_ = cache.respond(context.Background(), request, value, resources, version, false)
337+
return nil
338+
}
339+
}
340+
}
341+
309342
// if the requested version is up-to-date or missing a response, leave an open watch
310343
if !exists || request.VersionInfo == version {
311344
watchID := cache.nextWatchID()

0 commit comments

Comments
 (0)