Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/cvo/cvo.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (optr *Operator) Run(workers int, stopCh <-chan struct{}) {

// start the config sync loop, and have it notify the queue when new status is detected
go runThrottledStatusNotifier(stopCh, optr.statusInterval, 2, optr.configSync.StatusCh(), func() { optr.queue.Add(optr.queueKey()) })
go optr.configSync.Start(stopCh)
go optr.configSync.Start(8, stopCh)

go wait.Until(func() { optr.worker(optr.queue, optr.sync) }, time.Second, stopCh)
go wait.Until(func() { optr.worker(optr.availableUpdatesQueue, optr.availableUpdatesSync) }, time.Second, stopCh)
Expand Down
6 changes: 3 additions & 3 deletions pkg/cvo/cvo_scenarios_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func TestCVO_StartupAndSync(t *testing.T) {
defer close(stopCh)
defer shutdownFn()
worker := o.configSync.(*SyncWorker)
go worker.Start(stopCh)
go worker.Start(1, stopCh)

// Step 1: Verify the CVO creates the initial Cluster Version object
//
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestCVO_RestartAndReconcile(t *testing.T) {
// Step 2: Start the sync worker and verify the sequence of events, and then verify
// the status does not change
//
go worker.Start(stopCh)
go worker.Start(1, stopCh)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Expand Down Expand Up @@ -538,7 +538,7 @@ func TestCVO_ErrorDuringReconcile(t *testing.T) {

// Step 2: Start the sync worker and verify the sequence of events
//
go worker.Start(stopCh)
go worker.Start(1, stopCh)
//
verifyAllStatus(t, worker.StatusCh(),
SyncWorkerStatus{
Expand Down
6 changes: 3 additions & 3 deletions pkg/cvo/cvo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1304,9 +1304,9 @@ func TestOperator_sync(t *testing.T) {
Actual: configv1.Update{Image: "image/image:v4.0.1", Version: "0.0.1-abc"},
},
optr: Operator{
releaseImage: "image/image:v4.0.1",
namespace: "test",
name: "default",
releaseImage: "image/image:v4.0.1",
namespace: "test",
name: "default",
defaultUpstreamServer: "http://localhost:8080/graph",
availableUpdates: &availableUpdates{
Upstream: "",
Expand Down
268 changes: 3 additions & 265 deletions pkg/cvo/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/davecgh/go-spew/spew"

apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand All @@ -28,179 +27,6 @@ import (
"github.com/openshift/cluster-version-operator/pkg/payload"
)

func TestHasRequeueOnErrorAnnotation(t *testing.T) {
tests := []struct {
annos map[string]string

exp bool
experrs []string
}{{
annos: nil,
exp: false,
experrs: nil,
}, {
annos: map[string]string{"dummy": "dummy"},
exp: false,
experrs: nil,
}, {
annos: map[string]string{requeueOnErrorAnnotationKey: "NoMatch"},
exp: true,
experrs: []string{"NoMatch"},
}, {
annos: map[string]string{requeueOnErrorAnnotationKey: "NoMatch,NotFound"},
exp: true,
experrs: []string{"NoMatch", "NotFound"},
}}
for idx, test := range tests {
t.Run(fmt.Sprintf("test#%d", idx), func(t *testing.T) {
got, goterrs := hasRequeueOnErrorAnnotation(test.annos)
if got != test.exp {
t.Fatalf("expected %v got %v", test.exp, got)
}
if !reflect.DeepEqual(goterrs, test.experrs) {
t.Fatalf("expected %v got %v", test.exp, got)
}
})
}
}

func TestShouldRequeueOnErr(t *testing.T) {
tests := []struct {
err error
manifest string
exp bool
}{{
err: nil,
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap"
}`,

exp: false,
}, {
err: fmt.Errorf("random error"),
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap"
}`,

exp: false,
}, {
err: &meta.NoResourceMatchError{},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap"
}`,

exp: false,
}, {
err: &payload.UpdateError{Nested: &meta.NoResourceMatchError{}},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap"
}`,

exp: false,
}, {
err: &meta.NoResourceMatchError{},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NoMatch"
}
}
}`,

exp: true,
}, {
err: &payload.UpdateError{Nested: &meta.NoResourceMatchError{}},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NoMatch"
}
}
}`,

exp: true,
}, {
err: &meta.NoResourceMatchError{},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NotFound"
}
}
}`,

exp: false,
}, {
err: &payload.UpdateError{Nested: &meta.NoResourceMatchError{}},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NotFound"
}
}
}`,

exp: false,
}, {
err: apierrors.NewInternalError(fmt.Errorf("dummy")),
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NoMatch"
}
}
}`,

exp: false,
}, {
err: &payload.UpdateError{Nested: apierrors.NewInternalError(fmt.Errorf("dummy"))},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap",
"metadata": {
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NoMatch"
}
}
}`,

