-
Notifications
You must be signed in to change notification settings - Fork 225
delete ra deployment when sink is not found #1533
Changes from 8 commits
1dbefe9
50d7030
9028264
3b30043
af6b53b
631578b
d0afc27
7e099de
32e7c35
4d0bd75
f0abe98
6347e52
46a84cb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,131 @@ | ||
//+build e2e | ||
|
||
/* | ||
Copyright 2020 The Knative Authors | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package e2e | ||
|
||
import ( | ||
"testing" | ||
|
||
appsv1 "k8s.io/api/apps/v1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/util/sets" | ||
|
||
testlib "knative.dev/eventing/test/lib" | ||
"knative.dev/eventing/test/lib/resources" | ||
"knative.dev/pkg/apis" | ||
pkgTest "knative.dev/pkg/test" | ||
|
||
sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" | ||
"knative.dev/eventing-contrib/test" | ||
"knative.dev/eventing-contrib/test/e2e/helpers" | ||
contribtestlib "knative.dev/eventing-contrib/test/lib" | ||
contribresources "knative.dev/eventing-contrib/test/lib/resources" | ||
) | ||
|
||
const ( | ||
rtKafkaSourceName = "e2e-rt-kafka-source" | ||
rtChannelName = "e2e-rt-channel" | ||
rtKafkaConsumerGroup = "e2e-rt-cg" | ||
rtKafkaTopicName = "e2e-rt-topic" | ||
) | ||
|
||
//TestKafkaSourceReconciler tests various kafka source reconciler statuses | ||
//RT is short for reconciler test | ||
func TestKafkaSourceReconciler(t *testing.T) { | ||
client := testlib.Setup(t, true) | ||
defer testlib.TearDown(client) | ||
|
||
for _, test := range []struct { | ||
name string | ||
action func(c *testlib.Client) | ||
expectedStatuses sets.String | ||
wantRADepCount int | ||
}{{ | ||
"create_kafka_source", | ||
createKafkaSourceWithSinkMissing, | ||
sets.NewString("NotFound"), | ||
0, | ||
}, { | ||
"create_sink", | ||
createChannel, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't get why you're using the kafka channel here 😄, if you need to create a "sink", can you use a simpler one (like the test images to receive events)? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I wanted to use the simplest one like you have mentioned, but if am not wrong, if the sink is "v1/Service", it is not validated to see if its available or not. Reconciles to true always. Next option was to use ksvc, but i dint see any ksvc used as sink in any of the eventing tests nor the knative serving components getting installed. Thats why used a channel. Willing to change to simpler ones if there are other options. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think you need an addressable, so maybe using a kube service with a pod behind the hood is fine? I would love as much as possible to have this "dependency" between the kafka source tests and kafka channel There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but with kube service as i mentioned above , its not reporting the right status on the kafks source. It always reconciles to true whether its there are not. I initially tried with kube service only. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ok so then, more than creating a kafka channel, can you create an imc channel and clearly state you need it for that purpose? Because IMC is part of the eventing "core release", it's installed in the test env for sure. You cannot make the same assumption with kafka channel There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Oh, ok, this is the reason that I wasn't able to reproduce the issue. I was trying with a Kube service There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. used imc initially, the tests failed, then switched to kafka channel. Tried one more time with imc, got the below logs in tests
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first 2 errors (failed to create) are weird, i don't get what's the problem there tbh, double check all the code you use to start the imc. in fact, in testlib.Client there should be a method to create a imc channel (it's from eventing/test/lib), you don't need to develop it by yourself. The last error is clearly an issue of cluster roles, seems like you need some specific cluster role to check if the addressable is ready. @matzew any thoughts on that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. imc was not set up as part of e2e set up, added those in the e2e scripts and test passes. @slinkydeveloper @matzew |
||
sets.NewString(""), | ||
1, | ||
}, { | ||
"delete_sink", | ||
deleteChannel, | ||
sets.NewString("NotFound"), | ||
0, | ||
}, { | ||
"create_sink_after_delete", | ||
createChannel, | ||
sets.NewString(""), | ||
1, | ||
}} { | ||
t.Run(test.name, func(t *testing.T) { | ||
testKafkaSourceReconciler(client, test.name, test.action, test.expectedStatuses, test.wantRADepCount) | ||
}) | ||
} | ||
} | ||
|
||
func testKafkaSourceReconciler(c *testlib.Client, name string, doAction func(c *testlib.Client), expectedStatuses sets.String, wantRADepCount int) { | ||
doAction(c) | ||
|
||
if err := helpers.CheckKafkaSourceState(c, rtKafkaSourceName, func(ks *sourcesv1beta1.KafkaSource) (bool, error) { | ||
ready := ks.Status.GetCondition(apis.ConditionReady) | ||
if ready != nil { | ||
if expectedStatuses.Has(ready.Reason) { | ||
return true, nil | ||
} | ||
} | ||
return false, nil | ||
}); err != nil { | ||
c.T.Fatalf("Failed to validate kafkasource state, expected status : %v, err : %v", expectedStatuses.UnsortedList(), err) | ||
} | ||
|
||
if err := helpers.CheckRADeployment(c, rtKafkaSourceName, func(deps *appsv1.DeploymentList) (bool, error) { | ||
if len(deps.Items) == wantRADepCount { | ||
return true, nil | ||
} | ||
return false, nil | ||
}); err != nil { | ||
c.T.Fatal("Failed to validate adapter deployment state:", err) | ||
} | ||
} | ||
|
||
func createKafkaSourceWithSinkMissing(c *testlib.Client) { | ||
helpers.MustCreateTopic(c, kafkaClusterName, kafkaClusterNamespace, rtKafkaTopicName) | ||
|
||
contribtestlib.CreateKafkaSourceV1Beta1OrFail(c, contribresources.KafkaSourceV1Beta1( | ||
kafkaBootstrapUrl, | ||
rtKafkaTopicName, | ||
pkgTest.CoreV1ObjectReference(test.KafkaChannelKind, resources.MessagingAPIVersion, rtChannelName), | ||
contribresources.WithNameV1Beta1(rtKafkaSourceName), | ||
contribresources.WithConsumerGroupV1Beta1(rtKafkaConsumerGroup), | ||
)) | ||
} | ||
|
||
func createChannel(c *testlib.Client) { | ||
c.CreateChannelOrFail(rtChannelName, &metav1.TypeMeta{ | ||
APIVersion: resources.MessagingAPIVersion, | ||
Kind: test.KafkaChannelKind, | ||
}) | ||
} | ||
|
||
func deleteChannel(c *testlib.Client) { | ||
contribtestlib.DeleteResourceOrFail(c, rtChannelName, helpers.KafkaChannelGVR) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
Copyright 2020 The Knative Authors | ||
|
||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
|
||
http://www.apache.org/licenses/LICENSE-2.0 | ||
|
||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package lib | ||
|
||
import ( | ||
"k8s.io/apimachinery/pkg/runtime/schema" | ||
|
||
testlib "knative.dev/eventing/test/lib" | ||
) | ||
|
||
func DeleteResourceOrFail(c *testlib.Client, name string, gvr schema.GroupVersionResource) { | ||
unstructured := c.Dynamic.Resource(gvr).Namespace(c.Namespace) | ||
if err := unstructured.Delete(name, nil); err != nil { | ||
c.T.Fatalf("Failed to delete the resource %q : %v", name, err) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added KafkaCannelGVR for deleting it