diff --git a/.changelog/12174.txt b/.changelog/12174.txt
new file mode 100644
index 00000000000..b839df4eca4
--- /dev/null
+++ b/.changelog/12174.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+xds: fix for delta xDS reconnect bug in LDS/CDS
+```
diff --git a/agent/xds/delta.go b/agent/xds/delta.go
index 4a8370a5571..c7fcb3592bf 100644
--- a/agent/xds/delta.go
+++ b/agent/xds/delta.go
@@ -165,12 +165,6 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
 				return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
 			}
 
-			if handler, ok := handlers[req.TypeUrl]; ok {
-				if handler.Recv(req) {
-					generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
-				}
-			}
-
 			if node == nil && req.Node != nil {
 				node = req.Node
 				var err error
@@ -180,6 +174,12 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
 				}
 			}
 
+			if handler, ok := handlers[req.TypeUrl]; ok {
+				if handler.Recv(req, generator.ProxyFeatures) {
+					generator.Logger.Trace("subscribing to type", "typeUrl", req.TypeUrl)
+				}
+			}
+
 		case cfgSnap = <-stateCh:
 			newRes, err := generator.allResourcesFromSnapshot(cfgSnap)
 			if err != nil {
@@ -440,7 +440,7 @@ func newDeltaType(
 // Recv handles new discovery requests from envoy.
 //
 // Returns true the first time a type receives a request.
-func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool {
+func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest, sf supportedProxyFeatures) bool {
 	if t == nil {
 		return false // not something we care about
 	}
@@ -453,6 +453,16 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool
 		t.wildcard = len(req.ResourceNamesSubscribe) == 0
 		t.registered = true
 		registeredThisTime = true
+
+		if sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect {
+			switch t.typeURL {
+			case ListenerType, ClusterType:
+				if !t.wildcard {
+					t.wildcard = true
+					logger.Trace("fixing Envoy bug fixed in 1.19.0 by inferring wildcard mode for type")
+				}
+			}
+		}
 	}
 
 	/*
diff --git a/agent/xds/delta_test.go b/agent/xds/delta_test.go
index 573caa74cf9..14dac655b10 100644
--- a/agent/xds/delta_test.go
+++ b/agent/xds/delta_test.go
@@ -554,6 +554,72 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
 	}
 }
 
+func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t *testing.T) {
+	// This illustrates a scenario related to https://github.com/hashicorp/consul/issues/11833
+
+	aclResolve := func(id string) (acl.Authorizer, error) {
+		// Allow all
+		return acl.RootAuthorizer("manage"), nil
+	}
+	scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
+	_, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy
+	envoy.EnvoyVersion = "1.18.0"
+
+	sid := structs.NewServiceID("web-sidecar-proxy", nil)
+
+	// Register the proxy to create state needed to Watch() on
+	mgr.RegisterProxy(t, sid)
+
+	var snap *proxycfg.ConfigSnapshot
+	runStep(t, "get into state after consul restarted", func(t *testing.T) {
+		snap = newTestSnapshot(t, nil, "")
+
+		// Send the initial cluster discovery request.
+		// This simulates the request Envoy sends after being disconnected from the Consul ADS stream.
+		//
+		// We need to force it to be an older version of envoy so that the logic shifts.
+		envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
+			ResourceNamesSubscribe: []string{
+				"local_app",
+				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
+			},
+			InitialResourceVersions: map[string]string{
+				"local_app": "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447",
+				"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03",
+			},
+		})
+
+		// Check no response sent yet
+		assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)
+
+		requireProtocolVersionGauge(t, scenario, "v3", 1)
+
+		// Deliver a new snapshot.
+		// The config contains 3 clusters: local_app, db, geo-cache.
+		// This simulates one additional (upstream) cluster having been added to the sidecar service
+		// while xDS was disconnected (Consul restarted).
+		mgr.DeliverConfig(t, sid, snap)
+
+		assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
+			TypeUrl: ClusterType,
+			Nonce:   hexString(1),
+			Resources: makeTestResources(t,
+				makeTestCluster(t, snap, "tcp:local_app"),
+				makeTestCluster(t, snap, "tcp:db"),
+				makeTestCluster(t, snap, "tcp:geo-cache"),
+			),
+		})
+	})
+
+	envoy.Close()
+	select {
+	case err := <-errCh:
+		require.NoError(t, err)
+	case <-time.After(50 * time.Millisecond):
+		t.Fatalf("timed out waiting for handler to finish")
+	}
+}
+
 func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
 	aclResolve := func(id string) (acl.Authorizer, error) {
 		// Allow all
diff --git a/agent/xds/envoy_versioning.go b/agent/xds/envoy_versioning.go
index 0e218ad6594..0af28495604 100644
--- a/agent/xds/envoy_versioning.go
+++ b/agent/xds/envoy_versioning.go
@@ -26,11 +26,6 @@ type unsupportedVersion struct {
 }
 
 type supportedProxyFeatures struct {
-	// add version dependent feature flags here
-
-	// GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS is needed to paper
-	// over some weird envoy behavior.
-	//
 	// For some reason Envoy versions prior to 1.16.0 when sent an empty CDS
 	// list via the incremental xDS protocol will correctly ack the message and
 	// just never request LDS resources.
@@ -45,6 +40,19 @@ type supportedProxyFeatures struct {
 	// issue: https://github.com/envoyproxy/envoy/issues/11877
 	// PR: https://github.com/envoyproxy/envoy/pull/12069
 	IncrementalXDSUpdatesMustBeSerial bool
+
+	// Older versions of Envoy incorrectly exploded a wildcard subscription for
+	// LDS and CDS into specific line items on incremental xDS reconnect. They
+	// would populate both InitialResourceVersions and ResourceNamesSubscribe
+	// when they SHOULD have left ResourceNamesSubscribe empty (or used an
+	// explicit "*" in later Envoy versions) to imply wildcard mode. On
+	// reconnect, Consul interpreted the lack of the wildcard attribute as
+	// implying that the Envoy instance should not receive updates for any
+	// newly created listeners and clusters for the remaining life of that
+	// Envoy sidecar process.
+	// see: https://github.com/envoyproxy/envoy/issues/16063
+	// see: https://github.com/envoyproxy/envoy/pull/16153
+	ForceLDSandCDSToAlwaysUseWildcardsOnReconnect bool
 }
 
 func determineSupportedProxyFeatures(node *envoy_core_v3.Node) (supportedProxyFeatures, error) {
@@ -90,6 +98,9 @@ func determineSupportedProxyFeaturesFromVersion(version *version.Version) (suppo
 		sf.IncrementalXDSUpdatesMustBeSerial = true
 	}
 
+	// All envoy versions available in Consul 1.10.x need this fix.
+	sf.ForceLDSandCDSToAlwaysUseWildcardsOnReconnect = true
+
 	return sf, nil
 }
 
diff --git a/agent/xds/envoy_versioning_test.go b/agent/xds/envoy_versioning_test.go
index 007f328fe94..d5fd8a2b6d3 100644
--- a/agent/xds/envoy_versioning_test.go
+++ b/agent/xds/envoy_versioning_test.go
@@ -115,6 +115,7 @@ func TestDetermineSupportedProxyFeaturesFromString(t *testing.T) {
 		cases[v] = testcase{expect: supportedProxyFeatures{
 			GatewaysNeedStubClusterWhenEmptyWithIncrementalXDS: true,
 			IncrementalXDSUpdatesMustBeSerial:                  true,
+			ForceLDSandCDSToAlwaysUseWildcardsOnReconnect:      true,
 		}}
 	}
 	for _, v := range []string{
@@ -122,7 +123,9 @@ func TestDetermineSupportedProxyFeaturesFromString(t *testing.T) {
 		"1.17.0", "1.17.1", "1.17.2", "1.17.3", "1.17.4",
 		"1.18.0", "1.18.1", "1.18.2", "1.18.3", "1.18.4",
 	} {
-		cases[v] = testcase{expect: supportedProxyFeatures{}}
+		cases[v] = testcase{expect: supportedProxyFeatures{
+			ForceLDSandCDSToAlwaysUseWildcardsOnReconnect: true,
+		}}
 	}
 
 	for name, tc := range cases {
diff --git a/agent/xds/testing.go b/agent/xds/testing.go
index 1ec7b8f6599..81cd835b04c 100644
--- a/agent/xds/testing.go
+++ b/agent/xds/testing.go
@@ -136,6 +136,8 @@ type TestEnvoy struct {
 	proxyID string
 	token   string
 
+	EnvoyVersion string
+
 	stream      *TestADSStream      // SoTW v2
 	deltaStream *TestADSDeltaStream // Incremental v3
 }
@@ -275,9 +277,14 @@ func (e *TestEnvoy) sendDeltaReq(
 	e.mu.Lock()
 	defer e.mu.Unlock()
 
-	ev, valid := stringToEnvoyVersion(proxysupport.EnvoyVersions[0])
+	stringVersion := e.EnvoyVersion
+	if stringVersion == "" {
+		stringVersion = proxysupport.EnvoyVersions[0]
+	}
+
+	ev, valid := stringToEnvoyVersion(stringVersion)
 	if !valid {
-		t.Fatal("envoy version is not valid: %s", proxysupport.EnvoyVersions[0])
+		t.Fatalf("envoy version is not valid: %s", stringVersion)
 	}
 
 	if req == nil {