Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,18 @@ func (s *Server) processDelta(stream ADSDeltaStream, reqCh <-chan *envoy_discove
handlers := map[string]*xDSDeltaType{
ListenerType: newDeltaType(generator, stream, ListenerType, func(kind structs.ServiceKind) bool {
return cfgSnap.Kind == structs.ServiceKindIngressGateway
}),
}, true),
RouteType: newDeltaType(generator, stream, RouteType, func(kind structs.ServiceKind) bool {
return cfgSnap.Kind == structs.ServiceKindIngressGateway
}),
}, false),
ClusterType: newDeltaType(generator, stream, ClusterType, func(kind structs.ServiceKind) bool {
// Mesh, Ingress, and Terminating gateways are allowed to inform CDS of
// no clusters.
return cfgSnap.Kind == structs.ServiceKindMeshGateway ||
cfgSnap.Kind == structs.ServiceKindTerminatingGateway ||
cfgSnap.Kind == structs.ServiceKindIngressGateway
}),
EndpointType: newDeltaType(generator, stream, EndpointType, nil),
}, true),
EndpointType: newDeltaType(generator, stream, EndpointType, nil, false),
}

// Endpoints are stored within a Cluster (and Routes
Expand Down Expand Up @@ -379,6 +379,10 @@ type xDSDeltaType struct {
// specific resource names. subscribe/unsubscribe are ignored.
wildcard bool

// alwaysWildCard indicates that this type is always wildcard regardless of
// specific resource names in the initial request.
alwaysWildCard bool

// sentToEnvoyOnce is true after we've sent one response to envoy.
sentToEnvoyOnce bool

Expand Down Expand Up @@ -419,12 +423,14 @@ func newDeltaType(
stream ADSDeltaStream,
typeUrl string,
allowEmptyFn func(kind structs.ServiceKind) bool,
alwaysWildcard bool,
) *xDSDeltaType {
return &xDSDeltaType{
generator: generator,
stream: stream,
typeURL: typeUrl,
allowEmptyFn: allowEmptyFn,
alwaysWildCard: alwaysWildcard,
subscriptions: make(map[string]struct{}),
resourceVersions: make(map[string]string),
pendingUpdates: make(map[string]map[string]PendingUpdate),
Expand All @@ -442,9 +448,9 @@ func (t *xDSDeltaType) Recv(req *envoy_discovery_v3.DeltaDiscoveryRequest) bool

registeredThisTime := false
if !t.registered {
// We are in the wildcard mode if the first request of a particular
// type has empty subscription list
t.wildcard = len(req.ResourceNamesSubscribe) == 0
// We are in the wildcard mode if a type should always be set to wildcard,
// or the first request of this type has empty subscription list
t.wildcard = t.alwaysWildCard || len(req.ResourceNamesSubscribe) == 0
t.registered = true
registeredThisTime = true
}
Expand Down
63 changes: 63 additions & 0 deletions agent/xds/delta_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,69 @@ func TestServer_DeltaAggregatedResources_v3_SlowEndpointPopulation(t *testing.T)
}
}

func TestServer_DeltaAggregatedResources_v3_GetAllClusterAfterConsulRestarted(t *testing.T) {
// This illustrates a scenario related to https://github.com/hashicorp/consul/issues/11833

aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
return acl.RootAuthorizer("manage"), nil
}
scenario := newTestServerDeltaScenario(t, aclResolve, "web-sidecar-proxy", "", 0)
_, mgr, errCh, envoy := scenario.server, scenario.mgr, scenario.errCh, scenario.envoy

sid := structs.NewServiceID("web-sidecar-proxy", nil)

// Register the proxy to create state needed to Watch() on
mgr.RegisterProxy(t, sid)

var snap *proxycfg.ConfigSnapshot
runStep(t, "get into state after consul restarted", func(t *testing.T) {
snap = newTestSnapshot(t, nil, "")

// Send initial cluster discover.
// This is to simulate the discovery request call from envoy after disconnected from consul ads stream.
envoy.SendDeltaReq(t, ClusterType, &envoy_discovery_v3.DeltaDiscoveryRequest{
ResourceNamesSubscribe: []string{
"local_app",
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul",
},
InitialResourceVersions: map[string]string{
"local_app": "a948904f2f0f479b8f8197694b30184b0d2ed1c1cd2a1ec0fb85d299a192a447",
"db.default.dc1.internal.11111111-2222-3333-4444-555555555555.consul": "5891b5b522d5df086d0ff0b110fbd9d21bb4fc7163af34d08286a2e846f6be03",
},
})

// Check no response sent yet
assertDeltaChanBlocked(t, envoy.deltaStream.sendCh)

requireProtocolVersionGauge(t, scenario, "v3", 1)

// Deliver a new snapshot
// the config contains 3 clusters: local_app, db, geo-cache.
// this is to simulate the fact that there is one additional (upstream) cluster gets added to the sidecar service
// during the time xds disconnected (consul restarted).
mgr.DeliverConfig(t, sid, snap)

assertDeltaResponseSent(t, envoy.deltaStream.sendCh, &envoy_discovery_v3.DeltaDiscoveryResponse{
TypeUrl: ClusterType,
Nonce: hexString(1),
Resources: makeTestResources(t,
makeTestCluster(t, snap, "tcp:local_app"),
makeTestCluster(t, snap, "tcp:db"),
makeTestCluster(t, snap, "tcp:geo-cache"),
),
})
})

envoy.Close()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(50 * time.Millisecond):
t.Fatalf("timed out waiting for handler to finish")
}
}

func TestServer_DeltaAggregatedResources_v3_BasicProtocol_TCP_clusterChangesImpactEndpoints(t *testing.T) {
aclResolve := func(id string) (acl.Authorizer, error) {
// Allow all
Expand Down