Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Deleting EventTypes when changing sink to a non-Broker kind #420

Merged
merged 26 commits into from
May 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f5a9a40
Adding knative-sources namespace to gcppubsub's ServiceAccount.
nachocano Feb 28, 2019
61a15f4
Merge remote-tracking branch 'upstream/master'
nachocano Feb 28, 2019
993f253
Merge remote-tracking branch 'upstream/master'
nachocano Mar 1, 2019
5d069c1
Merge remote-tracking branch 'upstream/master'
nachocano Mar 1, 2019
dfc8add
Merge remote-tracking branch 'upstream/master'
nachocano Mar 12, 2019
2d410d0
Merge remote-tracking branch 'upstream/master'
nachocano Mar 15, 2019
ed434d6
Merge remote-tracking branch 'upstream/master'
nachocano Mar 25, 2019
4b44736
Merge remote-tracking branch 'upstream/master'
nachocano Mar 26, 2019
9288767
Merge remote-tracking branch 'upstream/master'
nachocano Mar 27, 2019
d171278
Merge remote-tracking branch 'upstream/master'
nachocano Mar 27, 2019
603c57c
Fixing bug introduced during a refactor.
nachocano Mar 27, 2019
9d63463
Merge remote-tracking branch 'upstream/master'
nachocano Mar 29, 2019
23d8f31
Merge remote-tracking branch 'upstream/master'
nachocano Apr 3, 2019
e7666f0
Merge remote-tracking branch 'upstream/master'
nachocano Apr 8, 2019
d6775fa
Merge remote-tracking branch 'upstream/master'
nachocano Apr 17, 2019
0aa5816
Merge remote-tracking branch 'upstream/master'
nachocano Apr 26, 2019
c05b9eb
Merge remote-tracking branch 'upstream/master'
nachocano May 3, 2019
b7960ed
Merge remote-tracking branch 'upstream/master'
nachocano May 6, 2019
1507b81
Merge remote-tracking branch 'upstream/master'
nachocano May 7, 2019
a0ff481
Merge remote-tracking branch 'upstream/master'
nachocano May 7, 2019
9025af9
Merge remote-tracking branch 'upstream/master'
nachocano May 9, 2019
e6f3cf9
Merge remote-tracking branch 'upstream/master'
nachocano May 13, 2019
b6e90a9
Deleting eventTypes for non-Broker sinks
nachocano May 13, 2019
97d596c
more UTs
nachocano May 13, 2019
f7b010b
Merge remote-tracking branch 'upstream/master' into non-broker-sink
nachocano May 13, 2019
cefc49f
fixing test
nachocano May 13, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions contrib/awssqs/pkg/reconciler/awssqssource.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,15 +148,12 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error
}
src.Status.MarkDeployed()

// Only create EventTypes for Broker sinks.
if src.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -233,6 +230,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(src *v1alpha1.AwsSqsSource) *eve
Specs: specs,
Namespace: src.Namespace,
Labels: getLabels(src),
Kind: src.Spec.Sink.Kind,
}
}

Expand Down
14 changes: 6 additions & 8 deletions contrib/gcppubsub/pkg/reconciler/gcppubsubsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,12 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error
}
src.Status.MarkDeployed()

// Only create EventTypes for Broker sinks.
if src.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -323,6 +320,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(src *v1alpha1.GcpPubSubSource) *
Specs: specs,
Namespace: src.Namespace,
Labels: getLabels(src),
Kind: src.Spec.Sink.Kind,
}
}

