diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 71c97af9339..d64cc72bbf8 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -106,18 +106,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove handlers := map[string]*xDSDeltaType{ ListenerType: newDeltaType(generator, stream, ListenerType, func(kind structs.ServiceKind) bool { return cfgSnap.Kind == structs.ServiceKindIngressGateway - }), + }, true), RouteType: newDeltaType(generator, stream, RouteType, func(kind structs.ServiceKind) bool { return cfgSnap.Kind == structs.ServiceKindIngressGateway - }), + }, false), ClusterType: newDeltaType(generator, stream, ClusterType, func(kind structs.ServiceKind) bool { // Mesh, Ingress, and Terminating gateways are allowed to inform CDS of // no clusters. return cfgSnap.Kind == structs.ServiceKindMeshGateway || cfgSnap.Kind == structs.ServiceKindTerminatingGateway || cfgSnap.Kind == structs.ServiceKindIngressGateway - }), - EndpointType: newDeltaType(generator, stream, EndpointType, nil), + }, true), + EndpointType: newDeltaType(generator, stream, EndpointType, nil, false), } // Endpoints are stored within a Cluster (and Routes @@ -379,6 +379,10 @@ type xDSDeltaType struct { // specific resource names. subscribe/unsubscribe are ignored. wildcard bool + // alwaysWildCard indicates that this type is always wildcard regardless of + // specific resource names in the initial request. + alwaysWildCard bool + // sentToEnvoyOnce is true after we've sent one response to envoy. sentToEnvoyOnce bool @@ -419,12 +423,14 @@ func newDeltaType( stream ADSDeltaStream, typeUrl string, allowEmptyFn func(kind structs.ServiceKind) bool, + alwaysWildcard bool, ) *xDSDeltaType { return &xDSDeltaType{ generator: generator, stream: stream, typeURL: typeUrl, allowEmptyFn: allowEmptyFn, + alwaysWildCard: alwaysWildcard, subscriptions: make(map[string]struct{}), resourceVersions: make(map[string]string), pendingUpdates: make(map[string]map[string]PendingUpdate), @@ -442,9 +448,9 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool registeredThisTime := false if !t.registered { - // We are in the wildcard mode if the first request of a particular - // type has empty subscription list - t.wildcard = len(req.ResourceNamesSubscribe) == 0 + // We are in the wildcard mode if a type should always be set to wildcard, + // or the first request of this type has empty subscription list + t.wildcard = t.alwaysWildCard || len(req.ResourceNamesSubscribe) == 0 t.registered = true registeredThisTime = true } diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go index 99a1f0393b5..719809fd688 100644 --- a/agent/xds/delta_test.go +++ b/agent/xds/delta_test.go @@ -554,6 +554,69 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T) } } +func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t *testing.T) { + // This illustrates a scenario related to https://github.com/hashicorp/consul/issues/11833 + + aclResolve := func(id string) (acl.Authorizer, error) { + // Allow all + return acl.RootAuthorizer("manage"), nil + } + scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0) + _, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy + + sid := structs.NewServiceID("web-sidecar-proxy", nil) + + // Register the proxy to create state needed to Watch() on + mgr.RegisterProxy(t, sid) + + var snap *proxycfg.ConfigSnapshot + runStep(t, "get into state after consul restarted", func(t *testing.T) { + snap = newTestSnapshot(t, nil, "") + + // Send initial cluster discover. + // This is to simulate the discovery request call from envoy after disconnected from consul ads stream. + envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{ + ResourceNamesSubscribe: []string{ + "local_app", + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul", + }, + InitialResourceVersions: map[string]string{ + "local_app": "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447", + "db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03", + }, + }) + + // Check no response sent yet + assertDeltaChanBlocked(t, envoy.deltaStream.sendCh) + + requireProtocolVersionGauge(t, scenario, "v3", 1) + + // Deliver a new snapshot + // the config contains 3 clusters: local_app, db, geo-cache. + // this is to simulate the fact that there is one additional (upstream) cluster gets added to the sidecar service + // during the time xds disconnected (consul restarted). + mgr.DeliverConfig(t, sid, snap) + + assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{ + TypeUrl: ClusterType, + Nonce: hexString(1), + Resources: makeTestResources(t, + makeTestCluster(t, snap, "tcp:local_app"), + makeTestCluster(t, snap, "tcp:db"), + makeTestCluster(t, snap, "tcp:geo-cache"), + ), + }) + }) + + envoy.Close() + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(50 * time.Millisecond): + t.Fatalf("timed out waiting for handler to finish") + } +} + func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) { aclResolve := func(id string) (acl.Authorizer, error) { // Allow all