-
Notifications
You must be signed in to change notification settings - Fork 563
cache: add support for xDS TTLs #359
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 23 commits
d60ddf1
6a04f23
46e1521
4950ef7
7c7bd4b
8a3df30
46f7685
85188e4
d28f5f3
dc7554a
506f272
bd50781
ef8ef83
7ca85c3
62b78a4
83e352f
8aa5fc4
b9fe8c9
703d458
e428918
8c823a7
c3585ab
d899fe9
0bc2e6e
3d7535d
eb44d1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,13 +86,13 @@ func MarshalResource(resource types.Resource) (types.MarshaledResource, error) { | |
|
|
||
| // GetResourceReferences returns the names for dependent resources (EDS cluster | ||
| // names for CDS, RDS routes names for LDS). | ||
| func GetResourceReferences(resources map[string]types.Resource) map[string]bool { | ||
| func GetResourceReferences(resources map[string]types.ResourceWithTtl) map[string]bool { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there something we can do here so we don't need to change the param type to a concrete implementation? Maybe by adding a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yeah I like that, should simplify things quite a bit
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I spent some time trying to get this to work, the problem is that by adding a function here regular proto messages no longer satisfy the interface, which means that we end up having to wrap the resources in a struct regardless. I tried composing a lmk if you have any ideas for how to get this working, otherwise the current approach seems like the simplest one (if verbose)
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Could we change Resource to be a lightweight struct that embeds a proto? Might be a cleaner solution.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I gave this a go, the problem here is that it means that a regular proto no longer passes as a
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you could do
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Omitting the field name works when there's only one field e.g. I would expose the a function to get the TTL but that means having to embed the TTL within the proto, which then forces the user to use the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'll note that this adds a bunch of complexity around anything that cares about the actual resource, like consistency checks, as we need to handle potentially unwrapping the wrapper type to see the real resource.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok after much more playing around it seems like embedding |
||
| out := make(map[string]bool) | ||
| for _, res := range resources { | ||
| if res == nil { | ||
| if res.Resource == nil { | ||
| continue | ||
| } | ||
| switch v := res.(type) { | ||
| switch v := res.Resource.(type) { | ||
| case *endpoint.ClusterLoadAssignment: | ||
| // no dependencies | ||
| case *cluster.Cluster: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -60,6 +60,10 @@ type SnapshotCache interface { | |
| GetStatusKeys() []string | ||
| } | ||
|
|
||
| type heartbeatHandle struct { | ||
| cancel func() | ||
| } | ||
|
|
||
| type snapshotCache struct { | ||
| // watchCount is an atomic counter incremented for each watch. This needs to | ||
| // be the first field in the struct to guarantee that it is 64-bit aligned, | ||
|
|
@@ -81,6 +85,9 @@ type snapshotCache struct { | |
| // hash is the hashing function for Envoy nodes | ||
| hash NodeHash | ||
|
|
||
| // heartbeatRoutines keeps track of the active heartbeat goroutines | ||
| heartbeatRoutines map[string]heartbeatHandle | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we need so many go routines? Could we just have a single ticker that traverses the entire cache and re-pushes resources that aged?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It just felt the easiest at the time, I'll give the single goroutine a go
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this field needed now? I don't see it being updated. |
||
|
|
||
| mu sync.RWMutex | ||
| } | ||
|
|
||
|
|
@@ -96,12 +103,89 @@ type snapshotCache struct { | |
| // | ||
| // Logger is optional. | ||
| func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache { | ||
| return &snapshotCache{ | ||
| log: logger, | ||
| ads: ads, | ||
| snapshots: make(map[string]Snapshot), | ||
| status: make(map[string]*statusInfo), | ||
| hash: hash, | ||
| return newSnapshotCache(ads, hash, logger) | ||
| } | ||
|
|
||
| func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache { | ||
| cache := &snapshotCache{ | ||
| log: logger, | ||
| ads: ads, | ||
| snapshots: make(map[string]Snapshot), | ||
| status: make(map[string]*statusInfo), | ||
| hash: hash, | ||
| heartbeatRoutines: make(map[string]heartbeatHandle), | ||
| } | ||
|
|
||
| return cache | ||
| } | ||
|
|
||
| // NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat | ||
| // responses for resources with a TTL. | ||
| // | ||
| // ADS flag forces a delay in responding to streaming requests until all | ||
| // resources are explicitly named in the request. This avoids the problem of a | ||
| // partial request over a single stream for a subset of resources which would | ||
| // require generating a fresh version for acknowledgement. ADS flag requires | ||
| // snapshot consistency. For non-ADS case (and fetch), multiple partial | ||
| // requests are sent across multiple streams and re-using the snapshot version | ||
| // is OK. | ||
| // | ||
| // Logger is optional. | ||
| // | ||
| // The context provides a way to cancel the heartbeating routine, while the heartbeatInterval | ||
| // parameter controls how often heartbeating occurs. | ||
| func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHash, logger log.Logger, heartbeatInterval time.Duration) SnapshotCache { | ||
| cache := newSnapshotCache(ads, hash, logger) | ||
| go func() { | ||
| t := time.NewTicker(heartbeatInterval) | ||
|
|
||
| for { | ||
| select { | ||
| case <-t.C: | ||
| cache.mu.Lock() | ||
| for node := range cache.status { | ||
| // TODO(snowp): Omit heartbeats if a real response has been sent recently. | ||
| cache.sendHeartbeats(ctx, node) | ||
| } | ||
| cache.mu.Unlock() | ||
| case <-ctx.Done(): | ||
| return | ||
| } | ||
| } | ||
| }() | ||
| return cache | ||
| } | ||
|
|
||
| func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { | ||
| snapshot := cache.snapshots[node] | ||
| if info, ok := cache.status[node]; ok { | ||
| info.mu.Lock() | ||
| for id, watch := range info.watches { | ||
|
jyotimahapatra marked this conversation as resolved.
|
||
| // Respond with the current version regardless of whether the version has changed. | ||
| version := snapshot.GetVersion(watch.Request.TypeUrl) | ||
| resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl) | ||
|
|
||
| // TODO(snowp): Construct this once per type instead of once per watch. | ||
| resourcesWithTtl := map[string]types.ResourceWithTtl{} | ||
| for k, v := range resources { | ||
| if v.Ttl != nil { | ||
| resourcesWithTtl[k] = v | ||
| } | ||
| } | ||
|
|
||
| if len(resourcesWithTtl) == 0 { | ||
| continue | ||
| } | ||
| if cache.log != nil { | ||
| cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version) | ||
| } | ||
|
|
||
| cache.respond(watch.Request, watch.Response, resourcesWithTtl, version, true) | ||
|
|
||
| // The watch must be deleted and we must rely on the client to ack this response to create a new watch. | ||
| delete(info.watches, id) | ||
| } | ||
| info.mu.Unlock() | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -113,6 +197,12 @@ func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error { | |
| // update the existing entry | ||
| cache.snapshots[node] = snapshot | ||
|
|
||
| handle, ok := cache.heartbeatRoutines[node] | ||
| // No heartbeat configured and no existing heartbeat, nothing to do. | ||
| if ok { | ||
| handle.cancel() | ||
| } | ||
|
|
||
| // trigger existing watches for which version changed | ||
| if info, ok := cache.status[node]; ok { | ||
| info.mu.Lock() | ||
|
|
@@ -122,7 +212,8 @@ func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error { | |
| if cache.log != nil { | ||
| cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version) | ||
| } | ||
| cache.respond(watch.Request, watch.Response, snapshot.GetResources(watch.Request.TypeUrl), version) | ||
| resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl) | ||
| cache.respond(watch.Request, watch.Response, resources, version, false) | ||
|
|
||
| // discard the watch | ||
| delete(info.watches, id) | ||
|
|
@@ -165,7 +256,7 @@ func nameSet(names []string) map[string]bool { | |
| } | ||
|
|
||
| // superset checks that all resources are listed in the names set. | ||
| func superset(names map[string]bool, resources map[string]types.Resource) error { | ||
| func superset(names map[string]bool, resources map[string]types.ResourceWithTtl) error { | ||
| for resourceName := range resources { | ||
| if _, exists := names[resourceName]; !exists { | ||
| return fmt.Errorf("%q not listed", resourceName) | ||
|
|
@@ -212,7 +303,8 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func() | |
| } | ||
|
|
||
| // otherwise, the watch may be responded immediately | ||
| cache.respond(request, value, snapshot.GetResources(request.TypeUrl), version) | ||
| resources := snapshot.GetResourcesAndTtl(request.TypeUrl) | ||
| cache.respond(request, value, resources, version, false) | ||
|
|
||
| return value, nil | ||
| } | ||
|
|
@@ -237,7 +329,7 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { | |
|
|
||
| // Respond to a watch with the snapshot value. The value channel should have capacity not to block. | ||
| // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 | ||
| func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.Resource, version string) { | ||
| func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) { | ||
| // for ADS, the request names must match the snapshot names | ||
| // if they do not, then the watch is never responded, and it is expected that envoy makes another request | ||
| if len(request.ResourceNames) != 0 && cache.ads { | ||
|
|
@@ -253,11 +345,11 @@ func (cache *snapshotCache) respond(request *Request, value chan Response, resou | |
| request.TypeUrl, request.ResourceNames, request.VersionInfo, version) | ||
| } | ||
|
|
||
| value <- createResponse(request, resources, version) | ||
| value <- createResponse(request, resources, version, heartbeat) | ||
| } | ||
|
|
||
| func createResponse(request *Request, resources map[string]types.Resource, version string) Response { | ||
| filtered := make([]types.Resource, 0, len(resources)) | ||
| func createResponse(request *Request, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) Response { | ||
| filtered := make([]types.ResourceWithTtl, 0, len(resources)) | ||
|
|
||
| // Reply only with the requested resources. Envoy may ask each resource | ||
| // individually in a separate stream. It is ok to reply with the same version | ||
|
|
@@ -279,6 +371,7 @@ func createResponse(request *Request, resources map[string]types.Resource, versi | |
| Request: request, | ||
| Version: version, | ||
| Resources: filtered, | ||
| Heartbeat: heartbeat, | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -301,8 +394,8 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon | |
| return nil, &types.SkipFetchError{} | ||
| } | ||
|
|
||
| resources := snapshot.GetResources(request.TypeUrl) | ||
| out := createResponse(request, resources, version) | ||
| resources := snapshot.GetResourcesAndTtl(request.TypeUrl) | ||
| out := createResponse(request, resources, version, false) | ||
| return out, nil | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.