diff --git a/pkg/cvo/cvo.go b/pkg/cvo/cvo.go index 56e60e9a6f..e5325d0b34 100644 --- a/pkg/cvo/cvo.go +++ b/pkg/cvo/cvo.go @@ -82,9 +82,6 @@ type Operator struct { // releaseCreated, if set, is the timestamp of the current update. releaseCreated time.Time - // restConfig is used to create resourcebuilder. - restConfig *rest.Config - client clientset.Interface kubeClient kubernetes.Interface eventRecorder record.EventRecorder @@ -132,6 +129,7 @@ func New( cvInformer configinformersv1.ClusterVersionInformer, coInformer configinformersv1.ClusterOperatorInformer, restConfig *rest.Config, + burstRestConfig *rest.Config, client clientset.Interface, kubeClient kubernetes.Interface, enableMetrics bool, @@ -151,7 +149,6 @@ func New( payloadDir: overridePayloadDir, defaultUpstreamServer: "https://api.openshift.com/api/upgrades_info/v1/graph", - restConfig: restConfig, client: client, kubeClient: kubeClient, eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}), @@ -162,7 +159,7 @@ func New( optr.configSync = NewSyncWorker( optr.defaultPayloadRetriever(), - NewResourceBuilder(optr.restConfig, coInformer.Lister()), + NewResourceBuilder(restConfig, burstRestConfig, coInformer.Lister()), minimumInterval, wait.Backoff{ Duration: time.Second * 10, @@ -468,25 +465,35 @@ func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) { // resourceBuilder provides the default builder implementation for the operator. // It is abstracted for testing. type resourceBuilder struct { - config *rest.Config - modifier resourcebuilder.MetaV1ObjectModifierFunc + config *rest.Config + burstConfig *rest.Config + modifier resourcebuilder.MetaV1ObjectModifierFunc clusterOperators internal.ClusterOperatorsGetter } // NewResourceBuilder creates the default resource builder implementation. -func NewResourceBuilder(config *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder { - return &resourceBuilder{config: config, clusterOperators: clusterOperators} +func NewResourceBuilder(config, burstConfig *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder { + return &resourceBuilder{ + config: config, + burstConfig: burstConfig, + clusterOperators: clusterOperators, + } } -func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface, error) { +func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (resourcebuilder.Interface, error) { + config := b.config + if state == payload.InitializingPayload { + config = b.burstConfig + } + if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") { return internal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil } if resourcebuilder.Mapper.Exists(m.GVK) { - return resourcebuilder.New(resourcebuilder.Mapper, b.config, *m) + return resourcebuilder.New(resourcebuilder.Mapper, config, *m) } - client, err := dynamicclient.New(b.config, m.GVK, m.Object().GetNamespace()) + client, err := dynamicclient.New(config, m.GVK, m.Object().GetNamespace()) if err != nil { return nil, err } @@ -494,7 +501,7 @@ func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface } func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error { - builder, err := b.BuilderFor(m) + builder, err := b.builderFor(m, state) if err != nil { return err } diff --git a/pkg/cvo/internal/operatorstatus.go b/pkg/cvo/internal/operatorstatus.go index 69cbd2dceb..2f4dd8e633 100644 --- a/pkg/cvo/internal/operatorstatus.go +++ b/pkg/cvo/internal/operatorstatus.go @@ -99,7 +99,11 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error { if b.modifier != nil { b.modifier(os) } - ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute) + timeout := 1 * time.Minute + if b.mode == resourcebuilder.InitializingMode { + timeout = 6 * time.Minute + } + ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout) defer cancel() return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os, b.mode) } @@ -179,11 +183,19 @@ func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration, failing = false } } - // if we're at the correct version, and available, and not failing, we are done - // if we're available, not failing, and not progressing, we're also done - // TODO: remove progressing once all cluster operators report expected versions - if available && (!progressing || len(expected.Status.Versions) > 0) && !failing { - return true, nil + switch mode { + case resourcebuilder.InitializingMode: + // during initialization we allow failing as long as the component goes available + if available && (!progressing || len(expected.Status.Versions) > 0) { + return true, nil + } + default: + // if we're at the correct version, and available, and not failing, we are done + // if we're available, not failing, and not progressing, we're also done + // TODO: remove progressing once all cluster operators report expected versions + if available && (!progressing || len(expected.Status.Versions) > 0) && !failing { + return true, nil + } } if c := resourcemerge.FindOperatorStatusCondition(actual.Status.Conditions, configv1.OperatorFailing); c != nil && c.Status == configv1.ConditionTrue { diff --git a/pkg/cvo/sync_test.go b/pkg/cvo/sync_test.go index 7fdb052d0a..0d10224223 100644 --- a/pkg/cvo/sync_test.go +++ b/pkg/cvo/sync_test.go @@ -121,7 +121,7 @@ func Test_SyncWorker_apply(t *testing.T) { worker := &SyncWorker{} worker.backoff.Steps = 3 - worker.builder = NewResourceBuilder(nil, nil) + worker.builder = NewResourceBuilder(nil, nil, nil) ctx := context.Background() worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()}) test.check(t, r.actions) diff --git a/pkg/cvo/sync_worker.go b/pkg/cvo/sync_worker.go index 2d08744647..8f7b571c82 100644 --- a/pkg/cvo/sync_worker.go +++ b/pkg/cvo/sync_worker.go @@ -429,17 +429,27 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w } var tasks []*payload.Task + backoff := w.backoff + if backoff.Steps > 1 && work.State == payload.InitializingPayload { + backoff = wait.Backoff{Steps: 4, Factor: 2, Duration: time.Second} + } for i := range payloadUpdate.Manifests { tasks = append(tasks, &payload.Task{ Index: i + 1, Total: total, Manifest: &payloadUpdate.Manifests[i], - Backoff: w.backoff, + Backoff: backoff, }) } graph := payload.NewTaskGraph(tasks) graph.Split(payload.SplitOnJobs) - graph.Parallelize(payload.ByNumberAndComponent) + if work.State == payload.InitializingPayload { + // get the payload out via brute force + maxWorkers = len(tasks) + graph.Parallelize(payload.FlattenByNumberAndComponent) + } else { + graph.Parallelize(payload.ByNumberAndComponent) + } // update each object err := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error { diff --git a/pkg/cvo/testdata/payloadtest/release-manifests/file b/pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file similarity index 100% rename from pkg/cvo/testdata/payloadtest/release-manifests/file rename to pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file diff --git a/pkg/cvo/testdata/payloadtest/release-manifests/file.json b/pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file.json similarity index 100% rename from pkg/cvo/testdata/payloadtest/release-manifests/file.json rename to pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file.json diff --git a/pkg/cvo/testdata/payloadtest/release-manifests/file.yaml b/pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file.yaml similarity index 100% rename from pkg/cvo/testdata/payloadtest/release-manifests/file.yaml rename to pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file.yaml diff --git a/pkg/cvo/testdata/payloadtest/release-manifests/file.yml b/pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file.yml similarity index 100% rename from pkg/cvo/testdata/payloadtest/release-manifests/file.yml rename to pkg/cvo/testdata/payloadtest/release-manifests/0000_10_a_file.yml diff --git a/pkg/payload/payload_test.go b/pkg/payload/payload_test.go index 1c83985b1d..376d3f1586 100644 --- a/pkg/payload/payload_test.go +++ b/pkg/payload/payload_test.go @@ -48,8 +48,8 @@ func Test_loadUpdatePayload(t *testing.T) { ManifestHash: "6GC9TkkG9PA=", Manifests: []lib.Manifest{ { - OriginalFilename: "file.json", - Raw: mustRead(filepath.Join("..", "cvo", "testdata", "payloadtest", "release-manifests", "file.json")), + OriginalFilename: "0000_10_a_file.json", + Raw: mustRead(filepath.Join("..", "cvo", "testdata", "payloadtest", "release-manifests", "0000_10_a_file.json")), GVK: schema.GroupVersionKind{ Kind: "Test", Version: "v1", @@ -65,7 +65,7 @@ func Test_loadUpdatePayload(t *testing.T) { }, }, { - OriginalFilename: "file.yaml", + OriginalFilename: "0000_10_a_file.yaml", Raw: []byte(`{"apiVersion":"v1","kind":"Test","metadata":{"name":"file-yaml"}}`), GVK: schema.GroupVersionKind{ Kind: "Test", @@ -82,7 +82,7 @@ func Test_loadUpdatePayload(t *testing.T) { }, }, { - OriginalFilename: "file.yml", + OriginalFilename: "0000_10_a_file.yml", Raw: []byte(`{"apiVersion":"v1","kind":"Test","metadata":{"name":"file-yml"}}`), GVK: schema.GroupVersionKind{ Kind: "Test", diff --git a/pkg/payload/task_graph.go b/pkg/payload/task_graph.go index 2738de0170..eb39592c1e 100644 --- a/pkg/payload/task_graph.go +++ b/pkg/payload/task_graph.go @@ -78,6 +78,54 @@ func ByNumberAndComponent(tasks []*Task) [][]*TaskNode { return buckets } +// FlattenByNumberAndComponent creates parallelization for tasks whose original filenames are of the form +// 0000_NN_NAME_* - files that share 0000_NN_NAME_ are run in serial, but chunks of files that have +// different 0000_NN_NAME can be run in parallel. This splitter does *not* preserve ordering within run +// levels and is intended only for use cases where order is not important. +func FlattenByNumberAndComponent(tasks []*Task) [][]*TaskNode { + if len(tasks) <= 1 { + return nil + } + count := len(tasks) + matches := make([][]string, 0, count) + for i := 0; i < len(tasks); i++ { + matches = append(matches, reMatchPattern.FindStringSubmatch(tasks[i].Manifest.OriginalFilename)) + } + + var lastNode *TaskNode + var groups []*TaskNode + for i := 0; i < count; { + matchBase := matches[i] + j := i + 1 + for ; j < count; j++ { + matchNext := matches[j] + if matchBase == nil || matchNext == nil || matchBase[groupNumber] != matchNext[groupNumber] { + break + } + if matchBase[groupComponent] != matchNext[groupComponent] { + groups = append(groups, &TaskNode{Tasks: tasks[i:j]}) + i = j + } + matchBase = matchNext + } + if len(groups) > 0 { + groups = append(groups, &TaskNode{Tasks: tasks[i:j]}) + i = j + lastNode = nil + continue + } + if lastNode == nil { + lastNode = &TaskNode{Tasks: append([]*Task(nil), tasks[i:j]...)} + i = j + groups = append(groups, lastNode) + continue + } + lastNode.Tasks = append(lastNode.Tasks, tasks[i:j]...) + i = j + } + return [][]*TaskNode{groups} +} + type TaskNode struct { In []int Tasks []*Task diff --git a/pkg/payload/task_graph_test.go b/pkg/payload/task_graph_test.go index 61d3fe9e1c..9ef698c429 100644 --- a/pkg/payload/task_graph_test.go +++ b/pkg/payload/task_graph_test.go @@ -233,6 +233,143 @@ func TestByNumberAndComponent(t *testing.T) { } } +func TestFlattenByNumberAndComponent(t *testing.T) { + tasks := func(names ...string) []*Task { + var arr []*Task + for _, name := range names { + arr = append(arr, &Task{Manifest: &lib.Manifest{OriginalFilename: name}}) + } + return arr + } + tests := []struct { + name string + tasks []*Task + want [][]*TaskNode + }{ + { + name: "empty tasks", + tasks: tasks(), + want: nil, + }, + { + name: "no grouping possible", + tasks: tasks("a"), + want: nil, + }, + { + name: "no recognizable groups", + tasks: tasks("a", "b", "c"), + want: [][]*TaskNode{ + { + &TaskNode{Tasks: tasks("a")}, + &TaskNode{Tasks: tasks("b")}, + &TaskNode{Tasks: tasks("c")}, + }, + }, + }, + { + name: "single grouped item", + tasks: tasks("0000_01_x-y-z_file1"), + want: nil, + }, + { + name: "multiple grouped items in single node", + tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2"), + want: [][]*TaskNode{ + { + &TaskNode{Tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2")}, + }, + }, + }, + { + tasks: tasks("a", "0000_01_x-y-z_file1", "c"), + want: [][]*TaskNode{ + { + &TaskNode{Tasks: tasks("a")}, + &TaskNode{Tasks: tasks("0000_01_x-y-z_file1")}, + &TaskNode{Tasks: tasks("c")}, + }, + }, + }, + { + tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2"), + want: [][]*TaskNode{ + { + &TaskNode{Tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2")}, + }, + }, + }, + { + tasks: tasks("0000_01_a-b-c_file1", "0000_01_x-y-z_file2"), + want: [][]*TaskNode{ + { + &TaskNode{Tasks: tasks("0000_01_a-b-c_file1")}, + &TaskNode{Tasks: tasks("0000_01_x-y-z_file2")}, + }, + }, + }, + { + tasks: tasks( + "0000_01_a-b-c_file1", + "0000_01_x-y-z_file1", + "0000_01_x-y-z_file2", + "a", + "0000_01_x-y-z_file2", + "0000_01_x-y-z_file3", + ), + want: [][]*TaskNode{ + { + &TaskNode{Tasks: tasks( + "0000_01_a-b-c_file1", + )}, + &TaskNode{Tasks: tasks( + "0000_01_x-y-z_file1", + "0000_01_x-y-z_file2", + )}, + &TaskNode{Tasks: tasks( + "a", + )}, + &TaskNode{Tasks: tasks( + "0000_01_x-y-z_file2", + "0000_01_x-y-z_file3", + )}, + }, + }, + }, + { + tasks: tasks( + "0000_01_a-b-c_file1", + "0000_01_x-y-z_file1", + "0000_01_x-y-z_file2", + "0000_02_x-y-z_file2", + "0000_02_x-y-z_file3", + ), + want: [][]*TaskNode{ + { + &TaskNode{Tasks: tasks( + "0000_01_a-b-c_file1", + )}, + &TaskNode{Tasks: tasks( + "0000_01_x-y-z_file1", + "0000_01_x-y-z_file2", + )}, + &TaskNode{Tasks: tasks( + "0000_02_x-y-z_file2", + "0000_02_x-y-z_file3", + )}, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := FlattenByNumberAndComponent(tt.tasks); !reflect.DeepEqual(got, tt.want) { + t.Fatalf("%s", diff.ObjectReflectDiff(tt.want, got)) + } + }) + } +} + func Test_TaskGraph_real(t *testing.T) { path := os.Getenv("TEST_GRAPH_PATH") if len(path) == 0 { diff --git a/pkg/start/start.go b/pkg/start/start.go index a2f9dcfe71..cb5f6c3341 100644 --- a/pkg/start/start.go +++ b/pkg/start/start.go @@ -273,10 +273,14 @@ func newClientBuilder(kubeconfig string) (*ClientBuilder, error) { }, nil } -func increaseQPS(config *rest.Config) { +func defaultQPS(config *rest.Config) { config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(20, 40) } +func highQPS(config *rest.Config) { + config.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(40, 80) +} + func useProtobuf(config *rest.Config) { config.AcceptContentTypes = "application/vnd.kubernetes.protobuf,application/json" config.ContentType = "application/vnd.kubernetes.protobuf" @@ -313,7 +317,8 @@ func (o *Options) NewControllerContext(cb *ClientBuilder) *Context { resyncPeriod(o.ResyncInterval)(), cvInformer.Config().V1().ClusterVersions(), sharedInformers.Config().V1().ClusterOperators(), - cb.RestConfig(increaseQPS), + cb.RestConfig(defaultQPS), + cb.RestConfig(highQPS), cb.ClientOrDie(o.Namespace), cb.KubeClientOrDie(o.Namespace, useProtobuf), o.EnableMetrics, diff --git a/pkg/start/start_integration_test.go b/pkg/start/start_integration_test.go index e11b4e39cb..44e1f50f7c 100644 --- a/pkg/start/start_integration_test.go +++ b/pkg/start/start_integration_test.go @@ -230,7 +230,7 @@ func TestIntegrationCVO_initializeAndUpgrade(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{}) @@ -381,7 +381,7 @@ func TestIntegrationCVO_initializeAndHandleError(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{}) @@ -487,7 +487,7 @@ func TestIntegrationCVO_gracefulStepDown(t *testing.T) { options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(&mapPayloadRetriever{}, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) lock, err := createResourceLock(cb, ns, ns) @@ -657,7 +657,7 @@ metadata: options.EnableMetrics = false controllers := options.NewControllerContext(cb) - worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) + worker := cvo.NewSyncWorker(retriever, cvo.NewResourceBuilder(cfg, cfg, nil), 5*time.Second, wait.Backoff{Steps: 3}).(*cvo.SyncWorker) controllers.CVO.SetSyncWorkerForTesting(worker) stopCh := make(chan struct{})