Skip to content

Commit

Permalink
Add data plane consumer conformance tests (#61)
Browse files Browse the repository at this point in the history
* Add data plane consumer conformance tests

Signed-off-by: Pierangelo Di Pilato <[email protected]>

* Fix equals

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored Jul 27, 2020
1 parent 2bd9dd9 commit deaa219
Show file tree
Hide file tree
Showing 24 changed files with 372 additions and 356 deletions.
48 changes: 27 additions & 21 deletions control-plane/pkg/reconciler/base/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import (
"fmt"

"github.com/gogo/protobuf/proto"
"go.uber.org/multierr"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/util/retry"

coreconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/core/config"
)
Expand Down Expand Up @@ -115,32 +115,39 @@ func (r *Reconciler) UpdateDataPlaneConfigMap(brokersTriggers *coreconfig.Broker

_, err = r.KubeClient.CoreV1().ConfigMaps(configMap.Namespace).Update(configMap)
if err != nil {
return fmt.Errorf("failed to update config map %s/%s: %w", configMap.Namespace, configMap.Name, err)
// Return the same error, so that we can handle conflicting updates.
return err
}

return nil
}

func (r *Reconciler) UpdateDispatcherPodsAnnotation(logger *zap.Logger, volumeGeneration uint64) error {

labelSelector := labels.SelectorFromSet(map[string]string{"app": DispatcherLabel})
pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector)
if errors != nil {
return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.SystemNamespace, errors)
}
return retry.RetryOnConflict(retry.DefaultRetry, func() error {

labelSelector := labels.SelectorFromSet(map[string]string{"app": DispatcherLabel})
pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector)
if errors != nil {
return fmt.Errorf("failed to list dispatcher pods in namespace %s: %w", r.SystemNamespace, errors)
}

return r.updatePodsAnnotation(logger, volumeGeneration, pods)
return r.updatePodsAnnotation(logger, volumeGeneration, pods)
})
}

func (r *Reconciler) UpdateReceiverPodsAnnotation(logger *zap.Logger, volumeGeneration uint64) error {

labelSelector := labels.SelectorFromSet(map[string]string{"app": ReceiverLabel})
pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector)
if errors != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.SystemNamespace, errors)
}
return retry.RetryOnConflict(retry.DefaultRetry, func() error {

return r.updatePodsAnnotation(logger, volumeGeneration, pods)
labelSelector := labels.SelectorFromSet(map[string]string{"app": ReceiverLabel})
pods, errors := r.PodLister.Pods(r.SystemNamespace).List(labelSelector)
if errors != nil {
return fmt.Errorf("failed to list receiver pods in namespace %s: %w", r.SystemNamespace, errors)
}

return r.updatePodsAnnotation(logger, volumeGeneration, pods)
})
}

func (r *Reconciler) updatePodsAnnotation(logger *zap.Logger, volumeGeneration uint64, pods []*corev1.Pod) error {
Expand All @@ -167,14 +174,13 @@ func (r *Reconciler) updatePodsAnnotation(logger *zap.Logger, volumeGeneration u
pod.SetAnnotations(annotations)

if _, err := r.KubeClient.CoreV1().Pods(pod.Namespace).Update(pod); err != nil {

errors = multierr.Append(errors, fmt.Errorf(
"failed to update pod %s/%s: %w",
pod.Namespace,
pod.Name,
err,
))
// Return the same error, so that we can handle conflicting updates.
return err
}
}
return errors
}

func (r *Reconciler) HandleConflicts(f func() error) error {
return retry.RetryOnConflict(retry.DefaultRetry, f)
}
23 changes: 19 additions & 4 deletions control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/retry"
eventing "knative.dev/eventing/pkg/apis/eventing/v1beta1"
"knative.dev/eventing/pkg/logging"
"knative.dev/pkg/configmap"
Expand Down Expand Up @@ -61,6 +62,12 @@ type Reconciler struct {
}

func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.reconcileKind(ctx, broker)
})
}

func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {

logger := log.Logger(ctx, "broker", broker)

Expand Down Expand Up @@ -123,7 +130,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker)

// Update the configuration map with the new brokersTriggers data.
if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil {
return statusConditionManager.failedToUpdateBrokersTriggersConfigMap(err)
return err
}
statusConditionManager.brokersTriggersConfigMapUpdated()

