Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 33 additions & 16 deletions lib/auth/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,15 +247,32 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
}

domainName := cfg.ClusterName.GetClusterName()
lock, err := backend.AcquireLock(ctx, cfg.Backend, domainName, 30*time.Second)
if err != nil {
if err := backend.RunWhileLocked(ctx,
backend.RunWhileLockedConfig{
LockConfiguration: backend.LockConfiguration{
Backend: cfg.Backend,
LockName: domainName,
TTL: 30 * time.Second,
},
RefreshLockInterval: 20 * time.Second,
}, func(ctx context.Context) error {
return trace.Wrap(initCluster(ctx, cfg, asrv))
}); err != nil {
return nil, trace.Wrap(err)
}
defer lock.Release(ctx, cfg.Backend)

return asrv, nil
}

// initCluster configures the cluster based on the user provided configuration. This should
// only be called when the init lock is held to prevent multiple instances of Auth from attempting
// to bootstrap the cluster at the same time.
func initCluster(ctx context.Context, cfg InitConfig, asrv *Server) error {
span := oteltrace.SpanFromContext(ctx)
domainName := cfg.ClusterName.GetClusterName()
firstStart, err := isFirstStart(ctx, asrv, cfg)
if err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}

// if bootstrap resources are supplied, use them to bootstrap backend state
Expand All @@ -264,10 +281,10 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
if firstStart {
log.Infof("Applying %v bootstrap resources (first initialization)", len(cfg.BootstrapResources))
if err := checkResourceConsistency(ctx, asrv.keyStore, domainName, cfg.BootstrapResources...); err != nil {
return nil, trace.Wrap(err, "refusing to bootstrap backend")
return trace.Wrap(err, "refusing to bootstrap backend")
}
if err := local.CreateResources(ctx, cfg.Backend, cfg.BootstrapResources...); err != nil {
return nil, trace.Wrap(err, "backend bootstrap failed")
return trace.Wrap(err, "backend bootstrap failed")
}
} else {
log.Warnf("Ignoring %v bootstrap resources (previously initialized)", len(cfg.BootstrapResources))
Expand All @@ -279,7 +296,7 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
log.Infof("Applying %v resources (apply-on-startup)", len(cfg.ApplyOnStartupResources))

if err := applyResources(ctx, asrv.Services, cfg.ApplyOnStartupResources); err != nil {
return nil, trace.Wrap(err, "applying resources failed")
return trace.Wrap(err, "applying resources failed")
}
}

Expand All @@ -291,7 +308,7 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
// singletons). However, we need to keep them around while Telekube uses them.
for _, role := range cfg.Roles {
if err := asrv.UpsertRole(ctx, role); err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
log.Infof("Created role: %v.", role)
}
Expand All @@ -308,15 +325,15 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
// this part of code is only used in tests.
if err := asrv.CreateCertAuthority(ca); err != nil {
if !trace.IsAlreadyExists(err) {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
} else {
log.Infof("Created trusted certificate authority: %q, type: %q.", ca.GetName(), ca.GetType())
}
}
for _, tunnel := range cfg.ReverseTunnels {
if err := asrv.UpsertReverseTunnel(tunnel); err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
log.Infof("Created reverse tunnel: %v.", tunnel)
}
Expand Down Expand Up @@ -398,7 +415,7 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
})

if err := g.Wait(); err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}

// Override user passed in cluster name with what is in the backend.
Expand All @@ -407,7 +424,7 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
// Migrate Host CA as Database CA before certificates generation. Otherwise, the Database CA will be
// generated which we don't want for existing installations.
if err := migrateDBAuthority(ctx, asrv); err != nil {
return nil, trace.Wrap(err, "failed to migrate database CA")
return trace.Wrap(err, "failed to migrate database CA")
}

// generate certificate authorities if they don't exist
Expand Down Expand Up @@ -506,7 +523,7 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
})
}
if err := g.Wait(); err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}

// Delete any unused keys from the keyStore. This is to avoid exhausting
Expand All @@ -528,14 +545,14 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
span.AddEvent("migrating legacy resources")
// Migrate any legacy resources to new format.
if err := migrateLegacyResources(ctx, asrv); err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
span.AddEvent("completed migration legacy resources")

span.AddEvent("creating presets")
// Create presets - convenience and example resources.
if err := createPresets(ctx, asrv); err != nil {
return nil, trace.Wrap(err)
return trace.Wrap(err)
}
span.AddEvent("completed creating presets")

Expand All @@ -546,7 +563,7 @@ func Init(ctx context.Context, cfg InitConfig, opts ...ServerOption) (*Server, e
log.Infof("Auth server is skipping periodic operations.")
}

return asrv, nil
return nil
}

// generateAuthority creates a new self-signed authority of the provided type
Expand Down
89 changes: 75 additions & 14 deletions lib/backend/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,28 +54,55 @@ func randomID() ([]byte, error) {
return bytes[:], nil
}

type LockConfiguration struct {
Backend Backend
LockName string
// TTL defines when lock will be released automatically
TTL time.Duration
// RetryInterval defines interval which is used to retry locking after
// initial lock failed due to someone else holding lock.
RetryInterval time.Duration
}

