Skip to content
This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Kafkasource v1beta1 reconciler #1405

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 15 additions & 15 deletions kafka/source/pkg/adapter/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (

"knative.dev/eventing/pkg/kncloudevents"

sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
)

func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
Expand Down Expand Up @@ -63,8 +63,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-type": sourcesv1beta1.KafkaEventType,
"ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
Expand All @@ -85,8 +85,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-type": sourcesv1beta1.KafkaEventType,
"ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "-16771305",
},
Expand All @@ -108,8 +108,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-type": sourcesv1beta1.KafkaEventType,
"ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "0.00000000000000000000000000000000000002536316309005082",
},
Expand All @@ -131,8 +131,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-type": sourcesv1beta1.KafkaEventType,
"ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "AQoXFw==",
},
Expand Down Expand Up @@ -162,8 +162,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-type": sourcesv1beta1.KafkaEventType,
"ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-kafkaheaderhello": "world",
Expand Down Expand Up @@ -194,8 +194,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-type": sourcesv1beta1.KafkaEventType,
"ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-kafkaheaderhellobla": "world",
Expand Down Expand Up @@ -306,8 +306,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
"ce-type": sourcesv1alpha1.KafkaEventType,
"ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
"ce-type": sourcesv1beta1.KafkaEventType,
"ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
Expand Down
6 changes: 3 additions & 3 deletions kafka/source/pkg/adapter/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"

sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
)

func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace.Span, cm *sarama.ConsumerMessage, req *nethttp.Request, logger *zap.Logger) error {
Expand Down Expand Up @@ -61,8 +61,8 @@ func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace.

event.SetID(makeEventId(cm.Partition, cm.Offset))
event.SetTime(cm.Timestamp)
event.SetType(sourcesv1alpha1.KafkaEventType)
event.SetSource(sourcesv1alpha1.KafkaEventSource(a.config.Namespace, a.config.Name, cm.Topic))
event.SetType(sourcesv1beta1.KafkaEventType)
event.SetSource(sourcesv1beta1.KafkaEventSource(a.config.Namespace, a.config.Name, cm.Topic))
event.SetSubject(makeEventSubject(cm.Partition, cm.Offset))

dumpKafkaMetaToEvent(&event, a.keyTypeMapper, cm.Key, kafkaMsg)
Expand Down
14 changes: 7 additions & 7 deletions kafka/source/pkg/apis/sources/v1beta1/kafka_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ const (
// KafkaConditionDeployed has status True when the KafkaSource has had it's receive adapter deployment created.
KafkaConditionDeployed apis.ConditionType = "Deployed"

// KafkaConditionResources is True when the resources listed for the KafkaSource have been properly
// parsed and match specified syntax for resource quantities
KafkaConditionResources apis.ConditionType = "ResourcesCorrect"
// KafkaConditionKeyType is True when the KafkaSource has been configured with valid key type for
// the key deserializer.
KafkaConditionKeyType apis.ConditionType = "KeyTypeCorrect"
)

var KafkaSourceCondSet = apis.NewLivingConditionSet(
Expand Down Expand Up @@ -106,10 +106,10 @@ func (s *KafkaSourceStatus) MarkNotDeployed(reason, messageFormat string, messag
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionDeployed, reason, messageFormat, messageA...)
}

func (s *KafkaSourceStatus) MarkResourcesCorrect() {
KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionResources)
func (s *KafkaSourceStatus) MarkKeyTypeCorrect() {
KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionKeyType)
}

