Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[v9] backport #12685 (Add de-duplicating resources when sorting/totalCount is requested) #13451

Merged
merged 5 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/client/proto/authservice.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions api/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,14 @@ message PaginatedResource {

// ListResourcesRequest defines a request to retrieve resources paginated. Only
// one type of resource can be retrieved per request.
//
// NOTE: There are two paths this request can take:
// 1. ListResources: the more efficient path that retrieves resources by subset
// at a time defined by field 'Limit'. Does NOT de-duplicate matches.
// 2. listResourcesWithSort: the less efficient path that retrieves all resources
// upfront by falling back to the traditional GetXXX calls. Used when sorting (SortBy),
// total count of resources (NeedTotalCount), or ResourceType `KindKubernetesCluster`
// is requested. Matches are de-duplicated.
message ListResourcesRequest {
// ResourceType is the resource that is going to be retrieved.
string ResourceType = 1 [ (gogoproto.jsontag) = "resource_type,omitempty" ];
Expand Down
11 changes: 7 additions & 4 deletions api/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,17 @@ func (a *AppV3) CheckAndSetDefaults() error {
return nil
}

// DeduplicateApps deduplicates apps by name.
// DeduplicateApps deduplicates apps by combination of app name and public address.
// Apps can have the same name but also could have different addresses.
func DeduplicateApps(apps []Application) (result []Application) {
seen := make(map[string]struct{})
type key struct{ name, addr string }
seen := make(map[key]struct{})
for _, app := range apps {
if _, ok := seen[app.GetName()]; ok {
key := key{app.GetName(), app.GetPublicAddr()}
if _, ok := seen[key]; ok {
continue
}
seen[app.GetName()] = struct{}{}
seen[key] = struct{}{}
result = append(result, app)
}
return result
Expand Down
1 change: 0 additions & 1 deletion api/types/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,5 +333,4 @@ type ListWindowsDesktopsRequest struct {
StartKey, PredicateExpression string
Labels map[string]string
SearchKeywords []string
SortBy SortBy
}
1 change: 0 additions & 1 deletion lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -2856,7 +2856,6 @@ func (a *Server) IterateResourcePages(ctx context.Context, req proto.ListResourc
PredicateExpression: req.PredicateExpression,
Labels: req.Labels,
SearchKeywords: req.SearchKeywords,
SortBy: req.SortBy,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
6 changes: 3 additions & 3 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ func (a *ServerWithRoles) ListResources(ctx context.Context, req proto.ListResou
return false, trace.Wrap(err)
}

switch match, err := services.MatchResourceByFilters(resource, filter); {
switch match, err := services.MatchResourceByFilters(resource, filter, nil /* ignore dup matches */); {
case err != nil:
return false, trace.Wrap(err)
case match:
Expand Down Expand Up @@ -1235,7 +1235,7 @@ func (a *ServerWithRoles) listResourcesWithSort(ctx context.Context, req proto.L
}

// Extract kube clusters into its own list.
clusters := []types.KubeCluster{}
var clusters []types.KubeCluster
for _, svc := range kubeservices {
for _, legacyCluster := range svc.GetKubernetesClusters() {
cluster, err := types.NewKubernetesClusterV3FromLegacyCluster(svc.GetNamespace(), legacyCluster)
Expand All @@ -1246,7 +1246,7 @@ func (a *ServerWithRoles) listResourcesWithSort(ctx context.Context, req proto.L
}
}

sortedClusters := types.KubeClusters(types.DeduplicateKubeClusters(clusters))
sortedClusters := types.KubeClusters(clusters)
if err := sortedClusters.SortByCustom(req.SortBy); err != nil {
return nil, trace.Wrap(err)
}
Expand Down
175 changes: 172 additions & 3 deletions lib/auth/auth_with_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2227,15 +2227,16 @@ func TestListResources_KindKubernetesCluster(t *testing.T) {

testNames := []string{"a", "b", "c", "d"}

// Add some kube services.
// Add a kube service with 3 clusters.
kubeService, err := types.NewServer("bar", types.KindKubeService, types.ServerSpecV2{
KubernetesClusters: []*types.KubernetesCluster{{Name: "d"}, {Name: "b"}, {Name: "a"}},
})
require.NoError(t, err)
_, err = s.UpsertKubeServiceV2(ctx, kubeService)
require.NoError(t, err)

// Include a duplicate cluster name to test deduplicate.
// Add a kube service with 2 clusters.
// Includes a duplicate cluster name to test deduplicate.
kubeService, err = types.NewServer("foo", types.KindKubeService, types.ServerSpecV2{
KubernetesClusters: []*types.KubernetesCluster{{Name: "a"}, {Name: "c"}},
})
Expand All @@ -2258,7 +2259,8 @@ func TestListResources_KindKubernetesCluster(t *testing.T) {
require.NoError(t, err)
require.Len(t, res.Resources, len(testNames))
require.Empty(t, res.NextKey)
require.Empty(t, res.TotalCount)
// There is 2 kube services, but 4 unique clusters.
require.Equal(t, 4, res.TotalCount)

clusters, err := types.ResourcesWithLabels(res.Resources).AsKubeClusters()
require.NoError(t, err)
Expand Down Expand Up @@ -2411,3 +2413,170 @@ func TestDeleteUserAppSessions(t *testing.T) {
require.NoError(t, err)
require.Len(t, sessions, 0)
}

func TestListResources_SortAndDeduplicate(t *testing.T) {
t.Parallel()
ctx := context.Background()
srv := newTestTLSServer(t)

// Create user, role, and client.
username := "user"
user, role, err := CreateUserAndRole(srv.Auth(), username, nil)
require.NoError(t, err)
identity := TestUser(user.GetName())
clt, err := srv.NewClient(identity)
require.NoError(t, err)

// Permit user to get all resources.
role.SetWindowsDesktopLabels(types.Allow, types.Labels{types.Wildcard: {types.Wildcard}})
require.NoError(t, srv.Auth().UpsertRole(ctx, role))

// Define some resource names for testing.
names := []string{"d", "b", "d", "a", "a", "b"}
uniqueNames := []string{"a", "b", "d"}

tests := []struct {
name string
kind string
insertResources func()
wantNames []string
}{
{
name: "KindDatabaseServer",
kind: types.KindDatabaseServer,
insertResources: func() {
for i := 0; i < len(names); i++ {
db, err := types.NewDatabaseServerV3(types.Metadata{
Name: fmt.Sprintf("name-%v", i),
}, types.DatabaseServerSpecV3{
HostID: "_",
Hostname: "_",
Database: &types.DatabaseV3{
Metadata: types.Metadata{
Name: names[i],
},
Spec: types.DatabaseSpecV3{
Protocol: "_",
URI: "_",
},
},
})
require.NoError(t, err)
_, err = srv.Auth().UpsertDatabaseServer(ctx, db)
require.NoError(t, err)
}
},
},
{
name: "KindAppServer",
kind: types.KindAppServer,
insertResources: func() {
for i := 0; i < len(names); i++ {
server, err := types.NewAppServerV3(types.Metadata{
Name: fmt.Sprintf("name-%v", i),
}, types.AppServerSpecV3{
HostID: "_",
App: &types.AppV3{Metadata: types.Metadata{Name: names[i]}, Spec: types.AppSpecV3{URI: "_"}},
})
require.NoError(t, err)
_, err = srv.Auth().UpsertApplicationServer(ctx, server)
require.NoError(t, err)
}
},
},
{
name: "KindWindowsDesktop",
kind: types.KindWindowsDesktop,
insertResources: func() {
for i := 0; i < len(names); i++ {
desktop, err := types.NewWindowsDesktopV3(names[i], nil, types.WindowsDesktopSpecV3{
Addr: "_",
HostID: fmt.Sprintf("name-%v", i),
})
require.NoError(t, err)
require.NoError(t, srv.Auth().UpsertWindowsDesktop(ctx, desktop))
}
},
},
{
name: "KindKubernetesCluster",
kind: types.KindKubernetesCluster,
insertResources: func() {
for i := 0; i < len(names); i++ {
server, err := types.NewServer(fmt.Sprintf("name-%v", i), types.KindKubeService, types.ServerSpecV2{
KubernetesClusters: []*types.KubernetesCluster{
// Test dedup inside this list as well as from each service.
{Name: names[i]},
{Name: names[i]},
},
})
require.NoError(t, err)
_, err = srv.Auth().UpsertKubeServiceV2(ctx, server)
require.NoError(t, err)
}
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.insertResources()

// Fetch all resources
fetchedResources := make([]types.ResourceWithLabels, 0, len(uniqueNames))
resp, err := clt.ListResources(ctx, proto.ListResourcesRequest{
ResourceType: tc.kind,
NeedTotalCount: true,
Limit: 2,
SortBy: types.SortBy{Field: types.ResourceMetadataName, IsDesc: true},
})
require.NoError(t, err)
require.Len(t, resp.Resources, 2)
require.Equal(t, len(uniqueNames), resp.TotalCount)
fetchedResources = append(fetchedResources, resp.Resources...)

resp, err = clt.ListResources(ctx, proto.ListResourcesRequest{
ResourceType: tc.kind,
NeedTotalCount: true,
StartKey: resp.NextKey,
Limit: 2,
SortBy: types.SortBy{Field: types.ResourceMetadataName, IsDesc: true},
})
require.NoError(t, err)
require.Len(t, resp.Resources, 1)
require.Equal(t, len(uniqueNames), resp.TotalCount)
fetchedResources = append(fetchedResources, resp.Resources...)

r := types.ResourcesWithLabels(fetchedResources)
var extractedErr error
var extractedNames []string

switch tc.kind {
case types.KindDatabaseServer:
s, err := r.AsDatabaseServers()
require.NoError(t, err)
extractedNames, extractedErr = types.DatabaseServers(s).GetFieldVals(types.ResourceMetadataName)

case types.KindAppServer:
s, err := r.AsAppServers()
require.NoError(t, err)
extractedNames, extractedErr = types.AppServers(s).GetFieldVals(types.ResourceMetadataName)

case types.KindWindowsDesktop:
s, err := r.AsWindowsDesktops()
require.NoError(t, err)
extractedNames, extractedErr = types.WindowsDesktops(s).GetFieldVals(types.ResourceMetadataName)

default:
s, err := r.AsKubeClusters()
require.NoError(t, err)
require.Len(t, s, 3)
extractedNames, extractedErr = types.KubeClusters(s).GetFieldVals(types.ResourceMetadataName)
}

require.NoError(t, extractedErr)
require.ElementsMatch(t, uniqueNames, extractedNames)
require.IsDecreasing(t, extractedNames)
})
}
}
2 changes: 1 addition & 1 deletion lib/auth/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2206,7 +2206,7 @@ func TestListResources(t *testing.T) {
require.NoError(t, err)
require.Len(t, resp.Resources, 2)
require.Empty(t, resp.NextKey)
require.Empty(t, resp.TotalCount)
require.Equal(t, 2, resp.TotalCount)
}

// Test listing with NeedTotalCount flag.
Expand Down
83 changes: 70 additions & 13 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,19 +701,16 @@ func benchGetNodes(b *testing.B, nodeCount int) {
ctx := context.Background()

for i := 0; i < nodeCount; i++ {
func() {
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(b, err)
timeout := time.NewTimer(time.Millisecond * 200)
defer timeout.Stop()
select {
case event := <-p.eventsC:
require.Equal(b, EventProcessed, event.Type)
case <-timeout.C:
b.Fatalf("timeout waiting for event, iteration=%d", i)
}
}()
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(b, err)

select {
case event := <-p.eventsC:
require.Equal(b, EventProcessed, event.Type)
case <-time.After(200 * time.Millisecond):
b.Fatalf("timeout waiting for event, iteration=%d", i)
}
}

b.ResetTimer()
Expand Down Expand Up @@ -781,6 +778,66 @@ func benchListNodes(b *testing.B, nodeCount int, pageSize int) {
}
}

/*
goos: linux
goarch: amd64
pkg: github.com/gravitational/teleport/lib/cache
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
BenchmarkListResourcesWithSort-8 1 2351035036 ns/op
*/
func BenchmarkListResourcesWithSort(b *testing.B) {
p, err := newPack(b.TempDir(), ForAuth, memoryBackend(true))
require.NoError(b, err)
defer p.Close()

ctx := context.Background()

count := 100000
for i := 0; i < count; i++ {
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
// Set some static and dynamic labels.
server.Metadata.Labels = map[string]string{"os": "mac", "env": "prod", "country": "us", "tier": "frontend"}
server.Spec.CmdLabels = map[string]types.CommandLabelV2{
"version": {Result: "v8"},
"time": {Result: "now"},
}
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(b, err)

select {
case event := <-p.eventsC:
require.Equal(b, EventProcessed, event.Type)
case <-time.After(200 * time.Millisecond):
b.Fatalf("timeout waiting for event, iteration=%d", i)
}
}

b.ResetTimer()

for _, limit := range []int32{100, 1_000, 10_000, 100_000} {
for _, totalCount := range []bool{true, false} {
b.Run(fmt.Sprintf("limit=%d,needTotal=%t", limit, totalCount), func(b *testing.B) {
for n := 0; n < b.N; n++ {
resp, err := p.cache.ListResources(ctx, proto.ListResourcesRequest{
ResourceType: types.KindNode,
Namespace: apidefaults.Namespace,
SortBy: types.SortBy{
IsDesc: true,
Field: types.ResourceSpecHostname,
},
// Predicate is the more expensive filter.
PredicateExpression: `search("mac", "frontend") && labels.version == "v8"`,
Limit: limit,
NeedTotalCount: totalCount,
})
require.NoError(b, err)
require.Len(b, resp.Resources, int(limit))
}
})
}
}
}

// TestListResources_NodesTTLVariant verifies that the custom ListNodes impl that we fallback to when
// using ttl-based caching works as expected.
func TestListResources_NodesTTLVariant(t *testing.T) {
Expand Down
Loading