func (l *LockConfiguration) CheckAndSetDefaults() error {
if l.Backend == nil {
return trace.BadParameter("missing Backend")
}
if l.LockName == "" {
return trace.BadParameter("missing LockName")
}
if l.TTL == 0 {
return trace.BadParameter("missing TTL")
}
if l.RetryInterval == 0 {
l.RetryInterval = 250 * time.Millisecond
}
return nil
}

// AcquireLock grabs a lock that will be released automatically in TTL
func AcquireLock(ctx context.Context, backend Backend, lockName string, ttl time.Duration) (Lock, error) {
if lockName == "" {
return Lock{}, trace.BadParameter("missing parameter lock name")
func AcquireLock(ctx context.Context, cfg LockConfiguration) (Lock, error) {
err := cfg.CheckAndSetDefaults()
if err != nil {
return Lock{}, trace.Wrap(err)
}
key := lockKey(lockName)
key := lockKey(cfg.LockName)
id, err := randomID()
if err != nil {
return Lock{}, trace.Wrap(err)
}
for {
// Get will clear TTL on a lock
backend.Get(ctx, key)
cfg.Backend.Get(ctx, key)

// CreateVal is atomic:
_, err = backend.Create(ctx, Item{Key: key, Value: id, Expires: backend.Clock().Now().UTC().Add(ttl)})
_, err = cfg.Backend.Create(ctx, Item{Key: key, Value: id, Expires: cfg.Backend.Clock().Now().UTC().Add(cfg.TTL)})
if err == nil {
break // success
}
if trace.IsAlreadyExists(err) { // locked? wait and repeat:
select {
case <-backend.Clock().After(250 * time.Millisecond):
case <-cfg.Backend.Clock().After(cfg.RetryInterval):
// OK, go around and try again
continue

Expand All @@ -86,7 +113,7 @@ func AcquireLock(ctx context.Context, backend Backend, lockName string, ttl time
}
return Lock{}, trace.ConvertSystemError(err)
}
return Lock{key: key, id: id, ttl: ttl}, nil
return Lock{key: key, id: id, ttl: cfg.TTL}, nil
}

// Release forces lock release
Expand Down Expand Up @@ -134,22 +161,52 @@ func (l *Lock) resetTTL(ctx context.Context, backend Backend) error {
return nil
}

// RunWhileLockedConfig is configuration for RunWhileLocked function.
type RunWhileLockedConfig struct {
// LockConfiguration is configuration for acquire lock.
LockConfiguration

// ReleaseCtxTimeout defines timeout used for calling lock.Release method (optional).
ReleaseCtxTimeout time.Duration
// RefreshLockInterval defines interval at which lock will be refreshed
// if fn is still running (optional).
RefreshLockInterval time.Duration
}

func (c *RunWhileLockedConfig) CheckAndSetDefaults() error {
if err := c.LockConfiguration.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}
if c.ReleaseCtxTimeout <= 0 {
c.ReleaseCtxTimeout = 300 * time.Millisecond
}
if c.RefreshLockInterval <= 0 {
c.RefreshLockInterval = c.LockConfiguration.TTL / 2
}
return nil
}

// RunWhileLocked allows you to run a function while a lock is held.
func RunWhileLocked(ctx context.Context, backend Backend, lockName string, ttl time.Duration, fn func(context.Context) error) error {
lock, err := AcquireLock(ctx, backend, lockName, ttl)
func RunWhileLocked(ctx context.Context, cfg RunWhileLockedConfig, fn func(context.Context) error) error {
if err := cfg.CheckAndSetDefaults(); err != nil {
return trace.Wrap(err)
}

lock, err := AcquireLock(ctx, cfg.LockConfiguration)
if err != nil {
return trace.Wrap(err)
}

subContext, cancelFunction := context.WithCancel(ctx)
defer cancelFunction()

stopRefresh := make(chan struct{})
go func() {
refreshAfter := ttl / 2
refreshAfter := cfg.RefreshLockInterval
for {
select {
case <-backend.Clock().After(refreshAfter):
if err := lock.resetTTL(ctx, backend); err != nil {
case <-cfg.Backend.Clock().After(refreshAfter):
if err := lock.resetTTL(ctx, cfg.Backend); err != nil {
cancelFunction()
log.Errorf("%v", err)
return
Expand All @@ -163,7 +220,11 @@ func RunWhileLocked(ctx context.Context, backend Backend, lockName string, ttl t
fnErr := fn(subContext)
close(stopRefresh)

if err := lock.Release(ctx, backend); err != nil {
// lock.Release should be called with separate ctx. If someone cancels via ctx
// RunWhileLocked method, we want to at least try releasing lock.
releaseLockCtx, releaseLockCancel := context.WithTimeout(context.Background(), cfg.ReleaseCtxTimeout)
defer releaseLockCancel()
if err := lock.Release(releaseLockCtx, cfg.Backend); err != nil {
return trace.NewAggregate(fnErr, err)
}

Expand Down
Loading