From 9d9e2f8085c2b7a0d1f7d9cbc403f55b5134472d Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Fri, 17 Jul 2020 18:19:33 +0300 Subject: [PATCH] Fix KafkaChannel conversion (#1398) (cherry picked from commit e4de8d36412c54da4722cc35eb2a748424c8edf5) --- kafka/channel/config/300-kafka-channel.yaml | 7 +++++ .../v1alpha1/kafka_channel_conversion.go | 26 ++++++++++++++----- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/kafka/channel/config/300-kafka-channel.yaml b/kafka/channel/config/300-kafka-channel.yaml index fd8bc5a83e..8c3cfddf93 100644 --- a/kafka/channel/config/300-kafka-channel.yaml +++ b/kafka/channel/config/300-kafka-channel.yaml @@ -66,3 +66,10 @@ spec: - name: v1beta1 served: true storage: false + conversion: + strategy: Webhook + conversionReviewVersions: ["v1beta1", "v1alpha1"] + webhookClientConfig: + service: + name: kafka-webhook + namespace: knative-eventing diff --git a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_conversion.go b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_conversion.go index d06ab6e73b..3d4bdace18 100644 --- a/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_conversion.go +++ b/kafka/channel/pkg/apis/messaging/v1alpha1/kafka_channel_conversion.go @@ -65,14 +65,21 @@ func (source *KafkaChannel) ConvertTo(ctx context.Context, obj apis.Convertible) subscribableSpec.Subscribers = make([]eventingduckv1.SubscriberSpec, len(source.Spec.Subscribable.Subscribers)) for i, ss := range source.Spec.Subscribable.Subscribers { delivery := &eventingduckv1.DeliverySpec{} - if err := ss.Delivery.ConvertTo(ctx, delivery); err != nil { - return err + if ss.Delivery != nil { + if err := ss.Delivery.ConvertTo(ctx, delivery); err != nil { + return err + } + } + + var subscriberURI *apis.URL + if ss.SubscriberURI != nil { + subscriberURI = ss.SubscriberURI.DeepCopy() } subscribableSpec.Subscribers[i] = eventingduckv1.SubscriberSpec{ UID: ss.UID, Generation: ss.Generation, - SubscriberURI: ss.SubscriberURI.DeepCopy(), + SubscriberURI: subscriberURI, ReplyURI: ss.ReplyURI, Delivery: delivery, } @@ -141,8 +148,15 @@ func (sink *KafkaChannel) ConvertFrom(ctx context.Context, obj apis.Convertible) subscribableSpec.Subscribers = make([]eventingduckv1alpha1.SubscriberSpec, len(source.Spec.SubscribableSpec.Subscribers)) for i, ss := range source.Spec.SubscribableSpec.Subscribers { delivery := &eventingduckv1beta1.DeliverySpec{} - if err := delivery.ConvertFrom(ctx, ss.Delivery); err != nil { - return err + if ss.Delivery != nil { + if err := delivery.ConvertFrom(ctx, ss.Delivery); err != nil { + return err + } + } + + var deadLetterSinkURI *apis.URL + if ss.Delivery != nil && ss.Delivery.DeadLetterSink != nil && ss.Delivery.DeadLetterSink.URI != nil { + deadLetterSinkURI = ss.Delivery.DeadLetterSink.URI.DeepCopy() } subscribableSpec.Subscribers[i] = eventingduckv1alpha1.SubscriberSpec{ @@ -150,7 +164,7 @@ func (sink *KafkaChannel) ConvertFrom(ctx context.Context, obj apis.Convertible) Generation: ss.Generation, SubscriberURI: ss.SubscriberURI.DeepCopy(), ReplyURI: ss.ReplyURI, - DeadLetterSinkURI: ss.Delivery.DeadLetterSink.URI.DeepCopy(), + DeadLetterSinkURI: deadLetterSinkURI, Delivery: delivery, } }