Skip to content

Commit

Permalink
Add de-duplicating apps, dbs, and desktops when sorting/totalCount is…
Browse files Browse the repository at this point in the history
… needed (#12685) (#13451)

Fixes total count errors for the web UI:
- Adds de-duplicating matches for `FakePaginate` func.
- This change does makes `FakePaginate` less efficient b/c
  it will always run filter for every resource per fetch versus just
  running it on the initial fetch to get the total count.
- Removes inaccurate de-dupping in the web api layer since we
  do it from the back now
- Adds another criteria when de-duplicating `applications` by checking
  for its `public_addr` as well as its name.
  • Loading branch information
kimlisa authored Jun 14, 2022
1 parent 0e40558 commit 6f33a6a
Show file tree
Hide file tree
Showing 18 changed files with 500 additions and 152 deletions.
8 changes: 8 additions & 0 deletions api/client/proto/authservice.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions api/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1484,6 +1484,14 @@ message PaginatedResource {

// ListResourcesRequest defines a request to retrieve resources paginated. Only
// one type of resource can be retrieved per request.
//
// NOTE: There are two paths this request can take:
// 1. ListResources: the more efficient path that retrieves resources by subset
// at a time defined by field 'Limit'. Does NOT de-duplicate matches.
// 2. listResourcesWithSort: the less efficient path that retrieves all resources
// upfront by falling back to the traditional GetXXX calls. Used when sorting (SortBy),
// total count of resources (NeedTotalCount), or ResourceType `KindKubernetesCluster`
// is requested. Matches are de-duplicated.
message ListResourcesRequest {
// ResourceType is the resource that is going to be retrieved.
string ResourceType = 1 [ (gogoproto.jsontag) = "resource_type,omitempty" ];
Expand Down
11 changes: 7 additions & 4 deletions api/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,14 +301,17 @@ func (a *AppV3) CheckAndSetDefaults() error {
return nil
}

// DeduplicateApps deduplicates apps by name.
// DeduplicateApps deduplicates apps by combination of app name and public address.
// Apps can have the same name but also could have different addresses.
func DeduplicateApps(apps []Application) (result []Application) {
seen := make(map[string]struct{})
type key struct{ name, addr string }
seen := make(map[key]struct{})
for _, app := range apps {
if _, ok := seen[app.GetName()]; ok {
key := key{app.GetName(), app.GetPublicAddr()}
if _, ok := seen[key]; ok {
continue
}
seen[app.GetName()] = struct{}{}
seen[key] = struct{}{}
result = append(result, app)
}
return result
Expand Down
1 change: 0 additions & 1 deletion api/types/desktop.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,5 +333,4 @@ type ListWindowsDesktopsRequest struct {
StartKey, PredicateExpression string
Labels map[string]string
SearchKeywords []string
SortBy SortBy
}
1 change: 0 additions & 1 deletion lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -2856,7 +2856,6 @@ func (a *Server) IterateResourcePages(ctx context.Context, req proto.ListResourc
PredicateExpression: req.PredicateExpression,
Labels: req.Labels,
SearchKeywords: req.SearchKeywords,
SortBy: req.SortBy,
})
if err != nil {
return nil, trace.Wrap(err)
Expand Down
6 changes: 3 additions & 3 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ func (a *ServerWithRoles) ListResources(ctx context.Context, req proto.ListResou
return false, trace.Wrap(err)
}

switch match, err := services.MatchResourceByFilters(resource, filter); {
switch match, err := services.MatchResourceByFilters(resource, filter, nil /* ignore dup matches */); {
case err != nil:
return false, trace.Wrap(err)
case match:
Expand Down Expand Up @@ -1235,7 +1235,7 @@ func (a *ServerWithRoles) listResourcesWithSort(ctx context.Context, req proto.L
}

// Extract kube clusters into its own list.
clusters := []types.KubeCluster{}
var clusters []types.KubeCluster
for _, svc := range kubeservices {
for _, legacyCluster := range svc.GetKubernetesClusters() {
cluster, err := types.NewKubernetesClusterV3FromLegacyCluster(svc.GetNamespace(), legacyCluster)
Expand All @@ -1246,7 +1246,7 @@ func (a *ServerWithRoles) listResourcesWithSort(ctx context.Context, req proto.L
}
}

sortedClusters := types.KubeClusters(types.DeduplicateKubeClusters(clusters))
sortedClusters := types.KubeClusters(clusters)
if err := sortedClusters.SortByCustom(req.SortBy); err != nil {
return nil, trace.Wrap(err)
}
Expand Down
175 changes: 172 additions & 3 deletions lib/auth/auth_with_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2227,15 +2227,16 @@ func TestListResources_KindKubernetesCluster(t *testing.T) {

testNames := []string{"a", "b", "c", "d"}

// Add some kube services.
// Add a kube service with 3 clusters.
kubeService, err := types.NewServer("bar", types.KindKubeService, types.ServerSpecV2{
KubernetesClusters: []*types.KubernetesCluster{{Name: "d"}, {Name: "b"}, {Name: "a"}},
})
require.NoError(t, err)
_, err = s.UpsertKubeServiceV2(ctx, kubeService)
require.NoError(t, err)

// Include a duplicate cluster name to test deduplicate.
// Add a kube service with 2 clusters.
// Includes a duplicate cluster name to test deduplicate.
kubeService, err = types.NewServer("foo", types.KindKubeService, types.ServerSpecV2{
KubernetesClusters: []*types.KubernetesCluster{{Name: "a"}, {Name: "c"}},
})
Expand All @@ -2258,7 +2259,8 @@ func TestListResources_KindKubernetesCluster(t *testing.T) {
require.NoError(t, err)
require.Len(t, res.Resources, len(testNames))
require.Empty(t, res.NextKey)
require.Empty(t, res.TotalCount)
// There is 2 kube services, but 4 unique clusters.
require.Equal(t, 4, res.TotalCount)

clusters, err := types.ResourcesWithLabels(res.Resources).AsKubeClusters()
require.NoError(t, err)
Expand Down Expand Up @@ -2411,3 +2413,170 @@ func TestDeleteUserAppSessions(t *testing.T) {
require.NoError(t, err)
require.Len(t, sessions, 0)
}

func TestListResources_SortAndDeduplicate(t *testing.T) {
t.Parallel()
ctx := context.Background()
srv := newTestTLSServer(t)

// Create user, role, and client.
username := "user"
user, role, err := CreateUserAndRole(srv.Auth(), username, nil)
require.NoError(t, err)
identity := TestUser(user.GetName())
clt, err := srv.NewClient(identity)
require.NoError(t, err)

// Permit user to get all resources.
role.SetWindowsDesktopLabels(types.Allow, types.Labels{types.Wildcard: {types.Wildcard}})
require.NoError(t, srv.Auth().UpsertRole(ctx, role))

// Define some resource names for testing.
names := []string{"d", "b", "d", "a", "a", "b"}
uniqueNames := []string{"a", "b", "d"}

tests := []struct {
name string
kind string
insertResources func()
wantNames []string
}{
{
name: "KindDatabaseServer",
kind: types.KindDatabaseServer,
insertResources: func() {
for i := 0; i < len(names); i++ {
db, err := types.NewDatabaseServerV3(types.Metadata{
Name: fmt.Sprintf("name-%v", i),
}, types.DatabaseServerSpecV3{
HostID: "_",
Hostname: "_",
Database: &types.DatabaseV3{
Metadata: types.Metadata{
Name: names[i],
},
Spec: types.DatabaseSpecV3{
Protocol: "_",
URI: "_",
},
},
})
require.NoError(t, err)
_, err = srv.Auth().UpsertDatabaseServer(ctx, db)
require.NoError(t, err)
}
},
},
{
name: "KindAppServer",
kind: types.KindAppServer,
insertResources: func() {
for i := 0; i < len(names); i++ {
server, err := types.NewAppServerV3(types.Metadata{
Name: fmt.Sprintf("name-%v", i),
}, types.AppServerSpecV3{
HostID: "_",
App: &types.AppV3{Metadata: types.Metadata{Name: names[i]}, Spec: types.AppSpecV3{URI: "_"}},
})
require.NoError(t, err)
_, err = srv.Auth().UpsertApplicationServer(ctx, server)
require.NoError(t, err)
}
},
},
{
name: "KindWindowsDesktop",
kind: types.KindWindowsDesktop,
insertResources: func() {
for i := 0; i < len(names); i++ {
desktop, err := types.NewWindowsDesktopV3(names[i], nil, types.WindowsDesktopSpecV3{
Addr: "_",
HostID: fmt.Sprintf("name-%v", i),
})
require.NoError(t, err)
require.NoError(t, srv.Auth().UpsertWindowsDesktop(ctx, desktop))
}
},
},
{
name: "KindKubernetesCluster",
kind: types.KindKubernetesCluster,
insertResources: func() {
for i := 0; i < len(names); i++ {
server, err := types.NewServer(fmt.Sprintf("name-%v", i), types.KindKubeService, types.ServerSpecV2{
KubernetesClusters: []*types.KubernetesCluster{
// Test dedup inside this list as well as from each service.
{Name: names[i]},
{Name: names[i]},
},
})
require.NoError(t, err)
_, err = srv.Auth().UpsertKubeServiceV2(ctx, server)
require.NoError(t, err)
}
},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.insertResources()

// Fetch all resources
fetchedResources := make([]types.ResourceWithLabels, 0, len(uniqueNames))
resp, err := clt.ListResources(ctx, proto.ListResourcesRequest{
ResourceType: tc.kind,
NeedTotalCount: true,
Limit: 2,
SortBy: types.SortBy{Field: types.ResourceMetadataName, IsDesc: true},
})
require.NoError(t, err)
require.Len(t, resp.Resources, 2)
require.Equal(t, len(uniqueNames), resp.TotalCount)
fetchedResources = append(fetchedResources, resp.Resources...)

resp, err = clt.ListResources(ctx, proto.ListResourcesRequest{
ResourceType: tc.kind,
NeedTotalCount: true,
StartKey: resp.NextKey,
Limit: 2,
SortBy: types.SortBy{Field: types.ResourceMetadataName, IsDesc: true},
})
require.NoError(t, err)
require.Len(t, resp.Resources, 1)
require.Equal(t, len(uniqueNames), resp.TotalCount)
fetchedResources = append(fetchedResources, resp.Resources...)

r := types.ResourcesWithLabels(fetchedResources)
var extractedErr error
var extractedNames []string

switch tc.kind {
case types.KindDatabaseServer:
s, err := r.AsDatabaseServers()
require.NoError(t, err)
extractedNames, extractedErr = types.DatabaseServers(s).GetFieldVals(types.ResourceMetadataName)

case types.KindAppServer:
s, err := r.AsAppServers()
require.NoError(t, err)
extractedNames, extractedErr = types.AppServers(s).GetFieldVals(types.ResourceMetadataName)

case types.KindWindowsDesktop:
s, err := r.AsWindowsDesktops()
require.NoError(t, err)
extractedNames, extractedErr = types.WindowsDesktops(s).GetFieldVals(types.ResourceMetadataName)

default:
s, err := r.AsKubeClusters()
require.NoError(t, err)
require.Len(t, s, 3)
extractedNames, extractedErr = types.KubeClusters(s).GetFieldVals(types.ResourceMetadataName)
}

require.NoError(t, extractedErr)
require.ElementsMatch(t, uniqueNames, extractedNames)
require.IsDecreasing(t, extractedNames)
})
}
}
2 changes: 1 addition & 1 deletion lib/auth/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2206,7 +2206,7 @@ func TestListResources(t *testing.T) {
require.NoError(t, err)
require.Len(t, resp.Resources, 2)
require.Empty(t, resp.NextKey)
require.Empty(t, resp.TotalCount)
require.Equal(t, 2, resp.TotalCount)
}

// Test listing with NeedTotalCount flag.
Expand Down
83 changes: 70 additions & 13 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -701,19 +701,16 @@ func benchGetNodes(b *testing.B, nodeCount int) {
ctx := context.Background()

for i := 0; i < nodeCount; i++ {
func() {
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(b, err)
timeout := time.NewTimer(time.Millisecond * 200)
defer timeout.Stop()
select {
case event := <-p.eventsC:
require.Equal(b, EventProcessed, event.Type)
case <-timeout.C:
b.Fatalf("timeout waiting for event, iteration=%d", i)
}
}()
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(b, err)

select {
case event := <-p.eventsC:
require.Equal(b, EventProcessed, event.Type)
case <-time.After(200 * time.Millisecond):
b.Fatalf("timeout waiting for event, iteration=%d", i)
}
}

b.ResetTimer()
Expand Down Expand Up @@ -781,6 +778,66 @@ func benchListNodes(b *testing.B, nodeCount int, pageSize int) {
}
}

/*
goos: linux
goarch: amd64
pkg: github.com/gravitational/teleport/lib/cache
cpu: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz
BenchmarkListResourcesWithSort-8 1 2351035036 ns/op
*/
func BenchmarkListResourcesWithSort(b *testing.B) {
p, err := newPack(b.TempDir(), ForAuth, memoryBackend(true))
require.NoError(b, err)
defer p.Close()

ctx := context.Background()

count := 100000
for i := 0; i < count; i++ {
server := suite.NewServer(types.KindNode, uuid.New().String(), "127.0.0.1:2022", apidefaults.Namespace)
// Set some static and dynamic labels.
server.Metadata.Labels = map[string]string{"os": "mac", "env": "prod", "country": "us", "tier": "frontend"}
server.Spec.CmdLabels = map[string]types.CommandLabelV2{
"version": {Result: "v8"},
"time": {Result: "now"},
}
_, err := p.presenceS.UpsertNode(ctx, server)
require.NoError(b, err)

select {
case event := <-p.eventsC:
require.Equal(b, EventProcessed, event.Type)
case <-time.After(200 * time.Millisecond):
b.Fatalf("timeout waiting for event, iteration=%d", i)
}
}

b.ResetTimer()

for _, limit := range []int32{100, 1_000, 10_000, 100_000} {
for _, totalCount := range []bool{true, false} {
b.Run(fmt.Sprintf("limit=%d,needTotal=%t", limit, totalCount), func(b *testing.B) {
for n := 0; n < b.N; n++ {
resp, err := p.cache.ListResources(ctx, proto.ListResourcesRequest{
ResourceType: types.KindNode,
Namespace: apidefaults.Namespace,
SortBy: types.SortBy{
IsDesc: true,
Field: types.ResourceSpecHostname,
},
// Predicate is the more expensive filter.
PredicateExpression: `search("mac", "frontend") && labels.version == "v8"`,
Limit: limit,
NeedTotalCount: totalCount,
})
require.NoError(b, err)
require.Len(b, resp.Resources, int(limit))
}
})
}
}
}

// TestListResources_NodesTTLVariant verifies that the custom ListNodes impl that we fallback to when
// using ttl-based caching works as expected.
func TestListResources_NodesTTLVariant(t *testing.T) {
Expand Down
Loading

0 comments on commit 6f33a6a

Please sign in to comment.