From f1f0b1e9832da92624775ba055df26074f866343 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Mon, 27 Jul 2020 11:58:58 +0300 Subject: [PATCH] Kafkasource v1beta1 reconciler (#1405) * Search & replace v1alpha1 of Kafka source and binding to v1beta1 - doesn't compile yet * Use v1beta1 of KafkaSource and KafkaBinding in reconciler --- kafka/source/pkg/adapter/adapter_test.go | 30 ++++----- kafka/source/pkg/adapter/message.go | 6 +- .../apis/sources/v1beta1/kafka_lifecycle.go | 14 ++-- .../pkg/reconciler/binding/controller.go | 6 +- .../pkg/reconciler/source/controller.go | 4 +- .../pkg/reconciler/source/kafkasource.go | 64 +++++-------------- .../source/resources/receive_adapter.go | 43 ++----------- .../source/resources/receive_adapter_test.go | 61 +++++++----------- 8 files changed, 76 insertions(+), 152 deletions(-) diff --git a/kafka/source/pkg/adapter/adapter_test.go b/kafka/source/pkg/adapter/adapter_test.go index 8ab3ed0c15..c5a1d4115d 100644 --- a/kafka/source/pkg/adapter/adapter_test.go +++ b/kafka/source/pkg/adapter/adapter_test.go @@ -35,7 +35,7 @@ import ( "knative.dev/eventing/pkg/kncloudevents" - sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" + sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" ) func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { @@ -63,8 +63,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { "ce-specversion": "1.0", "ce-id": makeEventId(1, 2), "ce-time": types.FormatTime(aTimestamp), - "ce-type": sourcesv1alpha1.KafkaEventType, - "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"), + "ce-type": sourcesv1beta1.KafkaEventType, + "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"), "ce-subject": makeEventSubject(1, 2), "ce-key": "key", }, @@ -85,8 +85,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { "ce-specversion": "1.0", "ce-id": makeEventId(1, 2), "ce-time": types.FormatTime(aTimestamp), - "ce-type": sourcesv1alpha1.KafkaEventType, - "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"), + "ce-type": sourcesv1beta1.KafkaEventType, + "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"), "ce-subject": makeEventSubject(1, 2), "ce-key": "-16771305", }, @@ -108,8 +108,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { "ce-specversion": "1.0", "ce-id": makeEventId(1, 2), "ce-time": types.FormatTime(aTimestamp), - "ce-type": sourcesv1alpha1.KafkaEventType, - "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"), + "ce-type": sourcesv1beta1.KafkaEventType, + "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"), "ce-subject": makeEventSubject(1, 2), "ce-key": "0.00000000000000000000000000000000000002536316309005082", }, @@ -131,8 +131,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { "ce-specversion": "1.0", "ce-id": makeEventId(1, 2), "ce-time": types.FormatTime(aTimestamp), - "ce-type": sourcesv1alpha1.KafkaEventType, - "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"), + "ce-type": sourcesv1beta1.KafkaEventType, + "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"), "ce-subject": makeEventSubject(1, 2), "ce-key": "AQoXFw==", }, @@ -162,8 +162,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { "ce-specversion": "1.0", "ce-id": makeEventId(1, 2), "ce-time": types.FormatTime(aTimestamp), - "ce-type": sourcesv1alpha1.KafkaEventType, - "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"), + "ce-type": sourcesv1beta1.KafkaEventType, + "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"), "ce-subject": makeEventSubject(1, 2), "ce-key": "key", "ce-kafkaheaderhello": "world", @@ -194,8 +194,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { "ce-specversion": "1.0", "ce-id": makeEventId(1, 2), "ce-time": types.FormatTime(aTimestamp), - "ce-type": sourcesv1alpha1.KafkaEventType, - "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"), + "ce-type": sourcesv1beta1.KafkaEventType, + "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"), "ce-subject": makeEventSubject(1, 2), "ce-key": "key", "ce-kafkaheaderhellobla": "world", @@ -306,8 +306,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) { "ce-specversion": "1.0", "ce-id": makeEventId(1, 2), "ce-time": types.FormatTime(aTimestamp), - "ce-type": sourcesv1alpha1.KafkaEventType, - "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"), + "ce-type": sourcesv1beta1.KafkaEventType, + "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"), "ce-subject": makeEventSubject(1, 2), "ce-key": "key", }, diff --git a/kafka/source/pkg/adapter/message.go b/kafka/source/pkg/adapter/message.go index 2eeb0f234a..f665e2e8c2 100644 --- a/kafka/source/pkg/adapter/message.go +++ b/kafka/source/pkg/adapter/message.go @@ -33,7 +33,7 @@ import ( "go.opencensus.io/trace" "go.uber.org/zap" - sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" + sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" ) func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace.Span, cm *sarama.ConsumerMessage, req *nethttp.Request, logger *zap.Logger) error { @@ -61,8 +61,8 @@ func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace. event.SetID(makeEventId(cm.Partition, cm.Offset)) event.SetTime(cm.Timestamp) - event.SetType(sourcesv1alpha1.KafkaEventType) - event.SetSource(sourcesv1alpha1.KafkaEventSource(a.config.Namespace, a.config.Name, cm.Topic)) + event.SetType(sourcesv1beta1.KafkaEventType) + event.SetSource(sourcesv1beta1.KafkaEventSource(a.config.Namespace, a.config.Name, cm.Topic)) event.SetSubject(makeEventSubject(cm.Partition, cm.Offset)) dumpKafkaMetaToEvent(&event, a.keyTypeMapper, cm.Key, kafkaMsg) diff --git a/kafka/source/pkg/apis/sources/v1beta1/kafka_lifecycle.go b/kafka/source/pkg/apis/sources/v1beta1/kafka_lifecycle.go index ea1109617a..f5d6e1494c 100644 --- a/kafka/source/pkg/apis/sources/v1beta1/kafka_lifecycle.go +++ b/kafka/source/pkg/apis/sources/v1beta1/kafka_lifecycle.go @@ -32,9 +32,9 @@ const ( // KafkaConditionDeployed has status True when the KafkaSource has had it's receive adapter deployment created. KafkaConditionDeployed apis.ConditionType = "Deployed" - // KafkaConditionResources is True when the resources listed for the KafkaSource have been properly - // parsed and match specified syntax for resource quantities - KafkaConditionResources apis.ConditionType = "ResourcesCorrect" + // KafkaConditionKeyType is True when the KafkaSource has been configured with valid key type for + // the key deserializer. + KafkaConditionKeyType apis.ConditionType = "KeyTypeCorrect" ) var KafkaSourceCondSet = apis.NewLivingConditionSet( @@ -106,10 +106,10 @@ func (s *KafkaSourceStatus) MarkNotDeployed(reason, messageFormat string, messag KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionDeployed, reason, messageFormat, messageA...) } -func (s *KafkaSourceStatus) MarkResourcesCorrect() { - KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionResources) +func (s *KafkaSourceStatus) MarkKeyTypeCorrect() { + KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionKeyType) } -func (s *KafkaSourceStatus) MarkResourcesIncorrect(reason, messageFormat string, messageA ...interface{}) { - KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionResources, reason, messageFormat, messageA...) +func (s *KafkaSourceStatus) MarkKeyTypeIncorrect(reason, messageFormat string, messageA ...interface{}) { + KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionKeyType, reason, messageFormat, messageA...) } diff --git a/kafka/source/pkg/reconciler/binding/controller.go b/kafka/source/pkg/reconciler/binding/controller.go index 9bfa3db1d4..931f381b7d 100644 --- a/kafka/source/pkg/reconciler/binding/controller.go +++ b/kafka/source/pkg/reconciler/binding/controller.go @@ -19,7 +19,7 @@ package binding import ( "context" - kfkinformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/bindings/v1alpha1/kafkabinding" + kfkinformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/bindings/v1beta1/kafkabinding" "knative.dev/pkg/client/injection/ducks/duck/v1/podspecable" "knative.dev/pkg/client/injection/kube/informers/core/v1/namespace" "knative.dev/pkg/reconciler" @@ -30,7 +30,7 @@ import ( "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1" "knative.dev/pkg/apis/duck" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" @@ -72,7 +72,7 @@ func NewController( return nil }, }, - GVR: v1alpha1.SchemeGroupVersion.WithResource("kafkabindings"), + GVR: v1beta1.SchemeGroupVersion.WithResource("kafkabindings"), Get: func(namespace string, name string) (psbinding.Bindable, error) { return kfkInformer.Lister().KafkaBindings(namespace).Get(name) }, diff --git a/kafka/source/pkg/reconciler/source/controller.go b/kafka/source/pkg/reconciler/source/controller.go index 7d0e1b5a68..74f8c9c039 100644 --- a/kafka/source/pkg/reconciler/source/controller.go +++ b/kafka/source/pkg/reconciler/source/controller.go @@ -33,8 +33,8 @@ import ( "knative.dev/pkg/resolver" kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client" - kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource" - "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource" + kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1beta1/kafkasource" + "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1beta1/kafkasource" ) func NewController( diff --git a/kafka/source/pkg/reconciler/source/kafkasource.go b/kafka/source/pkg/reconciler/source/kafkasource.go index 78c5067de9..6771cb3029 100644 --- a/kafka/source/pkg/reconciler/source/kafkasource.go +++ b/kafka/source/pkg/reconciler/source/kafkasource.go @@ -27,13 +27,12 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/eventing/pkg/reconciler/source" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" - "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" "knative.dev/eventing-contrib/kafka/source/pkg/reconciler/source/resources" "k8s.io/client-go/kubernetes" @@ -43,8 +42,8 @@ import ( "knative.dev/pkg/resolver" "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned" - reconcilerkafkasource "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource" - listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1alpha1" + reconcilerkafkasource "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1beta1/kafkasource" + listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1beta1" ) const ( @@ -93,10 +92,10 @@ type Reconciler struct { // Check that our Reconciler implements Interface var _ reconcilerkafkasource.Interface = (*Reconciler)(nil) -func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSource) pkgreconciler.Event { +func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource) pkgreconciler.Event { src.Status.InitializeConditions() - if src.Spec.Sink == nil { + if (src.Spec.Sink == duckv1.Destination{}) { src.Status.MarkNoSink("SinkMissing", "") return fmt.Errorf("spec.sink missing") } @@ -117,17 +116,19 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc } src.Status.MarkSink(sinkURI) - if val, ok := src.GetLabels()[v1alpha1.KafkaKeyTypeLabel]; ok { + if val, ok := src.GetLabels()[v1beta1.KafkaKeyTypeLabel]; ok { found := false - for _, allowed := range v1alpha1.KafkaKeyTypeAllowed { + for _, allowed := range v1beta1.KafkaKeyTypeAllowed { if allowed == val { found = true } } if !found { - src.Status.MarkResourcesIncorrect("IncorrectKafkaKeyTypeLabel", "Invalid value for %s: %s. Allowed: %v", v1alpha1.KafkaKeyTypeLabel, val, v1alpha1.KafkaKeyTypeAllowed) - logging.FromContext(ctx).Errorf("Invalid value for %s: %s. Allowed: %v", v1alpha1.KafkaKeyTypeLabel, val, v1alpha1.KafkaKeyTypeAllowed) + src.Status.MarkKeyTypeIncorrect("IncorrectKafkaKeyTypeLabel", "Invalid value for %s: %s. Allowed: %v", v1beta1.KafkaKeyTypeLabel, val, v1beta1.KafkaKeyTypeAllowed) + logging.FromContext(ctx).Errorf("Invalid value for %s: %s. Allowed: %v", v1beta1.KafkaKeyTypeLabel, val, v1beta1.KafkaKeyTypeAllowed) return errors.New("IncorrectKafkaKeyTypeLabel") + } else { + src.Status.MarkKeyTypeCorrect() } } @@ -151,40 +152,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc return nil } -func checkResourcesStatus(src *v1alpha1.KafkaSource) error { - for _, rsrc := range []struct { - key string - field string - }{{ - key: "Request.CPU", - field: src.Spec.Resources.Requests.ResourceCPU, - }, { - key: "Request.Memory", - field: src.Spec.Resources.Requests.ResourceMemory, - }, { - key: "Limit.CPU", - field: src.Spec.Resources.Limits.ResourceCPU, - }, { - key: "Limit.Memory", - field: src.Spec.Resources.Limits.ResourceMemory, - }} { - // In the event the field isn't specified, we assign a default in the receive_adapter - if rsrc.field != "" { - if _, err := resource.ParseQuantity(rsrc.field); err != nil { - src.Status.MarkResourcesIncorrect("Incorrect Resource", "%s: %s, Error: %s", rsrc.key, rsrc.field, err) - return err - } - } - } - src.Status.MarkResourcesCorrect() - return nil -} - -func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.KafkaSource, sinkURI *apis.URL) (*appsv1.Deployment, error) { - if err := checkResourcesStatus(src); err != nil { - return nil, err - } - +func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1beta1.KafkaSource, sinkURI *apis.URL) (*appsv1.Deployment, error) { raArgs := resources.ReceiveAdapterArgs{ Image: r.receiveAdapterImage, Source: src, @@ -195,7 +163,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Kaf expected := resources.MakeReceiveAdapter(&raArgs) ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{}) - if apierrors.IsNotFound(err) { + if err != nil && apierrors.IsNotFound(err) { ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected) if err != nil { return nil, newDeploymentFailed(ra.Namespace, ra.Name, err) @@ -233,14 +201,14 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool { return false } -func (r *Reconciler) createCloudEventAttributes(src *v1alpha1.KafkaSource) []duckv1.CloudEventAttributes { +func (r *Reconciler) createCloudEventAttributes(src *v1beta1.KafkaSource) []duckv1.CloudEventAttributes { ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(src.Spec.Topics)) for i := range src.Spec.Topics { topics := strings.Split(src.Spec.Topics[i], ",") for _, topic := range topics { ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{ - Type: v1alpha1.KafkaEventType, - Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic), + Type: v1beta1.KafkaEventType, + Source: v1beta1.KafkaEventSource(src.Namespace, src.Name, topic), }) } } diff --git a/kafka/source/pkg/reconciler/source/resources/receive_adapter.go b/kafka/source/pkg/reconciler/source/resources/receive_adapter.go index 43d8ac30c6..d68abc3238 100644 --- a/kafka/source/pkg/reconciler/source/resources/receive_adapter.go +++ b/kafka/source/pkg/reconciler/source/resources/receive_adapter.go @@ -23,16 +23,15 @@ import ( v1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" "knative.dev/eventing/pkg/utils" "knative.dev/pkg/kmeta" ) type ReceiveAdapterArgs struct { Image string - Source *v1alpha1.KafkaSource + Source *v1beta1.KafkaSource Labels map[string]string SinkURI string AdditionalEnvs []corev1.EnvVar @@ -67,7 +66,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Value: args.Source.Namespace, }}, args.AdditionalEnvs...) - if val, ok := args.Source.GetLabels()[v1alpha1.KafkaKeyTypeLabel]; ok { + if val, ok := args.Source.GetLabels()[v1beta1.KafkaKeyTypeLabel]; ok { env = append(env, corev1.EnvVar{ Name: "KEY_TYPE", Value: val, @@ -80,34 +79,6 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_KEY", args.Source.Spec.Net.TLS.Key.SecretKeyRef) env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_CA_CERT", args.Source.Spec.Net.TLS.CACert.SecretKeyRef) - RequestResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceCPU) - if err != nil { - RequestResourceCPU = resource.MustParse("250m") - } - RequestResourceMemory, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceMemory) - if err != nil { - RequestResourceMemory = resource.MustParse("512Mi") - } - LimitResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Limits.ResourceCPU) - if err != nil { - LimitResourceCPU = resource.MustParse("250m") - } - LimitResourceMemory, err := resource.ParseQuantity(args.Source.Spec.Resources.Limits.ResourceMemory) - if err != nil { - LimitResourceMemory = resource.MustParse("512Mi") - } - - res := corev1.ResourceRequirements{ - Limits: corev1.ResourceList{ - corev1.ResourceCPU: RequestResourceCPU, - corev1.ResourceMemory: RequestResourceMemory, - }, - Requests: corev1.ResourceList{ - corev1.ResourceCPU: LimitResourceCPU, - corev1.ResourceMemory: LimitResourceMemory, - }, - } - return &v1.Deployment{ ObjectMeta: metav1.ObjectMeta{ Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("kafkasource-%s", args.Source.Name)), @@ -130,13 +101,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment { Labels: args.Labels, }, Spec: corev1.PodSpec{ - ServiceAccountName: args.Source.Spec.ServiceAccountName, Containers: []corev1.Container{ { - Name: "receive-adapter", - Image: args.Image, - Env: env, - Resources: res, + Name: "receive-adapter", + Image: args.Image, + Env: env, }, }, }, diff --git a/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go b/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go index bb61635ae8..b940a3843d 100644 --- a/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go +++ b/kafka/source/pkg/reconciler/source/resources/receive_adapter_test.go @@ -24,27 +24,26 @@ import ( "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - bindingsv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1" - "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1" + bindingsv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1" + "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1" "knative.dev/pkg/kmp" ) func TestMakeReceiveAdapter(t *testing.T) { - src := &v1alpha1.KafkaSource{ + src := &v1beta1.KafkaSource{ ObjectMeta: metav1.ObjectMeta{ Name: "source-name", Namespace: "source-namespace", }, - Spec: v1alpha1.KafkaSourceSpec{ - ServiceAccountName: "source-svc-acct", - Topics: []string{"topic1,topic2"}, - ConsumerGroup: "group", - KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ + Spec: v1beta1.KafkaSourceSpec{ + Topics: []string{"topic1,topic2"}, + ConsumerGroup: "group", + KafkaAuthSpec: bindingsv1beta1.KafkaAuthSpec{ BootstrapServers: []string{"server1,server2"}, - Net: bindingsv1alpha1.KafkaNetSpec{ - SASL: bindingsv1alpha1.KafkaSASLSpec{ + Net: bindingsv1beta1.KafkaNetSpec{ + SASL: bindingsv1beta1.KafkaSASLSpec{ Enable: true, - User: bindingsv1alpha1.SecretValueFromSource{ + User: bindingsv1beta1.SecretValueFromSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "the-user-secret", @@ -52,7 +51,7 @@ func TestMakeReceiveAdapter(t *testing.T) { Key: "user", }, }, - Password: bindingsv1alpha1.SecretValueFromSource{ + Password: bindingsv1beta1.SecretValueFromSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "the-password-secret", @@ -61,9 +60,9 @@ func TestMakeReceiveAdapter(t *testing.T) { }, }, }, - TLS: bindingsv1alpha1.KafkaTLSSpec{ + TLS: bindingsv1beta1.KafkaTLSSpec{ Enable: true, - Cert: bindingsv1alpha1.SecretValueFromSource{ + Cert: bindingsv1beta1.SecretValueFromSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "the-cert-secret", @@ -71,7 +70,7 @@ func TestMakeReceiveAdapter(t *testing.T) { Key: "tls.crt", }, }, - Key: bindingsv1alpha1.SecretValueFromSource{ + Key: bindingsv1beta1.SecretValueFromSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "the-key-secret", @@ -79,7 +78,7 @@ func TestMakeReceiveAdapter(t *testing.T) { Key: "tls.key", }, }, - CACert: bindingsv1alpha1.SecretValueFromSource{ + CACert: bindingsv1beta1.SecretValueFromSource{ SecretKeyRef: &corev1.SecretKeySelector{ LocalObjectReference: corev1.LocalObjectReference{ Name: "the-ca-cert-secret", @@ -249,15 +248,14 @@ func TestMakeReceiveAdapter(t *testing.T) { } func TestMakeReceiveAdapterNoNet(t *testing.T) { - src := &v1alpha1.KafkaSource{ + src := &v1beta1.KafkaSource{ ObjectMeta: metav1.ObjectMeta{ Name: "source-name", Namespace: "source-namespace", }, - Spec: v1alpha1.KafkaSourceSpec{ - ServiceAccountName: "source-svc-acct", - Topics: []string{"topic1,topic2"}, - KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ + Spec: v1beta1.KafkaSourceSpec{ + Topics: []string{"topic1,topic2"}, + KafkaAuthSpec: bindingsv1beta1.KafkaAuthSpec{ BootstrapServers: []string{"server1,server2"}, }, ConsumerGroup: "group", @@ -362,16 +360,6 @@ func TestMakeReceiveAdapterNoNet(t *testing.T) { if diff, err := kmp.SafeDiff(want, got); err != nil { t.Errorf("unexpected deploy (-want, +got) = %v", diff) } - src.Spec.Resources = v1alpha1.KafkaResourceSpec{ - Requests: v1alpha1.KafkaRequestsSpec{ - ResourceCPU: "101m", - ResourceMemory: "200Mi", - }, - Limits: v1alpha1.KafkaLimitsSpec{ - ResourceCPU: "102m", - ResourceMemory: "500Mi", - }, - } want.Spec.Template.Spec.Containers = []corev1.Container{ { Name: "receive-adapter", @@ -476,18 +464,17 @@ func TestMakeReceiveAdapterNoNet(t *testing.T) { } func TestMakeReceiveAdapterKeyType(t *testing.T) { - src := &v1alpha1.KafkaSource{ + src := &v1beta1.KafkaSource{ ObjectMeta: metav1.ObjectMeta{ Name: "source-name", Namespace: "source-namespace", Labels: map[string]string{ - v1alpha1.KafkaKeyTypeLabel: "int", + v1beta1.KafkaKeyTypeLabel: "int", }, }, - Spec: v1alpha1.KafkaSourceSpec{ - ServiceAccountName: "source-svc-acct", - Topics: []string{"topic1,topic2"}, - KafkaAuthSpec: bindingsv1alpha1.KafkaAuthSpec{ + Spec: v1beta1.KafkaSourceSpec{ + Topics: []string{"topic1,topic2"}, + KafkaAuthSpec: bindingsv1beta1.KafkaAuthSpec{ BootstrapServers: []string{"server1,server2"}, }, ConsumerGroup: "group",