Expand All @@ -137,7 +144,7 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker)

// Update volume generation annotation of receiver pods
if err := r.UpdateReceiverPodsAnnotation(logger, brokersTriggers.VolumeGeneration); err != nil {
return statusConditionManager.failedToUpdateReceiverPodsAnnotation(err)
return err
}

logger.Debug("Updated receiver pod annotation")
Expand All @@ -161,6 +168,13 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, broker *eventing.Broker)
}

func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
return r.finalizeKind(ctx, broker)
})
}

func (r *Reconciler) finalizeKind(ctx context.Context, broker *eventing.Broker) reconciler.Event {

logger := log.Logger(ctx, "broker", broker)

logger.Debug("Finalizing broker")
Expand Down Expand Up @@ -192,7 +206,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker)

// Update the configuration map with the new brokersTriggers data.
if err := r.UpdateDataPlaneConfigMap(brokersTriggers, brokersTriggersConfigMap); err != nil {
return fmt.Errorf("failed to update configuration map: %w", err)
return err
}

logger.Debug("Brokers and triggers config map updated")
Expand All @@ -208,7 +222,7 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, broker *eventing.Broker)

logger.Debug("Topic deleted", zap.String("topic", topic))

return reconciledNormal(broker.Namespace, broker.Name)
return nil
}

func incrementVolumeGeneration(generation uint64) uint64 {
Expand Down Expand Up @@ -289,6 +303,7 @@ func (r *Reconciler) SetBootstrapServers(servers string) error {
config := sarama.NewConfig()
config.Version = sarama.MaxVersion
config.Net.KeepAlive = time.Second * 60
config.Metadata.RefreshFrequency = time.Minute

kafkaClusterAdmin, err := NewClusterAdmin(addrs, config)
if err != nil {
Expand Down
10 changes: 1 addition & 9 deletions control-plane/pkg/reconciler/broker/broker_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (manager *statusConditionManager) reconciled() reconciler.Event {
}
broker.GetConditionSet().Manage(&broker.Status).MarkTrue(ConditionAddressable)

return reconciledNormal(broker.Namespace, broker.Name)
return nil
}

func (manager *statusConditionManager) failedToUpdateDispatcherPodsAnnotation(err error) {
Expand All @@ -161,11 +161,3 @@ func (manager *statusConditionManager) failedToGetBrokerConfig(err error) reconc

return fmt.Errorf("failed to get broker configuration: %w", err)
}

func reconciledNormal(namespace, name string) reconciler.Event {
return reconciler.NewEvent(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, namespace, name),
)
}
70 changes: 0 additions & 70 deletions control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Expand Down Expand Up @@ -164,11 +159,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Expand Down Expand Up @@ -285,11 +275,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Expand Down Expand Up @@ -355,11 +340,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Expand Down Expand Up @@ -447,11 +427,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Expand Down Expand Up @@ -539,11 +514,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Expand Down Expand Up @@ -621,11 +591,6 @@ func brokerReconciliation(t *testing.T, format string, configs Configs) {
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Expand Down Expand Up @@ -771,13 +736,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) {
}, &configs),
},
Key: testKey,
WantEvents: []string{
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Broker: []*coreconfig.Broker{},
Expand Down Expand Up @@ -805,13 +763,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) {
}, &configs),
},
Key: testKey,
WantEvents: []string{
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
VolumeGeneration: 1,
Expand Down Expand Up @@ -912,13 +863,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) {
}, &configs),
},
Key: testKey,
WantEvents: []string{
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Broker: []*coreconfig.Broker{
Expand Down Expand Up @@ -953,13 +897,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) {
}, &configs),
},
Key: testKey,
WantEvents: []string{
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(&configs, &coreconfig.Brokers{
Broker: []*coreconfig.Broker{
Expand Down Expand Up @@ -992,13 +929,6 @@ func brokerFinalization(t *testing.T, format string, configs Configs) {
}, &configs),
},
Key: testKey,
WantEvents: []string{
Eventf(
corev1.EventTypeNormal,
Reconciled,
fmt.Sprintf(`%s reconciled: "%s/%s"`, Broker, BrokerNamespace, BrokerName),
),
},
},
}

Expand Down
Loading

0 comments on commit deaa219

Please sign in to comment.