diff --git a/api/client/client.go b/api/client/client.go index 50defde257c0d..f86e420aca234 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -47,6 +47,7 @@ import ( "github.com/gravitational/teleport/api/constants" "github.com/gravitational/teleport/api/defaults" devicepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/devicetrust/v1" + integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" kubeproto "github.com/gravitational/teleport/api/gen/proto/go/teleport/kube/v1" loginrulepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/loginrule/v1" oktapb "github.com/gravitational/teleport/api/gen/proto/go/teleport/okta/v1" @@ -3363,7 +3364,6 @@ func (c *Client) GetClusterMaintenanceConfig(ctx context.Context) (types.Cluster if err != nil { return nil, trail.FromGRPC(err) } - return rsp, nil } @@ -3378,6 +3378,87 @@ func (c *Client) UpdateClusterMaintenanceConfig(ctx context.Context, cmc types.C return trail.FromGRPC(err) } +// integrationsClient returns an unadorned Integration client, using the underlying +// Auth gRPC connection. +func (c *Client) integrationsClient() integrationpb.IntegrationServiceClient { + return integrationpb.NewIntegrationServiceClient(c.conn) +} + +// ListIntegrations returns a paginated list of Integrations. +// The response includes a nextKey which must be used to fetch the next page. +func (c *Client) ListIntegrations(ctx context.Context, pageSize int, nextKey string) ([]types.Integration, string, error) { + resp, err := c.integrationsClient().ListIntegrations(ctx, &integrationpb.ListIntegrationsRequest{ + Limit: int32(pageSize), + NextKey: nextKey, + }) + if err != nil { + return nil, "", trail.FromGRPC(err) + } + + integrations := make([]types.Integration, 0, len(resp.GetIntegrations())) + for _, ig := range resp.GetIntegrations() { + integrations = append(integrations, ig) + } + + return integrations, resp.GetNextKey(), nil +} + +// GetIntegration returns an Integration by its name. +func (c *Client) GetIntegration(ctx context.Context, name string) (types.Integration, error) { + ig, err := c.integrationsClient().GetIntegration(ctx, &integrationpb.GetIntegrationRequest{ + Name: name, + }) + if err != nil { + return nil, trail.FromGRPC(err) + } + + return ig, nil +} + +// CreateIntegration creates a new Integration. +func (c *Client) CreateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igV1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unsupported integration type %T", ig) + } + + ig, err := c.integrationsClient().CreateIntegration(ctx, &integrationpb.CreateIntegrationRequest{Integration: igV1}) + if err != nil { + return nil, trail.FromGRPC(err) + } + + return ig, nil +} + +// UpdateIntegration updates an existing Integration. +func (c *Client) UpdateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igV1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unsupported integration type %T", ig) + } + + ig, err := c.integrationsClient().UpdateIntegration(ctx, &integrationpb.UpdateIntegrationRequest{Integration: igV1}) + if err != nil { + return nil, trail.FromGRPC(err) + } + + return ig, nil +} + +// DeleteIntegration removes an Integration by its name. +func (c *Client) DeleteIntegration(ctx context.Context, name string) error { + _, err := c.integrationsClient().DeleteIntegration(ctx, &integrationpb.DeleteIntegrationRequest{ + Name: name, + }) + return trail.FromGRPC(err) +} + +// DeleteAllIntegrations removes all Integrations. +func (c *Client) DeleteAllIntegrations(ctx context.Context) error { + _, err := c.integrationsClient().DeleteAllIntegrations(ctx, &integrationpb.DeleteAllIntegrationsRequest{}) + return trail.FromGRPC(err) +} + // PluginsClient returns an unadorned Plugins client, using the underlying // Auth gRPC connection. // Clients connecting to non-Enterprise clusters, or older Teleport versions, diff --git a/api/client/events.go b/api/client/events.go index 2be0de813f3f9..6195647b392b8 100644 --- a/api/client/events.go +++ b/api/client/events.go @@ -195,6 +195,10 @@ func EventToGRPC(in types.Event) (*proto.Event, error) { out.Resource = &proto.Event_OktaAssignment{ OktaAssignment: r, } + case *types.IntegrationV1: + out.Resource = &proto.Event_Integration{ + Integration: r, + } default: return nil, trace.BadParameter("resource type %T is not supported", in.Resource) } @@ -341,6 +345,9 @@ func EventFromGRPC(in proto.Event) (*types.Event, error) { } else if r := in.GetOktaAssignment(); r != nil { out.Resource = r return &out, nil + } else if r := in.GetIntegration(); r != nil { + out.Resource = r + return &out, nil } else { return nil, trace.BadParameter("received unsupported resource %T", in.Resource) } diff --git a/api/types/constants.go b/api/types/constants.go index 5a393b5242d5a..068a30091077d 100644 --- a/api/types/constants.go +++ b/api/types/constants.go @@ -401,6 +401,10 @@ const ( // VerbEnroll allows enrollment of trusted devices. // Device Trust is a Teleport Enterprise feature. VerbEnroll = "enroll" + + // VerbUse allows the usage of an Integration. + // Roles with this verb can issue API calls using the integration. + VerbUse = "use" ) const ( diff --git a/lib/auth/api.go b/lib/auth/api.go index c7fe65cb197ec..6dcc894320790 100644 --- a/lib/auth/api.go +++ b/lib/auth/api.go @@ -1006,6 +1006,9 @@ type Cache interface { ListSAMLIdPServiceProviders(ctx context.Context, pageSize int, nextKey string) ([]types.SAMLIdPServiceProvider, string, error) // GetSAMLIdPServiceProvider returns the specified SAML IdP service provider resources. GetSAMLIdPServiceProvider(ctx context.Context, name string) (types.SAMLIdPServiceProvider, error) + + // IntegrationsGetter defines read/list methods for integrations. + services.IntegrationsGetter } type NodeWrapper struct { diff --git a/lib/auth/auth.go b/lib/auth/auth.go index 48dd61f58578d..d877f957e3cec 100644 --- a/lib/auth/auth.go +++ b/lib/auth/auth.go @@ -220,6 +220,12 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) { return nil, trace.Wrap(err) } } + if cfg.Integrations == nil { + cfg.Integrations, err = local.NewIntegrationsService(cfg.Backend) + if err != nil { + return nil, trace.Wrap(err) + } + } limiter, err := limiter.NewConnectionsLimiter(limiter.Config{ MaxConnections: defaults.LimiterMaxConcurrentSignatures, @@ -268,6 +274,7 @@ func NewServer(cfg *InitConfig, opts ...ServerOption) (*Server, error) { UserGroups: cfg.UserGroups, SessionTrackerService: cfg.SessionTrackerService, ConnectionsDiagnostic: cfg.ConnectionsDiagnostic, + Integrations: cfg.Integrations, StatusInternal: cfg.Status, UsageReporter: cfg.UsageReporter, @@ -371,6 +378,7 @@ type Services struct { services.SessionTrackerService services.ConnectionsDiagnostic services.StatusInternal + services.Integrations usagereporter.UsageReporter types.Events events.AuditLogSessionStreamer diff --git a/lib/auth/auth_with_roles.go b/lib/auth/auth_with_roles.go index 8387c86e2b18d..a24719906a245 100644 --- a/lib/auth/auth_with_roles.go +++ b/lib/auth/auth_with_roles.go @@ -37,6 +37,7 @@ import ( "github.com/gravitational/teleport/api/constants" apidefaults "github.com/gravitational/teleport/api/defaults" devicepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/devicetrust/v1" + integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" loginrulepb "github.com/gravitational/teleport/api/gen/proto/go/teleport/loginrule/v1" oktapb "github.com/gravitational/teleport/api/gen/proto/go/teleport/okta/v1" pluginspb "github.com/gravitational/teleport/api/gen/proto/go/teleport/plugins/v1" @@ -47,6 +48,7 @@ import ( apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/api/types/wrappers" apiutils "github.com/gravitational/teleport/api/utils" + "github.com/gravitational/teleport/lib/auth/integration/integrationv1" "github.com/gravitational/teleport/lib/auth/trust/trustv1" "github.com/gravitational/teleport/lib/authz" "github.com/gravitational/teleport/lib/backend" @@ -305,6 +307,120 @@ func (a *ServerWithRoles) SAMLIdPClient() samlidppb.SAMLIdPServiceClient { ) } +// integrationsService returns an Integrations Service. +func (a *ServerWithRoles) integrationsService() (*integrationv1.Service, error) { + igSvc, err := integrationv1.NewService(&integrationv1.ServiceConfig{ + Authorizer: authz.AuthorizerFunc(func(context.Context) (*authz.Context, error) { + return &a.context, nil + }), + Cache: a.authServer.Cache, + Backend: a.authServer.Services, + }) + if err != nil { + return nil, trace.Wrap(err) + } + + return igSvc, nil +} + +// CreateIntegration creates an Integration. +func (a *ServerWithRoles) CreateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, trace.Wrap(err) + } + + igv1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unexpected integration type %T", ig) + } + + ig, err = igSvc.CreateIntegration(ctx, &integrationpb.CreateIntegrationRequest{Integration: igv1}) + if err != nil { + return nil, trace.Wrap(err) + } + return ig, nil +} + +// GetIntegration returns an Integration by its name. +func (a *ServerWithRoles) GetIntegration(ctx context.Context, name string) (types.Integration, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, trace.Wrap(err) + } + + ig, err := igSvc.GetIntegration(ctx, &integrationpb.GetIntegrationRequest{Name: name}) + if err != nil { + return nil, trace.Wrap(err) + } + return ig, nil +} + +// ListIntegrations returns a list of Integrations. +// A next page can be retreived by calling ListIntegrations again and passing the nextKey from the previous response. +func (a *ServerWithRoles) ListIntegrations(ctx context.Context, pageSize int, nextKey string) ([]types.Integration, string, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, "", trace.Wrap(err) + } + + resp, err := igSvc.ListIntegrations(ctx, &integrationpb.ListIntegrationsRequest{ + Limit: int32(pageSize), + NextKey: nextKey, + }) + if err != nil { + return nil, "", trace.Wrap(err) + } + + integrations := make([]types.Integration, 0, len(resp.GetIntegrations())) + for _, ig := range resp.GetIntegrations() { + integrations = append(integrations, ig) + } + + return integrations, resp.GetNextKey(), nil +} + +// UpdateIntegration updates an Integration. +func (a *ServerWithRoles) UpdateIntegration(ctx context.Context, ig types.Integration) (types.Integration, error) { + igSvc, err := a.integrationsService() + if err != nil { + return nil, trace.Wrap(err) + } + + igv1, ok := ig.(*types.IntegrationV1) + if !ok { + return nil, trace.BadParameter("unexpected integration type %T", ig) + } + + ig, err = igSvc.UpdateIntegration(ctx, &integrationpb.UpdateIntegrationRequest{Integration: igv1}) + if err != nil { + return nil, trace.Wrap(err) + } + return ig, nil +} + +// DeleteAllIntegrations deletes all integrations. +func (a *ServerWithRoles) DeleteAllIntegrations(ctx context.Context) error { + igSvc, err := a.integrationsService() + if err != nil { + return trace.Wrap(err) + } + + _, err = igSvc.DeleteAllIntegrations(ctx, &integrationpb.DeleteAllIntegrationsRequest{}) + return trace.Wrap(err) +} + +// DeleteIntegration deletes an integration integrations. +func (a *ServerWithRoles) DeleteIntegration(ctx context.Context, name string) error { + igSvc, err := a.integrationsService() + if err != nil { + return trace.Wrap(err) + } + + _, err = igSvc.DeleteIntegration(ctx, &integrationpb.DeleteIntegrationRequest{Name: name}) + return trace.Wrap(err) +} + // CreateSessionTracker creates a tracker resource for an active session. func (a *ServerWithRoles) CreateSessionTracker(ctx context.Context, tracker types.SessionTracker) (types.SessionTracker, error) { if err := a.serverAction(); err != nil { diff --git a/lib/auth/clt.go b/lib/auth/clt.go index 68d487e1ad935..3e89bfae29f8f 100644 --- a/lib/auth/clt.go +++ b/lib/auth/clt.go @@ -711,6 +711,7 @@ type ClientI interface { services.SessionTrackerService services.ConnectionsDiagnostic services.SAMLIdPSession + services.Integrations types.Events types.WebSessionsGetter diff --git a/lib/auth/grpcserver.go b/lib/auth/grpcserver.go index af0e9ab51570c..6f1b10ce104dd 100644 --- a/lib/auth/grpcserver.go +++ b/lib/auth/grpcserver.go @@ -44,6 +44,7 @@ import ( "github.com/gravitational/teleport/api/client" "github.com/gravitational/teleport/api/client/proto" "github.com/gravitational/teleport/api/constants" + integrationpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/integration/v1" oktapb "github.com/gravitational/teleport/api/gen/proto/go/teleport/okta/v1" trustpb "github.com/gravitational/teleport/api/gen/proto/go/teleport/trust/v1" "github.com/gravitational/teleport/api/metadata" @@ -51,6 +52,7 @@ import ( apievents "github.com/gravitational/teleport/api/types/events" "github.com/gravitational/teleport/api/types/installers" "github.com/gravitational/teleport/api/types/wrappers" + integrationService "github.com/gravitational/teleport/lib/auth/integration/integrationv1" "github.com/gravitational/teleport/lib/auth/okta" "github.com/gravitational/teleport/lib/auth/trust/trustv1" wanlib "github.com/gravitational/teleport/lib/auth/webauthn" @@ -5068,6 +5070,16 @@ func NewGRPCServer(cfg GRPCServerConfig) (*GRPCServer, error) { } oktapb.RegisterOktaServiceServer(server, oktaServiceServer) + integrationServiceServer, err := integrationService.NewService(&integrationService.ServiceConfig{ + Authorizer: cfg.Authorizer, + Backend: cfg.AuthServer.Services, + Cache: cfg.AuthServer.Cache, + }) + if err != nil { + return nil, trace.Wrap(err) + } + integrationpb.RegisterIntegrationServiceServer(server, integrationServiceServer) + return authServer, nil } diff --git a/lib/auth/init.go b/lib/auth/init.go index 80e21d8acab75..8f594639184c3 100644 --- a/lib/auth/init.go +++ b/lib/auth/init.go @@ -189,6 +189,9 @@ type InitConfig struct { // UserGroups is a service that manages user groups. UserGroups services.UserGroups + // Integrations is a service that manages Integrations. + Integrations services.Integrations + // SessionTrackerService is a service that manages trackers for all active sessions. SessionTrackerService services.SessionTrackerService diff --git a/lib/authz/permissions.go b/lib/authz/permissions.go index 61a0f68b67a0a..e3384e2622215 100644 --- a/lib/authz/permissions.go +++ b/lib/authz/permissions.go @@ -521,6 +521,7 @@ func roleSpecForProxy(clusterName string) types.RoleSpecV6 { types.NewRule(types.KindDatabaseService, services.RO()), types.NewRule(types.KindSAMLIdPServiceProvider, services.RO()), types.NewRule(types.KindUserGroup, services.RO()), + types.NewRule(types.KindIntegration, services.RO()), // this rule allows local proxy to update the remote cluster's host certificate authorities // during certificates renewal { diff --git a/lib/cache/cache.go b/lib/cache/cache.go index d724110260dab..d623a2c359097 100644 --- a/lib/cache/cache.go +++ b/lib/cache/cache.go @@ -113,6 +113,7 @@ func ForAuth(cfg Config) Config { {Kind: types.KindUserGroup}, {Kind: types.KindOktaImportRule}, {Kind: types.KindOktaAssignment}, + {Kind: types.KindIntegration}, } cfg.QueueSize = defaults.AuthQueueSize return cfg @@ -156,6 +157,7 @@ func ForProxy(cfg Config) Config { {Kind: types.KindKubernetesCluster}, {Kind: types.KindSAMLIdPServiceProvider}, {Kind: types.KindUserGroup}, + {Kind: types.KindIntegration}, } cfg.QueueSize = defaults.ProxyQueueSize return cfg @@ -460,6 +462,7 @@ type Cache struct { samlIdPServiceProvidersCache services.SAMLIdPServiceProviders //nolint:revive // Because we want this to be IdP. userGroupsCache services.UserGroups oktaCache services.Okta + integrationsCache services.Integrations eventsFanout *services.FanoutSet // closed indicates that the cache has been closed @@ -527,6 +530,7 @@ func (c *Cache) read() (readGuard, error) { samlIdPServiceProviders: c.samlIdPServiceProvidersCache, userGroups: c.userGroupsCache, okta: c.oktaCache, + integrations: c.integrationsCache, }, nil } c.rw.RUnlock() @@ -552,6 +556,7 @@ func (c *Cache) read() (readGuard, error) { samlIdPServiceProviders: c.Config.SAMLIdPServiceProviders, userGroups: c.Config.UserGroups, okta: c.Config.Okta, + integrations: c.Config.Integrations, release: nil, }, nil } @@ -582,6 +587,7 @@ type readGuard struct { samlIdPServiceProviders services.SAMLIdPServiceProviders //nolint:revive // Because we want this to be IdP. userGroups services.UserGroups okta services.Okta + integrations services.Integrations release func() released bool } @@ -655,6 +661,8 @@ type Config struct { UserGroups services.UserGroups // Okta is an Okta service. Okta services.Okta + // Integrations is an Integrations service. + Integrations services.Integrations // Backend is a backend for local cache Backend backend.Backend // MaxRetryPeriod is the maximum period between cache retries on failures @@ -814,6 +822,12 @@ func New(config Config) (*Cache, error) { return nil, trace.Wrap(err) } + integrationsCache, err := local.NewIntegrationsService(config.Backend) + if err != nil { + cancel() + return nil, trace.Wrap(err) + } + cs := &Cache{ ctx: ctx, cancel: cancel, @@ -841,6 +855,7 @@ func New(config Config) (*Cache, error) { samlIdPServiceProvidersCache: samlIdPServiceProvidersCache, userGroupsCache: userGroupsCache, oktaCache: oktaCache, + integrationsCache: integrationsCache, eventsFanout: services.NewFanoutSet(), Logger: log.WithFields(log.Fields{ trace.Component: config.Component, @@ -2389,6 +2404,32 @@ func (c *Cache) GetOktaAssignment(ctx context.Context, name string) (types.OktaA return rg.okta.GetOktaAssignment(ctx, name) } +// ListIntegrations returns a paginated list of all Integrations resources. +func (c *Cache) ListIntegrations(ctx context.Context, pageSize int, nextKey string) ([]types.Integration, string, error) { + ctx, span := c.Tracer.Start(ctx, "cache/ListIntegrations") + defer span.End() + + rg, err := c.read() + if err != nil { + return nil, "", trace.Wrap(err) + } + defer rg.Release() + return rg.integrations.ListIntegrations(ctx, pageSize, nextKey) +} + +// GetIntegration returns the specified Integration resources. +func (c *Cache) GetIntegration(ctx context.Context, name string) (types.Integration, error) { + ctx, span := c.Tracer.Start(ctx, "cache/GetIntegration") + defer span.End() + + rg, err := c.read() + if err != nil { + return nil, trace.Wrap(err) + } + defer rg.Release() + return rg.integrations.GetIntegration(ctx, name) +} + // ListResources is a part of auth.Cache implementation func (c *Cache) ListResources(ctx context.Context, req proto.ListResourcesRequest) (*types.ListResourcesResponse, error) { ctx, span := c.Tracer.Start(ctx, "cache/ListResources") diff --git a/lib/cache/cache_test.go b/lib/cache/cache_test.go index 3f1ac663c6274..0137b59a58ad1 100644 --- a/lib/cache/cache_test.go +++ b/lib/cache/cache_test.go @@ -86,6 +86,7 @@ type testPack struct { samlIDPServiceProviders services.SAMLIdPServiceProviders userGroups services.UserGroups okta services.Okta + integrations services.Integrations } // testFuncs are functions to support testing an object in a cache. @@ -222,6 +223,12 @@ func newPackWithoutCache(dir string, opts ...packOption) (*testPack, error) { } p.okta = oktaSvc + igSvc, err := local.NewIntegrationsService(p.backend) + if err != nil { + return nil, trace.Wrap(err) + } + p.integrations = igSvc + return p, nil } @@ -258,6 +265,7 @@ func newPack(dir string, setupConfig func(c Config) Config, opts ...packOption) SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -645,6 +653,7 @@ func TestCompletenessInit(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -712,6 +721,7 @@ func TestCompletenessReset(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -891,6 +901,7 @@ func TestListResources_NodesTTLVariant(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, neverOK: true, // ensure reads are never healthy @@ -968,6 +979,7 @@ func initStrategy(t *testing.T) { SAMLIdPServiceProviders: p.samlIDPServiceProviders, UserGroups: p.userGroups, Okta: p.okta, + Integrations: p.integrations, MaxRetryPeriod: 200 * time.Millisecond, EventsC: p.eventsC, })) @@ -1991,6 +2003,44 @@ func TestOktaAssignments(t *testing.T) { }) } +// TestIntegrations tests that CRUD operations on integrations resources are +// replicated from the backend to the cache. +func TestIntegrations(t *testing.T) { + t.Parallel() + + p := newTestPack(t, ForAuth) + t.Cleanup(p.Close) + + testResources(t, p, testFuncs[types.Integration]{ + newResource: func(name string) (types.Integration, error) { + return types.NewIntegrationAWSOIDC( + types.Metadata{Name: name}, + &types.AWSOIDCIntegrationSpecV1{ + RoleARN: "arn:aws:iam::123456789012:role/OpsTeam", + }, + ) + }, + create: func(ctx context.Context, i types.Integration) error { + _, err := p.integrations.CreateIntegration(ctx, i) + return err + }, + list: func(ctx context.Context) ([]types.Integration, error) { + results, _, err := p.integrations.ListIntegrations(ctx, 0, "") + return results, err + }, + cacheGet: p.cache.GetIntegration, + cacheList: func(ctx context.Context) ([]types.Integration, error) { + results, _, err := p.cache.ListIntegrations(ctx, 0, "") + return results, err + }, + update: func(ctx context.Context, i types.Integration) error { + _, err := p.integrations.UpdateIntegration(ctx, i) + return err + }, + deleteAll: p.integrations.DeleteAllIntegrations, + }) +} + // testResources is a generic tester for resources. func testResources[T types.Resource](t *testing.T, p *testPack, funcs testFuncs[T]) { ctx := context.Background() @@ -2398,6 +2448,7 @@ func TestCacheWatchKindExistsInEvents(t *testing.T) { types.KindUserGroup: &types.UserGroupV1{}, types.KindOktaImportRule: &types.OktaImportRuleV1{}, types.KindOktaAssignment: &types.OktaAssignmentV1{}, + types.KindIntegration: &types.IntegrationV1{}, } for name, cfg := range cases { diff --git a/lib/cache/collections.go b/lib/cache/collections.go index 4e63f812bc466..57307b42f06d0 100644 --- a/lib/cache/collections.go +++ b/lib/cache/collections.go @@ -354,6 +354,11 @@ func setupCollections(c *Cache, watches []types.WatchKind) (map[resourceKind]col return nil, trace.BadParameter("missing parameter Okta") } collections[resourceKind] = &genericCollection[types.OktaAssignment, oktaAssignmentsExecutor]{cache: c, watch: watch} + case types.KindIntegration: + if c.Integrations == nil { + return nil, trace.BadParameter("missing parameter Integrations") + } + collections[resourceKind] = &genericCollection[types.Integration, integrationsExecutor]{cache: c, watch: watch} default: return nil, trace.BadParameter("resource %q is not supported", watch.Kind) } @@ -1575,3 +1580,48 @@ func (oktaAssignmentsExecutor) delete(ctx context.Context, cache *Cache, resourc func (oktaAssignmentsExecutor) isSingleton() bool { return false } var _ executor[types.OktaAssignment] = oktaAssignmentsExecutor{} + +type integrationsExecutor struct{} + +func (integrationsExecutor) getAll(ctx context.Context, cache *Cache, loadSecrets bool) ([]types.Integration, error) { + var ( + startKey string + resources []types.Integration + ) + for { + var igs []types.Integration + var err error + igs, startKey, err = cache.Integrations.ListIntegrations(ctx, 0, startKey) + if err != nil { + return nil, trace.Wrap(err) + } + + resources = append(resources, igs...) + + if startKey == "" { + break + } + } + + return resources, nil +} + +func (integrationsExecutor) upsert(ctx context.Context, cache *Cache, resource types.Integration) error { + _, err := cache.integrationsCache.CreateIntegration(ctx, resource) + if trace.IsAlreadyExists(err) { + _, err = cache.integrationsCache.UpdateIntegration(ctx, resource) + } + return trace.Wrap(err) +} + +func (integrationsExecutor) deleteAll(ctx context.Context, cache *Cache) error { + return cache.integrationsCache.DeleteAllIntegrations(ctx) +} + +func (integrationsExecutor) delete(ctx context.Context, cache *Cache, resource types.Resource) error { + return cache.integrationsCache.DeleteIntegration(ctx, resource.GetName()) +} + +func (integrationsExecutor) isSingleton() bool { return false } + +var _ executor[types.Integration] = integrationsExecutor{} diff --git a/lib/service/service.go b/lib/service/service.go index baee5a46f5e10..6319297bfc3ea 100644 --- a/lib/service/service.go +++ b/lib/service/service.go @@ -2008,6 +2008,7 @@ func (process *TeleportProcess) newAccessCache(cfg accessCacheConfig) (*cache.Ca SAMLIdPServiceProviders: cfg.services, UserGroups: cfg.services, Okta: cfg.services.OktaClient(), + Integrations: cfg.services, WebSession: cfg.services.WebSessions(), WebToken: cfg.services.WebTokens(), Component: teleport.Component(append(cfg.cacheName, process.id, teleport.ComponentCache)...), diff --git a/lib/services/local/events.go b/lib/services/local/events.go index a3c23f9572394..9916bb6a8c50b 100644 --- a/lib/services/local/events.go +++ b/lib/services/local/events.go @@ -154,6 +154,8 @@ func (e *EventsService) NewWatcher(ctx context.Context, watch types.Watch) (type parser = newOktaImportRuleParser() case types.KindOktaAssignment: parser = newOktaAssignmentParser() + case types.KindIntegration: + parser = newIntegrationParser() default: return nil, trace.BadParameter("watcher on object kind %q is not supported", kind.Kind) } @@ -1523,6 +1525,30 @@ func (p *oktaAssignmentParser) parse(event backend.Event) (types.Resource, error } } +func newIntegrationParser() *integrationParser { + return &integrationParser{ + baseParser: newBaseParser(backend.Key(integrationsPrefix)), + } +} + +type integrationParser struct { + baseParser +} + +func (p *integrationParser) parse(event backend.Event) (types.Resource, error) { + switch event.Type { + case types.OpDelete: + return resourceHeader(event, types.KindIntegration, types.V1, 0) + case types.OpPut: + return services.UnmarshalIntegration(event.Item.Value, + services.WithResourceID(event.Item.ID), + services.WithExpires(event.Item.Expires), + ) + default: + return nil, trace.BadParameter("event %v is not supported", event.Type) + } +} + func resourceHeader(event backend.Event, kind, version string, offset int) (types.Resource, error) { name, err := base(event.Item.Key, offset) if err != nil { diff --git a/lib/services/presets.go b/lib/services/presets.go index caf70e62c3d32..40f520a6687f7 100644 --- a/lib/services/presets.go +++ b/lib/services/presets.go @@ -85,6 +85,7 @@ func NewPresetEditorRole() types.Role { types.NewRule(types.KindOktaImportRule, RW()), types.NewRule(types.KindOktaAssignment, RW()), types.NewRule(types.KindLock, RW()), + types.NewRule(types.KindIntegration, append(RW(), types.VerbUse)), // Please see defaultAllowRules when adding a new rule. }, }, @@ -209,6 +210,7 @@ func defaultAllowRules() map[string][]types.Rule { types.NewRule(types.KindOktaAssignment, RW()), types.NewRule(types.KindDevice, append(RW(), types.VerbCreateEnrollToken, types.VerbEnroll)), types.NewRule(types.KindLock, RW()), + types.NewRule(types.KindIntegration, append(RW(), types.VerbUse)), }, } } diff --git a/lib/services/services.go b/lib/services/services.go index af615d6865763..079807b27031b 100644 --- a/lib/services/services.go +++ b/lib/services/services.go @@ -43,6 +43,7 @@ type Services interface { WindowsDesktops SAMLIdPServiceProviders UserGroups + Integrations OktaClient() Okta }