Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## Release v0.9.8

### Added
- Support for setting TTL on resources and configuring resource heartbeating

### Changed

- Envoy APIs are at 1d44c27ff7d4ebdfbfd9a6acbcecf9631b107e30
Expand Down
9 changes: 9 additions & 0 deletions pkg/cache/types/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package types

import (
"time"

"github.com/golang/protobuf/proto"
)

Expand All @@ -9,6 +11,13 @@ type Resource interface {
proto.Message
}

// ResourceWithTtl is a Resource with an optional TTL.
type ResourceWithTtl struct {
Resource Resource

Ttl *time.Duration
}

// MarshaledResource is an alias for the serialized binary array.
type MarshaledResource = []byte

Expand Down
16 changes: 13 additions & 3 deletions pkg/cache/v2/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
ttl "github.com/envoyproxy/go-control-plane/pkg/ttl/v2"
"github.com/golang/protobuf/ptypes/any"
)

Expand Down Expand Up @@ -82,7 +83,12 @@ type RawResponse struct {
Version string

// Resources to be included in the response.
Resources []types.Resource
Resources []types.ResourceWithTtl

// Whether this is a heartbeat response. For xDS versions that support TTL, this
// will be converted into a response that doesn't contain the actual resource protobuf.
// This allows for more lightweight updates that server only to update the TTL timer.
Heartbeat bool

// marshaledResponse holds an atomic reference to the serialized discovery response.
marshaledResponse atomic.Value
Expand Down Expand Up @@ -113,12 +119,16 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro
marshaledResources := make([]*any.Any, len(r.Resources))

for i, resource := range r.Resources {
marshaledResource, err := MarshalResource(resource)
maybeTtldResource, resourceType, err := ttl.MaybeCreateTtlResourceIfSupported(resource, GetResourceName(resource.Resource), r.Request.TypeUrl, r.Heartbeat)
if err != nil {
return nil, err
}
marshaledResource, err := MarshalResource(maybeTtldResource)
if err != nil {
return nil, err
}
marshaledResources[i] = &any.Any{
TypeUrl: r.Request.TypeUrl,
TypeUrl: resourceType,
Value: marshaledResource,
}
}
Expand Down
28 changes: 27 additions & 1 deletion pkg/cache/v2/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v2"
"github.com/envoyproxy/go-control-plane/pkg/resource/v2"
ttl_helper "github.com/envoyproxy/go-control-plane/pkg/ttl/v2"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/stretchr/testify/assert"
Expand All @@ -18,7 +19,7 @@ const (
)

func TestResponseGetDiscoveryResponse(t *testing.T) {
routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}}
routes := []types.ResourceWithTtl{{Resource: &route.RouteConfiguration{Name: resourceName}}}
resp := cache.RawResponse{
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Expand Down Expand Up @@ -65,3 +66,28 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) {
assert.Equal(t, r.Name, resourceName)
assert.Equal(t, discoveryResponse, dr)
}

func TestHeartbeatResponseGetDiscoveryResponse(t *testing.T) {
routes := []types.ResourceWithTtl{{Resource: &route.RouteConfiguration{Name: resourceName}}}
resp := cache.RawResponse{
Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType},
Version: "v",
Resources: routes,
Heartbeat: true,
}

discoveryResponse, err := resp.GetDiscoveryResponse()
assert.Nil(t, err)
assert.Equal(t, discoveryResponse.VersionInfo, resp.Version)
assert.Equal(t, len(discoveryResponse.Resources), 1)
assert.True(t, ttl_helper.IsTTLResource(discoveryResponse.Resources[0]))

cachedResponse, err := resp.GetDiscoveryResponse()
assert.Nil(t, err)
assert.Same(t, discoveryResponse, cachedResponse)

r := &route.RouteConfiguration{}
err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r)
assert.Nil(t, err)
assert.Equal(t, r.Name, resourceName)
}
10 changes: 5 additions & 5 deletions pkg/cache/v2/linear.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,19 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache {
}

