Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Commit

Permalink
Fix KafkaChannel conversion (#1398) (#1567)
Browse files Browse the repository at this point in the history
(cherry picked from commit e4de8d3)
  • Loading branch information
aliok authored Sep 14, 2020
1 parent 9cefcda commit 15eb732
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 6 deletions.
7 changes: 7 additions & 0 deletions kafka/channel/config/300-kafka-channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,10 @@ spec:
- name: v1beta1
served: true
storage: false
conversion:
strategy: Webhook
conversionReviewVersions: ["v1beta1", "v1alpha1"]
webhookClientConfig:
service:
name: kafka-webhook
namespace: knative-eventing
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,21 @@ func (source *KafkaChannel) ConvertTo(ctx context.Context, obj apis.Convertible)
subscribableSpec.Subscribers = make([]eventingduckv1.SubscriberSpec, len(source.Spec.Subscribable.Subscribers))
for i, ss := range source.Spec.Subscribable.Subscribers {
delivery := &eventingduckv1.DeliverySpec{}
if err := ss.Delivery.ConvertTo(ctx, delivery); err != nil {
return err
if ss.Delivery != nil {
if err := ss.Delivery.ConvertTo(ctx, delivery); err != nil {
return err
}
}

var subscriberURI *apis.URL
if ss.SubscriberURI != nil {
subscriberURI = ss.SubscriberURI.DeepCopy()
}

subscribableSpec.Subscribers[i] = eventingduckv1.SubscriberSpec{
UID: ss.UID,
Generation: ss.Generation,
SubscriberURI: ss.SubscriberURI.DeepCopy(),
SubscriberURI: subscriberURI,
ReplyURI: ss.ReplyURI,
Delivery: delivery,
}
Expand Down Expand Up @@ -141,16 +148,23 @@ func (sink *KafkaChannel) ConvertFrom(ctx context.Context, obj apis.Convertible)
subscribableSpec.Subscribers = make([]eventingduckv1alpha1.SubscriberSpec, len(source.Spec.SubscribableSpec.Subscribers))
for i, ss := range source.Spec.SubscribableSpec.Subscribers {
delivery := &eventingduckv1beta1.DeliverySpec{}
if err := delivery.ConvertFrom(ctx, ss.Delivery); err != nil {
return err
if ss.Delivery != nil {
if err := delivery.ConvertFrom(ctx, ss.Delivery); err != nil {
return err
}
}

var deadLetterSinkURI *apis.URL
if ss.Delivery != nil && ss.Delivery.DeadLetterSink != nil && ss.Delivery.DeadLetterSink.URI != nil {
deadLetterSinkURI = ss.Delivery.DeadLetterSink.URI.DeepCopy()
}

subscribableSpec.Subscribers[i] = eventingduckv1alpha1.SubscriberSpec{
UID: ss.UID,
Generation: ss.Generation,
SubscriberURI: ss.SubscriberURI.DeepCopy(),
ReplyURI: ss.ReplyURI,
DeadLetterSinkURI: ss.Delivery.DeadLetterSink.URI.DeepCopy(),
DeadLetterSinkURI: deadLetterSinkURI,
Delivery: delivery,
}
}
Expand Down

0 comments on commit 15eb732

Please sign in to comment.