Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Kafka src desitation #688

Merged
merged 3 commits into from
Oct 29, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions kafka/source/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,10 @@ event sink.
bootstrapServers: REPLACE_WITH_CLUSTER_URL
topics: knative-demo-topic
sink:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: event-display
ref:
apiVersion: serving.knative.dev/v1alpha1
kind: Service
name: event-display
```

## Example
Expand Down
25 changes: 24 additions & 1 deletion kafka/source/config/300-kafkasource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,30 @@ spec:
serviceAccountName:
type: string
sink:
type: object
anyOf:
- type: object
description: "the destination that should receive events."
properties:
ref:
type: object
description: "a reference to a Kubernetes object from which to retrieve the target URI."
required:
- apiVersion
- kind
- name
properties:
apiVersion:
type: string
minLength: 1
kind:
type: string
minLength: 1
name:
type: string
minLength: 1
uri:
type: string
description: "the target URI. If ref is provided, this must be relative URI reference."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, @grantr will PR the same for couch as well - after these are in, ok ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm on it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lionelvillard I think we should also change anyOf to oneOf

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@grantr yup you're right

type: object
required:
- bootstrapServers
Expand Down
17 changes: 17 additions & 0 deletions kafka/source/pkg/apis/sources/v1alpha1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package v1alpha1

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"knative.dev/eventing/pkg/apis/duck"
"knative.dev/pkg/apis"
)
Expand Down Expand Up @@ -68,6 +69,22 @@ func (s *KafkaSourceStatus) MarkSink(uri string) {
}
}

// MarkSinkWarnDeprecated sets the condition that the source has a sink configured and warns ref is deprecated.
func (s *KafkaSourceStatus) MarkSinkWarnRefDeprecated(uri string) {
s.SinkURI = uri
if len(uri) > 0 {
c := apis.Condition{
Type: KafkaConditionSinkProvided,
Status: corev1.ConditionTrue,
Severity: apis.ConditionSeverityError,
Message: "Using deprecated object ref fields when specifying spec.sink. Update to spec.sink.ref. These will be removed in 0.11.",
}
KafkaSourceCondSet.Manage(s).SetCondition(c)
} else {
KafkaSourceCondSet.Manage(s).MarkUnknown(KafkaConditionSinkProvided, "SinkEmpty", "Sink has resolved to empty.%s", "")
}
}

// MarkNoSink sets the condition that the source does not have a sink configured.
func (s *KafkaSourceStatus) MarkNoSink(reason, messageFormat string, messageA ...interface{}) {
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionSinkProvided, reason, messageFormat, messageA...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ var (
},
},
}
condReady = apis.Condition{
Type: KafkaConditionReady,
Status: corev1.ConditionTrue,
}
)

// Check that KafkaSource implements the Conditions duck type.
Expand Down
3 changes: 2 additions & 1 deletion kafka/source/pkg/apis/sources/v1alpha1/kafka_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
"knative.dev/pkg/kmeta"
)

Expand Down Expand Up @@ -119,7 +120,7 @@ type KafkaSourceSpec struct {

// Sink is a reference to an object that will resolve to a domain name to use as the sink.
// +optional
Sink *corev1.ObjectReference `json:"sink,omitempty"`
Sink *apisv1alpha1.Destination `json:"sink,omitempty"`

// ServiceAccoutName is the name of the ServiceAccount that will be used to run the Receive
// Adapter Deployment.
Expand Down
73 changes: 43 additions & 30 deletions kafka/source/pkg/apis/sources/v1alpha1/kafka_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,21 @@ import (
"testing"

corev1 "k8s.io/api/core/v1"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
)

var (
fullSpec = KafkaSourceSpec{
BootstrapServers: "servers",
Topics: "topics",
ConsumerGroup: "group",
Sink: &corev1.ObjectReference{
APIVersion: "foo",
Kind: "bar",
Namespace: "baz",
Name: "qux",
Sink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: "foo",
Kind: "bar",
Namespace: "baz",
Name: "qux",
},
},
ServiceAccountName: "service-account-name",
}
Expand Down Expand Up @@ -70,11 +73,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
orig: &fullSpec,
updated: KafkaSourceSpec{
Topics: fullSpec.Topics,
Sink: &corev1.ObjectReference{
APIVersion: "some-other-api-version",
Kind: fullSpec.Sink.Kind,
Namespace: fullSpec.Sink.Namespace,
Name: fullSpec.Sink.Name,
Sink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: "some-other-api-version",
Kind: fullSpec.Sink.Ref.APIVersion,
Namespace: fullSpec.Sink.Ref.Namespace,
Name: fullSpec.Sink.Ref.Name,
},
},
ServiceAccountName: fullSpec.ServiceAccountName,
},
Expand All @@ -84,11 +89,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
orig: &fullSpec,
updated: KafkaSourceSpec{
Topics: fullSpec.Topics,
Sink: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.APIVersion,
Kind: "some-other-kind",
Namespace: fullSpec.Sink.Namespace,
Name: fullSpec.Sink.Name,
Sink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.Ref.APIVersion,
Kind: "some-other-kind",
Namespace: fullSpec.Sink.Ref.Namespace,
Name: fullSpec.Sink.Ref.Name,
},
},
ServiceAccountName: fullSpec.ServiceAccountName,
},
Expand All @@ -98,11 +105,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
orig: &fullSpec,
updated: KafkaSourceSpec{
Topics: fullSpec.Topics,
Sink: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.APIVersion,
Kind: fullSpec.Sink.Kind,
Namespace: "some-other-namespace",
Name: fullSpec.Sink.Name,
Sink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.Ref.APIVersion,
Kind: fullSpec.Sink.Ref.Kind,
Namespace: "some-other-namespace",
Name: fullSpec.Sink.Ref.Name,
},
},
ServiceAccountName: fullSpec.ServiceAccountName,
},
Expand All @@ -112,11 +121,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
orig: &fullSpec,
updated: KafkaSourceSpec{
Topics: fullSpec.Topics,
Sink: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.APIVersion,
Kind: fullSpec.Sink.Kind,
Namespace: fullSpec.Sink.Namespace,
Name: "some-other-name",
Sink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.Ref.APIVersion,
Kind: fullSpec.Sink.Ref.Kind,
Namespace: fullSpec.Sink.Ref.Namespace,
Name: "some-other-name",
},
},
ServiceAccountName: fullSpec.ServiceAccountName,
},
Expand All @@ -126,11 +137,13 @@ func TestKafkaSourceCheckImmutableFields(t *testing.T) {
orig: &fullSpec,
updated: KafkaSourceSpec{
Topics: fullSpec.Topics,
Sink: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.APIVersion,
Kind: fullSpec.Sink.Kind,
Namespace: fullSpec.Sink.Namespace,
Name: "some-other-name",
Sink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
APIVersion: fullSpec.Sink.Ref.APIVersion,
Kind: fullSpec.Sink.Ref.Kind,
Namespace: fullSpec.Sink.Ref.Namespace,
Name: "some-other-name",
},
},
ServiceAccountName: fullSpec.ServiceAccountName,
},
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions kafka/source/pkg/reconciler/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ import (
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"
"knative.dev/eventing/pkg/apis/sources/v1alpha1"
eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventtype"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler"
deploymentinformer "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/resolver"
)

const (
Expand Down Expand Up @@ -65,8 +65,7 @@ func NewController(
}

impl := controller.NewImpl(c, c.Logger, "KafkaSource")

c.sinkReconciler = duck.NewSinkReconciler(ctx, impl.EnqueueKey)
c.sinkResolver = resolver.NewURIResolver(ctx, impl.EnqueueKey)

c.Logger.Info("Setting up kafka event handlers")

Expand Down
40 changes: 30 additions & 10 deletions kafka/source/pkg/reconciler/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import (
clientset "knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned"
listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1alpha1"
eventinglisters "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"
"knative.dev/eventing/pkg/duck"
"knative.dev/eventing/pkg/reconciler"
"knative.dev/pkg/resolver"

"knative.dev/pkg/controller"
pkgLogging "knative.dev/pkg/logging"
Expand All @@ -75,7 +75,7 @@ type Reconciler struct {
receiveAdapterImage string
eventTypeLister eventinglisters.EventTypeLister
kafkaLister listers.KafkaSourceLister
sinkReconciler *duck.SinkReconciler
sinkResolver *resolver.URIResolver
deploymentLister appsv1listers.DeploymentLister
loggingContext context.Context
loggingConfig *pkgLogging.Config
Expand Down Expand Up @@ -130,18 +130,38 @@ func (r *Reconciler) reconcile(ctx context.Context, src *v1alpha1.KafkaSource) e
return errors.New("spec.sink missing")
}

sinkObjRef := src.Spec.Sink
if sinkObjRef.Namespace == "" {
sinkObjRef.Namespace = src.Namespace
if src.Spec.Sink == nil {
src.Status.MarkNoSink("SinkMissing", "")
return fmt.Errorf("spec.sink missing")
}

dest := src.Spec.Sink.DeepCopy()
if dest.Ref != nil {
// To call URIFromDestination(), dest.Ref must have a Namespace. If there is
// no Namespace defined in dest.Ref, we will use the Namespace of the source
// as the Namespace of dest.Ref.
if dest.Ref.Namespace == "" {
//TODO how does this work with deprecated fields
dest.Ref.Namespace = src.GetNamespace()
}
} else if dest.DeprecatedName != "" && dest.DeprecatedNamespace == "" {
// If Ref is nil and the deprecated ref is present, we need to check for
// DeprecatedNamespace. This can be removed when DeprecatedNamespace is
// removed.
dest.DeprecatedNamespace = src.GetNamespace()
}
kafkaDesc := fmt.Sprintf("%s/%s,%s", src.Namespace, src.Name, src.GroupVersionKind().String())
sinkURI, err := r.sinkReconciler.GetSinkURI(sinkObjRef, src, kafkaDesc)
sinkURI, err := r.sinkResolver.URIFromDestination(*dest, src)
if err != nil {
src.Status.MarkNoSink("NotFound", "")
return fmt.Errorf("getting sink URI: %v", err)
}

src.Status.MarkSink(sinkURI)
if src.Spec.Sink.DeprecatedAPIVersion != "" &&
src.Spec.Sink.DeprecatedKind != "" &&
src.Spec.Sink.DeprecatedName != "" {
src.Status.MarkSinkWarnRefDeprecated(sinkURI)
} else {
src.Status.MarkSink(sinkURI)
}

ra, err := r.createReceiveAdapter(ctx, src, sinkURI)
if err != nil {
Expand Down Expand Up @@ -391,7 +411,7 @@ func (r *Reconciler) makeEventTypes(src *v1alpha1.KafkaSource) ([]eventingv1alph
// Only create EventTypes for Broker sinks.
// We add this check here in case the KafkaSource was changed from Broker to non-Broker sink.
// If so, we need to delete the existing ones, thus we return empty expected.
if src.Spec.Sink.Kind != "Broker" {
if ref := src.Spec.Sink.GetRef(); ref == nil || ref.Kind != "Broker" {
return eventTypes, nil
}
topics := strings.Split(src.Spec.Topics, ",")
Expand Down
2 changes: 1 addition & 1 deletion kafka/source/pkg/reconciler/resources/eventtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func MakeEventType(args *EventTypeArgs) eventingv1alpha1.EventType {
Spec: eventingv1alpha1.EventTypeSpec{
Type: args.Type,
Source: args.Source,
Broker: args.Src.Spec.Sink.Name,
Broker: args.Src.Spec.Sink.GetRef().Name,
},
}
}
11 changes: 7 additions & 4 deletions kafka/source/pkg/reconciler/resources/eventtype_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
eventingv1alpha1 "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
"knative.dev/eventing/pkg/utils"
apisv1alpha1 "knative.dev/pkg/apis/v1alpha1"
)

func TestMakeEventType(t *testing.T) {
Expand All @@ -42,10 +43,12 @@ func TestMakeEventType(t *testing.T) {
Topics: "topic1,topic2",
BootstrapServers: "server1,server2",
ConsumerGroup: "group",
Sink: &corev1.ObjectReference{
Name: "test-sink",
Kind: "Sink",
APIVersion: "duck.knative.dev/v1",
Sink: &apisv1alpha1.Destination{
Ref: &corev1.ObjectReference{
Name: "test-sink",
Kind: "Sink",
APIVersion: "duck.knative.dev/v1",
},
},
},
},
Expand Down