Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,143 changes: 1,299 additions & 844 deletions api/client/proto/authservice.pb.go

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions api/proto/teleport/legacy/client/proto/authservice.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2154,6 +2154,15 @@ message InventoryStatusSummary {
repeated UpstreamInventoryHello Connected = 1 [(gogoproto.nullable) = false];
}

// InventoryConnectedServiceCountsRequest requests inventory connected service counts.
message InventoryConnectedServiceCountsRequest {}

// InventoryConnectedServiceCounts is the connected service counts seen in the inventory.
message InventoryConnectedServiceCounts {
// ServiceCounts is the count of each connected service seen in the inventory.
map<string, uint64> ServiceCounts = 1 [(gogoproto.castkey) = "github.com/gravitational/teleport/api/types.SystemRole"];
}

// InventoryPingRequest is used to request that the specified server be sent an inventory ping
// if it has a control stream registered.
message InventoryPingRequest {
Expand Down Expand Up @@ -2305,6 +2314,9 @@ service AuthService {
// GetInventoryStatus gets information about current instance inventory.
rpc GetInventoryStatus(InventoryStatusRequest) returns (InventoryStatusSummary);

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
rpc GetInventoryConnectedServiceCounts(InventoryConnectedServiceCountsRequest) returns (InventoryConnectedServiceCounts);

// PingInventory attempts to trigger a downstream inventory ping (used in testing/debug).
rpc PingInventory(InventoryPingRequest) returns (InventoryPingResponse);

Expand Down
12 changes: 12 additions & 0 deletions lib/auth/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -3444,6 +3444,18 @@ func (a *Server) GetInventoryStatus(ctx context.Context, req proto.InventoryStat
return rsp
}

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
func (a *Server) GetInventoryConnectedServiceCounts() proto.InventoryConnectedServiceCounts {
return proto.InventoryConnectedServiceCounts{
ServiceCounts: a.inventory.ConnectedServiceCounts(),
}
}

// GetInventoryConnectedServiceCount returns the counts of a particular connected service seen in the inventory.
func (a *Server) GetInventoryConnectedServiceCount(service types.SystemRole) uint64 {
return a.inventory.ConnectedServiceCount(service)
}

func (a *Server) PingInventory(ctx context.Context, req proto.InventoryPingRequest) (proto.InventoryPingResponse, error) {
const pingAttempt = "ping-attempt"
const pingSuccess = "ping-success"
Expand Down
10 changes: 10 additions & 0 deletions lib/auth/auth_with_roles.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,16 @@ func (a *ServerWithRoles) GetInventoryStatus(ctx context.Context, req proto.Inve
return a.authServer.GetInventoryStatus(ctx, req), nil
}

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
func (a *ServerWithRoles) GetInventoryConnectedServiceCounts() (proto.InventoryConnectedServiceCounts, error) {
// only support builtin roles for now, but we'll eventually want to develop an RBAC syntax for
// the inventory APIs once they are more developed.
if !a.hasBuiltinRole(types.RoleAdmin) {
return proto.InventoryConnectedServiceCounts{}, trace.AccessDenied("requires builtin admin role")
}
return a.authServer.GetInventoryConnectedServiceCounts(), nil
}

func (a *ServerWithRoles) PingInventory(ctx context.Context, req proto.InventoryPingRequest) (proto.InventoryPingResponse, error) {
// admin-only for now, but we'll eventually want to develop an RBAC syntax for
// the inventory APIs once they are more developed.
Expand Down
15 changes: 15 additions & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,21 @@ func (g *GRPCServer) GetInventoryStatus(ctx context.Context, req *proto.Inventor
return &rsp, nil
}

// GetInventoryConnectedServiceCounts returns the counts of each connected service seen in the inventory.
func (g *GRPCServer) GetInventoryConnectedServiceCounts(ctx context.Context, _ *proto.InventoryConnectedServiceCountsRequest) (*proto.InventoryConnectedServiceCounts, error) {
auth, err := g.authenticate(ctx)
if err != nil {
return nil, trail.ToGRPC(err)
}

rsp, err := auth.GetInventoryConnectedServiceCounts()
if err != nil {
return nil, trail.ToGRPC(err)
}

return &rsp, nil
}

func (g *GRPCServer) PingInventory(ctx context.Context, req *proto.InventoryPingRequest) (*proto.InventoryPingResponse, error) {
auth, err := g.authenticate(ctx)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions lib/inventory/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func withTestEventsChannel(ch chan testEvent) ControllerOption {
// messages are processed by invoking the appropriate methods on the Auth interface.
type Controller struct {
store *Store
serviceCounter *serviceCounter
auth Auth
authID string
serverKeepAlive time.Duration
Expand All @@ -162,6 +163,7 @@ func NewController(auth Auth, usageReporter usagereporter.UsageReporter, opts ..
ctx, cancel := context.WithCancel(context.Background())
return &Controller{
store: NewStore(),
serviceCounter: &serviceCounter{},
serverKeepAlive: options.serverKeepAlive,
serverTTL: apidefaults.ServerAnnounceTTL,
instanceHBInterval: options.instanceHBInterval,
Expand Down Expand Up @@ -207,6 +209,16 @@ func (c *Controller) Iter(fn func(UpstreamHandle)) {
c.store.Iter(fn)
}

// ConnectedServiceCounts returns the number of each connected service seen in the inventory.
func (c *Controller) ConnectedServiceCounts() map[types.SystemRole]uint64 {
return c.serviceCounter.counts()
}

// ConnectedServiceCount returns the number of a particular connected service in the inventory.
func (c *Controller) ConnectedServiceCount(systemRole types.SystemRole) uint64 {
return c.serviceCounter.get(systemRole)
}

func (c *Controller) testEvent(event testEvent) {
if c.testEvents == nil {
return
Expand All @@ -218,7 +230,15 @@ func (c *Controller) testEvent(event testEvent) {
// and also manages keepalives for previously heartbeated state.
func (c *Controller) handleControlStream(handle *upstreamHandle) {
c.testEvent(handlerStart)

for _, service := range handle.hello.Services {
c.serviceCounter.increment(service)
}

defer func() {
for _, service := range handle.hello.Services {
c.serviceCounter.decrement(service)
}
c.store.Remove(handle)
handle.Close() // no effect if CloseWithError was called below
handle.ticker.Stop()
Expand Down
25 changes: 24 additions & 1 deletion lib/inventory/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func TestInstanceHeartbeat(t *testing.T) {
controller.RegisterControlStream(upstream, proto.UpstreamInventoryHello{
ServerID: serverID,
Version: teleport.Version,
Services: []types.SystemRole{types.RoleNode},
Services: []types.SystemRole{types.RoleNode, types.RoleApp},
})

// verify that control stream handle is now accessible
Expand All @@ -345,6 +345,13 @@ func TestInstanceHeartbeat(t *testing.T) {
deny(instanceHeartbeatErr, instanceCompareFailed, handlerClose),
)

// verify the service counter shows the correct number for the given services.
require.Equal(t, uint64(1), controller.serviceCounter.get(types.RoleNode))
require.Equal(t, uint64(1), controller.serviceCounter.get(types.RoleApp))

// this service was not seen, so it should be 0.
require.Equal(t, uint64(0), controller.serviceCounter.get(types.RoleOkta))

auth.mu.Lock()
auth.lastInstance.AppendControlLog(types.InstanceControlLogEntry{
Type: "concurrent-test-event",
Expand Down Expand Up @@ -499,6 +506,14 @@ func TestInstanceHeartbeat(t *testing.T) {
)
}

// verify the service counter shows the correct number for the given services.
require.Equal(t, map[types.SystemRole]uint64{
types.RoleApp: 1,
types.RoleNode: 1,
}, controller.ConnectedServiceCounts())
require.Equal(t, uint64(1), controller.ConnectedServiceCount(types.RoleNode))
require.Equal(t, uint64(1), controller.ConnectedServiceCount(types.RoleApp))

// verify that none of the qualified events were ever heartbeat because
// a reset always occurred.
var unqualifiedIncludes int
Expand Down Expand Up @@ -542,6 +557,14 @@ func TestInstanceHeartbeat(t *testing.T) {
logSize := len(auth.lastInstance.GetControlLog())
auth.mu.Unlock()
require.Greater(t, logSize, 2)

// verify the service counter now shows no connected services.
require.Equal(t, map[types.SystemRole]uint64{
types.RoleApp: 0,
types.RoleNode: 0,
}, controller.ConnectedServiceCounts())
require.Equal(t, uint64(0), controller.ConnectedServiceCount(types.RoleNode))
require.Equal(t, uint64(0), controller.ConnectedServiceCount(types.RoleApp))
}

type eventOpts struct {
Expand Down
69 changes: 69 additions & 0 deletions lib/inventory/servicecounter.go
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also expose some sort of notification mechanism that triggers whenever some counts go from zero to non-zero and viceversa? I imagine it could be useful for other components in the auth server that depend on some agent to be connected but don't necessarily care about the exact number.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed a bit of this offline. This is useful but may not be immediately useful for the Okta service use case.

Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
Copyright 2023 Gravitational, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inventory

import (
"sync"
"sync/atomic"

"github.com/gravitational/teleport/api/types"
)

// serviceCounter will count services seen in the inventory.
type serviceCounter struct {
countMap sync.Map
}

// counts returns the count of each service seen in the counter.
func (s *serviceCounter) counts() map[types.SystemRole]uint64 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth it to add a dedicated function to fetch the count for a single role?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was waffling about that. It makes sense, but it's also not a huge deal to just query the map. I'm ambivalent, I think. If you feel strongly I can add one.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that serviceCounter has a get method that does that already (although it could be made to not actually add to the map if there's no need for it).

I wouldn't bother with wiring it up for the gRPC service (making a map is nothing compared to the overhead of protobuf encoding and network transmission, and saving like 5 strings and integers worth of data is kinda useless) but it might be useful as an exposed ConnectedServiceCount(types.SystemRole) method in Controller. The first usecase for this, after all, is going to be about checking if there's any Okta agents connected, and we won't care about any other count.

counts := map[types.SystemRole]uint64{}
s.countMap.Range(func(key, value any) bool {
counts[key.(types.SystemRole)] = value.(*atomic.Uint64).Load()
return true
})

return counts
}

// increment will increment the counter for a service.
func (s *serviceCounter) increment(service types.SystemRole) {
s.load(service).Add(1)
}

// decrement will decrement the counter for a service.
func (s *serviceCounter) decrement(service types.SystemRole) {
// refer to the docs for atomic.AddUint64 for why this works as a decrement.
s.load(service).Add(^uint64(0))
}

// get will return the value of a counter for a service.
func (s *serviceCounter) get(service types.SystemRole) uint64 {
if result, ok := s.countMap.Load(service); ok {
return result.(*atomic.Uint64).Load()
}
return 0
}

// load will load the underlying atomic value in the sync map. This should
// only be used within the service counter.
func (s *serviceCounter) load(service types.SystemRole) *atomic.Uint64 {
if result, ok := s.countMap.Load(service); ok {
return result.(*atomic.Uint64)
}
result, _ := s.countMap.LoadOrStore(service, &atomic.Uint64{})
Comment thread
mdwn marked this conversation as resolved.
return result.(*atomic.Uint64)
}
45 changes: 45 additions & 0 deletions lib/inventory/servicecounter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2023 Gravitational, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package inventory

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/gravitational/teleport/api/types"
)

func TestServiceCounter(t *testing.T) {
sc := serviceCounter{}

require.Equal(t, uint64(0), sc.get(types.RoleAuth))

sc.increment(types.RoleApp)
require.Equal(t, uint64(1), sc.get(types.RoleApp))
sc.increment(types.RoleApp)
require.Equal(t, uint64(2), sc.get(types.RoleApp))

require.Equal(t, map[types.SystemRole]uint64{
types.RoleApp: 2,
}, sc.counts())

sc.decrement(types.RoleApp)
require.Equal(t, uint64(1), sc.get(types.RoleApp))
sc.decrement(types.RoleApp)
require.Equal(t, uint64(0), sc.get(types.RoleApp))
}