Expand Down
33 changes: 29 additions & 4 deletions contrib/gcppubsub/pkg/reconciler/gcppubsubsource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingsourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
v1 "k8s.io/api/apps/v1"
"k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -269,7 +269,7 @@ func TestReconcile(t *testing.T) {
getAddressableWithName(transformerAddressableName),
},
WantPresent: []runtime.Object{
getReadySource(),
getReadyAndMarkEventTypeSource(),
},
}, {
Name: "successful create - reuse existing receive adapter",
Expand All @@ -287,7 +287,7 @@ func TestReconcile(t *testing.T) {
},
},
WantPresent: []runtime.Object{
getReadySource(),
getReadyAndMarkEventTypeSource(),
},
}, {
Name: "successful create event types",
Expand All @@ -300,6 +300,20 @@ func TestReconcile(t *testing.T) {
getReadyAndMarkEventTypeSourceWithKind(brokerKind),
getEventType(),
},
}, {
Name: "successful delete event types",
InitialState: []runtime.Object{
getSource(),
getAddressable(),
getAddressableWithName(transformerAddressableName),
getEventTypeForSource("name-1", getSource()),
},
WantPresent: []runtime.Object{
getReadyAndMarkEventTypeSource(),
},
WantAbsent: []runtime.Object{
getEventTypeForSource("name-1", getSource()),
},
}, {
Name: "cannot create event types",
InitialState: []runtime.Object{
Expand Down Expand Up @@ -411,6 +425,10 @@ func getSourceWithKind(kind string) *sourcesv1alpha1.GcpPubSubSource {
}

func getEventType() *eventingv1alpha1.EventType {
return getEventTypeForSource("", getSourceWithKind(brokerKind))
}

func getEventTypeForSource(name string, src *sourcesv1alpha1.GcpPubSubSource) *eventingv1alpha1.EventType {
return &eventingv1alpha1.EventType{
TypeMeta: metav1.TypeMeta{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Expand All @@ -427,9 +445,10 @@ func getEventType() *eventingv1alpha1.EventType {
UID: sourceUID,
},
},
Name: name,
GenerateName: fmt.Sprintf("%s-", sourcesv1alpha1.GcpPubSubSourceEventType),
Namespace: testNS,
Labels: getLabels(getSourceWithKind(brokerKind)),
Labels: getLabels(src),
},
Spec: eventingv1alpha1.EventTypeSpec{
Type: sourcesv1alpha1.GcpPubSubSourceEventType,
Expand Down Expand Up @@ -508,6 +527,12 @@ func getReadySourceWithKind(kind string) *sourcesv1alpha1.GcpPubSubSource {
return src
}

func getReadyAndMarkEventTypeSource() *sourcesv1alpha1.GcpPubSubSource {
src := getReadySource()
src.Status.MarkEventTypes()
return src
}

func getReadyAndMarkEventTypeSourceWithKind(kind string) *sourcesv1alpha1.GcpPubSubSource {
src := getReadySourceWithKind(kind)
src.Status.MarkEventTypes()
Expand Down
14 changes: 6 additions & 8 deletions contrib/kafka/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,12 @@ func (r *reconciler) Reconcile(ctx context.Context, object runtime.Object) error
}
src.Status.MarkDeployed()

// Only create EventTypes for Broker sinks.
if src.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, src)
if err != nil {
logger.Error("Unable to reconcile the event types", zap.Error(err))
return err
}
src.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -199,6 +196,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(src *v1alpha1.KafkaSource) *even
Specs: specs,
Namespace: src.Namespace,
Labels: getLabels(src),
Kind: src.Spec.Sink.Kind,
}
}

Expand Down
79 changes: 70 additions & 9 deletions contrib/kafka/pkg/reconciler/kafkasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
eventingv1alpha1 "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
eventingsourcesv1alpha1 "github.com/knative/eventing/pkg/apis/sources/v1alpha1"
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
v1 "k8s.io/api/apps/v1"
"k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestReconcile(t *testing.T) {
getAddressable(),
},
WantPresent: []runtime.Object{
getReadySource(),
getReadySourceAndMarkEventTypes(),
},
}, {
Name: "successful create - reuse existing receive adapter",
Expand All @@ -159,7 +159,7 @@ func TestReconcile(t *testing.T) {
},
},
WantPresent: []runtime.Object{
getReadySource(),
getReadySourceAndMarkEventTypes(),
},
}, {
Name: "successful create event types",
Expand All @@ -185,8 +185,59 @@ func TestReconcile(t *testing.T) {
},
WantPresent: []runtime.Object{
getReadyAndMarkEventTypesSourceWithKind(brokerKind),
getEventType("name1", "group", "topic1"),
getEventType("name2", "group", "topic2"),
getEventType("name1", "topic1"),
getEventType("name2", "topic2"),
},
}, {
Name: "successful create missing event types",
InitialState: []runtime.Object{
getSourceWithKind(brokerKind),
getAddressableWithKind(brokerKind),
getEventType("name2", "topic2"),
getEventType("name3", "whatever_topic"),
},
Mocks: controllertesting.Mocks{
MockCreates: []controllertesting.MockCreate{
func(_ client.Client, _ context.Context, obj runtime.Object) (controllertesting.MockHandled, error) {
if eventType, ok := obj.(*eventingv1alpha1.EventType); ok {
// Hack because the fakeClient does not support GenerateName.
if strings.Contains(eventType.Spec.Source, "topic1") {
eventType.Name = "name1"
}
return controllertesting.Unhandled, nil
}
return controllertesting.Unhandled, nil
},
},
},
WantPresent: []runtime.Object{
getReadyAndMarkEventTypesSourceWithKind(brokerKind),
getEventType("name1", "topic1"),
getEventType("name2", "topic2"),
},
WantAbsent: []runtime.Object{
getEventType("name3", "whatever_topic"),
},
}, {
Name: "successful delete event type",
InitialState: []runtime.Object{
getSource(),
getAddressable(),
getReceiveAdapter(),
getEventType("name1", "topic1"),
},
Mocks: controllertesting.Mocks{
MockCreates: []controllertesting.MockCreate{
func(_ client.Client, _ context.Context, _ runtime.Object) (controllertesting.MockHandled, error) {
return controllertesting.Handled, errors.New("an error that won't be seen because create is not called")
},
},
},
WantPresent: []runtime.Object{
getReadySourceAndMarkEventTypes(),
},
WantAbsent: []runtime.Object{
getEventType("name1", "topic1"),
},
}, {
Name: "cannot create event types",
Expand All @@ -205,8 +256,8 @@ func TestReconcile(t *testing.T) {
},
},
WantAbsent: []runtime.Object{
getEventType("name1", "group", "topic1"),
getEventType("name2", "group", "topic2"),
getEventType("name1", "topic1"),
getEventType("name2", "topic2"),
},
WantPresent: []runtime.Object{
getSourceWithSinkAndDeployedAndKind(brokerKind),
Expand Down Expand Up @@ -288,7 +339,7 @@ func getSourceWithKind(kind string) *sourcesv1alpha1.KafkaSource {
return obj
}

func getEventType(name, group, topic string) *eventingv1alpha1.EventType {
func getEventTypeForSource(name, topic string, source *sourcesv1alpha1.KafkaSource) *eventingv1alpha1.EventType {
return &eventingv1alpha1.EventType{
TypeMeta: metav1.TypeMeta{
APIVersion: eventingv1alpha1.SchemeGroupVersion.String(),
Expand All @@ -308,7 +359,7 @@ func getEventType(name, group, topic string) *eventingv1alpha1.EventType {
Name: name,
GenerateName: fmt.Sprintf("%s-", sourcesv1alpha1.KafkaEventType),
Namespace: testNS,
Labels: getLabels(getSourceWithKind(brokerKind)),
Labels: getLabels(source),
},
Spec: eventingv1alpha1.EventTypeSpec{
Type: sourcesv1alpha1.KafkaEventType,
Expand All @@ -318,6 +369,10 @@ func getEventType(name, group, topic string) *eventingv1alpha1.EventType {
}
}

func getEventType(name, topic string) *eventingv1alpha1.EventType {
return getEventTypeForSource(name, topic, getSourceWithKind(brokerKind))
}

func getSourceWithNoSink() *sourcesv1alpha1.KafkaSource {
src := getSource()
src.Status.InitializeConditions()
Expand Down Expand Up @@ -352,6 +407,12 @@ func getReadySourceWithKind(kind string) *sourcesv1alpha1.KafkaSource {
return src
}

func getReadySourceAndMarkEventTypes() *sourcesv1alpha1.KafkaSource {
src := getReadySource()
src.Status.MarkEventTypes()
return src
}

func getReadyAndMarkEventTypesSourceWithKind(kind string) *sourcesv1alpha1.KafkaSource {
src := getReadySourceWithKind(kind)
src.Status.MarkEventTypes()
Expand Down
9 changes: 9 additions & 0 deletions pkg/reconciler/eventtype/eventtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type ReconcilerArgs struct {
Specs []eventingv1alpha1.EventTypeSpec
Namespace string
Labels map[string]string
Kind string
}

// Reconcile reconciles the EventTypes taken from 'args', and sets 'owner' as the controller.
Expand Down Expand Up @@ -100,6 +101,14 @@ func (r *Reconciler) getEventTypes(ctx context.Context, namespace string, lbs ma
// makeEventTypes creates the in-memory representation of the EventTypes.
func (r *Reconciler) makeEventTypes(args *ReconcilerArgs, owner metav1.Object) ([]eventingv1alpha1.EventType, error) {
eventTypes := make([]eventingv1alpha1.EventType, 0)

// Only create EventTypes for Broker sinks.
// We add this check here in case the Source was changed from a Broker to non-Broker sink.
// If so, we need to delete the existing EventTypes, thus we return empty expected.
if args.Kind != "Broker" {
return eventTypes, nil
}

for _, spec := range args.Specs {
eventType := resources.MakeEventType(spec, args.Namespace, args.Labels)
// Setting the reference to delete the EventType upon uninstalling the source.
Expand Down
12 changes: 5 additions & 7 deletions pkg/reconciler/githubsource/githubsource.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,14 +195,11 @@ func (r *reconciler) reconcile(ctx context.Context, source *sourcesv1alpha1.GitH
return nil
}

// Only create EventTypes for Broker sinks.
if source.Spec.Sink.Kind == "Broker" {
err = r.reconcileEventTypes(ctx, source)
if err != nil {
return err
}
source.Status.MarkEventTypes()
err = r.reconcileEventTypes(ctx, source)
if err != nil {
return err
}
source.Status.MarkEventTypes()

return nil
}
Expand Down Expand Up @@ -366,6 +363,7 @@ func (r *reconciler) newEventTypeReconcilerArgs(source *sourcesv1alpha1.GitHubSo
Specs: specs,
Namespace: source.Namespace,
Labels: resources.Labels(source.Name),
Kind: source.Spec.Sink.Kind,
}
}

Expand Down
Loading