-
Notifications
You must be signed in to change notification settings - Fork 214
Targeted edge blocking #663
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
6eb91ab
8263002
c9dd479
aa16795
fa6ff37
e3a2105
ca186ed
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,10 +7,14 @@ import ( | |
| "io/ioutil" | ||
| "net/http" | ||
| "net/url" | ||
| "sort" | ||
| "strings" | ||
| "time" | ||
|
|
||
| "github.com/blang/semver/v4" | ||
| "github.com/google/uuid" | ||
| configv1 "github.com/openshift/api/config/v1" | ||
| "github.com/openshift/cluster-version-operator/pkg/clusterconditions" | ||
| "k8s.io/klog/v2" | ||
| ) | ||
|
|
||
|
|
@@ -35,9 +39,6 @@ func NewClient(id uuid.UUID, transport *http.Transport) Client { | |
| return Client{id: id, transport: transport} | ||
| } | ||
|
|
||
| // Update is a single node from the update graph. | ||
| type Update node | ||
|
|
||
| // Error is returned when are unable to get updates. | ||
| type Error struct { | ||
| // Reason is the reason suggested for the ClusterOperator status condition. | ||
|
|
@@ -56,14 +57,17 @@ func (err *Error) Error() string { | |
| } | ||
|
|
||
| // GetUpdates fetches the current and next-applicable update payloads from the specified | ||
| // upstream Cincinnati stack given the current version and channel. The next- | ||
| // applicable updates are determined by downloading the update graph, finding | ||
| // the current version within that graph (typically the root node), and then | ||
| // finding all of the children. These children are the available updates for | ||
| // the current version and their payloads indicate from where the actual update | ||
| // image can be downloaded. | ||
| func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, channel string, version semver.Version) (Update, []Update, error) { | ||
| var current Update | ||
| // upstream Cincinnati stack given the current version and channel. The command: | ||
| // | ||
| // 1. Downloads the update graph from the requested URI for the requested arch and channel. | ||
| // 2. Finds the current version entry under .nodes. | ||
| // 3. Finds recommended next-hop updates by searching .edges for updates from the current | ||
| // version. Returns a slice of target Releases with these unconditional recommendations. | ||
| // 4. Finds conditionally recommended next-hop updates by searching .conditionalEdges for | ||
| // updates from the current version. Returns a slice of ConditionalUpdates with these | ||
| // conditional recommendations. | ||
| func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, channel string, version semver.Version) (configv1.Release, []configv1.Release, []configv1.ConditionalUpdate, error) { | ||
| var current configv1.Release | ||
| // Prepare parametrized cincinnati query. | ||
| queryParams := uri.Query() | ||
| queryParams.Add("arch", arch) | ||
|
|
@@ -75,7 +79,7 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, chann | |
| // Download the update graph. | ||
| req, err := http.NewRequest("GET", uri.String(), nil) | ||
| if err != nil { | ||
| return current, nil, &Error{Reason: "InvalidRequest", Message: err.Error(), cause: err} | ||
| return current, nil, nil, &Error{Reason: "InvalidRequest", Message: err.Error(), cause: err} | ||
| } | ||
| req.Header.Add("Accept", GraphMediaType) | ||
| if c.transport != nil && c.transport.TLSClientConfig != nil { | ||
|
|
@@ -101,23 +105,23 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, chann | |
| defer cancel() | ||
| resp, err := client.Do(req.WithContext(timeoutCtx)) | ||
| if err != nil { | ||
| return current, nil, &Error{Reason: "RemoteFailed", Message: err.Error(), cause: err} | ||
| return current, nil, nil, &Error{Reason: "RemoteFailed", Message: err.Error(), cause: err} | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| if resp.StatusCode != http.StatusOK { | ||
| return current, nil, &Error{Reason: "ResponseFailed", Message: fmt.Sprintf("unexpected HTTP status: %s", resp.Status)} | ||
| return current, nil, nil, &Error{Reason: "ResponseFailed", Message: fmt.Sprintf("unexpected HTTP status: %s", resp.Status)} | ||
| } | ||
|
|
||
| // Parse the graph. | ||
| body, err := ioutil.ReadAll(resp.Body) | ||
| if err != nil { | ||
| return current, nil, &Error{Reason: "ResponseFailed", Message: err.Error(), cause: err} | ||
| return current, nil, nil, &Error{Reason: "ResponseFailed", Message: err.Error(), cause: err} | ||
| } | ||
|
|
||
| var graph graph | ||
| if err = json.Unmarshal(body, &graph); err != nil { | ||
| return current, nil, &Error{Reason: "ResponseInvalid", Message: err.Error(), cause: err} | ||
| return current, nil, nil, &Error{Reason: "ResponseInvalid", Message: err.Error(), cause: err} | ||
| } | ||
|
|
||
| // Find the current version within the graph. | ||
|
|
@@ -126,13 +130,19 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, chann | |
| for i, node := range graph.Nodes { | ||
| if version.EQ(node.Version) { | ||
| currentIdx = i | ||
| current = Update(graph.Nodes[i]) | ||
| found = true | ||
| current, err = convertRetrievedUpdateToRelease(graph.Nodes[i]) | ||
| if err != nil { | ||
| return current, nil, nil, &Error{ | ||
| Reason: "ResponseInvalid", | ||
| Message: fmt.Sprintf("invalid current node: %s", err), | ||
| } | ||
| } | ||
| break | ||
| } | ||
| } | ||
| if !found { | ||
| return current, nil, &Error{ | ||
| return current, nil, nil, &Error{ | ||
| Reason: "VersionNotFound", | ||
| Message: fmt.Sprintf("currently reconciling cluster version %s not found in the %q channel", version, channel), | ||
| } | ||
|
|
@@ -146,17 +156,98 @@ func (c Client) GetUpdates(ctx context.Context, uri *url.URL, arch string, chann | |
| } | ||
| } | ||
|
|
||
| var updates []Update | ||
| var updates []configv1.Release | ||
| for _, i := range nextIdxs { | ||
| updates = append(updates, Update(graph.Nodes[i])) | ||
| update, err := convertRetrievedUpdateToRelease(graph.Nodes[i]) | ||
| if err != nil { | ||
| return current, nil, nil, &Error{ | ||
| Reason: "ResponseInvalid", | ||
| Message: fmt.Sprintf("invalid recommended update node: %s", err), | ||
| } | ||
| } | ||
| updates = append(updates, update) | ||
| } | ||
|
|
||
| var conditionalUpdates []configv1.ConditionalUpdate | ||
| for _, conditionalEdges := range graph.ConditionalEdges { | ||
| for _, edge := range conditionalEdges.Edges { | ||
| if version.String() == edge.From { | ||
| var target *node | ||
| for i, node := range graph.Nodes { | ||
| if node.Version.String() == edge.To { | ||
| target = &graph.Nodes[i] | ||
| break | ||
| } | ||
| } | ||
| if target == nil { | ||
| return current, updates, nil, &Error{ | ||
| Reason: "ResponseInvalid", | ||
| Message: fmt.Sprintf("no node for conditional update %s", edge.To), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In the case target node is null, it seems
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Having |
||
| } | ||
| } | ||
| update, err := convertRetrievedUpdateToRelease(*target) | ||
| if err != nil { | ||
| return current, updates, nil, &Error{ | ||
| Reason: "ResponseInvalid", | ||
| Message: fmt.Sprintf("invalid conditional update node: %s", err), | ||
| } | ||
| } | ||
| conditionalUpdates = append(conditionalUpdates, configv1.ConditionalUpdate{ | ||
| Release: update, | ||
| Risks: conditionalEdges.Risks, | ||
| }) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| for i := len(updates) - 1; i >= 0; i-- { | ||
| for _, conditionalUpdate := range conditionalUpdates { | ||
| if conditionalUpdate.Release.Image == updates[i].Image { | ||
| klog.Warningf("Update to %s listed as both a conditional and unconditional update; preferring the conditional update.", conditionalUpdate.Release.Version) | ||
| updates = append(updates[:i], updates[i+1:]...) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| if len(updates) == 0 { | ||
| updates = nil | ||
| } | ||
|
|
||
| for i := len(conditionalUpdates) - 1; i >= 0; i-- { | ||
| for j, risk := range conditionalUpdates[i].Risks { | ||
| conditionalUpdates[i].Risks[j].MatchingRules, err = clusterconditions.PruneInvalid(ctx, risk.MatchingRules) | ||
| if len(conditionalUpdates[i].Risks[j].MatchingRules) == 0 { | ||
| klog.Warningf("Conditional update to %s, risk %q, has empty pruned matchingRules; dropping this target to avoid rejections when pushing to the Kubernetes API server. Pruning results: %s", conditionalUpdates[i].Release.Version, risk.Name, err) | ||
| conditionalUpdates = append(conditionalUpdates[:i], conditionalUpdates[i+1:]...) | ||
| } else if err != nil { | ||
| klog.Warningf("Conditional update to %s, risk %q, has pruned matchingRules (although other valid, recognized matchingRules were given, and are sufficient to keep the conditional update): %s", conditionalUpdates[i].Release.Version, risk.Name, err) | ||
| } | ||
| } | ||
| } | ||
|
|
||
| targets := make(map[string]int, len(conditionalUpdates)) | ||
| for _, conditionalUpdate := range conditionalUpdates { | ||
| targets[conditionalUpdate.Release.Image]++ | ||
| } | ||
|
|
||
| return current, updates, nil | ||
| for i := len(conditionalUpdates) - 1; i >= 0; i-- { | ||
| if targets[conditionalUpdates[i].Release.Image] > 1 { | ||
| klog.Warningf("Upstream declares %d conditional updates to %s; dropping them all.", targets[conditionalUpdates[i].Release.Image], conditionalUpdates[i].Release.Version) | ||
wking marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| conditionalUpdates = append(conditionalUpdates[:i], conditionalUpdates[i+1:]...) | ||
| } | ||
| } | ||
|
|
||
| if len(conditionalUpdates) == 0 { | ||
| conditionalUpdates = nil | ||
| } | ||
|
|
||
| return current, updates, conditionalUpdates, nil | ||
| } | ||
|
|
||
| type graph struct { | ||
| Nodes []node | ||
| Edges []edge | ||
| Nodes []node | ||
| Edges []edge | ||
| ConditionalEdges []conditionalEdges `json:"conditionalEdges"` | ||
| } | ||
|
|
||
| type node struct { | ||
|
|
@@ -170,6 +261,16 @@ type edge struct { | |
| Destination int | ||
| } | ||
|
|
||
| type conditionalEdge struct { | ||
| From string `json:"from"` | ||
| To string `json:"to"` | ||
| } | ||
|
|
||
| type conditionalEdges struct { | ||
| Edges []conditionalEdge `json:"edges"` | ||
| Risks []configv1.ConditionalUpdateRisk `json:"risks"` | ||
| } | ||
|
|
||
| // UnmarshalJSON unmarshals an edge in the update graph. The edge's JSON | ||
| // representation is a two-element array of indices, but Go's representation is | ||
| // a struct with two elements so this custom unmarshal method is required. | ||
|
|
@@ -188,3 +289,22 @@ func (e *edge) UnmarshalJSON(data []byte) error { | |
|
|
||
| return nil | ||
| } | ||
|
|
||
| func convertRetrievedUpdateToRelease(update node) (configv1.Release, error) { | ||
| cvoUpdate := configv1.Release{ | ||
| Version: update.Version.String(), | ||
| Image: update.Image, | ||
| } | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is no check for cases where update.Image is null. It might be ok as the code to validate the version will be called at some point of time before the actual upgrade. However I wanted to point it out to see if I am missing anything. |
||
| if urlString, ok := update.Metadata["url"]; ok { | ||
| _, err := url.Parse(urlString) | ||
| if err != nil { | ||
| return cvoUpdate, fmt.Errorf("invalid URL for %s: %s", cvoUpdate.Version, err) | ||
| } | ||
| cvoUpdate.URL = configv1.URL(urlString) | ||
| } | ||
| if channels, ok := update.Metadata["io.openshift.upgrades.graph.release.channels"]; ok { | ||
| cvoUpdate.Channels = strings.Split(channels, ",") | ||
| sort.Strings(cvoUpdate.Channels) | ||
| } | ||
| return cvoUpdate, nil | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.