Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions lib/integrations/awsoidc/deployservice_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ func (req *UpdateServiceRequest) CheckAndSetDefaults() error {
return nil
}

// UpdateDeployServiceAgent updates the deploy service agent with the specified teleportVersionTag.
func UpdateDeployServiceAgent(ctx context.Context, clt DeployServiceClient, req UpdateServiceRequest) error {
// UpdateDeployService updates the AWS OIDC deploy service with the specified version tag.
func UpdateDeployService(ctx context.Context, clt DeployServiceClient, req UpdateServiceRequest) error {
if err := req.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}
Expand Down
89 changes: 47 additions & 42 deletions lib/service/awsoidc.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,20 +40,25 @@ import (
)

const (
// updateDeployAgentsInterval specifies how frequently to check for available updates.
updateDeployAgentsInterval = time.Minute * 30
// updateAWSOIDCDeployServiceInterval specifies how frequently to check for available updates.
updateAWSOIDCDeployServiceInterval = time.Minute * 30

// maxConcurrentUpdates specifies the maximum number of concurrent updates
maxConcurrentUpdates = 3
)

func (process *TeleportProcess) initDeployServiceUpdater() error {
func (process *TeleportProcess) initAWSOIDCDeployServiceUpdater() error {
// start process only after teleport process has started
if _, err := process.WaitForEvent(process.GracefulExitContext(), TeleportReadyEvent); err != nil {
return trace.Wrap(err)
}

resp, err := process.getInstanceClient().Ping(process.GracefulExitContext())
authClient := process.getInstanceClient()
if authClient == nil {
return trace.Errorf("instance client not yet initialized")
}

resp, err := authClient.Ping(process.GracefulExitContext())
if err != nil {
return trace.Wrap(err)
}
Expand Down Expand Up @@ -81,14 +86,14 @@ func (process *TeleportProcess) initDeployServiceUpdater() error {
return trace.Wrap(err)
}

clusterNameConfig, err := process.getInstanceClient().GetClusterName()
clusterNameConfig, err := authClient.GetClusterName()
if err != nil {
return trace.Wrap(err)
}

updater, err := NewDeployServiceUpdater(DeployServiceUpdaterConfig{
updater, err := NewDeployServiceUpdater(AWSOIDCDeployServiceUpdaterConfig{
Log: process.log.WithField(trace.Component, teleport.Component(teleport.ComponentProxy, "aws_oidc_deploy_service_updater")),
AuthClient: process.getInstanceClient(),
AuthClient: authClient,
Clock: process.Clock,
TeleportClusterName: clusterNameConfig.GetClusterName(),
TeleportClusterVersion: resp.GetServerVersion(),
Expand All @@ -100,12 +105,12 @@ func (process *TeleportProcess) initDeployServiceUpdater() error {
return trace.Wrap(err)
}

process.log.Infof("The new service has started successfully. Checking for deploy service updates every %v.", updateDeployAgentsInterval)
process.log.Infof("The new service has started successfully. Checking for deploy service updates every %v.", updateAWSOIDCDeployServiceInterval)
return trace.Wrap(updater.Run(process.GracefulExitContext()))
}

// DeployServiceUpdaterConfig specifies updater configs
type DeployServiceUpdaterConfig struct {
// AWSOIDCDeployServiceUpdaterConfig specifies updater configs
type AWSOIDCDeployServiceUpdaterConfig struct {
// Log is the logger
Log *logrus.Entry
// AuthClient is the auth api client
Expand All @@ -116,7 +121,7 @@ type DeployServiceUpdaterConfig struct {
TeleportClusterName string
// TeleportClusterVersion specifies the teleport cluster version
TeleportClusterVersion string
// AWSOIDCProvderAddr specifies the aws oidc provider address used to generate AWS OIDC tokens
// AWSOIDCProvderAddr specifies the AWS OIDC provider address used to generate AWS OIDC tokens
AWSOIDCProviderAddr string
// CriticalEndpoint specifies the endpoint to check for critical updates
CriticalEndpoint string
Expand All @@ -125,7 +130,7 @@ type DeployServiceUpdaterConfig struct {
}

// CheckAndSetDefaults checks and sets default config values.
func (cfg *DeployServiceUpdaterConfig) CheckAndSetDefaults() error {
func (cfg *AWSOIDCDeployServiceUpdaterConfig) CheckAndSetDefaults() error {
if cfg.AuthClient == nil {
return trace.BadParameter("auth client required")
}
Expand All @@ -139,7 +144,7 @@ func (cfg *DeployServiceUpdaterConfig) CheckAndSetDefaults() error {
}

if cfg.AWSOIDCProviderAddr == "" {
return trace.BadParameter("aws oidc provider address required")
return trace.BadParameter("AWS OIDC provider address required")
}

if cfg.Log == nil {
Expand All @@ -153,33 +158,33 @@ func (cfg *DeployServiceUpdaterConfig) CheckAndSetDefaults() error {
return nil
}

// DeployServiceUpdater periodically updates deploy service agents
type DeployServiceUpdater struct {
DeployServiceUpdaterConfig
// AWSOIDCDeployServiceUpdater periodically updates AWS OIDC deploy service
type AWSOIDCDeployServiceUpdater struct {
AWSOIDCDeployServiceUpdaterConfig
}

// NewDeployServiceUpdater returns a new DeployServiceUpdater
func NewDeployServiceUpdater(config DeployServiceUpdaterConfig) (*DeployServiceUpdater, error) {
// NewAWSOIDCDeployServiceUpdater returns a new AWSOIDCDeployServiceUpdater
func NewDeployServiceUpdater(config AWSOIDCDeployServiceUpdaterConfig) (*AWSOIDCDeployServiceUpdater, error) {
if err := config.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}

return &DeployServiceUpdater{
DeployServiceUpdaterConfig: config,
return &AWSOIDCDeployServiceUpdater{
AWSOIDCDeployServiceUpdaterConfig: config,
}, nil
}

// Run periodically updates the deploy service agents
func (updater *DeployServiceUpdater) Run(ctx context.Context) error {
// Run periodically updates the AWS OIDC deploy service
func (updater *AWSOIDCDeployServiceUpdater) Run(ctx context.Context) error {
periodic := interval.New(interval.Config{
Duration: updateDeployAgentsInterval,
Duration: updateAWSOIDCDeployServiceInterval,
Jitter: retryutils.NewSeventhJitter(),
})
defer periodic.Stop()

for {
if err := updater.updateDeployServiceAgents(ctx); err != nil {
updater.Log.WithError(err).Warningf("Update failed. Retrying in ~%v.", updateDeployAgentsInterval)
if err := updater.updateAWSOIDCDeployServices(ctx); err != nil {
updater.Log.WithError(err).Warningf("Update failed. Retrying in ~%v.", updateAWSOIDCDeployServiceInterval)
}

select {
Expand All @@ -190,7 +195,7 @@ func (updater *DeployServiceUpdater) Run(ctx context.Context) error {
}
}

func (updater *DeployServiceUpdater) updateDeployServiceAgents(ctx context.Context) error {
func (updater *AWSOIDCDeployServiceUpdater) updateAWSOIDCDeployServices(ctx context.Context) error {
cmc, err := updater.AuthClient.GetClusterMaintenanceConfig(ctx)
if err != nil {
return trace.Wrap(err)
Expand All @@ -215,14 +220,14 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgents(ctx context.Conte
stableVersion = strings.TrimPrefix(stableVersion, "v")

// minServerVersion specifies the minimum version of the cluster required for
// updated agents to remain compatible with the cluster.
// updated AWS OIDC deploy service to remain compatible with the cluster.
minServerVersion, err := utils.MajorSemver(stableVersion)
if err != nil {
return trace.Wrap(err)
}

if !utils.MeetsVersion(updater.TeleportClusterVersion, minServerVersion) {
updater.Log.Debugf("Skipping update. %v agents will not be compatible with a %v cluster.", stableVersion, updater.TeleportClusterVersion)
updater.Log.Debugf("Skipping update. %v AWS OIDC Deploy Service will not be compatible with a %v cluster.", stableVersion, updater.TeleportClusterVersion)
return nil
}

Expand All @@ -232,7 +237,7 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgents(ctx context.Conte
}

// The updater needs to iterate over all integrations and aws regions to check
// for deploy service agents to update. In order to reduce the number of api
// for AWS OIDC deploy services to update. In order to reduce the number of api
// calls, the aws regions are first reduced to only the regions containing
// an RDS database.
awsRegions := make(map[string]interface{})
Expand All @@ -256,8 +261,8 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgents(ctx context.Conte
}
go func(ig types.Integration, region string) {
defer sem.Release(1)
if err := updater.updateDeployServiceAgent(ctx, ig, region, stableVersion); err != nil {
updater.Log.WithError(err).Warningf("Failed to update deploy service agent for integration %s in region %s.", ig.GetName(), region)
if err := updater.updateAWSOIDCDeployService(ctx, ig, region, stableVersion); err != nil {
updater.Log.WithError(err).Warningf("Failed to update AWS OIDC Deploy Service for integration %s in region %s.", ig.GetName(), region)
}
}(ig, region)
}
Expand All @@ -267,8 +272,8 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgents(ctx context.Conte
return trace.Wrap(sem.Acquire(ctx, maxConcurrentUpdates))
}

func (updater *DeployServiceUpdater) updateDeployServiceAgent(ctx context.Context, integration types.Integration, awsRegion, teleportVersion string) error {
// Do not attempt update if integration is not an aws oidc integration.
func (updater *AWSOIDCDeployServiceUpdater) updateAWSOIDCDeployService(ctx context.Context, integration types.Integration, awsRegion, teleportVersion string) error {
// Do not attempt update if integration is not an AWS OIDC integration.
if integration.GetAWSOIDCIntegrationSpec() == nil {
return nil
}
Expand All @@ -288,7 +293,7 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgent(ctx context.Contex
}

// The deploy service client is initialized using AWS OIDC integration.
deployServiceClient, err := awsoidc.NewDeployServiceClient(ctx, req, updater.AuthClient)
awsOIDCDeployServiceClient, err := awsoidc.NewDeployServiceClient(ctx, req, updater.AuthClient)
if err != nil {
return trace.Wrap(err)
}
Expand All @@ -301,18 +306,18 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgent(ctx context.Contex
types.IntegrationLabel: integration.GetName(),
}

// Acquire a lease for the region + integration before attempting to update the deploy service agent.
// Acquire a lease for the region + integration before attempting to update the deploy service.
// If the lease cannot be acquired, the update is already being handled by another instance.
semLock, err := updater.AuthClient.AcquireSemaphore(ctx, types.AcquireSemaphoreRequest{
SemaphoreKind: types.SemaphoreKindConnection,
SemaphoreName: fmt.Sprintf("update_deploy_service_agents_%s_%s", awsRegion, integration.GetName()),
SemaphoreName: fmt.Sprintf("update_aws_oidc_deploy_service_%s_%s", awsRegion, integration.GetName()),
MaxLeases: 1,
Expires: updater.Clock.Now().Add(updateDeployAgentsInterval),
Holder: "update_deploy_service_agents",
Expires: updater.Clock.Now().Add(updateAWSOIDCDeployServiceInterval),
Holder: "update_aws_oidc_deploy_service",
})
if err != nil {
if strings.Contains(err.Error(), teleport.MaxLeases) {
updater.Log.WithError(err).Debug("Deploy service agent update is already being processed.")
updater.Log.WithError(err).Debug("AWS OIDC Deploy Service update is already being processed.")
return nil
}
return trace.Wrap(err)
Expand All @@ -323,8 +328,8 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgent(ctx context.Contex
}
}()

updater.Log.Debugf("Updating Deploy Service Agents for integration %s in AWS region: %s", integration.GetName(), awsRegion)
if err := awsoidc.UpdateDeployServiceAgent(ctx, deployServiceClient, awsoidc.UpdateServiceRequest{
updater.Log.Debugf("Updating AWS OIDC Deploy Service for integration %s in AWS region: %s", integration.GetName(), awsRegion)
if err := awsoidc.UpdateDeployService(ctx, awsOIDCDeployServiceClient, awsoidc.UpdateServiceRequest{
TeleportClusterName: updater.TeleportClusterName,
TeleportVersionTag: teleportVersion,
OwnershipTags: ownershipTags,
Expand All @@ -338,7 +343,7 @@ func (updater *DeployServiceUpdater) updateDeployServiceAgent(ctx context.Contex
updater.Log.Debugf("Integration %s does not manage any services within region %s.", integration.GetName(), awsRegion)
return nil
case trace.IsAccessDenied(awslib.ConvertIAMv2Error(trace.Unwrap(err))):
// The aws oidc role may lack permissions due to changes in teleport.
// The AWS OIDC role may lack permissions due to changes in teleport.
// In this situation users should be notified that they will need to
// re-run the deploy service iam configuration script and update the
// permissions.
Expand Down
2 changes: 1 addition & 1 deletion lib/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,7 +1014,7 @@ func NewTeleport(cfg *servicecfg.Config) (*TeleportProcess, error) {
}

if process.Config.Proxy.Enabled {
process.RegisterFunc("update.aws-oidc.deploy.agents", process.initDeployServiceUpdater)
process.RegisterFunc("update.aws-oidc.deploy.service", process.initAWSOIDCDeployServiceUpdater)
}

serviceStarted := false
Expand Down