Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 60 additions & 16 deletions lib/backend/etcdbk/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"os"
"sort"
"strings"
"sync"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -265,6 +266,7 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke
CleanupInterval: utils.SeventhJitter(time.Minute * 2),
})
if err != nil {
cancel()
return nil, trace.Wrap(err)
}

Expand All @@ -285,28 +287,19 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke
}

// Check that the etcd nodes are at least the minimum version supported
if err = b.reconnect(ctx); err != nil {
if err = b.reconnect(b.ctx); err != nil {
b.Close()
return nil, trace.Wrap(err)
}
timeout, cancel := context.WithTimeout(ctx, time.Second*3*time.Duration(len(cfg.Nodes)))
defer cancel()
for _, n := range cfg.Nodes {
status, err := b.clients.Next().Status(timeout, n)
if err != nil {
return nil, trace.Wrap(err)
}

ver := semver.New(status.Version)
min := semver.New(teleport.MinimumEtcdVersion)
if ver.LessThan(*min) {
return nil, trace.BadParameter("unsupported version of etcd %v for node %v, must be %v or greater",
status.Version, n, teleport.MinimumEtcdVersion)
}
if err := b.checkVersion(b.ctx); err != nil {
b.Close()
return nil, trace.Wrap(err)
}

// Reconnect the etcd client to work around a data race in their code.
// Upstream fix: https://github.com/etcd-io/etcd/pull/12992
if err = b.reconnect(ctx); err != nil {
if err = b.reconnect(b.ctx); err != nil {
b.Close()
return nil, trace.Wrap(err)
}
go b.asyncWatch()
Expand All @@ -315,6 +308,57 @@ func New(ctx context.Context, params backend.Params, opts ...Option) (*EtcdBacke
return b, nil
}

// checkVersion queries the status of every configured etcd node concurrently
// and verifies that the reported version is at least
// teleport.MinimumEtcdVersion.
//
// The check succeeds as soon as any single node responds with a supported
// version; remaining in-flight requests are then canceled. If every node
// fails, the last error received is returned (wrapped with trace.Wrap). When
// no nodes are configured, no request is made and the zero error is wrapped
// and returned.
//
// The whole check is bounded by a timeout derived from ctx, and the function
// does not return until every goroutine it started has finished.
func (b *EtcdBackend) checkVersion(ctx context.Context) error {
	// scope version check to one third the default I/O timeout since slowness that is
	// anywhere near the default timeout is going to cause systemic issues.
	ctx, cancel := context.WithTimeout(ctx, apidefaults.DefaultIOTimeout/3)
	// Release the timeout's resources on every exit path. Calling a
	// CancelFunc more than once is a no-op, so the explicit cancel below
	// remains safe.
	defer cancel()

	// The minimum supported version is identical for every node, so parse it
	// once instead of once per goroutine.
	minVersion := semver.New(teleport.MinimumEtcdVersion)

	// checkNode performs the status request and version comparison for a
	// single endpoint.
	checkNode := func(n string) error {
		status, err := b.clients.Next().Status(ctx, n)
		if err != nil {
			return trace.Wrap(err)
		}

		ver, err := semver.NewVersion(status.Version)
		if err != nil {
			return trace.BadParameter("failed to parse etcd version %q: %v", status.Version, err)
		}

		if ver.LessThan(*minVersion) {
			return trace.BadParameter("unsupported version of etcd %v for node %v, must be %v or greater",
				status.Version, n, teleport.MinimumEtcdVersion)
		}

		return nil
	}

	// One buffered slot per node: a goroutine can always deliver its result
	// without blocking, even after we have stopped receiving.
	results := make(chan error, len(b.cfg.Nodes))

	var wg sync.WaitGroup
	for _, node := range b.cfg.Nodes {
		wg.Add(1)
		go func(n string) {
			defer wg.Done()
			results <- checkNode(n)
		}(node)
	}

	// wait for results
	var err error
	for range b.cfg.Nodes {
		err = <-results
		if err == nil {
			// stop on first success, we don't care about all endpoints
			// being healthy, just that at least one is.
			break
		}
	}

	// Cancel before waiting so that outstanding status requests are aborted
	// promptly rather than running until the timeout expires.
	cancel()
	wg.Wait()
	return trace.Wrap(err)
}

// Validate checks if all the parameters are present/valid
func (cfg *Config) Validate() error {
if len(cfg.Key) == 0 {
Expand Down