Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/clusterconditions/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type Mock struct {
}

// Valid returns an error popped from ValidQueue.
func (m *Mock) Valid(ctx context.Context, condition *configv1.ClusterCondition) error {
func (m *Mock) Valid(_ context.Context, condition *configv1.ClusterCondition) error {
m.Calls = append(m.Calls, Call{
When: time.Now(),
Method: "Valid",
Expand All @@ -61,7 +61,7 @@ func (m *Mock) Valid(ctx context.Context, condition *configv1.ClusterCondition)
}

// Match returns an error popped from MatchQueue.
func (m *Mock) Match(ctx context.Context, condition *configv1.ClusterCondition) (bool, error) {
func (m *Mock) Match(_ context.Context, condition *configv1.ClusterCondition) (bool, error) {
m.Calls = append(m.Calls, Call{
When: time.Now(),
Method: "Match",
Expand Down
2 changes: 1 addition & 1 deletion pkg/clusterconditions/promql/promql.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func NewPromQL(kubeClient kubernetes.Interface) *cache.Cache {
},
QueryTimeout: 5 * time.Minute,
},
MinBetweenMatches: 10 * time.Minute,
MinBetweenMatches: 1 * time.Second,
MinForCondition: time.Hour,
Expiration: 24 * time.Hour,
}
Expand Down
151 changes: 115 additions & 36 deletions pkg/cvo/availableupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,52 +45,91 @@ func (optr *Operator) syncAvailableUpdates(ctx context.Context, config *configv1
}

// updates are only checked at most once per minimumUpdateCheckInterval or if the generation changes
u := optr.getAvailableUpdates()
if u == nil {
optrAvailableUpdates := optr.getAvailableUpdates()
needFreshFetch := true
if optrAvailableUpdates == nil {
klog.V(2).Info("First attempt to retrieve available updates")
} else if !u.RecentlyChanged(optr.minimumUpdateCheckInterval) {
klog.V(2).Infof("Retrieving available updates again, because more than %s has elapsed since %s", optr.minimumUpdateCheckInterval, u.LastAttempt.Format(time.RFC3339))
} else if channel != u.Channel {
klog.V(2).Infof("Retrieving available updates again, because the channel has changed from %q to %q", u.Channel, channel)
} else if desiredArch != u.Architecture {
klog.V(2).Infof("Retrieving available updates again, because the architecture has changed from %q to %q", u.Architecture, desiredArch)
} else if upstream == u.Upstream || (upstream == optr.defaultUpstreamServer && u.Upstream == "") {
klog.V(2).Infof("Available updates were recently retrieved, with less than %s elapsed since %s, will try later.", optr.minimumUpdateCheckInterval, u.LastAttempt.Format(time.RFC3339))
return nil
optrAvailableUpdates = &availableUpdates{}
} else if !optrAvailableUpdates.RecentlyChanged(optr.minimumUpdateCheckInterval) {
klog.V(2).Infof("Retrieving available updates again, because more than %s has elapsed since %s", optr.minimumUpdateCheckInterval, optrAvailableUpdates.LastAttempt.Format(time.RFC3339))
} else if channel != optrAvailableUpdates.Channel {
klog.V(2).Infof("Retrieving available updates again, because the channel has changed from %q to %q", optrAvailableUpdates.Channel, channel)
} else if desiredArch != optrAvailableUpdates.Architecture {
klog.V(2).Infof("Retrieving available updates again, because the architecture has changed from %q to %q", optrAvailableUpdates.Architecture, desiredArch)
} else if upstream == optrAvailableUpdates.Upstream || (upstream == optr.defaultUpstreamServer && optrAvailableUpdates.Upstream == "") {
needsConditionalUpdateEval := false
for _, conditionalUpdate := range optrAvailableUpdates.ConditionalUpdates {
if recommended := meta.FindStatusCondition(conditionalUpdate.Conditions, "Recommended"); recommended == nil {
needsConditionalUpdateEval = true
break
} else if recommended.Status != metav1.ConditionTrue && recommended.Status != metav1.ConditionFalse {
needsConditionalUpdateEval = true
break
}
}
if !needsConditionalUpdateEval {
klog.V(2).Infof("Available updates were recently retrieved, with less than %s elapsed since %s, will try later.", optr.minimumUpdateCheckInterval, optrAvailableUpdates.LastAttempt.Format(time.RFC3339))
return nil
}
needFreshFetch = false
} else {
klog.V(2).Infof("Retrieving available updates again, because the upstream has changed from %q to %q", u.Upstream, config.Spec.Upstream)
klog.V(2).Infof("Retrieving available updates again, because the upstream has changed from %q to %q", optrAvailableUpdates.Upstream, config.Spec.Upstream)
}

transport, err := optr.getTransport()
if err != nil {
return err
}
if needFreshFetch {
transport, err := optr.getTransport()
if err != nil {
return err
}

userAgent := optr.getUserAgent()
userAgent := optr.getUserAgent()

current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID),
transport, userAgent, upstream, desiredArch, currentArch, channel, optr.release.Version, optr.conditionRegistry)
current, updates, conditionalUpdates, condition := calculateAvailableUpdatesStatus(ctx, string(config.Spec.ClusterID),
transport, userAgent, upstream, desiredArch, currentArch, channel, optr.release.Version, optr.conditionRegistry)

if usedDefaultUpstream {
upstream = ""
// Populate conditions on conditional updates from operator state
for i := range optrAvailableUpdates.ConditionalUpdates {
for j := range conditionalUpdates {
if optrAvailableUpdates.ConditionalUpdates[i].Release.Image == conditionalUpdates[j].Release.Image {
conditionalUpdates[j].Conditions = optrAvailableUpdates.ConditionalUpdates[i].Conditions
break
}
}
}

if usedDefaultUpstream {
upstream = ""
}

optrAvailableUpdates.Upstream = upstream
optrAvailableUpdates.Channel = channel
optrAvailableUpdates.Architecture = desiredArch
optrAvailableUpdates.Current = current
optrAvailableUpdates.Updates = updates
optrAvailableUpdates.ConditionalUpdates = conditionalUpdates
optrAvailableUpdates.ConditionRegistry = optr.conditionRegistry
optrAvailableUpdates.Condition = condition
}

au := &availableUpdates{
Upstream: upstream,
Channel: config.Spec.Channel,
Architecture: desiredArch,
Current: current,
Updates: updates,
ConditionalUpdates: conditionalUpdates,
ConditionRegistry: optr.conditionRegistry,
Condition: condition,
optrAvailableUpdates.evaluateConditionalUpdates(ctx)

queueKey := optr.queueKey()
for _, conditionalUpdate := range optrAvailableUpdates.ConditionalUpdates {
if recommended := meta.FindStatusCondition(conditionalUpdate.Conditions, "Recommended"); recommended == nil {
klog.Warningf("Requeue available-update evaluation, because %q lacks a Recommended condition", conditionalUpdate.Release.Version)
optr.availableUpdatesQueue.AddAfter(queueKey, time.Second)
break
} else if recommended.Status != metav1.ConditionTrue && recommended.Status != metav1.ConditionFalse {
klog.V(2).Infof("Requeue available-update evaluation, because %q is %s=%s: %s: %s", conditionalUpdate.Release.Version, recommended.Type, recommended.Status, recommended.Reason, recommended.Message)
optr.availableUpdatesQueue.AddAfter(queueKey, time.Second)
break
}
}

au.evaluateConditionalUpdates(ctx)
optr.setAvailableUpdates(au)
optr.setAvailableUpdates(optrAvailableUpdates)

// requeue
optr.queue.Add(optr.queueKey())
// queue optr.sync() to update ClusterVersion status
optr.queue.Add(queueKey)
return nil
}

Expand Down Expand Up @@ -190,7 +229,37 @@ func (optr *Operator) setAvailableUpdates(u *availableUpdates) {
func (optr *Operator) getAvailableUpdates() *availableUpdates {
optr.statusLock.Lock()
defer optr.statusLock.Unlock()
return optr.availableUpdates

if optr.availableUpdates == nil {
return nil
}

u := &availableUpdates{
Upstream: optr.availableUpdates.Upstream,
Channel: optr.availableUpdates.Channel,
Architecture: optr.availableUpdates.Architecture,
LastAttempt: optr.availableUpdates.LastAttempt,
LastSyncOrConfigChange: optr.availableUpdates.LastSyncOrConfigChange,
Current: *optr.availableUpdates.Current.DeepCopy(),
ConditionRegistry: optr.availableUpdates.ConditionRegistry, // intentionally not a copy, to preserve cache state
Condition: optr.availableUpdates.Condition,
}

if optr.availableUpdates.Updates != nil {
u.Updates = make([]configv1.Release, 0, len(optr.availableUpdates.Updates))
for _, update := range optr.availableUpdates.Updates {
u.Updates = append(u.Updates, *update.DeepCopy())
}
}

if optr.availableUpdates.ConditionalUpdates != nil {
u.ConditionalUpdates = make([]configv1.ConditionalUpdate, 0, len(optr.availableUpdates.ConditionalUpdates))
for _, conditionalUpdate := range optr.availableUpdates.ConditionalUpdates {
u.ConditionalUpdates = append(u.ConditionalUpdates, *conditionalUpdate.DeepCopy())
}
}

return u
}

// getArchitecture returns the currently determined cluster architecture.
Expand Down Expand Up @@ -321,12 +390,22 @@ func (u *availableUpdates) evaluateConditionalUpdates(ctx context.Context) {
Reason: "AsExpected",
Message: "The update is recommended, because none of the conditional update risks apply to this cluster.",
})
u.Updates = append(u.Updates, conditionalUpdate.Release)
u.addUpdate(conditionalUpdate.Release)
}
u.ConditionalUpdates[i].Conditions = conditionalUpdate.Conditions
}
}

func (u *availableUpdates) addUpdate(release configv1.Release) {
for _, update := range u.Updates {
if update.Image == release.Image {
return
}
}

u.Updates = append(u.Updates, release)
}

func (u *availableUpdates) removeUpdate(image string) {
for i, update := range u.Updates {
if update.Image == image {
Expand Down
Loading