Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,143 changes: 1,299 additions & 844 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2154,6 +2154,15 @@ message InventoryStatusSummary {
repeated UpstreamInventoryHello Connected = 1 [(gogoproto.nullable) = false];
}

// InventoryConnectedServiceCountsRequest requests inventory connected service counts.
message InventoryConnectedServiceCountsRequest {}

// InventoryConnectedServiceCounts is the connected service counts seen in the inventory.
message InventoryConnectedServiceCounts {
// ServiceCounts is the count of each connected service seen in the inventory.
map<string, uint64> ServiceCounts = 1 [(gogoproto.castkey) = "github.com/gravitational/teleport/api/types.SystemRole"];
}

// InventoryPingRequest is used to request that the specified server be sent an inventory ping
// if it has a control stream registered.
message InventoryPingRequest {
Expand Down Expand Up @@ -2305,6 +2314,9 @@ service AuthService {
// GetInventoryStatus gets information about current instance inventory.
rpc GetInventoryStatus(InventoryStatusRequest) returns (InventoryStatusSummary);

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
rpc GetInventoryConnectedServiceCounts(InventoryConnectedServiceCountsRequest) returns (InventoryConnectedServiceCounts);

// PingInventory attempts to trigger a downstream inventory ping (used in testing/debug).
rpc PingInventory(InventoryPingRequest) returns (InventoryPingResponse);

Expand Down
12 changes: 12 additions & 0 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3440,6 +3440,18 @@ func (a *Server) GetInventoryStatus(ctx context.Context, req proto.InventoryStat
return rsp
}

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
func (a *Server) GetInventoryConnectedServiceCounts() proto.InventoryConnectedServiceCounts {
return proto.InventoryConnectedServiceCounts{
ServiceCounts: a.inventory.ConnectedServiceCounts(),
}
}

// GetInventoryConnectedServiceCount returns the counts of a particular connected service seen in the inventory.
func (a *Server) GetInventoryConnectedServiceCount(service types.SystemRole) uint64 {
return a.inventory.ConnectedServiceCount(service)
}

func (a *Server) PingInventory(ctx context.Context, req proto.InventoryPingRequest) (proto.InventoryPingResponse, error) {
const pingAttempt = "ping-attempt"
const pingSuccess = "ping-success"
Expand Down
10 changes: 10 additions & 0 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,16 @@ func (a *ServerWithRoles) GetInventoryStatus(ctx context.Context, req proto.Inve
return a.authServer.GetInventoryStatus(ctx, req), nil
}

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
func (a *ServerWithRoles) GetInventoryConnectedServiceCounts() (proto.InventoryConnectedServiceCounts, error) {
// only support builtin roles for now, but we'll eventually want to develop an RBAC syntax for
// the inventory APIs once they are more developed.
if !a.hasBuiltinRole(types.RoleAdmin) {
return proto.InventoryConnectedServiceCounts{}, trace.AccessDenied("requires builtin admin role")
}
return a.authServer.GetInventoryConnectedServiceCounts(), nil
}

func (a *ServerWithRoles) PingInventory(ctx context.Context, req proto.InventoryPingRequest) (proto.InventoryPingResponse, error) {
// admin-only for now, but we'll eventually want to develop an RBAC syntax for
// the inventory APIs once they are more developed.
Expand Down
15 changes: 15 additions & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,21 @@ func (g *GRPCServer) GetInventoryStatus(ctx context.Context, req *proto.Inventor
return &rsp, nil
}

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
func (g *GRPCServer) GetInventoryConnectedServiceCounts(ctx context.Context, _ *proto.InventoryConnectedServiceCountsRequest) (*proto.InventoryConnectedServiceCounts, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trail.ToGRPC(err)
}

rsp, err := auth.GetInventoryConnectedServiceCounts()
if err != nil {
return nil, trail.ToGRPC(err)
}

return &rsp, nil
}

func (g *GRPCServer) PingInventory(ctx context.Context, req *proto.InventoryPingRequest) (*proto.InventoryPingResponse, error) {
auth, err := g.authenticate(ctx)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func withTestEventsChannel(ch chan testEvent) ControllerOption {
// messages are processed by invoking the appropriate methods on the Auth interface.
type Controller struct {
store *Store
serviceCounter *serviceCounter
auth Auth
authID string
serverKeepAlive time.Duration
Expand All @@ -162,6 +163,7 @@ func NewController(auth Auth, usageReporter usagereporter.UsageReporter, opts ..
ctx, cancel := context.WithCancel(context.Background())
return &Controller{
store: NewStore(),
serviceCounter: &serviceCounter{},
serverKeepAlive: options.serverKeepAlive,
serverTTL: apidefaults.ServerAnnounceTTL,
instanceHBInterval: options.instanceHBInterval,
Expand Down Expand Up @@ -207,6 +209,16 @@ func (c *Controller) Iter(fn func(UpstreamHandle)) {
c.store.Iter(fn)
}

// ConnectedServiceCounts returns the number of each connected service seen in the inventory.
func (c *Controller) ConnectedServiceCounts() map[types.SystemRole]uint64 {
return c.serviceCounter.counts()
}

// ConnectedServiceCount returns the number of a particular connected service in the inventory.
func (c *Controller) ConnectedServiceCount(systemRole types.SystemRole) uint64 {
return c.serviceCounter.get(systemRole)
}

func (c *Controller) testEvent(event testEvent) {
if c.testEvents == nil {
return
Expand All @@ -218,7 +230,15 @@ func (c *Controller) testEvent(event testEvent) {
// and also manages keepalives for previously heartbeated state.
func (c *Controller) handleControlStream(handle *upstreamHandle) {
c.testEvent(handlerStart)

for _, service := range handle.hello.Services {
c.serviceCounter.increment(service)
}

defer func() {
for _, service := range handle.hello.Services {
c.serviceCounter.decrement(service)
}
c.store.Remove(handle)
handle.Close() // no effect if CloseWithError was called below
handle.ticker.Stop()
Expand Down
25 changes: 24 additions & 1 deletion lib/inventory/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestInstanceHeartbeat(t *testing.T) {
controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Version: teleport.Version,
Services: []types.SystemRole{types.RoleNode},
Services: []types.SystemRole{types.RoleNode, types.RoleApp},
})

// verify that control stream handle is now accessible
Expand All @@ -345,6 +345,13 @@ func TestInstanceHeartbeat(t *testing.T) {
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)

// verify the service counter shows the correct number for the given services.
require.Equal(t, uint64(1), controller.serviceCounter.get(types.RoleNode))
require.Equal(t, uint64(1), controller.serviceCounter.get(types.RoleApp))

// this service was not seen, so it should be 0.
require.Equal(t, uint64(0), controller.serviceCounter.get(types.RoleOkta))

auth.mu.Lock()
auth.lastInstance.AppendControlLog(types.InstanceControlLogEntry{
Type: "concurrent-test-event",
Expand Down Expand Up @@ -499,6 +506,14 @@ func TestInstanceHeartbeat(t *testing.T) {
)
}

// verify the service counter shows the correct number for the given services.
require.Equal(t, map[types.SystemRole]uint64{
types.RoleApp: 1,
types.RoleNode: 1,
}, controller.ConnectedServiceCounts())
require.Equal(t, uint64(1), controller.ConnectedServiceCount(types.RoleNode))
require.Equal(t, uint64(1), controller.ConnectedServiceCount(types.RoleApp))

// verify that none of the qualified events were ever heartbeat because
// a reset always occurred.
var unqualifiedIncludes int
Expand Down Expand Up @@ -542,6 +557,14 @@ func TestInstanceHeartbeat(t *testing.T) {
logSize := len(auth.lastInstance.GetControlLog())
auth.mu.Unlock()
require.Greater(t, logSize, 2)

// verify the service counter now shows no connected services.
require.Equal(t, map[types.SystemRole]uint64{
types.RoleApp: 0,
types.RoleNode: 0,
}, controller.ConnectedServiceCounts())
require.Equal(t, uint64(0), controller.ConnectedServiceCount(types.RoleNode))
require.Equal(t, uint64(0), controller.ConnectedServiceCount(types.RoleApp))
}

type eventOpts struct {
Expand Down
69 changes: 69 additions & 0 deletions lib/inventory/servicecounter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2023 Gravitational, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inventory

import (
"sync"
"sync/atomic"

"github.com/gravitational/teleport/api/types"
)

// serviceCounter will count services seen in the inventory.
type serviceCounter struct {
countMap sync.Map
}

// counts returns the count of each service seen in the counter.
func (s *serviceCounter) counts() map[types.SystemRole]uint64 {
counts := map[types.SystemRole]uint64{}
s.countMap.Range(func(key, value any) bool {
counts[key.(types.SystemRole)] = value.(*atomic.Uint64).Load()
return true
})

return counts
}

// increment will increment the counter for a service.
func (s *serviceCounter) increment(service types.SystemRole) {
s.load(service).Add(1)
}

// decrement will decrement the counter for a service.
func (s *serviceCounter) decrement(service types.SystemRole) {
// refer to the docs for atomic.AddUint64 for why this works as a decrement.
s.load(service).Add(^uint64(0))
}

// get will return the value of a counter for a service.
func (s *serviceCounter) get(service types.SystemRole) uint64 {
if result, ok := s.countMap.Load(service); ok {
return result.(*atomic.Uint64).Load()
}
return 0
}

// load will load the underlying atomic value in the sync map. This should
// only be used within the service counter.
func (s *serviceCounter) load(service types.SystemRole) *atomic.Uint64 {
if result, ok := s.countMap.Load(service); ok {
return result.(*atomic.Uint64)
}
result, _ := s.countMap.LoadOrStore(service, &atomic.Uint64{})
return result.(*atomic.Uint64)
}
45 changes: 45 additions & 0 deletions lib/inventory/servicecounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2023 Gravitational, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inventory

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/types"
)

func TestServiceCounter(t *testing.T) {
sc := serviceCounter{}

require.Equal(t, uint64(0), sc.get(types.RoleAuth))

sc.increment(types.RoleApp)
require.Equal(t, uint64(1), sc.get(types.RoleApp))
sc.increment(types.RoleApp)
require.Equal(t, uint64(2), sc.get(types.RoleApp))

require.Equal(t, map[types.SystemRole]uint64{
types.RoleApp: 2,
}, sc.counts())

sc.decrement(types.RoleApp)
require.Equal(t, uint64(1), sc.get(types.RoleApp))
sc.decrement(types.RoleApp)
require.Equal(t, uint64(0), sc.get(types.RoleApp))
}