Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

move delta to new linearization API #447

Merged
merged 3 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 4 additions & 9 deletions pkg/cache/v3/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,20 @@ type ConfigWatcher interface {
// CreateWatch returns a new open watch from a non-empty request.
// An individual consumer normally issues a single open watch by each type URL.
//
// Value channel produces requested resources, once they are available. If
// the channel is closed prior to cancellation of the watch, an unrecoverable
// error has occurred in the producer, and the consumer should close the
// corresponding stream.
// The provided channel produces requested resources as responses, once they are available.
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateWatch(*Request, chan Response) (cancel func())

// CreateDeltaWatch returns a new open incremental xDS watch.
//
// Value channel produces requested resources, or spontaneous updates in accordance
// with the incremental xDS specification. If the channel is closed
// prior to cancellation of the watch, an unrecoverable error has occurred in the producer,
// and the consumer should close the corresponding stream.
// The provided channel produces requested resources as responses, or spontaneous updates in accordance
// with the incremental xDS specification.
//
alecholmez marked this conversation as resolved.
Show resolved Hide resolved
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
CreateDeltaWatch(*DeltaRequest, stream.StreamState) (value chan DeltaResponse, cancel func())
CreateDeltaWatch(*DeltaRequest, stream.StreamState, chan DeltaResponse) (cancel func())
}

// ConfigFetcher fetches configuration resources from cache
Expand Down
30 changes: 18 additions & 12 deletions pkg/cache/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {

// Make our initial request as a wildcard to get all resources and make sure the wildcard requesting works as intended
for _, typ := range testTypes {
watches[typ], _ = c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.StreamState{ResourceVersions: nil, Wildcard: true})
}, stream.StreamState{ResourceVersions: nil, Wildcard: true}, watches[typ])
}

if err := c.SetSnapshot(key, snapshot); err != nil {
Expand All @@ -53,13 +54,14 @@ func TestSnapshotCacheDeltaWatch(t *testing.T) {
// On re-request we want to use non-wildcard so we can verify the logic path of not requesting
// all resources as well as individual resource removals
for _, typ := range testTypes {
watches[typ], _ = c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.StreamState{ResourceVersions: versionMap[typ]})
}, stream.StreamState{ResourceVersions: versionMap[typ]}, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -94,14 +96,15 @@ func TestDeltaRemoveResources(t *testing.T) {
watches := make(map[string]chan cache.DeltaResponse)

for _, typ := range testTypes {
watches[typ] = make(chan cache.DeltaResponse, 1)
// We don't specify any resource name subscriptions here because we want to make sure we test wildcard
// functionality. This means we should receive all resources back without requesting a subscription by name.
watches[typ], _ = c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, stream.StreamState{ResourceVersions: make(map[string]string), Wildcard: true})
}, stream.StreamState{ResourceVersions: make(map[string]string), Wildcard: true}, watches[typ])
}

if err := c.SetSnapshot(key, snapshot); err != nil {
Expand All @@ -127,12 +130,13 @@ func TestDeltaRemoveResources(t *testing.T) {
// We want to continue to do wildcard requests here so we can later
// test the removal of certain resources from a partial snapshot
for _, typ := range testTypes {
watches[typ], _ = c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
watches[typ] = make(chan cache.DeltaResponse, 1)
c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: "node",
},
TypeUrl: typ,
}, stream.StreamState{ResourceVersions: versionMap[typ], Wildcard: true})
}, stream.StreamState{ResourceVersions: versionMap[typ], Wildcard: true}, watches[typ])
}

if count := c.GetStatusInfo(key).GetNumDeltaWatches(); count != len(testTypes) {
Expand Down Expand Up @@ -172,6 +176,7 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
t.Parallel()
id := fmt.Sprintf("%d", i%2)
var cancel func()
responses := make(chan cache.DeltaResponse, 1)
if i < 25 {
snap := cache.Snapshot{}
snap.Resources[types.Endpoint] = cache.NewResources(version, []types.Resource{resource.MakeEndpoint(clusterName, uint32(i))})
Expand All @@ -181,13 +186,13 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
cancel()
}

_, cancel = c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
cancel = c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: id,
},
TypeUrl: rsrc.EndpointType,
ResourceNamesSubscribe: []string{clusterName},
}, stream.NewStreamState(false, make(map[string]string)))
}, stream.NewStreamState(false, make(map[string]string)), responses)
}
})
}(i)
Expand All @@ -197,13 +202,14 @@ func TestConcurrentSetDeltaWatch(t *testing.T) {
func TestSnapshotCacheDeltaWatchCancel(t *testing.T) {
c := cache.NewSnapshotCache(true, group{}, logger{t: t})
for _, typ := range testTypes {
_, cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
responses := make(chan cache.DeltaResponse, 1)
cancel := c.CreateDeltaWatch(&discovery.DeltaDiscoveryRequest{
Node: &core.Node{
Id: key,
},
TypeUrl: typ,
ResourceNamesSubscribe: names[typ],
}, stream.NewStreamState(false, make(map[string]string)))
}, stream.NewStreamState(false, make(map[string]string)), responses)

// Cancel the watch
cancel()
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,8 @@ func (cache *LinearCache) CreateWatch(request *Request, value chan Response) fun
}
}

func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
return nil, nil
func (cache *LinearCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
return nil
}

