Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions agent/xds/proxystateconverter/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,8 @@ import (
"time"

envoy_http_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
"github.com/hashicorp/go-uuid"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/wrapperspb"

Expand Down Expand Up @@ -1443,7 +1442,11 @@ func makeL4Destination(opts destinationOpts) (*pbproxystate.L4Destination, error

l4Dest := &pbproxystate.L4Destination{
//AccessLog: accessLogs,
Name: opts.cluster,
Destination: &pbproxystate.L4Destination_Cluster{
Cluster: &pbproxystate.DestinationCluster{
Name: opts.cluster,
},
},
StatPrefix: makeStatPrefix(opts.statPrefix, opts.filterName),
}
return l4Dest, nil
Expand Down
8 changes: 5 additions & 3 deletions agent/xdsv2/cluster_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import (
envoy_aggregate_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/clusters/aggregate/v3"
envoy_upstreams_v3 "github.com/envoyproxy/go-control-plane/envoy/extensions/upstreams/http/v3"
envoy_type_v3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"

"github.com/hashicorp/consul/envoyextensions/xdscommon"
"github.com/hashicorp/consul/proto-public/pbmesh/v1alpha1/pbproxystate"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

func (pr *ProxyResources) doesEnvoyClusterAlreadyExist(name string) bool {
Expand Down Expand Up @@ -174,9 +174,11 @@ func (pr *ProxyResources) makeEnvoyStaticCluster(name string, protocol string, s
}
return cluster, nil
}

func (pr *ProxyResources) makeEnvoyDnsCluster(name string, protocol string, dns *pbproxystate.DNSEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
return nil, nil
}

func (pr *ProxyResources) makeEnvoyPassthroughCluster(name string, protocol string, passthrough *pbproxystate.PassthroughEndpointGroup) (*envoy_cluster_v3.Cluster, error) {
cluster := &envoy_cluster_v3.Cluster{
Name: name,
Expand Down Expand Up @@ -343,6 +345,6 @@ func addEnvoyLBToCluster(dynamicConfig *pbproxystate.DynamicEndpointGroupConfig,
}

// TODO(proxystate): In a future PR this will create clusters and add it to ProxyResources.proxyState
func (pr *ProxyResources) makeEnvoyClusterFromL4Destination(name string) error {
func (pr *ProxyResources) makeEnvoyClusterFromL4Destination(l4 *pbproxystate.L4Destination) error {
return nil
}
27 changes: 24 additions & 3 deletions agent/xdsv2/listener_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func (pr *ProxyResources) makeEnvoyResourcesForSNIDestination(sni *pbproxystate.
}

func (pr *ProxyResources) makeEnvoyResourcesForL4Destination(l4 *pbproxystate.Router_L4) ([]*envoy_listener_v3.Filter, error) {
err := pr.makeEnvoyClusterFromL4Destination(l4.L4.Name)
err := pr.makeEnvoyClusterFromL4Destination(l4.L4)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -355,9 +355,30 @@ func makeL4Filters(defaultAllow bool, l4 *pbproxystate.L4Destination) ([]*envoy_

// Add tcp proxy filter
tcp := &envoy_tcp_proxy_v3.TcpProxy{
ClusterSpecifier: &envoy_tcp_proxy_v3.TcpProxy_Cluster{Cluster: l4.Name},
StatPrefix: l4.StatPrefix,
StatPrefix: l4.StatPrefix,
}

switch dest := l4.Destination.(type) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Accounting for a TCPRoute split.

case *pbproxystate.L4Destination_Cluster:
tcp.ClusterSpecifier = &envoy_tcp_proxy_v3.TcpProxy_Cluster{Cluster: dest.Cluster.Name}
case *pbproxystate.L4Destination_WeightedClusters:
clusters := make([]*envoy_tcp_proxy_v3.TcpProxy_WeightedCluster_ClusterWeight, 0, len(dest.WeightedClusters.Clusters))
for _, cluster := range dest.WeightedClusters.Clusters {
clusters = append(clusters, &envoy_tcp_proxy_v3.TcpProxy_WeightedCluster_ClusterWeight{
Name: cluster.Name,
Weight: cluster.Weight.GetValue(),
})
}

tcp.ClusterSpecifier = &envoy_tcp_proxy_v3.TcpProxy_WeightedClusters{
WeightedClusters: &envoy_tcp_proxy_v3.TcpProxy_WeightedCluster{
Clusters: clusters,
},
}
default:
return nil, fmt.Errorf("unexpected l4 destination type: %T", l4.Destination)
}

tcpFilter, err := makeEnvoyFilter(envoyNetworkFilterName, tcp)
if err != nil {
return nil, err
Expand Down
Original file line number Diff line number Diff line change
@@ -1,59 +1,61 @@
{
"proxyState": {
"identity": {
"tenancy": {
"partition": "default",
"namespace": "default",
"peerName": "local"
"proxyState": {
"identity": {
"tenancy": {
"partition": "default",
"namespace": "default",
"peerName": "local"
},
"name": "test-identity"
"name": "test-identity"
},
"listeners": [
"listeners": [
{
"name": "outbound_listener",
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
"name": "outbound_listener",
"direction": "DIRECTION_OUTBOUND",
"hostPort": {
"host": "127.0.0.1",
"port": 15001
},
"routers": [
"routers": [
{
"match": {
"prefixRanges": [
"match": {
"prefixRanges": [
{
"addressPrefix": "1.1.1.1",
"prefixLen": 32
"addressPrefix": "1.1.1.1",
"prefixLen": 32
}
],
"destinationPort": 8080
},
"l4": {
"name": "tcp.api-1.default.dc1.internal.foo.consul",
"statPrefix": "upstream.tcp.api-1.default.default.dc1"
"l4": {
"cluster": {
"name": "tcp.api-1.default.dc1.internal.foo.consul"
},
"statPrefix": "upstream.tcp.api-1.default.default.dc1"
}
}
],
"capabilities": [
"capabilities": [
"CAPABILITY_TRANSPARENT"
]
}
],
"clusters": {
"tcp.api-1.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
"clusters": {
"tcp.api-1.default.dc1.internal.foo.consul": {
"endpointGroup": {
"dynamic": {
"config": {
"disablePanicThreshold": true
},
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"validationContext": {
"spiffeIds": [
"outboundTls": {
"outboundMesh": {
"identityKey": "test-identity",
"validationContext": {
"spiffeIds": [
"spiffe://foo.consul/ap/default/ns/default/identity/api1-identity"
]
},
"sni": "api-1.default.dc1.internal.foo.consul"
"sni": "api-1.default.dc1.internal.foo.consul"
},
"alpnProtocols": [
"consul~tcp"
Expand All @@ -64,22 +66,22 @@
}
}
},
"requiredEndpoints": {
"api-1.default.dc1.internal.foo.consul": {
"id": {
"name": "api-1",
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
"requiredEndpoints": {
"api-1.default.dc1.internal.foo.consul": {
"id": {
"name": "api-1",
"type": {
"group": "catalog",
"groupVersion": "v1alpha1",
"kind": "ServiceEndpoints"
},
"tenancy": {
"partition": "default",
"namespace": "default",
"peerName": "local"
"tenancy": {
"partition": "default",
"namespace": "default",
"peerName": "local"
}
},
"port": "mesh"
"port": "mesh"
}
}
}
3 changes: 2 additions & 1 deletion command/resource/apply/apply.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copywrite headers fixup in passing

// SPDX-License-Identifier: BUSL-1.1

package apply

Expand Down Expand Up @@ -156,6 +156,7 @@ func (c *cmd) Help() string {
}

const synopsis = "Writes/updates resource information"

const help = `
Usage: consul resource apply -f=<file-path>

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) HashiCorp, Inc.
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is new from the initial review.

// SPDX-License-Identifier: BUSL-1.1

package sidecarproxycache

import (
"github.com/hashicorp/consul/internal/catalog"
"github.com/hashicorp/consul/internal/mesh/internal/types"
"github.com/hashicorp/consul/internal/resource"
"github.com/hashicorp/consul/internal/resource/mappers/bimapper"
"github.com/hashicorp/consul/proto-public/pbresource"
)

type ComputedRoutesCache struct {
mapper *bimapper.Mapper
}

func NewComputedRoutesCache() *ComputedRoutesCache {
return &ComputedRoutesCache{
mapper: bimapper.New(types.ComputedRoutesType, catalog.ServiceType),
}
}

func (c *ComputedRoutesCache) TrackComputedRoutes(computedRoutes *types.DecodedComputedRoutes) {
var serviceRefs []resource.ReferenceOrID

for _, pcr := range computedRoutes.Data.PortedConfigs {
for _, details := range pcr.Targets {
serviceRefs = append(serviceRefs, details.BackendRef.Ref)
if details.FailoverConfig != nil {
for _, dest := range details.FailoverConfig.Destinations {
serviceRefs = append(serviceRefs, dest.Ref)
}
}
}
}

c.mapper.TrackItem(computedRoutes.Resource.Id, serviceRefs)
}

func (c *ComputedRoutesCache) UntrackComputedRoutes(computedRoutesID *pbresource.ID) {
c.mapper.UntrackItem(computedRoutesID)
}

func (c *ComputedRoutesCache) ComputedRoutesByService(id resource.ReferenceOrID) []*pbresource.ID {
return c.mapper.ItemIDsForLink(id)
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type DestinationsCache struct {

// store is a map from destination service reference and port as a reference key
// to the object representing destination reference.
store map[ReferenceKeyWithPort]intermediate.CombinedDestinationRef
store map[ReferenceKeyWithPort]intermediate.CombinedDestinationRef
storedPorts map[resource.ReferenceKey]map[string]struct{}

// sourceProxiesIndex stores a map from a reference key of source proxy IDs
// to the keys in the store map.
Expand All @@ -36,6 +37,7 @@ type storeKeys map[ReferenceKeyWithPort]struct{}
func NewDestinationsCache() *DestinationsCache {
return &DestinationsCache{
store: make(map[ReferenceKeyWithPort]intermediate.CombinedDestinationRef),
storedPorts: make(map[resource.ReferenceKey]map[string]struct{}),
sourceProxiesIndex: make(map[resource.ReferenceKey]storeKeys),
}
}
Expand Down Expand Up @@ -87,6 +89,7 @@ func (c *DestinationsCache) addLocked(d intermediate.CombinedDestinationRef) {
key := KeyFromRefAndPort(d.ServiceRef, d.Port)

c.store[key] = d
c.addPortLocked(d.ServiceRef, d.Port)

// Update source proxies index.
for proxyRef := range d.SourceProxies {
Expand All @@ -99,6 +102,18 @@ func (c *DestinationsCache) addLocked(d intermediate.CombinedDestinationRef) {
}
}

func (c *DestinationsCache) addPortLocked(ref *pbresource.Reference, port string) {
rk := resource.NewReferenceKey(ref)

m, ok := c.storedPorts[rk]
if !ok {
m = make(map[string]struct{})
c.storedPorts[rk] = m
}

m[port] = struct{}{}
}

func (c *DestinationsCache) deleteLocked(ref *pbresource.Reference, port string) {
key := KeyFromRefAndPort(ref, port)

Expand All @@ -117,6 +132,22 @@ func (c *DestinationsCache) deleteLocked(ref *pbresource.Reference, port string)

// Finally, delete this destination from the store.
delete(c.store, key)
c.deletePortLocked(ref, port)
}

func (c *DestinationsCache) deletePortLocked(ref *pbresource.Reference, port string) {
rk := resource.NewReferenceKey(ref)

m, ok := c.storedPorts[rk]
if !ok {
return
}

delete(m, port)

if len(m) == 0 {
delete(c.storedPorts, rk)
}
}

// DeleteSourceProxy deletes the source proxy given by id from the cache.
Expand Down Expand Up @@ -166,6 +197,35 @@ func (c *DestinationsCache) ReadDestination(ref *pbresource.Reference, port stri
return d, found
}

func (c *DestinationsCache) ReadDestinationsByServiceAllPorts(ref *pbresource.Reference) []intermediate.CombinedDestinationRef {
// Check that reference is a catalog.Service type.
if !resource.EqualType(catalog.ServiceType, ref.Type) {
panic("ref must of type catalog.Service")
}

c.lock.RLock()
defer c.lock.RUnlock()

rk := resource.NewReferenceKey(ref)

ports, ok := c.storedPorts[rk]
if !ok {
return nil
}

var destinations []intermediate.CombinedDestinationRef
for port := range ports {
key := KeyFromRefAndPort(ref, port)

d, found := c.store[key]
if found {
destinations = append(destinations, d)
}
}

return destinations
}

// DestinationsBySourceProxy returns all destinations that are a referenced by the given source proxy id.
func (c *DestinationsCache) DestinationsBySourceProxy(id *pbresource.ID) []intermediate.CombinedDestinationRef {
// Check that id is the ProxyStateTemplate type.
Expand Down
Loading