func (s *KafkaSourceStatus) MarkResourcesIncorrect(reason, messageFormat string, messageA ...interface{}) {
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionResources, reason, messageFormat, messageA...)
func (s *KafkaSourceStatus) MarkKeyTypeIncorrect(reason, messageFormat string, messageA ...interface{}) {
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionKeyType, reason, messageFormat, messageA...)
}
6 changes: 3 additions & 3 deletions kafka/source/pkg/reconciler/binding/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package binding
import (
"context"

kfkinformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/bindings/v1alpha1/kafkabinding"
kfkinformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/bindings/v1beta1/kafkabinding"
"knative.dev/pkg/client/injection/ducks/duck/v1/podspecable"
"knative.dev/pkg/client/injection/kube/informers/core/v1/namespace"
"knative.dev/pkg/reconciler"
Expand All @@ -30,7 +30,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1"
"knative.dev/pkg/apis/duck"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -72,7 +72,7 @@ func NewController(
return nil
},
},
GVR: v1alpha1.SchemeGroupVersion.WithResource("kafkabindings"),
GVR: v1beta1.SchemeGroupVersion.WithResource("kafkabindings"),
Get: func(namespace string, name string) (psbinding.Bindable, error) {
return kfkInformer.Lister().KafkaBindings(namespace).Get(name)
},
Expand Down
4 changes: 2 additions & 2 deletions kafka/source/pkg/reconciler/source/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ import (
"knative.dev/pkg/resolver"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"
"knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1beta1/kafkasource"
"knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1beta1/kafkasource"
)

func NewController(
Copy link
Member Author

@aliok aliok Jul 23, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if that's a blocker for this PR

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps go w/ "latest" of the dependency ? (aka master branch)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to learn more about Go modules :D

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me do this later. Nothing unique from that package is used anyway. Only a Kind function which is identical in v1alpha1.
When the dependency to eventing is changed to master I can do it.

Expand Down
64 changes: 16 additions & 48 deletions kafka/source/pkg/reconciler/source/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,12 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/reconciler/source"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
"knative.dev/eventing-contrib/kafka/source/pkg/reconciler/source/resources"

"k8s.io/client-go/kubernetes"
Expand All @@ -43,8 +42,8 @@ import (
"knative.dev/pkg/resolver"

"knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned"
reconcilerkafkasource "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1alpha1"
reconcilerkafkasource "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1beta1/kafkasource"
listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1beta1"
)

const (
Expand Down Expand Up @@ -93,10 +92,10 @@ type Reconciler struct {
// Check that our Reconciler implements Interface
var _ reconcilerkafkasource.Interface = (*Reconciler)(nil)

func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSource) pkgreconciler.Event {
func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource) pkgreconciler.Event {
src.Status.InitializeConditions()

if src.Spec.Sink == nil {
if (src.Spec.Sink == duckv1.Destination{}) {
src.Status.MarkNoSink("SinkMissing", "")
return fmt.Errorf("spec.sink missing")
}
Expand All @@ -117,17 +116,19 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc
}
src.Status.MarkSink(sinkURI)

if val, ok := src.GetLabels()[v1alpha1.KafkaKeyTypeLabel]; ok {
if val, ok := src.GetLabels()[v1beta1.KafkaKeyTypeLabel]; ok {
found := false
for _, allowed := range v1alpha1.KafkaKeyTypeAllowed {
for _, allowed := range v1beta1.KafkaKeyTypeAllowed {
if allowed == val {
found = true
}
}
if !found {
src.Status.MarkResourcesIncorrect("IncorrectKafkaKeyTypeLabel", "Invalid value for %s: %s. Allowed: %v", v1alpha1.KafkaKeyTypeLabel, val, v1alpha1.KafkaKeyTypeAllowed)
logging.FromContext(ctx).Errorf("Invalid value for %s: %s. Allowed: %v", v1alpha1.KafkaKeyTypeLabel, val, v1alpha1.KafkaKeyTypeAllowed)
src.Status.MarkKeyTypeIncorrect("IncorrectKafkaKeyTypeLabel", "Invalid value for %s: %s. Allowed: %v", v1beta1.KafkaKeyTypeLabel, val, v1beta1.KafkaKeyTypeAllowed)
logging.FromContext(ctx).Errorf("Invalid value for %s: %s. Allowed: %v", v1beta1.KafkaKeyTypeLabel, val, v1beta1.KafkaKeyTypeAllowed)
return errors.New("IncorrectKafkaKeyTypeLabel")
} else {
src.Status.MarkKeyTypeCorrect()
}
}

Expand All @@ -151,40 +152,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc
return nil
}

func checkResourcesStatus(src *v1alpha1.KafkaSource) error {
for _, rsrc := range []struct {
key string
field string
}{{
key: "Request.CPU",
field: src.Spec.Resources.Requests.ResourceCPU,
}, {
key: "Request.Memory",
field: src.Spec.Resources.Requests.ResourceMemory,
}, {
key: "Limit.CPU",
field: src.Spec.Resources.Limits.ResourceCPU,
}, {
key: "Limit.Memory",
field: src.Spec.Resources.Limits.ResourceMemory,
}} {
// In the event the field isn't specified, we assign a default in the receive_adapter
if rsrc.field != "" {
if _, err := resource.ParseQuantity(rsrc.field); err != nil {
src.Status.MarkResourcesIncorrect("Incorrect Resource", "%s: %s, Error: %s", rsrc.key, rsrc.field, err)
return err
}
}
}
src.Status.MarkResourcesCorrect()
return nil
}

func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.KafkaSource, sinkURI *apis.URL) (*appsv1.Deployment, error) {
if err := checkResourcesStatus(src); err != nil {
return nil, err
}

func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1beta1.KafkaSource, sinkURI *apis.URL) (*appsv1.Deployment, error) {
raArgs := resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
Expand All @@ -195,7 +163,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Kaf
expected := resources.MakeReceiveAdapter(&raArgs)

ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{})
if apierrors.IsNotFound(err) {
if err != nil && apierrors.IsNotFound(err) {
ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected)
if err != nil {
return nil, newDeploymentFailed(ra.Namespace, ra.Name, err)
Expand Down Expand Up @@ -233,14 +201,14 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
return false
}

func (r *Reconciler) createCloudEventAttributes(src *v1alpha1.KafkaSource) []duckv1.CloudEventAttributes {
func (r *Reconciler) createCloudEventAttributes(src *v1beta1.KafkaSource) []duckv1.CloudEventAttributes {
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(src.Spec.Topics))
for i := range src.Spec.Topics {
topics := strings.Split(src.Spec.Topics[i], ",")
for _, topic := range topics {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
Type: v1alpha1.KafkaEventType,
Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic),
Type: v1beta1.KafkaEventType,
Source: v1beta1.KafkaEventSource(src.Namespace, src.Name, topic),
})
}
}
Expand Down
43 changes: 6 additions & 37 deletions kafka/source/pkg/reconciler/source/resources/receive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ import (

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/kmeta"
)

type ReceiveAdapterArgs struct {
Image string
Source *v1alpha1.KafkaSource
Source *v1beta1.KafkaSource
Labels map[string]string
SinkURI string
AdditionalEnvs []corev1.EnvVar
Expand Down Expand Up @@ -67,7 +66,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
Value: args.Source.Namespace,
}}, args.AdditionalEnvs...)