func (cache *LinearCache) respond(value chan Response, staleResources []string) {
var resources []types.Resource
var resources []types.ResourceWithTtl
// TODO: optimize the resources slice creations across different clients
if len(staleResources) == 0 {
resources = make([]types.Resource, 0, len(cache.resources))
resources = make([]types.ResourceWithTtl, 0, len(cache.resources))
for _, resource := range cache.resources {
resources = append(resources, resource)
resources = append(resources, types.ResourceWithTtl{Resource: resource})
}
} else {
resources = make([]types.Resource, 0, len(staleResources))
resources = make([]types.ResourceWithTtl, 0, len(staleResources))
for _, name := range staleResources {
resource := cache.resources[name]
if resource != nil {
resources = append(resources, resource)
resources = append(resources, types.ResourceWithTtl{Resource: resource})
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/cache/v2/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,13 +86,13 @@ func MarshalResource(resource types.Resource) (types.MarshaledResource, error) {

// GetResourceReferences returns the names for dependent resources (EDS cluster
// names for CDS, RDS routes names for LDS).
func GetResourceReferences(resources map[string]types.Resource) map[string]bool {
func GetResourceReferences(resources map[string]types.ResourceWithTtl) map[string]bool {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there something we can do here so we don't need to change the param type to a concrete implementation? Maybe by adding a getTTL method to the Resource interface?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah I like that, should simplify things quite a bit

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time trying to get this to work, the problem is that by adding a function here regular proto messages no longer satisfy the interface, which means that we end up having to wrap the resources in a struct regardless. I tried composing a proto.Message into ResourceWithTtl to make make them all just satisfy types.Resource, but it seems like the protobuf library does all kind of type conversions so just satisfying proto.Message isn't actually sufficient.

lmk if you have any ideas for how to get this working, otherwise the current approach seems like the simplest one (if verbose)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we change Resource to be a lightweight struct that embeds a proto? Might be a cleaner solution.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I gave this a go, the problem here is that it means that a regular proto no longer passes as a Resource (at least from what I could tell), which means that all callers that currently pass []Resource{proto1, proto2} would have to update it to []Resource{Resource{Message: proto1}, Resource{Message: proto2}. Not sure if it would be worth the churn to change? wdyt

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could do []Resource{{proto1}, {proto2}} which I can live with. What do others think? I think it makes sense to align Resource with xDS Resource proto which has TTL field. But I'm fine either way.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Omitting the field name works when there's only one field e.g. type Resource struct { proto.Message }. Once we introduce a second one for the TTL, they now need to be specified.

I would expose the a function to get the TTL but that means having to embed the TTL within the proto, which then forces the user to use the Resource proto wrapper (not to be confused with the Go type) themselves to specify the TTL. This would be an option: we could provide a helper function to make it easier to create the proto wrapper and rely on it for holding the TTL configuration. LMK what you all think (cc @jyotimahapatra @jessicayuen)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll note that this adds a bunch of complexity around anything that cares about the actual resource, like consistency checks, as we need to handle potentially unwrapping the wrapper type to see the real resource.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok after much more playing around it seems like embedding proto.Message breaks marshalling, since the proto library makes use of some cute interface casts to do marshalling, which ends up failing when embedding the interface. As such I'd propose leaving things as is, unless someone has any ideas for how to work around this

out := make(map[string]bool)
for _, res := range resources {
if res == nil {
if res.Resource == nil {
continue
}
switch v := res.(type) {
switch v := res.Resource.(type) {
case *endpoint.ClusterLoadAssignment:
// no dependencies
case *cluster.Cluster:
Expand Down
2 changes: 1 addition & 1 deletion pkg/cache/v2/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func TestGetResourceReferences(t *testing.T) {
},
}
for _, cs := range cases {
names := cache.GetResourceReferences(cache.IndexResourcesByName([]types.Resource{cs.in}))
names := cache.GetResourceReferences(cache.IndexResourcesByName([]types.ResourceWithTtl{{Resource: cs.in}}))
if !reflect.DeepEqual(names, cs.out) {
t.Errorf("GetResourceReferences(%v) => got %v, want %v", cs.in, names, cs.out)
}
Expand Down
103 changes: 93 additions & 10 deletions pkg/cache/v2/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ type SnapshotCache interface {
GetStatusKeys() []string
}

type heartbeatHandle struct {
cancel func()
}

type snapshotCache struct {
// watchCount is an atomic counter incremented for each watch. This needs to
// be the first field in the struct to guarantee that it is 64-bit aligned,
Expand Down Expand Up @@ -96,13 +100,89 @@ type snapshotCache struct {
//
// Logger is optional.
func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache {
return &snapshotCache{
return newSnapshotCache(ads, hash, logger)
}

func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache {
cache := &snapshotCache{
log: logger,
ads: ads,
snapshots: make(map[string]Snapshot),
status: make(map[string]*statusInfo),
hash: hash,
}

return cache
}

// NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat
// responses for resources with a TTL.
//
// ADS flag forces a delay in responding to streaming requests until all
// resources are explicitly named in the request. This avoids the problem of a
// partial request over a single stream for a subset of resources which would
// require generating a fresh version for acknowledgement. ADS flag requires
// snapshot consistency. For non-ADS case (and fetch), multiple partial
// requests are sent across multiple streams and re-using the snapshot version
// is OK.
//
// Logger is optional.
//
// The context provides a way to cancel the heartbeating routine, while the heartbeatInterval
// parameter controls how often heartbeating occurs.
func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHash, logger log.Logger, heartbeatInterval time.Duration) SnapshotCache {
cache := newSnapshotCache(ads, hash, logger)
go func() {
t := time.NewTicker(heartbeatInterval)

for {
select {
case <-t.C:
cache.mu.Lock()
for node := range cache.status {
// TODO(snowp): Omit heartbeats if a real response has been sent recently.
cache.sendHeartbeats(ctx, node)
}
cache.mu.Unlock()
case <-ctx.Done():
return
}
}
}()
return cache
}

func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) {
snapshot := cache.snapshots[node]
if info, ok := cache.status[node]; ok {
info.mu.Lock()
for id, watch := range info.watches {
// Respond with the current version regardless of whether the version has changed.
version := snapshot.GetVersion(watch.Request.TypeUrl)
resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)

// TODO(snowp): Construct this once per type instead of once per watch.
resourcesWithTtl := map[string]types.ResourceWithTtl{}
for k, v := range resources {
if v.Ttl != nil {
resourcesWithTtl[k] = v
}
}

if len(resourcesWithTtl) == 0 {
continue
}
if cache.log != nil {
cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version)
}

cache.respond(watch.Request, watch.Response, resourcesWithTtl, version, true)

// The watch must be deleted and we must rely on the client to ack this response to create a new watch.
delete(info.watches, id)
}
info.mu.Unlock()
}
}

// SetSnapshotCache updates a snapshot for a node.
Expand All @@ -122,7 +202,8 @@ func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error {
if cache.log != nil {
cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version)
}
cache.respond(watch.Request, watch.Response, snapshot.GetResources(watch.Request.TypeUrl), version)
resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl)
cache.respond(watch.Request, watch.Response, resources, version, false)

// discard the watch
delete(info.watches, id)
Expand Down Expand Up @@ -165,7 +246,7 @@ func nameSet(names []string) map[string]bool {
}

// superset checks that all resources are listed in the names set.
func superset(names map[string]bool, resources map[string]types.Resource) error {
func superset(names map[string]bool, resources map[string]types.ResourceWithTtl) error {
for resourceName := range resources {
if _, exists := names[resourceName]; !exists {
return fmt.Errorf("%q not listed", resourceName)
Expand Down Expand Up @@ -212,7 +293,8 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func()
}

// otherwise, the watch may be responded immediately
cache.respond(request, value, snapshot.GetResources(request.TypeUrl), version)
resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
cache.respond(request, value, resources, version, false)

return value, nil
}
Expand All @@ -237,7 +319,7 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() {

// Respond to a watch with the snapshot value. The value channel should have capacity not to block.
// TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46
func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.Resource, version string) {
func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) {
// for ADS, the request names must match the snapshot names
// if they do not, then the watch is never responded, and it is expected that envoy makes another request
if len(request.ResourceNames) != 0 && cache.ads {
Expand All @@ -253,11 +335,11 @@ func (cache *snapshotCache) respond(request *Request, value chan Response, resou
request.TypeUrl, request.ResourceNames, request.VersionInfo, version)
}

value <- createResponse(request, resources, version)
value <- createResponse(request, resources, version, heartbeat)
}

func createResponse(request *Request, resources map[string]types.Resource, version string) Response {
filtered := make([]types.Resource, 0, len(resources))
func createResponse(request *Request, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) Response {
filtered := make([]types.ResourceWithTtl, 0, len(resources))

// Reply only with the requested resources. Envoy may ask each resource
// individually in a separate stream. It is ok to reply with the same version
Expand All @@ -279,6 +361,7 @@ func createResponse(request *Request, resources map[string]types.Resource, versi
Request: request,
Version: version,
Resources: filtered,
Heartbeat: heartbeat,
}
}

Expand All @@ -301,8 +384,8 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon
return nil, &types.SkipFetchError{}
}

resources := snapshot.GetResources(request.TypeUrl)
out := createResponse(request, resources, version)
resources := snapshot.GetResourcesAndTtl(request.TypeUrl)
out := createResponse(request, resources, version, false)
return out, nil
}

Expand Down
Loading