Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 20 additions & 13 deletions pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,6 @@ type Operator struct {
// releaseCreated, if set, is the timestamp of the current update.
releaseCreated time.Time

// restConfig is used to create resourcebuilder.
restConfig *rest.Config

client clientset.Interface
kubeClient kubernetes.Interface
eventRecorder record.EventRecorder
Expand Down Expand Up @@ -132,6 +129,7 @@ func New(
cvInformer configinformersv1.ClusterVersionInformer,
coInformer configinformersv1.ClusterOperatorInformer,
restConfig *rest.Config,
burstRestConfig *rest.Config,
client clientset.Interface,
kubeClient kubernetes.Interface,
enableMetrics bool,
Expand All @@ -151,7 +149,6 @@ func New(
payloadDir: overridePayloadDir,
defaultUpstreamServer: "https://api.openshift.com/api/upgrades_info/v1/graph",

restConfig: restConfig,
client: client,
kubeClient: kubeClient,
eventRecorder: eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: namespace}),
Expand All @@ -162,7 +159,7 @@ func New(

optr.configSync = NewSyncWorker(
optr.defaultPayloadRetriever(),
NewResourceBuilder(optr.restConfig, coInformer.Lister()),
NewResourceBuilder(restConfig, burstRestConfig, coInformer.Lister()),
minimumInterval,
wait.Backoff{
Duration: time.Second * 10,
Expand Down Expand Up @@ -468,33 +465,43 @@ func (optr *Operator) SetSyncWorkerForTesting(worker ConfigSyncWorker) {
// resourceBuilder provides the default builder implementation for the operator.
// It is abstracted for testing.
type resourceBuilder struct {
config *rest.Config
modifier resourcebuilder.MetaV1ObjectModifierFunc
config *rest.Config
burstConfig *rest.Config
modifier resourcebuilder.MetaV1ObjectModifierFunc

clusterOperators internal.ClusterOperatorsGetter
}

// NewResourceBuilder creates the default resource builder implementation.
func NewResourceBuilder(config *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder {
return &resourceBuilder{config: config, clusterOperators: clusterOperators}
func NewResourceBuilder(config, burstConfig *rest.Config, clusterOperators configlistersv1.ClusterOperatorLister) payload.ResourceBuilder {
return &resourceBuilder{
config: config,
burstConfig: burstConfig,
clusterOperators: clusterOperators,
}
}

func (b *resourceBuilder) BuilderFor(m *lib.Manifest) (resourcebuilder.Interface, error) {
func (b *resourceBuilder) builderFor(m *lib.Manifest, state payload.State) (resourcebuilder.Interface, error) {
config := b.config
if state == payload.InitializingPayload {
config = b.burstConfig
}

if b.clusterOperators != nil && m.GVK == configv1.SchemeGroupVersion.WithKind("ClusterOperator") {
return internal.NewClusterOperatorBuilder(b.clusterOperators, *m), nil
}
if resourcebuilder.Mapper.Exists(m.GVK) {
return resourcebuilder.New(resourcebuilder.Mapper, b.config, *m)
return resourcebuilder.New(resourcebuilder.Mapper, config, *m)
}
client, err := dynamicclient.New(b.config, m.GVK, m.Object().GetNamespace())
client, err := dynamicclient.New(config, m.GVK, m.Object().GetNamespace())
if err != nil {
return nil, err
}
return internal.NewGenericBuilder(client, *m)
}

func (b *resourceBuilder) Apply(ctx context.Context, m *lib.Manifest, state payload.State) error {
builder, err := b.BuilderFor(m)
builder, err := b.builderFor(m, state)
if err != nil {
return err
}
Expand Down
24 changes: 18 additions & 6 deletions pkg/cvo/internal/operatorstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ func (b *clusterOperatorBuilder) Do(ctx context.Context) error {
if b.modifier != nil {
b.modifier(os)
}
ctxWithTimeout, cancel := context.WithTimeout(ctx, 1*time.Minute)
timeout := 1 * time.Minute
if b.mode == resourcebuilder.InitializingMode {
timeout = 6 * time.Minute
}
ctxWithTimeout, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
return waitForOperatorStatusToBeDone(ctxWithTimeout, 1*time.Second, b.client, os, b.mode)
}
Expand Down Expand Up @@ -179,11 +183,19 @@ func waitForOperatorStatusToBeDone(ctx context.Context, interval time.Duration,
failing = false
}
}
// if we're at the correct version, and available, and not failing, we are done
// if we're available, not failing, and not progressing, we're also done
// TODO: remove progressing once all cluster operators report expected versions
if available && (!progressing || len(expected.Status.Versions) > 0) && !failing {
return true, nil
switch mode {
case resourcebuilder.InitializingMode:
// during initialization we allow failing as long as the component goes available
if available && (!progressing || len(expected.Status.Versions) > 0) {
return true, nil
}
default:
// if we're at the correct version, and available, and not failing, we are done
// if we're available, not failing, and not progressing, we're also done
// TODO: remove progressing once all cluster operators report expected versions
if available && (!progressing || len(expected.Status.Versions) > 0) && !failing {
return true, nil
}
}

if c := resourcemerge.FindOperatorStatusCondition(actual.Status.Conditions, configv1.OperatorFailing); c != nil && c.Status == configv1.ConditionTrue {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func Test_SyncWorker_apply(t *testing.T) {

worker := &SyncWorker{}
worker.backoff.Steps = 3
worker.builder = NewResourceBuilder(nil, nil)
worker.builder = NewResourceBuilder(nil, nil, nil)
ctx := context.Background()
worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
test.check(t, r.actions)
Expand Down
14 changes: 12 additions & 2 deletions pkg/cvo/sync_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -429,17 +429,27 @@ func (w *SyncWorker) apply(ctx context.Context, payloadUpdate *payload.Update, w
}

var tasks []*payload.Task
backoff := w.backoff
if backoff.Steps > 1 && work.State == payload.InitializingPayload {
backoff = wait.Backoff{Steps: 4, Factor: 2, Duration: time.Second}
}
for i := range payloadUpdate.Manifests {
tasks = append(tasks, &payload.Task{
Index: i + 1,
Total: total,
Manifest: &payloadUpdate.Manifests[i],
Backoff: w.backoff,
Backoff: backoff,
})
}
graph := payload.NewTaskGraph(tasks)
graph.Split(payload.SplitOnJobs)
graph.Parallelize(payload.ByNumberAndComponent)
if work.State == payload.InitializingPayload {
// get the payload out via brute force
maxWorkers = len(tasks)
graph.Parallelize(payload.FlattenByNumberAndComponent)
} else {
graph.Parallelize(payload.ByNumberAndComponent)
}

// update each object
err := payload.RunGraph(ctx, graph, maxWorkers, func(ctx context.Context, tasks []*payload.Task) error {
Expand Down
8 changes: 4 additions & 4 deletions pkg/payload/payload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ func Test_loadUpdatePayload(t *testing.T) {
ManifestHash: "6GC9TkkG9PA=",
Manifests: []lib.Manifest{
{
OriginalFilename: "file.json",
Raw: mustRead(filepath.Join("..", "cvo", "testdata", "payloadtest", "release-manifests", "file.json")),
OriginalFilename: "0000_10_a_file.json",
Raw: mustRead(filepath.Join("..", "cvo", "testdata", "payloadtest", "release-manifests", "0000_10_a_file.json")),
GVK: schema.GroupVersionKind{
Kind: "Test",
Version: "v1",
Expand All @@ -65,7 +65,7 @@ func Test_loadUpdatePayload(t *testing.T) {
},
},
{
OriginalFilename: "file.yaml",
OriginalFilename: "0000_10_a_file.yaml",
Raw: []byte(`{"apiVersion":"v1","kind":"Test","metadata":{"name":"file-yaml"}}`),
GVK: schema.GroupVersionKind{
Kind: "Test",
Expand All @@ -82,7 +82,7 @@ func Test_loadUpdatePayload(t *testing.T) {
},
},
{
OriginalFilename: "file.yml",
OriginalFilename: "0000_10_a_file.yml",
Raw: []byte(`{"apiVersion":"v1","kind":"Test","metadata":{"name":"file-yml"}}`),
GVK: schema.GroupVersionKind{
Kind: "Test",
Expand Down
48 changes: 48 additions & 0 deletions pkg/payload/task_graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,54 @@ func ByNumberAndComponent(tasks []*Task) [][]*TaskNode {
return buckets
}

// FlattenByNumberAndComponent creates parallelization for tasks whose original filenames are of the form
// 0000_NN_NAME_* - files that share 0000_NN_NAME_ are run in serial, but chunks of files that have
// different 0000_NN_NAME can be run in parallel. This splitter does *not* preserve ordering within run
// levels and is intended only for use cases where order is not important.
func FlattenByNumberAndComponent(tasks []*Task) [][]*TaskNode {
if len(tasks) <= 1 {
return nil
}
count := len(tasks)
matches := make([][]string, 0, count)
for i := 0; i < len(tasks); i++ {
matches = append(matches, reMatchPattern.FindStringSubmatch(tasks[i].Manifest.OriginalFilename))
}

var lastNode *TaskNode
var groups []*TaskNode
for i := 0; i < count; {
matchBase := matches[i]
j := i + 1
for ; j < count; j++ {
matchNext := matches[j]
if matchBase == nil || matchNext == nil || matchBase[groupNumber] != matchNext[groupNumber] {
break
}
if matchBase[groupComponent] != matchNext[groupComponent] {
groups = append(groups, &TaskNode{Tasks: tasks[i:j]})
i = j
}
matchBase = matchNext
}
if len(groups) > 0 {
groups = append(groups, &TaskNode{Tasks: tasks[i:j]})
i = j
lastNode = nil
continue
}
if lastNode == nil {
lastNode = &TaskNode{Tasks: append([]*Task(nil), tasks[i:j]...)}
i = j
groups = append(groups, lastNode)
continue
}
lastNode.Tasks = append(lastNode.Tasks, tasks[i:j]...)
i = j
}
return [][]*TaskNode{groups}
}

type TaskNode struct {
In []int
Tasks []*Task
Expand Down
137 changes: 137 additions & 0 deletions pkg/payload/task_graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,143 @@ func TestByNumberAndComponent(t *testing.T) {
}
}

func TestFlattenByNumberAndComponent(t *testing.T) {
tasks := func(names ...string) []*Task {
var arr []*Task
for _, name := range names {
arr = append(arr, &Task{Manifest: &lib.Manifest{OriginalFilename: name}})
}
return arr
}
tests := []struct {
name string
tasks []*Task
want [][]*TaskNode
}{
{
name: "empty tasks",
tasks: tasks(),
want: nil,
},
{
name: "no grouping possible",
tasks: tasks("a"),
want: nil,
},
{
name: "no recognizable groups",
tasks: tasks("a", "b", "c"),
want: [][]*TaskNode{
{
&TaskNode{Tasks: tasks("a")},
&TaskNode{Tasks: tasks("b")},
&TaskNode{Tasks: tasks("c")},
},
},
},
{
name: "single grouped item",
tasks: tasks("0000_01_x-y-z_file1"),
want: nil,
},
{
name: "multiple grouped items in single node",
tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2"),
want: [][]*TaskNode{
{
&TaskNode{Tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2")},
},
},
},
{
tasks: tasks("a", "0000_01_x-y-z_file1", "c"),
want: [][]*TaskNode{
{
&TaskNode{Tasks: tasks("a")},
&TaskNode{Tasks: tasks("0000_01_x-y-z_file1")},
&TaskNode{Tasks: tasks("c")},
},
},
},
{
tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is same as fee2d06#diff-6c3be3194acbe263715006cdee7e9442R276 multiple grouped items in single node

want: [][]*TaskNode{
{
&TaskNode{Tasks: tasks("0000_01_x-y-z_file1", "0000_01_x-y-z_file2")},
},
},
},
{
tasks: tasks("0000_01_a-b-c_file1", "0000_01_x-y-z_file2"),
want: [][]*TaskNode{
{
&TaskNode{Tasks: tasks("0000_01_a-b-c_file1")},
&TaskNode{Tasks: tasks("0000_01_x-y-z_file2")},
},
},
},
{
tasks: tasks(
"0000_01_a-b-c_file1",
"0000_01_x-y-z_file1",
"0000_01_x-y-z_file2",
"a",
"0000_01_x-y-z_file2",
"0000_01_x-y-z_file3",
),
want: [][]*TaskNode{
{
&TaskNode{Tasks: tasks(
"0000_01_a-b-c_file1",
)},
&TaskNode{Tasks: tasks(
"0000_01_x-y-z_file1",
"0000_01_x-y-z_file2",
)},
&TaskNode{Tasks: tasks(
"a",
)},
&TaskNode{Tasks: tasks(
"0000_01_x-y-z_file2",
"0000_01_x-y-z_file3",
)},
},
},
},
{
tasks: tasks(
"0000_01_a-b-c_file1",
"0000_01_x-y-z_file1",
"0000_01_x-y-z_file2",
"0000_02_x-y-z_file2",
"0000_02_x-y-z_file3",
),
want: [][]*TaskNode{
{
&TaskNode{Tasks: tasks(
"0000_01_a-b-c_file1",
)},
&TaskNode{Tasks: tasks(
"0000_01_x-y-z_file1",
"0000_01_x-y-z_file2",
)},
&TaskNode{Tasks: tasks(
"0000_02_x-y-z_file2",
"0000_02_x-y-z_file3",
)},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := FlattenByNumberAndComponent(tt.tasks); !reflect.DeepEqual(got, tt.want) {
t.Fatalf("%s", diff.ObjectReflectDiff(tt.want, got))
}
})
}
}

func Test_TaskGraph_real(t *testing.T) {
path := os.Getenv("TEST_GRAPH_PATH")
if len(path) == 0 {
Expand Down
Loading