if val, ok := args.Source.GetLabels()[v1alpha1.KafkaKeyTypeLabel]; ok {
if val, ok := args.Source.GetLabels()[v1beta1.KafkaKeyTypeLabel]; ok {
env = append(env, corev1.EnvVar{
Name: "KEY_TYPE",
Value: val,
Expand All @@ -80,34 +79,6 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_KEY", args.Source.Spec.Net.TLS.Key.SecretKeyRef)
env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_CA_CERT", args.Source.Spec.Net.TLS.CACert.SecretKeyRef)

RequestResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceCPU)
if err != nil {
RequestResourceCPU = resource.MustParse("250m")
}
RequestResourceMemory, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceMemory)
if err != nil {
RequestResourceMemory = resource.MustParse("512Mi")
}
LimitResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Limits.ResourceCPU)
if err != nil {
LimitResourceCPU = resource.MustParse("250m")
}
LimitResourceMemory, err := resource.ParseQuantity(args.Source.Spec.Resources.Limits.ResourceMemory)
if err != nil {
LimitResourceMemory = resource.MustParse("512Mi")
}

res := corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: RequestResourceCPU,
corev1.ResourceMemory: RequestResourceMemory,
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: LimitResourceCPU,
corev1.ResourceMemory: LimitResourceMemory,
},
}

return &v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("kafkasource-%s", args.Source.Name)),
Expand All @@ -130,13 +101,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
Labels: args.Labels,
},
Spec: corev1.PodSpec{
ServiceAccountName: args.Source.Spec.ServiceAccountName,
Containers: []corev1.Container{
{
Name: "receive-adapter",
Image: args.Image,
Env: env,
Resources: res,
Name: "receive-adapter",
Image: args.Image,
Env: env,
},
},
},
Expand Down
Loading