diff --git a/api/client/client.go b/api/client/client.go index 90bd7e25d5889..5e4380cce2d06 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -47,6 +47,7 @@ import ( "github.com/gravitational/teleport/api/constants" "github.com/gravitational/teleport/api/defaults" devicepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/devicetrust/v1" + integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" kubeproto "github.com/gravitational/teleport/api/gen/proto/go/teleport/kube/v1" loginrulepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/loginrule/v1" oktapb "github.com/gravitational/teleport/api/gen/proto/go/teleport/okta/v1" @@ -3349,6 +3350,87 @@ func (c *Client) DeleteAllUserGroups(ctx context.Context) error { return nil } +// integrationsClient returns an unadorned Integration client, using the underlying +// Auth gRPC connection. +func (c *Client) integrationsClient() integrationpb.IntegrationServiceClient { + return integrationpb.NewIntegrationServiceClient(c.conn) +} + +// ListIntegrations returns a paginated list of Integrations. +// The response includes a nextKey which must be used to fetch the next page. +func (c *Client) ListIntegrations(ctx context.Context, pageSize int, nextKey string) ([]types.Integration, string, error) { + resp, err := c.integrationsClient().ListIntegrations(ctx, &integrationpb.ListIntegrationsRequest{ + Limit: int32(pageSize), + NextKey: nextKey, + }) + if err != nil { + return nil, "", trail.FromGRPC(err) + } + + integrations := make([]types.Integration, 0, len(resp.GetIntegrations())) + for _, ig := range resp.GetIntegrations() { + integrations = append(integrations, ig) + } + + return integrations, resp.GetNextKey(), nil +} + +// GetIntegration returns an Integration by its name. +func (c *Client) GetIntegration(ctx context.Context, name string) (types.Integration, error) { + ig, err := c.integrationsClient().GetIntegration(ctx, &integrationpb.GetIntegrationRequest{ + Name: name, + }) + if err != nil { + return nil, trail.FromGRPC(err) + } + + return ig, nil +} + +// CreateIntegration creates a new Integration. +func (c *Client) CreateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igV1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unsupported integration type %T", ig) + } + + ig, err := c.integrationsClient().CreateIntegration(ctx, &integrationpb.CreateIntegrationRequest{Integration: igV1}) + if err != nil { + return nil, trail.FromGRPC(err) + } + + return ig, nil +} + +// UpdateIntegration updates an existing Integration. +func (c *Client) UpdateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igV1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unsupported integration type %T", ig) + } + + ig, err := c.integrationsClient().UpdateIntegration(ctx, &integrationpb.UpdateIntegrationRequest{Integration: igV1}) + if err != nil { + return nil, trail.FromGRPC(err) + } + + return ig, nil +} + +// DeleteIntegration removes an Integration by its name. +func (c *Client) DeleteIntegration(ctx context.Context, name string) error { + _, err := c.integrationsClient().DeleteIntegration(ctx, &integrationpb.DeleteIntegrationRequest{ + Name: name, + }) + return trail.FromGRPC(err) +} + +// DeleteAllIntegrations removes all Integrations. +func (c *Client) DeleteAllIntegrations(ctx context.Context) error { + _, err := c.integrationsClient().DeleteAllIntegrations(ctx, &integrationpb.DeleteAllIntegrationsRequest{}) + return trail.FromGRPC(err) +} + // GetLoginRule retrieves a login rule described by name. func (c *Client) GetLoginRule(ctx context.Context, name string) (*loginrulepb.LoginRule, error) { rule, err := c.LoginRuleClient().GetLoginRule(ctx, &loginrulepb.GetLoginRuleRequest{ diff --git a/api/client/events.go b/api/client/events.go index 2be0de813f3f9..6195647b392b8 100644 --- a/api/client/events.go +++ b/api/client/events.go @@ -195,6 +195,10 @@ func EventToGRPC(in types.Event) (*proto.Event, error) { out.Resource = &proto.Event_OktaAssignment{ OktaAssignment: r, } + case *types.IntegrationV1: + out.Resource = &proto.Event_Integration{ + Integration: r, + } default: return nil, trace.BadParameter("resource type %T is not supported", in.Resource) } @@ -341,6 +345,9 @@ func EventFromGRPC(in proto.Event) (*types.Event, error) { } else if r := in.GetOktaAssignment(); r != nil { out.Resource = r return &out, nil + } else if r := in.GetIntegration(); r != nil { + out.Resource = r + return &out, nil } else { return nil, trace.BadParameter("received unsupported resource %T", in.Resource) } diff --git a/api/types/constants.go b/api/types/constants.go index 8c995b11bcb0d..46317efbcbfb1 100644 --- a/api/types/constants.go +++ b/api/types/constants.go @@ -391,6 +391,10 @@ const ( // VerbEnroll allows enrollment of trusted devices. // Device Trust is a Teleport Enterprise feature. VerbEnroll = "enroll" + + // VerbUse allows the usage of an Integration. + // Roles with this verb can issue API calls using the integration. + VerbUse = "use" ) const ( diff --git a/lib/auth/api.go b/lib/auth/api.go index 023a26ede1bd3..5af97e42ab8dd 100644 --- a/lib/auth/api.go +++ b/lib/auth/api.go @@ -1007,6 +1007,9 @@ type Cache interface { ListSAMLIdPServiceProviders(ctx context.Context, pageSize int, nextKey string) ([]types.SAMLIdPServiceProvider, string, error) // GetSAMLIdPServiceProvider returns the specified SAML IdP service provider resources. GetSAMLIdPServiceProvider(ctx context.Context, name string) (types.SAMLIdPServiceProvider, error) + + // IntegrationsGetter defines read/list methods for integrations. + services.IntegrationsGetter } type NodeWrapper struct { diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 429985b32570f..8c62d078b659c 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -219,6 +219,12 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) { return nil, trace.Wrap(err) } } + if cfg.Integrations == nil { + cfg.Integrations, err = local.NewIntegrationsService(cfg.Backend) + if err != nil { + return nil, trace.Wrap(err) + } + } limiter, err := limiter.NewConnectionsLimiter(limiter.Config{ MaxConnections: defaults.LimiterMaxConcurrentSignatures, @@ -267,6 +273,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) { UserGroups: cfg.UserGroups, SessionTrackerService: cfg.SessionTrackerService, ConnectionsDiagnostic: cfg.ConnectionsDiagnostic, + Integrations: cfg.Integrations, StatusInternal: cfg.Status, UsageReporter: cfg.UsageReporter, @@ -361,6 +368,7 @@ type Services struct { services.SessionTrackerService services.ConnectionsDiagnostic services.StatusInternal + services.Integrations usagereporter.UsageReporter types.Events events.IAuditLog diff --git a/lib/auth/auth_with_roles.go b/lib/auth/auth_with_roles.go index 515d586a16857..9e82731f247c1 100644 --- a/lib/auth/auth_with_roles.go +++ b/lib/auth/auth_with_roles.go @@ -37,6 +37,7 @@ import ( "github.com/gravitational/teleport/api/constants" apidefaults "github.com/gravitational/teleport/api/defaults" devicepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/devicetrust/v1" + integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" loginrulepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/loginrule/v1" pluginspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/plugins/v1" samlidppb "github.com/gravitational/teleport/api/gen/proto/go/teleport/samlidp/v1" @@ -45,6 +46,7 @@ import ( apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/api/types/wrappers" apiutils "github.com/gravitational/teleport/api/utils" + "github.com/gravitational/teleport/lib/auth/integration/integrationv1" "github.com/gravitational/teleport/lib/authz" "github.com/gravitational/teleport/lib/backend" "github.com/gravitational/teleport/lib/defaults" @@ -299,6 +301,120 @@ func (a *ServerWithRoles) SAMLIdPClient() samlidppb.SAMLIdPServiceClient { panic("SAMLIdPClient not implemented by ServerWithRoles") } +// integrationsService returns an Integrations Service. +func (a *ServerWithRoles) integrationsService() (*integrationv1.Service, error) { + igSvc, err := integrationv1.NewService(&integrationv1.ServiceConfig{ + Authorizer: authz.AuthorizerFunc(func(context.Context) (*authz.Context, error) { + return &a.context, nil + }), + Cache: a.authServer.Cache, + Backend: a.authServer.Services, + }) + if err != nil { + return nil, trace.Wrap(err) + } + + return igSvc, nil +} + +// CreateIntegration creates an Integration. +func (a *ServerWithRoles) CreateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, trace.Wrap(err) + } + + igv1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unexpected integration type %T", ig) + } + + ig, err = igSvc.CreateIntegration(ctx, &integrationpb.CreateIntegrationRequest{Integration: igv1}) + if err != nil { + return nil, trace.Wrap(err) + } + return ig, nil +} + +// GetIntegration returns an Integration by its name. +func (a *ServerWithRoles) GetIntegration(ctx context.Context, name string) (types.Integration, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, trace.Wrap(err) + } + + ig, err := igSvc.GetIntegration(ctx, &integrationpb.GetIntegrationRequest{Name: name}) + if err != nil { + return nil, trace.Wrap(err) + } + return ig, nil +} + +// ListIntegrations returns a list of Integrations. +// A next page can be retreived by calling ListIntegrations again and passing the nextKey from the previous response. +func (a *ServerWithRoles) ListIntegrations(ctx context.Context, pageSize int, nextKey string) ([]types.Integration, string, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, "", trace.Wrap(err) + } + + resp, err := igSvc.ListIntegrations(ctx, &integrationpb.ListIntegrationsRequest{ + Limit: int32(pageSize), + NextKey: nextKey, + }) + if err != nil { + return nil, "", trace.Wrap(err) + } + + integrations := make([]types.Integration, 0, len(resp.GetIntegrations())) + for _, ig := range resp.GetIntegrations() { + integrations = append(integrations, ig) + } + + return integrations, resp.GetNextKey(), nil +} + +// UpdateIntegration updates an Integration. +func (a *ServerWithRoles) UpdateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, trace.Wrap(err) + } + + igv1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unexpected integration type %T", ig) + } + + ig, err = igSvc.UpdateIntegration(ctx, &integrationpb.UpdateIntegrationRequest{Integration: igv1}) + if err != nil { + return nil, trace.Wrap(err) + } + return ig, nil +} + +// DeleteAllIntegrations deletes all integrations. +func (a *ServerWithRoles) DeleteAllIntegrations(ctx context.Context) error { + igSvc, err := a.integrationsService() + if err != nil { + return trace.Wrap(err) + } + + _, err = igSvc.DeleteAllIntegrations(ctx, &integrationpb.DeleteAllIntegrationsRequest{}) + return trace.Wrap(err) +} + +// DeleteIntegration deletes an integration integrations. +func (a *ServerWithRoles) DeleteIntegration(ctx context.Context, name string) error { + igSvc, err := a.integrationsService() + if err != nil { + return trace.Wrap(err) + } + + _, err = igSvc.DeleteIntegration(ctx, &integrationpb.DeleteIntegrationRequest{Name: name}) + return trace.Wrap(err) +} + // CreateSessionTracker creates a tracker resource for an active session. func (a *ServerWithRoles) CreateSessionTracker(ctx context.Context, tracker types.SessionTracker) (types.SessionTracker, error) { if err := a.serverAction(); err != nil { diff --git a/lib/auth/clt.go b/lib/auth/clt.go index c0df467789509..4ef5dc00343d6 100644 --- a/lib/auth/clt.go +++ b/lib/auth/clt.go @@ -640,6 +640,7 @@ type ClientI interface { services.SessionTrackerService services.ConnectionsDiagnostic services.SAMLIdPSession + services.Integrations types.Events types.WebSessionsGetter diff --git a/lib/auth/grpcserver.go b/lib/auth/grpcserver.go index 6e5034ab7bac4..4d1f8ae8ebd20 100644 --- a/lib/auth/grpcserver.go +++ b/lib/auth/grpcserver.go @@ -44,12 +44,14 @@ import ( "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" "github.com/gravitational/teleport/api/constants" + integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" oktapb "github.com/gravitational/teleport/api/gen/proto/go/teleport/okta/v1" "github.com/gravitational/teleport/api/metadata" "github.com/gravitational/teleport/api/types" apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/api/types/installers" "github.com/gravitational/teleport/api/types/wrappers" + integrationService "github.com/gravitational/teleport/lib/auth/integration/integrationv1" "github.com/gravitational/teleport/lib/auth/okta" wanlib "github.com/gravitational/teleport/lib/auth/webauthn" "github.com/gravitational/teleport/lib/authz" @@ -5038,6 +5040,16 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) { } oktapb.RegisterOktaServiceServer(server, oktaServiceServer) + integrationServiceServer, err := integrationService.NewService(&integrationService.ServiceConfig{ + Authorizer: cfg.Authorizer, + Backend: cfg.AuthServer.Services, + Cache: cfg.AuthServer.Cache, + }) + if err != nil { + return nil, trace.Wrap(err) + } + integrationpb.RegisterIntegrationServiceServer(server, integrationServiceServer) + return authServer, nil } diff --git a/lib/auth/init.go b/lib/auth/init.go index 321103d2ac9b0..1fc863d95bfb8 100644 --- a/lib/auth/init.go +++ b/lib/auth/init.go @@ -188,6 +188,9 @@ type InitConfig struct { // UserGroups is a service that manages user groups. UserGroups services.UserGroups + // Integrations is a service that manages Integrations. + Integrations services.Integrations + // SessionTrackerService is a service that manages trackers for all active sessions. SessionTrackerService services.SessionTrackerService diff --git a/lib/authz/permissions.go b/lib/authz/permissions.go index 3a7a7481870d1..645cfd576832e 100644 --- a/lib/authz/permissions.go +++ b/lib/authz/permissions.go @@ -523,6 +523,7 @@ func roleSpecForProxy(clusterName string) types.RoleSpecV6 { types.NewRule(types.KindDatabaseService, services.RO()), types.NewRule(types.KindSAMLIdPServiceProvider, services.RO()), types.NewRule(types.KindUserGroup, services.RO()), + types.NewRule(types.KindIntegration, services.RO()), // this rule allows local proxy to update the remote cluster's host certificate authorities // during certificates renewal { diff --git a/lib/cache/cache.go b/lib/cache/cache.go index 06df8b9e82219..15e3823b6cfb3 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -114,6 +114,7 @@ func ForAuth(cfg Config) Config { {Kind: types.KindUserGroup}, {Kind: types.KindOktaImportRule}, {Kind: types.KindOktaAssignment}, + {Kind: types.KindIntegration}, } cfg.QueueSize = defaults.AuthQueueSize return cfg @@ -457,6 +458,7 @@ type Cache struct { samlIdPServiceProvidersCache services.SAMLIdPServiceProviders //nolint:revive // Because we want this to be IdP. userGroupsCache services.UserGroups oktaCache services.Okta + integrationsCache services.Integrations eventsFanout *services.FanoutSet // closed indicates that the cache has been closed @@ -524,6 +526,7 @@ func (c *Cache) read() (readGuard, error) { samlIdPServiceProviders: c.samlIdPServiceProvidersCache, userGroups: c.userGroupsCache, okta: c.oktaCache, + integrations: c.integrationsCache, }, nil } c.rw.RUnlock() @@ -549,6 +552,7 @@ func (c *Cache) read() (readGuard, error) { samlIdPServiceProviders: c.Config.SAMLIdPServiceProviders, userGroups: c.Config.UserGroups, okta: c.Config.Okta, + integrations: c.Config.Integrations, release: nil, }, nil } @@ -579,6 +583,7 @@ type readGuard struct { samlIdPServiceProviders services.SAMLIdPServiceProviders //nolint:revive // Because we want this to be IdP. userGroups services.UserGroups okta services.Okta + integrations services.Integrations release func() released bool } @@ -652,6 +657,8 @@ type Config struct { UserGroups services.UserGroups // Okta is an Okta service. Okta services.Okta + // Integrations is an Integrations service. + Integrations services.Integrations // Backend is a backend for local cache Backend backend.Backend // MaxRetryPeriod is the maximum period between cache retries on failures @@ -798,6 +805,12 @@ func New(config Config) (*Cache, error) { return nil, trace.Wrap(err) } + integrationsCache, err := local.NewIntegrationsService(config.Backend) + if err != nil { + cancel() + return nil, trace.Wrap(err) + } + cs := &Cache{ ctx: ctx, cancel: cancel, @@ -825,6 +838,7 @@ func New(config Config) (*Cache, error) { samlIdPServiceProvidersCache: samlIdPServiceProvidersCache, userGroupsCache: userGroupsCache, oktaCache: oktaCache, + integrationsCache: integrationsCache, eventsFanout: services.NewFanoutSet(), Logger: log.WithFields(log.Fields{ trace.Component: config.Component, @@ -2378,6 +2392,32 @@ func (c *Cache) GetOktaAssignment(ctx context.Context, name string) (types.OktaA return rg.okta.GetOktaAssignment(ctx, name) } +// ListIntegrations returns a paginated list of all Integrations resources. +func (c *Cache) ListIntegrations(ctx context.Context, pageSize int, nextKey string) ([]types.Integration, string, error) { + ctx, span := c.Tracer.Start(ctx, "cache/ListIntegrations") + defer span.End() + + rg, err := c.read() + if err != nil { + return nil, "", trace.Wrap(err) + } + defer rg.Release() + return rg.integrations.ListIntegrations(ctx, pageSize, nextKey) +} + +// GetIntegration returns the specified Integration resources. +func (c *Cache) GetIntegration(ctx context.Context, name string) (types.Integration, error) { + ctx, span := c.Tracer.Start(ctx, "cache/GetIntegration") + defer span.End() + + rg, err := c.read() + if err != nil { + return nil, trace.Wrap(err) + } + defer rg.Release() + return rg.integrations.GetIntegration(ctx, name) +} + // ListResources is a part of auth.Cache implementation func (c *Cache) ListResources(ctx context.Context, req proto.ListResourcesRequest) (*types.ListResourcesResponse, error) { ctx, span := c.Tracer.Start(ctx, "cache/ListResources") diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index b0bdd7367ca63..b690fc304988d 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -86,6 +86,7 @@ type testPack struct { samlIDPServiceProviders services.SAMLIdPServiceProviders userGroups services.UserGroups okta services.Okta + integrations services.Integrations } func (t *testPack) Close() { @@ -215,6 +216,12 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) { } p.okta = oktaSvc + igSvc, err := local.NewIntegrationsService(p.backend) + if err != nil { + return nil, trace.Wrap(err) + } + p.integrations = igSvc + return p, nil } @@ -251,6 +258,7 @@ func newPack(dir string, setupConfig func(c Config) Config, opts ...packOption) SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -638,6 +646,7 @@ func TestCompletenessInit(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -705,6 +714,7 @@ func TestCompletenessReset(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -884,6 +894,7 @@ func TestListResources_NodesTTLVariant(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, neverOK: true, // ensure reads are never healthy @@ -961,6 +972,7 @@ func initStrategy(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -2658,6 +2670,43 @@ func TestOktaAssignments(t *testing.T) { }) } +// TestIntegrations tests that CRUD operations on integrations resources are +// replicated from the backend to the cache. +func TestIntegrations(t *testing.T) { + t.Parallel() + + p := newTestPack(t, ForAuth) + t.Cleanup(p.Close) + + testResources(t, p, testFuncs[types.Integration]{ + newResource: func(name string) (types.Integration, error) { + return types.NewIntegrationAWSOIDC( + types.Metadata{Name: name}, + &types.AWSOIDCIntegrationSpecV1{ + RoleARN: "arn:aws:iam::123456789012:role/OpsTeam", + }, + ) + }, + create: func(ctx context.Context, i types.Integration) error { + _, err := p.integrations.CreateIntegration(ctx, i) + return err + }, + list: func(ctx context.Context) ([]types.Integration, error) { + results, _, err := p.integrations.ListIntegrations(ctx, 0, "") + return results, err + }, + cacheList: func(ctx context.Context) ([]types.Integration, error) { + results, _, err := p.cache.ListIntegrations(ctx, 0, "") + return results, err + }, + update: func(ctx context.Context, i types.Integration) error { + _, err := p.integrations.UpdateIntegration(ctx, i) + return err + }, + deleteAll: p.integrations.DeleteAllIntegrations, + }) +} + // testResources is a generic tester for resources. func testResources[T types.Resource](t *testing.T, p *testPack, funcs testFuncs[T]) { ctx := context.Background() @@ -3049,6 +3098,7 @@ func TestCacheWatchKindExistsInEvents(t *testing.T) { types.KindUserGroup: &types.UserGroupV1{}, types.KindOktaImportRule: &types.OktaImportRuleV1{}, types.KindOktaAssignment: &types.OktaAssignmentV1{}, + types.KindIntegration: &types.IntegrationV1{}, } for name, cfg := range cases { diff --git a/lib/cache/collections.go b/lib/cache/collections.go index 56ac7fe6fab1f..27b119ee365a9 100644 --- a/lib/cache/collections.go +++ b/lib/cache/collections.go @@ -261,6 +261,11 @@ func setupCollections(c *Cache, watches []types.WatchKind) (map[resourceKind]col return nil, trace.BadParameter("missing parameter Okta") } collections[resourceKind] = &oktaAssignments{watch: watch, Cache: c} + case types.KindIntegration: + if c.Integrations == nil { + return nil, trace.BadParameter("missing parameter Integrations") + } + collections[resourceKind] = &integrations{watch: watch, Cache: c} default: return nil, trace.BadParameter("resource %q is not supported", watch.Kind) } @@ -3185,3 +3190,86 @@ func (s *oktaAssignments) processEvent(ctx context.Context, event types.Event) e func (s *oktaAssignments) watchKind() types.WatchKind { return s.watch } + +type integrations struct { + *Cache + watch types.WatchKind +} + +func (s *integrations) erase(ctx context.Context) error { + err := s.integrationsCache.DeleteAllIntegrations(ctx) + if err != nil && !trace.IsNotFound(err) { + return trace.Wrap(err) + } + return nil +} + +func (s *integrations) fetch(ctx context.Context) (apply func(ctx context.Context) error, err error) { + var ( + startKey string + resources []types.Integration + ) + for { + var igs []types.Integration + var err error + igs, startKey, err = s.Integrations.ListIntegrations(ctx, 0, startKey) + if err != nil { + return nil, trace.Wrap(err) + } + + resources = append(resources, igs...) + + if startKey == "" { + break + } + } + + return func(ctx context.Context) error { + if err := s.erase(ctx); err != nil { + return trace.Wrap(err) + } + + for _, resource := range resources { + _, err = s.integrationsCache.CreateIntegration(ctx, resource) + if trace.IsAlreadyExists(err) { + _, err = s.integrationsCache.UpdateIntegration(ctx, resource) + } + if err != nil { + return trace.Wrap(err) + } + } + return nil + }, nil +} + +func (s *integrations) processEvent(ctx context.Context, event types.Event) error { + switch event.Type { + case types.OpDelete: + err := s.integrationsCache.DeleteIntegration(ctx, event.Resource.GetName()) + if err != nil { + // Resource could be missing in the cache expired or not created, + // if the first consumed event is delete. + if !trace.IsNotFound(err) { + s.Logger.WithError(err).Warn("Failed to delete resource.") + return trace.Wrap(err) + } + } + case types.OpPut: + resource, ok := event.Resource.(types.Integration) + if !ok { + return trace.BadParameter("unexpected type %T", event.Resource) + } + _, err := s.integrationsCache.CreateIntegration(ctx, resource) + if trace.IsAlreadyExists(err) { + _, err = s.integrationsCache.UpdateIntegration(ctx, resource) + } + return trace.Wrap(err) + default: + s.Logger.WithField("event", event.Type).Warn("Skipping unsupported event type.") + } + return nil +} + +func (s *integrations) watchKind() types.WatchKind { + return s.watch +} diff --git a/lib/service/service.go b/lib/service/service.go index a4f67d0a1b8a4..23bcfed167917 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -1961,6 +1961,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca SAMLIdPServiceProviders: cfg.services, UserGroups: cfg.services, Okta: cfg.services.OktaClient(), + Integrations: cfg.services, WebSession: cfg.services.WebSessions(), WebToken: cfg.services.WebTokens(), Component: teleport.Component(append(cfg.cacheName, process.id, teleport.ComponentCache)...), diff --git a/lib/services/local/events.go b/lib/services/local/events.go index 6611786f7ea35..6bef5ea19a72c 100644 --- a/lib/services/local/events.go +++ b/lib/services/local/events.go @@ -156,6 +156,8 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type parser = newOktaImportRuleParser() case types.KindOktaAssignment: parser = newOktaAssignmentParser() + case types.KindIntegration: + parser = newIntegrationParser() default: return nil, trace.BadParameter("watcher on object kind %q is not supported", kind.Kind) } @@ -1538,6 +1540,30 @@ func (p *oktaAssignmentParser) parse(event backend.Event) (types.Resource, error } } +func newIntegrationParser() *integrationParser { + return &integrationParser{ + baseParser: newBaseParser(backend.Key(integrationsPrefix)), + } +} + +type integrationParser struct { + baseParser +} + +func (p *integrationParser) parse(event backend.Event) (types.Resource, error) { + switch event.Type { + case types.OpDelete: + return resourceHeader(event, types.KindIntegration, types.V1, 0) + case types.OpPut: + return services.UnmarshalIntegration(event.Item.Value, + services.WithResourceID(event.Item.ID), + services.WithExpires(event.Item.Expires), + ) + default: + return nil, trace.BadParameter("event %v is not supported", event.Type) + } +} + func resourceHeader(event backend.Event, kind, version string, offset int) (types.Resource, error) { name, err := base(event.Item.Key, offset) if err != nil { diff --git a/lib/services/presets.go b/lib/services/presets.go index ccbd8850baeb5..752eb090b4c19 100644 --- a/lib/services/presets.go +++ b/lib/services/presets.go @@ -84,6 +84,7 @@ func NewPresetEditorRole() types.Role { types.NewRule(types.KindOktaImportRule, RW()), types.NewRule(types.KindOktaAssignment, RW()), types.NewRule(types.KindPlugin, RW()), + types.NewRule(types.KindIntegration, append(RW(), types.VerbUse)), // Please see defaultAllowRules when adding a new rule. }, }, @@ -203,6 +204,7 @@ func defaultAllowRules() map[string][]types.Rule { types.NewRule(types.KindOktaImportRule, RW()), types.NewRule(types.KindOktaAssignment, RW()), types.NewRule(types.KindPlugin, RW()), + types.NewRule(types.KindIntegration, append(RW(), types.VerbUse)), }, } } diff --git a/lib/services/services.go b/lib/services/services.go index af615d6865763..079807b27031b 100644 --- a/lib/services/services.go +++ b/lib/services/services.go @@ -43,6 +43,7 @@ type Services interface { WindowsDesktops SAMLIdPServiceProviders UserGroups + Integrations OktaClient() Okta }