Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions internal/catalog/exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,12 @@ func NewFailoverPolicyMapper() FailoverPolicyMapper {
func ValidateLocalServiceRefNoSection(ref *pbresource.Reference, wrapErr func(error) error) error {
return types.ValidateLocalServiceRefNoSection(ref, wrapErr)
}

// ValidateSelector ensures that the selector has at least one exact or prefix
// match constraint, and that if a filter is present it is valid.
//
// The selector can be nil, and have zero exact/prefix matches if allowEmpty is
// set to true.
func ValidateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
return types.ValidateSelector(sel, allowEmpty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package endpoints

import (
"context"
"fmt"
"sort"

"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -169,6 +170,14 @@ func gatherWorkloadsForService(ctx context.Context, rt controller.Runtime, svc *
workloadNames[rsp.Resource.Id.Name] = struct{}{}
}

if sel.GetFilter() != "" && len(workloads) > 0 {
var err error
workloads, err = resource.FilterResourcesByMetadata(workloads, sel.GetFilter())
if err != nil {
return nil, fmt.Errorf("error filtering results by metadata: %w", err)
}
}

// Sorting ensures deterministic output. This will help for testing but
// the real reason to do this is so we will be able to diff the set of
// workloads endpoints to determine if we need to update them.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/suite"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"

svctest "github.com/hashicorp/consul/agent/grpc-external/services/resource/testing"
"github.com/hashicorp/consul/internal/catalog/internal/types"
Expand All @@ -30,14 +31,16 @@ type reconciliationDataSuite struct {
client pbresource.ResourceServiceClient
rt controller.Runtime

apiServiceData *pbcatalog.Service
apiService *pbresource.Resource
apiEndpoints *pbresource.Resource
api1Workload *pbresource.Resource
api2Workload *pbresource.Resource
api123Workload *pbresource.Resource
web1Workload *pbresource.Resource
web2Workload *pbresource.Resource
apiServiceData *pbcatalog.Service
apiService *pbresource.Resource
apiServiceSubsetData *pbcatalog.Service
apiServiceSubset *pbresource.Resource
apiEndpoints *pbresource.Resource
api1Workload *pbresource.Resource
api2Workload *pbresource.Resource
api123Workload *pbresource.Resource
web1Workload *pbresource.Resource
web2Workload *pbresource.Resource
}

func (suite *reconciliationDataSuite) SetupTest() {
Expand All @@ -62,12 +65,19 @@ func (suite *reconciliationDataSuite) SetupTest() {
},
},
}
suite.apiServiceSubsetData = proto.Clone(suite.apiServiceData).(*pbcatalog.Service)
suite.apiServiceSubsetData.Workloads.Filter = "(zim in metadata) and (metadata.zim matches `^g.`)"

suite.apiService = rtest.Resource(pbcatalog.ServiceType, "api").
WithData(suite.T(), suite.apiServiceData).
Write(suite.T(), suite.client)

suite.apiServiceSubset = rtest.Resource(pbcatalog.ServiceType, "api-subset").
WithData(suite.T(), suite.apiServiceSubsetData).
Write(suite.T(), suite.client)

suite.api1Workload = rtest.Resource(pbcatalog.WorkloadType, "api-1").
WithMeta("zim", "dib").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1"},
Expand All @@ -92,6 +102,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)

