Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion integration/proxy/automaticupgrades_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/integration/helpers"
"github.com/gravitational/teleport/integrations/kube-agent-updater/pkg/basichttp"
"github.com/gravitational/teleport/integrations/kube-agent-updater/pkg/constants"
Expand All @@ -41,7 +43,8 @@ import (
)

func createProxyWithChannels(t *testing.T, channels automaticupgrades.Channels) string {
require.NoError(t, channels.CheckAndSetDefaults())
features := proto.Features{}
require.NoError(t, channels.CheckAndSetDefaults(features))
testDir := t.TempDir()

cfg := helpers.InstanceConfig{
Expand Down Expand Up @@ -84,14 +87,18 @@ func TestVersionServer(t *testing.T) {

staticChannel := "static/ok"
staticHighChannel := "static/high"
staticNoVersionChannel := "static/none"
forwardChannel := "forward/ok"
forwardHighChannel := "forward/high"
forwardNoVersionChannel := "forward/none"
forwardPath := "/version-server/"

upstreamServer := basichttp.NewServerMock(forwardPath + constants.VersionPath)
upstreamServer.SetResponse(t, http.StatusOK, testVersion)
upstreamHighServer := basichttp.NewServerMock(forwardPath + constants.VersionPath)
upstreamHighServer.SetResponse(t, http.StatusOK, testVersionMajorTooHigh)
upstreamNoVersionServer := basichttp.NewServerMock(forwardPath + constants.VersionPath)
upstreamNoVersionServer.SetResponse(t, http.StatusOK, constants.NoVersion)

channels := automaticupgrades.Channels{
staticChannel: {
Expand All @@ -100,12 +107,18 @@ func TestVersionServer(t *testing.T) {
staticHighChannel: {
StaticVersion: testVersionMajorTooHigh,
},
staticNoVersionChannel: {
StaticVersion: constants.NoVersion,
},
forwardChannel: {
ForwardURL: upstreamServer.Srv.URL + forwardPath,
},
forwardHighChannel: {
ForwardURL: upstreamHighServer.Srv.URL + forwardPath,
},
forwardNoVersionChannel: {
ForwardURL: upstreamNoVersionServer.Srv.URL + forwardPath,
},
}

proxyAddr := createProxyWithChannels(t, channels)
Expand All @@ -115,6 +128,7 @@ func TestVersionServer(t *testing.T) {
}
httpClient := http.Client{Transport: tr}

// Test execution
tests := []struct {
name string
channel string
Expand All @@ -128,6 +142,11 @@ func TestVersionServer(t *testing.T) {
{
name: "static version too high",
channel: staticHighChannel,
expectedResponse: teleport.Version,
},
{
name: "static version none",
channel: staticNoVersionChannel,
expectedResponse: constants.NoVersion,
},
{
Expand All @@ -138,6 +157,11 @@ func TestVersionServer(t *testing.T) {
{
name: "forward version too high",
channel: forwardHighChannel,
expectedResponse: teleport.Version,
},
{
name: "forward version none",
channel: forwardNoVersionChannel,
expectedResponse: constants.NoVersion,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@ package version

import (
"context"
"fmt"
"strings"

"github.com/gravitational/teleport/integrations/kube-agent-updater/pkg/constants"
)

// StaticGetter is a fake version.Getter that return a static answer. This is used
Expand All @@ -37,6 +40,13 @@ func (v StaticGetter) GetVersion(_ context.Context) (string, error) {

// NewStaticGetter creates a StaticGetter
func NewStaticGetter(version string, err error) Getter {
if version == constants.NoVersion {
return StaticGetter{
version: "",
err: &NoNewVersionError{Message: fmt.Sprintf("target version set to '%s'", constants.NoVersion)},
}
}

semVersion := version
if semVersion != "" && !strings.HasPrefix(semVersion, "v") {
semVersion = "v" + version
Expand Down
3 changes: 2 additions & 1 deletion integrations/kube-agent-updater/pkg/version/versionget.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import (
// Getter gets the target image version for an external source. It should cache
// the result to reduce io and avoid potential rate-limits and is safe to call
// multiple times over a short period.
// If the version source intentionally returns no version, a NoNewVersionError is
// returned.
type Getter interface {
GetVersion(context.Context) (string, error)
}

// ValidVersionChange receives the current version and the candidate next version
// and evaluates if the version transition is valid.
func ValidVersionChange(ctx context.Context, current, next string) bool {
// TODO: clarify rollback constraints regarding previous version and add a "previous" parameter
log := ctrllog.FromContext(ctx).V(1)
// Cannot upgrade to a non-valid version
if !semver.IsValid(next) {
Expand Down
123 changes: 120 additions & 3 deletions lib/automaticupgrades/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,60 @@ package automaticupgrades
import (
"context"
"net/url"
"strconv"
"strings"
"sync"

"github.com/gravitational/trace"
"golang.org/x/mod/semver"

"github.com/gravitational/teleport"
"github.com/gravitational/teleport/api/client/proto"
"github.com/gravitational/teleport/integrations/kube-agent-updater/pkg/maintenance"
"github.com/gravitational/teleport/integrations/kube-agent-updater/pkg/version"
)

const (
DefaultChannelName = "default"
DefaultCloudChannelName = "stable/cloud"
stableCloudVersionBaseURL = "https://updates.releases.teleport.dev/v1/stable/cloud"
Comment thread
marcoandredinis marked this conversation as resolved.
Outdated
)

// Channels is a map of Channel objects.
type Channels map[string]*Channel

// CheckAndSetDefaults checks that every Channel is valid and initializes them.
func (c Channels) CheckAndSetDefaults() error {
// It also creates default channels if they are not already present.
// Cloud must have the `default` and `stable/cloud` channels.
// Self-hosted with automatic upgrades must have the `default` channel.
func (c Channels) CheckAndSetDefaults(features proto.Features) error {
defaultChannel, err := NewDefaultChannel()
if err != nil {
return trace.Wrap(err)
}

// If we're on cloud, we need at least "cloud/stable" and "default"
if features.GetCloud() {
if _, ok := c[DefaultCloudChannelName]; !ok {
c[DefaultCloudChannelName] = defaultChannel
}
if _, ok := c[DefaultChannelName]; !ok {
c[DefaultChannelName] = c[DefaultCloudChannelName]
}
}

// If we're on self-hosted with automatic upgrades, we need a "default" channel
// We don't want to break existing setups so we'll automatically point to the
// `cloud/stable` channel.
// TODO: in v15 make this a hard requirement and error if `default` is not set
// and automatic upgrades are enabled
Comment on lines 69 to 70
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is for v15
So, I guess we want to remove this and add it back for the backport PRs

if features.GetAutomaticUpgrades() {
if _, ok := c[DefaultChannelName]; !ok {
c[DefaultChannelName] = defaultChannel
}
}

var errs []error
var err error
for name, channel := range c {
// Wrapping is not mandatory here, but it adds the channel name in the
// error, which makes troubleshooting easier.
Expand All @@ -46,6 +86,16 @@ func (c Channels) CheckAndSetDefaults() error {
return trace.NewAggregate(errs...)
}

// DefaultVersion returns the version served by the default upgrade channel.
func (c Channels) DefaultVersion(ctx context.Context) (string, error) {
channel, ok := c[DefaultChannelName]
if !ok {
return "", trace.NotFound("default version channel not found")
}
targetVersion, err := channel.GetVersion(ctx)
return targetVersion, trace.Wrap(err)
}

// Channel describes an automatic update channel configuration.
// It can be configured to serve a static version, or forward version requests
// to an upstream version server. Forwarded results are cached for 1 minute.
Expand All @@ -61,6 +111,9 @@ type Channel struct {
versionGetter version.Getter
// criticalTrigger gets the criticality of the channel. It is populated by CheckAndSetDefaults.
criticalTrigger maintenance.Trigger
// teleportMajor stores the current teleport major for comparison.
// This field is initialized during CheckAndSetDefaults.
teleportMajor int
}

// CheckAndSetDefaults checks that the Channel configuration is valid and inits
Expand All @@ -83,17 +136,81 @@ func (c *Channel) CheckAndSetDefaults() error {
default:
return trace.BadParameter("either ForwardURL or StaticVersion must be set")
}

var err error
c.teleportMajor, err = parseMajorFromVersionString(teleport.Version)
if err != nil {
return trace.Wrap(err, "failed to process teleport version")
}

return nil
}

// GetVersion returns the current version of the channel. If io is involved,
// this function implements cache and is safe to call frequently.
// If the target version major is higher than the Teleport version (the one
// in the Teleport binary, this is usually the proxy version), this function
// returns the Teleport version instead.
// If the version source intentionally did not specify a version, a
// NoNewVersionError is returned.
func (c *Channel) GetVersion(ctx context.Context) (string, error) {
return c.versionGetter.GetVersion(ctx)
targetVersion, err := c.versionGetter.GetVersion(ctx)
if err != nil {
return "", trace.Wrap(err)
}

targetMajor, err := parseMajorFromVersionString(targetVersion)
if err != nil {
return "", trace.Wrap(err, "failed to process target version")
}

// The target version is officially incompatible with our version,
// we prefer returning our version rather than having a broken client
if targetMajor > c.teleportMajor {
return teleport.Version, nil
}

return targetVersion, nil
}

// GetCritical returns the current criticality of the channel. If io is involved,
// this function implements cache and is safe to call frequently.
func (c *Channel) GetCritical(ctx context.Context) (bool, error) {
return c.criticalTrigger.CanStart(ctx, nil)
}

// NewDefaultChannel creates a default automatic upgrade channel
// It looks up the environment variable, and if not found uses the default
// base URL. This default channel can be used in the proxy (to back its own version server)
// or in other Teleport process such as integration services deploying and
// updating teleport agents.
func NewDefaultChannel() (*Channel, error) {
return sync.OnceValues[*Channel, error](
func() (*Channel, error) {
forwardURL := GetChannel()
if forwardURL == "" {
forwardURL = stableCloudVersionBaseURL
}
defaultChannel := &Channel{
ForwardURL: forwardURL,
}
if err := defaultChannel.CheckAndSetDefaults(); err != nil {
return nil, trace.Wrap(err)
}
return defaultChannel, nil
})()
}

func parseMajorFromVersionString(v string) (int, error) {
v, err := version.EnsureSemver(v)
if err != nil {
return 0, trace.Wrap(err, "invalid semver: %s", v)
}
majorStr := semver.Major(v)
if majorStr == "" {
return 0, trace.BadParameter("cannot detect version major")
}

major, err := strconv.Atoi(strings.TrimPrefix(majorStr, "v"))
return major, trace.Wrap(err, "cannot convert version major to int")
}
Loading