diff --git a/kafka/source/README.md b/kafka/source/README.md index b2a0ba3c58..8e34f71a3a 100644 --- a/kafka/source/README.md +++ b/kafka/source/README.md @@ -41,9 +41,10 @@ event sink. bootstrapServers: REPLACE_WITH_CLUSTER_URL topics: knative-demo-topic sink: - apiVersion: serving.knative.dev/v1alpha1 - kind: Service - name: event-display + ref: + apiVersion: serving.knative.dev/v1alpha1 + kind: Service + name: event-display ``` ## Example diff --git a/kafka/source/config/300-kafkasource.yaml b/kafka/source/config/300-kafkasource.yaml index 324f5ef34f..01a7bc6962 100644 --- a/kafka/source/config/300-kafkasource.yaml +++ b/kafka/source/config/300-kafkasource.yaml @@ -126,7 +126,30 @@ spec: serviceAccountName: type: string sink: - type: object + anyOf: + - type: object + description: "the destination that should receive events." + properties: + ref: + type: object + description: "a reference to a Kubernetes object from which to retrieve the target URI." + required: + - apiVersion + - kind + - name + properties: + apiVersion: + type: string + minLength: 1 + kind: + type: string + minLength: 1 + name: + type: string + minLength: 1 + uri: + type: string + description: "the target URI. If ref is provided, this must be relative URI reference." type: object required: - bootstrapServers diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle.go index 658a35aa7f..1178307917 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle.go @@ -18,6 +18,7 @@ package v1alpha1 import ( appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "knative.dev/eventing/pkg/apis/duck" "knative.dev/pkg/apis" ) @@ -68,6 +69,22 @@ func (s *KafkaSourceStatus) MarkSink(uri string) { } } +// MarkSinkWarnDeprecated sets the condition that the source has a sink configured and warns ref is deprecated. +func (s *KafkaSourceStatus) MarkSinkWarnRefDeprecated(uri string) { + s.SinkURI = uri + if len(uri) > 0 { + c := apis.Condition{ + Type: KafkaConditionSinkProvided, + Status: corev1.ConditionTrue, + Severity: apis.ConditionSeverityError, + Message: "Using deprecated object ref fields when specifying spec.sink. Update to spec.sink.ref. These will be removed in 0.11.", + } + KafkaSourceCondSet.Manage(s).SetCondition(c) + } else { + KafkaSourceCondSet.Manage(s).MarkUnknown(KafkaConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "") + } +} + // MarkNoSink sets the condition that the source does not have a sink configured. func (s *KafkaSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) { KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionSinkProvided, reason, messageFormat, messageA...) diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle_test.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle_test.go index ea679a60ca..eaf6b5b0af 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle_test.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle_test.go @@ -39,6 +39,10 @@ var ( }, }, } + condReady = apis.Condition{ + Type: KafkaConditionReady, + Status: corev1.ConditionTrue, + } ) // Check that KafkaSource implements the Conditions duck type. diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go index 79c3f63406..98b7e03e87 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go @@ -25,6 +25,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" "knative.dev/pkg/kmeta" ) @@ -119,7 +120,7 @@ type KafkaSourceSpec struct { // Sink is a reference to an object that will resolve to a domain name to use as the sink. // +optional - Sink *corev1.ObjectReference `json:"sink,omitempty"` + Sink *apisv1alpha1.Destination `json:"sink,omitempty"` // ServiceAccoutName is the name of the ServiceAccount that will be used to run the Receive // Adapter Deployment. diff --git a/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go b/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go index aba30c5607..db713fcd7a 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go @@ -21,6 +21,7 @@ import ( "testing" corev1 "k8s.io/api/core/v1" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" ) var ( @@ -28,11 +29,13 @@ var ( BootstrapServers: "servers", Topics: "topics", ConsumerGroup: "group", - Sink: &corev1.ObjectReference{ - APIVersion: "foo", - Kind: "bar", - Namespace: "baz", - Name: "qux", + Sink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + APIVersion: "foo", + Kind: "bar", + Namespace: "baz", + Name: "qux", + }, }, ServiceAccountName: "service-account-name", } @@ -70,11 +73,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) { orig: &fullSpec, updated: KafkaSourceSpec{ Topics: fullSpec.Topics, - Sink: &corev1.ObjectReference{ - APIVersion: "some-other-api-version", - Kind: fullSpec.Sink.Kind, - Namespace: fullSpec.Sink.Namespace, - Name: fullSpec.Sink.Name, + Sink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + APIVersion: "some-other-api-version", + Kind: fullSpec.Sink.Ref.APIVersion, + Namespace: fullSpec.Sink.Ref.Namespace, + Name: fullSpec.Sink.Ref.Name, + }, }, ServiceAccountName: fullSpec.ServiceAccountName, }, @@ -84,11 +89,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) { orig: &fullSpec, updated: KafkaSourceSpec{ Topics: fullSpec.Topics, - Sink: &corev1.ObjectReference{ - APIVersion: fullSpec.Sink.APIVersion, - Kind: "some-other-kind", - Namespace: fullSpec.Sink.Namespace, - Name: fullSpec.Sink.Name, + Sink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + APIVersion: fullSpec.Sink.Ref.APIVersion, + Kind: "some-other-kind", + Namespace: fullSpec.Sink.Ref.Namespace, + Name: fullSpec.Sink.Ref.Name, + }, }, ServiceAccountName: fullSpec.ServiceAccountName, }, @@ -98,11 +105,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) { orig: &fullSpec, updated: KafkaSourceSpec{ Topics: fullSpec.Topics, - Sink: &corev1.ObjectReference{ - APIVersion: fullSpec.Sink.APIVersion, - Kind: fullSpec.Sink.Kind, - Namespace: "some-other-namespace", - Name: fullSpec.Sink.Name, + Sink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + APIVersion: fullSpec.Sink.Ref.APIVersion, + Kind: fullSpec.Sink.Ref.Kind, + Namespace: "some-other-namespace", + Name: fullSpec.Sink.Ref.Name, + }, }, ServiceAccountName: fullSpec.ServiceAccountName, }, @@ -112,11 +121,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) { orig: &fullSpec, updated: KafkaSourceSpec{ Topics: fullSpec.Topics, - Sink: &corev1.ObjectReference{ - APIVersion: fullSpec.Sink.APIVersion, - Kind: fullSpec.Sink.Kind, - Namespace: fullSpec.Sink.Namespace, - Name: "some-other-name", + Sink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + APIVersion: fullSpec.Sink.Ref.APIVersion, + Kind: fullSpec.Sink.Ref.Kind, + Namespace: fullSpec.Sink.Ref.Namespace, + Name: "some-other-name", + }, }, ServiceAccountName: fullSpec.ServiceAccountName, }, @@ -126,11 +137,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) { orig: &fullSpec, updated: KafkaSourceSpec{ Topics: fullSpec.Topics, - Sink: &corev1.ObjectReference{ - APIVersion: fullSpec.Sink.APIVersion, - Kind: fullSpec.Sink.Kind, - Namespace: fullSpec.Sink.Namespace, - Name: "some-other-name", + Sink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + APIVersion: fullSpec.Sink.Ref.APIVersion, + Kind: fullSpec.Sink.Ref.Kind, + Namespace: fullSpec.Sink.Ref.Namespace, + Name: "some-other-name", + }, }, ServiceAccountName: fullSpec.ServiceAccountName, }, diff --git a/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go b/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go index 1416958422..a24c4f5240 100644 --- a/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go +++ b/kafka/source/pkg/apis/sources/v1alpha1/zz_generated.deepcopy.go @@ -23,6 +23,7 @@ package v1alpha1 import ( v1 "k8s.io/api/core/v1" runtime "k8s.io/apimachinery/pkg/runtime" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -178,8 +179,8 @@ func (in *KafkaSourceSpec) DeepCopyInto(out *KafkaSourceSpec) { in.Net.DeepCopyInto(&out.Net) if in.Sink != nil { in, out := &in.Sink, &out.Sink - *out = new(v1.ObjectReference) - **out = **in + *out = new(apisv1alpha1.Destination) + (*in).DeepCopyInto(*out) } out.Resources = in.Resources return diff --git a/kafka/source/pkg/reconciler/controller.go b/kafka/source/pkg/reconciler/controller.go index 9cc444eec9..ffd3c392c9 100644 --- a/kafka/source/pkg/reconciler/controller.go +++ b/kafka/source/pkg/reconciler/controller.go @@ -27,12 +27,12 @@ import ( kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource" "knative.dev/eventing/pkg/apis/sources/v1alpha1" eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype" - "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler" deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" + "knative.dev/pkg/resolver" ) const ( @@ -65,8 +65,7 @@ func NewController( } impl := controller.NewImpl(c, c.Logger, "KafkaSource") - - c.sinkReconciler = duck.NewSinkReconciler(ctx, impl.EnqueueKey) + c.sinkResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey) c.Logger.Info("Setting up kafka event handlers") diff --git a/kafka/source/pkg/reconciler/kafkasource.go b/kafka/source/pkg/reconciler/kafkasource.go index 00aaf48f63..2d5515381d 100644 --- a/kafka/source/pkg/reconciler/kafkasource.go +++ b/kafka/source/pkg/reconciler/kafkasource.go @@ -48,8 +48,8 @@ import ( clientset "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned" listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1alpha1" eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" - "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler" + "knative.dev/pkg/resolver" "knative.dev/pkg/controller" pkgLogging "knative.dev/pkg/logging" @@ -75,7 +75,7 @@ type Reconciler struct { receiveAdapterImage string eventTypeLister eventinglisters.EventTypeLister kafkaLister listers.KafkaSourceLister - sinkReconciler *duck.SinkReconciler + sinkResolver *resolver.URIResolver deploymentLister appsv1listers.DeploymentLister loggingContext context.Context loggingConfig *pkgLogging.Config @@ -130,18 +130,38 @@ func (r *Reconciler) reconcile(ctx context.Context, src *v1alpha1.KafkaSource) e return errors.New("spec.sink missing") } - sinkObjRef := src.Spec.Sink - if sinkObjRef.Namespace == "" { - sinkObjRef.Namespace = src.Namespace + if src.Spec.Sink == nil { + src.Status.MarkNoSink("SinkMissing", "") + return fmt.Errorf("spec.sink missing") + } + + dest := src.Spec.Sink.DeepCopy() + if dest.Ref != nil { + // To call URIFromDestination(), dest.Ref must have a Namespace. If there is + // no Namespace defined in dest.Ref, we will use the Namespace of the source + // as the Namespace of dest.Ref. + if dest.Ref.Namespace == "" { + //TODO how does this work with deprecated fields + dest.Ref.Namespace = src.GetNamespace() + } + } else if dest.DeprecatedName != "" && dest.DeprecatedNamespace == "" { + // If Ref is nil and the deprecated ref is present, we need to check for + // DeprecatedNamespace. This can be removed when DeprecatedNamespace is + // removed. + dest.DeprecatedNamespace = src.GetNamespace() } - kafkaDesc := fmt.Sprintf("%s/%s,%s", src.Namespace, src.Name, src.GroupVersionKind().String()) - sinkURI, err := r.sinkReconciler.GetSinkURI(sinkObjRef, src, kafkaDesc) + sinkURI, err := r.sinkResolver.URIFromDestination(*dest, src) if err != nil { src.Status.MarkNoSink("NotFound", "") return fmt.Errorf("getting sink URI: %v", err) } - - src.Status.MarkSink(sinkURI) + if src.Spec.Sink.DeprecatedAPIVersion != "" && + src.Spec.Sink.DeprecatedKind != "" && + src.Spec.Sink.DeprecatedName != "" { + src.Status.MarkSinkWarnRefDeprecated(sinkURI) + } else { + src.Status.MarkSink(sinkURI) + } ra, err := r.createReceiveAdapter(ctx, src, sinkURI) if err != nil { @@ -391,7 +411,7 @@ func (r *Reconciler) makeEventTypes(src *v1alpha1.KafkaSource) ([]eventingv1alph // Only create EventTypes for Broker sinks. // We add this check here in case the KafkaSource was changed from Broker to non-Broker sink. // If so, we need to delete the existing ones, thus we return empty expected. - if src.Spec.Sink.Kind != "Broker" { + if ref := src.Spec.Sink.GetRef(); ref == nil || ref.Kind != "Broker" { return eventTypes, nil } topics := strings.Split(src.Spec.Topics, ",") diff --git a/kafka/source/pkg/reconciler/resources/eventtype.go b/kafka/source/pkg/reconciler/resources/eventtype.go index 6b6c1d42c0..59fc3f9fb0 100644 --- a/kafka/source/pkg/reconciler/resources/eventtype.go +++ b/kafka/source/pkg/reconciler/resources/eventtype.go @@ -50,7 +50,7 @@ func MakeEventType(args *EventTypeArgs) eventingv1alpha1.EventType { Spec: eventingv1alpha1.EventTypeSpec{ Type: args.Type, Source: args.Source, - Broker: args.Src.Spec.Sink.Name, + Broker: args.Src.Spec.Sink.GetRef().Name, }, } } diff --git a/kafka/source/pkg/reconciler/resources/eventtype_test.go b/kafka/source/pkg/reconciler/resources/eventtype_test.go index b3b88d6e16..c58d197435 100644 --- a/kafka/source/pkg/reconciler/resources/eventtype_test.go +++ b/kafka/source/pkg/reconciler/resources/eventtype_test.go @@ -28,6 +28,7 @@ import ( "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1" "knative.dev/eventing/pkg/utils" + apisv1alpha1 "knative.dev/pkg/apis/v1alpha1" ) func TestMakeEventType(t *testing.T) { @@ -42,10 +43,12 @@ func TestMakeEventType(t *testing.T) { Topics: "topic1,topic2", BootstrapServers: "server1,server2", ConsumerGroup: "group", - Sink: &corev1.ObjectReference{ - Name: "test-sink", - Kind: "Sink", - APIVersion: "duck.knative.dev/v1", + Sink: &apisv1alpha1.Destination{ + Ref: &corev1.ObjectReference{ + Name: "test-sink", + Kind: "Sink", + APIVersion: "duck.knative.dev/v1", + }, }, }, },