suite.api123Workload = rtest.Resource(pbcatalog.WorkloadType, "api-123").
WithMeta("zim", "gir").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1"},
Expand All @@ -104,6 +115,7 @@ func (suite *reconciliationDataSuite) SetupTest() {
Write(suite.T(), suite.client)

suite.web1Workload = rtest.Resource(pbcatalog.WorkloadType, "web-1").
WithMeta("zim", "gaz").
WithData(suite.T(), &pbcatalog.Workload{
Addresses: []*pbcatalog.WorkloadAddress{
{Host: "127.0.0.1"},
Expand Down Expand Up @@ -259,6 +271,20 @@ func (suite *reconciliationDataSuite) TestGetWorkloadData() {
prototest.AssertDeepEqual(suite.T(), suite.web2Workload, data[4].resource)
}

func (suite *reconciliationDataSuite) TestGetWorkloadDataWithFilter() {
// This is like TestGetWorkloadData except it exercises the post-read
// filter on the selector.
data, err := getWorkloadData(suite.ctx, suite.rt, &serviceData{
resource: suite.apiServiceSubset,
service: suite.apiServiceSubsetData,
})

require.NoError(suite.T(), err)
require.Len(suite.T(), data, 2)
prototest.AssertDeepEqual(suite.T(), suite.api123Workload, data[0].resource)
prototest.AssertDeepEqual(suite.T(), suite.web1Workload, data[1].resource)
}

func TestReconciliationData(t *testing.T) {
suite.Run(t, new(reconciliationDataSuite))
}
2 changes: 1 addition & 1 deletion internal/catalog/internal/types/dns_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func ValidateDNSPolicy(res *pbresource.Resource) error {
var err error
// Ensure that this resource isn't useless and is attempting to
// select at least one workload.
if selErr := validateSelector(policy.Workloads, false); selErr != nil {
if selErr := ValidateSelector(policy.Workloads, false); selErr != nil {
err = multierror.Append(err, resource.ErrInvalidField{
Name: "workloads",
Wrapped: selErr,
Expand Down
2 changes: 1 addition & 1 deletion internal/catalog/internal/types/health_checks.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func ValidateHealthChecks(res *pbresource.Resource) error {
var err error

// Validate the workload selector
if selErr := validateSelector(checks.Workloads, false); selErr != nil {
if selErr := ValidateSelector(checks.Workloads, false); selErr != nil {
err = multierror.Append(err, resource.ErrInvalidField{
Name: "workloads",
Wrapped: selErr,
Expand Down
2 changes: 1 addition & 1 deletion internal/catalog/internal/types/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func ValidateService(res *pbresource.Resource) error {
// ServiceEndpoints objects for this service such as when desiring to
// configure endpoint information for external services that are not
// registered as workloads
if selErr := validateSelector(service.Workloads, true); selErr != nil {
if selErr := ValidateSelector(service.Workloads, true); selErr != nil {
err = multierror.Append(err, resource.ErrInvalidField{
Name: "workloads",
Wrapped: selErr,
Expand Down
27 changes: 20 additions & 7 deletions internal/catalog/internal/types/validators.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func validateWorkloadHost(host string) error {
return nil
}

func validateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
func ValidateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
if sel == nil {
if allowEmpty {
return nil
Expand All @@ -88,30 +88,43 @@ func validateSelector(sel *pbcatalog.WorkloadSelector, allowEmpty bool) error {
}

if len(sel.Names) == 0 && len(sel.Prefixes) == 0 {
if allowEmpty {
return nil
if !allowEmpty {
return resource.ErrEmpty
}

return resource.ErrEmpty
if sel.Filter != "" {
return resource.ErrInvalidField{
Name: "filter",
Wrapped: errors.New("filter cannot be set unless there is a name or prefix selector"),
}
}
return nil
}

var err error
var merr error

// Validate that all the exact match names are non-empty. This is
// mostly for the sake of not admitting values that should always
// be meaningless and never actually cause selection of a workload.
// This is because workloads must have non-empty names.
for idx, name := range sel.Names {
if name == "" {
err = multierror.Append(err, resource.ErrInvalidListElement{
merr = multierror.Append(merr, resource.ErrInvalidListElement{
Name: "names",
Index: idx,
Wrapped: resource.ErrEmpty,
})
}
}

return err
if err := resource.ValidateMetadataFilter(sel.GetFilter()); err != nil {
merr = multierror.Append(merr, resource.ErrInvalidField{
Name: "filter",
Wrapped: err,
})
}

return merr
}

func validateIPAddress(ip string) error {
Expand Down
41 changes: 40 additions & 1 deletion internal/catalog/internal/types/validators_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package types

import (
"errors"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -281,11 +282,49 @@ func TestValidateSelector(t *testing.T) {
},
},
},
"filter-with-empty-query": {
selector: &pbcatalog.WorkloadSelector{
Filter: "garbage.value == zzz",
},
allowEmpty: true,
err: resource.ErrInvalidField{
Name: "filter",
Wrapped: errors.New(
`filter cannot be set unless there is a name or prefix selector`,
),
},
},
"bad-filter": {
selector: &pbcatalog.WorkloadSelector{
Prefixes: []string{"foo", "bar"},
Filter: "garbage.value == zzz",
},
allowEmpty: false,
err: &multierror.Error{
Errors: []error{
resource.ErrInvalidField{
Name: "filter",
Wrapped: fmt.Errorf(
`filter "garbage.value == zzz" is invalid: %w`,
errors.New(`Selector "garbage" is not valid`),
),
},
},
},
},
"good-filter": {
selector: &pbcatalog.WorkloadSelector{
Prefixes: []string{"foo", "bar"},
Filter: "metadata.zone == west1",
},
allowEmpty: false,
err: nil,
},
}

for name, tcase := range cases {
t.Run(name, func(t *testing.T) {
err := validateSelector(tcase.selector, tcase.allowEmpty)
err := ValidateSelector(tcase.selector, tcase.allowEmpty)
if tcase.err == nil {
require.NoError(t, err)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
destinationIDs := r.mapper.DestinationsForWorkload(req.ID)
rt.Logger.Trace("cached destinations IDs", "ids", destinationIDs)

decodedDestinations, err := r.fetchDestinations(ctx, rt.Client, destinationIDs)
decodedDestinations, err := r.fetchDestinations(ctx, rt.Client, destinationIDs, workload)
if err != nil {
rt.Logger.Error("error fetching mapper", "error", err)
return err
Expand Down Expand Up @@ -241,8 +241,9 @@ func validate(
func (r *reconciler) fetchDestinations(
ctx context.Context,
client pbresource.ResourceServiceClient,
destinationIDs []*pbresource.ID) ([]*types.DecodedDestinations, error) {

destinationIDs []*pbresource.ID,
workload *types.DecodedWorkload,
) ([]*types.DecodedDestinations, error) {
// Sort all configs alphabetically.
sort.Slice(destinationIDs, func(i, j int) bool {
return destinationIDs[i].GetName() < destinationIDs[j].GetName()
Expand All @@ -259,6 +260,17 @@ func (r *reconciler) fetchDestinations(
r.mapper.UntrackDestinations(id)
continue
}

if res.Data.Workloads.Filter != "" {
match, err := resource.FilterMatchesResourceMetadata(workload.Resource, res.Data.Workloads.Filter)
if err != nil {
return nil, fmt.Errorf("error checking selector filters: %w", err)
}
if !match {
continue
}
}

decoded = append(decoded, res)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package proxyconfiguration

import (
"context"
"fmt"

"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
Expand Down Expand Up @@ -86,7 +87,7 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
proxyCfgIDs := r.proxyConfigMapper.IDsForWorkload(req.ID)
rt.Logger.Trace("cached proxy cfg IDs", "ids", proxyCfgIDs)

decodedProxyCfgs, err := r.fetchProxyConfigs(ctx, rt.Client, proxyCfgIDs)
decodedProxyCfgs, err := r.fetchProxyConfigs(ctx, rt.Client, proxyCfgIDs, workload)
if err != nil {
rt.Logger.Error("error fetching proxy configurations", "error", err)
return err
Expand Down Expand Up @@ -154,8 +155,9 @@ func (r *reconciler) Reconcile(ctx context.Context, rt controller.Runtime, req c
func (r *reconciler) fetchProxyConfigs(
ctx context.Context,
client pbresource.ResourceServiceClient,
proxyCfgIds []*pbresource.ID) ([]*types.DecodedProxyConfiguration, error) {

proxyCfgIds []*pbresource.ID,
workload *types.DecodedWorkload,
) ([]*types.DecodedProxyConfiguration, error) {
var decoded []*types.DecodedProxyConfiguration
for _, id := range proxyCfgIds {
res, err := resource.GetDecodedResource[*pbmesh.ProxyConfiguration](ctx, client, id)
Expand All @@ -167,6 +169,17 @@ func (r *reconciler) fetchProxyConfigs(
r.proxyConfigMapper.UntrackID(id)
continue
}

if res.Data.Workloads.Filter != "" {
match, err := resource.FilterMatchesResourceMetadata(workload.Resource, res.Data.Workloads.Filter)
if err != nil {
return nil, fmt.Errorf("error checking selector filters: %w", err)
}
if !match {
continue
}
}

decoded = append(decoded, res)
}

Expand Down
10 changes: 8 additions & 2 deletions internal/mesh/internal/types/destinations.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ func ValidateDestinations(res *pbresource.Resource) error {

var merr error

// Validate the workload selector
if selErr := catalog.ValidateSelector(destinations.Workloads, false); selErr != nil {
merr = multierror.Append(merr, resource.ErrInvalidField{
Name: "workloads",
Wrapped: selErr,
})
}

for i, dest := range destinations.Destinations {
wrapDestErr := func(err error) error {
return resource.ErrInvalidListElement{
Expand All @@ -97,7 +105,5 @@ func ValidateDestinations(res *pbresource.Resource) error {
// TODO(v2): validate ListenAddr
}

// TODO(v2): validate workload selectors

return merr
}
Loading