This repository has been archived by the owner on Jun 4, 2021. It is now read-only.

Kafkasource v1beta1 reconciler (#1405)
* Search & replace v1alpha1 of Kafka source and binding to v1beta1 - doesn't compile yet

* Use v1beta1 of KafkaSource and KafkaBinding in reconciler
aliok authored Jul 27, 2020
1 parent 5b4e7dd commit f1f0b1e
Showing 8 changed files with 76 additions and 152 deletions.
30 changes: 15 additions & 15 deletions kafka/source/pkg/adapter/adapter_test.go
@@ -35,7 +35,7 @@ import (

"knative.dev/eventing/pkg/kncloudevents"

- sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
+ sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
)

func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
@@ -63,8 +63,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
- "ce-type": sourcesv1alpha1.KafkaEventType,
- "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
+ "ce-type": sourcesv1beta1.KafkaEventType,
+ "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
@@ -85,8 +85,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
- "ce-type": sourcesv1alpha1.KafkaEventType,
- "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
+ "ce-type": sourcesv1beta1.KafkaEventType,
+ "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "-16771305",
},
@@ -108,8 +108,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
- "ce-type": sourcesv1alpha1.KafkaEventType,
- "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
+ "ce-type": sourcesv1beta1.KafkaEventType,
+ "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "0.00000000000000000000000000000000000002536316309005082",
},
@@ -131,8 +131,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
- "ce-type": sourcesv1alpha1.KafkaEventType,
- "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
+ "ce-type": sourcesv1beta1.KafkaEventType,
+ "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "AQoXFw==",
},
@@ -162,8 +162,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
- "ce-type": sourcesv1alpha1.KafkaEventType,
- "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
+ "ce-type": sourcesv1beta1.KafkaEventType,
+ "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-kafkaheaderhello": "world",
@@ -194,8 +194,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
- "ce-type": sourcesv1alpha1.KafkaEventType,
- "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
+ "ce-type": sourcesv1beta1.KafkaEventType,
+ "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
"ce-kafkaheaderhellobla": "world",
@@ -306,8 +306,8 @@ func TestPostMessage_ServeHTTP_binary_mode(t *testing.T) {
"ce-specversion": "1.0",
"ce-id": makeEventId(1, 2),
"ce-time": types.FormatTime(aTimestamp),
- "ce-type": sourcesv1alpha1.KafkaEventType,
- "ce-source": sourcesv1alpha1.KafkaEventSource("test", "test", "topic1"),
+ "ce-type": sourcesv1beta1.KafkaEventType,
+ "ce-source": sourcesv1beta1.KafkaEventSource("test", "test", "topic1"),
"ce-subject": makeEventSubject(1, 2),
"ce-key": "key",
},
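The expected ce-type and ce-source headers above come straight from the v1beta1 helpers. A minimal sketch of what those helpers yield, assuming KafkaEventType and KafkaEventSource keep the same behavior they had in v1alpha1 (only the package path changes in this commit):

```go
package main

import (
	"fmt"

	sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
)

func main() {
	// The event type is a fixed constant; the source encodes namespace,
	// KafkaSource name, and topic, matching the "ce-type"/"ce-source"
	// headers asserted in the tests above.
	fmt.Println(sourcesv1beta1.KafkaEventType)
	fmt.Println(sourcesv1beta1.KafkaEventSource("test", "test", "topic1"))
}
```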
6 changes: 3 additions & 3 deletions kafka/source/pkg/adapter/message.go
@@ -33,7 +33,7 @@ import (
"go.opencensus.io/trace"
"go.uber.org/zap"

- sourcesv1alpha1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
+ sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
)

func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace.Span, cm *sarama.ConsumerMessage, req *nethttp.Request, logger *zap.Logger) error {
@@ -61,8 +61,8 @@ func (a *Adapter) ConsumerMessageToHttpRequest(ctx context.Context, span *trace.

event.SetID(makeEventId(cm.Partition, cm.Offset))
event.SetTime(cm.Timestamp)
- event.SetType(sourcesv1alpha1.KafkaEventType)
- event.SetSource(sourcesv1alpha1.KafkaEventSource(a.config.Namespace, a.config.Name, cm.Topic))
+ event.SetType(sourcesv1beta1.KafkaEventType)
+ event.SetSource(sourcesv1beta1.KafkaEventSource(a.config.Namespace, a.config.Name, cm.Topic))
event.SetSubject(makeEventSubject(cm.Partition, cm.Offset))

dumpKafkaMetaToEvent(&event, a.keyTypeMapper, cm.Key, kafkaMsg)
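For orientation, a condensed sketch of the attribute mapping this file performs, assuming the cloudevents sdk-go v2 API; the exact ID and subject formats are illustrative and not confirmed by this diff:

```go
package adapter

import (
	"fmt"

	"github.com/Shopify/sarama"
	cloudevents "github.com/cloudevents/sdk-go/v2"

	sourcesv1beta1 "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
)

// consumerMessageToEvent shows the attribute mapping only; the real
// ConsumerMessageToHttpRequest also copies Kafka headers and the record key,
// and writes the result into an *http.Request.
func consumerMessageToEvent(namespace, name string, cm *sarama.ConsumerMessage) cloudevents.Event {
	event := cloudevents.NewEvent()
	event.SetID(fmt.Sprintf("partition:%d/offset:%d", cm.Partition, cm.Offset)) // illustrative ID format
	event.SetTime(cm.Timestamp)
	event.SetType(sourcesv1beta1.KafkaEventType)
	event.SetSource(sourcesv1beta1.KafkaEventSource(namespace, name, cm.Topic))
	event.SetSubject(fmt.Sprintf("partition:%d#%d", cm.Partition, cm.Offset)) // illustrative subject format
	_ = event.SetData(cloudevents.ApplicationJSON, cm.Value)
	return event
}
```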
14 changes: 7 additions & 7 deletions kafka/source/pkg/apis/sources/v1beta1/kafka_lifecycle.go
@@ -32,9 +32,9 @@ const (
// KafkaConditionDeployed has status True when the KafkaSource has had it's receive adapter deployment created.
KafkaConditionDeployed apis.ConditionType = "Deployed"

- // KafkaConditionResources is True when the resources listed for the KafkaSource have been properly
- // parsed and match specified syntax for resource quantities
- KafkaConditionResources apis.ConditionType = "ResourcesCorrect"
+ // KafkaConditionKeyType is True when the KafkaSource has been configured with valid key type for
+ // the key deserializer.
+ KafkaConditionKeyType apis.ConditionType = "KeyTypeCorrect"
)

var KafkaSourceCondSet = apis.NewLivingConditionSet(
@@ -106,10 +106,10 @@ func (s *KafkaSourceStatus) MarkNotDeployed(reason, messageFormat string, messag
KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionDeployed, reason, messageFormat, messageA...)
}

- func (s *KafkaSourceStatus) MarkResourcesCorrect() {
- KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionResources)
+ func (s *KafkaSourceStatus) MarkKeyTypeCorrect() {
+ KafkaSourceCondSet.Manage(s).MarkTrue(KafkaConditionKeyType)
}

- func (s *KafkaSourceStatus) MarkResourcesIncorrect(reason, messageFormat string, messageA ...interface{}) {
- KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionResources, reason, messageFormat, messageA...)
+ func (s *KafkaSourceStatus) MarkKeyTypeIncorrect(reason, messageFormat string, messageA ...interface{}) {
+ KafkaSourceCondSet.Manage(s).MarkFalse(KafkaConditionKeyType, reason, messageFormat, messageA...)
}
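A short sketch of how a reconciler is expected to drive the new condition, assuming the KafkaKeyTypeLabel and KafkaKeyTypeAllowed values referenced later in this commit:

```go
package v1beta1

// validateKeyType mirrors the key-type handling in ReconcileKind further
// down in this commit: an unknown value marks the condition False, an
// allowed value marks it True, and an absent label leaves it untouched.
func validateKeyType(src *KafkaSource) {
	val, ok := src.GetLabels()[KafkaKeyTypeLabel]
	if !ok {
		return
	}
	for _, allowed := range KafkaKeyTypeAllowed {
		if allowed == val {
			src.Status.MarkKeyTypeCorrect()
			return
		}
	}
	src.Status.MarkKeyTypeIncorrect("IncorrectKafkaKeyTypeLabel",
		"Invalid value for %s: %s. Allowed: %v", KafkaKeyTypeLabel, val, KafkaKeyTypeAllowed)
}
```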
6 changes: 3 additions & 3 deletions kafka/source/pkg/reconciler/binding/controller.go
@@ -19,7 +19,7 @@ package binding
import (
"context"

- kfkinformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/bindings/v1alpha1/kafkabinding"
+ kfkinformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/bindings/v1beta1/kafkabinding"
"knative.dev/pkg/client/injection/ducks/duck/v1/podspecable"
"knative.dev/pkg/client/injection/kube/informers/core/v1/namespace"
"knative.dev/pkg/reconciler"
@@ -30,7 +30,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
- "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1alpha1"
+ "knative.dev/eventing-contrib/kafka/source/pkg/apis/bindings/v1beta1"
"knative.dev/pkg/apis/duck"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
@@ -72,7 +72,7 @@ func NewController(
return nil
},
},
- GVR: v1alpha1.SchemeGroupVersion.WithResource("kafkabindings"),
+ GVR: v1beta1.SchemeGroupVersion.WithResource("kafkabindings"),
Get: func(namespace string, name string) (psbinding.Bindable, error) {
return kfkInformer.Lister().KafkaBindings(namespace).Get(name)
},
4 changes: 2 additions & 2 deletions kafka/source/pkg/reconciler/source/controller.go
@@ -33,8 +33,8 @@ import (
"knative.dev/pkg/resolver"

kafkaclient "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/client"
- kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1alpha1/kafkasource"
- "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
+ kafkainformer "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/informers/sources/v1beta1/kafkasource"
+ "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1beta1/kafkasource"
)

func NewController(
64 changes: 16 additions & 48 deletions kafka/source/pkg/reconciler/source/kafkasource.go
@@ -27,13 +27,12 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
- "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/eventing/pkg/reconciler/source"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"

- "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
+ "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
"knative.dev/eventing-contrib/kafka/source/pkg/reconciler/source/resources"

"k8s.io/client-go/kubernetes"
@@ -43,8 +42,8 @@ import (
"knative.dev/pkg/resolver"

"knative.dev/eventing-contrib/kafka/source/pkg/client/clientset/versioned"
- reconcilerkafkasource "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1alpha1/kafkasource"
- listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1alpha1"
+ reconcilerkafkasource "knative.dev/eventing-contrib/kafka/source/pkg/client/injection/reconciler/sources/v1beta1/kafkasource"
+ listers "knative.dev/eventing-contrib/kafka/source/pkg/client/listers/sources/v1beta1"
)

const (
@@ -93,10 +92,10 @@ type Reconciler struct {
// Check that our Reconciler implements Interface
var _ reconcilerkafkasource.Interface = (*Reconciler)(nil)

- func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSource) pkgreconciler.Event {
+ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1beta1.KafkaSource) pkgreconciler.Event {
src.Status.InitializeConditions()

- if src.Spec.Sink == nil {
+ if (src.Spec.Sink == duckv1.Destination{}) {
src.Status.MarkNoSink("SinkMissing", "")
return fmt.Errorf("spec.sink missing")
}
@@ -117,17 +116,19 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc
}
src.Status.MarkSink(sinkURI)

- if val, ok := src.GetLabels()[v1alpha1.KafkaKeyTypeLabel]; ok {
+ if val, ok := src.GetLabels()[v1beta1.KafkaKeyTypeLabel]; ok {
found := false
- for _, allowed := range v1alpha1.KafkaKeyTypeAllowed {
+ for _, allowed := range v1beta1.KafkaKeyTypeAllowed {
if allowed == val {
found = true
}
}
if !found {
- src.Status.MarkResourcesIncorrect("IncorrectKafkaKeyTypeLabel", "Invalid value for %s: %s. Allowed: %v", v1alpha1.KafkaKeyTypeLabel, val, v1alpha1.KafkaKeyTypeAllowed)
- logging.FromContext(ctx).Errorf("Invalid value for %s: %s. Allowed: %v", v1alpha1.KafkaKeyTypeLabel, val, v1alpha1.KafkaKeyTypeAllowed)
+ src.Status.MarkKeyTypeIncorrect("IncorrectKafkaKeyTypeLabel", "Invalid value for %s: %s. Allowed: %v", v1beta1.KafkaKeyTypeLabel, val, v1beta1.KafkaKeyTypeAllowed)
+ logging.FromContext(ctx).Errorf("Invalid value for %s: %s. Allowed: %v", v1beta1.KafkaKeyTypeLabel, val, v1beta1.KafkaKeyTypeAllowed)
return errors.New("IncorrectKafkaKeyTypeLabel")
+ } else {
+ src.Status.MarkKeyTypeCorrect()
}
}

@@ -151,40 +152,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, src *v1alpha1.KafkaSourc
return nil
}

- func checkResourcesStatus(src *v1alpha1.KafkaSource) error {
- for _, rsrc := range []struct {
- key string
- field string
- }{{
- key: "Request.CPU",
- field: src.Spec.Resources.Requests.ResourceCPU,
- }, {
- key: "Request.Memory",
- field: src.Spec.Resources.Requests.ResourceMemory,
- }, {
- key: "Limit.CPU",
- field: src.Spec.Resources.Limits.ResourceCPU,
- }, {
- key: "Limit.Memory",
- field: src.Spec.Resources.Limits.ResourceMemory,
- }} {
- // In the event the field isn't specified, we assign a default in the receive_adapter
- if rsrc.field != "" {
- if _, err := resource.ParseQuantity(rsrc.field); err != nil {
- src.Status.MarkResourcesIncorrect("Incorrect Resource", "%s: %s, Error: %s", rsrc.key, rsrc.field, err)
- return err
- }
- }
- }
- src.Status.MarkResourcesCorrect()
- return nil
- }
-
- func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.KafkaSource, sinkURI *apis.URL) (*appsv1.Deployment, error) {
- if err := checkResourcesStatus(src); err != nil {
- return nil, err
- }
-
+ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1beta1.KafkaSource, sinkURI *apis.URL) (*appsv1.Deployment, error) {
raArgs := resources.ReceiveAdapterArgs{
Image: r.receiveAdapterImage,
Source: src,
@@ -195,7 +163,7 @@ func (r *Reconciler) createReceiveAdapter(ctx context.Context, src *v1alpha1.Kaf
expected := resources.MakeReceiveAdapter(&raArgs)

ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{})
- if apierrors.IsNotFound(err) {
+ if err != nil && apierrors.IsNotFound(err) {
ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected)
if err != nil {
return nil, newDeploymentFailed(ra.Namespace, ra.Name, err)
@@ -233,14 +201,14 @@ func podSpecChanged(oldPodSpec corev1.PodSpec, newPodSpec corev1.PodSpec) bool {
return false
}

- func (r *Reconciler) createCloudEventAttributes(src *v1alpha1.KafkaSource) []duckv1.CloudEventAttributes {
+ func (r *Reconciler) createCloudEventAttributes(src *v1beta1.KafkaSource) []duckv1.CloudEventAttributes {
ceAttributes := make([]duckv1.CloudEventAttributes, 0, len(src.Spec.Topics))
for i := range src.Spec.Topics {
topics := strings.Split(src.Spec.Topics[i], ",")
for _, topic := range topics {
ceAttributes = append(ceAttributes, duckv1.CloudEventAttributes{
- Type: v1alpha1.KafkaEventType,
- Source: v1alpha1.KafkaEventSource(src.Namespace, src.Name, topic),
+ Type: v1beta1.KafkaEventType,
+ Source: v1beta1.KafkaEventSource(src.Namespace, src.Name, topic),
})
}
}
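One detail worth calling out from createCloudEventAttributes above: each entry in Spec.Topics may itself be a comma-separated list, and every individual topic gets its own attribute pair. A small illustration, with a hypothetical source name and namespace:

```go
package source

import (
	duckv1 "knative.dev/pkg/apis/duck/v1"

	"knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
)

// attributesFor shows what createCloudEventAttributes yields for a source
// named "my-source" in "default" whose Spec.Topics is []string{"orders,payments"}:
// one CloudEventAttributes entry per individual topic.
func attributesFor() []duckv1.CloudEventAttributes {
	return []duckv1.CloudEventAttributes{
		{Type: v1beta1.KafkaEventType, Source: v1beta1.KafkaEventSource("default", "my-source", "orders")},
		{Type: v1beta1.KafkaEventType, Source: v1beta1.KafkaEventSource("default", "my-source", "payments")},
	}
}
```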
43 changes: 6 additions & 37 deletions kafka/source/pkg/reconciler/source/resources/receive_adapter.go
@@ -23,16 +23,15 @@ import (

v1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
- "k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1alpha1"
+ "knative.dev/eventing-contrib/kafka/source/pkg/apis/sources/v1beta1"
"knative.dev/eventing/pkg/utils"
"knative.dev/pkg/kmeta"
)

type ReceiveAdapterArgs struct {
Image string
- Source *v1alpha1.KafkaSource
+ Source *v1beta1.KafkaSource
Labels map[string]string
SinkURI string
AdditionalEnvs []corev1.EnvVar
@@ -67,7 +66,7 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
Value: args.Source.Namespace,
}}, args.AdditionalEnvs...)

- if val, ok := args.Source.GetLabels()[v1alpha1.KafkaKeyTypeLabel]; ok {
+ if val, ok := args.Source.GetLabels()[v1beta1.KafkaKeyTypeLabel]; ok {
env = append(env, corev1.EnvVar{
Name: "KEY_TYPE",
Value: val,
@@ -80,34 +79,6 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_KEY", args.Source.Spec.Net.TLS.Key.SecretKeyRef)
env = appendEnvFromSecretKeyRef(env, "KAFKA_NET_TLS_CA_CERT", args.Source.Spec.Net.TLS.CACert.SecretKeyRef)

- RequestResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceCPU)
- if err != nil {
- RequestResourceCPU = resource.MustParse("250m")
- }
- RequestResourceMemory, err := resource.ParseQuantity(args.Source.Spec.Resources.Requests.ResourceMemory)
- if err != nil {
- RequestResourceMemory = resource.MustParse("512Mi")
- }
- LimitResourceCPU, err := resource.ParseQuantity(args.Source.Spec.Resources.Limits.ResourceCPU)
- if err != nil {
- LimitResourceCPU = resource.MustParse("250m")
- }
- LimitResourceMemory, err := resource.ParseQuantity(args.Source.Spec.Resources.Limits.ResourceMemory)
- if err != nil {
- LimitResourceMemory = resource.MustParse("512Mi")
- }
-
- res := corev1.ResourceRequirements{
- Limits: corev1.ResourceList{
- corev1.ResourceCPU: RequestResourceCPU,
- corev1.ResourceMemory: RequestResourceMemory,
- },
- Requests: corev1.ResourceList{
- corev1.ResourceCPU: LimitResourceCPU,
- corev1.ResourceMemory: LimitResourceMemory,
- },
- }
-
return &v1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: utils.GenerateFixedName(args.Source, fmt.Sprintf("kafkasource-%s", args.Source.Name)),
@@ -130,13 +101,11 @@ func MakeReceiveAdapter(args *ReceiveAdapterArgs) *v1.Deployment {
Labels: args.Labels,
},
Spec: corev1.PodSpec{
- ServiceAccountName: args.Source.Spec.ServiceAccountName,
Containers: []corev1.Container{
{
- Name: "receive-adapter",
- Image: args.Image,
- Env: env,
- Resources: res,
+ Name: "receive-adapter",
+ Image: args.Image,
+ Env: env,
},
},
},
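appendEnvFromSecretKeyRef is referenced above but is not part of this diff; a plausible sketch of its shape, assuming it simply turns an optional SecretKeySelector into an env var and skips nil references:

```go
package resources

import corev1 "k8s.io/api/core/v1"

// appendEnvFromSecretKeyRef (hypothetical reconstruction): adds an env var
// sourced from a Secret key when the selector is set, otherwise returns env unchanged.
func appendEnvFromSecretKeyRef(env []corev1.EnvVar, name string, ref *corev1.SecretKeySelector) []corev1.EnvVar {
	if ref == nil {
		return env
	}
	return append(env, corev1.EnvVar{
		Name: name,
		ValueFrom: &corev1.EnvVarSource{
			SecretKeyRef: ref,
		},
	})
}
```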