diff --git a/CHANGELOG.md b/CHANGELOG.md index a6d8a12222..c9af52d192 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## Release v0.9.8 +### Added +- Support for setting TTL on resources and configuring resource heartbeating + ### Changed - Envoy APIs are at 1d44c27ff7d4ebdfbfd9a6acbcecf9631b107e30 diff --git a/pkg/cache/types/types.go b/pkg/cache/types/types.go index 982d0934e2..e0b87452c0 100644 --- a/pkg/cache/types/types.go +++ b/pkg/cache/types/types.go @@ -1,6 +1,8 @@ package types import ( + "time" + "github.com/golang/protobuf/proto" ) @@ -9,6 +11,13 @@ type Resource interface { proto.Message } +// ResourceWithTtl is a Resource with an optional TTL. +type ResourceWithTtl struct { + Resource Resource + + Ttl *time.Duration +} + // MarshaledResource is an alias for the serialized binary array. type MarshaledResource = []byte diff --git a/pkg/cache/v2/cache.go b/pkg/cache/v2/cache.go index 0bf08f8596..0824428cf7 100644 --- a/pkg/cache/v2/cache.go +++ b/pkg/cache/v2/cache.go @@ -22,6 +22,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2" "github.com/envoyproxy/go-control-plane/pkg/cache/types" + ttl "github.com/envoyproxy/go-control-plane/pkg/ttl/v2" "github.com/golang/protobuf/ptypes/any" ) @@ -82,7 +83,12 @@ type RawResponse struct { Version string // Resources to be included in the response. - Resources []types.Resource + Resources []types.ResourceWithTtl + + // Whether this is a heartbeat response. For xDS versions that support TTL, this + // will be converted into a response that doesn't contain the actual resource protobuf. + // This allows for more lightweight updates that server only to update the TTL timer. + Heartbeat bool // marshaledResponse holds an atomic reference to the serialized discovery response. marshaledResponse atomic.Value @@ -113,12 +119,16 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro marshaledResources := make([]*any.Any, len(r.Resources)) for i, resource := range r.Resources { - marshaledResource, err := MarshalResource(resource) + maybeTtldResource, resourceType, err := ttl.MaybeCreateTtlResourceIfSupported(resource, GetResourceName(resource.Resource), r.Request.TypeUrl, r.Heartbeat) + if err != nil { + return nil, err + } + marshaledResource, err := MarshalResource(maybeTtldResource) if err != nil { return nil, err } marshaledResources[i] = &any.Any{ - TypeUrl: r.Request.TypeUrl, + TypeUrl: resourceType, Value: marshaledResource, } } diff --git a/pkg/cache/v2/cache_test.go b/pkg/cache/v2/cache_test.go index 75194f6235..099fa3a7ea 100644 --- a/pkg/cache/v2/cache_test.go +++ b/pkg/cache/v2/cache_test.go @@ -8,6 +8,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v2" "github.com/envoyproxy/go-control-plane/pkg/resource/v2" + ttl_helper "github.com/envoyproxy/go-control-plane/pkg/ttl/v2" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" "github.com/stretchr/testify/assert" @@ -18,7 +19,7 @@ const ( ) func TestResponseGetDiscoveryResponse(t *testing.T) { - routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}} + routes := []types.ResourceWithTtl{{Resource: &route.RouteConfiguration{Name: resourceName}}} resp := cache.RawResponse{ Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, Version: "v", @@ -65,3 +66,28 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) { assert.Equal(t, r.Name, resourceName) assert.Equal(t, discoveryResponse, dr) } + +func TestHeartbeatResponseGetDiscoveryResponse(t *testing.T) { + routes := []types.ResourceWithTtl{{Resource: &route.RouteConfiguration{Name: resourceName}}} + resp := cache.RawResponse{ + Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, + Version: "v", + Resources: routes, + Heartbeat: true, + } + + discoveryResponse, err := resp.GetDiscoveryResponse() + assert.Nil(t, err) + assert.Equal(t, discoveryResponse.VersionInfo, resp.Version) + assert.Equal(t, len(discoveryResponse.Resources), 1) + assert.True(t, ttl_helper.IsTTLResource(discoveryResponse.Resources[0])) + + cachedResponse, err := resp.GetDiscoveryResponse() + assert.Nil(t, err) + assert.Same(t, discoveryResponse, cachedResponse) + + r := &route.RouteConfiguration{} + err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r) + assert.Nil(t, err) + assert.Equal(t, r.Name, resourceName) +} diff --git a/pkg/cache/v2/linear.go b/pkg/cache/v2/linear.go index 27172fa586..deeb02f843 100644 --- a/pkg/cache/v2/linear.go +++ b/pkg/cache/v2/linear.go @@ -91,19 +91,19 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { } func (cache *LinearCache) respond(value chan Response, staleResources []string) { - var resources []types.Resource + var resources []types.ResourceWithTtl // TODO: optimize the resources slice creations across different clients if len(staleResources) == 0 { - resources = make([]types.Resource, 0, len(cache.resources)) + resources = make([]types.ResourceWithTtl, 0, len(cache.resources)) for _, resource := range cache.resources { - resources = append(resources, resource) + resources = append(resources, types.ResourceWithTtl{Resource: resource}) } } else { - resources = make([]types.Resource, 0, len(staleResources)) + resources = make([]types.ResourceWithTtl, 0, len(staleResources)) for _, name := range staleResources { resource := cache.resources[name] if resource != nil { - resources = append(resources, resource) + resources = append(resources, types.ResourceWithTtl{Resource: resource}) } } } diff --git a/pkg/cache/v2/resource.go b/pkg/cache/v2/resource.go index 0407de3698..cd96ed51d2 100644 --- a/pkg/cache/v2/resource.go +++ b/pkg/cache/v2/resource.go @@ -86,13 +86,13 @@ func MarshalResource(resource types.Resource) (types.MarshaledResource, error) { // GetResourceReferences returns the names for dependent resources (EDS cluster // names for CDS, RDS routes names for LDS). -func GetResourceReferences(resources map[string]types.Resource) map[string]bool { +func GetResourceReferences(resources map[string]types.ResourceWithTtl) map[string]bool { out := make(map[string]bool) for _, res := range resources { - if res == nil { + if res.Resource == nil { continue } - switch v := res.(type) { + switch v := res.Resource.(type) { case *endpoint.ClusterLoadAssignment: // no dependencies case *cluster.Cluster: diff --git a/pkg/cache/v2/resource_test.go b/pkg/cache/v2/resource_test.go index bb0c5e695a..fb67fe410b 100644 --- a/pkg/cache/v2/resource_test.go +++ b/pkg/cache/v2/resource_test.go @@ -138,7 +138,7 @@ func TestGetResourceReferences(t *testing.T) { }, } for _, cs := range cases { - names := cache.GetResourceReferences(cache.IndexResourcesByName([]types.Resource{cs.in})) + names := cache.GetResourceReferences(cache.IndexResourcesByName([]types.ResourceWithTtl{{Resource: cs.in}})) if !reflect.DeepEqual(names, cs.out) { t.Errorf("GetResourceReferences(%v) => got %v, want %v", cs.in, names, cs.out) } diff --git a/pkg/cache/v2/simple.go b/pkg/cache/v2/simple.go index cf10b70fae..57417f87cc 100644 --- a/pkg/cache/v2/simple.go +++ b/pkg/cache/v2/simple.go @@ -60,6 +60,10 @@ type SnapshotCache interface { GetStatusKeys() []string } +type heartbeatHandle struct { + cancel func() +} + type snapshotCache struct { // watchCount is an atomic counter incremented for each watch. This needs to // be the first field in the struct to guarantee that it is 64-bit aligned, @@ -96,13 +100,89 @@ type snapshotCache struct { // // Logger is optional. func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache { - return &snapshotCache{ + return newSnapshotCache(ads, hash, logger) +} + +func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache { + cache := &snapshotCache{ log: logger, ads: ads, snapshots: make(map[string]Snapshot), status: make(map[string]*statusInfo), hash: hash, } + + return cache +} + +// NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat +// responses for resources with a TTL. +// +// ADS flag forces a delay in responding to streaming requests until all +// resources are explicitly named in the request. This avoids the problem of a +// partial request over a single stream for a subset of resources which would +// require generating a fresh version for acknowledgement. ADS flag requires +// snapshot consistency. For non-ADS case (and fetch), multiple partial +// requests are sent across multiple streams and re-using the snapshot version +// is OK. +// +// Logger is optional. +// +// The context provides a way to cancel the heartbeating routine, while the heartbeatInterval +// parameter controls how often heartbeating occurs. +func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHash, logger log.Logger, heartbeatInterval time.Duration) SnapshotCache { + cache := newSnapshotCache(ads, hash, logger) + go func() { + t := time.NewTicker(heartbeatInterval) + + for { + select { + case <-t.C: + cache.mu.Lock() + for node := range cache.status { + // TODO(snowp): Omit heartbeats if a real response has been sent recently. + cache.sendHeartbeats(ctx, node) + } + cache.mu.Unlock() + case <-ctx.Done(): + return + } + } + }() + return cache +} + +func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { + snapshot := cache.snapshots[node] + if info, ok := cache.status[node]; ok { + info.mu.Lock() + for id, watch := range info.watches { + // Respond with the current version regardless of whether the version has changed. + version := snapshot.GetVersion(watch.Request.TypeUrl) + resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl) + + // TODO(snowp): Construct this once per type instead of once per watch. + resourcesWithTtl := map[string]types.ResourceWithTtl{} + for k, v := range resources { + if v.Ttl != nil { + resourcesWithTtl[k] = v + } + } + + if len(resourcesWithTtl) == 0 { + continue + } + if cache.log != nil { + cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version) + } + + cache.respond(watch.Request, watch.Response, resourcesWithTtl, version, true) + + // The watch must be deleted and we must rely on the client to ack this response to create a new watch. + delete(info.watches, id) + } + info.mu.Unlock() + } } // SetSnapshotCache updates a snapshot for a node. @@ -122,7 +202,8 @@ func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error { if cache.log != nil { cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version) } - cache.respond(watch.Request, watch.Response, snapshot.GetResources(watch.Request.TypeUrl), version) + resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl) + cache.respond(watch.Request, watch.Response, resources, version, false) // discard the watch delete(info.watches, id) @@ -165,7 +246,7 @@ func nameSet(names []string) map[string]bool { } // superset checks that all resources are listed in the names set. -func superset(names map[string]bool, resources map[string]types.Resource) error { +func superset(names map[string]bool, resources map[string]types.ResourceWithTtl) error { for resourceName := range resources { if _, exists := names[resourceName]; !exists { return fmt.Errorf("%q not listed", resourceName) @@ -212,7 +293,8 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func() } // otherwise, the watch may be responded immediately - cache.respond(request, value, snapshot.GetResources(request.TypeUrl), version) + resources := snapshot.GetResourcesAndTtl(request.TypeUrl) + cache.respond(request, value, resources, version, false) return value, nil } @@ -237,7 +319,7 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { // Respond to a watch with the snapshot value. The value channel should have capacity not to block. // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 -func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.Resource, version string) { +func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) { // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request if len(request.ResourceNames) != 0 && cache.ads { @@ -253,11 +335,11 @@ func (cache *snapshotCache) respond(request *Request, value chan Response, resou request.TypeUrl, request.ResourceNames, request.VersionInfo, version) } - value <- createResponse(request, resources, version) + value <- createResponse(request, resources, version, heartbeat) } -func createResponse(request *Request, resources map[string]types.Resource, version string) Response { - filtered := make([]types.Resource, 0, len(resources)) +func createResponse(request *Request, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) Response { + filtered := make([]types.ResourceWithTtl, 0, len(resources)) // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. It is ok to reply with the same version @@ -279,6 +361,7 @@ func createResponse(request *Request, resources map[string]types.Resource, versi Request: request, Version: version, Resources: filtered, + Heartbeat: heartbeat, } } @@ -301,8 +384,8 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon return nil, &types.SkipFetchError{} } - resources := snapshot.GetResources(request.TypeUrl) - out := createResponse(request, resources, version) + resources := snapshot.GetResourcesAndTtl(request.TypeUrl) + out := createResponse(request, resources, version, false) return out, nil } diff --git a/pkg/cache/v2/simple_test.go b/pkg/cache/v2/simple_test.go index 825cfb6df7..5211b66833 100644 --- a/pkg/cache/v2/simple_test.go +++ b/pkg/cache/v2/simple_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "reflect" + "sync" "testing" "time" @@ -54,6 +55,17 @@ var ( []types.Resource{testRuntime}, []types.Resource{testSecret[0]}) + ttl = 2 * time.Second + heartbeat = time.Second + + snapshotWithTtl = cache.NewSnapshotWithTtls(version, + []types.ResourceWithTtl{{Resource: testEndpoint, Ttl: &ttl}}, + []types.ResourceWithTtl{{Resource: testCluster}}, + []types.ResourceWithTtl{{Resource: testRoute}}, + []types.ResourceWithTtl{{Resource: testListener}}, + []types.ResourceWithTtl{{Resource: testRuntime}}, + []types.ResourceWithTtl{{Resource: testSecret[0]}}) + names = map[string][]string{ rsrc.EndpointType: {clusterName}, rsrc.ClusterType: nil, @@ -80,6 +92,94 @@ func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func TestSnapshotCacheWithTtl(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, logger{t: t}, time.Second) + + if _, err := c.GetSnapshot(key); err == nil { + t.Errorf("unexpected snapshot found for key %q", key) + } + + if err := c.SetSnapshot(key, snapshotWithTtl); err != nil { + t.Fatal(err) + } + + snap, err := c.GetSnapshot(key) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(snap, snapshotWithTtl) { + t.Errorf("expect snapshot: %v, got: %v", snapshotWithTtl, snap) + } + + wg := sync.WaitGroup{} + // All the resources should respond immediately when version is not up to date. + for _, typ := range testTypes { + wg.Add(1) + t.Run(typ, func(t *testing.T) { + defer wg.Done() + value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + select { + case out := <-value: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResourcesAndTtl(typ)) + } + case <-time.After(2 * time.Second): + t.Errorf("failed to receive snapshot response") + } + }) + } + wg.Wait() + + // Once everything is up to date, only the TTL'd resource should send out updates. + wg = sync.WaitGroup{} + updatesByType := map[string]int{} + for _, typ := range testTypes { + wg.Add(1) + go func(typ string) { + defer wg.Done() + + end := time.After(5 * time.Second) + for { + value, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}) + + select { + case out := <-value: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ)) + } + + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ)) + } + + updatesByType[typ]++ + case <-end: + cancel() + return + } + } + }(typ) + } + + wg.Wait() + + if len(updatesByType) != 1 { + t.Errorf("expected to only receive updates for TTL'd type, got %v", updatesByType) + } + // Avoid an exact match on number of triggers to avoid this being flaky. + if updatesByType[rsrc.EndpointType] < 2 { + t.Errorf("expected at least two TTL updates for endpoints, got %d", updatesByType[rsrc.EndpointType]) + } +} + func TestSnapshotCache(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) @@ -116,8 +216,8 @@ func TestSnapshotCache(t *testing.T) { if gotVersion, _ := out.GetVersion(); gotVersion != version { t.Errorf("got version %q, want %q", gotVersion, version) } - if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResources(typ)) { - t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResources(typ)) + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ)) } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") @@ -173,8 +273,8 @@ func TestSnapshotCacheWatch(t *testing.T) { if gotVersion, _ := out.GetVersion(); gotVersion != version { t.Errorf("got version %q, want %q", gotVersion, version) } - if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResources(typ)) { - t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResources(typ)) + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ)) } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") diff --git a/pkg/cache/v2/snapshot.go b/pkg/cache/v2/snapshot.go index 3b6eea88ba..4a15bf563a 100644 --- a/pkg/cache/v2/snapshot.go +++ b/pkg/cache/v2/snapshot.go @@ -17,6 +17,7 @@ package cache import ( "errors" "fmt" + "time" "github.com/envoyproxy/go-control-plane/pkg/cache/types" ) @@ -27,20 +28,29 @@ type Resources struct { Version string // Items in the group indexed by name. - Items map[string]types.Resource + Items map[string]types.ResourceWithTtl } // IndexResourcesByName creates a map from the resource name to the resource. -func IndexResourcesByName(items []types.Resource) map[string]types.Resource { - indexed := make(map[string]types.Resource, len(items)) +func IndexResourcesByName(items []types.ResourceWithTtl) map[string]types.ResourceWithTtl { + indexed := make(map[string]types.ResourceWithTtl) for _, item := range items { - indexed[GetResourceName(item)] = item + indexed[GetResourceName(item.Resource)] = item } return indexed } // NewResources creates a new resource group. func NewResources(version string, items []types.Resource) Resources { + itemsWithTtl := []types.ResourceWithTtl{} + for _, item := range items { + itemsWithTtl = append(itemsWithTtl, types.ResourceWithTtl{Resource: item}) + } + return NewResourcesWithTtl(version, itemsWithTtl) +} + +// NewResources creates a new resource group. +func NewResourcesWithTtl(version string, items []types.ResourceWithTtl) Resources { return Resources{ Version: version, Items: IndexResourcesByName(items), @@ -96,6 +106,28 @@ func NewSnapshotWithResources(version string, resources SnapshotResources) Snaps return out } +type ResourceWithTtl struct { + Resources []types.Resource + Ttl *time.Duration +} + +func NewSnapshotWithTtls(version string, + endpoints []types.ResourceWithTtl, + clusters []types.ResourceWithTtl, + routes []types.ResourceWithTtl, + listeners []types.ResourceWithTtl, + runtimes []types.ResourceWithTtl, + secrets []types.ResourceWithTtl) Snapshot { + out := Snapshot{} + out.Resources[types.Endpoint] = NewResourcesWithTtl(version, endpoints) + out.Resources[types.Cluster] = NewResourcesWithTtl(version, clusters) + out.Resources[types.Route] = NewResourcesWithTtl(version, routes) + out.Resources[types.Listener] = NewResourcesWithTtl(version, listeners) + out.Resources[types.Runtime] = NewResourcesWithTtl(version, runtimes) + out.Resources[types.Secret] = NewResourcesWithTtl(version, secrets) + return out +} + // Consistent check verifies that the dependent resources are exactly listed in the // snapshot: // - all EDS resources are listed by name in CDS resources @@ -123,8 +155,24 @@ func (s *Snapshot) Consistent() error { return superset(routes, s.Resources[types.Route].Items) } -// GetResources selects snapshot resources by type. +// GetResources selects snapshot resources by type, returning the map of resources. func (s *Snapshot) GetResources(typeURL string) map[string]types.Resource { + resources := s.GetResourcesAndTtl(typeURL) + if resources == nil { + return nil + } + + withoutTtl := make(map[string]types.Resource, len(resources)) + + for k, v := range resources { + withoutTtl[k] = v.Resource + } + + return withoutTtl +} + +// GetResourcesAndTtl selects snapshot resources by type, returning the map of resources and the associated TTL. +func (s *Snapshot) GetResourcesAndTtl(typeURL string) map[string]types.ResourceWithTtl { if s == nil { return nil } diff --git a/pkg/cache/v3/cache.go b/pkg/cache/v3/cache.go index 485f41212b..ec48d1ab49 100644 --- a/pkg/cache/v3/cache.go +++ b/pkg/cache/v3/cache.go @@ -23,6 +23,7 @@ import ( discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" "github.com/envoyproxy/go-control-plane/pkg/cache/types" + ttl "github.com/envoyproxy/go-control-plane/pkg/ttl/v3" "github.com/golang/protobuf/ptypes/any" ) @@ -83,7 +84,12 @@ type RawResponse struct { Version string // Resources to be included in the response. - Resources []types.Resource + Resources []types.ResourceWithTtl + + // Whether this is a heartbeat response. For xDS versions that support TTL, this + // will be converted into a response that doesn't contain the actual resource protobuf. + // This allows for more lightweight updates that server only to update the TTL timer. + Heartbeat bool // marshaledResponse holds an atomic reference to the serialized discovery response. marshaledResponse atomic.Value @@ -114,12 +120,16 @@ func (r *RawResponse) GetDiscoveryResponse() (*discovery.DiscoveryResponse, erro marshaledResources := make([]*any.Any, len(r.Resources)) for i, resource := range r.Resources { - marshaledResource, err := MarshalResource(resource) + maybeTtldResource, resourceType, err := ttl.MaybeCreateTtlResourceIfSupported(resource, GetResourceName(resource.Resource), r.Request.TypeUrl, r.Heartbeat) + if err != nil { + return nil, err + } + marshaledResource, err := MarshalResource(maybeTtldResource) if err != nil { return nil, err } marshaledResources[i] = &any.Any{ - TypeUrl: r.Request.TypeUrl, + TypeUrl: resourceType, Value: marshaledResource, } } diff --git a/pkg/cache/v3/cache_test.go b/pkg/cache/v3/cache_test.go index 217edaf0d6..b931646758 100644 --- a/pkg/cache/v3/cache_test.go +++ b/pkg/cache/v3/cache_test.go @@ -9,6 +9,7 @@ import ( "github.com/envoyproxy/go-control-plane/pkg/cache/types" "github.com/envoyproxy/go-control-plane/pkg/cache/v3" "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + ttl_helper "github.com/envoyproxy/go-control-plane/pkg/ttl/v2" "github.com/golang/protobuf/ptypes" "github.com/golang/protobuf/ptypes/any" "github.com/stretchr/testify/assert" @@ -19,7 +20,7 @@ const ( ) func TestResponseGetDiscoveryResponse(t *testing.T) { - routes := []types.Resource{&route.RouteConfiguration{Name: resourceName}} + routes := []types.ResourceWithTtl{{Resource: &route.RouteConfiguration{Name: resourceName}}} resp := cache.RawResponse{ Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, Version: "v", @@ -66,3 +67,28 @@ func TestPassthroughResponseGetDiscoveryResponse(t *testing.T) { assert.Equal(t, r.Name, resourceName) assert.Equal(t, discoveryResponse, dr) } + +func TestHeartbeatResponseGetDiscoveryResponse(t *testing.T) { + routes := []types.ResourceWithTtl{{Resource: &route.RouteConfiguration{Name: resourceName}}} + resp := cache.RawResponse{ + Request: &discovery.DiscoveryRequest{TypeUrl: resource.RouteType}, + Version: "v", + Resources: routes, + Heartbeat: true, + } + + discoveryResponse, err := resp.GetDiscoveryResponse() + assert.Nil(t, err) + assert.Equal(t, discoveryResponse.VersionInfo, resp.Version) + assert.Equal(t, len(discoveryResponse.Resources), 1) + assert.True(t, ttl_helper.IsTTLResource(discoveryResponse.Resources[0])) + + cachedResponse, err := resp.GetDiscoveryResponse() + assert.Nil(t, err) + assert.Same(t, discoveryResponse, cachedResponse) + + r := &route.RouteConfiguration{} + err = ptypes.UnmarshalAny(discoveryResponse.Resources[0], r) + assert.Nil(t, err) + assert.Equal(t, r.Name, resourceName) +} diff --git a/pkg/cache/v3/linear.go b/pkg/cache/v3/linear.go index b6384aa8b9..10c548f178 100644 --- a/pkg/cache/v3/linear.go +++ b/pkg/cache/v3/linear.go @@ -92,19 +92,19 @@ func NewLinearCache(typeURL string, opts ...LinearCacheOption) *LinearCache { } func (cache *LinearCache) respond(value chan Response, staleResources []string) { - var resources []types.Resource + var resources []types.ResourceWithTtl // TODO: optimize the resources slice creations across different clients if len(staleResources) == 0 { - resources = make([]types.Resource, 0, len(cache.resources)) + resources = make([]types.ResourceWithTtl, 0, len(cache.resources)) for _, resource := range cache.resources { - resources = append(resources, resource) + resources = append(resources, types.ResourceWithTtl{Resource: resource}) } } else { - resources = make([]types.Resource, 0, len(staleResources)) + resources = make([]types.ResourceWithTtl, 0, len(staleResources)) for _, name := range staleResources { resource := cache.resources[name] if resource != nil { - resources = append(resources, resource) + resources = append(resources, types.ResourceWithTtl{Resource: resource}) } } } diff --git a/pkg/cache/v3/resource.go b/pkg/cache/v3/resource.go index d6a2c977c3..014f8400ed 100644 --- a/pkg/cache/v3/resource.go +++ b/pkg/cache/v3/resource.go @@ -87,13 +87,13 @@ func MarshalResource(resource types.Resource) (types.MarshaledResource, error) { // GetResourceReferences returns the names for dependent resources (EDS cluster // names for CDS, RDS routes names for LDS). -func GetResourceReferences(resources map[string]types.Resource) map[string]bool { +func GetResourceReferences(resources map[string]types.ResourceWithTtl) map[string]bool { out := make(map[string]bool) for _, res := range resources { - if res == nil { + if res.Resource == nil { continue } - switch v := res.(type) { + switch v := res.Resource.(type) { case *endpoint.ClusterLoadAssignment: // no dependencies case *cluster.Cluster: diff --git a/pkg/cache/v3/resource_test.go b/pkg/cache/v3/resource_test.go index 201872d2f5..e040a7e15f 100644 --- a/pkg/cache/v3/resource_test.go +++ b/pkg/cache/v3/resource_test.go @@ -139,7 +139,7 @@ func TestGetResourceReferences(t *testing.T) { }, } for _, cs := range cases { - names := cache.GetResourceReferences(cache.IndexResourcesByName([]types.Resource{cs.in})) + names := cache.GetResourceReferences(cache.IndexResourcesByName([]types.ResourceWithTtl{{Resource: cs.in}})) if !reflect.DeepEqual(names, cs.out) { t.Errorf("GetResourceReferences(%v) => got %v, want %v", cs.in, names, cs.out) } diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index d3ae26d175..cecedf82d9 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -61,6 +61,10 @@ type SnapshotCache interface { GetStatusKeys() []string } +type heartbeatHandle struct { + cancel func() +} + type snapshotCache struct { // watchCount is an atomic counter incremented for each watch. This needs to // be the first field in the struct to guarantee that it is 64-bit aligned, @@ -97,13 +101,89 @@ type snapshotCache struct { // // Logger is optional. func NewSnapshotCache(ads bool, hash NodeHash, logger log.Logger) SnapshotCache { - return &snapshotCache{ + return newSnapshotCache(ads, hash, logger) +} + +func newSnapshotCache(ads bool, hash NodeHash, logger log.Logger) *snapshotCache { + cache := &snapshotCache{ log: logger, ads: ads, snapshots: make(map[string]Snapshot), status: make(map[string]*statusInfo), hash: hash, } + + return cache +} + +// NewSnapshotCacheWithHeartbeating initializes a simple cache that sends periodic heartbeat +// responses for resources with a TTL. +// +// ADS flag forces a delay in responding to streaming requests until all +// resources are explicitly named in the request. This avoids the problem of a +// partial request over a single stream for a subset of resources which would +// require generating a fresh version for acknowledgement. ADS flag requires +// snapshot consistency. For non-ADS case (and fetch), multiple partial +// requests are sent across multiple streams and re-using the snapshot version +// is OK. +// +// Logger is optional. +// +// The context provides a way to cancel the heartbeating routine, while the heartbeatInterval +// parameter controls how often heartbeating occurs. +func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHash, logger log.Logger, heartbeatInterval time.Duration) SnapshotCache { + cache := newSnapshotCache(ads, hash, logger) + go func() { + t := time.NewTicker(heartbeatInterval) + + for { + select { + case <-t.C: + cache.mu.Lock() + for node := range cache.status { + // TODO(snowp): Omit heartbeats if a real response has been sent recently. + cache.sendHeartbeats(ctx, node) + } + cache.mu.Unlock() + case <-ctx.Done(): + return + } + } + }() + return cache +} + +func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { + snapshot := cache.snapshots[node] + if info, ok := cache.status[node]; ok { + info.mu.Lock() + for id, watch := range info.watches { + // Respond with the current version regardless of whether the version has changed. + version := snapshot.GetVersion(watch.Request.TypeUrl) + resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl) + + // TODO(snowp): Construct this once per type instead of once per watch. + resourcesWithTtl := map[string]types.ResourceWithTtl{} + for k, v := range resources { + if v.Ttl != nil { + resourcesWithTtl[k] = v + } + } + + if len(resourcesWithTtl) == 0 { + continue + } + if cache.log != nil { + cache.log.Debugf("respond open watch %d%v with heartbeat for version %q", id, watch.Request.ResourceNames, version) + } + + cache.respond(watch.Request, watch.Response, resourcesWithTtl, version, true) + + // The watch must be deleted and we must rely on the client to ack this response to create a new watch. + delete(info.watches, id) + } + info.mu.Unlock() + } } // SetSnapshotCache updates a snapshot for a node. @@ -123,7 +203,8 @@ func (cache *snapshotCache) SetSnapshot(node string, snapshot Snapshot) error { if cache.log != nil { cache.log.Debugf("respond open watch %d%v with new version %q", id, watch.Request.ResourceNames, version) } - cache.respond(watch.Request, watch.Response, snapshot.GetResources(watch.Request.TypeUrl), version) + resources := snapshot.GetResourcesAndTtl(watch.Request.TypeUrl) + cache.respond(watch.Request, watch.Response, resources, version, false) // discard the watch delete(info.watches, id) @@ -166,7 +247,7 @@ func nameSet(names []string) map[string]bool { } // superset checks that all resources are listed in the names set. -func superset(names map[string]bool, resources map[string]types.Resource) error { +func superset(names map[string]bool, resources map[string]types.ResourceWithTtl) error { for resourceName := range resources { if _, exists := names[resourceName]; !exists { return fmt.Errorf("%q not listed", resourceName) @@ -213,7 +294,8 @@ func (cache *snapshotCache) CreateWatch(request *Request) (chan Response, func() } // otherwise, the watch may be responded immediately - cache.respond(request, value, snapshot.GetResources(request.TypeUrl), version) + resources := snapshot.GetResourcesAndTtl(request.TypeUrl) + cache.respond(request, value, resources, version, false) return value, nil } @@ -238,7 +320,7 @@ func (cache *snapshotCache) cancelWatch(nodeID string, watchID int64) func() { // Respond to a watch with the snapshot value. The value channel should have capacity not to block. // TODO(kuat) do not respond always, see issue https://github.com/envoyproxy/go-control-plane/issues/46 -func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.Resource, version string) { +func (cache *snapshotCache) respond(request *Request, value chan Response, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) { // for ADS, the request names must match the snapshot names // if they do not, then the watch is never responded, and it is expected that envoy makes another request if len(request.ResourceNames) != 0 && cache.ads { @@ -254,11 +336,11 @@ func (cache *snapshotCache) respond(request *Request, value chan Response, resou request.TypeUrl, request.ResourceNames, request.VersionInfo, version) } - value <- createResponse(request, resources, version) + value <- createResponse(request, resources, version, heartbeat) } -func createResponse(request *Request, resources map[string]types.Resource, version string) Response { - filtered := make([]types.Resource, 0, len(resources)) +func createResponse(request *Request, resources map[string]types.ResourceWithTtl, version string, heartbeat bool) Response { + filtered := make([]types.ResourceWithTtl, 0, len(resources)) // Reply only with the requested resources. Envoy may ask each resource // individually in a separate stream. It is ok to reply with the same version @@ -280,6 +362,7 @@ func createResponse(request *Request, resources map[string]types.Resource, versi Request: request, Version: version, Resources: filtered, + Heartbeat: heartbeat, } } @@ -302,8 +385,8 @@ func (cache *snapshotCache) Fetch(ctx context.Context, request *Request) (Respon return nil, &types.SkipFetchError{} } - resources := snapshot.GetResources(request.TypeUrl) - out := createResponse(request, resources, version) + resources := snapshot.GetResourcesAndTtl(request.TypeUrl) + out := createResponse(request, resources, version, false) return out, nil } diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 7e2399470b..509f268ca8 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "reflect" + "sync" "testing" "time" @@ -55,6 +56,17 @@ var ( []types.Resource{testRuntime}, []types.Resource{testSecret[0]}) + ttl = 2 * time.Second + heartbeat = time.Second + + snapshotWithTtl = cache.NewSnapshotWithTtls(version, + []types.ResourceWithTtl{{Resource: testEndpoint, Ttl: &ttl}}, + []types.ResourceWithTtl{{Resource: testCluster}}, + []types.ResourceWithTtl{{Resource: testRoute}}, + []types.ResourceWithTtl{{Resource: testListener}}, + []types.ResourceWithTtl{{Resource: testRuntime}}, + []types.ResourceWithTtl{{Resource: testSecret[0]}}) + names = map[string][]string{ rsrc.EndpointType: {clusterName}, rsrc.ClusterType: nil, @@ -81,6 +93,94 @@ func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func TestSnapshotCacheWithTtl(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := cache.NewSnapshotCacheWithHeartbeating(ctx, true, group{}, logger{t: t}, time.Second) + + if _, err := c.GetSnapshot(key); err == nil { + t.Errorf("unexpected snapshot found for key %q", key) + } + + if err := c.SetSnapshot(key, snapshotWithTtl); err != nil { + t.Fatal(err) + } + + snap, err := c.GetSnapshot(key) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(snap, snapshotWithTtl) { + t.Errorf("expect snapshot: %v, got: %v", snapshotWithTtl, snap) + } + + wg := sync.WaitGroup{} + // All the resources should respond immediately when version is not up to date. + for _, typ := range testTypes { + wg.Add(1) + t.Run(typ, func(t *testing.T) { + defer wg.Done() + value, _ := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ]}) + select { + case out := <-value: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResourcesAndTtl(typ)) + } + case <-time.After(2 * time.Second): + t.Errorf("failed to receive snapshot response") + } + }) + } + wg.Wait() + + // Once everything is up to date, only the TTL'd resource should send out updates. + wg = sync.WaitGroup{} + updatesByType := map[string]int{} + for _, typ := range testTypes { + wg.Add(1) + go func(typ string) { + defer wg.Done() + + end := time.After(5 * time.Second) + for { + value, cancel := c.CreateWatch(&discovery.DiscoveryRequest{TypeUrl: typ, ResourceNames: names[typ], VersionInfo: version}) + + select { + case out := <-value: + if gotVersion, _ := out.GetVersion(); gotVersion != version { + t.Errorf("got version %q, want %q", gotVersion, version) + } + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ)) + } + + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshotWithTtl.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshotWithTtl.GetResources(typ)) + } + + updatesByType[typ]++ + case <-end: + cancel() + return + } + } + }(typ) + } + + wg.Wait() + + if len(updatesByType) != 1 { + t.Errorf("expected to only receive updates for TTL'd type, got %v", updatesByType) + } + // Avoid an exact match on number of triggers to avoid this being flaky. + if updatesByType[rsrc.EndpointType] < 2 { + t.Errorf("expected at least two TTL updates for endpoints, got %d", updatesByType[rsrc.EndpointType]) + } +} + func TestSnapshotCache(t *testing.T) { c := cache.NewSnapshotCache(true, group{}, logger{t: t}) @@ -117,8 +217,8 @@ func TestSnapshotCache(t *testing.T) { if gotVersion, _ := out.GetVersion(); gotVersion != version { t.Errorf("got version %q, want %q", gotVersion, version) } - if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResources(typ)) { - t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResources(typ)) + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ)) } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") @@ -174,8 +274,8 @@ func TestSnapshotCacheWatch(t *testing.T) { if gotVersion, _ := out.GetVersion(); gotVersion != version { t.Errorf("got version %q, want %q", gotVersion, version) } - if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResources(typ)) { - t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResources(typ)) + if !reflect.DeepEqual(cache.IndexResourcesByName(out.(*cache.RawResponse).Resources), snapshot.GetResourcesAndTtl(typ)) { + t.Errorf("get resources %v, want %v", out.(*cache.RawResponse).Resources, snapshot.GetResourcesAndTtl(typ)) } case <-time.After(time.Second): t.Fatal("failed to receive snapshot response") diff --git a/pkg/cache/v3/snapshot.go b/pkg/cache/v3/snapshot.go index 60d6b5c021..bae73f1736 100644 --- a/pkg/cache/v3/snapshot.go +++ b/pkg/cache/v3/snapshot.go @@ -18,6 +18,7 @@ package cache import ( "errors" "fmt" + "time" "github.com/envoyproxy/go-control-plane/pkg/cache/types" ) @@ -28,20 +29,29 @@ type Resources struct { Version string // Items in the group indexed by name. - Items map[string]types.Resource + Items map[string]types.ResourceWithTtl } // IndexResourcesByName creates a map from the resource name to the resource. -func IndexResourcesByName(items []types.Resource) map[string]types.Resource { - indexed := make(map[string]types.Resource, len(items)) +func IndexResourcesByName(items []types.ResourceWithTtl) map[string]types.ResourceWithTtl { + indexed := make(map[string]types.ResourceWithTtl) for _, item := range items { - indexed[GetResourceName(item)] = item + indexed[GetResourceName(item.Resource)] = item } return indexed } // NewResources creates a new resource group. func NewResources(version string, items []types.Resource) Resources { + itemsWithTtl := []types.ResourceWithTtl{} + for _, item := range items { + itemsWithTtl = append(itemsWithTtl, types.ResourceWithTtl{Resource: item}) + } + return NewResourcesWithTtl(version, itemsWithTtl) +} + +// NewResources creates a new resource group. +func NewResourcesWithTtl(version string, items []types.ResourceWithTtl) Resources { return Resources{ Version: version, Items: IndexResourcesByName(items), @@ -97,6 +107,28 @@ func NewSnapshotWithResources(version string, resources SnapshotResources) Snaps return out } +type ResourceWithTtl struct { + Resources []types.Resource + Ttl *time.Duration +} + +func NewSnapshotWithTtls(version string, + endpoints []types.ResourceWithTtl, + clusters []types.ResourceWithTtl, + routes []types.ResourceWithTtl, + listeners []types.ResourceWithTtl, + runtimes []types.ResourceWithTtl, + secrets []types.ResourceWithTtl) Snapshot { + out := Snapshot{} + out.Resources[types.Endpoint] = NewResourcesWithTtl(version, endpoints) + out.Resources[types.Cluster] = NewResourcesWithTtl(version, clusters) + out.Resources[types.Route] = NewResourcesWithTtl(version, routes) + out.Resources[types.Listener] = NewResourcesWithTtl(version, listeners) + out.Resources[types.Runtime] = NewResourcesWithTtl(version, runtimes) + out.Resources[types.Secret] = NewResourcesWithTtl(version, secrets) + return out +} + // Consistent check verifies that the dependent resources are exactly listed in the // snapshot: // - all EDS resources are listed by name in CDS resources @@ -124,8 +156,24 @@ func (s *Snapshot) Consistent() error { return superset(routes, s.Resources[types.Route].Items) } -// GetResources selects snapshot resources by type. +// GetResources selects snapshot resources by type, returning the map of resources. func (s *Snapshot) GetResources(typeURL string) map[string]types.Resource { + resources := s.GetResourcesAndTtl(typeURL) + if resources == nil { + return nil + } + + withoutTtl := make(map[string]types.Resource, len(resources)) + + for k, v := range resources { + withoutTtl[k] = v.Resource + } + + return withoutTtl +} + +// GetResourcesAndTtl selects snapshot resources by type, returning the map of resources and the associated TTL. +func (s *Snapshot) GetResourcesAndTtl(typeURL string) map[string]types.ResourceWithTtl { if s == nil { return nil } diff --git a/pkg/integration/ttl_integration_test.go b/pkg/integration/ttl_integration_test.go new file mode 100644 index 0000000000..07e478062a --- /dev/null +++ b/pkg/integration/ttl_integration_test.go @@ -0,0 +1,139 @@ +package integration + +import ( + "context" + "net" + "testing" + "time" + + envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3" + envoy_service_discovery_v3 "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/envoyproxy/go-control-plane/pkg/cache/v3" + "github.com/envoyproxy/go-control-plane/pkg/resource/v3" + "github.com/envoyproxy/go-control-plane/pkg/server/v3" + "github.com/golang/protobuf/ptypes" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +type logger struct { + t *testing.T +} + +func (log logger) Debugf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func (log logger) Infof(format string, args ...interface{}) { log.t.Logf(format, args...) } +func (log logger) Warnf(format string, args ...interface{}) { log.t.Logf(format, args...) } +func (log logger) Errorf(format string, args ...interface{}) { log.t.Logf(format, args...) } + +func TestTtlResponse(t *testing.T) { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + snapshotCache := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, logger{t: t}, time.Second) + + server := server.NewServer(ctx, snapshotCache, nil) + + grpcServer := grpc.NewServer() + endpointservice.RegisterEndpointDiscoveryServiceServer(grpcServer, server) + + l, err := net.Listen("tcp", ":9999") + assert.NoError(t, err) + + go func() { + grpcServer.Serve(l) + }() + defer grpcServer.Stop() + + conn, err := grpc.Dial(":9999", grpc.WithInsecure()) + assert.NoError(t, err) + client := endpointservice.NewEndpointDiscoveryServiceClient(conn) + + sclient, err := client.StreamEndpoints(ctx) + assert.NoError(t, err) + + err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{ + Node: &envoy_config_core_v3.Node{ + Id: "test", + }, + ResourceNames: []string{"resource"}, + TypeUrl: resource.EndpointType, + }) + assert.NoError(t, err) + + oneSecond := time.Second + cla := &envoy_config_endpoint_v3.ClusterLoadAssignment{ClusterName: "resource"} + err = snapshotCache.SetSnapshot("test", cache.NewSnapshotWithTtls("1", []types.ResourceWithTtl{{ + Resource: cla, + Ttl: &oneSecond, + }}, nil, nil, nil, nil, nil)) + assert.NoError(t, err) + + timeout := time.NewTimer(5 * time.Second) + + awaitResponse := func() *envoy_service_discovery_v3.DiscoveryResponse { + t.Helper() + doneCh := make(chan *envoy_service_discovery_v3.DiscoveryResponse) + go func() { + + r, err := sclient.Recv() + assert.NoError(t, err) + + doneCh <- r + }() + + select { + case <-timeout.C: + assert.Fail(t, "timed out") + return nil + case r := <-doneCh: + return r + } + } + + response := awaitResponse() + isFullResponseWithTTL(t, response) + + err = sclient.Send(&envoy_service_discovery_v3.DiscoveryRequest{ + Node: &envoy_config_core_v3.Node{ + Id: "test", + }, + ResourceNames: []string{"resource"}, + TypeUrl: resource.EndpointType, + VersionInfo: "1", + ResponseNonce: response.Nonce, + }) + assert.NoError(t, err) + + response = awaitResponse() + isHeartbeatResponseWithTTL(t, response) +} + +func isFullResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) { + t.Helper() + + assert.Len(t, response.Resources, 1) + r := response.Resources[0] + resource := &envoy_service_discovery_v3.Resource{} + err := ptypes.UnmarshalAny(r, resource) + assert.NoError(t, err) + + assert.NotNil(t, resource.Ttl) + assert.NotNil(t, resource.Resource) +} + +func isHeartbeatResponseWithTTL(t *testing.T, response *envoy_service_discovery_v3.DiscoveryResponse) { + t.Helper() + + assert.Len(t, response.Resources, 1) + r := response.Resources[0] + resource := &envoy_service_discovery_v3.Resource{} + err := ptypes.UnmarshalAny(r, resource) + assert.NoError(t, err) + + assert.NotNil(t, resource.Ttl) + assert.Nil(t, resource.Resource) +} diff --git a/pkg/server/v2/gateway_test.go b/pkg/server/v2/gateway_test.go index 9a83e4e34e..1fbef90aab 100644 --- a/pkg/server/v2/gateway_test.go +++ b/pkg/server/v2/gateway_test.go @@ -45,21 +45,21 @@ func TestGateway(t *testing.T) { resource.ClusterType: { &cache.RawResponse{ Version: "2", - Resources: []types.Resource{cluster}, + Resources: []types.ResourceWithTtl{{Resource: cluster}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, }, }, resource.RouteType: { &cache.RawResponse{ Version: "3", - Resources: []types.Resource{route}, + Resources: []types.ResourceWithTtl{{Resource: route}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, }, }, resource.ListenerType: { &cache.RawResponse{ Version: "4", - Resources: []types.Resource{listener}, + Resources: []types.ResourceWithTtl{{Resource: listener}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, }, }, diff --git a/pkg/server/v2/server_test.go b/pkg/server/v2/server_test.go index b8f12bf7b9..6bcf6415af 100644 --- a/pkg/server/v2/server_test.go +++ b/pkg/server/v2/server_test.go @@ -172,42 +172,42 @@ func makeResponses() map[string][]cache.Response { rsrc.EndpointType: { &cache.RawResponse{ Version: "1", - Resources: []types.Resource{endpoint}, + Resources: []types.ResourceWithTtl{{Resource: endpoint}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType}, }, }, rsrc.ClusterType: { &cache.RawResponse{ Version: "2", - Resources: []types.Resource{cluster}, + Resources: []types.ResourceWithTtl{{Resource: cluster}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, }, }, rsrc.RouteType: { &cache.RawResponse{ Version: "3", - Resources: []types.Resource{route}, + Resources: []types.ResourceWithTtl{{Resource: route}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, }, }, rsrc.ListenerType: { &cache.RawResponse{ Version: "4", - Resources: []types.Resource{listener}, + Resources: []types.ResourceWithTtl{{Resource: listener}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, }, }, rsrc.SecretType: { &cache.RawResponse{ Version: "5", - Resources: []types.Resource{secret}, + Resources: []types.ResourceWithTtl{{Resource: secret}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.SecretType}, }, }, rsrc.RuntimeType: { &cache.RawResponse{ Version: "6", - Resources: []types.Resource{runtime}, + Resources: []types.ResourceWithTtl{{Resource: runtime}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RuntimeType}, }, }, @@ -215,7 +215,7 @@ func makeResponses() map[string][]cache.Response { opaqueType: { &cache.RawResponse{ Version: "7", - Resources: []types.Resource{opaque}, + Resources: []types.ResourceWithTtl{{Resource: opaque}}, Request: &discovery.DiscoveryRequest{TypeUrl: opaqueType}, }, }, diff --git a/pkg/server/v3/gateway_test.go b/pkg/server/v3/gateway_test.go index 1c106c2c2f..3bb28e36e8 100644 --- a/pkg/server/v3/gateway_test.go +++ b/pkg/server/v3/gateway_test.go @@ -46,21 +46,21 @@ func TestGateway(t *testing.T) { resource.ClusterType: { &cache.RawResponse{ Version: "2", - Resources: []types.Resource{cluster}, + Resources: []types.ResourceWithTtl{{Resource: cluster}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, }, }, resource.RouteType: { &cache.RawResponse{ Version: "3", - Resources: []types.Resource{route}, + Resources: []types.ResourceWithTtl{{Resource: route}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, }, }, resource.ListenerType: { &cache.RawResponse{ Version: "4", - Resources: []types.Resource{listener}, + Resources: []types.ResourceWithTtl{{Resource: listener}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, }, }, diff --git a/pkg/server/v3/server_test.go b/pkg/server/v3/server_test.go index 6a4b74b0ce..0715755319 100644 --- a/pkg/server/v3/server_test.go +++ b/pkg/server/v3/server_test.go @@ -173,42 +173,42 @@ func makeResponses() map[string][]cache.Response { rsrc.EndpointType: { &cache.RawResponse{ Version: "1", - Resources: []types.Resource{endpoint}, + Resources: []types.ResourceWithTtl{{Resource: endpoint}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.EndpointType}, }, }, rsrc.ClusterType: { &cache.RawResponse{ Version: "2", - Resources: []types.Resource{cluster}, + Resources: []types.ResourceWithTtl{{Resource: cluster}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ClusterType}, }, }, rsrc.RouteType: { &cache.RawResponse{ Version: "3", - Resources: []types.Resource{route}, + Resources: []types.ResourceWithTtl{{Resource: route}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RouteType}, }, }, rsrc.ListenerType: { &cache.RawResponse{ Version: "4", - Resources: []types.Resource{listener}, + Resources: []types.ResourceWithTtl{{Resource: listener}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.ListenerType}, }, }, rsrc.SecretType: { &cache.RawResponse{ Version: "5", - Resources: []types.Resource{secret}, + Resources: []types.ResourceWithTtl{{Resource: secret}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.SecretType}, }, }, rsrc.RuntimeType: { &cache.RawResponse{ Version: "6", - Resources: []types.Resource{runtime}, + Resources: []types.ResourceWithTtl{{Resource: runtime}}, Request: &discovery.DiscoveryRequest{TypeUrl: rsrc.RuntimeType}, }, }, @@ -216,7 +216,7 @@ func makeResponses() map[string][]cache.Response { opaqueType: { &cache.RawResponse{ Version: "7", - Resources: []types.Resource{opaque}, + Resources: []types.ResourceWithTtl{{Resource: opaque}}, Request: &discovery.DiscoveryRequest{TypeUrl: opaqueType}, }, }, diff --git a/pkg/ttl/v2/ttl.go b/pkg/ttl/v2/ttl.go new file mode 100644 index 0000000000..3b587c5ee4 --- /dev/null +++ b/pkg/ttl/v2/ttl.go @@ -0,0 +1,18 @@ +package ttl + +import ( + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/golang/protobuf/ptypes/any" +) + +// Helper functions for interacting with TTL resources for xDS V2. Since TTL resources are not supported for V2, these are +// essentially noops. + +func MaybeCreateTtlResourceIfSupported(resource types.ResourceWithTtl, name string, resourceTypeUrl string, heartbeat bool) (types.Resource, string, error) { + return resource.Resource, resourceTypeUrl, nil +} + +func IsTTLResource(resource *any.Any) bool { + // This is just used in test; pretend like all resources have a TTL in V2 for testing purposes. + return true +} diff --git a/pkg/ttl/v3/ttl.go b/pkg/ttl/v3/ttl.go new file mode 100644 index 0000000000..e1d0e59d19 --- /dev/null +++ b/pkg/ttl/v3/ttl.go @@ -0,0 +1,48 @@ +package ttl + +import ( + discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/envoyproxy/go-control-plane/pkg/cache/types" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/any" +) + +var deltaResourceTypeURL = "type.googleapis.com/" + proto.MessageName(&discovery.Resource{}) + +// Helper functions for interacting with TTL resources for xDS V3. A resource will be wrapped in a discovery.Resource in order +// to allow specifying a TTL. If the resource is meant to be a heartbeat response, only the resource name and TTL will be set +// to avoid having to send the entire resource down. + +func MaybeCreateTtlResourceIfSupported(resource types.ResourceWithTtl, name string, resourceTypeUrl string, heartbeat bool) (types.Resource, string, error) { + if resource.Ttl != nil { + wrappedResource := &discovery.Resource{ + Name: name, + Ttl: ptypes.DurationProto(*resource.Ttl), + } + + if !heartbeat { + any, err := ptypes.MarshalAny(resource.Resource) + if err != nil { + return nil, "", err + } + any.TypeUrl = resourceTypeUrl + wrappedResource.Resource = any + } + + return wrappedResource, deltaResourceTypeURL, nil + } + + return resource.Resource, resourceTypeUrl, nil +} + +func IsTTLResource(resource *any.Any) bool { + // This is only done in test, so no need to worry about the overhead of the marshalling. + wrappedResource := &discovery.Resource{} + err := ptypes.UnmarshalAny(resource, wrappedResource) + if err != nil { + return false + } + + return wrappedResource.Resource == nil +} diff --git a/scripts/create_version.sh b/scripts/create_version.sh index e6645eed51..e9e52a86b4 100755 --- a/scripts/create_version.sh +++ b/scripts/create_version.sh @@ -34,6 +34,7 @@ MODULES=( 'clusterservice "github.com/envoyproxy/go-control-plane/envoy/api/v2 '"github.com/envoyproxy/go-control-plane/pkg/server/v2":"github.com/envoyproxy/go-control-plane/pkg/server/v3"' '"github.com/envoyproxy/go-control-plane/pkg/server/rest/v2":"github.com/envoyproxy/go-control-plane/pkg/server/rest/v3"' '"github.com/envoyproxy/go-control-plane/pkg/server/sotw/v2":"github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3"' + 'ttl "github.com/envoyproxy/go-control-plane/pkg/ttl/v2":ttl "github.com/envoyproxy/go-control-plane/pkg/ttl/v3"' ) workdir="$(dirname "$0")"