exp: false,
}, {
err: &payload.UpdateError{Nested: &resourcebuilder.RetryLaterError{}},
manifest: `{
"apiVersion": "v1",
"kind": "ConfigMap"
}`,

exp: true,
}}
for idx, test := range tests {
t.Run(fmt.Sprintf("test#%d", idx), func(t *testing.T) {
var manifest lib.Manifest
if err := json.Unmarshal([]byte(test.manifest), &manifest); err != nil {
t.Fatal(err)
}
if got := shouldRequeueOnErr(test.err, &manifest); got != test.exp {
t.Fatalf("expected %v got %v", test.exp, got)
}
})
}
}

func Test_SyncWorker_apply(t *testing.T) {
tests := []struct {
manifests []string
Expand Down Expand Up @@ -274,94 +100,6 @@ func Test_SyncWorker_apply(t *testing.T) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
}
},
}, {
manifests: []string{
`{
"apiVersion": "test.cvo.io/v1",
"kind": "TestA",
"metadata": {
"namespace": "default",
"name": "testa",
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NoMatch"
}
}
}`,
`{
"apiVersion": "test.cvo.io/v1",
"kind": "TestB",
"metadata": {
"namespace": "default",
"name": "testb"
}
}`,
},
reactors: map[action]error{
newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa"): &meta.NoResourceMatchError{},
},
wantErr: true,
check: func(t *testing.T, actions []action) {
if len(actions) != 7 {
spew.Dump(actions)
t.Fatalf("unexpected %d actions", len(actions))
}

if got, exp := actions[0], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
}
if got, exp := actions[3], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, "default", "testb")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
}
if got, exp := actions[4], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
}
},
}, {
manifests: []string{
`{
"apiVersion": "test.cvo.io/v1",
"kind": "TestA",
"metadata": {
"namespace": "default",
"name": "testa",
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NoMatch"
}
}
}`,
`{
"apiVersion": "test.cvo.io/v1",
"kind": "TestB",
"metadata": {
"namespace": "default",
"name": "testb",
"annotations": {
"v1.cluster-version-operator.operators.openshift.io/requeue-on-error": "NoMatch"
}
}
}`,
},
reactors: map[action]error{
newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa"): &meta.NoResourceMatchError{},
newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, "default", "testb"): &meta.NoResourceMatchError{},
},
wantErr: true,
check: func(t *testing.T, actions []action) {
if len(actions) != 9 {
spew.Dump(actions)
t.Fatalf("unexpected %d actions", len(actions))
}

if got, exp := actions[0], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
}
if got, exp := actions[3], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestB"}, "default", "testb")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
}
if got, exp := actions[6], (newAction(schema.GroupVersionKind{"test.cvo.io", "v1", "TestA"}, "default", "testa")); !reflect.DeepEqual(got, exp) {
t.Fatalf("expected: %s got: %s", spew.Sdump(exp), spew.Sdump(got))
}
},
}}
for idx, test := range tests {
t.Run(fmt.Sprintf("test#%d", idx), func(t *testing.T) {
Expand All @@ -385,7 +123,7 @@ func Test_SyncWorker_apply(t *testing.T) {
worker.backoff.Steps = 3
worker.builder = NewResourceBuilder(nil)
ctx := context.Background()
worker.apply(ctx, up, &SyncWork{}, &statusWrapper{w: worker, previousStatus: worker.Status()})
worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
test.check(t, r.actions)
})
}
Expand Down Expand Up @@ -549,7 +287,7 @@ func Test_SyncWorker_apply_generic(t *testing.T) {
modifiers: test.modifiers,
}
ctx := context.Background()
err := worker.apply(ctx, up, &SyncWork{}, &statusWrapper{w: worker, previousStatus: worker.Status()})
err := worker.apply(ctx, up, &SyncWork{}, 1, &statusWrapper{w: worker, previousStatus: worker.Status()})
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -613,7 +351,7 @@ func (r *fakeSyncRecorder) StatusCh() <-chan SyncWorkerStatus {
return ch
}

func (r *fakeSyncRecorder) Start(stopCh <-chan struct{}) {}
func (r *fakeSyncRecorder) Start(maxWorkers int, stopCh <-chan struct{}) {}

func (r *fakeSyncRecorder) Update(generation int64, desired configv1.Update, overrides []configv1.ComponentOverride, reconciling bool) *SyncWorkerStatus {
r.Updates = append(r.Updates, desired)
Expand Down
Loading