Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/client/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/client/proto"
accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
Expand Down Expand Up @@ -56,6 +57,10 @@ func EventToGRPC(in types.Event) (*proto.Event, error) {
out.Resource = &proto.Event_KubernetesWaitingContainer{
KubernetesWaitingContainer: r,
}
case *accessmonitoringrulesv1.AccessMonitoringRule:
out.Resource = &proto.Event_AccessMonitoringRule{
AccessMonitoringRule: r,
}
}
case *types.ResourceHeader:
out.Resource = &proto.Event_ResourceHeader{
Expand Down Expand Up @@ -464,6 +469,9 @@ func EventFromGRPC(in *proto.Event) (*types.Event, error) {
} else if r := in.GetKubernetesWaitingContainer(); r != nil {
out.Resource = types.Resource153ToLegacy(r)
return &out, nil
} else if r := in.GetAccessMonitoringRule(); r != nil {
out.Resource = types.Resource153ToLegacy(r)
return &out, nil
} else {
return nil, trace.BadParameter("received unsupported resource %T", in.Resource)
}
Expand Down
6 changes: 6 additions & 0 deletions integrations/access/common/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/gravitational/trace"

"github.com/gravitational/teleport/api/client"
accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/integrations/access/common/teleport"
Expand Down Expand Up @@ -57,6 +58,11 @@ func (w *wrappedClient) ListAccessLists(ctx context.Context, pageSize int, pageT
return w.Client.AccessListClient().ListAccessLists(ctx, pageSize, pageToken)
}

// ListAccessMonitoringRules lists current access monitoring rules.
func (w *wrappedClient) ListAccessMonitoringRules(ctx context.Context, limit int, startKey string) ([]*accessmonitoringrulesv1.AccessMonitoringRule, string, error) {
return w.Client.AccessMonitoringRulesClient().ListAccessMonitoringRules(ctx, limit, startKey)
}

// wrapAPIClient will wrap the API client such that it conforms to the Teleport plugin client interface.
func wrapAPIClient(clt *client.Client) teleport.Client {
return &wrappedClient{
Expand Down
2 changes: 2 additions & 0 deletions integrations/access/common/teleport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"

"github.com/gravitational/teleport/api/client/proto"
accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
"github.com/gravitational/teleport/integrations/lib/plugindata"
Expand All @@ -39,4 +40,5 @@ type Client interface {
SetAccessRequestState(ctx context.Context, params types.AccessRequestUpdate) error
ListResources(ctx context.Context, req proto.ListResourcesRequest) (*types.ListResourcesResponse, error)
ListAccessLists(context.Context, int, string) ([]*accesslist.AccessList, string, error)
ListAccessMonitoringRules(ctx context.Context, limit int, startKey string) ([]*accessmonitoringrulesv1.AccessMonitoringRule, string, error)
}
1 change: 1 addition & 0 deletions lib/auth/accesspoint/accesspoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ func NewAccessCache(cfg AccessCacheConfig) (*cache.Cache, error) {
UserGroups: cfg.Services,
Okta: cfg.Services.OktaClient(),
AccessLists: cfg.Services.AccessListClient(),
AccessMonitoringRules: cfg.Services.AccessMonitoringRuleClient(),
SecReports: cfg.Services.SecReportsClient(),
UserLoginStates: cfg.Services.UserLoginStateClient(),
Integrations: cfg.Services,
Expand Down
6 changes: 6 additions & 0 deletions lib/auth/authclient/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"google.golang.org/grpc"

"github.com/gravitational/teleport/api/client/proto"
accessmonitoringrules "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
Expand Down Expand Up @@ -1154,6 +1155,11 @@ type Cache interface {

// IntegrationsGetter defines read/list methods for integrations.
services.IntegrationsGetter

// ListAccessMonitoringRules returns a paginated list of access monitoring rules.
ListAccessMonitoringRules(ctx context.Context, limit int, startKey string) ([]*accessmonitoringrules.AccessMonitoringRule, string, error)
// GetAccessMonitoringRule returns the specified access monitoring rule.
GetAccessMonitoringRule(ctx context.Context, name string) (*accessmonitoringrules.AccessMonitoringRule, error)
}

type NodeWrapper struct {
Expand Down
12 changes: 12 additions & 0 deletions lib/auth/grpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (
authpb "github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/api/constants"
"github.com/gravitational/teleport/api/gen/proto/go/assist/v1"
accessmonitoringrules "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
auditlogpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/auditlog/v1"
clusterconfigpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/clusterconfig/v1"
dbobjectpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/dbobject/v1"
Expand All @@ -72,6 +73,7 @@ import (
"github.com/gravitational/teleport/api/types/installers"
"github.com/gravitational/teleport/api/types/wrappers"
apiutils "github.com/gravitational/teleport/api/utils"
"github.com/gravitational/teleport/lib/accessmonitoringrules/accessmonitoringrulesv1"
"github.com/gravitational/teleport/lib/auth/assist/assistv1"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/auth/clusterconfig/clusterconfigv1"
Expand Down Expand Up @@ -5961,6 +5963,16 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) {
}
kubewaitingcontainerpb.RegisterKubeWaitingContainersServiceServer(server, kubeWaitingContsServer)

accessMonitoringRuleServer, err := accessmonitoringrulesv1.NewService(&accessmonitoringrulesv1.ServiceConfig{
Authorizer: cfg.Authorizer,
Backend: cfg.AuthServer.Services,
Cache: cfg.AuthServer.Cache,
})
if err != nil {
return nil, trace.Wrap(err)
}
accessmonitoringrules.RegisterAccessMonitoringRulesServiceServer(server, accessMonitoringRuleServer)

// Only register the service if this is an open source build. Enterprise builds
// register the actual service via an auth plugin, if we register here then all
// Enterprise builds would fail with a duplicate service registered error.
Expand Down
40 changes: 40 additions & 0 deletions lib/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
"github.com/gravitational/teleport/api/internalutils/stream"
Expand Down Expand Up @@ -169,6 +170,7 @@ func ForAuth(cfg Config) Config {
{Kind: types.KindAccessListMember},
{Kind: types.KindAccessListReview},
{Kind: types.KindKubeWaitingContainer},
{Kind: types.KindAccessMonitoringRule},
}
cfg.QueueSize = defaults.AuthQueueSize
// We don't want to enable partial health for auth cache because auth uses an event stream
Expand Down Expand Up @@ -545,6 +547,7 @@ type Cache struct {
eventsFanout *services.FanoutV2
lowVolumeEventsFanout *utils.RoundRobin[*services.FanoutV2]
kubeWaitingContsCache *local.KubeWaitingContainerService
accessMontoringRuleCache services.AccessMonitoringRules

// closed indicates that the cache has been closed
closed atomic.Bool
Expand Down Expand Up @@ -709,6 +712,8 @@ type Config struct {
AccessLists services.AccessLists
// KubeWaitingContainers is the Kubernetes waiting container service.
KubeWaitingContainers services.KubeWaitingContainer
// AccessMonitoringRules is the access monitoring rules service.
AccessMonitoringRules services.AccessMonitoringRules
// Backend is a backend for local cache
Backend backend.Backend
// MaxRetryPeriod is the maximum period between cache retries on failures
Expand Down Expand Up @@ -908,6 +913,12 @@ func New(config Config) (*Cache, error) {
return nil, trace.Wrap(err)
}

accessMonitoringRuleCache, err := local.NewAccessMonitoringRulesService(config.Backend)
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

fanout := services.NewFanoutV2(services.FanoutV2Config{})
lowVolumeFanouts := make([]*services.FanoutV2, 0, config.FanoutShards)
for i := 0; i < config.FanoutShards; i++ {
Expand Down Expand Up @@ -944,6 +955,7 @@ func New(config Config) (*Cache, error) {
webSessionCache: local.NewIdentityService(config.Backend).WebSessions(),
webTokenCache: local.NewIdentityService(config.Backend).WebTokens(),
windowsDesktopsCache: local.NewWindowsDesktopService(config.Backend),
accessMontoringRuleCache: accessMonitoringRuleCache,
samlIdPServiceProvidersCache: samlIdPServiceProvidersCache,
userGroupsCache: userGroupsCache,
oktaCache: oktaCache,
Expand Down Expand Up @@ -3065,6 +3077,34 @@ func (c *Cache) ListAccessListReviews(ctx context.Context, accessList string, pa
return rg.reader.ListAccessListReviews(ctx, accessList, pageSize, pageToken)
}

// ListAccessMonitoringRules returns a paginated list of access monitoring rules.
func (c *Cache) ListAccessMonitoringRules(ctx context.Context, pageSize int, nextToken string) ([]*accessmonitoringrulesv1.AccessMonitoringRule, string, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListAccessMonitoringRules")
defer span.End()

rg, err := readCollectionCache(c, c.collections.accessMonitoringRules)

if err != nil {
return nil, "", trace.Wrap(err)
}
defer rg.Release()
out, nextKey, err := rg.reader.ListAccessMonitoringRules(ctx, pageSize, nextToken)
return out, nextKey, trace.Wrap(err)
}

// GetAccessMonitoringRule returns the specified AccessMonitoringRule resources.
func (c *Cache) GetAccessMonitoringRule(ctx context.Context, name string) (*accessmonitoringrulesv1.AccessMonitoringRule, error) {
ctx, span := c.Tracer.Start(ctx, "cache/GetAccessMonitoringRule")
defer span.End()

rg, err := readCollectionCache(c, c.collections.accessMonitoringRules)
if err != nil {
return nil, trace.Wrap(err)
}
defer rg.Release()
return rg.reader.GetAccessMonitoringRule(ctx, name)
}

// ListResources is a part of auth.Cache implementation
func (c *Cache) ListResources(ctx context.Context, req proto.ListResourcesRequest) (*types.ListResourcesResponse, error) {
ctx, span := c.Tracer.Start(ctx, "cache/ListResources")
Expand Down
23 changes: 23 additions & 0 deletions lib/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
"github.com/gravitational/teleport/api/types"
"github.com/gravitational/teleport/api/types/accesslist"
Expand Down Expand Up @@ -114,6 +115,7 @@ type testPack struct {
secReports services.SecReports
accessLists services.AccessLists
kubeWaitingContainers services.KubeWaitingContainer
accessMonitoringRules services.AccessMonitoringRules
}

// testFuncs are functions to support testing an object in a cache.
Expand Down Expand Up @@ -290,6 +292,11 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) {
return nil, trace.Wrap(err)
}
p.accessLists = accessListsSvc
accessMonitoringRuleService, err := local.NewAccessMonitoringRulesService(p.backend)
if err != nil {
return nil, trace.Wrap(err)
}
p.accessMonitoringRules = accessMonitoringRuleService

kubeWaitingContSvc, err := local.NewKubeWaitingContainerService(p.backend)
if err != nil {
Expand Down Expand Up @@ -339,6 +346,7 @@ func newPack(dir string, setupConfig func(c Config) Config, opts ...packOption)
SecReports: p.secReports,
AccessLists: p.accessLists,
KubeWaitingContainers: p.kubeWaitingContainers,
AccessMonitoringRules: p.accessMonitoringRules,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -738,6 +746,7 @@ func TestCompletenessInit(t *testing.T) {
SecReports: p.secReports,
AccessLists: p.accessLists,
KubeWaitingContainers: p.kubeWaitingContainers,
AccessMonitoringRules: p.accessMonitoringRules,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -811,6 +820,7 @@ func TestCompletenessReset(t *testing.T) {
SecReports: p.secReports,
AccessLists: p.accessLists,
KubeWaitingContainers: p.kubeWaitingContainers,
AccessMonitoringRules: p.accessMonitoringRules,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -996,6 +1006,7 @@ func TestListResources_NodesTTLVariant(t *testing.T) {
SecReports: p.secReports,
AccessLists: p.accessLists,
KubeWaitingContainers: p.kubeWaitingContainers,
AccessMonitoringRules: p.accessMonitoringRules,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
neverOK: true, // ensure reads are never healthy
Expand Down Expand Up @@ -1080,6 +1091,7 @@ func initStrategy(t *testing.T) {
SecReports: p.secReports,
AccessLists: p.accessLists,
KubeWaitingContainers: p.kubeWaitingContainers,
AccessMonitoringRules: p.accessMonitoringRules,
MaxRetryPeriod: 200 * time.Millisecond,
EventsC: p.eventsC,
}))
Expand Down Expand Up @@ -2936,6 +2948,7 @@ func TestCacheWatchKindExistsInEvents(t *testing.T) {
types.KindAccessListMember: newAccessListMember(t, "access-list", "member"),
types.KindAccessListReview: newAccessListReview(t, "access-list", "review"),
types.KindKubeWaitingContainer: newKubeWaitingContainer(t),
types.KindAccessMonitoringRule: types.Resource153ToLegacy(newAccessMonitoringRule(t)),
}

for name, cfg := range cases {
Expand Down Expand Up @@ -3387,6 +3400,16 @@ func newKubeWaitingContainer(t *testing.T) types.Resource {
return types.Resource153ToLegacy(waitingCont)
}

func newAccessMonitoringRule(t *testing.T) *accessmonitoringrulesv1.AccessMonitoringRule {
t.Helper()
notification := &accessmonitoringrulesv1.AccessMonitoringRule{
Spec: &accessmonitoringrulesv1.AccessMonitoringRuleSpec{
Notification: &accessmonitoringrulesv1.Notification{},
},
}
return notification
}

func withKeepalive[T any](fn func(context.Context, T) (*types.KeepAlive, error)) func(context.Context, T) error {
return func(ctx context.Context, resource T) error {
_, err := fn(ctx, resource)
Expand Down
56 changes: 56 additions & 0 deletions lib/cache/collections.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/gravitational/teleport/api/client"
"github.com/gravitational/teleport/api/client/proto"
apidefaults "github.com/gravitational/teleport/api/defaults"
accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
kubewaitingcontainerpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/kubewaitingcontainer/v1"
userspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/users/v1"
"github.com/gravitational/teleport/api/types"
Expand Down Expand Up @@ -234,6 +235,7 @@ type cacheCollections struct {
webTokens collectionReader[webTokenGetter]
windowsDesktops collectionReader[windowsDesktopsGetter]
windowsDesktopServices collectionReader[windowsDesktopServiceGetter]
accessMonitoringRules collectionReader[accessMonitoringRuleGetter]
}

// setupCollections returns a registry of collections.
Expand Down Expand Up @@ -675,6 +677,12 @@ func setupCollections(c *Cache, watches []types.WatchKind) (*cacheCollections, e
watch: watch,
}
collections.byKind[resourceKind] = collections.kubeWaitingContainers
case types.KindAccessMonitoringRule:
if c.AccessMonitoringRules == nil {
return nil, trace.BadParameter("missing parameter AccessMonitoringRule")
}
collections.accessMonitoringRules = &genericCollection[*accessmonitoringrulesv1.AccessMonitoringRule, accessMonitoringRuleGetter, accessMonitoringRulesExecutor]{cache: c, watch: watch}
collections.byKind[resourceKind] = collections.accessMonitoringRules
default:
return nil, trace.BadParameter("resource %q is not supported", watch.Kind)
}
Expand Down Expand Up @@ -2900,3 +2908,51 @@ func (accessListReviewExecutor) getReader(cache *Cache, cacheOK bool) accessList
type accessListReviewsGetter interface {
ListAccessListReviews(ctx context.Context, accessList string, pageSize int, pageToken string) (reviews []*accesslist.Review, nextToken string, err error)
}

type accessMonitoringRulesExecutor struct{}

func (accessMonitoringRulesExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]*accessmonitoringrulesv1.AccessMonitoringRule, error) {
var resources []*accessmonitoringrulesv1.AccessMonitoringRule
var nextToken string
for {
var page []*accessmonitoringrulesv1.AccessMonitoringRule
var err error
page, nextToken, err = cache.AccessMonitoringRules.ListAccessMonitoringRules(ctx, 0 /* page size */, nextToken)
if err != nil {
return nil, trace.Wrap(err)
}
resources = append(resources, page...)

if nextToken == "" {
break
}
}
return resources, nil
}

func (accessMonitoringRulesExecutor) upsert(ctx context.Context, cache *Cache, resource *accessmonitoringrulesv1.AccessMonitoringRule) error {
_, err := cache.accessMontoringRuleCache.UpsertAccessMonitoringRule(ctx, resource)
return trace.Wrap(err)
}

func (accessMonitoringRulesExecutor) deleteAll(ctx context.Context, cache *Cache) error {
return cache.accessMontoringRuleCache.DeleteAllAccessMonitoringRules(ctx)
}

func (accessMonitoringRulesExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error {
return cache.accessMontoringRuleCache.DeleteAccessMonitoringRule(ctx, resource.GetName())
}

func (accessMonitoringRulesExecutor) isSingleton() bool { return false }

func (accessMonitoringRulesExecutor) getReader(cache *Cache, cacheOK bool) accessMonitoringRuleGetter {
if cacheOK {
return cache.accessMontoringRuleCache
}
return cache.Config.AccessMonitoringRules
}

type accessMonitoringRuleGetter interface {
GetAccessMonitoringRule(ctx context.Context, name string) (*accessmonitoringrulesv1.AccessMonitoringRule, error)
ListAccessMonitoringRules(ctx context.Context, limit int, startKey string) ([]*accessmonitoringrulesv1.AccessMonitoringRule, string, error)
}
Loading