diff --git a/lib/auth/accessmonitoring/accessmonitoring.go b/lib/auth/accessmonitoring/accessmonitoring.go
new file mode 100644
index 0000000000000..6686b69918745
--- /dev/null
+++ b/lib/auth/accessmonitoring/accessmonitoring.go
@@ -0,0 +1,131 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package accessmonitoring
+
+import (
+ "context"
+ "log/slog"
+
+ "github.com/gravitational/trace"
+
+ accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
+ "github.com/gravitational/teleport/api/types"
+ "github.com/gravitational/teleport/lib/accessmonitoring"
+ "github.com/gravitational/teleport/lib/accessmonitoring/review"
+ "github.com/gravitational/teleport/lib/backend"
+)
+
+// Client aggregates the parts of Teleport API client interface
+// (as implemented by github.com/gravitational/teleport/api/client.Client)
+// that are used by the access plugins.
+type Client interface {
+ types.Events
+ SubmitAccessReview(ctx context.Context, params types.AccessReviewSubmission) (types.AccessRequest, error)
+ ListAccessMonitoringRulesWithFilter(ctx context.Context, req *accessmonitoringrulesv1.ListAccessMonitoringRulesWithFilterRequest) ([]*accessmonitoringrulesv1.AccessMonitoringRule, string, error)
+ GetUser(ctx context.Context, name string, withSecrets bool) (types.User, error)
+}
+
+// Config specifies the access monitoring service configuration.
+type Config struct {
+ // Logger is the logger for the access monitoring serivce.
+ Logger *slog.Logger
+
+ // Backend should be a backend.Backend which can be used for obtaining the
+ // lock required to run the service.
+ Backend backend.Backend
+
+ // Client is the auth service client interface.
+ Client Client
+}
+
+// CheckAndSetDefaults checks and sets default config values.
+func (c *Config) CheckAndSetDefaults() error {
+ if c.Logger == nil {
+ c.Logger = slog.Default()
+ }
+ if c.Backend == nil {
+ return trace.BadParameter("backend: must be non-nil")
+ }
+ if c.Client == nil {
+ return trace.BadParameter("client: must be non-nil")
+ }
+ return nil
+}
+
+// AccessMonitoringService monitors access events and applies access monitoring
+// rules.
+type AccessMonitoringService struct {
+ cfg Config
+ monitor *accessmonitoring.AccessMonitor
+}
+
+// NewAccessMonitoringSerivce returns a new access monitoring service.
+func NewAccessMonitoringService(cfg Config) (*AccessMonitoringService, error) {
+ if err := cfg.CheckAndSetDefaults(); err != nil {
+ return nil, trace.Wrap(err, "failed to validate access monitoring service config")
+ }
+ return &AccessMonitoringService{
+ cfg: cfg,
+ }, nil
+}
+
+// Run the access monitoring service.
+func (s *AccessMonitoringService) Run(ctx context.Context) (err error) {
+ accessReviewHandler, err := review.NewHandler(review.Config{
+ Logger: s.cfg.Logger,
+ HandlerName: types.BuiltInAutomaticReview,
+ Client: s.cfg.Client,
+ })
+ if err != nil {
+ return trace.Wrap(err)
+ }
+
+ monitor, err := accessmonitoring.NewAccessMonitor(accessmonitoring.Config{
+ Logger: s.cfg.Logger,
+ Backend: s.cfg.Backend,
+ Events: s.cfg.Client,
+ })
+ if err != nil {
+ return trace.Wrap(err)
+ }
+
+ // Configure access review handlers.
+ monitor.AddAccessMonitoringRuleHandler(accessReviewHandler.HandleAccessMonitoringRule)
+ monitor.AddAccessRequestHandler(accessReviewHandler.HandleAccessRequest)
+ s.monitor = monitor
+
+ return trace.Wrap(s.tryAndCatch(ctx))
+}
+
+// tryAndCatch tries to run the access monitoring service and recovers from potential
+// panic by converting them into errors. This ensures that a critical bug in the
+// service cannot bring down the whole Teleport cluster.
+func (s *AccessMonitoringService) tryAndCatch(ctx context.Context) (err error) {
+ // If something terribly bad happens while running, we recover and return an error
+ defer func() {
+ if r := recover(); r != nil {
+ s.cfg.Logger.ErrorContext(ctx, "Recovered from panic in the access_monitoring_service",
+ "panic", r)
+ err = trace.NewAggregate(err, trace.Errorf("Panic recovered while running: %v", r))
+ }
+ }()
+
+ err = trace.Wrap(s.monitor.Run(ctx))
+ return
+}
diff --git a/lib/auth/accessmonitoring/accessmonitoring_test.go b/lib/auth/accessmonitoring/accessmonitoring_test.go
new file mode 100644
index 0000000000000..8a29b0bd93d47
--- /dev/null
+++ b/lib/auth/accessmonitoring/accessmonitoring_test.go
@@ -0,0 +1,267 @@
+/*
+ * Teleport
+ * Copyright (C) 2025 Gravitational, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package accessmonitoring
+
+import (
+ "context"
+ "errors"
+ "net"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+ "github.com/stretchr/testify/suite"
+
+ accessmonitoringrulesv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/accessmonitoringrules/v1"
+ headerv1 "github.com/gravitational/teleport/api/gen/proto/go/teleport/header/v1"
+ "github.com/gravitational/teleport/api/types"
+ "github.com/gravitational/teleport/lib/auth"
+ "github.com/gravitational/teleport/lib/modules"
+ "github.com/gravitational/teleport/lib/services"
+ "github.com/gravitational/teleport/lib/utils"
+)
+
+func TestMain(m *testing.M) {
+ modules.SetInsecureTestMode(true)
+ utils.InitLoggerForTests()
+ os.Exit(m.Run())
+}
+
+const (
+ adminRoleName = "admin-role"
+ requesterRoleName = "requester-role"
+ dynamicRoleName = "dynamic-role"
+
+ // admin-user is granted permissions to create access monitoring rules.
+ adminUserName = "admin-user"
+ // requester-user is granted permissions to create access requests for the
+ // dynamic-role.
+ requesterUserName = "requester-user"
+)
+
+func TestAccessMonitoringSuite(t *testing.T) {
+ suite.Run(t, &AccessMonitoringSuite{})
+}
+
+type AccessMonitoringSuite struct {
+ suite.Suite
+ srv *auth.TestTLSServer
+}
+
+func (s *AccessMonitoringSuite) SetupTest() {
+ t := s.T()
+ modules.SetTestModules(t, &modules.TestModules{TestBuildType: modules.BuildEnterprise})
+
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+ t.Cleanup(cancel)
+
+ s.srv = newTestTLSServer(t)
+
+ // Setup system access review bot role and user.
+ _, err := s.srv.Auth().UpsertRole(ctx, services.NewSystemAutomaticAccessApproverRole())
+ require.NoError(t, err)
+
+ _, err = s.srv.Auth().UpsertUser(ctx, services.NewSystemAutomaticAccessBotUser())
+ require.NoError(t, err)
+
+ // Setup admin role and user
+ adminRole, err := types.NewRole(adminRoleName, types.RoleSpecV6{
+ Allow: types.RoleConditions{
+ Rules: []types.Rule{
+ types.NewRule(types.KindAccessMonitoringRule, services.RW()),
+ },
+ },
+ })
+ require.NoError(t, err)
+
+ _, err = s.srv.Auth().UpsertRole(ctx, adminRole)
+ require.NoError(t, err)
+
+ adminUser, err := types.NewUser(adminUserName)
+ require.NoError(t, err)
+
+ adminUser.SetRoles([]string{adminRoleName})
+ _, err = s.srv.Auth().UpsertUser(ctx, adminUser)
+ require.NoError(t, err)
+
+ // Setup requester role and user
+ requesterRole, err := types.NewRole(requesterRoleName, types.RoleSpecV6{
+ Allow: types.RoleConditions{
+ Request: &types.AccessRequestConditions{
+ Roles: []string{dynamicRoleName},
+ },
+ Rules: []types.Rule{
+ types.NewRule(types.KindAccessRequest, services.RW()),
+ },
+ },
+ })
+ require.NoError(t, err)
+
+ _, err = s.srv.Auth().UpsertRole(ctx, requesterRole)
+ require.NoError(t, err)
+
+ requesterUser, err := types.NewUser(requesterUserName)
+ require.NoError(t, err)
+
+ requesterUser.SetRoles([]string{requesterRoleName})
+ _, err = s.srv.Auth().UpsertUser(ctx, requesterUser)
+ require.NoError(t, err)
+
+ // Setup dynamic role
+ dynamicRole, err := types.NewRole(dynamicRoleName, types.RoleSpecV6{})
+ require.NoError(t, err)
+
+ _, err = s.srv.Auth().UpsertRole(ctx, dynamicRole)
+ require.NoError(t, err)
+}
+
+func (s *AccessMonitoringSuite) TestAccessRequestApproved() {
+ t := s.T()
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+ t.Cleanup(cancel)
+
+ // Initialize and run access monitoring service
+ accessMonitoringService, err := NewAccessMonitoringService(Config{
+ Backend: s.srv.AuthServer.Backend,
+ Client: s.srv.Auth(),
+ })
+ require.NoError(t, err)
+ go func() { require.NoError(t, accessMonitoringService.Run(ctx)) }()
+
+ // Setup access monitoring rules
+ adminClient, err := s.srv.NewClient(auth.TestUser(adminUserName))
+ require.NoError(t, err)
+
+ rule := newApprovedRule("approve-dynamic-role", `
+ contains_all(set("dynamic-role"), access_request.spec.roles)`)
+
+ _, err = adminClient.AccessMonitoringRuleClient().CreateAccessMonitoringRule(ctx, rule)
+ require.NoError(t, err)
+
+ // Create access request
+ requesterClient, err := s.srv.NewClient(auth.TestUser(requesterUserName))
+ require.NoError(t, err)
+
+ req, err := services.NewAccessRequest(requesterUserName, dynamicRoleName)
+ require.NoError(t, err)
+
+ rr, err := requesterClient.CreateAccessRequestV2(ctx, req)
+ require.NoError(t, err)
+
+ require.EventuallyWithT(t, func(t *assert.CollectT) {
+ resp, err := s.srv.Auth().GetAccessRequests(ctx, types.AccessRequestFilter{
+ ID: rr.GetName(),
+ })
+ require.NoError(t, err)
+ require.Len(t, resp, 1)
+ require.Equal(t, types.RequestState_APPROVED, resp[0].GetState())
+ }, 10*time.Second, 100*time.Millisecond)
+}
+
+func (s *AccessMonitoringSuite) TestAccessRequestDenied() {
+ t := s.T()
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second*15)
+ t.Cleanup(cancel)
+
+ // Initialize and run access monitoring service
+ accessMonitoringService, err := NewAccessMonitoringService(Config{
+ Backend: s.srv.AuthServer.Backend,
+ Client: s.srv.Auth(),
+ })
+ require.NoError(t, err)
+ go func() { require.NoError(t, accessMonitoringService.Run(ctx)) }()
+
+ // Setup access monitoring rules
+ adminClient, err := s.srv.NewClient(auth.TestUser(adminUserName))
+ require.NoError(t, err)
+
+ rule := newDeniedRule("deny-dynamic-role", `
+ contains_all(set("dynamic-role"), access_request.spec.roles)`)
+
+ _, err = adminClient.AccessMonitoringRuleClient().CreateAccessMonitoringRule(ctx, rule)
+ require.NoError(t, err)
+
+ // Create access request
+ requesterClient, err := s.srv.NewClient(auth.TestUser(requesterUserName))
+ require.NoError(t, err)
+
+ req, err := services.NewAccessRequest(requesterUserName, dynamicRoleName)
+ require.NoError(t, err)
+
+ rr, err := requesterClient.CreateAccessRequestV2(ctx, req)
+ require.NoError(t, err)
+
+ require.EventuallyWithT(t, func(t *assert.CollectT) {
+ resp, err := s.srv.Auth().GetAccessRequests(ctx, types.AccessRequestFilter{
+ ID: rr.GetName(),
+ })
+ require.NoError(t, err)
+ require.Len(t, resp, 1)
+ require.Equal(t, types.RequestState_DENIED, resp[0].GetState())
+ }, 10*time.Second, 100*time.Millisecond)
+}
+
+func newTestTLSServer(t testing.TB) *auth.TestTLSServer {
+ as, err := auth.NewTestAuthServer(auth.TestAuthServerConfig{
+ Dir: t.TempDir(),
+ })
+ require.NoError(t, err)
+
+ srv, err := as.NewTestTLSServer()
+ require.NoError(t, err)
+
+ t.Cleanup(func() {
+ err := srv.Close()
+ if errors.Is(err, net.ErrClosed) {
+ return
+ }
+ require.NoError(t, err)
+ })
+
+ return srv
+}
+
+func newApprovedRule(name, condition string) *accessmonitoringrulesv1.AccessMonitoringRule {
+ return newReviewRule(name, condition, types.RequestState_APPROVED.String())
+}
+
+func newDeniedRule(name, condition string) *accessmonitoringrulesv1.AccessMonitoringRule {
+ return newReviewRule(name, condition, types.RequestState_DENIED.String())
+}
+
+func newReviewRule(name, condition, decision string) *accessmonitoringrulesv1.AccessMonitoringRule {
+ return &accessmonitoringrulesv1.AccessMonitoringRule{
+ Kind: types.KindAccessMonitoringRule,
+ Version: types.V1,
+ Metadata: &headerv1.Metadata{
+ Name: name,
+ },
+ Spec: &accessmonitoringrulesv1.AccessMonitoringRuleSpec{
+ Subjects: []string{types.KindAccessRequest},
+ Condition: condition,
+ DesiredState: types.AccessMonitoringRuleStateReviewed,
+ AutomaticReview: &accessmonitoringrulesv1.AutomaticReview{
+ Integration: types.BuiltInAutomaticReview,
+ Decision: decision,
+ },
+ },
+ }
+}
diff --git a/lib/service/service.go b/lib/service/service.go
index 180bed1322103..7e9e5547fbe68 100644
--- a/lib/service/service.go
+++ b/lib/service/service.go
@@ -87,6 +87,7 @@ import (
"github.com/gravitational/teleport/lib/agentless"
"github.com/gravitational/teleport/lib/auditd"
"github.com/gravitational/teleport/lib/auth"
+ "github.com/gravitational/teleport/lib/auth/accessmonitoring"
"github.com/gravitational/teleport/lib/auth/accesspoint"
"github.com/gravitational/teleport/lib/auth/authclient"
"github.com/gravitational/teleport/lib/auth/keygen"
@@ -2570,6 +2571,21 @@ func (process *TeleportProcess) initAuthService() error {
return trace.Wrap(err)
})
+ accessMonitoringService, err := accessmonitoring.NewAccessMonitoringService(accessmonitoring.Config{
+ Logger: logger.With(
+ teleport.ComponentKey,
+ teleport.Component(teleport.ComponentAuth, "access_monitoring_service")),
+ Backend: b,
+ Client: authServer,
+ })
+ if err != nil {
+ return trace.Wrap(err)
+ }
+ process.RegisterFunc("auth.access_monitoring_service", func() error {
+ return trace.Wrap(accessMonitoringService.Run(process.GracefulExitContext()),
+ "running access_monitoring_service")
+ })
+
// execute this when process is asked to exit:
process.OnExit("auth.shutdown", func(payload any) {
// The listeners have to be closed here, because if shutdown