func (cache *LinearCache) Fetch(ctx context.Context, request *Request) (Response, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/cache/v3/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ func (mux *MuxCache) CreateWatch(request *Request, value chan Response) func() {
return cache.CreateWatch(request, value)
}

func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
return nil, nil
func (cache *MuxCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
return nil
}

func (mux *MuxCache) Fetch(ctx context.Context, request *Request) (Response, error) {
Expand Down
8 changes: 3 additions & 5 deletions pkg/cache/v3/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func createResponse(request *Request, resources map[string]types.ResourceWithTtl
}

// CreateDeltaWatch returns a watch for a delta xDS request which implements the Simple SnapshotCache.
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState) (chan DeltaResponse, func()) {
func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream.StreamState, value chan DeltaResponse) func() {
nodeID := cache.hash.ID(request.Node)
t := request.GetTypeUrl()

Expand All @@ -405,8 +405,6 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream
// update last watch request time
info.SetLastDeltaWatchRequestTime(time.Now())

value := make(chan DeltaResponse, 1)

// find the current cache snapshot for the provided node
snapshot, exists := cache.snapshots[nodeID]

Expand All @@ -433,10 +431,10 @@ func (cache *snapshotCache) CreateDeltaWatch(request *DeltaRequest, state stream

info.SetDeltaResponseWatch(watchID, DeltaResponseWatch{Request: request, Response: value, StreamState: state})

return value, cache.cancelDeltaWatch(nodeID, watchID)
return cache.cancelDeltaWatch(nodeID, watchID)
}

return value, nil
return nil
}

func (cache *snapshotCache) nextDeltaWatchID() int64 {
Expand Down
29 changes: 6 additions & 23 deletions pkg/server/delta/v3/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
// Since there was no previous state, we know we're handling the first request of this type
// so we set the initial resource versions if we have any, and also signal if this stream is in wildcard mode.
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
watch.responses = make(chan cache.DeltaResponse, 1)

// We must signal goroutine termination to prevent a race between the cancel closing the watch
// and the producer closing the watch.
Expand All @@ -170,31 +171,13 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), watch.state.ResourceVersions)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), watch.state.ResourceVersions)

watch.responses, watch.cancel = s.cache.CreateDeltaWatch(req, watch.state)

// Go does not allow for selecting over a dynamic set of channels
// so we introduce a termination chan to handle cancelling any watches.
terminate := make(chan struct{})
watch.termination = terminate
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watches.deltaWatches[typeURL] = watch

go func() {
select {
case resp, more := <-watch.responses:
if more {
watches.deltaMuxedResponses <- resp
} else {
// Check again if the watch is cancelled.
select {
case <-terminate: // do nothing
default:
// We cannot close the responses channel since it can be closed twice.
// Instead we send a fake error response.
watches.deltaMuxedResponses <- deltaErrorResponse
}
}
break
case <-terminate:
break
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}
Expand Down
14 changes: 0 additions & 14 deletions pkg/server/delta/v3/watches.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ func (w *watches) Cancel() {
if watch.cancel != nil {
watch.cancel()
}

watch.terminate()
}
}

Expand All @@ -40,8 +38,6 @@ type watch struct {
cancel func()
nonce string

termination chan struct{}

state stream.StreamState
}

Expand All @@ -50,14 +46,4 @@ func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}

if w.termination != nil {
close(w.termination)
}
}

func (w *watch) terminate() {
if w.termination != nil {
close(w.termination)
}
}
34 changes: 3 additions & 31 deletions pkg/server/v3/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,9 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/server/v3"
)

func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState) (chan cache.DeltaResponse, func()) {
func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryRequest, state stream.StreamState, out chan cache.DeltaResponse) func() {
config.deltaCounts[req.TypeUrl] = config.deltaCounts[req.TypeUrl] + 1

// Create our out watch channel to return with a buffer of one
out := make(chan cache.DeltaResponse, 1)

if len(config.deltaResponses[req.TypeUrl]) > 0 {
res := config.deltaResponses[req.TypeUrl][0]
// In subscribed, we only want to send back what's changed if we detect changes
Expand Down Expand Up @@ -63,17 +60,14 @@ func (config *mockConfigWatcher) CreateDeltaWatch(req *discovery.DeltaDiscoveryR
SystemVersionInfo: "",
NextVersionMap: state.ResourceVersions,
}
} else if config.closeWatch {
close(out)
} else {
config.deltaWatches += 1
return out, func() {
close(out)
return func() {
config.deltaWatches -= 1
}
}

return out, nil
return nil
}

type mockDeltaStream struct {
Expand Down Expand Up @@ -275,28 +269,6 @@ func TestDeltaResponseHandlers(t *testing.T) {
}
}

func TestDeltaWatchClosed(t *testing.T) {
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
config := makeMockConfigWatcher()
config.closeWatch = true
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})

resp := makeMockDeltaStream(t)
resp.recv <- &discovery.DeltaDiscoveryRequest{
Node: node,
TypeUrl: typ,
}

// Verify that the response fails when the watch is closed
err := s.DeltaAggregatedResources(resp)
assert.Error(t, err)

close(resp.recv)
})
}
}

func TestSendDeltaError(t *testing.T) {
for _, typ := range testTypes {
t.Run(typ, func(t *testing.T) {
Expand Down