From 625ee4651224e87510771d1ec80e062e649f6484 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Wed, 19 Apr 2023 15:08:29 -0500 Subject: [PATCH 1/4] XDS primitive generation for endpoints and clusters Co-authored-by: Nathan Coleman --- agent/proxycfg/api_gateway.go | 38 ++++++++- agent/proxycfg/snapshot.go | 1 + agent/proxycfg/testing_upstreams.go | 108 ++---------------------- agent/proxycfg/testing_upstreams_oss.go | 25 ++++++ agent/xds/clusters.go | 54 ++++++++++-- agent/xds/endpoints.go | 68 +++++++++++++-- agent/xds/listeners_apigateway.go | 21 +++++ 7 files changed, 198 insertions(+), 117 deletions(-) create mode 100644 agent/proxycfg/testing_upstreams_oss.go create mode 100644 agent/xds/listeners_apigateway.go diff --git a/agent/proxycfg/api_gateway.go b/agent/proxycfg/api_gateway.go index 18abed7b25d..4557e5322ec 100644 --- a/agent/proxycfg/api_gateway.go +++ b/agent/proxycfg/api_gateway.go @@ -128,7 +128,7 @@ func (h *handlerAPIGateway) handleUpdate(ctx context.Context, u UpdateEvent, sna return (*handlerUpstreams)(h).handleUpdateUpstreams(ctx, u, snap) } - return nil + return h.recompileDiscoveryChains(snap) } // handleRootCAUpdate responds to changes in the watched root CA for a gateway @@ -420,6 +420,42 @@ func (h *handlerAPIGateway) handleRouteConfigUpdate(ctx context.Context, u Updat return nil } +func (h *handlerAPIGateway) recompileDiscoveryChains(snap *ConfigSnapshot) error { + synthesizedChains := map[UpstreamID]*structs.CompiledDiscoveryChain{} + + for name, listener := range snap.APIGateway.Listeners { + boundListener, ok := snap.APIGateway.BoundListeners[name] + if !ok { + // Skip any listeners that don't have a bound listener. Once the bound listener is created, this will be run again. + continue + } + + if !snap.APIGateway.GatewayConfig.ListenerIsReady(name) { + // skip any listeners that might be in an invalid state + continue + } + + // Create a synthesized discovery chain for each service. + services, upstreams, compiled, err := snap.APIGateway.synthesizeChains(h.source.Datacenter, listener, boundListener) + if err != nil { + return err + } + + if len(upstreams) == 0 { + // skip if we can't construct any upstreams + continue + } + + for i, service := range services { + id := NewUpstreamIDFromServiceName(structs.NewServiceName(service.Name, &service.EnterpriseMeta)) + synthesizedChains[id] = compiled[i] + } + } + + snap.APIGateway.DiscoveryChain = synthesizedChains + return nil +} + // referenceIsForListener returns whether the provided structs.ResourceReference // targets the provided structs.APIGatewayListener. For this to be true, the kind // and name must match the structs.APIGatewayConfigEntry containing the listener, diff --git a/agent/proxycfg/snapshot.go b/agent/proxycfg/snapshot.go index cc3341d687a..576dd395106 100644 --- a/agent/proxycfg/snapshot.go +++ b/agent/proxycfg/snapshot.go @@ -913,6 +913,7 @@ DOMAIN_LOOP: return services, upstreams, compiled, err } +// TODO use this in listener code func (c *configSnapshotAPIGateway) toIngressTLS(key IngressListenerKey, listener structs.APIGatewayListener, bound structs.BoundAPIGatewayListener) (*structs.GatewayTLSConfig, error) { if len(listener.TLS.Certificates) == 0 { return nil, nil diff --git a/agent/proxycfg/testing_upstreams.go b/agent/proxycfg/testing_upstreams.go index 4a99bb023b2..f9b77c4d62d 100644 --- a/agent/proxycfg/testing_upstreams.go +++ b/agent/proxycfg/testing_upstreams.go @@ -13,7 +13,6 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/consul/proto/private/pbcommon" "github.com/hashicorp/consul/proto/private/pbpeering" ) @@ -100,60 +99,6 @@ func setupTestVariationConfigEntriesAndSnapshot( Nodes: TestGatewayNodesDC2(t), }, }) - case "order-by-locality-failover": - cluster1UID := UpstreamID{ - Name: "db", - Peer: "cluster-01", - EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""), - } - cluster2UID := UpstreamID{ - Name: "db", - Peer: "cluster-02", - EnterpriseMeta: acl.NewEnterpriseMetaWithPartition(dbUID.PartitionOrDefault(), ""), - } - chainID := makeChainID(structs.DiscoveryTargetOpts{Service: "db-v2"}) - events = append(events, - UpdateEvent{ - CorrelationID: "upstream-target:" + chainID + ":" + dbUID.String(), - Result: &structs.IndexedCheckServiceNodes{ - Nodes: TestUpstreamNodesAlternate(t), - }, - }, - UpdateEvent{ - CorrelationID: "peer-trust-bundle:cluster-01", - Result: &pbpeering.TrustBundleReadResponse{ - Bundle: &pbpeering.PeeringTrustBundle{ - PeerName: "peer1", - TrustDomain: "peer1.domain", - ExportedPartition: "peer1ap", - RootPEMs: []string{"peer1-root-1"}, - }, - }, - }, - UpdateEvent{ - CorrelationID: "peer-trust-bundle:cluster-02", - Result: &pbpeering.TrustBundleReadResponse{ - Bundle: &pbpeering.PeeringTrustBundle{ - PeerName: "peer2", - TrustDomain: "peer2.domain", - ExportedPartition: "peer2ap", - RootPEMs: []string{"peer2-root-2"}, - }, - }, - }, - UpdateEvent{ - CorrelationID: "upstream-peer:" + cluster1UID.String(), - Result: &structs.IndexedCheckServiceNodes{ - Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "dc1", "cluster-01", "10.40.1.1", false, cluster1UID.EnterpriseMeta)}, - }, - }, - UpdateEvent{ - CorrelationID: "upstream-peer:" + cluster2UID.String(), - Result: &structs.IndexedCheckServiceNodes{ - Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "dc2", "cluster-02", "10.40.1.2", false, cluster2UID.EnterpriseMeta)}, - }, - }, - ) case "failover-to-cluster-peer": uid := UpstreamID{ Name: "db", @@ -312,8 +257,8 @@ func setupTestVariationConfigEntriesAndSnapshot( case "lb-resolver": case "register-to-terminating-gateway": default: - t.Fatalf("unexpected variation: %q", variation) - return nil + extraEvents := extraUpdateEvents(t, variation, dbUID) + events = append(events, extraEvents...) } return events @@ -433,49 +378,6 @@ func setupTestVariationDiscoveryChain( }, }, ) - case "order-by-locality-failover": - peers = append(peers, - &pbpeering.Peering{ - Name: "cluster-01", - Remote: &pbpeering.RemoteInfo{ - Locality: &pbcommon.Locality{Region: "us-west-1"}, - }, - }, - &pbpeering.Peering{ - Name: "cluster-02", - Remote: &pbpeering.RemoteInfo{ - Locality: &pbcommon.Locality{Region: "us-west-2"}, - }, - }) - cluster1Target := structs.ServiceResolverFailoverTarget{ - Peer: "cluster-01", - } - cluster2Target := structs.ServiceResolverFailoverTarget{ - Peer: "cluster-02", - } - - entries = append(entries, - &structs.ServiceResolverConfigEntry{ - Kind: structs.ServiceResolver, - Name: "db", - EnterpriseMeta: entMeta, - ConnectTimeout: 33 * time.Second, - RequestTimeout: 33 * time.Second, - Failover: map[string]structs.ServiceResolverFailover{ - "*": { - Policy: &structs.ServiceResolverFailoverPolicy{ - Mode: "order-by-locality", - Regions: []string{"us-west-2", "us-west-1"}, - }, - Targets: []structs.ServiceResolverFailoverTarget{ - cluster1Target, - cluster2Target, - {Service: "db-v2"}, - }, - }, - }, - }, - ) case "redirect-to-cluster-peer": redirect := &structs.ServiceResolverRedirect{ Peer: "cluster-01", @@ -1024,8 +926,10 @@ func setupTestVariationDiscoveryChain( }, ) default: - t.Fatalf("unexpected variation: %q", variation) - return nil + e, p := extraDiscoChainConfig(t, variation, entMeta) + + entries = append(entries, e...) + peers = append(peers, p...) } if len(additionalEntries) > 0 { diff --git a/agent/proxycfg/testing_upstreams_oss.go b/agent/proxycfg/testing_upstreams_oss.go new file mode 100644 index 00000000000..3b8e22d0bda --- /dev/null +++ b/agent/proxycfg/testing_upstreams_oss.go @@ -0,0 +1,25 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: MPL-2.0 + +//go:build !consulent +// +build !consulent + +package proxycfg + +import ( + "github.com/mitchellh/go-testing-interface" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/private/pbpeering" +) + +func extraDiscoChainConfig(t testing.T, variation string, entMeta acl.EnterpriseMeta) ([]structs.ConfigEntry, []*pbpeering.Peering) { + t.Fatalf("unexpected variation: %q", variation) + return nil, nil +} + +func extraUpdateEvents(t testing.T, variation string, dbUID UpstreamID) []UpdateEvent { + t.Fatalf("unexpected variation: %q", variation) + return nil +} diff --git a/agent/xds/clusters.go b/agent/xds/clusters.go index d29d00a2564..84884bf4a78 100644 --- a/agent/xds/clusters.go +++ b/agent/xds/clusters.go @@ -65,13 +65,7 @@ func (s *ResourceGenerator) clustersFromSnapshot(cfgSnap *proxycfg.ConfigSnapsho } return res, nil case structs.ServiceKindAPIGateway: - // TODO Find a cleaner solution, can't currently pass unexported property types - var err error - cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter) - if err != nil { - return nil, err - } - res, err := s.clustersFromSnapshotIngressGateway(cfgSnap) + res, err := s.clustersFromSnapshotAPIGateway(cfgSnap) if err != nil { return nil, err } @@ -816,6 +810,52 @@ func (s *ResourceGenerator) clustersFromSnapshotIngressGateway(cfgSnap *proxycfg return clusters, nil } +func (s *ResourceGenerator) clustersFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { + var clusters []proto.Message + createdClusters := make(map[proxycfg.UpstreamID]bool) + readyUpstreams := getReadyUpstreams(cfgSnap) + + for listenerKey, upstreams := range readyUpstreams { + for _, upstream := range upstreams { + uid := proxycfg.NewUpstreamID(&upstream) + + // If we've already created a cluster for this upstream, skip it. Multiple listeners may + // reference the same upstream, so we don't need to create duplicate clusters in that case. + if createdClusters[uid] { + continue + } + + // Grab the discovery chain compiled in handlerAPIGateway.recompileDiscoveryChains + chain, ok := cfgSnap.APIGateway.DiscoveryChain[uid] + if !ok { + // this should not happen + return nil, fmt.Errorf("no discovery chain for upstream %q", uid) + } + + // Generate the list of upstream clusters for the discovery chain + upstreamClusters, err := s.makeUpstreamClustersForDiscoveryChain(uid, &upstream, chain, cfgSnap, false) + if err != nil { + return nil, err + } + + for _, cluster := range upstreamClusters { + // TODO Something analogous to s.configIngressUpstreamCluster(c, cfgSnap, listenerKey, &u) + // but not sure what that func does yet + s.configAPIUpstreamCluster(cluster, cfgSnap, listenerKey, &upstream) + clusters = append(clusters, cluster) + } + createdClusters[uid] = true + + } + } + return clusters, nil +} + +func (s *ResourceGenerator) configAPIUpstreamCluster(c *envoy_cluster_v3.Cluster, cfgSnap *proxycfg.ConfigSnapshot, listenerKey proxycfg.APIGatewayListenerKey, u *structs.Upstream) { + //TODO I don't think this is currently needed with what api gateway supports, but will be needed in the future + +} + func (s *ResourceGenerator) configIngressUpstreamCluster(c *envoy_cluster_v3.Cluster, cfgSnap *proxycfg.ConfigSnapshot, listenerKey proxycfg.IngressListenerKey, u *structs.Upstream) { var threshold *envoy_cluster_v3.CircuitBreakers_Thresholds setThresholdLimit := func(limitType string, limit int) { diff --git a/agent/xds/endpoints.go b/agent/xds/endpoints.go index 8c5b487ea97..92291d08106 100644 --- a/agent/xds/endpoints.go +++ b/agent/xds/endpoints.go @@ -41,13 +41,7 @@ func (s *ResourceGenerator) endpointsFromSnapshot(cfgSnap *proxycfg.ConfigSnapsh case structs.ServiceKindIngressGateway: return s.endpointsFromSnapshotIngressGateway(cfgSnap) case structs.ServiceKindAPIGateway: - // TODO Find a cleaner solution, can't currently pass unexported property types - var err error - cfgSnap.IngressGateway, err = cfgSnap.APIGateway.ToIngress(cfgSnap.Datacenter) - if err != nil { - return nil, err - } - return s.endpointsFromSnapshotIngressGateway(cfgSnap) + return s.endpointsFromSnapshotAPIGateway(cfgSnap) default: return nil, fmt.Errorf("Invalid service kind: %v", cfgSnap.Kind) } @@ -527,6 +521,65 @@ func (s *ResourceGenerator) endpointsFromSnapshotIngressGateway(cfgSnap *proxycf return resources, nil } +func getReadyUpstreams(cfgSnap *proxycfg.ConfigSnapshot) map[proxycfg.APIGatewayListenerKey][]structs.Upstream { + + readyUpstreams := map[proxycfg.APIGatewayListenerKey][]structs.Upstream{} + for _, l := range cfgSnap.APIGateway.Listeners { + //need to account for the state of the Listener when building the upstreams list + if !cfgSnap.APIGateway.GatewayConfig.ListenerIsReady(l.Name) { + continue + } + boundListener := cfgSnap.APIGateway.BoundListeners[l.Name] + //get route ref + for _, routeRef := range boundListener.Routes { + //get upstreams + upstreamMap := cfgSnap.APIGateway.Upstreams[routeRef] + for listenerKey, upstreams := range upstreamMap { + for _, u := range upstreams { + readyUpstreams[listenerKey] = append(readyUpstreams[listenerKey], u) + } + } + } + } + return readyUpstreams +} + +func (s *ResourceGenerator) endpointsFromSnapshotAPIGateway(cfgSnap *proxycfg.ConfigSnapshot) ([]proto.Message, error) { + var resources []proto.Message + createdClusters := make(map[proxycfg.UpstreamID]bool) + + readyUpstreams := getReadyUpstreams(cfgSnap) + + for _, upstreams := range readyUpstreams { + for _, u := range upstreams { + uid := proxycfg.NewUpstreamID(&u) + + // If we've already created endpoints for this upstream, skip it. Multiple listeners may + // reference the same upstream, so we don't need to create duplicate endpoints in that case. + if createdClusters[uid] { + continue + } + + es, err := s.endpointsFromDiscoveryChain( + uid, + cfgSnap.APIGateway.DiscoveryChain[uid], + cfgSnap, + proxycfg.GatewayKey{Datacenter: cfgSnap.Datacenter, Partition: u.DestinationPartition}, + u.Config, + cfgSnap.APIGateway.WatchedUpstreamEndpoints[uid], + cfgSnap.APIGateway.WatchedGatewayEndpoints[uid], + false, + ) + if err != nil { + return nil, err + } + resources = append(resources, es...) + createdClusters[uid] = true + } + } + return resources, nil +} + // used in clusters.go func makeEndpoint(host string, port int) *envoy_endpoint_v3.LbEndpoint { return &envoy_endpoint_v3.LbEndpoint{ @@ -628,6 +681,7 @@ func (s *ResourceGenerator) endpointsFromDiscoveryChain( var escapeHatchCluster *envoy_cluster_v3.Cluster if !forMeshGateway { + cfg, err := structs.ParseUpstreamConfigNoDefaults(upstreamConfigMap) if err != nil { // Don't hard fail on a config typo, just warn. The parse func returns diff --git a/agent/xds/listeners_apigateway.go b/agent/xds/listeners_apigateway.go new file mode 100644 index 00000000000..87ef51d26ea --- /dev/null +++ b/agent/xds/listeners_apigateway.go @@ -0,0 +1,21 @@ +package xds + +import ( + "fmt" + "github.com/hashicorp/consul/agent/proxycfg" + "github.com/hashicorp/consul/agent/structs" +) + +func refToAPIGatewayListenerKey(cfgSnap *proxycfg.ConfigSnapshot, listenerRef structs.ResourceReference) (*structs.APIGatewayListener, *structs.BoundAPIGatewayListener, *proxycfg.APIGatewayListenerKey, error) { + listenerCfg, ok := cfgSnap.APIGateway.Listeners[listenerRef.Name] + if !ok { + return nil, nil, nil, fmt.Errorf("no listener config found for listener %s", listenerCfg.Name) + } + + boundListenerCfg, ok := cfgSnap.APIGateway.BoundListeners[listenerRef.Name] + if !ok { + return nil, nil, nil, fmt.Errorf("no listener config found for listener %s", listenerCfg.Name) + } + return &listenerCfg, &boundListenerCfg, &proxycfg.APIGatewayListenerKey{Port: listenerCfg.Port, Protocol: string(listenerCfg.Protocol)}, nil + +} From e082ec4f1ad3f78f7e355c58d897d2ee9c7534c0 Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Wed, 19 Apr 2023 15:12:00 -0500 Subject: [PATCH 2/4] server_test --- agent/consul/server_test.go | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 5fcbb453381..bd39e9676ea 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -1884,14 +1884,18 @@ func TestServer_ReloadConfig(t *testing.T) { // Check the incoming RPC rate limiter got updated mockHandler.AssertCalled(t, "UpdateConfig", rpcRate.HandlerConfig{ - GlobalMode: rc.RequestLimits.Mode, - GlobalReadConfig: multilimiter.LimiterConfig{ - Rate: rc.RequestLimits.ReadRate, - Burst: int(rc.RequestLimits.ReadRate) * requestLimitsBurstMultiplier, - }, - GlobalWriteConfig: multilimiter.LimiterConfig{ - Rate: rc.RequestLimits.WriteRate, - Burst: int(rc.RequestLimits.WriteRate) * requestLimitsBurstMultiplier, + GlobalLimitConfig: rpcRate.GlobalLimitConfig{ + Mode: rc.RequestLimits.Mode, + ReadWriteConfig: rpcRate.ReadWriteConfig{ + ReadConfig: multilimiter.LimiterConfig{ + Rate: rc.RequestLimits.ReadRate, + Burst: int(rc.RequestLimits.ReadRate) * requestLimitsBurstMultiplier, + }, + WriteConfig: multilimiter.LimiterConfig{ + Rate: rc.RequestLimits.WriteRate, + Burst: int(rc.RequestLimits.WriteRate) * requestLimitsBurstMultiplier, + }, + }, }, }) @@ -1902,7 +1906,7 @@ func TestServer_ReloadConfig(t *testing.T) { defaults := DefaultConfig() got := s.raft.ReloadableConfig() require.Equal(t, uint64(4321), got.SnapshotThreshold, - "should have be reloaded to new value") + "should have been reloaded to new value") require.Equal(t, defaults.RaftConfig.SnapshotInterval, got.SnapshotInterval, "should have remained the default interval") require.Equal(t, defaults.RaftConfig.TrailingLogs, got.TrailingLogs, From 64bd00b02a5a2665965e7d8116297558db3e080b Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Wed, 19 Apr 2023 15:44:48 -0500 Subject: [PATCH 3/4] deleted extra file --- agent/xds/listeners_apigateway.go | 21 --------------------- 1 file changed, 21 deletions(-) delete mode 100644 agent/xds/listeners_apigateway.go diff --git a/agent/xds/listeners_apigateway.go b/agent/xds/listeners_apigateway.go deleted file mode 100644 index 87ef51d26ea..00000000000 --- a/agent/xds/listeners_apigateway.go +++ /dev/null @@ -1,21 +0,0 @@ -package xds - -import ( - "fmt" - "github.com/hashicorp/consul/agent/proxycfg" - "github.com/hashicorp/consul/agent/structs" -) - -func refToAPIGatewayListenerKey(cfgSnap *proxycfg.ConfigSnapshot, listenerRef structs.ResourceReference) (*structs.APIGatewayListener, *structs.BoundAPIGatewayListener, *proxycfg.APIGatewayListenerKey, error) { - listenerCfg, ok := cfgSnap.APIGateway.Listeners[listenerRef.Name] - if !ok { - return nil, nil, nil, fmt.Errorf("no listener config found for listener %s", listenerCfg.Name) - } - - boundListenerCfg, ok := cfgSnap.APIGateway.BoundListeners[listenerRef.Name] - if !ok { - return nil, nil, nil, fmt.Errorf("no listener config found for listener %s", listenerCfg.Name) - } - return &listenerCfg, &boundListenerCfg, &proxycfg.APIGatewayListenerKey{Port: listenerCfg.Port, Protocol: string(listenerCfg.Protocol)}, nil - -} From 694e517c23ed01a096d1fb7120b83c3487e73adb Mon Sep 17 00:00:00 2001 From: Sarah Alsmiller Date: Wed, 19 Apr 2023 19:43:58 -0500 Subject: [PATCH 4/4] add missing parents to test --- agent/xds/resources_test.go | 41 ++++++++++++++++++++++++------------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/agent/xds/resources_test.go b/agent/xds/resources_test.go index 5028cc67e47..3a068d850bd 100644 --- a/agent/xds/resources_test.go +++ b/agent/xds/resources_test.go @@ -365,20 +365,27 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase { }}, }, } - }, []structs.BoundRoute{ - &structs.TCPRouteConfigEntry{ - Kind: structs.TCPRoute, - Name: "route", - Services: []structs.TCPService{{ - Name: "service", - }}, - }, - }, []structs.InlineCertificateConfigEntry{{ - Kind: structs.InlineCertificate, - Name: "certificate", - PrivateKey: gatewayTestPrivateKey, - Certificate: gatewayTestCertificate, - }}, nil) + }, + []structs.BoundRoute{ + &structs.TCPRouteConfigEntry{ + Kind: structs.TCPRoute, + Name: "route", + Services: []structs.TCPService{{ + Name: "service", + }}, + Parents: []structs.ResourceReference{ + { + Kind: structs.APIGateway, + Name: "api-gateway", + }, + }, + }, + }, []structs.InlineCertificateConfigEntry{{ + Kind: structs.InlineCertificate, + Name: "certificate", + PrivateKey: gatewayTestPrivateKey, + Certificate: gatewayTestCertificate, + }}, nil) }, }, { @@ -410,6 +417,12 @@ func getAPIGatewayGoldenTestCases(t *testing.T) []goldenTestCase { Name: "service", }}, }}, + Parents: []structs.ResourceReference{ + { + Kind: structs.APIGateway, + Name: "api-gateway", + }, + }, }, }, nil, []proxycfg.UpdateEvent{{ CorrelationID: "discovery-chain:" + serviceUID.String(),